modify build setup
[c11concurrency-benchmarks.git] / silo / new-benchmarks / kvdb_database.h
1 #ifndef _KVDB_WRAPPER_IMPL_H_
2 #define _KVDB_WRAPPER_IMPL_H_
3
4 #include <vector>
5 #include <limits>
6 #include <utility>
7
8 #include "../varint.h"
9 #include "../macros.h"
10 #include "../util.h"
11 #include "../amd64.h"
12 #include "../lockguard.h"
13 #include "../prefetch.h"
14 #include "../scopedperf.hh"
15 #include "../counter.h"
16
17 namespace private_ {
18   static event_avg_counter evt_avg_kvdb_stable_version_spins("avg_kvdb_stable_version_spins");
19   static event_avg_counter evt_avg_kvdb_lock_acquire_spins("avg_kvdb_lock_acquire_spins");
20   static event_avg_counter evt_avg_kvdb_read_retries("avg_kvdb_read_retries");
21
22   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe0, kvdb_get_probe0_cg);
23   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe1, kvdb_get_probe1_cg);
24   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_put_probe0, kvdb_put_probe0_cg);
25   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_insert_probe0, kvdb_insert_probe0_cg);
26   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_scan_probe0, kvdb_scan_probe0_cg);
27   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_remove_probe0, kvdb_remove_probe0_cg);
28 }
29
30 // defines single-threaded version
31 template <bool UseConcurrencyControl>
32 struct record_version {
33   uint16_t sz;
34
35   inline ALWAYS_INLINE bool
36   is_locked() const
37   {
38     return false;
39   }
40
41   inline ALWAYS_INLINE void lock() {}
42
43   inline ALWAYS_INLINE void unlock() {}
44
45   static inline ALWAYS_INLINE size_t
46   Size(uint32_t v)
47   {
48     return 0;
49   }
50
51   inline ALWAYS_INLINE size_t
52   size() const
53   {
54     return sz;
55   }
56
57   inline ALWAYS_INLINE void
58   set_size(size_t s)
59   {
60     INVARIANT(s <= std::numeric_limits<uint16_t>::max());
61     sz = s;
62   }
63
64   inline ALWAYS_INLINE uint32_t
65   stable_version() const
66   {
67     return 0;
68   }
69
70   inline ALWAYS_INLINE bool
71   check_version(uint32_t version) const
72   {
73     return true;
74   }
75 };
76
77 // concurrency control version
78 template <>
79 struct record_version<true> {
80   // [ locked | size  | version ]
81   // [  0..1  | 1..17 | 17..32  ]
82
83   static const uint32_t HDR_LOCKED_MASK = 0x1;
84
85   static const uint32_t HDR_SIZE_SHIFT = 1;
86   static const uint32_t HDR_SIZE_MASK = std::numeric_limits<uint16_t>::max() << HDR_SIZE_SHIFT;
87
88   static const uint32_t HDR_VERSION_SHIFT = 17;
89   static const uint32_t HDR_VERSION_MASK = ((uint32_t)-1) << HDR_VERSION_SHIFT;
90
91   record_version<true>() : hdr(0) {}
92
93   volatile uint32_t hdr;
94
95   static inline bool
96   IsLocked(uint32_t v)
97   {
98     return v & HDR_LOCKED_MASK;
99   }
100
101   inline bool
102   is_locked() const
103   {
104     return IsLocked(hdr);
105   }
106
107   inline void
108   lock()
109   {
110 #ifdef ENABLE_EVENT_COUNTERS
111     unsigned long nspins = 0;
112 #endif
113     uint32_t v = hdr;
114     while (IsLocked(v) ||
115            !__sync_bool_compare_and_swap(&hdr, v, v | HDR_LOCKED_MASK)) {
116       nop_pause();
117       v = hdr;
118 #ifdef ENABLE_EVENT_COUNTERS
119       ++nspins;
120 #endif
121     }
122     COMPILER_MEMORY_FENCE;
123 #ifdef ENABLE_EVENT_COUNTERS
124     private_::evt_avg_kvdb_lock_acquire_spins.offer(nspins);
125 #endif
126   }
127
128   inline void
129   unlock()
130   {
131     uint32_t v = hdr;
132     INVARIANT(IsLocked(v));
133     const uint32_t n = Version(v);
134     v &= ~HDR_VERSION_MASK;
135     v |= (((n + 1) << HDR_VERSION_SHIFT) & HDR_VERSION_MASK);
136     v &= ~HDR_LOCKED_MASK;
137     INVARIANT(!IsLocked(v));
138     COMPILER_MEMORY_FENCE;
139     hdr = v;
140   }
141
142   static inline size_t
143   Size(uint32_t v)
144   {
145     return (v & HDR_SIZE_MASK) >> HDR_SIZE_SHIFT;
146   }
147
148   inline size_t
149   size() const
150   {
151     return Size(hdr);
152   }
153
154   inline void
155   set_size(size_t s)
156   {
157     INVARIANT(s <= std::numeric_limits<uint16_t>::max());
158     INVARIANT(is_locked());
159     const uint16_t new_sz = static_cast<uint16_t>(s);
160     hdr &= ~HDR_SIZE_MASK;
161     hdr |= (new_sz << HDR_SIZE_SHIFT);
162     INVARIANT(size() == s);
163   }
164
165   static inline uint32_t
166   Version(uint32_t v)
167   {
168     return (v & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
169   }
170
171   inline uint32_t
172   stable_version() const
173   {
174     uint32_t v = hdr;
175 #ifdef ENABLE_EVENT_COUNTERS
176     unsigned long nspins = 0;
177 #endif
178     while (IsLocked(v)) {
179       nop_pause();
180       v = hdr;
181 #ifdef ENABLE_EVENT_COUNTERS
182       ++nspins;
183 #endif
184     }
185     COMPILER_MEMORY_FENCE;
186 #ifdef ENABLE_EVENT_COUNTERS
187     private_::evt_avg_kvdb_stable_version_spins.offer(nspins);
188 #endif
189     return v;
190   }
191
192   inline bool
193   check_version(uint32_t version) const
194   {
195     COMPILER_MEMORY_FENCE;
196     return hdr == version;
197   }
198 };
199
200 template <bool UseConcurrencyControl>
201 struct basic_kvdb_record : public record_version<UseConcurrencyControl> {
202   typedef record_version<UseConcurrencyControl> super_type;
203   uint16_t alloc_size;
204   char data[0];
205
206   basic_kvdb_record(uint16_t alloc_size, const std::string &s)
207     : record_version<UseConcurrencyControl>(),
208       alloc_size(alloc_size)
209   {
210     NDB_MEMCPY(&data[0], s.data(), s.size());
211     this->set_size(s.size());
212   }
213
214   // just allocate, and set size to 0
215   basic_kvdb_record(uint16_t alloc_size)
216     : record_version<UseConcurrencyControl>(),
217       alloc_size(alloc_size)
218   {
219     this->set_size(0);
220   }
221
222   inline void
223   prefetch() const
224   {
225 #ifdef TUPLE_PREFETCH
226     prefetch_bytes(this, sizeof(*this) + this->size());
227 #endif
228   }
229
230 private:
231   template <typename Reader, typename StringAllocator>
232   inline bool
233   do_guarded_read(Reader &reader, StringAllocator &sa) const
234   {
235     const size_t read_sz = this->size();
236     INVARIANT(read_sz);
237     return reader((uint8_t *) &this->data[0], read_sz, sa);
238   }
239
240 public:
241
242   template <typename Reader, typename StringAllocator>
243   inline void
244   do_read(Reader &reader, StringAllocator &sa) const
245   {
246     if (UseConcurrencyControl) {
247 #ifdef ENABLE_EVENT_COUNTERS
248       unsigned long nretries = 0;
249 #endif
250     retry:
251       const uint32_t v = this->stable_version();
252       if (unlikely(!do_guarded_read(reader, sa) ||
253                    !this->check_version(v))) {
254 #ifdef ENABLE_EVENT_COUNTERS
255         ++nretries;
256 #endif
257         goto retry;
258       }
259 #ifdef ENABLE_EVENT_COUNTERS
260       private_::evt_avg_kvdb_read_retries.offer(nretries);
261 #endif
262     } else {
263       const bool ret = do_guarded_read(reader, sa);
264       if (!ret)
265         INVARIANT(false);
266     }
267   }
268
269   template <typename Writer>
270   inline bool
271   do_write(Writer &writer)
272   {
273     INVARIANT(!UseConcurrencyControl || this->is_locked());
274     const size_t new_sz = writer.compute_needed((const uint8_t *) &this->data[0], this->size());
275     if (unlikely(new_sz > alloc_size))
276       return false;
277     writer((uint8_t *) &this->data[0], this->size());
278     this->set_size(new_sz);
279     return true;
280   }
281
282   static basic_kvdb_record *
283   alloc(size_t alloc_sz)
284   {
285     INVARIANT(alloc_sz <= std::numeric_limits<uint16_t>::max());
286     const size_t max_alloc_sz =
287       std::numeric_limits<uint16_t>::max() + sizeof(basic_kvdb_record);
288     const size_t actual_alloc_sz =
289       std::min(
290           util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(basic_kvdb_record) + alloc_sz),
291           max_alloc_sz);
292     char * const p = reinterpret_cast<char *>(rcu::s_instance.alloc(actual_alloc_sz));
293     INVARIANT(p);
294     return new (p) basic_kvdb_record(actual_alloc_sz - sizeof(basic_kvdb_record));
295   }
296
297   static basic_kvdb_record *
298   alloc(const std::string &s)
299   {
300     const size_t sz = s.size();
301     const size_t max_alloc_sz =
302       std::numeric_limits<uint16_t>::max() + sizeof(basic_kvdb_record);
303     const size_t alloc_sz =
304       std::min(
305           util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(basic_kvdb_record) + sz),
306           max_alloc_sz);
307     char * const p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
308     INVARIANT(p);
309     return new (p) basic_kvdb_record(alloc_sz - sizeof(basic_kvdb_record), s);
310   }
311
312 private:
313   static inline void
314   deleter(void *r)
315   {
316     basic_kvdb_record * const px =
317       reinterpret_cast<basic_kvdb_record *>(r);
318     const size_t alloc_sz = px->alloc_size + sizeof(*px);
319     px->~basic_kvdb_record();
320     rcu::s_instance.dealloc(px, alloc_sz);
321   }
322
323 public:
324   static void
325   release(basic_kvdb_record *r)
326   {
327     if (unlikely(!r))
328       return;
329     rcu::s_instance.free_with_fn(r, deleter);
330   }
331
332   static void
333   release_no_rcu(basic_kvdb_record *r)
334   {
335     if (unlikely(!r))
336       return;
337     deleter(r);
338   }
339
340 } PACKED;
341
342 template <typename Btree, bool UseConcurrencyControl>
343 struct purge_tree_walker : public Btree::tree_walk_callback {
344   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
345
346 #ifdef TXN_BTREE_DUMP_PURGE_STATS
347   purge_tree_walker()
348     : purge_stats_nodes(0),
349       purge_stats_nosuffix_nodes(0) {}
350   std::vector<uint16_t> purge_stats_nkeys_node;
351   size_t purge_stats_nodes;
352   size_t purge_stats_nosuffix_nodes;
353
354   void
355   dump_stats()
356   {
357     size_t v = 0;
358     for (std::vector<uint16_t>::iterator it = purge_stats_nkeys_node.begin();
359         it != purge_stats_nkeys_node.end(); ++it)
360       v += *it;
361     const double avg_nkeys_node = double(v)/double(purge_stats_nkeys_node.size());
362     const double avg_fill_factor = avg_nkeys_node/double(Btree::NKeysPerNode);
363     std::cerr << "btree node stats" << std::endl;
364     std::cerr << "    avg_nkeys_node: " << avg_nkeys_node << std::endl;
365     std::cerr << "    avg_fill_factor: " << avg_fill_factor << std::endl;
366     std::cerr << "    num_nodes: " << purge_stats_nodes << std::endl;
367     std::cerr << "    num_nosuffix_nodes: " << purge_stats_nosuffix_nodes << std::endl;
368   }
369 #endif
370
371   virtual void
372   on_node_begin(const typename Btree::node_opaque_t *n)
373   {
374     INVARIANT(spec_values.empty());
375     spec_values = Btree::ExtractValues(n);
376   }
377
378   virtual void
379   on_node_success()
380   {
381     for (size_t i = 0; i < spec_values.size(); i++) {
382       kvdb_record * const r = (kvdb_record *) spec_values[i].first;
383       kvdb_record::release_no_rcu(r);
384     }
385 #ifdef TXN_BTREE_DUMP_PURGE_STATS
386     purge_stats_nkeys_node.push_back(spec_values.size());
387     purge_stats_nodes++;
388     for (size_t i = 0; i < spec_values.size(); i++)
389       if (spec_values[i].second)
390         goto done;
391     purge_stats_nosuffix_nodes++;
392 done:
393 #endif
394     spec_values.clear();
395   }
396
397   virtual void
398   on_node_failure()
399   {
400     spec_values.clear();
401   }
402
403 private:
404   std::vector<std::pair<typename Btree::value_type, bool>> spec_values;
405 };
406
407 class kvdb_txn {
408 public:
409   inline kvdb_txn(uint64_t, str_arena &a) : a(&a) {}
410   inline str_arena & string_allocator() { return *a; }
411
412   inline bool
413   commit()
414   {
415     return true;
416   }
417
418   inline void
419   abort()
420   {
421     // should never abort
422     ALWAYS_ASSERT(false);
423   }
424
425 private:
426   str_arena *a;
427   scoped_rcu_region region;
428 };
429
430 template <typename Schema, bool UseConcurrencyControl>
431 class kvdb_index : public abstract_ordered_index {
432 public:
433
434   typedef typename Schema::base_type base_type;
435   typedef typename Schema::key_type key_type;
436   typedef typename Schema::value_type value_type;
437   typedef typename Schema::value_descriptor_type value_descriptor_type;
438   typedef typename Schema::key_encoder_type key_encoder_type;
439   typedef typename Schema::value_encoder_type value_encoder_type;
440
441   static const uint64_t AllFieldsMask = typed_txn_btree_<Schema>::AllFieldsMask;
442   typedef util::Fields<AllFieldsMask> AllFields;
443
444   struct search_range_callback {
445   public:
446     virtual ~search_range_callback() {}
447     virtual bool invoke(const key_type &k, const value_type &v) = 0;
448   };
449
450   struct bytes_search_range_callback {
451   public:
452     virtual ~bytes_search_range_callback() {}
453     virtual bool invoke(const std::string &k, const std::string &v) = 0;
454   };
455
456 private:
457   // leverage the definitions for txn_btree and typed_txn_btree
458
459   typedef txn_btree_::key_reader bytes_key_reader;
460   typedef txn_btree_::single_value_reader bytes_single_value_reader;
461   typedef txn_btree_::value_reader bytes_value_reader;
462
463   typedef
464     typename typed_txn_btree_<Schema>::key_writer
465     key_writer;
466   typedef
467     typename typed_txn_btree_<Schema>::key_reader
468     key_reader;
469
470   typedef
471     typename typed_txn_btree_<Schema>::value_writer
472     value_writer;
473   typedef
474     typename typed_txn_btree_<Schema>::single_value_reader
475     single_value_reader;
476   typedef
477     typename typed_txn_btree_<Schema>::value_reader
478     value_reader;
479
480   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
481
482   template <typename Btree, typename Callback, typename KeyReader, typename ValueReader>
483   class kvdb_wrapper_search_range_callback : public Btree::search_range_callback {
484   public:
485
486     kvdb_wrapper_search_range_callback(
487         Callback &upcall,
488         KeyReader &kr,
489         ValueReader &vr,
490         str_arena &arena)
491       : upcall(&upcall), kr(&kr),
492         vr(&vr), arena(&arena) {}
493
494     virtual bool
495     invoke(const typename Btree::string_type &k, typename Btree::value_type v)
496     {
497       const kvdb_record * const r =
498         reinterpret_cast<const kvdb_record *>(v);
499       r->prefetch();
500       r->do_read(*vr, *arena);
501       return upcall->invoke((*kr)(k), vr->results());
502     }
503
504   private:
505     Callback *upcall;
506     KeyReader *kr;
507     ValueReader *vr;
508     str_arena *arena;
509   };
510
511 public:
512
513   kvdb_index(size_t value_size_hint,
514             bool mostly_append,
515             const std::string &name)
516     : name(name)
517   {}
518
519   // virtual interface
520
521   virtual size_t
522   size() const OVERRIDE
523   {
524     return btr.size();
525   }
526
527   virtual std::map<std::string, uint64_t>
528   clear() OVERRIDE
529   {
530     purge_tree_walker<my_btree, UseConcurrencyControl> w;
531     btr.tree_walk(w);
532     btr.clear();
533 #ifdef TXN_BTREE_DUMP_PURGE_STATS
534     std::cerr << "purging kvdb index: " << name << std::endl;
535     w.dump_stats();
536 #endif
537     return std::map<std::string, uint64_t>();
538   }
539
540   // templated interface
541
542   template <typename FieldsMask = AllFields>
543   inline bool search(
544       kvdb_txn &t, const key_type &k, value_type &v,
545       FieldsMask fm = FieldsMask());
546
547   template <typename FieldsMask = AllFields>
548   inline void search_range_call(
549       kvdb_txn &t, const key_type &lower, const key_type *upper,
550       search_range_callback &callback,
551       bool no_key_results = false /* skip decoding of keys? */,
552       FieldsMask fm = FieldsMask());
553
554   // a lower-level variant which does not bother to decode the key/values
555   inline void bytes_search_range_call(
556       kvdb_txn &t, const key_type &lower, const key_type *upper,
557       bytes_search_range_callback &callback,
558       size_t value_fields_prefix = std::numeric_limits<size_t>::max());
559
560   template <typename FieldsMask = AllFields>
561   inline void put(
562       kvdb_txn &t, const key_type &k, const value_type &v,
563       FieldsMask fm = FieldsMask());
564
565   inline void insert(
566       kvdb_txn &t, const key_type &k, const value_type &v);
567
568   inline void remove(
569       kvdb_txn &t, const key_type &k);
570
571 private:
572
573   template <typename Callback, typename KeyReader, typename ValueReader>
574   inline void do_search_range_call(
575       kvdb_txn &t, const key_type &lower, const key_type *upper,
576       Callback &callback, KeyReader &kr, ValueReader &vr);
577
578   std::string name;
579   typedef
580     typename std::conditional<
581       UseConcurrencyControl,
582       concurrent_btree,
583       single_threaded_btree>::type
584     my_btree;
585    my_btree btr;
586 };
587
588 template <bool UseConcurrencyControl>
589 class kvdb_database : public abstract_db {
590 public:
591
592   template <typename Schema>
593   struct IndexType {
594     typedef kvdb_index<Schema, UseConcurrencyControl> type;
595     typedef std::shared_ptr<type> ptr_type;
596   };
597
598   template <enum abstract_db::TxnProfileHint hint>
599   struct TransactionType
600   {
601     typedef kvdb_txn type;
602     typedef std::shared_ptr<type> ptr_type;
603   };
604
605   template <enum abstract_db::TxnProfileHint hint>
606   inline typename TransactionType<hint>::ptr_type
607   new_txn(uint64_t txn_flags, str_arena &arena) const
608   {
609     return std::make_shared<typename TransactionType<hint>::type>(txn_flags, arena);
610   }
611
612   typedef transaction_abort_exception abort_exception_type;
613
614   virtual void
615   do_txn_epoch_sync() const OVERRIDE
616   {
617   }
618
619   virtual void
620   do_txn_finish() const OVERRIDE
621   {
622   }
623
624   template <typename Schema>
625   inline typename IndexType<Schema>::ptr_type
626   open_index(const std::string &name,
627              size_t value_size_hint,
628              bool mostly_append)
629   {
630     return std::make_shared<typename IndexType<Schema>::type>(
631         value_size_hint, mostly_append, name);
632   }
633 };
634
635 template <typename Schema, bool UseConcurrencyControl>
636 template <typename FieldsMask>
637 bool
638 kvdb_index<Schema, UseConcurrencyControl>::search(
639     kvdb_txn &t, const key_type &k, value_type &v,
640     FieldsMask fm)
641 {
642   key_writer kw(&k);
643   const std::string * const keypx =
644     kw.fully_materialize(false, t.string_allocator());
645
646   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
647   ANON_REGION("kvdb_ordered_index::get:", &private_::kvdb_get_probe0_cg);
648   typename my_btree::value_type p = 0;
649   if (btr.search(varkey(*keypx), p)) {
650     ANON_REGION("kvdb_ordered_index::get:do_read:", &private_::kvdb_get_probe1_cg);
651     const kvdb_record * const r = reinterpret_cast<const kvdb_record *>(p);
652     r->prefetch();
653     single_value_reader vr(v, FieldsMask::value);
654     r->do_read(vr, t.string_allocator());
655     return true;
656   }
657   return false;
658 }
659
660 template <typename Schema, bool UseConcurrencyControl>
661 template <typename FieldsMask>
662 void
663 kvdb_index<Schema, UseConcurrencyControl>::search_range_call(
664     kvdb_txn &t, const key_type &lower, const key_type *upper,
665     search_range_callback &callback,
666     bool no_key_results,
667     FieldsMask fm)
668 {
669   key_reader kr(no_key_results);
670   value_reader vr(FieldsMask::value);
671
672   do_search_range_call(t, lower, upper, callback, kr, vr);
673 }
674
675 template <typename Schema, bool UseConcurrencyControl>
676 void
677 kvdb_index<Schema, UseConcurrencyControl>::bytes_search_range_call(
678     kvdb_txn &t, const key_type &lower, const key_type *upper,
679     bytes_search_range_callback &callback,
680     size_t value_fields_prefix)
681 {
682   const value_encoder_type value_encoder;
683   const size_t max_bytes_read =
684     value_encoder.encode_max_nbytes_prefix(value_fields_prefix);
685   bytes_key_reader kr;
686   bytes_value_reader vr(max_bytes_read);
687
688   do_search_range_call(t, lower, upper, callback, kr, vr);
689 }
690
691 template <typename Schema, bool UseConcurrencyControl>
692 template <typename FieldsMask>
693 void
694 kvdb_index<Schema, UseConcurrencyControl>::put(
695     kvdb_txn &t, const key_type &key, const value_type &value,
696     FieldsMask fm)
697 {
698   key_writer kw(&key);
699   const std::string * const keypx =
700     kw.fully_materialize(false, t.string_allocator());
701   if (UseConcurrencyControl)
702     // XXX: currently unsupported- need to ensure locked values
703     // are the canonical versions pointed to by the tree
704     ALWAYS_ASSERT(false);
705   value_writer vw(&value, FieldsMask::value);
706   typename my_btree::value_type v = 0, v_old = 0;
707   if (btr.search(varkey(*keypx), v)) {
708     kvdb_record * const r = reinterpret_cast<kvdb_record *>(v);
709     r->prefetch();
710     lock_guard<kvdb_record> guard(*r);
711     if (r->do_write(vw))
712       return;
713     // replace - slow-path
714     kvdb_record * const rnew =
715       kvdb_record::alloc(*vw.fully_materialize(false, t.string_allocator()));
716     btr.insert(varkey(*keypx), (typename my_btree::value_type) rnew, &v_old, 0);
717     INVARIANT((typename my_btree::value_type) r == v_old);
718     // rcu-free the old record
719     kvdb_record::release(r);
720     return;
721   }
722
723   // also slow-path
724   kvdb_record * const rnew =
725     kvdb_record::alloc(*vw.fully_materialize(false, t.string_allocator()));
726   if (!btr.insert(varkey(*keypx), (typename my_btree::value_type) rnew, &v_old, 0)) {
727     kvdb_record * const r = (kvdb_record *) v_old;
728     kvdb_record::release(r);
729   }
730   return;
731 }
732
733 template <typename Schema, bool UseConcurrencyControl>
734 void
735 kvdb_index<Schema, UseConcurrencyControl>::insert(
736     kvdb_txn &t, const key_type &k, const value_type &v)
737 {
738   key_writer kw(&k);
739   const std::string * const keypx =
740     kw.fully_materialize(false, t.string_allocator());
741   if (UseConcurrencyControl)
742     // XXX: currently unsupported- see above
743     ALWAYS_ASSERT(false);
744   value_writer vw(&v, AllFieldsMask);
745   const size_t sz = vw.compute_needed(nullptr, 0);
746   kvdb_record * const rec = kvdb_record::alloc(sz);
747   vw((uint8_t *) &rec->data[0], 0);
748   rec->set_size(sz);
749   if (likely(btr.insert_if_absent(varkey(*keypx), (typename my_btree::value_type) rec, nullptr)))
750     return;
751   kvdb_record::release_no_rcu(rec);
752   put(t, k, v);
753 }
754
755 template <typename Schema, bool UseConcurrencyControl>
756 void
757 kvdb_index<Schema, UseConcurrencyControl>::remove(
758     kvdb_txn &t, const key_type &k)
759 {
760   key_writer kw(&k);
761   const std::string * const keypx =
762     kw.fully_materialize(false, t.string_allocator());
763   ANON_REGION("kvdb_ordered_index::remove:", &private_::kvdb_remove_probe0_cg);
764   if (UseConcurrencyControl)
765     // XXX: currently unsupported- see above
766     ALWAYS_ASSERT(false);
767   typename my_btree::value_type v = 0;
768   if (likely(btr.remove(varkey(*keypx), &v))) {
769     kvdb_record * const r = reinterpret_cast<kvdb_record *>(v);
770     kvdb_record::release(r);
771   }
772 }
773
774 template <typename Schema, bool UseConcurrencyControl>
775 template <typename Callback, typename KeyReader, typename ValueReader>
776 void
777 kvdb_index<Schema, UseConcurrencyControl>::do_search_range_call(
778       kvdb_txn &t, const key_type &lower, const key_type *upper,
779       Callback &callback, KeyReader &kr, ValueReader &vr)
780 {
781   key_writer lower_key_writer(&lower);
782   key_writer upper_key_writer(upper);
783   const std::string * const lower_str =
784     lower_key_writer.fully_materialize(false, t.string_allocator());
785   const std::string * const upper_str =
786     upper_key_writer.fully_materialize(false, t.string_allocator());
787
788   kvdb_wrapper_search_range_callback<
789     my_btree,
790     Callback,
791     KeyReader,
792     ValueReader> c(callback, kr, vr, t.string_allocator());
793
794   varkey uppervk;
795   if (upper_str)
796     uppervk = varkey(*upper_str);
797   btr.search_range_call(varkey(*lower_str), upper_str ? &uppervk : nullptr, c, t.string_allocator()());
798 }
799
800 #endif /* _KVDB_WRAPPER_IMPL_H_ */