FormatTraits.h \
Format.h \
Format-inl.h \
+ futures/Barrier.h \
futures/Deprecated.h \
futures/ThreadedExecutor.h \
futures/DrivableExecutor.h \
FileUtil.cpp \
FingerprintTables.cpp \
futures/detail/ThreadWheelTimekeeper.cpp \
+ futures/Barrier.cpp \
futures/ThreadedExecutor.cpp \
futures/Future.cpp \
futures/InlineExecutor.cpp \
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/futures/Barrier.h>
+
+namespace folly { namespace futures {
+
+Barrier::Barrier(uint32_t n)
+ : size_(n),
+ controlBlock_(allocateControlBlock()) { }
+
+Barrier::~Barrier() {
+ auto block = controlBlock_.load(std::memory_order_relaxed);
+ auto prev = block->valueAndReaderCount.load(std::memory_order_relaxed);
+ DCHECK_EQ(prev >> kReaderShift, 0);
+ auto val = prev & kValueMask;
+ auto p = promises(block);
+
+ for (uint32_t i = 0; i < val; ++i) {
+ p[i].setException(
+ folly::make_exception_wrapper<std::runtime_error>("Barrier destroyed"));
+ }
+
+ freeControlBlock(controlBlock_);
+}
+
+auto Barrier::allocateControlBlock() -> ControlBlock* {
+ auto block = static_cast<ControlBlock*>(malloc(controlBlockSize(size_)));
+ if (!block) {
+ throw std::bad_alloc();
+ }
+ block->valueAndReaderCount = 0;
+
+ auto p = promises(block);
+ uint32_t i = 0;
+ try {
+ for (i = 0; i < size_; ++i) {
+ new (p + i) BoolPromise();
+ }
+ } catch (...) {
+ for (; i != 0; --i) {
+ p[i - 1].~BoolPromise();
+ }
+ throw;
+ }
+
+ return block;
+}
+
+void Barrier::freeControlBlock(ControlBlock* block) {
+ auto p = promises(block);
+ for (uint32_t i = size_; i != 0; --i) {
+ p[i - 1].~BoolPromise();
+ }
+ free(block);
+}
+
+folly::Future<bool> Barrier::wait() {
+ // Load the current control block first. As we know there is at least
+ // one thread in the current epoch (us), this means that the value is
+ // < size_, so controlBlock_ can't change until we bump the value below.
+ auto block = controlBlock_.load(std::memory_order_acquire);
+ auto p = promises(block);
+
+ // Bump the value and record ourselves as reader.
+ // This ensures that block stays allocated, as the reader count is > 0.
+ auto prev = block->valueAndReaderCount.fetch_add(kReader + 1,
+ std::memory_order_acquire);
+
+ auto prevValue = static_cast<uint32_t>(prev & kValueMask);
+ DCHECK_LT(prevValue, size_);
+ auto future = p[prevValue].getFuture();
+
+ if (prevValue + 1 == size_) {
+ // Need to reset the barrier before fulfilling any futures. This is
+ // when the epoch is flipped to the next.
+ controlBlock_.store(allocateControlBlock(), std::memory_order_release);
+
+ p[0].setValue(true);
+ for (uint32_t i = 1; i < size_; ++i) {
+ p[i].setValue(false);
+ }
+ }
+
+ // Free the control block if we're the last reader at max value.
+ prev = block->valueAndReaderCount.fetch_sub(kReader,
+ std::memory_order_acq_rel);
+ if (prev == (kReader | uint64_t(size_))) {
+ freeControlBlock(block);
+ }
+
+ return future;
+}
+
+}} // namespaces
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+
+namespace folly { namespace futures {
+
+// A folly::Future-istic Barrier synchronization primitive
+//
+// The barrier is initialized with a count N.
+//
+// The first N-1 calls to wait() return uncompleted futures.
+//
+// The Nth call to wait() completes the previous N-1 futures successfully,
+// returns a future that is already completed successfully, and resets the
+// barrier; the barrier may be reused immediately, as soon as at least one
+// of the future completions has been observed.
+//
+// Of these N futures, exactly one is completed with true, while the others are
+// completed with false; it is unspecified which future completes with true.
+// (This may be used to elect a "leader" among a group of threads.)
+//
+// If the barrier is destroyed, any futures already returned by wait() will
+// complete with an error.
+class Barrier {
+ public:
+ explicit Barrier(uint32_t n);
+ ~Barrier();
+
+ folly::Future<bool> wait();
+
+ private:
+ typedef folly::Promise<bool> BoolPromise;
+
+ static constexpr uint64_t kReaderShift = 32;
+ static constexpr uint64_t kReader = uint64_t(1) << kReaderShift;
+ static constexpr uint64_t kValueMask = kReader - 1;
+
+ // For each "epoch" that the barrier is active, we have a different
+ // ControlBlock. The ControlBlock contains the current barrier value
+ // and the number of readers (currently inside wait()) packed into a
+ // 64-bit value.
+ //
+ // The ControlBlock is allocated as long as either:
+ // - there are threads currently inside wait() (reader count > 0), or
+ // - the value has not yet reached size_ (value < size_)
+ //
+ // The array of size_ Promise objects is allocated immediately following
+ // valueAndReaderCount.
+
+ struct ControlBlock {
+ // Reader count in most significant 32 bits
+ // Value in least significant 32 bits
+ std::atomic<uint64_t> valueAndReaderCount;
+ };
+
+ struct ControlBlockAndPromise {
+ ControlBlock cb;
+ BoolPromise promises[1];
+ };
+
+ static BoolPromise* promises(ControlBlock* cb) {
+ return reinterpret_cast<ControlBlockAndPromise*>(cb)->promises;
+ }
+
+ static size_t controlBlockSize(size_t n) {
+ return offsetof(ControlBlockAndPromise, promises) + n * sizeof(BoolPromise);
+ }
+
+ ControlBlock* allocateControlBlock();
+ void freeControlBlock(ControlBlock* b);
+
+ uint32_t size_;
+ std::atomic<ControlBlock*> controlBlock_;
+};
+
+}} // namespaces
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/futures/Barrier.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
+#include <folly/Random.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+DEFINE_int32(seed, 0, "Random seed");
+
+namespace folly { namespace futures { namespace test {
+
+TEST(BarrierTest, Simple) {
+ constexpr uint32_t numThreads = 10;
+
+ std::mutex mutex;
+ std::condition_variable b1DoneCond;
+ std::condition_variable b2DoneCond;
+ std::atomic<uint32_t> b1TrueSeen(0);
+ std::atomic<uint32_t> b1Passed(0);
+ std::atomic<uint32_t> b2TrueSeen(0);
+ std::atomic<uint32_t> b2Passed(0);
+
+ Barrier barrier(numThreads + 1);
+
+ std::vector<std::thread> threads;
+ threads.reserve(numThreads);
+ for (uint32_t i = 0; i < numThreads; ++i) {
+ threads.emplace_back([&] () {
+ barrier.wait()
+ .then(
+ [&] (bool v) {
+ std::unique_lock<std::mutex> lock(mutex);
+ b1TrueSeen += uint32_t(v);
+ if (++b1Passed == numThreads) {
+ b1DoneCond.notify_one();
+ }
+ return barrier.wait();
+ })
+ .then(
+ [&] (bool v) {
+ std::unique_lock<std::mutex> lock(mutex);
+ b2TrueSeen += uint32_t(v);
+ if (++b2Passed == numThreads) {
+ b2DoneCond.notify_one();
+ }
+ })
+ .get();
+ });
+ }
+
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ EXPECT_EQ(0, b1Passed);
+ EXPECT_EQ(0, b1TrueSeen);
+
+ b1TrueSeen += barrier.wait().get();
+
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ while (b1Passed != numThreads) {
+ b1DoneCond.wait(lock);
+ }
+ EXPECT_EQ(1, b1TrueSeen);
+ }
+
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ EXPECT_EQ(0, b2Passed);
+ EXPECT_EQ(0, b2TrueSeen);
+
+ b2TrueSeen += barrier.wait().get();
+
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ while (b2Passed != numThreads) {
+ b2DoneCond.wait(lock);
+ }
+ EXPECT_EQ(1, b2TrueSeen);
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+TEST(BarrierTest, Random) {
+ // Create numThreads threads.
+ //
+ // Each thread repeats the following numIterations times:
+ // - grab a randomly chosen number of futures from the barrier, waiting
+ // for a short random time between each
+ // - wait for all futures to complete
+ // - record whether the one future returning true was seen among them
+ //
+ // At the end, we verify that exactly one future returning true was seen
+ // for each iteration.
+ constexpr uint32_t numIterations = 1;
+ auto numThreads = folly::Random::rand32(30, 91);
+
+ struct ThreadInfo {
+ ThreadInfo() { }
+ std::thread thread;
+ uint32_t iteration = 0;
+ uint32_t numFutures;
+ std::vector<uint32_t> trueSeen;
+ };
+
+ std::vector<ThreadInfo> threads;
+ threads.resize(numThreads);
+
+ uint32_t totalFutures = 0;
+ for (auto& tinfo : threads) {
+ tinfo.numFutures = folly::Random::rand32(100);
+ tinfo.trueSeen.resize(numIterations);
+ totalFutures += tinfo.numFutures;
+ }
+
+ Barrier barrier(totalFutures);
+
+ for (auto& tinfo : threads) {
+ auto pinfo = &tinfo;
+ tinfo.thread = std::thread(
+ [numIterations, pinfo, &barrier] () {
+ std::vector<folly::Future<bool>> futures;
+ futures.reserve(pinfo->numFutures);
+ for (uint32_t i = 0; i < numIterations; ++i, ++pinfo->iteration) {
+ futures.clear();
+ for (uint32_t j = 0; j < pinfo->numFutures; ++j) {
+ futures.push_back(barrier.wait());
+ auto nanos = folly::Random::rand32(10 * 1000 * 1000);
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
+ }
+ auto results = folly::collect(futures).get();
+ pinfo->trueSeen[i] =
+ std::count(results.begin(), results.end(), true);
+ }
+ });
+ }
+
+ for (auto& tinfo : threads) {
+ tinfo.thread.join();
+ EXPECT_EQ(numIterations, tinfo.iteration);
+ }
+
+ for (uint32_t i = 0; i < numIterations; ++i) {
+ uint32_t trueCount = 0;
+ for (auto& tinfo : threads) {
+ trueCount += tinfo.trueSeen[i];
+ }
+ EXPECT_EQ(1, trueCount);
+ }
+}
+
+}}} // namespaces