From fff701bfc7db2f23becae9da222a63a7ad86177b Mon Sep 17 00:00:00 2001 From: Delyan Kratunov Date: Thu, 28 May 2015 17:30:19 -0700 Subject: [PATCH] LockFreeRingBuffer Summary: This introduces a lock-free ring buffer with the following expected semantics: 1. Writers can't block on readers 2. Writers are mostly lock-free 3. Readers can detect if they're being too slow 4. Be usable on Android (TBD but should work as-is with the armeabi-v7a ABI; armeabi (ARMv5) support is explicitly a non-goal) Non-goals: 1. Match MPMCQueue in level of optimization. There's no need for that yet. Test Plan: iloveunittests Reviewed By: ngbronson@fb.com Subscribers: trunkagent, folly-diffs@, yfeldblum, chalfant FB internal diff: D2037718 Signature: t1:2037718:1432850250:c57963510d8cda58edc006f4c3260f5ac34d4996 --- folly/Makefile.am | 1 + folly/detail/TurnSequencer.h | 22 +- folly/experimental/LockFreeRingBuffer.h | 215 ++++++++++++++++ .../test/LockFreeRingBufferTest.cpp | 229 ++++++++++++++++++ 4 files changed, 464 insertions(+), 3 deletions(-) create mode 100644 folly/experimental/LockFreeRingBuffer.h create mode 100644 folly/experimental/test/LockFreeRingBufferTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index f5546b56..d2377fa5 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -104,6 +104,7 @@ nobase_follyinclude_HEADERS = \ experimental/FutureDAG.h \ experimental/io/FsUtil.h \ experimental/JSONSchema.h \ + experimental/LockFreeRingBuffer.h \ experimental/Select64.h \ experimental/SharedMutex.h \ experimental/StringKeyedCommon.h \ diff --git a/folly/detail/TurnSequencer.h b/folly/detail/TurnSequencer.h index aafeaa16..42bb7d01 100644 --- a/folly/detail/TurnSequencer.h +++ b/folly/detail/TurnSequencer.h @@ -78,6 +78,16 @@ struct TurnSequencer { return decodeCurrentSturn(state) == (turn << kTurnShift); } + /// See tryWaitForTurn + /// Requires that `turn` is not a turn in the past. 
+  void waitForTurn(const uint32_t turn,
+                   Atom<uint32_t>& spinCutoff,
+                   const bool updateSpinCutoff) noexcept {
+    bool success = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
+    (void) success; // avoids an unused-variable warning when assert is off
+    assert(success); // false would mean `turn` was already in the past
+  }
+
   // Internally we always work with shifted turn values, which makes the
   // truncation and wraparound work correctly.  This leaves us bits at
   // the bottom to store the number of waiters.  We call shifted turns
@@ -87,7 +97,8 @@ struct TurnSequencer {
   /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
   /// before blocking and will adjust spinCutoff based on the results,
   /// otherwise it will spin for at most spinCutoff spins.
-  void waitForTurn(const uint32_t turn,
+  /// Returns true if the wait succeeded, false if the turn is in the past
+  bool tryWaitForTurn(const uint32_t turn,
                    Atom<uint32_t>& spinCutoff,
                    const bool updateSpinCutoff) noexcept {
     uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
@@ -103,8 +114,11 @@ struct TurnSequencer {
         break;
       }
 
-      // wrap-safe version of assert(current_sturn < sturn)
-      assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
+      // wrap-safe version of (current_sturn >= sturn)
+      if(sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) {
+        // turn is in the past
+        return false;
+      }
 
       // the first effectSpinCutoff tries are spins, after that we will
       // record ourself as a waiter and block with futexWait
@@ -154,6 +168,8 @@ struct TurnSequencer {
             prevThresh, prevThresh + int(target - prevThresh) / 8);
       }
     }
+
+    return true;
   }
 
   /// Unblocks a thread running waitForTurn(turn + 1)
diff --git a/folly/experimental/LockFreeRingBuffer.h b/folly/experimental/LockFreeRingBuffer.h
new file mode 100644
index 00000000..87ed084e
--- /dev/null
+++ b/folly/experimental/LockFreeRingBuffer.h
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+// NOTE(review): the header names on the #include lines were lost; this list
+// is rebuilt from the symbols used below (FOLLY_IS_TRIVIALLY_COPYABLE is
+// presumed to come from folly/Portability.h) -- confirm against the tree.
+#include <algorithm>
+#include <atomic>
+#include <cassert>
+#include <cmath>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+#include <utility>
+
+#include <boost/noncopyable.hpp>
+
+#include <folly/Portability.h>
+#include <folly/detail/TurnSequencer.h>
+
+namespace folly {
+namespace detail {
+
+template<typename T, template<typename> class Atom>
+class RingBufferSlot;
+} // namespace detail
+
+/// LockFreeRingBuffer is a fixed-size, concurrent ring buffer with the
+/// following semantics:
+///
+///  1. Writers cannot block on other writers UNLESS they are `capacity` writes
+///     apart from each other (writing to the same slot after a wrap-around)
+///  2. Writers cannot block on readers
+///  3. Readers can wait for writes that haven't occurred yet
+///  4. Readers can detect if they are lagging behind
+///
+/// In this sense, reads from this buffer are best-effort but writes
+/// are guaranteed.
+///
+/// Another way to think about this is as an unbounded stream of writes. The
+/// buffer contains the last `capacity` writes but readers can attempt to read
+/// any part of the stream, even outside this window. The read API takes a
+/// Cursor that can point anywhere in this stream of writes. Reads from the
+/// "future" can optionally block but reads from the "past" will always fail.
+///
+
+template<typename T, template<typename> class Atom = std::atomic>
+class LockFreeRingBuffer: boost::noncopyable {
+
+  static_assert(std::is_nothrow_default_constructible<T>::value,
+      "Element type must be nothrow default constructible");
+
+  static_assert(FOLLY_IS_TRIVIALLY_COPYABLE(T),
+      "Element type must be trivially copyable");
+
+public:
+  /// Opaque pointer to a past or future write.
+  /// Can be moved relative to its current location but not in absolute terms.
+  struct Cursor {
+    explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}
+
+    /// Advance the cursor by `steps` writes.
+    void moveForward(uint64_t steps = 1) noexcept {
+      ticket += steps;
+    }
+
+    /// Move the cursor back by `steps` writes, stopping at the very first
+    /// write instead of wrapping around below zero.
+    void moveBackward(uint64_t steps = 1) noexcept {
+      if (steps > ticket) {
+        ticket = 0;
+      } else {
+        ticket -= steps;
+      }
+    }
+
+  protected: // for test visibility reasons
+    uint64_t ticket;
+    friend class LockFreeRingBuffer;
+  };
+
+  /// Creates a buffer with `capacity` slots.
+  /// `capacity` must be non-zero: every ticket is mapped to a slot with
+  /// `ticket % capacity`, which is undefined for an empty buffer.
+  explicit LockFreeRingBuffer(size_t capacity) noexcept
+    : capacity_(capacity)
+    , slots_(new detail::RingBufferSlot<T,Atom>[capacity])
+    , ticket_(0)
+  {
+    assert(capacity > 0);
+  }
+
+  /// Perform a single write of an object of type T.
+  /// Writes can block iff a previous writer has not yet completed a write
+  /// for the same slot (before the most recent wrap-around).
+  void write(T& value) noexcept {
+    // The ticket both picks the slot and tells the slot which wrap-around
+    // (turn) this write belongs to.
+    uint64_t ticket = ticket_.fetch_add(1);
+    slots_[idx(ticket)].write(turn(ticket), value);
+  }
+
+  /// Read the value at the cursor.
+  /// Returns true if the read succeeded, false otherwise. If the return
+  /// value is false, dest is to be considered partially read and in an
+  /// inconsistent state. Readers are advised to discard it.
+  bool tryRead(T& dest, const Cursor& cursor) noexcept {
+    return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
+  }
+
+  /// Read the value at the cursor or block if the write has not occurred yet.
+  /// Returns true if the read succeeded, false otherwise. If the return
+  /// value is false, dest is to be considered partially read and in an
+  /// inconsistent state. Readers are advised to discard it.
+  bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
+    return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
+  }
+
+  /// Returns a Cursor pointing to the first write that has not occurred yet.
+  Cursor currentHead() noexcept {
+    return Cursor(ticket_.load());
+  }
+
+  /// Returns a Cursor pointing to a currently readable write.
+  /// skipFraction is a value in the [0, 1] range indicating how far into the
+  /// currently readable window to place the cursor. 0 means the
+  /// earliest readable write, 1 means the latest readable write (if any).
+  Cursor currentTail(double skipFraction = 0.0) noexcept {
+    assert(skipFraction >= 0.0 && skipFraction <= 1.0);
+    uint64_t ticket = ticket_.load();
+
+    uint64_t backStep = std::llround((1.0 - skipFraction) * capacity_);
+
+    // always try to move at least one step backward to something readable
+    // (explicit template argument: the literal 1 is an int, backStep is not)
+    backStep = std::max<uint64_t>(1, backStep);
+
+    // can't go back more steps than we've taken
+    backStep = std::min(ticket, backStep);
+
+    return Cursor(ticket - backStep);
+  }
+
+  ~LockFreeRingBuffer() {
+  }
+
+private:
+  const size_t capacity_;
+
+  const std::unique_ptr<detail::RingBufferSlot<T,Atom>[]> slots_;
+
+  Atom<uint64_t> ticket_;
+
+  /// Slot index for a ticket. Returned as size_t, the type of capacity_, so
+  /// that an index is never narrowed.
+  size_t idx(uint64_t ticket) const noexcept {
+    return static_cast<size_t>(ticket % capacity_);
+  }
+
+  /// Number of wrap-arounds that precede a ticket, narrowed to the 32 bits
+  /// that RingBufferSlot takes for its turn arguments.
+  uint32_t turn(uint64_t ticket) const noexcept {
+    return static_cast<uint32_t>(ticket / capacity_);
+  }
+}; // LockFreeRingBuffer
+
+namespace detail {
+/// One cell of the ring buffer: the stored value plus the sequencer that
+/// orders the writers of this cell and lets readers validate their copy.
+///
+/// For a wrap-around number `turn` the sequencer is at turn * 2 before the
+/// write, at turn * 2 + 1 while the write is in progress and at
+/// (turn + 1) * 2 once the value can be read.
+template<typename T, template<typename> class Atom>
+class RingBufferSlot {
+public:
+  explicit RingBufferSlot() noexcept
+    : sequencer_()
+    , data()
+  {
+  }
+
+  /// Stores `value` as the write for wrap-around `turn`, after waiting for
+  /// the sequencer to reach turn * 2.
+  void write(const uint32_t turn, T& value) noexcept {
+    Atom<uint32_t> cutoff(0);
+    sequencer_.waitForTurn(turn * 2, cutoff, false);
+
+    // Change to an odd-numbered turn to indicate write in progress
+    sequencer_.completeTurn(turn * 2);
+
+    data = std::move(value);
+    sequencer_.completeTurn(turn * 2 + 1);
+    // At (turn + 1) * 2
+  }
+
+  /// Waits for the write of wrap-around `turn`, then copies it into dest.
+  /// Returns false if that turn is already in the past or if the slot moved
+  /// on while the copy was being made; dest must then be discarded.
+  bool waitAndTryRead(T& dest, uint32_t turn) noexcept {
+    uint32_t desired_turn = (turn + 1) * 2;
+    Atom<uint32_t> cutoff(0);
+    if(!sequencer_.tryWaitForTurn(desired_turn, cutoff, false)) {
+      return false;
+    }
+    std::memcpy(&dest, &data, sizeof(T));
+
+    // if it's still the same turn, we read the value successfully
+    return sequencer_.isTurn(desired_turn);
+  }
+
+  /// Copies the write of wrap-around `turn` into dest without blocking.
+  /// Returns false if the slot is not at that turn before the copy (dest is
+  /// untouched) or no longer at it afterwards (dest must be discarded).
+  bool tryRead(T& dest, uint32_t turn) noexcept {
+    // The write that started at turn 0 ended at turn 2
+    if (!sequencer_.isTurn((turn + 1) * 2)) {
+      return false;
+    }
+    std::memcpy(&dest, &data, sizeof(T));
+
+    // if it's still the same turn, we read the value successfully
+    return sequencer_.isTurn((turn + 1) * 2);
+  }
+
+
+private:
+  TurnSequencer<Atom> sequencer_;
+  T data;
+}; // RingBufferSlot
+
+} // namespace detail
+
+} // namespace folly
diff --git a/folly/experimental/test/LockFreeRingBufferTest.cpp b/folly/experimental/test/LockFreeRingBufferTest.cpp
new file mode 100644
index 00000000..30fb074e
--- /dev/null
+++ b/folly/experimental/test/LockFreeRingBufferTest.cpp
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// NOTE(review): the header names on the #include lines were lost; this list
+// is rebuilt from the symbols used below (EmulatedFutexAtomic is presumed to
+// come from folly/detail/Futex.h) -- confirm against the tree.
+#include <atomic>
+#include <cstdint>
+#include <functional>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include <folly/detail/Futex.h>
+#include <folly/experimental/LockFreeRingBuffer.h>
+#include <folly/test/DeterministicSchedule.h>
+
+namespace folly {
+
+// Fill the buffer one lap at a time and read each lap back in write order.
+TEST(LockFreeRingBuffer, writeReadSequentially) {
+  const int capacity = 256;
+  const int turns = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
+  for (int turn = 0; turn < turns; turn++) {
+    for (int write = 0; write < capacity; write++) {
+      int val = turn*capacity + write;
+      rb.write(val);
+    }
+
+    for (int write = 0; write < capacity; write++) {
+      int dest = 0;
+      ASSERT_TRUE(rb.tryRead(dest, cur));
+      ASSERT_EQ(turn*capacity + write, dest);
+      cur.moveForward();
+    }
+  }
+}
+
+// Same as above, but each lap is read from the newest write to the oldest.
+TEST(LockFreeRingBuffer, writeReadSequentiallyBackward) {
+  const int capacity = 256;
+  const int turns = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  for (int turn = 0; turn < turns; turn++) {
+    for (int write = 0; write < capacity; write++) {
+      int val = turn*capacity + write;
+      rb.write(val);
+    }
+
+    LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
+    cur.moveBackward(1); /// last write
+    for (int write = capacity - 1; write >= 0; write--) {
+      int foo = 0;
+      ASSERT_TRUE(rb.tryRead(foo, cur));
+      ASSERT_EQ(turn*capacity + write, foo);
+      cur.moveBackward();
+    }
+  }
+}
+
+TEST(LockFreeRingBuffer, readsCanBlock) {
+  // Start a reader thread, confirm that reading can block
+  std::atomic<bool> readerHasRun(false);
+  LockFreeRingBuffer<int> rb(1);
+  auto cursor = rb.currentHead();
+  cursor.moveForward(3); // wait for the 4th write
+
+  const int sentinel = 0xfaceb00c;
+
+  auto reader = std::thread([&]() {
+    int val = 0;
+    EXPECT_TRUE(rb.waitAndTryRead(val, cursor));
+    readerHasRun = true;
+    EXPECT_EQ(sentinel, val);
+  });
+
+  for (int i = 0; i < 4; i++) {
+    // the reader can only finish once the 4th write below has completed
+    EXPECT_FALSE(readerHasRun);
+    int val = sentinel;
+    rb.write(val);
+  }
+  reader.join();
+  EXPECT_TRUE(readerHasRun);
+}
+
+// expose the cursor raw value via a wrapper type
+template<typename T, template<typename> class Atom>
+uint64_t value(const typename LockFreeRingBuffer<T, Atom>::Cursor&& rbcursor) {
+  typedef typename LockFreeRingBuffer<T,Atom>::Cursor RBCursor;
+
+  RBCursor cursor = std::move(rbcursor);
+
+  // Cursor::ticket is protected, so a derived type is used to read it
+  struct ExposedCursor : RBCursor {
+    ExposedCursor(const RBCursor& cursor): RBCursor(cursor) {}
+    uint64_t value(){
+      return this->ticket;
+    }
+  };
+  return ExposedCursor(cursor).value();
+}
+
+// NOTE: despite its name this helper is a writer. It keeps writing the
+// current countdown value for as long as the shared `writes` counter is
+// positive.
+template<template<typename> class Atom>
+void runReader(
+    LockFreeRingBuffer<int, Atom>& rb, std::atomic<int32_t>& writes
+) {
+  int32_t idx;
+  while ((idx = writes--) > 0) {
+    rb.write(idx);
+  }
+}
+
+// Runs `writers` threads that together perform `writes` writes and checks
+// that the head cursor has advanced by exactly that many tickets.
+template<template<typename> class Atom>
+void runWritesNeverFail(
+    int capacity, int writes, int writers
+) {
+  using folly::test::DeterministicSchedule;
+
+  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
+  LockFreeRingBuffer<int, Atom> rb(capacity);
+
+  std::atomic<int32_t> writes_remaining(writes);
+  std::vector<std::thread> threads(writers);
+
+  for (int i = 0; i < writers; i++) {
+    threads[i] = DeterministicSchedule::thread(
+      std::bind(runReader<Atom>, std::ref(rb), std::ref(writes_remaining))
+    );
+  }
+
+  for (auto& thread : threads) {
+    DeterministicSchedule::join(thread);
+  }
+
+  EXPECT_EQ(static_cast<uint64_t>(writes),
+            (value<int, Atom>)(rb.currentHead()));
+}
+
+TEST(LockFreeRingBuffer, writesNeverFail) {
+  using folly::test::DeterministicAtomic;
+  using folly::detail::EmulatedFutexAtomic;
+
+  // NOTE(review): the Atom argument of each group was lost in extraction;
+  // the three groups are assumed to be DeterministicAtomic, std::atomic and
+  // EmulatedFutexAtomic -- confirm the order against the tree.
+  runWritesNeverFail<DeterministicAtomic>(1, 100, 4);
+  runWritesNeverFail<DeterministicAtomic>(10, 100, 4);
+  runWritesNeverFail<DeterministicAtomic>(100, 1000, 8);
+  runWritesNeverFail<DeterministicAtomic>(1000, 10000, 16);
+
+  runWritesNeverFail<std::atomic>(1, 100, 4);
+  runWritesNeverFail<std::atomic>(10, 100, 4);
+  runWritesNeverFail<std::atomic>(100, 1000, 8);
+  runWritesNeverFail<std::atomic>(1000, 10000, 16);
+
+  runWritesNeverFail<EmulatedFutexAtomic>(1, 100, 4);
+  runWritesNeverFail<EmulatedFutexAtomic>(10, 100, 4);
+  runWritesNeverFail<EmulatedFutexAtomic>(100, 1000, 8);
+  runWritesNeverFail<EmulatedFutexAtomic>(1000, 10000, 16);
+}
+
+// A cursor that has been lapped must fail to read, and must leave the
+// destination untouched when the failure is detected before the copy.
+TEST(LockFreeRingBuffer, readerCanDetectSkips) {
+  const int capacity = 4;
+  const int rounds = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  auto cursor = rb.currentHead();
+  cursor.moveForward(1);
+
+  for (int round = 0; round < rounds; round++) {
+    for (int i = 0; i < capacity; i++) {
+      int val = round * capacity + i;
+      rb.write(val);
+    }
+  }
+
+  int result = -1;
+  EXPECT_FALSE(rb.tryRead(result, cursor));
+  EXPECT_FALSE(rb.waitAndTryRead(result, cursor));
+  EXPECT_EQ(-1, result);
+
+  cursor = rb.currentTail();
+  EXPECT_TRUE(rb.tryRead(result, cursor));
+  EXPECT_EQ(capacity * (rounds - 1), result);
+
+  cursor = rb.currentTail(1.0);
+  EXPECT_TRUE(rb.tryRead(result, cursor));
+  EXPECT_EQ((capacity * rounds) - 1, result);
+}
+
+
+// currentTail() must stay inside the readable window for an empty, a
+// half-full and a full buffer.
+TEST(LockFreeRingBuffer, currentTailRange) {
+  const int capacity = 4;
+  LockFreeRingBuffer<int> rb(capacity);
+
+  // Workaround for template deduction failure
+  auto (&cursorValue)(value<int, std::atomic>);
+
+  // Empty buffer - everything points to 0
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0.5)));
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(1)));
+
+  // Half-full
+  int val = 5;
+  rb.write(val);
+  rb.write(val);
+
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(1u, cursorValue(rb.currentTail(1)));
+
+  // Full
+  rb.write(val);
+  rb.write(val);
+
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(3u, cursorValue(rb.currentTail(1)));
+
+  auto midvalue = cursorValue(rb.currentTail(0.5));
+  // both rounding behaviours are acceptable
+  EXPECT_TRUE(midvalue == 1 || midvalue == 2);
+}
+
+} // namespace folly
-- 
2.34.1