nobase_follyinclude_HEADERS += \
fibers/AddTasks.h \
fibers/AddTasks-inl.h \
+ fibers/AtomicBatchDispatcher.h \
+ fibers/AtomicBatchDispatcher-inl.h \
fibers/Baton.h \
fibers/Baton-inl.h \
fibers/BatchDispatcher.h \
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+namespace folly {
+namespace fibers {
+
+template <typename InputT, typename ResultT>
+struct AtomicBatchDispatcher<InputT, ResultT>::DispatchBaton {
+ DispatchBaton(DispatchFunctionT&& dispatchFunction)
+ : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
+
+ ~DispatchBaton() {
+ fulfillPromises();
+ }
+
+ void reserve(size_t numEntries) {
+ optEntries_.reserve(numEntries);
+ }
+
+ void setError(std::string message) {
+ optErrorMessage_ = std::move(message);
+ }
+
+ void setExpectedCount(size_t expectedCount) {
+ expectedCount_ = expectedCount;
+ }
+
+ Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
+ if (sequenceNumber >= optEntries_.size()) {
+ optEntries_.resize(sequenceNumber + 1);
+ }
+ folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
+ if (optEntry) {
+ throw std::logic_error(
+ "Cannot have multiple inputs with same token sequence number");
+ }
+ optEntry = Entry(std::move(input));
+ return optEntry->promise.getFuture();
+ }
+
+ private:
+ void setExceptionResults(std::exception_ptr eptr) {
+ auto exceptionWrapper = exception_wrapper(eptr);
+ for (auto& optEntry : optEntries_) {
+ if (optEntry) {
+ optEntry->promise.setException(exceptionWrapper);
+ }
+ }
+ }
+
+ template <typename TException>
+ void setExceptionResults(
+ const TException& ex,
+ std::exception_ptr eptr = std::exception_ptr()) {
+ auto exceptionWrapper =
+ eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
+ for (auto& optEntry : optEntries_) {
+ if (optEntry) {
+ optEntry->promise.setException(exceptionWrapper);
+ }
+ }
+ }
+
+ void fulfillPromises() {
+ try {
+ // If an error message is set, set all promises to exception with message
+ if (optErrorMessage_) {
+ auto ex = std::logic_error(*optErrorMessage_);
+ return setExceptionResults(std::move(ex));
+ }
+
+ // Create inputs vector and validate entries count same as expectedCount_
+ std::vector<InputT> inputs;
+ inputs.reserve(expectedCount_);
+ bool allEntriesFound = (optEntries_.size() == expectedCount_);
+ if (allEntriesFound) {
+ for (auto& optEntry : optEntries_) {
+ if (!optEntry) {
+ allEntriesFound = false;
+ break;
+ }
+ inputs.emplace_back(std::move(optEntry->input));
+ }
+ }
+ if (!allEntriesFound) {
+ auto ex = std::logic_error(
+ "One or more input tokens destroyed before calling dispatch");
+ return setExceptionResults(std::move(ex));
+ }
+
+ // Call the user provided batch dispatch function to get all results
+ // and make sure that we have the expected number of results returned
+ auto results = dispatchFunction_(std::move(inputs));
+ if (results.size() != expectedCount_) {
+ auto ex = std::logic_error(
+ "Unexpected number of results returned from dispatch function");
+ return setExceptionResults(std::move(ex));
+ }
+
+ // Fulfill the promises with the results from the batch dispatch
+ for (size_t i = 0; i < expectedCount_; ++i) {
+ optEntries_[i]->promise.setValue(std::move(results[i]));
+ }
+ } catch (const std::exception& ex) {
+ return setExceptionResults(ex, std::current_exception());
+ } catch (...) {
+ return setExceptionResults(std::current_exception());
+ }
+ }
+
+ struct Entry {
+ InputT input;
+ folly::Promise<ResultT> promise;
+
+ Entry(Entry&& other) noexcept
+ : input(std::move(other.input)), promise(std::move(other.promise)) {}
+
+ Entry& operator=(Entry&& other) noexcept {
+ input = std::move(other.input);
+ promise = std::move(other.promise);
+ return *this;
+ }
+
+ explicit Entry(InputT&& input) : input(std::move(input)) {}
+ };
+
+ size_t expectedCount_;
+ DispatchFunctionT dispatchFunction_;
+ std::vector<folly::Optional<Entry>> optEntries_;
+ folly::Optional<std::string> optErrorMessage_;
+};
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT>::Token::Token(
+ std::shared_ptr<DispatchBaton> baton,
+ size_t sequenceNumber)
+ : baton_(std::move(baton)), SEQUENCE_NUMBER(sequenceNumber) {}
+
+template <typename InputT, typename ResultT>
+Future<ResultT> AtomicBatchDispatcher<InputT, ResultT>::Token::dispatch(
+ InputT input) {
+ auto baton = std::move(baton_);
+ if (!baton) {
+ throw std::logic_error(
+ "Dispatch called more than once on the same Token object");
+ }
+ return baton->getFutureResult(std::move(input), SEQUENCE_NUMBER);
+}
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT>::AtomicBatchDispatcher(
+ DispatchFunctionT&& dispatchFunc)
+ : numTokensIssued_(0),
+ baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
+ if (baton_) {
+ baton_->setError(
+ "AtomicBatchDispatcher destroyed before commit() was called on it");
+ commit();
+ }
+}
+
+template <typename InputT, typename ResultT>
+void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
+ if (!baton_) {
+ throw std::logic_error("Cannot call reserve(....) after calling commit()");
+ }
+ baton_->reserve(numEntries);
+}
+
+template <typename InputT, typename ResultT>
+auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
+ if (!baton_) {
+ throw std::logic_error("Cannot issue more tokens after calling commit()");
+ }
+ return Token(baton_, numTokensIssued_++);
+}
+
+template <typename InputT, typename ResultT>
+void AtomicBatchDispatcher<InputT, ResultT>::commit() {
+ auto baton = std::move(baton_);
+ if (!baton) {
+ throw std::logic_error(
+ "Cannot call commit() more than once on the same dispatcher");
+ }
+ baton->setExpectedCount(numTokensIssued_);
+}
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
+ folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
+ size_t initialCapacity) {
+ auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
+ if (initialCapacity) {
+ abd.reserve(initialCapacity);
+ }
+ return abd;
+}
+
+} // namespace fibers
+} // manespace folly
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Function.h>
+#include <folly/Optional.h>
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+#include <memory>
+#include <utility>
+#include <vector>
+
+namespace folly {
+namespace fibers {
+
+/**
+ * AtomicBatchDispatcher should be used if you want to process fiber tasks in
+ * parallel, but require to synchronize them at some point. The canonical
+ * example is to create a database transaction dispatch round. This API notably
+ * enforces that all tasks in the batch have reached the synchronization point
+ * before the user provided dispatch function is called with all the inputs
+ * provided in one function call. It also provides a guarantee that the inputs
+ * in the vector of inputs passed to the user provided dispatch function will be
+ * in the same order as the order in which the token for the job was issued.
+ *
+ * Use this when you want all the inputs in the batch to be processed by a
+ * single function call to the user provided dispatch function.
+ * The user provided dispatch function takes a vector of InputT as input and
+ * returns a vector of ResultT.
+ * To use an AtomicBatchDispatcher, create it by providing a dispatch function:
+ * TO EITHER the constructor of the AtomicBatchDispatcher class
+ * (can call reserve method on the dispatcher to reserve space (for number of
+ * inputs expected)),
+ * OR the createAtomicBatchDispatcher function in folly::fibers namespace
+ * (optionally specify an initial capacity (for number of inputs expected)).
+ * The AtomicBatchDispatcher object created using this call (dispatcher),
+ * is the only object that can issue tokens (Token objects) that are used to
+ * add an input to the batch. A single Token is issued when the user calls
+ * the getToken function on the dispatcher.
+ * Token objects cannot be copied (can only be moved). User can call the public
+ * dispatch function on the Token providing a single input value. The dispatch
+ * function returns a folly::Future<ResultT> value that the user can then wait
+ * on to obtain a ResultT value. The ResultT value will only be available once
+ * the dispatch function has been called on all the Tokens in the batch and the
+ * user has called dispatcher.commit() to indicate no more batched transactions
+ * are to be added.
+ * User code pertaining to a task can be run between the point where a token for
+ * the task has been issued and before calling the dispatch function on the
+ * token. Since this code can potentially throw, the token issued for a task
+ * should be moved into this processing code in such a way that if an exception
+ * is thrown and then handled, the token object for the task is destroyed.
+ * The batch query dispatcher will wait until all tokens have either been
+ * destroyed or have had the dispatch function called on them. Leaking an
+ * issued token will cause the batch dispatch to wait forever to happen.
+ *
+ * The AtomicBatchDispatcher object is referred to as the dispatcher below.
+ *
+ * POSSIBLE ERRORS:
+ * 1) The dispatcher is destroyed before calling commit on it, for example
+ * because the user forgot to call commit OR an exception was thrown
+ * in user code before the call to commit:
+ * - The future ResultT has an exception of type std::logic_error set for all
+ * tokens that were issued by the dispatcher (once all tokens are either
+ * destroyed or have called dispatch)
+ * 2) Calling the dispatch function more than once on the same Token object
+ * (or a moved version of the same Token):
+ * - Subsequent calls to dispatch (after the first one) will throw an
+ * std::logic_error exception (the batch itself will not have any errors
+ * and will get processed)
+ * 3) One/more of the Tokens issued are destroyed before calling dispatch on
+ * it/them:
+ * - The future ResultT has an exception of type std::logic_error set for all
+ * tokens that were issued by the dispatcher (once all tokens are either
+ * destroyed or have called dispatch)
+ * 4) dispatcher.getToken() is called after calling dispatcher.commit()
+ * - the call to getToken() will throw an std::logic_error exception
+ * (the batch itself will not have any errors and will get processed).
+ * 5) All tokens were issued and called dispatch, the user provided batch
+ * dispatch function is called, but that function throws any exception.
+ * - The future ResultT has exception for all tokens that were issued by
+ * the dispatcher. The result will contain the wrapped user exception.
+ *
+ * EXAMPLE (There are other ways to achieve this, but this is one example):
+ * - User creates an AtomicBatchDispatcher on stack
+ * auto dispatcher =
+ * folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
+ * - User creates "count" number of token objects by calling "getToken" count
+ * number of times
+ * std::vector<Job> jobs;
+ * for (size_t i = 0; i < count; ++i) {
+ * auto token = dispatcher.getToken();
+ * jobs.push_back(Job(std::move(token), singleInputValueToProcess);
+ * }
+ * - User calls commit() on the dispatcher to indicate that no new tokens will
+ * be issued for this batch
+ * dispatcher.commit();
+ * - Use any single threaded executor that will process the jobs
+ * - On each execution (fiber) preprocess a single "Job" that has been moved in
+ * from the original vector "jobs". This way if the preprocessing throws
+ * the Job object being processed is destroyed and so is the token.
+ * - On each execution (fiber) call the dispatch on the token
+ * auto future = job.token.dispatch(job.input);
+ * - Save the future returned so that eventually you can wait on the results
+ * ResultT result;
+ * try {
+ * result = future.value();
+ * // future.hasValue() is true
+ * } catch (...) {
+ * // future.hasException() is true
+ * <DO WHATEVER YOU WANT IN CASE OF ERROR> }
+ * }
+ *
+ * NOTES:
+ * - AtomicBatchDispatcher is not thread safe.
+ * - Works for executors that run tasks on a single thread.
+ */
+template <typename InputT, typename ResultT>
+class AtomicBatchDispatcher {
+ private:
+ struct DispatchBaton;
+ friend struct DispatchBaton;
+
+ public:
+ using DispatchFunctionT =
+ folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;
+
+ class Token {
+ public:
+ explicit Token(std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);
+
+ Future<ResultT> dispatch(InputT input);
+
+ // Allow moving a Token object
+ Token(Token&&) = default;
+ Token& operator=(Token&&) = default;
+
+ private:
+ // Disallow copying a Token object
+ Token(const Token&) = delete;
+ Token& operator=(const Token&) = delete;
+
+ std::shared_ptr<DispatchBaton> baton_;
+ const size_t SEQUENCE_NUMBER;
+ };
+
+ explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);
+
+ ~AtomicBatchDispatcher();
+
+ // numEntries is a *hint* about the number of inputs to expect:
+ // - It is used purely to reserve space for storing vector of inputs etc.,
+ // so that reeallocation and move copy are reduced / not needed.
+ // - It is provided purely for performance reasons
+ void reserve(size_t numEntries);
+
+ Token getToken();
+
+ void commit();
+
+ // Allow moving an AtomicBatchDispatcher object
+ AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
+ AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;
+
+ private:
+ // Disallow copying an AtomicBatchDispatcher object
+ AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
+ AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;
+
+ size_t numTokensIssued_;
+ std::shared_ptr<DispatchBaton> baton_;
+};
+
+// initialCapacity is a *hint* about the number of inputs to expect:
+// - It is used purely to reserve space for storing vector of inputs etc.,
+// so that reeallocation and move copy are reduced / not needed.
+// - It is provided purely for performance reasons
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
+ folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
+ size_t initialCapacity = 0);
+
+} // namespace fibers
+} // namespace folly
+
+#include <folly/fibers/AtomicBatchDispatcher-inl.h>
#include <vector>
#include <folly/Memory.h>
+#include <folly/Random.h>
#include <folly/futures/Future.h>
#include <folly/Conv.h>
#include <folly/fibers/AddTasks.h>
+#include <folly/fibers/AtomicBatchDispatcher.h>
#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/FiberManager.h>
evb.loop();
}
+namespace AtomicBatchDispatcherTesting {
+
+using ValueT = size_t;
+using ResultT = std::string;
+using DispatchFunctionT =
+ folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
+
+#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
+#if ENABLE_TRACE_IN_TEST
+#define OUTPUT_TRACE std::cerr
+#else // ENABLE_TRACE_IN_TEST
+struct DevNullPiper {
+ template <typename T>
+ DevNullPiper& operator<<(const T&) {
+ return *this;
+ }
+
+ DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
+ return *this;
+ }
+} devNullPiper;
+#define OUTPUT_TRACE devNullPiper
+#endif // ENABLE_TRACE_IN_TEST
+
+struct Job {
+ AtomicBatchDispatcher<ValueT, ResultT>::Token token;
+ ValueT input;
+
+ void preprocess(FiberManager& executor, bool die) {
+ // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
+ clock_t msecToDoIO = folly::Random::rand32() % 10;
+ double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
+ double endAfter = start + msecToDoIO;
+ while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
+ executor.yield();
+ }
+ if (die) {
+ throw std::logic_error("Simulating preprocessing failure");
+ }
+ }
+
+ Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
+ : token(std::move(t)), input(i) {}
+
+ Job(Job&&) = default;
+ Job& operator=(Job&&) = default;
+};
+
+ResultT processSingleInput(ValueT&& input) {
+ return folly::to<ResultT>(std::move(input));
+}
+
+std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
+ size_t expectedCount = inputs.size();
+ std::vector<ResultT> results;
+ results.reserve(expectedCount);
+ for (size_t i = 0; i < expectedCount; ++i) {
+ results.emplace_back(processSingleInput(std::move(inputs[i])));
+ }
+ return results;
+}
+
+void createJobs(
+ AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
+ std::vector<Job>& jobs,
+ size_t count) {
+ jobs.clear();
+ for (size_t i = 0; i < count; ++i) {
+ jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
+ }
+}
+
+enum class DispatchProblem {
+ None,
+ PreprocessThrows,
+ DuplicateDispatch,
+};
+
+void dispatchJobs(
+ FiberManager& executor,
+ std::vector<Job>& jobs,
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ DispatchProblem dispatchProblem = DispatchProblem::None,
+ size_t problemIndex = size_t(-1)) {
+ EXPECT_TRUE(
+ dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
+ results.clear();
+ results.resize(jobs.size());
+ for (size_t i = 0; i < jobs.size(); ++i) {
+ executor.add(
+ [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
+ try {
+ Job job(std::move(jobs[i]));
+
+ if (dispatchProblem == DispatchProblem::PreprocessThrows) {
+ if (i == problemIndex) {
+ EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
+ return;
+ }
+ }
+
+ job.preprocess(executor, false);
+ OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
+ results[i] = job.token.dispatch(job.input);
+ OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
+
+ if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
+ if (i == problemIndex) {
+ EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
+ }
+ }
+ } catch (...) {
+ OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
+ }
+ });
+ }
+}
+
+void validateResult(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t i) {
+ try {
+ OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
+ << std::endl;
+ } catch (std::exception& e) {
+ OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
+ throw;
+ }
+}
+
+template <typename TException>
+void validateResults(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t expectedNumResults) {
+ size_t numResultsFilled = 0;
+ for (size_t i = 0; i < results.size(); ++i) {
+ if (!results[i]) {
+ continue;
+ }
+ ++numResultsFilled;
+ EXPECT_THROW(validateResult(results, i), TException);
+ }
+ EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+void validateResults(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t expectedNumResults) {
+ size_t numResultsFilled = 0;
+ for (size_t i = 0; i < results.size(); ++i) {
+ if (!results[i]) {
+ continue;
+ }
+ ++numResultsFilled;
+ EXPECT_NO_THROW(validateResult(results, i));
+ ValueT expectedInput = i;
+ EXPECT_EQ(
+ results[i]->value(), processSingleInput(std::move(expectedInput)));
+ }
+ EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+} // AtomicBatchDispatcherTesting
+
+#define SET_UP_TEST_FUNC \
+ using namespace AtomicBatchDispatcherTesting; \
+ folly::EventBase evb; \
+ auto& executor = getFiberManager(evb); \
+ const size_t COUNT = 11; \
+ std::vector<Job> jobs; \
+ jobs.reserve(COUNT); \
+ std::vector<folly::Optional<folly::Future<ResultT>>> results; \
+ results.reserve(COUNT); \
+ DispatchFunctionT dispatchFunc
+
+TEST(FiberManager, ABD_Test) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing AtomicBatchDispatcher with explicit call to commit()
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults(results, COUNT);
+}
+
+TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing AtomicBatchDispatcher destroyed before calling commit.
+ // Handles error cases for:
+ // - User might have forgotten to add the call to commit() in the code
+ // - An unexpected exception got thrown in user code before commit() is called
+ //
+ try {
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ throw std::runtime_error(
+ "Unexpected exception in user code before commit called");
+ atomicBatchDispatcher.commit();
+ } catch (...) {
+ /* User code handles the exception and does not exit process */
+ }
+ evb.loop();
+ validateResults<std::logic_error>(results, COUNT);
+}
+
+TEST(FiberManager, ABD_PreprocessingFailureTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing preprocessing failure on a job throws
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults<std::logic_error>(results, COUNT - 1);
+}
+
+TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that calling dispatch more than once on the same token throws
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+}
+
+TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that exception set on attempt to call getToken after commit called
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ atomicBatchDispatcher.commit();
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ dispatchJobs(executor, jobs, results);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ evb.loop();
+ validateResults(results, COUNT);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+}
+
+TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that exception is set if user provided batch dispatch throws
+ //
+ dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
+ auto results = userDispatchFunc(std::move(inputs));
+ throw std::runtime_error("Unexpected exception in user dispatch function");
+ return results;
+ };
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults<std::runtime_error>(results, COUNT);
+}
+
/**
* Test that we can properly track fiber stack usage.
*