#include <sys/sysinfo.h>

#include "bench.h"

#include "../counter.h"
#include "../scopedperf.hh"
#include "../allocator.h"

#ifdef USE_JEMALLOC
// cannot include <jemalloc/jemalloc.h> here b/c it conflicts with malloc.h,
// so declare the two jemalloc entry points we need by hand
//#include <jemalloc/jemalloc.h>
extern "C" void malloc_stats_print(void (*write_cb)(void *, const char *), void *cbopaque, const char *opts);
extern "C" int mallctl(const char *name, void *oldp, size_t *oldlenp, void *newp, size_t newlen);
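// note (assumption about the jemalloc build): the "prof.dump" mallctl used at
// the end of bench_runner::run() only emits a heap profile when jemalloc is
// built with profiling and run with it enabled (e.g. MALLOC_CONF="prof:true")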
#endif
#ifdef USE_TCMALLOC
#include <google/heap-profiler.h>
#endif

using namespace std;
using namespace util;

volatile bool running = true;
int verbose = 0;
uint64_t txn_flags = 0;
double scale_factor = 1.0;
uint64_t runtime = 30;
uint64_t ops_per_worker = 0;
int run_mode = RUNMODE_TIME;
int enable_parallel_loading = false;
int slow_exit = 0;
int retry_aborted_transaction = 0;
int no_reset_counters = 0;
int backoff_aborted_transaction = 0;
template <typename T>
static vector<T>
elemwise_sum(const vector<T> &a, const vector<T> &b)
{
  INVARIANT(a.size() == b.size());
  vector<T> ret(a.size());
  for (size_t i = 0; i < a.size(); i++)
    ret[i] = a[i] + b[i];
  return ret;
}

template <typename K, typename V>
static void
map_agg(map<K, V> &agg, const map<K, V> &m)
{
  for (typename map<K, V>::const_iterator it = m.begin();
       it != m.end(); ++it)
    agg[it->first] += it->second;
}
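// e.g. with agg = {"a": 1} and m = {"a": 2, "b": 3}, map_agg(agg, m) leaves
// agg = {"a": 3, "b": 3}; used below to fold per-worker txn counts together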

// returns <free_bytes, total_bytes>
static pair<uint64_t, uint64_t>
get_system_memory_info()
{
  struct sysinfo inf;
  sysinfo(&inf);
  return make_pair(inf.mem_unit * inf.freeram, inf.mem_unit * inf.totalram);
}
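// (sysinfo(2) reports freeram/totalram in blocks of mem_unit bytes, so the
// products above are byte counts)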
static bool
clear_file(const char *name)
{
  // truncate the file so repeated runs don't append to stale stats
  ofstream ofs(name);
  ofs.close();
  return true;
}

static void
write_cb(void *p, const char *s) UNUSED;
static void
write_cb(void *p, const char *s)
{
  // appends jemalloc's stats output to a file; the file is cleared once,
  // the first time this callback fires
  const char *f = "jemalloc.stats";
  static bool s_clear_file UNUSED = clear_file(f);
  ofstream ofs(f, ofstream::app);
  ofs << s;
  ofs.flush();
}

static event_avg_counter evt_avg_abort_spins("avg_abort_spins");

void
bench_worker::run()
{
  // XXX(stephentu): so many nasty hacks here. should actually
  // fix some of this stuff one day
  if (set_core_id)
    coreid::set_core_id(worker_id); // cringe
  {
    scoped_rcu_region r; // register this thread in rcu region
  }
  on_run_setup();
  scoped_db_thread_ctx ctx(db, false);
  const workload_desc_vec workload = get_workload();
  txn_counts.resize(workload.size());
  barrier_a->count_down();
  barrier_b->wait_for();
  while (running && (run_mode != RUNMODE_OPS || ntxn_commits < ops_per_worker)) {
    double d = r.next_uniform();
    for (size_t i = 0; i < workload.size(); i++) {
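      // pick a txn type by cumulative frequency: e.g. with frequencies
      // {0.45, 0.43, 0.12}, d in [0, 0.45) runs txn 0, d in [0.45, 0.88)
      // runs txn 1, and anything else falls through to txn 2; the last
      // entry is an unconditional catch-all so rounding can't skip a pick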
      if ((i + 1) == workload.size() || d < workload[i].frequency) {
      retry:
        timer t;
        const unsigned long old_seed = r.get_seed();
        const auto ret = workload[i].fn(this);
        if (likely(ret.first)) {
          ++ntxn_commits;
          latency_numer_us += t.lap();
          backoff_shifts >>= 1; // commits decay the backoff window
        } else {
          ++ntxn_aborts;
          if (retry_aborted_transaction && running) {
            if (backoff_aborted_transaction) {
              if (backoff_shifts < 63)
                backoff_shifts++;
              uint64_t spins = 1UL << backoff_shifts;
              spins *= 100000; // XXX: tuned pretty arbitrarily
              evt_avg_abort_spins.offer(spins);
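              // capped exponential backoff: the first abort spins
              // 2 * 100000 iterations, doubling on each consecutive
              // abort, while every commit halves backoff_shifts again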
              while (spins) {
                nop_pause();
                spins--;
              }
            }
            // restore the seed so the retried txn replays the same
            // random choices
            r.set_seed(old_seed);
            goto retry;
          }
        }
        size_delta += ret.second; // should be zero on abort
        // txn_counts aren't used to compute throughput; they're just an
        // informative breakdown printed to the console in verbose mode
        txn_counts[i]++;
        break;
      }
      d -= workload[i].frequency;
    }
  }
}

void
bench_runner::run()
{
  // load data
  const vector<unique_ptr<bench_loader>> loaders = make_loaders();
  {
    spin_barrier b(loaders.size());
    const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();
    {
      scoped_timer t("dataloading", verbose);
      for (auto &p : loaders) {
        p->set_barrier(b);
        p->start();
      }
      for (auto &p : loaders)
        p->join();
    }
    const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
    const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem consumed
    const double delta_mb = double(delta)/1048576.0;
    if (verbose)
      cerr << "DB size: " << delta_mb << " MB" << endl;
  }

  db->do_txn_epoch_sync(); // also waits for worker threads to be persisted
  {
    const auto persisted_info = db->get_ntxn_persisted();
    if (get<0>(persisted_info) != get<1>(persisted_info))
      cerr << "ERROR: " << persisted_info << endl;
    //ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
    if (verbose)
      cerr << persisted_info << " txns persisted in loading phase" << endl;
  }
  db->reset_ntxn_persisted();

  if (!no_reset_counters) {
    // XXX: for now - we really should keep separate before/after-loading counters
    event_counter::reset_all_counters();
    PERF_EXPR(scopedperf::perfsum_base::resetall());
  }
  {
    // sanity check: after the reset, the persisted-txn stats must read zero
    const auto persisted_info = db->get_ntxn_persisted();
    if (get<0>(persisted_info) != 0 ||
        get<1>(persisted_info) != 0 ||
        get<2>(persisted_info) != 0.0) {
      cerr << persisted_info << endl;
      ALWAYS_ASSERT(false);
    }
  }

  map<string, size_t> table_sizes_before;
  if (verbose) {
    for (auto &p : open_tables) {
      scoped_rcu_region guard;
      const size_t s = p.second->size();
      cerr << "table " << p.first << " size " << s << endl;
      table_sizes_before[p.first] = s;
    }
    cerr << "starting benchmark..." << endl;
  }

  const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();

  const vector<unique_ptr<bench_worker>> workers = make_workers();
  ALWAYS_ASSERT(!workers.empty());
  for (auto &p : workers)
    p->start();

  barrier_a.wait_for(); // wait for all threads to start up
  timer t, t_nosync;
  barrier_b.count_down(); // bombs away!
  if (run_mode == RUNMODE_TIME) {
    sleep(runtime);
    running = false;
  }
  __sync_synchronize();
  for (size_t i = 0; i < nthreads; i++)
    workers[i]->join();
  const unsigned long elapsed_nosync = t_nosync.lap();
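  // t_nosync stops here, before do_txn_finish(), so the "nosync" figures
  // below exclude the final wait for commits to be made durable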
  db->do_txn_finish(); // waits for all worker txns to persist
  size_t n_commits = 0;
  size_t n_aborts = 0;
  uint64_t latency_numer_us = 0;
  for (size_t i = 0; i < nthreads; i++) {
    n_commits += workers[i]->get_ntxn_commits();
    n_aborts += workers[i]->get_ntxn_aborts();
    latency_numer_us += workers[i]->get_latency_numer_us();
  }
  const auto persisted_info = db->get_ntxn_persisted();

  const unsigned long elapsed = t.lap(); // lap() must come after do_txn_finish(),
                                         // because do_txn_finish() potentially
                                         // blocks for a while

  // various sanity checks
  ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
  // >= rather than == because persisted_info does not count read-only txns
  ALWAYS_ASSERT(n_commits >= get<1>(persisted_info));

  const double elapsed_nosync_sec = double(elapsed_nosync) / 1000000.0;
  const double agg_nosync_throughput = double(n_commits) / elapsed_nosync_sec;
  const double avg_nosync_per_core_throughput = agg_nosync_throughput / double(workers.size());

  const double elapsed_sec = double(elapsed) / 1000000.0;
  const double agg_throughput = double(n_commits) / elapsed_sec;
  const double avg_per_core_throughput = agg_throughput / double(workers.size());

  const double agg_abort_rate = double(n_aborts) / elapsed_sec;
  const double avg_per_core_abort_rate = agg_abort_rate / double(workers.size());

  // we can use n_commits here, because we explicitly wait for all txns
  // to be persisted
  const double agg_persist_throughput = double(n_commits) / elapsed_sec;
  const double avg_per_core_persist_throughput =
    agg_persist_throughput / double(workers.size());

  // XXX(stephentu): latency currently doesn't account for read-only txns
  const double avg_latency_us =
    double(latency_numer_us) / double(n_commits);
  const double avg_latency_ms = avg_latency_us / 1000.0;
  const double avg_persist_latency_ms =
    get<2>(persisted_info) / 1000.0;
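  // (the raw timings above are in microseconds: divide by 1e6 for seconds,
  // by 1e3 for milliseconds)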

  if (verbose) {
    const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
    const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem consumed
    const double delta_mb = double(delta)/1048576.0;
    map<string, size_t> agg_txn_counts = workers[0]->get_txn_counts();
    ssize_t size_delta = workers[0]->get_size_delta();
    for (size_t i = 1; i < workers.size(); i++) {
      map_agg(agg_txn_counts, workers[i]->get_txn_counts());
      size_delta += workers[i]->get_size_delta();
    }
    const double size_delta_mb = double(size_delta)/1048576.0;
    map<string, counter_data> ctrs = event_counter::get_all_counters();

    cerr << "--- table statistics ---" << endl;
    for (auto &p : open_tables) {
      scoped_rcu_region guard;
      const size_t s = p.second->size();
      const ssize_t delta = ssize_t(s) - ssize_t(table_sizes_before[p.first]);
      cerr << "table " << p.first << " size " << s;
      if (delta < 0)
        cerr << " (" << delta << " records)" << endl;
      else
        cerr << " (+" << delta << " records)" << endl;
    }
#ifdef ENABLE_BENCH_TXN_COUNTERS
    cerr << "--- txn counter statistics ---" << endl;
    {
      // take from thread 0 for now
      abstract_db::txn_counter_map agg = workers[0]->get_local_txn_counters();
      for (auto &p : agg) {
        cerr << p.first << ":" << endl;
        for (auto &q : p.second)
          cerr << "  " << q.first << " : " << q.second << endl;
      }
    }
#endif
    cerr << "--- benchmark statistics ---" << endl;
    cerr << "runtime: " << elapsed_sec << " sec" << endl;
    cerr << "memory delta: " << delta_mb << " MB" << endl;
    cerr << "memory delta rate: " << (delta_mb / elapsed_sec) << " MB/sec" << endl;
    cerr << "logical memory delta: " << size_delta_mb << " MB" << endl;
    cerr << "logical memory delta rate: " << (size_delta_mb / elapsed_sec) << " MB/sec" << endl;
    cerr << "agg_nosync_throughput: " << agg_nosync_throughput << " ops/sec" << endl;
    cerr << "avg_nosync_per_core_throughput: " << avg_nosync_per_core_throughput << " ops/sec/core" << endl;
    cerr << "agg_throughput: " << agg_throughput << " ops/sec" << endl;
    cerr << "avg_per_core_throughput: " << avg_per_core_throughput << " ops/sec/core" << endl;
    cerr << "agg_persist_throughput: " << agg_persist_throughput << " ops/sec" << endl;
    cerr << "avg_per_core_persist_throughput: " << avg_per_core_persist_throughput << " ops/sec/core" << endl;
    cerr << "avg_latency: " << avg_latency_ms << " ms" << endl;
    cerr << "avg_persist_latency: " << avg_persist_latency_ms << " ms" << endl;
    cerr << "agg_abort_rate: " << agg_abort_rate << " aborts/sec" << endl;
    cerr << "avg_per_core_abort_rate: " << avg_per_core_abort_rate << " aborts/sec/core" << endl;
    cerr << "txn breakdown: " << format_list(agg_txn_counts.begin(), agg_txn_counts.end()) << endl;
    cerr << "--- system counters (for benchmark) ---" << endl;
    for (map<string, counter_data>::iterator it = ctrs.begin();
         it != ctrs.end(); ++it)
      cerr << it->first << ": " << it->second << endl;
    cerr << "--- perf counters (if enabled, for benchmark) ---" << endl;
    PERF_EXPR(scopedperf::perfsum_base::printall());
    cerr << "--- allocator stats ---" << endl;
    ::allocator::DumpStats();
    cerr << "---------------------------------------" << endl;
  }

#ifdef USE_JEMALLOC
  cerr << "dumping heap profile..." << endl;
  mallctl("prof.dump", NULL, NULL, NULL, 0);
  cerr << "printing jemalloc stats..." << endl;
  malloc_stats_print(write_cb, NULL, "");
#endif
#ifdef USE_TCMALLOC
  HeapProfilerDump("before-exit");
#endif

  // output for plotting script
  cout << agg_throughput << " "
       << agg_persist_throughput << " "
       << avg_latency_ms << " "
       << avg_persist_latency_ms << " "
       << agg_abort_rate << endl;
  cout.flush();

  if (!slow_exit)
    exit(0); // exit() instead of returning, so we don't call a bunch of dtors

  // slow exit: tear the tables down and report per-table reclamation stats
  map<string, uint64_t> agg_stats;
  for (auto &p : open_tables)
    map_agg(agg_stats, p.second->clear());
  if (verbose)
    for (auto &p : agg_stats)
      cerr << p.first << " : " << p.second << endl;
}

template <typename K, typename V>
struct map_maxer {
  typedef map<K, V> map_type;
  inline void
  operator()(map_type &agg, const map_type &m) const
  {
    for (typename map_type::const_iterator it = m.begin();
         it != m.end(); ++it)
      agg[it->first] = std::max(agg[it->first], it->second);
  }
};
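// e.g. map_maxer<string, uint64_t>()(agg, m) folds m into agg keeping the
// per-key maximum; the commented-out txn-counter plumbing below uses it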

//template <typename KOuter, typename KInner, typename VInner>
//struct map_maxer<KOuter, map<KInner, VInner>> {
//  typedef map<KInner, VInner> inner_map_type;
//  typedef map<KOuter, inner_map_type> map_type;
//};

//#ifdef ENABLE_BENCH_TXN_COUNTERS
//void
//bench_worker::measure_txn_counters(void *txn, const char *txn_name)
//{
//  auto ret = db->get_txn_counters(txn);
//  map_maxer<string, uint64_t>()(local_txn_counters[txn_name], ret);
//}
//#endif

map<string, size_t>
bench_worker::get_txn_counts() const
{
  map<string, size_t> m;
  const workload_desc_vec workload = get_workload();
  for (size_t i = 0; i < txn_counts.size(); i++)
    m[workload[i].name] = txn_counts[i];
  return m;
}