/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2013-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/// closed arrays instead of the current one. Information about closed
/// slots arrays (array address, capacity, stride, and offset) is
/// maintained in a logarithmic-sized structure. Each entry in that
-/// structure never need to be changed once set. The number of closed
+/// structure never needs to be changed once set. The number of closed
/// arrays is half the value of the seqlock (when unlocked).
///
/// The acquisition of the seqlock to perform an expansion does not
/// prevent the issuance of new push and pop tickets concurrently.
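// A sketch of the bookkeeping the comment above describes, modeled on
// folly's dynamic MPMCQueue (field and helper names assumed, not
// necessarily the exact declarations): one immutable record per closed
// slots array, plus helpers decoding dstate_, whose low kSeqlockBits
// bits hold the seqlock value and whose upper bits hold the current
// ticket offset.
struct ClosedArray {
  uint64_t offset_{0}; // ticket offset that applied to this closed array
  Slot* slots_{nullptr}; // address of the closed slots array
  size_t capacity_{0}; // its capacity
  int stride_{0}; // its stride
};

uint64_t getOffset(const uint64_t state) const noexcept {
  return state >> kSeqlockBits;
}

int getNumClosed(const uint64_t state) const noexcept {
  // Each expansion bumps the seqlock by 2 (odd means locked), so the
  // number of closed arrays is half the unlocked seqlock value.
  return (state & ((1 << kSeqlockBits) - 1)) / 2;
}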
// There was an expansion after this ticket was issued.
break;
}
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
// A slot is ready. No need to expand.
break;
- } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
- > ticket) {
+ } else if (
+ this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
// May block, but a pop is in progress. No need to expand.
// Get seqlock read section info again in case an expansion
// occurred with an equal or higher ticket.
}
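// The second test above, restated as a hypothetical helper (name and
// shape illustrative only, not folly API): a push ticket that cannot
// enqueue yet still needs no expansion if some dequeuer already holds
// the pop ticket that will free its slot, i.e. popTicket_ + cap > ticket.
bool popWillFreeSlot(uint64_t ticket, size_t cap) noexcept {
  return this->popTicket_.load(std::memory_order_relaxed) + cap > ticket;
}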
private:
-
enum {
kSeqlockBits = 6,
kDefaultMinDynamicCapacity = 10,
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
// A slot is ready.
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayDequeue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+ this->turn(ticket - offset, cap))) {
if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
ticket -= offset;
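// A plausible shape for maybeUpdateFromClosed, consistent with its call
// sites above (the closed_ table is assumed; folly's actual
// implementation may differ in detail): if the ticket predates the
// current offset, search the closed arrays, newest first, for the one
// whose offset covers it.
void maybeUpdateFromClosed(
    const uint64_t state,
    const uint64_t ticket,
    uint64_t& offset,
    Slot*& slots,
    size_t& cap,
    int& stride) noexcept {
  offset = getOffset(state);
  if (ticket >= offset) {
    return; // ticket belongs to the currently open array
  }
  for (int i = getNumClosed(state) - 1; i >= 0; --i) {
    offset = closed_[i].offset_;
    if (ticket >= offset) {
      slots = closed_[i].slots_;
      cap = closed_[i].capacity_;
      stride = closed_[i].stride_;
      return;
    }
  }
  assert(false); // every old ticket must be covered by some closed array
}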
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
+ }
// If there was an expansion after this ticket was issued, adjust
// accordingly
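// A sketch of the classic seqlock read protocol behind the spin loop
// above (member names follow the surrounding code; dslots_ and dstride_
// are assumed): fail if locked (odd state), read the metadata, then
// validate that the sequence number did not change during the reads.
bool trySeqlockReadSection(
    uint64_t& state, Slot*& slots, size_t& cap, int& stride) noexcept {
  state = this->dstate_.load(std::memory_order_acquire);
  if (state & 1) {
    return false; // an expansion holds the seqlock
  }
  slots = this->dslots_.load(std::memory_order_relaxed);
  cap = this->dcapacity_.load(std::memory_order_relaxed);
  stride = this->dstride_.load(std::memory_order_relaxed);
  // Re-check: if dstate_ changed, the values read above may be torn.
  std::atomic_thread_fence(std::memory_order_acquire);
  return state == this->dstate_.load(std::memory_order_relaxed);
}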
assert((state & 1) == 0);
if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
assert(cap == this->dcapacity_.load());
- uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
- this->popTicket_.load());
- size_t newCapacity =
- std::min(dmult_ * cap, this->capacity_);
+ uint64_t ticket =
+ 1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
+ size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
Slot* newSlots =
          new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
if (newSlots == nullptr) {
// Expansion failed. Restore the seqlock
this->dstate_.store(state);
}
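// On success (sketched from the surrounding logic; member and helper
// names assumed, not verbatim folly) the expander records the array it
// is closing, publishes the new array's metadata, and releases the
// seqlock by storing the next even state carrying the new offset:
//
//   int index = getNumClosed(state);
//   closed_[index] = ClosedArray{getOffset(state), slots, cap, stride};
//   this->dslots_.store(newSlots);
//   this->dcapacity_.store(newCapacity);
//   this->dstate_.store((ticket << kSeqlockBits) + 2 * (index + 1));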
// ideally this would be a static assert, but g++ doesn't allow it
- assert(alignof(MPMCQueue<T, Atom>) >= CacheLocality::kFalseSharingRange);
+ assert(
+ alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size);
assert(
static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
- CacheLocality::kFalseSharingRange);
+ static_cast<ptrdiff_t>(hardware_destructive_interference_size));
}
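// The compile-time check the comment above asks for would read as
// follows (rejected by the g++ versions in question, hence the runtime
// assert):
//
//   static_assert(
//       alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size,
//       "MPMCQueue alignment must cover the false-sharing range");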
/// A default-constructed queue is useful because a usable (non-zero
/// capacity) queue can be moved onto it or swapped with it
/// To avoid false sharing in slots_ with neighboring memory
/// allocations, we pad it with this many SingleElementQueue-s at
/// each end
- kSlotPadding = (CacheLocality::kFalseSharingRange - 1) / sizeof(Slot) + 1
+ kSlotPadding =
+ (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1
};
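// Worked example (sizes assumed for illustration): with a 128-byte
// false-sharing range and sizeof(Slot) == 64, kSlotPadding evaluates to
// (128 - 1) / 64 + 1 == 2, i.e. the ceiling of 128 / 64: enough whole
// Slots to cover one full false-sharing range at each end of slots_.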
/// The maximum number of items in the queue at once
- size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
+ alignas(hardware_destructive_interference_size) size_t capacity_;
/// Anonymous union for use when Dynamic = false and true, respectively
union {
Atom<size_t> dcapacity_;
/// Enqueuers get tickets from here
- Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
+ alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_;
/// Dequeuers get tickets from here
- Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_;
+ alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_;
/// This is how many times we will spin before using FUTEX_WAIT when
/// the queue is full on enqueue, adaptively computed by occasionally
/// spinning for longer and smoothing with an exponential moving average
- Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_;
+ alignas(
+ hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_;
/// The adaptive spin cutoff when the queue is empty on dequeue
- Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_;
+ alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_;
/// Alignment doesn't prevent false sharing at the end of the struct,
/// so fill out the last cache line
- char padding_[CacheLocality::kFalseSharingRange - sizeof(Atom<uint32_t>)];
+ char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)];
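// Worked example (size assumed): with a 128-byte false-sharing range,
// sizeof(pad_) == 128 - sizeof(Atom<uint32_t>) == 124, which together
// with popSpinCutoff_ fills out the final cache line of the struct.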
/// We assign tickets in increasing order, but we don't want to
/// access neighboring elements of slots_ because that will lead to
/// false sharing (multiple cores accessing the same cache line even
/// though they aren't accessing the same bytes in that cache line).
/// To avoid this we advance by stride slots per ticket.
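// The mapping that comment describes, as the idx() helper used by the
// call sites above is typically written (exact signature assumed):
// consecutive tickets land stride slots apart, modulo the capacity,
// shifted past the leading padding slots.
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
  return ((ticket * stride) % cap) + kSlotPadding;
}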