2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 #include <folly/Function.h>
22 #include <folly/ThreadLocal.h>
23 #include <glog/logging.h>
29 // This is a thread-local cached, multi-producer single-consumer
30 // queue, similar to a concurrent version of std::list.
32 class ThreadCachedListsBase {
35 folly::Function<void()> cb_;
40 template <typename Tag>
41 class ThreadCachedLists : public ThreadCachedListsBase {
43 struct AtomicListHead {
44 std::atomic<Node*> tail_{nullptr};
45 std::atomic<Node*> head_{nullptr};
48 // Non-concurrent list, similar to std::list.
53 // Run func on each list node.
54 template <typename Func>
55 void forEach(Func func) {
57 while (node != nullptr) {
58 auto next = node->next_;
64 // Splice other in to this list.
65 // Afterwards, other is a valid empty listhead.
66 void splice(ListHead& other);
68 void splice(AtomicListHead& other);
71 // Push a node on a thread-local list. Returns true if local list
73 void push(Node* node);
75 // Collect all thread local lists to a single local list.
76 // This function is threadsafe with concurrent push()es,
77 // but only a single thread may call collect() at a time.
78 void collect(ListHead& list);
81 // Push list to the global list.
82 void pushGlobal(ListHead& list);
86 struct TLHead : public AtomicListHead {
87 ThreadCachedLists* parent_;
90 TLHead(ThreadCachedLists* parent) : parent_(parent) {}
93 parent_->ghead_.splice(*this);
97 folly::ThreadLocalPtr<TLHead, Tag> lhead_;
100 // push() and splice() are optimistic w.r.t setting the list head: The
101 // first pusher cas's the list head, which functions as a lock until
102 // tail != null. The first pusher then sets tail_ = head_.
104 // splice() does the opposite: steals the tail_ via exchange, then
105 // unlocks the list again by setting head_ to null.
106 template <typename Tag>
107 void ThreadCachedLists<Tag>::push(Node* node) {
108 DCHECK(node->next_ == nullptr);
109 static thread_local TLHead* cache_{nullptr};
112 auto l = lhead_.get();
114 lhead_.reset(new TLHead(this));
122 auto head = cache_->head_.load(std::memory_order_relaxed);
124 node->next_ = nullptr;
125 if (cache_->head_.compare_exchange_weak(head, node)) {
126 cache_->tail_.store(node);
130 auto tail = cache_->tail_.load(std::memory_order_relaxed);
133 if (cache_->tail_.compare_exchange_weak(node->next_, node)) {
141 template <typename Tag>
142 void ThreadCachedLists<Tag>::collect(ListHead& list) {
143 auto acc = lhead_.accessAllThreads();
145 for (auto& thr : acc) {
152 template <typename Tag>
153 void ThreadCachedLists<Tag>::ListHead::splice(ListHead& other) {
154 if (other.head_ != nullptr) {
155 DCHECK(other.tail_ != nullptr);
157 DCHECK(other.tail_ == nullptr);
162 DCHECK(tail_ != nullptr);
163 DCHECK(head_->next_ == nullptr);
164 head_->next_ = other.tail_;
167 DCHECK(head_ == nullptr);
172 other.head_ = nullptr;
173 other.tail_ = nullptr;
176 template <typename Tag>
177 void ThreadCachedLists<Tag>::ListHead::splice(AtomicListHead& list) {
180 auto tail = list.tail_.load();
182 local.tail_ = list.tail_.exchange(nullptr);
183 local.head_ = list.head_.exchange(nullptr);
188 } // namespace detail