From ced31653a84f29c3f236fb5976f1241262953914 Mon Sep 17 00:00:00 2001 From: Brian Norris Date: Wed, 10 Oct 2012 17:03:04 -0700 Subject: [PATCH] spsc-queue: add new benchmark From: https://groups.google.com/forum/?fromgroups=#!msg/comp.programming.threads/nSSFT9vKEe0/7eD3ioDg6nEJ --- spsc-queue/eventcount.h | 64 +++++++++++++++++++++++++++++++++++ spsc-queue/queue.h | 73 ++++++++++++++++++++++++++++++++++++++++ spsc-queue/spsc-queue.cc | 24 +++++++++++++ 3 files changed, 161 insertions(+) create mode 100644 spsc-queue/eventcount.h create mode 100644 spsc-queue/queue.h create mode 100644 spsc-queue/spsc-queue.cc diff --git a/spsc-queue/eventcount.h b/spsc-queue/eventcount.h new file mode 100644 index 0000000..226603e --- /dev/null +++ b/spsc-queue/eventcount.h @@ -0,0 +1,64 @@ +class eventcount +{ +public: + eventcount() + : count(0) + , waiters(0) + {} + + void signal_relaxed() + { + unsigned cmp = count($).load(std::memory_order_relaxed); + signal_impl(cmp); + } + + void signal() + { + unsigned cmp = count($).fetch_add(0, std::memory_order_seq_cst); + signal_impl(cmp); + } + + unsigned get() + { + unsigned cmp = count($).fetch_or(0x80000000, +std::memory_order_seq_cst); + return cmp & 0x7FFFFFFF; + } + + void wait(unsigned cmp) + { + unsigned ec = count($).load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + guard.lock($); + ec = count($).load(std::memory_order_seq_cst); + if (cmp == (ec & 0x7FFFFFFF)) + { + waiters($) += 1; + cv.wait(guard, $); + } + guard.unlock($); + } + } + +private: + std::atomic count; + rl::var waiters; + mutex guard; + condition_variable_any cv; + + void signal_impl(unsigned cmp) + { + if (cmp & 0x80000000) + { + guard.lock($); + while (false == count($).compare_swap(cmp, + (cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed)); + unsigned w = waiters($); + waiters($) = 0; + guard.unlock($); + if (w) + cv.notify_all($); + } + } +}; diff --git a/spsc-queue/queue.h b/spsc-queue/queue.h new file mode 100644 index 0000000..d65477b --- /dev/null +++ b/spsc-queue/queue.h @@ -0,0 +1,73 @@ +#include "eventcount.h" + +template +class spsc_queue +{ +public: + spsc_queue() + { + node* n = RL_NEW node (); + head($) = n; + tail($) = n; + } + + ~spsc_queue() + { + RL_ASSERT(head($) == tail($)); + RL_DELETE((node*)head($)); + } + + void enqueue(T data) + { + node* n = RL_NEW node (data); + head($)->next($).store(n, std::memory_order_release); + head($) = n; + ec.signal_relaxed(); + } + + T dequeue() + { + T data = try_dequeue(); + while (0 == data) + { + int cmp = ec.get(); + data = try_dequeue(); + if (data) + break; + ec.wait(cmp); + data = try_dequeue(); + if (data) + break; + } + return data; + } + +private: + struct node + { + std::atomic next; + rl::var data; + + node(T data = T()) + : next(0) + , data(data) + {} + }; + + rl::var head; + rl::var tail; + + eventcount ec; + + T try_dequeue() + { + node* t = tail($); + node* n = t->next($).load(std::memory_order_acquire); + if (0 == n) + return 0; + T data = n->data($); + RL_DELETE(t); + tail($) = n; + return data; + } +}; diff --git a/spsc-queue/spsc-queue.cc b/spsc-queue/spsc-queue.cc new file mode 100644 index 0000000..a81a1d0 --- /dev/null +++ b/spsc-queue/spsc-queue.cc @@ -0,0 +1,24 @@ +#include "queue.h" + +struct spsc_queue_test : rl::test_suite +{ + spsc_queue q; + + void thread(unsigned thread_index) + { + if (0 == thread_index) + { + q.enqueue(11); + } + else + { + int d = q.dequeue(); + RL_ASSERT(11 == d); + } + } +}; + +int main() +{ + rl::simulate(); +} -- 2.34.1