2 * An implementation of TPC-C based off of:
3 * https://github.com/oltpbenchmark/oltpbench/tree/master/src/com/oltpbenchmark/benchmarks/tpcc
20 #include "../macros.h"
21 #include "../scopedperf.hh"
22 #include "../spinlock.h"
27 #include "ndb_database.h"
28 #include "kvdb_database.h"
33 static inline ALWAYS_INLINE size_t
36 return (size_t) scale_factor;
41 static constexpr inline ALWAYS_INLINE size_t
47 static constexpr inline ALWAYS_INLINE size_t
48 NumDistrictsPerWarehouse()
53 static constexpr inline ALWAYS_INLINE size_t
54 NumCustomersPerDistrict()
59 // T must implement lock()/unlock(). Both must *not* throw exceptions
61 class scoped_multilock {
63 inline scoped_multilock()
68 inline ~scoped_multilock()
78 ALWAYS_ASSERT(!did_lock);
79 locks.emplace_back(&t);
85 ALWAYS_ASSERT(!did_lock);
87 sort(locks.begin(), locks.end());
88 #ifdef CHECK_INVARIANTS
89 if (set<T *>(locks.begin(), locks.end()).size() != locks.size()) {
91 cerr << "lock: " << hexify(t) << endl;
92 INVARIANT(false && "duplicate locks found");
102 typename util::vec<T *, 64>::type locks;
105 // like a lock_guard, but has the option of not acquiring
106 template <typename T>
107 class scoped_lock_guard {
109 inline scoped_lock_guard(T &l)
115 inline scoped_lock_guard(T *l)
122 inline ~scoped_lock_guard()
132 // configuration flags
133 static int g_disable_xpartition_txn = 0;
134 static int g_disable_read_only_scans = 0;
135 static int g_enable_partition_locks = 0;
136 static int g_enable_separate_tree_per_partition = 0;
137 static int g_new_order_remote_item_pct = 1;
138 static int g_new_order_fast_id_gen = 0;
139 static int g_uniform_item_dist = 0;
140 static unsigned g_txn_workload_mix[] = { 45, 43, 4, 4, 4 }; // default TPC-C workload mix
142 static aligned_padded_elem<spinlock> *g_partition_locks = nullptr;
143 static aligned_padded_elem<atomic<uint64_t>> *g_district_ids = nullptr;
145 // maps a wid => partition id
146 static inline ALWAYS_INLINE unsigned int
147 PartitionId(unsigned int wid)
149 INVARIANT(wid >= 1 && wid <= NumWarehouses());
151 if (NumWarehouses() <= nthreads)
152 // more workers than partitions, so its easy
154 const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
155 const unsigned partid = wid / nwhse_per_partition;
156 if (partid >= nthreads)
161 static inline ALWAYS_INLINE spinlock &
162 LockForPartition(unsigned int wid)
164 INVARIANT(g_enable_partition_locks);
165 return g_partition_locks[PartitionId(wid)].elem;
168 static inline atomic<uint64_t> &
169 NewOrderIdHolder(unsigned warehouse, unsigned district)
171 INVARIANT(warehouse >= 1 && warehouse <= NumWarehouses());
172 INVARIANT(district >= 1 && district <= NumDistrictsPerWarehouse());
174 (warehouse - 1) * NumDistrictsPerWarehouse() + (district - 1);
175 return g_district_ids[idx].elem;
178 static inline uint64_t
179 FastNewOrderIdGen(unsigned warehouse, unsigned district)
181 return NewOrderIdHolder(warehouse, district).fetch_add(1, memory_order_acq_rel);
185 // these sanity checks are just a few simple checks to make sure
186 // the data is not entirely corrupted
188 static inline ALWAYS_INLINE void
189 SanityCheckCustomer(const customer::key *k, const customer::value *v)
191 INVARIANT(k->c_w_id >= 1 && static_cast<size_t>(k->c_w_id) <= NumWarehouses());
192 INVARIANT(k->c_d_id >= 1 && static_cast<size_t>(k->c_d_id) <= NumDistrictsPerWarehouse());
193 INVARIANT(k->c_id >= 1 && static_cast<size_t>(k->c_id) <= NumCustomersPerDistrict());
194 #ifdef DISABLE_FIELD_SELECTION
195 INVARIANT(v->c_credit == "BC" || v->c_credit == "GC");
196 INVARIANT(v->c_middle == "OE");
200 static inline ALWAYS_INLINE void
201 SanityCheckWarehouse(const warehouse::key *k, const warehouse::value *v)
203 INVARIANT(k->w_id >= 1 && static_cast<size_t>(k->w_id) <= NumWarehouses());
204 #ifdef DISABLE_FIELD_SELECTION
205 INVARIANT(v->w_state.size() == 2);
206 INVARIANT(v->w_zip == "123456789");
210 static inline ALWAYS_INLINE void
211 SanityCheckDistrict(const district::key *k, const district::value *v)
213 INVARIANT(k->d_w_id >= 1 && static_cast<size_t>(k->d_w_id) <= NumWarehouses());
214 INVARIANT(k->d_id >= 1 && static_cast<size_t>(k->d_id) <= NumDistrictsPerWarehouse());
215 #ifdef DISABLE_FIELD_SELECTION
216 INVARIANT(v->d_next_o_id >= 3001);
217 INVARIANT(v->d_state.size() == 2);
218 INVARIANT(v->d_zip == "123456789");
222 static inline ALWAYS_INLINE void
223 SanityCheckItem(const item::key *k, const item::value *v)
225 INVARIANT(k->i_id >= 1 && static_cast<size_t>(k->i_id) <= NumItems());
226 #ifdef DISABLE_FIELD_SELECTION
227 INVARIANT(v->i_price >= 1.0 && v->i_price <= 100.0);
231 static inline ALWAYS_INLINE void
232 SanityCheckStock(const stock::key *k, const stock::value *v)
234 INVARIANT(k->s_w_id >= 1 && static_cast<size_t>(k->s_w_id) <= NumWarehouses());
235 INVARIANT(k->s_i_id >= 1 && static_cast<size_t>(k->s_i_id) <= NumItems());
238 static inline ALWAYS_INLINE void
239 SanityCheckNewOrder(const new_order::key *k, const new_order::value *v)
241 INVARIANT(k->no_w_id >= 1 && static_cast<size_t>(k->no_w_id) <= NumWarehouses());
242 INVARIANT(k->no_d_id >= 1 && static_cast<size_t>(k->no_d_id) <= NumDistrictsPerWarehouse());
245 static inline ALWAYS_INLINE void
246 SanityCheckOOrder(const oorder::key *k, const oorder::value *v)
248 INVARIANT(k->o_w_id >= 1 && static_cast<size_t>(k->o_w_id) <= NumWarehouses());
249 INVARIANT(k->o_d_id >= 1 && static_cast<size_t>(k->o_d_id) <= NumDistrictsPerWarehouse());
250 #ifdef DISABLE_FIELD_SELECTION
251 INVARIANT(v->o_c_id >= 1 && static_cast<size_t>(v->o_c_id) <= NumCustomersPerDistrict());
252 INVARIANT(v->o_carrier_id >= 0 && static_cast<size_t>(v->o_carrier_id) <= NumDistrictsPerWarehouse());
253 INVARIANT(v->o_ol_cnt >= 5 && v->o_ol_cnt <= 15);
257 static inline ALWAYS_INLINE void
258 SanityCheckOrderLine(const order_line::key *k, const order_line::value *v)
260 INVARIANT(k->ol_w_id >= 1 && static_cast<size_t>(k->ol_w_id) <= NumWarehouses());
261 INVARIANT(k->ol_d_id >= 1 && static_cast<size_t>(k->ol_d_id) <= NumDistrictsPerWarehouse());
262 INVARIANT(k->ol_number >= 1 && k->ol_number <= 15);
263 #ifdef DISABLE_FIELD_SELECTION
264 INVARIANT(v->ol_i_id >= 1 && static_cast<size_t>(v->ol_i_id) <= NumItems());
269 static string NameTokens[] =
283 class tpcc_worker_mixin {
286 // only TPCC loaders need to call this- workers are automatically
287 // pinned by their worker id (which corresponds to warehouse id
290 // pins the *calling* thread
292 PinToWarehouseId(unsigned int wid)
294 const unsigned int partid = PartitionId(wid);
295 ALWAYS_ASSERT(partid < nthreads);
296 const unsigned int pinid = partid;
298 cerr << "PinToWarehouseId(): coreid=" << coreid::core_id()
299 << " pinned to whse=" << wid << " (partid=" << partid << ")"
301 rcu::s_instance.pin_current_thread(pinid);
302 rcu::s_instance.fault_region();
305 static inline uint32_t
306 GetCurrentTimeMillis()
309 //ALWAYS_ASSERT(gettimeofday(&tv, 0) == 0);
310 //return tv.tv_sec * 1000;
312 // XXX(stephentu): implement a scalable GetCurrentTimeMillis()
313 // for now, we just give each core an increasing number
315 static __thread uint32_t tl_hack = 0;
319 // utils for generating random #s and strings
321 static inline ALWAYS_INLINE int
322 CheckBetweenInclusive(int v, int lower, int upper)
324 INVARIANT(v >= lower);
325 INVARIANT(v <= upper);
329 static inline ALWAYS_INLINE int
330 RandomNumber(fast_random &r, int min, int max)
332 return CheckBetweenInclusive((int) (r.next_uniform() * (max - min + 1) + min), min, max);
335 static inline ALWAYS_INLINE int
336 NonUniformRandom(fast_random &r, int A, int C, int min, int max)
338 return (((RandomNumber(r, 0, A) | RandomNumber(r, min, max)) + C) % (max - min + 1)) + min;
341 static inline ALWAYS_INLINE int
342 GetItemId(fast_random &r)
344 return CheckBetweenInclusive(
345 g_uniform_item_dist ?
346 RandomNumber(r, 1, NumItems()) :
347 NonUniformRandom(r, 8191, 7911, 1, NumItems()),
351 static inline ALWAYS_INLINE int
352 GetCustomerId(fast_random &r)
354 return CheckBetweenInclusive(NonUniformRandom(r, 1023, 259, 1, NumCustomersPerDistrict()), 1, NumCustomersPerDistrict());
357 // pick a number between [start, end)
358 static inline ALWAYS_INLINE unsigned
359 PickWarehouseId(fast_random &r, unsigned start, unsigned end)
361 INVARIANT(start < end);
362 const unsigned diff = end - start;
365 return (r.next() % diff) + start;
367 // all tokens are at most 5 chars long
368 static const size_t CustomerLastNameMaxSize = 5 * 3;
371 GetCustomerLastName(uint8_t *buf, fast_random &r, int num)
373 const string &s0 = NameTokens[num / 100];
374 const string &s1 = NameTokens[(num / 10) % 10];
375 const string &s2 = NameTokens[num % 10];
376 uint8_t *const begin = buf;
377 const size_t s0_sz = s0.size();
378 const size_t s1_sz = s1.size();
379 const size_t s2_sz = s2.size();
380 NDB_MEMCPY(buf, s0.data(), s0_sz); buf += s0_sz;
381 NDB_MEMCPY(buf, s1.data(), s1_sz); buf += s1_sz;
382 NDB_MEMCPY(buf, s2.data(), s2_sz); buf += s2_sz;
386 static inline ALWAYS_INLINE size_t
387 GetCustomerLastName(char *buf, fast_random &r, int num)
389 return GetCustomerLastName((uint8_t *) buf, r, num);
393 GetCustomerLastName(fast_random &r, int num)
396 ret.resize(CustomerLastNameMaxSize);
397 ret.resize(GetCustomerLastName((uint8_t *) &ret[0], r, num));
401 static inline ALWAYS_INLINE string
402 GetNonUniformCustomerLastNameLoad(fast_random &r)
404 return GetCustomerLastName(r, NonUniformRandom(r, 255, 157, 0, 999));
407 static inline ALWAYS_INLINE size_t
408 GetNonUniformCustomerLastNameRun(uint8_t *buf, fast_random &r)
410 return GetCustomerLastName(buf, r, NonUniformRandom(r, 255, 223, 0, 999));
413 static inline ALWAYS_INLINE size_t
414 GetNonUniformCustomerLastNameRun(char *buf, fast_random &r)
416 return GetNonUniformCustomerLastNameRun((uint8_t *) buf, r);
419 static inline ALWAYS_INLINE string
420 GetNonUniformCustomerLastNameRun(fast_random &r)
422 return GetCustomerLastName(r, NonUniformRandom(r, 255, 223, 0, 999));
425 // following oltpbench, we really generate strings of len - 1...
427 RandomStr(fast_random &r, uint len)
429 // this is a property of the oltpbench implementation...
434 string buf(len - 1, 0);
435 while (i < (len - 1)) {
436 const char c = (char) r.next_char();
437 // XXX(stephentu): oltpbench uses java's Character.isLetter(), which
438 // is a less restrictive filter than isalnum()
446 // RandomNStr() actually produces a string of length len
448 RandomNStr(fast_random &r, uint len)
450 const char base = '0';
452 for (uint i = 0; i < len; i++)
453 buf[i] = (char)(base + (r.next() % 10));
459 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, tpcc_txn, tpcc_txn_cg)
461 template <typename Database, bool AllowReadOnlyScans>
462 class tpcc_worker : public bench_worker, public tpcc_worker_mixin {
464 tpcc_worker(unsigned int worker_id,
465 unsigned long seed, Database *db,
466 const tpcc_tables<Database> &tables,
467 spin_barrier *barrier_a, spin_barrier *barrier_b,
468 uint warehouse_id_start, uint warehouse_id_end)
469 : bench_worker(worker_id, true, seed, db,
470 barrier_a, barrier_b),
473 warehouse_id_start(warehouse_id_start),
474 warehouse_id_end(warehouse_id_end)
476 ALWAYS_ASSERT(g_disable_read_only_scans == !AllowReadOnlyScans);
477 INVARIANT(warehouse_id_start >= 1);
478 INVARIANT(warehouse_id_start <= NumWarehouses());
479 INVARIANT(warehouse_id_end > warehouse_id_start);
480 INVARIANT(warehouse_id_end <= (NumWarehouses() + 1));
481 NDB_MEMSET(&last_no_o_ids[0], 0, sizeof(last_no_o_ids));
483 cerr << "tpcc: worker id " << worker_id
484 << " => warehouses [" << warehouse_id_start
485 << ", " << warehouse_id_end << ")"
490 // XXX(stephentu): tune this
491 static const size_t NMaxCustomerIdxScanElems = 512;
493 txn_result txn_new_order();
496 TxnNewOrder(bench_worker *w)
498 ANON_REGION("TxnNewOrder:", &tpcc_txn_cg);
499 return static_cast<tpcc_worker *>(w)->txn_new_order();
502 txn_result txn_delivery();
505 TxnDelivery(bench_worker *w)
507 ANON_REGION("TxnDelivery:", &tpcc_txn_cg);
508 return static_cast<tpcc_worker *>(w)->txn_delivery();
511 txn_result txn_payment();
514 TxnPayment(bench_worker *w)
516 ANON_REGION("TxnPayment:", &tpcc_txn_cg);
517 return static_cast<tpcc_worker *>(w)->txn_payment();
520 txn_result txn_order_status();
523 TxnOrderStatus(bench_worker *w)
525 ANON_REGION("TxnOrderStatus:", &tpcc_txn_cg);
526 return static_cast<tpcc_worker *>(w)->txn_order_status();
529 txn_result txn_stock_level();
532 TxnStockLevel(bench_worker *w)
534 ANON_REGION("TxnStockLevel:", &tpcc_txn_cg);
535 return static_cast<tpcc_worker *>(w)->txn_stock_level();
538 virtual workload_desc_vec
539 get_workload() const OVERRIDE
542 // numbers from sigmod.csail.mit.edu:
543 //w.push_back(workload_desc("NewOrder", 1.0, TxnNewOrder)); // ~10k ops/sec
544 //w.push_back(workload_desc("Payment", 1.0, TxnPayment)); // ~32k ops/sec
545 //w.push_back(workload_desc("Delivery", 1.0, TxnDelivery)); // ~104k ops/sec
546 //w.push_back(workload_desc("OrderStatus", 1.0, TxnOrderStatus)); // ~33k ops/sec
547 //w.push_back(workload_desc("StockLevel", 1.0, TxnStockLevel)); // ~2k ops/sec
549 for (size_t i = 0; i < ARRAY_NELEMS(g_txn_workload_mix); i++)
550 m += g_txn_workload_mix[i];
551 ALWAYS_ASSERT(m == 100);
552 if (g_txn_workload_mix[0])
553 w.push_back(workload_desc("NewOrder", double(g_txn_workload_mix[0])/100.0, TxnNewOrder));
554 if (g_txn_workload_mix[1])
555 w.push_back(workload_desc("Payment", double(g_txn_workload_mix[1])/100.0, TxnPayment));
556 if (g_txn_workload_mix[2])
557 w.push_back(workload_desc("Delivery", double(g_txn_workload_mix[2])/100.0, TxnDelivery));
558 if (g_txn_workload_mix[3])
559 w.push_back(workload_desc("OrderStatus", double(g_txn_workload_mix[3])/100.0, TxnOrderStatus));
560 if (g_txn_workload_mix[4])
561 w.push_back(workload_desc("StockLevel", double(g_txn_workload_mix[4])/100.0, TxnStockLevel));
568 on_run_setup() OVERRIDE
572 const size_t a = worker_id % coreid::num_cpus_online();
573 const size_t b = a % nthreads;
574 rcu::s_instance.pin_current_thread(b);
575 rcu::s_instance.fault_region();
579 tpcc_tables<Database> tables;
580 const uint warehouse_id_start;
581 const uint warehouse_id_end;
582 int32_t last_no_o_ids[10]; // XXX(stephentu): hack
585 template <typename Database>
586 class tpcc_warehouse_loader : public typed_bench_loader<Database>,
587 public tpcc_worker_mixin {
589 tpcc_warehouse_loader(unsigned long seed,
591 const tpcc_tables<Database> &tables)
592 : typed_bench_loader<Database>(seed, db),
601 uint64_t warehouse_total_sz = 0, n_warehouses = 0;
603 vector<warehouse::value> warehouses;
605 scoped_str_arena s_arena(this->arena);
606 typename Database::template
607 TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
608 for (uint i = 1; i <= NumWarehouses(); i++) {
609 const warehouse::key k(i);
611 const string w_name = RandomStr(this->r, RandomNumber(this->r, 6, 10));
612 const string w_street_1 = RandomStr(this->r, RandomNumber(this->r, 10, 20));
613 const string w_street_2 = RandomStr(this->r, RandomNumber(this->r, 10, 20));
614 const string w_city = RandomStr(this->r, RandomNumber(this->r, 10, 20));
615 const string w_state = RandomStr(this->r, 3);
616 const string w_zip = "123456789";
620 v.w_tax = (float) RandomNumber(this->r, 0, 2000) / 10000.0;
621 v.w_name.assign(w_name);
622 v.w_street_1.assign(w_street_1);
623 v.w_street_2.assign(w_street_2);
624 v.w_city.assign(w_city);
625 v.w_state.assign(w_state);
626 v.w_zip.assign(w_zip);
628 checker::SanityCheckWarehouse(&k, &v);
629 const size_t sz = Size(v);
630 warehouse_total_sz += sz;
632 tables.tbl_warehouse(i)->insert(txn, k, v);
633 warehouses.push_back(v);
635 ALWAYS_ASSERT(txn.commit());
638 scoped_str_arena s_arena(this->arena);
639 typename Database::template
640 TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
641 for (uint i = 1; i <= NumWarehouses(); i++) {
642 const warehouse::key k(i);
644 ALWAYS_ASSERT(tables.tbl_warehouse(i)->search(txn, k, v));
645 ALWAYS_ASSERT(warehouses[i - 1] == v);
646 checker::SanityCheckWarehouse(&k, &v);
648 ALWAYS_ASSERT(txn.commit());
650 } catch (typename Database::abort_exception_type &e) {
651 // shouldn't abort on loading!
652 ALWAYS_ASSERT(false);
655 cerr << "[INFO] finished loading warehouse" << endl;
656 cerr << "[INFO] * average warehouse record length: "
657 << (double(warehouse_total_sz)/double(n_warehouses)) << " bytes" << endl;
661 tpcc_tables<Database> tables;
664 template <typename Database>
665 class tpcc_item_loader : public typed_bench_loader<Database>,
666 public tpcc_worker_mixin {
668 tpcc_item_loader(unsigned long seed,
670 const tpcc_tables<Database> &tables)
671 : typed_bench_loader<Database>(seed, db),
680 const ssize_t bsize = this->typed_db()->txn_max_batch_size();
681 auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
682 uint64_t total_sz = 0;
684 for (uint i = 1; i <= NumItems(); i++) {
685 // items don't "belong" to a certain warehouse, so no pinning
686 const item::key k(i);
689 const string i_name = RandomStr(this->r, RandomNumber(this->r, 14, 24));
690 v.i_name.assign(i_name);
691 v.i_price = (float) RandomNumber(this->r, 100, 10000) / 100.0;
692 const int len = RandomNumber(this->r, 26, 50);
693 if (RandomNumber(this->r, 1, 100) > 10) {
694 const string i_data = RandomStr(this->r, len);
695 v.i_data.assign(i_data);
697 const int startOriginal = RandomNumber(this->r, 2, (len - 8));
698 const string i_data = RandomStr(this->r, startOriginal + 1) + "ORIGINAL" + RandomStr(this->r, len - startOriginal - 7);
699 v.i_data.assign(i_data);
701 v.i_im_id = RandomNumber(this->r, 1, 10000);
703 checker::SanityCheckItem(&k, &v);
704 const size_t sz = Size(v);
706 // XXX: replicate items table across all NUMA nodes
707 tables.tbl_item(1)->insert(*txn, k, v); // this table is shared, so any partition is OK
709 if (bsize != -1 && !(i % bsize)) {
710 ALWAYS_ASSERT(txn->commit());
711 txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
715 ALWAYS_ASSERT(txn->commit());
716 } catch (typename Database::abort_exception_type &e) {
717 // shouldn't abort on loading!
718 ALWAYS_ASSERT(false);
721 cerr << "[INFO] finished loading item" << endl;
722 cerr << "[INFO] * average item record length: "
723 << (double(total_sz)/double(NumItems())) << " bytes" << endl;
727 tpcc_tables<Database> tables;
730 template <typename Database>
731 class tpcc_stock_loader : public typed_bench_loader<Database>,
732 public tpcc_worker_mixin {
734 tpcc_stock_loader(unsigned long seed,
736 const tpcc_tables<Database> &tables,
737 ssize_t warehouse_id)
738 : typed_bench_loader<Database>(seed, db),
741 warehouse_id(warehouse_id)
743 ALWAYS_ASSERT(warehouse_id == -1 ||
744 (warehouse_id >= 1 &&
745 static_cast<size_t>(warehouse_id) <= NumWarehouses()));
752 uint64_t stock_total_sz = 0, n_stocks = 0;
753 const uint w_start = (warehouse_id == -1) ?
754 1 : static_cast<uint>(warehouse_id);
755 const uint w_end = (warehouse_id == -1) ?
756 NumWarehouses() : static_cast<uint>(warehouse_id);
758 for (uint w = w_start; w <= w_end; w++) {
759 const size_t batchsize =
760 (this->typed_db()->txn_max_batch_size() == -1) ?
761 NumItems() : this->typed_db()->txn_max_batch_size();
762 const size_t nbatches = (batchsize > NumItems()) ? 1 : (NumItems() / batchsize);
767 for (uint b = 0; b < nbatches;) {
768 scoped_str_arena s_arena(this->arena);
769 auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
771 const size_t iend = std::min((b + 1) * batchsize + 1, NumItems());
772 for (uint i = (b * batchsize + 1); i <= iend; i++) {
773 const stock::key k(w, i);
774 const stock_data::key k_data(w, i);
777 v.s_quantity = RandomNumber(this->r, 10, 100);
782 stock_data::value v_data;
783 const int len = RandomNumber(this->r, 26, 50);
784 if (RandomNumber(this->r, 1, 100) > 10) {
785 const string s_data = RandomStr(this->r, len);
786 v_data.s_data.assign(s_data);
788 const int startOriginal = RandomNumber(this->r, 2, (len - 8));
789 const string s_data = RandomStr(this->r, startOriginal + 1) +
790 "ORIGINAL" + RandomStr(this->r, len - startOriginal - 7);
791 v_data.s_data.assign(s_data);
793 v_data.s_dist_01.assign(RandomStr(this->r, 24));
794 v_data.s_dist_02.assign(RandomStr(this->r, 24));
795 v_data.s_dist_03.assign(RandomStr(this->r, 24));
796 v_data.s_dist_04.assign(RandomStr(this->r, 24));
797 v_data.s_dist_05.assign(RandomStr(this->r, 24));
798 v_data.s_dist_06.assign(RandomStr(this->r, 24));
799 v_data.s_dist_07.assign(RandomStr(this->r, 24));
800 v_data.s_dist_08.assign(RandomStr(this->r, 24));
801 v_data.s_dist_09.assign(RandomStr(this->r, 24));
802 v_data.s_dist_10.assign(RandomStr(this->r, 24));
804 checker::SanityCheckStock(&k, &v);
805 const size_t sz = Size(v);
806 stock_total_sz += sz;
808 tables.tbl_stock(w)->insert(*txn, k, v);
809 tables.tbl_stock_data(w)->insert(*txn, k_data, v_data);
815 cerr << "[WARNING] stock loader loading abort" << endl;
817 } catch (typename Database::abort_exception_type &e) {
819 ALWAYS_ASSERT(warehouse_id != -1);
821 cerr << "[WARNING] stock loader loading abort" << endl;
827 if (warehouse_id == -1) {
828 cerr << "[INFO] finished loading stock" << endl;
829 cerr << "[INFO] * average stock record length: "
830 << (double(stock_total_sz)/double(n_stocks)) << " bytes" << endl;
832 cerr << "[INFO] finished loading stock (w=" << warehouse_id << ")" << endl;
838 tpcc_tables<Database> tables;
839 ssize_t warehouse_id;
842 template <typename Database>
843 class tpcc_district_loader : public typed_bench_loader<Database>,
844 public tpcc_worker_mixin {
846 tpcc_district_loader(unsigned long seed,
848 const tpcc_tables<Database> &tables)
849 : typed_bench_loader<Database>(seed, db),
858 const ssize_t bsize = this->typed_db()->txn_max_batch_size();
859 auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
860 uint64_t district_total_sz = 0, n_districts = 0;
863 for (uint w = 1; w <= NumWarehouses(); w++) {
866 for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++, cnt++) {
867 const district::key k(w, d);
871 v.d_tax = (float) (RandomNumber(this->r, 0, 2000) / 10000.0);
872 v.d_next_o_id = 3001;
873 v.d_name.assign(RandomStr(this->r, RandomNumber(this->r, 6, 10)));
874 v.d_street_1.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
875 v.d_street_2.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
876 v.d_city.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
877 v.d_state.assign(RandomStr(this->r, 3));
878 v.d_zip.assign("123456789");
880 checker::SanityCheckDistrict(&k, &v);
881 const size_t sz = Size(v);
882 district_total_sz += sz;
884 tables.tbl_district(w)->insert(*txn, k, v);
886 if (bsize != -1 && !((cnt + 1) % bsize)) {
887 ALWAYS_ASSERT(txn->commit());
888 txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
893 ALWAYS_ASSERT(txn->commit());
894 } catch (typename Database::abort_exception_type &e) {
895 // shouldn't abort on loading!
896 ALWAYS_ASSERT(false);
899 cerr << "[INFO] finished loading district" << endl;
900 cerr << "[INFO] * average district record length: "
901 << (double(district_total_sz)/double(n_districts)) << " bytes" << endl;
905 tpcc_tables<Database> tables;
908 template <typename Database>
909 class tpcc_customer_loader : public typed_bench_loader<Database>,
910 public tpcc_worker_mixin {
912 tpcc_customer_loader(unsigned long seed,
914 const tpcc_tables<Database> &tables,
915 ssize_t warehouse_id)
916 : typed_bench_loader<Database>(seed, db),
919 warehouse_id(warehouse_id)
921 ALWAYS_ASSERT(warehouse_id == -1 ||
922 (warehouse_id >= 1 &&
923 static_cast<size_t>(warehouse_id) <= NumWarehouses()));
930 const uint w_start = (warehouse_id == -1) ?
931 1 : static_cast<uint>(warehouse_id);
932 const uint w_end = (warehouse_id == -1) ?
933 NumWarehouses() : static_cast<uint>(warehouse_id);
934 const size_t batchsize =
935 (this->typed_db()->txn_max_batch_size() == -1) ?
936 NumCustomersPerDistrict() : this->typed_db()->txn_max_batch_size();
937 const size_t nbatches =
938 (batchsize > NumCustomersPerDistrict()) ?
939 1 : (NumCustomersPerDistrict() / batchsize);
940 cerr << "num batches: " << nbatches << endl;
942 uint64_t total_sz = 0;
944 for (uint w = w_start; w <= w_end; w++) {
947 for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
948 for (uint batch = 0; batch < nbatches;) {
949 scoped_str_arena s_arena(this->arena);
950 typename Database::template TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
951 const size_t cstart = batch * batchsize;
952 const size_t cend = std::min((batch + 1) * batchsize, NumCustomersPerDistrict());
954 for (uint cidx0 = cstart; cidx0 < cend; cidx0++) {
955 const uint c = cidx0 + 1;
956 const customer::key k(w, d, c);
959 v.c_discount = (float) (RandomNumber(this->r, 1, 5000) / 10000.0);
960 if (RandomNumber(this->r, 1, 100) <= 10)
961 v.c_credit.assign("BC");
963 v.c_credit.assign("GC");
966 v.c_last.assign(GetCustomerLastName(this->r, c - 1));
968 v.c_last.assign(GetNonUniformCustomerLastNameLoad(this->r));
970 v.c_first.assign(RandomStr(this->r, RandomNumber(this->r, 8, 16)));
971 v.c_credit_lim = 50000;
974 v.c_ytd_payment = 10;
976 v.c_delivery_cnt = 0;
978 v.c_street_1.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
979 v.c_street_2.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
980 v.c_city.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
981 v.c_state.assign(RandomStr(this->r, 3));
982 v.c_zip.assign(RandomNStr(this->r, 4) + "11111");
983 v.c_phone.assign(RandomNStr(this->r, 16));
984 v.c_since = GetCurrentTimeMillis();
985 v.c_middle.assign("OE");
986 v.c_data.assign(RandomStr(this->r, RandomNumber(this->r, 300, 500)));
988 checker::SanityCheckCustomer(&k, &v);
989 const size_t sz = Size(v);
991 tables.tbl_customer(w)->insert(txn, k, v);
993 // customer name index
994 const customer_name_idx::key k_idx(k.c_w_id, k.c_d_id, v.c_last.str(true), v.c_first.str(true));
995 const customer_name_idx::value v_idx(k.c_id);
997 // index structure is:
998 // (c_w_id, c_d_id, c_last, c_first) -> (c_id)
1000 tables.tbl_customer_name_idx(w)->insert(txn, k_idx, v_idx);
1002 history::key k_hist;
1004 k_hist.h_c_d_id = d;
1005 k_hist.h_c_w_id = w;
1008 k_hist.h_date = GetCurrentTimeMillis();
1010 history::value v_hist;
1011 v_hist.h_amount = 10;
1012 v_hist.h_data.assign(RandomStr(this->r, RandomNumber(this->r, 10, 24)));
1014 tables.tbl_history(w)->insert(txn, k_hist, v_hist);
1021 cerr << "[WARNING] customer loader loading abort" << endl;
1023 } catch (typename Database::abort_exception_type &e) {
1026 cerr << "[WARNING] customer loader loading abort" << endl;
1033 if (warehouse_id == -1) {
1034 cerr << "[INFO] finished loading customer" << endl;
1035 cerr << "[INFO] * average customer record length: "
1036 << (double(total_sz)/double(NumWarehouses()*NumDistrictsPerWarehouse()*NumCustomersPerDistrict()))
1037 << " bytes " << endl;
1039 cerr << "[INFO] finished loading customer (w=" << warehouse_id << ")" << endl;
1045 tpcc_tables<Database> tables;
1046 ssize_t warehouse_id;
1049 template <typename Database>
1050 class tpcc_order_loader : public typed_bench_loader<Database>,
1051 public tpcc_worker_mixin {
1053 tpcc_order_loader(unsigned long seed,
1055 const tpcc_tables<Database> &tables,
1056 ssize_t warehouse_id)
1057 : typed_bench_loader<Database>(seed, db),
1058 tpcc_worker_mixin(),
1060 warehouse_id(warehouse_id)
1062 ALWAYS_ASSERT(warehouse_id == -1 ||
1063 (warehouse_id >= 1 &&
1064 static_cast<size_t>(warehouse_id) <= NumWarehouses()));
1071 uint64_t order_line_total_sz = 0, n_order_lines = 0;
1072 uint64_t oorder_total_sz = 0, n_oorders = 0;
1073 uint64_t new_order_total_sz = 0, n_new_orders = 0;
1075 const uint w_start = (warehouse_id == -1) ?
1076 1 : static_cast<uint>(warehouse_id);
1077 const uint w_end = (warehouse_id == -1) ?
1078 NumWarehouses() : static_cast<uint>(warehouse_id);
1080 for (uint w = w_start; w <= w_end; w++) {
1082 PinToWarehouseId(w);
1083 for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
1086 while (c_ids.size() != NumCustomersPerDistrict()) {
1087 const auto x = (this->r.next() % NumCustomersPerDistrict()) + 1;
1088 if (c_ids_s.count(x))
1091 c_ids.emplace_back(x);
1093 for (uint c = 1; c <= NumCustomersPerDistrict();) {
1094 scoped_str_arena s_arena(this->arena);
1095 typename Database::template TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
1097 const oorder::key k_oo(w, d, c);
1100 v_oo.o_c_id = c_ids[c - 1];
1101 if (k_oo.o_id < 2101)
1102 v_oo.o_carrier_id = RandomNumber(this->r, 1, 10);
1104 v_oo.o_carrier_id = 0;
1105 v_oo.o_ol_cnt = RandomNumber(this->r, 5, 15);
1106 v_oo.o_all_local = 1;
1107 v_oo.o_entry_d = GetCurrentTimeMillis();
1109 checker::SanityCheckOOrder(&k_oo, &v_oo);
1110 const size_t sz = Size(v_oo);
1111 oorder_total_sz += sz;
1113 tables.tbl_oorder(w)->insert(txn, k_oo, v_oo);
1115 const oorder_c_id_idx::key k_oo_idx(k_oo.o_w_id, k_oo.o_d_id, v_oo.o_c_id, k_oo.o_id);
1116 const oorder_c_id_idx::value v_oo_idx(0);
1118 tables.tbl_oorder_c_id_idx(w)->insert(txn, k_oo_idx, v_oo_idx);
1121 const new_order::key k_no(w, d, c);
1122 const new_order::value v_no;
1124 checker::SanityCheckNewOrder(&k_no, &v_no);
1125 const size_t sz = Size(v_no);
1126 new_order_total_sz += sz;
1128 tables.tbl_new_order(w)->insert(txn, k_no, v_no);
1131 for (uint l = 1; l <= uint(v_oo.o_ol_cnt); l++) {
1132 const order_line::key k_ol(w, d, c, l);
1134 order_line::value v_ol;
1135 v_ol.ol_i_id = RandomNumber(this->r, 1, 100000);
1136 if (k_ol.ol_o_id < 2101) {
1137 v_ol.ol_delivery_d = v_oo.o_entry_d;
1140 v_ol.ol_delivery_d = 0;
1141 // random within [0.01 .. 9,999.99]
1142 v_ol.ol_amount = (float) (RandomNumber(this->r, 1, 999999) / 100.0);
1145 v_ol.ol_supply_w_id = k_ol.ol_w_id;
1146 v_ol.ol_quantity = 5;
1147 // v_ol.ol_dist_info comes from stock_data(ol_supply_w_id, ol_o_id)
1148 //v_ol.ol_dist_info = RandomStr(this->r, 24);
1150 checker::SanityCheckOrderLine(&k_ol, &v_ol);
1151 const size_t sz = Size(v_ol);
1152 order_line_total_sz += sz;
1154 tables.tbl_order_line(w)->insert(txn, k_ol, v_ol);
1160 ALWAYS_ASSERT(warehouse_id != -1);
1162 cerr << "[WARNING] order loader loading abort" << endl;
1164 } catch (typename Database::abort_exception_type &e) {
1166 ALWAYS_ASSERT(warehouse_id != -1);
1168 cerr << "[WARNING] order loader loading abort" << endl;
1175 if (warehouse_id == -1) {
1176 cerr << "[INFO] finished loading order" << endl;
1177 cerr << "[INFO] * average order_line record length: "
1178 << (double(order_line_total_sz)/double(n_order_lines)) << " bytes" << endl;
1179 cerr << "[INFO] * average oorder record length: "
1180 << (double(oorder_total_sz)/double(n_oorders)) << " bytes" << endl;
1181 cerr << "[INFO] * average new_order record length: "
1182 << (double(new_order_total_sz)/double(n_new_orders)) << " bytes" << endl;
1184 cerr << "[INFO] finished loading order (w=" << warehouse_id << ")" << endl;
1190 tpcc_tables<Database> tables;
1191 ssize_t warehouse_id;
1194 static event_counter evt_tpcc_cross_partition_new_order_txns("tpcc_cross_partition_new_order_txns");
1195 static event_counter evt_tpcc_cross_partition_payment_txns("tpcc_cross_partition_payment_txns");
1197 template <typename Database, bool AllowReadOnlyScans>
1198 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1199 tpcc_worker<Database, AllowReadOnlyScans>::txn_new_order()
1201 const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1202 const uint districtID = RandomNumber(this->r, 1, 10);
1203 const uint customerID = GetCustomerId(r);
1204 const uint numItems = RandomNumber(this->r, 5, 15);
1205 uint itemIDs[15], supplierWarehouseIDs[15], orderQuantities[15];
1206 bool allLocal = true;
1207 for (uint i = 0; i < numItems; i++) {
1208 itemIDs[i] = GetItemId(r);
1209 if (likely(g_disable_xpartition_txn ||
1210 NumWarehouses() == 1 ||
1211 RandomNumber(this->r, 1, 100) > g_new_order_remote_item_pct)) {
1212 supplierWarehouseIDs[i] = warehouse_id;
1215 supplierWarehouseIDs[i] = RandomNumber(this->r, 1, NumWarehouses());
1216 } while (supplierWarehouseIDs[i] == warehouse_id);
1219 orderQuantities[i] = RandomNumber(this->r, 1, 10);
1221 INVARIANT(!g_disable_xpartition_txn || allLocal);
1223 ++evt_tpcc_cross_partition_new_order_txns;
1225 // XXX(stephentu): implement rollback
1227 // worst case txn profile:
1231 // 1 new_order insert
1234 // 1 oorder_cid_idx insert
1239 // 1 order_line insert
1241 // output from txn counters:
1242 // max_absent_range_set_size : 0
1243 // max_absent_set_size : 0
1244 // max_node_scan_size : 0
1245 // max_read_set_size : 15
1246 // max_write_set_size : 15
1247 // num_txn_contexts : 9
1248 typename Database::template TransactionType<abstract_db::HINT_TPCC_NEW_ORDER>::type txn(txn_flags, this->arena);
1249 scoped_str_arena s_arena(this->arena);
1250 scoped_multilock<spinlock> mlock;
1251 if (g_enable_partition_locks) {
1253 mlock.enq(LockForPartition(warehouse_id));
1255 small_unordered_map<unsigned int, bool, 64> lockset;
1256 mlock.enq(LockForPartition(warehouse_id));
1257 lockset[PartitionId(warehouse_id)] = 1;
1258 for (uint i = 0; i < numItems; i++) {
1259 if (lockset.find(PartitionId(supplierWarehouseIDs[i])) == lockset.end()) {
1260 mlock.enq(LockForPartition(supplierWarehouseIDs[i]));
1261 lockset[PartitionId(supplierWarehouseIDs[i])] = 1;
1269 const customer::key k_c(warehouse_id, districtID, customerID);
1270 customer::value v_c;
1272 tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c,
1274 customer::value::c_discount_field,
1275 customer::value::c_last_field,
1276 customer::value::c_credit_field)));
1277 checker::SanityCheckCustomer(&k_c, &v_c);
1279 const warehouse::key k_w(warehouse_id);
1280 warehouse::value v_w;
1282 tables.tbl_warehouse(warehouse_id)->search(txn, k_w, v_w,
1283 GUARDED_FIELDS(warehouse::value::w_tax_field)));
1284 checker::SanityCheckWarehouse(&k_w, &v_w);
1286 const district::key k_d(warehouse_id, districtID);
1287 district::value v_d;
1289 tables.tbl_district(warehouse_id)->search(txn, k_d, v_d,
1291 district::value::d_next_o_id_field,
1292 district::value::d_tax_field)));
1293 checker::SanityCheckDistrict(&k_d, &v_d);
1295 const uint64_t my_next_o_id = g_new_order_fast_id_gen ?
1296 FastNewOrderIdGen(warehouse_id, districtID) : v_d.d_next_o_id;
1298 const new_order::key k_no(warehouse_id, districtID, my_next_o_id);
1299 const new_order::value v_no;
1300 const size_t new_order_sz = Size(v_no);
1301 tables.tbl_new_order(warehouse_id)->insert(txn, k_no, v_no);
1302 ret += new_order_sz;
1304 if (!g_new_order_fast_id_gen) {
1306 tables.tbl_district(warehouse_id)->put(txn, k_d, v_d,
1307 GUARDED_FIELDS(district::value::d_next_o_id_field));
1310 const oorder::key k_oo(warehouse_id, districtID, k_no.no_o_id);
1312 v_oo.o_c_id = int32_t(customerID);
1313 v_oo.o_carrier_id = 0; // seems to be ignored
1314 v_oo.o_ol_cnt = int8_t(numItems);
1315 v_oo.o_all_local = allLocal;
1316 v_oo.o_entry_d = GetCurrentTimeMillis();
1318 const size_t oorder_sz = Size(v_oo);
1319 tables.tbl_oorder(warehouse_id)->insert(txn, k_oo, v_oo);
1322 const oorder_c_id_idx::key k_oo_idx(warehouse_id, districtID, customerID, k_no.no_o_id);
1323 const oorder_c_id_idx::value v_oo_idx(0);
1325 tables.tbl_oorder_c_id_idx(warehouse_id)->insert(txn, k_oo_idx, v_oo_idx);
1327 for (uint ol_number = 1; ol_number <= numItems; ol_number++) {
1328 const uint ol_supply_w_id = supplierWarehouseIDs[ol_number - 1];
1329 const uint ol_i_id = itemIDs[ol_number - 1];
1330 const uint ol_quantity = orderQuantities[ol_number - 1];
1332 const item::key k_i(ol_i_id);
1334 ALWAYS_ASSERT(tables.tbl_item(1)->search(txn, k_i, v_i,
1336 item::value::i_price_field,
1337 item::value::i_name_field,
1338 item::value::i_data_field)));
1339 checker::SanityCheckItem(&k_i, &v_i);
1341 const stock::key k_s(ol_supply_w_id, ol_i_id);
1343 ALWAYS_ASSERT(tables.tbl_stock(ol_supply_w_id)->search(txn, k_s, v_s));
1344 checker::SanityCheckStock(&k_s, &v_s);
1346 stock::value v_s_new(v_s);
1347 if (v_s_new.s_quantity - ol_quantity >= 10)
1348 v_s_new.s_quantity -= ol_quantity;
1350 v_s_new.s_quantity += -int32_t(ol_quantity) + 91;
1351 v_s_new.s_ytd += ol_quantity;
1352 v_s_new.s_remote_cnt += (ol_supply_w_id == warehouse_id) ? 0 : 1;
1354 tables.tbl_stock(ol_supply_w_id)->put(txn, k_s, v_s_new);
1356 const order_line::key k_ol(warehouse_id, districtID, k_no.no_o_id, ol_number);
1357 order_line::value v_ol;
1358 v_ol.ol_i_id = int32_t(ol_i_id);
1359 v_ol.ol_delivery_d = 0; // not delivered yet
1360 v_ol.ol_amount = float(ol_quantity) * v_i.i_price;
1361 v_ol.ol_supply_w_id = int32_t(ol_supply_w_id);
1362 v_ol.ol_quantity = int8_t(ol_quantity);
1364 const size_t order_line_sz = Size(v_ol);
1365 tables.tbl_order_line(warehouse_id)->insert(txn, k_ol, v_ol);
1366 ret += order_line_sz;
1369 //measure_txn_counters(txn, "txn_new_order");
1370 if (likely(txn.commit()))
1371 return txn_result(true, ret);
1372 } catch (typename Database::abort_exception_type &e) {
1375 return txn_result(false, 0);
1378 template <typename Database>
1379 class new_order_scan_callback : public
1380 Database::template IndexType<schema<new_order>>::type::search_range_callback {
1382 new_order_scan_callback() : invoked(false) {}
1384 invoke(const new_order::key &key, const new_order::value &value) OVERRIDE
1386 INVARIANT(!invoked);
1389 #ifdef CHECK_INVARIANTS
1390 checker::SanityCheckNewOrder(&key, &value);
1394 inline const new_order::key &
1399 inline bool was_invoked() const { return invoked; }
1401 new_order::key k_no;
1405 STATIC_COUNTER_DECL(scopedperf::tod_ctr, delivery_probe0_tod, delivery_probe0_cg)
1407 template <typename Database, bool AllowReadOnlyScans>
1408 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1409 tpcc_worker<Database, AllowReadOnlyScans>::txn_delivery()
1411 const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1412 const uint o_carrier_id = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1413 const uint32_t ts = GetCurrentTimeMillis();
1415 // worst case txn profile:
1417 // 1 new_order scan node
1419 // 2 order_line scan nodes
1420 // 15 order_line puts
1421 // 1 new_order remove
1426 // output from counters:
1427 // max_absent_range_set_size : 0
1428 // max_absent_set_size : 0
1429 // max_node_scan_size : 21
1430 // max_read_set_size : 133
1431 // max_write_set_size : 133
1432 // num_txn_contexts : 4
1433 typename Database::template TransactionType<abstract_db::HINT_TPCC_DELIVERY>::type txn(txn_flags, this->arena);
1434 scoped_str_arena s_arena(this->arena);
1435 scoped_lock_guard<spinlock> slock(
1436 g_enable_partition_locks ? &LockForPartition(warehouse_id) : nullptr);
1439 for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
1440 const new_order::key k_no_0(warehouse_id, d, last_no_o_ids[d - 1]);
1441 const new_order::key k_no_1(warehouse_id, d, numeric_limits<int32_t>::max());
1442 new_order_scan_callback<Database> new_order_c;
1444 ANON_REGION("DeliverNewOrderScan:", &delivery_probe0_cg);
1445 tables.tbl_new_order(warehouse_id)->search_range_call(
1446 txn, k_no_0, &k_no_1, new_order_c);
1449 const new_order::key &k_no = new_order_c.get_key();
1450 if (unlikely(!new_order_c.was_invoked()))
1452 last_no_o_ids[d - 1] = k_no.no_o_id + 1; // XXX: update last seen
1454 const oorder::key k_oo(warehouse_id, d, k_no.no_o_id);
1456 if (unlikely(!tables.tbl_oorder(warehouse_id)->search(txn, k_oo, v_oo,
1458 oorder::value::o_c_id_field,
1459 oorder::value::o_carrier_id_field)))) {
1460 // even if we read the new order entry, there's no guarantee
1461 // we will read the oorder entry: in this case the txn will abort,
1462 // but we're simply bailing out early
1464 return txn_result(false, 0);
1466 checker::SanityCheckOOrder(&k_oo, &v_oo);
1468 // never more than 15 order_lines per order
1469 static_limit_callback<typename Database::template IndexType<schema<order_line>>::type, 15, false> c;
1470 const order_line::key k_oo_0(warehouse_id, d, k_no.no_o_id, 0);
1471 const order_line::key k_oo_1(warehouse_id, d, k_no.no_o_id, numeric_limits<int32_t>::max());
1473 // XXX(stephentu): mutable scans would help here
1474 tables.tbl_order_line(warehouse_id)->search_range_call(
1475 txn, k_oo_0, &k_oo_1, c, false,
1477 order_line::value::ol_amount_field,
1478 order_line::value::ol_delivery_d_field));
1480 for (size_t i = 0; i < c.size(); i++) {
1481 #if defined(CHECK_INVARIANTS) && defined(DISABLE_FIELD_SELECTION)
1482 checker::SanityCheckOrderLine(&c.key(i), &c.value(i));
1484 sum += c.value(i).ol_amount;
1485 c.value(i).ol_delivery_d = ts;
1486 tables.tbl_order_line(warehouse_id)->put(txn, c.key(i), c.value(i),
1487 GUARDED_FIELDS(order_line::value::ol_delivery_d_field));
1491 tables.tbl_new_order(warehouse_id)->remove(txn, k_no);
1492 ret -= 0 /*new_order_c.get_value_size()*/;
1495 v_oo.o_carrier_id = o_carrier_id;
1496 tables.tbl_oorder(warehouse_id)->put(txn, k_oo, v_oo,
1497 GUARDED_FIELDS(oorder::value::o_carrier_id_field));
1499 const uint c_id = v_oo.o_c_id;
1500 const float ol_total = sum;
1503 const customer::key k_c(warehouse_id, d, c_id);
1504 customer::value v_c;
1505 ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c,
1506 GUARDED_FIELDS(customer::value::c_balance_field)));
1507 v_c.c_balance += ol_total;
1508 tables.tbl_customer(warehouse_id)->put(txn, k_c, v_c,
1509 GUARDED_FIELDS(customer::value::c_balance_field));
1511 //measure_txn_counters(txn, "txn_delivery");
1512 if (likely(txn.commit()))
1513 return txn_result(true, ret);
1514 } catch (typename Database::abort_exception_type &e) {
1517 return txn_result(false, 0);
1520 static event_avg_counter evt_avg_cust_name_idx_scan_size("avg_cust_name_idx_scan_size");
1522 template <typename Database, bool AllowReadOnlyScans>
1523 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1524 tpcc_worker<Database, AllowReadOnlyScans>::txn_payment()
1526 const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1527 const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1528 uint customerDistrictID, customerWarehouseID;
1529 if (likely(g_disable_xpartition_txn ||
1530 NumWarehouses() == 1 ||
1531 RandomNumber(this->r, 1, 100) <= 85)) {
1532 customerDistrictID = districtID;
1533 customerWarehouseID = warehouse_id;
1535 customerDistrictID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1537 customerWarehouseID = RandomNumber(this->r, 1, NumWarehouses());
1538 } while (customerWarehouseID == warehouse_id);
1540 const float paymentAmount = (float) (RandomNumber(this->r, 100, 500000) / 100.0);
1541 const uint32_t ts = GetCurrentTimeMillis();
1542 INVARIANT(!g_disable_xpartition_txn || customerWarehouseID == warehouse_id);
1544 // output from txn counters:
1545 // max_absent_range_set_size : 0
1546 // max_absent_set_size : 0
1547 // max_node_scan_size : 10
1548 // max_read_set_size : 71
1549 // max_write_set_size : 1
1550 // num_txn_contexts : 5
1551 typename Database::template TransactionType<abstract_db::HINT_TPCC_PAYMENT>::type txn(txn_flags, this->arena);
1552 scoped_str_arena s_arena(this->arena);
1553 scoped_multilock<spinlock> mlock;
1554 if (g_enable_partition_locks) {
1555 mlock.enq(LockForPartition(warehouse_id));
1556 if (PartitionId(customerWarehouseID) != PartitionId(warehouse_id))
1557 mlock.enq(LockForPartition(customerWarehouseID));
1560 if (customerWarehouseID != warehouse_id)
1561 ++evt_tpcc_cross_partition_payment_txns;
1565 const warehouse::key k_w(warehouse_id);
1566 warehouse::value v_w;
1568 tables.tbl_warehouse(warehouse_id)->search(txn, k_w, v_w, GUARDED_FIELDS(
1569 warehouse::value::w_ytd_field,
1570 warehouse::value::w_street_1_field,
1571 warehouse::value::w_street_1_field,
1572 warehouse::value::w_city_field,
1573 warehouse::value::w_state_field,
1574 warehouse::value::w_zip_field,
1575 warehouse::value::w_name_field)));
1576 checker::SanityCheckWarehouse(&k_w, &v_w);
1578 v_w.w_ytd += paymentAmount;
1579 tables.tbl_warehouse(warehouse_id)->put(txn, k_w, v_w, GUARDED_FIELDS(warehouse::value::w_ytd_field));
1581 const district::key k_d(warehouse_id, districtID);
1582 district::value v_d;
1583 ALWAYS_ASSERT(tables.tbl_district(warehouse_id)->search(txn, k_d, v_d, GUARDED_FIELDS(
1584 district::value::d_ytd_field,
1585 district::value::d_street_1_field,
1586 district::value::d_street_1_field,
1587 district::value::d_city_field,
1588 district::value::d_state_field,
1589 district::value::d_zip_field,
1590 district::value::d_name_field)));
1591 checker::SanityCheckDistrict(&k_d, &v_d);
1593 v_d.d_ytd += paymentAmount;
1594 tables.tbl_district(warehouse_id)->put(txn, k_d, v_d, GUARDED_FIELDS(district::value::d_ytd_field));
1597 customer::value v_c;
1598 if (RandomNumber(this->r, 1, 100) <= 60) {
1600 uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
1601 static_assert(sizeof(lastname_buf) == 16, "XX");
1602 NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
1603 GetNonUniformCustomerLastNameRun(lastname_buf, r);
1605 static const string zeros(16, 0);
1606 static const string ones(16, 255);
1608 customer_name_idx::key k_c_idx_0;
1609 k_c_idx_0.c_w_id = customerWarehouseID;
1610 k_c_idx_0.c_d_id = customerDistrictID;
1611 k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
1612 k_c_idx_0.c_first.assign(zeros);
1614 customer_name_idx::key k_c_idx_1;
1615 k_c_idx_1.c_w_id = customerWarehouseID;
1616 k_c_idx_1.c_d_id = customerDistrictID;
1617 k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
1618 k_c_idx_1.c_first.assign(ones);
1620 // probably a safe bet for now
1621 bytes_static_limit_callback<
1622 typename Database::template IndexType<schema<customer_name_idx>>::type,
1623 NMaxCustomerIdxScanElems, true> c(s_arena.get());
1624 tables.tbl_customer_name_idx(customerWarehouseID)->bytes_search_range_call(
1625 txn, k_c_idx_0, &k_c_idx_1, c);
1626 ALWAYS_ASSERT(c.size() > 0);
1627 INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
1628 int index = c.size() / 2;
1629 if (c.size() % 2 == 0)
1631 evt_avg_cust_name_idx_scan_size.offer(c.size());
1633 customer_name_idx::value v_c_idx_temp;
1634 const customer_name_idx::value *v_c_idx = Decode(c.value(index), v_c_idx_temp);
1636 k_c.c_w_id = customerWarehouseID;
1637 k_c.c_d_id = customerDistrictID;
1638 k_c.c_id = v_c_idx->c_id;
1639 ALWAYS_ASSERT(tables.tbl_customer(customerWarehouseID)->search(txn, k_c, v_c));
1642 const uint customerID = GetCustomerId(r);
1643 k_c.c_w_id = customerWarehouseID;
1644 k_c.c_d_id = customerDistrictID;
1645 k_c.c_id = customerID;
1646 ALWAYS_ASSERT(tables.tbl_customer(customerWarehouseID)->search(txn, k_c, v_c));
1648 checker::SanityCheckCustomer(&k_c, &v_c);
1650 v_c.c_balance -= paymentAmount;
1651 v_c.c_ytd_payment += paymentAmount;
1652 v_c.c_payment_cnt++;
1653 if (strncmp(v_c.c_credit.data(), "BC", 2) == 0) {
1655 int n = snprintf(buf, sizeof(buf), "%d %d %d %d %d %f | %s",
1662 v_c.c_data.c_str());
1663 v_c.c_data.resize_junk(
1664 min(static_cast<size_t>(n), v_c.c_data.max_size()));
1665 NDB_MEMCPY((void *) v_c.c_data.data(), &buf[0], v_c.c_data.size());
1668 tables.tbl_customer(customerWarehouseID)->put(txn, k_c, v_c);
1670 const history::key k_h(k_c.c_d_id, k_c.c_w_id, k_c.c_id, districtID, warehouse_id, ts);
1672 v_h.h_amount = paymentAmount;
1673 v_h.h_data.resize_junk(v_h.h_data.max_size());
1674 int n = snprintf((char *) v_h.h_data.data(), v_h.h_data.max_size() + 1,
1677 v_d.d_name.c_str());
1678 v_h.h_data.resize_junk(min(static_cast<size_t>(n), v_h.h_data.max_size()));
1680 const size_t history_sz = Size(v_h);
1681 tables.tbl_history(warehouse_id)->insert(txn, k_h, v_h);
1684 //measure_txn_counters(txn, "txn_payment");
1685 if (likely(txn.commit()))
1686 return txn_result(true, ret);
1687 } catch (typename Database::abort_exception_type &e) {
1690 return txn_result(false, 0);
1693 template <typename Database>
1694 class order_line_nop_callback : public
1695 Database::template IndexType<schema<order_line>>::type::bytes_search_range_callback {
1698 order_line_nop_callback() : n(0) {}
1699 virtual bool invoke(
1701 const string &value)
1703 INVARIANT(key.size() == sizeof(order_line::key));
1704 order_line::value v_ol_temp;
1705 const order_line::value *v_ol UNUSED = Decode(value, v_ol_temp);
1706 #ifdef CHECK_INVARIANTS
1707 order_line::key k_ol_temp;
1708 const order_line::key *k_ol = Decode(key, k_ol_temp);
1709 checker::SanityCheckOrderLine(k_ol, v_ol);
1717 STATIC_COUNTER_DECL(scopedperf::tod_ctr, order_status_probe0_tod, order_status_probe0_cg)
1719 template <typename Database, bool AllowReadOnlyScans>
1720 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1721 tpcc_worker<Database, AllowReadOnlyScans>::txn_order_status()
1723 const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1724 const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1726 // output from txn counters:
1727 // max_absent_range_set_size : 0
1728 // max_absent_set_size : 0
1729 // max_node_scan_size : 13
1730 // max_read_set_size : 81
1731 // max_write_set_size : 0
1732 // num_txn_contexts : 4
1733 const uint64_t read_only_mask =
1734 !AllowReadOnlyScans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
1735 typename Database::template TransactionType<
1736 AllowReadOnlyScans ?
1737 abstract_db::HINT_TPCC_ORDER_STATUS_READ_ONLY :
1738 abstract_db::HINT_TPCC_ORDER_STATUS>::type txn(
1739 txn_flags | read_only_mask, this->arena);
1740 scoped_str_arena s_arena(this->arena);
1741 // NB: since txn_order_status() is a RO txn, we assume that
1742 // locking is un-necessary (since we can just read from some old snapshot)
1746 customer::value v_c;
1747 if (RandomNumber(this->r, 1, 100) <= 60) {
1749 uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
1750 static_assert(sizeof(lastname_buf) == 16, "xx");
1751 NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
1752 GetNonUniformCustomerLastNameRun(lastname_buf, r);
1754 static const string zeros(16, 0);
1755 static const string ones(16, 255);
1757 customer_name_idx::key k_c_idx_0;
1758 k_c_idx_0.c_w_id = warehouse_id;
1759 k_c_idx_0.c_d_id = districtID;
1760 k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
1761 k_c_idx_0.c_first.assign(zeros);
1763 customer_name_idx::key k_c_idx_1;
1764 k_c_idx_1.c_w_id = warehouse_id;
1765 k_c_idx_1.c_d_id = districtID;
1766 k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
1767 k_c_idx_1.c_first.assign(ones);
1769 // NMaxCustomerIdxScanElems is probably a safe bet for now
1770 bytes_static_limit_callback<
1771 typename Database::template IndexType<schema<customer_name_idx>>::type,
1772 NMaxCustomerIdxScanElems, true> c(s_arena.get());
1773 tables.tbl_customer_name_idx(warehouse_id)->bytes_search_range_call(
1774 txn, k_c_idx_0, &k_c_idx_1, c);
1775 ALWAYS_ASSERT(c.size() > 0);
1776 INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
1777 int index = c.size() / 2;
1778 if (c.size() % 2 == 0)
1780 evt_avg_cust_name_idx_scan_size.offer(c.size());
1782 customer_name_idx::value v_c_idx_temp;
1783 const customer_name_idx::value *v_c_idx = Decode(c.value(index), v_c_idx_temp);
1785 k_c.c_w_id = warehouse_id;
1786 k_c.c_d_id = districtID;
1787 k_c.c_id = v_c_idx->c_id;
1788 ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c));
1792 const uint customerID = GetCustomerId(r);
1793 k_c.c_w_id = warehouse_id;
1794 k_c.c_d_id = districtID;
1795 k_c.c_id = customerID;
1796 ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c));
1798 checker::SanityCheckCustomer(&k_c, &v_c);
1800 // XXX(stephentu): HACK- we bound the # of elems returned by this scan to
1801 // 15- this is because we don't have reverse scans. In an ideal system, a
1802 // reverse scan would only need to read 1 btree node. We could simulate a
1803 // lookup by only reading the first element- but then we would *always*
1804 // read the first order by any customer. To make this more interesting, we
1805 // randomly select which elem to pick within the 1st or 2nd btree nodes.
1806 // This is obviously a deviation from TPC-C, but it shouldn't make that
1807 // much of a difference in terms of performance numbers (in fact we are
1808 // making it worse for us)
1809 latest_key_callback<
1810 typename Database::template IndexType<schema<oorder_c_id_idx>>::type> c_oorder(
1811 *this->arena.next(), (r.next() % 15) + 1);
1812 const oorder_c_id_idx::key k_oo_idx_0(warehouse_id, districtID, k_c.c_id, 0);
1813 const oorder_c_id_idx::key k_oo_idx_1(warehouse_id, districtID, k_c.c_id, numeric_limits<int32_t>::max());
1815 ANON_REGION("OrderStatusOOrderScan:", &order_status_probe0_cg);
1816 tables.tbl_oorder_c_id_idx(warehouse_id)->bytes_search_range_call(
1817 txn, k_oo_idx_0, &k_oo_idx_1, c_oorder);
1819 ALWAYS_ASSERT(c_oorder.size());
1821 oorder_c_id_idx::key k_oo_idx_temp;
1822 const oorder_c_id_idx::key *k_oo_idx = Decode(c_oorder.kstr(), k_oo_idx_temp);
1823 const uint o_id = k_oo_idx->o_o_id;
1826 // XXX(stephentu): what's wrong w/ it?
1827 order_line_nop_callback<Database> c_order_line;
1828 const order_line::key k_ol_0(warehouse_id, districtID, o_id, 0);
1829 const order_line::key k_ol_1(warehouse_id, districtID, o_id, numeric_limits<int32_t>::max());
1830 tables.tbl_order_line(warehouse_id)->bytes_search_range_call(
1831 txn, k_ol_0, &k_ol_1, c_order_line);
1832 ALWAYS_ASSERT(c_order_line.n >= 5 && c_order_line.n <= 15);
1834 //measure_txn_counters(txn, "txn_order_status");
1835 if (likely(txn.commit()))
1836 return txn_result(true, 0);
1837 } catch (typename Database::abort_exception_type &e) {
1840 return txn_result(false, 0);
1843 template <typename Database>
1844 class order_line_scan_callback : public
1845 Database::template IndexType<schema<order_line>>::type::bytes_search_range_callback {
1847 order_line_scan_callback() : n(0) {}
1848 virtual bool invoke(
1850 const string &value)
1852 INVARIANT(key.size() == sizeof(order_line::key));
1853 order_line::value v_ol;
1855 #ifdef DISABLE_FIELD_SELECTION
1856 const uint64_t mask = numeric_limits<uint64_t>::max();
1858 const uint64_t mask = compute_fields_mask(0);
1861 typed_txn_btree_<schema<order_line>>::do_record_read(
1862 (const uint8_t *) value.data(), value.size(), mask, &v_ol);
1864 #if defined(CHECK_INVARIANTS) && defined(DISABLE_FIELD_SELECTION)
1865 order_line::key k_ol_temp;
1866 const order_line::key *k_ol = Decode(key, k_ol_temp);
1867 checker::SanityCheckOrderLine(k_ol, &v_ol);
1870 s_i_ids[v_ol.ol_i_id] = 1;
1875 small_unordered_map<uint, bool, 512> s_i_ids;
1878 STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe0_tod, stock_level_probe0_cg)
1879 STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe1_tod, stock_level_probe1_cg)
1880 STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe2_tod, stock_level_probe2_cg)
1882 static event_avg_counter evt_avg_stock_level_loop_join_lookups("stock_level_loop_join_lookups");
1884 template <typename Database, bool AllowReadOnlyScans>
1885 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1886 tpcc_worker<Database, AllowReadOnlyScans>::txn_stock_level()
1888 const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1889 const uint threshold = RandomNumber(this->r, 10, 20);
1890 const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1892 // output from txn counters:
1893 // max_absent_range_set_size : 0
1894 // max_absent_set_size : 0
1895 // max_node_scan_size : 19
1896 // max_read_set_size : 241
1897 // max_write_set_size : 0
1898 // n_node_scan_large_instances : 1
1899 // n_read_set_large_instances : 2
1900 // num_txn_contexts : 3
1901 const uint64_t read_only_mask =
1902 !AllowReadOnlyScans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
1903 typename Database::template TransactionType<
1904 AllowReadOnlyScans ?
1905 abstract_db::HINT_TPCC_STOCK_LEVEL_READ_ONLY :
1906 abstract_db::HINT_TPCC_STOCK_LEVEL>::type txn(
1907 txn_flags | read_only_mask, this->arena);
1908 scoped_str_arena s_arena(this->arena);
1909 // NB: since txn_stock_level() is a RO txn, we assume that
1910 // locking is un-necessary (since we can just read from some old snapshot)
1912 const district::key k_d(warehouse_id, districtID);
1913 district::value v_d;
1914 ALWAYS_ASSERT(tables.tbl_district(warehouse_id)->search(txn, k_d, v_d));
1915 checker::SanityCheckDistrict(&k_d, &v_d);
1916 const uint64_t cur_next_o_id = g_new_order_fast_id_gen ?
1917 NewOrderIdHolder(warehouse_id, districtID).load(memory_order_acquire) :
1920 // manual joins are fun!
1921 order_line_scan_callback<Database> c;
1922 const int32_t lower = cur_next_o_id >= 20 ? (cur_next_o_id - 20) : 0;
1923 const order_line::key k_ol_0(warehouse_id, districtID, lower, 0);
1924 const order_line::key k_ol_1(warehouse_id, districtID, cur_next_o_id, 0);
1926 // mask must be kept in sync w/ order_line_scan_callback
1927 #ifdef DISABLE_FIELD_SELECTION
1928 const size_t nfields = order_line::value::NFIELDS;
1930 const size_t nfields = 1;
1932 ANON_REGION("StockLevelOrderLineScan:", &stock_level_probe0_cg);
1933 tables.tbl_order_line(warehouse_id)->bytes_search_range_call(
1934 txn, k_ol_0, &k_ol_1, c, nfields);
1937 small_unordered_map<uint, bool, 512> s_i_ids_distinct;
1938 for (auto &p : c.s_i_ids) {
1939 ANON_REGION("StockLevelLoopJoinIter:", &stock_level_probe1_cg);
1940 const stock::key k_s(warehouse_id, p.first);
1942 INVARIANT(p.first >= 1 && p.first <= NumItems());
1944 ANON_REGION("StockLevelLoopJoinGet:", &stock_level_probe2_cg);
1945 ALWAYS_ASSERT(tables.tbl_stock(warehouse_id)->search(txn, k_s, v_s, GUARDED_FIELDS(stock::value::s_quantity_field)));
1947 if (v_s.s_quantity < int(threshold))
1948 s_i_ids_distinct[p.first] = 1;
1950 evt_avg_stock_level_loop_join_lookups.offer(c.s_i_ids.size());
1951 // NB(stephentu): s_i_ids_distinct.size() is the computed result of this txn
1953 //measure_txn_counters(txn, "txn_stock_level");
1954 if (likely(txn.commit()))
1955 return txn_result(true, 0);
1956 } catch (typename Database::abort_exception_type &e) {
1959 return txn_result(false, 0);
1962 template <typename T>
1964 unique_filter(const vector<T> &v)
1969 if (!seen.count(e)) {
1970 ret.emplace_back(e);
1976 template <typename Database, bool AllowReadOnlyScans>
1977 class tpcc_bench_runner : public typed_bench_runner<Database> {
1981 IsTableReadOnly(const char *name)
1983 return strcmp("item", name) == 0;
1987 IsTableAppendOnly(const char *name)
1989 return strcmp("history", name) == 0 ||
1990 strcmp("oorder_c_id_idx", name) == 0;
1993 template <typename Schema>
1994 static vector<shared_ptr<typename Database::template IndexType<Schema>::type>>
1995 OpenTablesForTablespace(Database *db, const char *name, size_t expected_size)
1997 const bool is_read_only = IsTableReadOnly(name);
1998 const bool is_append_only = IsTableAppendOnly(name);
1999 const string s_name(name);
2000 vector<typename Database::template IndexType<Schema>::ptr_type> ret(NumWarehouses());
2001 if (g_enable_separate_tree_per_partition && !is_read_only) {
2002 if (NumWarehouses() <= nthreads) {
2003 for (size_t i = 0; i < NumWarehouses(); i++)
2004 ret[i] = db->template open_index<Schema>(s_name + "_" + to_string(i), expected_size, is_append_only);
2006 const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
2007 for (size_t partid = 0; partid < nthreads; partid++) {
2008 const unsigned wstart = partid * nwhse_per_partition;
2009 const unsigned wend = (partid + 1 == nthreads) ?
2010 NumWarehouses() : (partid + 1) * nwhse_per_partition;
2012 db->template open_index<Schema>(s_name + "_" + to_string(partid), expected_size, is_append_only);
2013 for (size_t i = wstart; i < wend; i++)
2018 auto idx = db->template open_index<Schema>(s_name, expected_size, is_append_only);
2019 for (size_t i = 0; i < NumWarehouses(); i++)
2026 tpcc_bench_runner(Database *db)
2027 : typed_bench_runner<Database>(db)
2030 #define OPEN_TABLESPACE_X(x) \
2032 tables.tbl_ ## x ## _vec = OpenTablesForTablespace<schema<x>>(db, #x, sizeof(x::value)); \
2033 auto v = unique_filter(tables.tbl_ ## x ## _vec); \
2034 for (size_t i = 0; i < v.size(); i++) \
2035 this->open_tables[string(#x) + "_" + to_string(i)] = v[i]; \
2038 TPCC_TABLE_LIST(OPEN_TABLESPACE_X);
2040 #undef OPEN_TABLESPACE_X
2042 if (g_enable_partition_locks) {
2043 static_assert(sizeof(aligned_padded_elem<spinlock>) == CACHELINE_SIZE, "xx");
2044 void * const px = memalign(CACHELINE_SIZE, sizeof(aligned_padded_elem<spinlock>) * nthreads);
2046 ALWAYS_ASSERT(reinterpret_cast<uintptr_t>(px) % CACHELINE_SIZE == 0);
2047 g_partition_locks = reinterpret_cast<aligned_padded_elem<spinlock> *>(px);
2048 for (size_t i = 0; i < nthreads; i++) {
2049 new (&g_partition_locks[i]) aligned_padded_elem<spinlock>();
2050 ALWAYS_ASSERT(!g_partition_locks[i].elem.is_locked());
2054 if (g_new_order_fast_id_gen) {
2058 sizeof(aligned_padded_elem<atomic<uint64_t>>) *
2059 NumWarehouses() * NumDistrictsPerWarehouse());
2060 g_district_ids = reinterpret_cast<aligned_padded_elem<atomic<uint64_t>> *>(px);
2061 for (size_t i = 0; i < NumWarehouses() * NumDistrictsPerWarehouse(); i++)
2062 new (&g_district_ids[i]) atomic<uint64_t>(3001);
2067 virtual vector<unique_ptr<bench_loader>>
2070 vector<unique_ptr<bench_loader>> ret;
2071 ret.emplace_back(new tpcc_warehouse_loader<Database>(9324, this->typed_db(), tables));
2072 ret.emplace_back(new tpcc_item_loader<Database>(235443, this->typed_db(), tables));
2073 if (enable_parallel_loading) {
2074 fast_random r(89785943);
2075 for (uint i = 1; i <= NumWarehouses(); i++)
2076 ret.emplace_back(new tpcc_stock_loader<Database>(r.next(), this->typed_db(), tables, i));
2078 ret.emplace_back(new tpcc_stock_loader<Database>(89785943, this->typed_db(), tables, -1));
2080 ret.emplace_back(new tpcc_district_loader<Database>(129856349, this->typed_db(), tables));
2081 if (enable_parallel_loading) {
2082 fast_random r(923587856425);
2083 for (uint i = 1; i <= NumWarehouses(); i++)
2084 ret.emplace_back(new tpcc_customer_loader<Database>(r.next(), this->typed_db(), tables, i));
2086 ret.emplace_back(new tpcc_customer_loader<Database>(923587856425, this->typed_db(), tables, -1));
2088 if (enable_parallel_loading) {
2089 fast_random r(2343352);
2090 for (uint i = 1; i <= NumWarehouses(); i++)
2091 ret.emplace_back(new tpcc_order_loader<Database>(r.next(), this->typed_db(), tables, i));
2093 ret.emplace_back(new tpcc_order_loader<Database>(2343352, this->typed_db(), tables, -1));
2100 vector<unique_ptr<bench_worker>>
2103 const unsigned alignment = coreid::num_cpus_online();
2104 const int blockstart =
2105 coreid::allocate_contiguous_aligned_block(nthreads, alignment);
2106 ALWAYS_ASSERT(blockstart >= 0);
2107 ALWAYS_ASSERT((blockstart % alignment) == 0);
2108 fast_random r(23984543);
2109 vector<unique_ptr<bench_worker>> ret;
2110 if (NumWarehouses() <= nthreads) {
2111 for (size_t i = 0; i < nthreads; i++)
2113 new tpcc_worker<Database, RO>(
2115 r.next(), this->typed_db(), tables,
2116 &this->barrier_a, &this->barrier_b,
2117 (i % NumWarehouses()) + 1, (i % NumWarehouses()) + 2));
2119 const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
2120 for (size_t i = 0; i < nthreads; i++) {
2121 const unsigned wstart = i * nwhse_per_partition;
2122 const unsigned wend = (i + 1 == nthreads) ?
2123 NumWarehouses() : (i + 1) * nwhse_per_partition;
2125 new tpcc_worker<Database, RO>(
2127 r.next(), this->typed_db(), tables,
2128 &this->barrier_a, &this->barrier_b, wstart+1, wend+2));
2135 virtual vector<unique_ptr<bench_worker>>
2138 return g_disable_read_only_scans ? make_workers_impl<false>() : make_workers_impl<true>();
2142 tpcc_tables<Database> tables;
2145 template <typename Database>
2146 static unique_ptr<bench_runner>
2147 MakeBenchRunner(Database *db)
2149 return unique_ptr<bench_runner>(
2150 g_disable_read_only_scans ?
2151 static_cast<bench_runner *>(new tpcc_bench_runner<Database, false>(db)) :
2152 static_cast<bench_runner *>(new tpcc_bench_runner<Database, true>(db)));
2156 tpcc_do_test(const string &dbtype,
2157 const persistconfig &cfg,
2158 int argc, char **argv)
2162 bool did_spec_remote_pct = false;
2164 static struct option long_options[] =
2166 {"disable-cross-partition-transactions" , no_argument , &g_disable_xpartition_txn , 1} ,
2167 {"disable-read-only-snapshots" , no_argument , &g_disable_read_only_scans , 1} ,
2168 {"enable-partition-locks" , no_argument , &g_enable_partition_locks , 1} ,
2169 {"enable-separate-tree-per-partition" , no_argument , &g_enable_separate_tree_per_partition , 1} ,
2170 {"new-order-remote-item-pct" , required_argument , 0 , 'r'} ,
2171 {"new-order-fast-id-gen" , no_argument , &g_new_order_fast_id_gen , 1} ,
2172 {"uniform-item-dist" , no_argument , &g_uniform_item_dist , 1} ,
2173 {"workload-mix" , required_argument , 0 , 'w'} ,
2176 int option_index = 0;
2177 int c = getopt_long(argc, argv, "r:", long_options, &option_index);
2182 if (long_options[option_index].flag != 0)
2188 g_new_order_remote_item_pct = strtoul(optarg, NULL, 10);
2189 ALWAYS_ASSERT(g_new_order_remote_item_pct >= 0 && g_new_order_remote_item_pct <= 100);
2190 did_spec_remote_pct = true;
2195 const vector<string> toks = split(optarg, ',');
2196 ALWAYS_ASSERT(toks.size() == ARRAY_NELEMS(g_txn_workload_mix));
2198 for (size_t i = 0; i < toks.size(); i++) {
2199 unsigned p = strtoul(toks[i].c_str(), nullptr, 10);
2200 ALWAYS_ASSERT(p >= 0 && p <= 100);
2202 g_txn_workload_mix[i] = p;
2204 ALWAYS_ASSERT(s == 100);
2209 /* getopt_long already printed an error message. */
2217 if (did_spec_remote_pct && g_disable_xpartition_txn) {
2218 cerr << "WARNING: --new-order-remote-item-pct given with --disable-cross-partition-transactions" << endl;
2219 cerr << " --new-order-remote-item-pct will have no effect" << endl;
2223 cerr << "tpcc settings:" << endl;
2224 cerr << " cross_partition_transactions : " << !g_disable_xpartition_txn << endl;
2225 cerr << " read_only_snapshots : " << !g_disable_read_only_scans << endl;
2226 cerr << " partition_locks : " << g_enable_partition_locks << endl;
2227 cerr << " separate_tree_per_partition : " << g_enable_separate_tree_per_partition << endl;
2228 cerr << " new_order_remote_item_pct : " << g_new_order_remote_item_pct << endl;
2229 cerr << " new_order_fast_id_gen : " << g_new_order_fast_id_gen << endl;
2230 cerr << " uniform_item_dist : " << g_uniform_item_dist << endl;
2231 cerr << " workload_mix : " <<
2232 format_list(g_txn_workload_mix,
2233 g_txn_workload_mix + ARRAY_NELEMS(g_txn_workload_mix)) << endl;
2236 unique_ptr<abstract_db> db;
2237 unique_ptr<bench_runner> r;
2239 if (dbtype == "ndb-proto2") {
2240 if (!cfg.logfiles_.empty()) {
2241 vector<vector<unsigned>> assignments_used;
2243 nthreads, cfg.logfiles_, cfg.assignments_, &assignments_used,
2248 cerr << "[logging subsystem]" << endl;
2249 cerr << " assignments: " << assignments_used << endl;
2250 cerr << " call fsync : " << !cfg.nofsync_ << endl;
2251 cerr << " compression: " << cfg.do_compress_ << endl;
2252 cerr << " fake_writes: " << cfg.fake_writes_ << endl;
2255 #ifdef PROTO2_CAN_DISABLE_GC
2256 if (!cfg.disable_gc_)
2257 transaction_proto2_static::InitGC();
2259 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
2260 if (cfg.disable_snapshots_)
2261 transaction_proto2_static::DisableSnapshots();
2263 typedef ndb_database<transaction_proto2> Database;
2264 Database *raw = new Database;
2266 r = MakeBenchRunner(raw);
2267 } else if (dbtype == "kvdb-st") {
2268 typedef kvdb_database<false> Database;
2269 Database *raw = new Database;
2271 r = MakeBenchRunner(raw);
2273 ALWAYS_ASSERT(false);