experimental/fibers/WhenN.h \
experimental/fibers/WhenN-inl.h \
experimental/FunctionScheduler.h \
+ experimental/FutureDAG.h \
experimental/io/FsUtil.h \
experimental/JSONSchema.h \
experimental/Select64.h \
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/futures/SharedPromise.h>
+
+namespace folly {
+
+class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
+ public:
+ static std::shared_ptr<FutureDAG> create() {
+ return std::shared_ptr<FutureDAG>(new FutureDAG());
+ }
+
+ typedef size_t Handle;
+ typedef std::function<Future<void>()> FutureFunc;
+
+ Handle add(FutureFunc func, Executor* executor = nullptr) {
+ nodes.emplace_back(std::move(func), executor);
+ return nodes.size() - 1;
+ }
+
+ void dependency(Handle a, Handle b) {
+ nodes[b].dependencies.push_back(a);
+ nodes[a].hasDependents = true;
+ }
+
+ Future<void> go() {
+ if (hasCycle()) {
+ return makeFuture<void>(std::runtime_error("Cycle in FutureDAG graph"));
+ }
+ std::vector<Handle> rootNodes;
+ std::vector<Handle> leafNodes;
+ for (Handle handle = 0; handle < nodes.size(); handle++) {
+ if (nodes[handle].dependencies.empty()) {
+ rootNodes.push_back(handle);
+ }
+ if (!nodes[handle].hasDependents) {
+ leafNodes.push_back(handle);
+ }
+ }
+
+ auto sinkHandle = add([] { return Future<void>(); });
+ for (auto handle : leafNodes) {
+ dependency(handle, sinkHandle);
+ }
+
+ auto sourceHandle = add(nullptr);
+ for (auto handle : rootNodes) {
+ dependency(sourceHandle, handle);
+ }
+
+ for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
+ std::vector<Future<void>> dependencies;
+ for (auto depHandle : nodes[handle].dependencies) {
+ dependencies.push_back(nodes[depHandle].promise.getFuture());
+ }
+
+ collect(dependencies)
+ .via(nodes[handle].executor)
+ .then([this, handle] {
+ nodes[handle].func()
+ .then([this, handle] (Try<void>&& t) {
+ nodes[handle].promise.setTry(std::move(t));
+ });
+ })
+ .onError([this, handle] (exception_wrapper ew) {
+ nodes[handle].promise.setException(std::move(ew));
+ });
+ }
+
+ nodes[sourceHandle].promise.setValue();
+ auto that = shared_from_this();
+ return nodes[sinkHandle].promise.getFuture().ensure([that]{});
+ }
+
+ private:
+ FutureDAG() = default;
+
+ bool hasCycle() {
+ // Perform a modified topological sort to detect cycles
+ std::vector<std::vector<Handle>> dependencies;
+ for (auto& node : nodes) {
+ dependencies.push_back(node.dependencies);
+ }
+
+ std::vector<size_t> dependents(nodes.size());
+ for (auto& dependencyEdges : dependencies) {
+ for (auto handle : dependencyEdges) {
+ dependents[handle]++;
+ }
+ }
+
+ std::vector<Handle> handles;
+ for (Handle handle = 0; handle < nodes.size(); handle++) {
+ if (!nodes[handle].hasDependents) {
+ handles.push_back(handle);
+ }
+ }
+
+ while (!handles.empty()) {
+ auto handle = handles.back();
+ handles.pop_back();
+ while (!dependencies[handle].empty()) {
+ auto dependency = dependencies[handle].back();
+ dependencies[handle].pop_back();
+ if (--dependents[dependency] == 0) {
+ handles.push_back(dependency);
+ }
+ }
+ }
+
+ for (auto& dependencyEdges : dependencies) {
+ if (!dependencyEdges.empty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ struct Node {
+ Node(FutureFunc&& funcArg, Executor* executorArg) :
+ func(std::move(funcArg)), executor(executorArg) {}
+
+ FutureFunc func{nullptr};
+ Executor* executor{nullptr};
+ SharedPromise<void> promise;
+ std::vector<Handle> dependencies;
+ bool hasDependents{false};
+ bool visited{false};
+ };
+
+ std::vector<Node> nodes;
+};
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/FutureDAG.h>
+#include <gtest/gtest.h>
+#include <boost/thread/barrier.hpp>
+
+using namespace folly;
+
+struct FutureDAGTest : public testing::Test {
+ typedef FutureDAG::Handle Handle;
+
+ Handle add() {
+ auto node = folly::make_unique<TestNode>(this);
+ auto handle = node->handle;
+ nodes.emplace(handle, std::move(node));
+ return handle;
+ }
+
+ void dependency(Handle a, Handle b) {
+ nodes.at(b)->dependencies.push_back(a);
+ dag->dependency(a, b);
+ }
+
+ void checkOrder() {
+ EXPECT_EQ(nodes.size(), order.size());
+ for (auto& kv : nodes) {
+ auto handle = kv.first;
+ auto& node = kv.second;
+ auto it = order.begin();
+ while (*it != handle) {
+ it++;
+ }
+ for (auto dep : node->dependencies) {
+ EXPECT_TRUE(std::find(it, order.end(), dep) == order.end());
+ }
+ }
+ }
+
+ struct TestNode {
+ explicit TestNode(FutureDAGTest* test) {
+ func = [this, test] {
+ test->order.push_back(handle);
+ return Future<void>();
+ };
+ handle = test->dag->add(func);
+ }
+
+ FutureDAG::FutureFunc func;
+ Handle handle;
+ std::vector<Handle> dependencies;
+ };
+
+ std::shared_ptr<FutureDAG> dag = FutureDAG::create();
+ std::map<Handle, std::unique_ptr<TestNode>> nodes;
+ std::vector<Handle> order;
+};
+
+
+TEST_F(FutureDAGTest, SingleNode) {
+ add();
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
+TEST_F(FutureDAGTest, FanOut) {
+ auto h1 = add();
+ auto h2 = add();
+ auto h3 = add();
+ dependency(h1, h2);
+ dependency(h1, h3);
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
+TEST_F(FutureDAGTest, FanIn) {
+ auto h1 = add();
+ auto h2 = add();
+ auto h3 = add();
+ dependency(h1, h3);
+ dependency(h2, h3);
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
+TEST_F(FutureDAGTest, FanOutFanIn) {
+ auto h1 = add();
+ auto h2 = add();
+ auto h3 = add();
+ auto h4 = add();
+ dependency(h1, h3);
+ dependency(h1, h2);
+ dependency(h2, h4);
+ dependency(h3, h4);
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
+TEST_F(FutureDAGTest, Complex) {
+ auto A = add();
+ auto B = add();
+ auto C = add();
+ auto D = add();
+ auto E = add();
+ auto F = add();
+ auto G = add();
+ auto H = add();
+ auto I = add();
+ auto J = add();
+ auto K = add();
+ auto L = add();
+ auto M = add();
+ auto N = add();
+
+ dependency(A, B);
+ dependency(A, C);
+ dependency(A, D);
+ dependency(A, J);
+ dependency(C, H);
+ dependency(D, E);
+ dependency(E, F);
+ dependency(E, G);
+ dependency(F, H);
+ dependency(G, H);
+ dependency(H, I);
+ dependency(J, K);
+ dependency(K, L);
+ dependency(K, M);
+ dependency(L, N);
+ dependency(I, N);
+
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
+FutureDAG::FutureFunc makeFutureFunc = []{
+ return makeFuture();
+};
+
+FutureDAG::FutureFunc throwFunc = []{
+ return makeFuture<void>(std::runtime_error("oops"));
+};
+
+TEST_F(FutureDAGTest, ThrowBegin) {
+ auto h1 = dag->add(throwFunc);
+ auto h2 = dag->add(makeFutureFunc);
+ dag->dependency(h1, h2);
+ EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, ThrowEnd) {
+ auto h1 = dag->add(makeFutureFunc);
+ auto h2 = dag->add(throwFunc);
+ dag->dependency(h1, h2);
+ EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, Cycle1) {
+ auto h1 = add();
+ dependency(h1, h1);
+ EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, Cycle2) {
+ auto h1 = add();
+ auto h2 = add();
+ dependency(h1, h2);
+ dependency(h2, h1);
+ EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, Cycle3) {
+ auto h1 = add();
+ auto h2 = add();
+ auto h3 = add();
+ dependency(h1, h2);
+ dependency(h2, h3);
+ dependency(h3, h1);
+ EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, DestroyBeforeComplete) {
+ auto barrier = std::make_shared<boost::barrier>(2);
+ Future<void> f;
+ {
+ auto dag = FutureDAG::create();
+ auto h1 = dag->add([barrier] {
+ auto p = std::make_shared<Promise<void>>();
+ std::thread t([p, barrier]{
+ barrier->wait();
+ p->setValue();
+ });
+ t.detach();
+ return p->getFuture();
+ });
+ auto h2 = dag->add(makeFutureFunc);
+ dag->dependency(h1, h2);
+ f = dag->go();
+ }
+ barrier->wait();
+ ASSERT_NO_THROW(f.get());
+}