/*
 * Copyright 2012 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
#ifndef PRODUCER_CONSUMER_QUEUE_H_
#define PRODUCER_CONSUMER_QUEUE_H_

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <type_traits>
#include <utility>

#include <boost/noncopyable.hpp>
34 * ProducerConsumerQueue is a one producer and one consumer queue
38 struct ProducerConsumerQueue : private boost::noncopyable {
43 // Also, note that the number of usable slots in the queue at any
44 // given time is actually (size-1), so if you start with an empty queue,
45 // isFull() will return true after size-1 insertions.
46 explicit ProducerConsumerQueue(uint32_t size)
48 , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
54 throw std::bad_alloc();
58 ~ProducerConsumerQueue() {
59 // We need to destruct anything that may still exist in our queue.
60 // (No real synchronization needed at destructor time: only one
61 // thread can be doing this.)
62 if (!std::has_trivial_destructor<T>::value) {
63 int read = readIndex_;
64 int end = writeIndex_;
67 if (++read == size_) {
76 template<class ...Args>
77 bool write(Args&&... recordArgs) {
78 auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
79 auto nextRecord = currentWrite + 1;
80 if (nextRecord == size_) {
83 if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
84 new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
85 writeIndex_.store(nextRecord, std::memory_order_release);
93 // move (or copy) the value at the front of the queue to given variable
94 bool read(T& record) {
95 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
96 if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
101 auto nextRecord = currentRead + 1;
102 if (nextRecord == size_) {
105 record = std::move(records_[currentRead]);
106 records_[currentRead].~T();
107 readIndex_.store(nextRecord, std::memory_order_release);
111 // pointer to the value at the front of the queue (for use in-place) or
114 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
115 if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
119 return &records_[currentRead];
122 // queue must not be empty
124 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
125 assert(currentRead != writeIndex_.load(std::memory_order_acquire));
127 auto nextRecord = currentRead + 1;
128 if (nextRecord == size_) {
131 records_[currentRead].~T();
132 readIndex_.store(nextRecord, std::memory_order_release);
135 bool isEmpty() const {
136 return readIndex_.load(std::memory_order_consume) ==
137 writeIndex_.load(std::memory_order_consume);
140 bool isFull() const {
141 auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
142 if (nextRecord == size_) {
145 if (nextRecord != readIndex_.load(std::memory_order_consume)) {
152 // * If called by consumer, then true size may be more (because producer may
153 // be adding items concurrently).
154 // * If called by producer, then true size may be less (because consumer may
155 // be removing items concurrently).
156 // * It is undefined to call this from any other thread.
157 size_t sizeGuess() const {
158 int ret = writeIndex_.load(std::memory_order_consume) -
159 readIndex_.load(std::memory_order_consume);
167 const uint32_t size_;
170 std::atomic<int> readIndex_;
171 std::atomic<int> writeIndex_;