From: Tudor Bosman Date: Fri, 19 Jul 2013 01:05:51 +0000 (-0700) Subject: Multi-Producer, Multi-Consumer pipeline X-Git-Tag: v0.22.0~921 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=5f4c667266b71278ced3af90b86859f94b3ea9a8;p=folly.git Multi-Producer, Multi-Consumer pipeline Summary: A bunch of MPMCQueues linked together. Stage i produces exactly Ki (default 1) outputs for each input. Ordering is preserved, even though stages might produce (intermediate or final) results in parallel and in any order; we do this by abusing the enqueueing mechanism in MPMCQueue. (Read the code for details) Test Plan: test added, more tests to be written before commit Reviewed By: ngbronson@fb.com FB internal diff: D892388 --- diff --git a/folly/MPMCPipeline.h b/folly/MPMCPipeline.h new file mode 100644 index 00000000..3981e243 --- /dev/null +++ b/folly/MPMCPipeline.h @@ -0,0 +1,285 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +#include "folly/detail/MPMCPipelineDetail.h" + +namespace folly { + +/** + * Helper tag template to use amplification > 1 + */ +template class MPMCPipelineStage; + +/** + * Multi-Producer, Multi-Consumer pipeline. + * + * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h). + * + * At each stage, you may dequeue the results from the previous stage (possibly + * from multiple threads) and enqueue results to the next stage. Regardless of + * the order of completion, data is delivered to the next stage in the original + * order. Each input is matched with a "ticket" which must be produced + * when enqueueing to the next stage. + * + * A given stage must produce exactly K ("amplification factor", default K=1) + * results for every input. This is enforced by requiring that each ticket + * is used exactly K times. + * + * Usage: + * + * // arguments are queue sizes + * MPMCPipeline pipeline(10, 10, 10); + * + * pipeline.blockingWrite(42); + * + * { + * int val; + * auto ticket = pipeline.blockingReadStage<0>(val); + * pipeline.blockingWriteStage<0>(ticket, folly::to(val)); + * } + * + * { + * std::string val; + * auto ticket = pipeline.blockingReadStage<1>(val); + * int ival = 0; + * try { + * ival = folly::to(val); + * } catch (...) { + * // We must produce exactly 1 output even on exception! + * } + * pipeline.blockingWriteStage<1>(ticket, ival); + * } + * + * int result; + * pipeline.blockingRead(result); + * // result == 42 + * + * To specify amplification factors greater than 1, use + * MPMCPipelineStage instead of T in the declaration: + * + * MPMCPipeline, + * MPMCPipelineStage> + * + * declares a two-stage pipeline: the first stage produces 2 strings + * for each input int, the second stage produces 4 ints for each input string, + * so, overall, the pipeline produces 2*4 = 8 ints for each input int. + * + * Implementation details: we use N+1 MPMCQueue objects; each intermediate + * queue connects two adjacent stages. The MPMCQueue implementation is abused; + * instead of using it as a queue, we insert in the output queue at the + * position determined by the input queue's popTicket_. We guarantee that + * all slots are filled (and therefore the queue doesn't freeze) because + * we require that each step produces exactly K outputs for every input. + */ +template class MPMCPipeline { + typedef std::tuple...> StageInfos; + typedef std::tuple< + detail::MPMCPipelineStageImpl, + detail::MPMCPipelineStageImpl< + typename detail::PipelineStageInfo::value_type>...> + StageTuple; + static constexpr size_t kAmplification = + detail::AmplificationProduct::value; + + public: + /** + * Ticket, returned by blockingReadStage, must be given back to + * blockingWriteStage. Tickets are not thread-safe. + */ + template + class Ticket { + public: + ~Ticket() noexcept { + CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!"; + } + +#ifndef NDEBUG + Ticket() noexcept + : owner_(nullptr), + remainingUses_(0), + value_(0xdeadbeeffaceb00c) { + } +#else + Ticket() noexcept : remainingUses_(0) { } +#endif + + Ticket(Ticket&& other) noexcept + : +#ifndef NDEBUG + owner_(other.owner_), +#endif + remainingUses_(other.remainingUses_), + value_(other.value_) { + other.remainingUses_ = 0; +#ifndef NDEBUG + other.owner_ = nullptr; + other.value_ = 0xdeadbeeffaceb00c; +#endif + } + + Ticket& operator=(Ticket&& other) noexcept { + if (this != &other) { + this->~Ticket(); + new (this) Ticket(std::move(other)); + } + return *this; + } + + private: + friend class MPMCPipeline; +#ifndef NDEBUG + MPMCPipeline* owner_; +#endif + size_t remainingUses_; + uint64_t value_; + + + Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept + : +#ifndef NDEBUG + owner_(owner), +#endif + remainingUses_(amplification), + value_(value * amplification) { + } + + uint64_t use(MPMCPipeline* owner) { + CHECK_GT(remainingUses_--, 0); +#ifndef NDEBUG + CHECK(owner == owner_); +#endif + return value_++; + } + }; + + /** + * Default-construct pipeline. Useful to move-assign later, + * just like MPMCQueue, see MPMCQueue.h for more details. + */ + MPMCPipeline() { } + + /** + * Construct a pipeline with N+1 queue sizes. + */ + template + explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) { } + + /** + * Push an element into (the first stage of) the pipeline. Blocking. + */ + template + void blockingWrite(Args&&... args) { + std::get<0>(stages_).blockingWrite(std::forward(args)...); + } + + /** + * Try to push an element into (the first stage of) the pipeline. + * Non-blocking. + */ + template + bool write(Args&&... args) { + return std::get<0>(stages_).write(std::forward(args)...); + } + + /** + * Read an element for stage Stage and obtain a ticket. Blocking. + */ + template + Ticket blockingReadStage( + typename std::tuple_element::type::value_type& elem) { + return Ticket( + this, + std::tuple_element::type::kAmplification, + std::get(stages_).blockingRead(elem)); + } + + /** + * Try to read an element for stage Stage and obtain a ticket. + * Non-blocking. + */ + template + bool readStage( + Ticket& ticket, + typename std::tuple_element::type::value_type& elem) { + uint64_t tval; + if (!std::get(stages_).readAndGetTicket(tval, elem)) { + return false; + } + ticket = Ticket( + this, + std::tuple_element::type::kAmplification, + tval); + return true; + } + + /** + * Complete an element in stage Stage (pushing it for stage Stage+1). + * Blocking. + */ + template + void blockingWriteStage(Ticket& ticket, Args&&... args) { + std::get(stages_).blockingWriteWithTicket( + ticket.use(this), + std::forward(args)...); + } + + /** + * Pop an element from (the final stage of) the pipeline. Blocking. + */ + void blockingRead( + typename std::tuple_element< + sizeof...(Stages), + StageTuple>::type::value_type& elem) { + std::get(stages_).blockingRead(elem); + } + + /** + * Try to pop an element from (the final stage of) the pipeline. + * Non-blocking. + */ + bool read( + typename std::tuple_element< + sizeof...(Stages), + StageTuple>::type::value_type& elem) { + return std::get(stages_).read(elem); + } + + /** + * Estimate queue size, measured as values from the last stage. + * (so if the pipeline has an amplification factor > 1, pushing an element + * into the first stage will cause sizeGuess() to be == amplification factor) + * Elements "in flight" (currently processed as part of a stage, so not + * in any queue) are also counted. + */ + ssize_t sizeGuess() const noexcept { + return (std::get<0>(stages_).writeCount() * kAmplification - + std::get(stages_).readCount()); + } + + private: + StageTuple stages_; +}; + + +} // namespaces + diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index 58c1d3ce..b4cdee18 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -37,6 +37,8 @@ namespace detail { template class Atom> class SingleElementQueue; +template class MPMCPipelineStageImpl; + } // namespace detail /// MPMCQueue is a high-performance bounded concurrent queue that @@ -83,6 +85,7 @@ template::value || folly::IsRelocatable::value>::type> class MPMCQueue : boost::noncopyable { + friend class detail::MPMCPipelineStageImpl; public: typedef T value_type; diff --git a/folly/detail/MPMCPipelineDetail.h b/folly/detail/MPMCPipelineDetail.h new file mode 100644 index 00000000..20f725a4 --- /dev/null +++ b/folly/detail/MPMCPipelineDetail.h @@ -0,0 +1,126 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "folly/MPMCQueue.h" + +namespace folly { + +template class MPMCPipeline; + +template class MPMCPipelineStage { + public: + typedef T value_type; + static constexpr size_t kAmplification = Amp; +}; + +namespace detail { + +/** + * Helper template to determine value type and amplification whether or not + * we use MPMCPipelineStage<> + */ +template struct PipelineStageInfo { + static constexpr size_t kAmplification = 1; + typedef T value_type; +}; + +template +struct PipelineStageInfo> { + static constexpr size_t kAmplification = Amp; + typedef T value_type; +}; + +/** + * Wrapper around MPMCQueue (friend) that keeps track of tickets. + */ +template +class MPMCPipelineStageImpl { + public: + typedef T value_type; + template friend class MPMCPipeline; + + // Implicit so that MPMCPipeline construction works + /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) { } + MPMCPipelineStageImpl() { } + + // only use on first stage, uses queue_.pushTicket_ instead of existing + // ticket + template + void blockingWrite(Args&&... args) noexcept { + queue_.blockingWrite(std::forward(args)...); + } + + template + bool write(Args&&... args) noexcept { + return queue_.write(std::forward(args)...); + } + + template + void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept { + queue_.enqueueWithTicket(ticket, std::forward(args)...); + } + + uint64_t blockingRead(T& elem) noexcept { + uint64_t ticket = queue_.popTicket_++; + queue_.dequeueWithTicket(ticket, elem); + return ticket; + } + + bool read(T& elem) noexcept { // only use on last stage, won't track ticket + return queue_.read(elem); + } + + template + bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept { + if (queue_.tryObtainReadyPopTicket(ticket)) { + queue_.dequeueWithTicket(ticket, elem); + return true; + } else { + return false; + } + } + + // See MPMCQueue::writeCount; only works for the first stage + uint64_t writeCount() const noexcept { + return queue_.writeCount(); + } + + uint64_t readCount() const noexcept { + return queue_.readCount(); + } + + private: + MPMCQueue queue_; +}; + +// Product of amplifications of a tuple of PipelineStageInfo +template struct AmplificationProduct; + +template <> struct AmplificationProduct> { + static constexpr size_t value = 1; +}; + +template +struct AmplificationProduct> { + static constexpr size_t value = + T::kAmplification * + AmplificationProduct>::value; +}; + +}} // namespaces + diff --git a/folly/test/MPMCPipelineTest.cpp b/folly/test/MPMCPipelineTest.cpp new file mode 100644 index 00000000..eed99054 --- /dev/null +++ b/folly/test/MPMCPipelineTest.cpp @@ -0,0 +1,166 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/MPMCPipeline.h" + +#include +#include + +#include +#include + +#include "folly/Conv.h" + +namespace folly { namespace test { + +TEST(MPMCPipeline, Trivial) { + MPMCPipeline a(2, 2); + EXPECT_EQ(0, a.sizeGuess()); + a.blockingWrite(42); + EXPECT_EQ(1, a.sizeGuess()); + + int val; + auto ticket = a.blockingReadStage<0>(val); + EXPECT_EQ(42, val); + EXPECT_EQ(1, a.sizeGuess()); + + a.blockingWriteStage<0>(ticket, "hello world"); + EXPECT_EQ(1, a.sizeGuess()); + + std::string s; + + a.blockingRead(s); + EXPECT_EQ("hello world", s); + EXPECT_EQ(0, a.sizeGuess()); +} + +TEST(MPMCPipeline, TrivialAmplification) { + MPMCPipeline> a(2, 2); + EXPECT_EQ(0, a.sizeGuess()); + a.blockingWrite(42); + EXPECT_EQ(2, a.sizeGuess()); + + int val; + auto ticket = a.blockingReadStage<0>(val); + EXPECT_EQ(42, val); + EXPECT_EQ(2, a.sizeGuess()); + + a.blockingWriteStage<0>(ticket, "hello world"); + EXPECT_EQ(2, a.sizeGuess()); + a.blockingWriteStage<0>(ticket, "goodbye"); + EXPECT_EQ(2, a.sizeGuess()); + + std::string s; + + a.blockingRead(s); + EXPECT_EQ("hello world", s); + EXPECT_EQ(1, a.sizeGuess()); + + a.blockingRead(s); + EXPECT_EQ("goodbye", s); + EXPECT_EQ(0, a.sizeGuess()); +} + +TEST(MPMCPipeline, MultiThreaded) { + constexpr size_t numThreadsPerStage = 6; + MPMCPipeline a(5, 5, 5); + + std::vector threads; + threads.reserve(numThreadsPerStage * 2 + 1); + for (size_t i = 0; i < numThreadsPerStage; ++i) { + threads.emplace_back([&a, i] () { + for (;;) { + int val; + auto ticket = a.blockingReadStage<0>(val); + if (val == -1) { // stop + // We still need to propagate + a.blockingWriteStage<0>(ticket, ""); + break; + } + a.blockingWriteStage<0>( + ticket, folly::to(val, " hello")); + } + }); + } + + for (size_t i = 0; i < numThreadsPerStage; ++i) { + threads.emplace_back([&a, i] () { + for (;;) { + std::string val; + auto ticket = a.blockingReadStage<1>(val); + if (val.empty()) { // stop + // We still need to propagate + a.blockingWriteStage<1>(ticket, ""); + break; + } + a.blockingWriteStage<1>( + ticket, folly::to(val, " world")); + } + }); + } + + std::vector results; + threads.emplace_back([&a, &results] () { + for (;;) { + std::string val; + a.blockingRead(val); + if (val.empty()) { + break; + } + results.push_back(val); + } + }); + + constexpr size_t numValues = 1000; + for (size_t i = 0; i < numValues; ++i) { + a.blockingWrite(i); + } + for (size_t i = 0; i < numThreadsPerStage; ++i) { + a.blockingWrite(-1); + } + + for (auto& t : threads) { + t.join(); + } + + // The consumer thread dequeued the first empty string, there should be + // numThreadsPerStage - 1 left. + EXPECT_EQ(numThreadsPerStage - 1, a.sizeGuess()); + for (size_t i = 0; i < numThreadsPerStage - 1; ++i) { + std::string val; + a.blockingRead(val); + EXPECT_TRUE(val.empty()); + } + { + std::string tmp; + EXPECT_FALSE(a.read(tmp)); + } + EXPECT_EQ(0, a.sizeGuess()); + + EXPECT_EQ(numValues, results.size()); + for (size_t i = 0; i < results.size(); ++i) { + EXPECT_EQ(folly::to(i, " hello world"), results[i]); + } +} + +}} // namespaces + +int main(int argc, char *argv[]) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +} +