Rework folly::AsyncIO interface to make it easier for other classes to use Op
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <cerrno>
20
21 #include <boost/intrusive/parent_from_member.hpp>
22 #include <glog/logging.h>
23
24 #include "folly/Exception.h"
25 #include "folly/Likely.h"
26 #include "folly/String.h"
27 #include "folly/eventfd.h"
28
29 namespace folly {
30
31 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
32   : cb_(std::move(cb)),
33     state_(UNINITIALIZED),
34     result_(-EINVAL) {
35   memset(&iocb_, 0, sizeof(iocb_));
36 }
37
38 void AsyncIOOp::reset(NotificationCallback cb) {
39   CHECK_NE(state_, PENDING);
40   cb_ = std::move(cb);
41   state_ = UNINITIALIZED;
42   result_ = -EINVAL;
43   memset(&iocb_, 0, sizeof(iocb_));
44 }
45
46 AsyncIOOp::~AsyncIOOp() {
47   CHECK_NE(state_, PENDING);
48 }
49
50 void AsyncIOOp::start() {
51   DCHECK_EQ(state_, INITIALIZED);
52   state_ = PENDING;
53 }
54
55 void AsyncIOOp::complete(ssize_t result) {
56   DCHECK_EQ(state_, PENDING);
57   state_ = COMPLETED;
58   result_ = result;
59   if (cb_) {
60     cb_(this);
61   }
62 }
63
64 ssize_t AsyncIOOp::result() const {
65   CHECK_EQ(state_, COMPLETED);
66   return result_;
67 }
68
69 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
70   init();
71   io_prep_pread(&iocb_, fd, buf, size, start);
72 }
73
74 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
75   pread(fd, range.begin(), range.size(), start);
76 }
77
78 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
79   init();
80   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
81 }
82
83 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
84   init();
85   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
86 }
87
88 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
89   pwrite(fd, range.begin(), range.size(), start);
90 }
91
92 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
93   init();
94   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
95 }
96
97 void AsyncIOOp::init() {
98   CHECK_EQ(state_, UNINITIALIZED);
99   state_ = INITIALIZED;
100 }
101
102 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
103   : ctx_(0),
104     pending_(0),
105     capacity_(capacity),
106     pollFd_(-1) {
107   CHECK_GT(capacity_, 0);
108   completed_.reserve(capacity_);
109   if (pollMode == POLLABLE) {
110     pollFd_ = eventfd(0, EFD_NONBLOCK);
111     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
112   }
113 }
114
115 AsyncIO::~AsyncIO() {
116   CHECK_EQ(pending_, 0);
117   if (ctx_) {
118     int rc = io_queue_release(ctx_);
119     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
120   }
121   if (pollFd_ != -1) {
122     CHECK_ERR(close(pollFd_));
123   }
124 }
125
126 void AsyncIO::initializeContext() {
127   if (!ctx_) {
128     int rc = io_queue_init(capacity_, &ctx_);
129     // returns negative errno
130     checkKernelError(rc, "AsyncIO: io_queue_init failed");
131     DCHECK(ctx_);
132   }
133 }
134
135 void AsyncIO::submit(Op* op) {
136   CHECK_EQ(op->state(), Op::INITIALIZED);
137   CHECK_LT(pending_, capacity_) << "too many pending requests";
138   initializeContext();  // on demand
139   iocb* cb = &op->iocb_;
140   cb->data = nullptr;  // unused
141   if (pollFd_ != -1) {
142     io_set_eventfd(cb, pollFd_);
143   }
144   int rc = io_submit(ctx_, 1, &cb);
145   checkKernelError(rc, "AsyncIO: io_submit failed");
146   DCHECK_EQ(rc, 1);
147   op->start();
148   ++pending_;
149 }
150
151 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
152   CHECK(ctx_);
153   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
154   return doWait(minRequests, pending_);
155 }
156
157 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
158   CHECK(ctx_);
159   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
160   uint64_t numEvents;
161   // This sets the eventFd counter to 0, see
162   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
163   ssize_t rc;
164   do {
165     rc = ::read(pollFd_, &numEvents, 8);
166   } while (rc == -1 && errno == EINTR);
167   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
168     return Range<Op**>();  // nothing completed
169   }
170   checkUnixError(rc, "AsyncIO: read from event fd failed");
171   DCHECK_EQ(rc, 8);
172
173   DCHECK_GT(numEvents, 0);
174   DCHECK_LE(numEvents, pending_);
175
176   // Don't reap more than numEvents, as we've just reset the counter to 0.
177   return doWait(numEvents, numEvents);
178 }
179
180 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
181   io_event events[pending_];
182   int count;
183   do {
184     // Wait forever
185     count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
186   } while (count == -EINTR);
187   checkKernelError(count, "AsyncIO: io_getevents failed");
188   DCHECK_GE(count, minRequests);  // the man page says so
189   DCHECK_LE(count, pending_);
190
191   completed_.clear();
192   if (count == 0) {
193     return folly::Range<Op**>();
194   }
195
196   for (size_t i = 0; i < count; ++i) {
197     DCHECK(events[i].obj);
198     Op* op = boost::intrusive::get_parent_from_member(
199         events[i].obj, &AsyncIOOp::iocb_);
200     --pending_;
201     op->complete(events[i].res);
202     completed_.push_back(op);
203   }
204
205   return folly::Range<Op**>(&completed_.front(), count);
206 }
207
208 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
209   : asyncIO_(asyncIO) {
210 }
211
212 AsyncIOQueue::~AsyncIOQueue() {
213   CHECK_EQ(asyncIO_->pending(), 0);
214 }
215
216 void AsyncIOQueue::submit(AsyncIOOp* op) {
217   submit([op]() { return op; });
218 }
219
220 void AsyncIOQueue::submit(OpFactory op) {
221   queue_.push_back(op);
222   maybeDequeue();
223 }
224
225 void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
226   maybeDequeue();
227 }
228
229 void AsyncIOQueue::maybeDequeue() {
230   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
231     auto& opFactory = queue_.front();
232     auto op = opFactory();
233     queue_.pop_front();
234
235     // Interpose our completion callback
236     auto& nextCb = op->notificationCallback();
237     op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
238       this->onCompleted(op);
239       if (nextCb) nextCb(op);
240     });
241
242     asyncIO_->submit(op);
243   }
244 }
245
246 }  // namespace folly
247