Add futexTimedWait
authorSarang Masti <mssarang@fb.com>
Mon, 9 Dec 2013 23:08:02 +0000 (15:08 -0800)
committerJordan DeLong <jdelong@fb.com>
Fri, 20 Dec 2013 21:07:37 +0000 (13:07 -0800)
Summary:
Add futexTimedWait to Futex which allows callers to wait on the futex
for a specified max duration.

Test Plan: -- Ran all unitests

Reviewed By: ngbronson@fb.com

FB internal diff: D1090115

folly/detail/Futex.cpp [new file with mode: 0644]
folly/detail/Futex.h
folly/test/DeterministicSchedule.cpp
folly/test/DeterministicSchedule.h
folly/test/FutexTest.cpp

diff --git a/folly/detail/Futex.cpp b/folly/detail/Futex.cpp
new file mode 100644 (file)
index 0000000..206df40
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/detail/Futex.h>
+
+namespace folly { namespace detail {
+
+/* see Futex.h */
+FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno) {
+  if (returnVal == 0) {
+    return FutexResult::AWOKEN;
+  }
+  switch(futexErrno) {
+    case ETIMEDOUT:
+      return FutexResult::TIMEDOUT;
+    case EINTR:
+      return FutexResult::INTERRUPTED;
+    case EWOULDBLOCK:
+      return FutexResult::VALUE_CHANGED;
+    default:
+      assert(false);
+      /* Shouldn't reach here. Just return one of the FutexResults */
+      return FutexResult::VALUE_CHANGED;
+  }
+}
+
+}}
index 93544bc2666a7a47a5d0beb7bc6305a511e9ea19..50eba88a5af48225104d077248149cdea1e2d8fe 100644 (file)
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <atomic>
+#include <chrono>
 #include <limits>
 #include <assert.h>
 #include <errno.h>
 #include <unistd.h>
 #include <boost/noncopyable.hpp>
 
+using std::chrono::steady_clock;
+using std::chrono::system_clock;
+using std::chrono::time_point;
+
 namespace folly { namespace detail {
 
+enum class FutexResult {
+  VALUE_CHANGED, /* Futex value didn't match expected */
+  AWOKEN,        /* futex wait matched with a futex wake */
+  INTERRUPTED,   /* Spurious wake-up or signal caused futex wait failure */
+  TIMEDOUT
+};
+
+/* Converts return value and errno from a futex syscall to a FutexResult */
+FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno);
+
 /**
  * Futex is an atomic 32 bit unsigned integer that provides access to the
  * futex() syscall on that value.  It is templated in such a way that it
@@ -46,25 +61,69 @@ struct Futex : Atom<uint32_t>, boost::noncopyable {
    *  other return (signal, this->load() != expected, or spurious wakeup). */
   bool futexWait(uint32_t expected, uint32_t waitMask = -1);
 
+  /** Similar to futexWait but also accepts a timeout that gives the time until
+   *  when the call can block (time is the absolute time i.e time since epoch).
+   *  Allowed clock types: std::chrono::system_clock, std::chrono::steady_clock.
+   *  Returns one of FutexResult values.
+   *
+   *  NOTE: On some systems steady_clock is just an alias for system_clock,
+   *  and is not actually steady.*/
+  template <class Clock, class Duration = typename Clock::duration>
+  FutexResult futexWaitUntil(uint32_t expected,
+                             const time_point<Clock, Duration>& absTime,
+                             uint32_t waitMask = -1);
+
   /** Wakens up to count waiters where (waitMask & wakeMask) != 0,
    *  returning the number of awoken threads. */
   int futexWake(int count = std::numeric_limits<int>::max(),
                 uint32_t wakeMask = -1);
+
+  private:
+
+  /** Futex wait implemented via syscall SYS_futex. absTimeout gives
+   *  time till when the wait can block. If it is nullptr the call will
+   *  block until a matching futex wake is received. extraOpFlags can be
+   *  used to specify addtional flags to add to the futex operation (by
+   *  default only FUTEX_WAIT_BITSET and FUTEX_PRIVATE_FLAG are included).
+   *  Returns 0 on success or -1 on error, with errno set to one of the
+   *  values listed in futex(2). */
+  int futexWaitImpl(uint32_t expected,
+                    const struct timespec* absTimeout,
+                    int extraOpFlags,
+                    uint32_t waitMask);
 };
 
+template <>
+inline int
+Futex<std::atomic>::futexWaitImpl(uint32_t expected,
+                                  const struct timespec* absTimeout,
+                                  int extraOpFlags,
+                                  uint32_t waitMask) {
+  assert(sizeof(*this) == sizeof(int));
+
+  /* Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
+   * value - http://locklessinc.com/articles/futex_cheat_sheet/ */
+  int rv = syscall(
+      SYS_futex,
+      this, /* addr1 */
+      FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | extraOpFlags, /* op */
+      expected, /* val */
+      absTimeout, /* timeout */
+      nullptr, /* addr2 */
+      waitMask); /* val3 */
+
+  assert(rv == 0 ||
+         errno == EWOULDBLOCK ||
+         errno == EINTR ||
+         (absTimeout != nullptr && errno == ETIMEDOUT));
+
+  return rv;
+}
+
 template <>
 inline bool Futex<std::atomic>::futexWait(uint32_t expected,
                                           uint32_t waitMask) {
-  assert(sizeof(*this) == sizeof(int));
-  int rv = syscall(SYS_futex,
-                   this, /* addr1 */
-                   FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, /* op */
-                   expected, /* val */
-                   nullptr, /* timeout */
-                   nullptr, /* addr2 */
-                   waitMask); /* val3 */
-  assert(rv == 0 || (errno == EWOULDBLOCK || errno == EINTR));
-  return rv == 0;
+  return futexWaitImpl(expected, nullptr, 0 /* extraOpFlags */, waitMask) == 0;
 }
 
 template <>
@@ -81,4 +140,47 @@ inline int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
   return rv;
 }
 
+/* Convert std::chrono::time_point to struct timespec */
+template <class Clock, class Duration = typename Clock::Duration>
+struct timespec timePointToTimeSpec(const time_point<Clock, Duration>& tp) {
+  using std::chrono::nanoseconds;
+  using std::chrono::seconds;
+  using std::chrono::duration_cast;
+
+  struct timespec ts;
+  auto duration = tp.time_since_epoch();
+  auto secs = duration_cast<seconds>(duration);
+  auto nanos = duration_cast<nanoseconds>(duration - secs);
+  ts.tv_sec = secs.count();
+  ts.tv_nsec = nanos.count();
+  return ts;
+}
+
+template <template<typename> class Atom> template<class Clock, class Duration>
+inline FutexResult
+Futex<Atom>::futexWaitUntil(
+               uint32_t expected,
+               const time_point<Clock, Duration>& absTime,
+               uint32_t waitMask) {
+
+  static_assert(std::is_same<Clock,system_clock>::value ||
+                std::is_same<Clock,steady_clock>::value,
+                "Only std::system_clock or std::steady_clock supported");
+
+  struct timespec absTimeSpec = timePointToTimeSpec(absTime);
+  int extraOpFlags = 0;
+
+  /* We must use FUTEX_CLOCK_REALTIME flag if we are getting the time_point
+   * from the system clock (CLOCK_REALTIME). This check also works correctly for
+   * broken glibc in which steady_clock is a typedef to system_clock.*/
+  if (std::is_same<Clock,system_clock>::value) {
+    extraOpFlags = FUTEX_CLOCK_REALTIME;
+  } else {
+    assert(Clock::is_steady);
+  }
+
+  const int rv = futexWaitImpl(expected, &absTimeSpec, extraOpFlags, waitMask);
+  return futexErrnoToFutexResult(rv, errno);
+}
+
 }}
index 873bcf7886770ce3464dc8f8123892a224890499..f913b152e6fbfb626d6b0a4521c5234306b861d1 100644 (file)
@@ -134,6 +134,14 @@ DeterministicSchedule::afterSharedAccess() {
   sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
 }
 
+int
+DeterministicSchedule::getRandNumber(int n) {
+  if (tls_sched) {
+    return tls_sched->scheduler_(n);
+  }
+  return std::rand() % n;
+}
+
 sem_t*
 DeterministicSchedule::beforeThreadCreate() {
   sem_t* s = new sem_t;
@@ -247,6 +255,49 @@ bool Futex<DeterministicAtomic>::futexWait(uint32_t expected,
   return rv;
 }
 
+FutexResult futexWaitUntilImpl(Futex<DeterministicAtomic>* futex,
+                               uint32_t expected, uint32_t waitMask) {
+  if (futex == nullptr) {
+    return FutexResult::VALUE_CHANGED;
+  }
+
+  bool rv = false;
+  int futexErrno = 0;
+
+  DeterministicSchedule::beforeSharedAccess();
+  futexLock.lock();
+  if (futex->data == expected) {
+    auto& queue = futexQueues[futex];
+    queue.push_back(std::make_pair(waitMask, &rv));
+    auto ours = queue.end();
+    ours--;
+    while (!rv) {
+      futexLock.unlock();
+      DeterministicSchedule::afterSharedAccess();
+      DeterministicSchedule::beforeSharedAccess();
+      futexLock.lock();
+
+      // Simulate spurious wake-ups, timeouts each time with
+      // a 10% probability
+      if (DeterministicSchedule::getRandNumber(100) < 10) {
+        queue.erase(ours);
+        if (queue.empty()) {
+          futexQueues.erase(futex);
+        }
+        rv = false;
+        // Simulate ETIMEDOUT 90% of the time and other failures
+        // remaining time
+        futexErrno =
+          DeterministicSchedule::getRandNumber(100) >= 10 ? ETIMEDOUT : EINTR;
+        break;
+      }
+    }
+  }
+  futexLock.unlock();
+  DeterministicSchedule::afterSharedAccess();
+  return futexErrnoToFutexResult(rv ? 0 : -1, futexErrno);
+}
+
 template<>
 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
   int rv = 0;
index 71cb1037f2040e08341327c60db166bf0f9908b6..e4c587b6ed35590d3e651b5e0b64d8da7f937abe 100644 (file)
@@ -124,6 +124,10 @@ class DeterministicSchedule : boost::noncopyable {
   /** Calls sem_wait(sem) as part of a deterministic schedule. */
   static void wait(sem_t* sem);
 
+  /** Used scheduler_ to get a random number b/w [0, n). If tls_sched is
+   *  not set-up it falls back to std::rand() */
+  static int getRandNumber(int n);
+
  private:
   static __thread sem_t* tls_sem;
   static __thread DeterministicSchedule* tls_sched;
@@ -274,6 +278,20 @@ template<>
 bool Futex<test::DeterministicAtomic>::futexWait(uint32_t expected,
                                                  uint32_t waitMask);
 
+/// This function ignores the time bound, and instead pseudo-randomly chooses
+/// whether the timeout was reached. To do otherwise would not be deterministic.
+FutexResult futexWaitUntilImpl(Futex<test::DeterministicAtomic> *futex,
+                               uint32_t expected, uint32_t waitMask);
+
+template<> template<class Clock, class Duration>
+FutexResult
+Futex<test::DeterministicAtomic>::futexWaitUntil(
+          uint32_t expected,
+          const time_point<Clock, Duration>& absTimeUnused,
+          uint32_t waitMask) {
+  return futexWaitUntilImpl(this, expected, waitMask);
+}
+
 template<>
 int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
 
index 5b2b1b859fc62015b07e43e817d95594f5cba895..37df56bf3b61c0fa4719dfe6ee7b6de5cd10aad6 100644 (file)
 #include "folly/detail/Futex.h"
 #include "folly/test/DeterministicSchedule.h"
 
+#include <chrono>
 #include <thread>
 
 #include <gflags/gflags.h>
 #include <gtest/gtest.h>
+#include <common/logging/logging.h>
+#include <time.h>
 
 using namespace folly::detail;
 using namespace folly::test;
+using namespace std::chrono;
 
 typedef DeterministicSchedule DSched;
 
@@ -45,14 +49,133 @@ void run_basic_tests() {
   DSched::join(thr);
 }
 
+template<template<typename> class Atom>
+void run_wait_until_tests();
+
+template <typename Clock>
+void stdAtomicWaitUntilTests() {
+  Futex<std::atomic> f(0);
+
+  auto thrA = DSched::thread([&]{
+    while (true) {
+      typename Clock::time_point nowPlus2s = Clock::now() + seconds(2);
+      auto res = f.futexWaitUntil(0, nowPlus2s);
+      EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
+      if (res == FutexResult::AWOKEN) {
+        break;
+      }
+    }
+  });
+
+  while (f.futexWake() != 1) {
+    std::this_thread::yield();
+  }
+
+  DSched::join(thrA);
+
+  auto start = Clock::now();
+  EXPECT_EQ(f.futexWaitUntil(0, start + milliseconds(100)),
+            FutexResult::TIMEDOUT);
+  LOG(INFO) << "Futex wait timed out after waiting for "
+            << duration_cast<milliseconds>(Clock::now() - start).count()
+            << "ms";
+}
+
+template <typename Clock>
+void deterministicAtomicWaitUntilTests() {
+  Futex<DeterministicAtomic> f(0);
+
+  // Futex wait must eventually fail with either FutexResult::TIMEDOUT or
+  // FutexResult::INTERRUPTED
+  auto res = f.futexWaitUntil(0, Clock::now() + milliseconds(100));
+  EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::INTERRUPTED);
+}
+
+template <>
+void run_wait_until_tests<std::atomic>() {
+  stdAtomicWaitUntilTests<system_clock>();
+  stdAtomicWaitUntilTests<steady_clock>();
+}
+
+template <>
+void run_wait_until_tests<DeterministicAtomic>() {
+  deterministicAtomicWaitUntilTests<system_clock>();
+  deterministicAtomicWaitUntilTests<steady_clock>();
+}
+
+uint64_t diff(uint64_t a, uint64_t b) {
+  return a > b ? a - b : b - a;
+}
+
+void run_system_clock_test() {
+  /* Test to verify that system_clock uses clock_gettime(CLOCK_REALTIME, ...)
+   * for the time_points */
+  struct timespec ts;
+  const int maxIters = 1000;
+  int iter = 0;
+  uint64_t delta = 10000000 /* 10 ms */;
+
+  /** The following loop is only to make the test more robust in the presence of
+   * clock adjustments that can occur. We just run the loop maxIter times and
+   * expect with very high probability that there will be atleast one iteration
+   * of the test during which clock adjustments > delta have not occurred. */
+  while (iter < maxIters) {
+    uint64_t a = duration_cast<nanoseconds>(system_clock::now()
+                                            .time_since_epoch()).count();
+
+    clock_gettime(CLOCK_REALTIME, &ts);
+    uint64_t b = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
+
+    uint64_t c = duration_cast<nanoseconds>(system_clock::now()
+                                            .time_since_epoch()).count();
+
+    if (diff(a, b) <= delta &&
+        diff(b, c) <= delta &&
+        diff(a, c) <= 2 * delta) {
+      /* Success! system_clock uses CLOCK_REALTIME for time_points */
+      break;
+    }
+    iter++;
+  }
+  EXPECT_TRUE(iter < maxIters);
+}
+
+void run_steady_clock_test() {
+  /* Test to verify that steady_clock uses clock_gettime(CLOCK_MONOTONIC, ...)
+   * for the time_points */
+  EXPECT_TRUE(steady_clock::is_steady);
+
+  uint64_t A = duration_cast<nanoseconds>(steady_clock::now()
+                                          .time_since_epoch()).count();
+
+  struct timespec ts;
+  clock_gettime(CLOCK_MONOTONIC, &ts);
+  uint64_t B = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
+
+  uint64_t C = duration_cast<nanoseconds>(steady_clock::now()
+                                          .time_since_epoch()).count();
+  EXPECT_TRUE(A <= B && B <= C);
+}
+
+TEST(Futex, clock_source) {
+  run_system_clock_test();
+
+  /* On some systems steady_clock is just an alias for system_clock. So,
+   * we must skip run_steady_clock_test if the two clocks are the same. */
+  if (!std::is_same<system_clock,steady_clock>::value) {
+    run_steady_clock_test();
+  }
+}
 
 TEST(Futex, basic_live) {
   run_basic_tests<std::atomic>();
+  run_wait_until_tests<std::atomic>();
 }
 
 TEST(Futex, basic_deterministic) {
   DSched sched(DSched::uniform(0));
   run_basic_tests<DeterministicAtomic>();
+  run_wait_until_tests<DeterministicAtomic>();
 }
 
 int main(int argc, char ** argv) {