--- /dev/null
+#ifndef _BACKOFF_H
+#define _BACKOFF_H
+
+#include <cds/algo/backoff_strategy.h>
+
+namespace cds_others {
+
+namespace bkoff = cds::backoff;
+struct BackoffTraits : public bkoff::exponential_const_traits {
+ static size_t lower_bound;
+ static size_t upper_bound;
+};
+typedef bkoff::exponential<BackoffTraits> ExpBackoff;
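+
+// ExpBackoff is the back-off object used by the spin loops in this directory.
+// Its bounds are static members of BackoffTraits, so they must be defined in
+// exactly one translation unit (they are defined in mcs-lock.h). A minimal
+// usage sketch, with condition_met() as a hypothetical predicate:
+//
+//   ExpBackoff backoff;
+//   while (!condition_met())
+//     backoff();  // spin, backing off between lower_bound and upper_bound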
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _BARRIER_H
+#define _BARRIER_H
+
+#include "backoff.h"
+#include <atomic>
+
+namespace cds_others {
+
+class SpinBarrier {
+public:
+ SpinBarrier(unsigned int n) : n_(n) {
+ nwait_ = 0;
+ step_ = 0;
+ }
+
+  // The purpose of wait() is that every thread entering it synchronizes with
+  // the threads that leave it.
+  /** The acq_rel fetch_add on nwait_ places all arriving threads into one
+   * RMW chain, so each arrival happens-before every later arrival. The
+   * waiting threads' acquire loads of step_ synchronize with the last
+   * thread's release increment of step_, so everything the last thread did
+   * before that increment, including the relaxed reset of nwait_,
+   * happens-before any thread leaves the barrier and reuses it in a later
+   * round.
+   */
+
+ bool wait() {
+ unsigned int step = step_.load(std::memory_order_relaxed);
+
+ if (nwait_.fetch_add(1, std::memory_order_acq_rel) == n_ - 1) {
+ /* OK, last thread to come. */
+ nwait_.store(0, std::memory_order_relaxed);
+ step_.fetch_add(1, std::memory_order_release);
+ return true;
+ } else {
+ ExpBackoff backoff;
+ /* Run in circles and scream like a little girl. */
+ while (step_.load(std::memory_order_acquire) == step) {
+ backoff();
+ }
+ return false;
+ }
+ }
+
+protected:
+ /* Number of synchronized threads. */
+ const unsigned int n_;
+
+ /* Number of threads currently spinning. */
+ std::atomic<unsigned int> nwait_;
+
+  /* Number of barrier synchronizations completed so far;
+   * it's OK to wrap. */
+ std::atomic<unsigned int> step_;
+};
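+
+// A minimal usage sketch (mirroring the stress driver, not part of the
+// barrier itself): every participating thread calls wait() once per round,
+// and exactly one call per round returns true.
+//
+//   cds_others::SpinBarrier barrier(kThreads);  // kThreads is caller-defined
+//   // in each of the kThreads threads:
+//   if (barrier.wait()) {
+//     // only the last thread to arrive in this round executes this block
+//   }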
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _MCS_LOCK_H
+#define _MCS_LOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include <cassert>
+#include <cds/algo/backoff_strategy.h>
+#include <thread>
+
+namespace cds_others {
+
+size_t BackoffTraits::lower_bound = 16;
+size_t BackoffTraits::upper_bound = 1024;
+
+// Forward declaration
+struct mcs_node;
+struct mcs_mutex;
+
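+// One queue entry per waiter; the node lives on the guard's stack.
+// next points to the node queued behind this one (null while we are the
+// tail), and gate == 1 means the owner of this node is still waiting;
+// the predecessor grants the lock by storing 0 to it.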
+struct mcs_node {
+ std::atomic<mcs_node *> next;
+ std::atomic<int> gate;
+
+  mcs_node() {
+    next.store(nullptr);
+    gate.store(0);
+  }
+};
+
+struct mcs_mutex {
+public:
+ // tail is null when lock is not held
+ std::atomic<mcs_node *> m_tail;
+
+ mcs_mutex() { m_tail.store(nullptr); }
+ ~mcs_mutex() { assert(m_tail.load() == nullptr); }
+
+ class guard {
+ public:
+ mcs_mutex *m_t;
+ mcs_node m_node; // node held on the stack
+
+ // Call the wrapper (instrument every lock/unlock)
+ guard(mcs_mutex *t) : m_t(t) { t->lock(this); }
+ ~guard() { m_t->unlock(this); }
+ };
+
+ void lock(guard *I) {
+ mcs_node *me = &(I->m_node);
+
+ // set up my node :
+ // not published yet so relaxed :
+ me->next.store(nullptr, std::memory_order_relaxed);
+ me->gate.store(1, std::memory_order_relaxed);
+
+ // publish my node as the new tail :
+ mcs_node *pred = m_tail.exchange(me, std::memory_order_acq_rel);
+ if (pred != nullptr) {
+      // (*1) race here:
+      // pred's unlock() can already see me in the tail before I fill in next.
+
+      // If this store were relaxed, pred's store of 0 to our gate could be
+      // ordered before our store of 1 above, so the spin below would read
+      // gate == 1 forever and this lock() would never return.
+ // publish me to previous lock-holder :
+ pred->next.store(me, std::memory_order_release);
+
+ // (*2) pred not touched any more
+
+ // now this is the spin -
+ // wait on predecessor setting my flag -
+ ExpBackoff backoff;
+ while (me->gate.load(std::memory_order_acquire)) {
+ backoff();
+ }
+ }
+ }
+
+ void unlock(guard *I) {
+ mcs_node *me = &(I->m_node);
+ mcs_node *next = me->next.load(std::memory_order_acquire);
+ if (next == nullptr) {
+ mcs_node *tail_was_me = me;
+
+ // This was mo_acq_rel, which is stronger than necessary
+ if (m_tail.compare_exchange_strong(tail_was_me, nullptr,
+ std::memory_order_release)) {
+ // got null in tail, mutex is unlocked
+ return;
+ }
+
+ // (*1) catch the race :
+ ExpBackoff backoff;
+ for (;;) {
+ next = me->next.load(std::memory_order_acquire);
+ if (next != nullptr)
+ break;
+ backoff();
+ }
+ }
+
+    // (*2) - by now the successor's store to next has completed,
+    // so no other locker can still be looking at my node
+
+ next->gate.store(0, std::memory_order_release);
+ }
+};
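+
+// A minimal usage sketch (as exercised by the stress driver, not part of the
+// lock itself): the guard places the queue node on the caller's stack, and
+// lock()/unlock() may also be called explicitly through it.
+//
+//   cds_others::mcs_mutex m;
+//   {
+//     cds_others::mcs_mutex::guard g(&m);  // constructor calls m.lock(&g)
+//     // ... critical section ...
+//   }                                      // destructor calls m.unlock(&g)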
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _RWLOCK_H
+#define _RWLOCK_H
+
+#include "backoff.h"
+#include <atomic>
+#include <cds/algo/backoff_strategy.h>
+#include <thread>
+
+namespace cds_others {
+
+#define RW_LOCK_BIAS 0x00100000
+#define WRITE_LOCK_CMP RW_LOCK_BIAS
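+
+// Biased-counter scheme (in the style of the Linux rwlock): the counter
+// starts at RW_LOCK_BIAS. Each reader subtracts 1 and a writer subtracts
+// RW_LOCK_BIAS, so for example three concurrent readers leave it at
+// RW_LOCK_BIAS - 3, which is still positive, while a writer drives it to 0.
+// A positive value therefore means readers may enter, and a value equal to
+// RW_LOCK_BIAS means the lock is completely free.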
+
+using std::memory_order_acquire;
+using std::memory_order_release;
+using std::memory_order_relaxed;
+using std::atomic_int;
+
+class RWLock {
+public:
+ RWLock() {
+ lock.store(RW_LOCK_BIAS);
+ }
+
+ int read_can_lock() {
+ return atomic_load_explicit(&lock, memory_order_relaxed) > 0;
+ }
+
+ int write_can_lock() {
+ return atomic_load_explicit(&lock, memory_order_relaxed) == RW_LOCK_BIAS;
+ }
+
+ void read_lock() {
+ ExpBackoff backoff;
+ int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+ while (priorvalue <= 0) {
+ atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+ while (atomic_load_explicit(&lock, memory_order_relaxed) <= 0) {
+ backoff();
+ }
+ priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+ }
+ }
+
+ void write_lock() {
+ int priorvalue =
+ atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+ ExpBackoff backoff;
+ while (priorvalue != RW_LOCK_BIAS) {
+ atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+ while (atomic_load_explicit(&lock, memory_order_relaxed) !=
+ RW_LOCK_BIAS) {
+ backoff();
+ }
+ priorvalue =
+ atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+ }
+ }
+
+ int read_trylock() {
+ int priorvalue = atomic_fetch_sub_explicit(&lock, 1, memory_order_acquire);
+ if (priorvalue > 0)
+ return 1;
+
+ atomic_fetch_add_explicit(&lock, 1, memory_order_relaxed);
+ return 0;
+ }
+
+ int write_trylock() {
+ int priorvalue =
+ atomic_fetch_sub_explicit(&lock, RW_LOCK_BIAS, memory_order_acquire);
+ if (priorvalue == RW_LOCK_BIAS)
+ return 1;
+
+ atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_relaxed);
+ return 0;
+ }
+
+ void read_unlock() {
+ atomic_fetch_add_explicit(&lock, 1, memory_order_release);
+ }
+
+ void write_unlock() {
+ atomic_fetch_add_explicit(&lock, RW_LOCK_BIAS, memory_order_release);
+ }
+
+private:
+ atomic_int lock;
+};
+
+} // namespace cds_others
+
+#endif
--- /dev/null
+#ifndef _SEQLOCK_H
+#define _SEQLOCK_H
+
+#include <atomic>
+
+namespace cds_others {
+
+using std::atomic_int;
+using std::memory_order_release;
+using std::memory_order_acquire;
+using std::memory_order_relaxed;
+
+class SeqLock {
+private:
+ // Sequence for reader consistency check.
+ atomic_int seq_;
+ // It needs to be atomic to avoid data races
+ atomic_int data_;
+
+public:
+ SeqLock() {
+ atomic_init(&seq_, 0);
+ atomic_init(&data_, 0);
+ }
+
+ int read() {
+ while (true) {
+ int old_seq = seq_.load(memory_order_acquire); // acquire
+ if (old_seq % 2 == 1)
+ continue;
+
+ int res = data_.load(memory_order_acquire); // acquire
+ if (seq_.load(memory_order_relaxed) == old_seq) { // relaxed
+ return res;
+ }
+ }
+ }
+
+ void write(int new_data) {
+ while (true) {
+      // This load might also be relaxed.
+ int old_seq = seq_.load(memory_order_acquire); // acquire
+ if (old_seq % 2 == 1)
+ continue; // Retry
+
+      // The CAS is deliberately relaxed.
+ if (seq_.compare_exchange_strong(old_seq, old_seq + 1,
+ memory_order_relaxed,
+ memory_order_relaxed)) // relaxed
+ break;
+ }
+
+ // Update the data
+ data_.store(new_data, memory_order_release); // release
+
+ seq_.fetch_add(1, memory_order_release); // release
+ }
+};
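+
+// A minimal usage sketch (as exercised by the stress driver): writers call
+// write() with the new value; readers call read(), which retries internally
+// until it observes a snapshot with a stable, even sequence number.
+//
+//   cds_others::SeqLock lock;
+//   lock.write(42);       // writer threads
+//   int v = lock.read();  // reader threads; v is a consistently read value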
+
+} // namespace cds_others
+
+#endif
${CMAKE_CURRENT_SOURCE_DIR}
)
+add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/misc)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/freelist)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/map)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/pqueue)
add_custom_target( stress-all
DEPENDS
+ stress-misc
stress-freelist
stress-map
stress-pqueue
--- /dev/null
+set(PACKAGE_NAME stress-misc)
+
+set(CDSSTRESS_MISC_SOURCES
+ ../main.cpp
+ barrier_driver.cpp
+ seqlock_driver.cpp
+ rwlock_driver.cpp
+ mcslock_driver.cpp
+ spinlock_driver.cpp
+)
+
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}
+)
+
+add_executable(${PACKAGE_NAME} ${CDSSTRESS_MISC_SOURCES})
+target_link_libraries(${PACKAGE_NAME} ${CDS_TEST_LIBRARIES} ${CDSSTRESS_FRAMEWORK_LIBRARY})
+
+add_test(NAME ${PACKAGE_NAME} COMMAND ${PACKAGE_NAME} WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})
--- /dev/null
+#include <atomic>
+#include <cassert>
+#include <cds/gc/dhp.h>
+#include <cds/gc/hp.h>
+#include <cds/sync/barrier.h>
+#include <cds_test/stress_test.h>
+#include <iostream>
+#include <thread>
+
+using namespace std;
+
+namespace {
+
+typedef cds_others::SpinBarrier Barrier;
+
+class BarrierTest : public cds_test::stress_fixture {
+protected:
+ static Barrier *barrier;
+ static int count;
+ static const int kThreads = 6;
+ static const int kPassCount = 10000;
+
+ static void SetUpTestCase() {}
+
+ static void TearDownTestCase() {
+    assert(count == kPassCount * kPassCount);
+ }
+
+ static void Thread() {
+ for (int i = 0; i < kPassCount; i++) {
+ for (int j = 0; j < kPassCount; j++) {
+ if (barrier->wait()) {
+ count++;
+ }
+ }
+ }
+ }
+};
+
+Barrier *BarrierTest::barrier;
+int BarrierTest::count;
+const int BarrierTest::kThreads;
+
+TEST_F(BarrierTest, Wait) {
+ barrier = new Barrier(kThreads);
+ int num_threads = kThreads;
+ std::thread *threads = new std::thread[num_threads];
+ for (int i = 0; i < num_threads; i++) {
+ threads[i] = std::thread(Thread);
+ }
+
+ for (int i = 0; i < num_threads; i++) {
+ threads[i].join();
+ }
+}
+
+} // namespace
--- /dev/null
+#include <atomic>
+#include <cds/gc/dhp.h>
+#include <cds/gc/hp.h>
+#include <cds/sync/mcs-lock.h>
+#include <cds_test/stress_test.h>
+#include <iostream>
+#include <thread>
+
+using namespace std;
+
+namespace {
+
+class MCSLockTest : public cds_test::stress_fixture {
+protected:
+ static int x;
+ static cds_others::mcs_mutex *my_mutex;
+ static const int kThreads = 6;
+
+ static void SetUpTestCase() {}
+
+  static void Thread() {
+    // The guard's constructor acquires the lock. Release it by hand here and
+    // re-acquire it at the very end, so the guard's destructor always
+    // releases a lock that is actually held.
+    cds_others::mcs_mutex::guard g(my_mutex);
+    x = 1;
+    my_mutex->unlock(&g);
+ for (int i = 0; i < 10000; i++) {
+ for (int j = 0; j < 300; j++) {
+ my_mutex->lock(&g);
+ x = i + j;
+ my_mutex->unlock(&g);
+ }
+ }
+ my_mutex->lock(&g);
+ }
+};
+
+int MCSLockTest::x;
+cds_others::mcs_mutex *MCSLockTest::my_mutex;
+const int MCSLockTest::kThreads;
+
+TEST_F(MCSLockTest, BasicLockUnlock) {
+ my_mutex = new cds_others::mcs_mutex();
+ std::thread threads[kThreads];
+ for (int i = 0; i < kThreads; i++) {
+ threads[i] = std::thread(Thread);
+ }
+
+ for (int i = 0; i < kThreads; i++) {
+ threads[i].join();
+ }
+}
+
+} // namespace
--- /dev/null
+#include <atomic>
+#include <cds/gc/dhp.h>
+#include <cds/gc/hp.h>
+#include <cds/sync/rwlock.h>
+#include <cds_test/stress_test.h>
+#include <iostream>
+#include <thread>
+
+using namespace std;
+
+namespace {
+
+typedef cds_others::RWLock RWLock;
+class RWLockTest : public cds_test::stress_fixture {
+protected:
+ static int sum;
+ static int x;
+ static RWLock *rwlock;
+ static const int kReaderThreads = 0;
+ static const int kWriterThreads = 0;
+ static const int kReaderWriterThreads = 6;
+ static const int kWriterPercentage = 20;
+ static const int kRWPassCount = 20000;
+
+ static void SetUpTestCase() {}
+
+ static void ReaderThread() {
+ for (int i = 0; i < 10000; i++) {
+      for (int j = 0; j < 10; j++) {
+ if (rwlock->read_can_lock()) {
+ if (!rwlock->read_trylock()) {
+ rwlock->read_lock();
+ }
+ sum += x;
+ rwlock->read_unlock();
+ } else {
+ rwlock->read_lock();
+ sum += x;
+ rwlock->read_unlock();
+ }
+ }
+ }
+ }
+
+ static void WriterThread() {
+ for (int i = 0; i < 10000; i++) {
+ if (rwlock->write_can_lock()) {
+ if (!rwlock->write_trylock()) {
+ rwlock->write_lock();
+ }
+ x += 1;
+ rwlock->write_unlock();
+ } else {
+ rwlock->write_lock();
+ x += 1;
+ rwlock->write_unlock();
+ }
+ }
+ }
+
+ static void ReaderWriterThread() {
+ for (int i = 0; i < kRWPassCount; i++) {
+ for (int j = 0; j < kRWPassCount; j++) {
+        if (rand(100) >= kWriterPercentage) {
+ if (rwlock->read_can_lock()) {
+ if (!rwlock->read_trylock()) {
+ rwlock->read_lock();
+ }
+ sum += x;
+ rwlock->read_unlock();
+ } else {
+ rwlock->read_lock();
+ sum += x;
+ rwlock->read_unlock();
+ }
+ } else {
+ if (rwlock->write_can_lock()) {
+ if (!rwlock->write_trylock()) {
+ rwlock->write_lock();
+ }
+ x += 1;
+ rwlock->write_unlock();
+ } else {
+ rwlock->write_lock();
+ x += 1;
+ rwlock->write_unlock();
+ }
+ }
+ }
+ }
+ }
+};
+
+int RWLockTest::x;
+int RWLockTest::sum;
+RWLock *RWLockTest::rwlock;
+const int RWLockTest::kReaderThreads;
+const int RWLockTest::kWriterThreads;
+const int RWLockTest::kReaderWriterThreads;
+const int RWLockTest::kRWPassCount;
+
+TEST_F(RWLockTest, BasicLockUnlock) {
+ rwlock = new RWLock();
+ int num_threads = kReaderThreads + kWriterThreads + kReaderWriterThreads;
+ std::thread *threads = new std::thread[num_threads];
+ for (int i = 0; i < kReaderThreads; i++) {
+ threads[i] = std::thread(ReaderThread);
+ }
+ for (int i = kReaderThreads; i < (kReaderThreads + kWriterThreads); i++) {
+ threads[i] = std::thread(WriterThread);
+ }
+ for (int i = (kReaderThreads + kWriterThreads); i < num_threads; i++) {
+ threads[i] = std::thread(ReaderWriterThread);
+ }
+
+ for (int i = 0; i < num_threads; i++) {
+ threads[i].join();
+ }
+}
+
+} // namespace
--- /dev/null
+#include <cds/container/rwqueue.h>
+#include <cstdlib>
+#include <ctime>
+#include <iostream>
+#include <string>
+#include <thread>
+
+using namespace std;
+
+cds::container::RWQueue<int> queue;
+
+void InitQueue() {
+ for (int i = 0; i < 2000000; i++) {
+ queue.enqueue(rand() % 100);
+ }
+}
+
+void ProducerThread() {
+ for (int i = 0; i < 1000000; i++) {
+ for (int j = 0; j < 50; j++) {
+ queue.enqueue(rand() % 100);
+ }
+ }
+}
+
+void ProducerConsumerThread() {
+ unsigned long long sum = 0;
+ int element;
+ for (int i = 0; i < 1000000; i++) {
+ for (int j = 0; j < 50; j++) {
+ if (!queue.empty() && queue.dequeue(element)) {
+ sum += element;
+ }
+ if (j % 2 == 0) {
+ queue.enqueue(rand() % 100);
+ }
+ }
+ }
+}
+
+void ConsumerThread() {
+ int element;
+ unsigned long long sum = 0;
+ int yield_times = 3;
+ while (yield_times > 0) {
+ while (queue.dequeue(element)) {
+ sum += element;
+ yield_times = 3;
+ }
+ std::this_thread::yield();
+ yield_times--;
+ }
+}
+
+int main() {
+ srand(time(NULL));
+ const int kThreads = 6;
+ // Initialize the queue with some elements.
+ InitQueue();
+ cout << "Starting " << kThreads << " threads for RWQueue...\n";
+
+ struct timespec start, finish;
+ double elapsed = 0.0;
+ clock_gettime(CLOCK_MONOTONIC, &start);
+
+ std::thread threads[kThreads];
+ // Producer thread
+ threads[0] = std::thread(ProducerThread);
+ // ProducerConsumer threads
+ for (int i = 1; i < kThreads; i++) {
+ threads[i] = std::thread(ProducerConsumerThread);
+ }
+
+ for (int i = 0; i < kThreads; i++) {
+ threads[i].join();
+ }
+
+ clock_gettime(CLOCK_MONOTONIC, &finish);
+ elapsed = (finish.tv_sec - start.tv_sec);
+ elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0;
+ cout << "All threads finished.\n";
+ cout << "Time: " << elapsed << " seconds\n";
+ return 0;
+}
--- /dev/null
+#include <atomic>
+#include <cds/gc/dhp.h>
+#include <cds/gc/hp.h>
+#include <cds/sync/seqlock.h>
+#include <cds_test/stress_test.h>
+#include <iostream>
+#include <thread>
+
+using namespace std;
+
+namespace {
+
+typedef cds_others::SeqLock SeqLock;
+
+class SeqLockTest : public cds_test::stress_fixture {
+protected:
+ static int sum;
+ static SeqLock *seqlock;
+ static const int kReaderThreads = 0;
+ static const int kWriterThreads = 0;
+ static const int kReaderWriterThreads = 6;
+ static const int kWriterPercentage = 15;
+ static const int kPassCount = 15000;
+
+ static void SetUpTestCase() {}
+
+ static void ReaderThread() {}
+
+ static void WriterThread() {}
+
+ static void ReaderWriterThread() {
+ for (int i = 0; i < kPassCount; i++) {
+ for (int j = 0; j < kPassCount; j++) {
+      if (rand(100) >= kWriterPercentage) {
+ sum += seqlock->read();
+ } else {
+ seqlock->write(rand(10));
+ }
+ }
+ }
+ }
+};
+
+int SeqLockTest::sum;
+SeqLock *SeqLockTest::seqlock;
+const int SeqLockTest::kReaderThreads;
+const int SeqLockTest::kWriterThreads;
+const int SeqLockTest::kReaderWriterThreads;
+
+TEST_F(SeqLockTest, BasicReadWriter) {
+ seqlock = new SeqLock();
+ int num_threads = kReaderThreads + kWriterThreads + kReaderWriterThreads;
+ std::thread *threads = new std::thread[num_threads];
+ for (int i = 0; i < kReaderThreads; i++) {
+ threads[i] = std::thread(ReaderThread);
+ }
+ for (int i = kReaderThreads; i < (kReaderThreads + kWriterThreads); i++) {
+ threads[i] = std::thread(WriterThread);
+ }
+ for (int i = (kReaderThreads + kWriterThreads); i < num_threads; i++) {
+ threads[i] = std::thread(ReaderWriterThread);
+ }
+
+ for (int i = 0; i < num_threads; i++) {
+ threads[i].join();
+ }
+}
+
+} // namespace
--- /dev/null
+#include <atomic>
+#include <cds/gc/dhp.h>
+#include <cds/gc/hp.h>
+#include <cds/sync/spinlock.h>
+#include <cds_test/stress_test.h>
+#include <iostream>
+#include <thread>
+
+using namespace std;
+
+namespace {
+
+typedef cds::sync::spin SpinLock;
+typedef cds::sync::reentrant_spin32 Reentrant32;
+typedef cds::sync::reentrant_spin64 Reentrant64;
+
+#define TASK(lock_type, lock_ptr) \
+ static void Thread ## lock_type() { \
+ for (int i = 0; i < 100000; i++) { \
+ for (int j = 0; j < 30000; j++) { \
+ lock_ptr->lock(); \
+ x = i + j; \
+ lock_ptr->unlock(); \
+ } \
+ } \
+ }
+
+#define LOCK_TEST(lock_type, lock_ptr) \
+ TEST_F(SpinLockTest, lock_type) { \
+ lock_ptr = new lock_type(); \
+ std::thread threads[kThreads]; \
+ for (int i = 0; i < kThreads; i++) { \
+ threads[i] = std::thread(Thread ## lock_type); \
+ } \
+ for (int i = 0; i < kThreads; i++) { \
+ threads[i].join(); \
+ } \
+}
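+
+// For reference: TASK(SpinLock, spin_mutex) expands to a static
+// ThreadSpinLock() member whose body is the nested lock/store/unlock loop
+// above, and LOCK_TEST(SpinLock, spin_mutex) expands to
+// TEST_F(SpinLockTest, SpinLock), which allocates spin_mutex and runs
+// kThreads copies of ThreadSpinLock().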
+
+class SpinLockTest : public cds_test::stress_fixture {
+protected:
+ static int x;
+ static SpinLock* spin_mutex;
+ static Reentrant32* reentrant_mutex32;
+ static Reentrant64* reentrant_mutex64;
+
+ static const int kThreads = 6;
+
+ static void SetUpTestCase() {}
+
+ TASK(SpinLock, spin_mutex)
+ TASK(Reentrant32, reentrant_mutex32)
+ TASK(Reentrant64, reentrant_mutex64)
+};
+
+int SpinLockTest::x;
+const int SpinLockTest::kThreads;
+SpinLock* SpinLockTest::spin_mutex;
+Reentrant32* SpinLockTest::reentrant_mutex32;
+Reentrant64* SpinLockTest::reentrant_mutex64;
+
+LOCK_TEST(SpinLock, spin_mutex)
+LOCK_TEST(Reentrant32, reentrant_mutex32)
+LOCK_TEST(Reentrant64, reentrant_mutex64)
+
+} // namespace