2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <sys/types.h>
30 #include <folly/FileUtil.h>
31 #include <folly/io/async/EventBase.h>
32 #include <folly/io/async/EventHandler.h>
33 #include <folly/io/async/DelayedDestruction.h>
34 #include <folly/io/async/Request.h>
35 #include <folly/Likely.h>
36 #include <folly/ScopeGuard.h>
37 #include <folly/SpinLock.h>
39 #include <glog/logging.h>
41 #if __linux__ && !__ANDROID__
42 #define FOLLY_HAVE_EVENTFD
43 #include <folly/io/async/EventFDWrapper.h>
49 * A producer-consumer queue for passing messages between EventBase threads.
51 * Messages can be added to the queue from any thread. Multiple consumers may
52 * listen to the queue from multiple EventBase threads.
54 * A NotificationQueue may not be destroyed while there are still consumers
55 * registered to receive events from the queue. It is the user's
56 * responsibility to ensure that all consumers are unregistered before the
59 * MessageT should be MoveConstructible (i.e., must support either a move
60 * constructor or a copy constructor, or both). Ideally its move constructor
61 * (or copy constructor if no move constructor is provided) should never throw
62 * exceptions. If the constructor may throw, the consumers could end up
63 * spinning trying to move a message off the queue and failing, and then
66 template<typename MessageT>
67 class NotificationQueue {
70 * A callback interface for consuming messages from the queue as they arrive.
72 class Consumer : public DelayedDestruction, private EventHandler {
74 enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
78 destroyedFlagPtr_(nullptr),
79 maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
81 // create a consumer in-place, without the need to build new class
82 template <typename TCallback>
83 static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
84 TCallback&& callback);
87 * messageAvailable() will be invoked whenever a new
88 * message is available from the pipe.
90 virtual void messageAvailable(MessageT&& message) = 0;
93 * Begin consuming messages from the specified queue.
95 * messageAvailable() will be called whenever a message is available. This
96 * consumer will continue to consume messages until stopConsuming() is
99 * A Consumer may only consume messages from a single NotificationQueue at
100 * a time. startConsuming() should not be called if this consumer is
103 void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
104 init(eventBase, queue);
105 registerHandler(READ | PERSIST);
109 * Same as above but registers this event handler as internal so that it
110 * doesn't count towards the pending reader count for the IOLoop.
112 void startConsumingInternal(
113 EventBase* eventBase, NotificationQueue* queue) {
114 init(eventBase, queue);
115 registerInternalHandler(READ | PERSIST);
119 * Stop consuming messages.
121 * startConsuming() may be called again to resume consumption of messages
122 * at a later point in time.
124 void stopConsuming();
127 * Consume messages off the queue until it is empty. No messages may be
128 * added to the queue while it is draining, so that the process is bounded.
129 * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
130 * and tryPutMessageNoThrow will return false.
132 * @returns true if the queue was drained, false otherwise. In practice,
133 * this will only fail if someone else is already draining the queue.
135 bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
138 * Get the NotificationQueue that this consumer is currently consuming
139 * messages from. Returns nullptr if the consumer is not currently
140 * consuming events from any queue.
142 NotificationQueue* getCurrentQueue() const {
147 * Set a limit on how many messages this consumer will read each iteration
148 * around the event loop.
150 * This helps rate-limit how much work the Consumer will do each event loop
151 * iteration, to prevent it from starving other event handlers.
153 * A limit of 0 means no limit will be enforced. If unset, the limit
154 * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
156 void setMaxReadAtOnce(uint32_t maxAtOnce) {
157 maxReadAtOnce_ = maxAtOnce;
159 uint32_t getMaxReadAtOnce() const {
160 return maxReadAtOnce_;
163 EventBase* getEventBase() {
167 void handlerReady(uint16_t events) noexcept override;
171 void destroy() override;
173 virtual ~Consumer() {}
177 * Consume messages off the queue until
178 * - the queue is empty (1), or
179 * - until the consumer is destroyed, or
180 * - until the consumer is uninstalled, or
181 * - an exception is thrown in the course of dequeueing, or
182 * - unless isDrain is true, until the maxReadAtOnce_ limit is hit
184 * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
186 void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
// Record whether this consumer is actively processing messages, keeping
// queue_->numActiveConsumers_ in sync so producers can tell if anyone
// needs to be signalled.
// NOTE(review): the lock()/unlock() calls below appear to be conditional
// on shouldLock in the full source — confirm before relying on this.
188 void setActive(bool active, bool shouldLock = false) {
194 queue_->spinlock_.lock();
196 if (!active_ && active) {
197 ++queue_->numActiveConsumers_;
198 } else if (active_ && !active) {
199 --queue_->numActiveConsumers_;
203 queue_->spinlock_.unlock();
206 void init(EventBase* eventBase, NotificationQueue* queue);
// Queue currently being consumed from; nullptr when not consuming.
208 NotificationQueue* queue_;
// Points at a stack flag inside consumeMessages() while a callback is
// running, so destroy() can tell the loop the consumer was destroyed.
209 bool* destroyedFlagPtr_;
210 uint32_t maxReadAtOnce_;
217 #ifdef FOLLY_HAVE_EVENTFD
223 * Create a new NotificationQueue.
225 * If the maxSize parameter is specified, this sets the maximum queue size
226 * that will be enforced by tryPutMessage(). (This size is advisory, and may
227 * be exceeded if producers explicitly use putMessage() instead of
230 * The fdType parameter determines the type of file descriptor used
231 * internally to signal message availability. The default (eventfd) is
232 * preferable for performance and because it won't fail when the queue gets
233 * too long. It is not available on older and non-linux kernels, however.
234 * In this case the code will fall back to using a pipe, the parameter is
235 * mostly for testing purposes.
237 explicit NotificationQueue(uint32_t maxSize = 0,
238 #ifdef FOLLY_HAVE_EVENTFD
239 FdType fdType = FdType::EVENTFD)
241 FdType fdType = FdType::PIPE)
245 advisoryMaxQueueSize_(maxSize),
246 pid_(pid_t(getpid())),
249 RequestContext::saveContext();
251 #ifdef FOLLY_HAVE_EVENTFD
252 if (fdType == FdType::EVENTFD) {
253 eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
254 if (eventfd_ == -1) {
255 if (errno == ENOSYS || errno == EINVAL) {
256 // eventfd not available: degrade gracefully to the pipe mechanism
257 LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
258 << errno << ", falling back to pipe mode (is your kernel "
260 fdType = FdType::PIPE;
263 folly::throwSystemError("Failed to create eventfd for "
264 "NotificationQueue", errno);
269 if (fdType == FdType::PIPE) {
270 if (pipe(pipeFds_)) {
271 folly::throwSystemError("Failed to create pipe for NotificationQueue",
275 // put both ends of the pipe into non-blocking mode
276 if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
277 folly::throwSystemError("failed to put NotificationQueue pipe read "
278 "endpoint into non-blocking mode", errno);
280 if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
281 folly::throwSystemError("failed to put NotificationQueue pipe write "
282 "endpoint into non-blocking mode", errno);
// Cleanup path: close both pipe ends if setup failed partway through.
285 ::close(pipeFds_[0]);
286 ::close(pipeFds_[1]);
292 ~NotificationQueue() {
// A negative fd means the pipe end was never opened (eventfd mode).
297 if (pipeFds_[0] >= 0) {
298 ::close(pipeFds_[0]);
301 if (pipeFds_[1] >= 0) {
302 ::close(pipeFds_[1]);
308 * Set the advisory maximum queue size.
310 * This maximum queue size affects calls to tryPutMessage(). Message
311 * producers can still use the putMessage() call to unconditionally put a
312 * message on the queue, ignoring the configured maximum queue size. This
313 * can cause the queue size to exceed the configured maximum.
315 void setMaxQueueSize(uint32_t max) {
316 advisoryMaxQueueSize_ = max;
320 * Attempt to put a message on the queue if the queue is not already full.
322 * If the queue is full, a std::overflow_error will be thrown. The
323 * setMaxQueueSize() function controls the maximum queue size.
325 * If the queue is currently draining, an std::runtime_error will be thrown.
327 * This method may contend briefly on a spinlock if many threads are
328 * concurrently accessing the queue, but for all intents and purposes it will
329 * immediately place the message on the queue and return.
331 * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
332 * may throw any other exception thrown by the MessageT move/copy
335 void tryPutMessage(MessageT&& message) {
336 putMessageImpl(std::move(message), advisoryMaxQueueSize_);
338 void tryPutMessage(const MessageT& message) {
339 putMessageImpl(message, advisoryMaxQueueSize_);
343 * No-throw versions of the above. Instead returns true on success, false on
346 * Only std::overflow_error (the common exception case) and std::runtime_error
347 * (which indicates that the queue is being drained) are prevented from being
348 * thrown. User code must still catch std::bad_alloc errors.
350 bool tryPutMessageNoThrow(MessageT&& message) {
351 return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
353 bool tryPutMessageNoThrow(const MessageT& message) {
354 return putMessageImpl(message, advisoryMaxQueueSize_, false);
358 * Unconditionally put a message on the queue.
360 * This method is like tryPutMessage(), but ignores the maximum queue size
361 * and always puts the message on the queue, even if the maximum queue size
364 * putMessage() may throw
365 * - std::bad_alloc if memory allocation fails, and may
366 * - std::runtime_error if the queue is currently draining
367 * - any other exception thrown by the MessageT move/copy constructor.
369 void putMessage(MessageT&& message) {
// maxSize of 0 disables the advisory-size check in putMessageImpl().
370 putMessageImpl(std::move(message), 0);
372 void putMessage(const MessageT& message) {
373 putMessageImpl(message, 0);
377 * Put several messages on the queue.
379 template<typename InputIteratorT>
380 void putMessages(InputIteratorT first, InputIteratorT last) {
381 typedef typename std::iterator_traits<InputIteratorT>::iterator_category
383 putMessagesImpl(first, last, IterCategory());
387 * Try to immediately pull a message off of the queue, without blocking.
389 * If a message is immediately available, the result parameter will be
390 * updated to contain the message contents and true will be returned.
392 * If no message is available, false will be returned and result will be left
395 bool tryConsume(MessageT& result) {
400 folly::SpinLockGuard g(spinlock_);
402 if (UNLIKELY(queue_.empty())) {
406 auto data = std::move(queue_.front());
408 RequestContext::setContext(data.second);
412 // Handle an exception if the assignment operator happens to throw.
413 // We consumed an event but weren't able to pop the message off the
414 // queue. Signal the event again since the message is still in the
424 folly::SpinLockGuard g(spinlock_);
425 return queue_.size();
429 * Check that the NotificationQueue is being used from the correct process.
431 * If you create a NotificationQueue in one process, then fork, and try to
432 * send messages to the queue from the child process, you're going to have a
433 * bad time. Unfortunately users have (accidentally) run into this.
435 * Because we use an eventfd/pipe, the child process can actually signal the
436 * parent process that an event is ready. However, it can't put anything on
437 * the parent's queue, so the parent wakes up and finds an empty queue. This
438 * check ensures that we catch the problem in the misbehaving child process
439 * code, and crash before signalling the parent process.
441 void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); }
444 // Forbidden copy constructor and assignment operator
445 NotificationQueue(NotificationQueue const &) = delete;
446 NotificationQueue& operator=(NotificationQueue const &) = delete;
// Pre: caller must already hold spinlock_; the DCHECK verifies this by
// requiring trylock() to fail. Returns false (or throws std::overflow_error
// when 'throws' is set) if adding one more message would exceed maxSize.
448 inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
449 DCHECK(0 == spinlock_.trylock());
450 if (maxSize > 0 && queue_.size() >= maxSize) {
452 throw std::overflow_error("unable to add message to NotificationQueue: "
// Rejects new messages while consumeUntilDrained() is in progress.
460 inline bool checkDraining(bool throws=true) {
461 if (UNLIKELY(draining_ && throws)) {
462 throw std::runtime_error("queue is draining, cannot add message");
// Wake up consumers: eventfd mode writes a 64-bit count in one shot;
// pipe mode writes one byte per queued message (in chunks of up to 16).
467 inline void signalEvent(size_t numAdded = 1) const {
468 static const uint8_t kPipeMessage[] = {
469 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
472 ssize_t bytes_written = 0;
473 ssize_t bytes_expected = 0;
475 // eventfd(2) dictates that we must write a 64-bit integer
476 uint64_t numAdded64(numAdded);
477 bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
478 bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
480 // pipe semantics, add one message for each numAdded
481 bytes_expected = numAdded;
483 size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
484 ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
486 // TODO: if the pipe is full, write will fail with EAGAIN.
487 // See task #1044651 for how this could be handled
492 } while (numAdded > 0);
494 if (bytes_written != bytes_expected) {
495 folly::throwSystemError("failed to signal NotificationQueue after "
// Attempt to read one wakeup token off the eventfd or pipe.
// A failed read with EAGAIN means another consumer already took it.
500 bool tryConsumeEvent() {
504 rc = readNoInt(eventfd_, &value, sizeof(value));
507 rc = readNoInt(pipeFds_[0], &value8, sizeof(value8));
511 // EAGAIN should pretty much be the only error we can ever get.
512 // This means someone else already processed the only available message.
513 CHECK_EQ(errno, EAGAIN);
// Shared implementation for putMessage()/tryPutMessage() (rvalue overload).
// Returns false instead of throwing when 'throws' is false.
520 bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
524 folly::SpinLockGuard g(spinlock_);
525 if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
528 // We only need to signal an event if not all consumers are
530 if (numActiveConsumers_ < numConsumers_) {
533 queue_.emplace_back(std::move(message), RequestContext::saveContext());
542 const MessageT& message, size_t maxSize, bool throws=true) {
546 folly::SpinLockGuard g(spinlock_);
547 if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
550 if (numActiveConsumers_ < numConsumers_) {
553 queue_.emplace_back(message, RequestContext::saveContext());
// Bulk insert for generic input iterators; all elements are enqueued
// under a single spinlock acquisition.
561 template<typename InputIteratorT>
562 void putMessagesImpl(InputIteratorT first, InputIteratorT last,
563 std::input_iterator_tag) {
568 folly::SpinLockGuard g(spinlock_);
570 while (first != last) {
571 queue_.emplace_back(*first, RequestContext::saveContext());
575 if (numActiveConsumers_ < numConsumers_) {
// spinlock_ protects the deque and the consumer bookkeeping below.
584 mutable folly::SpinLock spinlock_;
586 int pipeFds_[2]; // to fallback to on older/non-linux systems
587 uint32_t advisoryMaxQueueSize_;
// Each entry carries the message plus the RequestContext captured at
// enqueue time, restored around the consumer callback.
589 std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
590 int numConsumers_{0};
591 std::atomic<int> numActiveConsumers_{0};
592 bool draining_{false};
595 template<typename MessageT>
// Destroy this consumer (DelayedDestruction semantics): if destruction
// happens re-entrantly from inside messageAvailable(), flag the in-progress
// consume loop so it stops touching member state.
596 void NotificationQueue<MessageT>::Consumer::destroy() {
597 // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
598 // will be non-nullptr. Mark the value that it points to, so that
599 // handlerReady() will know the callback is destroyed, and that it cannot
600 // access any member variables anymore.
601 if (destroyedFlagPtr_) {
602 *destroyedFlagPtr_ = true;
605 DelayedDestruction::destroy();
608 template<typename MessageT>
// EventHandler callback: the eventfd/pipe became readable, so run a
// normal (non-drain) consume pass, bounded by maxReadAtOnce_.
609 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
611 consumeMessages(false);
614 template<typename MessageT>
// Core consume loop. Pops messages under the queue spinlock, releases the
// lock, then invokes messageAvailable() for each. Stops when the queue is
// empty, the consumer is destroyed/uninstalled, maxReadAtOnce_ is hit
// (unless isDrain), or dequeueing throws. numConsumed, if non-null, is
// kept up to date with the number of messages processed so far.
615 void NotificationQueue<MessageT>::Consumer::consumeMessages(
616 bool isDrain, size_t* numConsumed) noexcept {
// Keep 'this' alive across re-entrant destroy() calls from callbacks.
617 DestructorGuard dg(this);
618 uint32_t numProcessed = 0;
619 bool firstRun = true;
621 SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
623 if (numConsumed != nullptr) {
624 *numConsumed = numProcessed;
628 // Try to decrement the eventfd.
630 // The eventfd is only used to wake up the consumer - there may or
631 // may not actually be an event available (another consumer may
632 // have read it). We don't really care, we only care about
633 // emptying the queue.
634 if (!isDrain && firstRun) {
635 queue_->tryConsumeEvent();
639 // Now pop the message off of the queue.
641 // We have to manually acquire and release the spinlock here, rather than
642 // using SpinLockHolder since the MessageT has to be constructed while
643 // holding the spinlock and available after we release it. SpinLockHolder
644 // unfortunately doesn't provide a release() method. (We can't construct
645 // MessageT first since we have no guarantee that MessageT has a default
647 queue_->spinlock_.lock();
651 if (UNLIKELY(queue_->queue_.empty())) {
652 // If there is no message, we've reached the end of the queue, return.
654 queue_->spinlock_.unlock();
658 // Pull a message off the queue.
659 auto& data = queue_->queue_.front();
661 MessageT msg(std::move(data.first));
// Restore the RequestContext that was active when the message was queued.
663 RequestContext::setContext(data.second);
664 queue_->queue_.pop_front();
666 // Check to see if the queue is empty now.
667 // We use this as an optimization to see if we should bother trying to
668 // loop again and read another message after invoking this callback.
669 bool wasEmpty = queue_->queue_.empty();
674 // Now unlock the spinlock before we invoke the callback.
675 queue_->spinlock_.unlock();
// Publish a stack flag so destroy() can tell us the callback destroyed
// this consumer mid-call (see destroyedFlagPtr_).
679 bool callbackDestroyed = false;
680 CHECK(destroyedFlagPtr_ == nullptr);
681 destroyedFlagPtr_ = &callbackDestroyed;
682 messageAvailable(std::move(msg));
683 destroyedFlagPtr_ = nullptr;
685 RequestContext::setContext(old_ctx);
687 // If the callback was destroyed before it returned, we are done
688 if (callbackDestroyed) {
692 // If the callback is no longer installed, we are done.
693 if (queue_ == nullptr) {
697 // If we have hit maxReadAtOnce_, we are done.
699 if (!isDrain && maxReadAtOnce_ > 0 &&
700 numProcessed >= maxReadAtOnce_) {
// Re-signal so remaining messages wake us (or another consumer) later.
701 queue_->signalEvent(1);
705 // If the queue was empty before we invoked the callback, it's probable
706 // that it is still empty now. Just go ahead and return, rather than
707 // looping again and trying to re-read from the eventfd. (If a new
708 // message had in fact arrived while we were invoking the callback, we
709 // will simply be woken up the next time around the event loop and will
710 // process the message then.)
714 } catch (const std::exception& ex) {
715 // This catch block is really just to handle the case where the MessageT
716 // constructor throws. The messageAvailable() callback itself is
717 // declared as noexcept and should never throw.
719 // If the MessageT constructor does throw we try to handle it as best as
720 // we can, but we can't work miracles. We will just ignore the error for
721 // now and return. The next time around the event loop we will end up
722 // trying to read the message again. If MessageT continues to throw we
723 // will never make forward progress and will keep trying each time around
726 // Unlock the spinlock.
727 queue_->spinlock_.unlock();
729 // Push a notification back on the eventfd since we didn't actually
730 // read the message off of the queue.
732 queue_->signalEvent(1);
741 template<typename MessageT>
// Attach this consumer to a queue: register bookkeeping under the queue's
// spinlock and bind the EventHandler to the queue's eventfd (preferred)
// or the pipe read end. Must run on the EventBase thread, and the consumer
// must not already be consuming from some queue.
742 void NotificationQueue<MessageT>::Consumer::init(
743 EventBase* eventBase,
744 NotificationQueue* queue) {
745 assert(eventBase->isInEventBaseThread());
746 assert(queue_ == nullptr);
747 assert(!isHandlerRegistered());
755 folly::SpinLockGuard g(queue_->spinlock_);
756 queue_->numConsumers_++;
// NOTE(review): presumably signals so this consumer wakes up if messages
// are already pending at registration time — confirm against full source.
758 queue_->signalEvent();
760 if (queue_->eventfd_ >= 0) {
761 initHandler(eventBase, queue_->eventfd_);
763 initHandler(eventBase, queue_->pipeFds_[0]);
767 template<typename MessageT>
// Detach this consumer from its queue. A no-op if not currently consuming;
// otherwise decrements the queue's consumer count under the spinlock and
// unregisters the event handler.
768 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
769 if (queue_ == nullptr) {
770 assert(!isHandlerRegistered());
775 folly::SpinLockGuard g(queue_->spinlock_);
776 queue_->numConsumers_--;
780 assert(isHandlerRegistered());
786 template<typename MessageT>
// Drain the queue completely. Sets draining_ (which makes producers fail)
// around an unbounded consumeMessages(true) pass, then clears it. Returns
// false if another drain is already in progress.
787 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
788 size_t* numConsumed) noexcept {
789 DestructorGuard dg(this);
// First lock scope: claim the drain, bailing if someone else holds it.
791 folly::SpinLockGuard g(queue_->spinlock_);
792 if (queue_->draining_) {
795 queue_->draining_ = true;
// Drain without the lock held; consumeMessages locks per-message.
797 consumeMessages(true, numConsumed);
// Second lock scope: release the drain flag so producers may resume.
799 folly::SpinLockGuard g(queue_->spinlock_);
800 queue_->draining_ = false;
806 * Creates a NotificationQueue::Consumer wrapping a function object
807 * Modeled after AsyncTimeout::make
813 template <typename MessageT, typename TCallback>
// Adapter that turns an arbitrary callable into a NotificationQueue
// Consumer, used by Consumer::make(). The static_assert requires the
// callable to be noexcept because messageAvailable() must not throw.
814 struct notification_queue_consumer_wrapper
815 : public NotificationQueue<MessageT>::Consumer {
817 template <typename UCallback>
818 explicit notification_queue_consumer_wrapper(UCallback&& callback)
819 : callback_(std::forward<UCallback>(callback)) {}
821 // we are being stricter here and requiring noexcept for callback
822 void messageAvailable(MessageT&& message) override {
824 noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
825 "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
828 callback_(std::forward<MessageT>(message));
835 } // namespace detail
837 template <typename MessageT>
838 template <typename TCallback>
// Factory: wrap 'callback' in a detail::notification_queue_consumer_wrapper
// and return it as a Consumer owned via DelayedDestruction::Destructor
// (which calls destroy() rather than delete).
839 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
840 DelayedDestruction::Destructor>
841 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
842 return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
843 DelayedDestruction::Destructor>(
844 new detail::notification_queue_consumer_wrapper<
846 typename std::decay<TCallback>::type>(
847 std::forward<TCallback>(callback)));