/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/Function.h>
#include <folly/detail/AtFork.h>
#include <folly/detail/TurnSequencer.h>

namespace folly {

template <typename Tag>
bool rcu_domain<Tag>::singleton_{false};

template <typename Tag>
rcu_domain<Tag>::rcu_domain(Executor* executor) noexcept
    : executor_(executor ? executor : &QueuedImmediateExecutor::instance()) {
  // Please use a unique tag for each domain.
  CHECK(!singleton_);
  singleton_ = true;

  // Register fork handlers. Holding read locks across fork is not
  // supported. Using read locks in other atfork handlers is not
  // supported. Other atfork handlers launching new child threads
  // that use read locks *is* supported.
  detail::AtFork::registerHandler(
      this,
      [this]() { syncMutex_.lock(); },
      [this]() { syncMutex_.unlock(); },
      [this]() {
        counters_.resetAfterFork();
        syncMutex_.unlock();
      });
}
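
// Illustrative usage sketch, added for clarity; the tag and domain names
// below are hypothetical, not part of this file. Each domain needs its own
// unique tag type, or the CHECK(!singleton_) above will fire:
//
//   struct my_domain_tag; // hypothetical unique tag
//   folly::rcu_domain<my_domain_tag> myDomain; // at most one per tag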

template <typename Tag>
rcu_domain<Tag>::~rcu_domain() {
  detail::AtFork::unregisterHandler(this);
}

template <typename Tag>
rcu_token rcu_domain<Tag>::lock_shared() {
  auto idx = version_.load(std::memory_order_acquire);
  idx &= 1;
  counters_.increment(idx);

  return idx;
}

template <typename Tag>
void rcu_domain<Tag>::unlock_shared(rcu_token&& token) {
  DCHECK(0 == token.epoch_ || 1 == token.epoch_);
  counters_.decrement(token.epoch_);
}
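
// Reader-side sketch (added illustration; myDomain is the hypothetical
// domain from above): a read-side critical section brackets access with
// lock_shared()/unlock_shared(), typically via an RAII guard:
//
//   auto token = myDomain.lock_shared();
//   // ... read RCU-protected data; do not fork or call synchronize()
//   //     on this domain here ...
//   myDomain.unlock_shared(std::move(token));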

template <typename Tag>
template <typename T>
void rcu_domain<Tag>::call(T&& cbin) {
  auto node = new list_node;
  node->cb_ = [node, cb = std::forward<T>(cbin)]() {
    cb();
    delete node;
  };
  retire(node);
}
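
// Typical call() usage (added sketch; head, newNode, and myDomain are
// hypothetical): defer reclamation of a node that concurrent readers may
// still hold:
//
//   auto* old = head.exchange(newNode, std::memory_order_acq_rel);
//   myDomain.call([old]() { delete old; }); // runs after a grace period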

template <typename Tag>
void rcu_domain<Tag>::retire(list_node* node) noexcept {
  q_.push(node);

  // Note that it's likely we hold a read lock here,
  // so we can only half_sync(false). half_sync(true)
  // or a synchronize() call might block forever.
  uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
                      std::chrono::steady_clock::now().time_since_epoch())
                      .count();
  if (time > syncTime_.load(std::memory_order_relaxed) + syncTimePeriod_) {
    list_head finished;
    {
      std::lock_guard<std::mutex> g(syncMutex_);
      syncTime_.store(time, std::memory_order_relaxed);
      half_sync(false, finished);
    }
    // callbacks are called outside of syncMutex_
    finished.forEach(
        [&](list_node* item) { executor_->add(std::move(item->cb_)); });
  }
}
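
// (Added note) retire() is deliberately cheap: it attempts a non-blocking
// half_sync() at most once per syncTimePeriod_; otherwise the node simply
// waits on q_ until a later retire() or synchronize() drains it.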

template <typename Tag>
void rcu_domain<Tag>::synchronize() noexcept {
  auto curr = version_.load(std::memory_order_acquire);
  // Target is two epochs away.
  auto target = curr + 2;
  while (true) {
    // Try to assign ourselves to do the sync work.
    // If someone else is already assigned, we can wait for
    // the work to be finished by waiting on turn_.
    auto work = work_.load(std::memory_order_acquire);
    auto tmp = work;
    if (work < target && work_.compare_exchange_strong(tmp, target)) {
      list_head finished;
      {
        std::lock_guard<std::mutex> g(syncMutex_);
        while (version_.load(std::memory_order_acquire) < target) {
          half_sync(true, finished);
        }
      }
      // callbacks are called outside of syncMutex_
      finished.forEach(
          [&](list_node* node) { executor_->add(std::move(node->cb_)); });
      return;
    } else {
      if (version_.load(std::memory_order_acquire) >= target) {
        return;
      }
      std::atomic<uint32_t> cutoff{100};
      // Wait for someone to finish the work.
      turn_.tryWaitForTurn(work, cutoff, false);
    }
  }
}
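
// Writer-side sketch (added illustration; ptr, next, and myDomain are
// hypothetical): a blocking update swaps out the old value, waits for both
// epoch flips, then reclaims directly:
//
//   auto* old = ptr.exchange(next, std::memory_order_acq_rel);
//   myDomain.synchronize(); // no pre-existing reader can still see old
//   delete old;
//
// As the comment in retire() notes, calling synchronize() while holding a
// read lock on the same domain may block forever.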

/*
 * Not multithread safe, but it could be with proper version
 * checking and stronger increment of version. See
 * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf
 *
 * This version, however, can go to sleep if there are outstanding
 * readers, and does not spin or need rescheduling, unless blocking = false.
 */
template <typename Tag>
void rcu_domain<Tag>::half_sync(bool blocking, list_head& finished) {
  uint64_t curr = version_.load(std::memory_order_acquire);
  auto next = curr + 1;

  // Push all work to a queue for moving through two epochs. One
  // version is not enough because of late readers of the version_
  // counter in lock_shared.
  //
  // Note that for a similar reason we can't swap out the q here,
  // and instead drain it, so concurrent calls to call() are safe,
  // and will wait for the next epoch.
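  //
  // (Added illustration of the late-reader hazard) lock_shared() can load
  // version_ == N and not increment counters_ until after version_ has
  // moved to N + 1. The N -> N + 1 flip waits only on the other counter
  // bank, so it cannot see that reader; the N + 1 -> N + 2 flip waits on
  // bank N & 1 and does. Moving each batch through queues_[0] ->
  // queues_[1] -> finished therefore forces two flips before its
  // callbacks run.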
  q_.collect(queues_[0]);

  if (blocking) {
    counters_.waitForZero(next & 1);
  } else {
    if (counters_.readFull(next & 1) != 0) {
      return;
    }
  }

  // Run callbacks that have been through two epochs, and swap queues
  // for those only through a single epoch.
  finished.splice(queues_[1]);
  queues_[1].splice(queues_[0]);

  version_.store(next, std::memory_order_release);
  // Notify synchronous waiters in synchronize().
  turn_.completeTurn(curr);
}

} // namespace folly