1 #include "queue_test.h"
// Stress-test fixture (derived from cds_test::stress_fixture) for the Folly
// queue enqueue/dequeue tests in this file. The static members declared below
// hold the shared producer counter and the per-test parameters.
// NOTE(review): the access specifier and the end of the class are not visible
// in this listing.
5 class FollyQueueEnqueueDequeueTest_Parallel : public cds_test::stress_fixture {
// Number of producers still running: FollyMPMCThreading() stores the producer
// count here, run_producer() decrements it, and the consumers poll it to
// decide when to stop.
7 static std::atomic_int producer_num;
9 // The milliseconds for consumers to wait when a failed try_dequeue happens.
// NOTE(review): no use of this value is visible in this part of the file.
10 static unsigned s_nConsumerWaitTime;
11 // For MPMC, half of the threads are producers, and the rest are consumers.
// The single-producer / single-consumer tests use 1 on that side and
// s_nThreadCount - 1 on the other.
12 static unsigned s_nThreadCount;
// Unbounded-queue parameters. The four *EnqueueCount values are what the
// FollyUnboundedQueue_* tests pass as the per-producer enqueue count; the
// stride is not referenced in the visible part of the file.
14 static size_t s_nUnboundedQueueEnqueueStride;
15 static size_t s_nUSPSCQueueEnqueueCount;
16 static size_t s_nUMPSCQueueEnqueueCount;
17 static size_t s_nUSPMCQueueEnqueueCount;
18 static size_t s_nUMPMCQueueEnqueueCount;
20 // Dynamic bounded queue
// s_nDynamicBoundedQueueCapacity is the constructor argument used by the
// FollyDynamicBoundedQueue_* tests; the *EnqueueCount values are their
// per-producer enqueue counts.
21 static size_t s_nDynamicBoundedQueueEnqueueStride;
22 static size_t s_nDynamicBoundedQueueCapacity;
23 static size_t s_nDSPSCQueueEnqueueCount;
24 static size_t s_nDMPSCQueueEnqueueCount;
25 static size_t s_nDSPMCQueueEnqueueCount;
26 static size_t s_nDMPMCQueueEnqueueCount;
// Per-producer enqueue count for the FollyAtomicLinkedList test.
29 static size_t s_nAtomicLinkedListPassCount;
31 // MPMC Queue (linearizable)
// s_nMPMCQueueCapacity is the constructor argument and s_nMPMCQueueEnqueueCount
// the per-producer enqueue count of the FollyMPMCQueue test.
32 static size_t s_nMPMCQueueEnqueueStride;
33 static size_t s_nMPMCQueueCapacity;
34 static size_t s_nMPMCQueueEnqueueCount;
// One-time fixture setup: fetches the "ParallelFollyQueue" configuration
// section and reads every test parameter through GetConfigNonZeroExpected.
// NOTE(review): GetConfigNonZeroExpected is defined outside this listing;
// presumably it looks up the named key in cfg, stores it in the matching
// s_n<Name> static and uses the second argument as the default/expected
// value -- confirm against its definition.
36 static void SetUpTestCase() {
37 const cds_test::config &cfg = get_config("ParallelFollyQueue");
// General parameters.
39 GetConfigNonZeroExpected(ConsumerWaitTime, 200);
40 GetConfigNonZeroExpected(ThreadCount, 4);
// Unbounded queue parameters.
41 GetConfigNonZeroExpected(UnboundedQueueEnqueueStride, 10000);
42 GetConfigNonZeroExpected(USPSCQueueEnqueueCount, 1200000000);
43 GetConfigNonZeroExpected(UMPSCQueueEnqueueCount, 320000000);
44 GetConfigNonZeroExpected(USPMCQueueEnqueueCount, 320000000);
45 GetConfigNonZeroExpected(UMPMCQueueEnqueueCount, 320000000);
46 // Dynamic bounded queue
47 GetConfigNonZeroExpected(DynamicBoundedQueueEnqueueStride, 50000);
48 GetConfigNonZeroExpected(DynamicBoundedQueueCapacity, 200000);
49 GetConfigNonZeroExpected(DSPSCQueueEnqueueCount, 1200000000);
50 GetConfigNonZeroExpected(DMPSCQueueEnqueueCount, 320000000);
51 GetConfigNonZeroExpected(DSPMCQueueEnqueueCount, 320000000);
52 GetConfigNonZeroExpected(DMPMCQueueEnqueueCount, 320000000);
// AtomicLinkedList parameter.
54 GetConfigNonZeroExpected(AtomicLinkedListPassCount, 10000);
55 // MPMC Queue (linearizable)
56 GetConfigNonZeroExpected(MPMCQueueEnqueueStride, 10000);
57 GetConfigNonZeroExpected(MPMCQueueCapacity, 50000);
58 GetConfigNonZeroExpected(MPMCQueueEnqueueCount, 500000000);
// Generic enqueue helper: takes the queue by pointer and the element by const
// reference. Overloads for MPMCQueue and AtomicLinkedList are declared further
// down. NOTE(review): the body is not visible in this listing.
61 template <typename Queue, typename Type>
62 static void general_enqueue(Queue *q, const Type &elem) {
// Generic dequeue helper: forwards to q->try_dequeue(result) and returns that
// call's result; the dequeued value is delivered through `result`.
66 template <typename Queue, typename Type>
67 static bool general_try_dequeue(Queue *q, Type &result) {
68 return q->try_dequeue(result);
71 // MPMC Specialization.
// Overload of general_enqueue for MPMCQueue: pushes with q->write(elem) and
// records a test failure (EXPECT_TRUE) if write() returns false.
72 template <typename Type>
73 static void general_enqueue(MPMCQueue *q, const Type &elem) {
74 EXPECT_TRUE(q->write(elem));
// Overload of general_try_dequeue for MPMCQueue: forwards to q->read(result)
// and returns that call's result.
77 template <typename Type>
78 static bool general_try_dequeue(MPMCQueue *q, Type &result) {
79 return q->read(result);
82 // AtomicLinkedList Specialization.
// Overload of general_enqueue for AtomicLinkedList.
// NOTE(review): the body is not visible in this listing.
83 template <typename Type>
84 static void general_enqueue(AtomicLinkedList *q, const Type &elem) {
// Producer thread body.
//   q             - queue to push into.
//   enqueue_count - number of loop iterations; also the argument given to
//                   rand() when drawing each value.
// Each iteration draws a value with rand(enqueue_count) (a helper declared
// outside this listing) and pushes it through general_enqueue(). The final
// fetch_sub lowers producer_num with release ordering, pairing with the
// acquire loads in the consumers.
// NOTE(review): the lines between the draw and the enqueue, and the loop's
// closing brace, are not visible here -- confirm that nothing else modifies
// elem_to_push and that the decrement runs once per producer.
88 template <typename Queue>
89 static void run_producer(Queue *q, size_t enqueue_count) {
90 for (size_t i = 0; i < enqueue_count; i++) {
91 size_t elem_to_push = rand(enqueue_count);
95 general_enqueue(q, elem_to_push);
97 producer_num.fetch_sub(1, std::memory_order_release);
// Consumer thread body: accumulates every dequeued value in dequeue_sum.
// When a dequeue attempt fails and producer_num (acquire load) reads 0, one
// more attempt is made before giving up, so elements pushed just before the
// last producer finished are still drained.
// NOTE(review): the loop header, the declaration of `result` and the exit
// statements are not visible in this listing.
// NOTE(review): dequeue_sum is size_t but is compared against the int literal
// 0 in EXPECT_GT, which may raise a sign-compare warning. Also, a consumer
// that never obtains an element (possible when several consumers share one
// queue) would fail this expectation -- confirm that is intended.
100 template <typename Queue> static void run_consumer(Queue *q) {
101 size_t dequeue_sum = 0;
104 if (!general_try_dequeue(q, result)) {
105 if (producer_num.load(std::memory_order_acquire) == 0) {
106 if (!general_try_dequeue(q, result)) {
107 // If all producers are done and we still dequeue to nothing,
108 // the consumer quits.
113 dequeue_sum += result;
115 EXPECT_GT(dequeue_sum, 0);
// Drives one producer/consumer run against queue `q`.
//   producer_cnt        - number of producer threads to start.
//   producer_pass_count - enqueue count handed to every producer.
//   consumer_cnt        - number of consumer threads to start.
// Stores producer_cnt into producer_num, allocates an array of
// producer_cnt + consumer_cnt std::thread objects, runs run_producer on the
// first producer_cnt slots and run_consumer on the remaining ones.
// NOTE(review): the body of the second loop is not visible; presumably it
// joins every thread -- confirm.
// NOTE(review): producer_num is std::atomic_int while producer_cnt is size_t,
// so the store converts implicitly (narrowing).
118 template <typename QueueType>
119 static void FollyMPMCThreading(QueueType *q, size_t producer_cnt,
120 size_t producer_pass_count,
121 size_t consumer_cnt) {
122 producer_num.store(producer_cnt, std::memory_order_relaxed);
123 size_t total_thread_cnt = producer_cnt + consumer_cnt;
124 std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
125 for (size_t i = 0; i < total_thread_cnt; i++) {
126 if (i < producer_cnt) {
128 std::thread(run_producer<QueueType>, q, producer_pass_count);
130 threads[i] = std::thread(run_consumer<QueueType>, q);
133 for (size_t i = 0; i < total_thread_cnt; i++) {
139 // Specialization for AtomicLinkedList
// Consumer body for AtomicLinkedList. `func` adds the element it receives to
// dequeue_sum (captured by reference); the routine checks producer_num with an
// acquire load to decide when to stop and finally expects dequeue_sum > 0.
// NOTE(review): the loop and the call that hands `func` to the list are not
// visible in this listing -- confirm how elements are drained.
// NOTE(review): dequeue_sum is size_t but is compared against the int literal
// 0 in EXPECT_GT, which may raise a sign-compare warning.
141 void FollyQueueEnqueueDequeueTest_Parallel::run_consumer(AtomicLinkedList *q) {
142 size_t dequeue_sum = 0;
143 auto func = [&dequeue_sum](size_t elem) { dequeue_sum += elem; };
146 if (producer_num.load(std::memory_order_acquire) == 0) {
148 // If all producers are done and we still dequeue to nothing,
149 // the consumer quits.
153 EXPECT_GT(dequeue_sum, 0);
// Out-of-class definitions of the fixture's static data members. None of them
// has an explicit initializer here; the s_n* values are assigned in
// SetUpTestCase() and producer_num in FollyMPMCThreading().
156 std::atomic_int FollyQueueEnqueueDequeueTest_Parallel::producer_num;
// General parameters.
158 unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nConsumerWaitTime;
159 unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nThreadCount;
// Unbounded queue parameters.
161 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUnboundedQueueEnqueueStride;
162 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPSCQueueEnqueueCount;
163 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPSCQueueEnqueueCount;
164 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPMCQueueEnqueueCount;
165 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPMCQueueEnqueueCount;
166 // Dynamic bounded queue
// NOTE(review): the type of the next definition sits on a line that is not
// visible in this listing.
168 FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueEnqueueStride;
169 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueCapacity;
170 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPSCQueueEnqueueCount;
171 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPSCQueueEnqueueCount;
172 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPMCQueueEnqueueCount;
173 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPMCQueueEnqueueCount;
// AtomicLinkedList parameter.
175 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nAtomicLinkedListPassCount;
176 // MPMC Queue (linearizable)
177 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueStride;
178 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueCapacity;
179 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueCount;
181 // Used as a MPSC queue.
// Runs the producer/consumer driver on a default-constructed AtomicLinkedList
// with s_nThreadCount - 1 producers, each performing
// s_nAtomicLinkedListPassCount enqueues.
// NOTE(review): the consumer-count argument is on a line that is not visible
// here; per the comment above it is presumably 1 -- confirm.
182 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyAtomicLinkedList) {
183 typedef AtomicLinkedList Queue;
184 std::unique_ptr<Queue> q(new Queue());
185 FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nAtomicLinkedListPassCount,
// Runs the driver on an MPMCQueue constructed with s_nMPMCQueueCapacity.
// Producers: s_nThreadCount / 2, each doing s_nMPMCQueueEnqueueCount enqueues;
// consumers: the remaining threads, so an odd thread count yields one more
// consumer than producers.
// NOTE(review): with s_nThreadCount == 1 this starts zero producers.
189 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyMPMCQueue) {
190 typedef MPMCQueue Queue;
191 std::unique_ptr<Queue> q(new Queue(s_nMPMCQueueCapacity));
192 FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nMPMCQueueEnqueueCount,
193 s_nThreadCount - s_nThreadCount / 2);
// Runs the driver on a DSPSCQueue constructed with
// s_nDynamicBoundedQueueCapacity: one producer performing
// s_nDSPSCQueueEnqueueCount enqueues and one consumer.
196 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPSC) {
197 typedef DSPSCQueue Queue;
198 std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
199 FollyMPMCThreading(q.get(), 1, s_nDSPSCQueueEnqueueCount, 1);
// Runs the driver on a DMPSCQueue constructed with
// s_nDynamicBoundedQueueCapacity: s_nThreadCount - 1 producers, each
// performing s_nDMPSCQueueEnqueueCount enqueues, and one consumer.
202 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPSC) {
203 typedef DMPSCQueue Queue;
204 std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
205 FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nDMPSCQueueEnqueueCount, 1);
// Runs the driver on a DSPMCQueue constructed with
// s_nDynamicBoundedQueueCapacity: one producer performing
// s_nDSPMCQueueEnqueueCount enqueues and s_nThreadCount - 1 consumers.
208 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPMC) {
209 typedef DSPMCQueue Queue;
210 std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
211 FollyMPMCThreading(q.get(), 1, s_nDSPMCQueueEnqueueCount, s_nThreadCount - 1);
// Runs the driver on a DMPMCQueue constructed with
// s_nDynamicBoundedQueueCapacity. Producers: s_nThreadCount / 2, each doing
// s_nDMPMCQueueEnqueueCount enqueues; consumers: the remaining threads.
214 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPMC) {
215 typedef DMPMCQueue Queue;
216 std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
217 FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nDMPMCQueueEnqueueCount,
218 s_nThreadCount - s_nThreadCount / 2);
// Runs the driver on a default-constructed USPSCQueue: one producer
// performing s_nUSPSCQueueEnqueueCount enqueues and one consumer.
221 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPSC) {
222 typedef USPSCQueue Queue;
223 std::unique_ptr<Queue> q(new Queue());
224 FollyMPMCThreading(q.get(), 1, s_nUSPSCQueueEnqueueCount, 1);
// Runs the driver on a default-constructed UMPSCQueue: s_nThreadCount - 1
// producers, each performing s_nUMPSCQueueEnqueueCount enqueues, and one
// consumer.
227 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPSC) {
228 typedef UMPSCQueue Queue;
229 std::unique_ptr<Queue> q(new Queue());
230 FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nUMPSCQueueEnqueueCount, 1);
// Runs the driver on a default-constructed USPMCQueue: one producer
// performing s_nUSPMCQueueEnqueueCount enqueues and s_nThreadCount - 1
// consumers.
233 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPMC) {
234 typedef USPMCQueue Queue;
235 std::unique_ptr<Queue> q(new Queue());
236 FollyMPMCThreading(q.get(), 1, s_nUSPMCQueueEnqueueCount, s_nThreadCount - 1);
// Runs the driver on a default-constructed UMPMCQueue. Producers:
// s_nThreadCount / 2, each doing s_nUMPMCQueueEnqueueCount enqueues;
// consumers: the remaining threads.
239 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPMC) {
240 typedef UMPMCQueue Queue;
242 std::unique_ptr<Queue> q(new Queue());
243 FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nUMPMCQueueEnqueueCount,
244 s_nThreadCount - s_nThreadCount / 2);
247 } // namespace folly_test