2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #ifndef FOLLY_LIFOSEM_H
18 #define FOLLY_LIFOSEM_H
25 #include <system_error>
27 #include <folly/AtomicStruct.h>
28 #include <folly/Baton.h>
29 #include <folly/IndexedMemPool.h>
30 #include <folly/Likely.h>
31 #include <folly/detail/CacheLocality.h>
35 template <template<typename> class Atom = std::atomic,
36 class BatonType = Baton<Atom>>
39 /// LifoSem is a semaphore that wakes its waiters in a manner intended to
40 /// maximize performance rather than fairness. It should be preferred
41 /// to a mutex+condvar or POSIX sem_t solution when all of the waiters
42 /// are equivalent. It is faster than a condvar or sem_t, and it has a
43 /// shutdown state that might save you a lot of complexity when it comes
44 /// time to shut down your work pipelines. LifoSem is larger than sem_t,
45 /// but that is only because it uses padding and alignment to avoid
48 /// LifoSem allows multi-post and multi-tryWait, and provides a shutdown
49 /// state that awakens all waiters. LifoSem is faster than sem_t because
50 /// it performs exact wakeups, so it often requires fewer system calls.
51 /// It provides all of the functionality of sem_t except for timed waiting.
52 /// It is called LifoSem because its wakeup policy is approximately LIFO,
53 /// rather than the usual FIFO.
55 /// The core semaphore operations provided are:
57 /// -- post() -- if there is a pending waiter, wake it up, otherwise
58 /// increment the value of the semaphore. If the value of the semaphore
59 /// is already 2^32-1, does nothing. Compare to sem_post().
61 /// -- post(n) -- equivalent to n calls to post(), but much more efficient.
62 /// sem_t has no equivalent to this method.
64 /// -- bool tryWait() -- if the semaphore's value is positive, decrements it
65 /// and returns true, otherwise returns false. Compare to sem_trywait().
67 /// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's
68 /// value by n, returning the amount by which it actually was decremented
69 /// (a value from 0 to n inclusive). Not atomic. Equivalent to n calls
70 /// to tryWait(). sem_t has no equivalent to this method.
72 /// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait().
74 /// LifoSem also has the notion of a shutdown state, in which any calls
75 /// that would block (or are already blocked) throw ShutdownSemError.
76 /// Note the difference between a call to wait() and a call to wait()
77 /// that might block. In the former case tryWait() would succeed, and no
78 /// isShutdown() check is performed. In the latter case an exception is
79 /// thrown. This behavior allows a LifoSem controlling work distribution
80 /// to drain. If you want to immediately stop all waiting on shutdown,
81 /// you can just check isShutdown() yourself (preferably wrapped in
82 /// an UNLIKELY). This fast-stop behavior is easy to add, but difficult
83 /// to remove if you want the draining behavior, which is why draining
84 /// is the default. Since wait() is the only method that can block,
85 /// it is the only one that is affected by the shutdown state.
87 /// All LifoSem operations except valueGuess() are guaranteed
88 /// to be linearizable.
89 typedef LifoSemImpl<> LifoSem;
92 /// The exception thrown when wait()ing on an isShutdown() LifoSem
// Derives publicly from std::runtime_error, so it can be caught either as
// ShutdownSemError or as a std::runtime_error.
93 struct ShutdownSemError : public std::runtime_error {
// Only declared here; `explicit` rules out implicit conversion from a
// std::string. NOTE(review): the definition is not in view -- presumably
// msg is forwarded to std::runtime_error.
94 explicit ShutdownSemError(const std::string& msg);
// Only declared here (no inline body).
95 virtual ~ShutdownSemError() noexcept;
100 // Internally, a LifoSem is either a value or a linked list of wait nodes.
101 // This union is captured in the LifoSemHead type, which holds either a
102 // value or an indexed pointer to the list. LifoSemHead itself is a value
103 // type, the head is a mutable atomic box containing a LifoSemHead value.
104 // Each wait node corresponds to exactly one waiter. Values can flow
105 // through the semaphore either by going into and out of the head's value,
106 // or by direct communication from a poster to a waiter. The former path
107 // is taken when there are no pending waiters, the latter otherwise. The
108 // general flow of a post is to try to increment the value or pop-and-post
109 // a wait node. Either of those has the effect of conveying one semaphore
110 // unit. Waiting is the opposite, either a decrement of the value or
111 // push-and-wait of a wait node. The generic LifoSemBase abstracts the
112 // actual mechanism by which a wait node's post->wait communication is
113 // performed, which is why we have LifoSemRawNode and LifoSemNode.
115 /// LifoSemRawNode is the actual pooled storage that backs LifoSemNode
116 /// for user-specified Handoff types. This is done so that we can have
117 /// a large static IndexedMemPool of nodes, instead of per-type pools
118 template <template<typename> class Atom>
119 struct LifoSemRawNode {
// Untyped storage with the size and alignment of a void*. LifoSemNode
// placement-constructs its Handoff object inside this member.
120 std::aligned_storage<sizeof(void*),alignof(void*)>::type raw;
122 /// The IndexedMemPool index of the next node in this chain, or 0
123 /// if none. This will be set to uint32_t(-1) if the node is being
124 /// posted due to a shutdown-induced wakeup
// True when `next` currently holds the uint32_t(-1) sentinel.
127 bool isShutdownNotice() const { return next == uint32_t(-1); }
// Resets `next` to 0, which is also the "no next node" value.
128 void clearShutdownNotice() { next = 0; }
// Overwrites `next` with the sentinel; whatever link was stored is lost.
129 void setShutdownNotice() { next = uint32_t(-1); }
// Pool type that hands out raw nodes by index.
// NOTE(review): 32 and 200 are IndexedMemPool template parameters whose
// meaning is defined in folly/IndexedMemPool.h -- confirm there.
131 typedef folly::IndexedMemPool<LifoSemRawNode<Atom>,32,200,Atom> Pool;
133 /// Storage for all of the waiter nodes for LifoSem-s that use Atom
137 /// Use this macro to declare the static storage that backs the raw nodes
138 /// for the specified atomic type
// The expansion defines LifoSemRawNode<Atom>::pool(). The pool is reached
// through a function-local static pointer initialised with
// `new Pool((capacity))`; the extra parentheses keep an expression argument
// intact. (Comments are kept outside the macro because every macro line ends
// in a line continuation.)
139 #define LIFOSEM_DECLARE_POOL(Atom, capacity) \
143 LifoSemRawNode<Atom>::Pool& LifoSemRawNode<Atom>::pool() { \
144 static Pool* instance = new Pool((capacity)); \
150 /// Handoff is a type not bigger than a void* that knows how to perform a
151 /// single post() -> wait() communication. It must have a post() method.
152 /// If it has a wait() method then LifoSemBase's wait() implementation
153 /// will work out of the box, otherwise you will need to specialize
154 /// LifoSemBase::wait accordingly.
155 template <typename Handoff, template<typename> class Atom>
156 struct LifoSemNode : public LifoSemRawNode<Atom> {
// Compile-time guards: Handoff must fit inside the inherited `raw` storage
// and must not need stricter alignment than that storage provides.
158 static_assert(sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw),
159 "Handoff too big for small-object optimization, use indirection");
160 static_assert(alignof(Handoff) <=
161 alignof(decltype(LifoSemRawNode<Atom>::raw)),
162 "Handoff alignment constraint not satisfied");
// Constructs the Handoff in place inside `raw`, perfectly forwarding the
// constructor arguments.
164 template <typename ...Args>
165 void init(Args&&... args) {
166 new (&this->raw) Handoff(std::forward<Args>(args)...);
// Explicit destructor call: the object was created with placement new, so
// nothing else will run its destructor.
170 handoff().~Handoff();
// Overwrites the storage with 'F' bytes after destruction.
// NOTE(review): presumably a debugging aid; any conditional-compilation
// guard around this line is not in view.
172 memset(&this->raw, 'F', sizeof(this->raw));
// Views the raw storage as the Handoff object (cast through void*).
177 return *static_cast<Handoff*>(static_cast<void*>(&this->raw));
// Const overload of the accessor above.
180 const Handoff& handoff() const {
181 return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw));
// Deleter functor; LifoSemBase::UniquePtr uses it as the std::unique_ptr
// deleter type.
185 template <typename Handoff, template<typename> class Atom>
186 struct LifoSemNodeRecycler {
187 void operator()(LifoSemNode<Handoff,Atom>* elem) const {
// Map the node pointer back to its pool index, then hand that index back
// to the shared raw-node pool.
189 auto idx = LifoSemRawNode<Atom>::pool().locateElem(elem);
190 LifoSemRawNode<Atom>::pool().recycleIndex(idx);
194 /// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state
195 /// bits, and a sequence number used to avoid ABA problems in the lock-free
196 /// management of the LifoSem's wait lists. The value can either hold
197 /// an integral semaphore value (if there are no waiters) or a node index
198 /// (see IndexedMemPool) for the head of a list of wait nodes
200 // What we really want are bitfields:
201 // uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31;
202 // Unfortunately g++ generates pretty bad code for this sometimes (I saw
203 // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of
204 // in bulk, for example). We can generate better code anyway by assuming
205 // that setters won't be given values that cause under/overflow, and
206 // putting the sequence at the end where its planned overflow doesn't
209 // data == 0 (empty list) with isNodeIdx is conceptually the same
210 // as data == 0 (no unclaimed increments) with !isNodeIdx, we always
211 // convert the former into the latter to make the logic simpler.
// Bit position of the shutdown flag.
214 IsShutdownShift = 33,
// Single-bit masks derived from the corresponding shift positions.
218 IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
219 IsShutdownMask = uint64_t(1) << IsShutdownShift,
// Value that adds one to the sequence field (its lowest bit).
220 SeqIncr = uint64_t(1) << SeqShift,
// Every bit at or above SeqShift, i.e. the whole sequence field.
221 SeqMask = ~(SeqIncr - 1),
/// Returns `bits` truncated to 32 bits, interpreted as a wait-node index.
230 inline uint32_t idx() const {
// A zero index is never expected when this accessor is used.
232 assert(uint32_t(bits) != 0);
233 return uint32_t(bits);
/// Returns `bits` truncated to 32 bits, interpreted as the semaphore value.
235 inline uint32_t value() const {
// Only meaningful when the head does not hold a node index.
236 assert(!isNodeIdx());
237 return uint32_t(bits);
/// True when the IsNodeIdxMask bit is set in `bits`.
239 inline constexpr bool isNodeIdx() const {
240 return (bits & IsNodeIdxMask) != 0;
/// True when the IsShutdownMask bit is set in `bits`.
242 inline constexpr bool isShutdown() const {
243 return (bits & IsShutdownMask) != 0;
/// Returns the sequence field: `bits` shifted right by SeqShift and
/// truncated to 32 bits.
245 inline constexpr uint32_t seq() const {
246 return uint32_t(bits >> SeqShift);
249 //////// setter-like things return a new struct
251 /// This should only be used for initial construction, not for setting
252 /// the value, because it clears the sequence number
// Initialises the head from the 32-bit value alone, so nothing above the
// low 32 bits (flags, sequence) is set in the result.
253 static inline constexpr LifoSemHead fresh(uint32_t value) {
254 return LifoSemHead{ value };
257 /// Returns the LifoSemHead that results from popping a waiter node,
258 /// given the current waiter node's next ptr
// Two result shapes are visible below. Both add SeqIncr; they differ in
// whether the isNodeIdx flag survives the mask.
// NOTE(review): the branch condition and the tail of the second expression
// are not in view -- presumably idxNext == 0 selects the first form.
259 inline LifoSemHead withPop(uint32_t idxNext) const {
262 // no isNodeIdx bit or data bits. Wraparound of seq bits is okay
// Keeps only the sequence and shutdown bits, then bumps the sequence.
263 return LifoSemHead{ (bits & (SeqMask | IsShutdownMask)) + SeqIncr };
265 // preserve sequence bits (incremented with wraparound okay) and
266 // isNodeIdx bit, replace all data bits
// Same mask as above plus the isNodeIdx flag; the low data bits are cleared.
268 (bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) +
273 /// Returns the LifoSemHead that results from pushing a new waiter node
274 inline LifoSemHead withPush(uint32_t _idx) const {
// Pushing is only valid onto an existing list or onto a zero value.
275 assert(isNodeIdx() || value() == 0);
// The result below does not carry the shutdown bit over, so the head must
// not be shut down.
276 assert(!isShutdown());
// Keep the sequence bits unchanged (no SeqIncr is added here), set the
// isNodeIdx flag, and store _idx in the low bits.
278 return LifoSemHead{ (bits & SeqMask) | IsNodeIdxMask | _idx };
281 /// Returns the LifoSemHead with value increased by delta, with
282 /// saturation if the maximum value is reached
283 inline LifoSemHead withValueIncr(uint32_t delta) const {
284 assert(!isNodeIdx());
// Add delta to the low (value) bits and bump the sequence in one addition.
285 auto rv = LifoSemHead{ bits + SeqIncr + delta };
// A carry out of the value bits shows up as the isNodeIdx flag being set.
286 if (UNLIKELY(rv.isNodeIdx())) {
287 // value has overflowed into the isNodeIdx bit
// Saturate: clear the spurious flag and set every bit below it.
288 rv = LifoSemHead{ (rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1) };
293 /// Returns the LifoSemHead that results from decrementing the value
294 inline LifoSemHead withValueDecr(uint32_t delta) const {
// delta must be positive and no larger than the current value, so the
// subtraction below cannot borrow out of the value bits.
295 assert(delta > 0 && delta <= value());
// Subtract delta from the value and bump the sequence.
296 return LifoSemHead{ bits + SeqIncr - delta };
299 /// Returns the LifoSemHead with the same state as the current node,
300 /// but with the shutdown bit set
// Only ORs in the shutdown bit; the sequence is not incremented here.
301 inline LifoSemHead withShutdown() const {
302 return LifoSemHead{ bits | IsShutdownMask };
/// Two heads are equal when their `bits` are equal.
305 inline constexpr bool operator== (const LifoSemHead& rhs) const {
306 return bits == rhs.bits;
/// Negation of operator==.
308 inline constexpr bool operator!= (const LifoSemHead& rhs) const {
309 return !(*this == rhs);
313 /// LifoSemBase is the engine for several different types of LIFO
314 /// semaphore. LifoSemBase handles storage of positive semaphore values
315 /// and wait nodes, but the actual waiting and notification mechanism is
316 /// up to the client.
318 /// The Handoff type is responsible for arranging one wakeup notification.
319 /// See LifoSemNode for more information on how to make your own.
320 template <typename Handoff,
321 template<typename> class Atom = std::atomic>
/// Creates a semaphore whose head starts as LifoSemHead::fresh(initialValue).
/// The value defaults to 0, and `explicit` rules out implicit conversion from
/// an integer.
325 constexpr explicit LifoSemBase(uint32_t initialValue = 0)
326 : head_(LifoSemHead::fresh(initialValue)) {}
// Copy construction and copy assignment are both deleted.
328 LifoSemBase(LifoSemBase const&) = delete;
329 LifoSemBase& operator=(LifoSemBase const&) = delete;
331 /// Silently saturates if value is already 2^32-1
// incrOrPop(1) either adds one unit to the value and returns 0, or pops a
// wait node and returns its index.
333 auto idx = incrOrPop(1);
// Deliver the unit directly to the popped waiter.
// NOTE(review): the guard on idx between these two lines is not in view.
335 idxToNode(idx).handoff().post();
339 /// Equivalent to n calls to post(), except may be much more efficient.
340 /// At any point in time at which the semaphore's value would exceed
341 /// 2^32-1 if tracked with infinite precision, it may be silently
342 /// truncated to 2^32-1. This saturation is not guaranteed to be exact,
343 /// although it is guaranteed that overflow won't result in wrap-around.
344 /// There would be a substantial performance and complexity cost in
345 /// guaranteeing exact saturation (similar to the cost of maintaining
346 /// linearizability near the zero value, but without as much of
348 void post(uint32_t n) {
// Keep going while units remain and incrOrPop() returned a waiter index.
// A return of 0 means the remaining n was added to the value, which ends
// the loop.
350 while (n > 0 && (idx = incrOrPop(n)) != 0) {
351 // pop accounts for only 1
352 idxToNode(idx).handoff().post();
357 /// Returns true iff shutdown() has been called
// Acquire-loads the head and tests its shutdown bit. UNLIKELY marks the
// shut-down case as the unexpected one.
358 bool isShutdown() const {
359 return UNLIKELY(head_.load(std::memory_order_acquire).isShutdown());
362 /// Prevents blocking on this semaphore, causing all blocking wait()
363 /// calls to throw ShutdownSemError. Both currently blocked wait() and
364 /// future calls to wait() for which tryWait() would return false will
365 /// cause an exception. Calls to wait() for which the matching post()
366 /// has already occurred will proceed normally.
// Two phases are visible: (1) CAS the shutdown bit into head_, then
// (2) pop and post every queued wait node.
368 // first set the shutdown bit
369 auto h = head_.load(std::memory_order_acquire);
370 while (!h.isShutdown()) {
371 if (head_.compare_exchange_strong(h, h.withShutdown())) {
// The CAS stored h.withShutdown(); mirror that in the local copy that
// phase 2 works from.
373 h = h.withShutdown();
376 // compare_exchange_strong rereads h, retry
379 // now wake up any waiters
380 while (h.isNodeIdx()) {
381 auto& node = idxToNode(h.idx());
// node.next is read before the CAS; the replacement head is installed only
// if head_ still equals h.
382 auto repl = h.withPop(node.next);
383 if (head_.compare_exchange_strong(h, repl)) {
384 // successful pop, wake up the waiter and move on. The next
385 // field is used to convey that this wakeup didn't consume a value
// The notice is written before post(), so it is in place when the waiter
// is released.
386 node.setShutdownNotice();
387 node.handoff().post();
393 /// Returns true iff value was decremented
// idx == 0 is passed, so no wait node is pushed; decrOrPush only attempts
// the decrement.
396 auto rv = decrOrPush(n, 0);
// decrOrPush subtracts what it consumed from n: DECR must leave n at 0,
// and any other result must leave it at 1.
397 assert((rv == WaitResult::DECR && n == 0) ||
398 (rv != WaitResult::DECR && n == 1));
399 // SHUTDOWN is okay here, since we don't actually wait
400 return rv == WaitResult::DECR;
403 /// Equivalent to (but may be much more efficient than) n calls to
404 /// tryWait(). Returns the total amount by which the semaphore's value
406 uint32_t tryWait(uint32_t n) {
// idx == 0: attempt a decrement only, never push a wait node. n is passed
// by reference and reduced by the amount actually decremented.
412 auto rv = decrOrPush(n, 0);
// NOTE(review): `prev` is declared on a line not in view; it is compared
// with n before/after the call.
413 assert((rv == WaitResult::DECR && n < prev) ||
414 (rv != WaitResult::DECR && n == prev));
// Anything other than DECR means nothing was consumed on this attempt.
415 if (rv != WaitResult::DECR) {
422 /// Blocks the current thread until there is a matching post or the
423 /// semaphore is shut down. Throws ShutdownSemError if the semaphore
424 /// has been shut down and this method would otherwise be blocking.
425 /// Note that wait() doesn't throw during shutdown if tryWait() would
428 // early check isn't required for correctness, but is an important
429 // perf win if we can avoid allocating and deallocating a node
434 // allocateNode() won't compile unless Handoff has a default
// The node is owned by a UniquePtr, so it is recycled on every exit path
// below, including the throws.
436 UniquePtr node = allocateNode();
// Either consumes one unit, pushes the node as a waiter, or reports
// shutdown.
438 auto rv = tryWaitOrPush(*node);
439 if (UNLIKELY(rv == WaitResult::SHUTDOWN)) {
440 assert(isShutdown());
441 throw ShutdownSemError("wait() would block but semaphore is shut down");
444 if (rv == WaitResult::PUSH) {
// Block on the node's Handoff until a poster (or shutdown) posts it.
445 node->handoff().wait();
// A shutdown wakeup is told apart from a real post by the notice stored in
// the node.
446 if (UNLIKELY(node->isShutdownNotice())) {
447 // this wait() didn't consume a value, it was triggered by shutdown
448 assert(isShutdown());
449 throw ShutdownSemError(
450 "blocking wait() interrupted by semaphore shutdown");
453 // node->handoff().wait() can't return until after the node has
454 // been popped and post()ed, so it is okay for the UniquePtr to
455 // recycle the node now
457 // else node wasn't pushed, so it is safe to recycle
460 /// Returns a guess at the current value, designed for debugging.
461 /// If there are no concurrent posters or waiters then this will
463 uint32_t valueGuess() const {
464 // this is actually linearizable, but we don't promise that because
465 // we may want to add striping in the future to help under heavy
467 auto h = head_.load(std::memory_order_acquire);
// A head holding a node index means waiters are queued, which is reported
// as a value of 0.
468 return h.isNodeIdx() ? 0 : h.value();
473 enum class WaitResult {
479 /// The type of a std::unique_ptr that will automatically return a
480 /// LifoSemNode to the appropriate IndexedMemPool
// The deleter type is LifoSemNodeRecycler, so releasing the pointer goes
// through that functor's operator() rather than `delete`.
481 typedef std::unique_ptr<LifoSemNode<Handoff, Atom>,
482 LifoSemNodeRecycler<Handoff, Atom>> UniquePtr;
484 /// Returns a node that can be passed to decrOrLink
// Takes an index from the shared raw-node pool, prepares the node, and
// wraps it in a UniquePtr. `args` are forwarded to the Handoff constructor
// through node.init().
485 template <typename... Args>
486 UniquePtr allocateNode(Args&&... args) {
487 auto idx = LifoSemRawNode<Atom>::pool().allocIndex();
489 auto& node = idxToNode(idx);
// Reset `next` so a notice left by an earlier use of this pooled node is
// not seen.
490 node.clearShutdownNotice();
492 node.init(std::forward<Args>(args)...);
// NOTE(review): this recycle sits on a path whose surrounding control flow
// is not in view -- presumably cleanup when init() throws.
494 LifoSemRawNode<Atom>::pool().recycleIndex(idx);
497 return UniquePtr(&node);
503 /// Returns DECR if the semaphore value was decremented (and waiterNode
504 /// was untouched), PUSH if a reference to the wait node was pushed,
505 /// or SHUTDOWN if decrement was not possible and push wasn't allowed
506 /// because isShutdown(). Ownership of the wait node remains the
507 /// responsibility of the caller, who must not release it until after
508 /// the node's Handoff has been posted.
509 WaitResult tryWaitOrPush(LifoSemNode<Handoff, Atom>& waiterNode) {
// Translate the node to its pool index and delegate to decrOrPush.
// NOTE(review): the declaration of n is on a line not in view.
511 return decrOrPush(n, nodeToIdx(waiterNode));
// The single atomic word of the semaphore: either the value or the head of
// the wait list, encoded as a LifoSemHead.
516 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
517 folly::AtomicStruct<LifoSemHead,Atom> head_;
// Sized so that sizeof(LifoSemHead) plus this array adds up to
// kFalseSharingRange bytes.
519 char padding_[folly::detail::CacheLocality::kFalseSharingRange -
520 sizeof(LifoSemHead)];
/// Looks up the raw node stored at `idx` in the shared pool and returns it
/// viewed as the typed LifoSemNode (a downcast from the raw base type).
523 static LifoSemNode<Handoff, Atom>& idxToNode(uint32_t idx) {
524 auto raw = &LifoSemRawNode<Atom>::pool()[idx];
525 return *static_cast<LifoSemNode<Handoff, Atom>*>(raw);
/// Inverse of idxToNode(): asks the shared pool for the index of `node`.
528 static uint32_t nodeToIdx(const LifoSemNode<Handoff, Atom>& node) {
529 return LifoSemRawNode<Atom>::pool().locateElem(&node);
532 /// Either increments by n and returns 0, or pops a node and returns it.
533 /// If n + the stripe's value overflows, then the stripe's value
534 /// saturates silently at 2^32-1
535 uint32_t incrOrPop(uint32_t n) {
539 auto head = head_.load(std::memory_order_acquire);
// A head holding a node index means waiters are queued: try to pop the
// front one.
540 if (head.isNodeIdx()) {
541 auto& node = idxToNode(head.idx());
// node.next is read before the CAS; the pop is published only if head_
// still equals the value that was loaded.
542 if (head_.compare_exchange_strong(head, head.withPop(node.next))) {
// Otherwise there are no waiters: try to add n to the value
// (withValueIncr saturates on overflow).
547 auto after = head.withValueIncr(n);
548 if (head_.compare_exchange_strong(head, after)) {
557 /// Returns DECR if some amount was decremented, with that amount
558 /// subtracted from n. If n is 1 and this function returns DECR then n
559 /// must be 0 afterward. Returns PUSH if no value could be decremented
560 /// and idx was pushed, or if idx was zero and no push was performed but
561 /// a push would have been performed with a valid node. Returns SHUTDOWN
562 /// if the caller should have blocked but isShutdown(). If idx == 0,
563 /// may return PUSH even after isShutdown() or may return SHUTDOWN
564 WaitResult decrOrPush(uint32_t& n, uint32_t idx) {
568 auto head = head_.load(std::memory_order_acquire);
// A positive value is available: try to take some of it.
570 if (!head.isNodeIdx() && head.value() > 0) {
// Never take more than is available or more than was asked for.
572 auto delta = std::min(n, head.value());
573 if (head_.compare_exchange_strong(head, head.withValueDecr(delta))) {
575 return WaitResult::DECR;
// NOTE(review): the guard for this return is not in view. Per the contract
// documented above this function, it is the idx == 0 case (a push would
// have happened with a valid node).
580 return WaitResult::PUSH;
// Shutdown is checked before any push, which satisfies withPush()'s
// !isShutdown() assertion.
583 if (UNLIKELY(head.isShutdown())) {
584 return WaitResult::SHUTDOWN;
587 auto& node = idxToNode(idx);
// Link the new node in front of the current head node (0 when the list is
// empty) before publishing it with the CAS below.
588 node.next = head.isNodeIdx() ? head.idx() : 0;
589 if (head_.compare_exchange_strong(head, head.withPush(idx))) {
591 return WaitResult::PUSH;
599 } // namespace detail
// Concrete semaphore: instantiates detail::LifoSemBase with BatonType as
// its Handoff parameter.
601 template <template<typename> class Atom, class BatonType>
602 struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> {
// Forwards the initial semaphore value (default 0) to the base class.
603 constexpr explicit LifoSemImpl(uint32_t v = 0)
604 : detail::LifoSemBase<BatonType, Atom>(v) {}