2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <sys/types.h>
31 #include <boost/noncopyable.hpp>
33 #include <folly/Portability.h>
34 #include <folly/Range.h>
35 #include <folly/portability/SysUio.h>
40 * An AsyncIOOp represents a pending operation. You may set a notification
41 * callback or you may use this class's methods directly.
43 * The op must remain allocated until it is completed or canceled.
45 class AsyncIOOp : private boost::noncopyable {
47 friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
50 typedef std::function<void(AsyncIOOp*)> NotificationCallback;
52 explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
64 * Initiate a read request.
66 void pread(int fd, void* buf, size_t size, off_t start);
67 void pread(int fd, Range<unsigned char*> range, off_t start);
68 void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
71 * Initiate a write request.
73 void pwrite(int fd, const void* buf, size_t size, off_t start);
74 void pwrite(int fd, Range<const unsigned char*> range, off_t start);
75 void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
78 * Return the current operation state.
80 State state() const { return state_; }
83 * Reset the operation for reuse. It is an error to call reset() on
84 * an Op that is still pending.
86 void reset(NotificationCallback cb = NotificationCallback());
// Overwrite the stored notification callback (cb_) with 'cb'. An empty
// std::function is accepted, as with the default arguments above.
88 void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
// Return the currently stored notification callback (cb_). It may be empty
// when none was supplied (the default argument is NotificationCallback()).
89 const NotificationCallback& notificationCallback() const { return cb_; }
92 * Retrieve the result of this operation. Returns >=0 on success,
93 * -errno on failure (that is, using the Linux kernel error reporting
94 * conventions). Use checkKernelError (folly/Exception.h) on the result to
95 * throw a std::system_error in case of error instead.
97 * It is an error to call this if the Op hasn't completed.
99 ssize_t result() const;
104 void complete(ssize_t result);
107 NotificationCallback cb_;
113 std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
114 std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
117 * C++ interface around Linux Async IO.
119 class AsyncIO : private boost::noncopyable {
121 typedef AsyncIOOp Op;
129 * Create an AsyncIO context capable of holding at most 'capacity' pending
130 * requests at the same time. As requests complete, others can be scheduled,
131 * as long as this limit is not exceeded.
133 * Note: the maximum number of allowed concurrent requests is controlled
134 * by the fs.aio-max-nr sysctl, the default value is usually 64K.
136 * If pollMode is POLLABLE, pollFd() will return a file descriptor that
137 * can be passed to poll / epoll / select and will become readable when
138 * any IOs on this AsyncIO have completed. If you do this, you must use
139 * pollCompleted() instead of wait() -- do not read from the pollFd()
140 * file descriptor directly.
142 * You may use the same AsyncIO object from multiple threads, as long as
143 * there is only one concurrent caller of wait() / pollCompleted() / cancel()
144 * (perhaps by always calling it from the same thread, or by providing
145 * appropriate mutual exclusion). In this case, pending() returns a snapshot
146 * of the current number of pending requests.
148 explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
152 * Wait for at least minRequests to complete. Returns the requests that
153 * have completed; the returned range is valid until the next call to
154 * wait(). Pass minRequests = 0 to return immediately without blocking.
156 Range<Op**> wait(size_t minRequests);
159 * Cancel all pending requests and return them; the returned range is
160 * valid until the next call to cancel().
162 Range<Op**> cancel();
165 * Return the number of pending requests. pending_ is a std::atomic, so when
 * this AsyncIO is shared between threads the value is only a snapshot.
167 size_t pending() const { return pending_; }
170 * Return the maximum number of requests that can be kept outstanding at
 * the same time, i.e. the limit given by the constructor's 'capacity'.
173 size_t capacity() const { return capacity_; }
176 * Return the cumulative number of I/O requests submitted since this object
 * was created (submitted_ starts at 0 and is atomic).
179 size_t totalSubmits() const { return submitted_; }
182 * If POLLABLE, return a file descriptor that can be passed to poll / epoll
183 * and will become readable when any async IO operations have completed.
184 * If NOT_POLLABLE, return -1. Do not read from this descriptor yourself;
 * once it is readable, call pollCompleted() instead of wait().
186 int pollFd() const { return pollFd_; }
189 * If POLLABLE, call this instead of wait() after the file descriptor returned
190 * by pollFd() has become readable. The returned range is valid until the next
191 * call to pollCompleted().
193 Range<Op**> pollCompleted();
196 * Submit an op for execution.
201 void decrementPending();
202 void initializeContext();
204 enum class WaitType { COMPLETE, CANCEL };
205 Range<AsyncIO::Op**> doWait(
209 std::vector<Op*>& result);
211 io_context_t ctx_{nullptr};
212 std::atomic<bool> ctxSet_{false};
213 std::mutex initMutex_;
215 std::atomic<size_t> pending_{0};
216 std::atomic<size_t> submitted_{0};
217 const size_t capacity_;
219 std::vector<Op*> completed_;
220 std::vector<Op*> canceled_;
224 * Wrapper around AsyncIO that allows you to schedule more requests than
225 * the AsyncIO object's capacity. Excess requests are queued and submitted as soon as the AsyncIO object has room.
231 * Create a queue, using the given AsyncIO object.
232 * The AsyncIO object must not be used by anything else until the
233 * queue is destroyed.
235 explicit AsyncIOQueue(AsyncIO* asyncIO);
// Return the number of requests still waiting in queue_, i.e. ops (or op
// factories) not yet submitted to the AsyncIO object because it had no room.
238 size_t queued() const { return queue_.size(); }
241 * Submit an op to the AsyncIO queue. The op will be queued until
242 * the AsyncIO object has room.
244 void submit(AsyncIOOp* op);
247 * Submit a delayed op to the AsyncIO queue; this allows you to postpone
248 * creation of the Op (which may require allocating memory, etc) until
249 * the AsyncIO object has room.
251 typedef std::function<AsyncIOOp*()> OpFactory;
252 void submit(OpFactory op);
255 void onCompleted(AsyncIOOp* op);
260 std::deque<OpFactory> queue_;