From: Nathan Bronson Date: Wed, 1 Oct 2014 22:46:20 +0000 (-0700) Subject: add emulation of futex() for non-Linux systems X-Git-Tag: v0.22.0~296 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=d3ee675ff41cd36587ba8de41568050baa08eb3e;p=folly.git add emulation of futex() for non-Linux systems Summary: Inside the Linux kernel, futex() works by hashing the source address to a fixed set of wakeup lists. When using folly on non-Linux systems we can emulate something similar by using std::mutex and std::condition_variable. Emulating futex() using higher-level APIs is less crazy than it sounds, because the emulated futex still provides the advantages of the real one: it allows all of the fast-paths of a new synchronization construct to be inlined; it is space efficient, taking only 1 word and allowing the caller to encode state into all of the word's bits; and it avoids system calls unless a thread actually needs to be put to sleep or woken up. Think of this as a way of bootstrapping something with the same properties as futex() on platforms that don't expose it publicly. (Presumably these platforms have private APIs that do something similar.) This diff moves all of the Linux-specific futex stuff into Futex.cpp, where it is gated by #ifdef __linux__. It also adds an emulated implementation. The emulated futex will be selected by default on non-Linux platforms, or it can be used on Linux by using an Atom template type of folly::detail::EmulatedFutexAtomic. This means, for example, that you can test MPMCQueue on top of the emulated API by instantiating a MPMCQueue<T, EmulatedFutexAtomic>. As a bonus, this refactoring provides a small speed boost by removing an unnecessary evaluation of the errno macro in the success path of futexWait. Test Plan: 1. existing unit tests 2. new unit tests (including tests of Futex users) 3. 
compile Futex.cpp on OS X (some other build failures still occur) Reviewed By: delong.j@fb.com Subscribers: trunkagent, njormrod, yiding, boseant, mssarang FB internal diff: D1596118 Tasks: 4952724 --- diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index 34ed779a..d1e3b32d 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -20,11 +20,8 @@ #include #include #include -#include #include -#include #include -#include #include #include @@ -661,7 +658,7 @@ struct TurnSequencer { if (prevThresh == 0) { // bootstrap - spinCutoff = target; + spinCutoff.store(target); } else { // try once, keep moving if CAS fails. Exponential moving average // with alpha of 7/8 diff --git a/folly/detail/CacheLocality.h b/folly/detail/CacheLocality.h index efb4c298..107cf757 100644 --- a/folly/detail/CacheLocality.h +++ b/folly/detail/CacheLocality.h @@ -322,6 +322,10 @@ struct AccessSpreader { static Getcpu::Func pickGetcpuFunc(size_t numStripes); }; +template<> +Getcpu::Func AccessSpreader::pickGetcpuFunc(size_t); + + /// An array of kMaxCpus+1 AccessSpreader instances constructed /// with default params, with the zero-th element having 1 stripe template class Atom, size_t kMaxStripe> diff --git a/folly/detail/Futex.cpp b/folly/detail/Futex.cpp index 9baafd2a..489cec39 100644 --- a/folly/detail/Futex.cpp +++ b/folly/detail/Futex.cpp @@ -15,26 +15,275 @@ */ #include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __linux__ +# include +# include +# include +#endif + +using namespace std::chrono; namespace folly { namespace detail { -/* see Futex.h */ -FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno) { - if (returnVal == 0) { +namespace { + +//////////////////////////////////////////////////// +// native implementation using the futex() syscall + +#ifdef __linux__ + +int nativeFutexWake(void* addr, int count, uint32_t wakeMask) { + int rv = syscall(SYS_futex, + addr, /* addr1 */ + FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */ + 
count, /* val */ + nullptr, /* timeout */ + nullptr, /* addr2 */ + wakeMask); /* val3 */ + + assert(rv >= 0); + + return rv; +} + +template +struct timespec +timeSpecFromTimePoint(time_point absTime) +{ + auto duration = absTime.time_since_epoch(); + auto secs = duration_cast(duration); + auto nanos = duration_cast(duration - secs); + struct timespec result = { secs.count(), nanos.count() }; + return result; +} + +FutexResult nativeFutexWaitImpl(void* addr, + uint32_t expected, + time_point* absSystemTime, + time_point* absSteadyTime, + uint32_t waitMask) { + assert(absSystemTime == nullptr || absSteadyTime == nullptr); + + int op = FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG; + struct timespec ts; + struct timespec* timeout = nullptr; + + if (absSystemTime != nullptr) { + op |= FUTEX_CLOCK_REALTIME; + ts = timeSpecFromTimePoint(*absSystemTime); + timeout = &ts; + } else if (absSteadyTime != nullptr) { + ts = timeSpecFromTimePoint(*absSteadyTime); + timeout = &ts; + } + + // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout + // value - http://locklessinc.com/articles/futex_cheat_sheet/ + int rv = syscall(SYS_futex, + addr, /* addr1 */ + op, /* op */ + expected, /* val */ + timeout, /* timeout */ + nullptr, /* addr2 */ + waitMask); /* val3 */ + + if (rv == 0) { return FutexResult::AWOKEN; + } else { + switch(errno) { + case ETIMEDOUT: + assert(timeout != nullptr); + return FutexResult::TIMEDOUT; + case EINTR: + return FutexResult::INTERRUPTED; + case EWOULDBLOCK: + return FutexResult::VALUE_CHANGED; + default: + assert(false); + // EACCESS, EFAULT, or EINVAL. All of these mean *addr point to + // invalid memory (or I misunderstand the API). We can either + // crash, or return a value that lets the process continue for + // a bit. We choose the latter. VALUE_CHANGED probably turns the + // caller into a spin lock. 
+ return FutexResult::VALUE_CHANGED; + } } - switch(futexErrno) { - case ETIMEDOUT: +} + +#endif // __linux__ + +/////////////////////////////////////////////////////// +// compatibility implementation using standard C++ API + +struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> { + void* const addr_; + const uint32_t waitMask_; + bool hasTimeout_; + bool signaled_; + std::condition_variable cond_; + + EmulatedFutexWaitNode(void* addr, uint32_t waitMask, bool hasTimeout) + : addr_(addr) + , waitMask_(waitMask) + , hasTimeout_(hasTimeout) + , signaled_(false) + { + } +}; + +struct EmulatedFutexBucket { + std::mutex mutex_; + boost::intrusive::list waiters_; + + static const size_t kNumBuckets = 4096; + static EmulatedFutexBucket* gBuckets; + static std::once_flag gBucketInit; + + static EmulatedFutexBucket& bucketFor(void* addr) { + std::call_once(gBucketInit, [](){ + gBuckets = new EmulatedFutexBucket[kNumBuckets]; + }); + uint64_t mixedBits = folly::hash::twang_mix64( + reinterpret_cast(addr)); + return gBuckets[mixedBits % kNumBuckets]; + } +}; + +EmulatedFutexBucket* EmulatedFutexBucket::gBuckets; +std::once_flag EmulatedFutexBucket::gBucketInit; + +int emulatedFutexWake(void* addr, int count, uint32_t waitMask) { + auto& bucket = EmulatedFutexBucket::bucketFor(addr); + + int numAwoken = 0; + boost::intrusive::list deferredWakeups; + + { + std::unique_lock lock(bucket.mutex_); + + for (auto iter = bucket.waiters_.begin(); + numAwoken < count && iter != bucket.waiters_.end(); ) { + auto current = iter; + auto& node = *iter++; + if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) { + // We unlink, but waiter destroys the node. We must signal timed + // waiters under the lock, to avoid a race where we release the lock, + // the waiter times out and deletes the node, and then we try to + // signal it. This problem doesn't exist for unbounded waiters, + // so for them we optimize their wakeup by releasing the lock first. 
+ bucket.waiters_.erase(current); + if (node.hasTimeout_) { + node.signaled_ = true; + node.cond_.notify_one(); + } else { + deferredWakeups.push_back(node); + } + ++numAwoken; + } + } + } + + while (!deferredWakeups.empty()) { + auto& node = deferredWakeups.front(); + deferredWakeups.pop_front(); + node.signaled_ = true; + node.cond_.notify_one(); + } + + return numAwoken; +} + +FutexResult emulatedFutexWaitImpl( + void* addr, + uint32_t expected, + time_point* absSystemTime, + time_point* absSteadyTime, + uint32_t waitMask) { + bool hasTimeout = absSystemTime != nullptr || absSteadyTime != nullptr; + EmulatedFutexWaitNode node(addr, waitMask, hasTimeout); + + auto& bucket = EmulatedFutexBucket::bucketFor(addr); + std::unique_lock lock(bucket.mutex_); + + uint32_t actual; + memcpy(&actual, addr, sizeof(uint32_t)); + if (actual != expected) { + return FutexResult::VALUE_CHANGED; + } + + bucket.waiters_.push_back(node); + while (!node.signaled_) { + std::cv_status status = std::cv_status::no_timeout; + if (absSystemTime != nullptr) { + status = node.cond_.wait_until(lock, *absSystemTime); + } else if (absSteadyTime != nullptr) { + status = node.cond_.wait_until(lock, *absSteadyTime); + } else { + node.cond_.wait(lock); + } + + if (status == std::cv_status::timeout) { + bucket.waiters_.erase(bucket.waiters_.iterator_to(node)); return FutexResult::TIMEDOUT; - case EINTR: - return FutexResult::INTERRUPTED; - case EWOULDBLOCK: - return FutexResult::VALUE_CHANGED; - default: - assert(false); - /* Shouldn't reach here. 
Just return one of the FutexResults */ - return FutexResult::VALUE_CHANGED; + } } + return FutexResult::AWOKEN; +} + +} // anon namespace + + +///////////////////////////////// +// Futex<> specializations + +template <> +int +Futex::futexWake(int count, uint32_t wakeMask) { +#ifdef __linux__ + return nativeFutexWake(this, count, wakeMask); +#else + return emulatedFutexWake(this, count, wakeMask); +#endif +} + +template <> +int +Futex::futexWake(int count, uint32_t wakeMask) { + return emulatedFutexWake(this, count, wakeMask); +} + +template <> +FutexResult +Futex::futexWaitImpl(uint32_t expected, + time_point* absSystemTime, + time_point* absSteadyTime, + uint32_t waitMask) { +#ifdef __linux__ + return nativeFutexWaitImpl( + this, expected, absSystemTime, absSteadyTime, waitMask); +#else + return emulatedFutexWaitImpl( + this, expected, absSystemTime, absSteadyTime, waitMask); +#endif +} + +template <> +FutexResult +Futex::futexWaitImpl( + uint32_t expected, + time_point* absSystemTime, + time_point* absSteadyTime, + uint32_t waitMask) { + return emulatedFutexWaitImpl( + this, expected, absSystemTime, absSteadyTime, waitMask); } -}} +}} // namespace folly::detail diff --git a/folly/detail/Futex.h b/folly/detail/Futex.h index c47bc9a5..fe61d0e6 100644 --- a/folly/detail/Futex.h +++ b/folly/detail/Futex.h @@ -20,16 +20,9 @@ #include #include #include -#include -#include -#include #include #include -using std::chrono::steady_clock; -using std::chrono::system_clock; -using std::chrono::time_point; - namespace folly { namespace detail { enum class FutexResult { @@ -39,9 +32,6 @@ enum class FutexResult { TIMEDOUT }; -/* Converts return value and errno from a futex syscall to a FutexResult */ -FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno); - /** * Futex is an atomic 32 bit unsigned integer that provides access to the * futex() syscall on that value. 
It is templated in such a way that it @@ -59,7 +49,11 @@ struct Futex : Atom, boost::noncopyable { /** Puts the thread to sleep if this->load() == expected. Returns true when * it is returning because it has consumed a wake() event, false for any * other return (signal, this->load() != expected, or spurious wakeup). */ - bool futexWait(uint32_t expected, uint32_t waitMask = -1); + bool futexWait(uint32_t expected, uint32_t waitMask = -1) { + auto rv = futexWaitImpl(expected, nullptr, nullptr, waitMask); + assert(rv != FutexResult::TIMEDOUT); + return rv == FutexResult::AWOKEN; + } /** Similar to futexWait but also accepts a timeout that gives the time until * when the call can block (time is the absolute time i.e time since epoch). @@ -69,118 +63,84 @@ struct Futex : Atom, boost::noncopyable { * NOTE: On some systems steady_clock is just an alias for system_clock, * and is not actually steady.*/ template - FutexResult futexWaitUntil(uint32_t expected, - const time_point& absTime, - uint32_t waitMask = -1); + FutexResult futexWaitUntil( + uint32_t expected, + const std::chrono::time_point& absTime, + uint32_t waitMask = -1) { + using std::chrono::duration_cast; + using std::chrono::nanoseconds; + using std::chrono::seconds; + using std::chrono::steady_clock; + using std::chrono::system_clock; + using std::chrono::time_point; + + static_assert( + (std::is_same::value || + std::is_same::value), + "futexWaitUntil only knows std::chrono::{system_clock,steady_clock}"); + assert((std::is_same::value) || Clock::is_steady); + + auto duration = absTime.time_since_epoch(); + if (std::is_same::value) { + time_point absSystemTime(duration); + return futexWaitImpl(expected, &absSystemTime, nullptr, waitMask); + } else { + time_point absSteadyTime(duration); + return futexWaitImpl(expected, nullptr, &absSteadyTime, waitMask); + } + } /** Wakens up to count waiters where (waitMask & wakeMask) != 0, * returning the number of awoken threads. 
*/ int futexWake(int count = std::numeric_limits::max(), uint32_t wakeMask = -1); - private: - - /** Futex wait implemented via syscall SYS_futex. absTimeout gives - * time till when the wait can block. If it is nullptr the call will - * block until a matching futex wake is received. extraOpFlags can be - * used to specify addtional flags to add to the futex operation (by - * default only FUTEX_WAIT_BITSET and FUTEX_PRIVATE_FLAG are included). - * Returns 0 on success or -1 on error, with errno set to one of the - * values listed in futex(2). */ - int futexWaitImpl(uint32_t expected, - const struct timespec* absTimeout, - int extraOpFlags, - uint32_t waitMask); + private: + + /** Underlying implementation of futexWait and futexWaitUntil. + * At most one of absSystemTime and absSteadyTime should be non-null. + * Timeouts are separated into separate parameters to allow the + * implementations to be elsewhere without templating on the clock + * type, which is otherwise complicated by the fact that steady_clock + * is the same as system_clock on some platforms. 
*/ + FutexResult futexWaitImpl( + uint32_t expected, + std::chrono::time_point* absSystemTime, + std::chrono::time_point* absSteadyTime, + uint32_t waitMask); }; -template <> -inline int -Futex::futexWaitImpl(uint32_t expected, - const struct timespec* absTimeout, - int extraOpFlags, - uint32_t waitMask) { - assert(sizeof(*this) == sizeof(int)); - - /* Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout - * value - http://locklessinc.com/articles/futex_cheat_sheet/ */ - int rv = syscall( - SYS_futex, - this, /* addr1 */ - FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | extraOpFlags, /* op */ - expected, /* val */ - absTimeout, /* timeout */ - nullptr, /* addr2 */ - waitMask); /* val3 */ - - assert(rv == 0 || - errno == EWOULDBLOCK || - errno == EINTR || - (absTimeout != nullptr && errno == ETIMEDOUT)); - - return rv; -} - -template <> -inline bool Futex::futexWait(uint32_t expected, - uint32_t waitMask) { - return futexWaitImpl(expected, nullptr, 0 /* extraOpFlags */, waitMask) == 0; -} - -template <> -inline int Futex::futexWake(int count, uint32_t wakeMask) { - assert(sizeof(*this) == sizeof(int)); - int rv = syscall(SYS_futex, - this, /* addr1 */ - FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */ - count, /* val */ - nullptr, /* timeout */ - nullptr, /* addr2 */ - wakeMask); /* val3 */ - assert(rv >= 0); - return rv; -} - -/* Convert std::chrono::time_point to struct timespec */ -template -struct timespec timePointToTimeSpec(const time_point& tp) { - using std::chrono::nanoseconds; - using std::chrono::seconds; - using std::chrono::duration_cast; - - struct timespec ts; - auto duration = tp.time_since_epoch(); - auto secs = duration_cast(duration); - auto nanos = duration_cast(duration - secs); - ts.tv_sec = secs.count(); - ts.tv_nsec = nanos.count(); - return ts; -} - -template class Atom> template -inline FutexResult -Futex::futexWaitUntil( - uint32_t expected, - const time_point& absTime, - uint32_t waitMask) { - - static_assert(std::is_same::value || 
- std::is_same::value, - "Only std::system_clock or std::steady_clock supported"); - - struct timespec absTimeSpec = timePointToTimeSpec(absTime); - int extraOpFlags = 0; - - /* We must use FUTEX_CLOCK_REALTIME flag if we are getting the time_point - * from the system clock (CLOCK_REALTIME). This check also works correctly for - * broken glibc in which steady_clock is a typedef to system_clock.*/ - if (std::is_same::value) { - extraOpFlags = FUTEX_CLOCK_REALTIME; - } else { - assert(Clock::is_steady); - } +/** A std::atomic subclass that can be used to force Futex to emulate + * the underlying futex() syscall. This is primarily useful to test or + * benchmark the emulated implementation on systems that don't need it. */ +template +struct EmulatedFutexAtomic : public std::atomic { + EmulatedFutexAtomic() noexcept = default; + constexpr /* implicit */ EmulatedFutexAtomic(T init) noexcept + : std::atomic(init) {} + EmulatedFutexAtomic(const EmulatedFutexAtomic& rhs) = delete; +}; + +/* Available specializations, with definitions elsewhere */ + +template<> +int Futex::futexWake(int count, uint32_t wakeMask); + +template<> +FutexResult Futex::futexWaitImpl( + uint32_t expected, + std::chrono::time_point* absSystemTime, + std::chrono::time_point* absSteadyTime, + uint32_t waitMask); + +template<> +int Futex::futexWake(int count, uint32_t wakeMask); - const int rv = futexWaitImpl(expected, &absTimeSpec, extraOpFlags, waitMask); - return futexErrnoToFutexResult(rv, errno); -} +template<> +FutexResult Futex::futexWaitImpl( + uint32_t expected, + std::chrono::time_point* absSystemTime, + std::chrono::time_point* absSteadyTime, + uint32_t waitMask); }} diff --git a/folly/test/BatonTest.cpp b/folly/test/BatonTest.cpp index f214dd69..8779a4c3 100644 --- a/folly/test/BatonTest.cpp +++ b/folly/test/BatonTest.cpp @@ -24,6 +24,7 @@ using namespace folly; using namespace folly::test; +using folly::detail::EmulatedFutexAtomic; typedef DeterministicSchedule DSched; @@ -63,6 +64,10 @@ 
BENCHMARK(baton_pingpong, iters) { run_pingpong_test(iters); } +BENCHMARK(baton_pingpong_emulated_futex, iters) { + run_pingpong_test(iters); +} + BENCHMARK(posix_sem_pingpong, iters) { sem_t sems[3]; sem_t* a = sems + 0; @@ -83,45 +88,50 @@ BENCHMARK(posix_sem_pingpong, iters) { thr.join(); } -template class Atom> +template class Atom, typename Clock> void run_basic_timed_wait_tests() { Baton b; b.post(); // tests if early delivery works fine - EXPECT_TRUE(b.timed_wait(std::chrono::system_clock::now())); + EXPECT_TRUE(b.timed_wait(Clock::now())); } -template class Atom> +template class Atom, typename Clock> void run_timed_wait_tmo_tests() { Baton b; auto thr = DSched::thread([&]{ - bool rv = b.timed_wait(std::chrono::system_clock::now() + - std::chrono::milliseconds(1)); + bool rv = b.timed_wait(Clock::now() + std::chrono::milliseconds(1)); // main thread is guaranteed to not post until timeout occurs EXPECT_FALSE(rv); }); DSched::join(thr); } -template class Atom> +template class Atom, typename Clock> void run_timed_wait_regular_test() { Baton b; auto thr = DSched::thread([&] { - bool rv = b.timed_wait( - std::chrono::time_point::max()); - if (std::is_same, std::atomic>::value) { - // We can only ensure this for std::atomic + // To wait forever we'd like to use time_point::max, but + // std::condition_variable does math to convert the timeout to + // system_clock without handling overflow. + auto farFuture = Clock::now() + std::chrono::hours(1000); + bool rv = b.timed_wait(farFuture); + if (!std::is_same, DeterministicAtomic>::value) { + // DeterministicAtomic ignores actual times, so doesn't guarantee + // a lack of timeout EXPECT_TRUE(rv); } }); - if (std::is_same, std::atomic>::value) { - // If we are using std::atomic, then a sleep here guarantees to a large - // extent that 'thr' will execute wait before we post it, thus testing - // late delivery. 
For DeterministicAtomic, we just rely on - // DeterministicSchedule to do the scheduling + if (!std::is_same, DeterministicAtomic>::value) { + // If we are using std::atomic (or EmulatedFutexAtomic) then + // a sleep here guarantees to a large extent that 'thr' will + // execute wait before we post it, thus testing late delivery. For + // DeterministicAtomic, we just rely on DeterministicSchedule to do + // the scheduling. The test won't fail if we lose the race, we just + // don't get coverage. std::this_thread::sleep_for(std::chrono::milliseconds(2)); } @@ -129,19 +139,40 @@ void run_timed_wait_regular_test() { DSched::join(thr); } -TEST(Baton, timed_wait_basic) { - run_basic_timed_wait_tests(); - run_basic_timed_wait_tests(); +TEST(Baton, timed_wait_basic_system_clock) { + run_basic_timed_wait_tests(); + run_basic_timed_wait_tests(); + run_basic_timed_wait_tests(); +} + +TEST(Baton, timed_wait_timeout_system_clock) { + run_timed_wait_tmo_tests(); + run_timed_wait_tmo_tests(); + run_timed_wait_tmo_tests(); +} + +TEST(Baton, timed_wait_system_clock) { + run_timed_wait_regular_test(); + run_timed_wait_regular_test(); + run_timed_wait_regular_test(); +} + +TEST(Baton, timed_wait_basic_steady_clock) { + run_basic_timed_wait_tests(); + run_basic_timed_wait_tests(); + run_basic_timed_wait_tests(); } -TEST(Baton, timed_wait_timeout) { - run_timed_wait_tmo_tests(); - run_timed_wait_tmo_tests(); +TEST(Baton, timed_wait_timeout_steady_clock) { + run_timed_wait_tmo_tests(); + run_timed_wait_tmo_tests(); + run_timed_wait_tmo_tests(); } -TEST(Baton, timed_wait) { - run_timed_wait_regular_test(); - run_timed_wait_regular_test(); +TEST(Baton, timed_wait_steady_clock) { + run_timed_wait_regular_test(); + run_timed_wait_regular_test(); + run_timed_wait_regular_test(); } template class Atom> @@ -154,6 +185,7 @@ void run_try_wait_tests() { TEST(Baton, try_wait) { run_try_wait_tests(); + run_try_wait_tests(); run_try_wait_tests(); } diff --git a/folly/test/DeterministicSchedule.cpp 
b/folly/test/DeterministicSchedule.cpp index 8a3b1fc1..a079917e 100644 --- a/folly/test/DeterministicSchedule.cpp +++ b/folly/test/DeterministicSchedule.cpp @@ -229,49 +229,28 @@ DeterministicSchedule::wait(sem_t* sem) { namespace folly { namespace detail { using namespace test; - -template<> -bool Futex::futexWait(uint32_t expected, - uint32_t waitMask) { - bool rv; - DeterministicSchedule::beforeSharedAccess(); - futexLock.lock(); - if (data != expected) { - rv = false; - } else { - auto& queue = futexQueues[this]; - bool done = false; - queue.push_back(std::make_pair(waitMask, &done)); - while (!done) { - futexLock.unlock(); - DeterministicSchedule::afterSharedAccess(); - DeterministicSchedule::beforeSharedAccess(); - futexLock.lock(); - } - rv = true; - } - futexLock.unlock(); - DeterministicSchedule::afterSharedAccess(); - return rv; -} - -FutexResult futexWaitUntilImpl(Futex* futex, - uint32_t expected, uint32_t waitMask) { - if (futex == nullptr) { - return FutexResult::VALUE_CHANGED; - } - - bool rv = false; +using namespace std::chrono; + +template <> +FutexResult +Futex::futexWaitImpl( + uint32_t expected, + time_point* absSystemTimeout, + time_point* absSteadyTimeout, + uint32_t waitMask) { + bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr; + bool awoken = false; + FutexResult result = FutexResult::AWOKEN; int futexErrno = 0; DeterministicSchedule::beforeSharedAccess(); futexLock.lock(); - if (futex->data == expected) { - auto& queue = futexQueues[futex]; - queue.push_back(std::make_pair(waitMask, &rv)); + if (data == expected) { + auto& queue = futexQueues[this]; + queue.push_back(std::make_pair(waitMask, &awoken)); auto ours = queue.end(); ours--; - while (!rv) { + while (!awoken) { futexLock.unlock(); DeterministicSchedule::afterSharedAccess(); DeterministicSchedule::beforeSharedAccess(); @@ -279,31 +258,33 @@ FutexResult futexWaitUntilImpl(Futex* futex, // Simulate spurious wake-ups, timeouts each time with // a 10% 
probability if we haven't been woken up already - if (!rv && DeterministicSchedule::getRandNumber(100) < 10) { - assert(futexQueues.count(futex) != 0 && - &futexQueues[futex] == &queue); + if (!awoken && hasTimeout && + DeterministicSchedule::getRandNumber(100) < 10) { + assert(futexQueues.count(this) != 0 && + &futexQueues[this] == &queue); queue.erase(ours); if (queue.empty()) { - futexQueues.erase(futex); + futexQueues.erase(this); } - rv = false; // Simulate ETIMEDOUT 90% of the time and other failures // remaining time - futexErrno = - DeterministicSchedule::getRandNumber(100) >= 10 ? ETIMEDOUT : EINTR; + result = + DeterministicSchedule::getRandNumber(100) >= 10 + ? FutexResult::TIMEDOUT : FutexResult::INTERRUPTED; break; } } } else { - futexErrno = EWOULDBLOCK; + result = FutexResult::VALUE_CHANGED; } futexLock.unlock(); DeterministicSchedule::afterSharedAccess(); - return futexErrnoToFutexResult(rv ? 0 : -1, futexErrno); + return result; } template<> -int Futex::futexWake(int count, uint32_t wakeMask) { +int +Futex::futexWake(int count, uint32_t wakeMask) { int rv = 0; DeterministicSchedule::beforeSharedAccess(); futexLock.lock(); diff --git a/folly/test/DeterministicSchedule.h b/folly/test/DeterministicSchedule.h index a344ac9e..4862ac9f 100644 --- a/folly/test/DeterministicSchedule.h +++ b/folly/test/DeterministicSchedule.h @@ -270,29 +270,24 @@ struct DeterministicAtomic { } }; -}} +}} // namespace folly::test + +/* Specialization declarations */ namespace folly { namespace detail { template<> -bool Futex::futexWait(uint32_t expected, - uint32_t waitMask); - -/// This function ignores the time bound, and instead pseudo-randomly chooses -/// whether the timeout was reached. To do otherwise would not be deterministic. 
-FutexResult futexWaitUntilImpl(Futex *futex, - uint32_t expected, uint32_t waitMask); - -template<> template -FutexResult -Futex::futexWaitUntil( - uint32_t expected, - const time_point& absTimeUnused, - uint32_t waitMask) { - return futexWaitUntilImpl(this, expected, waitMask); -} +int Futex::futexWake(int count, uint32_t wakeMask); template<> -int Futex::futexWake(int count, uint32_t wakeMask); +FutexResult Futex::futexWaitImpl( + uint32_t expected, + std::chrono::time_point* absSystemTime, + std::chrono::time_point* absSteadyTime, + uint32_t waitMask); + +template<> +Getcpu::Func +AccessSpreader::pickGetcpuFunc(size_t numStripes); -}} +}} // namespace folly::detail diff --git a/folly/test/LifoSemTests.cpp b/folly/test/LifoSemTests.cpp index 15ef5e02..e3de16b7 100644 --- a/folly/test/LifoSemTests.cpp +++ b/folly/test/LifoSemTests.cpp @@ -407,25 +407,25 @@ BENCHMARK_NAMED_PARAM(contendedUse, 16_to_16, 16, 16) BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32) BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000) -// sudo nice -n -20 tao/queues/LifoSemTests --benchmark --bm_min_iters=10000000 +// sudo nice -n -20 folly/test/LifoSemTests --benchmark --bm_min_iters=10000000 // ============================================================================ -// tao/queues/LifoSemTests.cpp relative time/iter iters/s +// folly/test/LifoSemTests.cpp relative time/iter iters/s // ============================================================================ -// lifo_sem_pingpong 1.91us 522.92K -// lifo_sem_oneway 211.18ns 4.74M -// single_thread_lifo_post 19.71ns 50.75M -// single_thread_lifo_wait 18.84ns 53.09M -// single_thread_lifo_postwait 39.41ns 25.37M -// single_thread_lifo_trywait 912.10ps 1.10G -// single_thread_posix_postwait 32.93ns 30.37M -// single_thread_posix_trywait 10.06ns 99.36M +// lifo_sem_pingpong 396.84ns 2.52M +// lifo_sem_oneway 88.52ns 11.30M +// single_thread_lifo_post 14.78ns 67.67M +// single_thread_lifo_wait 13.53ns 73.90M +// 
single_thread_lifo_postwait 28.91ns 34.59M +// single_thread_lifo_trywait 670.13ps 1.49G +// single_thread_posix_postwait 24.12ns 41.46M +// single_thread_posix_trywait 6.76ns 147.88M // ---------------------------------------------------------------------------- -// contendedUse(1_to_1) 208.21ns 4.80M -// contendedUse(1_to_32) 532.41ns 1.88M -// contendedUse(32_to_1) 153.74ns 6.50M -// contendedUse(16_to_16) 301.86ns 3.31M -// contendedUse(32_to_32) 268.32ns 3.73M -// contendedUse(32_to_1000) 966.27ns 1.03M +// contendedUse(1_to_1) 143.60ns 6.96M +// contendedUse(1_to_32) 244.06ns 4.10M +// contendedUse(32_to_1) 131.99ns 7.58M +// contendedUse(16_to_16) 210.64ns 4.75M +// contendedUse(32_to_32) 222.91ns 4.49M +// contendedUse(32_to_1000) 453.39ns 2.21M // ============================================================================ int main(int argc, char ** argv) { diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp index 1a84b485..be1fbc90 100644 --- a/folly/test/MPMCQueueTest.cpp +++ b/folly/test/MPMCQueueTest.cpp @@ -82,6 +82,12 @@ TEST(MPMCQueue, sequencer) { run_mt_sequencer_test(100, 10000, -100); } +TEST(MPMCQueue, sequencer_emulated_futex) { + run_mt_sequencer_test(1, 100, 0); + run_mt_sequencer_test(2, 100000, -100); + run_mt_sequencer_test(100, 10000, -100); +} + TEST(MPMCQueue, sequencer_deterministic) { DSched sched(DSched::uniform(0)); run_mt_sequencer_test(1, 100, -50); @@ -254,6 +260,15 @@ TEST(MPMCQueue, mt_try_enq_deq) { } } +TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) { + int nts[] = { 1, 3, 100 }; + + int n = 100000; + for (int nt : nts) { + runTryEnqDeqTest(nt, n); + } +} + TEST(MPMCQueue, mt_try_enq_deq_deterministic) { int nts[] = { 3, 10 }; @@ -373,6 +388,20 @@ TEST(MPMCQueue, mt_prod_cons) { LOG(INFO) << PC_BENCH(MPMCQueue(100000), 32, 100, n); } +TEST(MPMCQueue, mt_prod_cons_emulated_futex) { + int n = 100000; + LOG(INFO) << PC_BENCH((MPMCQueue(10)), 1, 1, n); + LOG(INFO) << PC_BENCH((MPMCQueue(10)), 10, 1, n); + 
LOG(INFO) << PC_BENCH((MPMCQueue(10)), 1, 10, n); + LOG(INFO) << PC_BENCH((MPMCQueue(10)), 10, 10, n); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 1, 1, n); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 10, 1, n); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 1, 10, n); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 10, 10, n); + LOG(INFO) + << PC_BENCH((MPMCQueue(100000)), 32, 100, n); +} + template class Atom> void runNeverFailThread( int numThreads, @@ -427,6 +456,17 @@ TEST(MPMCQueue, mt_never_fail) { } } +TEST(MPMCQueue, mt_never_fail_emulated_futex) { + int nts[] = { 1, 3, 100 }; + + int n = 100000; + for (int nt : nts) { + uint64_t elapsed = runNeverFailTest(nt, n); + LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " + << nt << " threads"; + } +} + TEST(MPMCQueue, mt_never_fail_deterministic) { int nts[] = { 3, 10 };