concurrency/CacheLocality.h \
concurrency/ConcurrentHashMap.h \
concurrency/CoreCachedSharedPtr.h \
- concurrency/GlobalThreadPoolList.h \
concurrency/detail/ConcurrentHashMap-detail.h \
ConstexprMath.h \
detail/AtomicHashUtils.h \
executors/FiberIOExecutor.h \
executors/FutureExecutor.h \
executors/GlobalExecutor.h \
+ executors/GlobalThreadPoolList.h \
+ executors/InlineExecutor.h \
executors/IOExecutor.h \
executors/IOObjectCache.h \
executors/IOThreadPoolExecutor.h \
futures/Future-inl.h \
futures/FutureException.h \
futures/FutureSplitter.h \
- futures/InlineExecutor.h \
- futures/ManualExecutor.h \
futures/Promise-inl.h \
futures/Promise.h \
futures/SharedPromise.h \
compression/Compression.cpp \
compression/Zlib.cpp \
concurrency/CacheLocality.cpp \
- concurrency/GlobalThreadPoolList.cpp \
detail/Futex.cpp \
detail/IPAddress.cpp \
detail/StaticSingletonManager.cpp \
futures/Barrier.cpp \
futures/Future.cpp \
futures/FutureException.cpp \
- futures/InlineExecutor.cpp \
- futures/ManualExecutor.cpp \
futures/ThreadWheelTimekeeper.cpp \
futures/test/TestExecutor.cpp \
executors/CPUThreadPoolExecutor.cpp \
executors/Codel.cpp \
executors/GlobalExecutor.cpp \
+ executors/GlobalThreadPoolList.cpp \
executors/IOThreadPoolExecutor.cpp \
+ executors/InlineExecutor.cpp \
+ executors/ManualExecutor.cpp \
executors/SerialExecutor.cpp \
executors/ThreadPoolExecutor.cpp \
executors/ThreadedExecutor.cpp \
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/concurrency/GlobalThreadPoolList.h>
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include <folly/CppAttributes.h>
-#include <folly/Indestructible.h>
-#include <folly/Synchronized.h>
-#include <folly/ThreadLocal.h>
-
-namespace folly {
-
-namespace {
-
-class ThreadListHook {
- public:
- ThreadListHook(ThreadPoolListHook* poolId, std::thread::id threadId);
- ~ThreadListHook();
-
- private:
- ThreadListHook() {}
- ThreadPoolListHook* poolId_;
- std::thread::id threadId_;
-};
-
-class GlobalThreadPoolListImpl {
- public:
- GlobalThreadPoolListImpl() {}
-
- void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name);
-
- void unregisterThreadPool(ThreadPoolListHook* threadPoolId);
-
- void registerThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId);
-
- void unregisterThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId);
-
- private:
- struct PoolInfo {
- ThreadPoolListHook* addr;
- std::string name;
- std::vector<std::thread::id> threads;
- };
-
- struct Pools {
- // Just a vector since ease of access from gdb is the most important
- // property
- std::vector<PoolInfo> poolsInfo_;
-
- std::vector<std::thread::id>* FOLLY_NULLABLE
- getThreadVector(void* threadPoolId) {
- for (auto& elem : vector()) {
- if (elem.addr == threadPoolId) {
- return &elem.threads;
- }
- }
-
- return nullptr;
- }
-
- std::vector<PoolInfo>& vector() {
- return poolsInfo_;
- }
- };
-
- Pools pools_;
-};
-
-class GlobalThreadPoolList {
- public:
- GlobalThreadPoolList() {}
-
- static GlobalThreadPoolList& instance();
-
- void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name);
-
- void unregisterThreadPool(ThreadPoolListHook* threadPoolId);
-
- void registerThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId);
-
- void unregisterThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId);
-
- GlobalThreadPoolList(GlobalThreadPoolList const&) = delete;
- void operator=(GlobalThreadPoolList const&) = delete;
-
- private:
- folly::Synchronized<GlobalThreadPoolListImpl> globalListImpl_;
- folly::ThreadLocalPtr<ThreadListHook> threadHook_;
-};
-
-} // namespace
-
-GlobalThreadPoolList& GlobalThreadPoolList::instance() {
- static folly::Indestructible<GlobalThreadPoolList> ret;
- return *ret;
-}
-
-void GlobalThreadPoolList::registerThreadPool(
- ThreadPoolListHook* threadPoolId,
- std::string name) {
- globalListImpl_->registerThreadPool(threadPoolId, name);
-}
-
-void GlobalThreadPoolList::unregisterThreadPool(
- ThreadPoolListHook* threadPoolId) {
- globalListImpl_->unregisterThreadPool(threadPoolId);
-}
-
-void GlobalThreadPoolList::registerThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId) {
- DCHECK(!threadHook_);
- threadHook_.reset(make_unique<ThreadListHook>(threadPoolId, threadId));
-
- globalListImpl_->registerThreadPoolThread(threadPoolId, threadId);
-}
-
-void GlobalThreadPoolList::unregisterThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId) {
- (void)threadPoolId;
- (void)threadId;
- globalListImpl_->unregisterThreadPoolThread(threadPoolId, threadId);
-}
-
-void GlobalThreadPoolListImpl::registerThreadPool(
- ThreadPoolListHook* threadPoolId,
- std::string name) {
- PoolInfo info;
- info.name = name;
- info.addr = threadPoolId;
- pools_.vector().push_back(info);
-}
-
-void GlobalThreadPoolListImpl::unregisterThreadPool(
- ThreadPoolListHook* threadPoolId) {
- auto& vector = pools_.vector();
- vector.erase(
- std::remove_if(
- vector.begin(),
- vector.end(),
- [=](PoolInfo& i) { return i.addr == threadPoolId; }),
- vector.end());
-}
-
-void GlobalThreadPoolListImpl::registerThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId) {
- auto vec = pools_.getThreadVector(threadPoolId);
- if (vec == nullptr) {
- return;
- }
-
- vec->push_back(threadId);
-}
-
-void GlobalThreadPoolListImpl::unregisterThreadPoolThread(
- ThreadPoolListHook* threadPoolId,
- std::thread::id threadId) {
- auto vec = pools_.getThreadVector(threadPoolId);
- if (vec == nullptr) {
- return;
- }
-
- vec->erase(std::remove(vec->begin(), vec->end(), threadId), vec->end());
-}
-
-ThreadListHook::ThreadListHook(
- ThreadPoolListHook* poolId,
- std::thread::id threadId) {
- poolId_ = poolId;
- threadId_ = threadId;
-}
-
-ThreadListHook::~ThreadListHook() {
- GlobalThreadPoolList::instance().unregisterThreadPoolThread(
- poolId_, threadId_);
-}
-
-ThreadPoolListHook::ThreadPoolListHook(std::string name) {
- GlobalThreadPoolList::instance().registerThreadPool(this, name);
-}
-
-ThreadPoolListHook::~ThreadPoolListHook() {
- GlobalThreadPoolList::instance().unregisterThreadPool(this);
-}
-
-void ThreadPoolListHook::registerThread() {
- GlobalThreadPoolList::instance().registerThreadPoolThread(
- this, std::this_thread::get_id());
-}
-
-} // folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include <folly/Indestructible.h>
-#include <folly/Synchronized.h>
-#include <folly/ThreadLocal.h>
-
-namespace folly {
-
-/**
- * A hook for tracking which threads belong to which thread pools.
- * This is used only by a gdb extension to aid in debugging. You won't be able
- * to see any useful information from within C++ code.
- *
- * An instance of ThreadPoolListHook should be created in the thread pool class
- * that you want to keep track of. Then, to register a thread you call
- * registerThread() on your instance of ThreadPoolListHook from that thread.
- *
- * When a thread exits it will be removed from the list
- * When the thread pool is destroyed, it will be removed from the list
- */
-class ThreadPoolListHook {
- public:
- /**
- * Name is used to identify the thread pool when listing threads.
- */
- explicit ThreadPoolListHook(std::string name);
- ~ThreadPoolListHook();
-
- /**
- * Call this from any new thread that the thread pool creates.
- */
- void registerThread();
-
- ThreadPoolListHook(const ThreadPoolListHook& other) = delete;
- ThreadPoolListHook& operator=(const ThreadPoolListHook&) = delete;
-
- private:
- ThreadPoolListHook();
-};
-
-} // folly
#include <folly/Singleton.h>
#include <folly/executors/IOExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
-#include <folly/futures/InlineExecutor.h>
+#include <folly/executors/InlineExecutor.h>
using namespace folly;
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/GlobalThreadPoolList.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <folly/CppAttributes.h>
+#include <folly/Indestructible.h>
+#include <folly/Synchronized.h>
+#include <folly/ThreadLocal.h>
+
+namespace folly {
+
+namespace {
+
+class ThreadListHook {
+ public:
+ ThreadListHook(ThreadPoolListHook* poolId, std::thread::id threadId);
+ ~ThreadListHook();
+
+ private:
+ ThreadListHook() {}
+ ThreadPoolListHook* poolId_;
+ std::thread::id threadId_;
+};
+
+class GlobalThreadPoolListImpl {
+ public:
+ GlobalThreadPoolListImpl() {}
+
+ void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name);
+
+ void unregisterThreadPool(ThreadPoolListHook* threadPoolId);
+
+ void registerThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId);
+
+ void unregisterThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId);
+
+ private:
+ struct PoolInfo {
+ ThreadPoolListHook* addr;
+ std::string name;
+ std::vector<std::thread::id> threads;
+ };
+
+ struct Pools {
+ // Just a vector since ease of access from gdb is the most important
+ // property
+ std::vector<PoolInfo> poolsInfo_;
+
+ std::vector<std::thread::id>* FOLLY_NULLABLE
+ getThreadVector(void* threadPoolId) {
+ for (auto& elem : vector()) {
+ if (elem.addr == threadPoolId) {
+ return &elem.threads;
+ }
+ }
+
+ return nullptr;
+ }
+
+ std::vector<PoolInfo>& vector() {
+ return poolsInfo_;
+ }
+ };
+
+ Pools pools_;
+};
+
+class GlobalThreadPoolList {
+ public:
+ GlobalThreadPoolList() {}
+
+ static GlobalThreadPoolList& instance();
+
+ void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name);
+
+ void unregisterThreadPool(ThreadPoolListHook* threadPoolId);
+
+ void registerThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId);
+
+ void unregisterThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId);
+
+ GlobalThreadPoolList(GlobalThreadPoolList const&) = delete;
+ void operator=(GlobalThreadPoolList const&) = delete;
+
+ private:
+ folly::Synchronized<GlobalThreadPoolListImpl> globalListImpl_;
+ folly::ThreadLocalPtr<ThreadListHook> threadHook_;
+};
+
+} // namespace
+
+GlobalThreadPoolList& GlobalThreadPoolList::instance() {
+ static folly::Indestructible<GlobalThreadPoolList> ret;
+ return *ret;
+}
+
+void GlobalThreadPoolList::registerThreadPool(
+ ThreadPoolListHook* threadPoolId,
+ std::string name) {
+ globalListImpl_->registerThreadPool(threadPoolId, name);
+}
+
+void GlobalThreadPoolList::unregisterThreadPool(
+ ThreadPoolListHook* threadPoolId) {
+ globalListImpl_->unregisterThreadPool(threadPoolId);
+}
+
+void GlobalThreadPoolList::registerThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId) {
+ DCHECK(!threadHook_);
+ threadHook_.reset(make_unique<ThreadListHook>(threadPoolId, threadId));
+
+ globalListImpl_->registerThreadPoolThread(threadPoolId, threadId);
+}
+
+void GlobalThreadPoolList::unregisterThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId) {
+ (void)threadPoolId;
+ (void)threadId;
+ globalListImpl_->unregisterThreadPoolThread(threadPoolId, threadId);
+}
+
+void GlobalThreadPoolListImpl::registerThreadPool(
+ ThreadPoolListHook* threadPoolId,
+ std::string name) {
+ PoolInfo info;
+ info.name = name;
+ info.addr = threadPoolId;
+ pools_.vector().push_back(info);
+}
+
+void GlobalThreadPoolListImpl::unregisterThreadPool(
+ ThreadPoolListHook* threadPoolId) {
+ auto& vector = pools_.vector();
+ vector.erase(
+ std::remove_if(
+ vector.begin(),
+ vector.end(),
+ [=](PoolInfo& i) { return i.addr == threadPoolId; }),
+ vector.end());
+}
+
+void GlobalThreadPoolListImpl::registerThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId) {
+ auto vec = pools_.getThreadVector(threadPoolId);
+ if (vec == nullptr) {
+ return;
+ }
+
+ vec->push_back(threadId);
+}
+
+void GlobalThreadPoolListImpl::unregisterThreadPoolThread(
+ ThreadPoolListHook* threadPoolId,
+ std::thread::id threadId) {
+ auto vec = pools_.getThreadVector(threadPoolId);
+ if (vec == nullptr) {
+ return;
+ }
+
+ vec->erase(std::remove(vec->begin(), vec->end(), threadId), vec->end());
+}
+
+ThreadListHook::ThreadListHook(
+ ThreadPoolListHook* poolId,
+ std::thread::id threadId) {
+ poolId_ = poolId;
+ threadId_ = threadId;
+}
+
+ThreadListHook::~ThreadListHook() {
+ GlobalThreadPoolList::instance().unregisterThreadPoolThread(
+ poolId_, threadId_);
+}
+
+ThreadPoolListHook::ThreadPoolListHook(std::string name) {
+ GlobalThreadPoolList::instance().registerThreadPool(this, name);
+}
+
+ThreadPoolListHook::~ThreadPoolListHook() {
+ GlobalThreadPoolList::instance().unregisterThreadPool(this);
+}
+
+void ThreadPoolListHook::registerThread() {
+ GlobalThreadPoolList::instance().registerThreadPoolThread(
+ this, std::this_thread::get_id());
+}
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <folly/Indestructible.h>
+#include <folly/Synchronized.h>
+#include <folly/ThreadLocal.h>
+
+namespace folly {
+
+/**
+ * A hook for tracking which threads belong to which thread pools.
+ * This is used only by a gdb extension to aid in debugging. You won't be able
+ * to see any useful information from within C++ code.
+ *
+ * An instance of ThreadPoolListHook should be created in the thread pool class
+ * that you want to keep track of. Then, to register a thread you call
+ * registerThread() on your instance of ThreadPoolListHook from that thread.
+ *
+ * When a thread exits it will be removed from the list
+ * When the thread pool is destroyed, it will be removed from the list
+ */
+class ThreadPoolListHook {
+ public:
+ /**
+ * Name is used to identify the thread pool when listing threads.
+ */
+ explicit ThreadPoolListHook(std::string name);
+ ~ThreadPoolListHook();
+
+ /**
+ * Call this from any new thread that the thread pool creates.
+ */
+ void registerThread();
+
+ ThreadPoolListHook(const ThreadPoolListHook& other) = delete;
+ ThreadPoolListHook& operator=(const ThreadPoolListHook&) = delete;
+
+ private:
+ ThreadPoolListHook();
+};
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/InlineExecutor.h>
+
+#include <folly/Indestructible.h>
+
+namespace folly {
+
+InlineExecutor& InlineExecutor::instance() {
+ static auto instance = Indestructible<InlineExecutor>{};
+ return *instance;
+}
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/Executor.h>
+
+namespace folly {
+
+/// When work is "queued", execute it immediately inline.
+/// Usually when you think you want this, you actually want a
+/// QueuedImmediateExecutor.
+class InlineExecutor : public Executor {
+ public:
+ static InlineExecutor& instance();
+
+ void add(Func f) override {
+ f();
+ }
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/ManualExecutor.h>
+
+#include <string.h>
+#include <string>
+#include <tuple>
+
+namespace folly {
+
+void ManualExecutor::add(Func callback) {
+ std::lock_guard<std::mutex> lock(lock_);
+ funcs_.emplace(std::move(callback));
+ sem_.post();
+}
+
+size_t ManualExecutor::run() {
+ size_t count;
+ size_t n;
+ Func func;
+
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+
+ while (!scheduledFuncs_.empty()) {
+ auto& sf = scheduledFuncs_.top();
+ if (sf.time > now_) {
+ break;
+ }
+ funcs_.emplace(sf.moveOutFunc());
+ scheduledFuncs_.pop();
+ }
+
+ n = funcs_.size();
+ }
+
+ for (count = 0; count < n; count++) {
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (funcs_.empty()) {
+ break;
+ }
+
+ // Balance the semaphore so it doesn't grow without bound
+ // if nobody is calling wait().
+ // This may fail (with EAGAIN), that's fine.
+ sem_.tryWait();
+
+ func = std::move(funcs_.front());
+ funcs_.pop();
+ }
+ func();
+ }
+
+ return count;
+}
+
+void ManualExecutor::wait() {
+ while (true) {
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (!funcs_.empty()) {
+ break;
+ }
+ }
+
+ sem_.wait();
+ }
+}
+
+void ManualExecutor::advanceTo(TimePoint const& t) {
+ if (t > now_) {
+ now_ = t;
+ }
+ run();
+}
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdio>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include <folly/LifoSem.h>
+#include <folly/executors/DrivableExecutor.h>
+#include <folly/executors/ScheduledExecutor.h>
+
+namespace folly {
+ /// A ManualExecutor only does work when you turn the crank, by calling
+ /// run() or indirectly with makeProgress() or waitFor().
+ ///
+ /// The clock for a manual executor starts at 0 and advances only when you
+ /// ask it to. i.e. time is also under manual control.
+ ///
+ /// NB No attempt has been made to make anything other than add and schedule
+ /// threadsafe.
+ class ManualExecutor : public DrivableExecutor,
+ public ScheduledExecutor {
+ public:
+ void add(Func) override;
+
+ /// Do work. Returns the number of functions that were executed (maybe 0).
+ /// Non-blocking, in the sense that we don't wait for work (we can't
+ /// control whether one of the functions blocks).
+ /// This is stable, it will not chase an ever-increasing tail of work.
+ /// This also means, there may be more work available to perform at the
+ /// moment that this returns.
+ size_t run();
+
+ /// Wait for work to do.
+ void wait();
+
+ /// Wait for work to do, and do it.
+ void makeProgress() {
+ wait();
+ run();
+ }
+
+ /// Implements DrivableExecutor
+ void drive() override {
+ makeProgress();
+ }
+
+ /// makeProgress until this Future is ready.
+ template <class F> void waitFor(F const& f) {
+ // TODO(5427828)
+#if 0
+ while (!f.isReady())
+ makeProgress();
+#else
+ while (!f.isReady()) {
+ run();
+ }
+#endif
+
+ }
+
+ void scheduleAt(Func&& f, TimePoint const& t) override {
+ std::lock_guard<std::mutex> lock(lock_);
+ scheduledFuncs_.emplace(t, std::move(f));
+ sem_.post();
+ }
+
+ /// Advance the clock. The clock never advances on its own.
+ /// Advancing the clock causes some work to be done, if work is available
+ /// to do (perhaps newly available because of the advanced clock).
+ /// If dur is <= 0 this is a noop.
+ void advance(Duration const& dur) {
+ advanceTo(now_ + dur);
+ }
+
+ /// Advance the clock to this absolute time. If t is <= now(),
+ /// this is a noop.
+ void advanceTo(TimePoint const& t);
+
+ TimePoint now() override { return now_; }
+
+ /// Flush the function queue. Destroys all stored functions without
+ /// executing them. Returns number of removed functions.
+ std::size_t clear() {
+ std::queue<Func> funcs;
+ std::priority_queue<ScheduledFunc> scheduled_funcs;
+
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ funcs_.swap(funcs);
+ scheduledFuncs_.swap(scheduled_funcs);
+ }
+
+ return funcs.size() + scheduled_funcs.size();
+ }
+
+ private:
+ std::mutex lock_;
+ std::queue<Func> funcs_;
+ LifoSem sem_;
+
+ // helper class to enable ordering of scheduled events in the priority
+ // queue
+ struct ScheduledFunc {
+ TimePoint time;
+ size_t ordinal;
+ Func mutable func;
+
+ ScheduledFunc(TimePoint const& t, Func&& f)
+ : time(t), func(std::move(f))
+ {
+ static size_t seq = 0;
+ ordinal = seq++;
+ }
+
+ bool operator<(ScheduledFunc const& b) const {
+ // Earlier-scheduled things must be *higher* priority
+ // in the max-based std::priority_queue
+ if (time == b.time) {
+ return ordinal > b.ordinal;
+ }
+ return time > b.time;
+ }
+
+ Func&& moveOutFunc() const {
+ return std::move(func);
+ }
+ };
+ std::priority_queue<ScheduledFunc> scheduledFuncs_;
+ TimePoint now_ = TimePoint::min();
+ };
+
+}
#include <folly/executors/ThreadPoolExecutor.h>
-#include <folly/concurrency/GlobalThreadPoolList.h>
+#include <folly/executors/GlobalThreadPoolList.h>
namespace folly {
#include <folly/Executor.h>
#include <folly/Memory.h>
#include <folly/RWSpinLock.h>
-#include <folly/concurrency/GlobalThreadPoolList.h>
+#include <folly/executors/GlobalThreadPoolList.h>
#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <folly/io/async/Request.h>
*/
#include <folly/executors/Async.h>
-#include <folly/futures/ManualExecutor.h>
+#include <folly/executors/ManualExecutor.h>
#include <folly/portability/GTest.h>
using namespace folly;
#include <folly/Baton.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
+#include <folly/executors/InlineExecutor.h>
#include <folly/executors/SerialExecutor.h>
-#include <folly/futures/InlineExecutor.h>
#include <folly/portability/GTest.h>
using namespace std::chrono;
#include <folly/Baton.h>
#include <folly/Optional.h>
-#include <folly/futures/InlineExecutor.h>
+#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Timekeeper.h>
#include <folly/futures/detail/Core.h>
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/futures/InlineExecutor.h>
-
-#include <folly/Indestructible.h>
-
-namespace folly {
-
-InlineExecutor& InlineExecutor::instance() {
- static auto instance = Indestructible<InlineExecutor>{};
- return *instance;
-}
-
-} // namespace folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-#include <folly/Executor.h>
-
-namespace folly {
-
-/// When work is "queued", execute it immediately inline.
-/// Usually when you think you want this, you actually want a
-/// QueuedImmediateExecutor.
-class InlineExecutor : public Executor {
- public:
- static InlineExecutor& instance();
-
- void add(Func f) override {
- f();
- }
-};
-
-} // namespace folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/futures/ManualExecutor.h>
-
-#include <string.h>
-#include <string>
-#include <tuple>
-
-namespace folly {
-
-void ManualExecutor::add(Func callback) {
- std::lock_guard<std::mutex> lock(lock_);
- funcs_.emplace(std::move(callback));
- sem_.post();
-}
-
-size_t ManualExecutor::run() {
- size_t count;
- size_t n;
- Func func;
-
- {
- std::lock_guard<std::mutex> lock(lock_);
-
- while (!scheduledFuncs_.empty()) {
- auto& sf = scheduledFuncs_.top();
- if (sf.time > now_) {
- break;
- }
- funcs_.emplace(sf.moveOutFunc());
- scheduledFuncs_.pop();
- }
-
- n = funcs_.size();
- }
-
- for (count = 0; count < n; count++) {
- {
- std::lock_guard<std::mutex> lock(lock_);
- if (funcs_.empty()) {
- break;
- }
-
- // Balance the semaphore so it doesn't grow without bound
- // if nobody is calling wait().
- // This may fail (with EAGAIN), that's fine.
- sem_.tryWait();
-
- func = std::move(funcs_.front());
- funcs_.pop();
- }
- func();
- }
-
- return count;
-}
-
-void ManualExecutor::wait() {
- while (true) {
- {
- std::lock_guard<std::mutex> lock(lock_);
- if (!funcs_.empty()) {
- break;
- }
- }
-
- sem_.wait();
- }
-}
-
-void ManualExecutor::advanceTo(TimePoint const& t) {
- if (t > now_) {
- now_ = t;
- }
- run();
-}
-
-} // namespace folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <cstdio>
-#include <memory>
-#include <mutex>
-#include <queue>
-
-#include <folly/LifoSem.h>
-#include <folly/executors/DrivableExecutor.h>
-#include <folly/executors/ScheduledExecutor.h>
-
-namespace folly {
- /// A ManualExecutor only does work when you turn the crank, by calling
- /// run() or indirectly with makeProgress() or waitFor().
- ///
- /// The clock for a manual executor starts at 0 and advances only when you
- /// ask it to. i.e. time is also under manual control.
- ///
- /// NB No attempt has been made to make anything other than add and schedule
- /// threadsafe.
- class ManualExecutor : public DrivableExecutor,
- public ScheduledExecutor {
- public:
- void add(Func) override;
-
- /// Do work. Returns the number of functions that were executed (maybe 0).
- /// Non-blocking, in the sense that we don't wait for work (we can't
- /// control whether one of the functions blocks).
- /// This is stable, it will not chase an ever-increasing tail of work.
- /// This also means, there may be more work available to perform at the
- /// moment that this returns.
- size_t run();
-
- /// Wait for work to do.
- void wait();
-
- /// Wait for work to do, and do it.
- void makeProgress() {
- wait();
- run();
- }
-
- /// Implements DrivableExecutor
- void drive() override {
- makeProgress();
- }
-
- /// makeProgress until this Future is ready.
- template <class F> void waitFor(F const& f) {
- // TODO(5427828)
-#if 0
- while (!f.isReady())
- makeProgress();
-#else
- while (!f.isReady()) {
- run();
- }
-#endif
-
- }
-
- void scheduleAt(Func&& f, TimePoint const& t) override {
- std::lock_guard<std::mutex> lock(lock_);
- scheduledFuncs_.emplace(t, std::move(f));
- sem_.post();
- }
-
- /// Advance the clock. The clock never advances on its own.
- /// Advancing the clock causes some work to be done, if work is available
- /// to do (perhaps newly available because of the advanced clock).
- /// If dur is <= 0 this is a noop.
- void advance(Duration const& dur) {
- advanceTo(now_ + dur);
- }
-
- /// Advance the clock to this absolute time. If t is <= now(),
- /// this is a noop.
- void advanceTo(TimePoint const& t);
-
- TimePoint now() override { return now_; }
-
- /// Flush the function queue. Destroys all stored functions without
- /// executing them. Returns number of removed functions.
- std::size_t clear() {
- std::queue<Func> funcs;
- std::priority_queue<ScheduledFunc> scheduled_funcs;
-
- {
- std::lock_guard<std::mutex> lock(lock_);
- funcs_.swap(funcs);
- scheduledFuncs_.swap(scheduled_funcs);
- }
-
- return funcs.size() + scheduled_funcs.size();
- }
-
- private:
- std::mutex lock_;
- std::queue<Func> funcs_;
- LifoSem sem_;
-
- // helper class to enable ordering of scheduled events in the priority
- // queue
- struct ScheduledFunc {
- TimePoint time;
- size_t ordinal;
- Func mutable func;
-
- ScheduledFunc(TimePoint const& t, Func&& f)
- : time(t), func(std::move(f))
- {
- static size_t seq = 0;
- ordinal = seq++;
- }
-
- bool operator<(ScheduledFunc const& b) const {
- // Earlier-scheduled things must be *higher* priority
- // in the max-based std::priority_queue
- if (time == b.time) {
- return ordinal > b.ordinal;
- }
- return time > b.time;
- }
-
- Func&& moveOutFunc() const {
- return std::move(func);
- }
- };
- std::priority_queue<ScheduledFunc> scheduledFuncs_;
- TimePoint now_ = TimePoint::min();
- };
-
-}
#include <folly/Benchmark.h>
#include <folly/Baton.h>
+#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
-#include <folly/futures/InlineExecutor.h>
#include <folly/futures/Promise.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/Semaphore.h>
*/
#include <folly/Baton.h>
+#include <folly/executors/InlineExecutor.h>
+#include <folly/executors/ManualExecutor.h>
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/futures/Future.h>
-#include <folly/futures/InlineExecutor.h>
-#include <folly/futures/ManualExecutor.h>
#include <folly/portability/GTest.h>
// TODO(jsedgwick) move this test to executors/test/ once the tested executors
* limitations under the License.
*/
+#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
-#include <folly/futures/InlineExecutor.h>
#include <folly/portability/GTest.h>
using namespace folly;
#include <folly/Baton.h>
#include <folly/MPMCQueue.h>
#include <folly/executors/DrivableExecutor.h>
+#include <folly/executors/InlineExecutor.h>
+#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
-#include <folly/futures/InlineExecutor.h>
-#include <folly/futures/ManualExecutor.h>
#include <folly/portability/GTest.h>
using namespace folly;
#include <boost/thread/barrier.hpp>
#include <folly/Conv.h>
+#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
-#include <folly/futures/ManualExecutor.h>
#include <folly/portability/GTest.h>
#include <vector>