Adding support for in-place use of ProducerConsumerQueue.
[folly.git] / folly / ProducerConsumerQueue.h
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
19
20 #ifndef PRODUCER_CONSUMER_QUEUE_H_
21 #define PRODUCER_CONSUMER_QUEUE_H_
22
23 #include <atomic>
24 #include <cassert>
25 #include <cstdlib>
26 #include <stdexcept>
27 #include <type_traits>
28 #include <utility>
29 #include <boost/noncopyable.hpp>
30
31 namespace folly {
32
33 /*
34  * ProducerConsumerQueue is a one producer and one consumer queue
35  * without locks.
36  */
37 template<class T>
38 struct ProducerConsumerQueue : private boost::noncopyable {
39   typedef T value_type;
40
41   // size must be >= 2
42   explicit ProducerConsumerQueue(uint32_t size)
43     : size_(size)
44     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
45     , readIndex_(0)
46     , writeIndex_(0)
47   {
48     assert(size >= 2);
49     if (!records_) {
50       throw std::bad_alloc();
51     }
52   }
53
54   ~ProducerConsumerQueue() {
55     // We need to destruct anything that may still exist in our queue.
56     // (No real synchronization needed at destructor time: only one
57     // thread can be doing this.)
58     if (!std::has_trivial_destructor<T>::value) {
59       int read = readIndex_;
60       int end = writeIndex_;
61       while (read != end) {
62         records_[read].~T();
63         if (++read == size_) {
64           read = 0;
65         }
66       }
67     }
68
69     std::free(records_);
70   }
71
72   template<class ...Args>
73   bool write(Args&&... recordArgs) {
74     auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
75     auto nextRecord = currentWrite + 1;
76     if (nextRecord == size_) {
77       nextRecord = 0;
78     }
79     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
80       new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
81       writeIndex_.store(nextRecord, std::memory_order_release);
82       return true;
83     }
84
85     // queue is full
86     return false;
87   }
88
89   // move (or copy) the value at the front of the queue to given variable
90   bool read(T& record) {
91     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
92     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
93       // queue is empty
94       return false;
95     }
96
97     auto nextRecord = currentRead + 1;
98     if (nextRecord == size_) {
99       nextRecord = 0;
100     }
101     record = std::move(records_[currentRead]);
102     records_[currentRead].~T();
103     readIndex_.store(nextRecord, std::memory_order_release);
104     return true;
105   }
106
107   // pointer to the value at the front of the queue (for use in-place) or
108   // nullptr if empty.
109   T* frontPtr() {
110     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
111     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
112       // queue is empty
113       return nullptr;
114     }
115     return &records_[currentRead];
116   }
117
118   // queue must not be empty
119   void popFront() {
120     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
121     assert(currentRead != writeIndex_.load(std::memory_order_acquire));
122
123     auto nextRecord = currentRead + 1;
124     if (nextRecord == size_) {
125       nextRecord = 0;
126     }
127     records_[currentRead].~T();
128     readIndex_.store(nextRecord, std::memory_order_release);
129   }
130
131   bool isEmpty() const {
132    return readIndex_.load(std::memory_order_consume) !=
133          writeIndex_.load(std::memory_order_consume);
134   }
135
136   bool isFull() const {
137     auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
138     if (nextRecord == size_) {
139       nextRecord = 0;
140     }
141     if (nextRecord != readIndex_.load(std::memory_order_consume)) {
142       return false;
143     }
144     // queue is full
145     return true;
146   }
147
148 private:
149   const uint32_t size_;
150   T* const records_;
151
152   std::atomic<int> readIndex_;
153   std::atomic<int> writeIndex_;
154 };
155
156 }
157
158 #endif