benchmark silo added
[c11concurrency-benchmarks.git] / silo / benchmarks / queue.cc
1 #include <iostream>
2 #include <sstream>
3 #include <vector>
4 #include <utility>
5 #include <string>
6
7 #include <stdlib.h>
8 #include <unistd.h>
9
10 #include "../macros.h"
11 #include "../varkey.h"
12 #include "../thread.h"
13 #include "../util.h"
14 #include "../spinbarrier.h"
15
16 #include "bench.h"
17
18 using namespace std;
19 using namespace util;
20
21 static size_t nkeys;
22
23 static inline string
24 queue_key(uint64_t id0, uint64_t id1)
25 {
26   big_endian_trfm<uint64_t> t;
27   string buf(2 * sizeof(uint64_t), 0);
28   uint64_t *p = (uint64_t *) &buf[0];
29   *p++ = t(id0);
30   *p++ = t(id1);
31   return buf;
32 }
33
34 static const string queue_values("ABCDEFGH");
35
36 class queue_worker : public bench_worker {
37 public:
38   queue_worker(unsigned int worker_id,
39                unsigned long seed, abstract_db *db,
40                const map<string, abstract_ordered_index *> &open_tables,
41                spin_barrier *barrier_a, spin_barrier *barrier_b,
42                uint64_t id, bool consumer)
43     : bench_worker(worker_id, false, seed, db,
44                    open_tables, barrier_a, barrier_b),
45       tbl(open_tables.at("table")), id(id), consumer(consumer),
46       ctr(consumer ? 0 : nkeys)
47   {
48   }
49
50   txn_result
51   txn_produce()
52   {
53     void *txn = db->new_txn(txn_flags, arena, txn_buf());
54     try {
55       const string k = queue_key(id, ctr);
56       tbl->insert(txn, k, queue_values);
57       if (likely(db->commit_txn(txn))) {
58         ctr++;
59         return txn_result(true, queue_values.size());
60       }
61     } catch (abstract_db::abstract_abort_exception &ex) {
62       db->abort_txn(txn);
63     }
64     return txn_result(false, 0);
65   }
66
67   static txn_result
68   TxnProduce(bench_worker *w)
69   {
70     return static_cast<queue_worker *>(w)->txn_produce();
71   }
72
73   txn_result
74   txn_consume()
75   {
76     void *txn = db->new_txn(txn_flags, arena, txn_buf());
77     try {
78       const string lowk = queue_key(id, 0);
79       const string highk = queue_key(id, numeric_limits<uint64_t>::max());
80       limit_callback c(1);
81       tbl->scan(txn, lowk, &highk, c);
82       ssize_t ret = 0;
83       if (likely(!c.values.empty())) {
84         ALWAYS_ASSERT(c.values.size() == 1);
85         const string &k = c.values.front().first;
86         tbl->remove(txn, k);
87         ret = -queue_values.size();
88       }
89       if (likely(db->commit_txn(txn)))
90         return txn_result(true, ret);
91     } catch (abstract_db::abstract_abort_exception &ex) {
92       db->abort_txn(txn);
93     }
94     return txn_result(false, 0);
95   }
96
97   static txn_result
98   TxnConsume(bench_worker *w)
99   {
100     return static_cast<queue_worker *>(w)->txn_consume();
101   }
102
103   txn_result
104   txn_consume_scanhint()
105   {
106     void *txn = db->new_txn(txn_flags, arena, txn_buf());
107     try {
108       const string lowk = queue_key(id, ctr);
109       const string highk = queue_key(id, numeric_limits<uint64_t>::max());
110       limit_callback c(1);
111       tbl->scan(txn, lowk, &highk, c);
112       const bool found = !c.values.empty();
113       ssize_t ret = 0;
114       if (likely(found)) {
115         ALWAYS_ASSERT(c.values.size() == 1);
116         const string &k = c.values.front().first;
117         tbl->remove(txn, k);
118         ret = -queue_values.size();
119       }
120       if (likely(db->commit_txn(txn))) {
121         if (likely(found)) ctr++;
122         return txn_result(true, ret);
123       }
124     } catch (abstract_db::abstract_abort_exception &ex) {
125       db->abort_txn(txn);
126     }
127     return txn_result(false, 0);
128   }
129
130   static txn_result
131   TxnConsumeScanHint(bench_worker *w)
132   {
133     return static_cast<queue_worker *>(w)->txn_consume_scanhint();
134   }
135
136   txn_result
137   txn_consume_noscan()
138   {
139     void *txn = db->new_txn(txn_flags, arena, txn_buf());
140     try {
141       const string k = queue_key(id, ctr);
142       string v;
143       bool found = false;
144       ssize_t ret = 0;
145       if (likely((found = tbl->get(txn, k, v)))) {
146         tbl->remove(txn, k);
147         ret = -queue_values.size();
148       }
149       if (likely(db->commit_txn(txn))) {
150         if (likely(found)) ctr++;
151         return txn_result(true, ret);
152       }
153     } catch (abstract_db::abstract_abort_exception &ex) {
154       db->abort_txn(txn);
155     }
156     return txn_result(false, 0);
157   }
158
159   static txn_result
160   TxnConsumeNoScan(bench_worker *w)
161   {
162     return static_cast<queue_worker *>(w)->txn_consume_noscan();
163   }
164
165   virtual workload_desc_vec
166   get_workload() const
167   {
168     workload_desc_vec w;
169     if (consumer)
170       w.push_back(workload_desc("Consume", 1.0, TxnConsume));
171       //w.push_back(workload_desc("ConsumeScanHint", 1.0, TxnConsumeScanHint));
172       //w.push_back(workload_desc("ConsumeNoScan", 1.0, TxnConsumeNoScan));
173     else
174       w.push_back(workload_desc("Produce", 1.0, TxnProduce));
175     return w;
176   }
177
178 private:
179   abstract_ordered_index *tbl;
180   uint64_t id;
181   bool consumer;
182   uint64_t ctr;
183 };
184
185 class queue_table_loader : public bench_loader {
186 public:
187   queue_table_loader(unsigned long seed,
188                      abstract_db *db,
189                      const map<string, abstract_ordered_index *> &open_tables)
190     : bench_loader(seed, db, open_tables)
191   {}
192
193 protected:
194   virtual void
195   load()
196   {
197     abstract_ordered_index *tbl = open_tables.at("table");
198     try {
199       // load
200       const size_t batchsize = (db->txn_max_batch_size() == -1) ?
201         10000 : db->txn_max_batch_size();
202       ALWAYS_ASSERT(batchsize > 0);
203       const size_t nbatches = nkeys / batchsize;
204       for (size_t id = 0; id < nthreads / 2; id++) {
205         if (nbatches == 0) {
206           void *txn = db->new_txn(txn_flags, arena, txn_buf());
207           for (size_t j = 0; j < nkeys; j++) {
208             const string k = queue_key(id, j);
209             const string &v = queue_values;
210             tbl->insert(txn, k, v);
211           }
212           if (verbose)
213             cerr << "batch 1/1 done" << endl;
214           ALWAYS_ASSERT(db->commit_txn(txn));
215         } else {
216           for (size_t i = 0; i < nbatches; i++) {
217             size_t keyend = (i == nbatches - 1) ? nkeys : (i + 1) * batchsize;
218             void *txn = db->new_txn(txn_flags, arena, txn_buf());
219             for (size_t j = i * batchsize; j < keyend; j++) {
220               const string k = queue_key(id, j);
221               const string &v = queue_values;
222               tbl->insert(txn, k, v);
223             }
224             if (verbose)
225               cerr << "batch " << (i + 1) << "/" << nbatches << " done" << endl;
226             ALWAYS_ASSERT(db->commit_txn(txn));
227           }
228         }
229       }
230     } catch (abstract_db::abstract_abort_exception &ex) {
231       // shouldn't abort on loading!
232       ALWAYS_ASSERT(false);
233     }
234     if (verbose)
235       cerr << "[INFO] finished loading table" << endl;
236   }
237 };
238
239 class queue_bench_runner : public bench_runner {
240 public:
241   queue_bench_runner(abstract_db *db, bool write_only)
242     : bench_runner(db), write_only(write_only)
243   {
244     open_tables["table"] = db->open_index("table", queue_values.size());
245   }
246
247 protected:
248   virtual vector<bench_loader *>
249   make_loaders()
250   {
251     vector<bench_loader *> ret;
252     ret.push_back(new queue_table_loader(0, db, open_tables));
253     return ret;
254   }
255
256   virtual vector<bench_worker *>
257   make_workers()
258   {
259     fast_random r(8544290);
260     vector<bench_worker *> ret;
261     if (write_only) {
262       for (size_t i = 0; i < nthreads; i++)
263         ret.push_back(
264           new queue_worker(
265             i, r.next(), db, open_tables,
266             &barrier_a, &barrier_b, i, false));
267     } else {
268       ALWAYS_ASSERT(nthreads >= 2);
269       if (verbose && (nthreads % 2))
270         cerr << "queue_bench_runner: odd number of workers given" << endl;
271       for (size_t i = 0; i < nthreads / 2; i++) {
272         ret.push_back(
273           new queue_worker(
274             i, r.next(), db, open_tables,
275             &barrier_a, &barrier_b, i, true));
276         ret.push_back(
277           new queue_worker(
278             i + 1, r.next(), db, open_tables,
279             &barrier_a, &barrier_b, i, false));
280       }
281     }
282     return ret;
283   }
284
285 private:
286   bool write_only;
287 };
288
289 void
290 queue_do_test(abstract_db *db, int argc, char **argv)
291 {
292   nkeys = size_t(scale_factor * 1000.0);
293   ALWAYS_ASSERT(nkeys > 0);
294   queue_bench_runner r(db, true);
295   r.run();
296 }