namespace {
-using folly::Cob;
using folly::EventBase;
-template <typename Callback>
class FunctionLoopCallback : public EventBase::LoopCallback {
public:
- explicit FunctionLoopCallback(Cob&& function)
+ explicit FunctionLoopCallback(EventBase::Func&& function)
: function_(std::move(function)) {}
- explicit FunctionLoopCallback(const Cob& function) : function_(function) {}
-
void runLoopCallback() noexcept override {
function_();
delete this;
}
private:
- Callback function_;
+ EventBase::Func function_;
};
}
* EventBase::FunctionRunner
*/
-class EventBase::FunctionRunner : public NotificationQueue<Cob>::Consumer {
+class EventBase::FunctionRunner
+ : public NotificationQueue<EventBase::Func>::Consumer {
public:
- void messageAvailable(Cob&& msg) override {
-
+ void messageAvailable(Func&& msg) override {
// In libevent2, internal events do not break the loop.
// Most users would expect loop(), followed by runInEventBaseThread(),
// to break the loop and check if it should exit or not.
}
}
-void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
- DCHECK(isInEventBaseThread());
- auto wrapper = new FunctionLoopCallback<Cob>(cob);
- wrapper->context_ = RequestContext::saveContext();
- if (runOnceCallbacks_ != nullptr && thisIteration) {
- runOnceCallbacks_->push_back(*wrapper);
- } else {
- loopCallbacks_.push_back(*wrapper);
- }
-}
-
-void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
+void EventBase::runInLoop(Func cob, bool thisIteration) {
DCHECK(isInEventBaseThread());
- auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
+ auto wrapper = new FunctionLoopCallback(std::move(cob));
wrapper->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
runOnceCallbacks_->push_back(*wrapper);
}
}
-void EventBase::runAfterDrain(Cob&& cob) {
- auto callback = new FunctionLoopCallback<Cob>(std::move(cob));
+void EventBase::runAfterDrain(Func cob) {
+ auto callback = new FunctionLoopCallback(std::move(cob));
std::lock_guard<std::mutex> lg(runAfterDrainCallbacksMutex_);
callback->cancelLoopCallback();
runAfterDrainCallbacks_.push_back(*callback);
runBeforeLoopCallbacks_.push_back(*callback);
}
-bool EventBase::runInEventBaseThread(const Cob& fn) {
+bool EventBase::runInEventBaseThread(Func fn) {
// Send the message.
// It will be received by the FunctionRunner in the EventBase's thread.
// Short-circuit if we are already in our event base
if (inRunningEventBaseThread()) {
- runInLoop(fn);
+ runInLoop(std::move(fn));
return true;
}
try {
- queue_->putMessage(fn);
+ queue_->putMessage(std::move(fn));
} catch (const std::exception& ex) {
LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
<< "for EventBase thread: " << ex.what();
return true;
}
-bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
+bool EventBase::runInEventBaseThreadAndWait(Func fn) {
if (inRunningEventBaseThread()) {
LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
<< "allowed";
return true;
}
-bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn) {
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
if (isInEventBaseThread()) {
fn();
return true;
} else {
- return runInEventBaseThreadAndWait(fn);
+ return runInEventBaseThreadAndWait(std::move(fn));
}
}
-void EventBase::runAfterDelay(const Cob& cob,
- uint32_t milliseconds,
- TimeoutManager::InternalEnum in) {
- if (!tryRunAfterDelay(cob, milliseconds, in)) {
+void EventBase::runAfterDelay(
+ Func cob,
+ uint32_t milliseconds,
+ TimeoutManager::InternalEnum in) {
+ if (!tryRunAfterDelay(std::move(cob), milliseconds, in)) {
folly::throwSystemError(
"error in EventBase::runAfterDelay(), failed to schedule timeout");
}
}
-bool EventBase::tryRunAfterDelay(const Cob& cob,
- uint32_t milliseconds,
- TimeoutManager::InternalEnum in) {
- CobTimeout* timeout = new CobTimeout(this, cob, in);
+bool EventBase::tryRunAfterDelay(
+ Func cob,
+ uint32_t milliseconds,
+ TimeoutManager::InternalEnum in) {
+ CobTimeout* timeout = new CobTimeout(this, std::move(cob), in);
if (!timeout->scheduleTimeout(milliseconds)) {
delete timeout;
return false;
void EventBase::initNotificationQueue() {
// Infinite size queue
- queue_.reset(new NotificationQueue<Cob>());
+ queue_.reset(new NotificationQueue<Func>());
// We allocate fnRunner_ separately, rather than declaring it directly
// as a member of EventBase solely so that we don't need to include
#include <boost/intrusive/list.hpp>
#include <boost/utility.hpp>
+
#include <folly/Executor.h>
+#include <folly/Function.h>
#include <folly/Portability.h>
#include <folly/experimental/ExecutionObserver.h>
#include <folly/futures/DrivableExecutor.h>
public TimeoutManager,
public DrivableExecutor {
public:
+ using Func = folly::Function<void()>;
+
/**
* A callback interface to use with runInLoop()
*
*
* Use runInEventBaseThread() to schedule functions from another thread.
*/
- void runInLoop(const Cob& c, bool thisIteration = false);
-
- void runInLoop(Cob&& c, bool thisIteration = false);
+ void runInLoop(Func c, bool thisIteration = false);
/**
* Adds the given callback to a queue of things run before destruction
* Note: will be called from the thread that invoked EventBase destructor,
* after the final run of loop callbacks.
*/
- void runAfterDrain(Cob&& cob);
+ void runAfterDrain(Func cob);
/**
* Adds a callback that will run immediately *before* the event loop.
*
* The function must not throw any exceptions.
*/
- bool runInEventBaseThread(const Cob& fn);
+ bool runInEventBaseThread(Func fn);
/*
* Like runInEventBaseThread, but the caller waits for the callback to be
* Like runInEventBaseThread, but the caller waits for the callback to be
* executed.
*/
- bool runInEventBaseThreadAndWait(const Cob& fn);
+ bool runInEventBaseThreadAndWait(Func fn);
/*
* Like runInEventBaseThreadAndWait, except if the caller is already in the
* Like runInEventBaseThreadAndWait, except if the caller is already in the
* event base thread, the functor is simply run inline.
*/
- bool runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn);
+ bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
/**
* Runs the given Cob at some time after the specified number of
* Throws a std::system_error if an error occurs.
*/
void runAfterDelay(
- const Cob& c,
+ Func c,
uint32_t milliseconds,
TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
*
* */
bool tryRunAfterDelay(
- const Cob& cob,
+ Func cob,
uint32_t milliseconds,
TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
* called when that latency is exceeded.
* OBS: This functionality depends on time-measurement.
*/
- void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
+ void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) {
assert(enableTimeMeasurement_);
maxLatency_ = maxLatency;
- maxLatencyCob_ = maxLatencyCob;
+ maxLatencyCob_ = std::move(maxLatencyCob);
}
void add(Cob fn) override {
// runInEventBaseThread() takes a const&,
// so no point in doing std::move here.
- runInEventBaseThread(fn);
+ runInEventBaseThread(std::move(fn));
}
/// Implements the DrivableExecutor interface
// appropriate client-provided Cob
class CobTimeout : public AsyncTimeout {
public:
- CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
- : AsyncTimeout(b, in), cob_(c) {}
+ CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in)
+ : AsyncTimeout(b, in), cob_(std::move(c)) {}
virtual void timeoutExpired() noexcept;
private:
- Cob cob_;
+ Func cob_;
public:
typedef boost::intrusive::list_member_hook<
// A notification queue for runInEventBaseThread() to use
// to send function requests to the EventBase thread.
- std::unique_ptr<NotificationQueue<Cob>> queue_;
+ std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_;
// limit for latency in microseconds (0 disables)
SmoothLoopTime maxLatencyLoopTime_;
// callback called when latency limit is exceeded
- Cob maxLatencyCob_;
+ Func maxLatencyCob_;
// Enables/disables time measurements in loopBody(). if disabled, the
// following functionality that relies on time-measurement, will not