LockFreeRingBuffer
authorDelyan Kratunov <delyank@fb.com>
Fri, 29 May 2015 00:30:19 +0000 (17:30 -0700)
committerNoam Lerner <noamler@fb.com>
Wed, 3 Jun 2015 16:50:26 +0000 (09:50 -0700)
Summary:
This introduces a lock-free ring buffer with the following expected semantics:

1. Writers can't block on readers
2. Writers are mostly lock-free
3. Readers can detect if they're being too slow
4. Be usable on Android (TBD but should work as-is with the armeabi-v7a ABI; armeabi (ARMv5) support is explicitly a non-goal)

Non-goals:
1. Match MPMCQueue in level of optimization. There's no need for that yet.

Test Plan: iloveunittests

Reviewed By: ngbronson@fb.com

Subscribers: trunkagent, folly-diffs@, yfeldblum, chalfant

FB internal diff: D2037718

Signature: t1:2037718:1432850250:c57963510d8cda58edc006f4c3260f5ac34d4996

folly/Makefile.am
folly/detail/TurnSequencer.h
folly/experimental/LockFreeRingBuffer.h [new file with mode: 0644]
folly/experimental/test/LockFreeRingBufferTest.cpp [new file with mode: 0644]

index f5546b562a20834b59947ff91d19d5ed0a0b60fe..d2377fa5e237652f650314dd15095d01ccef8df7 100644 (file)
@@ -104,6 +104,7 @@ nobase_follyinclude_HEADERS = \
        experimental/FutureDAG.h \
        experimental/io/FsUtil.h \
        experimental/JSONSchema.h \
+       experimental/LockFreeRingBuffer.h \
        experimental/Select64.h \
        experimental/SharedMutex.h \
        experimental/StringKeyedCommon.h \
index aafeaa16879e7950cb10450b86ed0e3d8623d9eb..42bb7d01ffdc6d960e017e28e76d7466005c8d5f 100644 (file)
@@ -78,6 +78,16 @@ struct TurnSequencer {
     return decodeCurrentSturn(state) == (turn << kTurnShift);
   }
 
+  /// See tryWaitForTurn
+  /// Requires that `turn` is not a turn in the past.
+  void waitForTurn(const uint32_t turn,
+                Atom<uint32_t>& spinCutoff,
+                const bool updateSpinCutoff) noexcept {
+    bool success = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
+    (void) success;
+    assert(success);
+  }
+
   // Internally we always work with shifted turn values, which makes the
   // truncation and wraparound work correctly.  This leaves us bits at
   // the bottom to store the number of waiters.  We call shifted turns
@@ -87,7 +97,8 @@ struct TurnSequencer {
   /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
   /// before blocking and will adjust spinCutoff based on the results,
   /// otherwise it will spin for at most spinCutoff spins.
-  void waitForTurn(const uint32_t turn,
+  /// Returns true if the wait succeeded, false if the turn is in the past
+  bool tryWaitForTurn(const uint32_t turn,
                    Atom<uint32_t>& spinCutoff,
                    const bool updateSpinCutoff) noexcept {
     uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
@@ -103,8 +114,11 @@ struct TurnSequencer {
         break;
       }
 
-      // wrap-safe version of assert(current_sturn < sturn)
-      assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
+      // wrap-safe version of (current_sturn >= sturn)
+      if(sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) {
+        // turn is in the past
+        return false;
+      }
 
       // the first effectSpinCutoff tries are spins, after that we will
       // record ourself as a waiter and block with futexWait
@@ -154,6 +168,8 @@ struct TurnSequencer {
             prevThresh, prevThresh + int(target - prevThresh) / 8);
       }
     }
+
+    return true;
   }
 
   /// Unblocks a thread running waitForTurn(turn + 1)
diff --git a/folly/experimental/LockFreeRingBuffer.h b/folly/experimental/LockFreeRingBuffer.h
new file mode 100644 (file)
index 0000000..87ed084
--- /dev/null
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <boost/noncopyable.hpp>
+#include <iostream>
+#include <cmath>
+#include <string.h>
+#include <type_traits>
+#include <unistd.h>
+
+#include <folly/detail/TurnSequencer.h>
+#include <folly/Portability.h>
+
+namespace folly {
+namespace detail {
+
+template<typename T,
+         template<typename> class Atom>
+class RingBufferSlot;
+} // namespace detail
+
+/// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
+/// following semantics:
+///
+///  1. Writers cannot block on other writers UNLESS they are <capacity> writes
+///     apart from each other (writing to the same slot after a wrap-around)
+///  2. Writers cannot block on readers
+///  3. Readers can wait for writes that haven't occurred yet
+///  4. Readers can detect if they are lagging behind
+///
+/// In this sense, reads from this buffer are best-effort but writes
+/// are guaranteed.
+///
+/// Another way to think about this is as an unbounded stream of writes. The
+/// buffer contains the last <capacity> writes but readers can attempt to read
+/// any part of the stream, even outside this window. The read API takes a
+/// Cursor that can point anywhere in this stream of writes. Reads from the
+/// "future" can optionally block but reads from the "past" will always fail.
+///
+
+template<typename T, template<typename> class Atom = std::atomic>
+class LockFreeRingBuffer: boost::noncopyable {
+
+   static_assert(std::is_nothrow_default_constructible<T>::value,
+       "Element type must be nothrow default constructible");
+
+   static_assert(FOLLY_IS_TRIVIALLY_COPYABLE(T),
+       "Element type must be trivially copyable");
+
+public:
+  /// Opaque pointer to a past or future write.
+  /// Can be moved relative to its current location but not in absolute terms.
+  struct Cursor {
+    explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}
+
+    void moveForward(uint64_t steps = 1) noexcept {
+      ticket += steps;
+    }
+
+    void moveBackward(uint64_t steps = 1) noexcept {
+      if (steps > ticket) {
+        ticket = 0;
+      } else {
+        ticket -= steps;
+      }
+    }
+
+  protected: // for test visibility reasons
+    uint64_t ticket;
+    friend class LockFreeRingBuffer;
+  };
+
+  explicit LockFreeRingBuffer(size_t capacity) noexcept
+    : capacity_(capacity)
+    , slots_(new detail::RingBufferSlot<T,Atom>[capacity])
+    , ticket_(0)
+  {}
+
+  /// Perform a single write of an object of type T.
+  /// Writes can block iff a previous writer has not yet completed a write
+  /// for the same slot (before the most recent wrap-around).
+  void write(T& value) noexcept {
+    uint64_t ticket = ticket_.fetch_add(1);
+    slots_[idx(ticket)].write(turn(ticket), value);
+  }
+
+  /// Read the value at the cursor.
+  /// Returns true if the read succeeded, false otherwise. If the return
+  /// value is false, dest is to be considered partially read and in an
+  /// inconsistent state. Readers are advised to discard it.
+  bool tryRead(T& dest, const Cursor& cursor) noexcept {
+    return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
+  }
+
+  /// Read the value at the cursor or block if the write has not occurred yet.
+  /// Returns true if the read succeeded, false otherwise. If the return
+  /// value is false, dest is to be considered partially read and in an
+  /// inconsistent state. Readers are advised to discard it.
+  bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
+    return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
+  }
+
+  /// Returns a Cursor pointing to the first write that has not occurred yet.
+  Cursor currentHead() noexcept {
+    return Cursor(ticket_.load());
+  }
+
+  /// Returns a Cursor pointing to a currently readable write.
+  /// skipFraction is a value in the [0, 1] range indicating how far into the
+  /// currently readable window to place the cursor. 0 means the
+  /// earliest readable write, 1 means the latest readable write (if any).
+  Cursor currentTail(double skipFraction = 0.0) noexcept {
+    assert(skipFraction >= 0.0 && skipFraction <= 1.0);
+    uint64_t ticket = ticket_.load();
+
+    uint64_t backStep = std::llround((1.0 - skipFraction) * capacity_);
+
+    // always try to move at least one step backward to something readable
+    backStep = std::max<uint64_t>(1, backStep);
+
+    // can't go back more steps than we've taken
+    backStep = std::min(ticket, backStep);
+
+    return Cursor(ticket - backStep);
+  }
+
+  ~LockFreeRingBuffer() {
+  }
+
+private:
+  const size_t capacity_;
+
+  const std::unique_ptr<detail::RingBufferSlot<T,Atom>[]> slots_;
+
+  Atom<uint64_t> ticket_;
+
+  uint32_t idx(uint64_t ticket) noexcept {
+    return ticket % capacity_;
+  }
+
+  uint32_t turn(uint64_t ticket) noexcept {
+    return (ticket / capacity_);
+  }
+}; // LockFreeRingBuffer
+
+namespace detail {
+template<typename T, template<typename> class Atom>
+class RingBufferSlot {
+public:
+  explicit RingBufferSlot() noexcept
+    : sequencer_()
+    , data()
+  {
+  }
+
+  void write(const uint32_t turn, T& value) noexcept {
+    Atom<uint32_t> cutoff(0);
+    sequencer_.waitForTurn(turn * 2, cutoff, false);
+
+    // Change to an odd-numbered turn to indicate write in process
+    sequencer_.completeTurn(turn * 2);
+
+    data = std::move(value);
+    sequencer_.completeTurn(turn * 2 + 1);
+    // At (turn + 1) * 2
+  }
+
+  bool waitAndTryRead(T& dest, uint32_t turn) noexcept {
+    uint32_t desired_turn = (turn + 1) * 2;
+    Atom<uint32_t> cutoff(0);
+    if(!sequencer_.tryWaitForTurn(desired_turn, cutoff, false)) {
+      return false;
+    }
+    memcpy(&dest, &data, sizeof(T));
+
+    // if it's still the same turn, we read the value successfully
+    return sequencer_.isTurn(desired_turn);
+  }
+
+  bool tryRead(T& dest, uint32_t turn) noexcept {
+    // The write that started at turn 0 ended at turn 2
+    if (!sequencer_.isTurn((turn + 1) * 2)) {
+      return false;
+    }
+    memcpy(&dest, &data, sizeof(T));
+
+    // if it's still the same turn, we read the value successfully
+    return sequencer_.isTurn((turn + 1) * 2);
+  }
+
+
+private:
+  TurnSequencer<Atom> sequencer_;
+  T data;
+}; // RingBufferSlot
+
+} // namespace detail
+
+} // namespace folly
diff --git a/folly/experimental/test/LockFreeRingBufferTest.cpp b/folly/experimental/test/LockFreeRingBufferTest.cpp
new file mode 100644 (file)
index 0000000..30fb074
--- /dev/null
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <iostream>
+#include <thread>
+
+#include <folly/detail/Futex.h>
+#include <folly/experimental/LockFreeRingBuffer.h>
+#include <folly/test/DeterministicSchedule.h>
+
+namespace folly {
+
+TEST(LockFreeRingBuffer, writeReadSequentially) {
+  const int capacity = 256;
+  const int turns = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
+  for (unsigned int turn = 0; turn < turns; turn++) {
+    for (unsigned int write = 0; write < capacity; write++) {
+      int val = turn*capacity + write;
+      rb.write(val);
+    }
+
+    for (unsigned int write = 0; write < capacity; write++) {
+      int dest = 0;
+      ASSERT_TRUE(rb.tryRead(dest, cur));
+      ASSERT_EQ(turn*capacity + write, dest);
+      cur.moveForward();
+    }
+  }
+}
+
+TEST(LockFreeRingBuffer, writeReadSequentiallyBackward) {
+  const int capacity = 256;
+  const int turns = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  for (unsigned int turn = 0; turn < turns; turn++) {
+    for (unsigned int write = 0; write < capacity; write++) {
+      int val = turn*capacity + write;
+      rb.write(val);
+    }
+
+    LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
+    cur.moveBackward(1); /// last write
+    for (int write = capacity - 1; write >= 0; write--) {
+      int foo = 0;
+      ASSERT_TRUE(rb.tryRead(foo, cur));
+      ASSERT_EQ(turn*capacity + write, foo);
+      cur.moveBackward();
+    }
+  }
+}
+
+TEST(LockFreeRingBuffer, readsCanBlock) {
+  // Start a reader thread, confirm that reading can block
+  std::atomic<bool> readerHasRun(false);
+  LockFreeRingBuffer<int> rb(1);
+  auto cursor = rb.currentHead();
+  cursor.moveForward(3); // wait for the 4th write
+
+  const int sentinel = 0xfaceb00c;
+
+  auto reader = std::thread([&]() {
+    int val = 0;
+    EXPECT_TRUE(rb.waitAndTryRead(val, cursor));
+    readerHasRun = true;
+    EXPECT_EQ(sentinel, val);
+  });
+
+  for (int i = 0; i < 4; i++) {
+    EXPECT_FALSE(readerHasRun);
+    int val = sentinel;
+    rb.write(val);
+  }
+  reader.join();
+  EXPECT_TRUE(readerHasRun);
+}
+
+// expose the cursor raw value via a wrapper type
+template<typename T, template<typename> class Atom>
+uint64_t value(const typename LockFreeRingBuffer<T, Atom>::Cursor&& rbcursor) {
+  typedef typename LockFreeRingBuffer<T,Atom>::Cursor RBCursor;
+
+  RBCursor cursor = std::move(rbcursor);
+
+  struct ExposedCursor : RBCursor {
+    ExposedCursor(const RBCursor& cursor): RBCursor(cursor) {}
+    uint64_t value(){
+      return this->ticket;
+    }
+  };
+  return ExposedCursor(cursor).value();
+}
+
+template<template<typename> class Atom>
+void runReader(
+    LockFreeRingBuffer<int, Atom>& rb, std::atomic<int32_t>& writes
+) {
+  int32_t idx;
+  while ((idx = writes--) > 0) {
+    rb.write(idx);
+  }
+}
+
+template<template<typename> class Atom>
+void runWritesNeverFail(
+    int capacity, int writes, int writers
+) {
+  using folly::test::DeterministicSchedule;
+
+  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
+  LockFreeRingBuffer<int, Atom> rb(capacity);
+
+  std::atomic<int32_t> writes_remaining(writes);
+  std::vector<std::thread> threads(writers);
+
+  for (int i = 0; i < writers; i++) {
+    threads[i] = DeterministicSchedule::thread(
+        std::bind(runReader<Atom>, std::ref(rb), std::ref(writes_remaining))
+    );
+  }
+
+  for (auto& thread : threads) {
+    DeterministicSchedule::join(thread);
+  }
+
+  EXPECT_EQ(writes, (value<int, Atom>)(rb.currentHead()));
+}
+
+TEST(LockFreeRingBuffer, writesNeverFail) {
+  using folly::test::DeterministicAtomic;
+  using folly::detail::EmulatedFutexAtomic;
+
+  runWritesNeverFail<DeterministicAtomic>(1, 100, 4);
+  runWritesNeverFail<DeterministicAtomic>(10, 100, 4);
+  runWritesNeverFail<DeterministicAtomic>(100, 1000, 8);
+  runWritesNeverFail<DeterministicAtomic>(1000, 10000, 16);
+
+  runWritesNeverFail<std::atomic>(1, 100, 4);
+  runWritesNeverFail<std::atomic>(10, 100, 4);
+  runWritesNeverFail<std::atomic>(100, 1000, 8);
+  runWritesNeverFail<std::atomic>(1000, 10000, 16);
+
+  runWritesNeverFail<EmulatedFutexAtomic>(1, 100, 4);
+  runWritesNeverFail<EmulatedFutexAtomic>(10, 100, 4);
+  runWritesNeverFail<EmulatedFutexAtomic>(100, 1000, 8);
+  runWritesNeverFail<EmulatedFutexAtomic>(1000, 10000, 16);
+}
+
+TEST(LockFreeRingBuffer, readerCanDetectSkips) {
+  const int capacity = 4;
+  const int rounds = 4;
+
+  LockFreeRingBuffer<int> rb(capacity);
+  auto cursor = rb.currentHead();
+  cursor.moveForward(1);
+
+  for (int round = 0; round < rounds; round++) {
+    for (int i = 0; i < capacity; i++) {
+      int val = round * capacity + i;
+      rb.write(val);
+    }
+  }
+
+  int result = -1;
+  EXPECT_FALSE(rb.tryRead(result, cursor));
+  EXPECT_FALSE(rb.waitAndTryRead(result, cursor));
+  EXPECT_EQ(-1, result);
+
+  cursor = rb.currentTail();
+  EXPECT_TRUE(rb.tryRead(result, cursor));
+  EXPECT_EQ(capacity * (rounds - 1), result);
+
+  cursor = rb.currentTail(1.0);
+  EXPECT_TRUE(rb.tryRead(result, cursor));
+  EXPECT_EQ((capacity * rounds) - 1, result);
+}
+
+
+TEST(LockFreeRingBuffer, currentTailRange) {
+  const int capacity = 4;
+  LockFreeRingBuffer<int> rb(capacity);
+
+  // Workaround for template deduction failure
+  auto (&cursorValue)(value<int, std::atomic>);
+
+  // Empty buffer - everything points to 0
+  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(0, cursorValue(rb.currentTail(0.5)));
+  EXPECT_EQ(0, cursorValue(rb.currentTail(1)));
+
+  // Half-full
+  int val = 5;
+  rb.write(val);
+  rb.write(val);
+
+  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(1, cursorValue(rb.currentTail(1)));
+
+  // Full
+  rb.write(val);
+  rb.write(val);
+
+  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
+  EXPECT_EQ(3, cursorValue(rb.currentTail(1)));
+
+  auto midvalue = cursorValue(rb.currentTail(0.5));
+  // both rounding behaviours are acceptable
+  EXPECT_TRUE(midvalue == 1 || midvalue == 2);
+}
+
+} // namespace folly