detail/StaticSingletonManager.h \
detail/Stats.h \
detail/ThreadLocalDetail.h \
- detail/TryDetail.h \
+ detail/TryDetail.h \
detail/TurnSequencer.h \
detail/UncaughtExceptionCounter.h \
Demangle.h \
fibers/Baton-inl.h \
fibers/BatchDispatcher.h \
fibers/BoostContextCompatibility.h \
+ fibers/detail/AtomicBatchDispatcher.h \
fibers/EventBaseLoopController.h \
fibers/EventBaseLoopController-inl.h \
fibers/Fiber.h \
libfolly_la_SOURCES += \
fibers/Baton.cpp \
+ fibers/detail/AtomicBatchDispatcher.cpp \
fibers/Fiber.cpp \
fibers/FiberManager.cpp \
fibers/FiberManagerMap.cpp \
optEntries_.reserve(numEntries);
}
- void setError(std::string message) {
- optErrorMessage_ = std::move(message);
+ void setExceptionWrapper(folly::exception_wrapper&& exWrapper) {
+ exceptionWrapper_ = std::move(exWrapper);
}
void setExpectedCount(size_t expectedCount) {
+ assert(expectedCount_ == 0 || !"expectedCount_ being set more than once");
expectedCount_ = expectedCount;
+ optEntries_.resize(expectedCount_);
}
Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
optEntries_.resize(sequenceNumber + 1);
}
folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
- if (optEntry) {
- throw std::logic_error(
- "Cannot have multiple inputs with same token sequence number");
- }
+ assert(!optEntry || !"Multiple inputs have the same token sequence number");
optEntry = Entry(std::move(input));
return optEntry->promise.getFuture();
}
private:
- void setExceptionResults(std::exception_ptr eptr) {
- auto exceptionWrapper = exception_wrapper(eptr);
+ void setExceptionResults(const folly::exception_wrapper& exceptionWrapper) {
for (auto& optEntry : optEntries_) {
if (optEntry) {
optEntry->promise.setException(exceptionWrapper);
}
}
+ void setExceptionResults(std::exception_ptr eptr) {
+ auto exceptionWrapper = exception_wrapper(eptr);
+ return setExceptionResults(exceptionWrapper);
+ }
+
template <typename TException>
void setExceptionResults(
const TException& ex,
std::exception_ptr eptr = std::exception_ptr()) {
auto exceptionWrapper =
eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
- for (auto& optEntry : optEntries_) {
- if (optEntry) {
- optEntry->promise.setException(exceptionWrapper);
- }
- }
+ return setExceptionResults(exceptionWrapper);
}
void fulfillPromises() {
try {
// If an error message is set, set all promises to exception with message
- if (optErrorMessage_) {
- auto ex = std::logic_error(*optErrorMessage_);
- return setExceptionResults(std::move(ex));
+ if (exceptionWrapper_) {
+ return setExceptionResults(exceptionWrapper_);
}
- // Create inputs vector and validate entries count same as expectedCount_
- std::vector<InputT> inputs;
- inputs.reserve(expectedCount_);
- bool allEntriesFound = (optEntries_.size() == expectedCount_);
- if (allEntriesFound) {
- for (auto& optEntry : optEntries_) {
- if (!optEntry) {
- allEntriesFound = false;
- break;
- }
- inputs.emplace_back(std::move(optEntry->input));
+ // Validate entries count same as expectedCount_
+ assert(
+ optEntries_.size() == expectedCount_ ||
+ !"Entries vector did not have expected size");
+ std::vector<size_t> vecTokensNotDispatched;
+ for (size_t i = 0; i < expectedCount_; ++i) {
+ if (!optEntries_[i]) {
+ vecTokensNotDispatched.push_back(i);
}
}
- if (!allEntriesFound) {
- auto ex = std::logic_error(
- "One or more input tokens destroyed before calling dispatch");
- return setExceptionResults(std::move(ex));
+ if (!vecTokensNotDispatched.empty()) {
+ return setExceptionResults(ABDTokenNotDispatchedException(
+ detail::createABDTokenNotDispatchedExMsg(vecTokensNotDispatched)));
+ }
+
+ // Create the inputs vector
+ std::vector<InputT> inputs;
+ inputs.reserve(expectedCount_);
+ for (auto& optEntry : optEntries_) {
+ inputs.emplace_back(std::move(optEntry->input));
}
// Call the user provided batch dispatch function to get all results
// and make sure that we have the expected number of results returned
auto results = dispatchFunction_(std::move(inputs));
if (results.size() != expectedCount_) {
- auto ex = std::logic_error(
- "Unexpected number of results returned from dispatch function");
- return setExceptionResults(std::move(ex));
+ return setExceptionResults(
+ ABDUsageException(detail::createUnexpectedNumResultsABDUsageExMsg(
+ expectedCount_, results.size())));
}
// Fulfill the promises with the results from the batch dispatch
optEntries_[i]->promise.setValue(std::move(results[i]));
}
} catch (const std::exception& ex) {
+ // Set exceptions thrown when executing the user provided dispatch func
return setExceptionResults(ex, std::current_exception());
} catch (...) {
+ // Set exceptions thrown when executing the user provided dispatch func
return setExceptionResults(std::current_exception());
}
}
size_t expectedCount_;
DispatchFunctionT dispatchFunction_;
std::vector<folly::Optional<Entry>> optEntries_;
- folly::Optional<std::string> optErrorMessage_;
+ folly::exception_wrapper exceptionWrapper_;
};
template <typename InputT, typename ResultT>
InputT input) {
auto baton = std::move(baton_);
if (!baton) {
- throw std::logic_error(
+ throw ABDUsageException(
"Dispatch called more than once on the same Token object");
}
return baton->getFutureResult(std::move(input), sequenceNumber_);
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
if (baton_) {
- baton_->setError(
- "AtomicBatchDispatcher destroyed before commit() was called on it");
+ // Set error here rather than throw because we do not want to throw from
+ // the destructor of AtomicBatchDispatcher
+ baton_->setExceptionWrapper(
+ folly::make_exception_wrapper<ABDCommitNotCalledException>());
commit();
}
}
template <typename InputT, typename ResultT>
void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
if (!baton_) {
- throw std::logic_error("Cannot call reserve(....) after calling commit()");
+ throw ABDUsageException("Cannot call reserve(....) after calling commit()");
}
baton_->reserve(numEntries);
}
template <typename InputT, typename ResultT>
auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
if (!baton_) {
- throw std::logic_error("Cannot issue more tokens after calling commit()");
+ throw ABDUsageException("Cannot issue more tokens after calling commit()");
}
return Token(baton_, numTokensIssued_++);
}
void AtomicBatchDispatcher<InputT, ResultT>::commit() {
auto baton = std::move(baton_);
if (!baton) {
- throw std::logic_error(
+ throw ABDUsageException(
"Cannot call commit() more than once on the same dispatcher");
}
baton->setExpectedCount(numTokensIssued_);
#include <folly/Function.h>
#include <folly/Optional.h>
+#include <folly/fibers/detail/AtomicBatchDispatcher.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <memory>
+#include <stdexcept>
+#include <string>
#include <utility>
#include <vector>
namespace folly {
namespace fibers {
+/**
+ * An exception class that gets thrown when the AtomicBatchDispatcher is used
+ * incorrectly. This is indicative of a bug in the user code.
+ * Examples are, multiple dispatch calls on the same token, trying to get more
+ * tokens from the dispatcher after commit has been called, etc.
+ */
+class ABDUsageException : public std::logic_error {
+ using std::logic_error::logic_error;
+};
+
+/**
+ * An exception class that gets set on the promise for dispatched tokens, when
+ * the AtomicBatchDispatcher was destroyed before commit was called on it.
+ */
+class ABDCommitNotCalledException : public std::runtime_error {
+ public:
+ ABDCommitNotCalledException()
+ : std::runtime_error(
+ "AtomicBatchDispatcher destroyed before commit() was called") {}
+};
+
+/**
+ * An exception class that gets set on the promise for dispatched tokens, when
+ * one or more other tokens in the batch were destroyed before dispatch was
+ * called on them.
+ * Only here so that the caller can distinguish the real failure cause
+ * rather than these subsequently thrown exceptions.
+ */
+class ABDTokenNotDispatchedException : public std::runtime_error {
+ using std::runtime_error::runtime_error;
+};
+
/**
* AtomicBatchDispatcher should be used if you want to process fiber tasks in
* parallel, but require to synchronize them at some point. The canonical
* 1) The dispatcher is destroyed before calling commit on it, for example
* because the user forgot to call commit OR an exception was thrown
* in user code before the call to commit:
- * - The future ResultT has an exception of type std::logic_error set for all
- * tokens that were issued by the dispatcher (once all tokens are either
- * destroyed or have called dispatch)
+ * - The future ResultT has an exception of type ABDCommitNotCalledException
+ * set for all tokens that were issued by the dispatcher (once all tokens
+ * are either destroyed or have called dispatch)
* 2) Calling the dispatch function more than once on the same Token object
* (or a moved version of the same Token):
* - Subsequent calls to dispatch (after the first one) will throw an
- * std::logic_error exception (the batch itself will not have any errors
+ * ABDUsageException exception (the batch itself will not have any errors
* and will get processed)
* 3) One/more of the Tokens issued are destroyed before calling dispatch on
* it/them:
- * - The future ResultT has an exception of type std::logic_error set for all
+ * - The future ResultT has an ABDTokenNotDispatchedException set for all
* tokens that were issued by the dispatcher (once all tokens are either
* destroyed or have called dispatch)
* 4) dispatcher.getToken() is called after calling dispatcher.commit()
- * - the call to getToken() will throw an std::logic_error exception
+ * - the call to getToken() will throw an ABDUsageException exception
* (the batch itself will not have any errors and will get processed).
* 5) All tokens were issued and called dispatch, the user provided batch
* dispatch function is called, but that function throws any exception.
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AtomicBatchDispatcher.h"
+#include <folly/Format.h>
+
+namespace folly {
+namespace fibers {
+namespace detail {
+
+std::string createABDTokenNotDispatchedExMsg(
+ const std::vector<size_t>& vecTokensNotDispatched) {
+ size_t numTokensNotDispatched = vecTokensNotDispatched.size();
+ assert(numTokensNotDispatched > 0);
+ size_t numSeqNumToPrint =
+ (numTokensNotDispatched > 10 ? 10 : numTokensNotDispatched);
+ std::string strInputsNotFound =
+ folly::sformat("{}", vecTokensNotDispatched[0]);
+ for (size_t i = 1; i < numSeqNumToPrint; ++i) {
+ strInputsNotFound += folly::sformat(", {}", vecTokensNotDispatched[i]);
+ }
+ if (numSeqNumToPrint < numTokensNotDispatched) {
+ strInputsNotFound += "...";
+ }
+ return folly::sformat(
+ "{} input tokens (seq nums: {}) destroyed before calling dispatch",
+ numTokensNotDispatched,
+ strInputsNotFound);
+}
+
+std::string createUnexpectedNumResultsABDUsageExMsg(
+ size_t numExpectedResults,
+ size_t numActualResults) {
+ return folly::sformat(
+ "Unexpected number of results ({}) returned from dispatch function, "
+ "expected ({})",
+ numActualResults,
+ numExpectedResults);
+}
+
+} // namespace detail
+} // namespace fibers
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <vector>
+
+namespace folly {
+namespace fibers {
+namespace detail {
+
+std::string createABDTokenNotDispatchedExMsg(
+ const std::vector<size_t>& vecTokensNotDispatched);
+
+std::string createUnexpectedNumResultsABDUsageExMsg(
+ size_t numExpectedResults,
+ size_t numActualResults);
+
+} // namespace detail
+} // namespace fibers
+} // namespace folly
if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
if (i == problemIndex) {
- EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
+ EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
}
}
} catch (...) {
/* User code handles the exception and does not exit process */
}
evb.loop();
- validateResults<std::logic_error>(results, COUNT);
+ validateResults<ABDCommitNotCalledException>(results, COUNT);
}
TEST(FiberManager, ABD_PreprocessingFailureTest) {
dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
atomicBatchDispatcher.commit();
evb.loop();
- validateResults<std::logic_error>(results, COUNT - 1);
+ validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
}
TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
atomicBatchDispatcher.commit();
- EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
dispatchJobs(executor, jobs, results);
- EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
evb.loop();
validateResults(results, COUNT);
- EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
}
TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {