#include <chrono>
#include <thread>
-#include <folly/wangle/futures/detail/Core.h>
#include <folly/Baton.h>
+#include <folly/wangle/futures/detail/Core.h>
+#include <folly/wangle/futures/Timekeeper.h>
namespace folly { namespace wangle {
+class Timekeeper;
+
+namespace detail {
+ Timekeeper* getTimekeeperSingleton();
+}
+
template <typename T>
struct isFuture {
static const bool value = false;
return done;
}
-template <typename T, class Duration>
+template <typename T, class Dur>
Future<T>
-waitWithSemaphore(Future<T>&& f, Duration timeout) {
+waitWithSemaphore(Future<T>&& f, Dur timeout) {
auto baton = std::make_shared<Baton<>>();
auto done = f.then([baton](Try<T> &&t) {
baton->post();
return done;
}
-template <class Duration>
+template <class Dur>
Future<void>
-waitWithSemaphore(Future<void>&& f, Duration timeout) {
+waitWithSemaphore(Future<void>&& f, Dur timeout) {
auto baton = std::make_shared<Baton<>>();
auto done = f.then([baton](Try<void> &&t) {
baton->post();
return done;
}
+namespace {
+ template <class T>
+ void getWaitHelper(Future<T>* f) {
+ // If we already have a value do the cheap thing
+ if (f->isReady()) {
+ return;
+ }
+
+ folly::Baton<> baton;
+ f->then([&](Try<T> const&) {
+ baton.post();
+ });
+ baton.wait();
+ }
+
+ template <class T>
+ Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
+ // TODO make and use variadic whenAny #5877971
+ Promise<T> p;
+ auto token = std::make_shared<std::atomic<bool>>();
+ folly::Baton<> baton;
+
+ folly::wangle::detail::getTimekeeperSingleton()->after(dur)
+ .then([&,token](Try<void> const& t) {
+ try {
+ t.value();
+ if (token->exchange(true) == false) {
+ p.setException(TimedOut());
+ baton.post();
+ }
+ } catch (std::exception const& e) {
+ if (token->exchange(true) == false) {
+ p.setException(std::current_exception());
+ baton.post();
+ }
+ }
+ });
+
+ f->then([&, token](Try<T>&& t) {
+ if (token->exchange(true) == false) {
+ p.fulfilTry(std::move(t));
+ baton.post();
+ }
+ });
+
+ baton.wait();
+ return p.getFuture();
+ }
+}
+
+template <class T>
+T Future<T>::get() {
+ getWaitHelper(this);
+
+ // Big assumption here: the then() call above, since it doesn't move out
+ // the value, leaves us with a value to return here. This would be a big
+ // no-no in user code, but I'm invoking internal developer privilege. This
+ // is slightly more efficient (save a move()) especially if there's an
+ // exception (save a throw).
+ return std::move(value());
+}
+
+template <>
+inline void Future<void>::get() {
+ getWaitHelper(this);
+}
+
+template <class T>
+T Future<T>::get(Duration dur) {
+ return std::move(getWaitTimeoutHelper(this, dur).value());
+}
+
+template <>
+inline void Future<void>::get(Duration dur) {
+ getWaitTimeoutHelper(this, dur).value();
+}
+
+template <class T>
+Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk)
+{
+ return whenAll(*this, futures::sleep(dur, tk))
+ .then([](Try<std::tuple<Try<T>, Try<void>>>&& tup) {
+ Try<T>& t = std::get<0>(tup.value());
+ return makeFuture<T>(std::move(t));
+ });
+}
+
}}
// I haven't included a Future<T&> specialization because I don't forsee us
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/wangle/futures/Future.h>
+#include <folly/wangle/futures/detail/ThreadWheelTimekeeper.h>
+#include <folly/Likely.h>
+
+namespace folly { namespace wangle { namespace futures {
+
+Future<void> sleep(Duration dur, Timekeeper* tk) {
+ if (LIKELY(!tk)) {
+ tk = detail::getTimekeeperSingleton();
+ }
+ return tk->after(dur);
+}
+
+}}}
#include <folly/MoveWrapper.h>
#include <folly/wangle/futures/Promise.h>
#include <folly/wangle/futures/Try.h>
+#include <folly/wangle/futures/WangleException.h>
+#include <folly/wangle/futures/detail/Types.h>
namespace folly { namespace wangle {
+template <class> struct Promise;
+
namespace detail {
template <class> struct Core;
typedef typename ArgType<Args...>::FirstArg FirstArg;
};
+
} // detail
-template <class> struct Promise;
+struct Timekeeper;
template <typename T> struct isFuture;
+/// This namespace is for utility functions that would usually be static
+/// members of Future, except they don't make sense there because they don't
+/// depend on the template type (rather, on the type of their arguments in
+/// some cases). This is the least-bad naming scheme we could think of. Some
+/// of the functions herein have really-likely-to-collide names, like "map"
+/// and "sleep".
+namespace futures {
+ /// Returns a Future that will complete after the specified duration. The
+ /// Duration typedef of a `std::chrono` duration type indicates the
+ /// resolution you can expect to be meaningful (milliseconds at the time of
+ /// writing). Normally you wouldn't need to specify a Timekeeper, we will
+ /// use the global wangle timekeeper (we run a thread whose job it is to
+ /// keep time for wangle timeouts) but we provide the option for power
+ /// users.
+ ///
+ /// The Timekeeper thread will be lazily created the first time it is
+ /// needed. If your program never uses any timeouts or other time-based
+ /// Futures you will pay no Timekeeper thread overhead.
+ Future<void> sleep(Duration, Timekeeper* = nullptr);
+}
+
template <class T>
class Future {
public:
/** A reference to the Try of the value */
Try<T>& getTry();
+ /// Block until the future is fulfilled. Returns the value (moved out), or
+ /// throws the exception. The future must not already have a callback.
+ T get();
+
+ /// Block until the future is fulfilled, or until timed out. Returns the
+ /// value (moved out), or throws the exception (which might be a TimedOut
+ /// exception).
+ T get(Duration dur);
+
/** When this Future has completed, execute func which is a function that
takes a Try<T>&&. A Future for the return type of func is
returned. e.g.
raise(FutureCancellation());
}
+ /// Delay the completion of this Future for at least this duration from
+ /// now. The optional Timekeeper is as with futures::sleep().
+ Future<T> delayed(Duration, Timekeeper* = nullptr);
+
private:
typedef detail::Core<T>* corePtr;
*
* Note: each call to this starts a (short-lived) thread and allocates memory.
*/
-template <typename T, class Duration>
-Future<T> waitWithSemaphore(Future<T>&& f, Duration timeout);
+template <typename T, class Dur>
+Future<T> waitWithSemaphore(Future<T>&& f, Dur timeout);
}} // folly::wangle
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/futures/detail/Types.h>
+
+namespace folly { namespace wangle {
+
+template <class> struct Future;
+
+/// A Timekeeper handles the details of keeping time and fulfilling delay
+/// promises. The returned Future<void> will either complete after the
+/// elapsed time, or in the event of some kind of exceptional error may hold
+/// an exception. These Futures respond to cancellation. If you use a lot of
+/// Delays and many of them ultimately are unneeded (as would be the case for
+/// Delays that are used to trigger timeouts of async operations), then you
+/// can and should cancel them to reclaim resources.
+///
+/// Users will typically get one of these via Future::sleep(Duration) or
+/// use them implicitly behind the scenes by passing a timeout to some Future
+/// operation.
+///
+/// Although we don't formally alias Delay = Future<void>,
+/// that's an appropriate term for it. People will probably also call these
+/// Timeouts, and that's ok I guess, but that term is so overloaded I thought
+/// it made sense to introduce a cleaner term.
+///
+/// Remember that Duration is a std::chrono duration (millisecond resolution
+/// at the time of writing).
+class Timekeeper {
+ public:
+ virtual ~Timekeeper() = default;
+
+ /// Returns a future that will complete after the given duration with the
+ /// elapsed time. Exceptional errors can happen but they must be
+ /// exceptional. Use the steady (monotonic) clock.
+ ///
+ /// You may cancel this Future to reclaim resources.
+ ///
+ /// This future probably completes on the timer thread. You should almost
+ /// certainly follow it with a via() call or the accuracy of other timers
+ /// will suffer.
+ virtual Future<void> after(Duration) = 0;
+
+ /// Returns a future that will complete at the requested time.
+ ///
+ /// You may cancel this Future to reclaim resources.
+ ///
+ /// NB This is sugar for `after(when - now)`, so while you are welcome to
+ /// use a std::chrono::system_clock::time_point it will not track changes to
+ /// the system clock but rather execute that many milliseconds in the future
+ /// according to the steady clock.
+ template <class Clock>
+ Future<void> at(std::chrono::time_point<Clock> when);
+};
+
+}}
+
+// now get those definitions
+#include <folly/wangle/futures/Future.h>
+
+// finally we can use Future
+namespace folly { namespace wangle {
+
+ template <class Clock>
+ Future<void> Timekeeper::at(std::chrono::time_point<Clock> when) {
+ auto now = Clock::now();
+
+ if (when <= now) {
+ return makeFuture();
+ }
+
+ return after(when - now);
+ }
+
+}}
FutureCancellation() : WangleException("Future was cancelled") {}
};
+class TimedOut : public WangleException {
+ public:
+ TimedOut() : WangleException("Timed out") {}
+};
+
}}
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// fbbuild is too dumb to know that .h files in the directory affect
-// our project, unless we have a .cpp file in the target, in the same
-// directory.
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ThreadWheelTimekeeper.h"
+
+#include <folly/experimental/Singleton.h>
+#include <folly/wangle/futures/Future.h>
+#include <future>
+
+namespace folly { namespace wangle { namespace detail {
+
+namespace {
+ Singleton<ThreadWheelTimekeeper> timekeeperSingleton_;
+
+ // Our Callback object for HHWheelTimer
+ struct WTCallback : public folly::HHWheelTimer::Callback {
+ // Only allow creation by this factory, to ensure heap allocation.
+ static WTCallback* create() {
+ // optimization opportunity: memory pool
+ return new WTCallback();
+ }
+
+ Future<void> getFuture() {
+ return promise_.getFuture();
+ }
+
+ protected:
+ Promise<void> promise_;
+
+ explicit WTCallback() {
+ promise_.setInterruptHandler(
+ std::bind(&WTCallback::interruptHandler, this));
+ }
+
+ void timeoutExpired() noexcept override {
+ promise_.setValue();
+ delete this;
+ }
+
+ void interruptHandler() {
+ cancelTimeout();
+ delete this;
+ }
+ };
+
+} // namespace
+
+
+ThreadWheelTimekeeper::ThreadWheelTimekeeper() :
+ thread_([this]{ eventBase_.loopForever(); }),
+ wheelTimer_(new HHWheelTimer(&eventBase_, std::chrono::milliseconds(1)))
+{
+ eventBase_.waitUntilRunning();
+ eventBase_.runInEventBaseThread([this]{
+ // 15 characters max
+ eventBase_.setName("FutureTimekeepr");
+ });
+}
+
+ThreadWheelTimekeeper::~ThreadWheelTimekeeper() {
+ eventBase_.runInEventBaseThread([this]{
+ wheelTimer_->cancelAll();
+ });
+ eventBase_.terminateLoopSoon();
+ thread_.join();
+}
+
+Future<void> ThreadWheelTimekeeper::after(Duration dur) {
+ auto cob = WTCallback::create();
+ auto f = cob->getFuture();
+ eventBase_.runInEventBaseThread([=]{
+ wheelTimer_->scheduleTimeout(cob, dur);
+ });
+ return f;
+}
+
+Timekeeper* getTimekeeperSingleton() {
+ return timekeeperSingleton_.get_fast();
+}
+
+}}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/futures/Future.h>
+#include <folly/wangle/futures/Timekeeper.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <thread>
+
+namespace folly { namespace wangle { namespace detail {
+
+/// The default Timekeeper implementation which uses a HHWheelTimer on an
+/// EventBase in a dedicated thread. Users needn't deal with this directly, it
+/// is used by default by Future methods that work with timeouts.
+class ThreadWheelTimekeeper : public Timekeeper {
+ public:
+ /// But it doesn't *have* to be a singleton.
+ ThreadWheelTimekeeper();
+ ~ThreadWheelTimekeeper();
+
+ /// Implement the Timekeeper interface
+ /// This future *does* complete on the timer thread. You should almost
+ /// certainly follow it with a via() call or the accuracy of other timers
+ /// will suffer.
+ Future<void> after(Duration) override;
+
+ protected:
+ folly::EventBase eventBase_;
+ std::thread thread_;
+ HHWheelTimer::UniquePtr wheelTimer_;
+};
+
+Timekeeper* getTimekeeperSingleton();
+
+}}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+
+namespace folly { namespace wangle {
+
+using Duration = std::chrono::milliseconds;
+
+}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <folly/wangle/futures/Timekeeper.h>
+#include <unistd.h>
+
+using namespace folly::wangle;
+using namespace std::chrono;
+using folly::wangle::Timekeeper;
+using Duration = folly::wangle::Duration;
+
+std::chrono::milliseconds const one_ms(1);
+std::chrono::milliseconds const awhile(10);
+
+std::chrono::steady_clock::time_point now() {
+ return std::chrono::steady_clock::now();
+}
+
+struct TimekeeperFixture : public testing::Test {
+ TimekeeperFixture() :
+ timeLord_(folly::wangle::detail::getTimekeeperSingleton())
+ {}
+
+ Timekeeper* timeLord_;
+};
+
+TEST_F(TimekeeperFixture, after) {
+ Duration waited(0);
+
+ auto t1 = now();
+ auto f = timeLord_->after(awhile);
+ EXPECT_FALSE(f.isReady());
+ f.get();
+ auto t2 = now();
+
+ EXPECT_GE(t2 - t1, awhile);
+}
+
+TEST(Timekeeper, futureGet) {
+ Promise<int> p;
+ std::thread([&]{ p.setValue(42); }).detach();
+ EXPECT_EQ(42, p.getFuture().get());
+}
+
+TEST(Timekeeper, futureGetBeforeTimeout) {
+ Promise<int> p;
+ std::thread([&]{ p.setValue(42); }).detach();
+ // Technically this is a race and if the test server is REALLY overloaded
+ // and it takes more than a second to do that thread it could be flaky. But
+ // I want a low timeout (in human terms) so if this regresses and someone
+ // runs it by hand they're not sitting there forever wondering why it's
+ // blocked, and get a useful error message instead. If it does get flaky,
+ // empirically increase the timeout to the point where it's very improbable.
+ EXPECT_EQ(42, p.getFuture().get(seconds(2)));
+}
+
+TEST(Timekeeper, futureGetTimeout) {
+ Promise<int> p;
+ EXPECT_THROW(p.getFuture().get(Duration(1)), folly::wangle::TimedOut);
+}
+
+TEST(Timekeeper, futureSleep) {
+ auto t1 = now();
+ futures::sleep(one_ms).get();
+ EXPECT_GE(now() - t1, one_ms);
+}
+
+TEST(Timekeeper, futureDelayed) {
+ auto t1 = now();
+ auto dur = makeFuture()
+ .delayed(one_ms)
+ .then([=]{ return now() - t1; })
+ .get();
+
+ EXPECT_GE(dur, one_ms);
+}