add emulation of futex() for non-Linux systems
authorNathan Bronson <ngbronson@fb.com>
Wed, 1 Oct 2014 22:46:20 +0000 (15:46 -0700)
committerAndrii Grynenko <andrii@fb.com>
Wed, 15 Oct 2014 00:55:33 +0000 (17:55 -0700)
Summary:
Inside the Linux kernel, futex() works by hashing the source
address to a fixed set of wakeup lists.  When using folly on non-Linux
systems we can emulate something similar by using std::mutex and
std::condition_variable.

Emulating futex() using higher-level APIs is less crazy than it sounds,
because the emulated futex still provides the advantages of the real
one: it allows all of the fast-paths of a new synchronization construct
to be inlined; it is space efficient, taking only 1 word and allowing
the caller to encode state into all of the word's bits; and it avoids
system calls unless a thread actually needs to be put to sleep or
woken up.  Think of this as a way of boostrapping something with the
same properties as futex() on platforms that don't expose it publically.
(Presumably these platforms have private APIs that do something similar.)

This diff moves all of the Linux-specific futex stuff into Futex.cpp,
where it is gated by #ifdef __linux__.  It also adds an emulated
implementation.  The emulated futex will be selected by default on
non-Linux platforms, or it can be used on Linux by using an Atom template
type of folly::detail::EmulatedFutexAtomic.  This means, for example,
that you can test MPMCQueue on top of the emulated API by instantiating
a MPMCQueue<ElemType, EmulatedFutexAtomic>.

As a bonus, this refactoring provides a small speed boost by removing
an unnecessary evaluation of the errno macro in the success path of
futexWait.

Test Plan:
1. existing unit tests
2. new unit tests (including tests of Futex users)
3. compile Futex.cpp on OS X (some other build failures still occur)

Reviewed By: delong.j@fb.com

Subscribers: trunkagent, njormrod, yiding, boseant, mssarang

FB internal diff: D1596118

Tasks: 4952724

folly/MPMCQueue.h
folly/detail/CacheLocality.h
folly/detail/Futex.cpp
folly/detail/Futex.h
folly/test/BatonTest.cpp
folly/test/DeterministicSchedule.cpp
folly/test/DeterministicSchedule.h
folly/test/LifoSemTests.cpp
folly/test/MPMCQueueTest.cpp

index 34ed779ac079dcf28cf41cb99736557e97e79347..d1e3b32d23f1e4ed72e6399d471f0986177f7916 100644 (file)
 #include <atomic>
 #include <assert.h>
 #include <boost/noncopyable.hpp>
-#include <errno.h>
 #include <limits>
-#include <linux/futex.h>
 #include <string.h>
-#include <sys/syscall.h>
 #include <type_traits>
 #include <unistd.h>
 
@@ -661,7 +658,7 @@ struct TurnSequencer {
 
       if (prevThresh == 0) {
         // bootstrap
-        spinCutoff = target;
+        spinCutoff.store(target);
       } else {
         // try once, keep moving if CAS fails.  Exponential moving average
         // with alpha of 7/8
index efb4c298aa1f3bc35afffb6db3e0c08ffdbf35ba..107cf7577b449f4dc154b71018dafc67e00a55c4 100644 (file)
@@ -322,6 +322,10 @@ struct AccessSpreader {
   static Getcpu::Func pickGetcpuFunc(size_t numStripes);
 };
 
+template<>
+Getcpu::Func AccessSpreader<std::atomic>::pickGetcpuFunc(size_t);
+
+
 /// An array of kMaxCpus+1 AccessSpreader<Atom> instances constructed
 /// with default params, with the zero-th element having 1 stripe
 template <template<typename> class Atom, size_t kMaxStripe>
index 9baafd2a48a0008e8018feadbf27b4bd88ba6f40..489cec39dbdd2b45b687c5ef0783e492beb99ba8 100644 (file)
  */
 
 #include <folly/detail/Futex.h>
+#include <stdint.h>
+#include <string.h>
+#include <condition_variable>
+#include <mutex>
+#include <boost/intrusive/list.hpp>
+#include <folly/Hash.h>
+#include <folly/ScopeGuard.h>
+
+#ifdef __linux__
+# include <errno.h>
+# include <linux/futex.h>
+# include <sys/syscall.h>
+#endif
+
+using namespace std::chrono;
 
 namespace folly { namespace detail {
 
-/* see Futex.h */
-FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno) {
-  if (returnVal == 0) {
+namespace {
+
+////////////////////////////////////////////////////
+// native implementation using the futex() syscall
+
+#ifdef __linux__
+
+int nativeFutexWake(void* addr, int count, uint32_t wakeMask) {
+  int rv = syscall(SYS_futex,
+                   addr, /* addr1 */
+                   FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */
+                   count, /* val */
+                   nullptr, /* timeout */
+                   nullptr, /* addr2 */
+                   wakeMask); /* val3 */
+
+  assert(rv >= 0);
+
+  return rv;
+}
+
+template <class Clock>
+struct timespec
+timeSpecFromTimePoint(time_point<Clock> absTime)
+{
+  auto duration = absTime.time_since_epoch();
+  auto secs = duration_cast<seconds>(duration);
+  auto nanos = duration_cast<nanoseconds>(duration - secs);
+  struct timespec result = { secs.count(), nanos.count() };
+  return result;
+}
+
+FutexResult nativeFutexWaitImpl(void* addr,
+                                uint32_t expected,
+                                time_point<system_clock>* absSystemTime,
+                                time_point<steady_clock>* absSteadyTime,
+                                uint32_t waitMask) {
+  assert(absSystemTime == nullptr || absSteadyTime == nullptr);
+
+  int op = FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG;
+  struct timespec ts;
+  struct timespec* timeout = nullptr;
+
+  if (absSystemTime != nullptr) {
+    op |= FUTEX_CLOCK_REALTIME;
+    ts = timeSpecFromTimePoint(*absSystemTime);
+    timeout = &ts;
+  } else if (absSteadyTime != nullptr) {
+    ts = timeSpecFromTimePoint(*absSteadyTime);
+    timeout = &ts;
+  }
+
+  // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
+  // value - http://locklessinc.com/articles/futex_cheat_sheet/
+  int rv = syscall(SYS_futex,
+                   addr, /* addr1 */
+                   op, /* op */
+                   expected, /* val */
+                   timeout, /* timeout */
+                   nullptr, /* addr2 */
+                   waitMask); /* val3 */
+
+  if (rv == 0) {
     return FutexResult::AWOKEN;
+  } else {
+    switch(errno) {
+      case ETIMEDOUT:
+        assert(timeout != nullptr);
+        return FutexResult::TIMEDOUT;
+      case EINTR:
+        return FutexResult::INTERRUPTED;
+      case EWOULDBLOCK:
+        return FutexResult::VALUE_CHANGED;
+      default:
+        assert(false);
+        // EACCESS, EFAULT, or EINVAL. All of these mean *addr point to
+        // invalid memory (or I misunderstand the API).  We can either
+        // crash, or return a value that lets the process continue for
+        // a bit. We choose the latter. VALUE_CHANGED probably turns the
+        // caller into a spin lock.
+        return FutexResult::VALUE_CHANGED;
+    }
   }
-  switch(futexErrno) {
-    case ETIMEDOUT:
+}
+
+#endif // __linux__
+
+///////////////////////////////////////////////////////
+// compatibility implementation using standard C++ API
+
+struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> {
+  void* const addr_;
+  const uint32_t waitMask_;
+  bool hasTimeout_;
+  bool signaled_;
+  std::condition_variable cond_;
+
+  EmulatedFutexWaitNode(void* addr, uint32_t waitMask, bool hasTimeout)
+    : addr_(addr)
+    , waitMask_(waitMask)
+    , hasTimeout_(hasTimeout)
+    , signaled_(false)
+  {
+  }
+};
+
+struct EmulatedFutexBucket {
+  std::mutex mutex_;
+  boost::intrusive::list<EmulatedFutexWaitNode> waiters_;
+
+  static const size_t kNumBuckets = 4096;
+  static EmulatedFutexBucket* gBuckets;
+  static std::once_flag gBucketInit;
+
+  static EmulatedFutexBucket& bucketFor(void* addr) {
+    std::call_once(gBucketInit, [](){
+      gBuckets = new EmulatedFutexBucket[kNumBuckets];
+    });
+    uint64_t mixedBits = folly::hash::twang_mix64(
+        reinterpret_cast<uintptr_t>(addr));
+    return gBuckets[mixedBits % kNumBuckets];
+  }
+};
+
+EmulatedFutexBucket* EmulatedFutexBucket::gBuckets;
+std::once_flag EmulatedFutexBucket::gBucketInit;
+
+int emulatedFutexWake(void* addr, int count, uint32_t waitMask) {
+  auto& bucket = EmulatedFutexBucket::bucketFor(addr);
+
+  int numAwoken = 0;
+  boost::intrusive::list<EmulatedFutexWaitNode> deferredWakeups;
+
+  {
+    std::unique_lock<std::mutex> lock(bucket.mutex_);
+
+    for (auto iter = bucket.waiters_.begin();
+         numAwoken < count && iter != bucket.waiters_.end(); ) {
+      auto current = iter;
+      auto& node = *iter++;
+      if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
+        // We unlink, but waiter destroys the node.  We must signal timed
+        // waiters under the lock, to avoid a race where we release the lock,
+        // the waiter times out and deletes the node, and then we try to
+        // signal it.  This problem doesn't exist for unbounded waiters,
+        // so for them we optimize their wakeup by releasing the lock first.
+        bucket.waiters_.erase(current);
+        if (node.hasTimeout_) {
+          node.signaled_ = true;
+          node.cond_.notify_one();
+        } else {
+          deferredWakeups.push_back(node);
+        }
+        ++numAwoken;
+      }
+    }
+  }
+
+  while (!deferredWakeups.empty()) {
+    auto& node = deferredWakeups.front();
+    deferredWakeups.pop_front();
+    node.signaled_ = true;
+    node.cond_.notify_one();
+  }
+
+  return numAwoken;
+}
+
+FutexResult emulatedFutexWaitImpl(
+        void* addr,
+        uint32_t expected,
+        time_point<system_clock>* absSystemTime,
+        time_point<steady_clock>* absSteadyTime,
+        uint32_t waitMask) {
+  bool hasTimeout = absSystemTime != nullptr || absSteadyTime != nullptr;
+  EmulatedFutexWaitNode node(addr, waitMask, hasTimeout);
+
+  auto& bucket = EmulatedFutexBucket::bucketFor(addr);
+  std::unique_lock<std::mutex> lock(bucket.mutex_);
+
+  uint32_t actual;
+  memcpy(&actual, addr, sizeof(uint32_t));
+  if (actual != expected) {
+    return FutexResult::VALUE_CHANGED;
+  }
+
+  bucket.waiters_.push_back(node);
+  while (!node.signaled_) {
+    std::cv_status status = std::cv_status::no_timeout;
+    if (absSystemTime != nullptr) {
+      status = node.cond_.wait_until(lock, *absSystemTime);
+    } else if (absSteadyTime != nullptr) {
+      status = node.cond_.wait_until(lock, *absSteadyTime);
+    } else {
+      node.cond_.wait(lock);
+    }
+
+    if (status == std::cv_status::timeout) {
+      bucket.waiters_.erase(bucket.waiters_.iterator_to(node));
       return FutexResult::TIMEDOUT;
-    case EINTR:
-      return FutexResult::INTERRUPTED;
-    case EWOULDBLOCK:
-      return FutexResult::VALUE_CHANGED;
-    default:
-      assert(false);
-      /* Shouldn't reach here. Just return one of the FutexResults */
-      return FutexResult::VALUE_CHANGED;
+    }
   }
+  return FutexResult::AWOKEN;
+}
+
+} // anon namespace
+
+
+/////////////////////////////////
+// Futex<> specializations
+
+template <>
+int
+Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
+#ifdef __linux__
+  return nativeFutexWake(this, count, wakeMask);
+#else
+  return emulatedFutexWake(this, count, wakeMask);
+#endif
+}
+
+template <>
+int
+Futex<EmulatedFutexAtomic>::futexWake(int count, uint32_t wakeMask) {
+  return emulatedFutexWake(this, count, wakeMask);
+}
+
+template <>
+FutexResult
+Futex<std::atomic>::futexWaitImpl(uint32_t expected,
+                                  time_point<system_clock>* absSystemTime,
+                                  time_point<steady_clock>* absSteadyTime,
+                                  uint32_t waitMask) {
+#ifdef __linux__
+  return nativeFutexWaitImpl(
+      this, expected, absSystemTime, absSteadyTime, waitMask);
+#else
+  return emulatedFutexWaitImpl(
+      this, expected, absSystemTime, absSteadyTime, waitMask);
+#endif
+}
+
+template <>
+FutexResult
+Futex<EmulatedFutexAtomic>::futexWaitImpl(
+        uint32_t expected,
+        time_point<system_clock>* absSystemTime,
+        time_point<steady_clock>* absSteadyTime,
+        uint32_t waitMask) {
+  return emulatedFutexWaitImpl(
+      this, expected, absSystemTime, absSteadyTime, waitMask);
 }
 
-}}
+}} // namespace folly::detail
index c47bc9a597e82f33100912ffe28b3f8a1f6d49b6..fe61d0e66417081fcc45c5a569b3b492acbf0306 100644 (file)
 #include <chrono>
 #include <limits>
 #include <assert.h>
-#include <errno.h>
-#include <linux/futex.h>
-#include <sys/syscall.h>
 #include <unistd.h>
 #include <boost/noncopyable.hpp>
 
-using std::chrono::steady_clock;
-using std::chrono::system_clock;
-using std::chrono::time_point;
-
 namespace folly { namespace detail {
 
 enum class FutexResult {
@@ -39,9 +32,6 @@ enum class FutexResult {
   TIMEDOUT
 };
 
-/* Converts return value and errno from a futex syscall to a FutexResult */
-FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno);
-
 /**
  * Futex is an atomic 32 bit unsigned integer that provides access to the
  * futex() syscall on that value.  It is templated in such a way that it
@@ -59,7 +49,11 @@ struct Futex : Atom<uint32_t>, boost::noncopyable {
   /** Puts the thread to sleep if this->load() == expected.  Returns true when
    *  it is returning because it has consumed a wake() event, false for any
    *  other return (signal, this->load() != expected, or spurious wakeup). */
-  bool futexWait(uint32_t expected, uint32_t waitMask = -1);
+  bool futexWait(uint32_t expected, uint32_t waitMask = -1) {
+    auto rv = futexWaitImpl(expected, nullptr, nullptr, waitMask);
+    assert(rv != FutexResult::TIMEDOUT);
+    return rv == FutexResult::AWOKEN;
+  }
 
   /** Similar to futexWait but also accepts a timeout that gives the time until
    *  when the call can block (time is the absolute time i.e time since epoch).
@@ -69,118 +63,84 @@ struct Futex : Atom<uint32_t>, boost::noncopyable {
    *  NOTE: On some systems steady_clock is just an alias for system_clock,
    *  and is not actually steady.*/
   template <class Clock, class Duration = typename Clock::duration>
-  FutexResult futexWaitUntil(uint32_t expected,
-                             const time_point<Clock, Duration>& absTime,
-                             uint32_t waitMask = -1);
+  FutexResult futexWaitUntil(
+          uint32_t expected,
+          const std::chrono::time_point<Clock, Duration>& absTime,
+          uint32_t waitMask = -1) {
+    using std::chrono::duration_cast;
+    using std::chrono::nanoseconds;
+    using std::chrono::seconds;
+    using std::chrono::steady_clock;
+    using std::chrono::system_clock;
+    using std::chrono::time_point;
+
+    static_assert(
+        (std::is_same<Clock, system_clock>::value ||
+         std::is_same<Clock, steady_clock>::value),
+        "futexWaitUntil only knows std::chrono::{system_clock,steady_clock}");
+    assert((std::is_same<Clock, system_clock>::value) || Clock::is_steady);
+
+    auto duration = absTime.time_since_epoch();
+    if (std::is_same<Clock, system_clock>::value) {
+      time_point<system_clock> absSystemTime(duration);
+      return futexWaitImpl(expected, &absSystemTime, nullptr, waitMask);
+    } else {
+      time_point<steady_clock> absSteadyTime(duration);
+      return futexWaitImpl(expected, nullptr, &absSteadyTime, waitMask);
+    }
+  }
 
   /** Wakens up to count waiters where (waitMask & wakeMask) != 0,
    *  returning the number of awoken threads. */
   int futexWake(int count = std::numeric_limits<int>::max(),
                 uint32_t wakeMask = -1);
 
 private:
-
-  /** Futex wait implemented via syscall SYS_futex. absTimeout gives
-   *  time till when the wait can block. If it is nullptr the call will
-   *  block until a matching futex wake is received. extraOpFlags can be
-   *  used to specify addtional flags to add to the futex operation (by
-   *  default only FUTEX_WAIT_BITSET and FUTEX_PRIVATE_FLAG are included).
-   *  Returns 0 on success or -1 on error, with errno set to one of the
-   *  values listed in futex(2). */
-  int futexWaitImpl(uint32_t expected,
-                    const struct timespec* absTimeout,
-                    int extraOpFlags,
-                    uint32_t waitMask);
+ private:
+
+  /** Underlying implementation of futexWait and futexWaitUntil.
+   *  At most one of absSystemTime and absSteadyTime should be non-null.
+   *  Timeouts are separated into separate parameters to allow the
+   *  implementations to be elsewhere without templating on the clock
+   *  type, which is otherwise complicated by the fact that steady_clock
+   *  is the same as system_clock on some platforms. */
+  FutexResult futexWaitImpl(
+      uint32_t expected,
+      std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+      std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+      uint32_t waitMask);
 };
 
-template <>
-inline int
-Futex<std::atomic>::futexWaitImpl(uint32_t expected,
-                                  const struct timespec* absTimeout,
-                                  int extraOpFlags,
-                                  uint32_t waitMask) {
-  assert(sizeof(*this) == sizeof(int));
-
-  /* Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
-   * value - http://locklessinc.com/articles/futex_cheat_sheet/ */
-  int rv = syscall(
-      SYS_futex,
-      this, /* addr1 */
-      FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | extraOpFlags, /* op */
-      expected, /* val */
-      absTimeout, /* timeout */
-      nullptr, /* addr2 */
-      waitMask); /* val3 */
-
-  assert(rv == 0 ||
-         errno == EWOULDBLOCK ||
-         errno == EINTR ||
-         (absTimeout != nullptr && errno == ETIMEDOUT));
-
-  return rv;
-}
-
-template <>
-inline bool Futex<std::atomic>::futexWait(uint32_t expected,
-                                          uint32_t waitMask) {
-  return futexWaitImpl(expected, nullptr, 0 /* extraOpFlags */, waitMask) == 0;
-}
-
-template <>
-inline int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
-  assert(sizeof(*this) == sizeof(int));
-  int rv = syscall(SYS_futex,
-                   this, /* addr1 */
-                   FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */
-                   count, /* val */
-                   nullptr, /* timeout */
-                   nullptr, /* addr2 */
-                   wakeMask); /* val3 */
-  assert(rv >= 0);
-  return rv;
-}
-
-/* Convert std::chrono::time_point to struct timespec */
-template <class Clock, class Duration = typename Clock::Duration>
-struct timespec timePointToTimeSpec(const time_point<Clock, Duration>& tp) {
-  using std::chrono::nanoseconds;
-  using std::chrono::seconds;
-  using std::chrono::duration_cast;
-
-  struct timespec ts;
-  auto duration = tp.time_since_epoch();
-  auto secs = duration_cast<seconds>(duration);
-  auto nanos = duration_cast<nanoseconds>(duration - secs);
-  ts.tv_sec = secs.count();
-  ts.tv_nsec = nanos.count();
-  return ts;
-}
-
-template <template<typename> class Atom> template<class Clock, class Duration>
-inline FutexResult
-Futex<Atom>::futexWaitUntil(
-               uint32_t expected,
-               const time_point<Clock, Duration>& absTime,
-               uint32_t waitMask) {
-
-  static_assert(std::is_same<Clock,system_clock>::value ||
-                std::is_same<Clock,steady_clock>::value,
-                "Only std::system_clock or std::steady_clock supported");
-
-  struct timespec absTimeSpec = timePointToTimeSpec(absTime);
-  int extraOpFlags = 0;
-
-  /* We must use FUTEX_CLOCK_REALTIME flag if we are getting the time_point
-   * from the system clock (CLOCK_REALTIME). This check also works correctly for
-   * broken glibc in which steady_clock is a typedef to system_clock.*/
-  if (std::is_same<Clock,system_clock>::value) {
-    extraOpFlags = FUTEX_CLOCK_REALTIME;
-  } else {
-    assert(Clock::is_steady);
-  }
+/** A std::atomic subclass that can be used to force Futex to emulate
+ *  the underlying futex() syscall.  This is primarily useful to test or
+ *  benchmark the emulated implementation on systems that don't need it. */
+template <typename T>
+struct EmulatedFutexAtomic : public std::atomic<T> {
+  EmulatedFutexAtomic() noexcept = default;
+  constexpr /* implicit */ EmulatedFutexAtomic(T init) noexcept
+      : std::atomic<T>(init) {}
+  EmulatedFutexAtomic(const EmulatedFutexAtomic& rhs) = delete;
+};
+
+/* Available specializations, with definitions elsewhere */
+
+template<>
+int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask);
+
+template<>
+FutexResult Futex<std::atomic>::futexWaitImpl(
+      uint32_t expected,
+      std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+      std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+      uint32_t waitMask);
+
+template<>
+int Futex<EmulatedFutexAtomic>::futexWake(int count, uint32_t wakeMask);
 
-  const int rv = futexWaitImpl(expected, &absTimeSpec, extraOpFlags, waitMask);
-  return futexErrnoToFutexResult(rv, errno);
-}
+template<>
+FutexResult Futex<EmulatedFutexAtomic>::futexWaitImpl(
+      uint32_t expected,
+      std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+      std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+      uint32_t waitMask);
 
 }}
index f214dd6907acdad748644cbc2b773fcb4817fbb7..8779a4c334ce536c8dc125a91d51bbf79740577d 100644 (file)
@@ -24,6 +24,7 @@
 
 using namespace folly;
 using namespace folly::test;
+using folly::detail::EmulatedFutexAtomic;
 
 typedef DeterministicSchedule DSched;
 
@@ -63,6 +64,10 @@ BENCHMARK(baton_pingpong, iters) {
   run_pingpong_test<std::atomic>(iters);
 }
 
+BENCHMARK(baton_pingpong_emulated_futex, iters) {
+  run_pingpong_test<EmulatedFutexAtomic>(iters);
+}
+
 BENCHMARK(posix_sem_pingpong, iters) {
   sem_t sems[3];
   sem_t* a = sems + 0;
@@ -83,45 +88,50 @@ BENCHMARK(posix_sem_pingpong, iters) {
   thr.join();
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, typename Clock>
 void run_basic_timed_wait_tests() {
   Baton<Atom> b;
   b.post();
   // tests if early delivery works fine
-  EXPECT_TRUE(b.timed_wait(std::chrono::system_clock::now()));
+  EXPECT_TRUE(b.timed_wait(Clock::now()));
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, typename Clock>
 void run_timed_wait_tmo_tests() {
   Baton<Atom> b;
 
   auto thr = DSched::thread([&]{
-    bool rv = b.timed_wait(std::chrono::system_clock::now() +
-                           std::chrono::milliseconds(1));
+    bool rv = b.timed_wait(Clock::now() + std::chrono::milliseconds(1));
     // main thread is guaranteed to not post until timeout occurs
     EXPECT_FALSE(rv);
   });
   DSched::join(thr);
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, typename Clock>
 void run_timed_wait_regular_test() {
   Baton<Atom> b;
 
   auto thr = DSched::thread([&] {
-    bool rv = b.timed_wait(
-                std::chrono::time_point<std::chrono::system_clock>::max());
-    if (std::is_same<Atom<int>, std::atomic<int>>::value) {
-      // We can only ensure this for std::atomic
+    // To wait forever we'd like to use time_point<Clock>::max, but
+    // std::condition_variable does math to convert the timeout to
+    // system_clock without handling overflow.
+    auto farFuture = Clock::now() + std::chrono::hours(1000);
+    bool rv = b.timed_wait(farFuture);
+    if (!std::is_same<Atom<int>, DeterministicAtomic<int>>::value) {
+      // DeterministicAtomic ignores actual times, so doesn't guarantee
+      // a lack of timeout
       EXPECT_TRUE(rv);
     }
   });
 
-  if (std::is_same<Atom<int>, std::atomic<int>>::value) {
-    // If we are using std::atomic, then a sleep here guarantees to a large
-    // extent that 'thr' will execute wait before we post it, thus testing
-    // late delivery. For DeterministicAtomic, we just rely on
-    // DeterministicSchedule to do the scheduling
+  if (!std::is_same<Atom<int>, DeterministicAtomic<int>>::value) {
+    // If we are using std::atomic (or EmulatedFutexAtomic) then
+    // a sleep here guarantees to a large extent that 'thr' will
+    // execute wait before we post it, thus testing late delivery. For
+    // DeterministicAtomic, we just rely on DeterministicSchedule to do
+    // the scheduling.  The test won't fail if we lose the race, we just
+    // don't get coverage.
     std::this_thread::sleep_for(std::chrono::milliseconds(2));
   }
 
@@ -129,19 +139,40 @@ void run_timed_wait_regular_test() {
   DSched::join(thr);
 }
 
-TEST(Baton, timed_wait_basic) {
-  run_basic_timed_wait_tests<std::atomic>();
-  run_basic_timed_wait_tests<DeterministicAtomic>();
+TEST(Baton, timed_wait_basic_system_clock) {
+  run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock>();
+  run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
+  run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::system_clock>();
+}
+
+TEST(Baton, timed_wait_timeout_system_clock) {
+  run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock>();
+  run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
+  run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::system_clock>();
+}
+
+TEST(Baton, timed_wait_system_clock) {
+  run_timed_wait_regular_test<std::atomic, std::chrono::system_clock>();
+  run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::system_clock>();
+  run_timed_wait_regular_test<DeterministicAtomic, std::chrono::system_clock>();
+}
+
+TEST(Baton, timed_wait_basic_steady_clock) {
+  run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock>();
+  run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
+  run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::steady_clock>();
 }
 
-TEST(Baton, timed_wait_timeout) {
-  run_timed_wait_tmo_tests<std::atomic>();
-  run_timed_wait_tmo_tests<DeterministicAtomic>();
+TEST(Baton, timed_wait_timeout_steady_clock) {
+  run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock>();
+  run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
+  run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::steady_clock>();
 }
 
-TEST(Baton, timed_wait) {
-  run_timed_wait_regular_test<std::atomic>();
-  run_timed_wait_regular_test<DeterministicAtomic>();
+TEST(Baton, timed_wait_steady_clock) {
+  run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock>();
+  run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::steady_clock>();
+  run_timed_wait_regular_test<DeterministicAtomic, std::chrono::steady_clock>();
 }
 
 template <template<typename> class Atom>
@@ -154,6 +185,7 @@ void run_try_wait_tests() {
 
 TEST(Baton, try_wait) {
   run_try_wait_tests<std::atomic>();
+  run_try_wait_tests<EmulatedFutexAtomic>();
   run_try_wait_tests<DeterministicAtomic>();
 }
 
index 8a3b1fc15248a4e126c3b5f09dbde36d840d135d..a079917e2ef0ed95b7b22776d118ae56ecf005e0 100644 (file)
@@ -229,49 +229,28 @@ DeterministicSchedule::wait(sem_t* sem) {
 namespace folly { namespace detail {
 
 using namespace test;
-
-template<>
-bool Futex<DeterministicAtomic>::futexWait(uint32_t expected,
-                                           uint32_t waitMask) {
-  bool rv;
-  DeterministicSchedule::beforeSharedAccess();
-  futexLock.lock();
-  if (data != expected) {
-    rv = false;
-  } else {
-    auto& queue = futexQueues[this];
-    bool done = false;
-    queue.push_back(std::make_pair(waitMask, &done));
-    while (!done) {
-      futexLock.unlock();
-      DeterministicSchedule::afterSharedAccess();
-      DeterministicSchedule::beforeSharedAccess();
-      futexLock.lock();
-    }
-    rv = true;
-  }
-  futexLock.unlock();
-  DeterministicSchedule::afterSharedAccess();
-  return rv;
-}
-
-FutexResult futexWaitUntilImpl(Futex<DeterministicAtomic>* futex,
-                               uint32_t expected, uint32_t waitMask) {
-  if (futex == nullptr) {
-    return FutexResult::VALUE_CHANGED;
-  }
-
-  bool rv = false;
+using namespace std::chrono;
+
+template <>
+FutexResult
+Futex<DeterministicAtomic>::futexWaitImpl(
+        uint32_t expected,
+        time_point<system_clock>* absSystemTimeout,
+        time_point<steady_clock>* absSteadyTimeout,
+        uint32_t waitMask) {
+  bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
+  bool awoken = false;
+  FutexResult result = FutexResult::AWOKEN;
   int futexErrno = 0;
 
   DeterministicSchedule::beforeSharedAccess();
   futexLock.lock();
-  if (futex->data == expected) {
-    auto& queue = futexQueues[futex];
-    queue.push_back(std::make_pair(waitMask, &rv));
+  if (data == expected) {
+    auto& queue = futexQueues[this];
+    queue.push_back(std::make_pair(waitMask, &awoken));
     auto ours = queue.end();
     ours--;
-    while (!rv) {
+    while (!awoken) {
       futexLock.unlock();
       DeterministicSchedule::afterSharedAccess();
       DeterministicSchedule::beforeSharedAccess();
@@ -279,31 +258,33 @@ FutexResult futexWaitUntilImpl(Futex<DeterministicAtomic>* futex,
 
       // Simulate spurious wake-ups, timeouts each time with
       // a 10% probability if we haven't been woken up already
-      if (!rv && DeterministicSchedule::getRandNumber(100) < 10) {
-        assert(futexQueues.count(futex) != 0 &&
-               &futexQueues[futex] == &queue);
+      if (!awoken && hasTimeout &&
+          DeterministicSchedule::getRandNumber(100) < 10) {
+        assert(futexQueues.count(this) != 0 &&
+               &futexQueues[this] == &queue);
         queue.erase(ours);
         if (queue.empty()) {
-          futexQueues.erase(futex);
+          futexQueues.erase(this);
         }
-        rv = false;
         // Simulate ETIMEDOUT 90% of the time and other failures
         // remaining time
-        futexErrno =
-          DeterministicSchedule::getRandNumber(100) >= 10 ? ETIMEDOUT : EINTR;
+        result =
+          DeterministicSchedule::getRandNumber(100) >= 10
+              ? FutexResult::TIMEDOUT : FutexResult::INTERRUPTED;
         break;
       }
     }
   } else {
-    futexErrno = EWOULDBLOCK;
+    result = FutexResult::VALUE_CHANGED;
   }
   futexLock.unlock();
   DeterministicSchedule::afterSharedAccess();
-  return futexErrnoToFutexResult(rv ? 0 : -1, futexErrno);
+  return result;
 }
 
 template<>
-int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
+int
+Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
   int rv = 0;
   DeterministicSchedule::beforeSharedAccess();
   futexLock.lock();
index a344ac9ee65629c4cf831437143ae2d8bc867311..4862ac9fa8708e654c41b1d79877d1a30f1c2a07 100644 (file)
@@ -270,29 +270,24 @@ struct DeterministicAtomic {
   }
 };
 
-}}
+}} // namespace folly::test
+
+/* Specialization declarations */
 
 namespace folly { namespace detail {
 
 template<>
-bool Futex<test::DeterministicAtomic>::futexWait(uint32_t expected,
-                                                 uint32_t waitMask);
-
-/// This function ignores the time bound, and instead pseudo-randomly chooses
-/// whether the timeout was reached. To do otherwise would not be deterministic.
-FutexResult futexWaitUntilImpl(Futex<test::DeterministicAtomic> *futex,
-                               uint32_t expected, uint32_t waitMask);
-
-template<> template<class Clock, class Duration>
-FutexResult
-Futex<test::DeterministicAtomic>::futexWaitUntil(
-          uint32_t expected,
-          const time_point<Clock, Duration>& absTimeUnused,
-          uint32_t waitMask) {
-  return futexWaitUntilImpl(this, expected, waitMask);
-}
+int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
 
 template<>
-int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
+FutexResult Futex<test::DeterministicAtomic>::futexWaitImpl(
+      uint32_t expected,
+      std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+      std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+      uint32_t waitMask);
+
+template<>
+Getcpu::Func
+AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(size_t numStripes);
 
-}}
+}} // namespace folly::detail
index 15ef5e02d10d2f25fdb5e522366b8ef3781643ce..e3de16b7662d20ed5e9a54be90e32d2ecc0bcf39 100644 (file)
@@ -407,25 +407,25 @@ BENCHMARK_NAMED_PARAM(contendedUse, 16_to_16, 16, 16)
 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32)
 BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000)
 
-// sudo nice -n -20 tao/queues/LifoSemTests --benchmark --bm_min_iters=10000000
+// sudo nice -n -20 folly/test/LifoSemTests --benchmark --bm_min_iters=10000000
 // ============================================================================
-// tao/queues/LifoSemTests.cpp                     relative  time/iter  iters/s
+// folly/test/LifoSemTests.cpp                     relative  time/iter  iters/s
 // ============================================================================
-// lifo_sem_pingpong                                            1.91us  522.92K
-// lifo_sem_oneway                                            211.18ns    4.74M
-// single_thread_lifo_post                                     19.71ns   50.75M
-// single_thread_lifo_wait                                     18.84ns   53.09M
-// single_thread_lifo_postwait                                 39.41ns   25.37M
-// single_thread_lifo_trywait                                 912.10ps    1.10G
-// single_thread_posix_postwait                                32.93ns   30.37M
-// single_thread_posix_trywait                                 10.06ns   99.36M
+// lifo_sem_pingpong                                          396.84ns    2.52M
+// lifo_sem_oneway                                             88.52ns   11.30M
+// single_thread_lifo_post                                     14.78ns   67.67M
+// single_thread_lifo_wait                                     13.53ns   73.90M
+// single_thread_lifo_postwait                                 28.91ns   34.59M
+// single_thread_lifo_trywait                                 670.13ps    1.49G
+// single_thread_posix_postwait                                24.12ns   41.46M
+// single_thread_posix_trywait                                  6.76ns  147.88M
 // ----------------------------------------------------------------------------
-// contendedUse(1_to_1)                                       208.21ns    4.80M
-// contendedUse(1_to_32)                                      532.41ns    1.88M
-// contendedUse(32_to_1)                                      153.74ns    6.50M
-// contendedUse(16_to_16)                                     301.86ns    3.31M
-// contendedUse(32_to_32)                                     268.32ns    3.73M
-// contendedUse(32_to_1000)                                   966.27ns    1.03M
+// contendedUse(1_to_1)                                       143.60ns    6.96M
+// contendedUse(1_to_32)                                      244.06ns    4.10M
+// contendedUse(32_to_1)                                      131.99ns    7.58M
+// contendedUse(16_to_16)                                     210.64ns    4.75M
+// contendedUse(32_to_32)                                     222.91ns    4.49M
+// contendedUse(32_to_1000)                                   453.39ns    2.21M
 // ============================================================================
 
 int main(int argc, char ** argv) {
index 1a84b485aba2cc7f78d9574643169abf6bc58e4a..be1fbc904d6b3837e7ec689b7eb1f6d03aa7cfb6 100644 (file)
@@ -82,6 +82,12 @@ TEST(MPMCQueue, sequencer) {
   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
 }
 
+TEST(MPMCQueue, sequencer_emulated_futex) {
+  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
+}
+
 TEST(MPMCQueue, sequencer_deterministic) {
   DSched sched(DSched::uniform(0));
   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
@@ -254,6 +260,15 @@ TEST(MPMCQueue, mt_try_enq_deq) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -373,6 +388,20 @@ TEST(MPMCQueue, mt_prod_cons) {
   LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
 }
 
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+  int n = 100000;
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
+  LOG(INFO)
+    << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
+}
+
 template <template<typename> class Atom>
 void runNeverFailThread(
     int numThreads,
@@ -427,6 +456,17 @@ TEST(MPMCQueue, mt_never_fail) {
   }
 }
 
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
+              << nt << " threads";
+  }
+}
+
 TEST(MPMCQueue, mt_never_fail_deterministic) {
   int nts[] = { 3, 10 };