From: Peizhao Ou
Date: Fri, 26 Jan 2018 18:50:44 +0000 (-0800)
Subject: Reorganize the added benchmarks (move them into the misc folder)
X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=a09deddad8c64f9663fdfccbab67a43cf214543f;p=libcds.git

Reorganize the added benchmarks (move them into the misc folder)
---

diff --git a/cds/container/chase-lev-deque.h b/cds/container/chase-lev-deque.h
deleted file mode 100644
index 4f812584..00000000
--- a/cds/container/chase-lev-deque.h
+++ /dev/null
@@ -1,125 +0,0 @@
-#ifndef _CHASE_LEV_DEQUE_H
-#define _CHASE_LEV_DEQUE_H
-
-#include <atomic>
-#include <cstddef>
-#include <cstdint>
-#include <cstdlib>
-#include <iostream>
-
-namespace cds_others {
-
-#define EMPTY 0xffffffff
-#define ABORT 0xfffffffe
-
-using std::memory_order_seq_cst;
-using std::memory_order_release;
-using std::memory_order_acquire;
-using std::memory_order_acq_rel;
-using std::memory_order_relaxed;
-using std::atomic_int;
-using std::atomic_size_t;
-using std::atomic_uintptr_t;
-using std::size_t;
-
-class ChaseLevDeque {
-private:
-  atomic_size_t top;
-  atomic_size_t bottom;
-  atomic_uintptr_t array; /* Atomic(Array *) */
-
-public:
-  struct Array {
-    atomic_size_t size;
-    atomic_int buffer[];
-  };
-
-  ChaseLevDeque() {
-    Array *a = (Array *)calloc(1, sizeof(Array) + 2 * sizeof(atomic_int));
-    array.store((uintptr_t)a, memory_order_relaxed);
-    top.store(0, memory_order_relaxed);
-    bottom.store(0, memory_order_relaxed);
-    a->size.store(2, memory_order_relaxed);
-  }
-
-  int take() {
-    size_t b = bottom.load(memory_order_relaxed) - 1;
-    Array *a = (Array *)array.load(memory_order_relaxed);
-    bottom.store(b, memory_order_relaxed);
-    atomic_thread_fence(memory_order_seq_cst);
-    size_t t = top.load(memory_order_relaxed);
-    int x;
-    if (t <= b) {
-      /* Non-empty queue. */
-      x = a->buffer[b % a->size.load(memory_order_relaxed)].load(
-          memory_order_relaxed);
-      if (t == b) {
-        /* Single last element in queue. */
-        if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
-                                         memory_order_relaxed))
-          /* Failed race. */
-          x = EMPTY;
-        bottom.store(b + 1, memory_order_relaxed);
-      }
-    } else { /* Empty queue. */
-      x = EMPTY;
-      bottom.store(b + 1, memory_order_relaxed);
-    }
-    return x;
-  }
-
-  void resize() {
-    Array *a = (Array *)array.load(memory_order_relaxed);
-    size_t size = a->size.load(memory_order_relaxed);
-    size_t new_size = size << 1;
-    Array *new_a =
-        (Array *)calloc(1, new_size * sizeof(atomic_int) + sizeof(Array));
-    size_t t = top.load(memory_order_relaxed);
-    size_t b = bottom.load(memory_order_relaxed);
-    new_a->size.store(new_size, memory_order_relaxed);
-    size_t i;
-    for (i = t; i < b; i++) {
-      new_a->buffer[i % new_size].store(
-          a->buffer[i % size].load(memory_order_relaxed), memory_order_relaxed);
-    }
-    array.store((uintptr_t)new_a, memory_order_release);
-    // std::cout << "Resize to " << new_size << "\n";
-  }
-
-  void push(int x) {
-    size_t b = bottom.load(memory_order_relaxed);
-    size_t t = top.load(memory_order_acquire);
-    Array *a = (Array *)array.load(memory_order_relaxed);
-    if (b - t > a->size.load(memory_order_relaxed) - 1) /* Full queue. */ {
-      resize();
-      // Bug in the paper: the reload on the next line is missing there.
-      a = (Array *)array.load(memory_order_relaxed);
-    }
-    a->buffer[b % a->size.load(memory_order_relaxed)].store(
-        x, memory_order_relaxed);
-    atomic_thread_fence(memory_order_release);
-    bottom.store(b + 1, memory_order_relaxed);
-  }
-
-  int steal() {
-    size_t t = top.load(memory_order_acquire);
-    atomic_thread_fence(memory_order_seq_cst);
-    size_t b = bottom.load(memory_order_acquire);
-    int x = EMPTY;
-    if (t < b) {
-      /* Non-empty queue. */
-      Array *a = (Array *)array.load(memory_order_acquire);
-      x = a->buffer[t % a->size.load(memory_order_relaxed)].load(
-          memory_order_relaxed);
-      if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
-                                       memory_order_relaxed))
-        /* Failed race. */
-        return ABORT;
-    }
-    return x;
-  }
-};
-
-} // namespace cds_others
-
-#endif
diff --git a/cds/misc/RigtorpMPMCQueue.h b/cds/misc/RigtorpMPMCQueue.h
new file mode 100644
index 00000000..5f56c4b7
--- /dev/null
+++ b/cds/misc/RigtorpMPMCQueue.h
@@ -0,0 +1,221 @@
+/*
+Copyright (c) 2017 Erik Rigtorp <erik@rigtorp.se>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstdlib>
+#include <memory>
+#include <stdexcept>
+
+namespace rigtorp {
+
+template <typename T> class MPMCQueue {
+private:
+  static_assert(std::is_nothrow_copy_assignable<T>::value ||
+                    std::is_nothrow_move_assignable<T>::value,
+                "T must be nothrow copy or move assignable");
+
+  static_assert(std::is_nothrow_destructible<T>::value,
+                "T must be nothrow destructible");
+
+public:
+  explicit MPMCQueue(const size_t capacity)
+      : capacity_(capacity), head_(0), tail_(0) {
+    if (capacity_ < 1) {
+      throw std::invalid_argument("capacity < 1");
+    }
+    size_t space = capacity * sizeof(Slot) + kCacheLineSize - 1;
+    buf_ = malloc(space);
+    if (buf_ == nullptr) {
+      throw std::bad_alloc();
+    }
+    void *buf = buf_;
+    slots_ = reinterpret_cast<Slot *>(
+        std::align(kCacheLineSize, capacity * sizeof(Slot), buf, space));
+    if (slots_ == nullptr) {
+      free(buf_);
+      throw std::bad_alloc();
+    }
+    for (size_t i = 0; i < capacity_; ++i) {
+      new (&slots_[i]) Slot();
+    }
+    static_assert(sizeof(MPMCQueue) % kCacheLineSize == 0,
+                  "MPMCQueue size must be a multiple of cache line size to "
+                  "prevent false sharing between adjacent queues");
+    static_assert(sizeof(Slot) % kCacheLineSize == 0,
+                  "Slot size must be a multiple of cache line size to prevent "
+                  "false sharing between adjacent slots");
+    assert(reinterpret_cast<size_t>(slots_) % kCacheLineSize == 0 &&
+           "slots_ array must be aligned to cache line size to prevent false "
+           "sharing between adjacent slots");
+    assert(reinterpret_cast<char *>(&tail_) -
+               reinterpret_cast<char *>(&head_) >=
+               kCacheLineSize &&
+           "head and tail must be a cache line apart to prevent false sharing");
+  }
+
+  ~MPMCQueue() noexcept {
+    for (size_t i = 0; i < capacity_; ++i) {
+      slots_[i].~Slot();
+    }
+    free(buf_);
+  }
+
+  // non-copyable and non-movable
+  MPMCQueue(const MPMCQueue &) = delete;
+  MPMCQueue &operator=(const MPMCQueue &) = delete;
+
+  template <typename... Args> void emplace(Args &&... args) noexcept {
+    static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
+                  "T must be nothrow constructible with Args&&...");
+    auto const head = head_.fetch_add(1);
+    auto &slot = slots_[idx(head)];
+    while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire))
+      ;
+    slot.construct(std::forward<Args>(args)...);
+    slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
+  }
+
+  template <typename... Args> bool try_emplace(Args &&... args) noexcept {
+    static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
+                  "T must be nothrow constructible with Args&&...");
+    auto head = head_.load(std::memory_order_acquire);
+    for (;;) {
+      auto &slot = slots_[idx(head)];
+      if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) {
+        if (head_.compare_exchange_strong(head, head + 1)) {
+          slot.construct(std::forward<Args>(args)...);
+          slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
+          return true;
+        }
+      } else {
+        auto const prevHead = head;
+        head = head_.load(std::memory_order_acquire);
+        if (head == prevHead) {
+          return false;
+        }
+      }
+    }
+  }
+
+  void push(const T &v) noexcept {
+    static_assert(std::is_nothrow_copy_constructible<T>::value,
+                  "T must be nothrow copy constructible");
+    emplace(v);
+  }
+
+  template <typename P,
+            typename = typename std::enable_if<
+                std::is_nothrow_constructible<T, P &&>::value>::type>
+  void push(P &&v) noexcept {
+    emplace(std::forward<P>(v));
+  }
+
+  bool try_push(const T &v) noexcept {
+    static_assert(std::is_nothrow_copy_constructible<T>::value,
+                  "T must be nothrow copy constructible");
+    return try_emplace(v);
+  }
+
+  template <typename P,
+            typename = typename std::enable_if<
+                std::is_nothrow_constructible<T, P &&>::value>::type>
+  bool try_push(P &&v) noexcept {
+    return try_emplace(std::forward<P>(v));
+  }
+
+  void pop(T &v) noexcept {
+    auto const tail = tail_.fetch_add(1);
+    auto &slot = slots_[idx(tail)];
+    while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
+      ;
+    v = slot.move();
+    slot.destroy();
+    slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
+  }
+
+  bool try_pop(T &v) noexcept {
+    auto tail = tail_.load(std::memory_order_acquire);
+    for (;;) {
+      auto &slot = slots_[idx(tail)];
+      if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
+        if (tail_.compare_exchange_strong(tail, tail + 1)) {
+          v = slot.move();
+          slot.destroy();
+          slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
+          return true;
+        }
+      } else {
+        auto const prevTail = tail;
+        tail = tail_.load(std::memory_order_acquire);
+        if (tail == prevTail) {
+          return false;
+        }
+      }
+    }
+  }
+
+private:
+  constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }
+
+  constexpr size_t turn(size_t i) const noexcept { return i / capacity_; }
+
+  static constexpr size_t kCacheLineSize = 128;
+
+  struct Slot {
+    ~Slot() noexcept {
+      if (turn & 1) {
+        destroy();
+      }
+    }
+
+    template <typename... Args> void construct(Args &&... args) noexcept {
+      static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
+                    "T must be nothrow constructible with Args&&...");
+      new (&storage) T(std::forward<Args>(args)...);
+    }
+
+    void destroy() noexcept {
+      static_assert(std::is_nothrow_destructible<T>::value,
+                    "T must be nothrow destructible");
+      reinterpret_cast<T *>(&storage)->~T();
+    }
+
+    T &&move() noexcept { return reinterpret_cast<T &&>(storage); }
+
+    // Align to avoid false sharing between adjacent slots
+    alignas(kCacheLineSize) std::atomic<size_t> turn = {0};
+    typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
+  };
+
+private:
+  const size_t capacity_;
+  Slot *slots_;
+  void *buf_;
+
+  // Align to avoid false sharing between head_ and tail_
+  alignas(kCacheLineSize) std::atomic<size_t> head_;
+  alignas(kCacheLineSize) std::atomic<size_t> tail_;
+};
+}
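The queue above is driven entirely through push/emplace and pop/try_* calls. A minimal single-producer, single-consumer smoke test written against nothing but this header looks like the sketch below; main, the thread bodies, and the item count are illustrative, not part of this patch:

#include <cds/misc/RigtorpMPMCQueue.h>
#include <cstdio>
#include <thread>

int main() {
  rigtorp::MPMCQueue<int> q(16); // capacity must be at least 1
  std::thread producer([&q] {
    for (int i = 0; i < 1000; ++i)
      q.push(i); // spins while the queue is full
  });
  std::thread consumer([&q] {
    for (int i = 0; i < 1000; ++i) {
      int v;
      q.pop(v); // spins while the queue is empty
      if (v != i)
        std::printf("per-producer FIFO violated: %d != %d\n", v, i);
    }
  });
  producer.join();
  consumer.join();
  return 0;
}

With one producer and one consumer, pops must come back in push order, which is what the check exploits; the sequential stress driver added at the end of this patch validates the same property through a pop-sum instead.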
diff --git a/cds/misc/backoff.h b/cds/misc/backoff.h
new file mode 100644
index 00000000..bcbcec50
--- /dev/null
+++ b/cds/misc/backoff.h
@@ -0,0 +1,17 @@
+#ifndef _BACKOFF_H
+#define _BACKOFF_H
+
+#include <cds/algo/backoff_strategy.h>
+
+namespace cds_others {
+
+namespace bkoff = cds::backoff;
+struct BackoffTraits : public bkoff::exponential_const_traits {
+  static size_t lower_bound;
+  static size_t upper_bound;
+};
+typedef bkoff::exponential<BackoffTraits> ExpBackoff;
+
+} // namespace cds_others
+
+#endif
diff --git a/cds/misc/barrier.h b/cds/misc/barrier.h
new file mode 100644
index 00000000..6d0366c7
--- /dev/null
+++ b/cds/misc/barrier.h
@@ -0,0 +1,58 @@
+#ifndef _BARRIER_H
+#define _BARRIER_H
+
+#include "backoff.h"
+#include <atomic>
+
+namespace cds_others {
+
+class SpinBarrier {
+public:
+  SpinBarrier(unsigned int n) : n_(n) {
+    nwait_ = 0;
+    step_ = 0;
+  }
+
+  // The purpose of wait() is that threads entering it synchronize with
+  // one another by the time they get out of it.
+  /** wildcard(2) is acq_rel, ensuring that all earlier threads happen-before
+   * later threads in the RMW chain order; wildcards (4) and (5) are
+   * release/acquire to make sure the last thread synchronizes with all
+   * earlier threads. In addition, the (4)/(5) synchronization ensures the
+   * reset of nwait_ in wildcard(3) happens-before any later use of the
+   * barrier by other threads.
+   */
+
+  bool wait() {
+    unsigned int step = step_.load(std::memory_order_relaxed);
+
+    if (nwait_.fetch_add(1, std::memory_order_acq_rel) == n_ - 1) {
+      /* OK, last thread to come. */
+      nwait_.store(0, std::memory_order_relaxed);
+      step_.fetch_add(1, std::memory_order_release);
+      return true;
+    } else {
+      ExpBackoff backoff;
+      /* Run in circles and scream like a little girl. */
+      while (step_.load(std::memory_order_acquire) == step) {
+        backoff();
+      }
+      return false;
+    }
+  }
+
+protected:
+  /* Number of synchronized threads. */
+  const unsigned int n_;
+
+  /* Number of threads currently spinning. */
+  std::atomic<unsigned int> nwait_;
+
+  /* Number of barrier synchronizations completed so far;
+   * it's OK to wrap. */
+  std::atomic<unsigned int> step_;
+};
+
+} // namespace cds_others
+
+#endif
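Typical use of SpinBarrier: every worker calls wait() at the end of a phase, nobody returns until all n_ threads have arrived, and exactly one caller per round (the last to arrive) gets true back. A minimal sketch, assuming the backoff bounds are defined somewhere in the program (mcs-lock.h below does this); the worker body is illustrative:

#include <cds/misc/barrier.h>
#include <thread>
#include <vector>

static void worker(cds_others::SpinBarrier *b) {
  // ... phase 1 work ...
  b->wait(); // rendezvous: phase 2 may rely on everyone's phase 1 writes
  // ... phase 2 work ...
}

int main() {
  const unsigned n = 4;
  cds_others::SpinBarrier barrier(n);
  std::vector<std::thread> threads;
  for (unsigned i = 0; i < n; ++i)
    threads.emplace_back(worker, &barrier);
  for (auto &t : threads)
    t.join();
}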
diff --git a/cds/misc/chase-lev-deque.h b/cds/misc/chase-lev-deque.h
new file mode 100644
index 00000000..a5c6cc2a
--- /dev/null
+++ b/cds/misc/chase-lev-deque.h
@@ -0,0 +1,125 @@
+#ifndef _CHASE_LEV_DEQUE_H
+#define _CHASE_LEV_DEQUE_H
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <iostream>
+
+namespace cds_others {
+
+#define EMPTY 0xffffffff
+#define ABORT 0xfffffffe
+
+using std::memory_order_seq_cst;
+using std::memory_order_release;
+using std::memory_order_acquire;
+using std::memory_order_acq_rel;
+using std::memory_order_relaxed;
+using std::atomic_int;
+using std::atomic_size_t;
+using std::atomic_uintptr_t;
+using std::size_t;
+
+class ChaseLevDeque {
+private:
+  atomic_size_t top;
+  atomic_size_t bottom;
+  atomic_uintptr_t array; /* Atomic(Array *) */
+
+public:
+  struct Array {
+    atomic_size_t size;
+    atomic_int buffer[];
+  };
+
+  ChaseLevDeque() {
+    Array *a = (Array *)calloc(1, sizeof(Array) + 2 * sizeof(atomic_int));
+    array.store((uintptr_t)a, memory_order_relaxed);
+    top.store(0, memory_order_relaxed);
+    bottom.store(0, memory_order_relaxed);
+    a->size.store(2, memory_order_relaxed);
+  }
+
+  int take() {
+    size_t b = bottom.load(memory_order_relaxed) - 1;
+    Array *a = (Array *)array.load(memory_order_relaxed);
+    bottom.store(b, memory_order_relaxed);
+    atomic_thread_fence(memory_order_seq_cst);
+    size_t t = top.load(memory_order_relaxed);
+    int x;
+    if (t <= b) {
+      /* Non-empty queue. */
+      x = a->buffer[b % a->size.load(memory_order_relaxed)].load(
+          memory_order_relaxed);
+      if (t == b) {
+        /* Single last element in queue. */
+        if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
+                                         memory_order_relaxed))
+          /* Failed race. */
+          x = EMPTY;
+        bottom.store(b + 1, memory_order_relaxed);
+      }
+    } else { /* Empty queue. */
+      x = EMPTY;
+      bottom.store(b + 1, memory_order_relaxed);
+    }
+    return x;
+  }
+
+  void resize() {
+    Array *a = (Array *)array.load(memory_order_relaxed);
+    size_t size = a->size.load(memory_order_relaxed);
+    size_t new_size = size << 1;
+    Array *new_a =
+        (Array *)calloc(1, new_size * sizeof(atomic_int) + sizeof(Array));
+    size_t t = top.load(memory_order_relaxed);
+    size_t b = bottom.load(memory_order_relaxed);
+    new_a->size.store(new_size, memory_order_relaxed);
+    size_t i;
+    for (i = t; i < b; i++) {
+      new_a->buffer[i % new_size].store(
+          a->buffer[i % size].load(memory_order_relaxed), memory_order_relaxed);
+    }
+    array.store((uintptr_t)new_a, memory_order_release);
+    // std::cout << "Resize to " << new_size << "\n";
+  }
+
+  void push(int x) {
+    size_t b = bottom.load(memory_order_relaxed);
+    size_t t = top.load(memory_order_acquire);
+    Array *a = (Array *)array.load(memory_order_relaxed);
+    if (b - t > a->size.load(memory_order_relaxed) - 1) /* Full queue. */ {
+      resize();
+      // Bug in the paper: the reload on the next line is missing there.
+      a = (Array *)array.load(memory_order_relaxed);
+    }
+    a->buffer[b % a->size.load(memory_order_relaxed)].store(
+        x, memory_order_relaxed);
+    atomic_thread_fence(memory_order_release);
+    bottom.store(b + 1, memory_order_relaxed);
+  }
+
+  int steal() {
+    size_t t = top.load(memory_order_acquire);
+    atomic_thread_fence(memory_order_seq_cst);
+    size_t b = bottom.load(memory_order_acquire);
+    int x = EMPTY;
+    if (t < b) {
+      /* Non-empty queue. */
+      Array *a = (Array *)array.load(memory_order_acquire);
+      x = a->buffer[t % a->size.load(memory_order_relaxed)].load(
+          memory_order_relaxed);
+      if (!top.compare_exchange_strong(t, t + 1, memory_order_seq_cst,
+                                       memory_order_relaxed))
+        /* Failed race. */
+        return ABORT;
+    }
+    return x;
+  }
+};
+
+} // namespace cds_others
+
+#endif
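The usage contract of the Chase-Lev deque is asymmetric: push() and take() may only be called by the single owner thread, while any other thread may call steal(). A minimal sketch of that split (the thread body and the value are illustrative); note that EMPTY and ABORT are reserved sentinels, so they cannot be stored as payloads:

#include <cds/misc/chase-lev-deque.h>
#include <thread>

int main() {
  cds_others::ChaseLevDeque dq;
  dq.push(42); // owner thread only
  std::thread thief([&dq] {
    int v;
    do {
      v = dq.steal(); // any thread; returns EMPTY, ABORT, or a payload
    } while (v == (int)EMPTY || v == (int)ABORT);
    // v == 42 here
  });
  thief.join();
  int r = dq.take(); // owner thread only; the thief won the only element
  // r == (int)EMPTY
  (void)r;
}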
diff --git a/cds/misc/mcs-lock.h b/cds/misc/mcs-lock.h
new file mode 100644
index 00000000..d5cda3a6
--- /dev/null
+++ b/cds/misc/mcs-lock.h
@@ -0,0 +1,108 @@
+#ifndef _MCS_LOCK_H
+#define _MCS_LOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include <cassert>
+#include <cstddef>
+
+namespace cds_others {
+
+size_t BackoffTraits::lower_bound = 16;
+size_t BackoffTraits::upper_bound = 1024;
+
+// Forward declaration
+struct mcs_node;
+struct mcs_mutex;
+
+struct mcs_node {
+  std::atomic<mcs_node *> next;
+  std::atomic<int> gate;
+
+  mcs_node() {
+    next.store(0);
+    gate.store(0);
+  }
+};
+
+struct mcs_mutex {
+public:
+  // tail is null when lock is not held
+  std::atomic<mcs_node *> m_tail;
+
+  mcs_mutex() { m_tail.store(nullptr); }
+  ~mcs_mutex() { assert(m_tail.load() == nullptr); }
+
+  class guard {
+  public:
+    mcs_mutex *m_t;
+    mcs_node m_node; // node held on the stack
+
+    // Call the wrapper (instrument every lock/unlock)
+    guard(mcs_mutex *t) : m_t(t) { t->lock(this); }
+    ~guard() { m_t->unlock(this); }
+  };
+
+  void lock(guard *I) {
+    mcs_node *me = &(I->m_node);
+
+    // set up my node:
+    // not published yet so relaxed:
+    me->next.store(nullptr, std::memory_order_relaxed);
+    me->gate.store(1, std::memory_order_relaxed);
+
+    // publish my node as the new tail:
+    mcs_node *pred = m_tail.exchange(me, std::memory_order_acq_rel);
+    if (pred != nullptr) {
+      // (*1) race here
+      // unlock of pred can see me in the tail before I fill next
+
+      // If this store were relaxed, the predecessor's store of 0 to gate
+      // could be ordered before our store of 1, and the lock would never end.
+      // publish me to previous lock-holder:
+      pred->next.store(me, std::memory_order_release);
+
+      // (*2) pred not touched any more
+
+      // now this is the spin -
+      // wait on predecessor setting my flag -
+      ExpBackoff backoff;
+      while (me->gate.load(std::memory_order_acquire)) {
+        backoff();
+      }
+    }
+  }
+
+  void unlock(guard *I) {
+    mcs_node *me = &(I->m_node);
+    mcs_node *next = me->next.load(std::memory_order_acquire);
+    if (next == nullptr) {
+      mcs_node *tail_was_me = me;
+
+      // This was mo_acq_rel, which is stronger than necessary
+      if (m_tail.compare_exchange_strong(tail_was_me, nullptr,
+                                         std::memory_order_release)) {
+        // got null in tail, mutex is unlocked
+        return;
+      }
+
+      // (*1) catch the race:
+      ExpBackoff backoff;
+      for (;;) {
+        next = me->next.load(std::memory_order_acquire);
+        if (next != nullptr)
+          break;
+        backoff();
+      }
+    }
+
+    // (*2) - store to next must be done,
+    // so no locker can be viewing my node any more
+
+    next->gate.store(0, std::memory_order_release);
+  }
+};
+
+} // namespace cds_others
+
+#endif
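The lock is meant to be taken through the guard RAII wrapper, which places the queue node on the locker's own stack; a minimal sketch (the protected counter is illustrative):

#include <cds/misc/mcs-lock.h>

cds_others::mcs_mutex m;
int shared_counter = 0;

void bump() {
  cds_others::mcs_mutex::guard g(&m); // enqueues a stack-resident node
  ++shared_counter;                   // critical section
} // ~guard unlocks and opens the successor's gate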
diff --git a/cds/misc/rwlock.h b/cds/misc/rwlock.h
new file mode 100644
index 00000000..e8c6465e
--- /dev/null
+++ b/cds/misc/rwlock.h
@@ -0,0 +1,93 @@
+#ifndef _RWLOCK_H
+#define _RWLOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include <cstddef>
+#include <cstdlib>
+
+namespace cds_others {
+
+#define RW_LOCK_BIAS 0x00100000
+#define WRITE_LOCK_CMP RW_LOCK_BIAS
+
+using std::memory_order_acquire;
+using std::memory_order_release;
+using std::memory_order_relaxed;
+using std::atomic_int;
+
+class RWLock {
+public:
+  RWLock() {
+    lock.store(RW_LOCK_BIAS);
+  }
+
+  int read_can_lock() {
+    return atomic_load_explicit(&lock, memory_order_relaxed) > 0;
+  }
+
+  int write_can_lock() {
+    return atomic_load_explicit(&lock, memory_order_relaxed) == RW_LOCK_BIAS;
+  }
+
+  void read_lock() {
+    ExpBackoff backoff;
+    int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+    while (priorvalue <= 0) {
+      atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+      while (atomic_load_explicit(&lock, memory_order_relaxed) <= 0) {
+        backoff();
+      }
+      priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+    }
+  }
+
+  void write_lock() {
+    int priorvalue =
+        atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+    ExpBackoff backoff;
+    while (priorvalue != RW_LOCK_BIAS) {
+      atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+      while (atomic_load_explicit(&lock, memory_order_relaxed) !=
+             RW_LOCK_BIAS) {
+        backoff();
+      }
+      priorvalue =
+          atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+    }
+  }
+
+  int read_trylock() {
+    int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+    if (priorvalue > 0)
+      return 1;
+
+    atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+    return 0;
+  }
+
+  int write_trylock() {
+    int priorvalue =
+        atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+    if (priorvalue == RW_LOCK_BIAS)
+      return 1;
+
+    atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+    return 0;
+  }
+
+  void read_unlock() {
+    atomic_fetch_add_explicit(&lock, 1, memory_order_release);
+  }
+
+  void write_unlock() {
+    atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_release);
+  }
+
+private:
+  atomic_int lock;
+};
+
+} // namespace cds_others
+
+#endif
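RW_LOCK_BIAS makes this a Linux-style biased reader-writer lock: readers subtract 1 and a writer subtracts the whole bias, so the counter stays positive while only readers hold it. A minimal sketch of both sides (the table is illustrative):

#include <cds/misc/rwlock.h>

cds_others::RWLock rw;
int table[64];

int reader(int i) {
  rw.read_lock(); // many readers may hold the lock concurrently
  int v = table[i];
  rw.read_unlock();
  return v;
}

void writer(int i, int v) {
  rw.write_lock(); // excludes readers and other writers
  table[i] = v;
  rw.write_unlock();
}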
"backoff.h" +#include + +namespace cds_others { + +using std::atomic_int; +using std::memory_order_release; +using std::memory_order_acquire; +using std::memory_order_relaxed; + +class SeqLock { +private: + // Sequence for reader consistency check. + atomic_int seq_; + // It needs to be atomic to avoid data races + atomic_int data_; + +public: + SeqLock() { + atomic_init(&seq_, 0); + atomic_init(&data_, 0); + } + + int read() { + ExpBackoff backoff; + while (true) { + int old_seq = seq_.load(memory_order_acquire); + if (old_seq % 2 == 1) { + backoff(); + continue; + } + + int res = data_.load(memory_order_acquire); + if (seq_.load(memory_order_relaxed) == old_seq) { + return res; + } + } + } + + void write(int new_data) { + ExpBackoff backoff; + while (true) { + // This might be a relaxed too + int old_seq = seq_.load(memory_order_acquire); + if (old_seq % 2 == 1) { + backoff(); + continue; // Retry + } + + // Should be relaxed!!! + if (seq_.compare_exchange_strong(old_seq, old_seq + 1, + memory_order_relaxed, + memory_order_relaxed)) { + break; + } + } + + // Update the data + data_.store(new_data, memory_order_release); // release + + seq_.fetch_add(1, memory_order_release); // release + } +}; + +} // namespace cds_others + +#endif diff --git a/cds/misc/ticket_lock.h b/cds/misc/ticket_lock.h new file mode 100644 index 00000000..233b4400 --- /dev/null +++ b/cds/misc/ticket_lock.h @@ -0,0 +1,51 @@ +#ifndef _TICKET_LOCK_H +#define _TICKET_LOCK_H + +#include "backoff.h" +#include + +namespace cds_others { + +class TicketLock { + /** + This ticket lock implementation is derived from the original Mellor-Crummey + & Scott paper in 1991. It assumes that the ticket and turn counter are + large enough to accommodate the maximum number of simultaneous requests for + the lock. + */ +public: + TicketLock() { + ticket.store(0, std::memory_order_relaxed); + turn.store(0, std::memory_order_relaxed); + } + + void lock() { + // First grab a ticket + unsigned my_ticket = ticket.fetch_add(1, std::memory_order_relaxed); + // Spinning for my turn + ExpBackoff backoff; + while (true) { + unsigned my_turn = turn.load(std::memory_order_acquire); + if (my_turn == my_ticket) { + // Now it's my turn + return; + } else { + backoff(); + } + } + } + + void unlock() { + unsigned my_turn = turn.load(std::memory_order_relaxed); + turn.store(my_turn + 1, std::memory_order_release); + } + +private: + std::atomic_uint ticket; + std::atomic_uint turn; +}; + +} // namespace cds_others + +#endif diff --git a/cds/sync/backoff.h b/cds/sync/backoff.h deleted file mode 100644 index bcbcec50..00000000 --- a/cds/sync/backoff.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef _BACKOFF_H -#define _BACKOFF_H - -#include - -namespace cds_others { - -namespace bkoff = cds::backoff; -struct BackoffTraits : public bkoff::exponential_const_traits { - static size_t lower_bound; - static size_t upper_bound; -}; -typedef bkoff::exponential ExpBackoff; - -} // namespace cds_others - -#endif diff --git a/cds/sync/barrier.h b/cds/sync/barrier.h deleted file mode 100644 index 6d0366c7..00000000 --- a/cds/sync/barrier.h +++ /dev/null @@ -1,58 +0,0 @@ -#ifndef _BARRIER_H -#define _BARRIER_H - -#include "backoff.h" -#include - -namespace cds_others { - -class SpinBarrier { -public: - SpinBarrier(unsigned int n) : n_(n) { - nwait_ = 0; - step_ = 0; - } - - // The purpose of wait() is that threads that enter it synchronize with - // threads when they get out of it. 
diff --git a/cds/misc/ticket_lock.h b/cds/misc/ticket_lock.h
new file mode 100644
index 00000000..233b4400
--- /dev/null
+++ b/cds/misc/ticket_lock.h
@@ -0,0 +1,51 @@
+#ifndef _TICKET_LOCK_H
+#define _TICKET_LOCK_H
+
+#include "backoff.h"
+#include <atomic>
+
+namespace cds_others {
+
+class TicketLock {
+  /**
+    This ticket lock implementation is derived from the original
+    Mellor-Crummey & Scott paper (1991). It assumes that the ticket and turn
+    counters are large enough to accommodate the maximum number of
+    simultaneous requests for the lock.
+  */
+public:
+  TicketLock() {
+    ticket.store(0, std::memory_order_relaxed);
+    turn.store(0, std::memory_order_relaxed);
+  }
+
+  void lock() {
+    // First grab a ticket
+    unsigned my_ticket = ticket.fetch_add(1, std::memory_order_relaxed);
+    // Spinning for my turn
+    ExpBackoff backoff;
+    while (true) {
+      unsigned my_turn = turn.load(std::memory_order_acquire);
+      if (my_turn == my_ticket) {
+        // Now it's my turn
+        return;
+      } else {
+        backoff();
+      }
+    }
+  }
+
+  void unlock() {
+    unsigned my_turn = turn.load(std::memory_order_relaxed);
+    turn.store(my_turn + 1, std::memory_order_release);
+  }
+
+private:
+  std::atomic_uint ticket;
+  std::atomic_uint turn;
+};
+
+} // namespace cds_others
+
+#endif
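Tickets make the lock FIFO-fair: each locker draws the next ticket and spins until turn reaches it, and unlock() publishes the next turn. A minimal sketch (the account balance is illustrative):

#include <cds/misc/ticket_lock.h>

cds_others::TicketLock tl;
long balance = 0;

void deposit(long amount) {
  tl.lock();   // waiters are served strictly in ticket order
  balance += amount;
  tl.unlock(); // releases to the holder of the next ticket
}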
diff --git a/cds/sync/backoff.h b/cds/sync/backoff.h
deleted file mode 100644
index bcbcec50..00000000
--- a/cds/sync/backoff.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef _BACKOFF_H
-#define _BACKOFF_H
-
-#include <cds/algo/backoff_strategy.h>
-
-namespace cds_others {
-
-namespace bkoff = cds::backoff;
-struct BackoffTraits : public bkoff::exponential_const_traits {
-  static size_t lower_bound;
-  static size_t upper_bound;
-};
-typedef bkoff::exponential<BackoffTraits> ExpBackoff;
-
-} // namespace cds_others
-
-#endif
diff --git a/cds/sync/barrier.h b/cds/sync/barrier.h
deleted file mode 100644
index 6d0366c7..00000000
--- a/cds/sync/barrier.h
+++ /dev/null
@@ -1,58 +0,0 @@
-#ifndef _BARRIER_H
-#define _BARRIER_H
-
-#include "backoff.h"
-#include <atomic>
-
-namespace cds_others {
-
-class SpinBarrier {
-public:
-  SpinBarrier(unsigned int n) : n_(n) {
-    nwait_ = 0;
-    step_ = 0;
-  }
-
-  // The purpose of wait() is that threads entering it synchronize with
-  // one another by the time they get out of it.
-  /** wildcard(2) is acq_rel, ensuring that all earlier threads happen-before
-   * later threads in the RMW chain order; wildcards (4) and (5) are
-   * release/acquire to make sure the last thread synchronizes with all
-   * earlier threads. In addition, the (4)/(5) synchronization ensures the
-   * reset of nwait_ in wildcard(3) happens-before any later use of the
-   * barrier by other threads.
-   */
-
-  bool wait() {
-    unsigned int step = step_.load(std::memory_order_relaxed);
-
-    if (nwait_.fetch_add(1, std::memory_order_acq_rel) == n_ - 1) {
-      /* OK, last thread to come. */
-      nwait_.store(0, std::memory_order_relaxed);
-      step_.fetch_add(1, std::memory_order_release);
-      return true;
-    } else {
-      ExpBackoff backoff;
-      /* Run in circles and scream like a little girl. */
-      while (step_.load(std::memory_order_acquire) == step) {
-        backoff();
-      }
-      return false;
-    }
-  }
-
-protected:
-  /* Number of synchronized threads. */
-  const unsigned int n_;
-
-  /* Number of threads currently spinning. */
-  std::atomic<unsigned int> nwait_;
-
-  /* Number of barrier synchronizations completed so far;
-   * it's OK to wrap. */
-  std::atomic<unsigned int> step_;
-};
-
-} // namespace cds_others
-
-#endif
diff --git a/cds/sync/mcs-lock.h b/cds/sync/mcs-lock.h
deleted file mode 100644
index d5cda3a6..00000000
--- a/cds/sync/mcs-lock.h
+++ /dev/null
@@ -1,108 +0,0 @@
-#ifndef _MCS_LOCK_H
-#define _MCS_LOCK_H
-
-#include "backoff.h"
-#include <atomic>
-#include <cassert>
-#include <cstddef>
-
-namespace cds_others {
-
-size_t BackoffTraits::lower_bound = 16;
-size_t BackoffTraits::upper_bound = 1024;
-
-// Forward declaration
-struct mcs_node;
-struct mcs_mutex;
-
-struct mcs_node {
-  std::atomic<mcs_node *> next;
-  std::atomic<int> gate;
-
-  mcs_node() {
-    next.store(0);
-    gate.store(0);
-  }
-};
-
-struct mcs_mutex {
-public:
-  // tail is null when lock is not held
-  std::atomic<mcs_node *> m_tail;
-
-  mcs_mutex() { m_tail.store(nullptr); }
-  ~mcs_mutex() { assert(m_tail.load() == nullptr); }
-
-  class guard {
-  public:
-    mcs_mutex *m_t;
-    mcs_node m_node; // node held on the stack
-
-    // Call the wrapper (instrument every lock/unlock)
-    guard(mcs_mutex *t) : m_t(t) { t->lock(this); }
-    ~guard() { m_t->unlock(this); }
-  };
-
-  void lock(guard *I) {
-    mcs_node *me = &(I->m_node);
-
-    // set up my node:
-    // not published yet so relaxed:
-    me->next.store(nullptr, std::memory_order_relaxed);
-    me->gate.store(1, std::memory_order_relaxed);
-
-    // publish my node as the new tail:
-    mcs_node *pred = m_tail.exchange(me, std::memory_order_acq_rel);
-    if (pred != nullptr) {
-      // (*1) race here
-      // unlock of pred can see me in the tail before I fill next
-
-      // If this store were relaxed, the predecessor's store of 0 to gate
-      // could be ordered before our store of 1, and the lock would never end.
-      // publish me to previous lock-holder:
-      pred->next.store(me, std::memory_order_release);
-
-      // (*2) pred not touched any more
-
-      // now this is the spin -
-      // wait on predecessor setting my flag -
-      ExpBackoff backoff;
-      while (me->gate.load(std::memory_order_acquire)) {
-        backoff();
-      }
-    }
-  }
-
-  void unlock(guard *I) {
-    mcs_node *me = &(I->m_node);
-    mcs_node *next = me->next.load(std::memory_order_acquire);
-    if (next == nullptr) {
-      mcs_node *tail_was_me = me;
-
-      // This was mo_acq_rel, which is stronger than necessary
-      if (m_tail.compare_exchange_strong(tail_was_me, nullptr,
-                                         std::memory_order_release)) {
-        // got null in tail, mutex is unlocked
-        return;
-      }
-
-      // (*1) catch the race:
-      ExpBackoff backoff;
-      for (;;) {
-        next = me->next.load(std::memory_order_acquire);
-        if (next != nullptr)
-          break;
-        backoff();
-      }
-    }
-
-    // (*2) - store to next must be done,
-    // so no locker can be viewing my node any more
-
-    next->gate.store(0, std::memory_order_release);
-  }
-};
-
-} // namespace cds_others
-
-#endif
diff --git a/cds/sync/rwlock.h b/cds/sync/rwlock.h
deleted file mode 100644
index e8c6465e..00000000
--- a/cds/sync/rwlock.h
+++ /dev/null
@@ -1,93 +0,0 @@
-#ifndef _RWLOCK_H
-#define _RWLOCK_H
-
-#include "backoff.h"
-#include <atomic>
-#include <cstddef>
-#include <cstdlib>
-
-namespace cds_others {
-
-#define RW_LOCK_BIAS 0x00100000
-#define WRITE_LOCK_CMP RW_LOCK_BIAS
-
-using std::memory_order_acquire;
-using std::memory_order_release;
-using std::memory_order_relaxed;
-using std::atomic_int;
-
-class RWLock {
-public:
-  RWLock() {
-    lock.store(RW_LOCK_BIAS);
-  }
-
-  int read_can_lock() {
-    return atomic_load_explicit(&lock, memory_order_relaxed) > 0;
-  }
-
-  int write_can_lock() {
-    return atomic_load_explicit(&lock, memory_order_relaxed) == RW_LOCK_BIAS;
-  }
-
-  void read_lock() {
-    ExpBackoff backoff;
-    int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
-    while (priorvalue <= 0) {
-      atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
-      while (atomic_load_explicit(&lock, memory_order_relaxed) <= 0) {
-        backoff();
-      }
-      priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
-    }
-  }
-
-  void write_lock() {
-    int priorvalue =
-        atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
-    ExpBackoff backoff;
-    while (priorvalue != RW_LOCK_BIAS) {
-      atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
-      while (atomic_load_explicit(&lock, memory_order_relaxed) !=
-             RW_LOCK_BIAS) {
-        backoff();
-      }
-      priorvalue =
-          atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
-    }
-  }
-
-  int read_trylock() {
-    int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
-    if (priorvalue > 0)
-      return 1;
-
-    atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
-    return 0;
-  }
-
-  int write_trylock() {
-    int priorvalue =
-        atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
-    if (priorvalue == RW_LOCK_BIAS)
-      return 1;
-
-    atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
-    return 0;
-  }
-
-  void read_unlock() {
-    atomic_fetch_add_explicit(&lock, 1, memory_order_release);
-  }
-
-  void write_unlock() {
-    atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_release);
-  }
-
-private:
-  atomic_int lock;
-};
-
-} // namespace cds_others
-
-#endif
diff --git a/cds/sync/seqlock.h b/cds/sync/seqlock.h
deleted file mode 100644
index 74270539..00000000
--- a/cds/sync/seqlock.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#ifndef _SEQLOCK_H
-#define _SEQLOCK_H
-
-#include "backoff.h"
-#include <atomic>
-
-namespace cds_others {
-
-using std::atomic_int;
-using std::memory_order_release;
-using std::memory_order_acquire;
-using std::memory_order_relaxed;
-
-class SeqLock {
-private:
-  // Sequence for reader consistency check.
-  atomic_int seq_;
-  // It needs to be atomic to avoid data races.
-  atomic_int data_;
-
-public:
-  SeqLock() {
-    atomic_init(&seq_, 0);
-    atomic_init(&data_, 0);
-  }
-
-  int read() {
-    ExpBackoff backoff;
-    while (true) {
-      int old_seq = seq_.load(memory_order_acquire);
-      if (old_seq % 2 == 1) {
-        backoff();
-        continue;
-      }
-
-      int res = data_.load(memory_order_acquire);
-      if (seq_.load(memory_order_relaxed) == old_seq) {
-        return res;
-      }
-    }
-  }
-
-  void write(int new_data) {
-    ExpBackoff backoff;
-    while (true) {
-      // This load might be relaxed as well.
-      int old_seq = seq_.load(memory_order_acquire);
-      if (old_seq % 2 == 1) {
-        backoff();
-        continue; // Retry
-      }
-
-      // Relaxed ordering suffices for this CAS.
-      if (seq_.compare_exchange_strong(old_seq, old_seq + 1,
-                                       memory_order_relaxed,
-                                       memory_order_relaxed)) {
-        break;
-      }
-    }
-
-    // Update the data
-    data_.store(new_data, memory_order_release); // release
-
-    seq_.fetch_add(1, memory_order_release); // release
-  }
-};
-
-} // namespace cds_others
-
-#endif
diff --git a/cds/sync/ticket_lock.h b/cds/sync/ticket_lock.h
deleted file mode 100644
index 233b4400..00000000
--- a/cds/sync/ticket_lock.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#ifndef _TICKET_LOCK_H
-#define _TICKET_LOCK_H
-
-#include "backoff.h"
-#include <atomic>
-
-namespace cds_others {
-
-class TicketLock {
-  /**
-    This ticket lock implementation is derived from the original
-    Mellor-Crummey & Scott paper (1991). It assumes that the ticket and turn
-    counters are large enough to accommodate the maximum number of
-    simultaneous requests for the lock.
-  */
-public:
-  TicketLock() {
-    ticket.store(0, std::memory_order_relaxed);
-    turn.store(0, std::memory_order_relaxed);
-  }
-
-  void lock() {
-    // First grab a ticket
-    unsigned my_ticket = ticket.fetch_add(1, std::memory_order_relaxed);
-    // Spinning for my turn
-    ExpBackoff backoff;
-    while (true) {
-      unsigned my_turn = turn.load(std::memory_order_acquire);
-      if (my_turn == my_ticket) {
-        // Now it's my turn
-        return;
-      } else {
-        backoff();
-      }
-    }
-  }
-
-  void unlock() {
-    unsigned my_turn = turn.load(std::memory_order_relaxed);
-    turn.store(my_turn + 1, std::memory_order_release);
-  }
-
-private:
-  std::atomic_uint ticket;
-  std::atomic_uint turn;
-};
-
-} // namespace cds_others
-
-#endif
diff --git a/test/stress/misc/barrier_driver.cpp b/test/stress/misc/barrier_driver.cpp
index 3ffe5aec..90ca2399 100644
--- a/test/stress/misc/barrier_driver.cpp
+++ b/test/stress/misc/barrier_driver.cpp
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include <cds/sync/barrier.h>
+#include <cds/misc/barrier.h>
 #include
 #include
 #include
diff --git a/test/stress/misc/deque_driver.cpp b/test/stress/misc/deque_driver.cpp
index 60809472..8976c3cd 100644
--- a/test/stress/misc/deque_driver.cpp
+++ b/test/stress/misc/deque_driver.cpp
@@ -1,5 +1,5 @@
 #include "common.h"
-#include <cds/container/chase-lev-deque.h>
+#include <cds/misc/chase-lev-deque.h>
 #include
 #include
 #include
diff --git a/test/stress/misc/mcslock_driver.cpp b/test/stress/misc/mcslock_driver.cpp
index e37e4324..ea3370ae 100644
--- a/test/stress/misc/mcslock_driver.cpp
+++ b/test/stress/misc/mcslock_driver.cpp
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include <cds/sync/mcs-lock.h>
+#include <cds/misc/mcs-lock.h>
 #include
 #include
 #include
diff --git a/test/stress/misc/rwlock_driver.cpp b/test/stress/misc/rwlock_driver.cpp
index 548d16a9..fa425c9e 100644
--- a/test/stress/misc/rwlock_driver.cpp
+++ b/test/stress/misc/rwlock_driver.cpp
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include <cds/sync/rwlock.h>
+#include <cds/misc/rwlock.h>
 #include
 #include
 #include
diff --git a/test/stress/misc/seqlock_driver.cpp b/test/stress/misc/seqlock_driver.cpp
index ba47d4a9..c3ee487a 100644
--- a/test/stress/misc/seqlock_driver.cpp
+++ b/test/stress/misc/seqlock_driver.cpp
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include <cds/sync/seqlock.h>
+#include <cds/misc/seqlock.h>
 #include
 #include
 #include
diff --git a/test/stress/misc/spinlock_driver.cpp b/test/stress/misc/spinlock_driver.cpp
index 159c18af..94c6c7ce 100644
--- a/test/stress/misc/spinlock_driver.cpp
+++ b/test/stress/misc/spinlock_driver.cpp
@@ -3,7 +3,7 @@
 #include
 #include
 #include
-#include <cds/sync/backoff.h>
+#include <cds/misc/backoff.h>
 #include
 #include
 #include
diff --git a/test/stress/sequential/sequential-misc/CMakeLists.txt b/test/stress/sequential/sequential-misc/CMakeLists.txt
index 6b48b52a..c5d1c335 100644
--- a/test/stress/sequential/sequential-misc/CMakeLists.txt
+++ b/test/stress/sequential/sequential-misc/CMakeLists.txt
@@ -2,8 +2,9 @@ set(PACKAGE_NAME stress-sequential-misc)
 
 set(CDSSTRESS_STACK_SOURCES
     ../../main.cpp
-    spinlock_driver.cpp
+    rigtorp_mpmc_driver.cpp
     deque_driver.cpp
+    spinlock_driver.cpp
     seqlock_driver.cpp
     rwlock_driver.cpp
     mcslock_driver.cpp
diff --git a/test/stress/sequential/sequential-misc/deque_driver.cpp b/test/stress/sequential/sequential-misc/deque_driver.cpp
index 9fe98137..4fc584d1 100644
--- a/test/stress/sequential/sequential-misc/deque_driver.cpp
+++ b/test/stress/sequential/sequential-misc/deque_driver.cpp
@@ -1,5 +1,5 @@
 #include "common.h"
-#include <cds/container/chase-lev-deque.h>
+#include <cds/misc/chase-lev-deque.h>
 #include
 #include
 #include
diff --git a/test/stress/sequential/sequential-misc/mcslock_driver.cpp b/test/stress/sequential/sequential-misc/mcslock_driver.cpp
index 5914f559..d0b83cb6 100644
--- a/test/stress/sequential/sequential-misc/mcslock_driver.cpp
+++ b/test/stress/sequential/sequential-misc/mcslock_driver.cpp
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include <cds/sync/mcs-lock.h>
+#include <cds/misc/mcs-lock.h>
 #include
 #include
 #include
diff --git a/test/stress/sequential/sequential-misc/rigtorp_mpmc_driver.cpp b/test/stress/sequential/sequential-misc/rigtorp_mpmc_driver.cpp
new file mode 100644
index 00000000..743a64aa
--- /dev/null
+++ b/test/stress/sequential/sequential-misc/rigtorp_mpmc_driver.cpp
@@ -0,0 +1,61 @@
+#include "common.h"
+#include <cds/misc/RigtorpMPMCQueue.h>
+#include <algorithm>
+#include <cstddef>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class rigtorpMPMCQueueTest : public cds_test::stress_fixture {
+protected:
+  static size_t s_nRigtorpMPMCQueuePassCount;
+  static size_t s_nRigtorpMPMCQueueEnqueueStride;
+  static size_t s_nRigtorpMPMCQueueCapacity;
+
+  static void SetUpTestCase() {
+    cds_test::config const &cfg = get_config("Misc");
+    GetConfig(RigtorpMPMCQueuePassCount);
+    GetConfig(RigtorpMPMCQueueEnqueueStride);
+    GetConfig(RigtorpMPMCQueueCapacity);
+  }
+
+  void test() {
+    rigtorp::MPMCQueue<size_t> q(s_nRigtorpMPMCQueueCapacity);
+    size_t nNo = 0;
+    size_t pop_sum = 0;
+
+    while (nNo < s_nRigtorpMPMCQueuePassCount) {
+      size_t curr_push_count =
+          std::min(s_nRigtorpMPMCQueuePassCount - nNo,
+                   s_nRigtorpMPMCQueueEnqueueStride);
+      for (size_t i = 0; i < curr_push_count; i++) {
+        q.push(nNo);
+        ++nNo;
+      }
+
+      for (size_t i = 0; i < curr_push_count; i++) {
+        size_t res;
+        q.pop(res);
+        pop_sum += res;
+      }
+    }
+
+    size_t supposed_sum =
+        s_nRigtorpMPMCQueuePassCount * (s_nRigtorpMPMCQueuePassCount - 1) / 2;
+    if (pop_sum != supposed_sum) {
+      std::cout << "Sequential rigtorpMPMC queue pop sum: " << pop_sum
+                << " != " << supposed_sum << "\n";
+    }
+  }
+};
+
+size_t rigtorpMPMCQueueTest::s_nRigtorpMPMCQueuePassCount;
+size_t rigtorpMPMCQueueTest::s_nRigtorpMPMCQueueEnqueueStride;
+size_t rigtorpMPMCQueueTest::s_nRigtorpMPMCQueueCapacity;
+
+TEST_F(rigtorpMPMCQueueTest, PushPop) {
+  test();
+}
+
+} // namespace
diff --git a/test/stress/sequential/sequential-misc/rwlock_driver.cpp b/test/stress/sequential/sequential-misc/rwlock_driver.cpp
index 61dc9893..aee9463a 100644
--- a/test/stress/sequential/sequential-misc/rwlock_driver.cpp
+++ b/test/stress/sequential/sequential-misc/rwlock_driver.cpp
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include <cds/sync/rwlock.h>
+#include <cds/misc/rwlock.h>
 #include
 #include
 #include
diff --git a/test/stress/sequential/sequential-misc/seqlock_driver.cpp b/test/stress/sequential/sequential-misc/seqlock_driver.cpp
index f3680b64..21313b3f 100644
--- a/test/stress/sequential/sequential-misc/seqlock_driver.cpp
+++ b/test/stress/sequential/sequential-misc/seqlock_driver.cpp
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include <cds/sync/seqlock.h>
+#include <cds/misc/seqlock.h>
 #include
 #include
 #include
diff --git a/test/stress/sequential/sequential-misc/spinlock_driver.cpp b/test/stress/sequential/sequential-misc/spinlock_driver.cpp
index bd2c42a3..99a48153 100644
--- a/test/stress/sequential/sequential-misc/spinlock_driver.cpp
+++ b/test/stress/sequential/sequential-misc/spinlock_driver.cpp
@@ -3,7 +3,7 @@
 #include
 #include
 #include
-#include <cds/sync/backoff.h>
+#include <cds/misc/backoff.h>
 #include
 #include
 #include