From 517dd66ae37425460a7743737c2d27fa66f47eb8 Mon Sep 17 00:00:00 2001 From: Mehdi Amini Date: Sat, 12 Dec 2015 22:55:25 +0000 Subject: [PATCH] Add a C++11 ThreadPool implementation in LLVM This is a very simple implementation of a thread pool using C++11 thread. It accepts any std::function for asynchronous execution. Individual task can be synchronize using the returned future, or the client can block on the full queue completion. In case LLVM is configured with Threading disabled, it falls back to sequential execution using std::async with launch:deferred. This is intended to support parallelism for ThinLTO processing in linker plugin, but is generic enough for any other uses. Differential Revision: http://reviews.llvm.org/D15464 From: Mehdi Amini git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@255444 91177308-0d34-0410-b5e6-96231b3b80d8 --- include/llvm/Support/ThreadPool.h | 95 ++++++++++++++++++++ include/llvm/Support/thread.h | 2 + lib/Support/CMakeLists.txt | 1 + lib/Support/ThreadPool.cpp | 143 ++++++++++++++++++++++++++++++ unittests/Support/CMakeLists.txt | 1 + unittests/Support/ThreadPool.cpp | 91 +++++++++++++++++++ 6 files changed, 333 insertions(+) create mode 100644 include/llvm/Support/ThreadPool.h create mode 100644 lib/Support/ThreadPool.cpp create mode 100644 unittests/Support/ThreadPool.cpp diff --git a/include/llvm/Support/ThreadPool.h b/include/llvm/Support/ThreadPool.h new file mode 100644 index 00000000000..c388c074c15 --- /dev/null +++ b/include/llvm/Support/ThreadPool.h @@ -0,0 +1,95 @@ +//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// +// This file defines a crude C++11 based thread pool. +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_SUPPORT_THREAD_POOL_H +#define LLVM_SUPPORT_THREAD_POOL_H + +#include "llvm/Support/thread.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace llvm { + +/// A ThreadPool for asynchronous parallel execution on a defined number of +/// threads. +/// +/// The pool keeps a vector of threads alive, waiting on a condition variable +/// for some work to become available. +class ThreadPool { +public: + using TaskTy = std::function; + + /// Construct a pool with the number of core available on the system (or + /// whatever the value returned by std::thread::hardware_concurrency() is). + ThreadPool(); + + /// Construct a pool of \p ThreadCount threads + ThreadPool(unsigned ThreadCount); + + /// Blocking destructor: the pool will wait for all the threads to complete. + ~ThreadPool(); + + /// Asynchronous submission of a task to the pool. The returned future can be + /// used to wait for the task to finish and is *non-blocking* on destruction. + template + inline std::shared_future async(Function &&F, Args &&... ArgList) { + auto Task = + std::bind(std::forward(F), std::forward(ArgList...)); + return asyncImpl(Task); + } + + /// Asynchronous submission of a task to the pool. The returned future can be + /// used to wait for the task to finish and is *non-blocking* on destruction. + template + inline std::shared_future async(Function &&F) { + return asyncImpl(F); + } + + /// Blocking wait for all the threads to complete and the queue to be empty. + /// It is an error to try to add new tasks while blocking on this call. + void wait(); + +private: + /// Asynchronous submission of a task to the pool. The returned future can be + /// used to wait for the task to finish and is *non-blocking* on destruction. + std::shared_future asyncImpl(TaskTy f); + + /// Threads in flight + std::vector Threads; + + /// Tasks waiting for execution in the pool. + std::queue> Tasks; + + /// Locking and signaling for accessing the Tasks queue. + std::mutex QueueLock; + std::condition_variable QueueCondition; + + /// Locking and signaling for job completion + std::mutex CompletionLock; + std::condition_variable CompletionCondition; + + /// Keep track of the number of thread actually busy + std::atomic ActiveThreads; + + /// Signal for the destruction of the pool, asking thread to exit. + bool EnableFlag; +}; +} + +#endif // LLVM_SUPPORT_THREAD_POOL_H diff --git a/include/llvm/Support/thread.h b/include/llvm/Support/thread.h index 2d1f1b3a3ec..2d130418a57 100644 --- a/include/llvm/Support/thread.h +++ b/include/llvm/Support/thread.h @@ -43,6 +43,8 @@ typedef std::thread thread; #else // !LLVM_ENABLE_THREADS +#include + namespace llvm { struct thread { diff --git a/lib/Support/CMakeLists.txt b/lib/Support/CMakeLists.txt index 784e00e15b9..75b3e89f916 100644 --- a/lib/Support/CMakeLists.txt +++ b/lib/Support/CMakeLists.txt @@ -89,6 +89,7 @@ add_llvm_library(LLVMSupport StringRef.cpp SystemUtils.cpp TargetParser.cpp + ThreadPool.cpp Timer.cpp ToolOutputFile.cpp Triple.cpp diff --git a/lib/Support/ThreadPool.cpp b/lib/Support/ThreadPool.cpp new file mode 100644 index 00000000000..40f1da9ef3f --- /dev/null +++ b/lib/Support/ThreadPool.cpp @@ -0,0 +1,143 @@ +//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// +// This file implements a crude C++11 based thread pool. +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/ThreadPool.h" + +#include "llvm/Config/llvm-config.h" +#include "llvm/Support/raw_ostream.h" + +using namespace llvm; + +#if LLVM_ENABLE_THREADS + +// Default to std::thread::hardware_concurrency +ThreadPool::ThreadPool() : ThreadPool(std::thread::hardware_concurrency()) {} + +ThreadPool::ThreadPool(unsigned ThreadCount) + : ActiveThreads(0), EnableFlag(true) { + // Create ThreadCount threads that will loop forever, wait on QueueCondition + // for tasks to be queued or the Pool to be destroyed. + Threads.reserve(ThreadCount); + for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) { + Threads.emplace_back([&] { + while (true) { + std::packaged_task Task; + { + std::unique_lock LockGuard(QueueLock); + // Wait for tasks to be pushed in the queue + QueueCondition.wait(LockGuard, + [&] { return !EnableFlag || !Tasks.empty(); }); + // Exit condition + if (!EnableFlag && Tasks.empty()) + return; + // Yeah, we have a task, grab it and release the lock on the queue + + // We first need to signal that we are active before popping the queue + // in order for wait() to properly detect that even if the queue is + // empty, there is still a task in flight. + { + ++ActiveThreads; + std::unique_lock LockGuard(CompletionLock); + } + Task = std::move(Tasks.front()); + Tasks.pop(); + } + // Run the task we just grabbed + Task(); + + { + // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() + std::unique_lock LockGuard(CompletionLock); + --ActiveThreads; + } + + // Notify task completion, in case someone waits on ThreadPool::wait() + CompletionCondition.notify_all(); + } + }); + } +} + +void ThreadPool::wait() { + // Wait for all threads to complete and the queue to be empty + std::unique_lock LockGuard(CompletionLock); + CompletionCondition.wait(LockGuard, + [&] { return Tasks.empty() && !ActiveThreads; }); +} + +std::shared_future ThreadPool::asyncImpl(TaskTy Task) { + /// Wrap the Task in a packaged_task to return a future object. + std::packaged_task PackagedTask(std::move(Task)); + std::future Future = PackagedTask.get_future(); + { + // Lock the queue and push the new task + std::unique_lock LockGuard(QueueLock); + + // Don't allow enqueueing after disabling the pool + assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); + + Tasks.push(std::move(PackagedTask)); + } + QueueCondition.notify_one(); + return Future.share(); +} + +// The destructor joins all threads, waiting for completion. +ThreadPool::~ThreadPool() { + { + std::unique_lock LockGuard(QueueLock); + EnableFlag = false; + } + QueueCondition.notify_all(); + for (auto &Worker : Threads) + Worker.join(); +} + +#else // LLVM_ENABLE_THREADS Disabled + +ThreadPool::ThreadPool() : ThreadPool(0) {} + +// No threads are launched, issue a warning if ThreadCount is not 0 +ThreadPool::ThreadPool(unsigned ThreadCount) + : ActiveThreads(0), EnableFlag(true) { + if (ThreadCount) { + errs() << "Warning: request a ThreadPool with " << ThreadCount + << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; + } +} + +void ThreadPool::wait() { + // Sequential implementation running the tasks + while (!Tasks.empty()) { + auto Task = std::move(Tasks.front()); + Tasks.pop(); + Task(); + } +} + +std::shared_future ThreadPool::asyncImpl(TaskTy Task) { + // Get a Future with launch::deferred execution using std::async + auto Future = std::async(std::launch::deferred, std::move(Task)).share(); + // Wrap the future so that both ThreadPool::wait() can operate and the + // returned future can be sync'ed on. + std::packaged_task PackagedTask([Future]() { Future.get(); }); + Tasks.push(std::move(PackagedTask)); + return Future; +} + +ThreadPool::~ThreadPool() { + EnableFlag = false; + wait(); +} + +#endif diff --git a/unittests/Support/CMakeLists.txt b/unittests/Support/CMakeLists.txt index fd8324c836d..9bd685759ed 100644 --- a/unittests/Support/CMakeLists.txt +++ b/unittests/Support/CMakeLists.txt @@ -41,6 +41,7 @@ add_llvm_unittest(SupportTests SwapByteOrderTest.cpp TargetRegistry.cpp ThreadLocalTest.cpp + ThreadPool.cpp TimeValueTest.cpp TrailingObjectsTest.cpp UnicodeTest.cpp diff --git a/unittests/Support/ThreadPool.cpp b/unittests/Support/ThreadPool.cpp new file mode 100644 index 00000000000..d36341e425d --- /dev/null +++ b/unittests/Support/ThreadPool.cpp @@ -0,0 +1,91 @@ +//========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/ThreadPool.h" + +#include "llvm/ADT/STLExtras.h" + +#include "gtest/gtest.h" + +using namespace llvm; +using namespace std::chrono; + +/// Try best to make this thread not progress faster than the main thread +static void yield() { +#ifdef LLVM_ENABLE_THREADS + std::this_thread::yield(); +#endif + std::this_thread::sleep_for(milliseconds(200)); +#ifdef LLVM_ENABLE_THREADS + std::this_thread::yield(); +#endif +} + +TEST(ThreadPoolTest, AsyncBarrier) { + // test that async & barrier work together properly. + + std::atomic_int checked_in{0}; + + ThreadPool Pool; + for (size_t i = 0; i < 5; ++i) { + Pool.async([&checked_in, i] { + yield(); + ++checked_in; + }); + } + ASSERT_EQ(0, checked_in); + Pool.wait(); + ASSERT_EQ(5, checked_in); +} + +TEST(ThreadPoolTest, Async) { + ThreadPool Pool; + std::atomic_int i{0}; + // sleep here just to ensure that the not-equal is correct. + Pool.async([&i] { + yield(); + ++i; + }); + Pool.async([&i] { ++i; }); + ASSERT_NE(2, i.load()); + Pool.wait(); + ASSERT_EQ(2, i.load()); +} + +TEST(ThreadPoolTest, GetFuture) { + ThreadPool Pool; + std::atomic_int i{0}; + // sleep here just to ensure that the not-equal is correct. + Pool.async([&i] { + yield(); + ++i; + }); + // Force the future using get() + Pool.async([&i] { ++i; }).get(); + ASSERT_NE(2, i.load()); + Pool.wait(); + ASSERT_EQ(2, i.load()); +} + +TEST(ThreadPoolTest, PoolDestruction) { + // Test that we are waiting on destruction + std::atomic_int checked_in{0}; + + { + ThreadPool Pool; + for (size_t i = 0; i < 5; ++i) { + Pool.async([&checked_in, i] { + yield(); + ++checked_in; + }); + } + ASSERT_EQ(0, checked_in); + } + ASSERT_EQ(5, checked_in); +} -- 2.34.1