1 #ifndef _KVDB_WRAPPER_IMPL_H_
2 #define _KVDB_WRAPPER_IMPL_H_
8 #include "kvdb_wrapper.h"
10 #include "../macros.h"
13 #include "../lockguard.h"
14 #include "../prefetch.h"
15 #include "../scopedperf.hh"
16 #include "../counter.h"
// Event counters (average spin/retry counts) and scopedperf TSC probes used
// to instrument the kvdb index operations below.
// NOTE(review): these are referenced below as private_::evt_... -- they
// presumably live in a `private_` namespace whose braces are outside this
// view; confirm against the full file.
19 static event_avg_counter evt_avg_kvdb_stable_version_spins("avg_kvdb_stable_version_spins");
20 static event_avg_counter evt_avg_kvdb_lock_acquire_spins("avg_kvdb_lock_acquire_spins");
21 static event_avg_counter evt_avg_kvdb_read_retries("avg_kvdb_read_retries");
// One TSC-based probe counter per index operation (get/put/insert/scan/remove);
// consumed by the ANON_REGION() markers in the methods below.
23 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe0, kvdb_get_probe0_cg);
24 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe1, kvdb_get_probe1_cg);
25 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_put_probe0, kvdb_put_probe0_cg);
26 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_insert_probe0, kvdb_insert_probe0_cg);
27 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_scan_probe0, kvdb_scan_probe0_cg);
28 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_remove_probe0, kvdb_remove_probe0_cg);
31 // defines single-threaded version
// Primary template: the single-threaded flavor of the record header.
// lock()/unlock() are visible no-ops, so this variant carries no real
// concurrency control -- only the record size bookkeeping.
// NOTE(review): several member names/bodies are elided from this view;
// comments on those members are inferred from the names visible here and
// in the <true> specialization below -- confirm against the full file.
32 template <bool UseConcurrencyControl>
33 struct record_version {
// presumably is_locked() -- trivially false in the single-threaded version
36 inline ALWAYS_INLINE bool
// deliberate no-ops: no concurrent writers to exclude
42 inline ALWAYS_INLINE void lock() {}
44 inline ALWAYS_INLINE void unlock() {}
// static helper (presumably Size(v)) extracting a size from a header value
46 static inline ALWAYS_INLINE size_t
// instance size accessor (presumably size())
52 inline ALWAYS_INLINE size_t
// size mutator (presumably set_size(s)); the size must fit in 16 bits
58 inline ALWAYS_INLINE void
61 INVARIANT(s <= std::numeric_limits<uint16_t>::max());
// With no concurrent mutation any observed version is trivially stable;
// body outside this view -- TODO confirm.
65 inline ALWAYS_INLINE uint32_t
66 stable_version() const
// presumably always succeeds in the single-threaded version -- TODO confirm
71 inline ALWAYS_INLINE bool
72 check_version(uint32_t version) const
78 // concurrency control version
// Specialization used when UseConcurrencyControl is true.  Packs a lock
// bit, the record size, and a write-version counter into a single 32-bit
// header word: writers spin-lock via CAS, readers use optimistic
// stable_version()/check_version() validation.
80 struct record_version<true> {
81 // [ locked | size | version ]
82 // [ 0..1 | 1..17 | 17..32 ]
// bit 0: lock flag
84 static const uint32_t HDR_LOCKED_MASK = 0x1;
// bits 1..16: 16-bit record size
86 static const uint32_t HDR_SIZE_SHIFT = 1;
87 static const uint32_t HDR_SIZE_MASK = std::numeric_limits<uint16_t>::max() << HDR_SIZE_SHIFT;
// bits 17..31: version counter, bumped on every unlock()
89 static const uint32_t HDR_VERSION_SHIFT = 17;
90 static const uint32_t HDR_VERSION_MASK = ((uint32_t)-1) << HDR_VERSION_SHIFT;
92 record_version<true>() : hdr(0) {}
// the single packed header word; volatile because it is read and CAS'd
// by concurrent threads
94 volatile uint32_t hdr;
// IsLocked(v): test the lock bit of a sampled header value
99 return v & HDR_LOCKED_MASK;
105 return IsLocked(hdr);
// lock(): spin until a CAS setting the lock bit succeeds; spin count
// feeds avg_kvdb_lock_acquire_spins when event counters are enabled
111 #ifdef ENABLE_EVENT_COUNTERS
112 unsigned long nspins = 0;
115 while (IsLocked(v) ||
116 !__sync_bool_compare_and_swap(&hdr, v, v | HDR_LOCKED_MASK)) {
119 #ifdef ENABLE_EVENT_COUNTERS
// compiler fence: keep subsequent writes from being hoisted above the
// lock acquisition
123 COMPILER_MEMORY_FENCE;
124 #ifdef ENABLE_EVENT_COUNTERS
125 private_::evt_avg_kvdb_lock_acquire_spins.offer(nspins);
// unlock(): recompute the header with version+1 and the lock bit cleared,
// invalidating any concurrent optimistic readers
133 INVARIANT(IsLocked(v));
134 const uint32_t n = Version(v);
135 v &= ~HDR_VERSION_MASK;
136 v |= (((n + 1) << HDR_VERSION_SHIFT) & HDR_VERSION_MASK);
137 v &= ~HDR_LOCKED_MASK;
138 INVARIANT(!IsLocked(v));
139 COMPILER_MEMORY_FENCE;
// Size(v): extract the 16-bit size field from a sampled header value
146 return (v & HDR_SIZE_MASK) >> HDR_SIZE_SHIFT;
// set_size(s): writer-side only (lock must be held); rewrites the size field
158 INVARIANT(s <= std::numeric_limits<uint16_t>::max());
159 INVARIANT(is_locked());
160 const uint16_t new_sz = static_cast<uint16_t>(s);
161 hdr &= ~HDR_SIZE_MASK;
162 hdr |= (new_sz << HDR_SIZE_SHIFT);
163 INVARIANT(size() == s);
// Version(v): extract the version counter from a sampled header value
166 static inline uint32_t
169 return (v & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
// stable_version(): spin until the header is observed unlocked; the
// returned snapshot is validated later with check_version()
173 stable_version() const
176 #ifdef ENABLE_EVENT_COUNTERS
177 unsigned long nspins = 0;
179 while (IsLocked(v)) {
182 #ifdef ENABLE_EVENT_COUNTERS
186 COMPILER_MEMORY_FENCE;
187 #ifdef ENABLE_EVENT_COUNTERS
188 private_::evt_avg_kvdb_stable_version_spins.offer(nspins);
// check_version(): after a fence, the optimistic read is valid iff the
// header still equals the snapshot (same version, still unlocked)
194 check_version(uint32_t version) const
196 COMPILER_MEMORY_FENCE;
197 return hdr == version;
201 template <bool UseConcurrencyControl>
// Variable-length value record: the version/lock header (base class) plus
// a fixed payload capacity (alloc_size) and the value bytes stored inline
// after the object.  NOTE(review): the `data` member used below is
// declared on a line outside this view -- presumably a trailing flexible
// char array; confirm against the full file.
202 struct basic_kvdb_record : public record_version<UseConcurrencyControl> {
203 typedef record_version<UseConcurrencyControl> super_type;
// Construct with a fixed payload capacity and an initial value from s.
207 basic_kvdb_record(uint16_t alloc_size, const std::string &s)
208 : record_version<UseConcurrencyControl>(),
209 alloc_size(alloc_size)
211 #ifdef CHECK_INVARIANTS
213 this->set_size(s.size());
217 this->set_size(s.size());
// prefetch header and payload together -- they are contiguous in memory
226 prefetch_bytes(this, sizeof(*this) + size());
// do_read(): copy up to max_bytes_read payload bytes into s.  Under
// concurrency control this is an optimistic read: snapshot a stable
// version, copy, then retry when check_version() fails; retry count feeds
// avg_kvdb_read_retries when event counters are enabled.
231 do_read(std::string &s, size_t max_bytes_read) const
233 if (UseConcurrencyControl) {
234 #ifdef ENABLE_EVENT_COUNTERS
235 unsigned long nretries = 0;
238 const uint32_t v = this->stable_version();
239 const size_t sz = std::min(super_type::Size(v), max_bytes_read);
240 s.assign(&data[0], sz);
241 if (unlikely(!this->check_version(v))) {
242 #ifdef ENABLE_EVENT_COUNTERS
247 #ifdef ENABLE_EVENT_COUNTERS
248 private_::evt_avg_kvdb_read_retries.offer(nretries);
// single-threaded path: plain read, no validation needed
251 const size_t sz = std::min(this->size(), max_bytes_read);
252 s.assign(&data[0], sz);
// do_write(): overwrite the payload in place; the caller must hold the
// record lock under concurrency control.  Bails out when s exceeds the
// fixed capacity (the return type/value is outside this view --
// presumably false on overflow, true on success; TODO confirm).
257 do_write(const std::string &s)
259 INVARIANT(!UseConcurrencyControl || this->is_locked());
260 if (unlikely(s.size() > alloc_size))
262 this->set_size(s.size());
263 NDB_MEMCPY(&data[0], s.data(), s.size());
// alloc(): allocate header + payload in one RCU allocation, rounded up to
// the allocator's alignment, and placement-new the record over it.
267 static basic_kvdb_record *
268 alloc(const std::string &s)
270 const size_t sz = s.size();
271 const size_t max_alloc_sz =
272 std::numeric_limits<uint16_t>::max() + sizeof(basic_kvdb_record);
273 const size_t alloc_sz =
275 util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(basic_kvdb_record) + sz),
277 char * const p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
// the record's capacity is whatever the rounding left beyond the header
279 return new (p) basic_kvdb_record(alloc_sz - sizeof(basic_kvdb_record), s);
// deleter hook: destroy the record and return the full allocation
// (header + capacity) to the RCU allocator
286 basic_kvdb_record * const px =
287 reinterpret_cast<basic_kvdb_record *>(r);
288 const size_t alloc_sz = px->alloc_size + sizeof(*px);
289 px->~basic_kvdb_record();
290 rcu::s_instance.dealloc(px, alloc_sz);
// release(): defer reclamation via the RCU free list so concurrent
// optimistic readers of this record stay safe
295 release(basic_kvdb_record *r)
299 rcu::s_instance.free_with_fn(r, deleter);
// release_no_rcu(): immediate-reclamation variant (body outside this
// view) -- only safe when no concurrent readers can exist, as in the
// purge walker below
303 release_no_rcu(basic_kvdb_record *r)
312 template <bool UseConcurrencyControl>
// get(): look up `key` in the btree; on a hit, read up to max_bytes_read
// bytes of the record's value into `value` via the record's optimistic
// read path.  (Return type and miss path are outside this view.)
314 kvdb_ordered_index<UseConcurrencyControl>::get(
316 const std::string &key,
317 std::string &value, size_t max_bytes_read)
319 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
320 ANON_REGION("kvdb_ordered_index::get:", &private_::kvdb_get_probe0_cg);
321 typename my_btree::value_type v = 0;
322 if (btr.search(varkey(key), v)) {
// nested probe scopes just the record read, separate from the tree search
323 ANON_REGION("kvdb_ordered_index::get:do_read:", &private_::kvdb_get_probe1_cg);
324 const kvdb_record * const r = (const kvdb_record *) v;
326 r->do_read(value, max_bytes_read);
332 template <bool UseConcurrencyControl>
// put(): upsert `key` -> `value`.  On an existing key, first try an
// in-place overwrite under the record lock; if the value does not fit,
// install a freshly allocated record and RCU-free the displaced one.
// On a missing key, insert a new record.  (Some branches and the return
// are outside this view.)
334 kvdb_ordered_index<UseConcurrencyControl>::put(
336 const std::string &key,
337 const std::string &value)
339 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
340 ANON_REGION("kvdb_ordered_index::put:", &private_::kvdb_put_probe0_cg);
341 typename my_btree::value_type v = 0, v_old = 0;
342 if (btr.search(varkey(key), v)) {
344 kvdb_record * const r = (kvdb_record *) v;
// exclusive access for the in-place attempt; released by the guard's dtor
346 lock_guard<kvdb_record> guard(*r);
// presumably true == value fit and was written in place -- TODO confirm;
// the taken-branch body is outside this view
347 if (r->do_write(value))
// overwrite did not fit: swap a larger record into the tree slot
350 kvdb_record * const rnew = kvdb_record::alloc(value);
351 btr.insert(varkey(key), (typename my_btree::value_type) rnew, &v_old, 0);
// the slot must still hold the record we just locked
352 INVARIANT((typename my_btree::value_type) r == v_old);
353 // rcu-free the old record
354 kvdb_record::release(r);
// key not found by search: plain insert; if insert reports a displaced
// value in v_old (e.g. a racing inserter), RCU-release it
357 kvdb_record * const rnew = kvdb_record::alloc(value);
358 if (!btr.insert(varkey(key), (typename my_btree::value_type) rnew, &v_old, 0)) {
359 kvdb_record * const r = (kvdb_record *) v_old;
360 kvdb_record::release(r);
365 template <bool UseConcurrencyControl>
// insert(): blind insert of `key` -> `value` without a prior lookup.
// If the btree reports a displaced value (insert() returns false and
// fills v_old), the old record is RCU-released.  `txn` is not referenced
// in the visible body.  (Return statements are outside this view.)
367 kvdb_ordered_index<UseConcurrencyControl>::insert(void *txn,
368 const std::string &key,
369 const std::string &value)
371 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
372 ANON_REGION("kvdb_ordered_index::insert:", &private_::kvdb_insert_probe0_cg);
373 kvdb_record * const rnew = kvdb_record::alloc(value);
374 typename my_btree::value_type v_old = 0;
375 if (!btr.insert(varkey(key), (typename my_btree::value_type) rnew, &v_old, 0)) {
376 kvdb_record * const r = (kvdb_record *) v_old;
377 kvdb_record::release(r);
382 template <typename Btree, bool UseConcurrencyControl>
// Adapter from the btree's search_range callback interface to the
// abstract_ordered_index scan_callback: for each visited entry it
// materializes the record's full value into a string drawn from the
// caller-supplied arena and forwards it to the upcall.
383 class kvdb_wrapper_search_range_callback : public Btree::search_range_callback {
385 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
386 kvdb_wrapper_search_range_callback(
387 abstract_ordered_index::scan_callback &upcall,
389 : upcall(&upcall), arena(arena) {}
// invoke(): called per matching key.  The upcall's return value is
// propagated -- presumably it controls whether the scan continues;
// TODO confirm against abstract_ordered_index.
392 invoke(const typename Btree::string_type &k, typename Btree::value_type v)
394 const kvdb_record * const r = (const kvdb_record *) v;
// arena->next() hands out a fresh (empty) string per entry
395 std::string * const s_px = likely(arena) ? arena->next() : nullptr;
396 INVARIANT(s_px && s_px->empty());
// read the entire value (no byte cap)
398 r->do_read(*s_px, std::numeric_limits<size_t>::max());
399 return upcall->invoke(k.data(), k.length(), *s_px);
403 abstract_ordered_index::scan_callback *upcall;
407 template <bool UseConcurrencyControl>
// scan(): forward range scan from start_key up to end_key (unbounded when
// end_key is null), delivering each key/value to `callback` through the
// adapter above.  (Parameter list and body are partially outside this
// view; `arena` presumably arrives as a parameter -- TODO confirm.)
409 kvdb_ordered_index<UseConcurrencyControl>::scan(
411 const std::string &start_key,
412 const std::string *end_key,
413 scan_callback &callback,
416 ANON_REGION("kvdb_ordered_index::scan:", &private_::kvdb_scan_probe0_cg);
417 kvdb_wrapper_search_range_callback<my_btree, UseConcurrencyControl> c(callback, arena);
418 key_type end(end_key ? key_type(*end_key) : key_type());
419 btr.search_range_call(key_type(start_key), end_key ? &end : 0, c, arena->next());
422 template <bool UseConcurrencyControl>
// rscan(): reverse-direction counterpart of scan() -- identical setup but
// drives the btree's rsearch_range_call.  (Parameter list and body are
// partially outside this view.)
424 kvdb_ordered_index<UseConcurrencyControl>::rscan(
426 const std::string &start_key,
427 const std::string *end_key,
428 scan_callback &callback,
431 ANON_REGION("kvdb_ordered_index::rscan:", &private_::kvdb_scan_probe0_cg);
432 kvdb_wrapper_search_range_callback<my_btree, UseConcurrencyControl> c(callback, arena);
433 key_type end(end_key ? key_type(*end_key) : key_type());
434 btr.rsearch_range_call(key_type(start_key), end_key ? &end : 0, c, arena->next());
437 template <bool UseConcurrencyControl>
// remove(): erase `key` from the btree; if a record was removed, defer
// its reclamation via RCU so concurrent readers stay safe.  `txn` is not
// referenced in the visible body.  (Return statements are outside this
// view.)
439 kvdb_ordered_index<UseConcurrencyControl>::remove(void *txn, const std::string &key)
441 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
442 ANON_REGION("kvdb_ordered_index::remove:", &private_::kvdb_remove_probe0_cg);
443 typename my_btree::value_type v = 0;
444 if (btr.remove(varkey(key), &v)) {
445 kvdb_record * const r = (kvdb_record *) v;
446 kvdb_record::release(r);
450 template <bool UseConcurrencyControl>
// size(): entire body is outside this view -- presumably reports the
// index's entry count (return type also not visible); confirm against
// the full file.
452 kvdb_ordered_index<UseConcurrencyControl>::size() const
457 template <typename Btree, bool UseConcurrencyControl>
// Tree-walk callback used by clear(): frees every record immediately via
// release_no_rcu() (the index is being torn down) and, when
// TXN_BTREE_DUMP_PURGE_STATS is defined, accumulates per-node and
// per-record statistics for the dump below.
458 struct purge_tree_walker : public Btree::tree_walk_callback {
459 typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
461 #ifdef TXN_BTREE_DUMP_PURGE_STATS
463 : purge_stats_nodes(0),
464 purge_stats_nosuffix_nodes(0) {}
465 std::map<size_t, size_t> purge_stats_record_size_counts; // just the record
466 std::map<size_t, size_t> purge_stats_alloc_size_counts; // includes overhead
// keys-per-node samples, one entry per visited node
467 std::vector<uint16_t> purge_stats_nkeys_node;
468 size_t purge_stats_nodes;
469 size_t purge_stats_nosuffix_nodes;
// stats dump: average keys/node, fill factor vs. Btree::NKeysPerNode,
// and the two size histograms.  (Function signature and the summation
// into `v` are outside this view.)
475 for (std::vector<uint16_t>::iterator it = purge_stats_nkeys_node.begin();
476 it != purge_stats_nkeys_node.end(); ++it)
478 const double avg_nkeys_node = double(v)/double(purge_stats_nkeys_node.size());
479 const double avg_fill_factor = avg_nkeys_node/double(Btree::NKeysPerNode);
480 std::cerr << "btree node stats" << std::endl;
481 std::cerr << " avg_nkeys_node: " << avg_nkeys_node << std::endl;
482 std::cerr << " avg_fill_factor: " << avg_fill_factor << std::endl;
483 std::cerr << " num_nodes: " << purge_stats_nodes << std::endl;
484 std::cerr << " num_nosuffix_nodes: " << purge_stats_nosuffix_nodes << std::endl;
485 std::cerr << "record size stats (nbytes => count)" << std::endl;
486 for (std::map<size_t, size_t>::iterator it = purge_stats_record_size_counts.begin();
487 it != purge_stats_record_size_counts.end(); ++it)
488 std::cerr << " " << it->first << " => " << it->second << std::endl;
489 std::cerr << "alloc size stats (nbytes => count)" << std::endl;
490 for (std::map<size_t, size_t>::iterator it = purge_stats_alloc_size_counts.begin();
491 it != purge_stats_alloc_size_counts.end(); ++it)
492 std::cerr << " " << (it->first + sizeof(kvdb_record)) << " => " << it->second << std::endl;
// on_node_begin(): snapshot this node's values for later processing
497 on_node_begin(const typename Btree::node_opaque_t *n)
499 INVARIANT(spec_values.empty());
500 spec_values = Btree::ExtractValues(n);
// free each record captured from the node (immediate, non-RCU) and count
// sizes.  (Enclosing function signature and surrounding #ifdefs are
// partially outside this view.)
506 for (size_t i = 0; i < spec_values.size(); i++) {
507 kvdb_record * const r = (kvdb_record *) spec_values[i].first;
508 purge_stats_record_size_counts[r->size()]++;
509 purge_stats_alloc_size_counts[r->alloc_size]++;
510 kvdb_record::release_no_rcu(r);
512 #ifdef TXN_BTREE_DUMP_PURGE_STATS
513 purge_stats_nkeys_node.push_back(spec_values.size());
// a node is "nosuffix" when none of its entries carried a suffix flag --
// presumably; the pair's bool semantics are not visible here
515 for (size_t i = 0; i < spec_values.size(); i++)
516 if (spec_values[i].second)
518 purge_stats_nosuffix_nodes++;
// (value pointer, suffix flag) pairs captured by on_node_begin()
531 std::vector<std::pair<typename Btree::value_type, bool>> spec_values;
534 template <bool UseConcurrencyControl>
535 std::map<std::string, uint64_t>
// clear(): purge the entire index.  Walks the tree with purge_tree_walker
// (which frees every record) inside an RCU region, optionally announces
// the purge when stats dumping is enabled, and returns an empty stats
// map.  (The tree-walk invocation itself is outside this view.)
536 kvdb_ordered_index<UseConcurrencyControl>::clear()
539 purge_tree_walker<my_btree, UseConcurrencyControl> w;
// RCU region held for the duration of the purge
540 scoped_rcu_region guard;
543 #ifdef TXN_BTREE_DUMP_PURGE_STATS
544 std::cerr << "purging kvdb index: " << name << std::endl;
547 return std::map<std::string, uint64_t>();
550 template <bool UseConcurrencyControl>
551 abstract_ordered_index *
// open_index(): factory for a kvdb ordered index.  value_size_hint and
// mostly_append are not referenced in the visible body.  NOTE(review):
// returns a raw owning pointer -- the ownership/lifetime contract is not
// visible here; confirm who deletes it.
552 kvdb_wrapper<UseConcurrencyControl>::open_index(
553 const std::string &name, size_t value_size_hint, bool mostly_append)
555 return new kvdb_ordered_index<UseConcurrencyControl>(name);
558 #endif /* _KVDB_WRAPPER_IMPL_H_ */