1 #ifndef _NDB_WRAPPER_IMPL_H_
2 #define _NDB_WRAPPER_IMPL_H_
5 #include "ndb_wrapper.h"
6 #include "../counter.h"
11 #include "../scopedperf.hh"
13 //#include "../txn_proto1_impl.h"
14 #include "../txn_proto2_impl.h"
// Traits type paired with abstract_db::HINT_DEFAULT in TXN_PROFILE_HINT_OP.
// Inherits publicly from default_transaction_traits and declares str_arena
// as its StringAllocator type.
17 struct hint_default_traits : public default_transaction_traits {
18 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_KV_GET_PUT in
// TXN_PROFILE_HINT_OP; also the base of hint_kv_rmw_traits.
// All three set-size constants are 1 and hard_expected_sizes is true.
// NOTE(review): how the transaction implementation interprets these
// constants is defined outside this file -- confirm there before changing
// any value.
23 struct hint_kv_get_put_traits {
24 static const size_t read_set_expected_size = 1;
25 static const size_t write_set_expected_size = 1;
26 static const size_t absent_set_expected_size = 1;
27 static const bool stable_input_memory = true;
28 static const bool hard_expected_sizes = true;
29 static const bool read_own_writes = false;
30 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_KV_RMW in TXN_PROFILE_HINT_OP.
// Adds no members of its own: every constant comes from
// hint_kv_get_put_traits.
33 struct hint_kv_rmw_traits : public hint_kv_get_put_traits {};
// Traits type paired with abstract_db::HINT_KV_SCAN in TXN_PROFILE_HINT_OP.
// Compared with hint_kv_get_put_traits it uses a read-set size of 100,
// derives the absent-set size from it, and sets hard_expected_sizes to
// false.
35 struct hint_kv_scan_traits {
36 static const size_t read_set_expected_size = 100;
37 static const size_t write_set_expected_size = 1;
// size_t integer division: 100 / 7 + 1 == 15.
38 static const size_t absent_set_expected_size = read_set_expected_size / 7 + 1;
39 static const bool stable_input_memory = true;
40 static const bool hard_expected_sizes = false;
41 static const bool read_own_writes = false;
42 typedef str_arena StringAllocator;
// Shared base for hint_tpcc_order_status_read_only_traits and
// hint_tpcc_stock_level_read_only_traits; it is not listed directly in
// TXN_PROFILE_HINT_OP.
// The constants have the same values as in hint_kv_get_put_traits; note
// that write_set_expected_size is 1 here, not 0.
47 struct hint_read_only_traits {
48 static const size_t read_set_expected_size = 1;
49 static const size_t write_set_expected_size = 1;
50 static const size_t absent_set_expected_size = 1;
51 static const bool stable_input_memory = true;
52 static const bool hard_expected_sizes = true;
53 static const bool read_own_writes = false;
54 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_TPCC_NEW_ORDER in
// TXN_PROFILE_HINT_OP: read and write set sizes of 35, absent set size of 1,
// with hard_expected_sizes set to true.
57 struct hint_tpcc_new_order_traits {
58 static const size_t read_set_expected_size = 35;
59 static const size_t write_set_expected_size = 35;
60 static const size_t absent_set_expected_size = 1;
61 static const bool stable_input_memory = true;
62 static const bool hard_expected_sizes = true;
63 static const bool read_own_writes = false;
64 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_TPCC_PAYMENT in
// TXN_PROFILE_HINT_OP: set sizes 85 (read) / 10 (write) / 15 (absent), with
// hard_expected_sizes set to false.
67 struct hint_tpcc_payment_traits {
68 static const size_t read_set_expected_size = 85;
69 static const size_t write_set_expected_size = 10;
70 static const size_t absent_set_expected_size = 15;
71 static const bool stable_input_memory = true;
72 static const bool hard_expected_sizes = false;
73 static const bool read_own_writes = false;
74 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_TPCC_DELIVERY in
// TXN_PROFILE_HINT_OP: set sizes 175 (read) / 175 (write) / 35 (absent),
// with hard_expected_sizes set to false.
77 struct hint_tpcc_delivery_traits {
78 static const size_t read_set_expected_size = 175;
79 static const size_t write_set_expected_size = 175;
80 static const size_t absent_set_expected_size = 35;
81 static const bool stable_input_memory = true;
82 static const bool hard_expected_sizes = false;
83 static const bool read_own_writes = false;
84 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_TPCC_ORDER_STATUS in
// TXN_PROFILE_HINT_OP: set sizes 95 (read) / 1 (write) / 25 (absent), with
// hard_expected_sizes set to false.
87 struct hint_tpcc_order_status_traits {
88 static const size_t read_set_expected_size = 95;
89 static const size_t write_set_expected_size = 1;
90 static const size_t absent_set_expected_size = 25;
91 static const bool stable_input_memory = true;
92 static const bool hard_expected_sizes = false;
93 static const bool read_own_writes = false;
94 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_TPCC_ORDER_STATUS_READ_ONLY in
// TXN_PROFILE_HINT_OP. Adds no members: all constants come from
// hint_read_only_traits.
97 struct hint_tpcc_order_status_read_only_traits : public hint_read_only_traits {};
// Traits type paired with abstract_db::HINT_TPCC_STOCK_LEVEL in
// TXN_PROFILE_HINT_OP: set sizes 500 (read) / 1 (write) / 25 (absent), with
// hard_expected_sizes set to false.
99 struct hint_tpcc_stock_level_traits {
100 static const size_t read_set_expected_size = 500;
101 static const size_t write_set_expected_size = 1;
102 static const size_t absent_set_expected_size = 25;
103 static const bool stable_input_memory = true;
104 static const bool hard_expected_sizes = false;
105 static const bool read_own_writes = false;
106 typedef str_arena StringAllocator;
// Traits type paired with abstract_db::HINT_TPCC_STOCK_LEVEL_READ_ONLY in
// TXN_PROFILE_HINT_OP. Adds no members: all constants come from
// hint_read_only_traits.
109 struct hint_tpcc_stock_level_read_only_traits : public hint_read_only_traits {};
// X-macro listing every (abstract_db hint value, traits type) pair.
// TXN_PROFILE_HINT_OP(x) expands to x(hint, traits) once per pair, in the
// order written below. The invocations are emitted back to back with no
// separator, so the macro passed as x has to supply any punctuation it needs
// (for example a trailing comma or a complete statement).
// Do not place comments between the continuation lines: each line must end
// with its backslash for the definition to stay in one piece.
111 #define TXN_PROFILE_HINT_OP(x) \
112 x(abstract_db::HINT_DEFAULT, hint_default_traits) \
113 x(abstract_db::HINT_KV_GET_PUT, hint_kv_get_put_traits) \
114 x(abstract_db::HINT_KV_RMW, hint_kv_rmw_traits) \
115 x(abstract_db::HINT_KV_SCAN, hint_kv_scan_traits) \
116 x(abstract_db::HINT_TPCC_NEW_ORDER, hint_tpcc_new_order_traits) \
117 x(abstract_db::HINT_TPCC_PAYMENT, hint_tpcc_payment_traits) \
118 x(abstract_db::HINT_TPCC_DELIVERY, hint_tpcc_delivery_traits) \
119 x(abstract_db::HINT_TPCC_ORDER_STATUS, hint_tpcc_order_status_traits) \
120 x(abstract_db::HINT_TPCC_ORDER_STATUS_READ_ONLY, hint_tpcc_order_status_read_only_traits) \
121 x(abstract_db::HINT_TPCC_STOCK_LEVEL, hint_tpcc_stock_level_traits) \
122 x(abstract_db::HINT_TPCC_STOCK_LEVEL_READ_ONLY, hint_tpcc_stock_level_read_only_traits)
// Constructor of ndb_wrapper<Transaction>.
// It tests logfiles.empty(), forwards nthreads, logfiles and
// assignments_given together with a pointer to the local assignments_used
// vector to a logging setup call, and then reports the resulting
// configuration on std::cerr.
// NOTE(review): assignments_used is presumably filled in by that call
// through the pointer argument, since it is printed afterwards -- confirm
// against the callee.
124 template <template <typename> class Transaction>
125 ndb_wrapper<Transaction>::ndb_wrapper(
126 const std::vector<std::string> &logfiles,
127 const std::vector<std::vector<unsigned>> &assignments_given,
129 bool use_compression,
132 if (logfiles.empty())
134 std::vector<std::vector<unsigned>> assignments_used;
136 nthreads, logfiles, assignments_given, &assignments_used,
// Diagnostic summary of the logging setup; std::endl flushes std::cerr after
// each line.
141 std::cerr << "[logging subsystem]" << std::endl;
142 std::cerr << " assignments: " << assignments_used << std::endl;
143 std::cerr << " call fsync : " << call_fsync << std::endl;
144 std::cerr << " compression: " << use_compression << std::endl;
145 std::cerr << " fake_writes: " << fake_writes << std::endl;
// Determines the largest sizeof() among the concrete transaction types of
// all hint profiles: MY_OP_X maps each TXN_PROFILE_HINT_OP entry to
// sizeof(typename cast<b>::type), the values are collected in xs[], and the
// loop keeps the maximum in xmax.
149 template <template <typename> class Transaction>
151 ndb_wrapper<Transaction>::sizeof_txn_object(uint64_t txn_flags) const
// The hint value (a) is ignored here; the trailing comma makes the
// expansions usable as array initializer elements.
153 #define MY_OP_X(a, b) sizeof(typename cast< b >::type),
154 const size_t xs[] = {
155 TXN_PROFILE_HINT_OP(MY_OP_X)
// Linear scan for the maximum element of xs.
159 for (size_t i = 0; i < ARRAY_NELEMS(xs); i++)
160 xmax = std::max(xmax, xs[i]);
// Creates a transaction object in place: buf is reinterpreted as an ndbtxn,
// and for a hint profile the MY_OP_X expansion placement-news the matching
// concrete transaction type at &p->buf[0], passing txn_flags and arena to
// its constructor.
// ALWAYS_ASSERT(false) follows the per-profile expansion.
// NOTE(review): presumably it is reached only for a hint value that is not
// listed in TXN_PROFILE_HINT_OP -- confirm.
164 template <template <typename> class Transaction>
166 ndb_wrapper<Transaction>::new_txn(
172 ndbtxn * const p = reinterpret_cast<ndbtxn *>(buf);
174 #define MY_OP_X(a, b) \
176 new (&p->buf[0]) typename cast< b >::type(txn_flags, arena); \
179 TXN_PROFILE_HINT_OP(MY_OP_X)
181 ALWAYS_ASSERT(false);
// File-local, always-inline helper template returning void.
// It is instrumented with a perf region tied to private_::ndb_dtor_probe0_cg;
// the probe name is __PRETTY_FUNCTION__ followed by ":total:" and is held in
// a function-local static std::string declared through PERF_DECL.
187 template <typename T>
188 static inline ALWAYS_INLINE void
191 PERF_DECL(static std::string probe1_name(std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
192 ANON_REGION(probe1_name.c_str(), &private_::ndb_dtor_probe0_cg);
// Commits the transaction behind the opaque txn handle.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and stores
// the result of t->commit() in ret.
// ALWAYS_ASSERT(false) follows the per-profile expansion.
// NOTE(review): presumably reached only for an unlisted hint value --
// confirm.
196 template <template <typename> class Transaction>
198 ndb_wrapper<Transaction>::commit_txn(void *txn)
200 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
201 #define MY_OP_X(a, b) \
204 auto t = cast< b >()(p); \
205 const bool ret = t->commit(); \
210 TXN_PROFILE_HINT_OP(MY_OP_X)
212 ALWAYS_ASSERT(false);
// Abort entry point for the opaque txn handle.
// txn is reinterpreted as an ndbtxn and, for a hint profile, the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p).
// ALWAYS_ASSERT(false) follows the per-profile expansion.
// NOTE(review): presumably reached only for an unlisted hint value --
// confirm.
218 template <template <typename> class Transaction>
220 ndb_wrapper<Transaction>::abort_txn(void *txn)
222 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
223 #define MY_OP_X(a, b) \
226 auto t = cast< b >()(p); \
232 TXN_PROFILE_HINT_OP(MY_OP_X)
234 ALWAYS_ASSERT(false);
// Debug aid: reinterprets txn as an ndbtxn, obtains the typed transaction
// for a hint profile through cast<b>()(p) and calls its dump_debug_info().
// ALWAYS_ASSERT(false) follows the per-profile expansion.
// NOTE(review): presumably reached only for an unlisted hint value --
// confirm.
239 template <template <typename> class Transaction>
241 ndb_wrapper<Transaction>::print_txn_debug(void *txn) const
243 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
244 #define MY_OP_X(a, b) \
247 auto t = cast< b >()(p); \
248 t->dump_debug_info(); \
252 TXN_PROFILE_HINT_OP(MY_OP_X)
254 ALWAYS_ASSERT(false);
// Returns the per-transaction counters as a name -> value map.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion returns t->get_txn_counters() of the typed transaction obtained
// through cast<b>()(p).
259 template <template <typename> class Transaction>
260 std::map<std::string, uint64_t>
261 ndb_wrapper<Transaction>::get_txn_counters(void *txn) const
263 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
264 #define MY_OP_X(a, b) \
267 auto t = cast< b >()(p); \
268 return t->get_txn_counters(); \
271 TXN_PROFILE_HINT_OP(MY_OP_X)
273 ALWAYS_ASSERT(false);
// Empty-map return placed after the assertion.
// NOTE(review): presumably present only so that every path returns a value
// -- confirm.
276 return std::map<std::string, uint64_t>();
// Creates an ndb_ordered_index<Transaction> with plain `new`, forwarding
// name, value_size_hint and mostly_append to its constructor, and returns it
// as an abstract_ordered_index pointer. The result is a raw pointer, so this
// function itself does not manage the object's lifetime.
// NOTE(review): the matching release is presumably done by close_index --
// confirm.
279 template <template <typename> class Transaction>
280 abstract_ordered_index *
281 ndb_wrapper<Transaction>::open_index(const std::string &name, size_t value_size_hint, bool mostly_append)
283 return new ndb_ordered_index<Transaction>(name, value_size_hint, mostly_append);
// Takes an abstract_ordered_index pointer.
// NOTE(review): presumably releases an index obtained from open_index --
// the body must be checked before relying on that.
286 template <template <typename> class Transaction>
288 ndb_wrapper<Transaction>::close_index(abstract_ordered_index *idx)
// Constructor of ndb_ordered_index<Transaction>: copies name into the name
// member and constructs the btr member from
// (value_size_hint, mostly_append, name).
// The commented-out lines are a disabled debug print of the index name and
// its underlying btree.
293 template <template <typename> class Transaction>
294 ndb_ordered_index<Transaction>::ndb_ordered_index(
295 const std::string &name, size_t value_size_hint, bool mostly_append)
296 : name(name), btr(value_size_hint, mostly_append, name)
299 //std::cerr << name << " : btree= "
300 // << btr.get_underlying_btree()
// Point lookup of key within the transaction behind the opaque txn handle.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and calls
// btr.search(*t, key, value, max_bytes_read), with a dedicated branch for a
// false result. On the path that reaches INVARIANT, value must be non-empty.
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
// The call is wrapped in a perf region tied to private_::ndb_get_probe0_cg.
304 template <template <typename> class Transaction>
306 ndb_ordered_index<Transaction>::get(
308 const std::string &key,
309 std::string &value, size_t max_bytes_read)
311 PERF_DECL(static std::string probe1_name(std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
312 ANON_REGION(probe1_name.c_str(), &private_::ndb_get_probe0_cg);
313 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
315 #define MY_OP_X(a, b) \
318 auto t = cast< b >()(p); \
319 if (!btr.search(*t, key, value, max_bytes_read)) \
324 TXN_PROFILE_HINT_OP(MY_OP_X)
326 ALWAYS_ASSERT(false);
329 INVARIANT(!value.empty());
331 } catch (transaction_abort_exception &ex) {
332 throw abstract_db::abstract_abort_exception();
336 // XXX: find a way to remove the code duplication below using C++ templates!
// put() overload taking key and value by const reference.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and calls
// btr.put(*t, key, value).
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
// The call is wrapped in a perf region tied to private_::ndb_put_probe0_cg.
338 template <template <typename> class Transaction>
340 ndb_ordered_index<Transaction>::put(
342 const std::string &key,
343 const std::string &value)
345 PERF_DECL(static std::string probe1_name(std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
346 ANON_REGION(probe1_name.c_str(), &private_::ndb_put_probe0_cg);
347 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
349 #define MY_OP_X(a, b) \
352 auto t = cast< b >()(p); \
353 btr.put(*t, key, value); \
357 TXN_PROFILE_HINT_OP(MY_OP_X)
359 ALWAYS_ASSERT(false);
362 } catch (transaction_abort_exception &ex) {
363 throw abstract_db::abstract_abort_exception();
// put() overload that hands key and value on with std::move.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and calls
// btr.put(*t, std::move(key), std::move(value)). After the call the caller's
// strings must be treated as moved-from.
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
368 template <template <typename> class Transaction>
370 ndb_ordered_index<Transaction>::put(
375 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
377 #define MY_OP_X(a, b) \
380 auto t = cast< b >()(p); \
381 btr.put(*t, std::move(key), std::move(value)); \
385 TXN_PROFILE_HINT_OP(MY_OP_X)
387 ALWAYS_ASSERT(false);
390 } catch (transaction_abort_exception &ex) {
391 throw abstract_db::abstract_abort_exception();
// insert() overload taking key and value by const reference.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and calls
// btr.insert(*t, key, value).
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
// The call is wrapped in a perf region tied to
// private_::ndb_insert_probe0_cg.
// NOTE(review): how btr.insert differs from btr.put is defined by txn_btree
// -- check there.
396 template <template <typename> class Transaction>
398 ndb_ordered_index<Transaction>::insert(
400 const std::string &key,
401 const std::string &value)
403 PERF_DECL(static std::string probe1_name(std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
404 ANON_REGION(probe1_name.c_str(), &private_::ndb_insert_probe0_cg);
405 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
407 #define MY_OP_X(a, b) \
410 auto t = cast< b >()(p); \
411 btr.insert(*t, key, value); \
415 TXN_PROFILE_HINT_OP(MY_OP_X)
417 ALWAYS_ASSERT(false);
420 } catch (transaction_abort_exception &ex) {
421 throw abstract_db::abstract_abort_exception();
// insert() overload that hands key and value on with std::move.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and calls
// btr.insert(*t, std::move(key), std::move(value)). After the call the
// caller's strings must be treated as moved-from.
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
426 template <template <typename> class Transaction>
428 ndb_ordered_index<Transaction>::insert(
433 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
435 #define MY_OP_X(a, b) \
438 auto t = cast< b >()(p); \
439 btr.insert(*t, std::move(key), std::move(value)); \
443 TXN_PROFILE_HINT_OP(MY_OP_X)
445 ALWAYS_ASSERT(false);
448 } catch (transaction_abort_exception &ex) {
449 throw abstract_db::abstract_abort_exception();
// Adapter between the two callback interfaces used for range scans: it
// derives from txn_btree<Transaction>::search_range_callback and forwards
// each (key, value) pair to an abstract_ordered_index::scan_callback.
// The constructor receives the target callback by reference, while the
// upcall member is a raw, non-owning pointer, so the callback object must
// outlive this adapter.
454 template <template <typename> class Transaction>
455 class ndb_wrapper_search_range_callback : public txn_btree<Transaction>::search_range_callback {
457 ndb_wrapper_search_range_callback(abstract_ordered_index::scan_callback &upcall)
// Passes the key as (pointer, length) and the value unchanged, returning
// whatever the wrapped callback returns.
461 invoke(const typename txn_btree<Transaction>::keystring_type &k,
462 const typename txn_btree<Transaction>::string_type &v)
464 return upcall->invoke(k.data(), k.length(), v);
468 abstract_ordered_index::scan_callback *upcall;
// Range scan starting at start_key within the transaction behind txn.
// callback is wrapped in an ndb_wrapper_search_range_callback (c); for a
// hint profile the MY_OP_X expansion obtains the typed transaction through
// cast<b>()(p) and calls btr.search_range_call(*t, start_key, end_key, c).
// end_key is passed through as a pointer.
// NOTE(review): a null end_key presumably means "no upper bound" -- confirm
// against txn_btree.
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
// The call is wrapped in a perf region tied to private_::ndb_scan_probe0_cg.
471 template <template <typename> class Transaction>
473 ndb_ordered_index<Transaction>::scan(
475 const std::string &start_key,
476 const std::string *end_key,
477 scan_callback &callback,
480 PERF_DECL(static std::string probe1_name(std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
481 ANON_REGION(probe1_name.c_str(), &private_::ndb_scan_probe0_cg);
482 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
483 ndb_wrapper_search_range_callback<Transaction> c(callback);
485 #define MY_OP_X(a, b) \
488 auto t = cast< b >()(p); \
489 btr.search_range_call(*t, start_key, end_key, c); \
493 TXN_PROFILE_HINT_OP(MY_OP_X)
495 ALWAYS_ASSERT(false);
498 } catch (transaction_abort_exception &ex) {
499 throw abstract_db::abstract_abort_exception();
// Same structure as scan(), but dispatches to
// btr.rsearch_range_call(*t, start_key, end_key, c).
// NOTE(review): presumably the reverse-order variant of the range scan --
// confirm against txn_btree, including how start_key and end_key are
// interpreted.
// callback is wrapped in an ndb_wrapper_search_range_callback (c), and a
// transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
503 template <template <typename> class Transaction>
505 ndb_ordered_index<Transaction>::rscan(
507 const std::string &start_key,
508 const std::string *end_key,
509 scan_callback &callback,
512 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
513 ndb_wrapper_search_range_callback<Transaction> c(callback);
515 #define MY_OP_X(a, b) \
518 auto t = cast< b >()(p); \
519 btr.rsearch_range_call(*t, start_key, end_key, c); \
523 TXN_PROFILE_HINT_OP(MY_OP_X)
525 ALWAYS_ASSERT(false);
528 } catch (transaction_abort_exception &ex) {
529 throw abstract_db::abstract_abort_exception();
// remove() overload taking the key by const reference.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and calls
// btr.remove(*t, key).
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
// The call is wrapped in a perf region tied to
// private_::ndb_remove_probe0_cg.
533 template <template <typename> class Transaction>
535 ndb_ordered_index<Transaction>::remove(void *txn, const std::string &key)
537 PERF_DECL(static std::string probe1_name(std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
538 ANON_REGION(probe1_name.c_str(), &private_::ndb_remove_probe0_cg);
539 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
541 #define MY_OP_X(a, b) \
544 auto t = cast< b >()(p); \
545 btr.remove(*t, key); \
549 TXN_PROFILE_HINT_OP(MY_OP_X)
551 ALWAYS_ASSERT(false);
554 } catch (transaction_abort_exception &ex) {
555 throw abstract_db::abstract_abort_exception();
// remove() overload taking the key as an rvalue reference.
// txn is reinterpreted as an ndbtxn; for a hint profile the MY_OP_X
// expansion obtains the typed transaction through cast<b>()(p) and calls
// btr.remove(*t, std::move(key)). After the call the caller's string must be
// treated as moved-from.
// A transaction_abort_exception raised underneath is translated into
// abstract_db::abstract_abort_exception.
559 template <template <typename> class Transaction>
561 ndb_ordered_index<Transaction>::remove(void *txn, std::string &&key)
563 ndbtxn * const p = reinterpret_cast<ndbtxn *>(txn);
565 #define MY_OP_X(a, b) \
568 auto t = cast< b >()(p); \
569 btr.remove(*t, std::move(key)); \
573 TXN_PROFILE_HINT_OP(MY_OP_X)
575 ALWAYS_ASSERT(false);
578 } catch (transaction_abort_exception &ex) {
579 throw abstract_db::abstract_abort_exception();
// Returns btr.size_estimate().
// NOTE(review): as the callee's name says, this is presumably an estimate
// rather than an exact element count -- confirm against txn_btree.
583 template <template <typename> class Transaction>
585 ndb_ordered_index<Transaction>::size() const
587 return btr.size_estimate();
// Purges the index by calling btr.unsafe_purge(true) and returns the
// resulting name -> value map to the caller.
// NOTE(review): the callee's name suggests the caller must guarantee that
// nothing else is using the index while this runs -- confirm against
// txn_btree.
590 template <template <typename> class Transaction>
591 std::map<std::string, uint64_t>
592 ndb_ordered_index<Transaction>::clear()
// Optional trace of which index is being purged; compiled in only when
// TXN_BTREE_DUMP_PURGE_STATS is defined.
594 #ifdef TXN_BTREE_DUMP_PURGE_STATS
595 std::cerr << "purging txn index: " << name << std::endl;
597 return btr.unsafe_purge(true);
600 #endif /* _NDB_WRAPPER_IMPL_H_ */