benchmark silo added
[c11concurrency-benchmarks.git] / silo / tuple.h
diff --git a/silo/tuple.h b/silo/tuple.h
new file mode 100644 (file)
index 0000000..5817923
--- /dev/null
@@ -0,0 +1,1099 @@
+#ifndef _NDB_TUPLE_H_
+#define _NDB_TUPLE_H_
+
+#include <atomic>
+#include <vector>
+#include <string>
+#include <utility>
+#include <limits>
+#include <unordered_map>
+#include <ostream>
+#include <thread>
+
+#include "amd64.h"
+#include "core.h"
+#include "counter.h"
+#include "macros.h"
+#include "varkey.h"
+#include "util.h"
+#include "rcu.h"
+#include "thread.h"
+#include "spinlock.h"
+#include "small_unordered_map.h"
+#include "prefetch.h"
+#include "ownership_checker.h"
+
+// debugging tool
+//#define TUPLE_LOCK_OWNERSHIP_CHECKING
+
+template <template <typename> class Protocol, typename Traits>
+  class transaction; // forward decl
+
+// XXX: hack
+extern std::string (*g_proto_version_str)(uint64_t v);
+
+/**
+ * A dbtuple is the value type we stick into the underlying
+ * (non-transactional) data structures; it also owns the memory
+ * backing the record's value
+ */
+struct dbtuple {
+  friend std::ostream &
+  operator<<(std::ostream &o, const dbtuple &tuple);
+
+public:
+  // trying to save space by putting constraints
+  // on node maximums
+  typedef uint32_t version_t;
+  typedef uint16_t node_size_type;
+  typedef uint64_t tid_t;
+  typedef uint8_t * record_type;
+  typedef const uint8_t * const_record_type;
+  typedef size_t size_type;
+  typedef std::string string_type;
+
+  static const tid_t MIN_TID = 0;
+  static const tid_t MAX_TID = (tid_t) -1;
+
+  // lock ownership helpers: they work by recording every tuple
+  // lock obtained in each transaction, and then, when the txn
+  // finishes, calling AssertAllTupleLocksReleased(), which makes
+  // sure the current thread is no longer the owner of any locks
+  // acquired during the txn
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+  static inline void
+  TupleLockRegionBegin()
+  {
+    ownership_checker<dbtuple, dbtuple>::NodeLockRegionBegin();
+  }
+
+  // signals the end of a tuple lock region
+  static inline void
+  AssertAllTupleLocksReleased()
+  {
+    ownership_checker<dbtuple, dbtuple>::AssertAllNodeLocksReleased();
+  }
+private:
+  static inline void
+  AddTupleToLockRegion(const dbtuple *n)
+  {
+    ownership_checker<dbtuple, dbtuple>::AddNodeToLockRegion(n);
+  }
+#endif
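+
+  // Usage sketch (illustrative only; the real call sites live in the
+  // transaction code): a txn brackets its tuple locks with the two public
+  // helpers above, so any lock still held at the end of the region is caught.
+  //
+  //   dbtuple::TupleLockRegionBegin();
+  //   t->lock(true);
+  //   // ... read/modify t ...
+  //   t->unlock();
+  //   dbtuple::AssertAllTupleLocksReleased(); // fires if a lock leaked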
+
+private:
+  static const version_t HDR_LOCKED_MASK = 0x1;
+
+  static const version_t HDR_DELETING_SHIFT = 1;
+  static const version_t HDR_DELETING_MASK = 0x1 << HDR_DELETING_SHIFT;
+
+  static const version_t HDR_WRITE_INTENT_SHIFT = 2;
+  static const version_t HDR_WRITE_INTENT_MASK = 0x1 << HDR_WRITE_INTENT_SHIFT;
+
+  static const version_t HDR_MODIFYING_SHIFT = 3;
+  static const version_t HDR_MODIFYING_MASK = 0x1 << HDR_MODIFYING_SHIFT;
+
+  static const version_t HDR_LATEST_SHIFT = 4;
+  static const version_t HDR_LATEST_MASK = 0x1 << HDR_LATEST_SHIFT;
+
+  static const version_t HDR_VERSION_SHIFT = 5;
+  static const version_t HDR_VERSION_MASK = ((version_t)-1) << HDR_VERSION_SHIFT;
+
+public:
+
+#ifdef TUPLE_MAGIC
+  class magic_failed_exception: public std::exception {};
+  static const uint8_t TUPLE_MAGIC = 0x29U;
+  uint8_t magic;
+  inline ALWAYS_INLINE void CheckMagic() const
+  {
+    if (unlikely(magic != TUPLE_MAGIC)) {
+      print(1);
+      // so we can catch it later and print out useful debugging output
+      throw magic_failed_exception();
+    }
+  }
+#else
+  inline ALWAYS_INLINE void CheckMagic() const {}
+#endif
+
+  // NB(stephentu): the ABA problem can occur after some multiple of
+  // 2^(NBits(version_t)-5) concurrent modifications (the version counter
+  // occupies bits 5..31 below); this is a somewhat low-probability
+  // event, so we let it happen
+  //
+  // <-- low bits
+  // [ locked | deleting | write_intent | modifying | latest | version ]
+  // [  0..1  |   1..2   |    2..3      |   3..4    |  4..5  |  5..32  ]
+  volatile version_t hdr;
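+
+  // Decoding sketch (illustrative; mirrors the mask/shift constants above):
+  //
+  //   version_t v = tup->hdr;
+  //   bool locked    = v & HDR_LOCKED_MASK;        // bit 0
+  //   bool deleting  = v & HDR_DELETING_MASK;      // bit 1
+  //   bool wr_intent = v & HDR_WRITE_INTENT_MASK;  // bit 2
+  //   bool modifying = v & HDR_MODIFYING_MASK;     // bit 3
+  //   bool latest    = v & HDR_LATEST_MASK;        // bit 4
+  //   version_t ctr  = Version(v);                 // bits 5..31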
+
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+  std::thread::id lock_owner;
+#endif
+
+  // uninterpreted TID
+  tid_t version;
+
+  // small sizes on purpose
+  node_size_type size; // actual size of record
+                       // (only meaningful if the deleting bit is not set)
+  node_size_type alloc_size; // max record size allowed, i.e. the space
+                             // available for the record buf
+
+  dbtuple *next; // be very careful when traversing this pointer;
+                 // GC is capable of reaping it at certain (well-defined)
+                 // points, and will not bother to set it to null
+
+#ifdef TUPLE_CHECK_KEY
+  // for debugging
+  std::string key;
+  void *tree;
+#endif
+
+#ifdef CHECK_INVARIANTS
+  // for debugging
+  std::atomic<uint64_t> opaque;
+#endif
+
+  // must be last field
+  uint8_t value_start[0];
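+
+  // Layout sketch (illustrative): each dbtuple is carved out of a single
+  // allocation, so the value bytes live immediately after the header fields
+  // (see alloc_first() below, which placement-news over an RCU allocation):
+  //
+  //   [ hdr | version | size | alloc_size | next | value bytes ......... ]
+  //   ^this                                      ^value_start
+  //                                              <---- alloc_size ----->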
+
+  void print(std::ostream &o, unsigned len) const;
+
+private:
+  // private ctor/dtor because we do placement new over RCU-allocated
+  // memory (see the alloc*() helpers below);
+  // ctors start the node off as the latest node
+
+  static inline ALWAYS_INLINE node_size_type
+  CheckBounds(size_type s)
+  {
+    INVARIANT(s <= std::numeric_limits<node_size_type>::max());
+    return s;
+  }
+
+  dbtuple(const dbtuple &) = delete;
+  dbtuple(dbtuple &&) = delete;
+  dbtuple &operator=(const dbtuple &) = delete;
+
+  // creates a (new) record with a tentative value at MAX_TID
+  dbtuple(size_type size, size_type alloc_size, bool acquire_lock)
+    :
+#ifdef TUPLE_MAGIC
+      magic(TUPLE_MAGIC),
+#endif
+      hdr(HDR_LATEST_MASK |
+          (acquire_lock ? (HDR_LOCKED_MASK | HDR_WRITE_INTENT_MASK) : 0) |
+          (!size ? HDR_DELETING_MASK : 0))
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+      , lock_owner()
+#endif
+      , version(MAX_TID)
+      , size(CheckBounds(size))
+      , alloc_size(CheckBounds(alloc_size))
+      , next(nullptr)
+#ifdef TUPLE_CHECK_KEY
+      , key()
+      , tree(nullptr)
+#endif
+#ifdef CHECK_INVARIANTS
+      , opaque(0)
+#endif
+  {
+    INVARIANT(((char *)this) + sizeof(*this) == (char *) &value_start[0]);
+    INVARIANT(is_latest());
+    INVARIANT(size || is_deleting());
+    ++g_evt_dbtuple_creates;
+    g_evt_dbtuple_bytes_allocated += alloc_size + sizeof(dbtuple);
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+    if (acquire_lock) {
+      lock_owner = std::this_thread::get_id();
+      AddTupleToLockRegion(this);
+      INVARIANT(is_lock_owner());
+    }
+#endif
+    COMPILER_MEMORY_FENCE; // for acquire_lock
+  }
+
+  // creates a record at version derived from base
+  // (inheriting its value).
+  dbtuple(tid_t version,
+          struct dbtuple *base,
+          size_type alloc_size,
+          bool set_latest)
+    :
+#ifdef TUPLE_MAGIC
+      magic(TUPLE_MAGIC),
+#endif
+      hdr(set_latest ? HDR_LATEST_MASK : 0)
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+      , lock_owner()
+#endif
+      , version(version)
+      , size(base->size)
+      , alloc_size(CheckBounds(alloc_size))
+      , next(base->next)
+#ifdef TUPLE_CHECK_KEY
+      , key()
+      , tree(nullptr)
+#endif
+#ifdef CHECK_INVARIANTS
+      , opaque(0)
+#endif
+  {
+    INVARIANT(size <= alloc_size);
+    INVARIANT(set_latest == is_latest());
+    if (base->is_deleting())
+      mark_deleting();
+    NDB_MEMCPY(&value_start[0], base->get_value_start(), size);
+    ++g_evt_dbtuple_creates;
+    g_evt_dbtuple_bytes_allocated += alloc_size + sizeof(dbtuple);
+  }
+
+  // creates a spill record, copying in the *old* value if necessary, but
+  // setting the size to the *new* value
+  dbtuple(tid_t version,
+          const_record_type r,
+          size_type old_size,
+          size_type new_size,
+          size_type alloc_size,
+          struct dbtuple *next,
+          bool set_latest,
+          bool needs_old_value)
+    :
+#ifdef TUPLE_MAGIC
+      magic(TUPLE_MAGIC),
+#endif
+      hdr((set_latest ? HDR_LATEST_MASK : 0) | (!new_size ? HDR_DELETING_MASK : 0))
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+      , lock_owner()
+#endif
+      , version(version)
+      , size(CheckBounds(new_size))
+      , alloc_size(CheckBounds(alloc_size))
+      , next(next)
+#ifdef TUPLE_CHECK_KEY
+      , key()
+      , tree(nullptr)
+#endif
+#ifdef CHECK_INVARIANTS
+      , opaque(0)
+#endif
+  {
+    INVARIANT(!needs_old_value || old_size <= alloc_size);
+    INVARIANT(new_size <= alloc_size);
+    INVARIANT(set_latest == is_latest());
+    INVARIANT(new_size || is_deleting());
+    if (needs_old_value)
+      NDB_MEMCPY(&value_start[0], r, old_size);
+    ++g_evt_dbtuple_creates;
+    g_evt_dbtuple_bytes_allocated += alloc_size + sizeof(dbtuple);
+  }
+
+  friend class rcu;
+  ~dbtuple();
+
+  static event_avg_counter g_evt_avg_dbtuple_stable_version_spins;
+  static event_avg_counter g_evt_avg_dbtuple_lock_acquire_spins;
+  static event_avg_counter g_evt_avg_dbtuple_read_retries;
+
+public:
+
+  enum ReadStatus {
+    READ_FAILED,
+    READ_EMPTY,
+    READ_RECORD,
+  };
+
+  inline void
+  prefetch() const
+  {
+#ifdef TUPLE_PREFETCH
+    prefetch_bytes(this, sizeof(*this) + alloc_size);
+#endif
+  }
+
+  // GCs *this* instance, ignoring the chain
+  void gc_this();
+
+  inline bool
+  is_locked() const
+  {
+    return IsLocked(hdr);
+  }
+
+  static inline bool
+  IsLocked(version_t v)
+  {
+    return v & HDR_LOCKED_MASK;
+  }
+
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+  inline bool
+  is_lock_owner() const
+  {
+    return std::this_thread::get_id() == lock_owner;
+  }
+#else
+  inline bool
+  is_lock_owner() const
+  {
+    return true;
+  }
+#endif
+
+  inline version_t
+  lock(bool write_intent)
+  {
+    // XXX: implement SPINLOCK_BACKOFF
+    CheckMagic();
+#ifdef ENABLE_EVENT_COUNTERS
+    unsigned nspins = 0;
+#endif
+    version_t v = hdr;
+    const version_t lockmask = write_intent ?
+      (HDR_LOCKED_MASK | HDR_WRITE_INTENT_MASK) :
+      (HDR_LOCKED_MASK);
+    while (IsLocked(v) ||
+           !__sync_bool_compare_and_swap(&hdr, v, v | lockmask)) {
+      nop_pause();
+      v = hdr;
+#ifdef ENABLE_EVENT_COUNTERS
+      ++nspins;
+#endif
+    }
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+    lock_owner = std::this_thread::get_id();
+    AddTupleToLockRegion(this);
+    INVARIANT(is_lock_owner());
+#endif
+    COMPILER_MEMORY_FENCE;
+    INVARIANT(IsLocked(hdr));
+    INVARIANT(!write_intent || IsWriteIntent(hdr));
+    INVARIANT(!IsModifying(hdr));
+#ifdef ENABLE_EVENT_COUNTERS
+    g_evt_avg_dbtuple_lock_acquire_spins.offer(nspins);
+#endif
+    return hdr;
+  }
+
+  inline void
+  unlock()
+  {
+    CheckMagic();
+    version_t v = hdr;
+    bool newv = false;
+    INVARIANT(IsLocked(v));
+    INVARIANT(is_lock_owner());
+    if (IsModifying(v) || IsWriteIntent(v)) {
+      newv = true;
+      const version_t n = Version(v);
+      v &= ~HDR_VERSION_MASK;
+      v |= (((n + 1) << HDR_VERSION_SHIFT) & HDR_VERSION_MASK);
+    }
+    // clear locked + modifying bits
+    v &= ~(HDR_LOCKED_MASK | HDR_MODIFYING_MASK | HDR_WRITE_INTENT_MASK);
+    if (newv) {
+      INVARIANT(!reader_check_version(v));
+      INVARIANT(!writer_check_version(v));
+    }
+    INVARIANT(!IsLocked(v));
+    INVARIANT(!IsModifying(v));
+    INVARIANT(!IsWriteIntent(v));
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+    lock_owner = std::thread::id();
+    INVARIANT(!is_lock_owner());
+#endif
+    COMPILER_MEMORY_FENCE;
+    hdr = v;
+  }
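+
+  // Locking sketch (illustrative): writers take the lock with write intent,
+  // mutate, and unlock; unlock() bumps the version counter iff the modifying
+  // or write-intent bit was set, which is what invalidates concurrent
+  // optimistic readers.
+  //
+  //   tuple->lock(true);        // spin until locked w/ write intent
+  //   tuple->mark_modifying();  // tell readers the value bytes may change
+  //   // ... write value bytes ...
+  //   tuple->unlock();          // clears flags, increments version counter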
+
+  inline bool
+  is_deleting() const
+  {
+    return IsDeleting(hdr);
+  }
+
+  static inline bool
+  IsDeleting(version_t v)
+  {
+    return v & HDR_DELETING_MASK;
+  }
+
+  inline void
+  mark_deleting()
+  {
+    CheckMagic();
+    // the lock on the latest version guards non-latest versions
+    INVARIANT(!is_latest() || is_locked());
+    INVARIANT(!is_latest() || is_lock_owner());
+    INVARIANT(!is_deleting());
+    hdr |= HDR_DELETING_MASK;
+  }
+
+  inline void
+  clear_deleting()
+  {
+    CheckMagic();
+    INVARIANT(is_locked());
+    INVARIANT(is_lock_owner());
+    INVARIANT(is_deleting());
+    hdr &= ~HDR_DELETING_MASK;
+  }
+
+  inline bool
+  is_modifying() const
+  {
+    return IsModifying(hdr);
+  }
+
+  inline void
+  mark_modifying()
+  {
+    CheckMagic();
+    version_t v = hdr;
+    INVARIANT(IsLocked(v));
+    INVARIANT(is_lock_owner());
+    //INVARIANT(!IsModifying(v)); // mark_modifying() must be re-entrant
+    v |= HDR_MODIFYING_MASK;
+    COMPILER_MEMORY_FENCE; // XXX: is this fence necessary?
+    hdr = v;
+    COMPILER_MEMORY_FENCE;
+  }
+
+  static inline bool
+  IsModifying(version_t v)
+  {
+    return v & HDR_MODIFYING_MASK;
+  }
+
+  inline bool
+  is_write_intent() const
+  {
+    return IsWriteIntent(hdr);
+  }
+
+  static inline bool
+  IsWriteIntent(version_t v)
+  {
+    return v & HDR_WRITE_INTENT_MASK;
+  }
+
+  inline bool
+  is_latest() const
+  {
+    return IsLatest(hdr);
+  }
+
+  static inline bool
+  IsLatest(version_t v)
+  {
+    return v & HDR_LATEST_MASK;
+  }
+
+  inline void
+  clear_latest()
+  {
+    CheckMagic();
+    INVARIANT(is_locked());
+    INVARIANT(is_lock_owner());
+    INVARIANT(is_latest());
+    hdr &= ~HDR_LATEST_MASK;
+  }
+
+  static inline version_t
+  Version(version_t v)
+  {
+    return (v & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
+  }
+
+  inline version_t
+  reader_stable_version(bool allow_write_intent) const
+  {
+    version_t v = hdr;
+#ifdef ENABLE_EVENT_COUNTERS
+    unsigned long nspins = 0;
+#endif
+    while (IsModifying(v) ||
+           (!allow_write_intent && IsWriteIntent(v))) {
+      nop_pause();
+      v = hdr;
+#ifdef ENABLE_EVENT_COUNTERS
+      ++nspins;
+#endif
+    }
+    COMPILER_MEMORY_FENCE;
+#ifdef ENABLE_EVENT_COUNTERS
+    g_evt_avg_dbtuple_stable_version_spins.offer(nspins);
+#endif
+    return v;
+  }
+
+  /**
+   * returns true if succeeded, false otherwise
+   */
+  inline bool
+  try_writer_stable_version(version_t &v, unsigned int spins) const
+  {
+    v = hdr;
+    while (IsWriteIntent(v) && spins--) {
+      INVARIANT(IsLocked(v));
+      nop_pause();
+      v = hdr;
+    }
+    const bool ret = !IsWriteIntent(v);
+    COMPILER_MEMORY_FENCE;
+    INVARIANT(ret || IsLocked(v));
+    INVARIANT(!ret || !IsModifying(v));
+    return ret;
+  }
+
+  inline version_t
+  unstable_version() const
+  {
+    return hdr;
+  }
+
+  inline bool
+  reader_check_version(version_t version) const
+  {
+    COMPILER_MEMORY_FENCE;
+    // are the versions the same, modulo the
+    // {locked, write_intent, latest} bits?
+    const version_t MODULO_BITS =
+      (HDR_LOCKED_MASK | HDR_WRITE_INTENT_MASK | HDR_LATEST_MASK);
+    return (hdr & ~MODULO_BITS) == (version & ~MODULO_BITS);
+  }
+
+  inline bool
+  writer_check_version(version_t version) const
+  {
+    COMPILER_MEMORY_FENCE;
+    return hdr == version;
+  }
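+
+  // Optimistic-read sketch (illustrative; record_at_chain() below is the
+  // real implementation of this pattern): take a stable version, read, then
+  // validate and retry on any intervening modification.
+  //
+  //   retry:
+  //     version_t v = tup->reader_stable_version(false); // spins past writers
+  //     // ... copy out value bytes ...
+  //     if (!tup->reader_check_version(v))
+  //       goto retry; // a writer slipped in; the copy may be torn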
+
+  inline ALWAYS_INLINE struct dbtuple *
+  get_next()
+  {
+    return next;
+  }
+
+  inline const struct dbtuple *
+  get_next() const
+  {
+    return next;
+  }
+
+  inline ALWAYS_INLINE void
+  set_next(struct dbtuple *next)
+  {
+    CheckMagic();
+    this->next = next;
+  }
+
+  inline void
+  clear_next()
+  {
+    CheckMagic();
+    this->next = nullptr;
+  }
+
+  inline ALWAYS_INLINE uint8_t *
+  get_value_start()
+  {
+    CheckMagic();
+    return &value_start[0];
+  }
+
+  inline ALWAYS_INLINE const uint8_t *
+  get_value_start() const
+  {
+    return &value_start[0];
+  }
+
+  // worst name ever... returns true iff this version is no newer
+  // than t (i.e., visible to a read at tid t)
+  inline bool
+  is_not_behind(tid_t t) const
+  {
+    return version <= t;
+  }
+
+private:
+
+#ifdef ENABLE_EVENT_COUNTERS
+  struct scoped_recorder {
+    scoped_recorder(unsigned long &n) : n(&n) {}
+    ~scoped_recorder()
+    {
+      g_evt_avg_dbtuple_read_retries.offer(*n);
+    }
+  private:
+    unsigned long *n;
+  };
+#endif
+
+  // written to be non-recursive
+  template <typename Reader, typename StringAllocator>
+  static ReadStatus
+  record_at_chain(
+      const dbtuple *starting, tid_t t, tid_t &start_t,
+      Reader &reader, StringAllocator &sa, bool allow_write_intent)
+  {
+#ifdef ENABLE_EVENT_COUNTERS
+    unsigned long nretries = 0;
+    scoped_recorder rec(nretries);
+#endif
+    const dbtuple *current = starting;
+  loop:
+    INVARIANT(current->version != MAX_TID);
+    const version_t v = current->reader_stable_version(allow_write_intent);
+    const struct dbtuple *p;
+    const bool found = current->is_not_behind(t);
+    if (found) {
+      start_t = current->version;
+      const size_t read_sz = IsDeleting(v) ? 0 : current->size;
+      if (unlikely(read_sz && !reader(current->get_value_start(), read_sz, sa)))
+        goto retry;
+      if (unlikely(!current->reader_check_version(v)))
+        goto retry;
+      return read_sz ? READ_RECORD : READ_EMPTY;
+    } else {
+      p = current->get_next();
+    }
+    if (unlikely(!current->reader_check_version(v)))
+      goto retry;
+    if (p) {
+      current = p;
+      goto loop;
+    }
+    // see note in record_at()
+    start_t = MIN_TID;
+    return READ_EMPTY;
+  retry:
+#ifdef ENABLE_EVENT_COUNTERS
+    ++nretries;
+#endif
+    goto loop;
+  }
+
+  // we force one level of inlining, but don't force record_at_chain()
+  // to be inlined
+  template <typename Reader, typename StringAllocator>
+  inline ALWAYS_INLINE ReadStatus
+  record_at(
+      tid_t t, tid_t &start_t,
+      Reader &reader, StringAllocator &sa, bool allow_write_intent) const
+  {
+#ifdef ENABLE_EVENT_COUNTERS
+    unsigned long nretries = 0;
+    scoped_recorder rec(nretries);
+#endif
+    if (unlikely(version == MAX_TID)) {
+      // XXX(stephentu): HACK! we use MAX_TID to indicate a tentative
+      // "insert"- the actual latest value is empty.
+      //
+      // since our system is screwed anyways if we ever reach MAX_TID, this
+      // is OK for now, but a real solution should exist at some point
+      start_t = MIN_TID;
+      return READ_EMPTY;
+    }
+  loop:
+    const version_t v = reader_stable_version(allow_write_intent);
+    const struct dbtuple *p;
+    const bool found = is_not_behind(t);
+    if (found) {
+      //if (unlikely(!IsLatest(v)))
+      //  return READ_FAILED;
+      start_t = version;
+      const size_t read_sz = IsDeleting(v) ? 0 : size;
+      if (unlikely(read_sz && !reader(get_value_start(), read_sz, sa)))
+        goto retry;
+      if (unlikely(!reader_check_version(v)))
+        goto retry;
+      return read_sz ? READ_RECORD : READ_EMPTY;
+    } else {
+      p = get_next();
+    }
+    if (unlikely(!reader_check_version(v)))
+      goto retry;
+    if (p)
+      return record_at_chain(p, t, start_t, reader, sa, allow_write_intent);
+    // NB(stephentu): if we reach the end of a chain then we assume that
+    // the record exists as a deleted record.
+    //
+    // This is safe because we have been very careful to not garbage collect
+    // elements along the chain until it is guaranteed that the record
+    // is superseded by a later record in any consistent read. Therefore,
+    // if we reach the end of the chain, then it *must* be the case that
+    // the record does not actually exist.
+    //
+    // Note that MIN_TID is the *wrong* tid to use here given wrap-around; we
+    // really should be setting this value to the tid which represents the
+    // oldest TID possible in the system. But we currently don't implement
+    // wrap-around
+    start_t = MIN_TID;
+    return READ_EMPTY;
+  retry:
+#ifdef ENABLE_EVENT_COUNTERS
+    ++nretries;
+#endif
+    goto loop;
+  }
+
+  static event_counter g_evt_dbtuple_creates;
+  static event_counter g_evt_dbtuple_logical_deletes;
+  static event_counter g_evt_dbtuple_physical_deletes;
+  static event_counter g_evt_dbtuple_bytes_allocated;
+  static event_counter g_evt_dbtuple_bytes_freed;
+  static event_counter g_evt_dbtuple_spills;
+  static event_counter g_evt_dbtuple_inplace_buf_insufficient;
+  static event_counter g_evt_dbtuple_inplace_buf_insufficient_on_spill;
+  static event_avg_counter g_evt_avg_record_spill_len;
+
+public:
+
+  /**
+   * Read the record as of tid t. Returns READ_RECORD if a non-empty record
+   * visible at t exists, and READ_EMPTY otherwise (e.g. the record was
+   * deleted at t, GC-ed, or never existed). On a successful read, start_t
+   * receives the tid of the version read, and the value bytes are consumed
+   * via reader.
+   *
+   * NB(stephentu): calling stable_read() while holding the lock
+   * is an error; it will cause deadlock
+   */
+  template <typename Reader, typename StringAllocator>
+  inline ALWAYS_INLINE ReadStatus
+  stable_read(
+      tid_t t, tid_t &start_t,
+      Reader &reader, StringAllocator &sa,
+      bool allow_write_intent) const
+  {
+    return record_at(t, start_t, reader, sa, allow_write_intent);
+  }
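+
+  // Reader sketch (hypothetical functor; the real readers are supplied by
+  // the transaction protocol): stable_read() hands the raw value bytes to a
+  // callable of the form bool(const uint8_t *, size_t, StringAllocator &),
+  // which returns false to force a retry.
+  //
+  //   struct copy_reader {
+  //     std::string out;
+  //     template <typename StringAllocator>
+  //     bool operator()(const uint8_t *p, size_t sz, StringAllocator &) {
+  //       out.assign((const char *) p, sz);
+  //       return true;
+  //     }
+  //   };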
+
+  inline bool
+  is_latest_version(tid_t t) const
+  {
+    return is_latest() && is_not_behind(t);
+  }
+
+  bool
+  stable_is_latest_version(tid_t t) const
+  {
+    version_t v = 0;
+    if (!try_writer_stable_version(v, 16))
+      return false;
+    // now v is a stable version
+    INVARIANT(!IsWriteIntent(v));
+    INVARIANT(!IsModifying(v));
+    const bool ret = IsLatest(v) && is_not_behind(t);
+    // only check_version() if the answer would be true; otherwise,
+    // there is no point in doing a version check
+    if (ret && writer_check_version(v))
+      return true;
+    else
+      // no point in retrying, since we know it will fail (since we had a
+      // version change)
+      return false;
+  }
+
+  inline bool
+  latest_value_is_nil() const
+  {
+    return is_latest() && size == 0;
+  }
+
+  inline bool
+  stable_latest_value_is_nil() const
+  {
+    version_t v = 0;
+    if (!try_writer_stable_version(v, 16))
+      return false;
+    INVARIANT(!IsWriteIntent(v));
+    INVARIANT(!IsModifying(v));
+    const bool ret = IsLatest(v) && size == 0;
+    if (ret && writer_check_version(v))
+      return true;
+    else
+      return false;
+  }
+
+  struct write_record_ret {
+    write_record_ret() : head_(), rest_(), forced_spill_() {}
+    write_record_ret(dbtuple *head, dbtuple* rest, bool forced_spill)
+      : head_(head), rest_(rest), forced_spill_(forced_spill)
+    {
+      INVARIANT(head);
+      INVARIANT(head != rest);
+      INVARIANT(!forced_spill || rest);
+    }
+    dbtuple *head_;
+    dbtuple *rest_;
+    bool forced_spill_;
+  };
+
+  // XXX: kind of hacky, but we do this to avoid virtual
+  // functions / passing multiple function pointers around
+  enum TupleWriterMode {
+    TUPLE_WRITER_NEEDS_OLD_VALUE, // all three args ignored
+    TUPLE_WRITER_COMPUTE_NEEDED,
+    TUPLE_WRITER_COMPUTE_DELTA_NEEDED, // last two args ignored
+    TUPLE_WRITER_DO_WRITE,
+    TUPLE_WRITER_DO_DELTA_WRITE,
+  };
+  typedef size_t (*tuple_writer_t)(TupleWriterMode, const void *, uint8_t *, size_t);
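+
+  // Writer sketch (hypothetical; real writers are supplied by the protocol
+  // layer): a tuple_writer_t dispatches on the mode argument, shown here for
+  // plain byte-string values, with the delta modes left unimplemented.
+  //
+  //   static size_t
+  //   string_writer(TupleWriterMode m, const void *v, uint8_t *buf, size_t sz)
+  //   {
+  //     const std::string *s = static_cast<const std::string *>(v);
+  //     switch (m) {
+  //     case TUPLE_WRITER_NEEDS_OLD_VALUE: return 0;         // false
+  //     case TUPLE_WRITER_COMPUTE_NEEDED:  return s->size(); // bytes needed
+  //     case TUPLE_WRITER_DO_WRITE:
+  //       NDB_MEMCPY(buf, s->data(), s->size());
+  //       return 0;
+  //     default: return 0; // delta modes unused for plain values
+  //     }
+  //   }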
+
+  /**
+   * Always writes the record in the latest (newest) version slot,
+   * not asserting whether or not inserting r @ t would violate the
+   * sorted order invariant
+   *
+   * ret.head_ = latest tuple after the write (guaranteed to not be nullptr)
+   * ret.rest_ = old version of tuple, iff no overwrite (can be nullptr)
+   *
+   * Note: if this != ret.head_, then we need a tree replacement
+   */
+  template <typename Transaction>
+  write_record_ret
+  write_record_at(const Transaction *txn, tid_t t,
+                  const void *v, tuple_writer_t writer)
+  {
+#ifndef DISABLE_OVERWRITE_IN_PLACE
+    CheckMagic();
+    INVARIANT(is_locked());
+    INVARIANT(is_lock_owner());
+    INVARIANT(is_latest());
+    INVARIANT(is_write_intent());
+
+    const size_t new_sz =
+      v ? writer(TUPLE_WRITER_COMPUTE_NEEDED, v, get_value_start(), size) : 0;
+    INVARIANT(!v || new_sz);
+    INVARIANT(is_deleting() || size);
+    const size_t old_sz = is_deleting() ? 0 : size;
+
+    if (!new_sz)
+      ++g_evt_dbtuple_logical_deletes;
+
+    // try to overwrite this record
+    if (likely(txn->can_overwrite_record_tid(version, t) && old_sz)) {
+      INVARIANT(!is_deleting());
+      // see if we have enough space
+      if (likely(new_sz <= alloc_size)) {
+        // directly update in place
+        mark_modifying();
+        if (v)
+          writer(TUPLE_WRITER_DO_WRITE, v, get_value_start(), old_sz);
+        version = t;
+        size = new_sz;
+        if (!new_sz)
+          mark_deleting();
+        return write_record_ret(this, nullptr, false);
+      }
+
+      //std::cerr
+      //  << "existing: " << g_proto_version_str(version) << std::endl
+      //  << "new     : " << g_proto_version_str(t)       << std::endl
+      //  << "alloc_size : " << alloc_size                << std::endl
+      //  << "new_sz     : " << new_sz                    << std::endl;
+
+      // keep this tuple in the chain (it's wasteful, but not incorrect)
+      // so that cleanup is easier
+      //
+      // XXX(stephentu): alloc_spill() should acquire the lock on
+      // the returned tuple in the ctor, as an optimization
+
+      const bool needs_old_value =
+        writer(TUPLE_WRITER_NEEDS_OLD_VALUE, nullptr, nullptr, 0);
+      INVARIANT(new_sz);
+      INVARIANT(v);
+      dbtuple * const rep =
+        alloc_spill(t, get_value_start(), old_sz, new_sz,
+                    this, true, needs_old_value);
+      writer(TUPLE_WRITER_DO_WRITE, v, rep->get_value_start(), old_sz);
+      INVARIANT(rep->is_latest());
+      INVARIANT(rep->size == new_sz);
+      clear_latest();
+      ++g_evt_dbtuple_inplace_buf_insufficient;
+
+      // [did not spill because of epochs, need to replace this with rep]
+      return write_record_ret(rep, this, false);
+    }
+
+    //std::cerr
+    //  << "existing: " << g_proto_version_str(version) << std::endl
+    //  << "new     : " << g_proto_version_str(t)       << std::endl
+    //  << "alloc_size : " << alloc_size                << std::endl
+    //  << "new_sz     : " << new_sz                    << std::endl;
+
+    // need to spill
+    ++g_evt_dbtuple_spills;
+    g_evt_avg_record_spill_len.offer(size);
+
+    if (new_sz <= alloc_size && old_sz) {
+      INVARIANT(!is_deleting());
+      dbtuple * const spill = alloc(version, this, false);
+      INVARIANT(!spill->is_latest());
+      mark_modifying();
+      set_next(spill);
+      if (v)
+        writer(TUPLE_WRITER_DO_WRITE, v, get_value_start(), size);
+      version = t;
+      size = new_sz;
+      if (!new_sz)
+        mark_deleting();
+      return write_record_ret(this, spill, true);
+    }
+
+    const bool needs_old_value =
+      writer(TUPLE_WRITER_NEEDS_OLD_VALUE, nullptr, nullptr, 0);
+    dbtuple * const rep =
+      alloc_spill(t, get_value_start(), old_sz, new_sz,
+                  this, true, needs_old_value);
+    if (v)
+      writer(TUPLE_WRITER_DO_WRITE, v, rep->get_value_start(), size);
+    INVARIANT(rep->is_latest());
+    INVARIANT(rep->size == new_sz);
+    INVARIANT(new_sz || rep->is_deleting()); // set by alloc_spill()
+    clear_latest();
+    ++g_evt_dbtuple_inplace_buf_insufficient_on_spill;
+    return write_record_ret(rep, this, true);
+#else
+    CheckMagic();
+    INVARIANT(is_locked());
+    INVARIANT(is_lock_owner());
+    INVARIANT(is_latest());
+    INVARIANT(is_write_intent());
+
+    const size_t new_sz =
+      v ? writer(TUPLE_WRITER_COMPUTE_NEEDED, v, get_value_start(), size) : 0;
+    INVARIANT(!v || new_sz);
+    INVARIANT(is_deleting() || size);
+    const size_t old_sz = is_deleting() ? 0 : size;
+
+    if (!new_sz)
+      ++g_evt_dbtuple_logical_deletes;
+
+    const bool needs_old_value =
+      writer(TUPLE_WRITER_NEEDS_OLD_VALUE, nullptr, nullptr, 0);
+    dbtuple * const rep =
+      alloc_spill(t, get_value_start(), old_sz, new_sz,
+                  this, true, needs_old_value);
+    if (v)
+      writer(TUPLE_WRITER_DO_WRITE, v, rep->get_value_start(), size);
+    INVARIANT(rep->is_latest());
+    INVARIANT(rep->size == new_sz);
+    INVARIANT(new_sz || rep->is_deleting()); // set by alloc_spill()
+    clear_latest();
+    ++g_evt_dbtuple_inplace_buf_insufficient_on_spill;
+    return write_record_ret(rep, this, true);
+#endif
+  }
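+
+  // Caller sketch (illustrative): how a commit path might interpret the
+  // return value; head_ != this means the chain head changed and the index
+  // entry must be swung over to the new tuple.
+  //
+  //   write_record_ret ret = tup->write_record_at(txn, commit_tid, v, writer);
+  //   if (ret.head_ != tup)
+  //     /* replace tup with ret.head_ in the tree */;
+  //   if (ret.forced_spill_)
+  //     /* ret.rest_ is non-null and stays in the chain until GC */;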
+
+  // NB: we round up allocation sizes because jemalloc will do this
+  // internally anyway, so we might as well grab the extra usable space
+  // (it's really just internal vs external fragmentation)
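+
+  // Rounding sketch (illustrative; assumes util::round_up<T, LgAlign>()
+  // rounds up to a multiple of 2^LgAlign, with LgAllocAlignment supplied by
+  // the allocator). E.g. with LgAllocAlignment == 4:
+  //
+  //   util::round_up<size_t, 4>(50) == 64 // 14 extra bytes of record buffer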
+
+  static inline dbtuple *
+  alloc_first(size_type sz, bool acquire_lock)
+  {
+    INVARIANT(sz <= std::numeric_limits<node_size_type>::max());
+    const size_t max_alloc_sz =
+      std::numeric_limits<node_size_type>::max() + sizeof(dbtuple);
+    const size_t alloc_sz =
+      std::min(
+          util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(dbtuple) + sz),
+          max_alloc_sz);
+    char *p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
+    INVARIANT(p);
+    INVARIANT((alloc_sz - sizeof(dbtuple)) >= sz);
+    return new (p) dbtuple(
+        sz, alloc_sz - sizeof(dbtuple), acquire_lock);
+  }
+
+  static inline dbtuple *
+  alloc(tid_t version, struct dbtuple *base, bool set_latest)
+  {
+    const size_t max_alloc_sz =
+      std::numeric_limits<node_size_type>::max() + sizeof(dbtuple);
+    const size_t alloc_sz =
+      std::min(
+          util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(dbtuple) + base->size),
+          max_alloc_sz);
+    char *p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
+    INVARIANT(p);
+    return new (p) dbtuple(
+        version, base, alloc_sz - sizeof(dbtuple), set_latest);
+  }
+
+  static inline dbtuple *
+  alloc_spill(tid_t version, const_record_type value, size_type oldsz,
+              size_type newsz, struct dbtuple *next, bool set_latest,
+              bool copy_old_value)
+  {
+    INVARIANT(oldsz <= std::numeric_limits<node_size_type>::max());
+    INVARIANT(newsz <= std::numeric_limits<node_size_type>::max());
+
+    const size_t needed_sz =
+      copy_old_value ? std::max(newsz, oldsz) : newsz;
+    const size_t max_alloc_sz =
+      std::numeric_limits<node_size_type>::max() + sizeof(dbtuple);
+    const size_t alloc_sz =
+      std::min(
+          util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(dbtuple) + needed_sz),
+          max_alloc_sz);
+    char *p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
+    INVARIANT(p);
+    return new (p) dbtuple(
+        version, value, oldsz, newsz,
+        alloc_sz - sizeof(dbtuple), next, set_latest, copy_old_value);
+  }
+
+
+private:
+  static inline void
+  destruct_and_free(dbtuple *n)
+  {
+    const size_t alloc_sz = n->alloc_size + sizeof(*n);
+    n->~dbtuple();
+    rcu::s_instance.dealloc(n, alloc_sz);
+  }
+
+public:
+  static void
+  deleter(void *p)
+  {
+    dbtuple * const n = (dbtuple *) p;
+    INVARIANT(!n->is_latest());
+    INVARIANT(!n->is_locked());
+    INVARIANT(!n->is_modifying());
+    destruct_and_free(n);
+  }
+
+  static inline void
+  release(dbtuple *n)
+  {
+    if (unlikely(!n))
+      return;
+    INVARIANT(n->is_locked());
+    INVARIANT(!n->is_latest());
+    rcu::s_instance.free_with_fn(n, deleter);
+  }
+
+  static inline void
+  release_no_rcu(dbtuple *n)
+  {
+    if (unlikely(!n))
+      return;
+    INVARIANT(!n->is_latest());
+    destruct_and_free(n);
+  }
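+
+  // Lifecycle sketch (illustrative): once a tuple is superseded, the writer
+  // clears its latest bit under the lock and hands it to RCU, so readers
+  // still traversing the chain can finish before the memory is reclaimed.
+  //
+  //   old->clear_latest();    // under old's lock
+  //   dbtuple::release(old);  // deferred destruct_and_free() via RCU
+  //   // release_no_rcu() is safe only if no concurrent reader can see old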
+
+  static std::string
+  VersionInfoStr(version_t v);
+
+}
+#if !defined(TUPLE_CHECK_KEY) && \
+    !defined(CHECK_INVARIANTS) && \
+    !defined(TUPLE_LOCK_OWNERSHIP_CHECKING)
+PACKED
+#endif
+;
+
+#endif /* _NDB_TUPLE_H_ */