--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "folly/detail/MPMCPipelineDetail.h"
+
+namespace folly {
+
+/**
+ * Helper tag template to use amplification > 1
+ */
+template <class T, size_t Amp> class MPMCPipelineStage;
+
+/**
+ * Multi-Producer, Multi-Consumer pipeline.
+ *
+ * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
+ *
+ * At each stage, you may dequeue the results from the previous stage (possibly
+ * from multiple threads) and enqueue results to the next stage. Regardless of
+ * the order of completion, data is delivered to the next stage in the original
+ * order. Each input is matched with a "ticket" which must be produced
+ * when enqueueing to the next stage.
+ *
+ * A given stage must produce exactly K ("amplification factor", default K=1)
+ * results for every input. This is enforced by requiring that each ticket
+ * is used exactly K times.
+ *
+ * Usage:
+ *
+ * // arguments are queue sizes
+ * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
+ *
+ * pipeline.blockingWrite(42);
+ *
+ * {
+ * int val;
+ * auto ticket = pipeline.blockingReadStage<0>(val);
+ * pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
+ * }
+ *
+ * {
+ * std::string val;
+ * auto ticket = pipeline.blockingReadStage<1>(val);
+ * int ival = 0;
+ * try {
+ * ival = folly::to<int>(val);
+ * } catch (...) {
+ * // We must produce exactly 1 output even on exception!
+ * }
+ * pipeline.blockingWriteStage<1>(ticket, ival);
+ * }
+ *
+ * int result;
+ * pipeline.blockingRead(result);
+ * // result == 42
+ *
+ * To specify amplification factors greater than 1, use
+ * MPMCPipelineStage<T, amplification> instead of T in the declaration:
+ *
+ * MPMCPipeline<int,
+ * MPMCPipelineStage<std::string, 2>,
+ * MPMCPipelineStage<int, 4>>
+ *
+ * declares a two-stage pipeline: the first stage produces 2 strings
+ * for each input int, the second stage produces 4 ints for each input string,
+ * so, overall, the pipeline produces 2*4 = 8 ints for each input int.
+ *
+ * Implementation details: we use N+1 MPMCQueue objects; each intermediate
+ * queue connects two adjacent stages. The MPMCQueue implementation is abused;
+ * instead of using it as a queue, we insert in the output queue at the
+ * position determined by the input queue's popTicket_. We guarantee that
+ * all slots are filled (and therefore the queue doesn't freeze) because
+ * we require that each step produces exactly K outputs for every input.
+ */
+template <class In, class... Stages> class MPMCPipeline {
+ typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
+ typedef std::tuple<
+ detail::MPMCPipelineStageImpl<In>,
+ detail::MPMCPipelineStageImpl<
+ typename detail::PipelineStageInfo<Stages>::value_type>...>
+ StageTuple;
+ static constexpr size_t kAmplification =
+ detail::AmplificationProduct<StageInfos>::value;
+
+ public:
+ /**
+ * Ticket, returned by blockingReadStage, must be given back to
+ * blockingWriteStage. Tickets are not thread-safe.
+ */
+ template <size_t Stage>
+ class Ticket {
+ public:
+ ~Ticket() noexcept {
+ CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
+ }
+
+#ifndef NDEBUG
+ Ticket() noexcept
+ : owner_(nullptr),
+ remainingUses_(0),
+ value_(0xdeadbeeffaceb00c) {
+ }
+#else
+ Ticket() noexcept : remainingUses_(0) { }
+#endif
+
+ Ticket(Ticket&& other) noexcept
+ :
+#ifndef NDEBUG
+ owner_(other.owner_),
+#endif
+ remainingUses_(other.remainingUses_),
+ value_(other.value_) {
+ other.remainingUses_ = 0;
+#ifndef NDEBUG
+ other.owner_ = nullptr;
+ other.value_ = 0xdeadbeeffaceb00c;
+#endif
+ }
+
+ Ticket& operator=(Ticket&& other) noexcept {
+ if (this != &other) {
+ this->~Ticket();
+ new (this) Ticket(std::move(other));
+ }
+ return *this;
+ }
+
+ private:
+ friend class MPMCPipeline;
+#ifndef NDEBUG
+ MPMCPipeline* owner_;
+#endif
+ size_t remainingUses_;
+ uint64_t value_;
+
+
+ Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
+ :
+#ifndef NDEBUG
+ owner_(owner),
+#endif
+ remainingUses_(amplification),
+ value_(value * amplification) {
+ }
+
+ uint64_t use(MPMCPipeline* owner) {
+ CHECK_GT(remainingUses_--, 0);
+#ifndef NDEBUG
+ CHECK(owner == owner_);
+#endif
+ return value_++;
+ }
+ };
+
+ /**
+ * Default-construct pipeline. Useful to move-assign later,
+ * just like MPMCQueue, see MPMCQueue.h for more details.
+ */
+ MPMCPipeline() { }
+
+ /**
+ * Construct a pipeline with N+1 queue sizes.
+ */
+ template <class... Sizes>
+ explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) { }
+
+ /**
+ * Push an element into (the first stage of) the pipeline. Blocking.
+ */
+ template <class... Args>
+ void blockingWrite(Args&&... args) {
+ std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
+ }
+
+ /**
+ * Try to push an element into (the first stage of) the pipeline.
+ * Non-blocking.
+ */
+ template <class... Args>
+ bool write(Args&&... args) {
+ return std::get<0>(stages_).write(std::forward<Args>(args)...);
+ }
+
+ /**
+ * Read an element for stage Stage and obtain a ticket. Blocking.
+ */
+ template <size_t Stage>
+ Ticket<Stage> blockingReadStage(
+ typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
+ return Ticket<Stage>(
+ this,
+ std::tuple_element<Stage, StageInfos>::type::kAmplification,
+ std::get<Stage>(stages_).blockingRead(elem));
+ }
+
+ /**
+ * Try to read an element for stage Stage and obtain a ticket.
+ * Non-blocking.
+ */
+ template <size_t Stage>
+ bool readStage(
+ Ticket<Stage>& ticket,
+ typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
+ uint64_t tval;
+ if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
+ return false;
+ }
+ ticket = Ticket<Stage>(
+ this,
+ std::tuple_element<Stage, StageInfos>::type::kAmplification,
+ tval);
+ return true;
+ }
+
+ /**
+ * Complete an element in stage Stage (pushing it for stage Stage+1).
+ * Blocking.
+ */
+ template <size_t Stage, class... Args>
+ void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
+ std::get<Stage+1>(stages_).blockingWriteWithTicket(
+ ticket.use(this),
+ std::forward<Args>(args)...);
+ }
+
+ /**
+ * Pop an element from (the final stage of) the pipeline. Blocking.
+ */
+ void blockingRead(
+ typename std::tuple_element<
+ sizeof...(Stages),
+ StageTuple>::type::value_type& elem) {
+ std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
+ }
+
+ /**
+ * Try to pop an element from (the final stage of) the pipeline.
+ * Non-blocking.
+ */
+ bool read(
+ typename std::tuple_element<
+ sizeof...(Stages),
+ StageTuple>::type::value_type& elem) {
+ return std::get<sizeof...(Stages)>(stages_).read(elem);
+ }
+
+ /**
+ * Estimate queue size, measured as values from the last stage.
+ * (so if the pipeline has an amplification factor > 1, pushing an element
+ * into the first stage will cause sizeGuess() to be == amplification factor)
+ * Elements "in flight" (currently processed as part of a stage, so not
+ * in any queue) are also counted.
+ */
+ ssize_t sizeGuess() const noexcept {
+ return (std::get<0>(stages_).writeCount() * kAmplification -
+ std::get<sizeof...(Stages)>(stages_).readCount());
+ }
+
+ private:
+ StageTuple stages_;
+};
+
+
+} // namespaces
+
template<typename T, template<typename> class Atom>
class SingleElementQueue;
+template <typename T> class MPMCPipelineStageImpl;
+
} // namespace detail
/// MPMCQueue<T> is a high-performance bounded concurrent queue that
std::is_nothrow_constructible<T,T&&>::value ||
folly::IsRelocatable<T>::value>::type>
class MPMCQueue : boost::noncopyable {
+ friend class detail::MPMCPipelineStageImpl<T>;
public:
typedef T value_type;
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "folly/MPMCQueue.h"
+
+namespace folly {
+
+template <class T, class... Stages> class MPMCPipeline;
+
+template <class T, size_t Amp> class MPMCPipelineStage {
+ public:
+ typedef T value_type;
+ static constexpr size_t kAmplification = Amp;
+};
+
+namespace detail {
+
+/**
+ * Helper template to determine value type and amplification whether or not
+ * we use MPMCPipelineStage<>
+ */
+template <class T> struct PipelineStageInfo {
+ static constexpr size_t kAmplification = 1;
+ typedef T value_type;
+};
+
+template <class T, size_t Amp>
+struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
+ static constexpr size_t kAmplification = Amp;
+ typedef T value_type;
+};
+
+/**
+ * Wrapper around MPMCQueue (friend) that keeps track of tickets.
+ */
+template <class T>
+class MPMCPipelineStageImpl {
+ public:
+ typedef T value_type;
+ template <class U, class... Stages> friend class MPMCPipeline;
+
+ // Implicit so that MPMCPipeline construction works
+ /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) { }
+ MPMCPipelineStageImpl() { }
+
+ // only use on first stage, uses queue_.pushTicket_ instead of existing
+ // ticket
+ template <class... Args>
+ void blockingWrite(Args&&... args) noexcept {
+ queue_.blockingWrite(std::forward<Args>(args)...);
+ }
+
+ template <class... Args>
+ bool write(Args&&... args) noexcept {
+ return queue_.write(std::forward<Args>(args)...);
+ }
+
+ template <class... Args>
+ void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
+ queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
+ }
+
+ uint64_t blockingRead(T& elem) noexcept {
+ uint64_t ticket = queue_.popTicket_++;
+ queue_.dequeueWithTicket(ticket, elem);
+ return ticket;
+ }
+
+ bool read(T& elem) noexcept { // only use on last stage, won't track ticket
+ return queue_.read(elem);
+ }
+
+ template <class... Args>
+ bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
+ if (queue_.tryObtainReadyPopTicket(ticket)) {
+ queue_.dequeueWithTicket(ticket, elem);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // See MPMCQueue<T>::writeCount; only works for the first stage
+ uint64_t writeCount() const noexcept {
+ return queue_.writeCount();
+ }
+
+ uint64_t readCount() const noexcept {
+ return queue_.readCount();
+ }
+
+ private:
+ MPMCQueue<T> queue_;
+};
+
+// Product of amplifications of a tuple of PipelineStageInfo<X>
+template <class Tuple> struct AmplificationProduct;
+
+template <> struct AmplificationProduct<std::tuple<>> {
+ static constexpr size_t value = 1;
+};
+
+template <class T, class... Ts>
+struct AmplificationProduct<std::tuple<T, Ts...>> {
+ static constexpr size_t value =
+ T::kAmplification *
+ AmplificationProduct<std::tuple<Ts...>>::value;
+};
+
+}} // namespaces
+
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/MPMCPipeline.h"
+
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Conv.h"
+
+namespace folly { namespace test {
+
+TEST(MPMCPipeline, Trivial) {
+ MPMCPipeline<int, std::string> a(2, 2);
+ EXPECT_EQ(0, a.sizeGuess());
+ a.blockingWrite(42);
+ EXPECT_EQ(1, a.sizeGuess());
+
+ int val;
+ auto ticket = a.blockingReadStage<0>(val);
+ EXPECT_EQ(42, val);
+ EXPECT_EQ(1, a.sizeGuess());
+
+ a.blockingWriteStage<0>(ticket, "hello world");
+ EXPECT_EQ(1, a.sizeGuess());
+
+ std::string s;
+
+ a.blockingRead(s);
+ EXPECT_EQ("hello world", s);
+ EXPECT_EQ(0, a.sizeGuess());
+}
+
+TEST(MPMCPipeline, TrivialAmplification) {
+ MPMCPipeline<int, MPMCPipelineStage<std::string, 2>> a(2, 2);
+ EXPECT_EQ(0, a.sizeGuess());
+ a.blockingWrite(42);
+ EXPECT_EQ(2, a.sizeGuess());
+
+ int val;
+ auto ticket = a.blockingReadStage<0>(val);
+ EXPECT_EQ(42, val);
+ EXPECT_EQ(2, a.sizeGuess());
+
+ a.blockingWriteStage<0>(ticket, "hello world");
+ EXPECT_EQ(2, a.sizeGuess());
+ a.blockingWriteStage<0>(ticket, "goodbye");
+ EXPECT_EQ(2, a.sizeGuess());
+
+ std::string s;
+
+ a.blockingRead(s);
+ EXPECT_EQ("hello world", s);
+ EXPECT_EQ(1, a.sizeGuess());
+
+ a.blockingRead(s);
+ EXPECT_EQ("goodbye", s);
+ EXPECT_EQ(0, a.sizeGuess());
+}
+
+TEST(MPMCPipeline, MultiThreaded) {
+ constexpr size_t numThreadsPerStage = 6;
+ MPMCPipeline<int, std::string, std::string> a(5, 5, 5);
+
+ std::vector<std::thread> threads;
+ threads.reserve(numThreadsPerStage * 2 + 1);
+ for (size_t i = 0; i < numThreadsPerStage; ++i) {
+ threads.emplace_back([&a, i] () {
+ for (;;) {
+ int val;
+ auto ticket = a.blockingReadStage<0>(val);
+ if (val == -1) { // stop
+ // We still need to propagate
+ a.blockingWriteStage<0>(ticket, "");
+ break;
+ }
+ a.blockingWriteStage<0>(
+ ticket, folly::to<std::string>(val, " hello"));
+ }
+ });
+ }
+
+ for (size_t i = 0; i < numThreadsPerStage; ++i) {
+ threads.emplace_back([&a, i] () {
+ for (;;) {
+ std::string val;
+ auto ticket = a.blockingReadStage<1>(val);
+ if (val.empty()) { // stop
+ // We still need to propagate
+ a.blockingWriteStage<1>(ticket, "");
+ break;
+ }
+ a.blockingWriteStage<1>(
+ ticket, folly::to<std::string>(val, " world"));
+ }
+ });
+ }
+
+ std::vector<std::string> results;
+ threads.emplace_back([&a, &results] () {
+ for (;;) {
+ std::string val;
+ a.blockingRead(val);
+ if (val.empty()) {
+ break;
+ }
+ results.push_back(val);
+ }
+ });
+
+ constexpr size_t numValues = 1000;
+ for (size_t i = 0; i < numValues; ++i) {
+ a.blockingWrite(i);
+ }
+ for (size_t i = 0; i < numThreadsPerStage; ++i) {
+ a.blockingWrite(-1);
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ // The consumer thread dequeued the first empty string, there should be
+ // numThreadsPerStage - 1 left.
+ EXPECT_EQ(numThreadsPerStage - 1, a.sizeGuess());
+ for (size_t i = 0; i < numThreadsPerStage - 1; ++i) {
+ std::string val;
+ a.blockingRead(val);
+ EXPECT_TRUE(val.empty());
+ }
+ {
+ std::string tmp;
+ EXPECT_FALSE(a.read(tmp));
+ }
+ EXPECT_EQ(0, a.sizeGuess());
+
+ EXPECT_EQ(numValues, results.size());
+ for (size_t i = 0; i < results.size(); ++i) {
+ EXPECT_EQ(folly::to<std::string>(i, " hello world"), results[i]);
+ }
+}
+
+}} // namespaces
+
+int main(int argc, char *argv[]) {
+ testing::InitGoogleTest(&argc, argv);
+ google::ParseCommandLineFlags(&argc, &argv, true);
+ return RUN_ALL_TESTS();
+}
+