2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <sys/types.h>
28 #include <folly/Exception.h>
29 #include <folly/FileUtil.h>
30 #include <folly/io/async/EventBase.h>
31 #include <folly/io/async/EventHandler.h>
32 #include <folly/io/async/DelayedDestruction.h>
33 #include <folly/io/async/Request.h>
34 #include <folly/Likely.h>
35 #include <folly/ScopeGuard.h>
36 #include <folly/SpinLock.h>
37 #include <folly/portability/Fcntl.h>
38 #include <folly/portability/Sockets.h>
39 #include <folly/portability/Unistd.h>
41 #include <glog/logging.h>
43 #if __linux__ && !__ANDROID__
44 #define FOLLY_HAVE_EVENTFD
45 #include <folly/io/async/EventFDWrapper.h>
51 * A producer-consumer queue for passing messages between EventBase threads.
53 * Messages can be added to the queue from any thread. Multiple consumers may
54 * listen to the queue from multiple EventBase threads.
56 * A NotificationQueue may not be destroyed while there are still consumers
57 * registered to receive events from the queue. It is the user's
58 * responsibility to ensure that all consumers are unregistered before the
61 * MessageT should be MoveConstructible (i.e., must support either a move
62 * constructor or a copy constructor, or both). Ideally it's move constructor
63 * (or copy constructor if no move constructor is provided) should never throw
64 * exceptions. If the constructor may throw, the consumers could end up
65 * spinning trying to move a message off the queue and failing, and then
68 template<typename MessageT>
69 class NotificationQueue {
72 * A callback interface for consuming messages from the queue as they arrive.
74 class Consumer : public DelayedDestruction, private EventHandler {
76 enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
80 destroyedFlagPtr_(nullptr),
81 maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
83 // create a consumer in-place, without the need to build new class
84 template <typename TCallback>
85 static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
86 TCallback&& callback);
89 * messageAvailable() will be invoked whenever a new
90 * message is available from the pipe.
92 virtual void messageAvailable(MessageT&& message) noexcept = 0;
95 * Begin consuming messages from the specified queue.
97 * messageAvailable() will be called whenever a message is available. This
98 * consumer will continue to consume messages until stopConsuming() is
101 * A Consumer may only consume messages from a single NotificationQueue at
102 * a time. startConsuming() should not be called if this consumer is
105 void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
106 init(eventBase, queue);
107 registerHandler(READ | PERSIST);
111 * Same as above but registers this event handler as internal so that it
112 * doesn't count towards the pending reader count for the IOLoop.
114 void startConsumingInternal(
115 EventBase* eventBase, NotificationQueue* queue) {
116 init(eventBase, queue);
117 registerInternalHandler(READ | PERSIST);
121 * Stop consuming messages.
123 * startConsuming() may be called again to resume consumption of messages
124 * at a later point in time.
126 void stopConsuming();
129 * Consume messages off the queue until it is empty. No messages may be
130 * added to the queue while it is draining, so that the process is bounded.
131 * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
132 * and tryPutMessageNoThrow will return false.
134 * @returns true if the queue was drained, false otherwise. In practice,
135 * this will only fail if someone else is already draining the queue.
137 bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
140 * Get the NotificationQueue that this consumer is currently consuming
141 * messages from. Returns nullptr if the consumer is not currently
142 * consuming events from any queue.
144 NotificationQueue* getCurrentQueue() const {
149 * Set a limit on how many messages this consumer will read each iteration
150 * around the event loop.
152 * This helps rate-limit how much work the Consumer will do each event loop
153 * iteration, to prevent it from starving other event handlers.
155 * A limit of 0 means no limit will be enforced. If unset, the limit
156 * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
158 void setMaxReadAtOnce(uint32_t maxAtOnce) {
159 maxReadAtOnce_ = maxAtOnce;
161 uint32_t getMaxReadAtOnce() const {
162 return maxReadAtOnce_;
165 EventBase* getEventBase() {
169 void handlerReady(uint16_t events) noexcept override;
173 void destroy() override;
175 ~Consumer() override {}
179 * Consume messages off the the queue until
180 * - the queue is empty (1), or
181 * - until the consumer is destroyed, or
182 * - until the consumer is uninstalled, or
183 * - an exception is thrown in the course of dequeueing, or
184 * - unless isDrain is true, until the maxReadAtOnce_ limit is hit
186 * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
188 void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
190 void setActive(bool active, bool shouldLock = false) {
196 queue_->spinlock_.lock();
198 if (!active_ && active) {
199 ++queue_->numActiveConsumers_;
200 } else if (active_ && !active) {
201 --queue_->numActiveConsumers_;
205 queue_->spinlock_.unlock();
208 void init(EventBase* eventBase, NotificationQueue* queue);
210 NotificationQueue* queue_;
211 bool* destroyedFlagPtr_;
212 uint32_t maxReadAtOnce_;
217 class SimpleConsumer {
219 explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
220 ++queue_.numConsumers_;
224 --queue_.numConsumers_;
228 return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
232 NotificationQueue& queue_;
237 #ifdef FOLLY_HAVE_EVENTFD
243 * Create a new NotificationQueue.
245 * If the maxSize parameter is specified, this sets the maximum queue size
246 * that will be enforced by tryPutMessage(). (This size is advisory, and may
247 * be exceeded if producers explicitly use putMessage() instead of
250 * The fdType parameter determines the type of file descriptor used
251 * internally to signal message availability. The default (eventfd) is
252 * preferable for performance and because it won't fail when the queue gets
253 * too long. It is not available on on older and non-linux kernels, however.
254 * In this case the code will fall back to using a pipe, the parameter is
255 * mostly for testing purposes.
257 explicit NotificationQueue(uint32_t maxSize = 0,
258 #ifdef FOLLY_HAVE_EVENTFD
259 FdType fdType = FdType::EVENTFD)
261 FdType fdType = FdType::PIPE)
265 advisoryMaxQueueSize_(maxSize),
266 pid_(pid_t(getpid())),
269 RequestContext::saveContext();
271 #ifdef FOLLY_HAVE_EVENTFD
272 if (fdType == FdType::EVENTFD) {
273 eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
274 if (eventfd_ == -1) {
275 if (errno == ENOSYS || errno == EINVAL) {
276 // eventfd not availalble
277 LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
278 << errno << ", falling back to pipe mode (is your kernel "
280 fdType = FdType::PIPE;
283 folly::throwSystemError("Failed to create eventfd for "
284 "NotificationQueue", errno);
289 if (fdType == FdType::PIPE) {
290 if (pipe(pipeFds_)) {
291 folly::throwSystemError("Failed to create pipe for NotificationQueue",
295 // put both ends of the pipe into non-blocking mode
296 if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
297 folly::throwSystemError("failed to put NotificationQueue pipe read "
298 "endpoint into non-blocking mode", errno);
300 if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
301 folly::throwSystemError("failed to put NotificationQueue pipe write "
302 "endpoint into non-blocking mode", errno);
305 ::close(pipeFds_[0]);
306 ::close(pipeFds_[1]);
312 ~NotificationQueue() {
317 if (pipeFds_[0] >= 0) {
318 ::close(pipeFds_[0]);
321 if (pipeFds_[1] >= 0) {
322 ::close(pipeFds_[1]);
328 * Set the advisory maximum queue size.
330 * This maximum queue size affects calls to tryPutMessage(). Message
331 * producers can still use the putMessage() call to unconditionally put a
332 * message on the queue, ignoring the configured maximum queue size. This
333 * can cause the queue size to exceed the configured maximum.
335 void setMaxQueueSize(uint32_t max) {
336 advisoryMaxQueueSize_ = max;
340 * Attempt to put a message on the queue if the queue is not already full.
342 * If the queue is full, a std::overflow_error will be thrown. The
343 * setMaxQueueSize() function controls the maximum queue size.
345 * If the queue is currently draining, an std::runtime_error will be thrown.
347 * This method may contend briefly on a spinlock if many threads are
348 * concurrently accessing the queue, but for all intents and purposes it will
349 * immediately place the message on the queue and return.
351 * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
352 * may throw any other exception thrown by the MessageT move/copy
355 void tryPutMessage(MessageT&& message) {
356 putMessageImpl(std::move(message), advisoryMaxQueueSize_);
358 void tryPutMessage(const MessageT& message) {
359 putMessageImpl(message, advisoryMaxQueueSize_);
363 * No-throw versions of the above. Instead returns true on success, false on
366 * Only std::overflow_error (the common exception case) and std::runtime_error
367 * (which indicates that the queue is being drained) are prevented from being
368 * thrown. User code must still catch std::bad_alloc errors.
370 bool tryPutMessageNoThrow(MessageT&& message) {
371 return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
373 bool tryPutMessageNoThrow(const MessageT& message) {
374 return putMessageImpl(message, advisoryMaxQueueSize_, false);
378 * Unconditionally put a message on the queue.
380 * This method is like tryPutMessage(), but ignores the maximum queue size
381 * and always puts the message on the queue, even if the maximum queue size
384 * putMessage() may throw
385 * - std::bad_alloc if memory allocation fails, and may
386 * - std::runtime_error if the queue is currently draining
387 * - any other exception thrown by the MessageT move/copy constructor.
389 void putMessage(MessageT&& message) {
390 putMessageImpl(std::move(message), 0);
392 void putMessage(const MessageT& message) {
393 putMessageImpl(message, 0);
397 * Put several messages on the queue.
399 template<typename InputIteratorT>
400 void putMessages(InputIteratorT first, InputIteratorT last) {
401 typedef typename std::iterator_traits<InputIteratorT>::iterator_category
403 putMessagesImpl(first, last, IterCategory());
407 * Try to immediately pull a message off of the queue, without blocking.
409 * If a message is immediately available, the result parameter will be
410 * updated to contain the message contents and true will be returned.
412 * If no message is available, false will be returned and result will be left
415 bool tryConsume(MessageT& result) {
416 SCOPE_EXIT { syncSignalAndQueue(); };
420 folly::SpinLockGuard g(spinlock_);
422 if (UNLIKELY(queue_.empty())) {
426 auto& data = queue_.front();
427 result = std::move(data.first);
428 RequestContext::setContext(std::move(data.second));
435 size_t size() const {
436 folly::SpinLockGuard g(spinlock_);
437 return queue_.size();
441 * Check that the NotificationQueue is being used from the correct process.
443 * If you create a NotificationQueue in one process, then fork, and try to
444 * send messages to the queue from the child process, you're going to have a
445 * bad time. Unfortunately users have (accidentally) run into this.
447 * Because we use an eventfd/pipe, the child process can actually signal the
448 * parent process that an event is ready. However, it can't put anything on
449 * the parent's queue, so the parent wakes up and finds an empty queue. This
450 * check ensures that we catch the problem in the misbehaving child process
451 * code, and crash before signalling the parent process.
453 void checkPid() const { CHECK_EQ(pid_, pid_t(getpid())); }
456 // Forbidden copy constructor and assignment operator
457 NotificationQueue(NotificationQueue const &) = delete;
458 NotificationQueue& operator=(NotificationQueue const &) = delete;
460 inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
461 DCHECK(0 == spinlock_.try_lock());
462 if (maxSize > 0 && queue_.size() >= maxSize) {
464 throw std::overflow_error("unable to add message to NotificationQueue: "
472 inline bool checkDraining(bool throws=true) {
473 if (UNLIKELY(draining_ && throws)) {
474 throw std::runtime_error("queue is draining, cannot add message");
480 // TODO 10860938 Remove after figuring out crash
481 mutable std::atomic<int> eventBytes_{0};
482 mutable std::atomic<int> maxEventBytes_{0};
485 void ensureSignalLocked() const {
486 // semantics: empty fd == empty queue <=> !signal_
491 ssize_t bytes_written = 0;
492 size_t bytes_expected = 0;
496 // eventfd(2) dictates that we must write a 64-bit integer
498 bytes_expected = sizeof(signal);
499 bytes_written = ::write(eventfd_, &signal, bytes_expected);
502 bytes_expected = sizeof(signal);
503 bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
505 } while (bytes_written == -1 && errno == EINTR);
508 if (bytes_written > 0) {
509 eventBytes_ += bytes_written;
510 maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
514 if (bytes_written == ssize_t(bytes_expected)) {
518 LOG(ERROR) << "NotificationQueue Write Error=" << errno
519 << " bytesInPipe=" << eventBytes_
520 << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
522 folly::throwSystemError("failed to signal NotificationQueue after "
527 void drainSignalsLocked() {
528 ssize_t bytes_read = 0;
531 bytes_read = readNoInt(eventfd_, &message, sizeof(message));
532 CHECK(bytes_read != -1 || errno == EAGAIN);
534 // There should only be one byte in the pipe. To avoid potential leaks we still drain.
537 while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
538 bytes_read += result;
540 CHECK(result == -1 && errno == EAGAIN);
541 LOG_IF(ERROR, bytes_read > 1)
542 << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
543 << bytes_read << " bytes, expected <= 1";
545 LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
546 << "[NotificationQueue] Unexpected state while draining signals: signal_="
547 << signal_ << " bytes_read=" << bytes_read;
552 if (bytes_read > 0) {
553 eventBytes_ -= bytes_read;
558 void ensureSignal() const {
559 folly::SpinLockGuard g(spinlock_);
560 ensureSignalLocked();
563 void syncSignalAndQueue() {
564 folly::SpinLockGuard g(spinlock_);
566 if (queue_.empty()) {
567 drainSignalsLocked();
569 ensureSignalLocked();
573 bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
577 folly::SpinLockGuard g(spinlock_);
578 if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
581 // We only need to signal an event if not all consumers are
583 if (numActiveConsumers_ < numConsumers_) {
586 queue_.emplace_back(std::move(message), RequestContext::saveContext());
588 ensureSignalLocked();
595 const MessageT& message, size_t maxSize, bool throws=true) {
599 folly::SpinLockGuard g(spinlock_);
600 if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
603 if (numActiveConsumers_ < numConsumers_) {
606 queue_.emplace_back(message, RequestContext::saveContext());
608 ensureSignalLocked();
614 template<typename InputIteratorT>
615 void putMessagesImpl(InputIteratorT first, InputIteratorT last,
616 std::input_iterator_tag) {
621 folly::SpinLockGuard g(spinlock_);
623 while (first != last) {
624 queue_.emplace_back(*first, RequestContext::saveContext());
628 if (numActiveConsumers_ < numConsumers_) {
632 ensureSignalLocked();
637 mutable folly::SpinLock spinlock_;
638 mutable bool signal_{false};
640 int pipeFds_[2]; // to fallback to on older/non-linux systems
641 uint32_t advisoryMaxQueueSize_;
643 std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
644 int numConsumers_{0};
645 std::atomic<int> numActiveConsumers_{0};
646 bool draining_{false};
649 template<typename MessageT>
650 void NotificationQueue<MessageT>::Consumer::destroy() {
651 // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
652 // will be non-nullptr. Mark the value that it points to, so that
653 // handlerReady() will know the callback is destroyed, and that it cannot
654 // access any member variables anymore.
655 if (destroyedFlagPtr_) {
656 *destroyedFlagPtr_ = true;
659 DelayedDestruction::destroy();
662 template<typename MessageT>
663 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
665 consumeMessages(false);
668 template<typename MessageT>
669 void NotificationQueue<MessageT>::Consumer::consumeMessages(
670 bool isDrain, size_t* numConsumed) noexcept {
671 DestructorGuard dg(this);
672 uint32_t numProcessed = 0;
676 queue_->syncSignalAndQueue();
679 SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
681 if (numConsumed != nullptr) {
682 *numConsumed = numProcessed;
686 // Now pop the message off of the queue.
688 // We have to manually acquire and release the spinlock here, rather than
689 // using SpinLockHolder since the MessageT has to be constructed while
690 // holding the spinlock and available after we release it. SpinLockHolder
691 // unfortunately doesn't provide a release() method. (We can't construct
692 // MessageT first since we have no guarantee that MessageT has a default
694 queue_->spinlock_.lock();
698 if (UNLIKELY(queue_->queue_.empty())) {
699 // If there is no message, we've reached the end of the queue, return.
701 queue_->spinlock_.unlock();
705 // Pull a message off the queue.
706 auto& data = queue_->queue_.front();
708 MessageT msg(std::move(data.first));
709 RequestContextScopeGuard rctx(std::move(data.second));
710 queue_->queue_.pop_front();
712 // Check to see if the queue is empty now.
713 // We use this as an optimization to see if we should bother trying to
714 // loop again and read another message after invoking this callback.
715 bool wasEmpty = queue_->queue_.empty();
720 // Now unlock the spinlock before we invoke the callback.
721 queue_->spinlock_.unlock();
725 bool callbackDestroyed = false;
726 CHECK(destroyedFlagPtr_ == nullptr);
727 destroyedFlagPtr_ = &callbackDestroyed;
728 messageAvailable(std::move(msg));
729 destroyedFlagPtr_ = nullptr;
731 // If the callback was destroyed before it returned, we are done
732 if (callbackDestroyed) {
736 // If the callback is no longer installed, we are done.
737 if (queue_ == nullptr) {
741 // If we have hit maxReadAtOnce_, we are done.
743 if (!isDrain && maxReadAtOnce_ > 0 &&
744 numProcessed >= maxReadAtOnce_) {
748 // If the queue was empty before we invoked the callback, it's probable
749 // that it is still empty now. Just go ahead and return, rather than
750 // looping again and trying to re-read from the eventfd. (If a new
751 // message had in fact arrived while we were invoking the callback, we
752 // will simply be woken up the next time around the event loop and will
753 // process the message then.)
757 } catch (const std::exception&) {
758 // This catch block is really just to handle the case where the MessageT
759 // constructor throws. The messageAvailable() callback itself is
760 // declared as noexcept and should never throw.
762 // If the MessageT constructor does throw we try to handle it as best as
763 // we can, but we can't work miracles. We will just ignore the error for
764 // now and return. The next time around the event loop we will end up
765 // trying to read the message again. If MessageT continues to throw we
766 // will never make forward progress and will keep trying each time around
769 // Unlock the spinlock.
770 queue_->spinlock_.unlock();
778 template<typename MessageT>
779 void NotificationQueue<MessageT>::Consumer::init(
780 EventBase* eventBase,
781 NotificationQueue* queue) {
782 assert(eventBase->isInEventBaseThread());
783 assert(queue_ == nullptr);
784 assert(!isHandlerRegistered());
792 folly::SpinLockGuard g(queue_->spinlock_);
793 queue_->numConsumers_++;
795 queue_->ensureSignal();
797 if (queue_->eventfd_ >= 0) {
798 initHandler(eventBase, queue_->eventfd_);
800 initHandler(eventBase, queue_->pipeFds_[0]);
804 template<typename MessageT>
805 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
806 if (queue_ == nullptr) {
807 assert(!isHandlerRegistered());
812 folly::SpinLockGuard g(queue_->spinlock_);
813 queue_->numConsumers_--;
817 assert(isHandlerRegistered());
823 template<typename MessageT>
824 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
825 size_t* numConsumed) noexcept {
826 DestructorGuard dg(this);
828 folly::SpinLockGuard g(queue_->spinlock_);
829 if (queue_->draining_) {
832 queue_->draining_ = true;
834 consumeMessages(true, numConsumed);
836 folly::SpinLockGuard g(queue_->spinlock_);
837 queue_->draining_ = false;
843 * Creates a NotificationQueue::Consumer wrapping a function object
844 * Modeled after AsyncTimeout::make
850 template <typename MessageT, typename TCallback>
851 struct notification_queue_consumer_wrapper
852 : public NotificationQueue<MessageT>::Consumer {
854 template <typename UCallback>
855 explicit notification_queue_consumer_wrapper(UCallback&& callback)
856 : callback_(std::forward<UCallback>(callback)) {}
858 // we are being stricter here and requiring noexcept for callback
859 void messageAvailable(MessageT&& message) noexcept override {
861 noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
862 "callback must be declared noexcept, e.g.: `[]() noexcept {}`"
865 callback_(std::forward<MessageT>(message));
872 } // namespace detail
874 template <typename MessageT>
875 template <typename TCallback>
876 std::unique_ptr<typename NotificationQueue<MessageT>::Consumer,
877 DelayedDestruction::Destructor>
878 NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
879 return std::unique_ptr<NotificationQueue<MessageT>::Consumer,
880 DelayedDestruction::Destructor>(
881 new detail::notification_queue_consumer_wrapper<
883 typename std::decay<TCallback>::type>(
884 std::forward<TCallback>(callback)));