#pragma once
#include <folly/Synchronized.h>
#pragma once
#include <folly/Synchronized.h>
+#include <folly/concurrency/UnboundedQueue.h>
#include <folly/executors/task_queue/BlockingQueue.h>
#include <folly/synchronization/LifoSem.h>
#include <folly/executors/task_queue/BlockingQueue.h>
#include <folly/synchronization/LifoSem.h>
-// Warning: this is effectively just a std::deque wrapped in a single mutex
-// We are aiming to add a more performant concurrent unbounded queue in the
-// future, but this class is available if you must have an unbounded queue
-// and can tolerate any contention.
template <class T>
class UnboundedBlockingQueue : public BlockingQueue<T> {
public:
virtual ~UnboundedBlockingQueue() {}
void add(T item) override {
template <class T>
class UnboundedBlockingQueue : public BlockingQueue<T> {
public:
virtual ~UnboundedBlockingQueue() {}
void add(T item) override {
- queue_.wlock()->push(std::move(item));
+ queue_.enqueue(std::move(item));
sem_.post();
}
T take() override {
sem_.post();
}
T take() override {
- while (true) {
- {
- auto ulockedQueue = queue_.ulock();
- if (!ulockedQueue->empty()) {
- auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
- T item = std::move(wlockedQueue->front());
- wlockedQueue->pop();
- return item;
- }
- }
+ T item;
+ while (!queue_.try_dequeue(item)) {
}
size_t size() override {
}
size_t size() override {
- return queue_.rlock()->size();
- Synchronized<std::queue<T>> queue_;
+ UMPMCQueue<T, false> queue_;