experimental/FutureDAG.h \
experimental/io/FsUtil.h \
experimental/JSONSchema.h \
+ experimental/LockFreeRingBuffer.h \
experimental/Select64.h \
experimental/SharedMutex.h \
experimental/StringKeyedCommon.h \
return decodeCurrentSturn(state) == (turn << kTurnShift);
}
+ /// Blocking counterpart of tryWaitForTurn: forwards all three arguments
+ /// to it unchanged and expects the wait to succeed.
+ /// Precondition: `turn` must not be a turn in the past. The precondition
+ /// is only enforced through assert(), so it is unchecked under NDEBUG.
+ void waitForTurn(const uint32_t turn,
+                  Atom<uint32_t>& spinCutoff,
+                  const bool updateSpinCutoff) noexcept {
+   const bool turnReached =
+       tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
+   // Keeps the variable "used" when assert() compiles away.
+   static_cast<void>(turnReached);
+   assert(turnReached);
+ }
+
// Internally we always work with shifted turn values, which makes the
// truncation and wraparound work correctly. This leaves us bits at
// the bottom to store the number of waiters. We call shifted turns
/// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
/// before blocking and will adjust spinCutoff based on the results,
/// otherwise it will spin for at most spinCutoff spins.
- void waitForTurn(const uint32_t turn,
+ /// Returns true if the wait succeeded, false if the turn is in the past
+ bool tryWaitForTurn(const uint32_t turn,
Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff) noexcept {
uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
break;
}
- // wrap-safe version of assert(current_sturn < sturn)
- assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
+ // wrap-safe version of (current_sturn >= sturn)
+ if(sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) {
+ // turn is in the past
+ return false;
+ }
// the first effectSpinCutoff tries are spins, after that we will
// record ourself as a waiter and block with futexWait
prevThresh, prevThresh + int(target - prevThresh) / 8);
}
}
+
+ return true;
}
/// Unblocks a thread running waitForTurn(turn + 1)
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <atomic>
+#include <boost/noncopyable.hpp>
+#include <cassert>
+#include <cmath>
+#include <cstddef>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string.h>
+#include <type_traits>
+#include <unistd.h>
+
+#include <folly/detail/TurnSequencer.h>
+#include <folly/Portability.h>
+
+namespace folly {
+namespace detail {
+
+template<typename T,
+ template<typename> class Atom>
+class RingBufferSlot;
+} // namespace detail
+
+/// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
+/// following semantics:
+///
+/// 1. Writers cannot block on other writers UNLESS they are <capacity> writes
+/// apart from each other (writing to the same slot after a wrap-around)
+/// 2. Writers cannot block on readers
+/// 3. Readers can wait for writes that haven't occurred yet
+/// 4. Readers can detect if they are lagging behind
+///
+/// In this sense, reads from this buffer are best-effort but writes
+/// are guaranteed.
+///
+/// Another way to think about this is as an unbounded stream of writes. The
+/// buffer contains the last <capacity> writes but readers can attempt to read
+/// any part of the stream, even outside this window. The read API takes a
+/// Cursor that can point anywhere in this stream of writes. Reads from the
+/// "future" can optionally block but reads from the "past" will always fail.
+///
+
+template<typename T, template<typename> class Atom = std::atomic>
+class LockFreeRingBuffer: boost::noncopyable {
+
+  static_assert(std::is_nothrow_default_constructible<T>::value,
+      "Element type must be nothrow default constructible");
+
+  static_assert(FOLLY_IS_TRIVIALLY_COPYABLE(T),
+      "Element type must be trivially copyable");
+
+public:
+  /// Opaque pointer to a past or future write.
+  /// Can be moved relative to its current location but not in absolute terms.
+  struct Cursor {
+    explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}
+
+    /// Advances the cursor by `steps` writes.
+    void moveForward(uint64_t steps = 1) noexcept {
+      ticket += steps;
+    }
+
+    /// Moves the cursor back by `steps` writes, clamping at the very first
+    /// write (ticket 0) instead of wrapping around.
+    void moveBackward(uint64_t steps = 1) noexcept {
+      if (steps > ticket) {
+        ticket = 0;
+      } else {
+        ticket -= steps;
+      }
+    }
+
+  protected: // for test visibility reasons
+    uint64_t ticket;
+    friend class LockFreeRingBuffer;
+  };
+
+  /// Creates a ring buffer holding `capacity` slots.
+  /// `capacity` must be non-zero: every ticket is mapped to a slot and a
+  /// turn by dividing by the capacity. Because the constructor is noexcept,
+  /// a failed slot allocation terminates the program.
+  explicit LockFreeRingBuffer(size_t capacity) noexcept
+    : capacity_(capacity)
+    , slots_(new detail::RingBufferSlot<T,Atom>[capacity])
+    , ticket_(0)
+  {
+    // idx() and turn() divide by capacity_; zero would be undefined behavior
+    assert(capacity > 0);
+  }
+
+  /// Perform a single write of an object of type T.
+  /// Writes can block iff a previous writer has not yet completed a write
+  /// for the same slot (before the most recent wrap-around).
+  void write(T& value) noexcept {
+    // Claim the next position in the stream; the ticket picks slot and turn
+    uint64_t ticket = ticket_.fetch_add(1);
+    slots_[idx(ticket)].write(turn(ticket), value);
+  }
+
+  /// Read the value at the cursor.
+  /// Returns true if the read succeeded, false otherwise. If the return
+  /// value is false, dest is to be considered partially read and in an
+  /// inconsistent state. Readers are advised to discard it.
+  bool tryRead(T& dest, const Cursor& cursor) noexcept {
+    return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
+  }
+
+  /// Read the value at the cursor or block if the write has not occurred yet.
+  /// Returns true if the read succeeded, false otherwise. If the return
+  /// value is false, dest is to be considered partially read and in an
+  /// inconsistent state. Readers are advised to discard it.
+  bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
+    return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
+  }
+
+  /// Returns a Cursor pointing to the first write that has not occurred yet.
+  Cursor currentHead() noexcept {
+    return Cursor(ticket_.load());
+  }
+
+  /// Returns a Cursor pointing to a currently readable write.
+  /// skipFraction is a value in the [0, 1] range indicating how far into the
+  /// currently readable window to place the cursor. 0 means the
+  /// earliest readable write, 1 means the latest readable write (if any).
+  Cursor currentTail(double skipFraction = 0.0) noexcept {
+    assert(skipFraction >= 0.0 && skipFraction <= 1.0);
+    uint64_t ticket = ticket_.load();
+
+    uint64_t backStep = std::llround((1.0 - skipFraction) * capacity_);
+
+    // always try to move at least one step backward to something readable
+    backStep = std::max<uint64_t>(1, backStep);
+
+    // can't go back more steps than we've taken
+    backStep = std::min(ticket, backStep);
+
+    return Cursor(ticket - backStep);
+  }
+
+  ~LockFreeRingBuffer() {
+  }
+
+private:
+  /// Number of slots; fixed for the lifetime of the buffer.
+  const size_t capacity_;
+
+  /// The slots themselves, indexed by idx(ticket).
+  const std::unique_ptr<detail::RingBufferSlot<T,Atom>[]> slots_;
+
+  /// Ticket of the next write, i.e. the number of writes started so far.
+  Atom<uint64_t> ticket_;
+
+  /// Slot index for a ticket. Returned as size_t (the type of capacity_)
+  /// so that indices are not truncated for capacities above 2^32.
+  size_t idx(uint64_t ticket) const noexcept {
+    return static_cast<size_t>(ticket % capacity_);
+  }
+
+  /// Number of complete laps around the buffer that precede this ticket.
+  /// The slot sequencer takes 32-bit turns, so the quotient is narrowed
+  /// explicitly.
+  uint32_t turn(uint64_t ticket) const noexcept {
+    return static_cast<uint32_t>(ticket / capacity_);
+  }
+}; // LockFreeRingBuffer
+
+namespace detail {
+/// A single slot of the ring buffer: one value of type T guarded by a
+/// TurnSequencer.
+///
+/// Each buffer-level turn `n` uses two sequencer turns: 2n means the slot is
+/// available to the writer of turn n, and 2n + 1 means that write is in
+/// progress. Once the write completes the sequencer is at 2(n + 1), which is
+/// the turn readers of turn n look for.
+template<typename T, template<typename> class Atom>
+class RingBufferSlot {
+public:
+ explicit RingBufferSlot() noexcept
+ : sequencer_()
+ , data()
+ {
+ }
+
+ /// Stores `value` as the payload of buffer turn `turn`.
+ /// Waits until the sequencer reaches turn * 2 before writing.
+ void write(const uint32_t turn, T& value) noexcept {
+ // Spin cutoff of 0 with updateSpinCutoff == false
+ Atom<uint32_t> cutoff(0);
+ sequencer_.waitForTurn(turn * 2, cutoff, false);
+
+ // Change to an odd-numbered turn to indicate write in process
+ sequencer_.completeTurn(turn * 2);
+
+ data = std::move(value);
+ sequencer_.completeTurn(turn * 2 + 1);
+ // At (turn + 1) * 2
+ }
+
+ /// Waits for the write of buffer turn `turn` to complete, then copies the
+ /// payload into `dest`.
+ /// Returns false if tryWaitForTurn fails, in which case `dest` is left
+ /// untouched, or if the sequencer is no longer at the expected turn after
+ /// the copy, in which case `dest` has been written and must not be trusted.
+ bool waitAndTryRead(T& dest, uint32_t turn) noexcept {
+ uint32_t desired_turn = (turn + 1) * 2;
+ Atom<uint32_t> cutoff(0);
+ if(!sequencer_.tryWaitForTurn(desired_turn, cutoff, false)) {
+ return false;
+ }
+ // Copy first, validate afterwards: the turn is re-checked below
+ memcpy(&dest, &data, sizeof(T));
+
+ // if it's still the same turn, we read the value successfully
+ return sequencer_.isTurn(desired_turn);
+ }
+
+ /// Non-blocking read of the payload of buffer turn `turn`.
+ /// Returns false without touching `dest` if the sequencer is not at
+ /// (turn + 1) * 2. Also returns false if the turn changed during the
+ /// copy; `dest` must then be discarded.
+ bool tryRead(T& dest, uint32_t turn) noexcept {
+ // The write that started at turn 0 ended at turn 2
+ if (!sequencer_.isTurn((turn + 1) * 2)) {
+ return false;
+ }
+ memcpy(&dest, &data, sizeof(T));
+
+ // if it's still the same turn, we read the value successfully
+ return sequencer_.isTurn((turn + 1) * 2);
+ }
+
+
+private:
+ // Tracks which turn (and which phase of it) this slot is currently in
+ TurnSequencer<Atom> sequencer_;
+ // The stored payload; copied out with memcpy by the read functions
+ T data;
+}; // RingBufferSlot
+
+} // namespace detail
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <iostream>
+#include <thread>
+
+#include <folly/detail/Futex.h>
+#include <folly/experimental/LockFreeRingBuffer.h>
+#include <folly/test/DeterministicSchedule.h>
+
+namespace folly {
+
+// Fills the whole buffer `turns` times. After each fill, reads the window
+// back in write order through one cursor that started at the initial head
+// and is advanced once per read.
+TEST(LockFreeRingBuffer, writeReadSequentially) {
+  const int capacity = 256;
+  const int turns = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
+  // Counters are int, like the bounds and the payload, so that neither the
+  // loop conditions nor ASSERT_EQ mix signed and unsigned operands.
+  for (int turn = 0; turn < turns; turn++) {
+    for (int write = 0; write < capacity; write++) {
+      int val = turn * capacity + write;
+      rb.write(val);
+    }
+
+    for (int write = 0; write < capacity; write++) {
+      int dest = 0;
+      ASSERT_TRUE(rb.tryRead(dest, cur));
+      ASSERT_EQ(turn * capacity + write, dest);
+      cur.moveForward();
+    }
+  }
+}
+
+// Fills the whole buffer `turns` times. After each fill, walks a cursor
+// backward from the most recent write and checks every value in the window
+// in reverse write order.
+TEST(LockFreeRingBuffer, writeReadSequentiallyBackward) {
+  const int capacity = 256;
+  const int turns = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  // Counters are int, like the bounds and the payload, so that neither the
+  // loop conditions nor ASSERT_EQ mix signed and unsigned operands.
+  for (int turn = 0; turn < turns; turn++) {
+    for (int write = 0; write < capacity; write++) {
+      int val = turn * capacity + write;
+      rb.write(val);
+    }
+
+    LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
+    cur.moveBackward(1); /// last write
+    for (int write = capacity - 1; write >= 0; write--) {
+      int foo = 0;
+      ASSERT_TRUE(rb.tryRead(foo, cur));
+      ASSERT_EQ(turn * capacity + write, foo);
+      cur.moveBackward();
+    }
+  }
+}
+
+// A reader waiting on a write that has not happened yet must not complete
+// before that write is issued.
+TEST(LockFreeRingBuffer, readsCanBlock) {
+  std::atomic<bool> readerHasRun(false);
+  LockFreeRingBuffer<int> rb(1);
+
+  // Three steps past the head: the reader waits for the 4th write
+  auto cursor = rb.currentHead();
+  cursor.moveForward(3);
+
+  const int sentinel = 0xfaceb00c;
+
+  std::thread reader([&]() {
+    int received = 0;
+    EXPECT_TRUE(rb.waitAndTryRead(received, cursor));
+    readerHasRun = true;
+    EXPECT_EQ(sentinel, received);
+  });
+
+  // Before each of the four writes the reader must still be blocked
+  for (int written = 0; written < 4; written++) {
+    EXPECT_FALSE(readerHasRun);
+    int payload = sentinel;
+    rb.write(payload);
+  }
+
+  reader.join();
+  EXPECT_TRUE(readerHasRun);
+}
+
+// Exposes the raw ticket value of a cursor for test assertions.
+// Cursor keeps `ticket` protected, so a local subclass is used to read it.
+template<typename T, template<typename> class Atom>
+uint64_t value(const typename LockFreeRingBuffer<T, Atom>::Cursor&& rbcursor) {
+  typedef typename LockFreeRingBuffer<T,Atom>::Cursor RBCursor;
+
+  struct ExposedCursor : RBCursor {
+    explicit ExposedCursor(const RBCursor& cursor): RBCursor(cursor) {}
+    uint64_t value() const {
+      return this->ticket;
+    }
+  };
+
+  // rbcursor is const, so std::move() on it could only ever copy; copy it
+  // straight into the wrapper instead of going through an extra local.
+  return ExposedCursor(rbcursor).value();
+}
+
+// NOTE: despite its name, this is the body of a *writer* thread. It keeps
+// claiming one unit from the shared `writes` counter and writes the claimed
+// (pre-decrement) value into the buffer, stopping once the claimed value is
+// no longer positive.
+template<template<typename> class Atom>
+void runReader(
+    LockFreeRingBuffer<int, Atom>& rb, std::atomic<int32_t>& writes
+) {
+  for (;;) {
+    // fetch_sub returns the value held before the decrement
+    int32_t claimed = writes.fetch_sub(1);
+    if (claimed <= 0) {
+      break;
+    }
+    rb.write(claimed);
+  }
+}
+
+// Starts `writers` threads under a DeterministicSchedule that together
+// perform `writes` writes into a buffer of the given capacity, then checks
+// that the head cursor has advanced by exactly `writes`.
+template<template<typename> class Atom>
+void runWritesNeverFail(
+    int capacity, int writes, int writers
+) {
+  using folly::test::DeterministicSchedule;
+
+  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
+  LockFreeRingBuffer<int, Atom> rb(capacity);
+
+  std::atomic<int32_t> writes_remaining(writes);
+  std::vector<std::thread> threads(writers);
+
+  for (auto& thread : threads) {
+    // A lambda instead of std::bind: same call, no <functional> dependency.
+    // Both captured objects outlive the threads, which are joined below.
+    thread = DeterministicSchedule::thread([&rb, &writes_remaining] {
+      runReader<Atom>(rb, writes_remaining);
+    });
+  }
+
+  for (auto& thread : threads) {
+    DeterministicSchedule::join(thread);
+  }
+
+  // The cursor value is a uint64_t; convert the expected count so the
+  // comparison does not mix signed and unsigned operands.
+  EXPECT_EQ(
+      static_cast<uint64_t>(writes), (value<int, Atom>)(rb.currentHead()));
+}
+
+// Runs the same four concurrent-writer scenarios against each atomic
+// implementation: DeterministicAtomic, std::atomic and EmulatedFutexAtomic.
+TEST(LockFreeRingBuffer, writesNeverFail) {
+  using folly::test::DeterministicAtomic;
+  using folly::detail::EmulatedFutexAtomic;
+
+  struct Scenario {
+    int capacity;
+    int writes;
+    int writers;
+  };
+  const Scenario scenarios[] = {
+    {1, 100, 4},
+    {10, 100, 4},
+    {100, 1000, 8},
+    {1000, 10000, 16},
+  };
+
+  for (const Scenario& s : scenarios) {
+    runWritesNeverFail<DeterministicAtomic>(s.capacity, s.writes, s.writers);
+  }
+
+  for (const Scenario& s : scenarios) {
+    runWritesNeverFail<std::atomic>(s.capacity, s.writes, s.writers);
+  }
+
+  for (const Scenario& s : scenarios) {
+    runWritesNeverFail<EmulatedFutexAtomic>(s.capacity, s.writes, s.writers);
+  }
+}
+
+// A cursor that has been lapped by the writers must fail both kinds of read
+// and leave the destination untouched. Cursors obtained from currentTail()
+// must be readable.
+TEST(LockFreeRingBuffer, readerCanDetectSkips) {
+  const int capacity = 4;
+  const int rounds = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  auto cursor = rb.currentHead();
+  cursor.moveForward(1);
+
+  // Writes 0 .. capacity * rounds - 1 in order, lapping the buffer
+  const int totalWrites = capacity * rounds;
+  for (int n = 0; n < totalWrites; n++) {
+    int val = n;
+    rb.write(val);
+  }
+
+  // The cursor now points at a write that has been overwritten
+  int result = -1;
+  EXPECT_FALSE(rb.tryRead(result, cursor));
+  EXPECT_FALSE(rb.waitAndTryRead(result, cursor));
+  EXPECT_EQ(-1, result);
+
+  // Earliest readable write: the first value of the last round
+  cursor = rb.currentTail();
+  EXPECT_TRUE(rb.tryRead(result, cursor));
+  EXPECT_EQ(capacity * (rounds - 1), result);
+
+  // Latest readable write: the last value written
+  cursor = rb.currentTail(1.0);
+  EXPECT_TRUE(rb.tryRead(result, cursor));
+  EXPECT_EQ((capacity * rounds) - 1, result);
+}
+
+
+// Checks where currentTail() places the cursor for an empty, a half-full and
+// a full buffer. Expected values are unsigned literals because the cursor
+// value is a uint64_t; this keeps EXPECT_EQ from mixing signed and unsigned
+// operands.
+TEST(LockFreeRingBuffer, currentTailRange) {
+  const int capacity = 4;
+  LockFreeRingBuffer<int> rb(capacity);
+
+  // Workaround for template deduction failure
+  auto (&cursorValue)(value<int, std::atomic>);
+
+  // Empty buffer - everything points to 0
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0.5)));
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(1)));
+
+  // Half-full
+  int val = 5;
+  rb.write(val);
+  rb.write(val);
+
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(1u, cursorValue(rb.currentTail(1)));
+
+  // Full
+  rb.write(val);
+  rb.write(val);
+
+  EXPECT_EQ(0u, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(3u, cursorValue(rb.currentTail(1)));
+
+  auto midvalue = cursorValue(rb.currentTail(0.5));
+  // both rounding behaviours are acceptable
+  EXPECT_TRUE(midvalue == 1 || midvalue == 2);
+}
+
+} // namespace folly