#include <folly/concurrency/UnboundedQueue.h>
#include <folly/concurrency/DynamicBoundedQueue.h>
#include <folly/AtomicLinkedList.h>
+#include <folly/MPMCQueue.h>
#include <chrono>
#include <cassert>
const char* kAtomicLinkedListBenchmarkName = "FollyAtomicLinkedList";
typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
+// MPMC Queue (linearizable)
+size_t kMPMCQueueEnqueueStride = 10000;
+size_t kMPMCQueueCapacity = 50000;
+size_t kMPMCQueueEnqueueCount = 500000000;
+const char* kMPMCQueueBenchmarkName = "FollyMPMCQueue";
+typedef folly::MPMCQueue<size_t> MPMCQueue;
+
}
void run_atomic_linkedlist() {
auto start_time = std::chrono::system_clock::now();
size_t nNo = 0;
- size_t push_failure = 0;
size_t pop_sum = 0;
while (nNo < enqueue_count) {
size_t curr_push_count =
<< " != " << supposed_sum << "\n";
std::cout << "[ FAILED ] " << kTestName << "." << bench_name
<< " (" << milisecs.count() << " ms)" << std::endl;
- assert(false && "Folly unbounded queue ERROR");
+ assert(false && "Folly concurrent queue ERROR");
+ } else {
+ std::cout << "[ OK ] " << kTestName << "." << bench_name
+ << " (" << milisecs.count() << " ms)" << std::endl;
+ }
+}
+
+// MPMC Specialization.
+template <>
+void run_queue(MPMCQueue* q, size_t enqueue_count, const char* bench_name,
+ size_t enqueue_stride) {
+ std::cout << "[ RUN ] " << kTestName << "." << bench_name << std::endl;
+ auto start_time = std::chrono::system_clock::now();
+
+ size_t nNo = 0;
+ size_t push_sum = 0;
+ size_t pop_sum = 0;
+ while (nNo < enqueue_count) {
+ size_t curr_push_count =
+ std::min(enqueue_count - nNo, enqueue_stride);
+ for (size_t i = 0; i < curr_push_count; i++) {
+ if (q->write(nNo)) {
+ push_sum += nNo;
+ nNo++;
+ }
+ }
+ size_t res;
+ while (q->read(res)) {
+ pop_sum += res;
+ }
+ }
+
+ auto finish_time = std::chrono::system_clock::now();
+ auto dur = finish_time - start_time;
+ auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
+
+ size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
+ if (pop_sum != supposed_sum) {
+ std::cout << "Sequential queue pop sum: " << pop_sum
+ << " != " << supposed_sum << "\n";
+ std::cout << "[ FAILED ] " << kTestName << "." << bench_name
+ << " (" << milisecs.count() << " ms)" << std::endl;
+ assert(false && "Folly concurrent queue ERROR");
} else {
std::cout << "[ OK ] " << kTestName << "." << bench_name
<< " (" << milisecs.count() << " ms)" << std::endl;
}
template <typename Queue>
-void run_unbounded(size_t enqueue_count, const char* bench_name,
- size_t enqueue_stride) {
+void run_without_initial_capacity(size_t enqueue_count, const char* bench_name,
+ size_t enqueue_stride) {
std::unique_ptr<Queue> q(new Queue());
run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
}
template <typename Queue>
-void run_dynamic_bounded(size_t queue_capacity, size_t enqueue_count,
- const char* bench_name,
- size_t enqueue_stride) {
+void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
+ const char* bench_name, size_t enqueue_stride) {
std::unique_ptr<Queue> q(new Queue(queue_capacity));
run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
}
int main() {
+ // MPMCQueue
+ run_with_initial_capacity<MPMCQueue>(
+ kMPMCQueueCapacity ,
+ kMPMCQueueEnqueueCount,
+ kMPMCQueueBenchmarkName,
+ kMPMCQueueEnqueueStride);
+
+ // AtomicLinkedList
run_atomic_linkedlist();
- run_unbounded<USPSCQueue>(kUSPSCQueueEnqueueCount, kUSPSCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
- run_unbounded<UMPSCQueue>(kUMPSCQueueEnqueueCount, kUMPSCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
- run_unbounded<USPMCQueue>(kUSPMCQueueEnqueueCount, kUSPMCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
- run_unbounded<UMPMCQueue>(kUMPMCQueueEnqueueCount, kUMPMCQueueBenchmarkName,
- kUnboundedQueueEnqueueStride);
-
- run_dynamic_bounded<DSPSCQueue>(kDynamicBoundedQueueCapacity ,
- kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
- run_dynamic_bounded<DMPSCQueue>(kDynamicBoundedQueueCapacity,
- kDMPSCQueueEnqueueCount, kDMPSCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
- run_dynamic_bounded<DSPMCQueue>(kDynamicBoundedQueueCapacity,
- kDSPMCQueueEnqueueCount, kDSPMCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
- run_dynamic_bounded<DMPMCQueue>(kDynamicBoundedQueueCapacity,
- kDMPMCQueueEnqueueCount, kDMPMCQueueBenchmarkName,
- kDynamicBoundedQueueEnqueueStride);
+
+ // UnboundedQueue
+ run_without_initial_capacity<USPSCQueue>(
+ kUSPSCQueueEnqueueCount,
+ kUSPSCQueueBenchmarkName,
+ kUnboundedQueueEnqueueStride);
+ run_without_initial_capacity<UMPSCQueue>(
+ kUMPSCQueueEnqueueCount,
+ kUMPSCQueueBenchmarkName,
+ kUnboundedQueueEnqueueStride);
+ run_without_initial_capacity<USPMCQueue>(
+ kUSPMCQueueEnqueueCount,
+ kUSPMCQueueBenchmarkName,
+ kUnboundedQueueEnqueueStride);
+ run_without_initial_capacity<UMPMCQueue>(
+ kUMPMCQueueEnqueueCount,
+ kUMPMCQueueBenchmarkName,
+ kUnboundedQueueEnqueueStride);
+
+ // DynamicBoundedQueue
+ run_with_initial_capacity<DSPSCQueue>(
+ kDynamicBoundedQueueCapacity ,
+ kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
+ kDynamicBoundedQueueEnqueueStride);
+ run_with_initial_capacity<DMPSCQueue>(
+ kDynamicBoundedQueueCapacity,
+ kDMPSCQueueEnqueueCount,
+ kDMPSCQueueBenchmarkName,
+ kDynamicBoundedQueueEnqueueStride);
+ run_with_initial_capacity<DSPMCQueue>(
+ kDynamicBoundedQueueCapacity,
+ kDSPMCQueueEnqueueCount,
+ kDSPMCQueueBenchmarkName,
+ kDynamicBoundedQueueEnqueueStride);
+ run_with_initial_capacity<DMPMCQueue>(
+ kDynamicBoundedQueueCapacity,
+ kDMPMCQueueEnqueueCount,
+ kDMPMCQueueBenchmarkName,
+ kDynamicBoundedQueueEnqueueStride);
return 0;
}