2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
20 #ifndef PRODUCER_CONSUMER_QUEUE_H_
21 #define PRODUCER_CONSUMER_QUEUE_H_
27 #include <type_traits>
33 * ProducerConsumerQueue is a one producer and one consumer queue
struct ProducerConsumerQueue {
  // Non-copyable: the queue owns a raw malloc'd buffer and is designed for
  // exactly one producer thread and one consumer thread; copying would
  // break both the ownership and the threading contract.
  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;
  // Also, note that the number of usable slots in the queue at any
  // given time is actually (size-1), so if you start with an empty queue,
  // isFull() will return true after size-1 insertions.
  //
  // Allocates raw, uninitialized storage for `size` elements; slots are
  // constructed lazily by write() and destroyed by read()/popFront().
  explicit ProducerConsumerQueue(uint32_t size)
    , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
      // Allocation failure is reported via bad_alloc rather than leaving a
      // null records_ buffer behind.
      throw std::bad_alloc();
  ~ProducerConsumerQueue() {
    // We need to destruct anything that may still exist in our queue.
    // (No real synchronization needed at destructor time: only one
    // thread can be doing this.)
    // Skipped entirely for trivially-destructible T — freeing the buffer
    // is then sufficient.
    if (!std::is_trivially_destructible<T>::value) {
      // Walk the live range [readIndex_, writeIndex_) and run each
      // element's destructor in place.
      size_t read = readIndex_;
      size_t end = writeIndex_;
        // Wrap the cursor back to slot 0 when it reaches the end of the ring.
        if (++read == size_) {
  // Construct a new element at the tail directly from recordArgs (perfect
  // forwarding, placement new). Producer-thread only; returns false when
  // the queue is full.
  template<class ...Args>
  bool write(Args&&... recordArgs) {
    // relaxed is fine here: only the producer thread ever writes writeIndex_.
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    auto nextRecord = currentWrite + 1;
    // Wrap around at the end of the ring buffer.
    if (nextRecord == size_) {
    // Full when advancing the write cursor would collide with the read
    // cursor; the acquire load pairs with the consumer's release store.
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      // Placement-new the element into the raw slot, then publish it to
      // the consumer with a release store of the new write index.
      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
      writeIndex_.store(nextRecord, std::memory_order_release);
  // move (or copy) the value at the front of the queue to given variable
  // Consumer-thread only; returns false when the queue is empty.
  bool read(T& record) {
    // relaxed is fine here: only the consumer thread ever writes readIndex_.
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    // Empty when both cursors coincide; the acquire load pairs with the
    // producer's release store in write().
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
    auto nextRecord = currentRead + 1;
    // Wrap around at the end of the ring buffer.
    if (nextRecord == size_) {
    // Move the element out, destroy the now-dead slot, then publish the
    // advanced read position so the producer may reuse the slot.
    record = std::move(records_[currentRead]);
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  // pointer to the value at the front of the queue (for use in-place) or
  // Consumer-thread only; the returned pointer stays valid until the
  // element is consumed by read()/popFront().
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    // Empty check: acquire pairs with the producer's release in write().
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
    return &records_[currentRead];
  // queue must not be empty
  // Destroys the front element in place and advances the read cursor;
  // typically paired with frontPtr(). Consumer-thread only.
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    // Debug-only guard for the "must not be empty" precondition.
    assert(currentRead != writeIndex_.load(std::memory_order_acquire));
    auto nextRecord = currentRead + 1;
    // Wrap around at the end of the ring buffer.
    if (nextRecord == size_) {
    // Run the destructor, then publish the freed slot to the producer.
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  // True when the read and write cursors coincide. Callable from either
  // thread, but the answer may already be stale by the time it is used.
  bool isEmpty() const {
    return readIndex_.load(std::memory_order_consume) ==
           writeIndex_.load(std::memory_order_consume);
  // True when no slot is free. Like isEmpty(), only a snapshot: the other
  // thread may change the answer concurrently.
  bool isFull() const {
    auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
    // Wrap around at the end of the ring buffer.
    if (nextRecord == size_) {
    // Full exactly when advancing the write cursor would land on the read
    // cursor (hence only size-1 usable slots).
    if (nextRecord != readIndex_.load(std::memory_order_consume)) {
  // * If called by consumer, then true size may be more (because producer may
  //   be adding items concurrently).
  // * If called by producer, then true size may be less (because consumer may
  //   be removing items concurrently).
  // * It is undefined to call this from any other thread.
  size_t sizeGuess() const {
    // Signed difference of the two cursors: a negative value means the
    // write cursor has wrapped around the ring while the read cursor
    // has not yet.
    int ret = writeIndex_.load(std::memory_order_consume) -
              readIndex_.load(std::memory_order_consume);
  // Capacity of the ring buffer in slots; at most size_ - 1 elements can
  // be stored at once (one slot is sacrificed to distinguish full/empty).
  const uint32_t size_;
  // Index of the next slot to read. Written only by the consumer thread.
  std::atomic<unsigned int> readIndex_;
  // Index of the next slot to write. Written only by the producer thread.
  std::atomic<unsigned int> writeIndex_;