/*
 * Copyright 2016 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <folly/Function.h>
#include <folly/Optional.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>

namespace folly {
namespace fibers {
/**
 * AtomicBatchDispatcher should be used if you want to process fiber tasks in
 * parallel, but need to synchronize them at some point. The canonical example
 * is to create a database transaction dispatch round. This API notably
 * enforces that all tasks in the batch have reached the synchronization point
 * before the user-provided dispatch function is called with all the inputs
 * provided in one function call. It also guarantees that the inputs in the
 * vector passed to the user-provided dispatch function will be in the same
 * order as the order in which the tokens for the jobs were issued.
 *
 * Use this when you want all the inputs in the batch to be processed by a
 * single call to the user-provided dispatch function.
 * The user-provided dispatch function takes a vector of InputT as input and
 * returns a vector of ResultT.
 * To use an AtomicBatchDispatcher, create it by providing a dispatch function
 * EITHER to the constructor of the AtomicBatchDispatcher class
 * (you can then call the reserve method on the dispatcher to reserve space
 * for the number of inputs expected),
 * OR to the createAtomicBatchDispatcher function in the folly::fibers
 * namespace (optionally specifying an initial capacity for the number of
 * inputs expected).
 * The AtomicBatchDispatcher object created by this call (the dispatcher)
 * is the only object that can issue the tokens (Token objects) used to
 * add an input to the batch. A single Token is issued when the user calls
 * the getToken function on the dispatcher.
 * Token objects cannot be copied (they can only be moved). The user can call
 * the public dispatch function on a Token, providing a single input value.
 * The dispatch function returns a folly::Future<ResultT> value that the user
 * can then wait on to obtain a ResultT value. The ResultT value will only
 * become available once the dispatch function has been called on all the
 * Tokens in the batch and the user has called dispatcher.commit() to indicate
 * that no more inputs will be added to the batch.
 *
 * User code pertaining to a task can run between the point where a token for
 * the task has been issued and the call to the dispatch function on that
 * token. Since this code can potentially throw, the token issued for a task
 * should be moved into this processing code in such a way that if an
 * exception is thrown and then handled, the token object for the task is
 * destroyed. The batch dispatcher will wait until all tokens have either been
 * destroyed or have had the dispatch function called on them. Leaking an
 * issued token will cause the batch dispatch to wait forever.
 *
 * The AtomicBatchDispatcher object is referred to as the dispatcher below.
 *
 * ERROR CASES:
 * 1) The dispatcher is destroyed before commit is called on it, for example
 *    because the user forgot to call commit OR an exception was thrown
 *    in user code before the call to commit:
 *    - The future ResultT has an exception of type std::logic_error set for
 *      all tokens that were issued by the dispatcher (once all tokens are
 *      either destroyed or have had dispatch called on them).
 * 2) The dispatch function is called more than once on the same Token object
 *    (or a moved version of the same Token):
 *    - Subsequent calls to dispatch (after the first one) throw a
 *      std::logic_error exception (the batch itself will not have any errors
 *      and will get processed).
 * 3) One or more of the Tokens issued are destroyed before dispatch is called
 *    on them:
 *    - The future ResultT has an exception of type std::logic_error set for
 *      all tokens that were issued by the dispatcher (once all tokens are
 *      either destroyed or have had dispatch called on them).
 * 4) dispatcher.getToken() is called after dispatcher.commit():
 *    - The call to getToken() throws a std::logic_error exception
 *      (the batch itself will not have any errors and will get processed).
 * 5) All tokens were issued and dispatched, the user-provided batch dispatch
 *    function is called, but that function throws an exception:
 *    - The future ResultT has an exception set for all tokens that were
 *      issued by the dispatcher. The result will contain the wrapped user
 *      exception.
 *
 * EXAMPLE (there are other ways to achieve this, but this is one example):
 * - User creates an AtomicBatchDispatcher on the stack:
 *     auto dispatcher =
 *         folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
 * - User creates "count" Job objects, calling "getToken" once per job:
 *     std::vector<Job> jobs;
 *     for (size_t i = 0; i < count; ++i) {
 *       auto token = dispatcher.getToken();
 *       jobs.push_back(Job(std::move(token), singleInputValueToProcess));
 *     }
 * - User calls commit() on the dispatcher to indicate that no new tokens will
 *   be issued for this batch:
 *     dispatcher.commit();
 * - Use any single-threaded executor that will process the jobs.
 * - On each execution (fiber), preprocess a single "Job" that has been moved
 *   in from the original vector "jobs". This way, if the preprocessing throws,
 *   the Job object being processed is destroyed and so is its token.
 * - On each execution (fiber), call dispatch on the token:
 *     auto future = job.token.dispatch(job.input);
 * - Save the returned future so that eventually you can wait on the result:
 *     try {
 *       result = future.value();
 *       // future.hasValue() is true
 *     } catch (...) {
 *       // future.hasException() is true
 *       <DO WHATEVER YOU WANT IN CASE OF ERROR>
 *     }
 *
 * NOTES:
 * - AtomicBatchDispatcher is not thread safe.
 * - Works for executors that run tasks on a single thread.
 */
template <typename InputT, typename ResultT>
class AtomicBatchDispatcher {
 private:
  struct DispatchBaton;
  friend struct DispatchBaton;

 public:
  using DispatchFunctionT =
      folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;

  class Token {
   public:
    explicit Token(std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);

    Future<ResultT> dispatch(InputT input);

    // Allow moving a Token object
    Token(Token&&) = default;
    Token& operator=(Token&&) = default;

    size_t sequenceNumber() const;

   private:
    // Disallow copying a Token object
    Token(const Token&) = delete;
    Token& operator=(const Token&) = delete;

    std::shared_ptr<DispatchBaton> baton_;
    size_t sequenceNumber_;
  };

  explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);
  ~AtomicBatchDispatcher();

  // numEntries is a *hint* about the number of inputs to expect:
  // - It is used purely to reserve space for storing the vector of inputs
  //   etc., so that reallocation and move copy are reduced / not needed.
  // - It is provided purely for performance reasons.
  void reserve(size_t numEntries);

  Token getToken();
  void commit();

  // Allow moving an AtomicBatchDispatcher object
  AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
  AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;

 private:
  // Disallow copying an AtomicBatchDispatcher object
  AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
  AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;

  size_t numTokensIssued_;
  std::shared_ptr<DispatchBaton> baton_;
};

// initialCapacity is a *hint* about the number of inputs to expect:
// - It is used purely to reserve space for storing the vector of inputs etc.,
//   so that reallocation and move copy are reduced / not needed.
// - It is provided purely for performance reasons.
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
    folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
    size_t initialCapacity = 0);

} // namespace fibers
} // namespace folly

#include <folly/fibers/AtomicBatchDispatcher-inl.h>