io/async/SSLContext.h \
io/async/ScopedEventBaseThread.h \
io/async/TimeoutManager.h \
+ io/async/VirtualEventBase.h \
io/async/WriteChainAsyncTransportWrapper.h \
io/async/test/AsyncSSLSocketTest.h \
io/async/test/BlockingSocket.h \
io/async/Request.cpp \
io/async/SSLContext.cpp \
io/async/ScopedEventBaseThread.cpp \
+ io/async/VirtualEventBase.cpp \
io/async/HHWheelTimer.cpp \
io/async/TimeoutManager.cpp \
io/async/test/ScopedBoundPort.cpp \
namespace folly {
namespace fibers {
-inline EventBaseLoopController::EventBaseLoopController()
+template <typename EventBaseT>
+inline EventBaseLoopControllerT<EventBaseT>::EventBaseLoopControllerT()
: callback_(*this), aliveWeak_(destructionCallback_.getWeak()) {}
-inline EventBaseLoopController::~EventBaseLoopController() {
+template <typename EventBaseT>
+inline EventBaseLoopControllerT<EventBaseT>::~EventBaseLoopControllerT() {
callback_.cancelLoopCallback();
eventBaseKeepAlive_.reset();
}
-inline void EventBaseLoopController::attachEventBase(
- folly::EventBase& eventBase) {
+template <typename EventBaseT>
+inline void EventBaseLoopControllerT<EventBaseT>::attachEventBase(
+ EventBaseT& eventBase) {
if (eventBase_ != nullptr) {
LOG(ERROR) << "Attempt to reattach EventBase to LoopController";
}
}
}
-inline void EventBaseLoopController::setFiberManager(FiberManager* fm) {
+template <typename EventBaseT>
+inline void EventBaseLoopControllerT<EventBaseT>::setFiberManager(
+ FiberManager* fm) {
fm_ = fm;
}
-inline void EventBaseLoopController::schedule() {
+template <typename EventBaseT>
+inline void EventBaseLoopControllerT<EventBaseT>::schedule() {
if (eventBase_ == nullptr) {
// In this case we need to postpone scheduling.
awaitingScheduling_ = true;
}
}
-inline void EventBaseLoopController::cancel() {
+template <typename EventBaseT>
+inline void EventBaseLoopControllerT<EventBaseT>::cancel() {
callback_.cancelLoopCallback();
}
-inline void EventBaseLoopController::runLoop() {
+template <typename EventBaseT>
+inline void EventBaseLoopControllerT<EventBaseT>::runLoop() {
if (!eventBaseKeepAlive_) {
eventBaseKeepAlive_ = eventBase_->loopKeepAlive();
}
}
}
-inline void EventBaseLoopController::scheduleThreadSafe(
+template <typename EventBaseT>
+inline void EventBaseLoopControllerT<EventBaseT>::scheduleThreadSafe(
std::function<bool()> func) {
/* The only way we could end up here is if
1) Fiber thread creates a fiber that awaits (which means we must
}
}
-inline void EventBaseLoopController::timedSchedule(
+template <typename EventBaseT>
+inline void EventBaseLoopControllerT<EventBaseT>::timedSchedule(
std::function<void()> func,
TimePoint time) {
assert(eventBaseAttached_);
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/LoopController.h>
#include <folly/io/async/EventBase.h>
+#include <folly/io/async/VirtualEventBase.h>
#include <atomic>
#include <memory>
namespace folly {
namespace fibers {
-class EventBaseLoopController : public LoopController {
+template <typename EventBaseT>
+class EventBaseLoopControllerT : public LoopController {
public:
- explicit EventBaseLoopController();
- ~EventBaseLoopController();
+ explicit EventBaseLoopControllerT();
+ ~EventBaseLoopControllerT();
/**
* Attach EventBase after LoopController was created.
*/
- void attachEventBase(folly::EventBase& eventBase);
+ void attachEventBase(EventBaseT& eventBase);
- folly::EventBase* getEventBase() {
+ EventBaseT* getEventBase() {
return eventBase_;
}
private:
class ControllerCallback : public folly::EventBase::LoopCallback {
public:
- explicit ControllerCallback(EventBaseLoopController& controller)
+ explicit ControllerCallback(EventBaseLoopControllerT& controller)
: controller_(controller) {}
void runLoopCallback() noexcept override {
}
private:
- EventBaseLoopController& controller_;
+ EventBaseLoopControllerT& controller_;
};
class DestructionCallback : public folly::EventBase::LoopCallback {
};
bool awaitingScheduling_{false};
- folly::EventBase* eventBase_{nullptr};
- folly::EventBase::LoopKeepAlive eventBaseKeepAlive_;
+ EventBaseT* eventBase_{nullptr};
+ typename EventBaseT::LoopKeepAlive eventBaseKeepAlive_;
ControllerCallback callback_;
DestructionCallback destructionCallback_;
FiberManager* fm_{nullptr};
friend class FiberManager;
};
+
+using EventBaseLoopController = EventBaseLoopControllerT<folly::EventBase>;
}
} // folly::fibers
namespace {
+template <typename EventBaseT>
class EventBaseOnDestructionCallback : public EventBase::LoopCallback {
public:
- explicit EventBaseOnDestructionCallback(EventBase& evb) : evb_(evb) {}
+ explicit EventBaseOnDestructionCallback(EventBaseT& evb) : evb_(evb) {}
void runLoopCallback() noexcept override;
private:
- EventBase& evb_;
+ EventBaseT& evb_;
};
+template <typename EventBaseT>
class GlobalCache {
public:
- static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
+ static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
return instance().getImpl(evb, opts);
}
- static std::unique_ptr<FiberManager> erase(EventBase& evb) {
+ static std::unique_ptr<FiberManager> erase(EventBaseT& evb) {
return instance().eraseImpl(evb);
}
return *ret;
}
- FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
+ FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
std::lock_guard<std::mutex> lg(mutex_);
auto& fmPtrRef = map_[&evb];
if (!fmPtrRef) {
- auto loopController = make_unique<EventBaseLoopController>();
+ auto loopController = make_unique<EventBaseLoopControllerT<EventBaseT>>();
loopController->attachEventBase(evb);
- evb.runOnDestruction(new EventBaseOnDestructionCallback(evb));
+ evb.runOnDestruction(new EventBaseOnDestructionCallback<EventBaseT>(evb));
fmPtrRef = make_unique<FiberManager>(std::move(loopController), opts);
}
return *fmPtrRef;
}
- std::unique_ptr<FiberManager> eraseImpl(EventBase& evb) {
+ std::unique_ptr<FiberManager> eraseImpl(EventBaseT& evb) {
std::lock_guard<std::mutex> lg(mutex_);
DCHECK_EQ(1, map_.count(&evb));
}
std::mutex mutex_;
- std::unordered_map<EventBase*, std::unique_ptr<FiberManager>> map_;
+ std::unordered_map<EventBaseT*, std::unique_ptr<FiberManager>> map_;
};
constexpr size_t kEraseListMaxSize = 64;
+template <typename EventBaseT>
class ThreadLocalCache {
public:
- static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
+ static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
return instance()->getImpl(evb, opts);
}
- static void erase(EventBase& evb) {
+ static void erase(EventBaseT& evb) {
for (auto& localInstance : instance().accessAllThreads()) {
SYNCHRONIZED(info, localInstance.eraseInfo_) {
if (info.eraseList.size() >= kEraseListMaxSize) {
return *ret;
}
- FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
+ FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
eraseImpl();
auto& fmPtrRef = map_[&evb];
if (!fmPtrRef) {
- fmPtrRef = &GlobalCache::get(evb, opts);
+ fmPtrRef = &GlobalCache<EventBaseT>::get(evb, opts);
}
DCHECK(fmPtrRef != nullptr);
}
}
- std::unordered_map<EventBase*, FiberManager*> map_;
+ std::unordered_map<EventBaseT*, FiberManager*> map_;
std::atomic<bool> eraseRequested_{false};
struct EraseInfo {
bool eraseAll{false};
- std::vector<EventBase*> eraseList;
+ std::vector<EventBaseT*> eraseList;
};
folly::Synchronized<EraseInfo> eraseInfo_;
};
-void EventBaseOnDestructionCallback::runLoopCallback() noexcept {
- auto fm = GlobalCache::erase(evb_);
+template <typename EventBaseT>
+void EventBaseOnDestructionCallback<EventBaseT>::runLoopCallback() noexcept {
+ auto fm = GlobalCache<EventBaseT>::erase(evb_);
DCHECK(fm.get() != nullptr);
- ThreadLocalCache::erase(evb_);
+ ThreadLocalCache<EventBaseT>::erase(evb_);
delete this;
}
FiberManager& getFiberManager(
EventBase& evb,
const FiberManager::Options& opts) {
- return ThreadLocalCache::get(evb, opts);
+ return ThreadLocalCache<EventBase>::get(evb, opts);
+}
+
+FiberManager& getFiberManager(
+ VirtualEventBase& evb,
+ const FiberManager::Options& opts) {
+ return ThreadLocalCache<VirtualEventBase>::get(evb, opts);
}
}
}
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/FiberManagerInternal.h>
+#include <folly/io/async/VirtualEventBase.h>
namespace folly {
namespace fibers {
FiberManager& getFiberManager(
folly::EventBase& evb,
const FiberManager::Options& opts = FiberManager::Options());
+
+FiberManager& getFiberManager(
+ folly::VirtualEventBase& evb,
+ const FiberManager::Options& opts = FiberManager::Options());
}
}
#include <folly/fibers/Semaphore.h>
#include <folly/fibers/SimpleLoopController.h>
#include <folly/fibers/WhenN.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>
using namespace folly::fibers;
validateResults<std::runtime_error>(results, COUNT);
}
+TEST(FiberManager, VirtualEventBase) {
+ folly::ScopedEventBaseThread thread;
+
+ auto evb1 =
+ folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
+ auto evb2 =
+ folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
+
+ bool done1{false};
+ bool done2{false};
+
+ getFiberManager(*evb1).addTaskRemote([&] {
+ Baton baton;
+ baton.timed_wait(std::chrono::milliseconds{100});
+
+ done1 = true;
+ });
+
+ getFiberManager(*evb2).addTaskRemote([&] {
+ Baton baton;
+ baton.timed_wait(std::chrono::milliseconds{200});
+
+ done2 = true;
+ });
+
+ evb1.reset();
+ EXPECT_TRUE(done1);
+
+ evb2.reset();
+ EXPECT_TRUE(done2);
+}
+
/**
* Test that we can properly track fiber stack usage.
*
static constexpr const char* kContextDataName{"EventBase"};
};
+class VirtualEventBase;
+
/**
* This class is a wrapper for all asynchronous I/O processing functionality
*
// EventBase needs access to LoopCallbackList (and therefore to hook_)
friend class EventBase;
+ friend class VirtualEventBase;
std::shared_ptr<RequestContext> context_;
};
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/io/async/VirtualEventBase.h>
+
+namespace folly {
+
+VirtualEventBase::VirtualEventBase(EventBase& evb) : evb_(evb) {
+ evbLoopKeepAlive_ = evb_.loopKeepAliveAtomic();
+ loopKeepAlive_ = loopKeepAliveAtomic();
+}
+
+VirtualEventBase::~VirtualEventBase() {
+ CHECK(!evb_.inRunningEventBaseThread());
+
+ CHECK(evb_.runInEventBaseThread([&] { loopKeepAlive_.reset(); }));
+ loopKeepAliveBaton_.wait();
+
+ CHECK(evb_.runInEventBaseThreadAndWait([&] {
+ clearCobTimeouts();
+
+ onDestructionCallbacks_.withWLock([&](LoopCallbackList& callbacks) {
+ while (!callbacks.empty()) {
+ auto& callback = callbacks.front();
+ callbacks.pop_front();
+ callback.runLoopCallback();
+ }
+ });
+
+ evbLoopKeepAlive_.reset();
+ }));
+}
+
+void VirtualEventBase::runOnDestruction(EventBase::LoopCallback* callback) {
+ onDestructionCallbacks_.withWLock([&](LoopCallbackList& callbacks) {
+ callback->cancelLoopCallback();
+ callbacks.push_back(*callback);
+ });
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Baton.h>
+#include <folly/Executor.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly {
+
+/**
+ * VirtualEventBase implements a light-weight view onto existing EventBase.
+ *
+ * Multiple VirtualEventBases can be backed by a single EventBase. Similarly
+ * to EventBase, VirtualEventBase implements loopKeepAlive() functionality,
+ * which allows callbacks holding LoopKeepAlive token to keep EventBase looping
+ * until they are complete.
+ *
+ * VirtualEventBase destructor blocks until all its KeepAliveTokens are released
+ * and all tasks scheduled through it are complete. EventBase destructor also
+ * blocks until all VirtualEventBases backed by it are released.
+ */
+class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
+ public:
+ explicit VirtualEventBase(EventBase& evb);
+
+ VirtualEventBase(const VirtualEventBase&) = delete;
+ VirtualEventBase& operator=(const VirtualEventBase&) = delete;
+
+ ~VirtualEventBase();
+
+ EventBase& getEventBase() {
+ return evb_;
+ }
+
+ /**
+ * Adds the given callback to a queue of things run before destruction
+ * of current VirtualEventBase.
+ *
+ * This allows users of VirtualEventBase that run in it, but don't control it,
+ * to be notified before VirtualEventBase gets destructed.
+ *
+ * Note: this will be called from the loop of the EventBase, backing this
+ * VirtualEventBase
+ */
+ void runOnDestruction(EventBase::LoopCallback* callback);
+
+ /**
+ * @see EventBase::runInLoop
+ */
+ template <typename F>
+ void runInLoop(F&& f, bool thisIteration = false) {
+ evb_.runInLoop(std::forward<F>(f), thisIteration);
+ }
+
+ /**
+ * VirtualEventBase destructor blocks until all tasks scheduled through its
+ * runInEventBaseThread are complete.
+ *
+ * @see EventBase::runInEventBaseThread
+ */
+ template <typename F>
+ void runInEventBaseThread(F&& f) {
+ // LoopKeepAlive token has to be released in the EventBase thread. If
+ // runInEventBaseThread() fails, we can't extract the LoopKeepAlive token
+ // from the callback to properly release it.
+ CHECK(evb_.runInEventBaseThread([
+ keepAlive = loopKeepAliveAtomic(),
+ f = std::forward<F>(f)
+ ]() mutable { f(); }));
+ }
+
+ void attachTimeoutManager(
+ AsyncTimeout* obj,
+ TimeoutManager::InternalEnum internal) override {
+ evb_.attachTimeoutManager(obj, internal);
+ }
+
+ void detachTimeoutManager(AsyncTimeout* obj) override {
+ evb_.detachTimeoutManager(obj);
+ }
+
+ bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
+ override {
+ return evb_.scheduleTimeout(obj, timeout);
+ }
+
+ void cancelTimeout(AsyncTimeout* obj) override {
+ evb_.cancelTimeout(obj);
+ }
+
+ void bumpHandlingTime() override {
+ evb_.bumpHandlingTime();
+ }
+
+ bool isInTimeoutManagerThread() override {
+ return evb_.isInTimeoutManagerThread();
+ }
+
+ /**
+ * @see runInEventBaseThread
+ */
+ void add(folly::Func f) override {
+ runInEventBaseThread(std::move(f));
+ }
+
+ struct LoopKeepAliveDeleter {
+ void operator()(VirtualEventBase* evb) {
+ DCHECK(evb->getEventBase().inRunningEventBaseThread());
+ if (evb->loopKeepAliveCountAtomic_.load()) {
+ evb->loopKeepAliveCount_ += evb->loopKeepAliveCountAtomic_.exchange(0);
+ }
+ DCHECK(evb->loopKeepAliveCount_ > 0);
+ if (--evb->loopKeepAliveCount_ == 0) {
+ evb->loopKeepAliveBaton_.post();
+ }
+ }
+ };
+ using LoopKeepAlive = std::unique_ptr<VirtualEventBase, LoopKeepAliveDeleter>;
+
+ /**
+ * Returns you a handle which prevents VirtualEventBase from being destroyed.
+ * LoopKeepAlive handle can be released from EventBase loop only.
+ *
+ * loopKeepAlive() can be called from EventBase thread only.
+ */
+ LoopKeepAlive loopKeepAlive() {
+ DCHECK(evb_.isInEventBaseThread());
+ ++loopKeepAliveCount_;
+ return LoopKeepAlive(this);
+ }
+
+ /**
+ * Thread-safe version of loopKeepAlive()
+ */
+ LoopKeepAlive loopKeepAliveAtomic() {
+ if (evb_.inRunningEventBaseThread()) {
+ return loopKeepAlive();
+ }
+ ++loopKeepAliveCountAtomic_;
+ return LoopKeepAlive(this);
+ }
+
+ private:
+ using LoopCallbackList = EventBase::LoopCallback::List;
+
+ EventBase& evb_;
+
+ ssize_t loopKeepAliveCount_{0};
+ std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
+ folly::Baton<> loopKeepAliveBaton_;
+ LoopKeepAlive loopKeepAlive_;
+
+ EventBase::LoopKeepAlive evbLoopKeepAlive_;
+
+ folly::Synchronized<LoopCallbackList> onDestructionCallbacks_;
+};
+}