8 #include "txn_proto2_impl.h"
10 #include "typed_txn_btree.h"
15 #include "record/encoder.h"
16 #include "record/inline_str.h"
18 #include "scopedperf.hh"
20 #if defined(NDB_MASSTREE)
21 #define HAVE_REVERSE_RANGE_SCANS
26 uint64_t initial_timestamp;
28 struct test_callback_ctr {
29 test_callback_ctr(size_t *ctr) : ctr(ctr) {}
31 operator()(const concurrent_btree::string_type &k, const string &v) const
39 // all combinations of txn flags to test
40 static uint64_t TxnFlags[] = { 0, transaction_base::TXN_FLAG_LOW_LEVEL_SCAN };
44 always_assert_cond_in_txn(
45 const P &t, bool cond,
46 const char *condstr, const char *func,
47 const char *filename, int lineno)
51 static mutex g_report_lock;
52 std::lock_guard<mutex> guard(g_report_lock);
53 cerr << func << " (" << filename << ":" << lineno << ") - Condition `"
54 << condstr << "' failed!" << endl;
56 sleep(1); // XXX(stephentu): give time for debug dump to reach console
57 // why doesn't flushing solve this?
61 #define ALWAYS_ASSERT_COND_IN_TXN(t, cond) \
62 always_assert_cond_in_txn(t, cond, #cond, __PRETTY_FUNCTION__, __FILE__, __LINE__)
66 AssertSuccessfulCommit(P &t)
68 ALWAYS_ASSERT_COND_IN_TXN(t, t.commit(false));
73 AssertFailedCommit(P &t)
75 ALWAYS_ASSERT_COND_IN_TXN(t, !t.commit(false));
80 AssertByteEquality(const T &t, const uint8_t * v, size_t sz)
82 ALWAYS_ASSERT(sizeof(T) == sz);
83 bool success = memcmp(&t, v, sz) == 0;
85 cerr << "expecting: " << hexify(string((const char *) &t, sizeof(T))) << endl;
86 cerr << "got : " << hexify(string((const char *) v, sz)) << endl;
93 AssertByteEquality(const T &t, const string &v)
95 AssertByteEquality(t, (const uint8_t *) v.data(), v.size());
100 rec(uint64_t v) : v(v) {}
104 static inline ostream &
105 operator<<(ostream &o, const rec &r)
107 o << "[rec=" << r.v << "]";
111 template <template <typename> class TxnType, typename Traits>
115 for (size_t txn_flags_idx = 0;
116 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
118 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
120 VERBOSE(cerr << "Testing with flags=0x" << hexify(txn_flags) << endl);
122 txn_btree<TxnType> btr(sizeof(rec));
123 typename Traits::StringAllocator arena;
126 TxnType<Traits> t(txn_flags, arena);
128 ALWAYS_ASSERT_COND_IN_TXN(t, !btr.search(t, u64_varkey(0), v));
129 btr.insert_object(t, u64_varkey(0), rec(0));
130 ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(0), v));
131 ALWAYS_ASSERT_COND_IN_TXN(t, !v.empty());
132 AssertByteEquality(rec(0), v);
133 AssertSuccessfulCommit(t);
134 VERBOSE(cerr << "------" << endl);
139 t0(txn_flags, arena), t1(txn_flags, arena);
142 ALWAYS_ASSERT_COND_IN_TXN(t0, btr.search(t0, u64_varkey(0), v0));
143 ALWAYS_ASSERT_COND_IN_TXN(t0, !v0.empty());
144 AssertByteEquality(rec(0), v0);
146 btr.insert_object(t0, u64_varkey(0), rec(1));
147 ALWAYS_ASSERT_COND_IN_TXN(t1, btr.search(t1, u64_varkey(0), v1));
148 ALWAYS_ASSERT_COND_IN_TXN(t1, !v1.empty());
150 if (t1.is_snapshot())
151 // we don't read-uncommitted for consistent snapshot
152 AssertByteEquality(rec(0), v1);
154 AssertSuccessfulCommit(t0);
157 // if we have a consistent snapshot, then this txn should not abort
158 ALWAYS_ASSERT_COND_IN_TXN(t1, t1.is_snapshot());
159 } catch (transaction_abort_exception &e) {
160 // if we dont have a snapshot, then we expect an abort
161 ALWAYS_ASSERT_COND_IN_TXN(t1, !t1.is_snapshot());
163 VERBOSE(cerr << "------" << endl);
169 t0(txn_flags, arena), t1(txn_flags, arena);
172 ALWAYS_ASSERT_COND_IN_TXN(t0, btr.search(t0, u64_varkey(0), v0));
173 ALWAYS_ASSERT_COND_IN_TXN(t0, !v0.empty());
174 AssertByteEquality(rec(1), v0);
176 btr.insert_object(t0, u64_varkey(0), rec(2));
177 ALWAYS_ASSERT_COND_IN_TXN(t1, btr.search(t1, u64_varkey(0), v1));
178 ALWAYS_ASSERT_COND_IN_TXN(t1, !v1.empty());
179 // our API allows v1 to be a dirty read though (t1 will abort)
181 btr.insert_object(t1, u64_varkey(0), rec(3));
183 AssertSuccessfulCommit(t0);
184 AssertFailedCommit(t1);
185 VERBOSE(cerr << "------" << endl);
191 t0(txn_flags, arena), t1(txn_flags, arena);
193 const u64_varkey vend(5);
195 test_callback_ctr cb(&ctr);
196 btr.search_range(t0, u64_varkey(1), &vend, cb);
197 ALWAYS_ASSERT_COND_IN_TXN(t0, ctr == 0);
199 btr.insert_object(t1, u64_varkey(2), rec(4));
200 AssertSuccessfulCommit(t1);
202 btr.insert_object(t0, u64_varkey(0), rec(0));
203 AssertFailedCommit(t0);
204 VERBOSE(cerr << "------" << endl);
208 TxnType<Traits> t(txn_flags, arena);
209 const u64_varkey vend(20);
211 test_callback_ctr cb(&ctr);
212 btr.search_range(t, u64_varkey(10), &vend, cb);
213 ALWAYS_ASSERT_COND_IN_TXN(t, ctr == 0);
214 btr.insert_object(t, u64_varkey(15), rec(5));
215 AssertSuccessfulCommit(t);
216 VERBOSE(cerr << "------" << endl);
219 txn_epoch_sync<TxnType>::sync();
220 txn_epoch_sync<TxnType>::finish();
224 struct bufrec { char buf[256]; };
226 template <template <typename> class TxnType, typename Traits>
230 for (size_t txn_flags_idx = 0;
231 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
233 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
234 txn_btree<TxnType> btr(sizeof(bufrec));
235 typename Traits::StringAllocator arena;
237 NDB_MEMSET(r.buf, 'a', ARRAY_NELEMS(r.buf));
238 for (size_t i = 0; i < 100; i++) {
239 TxnType<Traits> t(txn_flags, arena);
240 btr.insert_object(t, u64_varkey(i), r);
241 AssertSuccessfulCommit(t);
243 for (size_t i = 0; i < 100; i++) {
244 TxnType<Traits> t(txn_flags, arena);
246 ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(i), v));
247 AssertByteEquality(r, v);
248 AssertSuccessfulCommit(t);
250 txn_epoch_sync<TxnType>::sync();
251 txn_epoch_sync<TxnType>::finish();
255 template <template <typename> class TxnType, typename Traits>
257 test_absent_key_race()
259 for (size_t txn_flags_idx = 0;
260 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
262 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
263 txn_btree<TxnType> btr;
264 typename Traits::StringAllocator arena;
268 t0(txn_flags, arena), t1(txn_flags, arena);
270 ALWAYS_ASSERT_COND_IN_TXN(t0, !btr.search(t0, u64_varkey(0), v0));
271 ALWAYS_ASSERT_COND_IN_TXN(t1, !btr.search(t1, u64_varkey(0), v1));
274 btr.insert_object(t0, u64_varkey(0), rec(1));
277 btr.insert_object(t1, u64_varkey(0), rec(1));
279 // t0 should win, t1 should abort
280 AssertSuccessfulCommit(t0);
281 AssertFailedCommit(t1);
284 txn_epoch_sync<TxnType>::sync();
285 txn_epoch_sync<TxnType>::finish();
289 template <template <typename> class TxnType, typename Traits>
291 test_inc_value_size()
293 for (size_t txn_flags_idx = 0;
294 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
296 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
297 txn_btree<TxnType> btr;
298 typename Traits::StringAllocator arena;
299 const size_t upper = numeric_limits<dbtuple::node_size_type>::max();
300 for (size_t i = 1; i < upper; i++) {
301 const string v(i, 'a');
302 TxnType<Traits> t(txn_flags, arena);
303 btr.insert(t, u64_varkey(0), (const uint8_t *) v.data(), v.size());
304 AssertSuccessfulCommit(t);
306 txn_epoch_sync<TxnType>::sync();
307 txn_epoch_sync<TxnType>::finish();
311 template <template <typename> class TxnType, typename Traits>
315 for (size_t txn_flags_idx = 0;
316 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
318 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
319 txn_btree<TxnType> btr0, btr1;
320 typename Traits::StringAllocator arena;
321 for (size_t i = 0; i < 100; i++) {
322 TxnType<Traits> t(txn_flags, arena);
323 btr0.insert_object(t, u64_varkey(i), rec(123));
324 btr1.insert_object(t, u64_varkey(i), rec(123));
325 AssertSuccessfulCommit(t);
328 for (size_t i = 0; i < 100; i++) {
329 TxnType<Traits> t(txn_flags, arena);
331 bool ret0 = btr0.search(t, u64_varkey(i), v0);
332 bool ret1 = btr1.search(t, u64_varkey(i), v1);
333 AssertSuccessfulCommit(t);
334 ALWAYS_ASSERT_COND_IN_TXN(t, ret0);
335 ALWAYS_ASSERT_COND_IN_TXN(t, !v0.empty());
336 ALWAYS_ASSERT_COND_IN_TXN(t, ret1);
337 ALWAYS_ASSERT_COND_IN_TXN(t, !v1.empty());
338 AssertByteEquality(rec(123), v0);
339 AssertByteEquality(rec(123), v1);
342 txn_epoch_sync<TxnType>::sync();
343 txn_epoch_sync<TxnType>::finish();
347 template <template <typename> class TxnType, typename Traits>
349 test_read_only_snapshot()
351 for (size_t txn_flags_idx = 0;
352 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
354 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
355 txn_btree<TxnType> btr;
356 typename Traits::StringAllocator arena;
359 TxnType<Traits> t(txn_flags, arena);
360 btr.insert_object(t, u64_varkey(0), rec(0));
361 AssertSuccessfulCommit(t);
364 // XXX(stephentu): HACK! we need to wait for the GC to
365 // compute a new consistent snapshot version that includes this
367 txn_epoch_sync<TxnType>::sync();
371 t0(txn_flags, arena),
372 t1(txn_flags | transaction_base::TXN_FLAG_READ_ONLY, arena);
374 ALWAYS_ASSERT_COND_IN_TXN(t0, btr.search(t0, u64_varkey(0), v0));
375 ALWAYS_ASSERT_COND_IN_TXN(t0, !v0.empty());
376 AssertByteEquality(rec(0), v0);
378 btr.insert_object(t0, u64_varkey(0), rec(1));
380 ALWAYS_ASSERT_COND_IN_TXN(t1, btr.search(t1, u64_varkey(0), v1));
381 ALWAYS_ASSERT_COND_IN_TXN(t1, !v1.empty());
382 AssertByteEquality(rec(0), v1);
384 AssertSuccessfulCommit(t0);
385 AssertSuccessfulCommit(t1);
388 txn_epoch_sync<TxnType>::sync();
389 txn_epoch_sync<TxnType>::finish();
393 namespace test_long_keys_ns {
396 make_long_key(int32_t a, int32_t b, int32_t c, int32_t d) {
397 char buf[4 * sizeof(int32_t)];
398 int32_t *p = (int32_t *) &buf[0];
403 return string(&buf[0], ARRAY_NELEMS(buf));
406 template <template <typename> class Protocol>
407 class counting_scan_callback : public txn_btree<Protocol>::search_range_callback {
409 counting_scan_callback(uint64_t expect) : ctr(0), expect(expect) {}
412 invoke(const typename txn_btree<Protocol>::keystring_type &k, const string &v)
414 AssertByteEquality(rec(expect), v);
424 namespace test_insert_same_key_ns {
425 struct rec0 { rec0(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[8]; };
426 struct rec1 { rec1(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[128]; };
427 struct rec2 { rec2(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[1024]; };
428 struct rec3 { rec3(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[2048]; };
429 struct rec4 { rec4(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[4096]; };
432 template <template <typename> class TxnType, typename Traits>
436 using namespace test_long_keys_ns;
438 for (size_t txn_flags_idx = 0;
439 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
441 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
442 txn_btree<TxnType> btr;
443 typename Traits::StringAllocator arena;
446 TxnType<Traits> t(txn_flags, arena);
447 for (size_t a = 0; a < N; a++)
448 for (size_t b = 0; b < N; b++)
449 for (size_t c = 0; c < N; c++)
450 for (size_t d = 0; d < N; d++) {
451 const string k = make_long_key(a, b, c, d);
452 btr.insert_object(t, varkey(k), rec(1));
454 AssertSuccessfulCommit(t);
458 TxnType<Traits> t(txn_flags, arena);
459 const string lowkey_s = make_long_key(1, 2, 3, 0);
460 const string highkey_s = make_long_key(1, 2, 3, N);
461 const varkey highkey(highkey_s);
462 counting_scan_callback<TxnType> c(1);
463 btr.search_range_call(t, varkey(lowkey_s), &highkey, c);
464 AssertSuccessfulCommit(t);
466 cerr << "c.ctr: " << c.ctr << ", N: " << N << endl;
467 ALWAYS_ASSERT_COND_IN_TXN(t, c.ctr == N);
470 #ifdef HAVE_REVERSE_RANGE_SCANS
472 TxnType<Traits> t(txn_flags, arena);
473 const string lowkey_s = make_long_key(4, 5, 3, 0);
474 const string highkey_s = make_long_key(4, 5, 3, N);
475 const varkey lowkey(lowkey_s);
476 counting_scan_callback<TxnType> c(1);
477 btr.rsearch_range_call(t, varkey(highkey_s), &lowkey, c);
478 AssertSuccessfulCommit(t);
480 cerr << "c.ctr: " << c.ctr << ", N: " << N << endl;
481 ALWAYS_ASSERT_COND_IN_TXN(t, c.ctr == (N-1));
485 txn_epoch_sync<TxnType>::sync();
486 txn_epoch_sync<TxnType>::finish();
489 cerr << "test_long_keys passed" << endl;
492 template <template <typename> class TxnType, typename Traits>
496 using namespace test_long_keys_ns;
497 for (size_t txn_flags_idx = 0;
498 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
500 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
502 const uint8_t lowkey_cstr[] = {
503 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x45, 0x49, 0x4E, 0x47,
504 0x41, 0x54, 0x49, 0x4F, 0x4E, 0x45, 0x49, 0x4E, 0x47, 0x00, 0x00, 0x00,
505 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
506 0x00, 0x00, 0x00, 0x00,
508 const string lowkey_s((const char *) &lowkey_cstr[0], ARRAY_NELEMS(lowkey_cstr));
509 const uint8_t highkey_cstr[] = {
510 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x45, 0x49, 0x4E, 0x47,
511 0x41, 0x54, 0x49, 0x4F, 0x4E, 0x45, 0x49, 0x4E, 0x47, 0x00, 0x00, 0x00,
512 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
513 0xFF, 0xFF, 0xFF, 0xFF
515 const string highkey_s((const char *) &highkey_cstr[0], ARRAY_NELEMS(highkey_cstr));
517 txn_btree<TxnType> btr;
518 typename Traits::StringAllocator arena;
520 TxnType<Traits> t(txn_flags, arena);
521 btr.insert_object(t, varkey(lowkey_s), rec(12345));
522 AssertSuccessfulCommit(t);
526 TxnType<Traits> t(txn_flags, arena);
528 ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, varkey(lowkey_s), v));
529 AssertByteEquality(rec(12345), v);
530 AssertSuccessfulCommit(t);
534 TxnType<Traits> t(txn_flags, arena);
535 counting_scan_callback<TxnType> c(12345);
536 const varkey highkey(highkey_s);
537 btr.search_range_call(t, varkey(lowkey_s), &highkey, c);
538 AssertSuccessfulCommit(t);
539 ALWAYS_ASSERT_COND_IN_TXN(t, c.ctr == 1);
542 txn_epoch_sync<TxnType>::sync();
543 txn_epoch_sync<TxnType>::finish();
547 template <template <typename> class TxnType, typename Traits>
549 test_insert_same_key()
551 using namespace test_insert_same_key_ns;
552 for (size_t txn_flags_idx = 0;
553 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
555 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
557 txn_btree<TxnType> btr;
558 typename Traits::StringAllocator arena;
561 TxnType<Traits> t(txn_flags, arena);
562 btr.insert_object(t, u64_varkey(0), rec0('a'));
563 btr.insert_object(t, u64_varkey(0), rec1('b'));
564 btr.insert_object(t, u64_varkey(0), rec2('c'));
565 AssertSuccessfulCommit(t);
569 TxnType<Traits> t(txn_flags, arena);
570 btr.insert_object(t, u64_varkey(0), rec3('d'));
571 btr.insert_object(t, u64_varkey(0), rec4('e'));
572 AssertSuccessfulCommit(t);
575 txn_epoch_sync<TxnType>::sync();
576 txn_epoch_sync<TxnType>::finish();
580 #define TESTREC_KEY_FIELDS(x, y) \
583 #define TESTREC_VALUE_FIELDS(x, y) \
586 y(inline_str_fixed<10>,v2)
587 DO_STRUCT(testrec, TESTREC_KEY_FIELDS, TESTREC_VALUE_FIELDS)
589 namespace test_typed_btree_ns {
591 static const pair<testrec::key, testrec::value> scan_values[] = {
592 {testrec::key(10, 1), testrec::value(123456 + 1, 10, "A")},
593 {testrec::key(10, 2), testrec::value(123456 + 2, 10, "B")},
594 {testrec::key(10, 3), testrec::value(123456 + 3, 10, "C")},
595 {testrec::key(10, 4), testrec::value(123456 + 4, 10, "D")},
596 {testrec::key(10, 5), testrec::value(123456 + 5, 10, "E")},
599 template <template <typename> class Protocol>
600 class scan_callback : public typed_txn_btree<Protocol, schema<testrec>>::search_range_callback {
602 constexpr scan_callback() : n(0) {}
605 invoke(const testrec::key &key,
606 const testrec::value &value)
608 ALWAYS_ASSERT(n < ARRAY_NELEMS(scan_values));
609 ALWAYS_ASSERT(scan_values[n].first == key);
610 ALWAYS_ASSERT(scan_values[n].second.v2 == value.v2);
621 template <template <typename> class TxnType, typename Traits>
625 using namespace test_typed_btree_ns;
627 typedef typed_txn_btree<TxnType, schema<testrec>> ttxn_btree_type;
629 typename Traits::StringAllocator arena;
630 typedef TxnType<Traits> txn_type;
632 const testrec::key k0(1, 1);
633 const testrec::value v0(2, 3, "hello");
636 txn_type t(0, arena);
638 ALWAYS_ASSERT_COND_IN_TXN(t, !btr.search(t, k0, v));
639 btr.insert(t, k0, v0);
640 AssertSuccessfulCommit(t);
644 txn_type t(0, arena);
646 ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, k0, v));
647 ALWAYS_ASSERT_COND_IN_TXN(t, v0 == v);
648 AssertSuccessfulCommit(t);
652 txn_type t(0, arena);
654 const bool ret = btr.search(t, k0, v, FIELDS(1, 2));
655 ALWAYS_ASSERT_COND_IN_TXN(t, ret);
656 ALWAYS_ASSERT_COND_IN_TXN(t, v.v1 == v0.v1);
657 ALWAYS_ASSERT_COND_IN_TXN(t, v.v2 == v0.v2);
658 AssertSuccessfulCommit(t);
662 txn_type t(0, arena);
664 const bool ret = btr.search(t, k0, v, FIELDS(0, 2));
665 ALWAYS_ASSERT_COND_IN_TXN(t, ret);
666 ALWAYS_ASSERT_COND_IN_TXN(t, v.v0 == v0.v0);
667 ALWAYS_ASSERT_COND_IN_TXN(t, v.v2 == v0.v2);
668 AssertSuccessfulCommit(t);
672 txn_type t(0, arena);
673 for (size_t i = 0; i < ARRAY_NELEMS(scan_values); i++)
674 btr.insert(t, scan_values[i].first, scan_values[i].second);
675 AssertSuccessfulCommit(t);
679 txn_type t(0, arena);
680 const testrec::key begin(10, 0);
681 scan_callback<TxnType> cb;
682 btr.search_range_call(t, begin, nullptr, cb, false, FIELDS(2));
683 AssertSuccessfulCommit(t);
686 txn_epoch_sync<TxnType>::sync();
687 txn_epoch_sync<TxnType>::finish();
689 cerr << "test_typed_btree() passed" << endl;
692 template <template <typename> class Protocol>
693 class txn_btree_worker : public ndb_thread {
695 txn_btree_worker(txn_btree<Protocol> &btr, uint64_t txn_flags)
696 : btr(&btr), txn_flags(txn_flags) {}
697 inline uint64_t get_txn_flags() const { return txn_flags; }
699 txn_btree<Protocol> *const btr;
700 const uint64_t txn_flags;
703 namespace mp_stress_test_allocator_ns {
705 static const size_t nworkers = 28;
706 static const size_t nkeys = nworkers * 4;
708 static atomic<bool> running(true);
710 template <template <typename> class TxnType, typename Traits>
711 class worker : public txn_btree_worker<TxnType> {
713 worker(unsigned id, txn_btree<TxnType> &btr, uint64_t txn_flags)
714 : txn_btree_worker<TxnType>(btr, txn_flags),
715 id(id), commits(0), aborts(0) {}
719 rcu::s_instance.pin_current_thread(id);
720 fast_random r(reinterpret_cast<unsigned long>(this));
722 while (running.load()) {
723 typename Traits::StringAllocator arena;
724 TxnType<Traits> t(this->txn_flags, arena);
726 // RMW on a small space of keys
727 const u64_varkey k(r.next() % nkeys);
728 ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, k, v));
729 ((rec *) v.data())->v++;
730 this->btr->put(t, k, v);
733 } catch (transaction_abort_exception &e) {
738 unsigned get_id() const { return id; }
739 unsigned get_commits() const { return commits; }
740 unsigned get_aborts() const { return aborts; }
749 template <template <typename> class TxnType, typename Traits>
751 mp_stress_test_allocator()
753 using namespace mp_stress_test_allocator_ns;
754 txn_btree<TxnType> btr;
756 rcu::s_instance.pin_current_thread(0);
757 typename Traits::StringAllocator arena;
758 TxnType<Traits> t(0, arena);
759 for (size_t i = 0; i < nkeys; i++)
760 btr.insert_object(t, u64_varkey(i), rec(0));
761 AssertSuccessfulCommit(t);
763 vector< shared_ptr< worker<TxnType, Traits> > > v;
764 for (size_t i = 0; i < nworkers; i++)
765 v.emplace_back(new worker<TxnType, Traits>(i, btr, 0));
769 running.store(false);
772 cerr << "worker " << p->get_id()
773 << " commits " << p->get_commits()
774 << " aborts " << p->get_aborts()
777 cerr << "mp_stress_test_allocator passed" << endl;
780 namespace mp_stress_test_insert_removes_ns {
782 static const size_t S = numeric_limits<dbtuple::node_size_type>::max();
785 static const size_t nworkers = 4;
786 static atomic<bool> running(true);
787 template <template <typename> class TxnType, typename Traits>
788 class worker : public txn_btree_worker<TxnType> {
790 worker(txn_btree<TxnType> &btr, uint64_t txn_flags)
791 : txn_btree_worker<TxnType>(btr, txn_flags) {}
795 fast_random r(reinterpret_cast<unsigned long>(this));
796 while (running.load()) {
797 typename Traits::StringAllocator arena;
798 TxnType<Traits> t(this->txn_flags, arena);
800 switch (r.next() % 3) {
802 this->btr->insert_object(t, u64_varkey(0), rec(1));
805 this->btr->insert_object(t, u64_varkey(0), big_rec());
808 this->btr->remove(t, u64_varkey(0));
812 } catch (transaction_abort_exception &e) {
819 template <template <typename> class TxnType, typename Traits>
821 mp_stress_test_insert_removes()
823 using namespace mp_stress_test_insert_removes_ns;
824 txn_btree<TxnType> btr;
825 vector< shared_ptr< worker<TxnType, Traits> > > v;
826 for (size_t i = 0; i < nworkers; i++)
827 v.emplace_back(new worker<TxnType, Traits>(btr, 0));
830 sleep(5); // let many epochs pass
831 running.store(false);
837 namespace mp_test1_ns {
838 // read-modify-write test (counters)
840 const size_t niters = 1000;
842 template <template <typename> class TxnType, typename Traits>
843 class worker : public txn_btree_worker<TxnType> {
845 worker(txn_btree<TxnType> &btr, uint64_t txn_flags)
846 : txn_btree_worker<TxnType>(btr, txn_flags) {}
850 for (size_t i = 0; i < niters; i++) {
852 typename Traits::StringAllocator arena;
853 TxnType<Traits> t(this->txn_flags, arena);
857 if (!this->btr->search(t, u64_varkey(0), v)) {
860 r = *((const rec *) v.data());
863 this->btr->insert_object(t, u64_varkey(0), r);
865 } catch (transaction_abort_exception &e) {
873 template <template <typename> class TxnType, typename Traits>
877 using namespace mp_test1_ns;
879 for (size_t txn_flags_idx = 0;
880 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
882 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
883 txn_btree<TxnType> btr;
884 typename Traits::StringAllocator arena;
886 worker<TxnType, Traits> w0(btr, txn_flags);
887 worker<TxnType, Traits> w1(btr, txn_flags);
888 worker<TxnType, Traits> w2(btr, txn_flags);
889 worker<TxnType, Traits> w3(btr, txn_flags);
891 w0.start(); w1.start(); w2.start(); w3.start();
892 w0.join(); w1.join(); w2.join(); w3.join();
895 TxnType<Traits> t(txn_flags, arena);
897 ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(0), v));
898 ALWAYS_ASSERT_COND_IN_TXN(t, !v.empty());
899 const uint64_t rv = ((const rec *) v.data())->v;
900 if (rv != (niters * 4))
901 cerr << "v: " << rv << ", expected: " << (niters * 4) << endl;
902 ALWAYS_ASSERT_COND_IN_TXN(t, rv == (niters * 4));
903 AssertSuccessfulCommit(t);
906 txn_epoch_sync<TxnType>::sync();
907 txn_epoch_sync<TxnType>::finish();
911 namespace mp_test2_ns {
913 static const uint64_t ctr_key = 0;
914 static const uint64_t range_begin = 100;
915 static const uint64_t range_end = 150;
919 uint32_t values_[range_end - range_begin];
923 NDB_MEMSET(&values_[0], 0, sizeof(values_));
930 INVARIANT(count_ <= ARRAY_NELEMS(values_));
933 uint32_t last = values_[0];
934 INVARIANT(last >= range_begin && last < range_end);
935 for (size_t i = 1; i < count_; last = values_[i], i++) {
936 INVARIANT(last < values_[i]);
937 INVARIANT(values_[i] >= range_begin && values_[i] < range_end);
941 inline void sanity_check() const {}
944 // assumes list is in ascending order, no dups, and there is space
947 // inserts in ascending order
951 if (count_ >= ARRAY_NELEMS(values_)) {
952 cerr << "ERROR when trying to insert " << v
953 << ": " << vectorize() << endl;
956 INVARIANT(count_ < ARRAY_NELEMS(values_));
957 if (!count_ || values_[count_ - 1] < v) {
958 values_[count_++] = v;
960 INVARIANT(contains(v));
963 for (size_t i = 0; i < count_; i++) {
964 if (values_[i] > v) {
965 // move values_[i, count_) into slots values_[i+1, count_+1)
966 memmove(&values_[i+1], &values_[i], (count_ - i) * sizeof(uint32_t));
970 INVARIANT(contains(v));
977 contains(uint32_t v) const
979 for (size_t i = 0; i < count_; i++)
985 // removes v from list, returns true if actual removal happened.
986 // assumes v is in ascending order with no dups
990 for (size_t i = 0; i < count_; i++) {
991 if (values_[i] == v) {
992 // move values_[i+1, count_) into slots values_[i, count_-1)
993 memmove(&values_[i], &values_[i+1], (count_ - (i+1)) * sizeof(uint32_t));
996 INVARIANT(!contains(v));
997 // no dups assumption
999 } else if (values_[i] > v) {
1000 // ascending order assumption
1004 INVARIANT(!contains(v));
1008 inline vector<uint32_t>
1012 for (size_t i = 0; i < count_; i++)
1013 v.push_back(values_[i]);
1018 static volatile bool running = true;
1020 // expects the values to be monotonically increasing (as records)
1021 template <template <typename> class Protocol>
1022 class counting_scan_callback : public txn_btree<Protocol>::search_range_callback {
1025 invoke(const typename txn_btree<Protocol>::keystring_type &k,
1028 ALWAYS_ASSERT(k.length() == 8);
1029 const uint64_t u64k =
1030 host_endian_trfm<uint64_t>()(*reinterpret_cast<const uint64_t *>(k.data()));
1031 if (v.size() != sizeof(rec)) {
1032 cerr << "v.size(): " << v.size() << endl;
1033 cerr << "sizeof rec: " << sizeof(rec) << endl;
1034 ALWAYS_ASSERT(false);
1036 const rec *r = (const rec *) v.data();
1037 ALWAYS_ASSERT(u64k == r->v);
1038 VERBOSE(cerr << "counting_scan_callback: " << hexify(k) << " => " << r->v << endl);
1039 values_.push_back(r->v);
1042 vector<uint32_t> values_;
1045 template <template <typename> class TxnType, typename Traits>
1046 class mutate_worker : public txn_btree_worker<TxnType> {
1048 mutate_worker(txn_btree<TxnType> &btr, uint64_t flags)
1049 : txn_btree_worker<TxnType>(btr, flags), naborts(0) {}
1053 for (size_t i = range_begin; running && i < range_end; i++) {
1055 typename Traits::StringAllocator arena;
1056 //bool did_remove = false;
1057 //uint64_t did_v = 0;
1059 TxnType<Traits> t(this->txn_flags, arena);
1061 control_rec ctr_rec;
1063 ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(ctr_key), v_ctr));
1064 ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1065 ALWAYS_ASSERT_COND_IN_TXN(t, ((const control_rec *) v_ctr.data())->count_ > 1);
1066 ctr_rec = *((const control_rec *) v_ctr.data());
1067 if (this->btr->search(t, u64_varkey(i), v)) {
1068 AssertByteEquality(rec(i), v);
1069 this->btr->remove(t, u64_varkey(i));
1070 ALWAYS_ASSERT_COND_IN_TXN(t, ctr_rec.remove(i));
1071 //did_remove = true;
1073 this->btr->insert_object(t, u64_varkey(i), rec(i));
1076 //did_v = ctr_rec.v;
1077 ctr_rec.sanity_check();
1078 this->btr->insert_object(t, u64_varkey(ctr_key), ctr_rec);
1080 } catch (transaction_abort_exception &e) {
1087 // TxnType<Traits> t(this->txn_flags, arena);
1090 // ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(ctr_key), v_ctr));
1091 // ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1092 // const bool ret = this->btr->search(t, u64_varkey(i), v);
1094 // if (reinterpret_cast<const rec *>(v_ctr.data())->v != did_v) {
1095 // cerr << "rec.v: " << reinterpret_cast<const rec *>(v_ctr.data())->v << ", did_v: " << did_v << endl;
1096 // ALWAYS_ASSERT(false);
1098 // if (did_remove && ret) {
1099 // cerr << "removed previous, but still found" << endl;
1100 // ALWAYS_ASSERT(false);
1101 // } else if (!did_remove && !ret) {
1102 // cerr << "did not previous, but not found" << endl;
1103 // ALWAYS_ASSERT(false);
1105 // } catch (transaction_abort_exception &e) {
1106 // // possibly aborts due to GC mechanism- if so, just move on
1115 template <template <typename> class TxnType, typename Traits>
1116 class reader_worker : public txn_btree_worker<TxnType> {
1118 reader_worker(txn_btree<TxnType> &btr, uint64_t flags, bool reverse)
1119 : txn_btree_worker<TxnType>(btr, flags),
1120 reverse_(reverse), validations(0), naborts(0) {}
1124 typename Traits::StringAllocator arena;
1125 TxnType<Traits> t(this->txn_flags, arena);
1128 ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(ctr_key), v_ctr));
1129 ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1130 counting_scan_callback<TxnType> c;
1132 const u64_varkey kend(range_end);
1133 this->btr->search_range_call(t, u64_varkey(range_begin), &kend, c);
1135 const u64_varkey mkey(range_begin - 1);
1136 this->btr->rsearch_range_call(t, u64_varkey(range_end), &mkey, c);
1138 c.values_ = vector<uint32_t>(c.values_.rbegin(), c.values_.rend());
1142 const control_rec *crec = (const control_rec *) v_ctr.data();
1143 crec->sanity_check();
1144 auto cvec = crec->vectorize();
1145 if (c.values_ != cvec) {
1146 cerr << "observed (" << c.values_.size() << "): " << c.values_ << endl;
1147 cerr << "db value (" << cvec.size() << "): " << cvec << endl;
1148 ALWAYS_ASSERT_COND_IN_TXN(t, c.values_ == cvec);
1151 VERBOSE(cerr << "successful validation" << endl);
1152 } catch (transaction_abort_exception &e) {
1154 if (this->txn_flags & transaction_base::TXN_FLAG_READ_ONLY)
1155 // RO txns shouldn't abort
1156 ALWAYS_ASSERT_COND_IN_TXN(t, false);
1166 template <template <typename> class TxnType, typename Traits>
1170 using namespace mp_test2_ns;
1171 for (size_t txn_flags_idx = 0;
1172 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1174 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1175 txn_btree<TxnType> btr;
1176 typename Traits::StringAllocator arena;
1178 TxnType<Traits> t(txn_flags, arena);
1180 for (size_t i = range_begin; i < range_end; i++)
1182 btr.insert_object(t, u64_varkey(i), rec(i));
1183 ctrl.values_[ctrl.count_++] = i;
1185 ctrl.sanity_check();
1186 btr.insert_object(t, u64_varkey(ctr_key), ctrl);
1187 AssertSuccessfulCommit(t);
1190 // XXX(stephentu): HACK! we need to wait for the GC to
1191 // compute a new consistent snapshot version that includes this
1193 txn_epoch_sync<TxnType>::sync();
1196 // make sure the first validation passes
1198 txn_flags | transaction_base::TXN_FLAG_READ_ONLY, arena);
1199 typename Traits::StringAllocator arena;
1201 ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(ctr_key), v_ctr));
1202 ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1203 counting_scan_callback<TxnType> c;
1204 const u64_varkey kend(range_end);
1205 btr.search_range_call(t, u64_varkey(range_begin), &kend, c);
1206 AssertSuccessfulCommit(t);
1208 const control_rec *crec = (const control_rec *) v_ctr.data();
1209 crec->sanity_check();
1210 auto cvec = crec->vectorize();
1211 if (c.values_ != cvec) {
1212 cerr << "observed: " << c.values_ << endl;
1213 cerr << "db value: " << cvec << endl;
1214 ALWAYS_ASSERT_COND_IN_TXN(t, c.values_ == cvec);
1216 VERBOSE(cerr << "initial read only scan passed" << endl);
1219 mutate_worker<TxnType, Traits> w0(btr, txn_flags);
1220 //reader_worker<TxnType, Traits> w1(btr, txn_flags);
1221 reader_worker<TxnType, Traits> w2(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, false);
1222 reader_worker<TxnType, Traits> w3(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, false);
1223 #ifdef HAVE_REVERSE_RANGE_SCANS
1224 reader_worker<TxnType, Traits> w4(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, true);
1226 reader_worker<TxnType, Traits> w4(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, false);
1230 __sync_synchronize();
1239 __sync_synchronize();
1246 cerr << "mutate naborts: " << w0.naborts << endl;
1247 //cerr << "reader naborts: " << w1.naborts << endl;
1248 //cerr << "reader validations: " << w1.validations << endl;
1249 cerr << "read-only reader 1 naborts: " << w2.naborts << endl;
1250 cerr << "read-only reader 1 validations: " << w2.validations << endl;
1251 cerr << "read-only reader 2 naborts: " << w3.naborts << endl;
1252 cerr << "read-only reader 2 validations: " << w3.validations << endl;
1253 cerr << "read-only reader 3 naborts: " << w4.naborts << endl;
1254 cerr << "read-only reader 3 validations: " << w4.validations << endl;
1256 txn_epoch_sync<TxnType>::sync();
1257 txn_epoch_sync<TxnType>::finish();
1261 namespace mp_test3_ns {
1263 static const size_t amount_per_person = 100;
1264 static const size_t naccounts = 100;
1265 static const size_t niters = 1000000;
1267 template <template <typename> class TxnType, typename Traits>
1268 class transfer_worker : public txn_btree_worker<TxnType> {
1270 transfer_worker(txn_btree<TxnType> &btr, uint64_t flags, unsigned long seed)
1271 : txn_btree_worker<TxnType>(btr, flags), seed(seed) {}
1274 fast_random r(seed);
1275 for (size_t i = 0; i < niters; i++) {
1278 typename Traits::StringAllocator arena;
1279 TxnType<Traits> t(this->txn_flags, arena);
1280 uint64_t a = r.next() % naccounts;
1281 uint64_t b = r.next() % naccounts;
1282 while (unlikely(a == b))
1283 b = r.next() % naccounts;
1284 string arecv, brecv;
1285 ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(a), arecv));
1286 ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(b), brecv));
1287 const rec *arec = (const rec *) arecv.data();
1288 const rec *brec = (const rec *) brecv.data();
1292 const uint64_t xfer = (arec->v > 1) ? (r.next() % (arec->v - 1) + 1) : 1;
1293 this->btr->insert_object(t, u64_varkey(a), rec(arec->v - xfer));
1294 this->btr->insert_object(t, u64_varkey(b), rec(brec->v + xfer));
1297 } catch (transaction_abort_exception &e) {
1303 const unsigned long seed;
1306 template <template <typename> class TxnType, typename Traits>
1307 class invariant_worker_scan : public txn_btree_worker<TxnType>,
1308 public txn_btree<TxnType>::search_range_callback {
1310 invariant_worker_scan(txn_btree<TxnType> &btr, uint64_t flags)
1311 : txn_btree_worker<TxnType>(btr, flags), running(true),
1312 validations(0), naborts(0), sum(0) {}
1317 typename Traits::StringAllocator arena;
1318 TxnType<Traits> t(this->txn_flags, arena);
1320 this->btr->search_range_call(t, u64_varkey(0), NULL, *this);
1322 ALWAYS_ASSERT_COND_IN_TXN(t, sum == (naccounts * amount_per_person));
1324 } catch (transaction_abort_exception &e) {
1329 virtual bool invoke(const typename txn_btree<TxnType>::keystring_type &k,
1332 sum += ((rec *) v.data())->v;
1335 volatile bool running;
1341 template <template <typename> class TxnType, typename Traits>
1342 class invariant_worker_1by1 : public txn_btree_worker<TxnType> {
1344 invariant_worker_1by1(txn_btree<TxnType> &btr, uint64_t flags)
1345 : txn_btree_worker<TxnType>(btr, flags), running(true),
1346 validations(0), naborts(0) {}
1351 typename Traits::StringAllocator arena;
1352 TxnType<Traits> t(this->txn_flags, arena);
1354 for (uint64_t i = 0; i < naccounts; i++) {
1356 ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(i), v));
1357 sum += ((const rec *) v.data())->v;
1360 if (sum != (naccounts * amount_per_person)) {
1361 cerr << "sum: " << sum << endl;
1362 cerr << "naccounts * amount_per_person: " << (naccounts * amount_per_person) << endl;
1364 ALWAYS_ASSERT_COND_IN_TXN(t, sum == (naccounts * amount_per_person));
1366 } catch (transaction_abort_exception &e) {
1371 volatile bool running;
1378 template <template <typename> class TxnType, typename Traits>
1382 using namespace mp_test3_ns;
1384 for (size_t txn_flags_idx = 0;
1385 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1387 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1389 txn_btree<TxnType> btr;
1390 typename Traits::StringAllocator arena;
1392 TxnType<Traits> t(txn_flags, arena);
1393 for (uint64_t i = 0; i < naccounts; i++)
1394 btr.insert_object(t, u64_varkey(i), rec(amount_per_person));
1395 AssertSuccessfulCommit(t);
1398 txn_epoch_sync<TxnType>::sync();
1400 transfer_worker<TxnType, Traits> w0(btr, txn_flags, 342),
1401 w1(btr, txn_flags, 93852),
1402 w2(btr, txn_flags, 23085),
1403 w3(btr, txn_flags, 859438989);
1404 invariant_worker_scan<TxnType, Traits> w4(btr, txn_flags);
1405 invariant_worker_1by1<TxnType, Traits> w5(btr, txn_flags);
1406 invariant_worker_scan<TxnType, Traits> w6(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY);
1407 invariant_worker_1by1<TxnType, Traits> w7(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY);
1409 w0.start(); w1.start(); w2.start(); w3.start(); w4.start(); w5.start(); w6.start(); w7.start();
1410 w0.join(); w1.join(); w2.join(); w3.join();
1411 w4.running = false; w5.running = false; w6.running = false; w7.running = false;
1412 __sync_synchronize();
1413 w4.join(); w5.join(); w6.join(); w7.join();
1415 cerr << "scan validations: " << w4.validations << ", scan aborts: " << w4.naborts << endl;
1416 cerr << "1by1 validations: " << w5.validations << ", 1by1 aborts: " << w5.naborts << endl;
1417 cerr << "scan-readonly validations: " << w6.validations << ", scan-readonly aborts: " << w6.naborts << endl;
1418 cerr << "1by1-readonly validations: " << w7.validations << ", 1by1-readonly aborts: " << w7.naborts << endl;
1420 txn_epoch_sync<TxnType>::sync();
1421 txn_epoch_sync<TxnType>::finish();
1425 namespace mp_test_simple_write_skew_ns {
1426 static const size_t NDoctors = 16;
1428 volatile bool running = true;
1430 template <template <typename> class TxnType, typename Traits>
1431 class get_worker : public txn_btree_worker<TxnType> {
1433 get_worker(unsigned int d, txn_btree<TxnType> &btr, uint64_t txn_flags)
1434 : txn_btree_worker<TxnType>(btr, txn_flags), n(0), d(d) {}
1439 typename Traits::StringAllocator arena;
1440 TxnType<Traits> t(this->txn_flags, arena);
1442 // try to take this doctor off call
1443 unsigned int ctr = 0;
1444 for (unsigned int i = 0; i < NDoctors && ctr < 2; i++) {
1446 ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(i), v));
1447 INVARIANT(v.size() == sizeof(rec));
1448 const rec *r = (const rec *) v.data();
1453 this->btr->insert_object(t, u64_varkey(d), rec(0));
1455 ALWAYS_ASSERT_COND_IN_TXN(t, ctr >= 1);
1457 // place this doctor on call
1458 this->btr->insert_object(t, u64_varkey(d), rec(1));
1462 } catch (transaction_abort_exception &e) {
1472 template <template <typename> class TxnType, typename Traits>
1473 class scan_worker : public txn_btree_worker<TxnType>,
1474 public txn_btree<TxnType>::search_range_callback {
1476 scan_worker(unsigned int d, txn_btree<TxnType> &btr, uint64_t txn_flags)
1477 : txn_btree_worker<TxnType>(btr, txn_flags), n(0), d(d), ctr(0) {}
1482 typename Traits::StringAllocator arena;
1483 TxnType<Traits> t(this->txn_flags, arena);
1486 this->btr->search_range_call(t, u64_varkey(0), NULL, *this);
1488 this->btr->insert_object(t, u64_varkey(d), rec(0));
1490 ALWAYS_ASSERT_COND_IN_TXN(t, ctr >= 1);
1492 this->btr->insert_object(t, u64_varkey(d), rec(1));
1496 } catch (transaction_abort_exception &e) {
1501 virtual bool invoke(const typename txn_btree<TxnType>::keystring_type &k,
1504 INVARIANT(v.size() == sizeof(rec));
1505 const rec *r = (const rec *) v.data();
1517 template <template <typename> class TxnType, typename Traits>
1519 mp_test_simple_write_skew()
1521 using namespace mp_test_simple_write_skew_ns;
1523 for (size_t txn_flags_idx = 0;
1524 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1526 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1528 txn_btree<TxnType> btr;
1529 typename Traits::StringAllocator arena;
1531 TxnType<Traits> t(txn_flags, arena);
1532 static_assert(NDoctors >= 2, "XX");
1533 for (uint64_t i = 0; i < NDoctors; i++)
1534 btr.insert_object(t, u64_varkey(i), rec(i < 2 ? 1 : 0));
1535 AssertSuccessfulCommit(t);
1538 txn_epoch_sync<TxnType>::sync();
1541 __sync_synchronize();
1542 vector<txn_btree_worker<TxnType> *> workers;
1543 for (size_t i = 0; i < NDoctors / 2; i++)
1544 workers.push_back(new get_worker<TxnType, Traits>(i, btr, txn_flags));
1545 for (size_t i = NDoctors / 2; i < NDoctors; i++)
1546 workers.push_back(new scan_worker<TxnType, Traits>(i, btr, txn_flags));
1547 for (size_t i = 0; i < NDoctors; i++)
1548 workers[i]->start();
1551 __sync_synchronize();
1554 size_t n_get_succ = 0, n_scan_succ = 0;
1555 for (size_t i = 0; i < NDoctors; i++) {
1557 if (get_worker<TxnType, Traits> *w = dynamic_cast<get_worker<TxnType, Traits> *>(workers[i]))
1559 else if (scan_worker<TxnType, Traits> *w = dynamic_cast<scan_worker<TxnType, Traits> *>(workers[i]))
1560 n_scan_succ += w->n;
1562 ALWAYS_ASSERT(false);
1567 cerr << "get_worker txns: " << n_get_succ << endl;
1568 cerr << "scan_worker txns: " << n_scan_succ << endl;
1570 txn_epoch_sync<TxnType>::sync();
1571 txn_epoch_sync<TxnType>::finish();
1575 namespace mp_test_batch_processing_ns {
1577 volatile bool running = true;
1579 static inline string
1580 MakeKey(uint32_t batch_id, uint32_t receipt_id)
1582 const big_endian_trfm<int32_t> t;
1584 uint32_t *p = (uint32_t *) &buf[0];
1586 *p++ = t(receipt_id);
1590 template <template <typename> class TxnType, typename Traits>
1591 class report_worker : public ndb_thread,
1592 public txn_btree<TxnType>::search_range_callback {
1594 report_worker(txn_btree<TxnType> &ctrl, txn_btree<TxnType> &receipts, uint64_t txn_flags)
1595 : ctrl(&ctrl), receipts(&receipts), txn_flags(txn_flags), n(0), m(0), sum(0) {}
1600 typename Traits::StringAllocator arena;
1601 TxnType<Traits> t(this->txn_flags, arena);
1603 ALWAYS_ASSERT_COND_IN_TXN(t, ctrl->search(t, u64_varkey(0), v));
1604 ALWAYS_ASSERT_COND_IN_TXN(t, v.size() == sizeof(rec));
1605 const rec * const r = (const rec *) v.data();
1606 const uint32_t prev_bid = r->v - 1; // prev batch
1608 const string endkey = MakeKey(prev_bid, numeric_limits<uint32_t>::max());
1609 receipts->search_range_call(t, MakeKey(prev_bid, 0), &endkey, *this);
1611 map<uint32_t, uint32_t>::iterator it = reports.find(prev_bid);
1612 if (it != reports.end()) {
1613 ALWAYS_ASSERT_COND_IN_TXN(t, sum == it->second);
1616 reports[prev_bid] = sum;
1619 } catch (transaction_abort_exception &e) {
1624 virtual bool invoke(const typename txn_btree<TxnType>::keystring_type &k,
1627 INVARIANT(v.size() == sizeof(rec));
1628 const rec * const r = (const rec *) v.data();
1633 txn_btree<TxnType> *ctrl;
1634 txn_btree<TxnType> *receipts;
1642 map<uint32_t, uint32_t> reports;
1646 template <template <typename> class TxnType, typename Traits>
1647 class new_receipt_worker : public ndb_thread {
1649 new_receipt_worker(txn_btree<TxnType> &ctrl, txn_btree<TxnType> &receipts, uint64_t txn_flags)
1650 : ctrl(&ctrl), receipts(&receipts), txn_flags(txn_flags),
1651 n(0), last_bid(0), last_rid(0) {}
1656 typename Traits::StringAllocator arena;
1657 TxnType<Traits> t(this->txn_flags, arena);
1659 ALWAYS_ASSERT_COND_IN_TXN(t, ctrl->search(t, u64_varkey(0), v));
1660 ALWAYS_ASSERT_COND_IN_TXN(t, v.size() == sizeof(rec));
1661 const rec * const r = (const rec *) v.data();
1662 const uint32_t cur_bid = r->v;
1663 const uint32_t cur_rid = (cur_bid != last_bid) ? 0 : last_rid + 1;
1664 const string rkey = MakeKey(cur_bid, cur_rid);
1665 receipts->insert_object(t, rkey, rec(1));
1670 } catch (transaction_abort_exception &e) {
1677 txn_btree<TxnType> *ctrl;
1678 txn_btree<TxnType> *receipts;
1689 template <template <typename> class TxnType, typename Traits>
1690 class incr_worker : public ndb_thread {
1692 incr_worker(txn_btree<TxnType> &ctrl, txn_btree<TxnType> &receipts, uint64_t txn_flags)
1693 : ctrl(&ctrl), receipts(&receipts), txn_flags(txn_flags), n(0) {}
1697 NDB_MEMSET(&t, 0, sizeof(t));
1698 t.tv_nsec = 1000; // 1 us
1701 typename Traits::StringAllocator arena;
1702 TxnType<Traits> t(this->txn_flags, arena);
1704 ALWAYS_ASSERT_COND_IN_TXN(t, ctrl->search(t, u64_varkey(0), v));
1705 ALWAYS_ASSERT_COND_IN_TXN(t, v.size() == sizeof(rec));
1706 const rec * const r = (const rec *) v.data();
1707 ctrl->insert_object(t, u64_varkey(0), rec(r->v + 1));
1710 } catch (transaction_abort_exception &e) {
1713 nanosleep(&t, NULL);
1717 txn_btree<TxnType> *ctrl;
1718 txn_btree<TxnType> *receipts;
1726 template <template <typename> class TxnType, typename Traits>
1728 mp_test_batch_processing()
1730 using namespace mp_test_batch_processing_ns;
1732 for (size_t txn_flags_idx = 0;
1733 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1735 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1737 txn_btree<TxnType> ctrl;
1738 txn_btree<TxnType> receipts;
1739 typename Traits::StringAllocator arena;
1741 TxnType<Traits> t(txn_flags, arena);
1742 ctrl.insert_object(t, u64_varkey(0), rec(1));
1743 AssertSuccessfulCommit(t);
1746 txn_epoch_sync<TxnType>::sync();
1748 report_worker<TxnType, Traits> w0(ctrl, receipts, txn_flags);
1749 new_receipt_worker<TxnType, Traits> w1(ctrl, receipts, txn_flags);
1750 incr_worker<TxnType, Traits> w2(ctrl, receipts, txn_flags);
1752 __sync_synchronize();
1753 w0.start(); w1.start(); w2.start();
1756 __sync_synchronize();
1757 w0.join(); w1.join(); w2.join();
1760 cerr << "report_worker txns : " << w0.n << endl;
1761 cerr << "report_worker validations: " << w0.m << endl;
1762 cerr << "new_receipt_worker txns : " << w1.n << endl;
1763 cerr << "incr_worker txns : " << w2.n << endl;
1765 txn_epoch_sync<TxnType>::sync();
1766 txn_epoch_sync<TxnType>::finish();
1770 namespace read_only_perf_ns {
1771 const size_t nkeys = 140000000; // 140M
1772 //const size_t nkeys = 100000; // 100K
1774 unsigned long seeds[] = {
1775 9576455804445224191ULL,
1776 3303315688255411629ULL,
1777 3116364238170296072ULL,
1778 641702699332002535ULL,
1779 17755947590284612420ULL,
1780 13349066465957081273ULL,
1781 16389054441777092823ULL,
1782 2687412585397891607ULL,
1783 16665670053534306255ULL,
1784 5166823197462453937ULL,
1785 1252059952779729626ULL,
1786 17962022827457676982ULL,
1787 940911318964853784ULL,
1788 479878990529143738ULL,
1789 250864516707124695ULL,
1790 8507722621803716653ULL,
1793 volatile bool running = false;
1795 template <template <typename> class TxnType, typename Traits>
1796 class worker : public txn_btree_worker<TxnType> {
1798 worker(unsigned int seed, txn_btree<TxnType> &btr, uint64_t txn_flags)
1799 : txn_btree_worker<TxnType>(btr, txn_flags), n(0), seed(seed) {}
1802 fast_random r(seed);
1804 const uint64_t k = r.next() % nkeys;
1807 typename Traits::StringAllocator arena;
1808 TxnType<Traits> t(this->txn_flags, arena);
1810 bool found = this->btr->search(t, u64_varkey(k), v);
1812 ALWAYS_ASSERT_COND_IN_TXN(t, found);
1813 AssertByteEquality(rec(k + 1), v);
1814 } catch (transaction_abort_exception &e) {
1826 template <template <typename> class TxnType, typename Traits>
1830 using namespace read_only_perf_ns;
1831 for (size_t txn_flags_idx = 0;
1832 txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1834 const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1836 txn_btree<TxnType> btr;
1839 const size_t nkeyspertxn = 100000;
1840 for (size_t i = 0; i < nkeys / nkeyspertxn; i++) {
1842 const size_t end = (i == (nkeys / nkeyspertxn - 1)) ? nkeys : ((i + 1) * nkeyspertxn);
1843 for (size_t j = i * nkeyspertxn; j < end; j++)
1844 btr.insert_object(t, u64_varkey(j), rec(j + 1));
1845 AssertSuccessfulCommit(t);
1846 cerr << "batch " << i << " completed" << endl;
1848 cerr << "btree loaded, test starting" << endl;
1851 vector<worker<TxnType, Traits> *> workers;
1852 for (size_t i = 0; i < ARRAY_NELEMS(seeds); i++)
1853 workers.push_back(new worker<TxnType, Traits>(seeds[i], btr, txn_flags));
1857 COMPILER_MEMORY_FENCE;
1858 for (size_t i = 0; i < ARRAY_NELEMS(seeds); i++)
1859 workers[i]->start();
1861 COMPILER_MEMORY_FENCE;
1863 COMPILER_MEMORY_FENCE;
1864 uint64_t total_n = 0;
1865 for (size_t i = 0; i < ARRAY_NELEMS(seeds); i++) {
1867 total_n += workers[i]->n;
1871 double agg_throughput = double(total_n) / (double(t.lap()) / 1000000.0);
1872 double avg_per_core_throughput = agg_throughput / double(ARRAY_NELEMS(seeds));
1874 cerr << "agg_read_throughput: " << agg_throughput << " gets/sec" << endl;
1875 cerr << "avg_per_core_read_throughput: " << avg_per_core_throughput << " gets/sec/core" << endl;
1877 txn_epoch_sync<TxnType>::sync();
1878 txn_epoch_sync<TxnType>::finish();
1882 void txn_btree_test()
1884 cerr << "Test proto2" << endl;
1885 test_typed_btree<transaction_proto2, default_stable_transaction_traits>();
1886 test1<transaction_proto2, default_transaction_traits>();
1887 test2<transaction_proto2, default_transaction_traits>();
1888 test_absent_key_race<transaction_proto2, default_transaction_traits>();
1889 test_inc_value_size<transaction_proto2, default_transaction_traits>();
1890 test_multi_btree<transaction_proto2, default_transaction_traits>();
1891 test_read_only_snapshot<transaction_proto2, default_transaction_traits>();
1892 test_long_keys<transaction_proto2, default_transaction_traits>();
1893 test_long_keys2<transaction_proto2, default_transaction_traits>();
1894 test_insert_same_key<transaction_proto2, default_transaction_traits>();
1896 //mp_stress_test_allocator<transaction_proto2, default_transaction_traits>();
1897 mp_stress_test_insert_removes<transaction_proto2, default_transaction_traits>();
1898 mp_test1<transaction_proto2, default_transaction_traits>();
1899 mp_test2<transaction_proto2, default_transaction_traits>();
1900 mp_test3<transaction_proto2, default_transaction_traits>();
1901 mp_test_simple_write_skew<transaction_proto2, default_transaction_traits>();
1902 mp_test_batch_processing<transaction_proto2, default_transaction_traits>();
1904 //read_only_perf<transaction_proto1>();
1905 //read_only_perf<transaction_proto2>();