2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/experimental/io/AsyncIO.h"
21 #include <boost/intrusive/parent_from_member.hpp>
22 #include <glog/logging.h>
24 #include "folly/Exception.h"
25 #include "folly/Likely.h"
26 #include "folly/String.h"
27 #include "folly/eventfd.h"
31 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
33 state_(UNINITIALIZED),
35 memset(&iocb_, 0, sizeof(iocb_));
38 void AsyncIOOp::reset(NotificationCallback cb) {
39 CHECK_NE(state_, PENDING);
41 state_ = UNINITIALIZED;
43 memset(&iocb_, 0, sizeof(iocb_));
46 AsyncIOOp::~AsyncIOOp() {
47 CHECK_NE(state_, PENDING);
50 void AsyncIOOp::start() {
51 DCHECK_EQ(state_, INITIALIZED);
55 void AsyncIOOp::complete(ssize_t result) {
56 DCHECK_EQ(state_, PENDING);
64 ssize_t AsyncIOOp::result() const {
65 CHECK_EQ(state_, COMPLETED);
69 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
71 io_prep_pread(&iocb_, fd, buf, size, start);
74 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
75 pread(fd, range.begin(), range.size(), start);
78 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
80 io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
83 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
85 io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
88 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
89 pwrite(fd, range.begin(), range.size(), start);
92 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
94 io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
97 void AsyncIOOp::init() {
98 CHECK_EQ(state_, UNINITIALIZED);
102 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
107 CHECK_GT(capacity_, 0);
108 completed_.reserve(capacity_);
109 if (pollMode == POLLABLE) {
110 pollFd_ = eventfd(0, EFD_NONBLOCK);
111 checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
115 AsyncIO::~AsyncIO() {
116 CHECK_EQ(pending_, 0);
118 int rc = io_queue_release(ctx_);
119 CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
122 CHECK_ERR(close(pollFd_));
126 void AsyncIO::initializeContext() {
128 int rc = io_queue_init(capacity_, &ctx_);
129 // returns negative errno
130 checkKernelError(rc, "AsyncIO: io_queue_init failed");
135 void AsyncIO::submit(Op* op) {
136 CHECK_EQ(op->state(), Op::INITIALIZED);
137 CHECK_LT(pending_, capacity_) << "too many pending requests";
138 initializeContext(); // on demand
139 iocb* cb = &op->iocb_;
140 cb->data = nullptr; // unused
142 io_set_eventfd(cb, pollFd_);
144 int rc = io_submit(ctx_, 1, &cb);
145 checkKernelError(rc, "AsyncIO: io_submit failed");
151 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
153 CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
154 return doWait(minRequests, pending_);
157 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
159 CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
161 // This sets the eventFd counter to 0, see
162 // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
165 rc = ::read(pollFd_, &numEvents, 8);
166 } while (rc == -1 && errno == EINTR);
167 if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
168 return Range<Op**>(); // nothing completed
170 checkUnixError(rc, "AsyncIO: read from event fd failed");
173 DCHECK_GT(numEvents, 0);
174 DCHECK_LE(numEvents, pending_);
176 // Don't reap more than numEvents, as we've just reset the counter to 0.
177 return doWait(numEvents, numEvents);
180 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
181 io_event events[pending_];
185 count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
186 } while (count == -EINTR);
187 checkKernelError(count, "AsyncIO: io_getevents failed");
188 DCHECK_GE(count, minRequests); // the man page says so
189 DCHECK_LE(count, pending_);
193 return folly::Range<Op**>();
196 for (size_t i = 0; i < count; ++i) {
197 DCHECK(events[i].obj);
198 Op* op = boost::intrusive::get_parent_from_member(
199 events[i].obj, &AsyncIOOp::iocb_);
201 op->complete(events[i].res);
202 completed_.push_back(op);
205 return folly::Range<Op**>(&completed_.front(), count);
208 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
209 : asyncIO_(asyncIO) {
212 AsyncIOQueue::~AsyncIOQueue() {
213 CHECK_EQ(asyncIO_->pending(), 0);
216 void AsyncIOQueue::submit(AsyncIOOp* op) {
217 submit([op]() { return op; });
220 void AsyncIOQueue::submit(OpFactory op) {
221 queue_.push_back(op);
225 void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
229 void AsyncIOQueue::maybeDequeue() {
230 while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
231 auto& opFactory = queue_.front();
232 auto op = opFactory();
235 // Interpose our completion callback
236 auto& nextCb = op->notificationCallback();
237 op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
238 this->onCompleted(op);
239 if (nextCb) nextCb(op);
242 asyncIO_->submit(op);