/// call to blockingWrite() when the queue size is known to be equal
/// to its capacity.
///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
template <typename T, template<typename> class Atom>
uint64_t offset;
do {
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
int stride;
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride));
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
+ }
// If there was an expansion after the corresponding push ticket
// was issued, adjust accordingly
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
do {
ticket = this->pushTicket_.load(std::memory_order_acquire); // A
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
ticket = this->pushTicket_.load(std::memory_order_acquire);
auto numPops = this->popTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
- const auto oldCap = cap;
+ const auto curCap = cap;
// If there was an expansion with offset greater than this ticket,
// adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
int64_t n = ticket - numPops;
- if (n >= static_cast<ssize_t>(this->capacity_)) {
- ticket -= offset;
- return false;
- }
- if (n >= static_cast<ssize_t>(oldCap)) {
- if (tryExpand(state, oldCap)) {
- // This or another thread started an expansion. Start over
- // with a new state.
+ if (n >= static_cast<ssize_t>(cap)) {
+ if ((cap == curCap) && tryExpand(state, cap)) {
+ // This or another thread started an expansion. Start over.
continue;
- } else {
- // Can't expand.
- ticket -= offset;
- return false;
}
+ // Can't expand.
+ ticket -= offset;
+ return false;
}
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
do {
ticket = this->popTicket_.load(std::memory_order_relaxed);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
ticket = this->popTicket_.load(std::memory_order_acquire);
auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
}
}
+// All the never_fail tests are for the non-dynamic version only.
+// False positive for dynamic version. Some writeIfNotFull() and
+// tryWriteUntil() operations may fail in transient conditions related
+// to expansion.
+
TEST(MPMCQueue, mt_never_fail) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<std::atomic>(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFail<std::atomic, true>(nts, n);
-}
-
TEST(MPMCQueue, mt_never_fail_emulated_futex) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<EmulatedFutexAtomic>(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
-}
-
template<bool Dynamic = false>
void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
LOG(INFO) << "using seed " << seed;
runMtNeverFailDeterministic(nts, n, seed);
}
-TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
- std::vector<int> nts {3, 10};
- long seed = 0; // nowMicro() % 10000;
- int n = 1000;
- runMtNeverFailDeterministic<true>(nts, n, seed);
-}
-
template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(int numThreads,
int n, /*numOps*/
runMtNeverFailUntilSystem(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFailUntilSystem<true>(nts, n);
-}
-
template <bool Dynamic = false>
void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
for (int nt : nts) {
runMtNeverFailUntilSteady(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFailUntilSteady<true>(nts, n);
-}
-
enum LifecycleEvent {
NOTHING = -1,
DEFAULT_CONSTRUCTOR,
TEST(MPMCQueue, try_write_until_dynamic) {
testTryWriteUntil<true>();
}
+
+template <bool Dynamic>
+void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
+ CHECK(q.write(1));
+ /* The following must not block forever */
+ q.tryWriteUntil(
+ std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
+}
+
+TEST(MPMCQueue, try_write_until_timeout) {
+ folly::MPMCQueue<int, std::atomic, false> queue(1);
+ testTimeout<false>(queue);
+}
+
+TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
+ folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+ testTimeout<true>(queue);
+}