1 #ifndef _NDB_TXN_PROTO2_IMPL_H_
2 #define _NDB_TXN_PROTO2_IMPL_H_
13 #include "txn_btree.h"
16 #include "spinbarrier.h"
17 #include "record/serializer.h"
20 template <typename Traits> class transaction_proto2;
21 template <template <typename> class Transaction>
// the system has a single logging subsystem (composed of multiple loggers)
25 // NOTE: currently, the persistence epoch is tied 1:1 with the ticker's epoch
27 friend class transaction_proto2_static;
29 friend class transaction_proto2;
30 // XXX: should only allow txn_epoch_sync<transaction_proto2> as friend
31 template <template <typename> class T>
32 friend class txn_epoch_sync;
35 static const size_t g_nmax_loggers = 16;
36 static const size_t g_perthread_buffers = 256; // 256 outstanding buffers
37 static const size_t g_buffer_size = (1<<20); // in bytes
38 static const size_t g_horizon_buffer_size = 2 * (1<<16); // in bytes
39 static const size_t g_max_lag_epochs = 128; // cannot lag more than 128 epochs
40 static const bool g_pin_loggers_to_numa_nodes = false;
43 IsPersistenceEnabled()
49 IsCompressionEnabled()
51 return g_use_compression;
54 // init the logging subsystem.
// should only be called ONCE; it is not thread-safe. if assignments_used is
// not null, it is filled with a copy of the assignment actually computed
60 const std::vector<std::string> &logfiles,
61 const std::vector<std::vector<unsigned>> &assignments_given,
62 std::vector<std::vector<unsigned>> *assignments_used = nullptr,
63 bool call_fsync = true,
64 bool use_compression = false,
65 bool fake_writes = false);
67 struct logbuf_header {
68 uint64_t nentries_; // > 0 for all valid log buffers
69 uint64_t last_tid_; // TID of the last commit
73 uint64_t earliest_start_us_; // start time of the earliest txn
74 bool io_scheduled_; // has the logger scheduled IO yet?
unsigned curoff_; // current offset into buf_start_ for writing
78 const unsigned core_id_; // which core does this pbuffer belong to?
80 const unsigned buf_sz_;
83 uint8_t buf_start_[0];
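// implied layout of buf_start_ (a sketch based on the accessors below,
// not a separate on-disk format):
//   [0, sizeof(logbuf_header))        -> header()
//   [sizeof(logbuf_header), curoff_)  -> serialized txns: datastart(), datasize()
//   [curoff_, buf_sz_)                -> free space: pointer(), space_remaining()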
85 // to allocate a pbuffer, use placement new:
86 // const size_t bufsz = ...;
// char *p = (char *) malloc(sizeof(pbuffer) + bufsz);
88 // pbuffer *pb = new (p) pbuffer(core_id, bufsz);
90 // NOTE: it is not necessary to call the destructor for pbuffer, since
91 // it only contains PODs
92 pbuffer(unsigned core_id, unsigned buf_sz)
93 : core_id_(core_id), buf_sz_(buf_sz)
95 INVARIANT(((char *)this) + sizeof(*this) == (char *) &buf_start_[0]);
96 INVARIANT(buf_sz > sizeof(logbuf_header));
100 pbuffer(const pbuffer &) = delete;
101 pbuffer &operator=(const pbuffer &) = delete;
102 pbuffer(pbuffer &&) = delete;
107 earliest_start_us_ = 0;
108 io_scheduled_ = false;
109 curoff_ = sizeof(logbuf_header);
110 NDB_MEMSET(&buf_start_[0], 0, buf_sz_);
116 INVARIANT(curoff_ >= sizeof(logbuf_header));
117 INVARIANT(curoff_ <= buf_sz_);
118 return &buf_start_[0] + curoff_;
124 return &buf_start_[0] + sizeof(logbuf_header);
130 INVARIANT(curoff_ >= sizeof(logbuf_header));
131 INVARIANT(curoff_ <= buf_sz_);
132 return curoff_ - sizeof(logbuf_header);
135 inline logbuf_header *
138 return reinterpret_cast<logbuf_header *>(&buf_start_[0]);
141 inline const logbuf_header *
144 return reinterpret_cast<const logbuf_header *>(&buf_start_[0]);
148 space_remaining() const
150 INVARIANT(curoff_ >= sizeof(logbuf_header));
151 INVARIANT(curoff_ <= buf_sz_);
152 return buf_sz_ - curoff_;
156 can_hold_tid(uint64_t tid) const;
160 AssignmentsValid(const std::vector<std::vector<unsigned>> &assignments,
// each worker must be assigned to exactly one logger, and
// there must be <= nfds assignments
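// e.g. (a sketch of the rules above): with nworkers == 4 and nfds == 2,
//   {{0, 2}, {1, 3}}     is valid
//   {{0, 1}, {1, 2, 3}}  is invalid (worker 1 assigned twice)
//   {{0}, {1}, {2, 3}}   is invalid (3 assignments > 2 fds)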
167 if (assignments.size() > nfds)
170 std::set<unsigned> seen;
171 for (auto &assignment : assignments)
172 for (auto w : assignment) {
173 if (seen.count(w) || w >= nworkers)
178 return seen.size() == nworkers;
181 typedef circbuf<pbuffer, g_perthread_buffers> pbuffer_circbuf;
183 static std::tuple<uint64_t, uint64_t, double>
184 compute_ntxns_persisted_statistics();
// purge from each thread its counters of the number of txns persisted
189 clear_ntxns_persisted_statistics();
191 // wait until the logging system appears to be idle.
193 // note that this isn't a guarantee, just a best effort attempt
195 wait_for_idle_state();
// waits until the epoch at the time of invocation is persisted
199 wait_until_current_point_persisted();
206 // don't use percore<std::atomic<uint64_t>> because we don't want padding
207 std::atomic<uint64_t> epochs_[NMAXCORES];
208 std::atomic<uint64_t> dummy_work_; // so we can do some fake work
215 void *lz4ctx_; // for compression
216 pbuffer *horizon_; // for compression
218 circbuf<pbuffer, g_perthread_buffers> all_buffers_; // logger pushes to core
219 circbuf<pbuffer, g_perthread_buffers> persist_buffers_; // core pushes to logger
221 persist_ctx() : init_(false), lz4ctx_(nullptr), horizon_(nullptr) {}
224 // context per one epoch
225 struct persist_stats {
226 // how many txns this thread has persisted in total
227 std::atomic<uint64_t> ntxns_persisted_;
229 // how many txns have been pushed to the logger (but not necessarily persisted)
230 std::atomic<uint64_t> ntxns_pushed_;
232 // committed (but not necessarily pushed, nor persisted)
233 std::atomic<uint64_t> ntxns_committed_;
// sum of all latencies (divide by ntxns_persisted_ to get avg latency in
// us) for *persisted* txns (a conservative estimate)
237 std::atomic<uint64_t> latency_numer_;
// per-epoch information for the last g_max_lag_epochs epochs
240 struct per_epoch_stats {
241 std::atomic<uint64_t> ntxns_;
242 std::atomic<uint64_t> earliest_start_us_;
244 per_epoch_stats() : ntxns_(0), earliest_start_us_(0) {}
245 } d_[g_max_lag_epochs];
248 ntxns_persisted_(0), ntxns_pushed_(0),
249 ntxns_committed_(0), latency_numer_(0) {}
255 advance_system_sync_epoch(
256 const std::vector<std::vector<unsigned>> &assignments);
// takes its assignment by value (a copy) on purpose
261 std::vector<unsigned> assignment);
263 static void persister(
264 std::vector<std::vector<unsigned>> assignments);
267 INITMODE_NONE, // no initialization
268 INITMODE_REG, // just use malloc() to init buffers
INITMODE_RCU, // try to use the RCU NUMA-aware allocator
272 static inline persist_ctx &
273 persist_ctx_for(uint64_t core_id, InitMode imode)
275 INVARIANT(core_id < g_persist_ctxs.size());
276 persist_ctx &ctx = g_persist_ctxs[core_id];
277 if (unlikely(!ctx.init_ && imode != INITMODE_NONE)) {
278 size_t needed = g_perthread_buffers * (sizeof(pbuffer) + g_buffer_size);
279 if (IsCompressionEnabled())
280 needed += size_t(LZ4_create_size()) +
281 sizeof(pbuffer) + g_horizon_buffer_size;
283 (imode == INITMODE_REG) ?
284 (char *) malloc(needed) :
285 (char *) rcu::s_instance.alloc_static(needed);
286 if (IsCompressionEnabled()) {
288 mem += LZ4_create_size();
289 ctx.horizon_ = new (mem) pbuffer(core_id, g_horizon_buffer_size);
290 mem += sizeof(pbuffer) + g_horizon_buffer_size;
292 for (size_t i = 0; i < g_perthread_buffers; i++) {
293 ctx.all_buffers_.enq(new (mem) pbuffer(core_id, g_buffer_size));
294 mem += sizeof(pbuffer) + g_buffer_size;
303 static bool g_persist; // whether or not logging is enabled
305 static bool g_call_fsync; // whether or not fsync() needs to be called
// in order for writes to be considered durable
308 static bool g_use_compression; // whether or not to compress log buffers
310 static bool g_fake_writes; // whether or not to fake doing writes (to measure
// the pure overhead of the disk)
313 static size_t g_nworkers; // assignments are computed based on g_nworkers
314 // but a logger responsible for core i is really
// responsible for cores i + k * g_nworkers, for k >= 0
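// e.g. (sketch): with g_nworkers == 2 and an assignment that gives logger 0
// core 0 and logger 1 core 1, logger 0 ends up covering cores 0, 2, 4, ...
// and logger 1 covers cores 1, 3, 5, ...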
// v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted up
// through (and including) all transactions with epoch <= v on core j. since core =>
320 // logger mapping is static, taking:
321 // min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
322 // yields the entire system's persistent epoch
324 per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;
326 // conservative estimate (<=) for:
327 // min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
328 static util::aligned_padded_elem<std::atomic<uint64_t>>
329 system_sync_epoch_ CACHE_ALIGNED;
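// a sketch of the min/max expression above (illustrative only, not
// necessarily how advance_system_sync_epoch() computes it):
//   uint64_t e = std::numeric_limits<uint64_t>::max();
//   for (size_t core = 0; core < NMAXCORES; core++) {
//     uint64_t m = 0;
//     for (size_t logger = 0; logger < g_nmax_loggers; logger++)
//       m = std::max(m, per_thread_sync_epochs_[logger].epochs_[core].load());
//     e = std::min(e, m);
//   }
//   // e is the system-wide persistent epoch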
331 static percore<persist_ctx> g_persist_ctxs CACHE_ALIGNED;
333 static percore<persist_stats> g_persist_stats CACHE_ALIGNED;
337 static event_counter g_evt_log_buffer_epoch_boundary;
338 static event_counter g_evt_log_buffer_out_of_space;
339 static event_counter g_evt_log_buffer_bytes_before_compress;
340 static event_counter g_evt_log_buffer_bytes_after_compress;
341 static event_counter g_evt_logger_writev_limit_met;
342 static event_counter g_evt_logger_max_lag_wait;
343 static event_avg_counter g_evt_avg_log_entry_ntxns;
344 static event_avg_counter g_evt_avg_log_buffer_compress_time_us;
345 static event_avg_counter g_evt_avg_logger_bytes_per_writev;
346 static event_avg_counter g_evt_avg_logger_bytes_per_sec;
349 static inline std::ostream &
350 operator<<(std::ostream &o, txn_logger::logbuf_header &hdr)
352 o << "{nentries_=" << hdr.nentries_ << ", last_tid_="
353 << g_proto_version_str(hdr.last_tid_) << "}";
357 class transaction_proto2_static {
361 // each epoch is tied (1:1) to the ticker subsystem's tick. this is the
362 // speed of the persistence layer.
// however, read-only txns and GC are tied to multiples of the ticker's tick
367 #ifdef CHECK_INVARIANTS
368 static const uint64_t ReadOnlyEpochMultiplier = 10; /* 10 * 1 ms */
370 static const uint64_t ReadOnlyEpochMultiplier = 25; /* 25 * 40 ms */
371 static_assert(ticker::tick_us * ReadOnlyEpochMultiplier == 1000000, "");
374 static_assert(ReadOnlyEpochMultiplier >= 1, "XX");
376 static const uint64_t ReadOnlyEpochUsec =
377 ticker::tick_us * ReadOnlyEpochMultiplier;
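// worked example (sketch): with ReadOnlyEpochMultiplier == 25 and
// ticker::tick_us == 40000, ReadOnlyEpochUsec == 1000000 (1 s), and epoch
// ticks 0..24 map to read-only tick 0, ticks 25..49 to read-only tick 1, etc.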
379 static inline uint64_t constexpr
380 to_read_only_tick(uint64_t epoch_tick)
382 return epoch_tick / ReadOnlyEpochMultiplier;
385 // in this protocol, the version number is:
// (note that for tid_t's, the top bit is reserved and
// *must* be set to zero)
389 // [ core | number | epoch | reserved ]
390 // [ 0..9 | 9..33 | 33..63 | 63..64 ]
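// e.g. (sketch): a TID built with MakeTid() (defined below) decomposes back
// into its fields via CoreId()/NumId()/EpochId():
//   const uint64_t tid = MakeTid(3, 42, 7);
//   INVARIANT(CoreId(tid) == 3 && NumId(tid) == 42 && EpochId(tid) == 7);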
392 static inline ALWAYS_INLINE
393 uint64_t CoreId(uint64_t v)
398 static inline ALWAYS_INLINE
399 uint64_t NumId(uint64_t v)
401 return (v & NumIdMask) >> NumIdShift;
404 static inline ALWAYS_INLINE
405 uint64_t EpochId(uint64_t v)
407 return (v & EpochMask) >> EpochShift;
410 // XXX(stephentu): HACK
414 INVARIANT(!rcu::s_instance.in_rcu_region());
415 const uint64_t e = to_read_only_tick(
416 ticker::s_instance.global_last_tick_exclusive());
418 std::cerr << "wait_an_epoch(): consistent reads happening in e-1, but e=0 so special case"
421 std::cerr << "wait_an_epoch(): consistent reads happening in e-1: "
422 << (e-1) << std::endl;
424 while (to_read_only_tick(ticker::s_instance.global_last_tick_exclusive()) == e)
426 COMPILER_MEMORY_FENCE;
430 ComputeReadOnlyTid(uint64_t global_tick_ex)
432 const uint64_t a = (global_tick_ex / ReadOnlyEpochMultiplier);
433 const uint64_t b = a * ReadOnlyEpochMultiplier;
435 // want to read entries <= b-1, special casing for b=0
437 return MakeTid(0, 0, 0);
439 return MakeTid(CoreMask, NumIdMask >> NumIdShift, b - 1);
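// worked example for ComputeReadOnlyTid (sketch), with
// ReadOnlyEpochMultiplier == 25: global_tick_ex == 60 gives a == 2 and
// b == 50, so the returned TID has epoch 49 and saturated core/number
// fields, i.e. snapshot txns read everything committed at epoch <= 49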
442 static const uint64_t NBitsNumber = 24;
444 // XXX(stephentu): need to implement core ID recycling
static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreBits distinct threads
446 static const size_t NMaxCores = NMAXCORES;
448 static const uint64_t CoreMask = (NMaxCores - 1);
450 static const uint64_t NumIdShift = CoreBits;
451 static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;
453 static const uint64_t EpochShift = CoreBits + NBitsNumber;
// since the reserved bit is always zero, we don't need a special mask
455 static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
457 static inline ALWAYS_INLINE
458 uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
460 // some sanity checking
461 static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1), "xx");
462 static_assert((CoreMask & NumIdMask) == 0, "xx");
463 static_assert((NumIdMask & EpochMask) == 0, "xx");
464 return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
468 set_hack_status(bool hack_status)
470 g_hack->status_ = hack_status;
476 return g_hack->status_;
479 // thread-safe, can be called many times
480 static void InitGC();
482 static void PurgeThreadOutstandingGCTasks();
484 #ifdef PROTO2_CAN_DISABLE_GC
488 return g_flags->g_gc_init.load(std::memory_order_acquire);
492 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
496 g_flags->g_disable_snapshots.store(true, std::memory_order_release);
501 return !g_flags->g_disable_snapshots.load(std::memory_order_acquire);
506 struct delete_entry {
507 #ifdef CHECK_INVARIANTS
508 dbtuple *tuple_ahead_;
509 uint64_t trigger_tid_;
513 marked_ptr<std::string> key_;
514 concurrent_btree *btr_;
518 #ifdef CHECK_INVARIANTS
519 tuple_ahead_(nullptr),
526 delete_entry(dbtuple *tuple_ahead,
527 uint64_t trigger_tid,
529 const marked_ptr<std::string> &key,
530 concurrent_btree *btr)
532 #ifdef CHECK_INVARIANTS
533 tuple_ahead_(tuple_ahead),
534 trigger_tid_(trigger_tid),
547 typedef basic_px_queue<delete_entry, 4096> px_queue;
550 uint64_t last_commit_tid_;
551 unsigned last_reaped_epoch_;
552 #ifdef ENABLE_EVENT_COUNTERS
553 uint64_t last_reaped_timestamp_us_;
557 std::deque<std::string *> pool_;
560 , last_reaped_epoch_(0)
561 #ifdef ENABLE_EVENT_COUNTERS
562 , last_reaped_timestamp_us_(0)
565 ALWAYS_ASSERT(((uintptr_t)this % CACHELINE_SIZE) == 0);
566 queue_.alloc_freelist(rcu::NQueueGroups);
567 scratch_.alloc_freelist(rcu::NQueueGroups);
572 clean_up_to_including(threadctx &ctx, uint64_t ro_tick_geq);
575 static inline txn_logger::pbuffer *
576 wait_for_head(txn_logger::pbuffer_circbuf &pull_buf)
578 // XXX(stephentu): spinning for now
579 txn_logger::pbuffer *px;
580 while (unlikely(!(px = pull_buf.peek()))) {
582 ++g_evt_worker_thread_wait_log_buffer;
584 INVARIANT(!px->io_scheduled_);
588 // pushes horizon to the front entry of pull_buf, pushing
589 // to push_buf if necessary
591 // horizon is reset after push_horizon_to_buffer() returns
593 // returns the number of txns pushed from buffer to *logger*
594 // (if doing so was necessary)
596 push_horizon_to_buffer(txn_logger::pbuffer *horizon,
598 txn_logger::pbuffer_circbuf &pull_buf,
599 txn_logger::pbuffer_circbuf &push_buf)
601 INVARIANT(txn_logger::IsCompressionEnabled());
602 if (unlikely(!horizon->header()->nentries_))
604 INVARIANT(horizon->datasize());
606 size_t ntxns_pushed_to_logger = 0;
// horizon out of space; try to push horizon to buffer
609 txn_logger::pbuffer *px = wait_for_head(pull_buf);
610 const uint64_t compressed_space_needed =
611 sizeof(uint32_t) + LZ4_compressBound(horizon->datasize());
613 bool buffer_cond = false;
614 if (px->space_remaining() < compressed_space_needed ||
615 (buffer_cond = !px->can_hold_tid(horizon->header()->last_tid_))) {
// buffer out of space; push buffer to logger
617 INVARIANT(px->header()->nentries_);
618 ntxns_pushed_to_logger = px->header()->nentries_;
619 txn_logger::pbuffer *px1 = pull_buf.deq();
620 INVARIANT(px == px1);
622 px = wait_for_head(pull_buf);
624 ++txn_logger::g_evt_log_buffer_epoch_boundary;
626 ++txn_logger::g_evt_log_buffer_out_of_space;
629 INVARIANT(px->space_remaining() >= compressed_space_needed);
630 if (!px->header()->nentries_)
631 px->earliest_start_us_ = horizon->earliest_start_us_;
632 px->header()->nentries_ += horizon->header()->nentries_;
633 px->header()->last_tid_ = horizon->header()->last_tid_;
635 #ifdef ENABLE_EVENT_COUNTERS
638 const int ret = LZ4_compress_heap_limitedOutput(
640 (const char *) horizon->datastart(),
641 (char *) px->pointer() + sizeof(uint32_t),
643 px->space_remaining() - sizeof(uint32_t));
644 #ifdef ENABLE_EVENT_COUNTERS
645 txn_logger::g_evt_avg_log_buffer_compress_time_us.offer(tt.lap());
646 txn_logger::g_evt_log_buffer_bytes_before_compress.inc(horizon->datasize());
647 txn_logger::g_evt_log_buffer_bytes_after_compress.inc(ret);
650 #if defined(CHECK_INVARIANTS) && defined(PARANOID_CHECKING)
652 uint8_t decode_buf[txn_logger::g_horizon_buffer_size];
653 const int decode_ret =
654 LZ4_decompress_safe_partial(
655 (const char *) px->pointer() + sizeof(uint32_t),
656 (char *) &decode_buf[0],
658 txn_logger::g_horizon_buffer_size,
659 txn_logger::g_horizon_buffer_size);
660 INVARIANT(decode_ret >= 0);
661 INVARIANT(size_t(decode_ret) == horizon->datasize());
662 INVARIANT(memcmp(horizon->datastart(),
663 &decode_buf[0], decode_ret) == 0);
667 serializer<uint32_t, false> s_uint32_t;
668 s_uint32_t.write(px->pointer(), ret);
669 px->curoff_ += sizeof(uint32_t) + uint32_t(ret);
672 return ntxns_pushed_to_logger;
676 std::atomic<bool> status_;
677 std::atomic<uint64_t> global_tid_;
678 constexpr hackstruct() : status_(false), global_tid_(0) {}
// used to simulate a global TID for comparison
682 static util::aligned_padded_elem<hackstruct>
683 g_hack CACHE_ALIGNED;
686 std::atomic<bool> g_gc_init;
687 std::atomic<bool> g_disable_snapshots;
688 constexpr flags() : g_gc_init(false), g_disable_snapshots(false) {}
690 static util::aligned_padded_elem<flags> g_flags;
692 static percore_lazy<threadctx> g_threadctxs;
694 static event_counter g_evt_worker_thread_wait_log_buffer;
695 static event_counter g_evt_dbtuple_no_space_for_delkey;
696 static event_counter g_evt_proto_gc_delete_requeue;
697 static event_avg_counter g_evt_avg_log_entry_size;
698 static event_avg_counter g_evt_avg_proto_gc_queue_len;
702 txn_logger::pbuffer::can_hold_tid(uint64_t tid) const
704 return !header()->nentries_ ||
705 (transaction_proto2_static::EpochId(header()->last_tid_) ==
706 transaction_proto2_static::EpochId(tid));
709 // protocol 2 - no global consistent TIDs
710 template <typename Traits>
711 class transaction_proto2 : public transaction<transaction_proto2, Traits>,
712 private transaction_proto2_static {
714 friend class transaction<transaction_proto2, Traits>;
715 typedef transaction<transaction_proto2, Traits> super_type;
719 typedef Traits traits_type;
720 typedef transaction_base::tid_t tid_t;
721 typedef transaction_base::string_type string_type;
722 typedef typename super_type::dbtuple_write_info dbtuple_write_info;
723 typedef typename super_type::dbtuple_write_info_vec dbtuple_write_info_vec;
724 typedef typename super_type::read_set_map read_set_map;
725 typedef typename super_type::absent_set_map absent_set_map;
726 typedef typename super_type::write_set_map write_set_map;
727 typedef typename super_type::write_set_u32_vec write_set_u32_vec;
729 transaction_proto2(uint64_t flags,
730 typename Traits::StringAllocator &sa)
731 : transaction<transaction_proto2, Traits>(flags, sa)
733 if (this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY) {
734 const uint64_t global_tick_ex =
735 this->rcu_guard_->guard()->impl().global_last_tick_exclusive();
736 u_.last_consistent_tid = ComputeReadOnlyTid(global_tick_ex);
738 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
739 dbtuple::TupleLockRegionBegin();
741 INVARIANT(rcu::s_instance.in_rcu_region());
744 ~transaction_proto2()
746 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
747 dbtuple::AssertAllTupleLocksReleased();
749 INVARIANT(rcu::s_instance.in_rcu_region());
753 can_overwrite_record_tid(tid_t prev, tid_t cur) const
755 INVARIANT(prev <= cur);
757 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
758 if (!IsSnapshotsEnabled())
// XXX(stephentu): the !prev check is a *bit* of a hack:
// we're assuming that !prev (MIN_TID) corresponds to an
// absent (removed) record, so it is safe to overwrite it.
// This is an OK assumption with *no TID wrap-around*.
767 return (to_read_only_tick(EpochId(prev)) ==
768 to_read_only_tick(EpochId(cur))) ||
772 // can only read elements in this epoch or previous epochs
774 can_read_tid(tid_t t) const
780 on_tid_finish(tid_t commit_tid)
782 if (!txn_logger::IsPersistenceEnabled() ||
783 this->state != transaction_base::TXN_COMMITED)
785 // need to write into log buffer
787 serializer<uint32_t, true> vs_uint32_t;
789 // compute how much space is necessary
790 uint64_t space_needed = 0;
792 // 8 bytes to indicate TID
793 space_needed += sizeof(uint64_t);
795 // variable bytes to indicate # of records written
796 #ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
797 const unsigned nwrites = 0;
799 const unsigned nwrites = this->write_set.size();
802 space_needed += vs_uint32_t.nbytes(&nwrites);
// each written record needs to be logged
805 write_set_u32_vec value_sizes;
806 for (unsigned idx = 0; idx < nwrites; idx++) {
807 const transaction_base::write_record_t &rec = this->write_set[idx];
808 const uint32_t k_nbytes = rec.get_key().size();
809 space_needed += vs_uint32_t.nbytes(&k_nbytes);
810 space_needed += k_nbytes;
812 const uint32_t v_nbytes = rec.get_value() ?
814 dbtuple::TUPLE_WRITER_COMPUTE_DELTA_NEEDED,
815 rec.get_value(), nullptr, 0) : 0;
816 space_needed += vs_uint32_t.nbytes(&v_nbytes);
817 space_needed += v_nbytes;
819 value_sizes.push_back(v_nbytes);
822 g_evt_avg_log_entry_size.offer(space_needed);
823 INVARIANT(space_needed <= txn_logger::g_horizon_buffer_size);
824 INVARIANT(space_needed <= txn_logger::g_buffer_size);
826 const unsigned long my_core_id = coreid::core_id();
828 txn_logger::persist_ctx &ctx =
829 txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_REG);
830 txn_logger::persist_stats &stats =
831 txn_logger::g_persist_stats[my_core_id];
832 txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
833 txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
835 util::non_atomic_fetch_add(stats.ntxns_committed_, 1UL);
837 const bool do_compress = txn_logger::IsCompressionEnabled();
839 // try placing in horizon
840 bool horizon_cond = false;
841 if (ctx.horizon_->space_remaining() < space_needed ||
842 (horizon_cond = !ctx.horizon_->can_hold_tid(commit_tid))) {
843 if (!ctx.horizon_->datasize()) {
844 std::cerr << "space_needed: " << space_needed << std::endl;
845 std::cerr << "space_remaining: " << ctx.horizon_->space_remaining() << std::endl;
846 std::cerr << "can_hold_tid: " << ctx.horizon_->can_hold_tid(commit_tid) << std::endl;
848 INVARIANT(ctx.horizon_->datasize());
849 // horizon out of space, so we push it
850 const uint64_t npushed =
851 push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
853 util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
856 INVARIANT(ctx.horizon_->space_remaining() >= space_needed);
857 const uint64_t written =
858 write_current_txn_into_buffer(ctx.horizon_, commit_tid, value_sizes);
859 if (written != space_needed)
865 txn_logger::pbuffer *px = wait_for_head(pull_buf);
866 INVARIANT(px && px->core_id_ == my_core_id);
868 if (px->space_remaining() < space_needed ||
869 (cond = !px->can_hold_tid(commit_tid))) {
870 INVARIANT(px->header()->nentries_);
871 txn_logger::pbuffer *px0 = pull_buf.deq();
872 INVARIANT(px == px0);
873 INVARIANT(px0->header()->nentries_);
874 util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
877 ++txn_logger::g_evt_log_buffer_epoch_boundary;
879 ++txn_logger::g_evt_log_buffer_out_of_space;
883 const uint64_t written =
884 write_current_txn_into_buffer(px, commit_tid, value_sizes);
885 if (written != space_needed)
892 // assumes enough space in px to hold this txn
894 write_current_txn_into_buffer(
895 txn_logger::pbuffer *px,
897 const write_set_u32_vec &value_sizes)
899 INVARIANT(px->can_hold_tid(commit_tid));
901 if (unlikely(!px->header()->nentries_))
902 px->earliest_start_us_ = this->rcu_guard_->guard()->start_us();
904 uint8_t *p = px->pointer();
907 serializer<uint32_t, true> vs_uint32_t;
908 serializer<uint64_t, false> s_uint64_t;
910 #ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
911 const unsigned nwrites = 0;
913 const unsigned nwrites = this->write_set.size();
917 INVARIANT(nwrites == value_sizes.size());
919 p = s_uint64_t.write(p, commit_tid);
920 p = vs_uint32_t.write(p, nwrites);
922 for (unsigned idx = 0; idx < nwrites; idx++) {
923 const transaction_base::write_record_t &rec = this->write_set[idx];
924 const uint32_t k_nbytes = rec.get_key().size();
925 p = vs_uint32_t.write(p, k_nbytes);
926 NDB_MEMCPY(p, rec.get_key().data(), k_nbytes);
928 const uint32_t v_nbytes = value_sizes[idx];
929 p = vs_uint32_t.write(p, v_nbytes);
931 rec.get_writer()(dbtuple::TUPLE_WRITER_DO_DELTA_WRITE, rec.get_value(), p, v_nbytes);
936 px->curoff_ += (p - porig);
937 px->header()->nentries_++;
938 px->header()->last_tid_ = commit_tid;
940 return uint64_t(p - porig);
945 inline ALWAYS_INLINE bool is_snapshot() const {
946 return this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY;
949 inline transaction_base::tid_t
952 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
953 if (!IsSnapshotsEnabled())
954 // when snapshots are disabled, but we have a RO txn, we simply allow
955 // it to read all the latest values and treat them as consistent
// it's not correct, but it's for the factor analysis
958 return dbtuple::MAX_TID;
960 return u_.last_consistent_tid;
964 dump_debug_info() const
966 transaction<transaction_proto2, Traits>::dump_debug_info();
967 if (this->is_snapshot())
968 std::cerr << " last_consistent_tid: "
969 << g_proto_version_str(u_.last_consistent_tid) << std::endl;
972 transaction_base::tid_t
973 gen_commit_tid(const dbtuple_write_info_vec &write_tuples)
975 const size_t my_core_id = this->rcu_guard_->guard()->core();
976 threadctx &ctx = g_threadctxs.get(my_core_id);
977 INVARIANT(!this->is_snapshot());
979 COMPILER_MEMORY_FENCE;
980 u_.commit_epoch = ticker::s_instance.global_current_tick();
981 COMPILER_MEMORY_FENCE;
983 tid_t ret = ctx.last_commit_tid_;
984 INVARIANT(ret == dbtuple::MIN_TID || CoreId(ret) == my_core_id);
985 if (u_.commit_epoch != EpochId(ret))
986 ret = MakeTid(0, 0, u_.commit_epoch);
988 // What is this? Is txn_proto1_impl used?
989 if (g_hack->status_.load(std::memory_order_acquire))
990 g_hack->global_tid_.fetch_add(1, std::memory_order_acq_rel);
992 // XXX(stephentu): I believe this is correct, but not 100% sure
993 //const size_t my_core_id = 0;
996 typename read_set_map::const_iterator it = this->read_set.begin();
997 typename read_set_map::const_iterator it_end = this->read_set.end();
998 for (; it != it_end; ++it) {
999 if (it->get_tid() > ret)
1000 ret = it->get_tid();
1005 typename dbtuple_write_info_vec::const_iterator it = write_tuples.begin();
1006 typename dbtuple_write_info_vec::const_iterator it_end = write_tuples.end();
1007 for (; it != it_end; ++it) {
1008 INVARIANT(it->tuple->is_locked());
1009 INVARIANT(it->tuple->is_lock_owner());
1010 INVARIANT(it->tuple->is_write_intent());
1011 INVARIANT(!it->tuple->is_modifying());
1012 INVARIANT(it->tuple->is_latest());
1013 if (it->is_insert())
1014 // we inserted this node, so we don't want to do the checks below
1016 const tid_t t = it->tuple->version;
// XXX(stephentu): we are overly conservative for now; technically this
1019 // abort isn't necessary (we really should just write the value in the correct
1021 //if (EpochId(t) > u_.commit_epoch) {
1022 // std::cerr << "t: " << g_proto_version_str(t) << std::endl;
1023 // std::cerr << "epoch: " << u_.commit_epoch << std::endl;
1024 // this->dump_debug_info();
1027 // t == dbtuple::MAX_TID when a txn does an insert of a new tuple
1028 // followed by 1+ writes to the same tuple.
1029 INVARIANT(EpochId(t) <= u_.commit_epoch || t == dbtuple::MAX_TID);
1030 if (t != dbtuple::MAX_TID && t > ret)
1034 INVARIANT(EpochId(ret) == u_.commit_epoch);
1035 ret = MakeTid(my_core_id, NumId(ret) + 1, u_.commit_epoch);
// XXX(stephentu): this txn hasn't actually been committed yet,
1039 // and could potentially be aborted - but it's ok to increase this #, since
1040 // subsequent txns on this core will read this # anyways
1041 return (ctx.last_commit_tid_ = ret);
1044 inline ALWAYS_INLINE void
1045 on_dbtuple_spill(dbtuple *tuple_ahead, dbtuple *tuple)
1047 #ifdef PROTO2_CAN_DISABLE_GC
1052 INVARIANT(rcu::s_instance.in_rcu_region());
1053 INVARIANT(!tuple->is_latest());
1055 // >= not > only b/c of the special case of inserting a new tuple +
// overwriting the newly inserted record with a longer sequence of bytes in the same txn
1058 INVARIANT(tuple_ahead->version >= tuple->version);
1060 if (tuple->is_deleting()) {
1061 INVARIANT(tuple->is_locked());
1062 INVARIANT(tuple->is_lock_owner());
1067 const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
1068 INVARIANT(to_read_only_tick(EpochId(tuple->version)) <= ro_tick);
1070 #ifdef CHECK_INVARIANTS
1072 INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
// when all snapshots are happening at >= the current epoch,
// we can safely remove the tuple
1077 threadctx &ctx = g_threadctxs.my();
1079 delete_entry(tuple_ahead, tuple_ahead->version,
1080 tuple, marked_ptr<std::string>(), nullptr),
1084 inline ALWAYS_INLINE void
1085 on_logical_delete(dbtuple *tuple, const std::string &key, concurrent_btree *btr)
1087 #ifdef PROTO2_CAN_DISABLE_GC
1092 INVARIANT(tuple->is_locked());
1093 INVARIANT(tuple->is_lock_owner());
1094 INVARIANT(tuple->is_write_intent());
1095 INVARIANT(tuple->is_latest());
1096 INVARIANT(tuple->is_deleting());
1097 INVARIANT(!tuple->size);
1098 INVARIANT(rcu::s_instance.in_rcu_region());
1100 const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
1101 threadctx &ctx = g_threadctxs.my();
1103 #ifdef CHECK_INVARIANTS
1105 INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
1108 if (likely(key.size() <= tuple->alloc_size)) {
1109 NDB_MEMCPY(tuple->get_value_start(), key.data(), key.size());
1110 tuple->size = key.size();
1112 // eligible for deletion when all snapshots >= the current epoch
1113 marked_ptr<std::string> mpx;
1117 delete_entry(nullptr, tuple->version, tuple, mpx, btr),
1120 // this is a rare event
1121 ++g_evt_dbtuple_no_space_for_delkey;
1122 std::string *spx = nullptr;
1123 if (ctx.pool_.empty()) {
1124 spx = new std::string(key.data(), key.size()); // XXX: use numa memory?
1126 spx = ctx.pool_.front();
1127 ctx.pool_.pop_front();
1128 spx->assign(key.data(), key.size());
1132 marked_ptr<std::string> mpx(spx);
1136 delete_entry(nullptr, tuple->version, tuple, mpx, btr),
1142 on_post_rcu_region_completion()
1144 #ifdef PROTO2_CAN_DISABLE_GC
1148 const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
1149 if (unlikely(!last_tick_ex))
// we subtract one from the global last tick: because of the way
// consistent TIDs are computed, global_last_tick_exclusive() can
1153 // increase by at most one tick during a transaction.
1154 const uint64_t ro_tick_ex = to_read_only_tick(last_tick_ex - 1);
1155 if (unlikely(!ro_tick_ex))
1156 // won't have anything to clean
1158 // all reads happening at >= ro_tick_geq
1159 const uint64_t ro_tick_geq = ro_tick_ex - 1;
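// worked example (sketch), with ReadOnlyEpochMultiplier == 25:
// last_tick_ex == 51 gives ro_tick_ex == to_read_only_tick(50) == 2 and
// ro_tick_geq == 1, i.e. all consistent reads are at read-only tick >= 1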
1160 threadctx &ctx = g_threadctxs.my();
1161 clean_up_to_including(ctx, ro_tick_geq);
1167 // the global epoch this txn is running in (this # is read when it starts)
1168 // -- snapshot txns only
1169 uint64_t last_consistent_tid;
1170 // the epoch for this txn -- committing non-snapshot txns only
1171 uint64_t commit_epoch;
1175 // txn_btree_handler specialization
1177 struct base_txn_btree_handler<transaction_proto2> {
1181 #ifndef PROTO2_CAN_DISABLE_GC
1182 transaction_proto2_static::InitGC();
1185 static const bool has_background_task = true;
1189 struct txn_epoch_sync<transaction_proto2> : public transaction_proto2_static {
1194 if (txn_logger::IsPersistenceEnabled())
1195 txn_logger::wait_until_current_point_persisted();
1200 if (txn_logger::IsPersistenceEnabled())
1201 txn_logger::wait_until_current_point_persisted();
1204 thread_init(bool loader)
1206 if (!txn_logger::IsPersistenceEnabled())
1208 const unsigned long my_core_id = coreid::core_id();
// try to initialize using the NUMA-aware allocator (loader threads use malloc())
1210 txn_logger::persist_ctx_for(
1212 loader ? txn_logger::INITMODE_REG : txn_logger::INITMODE_RCU);
1217 if (!txn_logger::IsPersistenceEnabled())
1219 const unsigned long my_core_id = coreid::core_id();
1220 txn_logger::persist_ctx &ctx =
1221 txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_NONE);
1222 if (unlikely(!ctx.init_))
1224 txn_logger::persist_stats &stats =
1225 txn_logger::g_persist_stats[my_core_id];
1226 txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
1227 txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
1228 if (txn_logger::IsCompressionEnabled() &&
1229 ctx.horizon_->header()->nentries_) {
1230 INVARIANT(ctx.horizon_->datasize());
1231 const uint64_t npushed =
1232 push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
1234 util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
1236 txn_logger::pbuffer *px = pull_buf.peek();
1237 if (!px || !px->header()->nentries_) {
1238 //std::cerr << "core " << my_core_id
1239 // << " nothing to push to logger" << std::endl;
1242 //std::cerr << "core " << my_core_id
1243 // << " pushing buffer to logger" << std::endl;
1244 txn_logger::pbuffer *px0 = pull_buf.deq();
1245 util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
1246 INVARIANT(px0 == px);
1249 static std::tuple<uint64_t, uint64_t, double>
1250 compute_ntxn_persisted()
1252 if (!txn_logger::IsPersistenceEnabled())
1253 return std::make_tuple(0, 0, 0.0);
1254 return txn_logger::compute_ntxns_persisted_statistics();
1257 reset_ntxn_persisted()
1259 if (!txn_logger::IsPersistenceEnabled())
1261 txn_logger::clear_ntxns_persisted_statistics();
1265 #endif /* _NDB_TXN_PROTO2_IMPL_H_ */