2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/MPMCQueue.h>
23 template <class T, class... Stages> class MPMCPipeline;
25 template <class T, size_t Amp> class MPMCPipelineStage {
28 static constexpr size_t kAmplification = Amp;
34 * Helper template to determine value type and amplification whether or not
35 * we use MPMCPipelineStage<>
37 template <class T> struct PipelineStageInfo {
38 static constexpr size_t kAmplification = 1;
42 template <class T, size_t Amp>
43 struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
44 static constexpr size_t kAmplification = Amp;
49 * Wrapper around MPMCQueue (friend) that keeps track of tickets.
52 class MPMCPipelineStageImpl {
55 template <class U, class... Stages> friend class MPMCPipeline;
57 // Implicit so that MPMCPipeline construction works
58 /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) { }
59 MPMCPipelineStageImpl() { }
61 // only use on first stage, uses queue_.pushTicket_ instead of existing
63 template <class... Args>
64 void blockingWrite(Args&&... args) noexcept {
65 queue_.blockingWrite(std::forward<Args>(args)...);
68 template <class... Args>
69 bool write(Args&&... args) noexcept {
70 return queue_.write(std::forward<Args>(args)...);
73 template <class... Args>
74 void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
75 queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
78 uint64_t blockingRead(T& elem) noexcept {
80 queue_.blockingReadWithTicket(ticket, elem);
84 bool read(T& elem) noexcept { // only use on last stage, won't track ticket
85 return queue_.read(elem);
88 template <class... Args>
89 bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
90 return queue_.readAndGetTicket(ticket, elem);
93 // See MPMCQueue<T>::writeCount; only works for the first stage
94 uint64_t writeCount() const noexcept {
95 return queue_.writeCount();
98 uint64_t readCount() const noexcept {
99 return queue_.readCount();
106 // Product of amplifications of a tuple of PipelineStageInfo<X>
107 template <class Tuple> struct AmplificationProduct;
109 template <> struct AmplificationProduct<std::tuple<>> {
110 static constexpr size_t value = 1;
113 template <class T, class... Ts>
114 struct AmplificationProduct<std::tuple<T, Ts...>> {
115 static constexpr size_t value =
117 AmplificationProduct<std::tuple<Ts...>>::value;