benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / masstree_struct.hh
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2014 President and Fellows of Harvard College
4  * Copyright (c) 2012-2014 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #ifndef MASSTREE_STRUCT_HH
17 #define MASSTREE_STRUCT_HH
18 #include "masstree.hh"
19 #include "nodeversion.hh"
20 #include "stringbag.hh"
21 #include "mtcounters.hh"
22 #include "timestamp.hh"
23 namespace Masstree {
24
25 template <typename P>
26 struct make_nodeversion {
27     typedef typename mass::conditional<P::concurrent,
28                                        nodeversion,
29                                        singlethreaded_nodeversion>::type type;
30 };
31
32 template <typename P>
33 struct make_prefetcher {
34     typedef typename mass::conditional<P::prefetch,
35                                        value_prefetcher<typename P::value_type>,
36                                        do_nothing>::type type;
37 };
38
39 template <typename P>
40 class node_base : public make_nodeversion<P>::type {
41   public:
42     static constexpr bool concurrent = P::concurrent;
43     static constexpr int nikey = 1;
44     typedef leaf<P> leaf_type;
45     typedef internode<P> internode_type;
46     typedef node_base<P> base_type;
47     typedef typename P::value_type value_type;
48     typedef leafvalue<P> leafvalue_type;
49     typedef typename P::ikey_type ikey_type;
50     typedef key<ikey_type> key_type;
51     typedef typename make_nodeversion<P>::type nodeversion_type;
52     typedef typename P::threadinfo_type threadinfo;
53
54     node_base(bool isleaf)
55         : nodeversion_type(isleaf) {
56     }
57
58     int size() const {
59         if (this->isleaf())
60             return static_cast<const leaf_type*>(this)->size();
61         else
62             return static_cast<const internode_type*>(this)->size();
63     }
64
65     inline base_type* parent() const {
66         // almost always an internode
67         if (this->isleaf())
68             return static_cast<const leaf_type*>(this)->parent_;
69         else
70             return static_cast<const internode_type*>(this)->parent_;
71     }
72     static inline base_type* parent_for_layer_root(base_type* higher_layer) {
73         (void) higher_layer;
74         return 0;
75     }
76     static inline bool parent_exists(base_type* p) {
77         return p != 0;
78     }
79     inline bool has_parent() const {
80         return parent_exists(parent());
81     }
82     inline internode_type* locked_parent(threadinfo& ti) const;
83     inline void set_parent(base_type* p) {
84         if (this->isleaf())
85             static_cast<leaf_type*>(this)->parent_ = p;
86         else
87             static_cast<internode_type*>(this)->parent_ = p;
88     }
89     inline base_type* unsplit_ancestor() const {
90         base_type* x = const_cast<base_type*>(this), *p;
91         while (x->has_split() && (p = x->parent()))
92             x = p;
93         return x;
94     }
95     inline leaf_type* leftmost() const {
96         base_type* x = unsplit_ancestor();
97         while (!x->isleaf()) {
98             internode_type* in = static_cast<internode_type*>(x);
99             x = in->child_[0];
100         }
101         return x;
102     }
103
104     inline leaf_type* reach_leaf(const key_type& k, nodeversion_type& version,
105                                  threadinfo& ti) const;
106
107     void prefetch_full() const {
108         for (int i = 0; i < std::min(16 * std::min(P::leaf_width, P::internode_width) + 1, 4 * 64); i += 64)
109             ::prefetch((const char *) this + i);
110     }
111
112     void print(FILE* f, const char* prefix, int indent, int kdepth);
113 };
114
115 template <typename P>
116 class internode : public node_base<P> {
117   public:
118     static constexpr int width = P::internode_width;
119     typedef typename node_base<P>::nodeversion_type nodeversion_type;
120     typedef key<typename P::ikey_type> key_type;
121     typedef typename P::ikey_type ikey_type;
122     typedef typename key_bound<width, P::bound_method>::type bound_type;
123     typedef typename P::threadinfo_type threadinfo;
124
125     uint8_t nkeys_;
126     ikey_type ikey0_[width];
127     node_base<P>* child_[width + 1];
128     node_base<P>* parent_;
129     kvtimestamp_t created_at_[P::debug_level > 0];
130
131     internode()
132         : node_base<P>(false), nkeys_(0), parent_() {
133     }
134
135     static internode<P>* make(threadinfo& ti) {
136         void* ptr = ti.pool_allocate(sizeof(internode<P>),
137                                      memtag_masstree_internode);
138         internode<P>* n = new(ptr) internode<P>;
139         assert(n);
140         if (P::debug_level > 0)
141             n->created_at_[0] = ti.operation_timestamp();
142         return n;
143     }
144
145     int size() const {
146         return nkeys_;
147     }
148
149     key_type get_key(int p) const {
150         return key_type(ikey0_[p]);
151     }
152     ikey_type ikey(int p) const {
153         return ikey0_[p];
154     }
155     int compare_key(ikey_type a, int bp) const {
156         return ::compare(a, ikey(bp));
157     }
158     int compare_key(const key_type& a, int bp) const {
159         return ::compare(a.ikey(), ikey(bp));
160     }
161     inline int stable_last_key_compare(const key_type& k, nodeversion_type v,
162                                        threadinfo& ti) const;
163
164     void prefetch() const {
165         for (int i = 64; i < std::min(16 * width + 1, 4 * 64); i += 64)
166             ::prefetch((const char *) this + i);
167     }
168
169     void print(FILE* f, const char* prefix, int indent, int kdepth);
170
171     void deallocate(threadinfo& ti) {
172         ti.pool_deallocate(this, sizeof(*this), memtag_masstree_internode);
173     }
174     void deallocate_rcu(threadinfo& ti) {
175         ti.pool_deallocate_rcu(this, sizeof(*this), memtag_masstree_internode);
176     }
177
178   private:
179     void assign(int p, ikey_type ikey, node_base<P>* child) {
180         child->set_parent(this);
181         child_[p + 1] = child;
182         ikey0_[p] = ikey;
183     }
184
185     void shift_from(int p, const internode<P>* x, int xp, int n) {
186         masstree_precondition(x != this);
187         if (n) {
188             memcpy(ikey0_ + p, x->ikey0_ + xp, sizeof(ikey0_[0]) * n);
189             memcpy(child_ + p + 1, x->child_ + xp + 1, sizeof(child_[0]) * n);
190         }
191     }
192     void shift_up(int p, int xp, int n) {
193         memmove(ikey0_ + p, ikey0_ + xp, sizeof(ikey0_[0]) * n);
194         for (node_base<P> **a = child_ + p + n, **b = child_ + xp + n; n; --a, --b, --n)
195             *a = *b;
196     }
197     void shift_down(int p, int xp, int n) {
198         memmove(ikey0_ + p, ikey0_ + xp, sizeof(ikey0_[0]) * n);
199         for (node_base<P> **a = child_ + p + 1, **b = child_ + xp + 1; n; ++a, ++b, --n)
200             *a = *b;
201     }
202
203     int split_into(internode<P>* nr, int p, ikey_type ka, node_base<P>* value,
204                    ikey_type& split_ikey, int split_type);
205
206     template <typename PP> friend class tcursor;
207 };
208
209 template <typename P>
210 class leafvalue {
211   public:
212     typedef typename P::value_type value_type;
213     typedef typename make_prefetcher<P>::type prefetcher_type;
214
215     leafvalue() {
216     }
217     leafvalue(value_type v) {
218         u_.v = v;
219     }
220     leafvalue(node_base<P>* n) {
221         u_.x = reinterpret_cast<uintptr_t>(n);
222     }
223
224     static leafvalue<P> make_empty() {
225         return leafvalue<P>(value_type());
226     }
227
228     typedef bool (leafvalue<P>::*unspecified_bool_type)() const;
229     operator unspecified_bool_type() const {
230         return u_.x ? &leafvalue<P>::empty : 0;
231     }
232     bool empty() const {
233         return !u_.x;
234     }
235
236     value_type value() const {
237         return u_.v;
238     }
239     value_type& value() {
240         return u_.v;
241     }
242
243     node_base<P>* layer() const {
244         return reinterpret_cast<node_base<P>*>(u_.x);
245     }
246
247     void prefetch(int keylenx) const {
248         if (!leaf<P>::keylenx_is_layer(keylenx))
249             prefetcher_type()(u_.v);
250         else
251             u_.n->prefetch_full();
252     }
253
254   private:
255     union {
256         node_base<P>* n;
257         value_type v;
258         uintptr_t x;
259     } u_;
260 };
261
262 template <typename P>
263 class leaf : public node_base<P> {
264   public:
265     static constexpr int width = P::leaf_width;
266     typedef typename node_base<P>::nodeversion_type nodeversion_type;
267     typedef key<typename P::ikey_type> key_type;
268     typedef typename node_base<P>::leafvalue_type leafvalue_type;
269     typedef kpermuter<P::leaf_width> permuter_type;
270     typedef typename P::ikey_type ikey_type;
271     typedef typename key_bound<width, P::bound_method>::type bound_type;
272     typedef typename P::threadinfo_type threadinfo;
273     typedef stringbag<uint8_t> internal_ksuf_type;
274     typedef stringbag<uint16_t> external_ksuf_type;
275     static constexpr int ksuf_keylenx = 64;
276     static constexpr int layer_keylenx = 128;
277
278     enum {
279         modstate_insert = 0, modstate_remove = 1, modstate_deleted_layer = 2
280     };
281
282     int8_t extrasize64_;
283     uint8_t modstate_;
284     uint8_t keylenx_[width];
285     typename permuter_type::storage_type permutation_;
286     ikey_type ikey0_[width];
287     leafvalue_type lv_[width];
288     external_ksuf_type* ksuf_;
289     union {
290         leaf<P>* ptr;
291         uintptr_t x;
292     } next_;
293     leaf<P>* prev_;
294     node_base<P>* parent_;
295     kvtimestamp_t node_ts_;
296     kvtimestamp_t created_at_[P::debug_level > 0];
297     internal_ksuf_type iksuf_[0];
298
299     leaf(size_t sz, kvtimestamp_t node_ts)
300         : node_base<P>(true), modstate_(modstate_insert),
301           permutation_(permuter_type::make_empty()),
302           ksuf_(), parent_(), node_ts_(node_ts), iksuf_{} {
303         masstree_precondition(sz % 64 == 0 && sz / 64 < 128);
304         extrasize64_ = (int(sz) >> 6) - ((int(sizeof(*this)) + 63) >> 6);
305         if (extrasize64_ > 0)
306             new((void *)&iksuf_[0]) internal_ksuf_type(width, sz - sizeof(*this));
307     }
308
309     static leaf<P>* make(int ksufsize, kvtimestamp_t node_ts, threadinfo& ti) {
310         size_t sz = iceil(sizeof(leaf<P>) + std::min(ksufsize, 128), 64);
311         void* ptr = ti.pool_allocate(sz, memtag_masstree_leaf);
312         leaf<P>* n = new(ptr) leaf<P>(sz, node_ts);
313         assert(n);
314         if (P::debug_level > 0)
315             n->created_at_[0] = ti.operation_timestamp();
316         return n;
317     }
318     static leaf<P>* make_root(int ksufsize, leaf<P>* parent, threadinfo& ti) {
319         leaf<P>* n = make(ksufsize, parent ? parent->node_ts_ : 0, ti);
320         n->next_.ptr = n->prev_ = 0;
321         n->parent_ = node_base<P>::parent_for_layer_root(parent);
322         n->mark_root();
323         return n;
324     }
325
326     static size_t min_allocated_size() {
327         return (sizeof(leaf<P>) + 63) & ~size_t(63);
328     }
329     size_t allocated_size() const {
330         int es = (extrasize64_ >= 0 ? extrasize64_ : -extrasize64_ - 1);
331         return (sizeof(*this) + es * 64 + 63) & ~size_t(63);
332     }
333     int size() const {
334         return permuter_type::size(permutation_);
335     }
336     permuter_type permutation() const {
337         return permuter_type(permutation_);
338     }
339     typename nodeversion_type::value_type full_version_value() const {
340         static_assert(int(nodeversion_type::traits_type::top_stable_bits) >= int(permuter_type::size_bits), "not enough bits to add size to version");
341         return (this->version_value() << permuter_type::size_bits) + size();
342     }
343     typename nodeversion_type::value_type full_unlocked_version_value() const {
344         static_assert(int(nodeversion_type::traits_type::top_stable_bits) >= int(permuter_type::size_bits), "not enough bits to add size to version");
345         typename node_base<P>::nodeversion_type v(*this);
346         if (v.locked())
347             // subtlely, unlocked_version_value() is different than v.unlock(); v.version_value() because the latter will add a
348             // split bit if we're doing a split. So we do the latter to get the fully correct version.
349             v.unlock();
350         return (v.version_value() << permuter_type::size_bits) + size();
351     }
352
353     using node_base<P>::has_changed;
354     bool has_changed(nodeversion_type oldv,
355                      typename permuter_type::storage_type oldperm) const {
356         return this->has_changed(oldv) || oldperm != permutation_;
357     }
358
359     key_type get_key(int p) const {
360         int keylenx = keylenx_[p];
361         if (!keylenx_has_ksuf(keylenx))
362             return key_type(ikey0_[p], keylenx);
363         else
364             return key_type(ikey0_[p], ksuf(p));
365     }
366     ikey_type ikey(int p) const {
367         return ikey0_[p];
368     }
369     ikey_type ikey_bound() const {
370         return ikey0_[0];
371     }
372     int compare_key(const key_type& a, int bp) const {
373         return a.compare(ikey(bp), keylenx_[bp]);
374     }
375     inline int stable_last_key_compare(const key_type& k, nodeversion_type v,
376                                        threadinfo& ti) const;
377
378     inline leaf<P>* advance_to_key(const key_type& k, nodeversion_type& version,
379                                    threadinfo& ti) const;
380
381     static bool keylenx_is_layer(int keylenx) {
382         return keylenx > 127;
383     }
384     static bool keylenx_has_ksuf(int keylenx) {
385         return keylenx == ksuf_keylenx;
386     }
387
388     bool is_layer(int p) const {
389         return keylenx_is_layer(keylenx_[p]);
390     }
391     bool has_ksuf(int p) const {
392         return keylenx_has_ksuf(keylenx_[p]);
393     }
394     Str ksuf(int p, int keylenx) const {
395         masstree_precondition(keylenx_has_ksuf(keylenx));
396         return ksuf_ ? ksuf_->get(p) : iksuf_[0].get(p);
397     }
398     Str ksuf(int p) const {
399         return ksuf(p, keylenx_[p]);
400     }
401     bool ksuf_equals(int p, const key_type& ka) const {
402         return ksuf_equals(p, ka, keylenx_[p]);
403     }
404     bool ksuf_equals(int p, const key_type& ka, int keylenx) const {
405         if (!keylenx_has_ksuf(keylenx))
406             return true;
407         Str s = ksuf(p, keylenx);
408         return s.len == ka.suffix().len
409             && string_slice<uintptr_t>::equals_sloppy(s.s, ka.suffix().s, s.len);
410     }
411     // Returns 1 if match & not layer, 0 if no match, <0 if match and layer
412     int ksuf_matches(int p, const key_type& ka) const {
413         int keylenx = keylenx_[p];
414         if (keylenx < ksuf_keylenx)
415             return 1;
416         if (keylenx == layer_keylenx)
417             return -(int) sizeof(ikey_type);
418         Str s = ksuf(p, keylenx);
419         return s.len == ka.suffix().len
420             && string_slice<uintptr_t>::equals_sloppy(s.s, ka.suffix().s, s.len);
421     }
422     int ksuf_compare(int p, const key_type& ka) const {
423         int keylenx = keylenx_[p];
424         if (!keylenx_has_ksuf(keylenx))
425             return 0;
426         return ksuf(p, keylenx).compare(ka.suffix());
427     }
428
429     size_t ksuf_used_capacity() const {
430         if (ksuf_)
431             return ksuf_->used_capacity();
432         else if (extrasize64_ > 0)
433             return iksuf_[0].used_capacity();
434         else
435             return 0;
436     }
437     size_t ksuf_capacity() const {
438         if (ksuf_)
439             return ksuf_->capacity();
440         else if (extrasize64_ > 0)
441             return iksuf_[0].capacity();
442         else
443             return 0;
444     }
445     bool ksuf_external() const {
446         return ksuf_;
447     }
448     Str ksuf_storage(int p) const {
449         if (ksuf_)
450             return ksuf_->get(p);
451         else if (extrasize64_ > 0)
452             return iksuf_[0].get(p);
453         else
454             return Str();
455     }
456
457     bool deleted_layer() const {
458         return modstate_ == modstate_deleted_layer;
459     }
460
461     void prefetch() const {
462         for (int i = 64; i < std::min(16 * width + 1, 4 * 64); i += 64)
463             ::prefetch((const char *) this + i);
464         if (extrasize64_ > 0)
465             ::prefetch((const char *) &iksuf_[0]);
466         else if (extrasize64_ < 0) {
467             ::prefetch((const char *) ksuf_);
468             ::prefetch((const char *) ksuf_ + CACHE_LINE_SIZE);
469         }
470     }
471
472     void print(FILE* f, const char* prefix, int indent, int kdepth);
473
474     leaf<P>* safe_next() const {
475         return reinterpret_cast<leaf<P>*>(next_.x & ~(uintptr_t) 1);
476     }
477
478     void deallocate(threadinfo& ti) {
479         if (ksuf_)
480             ti.deallocate(ksuf_, ksuf_->capacity(),
481                           memtag_masstree_ksuffixes);
482         if (extrasize64_ != 0)
483             iksuf_[0].~stringbag();
484         ti.pool_deallocate(this, allocated_size(), memtag_masstree_leaf);
485     }
486     void deallocate_rcu(threadinfo& ti) {
487         if (ksuf_)
488             ti.deallocate_rcu(ksuf_, ksuf_->capacity(),
489                               memtag_masstree_ksuffixes);
490         ti.pool_deallocate_rcu(this, allocated_size(), memtag_masstree_leaf);
491     }
492
493   private:
494     inline void mark_deleted_layer() {
495         modstate_ = modstate_deleted_layer;
496     }
497
498     inline void assign(int p, const key_type& ka, threadinfo& ti) {
499         lv_[p] = leafvalue_type::make_empty();
500         ikey0_[p] = ka.ikey();
501         if (!ka.has_suffix())
502             keylenx_[p] = ka.length();
503         else {
504             keylenx_[p] = ksuf_keylenx;
505             assign_ksuf(p, ka.suffix(), false, ti);
506         }
507     }
508     inline void assign_initialize(int p, const key_type& ka, threadinfo& ti) {
509         lv_[p] = leafvalue_type::make_empty();
510         ikey0_[p] = ka.ikey();
511         if (!ka.has_suffix())
512             keylenx_[p] = ka.length();
513         else {
514             keylenx_[p] = ksuf_keylenx;
515             assign_ksuf(p, ka.suffix(), true, ti);
516         }
517     }
518     inline void assign_initialize(int p, leaf<P>* x, int xp, threadinfo& ti) {
519         lv_[p] = x->lv_[xp];
520         ikey0_[p] = x->ikey0_[xp];
521         keylenx_[p] = x->keylenx_[xp];
522         if (x->has_ksuf(xp))
523             assign_ksuf(p, x->ksuf(xp), true, ti);
524     }
525     inline void assign_initialize_for_layer(int p, const key_type& ka) {
526         assert(ka.has_suffix());
527         ikey0_[p] = ka.ikey();
528         keylenx_[p] = layer_keylenx;
529     }
530     void assign_ksuf(int p, Str s, bool initializing, threadinfo& ti);
531
532     inline ikey_type ikey_after_insert(const permuter_type& perm, int i,
533                                        const key_type& ka, int ka_i) const;
534     int split_into(leaf<P>* nr, int p, const key_type& ka, ikey_type& split_ikey,
535                    threadinfo& ti);
536
537     template <typename PP> friend class tcursor;
538 };
539
540
541 template <typename P>
542 void basic_table<P>::initialize(threadinfo& ti) {
543     masstree_precondition(!root_);
544     root_ = node_type::leaf_type::make_root(0, 0, ti);
545 }
546
547
548 /** @brief Return this node's parent in locked state.
549     @pre this->locked()
550     @post this->parent() == result && (!result || result->locked()) */
551 template <typename P>
552 internode<P>* node_base<P>::locked_parent(threadinfo& ti) const
553 {
554     node_base<P>* p;
555     masstree_precondition(!this->concurrent || this->locked());
556     while (1) {
557         p = this->parent();
558         if (!node_base<P>::parent_exists(p))
559             break;
560         nodeversion_type pv = p->lock(*p, ti.lock_fence(tc_internode_lock));
561         if (p == this->parent()) {
562             masstree_invariant(!p->isleaf());
563             break;
564         }
565         p->unlock(pv);
566         relax_fence();
567     }
568     return static_cast<internode<P>*>(p);
569 }
570
571
572 /** @brief Return the result of compare_key(k, LAST KEY IN NODE).
573
574     Reruns the comparison until a stable comparison is obtained. */
575 template <typename P>
576 inline int
577 internode<P>::stable_last_key_compare(const key_type& k, nodeversion_type v,
578                                       threadinfo& ti) const
579 {
580     while (1) {
581         int cmp = compare_key(k, size() - 1);
582         if (likely(!this->has_changed(v)))
583             return cmp;
584         v = this->stable_annotated(ti.stable_fence());
585     }
586 }
587
588 template <typename P>
589 inline int
590 leaf<P>::stable_last_key_compare(const key_type& k, nodeversion_type v,
591                                  threadinfo& ti) const
592 {
593     while (1) {
594         typename leaf<P>::permuter_type perm(permutation_);
595         int p = perm[perm.size() - 1];
596         int cmp = compare_key(k, p);
597         if (likely(!this->has_changed(v)))
598             return cmp;
599         v = this->stable_annotated(ti.stable_fence());
600     }
601 }
602
603
604 /** @brief Return the leaf in this tree layer responsible for @a ka.
605
606     Returns a stable leaf. Sets @a version to the stable version. */
607 template <typename P>
608 inline leaf<P>* node_base<P>::reach_leaf(const key_type& ka,
609                                          nodeversion_type& version,
610                                          threadinfo& ti) const
611 {
612     const node_base<P> *n[2];
613     typename node_base<P>::nodeversion_type v[2];
614     bool sense;
615
616     // Get a non-stale root.
617     // Detect staleness by checking whether n has ever split.
618     // The true root has never split.
619  retry:
620     sense = false;
621     n[sense] = this;
622     while (1) {
623         v[sense] = n[sense]->stable_annotated(ti.stable_fence());
624         if (!v[sense].has_split())
625             break;
626         ti.mark(tc_root_retry);
627         n[sense] = n[sense]->unsplit_ancestor();
628     }
629
630     // Loop over internal nodes.
631     while (!v[sense].isleaf()) {
632         const internode<P> *in = static_cast<const internode<P> *>(n[sense]);
633         in->prefetch();
634         int kp = internode<P>::bound_type::upper(ka, *in);
635         n[!sense] = in->child_[kp];
636         if (!n[!sense])
637             goto retry;
638         v[!sense] = n[!sense]->stable_annotated(ti.stable_fence());
639
640         if (likely(!in->has_changed(v[sense]))) {
641             sense = !sense;
642             continue;
643         }
644
645         typename node_base<P>::nodeversion_type oldv = v[sense];
646         v[sense] = in->stable_annotated(ti.stable_fence());
647         if (oldv.has_split(v[sense])
648             && in->stable_last_key_compare(ka, v[sense], ti) > 0) {
649             ti.mark(tc_root_retry);
650             goto retry;
651         } else
652             ti.mark(tc_internode_retry);
653     }
654
655     version = v[sense];
656     return const_cast<leaf<P> *>(static_cast<const leaf<P> *>(n[sense]));
657 }
658
659 /** @brief Return the leaf at or after *this responsible for @a ka.
660     @pre *this was responsible for @a ka at version @a v
661
662     Checks whether *this has split since version @a v. If it has split, then
663     advances through the leaves using the B^link-tree pointers and returns
664     the relevant leaf, setting @a v to the stable version for that leaf. */
665 template <typename P>
666 leaf<P>* leaf<P>::advance_to_key(const key_type& ka, nodeversion_type& v,
667                                  threadinfo& ti) const
668 {
669     const leaf<P>* n = this;
670     nodeversion_type oldv = v;
671     v = n->stable_annotated(ti.stable_fence());
672     if (v.has_split(oldv)
673         && n->stable_last_key_compare(ka, v, ti) > 0) {
674         leaf<P> *next;
675         ti.mark(tc_leaf_walk);
676         while (likely(!v.deleted()) && (next = n->safe_next())
677                && compare(ka.ikey(), next->ikey_bound()) >= 0) {
678             n = next;
679             v = n->stable_annotated(ti.stable_fence());
680         }
681     }
682     return const_cast<leaf<P>*>(n);
683 }
684
685
686 /** @brief Assign position @a p's keysuffix to @a s.
687
688     This may allocate a new suffix container, copying suffixes over.
689
690     The @a initializing parameter determines which suffixes are copied. If @a
691     initializing is false, then this is an insertion into a live node. The
692     live node's permutation indicates which keysuffixes are active, and only
693     active suffixes are copied. If @a initializing is true, then this
694     assignment is part of the initialization process for a new node. The
695     permutation might not be set up yet. In this case, it is assumed that key
696     positions [0,p) are ready: keysuffixes in that range are copied. In either
697     case, the key at position p is NOT copied; it is assigned to @a s. */
698 template <typename P>
699 void leaf<P>::assign_ksuf(int p, Str s, bool initializing, threadinfo& ti) {
700     if ((ksuf_ && ksuf_->assign(p, s))
701         || (extrasize64_ > 0 && iksuf_[0].assign(p, s)))
702         return;
703
704     external_ksuf_type* oksuf = ksuf_;
705
706     permuter_type perm(permutation_);
707     int n = initializing ? p : perm.size();
708
709     size_t csz = 0;
710     for (int i = 0; i < n; ++i) {
711         int mp = initializing ? i : perm[i];
712         if (mp != p && has_ksuf(mp))
713             csz += ksuf(mp).len;
714     }
715
716     size_t sz = iceil_log2(external_ksuf_type::safe_size(width, csz + s.len));
717     if (oksuf)
718         sz = std::max(sz, oksuf->capacity());
719
720     void* ptr = ti.allocate(sz, memtag_masstree_ksuffixes);
721     external_ksuf_type* nksuf = new(ptr) external_ksuf_type(width, sz);
722     for (int i = 0; i < n; ++i) {
723         int mp = initializing ? i : perm[i];
724         if (mp != p && has_ksuf(mp)) {
725             bool ok = nksuf->assign(mp, ksuf(mp));
726             assert(ok); (void) ok;
727         }
728     }
729     bool ok = nksuf->assign(p, s);
730     assert(ok); (void) ok;
731     fence();
732
733     // removed ksufs aren't copied to the new ksuf, but observers
734     // might need them. We ensure that observers must retry by
735     // ensuring that we are not currently in the remove state.
736     // State transitions are accompanied by mark_insert() so observers
737     // will retry.
738     masstree_invariant(modstate_ != modstate_remove);
739
740     ksuf_ = nksuf;
741     fence();
742
743     if (extrasize64_ >= 0)      // now the new ksuf_ installed, mark old dead
744         extrasize64_ = -extrasize64_ - 1;
745
746     if (oksuf)
747         ti.deallocate_rcu(oksuf, oksuf->capacity(),
748                           memtag_masstree_ksuffixes);
749 }
750
751 template <typename P>
752 inline basic_table<P>::basic_table()
753     : root_(0) {
754 }
755
756 template <typename P>
757 inline node_base<P>* basic_table<P>::root() const {
758     return root_;
759 }
760
761 template <typename P>
762 inline node_base<P>* basic_table<P>::fix_root() {
763     node_base<P>* root = root_;
764     if (unlikely(root->has_split())) {
765         node_base<P>* old_root = root;
766         root = root->unsplit_ancestor();
767         (void) cmpxchg(&root_, old_root, root);
768     }
769     return root;
770 }
771
772 } // namespace Masstree
773 #endif