2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 // @author: Xin Liu <xliux@fb.com>
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
24 This implements a sorted associative container that supports only
25 unique keys. (Similar to std::set.)
29 1. Small memory overhead: ~40% less memory overhead compared with
30 std::set (1.6 words per node versus 3). It has a minimum of 4
31 words (7 words if nodes got deleted) per-list overhead
34 2. Read accesses (count, find iterator, skipper) are lock-free and
35 mostly wait-free (the only wait a reader may need to do is when
36 the node it is visiting is in a pending stage, i.e. deleting,
37 adding and not fully linked). Write accesses (remove, add) need
38 to acquire locks, but locks are local to the predecessor nodes
39 and/or successor nodes.
41 3. Good high contention performance, comparable single-thread
42 performance. In the multithreaded case (12 workers), CSL tested
43 10x faster than a RWSpinLocked std::set for an averaged sized
46 Comparable read performance to std::set when single threaded,
47 especially when the list size is large, and scales better to
48 larger lists: when the size is small, CSL can be 20-50% slower on
49 find()/contains(). As the size gets large (> 1M elements),
50 find()/contains() can be 30% faster.
52 Iterating through a skiplist is similar to iterating through a
53 linked list, thus is much (2-6x) faster than on a std::set
54 (tree-based). This is especially true for short lists due to
55 better cache locality. Based on that, it's also faster to
56 intersect two skiplists.
58 4. Lazy removal with GC support. The removed nodes get deleted when
59 the last Accessor to the skiplist is destroyed.
63 1. Write operations are usually 30% slower than std::set in a single
66 2. Need to have a head node for each list, which has a 4 word
69 3. When the list is quite small (< 1000 elements), single threaded
70 benchmarks show CSL can be 10x slower than std::set.
72 4. The interface requires using an Accessor to access the skiplist.
75 5. Currently x64 only, due to use of MicroSpinLock.
77 6. Freed nodes will not be reclaimed as long as there are ongoing
82 typedef ConcurrentSkipList<int> SkipListT;
83 shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height));
85 // It's usually good practice to hold an accessor only during
86 // its necessary life cycle (but not in a tight loop as
87 // Accessor creation incurs ref-counting overhead).
89 // Holding it longer delays garbage-collecting the deleted
91 SkipListT::Accessor accessor(sl);
94 for (auto &elem : accessor) {
95 // use elem to access data
100 Another useful type is the Skipper accessor. This is useful if you
101 want to skip to locations in the way std::lower_bound() works,
102 i.e. it can be used for going through the list by skipping to the
103 node no less than a specified key. The Skipper keeps its location as
104 state, which makes it convenient for things like implementing
105 intersection of two sets efficiently, as it can start from the last
109 SkipListT::Accessor accessor(sl);
110 SkipListT::Skipper skipper(accessor);
113 CHECK_LE(30, *skipper);
116 // GC may happen when the accessor gets destructed.
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
125 #include <type_traits>
130 #include <boost/iterator/iterator_facade.hpp>
131 #include <boost/scoped_ptr.hpp>
134 #include <glog/logging.h>
135 #include "folly/ConcurrentSkipList-inl.h"
136 #include "folly/Likely.h"
137 #include "folly/SmallLocks.h"
// A concurrent, sorted, set-like skip list with unique keys ordered by
// Comp; see the file header for the design overview and trade-offs.
141 template<typename T, typename Comp=std::less<T>, int MAX_HEIGHT=24>
142 class ConcurrentSkipList {
143 // MAX_HEIGHT needs to be at least 2 to suppress compiler
144 // warnings/errors (Werror=uninitialized triggered due to preds_[1]
145 // being treated as a scalar in the compiler).
146 static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
147 "MAX_HEIGHT can only be in the range of [2, 64)");
// RAII guard over a node's MicroSpinLock (movable, so lock arrays can
// be handed between validation attempts).
148 typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
149 typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
152 typedef detail::SkipListNode<T> NodeType;
153 typedef T value_type;
// Forward iterators walking the bottom layer of the list.
157 typedef detail::csl_iterator<value_type, NodeType> iterator;
158 typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
163 // convenient function to get an Accessor to a new instance.
// The returned Accessor keeps the new list alive via its shared_ptr.
164 static Accessor create(int height=1) {
165 return Accessor(createInstance(height));
168 // create a shared_ptr skiplist object with initial head height.
169 static std::shared_ptr<SkipListType> createInstance(int height=1) {
170 return std::shared_ptr<SkipListType>(new SkipListType(height));
173 // create a unique_ptr skiplist object with initial head height.
174 static std::unique_ptr<SkipListType> createRawInstance(int height=1) {
175 return std::unique_ptr<SkipListType>(new SkipListType(height));
179 //===================================================================
180 // Below are implementation details.
181 // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
182 //===================================================================
// Destructor: requires that no Accessor is still alive (recycler ref
// count must be 0), then walks the bottom layer destroying every node,
// head included.
184 ~ConcurrentSkipList() {
185 CHECK_EQ(recycler_.refs(), 0);
186 while (NodeType* current = head_.load(std::memory_order_relaxed)) {
187 NodeType* tmp = current->skip(0);
188 NodeType::destroy(current);
189 head_.store(tmp, std::memory_order_relaxed);
195 static bool greater(const value_type &data, const NodeType *node) {
196 return node && Comp()(node->data(), data);
199 static bool less(const value_type &data, const NodeType *node) {
200 return (node == nullptr) || Comp()(data, node->data());
// Descends from cur_layer down to layer 0 starting at `cur`, filling
// preds[]/succs[] with the predecessor and successor of `data` on each
// layer. Returns the topmost layer where a node with an equal key was
// found, or -1 if none.
// NOTE(review): parts of this routine are elided in this view; the
// return-value contract above is inferred from the callers — confirm.
203 static int findInsertionPoint(NodeType *cur, int cur_layer,
204 const value_type &data,
205 NodeType *preds[], NodeType *succs[]) {
207 NodeType *pred = cur;
208 NodeType *foundNode = nullptr;
209 for (int layer = cur_layer; layer >= 0; --layer) {
// step right on the current layer while data is still greater
210 NodeType *node = pred->skip(layer);
211 while (greater(data, node)) {
213 node = node->skip(layer);
215 if (foundLayer == -1 && !less(data, node)) { // the two keys equal
221 // if found, succs[0..foundLayer] need to point to the cached foundNode,
222 // as foundNode might be deleted at the same time thus pred->skip() can
223 // return NULL or another node.
224 succs[layer] = foundNode ? foundNode : node;
// Recycler: accumulates nodes unlinked from the list and destroys them
// only once the last Accessor releases its reference, implementing the
// lazy-GC behavior described in the file header.
229 struct Recycler : private boost::noncopyable {
230 Recycler() : refs_(0), dirty_(false) { lock_.init(); }
234 for (auto& node : *nodes_) {
235 NodeType::destroy(node);
// Queue a removed node for deferred destruction; the caller must hold
// an accessor reference (refs() > 0) so the node cannot be freed early.
240 void add(NodeType* node) {
241 std::lock_guard<MicroSpinLock> g(lock_);
242 if (nodes_.get() == nullptr) {
243 nodes_.reset(new std::vector<NodeType*>(1, node));
245 nodes_->push_back(node);
247 DCHECK_GT(refs(), 0);
248 dirty_.store(true, std::memory_order_relaxed);
252 return refs_.load(std::memory_order_relaxed);
256 return refs_.fetch_add(1, std::memory_order_relaxed);
260 // We don't expect to clean the recycler immediately every time it is OK
261 // to do so. Here, it is possible that multiple accessors all release at
262 // the same time but nobody would clean the recycler here. If this
263 // happens, the recycler will usually still get cleaned when
264 // such a race doesn't happen. The worst case is the recycler will
265 // eventually get deleted along with the skiplist.
266 if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
267 return refs_.fetch_add(-1, std::memory_order_relaxed);
270 boost::scoped_ptr<std::vector<NodeType*> > newNodes;
272 std::lock_guard<MicroSpinLock> g(lock_);
273 if (nodes_.get() == nullptr || refs() > 1) {
274 return refs_.fetch_add(-1, std::memory_order_relaxed);
276 // once refs_ reaches 1 and there is no other accessor, it is safe to
277 // remove all the current nodes in the recycler, as we already acquired
278 // the lock here so no more new nodes can be added, even though new
279 // accessors may be added after that.
280 newNodes.swap(nodes_);
281 dirty_.store(false, std::memory_order_relaxed);
// destroy the swapped-out batch outside the spin lock
284 // TODO(xliu) should we spawn a thread to do this when there are large
285 // number of nodes in the recycler?
286 for (auto& node : *newNodes) {
287 NodeType::destroy(node);
290 // decrease the ref count at the very end, to minimize the
291 // chance of other threads acquiring lock_ to clear the deleted
293 return refs_.fetch_add(-1, std::memory_order_relaxed);
297 boost::scoped_ptr<std::vector<NodeType*>> nodes_;
298 std::atomic<int32_t> refs_; // current number of visitors to the list
299 std::atomic<bool> dirty_; // whether *nodes_ is non-empty
300 MicroSpinLock lock_; // protects access to *nodes_
301 }; // class ConcurrentSkipList::Recycler
// Constructs a list with an initial head tower of the given height;
// the head node holds a default-constructed value and is never exposed
// to callers.
303 explicit ConcurrentSkipList(int height) :
304 head_(NodeType::create(height, value_type(), true)), size_(0) {}
// Current element count (relaxed read; may race with concurrent writers).
306 size_t size() const { return size_.load(std::memory_order_relaxed); }
308 return head_.load(std::memory_order_consume)->height();
310 int maxLayer() const { return height() - 1; }
// Atomically adjusts size_ by delta and returns the new size.
312 size_t incrementSize(int delta) {
313 return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
316 // Returns the node if found, nullptr otherwise.
317 NodeType* find(const value_type &data) {
318 auto ret = findNode(data);
319 if (ret.second && !ret.first->markedForRemoval()) return ret.first;
323 // lock all the necessary nodes for changing (adding or removing) the list.
324 // returns true if all the locks are acquired successfully and the related
325 // nodes are all valid (not in certain pending states), false otherwise.
326 bool lockNodesForChange(int nodeHeight,
327 ScopedLocker guards[MAX_HEIGHT],
328 NodeType *preds[MAX_HEIGHT],
329 NodeType *succs[MAX_HEIGHT],
331 NodeType *pred, *succ, *prevPred = nullptr;
333 for (int layer = 0; valid && layer < nodeHeight; ++layer) {
335 DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
336 << " nodeheight=" << nodeHeight;
// lock each distinct predecessor only once (consecutive layers often
// share the same pred node)
338 if (pred != prevPred) {
339 guards[layer] = pred->acquireGuard();
// re-validate under the lock: pred must still link to succ on this
// layer and must not itself be getting removed
342 valid = !pred->markedForRemoval() &&
343 pred->skip(layer) == succ; // check again after locking
345 if (adding) { // when adding a node, the succ shouldn't be going away
346 valid = valid && (succ == nullptr || !succ->markedForRemoval());
353 // Returns a paired value:
354 // pair.first always stores the pointer to the node with the same input key.
355 // It could be either the newly added data, or the existed data in the
356 // list with the same key.
357 // pair.second stores whether the data is added successfully:
358 // 0 means not added, otherwise returns the new size.
360 std::pair<NodeType*, size_t> addOrGetData(U &&data) {
361 NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
// optimistic search-then-lock loop: retry until the preds/succs
// snapshot validates under the predecessor locks
366 int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
// an equal key already exists on some layer
369 NodeType *nodeFound = succs[layer];
370 DCHECK(nodeFound != nullptr);
371 if (nodeFound->markedForRemoval()) {
372 continue; // if it's getting deleted retry finding node.
374 // wait until fully linked.
375 while (UNLIKELY(!nodeFound->fullyLinked())) {}
376 return std::make_pair(nodeFound, 0);
379 // needs to be capped at the original height -- the real height may have grown
380 int nodeHeight = detail::SkipListRandomHeight::instance()->
381 getHeight(max_layer + 1);
383 ScopedLocker guards[MAX_HEIGHT];
384 if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
385 continue; // give up the locks and retry until all valid
388 // locks acquired and all valid, need to modify the links under the locks.
389 newNode = NodeType::create(nodeHeight, std::forward<U>(data));
// link bottom-up; the node only counts as present once
// setFullyLinked() is called below
390 for (int layer = 0; layer < nodeHeight; ++layer) {
391 newNode->setSkip(layer, succs[layer]);
392 preds[layer]->setSkip(layer, newNode);
395 newNode->setFullyLinked();
396 newSize = incrementSize(1);
// grow the head tower if the list has outgrown its current height
402 detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
404 if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
407 CHECK_GT(newSize, 0);
408 return std::make_pair(newNode, newSize);
// Removes the node with the given key. Returns true on success, false
// if the key is absent or another thread already marked it for removal.
411 bool remove(const value_type &data) {
412 NodeType *nodeToDelete = nullptr;
413 ScopedLocker nodeGuard;
414 bool isMarked = false;
416 NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
420 int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
421 if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
// first pass: mark the victim under its own lock, so exactly one
// thread proceeds to unlink it
426 nodeToDelete = succs[layer];
427 nodeHeight = nodeToDelete->height();
428 nodeGuard = nodeToDelete->acquireGuard();
429 if (nodeToDelete->markedForRemoval()) return false;
430 nodeToDelete->setMarkedForRemoval();
434 // acquire pred locks from bottom layer up
435 ScopedLocker guards[MAX_HEIGHT];
436 if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
437 continue; // this will unlock all the locks
// unlink top-down, then hand the node to the recycler for deferred
// destruction
440 for (int layer = nodeHeight - 1; layer >= 0; --layer) {
441 preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
447 recycle(nodeToDelete);
// Pointer to the first element on the bottom layer, or nullptr if the
// list is empty. (With concurrent writers this is a snapshot; see the
// Accessor::first() comment.)
451 const value_type *first() const {
452 auto node = head_.load(std::memory_order_consume)->skip(0);
453 return node ? &node->data() : nullptr;
// Pointer to the last element, found by walking right layer by layer
// from the top; nullptr if the list is empty (pred stayed at head).
456 const value_type *last() const {
457 NodeType *pred = head_.load(std::memory_order_consume);
458 NodeType *node = nullptr;
459 for (int layer = maxLayer(); layer >= 0; --layer) {
461 node = pred->skip(layer);
462 if (node) pred = node;
463 } while (node != nullptr);
465 return pred == head_.load(std::memory_order_relaxed)
466 ? nullptr : &pred->data();
// A node may only be deleted when it is fully linked, found at its own
// top layer (so each layer gets unlinked exactly once), and not
// already marked by another remover.
469 static bool okToDelete(NodeType *candidate, int layer) {
470 DCHECK(candidate != nullptr);
471 return candidate->fullyLinked() &&
472 candidate->maxLayer() == layer &&
473 !candidate->markedForRemoval();
476 // find node for insertion/deleting
477 int findInsertionPointGetMaxLayer(const value_type &data,
478 NodeType *preds[], NodeType *succs[], int *max_layer) const {
479 *max_layer = maxLayer();
480 return findInsertionPoint(head_.load(std::memory_order_consume),
481 *max_layer, data, preds, succs);
484 // Find node for access. Returns a paired values:
485 // pair.first = the first node that no-less than data value
486 // pair.second = 1 when the data value is found, or 0 otherwise.
487 // This is like lower_bound, but not exact: we could have the node marked for
488 // removal so still need to check that.
489 std::pair<NodeType*, int> findNode(const value_type &data) const {
490 return findNodeDownRight(data);
493 // Find node by first stepping down then stepping right. Based on benchmark
494 // results, this is slightly faster than findNodeRightDown for better
495 // locality on the skipping pointers.
496 std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
497 NodeType *pred = head_.load(std::memory_order_consume);
498 int ht = pred->height();
499 NodeType *node = nullptr;
// descend from the head while a step right on the layer below would
// still overshoot data
504 for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
505 if (ht == 0) return std::make_pair(pred->skip(0), 0); // not found
507 node = pred->skip(--ht); // node <= data now
// then walk right until reaching the first node >= data
509 while (greater(data, node)) {
511 node = node->skip(ht);
513 found = !less(data, node);
515 return std::make_pair(node, found);
518 // find node by first stepping right then stepping down.
519 // We still keep this for reference purposes.
520 std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
521 NodeType *pred = head_.load(std::memory_order_consume);
522 NodeType *node = nullptr;
523 auto top = maxLayer();
525 for (int layer = top; !found && layer >= 0; --layer) {
526 node = pred->skip(layer);
527 while (greater(data, node)) {
529 node = node->skip(layer);
531 found = !less(data, node);
533 return std::make_pair(node, found);
// First node whose key is >= data and which is not marked for removal;
// steps over pending deletions along the bottom layer.
536 NodeType* lower_bound(const value_type &data) const {
537 auto node = findNode(data).first;
538 while (node != nullptr && node->markedForRemoval()) {
539 node = node->skip(0);
// Replaces the head node with a taller one so future inserts can use
// more layers. A CAS on head_ ensures only one growing thread wins;
// the loser destroys its speculative new head and returns.
544 void growHeight(int height) {
545 NodeType* oldHead = head_.load(std::memory_order_consume);
546 if (oldHead->height() >= height) { // someone else already did this
550 NodeType* newHead = NodeType::create(height, value_type(), true);
552 { // need to guard the head node in case others are adding/removing
553 // nodes linked to the head.
554 ScopedLocker g = oldHead->acquireGuard();
555 newHead->copyHead(oldHead);
556 NodeType* expected = oldHead;
557 if (!head_.compare_exchange_strong(expected, newHead,
558 std::memory_order_release)) {
559 // if someone has already done the swap, just return.
560 NodeType::destroy(newHead);
563 oldHead->setMarkedForRemoval();
// Hand a removed node to the recycler for deferred destruction.
568 void recycle(NodeType *node) {
572 std::atomic<NodeType*> head_;
574 std::atomic<size_t> size_;
// Accessor: RAII handle that pins the skip list's deferred garbage
// (via the Recycler ref count) for as long as it is alive, and exposes
// the stdlib-like container API described in the file header.
577 template<typename T, typename Comp, int MAX_HEIGHT>
578 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Accessor {
579 typedef detail::SkipListNode<T> NodeType;
580 typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
582 typedef T value_type;
584 typedef T& reference;
586 typedef const T& const_reference;
587 typedef const T* const_pointer;
588 typedef size_t size_type;
589 typedef Comp key_compare;
590 typedef Comp value_compare;
592 typedef typename SkipListType::iterator iterator;
593 typedef typename SkipListType::const_iterator const_iterator;
594 typedef typename SkipListType::Skipper Skipper;
// Shared-ownership accessor: keeps the list alive through slHolder_.
596 explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
597 : slHolder_(std::move(skip_list))
599 sl_ = slHolder_.get();
600 DCHECK(sl_ != nullptr);
601 sl_->recycler_.addRef();
604 // Unsafe initializer: the caller assumes the responsibility to keep
605 // skip_list valid during the whole life cycle of the Accessor.
606 explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
607 DCHECK(sl_ != nullptr);
608 sl_->recycler_.addRef();
611 Accessor(const Accessor &accessor) :
613 slHolder_(accessor.slHolder_) {
614 sl_->recycler_.addRef();
617 Accessor& operator=(const Accessor &accessor) {
618 if (this != &accessor) {
619 slHolder_ = accessor.slHolder_;
// release the reference on the old list before taking one on the new
620 sl_->recycler_.release();
622 sl_->recycler_.addRef();
628 sl_->recycler_.release();
631 bool empty() const { return sl_->size() == 0; }
632 size_t size() const { return sl_->size(); }
633 size_type max_size() const { return std::numeric_limits<size_type>::max(); }
635 // returns end() if the value is not in the list, otherwise returns an
636 // iterator pointing to the data, and it's guaranteed that the data is valid
637 // as long as the Accessor is held.
638 iterator find(const key_type &value) { return iterator(sl_->find(value)); }
639 const_iterator find(const key_type &value) const {
640 return iterator(sl_->find(value));
642 size_type count(const key_type &data) const { return contains(data); }
644 iterator begin() const {
645 NodeType* head = sl_->head_.load(std::memory_order_consume);
646 return iterator(head->next());
648 iterator end() const { return iterator(nullptr); }
649 const_iterator cbegin() const { return begin(); }
650 const_iterator cend() const { return end(); }
653 typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
654 std::pair<iterator, bool> insert(U&& data) {
655 auto ret = sl_->addOrGetData(std::forward<U>(data));
656 return std::make_pair(iterator(ret.first), ret.second);
658 size_t erase(const key_type &data) { return remove(data); }
660 iterator lower_bound(const key_type &data) const {
661 return iterator(sl_->lower_bound(data));
664 size_t height() const { return sl_->height(); }
666 // first() returns pointer to the first element in the skiplist, or
669 // last() returns the pointer to the last element in the skiplist,
670 // nullptr if list is empty.
672 // Note: As concurrent writing can happen, first() is not
673 // guaranteed to be the min_element() in the list. Similarly
674 // last() is not guaranteed to be the max_element(), and both of them can
675 // be invalid (i.e. nullptr), so we name them differently from front() and
677 const key_type *first() const { return sl_->first(); }
678 const key_type *last() const { return sl_->last(); }
680 // Try to remove the last element in the skip list.
682 // Returns true if we removed it, false if either the list is empty
683 // or a race condition happened (i.e. the used-to-be last element
684 // was already removed by another thread).
686 auto last = sl_->last();
687 return last ? sl_->remove(*last) : false;
690 std::pair<key_type*, bool> addOrGetData(const key_type &data) {
691 auto ret = sl_->addOrGetData(data);
692 return std::make_pair(&ret.first->data(), ret.second);
695 SkipListType* skiplist() const { return sl_; }
698 // TODO:(xliu) remove these.
699 // Returns true if the node is added successfully, false if not, i.e. the
700 // node with the same key already existed in the list.
701 bool contains(const key_type &data) const { return sl_->find(data); }
702 bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
703 bool remove(const key_type &data) { return sl_->remove(data); }
707 std::shared_ptr<SkipListType> slHolder_;
710 // implements forward iterator concept.
711 template<typename ValT, typename NodeT>
712 class detail::csl_iterator :
713 public boost::iterator_facade<csl_iterator<ValT, NodeT>,
714 ValT, boost::forward_traversal_tag> {
716 typedef ValT value_type;
717 typedef value_type& reference;
718 typedef value_type* pointer;
719 typedef ptrdiff_t difference_type;
// default-constructed (null-node) iterator serves as end()
721 explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
// converting constructor, e.g. iterator -> const_iterator
723 template<typename OtherVal, typename OtherNode>
724 csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
725 typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
726 = 0) : node_(other.node_) {}
// approximate per-node memory footprint: tower pointers plus this
728 size_t nodeSize() const {
729 return node_ == nullptr ? 0 :
730 node_->height() * sizeof(NodeT*) + sizeof(*this);
733 bool good() const { return node_ != nullptr; }
736 friend class boost::iterator_core_access;
737 template<class,class> friend class csl_iterator;
// boost::iterator_facade hooks
739 void increment() { node_ = node_->next(); };
740 bool equal(const csl_iterator& other) const { return node_ == other.node_; }
741 value_type& dereference() const { return node_->data(); }
// Skipper: a stateful forward cursor supporting lower_bound-style
// jumps via to(). It caches preds_/succs_ towers plus per-layer hints_
// so repeated lookups with increasing keys (e.g. intersecting two
// lists) restart from the previous position instead of the head.
747 template<typename T, typename Comp, int MAX_HEIGHT>
748 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Skipper {
749 typedef detail::SkipListNode<T> NodeType;
750 typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
751 typedef typename SkipListType::Accessor Accessor;
754 typedef T value_type;
755 typedef T& reference;
757 typedef ptrdiff_t difference_type;
// Both constructors hold an Accessor for the Skipper's lifetime, which
// pins deferred GC while the cursor is in use.
759 Skipper(const std::shared_ptr<SkipListType>& skipList) :
760 accessor_(skipList) {
764 Skipper(const Accessor& accessor) : accessor_(accessor) {
769 // need to cache the head node
770 NodeType* head_node = head();
771 headHeight_ = head_node->height();
772 for (int i = 0; i < headHeight_; ++i) {
773 preds_[i] = head_node;
774 succs_[i] = head_node->skip(i);
776 int max_layer = maxLayer();
777 for (int i = 0; i < max_layer; ++i) {
780 hints_[max_layer] = max_layer;
783 // advance to the next node in the list.
784 Skipper& operator ++() {
785 preds_[0] = succs_[0];
786 succs_[0] = preds_[0]->skip(0);
787 int height = curHeight();
// refresh the upper layers whose successor we just stepped past
788 for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
789 preds_[i] = succs_[i];
790 succs_[i] = preds_[i]->skip(i);
// false once the cursor has run off the end of the list
795 bool good() const { return succs_[0] != nullptr; }
797 int maxLayer() const { return headHeight_ - 1; }
799 int curHeight() const {
800 // need to cap the height to the cached head height, as the current node
801 // might be some newly inserted node and also during the time period the
802 // head height may have grown.
803 return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
806 const value_type &data() const {
807 DCHECK(succs_[0] != NULL);
808 return succs_[0]->data();
811 value_type &operator *() const {
812 DCHECK(succs_[0] != NULL);
813 return succs_[0]->data();
816 value_type *operator->() {
817 DCHECK(succs_[0] != NULL);
818 return &succs_[0]->data();
822 * Skip to the position whose data is no less than the parameter.
823 * (I.e. the lower_bound).
825 * Returns true if the data is found, false otherwise.
827 bool to(const value_type &data) {
828 int layer = curHeight() - 1;
829 if (layer < 0) return false; // reaches the end of the list
// start from the hinted layer and climb while data is still beyond
// the cached successor on that layer
831 int lyr = hints_[layer];
832 int max_layer = maxLayer();
833 while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
836 hints_[layer] = lyr; // update the hint
838 int foundLayer = SkipListType::
839 findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
840 if (foundLayer < 0) return false;
842 DCHECK(succs_[0] != NULL) << "lyr=" << lyr << "; max_layer=" << max_layer;
843 return !succs_[0]->markedForRemoval();
847 NodeType* head() const {
848 return accessor_.skiplist()->head_.load(std::memory_order_consume);
853 NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
854 uint8_t hints_[MAX_HEIGHT];
859 #endif // FOLLY_CONCURRENT_SKIP_LIST_H_