/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
20 #ifndef PRODUCER_CONSUMER_QUEUE_H_
21 #define PRODUCER_CONSUMER_QUEUE_H_
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <type_traits>
#include <utility>

#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
/**
 * ProducerConsumerQueue is a one producer and one consumer queue
 * without locks.
 *
 * Thread-safety contract: exactly ONE thread may call write()/isFull(),
 * and exactly ONE (possibly different) thread may call
 * read()/frontPtr()/popFront()/isEmpty().  sizeGuess() may be called by
 * either of those two threads, but no others.
 */
template<class T>
struct ProducerConsumerQueue {
  typedef T value_type;

  // Non-copyable: the queue owns a raw buffer and is shared between two
  // threads; copying it has no sensible meaning.  (Deleted special members
  // replace the earlier boost::noncopyable base with identical effect.)
  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;

  // size must be >= 2.
  //
  // Also, note that the number of usable slots in the queue at any
  // given time is actually (size-1), so if you start with an empty queue,
  // isFull() will return true after size-1 insertions.
  //
  // Throws std::bad_alloc if the backing buffer cannot be allocated.
  explicit ProducerConsumerQueue(uint32_t size)
    : size_(size)
    , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
    , readIndex_(0)
    , writeIndex_(0)
  {
    assert(size >= 2);
    if (!records_) {
      throw std::bad_alloc();
    }
  }

  ~ProducerConsumerQueue() {
    // We need to destruct anything that may still exist in our queue.
    // (No real synchronization needed at destructor time: only one
    // thread can be doing this.)
    if (!std::is_trivially_destructible<T>::value) {
      int read = readIndex_;
      int end = writeIndex_;
      while (read != end) {
        records_[read].~T();
        if (++read == size_) {
          read = 0;  // wrap around the ring
        }
      }
    }
    std::free(records_);
  }

  // Construct a new element at the back of the queue in place from
  // recordArgs.  Producer thread only.
  // Returns false (without constructing anything) if the queue is full.
  template<class ...Args>
  bool write(Args&&... recordArgs) {
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    auto nextRecord = currentWrite + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
      // release: publish the constructed element to the consumer.
      writeIndex_.store(nextRecord, std::memory_order_release);
      return true;
    }

    // queue is full
    return false;
  }

  // move (or copy) the value at the front of the queue to given variable
  // and pop it.  Consumer thread only.  Returns false if the queue was empty.
  bool read(T& record) {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return false;
    }

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    record = std::move(records_[currentRead]);
    records_[currentRead].~T();
    // release: hand the slot back to the producer.
    readIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }

  // pointer to the value at the front of the queue (for use in-place) or
  // nullptr if empty.  Consumer thread only; the pointer stays valid
  // until the consumer calls popFront()/read().
  T* frontPtr() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return nullptr;
    }
    return &records_[currentRead];
  }

  // queue must not be empty.  Consumer thread only.
  void popFront() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    assert(currentRead != writeIndex_.load(std::memory_order_acquire));

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  }

  // NOTE: memory_order_consume is deprecated (and promoted to acquire by
  // every known implementation); acquire is used below for the same
  // guarantee with well-defined semantics.
  bool isEmpty() const {
    return readIndex_.load(std::memory_order_acquire) ==
        writeIndex_.load(std::memory_order_acquire);
  }

  bool isFull() const {
    auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      return false;
    }
    // queue is full
    return true;
  }

  // Returns an estimate of the number of entries in the queue:
  // * If called by consumer, then true size may be more (because producer may
  //   be adding items concurrently).
  // * If called by producer, then true size may be less (because consumer may
  //   be removing items concurrently).
  // * It is undefined to call this from any other thread.
  size_t sizeGuess() const {
    int ret = writeIndex_.load(std::memory_order_acquire) -
        readIndex_.load(std::memory_order_acquire);
    if (ret < 0) {
      ret += size_;  // write index wrapped past read index
    }
    return ret;
  }

private:
  const uint32_t size_;   // ring capacity (usable slots = size_ - 1)
  T* const records_;      // raw buffer; slots constructed/destroyed lazily

  std::atomic<int> readIndex_;   // next slot to read (consumer-owned)
  std::atomic<int> writeIndex_;  // next slot to write (producer-owned)
};