Adds Folly MPMCQueue test case
authorPeizhao Ou <peizhaoo@uci.edu>
Wed, 7 Feb 2018 18:56:24 +0000 (10:56 -0800)
committerPeizhao Ou <peizhaoo@uci.edu>
Wed, 7 Feb 2018 18:56:24 +0000 (10:56 -0800)
folly/stress-test/stress-sequential-folly-queue.cpp

index 21f5311ca3de668151adcd4fa9e92ae5737d009d..d3c3906a5dcdb16213e4307ebc959a0dc599f696 100644 (file)
@@ -1,6 +1,7 @@
 #include <folly/concurrency/UnboundedQueue.h>
 #include <folly/concurrency/DynamicBoundedQueue.h>
 #include <folly/AtomicLinkedList.h>
+#include <folly/MPMCQueue.h>
 
 #include <chrono>
 #include <cassert>
@@ -50,6 +51,13 @@ size_t kAtomicLinkedListPassCount = 10000;
 const char* kAtomicLinkedListBenchmarkName = "FollyAtomicLinkedList";
 typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
 
+// MPMC Queue (linearizable)
+size_t kMPMCQueueEnqueueStride = 10000;
+size_t kMPMCQueueCapacity = 50000;
+size_t kMPMCQueueEnqueueCount = 500000000;
+const char* kMPMCQueueBenchmarkName = "FollyMPMCQueue";
+typedef folly::MPMCQueue<size_t> MPMCQueue;
+
 }
 
 void run_atomic_linkedlist() {
@@ -97,7 +105,6 @@ void run_queue(Queue* q, size_t enqueue_count, const char* bench_name,
     auto start_time = std::chrono::system_clock::now();
 
     size_t nNo = 0;
-    size_t push_failure = 0;
     size_t pop_sum = 0;
     while (nNo < enqueue_count) {
       size_t curr_push_count =
@@ -122,7 +129,49 @@ void run_queue(Queue* q, size_t enqueue_count, const char* bench_name,
                 << " != " << supposed_sum << "\n";
       std::cout << "[       FAILED ] " << kTestName << "." << bench_name
                 << " (" << milisecs.count() << " ms)" << std::endl;
-      assert(false && "Folly unbounded queue ERROR");
+      assert(false && "Folly concurrent queue ERROR");
+    } else {
+        std::cout << "[       OK ] " << kTestName << "." << bench_name
+                  << " (" << milisecs.count() << " ms)" << std::endl;
+    }
+}
+
+// MPMC Specialization.
+template <>
+void run_queue(MPMCQueue* q, size_t enqueue_count, const char* bench_name,
+               size_t enqueue_stride) {
+    std::cout << "[ RUN      ] " << kTestName << "." << bench_name << std::endl;
+    auto start_time = std::chrono::system_clock::now();
+
+    size_t nNo = 0;
+    size_t push_sum = 0;
+    size_t pop_sum = 0;
+    while (nNo < enqueue_count) {
+      size_t curr_push_count =
+          std::min(enqueue_count - nNo, enqueue_stride);
+      for (size_t i = 0; i < curr_push_count; i++) {
+        if (q->write(nNo)) {
+          push_sum += nNo;
+          nNo++;
+        }
+      }
+      size_t res;
+      while (q->read(res)) {
+        pop_sum += res;
+      }
+    }
+
+    auto finish_time = std::chrono::system_clock::now();
+    auto dur = finish_time - start_time;
+    auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
+
+    size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+    if (pop_sum != supposed_sum) {
+      std::cout << "Sequential queue pop sum: " << pop_sum
+                << " != " << supposed_sum << "\n";
+      std::cout << "[       FAILED ] " << kTestName << "." << bench_name
+                << " (" << milisecs.count() << " ms)" << std::endl;
+      assert(false && "Folly concurrent queue ERROR");
     } else {
         std::cout << "[       OK ] " << kTestName << "." << bench_name
                   << " (" << milisecs.count() << " ms)" << std::endl;
@@ -130,43 +179,68 @@ void run_queue(Queue* q, size_t enqueue_count, const char* bench_name,
 }
 
 template <typename Queue>
-void run_unbounded(size_t enqueue_count, const char* bench_name,
-                   size_t enqueue_stride) {
+void run_without_initial_capacity(size_t enqueue_count, const char* bench_name,
+                                  size_t enqueue_stride) {
   std::unique_ptr<Queue> q(new Queue());
   run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
 }
 
 template <typename Queue>
-void run_dynamic_bounded(size_t queue_capacity, size_t enqueue_count,
-                         const char* bench_name,
-                         size_t enqueue_stride) {
+void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
+                               const char* bench_name, size_t enqueue_stride) {
   std::unique_ptr<Queue> q(new Queue(queue_capacity));
   run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
 }
 
 int main() {
+  // MPMCQueue
+  run_with_initial_capacity<MPMCQueue>(
+      kMPMCQueueCapacity ,
+      kMPMCQueueEnqueueCount,
+      kMPMCQueueBenchmarkName,
+      kMPMCQueueEnqueueStride);
+
+  // AtomicLinkedList
   run_atomic_linkedlist();
-  run_unbounded<USPSCQueue>(kUSPSCQueueEnqueueCount, kUSPSCQueueBenchmarkName,
-                        kUnboundedQueueEnqueueStride);
-  run_unbounded<UMPSCQueue>(kUMPSCQueueEnqueueCount, kUMPSCQueueBenchmarkName,
-                        kUnboundedQueueEnqueueStride);
-  run_unbounded<USPMCQueue>(kUSPMCQueueEnqueueCount, kUSPMCQueueBenchmarkName,
-                        kUnboundedQueueEnqueueStride);
-  run_unbounded<UMPMCQueue>(kUMPMCQueueEnqueueCount, kUMPMCQueueBenchmarkName,
-                        kUnboundedQueueEnqueueStride);
-
-  run_dynamic_bounded<DSPSCQueue>(kDynamicBoundedQueueCapacity ,
-                        kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
-                        kDynamicBoundedQueueEnqueueStride);
-  run_dynamic_bounded<DMPSCQueue>(kDynamicBoundedQueueCapacity,
-                        kDMPSCQueueEnqueueCount, kDMPSCQueueBenchmarkName,
-                        kDynamicBoundedQueueEnqueueStride);
-  run_dynamic_bounded<DSPMCQueue>(kDynamicBoundedQueueCapacity,
-                        kDSPMCQueueEnqueueCount, kDSPMCQueueBenchmarkName,
-                        kDynamicBoundedQueueEnqueueStride);
-  run_dynamic_bounded<DMPMCQueue>(kDynamicBoundedQueueCapacity,
-                        kDMPMCQueueEnqueueCount, kDMPMCQueueBenchmarkName,
-                        kDynamicBoundedQueueEnqueueStride);
+
+  // UnboundedQueue
+  run_without_initial_capacity<USPSCQueue>(
+      kUSPSCQueueEnqueueCount,
+      kUSPSCQueueBenchmarkName,
+      kUnboundedQueueEnqueueStride);
+  run_without_initial_capacity<UMPSCQueue>(
+      kUMPSCQueueEnqueueCount,
+      kUMPSCQueueBenchmarkName,
+      kUnboundedQueueEnqueueStride);
+  run_without_initial_capacity<USPMCQueue>(
+      kUSPMCQueueEnqueueCount,
+      kUSPMCQueueBenchmarkName,
+      kUnboundedQueueEnqueueStride);
+  run_without_initial_capacity<UMPMCQueue>(
+      kUMPMCQueueEnqueueCount,
+      kUMPMCQueueBenchmarkName,
+      kUnboundedQueueEnqueueStride);
+
+  // DynamicBoundedQueue
+  run_with_initial_capacity<DSPSCQueue>(
+      kDynamicBoundedQueueCapacity ,
+      kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
+      kDynamicBoundedQueueEnqueueStride);
+  run_with_initial_capacity<DMPSCQueue>(
+      kDynamicBoundedQueueCapacity,
+      kDMPSCQueueEnqueueCount,
+      kDMPSCQueueBenchmarkName,
+      kDynamicBoundedQueueEnqueueStride);
+  run_with_initial_capacity<DSPMCQueue>(
+      kDynamicBoundedQueueCapacity,
+      kDSPMCQueueEnqueueCount,
+      kDSPMCQueueBenchmarkName,
+      kDynamicBoundedQueueEnqueueStride);
+  run_with_initial_capacity<DMPMCQueue>(
+      kDynamicBoundedQueueCapacity,
+      kDMPMCQueueEnqueueCount,
+      kDMPMCQueueBenchmarkName,
+      kDynamicBoundedQueueEnqueueStride);
 
   return 0;
 }