fibers/LoopController.h \
fibers/Promise.h \
fibers/Promise-inl.h \
+ fibers/Semaphore.h \
fibers/SimpleLoopController.h \
fibers/TimedMutex.h \
fibers/TimedMutex-inl.h \
fibers/FiberManager.cpp \
fibers/FiberManagerMap.cpp \
fibers/GuardPageAllocator.cpp \
+ fibers/Semaphore.cpp \
fibers/TimeoutController.cpp
endif
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Semaphore.h"
+
+namespace folly {
+namespace fibers {
+
+bool Semaphore::signalSlow() {
+ // If we signalled a release, notify the waitlist
+ SYNCHRONIZED(waitList_) {
+ auto testVal = tokens_.load(std::memory_order_acquire);
+ if (testVal != 0) {
+ return false;
+ }
+
+ if (waitList_.empty()) {
+ // If the waitlist is now empty, ensure the token count increments
+ // No need for CAS here as we will always be under the mutex
+ CHECK(tokens_.compare_exchange_strong(
+ testVal, testVal + 1, std::memory_order_relaxed));
+ } else {
+ // trigger waiter if there is one
+ waitList_.front()->post();
+ waitList_.pop();
+ }
+ } // SYNCHRONIZED(waitList_)
+ return true;
+}
+
+void Semaphore::signal() {
+ auto oldVal = tokens_.load(std::memory_order_acquire);
+ do {
+ if (oldVal == 0) {
+ if (signalSlow()) {
+ break;
+ }
+ }
+ } while (!tokens_.compare_exchange_weak(
+ oldVal,
+ oldVal + 1,
+ std::memory_order_release,
+ std::memory_order_acquire));
+}
+
+bool Semaphore::waitSlow() {
+ // Slow path, create a baton and acquire a mutex to update the wait list
+ folly::fibers::Baton waitBaton;
+
+ SYNCHRONIZED(waitList_) {
+ auto testVal = tokens_.load(std::memory_order_acquire);
+ if (testVal != 0) {
+ return false;
+ }
+ // prepare baton and add to queue
+ waitList_.push(&waitBaton);
+ }
+ // If we managed to create a baton, wait on it
+ // This has to be done here so the mutex has been released
+ waitBaton.wait();
+ return true;
+}
+
+void Semaphore::wait() {
+ auto oldVal = tokens_.load(std::memory_order_acquire);
+ do {
+ if (oldVal == 0) {
+ // If waitSlow fails it is because the token is non-zero by the time
+ // the lock is taken, so we can just continue round the loop
+ if (waitSlow()) {
+ break;
+ }
+ }
+ } while (!tokens_.compare_exchange_weak(
+ oldVal,
+ oldVal - 1,
+ std::memory_order_release,
+ std::memory_order_acquire));
+}
+
+} // namespace fibers
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Synchronized.h>
+#include <folly/fibers/Baton.h>
+
+namespace folly {
+namespace fibers {
+
+/*
+ * Fiber-compatible semaphore. Will safely block fibers that wait when no
+ * tokens are available and wake fibers when signalled.
+ */
+class Semaphore {
+ public:
+ explicit Semaphore(size_t tokenCount) : tokens_(tokenCount) {}
+
+ Semaphore(const Semaphore&) = delete;
+ Semaphore(Semaphore&&) = delete;
+ Semaphore& operator=(const Semaphore&) = delete;
+ Semaphore& operator=(Semaphore&&) = delete;
+
+ /*
+ * Release a token in the semaphore. Signal the waiter if necessary.
+ */
+ void signal();
+
+ /*
+ * Wait for capacity in the semaphore.
+ */
+ void wait();
+
+ private:
+ bool waitSlow();
+ bool signalSlow();
+
+ // Atomic counter
+ std::atomic<int64_t> tokens_;
+ folly::Synchronized<std::queue<folly::fibers::Baton*>> waitList_;
+};
+
+} // namespace fibers
+} // namespace folly
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h>
#include <folly/fibers/GenericBaton.h>
+#include <folly/fibers/Semaphore.h>
#include <folly/fibers/SimpleLoopController.h>
#include <folly/fibers/WhenN.h>
outerEvb.loopForever();
}
+
+TEST(FiberManager, semaphore) {
+ constexpr size_t kTasks = 10;
+ constexpr size_t kIterations = 10000;
+ constexpr size_t kNumTokens = 10;
+
+ Semaphore sem(kNumTokens);
+ int counterA = 0;
+ int counterB = 0;
+
+ auto task = [&sem, kTasks, kIterations, kNumTokens](
+ int& counter, folly::fibers::Baton& baton) {
+ FiberManager manager(folly::make_unique<EventBaseLoopController>());
+ folly::EventBase evb;
+ dynamic_cast<EventBaseLoopController&>(manager.loopController())
+ .attachEventBase(evb);
+
+ {
+ std::shared_ptr<folly::EventBase> completionCounter(
+ &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
+
+ for (size_t i = 0; i < kTasks; ++i) {
+ manager.addTask([&, completionCounter]() {
+ for (size_t i = 0; i < kIterations; ++i) {
+ sem.wait();
+ ++counter;
+ sem.signal();
+ --counter;
+
+ EXPECT_LT(counter, kNumTokens);
+ EXPECT_GE(counter, 0);
+ }
+ });
+ }
+
+ baton.wait();
+ }
+ evb.loopForever();
+ };
+
+ folly::fibers::Baton batonA;
+ folly::fibers::Baton batonB;
+ std::thread threadA([&] { task(counterA, batonA); });
+ std::thread threadB([&] { task(counterB, batonB); });
+
+ batonA.post();
+ batonB.post();
+ threadA.join();
+ threadB.join();
+
+ EXPECT_LT(counterA, kNumTokens);
+ EXPECT_LT(counterB, kNumTokens);
+ EXPECT_GE(counterA, 0);
+ EXPECT_GE(counterB, 0);
+}