--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+namespace folly { namespace wangle {
+
+template <class T>
+class BlockingQueue {
+ public:
+ virtual ~BlockingQueue() {}
+ virtual void add(T item) = 0;
+ virtual T take() = 0;
+ virtual size_t size() = 0;
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
+
+namespace folly { namespace wangle {
+
+const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+ size_t numThreads,
+ std::unique_ptr<BlockingQueue<Task>> taskQueue,
+ std::unique_ptr<ThreadFactory> threadFactory)
+ : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
+ taskQueue_(std::move(taskQueue)) {
+ addThreads(numThreads);
+ CHECK(threadList_.get().size() == numThreads);
+}
+
+CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
+ stop();
+ CHECK(threadsToStop_ == 0);
+}
+
+void CPUThreadPoolExecutor::add(Func func) {
+ // TODO handle enqueue failure, here and in other add() callsites
+ taskQueue_->add(Task(std::move(func)));
+}
+
+void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
+ while (1) {
+ // TODO expiration / codel
+ auto t = taskQueue_->take();
+ if (UNLIKELY(t.poison)) {
+ CHECK(threadsToStop_-- > 0);
+ stoppedThreads_.add(thread);
+ return;
+ } else {
+ thread->idle = false;
+ try {
+ t.func();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " <<
+ typeid(e).name() << " exception: " << e.what();
+ } catch (...) {
+ LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled non-exception "
+ "object";
+ }
+ thread->idle = true;
+ }
+
+ if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
+ if (--threadsToStop_ >= 0) {
+ stoppedThreads_.add(thread);
+ return;
+ } else {
+ threadsToStop_++;
+ }
+ }
+ }
+}
+
+void CPUThreadPoolExecutor::stopThreads(size_t n) {
+ CHECK(stoppedThreads_.size() == 0);
+ threadsToStop_ = n;
+ for (int i = 0; i < n; i++) {
+ taskQueue_->add(Task());
+ }
+}
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+
+namespace folly { namespace wangle {
+
+class CPUThreadPoolExecutor : public ThreadPoolExecutor {
+ public:
+ struct Task;
+
+ // TODO thread naming, perhaps a required input to ThreadFactories
+ explicit CPUThreadPoolExecutor(
+ size_t numThreads,
+ std::unique_ptr<BlockingQueue<Task>> taskQueue =
+ folly::make_unique<LifoSemMPMCQueue<Task>>(
+ CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+ std::unique_ptr<ThreadFactory> threadFactory =
+ folly::make_unique<NamedThreadFactory>("CPUThreadPool"));
+
+ ~CPUThreadPoolExecutor();
+
+ void add(Func func) override;
+
+ struct Task {
+ explicit Task(Func&& taskArg) : func(std::move(taskArg)), poison(false) {}
+ Task() : func(nullptr), poison(true) {}
+ Task(Task&& o) noexcept : func(std::move(o.func)), poison(o.poison) {}
+ Task(const Task&) = default;
+ Task& operator=(const Task&) = default;
+ Func func;
+ bool poison;
+ // TODO per-task stats, timeouts, expirations
+ };
+
+ static const size_t kDefaultMaxQueueSize;
+
+ private:
+ void threadRun(ThreadPtr thread) override;
+ void stopThreads(size_t n) override;
+
+ std::atomic<size_t> threadsToStop_;
+ std::unique_ptr<BlockingQueue<Task>> taskQueue_;
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+
+namespace folly { namespace wangle {
+
+typedef std::function<void()> Func;
+
+class Executor {
+ public:
+ virtual ~Executor() {};
+ virtual void add(Func func) = 0;
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <folly/MoveWrapper.h>
+#include <glog/logging.h>
+
+namespace folly { namespace wangle {
+
+IOThreadPoolExecutor::IOThreadPoolExecutor(
+ size_t numThreads,
+ std::unique_ptr<ThreadFactory> threadFactory)
+ : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
+ nextThread_(0) {
+ addThreads(numThreads);
+ CHECK(threadList_.get().size() == numThreads);
+}
+
+IOThreadPoolExecutor::~IOThreadPoolExecutor() {
+ stop();
+}
+
+void IOThreadPoolExecutor::add(Func func) {
+ RWSpinLock::ReadHolder{&threadListLock_};
+ if (threadList_.get().empty()) {
+ throw std::runtime_error("No threads available");
+ }
+ auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
+ auto ioThread = std::static_pointer_cast<IOThread>(thread);
+
+ auto moveFunc = folly::makeMoveWrapper(std::move(func));
+ auto wrappedFunc = [moveFunc, ioThread] () {
+ (*moveFunc)();
+ ioThread->outstandingTasks--;
+ };
+
+ ioThread->outstandingTasks++;
+ if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
+ ioThread->outstandingTasks--;
+ throw std::runtime_error("Unable to run func in event base thread");
+ }
+}
+
+std::shared_ptr<ThreadPoolExecutor::Thread>
+IOThreadPoolExecutor::makeThread() {
+ return std::make_shared<IOThread>();
+}
+
+void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
+ const auto ioThread = std::static_pointer_cast<IOThread>(thread);
+ while (ioThread->shouldRun) {
+ ioThread->eventBase.loopForever();
+ }
+ if (isJoin_) {
+ while (ioThread->outstandingTasks > 0) {
+ ioThread->eventBase.loopOnce();
+ }
+ }
+ stoppedThreads_.add(ioThread);
+}
+
+void IOThreadPoolExecutor::stopThreads(size_t n) {
+ for (int i = 0; i < n; i++) {
+ const auto ioThread = std::static_pointer_cast<IOThread>(
+ threadList_.get()[i]);
+ ioThread->shouldRun = false;
+ ioThread->eventBase.terminateLoopSoon();
+ }
+}
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly { namespace wangle {
+
+class IOThreadPoolExecutor : public ThreadPoolExecutor {
+ public:
+ explicit IOThreadPoolExecutor(
+ size_t numThreads,
+ std::unique_ptr<ThreadFactory> threadFactory =
+ folly::make_unique<NamedThreadFactory>("IOThreadPool"));
+
+ ~IOThreadPoolExecutor();
+
+ void add(Func func) override;
+
+ private:
+ ThreadPtr makeThread() override;
+ void threadRun(ThreadPtr thread) override;
+ void stopThreads(size_t n) override;
+
+ struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
+ IOThread() : shouldRun(true), outstandingTasks(0) {};
+ std::atomic<bool> shouldRun;
+ std::atomic<size_t> outstandingTasks;
+ EventBase eventBase;
+ };
+
+ size_t nextThread_;
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/BlockingQueue.h>
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+
+namespace folly { namespace wangle {
+
+template <class T>
+class LifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+ explicit LifoSemMPMCQueue(size_t capacity) : queue_(capacity) {}
+
+ void add(T item) override {
+ if (!queue_.write(std::move(item))) {
+ throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
+ }
+ sem_.post();
+ }
+
+ T take() override {
+ T item;
+ while (!queue_.read(item)) {
+ sem_.wait();
+ }
+ return item;
+ }
+
+ size_t capacity() {
+ return queue_.capacity();
+ }
+
+ size_t size() override {
+ return queue_.size();
+ }
+
+ private:
+ LifoSem sem_;
+ MPMCQueue<T> queue_;
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/ThreadFactory.h>
+#include <folly/Conv.h>
+#include <folly/ThreadName.h>
+
+namespace folly { namespace wangle {
+
+class NamedThreadFactory : public ThreadFactory {
+ public:
+ explicit NamedThreadFactory(folly::StringPiece prefix)
+ : prefix_(prefix), suffix_(0) {}
+
+ std::thread newThread(Func&& func) override {
+ auto thread = std::thread(std::move(func));
+ folly::setThreadName(
+ thread.native_handle(),
+ folly::to<std::string>(prefix_, suffix_++));
+ return thread;
+ }
+
+ private:
+ folly::StringPiece prefix_;
+ std::atomic<uint64_t> suffix_;
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/Executor.h>
+
+#include <thread>
+
+namespace folly { namespace wangle {
+
+class ThreadFactory {
+ public:
+ virtual ~ThreadFactory() {}
+ virtual std::thread newThread(Func&& func) = 0;
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+
+namespace folly { namespace wangle {
+
+ThreadPoolExecutor::ThreadPoolExecutor(
+ size_t numThreads,
+ std::unique_ptr<ThreadFactory> threadFactory)
+ : threadFactory_(std::move(threadFactory)) {}
+
+ThreadPoolExecutor::~ThreadPoolExecutor() {
+ CHECK(threadList_.get().size() == 0);
+}
+
+size_t ThreadPoolExecutor::numThreads() {
+ RWSpinLock::ReadHolder{&threadListLock_};
+ return threadList_.get().size();
+}
+
+void ThreadPoolExecutor::setNumThreads(size_t n) {
+ RWSpinLock::WriteHolder{&threadListLock_};
+ const auto current = threadList_.get().size();
+ if (n > current ) {
+ addThreads(n - current);
+ } else if (n < current) {
+ removeThreads(current - n, true);
+ }
+ CHECK(threadList_.get().size() == n);
+}
+
+void ThreadPoolExecutor::addThreads(size_t n) {
+ for (int i = 0; i < n; i++) {
+ auto thread = makeThread();
+ // TODO need a notion of failing to create the thread
+ // and then handling for that case
+ thread->handle = threadFactory_->newThread(
+ std::bind(&ThreadPoolExecutor::threadRun, this, thread));
+ threadList_.add(thread);
+ }
+}
+
+void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
+ CHECK(n <= threadList_.get().size());
+ CHECK(stoppedThreads_.size() == 0);
+ isJoin_ = isJoin;
+ stopThreads(n);
+ for (int i = 0; i < n; i++) {
+ auto thread = stoppedThreads_.take();
+ thread->handle.join();
+ threadList_.remove(thread);
+ }
+ CHECK(stoppedThreads_.size() == 0);
+}
+
+void ThreadPoolExecutor::stop() {
+ RWSpinLock::WriteHolder{&threadListLock_};
+ removeThreads(threadList_.get().size(), false);
+ CHECK(threadList_.get().size() == 0);
+}
+
+void ThreadPoolExecutor::join() {
+ RWSpinLock::WriteHolder{&threadListLock_};
+ removeThreads(threadList_.get().size(), true);
+ CHECK(threadList_.get().size() == 0);
+}
+
+std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
+
+void ThreadPoolExecutor::StoppedThreadQueue::add(
+ ThreadPoolExecutor::ThreadPtr item) {
+ std::lock_guard<std::mutex> guard(mutex_);
+ queue_.push(std::move(item));
+ sem_.post();
+}
+
+ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
+ while(1) {
+ {
+ std::lock_guard<std::mutex> guard(mutex_);
+ if (queue_.size() > 0) {
+ auto item = std::move(queue_.front());
+ queue_.pop();
+ return item;
+ }
+ }
+ sem_.wait();
+ }
+}
+
+size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
+ std::lock_guard<std::mutex> guard(mutex_);
+ return queue_.size();
+}
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/Executor.h>
+#include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
+#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
+#include <folly/Memory.h>
+#include <folly/RWSpinLock.h>
+
+#include <algorithm>
+#include <mutex>
+#include <queue>
+
+#include <glog/logging.h>
+
+namespace folly { namespace wangle {
+
+class ThreadPoolExecutor : public Executor {
+ public:
+ explicit ThreadPoolExecutor(
+ size_t numThreads,
+ std::unique_ptr<ThreadFactory> threadFactory);
+
+ ~ThreadPoolExecutor();
+
+ size_t numThreads();
+ void setNumThreads(size_t numThreads);
+ void stop();
+ void join();
+ // TODO expose stats
+
+ protected:
+ void addThreads(size_t n);
+ void removeThreads(size_t n, bool isJoin);
+
+ struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
+ virtual ~Thread() {}
+ Thread() : id(nextId++), handle(), idle(true) {};
+ static std::atomic<uint64_t> nextId;
+ uint64_t id;
+ std::thread handle;
+ bool idle;
+ // TODO per-thread stats go here
+ };
+
+ typedef std::shared_ptr<Thread> ThreadPtr;
+
+ // The function that will be bound to pool threads
+ virtual void threadRun(ThreadPtr thread) = 0;
+ // Stop n threads and put their Thread structs in the threadsStopped_ queue
+ virtual void stopThreads(size_t n) = 0;
+ // Create a suitable Thread struct
+ virtual ThreadPtr makeThread() {
+ return std::make_shared<Thread>();
+ }
+ // need a stopThread(id) for keepalive feature
+
+ class ThreadList {
+ public:
+ void add(const ThreadPtr& state) {
+ auto it = std::lower_bound(vec_.begin(), vec_.end(), state, compare);
+ vec_.insert(it, state);
+ }
+
+ void remove(const ThreadPtr& state) {
+ auto itPair = std::equal_range(vec_.begin(), vec_.end(), state, compare);
+ CHECK(itPair.first != vec_.end());
+ CHECK(std::next(itPair.first) == itPair.second);
+ vec_.erase(itPair.first);
+ }
+
+ const std::vector<ThreadPtr>& get() const {
+ return vec_;
+ }
+
+ private:
+ static bool compare(const ThreadPtr& ts1, const ThreadPtr& ts2) {
+ return ts1->id < ts2->id;
+ }
+
+ std::vector<ThreadPtr> vec_;
+ };
+
+ class StoppedThreadQueue : public BlockingQueue<ThreadPtr> {
+ public:
+ void add(ThreadPtr item) override;
+ ThreadPtr take() override;
+ size_t size() override;
+
+ private:
+ LifoSem sem_;
+ std::mutex mutex_;
+ std::queue<ThreadPtr> queue_;
+ };
+
+ std::unique_ptr<ThreadFactory> threadFactory_;
+ ThreadList threadList_;
+ RWSpinLock threadListLock_;
+ StoppedThreadQueue stoppedThreads_;
+ std::atomic<bool> isJoin_; // whether the current downsizing is a join
+};
+
+}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+#include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+
+template <class TPE>
+static void basic() {
+ // Create and destroy
+ TPE tpe(10);
+}
+
+TEST(ThreadPoolExecutorTest, CPUBasic) {
+ basic<CPUThreadPoolExecutor>();
+}
+
+TEST(IOThreadPoolExecutorTest, IOBasic) {
+ basic<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void resize() {
+ TPE tpe(100);
+ EXPECT_EQ(100, tpe.numThreads());
+ tpe.setNumThreads(50);
+ EXPECT_EQ(50, tpe.numThreads());
+ tpe.setNumThreads(150);
+ EXPECT_EQ(150, tpe.numThreads());
+}
+
+TEST(ThreadPoolExecutorTest, CPUResize) {
+ resize<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOResize) {
+ resize<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void stop() {
+ TPE tpe(10);
+ std::atomic<int> completed(0);
+ auto f = [&](){
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ completed++;
+ };
+ for (int i = 0; i < 1000; i++) {
+ tpe.add(f);
+ }
+ tpe.stop();
+ EXPECT_GT(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUStop) {
+ stop<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOStop) {
+ stop<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void join() {
+ TPE tpe(10);
+ std::atomic<int> completed(0);
+ auto f = [&](){
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ completed++;
+ };
+ for (int i = 0; i < 1000; i++) {
+ tpe.add(f);
+ }
+ tpe.join();
+ EXPECT_EQ(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUJoin) {
+ join<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOJoin) {
+ join<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void resizeUnderLoad() {
+ TPE tpe(10);
+ std::atomic<int> completed(0);
+ auto f = [&](){
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ completed++;
+ };
+ for (int i = 0; i < 1000; i++) {
+ tpe.add(f);
+ }
+ tpe.setNumThreads(5);
+ tpe.setNumThreads(15);
+ tpe.join();
+ EXPECT_EQ(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
+ resizeUnderLoad<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
+ resizeUnderLoad<IOThreadPoolExecutor>();
+}