/*
Copyright (c) 2017 Erik Rigtorp <erik@rigtorp.se>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once

#include <atomic>
#include <cassert>
#include <cds/details/defs.h>
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <new>
#include <stdexcept>
#include <type_traits>
33 template <typename T> class MPMCQueue {
35 static_assert(std::is_nothrow_copy_assignable<T>::value ||
36 std::is_nothrow_move_assignable<T>::value,
37 "T must be nothrow copy or move assignable");
39 static_assert(std::is_nothrow_destructible<T>::value,
40 "T must be nothrow destructible");
43 explicit MPMCQueue(const size_t capacity)
44 : capacity_(capacity), head_(0), tail_(0) {
46 throw std::invalid_argument("capacity < 1");
48 size_t space = capacity * sizeof(Slot) + kCacheLineSize - 1;
50 if (buf_ == nullptr) {
51 throw std::bad_alloc();
54 slots_ = reinterpret_cast<Slot *>(
55 std::align(kCacheLineSize, capacity * sizeof(Slot), buf, space));
56 if (slots_ == nullptr) {
58 throw std::bad_alloc();
60 for (size_t i = 0; i < capacity_; ++i) {
61 new (&slots_[i]) Slot();
63 static_assert(sizeof(MPMCQueue<T>) % kCacheLineSize == 0,
64 "MPMCQueue<T> size must be a multiple of cache line size to "
65 "prevent false sharing between adjacent queues");
66 static_assert(sizeof(Slot) % kCacheLineSize == 0,
67 "Slot size must be a multiple of cache line size to prevent "
68 "false sharing between adjacent slots");
69 assert(reinterpret_cast<size_t>(slots_) % kCacheLineSize == 0 &&
70 "slots_ array must be aligned to cache line size to prevent false "
71 "sharing between adjacent slots");
72 assert(reinterpret_cast<char *>(&tail_) -
73 reinterpret_cast<char *>(&head_) >=
75 "head and tail must be a cache line apart to prevent false sharing");
78 ~MPMCQueue() noexcept {
79 for (size_t i = 0; i < capacity_; ++i) {
  // non-copyable and non-movable
  // (deleting the copy operations also suppresses the implicitly declared
  // move operations; slots_/buf_ point into owned memory, so neither copy
  // nor move would be safe)
  MPMCQueue(const MPMCQueue &) = delete;
  MPMCQueue &operator=(const MPMCQueue &) = delete;
89 template <typename... Args> void emplace(Args &&... args) noexcept {
90 static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
91 "T must be nothrow constructible with Args&&...");
92 auto const head = head_.fetch_add(1);
93 auto &slot = slots_[idx(head)];
94 while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire))
96 slot.construct(std::forward<Args>(args)...);
97 slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
100 template <typename... Args> bool try_emplace(Args &&... args) noexcept {
101 static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
102 "T must be nothrow constructible with Args&&...");
103 auto head = head_.load(std::memory_order_acquire);
105 auto &slot = slots_[idx(head)];
106 if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) {
107 if (head_.compare_exchange_strong(head, head + 1)) {
108 slot.construct(std::forward<Args>(args)...);
109 slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
113 auto const prevHead = head;
114 head = head_.load(std::memory_order_acquire);
115 if (head == prevHead) {
122 void push(const T &v) noexcept {
123 static_assert(std::is_nothrow_copy_constructible<T>::value,
124 "T must be nothrow copy constructible");
128 template <typename P,
129 typename = typename std::enable_if<
130 std::is_nothrow_constructible<T, P &&>::value>::type>
131 void push(P &&v) noexcept {
132 emplace(std::forward<P>(v));
135 bool try_push(const T &v) noexcept {
136 static_assert(std::is_nothrow_copy_constructible<T>::value,
137 "T must be nothrow copy constructible");
138 return try_emplace(v);
141 template <typename P,
142 typename = typename std::enable_if<
143 std::is_nothrow_constructible<T, P &&>::value>::type>
144 bool try_push(P &&v) noexcept {
145 return try_emplace(std::forward<P>(v));
148 void pop(T &v) noexcept {
149 auto const tail = tail_.fetch_add(1);
150 auto &slot = slots_[idx(tail)];
151 while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
155 slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
158 bool try_pop(T &v) noexcept {
159 auto tail = tail_.load(std::memory_order_acquire);
161 auto &slot = slots_[idx(tail)];
162 if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
163 if (tail_.compare_exchange_strong(tail, tail + 1)) {
166 slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
170 auto const prevTail = tail;
171 tail = tail_.load(std::memory_order_acquire);
172 if (tail == prevTail) {
180 constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }
182 constexpr size_t turn(size_t i) const noexcept { return i / capacity_; }
184 static constexpr size_t kCacheLineSize = 128;
193 template <typename... Args> void construct(Args &&... args) noexcept {
194 static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
195 "T must be nothrow constructible with Args&&...");
196 new (&storage) T(std::forward<Args>(args)...);
199 void destroy() noexcept {
200 static_assert(std::is_nothrow_destructible<T>::value,
201 "T must be nothrow destructible");
202 reinterpret_cast<T *>(&storage)->~T();
205 T &&move() noexcept { return reinterpret_cast<T &&>(storage); }
207 // Align to avoid false sharing between adjacent slots
208 alignas(kCacheLineSize) std::atomic<size_t> turn = {0};
209 typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
213 const size_t capacity_;
217 // Align to avoid false sharing between head_ and tail_
218 alignas(kCacheLineSize) std::atomic<size_t> head_;
219 alignas(kCacheLineSize) std::atomic<size_t> tail_;