19 #include "scopedperf.hh"
// Verify that this node's keys are unique and strictly ascending.
// For leaf nodes, entries are compared as (key_slice, slice-length)
// pairs, since two leaf entries may share the same 8-byte slice while
// differing in length; for internal nodes, the raw slices alone must
// be strictly increasing. NOTE(review): several structural lines
// (braces/branch headers) are elided in this view.
23 btree<P>::node::base_invariant_unique_keys_check() const
25 size_t n = this->key_slots_used();
// --- leaf-node branch: compare (slice, length) pairs ---
29 const leaf_node *leaf = AsLeaf(this);
30 typedef std::pair<key_slice, size_t> leaf_key;
32 prev.first = keys_[0];
33 prev.second = leaf->keyslice_length(0);
// Slice lengths are capped at 9; length 9 marks an entry that carries
// either a suffix or a next-layer pointer. Layer entries must be 9.
34 ALWAYS_ASSERT(prev.second <= 9);
35 ALWAYS_ASSERT(!leaf->is_layer(0) || prev.second == 9);
// A non-layer entry of length 9 must have a non-empty stored suffix.
36 if (!leaf->is_layer(0) && prev.second == 9) {
37 ALWAYS_ASSERT(leaf->suffixes_);
38 ALWAYS_ASSERT(leaf->suffixes_[0].size() >= 1);
40 for (size_t i = 1; i < n; i++) {
42 cur_key.first = keys_[i];
43 cur_key.second = leaf->keyslice_length(i);
44 ALWAYS_ASSERT(cur_key.second <= 9);
45 ALWAYS_ASSERT(!leaf->is_layer(i) || cur_key.second == 9);
46 if (!leaf->is_layer(i) && cur_key.second == 9) {
47 ALWAYS_ASSERT(leaf->suffixes_);
48 ALWAYS_ASSERT(leaf->suffixes_[i].size() >= 1);
// strict lexicographic ordering on the (slice, length) pair
50 ALWAYS_ASSERT(cur_key > prev);
// --- internal-node branch: raw key slices strictly ascending ---
54 key_slice prev = keys_[0];
55 for (size_t i = 1; i < n; i++) {
56 ALWAYS_ASSERT(keys_[i] > prev);
// Invariant checks shared by leaf and internal nodes: the node must not
// be locked or mid-modification, its root flag must match the caller's
// expectation, the used-slot count must be within bounds, and every key
// must lie inside the (min_key, max_key) window inherited from the
// parent. Finishes by checking key uniqueness/ordering.
64 btree<P>::node::base_invariant_checker(const key_slice *min_key,
65 const key_slice *max_key,
68 ALWAYS_ASSERT(!is_locked());
69 ALWAYS_ASSERT(!is_modifying());
70 ALWAYS_ASSERT(this->is_root() == is_root);
71 size_t n = this->key_slots_used();
72 ALWAYS_ASSERT(n <= NKeysPerNode);
// NOTE(review): the two internal-node branches below appear to belong to
// separate (elided) conditionals — one enforcing n >= 1, one n >= NMinKeysPerNode.
74 if (is_internal_node())
75 ALWAYS_ASSERT(n >= 1);
77 if (is_internal_node())
78 ALWAYS_ASSERT(n >= NMinKeysPerNode);
80 // key-slices constrain splits
81 ALWAYS_ASSERT(n >= 1);
// every stored slice must respect the parent's key window
83 for (size_t i = 0; i < n; i++) {
84 ALWAYS_ASSERT(!min_key || keys_[i] >= *min_key);
85 ALWAYS_ASSERT(!max_key || keys_[i] < *max_key);
87 base_invariant_unique_keys_check();
// Dispatch the invariant check to the concrete node type: leaf nodes and
// internal nodes each have their own invariant_checker_impl.
92 btree<P>::node::invariant_checker(const key_slice *min_key,
93 const key_slice *max_key,
94 const node *left_sibling,
95 const node *right_sibling,
// ternary dispatch on (elided) leaf/internal test
99 AsLeaf(this)->invariant_checker_impl(min_key, max_key, left_sibling, right_sibling, is_root) :
100 AsInternal(this)->invariant_checker_impl(min_key, max_key, left_sibling, right_sibling, is_root) ;
103 //static event_counter evt_btree_leaf_node_creates("btree_leaf_node_creates");
104 //static event_counter evt_btree_leaf_node_deletes("btree_leaf_node_deletes");
106 //event_counter btree<P>::leaf_node::g_evt_suffixes_array_created("btree_leaf_node_suffix_array_creates");
// Leaf-node constructor: zero min key, unlinked from the sibling list,
// and no suffix array allocated (suffixes_ is lazily created).
108 template <typename P>
109 btree<P>::leaf_node::leaf_node()
110 : node(), min_key_(0), prev_(NULL), next_(NULL), suffixes_(NULL)
// event-counter instrumentation kept disabled, matching the file
112 //++evt_btree_leaf_node_creates;
// Leaf-node destructor. NOTE(review): body largely elided in this view;
// only the disabled event-counter line is visible.
115 template <typename P>
116 btree<P>::leaf_node::~leaf_node()
121 //++evt_btree_leaf_node_deletes;
// Leaf-specific invariant check: runs the base checks, then verifies
// that min_key_ matches the parent-supplied lower bound, that a root
// has no key window, and that every embedded next-layer subtree is
// itself a valid root-rooted tree.
124 template <typename P>
126 btree<P>::leaf_node::invariant_checker_impl(const key_slice *min_key,
127 const key_slice *max_key,
128 const node *left_sibling,
129 const node *right_sibling,
132 this->base_invariant_checker(min_key, max_key, is_root);
133 ALWAYS_ASSERT(!min_key || *min_key == this->min_key_);
// the root of a layer has no bounding window, and a non-root leaf
// must hold at least one key
134 ALWAYS_ASSERT(!is_root || min_key == NULL);
135 ALWAYS_ASSERT(!is_root || max_key == NULL);
136 ALWAYS_ASSERT(is_root || this->key_slots_used() > 0);
137 size_t n = this->key_slots_used();
// recurse into each next-layer subtree; each layer root stands alone
// (NULL window, NULL siblings, is_root = true)
138 for (size_t i = 0; i < n; i++)
139 if (this->is_layer(i))
140 this->values_[i].n_->invariant_checker(NULL, NULL, NULL, NULL, true);
143 //static event_counter evt_btree_internal_node_creates("btree_internal_node_creates");
144 //static event_counter evt_btree_internal_node_deletes("btree_internal_node_deletes");
// Internal-node constructor. Stores 1 into the version header word —
// presumably the bit that tags this node as internal; confirm against
// the VersionManip definition.
146 template <typename P>
147 btree<P>::internal_node::internal_node()
149 VersionManip::Store(this->hdr_, 1);
150 //++evt_btree_internal_node_creates;
// Internal-node destructor. NOTE(review): body elided in this view
// except the disabled event counter.
153 template <typename P>
154 btree<P>::internal_node::~internal_node()
156 //++evt_btree_internal_node_deletes;
// Internal-node invariant check: base checks, then recursively checks
// all n+1 children against the key windows induced by this node's keys,
// threading sibling pointers across node boundaries. A second pass
// (when the children are leaves) verifies the doubly-linked leaf chain.
159 template <typename P>
161 btree<P>::internal_node::invariant_checker_impl(const key_slice *min_key,
162 const key_slice *max_key,
163 const node *left_sibling,
164 const node *right_sibling,
167 this->base_invariant_checker(min_key, max_key, is_root);
168 size_t n = this->key_slots_used();
// an internal node with n keys has exactly n+1 non-NULL children
169 for (size_t i = 0; i <= n; i++) {
170 ALWAYS_ASSERT(this->children_[i] != NULL);
// leftmost child: its left sibling is the rightmost child of our own
// left sibling (if any); window is [min_key, keys_[0])
172 const node *left_child_sibling = NULL;
174 left_child_sibling = AsInternal(left_sibling)->children_[left_sibling->key_slots_used()];
175 this->children_[0]->invariant_checker(min_key, &this->keys_[0], left_child_sibling, this->children_[i + 1], false);
// rightmost child: right sibling comes from our right sibling's
// leftmost child; window is [keys_[n-1], max_key)
177 const node *right_child_sibling = NULL;
179 right_child_sibling = AsInternal(right_sibling)->children_[0];
180 this->children_[n]->invariant_checker(&this->keys_[n - 1], max_key, this->children_[i - 1], right_child_sibling, false);
// interior child i: window [keys_[i-1], keys_[i]), siblings are the
// adjacent children of this node
182 this->children_[i]->invariant_checker(&this->keys_[i - 1], &this->keys_[i], this->children_[i - 1], this->children_[i + 1], false);
// second pass applies only when children are leaves: check prev_/next_
// linkage of each leaf child against its computed neighbors
185 if (!n || this->children_[0]->is_internal_node())
187 for (size_t i = 0; i <= n; i++) {
188 const node *left_child_sibling = NULL;
189 const node *right_child_sibling = NULL;
191 left_child_sibling = AsInternal(left_sibling)->children_[left_sibling->key_slots_used()];
193 right_child_sibling = AsInternal(right_sibling)->children_[0];
194 const leaf_node *child_prev = (i == 0) ? AsLeaf(left_child_sibling) : AsLeaf(this->children_[i - 1]);
195 const leaf_node *child_next = (i == n) ? AsLeaf(right_child_sibling) : AsLeaf(this->children_[i + 1]);
196 ALWAYS_ASSERT(AsLeaf(this->children_[i])->prev_ == child_prev);
197 ALWAYS_ASSERT(AsLeaf(this->children_[i])->next_ == child_next);
// Recursively free an entire (sub)tree: for a leaf, first recurse into
// any embedded next-layer subtrees, then free the leaf; for an internal
// node, recurse into all n+1 children, then free the node. Nodes are
// marked deleting (under CHECK_INVARIANTS) before being handed to the
// type-specific deleter.
201 template <typename P>
203 btree<P>::recursive_delete(node *n)
205 if (leaf_node *leaf = AsLeafCheck(n)) {
206 #ifdef CHECK_INVARIANTS
208 leaf->mark_deleting();
// NOTE(review): this local 'n' (slot count) shadows the parameter
// 'node *n'; intentional in the original but easy to misread.
211 size_t n = leaf->key_slots_used();
212 for (size_t i = 0; i < n; i++)
213 if (leaf->is_layer(i))
214 recursive_delete(leaf->values_[i].n_);
215 leaf_node::deleter(leaf);
// internal-node branch (the 'else' line is elided in this view)
217 internal_node *internal = AsInternal(n);
// again shadows the 'node *n' parameter
218 size_t n = internal->key_slots_used();
219 for (size_t i = 0; i < n + 1; i++)
220 recursive_delete(internal->children_[i]);
221 #ifdef CHECK_INVARIANTS
223 internal->mark_deleting();
226 internal_node::deleter(internal);
// Core lock-free lookup. Descends the layered B+tree for key k using
// optimistic concurrency control: read a node's stable version, read
// its contents, then check_version() — retrying on any concurrent
// modification. leaf_nodes accumulates the leaf visited at each layer
// (used by range scans to resume); must be called inside an RCU region.
230 //STATIC_COUNTER_DECL(scopedperf::tsc_ctr, btree_search_impl_tsc, btree_search_impl_perf_cg);
232 template <typename P>
234 btree<P>::search_impl(const key_type &k, value_type &v,
235 typename util::vec<leaf_node *>::type &leaf_nodes,
236 versioned_node_t *search_info) const
238 INVARIANT(rcu::s_instance.in_rcu_region());
239 //ANON_REGION("btree<P>::search_impl:", &btree_search_impl_perf_cg);
240 INVARIANT(leaf_nodes.empty());
// fresh search starts from the root with the full key; a non-empty
// leaf_nodes stack means we are resuming at a deeper layer, so shift
// off the 8-byte slices already consumed
247 if (likely(leaf_nodes.empty())) {
251 kslicelen = std::min(k.size(), size_t(9));
253 kcur = k.shift_many(leaf_nodes.size() - 1);
254 cur = leaf_nodes.back();
255 kslice = kcur.slice();
256 kslicelen = std::min(kcur.size(), size_t(9));
257 leaf_nodes.pop_back();
261 // each iteration of this while loop tries to descend
262 // down node "cur", looking for key kcur
265 uint64_t version = cur->stable_version();
266 if (unlikely(RawVersionManip::IsDeleting(version)))
267 // XXX: maybe we can only retry at the parent of this node, not the
268 // root node of the b-tree *layer*
// ---- leaf case: look for the slice in this leaf ----
270 if (leaf_node *leaf = AsLeafCheck(cur, version)) {
273 search_info->first = leaf;
274 search_info->second = RawVersionManip::Version(version);
276 key_search_ret kret = leaf->key_search(kslice, kslicelen);
277 ssize_t ret = kret.first;
// snapshot value/layer-flag/suffix, then validate the version before
// trusting any of it
280 typename leaf_node::value_or_node_ptr vn = leaf->values_[ret];
281 const bool is_layer = leaf->is_layer(ret);
282 INVARIANT(!is_layer || kslicelen == 9);
283 varkey suffix(leaf->suffix(ret));
284 if (unlikely(!leaf->check_version(version)))
286 leaf_nodes.push_back(leaf);
// full-length slice: the stored suffix must match the key remainder
290 if (kslicelen == 9 && suffix != kcur.shift())
296 // search the next layer
299 kslice = kcur.slice();
300 kslicelen = std::min(kcur.size(), size_t(9));
304 // leaf might have lost responsibility for key k during the descend. we
305 // need to check this and adjust accordingly
306 if (unlikely(kslice < leaf->min_key_)) {
308 leaf_node *left_sibling = leaf->prev_;
309 if (unlikely(!leaf->check_version(version)))
311 if (likely(left_sibling)) {
315 // XXX: this case shouldn't be possible...
// or the key may now belong to a right sibling (e.g. after a split)
320 leaf_node *right_sibling = leaf->next_;
321 if (unlikely(!leaf->check_version(version)))
323 if (unlikely(!right_sibling)) {
324 leaf_nodes.push_back(leaf);
327 right_sibling->prefetch();
328 uint64_t right_version = right_sibling->stable_version();
329 key_slice right_min_key = right_sibling->min_key_;
330 if (unlikely(!right_sibling->check_version(right_version)))
332 if (unlikely(kslice >= right_min_key)) {
338 leaf_nodes.push_back(leaf);
// ---- internal case: pick the child covering kslice and descend ----
341 internal_node *internal = AsInternal(cur);
342 internal->prefetch();
343 key_search_ret kret = internal->key_lower_bound_search(kslice);
344 ssize_t ret = kret.first;
// ret == -1 (elided branch) means kslice precedes all keys → child 0
346 cur = internal->children_[ret + 1];
348 cur = internal->children_[0];
349 if (unlikely(!internal->check_version(version)))
351 INVARIANT(kret.second);
// RAII helper used by the range-scan code: remembers a size for string
// s at construction and — presumably — truncates s back to that size in
// the destructor, so the shared prefix buffer is restored after each
// recursive layer descent. NOTE(review): ctor/dtor bodies are elided in
// this view; confirm against the full definition.
356 template <typename S>
357 class string_restore {
359 inline string_restore(S &s, size_t n)
361 inline ~string_restore()
370 // recursively read the range from the layer down
372 // all args relative to prefix.
374 // prefix is non-const, meaning that this function might modify the contents.
375 // it is guaranteed, however, to restore prefix to the state it was before the
378 // returns true if we keep going
379 template <typename P>
381 btree<P>::search_range_at_layer(
384 const key_type &lower,
386 const key_type *upper,
387 low_level_search_range_callback &callback) const
389 VERBOSE(std::cerr << "search_range_at_layer: prefix.size()=" << prefix.size() << std::endl);
391 key_slice last_keyslice = 0;
392 size_t last_keyslice_len = 0;
393 bool emitted_last_keyslice = false;
395 last_keyslice = lower.slice();
396 last_keyslice_len = std::min(lower.size(), size_t(9));
397 emitted_last_keyslice = true;
400 key_slice lower_slice = lower.slice();
401 key_slice next_key = lower_slice;
402 const size_t prefix_size = prefix.size();
403 // NB: DON'T CALL RESERVE()
404 //prefix.reserve(prefix_size + 8); // allow for next layer
405 const uint64_t upper_slice = upper ? upper->slice() : 0;
406 string_restore<string_type> restorer(prefix, prefix_size);
407 while (!upper || next_key <= upper_slice) {
410 typename util::vec<leaf_kvinfo>::type buf;
411 const uint64_t version = leaf->stable_version();
412 key_slice leaf_min_key = leaf->min_key_;
413 if (leaf_min_key > next_key) {
415 leaf_node *left_sibling = leaf->prev_;
416 if (unlikely(!leaf->check_version(version)))
417 // try this node again
419 // try from left_sibling
425 // grab all keys in [lower_slice, upper_slice]. we'll do boundary condition
426 // checking later (outside of the critical section)
427 for (size_t i = 0; i < leaf->key_slots_used(); i++) {
428 // XXX(stephentu): this 1st filter is overly conservative and we can do
430 if ((leaf->keys_[i] > lower_slice ||
431 (leaf->keys_[i] == lower_slice &&
432 leaf->keyslice_length(i) >= std::min(lower.size(), size_t(9)))) &&
433 (!upper || leaf->keys_[i] <= upper_slice))
438 leaf->keyslice_length(i),
442 leaf_node *const right_sibling = leaf->next_;
443 key_slice leaf_max_key = right_sibling ? right_sibling->min_key_ : 0;
445 if (unlikely(!leaf->check_version(version)))
448 callback.on_resp_node(leaf, RawVersionManip::Version(version));
450 for (size_t i = 0; i < buf.size(); i++) {
451 // check to see if we already omitted a key <= buf[i]: if so, don't omit it
452 if (emitted_last_keyslice &&
453 ((buf[i].key_ < last_keyslice) ||
454 (buf[i].key_ == last_keyslice && buf[i].length_ <= last_keyslice_len)))
456 const size_t ncpy = std::min(buf[i].length_, size_t(8));
457 // XXX: prefix.end() calls _M_leak()
458 //prefix.replace(prefix.begin() + prefix_size, prefix.end(), buf[i].keyslice(), ncpy);
459 prefix.replace(prefix_size, string_type::npos, buf[i].keyslice(), ncpy);
461 // recurse into layer
462 leaf_node *const next_layer = leftmost_descend_layer(buf[i].vn_.n_);
464 if (emitted_last_keyslice && last_keyslice == buf[i].key_)
465 // NB(stephentu): this is implied by the filter above
466 INVARIANT(last_keyslice_len <= 8);
467 if (!search_range_at_layer(next_layer, prefix, zerokey, false, NULL, callback))
470 // check if we are before the start
471 if (buf[i].key_ == lower_slice) {
472 if (buf[i].length_ <= 8) {
473 if (buf[i].length_ < lower.size())
477 INVARIANT(buf[i].length_ == 9);
478 if (lower.size() > 8 && buf[i].suffix_ < lower.shift())
484 // check if we are after the end
485 if (upper && buf[i].key_ == upper_slice) {
486 if (buf[i].length_ == 9) {
487 if (upper->size() <= 8)
489 if (buf[i].suffix_ >= upper->shift())
491 } else if (buf[i].length_ >= upper->size()) {
495 if (buf[i].length_ == 9)
496 prefix.append((const char *) buf[i].suffix_.data(), buf[i].suffix_.size());
497 // we give the actual version # minus all the other bits, b/c they are not
498 // important here and make comparison easier at higher layers
499 if (!callback.invoke(prefix, buf[i].vn_.v_, leaf, RawVersionManip::Version(version)))
502 last_keyslice = buf[i].key_;
503 last_keyslice_len = buf[i].length_;
504 emitted_last_keyslice = true;
511 next_key = leaf_max_key;
512 leaf = right_sibling;
// Public range-scan entry point. First runs search_impl on `lower` to
// collect the per-layer leaf stack, then unwinds that stack outermost-
// last, calling search_range_at_layer for each layer with the
// appropriately shifted lower/upper bounds; `prefix` carries the bytes
// consumed by enclosing layers. Must run inside an RCU region.
518 template <typename P>
520 btree<P>::search_range_call(const key_type &lower,
521 const key_type *upper,
522 low_level_search_range_callback &callback,
523 string_type *buf) const
526 INVARIANT(rcu::s_instance.in_rcu_region());
// empty range: nothing to do (early-return body elided in this view)
527 if (unlikely(upper && *upper <= lower))
529 typename util::vec<leaf_node *>::type leaf_nodes;
531 search_impl(lower, v, leaf_nodes);
532 INVARIANT(!leaf_nodes.empty());
// use caller-provided buf as the prefix buffer when given, else a local
534 string_type prefix_tmp, *prefix_px;
538 prefix_px = &prefix_tmp;
539 string_type &prefix(*prefix_px);
540 INVARIANT(prefix.empty());
// seed prefix with the 8-byte slices consumed by the layers above the
// deepest one
541 prefix.assign((const char *) lower.data(), 8 * (leaf_nodes.size() - 1));
542 while (!leaf_nodes.empty()) {
543 leaf_node *cur = leaf_nodes.back();
544 leaf_nodes.pop_back();
// upper bound only applies at this layer if it is long enough to reach it
545 key_type layer_upper;
546 bool layer_has_upper = false;
547 if (upper && upper->size() >= (8 * leaf_nodes.size())) {
548 layer_upper = upper->shift_many(leaf_nodes.size());
549 layer_has_upper = true;
551 #ifdef CHECK_INVARIANTS
552 string_type prefix_before(prefix);
554 if (!search_range_at_layer(
555 cur, prefix, lower.shift_many(leaf_nodes.size()),
556 first, layer_has_upper ? &layer_upper : NULL, callback))
// the layer call must leave prefix exactly as it found it
558 #ifdef CHECK_INVARIANTS
559 INVARIANT(prefix == prefix_before);
// moving out one layer: drop that layer's 8-byte slice from the prefix
562 if (!leaf_nodes.empty()) {
563 INVARIANT(prefix.size() >= 8);
564 prefix.resize(prefix.size() - 8);
// Remove key k from the tree rooted at *root_location. Delegates to
// remove0; one (elided) status case shrinks the tree by swinging the
// root pointer from the old (now-deleting, still-locked) root to
// replace_node under a memory fence, then releases all held locks.
569 template <typename P>
571 btree<P>::remove_stable_location(node **root_location, const key_type &k, value_type *old_v)
573 INVARIANT(rcu::s_instance.in_rcu_region());
576 node *replace_node = NULL;
577 typename util::vec<remove_parent_entry>::type parents;
578 typename util::vec<node *>::type locked_nodes;
579 node *local_root = *root_location;
580 remove_status status = remove0(local_root,
585 NULL, /* left_node */
586 NULL, /* right_node */
// root-replacement case: we must own the lock on the old root, it must
// be marked deleting, and the root pointer must not have moved under us
599 INVARIANT(local_root->is_deleting());
600 INVARIANT(local_root->is_lock_owner());
601 INVARIANT(local_root->is_root());
602 INVARIANT(local_root == *root_location);
603 replace_node->set_root();
604 local_root->clear_root();
// publish the new root only after its root flag is visible
605 COMPILER_MEMORY_FENCE;
606 *root_location = replace_node;
607 // locks are still held here
608 return UnlockAndReturn(locked_nodes, true);
// remaining status values are unreachable here by construction
610 ALWAYS_ASSERT(false);
613 ALWAYS_ASSERT(false);
// Walk from node n down its leftmost child pointers (children_[0]) until
// reaching a leaf, validating each internal node's version after reading
// the child pointer. Returns the leftmost leaf of the (sub)tree.
617 template <typename P>
618 typename btree<P>::leaf_node *
619 btree<P>::leftmost_descend_layer(node *n) const
623 if (leaf_node *leaf = AsLeafCheck(cur))
625 internal_node *internal = AsInternal(cur);
626 uint64_t version = cur->stable_version();
627 node *child = internal->children_[0];
// retry this node if it changed while we read children_[0]
628 if (unlikely(!internal->check_version(version)))
// Visit every leaf in the tree (all layers) via an explicit work queue
// of layer roots. For each leaf: snapshot its embedded next-layer roots
// under version validation, report the node to the callback, and only
// enqueue the collected layers once the version check passes (failure
// triggers on_node_failure and an elided retry). Runs inside RCU.
634 template <typename P>
636 btree<P>::tree_walk(tree_walk_callback &callback) const
639 INVARIANT(rcu::s_instance.in_rcu_region());
640 std::vector<node *> q;
641 // XXX: not sure if cast is safe
642 q.push_back((node *) root_);
644 node *cur = q.back();
647 leaf_node *leaf = leftmost_descend_layer(cur);
// walk the leaf chain of this layer (loop header elided in this view)
652 const uint64_t version = leaf->stable_version();
653 const size_t n = leaf->key_slots_used();
654 std::vector<node *> layers;
655 for (size_t i = 0; i < n; i++)
656 if (leaf->is_layer(i))
657 layers.push_back(leaf->values_[i].n_);
658 leaf_node *next = leaf->next_;
659 callback.on_node_begin(leaf);
660 if (unlikely(!leaf->check_version(version))) {
661 callback.on_node_failure();
664 callback.on_node_success();
666 q.insert(q.end(), layers.begin(), layers.end());
// Size-walk callback: speculatively count the non-layer entries of a
// leaf into spec_size_ (accumulation statement elided in this view);
// the count is committed/discarded by on_node_success/on_node_failure.
671 template <typename P>
673 btree<P>::size_walk_callback::on_node_begin(const node_opaque_t *n)
675 INVARIANT(n->is_leaf_node());
676 INVARIANT(spec_size_ == 0);
677 const leaf_node *leaf = (const leaf_node *) n;
678 const size_t sz = leaf->key_slots_used();
679 for (size_t i = 0; i < sz; i++)
680 if (!leaf->is_layer(i))
// Commit path of the size walk; body elided in this view — presumably
// folds spec_size_ into the running total and resets it. TODO confirm.
684 template <typename P>
686 btree<P>::size_walk_callback::on_node_success()
// Abort path of the size walk; body elided in this view — presumably
// discards the speculative spec_size_ count. TODO confirm.
692 template <typename P>
694 btree<P>::size_walk_callback::on_node_failure()
// Starting from `leaf`, find the leaf currently responsible for kslice,
// sidestepping nodes that are being deleted and following prev_/next_
// pointers when the key falls outside [min_key_, next->min_key_).
// Returns the responsible leaf and its stable version via `version`.
699 template <typename P>
700 typename btree<P>::leaf_node *
701 btree<P>::FindRespLeafNode(
707 version = leaf->stable_version();
// node is being deleted: escape to a live sibling
708 if (unlikely(leaf->is_deleting())) {
709 leaf_node *left = leaf->prev_;
714 leaf_node *right = leaf->next_;
719 // XXX(stephentu): not sure if we can *really* depend on this,
720 // need to convince ourselves why this is not possible!
721 ALWAYS_ASSERT(false);
723 if (unlikely(kslice < leaf->min_key_)) {
724 // we need to go left
725 leaf_node *left = leaf->prev_;
730 leaf_node *right = leaf->next_;
732 // NB(stephentu): the only way for right->min_key_ to decrease, is for the
733 // current leaf node to split. therefore, it is un-necessary to ensure stable
734 // version on the next node (we only need to ensure it on the current node)
735 if (likely(right) && unlikely(kslice >= right->min_key_)) {
// older, stricter right-sibling validation kept for reference:
740 //if (likely(right)) {
741 // const uint64_t right_version = right->stable_version();
742 // const uint64_t right_min_key = right->min_key_;
743 // if (unlikely(!right->check_version(right_version)))
745 // if (unlikely(kslice >= right_min_key)) {
// Find the responsible leaf for kslice, then compute within it the
// lower-bound slot for (kslice, kslicelen) and the slot of an exact
// match (if any), scanning forward over all entries sharing the slice.
// Outputs go through the reference parameters (idxlowerbound et al.).
754 template <typename P>
755 typename btree<P>::leaf_node *
756 btree<P>::FindRespLeafLowerBound(
763 ssize_t &idxlowerbound)
765 leaf = FindRespLeafNode(leaf, kslice, version);
767 // use 0 for slice length, so we can a pointer <= all elements
768 // with the same slice
769 const key_search_ret kret = leaf->key_lower_bound_search(kslice, 0);
770 const ssize_t ret = kret.first;
773 // count the number of values already here with the same kslice.
776 for (size_t i = (ret == -1 ? 0 : ret); i < n; i++) {
777 if (leaf->keys_[i] < kslice) {
779 } else if (leaf->keys_[i] == kslice) {
780 const size_t kslicelen0 = leaf->keyslice_length(i);
781 if (kslicelen0 <= kslicelen) {
782 // invariant doesn't hold, b/c values can be changing
783 // concurrently (leaf is not assumed to be locked)
784 //INVARIANT(idxmatch == -1);
// exact (slice, length) match found
786 if (kslicelen0 == kslicelen)
// Find the responsible leaf for kslice and do an exact
// (kslice, kslicelen) search within it; idxmatch receives the matching
// slot index (key_search's miss convention — presumably -1 — applies).
797 template <typename P>
798 typename btree<P>::leaf_node *
799 btree<P>::FindRespLeafExact(
807 leaf = FindRespLeafNode(leaf, kslice, version);
808 key_search_ret kret = leaf->key_search(kslice, kslicelen);
809 idxmatch = kret.first;
814 template <typename P>
815 typename btree<P>::insert_status
816 btree<P>::insert0(node *np,
821 insert_info_t *insert_info,
824 typename util::vec<insert_parent_entry>::type &parents,
825 typename util::vec<node *>::type &locked_nodes)
827 uint64_t kslice = k.slice();
828 size_t kslicelen = std::min(k.size(), size_t(9));
831 if (leaf_node *leaf = AsLeafCheck(np)) {
832 // locked nodes are acquired bottom to top
833 INVARIANT(locked_nodes.empty());
838 ssize_t lenmatch, lenlowerbound;
839 leaf_node *resp_leaf = FindRespLeafLowerBound(
840 leaf, kslice, kslicelen, version, n, lenmatch, lenlowerbound);
843 if (lenmatch != -1) {
845 if (kslicelen <= 8 ||
846 (!resp_leaf->is_layer(lenmatch) &&
847 resp_leaf->suffix(lenmatch) == k.shift())) {
848 const uint64_t locked_version = resp_leaf->lock();
849 if (unlikely(!btree::CheckVersion(version, locked_version))) {
853 locked_nodes.push_back(resp_leaf);
854 // easy case- we don't modify the node itself
856 *old_v = resp_leaf->values_[lenmatch].v_;
858 resp_leaf->values_[lenmatch].v_ = v;
860 insert_info->node = 0;
861 return UnlockAndReturn(locked_nodes, I_NONE_NOMOD);
863 INVARIANT(kslicelen == 9);
864 if (resp_leaf->is_layer(lenmatch)) {
865 node *subroot = resp_leaf->values_[lenmatch].n_;
867 if (unlikely(!resp_leaf->check_version(version)))
871 typename util::vec<insert_parent_entry>::type subparents;
872 typename util::vec<node *>::type sub_locked_nodes;
873 const insert_status status =
874 insert0(subroot, k.shift(), v, only_if_absent, old_v, insert_info,
875 mk, ret, subparents, sub_locked_nodes);
881 INVARIANT(sub_locked_nodes.empty());
885 // the subroot split, so we need to find the leaf again, lock the
886 // node, and create a new internal node
889 INVARIANT(ret->key_slots_used() > 0);
892 resp_leaf = FindRespLeafLowerBound(
893 resp_leaf, kslice, kslicelen, version, n, lenmatch, lenlowerbound);
894 const uint64_t locked_version = resp_leaf->lock();
895 if (likely(btree::CheckVersion(version, locked_version))) {
896 locked_nodes.push_back(resp_leaf);
902 INVARIANT(lenmatch != -1);
903 INVARIANT(resp_leaf->is_layer(lenmatch));
904 subroot = resp_leaf->values_[lenmatch].n_;
905 INVARIANT(subroot->is_modifying());
906 INVARIANT(subroot->is_lock_owner());
907 INVARIANT(subroot->is_root());
909 internal_node *new_root = internal_node::alloc();
910 #ifdef CHECK_INVARIANTS
912 new_root->mark_modifying();
913 locked_nodes.push_back(new_root);
914 #endif /* CHECK_INVARIANTS */
915 new_root->children_[0] = subroot;
916 new_root->children_[1] = ret;
917 new_root->keys_[0] = mk;
918 new_root->set_key_slots_used(1);
919 new_root->set_root();
920 subroot->clear_root();
921 resp_leaf->values_[lenmatch].n_ = new_root;
923 // locks are still held here
924 UnlockNodes(sub_locked_nodes);
925 return UnlockAndReturn(locked_nodes, I_NONE_MOD);
927 ALWAYS_ASSERT(false);
930 const uint64_t locked_version = resp_leaf->lock();
931 if (unlikely(!btree::CheckVersion(version, locked_version))) {
935 locked_nodes.push_back(resp_leaf);
937 INVARIANT(resp_leaf->suffixes_); // b/c lenmatch != -1 and this is not a layer
938 // need to create a new btree layer, and add both existing key and
941 // XXX: need to mark modifying because we cannot change both the
942 // value and the type atomically
943 resp_leaf->mark_modifying();
945 leaf_node *new_root = leaf_node::alloc();
946 #ifdef CHECK_INVARIANTS
948 new_root->mark_modifying();
949 #endif /* CHECK_INVARIANTS */
950 new_root->set_root();
951 varkey old_slice(resp_leaf->suffix(lenmatch));
952 new_root->keys_[0] = old_slice.slice();
953 new_root->values_[0] = resp_leaf->values_[lenmatch];
954 new_root->keyslice_set_length(0, std::min(old_slice.size(), size_t(9)), false);
955 new_root->inc_key_slots_used();
956 if (new_root->keyslice_length(0) == 9) {
957 new_root->alloc_suffixes();
958 rcu_imstring i(old_slice.data() + 8, old_slice.size() - 8);
959 new_root->suffixes_[0].swap(i);
961 resp_leaf->values_[lenmatch].n_ = new_root;
964 resp_leaf->suffixes_[lenmatch].swap(i);
966 resp_leaf->value_set_layer(lenmatch);
967 #ifdef CHECK_INVARIANTS
969 #endif /* CHECK_INVARIANTS */
973 typename util::vec<insert_parent_entry>::type subparents;
974 typename util::vec<node *>::type sub_locked_nodes;
975 const insert_status status =
976 insert0(new_root, k.shift(), v, only_if_absent, old_v, insert_info,
977 mk, ret, subparents, sub_locked_nodes);
978 if (status != I_NONE_MOD)
980 INVARIANT(sub_locked_nodes.empty());
981 return UnlockAndReturn(locked_nodes, I_NONE_MOD);
985 // lenlowerbound + 1 is the slot (0-based index) we want the new key to go
986 // into, in the leaf node
987 if (n < NKeysPerNode) {
988 const uint64_t locked_version = resp_leaf->lock();
989 if (unlikely(!btree::CheckVersion(version, locked_version))) {
993 locked_nodes.push_back(resp_leaf);
995 // also easy case- we only need to make local modifications
996 resp_leaf->mark_modifying();
998 sift_right(resp_leaf->keys_, lenlowerbound + 1, n);
999 resp_leaf->keys_[lenlowerbound + 1] = kslice;
1000 sift_right(resp_leaf->values_, lenlowerbound + 1, n);
1001 resp_leaf->values_[lenlowerbound + 1].v_ = v;
1002 sift_right(resp_leaf->lengths_, lenlowerbound + 1, n);
1003 resp_leaf->keyslice_set_length(lenlowerbound + 1, kslicelen, false);
1004 if (resp_leaf->suffixes_)
1005 sift_swap_right(resp_leaf->suffixes_, lenlowerbound + 1, n);
1006 if (kslicelen == 9) {
1007 resp_leaf->ensure_suffixes();
1008 rcu_imstring i(k.data() + 8, k.size() - 8);
1009 resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1010 } else if (resp_leaf->suffixes_) {
1012 resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1014 resp_leaf->inc_key_slots_used();
1016 //#ifdef CHECK_INVARIANTS
1017 // resp_leaf->base_invariant_unique_keys_check();
1020 insert_info->node = resp_leaf;
1021 insert_info->old_version = RawVersionManip::Version(resp_leaf->unstable_version()); // we hold lock on leaf
1022 insert_info->new_version = insert_info->old_version + 1;
1024 return UnlockAndReturn(locked_nodes, I_NONE_MOD);
1026 INVARIANT(n == NKeysPerNode);
1028 if (unlikely(resp_leaf != leaf))
1029 // sigh, we really do need parent points- if resp_leaf != leaf, then
1030 // all the parent points we saved on the way down are no longer valid
1031 return UnlockAndReturn(locked_nodes, I_RETRY);
1032 const uint64_t locked_version = resp_leaf->lock();
1033 if (unlikely(!btree::CheckVersion(version, locked_version))) {
1034 resp_leaf->unlock();
1035 goto retry_cur_leaf;
1037 locked_nodes.push_back(resp_leaf);
1039 // we need to split the current node, potentially causing a bunch of
1040 // splits to happen in ancestors. to make this safe w/o
1041 // using a very complicated locking protocol, we will first acquire all
1042 // locks on nodes which will be modified, in left-to-right,
1043 // bottom-to-top order
1045 if (parents.empty()) {
1046 if (unlikely(!resp_leaf->is_root()))
1047 return UnlockAndReturn(locked_nodes, I_RETRY);
1048 //INVARIANT(resp_leaf == root);
1050 for (auto rit = parents.rbegin(); rit != parents.rend(); ++rit) {
1052 node *p = rit->first;
1054 locked_nodes.push_back(p);
1055 if (unlikely(!p->check_version(rit->second)))
1056 // in traversing down the tree, an ancestor of this node was
1057 // modified- to be safe, we start over
1058 return UnlockAndReturn(locked_nodes, I_RETRY);
1059 if ((rit + 1) == parents.rend()) {
1060 // did the root change?
1061 if (unlikely(!p->is_root()))
1062 return UnlockAndReturn(locked_nodes, I_RETRY);
1063 //INVARIANT(p == root);
1066 // since the child needs a split, see if we have room in the parent-
1067 // if we don't have room, we'll also need to split the parent, in which
1068 // case we must grab its parent's lock
1069 INVARIANT(p->is_internal_node());
1070 size_t parent_n = p->key_slots_used();
1071 INVARIANT(parent_n > 0 && parent_n <= NKeysPerNode);
1072 if (parent_n < NKeysPerNode)
1073 // can stop locking up now, since this node won't split
1078 // at this point, we have locked all nodes which will be split/modified
1079 // modulo the new nodes to be created
1080 resp_leaf->mark_modifying();
1082 leaf_node *new_leaf = leaf_node::alloc();
1083 new_leaf->prefetch();
1085 #ifdef CHECK_INVARIANTS
1087 new_leaf->mark_modifying();
1088 locked_nodes.push_back(new_leaf);
1089 #endif /* CHECK_INVARIANTS */
1091 if (!resp_leaf->next_ && resp_leaf->keys_[n - 1] < kslice) {
1092 // sequential insert optimization- in this case, we don't bother
1093 // splitting the node. instead, keep the current leaf node full, and
1094 // insert the new key into the new leaf node (violating the btree invariant)
1096 // this optimization is commonly implemented, including in masstree and
1097 // berkeley db- w/o this optimization, sequential inserts leave the all
1100 new_leaf->keys_[0] = kslice;
1101 new_leaf->values_[0].v_ = v;
1102 new_leaf->keyslice_set_length(0, kslicelen, false);
1103 if (kslicelen == 9) {
1104 new_leaf->alloc_suffixes();
1105 rcu_imstring i(k.data() + 8, k.size() - 8);
1106 new_leaf->suffixes_[0].swap(i);
1108 new_leaf->set_key_slots_used(1);
1113 // compute how many keys the smaller node would have if we put the
1114 // new key in the left (old) or right (new) side of the split.
1115 // then choose the split which maximizes the mininum.
1117 // XXX: do this in a more elegant way
1119 // a split point S is a number such that indices [0, s) go into the left
1120 // partition, and indices [s, N) go into the right partition
1121 size_t left_split_point, right_split_point;
1123 left_split_point = NKeysPerNode / 2;
1124 for (ssize_t i = left_split_point - 1; i >= 0; i--) {
1125 if (likely(resp_leaf->keys_[i] != resp_leaf->keys_[left_split_point]))
1129 INVARIANT(left_split_point <= NKeysPerNode);
1130 INVARIANT(left_split_point == 0 || resp_leaf->keys_[left_split_point - 1] != resp_leaf->keys_[left_split_point]);
1132 right_split_point = NKeysPerNode / 2;
1133 for (ssize_t i = right_split_point - 1; i >= 0 && i < ssize_t(NKeysPerNode) - 1; i++) {
1134 if (likely(resp_leaf->keys_[i] != resp_leaf->keys_[right_split_point]))
1136 right_split_point++;
1138 INVARIANT(right_split_point <= NKeysPerNode);
1139 INVARIANT(right_split_point == 0 || resp_leaf->keys_[right_split_point - 1] != resp_leaf->keys_[right_split_point]);
1142 if (std::min(left_split_point, NKeysPerNode - left_split_point) <
1143 std::min(right_split_point, NKeysPerNode - right_split_point))
1144 split_point = right_split_point;
1146 split_point = left_split_point;
1148 if (split_point <= size_t(lenlowerbound + 1) && resp_leaf->keys_[split_point - 1] != kslice) {
1149 // put new key in new leaf (right)
1150 size_t pos = lenlowerbound + 1 - split_point;
1152 copy_into(&new_leaf->keys_[0], resp_leaf->keys_, split_point, lenlowerbound + 1);
1153 new_leaf->keys_[pos] = kslice;
1154 copy_into(&new_leaf->keys_[pos + 1], resp_leaf->keys_, lenlowerbound + 1, NKeysPerNode);
1156 copy_into(&new_leaf->values_[0], resp_leaf->values_, split_point, lenlowerbound + 1);
1157 new_leaf->values_[pos].v_ = v;
1158 copy_into(&new_leaf->values_[pos + 1], resp_leaf->values_, lenlowerbound + 1, NKeysPerNode);
1160 copy_into(&new_leaf->lengths_[0], resp_leaf->lengths_, split_point, lenlowerbound + 1);
1161 new_leaf->keyslice_set_length(pos, kslicelen, false);
1162 copy_into(&new_leaf->lengths_[pos + 1], resp_leaf->lengths_, lenlowerbound + 1, NKeysPerNode);
1164 if (resp_leaf->suffixes_) {
1165 new_leaf->ensure_suffixes();
1166 swap_with(&new_leaf->suffixes_[0], resp_leaf->suffixes_, split_point, lenlowerbound + 1);
1168 if (kslicelen == 9) {
1169 new_leaf->ensure_suffixes();
1170 rcu_imstring i(k.data() + 8, k.size() - 8);
1171 new_leaf->suffixes_[pos].swap(i);
1172 } else if (new_leaf->suffixes_) {
1174 new_leaf->suffixes_[pos].swap(i);
1176 if (resp_leaf->suffixes_) {
1177 new_leaf->ensure_suffixes();
1178 swap_with(&new_leaf->suffixes_[pos + 1], resp_leaf->suffixes_, lenlowerbound + 1, NKeysPerNode);
1181 resp_leaf->set_key_slots_used(split_point);
1182 new_leaf->set_key_slots_used(NKeysPerNode - split_point + 1);
1184 #ifdef CHECK_INVARIANTS
1185 resp_leaf->base_invariant_unique_keys_check();
1186 new_leaf->base_invariant_unique_keys_check();
1187 INVARIANT(resp_leaf->keys_[split_point - 1] < new_leaf->keys_[0]);
1188 #endif /* CHECK_INVARIANTS */
1191 // XXX: not really sure if this invariant is true, but we rely
1193 INVARIANT(size_t(lenlowerbound + 1) <= split_point);
1195 // put new key in original leaf
1196 copy_into(&new_leaf->keys_[0], resp_leaf->keys_, split_point, NKeysPerNode);
1197 copy_into(&new_leaf->values_[0], resp_leaf->values_, split_point, NKeysPerNode);
1198 copy_into(&new_leaf->lengths_[0], resp_leaf->lengths_, split_point, NKeysPerNode);
1199 if (resp_leaf->suffixes_) {
1200 new_leaf->ensure_suffixes();
1201 swap_with(&new_leaf->suffixes_[0], resp_leaf->suffixes_, split_point, NKeysPerNode);
1204 sift_right(resp_leaf->keys_, lenlowerbound + 1, split_point);
1205 resp_leaf->keys_[lenlowerbound + 1] = kslice;
1206 sift_right(resp_leaf->values_, lenlowerbound + 1, split_point);
1207 resp_leaf->values_[lenlowerbound + 1].v_ = v;
1208 sift_right(resp_leaf->lengths_, lenlowerbound + 1, split_point);
1209 resp_leaf->keyslice_set_length(lenlowerbound + 1, kslicelen, false);
1210 if (resp_leaf->suffixes_)
1211 sift_swap_right(resp_leaf->suffixes_, lenlowerbound + 1, split_point);
1212 if (kslicelen == 9) {
1213 resp_leaf->ensure_suffixes();
1214 rcu_imstring i(k.data() + 8, k.size() - 8);
1215 resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1216 } else if (resp_leaf->suffixes_) {
1218 resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1221 resp_leaf->set_key_slots_used(split_point + 1);
1222 new_leaf->set_key_slots_used(NKeysPerNode - split_point);
1224 #ifdef CHECK_INVARIANTS
1225 resp_leaf->base_invariant_unique_keys_check();
1226 new_leaf->base_invariant_unique_keys_check();
1227 INVARIANT(resp_leaf->keys_[split_point] < new_leaf->keys_[0]);
1228 #endif /* CHECK_INVARIANTS */
1232 // pointer adjustment
1233 new_leaf->prev_ = resp_leaf;
1234 new_leaf->next_ = resp_leaf->next_;
1235 if (resp_leaf->next_)
1236 resp_leaf->next_->prev_ = new_leaf;
1237 resp_leaf->next_ = new_leaf;
1239 min_key = new_leaf->keys_[0];
1240 new_leaf->min_key_ = min_key;
1241 new_node = new_leaf;
1244 insert_info->node = resp_leaf;
1245 insert_info->old_version = RawVersionManip::Version(resp_leaf->unstable_version()); // we hold lock on leaf
1246 insert_info->new_version = insert_info->old_version + 1;
1252 internal_node *internal = AsInternal(np);
1253 uint64_t version = internal->stable_version();
1254 if (unlikely(RawVersionManip::IsDeleting(version)))
1255 return UnlockAndReturn(locked_nodes, I_RETRY);
1256 key_search_ret kret = internal->key_lower_bound_search(kslice);
1257 ssize_t ret = kret.first;
1258 size_t n = kret.second;
1259 size_t child_idx = (ret == -1) ? 0 : ret + 1;
1260 node *child_ptr = internal->children_[child_idx];
1261 if (unlikely(!internal->check_version(version)))
1262 return UnlockAndReturn(locked_nodes, I_RETRY);
1263 parents.push_back(insert_parent_entry(internal, version));
1265 node *new_child = NULL;
1266 insert_status status =
1267 insert0(child_ptr, k, v, only_if_absent, old_v, insert_info,
1268 mk, new_child, parents, locked_nodes);
1269 if (status != I_SPLIT) {
1270 INVARIANT(locked_nodes.empty());
1273 INVARIANT(new_child);
1274 INVARIANT(internal->is_locked()); // previous call to insert0() must lock internal node for insertion
1275 INVARIANT(internal->is_lock_owner());
1276 INVARIANT(internal->check_version(version));
1277 INVARIANT(new_child->key_slots_used() > 0);
1279 internal->mark_modifying();
1280 if (n < NKeysPerNode) {
1281 sift_right(internal->keys_, child_idx, n);
1282 internal->keys_[child_idx] = mk;
1283 sift_right(internal->children_, child_idx + 1, n + 1);
1284 internal->children_[child_idx + 1] = new_child;
1285 internal->inc_key_slots_used();
1286 return UnlockAndReturn(locked_nodes, I_NONE_MOD);
1288 INVARIANT(n == NKeysPerNode);
1289 INVARIANT(ret == internal->key_lower_bound_search(mk).first);
1291 internal_node *new_internal = internal_node::alloc();
1292 new_internal->prefetch();
1293 #ifdef CHECK_INVARIANTS
1294 new_internal->lock();
1295 new_internal->mark_modifying();
1296 locked_nodes.push_back(new_internal);
1297 #endif /* CHECK_INVARIANTS */
1299 // there are three cases post-split:
1300 // (1) mk goes in the original node
1301 // (2) mk is the key we push up
1302 // (3) mk goes in the new node
1304 const ssize_t split_point = NMinKeysPerNode - 1;
1305 if (ret < split_point) {
1307 min_key = internal->keys_[split_point];
1309 copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode, NKeysPerNode);
1310 copy_into(&new_internal->children_[0], internal->children_, NMinKeysPerNode, NKeysPerNode + 1);
1311 new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
1313 sift_right(internal->keys_, child_idx, NMinKeysPerNode - 1);
1314 internal->keys_[child_idx] = mk;
1315 sift_right(internal->children_, child_idx + 1, NMinKeysPerNode);
1316 internal->children_[child_idx + 1] = new_child;
1317 internal->set_key_slots_used(NMinKeysPerNode);
1319 } else if (ret == split_point) {
1323 copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode, NKeysPerNode);
1324 copy_into(&new_internal->children_[1], internal->children_, NMinKeysPerNode + 1, NKeysPerNode + 1);
1325 new_internal->children_[0] = new_child;
1326 new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
1327 internal->set_key_slots_used(NMinKeysPerNode);
1331 min_key = internal->keys_[NMinKeysPerNode];
1333 size_t pos = child_idx - NMinKeysPerNode - 1;
1335 copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode + 1, child_idx);
1336 new_internal->keys_[pos] = mk;
1337 copy_into(&new_internal->keys_[pos + 1], internal->keys_, child_idx, NKeysPerNode);
1339 copy_into(&new_internal->children_[0], internal->children_, NMinKeysPerNode + 1, child_idx + 1);
1340 new_internal->children_[pos + 1] = new_child;
1341 copy_into(&new_internal->children_[pos + 2], internal->children_, child_idx + 1, NKeysPerNode + 1);
1343 new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
1344 internal->set_key_slots_used(NMinKeysPerNode);
1347 INVARIANT(internal->keys_[internal->key_slots_used() - 1] < new_internal->keys_[0]);
1348 new_node = new_internal;
1354 template <typename P>
// Insert (k, v) through a stable root pointer: run insert0() against the
// current root and, when the root itself split (visible path below), grow the
// tree one level by allocating a new internal node whose two children are the
// old root and the split-off node `ret`, separated by key `mk`.
//
// NOTE(review): this view of the file is elided (the original line numbers
// jump), so the declarations of `mk`/`ret`, the non-split return paths, and
// the function's closing lines are not visible here.  Comments below describe
// only the statements that are visible — confirm against the full source.
1356 btree<P>::insert_stable_location(
1357     node **root_location, const key_type &k, value_type v,
1358     bool only_if_absent, value_type *old_v,
1359     insert_info_t *insert_info)
// caller must already be inside an RCU read region (node memory is
// RCU-reclaimed, so pointers stay valid only within the region)
1361   INVARIANT(rcu::s_instance.in_rcu_region());
1365   typename util::vec<insert_parent_entry>::type parents;
1366   typename util::vec<node *>::type locked_nodes;
1367   node *local_root = *root_location;
1368   const insert_status status =
1369     insert0(local_root, k, v, only_if_absent, old_v, insert_info,
1370             mk, ret, parents, locked_nodes);
// insert0() leaves nodes locked only when it propagated a split to the top
1371   INVARIANT(status == I_SPLIT || locked_nodes.empty());
// --- root-split path: the old root split, `ret` is its new right half ---
1381   INVARIANT(ret->key_slots_used() > 0);
1382   INVARIANT(local_root->is_modifying());
1383   INVARIANT(local_root->is_lock_owner());
1384   INVARIANT(local_root->is_root());
// the root pointer must not have changed underneath us while we held the lock
1385   INVARIANT(local_root == *root_location);
1386   internal_node *new_root = internal_node::alloc();
1387 #ifdef CHECK_INVARIANTS
1389   new_root->mark_modifying();
1390   locked_nodes.push_back(new_root);
1391 #endif /* CHECK_INVARIANTS */
// new root has exactly one separator key (mk) and two children:
// [old root | ret]
1392   new_root->children_[0] = local_root;
1393   new_root->children_[1] = ret;
1394   new_root->keys_[0] = mk;
1395   new_root->set_key_slots_used(1);
1396   new_root->set_root();
1397   local_root->clear_root();
// fence: the new root must be fully initialized before concurrent readers
// can observe it through *root_location
1398   COMPILER_MEMORY_FENCE;
1399   *root_location = new_root;
1400   // locks are still held here
1401   return UnlockAndReturn(locked_nodes, true);
// unreachable: every status case above returns
1403   ALWAYS_ASSERT(false);
1408 * remove is very tricky to get right!
1410  * XXX: optimize remove so it holds fewer locks
1412 template <typename P>
1413 typename btree<P>::remove_status
// Recursive removal of key k from the subtree rooted at np.
//
// Leaf case: locate the responsible leaf and slot; if the slot is a layer
// pointer (kslicelen == 9), recurse into the sub-layer; otherwise delete the
// entry, stealing from or merging with a sibling when the leaf would fall
// below NMinKeysPerNode.  Internal case: recurse into the proper child, then
// patch this node's separator keys/children according to the child's status
// (R_STOLE_FROM_*, R_MERGE_WITH_*, R_REPLACE_NODE).
//
// Locking discipline is left-to-right, bottom-to-top (see the long comment
// in the leaf case).  Optimistic version checks (`check_version`) guard the
// lock-free read phases; failed checks retry via `goto retry_cur_leaf` or
// return R_RETRY.
//
// NOTE(review): this view is elided (original line numbers jump) — labels
// such as retry_cur_leaf, several closing braces, the switch statements on
// `status`, and some declarations (version, n, ret, new_key, nk, min_key,
// max_key) are not visible here.  Comments describe only visible statements.
1414 btree<P>::remove0(node *np,
1422               node *&replace_node,
1423               typename util::vec<remove_parent_entry>::type &parents,
1424               typename util::vec<node *>::type &locked_nodes)
// first 8 bytes of the key, and its length capped at 9 (a length of 9 marks
// a key with a suffix or a layer pointer — see base_invariant_unique_keys_check)
1426   uint64_t kslice = k.slice();
1427   size_t kslicelen = std::min(k.size(), size_t(9));
// ---------------- leaf node case ----------------
1430   if (leaf_node *leaf = AsLeafCheck(np)) {
1431     INVARIANT(locked_nodes.empty());
1433     SINGLE_THREADED_INVARIANT(!left_node || (leaf->prev_ == left_node && AsLeaf(left_node)->next == leaf));
1434     SINGLE_THREADED_INVARIANT(!right_node || (leaf->next_ == right_node && AsLeaf(right_node)->prev == leaf));
// find the leaf actually responsible for this key slice (may differ from
// `leaf` if the tree moved underneath us); fills version/n/ret
1440     leaf_node *resp_leaf = FindRespLeafExact(
1441         leaf, kslice, kslicelen, version, n, ret);
// key not found in this leaf: verify our optimistic read was consistent,
// then report no-op
1444       if (unlikely(!resp_leaf->check_version(version)))
1445         goto retry_cur_leaf;
1446       return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
1448     if (kslicelen == 9) {
// slot holds a pointer to a lower key layer: recurse into that sub-tree
1449       if (resp_leaf->is_layer(ret)) {
1450         node *subroot = resp_leaf->values_[ret].n_;
1452         if (unlikely(!resp_leaf->check_version(version)))
1453           goto retry_cur_leaf;
1456         node *replace_node = NULL;
1457         typename util::vec<remove_parent_entry>::type sub_parents;
1458         typename util::vec<node *>::type sub_locked_nodes;
1459         remove_status status = remove0(subroot,
1464                                        NULL, /* left_node */
1465                                        NULL, /* right_node */
1474           INVARIANT(sub_locked_nodes.empty());
// sub-layer shrank to a single node: swap the new sub-root into our slot
1477         case R_REPLACE_NODE:
1478           INVARIANT(replace_node);
// re-locate and lock the responsible leaf before mutating the slot
1480           resp_leaf = FindRespLeafExact(
1481               resp_leaf, kslice, kslicelen, version, n, ret);
1482           const uint64_t locked_version = resp_leaf->lock();
1483           if (likely(btree::CheckVersion(version, locked_version))) {
1484             locked_nodes.push_back(resp_leaf);
1487           resp_leaf->unlock();
1490           INVARIANT(subroot->is_deleting());
1491           INVARIANT(subroot->is_lock_owner());
1492           INVARIANT(subroot->is_root());
1493           replace_node->set_root();
1494           subroot->clear_root();
1495           resp_leaf->values_[ret].n_ = replace_node;
1497           // XXX: need to re-merge back when layer size becomes 1, but skip this
1500           // locks are still held here
1501           UnlockNodes(sub_locked_nodes);
1502           return UnlockAndReturn(locked_nodes, R_NONE_MOD);
// unreachable: all sub-remove statuses handled above
1507           ALWAYS_ASSERT(false);
// not a layer: a 9-length slot stores the key suffix; mismatch means the
// key is not actually present
1511       if (resp_leaf->suffix(ret) != k.shift()) {
1512         if (unlikely(!resp_leaf->check_version(version)))
1513           goto retry_cur_leaf;
1514         return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
1519     //INVARIANT(!resp_leaf->is_layer(ret));
// easy case: leaf stays above minimum occupancy, remove entry in place
1520     if (n > NMinKeysPerNode) {
1521       const uint64_t locked_version = resp_leaf->lock();
1522       if (unlikely(!btree::CheckVersion(version, locked_version))) {
1523         resp_leaf->unlock();
1524         goto retry_cur_leaf;
1526       locked_nodes.push_back(resp_leaf);
1528       *old_v = resp_leaf->values_[ret].v_;
1529       resp_leaf->mark_modifying();
1530       remove_pos_from_leaf_node(resp_leaf, ret, n);
1531       return UnlockAndReturn(locked_nodes, R_NONE_MOD);
// underflow path: sibling pointers passed in only apply to the leaf we
// originally descended to, so a responsible-leaf mismatch forces a retry
1534     if (unlikely(resp_leaf != leaf))
1535       return UnlockAndReturn(locked_nodes, R_RETRY);
1536     const uint64_t locked_version = leaf->lock();
1537     if (unlikely(!btree::CheckVersion(version, locked_version))) {
1539       goto retry_cur_leaf;
1541     locked_nodes.push_back(leaf);
1543     *old_v = leaf->values_[ret].v_;
1545     uint64_t leaf_version = leaf->unstable_version();
1547     leaf_node *left_sibling = AsLeaf(left_node);
1548     leaf_node *right_sibling = AsLeaf(right_node);
1550     // NOTE: remember that our locking discipline is left-to-right,
1551     // bottom-to-top. Here, we must acquire all locks on nodes being
1552     // modified in the tree. How we choose to handle removes is in the
1553     // following preference:
1554     //   1) steal from right node
1555     //   2) merge with right node
1556     //   3) steal from left node
1557     //   4) merge with left node
1559     // Btree invariants guarantee that at least one of the options above is
1560     // available (except for the root node). We pick the right node first,
1561     // because our locking discipline allows us to directly lock the right
1562     // node. If (1) and (2) cannot be satisfied, then we must first
1563     // *unlock* the current node, lock the left node, relock the current
1564     // node, and check nothing changed in between.
1566     if (right_sibling) {
1567       right_sibling->lock();
1568       locked_nodes.push_back(right_sibling);
1569     } else if (left_sibling) {
1571       left_sibling->lock();
1572       locked_nodes.push_back(left_sibling);
1574       if (unlikely(!leaf->check_version(leaf_version)))
1575         return UnlockAndReturn(locked_nodes, R_RETRY);
// no siblings: this leaf had better (still) be the root
1577       INVARIANT(parents.empty());
1578       if (unlikely(!leaf->is_root()))
1579         return UnlockAndReturn(locked_nodes, R_RETRY);
1580       //INVARIANT(leaf == root);
// lock-couple up the parent chain (deepest first) so the structural change
// below can be propagated; any version mismatch aborts with a retry
1583     for (typename util::vec<remove_parent_entry>::type::reverse_iterator rit = parents.rbegin();
1584          rit != parents.rend(); ++rit) {
1585       node *p = rit->parent_;
1586       node *l = rit->parent_left_sibling_;
1587       node *r = rit->parent_right_sibling_;
1588       uint64_t p_version = rit->parent_version_;
1590       locked_nodes.push_back(p);
1591       if (unlikely(!p->check_version(p_version)))
1592         return UnlockAndReturn(locked_nodes, R_RETRY);
1593       size_t p_n = p->key_slots_used();
// parent above minimum: structural change stops here, no need to go higher
1594       if (p_n > NMinKeysPerNode)
1598         locked_nodes.push_back(r);
1602         locked_nodes.push_back(l);
1604         if (unlikely(!p->check_version(p_version)))
1605           return UnlockAndReturn(locked_nodes, R_RETRY);
1607         if (unlikely(!p->is_root()))
1608           return UnlockAndReturn(locked_nodes, R_RETRY);
1609         //INVARIANT(p == root);
1613     leaf->mark_modifying();
// ---- option (1)/(2): steal from / merge with the right sibling ----
1615     if (right_sibling) {
1616       right_sibling->mark_modifying();
1617       size_t right_n = right_sibling->key_slots_used();
1618       if (right_n > NMinKeysPerNode) {
1619         // steal first contiguous key slices from right
1620         INVARIANT(right_sibling->keys_[0] > leaf->keys_[n - 1]);
1622         // indices [0, steal_point) will be taken from the right
// equal adjacent slices (same 8-byte prefix, different lengths/suffixes)
// must move together, so extend steal_point past the leading run of
// duplicate slices
1623         size_t steal_point = 1;
1624         for (size_t i = 0; i < right_n - 1; i++, steal_point++)
1625           if (likely(right_sibling->keys_[i] != right_sibling->keys_[steal_point]))
1628         INVARIANT(steal_point <= sizeof(key_slice) + 2);
1629         INVARIANT(steal_point <= right_n);
1631         // to steal, we need to ensure:
1632         // 1) we have enough room to steal
1633         // 2) the right sibling will not be empty after the steal
1634         if ((n - 1 + steal_point) <= NKeysPerNode && steal_point < right_n) {
// delete slot `ret`, then append the stolen prefix of the right sibling
1635           sift_left(leaf->keys_, ret, n);
1636           copy_into(&leaf->keys_[n - 1], right_sibling->keys_, 0, steal_point);
1637           sift_left(leaf->values_, ret, n);
1638           copy_into(&leaf->values_[n - 1], right_sibling->values_, 0, steal_point);
1639           sift_left(leaf->lengths_, ret, n);
1640           copy_into(&leaf->lengths_[n - 1], right_sibling->lengths_, 0, steal_point);
1641           if (leaf->suffixes_)
1642             sift_swap_left(leaf->suffixes_, ret, n);
1643           if (right_sibling->suffixes_) {
1644             leaf->ensure_suffixes();
1645             swap_with(&leaf->suffixes_[n - 1], right_sibling->suffixes_, 0, steal_point);
// compact the right sibling after the steal
1648           sift_left(right_sibling->keys_, 0, right_n, steal_point);
1649           sift_left(right_sibling->values_, 0, right_n, steal_point);
1650           sift_left(right_sibling->lengths_, 0, right_n, steal_point);
1651           if (right_sibling->suffixes_)
1652             sift_swap_left(right_sibling->suffixes_, 0, right_n, steal_point);
1653           leaf->set_key_slots_used(n - 1 + steal_point);
1654           right_sibling->set_key_slots_used(right_n - steal_point);
// new separator key for the parent (right sibling's new minimum)
1655           new_key = right_sibling->keys_[0];
1656           right_sibling->min_key_ = new_key;
1658 #ifdef CHECK_INVARIANTS
1659           leaf->base_invariant_unique_keys_check();
1660           right_sibling->base_invariant_unique_keys_check();
1661           INVARIANT(leaf->keys_[n - 1 + steal_point - 1] < new_key);
1662 #endif /* CHECK_INVARIANTS */
1664           return R_STOLE_FROM_RIGHT;
1666       // can't steal, so try merging- but only merge if we have room,
1667       // otherwise just allow this node to have fewer elements
1668       if ((n - 1 + right_n) > NKeysPerNode) {
1669         INVARIANT(n > 1); // if we can't steal or merge, we must have
1670                           // enough elements to just remove one w/o going empty
1671         remove_pos_from_leaf_node(leaf, ret, n);
1672         return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1677       // merge right sibling into this node
1678       INVARIANT(right_sibling->keys_[0] > leaf->keys_[n - 1]);
1679       INVARIANT((right_n + (n - 1)) <= NKeysPerNode);
1681       sift_left(leaf->keys_, ret, n);
1682       copy_into(&leaf->keys_[n - 1], right_sibling->keys_, 0, right_n);
1684       sift_left(leaf->values_, ret, n);
1685       copy_into(&leaf->values_[n - 1], right_sibling->values_, 0, right_n);
1687       sift_left(leaf->lengths_, ret, n);
1688       copy_into(&leaf->lengths_[n - 1], right_sibling->lengths_, 0, right_n);
1690       if (leaf->suffixes_)
1691         sift_swap_left(leaf->suffixes_, ret, n);
1693       if (right_sibling->suffixes_) {
1694         leaf->ensure_suffixes();
1695         swap_with(&leaf->suffixes_[n - 1], right_sibling->suffixes_, 0, right_n);
1698       leaf->set_key_slots_used(right_n + (n - 1));
// unlink the right sibling from the doubly linked leaf list
1699       leaf->next_ = right_sibling->next_;
1700       if (right_sibling->next_)
1701         right_sibling->next_->prev_ = leaf;
1703       // leaf->next_->prev won't change because we hold lock for both leaf
1704       // and right_sibling
1705       INVARIANT(!leaf->next_ || leaf->next_->prev_ == leaf);
1707       // leaf->prev_->next might change, however, since the left node could be
1708       // splitting (and we might hold a pointer to the left-split of the left node,
1709       // before it gets updated)
1710       SINGLE_THREADED_INVARIANT(!leaf->prev_ || leaf->prev_->next_ == leaf);
1712       //#ifdef CHECK_INVARIANTS
1713       //  leaf->base_invariant_unique_keys_check();
// RCU-defer reclamation of the merged-away node
1715       leaf_node::release(right_sibling);
1716       return R_MERGE_WITH_RIGHT;
// ---- option (3)/(4): steal from / merge with the left sibling ----
1720       left_sibling->mark_modifying();
1721       size_t left_n = left_sibling->key_slots_used();
1722       if (left_n > NMinKeysPerNode) {
1723         // try to steal from left
1724         INVARIANT(left_sibling->keys_[left_n - 1] < leaf->keys_[0]);
1726         // indices [steal_point, left_n) will be taken from the left
// as in the right-steal case, extend past a trailing run of duplicate
// slices so equal key prefixes stay in one node
1727         size_t steal_point = left_n - 1;
1728         for (ssize_t i = steal_point - 1; i >= 0; i--, steal_point--)
1729           if (likely(left_sibling->keys_[i] != left_sibling->keys_[steal_point]))
1732         size_t nstolen = left_n - steal_point;
1733         INVARIANT(nstolen <= sizeof(key_slice) + 2);
1734         INVARIANT(steal_point < left_n);
1736         if ((n - 1 + nstolen) <= NKeysPerNode && steal_point > 0) {
// shift our entries right (dropping slot `ret`) to open room at the front,
// then copy the stolen suffix of the left sibling into positions [0, nstolen)
1737           sift_right(leaf->keys_, ret + 1, n, nstolen - 1);
1738           sift_right(leaf->keys_, 0, ret, nstolen);
1739           copy_into(&leaf->keys_[0], &left_sibling->keys_[0], left_n - nstolen, left_n);
1741           sift_right(leaf->values_, ret + 1, n, nstolen - 1);
1742           sift_right(leaf->values_, 0, ret, nstolen);
1743           copy_into(&leaf->values_[0], &left_sibling->values_[0], left_n - nstolen, left_n);
1745           sift_right(leaf->lengths_, ret + 1, n, nstolen - 1);
1746           sift_right(leaf->lengths_, 0, ret, nstolen);
1747           copy_into(&leaf->lengths_[0], &left_sibling->lengths_[0], left_n - nstolen, left_n);
1749           if (leaf->suffixes_) {
1750             sift_swap_right(leaf->suffixes_, ret + 1, n, nstolen - 1);
1751             sift_swap_right(leaf->suffixes_, 0, ret, nstolen);
1753           if (left_sibling->suffixes_) {
1754             leaf->ensure_suffixes();
1755             swap_with(&leaf->suffixes_[0], &left_sibling->suffixes_[0], left_n - nstolen, left_n);
1758           left_sibling->set_key_slots_used(left_n - nstolen);
1759           leaf->set_key_slots_used(n - 1 + nstolen);
// new separator key for the parent (this leaf's new minimum)
1760           new_key = leaf->keys_[0];
1761           leaf->min_key_ = new_key;
1763 #ifdef CHECK_INVARIANTS
1764           leaf->base_invariant_unique_keys_check();
1765           left_sibling->base_invariant_unique_keys_check();
1766           INVARIANT(left_sibling->keys_[left_n - nstolen - 1] < new_key);
1767 #endif /* CHECK_INVARIANTS */
1769           return R_STOLE_FROM_LEFT;
// can't merge either: tolerate an under-full leaf rather than overflow
1771       if ((left_n + (n - 1)) > NKeysPerNode) {
1773         remove_pos_from_leaf_node(leaf, ret, n);
1774         return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1779       // merge this node into left sibling
1780       INVARIANT(left_sibling->keys_[left_n - 1] < leaf->keys_[0]);
1781       INVARIANT((left_n + (n - 1)) <= NKeysPerNode);
// append all our entries except slot `ret` onto the left sibling
1783       copy_into(&left_sibling->keys_[left_n], leaf->keys_, 0, ret);
1784       copy_into(&left_sibling->keys_[left_n + ret], leaf->keys_, ret + 1, n);
1786       copy_into(&left_sibling->values_[left_n], leaf->values_, 0, ret);
1787       copy_into(&left_sibling->values_[left_n + ret], leaf->values_, ret + 1, n);
1789       copy_into(&left_sibling->lengths_[left_n], leaf->lengths_, 0, ret);
1790       copy_into(&left_sibling->lengths_[left_n + ret], leaf->lengths_, ret + 1, n);
1792       if (leaf->suffixes_) {
1793         left_sibling->ensure_suffixes();
1794         swap_with(&left_sibling->suffixes_[left_n], leaf->suffixes_, 0, ret);
1795         swap_with(&left_sibling->suffixes_[left_n + ret], leaf->suffixes_, ret + 1, n);
1798       left_sibling->set_key_slots_used(left_n + (n - 1));
// unlink this leaf from the doubly linked leaf list
1799       left_sibling->next_ = leaf->next_;
1801         leaf->next_->prev_ = left_sibling;
1803       // see comments in right_sibling case above, for why one of them is INVARIANT and
1804       // the other is SINGLE_THREADED_INVARIANT
1805       INVARIANT(!left_sibling->next_ || left_sibling->next_->prev_ == left_sibling);
1806       SINGLE_THREADED_INVARIANT(
1807           !left_sibling->prev_ ||
1808           left_sibling->prev_->next_ == left_sibling);
1810       //left_sibling->base_invariant_unique_keys_check();
1811       leaf_node::release(leaf);
1812       return R_MERGE_WITH_LEFT;
1815     // root node, so we are ok
1816     //INVARIANT(leaf == root);
1817     INVARIANT(leaf->is_root());
1818     remove_pos_from_leaf_node(leaf, ret, n);
1819     return UnlockAndReturn(locked_nodes, R_NONE_MOD);
// ---------------- internal node case ----------------
1822     internal_node *internal = AsInternal(np);
1823     uint64_t version = internal->stable_version();
1824     if (unlikely(RawVersionManip::IsDeleting(version)))
1825       return UnlockAndReturn(locked_nodes, R_RETRY);
1826     key_search_ret kret = internal->key_lower_bound_search(kslice);
1827     ssize_t ret = kret.first;
1828     size_t n = kret.second;
// child to descend into, plus its key bounds and adjacent siblings, which
// the recursive call needs for its own steal/merge decisions
1829     size_t child_idx = (ret == -1) ? 0 : ret + 1;
1830     node *child_ptr = internal->children_[child_idx];
1831     key_slice *child_min_key = child_idx == 0 ? NULL : &internal->keys_[child_idx - 1];
1832     key_slice *child_max_key = child_idx == n ? NULL : &internal->keys_[child_idx];
1833     node *child_left_sibling = child_idx == 0 ? NULL : internal->children_[child_idx - 1];
1834     node *child_right_sibling = child_idx == n ? NULL : internal->children_[child_idx + 1];
1835     if (unlikely(!internal->check_version(version)))
1836       return UnlockAndReturn(locked_nodes, R_RETRY);
1837     parents.push_back(remove_parent_entry(internal, left_node, right_node, version));
1841     remove_status status = remove0(child_ptr,
1847                                    child_right_sibling,
// child rebalanced by stealing: only the separator key needs updating
1858     case R_STOLE_FROM_LEFT:
1859       INVARIANT(internal->is_locked());
1860       INVARIANT(internal->is_lock_owner());
1861       internal->keys_[child_idx - 1] = nk;
1862       return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1864     case R_STOLE_FROM_RIGHT:
1865       INVARIANT(internal->is_locked());
1866       INVARIANT(internal->is_lock_owner());
1867       internal->keys_[child_idx] = nk;
1868       return UnlockAndReturn(locked_nodes, R_NONE_MOD);
// child merged with a sibling: one key and one child must be deleted here
1870     case R_MERGE_WITH_LEFT:
1871     case R_MERGE_WITH_RIGHT:
1873       internal->mark_modifying();
1875       size_t del_key_idx, del_child_idx;
1876       if (status == R_MERGE_WITH_LEFT) {
1877         // need to delete key at position (child_idx - 1), and child at
1878         // position (child_idx)
1879         del_key_idx = child_idx - 1;
1880         del_child_idx = child_idx;
1882         // need to delete key at position (child_idx), and child at
1883         // position (child_idx + 1)
1884         del_key_idx = child_idx;
1885         del_child_idx = child_idx + 1;
// easy case: this internal node stays above minimum occupancy
1888       if (n > NMinKeysPerNode) {
1889         remove_pos_from_internal_node(internal, del_key_idx, del_child_idx, n);
1890         return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1893       internal_node *left_sibling = AsInternal(left_node);
1894       internal_node *right_sibling = AsInternal(right_node);
1896       // WARNING: if you change the order of events here, then you must also
1897       // change the locking protocol (see the comment in the leaf node case)
// ---- internal steal-from/merge-with right sibling ----
1899       if (right_sibling) {
1900         right_sibling->mark_modifying();
1901         size_t right_n = right_sibling->key_slots_used();
1903         INVARIANT(right_sibling->keys_[0] > internal->keys_[n - 1]);
1904         INVARIANT(*max_key > internal->keys_[n - 1]);
1905         if (right_n > NMinKeysPerNode) {
// rotate one entry through the parent: old separator (*max_key) comes down
// into this node, right sibling's first key goes up as the new separator
1907           sift_left(internal->keys_, del_key_idx, n);
1908           internal->keys_[n - 1] = *max_key;
1910           sift_left(internal->children_, del_child_idx, n + 1);
1911           internal->children_[n] = right_sibling->children_[0];
1913           new_key = right_sibling->keys_[0];
1915           sift_left(right_sibling->keys_, 0, right_n);
1916           sift_left(right_sibling->children_, 0, right_n + 1);
1917           right_sibling->dec_key_slots_used();
1919           return R_STOLE_FROM_RIGHT;
// merge: pull the separator down, then append all of the right sibling
1924         sift_left(internal->keys_, del_key_idx, n);
1925         internal->keys_[n - 1] = *max_key;
1926         copy_into(&internal->keys_[n], right_sibling->keys_, 0, right_n);
1928         sift_left(internal->children_, del_child_idx, n + 1);
1929         copy_into(&internal->children_[n], right_sibling->children_, 0, right_n + 1);
1931         internal->set_key_slots_used(n + right_n);
1932         internal_node::release(right_sibling);
1933         return R_MERGE_WITH_RIGHT;
// ---- internal steal-from/merge-with left sibling ----
1938         left_sibling->mark_modifying();
1939         size_t left_n = left_sibling->key_slots_used();
1941         INVARIANT(left_sibling->keys_[left_n - 1] < internal->keys_[0]);
1942         INVARIANT(left_sibling->keys_[left_n - 1] < *min_key);
1943         INVARIANT(*min_key < internal->keys_[0]);
1944         if (left_n > NMinKeysPerNode) {
// rotate through the parent in the other direction: old separator
// (*min_key) comes down, left sibling's last key goes up
1946           sift_right(internal->keys_, 0, del_key_idx);
1947           internal->keys_[0] = *min_key;
1949           sift_right(internal->children_, 0, del_child_idx);
1950           internal->children_[0] = left_sibling->children_[left_n];
1952           new_key = left_sibling->keys_[left_n - 1];
1953           left_sibling->dec_key_slots_used();
1955           return R_STOLE_FROM_LEFT;
1957         // merge into left sibling
// append: separator first, then our keys/children minus the deleted slot
1960         size_t left_key_j = left_n;
1961         size_t left_child_j = left_n + 1;
1963         left_sibling->keys_[left_key_j++] = *min_key;
1965         copy_into(&left_sibling->keys_[left_key_j], internal->keys_, 0, del_key_idx);
1966         left_key_j += del_key_idx;
1967         copy_into(&left_sibling->keys_[left_key_j], internal->keys_, del_key_idx + 1, n);
1969         copy_into(&left_sibling->children_[left_child_j], internal->children_, 0, del_child_idx);
1970         left_child_j += del_child_idx;
1971         copy_into(&left_sibling->children_[left_child_j], internal->children_, del_child_idx + 1, n + 1);
1973         left_sibling->set_key_slots_used(n + left_n);
1974         internal_node::release(internal);
1975         return R_MERGE_WITH_LEFT;
// root internal node: delete in place; if it shrinks to a single child,
// hand that child up as the subtree's new root
1979       //INVARIANT(internal == root);
1980       INVARIANT(internal->is_root());
1981       remove_pos_from_internal_node(internal, del_key_idx, del_child_idx, n);
1982       INVARIANT(internal->key_slots_used() + 1 == n);
1984         replace_node = internal->children_[0];
1985         internal_node::release(internal);
1986         return R_REPLACE_NODE;
1989       return UnlockAndReturn(locked_nodes, R_NONE_MOD);
// unreachable: every child status handled above
1993       ALWAYS_ASSERT(false);
1994       return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
1999 template <typename P>
// Render a human-readable debug description of a node: its version info and
// hex-formatted key slices, plus per-slot <length, is_layer> details when the
// node is a leaf.  Intended for debugging/tracing only.
// NOTE(review): this view is elided — the return type line and the final
// return statement are not visible here.
2001 btree<P>::NodeStringify(const node_opaque_t *n)
2003   std::vector<std::string> keys;
2004   for (size_t i = 0; i < n->key_slots_used(); i++)
2005     keys.push_back(std::string("0x") + util::hexify(n->keys_[i]));
2007   std::ostringstream b;
2008   b << "node[v=" << n->version_info_str()
2009     << ", keys=" << util::format_list(keys.begin(), keys.end());
// leaf nodes additionally report each slot's keyslice length and whether the
// slot points to a lower key layer
2011   if (n->is_leaf_node()) {
2012     const leaf_node *leaf = AsLeaf(n);
2013     std::vector<std::string> lengths;
2014     for (size_t i = 0; i < leaf->key_slots_used(); i++) {
2015       std::ostringstream inf;
2016       inf << "<l=" << leaf->keyslice_length(i) << ",is_layer=" << leaf->is_layer(i) << ">";
2017       lengths.push_back(inf.str());
2019     b << ", lengths=" << util::format_list(lengths.begin(), lengths.end());
2021     //const internal_node *internal = AsInternal(n);
2029 template <typename P>
2030 std::vector<std::pair<typename btree<P>::value_type, bool>>
2031 btree<P>::ExtractValues(const node_opaque_t *n)
2033 std::vector< std::pair<value_type, bool> > ret;
2034 if (!n->is_leaf_node())
2036 const leaf_node *leaf = (const leaf_node *) n;
2037 const size_t sz = leaf->key_slots_used();
2038 for (size_t i = 0; i < sz; i++)
2039 if (!leaf->is_layer(i))
2040 ret.emplace_back(leaf->values_[i].v_, leaf->keyslice_length(i) > 8);