Preparing to fix the deadlock bug.
--- /dev/null
+/spsc-queue
+/spsc-relacy
--- /dev/null
+include ../benchmarks.mk
+
+TESTNAME = spsc-queue
+RELACYNAME = spsc-relacy
+
+all: $(TESTNAME)
+
+$(TESTNAME): $(TESTNAME).cc queue.h eventcount.h
+ $(CXX) -o $@ $< $(CPPFLAGS) $(LDFLAGS)
+
+relacy: $(RELACYNAME)
+
+$(RELACYNAME): spsc-relacy.cc queue-relacy.h eventcount-relacy.h
+ifdef RELACYPATH
+ $(CXX) -o $(RELACYNAME) spsc-relacy.cc -I$(RELACYPATH) -Wno-deprecated
+else
+ @echo "Please define RELACYPATH"
+ @echo " e.g., make RELACYPATH=/path-to-relacy"
+ @exit 1
+endif
+
+clean:
+ rm -f $(TESTNAME) $(RELACYNAME) *.o
--- /dev/null
+class eventcount
+{
+public:
+ eventcount() : waiters(0)
+ {
+ count($) = 0;
+ }
+
+ void signal_relaxed()
+ {
+ unsigned cmp = count.load(std::memory_order_relaxed);
+ signal_impl(cmp);
+ }
+
+ void signal()
+ {
+ unsigned cmp = count.fetch_add(0, std::memory_order_seq_cst);
+ signal_impl(cmp);
+ }
+
+ unsigned get()
+ {
+ unsigned cmp = count.fetch_or(0x80000000,
+std::memory_order_seq_cst);
+ return cmp & 0x7FFFFFFF;
+ }
+
+ void wait(unsigned cmp)
+ {
+ unsigned ec = count.load(std::memory_order_seq_cst);
+ if (cmp == (ec & 0x7FFFFFFF))
+ {
+ guard.lock($);
+ ec = count.load(std::memory_order_seq_cst);
+ if (cmp == (ec & 0x7FFFFFFF))
+ {
+ waiters($) += 1;
+ cv.wait(guard, $);
+ }
+ guard.unlock($);
+ }
+ }
+
+private:
+ std::atomic<unsigned> count;
+ rl::var<unsigned> waiters;
+ std::mutex guard;
+ std::condition_variable cv;
+
+ void signal_impl(unsigned cmp)
+ {
+ if (cmp & 0x80000000)
+ {
+ guard.lock($);
+ while (false == count.compare_exchange_weak(cmp,
+ (cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed));
+ unsigned w = waiters($);
+ waiters($) = 0;
+ guard.unlock($);
+ if (w)
+ cv.notify_all($);
+ }
+ }
+};
--- /dev/null
+#include <unrelacy.h>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+class eventcount
+{
+public:
+ eventcount() : waiters(0)
+ {
+ count = 0;
+ }
+
+ void signal_relaxed()
+ {
+ unsigned cmp = count.load(std::memory_order_relaxed);
+ signal_impl(cmp);
+ }
+
+ void signal()
+ {
+ unsigned cmp = count.fetch_add(0, std::memory_order_seq_cst);
+ signal_impl(cmp);
+ }
+
+ unsigned get()
+ {
+ unsigned cmp = count.fetch_or(0x80000000,
+std::memory_order_seq_cst);
+ return cmp & 0x7FFFFFFF;
+ }
+
+ void wait(unsigned cmp)
+ {
+ unsigned ec = count.load(std::memory_order_seq_cst);
+ if (cmp == (ec & 0x7FFFFFFF))
+ {
+ guard.lock($);
+ ec = count.load(std::memory_order_seq_cst);
+ if (cmp == (ec & 0x7FFFFFFF))
+ {
+ waiters += 1;
+ cv.wait(guard);
+ }
+ guard.unlock($);
+ }
+ }
+
+private:
+ std::atomic<unsigned> count;
+ rl::var<unsigned> waiters;
+ std::mutex guard;
+ std::condition_variable cv;
+
+ void signal_impl(unsigned cmp)
+ {
+ if (cmp & 0x80000000)
+ {
+ guard.lock($);
+ while (false == count.compare_exchange_weak(cmp,
+ (cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed));
+ unsigned w = waiters($);
+ waiters = 0;
+ guard.unlock($);
+ if (w)
+ cv.notify_all($);
+ }
+ }
+};
--- /dev/null
+#include "eventcount-relacy.h"
+
+template<typename T>
+class spsc_queue
+{
+public:
+ spsc_queue()
+ {
+ node* n = new node ();
+ head($) = n;
+ tail($) = n;
+ }
+
+ ~spsc_queue()
+ {
+ RL_ASSERT(head($) == tail($));
+ delete ((node*)head($));
+ }
+
+ void enqueue(T data)
+ {
+ node* n = new node (data);
+ head($)->next.store(n, std::memory_order_release);
+ head($) = n;
+ ec.signal_relaxed();
+ }
+
+ T dequeue()
+ {
+ T data = try_dequeue();
+ while (0 == data)
+ {
+ int cmp = ec.get();
+ data = try_dequeue();
+ if (data)
+ break;
+ ec.wait(cmp);
+ data = try_dequeue();
+ if (data)
+ break;
+ }
+ return data;
+ }
+
+private:
+ struct node
+ {
+ std::atomic<node*> next;
+ rl::var<T> data;
+
+ node(T data = T())
+ : data(data)
+ {
+ next($) = 0;
+ }
+ };
+
+ rl::var<node*> head;
+ rl::var<node*> tail;
+
+ eventcount ec;
+
+ T try_dequeue()
+ {
+ node* t = tail($);
+ node* n = t->next.load(std::memory_order_acquire);
+ if (0 == n)
+ return 0;
+ T data = n->data($);
+ delete (t);
+ tail($) = n;
+ return data;
+ }
+};
--- /dev/null
+#include <unrelacy.h>
+#include <atomic>
+
+#include "eventcount.h"
+
+template<typename T>
+class spsc_queue
+{
+public:
+ spsc_queue()
+ {
+ node* n = new node ();
+ head = n;
+ tail = n;
+ }
+
+ ~spsc_queue()
+ {
+ RL_ASSERT(head == tail);
+ delete ((node*)head($));
+ }
+
+ void enqueue(T data)
+ {
+ node* n = new node (data);
+ head($)->next.store(n, std::memory_order_release);
+ head = n;
+ ec.signal_relaxed();
+ }
+
+ T dequeue()
+ {
+ T data = try_dequeue();
+ while (0 == data)
+ {
+ int cmp = ec.get();
+ data = try_dequeue();
+ if (data)
+ break;
+ ec.wait(cmp);
+ data = try_dequeue();
+ if (data)
+ break;
+ }
+ return data;
+ }
+
+private:
+ struct node
+ {
+ std::atomic<node*> next;
+ rl::var<T> data;
+
+ node(T data = T())
+ : data(data)
+ {
+ next = 0;
+ }
+ };
+
+ rl::var<node*> head;
+ rl::var<node*> tail;
+
+ eventcount ec;
+
+ T try_dequeue()
+ {
+ node* t = tail($);
+ node* n = t->next.load(std::memory_order_acquire);
+ if (0 == n)
+ return 0;
+ T data = n->data($);
+ delete (t);
+ tail = n;
+ return data;
+ }
+};
--- /dev/null
+#include <threads.h>
+
+#include "queue.h"
+
+spsc_queue<int> *q;
+
+ void thread(unsigned thread_index)
+ {
+ if (0 == thread_index)
+ {
+ q->enqueue(11);
+ }
+ else
+ {
+ int d = q->dequeue();
+ RL_ASSERT(11 == d);
+ }
+ }
+
+int user_main(int argc, char **argv)
+{
+ thrd_t A, B;
+
+ q = new spsc_queue<int>();
+
+ thrd_create(&A, (thrd_start_t)&thread, (void *)0);
+ thrd_create(&B, (thrd_start_t)&thread, (void *)1);
+ thrd_join(A);
+ thrd_join(B);
+
+ delete q;
+
+ return 0;
+}
--- /dev/null
+#include <relacy/relacy_std.hpp>
+
+#include "queue-relacy.h"
+
+struct spsc_queue_test : rl::test_suite<spsc_queue_test, 2>
+{
+ spsc_queue<int> q;
+
+ void thread(unsigned thread_index)
+ {
+ if (0 == thread_index)
+ {
+ q.enqueue(11);
+ }
+ else
+ {
+ int d = q.dequeue();
+ RL_ASSERT(11 == d);
+ }
+ }
+};
+
+
+int main()
+{
+ rl::simulate<spsc_queue_test>();
+}