From: Andrii Grynenko Date: Thu, 5 May 2016 01:15:53 +0000 (-0700) Subject: Allow adding tasks to TaskIterator dynamically X-Git-Tag: 2016.07.26~275 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=93db3df45dd290f1024b9a4e556f65c549e0c43e;p=folly.git Allow adding tasks to TaskIterator dynamically Reviewed By: yfeldblum Differential Revision: D3244669 fb-gh-sync-id: 73fa4ecb0432a802e67ef922255a896d96f32374 fbshipit-source-id: 73fa4ecb0432a802e67ef922255a896d96f32374 --- diff --git a/folly/experimental/fibers/AddTasks-inl.h b/folly/experimental/fibers/AddTasks-inl.h index 6a5c1da4..f0a712e9 100644 --- a/folly/experimental/fibers/AddTasks-inl.h +++ b/folly/experimental/fibers/AddTasks-inl.h @@ -16,20 +16,12 @@ #include #include -#include - namespace folly { namespace fibers { template TaskIterator::TaskIterator(TaskIterator&& other) noexcept - : context_(std::move(other.context_)), id_(other.id_) {} - -template -TaskIterator::TaskIterator(std::shared_ptr context) - : context_(std::move(context)), id_(-1) { - assert(context_); -} + : context_(std::move(other.context_)), id_(other.id_), fm_(other.fm_) {} template inline bool TaskIterator::hasCompleted() const { @@ -92,6 +84,30 @@ inline size_t TaskIterator::getTaskID() const { return id_; } +template +template +void TaskIterator::addTask(F&& func) { + static_assert( + std::is_convertible::type, T>::value, + "TaskIterator: T must be convertible from func()'s return type"); + + auto taskId = context_->totalTasks++; + + fm_.addTask( + [ taskId, context = context_, func = std::forward(func) ]() mutable { + context->results.emplace_back( + taskId, folly::makeTryWith(std::move(func))); + + // Check for awaiting iterator. + if (context->promise.hasValue()) { + if (--context->tasksToFulfillPromise == 0) { + context->promise->setValue(); + context->promise.clear(); + } + } + }); +} + template TaskIterator::value_type()>::type> @@ -101,32 +117,15 @@ addTasks(InputIterator first, InputIterator last) { ResultType; typedef TaskIterator IteratorType; - auto context = std::make_shared(); - context->totalTasks = std::distance(first, last); - context->results.reserve(context->totalTasks); - - for (size_t i = 0; first != last; ++i, ++first) { -#ifdef __clang__ -#pragma clang diagnostic push // ignore generalized lambda capture warning -#pragma clang diagnostic ignored "-Wc++1y-extensions" -#endif - addTask([ i, context, f = std::move(*first) ]() { - context->results.emplace_back(i, folly::makeTryWith(std::move(f))); - - // Check for awaiting iterator. - if (context->promise.hasValue()) { - if (--context->tasksToFulfillPromise == 0) { - context->promise->setValue(); - context->promise.clear(); - } - } - }); -#ifdef __clang__ -#pragma clang diagnostic pop -#endif + IteratorType iterator; + + for (; first != last; ++first) { + iterator.addTask(std::move(*first)); } - return IteratorType(std::move(context)); + iterator.context_->results.reserve(iterator.context_->totalTasks); + + return std::move(iterator); } } } diff --git a/folly/experimental/fibers/AddTasks.h b/folly/experimental/fibers/AddTasks.h index 9e3019d3..be4c25b1 100644 --- a/folly/experimental/fibers/AddTasks.h +++ b/folly/experimental/fibers/AddTasks.h @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -49,6 +50,8 @@ class TaskIterator { public: typedef T value_type; + TaskIterator() : fm_(FiberManager::getFiberManager()) {} + // not copyable TaskIterator(const TaskIterator& other) = delete; TaskIterator& operator=(const TaskIterator& other) = delete; @@ -57,6 +60,14 @@ class TaskIterator { TaskIterator(TaskIterator&& other) noexcept; TaskIterator& operator=(TaskIterator&& other) = delete; + /** + * Add one more task to the TaskIterator. + * + * @param func task to be added, will be scheduled on current FiberManager + */ + template + void addTask(F&& func); + /** * @return True if there are tasks immediately available to be consumed (no * need to await on them). @@ -111,10 +122,9 @@ class TaskIterator { size_t tasksToFulfillPromise{0}; }; - std::shared_ptr context_; - size_t id_; - - explicit TaskIterator(std::shared_ptr context); + std::shared_ptr context_{std::make_shared()}; + size_t id_{std::numeric_limits::max()}; + FiberManager& fm_; folly::Try awaitNextResult(); }; diff --git a/folly/experimental/fibers/test/FibersTest.cpp b/folly/experimental/fibers/test/FibersTest.cpp index 08d182ff..c19e95a0 100644 --- a/folly/experimental/fibers/test/FibersTest.cpp +++ b/folly/experimental/fibers/test/FibersTest.cpp @@ -463,7 +463,7 @@ TEST(FiberManager, addTasksVoidThrow) { loopController.loop(std::move(loopFunc)); } -TEST(FiberManager, reserve) { +TEST(FiberManager, addTasksReserve) { std::vector> pendingFibers; bool taskAdded = false; @@ -517,6 +517,42 @@ TEST(FiberManager, reserve) { loopController.loop(std::move(loopFunc)); } +TEST(FiberManager, addTaskDynamic) { + folly::EventBase evb; + + Baton batons[3]; + + auto makeTask = [&](size_t taskId) { + return [&, taskId]() -> size_t { + batons[taskId].wait(); + return taskId; + }; + }; + + getFiberManager(evb) + .addTaskFuture([&]() { + TaskIterator iterator; + + iterator.addTask(makeTask(0)); + iterator.addTask(makeTask(1)); + + batons[1].post(); + + EXPECT_EQ(1, iterator.awaitNext()); + + iterator.addTask(makeTask(2)); + + batons[2].post(); + + EXPECT_EQ(2, iterator.awaitNext()); + + batons[0].post(); + + EXPECT_EQ(0, iterator.awaitNext()); + }) + .waitVia(&evb); +} + TEST(FiberManager, forEach) { std::vector> pendingFibers; bool taskAdded = false;