experimental/wangle/concurrent/Codel.h \
experimental/wangle/concurrent/CPUThreadPoolExecutor.h \
experimental/wangle/concurrent/FutureExecutor.h \
+ experimental/wangle/concurrent/GlobalExecutor.h \
+ experimental/wangle/concurrent/IOExecutor.h \
experimental/wangle/concurrent/IOThreadPoolExecutor.h \
experimental/wangle/concurrent/LifoSemMPMCQueue.h \
experimental/wangle/concurrent/NamedThreadFactory.h \
experimental/TestUtil.cpp \
experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp \
experimental/wangle/concurrent/Codel.cpp \
+ experimental/wangle/concurrent/GlobalExecutor.cpp \
+ experimental/wangle/concurrent/IOExecutor.cpp \
experimental/wangle/concurrent/IOThreadPoolExecutor.cpp \
experimental/wangle/concurrent/ThreadPoolExecutor.cpp \
experimental/wangle/ConnectionManager.cpp \
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/Singleton.h>
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+
+using namespace folly;
+using namespace folly::wangle;
+
+namespace {
+
+Singleton<IOThreadPoolExecutor> globalIOThreadPoolSingleton(
+ "GlobalIOThreadPool",
+ [](){
+ return new IOThreadPoolExecutor(
+ sysconf(_SC_NPROCESSORS_ONLN),
+ std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
+ });
+
+}
+
+namespace folly { namespace wangle {
+
+IOExecutor* getIOExecutor() {
+ auto singleton = IOExecutor::getSingleton();
+ auto executor = singleton->load();
+ while (!executor) {
+ IOExecutor* nullIOExecutor = nullptr;
+ singleton->compare_exchange_strong(
+ nullIOExecutor,
+ Singleton<IOThreadPoolExecutor>::get("GlobalIOThreadPool"));
+ executor = singleton->load();
+ }
+ return executor;
+}
+
+void setIOExecutor(IOExecutor* executor) {
+ IOExecutor::getSingleton()->store(executor);
+}
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace folly { namespace wangle {
+
+class IOExecutor;
+
+// Retrieve the global IOExecutor. If there is none, a default
+// IOThreadPoolExecutor will be constructed and returned.
+IOExecutor* getIOExecutor();
+
+// Set an IOExecutor to be the global IOExecutor which will be returned by
+// subsequent calls to getIOExecutor(). IOExecutors will uninstall themselves
+// as global when they are destructed.
+void setIOExecutor(IOExecutor* executor);
+
+}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
+
+#include <folly/experimental/Singleton.h>
+#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
+
+using folly::Singleton;
+using folly::wangle::IOExecutor;
+
+namespace {
+
+Singleton<std::atomic<IOExecutor*>> globalIOExecutorSingleton(
+ "GlobalIOExecutor",
+ [](){
+ return new std::atomic<IOExecutor*>(nullptr);
+ });
+
+}
+
+namespace folly { namespace wangle {
+
+IOExecutor::~IOExecutor() {
+ auto thisCopy = this;
+ try {
+ getSingleton()->compare_exchange_strong(thisCopy, nullptr);
+ } catch (const std::runtime_error& e) {
+ // The global IOExecutor singleton was already destructed so doesn't need to
+ // be restored. Ignore.
+ }
+}
+
+std::atomic<IOExecutor*>* IOExecutor::getSingleton() {
+ return Singleton<std::atomic<IOExecutor*>>::get("GlobalIOExecutor");
+}
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <folly/Executor.h>
+
+namespace folly {
+class EventBase;
+}
+
+namespace folly { namespace wangle {
+
+// An IOExecutor is an executor that operates on at least one EventBase. One of
+// these EventBases should be accessible via getEventBase(). The event base
+// returned by a call to getEventBase() is implementation dependent.
+//
+// Note that IOExecutors don't necessarily loop on the base themselves - for
+// instance, EventBase itself is an IOExecutor but doesn't drive itself.
+//
+// Implementations of IOExecutor are eligible to become the global IO executor,
+// returned on every call to getIOExecutor(), via setIOExecutor().
+// These functions are declared in GlobalExecutor.h
+//
+// If getIOExecutor is called and none has been set, a default global
+// IOThreadPoolExecutor will be created and returned.
+class IOExecutor : public virtual Executor {
+ public:
+ virtual ~IOExecutor();
+ virtual EventBase* getEventBase() = 0;
+
+ private:
+ static std::atomic<IOExecutor*>* getSingleton();
+ friend IOExecutor* getIOExecutor();
+ friend void setIOExecutor(IOExecutor* executor);
+};
+
+}}
if (threadList_.get().empty()) {
throw std::runtime_error("No threads available");
}
- auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
- auto ioThread = std::static_pointer_cast<IOThread>(thread);
+ auto ioThread = pickThread();
auto moveTask = folly::makeMoveWrapper(
Task(std::move(func), expiration, std::move(expireCallback)));
}
}
+std::shared_ptr<IOThreadPoolExecutor::IOThread>
+IOThreadPoolExecutor::pickThread() {
+ if (*thisThread_) {
+ return *thisThread_;
+ }
+ auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
+ return std::static_pointer_cast<IOThread>(thread);
+}
+
+EventBase* IOThreadPoolExecutor::getEventBase() {
+ return pickThread()->eventBase;
+}
+
std::shared_ptr<ThreadPoolExecutor::Thread>
IOThreadPoolExecutor::makeThread() {
return std::make_shared<IOThread>(this);
const auto ioThread = std::static_pointer_cast<IOThread>(thread);
ioThread->eventBase =
folly::EventBaseManager::get()->getEventBase();
+ thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
ioThread->eventBase->runBeforeLoop(idler);
*/
#pragma once
+
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
#include <folly/io/async/EventBase.h>
// N.B. For this thread pool, stop() behaves like join() because outstanding
// tasks belong to the event base and will be executed upon its destruction.
-class IOThreadPoolExecutor : public ThreadPoolExecutor {
+class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
public:
explicit IOThreadPoolExecutor(
size_t numThreads,
std::chrono::milliseconds expiration,
Func expireCallback = nullptr) override;
- private:
- ThreadPtr makeThread() override;
- void threadRun(ThreadPtr thread) override;
- void stopThreads(size_t n) override;
- uint64_t getPendingTaskCount() override;
+ EventBase* getEventBase() override;
+ private:
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
IOThread(IOThreadPoolExecutor* pool)
: Thread(pool),
EventBase* eventBase;
};
+ ThreadPtr makeThread() override;
+ std::shared_ptr<IOThread> pickThread();
+ void threadRun(ThreadPtr thread) override;
+ void stopThreads(size_t n) override;
+ uint64_t getPendingTaskCount() override;
+
size_t nextThread_;
+ ThreadLocal<std::shared_ptr<IOThread>> thisThread_;
};
}} // folly::wangle
namespace folly { namespace wangle {
-class ThreadPoolExecutor : public Executor {
+class ThreadPoolExecutor : public virtual Executor {
public:
explicit ThreadPoolExecutor(
size_t numThreads,
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
+
+using namespace folly::wangle;
+
+TEST(GlobalExecutorTest, GlobalIOExecutor) {
+ class DummyExecutor : public IOExecutor {
+ public:
+ void add(folly::Func f) override {
+ count++;
+ }
+ folly::EventBase* getEventBase() override {
+ return nullptr;
+ }
+ int count{0};
+ };
+
+ auto f = [](){};
+
+ // Don't explode, we should create the default global IOExecutor lazily here.
+ getIOExecutor()->add(f);
+
+ {
+ DummyExecutor dummy;
+ setIOExecutor(&dummy);
+ getIOExecutor()->add(f);
+ // Make sure we were properly installed.
+ EXPECT_EQ(1, dummy.count);
+ }
+
+ // Don't explode, we should restore the default global IOExecutor when dummy
+ // is destructed.
+ getIOExecutor()->add(f);
+}