From: Shubhanshu Agrawal Date: Mon, 3 Oct 2016 19:13:32 +0000 (-0700) Subject: add BatchDispatcher X-Git-Tag: v2016.10.10.00~23 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=52736dced685724c4a72bd60b6c879b30313e597;p=folly.git add BatchDispatcher Summary: This diff adds BatchDispatcher, which can be used to batch values while performing IO. This would be useful in writing single-id code in node, and would be used to batch ids while doing IO at the storage adapter. Differential Revision: D3900404 fbshipit-source-id: f53aa352344ff55674c7544302b6a1b4726214b6 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 1d204e01..66dc0d2e 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -516,6 +516,7 @@ nobase_follyinclude_HEADERS += \ fibers/AddTasks-inl.h \ fibers/Baton.h \ fibers/Baton-inl.h \ + fibers/BatchDispatcher.h \ fibers/BoostContextCompatibility.h \ fibers/EventBaseLoopController.h \ fibers/EventBaseLoopController-inl.h \ diff --git a/folly/fibers/BatchDispatcher.h b/folly/fibers/BatchDispatcher.h new file mode 100644 index 00000000..671675dd --- /dev/null +++ b/folly/fibers/BatchDispatcher.h @@ -0,0 +1,122 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { +namespace fibers { + +/** + * BatchDispatcher is useful for batching values while doing I/O. 
+ * For example, if you are launching multiple tasks which take a + * single id and each task fetches from a database, you can use BatchDispatcher + * to batch those ids and do a single query requesting all those ids. + * + * To use this, create a BatchDispatcher with a dispatch function + * which consumes a vector of values and returns a vector of results + * in the same order. Add values to BatchDispatcher using the add function, + * which returns a future for the result produced by your dispatch function. + * + * Implementation Logic: + * - using FiberManager as an example executor, the user creates a + * thread_local BatchDispatcher, on which the user calls add(value). + * - add(value) adds the value to a vector and, if the vector was empty, + * also schedules a new task(BatchDispatchFunction) which will read the + * vector of values and call the user's DispatchFunction() on it. + * - assuming the executor queues all the tasks and runs them in order of their + * creation time, then BatchDispatchFunction will run later than all the tasks + * already created. Relying on this ordering, all the values that were added + * in these tasks would be picked up by BatchDispatchFunction() + * + * Example: + * - User schedules Task1, Task2, Task3, each of which calls + * BatchDispatcher.add() with id1, id2, id3 respectively. + * - Executor's state {Task1, Task2, Task3}, BatchDispatcher's state {} + * - After Task1 calls BatchDispatcher.add(): + * Executor's state {Task2, Task3, BatchDispatchFunction}, + * BatchDispatcher's state {id1} + * - After Task2 calls BatchDispatcher.add(): + * Executor's state {Task3, BatchDispatchFunction}, + * BatchDispatcher's state {id1, id2} + * - After Task3 calls BatchDispatcher.add(): + * Executor's state {BatchDispatchFunction}, + * BatchDispatcher's state {id1, id2, id3} + * - Now BatchDispatcher calls the user's Dispatch function with {id1, id2, id3} + * + * Note: + * - This only works with executors which run + * the tasks in order of their schedule time. + * - BatchDispatcher is not thread safe. 
+ */ +template +class BatchDispatcher { + public: + using ValueBatchT = std::vector; + using ResultBatchT = std::vector; + using PromiseBatchT = std::vector>; + using DispatchFunctionT = folly::Function; + + BatchDispatcher(ExecutorT& executor, DispatchFunctionT dispatchFunc) + : executor_(executor), + state_(new DispatchState(std::move(dispatchFunc))) {} + + Future add(ValueT&& value) { + if (state_->values.empty()) { + executor_.add([state = state_]() { dispatchFunctionWrapper(*state); }); + } + + folly::Promise resultPromise; + auto resultFuture = resultPromise.getFuture(); + + state_->values.emplace_back(std::move(value)); + state_->promises.emplace_back(std::move(resultPromise)); + + return resultFuture; + } + + private: + struct DispatchState { + explicit DispatchState(DispatchFunctionT&& dispatchFunction) + : dispatchFunc(std::move(dispatchFunction)) {} + + DispatchFunctionT dispatchFunc; + ValueBatchT values; + PromiseBatchT promises; + }; + + static void dispatchFunctionWrapper(DispatchState& state) { + ValueBatchT values; + PromiseBatchT promises; + state.values.swap(values); + state.promises.swap(promises); + + auto results = state.dispatchFunc(std::move(values)); + if (results.size() != promises.size()) { + throw std::logic_error( + "Unexpected number of results returned from dispatch function"); + } + for (size_t i = 0; i < promises.size(); i++) { + promises[i].setValue(std::move(results[i])); + } + } + + ExecutorT& executor_; + std::shared_ptr state_; +}; +} +} diff --git a/folly/fibers/test/FibersTest.cpp b/folly/fibers/test/FibersTest.cpp index e0c5424b..f5218dd2 100644 --- a/folly/fibers/test/FibersTest.cpp +++ b/folly/fibers/test/FibersTest.cpp @@ -20,7 +20,9 @@ #include #include +#include #include +#include #include #include #include @@ -1595,6 +1597,137 @@ TEST(FiberManager, semaphore) { EXPECT_GE(counterB, 0); } +template +void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) { + thread_local BatchDispatcher batchDispatcher( + 
executor, [=](std::vector&& batch) { + EXPECT_EQ(batchSize, batch.size()); + std::vector results; + for (auto& it : batch) { + results.push_back(folly::to(it)); + } + return results; + }); + + auto indexCopy = index; + auto result = batchDispatcher.add(std::move(indexCopy)); + EXPECT_EQ(folly::to(index), result.get()); +} + +TEST(FiberManager, batchDispatchTest) { + folly::EventBase evb; + auto& executor = getFiberManager(evb); + + // Launch multiple fibers with a single id. + executor.add([&]() { + int batchSize = 10; + for (int i = 0; i < batchSize; i++) { + executor.add( + [=, &executor]() { singleBatchDispatch(executor, batchSize, i); }); + } + }); + evb.loop(); + + // Reuse the same BatchDispatcher to batch once again. + executor.add([&]() { + int batchSize = 10; + for (int i = 0; i < batchSize; i++) { + executor.add( + [=, &executor]() { singleBatchDispatch(executor, batchSize, i); }); + } + }); + evb.loop(); +} + +template +folly::Future> doubleBatchInnerDispatch( + ExecutorT& executor, + int totalNumberOfElements, + std::vector input) { + thread_local BatchDispatcher< + std::vector, + std::vector, + ExecutorT> + batchDispatcher(executor, [=](std::vector>&& batch) { + std::vector> results; + int numberOfElements = 0; + for (auto& unit : batch) { + numberOfElements += unit.size(); + std::vector result; + for (auto& element : unit) { + result.push_back(folly::to(element)); + } + results.push_back(std::move(result)); + } + EXPECT_EQ(totalNumberOfElements, numberOfElements); + return results; + }); + + return batchDispatcher.add(std::move(input)); +} + +/** + * Batch values in groups of 5, and then call inner dispatch. 
+ */ +template +void doubleBatchOuterDispatch( + ExecutorT& executor, + int totalNumberOfElements, + int index) { + thread_local BatchDispatcher + batchDispatcher(executor, [=, &executor](std::vector&& batch) { + EXPECT_EQ(totalNumberOfElements, batch.size()); + std::vector results; + std::vector>> + innerDispatchResultFutures; + + std::vector group; + for (auto unit : batch) { + group.push_back(unit); + if (group.size() == 5) { + auto localGroup = group; + group.clear(); + + innerDispatchResultFutures.push_back(doubleBatchInnerDispatch( + executor, totalNumberOfElements, localGroup)); + } + } + + folly::collectAll( + innerDispatchResultFutures.begin(), innerDispatchResultFutures.end()) + .then([&]( + std::vector>> innerDispatchResults) { + for (auto& unit : innerDispatchResults) { + for (auto& element : unit.value()) { + results.push_back(element); + } + } + }) + .get(); + return results; + }); + + auto indexCopy = index; + auto result = batchDispatcher.add(std::move(indexCopy)); + EXPECT_EQ(folly::to(index), result.get()); +} + +TEST(FiberManager, doubleBatchDispatchTest) { + folly::EventBase evb; + auto& executor = getFiberManager(evb); + + // Launch multiple fibers with a single id. + executor.add([&]() { + int totalNumberOfElements = 20; + for (int i = 0; i < totalNumberOfElements; i++) { + executor.add([=, &executor]() { + doubleBatchOuterDispatch(executor, totalNumberOfElements, i); + }); + } + }); + evb.loop(); +} + /** * Test that we can properly track fiber stack usage. *