Speed up EventCount, now 64-bit only
authorTudor Bosman <tudorb@fb.com>
Fri, 21 Jun 2013 03:14:35 +0000 (20:14 -0700)
committerJordan DeLong <jdelong@fb.com>
Wed, 26 Jun 2013 02:47:03 +0000 (19:47 -0700)
Summary:
Don't use two atomic variables when only one will suffice (otherwise, the
increments in doNotify() and prepareWait() would require memory_model_seq_cst,
because they need to globally order a store before a load, and no weaker
model allows you to do that)

Of course, this makes this 64-bit only, but I don't care.

Test Plan: eventcount_test, tests outside of folly

Reviewed By: delong.j@fb.com

FB internal diff: D858963

folly/experimental/EventCount.h

index f7469b4678622cd3dfebb137e910f7001237fca3..5a11a3300666df805b7633777a15e6426c01cfac 100644 (file)
 #include <syscall.h>
 #include <linux/futex.h>
 #include <sys/time.h>
+#include <cassert>
 #include <climits>
 #include <atomic>
 #include <thread>
 
+#include "folly/Bits.h"
+#include "folly/Likely.h"
+
 
 namespace folly {
 
@@ -90,12 +94,12 @@ inline int futex(int* uaddr, int op, int val, const timespec* timeout,
  */
 class EventCount {
  public:
-  EventCount() noexcept : epoch_(0), waiters_(0) { }
+  EventCount() noexcept : val_(0) { }
 
   class Key {
     friend class EventCount;
-    explicit Key(int e) noexcept : epoch_(e) { }
-    int epoch_;
+    explicit Key(uint32_t e) noexcept : epoch_(e) { }
+    uint32_t epoch_;
   };
 
   void notify() noexcept;
@@ -118,8 +122,28 @@ class EventCount {
   EventCount& operator=(const EventCount&) = delete;
   EventCount& operator=(EventCount&&) = delete;
 
-  std::atomic<int> epoch_;
-  std::atomic<int> waiters_;
+  // This requires 64-bit
+  static_assert(sizeof(int) == 4, "bad platform");
+  static_assert(sizeof(uint32_t) == 4, "bad platform");
+  static_assert(sizeof(uint64_t) == 8, "bad platform");
+
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+  static constexpr size_t kEpochOffset = 1;
+#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
+  static constexpr size_t kEpochOffset = 0;  // in units of sizeof(int)
+#else
+# error Your machine uses a weird endianness!
+#endif
+
+  // val_ stores the epoch in the most significant 32 bits and the
+  // waiter count in the least significant 32 bits.
+  std::atomic<uint64_t> val_;
+
+  static constexpr uint64_t kAddWaiter = uint64_t(1);
+  static constexpr uint64_t kSubWaiter = uint64_t(-1);
+  static constexpr size_t  kEpochShift = 32;
+  static constexpr uint64_t kAddEpoch = uint64_t(1) << kEpochShift;
+  static constexpr uint64_t kWaiterMask = kAddEpoch - 1;
 };
 
 inline void EventCount::notify() noexcept {
@@ -131,31 +155,36 @@ inline void EventCount::notifyAll() noexcept {
 }
 
 inline void EventCount::doNotify(int n) noexcept {
-  // The order is important: epoch_ is incremented before waiters_ is checked.
-  // prepareWait() increments waiters_ before checking epoch_, so it is
-  // impossible to miss a wakeup.
-  ++epoch_;
-  if (waiters_ != 0) {
-    detail::futex(reinterpret_cast<int*>(&epoch_), FUTEX_WAKE, n, nullptr,
-                  nullptr, 0);
+  uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_acq_rel);
+  if (UNLIKELY(prev & kWaiterMask)) {
+    detail::futex(reinterpret_cast<int*>(&val_) + kEpochOffset,
+                  FUTEX_WAKE, n, nullptr, nullptr, 0);
   }
 }
 
 inline EventCount::Key EventCount::prepareWait() noexcept {
-  ++waiters_;
-  return Key(epoch_);
+  uint64_t prev = val_.fetch_add(kAddWaiter, std::memory_order_acq_rel);
+  return Key(prev >> kEpochShift);
 }
 
 inline void EventCount::cancelWait() noexcept {
-  --waiters_;
+  // memory_order_relaxed would suffice for correctness, but the faster
+  // #waiters gets to 0, the less likely it is that we'll do spurious wakeups
+  // (and thus system calls).
+  uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
+  assert((prev & kWaiterMask) != 0);
 }
 
 inline void EventCount::wait(Key key) noexcept {
-  while (epoch_ == key.epoch_) {
-    detail::futex(reinterpret_cast<int*>(&epoch_), FUTEX_WAIT, key.epoch_,
-                  nullptr, nullptr, 0);
+  while ((val_.load(std::memory_order_acquire) >> kEpochShift) == key.epoch_) {
+    detail::futex(reinterpret_cast<int*>(&val_) + kEpochOffset,
+                  FUTEX_WAIT, key.epoch_, nullptr, nullptr, 0);
   }
-  --waiters_;
+  // memory_order_relaxed would suffice for correctness, but the faster
+  // #waiters gets to 0, the less likely it is that we'll do spurious wakeups
+  // (and thus system calls)
+  uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
+  assert((prev & kWaiterMask) != 0);
 }
 
 template <class Condition>