Multi-producer multi-consumer queue with optional blocking
authorNathan Bronson <ngbronson@fb.com>
Fri, 28 Jun 2013 20:42:14 +0000 (13:42 -0700)
committerSara Golemon <sgolemon@fb.com>
Mon, 1 Jul 2013 19:57:43 +0000 (12:57 -0700)
Summary:
MPMCQueue<T> is a high-performance bounded concurrent queue that
supports multiple producers, multiple consumers, and optional blocking.
The queue has a fixed capacity, for which all memory will be allocated
up front.  The bulk of the work of enqueuing and dequeuing can be
performed in parallel.

To make an MPMCQueue<T>, T must satisfy either of two conditions:
- it has been tagged FOLLY_ASSUME_FBVECTOR_COMPATIBLE; or
- both the constructor used during enqueue and the move operator are
marked noexcept.

This diff extracts the generic component from tao/queues/ConcurrentQueue
and renames identifiers to match those of existing folly queues.
It also includes an extraction of Futex, which wraps the futex syscall,
and DeterministicScheduler, which allows for deterministic exploration
of thread interleavings for components built from std::atomic and Futex.

Test Plan: new unit tests

Reviewed By: tudorb@fb.com

FB internal diff: D866566

folly/MPMCQueue.h [new file with mode: 0644]
folly/detail/Futex.h [new file with mode: 0644]
folly/test/DeterministicSchedule.cpp [new file with mode: 0644]
folly/test/DeterministicSchedule.h [new file with mode: 0644]
folly/test/DeterministicScheduleTest.cpp [new file with mode: 0644]
folly/test/FutexTest.cpp [new file with mode: 0644]
folly/test/MPMCQueueTest.cpp [new file with mode: 0644]

diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h
new file mode 100644 (file)
index 0000000..9fb7867
--- /dev/null
@@ -0,0 +1,852 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <atomic>
+#include <assert.h>
+#include <boost/noncopyable.hpp>
+#include <errno.h>
+#include <limits>
+#include <linux/futex.h>
+#include <string.h>
+#include <sys/syscall.h>
+#include <unistd.h>
+
+#include <folly/Traits.h>
+#include <folly/detail/Futex.h>
+
+namespace folly {
+
+namespace detail {
+
+template<typename T, template<typename> class Atom>
+class SingleElementQueue;
+
+} // namespace detail
+
+/// MPMCQueue<T> is a high-performance bounded concurrent queue that
+/// supports multiple producers, multiple consumers, and optional blocking.
+/// The queue has a fixed capacity, for which all memory will be allocated
+/// up front.  The bulk of the work of enqueuing and dequeuing can be
+/// performed in parallel.
+///
+/// The underlying implementation uses a ticket dispenser for the head and
+/// the tail, spreading accesses across N single-element queues to produce
+/// a queue with capacity N.  The ticket dispensers use atomic increment,
+/// which is more robust to contention than a CAS loop.  Each of the
+/// single-element queues uses its own CAS to serialize access, with an
+/// adaptive spin cutoff.  When spinning fails on a single-element queue
+/// it uses futex()'s _BITSET operations to reduce unnecessary wakeups
+/// even if multiple waiters are present on an individual queue (such as
+/// when the MPMCQueue's capacity is smaller than the number of enqueuers
+/// or dequeuers).
+///
+/// NOEXCEPT INTERACTION: Ticket-based queues separate the assignment
+/// of In benchmarks (contained in tao/queues/ConcurrentQueueTests)
+/// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
+/// than any of the alternatives present in fbcode, for both small (~10)
+/// and large capacities.  In these benchmarks it is also faster than
+/// tbb::concurrent_bounded_queue for all configurations.  When there are
+/// many more threads than cores, MPMCQueue is _much_ faster than the tbb
+/// queue because it uses futex() to block and unblock waiting threads,
+/// rather than spinning with sched_yield.
+///
+/// queue positions from the actual construction of the in-queue elements,
+/// which means that the T constructor used during enqueue must not throw
+/// an exception.  This is enforced at compile time using type traits,
+/// which requires that T be adorned with accurate noexcept information.
+/// If your type does not use noexcept, you will have to wrap it in
+/// something that provides the guarantee.  We provide an alternate
+/// safe implementation for types that don't use noexcept but that are
+/// marked folly::IsRelocatable and boost::has_nothrow_constructor,
+/// which is common for folly types.  In particular, if you can declare
+/// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in
+/// MPMCQueue.
+template<typename T,
+         template<typename> class Atom = std::atomic,
+         typename = typename std::enable_if<
+             std::is_nothrow_constructible<T,T&&>::value ||
+             folly::IsRelocatable<T>::value>::type>
+class MPMCQueue : boost::noncopyable {
+ public:
+  typedef T value_type;
+
+  explicit MPMCQueue(size_t capacity)
+    : capacity_(capacity)
+    , slots_(new detail::SingleElementQueue<T,Atom>[capacity +
+                                                    2 * kSlotPadding])
+    , stride_(computeStride(capacity))
+    , pushTicket_(0)
+    , popTicket_(0)
+    , pushSpinCutoff_(0)
+    , popSpinCutoff_(0)
+  {
+    // ideally this would be a static assert, but g++ doesn't allow it
+    assert(alignof(MPMCQueue<T,Atom>) >= kFalseSharingRange);
+  }
+
+  /// A default-constructed queue is useful because a usable (non-zero
+  /// capacity) queue can be moved onto it or swapped with it
+  MPMCQueue() noexcept
+    : capacity_(0)
+    , slots_(nullptr)
+    , stride_(0)
+    , pushTicket_(0)
+    , popTicket_(0)
+    , pushSpinCutoff_(0)
+    , popSpinCutoff_(0)
+  {}
+
+  /// IMPORTANT: The move constructor is here to make it easier to perform
+  /// the initialization phase, it is not safe to use when there are any
+  /// concurrent accesses (this is not checked).
+  MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
+    : capacity_(rhs.capacity_)
+    , slots_(rhs.slots_)
+    , stride_(rhs.stride_)
+    , pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
+    , popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
+    , pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
+    , popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed))
+  {
+    // relaxed ops are okay for the previous reads, since rhs queue can't
+    // be in concurrent use
+
+    // zero out rhs
+    rhs.capacity_ = 0;
+    rhs.slots_ = nullptr;
+    rhs.stride_ = 0;
+    rhs.pushTicket_.store(0, std::memory_order_relaxed);
+    rhs.popTicket_.store(0, std::memory_order_relaxed);
+    rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
+    rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
+  }
+
+  /// IMPORTANT: The move operator is here to make it easier to perform
+  /// the initialization phase, it is not safe to use when there are any
+  /// concurrent accesses (this is not checked).
+  MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
+    if (this != &rhs) {
+      this->~MPMCQueue();
+      new (this) MPMCQueue(std::move(rhs));
+    }
+    return *this;
+  }
+
+  /// MPMCQueue can only be safely destroyed when there are no
+  /// pending enqueuers or dequeuers (this is not checked).
+  ~MPMCQueue() {
+    delete[] slots_;
+  }
+
+  /// Returns the number of successful reads minus the number of successful
+  /// writes.  Waiting blockingRead and blockingWrite calls are included,
+  /// so this value can be negative.
+  ssize_t size() const noexcept {
+    // since both pushes and pops increase monotonically, we can get a
+    // consistent snapshot either by bracketing a read of popTicket_ with
+    // two reads of pushTicket_ that return the same value, or the other
+    // way around.  We maximize our chances by alternately attempting
+    // both bracketings.
+    uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
+    uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
+    while (true) {
+      uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
+      if (pushes == nextPushes) {
+        // pushTicket_ didn't change from A (or the previous C) to C,
+        // so we can linearize at B (or D)
+        return pushes - pops;
+      }
+      pushes = nextPushes;
+      uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
+      if (pops == nextPops) {
+        // popTicket_ didn't chance from B (or the previous D), so we
+        // can linearize at C
+        return pushes - pops;
+      }
+      pops = nextPops;
+    }
+  }
+
+  /// Returns true if there are no items available for dequeue
+  bool isEmpty() const noexcept {
+    return size() <= 0;
+  }
+
+  /// Returns true if there is currently no empty space to enqueue
+  bool isFull() const noexcept {
+    // careful with signed -> unsigned promotion, since size can be negative
+    return size() >= static_cast<ssize_t>(capacity_);
+  }
+
+  /// Returns is a guess at size() for contexts that don't need a precise
+  /// value, such as stats.
+  uint64_t sizeGuess() const noexcept {
+    return writeCount() - readCount();
+  }
+
+  /// Doesn't change
+  size_t capacity() const noexcept {
+    return capacity_;
+  }
+
+  /// Returns the total number of calls to blockingWrite or successful
+  /// calls to write, including those blockingWrite calls that are
+  /// currently blocking
+  uint64_t writeCount() const noexcept {
+    return pushTicket_.load(std::memory_order_acquire);
+  }
+
+  /// Returns the total number of calls to blockingRead or successful
+  /// calls to read, including those blockingRead calls that are currently
+  /// blocking
+  uint64_t readCount() const noexcept {
+    return popTicket_.load(std::memory_order_acquire);
+  }
+
+  /// Enqueues a T constructed from args, blocking until space is
+  /// available.  Note that this method signature allows enqueue via
+  /// move, if args is a T rvalue, via copy, if args is a T lvalue, or
+  /// via emplacement if args is an initializer list that can be passed
+  /// to a T constructor.
+  template <typename ...Args>
+  void blockingWrite(Args&&... args) noexcept {
+    enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
+  }
+
+  /// If an item can be enqueued with no blocking, does so and returns
+  /// true, otherwise returns false.  This method is similar to
+  /// writeIfNotFull, but if you don't have a specific need for that
+  /// method you should use this one.
+  ///
+  /// One of the common usages of this method is to enqueue via the
+  /// move constructor, something like q.write(std::move(x)).  If write
+  /// returns false because the queue is full then x has not actually been
+  /// consumed, which looks strange.  To understand why it is actually okay
+  /// to use x afterward, remember that std::move is just a typecast that
+  /// provides an rvalue reference that enables use of a move constructor
+  /// or operator.  std::move doesn't actually move anything.  It could
+  /// more accurately be called std::rvalue_cast or std::move_permission.
+  template <typename ...Args>
+  bool write(Args&&... args) noexcept {
+    uint64_t ticket;
+    if (tryObtainReadyPushTicket(ticket)) {
+      // we have pre-validated that the ticket won't block
+      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /// If the queue is not full, enqueues and returns true, otherwise
+  /// returns false.  Unlike write this method can be blocked by another
+  /// thread, specifically a read that has linearized (been assigned
+  /// a ticket) but not yet completed.  If you don't really need this
+  /// function you should probably use write.
+  ///
+  /// MPMCQueue isn't lock-free, so just because a read operation has
+  /// linearized (and isFull is false) doesn't mean that space has been
+  /// made available for another write.  In this situation write will
+  /// return false, but writeIfNotFull will wait for the dequeue to finish.
+  /// This method is required if you are composing queues and managing
+  /// your own wakeup, because it guarantees that after every successful
+  /// write a readIfNotFull will succeed.
+  template <typename ...Args>
+  bool writeIfNotFull(Args&&... args) noexcept {
+    uint64_t ticket;
+    if (tryObtainPromisedPushTicket(ticket)) {
+      // some other thread is already dequeuing the slot into which we
+      // are going to enqueue, but we might have to wait for them to finish
+      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /// Moves a dequeued element onto elem, blocking until an element
+  /// is available
+  void blockingRead(T& elem) noexcept {
+    dequeueWithTicket(popTicket_++, elem);
+  }
+
+  /// If an item can be dequeued with no blocking, does so and returns
+  /// true, otherwise returns false.
+  bool read(T& elem) noexcept {
+    uint64_t ticket;
+    if (tryObtainReadyPopTicket(ticket)) {
+      // the ticket has been pre-validated to not block
+      dequeueWithTicket(ticket, elem);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /// If the queue is not empty, dequeues and returns true, otherwise
+  /// returns false.  If the matching write is still in progress then this
+  /// method may block waiting for it.  If you don't rely on being able
+  /// to dequeue (such as by counting completed write) then you should
+  /// prefer read.
+  bool readIfNotEmpty(T& elem) noexcept {
+    uint64_t ticket;
+    if (tryObtainPromisedPopTicket(ticket)) {
+      // the matching enqueue already has a ticket, but might not be done
+      dequeueWithTicket(ticket, elem);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+ private:
+  enum {
+    /// Once every kAdaptationFreq we will spin longer, to try to estimate
+    /// the proper spin backoff
+    kAdaptationFreq = 128,
+
+    /// Memory locations on the same cache line are subject to false
+    /// sharing, which is very bad for performance
+    kFalseSharingRange = 64,
+
+    /// To avoid false sharing in slots_ with neighboring memory
+    /// allocations, we pad it with this many SingleElementQueue-s at
+    /// each end
+    kSlotPadding = 1 +
+        (kFalseSharingRange - 1) / sizeof(detail::SingleElementQueue<T,Atom>)
+  };
+
+#define FOLLY_ON_NEXT_CACHE_LINE __attribute__((aligned(kFalseSharingRange)))
+
+  /// The maximum number of items in the queue at once
+  size_t capacity_ FOLLY_ON_NEXT_CACHE_LINE;
+
+  /// An array of capacity_ SingleElementQueue-s, each of which holds
+  /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
+  /// touch the slots at either end, to avoid false sharing
+  detail::SingleElementQueue<T,Atom>* slots_;
+
+  /// The number of slots_ indices that we advance for each ticket, to
+  /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
+  /// aren't on the same cache line
+  int stride_;
+
+  /// Enqueuers get tickets from here
+  Atom<uint64_t> pushTicket_ FOLLY_ON_NEXT_CACHE_LINE;
+
+  /// Dequeuers get tickets from here
+  Atom<uint64_t> popTicket_ FOLLY_ON_NEXT_CACHE_LINE;
+
+  /// This is how many times we will spin before using FUTEX_WAIT when
+  /// the queue is full on enqueue, adaptively computed by occasionally
+  /// spinning for longer and smoothing with an exponential moving average
+  Atom<int> pushSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
+
+  /// The adaptive spin cutoff when the queue is empty on dequeue
+  Atom<int> popSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
+
+  /// Alignment doesn't avoid false sharing at the end of the struct,
+  /// so fill out the last cache line
+  char padding_[kFalseSharingRange - sizeof(Atom<int>)];
+
+#undef FOLLY_ON_NEXT_CACHE_LINE
+
+  /// We assign tickets in increasing order, but we don't want to
+  /// access neighboring elements of slots_ because that will lead to
+  /// false sharing (multiple cores accessing the same cache line even
+  /// though they aren't accessing the same bytes in that cache line).
+  /// To avoid this we advance by stride slots per ticket.
+  ///
+  /// We need gcd(capacity, stride) to be 1 so that we will use all
+  /// of the slots.  We ensure this by only considering prime strides,
+  /// which either have no common divisors with capacity or else have
+  /// a zero remainder after dividing by capacity.  That is sufficient
+  /// to guarantee correctness, but we also want to actually spread the
+  /// accesses away from each other to avoid false sharing (consider a
+  /// stride of 7 with a capacity of 8).  To that end we try a few taking
+  /// care to observe that advancing by -1 is as bad as advancing by 1
+  /// when in comes to false sharing.
+  ///
+  /// The simple way to avoid false sharing would be to pad each
+  /// SingleElementQueue, but since we have capacity_ of them that could
+  /// waste a lot of space.
+  static int computeStride(size_t capacity) noexcept {
+    static const int smallPrimes[] = { 2, 3, 5, 7, 11, 13, 17, 19, 23 };
+
+    int bestStride = 1;
+    size_t bestSep = 1;
+    for (int stride : smallPrimes) {
+      if ((stride % capacity) == 0 || (capacity % stride) == 0) {
+        continue;
+      }
+      size_t sep = stride % capacity;
+      sep = std::min(sep, capacity - sep);
+      if (sep > bestSep) {
+        bestStride = stride;
+        bestSep = sep;
+      }
+    }
+    return bestStride;
+  }
+
+  /// Returns the index into slots_ that should be used when enqueuing or
+  /// dequeuing with the specified ticket
+  size_t idx(uint64_t ticket) noexcept {
+    return ((ticket * stride_) % capacity_) + kSlotPadding;
+  }
+
+  /// Maps an enqueue or dequeue ticket to the turn should be used at the
+  /// corresponding SingleElementQueue
+  uint32_t turn(uint64_t ticket) noexcept {
+    return ticket / capacity_;
+  }
+
+  /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
+  /// won't block.  Returns true on immediate success, false on immediate
+  /// failure.
+  bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
+    auto ticket = pushTicket_.load(std::memory_order_acquire); // A
+    while (true) {
+      if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
+        // if we call enqueue(ticket, ...) on the SingleElementQueue
+        // right now it would block, but this might no longer be the next
+        // ticket.  We can increase the chance of tryEnqueue success under
+        // contention (without blocking) by rechecking the ticket dispenser
+        auto prev = ticket;
+        ticket = pushTicket_.load(std::memory_order_acquire); // B
+        if (prev == ticket) {
+          // mayEnqueue was bracketed by two reads (A or prev B or prev
+          // failing CAS to B), so we are definitely unable to enqueue
+          return false;
+        }
+      } else {
+        // we will bracket the mayEnqueue check with a read (A or prev B
+        // or prev failing CAS) and the following CAS.  If the CAS fails
+        // it will effect a load of pushTicket_
+        if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          rv = ticket;
+          return true;
+        }
+      }
+    }
+  }
+
+  /// Tries to obtain a push ticket which can be satisfied if all
+  /// in-progress pops complete.  This function does not block, but
+  /// blocking may be required when using the returned ticket if some
+  /// other thread's pop is still in progress (ticket has been granted but
+  /// pop has not yet completed).
+  bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
+    auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
+    while (true) {
+      auto numPops = popTicket_.load(std::memory_order_acquire); // B
+      // n will be negative if pops are pending
+      int64_t n = numPushes - numPops;
+      if (n >= static_cast<ssize_t>(capacity_)) {
+        // Full, linearize at B.  We don't need to recheck the read we
+        // performed at A, because if numPushes was stale at B then the
+        // real numPushes value is even worse
+        return false;
+      }
+      if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
+        rv = numPushes;
+        return true;
+      }
+    }
+  }
+
+  /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
+  /// won't block.  Returns true on immediate success, false on immediate
+  /// failure.
+  bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
+    auto ticket = popTicket_.load(std::memory_order_acquire);
+    while (true) {
+      if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
+        auto prev = ticket;
+        ticket = popTicket_.load(std::memory_order_acquire);
+        if (prev == ticket) {
+          return false;
+        }
+      } else {
+        if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          rv = ticket;
+          return true;
+        }
+      }
+    }
+  }
+
+  /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose
+  /// corresponding push ticket has already been handed out, rather than
+  /// returning one whose corresponding push ticket has already been
+  /// completed.  This means that there is a possibility that the caller
+  /// will block when using the ticket, but it allows the user to rely on
+  /// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket
+  /// will return true.  The "try" part of this is that we won't have
+  /// to block waiting for someone to call enqueue, although we might
+  /// have to block waiting for them to finish executing code inside the
+  /// MPMCQueue itself.
+  bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
+    auto numPops = popTicket_.load(std::memory_order_acquire); // A
+    while (true) {
+      auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
+      if (numPops >= numPushes) {
+        // Empty, or empty with pending pops.  Linearize at B.  We don't
+        // need to recheck the read we performed at A, because if numPops
+        // is stale then the fresh value is larger and the >= is still true
+        return false;
+      }
+      if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
+        rv = numPops;
+        return true;
+      }
+    }
+  }
+
+  // Given a ticket, constructs an enqueued item using args
+  template <typename ...Args>
+  void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
+    slots_[idx(ticket)].enqueue(turn(ticket),
+                                pushSpinCutoff_,
+                                (ticket % kAdaptationFreq) == 0,
+                                std::forward<Args>(args)...);
+  }
+
+  // Given a ticket, dequeues the corresponding element
+  void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
+    slots_[idx(ticket)].dequeue(turn(ticket),
+                                popSpinCutoff_,
+                                (ticket % kAdaptationFreq) == 0,
+                                elem);
+  }
+};
+
+
+namespace detail {
+
+/// A TurnSequencer allows threads to order their execution according to
+/// a monotonically increasing (with wraparound) "turn" value.  The two
+/// operations provided are to wait for turn T, and to move to the next
+/// turn.  Every thread that is waiting for T must have arrived before
+/// that turn is marked completed (for MPMCQueue only one thread waits
+/// for any particular turn, so this is trivially true).
+///
+/// TurnSequencer's state_ holds 26 bits of the current turn (shifted
+/// left by 6), along with a 6 bit saturating value that records the
+/// maximum waiter minus the current turn.  Wraparound of the turn space
+/// is expected and handled.  This allows us to atomically adjust the
+/// number of outstanding waiters when we perform a FUTEX_WAKE operation.
+/// Compare this strategy to sem_t's separate num_waiters field, which
+/// isn't decremented until after the waiting thread gets scheduled,
+/// during which time more enqueues might have occurred and made pointless
+/// FUTEX_WAKE calls.
+///
+/// TurnSequencer uses futex() directly.  It is optimized for the
+/// case that the highest awaited turn is 32 or less higher than the
+/// current turn.  We use the FUTEX_WAIT_BITSET variant, which lets
+/// us embed 32 separate wakeup channels in a single futex.  See
+/// http://locklessinc.com/articles/futex_cheat_sheet for a description.
+///
+/// We only need to keep exact track of the delta between the current
+/// turn and the maximum waiter for the 32 turns that follow the current
+/// one, because waiters at turn t+32 will be awoken at turn t.  At that
+/// point they can then adjust the delta using the higher base.  Since we
+/// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits.
+/// We actually store waiter deltas up to 63, since that might reduce
+/// the number of CAS operations a tiny bit.
+///
+/// To avoid some futex() calls entirely, TurnSequencer uses an adaptive
+/// spin cutoff before waiting.  The overheads (and convergence rate)
+/// of separately tracking the spin cutoff for each TurnSequencer would
+/// be prohibitive, so the actual storage is passed in as a parameter and
+/// updated atomically.  This also lets the caller use different adaptive
+/// cutoffs for different operations (read versus write, for example).
+/// To avoid contention, the spin cutoff is only updated when requested
+/// by the caller.
+template <template<typename> class Atom>
+struct TurnSequencer {
+  explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept
+      : state_(encode(firstTurn << kTurnShift, 0))
+  {}
+
+  /// Returns true iff a call to waitForTurn(turn, ...) won't block
+  bool isTurn(const uint32_t turn) const noexcept {
+    auto state = state_.load(std::memory_order_acquire);
+    return decodeCurrentSturn(state) == (turn << kTurnShift);
+  }
+
+  // Internally we always work with shifted turn values, which makes the
+  // truncation and wraparound work correctly.  This leaves us bits at
+  // the bottom to store the number of waiters.  We call shifted turns
+  // "sturns" inside this class.
+
+  /// Blocks the current thread until turn has arrived.  If
+  /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
+  /// before blocking and will adjust spinCutoff based on the results,
+  /// otherwise it will spin for at most spinCutoff spins.
+  void waitForTurn(const uint32_t turn,
+                   Atom<int>& spinCutoff,
+                   const bool updateSpinCutoff) noexcept {
+    int prevThresh = spinCutoff.load(std::memory_order_relaxed);
+    const int effectiveSpinCutoff =
+        updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
+    int tries;
+
+    const uint32_t sturn = turn << kTurnShift;
+    for (tries = 0; ; ++tries) {
+      uint32_t state = state_.load(std::memory_order_acquire);
+      uint32_t current_sturn = decodeCurrentSturn(state);
+      if (current_sturn == sturn) {
+        break;
+      }
+
+      // wrap-safe version of assert(current_sturn < sturn)
+      assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
+
+      // the first effectSpinCutoff tries are spins, after that we will
+      // record ourself as a waiter and block with futexWait
+      if (tries < effectiveSpinCutoff) {
+        asm volatile ("pause");
+        continue;
+      }
+
+      uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
+      uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
+      uint32_t new_state;
+      if (our_waiter_delta <= current_max_waiter_delta) {
+        // state already records us as waiters, probably because this
+        // isn't our first time around this loop
+        new_state = state;
+      } else {
+        new_state = encode(current_sturn, our_waiter_delta);
+        if (state != new_state &&
+            !state_.compare_exchange_strong(state, new_state)) {
+          continue;
+        }
+      }
+      state_.futexWait(new_state, futexChannel(turn));
+    }
+
+    if (updateSpinCutoff || prevThresh == 0) {
+      // if we hit kMaxSpins then spinning was pointless, so the right
+      // spinCutoff is kMinSpins
+      int target;
+      if (tries >= kMaxSpins) {
+        target = kMinSpins;
+      } else {
+        // to account for variations, we allow ourself to spin 2*N when
+        // we think that N is actually required in order to succeed
+        target = std::min(int{kMaxSpins}, std::max(int{kMinSpins}, tries * 2));
+      }
+
+      if (prevThresh == 0) {
+        // bootstrap
+        spinCutoff = target;
+      } else {
+        // try once, keep moving if CAS fails.  Exponential moving average
+        // with alpha of 7/8
+        spinCutoff.compare_exchange_weak(
+            prevThresh, prevThresh + (target - prevThresh) / 8);
+      }
+    }
+  }
+
+  /// Unblocks a thread running waitForTurn(turn + 1)
+  void completeTurn(const uint32_t turn) noexcept {
+    uint32_t state = state_.load(std::memory_order_acquire);
+    while (true) {
+      assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
+      uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
+      uint32_t new_state = encode(
+              (turn + 1) << kTurnShift,
+              max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
+      if (state_.compare_exchange_strong(state, new_state)) {
+        if (max_waiter_delta != 0) {
+          state_.futexWake(std::numeric_limits<int>::max(),
+                           futexChannel(turn + 1));
+        }
+        break;
+      }
+      // failing compare_exchange_strong updates first arg to the value
+      // that caused the failure, so no need to reread state_
+    }
+  }
+
+  /// Returns the least-most significant byte of the current uncompleted
+  /// turn.  The full 32 bit turn cannot be recovered.
+  uint8_t uncompletedTurnLSB() const noexcept {
+    return state_.load(std::memory_order_acquire) >> kTurnShift;
+  }
+
+ private:
+  enum : uint32_t {
+    /// kTurnShift counts the bits that are stolen to record the delta
+    /// between the current turn and the maximum waiter. It needs to be big
+    /// enough to record wait deltas of 0 to 32 inclusive.  Waiters more
+    /// than 32 in the future will be woken up 32*n turns early (since
+    /// their BITSET will hit) and will adjust the waiter count again.
+    /// We go a bit beyond and let the waiter count go up to 63, which
+    /// is free and might save us a few CAS
+    kTurnShift = 6,
+    kWaitersMask = (1 << kTurnShift) - 1,
+
+    /// The minimum spin count that we will adaptively select
+    kMinSpins = 20,
+
+    /// The maximum spin count that we will adaptively select, and the
+    /// spin count that will be used when probing to get a new data point
+    /// for the adaptation
+    kMaxSpins = 2000,
+  };
+
+  /// This holds both the current turn, and the highest waiting turn,
+  /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn))
+  Futex<Atom> state_;
+
+  /// Returns the bitmask to pass futexWait or futexWake when communicating
+  /// about the specified turn
+  int futexChannel(uint32_t turn) const noexcept {
+    return 1 << (turn & 31);
+  }
+
+  uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
+    return state & ~kWaitersMask;
+  }
+
+  uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept {
+    return state & kWaitersMask;
+  }
+
+  uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
+    return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD);
+  }
+};
+
+
+/// SingleElementQueue implements a blocking queue that holds at most one
+/// item, and that requires its users to assign incrementing identifiers
+/// (turns) to each enqueue and dequeue operation.  Note that the turns
+/// used by SingleElementQueue are doubled inside the TurnSequencer
+template <typename T, template <typename> class Atom>
+struct SingleElementQueue {
+
+  ~SingleElementQueue() noexcept {
+    if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
+      // we are pending a dequeue, so we have a constructed item
+      destroyContents();
+    }
+  }
+
+  /// enqueue using in-place noexcept construction
+  template <typename ...Args,
+            typename = typename std::enable_if<
+                std::is_nothrow_constructible<T,Args...>::value>::type>
+  void enqueue(const uint32_t turn,
+               Atom<int>& spinCutoff,
+               const bool updateSpinCutoff,
+               Args&&... args) noexcept {
+    sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
+    new (contents_) T(std::forward<Args>(args)...);
+    sequencer_.completeTurn(turn * 2);
+  }
+
+  /// enqueue using move construction, either real (if
+  /// is_nothrow_move_constructible) or simulated using relocation and
+  /// default construction (if IsRelocatable and has_nothrow_constructor)
+  template <typename = typename std::enable_if<
+                (folly::IsRelocatable<T>::value &&
+                 boost::has_nothrow_constructor<T>::value) ||
+                std::is_nothrow_constructible<T,T&&>::value>::type>
+  void enqueue(const uint32_t turn,
+               Atom<int>& spinCutoff,
+               const bool updateSpinCutoff,
+               T&& goner) noexcept {
+    if (std::is_nothrow_constructible<T,T&&>::value) {
+      // this is preferred
+      sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
+      new (contents_) T(std::move(goner));
+      sequencer_.completeTurn(turn * 2);
+    } else {
+      // simulate nothrow move with relocation, followed by default
+      // construction to fill the gap we created
+      sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
+      memcpy(contents_, &goner, sizeof(T));
+      sequencer_.completeTurn(turn * 2);
+      new (&goner) T();
+    }
+  }
+
+  bool mayEnqueue(const uint32_t turn) const noexcept {
+    return sequencer_.isTurn(turn * 2);
+  }
+
+  void dequeue(uint32_t turn,
+               Atom<int>& spinCutoff,
+               const bool updateSpinCutoff,
+               T& elem) noexcept {
+    if (folly::IsRelocatable<T>::value) {
+      // this version is preferred, because we do as much work as possible
+      // before waiting
+      try {
+        elem.~T();
+      } catch (...) {
+        // unlikely, but if we don't complete our turn the queue will die
+      }
+      sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
+      memcpy(&elem, contents_, sizeof(T));
+      sequencer_.completeTurn(turn * 2 + 1);
+    } else {
+      // use nothrow move assignment
+      sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
+      elem = std::move(*ptr());
+      destroyContents();
+      sequencer_.completeTurn(turn * 2 + 1);
+    }
+  }
+
+  bool mayDequeue(const uint32_t turn) const noexcept {
+    return sequencer_.isTurn(turn * 2 + 1);
+  }
+
+ private:
+  /// Storage for a T constructed with placement new
+  char contents_[sizeof(T)] __attribute__((aligned(alignof(T))));
+
+  /// Even turns are pushes, odd turns are pops
+  TurnSequencer<Atom> sequencer_;
+
+  T* ptr() noexcept {
+    return static_cast<T*>(static_cast<void*>(contents_));
+  }
+
+  void destroyContents() noexcept {
+    try {
+      ptr()->~T();
+    } catch (...) {
+      // g++ doesn't seem to have std::is_nothrow_destructible yet
+    }
+#ifndef NDEBUG
+    memset(contents_, 'Q', sizeof(T));
+#endif
+  }
+};
+
+} // namespace detail
+
+} // namespace folly
diff --git a/folly/detail/Futex.h b/folly/detail/Futex.h
new file mode 100644 (file)
index 0000000..93544bc
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <limits>
+#include <assert.h>
+#include <errno.h>
+#include <linux/futex.h>
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <boost/noncopyable.hpp>
+
+namespace folly { namespace detail {
+
+/**
+ * Futex is an atomic 32 bit unsigned integer that provides access to the
+ * futex() syscall on that value.  It is templated in such a way that it
+ * can interact properly with DeterministicSchedule testing.
+ *
+ * If you don't know how to use futex(), you probably shouldn't be using
+ * this class.  Even if you do know how, you should have a good reason
+ * (and benchmarks to back you up).
+ */
+template <template <typename> class Atom = std::atomic>
+struct Futex : Atom<uint32_t>, boost::noncopyable {
+
+  explicit Futex(uint32_t init = 0) : Atom<uint32_t>(init) {}
+
+  /** Puts the thread to sleep if this->load() == expected.  Returns true when
+   *  it is returning because it has consumed a wake() event, false for any
+   *  other return (signal, this->load() != expected, or spurious wakeup). */
+  bool futexWait(uint32_t expected, uint32_t waitMask = -1);
+
+  /** Wakens up to count waiters where (waitMask & wakeMask) != 0,
+   *  returning the number of awoken threads. */
+  int futexWake(int count = std::numeric_limits<int>::max(),
+                uint32_t wakeMask = -1);
+};
+
+template <>
+inline bool Futex<std::atomic>::futexWait(uint32_t expected,
+                                          uint32_t waitMask) {
+  assert(sizeof(*this) == sizeof(int));
+  int rv = syscall(SYS_futex,
+                   this, /* addr1 */
+                   FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, /* op */
+                   expected, /* val */
+                   nullptr, /* timeout */
+                   nullptr, /* addr2 */
+                   waitMask); /* val3 */
+  assert(rv == 0 || (errno == EWOULDBLOCK || errno == EINTR));
+  return rv == 0;
+}
+
+template <>
+inline int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
+  assert(sizeof(*this) == sizeof(int));
+  int rv = syscall(SYS_futex,
+                   this, /* addr1 */
+                   FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */
+                   count, /* val */
+                   nullptr, /* timeout */
+                   nullptr, /* addr2 */
+                   wakeMask); /* val3 */
+  assert(rv >= 0);
+  return rv;
+}
+
+}}
diff --git a/folly/test/DeterministicSchedule.cpp b/folly/test/DeterministicSchedule.cpp
new file mode 100644 (file)
index 0000000..7f77262
--- /dev/null
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeterministicSchedule.h"
+#include <algorithm>
+#include <list>
+#include <mutex>
+#include <random>
+#include <utility>
+#include <unordered_map>
+#include <assert.h>
+
+namespace folly { namespace test {
+
+__thread sem_t* DeterministicSchedule::tls_sem;
+__thread DeterministicSchedule* DeterministicSchedule::tls_sched;
+
+// access is protected by futexLock
+static std::unordered_map<detail::Futex<DeterministicAtomic>*,
+                          std::list<std::pair<uint32_t,bool*>>> futexQueues;
+
+static std::mutex futexLock;
+
+DeterministicSchedule::DeterministicSchedule(
+        const std::function<int(int)>& scheduler)
+  : scheduler_(scheduler)
+{
+  assert(tls_sem == nullptr);
+  assert(tls_sched == nullptr);
+
+  tls_sem = new sem_t;
+  sem_init(tls_sem, 0, 1);
+  sems_.push_back(tls_sem);
+
+  tls_sched = this;
+}
+
+DeterministicSchedule::~DeterministicSchedule() {
+  assert(tls_sched == this);
+  assert(sems_.size() == 1);
+  assert(sems_[0] == tls_sem);
+  beforeThreadExit();
+}
+
+std::function<int(int)>
+DeterministicSchedule::uniform(long seed) {
+  auto rand = std::make_shared<std::ranlux48>(seed);
+  return [rand](int numActive) {
+    auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
+    return dist(*rand);
+  };
+}
+
+struct UniformSubset {
+  UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
+    : uniform_(DeterministicSchedule::uniform(seed))
+    , subsetSize_(subsetSize)
+    , stepsBetweenSelect_(stepsBetweenSelect)
+    , stepsLeft_(0)
+  {
+  }
+
+  int operator()(int numActive) {
+    adjustPermSize(numActive);
+    if (stepsLeft_-- == 0) {
+      stepsLeft_ = stepsBetweenSelect_ - 1;
+      shufflePrefix();
+    }
+    return perm_[uniform_(std::min(numActive, subsetSize_))];
+  }
+
+ private:
+  std::function<int(int)> uniform_;
+  const int subsetSize_;
+  const int stepsBetweenSelect_;
+
+  int stepsLeft_;
+  // only the first subsetSize_ is properly randomized
+  std::vector<int> perm_;
+
+  void adjustPermSize(int numActive) {
+    if (perm_.size() > numActive) {
+      perm_.erase(std::remove_if(perm_.begin(), perm_.end(),
+              [=](int x){ return x >= numActive; }), perm_.end());
+    } else {
+      while (perm_.size() < numActive) {
+        perm_.push_back(perm_.size());
+      }
+    }
+    assert(perm_.size() == numActive);
+  }
+
+  void shufflePrefix() {
+    for (int i = 0; i < std::min(int(perm_.size() - 1), subsetSize_); ++i) {
+      int j = uniform_(perm_.size() - i) + i;
+      std::swap(perm_[i], perm_[j]);
+    }
+  }
+};
+
+std::function<int(int)>
+DeterministicSchedule::uniformSubset(long seed, int n, int m) {
+  auto gen = std::make_shared<UniformSubset>(seed, n, m);
+  return [=](int numActive) { return (*gen)(numActive); };
+}
+
+void
+DeterministicSchedule::beforeSharedAccess() {
+  if (tls_sem) {
+    sem_wait(tls_sem);
+  }
+}
+
+void
+DeterministicSchedule::afterSharedAccess() {
+  auto sched = tls_sched;
+  if (!sched) {
+    return;
+  }
+
+  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
+}
+
+sem_t*
+DeterministicSchedule::beforeThreadCreate() {
+  sem_t* s = new sem_t;
+  sem_init(s, 0, 0);
+  beforeSharedAccess();
+  sems_.push_back(s);
+  afterSharedAccess();
+  return s;
+}
+
+void
+DeterministicSchedule::afterThreadCreate(sem_t* sem) {
+  assert(tls_sem == nullptr);
+  assert(tls_sched == nullptr);
+  tls_sem = sem;
+  tls_sched = this;
+  bool started = false;
+  while (!started) {
+    beforeSharedAccess();
+    if (active_.count(std::this_thread::get_id()) == 1) {
+      started = true;
+    }
+    afterSharedAccess();
+  }
+}
+
+void
+DeterministicSchedule::beforeThreadExit() {
+  assert(tls_sched == this);
+  beforeSharedAccess();
+  sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
+  active_.erase(std::this_thread::get_id());
+  if (sems_.size() > 0) {
+    afterSharedAccess();
+  }
+  sem_destroy(tls_sem);
+  delete tls_sem;
+  tls_sem = nullptr;
+  tls_sched = nullptr;
+}
+
+void
+DeterministicSchedule::join(std::thread& child) {
+  auto sched = tls_sched;
+  if (sched) {
+    bool done = false;
+    while (!done) {
+      beforeSharedAccess();
+      done = !sched->active_.count(child.get_id());
+      afterSharedAccess();
+    }
+  }
+  child.join();
+}
+
+void
+DeterministicSchedule::post(sem_t* sem) {
+  beforeSharedAccess();
+  sem_post(sem);
+  afterSharedAccess();
+}
+
+bool
+DeterministicSchedule::tryWait(sem_t* sem) {
+  beforeSharedAccess();
+  int rv = sem_trywait(sem);
+  afterSharedAccess();
+  if (rv == 0) {
+    return true;
+  } else {
+    assert(errno == EAGAIN);
+    return false;
+  }
+}
+
+void
+DeterministicSchedule::wait(sem_t* sem) {
+  while (!tryWait(sem)) {
+    // we're not busy waiting because this is a deterministic schedule
+  }
+}
+
+}}
+
+namespace folly { namespace detail {
+
+using namespace test;
+
+template<>
+bool Futex<DeterministicAtomic>::futexWait(uint32_t expected,
+                                           uint32_t waitMask) {
+  bool rv;
+  DeterministicSchedule::beforeSharedAccess();
+  futexLock.lock();
+  if (data != expected) {
+    rv = false;
+  } else {
+    auto& queue = futexQueues[this];
+    bool done = false;
+    queue.push_back(std::make_pair(waitMask, &done));
+    while (!done) {
+      futexLock.unlock();
+      DeterministicSchedule::afterSharedAccess();
+      DeterministicSchedule::beforeSharedAccess();
+      futexLock.lock();
+    }
+    rv = true;
+  }
+  futexLock.unlock();
+  DeterministicSchedule::afterSharedAccess();
+  return rv;
+}
+
+template<>
+int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
+  int rv = 0;
+  DeterministicSchedule::beforeSharedAccess();
+  futexLock.lock();
+  if (futexQueues.count(this) > 0) {
+    auto& queue = futexQueues[this];
+    auto iter = queue.begin();
+    while (iter != queue.end() && rv < count) {
+      auto cur = iter++;
+      if ((cur->first & wakeMask) != 0) {
+        *(cur->second) = true;
+        rv++;
+        queue.erase(cur);
+      }
+    }
+    if (queue.empty()) {
+      futexQueues.erase(this);
+    }
+  }
+  futexLock.unlock();
+  DeterministicSchedule::afterSharedAccess();
+  return rv;
+}
+
+}}
diff --git a/folly/test/DeterministicSchedule.h b/folly/test/DeterministicSchedule.h
new file mode 100644 (file)
index 0000000..3e7ccb3
--- /dev/null
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <functional>
+#include <thread>
+#include <unordered_set>
+#include <vector>
+#include <boost/noncopyable.hpp>
+#include <semaphore.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <folly/ScopeGuard.h>
+#include <folly/detail/Futex.h>
+
+namespace folly { namespace test {
+
+/**
+ * DeterministicSchedule coordinates the inter-thread communication of a
+ * set of threads under test, so that despite concurrency the execution is
+ * the same every time.  It works by stashing a reference to the schedule
+ * in a thread-local variable, then blocking all but one thread at a time.
+ *
+ * In order for DeterministicSchedule to work, it needs to intercept
+ * all inter-thread communication.  To do this you should use
+ * DeterministicAtomic<T> instead of std::atomic<T>, create threads
+ * using DeterministicSchedule::thread() instead of the std::thread
+ * constructor, DeterministicSchedule::join(thr) instead of thr.join(),
+ * and access semaphores via the helper functions in DeterministicSchedule.
+ * Locks are not yet supported, although they would be easy to add with
+ * the same strategy as the mapping of sem_wait.
+ *
+ * The actual schedule is defined by a function from n -> [0,n). At
+ * each step, the function will be given the number of active threads
+ * (n), and it returns the index of the thread that should be run next.
+ * Invocations of the scheduler function will be serialized, but will
+ * occur from multiple threads.  A good starting schedule is uniform(0).
+ */
+class DeterministicSchedule : boost::noncopyable {
+ public:
+  /**
+   * Arranges for the current thread (and all threads created by
+   * DeterministicSchedule::thread on a thread participating in this
+   * schedule) to participate in a deterministic schedule.
+   */
+  explicit DeterministicSchedule(const std::function<int(int)>& scheduler);
+
+  /** Completes the schedule. */
+  ~DeterministicSchedule();
+
+  /**
+   * Returns a scheduling function that randomly chooses one of the
+   * runnable threads at each step, with no history.  This implements
+   * a schedule that is equivalent to one in which the steps between
+   * inter-thread communication are random variables following a poisson
+   * distribution.
+   */
+  static std::function<int(int)> uniform(long seed);
+
+  /**
+   * Returns a scheduling function that chooses a subset of the active
+   * threads and randomly chooses a member of the subset as the next
+   * runnable thread.  The subset is chosen with size n, and the choice
+   * is made every m steps.
+   */
+  static std::function<int(int)> uniformSubset(long seed, int n = 2,
+                                               int m = 64);
+
+  /** Obtains permission for the current thread to perform inter-thread
+   *  communication. */
+  static void beforeSharedAccess();
+
+  /** Releases permission for the current thread to perform inter-thread
+   *  communication. */
+  static void afterSharedAccess();
+
+  /** Launches a thread that will participate in the same deterministic
+   *  schedule as the current thread. */
+  template <typename Func, typename... Args>
+  static inline std::thread thread(Func&& func, Args&&... args) {
+    // TODO: maybe future versions of gcc will allow forwarding to thread
+    auto sched = tls_sched;
+    auto sem = sched ? sched->beforeThreadCreate() : nullptr;
+    auto child = std::thread([=](Args... a) {
+      if (sched) sched->afterThreadCreate(sem);
+      SCOPE_EXIT { if (sched) sched->beforeThreadExit(); };
+      func(a...);
+    }, args...);
+    if (sched) {
+      beforeSharedAccess();
+      sched->active_.insert(child.get_id());
+      afterSharedAccess();
+    }
+    return child;
+  }
+
+  /** Calls child.join() as part of a deterministic schedule. */
+  static void join(std::thread& child);
+
+  /** Calls sem_post(sem) as part of a deterministic schedule. */
+  static void post(sem_t* sem);
+
+  /** Calls sem_trywait(sem) as part of a deterministic schedule, returning
+   *  true on success and false on transient failure. */
+  static bool tryWait(sem_t* sem);
+
+  /** Calls sem_wait(sem) as part of a deterministic schedule. */
+  static void wait(sem_t* sem);
+
+ private:
+  static __thread sem_t* tls_sem;
+  static __thread DeterministicSchedule* tls_sched;
+
+  std::function<int(int)> scheduler_;
+  std::vector<sem_t*> sems_;
+  std::unordered_set<std::thread::id> active_;
+
+  sem_t* beforeThreadCreate();
+  void afterThreadCreate(sem_t*);
+  void beforeThreadExit();
+};
+
+
+/**
+ * DeterministicAtomic<T> is a drop-in replacement std::atomic<T> that
+ * cooperates with DeterministicSchedule.
+ */
+template <typename T>
+struct DeterministicAtomic {
+  std::atomic<T> data;
+
+  DeterministicAtomic() = default;
+  ~DeterministicAtomic() = default;
+  DeterministicAtomic(DeterministicAtomic<T> const &) = delete;
+  DeterministicAtomic<T>& operator= (DeterministicAtomic<T> const &) = delete;
+
+  constexpr /* implicit */ DeterministicAtomic(T v) noexcept : data(v) {}
+
+  bool is_lock_free() const noexcept {
+    return data.is_lock_free();
+  }
+
+  bool compare_exchange_strong(
+          T& v0, T v1,
+          std::memory_order mo = std::memory_order_seq_cst) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    bool rv = data.compare_exchange_strong(v0, v1, mo);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  bool compare_exchange_weak(
+          T& v0, T v1,
+          std::memory_order mo = std::memory_order_seq_cst) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    bool rv = data.compare_exchange_weak(v0, v1, mo);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T exchange(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = data.exchange(v, mo);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  /* implicit */ operator T () const noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = data;
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T load(std::memory_order mo = std::memory_order_seq_cst) const noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = data.load(mo);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator= (T v) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = (data = v);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  void store(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    data.store(v, mo);
+    DeterministicSchedule::afterSharedAccess();
+  }
+
+  T operator++ () noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = ++data;
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator++ (int postDummy) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = data++;
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator-- () noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = --data;
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator-- (int postDummy) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = data--;
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator+= (T v) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = (data += v);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator-= (T v) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = (data -= v);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator&= (T v) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = (data &= v);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+
+  T operator|= (T v) noexcept {
+    DeterministicSchedule::beforeSharedAccess();
+    T rv = (data |= v);
+    DeterministicSchedule::afterSharedAccess();
+    return rv;
+  }
+};
+
+}}
+
+namespace folly { namespace detail {
+
+template<>
+bool Futex<test::DeterministicAtomic>::futexWait(uint32_t expected,
+                                                 uint32_t waitMask);
+
+template<>
+int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
+
+}}
diff --git a/folly/test/DeterministicScheduleTest.cpp b/folly/test/DeterministicScheduleTest.cpp
new file mode 100644 (file)
index 0000000..43e02a8
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeterministicSchedule.h"
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+using namespace folly::test;
+
+TEST(DeterministicSchedule, uniform) {
+  auto p = DeterministicSchedule::uniform(0);
+  int buckets[10] = {};
+  for (int i = 0; i < 100000; ++i) {
+    buckets[p(10)]++;
+  }
+  for (int i = 0; i < 10; ++i) {
+    EXPECT_TRUE(buckets[i] > 9000);
+  }
+}
+
+TEST(DeterministicSchedule, uniformSubset) {
+  auto ps = DeterministicSchedule::uniformSubset(0, 3, 100);
+  int buckets[10] = {};
+  std::set<int> seen;
+  for (int i = 0; i < 100000; ++i) {
+    if (i > 0 && (i % 100) == 0) {
+      EXPECT_EQ(seen.size(), 3);
+      seen.clear();
+    }
+    int x = ps(10);
+    seen.insert(x);
+    EXPECT_TRUE(seen.size() <= 3);
+    buckets[x]++;
+  }
+  for (int i = 0; i < 10; ++i) {
+    EXPECT_TRUE(buckets[i] > 9000);
+  }
+}
+
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}
diff --git a/folly/test/FutexTest.cpp b/folly/test/FutexTest.cpp
new file mode 100644 (file)
index 0000000..5b2b1b8
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/detail/Futex.h"
+#include "folly/test/DeterministicSchedule.h"
+
+#include <thread>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+using namespace folly::detail;
+using namespace folly::test;
+
+typedef DeterministicSchedule DSched;
+
+template <template<typename> class Atom>
+void run_basic_tests() {
+  Futex<Atom> f(0);
+
+  EXPECT_FALSE(f.futexWait(1));
+  EXPECT_EQ(f.futexWake(), 0);
+
+  auto thr = DSched::thread([&]{
+    EXPECT_TRUE(f.futexWait(0));
+  });
+
+  while (f.futexWake() != 1) {
+    std::this_thread::yield();
+  }
+
+  DSched::join(thr);
+}
+
+
+TEST(Futex, basic_live) {
+  run_basic_tests<std::atomic>();
+}
+
+TEST(Futex, basic_deterministic) {
+  DSched sched(DSched::uniform(0));
+  run_basic_tests<DeterministicAtomic>();
+}
+
+int main(int argc, char ** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}
+
diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp
new file mode 100644 (file)
index 0000000..330b0fb
--- /dev/null
@@ -0,0 +1,639 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/MPMCQueue.h>
+#include <folly/Format.h>
+#include <folly/Memory.h>
+#include <folly/test/DeterministicSchedule.h>
+
+#include <boost/intrusive_ptr.hpp>
+#include <memory>
+#include <functional>
+#include <thread>
+#include <utility>
+#include <unistd.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
+
+using namespace folly;
+using namespace detail;
+using namespace test;
+
+typedef DeterministicSchedule DSched;
+
+
+template <template<typename> class Atom>
+void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
+  TurnSequencer<Atom> seq(init);
+  Atom<int> spinThreshold(0);
+
+  int prev = -1;
+  std::vector<std::thread> threads(numThreads);
+  for (int i = 0; i < numThreads; ++i) {
+    threads[i] = DSched::thread([&, i]{
+      for (int op = i; op < numOps; op += numThreads) {
+        seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
+        EXPECT_EQ(prev, op - 1);
+        prev = op;
+        seq.completeTurn(init + op);
+      }
+    });
+  }
+
+  for (auto& thr : threads) {
+    DSched::join(thr);
+  }
+
+  EXPECT_EQ(prev, numOps - 1);
+}
+
+TEST(MPMCQueue, sequencer) {
+  run_mt_sequencer_test<std::atomic>(1, 100, 0);
+  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
+  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
+}
+
+TEST(MPMCQueue, sequencer_deterministic) {
+  DSched sched(DSched::uniform(0));
+  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
+  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
+  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
+}
+
+template <typename T>
+void runElementTypeTest(T&& src) {
+  MPMCQueue<T> cq(10);
+  cq.blockingWrite(std::move(src));
+  T dest;
+  cq.blockingRead(dest);
+  EXPECT_TRUE(cq.write(std::move(dest)));
+  EXPECT_TRUE(cq.read(dest));
+}
+
+struct RefCounted {
+  mutable std::atomic<int> rc;
+
+  RefCounted() : rc(0) {}
+};
+
+void intrusive_ptr_add_ref(RefCounted const* p) {
+  p->rc++;
+}
+
+void intrusive_ptr_release(RefCounted const* p) {
+  if (--(p->rc)) {
+    delete p;
+  }
+}
+
+TEST(MPMCQueue, lots_of_element_types) {
+  runElementTypeTest(10);
+  runElementTypeTest(std::string("abc"));
+  runElementTypeTest(std::make_pair(10, std::string("def")));
+  runElementTypeTest(std::vector<std::string>{ { "abc" } });
+  runElementTypeTest(std::make_shared<char>('a'));
+  runElementTypeTest(folly::make_unique<char>('a'));
+  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
+}
+
+TEST(MPMCQueue, single_thread_enqdeq) {
+  MPMCQueue<int> cq(10);
+
+  for (int pass = 0; pass < 10; ++pass) {
+    for (int i = 0; i < 10; ++i) {
+      EXPECT_TRUE(cq.write(i));
+    }
+    EXPECT_FALSE(cq.write(-1));
+    EXPECT_FALSE(cq.isEmpty());
+    EXPECT_EQ(cq.size(), 10);
+
+    for (int i = 0; i < 5; ++i) {
+      int dest = -1;
+      EXPECT_TRUE(cq.read(dest));
+      EXPECT_EQ(dest, i);
+    }
+    for (int i = 5; i < 10; ++i) {
+      int dest = -1;
+      cq.blockingRead(dest);
+      EXPECT_EQ(dest, i);
+    }
+    int dest = -1;
+    EXPECT_FALSE(cq.read(dest));
+    EXPECT_EQ(dest, -1);
+
+    EXPECT_TRUE(cq.isEmpty());
+    EXPECT_EQ(cq.size(), 0);
+  }
+}
+
+TEST(MPMCQueue, tryenq_capacity_test) {
+  for (size_t cap = 1; cap < 100; ++cap) {
+    MPMCQueue<int> cq(cap);
+    for (int i = 0; i < cap; ++i) {
+      EXPECT_TRUE(cq.write(i));
+    }
+    EXPECT_FALSE(cq.write(100));
+  }
+}
+
+TEST(MPMCQueue, enq_capacity_test) {
+  for (auto cap : { 1, 100, 10000 }) {
+    MPMCQueue<int> cq(cap);
+    for (int i = 0; i < cap; ++i) {
+      cq.blockingWrite(i);
+    }
+    int t = 0;
+    int when;
+    auto thr = std::thread([&]{
+      cq.blockingWrite(100);
+      when = t;
+    });
+    usleep(2000);
+    t = 1;
+    int dummy;
+    cq.blockingRead(dummy);
+    thr.join();
+    EXPECT_EQ(when, 1);
+  }
+}
+
+template <template<typename> class Atom>
+void runTryEnqDeqTest(int numThreads, int numOps) {
+  // write and read aren't linearizable, so we don't have
+  // hard guarantees on their individual behavior.  We can still test
+  // correctness in aggregate
+  MPMCQueue<int,Atom> cq(numThreads);
+
+  uint64_t n = numOps;
+  std::vector<std::thread> threads(numThreads);
+  std::atomic<uint64_t> sum(0);
+  for (int t = 0; t < numThreads; ++t) {
+    threads[t] = DSched::thread([&,t]{
+      uint64_t threadSum = 0;
+      int src = t;
+      // received doesn't reflect any actual values, we just start with
+      // t and increment by numThreads to get the rounding of termination
+      // correct if numThreads doesn't evenly divide numOps
+      int received = t;
+      while (src < n || received < n) {
+        if (src < n && cq.write(src)) {
+          src += numThreads;
+        }
+
+        int dst;
+        if (received < n && cq.read(dst)) {
+          received += numThreads;
+          threadSum += dst;
+        }
+      }
+      sum += threadSum;
+    });
+  }
+  for (auto& t : threads) {
+    DSched::join(t);
+  }
+  EXPECT_TRUE(cq.isEmpty());
+  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+}
+
+TEST(MPMCQueue, mt_try_enq_deq) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<std::atomic>(nt, n);
+  }
+}
+
+TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
+  int nts[] = { 3, 10 };
+
+  long seed = 0;
+  LOG(INFO) << "using seed " << seed;
+
+  int n = 1000;
+  for (int nt : nts) {
+    {
+      DSched sched(DSched::uniform(seed));
+      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
+    }
+    {
+      DSched sched(DSched::uniformSubset(seed, 2));
+      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
+    }
+  }
+}
+
+uint64_t nowMicro() {
+  timeval tv;
+  gettimeofday(&tv, 0);
+  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
+}
+
+template <typename Q>
+std::string producerConsumerBench(Q&& queue, std::string qName,
+                                  int numProducers, int numConsumers,
+                                  int numOps, bool ignoreContents = false) {
+  Q& q = queue;
+
+  struct rusage beginUsage;
+  getrusage(RUSAGE_SELF, &beginUsage);
+
+  auto beginMicro = nowMicro();
+
+  uint64_t n = numOps;
+  std::atomic<uint64_t> sum(0);
+
+  std::vector<std::thread> producers(numProducers);
+  for (int t = 0; t < numProducers; ++t) {
+    producers[t] = DSched::thread([&,t]{
+      for (int i = t; i < numOps; i += numProducers) {
+        q.blockingWrite(i);
+      }
+    });
+  }
+
+  std::vector<std::thread> consumers(numConsumers);
+  for (int t = 0; t < numConsumers; ++t) {
+    consumers[t] = DSched::thread([&,t]{
+      uint64_t localSum = 0;
+      for (int i = t; i < numOps; i += numConsumers) {
+        int dest = -1;
+        q.blockingRead(dest);
+        EXPECT_FALSE(dest == -1);
+        localSum += dest;
+      }
+      sum += localSum;
+    });
+  }
+
+  for (auto& t : producers) {
+    DSched::join(t);
+  }
+  for (auto& t : consumers) {
+    DSched::join(t);
+  }
+  if (!ignoreContents) {
+    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+  }
+
+  auto endMicro = nowMicro();
+
+  struct rusage endUsage;
+  getrusage(RUSAGE_SELF, &endUsage);
+
+  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
+  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
+      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
+
+  return folly::format(
+      "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
+      qName, numProducers, numConsumers, nanosPer, csw, n).str();
+}
+
+
+TEST(MPMCQueue, mt_prod_cons_deterministic) {
+  // we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(0));
+
+  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
+          "", 1, 1, 1000);
+  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
+          "", 10, 10, 1000);
+  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
+          "", 1, 1, 1000);
+  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
+          "", 10, 10, 1000);
+  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
+          "", 10, 10, 1000);
+}
+
+#define PC_BENCH(q, np, nc, nops...) \
+    producerConsumerBench(q, #q, (np), (nc), nops)
+
+TEST(MPMCQueue, mt_prod_cons) {
+  int n = 100000;
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
+  LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
+}
+
+template <template<typename> class Atom>
+uint64_t runNeverFailTest(int numThreads, int numOps) {
+  // always #enq >= #deq
+  MPMCQueue<int,Atom> cq(numThreads);
+
+  uint64_t n = numOps;
+  auto beginMicro = nowMicro();
+
+  std::vector<std::thread> threads(numThreads);
+  std::atomic<uint64_t> sum(0);
+  for (int t = 0; t < numThreads; ++t) {
+    threads[t] = DSched::thread([&,t]{
+      uint64_t threadSum = 0;
+      for (int i = t; i < n; i += numThreads) {
+        // enq + deq
+        EXPECT_TRUE(cq.writeIfNotFull(i));
+
+        int dest = -1;
+        EXPECT_TRUE(cq.readIfNotEmpty(dest));
+        EXPECT_TRUE(dest >= 0);
+        threadSum += dest;
+      }
+      sum += threadSum;
+    });
+  }
+  for (auto& t : threads) {
+    DSched::join(t);
+  }
+  EXPECT_TRUE(cq.isEmpty());
+  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+
+  return nowMicro() - beginMicro;
+}
+
+TEST(MPMCQueue, mt_never_fail) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
+              << nt << " threads";
+  }
+}
+
+TEST(MPMCQueue, mt_never_fail_deterministic) {
+  int nts[] = { 3, 10 };
+
+  long seed = 0; // nowMicro() % 10000;
+  LOG(INFO) << "using seed " << seed;
+
+  int n = 1000;
+  for (int nt : nts) {
+    {
+      DSched sched(DSched::uniform(seed));
+      runNeverFailTest<DeterministicAtomic>(nt, n);
+    }
+    {
+      DSched sched(DSched::uniformSubset(seed, 2));
+      runNeverFailTest<DeterministicAtomic>(nt, n);
+    }
+  }
+}
+
+enum LifecycleEvent {
+  NOTHING = -1,
+  DEFAULT_CONSTRUCTOR,
+  COPY_CONSTRUCTOR,
+  MOVE_CONSTRUCTOR,
+  TWO_ARG_CONSTRUCTOR,
+  COPY_OPERATOR,
+  MOVE_OPERATOR,
+  DESTRUCTOR,
+  MAX_LIFECYCLE_EVENT
+};
+
+static __thread int lc_counts[MAX_LIFECYCLE_EVENT];
+static __thread int lc_prev[MAX_LIFECYCLE_EVENT];
+
+static int lc_outstanding() {
+  return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
+      lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
+      lc_counts[DESTRUCTOR];
+}
+
+static void lc_snap() {
+  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
+    lc_prev[i] = lc_counts[i];
+  }
+}
+
+#define LIFECYCLE_STEP(args...) lc_step(__LINE__, args)
+
+static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
+  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
+    int delta = i == what || i == what2 ? 1 : 0;
+    EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
+        << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
+        << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
+        << ", from line " << lineno;
+  }
+  lc_snap();
+}
+
+template <typename R>
+struct Lifecycle {
+  typedef R IsRelocatable;
+
+  bool constructed;
+
+  Lifecycle() noexcept : constructed(true) {
+    ++lc_counts[DEFAULT_CONSTRUCTOR];
+  }
+
+  explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
+    ++lc_counts[TWO_ARG_CONSTRUCTOR];
+  }
+
+  Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
+    ++lc_counts[COPY_CONSTRUCTOR];
+  }
+
+  Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
+    ++lc_counts[MOVE_CONSTRUCTOR];
+  }
+
+  Lifecycle& operator= (const Lifecycle& rhs) noexcept {
+    ++lc_counts[COPY_OPERATOR];
+    return *this;
+  }
+
+  Lifecycle& operator= (Lifecycle&& rhs) noexcept {
+    ++lc_counts[MOVE_OPERATOR];
+    return *this;
+  }
+
+  ~Lifecycle() noexcept {
+    ++lc_counts[DESTRUCTOR];
+    assert(lc_outstanding() >= 0);
+    assert(constructed);
+    constructed = false;
+  }
+};
+
+template <typename R>
+void runPerfectForwardingTest() {
+  lc_snap();
+  EXPECT_EQ(lc_outstanding(), 0);
+
+  {
+    MPMCQueue<Lifecycle<R>> queue(50);
+    LIFECYCLE_STEP(NOTHING);
+
+    for (int pass = 0; pass < 10; ++pass) {
+      for (int i = 0; i < 10; ++i) {
+        queue.blockingWrite();
+        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+
+        queue.blockingWrite(1, "one");
+        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
+
+        {
+          Lifecycle<R> src;
+          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+          queue.blockingWrite(std::move(src));
+          LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
+        }
+        LIFECYCLE_STEP(DESTRUCTOR);
+
+        {
+          Lifecycle<R> src;
+          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+          queue.blockingWrite(src);
+          LIFECYCLE_STEP(COPY_CONSTRUCTOR);
+        }
+        LIFECYCLE_STEP(DESTRUCTOR);
+
+        EXPECT_TRUE(queue.write());
+        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+      }
+
+      EXPECT_EQ(queue.size(), 50);
+      EXPECT_FALSE(queue.write(2, "two"));
+      LIFECYCLE_STEP(NOTHING);
+
+      for (int i = 0; i < 50; ++i) {
+        {
+          Lifecycle<R> node;
+          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+
+          queue.blockingRead(node);
+          if (R::value) {
+            // relocatable, moved via memcpy
+            LIFECYCLE_STEP(DESTRUCTOR);
+          } else {
+            LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
+          }
+        }
+        LIFECYCLE_STEP(DESTRUCTOR);
+      }
+
+      EXPECT_EQ(queue.size(), 0);
+    }
+
+    // put one element back before destruction
+    {
+      Lifecycle<R> src(3, "three");
+      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
+      queue.write(std::move(src));
+      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
+    }
+    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
+  }
+  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
+
+  EXPECT_EQ(lc_outstanding(), 0);
+}
+
+TEST(MPMCQueue, perfect_forwarding) {
+  runPerfectForwardingTest<std::false_type>();
+}
+
+TEST(MPMCQueue, perfect_forwarding_relocatable) {
+  runPerfectForwardingTest<std::true_type>();
+}
+
+TEST(MPMCQueue, queue_moving) {
+  lc_snap();
+  EXPECT_EQ(lc_outstanding(), 0);
+
+  {
+    MPMCQueue<Lifecycle<std::false_type>> a(50);
+    LIFECYCLE_STEP(NOTHING);
+
+    a.blockingWrite();
+    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+
+    // move constructor
+    MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
+    LIFECYCLE_STEP(NOTHING);
+    EXPECT_EQ(a.capacity(), 0);
+    EXPECT_EQ(a.size(), 0);
+    EXPECT_EQ(b.capacity(), 50);
+    EXPECT_EQ(b.size(), 1);
+
+    b.blockingWrite();
+    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+
+    // move operator
+    MPMCQueue<Lifecycle<std::false_type>> c;
+    LIFECYCLE_STEP(NOTHING);
+    c = std::move(b);
+    LIFECYCLE_STEP(NOTHING);
+    EXPECT_EQ(c.capacity(), 50);
+    EXPECT_EQ(c.size(), 2);
+
+    {
+      Lifecycle<std::false_type> dst;
+      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
+      c.blockingRead(dst);
+      LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
+
+      {
+        // swap
+        MPMCQueue<Lifecycle<std::false_type>> d(10);
+        LIFECYCLE_STEP(NOTHING);
+        std::swap(c, d);
+        LIFECYCLE_STEP(NOTHING);
+        EXPECT_EQ(c.capacity(), 10);
+        EXPECT_TRUE(c.isEmpty());
+        EXPECT_EQ(d.capacity(), 50);
+        EXPECT_EQ(d.size(), 1);
+
+        d.blockingRead(dst);
+        LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
+
+        c.blockingWrite(dst);
+        LIFECYCLE_STEP(COPY_CONSTRUCTOR);
+
+        d.blockingWrite(std::move(dst));
+        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
+      } // d goes out of scope
+      LIFECYCLE_STEP(DESTRUCTOR);
+    } // dst goes out of scope
+    LIFECYCLE_STEP(DESTRUCTOR);
+  } // c goes out of scope
+  LIFECYCLE_STEP(DESTRUCTOR);
+}
+
+int main(int argc, char ** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}
+