add BatchDispatcher
authorShubhanshu Agrawal <shubhanshu@fb.com>
Mon, 3 Oct 2016 19:13:32 +0000 (12:13 -0700)
committerFacebook Github Bot <facebook-github-bot-bot@fb.com>
Mon, 3 Oct 2016 19:23:49 +0000 (12:23 -0700)
Summary:
This diff adds BatchDispatcher, which can be used to batch values while performing IO.
This would be useful in writing single id code in node, and would be use to batch ids while doing IO at storage adapter.

Differential Revision: D3900404

fbshipit-source-id: f53aa352344ff55674c7544302b6a1b4726214b6

folly/Makefile.am
folly/fibers/BatchDispatcher.h [new file with mode: 0644]
folly/fibers/test/FibersTest.cpp

index 1d204e01476505c67a14848da99abce574d4bac5..66dc0d2e5c31a927b08c548ffd6161d41fbef2b2 100644 (file)
@@ -516,6 +516,7 @@ nobase_follyinclude_HEADERS += \
        fibers/AddTasks-inl.h \
        fibers/Baton.h \
        fibers/Baton-inl.h \
+       fibers/BatchDispatcher.h \
        fibers/BoostContextCompatibility.h \
        fibers/EventBaseLoopController.h \
        fibers/EventBaseLoopController-inl.h \
diff --git a/folly/fibers/BatchDispatcher.h b/folly/fibers/BatchDispatcher.h
new file mode 100644 (file)
index 0000000..671675d
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+
+namespace folly {
+namespace fibers {
+
+/**
+ * BatchDispatcher is useful for batching values while doing I/O.
+ * For example, if you are launching multiple tasks which take a
+ * single id and each task fetches from database, you can use BatchDispatcher
+ * to batch those ids and do a single query requesting all those ids.
+ *
+ * To use this, create a BatchDispatcher with a dispatch function
+ * which consumes a vector of values and returns a vector of results
+ * in the same order. Add values to BatchDispatcher using add function,
+ * which returns a future to the result set in your dispatch function.
+ *
+ * Implementation Logic:
+ *  - using FiberManager as executor example, user creates a
+ *    thread_local BatchDispatcher, on which user calls add(value).
+ *  - add(value) adds the value in a vector and also schedules a new
+ *    task(BatchDispatchFunction) which will read the vector of values and call
+ *    user's DispatchFunction() on it.
+ *  - assuming the executor queues all the task and runs them in order of their
+ *    creation time, then BatchDispatcher will run later than all the tasks
+ *    already created. Depending on this, all the values were added in these
+ *    tasks would be picked up by BatchDispatchFunction()
+ *
+ * Example:
+ *  - User schedules Task1, Task2, Task3 each of them calls BatchDispatch.add()
+ *    with id1, id2, id3 respectively.
+ *  - Executor's state {Task1, Task2, Task3}, BatchDispatchers state {}
+ *  - After Task1 calls BatchDispatcher.add():
+ *    Executor's state {Task2, Task3, BatchDispatchFunction},
+ *    BatchDispatcher's state {id1}
+ *  - After Task2 calls BatchDispatcher.add():
+ *    Executor's state {Task3, BatchDispatchFunction},
+ *    BatchDispatcher's state {id1, id2}
+ *  - After Task3 calls BatchDispatcher.add():
+ *    Executor's state {BatchDispatchFunction},
+ *    BatchDispatcher's state {id1, id2, id3}
+ *  - Now BatchDispatcher calls user's Dispatch function with {id1, id2, id3}
+ *
+ * Note:
+ *  - This only works with executors which runs
+ *    the tasks in order of their schedule time.
+ *  - BatchDispatcher is not thread safe.
+ */
+template <typename ValueT, typename ResultT, typename ExecutorT>
+class BatchDispatcher {
+ public:
+  using ValueBatchT = std::vector<ValueT>;
+  using ResultBatchT = std::vector<ResultT>;
+  using PromiseBatchT = std::vector<folly::Promise<ResultT>>;
+  using DispatchFunctionT = folly::Function<ResultBatchT(ValueBatchT&&)>;
+
+  BatchDispatcher(ExecutorT& executor, DispatchFunctionT dispatchFunc)
+      : executor_(executor),
+        state_(new DispatchState(std::move(dispatchFunc))) {}
+
+  Future<ResultT> add(ValueT&& value) {
+    if (state_->values.empty()) {
+      executor_.add([state = state_]() { dispatchFunctionWrapper(*state); });
+    }
+
+    folly::Promise<ResultT> resultPromise;
+    auto resultFuture = resultPromise.getFuture();
+
+    state_->values.emplace_back(std::move(value));
+    state_->promises.emplace_back(std::move(resultPromise));
+
+    return resultFuture;
+  }
+
+ private:
+  struct DispatchState {
+    explicit DispatchState(DispatchFunctionT&& dispatchFunction)
+        : dispatchFunc(std::move(dispatchFunction)) {}
+
+    DispatchFunctionT dispatchFunc;
+    ValueBatchT values;
+    PromiseBatchT promises;
+  };
+
+  static void dispatchFunctionWrapper(DispatchState& state) {
+    ValueBatchT values;
+    PromiseBatchT promises;
+    state.values.swap(values);
+    state.promises.swap(promises);
+
+    auto results = state.dispatchFunc(std::move(values));
+    if (results.size() != promises.size()) {
+      throw std::logic_error(
+          "Unexpected number of results returned from dispatch function");
+    }
+    for (size_t i = 0; i < promises.size(); i++) {
+      promises[i].setValue(std::move(results[i]));
+    }
+  }
+
+  ExecutorT& executor_;
+  std::shared_ptr<DispatchState> state_;
+};
+}
+}
index e0c5424b337dd6376901bf9d3171a55c52df2964..f5218dd285dbf6a444c1265755dbd05ef4e447da 100644 (file)
@@ -20,7 +20,9 @@
 #include <folly/Memory.h>
 #include <folly/futures/Future.h>
 
+#include <folly/Conv.h>
 #include <folly/fibers/AddTasks.h>
+#include <folly/fibers/BatchDispatcher.h>
 #include <folly/fibers/EventBaseLoopController.h>
 #include <folly/fibers/FiberManager.h>
 #include <folly/fibers/FiberManagerMap.h>
@@ -1595,6 +1597,137 @@ TEST(FiberManager, semaphore) {
   EXPECT_GE(counterB, 0);
 }
 
+template <typename ExecutorT>
+void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+      executor, [=](std::vector<int>&& batch) {
+        EXPECT_EQ(batchSize, batch.size());
+        std::vector<std::string> results;
+        for (auto& it : batch) {
+          results.push_back(folly::to<std::string>(it));
+        }
+        return results;
+      });
+
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, batchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
+
+  // Reuse the same BatchDispatcher to batch once again.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
+}
+
+template <typename ExecutorT>
+folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    std::vector<int> input) {
+  thread_local BatchDispatcher<
+      std::vector<int>,
+      std::vector<std::string>,
+      ExecutorT>
+  batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+    std::vector<std::vector<std::string>> results;
+    int numberOfElements = 0;
+    for (auto& unit : batch) {
+      numberOfElements += unit.size();
+      std::vector<std::string> result;
+      for (auto& element : unit) {
+        result.push_back(folly::to<std::string>(element));
+      }
+      results.push_back(std::move(result));
+    }
+    EXPECT_EQ(totalNumberOfElements, numberOfElements);
+    return results;
+  });
+
+  return batchDispatcher.add(std::move(input));
+}
+
+/**
+ * Batch values in groups of 5, and then call inner dispatch.
+ */
+template <typename ExecutorT>
+void doubleBatchOuterDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT>
+  batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
+    EXPECT_EQ(totalNumberOfElements, batch.size());
+    std::vector<std::string> results;
+    std::vector<folly::Future<std::vector<std::string>>>
+        innerDispatchResultFutures;
+
+    std::vector<int> group;
+    for (auto unit : batch) {
+      group.push_back(unit);
+      if (group.size() == 5) {
+        auto localGroup = group;
+        group.clear();
+
+        innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+            executor, totalNumberOfElements, localGroup));
+      }
+    }
+
+    folly::collectAll(
+        innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
+        .then([&](
+            std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
+          for (auto& unit : innerDispatchResults) {
+            for (auto& element : unit.value()) {
+              results.push_back(element);
+            }
+          }
+        })
+        .get();
+    return results;
+  });
+
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, doubleBatchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int totalNumberOfElements = 20;
+    for (int i = 0; i < totalNumberOfElements; i++) {
+      executor.add([=, &executor]() {
+        doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
+      });
+    }
+  });
+  evb.loop();
+}
+
 /**
  * Test that we can properly track fiber stack usage.
  *