Use folly::Function in folly::EventBase
authorYedidya Feldblum <yfeldblum@fb.com>
Thu, 7 Apr 2016 10:30:56 +0000 (03:30 -0700)
committerFacebook Github Bot 6 <facebook-github-bot-6-bot@fb.com>
Thu, 7 Apr 2016 10:35:23 +0000 (03:35 -0700)
Summary:[Folly] Use `folly::Function` in `folly::EventBase`.

`folly::Function` is moveable but noncopyable and therefore supports wrapping moveable but noncopyable lambdas - like the kind that arises when move-capturing a `std::unique_ptr`.

`std::function` is copyable - therefore it does not support wrapping such noncopyable lambdas.

Switching `folly::EventBase` to use it will allow callers to pass such noncopyable lambdas, allowing, e.g.:

```
auto numptr = folly::make_unique<int>(7); // unique_ptr is noncopyable
folly::EventBase eb;
eb.runInLoop([numptr = std::move(numptr)] { // therefore lambda is noncopyable
  int num = *numptr;
});
eb.loop();
```

This allows us to move away from the `folly::MoveWrapper` hack, which worked like:

```
auto numptr = folly::make_unique<int>(7); // unique_ptr is noncopyable
auto numptrw = folly::makeMoveWrapper(std::move(numptr)); // MoveWrapper is "copyable" - hacky
folly::EventBase eb;
eb.runInLoop([numptrw] { // therefore lambda is "copyable" - hacky
  int num = **numptrw;
});
```

We needed to do that hack while:

But neither condition is true anymore.

Reviewed By: spacedentist

Differential Revision: D3143931

fb-gh-sync-id: 4fbdf5fb77eb5848ed1c6de942b022382141577f
fbshipit-source-id: 4fbdf5fb77eb5848ed1c6de942b022382141577f

folly/io/async/EventBase.cpp
folly/io/async/EventBase.h

index 1e0c808d28eb3da084e2e4eff29ee78be372e819..72cd76448ae935deb955838092e89f073212c83f 100644 (file)
 
 namespace {
 
-using folly::Cob;
 using folly::EventBase;
 
-template <typename Callback>
 class FunctionLoopCallback : public EventBase::LoopCallback {
  public:
-  explicit FunctionLoopCallback(Cob&& function)
+  explicit FunctionLoopCallback(EventBase::Func&& function)
       : function_(std::move(function)) {}
 
-  explicit FunctionLoopCallback(const Cob& function) : function_(function) {}
-
   void runLoopCallback() noexcept override {
     function_();
     delete this;
   }
 
  private:
-  Callback function_;
+  EventBase::Func function_;
 };
 }
 
@@ -59,10 +55,10 @@ namespace folly {
  * EventBase::FunctionRunner
  */
 
-class EventBase::FunctionRunner : public NotificationQueue<Cob>::Consumer {
+class EventBase::FunctionRunner
+    : public NotificationQueue<EventBase::Func>::Consumer {
  public:
-  void messageAvailable(Cob&& msg) override {
-
+  void messageAvailable(Func&& msg) override {
     // In libevent2, internal events do not break the loop.
     // Most users would expect loop(), followed by runInEventBaseThread(),
     // to break the loop and check if it should exit or not.
@@ -508,20 +504,9 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
   }
 }
 
-void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
-  DCHECK(isInEventBaseThread());
-  auto wrapper = new FunctionLoopCallback<Cob>(cob);
-  wrapper->context_ = RequestContext::saveContext();
-  if (runOnceCallbacks_ != nullptr && thisIteration) {
-    runOnceCallbacks_->push_back(*wrapper);
-  } else {
-    loopCallbacks_.push_back(*wrapper);
-  }
-}
-
-void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
+void EventBase::runInLoop(Func cob, bool thisIteration) {
   DCHECK(isInEventBaseThread());
-  auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
+  auto wrapper = new FunctionLoopCallback(std::move(cob));
   wrapper->context_ = RequestContext::saveContext();
   if (runOnceCallbacks_ != nullptr && thisIteration) {
     runOnceCallbacks_->push_back(*wrapper);
@@ -530,8 +515,8 @@ void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
   }
 }
 
-void EventBase::runAfterDrain(Cob&& cob) {
-  auto callback = new FunctionLoopCallback<Cob>(std::move(cob));
+void EventBase::runAfterDrain(Func cob) {
+  auto callback = new FunctionLoopCallback(std::move(cob));
   std::lock_guard<std::mutex> lg(runAfterDrainCallbacksMutex_);
   callback->cancelLoopCallback();
   runAfterDrainCallbacks_.push_back(*callback);
@@ -549,7 +534,7 @@ void EventBase::runBeforeLoop(LoopCallback* callback) {
   runBeforeLoopCallbacks_.push_back(*callback);
 }
 
-bool EventBase::runInEventBaseThread(const Cob& fn) {
+bool EventBase::runInEventBaseThread(Func fn) {
   // Send the message.
   // It will be received by the FunctionRunner in the EventBase's thread.
 
@@ -562,13 +547,13 @@ bool EventBase::runInEventBaseThread(const Cob& fn) {
 
   // Short-circuit if we are already in our event base
   if (inRunningEventBaseThread()) {
-    runInLoop(fn);
+    runInLoop(std::move(fn));
     return true;
 
   }
 
   try {
-    queue_->putMessage(fn);
+    queue_->putMessage(std::move(fn));
   } catch (const std::exception& ex) {
     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
                << "for EventBase thread: " << ex.what();
@@ -578,7 +563,7 @@ bool EventBase::runInEventBaseThread(const Cob& fn) {
   return true;
 }
 
-bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
+bool EventBase::runInEventBaseThreadAndWait(Func fn) {
   if (inRunningEventBaseThread()) {
     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
                << "allowed";
@@ -605,28 +590,30 @@ bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
   return true;
 }
 
-bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn) {
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
   if (isInEventBaseThread()) {
     fn();
     return true;
   } else {
-    return runInEventBaseThreadAndWait(fn);
+    return runInEventBaseThreadAndWait(std::move(fn));
   }
 }
 
-void EventBase::runAfterDelay(const Cob& cob,
-                              uint32_t milliseconds,
-                              TimeoutManager::InternalEnum in) {
-  if (!tryRunAfterDelay(cob, milliseconds, in)) {
+void EventBase::runAfterDelay(
+    Func cob,
+    uint32_t milliseconds,
+    TimeoutManager::InternalEnum in) {
+  if (!tryRunAfterDelay(std::move(cob), milliseconds, in)) {
     folly::throwSystemError(
       "error in EventBase::runAfterDelay(), failed to schedule timeout");
   }
 }
 
-bool EventBase::tryRunAfterDelay(const Cob& cob,
-                                 uint32_t milliseconds,
-                                 TimeoutManager::InternalEnum in) {
-  CobTimeout* timeout = new CobTimeout(this, cob, in);
+bool EventBase::tryRunAfterDelay(
+    Func cob,
+    uint32_t milliseconds,
+    TimeoutManager::InternalEnum in) {
+  CobTimeout* timeout = new CobTimeout(this, std::move(cob), in);
   if (!timeout->scheduleTimeout(milliseconds)) {
     delete timeout;
     return false;
@@ -667,7 +654,7 @@ bool EventBase::runLoopCallbacks(bool setContext) {
 
 void EventBase::initNotificationQueue() {
   // Infinite size queue
-  queue_.reset(new NotificationQueue<Cob>());
+  queue_.reset(new NotificationQueue<Func>());
 
   // We allocate fnRunner_ separately, rather than declaring it directly
   // as a member of EventBase solely so that we don't need to include
index 52b441716ad9f43a6a3014d14b0ef3d0ef115061..909d965728e981262f5288258b02c271c9a47288 100644 (file)
@@ -33,7 +33,9 @@
 
 #include <boost/intrusive/list.hpp>
 #include <boost/utility.hpp>
+
 #include <folly/Executor.h>
+#include <folly/Function.h>
 #include <folly/Portability.h>
 #include <folly/experimental/ExecutionObserver.h>
 #include <folly/futures/DrivableExecutor.h>
@@ -119,6 +121,8 @@ class EventBase : private boost::noncopyable,
                   public TimeoutManager,
                   public DrivableExecutor {
  public:
+  using Func = folly::Function<void()>;
+
   /**
    * A callback interface to use with runInLoop()
    *
@@ -304,9 +308,7 @@ class EventBase : private boost::noncopyable,
    *
    * Use runInEventBaseThread() to schedule functions from another thread.
    */
-  void runInLoop(const Cob& c, bool thisIteration = false);
-
-  void runInLoop(Cob&& c, bool thisIteration = false);
+  void runInLoop(Func c, bool thisIteration = false);
 
   /**
    * Adds the given callback to a queue of things run before destruction
@@ -327,7 +329,7 @@ class EventBase : private boost::noncopyable,
    * Note: will be called from the thread that invoked EventBase destructor,
    *       after the final run of loop callbacks.
    */
-  void runAfterDrain(Cob&& cob);
+  void runAfterDrain(Func cob);
 
   /**
    * Adds a callback that will run immediately *before* the event loop.
@@ -379,7 +381,7 @@ class EventBase : private boost::noncopyable,
    *
    * The function must not throw any exceptions.
    */
-  bool runInEventBaseThread(const Cob& fn);
+  bool runInEventBaseThread(Func fn);
 
   /*
    * Like runInEventBaseThread, but the caller waits for the callback to be
@@ -392,7 +394,7 @@ class EventBase : private boost::noncopyable,
    * Like runInEventBaseThread, but the caller waits for the callback to be
    * executed.
    */
-  bool runInEventBaseThreadAndWait(const Cob& fn);
+  bool runInEventBaseThreadAndWait(Func fn);
 
   /*
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
@@ -405,7 +407,7 @@ class EventBase : private boost::noncopyable,
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
    * event base thread, the functor is simply run inline.
    */
-  bool runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn);
+  bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
 
   /**
    * Runs the given Cob at some time after the specified number of
@@ -414,7 +416,7 @@ class EventBase : private boost::noncopyable,
    * Throws a std::system_error if an error occurs.
    */
   void runAfterDelay(
-      const Cob& c,
+      Func c,
       uint32_t milliseconds,
       TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
 
@@ -425,7 +427,7 @@ class EventBase : private boost::noncopyable,
    *
    * */
   bool tryRunAfterDelay(
-      const Cob& cob,
+      Func cob,
       uint32_t milliseconds,
       TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
 
@@ -434,10 +436,10 @@ class EventBase : private boost::noncopyable,
    * called when that latency is exceeded.
    * OBS: This functionality depends on time-measurement.
    */
-  void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
+  void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) {
     assert(enableTimeMeasurement_);
     maxLatency_ = maxLatency;
-    maxLatencyCob_ = maxLatencyCob;
+    maxLatencyCob_ = std::move(maxLatencyCob);
   }
 
 
@@ -576,7 +578,7 @@ class EventBase : private boost::noncopyable,
   void add(Cob fn) override {
     // runInEventBaseThread() takes a const&,
     // so no point in doing std::move here.
-    runInEventBaseThread(fn);
+    runInEventBaseThread(std::move(fn));
   }
 
   /// Implements the DrivableExecutor interface
@@ -614,13 +616,13 @@ class EventBase : private boost::noncopyable,
   // appropriate client-provided Cob
   class CobTimeout : public AsyncTimeout {
    public:
-    CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
-        : AsyncTimeout(b, in), cob_(c) {}
+    CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in)
+        : AsyncTimeout(b, in), cob_(std::move(c)) {}
 
     virtual void timeoutExpired() noexcept;
 
    private:
-    Cob cob_;
+    Func cob_;
 
    public:
     typedef boost::intrusive::list_member_hook<
@@ -673,7 +675,7 @@ class EventBase : private boost::noncopyable,
 
   // A notification queue for runInEventBaseThread() to use
   // to send function requests to the EventBase thread.
-  std::unique_ptr<NotificationQueue<Cob>> queue_;
+  std::unique_ptr<NotificationQueue<Func>> queue_;
   std::unique_ptr<FunctionRunner> fnRunner_;
 
   // limit for latency in microseconds (0 disables)
@@ -688,7 +690,7 @@ class EventBase : private boost::noncopyable,
   SmoothLoopTime maxLatencyLoopTime_;
 
   // callback called when latency limit is exceeded
-  Cob maxLatencyCob_;
+  Func maxLatencyCob_;
 
   // Enables/disables time measurements in loopBody(). if disabled, the
   // following functionality that relies on time-measurement, will not