+++ /dev/null
-#ifndef _CHASE_LEV_DEQUE_H
-#define _CHASE_LEV_DEQUE_H
-
-#include <atomic>
-#include <cds/sync/backoff.h>
-#include <cstdlib>
-#include <inttypes.h>
-#include <iostream>
-
-namespace cds_others {
-
-#define EMPTY 0xffffffff
-#define ABORT 0xfffffffe
-
-using std::memory_order_seq_cst;
-using std::memory_order_release;
-using std::memory_order_acquire;
-using std::memory_order_acq_rel;
-using std::memory_order_relaxed;
-using std::atomic_int;
-using std::atomic_size_t;
-using std::atomic_uintptr_t;
-using std::size_t;
-
-class ChaseLevDeque {
-private:
- atomic_size_t top;
- atomic_size_t bottom;
- atomic_uintptr_t array; /* Atomic(Array *) */
-
-public:
- struct Array {
- atomic_size_t size;
- atomic_int buffer[];
- };
-
- ChaseLevDeque() {
- Array *a = (Array *)calloc(1, sizeof(Array) + 2 * sizeof(atomic_int));
- array.store((uintptr_t)a, memory_order_relaxed);
- top.store(0, memory_order_relaxed);
- bottom.store(0, memory_order_relaxed);
- a->size.store(2, memory_order_relaxed);
- }
-
- int take() {
- size_t b = bottom.load(memory_order_relaxed) - 1;
- Array *a = (Array *)array.load(memory_order_relaxed);
- bottom.store(b, memory_order_relaxed);
- atomic_thread_fence(memory_order_seq_cst);
- size_t t = top.load(memory_order_relaxed);
- int x;
- if (t <= b) {
- /* Non-empty queue. */
- x = a->buffer[b % a->size.load(memory_order_relaxed)].load(
- memory_order_relaxed);
- if (t == b) {
- /* Single last element in queue. */
- if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
- memory_order_relaxed))
- /* Failed race. */
- x = EMPTY;
- bottom.store(b + 1, memory_order_relaxed);
- }
- } else { /* Empty queue. */
- x = EMPTY;
- bottom.store(b + 1, memory_order_relaxed);
- }
- return x;
- }
-
- void resize() {
- Array *a = (Array *)array.load(memory_order_relaxed);
- size_t size = a->size.load(memory_order_relaxed);
- size_t new_size = size << 1;
- Array *new_a =
- (Array *)calloc(1, new_size * sizeof(atomic_int) + sizeof(Array));
- size_t t = top.load(memory_order_relaxed);
- size_t b = bottom.load(memory_order_relaxed);
- new_a->size.store(new_size, memory_order_relaxed);
- size_t i;
- for (i = t; i < b; i++) {
- new_a->buffer[i % new_size].store(
- a->buffer[i % size].load(memory_order_relaxed), memory_order_relaxed);
- }
- array.store((uintptr_t)new_a, memory_order_release);
- // std::cout << "Resize to " << new_size << "\n";
- }
-
- void push(int x) {
- size_t b = bottom.load(memory_order_relaxed);
- size_t t = top.load(memory_order_acquire);
- Array *a = (Array *)array.load(memory_order_relaxed);
- if (b - t > a->size.load(memory_order_relaxed) - 1) /* Full queue. */ {
- resize();
- // Bug in paper...should have next line...
- a = (Array *)array.load(memory_order_relaxed);
- }
- a->buffer[b % a->size.load(memory_order_relaxed)].store(
- x, memory_order_relaxed);
- atomic_thread_fence(memory_order_release);
- bottom.store(b + 1, memory_order_relaxed);
- }
-
- int steal() {
- size_t t = top.load(memory_order_acquire);
- atomic_thread_fence(memory_order_seq_cst);
- size_t b = bottom.load(memory_order_acquire);
- int x = EMPTY;
- if (t < b) {
- /* Non-empty queue. */
- Array *a = (Array *)array.load(memory_order_acquire);
- x = a->buffer[t % a->size.load(memory_order_relaxed)].load(
- memory_order_relaxed);
- if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
- memory_order_relaxed))
- /* Failed race. */
- return ABORT;
- }
- return x;
- }
-};
-
-} // namespace cds_others
-
-#endif
--- /dev/null
+/*
+Copyright (c) 2017 Erik Rigtorp <erik@rigtorp.se>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstdlib>
+#include <limits>
+#include <memory>
+#include <stdexcept>
+
+namespace rigtorp {
+
+template <typename T> class MPMCQueue {
+private:
+ static_assert(std::is_nothrow_copy_assignable<T>::value ||
+ std::is_nothrow_move_assignable<T>::value,
+ "T must be nothrow copy or move assignable");
+
+ static_assert(std::is_nothrow_destructible<T>::value,
+ "T must be nothrow destructible");
+
+public:
+ explicit MPMCQueue(const size_t capacity)
+ : capacity_(capacity), head_(0), tail_(0) {
+ if (capacity_ < 1) {
+ throw std::invalid_argument("capacity < 1");
+ }
+ size_t space = capacity * sizeof(Slot) + kCacheLineSize - 1;
+ buf_ = malloc(space);
+ if (buf_ == nullptr) {
+ throw std::bad_alloc();
+ }
+ void *buf = buf_;
+ slots_ = reinterpret_cast<Slot *>(
+ std::align(kCacheLineSize, capacity * sizeof(Slot), buf, space));
+ if (slots_ == nullptr) {
+ free(buf_);
+ throw std::bad_alloc();
+ }
+ for (size_t i = 0; i < capacity_; ++i) {
+ new (&slots_[i]) Slot();
+ }
+ static_assert(sizeof(MPMCQueue<T>) % kCacheLineSize == 0,
+ "MPMCQueue<T> size must be a multiple of cache line size to "
+ "prevent false sharing between adjacent queues");
+ static_assert(sizeof(Slot) % kCacheLineSize == 0,
+ "Slot size must be a multiple of cache line size to prevent "
+ "false sharing between adjacent slots");
+ assert(reinterpret_cast<size_t>(slots_) % kCacheLineSize == 0 &&
+ "slots_ array must be aligned to cache line size to prevent false "
+ "sharing between adjacent slots");
+ assert(reinterpret_cast<char *>(&tail_) -
+ reinterpret_cast<char *>(&head_) >=
+ kCacheLineSize &&
+ "head and tail must be a cache line apart to prevent false sharing");
+ }
+
+ ~MPMCQueue() noexcept {
+ for (size_t i = 0; i < capacity_; ++i) {
+ slots_[i].~Slot();
+ }
+ free(buf_);
+ }
+
+ // non-copyable and non-movable
+ MPMCQueue(const MPMCQueue &) = delete;
+ MPMCQueue &operator=(const MPMCQueue &) = delete;
+
+ template <typename... Args> void emplace(Args &&... args) noexcept {
+ static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
+ "T must be nothrow constructible with Args&&...");
+ auto const head = head_.fetch_add(1);
+ auto &slot = slots_[idx(head)];
+ while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire))
+ ;
+ slot.construct(std::forward<Args>(args)...);
+ slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
+ }
+
+ template <typename... Args> bool try_emplace(Args &&... args) noexcept {
+ static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
+ "T must be nothrow constructible with Args&&...");
+ auto head = head_.load(std::memory_order_acquire);
+ for (;;) {
+ auto &slot = slots_[idx(head)];
+ if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) {
+ if (head_.compare_exchange_strong(head, head + 1)) {
+ slot.construct(std::forward<Args>(args)...);
+ slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
+ return true;
+ }
+ } else {
+ auto const prevHead = head;
+ head = head_.load(std::memory_order_acquire);
+ if (head == prevHead) {
+ return false;
+ }
+ }
+ }
+ }
+
+ void push(const T &v) noexcept {
+ static_assert(std::is_nothrow_copy_constructible<T>::value,
+ "T must be nothrow copy constructible");
+ emplace(v);
+ }
+
+ template <typename P,
+ typename = typename std::enable_if<
+ std::is_nothrow_constructible<T, P &&>::value>::type>
+ void push(P &&v) noexcept {
+ emplace(std::forward<P>(v));
+ }
+
+ bool try_push(const T &v) noexcept {
+ static_assert(std::is_nothrow_copy_constructible<T>::value,
+ "T must be nothrow copy constructible");
+ return try_emplace(v);
+ }
+
+ template <typename P,
+ typename = typename std::enable_if<
+ std::is_nothrow_constructible<T, P &&>::value>::type>
+ bool try_push(P &&v) noexcept {
+ return try_emplace(std::forward<P>(v));
+ }
+
+ void pop(T &v) noexcept {
+ auto const tail = tail_.fetch_add(1);
+ auto &slot = slots_[idx(tail)];
+ while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
+ ;
+ v = slot.move();
+ slot.destroy();
+ slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
+ }
+
+ bool try_pop(T &v) noexcept {
+ auto tail = tail_.load(std::memory_order_acquire);
+ for (;;) {
+ auto &slot = slots_[idx(tail)];
+ if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
+ if (tail_.compare_exchange_strong(tail, tail + 1)) {
+ v = slot.move();
+ slot.destroy();
+ slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
+ return true;
+ }
+ } else {
+ auto const prevTail = tail;
+ tail = tail_.load(std::memory_order_acquire);
+ if (tail == prevTail) {
+ return false;
+ }
+ }
+ }
+ }
+
+private:
+ constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }
+
+ constexpr size_t turn(size_t i) const noexcept { return i / capacity_; }
+
+ static constexpr size_t kCacheLineSize = 128;
+
+ struct Slot {
+ ~Slot() noexcept {
+ if (turn & 1) {
+ destroy();
+ }
+ }
+
+ template <typename... Args> void construct(Args &&... args) noexcept {
+ static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
+ "T must be nothrow constructible with Args&&...");
+ new (&storage) T(std::forward<Args>(args)...);
+ }
+
+ void destroy() noexcept {
+ static_assert(std::is_nothrow_destructible<T>::value,
+ "T must be nothrow destructible");
+ reinterpret_cast<T *>(&storage)->~T();
+ }
+
+ T &&move() noexcept { return reinterpret_cast<T &&>(storage); }
+
+ // Align to avoid false sharing between adjacent slots
+ alignas(kCacheLineSize) std::atomic<size_t> turn = {0};
+ typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
+ };
+
+private:
+ const size_t capacity_;
+ Slot *slots_;
+ void *buf_;
+
+ // Align to avoid false sharing between head_ and tail_
+ alignas(kCacheLineSize) std::atomic<size_t> head_;
+ alignas(kCacheLineSize) std::atomic<size_t> tail_;
+};
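+
+// A minimal usage sketch (illustrative only, not part of the upstream
+// rigtorp sources). The queue is bounded: push()/pop() spin while the queue
+// is full/empty, while try_push()/try_pop() return false instead.
+//
+//   rigtorp::MPMCQueue<int> q(16);   // capacity fixed at construction
+//   q.push(1);                       // spins while the queue is full
+//   int v;
+//   q.pop(v);                        // spins while the queue is empty
+//   bool ok = q.try_push(2);         // non-blocking variant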
+} // namespace rigtorp
--- /dev/null
+#ifndef _BACKOFF_H
+#define _BACKOFF_H
+
+#include <cds/algo/backoff_strategy.h>
+
+namespace cds_others {
+
+namespace bkoff = cds::backoff;
+struct BackoffTraits : public bkoff::exponential_const_traits {
+ static size_t lower_bound;
+ static size_t upper_bound;
+};
+typedef bkoff::exponential<BackoffTraits> ExpBackoff;
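+
+// Note: lower_bound and upper_bound are only declared above; exactly one
+// translation unit must define them (mcs-lock.h currently does):
+//
+//   size_t BackoffTraits::lower_bound = 16;
+//   size_t BackoffTraits::upper_bound = 1024;
+//
+// ExpBackoff is then used as a local spin-wait helper, e.g. (illustrative,
+// `ready` is a hypothetical atomic flag):
+//
+//   ExpBackoff backoff;
+//   while (!ready.load(std::memory_order_acquire))
+//     backoff();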
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _BARRIER_H
+#define _BARRIER_H
+
+#include "backoff.h"
+#include <atomic>
+
+namespace cds_others {
+
+class SpinBarrier {
+public:
+ SpinBarrier(unsigned int n) : n_(n) {
+ nwait_ = 0;
+ step_ = 0;
+ }
+
+  // The purpose of wait() is that all threads entering it synchronize with
+  // each other before any of them leaves it.
+  /** wildcard(2) is acq_rel, so every arriving thread happens-before later
+   * threads in the RMW chain; wildcards (4) and (5) form a release/acquire
+   * pair, so the last thread to arrive synchronizes with all earlier
+   * threads. The same (4)/(5) pair also ensures that the reset of nwait_ at
+   * wildcard(3) happens-before any later use of the barrier by other
+   * threads.
+   */
+
+ bool wait() {
+ unsigned int step = step_.load(std::memory_order_relaxed);
+
+ if (nwait_.fetch_add(1, std::memory_order_acq_rel) == n_ - 1) {
+ /* OK, last thread to come. */
+ nwait_.store(0, std::memory_order_relaxed);
+ step_.fetch_add(1, std::memory_order_release);
+ return true;
+ } else {
+ ExpBackoff backoff;
+ /* Run in circles and scream like a little girl. */
+ while (step_.load(std::memory_order_acquire) == step) {
+ backoff();
+ }
+ return false;
+ }
+ }
+
+protected:
+ /* Number of synchronized threads. */
+ const unsigned int n_;
+
+ /* Number of threads currently spinning. */
+ std::atomic<unsigned int> nwait_;
+
+  /* Number of barrier synchronizations completed so far;
+   * it's OK for this counter to wrap. */
+ std::atomic<unsigned int> step_;
+};
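+
+// A minimal usage sketch (illustrative; do_phase_work() is a hypothetical
+// helper): each of the n_ threads calls wait() at the end of a phase, and
+// exactly one caller per phase sees wait() return true.
+//
+//   SpinBarrier barrier(nthreads);
+//   void worker() {
+//     do_phase_work();
+//     if (barrier.wait()) {
+//       /* last thread to arrive: reset shared per-phase state here */
+//     }
+//   }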
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _CHASE_LEV_DEQUE_H
+#define _CHASE_LEV_DEQUE_H
+
+#include <atomic>
+#include <cds/misc/backoff.h>
+#include <cstdlib>
+#include <inttypes.h>
+#include <iostream>
+
+namespace cds_others {
+
+#define EMPTY 0xffffffff
+#define ABORT 0xfffffffe
+
+using std::memory_order_seq_cst;
+using std::memory_order_release;
+using std::memory_order_acquire;
+using std::memory_order_acq_rel;
+using std::memory_order_relaxed;
+using std::atomic_int;
+using std::atomic_size_t;
+using std::atomic_uintptr_t;
+using std::size_t;
+
+class ChaseLevDeque {
+private:
+ atomic_size_t top;
+ atomic_size_t bottom;
+ atomic_uintptr_t array; /* Atomic(Array *) */
+
+public:
+ struct Array {
+ atomic_size_t size;
+ atomic_int buffer[];
+ };
+
+ ChaseLevDeque() {
+ Array *a = (Array *)calloc(1, sizeof(Array) + 2 * sizeof(atomic_int));
+ array.store((uintptr_t)a, memory_order_relaxed);
+ top.store(0, memory_order_relaxed);
+ bottom.store(0, memory_order_relaxed);
+ a->size.store(2, memory_order_relaxed);
+ }
+
+ int take() {
+ size_t b = bottom.load(memory_order_relaxed) - 1;
+ Array *a = (Array *)array.load(memory_order_relaxed);
+ bottom.store(b, memory_order_relaxed);
+ atomic_thread_fence(memory_order_seq_cst);
+ size_t t = top.load(memory_order_relaxed);
+ int x;
+ if (t <= b) {
+ /* Non-empty queue. */
+ x = a->buffer[b % a->size.load(memory_order_relaxed)].load(
+ memory_order_relaxed);
+ if (t == b) {
+ /* Single last element in queue. */
+ if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
+ memory_order_relaxed))
+ /* Failed race. */
+ x = EMPTY;
+ bottom.store(b + 1, memory_order_relaxed);
+ }
+ } else { /* Empty queue. */
+ x = EMPTY;
+ bottom.store(b + 1, memory_order_relaxed);
+ }
+ return x;
+ }
+
+ void resize() {
+ Array *a = (Array *)array.load(memory_order_relaxed);
+ size_t size = a->size.load(memory_order_relaxed);
+ size_t new_size = size << 1;
+ Array *new_a =
+ (Array *)calloc(1, new_size * sizeof(atomic_int) + sizeof(Array));
+ size_t t = top.load(memory_order_relaxed);
+ size_t b = bottom.load(memory_order_relaxed);
+ new_a->size.store(new_size, memory_order_relaxed);
+ size_t i;
+ for (i = t; i < b; i++) {
+ new_a->buffer[i % new_size].store(
+ a->buffer[i % size].load(memory_order_relaxed), memory_order_relaxed);
+ }
+ array.store((uintptr_t)new_a, memory_order_release);
+ // std::cout << "Resize to " << new_size << "\n";
+ }
+
+ void push(int x) {
+ size_t b = bottom.load(memory_order_relaxed);
+ size_t t = top.load(memory_order_acquire);
+ Array *a = (Array *)array.load(memory_order_relaxed);
+ if (b - t > a->size.load(memory_order_relaxed) - 1) /* Full queue. */ {
+ resize();
+      // Bug in the original paper: the array pointer must be reloaded after resize().
+ a = (Array *)array.load(memory_order_relaxed);
+ }
+ a->buffer[b % a->size.load(memory_order_relaxed)].store(
+ x, memory_order_relaxed);
+ atomic_thread_fence(memory_order_release);
+ bottom.store(b + 1, memory_order_relaxed);
+ }
+
+ int steal() {
+ size_t t = top.load(memory_order_acquire);
+ atomic_thread_fence(memory_order_seq_cst);
+ size_t b = bottom.load(memory_order_acquire);
+ int x = EMPTY;
+ if (t < b) {
+ /* Non-empty queue. */
+ Array *a = (Array *)array.load(memory_order_acquire);
+ x = a->buffer[t % a->size.load(memory_order_relaxed)].load(
+ memory_order_relaxed);
+ if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
+ memory_order_relaxed))
+ /* Failed race. */
+ return ABORT;
+ }
+ return x;
+ }
+};
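+
+// A minimal usage sketch (illustrative): a single owner thread calls push()
+// and take() on its own deque, while any other thread may call steal().
+// EMPTY and ABORT are sentinel return values, so payloads equal to them
+// cannot be distinguished from failure.
+//
+//   ChaseLevDeque dq;       // owned by one worker thread
+//   dq.push(42);            // owner: publish a work item
+//   int t = dq.take();      // owner: LIFO pop; EMPTY if nothing is left
+//   int s = dq.steal();     // thief: FIFO steal; EMPTY or ABORT on failure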
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _MCS_LOCK_H
+#define _MCS_LOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include <cds/algo/backoff_strategy.h>
+#include <thread>
+
+namespace cds_others {
+
+size_t BackoffTraits::lower_bound = 16;
+size_t BackoffTraits::upper_bound = 1024;
+
+// Forward declaration
+struct mcs_node;
+struct mcs_mutex;
+
+struct mcs_node {
+ std::atomic<mcs_node *> next;
+ std::atomic<int> gate;
+
+ mcs_node() {
+ next.store(0);
+ gate.store(0);
+ }
+};
+
+struct mcs_mutex {
+public:
+ // tail is null when lock is not held
+ std::atomic<mcs_node *> m_tail;
+
+ mcs_mutex() { m_tail.store(nullptr); }
+ ~mcs_mutex() { assert(m_tail.load() == nullptr); }
+
+ class guard {
+ public:
+ mcs_mutex *m_t;
+ mcs_node m_node; // node held on the stack
+
+ // Call the wrapper (instrument every lock/unlock)
+ guard(mcs_mutex *t) : m_t(t) { t->lock(this); }
+ ~guard() { m_t->unlock(this); }
+ };
+
+ void lock(guard *I) {
+ mcs_node *me = &(I->m_node);
+
+ // set up my node :
+ // not published yet so relaxed :
+ me->next.store(nullptr, std::memory_order_relaxed);
+ me->gate.store(1, std::memory_order_relaxed);
+
+ // publish my node as the new tail :
+ mcs_node *pred = m_tail.exchange(me, std::memory_order_acq_rel);
+ if (pred != nullptr) {
+ // (*1) race here
+ // unlock of pred can see me in the tail before I fill next
+
+      // If this store were relaxed, the unlocker's store of 0 to gate could
+      // be ordered before our own store of 1 to gate, and this lock() would
+      // spin forever.
+      // publish me to previous lock-holder :
+ pred->next.store(me, std::memory_order_release);
+
+ // (*2) pred not touched any more
+
+ // now this is the spin -
+ // wait on predecessor setting my flag -
+ ExpBackoff backoff;
+ while (me->gate.load(std::memory_order_acquire)) {
+ backoff();
+ }
+ }
+ }
+
+ void unlock(guard *I) {
+ mcs_node *me = &(I->m_node);
+ mcs_node *next = me->next.load(std::memory_order_acquire);
+ if (next == nullptr) {
+ mcs_node *tail_was_me = me;
+
+ // This was mo_acq_rel, which is stronger than necessary
+ if (m_tail.compare_exchange_strong(tail_was_me, nullptr,
+ std::memory_order_release)) {
+ // got null in tail, mutex is unlocked
+ return;
+ }
+
+ // (*1) catch the race :
+ ExpBackoff backoff;
+ for (;;) {
+ next = me->next.load(std::memory_order_acquire);
+ if (next != nullptr)
+ break;
+ backoff();
+ }
+ }
+
+ // (*2) - store to next must be done,
+ // so no locker can be viewing my node any more
+
+ next->gate.store(0, std::memory_order_release);
+ }
+};
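+
+// A minimal usage sketch (illustrative): the guard keeps the queue node on
+// the caller's stack, so the lock is acquired and released via RAII.
+//
+//   mcs_mutex m;
+//   void critical() {
+//     mcs_mutex::guard g(&m);   // enqueue a stack-local mcs_node and spin
+//     /* ... critical section ... */
+//   }                           // ~guard() hands the lock to the successor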
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _RWLOCK_H
+#define _RWLOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include <cds/algo/backoff_strategy.h>
+#include <thread>
+
+namespace cds_others {
+
+#define RW_LOCK_BIAS 0x00100000
+#define WRITE_LOCK_CMP RW_LOCK_BIAS
+
+using std::memory_order_acquire;
+using std::memory_order_release;
+using std::memory_order_relaxed;
+using std::atomic_int;
+
+class RWLock {
+public:
+ RWLock() {
+ lock.store(RW_LOCK_BIAS);
+ }
+
+ int read_can_lock() {
+ return atomic_load_explicit(&lock, memory_order_relaxed) > 0;
+ }
+
+ int write_can_lock() {
+ return atomic_load_explicit(&lock, memory_order_relaxed) == RW_LOCK_BIAS;
+ }
+
+ void read_lock() {
+ ExpBackoff backoff;
+ int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+ while (priorvalue <= 0) {
+ atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+ while (atomic_load_explicit(&lock, memory_order_relaxed) <= 0) {
+ backoff();
+ }
+ priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+ }
+ }
+
+ void write_lock() {
+ int priorvalue =
+ atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+ ExpBackoff backoff;
+ while (priorvalue != RW_LOCK_BIAS) {
+ atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+ while (atomic_load_explicit(&lock, memory_order_relaxed) !=
+ RW_LOCK_BIAS) {
+ backoff();
+ }
+ priorvalue =
+ atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+ }
+ }
+
+ int read_trylock() {
+ int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+ if (priorvalue > 0)
+ return 1;
+
+ atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+ return 0;
+ }
+
+ int write_trylock() {
+ int priorvalue =
+ atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+ if (priorvalue == RW_LOCK_BIAS)
+ return 1;
+
+ atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+ return 0;
+ }
+
+ void read_unlock() {
+ atomic_fetch_add_explicit(&lock, 1, memory_order_release);
+ }
+
+ void write_unlock() {
+ atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_release);
+ }
+
+private:
+ atomic_int lock;
+};
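+
+// A minimal usage sketch (illustrative): the counter starts at RW_LOCK_BIAS;
+// each reader subtracts 1 and a writer subtracts the whole bias, so a write
+// lock succeeds only when no reader or writer holds the lock.
+//
+//   RWLock rw;
+//   rw.read_lock();  /* shared section    */  rw.read_unlock();
+//   rw.write_lock(); /* exclusive section */  rw.write_unlock();
+//   if (rw.read_trylock()) { /* ... */ rw.read_unlock(); }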
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _SEQLOCK_H
+#define _SEQLOCK_H
+
+#include "backoff.h"
+#include <atomic>
+
+namespace cds_others {
+
+using std::atomic_int;
+using std::memory_order_release;
+using std::memory_order_acquire;
+using std::memory_order_relaxed;
+
+class SeqLock {
+private:
+ // Sequence for reader consistency check.
+ atomic_int seq_;
+ // It needs to be atomic to avoid data races
+ atomic_int data_;
+
+public:
+ SeqLock() {
+ atomic_init(&seq_, 0);
+ atomic_init(&data_, 0);
+ }
+
+ int read() {
+ ExpBackoff backoff;
+ while (true) {
+ int old_seq = seq_.load(memory_order_acquire);
+ if (old_seq % 2 == 1) {
+ backoff();
+ continue;
+ }
+
+ int res = data_.load(memory_order_acquire);
+ if (seq_.load(memory_order_relaxed) == old_seq) {
+ return res;
+ }
+ }
+ }
+
+ void write(int new_data) {
+ ExpBackoff backoff;
+ while (true) {
+      // This load might be relaxed as well.
+ int old_seq = seq_.load(memory_order_acquire);
+ if (old_seq % 2 == 1) {
+ backoff();
+ continue; // Retry
+ }
+
+      // Relaxed ordering is sufficient for this CAS.
+ if (seq_.compare_exchange_strong(old_seq, old_seq + 1,
+ memory_order_relaxed,
+ memory_order_relaxed)) {
+ break;
+ }
+ }
+
+ // Update the data
+ data_.store(new_data, memory_order_release); // release
+
+ seq_.fetch_add(1, memory_order_release); // release
+ }
+};
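+
+// A minimal usage sketch (illustrative): writers serialize on the sequence
+// counter (odd while a write is in flight); readers retry until they see
+// the same even sequence number before and after loading the data.
+//
+//   SeqLock sl;
+//   sl.write(7);         // any thread may write
+//   int v = sl.read();   // readers spin/retry instead of blocking writers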
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _TICKET_LOCK_H
+#define _TICKET_LOCK_H
+
+#include "backoff.h"
+#include <atomic>
+
+namespace cds_others {
+
+class TicketLock {
+  /**
+    This ticket lock implementation is derived from the original
+    Mellor-Crummey & Scott paper "Algorithms for Scalable Synchronization on
+    Shared-Memory Multiprocessors" (1991). It assumes that the ticket and
+    turn counters are large enough to accommodate the maximum number of
+    simultaneous requests for the lock.
+  */
+public:
+ TicketLock() {
+ ticket.store(0, std::memory_order_relaxed);
+ turn.store(0, std::memory_order_relaxed);
+ }
+
+ void lock() {
+ // First grab a ticket
+ unsigned my_ticket = ticket.fetch_add(1, std::memory_order_relaxed);
+ // Spinning for my turn
+ ExpBackoff backoff;
+ while (true) {
+ unsigned my_turn = turn.load(std::memory_order_acquire);
+ if (my_turn == my_ticket) {
+ // Now it's my turn
+ return;
+ } else {
+ backoff();
+ }
+ }
+ }
+
+ void unlock() {
+ unsigned my_turn = turn.load(std::memory_order_relaxed);
+ turn.store(my_turn + 1, std::memory_order_release);
+ }
+
+private:
+ std::atomic_uint ticket;
+ std::atomic_uint turn;
+};
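+
+// A minimal usage sketch (illustrative): lock() takes the next ticket and
+// spins until the turn counter reaches it, so waiters enter in FIFO order.
+//
+//   TicketLock tl;
+//   tl.lock();    // my_ticket = ticket++, spin until turn == my_ticket
+//   /* ... critical section ... */
+//   tl.unlock();  // turn++, releasing the next waiter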
+
+} // namespace cds_others
+
+#endif
+++ /dev/null
-#ifndef _BACKOFF_H
-#define _BACKOFF_H
-
-#include <cds/algo/backoff_strategy.h>
-
-namespace cds_others {
-
-namespace bkoff = cds::backoff;
-struct BackoffTraits : public bkoff::exponential_const_traits {
- static size_t lower_bound;
- static size_t upper_bound;
-};
-typedef bkoff::exponential<BackoffTraits> ExpBackoff;
-
-} // namespace cds_others
-
-#endif
+++ /dev/null
-#ifndef _BARRIER_H
-#define _BARRIER_H
-
-#include "backoff.h"
-#include <atomic>
-
-namespace cds_others {
-
-class SpinBarrier {
-public:
- SpinBarrier(unsigned int n) : n_(n) {
- nwait_ = 0;
- step_ = 0;
- }
-
- // The purpose of wait() is that threads that enter it synchronize with
- // threads when they get out of it.
- /** wildcard(2) is acq_rel, ensuring that all threads hb before other
- * threads in the rmw chain order, then the wildcard (4) and (5) are
- * release/acquire to make sure the last thread synchronize with all other
- * earlier threads. Plus, the (4) and (5) synchronization can make sure the
- * reset of nwait_ in wildcard(3) happens-before any other threads in the
- * later usage of the barrier.
- */
-
- bool wait() {
- unsigned int step = step_.load(std::memory_order_relaxed);
-
- if (nwait_.fetch_add(1, std::memory_order_acq_rel) == n_ - 1) {
- /* OK, last thread to come. */
- nwait_.store(0, std::memory_order_relaxed);
- step_.fetch_add(1, std::memory_order_release);
- return true;
- } else {
- ExpBackoff backoff;
- /* Run in circles and scream like a little girl. */
- while (step_.load(std::memory_order_acquire) == step) {
- backoff();
- }
- return false;
- }
- }
-
-protected:
- /* Number of synchronized threads. */
- const unsigned int n_;
-
- /* Number of threads currently spinning. */
- std::atomic<unsigned int> nwait_;
-
- /* Number of barrier syncronizations completed so far,
- * * it's OK to wrap. */
- std::atomic<unsigned int> step_;
-};
-
-} // namespace cds_others
-
-#endif
+++ /dev/null
-#ifndef _MCS_LOCK_H
-#define _MCS_LOCK_H
-
-#include "backoff.h"
-#include <atomic>
-#include <cds/algo/backoff_strategy.h>
-#include <thread>
-
-namespace cds_others {
-
-size_t BackoffTraits::lower_bound = 16;
-size_t BackoffTraits::upper_bound = 1024;
-
-// Forward declaration
-struct mcs_node;
-struct mcs_mutex;
-
-struct mcs_node {
- std::atomic<mcs_node *> next;
- std::atomic<int> gate;
-
- mcs_node() {
- next.store(0);
- gate.store(0);
- }
-};
-
-struct mcs_mutex {
-public:
- // tail is null when lock is not held
- std::atomic<mcs_node *> m_tail;
-
- mcs_mutex() { m_tail.store(nullptr); }
- ~mcs_mutex() { assert(m_tail.load() == nullptr); }
-
- class guard {
- public:
- mcs_mutex *m_t;
- mcs_node m_node; // node held on the stack
-
- // Call the wrapper (instrument every lock/unlock)
- guard(mcs_mutex *t) : m_t(t) { t->lock(this); }
- ~guard() { m_t->unlock(this); }
- };
-
- void lock(guard *I) {
- mcs_node *me = &(I->m_node);
-
- // set up my node :
- // not published yet so relaxed :
- me->next.store(nullptr, std::memory_order_relaxed);
- me->gate.store(1, std::memory_order_relaxed);
-
- // publish my node as the new tail :
- mcs_node *pred = m_tail.exchange(me, std::memory_order_acq_rel);
- if (pred != nullptr) {
- // (*1) race here
- // unlock of pred can see me in the tail before I fill next
-
- // If this is relaxed, the store 0 to gate will be read before and
- // that lock will never ends.
- // publish me to previous lock-holder :
- pred->next.store(me, std::memory_order_release);
-
- // (*2) pred not touched any more
-
- // now this is the spin -
- // wait on predecessor setting my flag -
- ExpBackoff backoff;
- while (me->gate.load(std::memory_order_acquire)) {
- backoff();
- }
- }
- }
-
- void unlock(guard *I) {
- mcs_node *me = &(I->m_node);
- mcs_node *next = me->next.load(std::memory_order_acquire);
- if (next == nullptr) {
- mcs_node *tail_was_me = me;
-
- // This was mo_acq_rel, which is stronger than necessary
- if (m_tail.compare_exchange_strong(tail_was_me, nullptr,
- std::memory_order_release)) {
- // got null in tail, mutex is unlocked
- return;
- }
-
- // (*1) catch the race :
- ExpBackoff backoff;
- for (;;) {
- next = me->next.load(std::memory_order_acquire);
- if (next != nullptr)
- break;
- backoff();
- }
- }
-
- // (*2) - store to next must be done,
- // so no locker can be viewing my node any more
-
- next->gate.store(0, std::memory_order_release);
- }
-};
-
-} // namespace cds_others
-
-#endif
+++ /dev/null
-#ifndef _RWLOCK_H
-#define _RWLOCK_H
-
-#include "backoff.h"
-#include <atomic>
-#include <cds/algo/backoff_strategy.h>
-#include <thread>
-
-namespace cds_others {
-
-#define RW_LOCK_BIAS 0x00100000
-#define WRITE_LOCK_CMP RW_LOCK_BIAS
-
-using std::memory_order_acquire;
-using std::memory_order_release;
-using std::memory_order_relaxed;
-using std::atomic_int;
-
-class RWLock {
-public:
- RWLock() {
- lock.store(RW_LOCK_BIAS);
- }
-
- int read_can_lock() {
- return atomic_load_explicit(&lock, memory_order_relaxed) > 0;
- }
-
- int write_can_lock() {
- return atomic_load_explicit(&lock, memory_order_relaxed) == RW_LOCK_BIAS;
- }
-
- void read_lock() {
- ExpBackoff backoff;
- int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
- while (priorvalue <= 0) {
- atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
- while (atomic_load_explicit(&lock, memory_order_relaxed) <= 0) {
- backoff();
- }
- priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
- }
- }
-
- void write_lock() {
- int priorvalue =
- atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
- ExpBackoff backoff;
- while (priorvalue != RW_LOCK_BIAS) {
- atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
- while (atomic_load_explicit(&lock, memory_order_relaxed) !=
- RW_LOCK_BIAS) {
- backoff();
- }
- priorvalue =
- atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
- }
- }
-
- int read_trylock() {
- int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
- if (priorvalue > 0)
- return 1;
-
- atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
- return 0;
- }
-
- int write_trylock() {
- int priorvalue =
- atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
- if (priorvalue == RW_LOCK_BIAS)
- return 1;
-
- atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
- return 0;
- }
-
- void read_unlock() {
- atomic_fetch_add_explicit(&lock, 1, memory_order_release);
- }
-
- void write_unlock() {
- atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_release);
- }
-
-private:
- atomic_int lock;
-};
-
-} // namespace cds_others
-
-#endif
+++ /dev/null
-#ifndef _SEQLOCK_H
-#define _SEQLOCK_H
-
-#include "backoff.h"
-#include <atomic>
-
-namespace cds_others {
-
-using std::atomic_int;
-using std::memory_order_release;
-using std::memory_order_acquire;
-using std::memory_order_relaxed;
-
-class SeqLock {
-private:
- // Sequence for reader consistency check.
- atomic_int seq_;
- // It needs to be atomic to avoid data races
- atomic_int data_;
-
-public:
- SeqLock() {
- atomic_init(&seq_, 0);
- atomic_init(&data_, 0);
- }
-
- int read() {
- ExpBackoff backoff;
- while (true) {
- int old_seq = seq_.load(memory_order_acquire);
- if (old_seq % 2 == 1) {
- backoff();
- continue;
- }
-
- int res = data_.load(memory_order_acquire);
- if (seq_.load(memory_order_relaxed) == old_seq) {
- return res;
- }
- }
- }
-
- void write(int new_data) {
- ExpBackoff backoff;
- while (true) {
- // This might be a relaxed too
- int old_seq = seq_.load(memory_order_acquire);
- if (old_seq % 2 == 1) {
- backoff();
- continue; // Retry
- }
-
- // Should be relaxed!!!
- if (seq_.compare_exchange_strong(old_seq, old_seq + 1,
- memory_order_relaxed,
- memory_order_relaxed)) {
- break;
- }
- }
-
- // Update the data
- data_.store(new_data, memory_order_release); // release
-
- seq_.fetch_add(1, memory_order_release); // release
- }
-};
-
-} // namespace cds_others
-
-#endif
+++ /dev/null
-#ifndef _TICKET_LOCK_H
-#define _TICKET_LOCK_H
-
-#include "backoff.h"
-#include <atomic>
-
-namespace cds_others {
-
-class TicketLock {
- /**
- This ticket lock implementation is derived from the original Mellor-Crummey
- & Scott paper <Algorithms for Scalable Synchronization on SharedMemory
- Multiprocessors> in 1991. It assumes that the ticket and turn counter are
- large enough to accommodate the maximum number of simultaneous requests for
- the lock.
- */
-public:
- TicketLock() {
- ticket.store(0, std::memory_order_relaxed);
- turn.store(0, std::memory_order_relaxed);
- }
-
- void lock() {
- // First grab a ticket
- unsigned my_ticket = ticket.fetch_add(1, std::memory_order_relaxed);
- // Spinning for my turn
- ExpBackoff backoff;
- while (true) {
- unsigned my_turn = turn.load(std::memory_order_acquire);
- if (my_turn == my_ticket) {
- // Now it's my turn
- return;
- } else {
- backoff();
- }
- }
- }
-
- void unlock() {
- unsigned my_turn = turn.load(std::memory_order_relaxed);
- turn.store(my_turn + 1, std::memory_order_release);
- }
-
-private:
- std::atomic_uint ticket;
- std::atomic_uint turn;
-};
-
-} // namespace cds_others
-
-#endif
#include <atomic>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
-#include <cds/sync/barrier.h>
+#include <cds/misc/barrier.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <memory>
#include "common.h"
-#include <cds/container/chase-lev-deque.h>
+#include <cds/misc/chase-lev-deque.h>
#include <cds_test/stress_test.h>
#include <cstdlib>
#include <ctime>
#include <atomic>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
-#include <cds/sync/mcs-lock.h>
+#include <cds/misc/mcs-lock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <memory>
#include <atomic>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
-#include <cds/sync/rwlock.h>
+#include <cds/misc/rwlock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <memory>
#include <atomic>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
-#include <cds/sync/seqlock.h>
+#include <cds/misc/seqlock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <memory>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
#include <cds/sync/spinlock.h>
-#include <cds/sync/ticket_lock.h>
+#include <cds/misc/ticket_lock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <iostream>
set(CDSSTRESS_STACK_SOURCES
../../main.cpp
- spinlock_driver.cpp
+ rigtorp_mpmc_driver.cpp
deque_driver.cpp
+ spinlock_driver.cpp
seqlock_driver.cpp
rwlock_driver.cpp
mcslock_driver.cpp
#include "common.h"
-#include <cds/container/chase-lev-deque.h>
+#include <cds/misc/chase-lev-deque.h>
#include <cds_test/stress_test.h>
#include <cstdlib>
#include <ctime>
#include <atomic>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
-#include <cds/sync/mcs-lock.h>
+#include <cds/misc/mcs-lock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <memory>
--- /dev/null
+#include "common.h"
+#include <cds/misc/RigtorpMPMCQueue.h>
+#include <cds_test/stress_test.h>
+#include <algorithm>
+#include <ctime>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class rigtorpMPMCQueueTest : public cds_test::stress_fixture {
+protected:
+ static size_t s_nRigtorpMPMCQueuePassCount;
+ static size_t s_nRigtorpMPMCQueueEnqueueStride;
+ static size_t s_nRigtorpMPMCQueueCapacity;
+
+ static void SetUpTestCase() {
+ cds_test::config const &cfg = get_config("Misc");
+ GetConfig(RigtorpMPMCQueuePassCount);
+ GetConfig(RigtorpMPMCQueueEnqueueStride);
+ GetConfig(RigtorpMPMCQueueCapacity);
+ }
+
+ void test() {
+ rigtorp::MPMCQueue<size_t> q(s_nRigtorpMPMCQueueCapacity);
+ size_t nNo = 0;
+ size_t pop_sum = 0;
+
+ while (nNo < s_nRigtorpMPMCQueuePassCount) {
+ size_t curr_push_count =
+ std::min(s_nRigtorpMPMCQueuePassCount - nNo, s_nRigtorpMPMCQueueEnqueueStride);
+ for (size_t i = 0; i < curr_push_count; i++) {
+ q.push(nNo);
+ ++nNo;
+ }
+
+ for (size_t i = 0; i < curr_push_count; i++) {
+ size_t res;
+ q.pop(res);
+ pop_sum += res;
+ }
+ }
+
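+    // The popped values are 0 .. PassCount-1, so their sum should equal
+    // PassCount * (PassCount - 1) / 2.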
+ size_t supposed_sum =
+ s_nRigtorpMPMCQueuePassCount * (s_nRigtorpMPMCQueuePassCount - 1) / 2;
+ if (pop_sum != supposed_sum) {
+ std::cout << "Sequential rigtorpMPMC queue pop sum: " << pop_sum
+ << " != " << supposed_sum << "\n";
+ }
+ }
+};
+
+size_t rigtorpMPMCQueueTest::s_nRigtorpMPMCQueuePassCount;
+size_t rigtorpMPMCQueueTest::s_nRigtorpMPMCQueueEnqueueStride;
+size_t rigtorpMPMCQueueTest::s_nRigtorpMPMCQueueCapacity;
+
+TEST_F(rigtorpMPMCQueueTest, PushPop) {
+ test();
+}
+
+} // namespace
#include <atomic>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
-#include <cds/sync/rwlock.h>
+#include <cds/misc/rwlock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <memory>
#include <atomic>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
-#include <cds/sync/seqlock.h>
+#include <cds/misc/seqlock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <memory>
#include <cds/gc/dhp.h>
#include <cds/gc/hp.h>
#include <cds/sync/spinlock.h>
-#include <cds/sync/ticket_lock.h>
+#include <cds/misc/ticket_lock.h>
#include <cds_test/stress_test.h>
#include <iostream>
#include <iostream>