From d43e710c813ee30f0537688dffa4cdb0b1fc5ced Mon Sep 17 00:00:00 2001 From: Phil Willoughby Date: Tue, 15 Mar 2016 01:39:48 -0700 Subject: [PATCH] Convert a polling loop to a futex wait Summary: Add a new method to MPMCQueue: ``` template <class Clock, typename... Args> bool tryWriteUntil(const std::chrono::time_point<Clock>& when, Args&&... args) noexcept ``` This allows you to write producers which terminate reliably in the absence of consumers. Returns `true` if `args` was enqueued, `false` otherwise. `Clock` must be one of the types supported by the underlying call to `folly::detail::Futex::futexWaitUntil`; at time of writing these are `std::chrono::steady_clock` and `std::chrono::system_clock`. Reviewed By: nbronson Differential Revision: D2895574 fb-gh-sync-id: bdfabcd043191c149f1271e30ffc28476cc8a36e shipit-source-id: bdfabcd043191c149f1271e30ffc28476cc8a36e --- folly/MPMCQueue.h | 55 +++++- folly/detail/TurnSequencer.h | 38 ++-- folly/test/MPMCQueueTest.cpp | 352 ++++++++++++++++++++++++++++------- 3 files changed, 366 insertions(+), 79 deletions(-) diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index 05de4c7b..66b76886 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -284,6 +284,21 @@ class MPMCQueue : boost::noncopyable { } } + template + bool tryWriteUntil(const std::chrono::time_point& when, + Args&&... args) noexcept { + uint64_t ticket; + if (tryObtainPromisedPushTicketUntil(ticket, when)) { + // we have pre-validated that the ticket won't block, or rather that + // it won't block longer than it takes another thread to dequeue an + // element from the slot it identifies. + enqueueWithTicket(ticket, std::forward(args)...); + return true; + } else { + return false; + } + } + /// If the queue is not full, enqueues and returns true, otherwise /// returns false.
Unlike write this method can be blocked by another /// thread, specifically a read that has linearized (been assigned @@ -471,6 +486,28 @@ class MPMCQueue : boost::noncopyable { } } + /// Tries until when to obtain a push ticket for which + /// SingleElementQueue::enqueue won't block. Returns true on success, false + /// on failure. + /// ticket is filled on success AND failure. + template + bool tryObtainPromisedPushTicketUntil( + uint64_t& ticket, const std::chrono::time_point& when) noexcept { + bool deadlineReached = false; + while (!deadlineReached) { + if (tryObtainPromisedPushTicket(ticket)) { + return true; + } + // ticket is a blocking ticket until the preceding ticket has been + // processed: wait until this ticket's turn arrives. We have not reserved + // this ticket so we will have to re-attempt to get a non-blocking ticket + // if we wake up before we time-out. + deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil( + turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when); + } + return false; + } + /// Tries to obtain a push ticket which can be satisfied if all /// in-progress pops complete. This function does not block, but /// blocking may be required when using the returned ticket if some @@ -482,6 +519,7 @@ class MPMCQueue : boost::noncopyable { auto numPops = popTicket_.load(std::memory_order_acquire); // B // n will be negative if pops are pending int64_t n = numPushes - numPops; + rv = numPushes; if (n >= static_cast(capacity_)) { // Full, linearize at B. 
We don't need to recheck the read we // performed at A, because if numPushes was stale at B then the @@ -489,7 +527,6 @@ class MPMCQueue : boost::noncopyable { return false; } if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) { - rv = numPushes; return true; } } @@ -597,7 +634,7 @@ struct SingleElementQueue { template ::value && boost::has_nothrow_constructor::value) || - std::is_nothrow_constructible::value>::type> + std::is_nothrow_constructible::value>::type> void enqueue(const uint32_t turn, Atom& spinCutoff, const bool updateSpinCutoff, @@ -611,6 +648,20 @@ struct SingleElementQueue { ImplByMove, ImplByRelocation>::type()); } + /// Waits until either: + /// 1: the dequeue turn preceding the given enqueue turn has arrived + /// 2: the given deadline has arrived + /// Case 1 returns true, case 2 returns false. + template + bool tryWaitForEnqueueTurnUntil( + const uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + const std::chrono::time_point& when) noexcept { + return sequencer_.tryWaitForTurn( + turn * 2, spinCutoff, updateSpinCutoff, &when); + } + bool mayEnqueue(const uint32_t turn) const noexcept { return sequencer_.isTurn(turn * 2); } diff --git a/folly/detail/TurnSequencer.h b/folly/detail/TurnSequencer.h index 85aed316..32a6f57f 100644 --- a/folly/detail/TurnSequencer.h +++ b/folly/detail/TurnSequencer.h @@ -82,10 +82,10 @@ struct TurnSequencer { /// See tryWaitForTurn /// Requires that `turn` is not a turn in the past. void waitForTurn(const uint32_t turn, - Atom& spinCutoff, - const bool updateSpinCutoff) noexcept { + Atom& spinCutoff, + const bool updateSpinCutoff) noexcept { bool success = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff); - (void) success; + (void)success; assert(success); } @@ -99,9 +99,15 @@ struct TurnSequencer { /// before blocking and will adjust spinCutoff based on the results, /// otherwise it will spin for at most spinCutoff spins. 
/// Returns true if the wait succeeded, false if the turn is in the past + /// or the absTime time value is not nullptr and is reached before the turn + /// arrives + template bool tryWaitForTurn(const uint32_t turn, - Atom& spinCutoff, - const bool updateSpinCutoff) noexcept { + Atom& spinCutoff, + const bool updateSpinCutoff, + const std::chrono::time_point* absTime = + nullptr) noexcept { uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed); const uint32_t effectiveSpinCutoff = updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh; @@ -142,7 +148,15 @@ struct TurnSequencer { continue; } } - state_.futexWait(new_state, futexChannel(turn)); + if (absTime) { + auto futexResult = + state_.futexWaitUntil(new_state, *absTime, futexChannel(turn)); + if (futexResult == FutexResult::TIMEDOUT) { + return false; + } + } else { + state_.futexWait(new_state, futexChannel(turn)); + } } if (updateSpinCutoff || prevThresh == 0) { @@ -179,9 +193,9 @@ struct TurnSequencer { while (true) { assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state))); uint32_t max_waiter_delta = decodeMaxWaitersDelta(state); - uint32_t new_state = encode( - (turn + 1) << kTurnShift, - max_waiter_delta == 0 ? 0 : max_waiter_delta - 1); + uint32_t new_state = + encode((turn + 1) << kTurnShift, + max_waiter_delta == 0 ? 
0 : max_waiter_delta - 1); if (state_.compare_exchange_strong(state, new_state)) { if (max_waiter_delta != 0) { state_.futexWake(std::numeric_limits::max(), @@ -227,9 +241,7 @@ struct TurnSequencer { /// Returns the bitmask to pass futexWait or futexWake when communicating /// about the specified turn - int futexChannel(uint32_t turn) const noexcept { - return 1 << (turn & 31); - } + int futexChannel(uint32_t turn) const noexcept { return 1 << (turn & 31); } uint32_t decodeCurrentSturn(uint32_t state) const noexcept { return state & ~kWaitersMask; @@ -240,7 +252,7 @@ struct TurnSequencer { } uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept { - return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD); + return currentSturn | std::min(uint32_t{kWaitersMask}, maxWaiterD); } }; diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp index 394b5838..ee09c7a4 100644 --- a/folly/test/MPMCQueueTest.cpp +++ b/folly/test/MPMCQueueTest.cpp @@ -35,6 +35,14 @@ FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr); using namespace folly; using namespace detail; using namespace test; +using std::chrono::time_point; +using std::chrono::steady_clock; +using std::chrono::seconds; +using std::chrono::milliseconds; +using std::string; +using std::make_unique; +using std::unique_ptr; +using std::vector; typedef DeterministicSchedule DSched; @@ -61,7 +69,7 @@ void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) { Atom spinThreshold(0); int prev = -1; - std::vector threads(numThreads); + vector threads(numThreads); for (int i = 0; i < numThreads; ++i) { threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread, numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold), @@ -102,6 +110,12 @@ void runElementTypeTest(T&& src) { cq.blockingRead(dest); EXPECT_TRUE(cq.write(std::move(dest))); EXPECT_TRUE(cq.read(dest)); + auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1); + 
EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest))); + EXPECT_TRUE(cq.read(dest)); + auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1); + EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest))); + EXPECT_TRUE(cq.read(dest)); } struct RefCounted { @@ -132,9 +146,9 @@ void intrusive_ptr_release(RefCounted const* p) { TEST(MPMCQueue, lots_of_element_types) { runElementTypeTest(10); - runElementTypeTest(std::string("abc")); - runElementTypeTest(std::make_pair(10, std::string("def"))); - runElementTypeTest(std::vector{ { "abc" } }); + runElementTypeTest(string("abc")); + runElementTypeTest(std::make_pair(10, string("def"))); + runElementTypeTest(vector{{"abc"}}); runElementTypeTest(std::make_shared('a')); runElementTypeTest(folly::make_unique('a')); runElementTypeTest(boost::intrusive_ptr(new RefCounted)); @@ -237,7 +251,7 @@ void runTryEnqDeqTest(int numThreads, int numOps) { MPMCQueue cq(numThreads); uint64_t n = numOps; - std::vector threads(numThreads); + vector threads(numThreads); std::atomic sum(0); for (int t = 0; t < numThreads; ++t) { threads[t] = DSched::thread(std::bind(runTryEnqDeqThread, @@ -294,9 +308,59 @@ uint64_t nowMicro() { } template -std::string producerConsumerBench(Q&& queue, std::string qName, - int numProducers, int numConsumers, - int numOps, bool ignoreContents = false) { +struct WriteMethodCaller { + WriteMethodCaller() {} + virtual ~WriteMethodCaller() = default; + virtual bool callWrite(Q& q, int i) = 0; + virtual string methodName() = 0; +}; + +template +struct BlockingWriteCaller : public WriteMethodCaller { + bool callWrite(Q& q, int i) override { + q.blockingWrite(i); + return true; + } + string methodName() override { return "blockingWrite"; } +}; + +template +struct WriteIfNotFullCaller : public WriteMethodCaller { + bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); } + string methodName() override { return "writeIfNotFull"; } +}; + +template +struct WriteCaller : public WriteMethodCaller { + 
bool callWrite(Q& q, int i) override { return q.write(i); } + string methodName() override { return "write"; } +}; + +template +struct TryWriteUntilCaller : public WriteMethodCaller { + const Duration duration_; + explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {} + bool callWrite(Q& q, int i) override { + auto then = Clock::now() + duration_; + return q.tryWriteUntil(then, i); + } + string methodName() override { + return folly::sformat( + "tryWriteUntil({}ms)", + std::chrono::duration_cast(duration_).count()); + } +}; + +template +string producerConsumerBench(Q&& queue, + string qName, + int numProducers, + int numConsumers, + int numOps, + WriteMethodCaller& writer, + bool ignoreContents = false) { Q& q = queue; struct rusage beginUsage; @@ -306,17 +370,20 @@ std::string producerConsumerBench(Q&& queue, std::string qName, uint64_t n = numOps; std::atomic sum(0); + std::atomic failed(0); - std::vector producers(numProducers); + vector producers(numProducers); for (int t = 0; t < numProducers; ++t) { producers[t] = DSched::thread([&,t]{ for (int i = t; i < numOps; i += numProducers) { - q.blockingWrite(i); + while (!writer.callWrite(q, i)) { + ++failed; + } } }); } - std::vector consumers(numConsumers); + vector consumers(numConsumers); for (int t = 0; t < numConsumers; ++t) { consumers[t] = DSched::thread([&,t]{ uint64_t localSum = 0; @@ -348,27 +415,76 @@ std::string producerConsumerBench(Q&& queue, std::string qName, uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n; long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw - (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw); + uint64_t failures = failed; - return folly::format( - "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff", - qName, numProducers, numConsumers, nanosPer, csw, n).str(); + return folly::sformat( + "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} " + "handoff, {} failures", + qName, + numProducers, + writer.methodName(), + 
numConsumers, + nanosPer, + csw, + n, + failures); } - TEST(MPMCQueue, mt_prod_cons_deterministic) { // we use the Bench method, but perf results are meaningless under DSched DSched sched(DSched::uniform(0)); - producerConsumerBench(MPMCQueue(10), - "", 1, 1, 1000); - producerConsumerBench(MPMCQueue(100), - "", 10, 10, 1000); - producerConsumerBench(MPMCQueue(10), - "", 1, 1, 1000); - producerConsumerBench(MPMCQueue(100), - "", 10, 10, 1000); - producerConsumerBench(MPMCQueue(1), - "", 10, 10, 1000); + vector>>> + callers; + callers.emplace_back( + make_unique>>()); + callers.emplace_back( + make_unique>>()); + callers.emplace_back( + make_unique>>()); + callers.emplace_back( + make_unique>>( + milliseconds(1))); + callers.emplace_back( + make_unique>>( + seconds(2))); + + for (const auto& caller : callers) { + LOG(INFO) + << producerConsumerBench(MPMCQueue(10), + "MPMCQueue(10)", + 1, + 1, + 1000, + *caller); + LOG(INFO) + << producerConsumerBench(MPMCQueue(100), + "MPMCQueue(100)", + 10, + 10, + 1000, + *caller); + LOG(INFO) + << producerConsumerBench(MPMCQueue(10), + "MPMCQueue(10)", + 1, + 1, + 1000, + *caller); + LOG(INFO) + << producerConsumerBench(MPMCQueue(100), + "MPMCQueue(100)", + 10, + 10, + 1000, + *caller); + LOG(INFO) << producerConsumerBench(MPMCQueue(1), + "MPMCQueue(1)", + 10, + 10, + 1000, + *caller); + } } #define PC_BENCH(q, np, nc, ...) 
\ @@ -376,38 +492,71 @@ TEST(MPMCQueue, mt_prod_cons_deterministic) { TEST(MPMCQueue, mt_prod_cons) { int n = 100000; - LOG(INFO) << PC_BENCH(MPMCQueue(10), 1, 1, n); - LOG(INFO) << PC_BENCH(MPMCQueue(10), 10, 1, n); - LOG(INFO) << PC_BENCH(MPMCQueue(10), 1, 10, n); - LOG(INFO) << PC_BENCH(MPMCQueue(10), 10, 10, n); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 1, 1, n); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 10, 1, n); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 1, 10, n); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 10, 10, n); - LOG(INFO) << PC_BENCH(MPMCQueue(100000), 32, 100, n); + vector>>> callers; + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back( + make_unique>>(milliseconds(1))); + callers.emplace_back( + make_unique>>(seconds(2))); + for (const auto& caller : callers) { + LOG(INFO) << PC_BENCH(MPMCQueue(10), 1, 1, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(10), 10, 1, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(10), 1, 10, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(10), 10, 10, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(10000), 1, 1, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(10000), 10, 1, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(10000), 1, 10, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(10000), 10, 10, n, *caller); + LOG(INFO) << PC_BENCH(MPMCQueue(100000), 32, 100, n, *caller); + } } TEST(MPMCQueue, mt_prod_cons_emulated_futex) { int n = 100000; - LOG(INFO) << PC_BENCH((MPMCQueue(10)), 1, 1, n); - LOG(INFO) << PC_BENCH((MPMCQueue(10)), 10, 1, n); - LOG(INFO) << PC_BENCH((MPMCQueue(10)), 1, 10, n); - LOG(INFO) << PC_BENCH((MPMCQueue(10)), 10, 10, n); - LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 1, 1, n); - LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 10, 1, n); - LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 1, 10, n); - LOG(INFO) << PC_BENCH((MPMCQueue(10000)), 10, 10, n); - LOG(INFO) - << PC_BENCH((MPMCQueue(100000)), 32, 100, n); -} - 
-template class Atom> -void runNeverFailThread( - int numThreads, - int n, /*numOps*/ - MPMCQueue& cq, - std::atomic& sum, - int t) { + vector>>> + callers; + callers.emplace_back( + make_unique>>()); + callers.emplace_back( + make_unique>>()); + callers.emplace_back( + make_unique>>()); + callers.emplace_back( + make_unique>>( + milliseconds(1))); + callers.emplace_back( + make_unique>>( + seconds(2))); + for (const auto& caller : callers) { + LOG(INFO) << PC_BENCH( + (MPMCQueue(10)), 1, 1, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(10)), 10, 1, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(10)), 1, 10, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(10)), 10, 10, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(10000)), 1, 1, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(10000)), 10, 1, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(10000)), 1, 10, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(10000)), 10, 10, n, *caller); + LOG(INFO) << PC_BENCH( + (MPMCQueue(100000)), 32, 100, n, *caller); + } +} + +template