#include <atomic>
#include <assert.h>
#include <boost/noncopyable.hpp>
-#include <errno.h>
#include <limits>
-#include <linux/futex.h>
#include <string.h>
-#include <sys/syscall.h>
#include <type_traits>
#include <unistd.h>
if (prevThresh == 0) {
// bootstrap
- spinCutoff = target;
+ spinCutoff.store(target);
} else {
// try once, keep moving if CAS fails. Exponential moving average
// with alpha of 7/8
static Getcpu::Func pickGetcpuFunc(size_t numStripes);
};
+template<>
+Getcpu::Func AccessSpreader<std::atomic>::pickGetcpuFunc(size_t);
+
+
/// An array of kMaxCpus+1 AccessSpreader<Atom> instances constructed
/// with default params, with the zero-th element having 1 stripe
template <template<typename> class Atom, size_t kMaxStripe>
*/
#include <folly/detail/Futex.h>
+#include <stdint.h>
+#include <string.h>
+#include <condition_variable>
+#include <mutex>
+#include <boost/intrusive/list.hpp>
+#include <folly/Hash.h>
+#include <folly/ScopeGuard.h>
+
+#ifdef __linux__
+# include <errno.h>
+# include <linux/futex.h>
+# include <sys/syscall.h>
+#endif
+
+using namespace std::chrono;
namespace folly { namespace detail {
-/* see Futex.h */
-FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno) {
- if (returnVal == 0) {
+namespace {
+
+////////////////////////////////////////////////////
+// native implementation using the futex() syscall
+
+#ifdef __linux__
+
// Wakes up to |count| waiters parked on the futex word at |addr| whose
// wait bitset intersects |wakeMask|.  Returns the number of threads woken.
// FUTEX_PRIVATE_FLAG restricts the futex to this process, which avoids
// the kernel's cross-process hashing work.
int nativeFutexWake(void* addr, int count, uint32_t wakeMask) {
  int rv = syscall(SYS_futex,
                   addr, /* addr1 */
                   FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */
                   count, /* val */
                   nullptr, /* timeout */
                   nullptr, /* addr2 */
                   wakeMask); /* val3 */

  // A negative return would mean EINVAL/EFAULT (bad futex word); this is a
  // programming error, so it is only checked in debug builds.
  assert(rv >= 0);

  return rv;
}
+
/// Converts a std::chrono::time_point into the equivalent struct timespec
/// (whole seconds plus leftover nanoseconds since the clock's epoch),
/// suitable for use as the absolute timeout of FUTEX_WAIT_BITSET.
///
/// The members are assigned individually rather than aggregate-initialized:
/// brace initialization both depends on the declaration order of the
/// timespec members and performs narrowing (the chrono reps are int64_t,
/// while time_t / long may be narrower), which list-initialization rejects
/// on some platforms.
template <class Clock>
struct timespec
timeSpecFromTimePoint(std::chrono::time_point<Clock> absTime)
{
  auto duration = absTime.time_since_epoch();
  auto secs = std::chrono::duration_cast<std::chrono::seconds>(duration);
  auto nanos =
      std::chrono::duration_cast<std::chrono::nanoseconds>(duration - secs);

  struct timespec result;
  result.tv_sec = static_cast<decltype(result.tv_sec)>(secs.count());
  result.tv_nsec = static_cast<decltype(result.tv_nsec)>(nanos.count());
  return result;
}
+
// Blocks on the futex word at |addr| until a wake whose mask intersects
// |waitMask| arrives, the word's value differs from |expected|, a signal
// interrupts the wait, or the optional absolute timeout expires.  At most
// one of absSystemTime / absSteadyTime may be non-null.
FutexResult nativeFutexWaitImpl(void* addr,
                                uint32_t expected,
                                time_point<system_clock>* absSystemTime,
                                time_point<steady_clock>* absSteadyTime,
                                uint32_t waitMask) {
  assert(absSystemTime == nullptr || absSteadyTime == nullptr);

  int op = FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG;
  struct timespec ts;
  struct timespec* timeout = nullptr;

  if (absSystemTime != nullptr) {
    // system_clock corresponds to CLOCK_REALTIME; without this flag the
    // kernel would interpret the absolute timeout against CLOCK_MONOTONIC.
    op |= FUTEX_CLOCK_REALTIME;
    ts = timeSpecFromTimePoint(*absSystemTime);
    timeout = &ts;
  } else if (absSteadyTime != nullptr) {
    ts = timeSpecFromTimePoint(*absSteadyTime);
    timeout = &ts;
  }

  // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
  // value - http://locklessinc.com/articles/futex_cheat_sheet/
  int rv = syscall(SYS_futex,
                   addr, /* addr1 */
                   op, /* op */
                   expected, /* val */
                   timeout, /* timeout */
                   nullptr, /* addr2 */
                   waitMask); /* val3 */

  if (rv == 0) {
    return FutexResult::AWOKEN;
  } else {
    // Map the syscall's errno onto the portable FutexResult values.
    switch(errno) {
      case ETIMEDOUT:
        assert(timeout != nullptr);
        return FutexResult::TIMEDOUT;
      case EINTR:
        return FutexResult::INTERRUPTED;
      case EWOULDBLOCK:
        return FutexResult::VALUE_CHANGED;
      default:
        assert(false);
        // EACCES, EFAULT, or EINVAL. All of these mean *addr points to
        // invalid memory (or I misunderstand the API). We can either
        // crash, or return a value that lets the process continue for
        // a bit. We choose the latter. VALUE_CHANGED probably turns the
        // caller into a spin lock.
        return FutexResult::VALUE_CHANGED;
    }
  }
}
+
+#endif // __linux__
+
+///////////////////////////////////////////////////////
+// compatibility implementation using standard C++ API
+
/// One parked waiter in the emulated futex.  The node lives on the
/// waiter's stack and is linked into a bucket's intrusive waiters_ list;
/// it must be unlinked (by the waiter on timeout, or by a waker) before
/// it is destroyed, since the list does not own its elements.
struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> {
  void* const addr_;        // futex word this waiter is parked on
  const uint32_t waitMask_; // bitset matched against a waker's wakeMask
  bool hasTimeout_;         // true if the wait carries an absolute timeout
  bool signaled_;           // set by a waker when this node has been woken
  std::condition_variable cond_;

  EmulatedFutexWaitNode(void* addr, uint32_t waitMask, bool hasTimeout)
    : addr_(addr)
    , waitMask_(waitMask)
    , hasTimeout_(hasTimeout)
    , signaled_(false)
  {
  }
};
+
/// One shard of the emulated futex table: a mutex plus the intrusive list
/// of waiters whose futex address hashes into this bucket.  The bucket
/// array is allocated on first use (via call_once) and never freed.
struct EmulatedFutexBucket {
  std::mutex mutex_;
  boost::intrusive::list<EmulatedFutexWaitNode> waiters_;

  static const size_t kNumBuckets = 4096;
  static EmulatedFutexBucket* gBuckets;
  static std::once_flag gBucketInit;

  // Maps a futex address to its bucket.  twang_mix64 scrambles the pointer
  // bits so that nearby addresses spread evenly across the buckets.
  static EmulatedFutexBucket& bucketFor(void* addr) {
    std::call_once(gBucketInit, [](){
      gBuckets = new EmulatedFutexBucket[kNumBuckets];
    });
    uint64_t mixedBits = folly::hash::twang_mix64(
        reinterpret_cast<uintptr_t>(addr));
    return gBuckets[mixedBits % kNumBuckets];
  }
};
+
+EmulatedFutexBucket* EmulatedFutexBucket::gBuckets;
+std::once_flag EmulatedFutexBucket::gBucketInit;
+
+int emulatedFutexWake(void* addr, int count, uint32_t waitMask) {
+ auto& bucket = EmulatedFutexBucket::bucketFor(addr);
+
+ int numAwoken = 0;
+ boost::intrusive::list<EmulatedFutexWaitNode> deferredWakeups;
+
+ {
+ std::unique_lock<std::mutex> lock(bucket.mutex_);
+
+ for (auto iter = bucket.waiters_.begin();
+ numAwoken < count && iter != bucket.waiters_.end(); ) {
+ auto current = iter;
+ auto& node = *iter++;
+ if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
+ // We unlink, but waiter destroys the node. We must signal timed
+ // waiters under the lock, to avoid a race where we release the lock,
+ // the waiter times out and deletes the node, and then we try to
+ // signal it. This problem doesn't exist for unbounded waiters,
+ // so for them we optimize their wakeup by releasing the lock first.
+ bucket.waiters_.erase(current);
+ if (node.hasTimeout_) {
+ node.signaled_ = true;
+ node.cond_.notify_one();
+ } else {
+ deferredWakeups.push_back(node);
+ }
+ ++numAwoken;
+ }
+ }
+ }
+
+ while (!deferredWakeups.empty()) {
+ auto& node = deferredWakeups.front();
+ deferredWakeups.pop_front();
+ node.signaled_ = true;
+ node.cond_.notify_one();
+ }
+
+ return numAwoken;
+}
+
+FutexResult emulatedFutexWaitImpl(
+ void* addr,
+ uint32_t expected,
+ time_point<system_clock>* absSystemTime,
+ time_point<steady_clock>* absSteadyTime,
+ uint32_t waitMask) {
+ bool hasTimeout = absSystemTime != nullptr || absSteadyTime != nullptr;
+ EmulatedFutexWaitNode node(addr, waitMask, hasTimeout);
+
+ auto& bucket = EmulatedFutexBucket::bucketFor(addr);
+ std::unique_lock<std::mutex> lock(bucket.mutex_);
+
+ uint32_t actual;
+ memcpy(&actual, addr, sizeof(uint32_t));
+ if (actual != expected) {
+ return FutexResult::VALUE_CHANGED;
+ }
+
+ bucket.waiters_.push_back(node);
+ while (!node.signaled_) {
+ std::cv_status status = std::cv_status::no_timeout;
+ if (absSystemTime != nullptr) {
+ status = node.cond_.wait_until(lock, *absSystemTime);
+ } else if (absSteadyTime != nullptr) {
+ status = node.cond_.wait_until(lock, *absSteadyTime);
+ } else {
+ node.cond_.wait(lock);
+ }
+
+ if (status == std::cv_status::timeout) {
+ bucket.waiters_.erase(bucket.waiters_.iterator_to(node));
return FutexResult::TIMEDOUT;
- case EINTR:
- return FutexResult::INTERRUPTED;
- case EWOULDBLOCK:
- return FutexResult::VALUE_CHANGED;
- default:
- assert(false);
- /* Shouldn't reach here. Just return one of the FutexResults */
- return FutexResult::VALUE_CHANGED;
+ }
}
+ return FutexResult::AWOKEN;
+}
+
+} // anon namespace
+
+
+/////////////////////////////////
+// Futex<> specializations
+
// Futex<std::atomic>: use the real futex() syscall on Linux, falling back
// to the condition-variable emulation on other platforms.
template <>
int
Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
#ifdef __linux__
  return nativeFutexWake(this, count, wakeMask);
#else
  return emulatedFutexWake(this, count, wakeMask);
#endif
}
+
// Futex<EmulatedFutexAtomic>: always use the emulation, even on Linux, so
// the portable path can be tested and benchmarked everywhere.
template <>
int
Futex<EmulatedFutexAtomic>::futexWake(int count, uint32_t wakeMask) {
  return emulatedFutexWake(this, count, wakeMask);
}
+
// Futex<std::atomic>: wait via the futex() syscall on Linux, otherwise via
// the condition-variable emulation.
template <>
FutexResult
Futex<std::atomic>::futexWaitImpl(uint32_t expected,
                                  time_point<system_clock>* absSystemTime,
                                  time_point<steady_clock>* absSteadyTime,
                                  uint32_t waitMask) {
#ifdef __linux__
  return nativeFutexWaitImpl(
      this, expected, absSystemTime, absSteadyTime, waitMask);
#else
  return emulatedFutexWaitImpl(
      this, expected, absSystemTime, absSteadyTime, waitMask);
#endif
}
+
// Futex<EmulatedFutexAtomic>: always wait via the emulation.
template <>
FutexResult
Futex<EmulatedFutexAtomic>::futexWaitImpl(
        uint32_t expected,
        time_point<system_clock>* absSystemTime,
        time_point<steady_clock>* absSteadyTime,
        uint32_t waitMask) {
  return emulatedFutexWaitImpl(
      this, expected, absSystemTime, absSteadyTime, waitMask);
}
-}}
+}} // namespace folly::detail
#include <chrono>
#include <limits>
#include <assert.h>
-#include <errno.h>
-#include <linux/futex.h>
-#include <sys/syscall.h>
#include <unistd.h>
#include <boost/noncopyable.hpp>
-using std::chrono::steady_clock;
-using std::chrono::system_clock;
-using std::chrono::time_point;
-
namespace folly { namespace detail {
enum class FutexResult {
TIMEDOUT
};
-/* Converts return value and errno from a futex syscall to a FutexResult */
-FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno);
-
/**
* Futex is an atomic 32 bit unsigned integer that provides access to the
* futex() syscall on that value. It is templated in such a way that it
/** Puts the thread to sleep if this->load() == expected. Returns true when
* it is returning because it has consumed a wake() event, false for any
* other return (signal, this->load() != expected, or spurious wakeup). */
- bool futexWait(uint32_t expected, uint32_t waitMask = -1);
  bool futexWait(uint32_t expected, uint32_t waitMask = -1) {
    // No timeout is passed, so TIMEDOUT is impossible; any non-AWOKEN
    // outcome (VALUE_CHANGED, INTERRUPTED) maps to false.
    auto rv = futexWaitImpl(expected, nullptr, nullptr, waitMask);
    assert(rv != FutexResult::TIMEDOUT);
    return rv == FutexResult::AWOKEN;
  }
/** Similar to futexWait but also accepts a timeout that gives the time until
* when the call can block (time is the absolute time i.e time since epoch).
* NOTE: On some systems steady_clock is just an alias for system_clock,
* and is not actually steady.*/
template <class Clock, class Duration = typename Clock::duration>
- FutexResult futexWaitUntil(uint32_t expected,
- const time_point<Clock, Duration>& absTime,
- uint32_t waitMask = -1);
  FutexResult futexWaitUntil(
      uint32_t expected,
      const std::chrono::time_point<Clock, Duration>& absTime,
      uint32_t waitMask = -1) {
    using std::chrono::duration_cast;
    using std::chrono::nanoseconds;
    using std::chrono::seconds;
    using std::chrono::steady_clock;
    using std::chrono::system_clock;
    using std::chrono::time_point;

    static_assert(
        (std::is_same<Clock, system_clock>::value ||
         std::is_same<Clock, steady_clock>::value),
        "futexWaitUntil only knows std::chrono::{system_clock,steady_clock}");
    assert((std::is_same<Clock, system_clock>::value) || Clock::is_steady);

    // Normalize the timeout into one of two pointer parameters so that
    // futexWaitImpl need not be templated on the clock type.  This stays
    // correct even on platforms where steady_clock is just an alias of
    // system_clock; the is_same test is a compile-time constant, so the
    // dead branch is trivially eliminated.
    auto duration = absTime.time_since_epoch();
    if (std::is_same<Clock, system_clock>::value) {
      time_point<system_clock> absSystemTime(duration);
      return futexWaitImpl(expected, &absSystemTime, nullptr, waitMask);
    } else {
      time_point<steady_clock> absSteadyTime(duration);
      return futexWaitImpl(expected, nullptr, &absSteadyTime, waitMask);
    }
  }
/** Wakens up to count waiters where (waitMask & wakeMask) != 0,
* returning the number of awoken threads. */
int futexWake(int count = std::numeric_limits<int>::max(),
uint32_t wakeMask = -1);
- private:
-
- /** Futex wait implemented via syscall SYS_futex. absTimeout gives
- * time till when the wait can block. If it is nullptr the call will
- * block until a matching futex wake is received. extraOpFlags can be
- * used to specify addtional flags to add to the futex operation (by
- * default only FUTEX_WAIT_BITSET and FUTEX_PRIVATE_FLAG are included).
- * Returns 0 on success or -1 on error, with errno set to one of the
- * values listed in futex(2). */
- int futexWaitImpl(uint32_t expected,
- const struct timespec* absTimeout,
- int extraOpFlags,
- uint32_t waitMask);
+ private:
+
+ /** Underlying implementation of futexWait and futexWaitUntil.
+ * At most one of absSystemTime and absSteadyTime should be non-null.
+ * Timeouts are separated into separate parameters to allow the
+ * implementations to be elsewhere without templating on the clock
+ * type, which is otherwise complicated by the fact that steady_clock
+ * is the same as system_clock on some platforms. */
+ FutexResult futexWaitImpl(
+ uint32_t expected,
+ std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+ std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+ uint32_t waitMask);
};
-template <>
-inline int
-Futex<std::atomic>::futexWaitImpl(uint32_t expected,
- const struct timespec* absTimeout,
- int extraOpFlags,
- uint32_t waitMask) {
- assert(sizeof(*this) == sizeof(int));
-
- /* Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
- * value - http://locklessinc.com/articles/futex_cheat_sheet/ */
- int rv = syscall(
- SYS_futex,
- this, /* addr1 */
- FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | extraOpFlags, /* op */
- expected, /* val */
- absTimeout, /* timeout */
- nullptr, /* addr2 */
- waitMask); /* val3 */
-
- assert(rv == 0 ||
- errno == EWOULDBLOCK ||
- errno == EINTR ||
- (absTimeout != nullptr && errno == ETIMEDOUT));
-
- return rv;
-}
-
-template <>
-inline bool Futex<std::atomic>::futexWait(uint32_t expected,
- uint32_t waitMask) {
- return futexWaitImpl(expected, nullptr, 0 /* extraOpFlags */, waitMask) == 0;
-}
-
-template <>
-inline int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
- assert(sizeof(*this) == sizeof(int));
- int rv = syscall(SYS_futex,
- this, /* addr1 */
- FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */
- count, /* val */
- nullptr, /* timeout */
- nullptr, /* addr2 */
- wakeMask); /* val3 */
- assert(rv >= 0);
- return rv;
-}
-
-/* Convert std::chrono::time_point to struct timespec */
-template <class Clock, class Duration = typename Clock::Duration>
-struct timespec timePointToTimeSpec(const time_point<Clock, Duration>& tp) {
- using std::chrono::nanoseconds;
- using std::chrono::seconds;
- using std::chrono::duration_cast;
-
- struct timespec ts;
- auto duration = tp.time_since_epoch();
- auto secs = duration_cast<seconds>(duration);
- auto nanos = duration_cast<nanoseconds>(duration - secs);
- ts.tv_sec = secs.count();
- ts.tv_nsec = nanos.count();
- return ts;
-}
-
-template <template<typename> class Atom> template<class Clock, class Duration>
-inline FutexResult
-Futex<Atom>::futexWaitUntil(
- uint32_t expected,
- const time_point<Clock, Duration>& absTime,
- uint32_t waitMask) {
-
- static_assert(std::is_same<Clock,system_clock>::value ||
- std::is_same<Clock,steady_clock>::value,
- "Only std::system_clock or std::steady_clock supported");
-
- struct timespec absTimeSpec = timePointToTimeSpec(absTime);
- int extraOpFlags = 0;
-
- /* We must use FUTEX_CLOCK_REALTIME flag if we are getting the time_point
- * from the system clock (CLOCK_REALTIME). This check also works correctly for
- * broken glibc in which steady_clock is a typedef to system_clock.*/
- if (std::is_same<Clock,system_clock>::value) {
- extraOpFlags = FUTEX_CLOCK_REALTIME;
- } else {
- assert(Clock::is_steady);
- }
/** A std::atomic subclass that can be used to force Futex to emulate
 * the underlying futex() syscall. This is primarily useful to test or
 * benchmark the emulated implementation on systems that don't need it. */
template <typename T>
struct EmulatedFutexAtomic : public std::atomic<T> {
  EmulatedFutexAtomic() noexcept = default;
  constexpr /* implicit */ EmulatedFutexAtomic(T init) noexcept
    : std::atomic<T>(init) {}
  // std::atomic is already non-copyable; deleting the copy constructor
  // here makes that explicit for readers of this type.
  EmulatedFutexAtomic(const EmulatedFutexAtomic& rhs) = delete;
};
+
+/* Available specializations, with definitions elsewhere */
+
+template<>
+int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask);
+
+template<>
+FutexResult Futex<std::atomic>::futexWaitImpl(
+ uint32_t expected,
+ std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+ std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+ uint32_t waitMask);
+
+template<>
+int Futex<EmulatedFutexAtomic>::futexWake(int count, uint32_t wakeMask);
- const int rv = futexWaitImpl(expected, &absTimeSpec, extraOpFlags, waitMask);
- return futexErrnoToFutexResult(rv, errno);
-}
+template<>
+FutexResult Futex<EmulatedFutexAtomic>::futexWaitImpl(
+ uint32_t expected,
+ std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+ std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+ uint32_t waitMask);
}}
using namespace folly;
using namespace folly::test;
+using folly::detail::EmulatedFutexAtomic;
typedef DeterministicSchedule DSched;
run_pingpong_test<std::atomic>(iters);
}
// Same ping-pong workload as above, but with the futex syscall emulation
// forced on, for direct comparison against the native-futex numbers.
BENCHMARK(baton_pingpong_emulated_futex, iters) {
  run_pingpong_test<EmulatedFutexAtomic>(iters);
}
+
BENCHMARK(posix_sem_pingpong, iters) {
sem_t sems[3];
sem_t* a = sems + 0;
thr.join();
}
// Posts before waiting: verifies that a timed_wait issued after the post
// (early delivery) succeeds immediately for the given Atom/Clock pair.
template <template<typename> class Atom, typename Clock>
void run_basic_timed_wait_tests() {
  Baton<Atom> b;
  b.post();
  // tests if early delivery works fine
  EXPECT_TRUE(b.timed_wait(Clock::now()));
}
// Nothing ever posts the baton, so the 1ms timed_wait must report a
// timeout (rv == false) for the given Atom/Clock pair.
template <template<typename> class Atom, typename Clock>
void run_timed_wait_tmo_tests() {
  Baton<Atom> b;
  auto thr = DSched::thread([&]{
    bool rv = b.timed_wait(Clock::now() + std::chrono::milliseconds(1));
    // main thread is guaranteed to not post until timeout occurs
    EXPECT_FALSE(rv);
  });
  DSched::join(thr);
}
-template <template<typename> class Atom>
+template <template<typename> class Atom, typename Clock>
void run_timed_wait_regular_test() {
Baton<Atom> b;
auto thr = DSched::thread([&] {
- bool rv = b.timed_wait(
- std::chrono::time_point<std::chrono::system_clock>::max());
- if (std::is_same<Atom<int>, std::atomic<int>>::value) {
- // We can only ensure this for std::atomic
+ // To wait forever we'd like to use time_point<Clock>::max, but
+ // std::condition_variable does math to convert the timeout to
+ // system_clock without handling overflow.
+ auto farFuture = Clock::now() + std::chrono::hours(1000);
+ bool rv = b.timed_wait(farFuture);
+ if (!std::is_same<Atom<int>, DeterministicAtomic<int>>::value) {
+ // DeterministicAtomic ignores actual times, so doesn't guarantee
+ // a lack of timeout
EXPECT_TRUE(rv);
}
});
- if (std::is_same<Atom<int>, std::atomic<int>>::value) {
- // If we are using std::atomic, then a sleep here guarantees to a large
- // extent that 'thr' will execute wait before we post it, thus testing
- // late delivery. For DeterministicAtomic, we just rely on
- // DeterministicSchedule to do the scheduling
+ if (!std::is_same<Atom<int>, DeterministicAtomic<int>>::value) {
+ // If we are using std::atomic (or EmulatedFutexAtomic) then
+ // a sleep here guarantees to a large extent that 'thr' will
+ // execute wait before we post it, thus testing late delivery. For
+ // DeterministicAtomic, we just rely on DeterministicSchedule to do
+ // the scheduling. The test won't fail if we lose the race, we just
+ // don't get coverage.
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
DSched::join(thr);
}
-TEST(Baton, timed_wait_basic) {
- run_basic_timed_wait_tests<std::atomic>();
- run_basic_timed_wait_tests<DeterministicAtomic>();
+TEST(Baton, timed_wait_basic_system_clock) {
+ run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock>();
+ run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
+ run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::system_clock>();
+}
+
+TEST(Baton, timed_wait_timeout_system_clock) {
+ run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock>();
+ run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
+ run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::system_clock>();
+}
+
+TEST(Baton, timed_wait_system_clock) {
+ run_timed_wait_regular_test<std::atomic, std::chrono::system_clock>();
+ run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::system_clock>();
+ run_timed_wait_regular_test<DeterministicAtomic, std::chrono::system_clock>();
+}
+
+TEST(Baton, timed_wait_basic_steady_clock) {
+ run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock>();
+ run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
+ run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::steady_clock>();
}
-TEST(Baton, timed_wait_timeout) {
- run_timed_wait_tmo_tests<std::atomic>();
- run_timed_wait_tmo_tests<DeterministicAtomic>();
+TEST(Baton, timed_wait_timeout_steady_clock) {
+ run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock>();
+ run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
+ run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::steady_clock>();
}
-TEST(Baton, timed_wait) {
- run_timed_wait_regular_test<std::atomic>();
- run_timed_wait_regular_test<DeterministicAtomic>();
+TEST(Baton, timed_wait_steady_clock) {
+ run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock>();
+ run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::steady_clock>();
+ run_timed_wait_regular_test<DeterministicAtomic, std::chrono::steady_clock>();
}
template <template<typename> class Atom>
TEST(Baton, try_wait) {
run_try_wait_tests<std::atomic>();
+ run_try_wait_tests<EmulatedFutexAtomic>();
run_try_wait_tests<DeterministicAtomic>();
}
namespace folly { namespace detail {
using namespace test;
-
-template<>
-bool Futex<DeterministicAtomic>::futexWait(uint32_t expected,
- uint32_t waitMask) {
- bool rv;
- DeterministicSchedule::beforeSharedAccess();
- futexLock.lock();
- if (data != expected) {
- rv = false;
- } else {
- auto& queue = futexQueues[this];
- bool done = false;
- queue.push_back(std::make_pair(waitMask, &done));
- while (!done) {
- futexLock.unlock();
- DeterministicSchedule::afterSharedAccess();
- DeterministicSchedule::beforeSharedAccess();
- futexLock.lock();
- }
- rv = true;
- }
- futexLock.unlock();
- DeterministicSchedule::afterSharedAccess();
- return rv;
-}
-
-FutexResult futexWaitUntilImpl(Futex<DeterministicAtomic>* futex,
- uint32_t expected, uint32_t waitMask) {
- if (futex == nullptr) {
- return FutexResult::VALUE_CHANGED;
- }
-
- bool rv = false;
+using namespace std::chrono;
+
+template <>
+FutexResult
+Futex<DeterministicAtomic>::futexWaitImpl(
+ uint32_t expected,
+ time_point<system_clock>* absSystemTimeout,
+ time_point<steady_clock>* absSteadyTimeout,
+ uint32_t waitMask) {
+ bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
+ bool awoken = false;
+ FutexResult result = FutexResult::AWOKEN;
int futexErrno = 0;
DeterministicSchedule::beforeSharedAccess();
futexLock.lock();
- if (futex->data == expected) {
- auto& queue = futexQueues[futex];
- queue.push_back(std::make_pair(waitMask, &rv));
+ if (data == expected) {
+ auto& queue = futexQueues[this];
+ queue.push_back(std::make_pair(waitMask, &awoken));
auto ours = queue.end();
ours--;
- while (!rv) {
+ while (!awoken) {
futexLock.unlock();
DeterministicSchedule::afterSharedAccess();
DeterministicSchedule::beforeSharedAccess();
// Simulate spurious wake-ups, timeouts each time with
// a 10% probability if we haven't been woken up already
- if (!rv && DeterministicSchedule::getRandNumber(100) < 10) {
- assert(futexQueues.count(futex) != 0 &&
- &futexQueues[futex] == &queue);
+ if (!awoken && hasTimeout &&
+ DeterministicSchedule::getRandNumber(100) < 10) {
+ assert(futexQueues.count(this) != 0 &&
+ &futexQueues[this] == &queue);
queue.erase(ours);
if (queue.empty()) {
- futexQueues.erase(futex);
+ futexQueues.erase(this);
}
- rv = false;
// Simulate ETIMEDOUT 90% of the time and other failures
// remaining time
- futexErrno =
- DeterministicSchedule::getRandNumber(100) >= 10 ? ETIMEDOUT : EINTR;
+ result =
+ DeterministicSchedule::getRandNumber(100) >= 10
+ ? FutexResult::TIMEDOUT : FutexResult::INTERRUPTED;
break;
}
}
} else {
- futexErrno = EWOULDBLOCK;
+ result = FutexResult::VALUE_CHANGED;
}
futexLock.unlock();
DeterministicSchedule::afterSharedAccess();
- return futexErrnoToFutexResult(rv ? 0 : -1, futexErrno);
+ return result;
}
template<>
-int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
+int
+Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
int rv = 0;
DeterministicSchedule::beforeSharedAccess();
futexLock.lock();
}
};
-}}
+}} // namespace folly::test
+
+/* Specialization declarations */
namespace folly { namespace detail {
template<>
-bool Futex<test::DeterministicAtomic>::futexWait(uint32_t expected,
- uint32_t waitMask);
-
-/// This function ignores the time bound, and instead pseudo-randomly chooses
-/// whether the timeout was reached. To do otherwise would not be deterministic.
-FutexResult futexWaitUntilImpl(Futex<test::DeterministicAtomic> *futex,
- uint32_t expected, uint32_t waitMask);
-
-template<> template<class Clock, class Duration>
-FutexResult
-Futex<test::DeterministicAtomic>::futexWaitUntil(
- uint32_t expected,
- const time_point<Clock, Duration>& absTimeUnused,
- uint32_t waitMask) {
- return futexWaitUntilImpl(this, expected, waitMask);
-}
+int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
template<>
-int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
+FutexResult Futex<test::DeterministicAtomic>::futexWaitImpl(
+ uint32_t expected,
+ std::chrono::time_point<std::chrono::system_clock>* absSystemTime,
+ std::chrono::time_point<std::chrono::steady_clock>* absSteadyTime,
+ uint32_t waitMask);
+
+template<>
+Getcpu::Func
+AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(size_t numStripes);
-}}
+}} // namespace folly::detail
BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32)
BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000)
-// sudo nice -n -20 tao/queues/LifoSemTests --benchmark --bm_min_iters=10000000
+// sudo nice -n -20 folly/test/LifoSemTests --benchmark --bm_min_iters=10000000
// ============================================================================
-// tao/queues/LifoSemTests.cpp relative time/iter iters/s
+// folly/test/LifoSemTests.cpp relative time/iter iters/s
// ============================================================================
-// lifo_sem_pingpong 1.91us 522.92K
-// lifo_sem_oneway 211.18ns 4.74M
-// single_thread_lifo_post 19.71ns 50.75M
-// single_thread_lifo_wait 18.84ns 53.09M
-// single_thread_lifo_postwait 39.41ns 25.37M
-// single_thread_lifo_trywait 912.10ps 1.10G
-// single_thread_posix_postwait 32.93ns 30.37M
-// single_thread_posix_trywait 10.06ns 99.36M
+// lifo_sem_pingpong 396.84ns 2.52M
+// lifo_sem_oneway 88.52ns 11.30M
+// single_thread_lifo_post 14.78ns 67.67M
+// single_thread_lifo_wait 13.53ns 73.90M
+// single_thread_lifo_postwait 28.91ns 34.59M
+// single_thread_lifo_trywait 670.13ps 1.49G
+// single_thread_posix_postwait 24.12ns 41.46M
+// single_thread_posix_trywait 6.76ns 147.88M
// ----------------------------------------------------------------------------
-// contendedUse(1_to_1) 208.21ns 4.80M
-// contendedUse(1_to_32) 532.41ns 1.88M
-// contendedUse(32_to_1) 153.74ns 6.50M
-// contendedUse(16_to_16) 301.86ns 3.31M
-// contendedUse(32_to_32) 268.32ns 3.73M
-// contendedUse(32_to_1000) 966.27ns 1.03M
+// contendedUse(1_to_1) 143.60ns 6.96M
+// contendedUse(1_to_32) 244.06ns 4.10M
+// contendedUse(32_to_1) 131.99ns 7.58M
+// contendedUse(16_to_16) 210.64ns 4.75M
+// contendedUse(32_to_32) 222.91ns 4.49M
+// contendedUse(32_to_1000) 453.39ns 2.21M
// ============================================================================
int main(int argc, char ** argv) {
run_mt_sequencer_test<std::atomic>(100, 10000, -100);
}
// Runs the multi-threaded sequencer test with the futex emulation forced
// on, mirroring the std::atomic variant above.
TEST(MPMCQueue, sequencer_emulated_futex) {
  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
}
+
TEST(MPMCQueue, sequencer_deterministic) {
DSched sched(DSched::uniform(0));
run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
}
}
// Runs the multi-threaded try-enqueue/dequeue test with the futex
// emulation forced on, at low, moderate, and high thread counts.
TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
  }
}
+
TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
int nts[] = { 3, 10 };
LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
}
// Producer/consumer throughput with the futex emulation forced on, across
// small and large queue capacities and various producer/consumer counts.
TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
  int n = 100000;
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
  LOG(INFO)
    << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
}
+
template <template<typename> class Atom>
void runNeverFailThread(
int numThreads,
}
}
// Runs the blocking (never-fail) enqueue/dequeue test with the futex
// emulation forced on, logging per-operation latency at each thread count.
TEST(MPMCQueue, mt_never_fail_emulated_futex) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
              << nt << " threads";
  }
}
+
TEST(MPMCQueue, mt_never_fail_deterministic) {
int nts[] = { 3, 10 };