From: Lee Howes Date: Fri, 26 Aug 2016 22:58:45 +0000 (-0700) Subject: Added fiber-compatible semaphore. X-Git-Tag: v2016.08.29.00~1 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=dcf0273a2733df4e61d92b1510a411e3418c8fad;p=folly.git Added fiber-compatible semaphore. Summary: Adds a standard semaphore type with signal and wait methods that is safe to use in both multi-threaded contexts and from fibers. Reviewed By: andriigrynenko Differential Revision: D3778943 fbshipit-source-id: 6997f1fb870739e07f982399dbebfd8b3e45daa2 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 7caea18b..67491803 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -521,6 +521,7 @@ nobase_follyinclude_HEADERS += \ fibers/LoopController.h \ fibers/Promise.h \ fibers/Promise-inl.h \ + fibers/Semaphore.h \ fibers/SimpleLoopController.h \ fibers/TimedMutex.h \ fibers/TimedMutex-inl.h \ @@ -535,6 +536,7 @@ libfolly_la_SOURCES += \ fibers/FiberManager.cpp \ fibers/FiberManagerMap.cpp \ fibers/GuardPageAllocator.cpp \ + fibers/Semaphore.cpp \ fibers/TimeoutController.cpp endif diff --git a/folly/fibers/Semaphore.cpp b/folly/fibers/Semaphore.cpp new file mode 100644 index 00000000..40efae3f --- /dev/null +++ b/folly/fibers/Semaphore.cpp @@ -0,0 +1,94 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Semaphore.h" + +namespace folly { +namespace fibers { + +bool Semaphore::signalSlow() { + // If we signalled a release, notify the waitlist + SYNCHRONIZED(waitList_) { + auto testVal = tokens_.load(std::memory_order_acquire); + if (testVal != 0) { + return false; + } + + if (waitList_.empty()) { + // If the waitlist is now empty, ensure the token count increments + // No need for CAS here as we will always be under the mutex + CHECK(tokens_.compare_exchange_strong( + testVal, testVal + 1, std::memory_order_relaxed)); + } else { + // trigger waiter if there is one + waitList_.front()->post(); + waitList_.pop(); + } + } // SYNCHRONIZED(waitList_) + return true; +} + +void Semaphore::signal() { + auto oldVal = tokens_.load(std::memory_order_acquire); + do { + if (oldVal == 0) { + if (signalSlow()) { + break; + } + } + } while (!tokens_.compare_exchange_weak( + oldVal, + oldVal + 1, + std::memory_order_release, + std::memory_order_acquire)); +} + +bool Semaphore::waitSlow() { + // Slow path, create a baton and acquire a mutex to update the wait list + folly::fibers::Baton waitBaton; + + SYNCHRONIZED(waitList_) { + auto testVal = tokens_.load(std::memory_order_acquire); + if (testVal != 0) { + return false; + } + // prepare baton and add to queue + waitList_.push(&waitBaton); + } + // If we managed to create a baton, wait on it + // This has to be done here so the mutex has been released + waitBaton.wait(); + return true; +} + +void Semaphore::wait() { + auto oldVal = tokens_.load(std::memory_order_acquire); + do { + if (oldVal == 0) { + // If waitSlow fails it is because the token is non-zero by the time + // the lock is taken, so we can just continue round the loop + if (waitSlow()) { + break; + } + } + } while (!tokens_.compare_exchange_weak( + oldVal, + oldVal - 1, + std::memory_order_release, + std::memory_order_acquire)); +} + +} // namespace fibers +} // namespace folly diff --git a/folly/fibers/Semaphore.h b/folly/fibers/Semaphore.h new file mode 100644 index 00000000..24e5707c --- /dev/null +++ b/folly/fibers/Semaphore.h @@ -0,0 +1,57 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { +namespace fibers { + +/* + * Fiber-compatible semaphore. Will safely block fibers that wait when no + * tokens are available and wake fibers when signalled. + */ +class Semaphore { + public: + explicit Semaphore(size_t tokenCount) : tokens_(tokenCount) {} + + Semaphore(const Semaphore&) = delete; + Semaphore(Semaphore&&) = delete; + Semaphore& operator=(const Semaphore&) = delete; + Semaphore& operator=(Semaphore&&) = delete; + + /* + * Release a token in the semaphore. Signal the waiter if necessary. + */ + void signal(); + + /* + * Wait for capacity in the semaphore. + */ + void wait(); + + private: + bool waitSlow(); + bool signalSlow(); + + // Atomic counter + std::atomic tokens_; + folly::Synchronized> waitList_; +}; + +} // namespace fibers +} // namespace folly diff --git a/folly/fibers/test/FibersTest.cpp b/folly/fibers/test/FibersTest.cpp index dcdd5d79..bbb5ddcf 100644 --- a/folly/fibers/test/FibersTest.cpp +++ b/folly/fibers/test/FibersTest.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -1539,3 +1540,58 @@ TEST(FiberManager, nestedFiberManagers) { outerEvb.loopForever(); } + +TEST(FiberManager, semaphore) { + constexpr size_t kTasks = 10; + constexpr size_t kIterations = 10000; + constexpr size_t kNumTokens = 10; + + Semaphore sem(kNumTokens); + int counterA = 0; + int counterB = 0; + + auto task = [&sem, kTasks, kIterations, kNumTokens]( + int& counter, folly::fibers::Baton& baton) { + FiberManager manager(folly::make_unique()); + folly::EventBase evb; + dynamic_cast(manager.loopController()) + .attachEventBase(evb); + + { + std::shared_ptr completionCounter( + &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); }); + + for (size_t i = 0; i < kTasks; ++i) { + manager.addTask([&, completionCounter]() { + for (size_t i = 0; i < kIterations; ++i) { + sem.wait(); + ++counter; + sem.signal(); + --counter; + + EXPECT_LT(counter, kNumTokens); + EXPECT_GE(counter, 0); + } + }); + } + + baton.wait(); + } + evb.loopForever(); + }; + + folly::fibers::Baton batonA; + folly::fibers::Baton batonB; + std::thread threadA([&] { task(counterA, batonA); }); + std::thread threadB([&] { task(counterB, batonB); }); + + batonA.post(); + batonB.post(); + threadA.join(); + threadB.join(); + + EXPECT_LT(counterA, kNumTokens); + EXPECT_LT(counterB, kNumTokens); + EXPECT_GE(counterA, 0); + EXPECT_GE(counterB, 0); +}