* If runInEventBaseThread() returns true the function has successfully been
* scheduled to run in the loop thread. However, if the loop is terminated
* (and never later restarted) before it has a chance to run the requested
- * function, the function may never be run at all. The caller is responsible
- * for handling this situation correctly if they may terminate the loop with
- * outstanding runInEventBaseThread() calls pending.
+ * function, the function will be run upon the EventBase's destruction.
*
* If two calls to runInEventBaseThread() are made from the same thread, the
* functions will always be run in the order that they were scheduled.
* function pointer and void* argument, as it has to allocate memory to copy
* the std::function object.
*
- * If the EventBase loop is terminated before it has a chance to run this
- * function, the allocated memory will be leaked. The caller is responsible
- * for ensuring that the EventBase loop is not terminated before this
- * function can run.
+ * If the loop is terminated (and never later restarted) before it has a
+ * chance to run the requested function, the function will be run upon the
+ * EventBase's destruction.
*
* The function must not throw any exceptions.
*/
*/
void stopConsuming();
+ /**
+ * Consume messages off the queue until it is empty. No messages may be
+ * added to the queue while it is draining, so that the process is bounded.
+ * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
+ * and tryPutMessageNoThrow will return false.
+ *
+ * @returns true if the queue was drained, false otherwise. In practice,
+ * this will only fail if someone else is already draining the queue.
+ */
+ bool consumeUntilDrained() noexcept;
+
/**
* Get the NotificationQueue that this consumer is currently consuming
* messages from. Returns nullptr if the consumer is not currently
virtual void handlerReady(uint16_t events) noexcept;
private:
+ /**
+ * Consume messages off the the queue until
+ * - the queue is empty (1), or
+ * - until the consumer is destroyed, or
+ * - until the consumer is uninstalled, or
+ * - an exception is thrown in the course of dequeueing, or
+ * - unless isDrain is true, until the maxReadAtOnce_ limit is hit
+ *
+ * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
+ */
+ void consumeMessages(bool isDrain) noexcept;
void setActive(bool active, bool shouldLock = false) {
if (!queue_) {
* If the queue is full, a std::overflow_error will be thrown. The
* setMaxQueueSize() function controls the maximum queue size.
*
+ * If the queue is currently draining, an std::runtime_error will be thrown.
+ *
* This method may contend briefly on a spinlock if many threads are
* concurrently accessing the queue, but for all intents and purposes it will
* immediately place the message on the queue and return.
* No-throw versions of the above. Instead returns true on success, false on
* failure.
*
- * Only std::overflow_error is prevented from being thrown (since this is the
- * common exception case), user code must still catch std::bad_alloc errors.
+ * Only std::overflow_error (the common exception case) and std::runtime_error
+ * (which indicates that the queue is being drained) are prevented from being
+ * thrown. User code must still catch std::bad_alloc errors.
*/
bool tryPutMessageNoThrow(MessageT&& message) {
return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
* and always puts the message on the queue, even if the maximum queue size
* would be exceeded.
*
- * putMessage() may throw std::bad_alloc if memory allocation fails, and may
- * throw any other exception thrown by the MessageT move/copy constructor.
+ * putMessage() may throw
+ * - std::bad_alloc if memory allocation fails, and may
+ * - std::runtime_error if the queue is currently draining
+ * - any other exception thrown by the MessageT move/copy constructor.
*/
void putMessage(MessageT&& message) {
putMessageImpl(std::move(message), 0);
return true;
}
+ inline bool checkDraining(bool throws=true) {
+ if (UNLIKELY(draining_ && throws)) {
+ throw std::runtime_error("queue is draining, cannot add message");
+ }
+ return draining_;
+ }
+
inline void signalEvent(size_t numAdded = 1) const {
static const uint8_t kPipeMessage[] = {
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
bool signal = false;
{
folly::io::PortableSpinLockGuard g(spinlock_);
- if (!checkQueueSize(maxSize, throws)) {
+ if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
return false;
}
// We only need to signal an event if not all consumers are
bool signal = false;
{
folly::io::PortableSpinLockGuard g(spinlock_);
- if (!checkQueueSize(maxSize, throws)) {
+ if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
return false;
}
if (numActiveConsumers_ < numConsumers_) {
size_t numAdded = 0;
{
folly::io::PortableSpinLockGuard g(spinlock_);
+ checkDraining();
while (first != last) {
queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
++first;
std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
int numConsumers_{0};
std::atomic<int> numActiveConsumers_{0};
+ bool draining_{false};
};
template<typename MessageT>
template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
noexcept {
+ consumeMessages(false);
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::consumeMessages(
+ bool isDrain) noexcept {
uint32_t numProcessed = 0;
bool firstRun = true;
setActive(true);
// may not actually be an event available (another consumer may
// have read it). We don't really care, we only care about
// emptying the queue.
- if (firstRun) {
+ if (!isDrain && firstRun) {
queue_->tryConsumeEvent();
firstRun = false;
}
// If we have hit maxReadAtOnce_, we are done.
++numProcessed;
- if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
+ if (!isDrain && maxReadAtOnce_ > 0 &&
+ numProcessed >= maxReadAtOnce_) {
queue_->signalEvent(1);
return;
}
// Push a notification back on the eventfd since we didn't actually
// read the message off of the queue.
- queue_->signalEvent(1);
+ if (!isDrain) {
+ queue_->signalEvent(1);
+ }
}
return;
queue_ = nullptr;
}
+template<typename MessageT>
+bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
+ {
+ folly::io::PortableSpinLockGuard g(queue_->spinlock_);
+ if (queue_->draining_) {
+ return false;
+ }
+ queue_->draining_ = true;
+ }
+ consumeMessages(true);
+ {
+ folly::io::PortableSpinLockGuard g(queue_->spinlock_);
+ queue_->draining_ = false;
+ }
+ return true;
+}
+
} // folly