11 // Thread safety is ensured for many concurrent enqueuers but only one
12 // concurrent dequeuer. That is, the head end is thread safe, but the tail end
13 // can only be manipulated by a single thread.
14 template <typename Tp, unsigned int Capacity>
// Zero the slot array so every entry starts out as nullptr (empty slot).
// NOTE(review): memset over an array of std::atomic<Tp*> bypasses the atomic
// interface. It works on typical implementations where atomic<T*> is
// layout-compatible with T*, but a loop of relaxed stores would be the
// portable form — confirm this is intentional.
20 memset(&buf_[0], 0, Capacity * sizeof(buf_[0]));
// True when the queue currently looks empty: head == tail AND the slot at
// head holds no element. Each atomic is loaded independently, so this is
// only a racy snapshot — a concurrent enq/deq can invalidate the answer
// before the caller acts on it. Callers must tolerate staleness.
26 return head_.load(std::memory_order_acquire) ==
27 tail_.load(std::memory_order_acquire) &&
28 !buf_[head_.load(std::memory_order_acquire)].load(std::memory_order_acquire);
31 // blocks until something deqs()
// NOTE(review): the original comment read "enqs()", but this is the enqueue
// path — the code below spins while the target slot is still occupied,
// i.e. it waits for a dequeue to free space. The enq/deq header comments
// appear to have been swapped; fixed here.
// Claim attempt: load the current head index and verify it is in range.
38 unsigned icur = head_.load(std::memory_order_acquire);
39 INVARIANT(icur < Capacity);
// Slot still holds an undequeued element -> queue is full at this
// position; retry (the retry path is not visible in this excerpt).
40 if (buf_[icur].load(std::memory_order_acquire)) {
45 // found an empty spot, so we now race for it
46 unsigned inext = (icur + 1) % Capacity;
// CAS advances head past the claimed slot. On failure another enqueuer
// won the race for this slot, so start over (failure path not visible
// in this excerpt).
47 if (!head_.compare_exchange_strong(icur, inext, std::memory_order_acq_rel)) {
// CAS succeeded: we own slot icur exclusively. Publish the element with a
// release store so the dequeuer's acquire load observes a fully
// constructed pointer.
52 INVARIANT(!buf_[icur].load(std::memory_order_acquire));
53 buf_[icur].store(p, std::memory_order_release);
56 // blocks until something enqs()
// NOTE(review): the original comment read "deqs()" — swapped with enq's
// header comment; this is the dequeue path, which spins until an element
// is published. Fixed here.
// Single-consumer pop: spin until the slot at tail is non-null.
60 while (!buf_[tail_.load(std::memory_order_acquire)].load(std::memory_order_acquire))
// Take the element, then clear the slot (marking it reusable for
// enqueuers) and advance tail. The separate load/clear/advance steps are
// only safe because exactly one thread ever manipulates tail_ (see the
// class-level thread-safety comment).
62 Tp *ret = buf_[tail_.load(std::memory_order_acquire)].load(std::memory_order_acquire);
63 buf_[postincr(tail_)].store(nullptr, std::memory_order_release);
// Non-blocking look at the front element; yields nullptr when the slot at
// tail is empty. Reads tail_, so consumer-thread only.
71 return buf_[tail_.load(std::memory_order_acquire)].load(std::memory_order_acquire);
74 // takes a current snapshot of all entries in the queue
// Walks forward from tail collecting up to `limit` contiguous non-null
// entries into ps; the scan stops at the first empty slot. The wrap check
// below guards against scanning past a full ring back to the start.
// Reads tail_, so — like peek()/deq() — consumer-thread only.
76 peekall(std::vector<Tp *> &ps, size_t limit = std::numeric_limits<size_t>::max())
79 const unsigned t = tail_.load(std::memory_order_acquire);
82 while ((p = buf_[i].load(std::memory_order_acquire)) && ps.size() < limit) {
86 // have fully wrapped around
93 static inline unsigned
// Post-increment for a plain index: returns the old value and advances i
// by one slot, wrapping at Capacity.
96 const unsigned ret = i;
97 i = (i + 1) % Capacity;
101 static inline unsigned
102 postincr(std::atomic<unsigned> &i)
// Post-increment for an atomic index. NOTE: the load/store pair below is
// NOT an atomic read-modify-write (not fetch-and-increment); it is only
// correct because its target (tail_) is owned by the single dequeuer
// thread, so no other writer can interleave between the two operations.
104 const unsigned ret = i.load(std::memory_order_acquire);
105 i.store((ret + 1) % Capacity, std::memory_order_release);
// Slot array: a non-null entry is a queued element awaiting dequeue; a
// null entry is a free slot an enqueuer may claim.
109 std::atomic<Tp *> buf_[Capacity];
// Index of the next slot an enqueuer will try to claim (multi-producer;
// advanced via compare_exchange in enq).
110 std::atomic<unsigned> head_;
// Index of the next slot to dequeue (single-consumer thread only).
111 std::atomic<unsigned> tail_;