2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include "folly/MPMCQueue.h"
// Forward declaration of the MPMCPipeline class template, parameterized on a
// type T plus a variadic pack of stage types. Its definition is not part of
// this excerpt; the name is needed here so it can be named as a friend of
// MPMCPipelineStageImpl.
23 template <class T, class... Stages> class MPMCPipeline;
// Compile-time description of a pipeline stage: a type T together with an
// amplification factor Amp.
// NOTE(review): the exact meaning of "amplification" is not defined in the
// visible code, and the remainder of this class body (including its closing
// brace) is not visible in this excerpt -- confirm against the full header.
25 template <class T, size_t Amp> class MPMCPipelineStage {
// Exposes the Amp template argument as a named compile-time constant.
28 static constexpr size_t kAmplification = Amp;
34 * Helper template to determine value type and amplification whether or not
35 * we use MPMCPipelineStage<>
// Primary template: used for any T that does not match the
// MPMCPipelineStage<T, Amp> partial specialization of PipelineStageInfo.
// NOTE(review): the rest of this struct body is not visible in this excerpt.
37 template <class T> struct PipelineStageInfo {
// A stage that is not wrapped in MPMCPipelineStage<> has amplification 1.
38 static constexpr size_t kAmplification = 1;
// Partial specialization selected when the stage type is spelled as
// MPMCPipelineStage<T, Amp>.
// NOTE(review): the rest of this struct body is not visible in this excerpt.
42 template <class T, size_t Amp>
43 struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
// Forwards the Amp argument of the wrapped MPMCPipelineStage<T, Amp>.
44 static constexpr size_t kAmplification = Amp;
49 * Wrapper around MPMCQueue (friend) that keeps track of tickets.
// One pipeline stage: every member function below delegates to the member
// queue_, either through its ordinary read/write calls or through its
// ticket-based enqueue/dequeue calls.
// NOTE(review): this excerpt is missing lines. The template header that
// introduces T, the declaration of queue_, the access specifiers, and the
// closing braces of the members and of the class itself are not visible here;
// confirm details against the full header.
52 class MPMCPipelineStageImpl {
// Every MPMCPipeline specialization is a friend of this class.
55 template <class U, class... Stages> friend class MPMCPipeline;
57 // Implicit so that MPMCPipeline construction works
// Constructs queue_ with the given capacity.
58 /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) { }
// Default constructor with an empty body; queue_ is default-initialized.
59 MPMCPipelineStageImpl() { }
61 // only use on first stage, uses queue_.pushTicket_ instead of existing
// (The original comment above is cut off in this excerpt.)
// Perfect-forwards all arguments to queue_.blockingWrite().
63 template <class... Args>
64 void blockingWrite(Args&&... args) noexcept {
65 queue_.blockingWrite(std::forward<Args>(args)...);
// Perfect-forwards all arguments to queue_.write() and returns its result.
68 template <class... Args>
69 bool write(Args&&... args) noexcept {
70 return queue_.write(std::forward<Args>(args)...);
// Enqueues using a caller-supplied ticket: passes `ticket` followed by the
// perfect-forwarded arguments to queue_.enqueueWithTicket().
73 template <class... Args>
74 void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
75 queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
// Takes the current value of queue_.popTicket_ (post-incrementing it) as the
// ticket, then dequeues into `elem` with that ticket.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the ticket is returned -- confirm.
78 uint64_t blockingRead(T& elem) noexcept {
79 uint64_t ticket = queue_.popTicket_++;
80 queue_.dequeueWithTicket(ticket, elem);
// Returns the result of queue_.read(elem); no ticket is reported to the caller.
84 bool read(T& elem) noexcept { // only use on last stage, won't track ticket
85 return queue_.read(elem);
// If queue_.tryObtainReadyPopTicket(ticket) succeeds, dequeues into `elem`
// using the ticket it stored in `ticket`.
// NOTE(review): the return statements and the failure branch are not visible
// in this excerpt. The Args parameter pack does not appear in the visible
// signature -- confirm whether it is needed.
88 template <class... Args>
89 bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
90 if (queue_.tryObtainReadyPopTicket(ticket)) {
91 queue_.dequeueWithTicket(ticket, elem);
98 // See MPMCQueue<T>::writeCount; only works for the first stage
99 uint64_t writeCount() const noexcept {
100 return queue_.writeCount();
// Returns queue_.readCount().
103 uint64_t readCount() const noexcept {
104 return queue_.readCount();
111 // Product of amplifications of a tuple of PipelineStageInfo<X>
// Primary template, declared here without a definition; the definitions that
// follow are specializations for std::tuple<...> arguments.
112 template <class Tuple> struct AmplificationProduct;
// Full specialization for the empty tuple.
// NOTE(review): the closing of this struct is not visible in this excerpt.
114 template <> struct AmplificationProduct<std::tuple<>> {
// The product over an empty tuple is defined as 1.
115 static constexpr size_t value = 1;
118 template <class T, class... Ts>
119 struct AmplificationProduct<std::tuple<T, Ts...>> {
120 static constexpr size_t value =
122 AmplificationProduct<std::tuple<Ts...>>::value;