From: Dave Watson Date: Fri, 21 Feb 2014 20:59:25 +0000 (-0800) Subject: Move wangle to folly X-Git-Tag: v0.22.0~680 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=d0cb48287178710c04d0fbfa4f6f904084839280;p=folly.git Move wangle to folly Summary: * git mv * codemod facebook::wangle folly::wangle * Change 'runnable' to be a base class in wangle instead of thrift Justification: * std::future doesn't have then, whenall, etc. * boost::future doesn't support executors @override-unit-failures Test Plan: contbuild and pray Reviewed By: hans@fb.com FB internal diff: D1185194 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index e5fe6355..c89c16b1 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -125,7 +125,22 @@ nobase_follyinclude_HEADERS = \ Unicode.h \ Uri.h \ Uri-inl.h \ - Varint.h + Varint.h \ + wangle/Executor.h \ + wangle/Future-inl.h \ + wangle/Future.h \ + wangle/GenericThreadGate.h \ + wangle/InlineExecutor.h \ + wangle/Later-inl.h \ + wangle/Later.h \ + wangle/ManualExecutor.h \ + wangle/Promise-inl.h \ + wangle/Promise.h \ + wangle/ThreadGate.h \ + wangle/Try-inl.h \ + wangle/Try.h \ + wangle/WangleException.h \ + wangle/detail.h FormatTables.cpp: build/generate_format_tables.py build/generate_format_tables.py @@ -168,7 +183,10 @@ libfolly_la_SOURCES = \ ThreadCachedArena.cpp \ TimeoutQueue.cpp \ Unicode.cpp \ - Uri.cpp + Uri.cpp \ + wangle/InlineExecutor.cpp \ + wangle/ManualExecutor.cpp \ + wangle/ThreadGate.cpp if !HAVE_LINUX nobase_follyinclude_HEADERS += detail/Clock.h @@ -196,4 +214,3 @@ libfollybenchmark_la_LIBADD = libfolly.la libfollytimeout_queue_la_SOURCES = TimeoutQueue.cpp libfollytimeout_queue_la_LIBADD = libfolly.la - diff --git a/folly/wangle/Executor.h b/folly/wangle/Executor.h new file mode 100644 index 00000000..29037b47 --- /dev/null +++ b/folly/wangle/Executor.h @@ -0,0 +1,28 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace folly { namespace wangle { + class Executor : boost::noncopyable { + public: + virtual ~Executor() = default; + virtual void add(std::function&&) = 0; + }; +}} diff --git a/folly/wangle/Future-inl.h b/folly/wangle/Future-inl.h new file mode 100644 index 00000000..ce9cb7fc --- /dev/null +++ b/folly/wangle/Future-inl.h @@ -0,0 +1,402 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "detail.h" + +namespace folly { namespace wangle { + +template +struct isFuture { + static const bool value = false; +}; + +template +struct isFuture > { + static const bool value = true; +}; + +template +Future::Future(Future&& other) : obj_(other.obj_) { + other.obj_ = nullptr; +} + +template +Future& Future::operator=(Future&& other) { + std::swap(obj_, other.obj_); + return *this; +} + +template +Future::~Future() { + if (obj_) { + if (obj_->ready()) { + delete obj_; + } else { + setContinuation([](Try&&) {}); // detach + } + } +} + +template +void Future::throwIfInvalid() const { + if (!obj_) + throw NoState(); +} + +template +template +void Future::setContinuation(F&& func) { + throwIfInvalid(); + obj_->setContinuation(std::move(func)); + obj_ = nullptr; +} + +template +template +typename std::enable_if< + !isFuture&&)>::type>::value, + Future&&)>::type> >::type +Future::then(F&& func) { + typedef typename std::result_of&&)>::type B; + + throwIfInvalid(); + + // wrap these so we can move them into the lambda + folly::MoveWrapper> p; + folly::MoveWrapper funcm(std::forward(func)); + + // grab the Future now before we lose our handle on the Promise + auto f = p->getFuture(); + + /* This is a bit tricky. + + We can't just close over *this in case this Future gets moved. So we + make a new dummy Future. We could figure out something more + sophisticated that avoids making a new Future object when it can, as an + optimization. But this is correct. + + obj_ can't be moved, it is explicitly disallowed (as is copying). But + if there's ever a reason to allow it, this is one place that makes that + assumption and would need to be fixed. We use a standard shared pointer + for obj_ (by copying it in), which means in essence obj holds a shared + pointer to itself. But this shouldn't leak because Promise will not + outlive the continuation, because Promise will setException() with a + broken Promise if it is destructed before completed. We could use a + weak pointer but it would have to be converted to a shared pointer when + func is executed (because the Future returned by func may possibly + persist beyond the callback, if it gets moved), and so it is an + optimization to just make it shared from the get-go. + + We have to move in the Promise and func using the MoveWrapper + hack. (func could be copied but it's a big drag on perf). + + Two subtle but important points about this design. FutureObject has no + back pointers to Future or Promise, so if Future or Promise get moved + (and they will be moved in performant code) we don't have to do + anything fancy. And because we store the continuation in the + FutureObject, not in the Future, we can execute the continuation even + after the Future has gone out of scope. This is an intentional design + decision. It is likely we will want to be able to cancel a continuation + in some circumstances, but I think it should be explicit not implicit + in the destruction of the Future used to create it. + */ + setContinuation( + [p, funcm](Try&& t) mutable { + p->fulfil([&]() { + return (*funcm)(std::move(t)); + }); + }); + + return std::move(f); +} + +template +template +typename std::enable_if< + isFuture&&)>::type>::value, + Future&&)>::type::value_type> >::type +Future::then(F&& func) { + typedef typename std::result_of&&)>::type::value_type B; + + throwIfInvalid(); + + // wrap these so we can move them into the lambda + folly::MoveWrapper> p; + folly::MoveWrapper funcm(std::forward(func)); + + // grab the Future now before we lose our handle on the Promise + auto f = p->getFuture(); + + setContinuation( + [p, funcm](Try&& t) mutable { + try { + auto f2 = (*funcm)(std::move(t)); + // that didn't throw, now we can steal p + f2.setContinuation([p](Try&& b) mutable { + p->fulfilTry(std::move(b)); + }); + } catch (...) { + p->setException(std::current_exception()); + } + }); + + return std::move(f); +} + +template +Future Future::then() { + return then([] (Try&& t) {}); +} + +template +typename std::add_lvalue_reference::type Future::value() { + throwIfInvalid(); + + return obj_->value(); +} + +template +typename std::add_lvalue_reference::type Future::value() const { + throwIfInvalid(); + + return obj_->value(); +} + +template +Try& Future::valueTry() { + throwIfInvalid(); + + return obj_->valueTry(); +} + +template +template +inline Future Future::executeWithSameThread(Executor* executor) { + throwIfInvalid(); + + folly::MoveWrapper> p; + auto f = p->getFuture(); + + setContinuation([executor, p](Try&& t) mutable { + folly::MoveWrapper> tt(std::move(t)); + executor->add([p, tt]() mutable { + p->fulfilTry(std::move(*tt)); + }); + }); + + return f; +} + +template +template +inline void Future::executeWith( + Executor* executor, Promise&& cont_promise) { + throwIfInvalid(); + + folly::MoveWrapper> p(std::move(cont_promise)); + + setContinuation([executor, p](Try&& t) mutable { + folly::MoveWrapper> tt(std::move(t)); + executor->add([p, tt]() mutable { + p->fulfilTry(std::move(*tt)); + }); + }); +} + +template +bool Future::isReady() const { + throwIfInvalid(); + return obj_->ready(); +} + +// makeFuture + +template +Future::type> makeFuture(T&& t) { + Promise::type> p; + auto f = p.getFuture(); + p.setValue(std::forward(t)); + return std::move(f); +} + +inline // for multiple translation units +Future makeFuture() { + Promise p; + auto f = p.getFuture(); + p.setValue(); + return std::move(f); +} + +template +auto makeFutureTry( + F&& func, + typename std::enable_if::value, bool>::type sdf) + -> Future { + Promise p; + auto f = p.getFuture(); + p.fulfil( + [&func]() { + return (func)(); + }); + return std::move(f); +} + +template +auto makeFutureTry(F const& func) -> Future { + F copy = func; + return makeFutureTry(std::move(copy)); +} + +template +Future makeFuture(std::exception_ptr const& e) { + Promise p; + auto f = p.getFuture(); + p.setException(e); + return std::move(f); +} + +template +typename std::enable_if::value, Future>::type +makeFuture(E const& e) { + Promise p; + auto f = p.getFuture(); + p.fulfil([&]() -> T { throw e; }); + return std::move(f); +} + +// when (variadic) + +template +typename detail::VariadicContext::type +whenAll(Fs&... fs) +{ + auto ctx = new detail::VariadicContext(); + ctx->total = sizeof...(fs); + auto f_saved = ctx->p.getFuture(); + detail::whenAllVariadicHelper(ctx, fs...); + return std::move(f_saved); +} + +// when (iterator) + +template +Future< + std::vector< + Try::value_type::value_type>>> +whenAll(InputIterator first, InputIterator last) +{ + typedef + typename std::iterator_traits::value_type::value_type T; + + auto n = std::distance(first, last); + if (n == 0) + return makeFuture>>({}); + + auto ctx = new detail::WhenAllContext(); + + ctx->total = n; + ctx->results.resize(ctx->total); + + auto f_saved = ctx->p.getFuture(); + + for (size_t i = 0; first != last; ++first, ++i) { + auto& f = *first; + f.setContinuation([ctx, i](Try&& t) { + ctx->results[i] = std::move(t); + if (++ctx->count == ctx->total) { + ctx->p.setValue(std::move(ctx->results)); + delete ctx; + } + }); + } + + return std::move(f_saved); +} + +template +Future< + std::pair::value_type::value_type> > > +whenAny(InputIterator first, InputIterator last) { + typedef + typename std::iterator_traits::value_type::value_type T; + + auto ctx = new detail::WhenAnyContext(std::distance(first, last)); + auto f_saved = ctx->p.getFuture(); + + for (size_t i = 0; first != last; first++, i++) { + auto& f = *first; + f.setContinuation([i, ctx](Try&& t) { + if (!ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t))); + } + ctx->decref(); + }); + } + + return std::move(f_saved); +} + +template +Future::value_type::value_type>>>> +whenN(InputIterator first, InputIterator last, size_t n) { + typedef typename + std::iterator_traits::value_type::value_type T; + typedef std::vector>> V; + + struct ctx_t { + V v; + size_t completed; + Promise p; + }; + auto ctx = std::make_shared(); + ctx->completed = 0; + + // for each completed Future, increase count and add to vector, until we + // have n completed futures at which point we fulfil our Promise with the + // vector + auto it = first; + size_t i = 0; + while (it != last) { + it->then([ctx, n, i](Try&& t) { + auto& v = ctx->v; + auto c = ++ctx->completed; + if (c <= n) { + assert(ctx->v.size() < n); + v.push_back(std::make_pair(i, std::move(t))); + if (c == n) { + ctx->p.fulfilTry(Try(std::move(v))); + } + } + }); + + it++; + i++; + } + + if (i < n) { + ctx->p.setException(std::runtime_error("Not enough futures")); + } + + return ctx->p.getFuture(); +} + +}} diff --git a/folly/wangle/Future.h b/folly/wangle/Future.h new file mode 100644 index 00000000..48e8822e --- /dev/null +++ b/folly/wangle/Future.h @@ -0,0 +1,248 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "folly/MoveWrapper.h" +#include "Promise.h" +#include "Try.h" + +namespace folly { namespace wangle { + +template struct isFuture; + +template +class Future { + public: + typedef T value_type; + + // not copyable + Future(Future const&) = delete; + Future& operator=(Future const&) = delete; + + // movable + Future(Future&&); + Future& operator=(Future&&); + + ~Future(); + + /** Return the reference to result. Should not be called if !isReady(). + Will rethrow the exception if an exception has been + captured. + + This function is not thread safe - the returned Future can only + be executed from the thread that the executor runs it in. + See below for a thread safe version + */ + typename std::add_lvalue_reference::type + value(); + typename std::add_lvalue_reference::type + value() const; + + template + Future executeWithSameThread(Executor* executor); + + /** + Thread-safe version of executeWith + + Since an executor would likely start executing the Future chain + right away, it would be a race condition to call: + Future.executeWith(...).then(...), as there would be race + condition between the then and the running Future. + Instead, you may pass in a Promise so that we can set up + the rest of the chain in advance, without any racey + modifications of the continuation + */ + template + void executeWith(Executor* executor, Promise&& cont_promise); + + /** True when the result (or exception) is ready. value() will not block + when this returns true. */ + bool isReady() const; + + /** Wait until the result (or exception) is ready. Once this returns, + value() will not block, and isReady() will return true. + + XXX This implementation is simplistic and inefficient, but it does work + and a fully intelligent implementation is coming down the pipe. + */ + void wait() const { + while (!isReady()) { + // spin + std::this_thread::yield(); + } + } + + Try& valueTry(); + + /** When this Future has completed, execute func which is a function that + takes a Try&&. A Future for the return type of func is + returned. e.g. + + Future f2 = f1.then([](Try&&) { return string("foo"); }); + + The functor given may call value() without blocking, which may rethrow if + this has captured an exception. If func throws, the exception will be + captured in the Future that is returned. + */ + /* n3428 has then(scheduler&, F&&), we might want to reorganize to use + similar API. or maybe not */ + template + typename std::enable_if< + !isFuture&&)>::type>::value, + Future&&)>::type> >::type + then(F&& func); + + template + typename std::enable_if< + isFuture&&)>::type>::value, + Future&&)>::type::value_type> >::type + then(F&& func); + + /** Use this method on the Future when we don't really care about the + returned value and want to convert the Future to a Future + Convenience function + */ + Future then(); + + template + void setContinuation(F&& func); + + private: + /* Eventually this may not be a shared_ptr, but something similar without + expensive thread-safety. */ + typedef detail::FutureObject* objPtr; + + // shared state object + objPtr obj_; + + explicit + Future(objPtr obj) : obj_(obj) {} + + void throwIfInvalid() const; + + friend class Promise; +}; + +/** Make a completed Future by moving in a value. e.g. + auto f = makeFuture(string("foo")); +*/ +template +Future::type> makeFuture(T&& t); + +/** Make a completed void Future. */ +Future makeFuture(); + +/** Make a completed Future by executing a function. If the function throws + we capture the exception, otherwise we capture the result. */ +template +auto makeFutureTry( + F&& func, + typename std::enable_if< + !std::is_reference::value, bool>::type sdf = false) + -> Future; + +template +auto makeFutureTry( + F const& func) + -> Future; + +/** Make a completed (error) Future from an exception_ptr. Because the type +can't be inferred you have to give it, e.g. + +auto f = makeFuture(std::current_exception()); +*/ +template +Future makeFuture(std::exception_ptr const& e); + +/** Make a Future from an exception type E that can be passed to + std::make_exception_ptr(). */ +template +typename std::enable_if::value, Future>::type +makeFuture(E const& e); + +/** When all the input Futures complete, the returned Future will complete. + Errors do not cause early termination; this Future will always succeed + after all its Futures have finished (whether successfully or with an + error). + + The Futures are moved in, so your copies are invalid. If you need to + chain further from these Futures, use the variant with an output iterator. + + This function is thread-safe for Futures running on different threads. + + The return type for Future input is a Future>> + */ +template +Future::value_type::value_type>>> +whenAll(InputIterator first, InputIterator last); + +/** This version takes a varying number of Futures instead of an iterator. + The return type for (Future, Future, ...) input + is a Future, Try, ...>>. + */ +template +typename detail::VariadicContext::type +whenAll(Fs&... fs); + +/** The result is a pair of the index of the first Future to complete and + the Try. If multiple Futures complete at the same time (or are already + complete when passed in), the "winner" is chosen non-deterministically. + + This function is thread-safe for Futures running on different threads. + */ +template +Future::value_type::value_type>>> +whenAny(InputIterator first, InputIterator last); + +/** when n Futures have completed, the Future completes with a vector of + the index and Try of those n Futures (the indices refer to the original + order, but the result vector will be in an arbitrary order) + + Not thread safe. + */ +template +Future::value_type::value_type>>>> +whenN(InputIterator first, InputIterator last, size_t n); + +}} // folly::wangle + +#include "Future-inl.h" + +/* + +TODO + +I haven't included a Future specialization because I don't forsee us +using it, however it is not difficult to add when needed. Refer to +Future for guidance. std::Future and boost::Future code would also be +instructive. + +I think that this might be a good candidate for folly, once it has baked for +awhile. + +*/ diff --git a/folly/wangle/GenericThreadGate.h b/folly/wangle/GenericThreadGate.h new file mode 100644 index 00000000..01a41c25 --- /dev/null +++ b/folly/wangle/GenericThreadGate.h @@ -0,0 +1,65 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "ThreadGate.h" +#include "Executor.h" +#include + +namespace folly { namespace wangle { + +template < + class WestExecutorPtr = Executor*, + class EastExecutorPtr = Executor*, + class WaiterPtr = void*> +class GenericThreadGate : public ThreadGate { +public: + /** + EastExecutor and WestExecutor respond threadsafely to + `add(std::function&&)` + + Waiter responds to `makeProgress()`. It may block, as long as progress + will be made on the west front. + */ + GenericThreadGate(WestExecutorPtr west, + EastExecutorPtr east, + WaiterPtr waiter = nullptr) : + westExecutor(west), + eastExecutor(east), + waiter(waiter) + {} + + void addWest(std::function&& fn) { westExecutor->add(std::move(fn)); } + void addEast(std::function&& fn) { eastExecutor->add(std::move(fn)); } + + virtual void makeProgress() { + makeProgress_(std::is_same()); + } + + WestExecutorPtr westExecutor; + EastExecutorPtr eastExecutor; + WaiterPtr waiter; +private: + void makeProgress_(std::true_type const&) { + throw std::logic_error("No waiter."); + } + + void makeProgress_(std::false_type const&) { + waiter->makeProgress(); + } +}; + +}} // executor diff --git a/folly/wangle/InlineExecutor.cpp b/folly/wangle/InlineExecutor.cpp new file mode 100644 index 00000000..632558c2 --- /dev/null +++ b/folly/wangle/InlineExecutor.cpp @@ -0,0 +1,16 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + diff --git a/folly/wangle/InlineExecutor.h b/folly/wangle/InlineExecutor.h new file mode 100644 index 00000000..8c338b15 --- /dev/null +++ b/folly/wangle/InlineExecutor.h @@ -0,0 +1,29 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "folly/wangle/Executor.h" + +namespace folly { namespace wangle { + + class InlineExecutor : public Executor { + public: + void add(std::function&& f) override { + f(); + } + }; + +}} diff --git a/folly/wangle/Later-inl.h b/folly/wangle/Later-inl.h new file mode 100644 index 00000000..e17f2d7e --- /dev/null +++ b/folly/wangle/Later-inl.h @@ -0,0 +1,140 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "folly/wangle/Executor.h" +#include "folly/wangle/Future.h" +#include "folly/Optional.h" + +namespace folly { namespace wangle { + +template +struct isLater { + static const bool value = false; +}; + +template +struct isLater > { + static const bool value = true; +}; + +template +struct isLaterOrFuture { + static const bool value = false; +}; + +template +struct isLaterOrFuture> { + static const bool value = true; +}; + +template +struct isLaterOrFuture> { + static const bool value = true; +}; + +template +template +Later::Later() { + future_ = starter_.getFuture(); +} + +template +Later::Later(Promise&& starter) + : starter_(std::forward>(starter)) { } + +template +template +Later::Later(U&& input) { + folly::MoveWrapper> promise; + folly::MoveWrapper inputm(std::forward(input)); + future_ = promise->getFuture(); + starter_.getFuture().then([=](Try&& t) mutable { + promise->setValue(std::move(*inputm)); + }); +} + +template +template +typename std::enable_if< + !isLaterOrFuture&&)>::type>::value, + Later&&)>::type> >::type +Later::then(F&& fn) { + typedef typename std::result_of&&)>::type B; + + Later later(std::move(starter_)); + later.future_ = future_->then(std::forward(fn)); + return later; +} + +template +template +typename std::enable_if< + isFuture&&)>::type>::value, + Later&&)>::type::value_type> >::type +Later::then(F&& fn) { + typedef typename std::result_of&&)>::type::value_type B; + + Later later(std::move(starter_)); + later.future_ = future_->then(std::move(fn)); + return later; +} + +template +template +typename std::enable_if< + isLater&&)>::type>::value, + Later&&)>::type::value_type> >::type +Later::then(F&& fn) { + typedef typename std::result_of&&)>::type::value_type B; + + folly::MoveWrapper> promise; + folly::MoveWrapper fnm(std::move(fn)); + Later later(std::move(starter_)); + later.future_ = promise->getFuture(); + future_->then([=](Try&& t) mutable { + (*fnm)(std::move(t)) + .then([=](Try&& t2) mutable { + promise->fulfilTry(std::move(t2)); + }) + .launch(); + }); + return later; +} + +template +Later Later::via(Executor* executor) { + Promise promise; + Later later(std::move(starter_)); + later.future_ = promise.getFuture(); + future_->executeWith(executor, std::move(promise)); + return later; +} + +template +Future Later::launch() { + starter_.setValue(); + return std::move(*future_); +} + +template +void Later::fireAndForget() { + future_->setContinuation([] (Try&& t) {}); // detach + starter_.setValue(); +} + +}} diff --git a/folly/wangle/Later.h b/folly/wangle/Later.h new file mode 100644 index 00000000..213b527b --- /dev/null +++ b/folly/wangle/Later.h @@ -0,0 +1,150 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "folly/wangle/Executor.h" +#include "folly/wangle/Future.h" +#include "folly/Optional.h" + +namespace folly { namespace wangle { + +template struct isLaterOrFuture; +template struct isLater; + +/* + * Since wangle primitives (promise/future) are not thread safe, it is difficult + * to build complex asynchronous workflows. A Later allows you to build such a + * workflow before actually launching it so that continuations can be set in a + * threadsafe manner. + * + * The interface to add additional work is the same as future: a then() method + * that can take either a type T, a Future, or a Later + * + * Thread transitions are done by using executors and calling the via() method. + * + * Here is an example of a workflow: + * + * Later later(std::move(request)); + * + * auto future = later. + * .via(cpuExecutor) + * .then([=](Try&& t) { return doCpuWork(t.value()); }) + * .via(diskExecutor) + * .then([=](Try&& t) { return doDiskWork(t.value()); }) + * .via(serverExecutor) + * .then([=]Try&& t) { return sendClientResponse(t.value()); }) + * .launch(); + * + * Although this workflow traverses many threads, we are able to string + * continuations together in a threadsafe manner. + * + * Laters can also be used to wrap preexisting asynchronous modules that were + * not built with wangle in mind. You can create a Later with a function that + * takes a callback as input. The function will not actually be called until + * launch(), allowing you to string then() statements on top of the callback. + */ +template +class Later { + public: + typedef T value_type; + + template ::value>::type, + class = typename std::enable_if::value>::type> + Later(); + + template ::value>::type, + class = typename std::enable_if::value>::type> + explicit Later(U&& input); + + /* + * then() adds additional work to the end of the workflow. If the lambda + * provided to then() returns a future, that future must be fulfilled in the + * same thread of the last set executor (either at constructor or from a call + * to via()). + */ + template + typename std::enable_if< + !isLaterOrFuture&&)>::type>::value, + Later&&)>::type> >::type + then(F&& fn); + + template + typename std::enable_if< + isFuture&&)>::type>::value, + Later&&)>::type::value_type> >::type + then(F&& fn); + + /* + * If the function passed to then() returns a Later, calls to then() will + * be chained to the new Later before launching the new Later. + * + * This can be used to build asynchronous modules that can be called from a + * user thread and completed in a callback thread. Callbacks can be set up + * ahead of time without thread safety issues. + * + * Using the Later(std::function)>&& fn) + * constructor, you can wrap existing asynchronous modules with a Later and + * can chain it to wangle asynchronous workflows via this call. + */ + template + typename std::enable_if< + isLater&&)>::type>::value, + Later&&)>::type::value_type> >::type + then(F&& fn); + + /* + * Resets the executor - all then() calls made after the call to via() will be + * made in the new executor. + */ + Later via(Executor* executor); + + /* + * Starts the workflow. The function provided in the constructor will be + * called in the executor provided in the constructor. All proximate then() + * calls will be made, potentially changing threads if a via() call is made. + * The future returned will be fulfilled in the last executor. + * + * Thread safety issues of Futures still apply. If you want to wait on the + * Future, it must be done in the thread that will fulfill it. If you do not + * plan to use the result of the Future, use fireAndForget() + */ + Future launch(); + + /* + * Same as launch, only no Future is returned. This guarantees thread safe + * cleanup of the internal Futures, even if the Later completes in a different + * thread than the thread that calls fireAndForget(). + */ + void fireAndForget(); + + private: + Promise starter_; + folly::Optional> future_; + + struct hide { }; + + explicit Later(Promise&& starter); + + template + friend class Later; +}; + +}} + +#include "Later-inl.h" diff --git a/folly/wangle/ManualExecutor.cpp b/folly/wangle/ManualExecutor.cpp new file mode 100644 index 00000000..8caf3585 --- /dev/null +++ b/folly/wangle/ManualExecutor.cpp @@ -0,0 +1,86 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ManualExecutor.h" + +#include + +#include + +namespace folly { namespace wangle { + +ManualExecutor::ManualExecutor() { + if (sem_init(&sem_, 0, 0) == -1) { + throw std::runtime_error(std::string("sem_init: ") + strerror(errno)); + } +} + +void ManualExecutor::add(std::function&& callback) { + std::lock_guard lock(lock_); + runnables_.push(callback); + sem_post(&sem_); +} + +size_t ManualExecutor::run() { + size_t count; + size_t n; + std::function runnable; + + { + std::lock_guard lock(lock_); + n = runnables_.size(); + } + + for (count = 0; count < n; count++) { + { + std::lock_guard lock(lock_); + if (runnables_.empty()) { + break; + } + + // Balance the semaphore so it doesn't grow without bound + // if nobody is calling wait(). + // This may fail (with EAGAIN), that's fine. + sem_trywait(&sem_); + + runnable = std::move(runnables_.front()); + runnables_.pop(); + } + runnable(); + } + + return count; +} + +void ManualExecutor::wait() { + while (true) { + { + std::lock_guard lock(lock_); + if (!runnables_.empty()) + break; + } + + auto ret = sem_wait(&sem_); + if (ret == 0) { + break; + } + if (errno != EINVAL) { + throw std::runtime_error(std::string("sem_wait: ") + strerror(errno)); + } + } +} + +}} // namespace diff --git a/folly/wangle/ManualExecutor.h b/folly/wangle/ManualExecutor.h new file mode 100644 index 00000000..93c2591a --- /dev/null +++ b/folly/wangle/ManualExecutor.h @@ -0,0 +1,51 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "folly/wangle/Executor.h" +#include +#include +#include +#include + +namespace folly { namespace wangle { + + class ManualExecutor : public Executor { + public: + ManualExecutor(); + + void add(std::function&&) override; + + /// Do work. Returns the number of runnables that were executed (maybe 0). + /// Non-blocking. + size_t run(); + + /// Wait for work to do. + void wait(); + + /// Wait for work to do, and do it. + void makeProgress() { + wait(); + run(); + } + + private: + std::mutex lock_; + std::queue> runnables_; + sem_t sem_; + }; + +}} diff --git a/folly/wangle/Promise-inl.h b/folly/wangle/Promise-inl.h new file mode 100644 index 00000000..6e7e4947 --- /dev/null +++ b/folly/wangle/Promise-inl.h @@ -0,0 +1,160 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "WangleException.h" +#include "detail.h" + +namespace folly { namespace wangle { + +template +Promise::Promise() : retrieved_(false), obj_(new detail::FutureObject()) +{} + +template +Promise::Promise(Promise&& other) : +retrieved_(other.retrieved_), obj_(other.obj_) { + other.obj_ = nullptr; +} + +template +Promise& Promise::operator=(Promise&& other) { + std::swap(obj_, other.obj_); + std::swap(retrieved_, other.retrieved_); + return *this; +} + +template +void Promise::throwIfFulfilled() { + if (!obj_) + throw PromiseAlreadySatisfied(); +} + +template +void Promise::throwIfRetrieved() { + if (retrieved_) + throw FutureAlreadyRetrieved(); +} + +template +Promise::~Promise() { + if (obj_) { + setException(BrokenPromise()); + } +} + +template +Future Promise::getFuture() { + throwIfRetrieved(); + throwIfFulfilled(); + retrieved_ = true; + return Future(obj_); +} + +template +template +void Promise::setException(E const& e) { + throwIfFulfilled(); + setException(std::make_exception_ptr(e)); +} + +template +void Promise::setException(std::exception_ptr const& e) { + throwIfFulfilled(); + obj_->setException(e); + if (!retrieved_) { + delete obj_; + } + obj_ = nullptr; +} + +template +void Promise::fulfilTry(Try&& t) { + throwIfFulfilled(); + obj_->fulfil(std::move(t)); + if (!retrieved_) { + delete obj_; + } + obj_ = nullptr; +} + +template +template +void Promise::setValue(M&& v) { + static_assert(!std::is_same::value, + "Use setValue() instead"); + + throwIfFulfilled(); + obj_->fulfil(Try(std::forward(v))); + if (!retrieved_) { + delete obj_; + } + obj_ = nullptr; +} + +template +void Promise::setValue() { + static_assert(std::is_same::value, + "Use setValue(value) instead"); + + throwIfFulfilled(); + obj_->fulfil(Try()); + if (!retrieved_) { + delete obj_; + } + obj_ = nullptr; +} + +template +template +void Promise::fulfil(const F& func) { + fulfilHelper(func); +} + +template +template +typename std::enable_if< + std::is_convertible::type, T>::value && + !std::is_same::value>::type +inline Promise::fulfilHelper(const F& func) { + throwIfFulfilled(); + try { + setValue(func()); + } catch (...) { + setException(std::current_exception()); + } +} + +template +template +typename std::enable_if< + std::is_same::type, void>::value && + std::is_same::value>::type +inline Promise::fulfilHelper(const F& func) { + throwIfFulfilled(); + try { + func(); + setValue(); + } catch (...) { + setException(std::current_exception()); + } +} + +}} diff --git a/folly/wangle/Promise.h b/folly/wangle/Promise.h new file mode 100644 index 00000000..f2a354fd --- /dev/null +++ b/folly/wangle/Promise.h @@ -0,0 +1,105 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "Try.h" +#include "Future.h" + +namespace folly { namespace wangle { + +// forward declaration +template class Future; + +template +class Promise { +public: + Promise(); + ~Promise(); + + // not copyable + Promise(Promise const&) = delete; + Promise& operator=(Promise const&) = delete; + + // movable + Promise(Promise&&); + Promise& operator=(Promise&&); + + /** Return a Future tied to the shared state. This can be called only + once, thereafter Future already retrieved exception will be raised. */ + Future getFuture(); + + /** Fulfil the Promise with an exception_ptr, e.g. + try { + ... + } catch (...) { + p.setException(std::current_exception()); + } + */ + void setException(std::exception_ptr const&); + + /** Fulfil the Promise with an exception type E, which can be passed to + std::make_exception_ptr(). Useful for originating exceptions. If you + caught an exception the exception_ptr form is more appropriate. + */ + template void setException(E const&); + + /** Fulfil this Promise (only for Promise) */ + void setValue(); + + /** Set the value (use perfect forwarding for both move and copy) */ + template + void setValue(M&& value); + + void fulfilTry(Try&& t); + + /** Fulfil this Promise with the result of a function that takes no + arguments and returns something implicitly convertible to T. + Captures exceptions. e.g. + + p.fulfil([] { do something that may throw; return a T; }); + */ + template + void fulfil(const F& func); + +private: + typedef typename Future::objPtr objPtr; + + // Whether the Future has been retrieved (a one-time operation). + bool retrieved_; + + // shared state object + objPtr obj_; + + void throwIfFulfilled(); + void throwIfRetrieved(); + + template + typename std::enable_if< + std::is_convertible::type, T>::value && + !std::is_same::value>::type + fulfilHelper(const F& func); + + template + typename std::enable_if< + std::is_same::type, void>::value && + std::is_same::value>::type + fulfilHelper(const F& func); +}; + +}} + +#include "Promise-inl.h" diff --git a/folly/wangle/README b/folly/wangle/README new file mode 100644 index 00000000..76d482b4 --- /dev/null +++ b/folly/wangle/README @@ -0,0 +1 @@ +Please see https://our.intern.facebook.com/intern/dex/wangle/ diff --git a/folly/wangle/ThreadGate.cpp b/folly/wangle/ThreadGate.cpp new file mode 100644 index 00000000..e27142db --- /dev/null +++ b/folly/wangle/ThreadGate.cpp @@ -0,0 +1,28 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ThreadGate.h" +#include + +namespace folly { namespace wangle { + +void ThreadGate::makeProgress() +{ + throw std::logic_error("This ThreadGate doesn't know how to " + "make progress."); +} + +}} // namespace diff --git a/folly/wangle/ThreadGate.h b/folly/wangle/ThreadGate.h new file mode 100644 index 00000000..00929d25 --- /dev/null +++ b/folly/wangle/ThreadGate.h @@ -0,0 +1,192 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include "Future.h" + +namespace folly { namespace wangle { + +/** + Yo dawg, I heard you like asynchrony so I put asynchrony in your asynchronous + framework. + + Wangle's futures and promises are not thread safe. Counterintuitive as this + may seem at first, this is very intentional. Making futures and promises + threadsafe drastically reduces their performance. + + On the other hand, an asynchronous framework isn't much use if you can't do + asynchronous things in other threads. So we use the ThreadGate strategy to + decouple the threads and their futures with a form of message passing. + + There are two actors, the east thread which does the asynchronous operation + (the server) and the west thread that wants the asynchronous operation done + (the client). + + The client calls gate(fn), which returns a Future. Practically speaking + the returned Future is the same as the Future returned by fn. But + there are actually two futures involved - the original Future which will be + generated by fn (called the east Future), and the Future actually returned + by gate(fn) (called the west Future). + + These two futures are decoupled, and although the fulfilment of the east + Future eventually causes fulfilment of the west Future, those fulfilments + happen in their own threads. + + In order to make and use a ThreadGate, you need to provide a strategy for + executing code in the east and west threads. These strategies may be + different. The only requirement is a threadsafe method + `void add(function&&)`. You may find the executors in + Executor.h handy, but ensure that you are using them + threadsafely. + + In order for your ThreadGate to do anything, you need to drive those + executors somehow. An event loop is a natural fit. A thread pool might be + made to work. You could use a busy loop to make a very expensive space + heater. 0MQ would be pleasant. + + Another pattern supported by the ThreadGate is the single-thread pattern. In + this pattern, non-blocking I/O drives the asynchronous operation, and + futures are fulfilled in an event loop callback. In this scenario, + ThreadGate is largely superfluous, and the executors would likely just + execute code immediately and inline (and therefore not need to be driven, or + threadsafe). But a Waiter strategy that makes progress by driving the event + loop one iteration would allow for gate-and-wait code which is agnostic to + the small detail that everything happens in one thread. It would also make + Future change toward a multithreaded architecture easier, as you need only + change the components of the ThreadGate which your client code is already + using. + */ +class ThreadGate { +public: + virtual ~ThreadGate() {} + + /** + Returns a Future that will be fulfilled after the Future that will be + returned by fn() has been fulfilled, with the same value or exception + (moved). + + There's a lot of nuance in that sentence. Let's break it down. + + fn kicks off the asynchronous operation (makes the east Promise), and must + be executed in the east thread because the east thread is where the east + Promise will be fulfilled. Since gate is being called from the west + thread, we must gate fn using the east executor. fn is not executed + immediately, it is queued up and will be executed by the east thread as it + drives the executor. + + We create the west Promise and return its Future. + + When the east thread executes its task, fn is called and the resulting + Future gets a callback that will gate another task back to the west. + + Sometime later, the asynchronous operation completes and the east Promise + is fulfilled. Then the east Future executes its callback, which adds a + task to the west executor that task is to fulfil the west Promise with the + same Try, and it will execute in the west thread. + + At this point, the west Future is still unfulfilled, even though the east + Future has been fulfilled and its callback has finished executing. Only + when the west executor is driven to execute that task, the west Future + will be completed and its callbacks called. + + In summary, both east and west need to have plans to drive their + executors, or nothing will actually happen. When the executors are driven, + then everything flows. */ + template + Future gate(std::function()>&& fn) { + Promise pWest; + Future fWest = pWest.getFuture(); + + gate(std::move(fn), std::move(pWest)); + return fWest; + } + + /** + * This version of gate is to support use cases where the calling thread is + * not the west thread. Here is an example use case. + * + * Promise pWest; + * Future fWest = pWest.getFuture(); + * + * // Set up callbacks for west from a thread that is not west. + * fWest.then(...).then(...); + * + * threadGate.gate(..., std::move(pWest)); + * + * This function assumes that it is safe to call addEast from a thread that is + * not the west thread. + */ + template + void gate(std::function()>&& fn, + Promise&& p) { + folly::MoveWrapper> pWest(std::move(p)); + folly::MoveWrapper()>> fnm(std::move(fn)); + this->addEast([pWest, fnm, this]() mutable { + (*fnm)().then([pWest, this](Try&& t) mutable { + folly::MoveWrapper> tm(std::move(t)); + this->addWest([pWest, tm]() mutable { + pWest->fulfilTry(std::move(*tm)); + }); + }); + }); + } + + /** + If your workflow calls for synchronizing with a + west Future, then you may call waitFor, but if your west thread is + event-driven you will probably not need to call waitFor. + + In order for waitFor to behave properly, you must ensure that the Waiter's + makeProgress method causes some progress to be made on the west thread, + i.e. drives the west executor either directly or indirectly. + + (Naturally, progress needs to be made on the east thread as well. i.e. the + east executor is driven, the asynchronous operation happens, and its + Promise is fulfilled. It is likely that none of this concerns the consumer + of waitFor.) + + This is the only function that uses the Waiter. It is never called + internally. Therefore, if you never use waitFor you can safely provide a + DummyWaiter. + */ + template + void waitFor(Future const& f) { + while (!f.isReady()) { + this->makeProgress(); + } + } + + template + typename std::add_lvalue_reference::type + value(Future& f) { + waitFor(f); + return f.value(); + } + + template + typename std::add_lvalue_reference::type + value(Future const& f) { + waitFor(f); + return f.value(); + } + + virtual void addEast(std::function&&) = 0; + virtual void addWest(std::function&&) = 0; + virtual void makeProgress(); +}; + +}} // namespace diff --git a/folly/wangle/Try-inl.h b/folly/wangle/Try-inl.h new file mode 100644 index 00000000..68b41212 --- /dev/null +++ b/folly/wangle/Try-inl.h @@ -0,0 +1,93 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "WangleException.h" + +namespace folly { namespace wangle { + +template +Try::Try(Try&& t) : contains_(t.contains_) { + if (contains_ == VALUE) { + new (&value_)T(std::move(t.value_)); + } else if (contains_ == EXCEPTION) { + new (&e_)std::exception_ptr(t.e_); + } +} + +template +Try& Try::operator=(Try&& t) { + this->~Try(); + contains_ = t.contains_; + if (contains_ == VALUE) { + new (&value_)T(std::move(t.value_)); + } else if (contains_ == EXCEPTION) { + new (&e_)std::exception_ptr(t.e_); + } + return *this; +} + +template +Try::~Try() { + if (contains_ == VALUE) { + value_.~T(); + } else if (contains_ == EXCEPTION) { + e_.~exception_ptr(); + } +} + +template +T& Try::value() { + throwIfFailed(); + return value_; +} + +template +const T& Try::value() const { + throwIfFailed(); + return value_; +} + +template +void Try::throwIfFailed() const { + if (contains_ != VALUE) { + if (contains_ == EXCEPTION) { + std::rethrow_exception(e_); + } else { + throw UsingUninitializedTry(); + } + } +} + +void Try::throwIfFailed() const { + if (!hasValue_) { + std::rethrow_exception(e_); + } +} + +template +inline T moveFromTry(wangle::Try&& t) { + return std::move(t.value()); +} + +inline void moveFromTry(wangle::Try&& t) { + return t.value(); +} + +}} diff --git a/folly/wangle/Try.h b/folly/wangle/Try.h new file mode 100644 index 00000000..1e1df07d --- /dev/null +++ b/folly/wangle/Try.h @@ -0,0 +1,104 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace folly { namespace wangle { + +template +class Try { + static_assert(!std::is_reference::value, + "Try may not be used with reference types"); + + enum Contains { + VALUE, + EXCEPTION, + NOTHING, + }; + + public: + typedef T element_type; + + Try() : contains_(NOTHING) {} + explicit Try(const T& v) : contains_(VALUE), value_(v) {} + explicit Try(T&& v) : contains_(VALUE), value_(std::move(v)) {} + explicit Try(std::exception_ptr e) : contains_(EXCEPTION), e_(e) {} + + // move + Try(Try&& t); + Try& operator=(Try&& t); + + // no copy + Try(const Try& t) = delete; + Try& operator=(const Try& t) = delete; + + ~Try(); + + T& value(); + const T& value() const; + + void throwIfFailed() const; + + const T& operator*() const { return value(); } + T& operator*() { return value(); } + + const T* operator->() const { return &value(); } + T* operator->() { return &value(); } + + bool hasValue() const { return contains_ == VALUE; } + bool hasException() const { return contains_ == EXCEPTION; } + + private: + Contains contains_; + union { + T value_; + std::exception_ptr e_; + }; +}; + +template <> +class Try { + public: + Try() : hasValue_(true) {} + explicit Try(std::exception_ptr e) : hasValue_(false), e_(e) {} + + void value() const { throwIfFailed(); } + void operator*() const { return value(); } + + inline void throwIfFailed() const; + + bool hasValue() const { return hasValue_; } + bool hasException() const { return !hasValue_; } + + private: + bool hasValue_; + std::exception_ptr e_; +}; + +/** + * Extracts value from try and returns it. Throws if try contained an exception. + */ +template +T moveFromTry(wangle::Try&& t); + +/** + * Throws if try contained an exception. + */ +void moveFromTry(wangle::Try&& t); + +}} + +#include "Try-inl.h" diff --git a/folly/wangle/WangleException.h b/folly/wangle/WangleException.h new file mode 100644 index 00000000..40613c5c --- /dev/null +++ b/folly/wangle/WangleException.h @@ -0,0 +1,77 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace folly { namespace wangle { + +class WangleException : public std::exception { + +public: + + explicit WangleException(std::string message_arg) + : message(message_arg) {} + + ~WangleException() throw(){} + + virtual const char *what() const throw() { + return message.c_str(); + } + + bool operator==(const WangleException &other) const{ + return other.message == this->message; + } + + bool operator!=(const WangleException &other) const{ + return !(*this == other); + } + + protected: + std::string message; +}; + +class BrokenPromise : public WangleException { + public: + explicit BrokenPromise() : + WangleException("Broken promise") { } +}; + +class NoState : public WangleException { + public: + explicit NoState() : WangleException("No state") { } +}; + +class PromiseAlreadySatisfied : public WangleException { + public: + explicit PromiseAlreadySatisfied() : + WangleException("Promise already satisfied") { } +}; + +class FutureAlreadyRetrieved : public WangleException { + public: + explicit FutureAlreadyRetrieved () : + WangleException("Future already retrieved") { } +}; + +class UsingUninitializedTry : public WangleException { + public: + explicit UsingUninitializedTry() : + WangleException("Using unitialized try") { } +}; + +}} diff --git a/folly/wangle/detail.h b/folly/wangle/detail.h new file mode 100644 index 00000000..bf85041e --- /dev/null +++ b/folly/wangle/detail.h @@ -0,0 +1,154 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "Try.h" +#include "Promise.h" +#include "Future.h" + +namespace folly { namespace wangle { namespace detail { + +/** The shared state object for Future and Promise. */ +template +class FutureObject { + public: + FutureObject() = default; + + // not copyable + FutureObject(FutureObject const&) = delete; + FutureObject& operator=(FutureObject const&) = delete; + + // not movable (see comment in the implementation of Future::then) + FutureObject(FutureObject&&) = delete; + FutureObject& operator=(FutureObject&&) = delete; + + Try& valueTry() { + return *value_; + } + + template + void setContinuation(F func) { + if (continuation_) { + throw std::logic_error("setContinuation called twice"); + } + + if (value_.hasValue()) { + func(std::move(*value_)); + delete this; + } else { + continuation_ = std::move(func); + } + } + + void fulfil(Try&& t) { + if (value_.hasValue()) { + throw std::logic_error("fulfil called twice"); + } + + if (continuation_) { + continuation_(std::move(t)); + delete this; + } else { + value_ = std::move(t); + } + } + + void setException(std::exception_ptr const& e) { + fulfil(Try(e)); + } + + template void setException(E const& e) { + fulfil(Try(std::make_exception_ptr(e))); + } + + bool ready() const { + return value_.hasValue(); + } + + typename std::add_lvalue_reference::type value() { + return value_->value(); + } + + private: + folly::Optional> value_; + std::function&&)> continuation_; +}; + +template +struct VariadicContext { + VariadicContext() : total(0), count(0) {} + Promise... > > p; + std::tuple... > results; + size_t total; + std::atomic count; + typedef Future...>> type; +}; + +template +typename std::enable_if::type +whenAllVariadicHelper(VariadicContext *ctx, THead& head, Fs&... tail) { + head.setContinuation([ctx](Try&& t) { + const size_t i = sizeof...(Ts) - sizeof...(Fs) - 1; + std::get(ctx->results) = std::move(t); + if (++ctx->count == ctx->total) { + ctx->p.setValue(std::move(ctx->results)); + delete ctx; + } + }); +} + +template +typename std::enable_if::type +whenAllVariadicHelper(VariadicContext *ctx, THead& head, Fs&... tail) { + head.setContinuation([ctx](Try&& t) { + const size_t i = sizeof...(Ts) - sizeof...(Fs) - 1; + std::get(ctx->results) = std::move(t); + if (++ctx->count == ctx->total) { + ctx->p.setValue(std::move(ctx->results)); + delete ctx; + } + }); + whenAllVariadicHelper(ctx, tail...); // recursive template tail call +} + +template +struct WhenAllContext { + explicit WhenAllContext() : count(0), total(0) {} + Promise > > p; + std::vector > results; + std::atomic count; + size_t total; +}; + +template +struct WhenAnyContext { + explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {}; + Promise>> p; + std::atomic done; + std::atomic ref_count; + void decref() { + if (--ref_count == 0) { + delete this; + } + } +}; + +}}} // namespace diff --git a/folly/wangle/test/FutureTest.cpp b/folly/wangle/test/FutureTest.cpp new file mode 100644 index 00000000..cdc8f82d --- /dev/null +++ b/folly/wangle/test/FutureTest.cpp @@ -0,0 +1,571 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include "folly/wangle/Executor.h" +#include "folly/wangle/Future.h" + +using namespace folly::wangle; +using std::pair; +using std::string; +using std::unique_ptr; +using std::vector; + +#define EXPECT_TYPE(x, T) \ + EXPECT_TRUE((std::is_same::value)) + +typedef WangleException eggs_t; +static eggs_t eggs("eggs"); + +// Future + +TEST(Future, try) { + class A { + public: + A(int x) : x_(x) {} + + int x() const { + return x_; + } + private: + int x_; + }; + + A a(5); + Try t_a(std::move(a)); + + Try t_void; + + EXPECT_EQ(5, t_a.value().x()); +} + +TEST(Future, special) { + EXPECT_FALSE(std::is_copy_constructible>::value); + EXPECT_FALSE(std::is_copy_assignable>::value); + EXPECT_TRUE(std::is_move_constructible>::value); + EXPECT_TRUE(std::is_move_assignable>::value); +} + +TEST(Future, then) { + bool flag = false; + + makeFuture(42).then([&](Try&& t) { + flag = true; + EXPECT_EQ(42, t.value()); + }); + EXPECT_TRUE(flag); flag = false; + + makeFuture(42) + .then([](Try&& t) { return t.value(); }) + .then([&](Try&& t) { flag = true; EXPECT_EQ(42, t.value()); }); + EXPECT_TRUE(flag); flag = false; + + makeFuture().then([&](Try&& t) { flag = true; t.value(); }); + EXPECT_TRUE(flag); flag = false; + + Promise p; + auto f = p.getFuture().then([&](Try&& t) { flag = true; }); + EXPECT_FALSE(flag); + EXPECT_FALSE(f.isReady()); + p.setValue(); + EXPECT_TRUE(flag); + EXPECT_TRUE(f.isReady()); +} + +TEST(Future, value) { + auto f = makeFuture(unique_ptr(new int(42))); + auto up = std::move(f.value()); + EXPECT_EQ(42, *up); + + EXPECT_THROW(makeFuture(eggs).value(), eggs_t); +} + +TEST(Future, isReady) { + Promise p; + auto f = p.getFuture(); + EXPECT_FALSE(f.isReady()); + p.setValue(42); + EXPECT_TRUE(f.isReady()); + } + +TEST(Future, hasException) { + EXPECT_TRUE(makeFuture(eggs).valueTry().hasException()); + EXPECT_FALSE(makeFuture(42).valueTry().hasException()); +} + +TEST(Future, hasValue) { + EXPECT_TRUE(makeFuture(42).valueTry().hasValue()); + EXPECT_FALSE(makeFuture(eggs).valueTry().hasValue()); +} + +TEST(Future, makeFuture) { + EXPECT_TYPE(makeFuture(42), Future); + EXPECT_EQ(42, makeFuture(42).value()); + + EXPECT_TYPE(makeFuture(42), Future); + EXPECT_EQ(42, makeFuture(42).value()); + + auto fun = [] { return 42; }; + EXPECT_TYPE(makeFutureTry(fun), Future); + EXPECT_EQ(42, makeFutureTry(fun).value()); + + auto failfun = []() -> int { throw eggs; }; + EXPECT_TYPE(makeFutureTry(failfun), Future); + EXPECT_THROW(makeFutureTry(failfun).value(), eggs_t); + + EXPECT_TYPE(makeFuture(), Future); +} + +// Promise + +TEST(Promise, special) { + EXPECT_FALSE(std::is_copy_constructible>::value); + EXPECT_FALSE(std::is_copy_assignable>::value); + EXPECT_TRUE(std::is_move_constructible>::value); + EXPECT_TRUE(std::is_move_assignable>::value); +} + +TEST(Promise, getFuture) { + Promise p; + Future f = p.getFuture(); + EXPECT_FALSE(f.isReady()); +} + +TEST(Promise, setValue) { + Promise fund; + auto ffund = fund.getFuture(); + fund.setValue(42); + EXPECT_EQ(42, ffund.value()); + + struct Foo { + string name; + int value; + }; + + Promise pod; + auto fpod = pod.getFuture(); + Foo f = {"the answer", 42}; + pod.setValue(f); + Foo f2 = fpod.value(); + EXPECT_EQ(f.name, f2.name); + EXPECT_EQ(f.value, f2.value); + + pod = Promise(); + fpod = pod.getFuture(); + return; + pod.setValue(std::move(f2)); + Foo f3 = fpod.value(); + EXPECT_EQ(f.name, f3.name); + EXPECT_EQ(f.value, f3.value); + EXPECT_NE(f.name, f2.name); + + return; + + Promise> mov; + auto fmov = mov.getFuture(); + mov.setValue(unique_ptr(new int(42))); + unique_ptr ptr = std::move(fmov.value()); + EXPECT_EQ(42, *ptr); + + Promise v; + auto fv = v.getFuture(); + v.setValue(); + EXPECT_TRUE(fv.isReady()); +} + +TEST(Promise, setException) { + { + Promise p; + auto f = p.getFuture(); + p.setException(eggs); + EXPECT_THROW(f.value(), eggs_t); + } + { + Promise p; + auto f = p.getFuture(); + try { + throw eggs; + } catch (...) { + p.setException(std::current_exception()); + } + EXPECT_THROW(f.value(), eggs_t); + } +} + +TEST(Promise, fulfil) { + { + Promise p; + auto f = p.getFuture(); + p.fulfil([] { return 42; }); + EXPECT_EQ(42, f.value()); + } + { + Promise p; + auto f = p.getFuture(); + p.fulfil([]() -> int { throw eggs; }); + EXPECT_THROW(f.value(), eggs_t); + } +} + +TEST(Future, finish) { + auto x = std::make_shared(0); + Promise p; + auto f = p.getFuture().then([x](Try&& t) { *x = t.value(); }); + + // The continuation hasn't executed + EXPECT_EQ(0, *x); + + // The continuation has a reference to x + EXPECT_EQ(2, x.use_count()); + + p.setValue(42); + + // the continuation has executed + EXPECT_EQ(42, *x); + + // the continuation has been destructed + // and has released its reference to x + EXPECT_EQ(1, x.use_count()); +} + +TEST(Future, unwrap) { + Promise a; + Promise b; + + auto fa = a.getFuture(); + auto fb = b.getFuture(); + + bool flag1 = false; + bool flag2 = false; + + // do a, then do b, and get the result of a + b. + Future f = fa.then([&](Try&& ta) { + auto va = ta.value(); + flag1 = true; + return fb.then([va, &flag2](Try&& tb) { + flag2 = true; + return va + tb.value(); + }); + }); + + EXPECT_FALSE(flag1); + EXPECT_FALSE(flag2); + EXPECT_FALSE(f.isReady()); + + a.setValue(3); + EXPECT_TRUE(flag1); + EXPECT_FALSE(flag2); + EXPECT_FALSE(f.isReady()); + + b.setValue(4); + EXPECT_TRUE(flag1); + EXPECT_TRUE(flag2); + EXPECT_EQ(7, f.value()); +} + +TEST(Future, whenAll) { + // returns a vector variant + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto allf = whenAll(futures.begin(), futures.end()); + + random_shuffle(promises.begin(), promises.end()); + for (auto& p : promises) { + EXPECT_FALSE(allf.isReady()); + p.setValue(42); + } + + EXPECT_TRUE(allf.isReady()); + auto& results = allf.value(); + for (auto& t : results) { + EXPECT_EQ(42, t.value()); + } + } + + // check error semantics + { + vector> promises(4); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto allf = whenAll(futures.begin(), futures.end()); + + + promises[0].setValue(42); + promises[1].setException(eggs); + + EXPECT_FALSE(allf.isReady()); + + promises[2].setValue(42); + + EXPECT_FALSE(allf.isReady()); + + promises[3].setException(eggs); + + EXPECT_TRUE(allf.isReady()); + EXPECT_FALSE(allf.valueTry().hasException()); + + auto& results = allf.value(); + EXPECT_EQ(42, results[0].value()); + EXPECT_TRUE(results[1].hasException()); + EXPECT_EQ(42, results[2].value()); + EXPECT_TRUE(results[3].hasException()); + } + + // check that futures are ready in then() + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto allf = whenAll(futures.begin(), futures.end()) + .then([](Try>>&& ts) { + for (auto& f : ts.value()) + f.value(); + }); + + random_shuffle(promises.begin(), promises.end()); + for (auto& p : promises) + p.setValue(); + EXPECT_TRUE(allf.isReady()); + } +} + + +TEST(Future, whenAny) { + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + for (auto& f : futures) { + EXPECT_FALSE(f.isReady()); + } + + auto anyf = whenAny(futures.begin(), futures.end()); + + /* futures were moved in, so these are invalid now */ + EXPECT_FALSE(anyf.isReady()); + + promises[7].setValue(42); + EXPECT_TRUE(anyf.isReady()); + auto& idx_fut = anyf.value(); + + auto i = idx_fut.first; + EXPECT_EQ(7, i); + + auto& f = idx_fut.second; + EXPECT_EQ(42, f.value()); + } + + // error + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + for (auto& f : futures) { + EXPECT_FALSE(f.isReady()); + } + + auto anyf = whenAny(futures.begin(), futures.end()); + + EXPECT_FALSE(anyf.isReady()); + + promises[3].setException(eggs); + EXPECT_TRUE(anyf.isReady()); + EXPECT_TRUE(anyf.value().second.hasException()); + } + + // then() + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto anyf = whenAny(futures.begin(), futures.end()) + .then([](Try>>&& f) { + EXPECT_EQ(42, f.value().second.value()); + }); + + promises[3].setValue(42); + EXPECT_TRUE(anyf.isReady()); + } +} + + +TEST(when, already_completed) { + { + vector> fs; + for (int i = 0; i < 10; i++) + fs.push_back(makeFuture()); + + whenAll(fs.begin(), fs.end()) + .then([&](Try>>&& t) { + EXPECT_EQ(fs.size(), t.value().size()); + }); + } + { + vector> fs; + for (int i = 0; i < 10; i++) + fs.push_back(makeFuture(i)); + + whenAny(fs.begin(), fs.end()) + .then([&](Try>>&& t) { + auto& p = t.value(); + EXPECT_EQ(p.first, p.second.value()); + }); + } +} + +TEST(when, whenN) { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + bool flag = false; + size_t n = 3; + whenN(futures.begin(), futures.end(), n) + .then([&](Try>>>&& t) { + flag = true; + auto v = t.value(); + EXPECT_EQ(n, v.size()); + for (auto& tt : v) + EXPECT_TRUE(tt.second.hasValue()); + }); + + promises[0].setValue(); + EXPECT_FALSE(flag); + promises[1].setValue(); + EXPECT_FALSE(flag); + promises[2].setValue(); + EXPECT_TRUE(flag); +} + +/* Ensure that we can compile when_{all,any} with folly::small_vector */ +TEST(when, small_vector) { + using folly::small_vector; + { + small_vector> futures; + + for (int i = 0; i < 10; i++) + futures.push_back(makeFuture()); + + auto anyf = whenAny(futures.begin(), futures.end()); + } + + { + small_vector> futures; + + for (int i = 0; i < 10; i++) + futures.push_back(makeFuture()); + + auto allf = whenAll(futures.begin(), futures.end()); + } +} + +TEST(Future, wait) { + Promise p; + auto f = p.getFuture(); + auto t = std::thread([&] { + std::this_thread::sleep_for(std::chrono::microseconds(10)); + p.setValue(); + }); + + f.wait(); + + EXPECT_TRUE(f.isReady()); + + t.join(); +} + +TEST(Future, whenAllVariadic) { + Promise pb; + Promise pi; + Future fb = pb.getFuture(); + Future fi = pi.getFuture(); + bool flag = false; + whenAll(fb, fi) + .then([&](Try, Try>>&& t) { + flag = true; + EXPECT_TRUE(t.hasValue()); + EXPECT_TRUE(std::get<0>(t.value()).hasValue()); + EXPECT_EQ(std::get<0>(t.value()).value(), true); + EXPECT_TRUE(std::get<1>(t.value()).hasValue()); + EXPECT_EQ(std::get<1>(t.value()).value(), 42); + }); + pb.setValue(true); + EXPECT_FALSE(flag); + pi.setValue(42); + EXPECT_TRUE(flag); +} + +TEST(Future, whenAll_none) { + vector> fs; + auto f = whenAll(fs.begin(), fs.end()); + EXPECT_TRUE(f.isReady()); +} + +TEST(Future, throwCaughtInImmediateThen) { + // Neither of these should throw "Promise already satisfied" + makeFuture().then( + [=](Try&&) -> int { throw std::exception(); }); + makeFuture().then( + [=](Try&&) -> Future { throw std::exception(); }); +} + +TEST(Future, throwIfFailed) { + makeFuture(eggs) + .then([=](Try&& t) { + EXPECT_THROW(t.throwIfFailed(), eggs_t); + }); + makeFuture() + .then([=](Try&& t) { + EXPECT_NO_THROW(t.throwIfFailed()); + }); + + makeFuture(eggs) + .then([=](Try&& t) { + EXPECT_THROW(t.throwIfFailed(), eggs_t); + }); + makeFuture(42) + .then([=](Try&& t) { + EXPECT_NO_THROW(t.throwIfFailed()); + }); +} diff --git a/folly/wangle/test/LaterTest.cpp b/folly/wangle/test/LaterTest.cpp new file mode 100644 index 00000000..37d0a4bf --- /dev/null +++ b/folly/wangle/test/LaterTest.cpp @@ -0,0 +1,144 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "folly/wangle/ManualExecutor.h" +#include "folly/wangle/InlineExecutor.h" +#include "folly/wangle/Later.h" + +using namespace folly::wangle; + +struct ManualWaiter { + explicit ManualWaiter(std::shared_ptr ex) : ex(ex) {} + + void makeProgress() { + ex->wait(); + ex->run(); + } + + std::shared_ptr ex; +}; + +struct LaterFixture : public testing::Test { + LaterFixture() : + westExecutor(new ManualExecutor), + eastExecutor(new ManualExecutor), + waiter(new ManualWaiter(westExecutor)), + done(false) + { + t = std::thread([=] { + ManualWaiter eastWaiter(eastExecutor); + while (!done) + eastWaiter.makeProgress(); + }); + } + + ~LaterFixture() { + done = true; + eastExecutor->add([=]() { }); + t.join(); + } + + Later later; + std::shared_ptr westExecutor; + std::shared_ptr eastExecutor; + std::shared_ptr waiter; + InlineExecutor inlineExecutor; + bool done; + std::thread t; +}; + +TEST(Later, construct_and_launch) { + bool fulfilled = false; + auto later = Later().then([&](Try&& t) { + fulfilled = true; + return makeFuture(1); + }); + + // has not started yet. + EXPECT_FALSE(fulfilled); + + EXPECT_EQ(later.launch().value(), 1); + EXPECT_TRUE(fulfilled); +} + +TEST(Later, then_value) { + auto future = Later(std::move(1)) + .then([](Try&& t) { + return t.value() == 1; + }) + .launch(); + + EXPECT_TRUE(future.value()); +} + +TEST(Later, then_future) { + auto future = Later(1) + .then([](Try&& t) { + return makeFuture(t.value() == 1); + }) + .launch(); + EXPECT_TRUE(future.value()); +} + +TEST_F(LaterFixture, thread_hops) { + auto westThreadId = std::this_thread::get_id(); + auto future = later.via(eastExecutor.get()).then([=](Try&& t) { + EXPECT_NE(std::this_thread::get_id(), westThreadId); + return makeFuture(1); + }).via(westExecutor.get() + ).then([=](Try&& t) { + EXPECT_EQ(std::this_thread::get_id(), westThreadId); + return t.value(); + }).launch(); + while (!future.isReady()) { + waiter->makeProgress(); + } + EXPECT_EQ(future.value(), 1); +} + +TEST_F(LaterFixture, chain_laters) { + auto westThreadId = std::this_thread::get_id(); + auto future = later.via(eastExecutor.get()).then([=](Try&& t) { + EXPECT_NE(std::this_thread::get_id(), westThreadId); + return makeFuture(1); + }).then([=](Try&& t) { + int val = t.value(); + return Later(std::move(val)).via(westExecutor.get()) + .then([=](Try&& t) mutable { + EXPECT_EQ(std::this_thread::get_id(), westThreadId); + return t.value(); + }); + }).then([=](Try&& t) { + EXPECT_EQ(std::this_thread::get_id(), westThreadId); + return t.value(); + }).launch(); + + while (!future.isReady()) { + waiter->makeProgress(); + } + EXPECT_EQ(future.value(), 1); +} + +TEST_F(LaterFixture, fire_and_forget) { + auto west = westExecutor.get(); + later.via(eastExecutor.get()).then([=](Try&& t) { + west->add([]() {}); + }).fireAndForget(); + waiter->makeProgress(); +} diff --git a/folly/wangle/test/ThreadGateTest.cpp b/folly/wangle/test/ThreadGateTest.cpp new file mode 100644 index 00000000..ef434ac6 --- /dev/null +++ b/folly/wangle/test/ThreadGateTest.cpp @@ -0,0 +1,165 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "folly/wangle/Executor.h" +#include "folly/wangle/ManualExecutor.h" +#include "folly/wangle/ThreadGate.h" +#include "folly/wangle/GenericThreadGate.h" + +using namespace folly::wangle; +using std::make_shared; +using std::shared_ptr; +using std::thread; +using std::vector; + +struct ManualWaiter { + explicit ManualWaiter(shared_ptr ex) : ex(ex) {} + + void makeProgress() { + ex->wait(); + ex->run(); + } + + shared_ptr ex; +}; + +struct GenericThreadGateFixture : public testing::Test { + GenericThreadGateFixture() : + westExecutor(new ManualExecutor), + eastExecutor(new ManualExecutor), + waiter(new ManualWaiter(westExecutor)), + tg(westExecutor, eastExecutor, waiter), + done(false) + { + t = thread([=] { + ManualWaiter eastWaiter(eastExecutor); + while (!done) + eastWaiter.makeProgress(); + }); + } + + ~GenericThreadGateFixture() { + done = true; + tg.gate([] { return makeFuture(); }); + t.join(); + } + + shared_ptr westExecutor; + shared_ptr eastExecutor; + shared_ptr waiter; + GenericThreadGate< + shared_ptr, + shared_ptr, + shared_ptr> tg; + bool done; + thread t; +}; + +TEST_F(GenericThreadGateFixture, gate_and_wait) { + auto f = tg.gate([] { return makeFuture(); }); + EXPECT_FALSE(f.isReady()); + + tg.waitFor(f); + EXPECT_TRUE(f.isReady()); +} + +TEST_F(GenericThreadGateFixture, gate_many) { + vector> fs; + int n = 10; + + for (int i = 0; i < n; i++) + fs.push_back(tg.gate([&] { return makeFuture(); })); + + for (auto& f : fs) + EXPECT_FALSE(f.isReady()); + + auto all = whenAll(fs.begin(), fs.end()); + tg.waitFor(all); +} + +TEST_F(GenericThreadGateFixture, gate_alternating) { + vector> ps(10); + vector> fs; + size_t count = 0; + + for (auto& p : ps) { + auto* pp = &p; + auto f = tg.gate([=] { return pp->getFuture(); }); + + // abuse the thread gate to do our dirty work in the other thread + tg.gate([=] { pp->setValue(); return makeFuture(); }); + + fs.push_back(f.then([&](Try&&) { count++; })); + } + + for (auto& f : fs) + EXPECT_FALSE(f.isReady()); + EXPECT_EQ(0, count); + + auto all = whenAll(fs.begin(), fs.end()); + tg.waitFor(all); + + EXPECT_EQ(ps.size(), count); +} + +TEST(GenericThreadGate, noWaiter) { + auto west = make_shared(); + auto east = make_shared(); + Promise p; + auto dummyFuture = p.getFuture(); + + GenericThreadGate + tg(west.get(), east.get()); + + EXPECT_THROW(tg.waitFor(dummyFuture), std::logic_error); +} + +TEST_F(GenericThreadGateFixture, gate_with_promise) { + Promise p; + + auto westId = std::this_thread::get_id(); + bool westThenCalled = false; + auto f = p.getFuture().then( + [westId, &westThenCalled](Try&& t) { + EXPECT_EQ(t.value(), 1); + EXPECT_EQ(std::this_thread::get_id(), westId); + westThenCalled = true; + return t.value(); + }); + + bool eastPromiseMade = false; + std::async(std::launch::async, [&p, &eastPromiseMade, this]() { + // South thread != west thread. p gets set in west thread. + tg.gate([&p, &eastPromiseMade, this] { + EXPECT_EQ(t.get_id(), std::this_thread::get_id()); + Promise eastPromise; + auto eastFuture = eastPromise.getFuture(); + eastPromise.setValue(1); + eastPromiseMade = true; + return eastFuture; + }, + std::move(p)); + }); + + tg.waitFor(f); + EXPECT_TRUE(westThenCalled); + EXPECT_TRUE(eastPromiseMade); + EXPECT_EQ(f.value(), 1); +} diff --git a/folly/wangle/test/main.cpp b/folly/wangle/test/main.cpp new file mode 100644 index 00000000..7dbf27d4 --- /dev/null +++ b/folly/wangle/test/main.cpp @@ -0,0 +1,22 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}