2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/experimental/io/AsyncIO.h"
23 #include <boost/intrusive/parent_from_member.hpp>
24 #include <glog/logging.h>
26 #include "folly/Exception.h"
27 #include "folly/Format.h"
28 #include "folly/Likely.h"
29 #include "folly/String.h"
30 #include "folly/eventfd.h"
// Constructs an op in the UNINITIALIZED state with a zeroed iocb.
// NOTE(review): this copy is missing the `:` initializer that must precede
// `state_(...)` (presumably storing `cb` in the callback member), the rest
// of the initializer list, and the body's braces — `cb` is not visibly
// stored anywhere here. Restore from upstream.
34 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
36 state_(State::UNINITIALIZED),
// Zero the iocb so no stale fields leak into a later io_prep_* call.
38 memset(&iocb_, 0, sizeof(iocb_));
// Returns the op to the UNINITIALIZED state so it can be prepared again.
// Fatal (CHECK_NE) if called while the op is still PENDING.
// NOTE(review): `cb` is not visibly stored here, and the closing brace is
// missing; the assignment of the new callback (and possibly a reset of the
// stored result) appears to have been lost from this copy — confirm.
41 void AsyncIOOp::reset(NotificationCallback cb) {
42 CHECK_NE(state_, State::PENDING);
44 state_ = State::UNINITIALIZED;
46 memset(&iocb_, 0, sizeof(iocb_));
49 AsyncIOOp::~AsyncIOOp() {
50 CHECK_NE(state_, State::PENDING);
53 void AsyncIOOp::start() {
54 DCHECK_EQ(state_, State::INITIALIZED);
55 state_ = State::PENDING;
// Marks a PENDING op as COMPLETED; AsyncIO::doWait() calls this with the
// io_event `res` value for the op.
// NOTE(review): in this copy `result` is not visibly stored (operator<<
// prints `op.result_`) and no notification callback is visibly invoked,
// although AsyncIOQueue interposes a completion callback that depends on
// it. These statements and the closing brace appear to have been lost;
// restore from upstream.
58 void AsyncIOOp::complete(ssize_t result) {
59 DCHECK_EQ(state_, State::PENDING);
60 state_ = State::COMPLETED;
67 ssize_t AsyncIOOp::result() const {
68 CHECK_EQ(state_, State::COMPLETED);
72 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
74 io_prep_pread(&iocb_, fd, buf, size, start);
77 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
78 pread(fd, range.begin(), range.size(), start);
81 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
83 io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
86 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
88 io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
91 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
92 pwrite(fd, range.begin(), range.size(), start);
95 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
97 io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
100 void AsyncIOOp::init() {
101 CHECK_EQ(state_, State::UNINITIALIZED);
102 state_ = State::INITIALIZED;
// Creates an AsyncIO that admits at most `capacity` in-flight requests
// (capacity must be > 0). In POLLABLE mode a non-blocking eventfd is created
// and stored in pollFd_; submit() hands it to io_set_eventfd().
// NOTE(review): the member-initializer list is missing from this copy
// (presumably setting capacity_ from `capacity` and pollFd_ to -1, the value
// wait() checks for non-pollable mode), as are the closing braces.
105 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
110 CHECK_GT(capacity_, 0);
// doWait() returns a Range pointing into completed_, so reserving up front
// presumably keeps push_back() from reallocating under that Range.
111 completed_.reserve(capacity_);
112 if (pollMode == POLLABLE) {
113 pollFd_ = eventfd(0, EFD_NONBLOCK);
114 checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
// Requires that no requests are pending. Releases the kernel AIO context
// and closes the eventfd.
// NOTE(review): the guards around these releases appear to be missing from
// this copy, as do the closing braces. Presumably they only release a
// context that was actually created and only close pollFd_ when it is not -1.
// As shown, close(-1) would run in non-pollable mode. Confirm against
// upstream.
118 AsyncIO::~AsyncIO() {
119 CHECK_EQ(pending_, 0);
// io_queue_release() reports failure as a negative errno, hence -rc below.
121 int rc = io_queue_release(ctx_);
122 CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
125 CHECK_ERR(close(pollFd_));
// Creates the kernel AIO context, sized to capacity_.
// NOTE(review): submit() calls this "on demand" for every request, but the
// guard that should skip creation when ctx_ already exists is missing from
// this copy, as is the closing brace. Without the guard, a new context
// would be created on each submit. Confirm against upstream.
129 void AsyncIO::initializeContext() {
131 int rc = io_queue_init(capacity_, &ctx_);
132 // returns negative errno
133 checkKernelError(rc, "AsyncIO: io_queue_init failed");
// Hands an INITIALIZED op to the kernel with io_submit(). Enforces the
// capacity limit and creates the AIO context on first use. In pollable mode
// it attaches pollFd_ as the completion eventfd.
// NOTE(review): this copy is missing two things. First, the guard around
// io_set_eventfd(), presumably `if (pollFd_ != -1)`. Second, the post-submit
// bookkeeping and closing brace: nothing visible increments pending_ or
// calls op->start(), yet both are needed for the state machine and capacity
// checks. Restore from upstream.
138 void AsyncIO::submit(Op* op) {
139 CHECK_EQ(op->state(), Op::State::INITIALIZED);
140 CHECK_LT(pending_, capacity_) << "too many pending requests";
141 initializeContext(); // on demand
142 iocb* cb = &op->iocb_;
// doWait() recovers the op from io_event::obj via get_parent_from_member,
// so the iocb's user-data slot is not needed.
143 cb->data = nullptr; // unused
145 io_set_eventfd(cb, pollFd_);
147 int rc = io_submit(ctx_, 1, &cb);
148 checkKernelError(rc, "AsyncIO: io_submit failed");
// Blocks until at least `minRequests` requests complete, reaping up to all
// pending ones, and returns the newly completed ops. Only allowed in
// non-pollable mode; pollable objects use pollCompleted().
// NOTE(review): one statement preceding the CHECK_EQ, and the closing
// brace, are missing from this copy.
154 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
156 CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
157 return doWait(minRequests, pending_);
// Non-blocking reap for pollable mode. It reads (and thereby resets) the
// eventfd counter, then reaps exactly that many completions. Returns an
// empty range when the eventfd has nothing to read (EAGAIN).
// NOTE(review): the declarations of `numEvents` and `rc`, the opening
// `do {` of the EINTR retry loop, and the closing braces are missing from
// this copy.
160 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
162 CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
164 // This sets the eventFd counter to 0, see
165 // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
// eventfd reads must supply an 8-byte buffer.
168 rc = ::read(pollFd_, &numEvents, 8);
169 } while (rc == -1 && errno == EINTR);
170 if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
171 return Range<Op**>(); // nothing completed
173 checkUnixError(rc, "AsyncIO: read from event fd failed");
176 DCHECK_GT(numEvents, 0);
177 DCHECK_LE(numEvents, pending_);
179 // Don't reap more than numEvents, as we've just reset the counter to 0.
180 return doWait(numEvents, numEvents);
// Reaps between minRequests and maxRequests completions with io_getevents(),
// retrying on -EINTR with no timeout. Each event is mapped back to its
// AsyncIOOp and marked COMPLETED through op->complete(). Returns a Range over
// the first `count` entries of completed_.
// NOTE(review): `io_event events[pending_]` is a variable-length array. That
// is a GNU extension, not standard C++, and the array is zero-length when
// pending_ == 0.
// NOTE(review): this copy is missing several pieces: the declaration of
// `count`, the `do {`, the condition guarding the empty-range return, any
// pending_ bookkeeping, and the closing braces. The final Range covers the
// first `count` elements of completed_, so completed_ must be cleared before
// the loop, presumably in the missing lines.
183 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
184 io_event events[pending_];
188 count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
189 } while (count == -EINTR);
190 checkKernelError(count, "AsyncIO: io_getevents failed");
191 DCHECK_GE(count, minRequests); // the man page says so
192 DCHECK_LE(count, pending_);
196 return folly::Range<Op**>();
199 for (size_t i = 0; i < count; ++i) {
200 DCHECK(events[i].obj);
// events[i].obj points at the op's embedded iocb_; recover the enclosing op.
201 Op* op = boost::intrusive::get_parent_from_member(
202 events[i].obj, &AsyncIOOp::iocb_);
204 op->complete(events[i].res);
205 completed_.push_back(op);
208 return folly::Range<Op**>(&completed_.front(), count);
211 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
212 : asyncIO_(asyncIO) {
215 AsyncIOQueue::~AsyncIOQueue() {
216 CHECK_EQ(asyncIO_->pending(), 0);
219 void AsyncIOQueue::submit(AsyncIOOp* op) {
220 submit([op]() { return op; });
223 void AsyncIOQueue::submit(OpFactory op) {
224 queue_.push_back(op);
228 void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
// Submits queued ops while the AsyncIO has spare capacity. Each op's
// notification callback is wrapped so that this queue's onCompleted() runs
// first, followed by the op's previous callback (if any).
// NOTE(review): removal of the front factory from queue_ appears to be
// missing from this copy (presumably queue_.pop_front() after invoking it),
// as do the closing braces. Without it the loop keeps reusing the same
// factory.
232 void AsyncIOQueue::maybeDequeue() {
233 while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
234 auto& opFactory = queue_.front();
235 auto op = opFactory();
238 // Interpose our completion callback
// nextCb presumably refers to the op's stored callback. The lambda captures
// it by value, so the previous callback is preserved even after
// setNotificationCallback() replaces the stored one.
239 auto& nextCb = op->notificationCallback();
240 op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
241 this->onCompleted(op);
242 if (nextCb) nextCb(op);
245 asyncIO_->submit(op);
249 // debugging helpers:
// X(c) expands to a `case` label that returns the enumerator's spelling
// (#c). It is used by the *ToString helpers that follow.
// NOTE(review): the `namespace {` matching "} // anonymous namespace"
// further down appears to be missing from this copy.
253 #define X(c) case c: return #c
255 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
257 X(AsyncIOOp::State::UNINITIALIZED);
258 X(AsyncIOOp::State::INITIALIZED);
259 X(AsyncIOOp::State::PENDING);
260 X(AsyncIOOp::State::COMPLETED);
262 return "<INVALID AsyncIOOp::State>";
// Maps an iocb opcode (stored in the iocb as a short) to its io_iocb_cmd
// enumerator name for debug output.
// NOTE(review): the switch over `cmd` with its X(...) cases, and the closing
// brace, are missing from this copy. As shown, every opcode yields
// "<INVALID io_iocb_cmd>".
265 const char* iocbCmdToString(short int cmd_short) {
266 io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
277 return "<INVALID io_iocb_cmd>";
282 std::string fd2name(int fd) {
283 std::string path = folly::to<std::string>("/proc/self/fd/", fd);
285 const ssize_t length =
286 std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
287 return path.assign(link, length);
// Debug printer for a raw iocb. It prints the common fields (including the
// file path from fd2name()), then per-opcode details. The decoded branch
// prints single-buffer fields (buf/nbytes/offset); other opcodes print a TODO.
// NOTE(review): the statement head (presumably `os << folly::format(`), the
// case labels of the switch, and the closing braces are missing from this
// copy.
290 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
292 "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
293 cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
294 cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
296 switch (cb.aio_lio_opcode) {
// NOTE(review): bug — the format string says "buf={}, off={}, size={}" but
// the arguments are (buf, nbytes, offset), so off and size are printed
// swapped. The arguments should be (buf, offset, nbytes).
299 os << folly::format("buf={}, off={}, size={}, ",
300 cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
302 os << "[TODO: write debug string for "
303 << iocbCmdToString(cb.aio_lio_opcode) << "] ";
309 } // anonymous namespace
// Debug printer for an op. It prints "{" and the state first. Once the op
// is past UNINITIALIZED it presumably prints the iocb, and for COMPLETED ops
// it prints result_.
// NOTE(review): the body of the first `if`, the closing braces, and the
// final output/return lines are missing from this copy.
311 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
312 os << "{" << op.state_ << ", ";
314 if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
318 if (op.state_ == AsyncIOOp::State::COMPLETED) {
319 os << "result=" << op.result_ << ", ";
325 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
326 return os << asyncIoOpStateToString(state);