2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 template <typename InputT, typename ResultT>
20 struct AtomicBatchDispatcher<InputT, ResultT>::DispatchBaton {
21 DispatchBaton(DispatchFunctionT&& dispatchFunction)
22 : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
28 void reserve(size_t numEntries) {
29 optEntries_.reserve(numEntries);
32 void setError(std::string message) {
33 optErrorMessage_ = std::move(message);
36 void setExpectedCount(size_t expectedCount) {
37 expectedCount_ = expectedCount;
40 Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
41 if (sequenceNumber >= optEntries_.size()) {
42 optEntries_.resize(sequenceNumber + 1);
44 folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
46 throw std::logic_error(
47 "Cannot have multiple inputs with same token sequence number");
49 optEntry = Entry(std::move(input));
50 return optEntry->promise.getFuture();
54 void setExceptionResults(std::exception_ptr eptr) {
55 auto exceptionWrapper = exception_wrapper(eptr);
56 for (auto& optEntry : optEntries_) {
58 optEntry->promise.setException(exceptionWrapper);
63 template <typename TException>
64 void setExceptionResults(
66 std::exception_ptr eptr = std::exception_ptr()) {
67 auto exceptionWrapper =
68 eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
69 for (auto& optEntry : optEntries_) {
71 optEntry->promise.setException(exceptionWrapper);
76 void fulfillPromises() {
78 // If an error message is set, set all promises to exception with message
79 if (optErrorMessage_) {
80 auto ex = std::logic_error(*optErrorMessage_);
81 return setExceptionResults(std::move(ex));
84 // Create inputs vector and validate entries count same as expectedCount_
85 std::vector<InputT> inputs;
86 inputs.reserve(expectedCount_);
87 bool allEntriesFound = (optEntries_.size() == expectedCount_);
88 if (allEntriesFound) {
89 for (auto& optEntry : optEntries_) {
91 allEntriesFound = false;
94 inputs.emplace_back(std::move(optEntry->input));
97 if (!allEntriesFound) {
98 auto ex = std::logic_error(
99 "One or more input tokens destroyed before calling dispatch");
100 return setExceptionResults(std::move(ex));
103 // Call the user provided batch dispatch function to get all results
104 // and make sure that we have the expected number of results returned
105 auto results = dispatchFunction_(std::move(inputs));
106 if (results.size() != expectedCount_) {
107 auto ex = std::logic_error(
108 "Unexpected number of results returned from dispatch function");
109 return setExceptionResults(std::move(ex));
112 // Fulfill the promises with the results from the batch dispatch
113 for (size_t i = 0; i < expectedCount_; ++i) {
114 optEntries_[i]->promise.setValue(std::move(results[i]));
116 } catch (const std::exception& ex) {
117 return setExceptionResults(ex, std::current_exception());
119 return setExceptionResults(std::current_exception());
125 folly::Promise<ResultT> promise;
127 Entry(Entry&& other) noexcept
128 : input(std::move(other.input)), promise(std::move(other.promise)) {}
130 Entry& operator=(Entry&& other) noexcept {
131 input = std::move(other.input);
132 promise = std::move(other.promise);
136 explicit Entry(InputT&& input) : input(std::move(input)) {}
139 size_t expectedCount_;
140 DispatchFunctionT dispatchFunction_;
141 std::vector<folly::Optional<Entry>> optEntries_;
142 folly::Optional<std::string> optErrorMessage_;
145 template <typename InputT, typename ResultT>
146 AtomicBatchDispatcher<InputT, ResultT>::Token::Token(
147 std::shared_ptr<DispatchBaton> baton,
148 size_t sequenceNumber)
149 : baton_(std::move(baton)), sequenceNumber_(sequenceNumber) {}
151 template <typename InputT, typename ResultT>
152 size_t AtomicBatchDispatcher<InputT, ResultT>::Token::sequenceNumber() const {
153 return sequenceNumber_;
156 template <typename InputT, typename ResultT>
157 Future<ResultT> AtomicBatchDispatcher<InputT, ResultT>::Token::dispatch(
159 auto baton = std::move(baton_);
161 throw std::logic_error(
162 "Dispatch called more than once on the same Token object");
164 return baton->getFutureResult(std::move(input), sequenceNumber_);
167 template <typename InputT, typename ResultT>
168 AtomicBatchDispatcher<InputT, ResultT>::AtomicBatchDispatcher(
169 DispatchFunctionT&& dispatchFunc)
170 : numTokensIssued_(0),
171 baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
173 template <typename InputT, typename ResultT>
174 AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
177 "AtomicBatchDispatcher destroyed before commit() was called on it");
182 template <typename InputT, typename ResultT>
183 void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
185 throw std::logic_error("Cannot call reserve(....) after calling commit()");
187 baton_->reserve(numEntries);
190 template <typename InputT, typename ResultT>
191 auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
193 throw std::logic_error("Cannot issue more tokens after calling commit()");
195 return Token(baton_, numTokensIssued_++);
198 template <typename InputT, typename ResultT>
199 void AtomicBatchDispatcher<InputT, ResultT>::commit() {
200 auto baton = std::move(baton_);
202 throw std::logic_error(
203 "Cannot call commit() more than once on the same dispatcher");
205 baton->setExpectedCount(numTokensIssued_);
208 template <typename InputT, typename ResultT>
209 AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
210 folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
211 size_t initialCapacity) {
212 auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
213 if (initialCapacity) {
214 abd.reserve(initialCapacity);
219 } // namespace fibers