--- /dev/null
+#include <cds/misc/RigtorpMPMCQueue.h>
+#include <cds_test/stress_test.h>
+#include <cds_test/stress_test_util.h>
+#include <ctime>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class RigtorpMPMCQueueTest_Parallel : public cds_test::stress_fixture {
+protected:
+ static size_t s_nRigtorpMPMCQueueThreadCount;
+ static size_t s_nRigtorpMPMCQueuePassCount;
+ static size_t s_nRigtorpMPMCQueueCapacity;
+ static atomic_int producer_num;
+
+ static void SetUpTestCase() {
+ cds_test::config const &cfg = get_config("Misc");
+ GetConfigExpected(RigtorpMPMCQueueThreadCount, 4);
+ GetConfigExpected(RigtorpMPMCQueuePassCount, 10000);
+ GetConfigExpected(RigtorpMPMCQueueCapacity, 2048);
+ }
+
+ template <typename Queue>
+ static void run_producer(Queue *q, size_t enqueue_count) {
+ for (size_t i = 0; i < enqueue_count; i++) {
+ size_t elem_to_push = rand(enqueue_count);
+ if (!elem_to_push) {
+ elem_to_push++;
+ }
+ q->push(elem_to_push);
+ }
+ producer_num.fetch_sub(1, std::memory_order_release);
+ }
+
+ template <typename Queue>
+ static void run_consumer(Queue *q) {
+ size_t dequeue_sum = 0;
+ while (true) {
+ size_t res = 0;
+ if (q->try_pop(res)) {
+ dequeue_sum += res;
+ } else if (producer_num.load(std::memory_order_acquire) == 0) {
+ while (q->try_pop(res)) {
+ dequeue_sum += res;
+ }
+ break;
+ }
+ }
+ EXPECT_GT(dequeue_sum, 0);
+ }
+
+ template <typename Queue>
+ static void RigtorpMPMCThreading(Queue *q, size_t producer_pass_count) {
+ size_t producer_cnt = s_nRigtorpMPMCQueueThreadCount / 2;
+ size_t consumer_cnt = s_nRigtorpMPMCQueueThreadCount - producer_cnt;
+ producer_num.store(producer_cnt, std::memory_order_relaxed);
+ std::unique_ptr<std::thread[]> threads(
+ new std::thread[s_nRigtorpMPMCQueueThreadCount]);
+ for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
+ if (i < producer_cnt) {
+ threads[i] = std::thread(run_producer<Queue>, q, producer_pass_count);
+ } else {
+ threads[i] = std::thread(run_consumer<Queue>, q);
+ }
+ }
+ for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
+ threads[i].join();
+ }
+ }
+};
+
+atomic_int RigtorpMPMCQueueTest_Parallel::producer_num;
+size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueThreadCount;
+size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueuePassCount;
+size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueCapacity;
+
+TEST_F(RigtorpMPMCQueueTest_Parallel, PushPop) {
+ std::unique_ptr<rigtorp::MPMCQueue<size_t>> q(
+ new rigtorp::MPMCQueue<size_t>(s_nRigtorpMPMCQueueCapacity));
+ RigtorpMPMCThreading(q.get(), s_nRigtorpMPMCQueuePassCount);
+}
+
+} // namespace
--- /dev/null
+#include <cds/misc/RigtorpSPSCQueue.h>
+#include <cds_test/stress_test.h>
+#include <cds_test/stress_test_util.h>
+#include <ctime>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class RigtorpSPSCQueueTest_Parallel : public cds_test::stress_fixture {
+protected:
+ static size_t s_nRigtorpSPSCQueuePassCount;
+ static size_t s_nRigtorpSPSCQueueCapacity;
+ static atomic_int producer_num;
+
+ static void SetUpTestCase() {
+ cds_test::config const &cfg = get_config("Misc");
+ GetConfigExpected(RigtorpSPSCQueuePassCount, 1000000);
+ GetConfigExpected(RigtorpSPSCQueueCapacity, 1000000);
+ }
+
+ template <typename Queue>
+ static void run_producer(Queue *q, size_t enqueue_count) {
+ for (size_t i = 0; i < enqueue_count; i++) {
+ size_t elem_to_push = rand(enqueue_count);
+ if (!elem_to_push) {
+ elem_to_push++;
+ }
+ q->push(elem_to_push);
+ }
+ producer_num.fetch_sub(1, std::memory_order_release);
+ }
+
+ template <typename Queue>
+ static void run_consumer(Queue *q) {
+ size_t dequeue_sum = 0;
+ while (true) {
+ size_t *res = nullptr;
+ while ((res = q->front())) {
+ dequeue_sum += *res;
+ q->pop();
+ }
+ if (producer_num.load(std::memory_order_acquire) == 0) {
+ while ((res = q->front())) {
+ dequeue_sum += *res;
+ q->pop();
+ }
+ break;
+ }
+ }
+ EXPECT_GT(dequeue_sum, 0);
+ }
+
+ template <typename Queue>
+ static void RigtorpSPSCThreading(Queue *q, size_t producer_pass_count) {
+ producer_num.store(1, std::memory_order_relaxed);
+ size_t total_thread_cnt = 2;
+ std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
+ threads[0] = std::thread(run_producer<Queue>, q, producer_pass_count);
+ threads[1] = std::thread(run_consumer<Queue>, q);
+ for (size_t i = 0; i < total_thread_cnt; i++) {
+ threads[i].join();
+ }
+ }
+};
+
+atomic_int RigtorpSPSCQueueTest_Parallel::producer_num;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueuePassCount;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueueCapacity;
+
+TEST_F(RigtorpSPSCQueueTest_Parallel, PushPop) {
+ std::unique_ptr<rigtorp::SPSCQueue<size_t>> q(
+ new rigtorp::SPSCQueue<size_t>(s_nRigtorpSPSCQueueCapacity));
+ RigtorpSPSCThreading(q.get(), s_nRigtorpSPSCQueuePassCount);
+}
+
+} // namespace