private:
void maybeCallback() {
assert(ready());
- if (!calledBack_ && isActive() && callback_) {
- // TODO(5306911) we should probably try/catch
- calledBack_ = true;
- Executor* x = executor_;
-
- RequestContext::setContext(context_);
- if (x) {
- MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
- MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
- x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
- } else {
- callback_(std::move(*result_));
+ if (isActive() && callback_) {
+ if (!calledBack_.exchange(true)) {
+ // TODO(5306911) we should probably try/catch
+ Executor* x = executor_;
+
+ RequestContext::setContext(context_);
+ if (x) {
+ MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
+ MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+ x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
+ } else {
+ callback_(std::move(*result_));
+ }
}
}
}
#include <folly/wangle/Executor.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/ManualExecutor.h>
+#include <folly/MPMCQueue.h>
#include <folly/io/async/Request.h>
#define EXPECT_TYPE(x, T) \
EXPECT_TRUE((std::is_same<decltype(x), T>::value))
+/// Simple executor that does work in another thread
+class ThreadExecutor : public Executor {
+ folly::MPMCQueue<Func> funcs;
+ std::atomic<bool> done {false};
+ std::thread worker;
+ folly::Baton<> baton;
+
+ void work() {
+ baton.post();
+ Func fn;
+ while (!done) {
+ while (!funcs.isEmpty()) {
+ funcs.blockingRead(fn);
+ fn();
+ }
+ }
+ }
+
+ public:
+ ThreadExecutor(size_t n = 1024)
+ : funcs(n), worker(std::bind(&ThreadExecutor::work, this)) {}
+
+ ~ThreadExecutor() {
+ done = true;
+ funcs.write([]{});
+ worker.join();
+ }
+
+ void add(Func fn) override {
+ funcs.blockingWrite(std::move(fn));
+ }
+
+ void waitForStartup() {
+ baton.wait();
+ }
+};
+
typedef WangleException eggs_t;
static eggs_t eggs("eggs");
// Fulfil the promise
p.setValue();
}
+
+
+// This only fails about 1 in 1k times when the bug is present :(
+TEST(Future, t5506504) {
+ ThreadExecutor x;
+
+ auto fn = [&x]{
+ auto promises = std::make_shared<vector<Promise<void>>>(4);
+ vector<Future<void>> futures;
+
+ for (auto& p : *promises) {
+ futures.emplace_back(
+ p.getFuture()
+ .via(&x)
+ .then([](Try<void>&&){}));
+ }
+
+ x.waitForStartup();
+ x.add([promises]{
+ for (auto& p : *promises) p.setValue();
+ });
+
+ return whenAll(futures.begin(), futures.end());
+ };
+
+ waitWithSemaphore(fn());
+}