2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/futures/Future.h>
19 #include <folly/futures/SharedPromise.h>
23 class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
25 static std::shared_ptr<FutureDAG> create() {
26 return std::shared_ptr<FutureDAG>(new FutureDAG());
29 typedef size_t Handle;
30 typedef std::function<Future<Unit>()> FutureFunc;
32 Handle add(FutureFunc func, Executor* executor = nullptr) {
33 nodes.emplace_back(std::move(func), executor);
34 return nodes.size() - 1;
37 void dependency(Handle a, Handle b) {
38 nodes[b].dependencies.push_back(a);
39 nodes[a].hasDependents = true;
44 return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
46 std::vector<Handle> rootNodes;
47 std::vector<Handle> leafNodes;
48 for (Handle handle = 0; handle < nodes.size(); handle++) {
49 if (nodes[handle].dependencies.empty()) {
50 rootNodes.push_back(handle);
52 if (!nodes[handle].hasDependents) {
53 leafNodes.push_back(handle);
57 auto sinkHandle = add([] { return Future<Unit>(); });
58 for (auto handle : leafNodes) {
59 dependency(handle, sinkHandle);
62 auto sourceHandle = add(nullptr);
63 for (auto handle : rootNodes) {
64 dependency(sourceHandle, handle);
67 for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
68 std::vector<Future<Unit>> dependencies;
69 for (auto depHandle : nodes[handle].dependencies) {
70 dependencies.push_back(nodes[depHandle].promise.getFuture());
74 .via(nodes[handle].executor)
75 .then([this, handle] {
77 .then([this, handle] (Try<Unit>&& t) {
78 nodes[handle].promise.setTry(std::move(t));
81 .onError([this, handle] (exception_wrapper ew) {
82 nodes[handle].promise.setException(std::move(ew));
86 nodes[sourceHandle].promise.setValue();
87 auto that = shared_from_this();
88 return nodes[sinkHandle].promise.getFuture().ensure([that]{});
92 FutureDAG() = default;
95 // Perform a modified topological sort to detect cycles
96 std::vector<std::vector<Handle>> dependencies;
97 for (auto& node : nodes) {
98 dependencies.push_back(node.dependencies);
101 std::vector<size_t> dependents(nodes.size());
102 for (auto& dependencyEdges : dependencies) {
103 for (auto handle : dependencyEdges) {
104 dependents[handle]++;
108 std::vector<Handle> handles;
109 for (Handle handle = 0; handle < nodes.size(); handle++) {
110 if (!nodes[handle].hasDependents) {
111 handles.push_back(handle);
115 while (!handles.empty()) {
116 auto handle = handles.back();
118 while (!dependencies[handle].empty()) {
119 auto dependency = dependencies[handle].back();
120 dependencies[handle].pop_back();
121 if (--dependents[dependency] == 0) {
122 handles.push_back(dependency);
127 for (auto& dependencyEdges : dependencies) {
128 if (!dependencyEdges.empty()) {
137 Node(FutureFunc&& funcArg, Executor* executorArg) :
138 func(std::move(funcArg)), executor(executorArg) {}
140 FutureFunc func{nullptr};
141 Executor* executor{nullptr};
142 SharedPromise<Unit> promise;
143 std::vector<Handle> dependencies;
144 bool hasDependents{false};
148 std::vector<Node> nodes;