From: James Sedgwick Date: Fri, 20 Oct 2017 20:30:03 +0000 (-0700) Subject: move executor task queues and thread factories into subdirectories X-Git-Tag: v2017.10.23.00~5 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=0231b21273f809fda5753589b09e7f61c24cdac6;p=folly.git move executor task queues and thread factories into subdirectories Summary: as title, see moves (Note: this ignores all push blocking failures!) Reviewed By: mzlee Differential Revision: D6112001 fbshipit-source-id: 1eb10b44ae8ee1f90a10e05c29e48c99d824afa5 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 344dc3ae..3fc4644b 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -83,7 +83,6 @@ nobase_follyinclude_HEADERS = \ detail/TurnSequencer.h \ detail/UncaughtExceptionCounter.h \ executors/Async.h \ - executors/BlockingQueue.h \ executors/CPUThreadPoolExecutor.h \ executors/Codel.h \ executors/DrivableExecutor.h \ @@ -93,17 +92,18 @@ nobase_follyinclude_HEADERS = \ executors/IOExecutor.h \ executors/IOObjectCache.h \ executors/IOThreadPoolExecutor.h \ - executors/LifoSemMPMCQueue.h \ - executors/NamedThreadFactory.h \ executors/NotificationQueueExecutor.h \ - executors/PriorityLifoSemMPMCQueue.h \ - executors/PriorityThreadFactory.h \ executors/ScheduledExecutor.h \ executors/SerialExecutor.h \ - executors/ThreadFactory.h \ executors/ThreadPoolExecutor.h \ executors/ThreadedExecutor.h \ - executors/UnboundedBlockingQueue.h \ + executors/task_queue/BlockingQueue.h \ + executors/task_queue/LifoSemMPMCQueue.h \ + executors/task_queue/PriorityLifoSemMPMCQueue.h \ + executors/task_queue/UnboundedBlockingQueue.h \ + executors/thread_factory/NamedThreadFactory.h \ + executors/thread_factory/PriorityThreadFactory.h \ + executors/thread_factory/ThreadFactory.h \ functional/ApplyTuple.h \ Demangle.h \ DiscriminatedPtr.h \ diff --git a/folly/executors/BlockingQueue.h b/folly/executors/BlockingQueue.h deleted file mode 100644 index 49280954..00000000 --- a/folly/executors/BlockingQueue.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include - -namespace folly { - -// Some queue implementations (for example, LifoSemMPMCQueue or -// PriorityLifoSemMPMCQueue) support both blocking (BLOCK) and -// non-blocking (THROW) behaviors. -enum class QueueBehaviorIfFull { THROW, BLOCK }; - -class QueueFullException : public std::runtime_error { - using std::runtime_error::runtime_error; // Inherit constructors. -}; - -template -class BlockingQueue { - public: - virtual ~BlockingQueue() = default; - virtual void add(T item) = 0; - virtual void addWithPriority(T item, int8_t /* priority */) { - add(std::move(item)); - } - virtual uint8_t getNumPriorities() { - return 1; - } - virtual T take() = 0; - virtual size_t size() = 0; -}; - -} // namespace folly diff --git a/folly/executors/CPUThreadPoolExecutor.cpp b/folly/executors/CPUThreadPoolExecutor.cpp index 37fd4441..3c50a299 100644 --- a/folly/executors/CPUThreadPoolExecutor.cpp +++ b/folly/executors/CPUThreadPoolExecutor.cpp @@ -15,7 +15,7 @@ */ #include -#include +#include namespace folly { diff --git a/folly/executors/LifoSemMPMCQueue.h b/folly/executors/LifoSemMPMCQueue.h deleted file mode 100644 index 3a16da27..00000000 --- a/folly/executors/LifoSemMPMCQueue.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -namespace folly { - -template -class LifoSemMPMCQueue : public BlockingQueue { - public: - // Note: The queue pre-allocates all memory for max_capacity - explicit LifoSemMPMCQueue(size_t max_capacity) : queue_(max_capacity) {} - - void add(T item) override { - switch (kBehavior) { // static - case QueueBehaviorIfFull::THROW: - if (!queue_.write(std::move(item))) { - throw QueueFullException("LifoSemMPMCQueue full, can't add item"); - } - break; - case QueueBehaviorIfFull::BLOCK: - queue_.blockingWrite(std::move(item)); - break; - } - sem_.post(); - } - - T take() override { - T item; - while (!queue_.readIfNotEmpty(item)) { - sem_.wait(); - } - return item; - } - - size_t capacity() { - return queue_.capacity(); - } - - size_t size() override { - return queue_.size(); - } - - private: - folly::LifoSem sem_; - folly::MPMCQueue queue_; -}; - -} // namespace folly diff --git a/folly/executors/NamedThreadFactory.h b/folly/executors/NamedThreadFactory.h deleted file mode 100644 index 856a5295..00000000 --- a/folly/executors/NamedThreadFactory.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include -#include -#include -#include - -namespace folly { - -class NamedThreadFactory : public ThreadFactory { - public: - explicit NamedThreadFactory(folly::StringPiece prefix) - : prefix_(prefix.str()), suffix_(0) {} - - std::thread newThread(Func&& func) override { - auto name = folly::to(prefix_, suffix_++); - return std::thread( - [ func = std::move(func), name = std::move(name) ]() mutable { - folly::setThreadName(name); - func(); - }); - } - - void setNamePrefix(folly::StringPiece prefix) { - prefix_ = prefix.str(); - } - - std::string getNamePrefix() { - return prefix_; - } - - private: - std::string prefix_; - std::atomic suffix_; -}; - -} // namespace folly diff --git a/folly/executors/PriorityLifoSemMPMCQueue.h b/folly/executors/PriorityLifoSemMPMCQueue.h deleted file mode 100644 index 797287c0..00000000 --- a/folly/executors/PriorityLifoSemMPMCQueue.h +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include - -namespace folly { - -template -class PriorityLifoSemMPMCQueue : public BlockingQueue { - public: - // Note A: The queue pre-allocates all memory for max_capacity - // Note B: To use folly::Executor::*_PRI, for numPriorities == 2 - // MID_PRI and HI_PRI are treated at the same priority level. - PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) { - queues_.reserve(numPriorities); - for (int8_t i = 0; i < numPriorities; i++) { - queues_.emplace_back(max_capacity); - } - } - - uint8_t getNumPriorities() override { - return queues_.size(); - } - - // Add at medium priority by default - void add(T item) override { - addWithPriority(std::move(item), folly::Executor::MID_PRI); - } - - void addWithPriority(T item, int8_t priority) override { - int mid = getNumPriorities() / 2; - size_t queue = priority < 0 - ? std::max(0, mid + priority) - : std::min(getNumPriorities() - 1, mid + priority); - CHECK_LT(queue, queues_.size()); - switch (kBehavior) { // static - case QueueBehaviorIfFull::THROW: - if (!queues_[queue].write(std::move(item))) { - throw QueueFullException("LifoSemMPMCQueue full, can't add item"); - } - break; - case QueueBehaviorIfFull::BLOCK: - queues_[queue].blockingWrite(std::move(item)); - break; - } - sem_.post(); - } - - T take() override { - T item; - while (true) { - if (nonBlockingTake(item)) { - return item; - } - sem_.wait(); - } - } - - bool nonBlockingTake(T& item) { - for (auto it = queues_.rbegin(); it != queues_.rend(); it++) { - if (it->readIfNotEmpty(item)) { - return true; - } - } - return false; - } - - size_t size() override { - size_t size = 0; - for (auto& q : queues_) { - size += q.size(); - } - return size; - } - - size_t sizeGuess() const { - size_t size = 0; - for (auto& q : queues_) { - size += q.sizeGuess(); - } - return size; - } - - private: - folly::LifoSem sem_; - std::vector> queues_; -}; - -} // namespace folly diff --git a/folly/executors/PriorityThreadFactory.h b/folly/executors/PriorityThreadFactory.h deleted file mode 100644 index ed46dd35..00000000 --- a/folly/executors/PriorityThreadFactory.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include -#include - -namespace folly { - -/** - * A ThreadFactory that sets nice values for each thread. The main - * use case for this class is if there are multiple - * CPUThreadPoolExecutors in a single process, or between multiple - * processes, where some should have a higher priority than the others. - * - * Note that per-thread nice values are not POSIX standard, but both - * pthreads and linux support per-thread nice. The default linux - * scheduler uses these values to do smart thread prioritization. - * sched_priority function calls only affect real-time schedulers. - */ -class PriorityThreadFactory : public ThreadFactory { - public: - explicit PriorityThreadFactory( - std::shared_ptr factory, - int priority) - : factory_(std::move(factory)), priority_(priority) {} - - std::thread newThread(Func&& func) override { - int priority = priority_; - return factory_->newThread([ priority, func = std::move(func) ]() mutable { - if (setpriority(PRIO_PROCESS, 0, priority) != 0) { - LOG(ERROR) << "setpriority failed (are you root?) with error " << errno, - strerror(errno); - } - func(); - }); - } - - private: - std::shared_ptr factory_; - int priority_; -}; - -} // folly diff --git a/folly/executors/ThreadFactory.h b/folly/executors/ThreadFactory.h deleted file mode 100644 index 0af86322..00000000 --- a/folly/executors/ThreadFactory.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once -#include - -#include - -namespace folly { - -class ThreadFactory { - public: - virtual ~ThreadFactory() = default; - virtual std::thread newThread(Func&& func) = 0; -}; - -} // namespace folly diff --git a/folly/executors/ThreadPoolExecutor.h b/folly/executors/ThreadPoolExecutor.h index 3de9ec59..09ff14fd 100644 --- a/folly/executors/ThreadPoolExecutor.h +++ b/folly/executors/ThreadPoolExecutor.h @@ -20,8 +20,8 @@ #include #include #include -#include -#include +#include +#include #include #include diff --git a/folly/executors/ThreadedExecutor.cpp b/folly/executors/ThreadedExecutor.cpp index d75e82b6..e19ecf4f 100644 --- a/folly/executors/ThreadedExecutor.cpp +++ b/folly/executors/ThreadedExecutor.cpp @@ -21,7 +21,7 @@ #include #include -#include +#include namespace folly { diff --git a/folly/executors/ThreadedExecutor.h b/folly/executors/ThreadedExecutor.h index 81c5fca8..94178987 100644 --- a/folly/executors/ThreadedExecutor.h +++ b/folly/executors/ThreadedExecutor.h @@ -25,7 +25,7 @@ #include #include -#include +#include namespace folly { diff --git a/folly/executors/UnboundedBlockingQueue.h b/folly/executors/UnboundedBlockingQueue.h deleted file mode 100644 index 3fb09b3f..00000000 --- a/folly/executors/UnboundedBlockingQueue.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include - -namespace folly { - -// Warning: this is effectively just a std::deque wrapped in a single mutex -// We are aiming to add a more performant concurrent unbounded queue in the -// future, but this class is available if you must have an unbounded queue -// and can tolerate any contention. -template -class UnboundedBlockingQueue : public BlockingQueue { - public: - virtual ~UnboundedBlockingQueue() {} - - void add(T item) override { - queue_.wlock()->push(std::move(item)); - sem_.post(); - } - - T take() override { - while (true) { - { - auto ulockedQueue = queue_.ulock(); - if (!ulockedQueue->empty()) { - auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite(); - T item = std::move(wlockedQueue->front()); - wlockedQueue->pop(); - return item; - } - } - sem_.wait(); - } - } - - size_t size() override { - return queue_.rlock()->size(); - } - - private: - LifoSem sem_; - Synchronized> queue_; -}; - -} // namespace folly diff --git a/folly/executors/task_queue/BlockingQueue.h b/folly/executors/task_queue/BlockingQueue.h new file mode 100644 index 00000000..49280954 --- /dev/null +++ b/folly/executors/task_queue/BlockingQueue.h @@ -0,0 +1,50 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +namespace folly { + +// Some queue implementations (for example, LifoSemMPMCQueue or +// PriorityLifoSemMPMCQueue) support both blocking (BLOCK) and +// non-blocking (THROW) behaviors. +enum class QueueBehaviorIfFull { THROW, BLOCK }; + +class QueueFullException : public std::runtime_error { + using std::runtime_error::runtime_error; // Inherit constructors. +}; + +template +class BlockingQueue { + public: + virtual ~BlockingQueue() = default; + virtual void add(T item) = 0; + virtual void addWithPriority(T item, int8_t /* priority */) { + add(std::move(item)); + } + virtual uint8_t getNumPriorities() { + return 1; + } + virtual T take() = 0; + virtual size_t size() = 0; +}; + +} // namespace folly diff --git a/folly/executors/task_queue/LifoSemMPMCQueue.h b/folly/executors/task_queue/LifoSemMPMCQueue.h new file mode 100644 index 00000000..68a86061 --- /dev/null +++ b/folly/executors/task_queue/LifoSemMPMCQueue.h @@ -0,0 +1,66 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace folly { + +template +class LifoSemMPMCQueue : public BlockingQueue { + public: + // Note: The queue pre-allocates all memory for max_capacity + explicit LifoSemMPMCQueue(size_t max_capacity) : queue_(max_capacity) {} + + void add(T item) override { + switch (kBehavior) { // static + case QueueBehaviorIfFull::THROW: + if (!queue_.write(std::move(item))) { + throw QueueFullException("LifoSemMPMCQueue full, can't add item"); + } + break; + case QueueBehaviorIfFull::BLOCK: + queue_.blockingWrite(std::move(item)); + break; + } + sem_.post(); + } + + T take() override { + T item; + while (!queue_.readIfNotEmpty(item)) { + sem_.wait(); + } + return item; + } + + size_t capacity() { + return queue_.capacity(); + } + + size_t size() override { + return queue_.size(); + } + + private: + folly::LifoSem sem_; + folly::MPMCQueue queue_; +}; + +} // namespace folly diff --git a/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h b/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h new file mode 100644 index 00000000..3242628d --- /dev/null +++ b/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h @@ -0,0 +1,107 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace folly { + +template +class PriorityLifoSemMPMCQueue : public BlockingQueue { + public: + // Note A: The queue pre-allocates all memory for max_capacity + // Note B: To use folly::Executor::*_PRI, for numPriorities == 2 + // MID_PRI and HI_PRI are treated at the same priority level. + PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) { + queues_.reserve(numPriorities); + for (int8_t i = 0; i < numPriorities; i++) { + queues_.emplace_back(max_capacity); + } + } + + uint8_t getNumPriorities() override { + return queues_.size(); + } + + // Add at medium priority by default + void add(T item) override { + addWithPriority(std::move(item), folly::Executor::MID_PRI); + } + + void addWithPriority(T item, int8_t priority) override { + int mid = getNumPriorities() / 2; + size_t queue = priority < 0 + ? std::max(0, mid + priority) + : std::min(getNumPriorities() - 1, mid + priority); + CHECK_LT(queue, queues_.size()); + switch (kBehavior) { // static + case QueueBehaviorIfFull::THROW: + if (!queues_[queue].write(std::move(item))) { + throw QueueFullException("LifoSemMPMCQueue full, can't add item"); + } + break; + case QueueBehaviorIfFull::BLOCK: + queues_[queue].blockingWrite(std::move(item)); + break; + } + sem_.post(); + } + + T take() override { + T item; + while (true) { + if (nonBlockingTake(item)) { + return item; + } + sem_.wait(); + } + } + + bool nonBlockingTake(T& item) { + for (auto it = queues_.rbegin(); it != queues_.rend(); it++) { + if (it->readIfNotEmpty(item)) { + return true; + } + } + return false; + } + + size_t size() override { + size_t size = 0; + for (auto& q : queues_) { + size += q.size(); + } + return size; + } + + size_t sizeGuess() const { + size_t size = 0; + for (auto& q : queues_) { + size += q.sizeGuess(); + } + return size; + } + + private: + folly::LifoSem sem_; + std::vector> queues_; +}; + +} // namespace folly diff --git a/folly/executors/task_queue/UnboundedBlockingQueue.h b/folly/executors/task_queue/UnboundedBlockingQueue.h new file mode 100644 index 00000000..2f29fbfb --- /dev/null +++ b/folly/executors/task_queue/UnboundedBlockingQueue.h @@ -0,0 +1,64 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace folly { + +// Warning: this is effectively just a std::deque wrapped in a single mutex +// We are aiming to add a more performant concurrent unbounded queue in the +// future, but this class is available if you must have an unbounded queue +// and can tolerate any contention. +template +class UnboundedBlockingQueue : public BlockingQueue { + public: + virtual ~UnboundedBlockingQueue() {} + + void add(T item) override { + queue_.wlock()->push(std::move(item)); + sem_.post(); + } + + T take() override { + while (true) { + { + auto ulockedQueue = queue_.ulock(); + if (!ulockedQueue->empty()) { + auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite(); + T item = std::move(wlockedQueue->front()); + wlockedQueue->pop(); + return item; + } + } + sem_.wait(); + } + } + + size_t size() override { + return queue_.rlock()->size(); + } + + private: + LifoSem sem_; + Synchronized> queue_; +}; + +} // namespace folly diff --git a/folly/executors/task_queue/test/UnboundedBlockingQueueTest.cpp b/folly/executors/task_queue/test/UnboundedBlockingQueueTest.cpp new file mode 100644 index 00000000..06de12ac --- /dev/null +++ b/folly/executors/task_queue/test/UnboundedBlockingQueueTest.cpp @@ -0,0 +1,51 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +using namespace folly; + +TEST(UnboundedQueuee, push_pop) { + UnboundedBlockingQueue q; + q.add(42); + EXPECT_EQ(42, q.take()); +} +TEST(UnboundedBlockingQueue, size) { + UnboundedBlockingQueue q; + EXPECT_EQ(0, q.size()); + q.add(42); + EXPECT_EQ(1, q.size()); + q.take(); + EXPECT_EQ(0, q.size()); +} + +TEST(UnboundedBlockingQueue, concurrent_push_pop) { + UnboundedBlockingQueue q; + Baton<> b1, b2; + std::thread t([&] { + b1.post(); + EXPECT_EQ(42, q.take()); + EXPECT_EQ(0, q.size()); + b2.post(); + }); + b1.wait(); + q.add(42); + b2.wait(); + EXPECT_EQ(0, q.size()); + t.join(); +} diff --git a/folly/executors/test/ThreadPoolExecutorTest.cpp b/folly/executors/test/ThreadPoolExecutorTest.cpp index 40b4af15..fd96b9f1 100644 --- a/folly/executors/test/ThreadPoolExecutorTest.cpp +++ b/folly/executors/test/ThreadPoolExecutorTest.cpp @@ -19,9 +19,9 @@ #include #include #include -#include -#include #include +#include +#include #include using namespace folly; diff --git a/folly/executors/test/UnboundedBlockingQueueTest.cpp b/folly/executors/test/UnboundedBlockingQueueTest.cpp deleted file mode 100644 index 203c1eb5..00000000 --- a/folly/executors/test/UnboundedBlockingQueueTest.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2017-present Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include -#include - -using namespace folly; - -TEST(UnboundedQueuee, push_pop) { - UnboundedBlockingQueue q; - q.add(42); - EXPECT_EQ(42, q.take()); -} -TEST(UnboundedBlockingQueue, size) { - UnboundedBlockingQueue q; - EXPECT_EQ(0, q.size()); - q.add(42); - EXPECT_EQ(1, q.size()); - q.take(); - EXPECT_EQ(0, q.size()); -} - -TEST(UnboundedBlockingQueue, concurrent_push_pop) { - UnboundedBlockingQueue q; - Baton<> b1, b2; - std::thread t([&] { - b1.post(); - EXPECT_EQ(42, q.take()); - EXPECT_EQ(0, q.size()); - b2.post(); - }); - b1.wait(); - q.add(42); - b2.wait(); - EXPECT_EQ(0, q.size()); - t.join(); -} diff --git a/folly/executors/thread_factory/NamedThreadFactory.h b/folly/executors/thread_factory/NamedThreadFactory.h new file mode 100644 index 00000000..2b34be9e --- /dev/null +++ b/folly/executors/thread_factory/NamedThreadFactory.h @@ -0,0 +1,57 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +namespace folly { + +class NamedThreadFactory : public ThreadFactory { + public: + explicit NamedThreadFactory(folly::StringPiece prefix) + : prefix_(prefix.str()), suffix_(0) {} + + std::thread newThread(Func&& func) override { + auto name = folly::to(prefix_, suffix_++); + return std::thread( + [ func = std::move(func), name = std::move(name) ]() mutable { + folly::setThreadName(name); + func(); + }); + } + + void setNamePrefix(folly::StringPiece prefix) { + prefix_ = prefix.str(); + } + + std::string getNamePrefix() { + return prefix_; + } + + private: + std::string prefix_; + std::atomic suffix_; +}; + +} // namespace folly diff --git a/folly/executors/thread_factory/PriorityThreadFactory.h b/folly/executors/thread_factory/PriorityThreadFactory.h new file mode 100644 index 00000000..0c75fefe --- /dev/null +++ b/folly/executors/thread_factory/PriorityThreadFactory.h @@ -0,0 +1,60 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +namespace folly { + +/** + * A ThreadFactory that sets nice values for each thread. The main + * use case for this class is if there are multiple + * CPUThreadPoolExecutors in a single process, or between multiple + * processes, where some should have a higher priority than the others. + * + * Note that per-thread nice values are not POSIX standard, but both + * pthreads and linux support per-thread nice. The default linux + * scheduler uses these values to do smart thread prioritization. + * sched_priority function calls only affect real-time schedulers. + */ +class PriorityThreadFactory : public ThreadFactory { + public: + explicit PriorityThreadFactory( + std::shared_ptr factory, + int priority) + : factory_(std::move(factory)), priority_(priority) {} + + std::thread newThread(Func&& func) override { + int priority = priority_; + return factory_->newThread([ priority, func = std::move(func) ]() mutable { + if (setpriority(PRIO_PROCESS, 0, priority) != 0) { + LOG(ERROR) << "setpriority failed (are you root?) with error " << errno, + strerror(errno); + } + func(); + }); + } + + private: + std::shared_ptr factory_; + int priority_; +}; + +} // folly diff --git a/folly/executors/thread_factory/ThreadFactory.h b/folly/executors/thread_factory/ThreadFactory.h new file mode 100644 index 00000000..0af86322 --- /dev/null +++ b/folly/executors/thread_factory/ThreadFactory.h @@ -0,0 +1,30 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +#include + +namespace folly { + +class ThreadFactory { + public: + virtual ~ThreadFactory() = default; + virtual std::thread newThread(Func&& func) = 0; +}; + +} // namespace folly