From: Dave Watson Date: Thu, 28 Dec 2017 15:46:10 +0000 (-0800) Subject: synchronization/ParkingLot X-Git-Tag: v2018.01.01.00~7 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;ds=sidebyside;h=84ad2a4d4b52b0207b35f35c54702cb87189fe4d;p=folly.git synchronization/ParkingLot Summary: A ParkingLot API inspired by linux futex syscall, and WebKit's parkingLot. Extends the futex interface with lambdas, such that many different sleeping abstractions can be built. Reviewed By: yfeldblum, aary Differential Revision: D6581826 fbshipit-source-id: dba741fe4ed34f27bfad5f5747adce85741441e0 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 9bc83a13..76a95848 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -437,6 +437,7 @@ nobase_follyinclude_HEADERS = \ synchronization/Baton.h \ synchronization/CallOnce.h \ synchronization/LifoSem.h \ + synchronization/ParkingLot.h \ synchronization/detail/AtomicUtils.h \ synchronization/detail/Sleeper.h \ system/MemoryMapping.h \ @@ -625,6 +626,7 @@ libfolly_la_SOURCES = \ stats/TimeseriesHistogram.cpp \ synchronization/AsymmetricMemoryBarrier.cpp \ synchronization/LifoSem.cpp \ + synchronization/ParkingLot.cpp \ system/MemoryMapping.cpp \ system/Shell.cpp \ system/ThreadName.cpp \ diff --git a/folly/synchronization/ParkingLot.cpp b/folly/synchronization/ParkingLot.cpp new file mode 100644 index 00000000..e3594cca --- /dev/null +++ b/folly/synchronization/ParkingLot.cpp @@ -0,0 +1,34 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace folly { +namespace parking_lot_detail { + +Bucket& Bucket::bucketFor(uint64_t key) { + constexpr size_t const kNumBuckets = kIsMobile ? 256 : 4096; + + // Fixed-size static table (256 buckets if kIsMobile, else 4096), usable in + // allocation-sensitive contexts. This relies on std::mutex not allocating + // dynamically, which we assume to be the case on Linux and iOS. + static Indestructible> gBuckets; + return (*gBuckets)[key % kNumBuckets]; +} + +std::atomic idallocator{0}; + +} // namespace parking_lot_detail +} // namespace folly diff --git a/folly/synchronization/ParkingLot.h b/folly/synchronization/ParkingLot.h new file mode 100644 index 00000000..4ee87f7a --- /dev/null +++ b/folly/synchronization/ParkingLot.h @@ -0,0 +1,278 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +namespace folly { + +namespace parking_lot_detail { + +struct WaitNodeBase : public boost::intrusive::list_base_hook<> { + const uint64_t key_; + const uint64_t lotid_; + + // signaled_: writers hold both the bucket mutex and mutex_; readers need either + bool signaled_; + std::mutex mutex_; + std::condition_variable cond_; + + WaitNodeBase(uint64_t key, uint64_t lotid) + : key_(key), lotid_(lotid), signaled_(false) {} + + template + std::cv_status wait(std::chrono::time_point deadline) { + std::cv_status status = std::cv_status::no_timeout; + std::unique_lock nodeLock(mutex_); + while (!signaled_ && status != std::cv_status::timeout) { + if (deadline != std::chrono::time_point::max()) { + status = cond_.wait_until(nodeLock, deadline); + } else { + cond_.wait(nodeLock); + } + } + return status; + } + + void wake() { + std::unique_lock nodeLock(mutex_); + signaled_ = true; + cond_.notify_one(); + } + + bool signaled() { + return signaled_; + } +}; + +extern std::atomic idallocator; + +// Our emulated futex uses a fixed table of wait-node lists (4096 entries, or +// 256 when kIsMobile; see Bucket::bucketFor). There are two levels of locking: +// a per-list mutex that controls access to the list, and a per-node mutex, +// condvar, and bool that are used for the actual wakeups. The per-node mutex +// allows us to do precise wakeups without thundering herds. +struct Bucket { + std::mutex mutex_; + boost::intrusive::list waiters_; + + static Bucket& bucketFor(uint64_t key); +}; + +} // namespace parking_lot_detail + +enum class UnparkControl { + RetainContinue, + RemoveContinue, + RetainBreak, + RemoveBreak, +}; + +enum class ParkResult { + Skip, + Unpark, + Timeout, +}; + +/* + * ParkingLot provides an interface that is similar to Linux's futex + * system call, but with additional functionality. It is implemented + * in a portable way on top of std::mutex and std::condition_variable. 
+ * + * Additional reading: + * https://webkit.org/blog/6161/locking-in-webkit/ + * https://github.com/WebKit/webkit/blob/master/Source/WTF/wtf/ParkingLot.h + * https://locklessinc.com/articles/futex_cheat_sheet/ + * + * The main difference from futex is that park/unpark take lambdas, + * such that nearly anything can be done while holding the bucket + * lock. The unpark() lambda can also be used to wake up any number + * of waiters. + * + * ParkingLot is templated on the data type; however, all ParkingLot + * instantiations are backed by a single static array of buckets to + * avoid large memory overhead. Lambdas will only ever be called on + * the nodes of the specific ParkingLot (matched by key and lot id). + */ +template +class ParkingLot { + const uint64_t lotid_; + ParkingLot(const ParkingLot&) = delete; + + struct WaitNode : public parking_lot_detail::WaitNodeBase { + const Data data_; + + template + WaitNode(uint64_t key, uint64_t lotid, D&& data) + : WaitNodeBase(key, lotid), data_(std::forward(data)) {} + }; + + public: + ParkingLot() : lotid_(parking_lot_detail::idallocator++) {} + + /* Park API + * + * Key is almost always the address of a variable. + * + * ToPark runs while holding the bucket lock and decides whether to + * sleep (e.g. by checking waiter bits); if false, park returns Skip. + * + * PreWait runs after the bucket lock is released, just before blocking. + * It is usually used to implement condition variable like things, such + * that you can unlock the associated lock at the appropriate time. 
+ */ + template + ParkResult park(const Key key, D&& data, ToPark&& toPark, PreWait&& preWait) { + return park_until( + key, + std::forward(data), + std::forward(toPark), + std::forward(preWait), + std::chrono::steady_clock::time_point::max()); // max() deadline: never times out + } + + template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Clock, + typename Duration> + ParkResult park_until( + const Key key, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::time_point deadline); + + template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Rep, + typename Period> + ParkResult park_for( + const Key key, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + const std::chrono::duration& timeout) { // const&: temporaries bind too + return park_until( + key, + std::forward(data), + std::forward(toPark), + std::forward(preWait), + timeout + std::chrono::steady_clock::now()); // relative timeout as a steady_clock deadline + } + + /* + * Unpark API + * + * Key is the same unique address used in park(), and is used as a + * hash key for lookup of waiters. + * + * Unparker is a function that is given the Data parameter, and + * returns an UnparkControl. The Remove* results will remove and + * wake the waiter, the Retain* results will not; the *Break results + * stop iteration of the waiter list and the *Continue results go on. 
+ */ + template + void unpark(const Key key, Unparker&& func); +}; + +template +template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Clock, + typename Duration> +ParkResult ParkingLot::park_until( + const Key bits, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::time_point deadline) { + auto key = hash::twang_mix64(uint64_t(bits)); + auto& bucket = parking_lot_detail::Bucket::bucketFor(key); + WaitNode node(key, lotid_, std::forward(data)); + + { + std::unique_lock bucketLock(bucket.mutex_); + + if (!std::forward(toPark)()) { + return ParkResult::Skip; + } + + bucket.waiters_.push_back(node); + } // bucketLock scope + + std::forward(preWait)(); + + auto status = node.wait(deadline); + + if (status == std::cv_status::timeout) { + // Only a real timeout if we are still unsignaled once we hold the bucket lock + std::unique_lock bucketLock(bucket.mutex_); + if (!node.signaled()) { + bucket.waiters_.erase(bucket.waiters_.iterator_to(node)); + return ParkResult::Timeout; + } + } + + return ParkResult::Unpark; +} + +template +template +void ParkingLot::unpark(const Key bits, Func&& func) { + auto key = hash::twang_mix64(uint64_t(bits)); + auto& bucket = parking_lot_detail::Bucket::bucketFor(key); + std::unique_lock bucketLock(bucket.mutex_); + + for (auto iter = bucket.waiters_.begin(); iter != bucket.waiters_.end();) { + auto current = iter; + auto& node = *static_cast(&*iter++); + if (node.key_ == key && node.lotid_ == lotid_) { + auto result = std::forward(func)(node.data_); + if (result == UnparkControl::RemoveBreak || + result == UnparkControl::RemoveContinue) { + // Unlink here; the node is a local in park_until() and is destroyed there + bucket.waiters_.erase(current); + + node.wake(); + } + if (result == UnparkControl::RemoveBreak || + result == UnparkControl::RetainBreak) { + return; + } + } + } +} + +} // namespace folly diff --git a/folly/synchronization/test/ParkingLotTest.cpp b/folly/synchronization/test/ParkingLotTest.cpp new file mode 100644 
index 00000000..d31a4992 --- /dev/null +++ b/folly/synchronization/test/ParkingLotTest.cpp @@ -0,0 +1,120 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include +#include + +using namespace folly; + +TEST(ParkingLot, multilot) { + using SmallLot = ParkingLot; + using LargeLot = ParkingLot; + SmallLot smalllot; + LargeLot largelot; + folly::Baton<> sb; + folly::Baton<> lb; + + std::thread small([&]() { + smalllot.park(0, false, [] { return true; }, [&]() { sb.post(); }); + }); + std::thread large([&]() { + largelot.park(0, true, [] { return true; }, [&]() { lb.post(); }); + }); + sb.wait(); + lb.wait(); + + int count = 0; + smalllot.unpark(0, [&](bool data) { + count++; + EXPECT_EQ(data, false); + return UnparkControl::RemoveContinue; + }); + EXPECT_EQ(count, 1); + count = 0; + largelot.unpark(0, [&](bool data) { + count++; + EXPECT_EQ(data, true); + return UnparkControl::RemoveContinue; + }); + EXPECT_EQ(count, 1); + + small.join(); + large.join(); +} + +// This is not possible to implement with Futex, because futex +// and the native Linux syscall are 32-bit only. +TEST(ParkingLot, LargeWord) { + ParkingLot lot; + std::atomic w{0}; + + lot.park(0, false, [&]() { return w == 1; }, []() {}); + + // toPark returns false here (w != 1), so park() must skip; it hangs otherwise. 
+} + +class WaitableMutex : public std::mutex { // test-only mutex: unlock() hands off to a parked waiter whose predicate holds + using Lot = ParkingLot>; + static Lot lot; + + public: + void unlock() { + bool unparked = false; + lot.unpark(uint64_t(this), [&](std::function wfunc) { + if (wfunc()) { + unparked = true; + return UnparkControl::RemoveBreak; + } else { + return UnparkControl::RetainContinue; // predicate still false: leave it parked, it does not own the mutex + } + }); + if (!unparked) { + std::mutex::unlock(); + } + // Otherwise the still-locked mutex is handed directly to the woken waiter. + } + + template + void wait(Wait wfunc) { + lot.park( + uint64_t(this), + wfunc, + [&]() { return !wfunc(); }, + [&]() { std::mutex::unlock(); }); + } +}; + +WaitableMutex::Lot WaitableMutex::lot; + +TEST(ParkingLot, WaitableMutexTest) { + std::atomic go{false}; + WaitableMutex mu; + std::thread t([&]() { + std::lock_guard g(mu); + mu.wait([&]() { return go == true; }); + }); + sleep(1); // presumably gives t time to lock mu and park first + + { + std::lock_guard g(mu); + go = true; + } + t.join(); +}