#include "folly/io/async/Request.h"
#include "folly/Likely.h"
#include "folly/SmallLocks.h"
+#include "folly/ScopeGuard.h"
#include <glog/logging.h>
#include <deque>
virtual void handlerReady(uint16_t events) noexcept;
private:
+
+ void setActive(bool active) {
+ DCHECK(queue_);
+ if (!active_ && active) {
+ ++queue_->numActiveConsumers_;
+ } else if (active_ && !active) {
+ --queue_->numActiveConsumers_;
+ }
+ active_ = active;
+ }
// Attach this consumer to an EventBase/queue pair (defined out of line).
void init(EventBase* eventBase, NotificationQueue* queue);
// Queue this consumer drains; assigned when the consumer is attached.
NotificationQueue* queue_;
// NOTE(review): presumably points at a flag set if the consumer is
// destroyed while a callback is running — confirm at the definition site.
bool* destroyedFlagPtr_;
// Maximum messages processed per handlerReady() pass; 0 means unlimited.
uint32_t maxReadAtOnce_;
// EventBase this consumer runs on; assigned when the consumer is attached.
EventBase* base_;
// True while this consumer is counted in queue_->numActiveConsumers_
// (maintained exclusively by setActive()).
+ bool active_{false};
};
enum class FdType {
*/
bool tryConsume(MessageT& result) {
checkPid();
- if (!tryConsumeEvent()) {
- return false;
- }
try {
folly::MSLGuard g(spinlock_);
- // This shouldn't happen normally. See the comments in
- // Consumer::handlerReady() for more details.
if (UNLIKELY(queue_.empty())) {
- LOG(ERROR) << "found empty queue after signalled event";
return false;
}
/**
 * Enqueue a message by moving it into the queue.
 *
 * Returns false (or throws, when `throws` is true) if checkQueueSize()
 * rejects the insert; returns true once the message has been queued.
 * The wakeup event is raised only when some consumer is currently idle,
 * so consumers that are already draining are not signalled redundantly.
 */
bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
  checkPid();
  bool needWakeup = false;
  {
    folly::MSLGuard guard(spinlock_);
    if (!checkQueueSize(maxSize, throws)) {
      return false;
    }
    // Signal only if not every consumer is awake; awake consumers will
    // pick this message up on their own.
    needWakeup = numActiveConsumers_ < numConsumers_;
    queue_.push_back(std::make_pair(
        std::move(message), RequestContext::saveContext()));
  }
  // Raise the event outside the spinlock to keep the critical section short.
  if (needWakeup) {
    signalEvent();
  }
  return true;
}
bool putMessageImpl(
const MessageT& message, size_t maxSize, bool throws=true) {
checkPid();
+ bool signal = false;
{
folly::MSLGuard g(spinlock_);
if (!checkQueueSize(maxSize, throws)) {
return false;
}
+ if (numActiveConsumers_ < numConsumers_) {
+ signal = true;
+ }
queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
}
- signalEvent();
+ if (signal) {
+ signalEvent();
+ }
return true;
}
void putMessagesImpl(InputIteratorT first, InputIteratorT last,
std::input_iterator_tag) {
checkPid();
+ bool signal = false;
size_t numAdded = 0;
{
folly::MSLGuard g(spinlock_);
++first;
++numAdded;
}
+ if (numActiveConsumers_ < numConsumers_) {
+ signal = true;
+ }
+ }
+ if (signal) {
+ signalEvent();
}
- signalEvent(numAdded);
}
// Guards queue_ and the consumer counters below; mutable so it can be
// acquired from const member functions (none visible in this chunk).
mutable folly::MicroSpinLock spinlock_;
// Soft size limit consulted via checkQueueSize(); NOTE(review): exact
// semantics live where checkQueueSize() is defined — confirm there.
uint32_t advisoryMaxQueueSize_;
// Process id captured for checkPid(), which runs on every put/consume —
// presumably to detect use of the queue across a fork(); confirm.
pid_t pid_;
// FIFO of (message, RequestContext saved at enqueue time) pairs.
std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
// Total number of consumers attached to this queue (updated under spinlock_).
+ int numConsumers_{0};
// How many attached consumers are currently awake/draining.  Atomic because
// setActive() may run without spinlock_ held — TODO confirm locking rules.
+ std::atomic<int> numActiveConsumers_{0};
};
template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
noexcept {
uint32_t numProcessed = 0;
+ bool firstRun = true;
+ setActive(true);
+ SCOPE_EXIT { setActive(false); };
while (true) {
// Try to decrement the eventfd.
//
- // We decrement the eventfd before checking the queue, and only pop a
- // message off the queue if we read from the eventfd.
- //
- // Reading the eventfd first allows us to not have to hold the spinlock
- // while accessing the eventfd. If we popped from the queue first, we
- // would have to hold the lock while reading from or writing to the
- // eventfd. (Multiple consumers may be woken up from a single eventfd
- // notification. If we popped from the queue first, we could end up
- // popping a message from the queue before the eventfd has been notified by
- // the producer, unless the consumer and producer both held the spinlock
- // around the entire operation.)
- if (!queue_->tryConsumeEvent()) {
- // no message available right now
- return;
+ // The eventfd is only used to wake up the consumer - there may or
+ // may not actually be an event available (another consumer may
+ // have read it). We don't really care, we only care about
+ // emptying the queue.
+ if (firstRun) {
+ queue_->tryConsumeEvent();
+ firstRun = false;
}
// Now pop the message off of the queue.
- // We successfully consumed the eventfd notification.
- // There should be a message available for us to consume.
//
// We have to manually acquire and release the spinlock here, rather than
// using SpinLockHolder since the MessageT has to be constructed while
bool locked = true;
try {
- // The eventfd is incremented once for every message, and only
- // decremented when a message is popped off. There should always be a
- // message here to read.
if (UNLIKELY(queue_->queue_.empty())) {
- // Unfortunately we have seen this happen in practice if a user forks
- // the process, and then the child tries to send a message to a
- // NotificationQueue being monitored by a thread in the parent.
- // The child can signal the parent via the eventfd, but won't have been
- // able to put anything on the parent's queue since it has a separate
- // address space.
- //
- // This is a bug in the sender's code. putMessagesImpl() should cause
- // the sender to crash now before trying to send a message from the
- // wrong process. However, just in case let's handle this case in the
- // consumer without crashing.
- LOG(ERROR) << "found empty queue after signalled event";
+ // If there is no message, we've reached the end of the queue, return.
queue_->spinlock_.unlock();
return;
}
// We use this as an optimization to see if we should bother trying to
// loop again and read another message after invoking this callback.
bool wasEmpty = queue_->queue_.empty();
+ if (wasEmpty) {
+ setActive(false);
+ }
// Now unlock the spinlock before we invoke the callback.
queue_->spinlock_.unlock();
// If we have hit maxReadAtOnce_, we are done.
++numProcessed;
if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
+ queue_->signalEvent(1);
return;
}
base_ = eventBase;
queue_ = queue;
+
+ {
+ folly::MSLGuard g(queue_->spinlock_);
+ queue_->numConsumers_++;
+ }
+ queue_->signalEvent();
+
if (queue_->eventfd_ >= 0) {
initHandler(eventBase, queue_->eventfd_);
} else {
return;
}
+ {
+ folly::MSLGuard g(queue_->spinlock_);
+ queue_->numConsumers_--;
+ setActive(false);
+ }
+
assert(isHandlerRegistered());
unregisterHandler();
detachEventBase();