io/RecordIO.h \
io/RecordIO-inl.h \
io/TypedIOBuf.h \
+ io/async/AsyncTimeout.h \
+ io/async/EventBase.h \
+ io/async/EventFDWrapper.h \
+ io/async/EventHandler.h \
+ io/async/EventUtil.h \
+ io/async/NotificationQueue.h \
+ io/async/Request.h \
+ io/async/TimeoutManager.h \
json.h \
Lazy.h \
Likely.h \
io/IOBuf.cpp \
io/IOBufQueue.cpp \
io/RecordIO.cpp \
+ io/async/AsyncTimeout.cpp \
+ io/async/EventBase.cpp \
+ io/async/EventHandler.cpp \
+ io/async/Request.cpp \
json.cpp \
detail/MemoryIdler.cpp \
MemoryMapping.cpp \
AC_CHECK_LIB([double-conversion],[ceil],[],[AC_MSG_ERROR(
[Please install double-conversion library])])
+AC_CHECK_LIB([event], [event_set], [], [AC_MSG_ERROR([Unable to find libevent])])
+
# Checks for typedefs, structures, and compiler characteristics.
AC_HEADER_STDBOOL
AC_C_CONST
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "folly/io/async/AsyncTimeout.h"
+#include "folly/io/async/EventBase.h"
+#include "folly/io/async/EventUtil.h"
+#include "folly/io/async/Request.h"
+
+#include <assert.h>
+#include <glog/logging.h>
+
+namespace folly {
+
+AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager)
+ : timeoutManager_(timeoutManager) {
+
+ event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+ event_.ev_base = nullptr;
+ timeoutManager_->attachTimeoutManager(
+ this,
+ TimeoutManager::InternalEnum::NORMAL);
+ RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(EventBase* eventBase)
+ : timeoutManager_(eventBase) {
+
+ event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+ event_.ev_base = nullptr;
+ timeoutManager_->attachTimeoutManager(
+ this,
+ TimeoutManager::InternalEnum::NORMAL);
+ RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager,
+ InternalEnum internal)
+ : timeoutManager_(timeoutManager) {
+
+ event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+ event_.ev_base = nullptr;
+ timeoutManager_->attachTimeoutManager(this, internal);
+ RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(EventBase* eventBase, InternalEnum internal)
+ : timeoutManager_(eventBase) {
+
+ event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+ event_.ev_base = nullptr;
+ timeoutManager_->attachTimeoutManager(this, internal);
+ RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(): timeoutManager_(nullptr) {
+ event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+ event_.ev_base = nullptr;
+ RequestContext::getStaticContext();
+}
+
+AsyncTimeout::~AsyncTimeout() {
+ cancelTimeout();
+}
+
+bool AsyncTimeout::scheduleTimeout(std::chrono::milliseconds timeout) {
+ assert(timeoutManager_ != nullptr);
+ context_ = RequestContext::saveContext();
+ return timeoutManager_->scheduleTimeout(this, timeout);
+}
+
+bool AsyncTimeout::scheduleTimeout(uint32_t milliseconds) {
+ return scheduleTimeout(std::chrono::milliseconds(milliseconds));
+}
+
+void AsyncTimeout::cancelTimeout() {
+ if (isScheduled()) {
+ timeoutManager_->cancelTimeout(this);
+ }
+}
+
+bool AsyncTimeout::isScheduled() const {
+ return EventUtil::isEventRegistered(&event_);
+}
+
+void AsyncTimeout::attachTimeoutManager(
+ TimeoutManager* timeoutManager,
+ InternalEnum internal) {
+ // This also implies no timeout is scheduled.
+ assert(timeoutManager_ == nullptr);
+ assert(timeoutManager->isInTimeoutManagerThread());
+ timeoutManager_ = timeoutManager;
+
+ timeoutManager_->attachTimeoutManager(this, internal);
+}
+
+void AsyncTimeout::attachEventBase(
+ EventBase* eventBase,
+ InternalEnum internal) {
+ attachTimeoutManager(eventBase, internal);
+}
+
+void AsyncTimeout::detachTimeoutManager() {
+ // Only allow the event base to be changed if the timeout is not
+ // currently installed.
+ if (isScheduled()) {
+ // Programmer bug. Abort the program.
+ LOG(ERROR) << "detachEventBase() called on scheduled timeout; aborting";
+ abort();
+ return;
+ }
+
+ if (timeoutManager_) {
+ timeoutManager_->detachTimeoutManager(this);
+ timeoutManager_ = nullptr;
+ }
+}
+
+void AsyncTimeout::detachEventBase() {
+ detachTimeoutManager();
+}
+
+void AsyncTimeout::libeventCallback(int fd, short events, void* arg) {
+ AsyncTimeout* timeout = reinterpret_cast<AsyncTimeout*>(arg);
+ assert(fd == -1);
+ assert(events == EV_TIMEOUT);
+
+ // double check that ev_flags gets reset when the timeout is not running
+ assert((timeout->event_.ev_flags & ~EVLIST_INTERNAL) == EVLIST_INIT);
+
+ // this can't possibly fire if timeout->eventBase_ is nullptr
+ (void) timeout->timeoutManager_->bumpHandlingTime();
+
+ auto old_ctx =
+ RequestContext::setContext(timeout->context_);
+
+ timeout->timeoutExpired();
+
+ RequestContext::setContext(old_ctx);
+}
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include "folly/io/async/TimeoutManager.h"
+
+#include <boost/noncopyable.hpp>
+#include <event.h>
+#include <memory>
+
+namespace folly {
+
+class EventBase;
+class RequestContext;
+class TimeoutManager;
+
+/**
+ * AsyncTimeout is used to asynchronously wait for a timeout to occur.
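+ *
+ * A minimal usage sketch (illustrative only; ExampleTimeout and the
+ * surrounding setup are hypothetical, not part of this API):
+ *
+ *   class ExampleTimeout : public AsyncTimeout {
+ *    public:
+ *     explicit ExampleTimeout(EventBase* evb) : AsyncTimeout(evb) {}
+ *     virtual void timeoutExpired() noexcept {
+ *       LOG(INFO) << "timeout fired";  // runs in the EventBase loop thread
+ *     }
+ *   };
+ *
+ *   EventBase evb;
+ *   ExampleTimeout t(&evb);
+ *   t.scheduleTimeout(std::chrono::milliseconds(100));
+ *   evb.loop();  // invokes timeoutExpired() once the 100ms have elapsed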
+ */
+class AsyncTimeout : private boost::noncopyable {
+ public:
+ typedef TimeoutManager::InternalEnum InternalEnum;
+
+ /**
+ * Create a new AsyncTimeout object, driven by the specified TimeoutManager.
+ */
+ explicit AsyncTimeout(TimeoutManager* timeoutManager);
+ explicit AsyncTimeout(EventBase* eventBase);
+
+ /**
+ * Create a new internal AsyncTimeout object.
+ *
+ * Internal timeouts are like regular timeouts, but will not stop the
+ * TimeoutManager loop from exiting if the only remaining events are internal
+ * timeouts.
+ *
+ * This is useful for implementing fallback timeouts to abort the
+ * TimeoutManager loop if the other events have not been processed within a
+ * specified time period: if the event loop takes too long the timeout will
+ * fire and can stop the event loop. However, if all other events complete,
+ * the event loop will exit even though the internal timeout is still
+ * installed.
+ */
+ AsyncTimeout(TimeoutManager* timeoutManager, InternalEnum internal);
+ AsyncTimeout(EventBase* eventBase, InternalEnum internal);
+
+ /**
+ * Create a new AsyncTimeout object, not yet assigned to a TimeoutManager.
+ *
+ * attachEventBase() must be called prior to scheduling the timeout.
+ */
+ AsyncTimeout();
+
+ /**
+ * AsyncTimeout destructor.
+ *
+ * The timeout will be automatically cancelled if it is running.
+ */
+ virtual ~AsyncTimeout();
+
+ /**
+ * timeoutExpired() is invoked when the timeout period has expired.
+ */
+ virtual void timeoutExpired() noexcept = 0;
+
+ /**
+ * Schedule the timeout to fire in the specified number of milliseconds.
+ *
+ * After the specified number of milliseconds has elapsed, timeoutExpired()
+ * will be invoked by the TimeoutManager's main loop.
+ *
+ * If the timeout is already running, it will be rescheduled with the
+ * new timeout value.
+ *
+ * @param milliseconds The timeout duration, in milliseconds.
+ *
+ * @return Returns true if the timeout was successfully scheduled,
+ * and false if an error occurred. After an error, the timeout is
+ * always unscheduled, even if scheduleTimeout() was just
+ * rescheduling an existing timeout.
+ */
+ bool scheduleTimeout(uint32_t milliseconds);
+ bool scheduleTimeout(std::chrono::milliseconds timeout);
+
+ /**
+ * Cancel the timeout, if it is running.
+ */
+ void cancelTimeout();
+
+ /**
+ * Returns true if the timeout is currently scheduled.
+ */
+ bool isScheduled() const;
+
+ /**
+ * Attach the timeout to a TimeoutManager.
+ *
+ * This may only be called if the timeout is not currently attached to a
+ * TimeoutManager (either by using the default constructor, or by calling
+ * detachTimeoutManager()).
+ *
+ * This method must be invoked in the TimeoutManager's thread.
+ *
+ * The internal parameter specifies if this timeout should be treated as an
+ * internal event. TimeoutManager::loop() will return when there are no more
+ * non-internal events remaining.
+ */
+ void attachTimeoutManager(TimeoutManager* timeoutManager,
+ InternalEnum internal = InternalEnum::NORMAL);
+ void attachEventBase(EventBase* eventBase,
+ InternalEnum internal = InternalEnum::NORMAL);
+
+ /**
+ * Detach the timeout from its TimeoutManager.
+ *
+ * This may only be called when the timeout is not running.
+ * Once detached, the timeout may not be scheduled again until it is
+ * re-attached to an EventBase by calling attachEventBase().
+ *
+ * This method must be called from the current TimeoutManager's thread.
+ */
+ void detachTimeoutManager();
+ void detachEventBase();
+
+ /**
+ * Returns the internal handle to the event
+ */
+ struct event* getEvent() {
+ return &event_;
+ }
+
+ private:
+ static void libeventCallback(int fd, short events, void* arg);
+
+ struct event event_;
+
+ /*
+ * Store a pointer to the TimeoutManager that drives this timeout. It is
+ * used to schedule and cancel the timeout, and in assert() statements to
+ * make sure that AsyncTimeout is always used from the correct thread.
+ */
+ TimeoutManager* timeoutManager_;
+
+ // Save the request context for when the timeout fires.
+ std::shared_ptr<RequestContext> context_;
+};
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "folly/io/async/EventBase.h"
+
+#include "folly/io/async/NotificationQueue.h"
+
+#include <boost/static_assert.hpp>
+#include <fcntl.h>
+#include <pthread.h>
+#include <unistd.h>
+
+namespace {
+
+using folly::Cob;
+using folly::EventBase;
+
+class Tr1FunctionLoopCallback : public EventBase::LoopCallback {
+ public:
+ explicit Tr1FunctionLoopCallback(const Cob& function)
+ : function_(function) {}
+
+ virtual void runLoopCallback() noexcept {
+ function_();
+ delete this;
+ }
+
+ private:
+ Cob function_;
+};
+
+}
+
+namespace folly {
+
+const int kNoFD = -1;
+
+/*
+ * EventBase::FunctionRunner
+ */
+
+class EventBase::FunctionRunner
+ : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
+ public:
+ void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
+
+ // In libevent2, internal events do not break the loop.
+ // Most users would expect loop(), followed by runInEventBaseThread(),
+ // to break the loop and check if it should exit or not.
+ // To get behaviour similar to libevent 1.4, tell the loop to break here.
+ // Note that loop() may still continue to loop, but it will also check the
+ // stop_ flag as well as runInLoop callbacks, etc.
+ event_base_loopbreak(getEventBase()->evb_);
+
+ if (msg.first == nullptr && msg.second == nullptr) {
+ // terminateLoopSoon() sends a null message just to
+ // wake up the loop. We can ignore these messages.
+ return;
+ }
+
+ // If function is nullptr, just log and move on
+ if (!msg.first) {
+ LOG(ERROR) << "nullptr callback registered to be run in "
+ << "event base thread";
+ return;
+ }
+
+ // The function should never throw an exception, because we have no
+ // way of knowing what sort of error handling to perform.
+ //
+ // If it does throw, log a message and abort the program.
+ try {
+ msg.first(msg.second);
+ } catch (const std::exception& ex) {
+ LOG(ERROR) << "runInEventBaseThread() function threw a "
+ << typeid(ex).name() << " exception: " << ex.what();
+ abort();
+ } catch (...) {
+ LOG(ERROR) << "runInEventBaseThread() function threw an exception";
+ abort();
+ }
+ }
+};
+
+/*
+ * EventBase::CobTimeout methods
+ */
+
+void EventBase::CobTimeout::timeoutExpired() noexcept {
+ // For now, we just swallow any exceptions that the callback threw.
+ try {
+ cob_();
+ } catch (const std::exception& ex) {
+ LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
+ << typeid(ex).name() << " exception: " << ex.what();
+ } catch (...) {
+ LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
+ << "type";
+ }
+
+ // The CobTimeout object was allocated on the heap by runAfterDelay(),
+ // so delete it now that it has fired.
+ delete this;
+}
+
+/*
+ * EventBase methods
+ */
+
+EventBase::EventBase()
+ : runOnceCallbacks_(nullptr)
+ , stop_(false)
+ , loopThread_(0)
+ , evb_(static_cast<event_base*>(event_init()))
+ , queue_(nullptr)
+ , fnRunner_(nullptr)
+ , maxLatency_(0)
+ , avgLoopTime_(2000000)
+ , maxLatencyLoopTime_(avgLoopTime_)
+ , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
+ , latestLoopCnt_(nextLoopCnt_)
+ , startWork_(0)
+ , observer_(nullptr)
+ , observerSampleCount_(0) {
+ VLOG(5) << "EventBase(): Created.";
+ initNotificationQueue();
+ RequestContext::getStaticContext();
+}
+
+// takes ownership of the event_base
+EventBase::EventBase(event_base* evb)
+ : runOnceCallbacks_(nullptr)
+ , stop_(false)
+ , loopThread_(0)
+ , evb_(evb)
+ , queue_(nullptr)
+ , fnRunner_(nullptr)
+ , maxLatency_(0)
+ , avgLoopTime_(2000000)
+ , maxLatencyLoopTime_(avgLoopTime_)
+ , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
+ , latestLoopCnt_(nextLoopCnt_)
+ , startWork_(0)
+ , observer_(nullptr)
+ , observerSampleCount_(0) {
+ initNotificationQueue();
+ RequestContext::getStaticContext();
+}
+
+EventBase::~EventBase() {
+ // Delete any unfired CobTimeout objects, so that we don't leak memory
+ // (Note that we don't fire them. The caller is responsible for cleaning up
+ // its own data structures if it destroys the EventBase with unfired events
+ // remaining.)
+ while (!pendingCobTimeouts_.empty()) {
+ CobTimeout* timeout = &pendingCobTimeouts_.front();
+ delete timeout;
+ }
+
+ (void) runLoopCallbacks(false);
+
+ // Stop consumer before deleting NotificationQueue
+ fnRunner_->stopConsuming();
+ event_base_free(evb_);
+ VLOG(5) << "EventBase(): Destroyed.";
+}
+
+int EventBase::getNotificationQueueSize() const {
+ return queue_->size();
+}
+
+// Set smoothing coefficient for loop load average; input is # of milliseconds
+// for exp(-1) decay.
+void EventBase::setLoadAvgMsec(uint32_t ms) {
+ uint64_t us = 1000 * ms;
+ if (ms > 0) {
+ maxLatencyLoopTime_.setTimeInterval(us);
+ avgLoopTime_.setTimeInterval(us);
+ } else {
+ LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
+ }
+}
+
+void EventBase::resetLoadAvg(double value) {
+ avgLoopTime_.reset(value);
+ maxLatencyLoopTime_.reset(value);
+}
+
+static int64_t getTimeDelta(int64_t *prev) {
+ int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+ int64_t delta = now - *prev;
+ *prev = now;
+ return delta;
+}
+
+void EventBase::waitUntilRunning() {
+ while (!isRunning()) {
+ sched_yield();
+ }
+}
+
+// enters the event_base loop -- will only exit when forced to
+bool EventBase::loop() {
+ VLOG(5) << "EventBase(): Starting loop.";
+ int res = 0;
+ bool ranLoopCallbacks;
+ int nonBlocking;
+
+ loopThread_.store(pthread_self(), std::memory_order_release);
+
+#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
+ if (!name_.empty()) {
+ pthread_setname_np(pthread_self(), name_.c_str());
+ }
+#endif
+
+ int64_t prev = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+ int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ // TODO: Read stop_ atomically with an acquire barrier.
+ while (!stop_) {
+ ++nextLoopCnt_;
+
+ // nobody can add loop callbacks from within this thread if
+ // we don't have to handle anything to start with...
+ nonBlocking = (loopCallbacks_.empty() ? 0 : EVLOOP_NONBLOCK);
+ res = event_base_loop(evb_, EVLOOP_ONCE | nonBlocking);
+ ranLoopCallbacks = runLoopCallbacks();
+
+ int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
+ int64_t idle = startWork_ - idleStart;
+
+ avgLoopTime_.addSample(idle, busy);
+ maxLatencyLoopTime_.addSample(idle, busy);
+
+ if (observer_) {
+ if (observerSampleCount_++ == observer_->getSampleRate()) {
+ observerSampleCount_ = 0;
+ observer_->loopSample(busy, idle);
+ }
+ }
+
+ VLOG(11) << "EventBase " << this << " did not timeout "
+ " loop time guess: " << busy + idle <<
+ " idle time: " << idle <<
+ " busy time: " << busy <<
+ " avgLoopTime: " << avgLoopTime_.get() <<
+ " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
+ " maxLatency_: " << maxLatency_ <<
+ " nothingHandledYet(): "<< nothingHandledYet();
+
+ // see if our average loop time has exceeded our limit
+ if ((maxLatency_ > 0) &&
+ (maxLatencyLoopTime_.get() > double(maxLatency_))) {
+ maxLatencyCob_();
+ // back off temporarily -- don't keep spamming maxLatencyCob_
+ // if we're only a bit over the limit
+ maxLatencyLoopTime_.dampen(0.9);
+ }
+
+ // Our loop run did real work; reset the idle timer
+ idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ // If the event loop indicated that there were no more events, and
+ // we also didn't have any loop callbacks to run, there is nothing left to
+ // do.
+ if (res != 0 && !ranLoopCallbacks) {
+ // Since the notification queue is marked 'internal', some events may not have
+ // run. Run them manually if so, and continue looping.
+ //
+ if (getNotificationQueueSize() > 0) {
+ fnRunner_->handlerReady(0);
+ } else {
+ break;
+ }
+ }
+
+ VLOG(5) << "EventBase " << this << " loop time: " << getTimeDelta(&prev);
+ }
+ // Reset stop_ so loop() can be called again
+ stop_ = false;
+
+ if (res < 0) {
+ LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
+ return false;
+ } else if (res == 1) {
+ VLOG(5) << "EventBase: ran out of events (exiting loop)!";
+ } else if (res > 1) {
+ LOG(ERROR) << "EventBase: unknown event loop result = " << res;
+ return false;
+ }
+
+ loopThread_.store(0, std::memory_order_release);
+
+ VLOG(5) << "EventBase(): Done with loop.";
+ return true;
+}
+
+void EventBase::loopForever() {
+ // Update the notification queue event to treat it as a normal (non-internal)
+ // event. The notification queue event always remains installed, and the main
+ // loop won't exit with it installed.
+ fnRunner_->stopConsuming();
+ fnRunner_->startConsuming(this, queue_.get());
+
+ bool ret = loop();
+
+ // Restore the notification queue internal flag
+ fnRunner_->stopConsuming();
+ fnRunner_->startConsumingInternal(this, queue_.get());
+
+ if (!ret) {
+ folly::throwSystemError("error in EventBase::loopForever()");
+ }
+}
+
+bool EventBase::bumpHandlingTime() {
+ VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
+ " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
+ if (nothingHandledYet()) {
+ latestLoopCnt_ = nextLoopCnt_;
+ // set the time
+ startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
+ " (loop) startWork_ " << startWork_;
+ return true;
+ }
+ return false;
+}
+
+void EventBase::terminateLoopSoon() {
+ VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
+
+ if (!isRunning()) {
+ return;
+ }
+
+ // Set stop to true, so the event loop will know to exit.
+ // TODO: We should really use an atomic operation here with a release
+ // barrier.
+ stop_ = true;
+
+ // Call event_base_loopbreak() so that libevent will exit the next time
+ // around the loop.
+ event_base_loopbreak(evb_);
+
+ // If terminateLoopSoon() is called from another thread,
+ // the EventBase thread might be stuck waiting for events.
+ // In this case, it won't wake up and notice that stop_ is set until it
+ // receives another event. Send an empty message to the notification queue
+ // so that the event loop will wake up even if there are no other events
+ // pending.
+ try {
+ queue_->putMessage(std::make_pair(nullptr, nullptr));
+ } catch (...) {
+ // We don't care if putMessage() fails. This likely means
+ // the EventBase already has lots of events waiting anyway.
+ }
+}
+
+void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
+ DCHECK(isInEventBaseThread());
+ callback->cancelLoopCallback();
+ callback->context_ = RequestContext::saveContext();
+ if (runOnceCallbacks_ != nullptr && thisIteration) {
+ runOnceCallbacks_->push_back(*callback);
+ } else {
+ loopCallbacks_.push_back(*callback);
+ }
+}
+
+void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
+ DCHECK(isInEventBaseThread());
+ Tr1FunctionLoopCallback* wrapper = new Tr1FunctionLoopCallback(cob);
+ wrapper->context_ = RequestContext::saveContext();
+ if (runOnceCallbacks_ != nullptr && thisIteration) {
+ runOnceCallbacks_->push_back(*wrapper);
+ } else {
+ loopCallbacks_.push_back(*wrapper);
+ }
+}
+
+bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
+ // Send the message.
+ // It will be received by the FunctionRunner in the EventBase's thread.
+
+ // We try not to schedule nullptr callbacks
+ if (!fn) {
+ LOG(ERROR) << "EventBase " << this
+ << ": Scheduling nullptr callbacks is not allowed";
+ return false;
+ }
+
+ // Short-circuit if we are already in our event base
+ if (inRunningEventBaseThread()) {
+ runInLoop(new RunInLoopCallback(fn, arg));
+ return true;
+
+ }
+
+ try {
+ queue_->putMessage(std::make_pair(fn, arg));
+ } catch (const std::exception& ex) {
+ LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
+ << fn << "for EventBase thread: " << ex.what();
+ return false;
+ }
+
+ return true;
+}
+
+bool EventBase::runInEventBaseThread(const Cob& fn) {
+ // Short-circuit if we are already in our event base
+ if (inRunningEventBaseThread()) {
+ runInLoop(fn);
+ return true;
+ }
+
+ Cob* fnCopy;
+ // Allocate a copy of the function so we can pass it to the other thread
+ // The other thread will delete this copy once the function has been run
+ try {
+ fnCopy = new Cob(fn);
+ } catch (const std::bad_alloc& ex) {
+ LOG(ERROR) << "failed to allocate tr::function copy "
+ << "for runInEventBaseThread()";
+ return false;
+ }
+
+ if (!runInEventBaseThread(&EventBase::runTr1FunctionPtr, fnCopy)) {
+ delete fnCopy;
+ return false;
+ }
+
+ return true;
+}
+
+bool EventBase::runAfterDelay(const Cob& cob,
+ int milliseconds,
+ TimeoutManager::InternalEnum in) {
+ CobTimeout* timeout = new CobTimeout(this, cob, in);
+ if (!timeout->scheduleTimeout(milliseconds)) {
+ delete timeout;
+ return false;
+ }
+
+ pendingCobTimeouts_.push_back(*timeout);
+ return true;
+}
+
+bool EventBase::runLoopCallbacks(bool setContext) {
+ if (!loopCallbacks_.empty()) {
+ bumpHandlingTime();
+ // Swap the loopCallbacks_ list with a temporary list on our stack.
+ // This way we will only run callbacks scheduled at the time
+ // runLoopCallbacks() was invoked.
+ //
+ // If any of these callbacks in turn call runInLoop() to schedule more
+ // callbacks, those new callbacks won't be run until the next iteration
+ // around the event loop. This prevents runInLoop() callbacks from being
+ // able to start file descriptor and timeout based events.
+ LoopCallbackList currentCallbacks;
+ currentCallbacks.swap(loopCallbacks_);
+ runOnceCallbacks_ = &currentCallbacks;
+
+ while (!currentCallbacks.empty()) {
+ LoopCallback* callback = &currentCallbacks.front();
+ currentCallbacks.pop_front();
+ if (setContext) {
+ RequestContext::setContext(callback->context_);
+ }
+ callback->runLoopCallback();
+ }
+
+ runOnceCallbacks_ = nullptr;
+ return true;
+ }
+ return false;
+}
+
+void EventBase::initNotificationQueue() {
+ // Infinite size queue
+ queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
+
+ // We allocate fnRunner_ separately, rather than declaring it directly
+ // as a member of EventBase solely so that we don't need to include
+ // NotificationQueue.h from EventBase.h
+ fnRunner_.reset(new FunctionRunner());
+
+ // Mark this as an internal event, so event_base_loop() will return if
+ // there are no other events besides this one installed.
+ //
+ // Most callers don't care about the internal notification queue used by
+ // EventBase. The queue is always installed, so if we did count the queue as
+ // an active event, loop() would never exit with no more events to process.
+ // Users can use loopForever() if they do care about the notification queue.
+ // (This is useful for EventBase threads that do nothing but process
+ // runInEventBaseThread() notifications.)
+ fnRunner_->startConsumingInternal(this, queue_.get());
+}
+
+void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
+ expCoeff_ = -1.0/timeInterval;
+ VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
+}
+
+void EventBase::SmoothLoopTime::reset(double value) {
+ value_ = value;
+}
+
+void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
+ /*
+ * Position at which the busy sample is considered to be taken.
+ * (Allows us to quickly skew our average without editing much code.)
+ */
+ enum BusySamplePosition {
+ RIGHT = 0, // busy sample placed at the end of the iteration
+ CENTER = 1, // busy sample placed at the middle point of the iteration
+ LEFT = 2, // busy sample placed at the beginning of the iteration
+ };
+
+ VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
+ " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
+ " busy " << busy << " " << __PRETTY_FUNCTION__;
+ idle += oldBusyLeftover_ + busy;
+ oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
+ idle -= oldBusyLeftover_;
+
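+ // Exponentially-weighted moving average: the previous value decays by
+ // coeff = exp(-idle / timeInterval), and the new busy sample gets the
+ // remaining (1 - coeff) weight.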
+ double coeff = exp(idle * expCoeff_);
+ value_ *= coeff;
+ value_ += (1.0 - coeff) * busy;
+}
+
+bool EventBase::nothingHandledYet() {
+ VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
+ return (nextLoopCnt_ != latestLoopCnt_);
+}
+
+/* static */
+void EventBase::runTr1FunctionPtr(Cob* fn) {
+ // The function should never throw an exception, because we have no
+ // way of knowing what sort of error handling to perform.
+ //
+ // If it does throw, log a message and abort the program.
+ try {
+ (*fn)();
+ } catch (const std::exception &ex) {
+ LOG(ERROR) << "runInEventBaseThread() std::function threw a "
+ << typeid(ex).name() << " exception: " << ex.what();
+ abort();
+ } catch (...) {
+ LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
+ abort();
+ }
+
+ // The function object was allocated by runInEventBaseThread().
+ // Delete it once it has been run.
+ delete fn;
+}
+
+EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
+ : fn_(fn)
+ , arg_(arg) {}
+
+void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
+ fn_(arg_);
+ delete this;
+}
+
+void EventBase::attachTimeoutManager(AsyncTimeout* obj,
+ InternalEnum internal) {
+
+ struct event* ev = obj->getEvent();
+ assert(ev->ev_base == nullptr);
+
+ event_base_set(getLibeventBase(), ev);
+ if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
+ // Set the EVLIST_INTERNAL flag
+ ev->ev_flags |= EVLIST_INTERNAL;
+ }
+}
+
+void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
+ cancelTimeout(obj);
+ struct event* ev = obj->getEvent();
+ ev->ev_base = nullptr;
+}
+
+bool EventBase::scheduleTimeout(AsyncTimeout* obj,
+ std::chrono::milliseconds timeout) {
+ assert(isInEventBaseThread());
+ // Set up the timeval and add the event
+ struct timeval tv;
+ tv.tv_sec = timeout.count() / 1000LL;
+ tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
+
+ struct event* ev = obj->getEvent();
+ if (event_add(ev, &tv) < 0) {
+ LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
+ return false;
+ }
+
+ return true;
+}
+
+void EventBase::cancelTimeout(AsyncTimeout* obj) {
+ assert(isInEventBaseThread());
+ struct event* ev = obj->getEvent();
+ if (EventUtil::isEventRegistered(ev)) {
+ event_del(ev);
+ }
+}
+
+void EventBase::setName(const std::string& name) {
+ assert(isInEventBaseThread());
+ name_ = name;
+#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
+ if (isRunning()) {
+ pthread_setname_np(loopThread_.load(std::memory_order_relaxed),
+ name_.c_str());
+ }
+#endif
+}
+
+const std::string& EventBase::getName() {
+ assert(isInEventBaseThread());
+ return name_;
+}
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+#include "folly/io/async/AsyncTimeout.h"
+#include "folly/io/async/TimeoutManager.h"
+#include <memory>
+#include <stack>
+#include <list>
+#include <queue>
+#include <cstdlib>
+#include <set>
+#include <utility>
+#include <boost/intrusive/list.hpp>
+#include <boost/utility.hpp>
+#include <functional>
+#include <event.h> // libevent
+#include <errno.h>
+#include <math.h>
+#include <atomic>
+
+namespace folly {
+
+typedef std::function<void()> Cob;
+template <typename MessageT>
+class NotificationQueue;
+
+class EventBaseObserver {
+ public:
+ virtual ~EventBaseObserver() {}
+
+ virtual uint32_t getSampleRate() const = 0;
+
+ virtual void loopSample(
+ int64_t busyTime, int64_t idleTime) = 0;
+};
+
+/**
+ * This class is a wrapper for all asynchronous I/O processing functionality
+ *
+ * EventBase provides a main loop that notifies EventHandler callback objects
+ * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
+ * when a specified timeout has expired. More complex, higher-level callback
+ * mechanisms can then be built on top of EventHandler and AsyncTimeout.
+ *
+ * An EventBase object can only drive an event loop for a single thread. To
+ * take advantage of multiple CPU cores, most asynchronous I/O servers have one
+ * thread per CPU, and use a separate EventBase for each thread.
+ *
+ * In general, most EventBase methods may only be called from the thread
+ * running the EventBase's loop. There are a few exceptions to this rule, for
+ * methods that are explicitly intended to allow communication with a
+ * EventBase from other threads. When it is safe to call a method from
+ * another thread it is explicitly listed in the method comments.
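+ *
+ * A typical single-threaded usage sketch (illustrative only; workerMain and
+ * the handler/timeout registration are hypothetical):
+ *
+ *   EventBase evb;
+ *
+ *   void workerMain() {
+ *     // register EventHandler / AsyncTimeout objects against &evb here
+ *     evb.loopForever();   // returns only after terminateLoopSoon()
+ *   }
+ *
+ *   // From any other thread:
+ *   //   evb.runInEventBaseThread(someFunction, someArg);
+ *   //   evb.terminateLoopSoon();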
+ */
+class EventBase : private boost::noncopyable, public TimeoutManager {
+ public:
+ /**
+ * A callback interface to use with runInLoop()
+ *
+ * Derive from this class if you need to delay some code execution until the
+ * next iteration of the event loop. This allows you to schedule code to be
+ * invoked from the top-level of the loop, after your immediate callers have
+ * returned.
+ *
+ * If a LoopCallback object is destroyed while it is scheduled to be run in
+ * the next loop iteration, it will automatically be cancelled.
+ */
+ class LoopCallback {
+ public:
+ virtual ~LoopCallback() {}
+
+ virtual void runLoopCallback() noexcept = 0;
+ void cancelLoopCallback() {
+ hook_.unlink();
+ }
+
+ bool isLoopCallbackScheduled() const {
+ return hook_.is_linked();
+ }
+
+ private:
+ typedef boost::intrusive::list_member_hook<
+ boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
+
+ ListHook hook_;
+
+ typedef boost::intrusive::list<
+ LoopCallback,
+ boost::intrusive::member_hook<LoopCallback, ListHook,
+ &LoopCallback::hook_>,
+ boost::intrusive::constant_time_size<false> > List;
+
+ // EventBase needs access to LoopCallbackList (and therefore to hook_)
+ friend class EventBase;
+ std::shared_ptr<RequestContext> context_;
+ };
+
+ /**
+ * Create a new EventBase object.
+ */
+ EventBase();
+
+ /**
+ * Create a new EventBase object that will use the specified libevent
+ * event_base object to drive the event loop.
+ *
+ * The EventBase will take ownership of this event_base, and will call
+ * event_base_free(evb) when the EventBase is destroyed.
+ */
+ explicit EventBase(event_base* evb);
+ ~EventBase();
+
+ /**
+ * Runs the event loop.
+ *
+ * loop() will loop waiting for I/O or timeouts and invoking EventHandler
+ * and AsyncTimeout callbacks as their events become ready. loop() will
+ * only return when there are no more events remaining to process, or after
+ * terminateLoopSoon() has been called.
+ *
+ * loop() may be called again to restart event processing after a previous
+ * call to loop() or loopForever() has returned.
+ *
+ * Returns true if the loop completed normally (if it processed all
+ * outstanding requests, or if terminateLoopSoon() was called). If an error
+ * occurs waiting for events, false will be returned.
+ */
+ bool loop();
+
+ /**
+ * Runs the event loop.
+ *
+ * loopForever() behaves like loop(), except that it keeps running even
+ * when there are no more user-supplied EventHandlers or AsyncTimeouts
+ * registered. It will only return after terminateLoopSoon() has been
+ * called.
+ *
+ * This is useful for callers that want to wait for other threads to call
+ * runInEventBaseThread(), even when there are no other scheduled events.
+ *
+ * loopForever() may be called again to restart event processing after a
+ * previous call to loop() or loopForever() has returned.
+ *
+ * Throws a std::system_error if an error occurs.
+ */
+ void loopForever();
+
+ /**
+ * Causes the event loop to exit soon.
+ *
+ * This will cause an existing call to loop() or loopForever() to stop event
+ * processing and return, even if there are still events remaining to be
+ * processed.
+ *
+ * It is safe to call terminateLoopSoon() from another thread to cause loop()
+ * to wake up and return in the EventBase loop thread. terminateLoopSoon()
+ * may also be called from the loop thread itself (for example, a
+ * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
+ * cause the loop to exit after the callback returns.)
+ *
+ * Note that the caller is responsible for ensuring that cleanup of all event
+ * callbacks occurs properly. Since terminateLoopSoon() causes the loop to
+ * exit even when there are pending events present, there may be remaining
+ * callbacks present waiting to be invoked. If the loop is later restarted
+ * pending events will continue to be processed normally, however if the
+ * EventBase is destroyed after calling terminateLoopSoon() it is the
+ * caller's responsibility to ensure that cleanup happens properly even if
+ * some outstanding events are never processed.
+ */
+ void terminateLoopSoon();
+
+ /**
+ * Adds the given callback to a queue of things run after the current pass
+ * through the event loop completes. Note that if this callback calls
+ * runInLoop() the new callback won't be called until the main event loop
+ * has gone through a cycle.
+ *
+ * This method may only be called from the EventBase's thread. This
+ * essentially allows an event handler to schedule an additional callback to
+ * be invoked after it returns.
+ *
+ * Use runInEventBaseThread() to schedule functions from another thread.
+ *
+ * The thisIteration parameter makes this callback run in this loop
+ * iteration, instead of the next one, even if called from a
+ * runInLoop callback (normal io callbacks that call runInLoop will
+ * always run in this iteration). This was originally added to
+ * support detachEventBase, as a user callback may have called
+ * terminateLoopSoon(), but we want to make sure we detach. Also,
+ * detachEventBase almost always must be called from the base event
+ * loop to ensure the stack is unwound, since most users of
+ * EventBase are not thread safe.
+ *
+ * Ideally we would not need thisIteration, and instead just use
+ * runInLoop with loop() (instead of terminateLoopSoon).
+ */
+ void runInLoop(LoopCallback* callback, bool thisIteration = false);
+
+ /**
+ * Convenience function to call runInLoop() with a std::function.
+ *
+ * This creates a LoopCallback object to wrap the std::function, and invokes
+ * the std::function when the loop callback fires. This is slightly more
+ * expensive than defining your own LoopCallback, but more convenient in
+ * areas that aren't performance sensitive where you just want to use
+ * std::bind. (std::bind is fairly slow even by itself.)
+ *
+ * This method may only be called from the EventBase's thread. This
+ * essentially allows an event handler to schedule an additional callback to
+ * be invoked after it returns.
+ *
+ * Use runInEventBaseThread() to schedule functions from another thread.
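+ *
+ * Illustrative sketch (doNextIteration is a hypothetical function; this must
+ * be called from the EventBase thread):
+ *
+ *   evb.runInLoop([] { doNextIteration(); });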
+ */
+ void runInLoop(const Cob& c, bool thisIteration = false);
+
+ /**
+ * Run the specified function in the EventBase's thread.
+ *
+ * This method is thread-safe, and may be called from another thread.
+ *
+ * If runInEventBaseThread() is called when the EventBase loop is not
+ * running, the function call will be delayed until the next time the loop is
+ * started.
+ *
+ * If runInEventBaseThread() returns true the function has successfully been
+ * scheduled to run in the loop thread. However, if the loop is terminated
+ * (and never later restarted) before it has a chance to run the requested
+ * function, the function may never be run at all. The caller is responsible
+ * for handling this situation correctly if they may terminate the loop with
+ * outstanding runInEventBaseThread() calls pending.
+ *
+ * If two calls to runInEventBaseThread() are made from the same thread, the
+ * functions will always be run in the order that they were scheduled.
+ * Ordering between functions scheduled from separate threads is not
+ * guaranteed.
+ *
+ * @param fn The function to run. The function must not throw any
+ * exceptions.
+ * @param arg An argument to pass to the function.
+ *
+ * @return Returns true if the function was successfully scheduled, or false
+ * if there was an error scheduling the function.
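+ *
+ * Illustrative sketch (onWakeup and counter are hypothetical names):
+ *
+ *   static void onWakeup(int* counter) { ++*counter; }
+ *
+ *   // From any thread:
+ *   evb.runInEventBaseThread(&onWakeup, &counter);
+ *   // or, less efficiently, with a std::function / lambda (see below):
+ *   evb.runInEventBaseThread([&] { ++counter; });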
+ */
+ template<typename T>
+ bool runInEventBaseThread(void (*fn)(T*), T* arg) {
+ return runInEventBaseThread(reinterpret_cast<void (*)(void*)>(fn),
+ reinterpret_cast<void*>(arg));
+ }
+
+ bool runInEventBaseThread(void (*fn)(void*), void* arg);
+
+ /**
+ * Run the specified function in the EventBase's thread
+ *
+ * This version of runInEventBaseThread() takes a std::function object.
+ * Note that this is less efficient than the version that takes a plain
+ * function pointer and void* argument, as it has to allocate memory to copy
+ * the std::function object.
+ *
+ * If the EventBase loop is terminated before it has a chance to run this
+ * function, the allocated memory will be leaked. The caller is responsible
+ * for ensuring that the EventBase loop is not terminated before this
+ * function can run.
+ *
+ * The function must not throw any exceptions.
+ */
+ bool runInEventBaseThread(const Cob& fn);
+
+ /**
+ * Runs the given Cob at some time after the specified number of
+ * milliseconds. (No guarantees exactly when.)
+ *
+ * @return true iff the cob was successfully registered.
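+ *
+ * Illustrative sketch (the logging body is hypothetical):
+ *
+ *   evb.runAfterDelay([] { LOG(INFO) << "50ms elapsed"; }, 50);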
+ */
+ bool runAfterDelay(
+ const Cob& c,
+ int milliseconds,
+ TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL);
+
+ /**
+ * Set the maximum desired latency in us and provide a callback which will be
+ * called when that latency is exceeded.
+ */
+ void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
+ maxLatency_ = maxLatency;
+ maxLatencyCob_ = maxLatencyCob;
+ }
+
+ /**
+ * Set smoothing coefficient for loop load average; # of milliseconds
+ * for exp(-1) (1/2.71828...) decay.
+ */
+ void setLoadAvgMsec(uint32_t ms);
+
+ /**
+ * reset the load average to a desired value
+ */
+ void resetLoadAvg(double value = 0.0);
+
+ /**
+ * Get the average loop time in microseconds (an exponentially-smoothed ave)
+ */
+ double getAvgLoopTime() const {
+ return avgLoopTime_.get();
+ }
+
+ /**
+ * check if the event base loop is running.
+ */
+ bool isRunning() const {
+ return loopThread_.load(std::memory_order_relaxed) != 0;
+ }
+
+ /**
+ * wait until the event loop starts (after starting the event loop thread).
+ */
+ void waitUntilRunning();
+
+ int getNotificationQueueSize() const;
+
+ /**
+ * Verify that the current thread is the EventBase thread, if the EventBase is
+ * running.
+ */
+ bool isInEventBaseThread() const {
+ auto tid = loopThread_.load(std::memory_order_relaxed);
+ return tid == 0 || pthread_equal(tid, pthread_self());
+ }
+
+ bool inRunningEventBaseThread() const {
+ return pthread_equal(
+ loopThread_.load(std::memory_order_relaxed), pthread_self());
+ }
+
+ // --------- interface to underlying libevent base ------------
+ // Avoid using these functions if possible. These functions are not
+ // guaranteed to always be present if we ever provide alternative EventBase
+ // implementations that do not use libevent internally.
+ event_base* getLibeventBase() const { return evb_; }
+ static const char* getLibeventVersion() { return event_get_version(); }
+ static const char* getLibeventMethod() { return event_get_method(); }
+
+ /**
+ * only EventHandler/AsyncTimeout subclasses and ourselves should
+ * ever call this.
+ *
+ * This is used to mark the beginning of a new loop cycle by the
+ * first handler fired within that cycle.
+ *
+ */
+ bool bumpHandlingTime();
+
+ class SmoothLoopTime {
+ public:
+ explicit SmoothLoopTime(uint64_t timeInterval)
+ : expCoeff_(-1.0/timeInterval)
+ , value_(0.0)
+ , oldBusyLeftover_(0) {
+ VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
+ }
+
+ void setTimeInterval(uint64_t timeInterval);
+ void reset(double value = 0.0);
+
+ void addSample(int64_t idle, int64_t busy);
+
+ double get() const {
+ return value_;
+ }
+
+ void dampen(double factor) {
+ value_ *= factor;
+ }
+
+ private:
+ double expCoeff_;
+ double value_;
+ int64_t oldBusyLeftover_;
+ };
+
+ void setObserver(
+ const std::shared_ptr<EventBaseObserver>& observer) {
+ observer_ = observer;
+ }
+
+ const std::shared_ptr<EventBaseObserver>& getObserver() {
+ return observer_;
+ }
+
+ /**
+ * Set the name of the thread that runs this event base.
+ */
+ void setName(const std::string& name);
+
+ /**
+ * Returns the name of the thread that runs this event base.
+ */
+ const std::string& getName();
+
+ private:
+
+ // TimeoutManager
+ void attachTimeoutManager(AsyncTimeout* obj,
+ TimeoutManager::InternalEnum internal);
+
+ void detachTimeoutManager(AsyncTimeout* obj);
+
+ bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout);
+
+ void cancelTimeout(AsyncTimeout* obj);
+
+ bool isInTimeoutManagerThread() {
+ return isInEventBaseThread();
+ }
+
+ // Helper class used to short circuit runInEventBaseThread
+ class RunInLoopCallback : public LoopCallback {
+ public:
+ RunInLoopCallback(void (*fn)(void*), void* arg);
+ void runLoopCallback() noexcept;
+
+ private:
+ void (*fn_)(void*);
+ void* arg_;
+ };
+
+ /*
+ * Helper function that tells us whether we have already handled
+ * some event/timeout/callback in this loop iteration.
+ */
+ bool nothingHandledYet();
+
+ // --------- libevent callbacks (not for client use) ------------
+
+ static void runTr1FunctionPtr(std::function<void()>* fn);
+
+ // small object used as a callback arg with enough info to execute the
+ // appropriate client-provided Cob
+ class CobTimeout : public AsyncTimeout {
+ public:
+ CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
+ : AsyncTimeout(b, in), cob_(c) {}
+
+ virtual void timeoutExpired() noexcept;
+
+ private:
+ Cob cob_;
+
+ public:
+ typedef boost::intrusive::list_member_hook<
+ boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
+
+ ListHook hook;
+
+ typedef boost::intrusive::list<
+ CobTimeout,
+ boost::intrusive::member_hook<CobTimeout, ListHook, &CobTimeout::hook>,
+ boost::intrusive::constant_time_size<false> > List;
+ };
+
+ typedef LoopCallback::List LoopCallbackList;
+ class FunctionRunner;
+
+ // executes any callbacks queued by runInLoop(); returns false if none found
+ bool runLoopCallbacks(bool setContext = true);
+
+ void initNotificationQueue();
+
+ CobTimeout::List pendingCobTimeouts_;
+
+ LoopCallbackList loopCallbacks_;
+
+ // This will be null most of the time, but point to currentCallbacks
+ // if we are in the middle of running loop callbacks, such that
+ // runInLoop(..., true) will always run in the current loop
+ // iteration.
+ LoopCallbackList* runOnceCallbacks_;
+
+ // stop_ is set by terminateLoopSoon() and is used by the main loop
+ // to determine if it should exit
+ bool stop_;
+
+ // The ID of the thread running the main loop.
+ // 0 if loop is not running.
+ // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
+ // even that atomic<pthread_t> is valid), but that's how it is
+ // everywhere (at least on Linux, FreeBSD, and OSX).
+ std::atomic<pthread_t> loopThread_;
+
+ // pointer to underlying event_base class doing the heavy lifting
+ event_base* evb_;
+
+ // A notification queue for runInEventBaseThread() to use
+ // to send function requests to the EventBase thread.
+ std::unique_ptr<NotificationQueue<std::pair<void (*)(void*), void*>>> queue_;
+ std::unique_ptr<FunctionRunner> fnRunner_;
+
+ // limit for latency in microseconds (0 disables)
+ int64_t maxLatency_;
+
+ // exponentially-smoothed average loop time for latency-limiting
+ SmoothLoopTime avgLoopTime_;
+
+ // smoothed loop time used to invoke latency callbacks; differs from
+ // avgLoopTime_ in that it's scaled down after triggering a callback
+ // to reduce spamminess
+ SmoothLoopTime maxLatencyLoopTime_;
+
+ // callback called when latency limit is exceeded
+ Cob maxLatencyCob_;
+
+ // we'll wait this long before running deferred callbacks if the event
+ // loop is idle.
+ static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
+
+ // Wrap-around loop counter to detect beginning of each loop
+ uint64_t nextLoopCnt_;
+ uint64_t latestLoopCnt_;
+ uint64_t startWork_;
+
+ // Observer to export counters
+ std::shared_ptr<EventBaseObserver> observer_;
+ uint32_t observerSampleCount_;
+
+ // Name of the thread running this EventBase
+ std::string name_;
+};
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Work around the lack of <sys/eventfd.h> on glibc 2.5.1 which we still
+ * need to support, sigh.
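+ *
+ * Either way, callers can use the usual eventfd interface (sketch):
+ *
+ *   int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);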
+ */
+
+#pragma once
+
+#include <features.h>
+
+// <sys/eventfd.h> doesn't exist on older glibc versions
+#if (defined(__GLIBC__) && __GLIBC_PREREQ(2, 9))
+#include <sys/eventfd.h>
+#else /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */
+
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+// Use existing __NR_eventfd2 if already defined
+// Values from the Linux kernel source:
+// arch/x86/include/asm/unistd_{32,64}.h
+#ifndef __NR_eventfd2
+#if defined(__x86_64__)
+#define __NR_eventfd2 290
+#elif defined(__i386__)
+#define __NR_eventfd2 328
+#else
+#error "Can't define __NR_eventfd2 for your architecture."
+#endif
+#endif
+
+enum
+ {
+ EFD_SEMAPHORE = 1,
+#define EFD_SEMAPHORE EFD_SEMAPHORE
+ EFD_CLOEXEC = 02000000,
+#define EFD_CLOEXEC EFD_CLOEXEC
+ EFD_NONBLOCK = 04000
+#define EFD_NONBLOCK EFD_NONBLOCK
+ };
+
+// http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
+// Use the eventfd2 system call, as in glibc 2.9+
+// (requires kernel 2.6.30+)
+#define eventfd(initval, flags) syscall(__NR_eventfd2,(initval),(flags))
+
+#endif /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "folly/io/async/EventHandler.h"
+#include "folly/io/async/EventBase.h"
+
+#include <assert.h>
+
+namespace folly {
+
+EventHandler::EventHandler(EventBase* eventBase, int fd) {
+ event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
+ if (eventBase != nullptr) {
+ setEventBase(eventBase);
+ } else {
+ // Callers must set the EventBase and fd before using this handler.
+ // Set event_.ev_base to nullptr to ensure that this happens.
+ // (otherwise libevent will initialize it to the "default" event_base)
+ event_.ev_base = nullptr;
+ eventBase_ = nullptr;
+ }
+}
+
+EventHandler::~EventHandler() {
+ unregisterHandler();
+}
+
+bool EventHandler::registerImpl(uint16_t events, bool internal) {
+ assert(event_.ev_base != nullptr);
+
+ // We have to unregister the event before we can change the event flags
+ if (isHandlerRegistered()) {
+ // If the new events are the same as the already registered
+ // flags, we don't have to do anything. Just return.
+ if (events == event_.ev_events &&
+ static_cast<bool>(event_.ev_flags & EVLIST_INTERNAL) == internal) {
+ return true;
+ }
+
+ event_del(&event_);
+ }
+
+ // Update the event flags
+ // Unfortunately, event_set() resets the event_base, so we have to remember
+ // it beforehand, then pass it back into event_base_set() afterwards
+ struct event_base* evb = event_.ev_base;
+ event_set(&event_, event_.ev_fd, events,
+ &EventHandler::libeventCallback, this);
+ event_base_set(evb, &event_);
+
+ // Set EVLIST_INTERNAL if this is an internal event
+ if (internal) {
+ event_.ev_flags |= EVLIST_INTERNAL;
+ }
+
+ // Add the event.
+ //
+ // Although libevent allows events to wait on both I/O and a timeout,
+ // we intentionally don't allow an EventHandler to also use a timeout.
+ // Callers must maintain a separate AsyncTimeout object if they want a
+ // timeout.
+ //
+ // Otherwise, it is difficult to handle persistent events properly. (The I/O
+ // event and timeout may both fire together in the same pass around the event
+ // loop. Normally we would want to inform the caller of the I/O event first,
+ // then the timeout. However, it is difficult to do this properly since the
+ // I/O callback could delete the EventHandler.) Additionally, if a caller
+ // uses the same struct event for both I/O and timeout, and they just want to
+ // reschedule the timeout, libevent currently makes an epoll_ctl() call even
+ // if the I/O event flags haven't changed. Using a separate event struct is
+ // therefore slightly more efficient in this case (although it does take up
+ // more space).
+ if (event_add(&event_, nullptr) < 0) {
+ LOG(ERROR) << "EventBase: failed to register event handler for fd "
+ << event_.ev_fd << ": " << strerror(errno);
+ // Call event_del() to make sure the event is completely uninstalled
+ event_del(&event_);
+ return false;
+ }
+
+ return true;
+}
+
+void EventHandler::unregisterHandler() {
+ if (isHandlerRegistered()) {
+ event_del(&event_);
+ }
+}
+
+void EventHandler::attachEventBase(EventBase* eventBase) {
+ // attachEventBase() may only be called on detached handlers
+ assert(event_.ev_base == nullptr);
+ assert(!isHandlerRegistered());
+ // This must be invoked from the EventBase's thread
+ assert(eventBase->isInEventBaseThread());
+
+ setEventBase(eventBase);
+}
+
+void EventHandler::detachEventBase() {
+ ensureNotRegistered(__func__);
+ event_.ev_base = nullptr;
+}
+
+void EventHandler::changeHandlerFD(int fd) {
+ ensureNotRegistered(__func__);
+ // event_set() resets event_.ev_base, so manually restore it afterwards
+ struct event_base* evb = event_.ev_base;
+ event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
+ event_.ev_base = evb; // don't use event_base_set(), since evb may be nullptr
+}
+
+void EventHandler::initHandler(EventBase* eventBase, int fd) {
+ ensureNotRegistered(__func__);
+ event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
+ setEventBase(eventBase);
+}
+
+void EventHandler::ensureNotRegistered(const char* fn) {
+ // Neither the EventBase nor file descriptor may be changed while the
+ // handler is registered. Treat it as a programmer bug and abort the program
+ // if this requirement is violated.
+ if (isHandlerRegistered()) {
+ LOG(ERROR) << fn << " called on registered handler; aborting";
+ abort();
+ }
+}
+
+void EventHandler::libeventCallback(int fd, short events, void* arg) {
+ EventHandler* handler = reinterpret_cast<EventHandler*>(arg);
+ assert(fd == handler->event_.ev_fd);
+
+ // this can't possibly fire if handler->eventBase_ is nullptr
+ (void) handler->eventBase_->bumpHandlingTime();
+
+ handler->handlerReady(events);
+}
+
+void EventHandler::setEventBase(EventBase* eventBase) {
+ event_base_set(eventBase->getLibeventBase(), &event_);
+ eventBase_ = eventBase;
+}
+
+bool EventHandler::isPending() {
+ if (event_.ev_flags & EVLIST_ACTIVE) {
+ if (event_.ev_res & EV_READ) {
+ return true;
+ }
+ }
+ return false;
+}
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <glog/logging.h>
+#include "folly/io/async/EventUtil.h"
+#include <boost/noncopyable.hpp>
+#include <stddef.h>
+
+namespace folly {
+
+class EventBase;
+
+/**
+ * The EventHandler class is used to asynchronously wait for events on a file
+ * descriptor.
+ *
+ * Users that wish to wait on I/O events should derive from EventHandler and
+ * implement the handlerReady() method.
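 + *
 + * A minimal usage sketch (illustrative only; EchoHandler, eventBase, and
 + * sockFd are hypothetical names, not part of this API):
 + *
 + *   class EchoHandler : public EventHandler {
 + *    public:
 + *     EchoHandler(EventBase* evb, int fd)
 + *       : EventHandler(evb, fd), fd_(fd) {}
 + *
 + *     void handlerReady(uint16_t events) noexcept override {
 + *       if (events & READ) {
 + *         char buf[4096];
 + *         ssize_t n = ::read(fd_, buf, sizeof(buf));
 + *         if (n > 0) {
 + *           ::write(fd_, buf, n);  // echo the data back
 + *         }
 + *       }
 + *     }
 + *
 + *    private:
 + *     int fd_;
 + *   };
 + *
 + *   // Register for readability; PERSIST keeps the handler registered after
 + *   // each callback until unregisterHandler() is called.
 + *   EchoHandler handler(&eventBase, sockFd);
 + *   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);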
+ */
+class EventHandler : private boost::noncopyable {
+ public:
+ enum EventFlags {
+ NONE = 0,
+ READ = EV_READ,
+ WRITE = EV_WRITE,
+ READ_WRITE = (READ | WRITE),
+ PERSIST = EV_PERSIST
+ };
+
+ /**
+ * Create a new EventHandler object.
+ *
+ * @param eventBase The EventBase to use to drive this event handler.
+ * This may be nullptr, in which case the EventBase must be
+ * set separately using initHandler() or attachEventBase()
+ * before the handler can be registered.
+ * @param fd The file descriptor that this EventHandler will
+ * monitor. This may be -1, in which case the file
+ * descriptor must be set separately using initHandler() or
+ * changeHandlerFD() before the handler can be registered.
+ */
+ explicit EventHandler(EventBase* eventBase = nullptr, int fd = -1);
+
+ /**
+ * EventHandler destructor.
+ *
+ * The event will be automatically unregistered if it is still registered.
+ */
+ virtual ~EventHandler();
+
+ /**
+ * handlerReady() is invoked when the handler is ready.
+ *
+ * @param events A bitset indicating the events that are ready.
+ */
+ virtual void handlerReady(uint16_t events) noexcept = 0;
+
+ /**
+ * Register the handler.
+ *
+ * If the handler is already registered, the registration will be updated
+ * to wait on the new set of events.
+ *
+ * @param events A bitset specifying the events to monitor.
+ * If the PERSIST bit is set, the handler will remain
+ * registered even after handlerReady() is called.
+ *
+ * @return Returns true if the handler was successfully registered,
+ * or false if an error occurred. After an error, the handler is
+ * always unregistered, even if it was already registered prior to
+ * this call to registerHandler().
+ */
+ bool registerHandler(uint16_t events) {
+ return registerImpl(events, false);
+ }
+
+ /**
+ * Unregister the handler, if it is registered.
+ */
+ void unregisterHandler();
+
+ /**
+ * Returns true if the handler is currently registered.
+ */
+ bool isHandlerRegistered() const {
+ return EventUtil::isEventRegistered(&event_);
+ }
+
+ /**
+   * Attach the handler to an EventBase.
+ *
+   * This may only be called if the handler is not currently attached to an
+   * EventBase (either because it was created with the default constructor, or
+   * because detachEventBase() has been called).
+ *
+ * This method must be invoked in the EventBase's thread.
+ */
+ void attachEventBase(EventBase* eventBase);
+
+ /**
+ * Detach the handler from its EventBase.
+ *
+ * This may only be called when the handler is not currently registered.
+ * Once detached, the handler may not be registered again until it is
+   * re-attached to an EventBase by calling attachEventBase().
+ *
+ * This method must be called from the current EventBase's thread.
+ */
+ void detachEventBase();
+
+ /**
+ * Change the file descriptor that this handler is associated with.
+ *
+ * This may only be called when the handler is not currently registered.
+ */
+ void changeHandlerFD(int fd);
+
+ /**
+   * Attach the handler to an EventBase, and change the file descriptor.
+ *
+ * This method may only be called if the handler is not currently attached to
+   * an EventBase. This is primarily intended to be used to initialize
+ * EventHandler objects created using the default constructor.
+ */
+ void initHandler(EventBase* eventBase, int fd);
+
+ /**
+ * Return the set of events that we're currently registered for.
+ */
+ uint16_t getRegisteredEvents() const {
+    return isHandlerRegistered() ? event_.ev_events : 0;
+ }
+
+ /**
+ * Register the handler as an internal event.
+ *
+   * This event will not count as an active event when determining whether the
+   * EventBase loop has more events to process. The EventBase loop runs only as
+   * long as there are active EventHandlers; "internal" event handlers are not
+   * counted. Therefore this event handler will not prevent the EventBase loop
+   * from exiting when no other non-internal event handlers are registered.
+ *
+ * This is intended to be used only in very rare cases by the internal
+ * EventBase code. This API is not guaranteed to remain stable or portable
+ * in the future.
+ */
+ bool registerInternalHandler(uint16_t events) {
+ return registerImpl(events, true);
+ }
+
+ bool isPending();
+
+ private:
+ bool registerImpl(uint16_t events, bool internal);
+ void ensureNotRegistered(const char* fn);
+
+ void setEventBase(EventBase* eventBase);
+
+ static void libeventCallback(int fd, short events, void* arg);
+
+ struct event event_;
+ EventBase* eventBase_;
+};
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <event.h> // libevent
+
+namespace folly {
+
+/**
+ * low-level libevent utility functions
+ */
+class EventUtil {
+ public:
+ static bool isEventRegistered(const struct event* ev) {
+ // If any of these flags are set, the event is registered.
+ enum {
+ EVLIST_REGISTERED = (EVLIST_INSERTED | EVLIST_ACTIVE |
+ EVLIST_TIMEOUT | EVLIST_SIGNAL)
+ };
+ return (ev->ev_flags & EVLIST_REGISTERED);
+ }
+};
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "folly/io/async/EventBase.h"
+#include "folly/io/async/EventFDWrapper.h"
+#include "folly/io/async/EventHandler.h"
+#include "folly/io/async/Request.h"
+#include "folly/Likely.h"
+#include "folly/SmallLocks.h"
+
+#include <glog/logging.h>
+#include <deque>
+
+namespace folly {
+
+/**
+ * A producer-consumer queue for passing messages between EventBase threads.
+ *
+ * Messages can be added to the queue from any thread. Multiple consumers may
+ * listen to the queue from multiple EventBase threads.
+ *
+ * A NotificationQueue may not be destroyed while there are still consumers
+ * registered to receive events from the queue. It is the user's
+ * responsibility to ensure that all consumers are unregistered before the
+ * queue is destroyed.
+ *
+ * MessageT should be MoveConstructible (i.e., must support either a move
+ * constructor or a copy constructor, or both). Ideally its move constructor
+ * (or copy constructor if no move constructor is provided) should never throw
+ * exceptions. If the constructor may throw, the consumers could end up
+ * spinning trying to move a message off the queue and failing, and then
+ * retrying.
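 + *
 + * A rough usage sketch, assuming a single EventBase thread consuming
 + * std::string messages (QueueConsumer and evb are hypothetical names used
 + * only for illustration):
 + *
 + *   class QueueConsumer
 + *       : public NotificationQueue<std::string>::Consumer {
 + *    public:
 + *     void messageAvailable(std::string&& msg) override {
 + *       LOG(INFO) << "got message: " << msg;
 + *     }
 + *   };
 + *
 + *   NotificationQueue<std::string> queue;
 + *   QueueConsumer consumer;
 + *   consumer.startConsuming(&evb, &queue);  // in the EventBase thread
 + *   queue.putMessage("hello");              // from any thread
 + *   consumer.stopConsuming();               // before destroying the queue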
+ */
+template<typename MessageT>
+class NotificationQueue {
+ public:
+ /**
+ * A callback interface for consuming messages from the queue as they arrive.
+ */
+ class Consumer : private EventHandler {
+ public:
+ enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
+
+    Consumer()
+      : queue_(nullptr),
+        destroyedFlagPtr_(nullptr),
+        maxReadAtOnce_(kDefaultMaxReadAtOnce),
+        base_(nullptr) {}
+
+ virtual ~Consumer();
+
+ /**
+     * messageAvailable() will be invoked whenever a new
+     * message is available from the queue.
+ */
+ virtual void messageAvailable(MessageT&& message) = 0;
+
+ /**
+ * Begin consuming messages from the specified queue.
+ *
+ * messageAvailable() will be called whenever a message is available. This
+ * consumer will continue to consume messages until stopConsuming() is
+ * called.
+ *
+ * A Consumer may only consume messages from a single NotificationQueue at
+ * a time. startConsuming() should not be called if this consumer is
+ * already consuming.
+ */
+ void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
+ init(eventBase, queue);
+ registerHandler(READ | PERSIST);
+ }
+
+ /**
+ * Same as above but registers this event handler as internal so that it
+     * doesn't count towards the pending reader count for the EventBase loop.
+ */
+ void startConsumingInternal(
+ EventBase* eventBase, NotificationQueue* queue) {
+ init(eventBase, queue);
+ registerInternalHandler(READ | PERSIST);
+ }
+
+ /**
+ * Stop consuming messages.
+ *
+ * startConsuming() may be called again to resume consumption of messages
+ * at a later point in time.
+ */
+ void stopConsuming();
+
+ /**
+ * Get the NotificationQueue that this consumer is currently consuming
+ * messages from. Returns nullptr if the consumer is not currently
+ * consuming events from any queue.
+ */
+ NotificationQueue* getCurrentQueue() const {
+ return queue_;
+ }
+
+ /**
+ * Set a limit on how many messages this consumer will read each iteration
+ * around the event loop.
+ *
+ * This helps rate-limit how much work the Consumer will do each event loop
+ * iteration, to prevent it from starving other event handlers.
+ *
+ * A limit of 0 means no limit will be enforced. If unset, the limit
+     * defaults to kDefaultMaxReadAtOnce (defined as 10 above).
+ */
+ void setMaxReadAtOnce(uint32_t maxAtOnce) {
+ maxReadAtOnce_ = maxAtOnce;
+ }
+ uint32_t getMaxReadAtOnce() const {
+ return maxReadAtOnce_;
+ }
+
+ EventBase* getEventBase() {
+ return base_;
+ }
+
+ virtual void handlerReady(uint16_t events) noexcept;
+
+ private:
+ void init(EventBase* eventBase, NotificationQueue* queue);
+
+ NotificationQueue* queue_;
+ bool* destroyedFlagPtr_;
+ uint32_t maxReadAtOnce_;
+ EventBase* base_;
+ };
+
+ enum class FdType {
+ EVENTFD,
+ PIPE
+ };
+
+ /**
+ * Create a new NotificationQueue.
+ *
+ * If the maxSize parameter is specified, this sets the maximum queue size
+ * that will be enforced by tryPutMessage(). (This size is advisory, and may
+ * be exceeded if producers explicitly use putMessage() instead of
+ * tryPutMessage().)
+ *
+ * The fdType parameter determines the type of file descriptor used
+ * internally to signal message availability. The default (eventfd) is
+ * preferable for performance and because it won't fail when the queue gets
+   * too long. It is not available on older or non-Linux kernels, however,
+   * in which case the code will fall back to using a pipe. The parameter is
+   * mostly useful for testing purposes.
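 +   *
 +   * For example, to force the pipe fallback in a test (sketch only):
 +   *
 +   *   NotificationQueue<int> queue(0, NotificationQueue<int>::FdType::PIPE);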
+ */
+ explicit NotificationQueue(uint32_t maxSize = 0,
+ FdType fdType = FdType::EVENTFD)
+ : eventfd_(-1),
+ pipeFds_{-1, -1},
+ advisoryMaxQueueSize_(maxSize),
+ pid_(getpid()),
+ queue_() {
+
+ spinlock_.init();
+
+ RequestContext::getStaticContext();
+
+ if (fdType == FdType::EVENTFD) {
+ eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
+ if (eventfd_ == -1) {
+ if (errno == ENOSYS || errno == EINVAL) {
+        // eventfd not available
+ LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
+ << errno << ", falling back to pipe mode (is your kernel "
+ << "> 2.6.30?)";
+ fdType = FdType::PIPE;
+ } else {
+ // some other error
+ folly::throwSystemError("Failed to create eventfd for "
+ "NotificationQueue", errno);
+ }
+ }
+ }
+ if (fdType == FdType::PIPE) {
+ if (pipe(pipeFds_)) {
+ folly::throwSystemError("Failed to create pipe for NotificationQueue",
+ errno);
+ }
+ try {
+ // put both ends of the pipe into non-blocking mode
+ if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
+ folly::throwSystemError("failed to put NotificationQueue pipe read "
+ "endpoint into non-blocking mode", errno);
+ }
+ if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
+ folly::throwSystemError("failed to put NotificationQueue pipe write "
+ "endpoint into non-blocking mode", errno);
+ }
+ } catch (...) {
+ ::close(pipeFds_[0]);
+ ::close(pipeFds_[1]);
+ throw;
+ }
+ }
+ }
+
+ ~NotificationQueue() {
+ if (eventfd_ >= 0) {
+ ::close(eventfd_);
+ eventfd_ = -1;
+ }
+ if (pipeFds_[0] >= 0) {
+ ::close(pipeFds_[0]);
+ pipeFds_[0] = -1;
+ }
+ if (pipeFds_[1] >= 0) {
+ ::close(pipeFds_[1]);
+ pipeFds_[1] = -1;
+ }
+ }
+
+ /**
+ * Set the advisory maximum queue size.
+ *
+ * This maximum queue size affects calls to tryPutMessage(). Message
+ * producers can still use the putMessage() call to unconditionally put a
+ * message on the queue, ignoring the configured maximum queue size. This
+ * can cause the queue size to exceed the configured maximum.
+ */
+ void setMaxQueueSize(uint32_t max) {
+ advisoryMaxQueueSize_ = max;
+ }
+
+ /**
+ * Attempt to put a message on the queue if the queue is not already full.
+ *
+ * If the queue is full, a std::overflow_error will be thrown. The
+ * setMaxQueueSize() function controls the maximum queue size.
+ *
+ * This method may contend briefly on a spinlock if many threads are
+ * concurrently accessing the queue, but for all intents and purposes it will
+ * immediately place the message on the queue and return.
+ *
+ * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
+ * may throw any other exception thrown by the MessageT move/copy
+ * constructor.
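 +   *
 +   * For example (sketch only; how a full queue is handled is up to the
 +   * caller):
 +   *
 +   *   try {
 +   *     queue.tryPutMessage(std::move(msg));
 +   *   } catch (const std::overflow_error&) {
 +   *     // the queue is full; drop the message or apply back-pressure
 +   *   }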
+ */
+ void tryPutMessage(MessageT&& message) {
+ putMessageImpl(std::move(message), advisoryMaxQueueSize_);
+ }
+ void tryPutMessage(const MessageT& message) {
+ putMessageImpl(message, advisoryMaxQueueSize_);
+ }
+
+ /**
+ * No-throw versions of the above. Instead returns true on success, false on
+ * failure.
+ *
+   * Only std::overflow_error is prevented from being thrown (since this is the
+   * common failure case); user code must still catch std::bad_alloc errors.
+ */
+ bool tryPutMessageNoThrow(MessageT&& message) {
+ return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
+ }
+ bool tryPutMessageNoThrow(const MessageT& message) {
+ return putMessageImpl(message, advisoryMaxQueueSize_, false);
+ }
+
+ /**
+ * Unconditionally put a message on the queue.
+ *
+ * This method is like tryPutMessage(), but ignores the maximum queue size
+ * and always puts the message on the queue, even if the maximum queue size
+ * would be exceeded.
+ *
+ * putMessage() may throw std::bad_alloc if memory allocation fails, and may
+ * throw any other exception thrown by the MessageT move/copy constructor.
+ */
+ void putMessage(MessageT&& message) {
+ putMessageImpl(std::move(message), 0);
+ }
+ void putMessage(const MessageT& message) {
+ putMessageImpl(message, 0);
+ }
+
+ /**
+ * Put several messages on the queue.
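 +   *
 +   * For example (sketch only; buildMessages() is a hypothetical helper):
 +   *
 +   *   std::vector<MessageT> msgs = buildMessages();
 +   *   queue.putMessages(msgs.begin(), msgs.end());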
+ */
+ template<typename InputIteratorT>
+ void putMessages(InputIteratorT first, InputIteratorT last) {
+ typedef typename std::iterator_traits<InputIteratorT>::iterator_category
+ IterCategory;
+ putMessagesImpl(first, last, IterCategory());
+ }
+
+ /**
+ * Try to immediately pull a message off of the queue, without blocking.
+ *
+ * If a message is immediately available, the result parameter will be
+ * updated to contain the message contents and true will be returned.
+ *
+ * If no message is available, false will be returned and result will be left
+ * unmodified.
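 +   *
 +   * For example (sketch only; assumes MessageT is default-constructible and
 +   * process() is a hypothetical helper):
 +   *
 +   *   MessageT msg;
 +   *   while (queue.tryConsume(msg)) {
 +   *     process(msg);
 +   *   }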
+ */
+ bool tryConsume(MessageT& result) {
+ checkPid();
+ if (!tryConsumeEvent()) {
+ return false;
+ }
+
+ try {
+
+ folly::MSLGuard g(spinlock_);
+
+ // This shouldn't happen normally. See the comments in
+ // Consumer::handlerReady() for more details.
+ if (UNLIKELY(queue_.empty())) {
+ LOG(ERROR) << "found empty queue after signalled event";
+ return false;
+ }
+
+ auto data = std::move(queue_.front());
+ result = data.first;
+ RequestContext::setContext(data.second);
+
+ queue_.pop_front();
+ } catch (...) {
+ // Handle an exception if the assignment operator happens to throw.
+ // We consumed an event but weren't able to pop the message off the
+ // queue. Signal the event again since the message is still in the
+ // queue.
+ signalEvent(1);
+ throw;
+ }
+
+ return true;
+ }
+
+ int size() {
+ folly::MSLGuard g(spinlock_);
+ return queue_.size();
+ }
+
+ /**
+ * Check that the NotificationQueue is being used from the correct process.
+ *
+ * If you create a NotificationQueue in one process, then fork, and try to
+ * send messages to the queue from the child process, you're going to have a
+ * bad time. Unfortunately users have (accidentally) run into this.
+ *
+ * Because we use an eventfd/pipe, the child process can actually signal the
+ * parent process that an event is ready. However, it can't put anything on
+ * the parent's queue, so the parent wakes up and finds an empty queue. This
+ * check ensures that we catch the problem in the misbehaving child process
+ * code, and crash before signalling the parent process.
+ */
+ void checkPid() const {
+ CHECK_EQ(pid_, getpid());
+ }
+
+ private:
+ // Forbidden copy constructor and assignment operator
+ NotificationQueue(NotificationQueue const &) = delete;
+ NotificationQueue& operator=(NotificationQueue const &) = delete;
+
+ inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
+ DCHECK(0 == spinlock_.try_lock());
+ if (maxSize > 0 && queue_.size() >= maxSize) {
+ if (throws) {
+ throw std::overflow_error("unable to add message to NotificationQueue: "
+ "queue is full");
+ }
+ return false;
+ }
+ return true;
+ }
+
+ inline void signalEvent(size_t numAdded = 1) const {
+ static const uint8_t kPipeMessage[] = {
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
+ };
+
+ ssize_t bytes_written = 0;
+ ssize_t bytes_expected = 0;
+ if (eventfd_ >= 0) {
+ // eventfd(2) dictates that we must write a 64-bit integer
+ uint64_t numAdded64(numAdded);
+ bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
+ bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
+ } else {
+ // pipe semantics, add one message for each numAdded
+ bytes_expected = numAdded;
+ do {
+ size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
+ ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
+ if (rc < 0) {
+ // TODO: if the pipe is full, write will fail with EAGAIN.
+ // See task #1044651 for how this could be handled
+ break;
+ }
+ numAdded -= rc;
+ bytes_written += rc;
+ } while (numAdded > 0);
+ }
+ if (bytes_written != bytes_expected) {
+ folly::throwSystemError("failed to signal NotificationQueue after "
+ "write", errno);
+ }
+ }
+
+ bool tryConsumeEvent() {
+ uint64_t value = 0;
+ ssize_t rc = -1;
+ if (eventfd_ >= 0) {
+ rc = ::read(eventfd_, &value, sizeof(value));
+ } else {
+ uint8_t value8;
+ rc = ::read(pipeFds_[0], &value8, sizeof(value8));
+ value = value8;
+ }
+ if (rc < 0) {
+ // EAGAIN should pretty much be the only error we can ever get.
+ // This means someone else already processed the only available message.
+ assert(errno == EAGAIN);
+ return false;
+ }
+ assert(value == 1);
+ return true;
+ }
+
+ bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
+ checkPid();
+ {
+ folly::MSLGuard g(spinlock_);
+ if (!checkQueueSize(maxSize, throws)) {
+ return false;
+ }
+ queue_.push_back(
+ std::make_pair(std::move(message),
+ RequestContext::saveContext()));
+ }
+ signalEvent();
+ return true;
+ }
+
+ bool putMessageImpl(
+ const MessageT& message, size_t maxSize, bool throws=true) {
+ checkPid();
+ {
+ folly::MSLGuard g(spinlock_);
+ if (!checkQueueSize(maxSize, throws)) {
+ return false;
+ }
+ queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
+ }
+ signalEvent();
+ return true;
+ }
+
+ template<typename InputIteratorT>
+ void putMessagesImpl(InputIteratorT first, InputIteratorT last,
+ std::input_iterator_tag) {
+ checkPid();
+ size_t numAdded = 0;
+ {
+ folly::MSLGuard g(spinlock_);
+ while (first != last) {
+ queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
+ ++first;
+ ++numAdded;
+ }
+ }
+ signalEvent(numAdded);
+ }
+
+ mutable folly::MicroSpinLock spinlock_;
+ int eventfd_;
+  int pipeFds_[2]; // to fall back to on older / non-Linux systems
+ uint32_t advisoryMaxQueueSize_;
+ pid_t pid_;
+ std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
+};
+
+template<typename MessageT>
+NotificationQueue<MessageT>::Consumer::~Consumer() {
+ // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
+  // will be non-null. Mark the value that it points to, so that
+ // handlerReady() will know the callback is destroyed, and that it cannot
+ // access any member variables anymore.
+ if (destroyedFlagPtr_) {
+ *destroyedFlagPtr_ = true;
+ }
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
+ noexcept {
+ uint32_t numProcessed = 0;
+ while (true) {
+ // Try to decrement the eventfd.
+ //
+ // We decrement the eventfd before checking the queue, and only pop a
+ // message off the queue if we read from the eventfd.
+ //
+ // Reading the eventfd first allows us to not have to hold the spinlock
+ // while accessing the eventfd. If we popped from the queue first, we
+ // would have to hold the lock while reading from or writing to the
+ // eventfd. (Multiple consumers may be woken up from a single eventfd
+ // notification. If we popped from the queue first, we could end up
+ // popping a message from the queue before the eventfd has been notified by
+ // the producer, unless the consumer and producer both held the spinlock
+ // around the entire operation.)
+ if (!queue_->tryConsumeEvent()) {
+ // no message available right now
+ return;
+ }
+
+ // Now pop the message off of the queue.
+ // We successfully consumed the eventfd notification.
+ // There should be a message available for us to consume.
+ //
+ // We have to manually acquire and release the spinlock here, rather than
+ // using SpinLockHolder since the MessageT has to be constructed while
+ // holding the spinlock and available after we release it. SpinLockHolder
+ // unfortunately doesn't provide a release() method. (We can't construct
+ // MessageT first since we have no guarantee that MessageT has a default
+    // constructor.)
+ queue_->spinlock_.lock();
+ bool locked = true;
+
+ try {
+ // The eventfd is incremented once for every message, and only
+ // decremented when a message is popped off. There should always be a
+ // message here to read.
+ if (UNLIKELY(queue_->queue_.empty())) {
+ // Unfortunately we have seen this happen in practice if a user forks
+ // the process, and then the child tries to send a message to a
+ // NotificationQueue being monitored by a thread in the parent.
+ // The child can signal the parent via the eventfd, but won't have been
+ // able to put anything on the parent's queue since it has a separate
+ // address space.
+ //
+ // This is a bug in the sender's code. putMessagesImpl() should cause
+ // the sender to crash now before trying to send a message from the
+ // wrong process. However, just in case let's handle this case in the
+ // consumer without crashing.
+ LOG(ERROR) << "found empty queue after signalled event";
+ queue_->spinlock_.unlock();
+ return;
+ }
+
+ // Pull a message off the queue.
+ auto& data = queue_->queue_.front();
+
+ MessageT msg(std::move(data.first));
+ auto old_ctx =
+ RequestContext::setContext(data.second);
+ queue_->queue_.pop_front();
+
+ // Check to see if the queue is empty now.
+ // We use this as an optimization to see if we should bother trying to
+ // loop again and read another message after invoking this callback.
+ bool wasEmpty = queue_->queue_.empty();
+
+ // Now unlock the spinlock before we invoke the callback.
+ queue_->spinlock_.unlock();
+ locked = false;
+
+ // Call the callback
+ bool callbackDestroyed = false;
+ CHECK(destroyedFlagPtr_ == nullptr);
+ destroyedFlagPtr_ = &callbackDestroyed;
+ messageAvailable(std::move(msg));
+
+ RequestContext::setContext(old_ctx);
+
+ // If the callback was destroyed before it returned, we are done
+ if (callbackDestroyed) {
+ return;
+ }
+ destroyedFlagPtr_ = nullptr;
+
+ // If the callback is no longer installed, we are done.
+ if (queue_ == nullptr) {
+ return;
+ }
+
+ // If we have hit maxReadAtOnce_, we are done.
+ ++numProcessed;
+ if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
+ return;
+ }
+
+ // If the queue was empty before we invoked the callback, it's probable
+ // that it is still empty now. Just go ahead and return, rather than
+ // looping again and trying to re-read from the eventfd. (If a new
+ // message had in fact arrived while we were invoking the callback, we
+ // will simply be woken up the next time around the event loop and will
+ // process the message then.)
+ if (wasEmpty) {
+ return;
+ }
+ } catch (const std::exception& ex) {
+ // This catch block is really just to handle the case where the MessageT
+ // constructor throws. The messageAvailable() callback itself is
+ // declared as noexcept and should never throw.
+ //
+ // If the MessageT constructor does throw we try to handle it as best as
+ // we can, but we can't work miracles. We will just ignore the error for
+ // now and return. The next time around the event loop we will end up
+ // trying to read the message again. If MessageT continues to throw we
+ // will never make forward progress and will keep trying each time around
+ // the event loop.
+ if (locked) {
+ // Unlock the spinlock.
+ queue_->spinlock_.unlock();
+
+ // Push a notification back on the eventfd since we didn't actually
+ // read the message off of the queue.
+ queue_->signalEvent(1);
+ }
+
+ return;
+ }
+ }
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::init(
+ EventBase* eventBase,
+ NotificationQueue* queue) {
+ assert(eventBase->isInEventBaseThread());
+ assert(queue_ == nullptr);
+ assert(!isHandlerRegistered());
+ queue->checkPid();
+
+ base_ = eventBase;
+
+ queue_ = queue;
+ if (queue_->eventfd_ >= 0) {
+ initHandler(eventBase, queue_->eventfd_);
+ } else {
+ initHandler(eventBase, queue_->pipeFds_[0]);
+ }
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::stopConsuming() {
+ if (queue_ == nullptr) {
+ assert(!isHandlerRegistered());
+ return;
+ }
+
+ assert(isHandlerRegistered());
+ unregisterHandler();
+ detachEventBase();
+ queue_ = nullptr;
+}
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "folly/io/async/Request.h"
+
+#ifndef NO_LIB_GFLAGS
+ DEFINE_bool(enable_request_context, true,
+ "Enable collection of per-request queueing stats for thrift");
+#endif
+
+namespace folly {
+
+#ifdef NO_LIB_GFLAGS
+  bool FLAGS_enable_request_context = true;
+#endif
+
+RequestContext* defaultContext;
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+#include <memory>
+#include <glog/logging.h>
+#include "folly/ThreadLocal.h"
+#include "folly/RWSpinLock.h"
+
+/**
+ * In many cases this header is included as a dependency by libraries that do
+ * not need command line flags. gflags is a fairly large dependency, so this
+ * guard lets size-sensitive libraries avoid pulling in gflags (by defining
+ * NO_LIB_GFLAGS) when they don't need it.
+ */
+#ifndef NO_LIB_GFLAGS
+ #include <gflags/gflags.h>
+ DECLARE_bool(enable_request_context);
+#endif
+
+namespace folly {
+
+#ifdef NO_LIB_GFLAGS
+ extern bool FLAGS_enable_request_context;
+#endif
+
+// Some request context that follows an async request through a process
+// Everything in the context must be thread safe
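+//
+// A rough usage sketch (TraceData and the "trace_id" key are hypothetical,
+// shown only to illustrate the API):
+//
+//   class TraceData : public RequestData {
+//    public:
+//     explicit TraceData(uint64_t id) : id(id) {}
+//     uint64_t id;
+//   };
+//
+//   RequestContext::create();  // start a fresh context for this request
+//   RequestContext::get()->setContextData(
+//       "trace_id", std::unique_ptr<RequestData>(new TraceData(42)));
+//
+//   // ... later, possibly on another thread that adopted the context ...
+//   auto* data = static_cast<TraceData*>(
+//       RequestContext::get()->getContextData("trace_id"));
+//   if (data != nullptr) {
+//     LOG(INFO) << "trace id " << data->id;
+//   }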
+
+class RequestData {
+ public:
+ virtual ~RequestData() {}
+};
+
+class RequestContext;
+
+// If you do not call create() to create a unique request context,
+// this default request context will always be returned, and is never
+// copied between threads.
+extern RequestContext* defaultContext;
+
+class RequestContext {
+ public:
+  // Create a unique request context for this request.
+ // It will be passed between queues / threads (where implemented),
+ // so it should be valid for the lifetime of the request.
+ static bool create() {
+    if (!FLAGS_enable_request_context) {
+ return false;
+ }
+ bool prev = getStaticContext().get() != nullptr;
+ getStaticContext().reset(new std::shared_ptr<RequestContext>(
+ std::make_shared<RequestContext>()));
+ return prev;
+ }
+
+ // Get the current context.
+ static RequestContext* get() {
+ if (!FLAGS_enable_request_context ||
+ getStaticContext().get() == nullptr) {
+ if (defaultContext == nullptr) {
+ defaultContext = new RequestContext;
+ }
+ return defaultContext;
+ }
+ return getStaticContext().get()->get();
+ }
+
+ // The following API may be used to set per-request data in a thread-safe way.
+ // This access is still performance sensitive, so please ask if you need help
+ // profiling any use of these functions.
+ void setContextData(
+ const std::string& val, std::unique_ptr<RequestData> data) {
+ if (!FLAGS_enable_request_context) {
+ return;
+ }
+
+ folly::RWSpinLock::WriteHolder guard(lock);
+ if (data_.find(val) != data_.end()) {
+ LOG_FIRST_N(WARNING, 1) <<
+ "Called RequestContext::setContextData with data already set";
+
+ data_[val] = nullptr;
+ } else {
+ data_[val] = std::move(data);
+ }
+ }
+
+ bool hasContextData(const std::string& val) {
+ folly::RWSpinLock::ReadHolder guard(lock);
+ return data_.find(val) != data_.end();
+ }
+
+ RequestData* getContextData(const std::string& val) {
+ folly::RWSpinLock::ReadHolder guard(lock);
+ auto r = data_.find(val);
+ if (r == data_.end()) {
+ return nullptr;
+ } else {
+ return r->second.get();
+ }
+ }
+
+ void clearContextData(const std::string& val) {
+ folly::RWSpinLock::WriteHolder guard(lock);
+ data_.erase(val);
+ }
+
+ // The following API is used to pass the context through queues / threads.
+  // saveContext is called to get a shared_ptr to the context, and
+ // setContext is used to reset it on the other side of the queue.
+ //
+  // A shared_ptr is used, because a request may fan out across
+ // multiple threads, or do post-send processing, etc.
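+  //
+  // For example (sketch only; "executor" stands for any mechanism that runs
+  // the lambda on another thread):
+  //
+  //   auto ctx = RequestContext::saveContext();  // in the originating thread
+  //   executor.add([ctx]() {
+  //     RequestContext::setContext(ctx);         // in the worker thread
+  //     // ... continue handling the request ...
+  //   });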
+
+ static std::shared_ptr<RequestContext>
+ setContext(std::shared_ptr<RequestContext> ctx) {
+ if (FLAGS_enable_request_context) {
+ std::shared_ptr<RequestContext> old_ctx;
+ if (getStaticContext().get()) {
+ old_ctx = *getStaticContext().get();
+ }
+ if (ctx == nullptr) {
+ getStaticContext().reset(nullptr);
+ } else {
+ getStaticContext().reset(new std::shared_ptr<RequestContext>(ctx));
+ }
+ return old_ctx;
+ }
+ return std::shared_ptr<RequestContext>();
+ }
+
+ static std::shared_ptr<RequestContext> saveContext() {
+ if (!FLAGS_enable_request_context) {
+ return std::shared_ptr<RequestContext>();
+ }
+ if (getStaticContext().get() == nullptr) {
+ return std::shared_ptr<RequestContext>();
+ } else {
+ return *getStaticContext().get();
+ }
+ }
+
+ // Used to solve static destruction ordering issue. Any static object
+ // that uses RequestContext must call this function in its constructor.
+ //
+ // See below link for more details.
+ // http://stackoverflow.com/questions/335369/
+ // finding-c-static-initialization-order-problems#335746
+ static folly::ThreadLocalPtr<std::shared_ptr<RequestContext>>&
+ getStaticContext() {
+    static folly::ThreadLocalPtr<std::shared_ptr<RequestContext>> context;
+ return context;
+ }
+
+ private:
+ folly::RWSpinLock lock;
+ std::map<std::string, std::unique_ptr<RequestData>> data_;
+};
+
+/**
+ * Set the request context for a specific scope. For example,
+ * if you run part of a request in another thread, you could
+ * use RequestContextGuard to apply the request context
+ * inside that other thread.
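+ *
+ * For example (sketch only; ctx is a std::shared_ptr<RequestContext> obtained
+ * earlier via RequestContext::saveContext()):
+ *
+ *   void runInOtherThread(std::shared_ptr<RequestContext> ctx) {
+ *     RequestContextGuard guard(std::move(ctx));
+ *     // RequestContext::get() now returns the adopted context
+ *     // ... do the work for this request ...
+ *   }  // the previous context is restored when guard goes out of scope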
+ */
+class RequestContextGuard {
+ public:
+ explicit RequestContextGuard(std::shared_ptr<RequestContext> ctx) {
+ oldctx_ = RequestContext::setContext(std::move(ctx));
+ }
+
+ ~RequestContextGuard() {
+ RequestContext::setContext(std::move(oldctx_));
+ }
+
+ private:
+ std::shared_ptr<RequestContext> oldctx_;
+};
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <stdint.h>
+
+namespace folly {
+
+class AsyncTimeout;
+
+/**
+ * Base interface to be implemented by all classes expecting to manage
+ * timeouts. AsyncTimeout will use implementations of this interface
+ * to schedule/cancel timeouts.
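+ *
+ * EventBase is the typical implementation. A skeletal sketch of a custom
+ * implementer (ManualTimeoutManager is a hypothetical name; the bodies shown
+ * are placeholders, not a working scheduler):
+ *
+ *   class ManualTimeoutManager : public TimeoutManager {
+ *    public:
+ *     void attachTimeoutManager(AsyncTimeout*, InternalEnum) override {}
+ *     void detachTimeoutManager(AsyncTimeout*) override {}
+ *     bool scheduleTimeout(AsyncTimeout* obj,
+ *                          std::chrono::milliseconds timeout) override {
+ *       // record (obj, now + timeout) somewhere and fire it later
+ *       return true;
+ *     }
+ *     void cancelTimeout(AsyncTimeout*) override {}
+ *     bool bumpHandlingTime() override { return true; }
+ *     bool isInTimeoutManagerThread() override { return true; }
+ *   };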
+ */
+class TimeoutManager {
+ public:
+ enum class InternalEnum {
+ INTERNAL,
+ NORMAL
+ };
+
+ virtual ~TimeoutManager() {}
+
+ /**
+ * Attaches/detaches TimeoutManager to AsyncTimeout
+ */
+ virtual void attachTimeoutManager(AsyncTimeout* obj,
+ InternalEnum internal) = 0;
+ virtual void detachTimeoutManager(AsyncTimeout* obj) = 0;
+
+ /**
+ * Schedules AsyncTimeout to fire after `timeout` milliseconds
+ */
+ virtual bool scheduleTimeout(AsyncTimeout* obj,
+ std::chrono::milliseconds timeout) = 0;
+
+ /**
+ * Cancels the AsyncTimeout, if scheduled
+ */
+ virtual void cancelTimeout(AsyncTimeout* obj) = 0;
+
+ /**
+ * This is used to mark the beginning of a new loop cycle by the
+ * first handler fired within that cycle.
+ */
+ virtual bool bumpHandlingTime() = 0;
+
+ /**
+ * Helper method to know whether we are running in the timeout manager
+ * thread
+ */
+ virtual bool isInTimeoutManagerThread() = 0;
+};
+
+} // folly