AsyncIO: CHECK preconditions
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <cerrno>
20
21 #include <glog/logging.h>
22
23 #include "folly/Exception.h"
24 #include "folly/Likely.h"
25 #include "folly/String.h"
26 #include "folly/eventfd.h"
27
28 namespace folly {
29
30 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
31   : ctx_(0),
32     pending_(0),
33     capacity_(capacity),
34     pollFd_(-1) {
35   CHECK_GT(capacity_, 0);
36   completed_.reserve(capacity_);
37   if (pollMode == POLLABLE) {
38     pollFd_ = eventfd(0, EFD_NONBLOCK);
39     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
40   }
41 }
42
43 AsyncIO::~AsyncIO() {
44   CHECK_EQ(pending_, 0);
45   if (ctx_) {
46     int rc = io_queue_release(ctx_);
47     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
48   }
49   if (pollFd_ != -1) {
50     CHECK_ERR(close(pollFd_));
51   }
52 }
53
54 void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
55   iocb cb;
56   io_prep_pread(&cb, fd, buf, size, start);
57   submit(op, &cb);
58 }
59
60 void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
61                     off_t start) {
62   pread(op, fd, range.begin(), range.size(), start);
63 }
64
65 void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
66                      off_t start) {
67   iocb cb;
68   io_prep_preadv(&cb, fd, iov, iovcnt, start);
69   submit(op, &cb);
70 }
71
72 void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
73                      off_t start) {
74   iocb cb;
75   io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
76   submit(op, &cb);
77 }
78
79 void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
80                      off_t start) {
81   pwrite(op, fd, range.begin(), range.size(), start);
82 }
83
84 void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
85                       off_t start) {
86   iocb cb;
87   io_prep_pwritev(&cb, fd, iov, iovcnt, start);
88   submit(op, &cb);
89 }
90
91 void AsyncIO::initializeContext() {
92   if (!ctx_) {
93     int rc = io_queue_init(capacity_, &ctx_);
94     // returns negative errno
95     checkKernelError(rc, "AsyncIO: io_queue_init failed");
96     DCHECK(ctx_);
97   }
98 }
99
100 void AsyncIO::submit(Op* op, iocb* cb) {
101   CHECK_EQ(op->state(), Op::UNINITIALIZED);
102   CHECK_LT(pending_, capacity_) << "too many pending requests";
103   initializeContext();  // on demand
104   cb->data = op;
105   if (pollFd_ != -1) {
106     io_set_eventfd(cb, pollFd_);
107   }
108   int rc = io_submit(ctx_, 1, &cb);
109   checkKernelError(rc, "AsyncIO: io_submit failed");
110   DCHECK_EQ(rc, 1);
111   op->start();
112   ++pending_;
113 }
114
115 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
116   CHECK(ctx_);
117   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
118   return doWait(minRequests, pending_);
119 }
120
121 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
122   CHECK(ctx_);
123   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
124   uint64_t numEvents;
125   // This sets the eventFd counter to 0, see
126   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
127   ssize_t rc;
128   do {
129     rc = ::read(pollFd_, &numEvents, 8);
130   } while (rc == -1 && errno == EINTR);
131   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
132     return Range<Op**>();  // nothing completed
133   }
134   checkUnixError(rc, "AsyncIO: read from event fd failed");
135   DCHECK_EQ(rc, 8);
136
137   DCHECK_GT(numEvents, 0);
138   DCHECK_LE(numEvents, pending_);
139
140   // Don't reap more than numEvents, as we've just reset the counter to 0.
141   return doWait(numEvents, numEvents);
142 }
143
144 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
145   io_event events[pending_];
146   int count;
147   do {
148     // Wait forever
149     count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
150   } while (count == -EINTR);
151   checkKernelError(count, "AsyncIO: io_getevents failed");
152   DCHECK_GE(count, minRequests);  // the man page says so
153   DCHECK_LE(count, pending_);
154
155   completed_.clear();
156   if (count == 0) {
157     return folly::Range<Op**>();
158   }
159
160   for (size_t i = 0; i < count; ++i) {
161     Op* op = static_cast<Op*>(events[i].data);
162     DCHECK(op);
163     op->complete(events[i].res);
164     completed_.push_back(op);
165   }
166   pending_ -= count;
167
168   return folly::Range<Op**>(&completed_.front(), count);
169 }
170
171 AsyncIO::Op::Op()
172   : state_(UNINITIALIZED),
173     result_(-EINVAL) {
174 }
175
176 void AsyncIO::Op::reset() {
177   CHECK_NE(state_, PENDING);
178   state_ = UNINITIALIZED;
179   result_ = -EINVAL;
180 }
181
182 AsyncIO::Op::~Op() {
183   CHECK_NE(state_, PENDING);
184 }
185
186 void AsyncIO::Op::start() {
187   DCHECK_EQ(state_, UNINITIALIZED);
188   state_ = PENDING;
189 }
190
191 void AsyncIO::Op::complete(ssize_t result) {
192   DCHECK_EQ(state_, PENDING);
193   state_ = COMPLETED;
194   result_ = result;
195   onCompleted();
196 }
197
198 void AsyncIO::Op::onCompleted() { }  // default: do nothing
199
200 ssize_t AsyncIO::Op::result() const {
201   CHECK_EQ(state_, COMPLETED);
202   return result_;
203 }
204
205 CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
206
207 CallbackOp::~CallbackOp() { }
208
209 CallbackOp* CallbackOp::make(Callback&& callback) {
210   // Ensure created on the heap
211   return new CallbackOp(std::move(callback));
212 }
213
214 void CallbackOp::onCompleted() {
215   callback_(result());
216   delete this;
217 }
218
219 }  // namespace folly
220