--- /dev/null
+#include <cds/misc/RigtorpSPSCQueue.h>
+#include <cds_test/stress_test.h>
+#include <cds_test/stress_test_util.h>
+#include <ctime>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class RigtorpSPSCQueueTest_Parallel : public cds_test::stress_fixture {
+protected:
+ static size_t s_nRigtorpSPSCQueuePassCount;
+ static size_t s_nRigtorpSPSCQueueCapacity;
+ static atomic_int producer_num;
+
+ static void SetUpTestCase() {
+ cds_test::config const &cfg = get_config("Misc");
+ GetConfigExpected(RigtorpSPSCQueuePassCount, 1000000);
+ GetConfigExpected(RigtorpSPSCQueueCapacity, 1000000);
+ }
+
+ template <typename Queue>
+ static void run_producer(Queue *q, size_t enqueue_count) {
+ for (size_t i = 0; i < enqueue_count; i++) {
+ size_t elem_to_push = rand(enqueue_count);
+ if (!elem_to_push) {
+ elem_to_push++;
+ }
+ q->push(elem_to_push);
+ }
+ producer_num.fetch_sub(1, std::memory_order_release);
+ }
+
+ template <typename Queue>
+ static void run_consumer(Queue *q) {
+ size_t dequeue_sum = 0;
+ while (true) {
+ size_t *res = nullptr;
+ while ((res = q->front())) {
+ dequeue_sum += *res;
+ q->pop();
+ }
+ if (producer_num.load(std::memory_order_acquire) == 0) {
+ while ((res = q->front())) {
+ dequeue_sum += *res;
+ q->pop();
+ }
+ break;
+ }
+ }
+ EXPECT_GT(dequeue_sum, 0);
+ }
+
+ template <typename Queue>
+ static void RigtorpSPSCThreading(Queue *q, size_t producer_pass_count) {
+ producer_num.store(1, std::memory_order_relaxed);
+ size_t total_thread_cnt = 2;
+ std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
+ threads[0] = std::thread(run_producer<Queue>, q, producer_pass_count);
+ threads[1] = std::thread(run_consumer<Queue>, q);
+ for (size_t i = 0; i < total_thread_cnt; i++) {
+ threads[i].join();
+ }
+ }
+};
+
+atomic_int RigtorpSPSCQueueTest_Parallel::producer_num;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueuePassCount;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueueCapacity;
+
+TEST_F(RigtorpSPSCQueueTest_Parallel, PushPop) {
+ std::unique_ptr<rigtorp::SPSCQueue<size_t>> q(
+ new rigtorp::SPSCQueue<size_t>(s_nRigtorpSPSCQueueCapacity));
+ RigtorpSPSCThreading(q.get(), s_nRigtorpSPSCQueuePassCount);
+}
+
+} // namespace