1 #ifndef _KVDB_WRAPPER_IMPL_H_
2 #define _KVDB_WRAPPER_IMPL_H_
12 #include "../lockguard.h"
13 #include "../prefetch.h"
14 #include "../scopedperf.hh"
15 #include "../counter.h"
// Running-average event counters for the kvdb wrapper's spin/retry loops;
// the offer() calls feeding them are compiled only under ENABLE_EVENT_COUNTERS.
18 static event_avg_counter evt_avg_kvdb_stable_version_spins("avg_kvdb_stable_version_spins");
19 static event_avg_counter evt_avg_kvdb_lock_acquire_spins("avg_kvdb_lock_acquire_spins");
20 static event_avg_counter evt_avg_kvdb_read_retries("avg_kvdb_read_retries");
// scopedperf TSC probes used (via ANON_REGION below) to time the
// get/put/insert/scan/remove fast paths.
22 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe0, kvdb_get_probe0_cg);
23 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe1, kvdb_get_probe1_cg);
24 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_put_probe0, kvdb_put_probe0_cg);
25 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_insert_probe0, kvdb_insert_probe0_cg);
26 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_scan_probe0, kvdb_scan_probe0_cg);
27 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_remove_probe0, kvdb_remove_probe0_cg);
30 // defines single-threaded version
// Primary template of record_version: the no-concurrency-control variant.
// lock()/unlock() are empty, so single-threaded users pay nothing; the
// visible declarations mirror the interface of the <true> specialization
// below so callers can be written generically against either.
// NOTE(review): interior lines of this struct are elided in this view;
// comments describe only the visible declarations.
31 template <bool UseConcurrencyControl>
32 struct record_version {
35 inline ALWAYS_INLINE bool
// No concurrency control: locking is deliberately a no-op.
41 inline ALWAYS_INLINE void lock() {}
43 inline ALWAYS_INLINE void unlock() {}
45 static inline ALWAYS_INLINE size_t
51 inline ALWAYS_INLINE size_t
57 inline ALWAYS_INLINE void
// Size must fit 16 bits — matches the size field of the CC variant.
60 INVARIANT(s <= std::numeric_limits<uint16_t>::max());
64 inline ALWAYS_INLINE uint32_t
65 stable_version() const
70 inline ALWAYS_INLINE bool
71 check_version(uint32_t version) const
77 // concurrency control version
// Specialization for UseConcurrencyControl == true.  Packs a spin-lock
// bit, a 16-bit size, and a 15-bit version counter into a single 32-bit
// header word.  Writers take lock()/unlock(); readers use the optimistic
// stable_version()/check_version() protocol.
// NOTE(review): interior lines of this struct are elided in this view.
79 struct record_version<true> {
80 // [ locked | size | version ]
81 // [ 0..1 | 1..17 | 17..32 ]
83 static const uint32_t HDR_LOCKED_MASK = 0x1;
85 static const uint32_t HDR_SIZE_SHIFT = 1;
86 static const uint32_t HDR_SIZE_MASK = std::numeric_limits<uint16_t>::max() << HDR_SIZE_SHIFT;
88 static const uint32_t HDR_VERSION_SHIFT = 17;
89 static const uint32_t HDR_VERSION_MASK = ((uint32_t)-1) << HDR_VERSION_SHIFT;
91 record_version<true>() : hdr(0) {}
// volatile: force a fresh load from memory on every read while spinning.
93 volatile uint32_t hdr;
98 return v & HDR_LOCKED_MASK;
104 return IsLocked(hdr);
// lock(): spin until a CAS atomically sets the locked bit in hdr.
110 #ifdef ENABLE_EVENT_COUNTERS
111 unsigned long nspins = 0;
114 while (IsLocked(v) ||
115 !__sync_bool_compare_and_swap(&hdr, v, v | HDR_LOCKED_MASK)) {
118 #ifdef ENABLE_EVENT_COUNTERS
// Compiler fence: keep critical-section accesses from being reordered
// above the acquisition (hardware ordering presumably relied on from
// the platform — TODO confirm target memory model).
122 COMPILER_MEMORY_FENCE;
123 #ifdef ENABLE_EVENT_COUNTERS
124 private_::evt_avg_kvdb_lock_acquire_spins.offer(nspins);
// unlock(): bump the version counter and clear the locked bit, published
// with a single store to hdr so readers see them together.
132 INVARIANT(IsLocked(v));
133 const uint32_t n = Version(v);
134 v &= ~HDR_VERSION_MASK;
135 v |= (((n + 1) << HDR_VERSION_SHIFT) & HDR_VERSION_MASK);
136 v &= ~HDR_LOCKED_MASK;
137 INVARIANT(!IsLocked(v));
138 COMPILER_MEMORY_FENCE;
145 return (v & HDR_SIZE_MASK) >> HDR_SIZE_SHIFT;
// set_size(): only legal while holding the record lock.
157 INVARIANT(s <= std::numeric_limits<uint16_t>::max());
158 INVARIANT(is_locked());
159 const uint16_t new_sz = static_cast<uint16_t>(s);
160 hdr &= ~HDR_SIZE_MASK;
161 hdr |= (new_sz << HDR_SIZE_SHIFT);
162 INVARIANT(size() == s);
165 static inline uint32_t
168 return (v & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
// stable_version(): spin until the locked bit clears, then return the
// observed header word for later validation via check_version().
172 stable_version() const
175 #ifdef ENABLE_EVENT_COUNTERS
176 unsigned long nspins = 0;
178 while (IsLocked(v)) {
181 #ifdef ENABLE_EVENT_COUNTERS
185 COMPILER_MEMORY_FENCE;
186 #ifdef ENABLE_EVENT_COUNTERS
187 private_::evt_avg_kvdb_stable_version_spins.offer(nspins);
// check_version(): true iff hdr is bit-for-bit unchanged since 'version'
// was sampled — any intervening lock or version bump fails the check.
193 check_version(uint32_t version) const
195 COMPILER_MEMORY_FENCE;
196 return hdr == version;
// basic_kvdb_record: a variable-length value record stored in the btree.
// Inherits the lock/version header from record_version, records its
// payload capacity in alloc_size, and stores the bytes inline in data[].
// Memory comes from (and returns to) the RCU-aware allocator.
// NOTE(review): interior lines of this struct are elided in this view.
200 template <bool UseConcurrencyControl>
201 struct basic_kvdb_record : public record_version<UseConcurrencyControl> {
202 typedef record_version<UseConcurrencyControl> super_type;
// Construct with an initial payload copied from s.
206 basic_kvdb_record(uint16_t alloc_size, const std::string &s)
207 : record_version<UseConcurrencyControl>(),
208 alloc_size(alloc_size)
210 NDB_MEMCPY(&data[0], s.data(), s.size());
211 this->set_size(s.size());
214 // just allocate, and set size to 0
215 basic_kvdb_record(uint16_t alloc_size)
216 : record_version<UseConcurrencyControl>(),
217 alloc_size(alloc_size)
225 #ifdef TUPLE_PREFETCH
226 prefetch_bytes(this, sizeof(*this) + this->size());
// Read the payload via 'reader' with NO concurrency protection; callers
// must wrap this with version validation (see do_read below).
231 template <typename Reader, typename StringAllocator>
233 do_guarded_read(Reader &reader, StringAllocator &sa) const
235 const size_t read_sz = this->size();
237 return reader((uint8_t *) &this->data[0], read_sz, sa);
// Optimistic read: sample stable_version(), read, and retry until
// check_version() confirms no concurrent writer interfered.
242 template <typename Reader, typename StringAllocator>
244 do_read(Reader &reader, StringAllocator &sa) const
246 if (UseConcurrencyControl) {
247 #ifdef ENABLE_EVENT_COUNTERS
248 unsigned long nretries = 0;
251 const uint32_t v = this->stable_version();
252 if (unlikely(!do_guarded_read(reader, sa) ||
253 !this->check_version(v))) {
254 #ifdef ENABLE_EVENT_COUNTERS
259 #ifdef ENABLE_EVENT_COUNTERS
260 private_::evt_avg_kvdb_read_retries.offer(nretries);
// Single-threaded build: plain read, no version dance.
263 const bool ret = do_guarded_read(reader, sa);
// In-place update; under CC the record lock must be held.  When the new
// encoding exceeds alloc_size the update cannot happen here (the branch
// after line 275 is elided — presumably reports failure so the caller
// allocates a replacement record; see put()).
269 template <typename Writer>
271 do_write(Writer &writer)
273 INVARIANT(!UseConcurrencyControl || this->is_locked());
274 const size_t new_sz = writer.compute_needed((const uint8_t *) &this->data[0], this->size());
275 if (unlikely(new_sz > alloc_size))
277 writer((uint8_t *) &this->data[0], this->size());
278 this->set_size(new_sz);
// Allocate an empty record with capacity >= alloc_sz, rounded up to the
// allocator's alignment; the usable capacity (total minus header) is
// what gets stored in alloc_size.
282 static basic_kvdb_record *
283 alloc(size_t alloc_sz)
285 INVARIANT(alloc_sz <= std::numeric_limits<uint16_t>::max());
286 const size_t max_alloc_sz =
287 std::numeric_limits<uint16_t>::max() + sizeof(basic_kvdb_record);
288 const size_t actual_alloc_sz =
290 util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(basic_kvdb_record) + alloc_sz),
292 char * const p = reinterpret_cast<char *>(rcu::s_instance.alloc(actual_alloc_sz));
294 return new (p) basic_kvdb_record(actual_alloc_sz - sizeof(basic_kvdb_record));
// Allocate a record initialized with payload s.
297 static basic_kvdb_record *
298 alloc(const std::string &s)
300 const size_t sz = s.size();
301 const size_t max_alloc_sz =
302 std::numeric_limits<uint16_t>::max() + sizeof(basic_kvdb_record);
303 const size_t alloc_sz =
305 util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(basic_kvdb_record) + sz),
307 char * const p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
309 return new (p) basic_kvdb_record(alloc_sz - sizeof(basic_kvdb_record), s);
// deleter: run the destructor explicitly (record was placement-new'd)
// and hand the exact original allocation size back to the allocator.
316 basic_kvdb_record * const px =
317 reinterpret_cast<basic_kvdb_record *>(r);
318 const size_t alloc_sz = px->alloc_size + sizeof(*px);
319 px->~basic_kvdb_record();
320 rcu::s_instance.dealloc(px, alloc_sz);
// release(): defer reclamation to the RCU grace period so concurrent
// optimistic readers never touch freed memory.
325 release(basic_kvdb_record *r)
329 rcu::s_instance.free_with_fn(r, deleter);
// release_no_rcu(): immediate reclamation — only safe when the record
// was never published or no concurrent readers exist (purge, failed
// insert of a private record).
333 release_no_rcu(basic_kvdb_record *r)
// Btree walk callback used when purging an index: frees every record it
// encounters immediately (no RCU deferral — the purge caller must hold
// exclusive access) and, under TXN_BTREE_DUMP_PURGE_STATS, gathers
// per-node occupancy statistics dumped to stderr.
// NOTE(review): interior lines are elided in this view.
342 template <typename Btree, bool UseConcurrencyControl>
343 struct purge_tree_walker : public Btree::tree_walk_callback {
344 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
346 #ifdef TXN_BTREE_DUMP_PURGE_STATS
348 : purge_stats_nodes(0),
349 purge_stats_nosuffix_nodes(0) {}
350 std::vector<uint16_t> purge_stats_nkeys_node;
351 size_t purge_stats_nodes;
352 size_t purge_stats_nosuffix_nodes;
// Report average keys per node and fill factor after the walk.
358 for (std::vector<uint16_t>::iterator it = purge_stats_nkeys_node.begin();
359 it != purge_stats_nkeys_node.end(); ++it)
361 const double avg_nkeys_node = double(v)/double(purge_stats_nkeys_node.size());
362 const double avg_fill_factor = avg_nkeys_node/double(Btree::NKeysPerNode);
363 std::cerr << "btree node stats" << std::endl;
364 std::cerr << "  avg_nkeys_node: " << avg_nkeys_node << std::endl;
365 std::cerr << "  avg_fill_factor: " << avg_fill_factor << std::endl;
366 std::cerr << "  num_nodes: " << purge_stats_nodes << std::endl;
367 std::cerr << "  num_nosuffix_nodes: " << purge_stats_nosuffix_nodes << std::endl;
// Snapshot the values stored under node n; released in the success hook.
372 on_node_begin(const typename Btree::node_opaque_t *n)
374 INVARIANT(spec_values.empty());
375 spec_values = Btree::ExtractValues(n);
// Free each record immediately — purge runs with exclusive access.
381 for (size_t i = 0; i < spec_values.size(); i++) {
382 kvdb_record * const r = (kvdb_record *) spec_values[i].first;
383 kvdb_record::release_no_rcu(r);
385 #ifdef TXN_BTREE_DUMP_PURGE_STATS
386 purge_stats_nkeys_node.push_back(spec_values.size());
388 for (size_t i = 0; i < spec_values.size(); i++)
389 if (spec_values[i].second)
391 purge_stats_nosuffix_nodes++;
// (value pointer, bool) pairs extracted from the current node; the bool
// presumably flags key-suffix presence (feeds nosuffix stats) — TODO
// confirm against Btree::ExtractValues.
404 std::vector<std::pair<typename Btree::value_type, bool>> spec_values;
// kvdb_txn (fragment — the enclosing class header is outside this view):
// a minimal transaction handle carrying only a string arena and an RCU
// region; operations apply in place, so there is no commit/abort machinery.
409 inline kvdb_txn(uint64_t, str_arena &a) : a(&a) {}
410 inline str_arena & string_allocator() { return *a; }
421 // should never abort
// Aborting is a logic error for this wrapper — fail hard.
422 ALWAYS_ASSERT(false);
// Holds readers inside an RCU epoch for the lifetime of the txn object.
427 scoped_rcu_region region;
// kvdb_index: an abstract_ordered_index backed directly by a btree of
// basic_kvdb_record values, with no transactional validation.  The tree
// type is chosen at compile time: a concurrent btree when
// UseConcurrencyControl, otherwise single_threaded_btree (see the
// std::conditional member at the bottom).
// NOTE(review): interior lines are elided in this view.
430 template <typename Schema, bool UseConcurrencyControl>
431 class kvdb_index : public abstract_ordered_index {
434 typedef typename Schema::base_type base_type;
435 typedef typename Schema::key_type key_type;
436 typedef typename Schema::value_type value_type;
437 typedef typename Schema::value_descriptor_type value_descriptor_type;
438 typedef typename Schema::key_encoder_type key_encoder_type;
439 typedef typename Schema::value_encoder_type value_encoder_type;
441 static const uint64_t AllFieldsMask = typed_txn_btree_<Schema>::AllFieldsMask;
442 typedef util::Fields<AllFieldsMask> AllFields;
// Typed range-scan callback (decoded key/value).
444 struct search_range_callback {
446 virtual ~search_range_callback() {}
447 virtual bool invoke(const key_type &k, const value_type &v) = 0;
// Raw, byte-level range-scan callback variant.
450 struct bytes_search_range_callback {
452 virtual ~bytes_search_range_callback() {}
453 virtual bool invoke(const std::string &k, const std::string &v) = 0;
457 // leverage the definitions for txn_btree and typed_txn_btree
459 typedef txn_btree_::key_reader bytes_key_reader;
460 typedef txn_btree_::single_value_reader bytes_single_value_reader;
461 typedef txn_btree_::value_reader bytes_value_reader;
464 typename typed_txn_btree_<Schema>::key_writer
467 typename typed_txn_btree_<Schema>::key_reader
471 typename typed_txn_btree_<Schema>::value_writer
474 typename typed_txn_btree_<Schema>::single_value_reader
477 typename typed_txn_btree_<Schema>::value_reader
480 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
// Adapts the btree's byte-keyed scan callback to the typed upcall: reads
// the record (optimistic do_read) then forwards the decoded key/value.
482 template <typename Btree, typename Callback, typename KeyReader, typename ValueReader>
483 class kvdb_wrapper_search_range_callback : public Btree::search_range_callback {
486 kvdb_wrapper_search_range_callback(
491 : upcall(&upcall), kr(&kr),
492 vr(&vr), arena(&arena) {}
// Return value propagates the upcall's continue/stop decision.
495 invoke(const typename Btree::string_type &k, typename Btree::value_type v)
497 const kvdb_record * const r =
498 reinterpret_cast<const kvdb_record *>(v);
500 r->do_read(*vr, *arena);
501 return upcall->invoke((*kr)(k), vr->results());
513 kvdb_index(size_t value_size_hint,
515 const std::string &name)
522 size() const OVERRIDE
// Purge: frees all records via purge_tree_walker (not safe with
// concurrent readers); returns an empty stats map.
527 virtual std::map<std::string, uint64_t>
530 purge_tree_walker<my_btree, UseConcurrencyControl> w;
533 #ifdef TXN_BTREE_DUMP_PURGE_STATS
534 std::cerr << "purging kvdb index: " << name << std::endl;
537 return std::map<std::string, uint64_t>();
540 // templated interface
542 template <typename FieldsMask = AllFields>
544 kvdb_txn &t, const key_type &k, value_type &v,
545 FieldsMask fm = FieldsMask());
547 template <typename FieldsMask = AllFields>
548 inline void search_range_call(
549 kvdb_txn &t, const key_type &lower, const key_type *upper,
550 search_range_callback &callback,
551 bool no_key_results = false /* skip decoding of keys? */,
552 FieldsMask fm = FieldsMask());
554 // a lower-level variant which does not bother to decode the key/values
555 inline void bytes_search_range_call(
556 kvdb_txn &t, const key_type &lower, const key_type *upper,
557 bytes_search_range_callback &callback,
558 size_t value_fields_prefix = std::numeric_limits<size_t>::max());
560 template <typename FieldsMask = AllFields>
562 kvdb_txn &t, const key_type &k, const value_type &v,
563 FieldsMask fm = FieldsMask());
566 kvdb_txn &t, const key_type &k, const value_type &v);
569 kvdb_txn &t, const key_type &k);
// Shared scan driver used by both typed and byte-level range calls.
573 template <typename Callback, typename KeyReader, typename ValueReader>
574 inline void do_search_range_call(
575 kvdb_txn &t, const key_type &lower, const key_type *upper,
576 Callback &callback, KeyReader &kr, ValueReader &vr);
// Underlying tree implementation, selected by the CC flag.
580 typename std::conditional<
581 UseConcurrencyControl,
583 single_threaded_btree>::type
// kvdb_database: abstract_db facade over kvdb_index/kvdb_txn.  Since the
// wrapper has no real transaction machinery, every profile hint maps to
// the same kvdb_txn type and epoch sync/finish are trivial.
// NOTE(review): interior lines are elided in this view.
588 template <bool UseConcurrencyControl>
589 class kvdb_database : public abstract_db {
592 template <typename Schema>
594 typedef kvdb_index<Schema, UseConcurrencyControl> type;
595 typedef std::shared_ptr<type> ptr_type;
598 template <enum abstract_db::TxnProfileHint hint>
599 struct TransactionType
601 typedef kvdb_txn type;
602 typedef std::shared_ptr<type> ptr_type;
// new_txn(): heap-allocate a kvdb_txn bound to the caller's arena.
605 template <enum abstract_db::TxnProfileHint hint>
606 inline typename TransactionType<hint>::ptr_type
607 new_txn(uint64_t txn_flags, str_arena &arena) const
609 return std::make_shared<typename TransactionType<hint>::type>(txn_flags, arena);
612 typedef transaction_abort_exception abort_exception_type;
615 do_txn_epoch_sync() const OVERRIDE
620 do_txn_finish() const OVERRIDE
// open_index(): construct a new kvdb_index for the given schema.
624 template <typename Schema>
625 inline typename IndexType<Schema>::ptr_type
626 open_index(const std::string &name,
627 size_t value_size_hint,
630 return std::make_shared<typename IndexType<Schema>::type>(
631 value_size_hint, mostly_append, name);
// search(): materialize the key into an arena string, probe the btree,
// and on a hit decode the fields selected by FieldsMask into v via an
// optimistic do_read().  (Return statement is elided in this view.)
635 template <typename Schema, bool UseConcurrencyControl>
636 template <typename FieldsMask>
638 kvdb_index<Schema, UseConcurrencyControl>::search(
639 kvdb_txn &t, const key_type &k, value_type &v,
643 const std::string * const keypx =
644 kw.fully_materialize(false, t.string_allocator());
646 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
647 ANON_REGION("kvdb_ordered_index::get:", &private_::kvdb_get_probe0_cg);
648 typename my_btree::value_type p = 0;
649 if (btr.search(varkey(*keypx), p)) {
650 ANON_REGION("kvdb_ordered_index::get:do_read:", &private_::kvdb_get_probe1_cg);
651 const kvdb_record * const r = reinterpret_cast<const kvdb_record *>(p);
// Only the fields named by FieldsMask are decoded.
653 single_value_reader vr(v, FieldsMask::value);
654 r->do_read(vr, t.string_allocator());
// search_range_call(): typed range scan over [lower, upper).  Keys are
// optionally left undecoded (no_key_results) and values decode only the
// fields selected by FieldsMask; delegates to do_search_range_call().
660 template <typename Schema, bool UseConcurrencyControl>
661 template <typename FieldsMask>
663 kvdb_index<Schema, UseConcurrencyControl>::search_range_call(
664 kvdb_txn &t, const key_type &lower, const key_type *upper,
665 search_range_callback &callback,
669 key_reader kr(no_key_results);
670 value_reader vr(FieldsMask::value);
672 do_search_range_call(t, lower, upper, callback, kr, vr);
// bytes_search_range_call(): byte-level range scan.  Uses the value
// encoder to cap reads at the encoded prefix covering the first
// value_fields_prefix fields, then delegates to do_search_range_call().
675 template <typename Schema, bool UseConcurrencyControl>
677 kvdb_index<Schema, UseConcurrencyControl>::bytes_search_range_call(
678 kvdb_txn &t, const key_type &lower, const key_type *upper,
679 bytes_search_range_callback &callback,
680 size_t value_fields_prefix)
682 const value_encoder_type value_encoder;
683 const size_t max_bytes_read =
684 value_encoder.encode_max_nbytes_prefix(value_fields_prefix);
686 bytes_value_reader vr(max_bytes_read);
688 do_search_range_call(t, lower, upper, callback, kr, vr);
// put(): upsert.  Fast path overwrites the existing record in place
// under its lock; if the new encoding does not fit (the do_write outcome
// branch is elided here) a replacement record is allocated, swapped into
// the tree, and the old one is reclaimed via RCU.  Deliberately asserts
// under concurrency control (see the XXX below).
691 template <typename Schema, bool UseConcurrencyControl>
692 template <typename FieldsMask>
694 kvdb_index<Schema, UseConcurrencyControl>::put(
695 kvdb_txn &t, const key_type &key, const value_type &value,
699 const std::string * const keypx =
700 kw.fully_materialize(false, t.string_allocator());
701 if (UseConcurrencyControl)
702 // XXX: currently unsupported- need to ensure locked values
703 // are the canonical versions pointed to by the tree
704 ALWAYS_ASSERT(false);
705 value_writer vw(&value, FieldsMask::value);
706 typename my_btree::value_type v = 0, v_old = 0;
// Existing key: attempt in-place overwrite while holding the record lock.
707 if (btr.search(varkey(*keypx), v)) {
708 kvdb_record * const r = reinterpret_cast<kvdb_record *>(v);
710 lock_guard<kvdb_record> guard(*r);
713 // replace - slow-path
714 kvdb_record * const rnew =
715 kvdb_record::alloc(*vw.fully_materialize(false, t.string_allocator()));
716 btr.insert(varkey(*keypx), (typename my_btree::value_type) rnew, &v_old, 0);
// The displaced pointer must be the record we just tried to update.
717 INVARIANT((typename my_btree::value_type) r == v_old);
718 // rcu-free the old record
719 kvdb_record::release(r);
// Key absent: insert a fresh record; if the insert displaced an existing
// value (lost race), rcu-free the displaced record.
724 kvdb_record * const rnew =
725 kvdb_record::alloc(*vw.fully_materialize(false, t.string_allocator()));
726 if (!btr.insert(varkey(*keypx), (typename my_btree::value_type) rnew, &v_old, 0)) {
727 kvdb_record * const r = (kvdb_record *) v_old;
728 kvdb_record::release(r);
// insert(): insert-if-absent.  Builds and encodes the record first, then
// attempts the insert; if the key already exists the speculative record
// (never published to readers) is freed immediately without RCU deferral.
// Deliberately asserts under concurrency control (see put()).
733 template <typename Schema, bool UseConcurrencyControl>
735 kvdb_index<Schema, UseConcurrencyControl>::insert(
736 kvdb_txn &t, const key_type &k, const value_type &v)
739 const std::string * const keypx =
740 kw.fully_materialize(false, t.string_allocator());
741 if (UseConcurrencyControl)
742 // XXX: currently unsupported- see above
743 ALWAYS_ASSERT(false);
744 value_writer vw(&v, AllFieldsMask);
745 const size_t sz = vw.compute_needed(nullptr, 0);
746 kvdb_record * const rec = kvdb_record::alloc(sz);
// Encode the value directly into the freshly allocated record's payload.
747 vw((uint8_t *) &rec->data[0], 0);
749 if (likely(btr.insert_if_absent(varkey(*keypx), (typename my_btree::value_type) rec, nullptr)))
// Collision: rec was never visible to readers, safe to free right away.
751 kvdb_record::release_no_rcu(rec);
// remove(): unlink the key from the tree; the removed record is reclaimed
// via RCU so in-flight optimistic readers remain safe.  Deliberately
// asserts under concurrency control (see put()).
755 template <typename Schema, bool UseConcurrencyControl>
757 kvdb_index<Schema, UseConcurrencyControl>::remove(
758 kvdb_txn &t, const key_type &k)
761 const std::string * const keypx =
762 kw.fully_materialize(false, t.string_allocator());
763 ANON_REGION("kvdb_ordered_index::remove:", &private_::kvdb_remove_probe0_cg);
764 if (UseConcurrencyControl)
765 // XXX: currently unsupported- see above
766 ALWAYS_ASSERT(false);
767 typename my_btree::value_type v = 0;
768 if (likely(btr.remove(varkey(*keypx), &v))) {
769 kvdb_record * const r = reinterpret_cast<kvdb_record *>(v);
// Deferred free: readers may still hold the pointer this epoch.
770 kvdb_record::release(r);
// do_search_range_call(): common scan driver.  Materializes the lower and
// upper bounds into arena strings, wraps the caller's callback plus key
// and value readers in kvdb_wrapper_search_range_callback, and runs the
// underlying btree range scan.
774 template <typename Schema, bool UseConcurrencyControl>
775 template <typename Callback, typename KeyReader, typename ValueReader>
777 kvdb_index<Schema, UseConcurrencyControl>::do_search_range_call(
778 kvdb_txn &t, const key_type &lower, const key_type *upper,
779 Callback &callback, KeyReader &kr, ValueReader &vr)
781 key_writer lower_key_writer(&lower);
// upper may be nullptr (open-ended scan); key_writer presumably yields a
// null string for a null key — TODO confirm against key_writer.
782 key_writer upper_key_writer(upper);
783 const std::string * const lower_str =
784 lower_key_writer.fully_materialize(false, t.string_allocator());
785 const std::string * const upper_str =
786 upper_key_writer.fully_materialize(false, t.string_allocator());
788 kvdb_wrapper_search_range_callback<
792 ValueReader> c(callback, kr, vr, t.string_allocator());
796 uppervk = varkey(*upper_str);
// Null upper bound means scan to the end of the tree.
797 btr.search_range_call(varkey(*lower_str), upper_str ? &uppervk : nullptr, c, t.string_allocator()());
800 #endif /* _KVDB_WRAPPER_IMPL_H_ */