detail/TurnSequencer.h \
detail/UncaughtExceptionCounter.h \
executors/Async.h \
- executors/BlockingQueue.h \
executors/CPUThreadPoolExecutor.h \
executors/Codel.h \
executors/DrivableExecutor.h \
executors/IOExecutor.h \
executors/IOObjectCache.h \
executors/IOThreadPoolExecutor.h \
- executors/LifoSemMPMCQueue.h \
- executors/NamedThreadFactory.h \
executors/NotificationQueueExecutor.h \
- executors/PriorityLifoSemMPMCQueue.h \
- executors/PriorityThreadFactory.h \
executors/ScheduledExecutor.h \
executors/SerialExecutor.h \
- executors/ThreadFactory.h \
executors/ThreadPoolExecutor.h \
executors/ThreadedExecutor.h \
- executors/UnboundedBlockingQueue.h \
+ executors/task_queue/BlockingQueue.h \
+ executors/task_queue/LifoSemMPMCQueue.h \
+ executors/task_queue/PriorityLifoSemMPMCQueue.h \
+ executors/task_queue/UnboundedBlockingQueue.h \
+ executors/thread_factory/NamedThreadFactory.h \
+ executors/thread_factory/PriorityThreadFactory.h \
+ executors/thread_factory/ThreadFactory.h \
functional/ApplyTuple.h \
Demangle.h \
DiscriminatedPtr.h \
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <exception>
-#include <stdexcept>
-
-#include <glog/logging.h>
-
-namespace folly {
-
-// Some queue implementations (for example, LifoSemMPMCQueue or
-// PriorityLifoSemMPMCQueue) support both blocking (BLOCK) and
-// non-blocking (THROW) behaviors.
-enum class QueueBehaviorIfFull { THROW, BLOCK };
-
-class QueueFullException : public std::runtime_error {
- using std::runtime_error::runtime_error; // Inherit constructors.
-};
-
-template <class T>
-class BlockingQueue {
- public:
- virtual ~BlockingQueue() = default;
- virtual void add(T item) = 0;
- virtual void addWithPriority(T item, int8_t /* priority */) {
- add(std::move(item));
- }
- virtual uint8_t getNumPriorities() {
- return 1;
- }
- virtual T take() = 0;
- virtual size_t size() = 0;
-};
-
-} // namespace folly
*/
#include <folly/executors/CPUThreadPoolExecutor.h>
-#include <folly/executors/PriorityLifoSemMPMCQueue.h>
+#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
namespace folly {
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/LifoSem.h>
-#include <folly/MPMCQueue.h>
-#include <folly/executors/BlockingQueue.h>
-
-namespace folly {
-
-template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
-class LifoSemMPMCQueue : public BlockingQueue<T> {
- public:
- // Note: The queue pre-allocates all memory for max_capacity
- explicit LifoSemMPMCQueue(size_t max_capacity) : queue_(max_capacity) {}
-
- void add(T item) override {
- switch (kBehavior) { // static
- case QueueBehaviorIfFull::THROW:
- if (!queue_.write(std::move(item))) {
- throw QueueFullException("LifoSemMPMCQueue full, can't add item");
- }
- break;
- case QueueBehaviorIfFull::BLOCK:
- queue_.blockingWrite(std::move(item));
- break;
- }
- sem_.post();
- }
-
- T take() override {
- T item;
- while (!queue_.readIfNotEmpty(item)) {
- sem_.wait();
- }
- return item;
- }
-
- size_t capacity() {
- return queue_.capacity();
- }
-
- size_t size() override {
- return queue_.size();
- }
-
- private:
- folly::LifoSem sem_;
- folly::MPMCQueue<T> queue_;
-};
-
-} // namespace folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <atomic>
-#include <string>
-#include <thread>
-
-#include <folly/Conv.h>
-#include <folly/Range.h>
-#include <folly/ThreadName.h>
-#include <folly/executors/ThreadFactory.h>
-
-namespace folly {
-
-class NamedThreadFactory : public ThreadFactory {
- public:
- explicit NamedThreadFactory(folly::StringPiece prefix)
- : prefix_(prefix.str()), suffix_(0) {}
-
- std::thread newThread(Func&& func) override {
- auto name = folly::to<std::string>(prefix_, suffix_++);
- return std::thread(
- [ func = std::move(func), name = std::move(name) ]() mutable {
- folly::setThreadName(name);
- func();
- });
- }
-
- void setNamePrefix(folly::StringPiece prefix) {
- prefix_ = prefix.str();
- }
-
- std::string getNamePrefix() {
- return prefix_;
- }
-
- private:
- std::string prefix_;
- std::atomic<uint64_t> suffix_;
-};
-
-} // namespace folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/Executor.h>
-#include <folly/LifoSem.h>
-#include <folly/MPMCQueue.h>
-#include <folly/executors/BlockingQueue.h>
-
-namespace folly {
-
-template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
-class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
- public:
- // Note A: The queue pre-allocates all memory for max_capacity
- // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
- // MID_PRI and HI_PRI are treated at the same priority level.
- PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
- queues_.reserve(numPriorities);
- for (int8_t i = 0; i < numPriorities; i++) {
- queues_.emplace_back(max_capacity);
- }
- }
-
- uint8_t getNumPriorities() override {
- return queues_.size();
- }
-
- // Add at medium priority by default
- void add(T item) override {
- addWithPriority(std::move(item), folly::Executor::MID_PRI);
- }
-
- void addWithPriority(T item, int8_t priority) override {
- int mid = getNumPriorities() / 2;
- size_t queue = priority < 0
- ? std::max(0, mid + priority)
- : std::min(getNumPriorities() - 1, mid + priority);
- CHECK_LT(queue, queues_.size());
- switch (kBehavior) { // static
- case QueueBehaviorIfFull::THROW:
- if (!queues_[queue].write(std::move(item))) {
- throw QueueFullException("LifoSemMPMCQueue full, can't add item");
- }
- break;
- case QueueBehaviorIfFull::BLOCK:
- queues_[queue].blockingWrite(std::move(item));
- break;
- }
- sem_.post();
- }
-
- T take() override {
- T item;
- while (true) {
- if (nonBlockingTake(item)) {
- return item;
- }
- sem_.wait();
- }
- }
-
- bool nonBlockingTake(T& item) {
- for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
- if (it->readIfNotEmpty(item)) {
- return true;
- }
- }
- return false;
- }
-
- size_t size() override {
- size_t size = 0;
- for (auto& q : queues_) {
- size += q.size();
- }
- return size;
- }
-
- size_t sizeGuess() const {
- size_t size = 0;
- for (auto& q : queues_) {
- size += q.sizeGuess();
- }
- return size;
- }
-
- private:
- folly::LifoSem sem_;
- std::vector<folly::MPMCQueue<T>> queues_;
-};
-
-} // namespace folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/executors/ThreadFactory.h>
-
-#include <folly/portability/SysResource.h>
-#include <folly/portability/SysTime.h>
-
-namespace folly {
-
-/**
- * A ThreadFactory that sets nice values for each thread. The main
- * use case for this class is if there are multiple
- * CPUThreadPoolExecutors in a single process, or between multiple
- * processes, where some should have a higher priority than the others.
- *
- * Note that per-thread nice values are not POSIX standard, but both
- * pthreads and linux support per-thread nice. The default linux
- * scheduler uses these values to do smart thread prioritization.
- * sched_priority function calls only affect real-time schedulers.
- */
-class PriorityThreadFactory : public ThreadFactory {
- public:
- explicit PriorityThreadFactory(
- std::shared_ptr<ThreadFactory> factory,
- int priority)
- : factory_(std::move(factory)), priority_(priority) {}
-
- std::thread newThread(Func&& func) override {
- int priority = priority_;
- return factory_->newThread([ priority, func = std::move(func) ]() mutable {
- if (setpriority(PRIO_PROCESS, 0, priority) != 0) {
- LOG(ERROR) << "setpriority failed (are you root?) with error " << errno,
- strerror(errno);
- }
- func();
- });
- }
-
- private:
- std::shared_ptr<ThreadFactory> factory_;
- int priority_;
-};
-
-} // folly
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-#include <folly/Executor.h>
-
-#include <thread>
-
-namespace folly {
-
-class ThreadFactory {
- public:
- virtual ~ThreadFactory() = default;
- virtual std::thread newThread(Func&& func) = 0;
-};
-
-} // namespace folly
#include <folly/Memory.h>
#include <folly/RWSpinLock.h>
#include <folly/concurrency/GlobalThreadPoolList.h>
-#include <folly/executors/LifoSemMPMCQueue.h>
-#include <folly/executors/NamedThreadFactory.h>
+#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
+#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <folly/io/async/Request.h>
#include <algorithm>
#include <glog/logging.h>
#include <folly/ThreadName.h>
-#include <folly/executors/NamedThreadFactory.h>
+#include <folly/executors/thread_factory/NamedThreadFactory.h>
namespace folly {
#include <thread>
#include <folly/Executor.h>
-#include <folly/executors/ThreadFactory.h>
+#include <folly/executors/thread_factory/ThreadFactory.h>
namespace folly {
+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/LifoSem.h>
-#include <folly/Synchronized.h>
-#include <folly/executors/BlockingQueue.h>
-#include <queue>
-
-namespace folly {
-
-// Warning: this is effectively just a std::deque wrapped in a single mutex
-// We are aiming to add a more performant concurrent unbounded queue in the
-// future, but this class is available if you must have an unbounded queue
-// and can tolerate any contention.
-template <class T>
-class UnboundedBlockingQueue : public BlockingQueue<T> {
- public:
- virtual ~UnboundedBlockingQueue() {}
-
- void add(T item) override {
- queue_.wlock()->push(std::move(item));
- sem_.post();
- }
-
- T take() override {
- while (true) {
- {
- auto ulockedQueue = queue_.ulock();
- if (!ulockedQueue->empty()) {
- auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
- T item = std::move(wlockedQueue->front());
- wlockedQueue->pop();
- return item;
- }
- }
- sem_.wait();
- }
- }
-
- size_t size() override {
- return queue_.rlock()->size();
- }
-
- private:
- LifoSem sem_;
- Synchronized<std::queue<T>> queue_;
-};
-
-} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <exception>
+#include <stdexcept>
+
+#include <glog/logging.h>
+
+namespace folly {
+
+// Some queue implementations (for example, LifoSemMPMCQueue or
+// PriorityLifoSemMPMCQueue) support both blocking (BLOCK) and
+// non-blocking (THROW) behaviors.
+enum class QueueBehaviorIfFull { THROW, BLOCK };
+
+class QueueFullException : public std::runtime_error {
+ using std::runtime_error::runtime_error; // Inherit constructors.
+};
+
+template <class T>
+class BlockingQueue {
+ public:
+ virtual ~BlockingQueue() = default;
+ virtual void add(T item) = 0;
+ virtual void addWithPriority(T item, int8_t /* priority */) {
+ add(std::move(item));
+ }
+ virtual uint8_t getNumPriorities() {
+ return 1;
+ }
+ virtual T take() = 0;
+ virtual size_t size() = 0;
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+#include <folly/executors/task_queue/BlockingQueue.h>
+
+namespace folly {
+
+template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
+class LifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+ // Note: The queue pre-allocates all memory for max_capacity
+ explicit LifoSemMPMCQueue(size_t max_capacity) : queue_(max_capacity) {}
+
+ void add(T item) override {
+ switch (kBehavior) { // static
+ case QueueBehaviorIfFull::THROW:
+ if (!queue_.write(std::move(item))) {
+ throw QueueFullException("LifoSemMPMCQueue full, can't add item");
+ }
+ break;
+ case QueueBehaviorIfFull::BLOCK:
+ queue_.blockingWrite(std::move(item));
+ break;
+ }
+ sem_.post();
+ }
+
+ T take() override {
+ T item;
+ while (!queue_.readIfNotEmpty(item)) {
+ sem_.wait();
+ }
+ return item;
+ }
+
+ size_t capacity() {
+ return queue_.capacity();
+ }
+
+ size_t size() override {
+ return queue_.size();
+ }
+
+ private:
+ folly::LifoSem sem_;
+ folly::MPMCQueue<T> queue_;
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Executor.h>
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+#include <folly/executors/task_queue/BlockingQueue.h>
+
+namespace folly {
+
+template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
+class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+ // Note A: The queue pre-allocates all memory for max_capacity
+ // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
+ // MID_PRI and HI_PRI are treated at the same priority level.
+ PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
+ queues_.reserve(numPriorities);
+ for (int8_t i = 0; i < numPriorities; i++) {
+ queues_.emplace_back(max_capacity);
+ }
+ }
+
+ uint8_t getNumPriorities() override {
+ return queues_.size();
+ }
+
+ // Add at medium priority by default
+ void add(T item) override {
+ addWithPriority(std::move(item), folly::Executor::MID_PRI);
+ }
+
+ void addWithPriority(T item, int8_t priority) override {
+ int mid = getNumPriorities() / 2;
+ size_t queue = priority < 0
+ ? std::max(0, mid + priority)
+ : std::min(getNumPriorities() - 1, mid + priority);
+ CHECK_LT(queue, queues_.size());
+ switch (kBehavior) { // static
+ case QueueBehaviorIfFull::THROW:
+ if (!queues_[queue].write(std::move(item))) {
+ throw QueueFullException("LifoSemMPMCQueue full, can't add item");
+ }
+ break;
+ case QueueBehaviorIfFull::BLOCK:
+ queues_[queue].blockingWrite(std::move(item));
+ break;
+ }
+ sem_.post();
+ }
+
+ T take() override {
+ T item;
+ while (true) {
+ if (nonBlockingTake(item)) {
+ return item;
+ }
+ sem_.wait();
+ }
+ }
+
+ bool nonBlockingTake(T& item) {
+ for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
+ if (it->readIfNotEmpty(item)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ size_t size() override {
+ size_t size = 0;
+ for (auto& q : queues_) {
+ size += q.size();
+ }
+ return size;
+ }
+
+ size_t sizeGuess() const {
+ size_t size = 0;
+ for (auto& q : queues_) {
+ size += q.sizeGuess();
+ }
+ return size;
+ }
+
+ private:
+ folly::LifoSem sem_;
+ std::vector<folly::MPMCQueue<T>> queues_;
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/LifoSem.h>
+#include <folly/Synchronized.h>
+#include <folly/executors/task_queue/BlockingQueue.h>
+#include <queue>
+
+namespace folly {
+
+// Warning: this is effectively just a std::deque wrapped in a single mutex
+// We are aiming to add a more performant concurrent unbounded queue in the
+// future, but this class is available if you must have an unbounded queue
+// and can tolerate any contention.
+template <class T>
+class UnboundedBlockingQueue : public BlockingQueue<T> {
+ public:
+ virtual ~UnboundedBlockingQueue() {}
+
+ void add(T item) override {
+ queue_.wlock()->push(std::move(item));
+ sem_.post();
+ }
+
+ T take() override {
+ while (true) {
+ {
+ auto ulockedQueue = queue_.ulock();
+ if (!ulockedQueue->empty()) {
+ auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
+ T item = std::move(wlockedQueue->front());
+ wlockedQueue->pop();
+ return item;
+ }
+ }
+ sem_.wait();
+ }
+ }
+
+ size_t size() override {
+ return queue_.rlock()->size();
+ }
+
+ private:
+ LifoSem sem_;
+ Synchronized<std::queue<T>> queue_;
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
+#include <folly/Baton.h>
+#include <gtest/gtest.h>
+#include <thread>
+
+using namespace folly;
+
+TEST(UnboundedQueuee, push_pop) {
+ UnboundedBlockingQueue<int> q;
+ q.add(42);
+ EXPECT_EQ(42, q.take());
+}
+TEST(UnboundedBlockingQueue, size) {
+ UnboundedBlockingQueue<int> q;
+ EXPECT_EQ(0, q.size());
+ q.add(42);
+ EXPECT_EQ(1, q.size());
+ q.take();
+ EXPECT_EQ(0, q.size());
+}
+
+TEST(UnboundedBlockingQueue, concurrent_push_pop) {
+ UnboundedBlockingQueue<int> q;
+ Baton<> b1, b2;
+ std::thread t([&] {
+ b1.post();
+ EXPECT_EQ(42, q.take());
+ EXPECT_EQ(0, q.size());
+ b2.post();
+ });
+ b1.wait();
+ q.add(42);
+ b2.wait();
+ EXPECT_EQ(0, q.size());
+ t.join();
+}
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/FutureExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
-#include <folly/executors/LifoSemMPMCQueue.h>
-#include <folly/executors/PriorityThreadFactory.h>
#include <folly/executors/ThreadPoolExecutor.h>
+#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
+#include <folly/executors/thread_factory/PriorityThreadFactory.h>
#include <gtest/gtest.h>
using namespace folly;
+++ /dev/null
-/*
- * Copyright 2017-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <folly/Baton.h>
-#include <folly/executors/UnboundedBlockingQueue.h>
-#include <gtest/gtest.h>
-#include <thread>
-
-using namespace folly;
-
-TEST(UnboundedQueuee, push_pop) {
- UnboundedBlockingQueue<int> q;
- q.add(42);
- EXPECT_EQ(42, q.take());
-}
-TEST(UnboundedBlockingQueue, size) {
- UnboundedBlockingQueue<int> q;
- EXPECT_EQ(0, q.size());
- q.add(42);
- EXPECT_EQ(1, q.size());
- q.take();
- EXPECT_EQ(0, q.size());
-}
-
-TEST(UnboundedBlockingQueue, concurrent_push_pop) {
- UnboundedBlockingQueue<int> q;
- Baton<> b1, b2;
- std::thread t([&] {
- b1.post();
- EXPECT_EQ(42, q.take());
- EXPECT_EQ(0, q.size());
- b2.post();
- });
- b1.wait();
- q.add(42);
- b2.wait();
- EXPECT_EQ(0, q.size());
- t.join();
-}
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <string>
+#include <thread>
+
+#include <folly/Conv.h>
+#include <folly/Range.h>
+#include <folly/ThreadName.h>
+#include <folly/executors/thread_factory/ThreadFactory.h>
+
+namespace folly {
+
+class NamedThreadFactory : public ThreadFactory {
+ public:
+ explicit NamedThreadFactory(folly::StringPiece prefix)
+ : prefix_(prefix.str()), suffix_(0) {}
+
+ std::thread newThread(Func&& func) override {
+ auto name = folly::to<std::string>(prefix_, suffix_++);
+ return std::thread(
+ [ func = std::move(func), name = std::move(name) ]() mutable {
+ folly::setThreadName(name);
+ func();
+ });
+ }
+
+ void setNamePrefix(folly::StringPiece prefix) {
+ prefix_ = prefix.str();
+ }
+
+ std::string getNamePrefix() {
+ return prefix_;
+ }
+
+ private:
+ std::string prefix_;
+ std::atomic<uint64_t> suffix_;
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/executors/thread_factory/ThreadFactory.h>
+
+#include <folly/portability/SysResource.h>
+#include <folly/portability/SysTime.h>
+
+namespace folly {
+
+/**
+ * A ThreadFactory that sets nice values for each thread. The main
+ * use case for this class is if there are multiple
+ * CPUThreadPoolExecutors in a single process, or between multiple
+ * processes, where some should have a higher priority than the others.
+ *
+ * Note that per-thread nice values are not POSIX standard, but both
+ * pthreads and linux support per-thread nice. The default linux
+ * scheduler uses these values to do smart thread prioritization.
+ * sched_priority function calls only affect real-time schedulers.
+ */
+class PriorityThreadFactory : public ThreadFactory {
+ public:
+ explicit PriorityThreadFactory(
+ std::shared_ptr<ThreadFactory> factory,
+ int priority)
+ : factory_(std::move(factory)), priority_(priority) {}
+
+ std::thread newThread(Func&& func) override {
+ int priority = priority_;
+ return factory_->newThread([ priority, func = std::move(func) ]() mutable {
+ if (setpriority(PRIO_PROCESS, 0, priority) != 0) {
+ LOG(ERROR) << "setpriority failed (are you root?) with error " << errno,
+ strerror(errno);
+ }
+ func();
+ });
+ }
+
+ private:
+ std::shared_ptr<ThreadFactory> factory_;
+ int priority_;
+};
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/Executor.h>
+
+#include <thread>
+
+namespace folly {
+
+class ThreadFactory {
+ public:
+ virtual ~ThreadFactory() = default;
+ virtual std::thread newThread(Func&& func) = 0;
+};
+
+} // namespace folly