benchmark silo added
[c11concurrency-benchmarks.git] / silo / txn_proto2_impl.h
1 #ifndef _NDB_TXN_PROTO2_IMPL_H_
2 #define _NDB_TXN_PROTO2_IMPL_H_
3
4 #include <iostream>
5 #include <atomic>
6 #include <vector>
7 #include <set>
8
9 #include <lz4.h>
10
11 #include "txn.h"
12 #include "txn_impl.h"
13 #include "txn_btree.h"
14 #include "macros.h"
15 #include "circbuf.h"
16 #include "spinbarrier.h"
17 #include "record/serializer.h"
18
19 // forward decl
20 template <typename Traits> class transaction_proto2;
21 template <template <typename> class Transaction>
22   class txn_epoch_sync;
23
24 // the system has a single logging subsystem (composed of multiple lgogers)
25 // NOTE: currently, the persistence epoch is tied 1:1 with the ticker's epoch
26 class txn_logger {
27   friend class transaction_proto2_static;
28   template <typename T>
29     friend class transaction_proto2;
30   // XXX: should only allow txn_epoch_sync<transaction_proto2> as friend
31   template <template <typename> class T>
32     friend class txn_epoch_sync;
33 public:
34
35   static const size_t g_nmax_loggers = 16;
36   static const size_t g_perthread_buffers = 256; // 256 outstanding buffers
37   static const size_t g_buffer_size = (1<<20); // in bytes
38   static const size_t g_horizon_buffer_size = 2 * (1<<16); // in bytes
39   static const size_t g_max_lag_epochs = 128; // cannot lag more than 128 epochs
40   static const bool   g_pin_loggers_to_numa_nodes = false;
41
42   static inline bool
43   IsPersistenceEnabled()
44   {
45     return g_persist;
46   }
47
48   static inline bool
49   IsCompressionEnabled()
50   {
51     return g_use_compression;
52   }
53
54   // init the logging subsystem.
55   //
56   // should only be called ONCE is not thread-safe.  if assignments_used is not
57   // null, then fills it with a copy of the assignment actually computed
58   static void Init(
59       size_t nworkers,
60       const std::vector<std::string> &logfiles,
61       const std::vector<std::vector<unsigned>> &assignments_given,
62       std::vector<std::vector<unsigned>> *assignments_used = nullptr,
63       bool call_fsync = true,
64       bool use_compression = false,
65       bool fake_writes = false);
66
67   struct logbuf_header {
68     uint64_t nentries_; // > 0 for all valid log buffers
69     uint64_t last_tid_; // TID of the last commit
70   } PACKED;
71
72   struct pbuffer {
73     uint64_t earliest_start_us_; // start time of the earliest txn
74     bool io_scheduled_; // has the logger scheduled IO yet?
75
76     unsigned curoff_; // current offset into buf_ for writing
77
78     const unsigned core_id_; // which core does this pbuffer belong to?
79
80     const unsigned buf_sz_;
81
82     // must be last field
83     uint8_t buf_start_[0];
84
85     // to allocate a pbuffer, use placement new:
86     //    const size_t bufsz = ...;
87     //    char *p = malloc(sizeof(pbuffer) + bufsz);
88     //    pbuffer *pb = new (p) pbuffer(core_id, bufsz);
89     //
90     // NOTE: it is not necessary to call the destructor for pbuffer, since
91     // it only contains PODs
92     pbuffer(unsigned core_id, unsigned buf_sz)
93       : core_id_(core_id), buf_sz_(buf_sz)
94     {
95       INVARIANT(((char *)this) + sizeof(*this) == (char *) &buf_start_[0]);
96       INVARIANT(buf_sz > sizeof(logbuf_header));
97       reset();
98     }
99
100     pbuffer(const pbuffer &) = delete;
101     pbuffer &operator=(const pbuffer &) = delete;
102     pbuffer(pbuffer &&) = delete;
103
104     inline void
105     reset()
106     {
107       earliest_start_us_ = 0;
108       io_scheduled_ = false;
109       curoff_ = sizeof(logbuf_header);
110       NDB_MEMSET(&buf_start_[0], 0, buf_sz_);
111     }
112
113     inline uint8_t *
114     pointer()
115     {
116       INVARIANT(curoff_ >= sizeof(logbuf_header));
117       INVARIANT(curoff_ <= buf_sz_);
118       return &buf_start_[0] + curoff_;
119     }
120
121     inline uint8_t *
122     datastart()
123     {
124       return &buf_start_[0] + sizeof(logbuf_header);
125     }
126
127     inline size_t
128     datasize() const
129     {
130       INVARIANT(curoff_ >= sizeof(logbuf_header));
131       INVARIANT(curoff_ <= buf_sz_);
132       return curoff_ - sizeof(logbuf_header);
133     }
134
135     inline logbuf_header *
136     header()
137     {
138       return reinterpret_cast<logbuf_header *>(&buf_start_[0]);
139     }
140
141     inline const logbuf_header *
142     header() const
143     {
144       return reinterpret_cast<const logbuf_header *>(&buf_start_[0]);
145     }
146
147     inline size_t
148     space_remaining() const
149     {
150       INVARIANT(curoff_ >= sizeof(logbuf_header));
151       INVARIANT(curoff_ <= buf_sz_);
152       return buf_sz_ - curoff_;
153     }
154
155     inline bool
156     can_hold_tid(uint64_t tid) const;
157   } PACKED;
158
159   static bool
160   AssignmentsValid(const std::vector<std::vector<unsigned>> &assignments,
161                    unsigned nfds,
162                    unsigned nworkers)
163   {
164     // each worker must be assigned exactly once in the assignment
165     // there must be <= nfds assignments
166
167     if (assignments.size() > nfds)
168       return false;
169
170     std::set<unsigned> seen;
171     for (auto &assignment : assignments)
172       for (auto w : assignment) {
173         if (seen.count(w) || w >= nworkers)
174           return false;
175         seen.insert(w);
176       }
177
178     return seen.size() == nworkers;
179   }
180
181   typedef circbuf<pbuffer, g_perthread_buffers> pbuffer_circbuf;
182
183   static std::tuple<uint64_t, uint64_t, double>
184   compute_ntxns_persisted_statistics();
185
186   // purge counters from each thread about the number of
187   // persisted txns
188   static void
189   clear_ntxns_persisted_statistics();
190
191   // wait until the logging system appears to be idle.
192   //
193   // note that this isn't a guarantee, just a best effort attempt
194   static void
195   wait_for_idle_state();
196
197   // waits until the epoch on invocation time is persisted
198   static void
199   wait_until_current_point_persisted();
200
201 private:
202
203   // data structures
204
205   struct epoch_array {
206     // don't use percore<std::atomic<uint64_t>> because we don't want padding
207     std::atomic<uint64_t> epochs_[NMAXCORES];
208     std::atomic<uint64_t> dummy_work_; // so we can do some fake work
209     CACHE_PADOUT;
210   };
211
212   struct persist_ctx {
213     bool init_;
214
215     void *lz4ctx_;     // for compression
216     pbuffer *horizon_; // for compression
217
218     circbuf<pbuffer, g_perthread_buffers> all_buffers_;     // logger pushes to core
219     circbuf<pbuffer, g_perthread_buffers> persist_buffers_; // core pushes to logger
220
221     persist_ctx() : init_(false), lz4ctx_(nullptr), horizon_(nullptr) {}
222   };
223
224   // context per one epoch
225   struct persist_stats {
226     // how many txns this thread has persisted in total
227     std::atomic<uint64_t> ntxns_persisted_;
228
229     // how many txns have been pushed to the logger (but not necessarily persisted)
230     std::atomic<uint64_t> ntxns_pushed_;
231
232     // committed (but not necessarily pushed, nor persisted)
233     std::atomic<uint64_t> ntxns_committed_;
234
235     // sum of all latencies (divid by ntxns_persisted_ to get avg latency in
236     // us) for *persisted* txns (is conservative)
237     std::atomic<uint64_t> latency_numer_;
238
239     // per last g_max_lag_epochs information
240     struct per_epoch_stats {
241       std::atomic<uint64_t> ntxns_;
242       std::atomic<uint64_t> earliest_start_us_;
243
244       per_epoch_stats() : ntxns_(0), earliest_start_us_(0) {}
245     } d_[g_max_lag_epochs];
246
247     persist_stats() :
248       ntxns_persisted_(0), ntxns_pushed_(0),
249       ntxns_committed_(0), latency_numer_(0) {}
250   };
251
252   // helpers
253
254   static void
255   advance_system_sync_epoch(
256       const std::vector<std::vector<unsigned>> &assignments);
257
258   // makes copy on purpose
259   static void writer(
260       unsigned id, int fd,
261       std::vector<unsigned> assignment);
262
263   static void persister(
264       std::vector<std::vector<unsigned>> assignments);
265
266   enum InitMode {
267     INITMODE_NONE, // no initialization
268     INITMODE_REG,  // just use malloc() to init buffers
269     INITMODE_RCU,  // try to use the RCU numa aware allocator
270   };
271
272   static inline persist_ctx &
273   persist_ctx_for(uint64_t core_id, InitMode imode)
274   {
275     INVARIANT(core_id < g_persist_ctxs.size());
276     persist_ctx &ctx = g_persist_ctxs[core_id];
277     if (unlikely(!ctx.init_ && imode != INITMODE_NONE)) {
278       size_t needed = g_perthread_buffers * (sizeof(pbuffer) + g_buffer_size);
279       if (IsCompressionEnabled())
280         needed += size_t(LZ4_create_size()) +
281           sizeof(pbuffer) + g_horizon_buffer_size;
282       char *mem =
283         (imode == INITMODE_REG) ?
284           (char *) malloc(needed) :
285           (char *) rcu::s_instance.alloc_static(needed);
286       if (IsCompressionEnabled()) {
287         ctx.lz4ctx_ = mem;
288         mem += LZ4_create_size();
289         ctx.horizon_ = new (mem) pbuffer(core_id, g_horizon_buffer_size);
290         mem += sizeof(pbuffer) + g_horizon_buffer_size;
291       }
292       for (size_t i = 0; i < g_perthread_buffers; i++) {
293         ctx.all_buffers_.enq(new (mem) pbuffer(core_id, g_buffer_size));
294         mem += sizeof(pbuffer) + g_buffer_size;
295       }
296       ctx.init_ = true;
297     }
298     return ctx;
299   }
300
301   // static state
302
303   static bool g_persist; // whether or not logging is enabled
304
305   static bool g_call_fsync; // whether or not fsync() needs to be called
306                             // in order to be considered durable
307
308   static bool g_use_compression; // whether or not to compress log buffers
309
310   static bool g_fake_writes; // whether or not to fake doing writes (to measure
311                              // pure overhead of disk)
312
313   static size_t g_nworkers; // assignments are computed based on g_nworkers
314                             // but a logger responsible for core i is really
315                             // responsible for cores i + k * g_nworkers, for k
316                             // >= 0
317
318   // v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted up
319   // through (including) all transactions <= epoch v on core j. since core =>
320   // logger mapping is static, taking:
321   //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
322   // yields the entire system's persistent epoch
323   static epoch_array
324     per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;
325
326   // conservative estimate (<=) for:
327   //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
328   static util::aligned_padded_elem<std::atomic<uint64_t>>
329     system_sync_epoch_ CACHE_ALIGNED;
330
331   static percore<persist_ctx> g_persist_ctxs CACHE_ALIGNED;
332
333   static percore<persist_stats> g_persist_stats CACHE_ALIGNED;
334
335   // counters
336
337   static event_counter g_evt_log_buffer_epoch_boundary;
338   static event_counter g_evt_log_buffer_out_of_space;
339   static event_counter g_evt_log_buffer_bytes_before_compress;
340   static event_counter g_evt_log_buffer_bytes_after_compress;
341   static event_counter g_evt_logger_writev_limit_met;
342   static event_counter g_evt_logger_max_lag_wait;
343   static event_avg_counter g_evt_avg_log_entry_ntxns;
344   static event_avg_counter g_evt_avg_log_buffer_compress_time_us;
345   static event_avg_counter g_evt_avg_logger_bytes_per_writev;
346   static event_avg_counter g_evt_avg_logger_bytes_per_sec;
347 };
348
349 static inline std::ostream &
350 operator<<(std::ostream &o, txn_logger::logbuf_header &hdr)
351 {
352   o << "{nentries_=" << hdr.nentries_ << ", last_tid_="
353     << g_proto_version_str(hdr.last_tid_) << "}";
354   return o;
355 }
356
357 class transaction_proto2_static {
358 public:
359
360   // NOTE:
361   // each epoch is tied (1:1) to the ticker subsystem's tick. this is the
362   // speed of the persistence layer.
363   //
364   // however, read only txns and GC are tied to multiples of the ticker
365   // subsystem's tick
366
367 #ifdef CHECK_INVARIANTS
368   static const uint64_t ReadOnlyEpochMultiplier = 10; /* 10 * 1 ms */
369 #else
370   static const uint64_t ReadOnlyEpochMultiplier = 25; /* 25 * 40 ms */
371   static_assert(ticker::tick_us * ReadOnlyEpochMultiplier == 1000000, "");
372 #endif
373
374   static_assert(ReadOnlyEpochMultiplier >= 1, "XX");
375
376   static const uint64_t ReadOnlyEpochUsec =
377     ticker::tick_us * ReadOnlyEpochMultiplier;
378
379   static inline uint64_t constexpr
380   to_read_only_tick(uint64_t epoch_tick)
381   {
382     return epoch_tick / ReadOnlyEpochMultiplier;
383   }
384
385   // in this protocol, the version number is:
386   // (note that for tid_t's, the top bit is reserved and
387   // *must* be set to zero
388   //
389   // [ core  | number |  epoch | reserved ]
390   // [ 0..9  | 9..33  | 33..63 |  63..64  ]
391
392   static inline ALWAYS_INLINE
393   uint64_t CoreId(uint64_t v)
394   {
395     return v & CoreMask;
396   }
397
398   static inline ALWAYS_INLINE
399   uint64_t NumId(uint64_t v)
400   {
401     return (v & NumIdMask) >> NumIdShift;
402   }
403
404   static inline ALWAYS_INLINE
405   uint64_t EpochId(uint64_t v)
406   {
407     return (v & EpochMask) >> EpochShift;
408   }
409
410   // XXX(stephentu): HACK
411   static void
412   wait_an_epoch()
413   {
414     INVARIANT(!rcu::s_instance.in_rcu_region());
415     const uint64_t e = to_read_only_tick(
416         ticker::s_instance.global_last_tick_exclusive());
417     if (!e) {
418       std::cerr << "wait_an_epoch(): consistent reads happening in e-1, but e=0 so special case"
419                 << std::endl;
420     } else {
421       std::cerr << "wait_an_epoch(): consistent reads happening in e-1: "
422                 << (e-1) << std::endl;
423     }
424     while (to_read_only_tick(ticker::s_instance.global_last_tick_exclusive()) == e)
425       nop_pause();
426     COMPILER_MEMORY_FENCE;
427   }
428
429   static uint64_t
430   ComputeReadOnlyTid(uint64_t global_tick_ex)
431   {
432     const uint64_t a = (global_tick_ex / ReadOnlyEpochMultiplier);
433     const uint64_t b = a * ReadOnlyEpochMultiplier;
434
435     // want to read entries <= b-1, special casing for b=0
436     if (!b)
437       return MakeTid(0, 0, 0);
438     else
439       return MakeTid(CoreMask, NumIdMask >> NumIdShift, b - 1);
440   }
441
442   static const uint64_t NBitsNumber = 24;
443
444   // XXX(stephentu): need to implement core ID recycling
445   static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreShift distinct threads
446   static const size_t NMaxCores = NMAXCORES;
447
448   static const uint64_t CoreMask = (NMaxCores - 1);
449
450   static const uint64_t NumIdShift = CoreBits;
451   static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;
452
453   static const uint64_t EpochShift = CoreBits + NBitsNumber;
454   // since the reserve bit is always zero, we don't need a special mask
455   static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
456
457   static inline ALWAYS_INLINE
458   uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
459   {
460     // some sanity checking
461     static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1), "xx");
462     static_assert((CoreMask & NumIdMask) == 0, "xx");
463     static_assert((NumIdMask & EpochMask) == 0, "xx");
464     return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
465   }
466
467   static inline void
468   set_hack_status(bool hack_status)
469   {
470     g_hack->status_ = hack_status;
471   }
472
473   static inline bool
474   get_hack_status()
475   {
476     return g_hack->status_;
477   }
478
479   // thread-safe, can be called many times
480   static void InitGC();
481
482   static void PurgeThreadOutstandingGCTasks();
483
484 #ifdef PROTO2_CAN_DISABLE_GC
485   static inline bool
486   IsGCEnabled()
487   {
488     return g_flags->g_gc_init.load(std::memory_order_acquire);
489   }
490 #endif
491
492 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
493   static void
494   DisableSnapshots()
495   {
496     g_flags->g_disable_snapshots.store(true, std::memory_order_release);
497   }
498   static inline bool
499   IsSnapshotsEnabled()
500   {
501     return !g_flags->g_disable_snapshots.load(std::memory_order_acquire);
502   }
503 #endif
504
505 protected:
506   struct delete_entry {
507 #ifdef CHECK_INVARIANTS
508     dbtuple *tuple_ahead_;
509     uint64_t trigger_tid_;
510 #endif
511
512     dbtuple *tuple_;
513     marked_ptr<std::string> key_;
514     concurrent_btree *btr_;
515
516     delete_entry()
517       :
518 #ifdef CHECK_INVARIANTS
519         tuple_ahead_(nullptr),
520         trigger_tid_(0),
521 #endif
522         tuple_(),
523         key_(),
524         btr_(nullptr) {}
525
526     delete_entry(dbtuple *tuple_ahead,
527                  uint64_t trigger_tid,
528                  dbtuple *tuple,
529                  const marked_ptr<std::string> &key,
530                  concurrent_btree *btr)
531       :
532 #ifdef CHECK_INVARIANTS
533         tuple_ahead_(tuple_ahead),
534         trigger_tid_(trigger_tid),
535 #endif
536         tuple_(tuple),
537         key_(key),
538         btr_(btr) {}
539
540     inline dbtuple *
541     tuple()
542     {
543       return tuple_;
544     }
545   };
546
547   typedef basic_px_queue<delete_entry, 4096> px_queue;
548
549   struct threadctx {
550     uint64_t last_commit_tid_;
551     unsigned last_reaped_epoch_;
552 #ifdef ENABLE_EVENT_COUNTERS
553     uint64_t last_reaped_timestamp_us_;
554 #endif
555     px_queue queue_;
556     px_queue scratch_;
557     std::deque<std::string *> pool_;
558     threadctx() :
559         last_commit_tid_(0)
560       , last_reaped_epoch_(0)
561 #ifdef ENABLE_EVENT_COUNTERS
562       , last_reaped_timestamp_us_(0)
563 #endif
564     {
565       ALWAYS_ASSERT(((uintptr_t)this % CACHELINE_SIZE) == 0);
566       queue_.alloc_freelist(rcu::NQueueGroups);
567       scratch_.alloc_freelist(rcu::NQueueGroups);
568     }
569   };
570
571   static void
572   clean_up_to_including(threadctx &ctx, uint64_t ro_tick_geq);
573
574   // helper methods
575   static inline txn_logger::pbuffer *
576   wait_for_head(txn_logger::pbuffer_circbuf &pull_buf)
577   {
578     // XXX(stephentu): spinning for now
579     txn_logger::pbuffer *px;
580     while (unlikely(!(px = pull_buf.peek()))) {
581       nop_pause();
582       ++g_evt_worker_thread_wait_log_buffer;
583     }
584     INVARIANT(!px->io_scheduled_);
585     return px;
586   }
587
588   // pushes horizon to the front entry of pull_buf, pushing
589   // to push_buf if necessary
590   //
591   // horizon is reset after push_horizon_to_buffer() returns
592   //
593   // returns the number of txns pushed from buffer to *logger*
594   // (if doing so was necessary)
595   static inline size_t
596   push_horizon_to_buffer(txn_logger::pbuffer *horizon,
597                          void *lz4ctx,
598                          txn_logger::pbuffer_circbuf &pull_buf,
599                          txn_logger::pbuffer_circbuf &push_buf)
600   {
601     INVARIANT(txn_logger::IsCompressionEnabled());
602     if (unlikely(!horizon->header()->nentries_))
603       return 0;
604     INVARIANT(horizon->datasize());
605
606     size_t ntxns_pushed_to_logger = 0;
607
608     // horizon out of space- try to push horizon to buffer
609     txn_logger::pbuffer *px = wait_for_head(pull_buf);
610     const uint64_t compressed_space_needed =
611       sizeof(uint32_t) + LZ4_compressBound(horizon->datasize());
612
613     bool buffer_cond = false;
614     if (px->space_remaining() < compressed_space_needed ||
615         (buffer_cond = !px->can_hold_tid(horizon->header()->last_tid_))) {
616       // buffer out of space- push buffer to logger
617       INVARIANT(px->header()->nentries_);
618       ntxns_pushed_to_logger = px->header()->nentries_;
619       txn_logger::pbuffer *px1 = pull_buf.deq();
620       INVARIANT(px == px1);
621       push_buf.enq(px1);
622       px = wait_for_head(pull_buf);
623       if (buffer_cond)
624         ++txn_logger::g_evt_log_buffer_epoch_boundary;
625       else
626         ++txn_logger::g_evt_log_buffer_out_of_space;
627     }
628
629     INVARIANT(px->space_remaining() >= compressed_space_needed);
630     if (!px->header()->nentries_)
631       px->earliest_start_us_ = horizon->earliest_start_us_;
632     px->header()->nentries_ += horizon->header()->nentries_;
633     px->header()->last_tid_  = horizon->header()->last_tid_;
634
635 #ifdef ENABLE_EVENT_COUNTERS
636     util::timer tt;
637 #endif
638     const int ret = LZ4_compress_heap_limitedOutput(
639         lz4ctx,
640         (const char *) horizon->datastart(),
641         (char *) px->pointer() + sizeof(uint32_t),
642         horizon->datasize(),
643         px->space_remaining() - sizeof(uint32_t));
644 #ifdef ENABLE_EVENT_COUNTERS
645     txn_logger::g_evt_avg_log_buffer_compress_time_us.offer(tt.lap());
646     txn_logger::g_evt_log_buffer_bytes_before_compress.inc(horizon->datasize());
647     txn_logger::g_evt_log_buffer_bytes_after_compress.inc(ret);
648 #endif
649     INVARIANT(ret > 0);
650 #if defined(CHECK_INVARIANTS) && defined(PARANOID_CHECKING)
651     {
652       uint8_t decode_buf[txn_logger::g_horizon_buffer_size];
653       const int decode_ret =
654         LZ4_decompress_safe_partial(
655             (const char *) px->pointer() + sizeof(uint32_t),
656             (char *) &decode_buf[0],
657             ret,
658             txn_logger::g_horizon_buffer_size,
659             txn_logger::g_horizon_buffer_size);
660       INVARIANT(decode_ret >= 0);
661       INVARIANT(size_t(decode_ret) == horizon->datasize());
662       INVARIANT(memcmp(horizon->datastart(),
663                        &decode_buf[0], decode_ret) == 0);
664     }
665 #endif
666
667     serializer<uint32_t, false> s_uint32_t;
668     s_uint32_t.write(px->pointer(), ret);
669     px->curoff_ += sizeof(uint32_t) + uint32_t(ret);
670     horizon->reset();
671
672     return ntxns_pushed_to_logger;
673   }
674
675   struct hackstruct {
676     std::atomic<bool> status_;
677     std::atomic<uint64_t> global_tid_;
678     constexpr hackstruct() : status_(false), global_tid_(0) {}
679   };
680
681   // use to simulate global TID for comparsion
682   static util::aligned_padded_elem<hackstruct>
683     g_hack CACHE_ALIGNED;
684
685   struct flags {
686     std::atomic<bool> g_gc_init;
687     std::atomic<bool> g_disable_snapshots;
688     constexpr flags() : g_gc_init(false), g_disable_snapshots(false) {}
689   };
690   static util::aligned_padded_elem<flags> g_flags;
691
692   static percore_lazy<threadctx> g_threadctxs;
693
694   static event_counter g_evt_worker_thread_wait_log_buffer;
695   static event_counter g_evt_dbtuple_no_space_for_delkey;
696   static event_counter g_evt_proto_gc_delete_requeue;
697   static event_avg_counter g_evt_avg_log_entry_size;
698   static event_avg_counter g_evt_avg_proto_gc_queue_len;
699 };
700
701 bool
702 txn_logger::pbuffer::can_hold_tid(uint64_t tid) const
703 {
704   return !header()->nentries_ ||
705          (transaction_proto2_static::EpochId(header()->last_tid_) ==
706           transaction_proto2_static::EpochId(tid));
707 }
708
709 // protocol 2 - no global consistent TIDs
710 template <typename Traits>
711 class transaction_proto2 : public transaction<transaction_proto2, Traits>,
712                            private transaction_proto2_static {
713
714   friend class transaction<transaction_proto2, Traits>;
715   typedef transaction<transaction_proto2, Traits> super_type;
716
717 public:
718
719   typedef Traits traits_type;
720   typedef transaction_base::tid_t tid_t;
721   typedef transaction_base::string_type string_type;
722   typedef typename super_type::dbtuple_write_info dbtuple_write_info;
723   typedef typename super_type::dbtuple_write_info_vec dbtuple_write_info_vec;
724   typedef typename super_type::read_set_map read_set_map;
725   typedef typename super_type::absent_set_map absent_set_map;
726   typedef typename super_type::write_set_map write_set_map;
727   typedef typename super_type::write_set_u32_vec write_set_u32_vec;
728
729   transaction_proto2(uint64_t flags,
730                      typename Traits::StringAllocator &sa)
731     : transaction<transaction_proto2, Traits>(flags, sa)
732   {
733     if (this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY) {
734       const uint64_t global_tick_ex =
735         this->rcu_guard_->guard()->impl().global_last_tick_exclusive();
736       u_.last_consistent_tid = ComputeReadOnlyTid(global_tick_ex);
737     }
738 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
739     dbtuple::TupleLockRegionBegin();
740 #endif
741     INVARIANT(rcu::s_instance.in_rcu_region());
742   }
743
744   ~transaction_proto2()
745   {
746 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
747     dbtuple::AssertAllTupleLocksReleased();
748 #endif
749     INVARIANT(rcu::s_instance.in_rcu_region());
750   }
751
752   inline bool
753   can_overwrite_record_tid(tid_t prev, tid_t cur) const
754   {
755     INVARIANT(prev <= cur);
756
757 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
758     if (!IsSnapshotsEnabled())
759       return true;
760 #endif
761
762     // XXX(stephentu): the !prev check is a *bit* of a hack-
763     // we're assuming that !prev (MIN_TID) corresponds to an
764     // absent (removed) record, so it is safe to overwrite it,
765     //
766     // This is an OK assumption with *no TID wrap around*.
767     return (to_read_only_tick(EpochId(prev)) ==
768             to_read_only_tick(EpochId(cur))) ||
769            !prev;
770   }
771
772   // can only read elements in this epoch or previous epochs
773   inline bool
774   can_read_tid(tid_t t) const
775   {
776     return true;
777   }
778
779   inline void
780   on_tid_finish(tid_t commit_tid)
781   {
782     if (!txn_logger::IsPersistenceEnabled() ||
783         this->state != transaction_base::TXN_COMMITED)
784       return;
785     // need to write into log buffer
786
787     serializer<uint32_t, true> vs_uint32_t;
788
789     // compute how much space is necessary
790     uint64_t space_needed = 0;
791
792     // 8 bytes to indicate TID
793     space_needed += sizeof(uint64_t);
794
795     // variable bytes to indicate # of records written
796 #ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
797     const unsigned nwrites = 0;
798 #else
799     const unsigned nwrites = this->write_set.size();
800 #endif
801
802     space_needed += vs_uint32_t.nbytes(&nwrites);
803
804     // each record needs to be recorded
805     write_set_u32_vec value_sizes;
806     for (unsigned idx = 0; idx < nwrites; idx++) {
807       const transaction_base::write_record_t &rec = this->write_set[idx];
808       const uint32_t k_nbytes = rec.get_key().size();
809       space_needed += vs_uint32_t.nbytes(&k_nbytes);
810       space_needed += k_nbytes;
811
812       const uint32_t v_nbytes = rec.get_value() ?
813           rec.get_writer()(
814               dbtuple::TUPLE_WRITER_COMPUTE_DELTA_NEEDED,
815               rec.get_value(), nullptr, 0) : 0;
816       space_needed += vs_uint32_t.nbytes(&v_nbytes);
817       space_needed += v_nbytes;
818
819       value_sizes.push_back(v_nbytes);
820     }
821
822     g_evt_avg_log_entry_size.offer(space_needed);
823     INVARIANT(space_needed <= txn_logger::g_horizon_buffer_size);
824     INVARIANT(space_needed <= txn_logger::g_buffer_size);
825
826     const unsigned long my_core_id = coreid::core_id();
827
828     txn_logger::persist_ctx &ctx =
829       txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_REG);
830     txn_logger::persist_stats &stats =
831       txn_logger::g_persist_stats[my_core_id];
832     txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
833     txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
834
835     util::non_atomic_fetch_add(stats.ntxns_committed_, 1UL);
836
837     const bool do_compress = txn_logger::IsCompressionEnabled();
838     if (do_compress) {
839       // try placing in horizon
840       bool horizon_cond = false;
841       if (ctx.horizon_->space_remaining() < space_needed ||
842           (horizon_cond = !ctx.horizon_->can_hold_tid(commit_tid))) {
843         if (!ctx.horizon_->datasize()) {
844           std::cerr << "space_needed: " << space_needed << std::endl;
845           std::cerr << "space_remaining: " << ctx.horizon_->space_remaining() << std::endl;
846           std::cerr << "can_hold_tid: " << ctx.horizon_->can_hold_tid(commit_tid) << std::endl;
847         }
848         INVARIANT(ctx.horizon_->datasize());
849         // horizon out of space, so we push it
850         const uint64_t npushed =
851           push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
852         if (npushed)
853           util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
854       }
855
856       INVARIANT(ctx.horizon_->space_remaining() >= space_needed);
857       const uint64_t written =
858         write_current_txn_into_buffer(ctx.horizon_, commit_tid, value_sizes);
859       if (written != space_needed)
860         INVARIANT(false);
861
862     } else {
863
864     retry:
865       txn_logger::pbuffer *px = wait_for_head(pull_buf);
866       INVARIANT(px && px->core_id_ == my_core_id);
867       bool cond = false;
868       if (px->space_remaining() < space_needed ||
869           (cond = !px->can_hold_tid(commit_tid))) {
870         INVARIANT(px->header()->nentries_);
871         txn_logger::pbuffer *px0 = pull_buf.deq();
872         INVARIANT(px == px0);
873         INVARIANT(px0->header()->nentries_);
874         util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
875         push_buf.enq(px0);
876         if (cond)
877           ++txn_logger::g_evt_log_buffer_epoch_boundary;
878         else
879           ++txn_logger::g_evt_log_buffer_out_of_space;
880         goto retry;
881       }
882
883       const uint64_t written =
884         write_current_txn_into_buffer(px, commit_tid, value_sizes);
885       if (written != space_needed)
886         INVARIANT(false);
887     }
888   }
889
890 private:
891
892   // assumes enough space in px to hold this txn
893   inline uint64_t
894   write_current_txn_into_buffer(
895       txn_logger::pbuffer *px,
896       uint64_t commit_tid,
897       const write_set_u32_vec &value_sizes)
898   {
899     INVARIANT(px->can_hold_tid(commit_tid));
900
901     if (unlikely(!px->header()->nentries_))
902       px->earliest_start_us_ = this->rcu_guard_->guard()->start_us();
903
904     uint8_t *p = px->pointer();
905     uint8_t *porig = p;
906
907     serializer<uint32_t, true> vs_uint32_t;
908     serializer<uint64_t, false> s_uint64_t;
909
910 #ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
911     const unsigned nwrites = 0;
912 #else
913     const unsigned nwrites = this->write_set.size();
914 #endif
915
916
917     INVARIANT(nwrites == value_sizes.size());
918
919     p = s_uint64_t.write(p, commit_tid);
920     p = vs_uint32_t.write(p, nwrites);
921
922     for (unsigned idx = 0; idx < nwrites; idx++) {
923       const transaction_base::write_record_t &rec = this->write_set[idx];
924       const uint32_t k_nbytes = rec.get_key().size();
925       p = vs_uint32_t.write(p, k_nbytes);
926       NDB_MEMCPY(p, rec.get_key().data(), k_nbytes);
927       p += k_nbytes;
928       const uint32_t v_nbytes = value_sizes[idx];
929       p = vs_uint32_t.write(p, v_nbytes);
930       if (v_nbytes) {
931         rec.get_writer()(dbtuple::TUPLE_WRITER_DO_DELTA_WRITE, rec.get_value(), p, v_nbytes);
932         p += v_nbytes;
933       }
934     }
935
936     px->curoff_ += (p - porig);
937     px->header()->nentries_++;
938     px->header()->last_tid_ = commit_tid;
939
940     return uint64_t(p - porig);
941   }
942
943 public:
944
945   inline ALWAYS_INLINE bool is_snapshot() const {
946     return this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY;
947   }
948
949   inline transaction_base::tid_t
950   snapshot_tid() const
951   {
952 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
953     if (!IsSnapshotsEnabled())
954       // when snapshots are disabled, but we have a RO txn, we simply allow
955       // it to read all the latest values and treat them as consistent
956       //
957       // it's not correct, but its for the factor analysis
958       return dbtuple::MAX_TID;
959 #endif
960     return u_.last_consistent_tid;
961   }
962
963   void
964   dump_debug_info() const
965   {
966     transaction<transaction_proto2, Traits>::dump_debug_info();
967     if (this->is_snapshot())
968       std::cerr << "  last_consistent_tid: "
969         << g_proto_version_str(u_.last_consistent_tid) << std::endl;
970   }
971
972   transaction_base::tid_t
973   gen_commit_tid(const dbtuple_write_info_vec &write_tuples)
974   {
975     const size_t my_core_id = this->rcu_guard_->guard()->core();
976     threadctx &ctx = g_threadctxs.get(my_core_id);
977     INVARIANT(!this->is_snapshot());
978
979     COMPILER_MEMORY_FENCE;
980     u_.commit_epoch = ticker::s_instance.global_current_tick();
981     COMPILER_MEMORY_FENCE;
982
983     tid_t ret = ctx.last_commit_tid_;
984     INVARIANT(ret == dbtuple::MIN_TID || CoreId(ret) == my_core_id);
985     if (u_.commit_epoch != EpochId(ret))
986       ret = MakeTid(0, 0, u_.commit_epoch);
987
988     // What is this? Is txn_proto1_impl used?
989     if (g_hack->status_.load(std::memory_order_acquire))
990       g_hack->global_tid_.fetch_add(1, std::memory_order_acq_rel);
991
992     // XXX(stephentu): I believe this is correct, but not 100% sure
993     //const size_t my_core_id = 0;
994     //tid_t ret = 0;
995     {
996       typename read_set_map::const_iterator it     = this->read_set.begin();
997       typename read_set_map::const_iterator it_end = this->read_set.end();
998       for (; it != it_end; ++it) {
999         if (it->get_tid() > ret)
1000           ret = it->get_tid();
1001       }
1002     }
1003
1004     {
1005       typename dbtuple_write_info_vec::const_iterator it     = write_tuples.begin();
1006       typename dbtuple_write_info_vec::const_iterator it_end = write_tuples.end();
1007       for (; it != it_end; ++it) {
1008         INVARIANT(it->tuple->is_locked());
1009         INVARIANT(it->tuple->is_lock_owner());
1010         INVARIANT(it->tuple->is_write_intent());
1011         INVARIANT(!it->tuple->is_modifying());
1012         INVARIANT(it->tuple->is_latest());
1013         if (it->is_insert())
1014           // we inserted this node, so we don't want to do the checks below
1015           continue;
1016         const tid_t t = it->tuple->version;
1017
1018         // XXX(stephentu): we are overly conservative for now- technically this
1019         // abort isn't necessary (we really should just write the value in the correct
1020         // position)
1021         //if (EpochId(t) > u_.commit_epoch) {
1022         //  std::cerr << "t: " << g_proto_version_str(t) << std::endl;
1023         //  std::cerr << "epoch: " << u_.commit_epoch << std::endl;
1024         //  this->dump_debug_info();
1025         //}
1026
1027         // t == dbtuple::MAX_TID when a txn does an insert of a new tuple
1028         // followed by 1+ writes to the same tuple.
1029         INVARIANT(EpochId(t) <= u_.commit_epoch || t == dbtuple::MAX_TID);
1030         if (t != dbtuple::MAX_TID && t > ret)
1031           ret = t;
1032       }
1033
1034       INVARIANT(EpochId(ret) == u_.commit_epoch);
1035       ret = MakeTid(my_core_id, NumId(ret) + 1, u_.commit_epoch);
1036     }
1037
1038     // XXX(stephentu): this txn hasn't actually been commited yet,
1039     // and could potentially be aborted - but it's ok to increase this #, since
1040     // subsequent txns on this core will read this # anyways
1041     return (ctx.last_commit_tid_ = ret);
1042   }
1043
1044   inline ALWAYS_INLINE void
1045   on_dbtuple_spill(dbtuple *tuple_ahead, dbtuple *tuple)
1046   {
1047 #ifdef PROTO2_CAN_DISABLE_GC
1048     if (!IsGCEnabled())
1049       return;
1050 #endif
1051
1052     INVARIANT(rcu::s_instance.in_rcu_region());
1053     INVARIANT(!tuple->is_latest());
1054
1055     // >= not > only b/c of the special case of inserting a new tuple +
1056     // overwriting the newly inserted record with a longer sequence of bytes in
1057     // the *same* txn
1058     INVARIANT(tuple_ahead->version >= tuple->version);
1059
1060     if (tuple->is_deleting()) {
1061       INVARIANT(tuple->is_locked());
1062       INVARIANT(tuple->is_lock_owner());
1063       // already on queue
1064       return;
1065     }
1066
1067     const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
1068     INVARIANT(to_read_only_tick(EpochId(tuple->version)) <= ro_tick);
1069
1070 #ifdef CHECK_INVARIANTS
1071     uint64_t exp = 0;
1072     INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
1073 #endif
1074
1075     // when all snapshots are happening >= the current epoch,
1076     // then we can safely remove tuple
1077     threadctx &ctx = g_threadctxs.my();
1078     ctx.queue_.enqueue(
1079         delete_entry(tuple_ahead, tuple_ahead->version,
1080           tuple, marked_ptr<std::string>(), nullptr),
1081         ro_tick);
1082   }
1083
1084   inline ALWAYS_INLINE void
1085   on_logical_delete(dbtuple *tuple, const std::string &key, concurrent_btree *btr)
1086   {
1087 #ifdef PROTO2_CAN_DISABLE_GC
1088     if (!IsGCEnabled())
1089       return;
1090 #endif
1091
1092     INVARIANT(tuple->is_locked());
1093     INVARIANT(tuple->is_lock_owner());
1094     INVARIANT(tuple->is_write_intent());
1095     INVARIANT(tuple->is_latest());
1096     INVARIANT(tuple->is_deleting());
1097     INVARIANT(!tuple->size);
1098     INVARIANT(rcu::s_instance.in_rcu_region());
1099
1100     const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
1101     threadctx &ctx = g_threadctxs.my();
1102
1103 #ifdef CHECK_INVARIANTS
1104     uint64_t exp = 0;
1105     INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
1106 #endif
1107
1108     if (likely(key.size() <= tuple->alloc_size)) {
1109       NDB_MEMCPY(tuple->get_value_start(), key.data(), key.size());
1110       tuple->size = key.size();
1111
1112       // eligible for deletion when all snapshots >= the current epoch
1113       marked_ptr<std::string> mpx;
1114       mpx.set_flags(0x1);
1115
1116       ctx.queue_.enqueue(
1117           delete_entry(nullptr, tuple->version, tuple, mpx, btr),
1118           ro_tick);
1119     } else {
1120       // this is a rare event
1121       ++g_evt_dbtuple_no_space_for_delkey;
1122       std::string *spx = nullptr;
1123       if (ctx.pool_.empty()) {
1124         spx = new std::string(key.data(), key.size()); // XXX: use numa memory?
1125       } else {
1126         spx = ctx.pool_.front();
1127         ctx.pool_.pop_front();
1128         spx->assign(key.data(), key.size());
1129       }
1130       INVARIANT(spx);
1131
1132       marked_ptr<std::string> mpx(spx);
1133       mpx.set_flags(0x1);
1134
1135       ctx.queue_.enqueue(
1136           delete_entry(nullptr, tuple->version, tuple, mpx, btr),
1137           ro_tick);
1138     }
1139   }
1140
1141   void
1142   on_post_rcu_region_completion()
1143   {
1144 #ifdef PROTO2_CAN_DISABLE_GC
1145     if (!IsGCEnabled())
1146       return;
1147 #endif
1148     const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
1149     if (unlikely(!last_tick_ex))
1150       return;
1151     // we subtract one from the global last tick, because of the way
1152     // consistent TIDs are computed, the global_last_tick_exclusive() can
1153     // increase by at most one tick during a transaction.
1154     const uint64_t ro_tick_ex = to_read_only_tick(last_tick_ex - 1);
1155     if (unlikely(!ro_tick_ex))
1156       // won't have anything to clean
1157       return;
1158     // all reads happening at >= ro_tick_geq
1159     const uint64_t ro_tick_geq = ro_tick_ex - 1;
1160     threadctx &ctx = g_threadctxs.my();
1161     clean_up_to_including(ctx, ro_tick_geq);
1162   }
1163
1164 private:
1165
1166   union {
1167     // the global epoch this txn is running in (this # is read when it starts)
1168     // -- snapshot txns only
1169     uint64_t last_consistent_tid;
1170     // the epoch for this txn -- committing non-snapshot txns only
1171     uint64_t commit_epoch;
1172   } u_;
1173 };
1174
1175 // txn_btree_handler specialization
1176 template <>
1177 struct base_txn_btree_handler<transaction_proto2> {
1178   static inline void
1179   on_construct()
1180   {
1181 #ifndef PROTO2_CAN_DISABLE_GC
1182     transaction_proto2_static::InitGC();
1183 #endif
1184   }
1185   static const bool has_background_task = true;
1186 };
1187
1188 template <>
1189 struct txn_epoch_sync<transaction_proto2> : public transaction_proto2_static {
1190   static void
1191   sync()
1192   {
1193     wait_an_epoch();
1194     if (txn_logger::IsPersistenceEnabled())
1195       txn_logger::wait_until_current_point_persisted();
1196   }
1197   static void
1198   finish()
1199   {
1200     if (txn_logger::IsPersistenceEnabled())
1201       txn_logger::wait_until_current_point_persisted();
1202   }
1203   static void
1204   thread_init(bool loader)
1205   {
1206     if (!txn_logger::IsPersistenceEnabled())
1207       return;
1208     const unsigned long my_core_id = coreid::core_id();
1209     // try to initialize using numa allocator
1210     txn_logger::persist_ctx_for(
1211         my_core_id,
1212         loader ? txn_logger::INITMODE_REG : txn_logger::INITMODE_RCU);
1213   }
1214   static void
1215   thread_end()
1216   {
1217     if (!txn_logger::IsPersistenceEnabled())
1218       return;
1219     const unsigned long my_core_id = coreid::core_id();
1220     txn_logger::persist_ctx &ctx =
1221       txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_NONE);
1222     if (unlikely(!ctx.init_))
1223       return;
1224     txn_logger::persist_stats &stats =
1225       txn_logger::g_persist_stats[my_core_id];
1226     txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
1227     txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
1228     if (txn_logger::IsCompressionEnabled() &&
1229         ctx.horizon_->header()->nentries_) {
1230       INVARIANT(ctx.horizon_->datasize());
1231       const uint64_t npushed =
1232         push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
1233       if (npushed)
1234         util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
1235     }
1236     txn_logger::pbuffer *px = pull_buf.peek();
1237     if (!px || !px->header()->nentries_) {
1238       //std::cerr << "core " << my_core_id
1239       //          << " nothing to push to logger" << std::endl;
1240       return;
1241     }
1242     //std::cerr << "core " << my_core_id
1243     //          << " pushing buffer to logger" << std::endl;
1244     txn_logger::pbuffer *px0 = pull_buf.deq();
1245     util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
1246     INVARIANT(px0 == px);
1247     push_buf.enq(px0);
1248   }
1249   static std::tuple<uint64_t, uint64_t, double>
1250   compute_ntxn_persisted()
1251   {
1252     if (!txn_logger::IsPersistenceEnabled())
1253       return std::make_tuple(0, 0, 0.0);
1254     return txn_logger::compute_ntxns_persisted_statistics();
1255   }
1256   static void
1257   reset_ntxn_persisted()
1258   {
1259     if (!txn_logger::IsPersistenceEnabled())
1260       return;
1261     txn_logger::clear_ntxns_persisted_statistics();
1262   }
1263 };
1264
1265 #endif /* _NDB_TXN_PROTO2_IMPL_H_ */