Updates the internal representation of signals in NotificationQueue
authorTobias Ritzau <ritzau@fb.com>
Fri, 22 Apr 2016 02:04:13 +0000 (19:04 -0700)
committerFacebook Github Bot 1 <facebook-github-bot-1-bot@fb.com>
Fri, 22 Apr 2016 02:05:24 +0000 (19:05 -0700)
Summary: The signals were represented using bytes in a pipe or using a count on an event fd (when available). This count was ever growing and caused the pipe to overflow, and in both cases you would get signals on empty queues. This diff only writes to the fd if it there are no bytes to read. Due to races there can still be multiple bytes in the pipe, but overflowing should not be possible. Instead of blindly signaling when there could be messages in the queue, the signals are now synchronized with the state of the queue so that the signals are drained when the queue is empty. This also made it possible to skip the semaphore behavior of the event fd which should improve perf.

Reviewed By: dcolascione

Differential Revision: D3198252

fb-gh-sync-id: 39e620b10c254ffcacabc4c5ac36950a215d4803
fbshipit-source-id: 39e620b10c254ffcacabc4c5ac36950a215d4803

folly/io/async/NotificationQueue.h

index 7c8b2ee9684c155252ecc8b1f28f98cb7e89ae56..4714ef8d8c6e85c2b9aa552cba7de215c3bf803e 100644 (file)
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <fcntl.h>
+#include <poll.h>
 #include <sys/types.h>
 #include <unistd.h>
 
@@ -250,7 +251,7 @@ class NotificationQueue {
 
 #ifdef FOLLY_HAVE_EVENTFD
     if (fdType == FdType::EVENTFD) {
-      eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
+      eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
       if (eventfd_ == -1) {
         if (errno == ENOSYS || errno == EINVAL) {
           // eventfd not availalble
@@ -393,29 +394,21 @@ class NotificationQueue {
    * unmodified.
    */
   bool tryConsume(MessageT& result) {
+    SCOPE_EXIT { syncSignalAndQueue(); };
+
     checkPid();
 
-    try {
+    folly::SpinLockGuard g(spinlock_);
 
-      folly::SpinLockGuard g(spinlock_);
+    if (UNLIKELY(queue_.empty())) {
+      return false;
+    }
 
-      if (UNLIKELY(queue_.empty())) {
-        return false;
-      }
+    auto data = std::move(queue_.front());
+    result = data.first;
+    RequestContext::setContext(data.second);
 
-      auto data = std::move(queue_.front());
-      result = data.first;
-      RequestContext::setContext(data.second);
-
-      queue_.pop_front();
-    } catch (...) {
-      // Handle an exception if the assignment operator happens to throw.
-      // We consumed an event but weren't able to pop the message off the
-      // queue.  Signal the event again since the message is still in the
-      // queue.
-      signalEvent(1);
-      throw;
-    }
+    queue_.pop_front();
 
     return true;
   }
@@ -470,39 +463,38 @@ class NotificationQueue {
   mutable std::atomic<int> maxEventBytes_{0};
 #endif
 
-  inline void signalEvent(size_t numAdded = 1) const {
-    static const uint8_t kPipeMessage[] = {
-      1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
-    };
+  void ensureSignalLocked() const {
+    // semantics: empty fd == empty queue <=> !signal_
+    if (signal_) {
+      return;
+    }
 
     ssize_t bytes_written = 0;
     ssize_t bytes_expected = 0;
-    if (eventfd_ >= 0) {
-      // eventfd(2) dictates that we must write a 64-bit integer
-      uint64_t numAdded64(numAdded);
-      bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
-      bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
-    } else {
-      // pipe semantics, add one message for each numAdded
-      bytes_expected = numAdded;
-      do {
-        size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
-        ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
-        if (rc < 0) {
-          // TODO: if the pipe is full, write will fail with EAGAIN.
-          // See task #1044651 for how this could be handled
-          break;
-        }
-        numAdded -= rc;
-        bytes_written += rc;
-      } while (numAdded > 0);
-    }
+
+    do {
+      if (eventfd_ >= 0) {
+        // eventfd(2) dictates that we must write a 64-bit integer
+        uint64_t signal = 1;
+        bytes_expected = static_cast<ssize_t>(sizeof(signal));
+        bytes_written = ::write(eventfd_, &signal, bytes_expected);
+      } else {
+        uint8_t signal = 1;
+        bytes_expected = static_cast<ssize_t>(sizeof(signal));
+        bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
+      }
+    } while (bytes_written == -1 && errno == EINTR);
+
 #ifdef __ANDROID__
-    eventBytes_ += bytes_written;
-    maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
+    if (bytes_written > 0) {
+      eventBytes_ += bytes_written;
+      maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
+    }
 #endif
 
-    if (bytes_written != bytes_expected) {
+    if (bytes_written == bytes_expected) {
+      signal_ = true;
+    } else {
 #ifdef __ANDROID__
       LOG(ERROR) << "NotificationQueue Write Error=" << errno
                  << " bytesInPipe=" << eventBytes_
@@ -513,27 +505,50 @@ class NotificationQueue {
     }
   }
 
-  bool tryConsumeEvent() {
-    uint64_t value = 0;
-    ssize_t rc = -1;
-    if (eventfd_ >= 0) {
-      rc = readNoInt(eventfd_, &value, sizeof(value));
+  void drainSignalsLocked() {
+    ssize_t bytes_read = 0;
+    if (eventfd_ > 0) {
+      uint64_t message;
+      bytes_read = readNoInt(eventfd_, &message, sizeof(message));
+      CHECK(bytes_read != -1 || errno == EAGAIN);
     } else {
-      uint8_t value8;
-      rc = readNoInt(pipeFds_[0], &value8, sizeof(value8));
-      value = value8;
+      // There should only be one byte in the pipe. To avoid potential leaks we still drain.
+      uint8_t message[32];
+      ssize_t result;
+      while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
+        bytes_read += result;
+      }
+      CHECK(result == -1 && errno == EAGAIN);
+      LOG_IF(ERROR, bytes_read > 1)
+        << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
+        << bytes_read << " bytes, expected <= 1";
+    }
+    LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
+      << "[NotificationQueue] Unexpected state while draining signals: signal_="
+      << signal_ << " bytes_read=" << bytes_read;
+
+    signal_ = false;
+
 #ifdef __ANDROID__
-      eventBytes_ -= 1;
-#endif
+    if (bytes_read > 0) {
+      eventBytes_ -= bytes_read;
     }
-    if (rc < 0) {
-      // EAGAIN should pretty much be the only error we can ever get.
-      // This means someone else already processed the only available message.
-      CHECK_EQ(errno, EAGAIN);
-      return false;
+#endif
+  }
+
+  void ensureSignal() const {
+    folly::SpinLockGuard g(spinlock_);
+    ensureSignalLocked();
+  }
+
+  void syncSignalAndQueue() {
+    folly::SpinLockGuard g(spinlock_);
+
+    if (queue_.empty()) {
+      drainSignalsLocked();
+    } else {
+      ensureSignalLocked();
     }
-    assert(value == 1);
-    return true;
   }
 
   bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
@@ -550,9 +565,9 @@ class NotificationQueue {
         signal = true;
       }
       queue_.emplace_back(std::move(message), RequestContext::saveContext());
-    }
-    if (signal) {
-      signalEvent();
+      if (signal) {
+        ensureSignalLocked();
+      }
     }
     return true;
   }
@@ -570,9 +585,9 @@ class NotificationQueue {
         signal = true;
       }
       queue_.emplace_back(message, RequestContext::saveContext());
-    }
-    if (signal) {
-      signalEvent();
+      if (signal) {
+        ensureSignalLocked();
+      }
     }
     return true;
   }
@@ -594,13 +609,14 @@ class NotificationQueue {
       if (numActiveConsumers_ < numConsumers_) {
         signal = true;
       }
-    }
-    if (signal) {
-      signalEvent();
+      if (signal) {
+        ensureSignalLocked();
+      }
     }
   }
 
   mutable folly::SpinLock spinlock_;
+  mutable bool signal_{false};
   int eventfd_;
   int pipeFds_[2]; // to fallback to on older/non-linux systems
   uint32_t advisoryMaxQueueSize_;
@@ -635,8 +651,12 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
     bool isDrain, size_t* numConsumed) noexcept {
   DestructorGuard dg(this);
   uint32_t numProcessed = 0;
-  bool firstRun = true;
   setActive(true);
+  SCOPE_EXIT {
+    if (queue_) {
+      queue_->syncSignalAndQueue();
+    }
+  };
   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
   SCOPE_EXIT {
     if (numConsumed != nullptr) {
@@ -644,17 +664,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
     }
   };
   while (true) {
-    // Try to decrement the eventfd.
-    //
-    // The eventfd is only used to wake up the consumer - there may or
-    // may not actually be an event available (another consumer may
-    // have read it).  We don't really care, we only care about
-    // emptying the queue.
-    if (!isDrain && firstRun) {
-      queue_->tryConsumeEvent();
-      firstRun = false;
-    }
-
     // Now pop the message off of the queue.
     //
     // We have to manually acquire and release the spinlock here, rather than
@@ -717,7 +726,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
       ++numProcessed;
       if (!isDrain && maxReadAtOnce_ > 0 &&
           numProcessed >= maxReadAtOnce_) {
-        queue_->signalEvent(1);
         return;
       }
 
@@ -744,12 +752,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
       if (locked) {
         // Unlock the spinlock.
         queue_->spinlock_.unlock();
-
-        // Push a notification back on the eventfd since we didn't actually
-        // read the message off of the queue.
-        if (!isDrain) {
-          queue_->signalEvent(1);
-        }
       }
 
       return;
@@ -774,7 +776,7 @@ void NotificationQueue<MessageT>::Consumer::init(
     folly::SpinLockGuard g(queue_->spinlock_);
     queue_->numConsumers_++;
   }
-  queue_->signalEvent();
+  queue_->ensureSignal();
 
   if (queue_->eventfd_ >= 0) {
     initHandler(eventBase, queue_->eventfd_);