benchmark silo added
[c11concurrency-benchmarks.git] / silo / new-benchmarks / bench.cc
1 #include <iostream>
2 #include <fstream>
3 #include <sstream>
4 #include <vector>
5 #include <utility>
6 #include <string>
7
8 #include <stdlib.h>
9 #include <sched.h>
10 #include <unistd.h>
11 #include <sys/sysinfo.h>
12
13 #include "bench.h"
14
15 #include "../counter.h"
16 #include "../scopedperf.hh"
17 #include "../allocator.h"
18
19 #ifdef USE_JEMALLOC
20 //cannot include this header b/c conflicts with malloc.h
21 //#include <jemalloc/jemalloc.h>
22 extern "C" void malloc_stats_print(void (*write_cb)(void *, const char *), void *cbopaque, const char *opts);
23 extern "C" int mallctl(const char *name, void *oldp, size_t *oldlenp, void *newp, size_t newlen);
24 #endif
25 #ifdef USE_TCMALLOC
26 #include <google/heap-profiler.h>
27 #endif
28
29 using namespace std;
30 using namespace util;
31
32 size_t nthreads = 1;
33 volatile bool running = true;
34 int verbose = 0;
35 uint64_t txn_flags = 0;
36 double scale_factor = 1.0;
37 uint64_t runtime = 30;
38 uint64_t ops_per_worker = 0;
39 int run_mode = RUNMODE_TIME;
40 int enable_parallel_loading = false;
41 int pin_cpus = 0;
42 int slow_exit = 0;
43 int retry_aborted_transaction = 0;
44 int no_reset_counters = 0;
45 int backoff_aborted_transaction = 0;
46
47 template <typename T>
48 static vector<T>
49 elemwise_sum(const vector<T> &a, const vector<T> &b)
50 {
51   INVARIANT(a.size() == b.size());
52   vector<T> ret(a.size());
53   for (size_t i = 0; i < a.size(); i++)
54     ret[i] = a[i] + b[i];
55   return ret;
56 }
57
58 template <typename K, typename V>
59 static void
60 map_agg(map<K, V> &agg, const map<K, V> &m)
61 {
62   for (typename map<K, V>::const_iterator it = m.begin();
63        it != m.end(); ++it)
64     agg[it->first] += it->second;
65 }
66
67 // returns <free_bytes, total_bytes>
68 static pair<uint64_t, uint64_t>
69 get_system_memory_info()
70 {
71   struct sysinfo inf;
72   sysinfo(&inf);
73   return make_pair(inf.mem_unit * inf.freeram, inf.mem_unit * inf.totalram);
74 }
75
76 static bool
77 clear_file(const char *name)
78 {
79   ofstream ofs(name);
80   ofs.close();
81   return true;
82 }
83
84 static void
85 write_cb(void *p, const char *s) UNUSED;
86 static void
87 write_cb(void *p, const char *s)
88 {
89   const char *f = "jemalloc.stats";
90   static bool s_clear_file UNUSED = clear_file(f);
91   ofstream ofs(f, ofstream::app);
92   ofs << s;
93   ofs.flush();
94   ofs.close();
95 }
96
97 static event_avg_counter evt_avg_abort_spins("avg_abort_spins");
98
99 void
100 bench_worker::run()
101 {
102   // XXX(stephentu): so many nasty hacks here. should actually
103   // fix some of this stuff one day
104   if (set_core_id)
105     coreid::set_core_id(worker_id); // cringe
106   {
107     scoped_rcu_region r; // register this thread in rcu region
108   }
109   on_run_setup();
110   scoped_db_thread_ctx ctx(db, false);
111   const workload_desc_vec workload = get_workload();
112   txn_counts.resize(workload.size());
113   barrier_a->count_down();
114   barrier_b->wait_for();
115   while (running && (run_mode != RUNMODE_OPS || ntxn_commits < ops_per_worker)) {
116     double d = r.next_uniform();
117     for (size_t i = 0; i < workload.size(); i++) {
118       if ((i + 1) == workload.size() || d < workload[i].frequency) {
119       retry:
120         timer t;
121         const unsigned long old_seed = r.get_seed();
122         const auto ret = workload[i].fn(this);
123         if (likely(ret.first)) {
124           ++ntxn_commits;
125           latency_numer_us += t.lap();
126           backoff_shifts >>= 1;
127         } else {
128           ++ntxn_aborts;
129           if (retry_aborted_transaction && running) {
130             if (backoff_aborted_transaction) {
131               if (backoff_shifts < 63)
132                 backoff_shifts++;
133               uint64_t spins = 1UL << backoff_shifts;
134               spins *= 100000; // XXX: tuned pretty arbitrarily
135               evt_avg_abort_spins.offer(spins);
136               while (spins) {
137                 nop_pause();
138                 spins--;
139               }
140             }
141             r.set_seed(old_seed);
142             goto retry;
143           }
144         }
145         size_delta += ret.second; // should be zero on abort
146         txn_counts[i]++; // txn_counts aren't used to compute throughput (is
147                          // just an informative number to print to the console
148                          // in verbose mode)
149         break;
150       }
151       d -= workload[i].frequency;
152     }
153   }
154 }
155
156 void
157 bench_runner::run()
158 {
159   // load data
160   const vector<unique_ptr<bench_loader>> loaders = make_loaders();
161   {
162     spin_barrier b(loaders.size());
163     const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();
164     {
165       scoped_timer t("dataloading", verbose);
166       for (auto &p : loaders) {
167         p->set_barrier(b);
168         p->start();
169       }
170       for (auto &p : loaders)
171         p->join();
172     }
173     const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
174     const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem
175     const double delta_mb = double(delta)/1048576.0;
176     if (verbose)
177       cerr << "DB size: " << delta_mb << " MB" << endl;
178   }
179
180   db->do_txn_epoch_sync(); // also waits for worker threads to be persisted
181   {
182     const auto persisted_info = db->get_ntxn_persisted();
183     if (get<0>(persisted_info) != get<1>(persisted_info))
184       cerr << "ERROR: " << persisted_info << endl;
185     //ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
186     if (verbose)
187       cerr << persisted_info << " txns persisted in loading phase" << endl;
188   }
189   db->reset_ntxn_persisted();
190
191   if (!no_reset_counters) {
192     event_counter::reset_all_counters(); // XXX: for now - we really should have a before/after loading
193     PERF_EXPR(scopedperf::perfsum_base::resetall());
194   }
195   {
196     const auto persisted_info = db->get_ntxn_persisted();
197     if (get<0>(persisted_info) != 0 ||
198         get<1>(persisted_info) != 0 ||
199         get<2>(persisted_info) != 0.0) {
200       cerr << persisted_info << endl;
201       ALWAYS_ASSERT(false);
202     }
203   }
204
205   map<string, size_t> table_sizes_before;
206   if (verbose) {
207     for (auto &p : open_tables) {
208       scoped_rcu_region guard;
209       const size_t s = p.second->size();
210       cerr << "table " << p.first << " size " << s << endl;
211       table_sizes_before[p.first] = s;
212     }
213     cerr << "starting benchmark..." << endl;
214   }
215
216   const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();
217
218   const vector<unique_ptr<bench_worker>> workers = make_workers();
219   ALWAYS_ASSERT(!workers.empty());
220   for (auto &p : workers)
221     p->start();
222
223   barrier_a.wait_for(); // wait for all threads to start up
224   timer t, t_nosync;
225   barrier_b.count_down(); // bombs away!
226   if (run_mode == RUNMODE_TIME) {
227     sleep(runtime);
228     running = false;
229   }
230   __sync_synchronize();
231   for (size_t i = 0; i < nthreads; i++)
232     workers[i]->join();
233   const unsigned long elapsed_nosync = t_nosync.lap();
234   db->do_txn_finish(); // waits for all worker txns to persist
235   size_t n_commits = 0;
236   size_t n_aborts = 0;
237   uint64_t latency_numer_us = 0;
238   for (size_t i = 0; i < nthreads; i++) {
239     n_commits += workers[i]->get_ntxn_commits();
240     n_aborts += workers[i]->get_ntxn_aborts();
241     latency_numer_us += workers[i]->get_latency_numer_us();
242   }
243   const auto persisted_info = db->get_ntxn_persisted();
244
245   const unsigned long elapsed = t.lap(); // lap() must come after do_txn_finish(),
246                                          // because do_txn_finish() potentially
247                                          // waits a bit
248
249   // various sanity checks
250   ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
251   // not == b/c persisted_info does not count read-only txns
252   ALWAYS_ASSERT(n_commits >= get<1>(persisted_info));
253
254   const double elapsed_nosync_sec = double(elapsed_nosync) / 1000000.0;
255   const double agg_nosync_throughput = double(n_commits) / elapsed_nosync_sec;
256   const double avg_nosync_per_core_throughput = agg_nosync_throughput / double(workers.size());
257
258   const double elapsed_sec = double(elapsed) / 1000000.0;
259   const double agg_throughput = double(n_commits) / elapsed_sec;
260   const double avg_per_core_throughput = agg_throughput / double(workers.size());
261
262   const double agg_abort_rate = double(n_aborts) / elapsed_sec;
263   const double avg_per_core_abort_rate = agg_abort_rate / double(workers.size());
264
265   // we can use n_commits here, because we explicitly wait for all txns
266   // run to be durable
267   const double agg_persist_throughput = double(n_commits) / elapsed_sec;
268   const double avg_per_core_persist_throughput =
269     agg_persist_throughput / double(workers.size());
270
271   // XXX(stephentu): latency currently doesn't account for read-only txns
272   const double avg_latency_us =
273     double(latency_numer_us) / double(n_commits);
274   const double avg_latency_ms = avg_latency_us / 1000.0;
275   const double avg_persist_latency_ms =
276     get<2>(persisted_info) / 1000.0;
277
278   if (verbose) {
279     const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
280     const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem
281     const double delta_mb = double(delta)/1048576.0;
282     map<string, size_t> agg_txn_counts = workers[0]->get_txn_counts();
283     ssize_t size_delta = workers[0]->get_size_delta();
284     for (size_t i = 1; i < workers.size(); i++) {
285       map_agg(agg_txn_counts, workers[i]->get_txn_counts());
286       size_delta += workers[i]->get_size_delta();
287     }
288     const double size_delta_mb = double(size_delta)/1048576.0;
289     map<string, counter_data> ctrs = event_counter::get_all_counters();
290
291     cerr << "--- table statistics ---" << endl;
292     for (auto &p : open_tables) {
293       scoped_rcu_region guard;
294       const size_t s = p.second->size();
295       const ssize_t delta = ssize_t(s) - ssize_t(table_sizes_before[p.first]);
296       cerr << "table " << p.first << " size " << p.second->size();
297       if (delta < 0)
298         cerr << " (" << delta << " records)" << endl;
299       else
300         cerr << " (+" << delta << " records)" << endl;
301     }
302 #ifdef ENABLE_BENCH_TXN_COUNTERS
303     cerr << "--- txn counter statistics ---" << endl;
304     {
305       // take from thread 0 for now
306       abstract_db::txn_counter_map agg = workers[0]->get_local_txn_counters();
307       for (auto &p : agg) {
308         cerr << p.first << ":" << endl;
309         for (auto &q : p.second)
310           cerr << "  " << q.first << " : " << q.second << endl;
311       }
312     }
313 #endif
314     cerr << "--- benchmark statistics ---" << endl;
315     cerr << "runtime: " << elapsed_sec << " sec" << endl;
316     cerr << "memory delta: " << delta_mb  << " MB" << endl;
317     cerr << "memory delta rate: " << (delta_mb / elapsed_sec)  << " MB/sec" << endl;
318     cerr << "logical memory delta: " << size_delta_mb << " MB" << endl;
319     cerr << "logical memory delta rate: " << (size_delta_mb / elapsed_sec) << " MB/sec" << endl;
320     cerr << "agg_nosync_throughput: " << agg_nosync_throughput << " ops/sec" << endl;
321     cerr << "avg_nosync_per_core_throughput: " << avg_nosync_per_core_throughput << " ops/sec/core" << endl;
322     cerr << "agg_throughput: " << agg_throughput << " ops/sec" << endl;
323     cerr << "avg_per_core_throughput: " << avg_per_core_throughput << " ops/sec/core" << endl;
324     cerr << "agg_persist_throughput: " << agg_persist_throughput << " ops/sec" << endl;
325     cerr << "avg_per_core_persist_throughput: " << avg_per_core_persist_throughput << " ops/sec/core" << endl;
326     cerr << "avg_latency: " << avg_latency_ms << " ms" << endl;
327     cerr << "avg_persist_latency: " << avg_persist_latency_ms << " ms" << endl;
328     cerr << "agg_abort_rate: " << agg_abort_rate << " aborts/sec" << endl;
329     cerr << "avg_per_core_abort_rate: " << avg_per_core_abort_rate << " aborts/sec/core" << endl;
330     cerr << "txn breakdown: " << format_list(agg_txn_counts.begin(), agg_txn_counts.end()) << endl;
331     cerr << "--- system counters (for benchmark) ---" << endl;
332     for (map<string, counter_data>::iterator it = ctrs.begin();
333          it != ctrs.end(); ++it)
334       cerr << it->first << ": " << it->second << endl;
335     cerr << "--- perf counters (if enabled, for benchmark) ---" << endl;
336     PERF_EXPR(scopedperf::perfsum_base::printall());
337     cerr << "--- allocator stats ---" << endl;
338     ::allocator::DumpStats();
339     cerr << "---------------------------------------" << endl;
340
341 #ifdef USE_JEMALLOC
342     cerr << "dumping heap profile..." << endl;
343     mallctl("prof.dump", NULL, NULL, NULL, 0);
344     cerr << "printing jemalloc stats..." << endl;
345     malloc_stats_print(write_cb, NULL, "");
346 #endif
347 #ifdef USE_TCMALLOC
348     HeapProfilerDump("before-exit");
349 #endif
350   }
351
352   // output for plotting script
353   cout << agg_throughput << " "
354        << agg_persist_throughput << " "
355        << avg_latency_ms << " "
356        << avg_persist_latency_ms << " "
357        << agg_abort_rate << endl;
358   cout.flush();
359
360   if (!slow_exit)
361     exit(0); // exit() instead of returning, so we don't call a bunch of dtors
362
363   map<string, uint64_t> agg_stats;
364   for (auto &p : open_tables)
365     map_agg(agg_stats, p.second->clear());
366   if (verbose)
367     for (auto &p : agg_stats)
368       cerr << p.first << " : " << p.second << endl;
369 }
370
371 template <typename K, typename V>
372 struct map_maxer {
373   typedef map<K, V> map_type;
374   void
375   operator()(map_type &agg, const map_type &m) const
376   {
377     for (typename map_type::const_iterator it = m.begin();
378         it != m.end(); ++it)
379       agg[it->first] = std::max(agg[it->first], it->second);
380   }
381 };
382
383 //template <typename KOuter, typename KInner, typename VInner>
384 //struct map_maxer<KOuter, map<KInner, VInner>> {
385 //  typedef map<KInner, VInner> inner_map_type;
386 //  typedef map<KOuter, inner_map_type> map_type;
387 //};
388
389 //#ifdef ENABLE_BENCH_TXN_COUNTERS
390 //void
391 //bench_worker::measure_txn_counters(void *txn, const char *txn_name)
392 //{
393 //  auto ret = db->get_txn_counters(txn);
394 //  map_maxer<string, uint64_t>()(local_txn_counters[txn_name], ret);
395 //}
396 //#endif
397
398 map<string, size_t>
399 bench_worker::get_txn_counts() const
400 {
401   map<string, size_t> m;
402   const workload_desc_vec workload = get_workload();
403   for (size_t i = 0; i < txn_counts.size(); i++)
404     m[workload[i].name] = txn_counts[i];
405   return m;
406 }