--- /dev/null
+/**
+ * An implementation of TPC-C based off of:
+ * https://github.com/oltpbenchmark/oltpbench/tree/master/src/com/oltpbenchmark/benchmarks/tpcc
+ */
+
+#include <sys/time.h>
+#include <string>
+#include <ctype.h>
+#include <stdlib.h>
+#include <malloc.h>
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <getopt.h>
+
+#include <set>
+#include <vector>
+
+#include "../txn.h"
+#include "../macros.h"
+#include "../scopedperf.hh"
+#include "../spinlock.h"
+
+#include "bench.h"
+#include "tpcc.h"
+
+#include "ndb_database.h"
+#include "kvdb_database.h"
+
+using namespace std;
+using namespace util;
+
+static inline ALWAYS_INLINE size_t
+NumWarehouses()
+{
+ return (size_t) scale_factor;
+}
+
+// config constants
+
+static constexpr inline ALWAYS_INLINE size_t
+NumItems()
+{
+ return 100000;
+}
+
+static constexpr inline ALWAYS_INLINE size_t
+NumDistrictsPerWarehouse()
+{
+ return 10;
+}
+
+static constexpr inline ALWAYS_INLINE size_t
+NumCustomersPerDistrict()
+{
+ return 3000;
+}
+
+// T must implement lock()/unlock(). Both must *not* throw exceptions
+template <typename T>
+class scoped_multilock {
+public:
+ inline scoped_multilock()
+ : did_lock(false)
+ {
+ }
+
+ inline ~scoped_multilock()
+ {
+ if (did_lock)
+ for (auto &t : locks)
+ t->unlock();
+ }
+
+ inline void
+ enq(T &t)
+ {
+ ALWAYS_ASSERT(!did_lock);
+ locks.emplace_back(&t);
+ }
+
+ inline void
+ multilock()
+ {
+ ALWAYS_ASSERT(!did_lock);
+ if (locks.size() > 1)
+ sort(locks.begin(), locks.end());
+#ifdef CHECK_INVARIANTS
+ if (set<T *>(locks.begin(), locks.end()).size() != locks.size()) {
+ for (auto &t : locks)
+ cerr << "lock: " << hexify(t) << endl;
+ INVARIANT(false && "duplicate locks found");
+ }
+#endif
+ for (auto &t : locks)
+ t->lock();
+ did_lock = true;
+ }
+
+private:
+ bool did_lock;
+ typename util::vec<T *, 64>::type locks;
+};
+
+// like a lock_guard, but has the option of not acquiring
+template <typename T>
+class scoped_lock_guard {
+public:
+ inline scoped_lock_guard(T &l)
+ : l(&l)
+ {
+ this->l->lock();
+ }
+
+ inline scoped_lock_guard(T *l)
+ : l(l)
+ {
+ if (this->l)
+ this->l->lock();
+ }
+
+ inline ~scoped_lock_guard()
+ {
+ if (l)
+ l->unlock();
+ }
+
+private:
+ T *l;
+};
+
+// configuration flags
+static int g_disable_xpartition_txn = 0;
+static int g_disable_read_only_scans = 0;
+static int g_enable_partition_locks = 0;
+static int g_enable_separate_tree_per_partition = 0;
+static int g_new_order_remote_item_pct = 1;
+static int g_new_order_fast_id_gen = 0;
+static int g_uniform_item_dist = 0;
+static unsigned g_txn_workload_mix[] = { 45, 43, 4, 4, 4 }; // default TPC-C workload mix
+
+static aligned_padded_elem<spinlock> *g_partition_locks = nullptr;
+static aligned_padded_elem<atomic<uint64_t>> *g_district_ids = nullptr;
+
+// maps a wid => partition id
+static inline ALWAYS_INLINE unsigned int
+PartitionId(unsigned int wid)
+{
+ INVARIANT(wid >= 1 && wid <= NumWarehouses());
+ wid -= 1; // 0-idx
+ if (NumWarehouses() <= nthreads)
+ // more workers than partitions, so its easy
+ return wid;
+ const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
+ const unsigned partid = wid / nwhse_per_partition;
+ if (partid >= nthreads)
+ return nthreads - 1;
+ return partid;
+}
+
+static inline ALWAYS_INLINE spinlock &
+LockForPartition(unsigned int wid)
+{
+ INVARIANT(g_enable_partition_locks);
+ return g_partition_locks[PartitionId(wid)].elem;
+}
+
+static inline atomic<uint64_t> &
+NewOrderIdHolder(unsigned warehouse, unsigned district)
+{
+ INVARIANT(warehouse >= 1 && warehouse <= NumWarehouses());
+ INVARIANT(district >= 1 && district <= NumDistrictsPerWarehouse());
+ const unsigned idx =
+ (warehouse - 1) * NumDistrictsPerWarehouse() + (district - 1);
+ return g_district_ids[idx].elem;
+}
+
+static inline uint64_t
+FastNewOrderIdGen(unsigned warehouse, unsigned district)
+{
+ return NewOrderIdHolder(warehouse, district).fetch_add(1, memory_order_acq_rel);
+}
+
+struct checker {
+ // these sanity checks are just a few simple checks to make sure
+ // the data is not entirely corrupted
+
+ static inline ALWAYS_INLINE void
+ SanityCheckCustomer(const customer::key *k, const customer::value *v)
+ {
+ INVARIANT(k->c_w_id >= 1 && static_cast<size_t>(k->c_w_id) <= NumWarehouses());
+ INVARIANT(k->c_d_id >= 1 && static_cast<size_t>(k->c_d_id) <= NumDistrictsPerWarehouse());
+ INVARIANT(k->c_id >= 1 && static_cast<size_t>(k->c_id) <= NumCustomersPerDistrict());
+#ifdef DISABLE_FIELD_SELECTION
+ INVARIANT(v->c_credit == "BC" || v->c_credit == "GC");
+ INVARIANT(v->c_middle == "OE");
+#endif
+ }
+
+ static inline ALWAYS_INLINE void
+ SanityCheckWarehouse(const warehouse::key *k, const warehouse::value *v)
+ {
+ INVARIANT(k->w_id >= 1 && static_cast<size_t>(k->w_id) <= NumWarehouses());
+#ifdef DISABLE_FIELD_SELECTION
+ INVARIANT(v->w_state.size() == 2);
+ INVARIANT(v->w_zip == "123456789");
+#endif
+ }
+
+ static inline ALWAYS_INLINE void
+ SanityCheckDistrict(const district::key *k, const district::value *v)
+ {
+ INVARIANT(k->d_w_id >= 1 && static_cast<size_t>(k->d_w_id) <= NumWarehouses());
+ INVARIANT(k->d_id >= 1 && static_cast<size_t>(k->d_id) <= NumDistrictsPerWarehouse());
+#ifdef DISABLE_FIELD_SELECTION
+ INVARIANT(v->d_next_o_id >= 3001);
+ INVARIANT(v->d_state.size() == 2);
+ INVARIANT(v->d_zip == "123456789");
+#endif
+ }
+
+ static inline ALWAYS_INLINE void
+ SanityCheckItem(const item::key *k, const item::value *v)
+ {
+ INVARIANT(k->i_id >= 1 && static_cast<size_t>(k->i_id) <= NumItems());
+#ifdef DISABLE_FIELD_SELECTION
+ INVARIANT(v->i_price >= 1.0 && v->i_price <= 100.0);
+#endif
+ }
+
+ static inline ALWAYS_INLINE void
+ SanityCheckStock(const stock::key *k, const stock::value *v)
+ {
+ INVARIANT(k->s_w_id >= 1 && static_cast<size_t>(k->s_w_id) <= NumWarehouses());
+ INVARIANT(k->s_i_id >= 1 && static_cast<size_t>(k->s_i_id) <= NumItems());
+ }
+
+ static inline ALWAYS_INLINE void
+ SanityCheckNewOrder(const new_order::key *k, const new_order::value *v)
+ {
+ INVARIANT(k->no_w_id >= 1 && static_cast<size_t>(k->no_w_id) <= NumWarehouses());
+ INVARIANT(k->no_d_id >= 1 && static_cast<size_t>(k->no_d_id) <= NumDistrictsPerWarehouse());
+ }
+
+ static inline ALWAYS_INLINE void
+ SanityCheckOOrder(const oorder::key *k, const oorder::value *v)
+ {
+ INVARIANT(k->o_w_id >= 1 && static_cast<size_t>(k->o_w_id) <= NumWarehouses());
+ INVARIANT(k->o_d_id >= 1 && static_cast<size_t>(k->o_d_id) <= NumDistrictsPerWarehouse());
+#ifdef DISABLE_FIELD_SELECTION
+ INVARIANT(v->o_c_id >= 1 && static_cast<size_t>(v->o_c_id) <= NumCustomersPerDistrict());
+ INVARIANT(v->o_carrier_id >= 0 && static_cast<size_t>(v->o_carrier_id) <= NumDistrictsPerWarehouse());
+ INVARIANT(v->o_ol_cnt >= 5 && v->o_ol_cnt <= 15);
+#endif
+ }
+
+ static inline ALWAYS_INLINE void
+ SanityCheckOrderLine(const order_line::key *k, const order_line::value *v)
+ {
+ INVARIANT(k->ol_w_id >= 1 && static_cast<size_t>(k->ol_w_id) <= NumWarehouses());
+ INVARIANT(k->ol_d_id >= 1 && static_cast<size_t>(k->ol_d_id) <= NumDistrictsPerWarehouse());
+ INVARIANT(k->ol_number >= 1 && k->ol_number <= 15);
+#ifdef DISABLE_FIELD_SELECTION
+ INVARIANT(v->ol_i_id >= 1 && static_cast<size_t>(v->ol_i_id) <= NumItems());
+#endif
+ }
+};
+
+static string NameTokens[] =
+ {
+ string("BAR"),
+ string("OUGHT"),
+ string("ABLE"),
+ string("PRI"),
+ string("PRES"),
+ string("ESE"),
+ string("ANTI"),
+ string("CALLY"),
+ string("ATION"),
+ string("EING"),
+ };
+
+class tpcc_worker_mixin {
+public:
+
+ // only TPCC loaders need to call this- workers are automatically
+ // pinned by their worker id (which corresponds to warehouse id
+ // in TPCC)
+ //
+ // pins the *calling* thread
+ static void
+ PinToWarehouseId(unsigned int wid)
+ {
+ const unsigned int partid = PartitionId(wid);
+ ALWAYS_ASSERT(partid < nthreads);
+ const unsigned int pinid = partid;
+ if (verbose)
+ cerr << "PinToWarehouseId(): coreid=" << coreid::core_id()
+ << " pinned to whse=" << wid << " (partid=" << partid << ")"
+ << endl;
+ rcu::s_instance.pin_current_thread(pinid);
+ rcu::s_instance.fault_region();
+ }
+
+ static inline uint32_t
+ GetCurrentTimeMillis()
+ {
+ //struct timeval tv;
+ //ALWAYS_ASSERT(gettimeofday(&tv, 0) == 0);
+ //return tv.tv_sec * 1000;
+
+ // XXX(stephentu): implement a scalable GetCurrentTimeMillis()
+ // for now, we just give each core an increasing number
+
+ static __thread uint32_t tl_hack = 0;
+ return tl_hack++;
+ }
+
+ // utils for generating random #s and strings
+
+ static inline ALWAYS_INLINE int
+ CheckBetweenInclusive(int v, int lower, int upper)
+ {
+ INVARIANT(v >= lower);
+ INVARIANT(v <= upper);
+ return v;
+ }
+
+ static inline ALWAYS_INLINE int
+ RandomNumber(fast_random &r, int min, int max)
+ {
+ return CheckBetweenInclusive((int) (r.next_uniform() * (max - min + 1) + min), min, max);
+ }
+
+ static inline ALWAYS_INLINE int
+ NonUniformRandom(fast_random &r, int A, int C, int min, int max)
+ {
+ return (((RandomNumber(r, 0, A) | RandomNumber(r, min, max)) + C) % (max - min + 1)) + min;
+ }
+
+ static inline ALWAYS_INLINE int
+ GetItemId(fast_random &r)
+ {
+ return CheckBetweenInclusive(
+ g_uniform_item_dist ?
+ RandomNumber(r, 1, NumItems()) :
+ NonUniformRandom(r, 8191, 7911, 1, NumItems()),
+ 1, NumItems());
+ }
+
+ static inline ALWAYS_INLINE int
+ GetCustomerId(fast_random &r)
+ {
+ return CheckBetweenInclusive(NonUniformRandom(r, 1023, 259, 1, NumCustomersPerDistrict()), 1, NumCustomersPerDistrict());
+ }
+
+ // pick a number between [start, end)
+ static inline ALWAYS_INLINE unsigned
+ PickWarehouseId(fast_random &r, unsigned start, unsigned end)
+ {
+ INVARIANT(start < end);
+ const unsigned diff = end - start;
+ if (diff == 1)
+ return start;
+ return (r.next() % diff) + start;
+ }
+ // all tokens are at most 5 chars long
+ static const size_t CustomerLastNameMaxSize = 5 * 3;
+
+ static inline size_t
+ GetCustomerLastName(uint8_t *buf, fast_random &r, int num)
+ {
+ const string &s0 = NameTokens[num / 100];
+ const string &s1 = NameTokens[(num / 10) % 10];
+ const string &s2 = NameTokens[num % 10];
+ uint8_t *const begin = buf;
+ const size_t s0_sz = s0.size();
+ const size_t s1_sz = s1.size();
+ const size_t s2_sz = s2.size();
+ NDB_MEMCPY(buf, s0.data(), s0_sz); buf += s0_sz;
+ NDB_MEMCPY(buf, s1.data(), s1_sz); buf += s1_sz;
+ NDB_MEMCPY(buf, s2.data(), s2_sz); buf += s2_sz;
+ return buf - begin;
+ }
+
+ static inline ALWAYS_INLINE size_t
+ GetCustomerLastName(char *buf, fast_random &r, int num)
+ {
+ return GetCustomerLastName((uint8_t *) buf, r, num);
+ }
+
+ static inline string
+ GetCustomerLastName(fast_random &r, int num)
+ {
+ string ret;
+ ret.resize(CustomerLastNameMaxSize);
+ ret.resize(GetCustomerLastName((uint8_t *) &ret[0], r, num));
+ return ret;
+ }
+
+ static inline ALWAYS_INLINE string
+ GetNonUniformCustomerLastNameLoad(fast_random &r)
+ {
+ return GetCustomerLastName(r, NonUniformRandom(r, 255, 157, 0, 999));
+ }
+
+ static inline ALWAYS_INLINE size_t
+ GetNonUniformCustomerLastNameRun(uint8_t *buf, fast_random &r)
+ {
+ return GetCustomerLastName(buf, r, NonUniformRandom(r, 255, 223, 0, 999));
+ }
+
+ static inline ALWAYS_INLINE size_t
+ GetNonUniformCustomerLastNameRun(char *buf, fast_random &r)
+ {
+ return GetNonUniformCustomerLastNameRun((uint8_t *) buf, r);
+ }
+
+ static inline ALWAYS_INLINE string
+ GetNonUniformCustomerLastNameRun(fast_random &r)
+ {
+ return GetCustomerLastName(r, NonUniformRandom(r, 255, 223, 0, 999));
+ }
+
+ // following oltpbench, we really generate strings of len - 1...
+ static inline string
+ RandomStr(fast_random &r, uint len)
+ {
+ // this is a property of the oltpbench implementation...
+ if (!len)
+ return "";
+
+ uint i = 0;
+ string buf(len - 1, 0);
+ while (i < (len - 1)) {
+ const char c = (char) r.next_char();
+ // XXX(stephentu): oltpbench uses java's Character.isLetter(), which
+ // is a less restrictive filter than isalnum()
+ if (!isalnum(c))
+ continue;
+ buf[i++] = c;
+ }
+ return buf;
+ }
+
+ // RandomNStr() actually produces a string of length len
+ static inline string
+ RandomNStr(fast_random &r, uint len)
+ {
+ const char base = '0';
+ string buf(len, 0);
+ for (uint i = 0; i < len; i++)
+ buf[i] = (char)(base + (r.next() % 10));
+ return buf;
+ }
+
+};
+
+STATIC_COUNTER_DECL(scopedperf::tsc_ctr, tpcc_txn, tpcc_txn_cg)
+
+template <typename Database, bool AllowReadOnlyScans>
+class tpcc_worker : public bench_worker, public tpcc_worker_mixin {
+public:
+ tpcc_worker(unsigned int worker_id,
+ unsigned long seed, Database *db,
+ const tpcc_tables<Database> &tables,
+ spin_barrier *barrier_a, spin_barrier *barrier_b,
+ uint warehouse_id_start, uint warehouse_id_end)
+ : bench_worker(worker_id, true, seed, db,
+ barrier_a, barrier_b),
+ tpcc_worker_mixin(),
+ tables(tables),
+ warehouse_id_start(warehouse_id_start),
+ warehouse_id_end(warehouse_id_end)
+ {
+ ALWAYS_ASSERT(g_disable_read_only_scans == !AllowReadOnlyScans);
+ INVARIANT(warehouse_id_start >= 1);
+ INVARIANT(warehouse_id_start <= NumWarehouses());
+ INVARIANT(warehouse_id_end > warehouse_id_start);
+ INVARIANT(warehouse_id_end <= (NumWarehouses() + 1));
+ NDB_MEMSET(&last_no_o_ids[0], 0, sizeof(last_no_o_ids));
+ if (verbose) {
+ cerr << "tpcc: worker id " << worker_id
+ << " => warehouses [" << warehouse_id_start
+ << ", " << warehouse_id_end << ")"
+ << endl;
+ }
+ }
+
+ // XXX(stephentu): tune this
+ static const size_t NMaxCustomerIdxScanElems = 512;
+
+ txn_result txn_new_order();
+
+ static txn_result
+ TxnNewOrder(bench_worker *w)
+ {
+ ANON_REGION("TxnNewOrder:", &tpcc_txn_cg);
+ return static_cast<tpcc_worker *>(w)->txn_new_order();
+ }
+
+ txn_result txn_delivery();
+
+ static txn_result
+ TxnDelivery(bench_worker *w)
+ {
+ ANON_REGION("TxnDelivery:", &tpcc_txn_cg);
+ return static_cast<tpcc_worker *>(w)->txn_delivery();
+ }
+
+ txn_result txn_payment();
+
+ static txn_result
+ TxnPayment(bench_worker *w)
+ {
+ ANON_REGION("TxnPayment:", &tpcc_txn_cg);
+ return static_cast<tpcc_worker *>(w)->txn_payment();
+ }
+
+ txn_result txn_order_status();
+
+ static txn_result
+ TxnOrderStatus(bench_worker *w)
+ {
+ ANON_REGION("TxnOrderStatus:", &tpcc_txn_cg);
+ return static_cast<tpcc_worker *>(w)->txn_order_status();
+ }
+
+ txn_result txn_stock_level();
+
+ static txn_result
+ TxnStockLevel(bench_worker *w)
+ {
+ ANON_REGION("TxnStockLevel:", &tpcc_txn_cg);
+ return static_cast<tpcc_worker *>(w)->txn_stock_level();
+ }
+
+ virtual workload_desc_vec
+ get_workload() const OVERRIDE
+ {
+ workload_desc_vec w;
+ // numbers from sigmod.csail.mit.edu:
+ //w.push_back(workload_desc("NewOrder", 1.0, TxnNewOrder)); // ~10k ops/sec
+ //w.push_back(workload_desc("Payment", 1.0, TxnPayment)); // ~32k ops/sec
+ //w.push_back(workload_desc("Delivery", 1.0, TxnDelivery)); // ~104k ops/sec
+ //w.push_back(workload_desc("OrderStatus", 1.0, TxnOrderStatus)); // ~33k ops/sec
+ //w.push_back(workload_desc("StockLevel", 1.0, TxnStockLevel)); // ~2k ops/sec
+ unsigned m = 0;
+ for (size_t i = 0; i < ARRAY_NELEMS(g_txn_workload_mix); i++)
+ m += g_txn_workload_mix[i];
+ ALWAYS_ASSERT(m == 100);
+ if (g_txn_workload_mix[0])
+ w.push_back(workload_desc("NewOrder", double(g_txn_workload_mix[0])/100.0, TxnNewOrder));
+ if (g_txn_workload_mix[1])
+ w.push_back(workload_desc("Payment", double(g_txn_workload_mix[1])/100.0, TxnPayment));
+ if (g_txn_workload_mix[2])
+ w.push_back(workload_desc("Delivery", double(g_txn_workload_mix[2])/100.0, TxnDelivery));
+ if (g_txn_workload_mix[3])
+ w.push_back(workload_desc("OrderStatus", double(g_txn_workload_mix[3])/100.0, TxnOrderStatus));
+ if (g_txn_workload_mix[4])
+ w.push_back(workload_desc("StockLevel", double(g_txn_workload_mix[4])/100.0, TxnStockLevel));
+ return w;
+ }
+
+protected:
+
+ virtual void
+ on_run_setup() OVERRIDE
+ {
+ if (!pin_cpus)
+ return;
+ const size_t a = worker_id % coreid::num_cpus_online();
+ const size_t b = a % nthreads;
+ rcu::s_instance.pin_current_thread(b);
+ rcu::s_instance.fault_region();
+ }
+
+private:
+ tpcc_tables<Database> tables;
+ const uint warehouse_id_start;
+ const uint warehouse_id_end;
+ int32_t last_no_o_ids[10]; // XXX(stephentu): hack
+};
+
+template <typename Database>
+class tpcc_warehouse_loader : public typed_bench_loader<Database>,
+ public tpcc_worker_mixin {
+public:
+ tpcc_warehouse_loader(unsigned long seed,
+ Database *db,
+ const tpcc_tables<Database> &tables)
+ : typed_bench_loader<Database>(seed, db),
+ tpcc_worker_mixin(),
+ tables(tables)
+ {}
+
+protected:
+ virtual void
+ load()
+ {
+ uint64_t warehouse_total_sz = 0, n_warehouses = 0;
+ try {
+ vector<warehouse::value> warehouses;
+ {
+ scoped_str_arena s_arena(this->arena);
+ typename Database::template
+ TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
+ for (uint i = 1; i <= NumWarehouses(); i++) {
+ const warehouse::key k(i);
+
+ const string w_name = RandomStr(this->r, RandomNumber(this->r, 6, 10));
+ const string w_street_1 = RandomStr(this->r, RandomNumber(this->r, 10, 20));
+ const string w_street_2 = RandomStr(this->r, RandomNumber(this->r, 10, 20));
+ const string w_city = RandomStr(this->r, RandomNumber(this->r, 10, 20));
+ const string w_state = RandomStr(this->r, 3);
+ const string w_zip = "123456789";
+
+ warehouse::value v;
+ v.w_ytd = 300000;
+ v.w_tax = (float) RandomNumber(this->r, 0, 2000) / 10000.0;
+ v.w_name.assign(w_name);
+ v.w_street_1.assign(w_street_1);
+ v.w_street_2.assign(w_street_2);
+ v.w_city.assign(w_city);
+ v.w_state.assign(w_state);
+ v.w_zip.assign(w_zip);
+
+ checker::SanityCheckWarehouse(&k, &v);
+ const size_t sz = Size(v);
+ warehouse_total_sz += sz;
+ n_warehouses++;
+ tables.tbl_warehouse(i)->insert(txn, k, v);
+ warehouses.push_back(v);
+ }
+ ALWAYS_ASSERT(txn.commit());
+ }
+ {
+ scoped_str_arena s_arena(this->arena);
+ typename Database::template
+ TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
+ for (uint i = 1; i <= NumWarehouses(); i++) {
+ const warehouse::key k(i);
+ warehouse::value v;
+ ALWAYS_ASSERT(tables.tbl_warehouse(i)->search(txn, k, v));
+ ALWAYS_ASSERT(warehouses[i - 1] == v);
+ checker::SanityCheckWarehouse(&k, &v);
+ }
+ ALWAYS_ASSERT(txn.commit());
+ }
+ } catch (typename Database::abort_exception_type &e) {
+ // shouldn't abort on loading!
+ ALWAYS_ASSERT(false);
+ }
+ if (verbose) {
+ cerr << "[INFO] finished loading warehouse" << endl;
+ cerr << "[INFO] * average warehouse record length: "
+ << (double(warehouse_total_sz)/double(n_warehouses)) << " bytes" << endl;
+ }
+ }
+
+ tpcc_tables<Database> tables;
+};
+
+template <typename Database>
+class tpcc_item_loader : public typed_bench_loader<Database>,
+ public tpcc_worker_mixin {
+public:
+ tpcc_item_loader(unsigned long seed,
+ Database *db,
+ const tpcc_tables<Database> &tables)
+ : typed_bench_loader<Database>(seed, db),
+ tpcc_worker_mixin(),
+ tables(tables)
+ {}
+
+protected:
+ virtual void
+ load()
+ {
+ const ssize_t bsize = this->typed_db()->txn_max_batch_size();
+ auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
+ uint64_t total_sz = 0;
+ try {
+ for (uint i = 1; i <= NumItems(); i++) {
+ // items don't "belong" to a certain warehouse, so no pinning
+ const item::key k(i);
+
+ item::value v;
+ const string i_name = RandomStr(this->r, RandomNumber(this->r, 14, 24));
+ v.i_name.assign(i_name);
+ v.i_price = (float) RandomNumber(this->r, 100, 10000) / 100.0;
+ const int len = RandomNumber(this->r, 26, 50);
+ if (RandomNumber(this->r, 1, 100) > 10) {
+ const string i_data = RandomStr(this->r, len);
+ v.i_data.assign(i_data);
+ } else {
+ const int startOriginal = RandomNumber(this->r, 2, (len - 8));
+ const string i_data = RandomStr(this->r, startOriginal + 1) + "ORIGINAL" + RandomStr(this->r, len - startOriginal - 7);
+ v.i_data.assign(i_data);
+ }
+ v.i_im_id = RandomNumber(this->r, 1, 10000);
+
+ checker::SanityCheckItem(&k, &v);
+ const size_t sz = Size(v);
+ total_sz += sz;
+ // XXX: replicate items table across all NUMA nodes
+ tables.tbl_item(1)->insert(*txn, k, v); // this table is shared, so any partition is OK
+
+ if (bsize != -1 && !(i % bsize)) {
+ ALWAYS_ASSERT(txn->commit());
+ txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
+ this->arena.reset();
+ }
+ }
+ ALWAYS_ASSERT(txn->commit());
+ } catch (typename Database::abort_exception_type &e) {
+ // shouldn't abort on loading!
+ ALWAYS_ASSERT(false);
+ }
+ if (verbose) {
+ cerr << "[INFO] finished loading item" << endl;
+ cerr << "[INFO] * average item record length: "
+ << (double(total_sz)/double(NumItems())) << " bytes" << endl;
+ }
+ }
+
+ tpcc_tables<Database> tables;
+};
+
+template <typename Database>
+class tpcc_stock_loader : public typed_bench_loader<Database>,
+ public tpcc_worker_mixin {
+public:
+ tpcc_stock_loader(unsigned long seed,
+ Database *db,
+ const tpcc_tables<Database> &tables,
+ ssize_t warehouse_id)
+ : typed_bench_loader<Database>(seed, db),
+ tpcc_worker_mixin(),
+ tables(tables),
+ warehouse_id(warehouse_id)
+ {
+ ALWAYS_ASSERT(warehouse_id == -1 ||
+ (warehouse_id >= 1 &&
+ static_cast<size_t>(warehouse_id) <= NumWarehouses()));
+ }
+
+protected:
+ virtual void
+ load()
+ {
+ uint64_t stock_total_sz = 0, n_stocks = 0;
+ const uint w_start = (warehouse_id == -1) ?
+ 1 : static_cast<uint>(warehouse_id);
+ const uint w_end = (warehouse_id == -1) ?
+ NumWarehouses() : static_cast<uint>(warehouse_id);
+
+ for (uint w = w_start; w <= w_end; w++) {
+ const size_t batchsize =
+ (this->typed_db()->txn_max_batch_size() == -1) ?
+ NumItems() : this->typed_db()->txn_max_batch_size();
+ const size_t nbatches = (batchsize > NumItems()) ? 1 : (NumItems() / batchsize);
+
+ if (pin_cpus)
+ PinToWarehouseId(w);
+
+ for (uint b = 0; b < nbatches;) {
+ scoped_str_arena s_arena(this->arena);
+ auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
+ try {
+ const size_t iend = std::min((b + 1) * batchsize + 1, NumItems());
+ for (uint i = (b * batchsize + 1); i <= iend; i++) {
+ const stock::key k(w, i);
+ const stock_data::key k_data(w, i);
+
+ stock::value v;
+ v.s_quantity = RandomNumber(this->r, 10, 100);
+ v.s_ytd = 0;
+ v.s_order_cnt = 0;
+ v.s_remote_cnt = 0;
+
+ stock_data::value v_data;
+ const int len = RandomNumber(this->r, 26, 50);
+ if (RandomNumber(this->r, 1, 100) > 10) {
+ const string s_data = RandomStr(this->r, len);
+ v_data.s_data.assign(s_data);
+ } else {
+ const int startOriginal = RandomNumber(this->r, 2, (len - 8));
+ const string s_data = RandomStr(this->r, startOriginal + 1) +
+ "ORIGINAL" + RandomStr(this->r, len - startOriginal - 7);
+ v_data.s_data.assign(s_data);
+ }
+ v_data.s_dist_01.assign(RandomStr(this->r, 24));
+ v_data.s_dist_02.assign(RandomStr(this->r, 24));
+ v_data.s_dist_03.assign(RandomStr(this->r, 24));
+ v_data.s_dist_04.assign(RandomStr(this->r, 24));
+ v_data.s_dist_05.assign(RandomStr(this->r, 24));
+ v_data.s_dist_06.assign(RandomStr(this->r, 24));
+ v_data.s_dist_07.assign(RandomStr(this->r, 24));
+ v_data.s_dist_08.assign(RandomStr(this->r, 24));
+ v_data.s_dist_09.assign(RandomStr(this->r, 24));
+ v_data.s_dist_10.assign(RandomStr(this->r, 24));
+
+ checker::SanityCheckStock(&k, &v);
+ const size_t sz = Size(v);
+ stock_total_sz += sz;
+ n_stocks++;
+ tables.tbl_stock(w)->insert(*txn, k, v);
+ tables.tbl_stock_data(w)->insert(*txn, k_data, v_data);
+ }
+ if (txn->commit()) {
+ b++;
+ } else {
+ if (verbose)
+ cerr << "[WARNING] stock loader loading abort" << endl;
+ }
+ } catch (typename Database::abort_exception_type &e) {
+ txn->abort();
+ ALWAYS_ASSERT(warehouse_id != -1);
+ if (verbose)
+ cerr << "[WARNING] stock loader loading abort" << endl;
+ }
+ }
+ }
+
+ if (verbose) {
+ if (warehouse_id == -1) {
+ cerr << "[INFO] finished loading stock" << endl;
+ cerr << "[INFO] * average stock record length: "
+ << (double(stock_total_sz)/double(n_stocks)) << " bytes" << endl;
+ } else {
+ cerr << "[INFO] finished loading stock (w=" << warehouse_id << ")" << endl;
+ }
+ }
+ }
+
+private:
+ tpcc_tables<Database> tables;
+ ssize_t warehouse_id;
+};
+
+template <typename Database>
+class tpcc_district_loader : public typed_bench_loader<Database>,
+ public tpcc_worker_mixin {
+public:
+ tpcc_district_loader(unsigned long seed,
+ Database *db,
+ const tpcc_tables<Database> &tables)
+ : typed_bench_loader<Database>(seed, db),
+ tpcc_worker_mixin(),
+ tables(tables)
+ {}
+
+protected:
+ virtual void
+ load()
+ {
+ const ssize_t bsize = this->typed_db()->txn_max_batch_size();
+ auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
+ uint64_t district_total_sz = 0, n_districts = 0;
+ try {
+ uint cnt = 0;
+ for (uint w = 1; w <= NumWarehouses(); w++) {
+ if (pin_cpus)
+ PinToWarehouseId(w);
+ for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++, cnt++) {
+ const district::key k(w, d);
+
+ district::value v;
+ v.d_ytd = 30000;
+ v.d_tax = (float) (RandomNumber(this->r, 0, 2000) / 10000.0);
+ v.d_next_o_id = 3001;
+ v.d_name.assign(RandomStr(this->r, RandomNumber(this->r, 6, 10)));
+ v.d_street_1.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
+ v.d_street_2.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
+ v.d_city.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
+ v.d_state.assign(RandomStr(this->r, 3));
+ v.d_zip.assign("123456789");
+
+ checker::SanityCheckDistrict(&k, &v);
+ const size_t sz = Size(v);
+ district_total_sz += sz;
+ n_districts++;
+ tables.tbl_district(w)->insert(*txn, k, v);
+
+ if (bsize != -1 && !((cnt + 1) % bsize)) {
+ ALWAYS_ASSERT(txn->commit());
+ txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
+ this->arena.reset();
+ }
+ }
+ }
+ ALWAYS_ASSERT(txn->commit());
+ } catch (typename Database::abort_exception_type &e) {
+ // shouldn't abort on loading!
+ ALWAYS_ASSERT(false);
+ }
+ if (verbose) {
+ cerr << "[INFO] finished loading district" << endl;
+ cerr << "[INFO] * average district record length: "
+ << (double(district_total_sz)/double(n_districts)) << " bytes" << endl;
+ }
+ }
+
+ tpcc_tables<Database> tables;
+};
+
+template <typename Database>
+class tpcc_customer_loader : public typed_bench_loader<Database>,
+ public tpcc_worker_mixin {
+public:
+ tpcc_customer_loader(unsigned long seed,
+ Database *db,
+ const tpcc_tables<Database> &tables,
+ ssize_t warehouse_id)
+ : typed_bench_loader<Database>(seed, db),
+ tpcc_worker_mixin(),
+ tables(tables),
+ warehouse_id(warehouse_id)
+ {
+ ALWAYS_ASSERT(warehouse_id == -1 ||
+ (warehouse_id >= 1 &&
+ static_cast<size_t>(warehouse_id) <= NumWarehouses()));
+ }
+
+protected:
+ virtual void
+ load()
+ {
+ const uint w_start = (warehouse_id == -1) ?
+ 1 : static_cast<uint>(warehouse_id);
+ const uint w_end = (warehouse_id == -1) ?
+ NumWarehouses() : static_cast<uint>(warehouse_id);
+ const size_t batchsize =
+ (this->typed_db()->txn_max_batch_size() == -1) ?
+ NumCustomersPerDistrict() : this->typed_db()->txn_max_batch_size();
+ const size_t nbatches =
+ (batchsize > NumCustomersPerDistrict()) ?
+ 1 : (NumCustomersPerDistrict() / batchsize);
+ cerr << "num batches: " << nbatches << endl;
+
+ uint64_t total_sz = 0;
+
+ for (uint w = w_start; w <= w_end; w++) {
+ if (pin_cpus)
+ PinToWarehouseId(w);
+ for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
+ for (uint batch = 0; batch < nbatches;) {
+ scoped_str_arena s_arena(this->arena);
+ typename Database::template TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
+ const size_t cstart = batch * batchsize;
+ const size_t cend = std::min((batch + 1) * batchsize, NumCustomersPerDistrict());
+ try {
+ for (uint cidx0 = cstart; cidx0 < cend; cidx0++) {
+ const uint c = cidx0 + 1;
+ const customer::key k(w, d, c);
+
+ customer::value v;
+ v.c_discount = (float) (RandomNumber(this->r, 1, 5000) / 10000.0);
+ if (RandomNumber(this->r, 1, 100) <= 10)
+ v.c_credit.assign("BC");
+ else
+ v.c_credit.assign("GC");
+
+ if (c <= 1000)
+ v.c_last.assign(GetCustomerLastName(this->r, c - 1));
+ else
+ v.c_last.assign(GetNonUniformCustomerLastNameLoad(this->r));
+
+ v.c_first.assign(RandomStr(this->r, RandomNumber(this->r, 8, 16)));
+ v.c_credit_lim = 50000;
+
+ v.c_balance = -10;
+ v.c_ytd_payment = 10;
+ v.c_payment_cnt = 1;
+ v.c_delivery_cnt = 0;
+
+ v.c_street_1.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
+ v.c_street_2.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
+ v.c_city.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
+ v.c_state.assign(RandomStr(this->r, 3));
+ v.c_zip.assign(RandomNStr(this->r, 4) + "11111");
+ v.c_phone.assign(RandomNStr(this->r, 16));
+ v.c_since = GetCurrentTimeMillis();
+ v.c_middle.assign("OE");
+ v.c_data.assign(RandomStr(this->r, RandomNumber(this->r, 300, 500)));
+
+ checker::SanityCheckCustomer(&k, &v);
+ const size_t sz = Size(v);
+ total_sz += sz;
+ tables.tbl_customer(w)->insert(txn, k, v);
+
+ // customer name index
+ const customer_name_idx::key k_idx(k.c_w_id, k.c_d_id, v.c_last.str(true), v.c_first.str(true));
+ const customer_name_idx::value v_idx(k.c_id);
+
+ // index structure is:
+ // (c_w_id, c_d_id, c_last, c_first) -> (c_id)
+
+ tables.tbl_customer_name_idx(w)->insert(txn, k_idx, v_idx);
+
+ history::key k_hist;
+ k_hist.h_c_id = c;
+ k_hist.h_c_d_id = d;
+ k_hist.h_c_w_id = w;
+ k_hist.h_d_id = d;
+ k_hist.h_w_id = w;
+ k_hist.h_date = GetCurrentTimeMillis();
+
+ history::value v_hist;
+ v_hist.h_amount = 10;
+ v_hist.h_data.assign(RandomStr(this->r, RandomNumber(this->r, 10, 24)));
+
+ tables.tbl_history(w)->insert(txn, k_hist, v_hist);
+ }
+ if (txn.commit()) {
+ batch++;
+ } else {
+ txn.abort();
+ if (verbose)
+ cerr << "[WARNING] customer loader loading abort" << endl;
+ }
+ } catch (typename Database::abort_exception_type &e) {
+ txn.abort();
+ if (verbose)
+ cerr << "[WARNING] customer loader loading abort" << endl;
+ }
+ }
+ }
+ }
+
+ if (verbose) {
+ if (warehouse_id == -1) {
+ cerr << "[INFO] finished loading customer" << endl;
+ cerr << "[INFO] * average customer record length: "
+ << (double(total_sz)/double(NumWarehouses()*NumDistrictsPerWarehouse()*NumCustomersPerDistrict()))
+ << " bytes " << endl;
+ } else {
+ cerr << "[INFO] finished loading customer (w=" << warehouse_id << ")" << endl;
+ }
+ }
+ }
+
+private:
+ tpcc_tables<Database> tables;
+ ssize_t warehouse_id;
+};
+
+template <typename Database>
+class tpcc_order_loader : public typed_bench_loader<Database>,
+ public tpcc_worker_mixin {
+public:
+ tpcc_order_loader(unsigned long seed,
+ Database *db,
+ const tpcc_tables<Database> &tables,
+ ssize_t warehouse_id)
+ : typed_bench_loader<Database>(seed, db),
+ tpcc_worker_mixin(),
+ tables(tables),
+ warehouse_id(warehouse_id)
+ {
+ ALWAYS_ASSERT(warehouse_id == -1 ||
+ (warehouse_id >= 1 &&
+ static_cast<size_t>(warehouse_id) <= NumWarehouses()));
+ }
+
+protected:
+ virtual void
+ load()
+ {
+ uint64_t order_line_total_sz = 0, n_order_lines = 0;
+ uint64_t oorder_total_sz = 0, n_oorders = 0;
+ uint64_t new_order_total_sz = 0, n_new_orders = 0;
+
+ const uint w_start = (warehouse_id == -1) ?
+ 1 : static_cast<uint>(warehouse_id);
+ const uint w_end = (warehouse_id == -1) ?
+ NumWarehouses() : static_cast<uint>(warehouse_id);
+
+ for (uint w = w_start; w <= w_end; w++) {
+ if (pin_cpus)
+ PinToWarehouseId(w);
+ for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
+ set<uint> c_ids_s;
+ vector<uint> c_ids;
+ while (c_ids.size() != NumCustomersPerDistrict()) {
+ const auto x = (this->r.next() % NumCustomersPerDistrict()) + 1;
+ if (c_ids_s.count(x))
+ continue;
+ c_ids_s.insert(x);
+ c_ids.emplace_back(x);
+ }
+ for (uint c = 1; c <= NumCustomersPerDistrict();) {
+ scoped_str_arena s_arena(this->arena);
+ typename Database::template TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
+ try {
+ const oorder::key k_oo(w, d, c);
+
+ oorder::value v_oo;
+ v_oo.o_c_id = c_ids[c - 1];
+ if (k_oo.o_id < 2101)
+ v_oo.o_carrier_id = RandomNumber(this->r, 1, 10);
+ else
+ v_oo.o_carrier_id = 0;
+ v_oo.o_ol_cnt = RandomNumber(this->r, 5, 15);
+ v_oo.o_all_local = 1;
+ v_oo.o_entry_d = GetCurrentTimeMillis();
+
+ checker::SanityCheckOOrder(&k_oo, &v_oo);
+ const size_t sz = Size(v_oo);
+ oorder_total_sz += sz;
+ n_oorders++;
+ tables.tbl_oorder(w)->insert(txn, k_oo, v_oo);
+
+ const oorder_c_id_idx::key k_oo_idx(k_oo.o_w_id, k_oo.o_d_id, v_oo.o_c_id, k_oo.o_id);
+ const oorder_c_id_idx::value v_oo_idx(0);
+
+ tables.tbl_oorder_c_id_idx(w)->insert(txn, k_oo_idx, v_oo_idx);
+
+ if (c >= 2101) {
+ const new_order::key k_no(w, d, c);
+ const new_order::value v_no;
+
+ checker::SanityCheckNewOrder(&k_no, &v_no);
+ const size_t sz = Size(v_no);
+ new_order_total_sz += sz;
+ n_new_orders++;
+ tables.tbl_new_order(w)->insert(txn, k_no, v_no);
+ }
+
+ for (uint l = 1; l <= uint(v_oo.o_ol_cnt); l++) {
+ const order_line::key k_ol(w, d, c, l);
+
+ order_line::value v_ol;
+ v_ol.ol_i_id = RandomNumber(this->r, 1, 100000);
+ if (k_ol.ol_o_id < 2101) {
+ v_ol.ol_delivery_d = v_oo.o_entry_d;
+ v_ol.ol_amount = 0;
+ } else {
+ v_ol.ol_delivery_d = 0;
+ // random within [0.01 .. 9,999.99]
+ v_ol.ol_amount = (float) (RandomNumber(this->r, 1, 999999) / 100.0);
+ }
+
+ v_ol.ol_supply_w_id = k_ol.ol_w_id;
+ v_ol.ol_quantity = 5;
+ // v_ol.ol_dist_info comes from stock_data(ol_supply_w_id, ol_o_id)
+ //v_ol.ol_dist_info = RandomStr(this->r, 24);
+
+ checker::SanityCheckOrderLine(&k_ol, &v_ol);
+ const size_t sz = Size(v_ol);
+ order_line_total_sz += sz;
+ n_order_lines++;
+ tables.tbl_order_line(w)->insert(txn, k_ol, v_ol);
+ }
+ if (txn.commit()) {
+ c++;
+ } else {
+ txn.abort();
+ ALWAYS_ASSERT(warehouse_id != -1);
+ if (verbose)
+ cerr << "[WARNING] order loader loading abort" << endl;
+ }
+ } catch (typename Database::abort_exception_type &e) {
+ txn.abort();
+ ALWAYS_ASSERT(warehouse_id != -1);
+ if (verbose)
+ cerr << "[WARNING] order loader loading abort" << endl;
+ }
+ }
+ }
+ }
+
+ if (verbose) {
+ if (warehouse_id == -1) {
+ cerr << "[INFO] finished loading order" << endl;
+ cerr << "[INFO] * average order_line record length: "
+ << (double(order_line_total_sz)/double(n_order_lines)) << " bytes" << endl;
+ cerr << "[INFO] * average oorder record length: "
+ << (double(oorder_total_sz)/double(n_oorders)) << " bytes" << endl;
+ cerr << "[INFO] * average new_order record length: "
+ << (double(new_order_total_sz)/double(n_new_orders)) << " bytes" << endl;
+ } else {
+ cerr << "[INFO] finished loading order (w=" << warehouse_id << ")" << endl;
+ }
+ }
+ }
+
+private:
+ tpcc_tables<Database> tables;
+ ssize_t warehouse_id;
+};
+
+static event_counter evt_tpcc_cross_partition_new_order_txns("tpcc_cross_partition_new_order_txns");
+static event_counter evt_tpcc_cross_partition_payment_txns("tpcc_cross_partition_payment_txns");
+
+template <typename Database, bool AllowReadOnlyScans>
+typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
+tpcc_worker<Database, AllowReadOnlyScans>::txn_new_order()
+{
+ const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
+ const uint districtID = RandomNumber(this->r, 1, 10);
+ const uint customerID = GetCustomerId(r);
+ const uint numItems = RandomNumber(this->r, 5, 15);
+ uint itemIDs[15], supplierWarehouseIDs[15], orderQuantities[15];
+ bool allLocal = true;
+ for (uint i = 0; i < numItems; i++) {
+ itemIDs[i] = GetItemId(r);
+ if (likely(g_disable_xpartition_txn ||
+ NumWarehouses() == 1 ||
+ RandomNumber(this->r, 1, 100) > g_new_order_remote_item_pct)) {
+ supplierWarehouseIDs[i] = warehouse_id;
+ } else {
+ do {
+ supplierWarehouseIDs[i] = RandomNumber(this->r, 1, NumWarehouses());
+ } while (supplierWarehouseIDs[i] == warehouse_id);
+ allLocal = false;
+ }
+ orderQuantities[i] = RandomNumber(this->r, 1, 10);
+ }
+ INVARIANT(!g_disable_xpartition_txn || allLocal);
+ if (!allLocal)
+ ++evt_tpcc_cross_partition_new_order_txns;
+
+ // XXX(stephentu): implement rollback
+ //
+ // worst case txn profile:
+ // 1 customer get
+ // 1 warehouse get
+ // 1 district get
+ // 1 new_order insert
+ // 1 district put
+ // 1 oorder insert
+ // 1 oorder_cid_idx insert
+ // 15 times:
+ // 1 item get
+ // 1 stock get
+ // 1 stock put
+ // 1 order_line insert
+ //
+ // output from txn counters:
+ // max_absent_range_set_size : 0
+ // max_absent_set_size : 0
+ // max_node_scan_size : 0
+ // max_read_set_size : 15
+ // max_write_set_size : 15
+ // num_txn_contexts : 9
+ typename Database::template TransactionType<abstract_db::HINT_TPCC_NEW_ORDER>::type txn(txn_flags, this->arena);
+ scoped_str_arena s_arena(this->arena);
+ scoped_multilock<spinlock> mlock;
+ if (g_enable_partition_locks) {
+ if (allLocal) {
+ mlock.enq(LockForPartition(warehouse_id));
+ } else {
+ small_unordered_map<unsigned int, bool, 64> lockset;
+ mlock.enq(LockForPartition(warehouse_id));
+ lockset[PartitionId(warehouse_id)] = 1;
+ for (uint i = 0; i < numItems; i++) {
+ if (lockset.find(PartitionId(supplierWarehouseIDs[i])) == lockset.end()) {
+ mlock.enq(LockForPartition(supplierWarehouseIDs[i]));
+ lockset[PartitionId(supplierWarehouseIDs[i])] = 1;
+ }
+ }
+ }
+ mlock.multilock();
+ }
+ try {
+ ssize_t ret = 0;
+ const customer::key k_c(warehouse_id, districtID, customerID);
+ customer::value v_c;
+ ALWAYS_ASSERT(
+ tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c,
+ GUARDED_FIELDS(
+ customer::value::c_discount_field,
+ customer::value::c_last_field,
+ customer::value::c_credit_field)));
+ checker::SanityCheckCustomer(&k_c, &v_c);
+
+ const warehouse::key k_w(warehouse_id);
+ warehouse::value v_w;
+ ALWAYS_ASSERT(
+ tables.tbl_warehouse(warehouse_id)->search(txn, k_w, v_w,
+ GUARDED_FIELDS(warehouse::value::w_tax_field)));
+ checker::SanityCheckWarehouse(&k_w, &v_w);
+
+ const district::key k_d(warehouse_id, districtID);
+ district::value v_d;
+ ALWAYS_ASSERT(
+ tables.tbl_district(warehouse_id)->search(txn, k_d, v_d,
+ GUARDED_FIELDS(
+ district::value::d_next_o_id_field,
+ district::value::d_tax_field)));
+ checker::SanityCheckDistrict(&k_d, &v_d);
+
+ const uint64_t my_next_o_id = g_new_order_fast_id_gen ?
+ FastNewOrderIdGen(warehouse_id, districtID) : v_d.d_next_o_id;
+
+ const new_order::key k_no(warehouse_id, districtID, my_next_o_id);
+ const new_order::value v_no;
+ const size_t new_order_sz = Size(v_no);
+ tables.tbl_new_order(warehouse_id)->insert(txn, k_no, v_no);
+ ret += new_order_sz;
+
+ if (!g_new_order_fast_id_gen) {
+ v_d.d_next_o_id++;
+ tables.tbl_district(warehouse_id)->put(txn, k_d, v_d,
+ GUARDED_FIELDS(district::value::d_next_o_id_field));
+ }
+
+ const oorder::key k_oo(warehouse_id, districtID, k_no.no_o_id);
+ oorder::value v_oo;
+ v_oo.o_c_id = int32_t(customerID);
+ v_oo.o_carrier_id = 0; // seems to be ignored
+ v_oo.o_ol_cnt = int8_t(numItems);
+ v_oo.o_all_local = allLocal;
+ v_oo.o_entry_d = GetCurrentTimeMillis();
+
+ const size_t oorder_sz = Size(v_oo);
+ tables.tbl_oorder(warehouse_id)->insert(txn, k_oo, v_oo);
+ ret += oorder_sz;
+
+ const oorder_c_id_idx::key k_oo_idx(warehouse_id, districtID, customerID, k_no.no_o_id);
+ const oorder_c_id_idx::value v_oo_idx(0);
+
+ tables.tbl_oorder_c_id_idx(warehouse_id)->insert(txn, k_oo_idx, v_oo_idx);
+
+ for (uint ol_number = 1; ol_number <= numItems; ol_number++) {
+ const uint ol_supply_w_id = supplierWarehouseIDs[ol_number - 1];
+ const uint ol_i_id = itemIDs[ol_number - 1];
+ const uint ol_quantity = orderQuantities[ol_number - 1];
+
+ const item::key k_i(ol_i_id);
+ item::value v_i;
+ ALWAYS_ASSERT(tables.tbl_item(1)->search(txn, k_i, v_i,
+ GUARDED_FIELDS(
+ item::value::i_price_field,
+ item::value::i_name_field,
+ item::value::i_data_field)));
+ checker::SanityCheckItem(&k_i, &v_i);
+
+ const stock::key k_s(ol_supply_w_id, ol_i_id);
+ stock::value v_s;
+ ALWAYS_ASSERT(tables.tbl_stock(ol_supply_w_id)->search(txn, k_s, v_s));
+ checker::SanityCheckStock(&k_s, &v_s);
+
+ stock::value v_s_new(v_s);
+ if (v_s_new.s_quantity - ol_quantity >= 10)
+ v_s_new.s_quantity -= ol_quantity;
+ else
+ v_s_new.s_quantity += -int32_t(ol_quantity) + 91;
+ v_s_new.s_ytd += ol_quantity;
+ v_s_new.s_remote_cnt += (ol_supply_w_id == warehouse_id) ? 0 : 1;
+
+ tables.tbl_stock(ol_supply_w_id)->put(txn, k_s, v_s_new);
+
+ const order_line::key k_ol(warehouse_id, districtID, k_no.no_o_id, ol_number);
+ order_line::value v_ol;
+ v_ol.ol_i_id = int32_t(ol_i_id);
+ v_ol.ol_delivery_d = 0; // not delivered yet
+ v_ol.ol_amount = float(ol_quantity) * v_i.i_price;
+ v_ol.ol_supply_w_id = int32_t(ol_supply_w_id);
+ v_ol.ol_quantity = int8_t(ol_quantity);
+
+ const size_t order_line_sz = Size(v_ol);
+ tables.tbl_order_line(warehouse_id)->insert(txn, k_ol, v_ol);
+ ret += order_line_sz;
+ }
+
+ //measure_txn_counters(txn, "txn_new_order");
+ if (likely(txn.commit()))
+ return txn_result(true, ret);
+ } catch (typename Database::abort_exception_type &e) {
+ txn.abort();
+ }
+ return txn_result(false, 0);
+}
+
+template <typename Database>
+class new_order_scan_callback : public
+ Database::template IndexType<schema<new_order>>::type::search_range_callback {
+public:
+ new_order_scan_callback() : invoked(false) {}
+ virtual bool
+ invoke(const new_order::key &key, const new_order::value &value) OVERRIDE
+ {
+ INVARIANT(!invoked);
+ k_no = key;
+ invoked = true;
+#ifdef CHECK_INVARIANTS
+ checker::SanityCheckNewOrder(&key, &value);
+#endif
+ return false;
+ }
+ inline const new_order::key &
+ get_key() const
+ {
+ return k_no;
+ }
+ inline bool was_invoked() const { return invoked; }
+private:
+ new_order::key k_no;
+ bool invoked;
+};
+
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, delivery_probe0_tod, delivery_probe0_cg)
+
+template <typename Database, bool AllowReadOnlyScans>
+typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
+tpcc_worker<Database, AllowReadOnlyScans>::txn_delivery()
+{
+ const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
+ const uint o_carrier_id = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
+ const uint32_t ts = GetCurrentTimeMillis();
+
+ // worst case txn profile:
+ // 10 times:
+ // 1 new_order scan node
+ // 1 oorder get
+ // 2 order_line scan nodes
+ // 15 order_line puts
+ // 1 new_order remove
+ // 1 oorder put
+ // 1 customer get
+ // 1 customer put
+ //
+ // output from counters:
+ // max_absent_range_set_size : 0
+ // max_absent_set_size : 0
+ // max_node_scan_size : 21
+ // max_read_set_size : 133
+ // max_write_set_size : 133
+ // num_txn_contexts : 4
+ typename Database::template TransactionType<abstract_db::HINT_TPCC_DELIVERY>::type txn(txn_flags, this->arena);
+ scoped_str_arena s_arena(this->arena);
+ scoped_lock_guard<spinlock> slock(
+ g_enable_partition_locks ? &LockForPartition(warehouse_id) : nullptr);
+ try {
+ ssize_t ret = 0;
+ for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
+ const new_order::key k_no_0(warehouse_id, d, last_no_o_ids[d - 1]);
+ const new_order::key k_no_1(warehouse_id, d, numeric_limits<int32_t>::max());
+ new_order_scan_callback<Database> new_order_c;
+ {
+ ANON_REGION("DeliverNewOrderScan:", &delivery_probe0_cg);
+ tables.tbl_new_order(warehouse_id)->search_range_call(
+ txn, k_no_0, &k_no_1, new_order_c);
+ }
+
+ const new_order::key &k_no = new_order_c.get_key();
+ if (unlikely(!new_order_c.was_invoked()))
+ continue;
+ last_no_o_ids[d - 1] = k_no.no_o_id + 1; // XXX: update last seen
+
+ const oorder::key k_oo(warehouse_id, d, k_no.no_o_id);
+ oorder::value v_oo;
+ if (unlikely(!tables.tbl_oorder(warehouse_id)->search(txn, k_oo, v_oo,
+ GUARDED_FIELDS(
+ oorder::value::o_c_id_field,
+ oorder::value::o_carrier_id_field)))) {
+ // even if we read the new order entry, there's no guarantee
+ // we will read the oorder entry: in this case the txn will abort,
+ // but we're simply bailing out early
+ txn.abort();
+ return txn_result(false, 0);
+ }
+ checker::SanityCheckOOrder(&k_oo, &v_oo);
+
+ // never more than 15 order_lines per order
+ static_limit_callback<typename Database::template IndexType<schema<order_line>>::type, 15, false> c;
+ const order_line::key k_oo_0(warehouse_id, d, k_no.no_o_id, 0);
+ const order_line::key k_oo_1(warehouse_id, d, k_no.no_o_id, numeric_limits<int32_t>::max());
+
+ // XXX(stephentu): mutable scans would help here
+ tables.tbl_order_line(warehouse_id)->search_range_call(
+ txn, k_oo_0, &k_oo_1, c, false,
+ GUARDED_FIELDS(
+ order_line::value::ol_amount_field,
+ order_line::value::ol_delivery_d_field));
+ float sum = 0.0;
+ for (size_t i = 0; i < c.size(); i++) {
+#if defined(CHECK_INVARIANTS) && defined(DISABLE_FIELD_SELECTION)
+ checker::SanityCheckOrderLine(&c.key(i), &c.value(i));
+#endif
+ sum += c.value(i).ol_amount;
+ c.value(i).ol_delivery_d = ts;
+ tables.tbl_order_line(warehouse_id)->put(txn, c.key(i), c.value(i),
+ GUARDED_FIELDS(order_line::value::ol_delivery_d_field));
+ }
+
+ // delete new order
+ tables.tbl_new_order(warehouse_id)->remove(txn, k_no);
+ ret -= 0 /*new_order_c.get_value_size()*/;
+
+ // update oorder
+ v_oo.o_carrier_id = o_carrier_id;
+ tables.tbl_oorder(warehouse_id)->put(txn, k_oo, v_oo,
+ GUARDED_FIELDS(oorder::value::o_carrier_id_field));
+
+ const uint c_id = v_oo.o_c_id;
+ const float ol_total = sum;
+
+ // update customer
+ const customer::key k_c(warehouse_id, d, c_id);
+ customer::value v_c;
+ ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c,
+ GUARDED_FIELDS(customer::value::c_balance_field)));
+ v_c.c_balance += ol_total;
+ tables.tbl_customer(warehouse_id)->put(txn, k_c, v_c,
+ GUARDED_FIELDS(customer::value::c_balance_field));
+ }
+ //measure_txn_counters(txn, "txn_delivery");
+ if (likely(txn.commit()))
+ return txn_result(true, ret);
+ } catch (typename Database::abort_exception_type &e) {
+ txn.abort();
+ }
+ return txn_result(false, 0);
+}
+
+static event_avg_counter evt_avg_cust_name_idx_scan_size("avg_cust_name_idx_scan_size");
+
+template <typename Database, bool AllowReadOnlyScans>
+typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
+tpcc_worker<Database, AllowReadOnlyScans>::txn_payment()
+{
+ const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
+ const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
+ uint customerDistrictID, customerWarehouseID;
+ if (likely(g_disable_xpartition_txn ||
+ NumWarehouses() == 1 ||
+ RandomNumber(this->r, 1, 100) <= 85)) {
+ customerDistrictID = districtID;
+ customerWarehouseID = warehouse_id;
+ } else {
+ customerDistrictID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
+ do {
+ customerWarehouseID = RandomNumber(this->r, 1, NumWarehouses());
+ } while (customerWarehouseID == warehouse_id);
+ }
+ const float paymentAmount = (float) (RandomNumber(this->r, 100, 500000) / 100.0);
+ const uint32_t ts = GetCurrentTimeMillis();
+ INVARIANT(!g_disable_xpartition_txn || customerWarehouseID == warehouse_id);
+
+ // output from txn counters:
+ // max_absent_range_set_size : 0
+ // max_absent_set_size : 0
+ // max_node_scan_size : 10
+ // max_read_set_size : 71
+ // max_write_set_size : 1
+ // num_txn_contexts : 5
+ typename Database::template TransactionType<abstract_db::HINT_TPCC_PAYMENT>::type txn(txn_flags, this->arena);
+ scoped_str_arena s_arena(this->arena);
+ scoped_multilock<spinlock> mlock;
+ if (g_enable_partition_locks) {
+ mlock.enq(LockForPartition(warehouse_id));
+ if (PartitionId(customerWarehouseID) != PartitionId(warehouse_id))
+ mlock.enq(LockForPartition(customerWarehouseID));
+ mlock.multilock();
+ }
+ if (customerWarehouseID != warehouse_id)
+ ++evt_tpcc_cross_partition_payment_txns;
+ try {
+ ssize_t ret = 0;
+
+ const warehouse::key k_w(warehouse_id);
+ warehouse::value v_w;
+ ALWAYS_ASSERT(
+ tables.tbl_warehouse(warehouse_id)->search(txn, k_w, v_w, GUARDED_FIELDS(
+ warehouse::value::w_ytd_field,
+ warehouse::value::w_street_1_field,
+ warehouse::value::w_street_1_field,
+ warehouse::value::w_city_field,
+ warehouse::value::w_state_field,
+ warehouse::value::w_zip_field,
+ warehouse::value::w_name_field)));
+ checker::SanityCheckWarehouse(&k_w, &v_w);
+
+ v_w.w_ytd += paymentAmount;
+ tables.tbl_warehouse(warehouse_id)->put(txn, k_w, v_w, GUARDED_FIELDS(warehouse::value::w_ytd_field));
+
+ const district::key k_d(warehouse_id, districtID);
+ district::value v_d;
+ ALWAYS_ASSERT(tables.tbl_district(warehouse_id)->search(txn, k_d, v_d, GUARDED_FIELDS(
+ district::value::d_ytd_field,
+ district::value::d_street_1_field,
+ district::value::d_street_1_field,
+ district::value::d_city_field,
+ district::value::d_state_field,
+ district::value::d_zip_field,
+ district::value::d_name_field)));
+ checker::SanityCheckDistrict(&k_d, &v_d);
+
+ v_d.d_ytd += paymentAmount;
+ tables.tbl_district(warehouse_id)->put(txn, k_d, v_d, GUARDED_FIELDS(district::value::d_ytd_field));
+
+ customer::key k_c;
+ customer::value v_c;
+ if (RandomNumber(this->r, 1, 100) <= 60) {
+ // cust by name
+ uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
+ static_assert(sizeof(lastname_buf) == 16, "XX");
+ NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
+ GetNonUniformCustomerLastNameRun(lastname_buf, r);
+
+ static const string zeros(16, 0);
+ static const string ones(16, 255);
+
+ customer_name_idx::key k_c_idx_0;
+ k_c_idx_0.c_w_id = customerWarehouseID;
+ k_c_idx_0.c_d_id = customerDistrictID;
+ k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
+ k_c_idx_0.c_first.assign(zeros);
+
+ customer_name_idx::key k_c_idx_1;
+ k_c_idx_1.c_w_id = customerWarehouseID;
+ k_c_idx_1.c_d_id = customerDistrictID;
+ k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
+ k_c_idx_1.c_first.assign(ones);
+
+ // probably a safe bet for now
+ bytes_static_limit_callback<
+ typename Database::template IndexType<schema<customer_name_idx>>::type,
+ NMaxCustomerIdxScanElems, true> c(s_arena.get());
+ tables.tbl_customer_name_idx(customerWarehouseID)->bytes_search_range_call(
+ txn, k_c_idx_0, &k_c_idx_1, c);
+ ALWAYS_ASSERT(c.size() > 0);
+ INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
+ int index = c.size() / 2;
+ if (c.size() % 2 == 0)
+ index--;
+ evt_avg_cust_name_idx_scan_size.offer(c.size());
+
+ customer_name_idx::value v_c_idx_temp;
+ const customer_name_idx::value *v_c_idx = Decode(c.value(index), v_c_idx_temp);
+
+ k_c.c_w_id = customerWarehouseID;
+ k_c.c_d_id = customerDistrictID;
+ k_c.c_id = v_c_idx->c_id;
+ ALWAYS_ASSERT(tables.tbl_customer(customerWarehouseID)->search(txn, k_c, v_c));
+ } else {
+ // cust by ID
+ const uint customerID = GetCustomerId(r);
+ k_c.c_w_id = customerWarehouseID;
+ k_c.c_d_id = customerDistrictID;
+ k_c.c_id = customerID;
+ ALWAYS_ASSERT(tables.tbl_customer(customerWarehouseID)->search(txn, k_c, v_c));
+ }
+ checker::SanityCheckCustomer(&k_c, &v_c);
+
+ v_c.c_balance -= paymentAmount;
+ v_c.c_ytd_payment += paymentAmount;
+ v_c.c_payment_cnt++;
+ if (strncmp(v_c.c_credit.data(), "BC", 2) == 0) {
+ char buf[501];
+ int n = snprintf(buf, sizeof(buf), "%d %d %d %d %d %f | %s",
+ k_c.c_id,
+ k_c.c_d_id,
+ k_c.c_w_id,
+ districtID,
+ warehouse_id,
+ paymentAmount,
+ v_c.c_data.c_str());
+ v_c.c_data.resize_junk(
+ min(static_cast<size_t>(n), v_c.c_data.max_size()));
+ NDB_MEMCPY((void *) v_c.c_data.data(), &buf[0], v_c.c_data.size());
+ }
+
+ tables.tbl_customer(customerWarehouseID)->put(txn, k_c, v_c);
+
+ const history::key k_h(k_c.c_d_id, k_c.c_w_id, k_c.c_id, districtID, warehouse_id, ts);
+ history::value v_h;
+ v_h.h_amount = paymentAmount;
+ v_h.h_data.resize_junk(v_h.h_data.max_size());
+ int n = snprintf((char *) v_h.h_data.data(), v_h.h_data.max_size() + 1,
+ "%.10s %.10s",
+ v_w.w_name.c_str(),
+ v_d.d_name.c_str());
+ v_h.h_data.resize_junk(min(static_cast<size_t>(n), v_h.h_data.max_size()));
+
+ const size_t history_sz = Size(v_h);
+ tables.tbl_history(warehouse_id)->insert(txn, k_h, v_h);
+ ret += history_sz;
+
+ //measure_txn_counters(txn, "txn_payment");
+ if (likely(txn.commit()))
+ return txn_result(true, ret);
+ } catch (typename Database::abort_exception_type &e) {
+ txn.abort();
+ }
+ return txn_result(false, 0);
+}
+
+template <typename Database>
+class order_line_nop_callback : public
+ Database::template IndexType<schema<order_line>>::type::bytes_search_range_callback {
+
+public:
+ order_line_nop_callback() : n(0) {}
+ virtual bool invoke(
+ const string &key,
+ const string &value)
+ {
+ INVARIANT(key.size() == sizeof(order_line::key));
+ order_line::value v_ol_temp;
+ const order_line::value *v_ol UNUSED = Decode(value, v_ol_temp);
+#ifdef CHECK_INVARIANTS
+ order_line::key k_ol_temp;
+ const order_line::key *k_ol = Decode(key, k_ol_temp);
+ checker::SanityCheckOrderLine(k_ol, v_ol);
+#endif
+ ++n;
+ return true;
+ }
+ size_t n;
+};
+
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, order_status_probe0_tod, order_status_probe0_cg)
+
+template <typename Database, bool AllowReadOnlyScans>
+typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
+tpcc_worker<Database, AllowReadOnlyScans>::txn_order_status()
+{
+ const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
+ const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
+
+ // output from txn counters:
+ // max_absent_range_set_size : 0
+ // max_absent_set_size : 0
+ // max_node_scan_size : 13
+ // max_read_set_size : 81
+ // max_write_set_size : 0
+ // num_txn_contexts : 4
+ const uint64_t read_only_mask =
+ !AllowReadOnlyScans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
+ typename Database::template TransactionType<
+ AllowReadOnlyScans ?
+ abstract_db::HINT_TPCC_ORDER_STATUS_READ_ONLY :
+ abstract_db::HINT_TPCC_ORDER_STATUS>::type txn(
+ txn_flags | read_only_mask, this->arena);
+ scoped_str_arena s_arena(this->arena);
+ // NB: since txn_order_status() is a RO txn, we assume that
+ // locking is un-necessary (since we can just read from some old snapshot)
+ try {
+
+ customer::key k_c;
+ customer::value v_c;
+ if (RandomNumber(this->r, 1, 100) <= 60) {
+ // cust by name
+ uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
+ static_assert(sizeof(lastname_buf) == 16, "xx");
+ NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
+ GetNonUniformCustomerLastNameRun(lastname_buf, r);
+
+ static const string zeros(16, 0);
+ static const string ones(16, 255);
+
+ customer_name_idx::key k_c_idx_0;
+ k_c_idx_0.c_w_id = warehouse_id;
+ k_c_idx_0.c_d_id = districtID;
+ k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
+ k_c_idx_0.c_first.assign(zeros);
+
+ customer_name_idx::key k_c_idx_1;
+ k_c_idx_1.c_w_id = warehouse_id;
+ k_c_idx_1.c_d_id = districtID;
+ k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
+ k_c_idx_1.c_first.assign(ones);
+
+ // NMaxCustomerIdxScanElems is probably a safe bet for now
+ bytes_static_limit_callback<
+ typename Database::template IndexType<schema<customer_name_idx>>::type,
+ NMaxCustomerIdxScanElems, true> c(s_arena.get());
+ tables.tbl_customer_name_idx(warehouse_id)->bytes_search_range_call(
+ txn, k_c_idx_0, &k_c_idx_1, c);
+ ALWAYS_ASSERT(c.size() > 0);
+ INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
+ int index = c.size() / 2;
+ if (c.size() % 2 == 0)
+ index--;
+ evt_avg_cust_name_idx_scan_size.offer(c.size());
+
+ customer_name_idx::value v_c_idx_temp;
+ const customer_name_idx::value *v_c_idx = Decode(c.value(index), v_c_idx_temp);
+
+ k_c.c_w_id = warehouse_id;
+ k_c.c_d_id = districtID;
+ k_c.c_id = v_c_idx->c_id;
+ ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c));
+
+ } else {
+ // cust by ID
+ const uint customerID = GetCustomerId(r);
+ k_c.c_w_id = warehouse_id;
+ k_c.c_d_id = districtID;
+ k_c.c_id = customerID;
+ ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c));
+ }
+ checker::SanityCheckCustomer(&k_c, &v_c);
+
+ // XXX(stephentu): HACK- we bound the # of elems returned by this scan to
+ // 15- this is because we don't have reverse scans. In an ideal system, a
+ // reverse scan would only need to read 1 btree node. We could simulate a
+ // lookup by only reading the first element- but then we would *always*
+ // read the first order by any customer. To make this more interesting, we
+ // randomly select which elem to pick within the 1st or 2nd btree nodes.
+ // This is obviously a deviation from TPC-C, but it shouldn't make that
+ // much of a difference in terms of performance numbers (in fact we are
+ // making it worse for us)
+ latest_key_callback<
+ typename Database::template IndexType<schema<oorder_c_id_idx>>::type> c_oorder(
+ *this->arena.next(), (r.next() % 15) + 1);
+ const oorder_c_id_idx::key k_oo_idx_0(warehouse_id, districtID, k_c.c_id, 0);
+ const oorder_c_id_idx::key k_oo_idx_1(warehouse_id, districtID, k_c.c_id, numeric_limits<int32_t>::max());
+ {
+ ANON_REGION("OrderStatusOOrderScan:", &order_status_probe0_cg);
+ tables.tbl_oorder_c_id_idx(warehouse_id)->bytes_search_range_call(
+ txn, k_oo_idx_0, &k_oo_idx_1, c_oorder);
+ }
+ ALWAYS_ASSERT(c_oorder.size());
+
+ oorder_c_id_idx::key k_oo_idx_temp;
+ const oorder_c_id_idx::key *k_oo_idx = Decode(c_oorder.kstr(), k_oo_idx_temp);
+ const uint o_id = k_oo_idx->o_o_id;
+
+ // XXX: fix
+ // XXX(stephentu): what's wrong w/ it?
+ order_line_nop_callback<Database> c_order_line;
+ const order_line::key k_ol_0(warehouse_id, districtID, o_id, 0);
+ const order_line::key k_ol_1(warehouse_id, districtID, o_id, numeric_limits<int32_t>::max());
+ tables.tbl_order_line(warehouse_id)->bytes_search_range_call(
+ txn, k_ol_0, &k_ol_1, c_order_line);
+ ALWAYS_ASSERT(c_order_line.n >= 5 && c_order_line.n <= 15);
+
+ //measure_txn_counters(txn, "txn_order_status");
+ if (likely(txn.commit()))
+ return txn_result(true, 0);
+ } catch (typename Database::abort_exception_type &e) {
+ txn.abort();
+ }
+ return txn_result(false, 0);
+}
+
+template <typename Database>
+class order_line_scan_callback : public
+ Database::template IndexType<schema<order_line>>::type::bytes_search_range_callback {
+public:
+ order_line_scan_callback() : n(0) {}
+ virtual bool invoke(
+ const string &key,
+ const string &value)
+ {
+ INVARIANT(key.size() == sizeof(order_line::key));
+ order_line::value v_ol;
+
+#ifdef DISABLE_FIELD_SELECTION
+ const uint64_t mask = numeric_limits<uint64_t>::max();
+#else
+ const uint64_t mask = compute_fields_mask(0);
+#endif
+
+ typed_txn_btree_<schema<order_line>>::do_record_read(
+ (const uint8_t *) value.data(), value.size(), mask, &v_ol);
+
+#if defined(CHECK_INVARIANTS) && defined(DISABLE_FIELD_SELECTION)
+ order_line::key k_ol_temp;
+ const order_line::key *k_ol = Decode(key, k_ol_temp);
+ checker::SanityCheckOrderLine(k_ol, &v_ol);
+#endif
+
+ s_i_ids[v_ol.ol_i_id] = 1;
+ n++;
+ return true;
+ }
+ size_t n;
+ small_unordered_map<uint, bool, 512> s_i_ids;
+};
+
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe0_tod, stock_level_probe0_cg)
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe1_tod, stock_level_probe1_cg)
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe2_tod, stock_level_probe2_cg)
+
+static event_avg_counter evt_avg_stock_level_loop_join_lookups("stock_level_loop_join_lookups");
+
+template <typename Database, bool AllowReadOnlyScans>
+typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
+tpcc_worker<Database, AllowReadOnlyScans>::txn_stock_level()
+{
+ const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
+ const uint threshold = RandomNumber(this->r, 10, 20);
+ const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
+
+ // output from txn counters:
+ // max_absent_range_set_size : 0
+ // max_absent_set_size : 0
+ // max_node_scan_size : 19
+ // max_read_set_size : 241
+ // max_write_set_size : 0
+ // n_node_scan_large_instances : 1
+ // n_read_set_large_instances : 2
+ // num_txn_contexts : 3
+ const uint64_t read_only_mask =
+ !AllowReadOnlyScans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
+ typename Database::template TransactionType<
+ AllowReadOnlyScans ?
+ abstract_db::HINT_TPCC_STOCK_LEVEL_READ_ONLY :
+ abstract_db::HINT_TPCC_STOCK_LEVEL>::type txn(
+ txn_flags | read_only_mask, this->arena);
+ scoped_str_arena s_arena(this->arena);
+ // NB: since txn_stock_level() is a RO txn, we assume that
+ // locking is un-necessary (since we can just read from some old snapshot)
+ try {
+ const district::key k_d(warehouse_id, districtID);
+ district::value v_d;
+ ALWAYS_ASSERT(tables.tbl_district(warehouse_id)->search(txn, k_d, v_d));
+ checker::SanityCheckDistrict(&k_d, &v_d);
+ const uint64_t cur_next_o_id = g_new_order_fast_id_gen ?
+ NewOrderIdHolder(warehouse_id, districtID).load(memory_order_acquire) :
+ v_d.d_next_o_id;
+
+ // manual joins are fun!
+ order_line_scan_callback<Database> c;
+ const int32_t lower = cur_next_o_id >= 20 ? (cur_next_o_id - 20) : 0;
+ const order_line::key k_ol_0(warehouse_id, districtID, lower, 0);
+ const order_line::key k_ol_1(warehouse_id, districtID, cur_next_o_id, 0);
+ {
+ // mask must be kept in sync w/ order_line_scan_callback
+#ifdef DISABLE_FIELD_SELECTION
+ const size_t nfields = order_line::value::NFIELDS;
+#else
+ const size_t nfields = 1;
+#endif
+ ANON_REGION("StockLevelOrderLineScan:", &stock_level_probe0_cg);
+ tables.tbl_order_line(warehouse_id)->bytes_search_range_call(
+ txn, k_ol_0, &k_ol_1, c, nfields);
+ }
+ {
+ small_unordered_map<uint, bool, 512> s_i_ids_distinct;
+ for (auto &p : c.s_i_ids) {
+ ANON_REGION("StockLevelLoopJoinIter:", &stock_level_probe1_cg);
+ const stock::key k_s(warehouse_id, p.first);
+ stock::value v_s;
+ INVARIANT(p.first >= 1 && p.first <= NumItems());
+ {
+ ANON_REGION("StockLevelLoopJoinGet:", &stock_level_probe2_cg);
+ ALWAYS_ASSERT(tables.tbl_stock(warehouse_id)->search(txn, k_s, v_s, GUARDED_FIELDS(stock::value::s_quantity_field)));
+ }
+ if (v_s.s_quantity < int(threshold))
+ s_i_ids_distinct[p.first] = 1;
+ }
+ evt_avg_stock_level_loop_join_lookups.offer(c.s_i_ids.size());
+ // NB(stephentu): s_i_ids_distinct.size() is the computed result of this txn
+ }
+ //measure_txn_counters(txn, "txn_stock_level");
+ if (likely(txn.commit()))
+ return txn_result(true, 0);
+ } catch (typename Database::abort_exception_type &e) {
+ txn.abort();
+ }
+ return txn_result(false, 0);
+}
+
+template <typename T>
+static vector<T>
+unique_filter(const vector<T> &v)
+{
+ set<T> seen;
+ vector<T> ret;
+ for (auto &e : v)
+ if (!seen.count(e)) {
+ ret.emplace_back(e);
+ seen.insert(e);
+ }
+ return ret;
+}
+
+template <typename Database, bool AllowReadOnlyScans>
+class tpcc_bench_runner : public typed_bench_runner<Database> {
+private:
+
+ static bool
+ IsTableReadOnly(const char *name)
+ {
+ return strcmp("item", name) == 0;
+ }
+
+ static bool
+ IsTableAppendOnly(const char *name)
+ {
+ return strcmp("history", name) == 0 ||
+ strcmp("oorder_c_id_idx", name) == 0;
+ }
+
+ template <typename Schema>
+ static vector<shared_ptr<typename Database::template IndexType<Schema>::type>>
+ OpenTablesForTablespace(Database *db, const char *name, size_t expected_size)
+ {
+ const bool is_read_only = IsTableReadOnly(name);
+ const bool is_append_only = IsTableAppendOnly(name);
+ const string s_name(name);
+ vector<typename Database::template IndexType<Schema>::ptr_type> ret(NumWarehouses());
+ if (g_enable_separate_tree_per_partition && !is_read_only) {
+ if (NumWarehouses() <= nthreads) {
+ for (size_t i = 0; i < NumWarehouses(); i++)
+ ret[i] = db->template open_index<Schema>(s_name + "_" + to_string(i), expected_size, is_append_only);
+ } else {
+ const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
+ for (size_t partid = 0; partid < nthreads; partid++) {
+ const unsigned wstart = partid * nwhse_per_partition;
+ const unsigned wend = (partid + 1 == nthreads) ?
+ NumWarehouses() : (partid + 1) * nwhse_per_partition;
+ auto idx =
+ db->template open_index<Schema>(s_name + "_" + to_string(partid), expected_size, is_append_only);
+ for (size_t i = wstart; i < wend; i++)
+ ret[i] = idx;
+ }
+ }
+ } else {
+ auto idx = db->template open_index<Schema>(s_name, expected_size, is_append_only);
+ for (size_t i = 0; i < NumWarehouses(); i++)
+ ret[i] = idx;
+ }
+ return ret;
+ }
+
+public:
+ tpcc_bench_runner(Database *db)
+ : typed_bench_runner<Database>(db)
+ {
+
+#define OPEN_TABLESPACE_X(x) \
+ do { \
+ tables.tbl_ ## x ## _vec = OpenTablesForTablespace<schema<x>>(db, #x, sizeof(x::value)); \
+ auto v = unique_filter(tables.tbl_ ## x ## _vec); \
+ for (size_t i = 0; i < v.size(); i++) \
+ this->open_tables[string(#x) + "_" + to_string(i)] = v[i]; \
+ } while (0);
+
+ TPCC_TABLE_LIST(OPEN_TABLESPACE_X);
+
+#undef OPEN_TABLESPACE_X
+
+ if (g_enable_partition_locks) {
+ static_assert(sizeof(aligned_padded_elem<spinlock>) == CACHELINE_SIZE, "xx");
+ void * const px = memalign(CACHELINE_SIZE, sizeof(aligned_padded_elem<spinlock>) * nthreads);
+ ALWAYS_ASSERT(px);
+ ALWAYS_ASSERT(reinterpret_cast<uintptr_t>(px) % CACHELINE_SIZE == 0);
+ g_partition_locks = reinterpret_cast<aligned_padded_elem<spinlock> *>(px);
+ for (size_t i = 0; i < nthreads; i++) {
+ new (&g_partition_locks[i]) aligned_padded_elem<spinlock>();
+ ALWAYS_ASSERT(!g_partition_locks[i].elem.is_locked());
+ }
+ }
+
+ if (g_new_order_fast_id_gen) {
+ void * const px =
+ memalign(
+ CACHELINE_SIZE,
+ sizeof(aligned_padded_elem<atomic<uint64_t>>) *
+ NumWarehouses() * NumDistrictsPerWarehouse());
+ g_district_ids = reinterpret_cast<aligned_padded_elem<atomic<uint64_t>> *>(px);
+ for (size_t i = 0; i < NumWarehouses() * NumDistrictsPerWarehouse(); i++)
+ new (&g_district_ids[i]) atomic<uint64_t>(3001);
+ }
+ }
+
+protected:
+ virtual vector<unique_ptr<bench_loader>>
+ make_loaders()
+ {
+ vector<unique_ptr<bench_loader>> ret;
+ ret.emplace_back(new tpcc_warehouse_loader<Database>(9324, this->typed_db(), tables));
+ ret.emplace_back(new tpcc_item_loader<Database>(235443, this->typed_db(), tables));
+ if (enable_parallel_loading) {
+ fast_random r(89785943);
+ for (uint i = 1; i <= NumWarehouses(); i++)
+ ret.emplace_back(new tpcc_stock_loader<Database>(r.next(), this->typed_db(), tables, i));
+ } else {
+ ret.emplace_back(new tpcc_stock_loader<Database>(89785943, this->typed_db(), tables, -1));
+ }
+ ret.emplace_back(new tpcc_district_loader<Database>(129856349, this->typed_db(), tables));
+ if (enable_parallel_loading) {
+ fast_random r(923587856425);
+ for (uint i = 1; i <= NumWarehouses(); i++)
+ ret.emplace_back(new tpcc_customer_loader<Database>(r.next(), this->typed_db(), tables, i));
+ } else {
+ ret.emplace_back(new tpcc_customer_loader<Database>(923587856425, this->typed_db(), tables, -1));
+ }
+ if (enable_parallel_loading) {
+ fast_random r(2343352);
+ for (uint i = 1; i <= NumWarehouses(); i++)
+ ret.emplace_back(new tpcc_order_loader<Database>(r.next(), this->typed_db(), tables, i));
+ } else {
+ ret.emplace_back(new tpcc_order_loader<Database>(2343352, this->typed_db(), tables, -1));
+ }
+ return ret;
+ }
+
+private:
+ template <bool RO>
+ vector<unique_ptr<bench_worker>>
+ make_workers_impl()
+ {
+ const unsigned alignment = coreid::num_cpus_online();
+ const int blockstart =
+ coreid::allocate_contiguous_aligned_block(nthreads, alignment);
+ ALWAYS_ASSERT(blockstart >= 0);
+ ALWAYS_ASSERT((blockstart % alignment) == 0);
+ fast_random r(23984543);
+ vector<unique_ptr<bench_worker>> ret;
+ if (NumWarehouses() <= nthreads) {
+ for (size_t i = 0; i < nthreads; i++)
+ ret.emplace_back(
+ new tpcc_worker<Database, RO>(
+ blockstart + i,
+ r.next(), this->typed_db(), tables,
+ &this->barrier_a, &this->barrier_b,
+ (i % NumWarehouses()) + 1, (i % NumWarehouses()) + 2));
+ } else {
+ const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
+ for (size_t i = 0; i < nthreads; i++) {
+ const unsigned wstart = i * nwhse_per_partition;
+ const unsigned wend = (i + 1 == nthreads) ?
+ NumWarehouses() : (i + 1) * nwhse_per_partition;
+ ret.emplace_back(
+ new tpcc_worker<Database, RO>(
+ blockstart + i,
+ r.next(), this->typed_db(), tables,
+ &this->barrier_a, &this->barrier_b, wstart+1, wend+2));
+ }
+ }
+ return ret;
+ }
+
+protected:
+ virtual vector<unique_ptr<bench_worker>>
+ make_workers()
+ {
+ return g_disable_read_only_scans ? make_workers_impl<false>() : make_workers_impl<true>();
+ }
+
+private:
+ tpcc_tables<Database> tables;
+};
+
+template <typename Database>
+static unique_ptr<bench_runner>
+MakeBenchRunner(Database *db)
+{
+ return unique_ptr<bench_runner>(
+ g_disable_read_only_scans ?
+ static_cast<bench_runner *>(new tpcc_bench_runner<Database, false>(db)) :
+ static_cast<bench_runner *>(new tpcc_bench_runner<Database, true>(db)));
+}
+
+void
+tpcc_do_test(const string &dbtype,
+ const persistconfig &cfg,
+ int argc, char **argv)
+{
+ // parse options
+ optind = 1;
+ bool did_spec_remote_pct = false;
+ while (1) {
+ static struct option long_options[] =
+ {
+ {"disable-cross-partition-transactions" , no_argument , &g_disable_xpartition_txn , 1} ,
+ {"disable-read-only-snapshots" , no_argument , &g_disable_read_only_scans , 1} ,
+ {"enable-partition-locks" , no_argument , &g_enable_partition_locks , 1} ,
+ {"enable-separate-tree-per-partition" , no_argument , &g_enable_separate_tree_per_partition , 1} ,
+ {"new-order-remote-item-pct" , required_argument , 0 , 'r'} ,
+ {"new-order-fast-id-gen" , no_argument , &g_new_order_fast_id_gen , 1} ,
+ {"uniform-item-dist" , no_argument , &g_uniform_item_dist , 1} ,
+ {"workload-mix" , required_argument , 0 , 'w'} ,
+ {0, 0, 0, 0}
+ };
+ int option_index = 0;
+ int c = getopt_long(argc, argv, "r:", long_options, &option_index);
+ if (c == -1)
+ break;
+ switch (c) {
+ case 0:
+ if (long_options[option_index].flag != 0)
+ break;
+ abort();
+ break;
+
+ case 'r':
+ g_new_order_remote_item_pct = strtoul(optarg, NULL, 10);
+ ALWAYS_ASSERT(g_new_order_remote_item_pct >= 0 && g_new_order_remote_item_pct <= 100);
+ did_spec_remote_pct = true;
+ break;
+
+ case 'w':
+ {
+ const vector<string> toks = split(optarg, ',');
+ ALWAYS_ASSERT(toks.size() == ARRAY_NELEMS(g_txn_workload_mix));
+ unsigned s = 0;
+ for (size_t i = 0; i < toks.size(); i++) {
+ unsigned p = strtoul(toks[i].c_str(), nullptr, 10);
+ ALWAYS_ASSERT(p >= 0 && p <= 100);
+ s += p;
+ g_txn_workload_mix[i] = p;
+ }
+ ALWAYS_ASSERT(s == 100);
+ }
+ break;
+
+ case '?':
+ /* getopt_long already printed an error message. */
+ exit(1);
+
+ default:
+ abort();
+ }
+ }
+
+ if (did_spec_remote_pct && g_disable_xpartition_txn) {
+ cerr << "WARNING: --new-order-remote-item-pct given with --disable-cross-partition-transactions" << endl;
+ cerr << " --new-order-remote-item-pct will have no effect" << endl;
+ }
+
+ if (verbose) {
+ cerr << "tpcc settings:" << endl;
+ cerr << " cross_partition_transactions : " << !g_disable_xpartition_txn << endl;
+ cerr << " read_only_snapshots : " << !g_disable_read_only_scans << endl;
+ cerr << " partition_locks : " << g_enable_partition_locks << endl;
+ cerr << " separate_tree_per_partition : " << g_enable_separate_tree_per_partition << endl;
+ cerr << " new_order_remote_item_pct : " << g_new_order_remote_item_pct << endl;
+ cerr << " new_order_fast_id_gen : " << g_new_order_fast_id_gen << endl;
+ cerr << " uniform_item_dist : " << g_uniform_item_dist << endl;
+ cerr << " workload_mix : " <<
+ format_list(g_txn_workload_mix,
+ g_txn_workload_mix + ARRAY_NELEMS(g_txn_workload_mix)) << endl;
+ }
+
+ unique_ptr<abstract_db> db;
+ unique_ptr<bench_runner> r;
+
+ if (dbtype == "ndb-proto2") {
+ if (!cfg.logfiles_.empty()) {
+ vector<vector<unsigned>> assignments_used;
+ txn_logger::Init(
+ nthreads, cfg.logfiles_, cfg.assignments_, &assignments_used,
+ !cfg.nofsync_,
+ cfg.do_compress_,
+ cfg.fake_writes_);
+ if (verbose) {
+ cerr << "[logging subsystem]" << endl;
+ cerr << " assignments: " << assignments_used << endl;
+ cerr << " call fsync : " << !cfg.nofsync_ << endl;
+ cerr << " compression: " << cfg.do_compress_ << endl;
+ cerr << " fake_writes: " << cfg.fake_writes_ << endl;
+ }
+ }
+#ifdef PROTO2_CAN_DISABLE_GC
+ if (!cfg.disable_gc_)
+ transaction_proto2_static::InitGC();
+#endif
+#ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
+ if (cfg.disable_snapshots_)
+ transaction_proto2_static::DisableSnapshots();
+#endif
+ typedef ndb_database<transaction_proto2> Database;
+ Database *raw = new Database;
+ db.reset(raw);
+ r = MakeBenchRunner(raw);
+ } else if (dbtype == "kvdb-st") {
+ typedef kvdb_database<false> Database;
+ Database *raw = new Database;
+ db.reset(raw);
+ r = MakeBenchRunner(raw);
+ } else
+ ALWAYS_ASSERT(false);
+
+ r->run();
+}