10 #include "../macros.h"
11 #include "../varkey.h"
12 #include "../thread.h"
14 #include "../spinbarrier.h"
// Builds the fixed-width 16-byte key for queue `id0`, sequence number `id1`.
// big_endian_trfm presumably byte-swaps each word so the lexicographic order
// of the raw key bytes matches numeric order (needed for range scans over a
// queue) -- TODO confirm against the elided body.
// NOTE(review): this listing is elided (original lines 29-33 missing); the
// writes through `p` (e.g. p[0]/p[1]) and the `return buf` are not visible.
24 queue_key(uint64_t id0, uint64_t id1)
26 big_endian_trfm<uint64_t> t;
// Zero-filled buffer sized for two 64-bit words: [id0][id1].
27 string buf(2 * sizeof(uint64_t), 0);
28 uint64_t *p = (uint64_t *) &buf[0];
// Fixed 8-byte payload stored under every queue key; its size is also used
// below to size the index and to account bytes produced/consumed.
34 static const string queue_values("ABCDEFGH");
// Worker that drives one logical queue (selected by `id`): a producer
// inserts new entries at increasing sequence numbers, a consumer scans/reads
// the head entry and removes it.
// NOTE(review): the embedded original line numbers are non-contiguous, so
// every method body below is an elided fragment; comments state only what
// the visible lines establish.
36 class queue_worker : public bench_worker {
// Constructor. `ctr` is this worker's cursor into its queue: a consumer
// starts at sequence 0, a producer starts at `nkeys` (presumably because the
// loader pre-populates sequences [0, nkeys) -- see queue_table_loader).
38 queue_worker(unsigned int worker_id,
39 unsigned long seed, abstract_db *db,
40 const map<string, abstract_ordered_index *> &open_tables,
41 spin_barrier *barrier_a, spin_barrier *barrier_b,
42 uint64_t id, bool consumer)
43 : bench_worker(worker_id, false, seed, db,
44 open_tables, barrier_a, barrier_b),
45 tbl(open_tables.at("table")), id(id), consumer(consumer),
46 ctr(consumer ? 0 : nkeys)
// txn_produce (elided header): inserts queue_values at the next sequence
// number `ctr` inside one transaction; reports queue_values.size() bytes on
// successful commit. The ctr increment is presumably on an elided line --
// TODO confirm.
53 void *txn = db->new_txn(txn_flags, arena, txn_buf());
55 const string k = queue_key(id, ctr);
56 tbl->insert(txn, k, queue_values);
57 if (likely(db->commit_txn(txn))) {
59 return txn_result(true, queue_values.size());
// An aborted transaction is reported as a failed txn with zero size delta.
61 } catch (abstract_db::abstract_abort_exception &ex) {
64 return txn_result(false, 0);
// Static trampoline: adapts the workload-table calling convention
// (bench_worker *) to the member function txn_produce().
68 TxnProduce(bench_worker *w)
70 return static_cast<queue_worker *>(w)->txn_produce();
// txn_consume (elided header): scans the full key range of this queue
// [seq 0, seq max]; the scanner `c` (declared on an elided line, apparently
// limited to one result given the assertion) yields the head entry, which is
// presumably removed on an elided line (hence ret = -queue_values.size(),
// a negative byte delta) -- TODO confirm.
76 void *txn = db->new_txn(txn_flags, arena, txn_buf());
78 const string lowk = queue_key(id, 0);
79 const string highk = queue_key(id, numeric_limits<uint64_t>::max());
81 tbl->scan(txn, lowk, &highk, c);
83 if (likely(!c.values.empty())) {
84 ALWAYS_ASSERT(c.values.size() == 1);
85 const string &k = c.values.front().first;
87 ret = -queue_values.size();
89 if (likely(db->commit_txn(txn)))
90 return txn_result(true, ret);
91 } catch (abstract_db::abstract_abort_exception &ex) {
94 return txn_result(false, 0);
// Static trampoline for txn_consume().
98 TxnConsume(bench_worker *w)
100 return static_cast<queue_worker *>(w)->txn_consume();
// Variant of txn_consume that uses `ctr` as a scan hint: the lower bound
// starts at the worker's cursor instead of sequence 0, avoiding rescanning
// already-consumed (removed) key space. The cursor only advances after a
// successful commit.
104 txn_consume_scanhint()
106 void *txn = db->new_txn(txn_flags, arena, txn_buf());
108 const string lowk = queue_key(id, ctr);
109 const string highk = queue_key(id, numeric_limits<uint64_t>::max());
111 tbl->scan(txn, lowk, &highk, c);
112 const bool found = !c.values.empty();
115 ALWAYS_ASSERT(c.values.size() == 1);
116 const string &k = c.values.front().first;
118 ret = -queue_values.size();
120 if (likely(db->commit_txn(txn))) {
// Advance the cursor only once the removal is durable.
121 if (likely(found)) ctr++;
122 return txn_result(true, ret);
124 } catch (abstract_db::abstract_abort_exception &ex) {
127 return txn_result(false, 0);
// Static trampoline for txn_consume_scanhint().
131 TxnConsumeScanHint(bench_worker *w)
133 return static_cast<queue_worker *>(w)->txn_consume_scanhint();
// txn_consume_noscan (elided header): point-lookup variant -- reads exactly
// queue_key(id, ctr) with get() instead of scanning a range; otherwise the
// same commit/advance-cursor protocol as txn_consume_scanhint.
139 void *txn = db->new_txn(txn_flags, arena, txn_buf());
141 const string k = queue_key(id, ctr);
145 if (likely((found = tbl->get(txn, k, v)))) {
147 ret = -queue_values.size();
149 if (likely(db->commit_txn(txn))) {
150 if (likely(found)) ctr++;
151 return txn_result(true, ret);
153 } catch (abstract_db::abstract_abort_exception &ex) {
156 return txn_result(false, 0);
// Static trampoline for txn_consume_noscan().
160 TxnConsumeNoScan(bench_worker *w)
162 return static_cast<queue_worker *>(w)->txn_consume_noscan();
// Workload table: a consumer runs Consume at weight 1.0, a producer runs
// Produce at weight 1.0 (the selecting conditional on `consumer` is
// presumably on the elided lines between). The alternative consume
// strategies are kept but disabled.
165 virtual workload_desc_vec
170 w.push_back(workload_desc("Consume", 1.0, TxnConsume));
171 //w.push_back(workload_desc("ConsumeScanHint", 1.0, TxnConsumeScanHint));
172 //w.push_back(workload_desc("ConsumeNoScan", 1.0, TxnConsumeNoScan));
174 w.push_back(workload_desc("Produce", 1.0, TxnProduce));
// The queue's backing index (the "table" entry of open_tables).
179 abstract_ordered_index *tbl;
// Loader: pre-populates sequences [0, nkeys) with queue_values for each of
// the nthreads/2 queue ids, so consumers have a backlog at benchmark start.
// NOTE(review): elided fragment -- the method header of the load routine and
// the conditional selecting between the two insertion paths below are not
// visible.
185 class queue_table_loader : public bench_loader {
187 queue_table_loader(unsigned long seed,
189 const map<string, abstract_ordered_index *> &open_tables)
190 : bench_loader(seed, db, open_tables)
197 abstract_ordered_index *tbl = open_tables.at("table");
// Batch size: cap at the DB's max txn batch size, defaulting to 10000 when
// the DB reports no limit (-1).
200 const size_t batchsize = (db->txn_max_batch_size() == -1) ?
201 10000 : db->txn_max_batch_size();
202 ALWAYS_ASSERT(batchsize > 0);
203 const size_t nbatches = nkeys / batchsize;
// One queue per consumer/producer pair (see queue_bench_runner).
204 for (size_t id = 0; id < nthreads / 2; id++) {
// Path 1: all nkeys inserts for this queue in a single transaction
// (presumably taken when nkeys fits in one batch -- TODO confirm against
// the elided conditional).
206 void *txn = db->new_txn(txn_flags, arena, txn_buf());
207 for (size_t j = 0; j < nkeys; j++) {
208 const string k = queue_key(id, j);
209 const string &v = queue_values;
210 tbl->insert(txn, k, v);
213 cerr << "batch 1/1 done" << endl;
214 ALWAYS_ASSERT(db->commit_txn(txn));
// Path 2: split the nkeys inserts into nbatches transactions of
// `batchsize` keys, with the final batch absorbing the remainder.
216 for (size_t i = 0; i < nbatches; i++) {
217 size_t keyend = (i == nbatches - 1) ? nkeys : (i + 1) * batchsize;
218 void *txn = db->new_txn(txn_flags, arena, txn_buf());
219 for (size_t j = i * batchsize; j < keyend; j++) {
220 const string k = queue_key(id, j);
221 const string &v = queue_values;
222 tbl->insert(txn, k, v);
225 cerr << "batch " << (i + 1) << "/" << nbatches << " done" << endl;
226 ALWAYS_ASSERT(db->commit_txn(txn));
// Loading runs without contention, so an abort indicates a bug.
230 } catch (abstract_db::abstract_abort_exception &ex) {
231 // shouldn't abort on loading!
232 ALWAYS_ASSERT(false);
235 cerr << "[INFO] finished loading table" << endl;
// Runner: opens the single "table" index and wires up loaders and workers.
// NOTE(review): elided fragment -- make_workers' branching on `write_only`
// and the loop-body worker constructions are only partially visible.
239 class queue_bench_runner : public bench_runner {
241 queue_bench_runner(abstract_db *db, bool write_only)
242 : bench_runner(db), write_only(write_only)
// Value size hint for the index is the fixed payload size.
244 open_tables["table"] = db->open_index("table", queue_values.size());
// Single loader seeds all queues.
248 virtual vector<bench_loader *>
251 vector<bench_loader *> ret;
252 ret.push_back(new queue_table_loader(0, db, open_tables));
// Worker construction. Fixed seed keeps runs reproducible.
256 virtual vector<bench_worker *>
259 fast_random r(8544290);
260 vector<bench_worker *> ret;
// Write-only mode (presumably): every thread is a producer on its own
// queue id -- TODO confirm against the elided conditional.
262 for (size_t i = 0; i < nthreads; i++)
265 i, r.next(), db, open_tables,
266 &barrier_a, &barrier_b, i, false));
// Mixed mode: pair threads up -- consumer and producer share queue id `i`.
// An odd thread count leaves one thread unused (warned below).
268 ALWAYS_ASSERT(nthreads >= 2);
269 if (verbose && (nthreads % 2))
270 cerr << "queue_bench_runner: odd number of workers given" << endl;
271 for (size_t i = 0; i < nthreads / 2; i++) {
272 // consumer for queue i
274 i, r.next(), db, open_tables,
275 &barrier_a, &barrier_b, i, true));
// NOTE(review): the visible producer worker_id is `i + 1`, which would
// collide with iteration i+1's consumer worker_id -- looks like it should
// be disambiguated (e.g. i + nthreads/2); confirm against the elided lines.
278 i + 1, r.next(), db, open_tables,
279 &barrier_a, &barrier_b, i, false));
// Benchmark entry point: sizes the pre-load (1000 keys per queue at
// scale_factor 1.0) and runs the benchmark in write-only mode.
// NOTE(review): elided fragment -- the return type, argc/argv handling (both
// unused in the visible lines), and the r.run() call are not visible.
290 queue_do_test(abstract_db *db, int argc, char **argv)
292 nkeys = size_t(scale_factor * 1000.0);
293 ALWAYS_ASSERT(nkeys > 0);
294 queue_bench_runner r(db, true);