return nodes.size() - 1;
}
+ void remove(Handle a) {
+ if (nodes.size() > a && nodes[a].hasDependents) {
+ for (auto& node : nodes) {
+ auto& deps = node.dependencies;
+ deps.erase(
+ std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
+ for (Handle& handle : deps) {
+ if (handle > a) {
+ handle--;
+ }
+ }
+ }
+ }
+ nodes.erase(nodes.begin() + a);
+ }
+
+ void reset() {
+ // Delete all but source node, and reset dependency properties
+ Handle source_node;
+ std::unordered_set<Handle> memo;
+ for (auto& node : nodes) {
+ for (Handle handle : node.dependencies) {
+ memo.insert(handle);
+ }
+ }
+ for (Handle handle = 0; handle < nodes.size(); handle++) {
+ if (memo.find(handle) == memo.end()) {
+ source_node = handle;
+ }
+ }
+
+ // Faster to just create a new vector with the element in it?
+ nodes.erase(nodes.begin(), nodes.begin() + source_node);
+ nodes.erase(nodes.begin() + 1, nodes.end());
+ nodes[0].hasDependents = false;
+ nodes[0].dependencies.clear();
+ }
+
void dependency(Handle a, Handle b) {
nodes[b].dependencies.push_back(a);
nodes[a].hasDependents = true;
}
+ void clean_state(Handle source, Handle sink) {
+ for (auto handle : nodes[sink].dependencies) {
+ nodes[handle].hasDependents = false;
+ }
+ nodes[0].hasDependents = false;
+ remove(source);
+ remove(sink);
+ }
+
Future<Unit> go() {
if (hasCycle()) {
return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
}
collect(dependencies)
- .via(nodes[handle].executor)
- .then([this, handle] {
- nodes[handle].func()
- .then([this, handle] (Try<Unit>&& t) {
+ .via(nodes[handle].executor)
+ .then([this, handle] {
+ nodes[handle].func().then([this, handle](Try<Unit>&& t) {
nodes[handle].promise.setTry(std::move(t));
});
- })
- .onError([this, handle] (exception_wrapper ew) {
- nodes[handle].promise.setException(std::move(ew));
- });
+ })
+ .onError([this, handle](exception_wrapper ew) {
+ nodes[handle].promise.setException(std::move(ew));
+ });
}
nodes[sourceHandle].promise.setValue();
auto that = shared_from_this();
- return nodes[sinkHandle].promise.getFuture().ensure([that]{});
+ return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then(
+ [this, sourceHandle, sinkHandle]() {
+ clean_state(sourceHandle, sinkHandle);
+ });
}
private:
}
struct Node {
- Node(FutureFunc&& funcArg, Executor* executorArg) :
- func(std::move(funcArg)), executor(executorArg) {}
+ Node(FutureFunc&& funcArg, Executor* executorArg)
+ : func(std::move(funcArg)), executor(executorArg) {}
FutureFunc func{nullptr};
Executor* executor{nullptr};
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include <boost/thread/barrier.hpp>
#include <folly/experimental/FutureDAG.h>
#include <gtest/gtest.h>
-#include <boost/thread/barrier.hpp>
using namespace folly;
return handle;
}
+ void reset() {
+ Handle source_node;
+ std::unordered_set<Handle> memo;
+ for (auto& node : nodes) {
+ for (Handle handle : node.second->dependencies) {
+ memo.insert(handle);
+ }
+ }
+ for (auto& node : nodes) {
+ if (memo.find(node.first) == memo.end()) {
+ source_node = node.first;
+ }
+ }
+ for (auto it = nodes.cbegin(); it != nodes.cend();) {
+ if (it->first != source_node) {
+ it = nodes.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ dag->reset();
+ }
+
+ void remove(Handle a) {
+ for (auto itr = nodes.begin(); itr != nodes.end(); itr++) {
+ auto& deps = itr->second->dependencies;
+ if (std::find(deps.begin(), deps.end(), a) != deps.end()) {
+ deps.erase(deps.begin() + a);
+ }
+ }
+ nodes.erase(a);
+ dag->remove(a);
+ }
void dependency(Handle a, Handle b) {
nodes.at(b)->dependencies.push_back(a);
dag->dependency(a, b);
std::vector<Handle> order;
};
-
TEST_F(FutureDAGTest, SingleNode) {
add();
ASSERT_NO_THROW(dag->go().get());
checkOrder();
}
+TEST_F(FutureDAGTest, RemoveSingleNode) {
+ auto h1 = add();
+ auto h2 = add();
+ remove(h2);
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
+TEST_F(FutureDAGTest, RemoveNodeComplex) {
+ auto h1 = add();
+ auto h2 = add();
+ auto h3 = add();
+ dependency(h1, h3);
+ dependency(h2, h1);
+ remove(h1);
+ remove(h2);
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
+TEST_F(FutureDAGTest, ResetDAG) {
+ auto h1 = add();
+ auto h2 = add();
+ auto h3 = add();
+ dependency(h3, h1);
+ dependency(h2, h3);
+
+ reset();
+ ASSERT_NO_THROW(dag->go().get());
+ checkOrder();
+}
+
TEST_F(FutureDAGTest, FanOut) {
auto h1 = add();
auto h2 = add();
checkOrder();
}
-FutureDAG::FutureFunc makeFutureFunc = []{
- return makeFuture();
-};
+FutureDAG::FutureFunc makeFutureFunc = [] { return makeFuture(); };
-FutureDAG::FutureFunc throwFunc = []{
+FutureDAG::FutureFunc throwFunc = [] {
return makeFuture<Unit>(std::runtime_error("oops"));
};
auto dag = FutureDAG::create();
auto h1 = dag->add([barrier] {
auto p = std::make_shared<Promise<Unit>>();
- std::thread t([p, barrier]{
+ std::thread t([p, barrier] {
barrier->wait();
p->setValue();
});