benchmark silo added
[c11concurrency-benchmarks.git] / silo / btree.h
diff --git a/silo/btree.h b/silo/btree.h
new file mode 100644 (file)
index 0000000..79a53ad
--- /dev/null
@@ -0,0 +1,1794 @@
+#pragma once
+
+#include <assert.h>
+#include <malloc.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <iostream>
+#include <string>
+#include <vector>
+#include <utility>
+#include <atomic>
+#include <thread>
+
+#include "log2.hh"
+#include "ndb_type_traits.h"
+#include "varkey.h"
+#include "counter.h"
+#include "macros.h"
+#include "prefetch.h"
+#include "amd64.h"
+#include "rcu.h"
+#include "util.h"
+#include "small_vector.h"
+#include "ownership_checker.h"
+
+namespace private_ {
+  template <typename T, typename P> struct u64manip;
+  template <typename P>
+  struct u64manip<uint64_t, P> {
+    static inline uint64_t Load(uint64_t t) { return t; }
+    static inline void Store(uint64_t &t, uint64_t v) { t = v; }
+    static inline uint64_t
+    Lock(uint64_t &t)
+    {
+#ifdef CHECK_INVARIANTS
+      INVARIANT(!P::IsLocked(t));
+      t |= P::HDR_LOCKED_MASK;
+      return t;
+#endif
+      return 0;
+    }
+    static inline uint64_t
+    LockWithSpinCount(uint64_t &t, unsigned &spins)
+    {
+      const uint64_t ret = Lock(t);
+      spins = 0;
+      return ret;
+    }
+    static inline void
+    Unlock(uint64_t &t)
+    {
+#ifdef CHECK_INVARIANTS
+      INVARIANT(P::IsLocked(t));
+      t &= ~(P::HDR_LOCKED_MASK | P::HDR_MODIFYING_MASK);
+#endif
+    }
+    static inline uint64_t
+    StableVersion(uint64_t t)
+    {
+      INVARIANT(!P::IsModifying(t));
+      return t;
+    }
+    static inline bool
+    CheckVersion(uint64_t t, uint64_t stablev)
+    {
+      INVARIANT(!P::IsModifying(stablev));
+      INVARIANT((t & ~P::HDR_LOCKED_MASK) == (stablev & ~P::HDR_LOCKED_MASK));
+      return true;
+    }
+  };
+  template <typename P>
+  struct u64manip<std::atomic<uint64_t>, P> {
+    static inline uint64_t
+    Load(const std::atomic<uint64_t> &t)
+    {
+      return t.load(std::memory_order_acquire);
+    }
+    static inline void
+    Store(std::atomic<uint64_t> &t, uint64_t v)
+    {
+      t.store(v, std::memory_order_release);
+    }
+    static inline uint64_t
+    Lock(std::atomic<uint64_t> &t)
+    {
+#ifdef SPINLOCK_BACKOFF
+      uint64_t backoff_shift = 0;
+#endif
+      uint64_t v = Load(t);
+      while ((v & P::HDR_LOCKED_MASK) ||
+             !t.compare_exchange_strong(v, v | P::HDR_LOCKED_MASK)) {
+#ifdef SPINLOCK_BACKOFF
+        if (backoff_shift < 63)
+          backoff_shift++;
+        uint64_t spins = (1UL << backoff_shift) * BACKOFF_SPINS_FACTOR;
+        while (spins) {
+          nop_pause();
+          spins--;
+        }
+#else
+        nop_pause();
+#endif
+        v = Load(t);
+      }
+      COMPILER_MEMORY_FENCE;
+      return v;
+    }
+    static inline uint64_t
+    LockWithSpinCount(std::atomic<uint64_t> &t, unsigned &spins)
+    {
+#ifdef SPINLOCK_BACKOFF
+      uint64_t backoff_shift = 0;
+#endif
+      spins = 0;
+      uint64_t v = Load(t);
+      while ((v & P::HDR_LOCKED_MASK) ||
+             !t.compare_exchange_strong(v, v | P::HDR_LOCKED_MASK)) {
+#ifdef SPINLOCK_BACKOFF
+        if (backoff_shift < 63)
+          backoff_shift++;
+        uint64_t backoff_spins = (1UL << backoff_shift) * BACKOFF_SPINS_FACTOR;
+        while (backoff_spins) {
+          nop_pause();
+          backoff_spins--;
+        }
+#else
+        nop_pause();
+#endif
+        v = Load(t);
+        spins++;
+      }
+      COMPILER_MEMORY_FENCE;
+      return v;
+    }
+    static inline void
+    Unlock(std::atomic<uint64_t> &v)
+    {
+      INVARIANT(P::IsLocked(v));
+      const uint64_t oldh = Load(v);
+      uint64_t h = oldh;
+      bool newv = false;
+      if ((h & P::HDR_MODIFYING_MASK) ||
+          (h & P::HDR_DELETING_MASK)) {
+        newv = true;
+        const uint64_t n = (h & P::HDR_VERSION_MASK) >> P::HDR_VERSION_SHIFT;
+        h &= ~P::HDR_VERSION_MASK;
+        h |= (((n + 1) << P::HDR_VERSION_SHIFT) & P::HDR_VERSION_MASK);
+      }
+      // clear locked + modifying bits
+      h &= ~(P::HDR_LOCKED_MASK | P::HDR_MODIFYING_MASK);
+      if (newv)
+        INVARIANT(!CheckVersion(oldh, h));
+      INVARIANT(!(h & P::HDR_LOCKED_MASK));
+      INVARIANT(!(h & P::HDR_MODIFYING_MASK));
+      COMPILER_MEMORY_FENCE;
+      Store(v, h);
+    }
+    static inline uint64_t
+    StableVersion(const std::atomic<uint64_t> &t)
+    {
+      uint64_t v = Load(t);
+      while ((v & P::HDR_MODIFYING_MASK)) {
+        nop_pause();
+        v = Load(t);
+      }
+      COMPILER_MEMORY_FENCE;
+      return v;
+    }
+    static inline bool
+    CheckVersion(uint64_t t, uint64_t stablev)
+    {
+      INVARIANT(!(stablev & P::HDR_MODIFYING_MASK));
+      COMPILER_MEMORY_FENCE;
+      return (t & ~P::HDR_LOCKED_MASK) ==
+             (stablev & ~P::HDR_LOCKED_MASK);
+    }
+  };
+}
+
+/**
+ * manipulates a btree version
+ *
+ * hdr bits: layout is (actual bytes depend on the NKeysPerNode parameter)
+ *
+ * <-- low bits
+ * [type | key_slots_used | locked | is_root | modifying | deleting | version ]
+ * [0:1  | 1:5            | 5:6    | 6:7     | 7:8       | 8:9      | 9:64    ]
+ *
+ * bit invariants:
+ *   1) modifying => locked
+ *   2) deleting  => locked
+ *
+ * WARNING: the correctness of our concurrency scheme relies on being able
+ * to do a memory reads/writes from/to hdr atomically. x86 architectures
+ * guarantee that aligned writes are atomic (see intel spec)
+ */
+template <typename VersionType, unsigned NKeysPerNode>
+class btree_version_manip {
+
+  typedef
+    typename private_::typeutil<VersionType>::func_param_type
+    LoadVersionType;
+
+  typedef
+    private_::u64manip<
+      VersionType,
+      btree_version_manip<VersionType, NKeysPerNode>>
+    U64Manip;
+
+  static inline constexpr uint64_t
+  LowMask(uint64_t nbits)
+  {
+    return (1UL << nbits) - 1UL;
+  }
+
+public:
+
+  static const uint64_t HDR_TYPE_BITS = 1;
+  static const uint64_t HDR_TYPE_MASK = 0x1;
+
+  static const uint64_t HDR_KEY_SLOTS_SHIFT = HDR_TYPE_BITS;
+  static const uint64_t HDR_KEY_SLOTS_BITS = ceil_log2_const(NKeysPerNode);
+  static const uint64_t HDR_KEY_SLOTS_MASK = LowMask(HDR_KEY_SLOTS_BITS) << HDR_KEY_SLOTS_SHIFT;
+
+  static const uint64_t HDR_LOCKED_SHIFT = HDR_KEY_SLOTS_SHIFT + HDR_KEY_SLOTS_BITS;
+  static const uint64_t HDR_LOCKED_BITS = 1;
+  static const uint64_t HDR_LOCKED_MASK = LowMask(HDR_LOCKED_BITS) << HDR_LOCKED_SHIFT;
+
+  static const uint64_t HDR_IS_ROOT_SHIFT = HDR_LOCKED_SHIFT + HDR_LOCKED_BITS;
+  static const uint64_t HDR_IS_ROOT_BITS = 1;
+  static const uint64_t HDR_IS_ROOT_MASK = LowMask(HDR_IS_ROOT_BITS) << HDR_IS_ROOT_SHIFT;
+
+  static const uint64_t HDR_MODIFYING_SHIFT = HDR_IS_ROOT_SHIFT + HDR_IS_ROOT_BITS;
+  static const uint64_t HDR_MODIFYING_BITS = 1;
+  static const uint64_t HDR_MODIFYING_MASK = LowMask(HDR_MODIFYING_BITS) << HDR_MODIFYING_SHIFT;
+
+  static const uint64_t HDR_DELETING_SHIFT = HDR_MODIFYING_SHIFT + HDR_MODIFYING_BITS;
+  static const uint64_t HDR_DELETING_BITS = 1;
+  static const uint64_t HDR_DELETING_MASK = LowMask(HDR_DELETING_BITS) << HDR_DELETING_SHIFT;
+
+  static const uint64_t HDR_VERSION_SHIFT = HDR_DELETING_SHIFT + HDR_DELETING_BITS;
+  static const uint64_t HDR_VERSION_MASK = ((uint64_t)-1) << HDR_VERSION_SHIFT;
+
+  // sanity checks
+  static_assert(NKeysPerNode >= 1, "XX");
+
+  static_assert(std::numeric_limits<uint64_t>::max() == (
+      HDR_TYPE_MASK |
+      HDR_KEY_SLOTS_MASK |
+      HDR_LOCKED_MASK |
+      HDR_IS_ROOT_MASK |
+      HDR_MODIFYING_MASK |
+      HDR_DELETING_MASK |
+      HDR_VERSION_MASK
+        ), "XX");
+
+  static_assert( !(HDR_TYPE_MASK & HDR_KEY_SLOTS_MASK) , "XX");
+  static_assert( !(HDR_KEY_SLOTS_MASK & HDR_LOCKED_MASK) , "XX");
+  static_assert( !(HDR_LOCKED_MASK & HDR_IS_ROOT_MASK) , "XX");
+  static_assert( !(HDR_IS_ROOT_MASK & HDR_MODIFYING_MASK) , "XX");
+  static_assert( !(HDR_MODIFYING_MASK & HDR_DELETING_MASK) , "XX");
+  static_assert( !(HDR_DELETING_MASK & HDR_VERSION_MASK) , "XX");
+
+  // low level ops
+
+  static inline uint64_t
+  Load(LoadVersionType v)
+  {
+    return U64Manip::Load(v);
+  }
+
+  static inline void
+  Store(VersionType &t, uint64_t v)
+  {
+    U64Manip::Store(t, v);
+  }
+
+  // accessors
+
+  static inline bool
+  IsLeafNode(LoadVersionType v)
+  {
+    return (Load(v) & HDR_TYPE_MASK) == 0;
+  }
+
+  static inline bool
+  IsInternalNode(LoadVersionType v)
+  {
+    return !IsLeafNode(v);
+  }
+
+  static inline size_t
+  KeySlotsUsed(LoadVersionType v)
+  {
+    return (Load(v) & HDR_KEY_SLOTS_MASK) >> HDR_KEY_SLOTS_SHIFT;
+  }
+
+  static inline bool
+  IsLocked(LoadVersionType v)
+  {
+    return (Load(v) & HDR_LOCKED_MASK);
+  }
+
+  static inline bool
+  IsRoot(LoadVersionType v)
+  {
+    return (Load(v) & HDR_IS_ROOT_MASK);
+  }
+
+  static inline bool
+  IsModifying(LoadVersionType v)
+  {
+    return (Load(v) & HDR_MODIFYING_MASK);
+  }
+
+  static inline bool
+  IsDeleting(LoadVersionType v)
+  {
+    return (Load(v) & HDR_DELETING_MASK);
+  }
+
+  static inline uint64_t
+  Version(LoadVersionType v)
+  {
+    return (Load(v) & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
+  }
+
+  static std::string
+  VersionInfoStr(LoadVersionType v)
+  {
+    std::ostringstream buf;
+    buf << "[";
+
+    if (IsLeafNode(v))
+      buf << "LEAF";
+    else
+      buf << "INT";
+    buf << " | ";
+
+    buf << KeySlotsUsed(v) << " | ";
+
+    if (IsLocked(v))
+      buf << "LOCKED";
+    else
+      buf << "-";
+    buf << " | ";
+
+    if (IsRoot(v))
+      buf << "ROOT";
+    else
+      buf << "-";
+    buf << " | ";
+
+    if (IsModifying(v))
+      buf << "MOD";
+    else
+      buf << "-";
+    buf << " | ";
+
+    if (IsDeleting(v))
+      buf << "DEL";
+    else
+      buf << "-";
+    buf << " | ";
+
+    buf << Version(v);
+
+    buf << "]";
+    return buf.str();
+  }
+
+  // mutators
+
+  static inline void
+  SetKeySlotsUsed(VersionType &v, size_t n)
+  {
+    INVARIANT(n <= NKeysPerNode);
+    INVARIANT(IsModifying(v));
+    uint64_t h = Load(v);
+    h &= ~HDR_KEY_SLOTS_MASK;
+    h |= (n << HDR_KEY_SLOTS_SHIFT);
+    Store(v, h);
+  }
+
+  static inline void
+  IncKeySlotsUsed(VersionType &v)
+  {
+    SetKeySlotsUsed(v, KeySlotsUsed(v) + 1);
+  }
+
+  static inline void
+  DecKeySlotsUsed(VersionType &v)
+  {
+    INVARIANT(KeySlotsUsed(v) > 0);
+    SetKeySlotsUsed(v, KeySlotsUsed(v) - 1);
+  }
+
+  static inline void
+  SetRoot(VersionType &v)
+  {
+    INVARIANT(IsLocked(v));
+    INVARIANT(!IsRoot(v));
+    uint64_t h = Load(v);
+    h |= HDR_IS_ROOT_MASK;
+    Store(v, h);
+  }
+
+  static inline void
+  ClearRoot(VersionType &v)
+  {
+    INVARIANT(IsLocked(v));
+    INVARIANT(IsRoot(v));
+    uint64_t h = Load(v);
+    h &= ~HDR_IS_ROOT_MASK;
+    Store(v, h);
+  }
+
+  static inline void
+  MarkModifying(VersionType &v)
+  {
+    INVARIANT(IsLocked(v));
+    INVARIANT(!IsModifying(v));
+    uint64_t h = Load(v);
+    h |= HDR_MODIFYING_MASK;
+    Store(v, h);
+  }
+
+  static inline void
+  MarkDeleting(VersionType &v)
+  {
+    INVARIANT(IsLocked(v));
+    INVARIANT(!IsDeleting(v));
+    uint64_t h = Load(v);
+    h |= HDR_DELETING_MASK;
+    Store(v, h);
+  }
+
+  // concurrency control
+
+  static inline uint64_t
+  StableVersion(LoadVersionType v)
+  {
+    return U64Manip::StableVersion(v);
+  }
+
+  static inline uint64_t
+  UnstableVersion(LoadVersionType v)
+  {
+    return Load(v);
+  }
+
+  static inline bool
+  CheckVersion(LoadVersionType v, uint64_t stablev)
+  {
+    return U64Manip::CheckVersion(Load(v), stablev);
+  }
+
+  static inline uint64_t
+  Lock(VersionType &v)
+  {
+    return U64Manip::Lock(v);
+  }
+
+  static inline uint64_t
+  LockWithSpinCount(VersionType &v, unsigned &spins)
+  {
+    return U64Manip::LockWithSpinCount(v, spins);
+  }
+
+  static inline void
+  Unlock(VersionType &v)
+  {
+    U64Manip::Unlock(v);
+  }
+};
+
+struct base_btree_config {
+  static const unsigned int NKeysPerNode = 15;
+  static const bool RcuRespCaller = true;
+};
+
+struct concurrent_btree_traits : public base_btree_config {
+  typedef std::atomic<uint64_t> VersionType;
+};
+
+struct single_threaded_btree_traits : public base_btree_config {
+  typedef uint64_t VersionType;
+};
+
+/**
+ * A concurrent, variable key length b+-tree, optimized for read heavy
+ * workloads.
+ *
+ * This b+-tree maps uninterpreted binary strings (key_type) of arbitrary
+ * length to a single pointer (value_type). Binary string values are copied
+ * into the b+-tree, so the caller does not have to worry about preserving
+ * memory for key values.
+ *
+ * This b+-tree does not manage the memory pointed to by value_type. The
+ * pointer is treated completely opaquely.
+ *
+ * So far, this b+-tree has only been tested on 64-bit intel x86 processors.
+ * It's correctness, as it is implemented (not conceptually), requires the
+ * semantics of total store order (TSO) for correctness. To fix this, we would
+ * change compiler fences into actual memory fences, at the very least.
+ */
+template <typename P>
+class btree {
+  template <template <typename> class, typename>
+    friend class base_txn_btree;
+public:
+  typedef varkey key_type;
+  typedef std::string string_type;
+  typedef uint64_t key_slice;
+  typedef uint8_t* value_type;
+  typedef typename std::conditional<!P::RcuRespCaller,
+      scoped_rcu_region,
+      disabled_rcu_region>::type rcu_region;
+
+  // public to assist in testing
+  static const unsigned int NKeysPerNode    = P::NKeysPerNode;
+  static const unsigned int NMinKeysPerNode = P::NKeysPerNode / 2;
+
+private:
+
+  typedef std::pair<ssize_t, size_t> key_search_ret;
+
+  typedef
+    btree_version_manip<typename P::VersionType, NKeysPerNode>
+    VersionManip;
+  typedef
+    btree_version_manip<uint64_t, NKeysPerNode>
+    RawVersionManip;
+
+  struct node {
+
+    typename P::VersionType hdr_;
+
+#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
+    std::thread::id lock_owner_;
+#endif /* BTREE_LOCK_OWNERSHIP_CHECKING */
+
+    /**
+     * Keys are assumed to be stored in contiguous sorted order, so that all
+     * the used slots are grouped together. That is, elems in positions
+     * [0, key_slots_used) are valid, and elems in positions
+     * [key_slots_used, NKeysPerNode) are empty
+     */
+    key_slice keys_[NKeysPerNode];
+
+    node() :
+      hdr_()
+#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
+      , lock_owner_()
+#endif
+    {}
+    ~node()
+    {
+      INVARIANT(!is_locked());
+      INVARIANT(is_deleting());
+    }
+
+    inline bool
+    is_leaf_node() const
+    {
+      return VersionManip::IsLeafNode(hdr_);
+    }
+
+    inline bool
+    is_internal_node() const
+    {
+      return !is_leaf_node();
+    }
+
+    inline size_t
+    key_slots_used() const
+    {
+      return VersionManip::KeySlotsUsed(hdr_);
+    }
+
+    inline void
+    set_key_slots_used(size_t n)
+    {
+      VersionManip::SetKeySlotsUsed(hdr_, n);
+    }
+
+    inline void
+    inc_key_slots_used()
+    {
+      VersionManip::IncKeySlotsUsed(hdr_);
+    }
+
+    inline void
+    dec_key_slots_used()
+    {
+      VersionManip::DecKeySlotsUsed(hdr_);
+    }
+
+    inline bool
+    is_locked() const
+    {
+      return VersionManip::IsLocked(hdr_);
+    }
+
+#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
+    inline bool
+    is_lock_owner() const
+    {
+      return std::this_thread::get_id() == lock_owner_;
+    }
+#else
+    inline bool
+    is_lock_owner() const
+    {
+      return true;
+    }
+#endif /* BTREE_LOCK_OWNERSHIP_CHECKING */
+
+    inline uint64_t
+    lock()
+    {
+#ifdef ENABLE_EVENT_COUNTERS
+      static event_avg_counter
+        evt_avg_btree_leaf_node_lock_acquire_spins(
+            util::cxx_typename<btree<P>>::value() +
+            std::string("_avg_btree_leaf_node_lock_acquire_spins"));
+      static event_avg_counter
+        evt_avg_btree_internal_node_lock_acquire_spins(
+            util::cxx_typename<btree<P>>::value() +
+            std::string("_avg_btree_internal_node_lock_acquire_spins"));
+      unsigned spins;
+      const uint64_t ret = VersionManip::LockWithSpinCount(hdr_, spins);
+      if (is_leaf_node())
+        evt_avg_btree_leaf_node_lock_acquire_spins.offer(spins);
+      else
+        evt_avg_btree_internal_node_lock_acquire_spins.offer(spins);
+#else
+      const uint64_t ret = VersionManip::Lock(hdr_);
+#endif
+#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
+      lock_owner_ = std::this_thread::get_id();
+      AddNodeToLockRegion(this);
+      INVARIANT(is_lock_owner());
+#endif
+      return ret;
+    }
+
+    inline void
+    unlock()
+    {
+#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
+      lock_owner_ = std::thread::id();
+      INVARIANT(!is_lock_owner());
+#endif
+      VersionManip::Unlock(hdr_);
+    }
+
+    inline bool
+    is_root() const
+    {
+      return VersionManip::IsRoot(hdr_);
+    }
+
+    inline void
+    set_root()
+    {
+      VersionManip::SetRoot(hdr_);
+    }
+
+    inline void
+    clear_root()
+    {
+      VersionManip::ClearRoot(hdr_);
+    }
+
+    inline bool
+    is_modifying() const
+    {
+      return VersionManip::IsModifying(hdr_);
+    }
+
+    inline void
+    mark_modifying()
+    {
+      VersionManip::MarkModifying(hdr_);
+    }
+
+    inline bool
+    is_deleting() const
+    {
+      return VersionManip::IsDeleting(hdr_);
+    }
+
+    inline void
+    mark_deleting()
+    {
+      VersionManip::MarkDeleting(hdr_);
+    }
+
+    inline uint64_t
+    unstable_version() const
+    {
+      return VersionManip::UnstableVersion(hdr_);
+    }
+
+    /**
+     * spin until we get a version which is not modifying (but can be locked)
+     */
+    inline uint64_t
+    stable_version() const
+    {
+      return VersionManip::StableVersion(hdr_);
+    }
+
+    inline bool
+    check_version(uint64_t version) const
+    {
+      return VersionManip::CheckVersion(hdr_, version);
+    }
+
+    inline std::string
+    version_info_str() const
+    {
+      return VersionManip::VersionInfoStr(hdr_);
+    }
+
+    void base_invariant_unique_keys_check() const;
+
+    // [min_key, max_key)
+    void
+    base_invariant_checker(const key_slice *min_key,
+                           const key_slice *max_key,
+                           bool is_root) const;
+
+    /** manually simulated virtual function (so we don't make node virtual) */
+    void
+    invariant_checker(const key_slice *min_key,
+                      const key_slice *max_key,
+                      const node *left_sibling,
+                      const node *right_sibling,
+                      bool is_root) const;
+
+    /** another manually simulated virtual function */
+    inline void prefetch() const;
+  };
+
+  struct leaf_node : public node {
+    union value_or_node_ptr {
+      value_type v_;
+      node *n_;
+    };
+
+    key_slice min_key_; // really is min_key's key slice
+    value_or_node_ptr values_[NKeysPerNode];
+
+    // format is:
+    // [ slice_length | type | unused ]
+    // [    0:4       |  4:5 |  5:8   ]
+    uint8_t lengths_[NKeysPerNode];
+
+    leaf_node *prev_;
+    leaf_node *next_;
+
+    // starts out empty- once set, doesn't get freed until dtor (even if all
+    // keys w/ suffixes get removed)
+    imstring *suffixes_;
+
+    inline ALWAYS_INLINE varkey
+    suffix(size_t i) const
+    {
+      return suffixes_ ? varkey(suffixes_[i]) : varkey();
+    }
+
+    //static event_counter g_evt_suffixes_array_created;
+
+    inline void
+    alloc_suffixes()
+    {
+      INVARIANT(this->is_modifying());
+      INVARIANT(!suffixes_);
+      suffixes_ = new imstring[NKeysPerNode];
+      //++g_evt_suffixes_array_created;
+    }
+
+    inline void
+    ensure_suffixes()
+    {
+      INVARIANT(this->is_modifying());
+      if (!suffixes_)
+        alloc_suffixes();
+    }
+
+    leaf_node();
+    ~leaf_node();
+
+    static const uint64_t LEN_LEN_MASK = 0xf;
+
+    static const uint64_t LEN_TYPE_SHIFT = 4;
+    static const uint64_t LEN_TYPE_MASK = 0x1 << LEN_TYPE_SHIFT;
+
+    inline void
+    prefetch() const
+    {
+#ifdef BTREE_NODE_PREFETCH
+      prefetch_object(this);
+#endif
+    }
+
+    inline size_t
+    keyslice_length(size_t n) const
+    {
+      INVARIANT(n < NKeysPerNode);
+      return lengths_[n] & LEN_LEN_MASK;
+    }
+
+    inline void
+    keyslice_set_length(size_t n, size_t len, bool layer)
+    {
+      INVARIANT(n < NKeysPerNode);
+      INVARIANT(this->is_modifying());
+      INVARIANT(len <= 9);
+      INVARIANT(!layer || len == 9);
+      lengths_[n] = (len | (layer ? LEN_TYPE_MASK : 0));
+    }
+
+    inline bool
+    value_is_layer(size_t n) const
+    {
+      INVARIANT(n < NKeysPerNode);
+      return lengths_[n] & LEN_TYPE_MASK;
+    }
+
+    inline void
+    value_set_layer(size_t n)
+    {
+      INVARIANT(n < NKeysPerNode);
+      INVARIANT(this->is_modifying());
+      INVARIANT(keyslice_length(n) == 9);
+      INVARIANT(!value_is_layer(n));
+      lengths_[n] |= LEN_TYPE_MASK;
+    }
+
+    /**
+     * keys[key_search(k).first] == k if key_search(k).first != -1
+     * key does not exist otherwise. considers key length also
+     */
+    inline key_search_ret
+    key_search(key_slice k, size_t len) const
+    {
+      size_t n = this->key_slots_used();
+      ssize_t lower = 0;
+      ssize_t upper = n;
+      while (lower < upper) {
+        ssize_t i = (lower + upper) / 2;
+        key_slice k0 = this->keys_[i];
+        size_t len0 = this->keyslice_length(i);
+        if (k0 < k || (k0 == k && len0 < len))
+          lower = i + 1;
+        else if (k0 == k && len0 == len)
+          return key_search_ret(i, n);
+        else
+          upper = i;
+      }
+      return key_search_ret(-1, n);
+    }
+
+    /**
+     * tightest lower bound key, -1 if no such key exists. operates only
+     * on key slices (internal nodes have unique key slices)
+     */
+    inline key_search_ret
+    key_lower_bound_search(key_slice k, size_t len) const
+    {
+      ssize_t ret = -1;
+      size_t n = this->key_slots_used();
+      ssize_t lower = 0;
+      ssize_t upper = n;
+      while (lower < upper) {
+        ssize_t i = (lower + upper) / 2;
+        key_slice k0 = this->keys_[i];
+        size_t len0 = this->keyslice_length(i);
+        if (k0 < k || (k0 == k && len0 < len)) {
+          ret = i;
+          lower = i + 1;
+        } else if (k0 == k && len0 == len) {
+          return key_search_ret(i, n);
+        } else {
+          upper = i;
+        }
+      }
+      return key_search_ret(ret, n);
+    }
+
+    void
+    invariant_checker_impl(const key_slice *min_key,
+                           const key_slice *max_key,
+                           const node *left_sibling,
+                           const node *right_sibling,
+                           bool is_root) const;
+
+    static inline leaf_node*
+    alloc()
+    {
+      void * const p = rcu::s_instance.alloc(LeafNodeAllocSize);
+      INVARIANT(p);
+      return new (p) leaf_node;
+    }
+
+    static void
+    deleter(void *p)
+    {
+      leaf_node *n = (leaf_node *) p;
+      INVARIANT(n->is_deleting());
+      INVARIANT(!n->is_locked());
+      n->~leaf_node();
+      rcu::s_instance.dealloc(p, LeafNodeAllocSize);
+    }
+
+    static inline void
+    release(leaf_node *n)
+    {
+      if (unlikely(!n))
+        return;
+      n->mark_deleting();
+      rcu::s_instance.free_with_fn(n, deleter);
+    }
+
+  };
+
+  struct internal_node : public node {
+    /**
+     * child at position child_idx is responsible for keys
+     * [keys[child_idx - 1], keys[child_idx])
+     *
+     * in the case where child_idx == 0 or child_idx == this->key_slots_used(), then
+     * the responsiblity value of the min/max key, respectively, is determined
+     * by the parent
+     */
+    node *children_[NKeysPerNode + 1];
+
+    internal_node();
+    ~internal_node();
+
+    inline void
+    prefetch() const
+    {
+#ifdef BTREE_NODE_PREFETCH
+      prefetch_object(this);
+#endif
+    }
+
+    /**
+     * keys[key_search(k).first] == k if key_search(k).first != -1
+     * key does not exist otherwise. operates ony on key slices
+     * (internal nodes have unique key slices)
+     */
+    inline key_search_ret
+    key_search(key_slice k) const
+    {
+      size_t n = this->key_slots_used();
+      ssize_t lower = 0;
+      ssize_t upper = n;
+      while (lower < upper) {
+        ssize_t i = (lower + upper) / 2;
+        key_slice k0 = this->keys_[i];
+        if (k0 == k)
+          return key_search_ret(i, n);
+        else if (k0 > k)
+          upper = i;
+        else
+          lower = i + 1;
+      }
+      return key_search_ret(-1, n);
+    }
+
+    /**
+     * tightest lower bound key, -1 if no such key exists. operates only
+     * on key slices (internal nodes have unique key slices)
+     */
+    inline key_search_ret
+    key_lower_bound_search(key_slice k) const
+    {
+      ssize_t ret = -1;
+      size_t n = this->key_slots_used();
+      ssize_t lower = 0;
+      ssize_t upper = n;
+      while (lower < upper) {
+        ssize_t i = (lower + upper) / 2;
+        key_slice k0 = this->keys_[i];
+        if (k0 == k)
+          return key_search_ret(i, n);
+        else if (k0 > k)
+          upper = i;
+        else {
+          ret = i;
+          lower = i + 1;
+        }
+      }
+      return key_search_ret(ret, n);
+    }
+
+    void
+    invariant_checker_impl(const key_slice *min_key,
+                           const key_slice *max_key,
+                           const node *left_sibling,
+                           const node *right_sibling,
+                           bool is_root) const;
+
+    // XXX: alloc(), deleter(), and release() are copied from leaf_node-
+    // we should templatize them to avoid code duplication
+
+    static inline internal_node*
+    alloc()
+    {
+      void * const p = rcu::s_instance.alloc(InternalNodeAllocSize);
+      INVARIANT(p);
+      return new (p) internal_node;
+    }
+
+    static void
+    deleter(void *p)
+    {
+      internal_node *n = (internal_node *) p;
+      INVARIANT(n->is_deleting());
+      INVARIANT(!n->is_locked());
+      n->~internal_node();
+      rcu::s_instance.dealloc(p, InternalNodeAllocSize);
+    }
+
+    static inline void
+    release(internal_node *n)
+    {
+      if (unlikely(!n))
+        return;
+      n->mark_deleting();
+      rcu::s_instance.free_with_fn(n, deleter);
+    }
+
+  } PACKED;
+
+#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
+public:
+  static inline void
+  NodeLockRegionBegin()
+  {
+    ownership_checker<btree<P>, typename btree<P>::node>::NodeLockRegionBegin();
+  }
+  static inline void
+  AssertAllNodeLocksReleased()
+  {
+    ownership_checker<btree<P>, typename btree<P>::node>::AssertAllNodeLocksReleased();
+  }
+private:
+  static inline void
+  AddNodeToLockRegion(const node *n)
+  {
+    ownership_checker<btree<P>, typename btree<P>::node>::AddNodeToLockRegion(n);
+  }
+#endif
+
+#ifdef BTREE_NODE_ALLOC_CACHE_ALIGNED
+  static const size_t LeafNodeAllocSize = util::round_up<size_t, LG_CACHELINE_SIZE>(sizeof(leaf_node));
+  static const size_t InternalNodeAllocSize = util::round_up<size_t, LG_CACHELINE_SIZE>(sizeof(internal_node));
+#else
+  static const size_t LeafNodeAllocSize = sizeof(leaf_node);
+  static const size_t InternalNodeAllocSize = sizeof(internal_node);
+#endif
+
+  static inline leaf_node*
+  AsLeaf(node *n)
+  {
+    INVARIANT(!n || n->is_leaf_node());
+    return static_cast<leaf_node *>(n);
+  }
+
+  static inline const leaf_node*
+  AsLeaf(const node *n)
+  {
+    return AsLeaf(const_cast<node *>(n));
+  }
+
+  static inline internal_node*
+  AsInternal(node *n)
+  {
+    INVARIANT(!n || n->is_internal_node());
+    return static_cast<internal_node *>(n);
+  }
+
+  static inline const internal_node*
+  AsInternal(const node *n)
+  {
+    return AsInternal(const_cast<node *>(n));
+  }
+
+  static inline leaf_node *
+  AsLeafCheck(node *n, uint64_t v)
+  {
+    return likely(n) && RawVersionManip::IsLeafNode(v) ?
+      static_cast<leaf_node *>(n) : NULL;
+  }
+
+  static inline leaf_node*
+  AsLeafCheck(node *n)
+  {
+    return likely(n) && n->is_leaf_node() ?
+      static_cast<leaf_node *>(n) : NULL;
+  }
+
+  static inline const leaf_node*
+  AsLeafCheck(const node *n, uint64_t v)
+  {
+    return AsLeafCheck(const_cast<node *>(n), v);
+  }
+
+  static inline const leaf_node*
+  AsLeafCheck(const node *n)
+  {
+    return AsLeafCheck(const_cast<node *>(n));
+  }
+
+  static inline internal_node*
+  AsInternalCheck(node *n)
+  {
+    return likely(n) && n->is_internal_node() ? static_cast<internal_node *>(n) : NULL;
+  }
+
+  static inline const internal_node*
+  AsInternalCheck(const node *n)
+  {
+    return AsInternalCheck(const_cast<node *>(n));
+  }
+
+  static inline void
+  UnlockNodes(typename util::vec<node *>::type &locked_nodes)
+  {
+    for (auto it = locked_nodes.begin(); it != locked_nodes.end(); ++it)
+      (*it)->unlock();
+    locked_nodes.clear();
+  }
+
+  template <typename T>
+  static inline T
+  UnlockAndReturn(typename util::vec<node *>::type &locked_nodes, T t)
+  {
+    UnlockNodes(locked_nodes);
+    return t;
+  }
+
+  static bool
+  CheckVersion(uint64_t a, uint64_t b)
+  {
+    return VersionManip::CheckVersion(a, b);
+  }
+
+  /**
+   * Is not thread safe, and does not use RCU to free memory
+   *
+   * Should only be called when there are no outstanding operations on
+   * any nodes reachable from n
+   */
+  static void recursive_delete(node *n);
+
+  node *volatile root_;
+
+public:
+
+  // XXX(stephentu): trying out a very opaque node API for now
+  typedef struct node node_opaque_t;
+  typedef std::pair< const node_opaque_t *, uint64_t > versioned_node_t;
+  struct insert_info_t {
+    const node_opaque_t* node;
+    uint64_t old_version;
+    uint64_t new_version;
+  };
+
+  btree() : root_(leaf_node::alloc())
+  {
+    static_assert(
+        NKeysPerNode > (sizeof(key_slice) + 2), "XX"); // so we can always do a split
+    static_assert(
+        NKeysPerNode <=
+        (VersionManip::HDR_KEY_SLOTS_MASK >> VersionManip::HDR_KEY_SLOTS_SHIFT), "XX");
+
+#ifdef CHECK_INVARIANTS
+    root_->lock();
+    root_->set_root();
+    root_->unlock();
+#else
+    root_->set_root();
+#endif /* CHECK_INVARIANTS */
+  }
+
+  ~btree()
+  {
+    // NOTE: it is assumed on deletion time there are no
+    // outstanding requests to the btree, so deletion proceeds
+    // in a non-threadsafe manner
+    recursive_delete(root_);
+    root_ = NULL;
+  }
+
+  /**
+   * NOT THREAD SAFE
+   */
+  inline void
+  clear()
+  {
+    recursive_delete(root_);
+    root_ = leaf_node::alloc();
+#ifdef CHECK_INVARIANTS
+    root_->lock();
+    root_->set_root();
+    root_->unlock();
+#else
+    root_->set_root();
+#endif /* CHECK_INVARIANTS */
+  }
+
+  /** Note: invariant checking is not thread safe */
+  inline void
+  invariant_checker() const
+  {
+    root_->invariant_checker(NULL, NULL, NULL, NULL, true);
+  }
+
+          /** NOTE: the public interface assumes that the caller has taken care
+           * of setting up RCU */
+
+  inline bool
+  search(const key_type &k, value_type &v,
+         versioned_node_t *search_info = nullptr) const
+  {
+    rcu_region guard;
+    typename util::vec<leaf_node *>::type ns;
+    return search_impl(k, v, ns, search_info);
+  }
+
+  /**
+   * The low level callback interface is as follows:
+   *
+   * Consider a scan in the range [a, b):
+   *   1) on_resp_node() is called at least once per node which
+   *      has a responibility range that overlaps with the scan range
+   *   2) invoke() is called per <k, v>-pair such that k is in [a, b)
+   *
+   * The order of calling on_resp_node() and invoke() is up to the implementation.
+   */
+  class low_level_search_range_callback {
+  public:
+    virtual ~low_level_search_range_callback() {}
+
+    /**
+     * This node lies within the search range (at version v)
+     */
+    virtual void on_resp_node(const node_opaque_t *n, uint64_t version) = 0;
+
+    /**
+     * This key/value pair was read from node n @ version
+     */
+    virtual bool invoke(const string_type &k, value_type v,
+                        const node_opaque_t *n, uint64_t version) = 0;
+  };
+
+  /**
+   * A higher level interface if you don't care about node and version numbers
+   */
+  class search_range_callback : public low_level_search_range_callback {
+  public:
+    virtual void
+    on_resp_node(const node_opaque_t *n, uint64_t version)
+    {
+    }
+
+    virtual bool
+    invoke(const string_type &k, value_type v,
+           const node_opaque_t *n, uint64_t version)
+    {
+      return invoke(k, v);
+    }
+
+    virtual bool invoke(const string_type &k, value_type v) = 0;
+  };
+
+private:
+  template <typename T>
+  class type_callback_wrapper : public search_range_callback {
+  public:
+    type_callback_wrapper(T *callback) : callback_(callback) {}
+    virtual bool
+    invoke(const string_type &k, value_type v)
+    {
+      return callback_->operator()(k, v);
+    }
+  private:
+    T *const callback_;
+  };
+
+  struct leaf_kvinfo {
+    key_slice key_; // in host endian
+    key_slice key_big_endian_;
+    typename leaf_node::value_or_node_ptr vn_;
+    bool layer_;
+    size_t length_;
+    varkey suffix_;
+    leaf_kvinfo() {} // for STL
+    leaf_kvinfo(key_slice key,
+                typename leaf_node::value_or_node_ptr vn,
+                bool layer,
+                size_t length,
+                const varkey &suffix)
+      : key_(key), key_big_endian_(util::big_endian_trfm<key_slice>()(key)),
+        vn_(vn), layer_(layer), length_(length), suffix_(suffix)
+    {}
+
+    inline const char *
+    keyslice() const
+    {
+      return (const char *) &key_big_endian_;
+    }
+  };
+
+  bool search_range_at_layer(leaf_node *leaf,
+                             string_type &prefix,
+                             const key_type &lower,
+                             bool inc_lower,
+                             const key_type *upper,
+                             low_level_search_range_callback &callback) const;
+
+public:
+
+  /**
+   * For all keys in [lower, *upper), invoke callback in ascending order.
+   * If upper is NULL, then there is no upper bound
+   *
+
+   * This function by default provides a weakly consistent view of the b-tree. For
+   * instance, consider the following tree, where n = 3 is the max number of
+   * keys in a node:
+   *
+   *              [D|G]
+   *             /  |  \
+   *            /   |   \
+   *           /    |    \
+   *          /     |     \
+   *   [A|B|C]<->[D|E|F]<->[G|H|I]
+   *
+   * Suppose we want to scan [A, inf), so we traverse to the leftmost leaf node
+   * and start a left-to-right walk. Suppose we have emitted keys A, B, and C,
+   * and we are now just about to scan the middle leaf node.  Now suppose
+   * another thread concurrently does delete(A), followed by a delete(H).  Now
+   * the scaning thread resumes and emits keys D, E, F, G, and I, omitting H
+   * because H was deleted. This is an inconsistent view of the b-tree, since
+   * the scanning thread has observed the deletion of H but did not observe the
+   * deletion of A, but we know that delete(A) happens before delete(H).
+   *
+   * The weakly consistent guarantee provided is the following: all keys
+   * which, at the time of invocation, are known to exist in the btree
+   * will be discovered on a scan (provided the key falls within the scan's range),
+   * and provided there are no concurrent modifications/removals of that key
+   *
+   * Note that scans within a single node are consistent
+   *
+   * XXX: add other modes which provide better consistency:
+   * A) locking mode
+   * B) optimistic validation mode
+   *
+   * the last string parameter is an optional string buffer to use:
+   * if null, a stack allocated string will be used. if not null, must
+   * ensure:
+   *   A) buf->empty() at the beginning
+   *   B) no concurrent mutation of string
+   * note that string contents upon return are arbitrary
+   */
+  void
+  search_range_call(const key_type &lower,
+                    const key_type *upper,
+                    low_level_search_range_callback &callback,
+                    string_type *buf = nullptr) const;
+
+  // (lower, upper]
+  void
+  rsearch_range_call(const key_type &upper,
+                     const key_type *lower,
+                     low_level_search_range_callback &callback,
+                     std::string *buf = nullptr) const
+  {
+    NDB_UNIMPLEMENTED("rsearch_range_call");
+  }
+
+  /**
+   * Callback is expected to implement bool operator()(key_slice k, value_type v),
+   * where the callback returns true if it wants to keep going, false otherwise
+   *
+   */
+  template <typename T>
+  inline void
+  search_range(const key_type &lower,
+               const key_type *upper,
+               T &callback,
+               string_type *buf = nullptr) const
+  {
+    type_callback_wrapper<T> w(&callback);
+    search_range_call(lower, upper, w, buf);
+  }
+
+  template <typename F>
+  inline void
+  rsearch_range(const key_type &upper,
+                const key_type *lower,
+                F& callback,
+                std::string *buf = nullptr) const
+  {
+    NDB_UNIMPLEMENTED("rsearch_range");
+  }
+
+  /**
+   * returns true if key k did not already exist, false otherwise
+   * If k exists with a different mapping, still returns false
+   *
+   * If false and old_v is not NULL, then the overwritten value of v
+   * is written into old_v
+   */
+  inline bool
+  insert(const key_type &k, value_type v,
+         value_type *old_v = NULL,
+         insert_info_t *insert_info = NULL)
+  {
+    rcu_region guard;
+    return insert_stable_location((node **) &root_, k, v, false, old_v, insert_info);
+  }
+
+  /**
+   * Only puts k=>v if k does not exist in map. returns true
+   * if k inserted, false otherwise (k exists already)
+   */
+  inline bool
+  insert_if_absent(const key_type &k, value_type v,
+                   insert_info_t *insert_info = NULL)
+  {
+    rcu_region guard;
+    return insert_stable_location((node **) &root_, k, v, true, NULL, insert_info);
+  }
+
+  /**
+   * return true if a value was removed, false otherwise.
+   *
+   * if true and old_v is not NULL, then the removed value of v
+   * is written into old_v
+   */
+  inline bool
+  remove(const key_type &k, value_type *old_v = NULL)
+  {
+    rcu_region guard;
+    return remove_stable_location((node **) &root_, k, old_v);
+  }
+
+private:
+  bool
+  insert_stable_location(node **root_location, const key_type &k, value_type v,
+                         bool only_if_absent, value_type *old_v,
+                         insert_info_t *insert_info);
+
+  bool
+  remove_stable_location(node **root_location, const key_type &k, value_type *old_v);
+
+public:
+
+  /**
+   * The tree walk API is a bit strange, due to the optimistic nature of the
+   * btree.
+   *
+   * The way it works is that, on_node_begin() is first called. In
+   * on_node_begin(), a callback function should read (but not modify) the
+   * values it is interested in, and save them.
+   *
+   * Then, either one of on_node_success() or on_node_failure() is called. If
+   * on_node_success() is called, then the previous values read in
+   * on_node_begin() are indeed valid.  If on_node_failure() is called, then
+   * the previous values are not valid and should be discarded.
+   */
+  class tree_walk_callback {
+  public:
+    virtual ~tree_walk_callback() {}
+    virtual void on_node_begin(const node_opaque_t *n) = 0;
+    virtual void on_node_success() = 0;
+    virtual void on_node_failure() = 0;
+  };
+
+  void tree_walk(tree_walk_callback &callback) const;
+
+private:
+  class size_walk_callback : public tree_walk_callback {
+  public:
+    size_walk_callback() : spec_size_(0), size_(0) {}
+    virtual void on_node_begin(const node_opaque_t *n);
+    virtual void on_node_success();
+    virtual void on_node_failure();
+    inline size_t get_size() const { return size_; }
+  private:
+    size_t spec_size_;
+    size_t size_;
+  };
+
+public:
+  /**
+   * Is thread-safe, but not really designed to perform well with concurrent
+   * modifications. also the value returned is not consistent given concurrent
+   * modifications
+   */
+  inline size_t
+  size() const
+  {
+    size_walk_callback c;
+    tree_walk(c);
+    return c.get_size();
+  }
+
+  static inline uint64_t
+  ExtractVersionNumber(const node_opaque_t *n)
+  {
+    // XXX(stephentu): I think we must use stable_version() for
+    // correctness, but I am not 100% sure. It's definitely correct to use it,
+    // but maybe we can get away with unstable_version()?
+    return RawVersionManip::Version(n->stable_version());
+  }
+
+  // [value, has_suffix]
+  static std::vector< std::pair<value_type, bool> >
+  ExtractValues(const node_opaque_t *n);
+
+  void print() {
+  }
+
+  /**
+   * Not well defined if n is being concurrently modified, just for debugging
+   */
+  static std::string
+  NodeStringify(const node_opaque_t *n);
+
+  static inline size_t
+  InternalNodeSize()
+  {
+    return sizeof(internal_node);
+  }
+
+  static inline size_t
+  LeafNodeSize()
+  {
+    return sizeof(leaf_node);
+  }
+
+private:
+
+  /**
+   * Move the array slice from [p, n) to the right by k position, occupying [p + k, n + k),
+   * leaving the values of array[p] to array[p + k -1] undefined. Has no effect if p >= n
+   *
+   * Note: Assumes that array[n] is valid memory.
+   */
+  template <typename T>
+  static inline ALWAYS_INLINE void
+  sift_right(T *array, size_t p, size_t n, size_t k = 1)
+  {
+    if (k == 0)
+      return;
+    for (size_t i = n + k - 1; i > p + k - 1; i--)
+      array[i] = array[i - k];
+  }
+
+  // use swap() to do moves, for efficiency- avoid this
+  // variant when using primitive arrays
+  template <typename T>
+  static inline ALWAYS_INLINE void
+  sift_swap_right(T *array, size_t p, size_t n, size_t k = 1)
+  {
+    if (k == 0)
+      return;
+    for (size_t i = n + k - 1; i > p + k - 1; i--)
+      array[i].swap(array[i - k]);
+  }
+
+  /**
+   * Move the array slice from [p + k, n) to the left by k positions, occupying [p, n - k),
+   * overwriting array[p]..array[p+k-1] Has no effect if p + k >= n
+   */
+  template <typename T>
+  static inline ALWAYS_INLINE void
+  sift_left(T *array, size_t p, size_t n, size_t k = 1)
+  {
+    if (unlikely(p + k >= n))
+      return;
+    for (size_t i = p; i < n - k; i++)
+      array[i] = array[i + k];
+  }
+
+  template <typename T>
+  static inline ALWAYS_INLINE void
+  sift_swap_left(T *array, size_t p, size_t n, size_t k = 1)
+  {
+    if (unlikely(p + k >= n))
+      return;
+    for (size_t i = p; i < n - k; i++)
+      array[i].swap(array[i + k]);
+  }
+
+  /**
+   * Copy [p, n) from source into dest. Has no effect if p >= n
+   */
+  template <typename T>
+  static inline ALWAYS_INLINE void
+  copy_into(T *dest, T *source, size_t p, size_t n)
+  {
+    for (size_t i = p; i < n; i++)
+      *dest++ = source[i];
+  }
+
+  template <typename T>
+  static inline ALWAYS_INLINE void
+  swap_with(T *dest, T *source, size_t p, size_t n)
+  {
+    for (size_t i = p; i < n; i++)
+      (dest++)->swap(source[i]);
+  }
+
+  leaf_node *leftmost_descend_layer(node *n) const;
+
+  /**
+   * Assumes RCU region scope is held
+   */
+  bool search_impl(const key_type &k, value_type &v,
+                   typename util::vec<leaf_node *>::type &leaf_nodes,
+                   versioned_node_t *search_info = nullptr) const;
+
+  static leaf_node *
+  FindRespLeafNode(
+      leaf_node *leaf, uint64_t kslice, uint64_t &version);
+
+  /**
+   * traverses the lower leaf levels for a leaf node resp for kslice such that
+   * version is stable and not deleting. resp info is given via idxmatch +
+   * idxlowerbound
+   *
+   * if idxmatch != -1, then ignore idxlowerbound
+   *
+   * note to actually use the info, you still need to validate it (the info is
+   * tentative as of the version)
+   */
+  static leaf_node *
+  FindRespLeafLowerBound(
+      leaf_node *leaf, uint64_t kslice,
+      size_t kslicelen, uint64_t &version,
+      size_t &n, ssize_t &idxmatch, ssize_t &idxlowerbound);
+
+  static leaf_node *
+  FindRespLeafExact(
+      leaf_node *leaf, uint64_t kslice,
+      size_t kslicelen, uint64_t &version,
+      size_t &n, ssize_t &idxmatch);
+
+  typedef std::pair<node *, uint64_t> insert_parent_entry;
+
+  enum insert_status {
+    I_NONE_NOMOD, // no nodes split nor modified
+    I_NONE_MOD, // no nodes split, but modified
+    I_RETRY,
+    I_SPLIT, // node(s) split
+  };
+
+  /**
+   * insert k=>v into node n. if this insert into n causes it to split into two
+   * nodes, return the new node (upper half of keys). in this case, min_key is set to the
+   * smallest key that the new node is responsible for. otherwise return null, in which
+   * case min_key's value is not defined.
+   *
+   * NOTE: our implementation of insert0() is not as efficient as possible, in favor
+   * of code clarity
+   */
+  insert_status
+  insert0(node *n,
+          const key_type &k,
+          value_type v,
+          bool only_if_absent,
+          value_type *old_v,
+          insert_info_t *insert_info,
+          key_slice &min_key,
+          node *&new_node,
+          typename util::vec<insert_parent_entry>::type &parents,
+          typename util::vec<node *>::type &locked_nodes);
+
+  enum remove_status {
+    R_NONE_NOMOD,
+    R_NONE_MOD,
+    R_RETRY,
+    R_STOLE_FROM_LEFT,
+    R_STOLE_FROM_RIGHT,
+    R_MERGE_WITH_LEFT,
+    R_MERGE_WITH_RIGHT,
+    R_REPLACE_NODE,
+  };
+
+  inline ALWAYS_INLINE void
+  remove_pos_from_leaf_node(leaf_node *leaf, size_t pos, size_t n)
+  {
+    INVARIANT(leaf->key_slots_used() == n);
+    INVARIANT(pos < n);
+    if (leaf->value_is_layer(pos)) {
+#ifdef CHECK_INVARIANTS
+      leaf->values_[pos].n_->lock();
+#endif
+      leaf->values_[pos].n_->mark_deleting();
+      INVARIANT(leaf->values_[pos].n_->is_leaf_node());
+      INVARIANT(leaf->values_[pos].n_->key_slots_used() == 0);
+      leaf_node::release((leaf_node *) leaf->values_[pos].n_);
+#ifdef CHECK_INVARIANTS
+      leaf->values_[pos].n_->unlock();
+#endif
+    }
+    sift_left(leaf->keys_, pos, n);
+    sift_left(leaf->values_, pos, n);
+    sift_left(leaf->lengths_, pos, n);
+    if (leaf->suffixes_)
+      sift_swap_left(leaf->suffixes_, pos, n);
+    leaf->dec_key_slots_used();
+  }
+
+  inline ALWAYS_INLINE void
+  remove_pos_from_internal_node(
+      internal_node *internal, size_t key_pos, size_t child_pos, size_t n)
+  {
+    INVARIANT(internal->key_slots_used() == n);
+    INVARIANT(key_pos < n);
+    INVARIANT(child_pos < n + 1);
+    sift_left(internal->keys_, key_pos, n);
+    sift_left(internal->children_, child_pos, n + 1);
+    internal->dec_key_slots_used();
+  }
+
+  struct remove_parent_entry {
+    // non-const members for STL
+    node *parent_;
+    node *parent_left_sibling_;
+    node *parent_right_sibling_;
+    uint64_t parent_version_;
+
+    // default ctor for STL
+    remove_parent_entry()
+      : parent_(NULL), parent_left_sibling_(NULL),
+        parent_right_sibling_(NULL), parent_version_(0)
+    {}
+
+    remove_parent_entry(node *parent,
+                        node *parent_left_sibling,
+                        node *parent_right_sibling,
+                        uint64_t parent_version)
+      : parent_(parent), parent_left_sibling_(parent_left_sibling),
+        parent_right_sibling_(parent_right_sibling),
+        parent_version_(parent_version)
+    {}
+  };
+
+  remove_status
+  remove0(node *np,
+          key_slice *min_key,
+          key_slice *max_key,
+          const key_type &k,
+          value_type *old_v,
+          node *left_node,
+          node *right_node,
+          key_slice &new_key,
+          node *&replace_node,
+          typename util::vec<remove_parent_entry>::type &parents,
+          typename util::vec<node *>::type &locked_nodes);
+};
+
+template <typename P>
+inline void
+btree<P>::node::prefetch() const
+{
+  if (is_leaf_node())
+    AsLeaf(this)->prefetch();
+  else
+    AsInternal(this)->prefetch();
+}
+
+extern void TestConcurrentBtreeFast();
+extern void TestConcurrentBtreeSlow();
+
+#if !NDB_MASSTREE
+typedef btree<concurrent_btree_traits> concurrent_btree;
+typedef btree<single_threaded_btree_traits> single_threaded_btree;
+#endif