Adds Rigtorp queues parallel test cases
[libcds.git] / test / stress / misc / rigtorp_spsc_driver.cpp
diff --git a/test/stress/misc/rigtorp_spsc_driver.cpp b/test/stress/misc/rigtorp_spsc_driver.cpp
new file mode 100644 (file)
index 0000000..ade9994
--- /dev/null
@@ -0,0 +1,78 @@
+#include <cds/misc/RigtorpSPSCQueue.h>
+#include <cds_test/stress_test.h>
+#include <cds_test/stress_test_util.h>
+#include <ctime>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class RigtorpSPSCQueueTest_Parallel : public cds_test::stress_fixture {
+protected:
+  static size_t s_nRigtorpSPSCQueuePassCount;
+  static size_t s_nRigtorpSPSCQueueCapacity;
+  static atomic_int producer_num;
+
+  static void SetUpTestCase() {
+    cds_test::config const &cfg = get_config("Misc");
+    GetConfigExpected(RigtorpSPSCQueuePassCount, 1000000);
+    GetConfigExpected(RigtorpSPSCQueueCapacity, 1000000);
+  }
+
+  template <typename Queue>
+  static void run_producer(Queue *q, size_t enqueue_count) {
+    for (size_t i = 0; i < enqueue_count; i++) {
+      size_t elem_to_push = rand(enqueue_count);
+      if (!elem_to_push) {
+        elem_to_push++;
+      }
+      q->push(elem_to_push);
+    }
+    producer_num.fetch_sub(1, std::memory_order_release);
+  }
+
+       template <typename Queue>
+       static void run_consumer(Queue *q) {
+    size_t dequeue_sum = 0;
+    while (true) {
+      size_t *res = nullptr;
+      while ((res = q->front())) {
+        dequeue_sum += *res;
+        q->pop();
+      }
+      if (producer_num.load(std::memory_order_acquire) == 0) {
+        while ((res = q->front())) {
+          dequeue_sum += *res;
+          q->pop();
+        }
+        break;
+      }
+    }
+    EXPECT_GT(dequeue_sum, 0);
+  }
+
+  template <typename Queue>
+  static void RigtorpSPSCThreading(Queue *q, size_t producer_pass_count) {
+    producer_num.store(1, std::memory_order_relaxed);
+    size_t total_thread_cnt = 2;
+    std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
+    threads[0] = std::thread(run_producer<Queue>, q, producer_pass_count);
+    threads[1] = std::thread(run_consumer<Queue>, q);
+    for (size_t i = 0; i < total_thread_cnt; i++) {
+      threads[i].join();
+    }
+  }
+};
+
+atomic_int RigtorpSPSCQueueTest_Parallel::producer_num;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueuePassCount;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueueCapacity;
+
+TEST_F(RigtorpSPSCQueueTest_Parallel, PushPop) {
+  std::unique_ptr<rigtorp::SPSCQueue<size_t>> q(
+      new rigtorp::SPSCQueue<size_t>(s_nRigtorpSPSCQueueCapacity));
+  RigtorpSPSCThreading(q.get(), s_nRigtorpSPSCQueuePassCount);
+}
+
+} // namespace