From: James Sedgwick Date: Tue, 12 May 2015 15:14:47 +0000 (-0700) Subject: SharedPromise X-Git-Tag: v0.39.0~22 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=c5dabddc2f804c33dfeb7e248d697bf69f6c90cf;p=folly.git SharedPromise Summary: I tried two "smart" ways (deriving from Promise, encapsulating a Promise) and got nothing but trouble. The KISS principle is applied with gusto in this diff. Test Plan: unit, integrating in 3+ places in separate diffs Reviewed By: hans@fb.com Subscribers: craffert, trunkagent, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2035528 Signature: t1:2035528:1431393438:4e554cd30fa531d75b9267dccaade6dc516f2b15 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index ad215492..92d56b73 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -133,6 +133,8 @@ nobase_follyinclude_HEADERS = \ futures/Promise.h \ futures/QueuedImmediateExecutor.h \ futures/ScheduledExecutor.h \ + futures/SharedPromise.h \ + futures/SharedPromise-inl.h \ futures/Timekeeper.h \ futures/Try-inl.h \ futures/Try.h \ diff --git a/folly/futures/Promise-inl.h b/folly/futures/Promise-inl.h index 4792b57d..b855d064 100644 --- a/folly/futures/Promise-inl.h +++ b/folly/futures/Promise-inl.h @@ -109,7 +109,7 @@ void Promise::setInterruptHandler( } template -void Promise::setTry(Try t) { +void Promise::setTry(Try&& t) { throwIfFulfilled(); core_->setResult(std::move(t)); } diff --git a/folly/futures/Promise.h b/folly/futures/Promise.h index 560f4dc7..0ee979ea 100644 --- a/folly/futures/Promise.h +++ b/folly/futures/Promise.h @@ -88,7 +88,7 @@ public: template void setValue(M&& value); - void setTry(Try t); + void setTry(Try&& t); /** Fulfill this Promise with the result of a function that takes no arguments and returns something implicitly convertible to T. diff --git a/folly/futures/SharedPromise-inl.h b/folly/futures/SharedPromise-inl.h new file mode 100644 index 00000000..2c09a2a0 --- /dev/null +++ b/folly/futures/SharedPromise-inl.h @@ -0,0 +1,127 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace folly { + +template +SharedPromise::SharedPromise(SharedPromise&& other) noexcept { + *this = std::move(other); +} + +template +SharedPromise& SharedPromise::operator=( + SharedPromise&& other) noexcept { + if (this == &other) { + return *this; + } + + // std::lock will perform deadlock avoidance, in case + // Thread A: p1 = std::move(p2) + // Thread B: p2 = std::move(p1) + // race each other + std::lock(mutex_, other.mutex_); + std::lock_guard g1(mutex_, std::adopt_lock); + std::lock_guard g2(other.mutex_, std::adopt_lock); + + std::swap(size_, other.size_); + std::swap(hasValue_, other.hasValue_); + std::swap(try_, other.try_); + std::swap(promises_, other.promises_); + + return *this; +} + +template +size_t SharedPromise::size() { + std::lock_guard g(mutex_); + return size_; +} + +template +Future SharedPromise::getFuture() { + std::lock_guard g(mutex_); + size_++; + if (hasValue_) { + return makeFuture(Try(try_)); + } else { + promises_.emplace_back(); + return promises_.back().getFuture(); + } +} + +template +template +typename std::enable_if::value>::type +SharedPromise::setException(E const& e) { + setTry(Try(e)); +} + +template +void SharedPromise::setException(std::exception_ptr const& ep) { + setTry(Try(ep)); +} + +template +void SharedPromise::setException(exception_wrapper ew) { + setTry(Try(std::move(ew))); +} + +template +void SharedPromise::setInterruptHandler( + std::function fn) { + std::lock_guard g(mutex_); + if (hasValue_) { + return; + } + for (auto& p : promises_) { + p.setInterruptHandler(fn); + } +} + +template +template +void SharedPromise::setValue(M&& v) { + setTry(Try(std::forward(v))); +} + +template +template +void SharedPromise::setWith(F&& func) { + setTry(makeTryFunction(std::forward(func))); +} + +template +void SharedPromise::setTry(Try&& t) { + std::vector> promises; + + { + std::lock_guard g(mutex_); + if (hasValue_) { + throw PromiseAlreadySatisfied(); + } + hasValue_ = true; + try_ = std::move(t); + promises.swap(promises_); + } + + for (auto& p : promises) { + p.setTry(Try(try_)); + } +} + +} diff --git a/folly/futures/SharedPromise.h b/folly/futures/SharedPromise.h new file mode 100644 index 00000000..5041a38a --- /dev/null +++ b/folly/futures/SharedPromise.h @@ -0,0 +1,122 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace folly { + +/* + * SharedPromise provides the same interface as Promise, but you can extract + * multiple Futures from it, i.e. you can call getFuture() as many times as + * you'd like. When the SharedPromise is fulfilled, all of the Futures will be + * called back. Calls to getFuture() after the SharedPromise is fulfilled return + * a completed Future. If you find yourself constructing collections of Promises + * and fulfilling them simultaneously with the same value, consider this + * utility instead. Likewise, if you find yourself in need of setting multiple + * callbacks on the same Future (which is indefinitely unsupported), consider + * refactoring to use SharedPromise to "split" the Future. + */ +template +class SharedPromise { +public: + SharedPromise() = default; + ~SharedPromise() = default; + + // not copyable + SharedPromise(SharedPromise const&) = delete; + SharedPromise& operator=(SharedPromise const&) = delete; + + // movable + SharedPromise(SharedPromise&&) noexcept; + SharedPromise& operator=(SharedPromise&&) noexcept; + + /** Return a Future tied to the shared core state. This can be called only + once, thereafter Future already retrieved exception will be raised. */ + Future getFuture(); + + /** Return the number of Futures associated with this SharedPromise */ + size_t size(); + + /** Fulfill the SharedPromise with an exception_wrapper */ + void setException(exception_wrapper ew); + + /** Fulfill the SharedPromise with an exception_ptr, e.g. + try { + ... + } catch (...) { + p.setException(std::current_exception()); + } + */ + void setException(std::exception_ptr const&) DEPRECATED; + + /** Fulfill the SharedPromise with an exception type E, which can be passed to + std::make_exception_ptr(). Useful for originating exceptions. If you + caught an exception the exception_wrapper form is more appropriate. + */ + template + typename std::enable_if::value>::type + setException(E const&); + + /// Set an interrupt handler to handle interrupts. See the documentation for + /// Future::raise(). Your handler can do whatever it wants, but if you + /// bother to set one then you probably will want to fulfill the SharedPromise with + /// an exception (or special value) indicating how the interrupt was + /// handled. + void setInterruptHandler(std::function); + + /// Fulfill this SharedPromise + template + typename std::enable_if::value, void>::type + setValue() { + set(Try()); + } + + /// Sugar to fulfill this SharedPromise + template + typename std::enable_if::value, void>::type + setValue() { + set(Try(T())); + } + + /** Set the value (use perfect forwarding for both move and copy) */ + template + void setValue(M&& value); + + void setTry(Try&& t); + + /** Fulfill this SharedPromise with the result of a function that takes no + arguments and returns something implicitly convertible to T. + Captures exceptions. e.g. + + p.setWith([] { do something that may throw; return a T; }); + */ + template + void setWith(F&& func); + +private: + std::mutex mutex_; + size_t size_{0}; + bool hasValue_{false}; + Try try_; + std::vector> promises_; +}; + +} + +#include +#include diff --git a/folly/futures/Try-inl.h b/folly/futures/Try-inl.h index 13e40389..19287311 100644 --- a/folly/futures/Try-inl.h +++ b/folly/futures/Try-inl.h @@ -23,7 +23,7 @@ namespace folly { template -Try::Try(Try&& t) : contains_(t.contains_) { +Try::Try(Try&& t) noexcept : contains_(t.contains_) { if (contains_ == Contains::VALUE) { new (&value_)T(std::move(t.value_)); } else if (contains_ == Contains::EXCEPTION) { @@ -32,7 +32,11 @@ Try::Try(Try&& t) : contains_(t.contains_) { } template -Try& Try::operator=(Try&& t) { +Try& Try::operator=(Try&& t) noexcept { + if (this == &t) { + return *this; + } + this->~Try(); contains_ = t.contains_; if (contains_ == Contains::VALUE) { @@ -43,6 +47,36 @@ Try& Try::operator=(Try&& t) { return *this; } +template +Try::Try(const Try& t) { + static_assert( + std::is_copy_constructible::value, + "T must be copyable for Try to be copyable"); + contains_ = t.contains_; + if (contains_ == Contains::VALUE) { + new (&value_)T(t.value_); + } else if (contains_ == Contains::EXCEPTION) { + new (&e_)std::unique_ptr(); + e_ = folly::make_unique(*(t.e_)); + } +} + +template +Try& Try::operator=(const Try& t) { + static_assert( + std::is_copy_constructible::value, + "T must be copyable for Try to be copyable"); + this->~Try(); + contains_ = t.contains_; + if (contains_ == Contains::VALUE) { + new (&value_)T(t.value_); + } else if (contains_ == Contains::EXCEPTION) { + new (&e_)std::unique_ptr(); + e_ = folly::make_unique(*(t.e_)); + } + return *this; +} + template Try::~Try() { if (contains_ == Contains::VALUE) { diff --git a/folly/futures/Try.h b/folly/futures/Try.h index 1fd13c02..9e791dc3 100644 --- a/folly/futures/Try.h +++ b/folly/futures/Try.h @@ -101,14 +101,14 @@ class Try { } // Move constructor - Try(Try&& t); + Try(Try&& t) noexcept; // Move assigner - Try& operator=(Try&& t); + Try& operator=(Try&& t) noexcept; - // Non-copyable - Try(const Try& t) = delete; - // Non-copyable - Try& operator=(const Try& t) = delete; + // Copy constructor + Try(const Try& t); + // Copy assigner + Try& operator=(const Try& t); ~Try(); diff --git a/folly/futures/test/SharedPromiseTest.cpp b/folly/futures/test/SharedPromiseTest.cpp new file mode 100644 index 00000000..280bb64c --- /dev/null +++ b/folly/futures/test/SharedPromiseTest.cpp @@ -0,0 +1,100 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +using namespace folly; + +TEST(SharedPromise, SetGet) { + SharedPromise p; + p.setValue(1); + auto f1 = p.getFuture(); + auto f2 = p.getFuture(); + EXPECT_EQ(1, f1.value()); + EXPECT_EQ(1, f2.value()); +} +TEST(SharedPromise, GetSet) { + SharedPromise p; + auto f1 = p.getFuture(); + auto f2 = p.getFuture(); + p.setValue(1); + EXPECT_EQ(1, f1.value()); + EXPECT_EQ(1, f2.value()); +} + +TEST(SharedPromise, GetSetGet) { + SharedPromise p; + auto f1 = p.getFuture(); + p.setValue(1); + auto f2 = p.getFuture(); + EXPECT_EQ(1, f1.value()); + EXPECT_EQ(1, f2.value()); +} + +TEST(SharedPromise, Reset) { + SharedPromise p; + + auto f1 = p.getFuture(); + p.setValue(1); + EXPECT_EQ(1, f1.value()); + + p = SharedPromise(); + auto f2 = p.getFuture(); + EXPECT_FALSE(f2.isReady()); + p.setValue(2); + EXPECT_EQ(2, f2.value()); +} + +TEST(SharedPromise, GetMoveSet) { + SharedPromise p; + auto f = p.getFuture(); + auto p2 = std::move(p); + p2.setValue(1); + EXPECT_EQ(1, f.value()); +} + +TEST(SharedPromise, SetMoveGet) { + SharedPromise p; + p.setValue(1); + auto p2 = std::move(p); + auto f = p2.getFuture(); + EXPECT_EQ(1, f.value()); +} + +TEST(SharedPromise, MoveSetGet) { + SharedPromise p; + auto p2 = std::move(p); + p2.setValue(1); + auto f = p2.getFuture(); + EXPECT_EQ(1, f.value()); +} + +TEST(SharedPromise, MoveGetSet) { + SharedPromise p; + auto p2 = std::move(p); + auto f = p2.getFuture(); + p2.setValue(1); + EXPECT_EQ(1, f.value()); +} + +TEST(SharedPromise, MoveMove) { + SharedPromise> p; + auto f1 = p.getFuture(); + auto f2 = p.getFuture(); + auto p2 = std::move(p); + p = std::move(p2); + p.setValue(std::make_shared(1)); +} diff --git a/folly/futures/test/Try.cpp b/folly/futures/test/Try.cpp index 7782c531..69a3e0da 100644 --- a/folly/futures/test/Try.cpp +++ b/folly/futures/test/Try.cpp @@ -21,6 +21,19 @@ using namespace folly; +// Make sure we can copy Trys for copyable types +TEST(Try, copy) { + Try t; + auto t2 = t; +} + +// But don't choke on move-only types +TEST(Try, moveOnly) { + Try> t; + std::vector>> v; + v.reserve(10); +} + TEST(Try, makeTryFunction) { auto func = []() { return folly::make_unique(1); diff --git a/folly/wangle/channel/OutputBufferingHandler.h b/folly/wangle/channel/OutputBufferingHandler.h index 48d8aa51..31abbdb0 100644 --- a/folly/wangle/channel/OutputBufferingHandler.h +++ b/folly/wangle/channel/OutputBufferingHandler.h @@ -60,7 +60,7 @@ class OutputBufferingHandler : public OutboundBytesToBytesHandler, getContext()->fireWrite(std::move(sends_)) .then([promises](Try t) mutable { for (auto& p : *promises) { - p.setTry(t); + p.setTry(Try(t)); } }); }