add spsc for inference
authorPeizhao Ou <peizhaoo@uci.edu>
Fri, 20 Mar 2015 21:52:48 +0000 (14:52 -0700)
committerPeizhao Ou <peizhaoo@uci.edu>
Fri, 20 Mar 2015 21:52:48 +0000 (14:52 -0700)
spsc-queue-scfence/Makefile [new file with mode: 0644]
spsc-queue-scfence/eventcount.h [new file with mode: 0644]
spsc-queue-scfence/queue.h [new file with mode: 0644]
spsc-queue-scfence/spsc-queue.cc [new file with mode: 0644]

diff --git a/spsc-queue-scfence/Makefile b/spsc-queue-scfence/Makefile
new file mode 100644 (file)
index 0000000..103f5c4
--- /dev/null
@@ -0,0 +1,11 @@
+include ../benchmarks.mk
+
+TESTNAME = spsc-queue
+
+all: $(TESTNAME)
+
+$(TESTNAME): $(TESTNAME).cc queue.h eventcount.h
+       $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
+
+clean:
+       rm -f $(TESTNAME) $(RELACYNAME) *.o
diff --git a/spsc-queue-scfence/eventcount.h b/spsc-queue-scfence/eventcount.h
new file mode 100644 (file)
index 0000000..3f9ad1b
--- /dev/null
@@ -0,0 +1,69 @@
+#include <unrelacy.h>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include "wildcard.h" 
+
+class eventcount
+{
+public:
+       eventcount() : waiters(0)
+       {
+               count = 0;
+       }
+
+       void signal_relaxed()
+       {
+               unsigned cmp = count.load(wildcard(1)); // relaxed
+               signal_impl(cmp);
+       }
+
+       void signal()
+       {
+               unsigned cmp = count.fetch_add(0, std::memory_order_seq_cst); // the fix
+               signal_impl(cmp);
+       }
+
+       unsigned get()
+       {
+               unsigned cmp = count.fetch_or(0x80000000, wildcard(2)); // sc
+               return cmp & 0x7FFFFFFF;
+       }
+
+       void wait(unsigned cmp)
+       {
+               unsigned ec = count.load(wildcard(3)); // sc
+               if (cmp == (ec & 0x7FFFFFFF))
+               {
+                       guard.lock($);
+                       ec = count.load(wildcard(4)); // sc
+                       if (cmp == (ec & 0x7FFFFFFF))
+                       {
+                               waiters += 1;
+                               cv.wait(guard);
+                       }
+                       guard.unlock($);
+               }
+       }
+
+private:
+       std::atomic<unsigned> count;
+       rl::var<unsigned> waiters;
+       std::mutex guard;
+       std::condition_variable cv;
+
+       void signal_impl(unsigned cmp)
+       {
+               if (cmp & 0x80000000)
+               {
+                       guard.lock($);
+                       while (false == count.compare_exchange_weak(cmp,
+                               (cmp + 1) & 0x7FFFFFFF, wildcard(5))); // relaxed
+                       unsigned w = waiters($);
+                       waiters = 0;
+                       guard.unlock($);
+                       if (w)
+                               cv.notify_all($);
+               }
+       }
+};
diff --git a/spsc-queue-scfence/queue.h b/spsc-queue-scfence/queue.h
new file mode 100644 (file)
index 0000000..859f059
--- /dev/null
@@ -0,0 +1,77 @@
+#include <unrelacy.h>
+#include <atomic>
+
+#include "eventcount.h"
+#include "wildcard.h"
+
+class spsc_queue
+{
+public:
+       spsc_queue()
+       {
+               node* n = new node (-1);
+               head = n;
+               tail = n;
+       }
+
+       ~spsc_queue()
+       {
+               RL_ASSERT(head == tail);
+               delete ((node*)head($));
+       }
+
+       void enqueue(int data)
+       {
+               node* n = new node (data);
+               head($)->next.store(n, wildcard(6));
+               head = n;
+               ec.signal_relaxed();
+       }
+
+       int dequeue()
+       {
+               int data = try_dequeue();
+               while (0 == data)
+               {
+                       int cmp = ec.get();
+                       data = try_dequeue();
+                       if (data)
+                               break;
+                       ec.wait(cmp);
+                       data = try_dequeue();
+                       if (data)
+                               break;
+               }
+               return data;
+       }
+
+private:
+       struct node
+       {
+               std::atomic<node*> next;
+               std::atomic<int> data;
+
+               node(int d)
+               {
+                       data.store(d, memory_order_normal);
+                       next = 0;
+               }
+       };
+
+       rl::var<node*> head;
+       rl::var<node*> tail;
+
+       eventcount ec;
+
+       int try_dequeue()
+       {
+               node* t = tail($);
+               node* n = t->next.load(wildcard(7));
+               if (0 == n)
+                       return 0;
+               int data = n->data.load(memory_order_normal);
+               delete (t);
+               tail = n;
+               return data;
+       }
+};
diff --git a/spsc-queue-scfence/spsc-queue.cc b/spsc-queue-scfence/spsc-queue.cc
new file mode 100644 (file)
index 0000000..dde5d64
--- /dev/null
@@ -0,0 +1,34 @@
+#include <threads.h>
+
+#include "queue.h"
+
+spsc_queue *q;
+
+       void thread(unsigned thread_index)
+       {
+               if (0 == thread_index)
+               {
+                       q->enqueue(11);
+               }
+               else
+               {
+                       int d = q->dequeue();
+                       //RL_ASSERT(11 == d);
+               }
+       }
+
+int user_main(int argc, char **argv)
+{
+       thrd_t A, B;
+
+       q = new spsc_queue;
+
+       thrd_create(&A, (thrd_start_t)&thread, (void *)0);
+       thrd_create(&B, (thrd_start_t)&thread, (void *)1);
+       thrd_join(A);
+       thrd_join(B);
+
+       delete q;
+
+       return 0;
+}