inital commit
[c11concurrency-benchmarks.git] / mabain / src / test / mbtest1.cpp
1 #include <stdlib.h>
2 #include <unistd.h>
3 #include <time.h>
4 #include <assert.h>
5 #include <atomic>
6 #include <fstream>
7
8 #include "../db.h"
9
10 #include "./test_key.h"
11
12 using namespace mabain;
13
14 static std::atomic<int64_t> key_low;
15 static std::atomic<int64_t> key_high;
16 static int64_t memcap_i = 256*1024*1024;
17 static int64_t memcap_d = 256*1024*1024;
18 static pthread_t wid = 0;
19
20 static std::string mbdir = "/var/tmp/mabain_test";
21 static bool stop_processing = false;
22 static uint32_t run_time = 3600;
23
24 static void* run_mb_test(void *arg);
25
26 static void* Reader(void *arg)
27 {
28     DB db(mbdir.c_str(), CONSTS::ReaderOptions(), memcap_i, memcap_d);
29     if(!db.is_open()) {
30         std::cerr << "failed tp open db\n";
31         abort();
32     }
33
34     int64_t ikey;
35     int rval;
36     TestKey tkey_int(MABAIN_TEST_KEY_TYPE_INT);
37     TestKey tkey_sha1(MABAIN_TEST_KEY_TYPE_SHA_128);
38     TestKey tkey_sha2(MABAIN_TEST_KEY_TYPE_SHA_256);
39     std::string keystr;
40     MBData mbd;
41     int ktype;
42
43     while(!stop_processing) {
44         //usleep(5);
45
46         if(key_high.load(std::memory_order_consume) == key_low.load(std::memory_order_consume))
47             continue;
48         ikey = key_low.load(std::memory_order_consume);
49         ikey += rand() % (key_high.load(std::memory_order_consume) - key_low.load(std::memory_order_consume));
50
51         ktype = rand() % 3;
52         switch(ktype) {
53         case 0:
54             keystr = tkey_int.get_key(ikey);
55             break;
56         case 1:
57             keystr = tkey_sha1.get_key(ikey);
58             break;
59         case 2:
60             keystr = tkey_sha2.get_key(ikey);
61             break;
62         }
63         rval = db.Find(keystr, mbd);
64
65         if(ikey < key_low.load(std::memory_order_consume))
66             continue;
67
68         if(rval == MBError::SUCCESS) {
69             if(keystr != std::string((char *)mbd.buff, mbd.data_len)) {
70                 std::cout << "value not match for key " << ikey << ": " << keystr << "\n";
71                 abort();
72             }
73         } else if(rval != MBError::NOT_EXIST) {
74             std::cout << "unexpected return from Find: " << rval << "\n";
75             abort();
76         }
77     }
78
79     db.Close();
80     return NULL;
81 }
82
83 static void Verify(DB &db)
84 {
85     int64_t key0 = key_low.load(std::memory_order_consume);
86     int64_t key1 = key_high.load(std::memory_order_consume);
87     TestKey tkey_int(MABAIN_TEST_KEY_TYPE_INT);
88     TestKey tkey_sha1(MABAIN_TEST_KEY_TYPE_SHA_128);
89     TestKey tkey_sha2(MABAIN_TEST_KEY_TYPE_SHA_256);
90     MBData mbd;
91     std::string keystr;
92     for(int64_t i = key0; i < key1; i++) {
93         keystr = tkey_int.get_key(i);
94         if(db.Find(keystr, mbd) != MBError::SUCCESS) {
95             std::cout << "failed to find key " << i << ": " << keystr << std::endl;
96             abort();
97         }
98         assert(keystr == std::string((char *)mbd.buff, mbd.data_len));
99
100         keystr = tkey_sha1.get_key(i);
101         if(db.Find(keystr, mbd) != MBError::SUCCESS) {
102             std::cout << "failed to find key " << i << ": " << keystr << std::endl;
103             abort();
104         }
105         assert(keystr == std::string((char *)mbd.buff, mbd.data_len));
106
107         keystr = tkey_sha2.get_key(i);
108         if(db.Find(keystr, mbd) != MBError::SUCCESS) {
109             std::cout << "failed to find key " << i << ": " << keystr << std::endl;
110             abort();
111         }
112         assert(keystr == std::string((char *)mbd.buff, mbd.data_len));
113     }
114 }
115
116 static void* AddThread(void *arg)
117 {
118     DB db(mbdir.c_str(), CONSTS::ReaderOptions());
119     int num = rand() % 1500;
120
121     int64_t key;
122     TestKey tkey_int(MABAIN_TEST_KEY_TYPE_INT);
123     TestKey tkey_sha1(MABAIN_TEST_KEY_TYPE_SHA_128);
124     TestKey tkey_sha2(MABAIN_TEST_KEY_TYPE_SHA_256);
125     std::string keystr;
126
127     for(int i = 0; i < num; i++) {
128         key = key_high.fetch_add(1, std::memory_order_release);
129         keystr = tkey_int.get_key(key);
130         assert(db.Add(keystr, keystr) == MBError::SUCCESS);
131
132         keystr = tkey_sha1.get_key(key);
133         assert(db.Add(keystr, keystr) == MBError::SUCCESS);
134
135         keystr = tkey_sha2.get_key(key);
136         assert(db.Add(keystr, keystr) == MBError::SUCCESS);
137     }
138     db.Close();
139     return NULL;
140 }
141
142 static void Populate(int nt)
143 {
144     pthread_t tid[256];
145     assert(nt < 256);
146     for(int i = 0; i < nt; i++) {
147         if(pthread_create(&tid[i], NULL, AddThread, NULL) != 0) {
148             std::cout << "failed to create MultiThreadAdd thread\n";
149             abort();
150         }
151     }
152
153     for(int i = 0; i < nt; i++) {
154         if(pthread_join(tid[i], NULL) != 0) {
155             std::cout << "failed to join MultiThreadAdd thread\n";
156             abort();
157         }
158     }
159 }
160
161 static void load_key_ids()
162 {
163     int64_t klow, khigh;
164     std::ifstream ifs;
165     std::string path = mbdir + "/key_id";
166     ifs.open (path.c_str(), std::ofstream::in);
167     if(ifs.is_open()) {
168         ifs >> klow;
169         ifs >> khigh;
170         ifs.close();
171     } else {
172         klow = 0;
173         khigh = 0;
174     }
175     key_low = klow;
176     key_high = khigh;
177     std::cout << "Loaded " << key_low << " " << key_high << "\n";
178 }
179
180 static void store_key_ids()
181 {
182     std::string path = mbdir + "/key_id";
183     FILE *fp;
184     fp = fopen(path.c_str(), "w");
185     if(!fp) return;
186     fprintf(fp, "%d\n%d", (int)key_low.load(std::memory_order_consume),
187                           (int)key_high.load(std::memory_order_consume));
188     fclose(fp);
189 }
190
191 static void* DeleteThread(void *arg)
192 {
193     DB db(mbdir.c_str(), CONSTS::ReaderOptions());
194     int num = rand() % 5;
195
196     int64_t key;
197     TestKey tkey_int(MABAIN_TEST_KEY_TYPE_INT);
198     TestKey tkey_sha1(MABAIN_TEST_KEY_TYPE_SHA_128);
199     TestKey tkey_sha2(MABAIN_TEST_KEY_TYPE_SHA_256);
200     std::string keystr;
201
202     for(int i = 0; i < num; i++) {
203         key = key_low.fetch_add(1, std::memory_order_release);
204
205         keystr = tkey_int.get_key(key);
206         assert(db.Remove(keystr) == MBError::SUCCESS);
207
208         keystr = tkey_sha1.get_key(key);
209         assert(db.Remove(keystr) == MBError::SUCCESS);
210
211         keystr = tkey_sha2.get_key(key);
212         assert(db.Remove(keystr) == MBError::SUCCESS);
213     }
214
215     db.Close();
216     return NULL;
217 }
218
219 static void Prune(int nt)
220 {
221     pthread_t tid[256];
222     assert(nt < 256);
223     for(int i = 0; i < nt; i++) {
224         if(pthread_create(&tid[i], NULL, DeleteThread, NULL) != 0) {
225             std::cout << "failed to create MultiThreadAdd thread\n";
226             abort();
227         }
228     }
229
230     for(int i = 0; i < nt; i++) {
231         if(pthread_join(tid[i], NULL) != 0) {
232             std::cout << "failed to join MultiThreadAdd thread\n";
233             abort();
234         }
235     }
236 }
237
238 void stop_mb_test()
239 {
240     if(wid != 0) {
241         if(pthread_join(wid, NULL) != 0) {
242             std::cout << "cannot join mbtest thread\n";
243               }
244     }
245 }
246
247 static void CheckCount()
248 {
249     DB *db = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), memcap_i, memcap_d);
250     if(db == NULL) return;
251
252     int64_t count = 0;
253     for(DB::iterator iter = db->begin(false, false); iter != db->end(); ++iter) {
254         count++;
255     }
256     std::cout << "Count using iterator: " << count << "\tcount from API: "
257               << db->Count() << "\n";
258     delete db;
259 }
260
261 void start_mb_test()
262 {
263     if(pthread_create(&wid, NULL, run_mb_test, NULL) != 0) {
264         std::cout << "failed to create test thread" << std::endl;
265     }
266 }
267
268 static void* run_mb_test(void *arg)
269 {
270     int64_t run_stop_time = time(NULL) + run_time;
271     int nreaders = 4;
272     int nupdates = 4;
273     srand(time(NULL));
274
275     load_key_ids();
276
277     MBConfig mbconf;
278     int options;
279     memset(&mbconf, 0, sizeof(mbconf));
280     mbconf.mbdir = mbdir.c_str();
281     options = CONSTS::WriterOptions() | CONSTS::ASYNC_WRITER_MODE;
282     mbconf.options = options;
283     mbconf.memcap_index = 128ULL*1024*1024;
284     mbconf.memcap_data = 128ULL*1024*1024;
285     mbconf.block_size_index = 64U*1024*1024;
286     mbconf.block_size_data = 64U*1024*1024;
287     mbconf.max_num_data_block = 3;
288     mbconf.max_num_index_block = 3;
289     mbconf.num_entry_per_bucket = 500;
290     DB *db = new DB(mbconf);
291     if(!db->is_open()) {
292         std::cerr << "failed to open writer db" << std::endl;
293         delete db;
294         abort();
295     }
296
297     pthread_t tid;
298     for(int i = 0; i < nreaders; i++) {
299         if(pthread_create(&tid, NULL, Reader, NULL) != 0) {
300             std::cout << "failed to create reader thread" << std::endl;
301             abort();
302         }
303     }
304
305     int rcn = 0;
306     int64_t loop_cnt = 0;
307     while(!stop_processing) {
308         Populate(nupdates);
309         Prune(nupdates);
310
311         std::cout << "LOOP " << loop_cnt << ": " << db->Count() << "\n";
312         if(loop_cnt % 5 == 0) {
313             std::cout << "RUN RC\n";
314             int rval = db->CollectResource(24LL*1024*1024, 24LL*1024*1024, 128LL*1024*1024, 0xFFFFFFFFFFFF);
315             if(rval == MBError::SUCCESS) {
316                 rcn++;
317                 if(rcn % 57 == 0 && !(options & CONSTS::ASYNC_WRITER_MODE)) {
318                     // Note if in async mode, the DB handle cannot be used for lookup.
319                     std::cout << "Verifying after rc" << std::endl;
320                     Verify(*db);
321                 } else if(rcn % 20 == 0) {
322                     db->Close();
323                     delete db;
324                     mbconf.options = options;
325                     db = new DB(mbconf);
326                     if(!db->is_open()) {
327                         delete db;
328                         abort();
329                     }
330                     CheckCount();
331                 } else if(rcn % 20000523 == 0) {
332                     if(system("reboot") != 0) {
333                     }
334                 }
335             }
336         }
337
338         sleep(1);
339         loop_cnt++;
340         if(time(NULL) >= run_stop_time) {
341             stop_processing = true;
342         }
343     }
344
345     db->Close();
346     delete db;
347     store_key_ids();
348
349
350     return NULL;
351 }
352
353 static void SetTestStatus(bool success)
354 {
355     std::string cmd;
356     if(success) {
357         cmd = std::string("touch ") + mbdir + "/_success";
358     } else {
359         cmd = std::string("rm ") + mbdir + "/_success >" + mbdir + "/out 2>" + mbdir + "/err";
360     }
361     if(system(cmd.c_str()) != 0) {
362     }
363 }
364
365 int main(int argc, char *argv[])
366 {
367     if(argc > 1) {
368         mbdir = std::string(argv[1]);
369         std::cout << "Test db directory is " << mbdir << "\n";
370     }
371     if(argc > 2) {
372         run_time = atoi(argv[2]);
373         std::cout << "running " << argv[0] << " for " << run_time << " seconds...\n";
374     }
375
376     DB::SetLogFile(mbdir + "/mabain.log");
377     SetTestStatus(false);
378     run_mb_test(NULL);
379     SetTestStatus(true);
380     DB::CloseLogFile();
381     return 0;
382 }