/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2013-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <algorithm>
#include <atomic>
-#include <assert.h>
-#include <boost/noncopyable.hpp>
+#include <cassert>
+#include <cstring>
#include <limits>
-#include <string.h>
#include <type_traits>
+#include <boost/noncopyable.hpp>
+
#include <folly/Traits.h>
-#include <folly/detail/CacheLocality.h>
+#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/portability/Unistd.h>
namespace detail {
-template<typename T, template<typename> class Atom>
+template <typename T, template <typename> class Atom>
struct SingleElementQueue;
template <typename T> class MPMCPipelineStageImpl;
/// are you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives it (by requeuing 2 the shutdown can
/// complete in O(log P) time instead of O(P)).
-template<typename T, template<typename> class Atom = std::atomic,
- bool Dynamic = false>
+template <
+ typename T,
+ template <typename> class Atom = std::atomic,
+ bool Dynamic = false>
class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
friend class detail::MPMCPipelineStageImpl<T>;
using Slot = detail::SingleElementQueue<T,Atom>;
/// closed arrays instead of the current one. Information about closed
/// slots arrays (array address, capacity, stride, and offset) is
/// maintained in a logarithmic-sized structure. Each entry in that
-/// structure never need to be changed once set. The number of closed
+/// structure never needs to be changed once set. The number of closed
/// arrays is half the value of the seqlock (when unlocked).
///
/// The acquisition of the seqlock to perform an expansion does not
///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
-template <typename T, template<typename> class Atom>
+template <typename T, template <typename> class Atom>
class MPMCQueue<T,Atom,true> :
public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
// There was an expansion after this ticket was issued.
break;
}
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
// A slot is ready. No need to expand.
break;
- } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
- > ticket) {
+ } else if (
+ this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
// May block, but a pop is in progress. No need to expand.
// Get seqlock read section info again in case an expansion
// occurred with an equal or higher ticket.
}
private:
-
enum {
kSeqlockBits = 6,
kDefaultMinDynamicCapacity = 10,
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
// A slot is ready.
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayDequeue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+ this->turn(ticket - offset, cap))) {
if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
ticket -= offset;
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
+ }
// If there was an expansion after this ticket was issued, adjust
// accordingly
assert((state & 1) == 0);
if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
assert(cap == this->dcapacity_.load());
- uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
- this->popTicket_.load());
- size_t newCapacity =
- std::min(dmult_ * cap, this->capacity_);
+ uint64_t ticket =
+ 1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
+ size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
Slot* newSlots =
- new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+ new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
if (newSlots == nullptr) {
// Expansion failed. Restore the seqlock
this->dstate_.store(state);
namespace detail {
/// CRTP specialization of MPMCQueueBase
-template<
- template<
- typename T, template<typename> class Atom, bool Dynamic> class Derived,
- typename T, template<typename> class Atom, bool Dynamic>
+template <
+ template <typename T, template <typename> class Atom, bool Dynamic>
+ class Derived,
+ typename T,
+ template <typename> class Atom,
+ bool Dynamic>
class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
// Note: Using CRTP static casts in several functions of this base
}
// ideally this would be a static assert, but g++ doesn't allow it
- assert(alignof(MPMCQueue<T,Atom>)
- >= detail::CacheLocality::kFalseSharingRange);
- assert(static_cast<uint8_t*>(static_cast<void*>(&popTicket_))
- - static_cast<uint8_t*>(static_cast<void*>(&pushTicket_))
- >= detail::CacheLocality::kFalseSharingRange);
+ assert(
+ alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size);
+ assert(
+ static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
+ static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
+ static_cast<ptrdiff_t>(hardware_destructive_interference_size));
}
/// A default-constructed queue is useful because a usable (non-zero
/// To avoid false sharing in slots_ with neighboring memory
/// allocations, we pad it with this many SingleElementQueue-s at
/// each end
- kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
- / sizeof(Slot) + 1
+ kSlotPadding =
+ (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1
};
/// The maximum number of items in the queue at once
- size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
+ alignas(hardware_destructive_interference_size) size_t capacity_;
/// Anonymous union for use when Dynamic = false and true, respectively
union {
Atom<size_t> dcapacity_;
/// Enqueuers get tickets from here
- Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
+ alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_;
/// Dequeuers get tickets from here
- Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_;
+ alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_;
/// This is how many times we will spin before using FUTEX_WAIT when
/// the queue is full on enqueue, adaptively computed by occasionally
/// spinning for longer and smoothing with an exponential moving average
- Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_;
+ alignas(
+ hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_;
/// The adaptive spin cutoff when the queue is empty on dequeue
- Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_;
+ alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_;
/// Alignment doesn't prevent false sharing at the end of the struct,
/// so fill out the last cache line
- char padding_[detail::CacheLocality::kFalseSharingRange -
- sizeof(Atom<uint32_t>)];
+ char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)];
/// We assign tickets in increasing order, but we don't want to
/// access neighboring elements of slots_ because that will lead to
}
/// enqueue using in-place noexcept construction
- template <typename ...Args,
- typename = typename std::enable_if<
- std::is_nothrow_constructible<T,Args...>::value>::type>
+ template <
+ typename... Args,
+ typename = typename std::enable_if<
+ std::is_nothrow_constructible<T, Args...>::value>::type>
void enqueue(const uint32_t turn,
Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff,