 * A stand-alone binary which doesn't depend on the rest of the system,
 * used to test the current persistence strategy.

#include <sys/types.h>
#include "record/serializer.h"

namespace tidhelpers {

// copied from txn_proto2_impl.h
static const uint64_t NBitsNumber = 24;

static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreBits distinct threads
static const size_t NMaxCores = NMAXCORES;

static const uint64_t CoreMask = (NMaxCores - 1);

static const uint64_t NumIdShift = CoreBits;
static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;

static const uint64_t EpochShift = CoreBits + NBitsNumber;
static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
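/*
 * TID bit layout implied by the shifts/masks above (low bits to high):
 *
 *   [ core_id : CoreBits ][ num_id : NBitsNumber ][ epoch_id : remaining bits ]
 *
 * A worked example (illustrative only; the real widths depend on
 * NMAXCOREBITS): with CoreBits = 9, MakeTid(3, 7, 2) packs as
 * 3 | (7 << 9) | (2 << 33), and the extractors below invert it:
 * CoreId == 3, NumId == 7, EpochId == 2.
 */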
uint64_t CoreId(uint64_t v)
{
  return v & CoreMask;
}

uint64_t NumId(uint64_t v)
{
  return (v & NumIdMask) >> NumIdShift;
}

uint64_t EpochId(uint64_t v)
{
  return (v & EpochMask) >> EpochShift;
}

uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
{
  // some sanity checking
  static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1), "masks must cover all 64 bits");
  static_assert((CoreMask & NumIdMask) == 0, "core/num-id masks must not overlap");
  static_assert((NumIdMask & EpochMask) == 0, "num-id/epoch masks must not overlap");
  return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
}

uint64_t
vecidmax(uint64_t coremax, const vector<uint64_t> &v)
{
  uint64_t ret = NumId(coremax);
  for (size_t i = 0; i < v.size(); i++)
    ret = max(ret, NumId(v[i]));
  return ret;
}

string
Str(uint64_t v)
{
  ostringstream b;
  b << "[core=" << CoreId(v) << " | n="
    << NumId(v) << " | epoch="
    << EpochId(v) << "]";
  return b.str();
}

} // namespace tidhelpers

//fillstring(std::string &s, size_t t)
//  for (size_t i = 0; i < t; i++)

template <typename PRNG>
static void
fillkey(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
{
  s.resize(sz);
  serializer<uint64_t, false> ser;
  ser.write((uint8_t *) s.data(), idx);
}

template <typename PRNG>
static void
fillvalue(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
{
  uniform_int_distribution<uint32_t> dist(0, 10000);
  s.resize(sz);
  serializer<uint32_t, false> s_uint32_t;
  for (size_t i = 0; i < sz; i += sizeof(uint32_t)) {
    if (i + sizeof(uint32_t) <= sz) {
      const uint32_t x = dist(prng);
      s_uint32_t.write((uint8_t *) &s[i], x);
    }
  }
}

/** simulate global database state */

static const size_t g_nrecords = 1000000;
static const size_t g_ntxns_worker = 1000000;
static const size_t g_nmax_loggers = 16;

static vector<uint64_t> g_database;
static atomic<uint64_t> g_ntxns_committed(0);
static atomic<uint64_t> g_ntxns_written(0);
static atomic<uint64_t> g_bytes_written[g_nmax_loggers];

static size_t g_nworkers = 1;
static int g_verbose = 0;
static int g_fsync_background = 0;
static size_t g_readset = 30;
static size_t g_writeset = 16;
static size_t g_keysize = 8; // in bytes
static size_t g_valuesize = 32; // in bytes

/** simulation framework */

// all simulations are epoch based
class database_simulation {
public:
  static const unsigned long g_epoch_time_ns = 30000000; /* 30ms in ns */

  database_simulation()
    : keep_going_(true),
      epoch_number_(1), // start at 1 so 0 can be fully persistent initially
      system_sync_epoch_(0)
  {
    // XXX: depends on g_nworkers being set by now
    for (size_t i = 0; i < g_nworkers; i++)
      per_thread_epochs_[i]->store(1, memory_order_release);
    for (size_t i = 0; i < g_nmax_loggers; i++)
      for (size_t j = 0; j < g_nworkers; j++)
        per_thread_sync_epochs_[i].epochs_[j].store(0, memory_order_release);
  }

  virtual ~database_simulation() {}

  virtual void
  init()
  {
    epoch_thread_ = thread(&database_simulation::epoch_thread, this);
  }

  virtual void worker(unsigned id) = 0;

  virtual void logger(const vector<int> &fd,
                      const vector<vector<unsigned>> &assignments) = 0;

  virtual void
  terminate()
  {
    keep_going_->store(false, memory_order_release);
    epoch_thread_.join();
  }

  static bool
  AssignmentsValid(const vector<vector<unsigned>> &assignments,
                   size_t nfds,
                   size_t nworkers)
  {
    // each worker must be assigned exactly once in the assignment;
    // there must be <= nfds assignments
    if (assignments.size() > nfds)
      return false;

    set<unsigned> seen;
    for (auto &assignment : assignments)
      for (auto w : assignment) {
        if (seen.count(w) || w >= nworkers)
          return false;
        seen.insert(w);
      }

    return seen.size() == nworkers;
  }
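  // Illustrative examples: with nworkers = 3 and nfds = 2, {{0, 2}, {1}} is
  // valid; {{0}, {0, 1}} is not (worker 0 assigned twice), nor is
  // {{0}, {1}, {2}} (three assignments but only two fds).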
protected:
  void
  epoch_thread()
  {
    while (keep_going_->load(memory_order_acquire)) {
      struct timespec t;
      t.tv_sec = g_epoch_time_ns / ONE_SECOND_NS;
      t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
      nanosleep(&t, nullptr);

      // make sure all threads are at the current epoch
      const uint64_t curepoch = epoch_number_->load(memory_order_acquire);

    retry:
      bool allthere = true;
      for (size_t i = 0;
           i < g_nworkers && keep_going_->load(memory_order_acquire);
           i++) {
        if (per_thread_epochs_[i]->load(memory_order_acquire) < curepoch) {
          allthere = false;
          break;
        }
      }
      if (!keep_going_->load(memory_order_acquire))
        return;
      if (!allthere) {
        nop_pause();
        goto retry;
      }
      //cerr << "bumping epoch" << endl;
      epoch_number_->store(curepoch + 1, memory_order_release); // bump it
    }
  }

  aligned_padded_elem<atomic<bool>> keep_going_;

  thread epoch_thread_;

  aligned_padded_elem<atomic<uint64_t>> epoch_number_;

  aligned_padded_elem<atomic<uint64_t>> per_thread_epochs_[NMAXCORES];

  // v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted up
  // through (including) all transactions <= epoch v on core j. since core =>
  // logger mapping is static, taking:
  //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
  // yields the entire system's persistent epoch
  struct {
    atomic<uint64_t> epochs_[NMAXCORES];
  } per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;

  // conservative estimate (<=) for:
  //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
  aligned_padded_elem<atomic<uint64_t>> system_sync_epoch_;
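  // A minimal sketch of the computation described above (assuming every core
  // is assigned to at least one logger):
  //
  //   uint64_t persistent_epoch = numeric_limits<uint64_t>::max();
  //   for (size_t core = 0; core < g_nworkers; core++) {
  //     uint64_t best = 0;
  //     for (size_t logger = 0; logger < g_nmax_loggers; logger++)
  //       best = max(best, per_thread_sync_epochs_[logger].epochs_[core].load(
  //                            memory_order_acquire));
  //     persistent_epoch = min(persistent_epoch, best);
  //   }
  //
  // advance_system_sync_epoch() below exploits the static core => logger
  // mapping to skip the inner max.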
};

struct logbuf_header {
  uint64_t nentries_; // > 0 for all valid log buffers
  uint64_t last_tid_; // TID of the last commit
};

struct pbuffer {
  bool io_scheduled_; // has the logger scheduled IO yet?
  size_t curoff_; // current offset into buf_, either for writing
                  // or during the dep computation phase
  size_t remaining_; // number of deps remaining to compute
  std::string buf_; // the actual buffer, of size g_buffer_size

  inline uint8_t *
  pointer()
  {
    return (uint8_t *) buf_.data() + curoff_;
  }

  inline logbuf_header *
  header()
  {
    return (logbuf_header *) buf_.data();
  }

  inline const logbuf_header *
  header() const
  {
    return (const logbuf_header *) buf_.data();
  }
};
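/*
 * Layout of a pbuffer's buf_ (a sketch):
 *
 *   [ logbuf_header | record | record | ... ]
 *                   ^-- curoff_ starts at sizeof(logbuf_header) and advances
 *                       as records (or compressed horizons) are appended
 */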
class onecopy_logbased_simulation : public database_simulation {
public:
  static const size_t g_perthread_buffers = 64; // 64 outstanding buffers
  static const size_t g_buffer_size = (1<<20); // in bytes
  static const size_t g_horizon_size = (1<<16); // in bytes, for compression only

  static circbuf<pbuffer, g_perthread_buffers> g_all_buffers[NMAXCORES];
  static circbuf<pbuffer, g_perthread_buffers> g_persist_buffers[NMAXCORES];

protected:
  virtual const uint8_t *
  read_log_entry(const uint8_t *p, uint64_t &tid,
                 std::function<void(uint64_t)> readfunctor) = 0;

  virtual uint64_t
  compute_log_record_space() const = 0;

  virtual void
  write_log_record(uint8_t *p,
                   uint64_t tidcommit,
                   const vector<uint64_t> &readset,
                   const vector<pair<string, string>> &writeset) = 0;

  virtual void
  logger_on_io_completion() {}

  virtual bool
  do_compression() const = 0;

  static pbuffer *
  getbuffer(unsigned id)
  {
    // block until we get a buf
    pbuffer *ret = g_all_buffers[id].deq();
    ret->io_scheduled_ = false;
    ret->buf_.assign(g_buffer_size, 0);
    ret->curoff_ = sizeof(logbuf_header);
    return ret;
  }

  virtual void
  init() OVERRIDE
  {
    database_simulation::init();
    for (size_t i = 0; i < g_nworkers; i++) {
      for (size_t j = 0; j < g_perthread_buffers; j++) {
        struct pbuffer *p = new pbuffer;
        g_all_buffers[i].enq(p);
      }
    }
  }

  static size_t
  inplace_update_persistent_info(
      vector<pair<uint64_t, uint64_t>> &outstanding_commits,
      uint64_t cursyncepoch)
  {
    size_t ncommits_synced = 0;
    // can erase all entries with x.first <= cursyncepoch
    size_t idx = 0;
    for (; idx < outstanding_commits.size(); idx++) {
      if (outstanding_commits[idx].first <= cursyncepoch)
        ncommits_synced += outstanding_commits[idx].second;
      else
        break;
    }

    // erase entries [0, idx)
    outstanding_commits.erase(outstanding_commits.begin(),
                              outstanding_commits.begin() + idx);

    return ncommits_synced;
  }
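  // Example for inplace_update_persistent_info(): given outstanding_commits =
  // {(1, 100), (2, 50), (3, 75)} and cursyncepoch = 2, the first two entries
  // are counted and erased; the call returns 150, leaving {(3, 75)}.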
  static pbuffer *
  ensure_buffer_with_space(unsigned id, pbuffer *cur, size_t space_needed)
  {
    if (!cur) {
      cur = getbuffer(id);
    } else if (g_buffer_size - cur->curoff_ < space_needed) {
      g_persist_buffers[id].enq(cur);
      cur = getbuffer(id);
    }
    INVARIANT(g_buffer_size - cur->curoff_ >= space_needed);
    return cur;
  }

  /**
   * write the horizon from [p, p+sz) into cur, assuming that cur has enough
   * space. space needed is at least:
   *   sizeof(uint32_t) + LZ4_compressBound(sz)
   *
   * also updates the buffer's headers and offset to reflect the write
   *
   * returns the compressed size of the horizon
   */
  static uint64_t
  write_horizon(void *lz4ctx,
                const uint8_t *p, uint64_t sz,
                uint64_t nentries, uint64_t lasttid,
                pbuffer *cur)
  {
#ifdef CHECK_INVARIANTS
    const uint64_t needed = sizeof(uint32_t) + LZ4_compressBound(sz);
    INVARIANT(g_buffer_size - cur->curoff_ >= needed);
#endif
    const int ret = LZ4_compress_heap(
        lz4ctx,
        (const char *) p,
        (char *) cur->pointer() + sizeof(uint32_t),
        sz);
    INVARIANT(ret > 0);

    serializer<uint32_t, false> s_uint32_t;
    s_uint32_t.write(cur->pointer(), ret);
    cur->curoff_ += sizeof(uint32_t) + ret;
    cur->header()->nentries_ += nentries;
    cur->header()->last_tid_ = lasttid;

    return ret;
  }

  virtual void
  worker(unsigned id) OVERRIDE
  {
    const bool compress = do_compression();
    uint8_t horizon[g_horizon_size]; // LZ4 looks at 64KB windows

    // current offset into the horizon, and how many entries it holds
    size_t horizon_p = 0, horizon_nentries = 0;
    uint64_t horizon_last_tid = 0; // last committed TID in the horizon

    double cratios = 0.0;
    unsigned long ncompressions = 0;

    void *lz4ctx = nullptr; // holds a heap-allocated LZ4 hash table
    if (compress)
      lz4ctx = LZ4_create();

    mt19937 prng(id);

    // read/write sets are uniform for now
    uniform_int_distribution<unsigned> dist(0, g_nrecords - 1);

    vector<uint64_t> readset(g_readset);
    vector<pair<string, string>> writeset(g_writeset);
    for (auto &pr : writeset) {
      pr.first.reserve(g_keysize);
      pr.second.reserve(g_valuesize);
    }

    struct pbuffer *curbuf = nullptr;
    uint64_t lasttid = 0,
             ncommits_currentepoch = 0,
             ncommits_synced = 0;
    vector<pair<uint64_t, uint64_t>> outstanding_commits;
    for (size_t i = 0; i < g_ntxns_worker; i++) {
      const uint64_t lastepoch = per_thread_epochs_[id]->load(memory_order_acquire);
      const uint64_t curepoch = epoch_number_->load(memory_order_acquire);

      if (lastepoch != curepoch) {
        // try to sync outstanding commits
        INVARIANT(curepoch == (lastepoch + 1));
        const size_t cursyncepoch = system_sync_epoch_->load(memory_order_acquire);
        ncommits_synced +=
          inplace_update_persistent_info(outstanding_commits, cursyncepoch);

        // add information about the last epoch
        outstanding_commits.emplace_back(lastepoch, ncommits_currentepoch);
        ncommits_currentepoch = 0;

        per_thread_epochs_[id]->store(curepoch, memory_order_release);
      }

      for (size_t j = 0; j < g_readset; j++)
        readset[j] = g_database[dist(prng)];

      const uint64_t idmax = tidhelpers::vecidmax(lasttid, readset);
      // XXX: ignore future epochs for now
      const uint64_t tidcommit = tidhelpers::MakeTid(id, idmax + 1, curepoch);
      lasttid = tidcommit;

      for (size_t j = 0; j < g_writeset; j++) {
        auto idx = dist(prng);
        g_database[idx] = lasttid;
        fillkey(writeset[j].first, idx, g_keysize, prng);
        fillvalue(writeset[j].second, idx, g_valuesize, prng);
      }

      const uint64_t space_needed = compute_log_record_space();
      if (compress) {
        if (horizon_p + space_needed > g_horizon_size) {
          // need to compress and write the horizon
          curbuf = ensure_buffer_with_space(id, curbuf,
              sizeof(uint32_t) + LZ4_compressBound(horizon_p));
          const uint64_t compsz =
            write_horizon(lz4ctx, &horizon[0], horizon_p,
                          horizon_nentries, horizon_last_tid,
                          curbuf);
          const double cratio = double(horizon_p) / double(compsz);
          cratios += cratio;
          ncompressions++;
          horizon_p = horizon_nentries = horizon_last_tid = 0;
        }

        write_log_record(&horizon[0] + horizon_p, tidcommit, readset, writeset);
        horizon_p += space_needed;
        horizon_nentries++;
        horizon_last_tid = tidcommit;
        ncommits_currentepoch++;
      } else {
        curbuf = ensure_buffer_with_space(id, curbuf, space_needed);
        uint8_t *p = curbuf->pointer();
        write_log_record(p, tidcommit, readset, writeset);
        //cerr << "write tidcommit=" << tidhelpers::Str(tidcommit) << endl;
        curbuf->curoff_ += space_needed;
        curbuf->header()->nentries_++;
        curbuf->header()->last_tid_ = tidcommit;
        ncommits_currentepoch++;
      }
    }

    // flush any leftover horizon entries
    if (horizon_nentries) {
      curbuf = ensure_buffer_with_space(id, curbuf,
          sizeof(uint32_t) + LZ4_compressBound(horizon_p));
      const uint64_t compsz =
        write_horizon(lz4ctx, &horizon[0], horizon_p,
                      horizon_nentries, horizon_last_tid,
                      curbuf);
      const double cratio = double(horizon_p) / double(compsz);
      cratios += cratio;
      ncompressions++;
      horizon_p = horizon_nentries = horizon_last_tid = 0;
    }

    // XXX: hacky - an agreed-upon future epoch for all threads to converge on
    if (curbuf) {
      const uint64_t FutureEpoch = 100000;
      const uint64_t waitfor = tidhelpers::EpochId(
          curbuf->header()->last_tid_);
      INVARIANT(per_thread_epochs_[id]->load(memory_order_acquire) == waitfor);
      ALWAYS_ASSERT(waitfor < FutureEpoch);
      curbuf->header()->last_tid_ =
        tidhelpers::MakeTid(id, 0, FutureEpoch);
      g_persist_buffers[id].enq(curbuf);
      outstanding_commits.emplace_back(waitfor, ncommits_currentepoch);
      //cerr << "worker " << id << " waitfor epoch " << waitfor << endl;
      // get these commits persisted
      while (system_sync_epoch_->load(memory_order_acquire) < waitfor)
        nop_pause();
      ncommits_synced +=
        inplace_update_persistent_info(outstanding_commits, waitfor);
      ALWAYS_ASSERT(outstanding_commits.empty());
    }

    if (g_verbose && compress)
      cerr << "Average compression ratio: " << cratios / double(ncompressions) << endl;

    g_ntxns_committed.fetch_add(ncommits_synced, memory_order_release);
  }

  void
  fsyncer(unsigned id, int fd, one_way_post<int> &channel)
  {
    // background fsync daemon: wait for the writer to post, then sync the
    // log file to disk
    for (;;) {
      int ret;
      channel.consume(ret);
      ret = fdatasync(fd);
      ALWAYS_ASSERT(ret != -1);
    }
  }

  void
  writer(unsigned id, int fd, const vector<unsigned> &assignment)
  {
    vector<iovec> iovs(g_nworkers * g_perthread_buffers);
    vector<pbuffer *> pxs;
    struct timespec last_io_completed;
    one_way_post<int> *channel =
      g_fsync_background ? new one_way_post<int> : nullptr;
    uint64_t total_nbytes_written = 0,
             total_txns_written = 0;
    thread fsync_thread;

    bool sense = false; // cur is at sense, prev is at !sense
    uint64_t nbytes_written[2], txns_written[2], epoch_prefixes[2][g_nworkers];
    memset(&nbytes_written[0], 0, sizeof(nbytes_written));
    memset(&txns_written[0], 0, sizeof(txns_written));
    memset(&epoch_prefixes[0], 0, sizeof(epoch_prefixes[0]));
    memset(&epoch_prefixes[1], 0, sizeof(epoch_prefixes[1]));
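    // The two-entry arrays above implement a double-buffered "sense" scheme:
    // stats for the batch currently being written accumulate at index sense,
    // while the previous batch's stats (at !sense) are folded into the
    // totals only once their fsync is known to have completed.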
    clock_gettime(CLOCK_MONOTONIC, &last_io_completed);

    if (g_fsync_background) {
      fsync_thread = thread(
          &onecopy_logbased_simulation::fsyncer, this, id, fd, ref(*channel));
      fsync_thread.detach();
    }

    while (keep_going_->load(memory_order_acquire)) {
      // don't let this loop iterate more often than once per epoch,
      // so we can batch IO
      struct timespec now, diff;
      clock_gettime(CLOCK_MONOTONIC, &now);
      timespec_utils::subtract(&now, &last_io_completed, &diff);
      if (diff.tv_sec == 0 && diff.tv_nsec < long(g_epoch_time_ns)) {
        // need to sleep it out
        struct timespec ts;
        ts.tv_sec = 0;
        ts.tv_nsec = g_epoch_time_ns - diff.tv_nsec;
        nanosleep(&ts, nullptr);
      }
      clock_gettime(CLOCK_MONOTONIC, &last_io_completed);

      size_t nwritten = 0;
      nbytes_written[sense] = txns_written[sense] = 0;
      for (auto idx : assignment) {
        INVARIANT(idx < g_nworkers);
        g_persist_buffers[idx].peekall(pxs);
        for (auto px : pxs) {
          INVARIANT(px);
          INVARIANT(!px->io_scheduled_);
          iovs[nwritten].iov_base = (void *) px->buf_.data();
          iovs[nwritten].iov_len = px->curoff_;
          nbytes_written[sense] += px->curoff_;
          px->io_scheduled_ = true;
          px->curoff_ = sizeof(logbuf_header);
          px->remaining_ = px->header()->nentries_;
          txns_written[sense] += px->header()->nentries_;

          INVARIANT(tidhelpers::CoreId(px->header()->last_tid_) == idx);
          INVARIANT(epoch_prefixes[sense][idx] <=
                    tidhelpers::EpochId(px->header()->last_tid_));
          INVARIANT(tidhelpers::EpochId(px->header()->last_tid_) > 0);
          epoch_prefixes[sense][idx] =
            tidhelpers::EpochId(px->header()->last_tid_) - 1;
          nwritten++;
        }
      }

      // XXX: should probably sleep here
      if (!nwritten) {
        if (!g_fsync_background || !channel->can_post()) {
          //cerr << "writer skipping because no work to do" << endl;
          continue;
        }
      }

      //cerr << "writer " << id << " nwritten " << nwritten << endl;

      const ssize_t ret =
        nwritten ? writev(fd, &iovs[0], nwritten) : 0;
      ALWAYS_ASSERT(ret != -1);

      bool dosense;
      if (g_fsync_background) {
        // wait for fsync from the previous write
        channel->post(0, true);
        INVARIANT(channel->can_post());
        dosense = !sense;
      } else {
        int ret = fdatasync(fd);
        ALWAYS_ASSERT(ret != -1);
        dosense = sense;
      }

      // update metadata from previous write
      for (size_t i = 0; i < g_nworkers; i++) {
        const uint64_t x0 =
          per_thread_sync_epochs_[id].epochs_[i].load(memory_order_acquire);
        const uint64_t x1 = epoch_prefixes[dosense][i];
        if (x1 > x0)
          per_thread_sync_epochs_[id].epochs_[i].store(
              x1, memory_order_release);
      }
      total_nbytes_written += nbytes_written[dosense];
      total_txns_written += txns_written[dosense];

      sense = !sense;

      // return all buffers that have been io_scheduled_ - we can do this as
      // soon as write returns
      for (auto idx : assignment) {
        struct pbuffer *px;
        while ((px = g_persist_buffers[idx].peek()) &&
               px->io_scheduled_) {
          g_persist_buffers[idx].deq();
          g_all_buffers[idx].enq(px);
        }
      }
    }

    g_bytes_written[id].store(total_nbytes_written, memory_order_release);
    g_ntxns_written.fetch_add(total_txns_written, memory_order_release);
  }

  void
  advance_system_sync_epoch(const vector<vector<unsigned>> &assignments)
  {
    uint64_t min_so_far = numeric_limits<uint64_t>::max();
    for (size_t i = 0; i < assignments.size(); i++)
      for (auto j : assignments[i])
        min_so_far =
          min(per_thread_sync_epochs_[i].epochs_[j].load(memory_order_acquire), min_so_far);

#ifdef CHECK_INVARIANTS
    const uint64_t syssync = system_sync_epoch_->load(memory_order_acquire);
    INVARIANT(syssync <= min_so_far);
#endif
    system_sync_epoch_->store(min_so_far, memory_order_release);
  }

public:
  virtual void
  logger(const vector<int> &fds,
         const vector<vector<unsigned>> &assignments_given) OVERRIDE
  {
    // compute thread => logger assignment
    vector<thread> writers;
    vector<vector<unsigned>> assignments(assignments_given);
    util::timer tt;

    if (assignments.empty()) {
      // compute assuming homogeneous disks
      if (g_nworkers <= fds.size()) {
        // each thread gets its own logging worker
        for (size_t i = 0; i < g_nworkers; i++)
          assignments.push_back({(unsigned) i});
      } else {
        // XXX: currently we assume each logger is equally fast; we should
        // adjust the ratios accordingly for non-homogeneous loggers
        const size_t threads_per_logger = g_nworkers / fds.size();
        for (size_t i = 0; i < fds.size(); i++) {
          assignments.emplace_back(
            MakeRange<unsigned>(
                i * threads_per_logger,
                ((i + 1) == fds.size()) ?
                  g_nworkers :
                  (i + 1) * threads_per_logger));
        }
      }
    }

    INVARIANT(AssignmentsValid(assignments, fds.size(), g_nworkers));
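    // Illustrative example of the default assignment: with g_nworkers = 8 and
    // three log files, threads_per_logger = 8 / 3 = 2, yielding
    // {{0, 1}, {2, 3}, {4, 5, 6, 7}} (the last logger absorbs the remainder).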
    for (size_t i = 0; i < assignments.size(); i++)
      writers.emplace_back(
          &onecopy_logbased_simulation::writer,
          this, i, fds[i], ref(assignments[i]));

    cerr << "assignments: " << assignments << endl;

    while (keep_going_->load(memory_order_acquire)) {
      // periodically compute which epoch is the persistence epoch,
      // and update system_sync_epoch_
      struct timespec t;
      t.tv_sec = g_epoch_time_ns / ONE_SECOND_NS;
      t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
      nanosleep(&t, nullptr);

      advance_system_sync_epoch(assignments);
    }

    for (auto &t : writers)
      t.join();

    cerr << "current epoch: " << epoch_number_->load(memory_order_acquire) << endl;
    cerr << "sync epoch   : " << system_sync_epoch_->load(memory_order_acquire) << endl;
    const double xsec = tt.lap_ms() / 1000.0;
    for (size_t i = 0; i < writers.size(); i++)
      cerr << "writer " << i << " " <<
        (double(g_bytes_written[i].load(memory_order_acquire)) /
         double(1UL << 20) /
         xsec) << " MB/sec" << endl;
  }

protected:
  vector<pbuffer *> pxs_; // just some scratch space
};

circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
  onecopy_logbased_simulation::g_all_buffers[NMAXCORES];
circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
  onecopy_logbased_simulation::g_persist_buffers[NMAXCORES];

class explicit_deptracking_simulation : public onecopy_logbased_simulation {
public:

  /** global state about our persistence calculations */

  // contains the latest TID, inclusive, per core, which is (transitively)
  // persistent. note that the prefix of the DB which is totally persistent is
  // simply the max of this table.
  static uint64_t g_persistence_vc[NMAXCORES];

protected:
  bool do_compression() const OVERRIDE { return false; }

  virtual const uint8_t *
  read_log_entry(const uint8_t *p, uint64_t &tid,
                 std::function<void(uint64_t)> readfunctor) OVERRIDE
  {
    serializer<uint8_t, false> s_uint8_t;
    serializer<uint64_t, false> s_uint64_t;

    uint8_t readset_sz, writeset_sz, key_sz, value_sz;
    uint64_t v;

    p = s_uint64_t.read(p, &tid);
    p = s_uint8_t.read(p, &readset_sz);
    INVARIANT(size_t(readset_sz) == g_readset);
    for (size_t i = 0; i < size_t(readset_sz); i++) {
      p = s_uint64_t.read(p, &v);
      readfunctor(v);
    }

    p = s_uint8_t.read(p, &writeset_sz);
    INVARIANT(size_t(writeset_sz) == g_writeset);
    for (size_t i = 0; i < size_t(writeset_sz); i++) {
      p = s_uint8_t.read(p, &key_sz);
      INVARIANT(size_t(key_sz) == g_keysize);
      p += size_t(key_sz);
      p = s_uint8_t.read(p, &value_sz);
      INVARIANT(size_t(value_sz) == g_valuesize);
      p += size_t(value_sz);
    }

    return p;
  }

  virtual uint64_t
  compute_log_record_space() const OVERRIDE
  {
    // compute how much space we need for this entry
    uint64_t space_needed = 0;

    // 8 bytes to indicate TID
    space_needed += sizeof(uint64_t);

    // one byte to indicate # of read deps
    space_needed += 1;

    // each dep occupies 8 bytes
    space_needed += g_readset * sizeof(uint64_t);

    // one byte to indicate # of records written
    space_needed += 1;

    // each record occupies (1 + key_length + 1 + value_length) bytes
    space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
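    // Worked example with the defaults above (g_readset = 30, g_writeset = 16,
    // g_keysize = 8, g_valuesize = 32):
    //   8 (TID) + 1 (#deps) + 30*8 (deps) + 1 (#writes) + 16*(1+8+1+32)
    //     = 8 + 1 + 240 + 1 + 672 = 922 bytes per record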
    return space_needed;
  }

  virtual void
  write_log_record(uint8_t *p,
                   uint64_t tidcommit,
                   const vector<uint64_t> &readset,
                   const vector<pair<string, string>> &writeset) OVERRIDE
  {
    serializer<uint8_t, false> s_uint8_t;
    serializer<uint64_t, false> s_uint64_t;

    p = s_uint64_t.write(p, tidcommit);
    p = s_uint8_t.write(p, readset.size());
    for (auto t : readset)
      p = s_uint64_t.write(p, t);
    p = s_uint8_t.write(p, writeset.size());
    for (auto &pr : writeset) {
      p = s_uint8_t.write(p, pr.first.size());
      memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
      p = s_uint8_t.write(p, pr.second.size());
      memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
    }
  }

  virtual void
  logger_on_io_completion() OVERRIDE
  {
    ALWAYS_ASSERT(false); // currently broken

    for (size_t i = 0; i < NMAXCORES; i++) {
      g_persist_buffers[i].peekall(pxs_);
      for (auto px : pxs_) {
        INVARIANT(px);
        if (!px->io_scheduled_)
          break;
        INVARIANT(px->remaining_ > 0);
        INVARIANT(px->curoff_ < g_buffer_size);

        const uint8_t *p = px->pointer();
        uint64_t committid;
        bool allsat = true;

        //cerr << "processing buffer " << px << " with curoff_=" << px->curoff_ << endl
        //     << "  p=" << intptr_t(p) << endl;

        while (px->remaining_ && allsat) {
          const uint8_t *nextp =
            read_log_entry(p, committid, [&allsat](uint64_t readdep) {
              if (!allsat)
                return;
              const uint64_t cid = tidhelpers::CoreId(readdep);
              if (readdep > g_persistence_vc[cid])
                allsat = false;
            });
          //cerr << "committid=" << tidhelpers::Str(committid)
          //     << ", g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i])
          //     << endl;
          if (allsat) {
            INVARIANT(tidhelpers::CoreId(committid) == i);
            INVARIANT(g_persistence_vc[i] < committid);
            g_persistence_vc[i] = committid;
            px->remaining_--;
            p = nextp;
            px->curoff_ = intptr_t(p) - intptr_t(px->buf_.data());
          } else {
            // done, no further entries will be satisfied
          }
        }

        if (allsat) {
          INVARIANT(px->remaining_ == 0);
          // finished entire buffer
          struct pbuffer *pxcheck = g_persist_buffers[i].deq();
          ALWAYS_ASSERT(pxcheck == px);
          g_all_buffers[i].enq(px);
          //cerr << "buffer flushed at g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i]) << endl;
        } else {
          INVARIANT(px->remaining_ > 0);
          break; // cannot process core's list any further
        }
      }
    }
  }
};

uint64_t explicit_deptracking_simulation::g_persistence_vc[NMAXCORES] = {0};

class epochbased_simulation : public onecopy_logbased_simulation {
public:
  epochbased_simulation(bool compress)
    : compress_(compress)
  {
  }

protected:
  bool do_compression() const OVERRIDE { return compress_; }

  virtual const uint8_t *
  read_log_entry(const uint8_t *p, uint64_t &tid,
                 std::function<void(uint64_t)> readfunctor) OVERRIDE
  {
    serializer<uint8_t, false> s_uint8_t;
    serializer<uint64_t, false> s_uint64_t;

    uint8_t writeset_sz, key_sz, value_sz;

    p = s_uint64_t.read(p, &tid);
    p = s_uint8_t.read(p, &writeset_sz);
    INVARIANT(size_t(writeset_sz) == g_writeset);
    for (size_t i = 0; i < size_t(writeset_sz); i++) {
      p = s_uint8_t.read(p, &key_sz);
      INVARIANT(size_t(key_sz) == g_keysize);
      p += size_t(key_sz);
      p = s_uint8_t.read(p, &value_sz);
      INVARIANT(size_t(value_sz) == g_valuesize);
      p += size_t(value_sz);
    }

    return p;
  }

  virtual uint64_t
  compute_log_record_space() const OVERRIDE
  {
    // compute how much space we need for this entry
    uint64_t space_needed = 0;

    // 8 bytes to indicate TID
    space_needed += sizeof(uint64_t);

    // one byte to indicate # of records written
    space_needed += 1;

    // each record occupies (1 + key_length + 1 + value_length) bytes
    space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
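    // Worked example with the defaults: 8 (TID) + 1 (#writes) + 16*(1+8+1+32)
    //   = 8 + 1 + 672 = 681 bytes per record; omitting the read-set deps
    //   makes this markedly smaller than the deptracking format's 922 bytes
    //   for the same transaction shape.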
    return space_needed;
  }

  virtual void
  write_log_record(uint8_t *p,
                   uint64_t tidcommit,
                   const vector<uint64_t> &readset,
                   const vector<pair<string, string>> &writeset) OVERRIDE
  {
    serializer<uint8_t, false> s_uint8_t;
    serializer<uint64_t, false> s_uint64_t;

    p = s_uint64_t.write(p, tidcommit);
    p = s_uint8_t.write(p, writeset.size());
    for (auto &pr : writeset) {
      p = s_uint8_t.write(p, pr.first.size());
      memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
      p = s_uint8_t.write(p, pr.second.size());
      memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
    }
  }

private:
  bool compress_;
};

int
main(int argc, char **argv)
{
  string strategy = "epoch";
  vector<string> logfiles;
  vector<vector<unsigned>> assignments;

  for (;;) {
    static struct option long_options[] =
    {
      {"verbose"     , no_argument       , &g_verbose         , 1}  ,
      {"fsync-back"  , no_argument       , &g_fsync_background, 1}  ,
      {"num-threads" , required_argument , 0                  , 't'},
      {"strategy"    , required_argument , 0                  , 's'},
      {"readset"     , required_argument , 0                  , 'r'},
      {"writeset"    , required_argument , 0                  , 'w'},
      {"keysize"     , required_argument , 0                  , 'k'},
      {"valuesize"   , required_argument , 0                  , 'v'},
      {"logfile"     , required_argument , 0                  , 'l'},
      {"assignment"  , required_argument , 0                  , 'a'},
      {0, 0, 0, 0}
    };
    int option_index = 0;
    int c = getopt_long(argc, argv, "t:s:r:w:k:v:l:a:", long_options, &option_index);
    if (c == -1)
      break;
    switch (c) {
    case 0:
      if (long_options[option_index].flag != 0)
        break;
      abort();
      break;

    case 't':
      g_nworkers = strtoul(optarg, nullptr, 10);
      break;

    case 's':
      strategy = optarg;
      break;

    case 'r':
      g_readset = strtoul(optarg, nullptr, 10);
      break;

    case 'w':
      g_writeset = strtoul(optarg, nullptr, 10);
      break;

    case 'k':
      g_keysize = strtoul(optarg, nullptr, 10);
      break;

    case 'v':
      g_valuesize = strtoul(optarg, nullptr, 10);
      break;

    case 'l':
      logfiles.emplace_back(optarg);
      break;

    case 'a':
      assignments.emplace_back(
          ParseCSVString<unsigned, RangeAwareParser<unsigned>>(optarg));
      break;

    case '?':
      /* getopt_long already printed an error message. */
      exit(1);

    default:
      abort();
    }
  }

  ALWAYS_ASSERT(g_nworkers >= 1);
  ALWAYS_ASSERT(g_readset >= 0);
  ALWAYS_ASSERT(g_writeset > 0);
  ALWAYS_ASSERT(g_keysize > 0);
  ALWAYS_ASSERT(g_valuesize >= 0);
  ALWAYS_ASSERT(!logfiles.empty());
  ALWAYS_ASSERT(logfiles.size() <= g_nmax_loggers);
  ALWAYS_ASSERT(
      assignments.empty() ||
      database_simulation::AssignmentsValid(
          assignments, logfiles.size(), g_nworkers));
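  // Example invocation (binary name and paths are illustrative):
  //   ./persist_test --num-threads 4 --strategy epoch \
  //     --logfile /tmp/log0 --logfile /tmp/log1 \
  //     --assignment 0,2 --assignment 1,3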
  if (g_verbose)
    cerr << "{nworkers=" << g_nworkers
         << ", readset=" << g_readset
         << ", writeset=" << g_writeset
         << ", keysize=" << g_keysize
         << ", valuesize=" << g_valuesize
         << ", logfiles=" << logfiles
         << ", strategy=" << strategy
         << ", fsync_background=" << g_fsync_background
         << ", assignments=" << assignments
         << "}" << endl;

  if (strategy != "deptracking" &&
      strategy != "epoch" &&
      strategy != "epoch-compress")
    ALWAYS_ASSERT(false);

  g_database.resize(g_nrecords); // all start at TID=0

  vector<int> fds;
  for (auto &fname : logfiles) {
    int fd = open(fname.c_str(), O_CREAT|O_WRONLY|O_TRUNC, 0664);
    if (fd == -1) {
      perror("open");
      return 1;
    }
    fds.push_back(fd);
  }

  unique_ptr<database_simulation> sim;
  if (strategy == "deptracking")
    sim.reset(new explicit_deptracking_simulation);
  else if (strategy == "epoch")
    sim.reset(new epochbased_simulation(false));
  else if (strategy == "epoch-compress")
    sim.reset(new epochbased_simulation(true));
  else
    ALWAYS_ASSERT(false);

  sim->init();
  thread logger_thread(
      &database_simulation::logger, sim.get(), fds, ref(assignments));

  vector<thread> workers;
  util::timer tt, tt1;
  for (size_t i = 0; i < g_nworkers; i++)
    workers.emplace_back(&database_simulation::worker, sim.get(), i);
  for (auto &p: workers)
    p.join();
  sim->terminate();
  logger_thread.join();

  const double ntxns_committed = g_ntxns_committed.load();
  const double xsec = tt.lap_ms() / 1000.0;
  const double rate = double(ntxns_committed) / xsec;
  cerr << "txns committed rate: " << rate << " txns/sec" << endl;
  cerr << "  (" << size_t(ntxns_committed) << " in " << xsec << " sec)" << endl;

  const double ntxns_written = g_ntxns_written.load();
  const double rate1 = double(ntxns_written) / xsec;
  cerr << "txns written rate: " << rate1 << " txns/sec" << endl;
  cerr << "  (" << size_t(ntxns_written) << " in " << xsec << " sec)" << endl;

  cout << rate << endl;
  return 0;
}