folly: AsyncIO: add debuging helper
[folly.git] / folly / experimental / io / AsyncIO.h
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_IO_ASYNCIO_H_
18 #define FOLLY_IO_ASYNCIO_H_
19
20 #include <sys/types.h>
21 #include <sys/uio.h>
22 #include <libaio.h>
23
24 #include <cstdint>
25 #include <deque>
26 #include <functional>
27 #include <ostream>
28 #include <utility>
29 #include <vector>
30
31 #include <boost/noncopyable.hpp>
32
33 #include "folly/Portability.h"
34 #include "folly/Range.h"
35
36 namespace folly {
37
38 /**
39  * An AsyncIOOp represents a pending operation.  You may set a notification
40  * callback or you may use this class's methods directly.
41  *
42  * The op must remain allocated until completion.
43  */
44 class AsyncIOOp : private boost::noncopyable {
45   friend class AsyncIO;
46   friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
47  public:
48   typedef std::function<void(AsyncIOOp*)> NotificationCallback;
49
50   explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
51   ~AsyncIOOp();
52
53   // There would be a cancel() method here if Linux AIO actually implemented
54   // it.  But let's not get your hopes up.
55
56   enum class State {
57     UNINITIALIZED,
58     INITIALIZED,
59     PENDING,
60     COMPLETED
61   };
62
63   /**
64    * Initiate a read request.
65    */
66   void pread(int fd, void* buf, size_t size, off_t start);
67   void pread(int fd, Range<unsigned char*> range, off_t start);
68   void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
69
70   /**
71    * Initiate a write request.
72    */
73   void pwrite(int fd, const void* buf, size_t size, off_t start);
74   void pwrite(int fd, Range<const unsigned char*> range, off_t start);
75   void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
76
77   /**
78    * Return the current operation state.
79    */
80   State state() const { return state_; }
81
82   /**
83    * Reset the operation for reuse.  It is an error to call reset() on
84    * an Op that is still pending.
85    */
86   void reset(NotificationCallback cb = NotificationCallback());
87
88   void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
89   const NotificationCallback& notificationCallback() const { return cb_; }
90
91   /**
92    * Retrieve the result of this operation.  Returns >=0 on success,
93    * -errno on failure (that is, using the Linux kernel error reporting
94    * conventions).  Use checkKernelError (folly/Exception.h) on the result to
95    * throw a std::system_error in case of error instead.
96    *
97    * It is an error to call this if the Op hasn't yet started or is still
98    * pending.
99    */
100   ssize_t result() const;
101
102  private:
103   void init();
104   void start();
105   void complete(ssize_t result);
106
107   NotificationCallback cb_;
108   iocb iocb_;
109   State state_;
110   ssize_t result_;
111 };
112
113 std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
114 std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
115
116 /**
117  * C++ interface around Linux Async IO.
118  */
119 class AsyncIO : private boost::noncopyable {
120  public:
121   typedef AsyncIOOp Op;
122
123   enum PollMode {
124     NOT_POLLABLE,
125     POLLABLE
126   };
127
128   /**
129    * Create an AsyncIO context capable of holding at most 'capacity' pending
130    * requests at the same time.  As requests complete, others can be scheduled,
131    * as long as this limit is not exceeded.
132    *
133    * Note: the maximum number of allowed concurrent requests is controlled
134    * by the fs.aio-max-nr sysctl, the default value is usually 64K.
135    *
136    * If pollMode is POLLABLE, pollFd() will return a file descriptor that
137    * can be passed to poll / epoll / select and will become readable when
138    * any IOs on this AsyncIO have completed.  If you do this, you must use
139    * pollCompleted() instead of wait() -- do not read from the pollFd()
140    * file descriptor directly.
141    */
142   explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
143   ~AsyncIO();
144
145   /**
146    * Wait for at least minRequests to complete.  Returns the requests that
147    * have completed; the returned range is valid until the next call to
148    * wait().  minRequests may be 0 to not block.
149    */
150   Range<Op**> wait(size_t minRequests);
151
152   /**
153    * Return the number of pending requests.
154    */
155   size_t pending() const { return pending_; }
156
157   /**
158    * Return the maximum number of requests that can be kept outstanding
159    * at any one time.
160    */
161   size_t capacity() const { return capacity_; }
162
163   /**
164    * If POLLABLE, return a file descriptor that can be passed to poll / epoll
165    * and will become readable when any async IO operations have completed.
166    * If NOT_POLLABLE, return -1.
167    */
168   int pollFd() const { return pollFd_; }
169
170   /**
171    * If POLLABLE, call instead of wait after the file descriptor returned
172    * by pollFd() became readable.  The returned range is valid until the next
173    * call to pollCompleted().
174    */
175   Range<Op**> pollCompleted();
176
177   /**
178    * Submit an op for execution.
179    */
180   void submit(Op* op);
181
182  private:
183   void initializeContext();
184   Range<Op**> doWait(size_t minRequests, size_t maxRequests);
185
186   io_context_t ctx_;
187   size_t pending_;
188   const size_t capacity_;
189   int pollFd_;
190   std::vector<Op*> completed_;
191 };
192
193 /**
194  * Wrapper around AsyncIO that allows you to schedule more requests than
195  * the AsyncIO's object capacity.  Other requests are queued and processed
196  * in a FIFO order.
197  */
198 class AsyncIOQueue {
199  public:
200   /**
201    * Create a queue, using the given AsyncIO object.
202    * The AsyncIO object may not be used by anything else until the
203    * queue is destroyed.
204    */
205   explicit AsyncIOQueue(AsyncIO* asyncIO);
206   ~AsyncIOQueue();
207
208   size_t queued() const { return queue_.size(); }
209
210   /**
211    * Submit an op to the AsyncIO queue.  The op will be queued until
212    * the AsyncIO object has room.
213    */
214   void submit(AsyncIOOp* op);
215
216   /**
217    * Submit a delayed op to the AsyncIO queue; this allows you to postpone
218    * creation of the Op (which may require allocating memory, etc) until
219    * the AsyncIO object has room.
220    */
221   typedef std::function<AsyncIOOp*()> OpFactory;
222   void submit(OpFactory op);
223  private:
224   void onCompleted(AsyncIOOp* op);
225   void maybeDequeue();
226
227   AsyncIO* asyncIO_;
228
229   std::deque<OpFactory> queue_;
230 };
231
232 }  // namespace folly
233
234 #endif /* FOLLY_IO_ASYNCIO_H_ */
235