From 7fbec7196afc262f23dd26431441f5c231d4d801 Mon Sep 17 00:00:00 2001 From: Marc Celani Date: Wed, 16 Apr 2014 17:19:31 -0700 Subject: [PATCH] Introduce LifoSem Summary: A LIFO semaphore is useful for building a thread pool because we can keep cpu caches warmer by continually reusing the last used thread. This diff introduces a LIFO semaphore to folly. Test Plan: unit tests Reviewed By: davejwatson@fb.com FB internal diff: D1280405 --- folly/LifoSem.cpp | 40 +++ folly/LifoSem.h | 600 ++++++++++++++++++++++++++++++++++++ folly/test/LifoSemTests.cpp | 437 ++++++++++++++++++++++++++ 3 files changed, 1077 insertions(+) create mode 100644 folly/LifoSem.cpp create mode 100644 folly/LifoSem.h create mode 100644 folly/test/LifoSemTests.cpp diff --git a/folly/LifoSem.cpp b/folly/LifoSem.cpp new file mode 100644 index 00000000..942f3ffa --- /dev/null +++ b/folly/LifoSem.cpp @@ -0,0 +1,40 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LifoSem.h" + +/// Raw node storage is preallocated in a contiguous memory segment, +/// but we use an anonymous mmap so the physical memory used (RSS) will +/// only reflect the maximum number of waiters that actually existed +/// concurrently. For blocked threads the max node count is limited by the +/// number of threads, so we can conservatively estimate that this will be +/// < 10k. For LifoEventSem, however, we could potentially have many more. 
+/// +/// On a 64-bit architecture each LifoSemRawNode takes 16 bytes. We make +/// the pool 1 million entries. + +LIFOSEM_DECLARE_POOL(std::atomic, 1000000) + +namespace folly { + +ShutdownSemError::ShutdownSemError(const std::string& msg) + : std::runtime_error(msg) +{} + +ShutdownSemError::~ShutdownSemError() noexcept { +} + +} diff --git a/folly/LifoSem.h b/folly/LifoSem.h new file mode 100644 index 00000000..b5b83cd4 --- /dev/null +++ b/folly/LifoSem.h @@ -0,0 +1,600 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_LIFOSEM_H +#define FOLLY_LIFOSEM_H + +#include +#include +#include +#include +#include +#include + +#include "folly/AtomicStruct.h" +#include "folly/Baton.h" +#include "folly/IndexedMemPool.h" +#include "folly/Likely.h" +#include "folly/detail/CacheLocality.h" + +namespace folly { + +template class Atom = std::atomic, + class BatonType = Baton> +struct LifoSemImpl; + +/// LifoSem is a semaphore that wakes its waiters in a manner intended to +/// maximize performance rather than fairness. It should be preferred +/// to a mutex+condvar or POSIX sem_t solution when all of the waiters +/// are equivalent. It is faster than a condvar or sem_t, and it has a +/// shutdown state that might save you a lot of complexity when it comes +/// time to shut down your work pipelines. 
LifoSem is larger than sem_t, +/// but that is only because it uses padding and alignment to avoid +/// false sharing. +/// +/// LifoSem allows multi-post and multi-tryWait, and provides a shutdown +/// state that awakens all waiters. LifoSem is faster than sem_t because +/// it performs exact wakeups, so it often requires fewer system calls. +/// It provides all of the functionality of sem_t except for timed waiting. +/// It is called LifoSem because its wakeup policy is approximately LIFO, +/// rather than the usual FIFO. +/// +/// The core semaphore operations provided are: +/// +/// -- post() -- if there is a pending waiter, wake it up, otherwise +/// increment the value of the semaphore. If the value of the semaphore +/// is already 2^32-1, does nothing. Compare to sem_post(). +/// +/// -- post(n) -- equivalent to n calls to post(), but much more efficient. +/// sem_t has no equivalent to this method. +/// +/// -- bool tryWait() -- if the semaphore's value is positive, decrements it +/// and returns true, otherwise returns false. Compare to sem_trywait(). +/// +/// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's +/// value by n, returning the amount by which it actually was decremented +/// (a value from 0 to n inclusive). Not atomic. Equivalent to n calls +/// to tryWait(). sem_t has no equivalent to this method. +/// +/// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait(). +/// +/// LifoSem also has the notion of a shutdown state, in which any calls +/// that would block (or are already blocked) throw ShutdownSemError. +/// Note the difference between a call to wait() and a call to wait() +/// that might block. In the former case tryWait() would succeed, and no +/// isShutdown() check is performed. In the latter case an exception is +/// thrown. This behavior allows a LifoSem controlling work distribution +/// to drain. 
If you want to immediately stop all waiting on shutdown, +/// you can just check isShutdown() yourself (preferably wrapped in +/// an UNLIKELY). This fast-stop behavior is easy to add, but difficult +/// to remove if you want the draining behavior, which is why we have +/// chosen the draining behavior. Since wait() is the only method that can block, +/// it is the only one that is affected by the shutdown state. +/// +/// All LifoSem operations except valueGuess() are guaranteed +/// to be linearizable. +typedef LifoSemImpl<> LifoSem; + + +/// The exception thrown when wait()ing on an isShutdown() LifoSem +struct ShutdownSemError : public std::runtime_error { + explicit ShutdownSemError(const std::string& msg); + virtual ~ShutdownSemError() noexcept; +}; + +namespace detail { + +// Internally, a LifoSem is either a value or a linked list of wait nodes. +// This union is captured in the LifoSemHead type, which holds either a +// value or an indexed pointer to the list. LifoSemHead itself is a value +// type, the head is a mutable atomic box containing a LifoSemHead value. +// Each wait node corresponds to exactly one waiter. Values can flow +// through the semaphore either by going into and out of the head's value, +// or by direct communication from a poster to a waiter. The former path +// is taken when there are no pending waiters, the latter otherwise. The +// general flow of a post is to try to increment the value or pop-and-post +// a wait node. Either of those has the effect of conveying one semaphore +// unit. Waiting is the opposite, either a decrement of the value or +// push-and-wait of a wait node. The generic LifoSemBase abstracts the +// actual mechanism by which a wait node's post->wait communication is +// performed, which is why we have LifoSemRawNode and LifoSemNode. + +/// LifoSemRawNode is the actual pooled storage that backs LifoSemNode +/// for user-specified Handoff types. 
This is done so that we can have +/// a large static IndexedMemPool of nodes, instead of per-type pools +template class Atom> +struct LifoSemRawNode { + std::aligned_storage::type raw; + + /// The IndexedMemPool index of the next node in this chain, or 0 + /// if none. This will be set to uint32_t(-1) if the node is being + /// posted due to a shutdown-induced wakeup + uint32_t next; + + bool isShutdownNotice() const { return next == uint32_t(-1); } + void clearShutdownNotice() { next = 0; } + void setShutdownNotice() { next = uint32_t(-1); } + + typedef folly::IndexedMemPool,32,200,Atom> Pool; + + /// Storage for all of the waiter nodes for LifoSem-s that use Atom + static Pool pool; +}; + +/// Use this macro to declare the static storage that backs the raw nodes +/// for the specified atomic type +#define LIFOSEM_DECLARE_POOL(Atom, capacity) \ + template<> \ + folly::detail::LifoSemRawNode::Pool \ + folly::detail::LifoSemRawNode::pool((capacity)); + +/// Handoff is a type not bigger than a void* that knows how to perform a +/// single post() -> wait() communication. It must have a post() method. +/// If it has a wait() method then LifoSemBase's wait() implementation +/// will work out of the box, otherwise you will need to specialize +/// LifoSemBase::wait accordingly. +template class Atom> +struct LifoSemNode : public LifoSemRawNode { + + static_assert(sizeof(Handoff) <= sizeof(LifoSemRawNode::raw), + "Handoff too big for small-object optimization, use indirection"); + static_assert(alignof(Handoff) <= + alignof(decltype(LifoSemRawNode::raw)), + "Handoff alignment constraint not satisfied"); + + template + void init(Args&&... 
args) { + new (&this->raw) Handoff(std::forward(args)...); + } + + void destroy() { + handoff().~Handoff(); +#ifndef NDEBUG + memset(&this->raw, 'F', sizeof(this->raw)); +#endif + } + + Handoff& handoff() { + return *static_cast(static_cast(&this->raw)); + } + + const Handoff& handoff() const { + return *static_cast(static_cast(&this->raw)); + } +}; + +template class Atom> +struct LifoSemNodeRecycler { + void operator()(LifoSemNode* elem) const { + elem->destroy(); + auto idx = LifoSemRawNode::pool.locateElem(elem); + LifoSemRawNode::pool.recycleIndex(idx); + } +}; + +/// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state +/// bits, and a sequence number used to avoid ABA problems in the lock-free +/// management of the LifoSem's wait lists. The value can either hold +/// an integral semaphore value (if there are no waiters) or a node index +/// (see IndexedMemPool) for the head of a list of wait nodes +class LifoSemHead { + // What we really want are bitfields: + // uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31; + // Unfortunately g++ generates pretty bad code for this sometimes (I saw + // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of + // in bulk, for example). We can generate better code anyway by assuming + // that setters won't be given values that cause under/overflow, and + // putting the sequence at the end where its planned overflow doesn't + // need any masking. + // + // data == 0 (empty list) with isNodeIdx is conceptually the same + // as data == 0 (no unclaimed increments) with !isNodeIdx, we always + // convert the former into the latter to make the logic simpler. 
+ enum { + IsNodeIdxShift = 32, + IsShutdownShift = 33, + SeqShift = 34, + }; + enum : uint64_t { + IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift, + IsShutdownMask = uint64_t(1) << IsShutdownShift, + SeqIncr = uint64_t(1) << SeqShift, + SeqMask = ~(SeqIncr - 1), + }; + + public: + + uint64_t bits; + + //////// getters + + inline uint32_t idx() const { + assert(isNodeIdx()); + assert(uint32_t(bits) != 0); + return uint32_t(bits); + } + inline uint32_t value() const { + assert(!isNodeIdx()); + return uint32_t(bits); + } + inline constexpr bool isNodeIdx() const { + return (bits & IsNodeIdxMask) != 0; + } + inline constexpr bool isShutdown() const { + return (bits & IsShutdownMask) != 0; + } + inline constexpr uint32_t seq() const { + return uint32_t(bits >> SeqShift); + } + + //////// setter-like things return a new struct + + /// This should only be used for initial construction, not for setting + /// the value, because it clears the sequence number + static inline constexpr LifoSemHead fresh(uint32_t value) { + return LifoSemHead{ value }; + } + + /// Returns the LifoSemHead that results from popping a waiter node, + /// given the current waiter node's next ptr + inline LifoSemHead withPop(uint32_t idxNext) const { + assert(isNodeIdx()); + if (idxNext == 0) { + // no isNodeIdx bit or data bits. 
Wraparound of seq bits is okay + return LifoSemHead{ (bits & (SeqMask | IsShutdownMask)) + SeqIncr }; + } else { + // preserve sequence bits (incremented with wraparound okay) and + // isNodeIdx bit, replace all data bits + return LifoSemHead{ + (bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) + + SeqIncr + idxNext }; + } + } + + /// Returns the LifoSemHead that results from pushing a new waiter node + inline LifoSemHead withPush(uint32_t idx) const { + assert(isNodeIdx() || value() == 0); + assert(!isShutdown()); + assert(idx != 0); + return LifoSemHead{ (bits & SeqMask) | IsNodeIdxMask | idx }; + } + + /// Returns the LifoSemHead with value increased by delta, with + /// saturation if the maximum value is reached + inline LifoSemHead withValueIncr(uint32_t delta) const { + assert(!isNodeIdx()); + auto rv = LifoSemHead{ bits + SeqIncr + delta }; + if (UNLIKELY(rv.isNodeIdx())) { + // value has overflowed into the isNodeIdx bit + rv = LifoSemHead{ (rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1) }; + } + return rv; + } + + /// Returns the LifoSemHead that results from decrementing the value + inline LifoSemHead withValueDecr(uint32_t delta) const { + assert(delta > 0 && delta <= value()); + return LifoSemHead{ bits + SeqIncr - delta }; + } + + /// Returns the LifoSemHead with the same state as the current node, + /// but with the shutdown bit set + inline LifoSemHead withShutdown() const { + return LifoSemHead{ bits | IsShutdownMask }; + } + + inline constexpr bool operator== (const LifoSemHead& rhs) const { + return bits == rhs.bits; + } + inline constexpr bool operator!= (const LifoSemHead& rhs) const { + return !(*this == rhs); + } +}; + +/// LifoSemBase is the engine for several different types of LIFO +/// semaphore. LifoSemBase handles storage of positive semaphore values +/// and wait nodes, but the actual waiting and notification mechanism is +/// up to the client. +/// +/// The Handoff type is responsible for arranging one wakeup notification. 
+/// See LifoSemNode for more information on how to make your own. +template class Atom = std::atomic> +struct LifoSemBase : boost::noncopyable { + + /// Constructor + explicit LifoSemBase(uint32_t initialValue = 0) + : head_(LifoSemHead::fresh(initialValue)) {} + + /// Silently saturates if value is already 2^32-1 + void post() { + auto idx = incrOrPop(1); + if (idx != 0) { + idxToNode(idx).handoff().post(); + } + } + + /// Equivalent to n calls to post(), except may be much more efficient. + /// At any point in time at which the semaphore's value would exceed + /// 2^32-1 if tracked with infinite precision, it may be silently + /// truncated to 2^32-1. This saturation is not guaranteed to be exact, + /// although it is guaranteed that overflow won't result in wrap-around. + /// There would be a substantial performance and complexity cost in + /// guaranteeing exact saturation (similar to the cost of maintaining + /// linearizability near the zero value, but without as much of + /// a benefit). + void post(uint32_t n) { + uint32_t idx; + while (n > 0 && (idx = incrOrPop(n)) != 0) { + // pop accounts for only 1 + idxToNode(idx).handoff().post(); + --n; + } + } + + /// Returns true iff shutdown() has been called + bool isShutdown() const { + return UNLIKELY(head_.load(std::memory_order_acquire).isShutdown()); + } + + /// Prevents blocking on this semaphore, causing all blocking wait() + /// calls to throw ShutdownSemError. Both currently blocked wait() and + /// future calls to wait() for which tryWait() would return false will + /// cause an exception. Calls to wait() for which the matching post() + /// has already occurred will proceed normally. 
+ void shutdown() { + // first set the shutdown bit + auto h = head_.load(std::memory_order_acquire); + while (!h.isShutdown()) { + if (head_.compare_exchange_strong(h, h.withShutdown())) { + // success + h = h.withShutdown(); + break; + } + // compare_exchange_strong rereads h, retry + } + + // now wake up any waiters + while (h.isNodeIdx()) { + auto& node = idxToNode(h.idx()); + auto repl = h.withPop(node.next); + if (head_.compare_exchange_strong(h, repl)) { + // successful pop, wake up the waiter and move on. The next + // field is used to convey that this wakeup didn't consume a value + node.setShutdownNotice(); + node.handoff().post(); + h = repl; + } + } + } + + /// Returns true iff value was decremented + bool tryWait() { + uint32_t n = 1; + auto rv = decrOrPush(n, 0); + assert((rv == WaitResult::DECR && n == 0) || + (rv != WaitResult::DECR && n == 1)); + // SHUTDOWN is okay here, since we don't actually wait + return rv == WaitResult::DECR; + } + + /// Equivalent to (but may be much more efficient than) n calls to + /// tryWait(). Returns the total amount by which the semaphore's value + /// was decreased + uint32_t tryWait(uint32_t n) { + auto const orig = n; + while (n > 0) { +#ifndef NDEBUG + auto prev = n; +#endif + auto rv = decrOrPush(n, 0); + assert((rv == WaitResult::DECR && n < prev) || + (rv != WaitResult::DECR && n == prev)); + if (rv != WaitResult::DECR) { + break; + } + } + return orig - n; + } + + /// Blocks the current thread until there is a matching post or the + /// semaphore is shut down. Throws ShutdownSemError if the semaphore + /// has been shut down and this method would otherwise be blocking. 
+ /// Note that wait() doesn't throw during shutdown if tryWait() would + /// return true + void wait() { + // early check isn't required for correctness, but is an important + // perf win if we can avoid allocating and deallocating a node + if (tryWait()) { + return; + } + + // allocateNode() won't compile unless Handoff has a default + // constructor + UniquePtr node = allocateNode(); + + auto rv = tryWaitOrPush(*node); + if (UNLIKELY(rv == WaitResult::SHUTDOWN)) { + assert(isShutdown()); + throw ShutdownSemError("wait() would block but semaphore is shut down"); + } + + if (rv == WaitResult::PUSH) { + node->handoff().wait(); + if (UNLIKELY(node->isShutdownNotice())) { + // this wait() didn't consume a value, it was triggered by shutdown + assert(isShutdown()); + throw ShutdownSemError( + "blocking wait() interrupted by semaphore shutdown"); + } + + // node->handoff().wait() can't return until after the node has + // been popped and post()ed, so it is okay for the UniquePtr to + // recycle the node now + } + // else node wasn't pushed, so it is safe to recycle + } + + /// Returns a guess at the current value, designed for debugging. + /// If there are no concurrent posters or waiters then this will + /// be correct + uint32_t valueGuess() const { + // this is actually linearizable, but we don't promise that because + // we may want to add striping in the future to help under heavy + // contention + auto h = head_.load(std::memory_order_acquire); + return h.isNodeIdx() ? 0 : h.value(); + } + + protected: + + enum class WaitResult { + PUSH, + DECR, + SHUTDOWN, + }; + + /// The type of a std::unique_ptr that will automatically return a + /// LifoSemNode to the appropriate IndexedMemPool + typedef std::unique_ptr, + LifoSemNodeRecycler> UniquePtr; + + /// Returns a node that can be passed to tryWaitOrPush + template + UniquePtr allocateNode(Args&&... 
args) { + auto idx = LifoSemRawNode::pool.allocIndex(); + if (idx != 0) { + auto& node = idxToNode(idx); + node.clearShutdownNotice(); + try { + node.init(std::forward(args)...); + } catch (...) { + LifoSemRawNode::pool.recycleIndex(idx); + throw; + } + return UniquePtr(&node); + } else { + return UniquePtr(); + } + } + + /// Returns DECR if the semaphore value was decremented (and waiterNode + /// was untouched), PUSH if a reference to the wait node was pushed, + /// or SHUTDOWN if decrement was not possible and push wasn't allowed + /// because isShutdown(). Ownership of the wait node remains the + /// responsibility of the caller, who must not release it until after + /// the node's Handoff has been posted. + WaitResult tryWaitOrPush(LifoSemNode& waiterNode) { + uint32_t n = 1; + return decrOrPush(n, nodeToIdx(waiterNode)); + } + + private: + + folly::AtomicStruct head_ + FOLLY_ALIGN_TO_AVOID_FALSE_SHARING; + + char padding_[folly::detail::CacheLocality::kFalseSharingRange - + sizeof(LifoSemHead)]; + + + static LifoSemNode& idxToNode(uint32_t idx) { + auto raw = &LifoSemRawNode::pool[idx]; + return *static_cast*>(raw); + } + + static uint32_t nodeToIdx(const LifoSemNode& node) { + return LifoSemRawNode::pool.locateElem(&node); + } + + /// Either increments by n and returns 0, or pops a node and returns its index. + /// If n + the semaphore's value overflows, then the semaphore's value + /// saturates silently at 2^32-1 + uint32_t incrOrPop(uint32_t n) { + while (true) { + assert(n > 0); + + auto head = head_.load(std::memory_order_acquire); + if (head.isNodeIdx()) { + auto& node = idxToNode(head.idx()); + if (head_.compare_exchange_strong(head, head.withPop(node.next))) { + // successful pop + return head.idx(); + } + } else { + auto after = head.withValueIncr(n); + if (head_.compare_exchange_strong(head, after)) { + // successful incr + return 0; + } + } + // retry + } + } + + /// Returns DECR if some amount was decremented, with that amount + /// subtracted from n. 
If n is 1 and this function returns DECR then n + /// must be 0 afterward. Returns PUSH if no value could be decremented + /// and idx was pushed, or if idx was zero and no push was performed but + /// a push would have been performed with a valid node. Returns SHUTDOWN + /// if the caller should have blocked but isShutdown(). If idx == 0, + /// may return PUSH even after isShutdown() or may return SHUTDOWN + WaitResult decrOrPush(uint32_t& n, uint32_t idx) { + assert(n > 0); + + while (true) { + auto head = head_.load(std::memory_order_acquire); + + if (!head.isNodeIdx() && head.value() > 0) { + // decr + auto delta = std::min(n, head.value()); + if (head_.compare_exchange_strong(head, head.withValueDecr(delta))) { + n -= delta; + return WaitResult::DECR; + } + } else { + // push + if (idx == 0) { + return WaitResult::PUSH; + } + + if (UNLIKELY(head.isShutdown())) { + return WaitResult::SHUTDOWN; + } + + auto& node = idxToNode(idx); + node.next = head.isNodeIdx() ? head.idx() : 0; + if (head_.compare_exchange_strong(head, head.withPush(idx))) { + // push succeeded + return WaitResult::PUSH; + } + } + } + // retry + } +}; + +} // namespace detail + +template class Atom, class BatonType> +struct LifoSemImpl : public detail::LifoSemBase { + explicit LifoSemImpl(uint32_t v = 0) + : detail::LifoSemBase(v) {} +}; + +} // namespace folly + +#endif diff --git a/folly/test/LifoSemTests.cpp b/folly/test/LifoSemTests.cpp new file mode 100644 index 00000000..7bf1cce5 --- /dev/null +++ b/folly/test/LifoSemTests.cpp @@ -0,0 +1,437 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/LifoSem.h" +#include "folly/test/DeterministicSchedule.h" + +#include +#include +#include +#include + +#include "folly/Benchmark.h" +#include "folly/Random.h" + +using namespace folly; +using namespace folly::test; + +typedef LifoSemImpl DLifoSem; +typedef DeterministicSchedule DSched; + +LIFOSEM_DECLARE_POOL(DeterministicAtomic, 100000) + +TEST(LifoSem, basic) { + LifoSem sem; + EXPECT_FALSE(sem.tryWait()); + sem.post(); + EXPECT_TRUE(sem.tryWait()); + sem.post(); + sem.wait(); +} + +TEST(LifoSem, multi) { + LifoSem sem; + + const int opsPerThread = 10000; + std::thread threads[10]; + std::atomic blocks(0); + + for (auto& thr : threads) { + thr = std::thread([&]{ + int b = 0; + for (int i = 0; i < opsPerThread; ++i) { + if (!sem.tryWait()) { + sem.wait(); + ++b; + } + sem.post(); + } + blocks += b; + }); + } + + // start the flood + sem.post(); + + for (auto& thr : threads) { + thr.join(); + } + + LOG(INFO) << opsPerThread * sizeof(threads)/sizeof(threads[0]) + << " post/wait pairs, " << blocks << " blocked"; +} + +TEST(LifoSem, pingpong) { + DSched sched(DSched::uniform(0)); + + const int iters = 100; + + for (int pass = 0; pass < 10; ++pass) { + DLifoSem a; + DLifoSem b; + + auto thr = DSched::thread([&]{ + for (int i = 0; i < iters; ++i) { + a.wait(); + // main thread can't be running here + EXPECT_EQ(a.valueGuess(), 0); + EXPECT_EQ(b.valueGuess(), 0); + b.post(); + } + }); + for (int i = 0; i < iters; ++i) { + a.post(); + b.wait(); + // child thread can't be running here + EXPECT_EQ(a.valueGuess(), 0); + 
EXPECT_EQ(b.valueGuess(), 0); + } + DSched::join(thr); + } +} + +TEST(LifoSem, mutex) { + DSched sched(DSched::uniform(0)); + + const int iters = 100; + + for (int pass = 0; pass < 10; ++pass) { + DLifoSem a; + + auto thr = DSched::thread([&]{ + for (int i = 0; i < iters; ++i) { + a.wait(); + a.post(); + } + }); + for (int i = 0; i < iters; ++i) { + a.post(); + a.wait(); + } + a.post(); + DSched::join(thr); + a.wait(); + } +} + +TEST(LifoSem, no_blocking) { + long seed = folly::randomNumberSeed() % 10000; + LOG(INFO) << "seed=" << seed; + DSched sched(DSched::uniform(seed)); + + const int iters = 100; + const int numThreads = 2; + const int width = 10; + + for (int pass = 0; pass < 10; ++pass) { + DLifoSem a; + + std::vector threads; + while (threads.size() < numThreads) { + threads.emplace_back(DSched::thread([&]{ + for (int i = 0; i < iters; ++i) { + a.post(width); + for (int w = 0; w < width; ++w) { + a.wait(); + } + } + })); + } + for (auto& thr : threads) { + DSched::join(thr); + } + } +} + +TEST(LifoSem, one_way) { + long seed = folly::randomNumberSeed() % 10000; + LOG(INFO) << "seed=" << seed; + DSched sched(DSched::uniformSubset(seed, 1, 6)); + + const int iters = 1000; + + for (int pass = 0; pass < 10; ++pass) { + DLifoSem a; + + auto thr = DSched::thread([&]{ + for (int i = 0; i < iters; ++i) { + a.wait(); + } + }); + for (int i = 0; i < iters; ++i) { + a.post(); + } + DSched::join(thr); + } +} + +TEST(LifoSem, shutdown_race) { + long seed = folly::randomNumberSeed() % 10000; + LOG(INFO) << "seed=" << seed; + + bool shutdownWon = false; + bool shutdownLost = false; + for (int pass = 0; pass < 1000; ++pass) { + DSched sched(DSched::uniformSubset(seed + pass, 1, 1 + (pass % 50))); + + DLifoSem a; + int waitCount = 0; + auto thr = DSched::thread([&]{ + try { + while (true) { + a.wait(); + ++waitCount; + } + } catch (ShutdownSemError& x) { + // expected + EXPECT_TRUE(a.isShutdown()); + } + }); + EXPECT_TRUE(!a.isShutdown()); + a.shutdown(); + 
EXPECT_TRUE(a.isShutdown()); + a.post(); + DSched::join(thr); + EXPECT_EQ(1, waitCount + a.valueGuess()); + if (waitCount == 0) { + shutdownWon = true; + } else { + shutdownLost = true; + } + } + EXPECT_TRUE(shutdownWon); + EXPECT_TRUE(shutdownLost); +} + +TEST(LifoSem, shutdown_multi) { + DSched sched(DSched::uniform(0)); + + for (int pass = 0; pass < 10; ++pass) { + DLifoSem a; + std::vector threads; + while (threads.size() < 20) { + threads.push_back(DSched::thread([&]{ + try { + a.wait(); + EXPECT_TRUE(false); + } catch (ShutdownSemError& x) { + // expected + EXPECT_TRUE(a.isShutdown()); + } + })); + } + a.shutdown(); + for (auto& thr : threads) { + DSched::join(thr); + } + } +} + +TEST(LifoSem, multi_try_wait_simple) { + LifoSem sem; + sem.post(5); + auto n = sem.tryWait(10); // this used to trigger an assert + ASSERT_EQ(5, n); +} + +TEST(LifoSem, multi_try_wait) { + long seed = folly::randomNumberSeed() % 10000; + LOG(INFO) << "seed=" << seed; + DSched sched(DSched::uniform(seed)); + DLifoSem sem; + + const int NPOSTS = 1000; + + auto producer = [&]{ + for (int i=0; i consumer_stop; + int consumed = 0; + + auto consumer = [&]{ + bool stop; + do { + stop = consumer_stop.load(); + int n; + do { + n = sem.tryWait(10); + consumed += n; + } while (n > 0); + } while (!stop); + }; + + std::thread producer_thread(DSched::thread(producer)); + std::thread consumer_thread(DSched::thread(consumer)); + DSched::join(producer_thread); + consumer_stop.store(true); + DSched::join(consumer_thread); + + ASSERT_EQ(NPOSTS, consumed); +} + +BENCHMARK(lifo_sem_pingpong, iters) { + LifoSem a; + LifoSem b; + auto thr = std::thread([&]{ + for (int i = 0; i < iters; ++i) { + a.wait(); + b.post(); + } + }); + for (int i = 0; i < iters; ++i) { + a.post(); + b.wait(); + } + thr.join(); +} + +BENCHMARK(lifo_sem_oneway, iters) { + LifoSem a; + auto thr = std::thread([&]{ + for (int i = 0; i < iters; ++i) { + a.wait(); + } + }); + for (int i = 0; i < iters; ++i) { + a.post(); + } + 
thr.join(); +} + +BENCHMARK(single_thread_lifo_post, iters) { + LifoSem sem; + for (int n = 0; n < iters; ++n) { + sem.post(); + asm volatile ("":::"memory"); + } +} + +BENCHMARK(single_thread_lifo_wait, iters) { + LifoSem sem(iters); + for (int n = 0; n < iters; ++n) { + sem.wait(); + asm volatile ("":::"memory"); + } +} + +BENCHMARK(single_thread_lifo_postwait, iters) { + LifoSem sem; + for (int n = 0; n < iters; ++n) { + sem.post(); + asm volatile ("":::"memory"); + sem.wait(); + asm volatile ("":::"memory"); + } +} + +BENCHMARK(single_thread_lifo_trywait, iters) { + LifoSem sem; + for (int n = 0; n < iters; ++n) { + EXPECT_FALSE(sem.tryWait()); + asm volatile ("":::"memory"); + } +} + +BENCHMARK(single_thread_posix_postwait, iters) { + sem_t sem; + EXPECT_EQ(sem_init(&sem, 0, 0), 0); + for (int n = 0; n < iters; ++n) { + EXPECT_EQ(sem_post(&sem), 0); + EXPECT_EQ(sem_wait(&sem), 0); + } + EXPECT_EQ(sem_destroy(&sem), 0); +} + +BENCHMARK(single_thread_posix_trywait, iters) { + sem_t sem; + EXPECT_EQ(sem_init(&sem, 0, 0), 0); + for (int n = 0; n < iters; ++n) { + EXPECT_EQ(sem_trywait(&sem), -1); + } + EXPECT_EQ(sem_destroy(&sem), 0); +} + +static void contendedUse(uint n, int posters, int waiters) { + LifoSemImpl sem; + + std::vector threads; + std::atomic go(false); + + BENCHMARK_SUSPEND { + for (int t = 0; t < waiters; ++t) { + threads.emplace_back([=,&sem] { + for (uint i = t; i < n; i += waiters) { + sem.wait(); + } + }); + } + for (int t = 0; t < posters; ++t) { + threads.emplace_back([=,&sem,&go] { + while (!go.load()) { + std::this_thread::yield(); + } + for (uint i = t; i < n; i += posters) { + sem.post(); + } + }); + } + } + + go.store(true); + for (auto& thr : threads) { + thr.join(); + } +} + +BENCHMARK_DRAW_LINE() +BENCHMARK_NAMED_PARAM(contendedUse, 1_to_1, 1, 1) +BENCHMARK_NAMED_PARAM(contendedUse, 1_to_32, 1, 32) +BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1, 31, 1) +BENCHMARK_NAMED_PARAM(contendedUse, 16_to_16, 16, 16) 
+BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32) +BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000) + +// sudo nice -n -20 tao/queues/LifoSemTests --benchmark --bm_min_iters=10000000 +// ============================================================================ +// tao/queues/LifoSemTests.cpp relative time/iter iters/s +// ============================================================================ +// lifo_sem_pingpong 1.91us 522.92K +// lifo_sem_oneway 211.18ns 4.74M +// single_thread_lifo_post 19.71ns 50.75M +// single_thread_lifo_wait 18.84ns 53.09M +// single_thread_lifo_postwait 39.41ns 25.37M +// single_thread_lifo_trywait 912.10ps 1.10G +// single_thread_posix_postwait 32.93ns 30.37M +// single_thread_posix_trywait 10.06ns 99.36M +// ---------------------------------------------------------------------------- +// contendedUse(1_to_1) 208.21ns 4.80M +// contendedUse(1_to_32) 532.41ns 1.88M +// contendedUse(32_to_1) 153.74ns 6.50M +// contendedUse(16_to_16) 301.86ns 3.31M +// contendedUse(32_to_32) 268.32ns 3.73M +// contendedUse(32_to_1000) 966.27ns 1.03M +// ============================================================================ + +int main(int argc, char ** argv) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + int rv = RUN_ALL_TESTS(); + folly::runBenchmarksOnFlag(); + return rv; +} -- 2.34.1