ba6ee797c6cae0aafe0fb2aacf2f62ed0ecd0274
[folly.git] / folly / stress-test / stress-parallel-folly-queue.cpp
1 #include "queue_test.h"
2
3 namespace folly_test {
4
5 class FollyQueueEnqueueDequeueTest_Parallel : public cds_test::stress_fixture {
6 protected:
7   static std::atomic_int producer_num;
8
9   // The milliseconds for consumers to wait when a failed try_dequeue happens.
10   static unsigned s_nConsumerWaitTime;
11   // For MPMC, half of the threads are producers, and the rest are consumers.
12   static unsigned s_nThreadCount;
13   // Unbounded queue
14   static size_t s_nUnboundedQueueEnqueueStride;
15   static size_t s_nUSPSCQueueEnqueueCount;
16   static size_t s_nUMPSCQueueEnqueueCount;
17   static size_t s_nUSPMCQueueEnqueueCount;
18   static size_t s_nUMPMCQueueEnqueueCount;
19
20   // Dynamic bounded queue
21   static size_t s_nDynamicBoundedQueueEnqueueStride;
22   static size_t s_nDynamicBoundedQueueCapacity;
23   static size_t s_nDSPSCQueueEnqueueCount;
24   static size_t s_nDMPSCQueueEnqueueCount;
25   static size_t s_nDSPMCQueueEnqueueCount;
26   static size_t s_nDMPMCQueueEnqueueCount;
27
28   // AtomicLinkedList
29   static size_t s_nAtomicLinkedListPassCount;
30
31   // MPMC Queue (linearizable)
32   static size_t s_nMPMCQueueEnqueueStride;
33   static size_t s_nMPMCQueueCapacity;
34   static size_t s_nMPMCQueueEnqueueCount;
35
36   static void SetUpTestCase() {
37     const cds_test::config &cfg = get_config("ParallelFollyQueue");
38     // Unbounded queue
39     GetConfigNonZeroExpected(ConsumerWaitTime, 200);
40     GetConfigNonZeroExpected(ThreadCount, 4);
41     GetConfigNonZeroExpected(UnboundedQueueEnqueueStride, 10000);
42     GetConfigNonZeroExpected(USPSCQueueEnqueueCount, 1200000000);
43     GetConfigNonZeroExpected(UMPSCQueueEnqueueCount, 320000000);
44     GetConfigNonZeroExpected(USPMCQueueEnqueueCount, 320000000);
45     GetConfigNonZeroExpected(UMPMCQueueEnqueueCount, 320000000);
46     // Dynamic bounded queue
47     GetConfigNonZeroExpected(DynamicBoundedQueueEnqueueStride, 50000);
48     GetConfigNonZeroExpected(DynamicBoundedQueueCapacity, 200000);
49     GetConfigNonZeroExpected(DSPSCQueueEnqueueCount, 1200000000);
50     GetConfigNonZeroExpected(DMPSCQueueEnqueueCount, 320000000);
51     GetConfigNonZeroExpected(DSPMCQueueEnqueueCount, 320000000);
52     GetConfigNonZeroExpected(DMPMCQueueEnqueueCount, 320000000);
53     // AtomicLinkedList
54     GetConfigNonZeroExpected(AtomicLinkedListPassCount, 10000);
55     // MPMC Queue (linearizable)
56     GetConfigNonZeroExpected(MPMCQueueEnqueueStride, 10000);
57     GetConfigNonZeroExpected(MPMCQueueCapacity, 50000);
58     GetConfigNonZeroExpected(MPMCQueueEnqueueCount, 500000000);
59   }
60
61   template <typename Queue, typename Type>
62   static void general_enqueue(Queue *q, const Type &elem) {
63     q->enqueue(elem);
64   }
65
66   template <typename Queue, typename Type>
67   static bool general_try_dequeue(Queue *q, Type &result) {
68     return q->try_dequeue(result);
69   }
70
71   // MPMC Specialization.
72   template <typename Type>
73   static void general_enqueue(MPMCQueue *q, const Type &elem) {
74     EXPECT_TRUE(q->write(elem));
75   }
76
77   template <typename Type>
78   static bool general_try_dequeue(MPMCQueue *q, Type &result) {
79     return q->read(result);
80   }
81
82   // AtomicLinkedList Specialization.
83   template <typename Type>
84   static void general_enqueue(AtomicLinkedList *q, const Type &elem) {
85     q->insertHead(elem);
86   }
87
88   template <typename Queue>
89   static void run_producer(Queue *q, size_t enqueue_count) {
90     for (size_t i = 0; i < enqueue_count; i++) {
91       size_t elem_to_push = rand(enqueue_count);
92       if (!elem_to_push) {
93         elem_to_push++;
94       }
95       general_enqueue(q, elem_to_push);
96     }
97     producer_num.fetch_sub(1, std::memory_order_release);
98   }
99
100   template <typename Queue> static void run_consumer(Queue *q) {
101     size_t dequeue_sum = 0;
102     size_t result = 0;
103     while (true) {
104       if (!general_try_dequeue(q, result)) {
105         if (producer_num.load(std::memory_order_acquire) == 0) {
106           if (!general_try_dequeue(q, result)) {
107             // If all producers are done and we still dequeue to nothing,
108             // the consumer quits.
109             break;
110           }
111         }
112       }
113       dequeue_sum += result;
114     }
115     EXPECT_GT(dequeue_sum, 0);
116   }
117
118   template <typename QueueType>
119   static void FollyMPMCThreading(QueueType *q, size_t producer_cnt,
120                                  size_t producer_pass_count,
121                                  size_t consumer_cnt) {
122     producer_num.store(producer_cnt, std::memory_order_relaxed);
123     size_t total_thread_cnt = producer_cnt + consumer_cnt;
124     std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
125     for (size_t i = 0; i < total_thread_cnt; i++) {
126       if (i < producer_cnt) {
127         threads[i] =
128             std::thread(run_producer<QueueType>, q, producer_pass_count);
129       } else {
130         threads[i] = std::thread(run_consumer<QueueType>, q);
131       }
132     }
133     for (size_t i = 0; i < total_thread_cnt; i++) {
134       threads[i].join();
135     }
136   }
137 };
138
139 // Specialization for AtomicLinkedList
140 template <>
141 void FollyQueueEnqueueDequeueTest_Parallel::run_consumer(AtomicLinkedList *q) {
142   size_t dequeue_sum = 0;
143   auto func = [&dequeue_sum](size_t elem) { dequeue_sum += elem; };
144   while (true) {
145     q->sweep(func);
146     if (producer_num.load(std::memory_order_acquire) == 0) {
147       q->sweep(func);
148       // If all producers are done and we still dequeue to nothing,
149       // the consumer quits.
150       break;
151     }
152   }
153   EXPECT_GT(dequeue_sum, 0);
154 }
155
156 std::atomic_int FollyQueueEnqueueDequeueTest_Parallel::producer_num;
157
158 unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nConsumerWaitTime;
159 unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nThreadCount;
160 // Unbounded queue
161 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUnboundedQueueEnqueueStride;
162 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPSCQueueEnqueueCount;
163 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPSCQueueEnqueueCount;
164 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPMCQueueEnqueueCount;
165 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPMCQueueEnqueueCount;
166 // Dynamic bounded queue
167 size_t
168     FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueEnqueueStride;
169 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueCapacity;
170 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPSCQueueEnqueueCount;
171 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPSCQueueEnqueueCount;
172 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPMCQueueEnqueueCount;
173 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPMCQueueEnqueueCount;
174 // AtomicLinkedList
175 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nAtomicLinkedListPassCount;
176 // MPMC Queue (linearizable)
177 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueStride;
178 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueCapacity;
179 size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueCount;
180
181 // Used as a MPSC queue.
182 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyAtomicLinkedList) {
183   typedef AtomicLinkedList Queue;
184   std::unique_ptr<Queue> q(new Queue());
185   FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nAtomicLinkedListPassCount,
186                      1);
187 }
188
189 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyMPMCQueue) {
190   typedef MPMCQueue Queue;
191   std::unique_ptr<Queue> q(new Queue(s_nMPMCQueueCapacity));
192   FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nMPMCQueueEnqueueCount,
193                      s_nThreadCount - s_nThreadCount / 2);
194 }
195
196 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPSC) {
197   typedef DSPSCQueue Queue;
198   std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
199   FollyMPMCThreading(q.get(), 1, s_nDSPSCQueueEnqueueCount, 1);
200 }
201
202 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPSC) {
203   typedef DMPSCQueue Queue;
204   std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
205   FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nDMPSCQueueEnqueueCount, 1);
206 }
207
208 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPMC) {
209   typedef DSPMCQueue Queue;
210   std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
211   FollyMPMCThreading(q.get(), 1, s_nDSPMCQueueEnqueueCount, s_nThreadCount - 1);
212 }
213
214 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPMC) {
215   typedef DMPMCQueue Queue;
216   std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
217   FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nDMPMCQueueEnqueueCount,
218                      s_nThreadCount - s_nThreadCount / 2);
219 }
220
221 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPSC) {
222   typedef USPSCQueue Queue;
223   std::unique_ptr<Queue> q(new Queue());
224   FollyMPMCThreading(q.get(), 1, s_nUSPSCQueueEnqueueCount, 1);
225 }
226
227 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPSC) {
228   typedef UMPSCQueue Queue;
229   std::unique_ptr<Queue> q(new Queue());
230   FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nUMPSCQueueEnqueueCount, 1);
231 }
232
233 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPMC) {
234   typedef USPMCQueue Queue;
235   std::unique_ptr<Queue> q(new Queue());
236   FollyMPMCThreading(q.get(), 1, s_nUSPMCQueueEnqueueCount, s_nThreadCount - 1);
237 }
238
239 TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPMC) {
240   typedef UMPMCQueue Queue;
241
242   std::unique_ptr<Queue> q(new Queue());
243   FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nUMPMCQueueEnqueueCount,
244                      s_nThreadCount - s_nThreadCount / 2);
245 }
246
247 } // namespace folly_test