From a8b4b5ea1e090f4dff374aec509119e842297956 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Thu, 14 Feb 2013 15:26:26 -0800 Subject: [PATCH] Rework folly::AsyncIO interface to make it easier for other classes to use Op Summary: AsyncIOOp no longer requires derivation to be able to use callbacks; the callback is passed in. This makes composition easier (see AsyncIOQueue, added in this diff). Test Plan: async_io_test, test added Reviewed By: lucian@fb.com FB internal diff: D709648 --- folly/experimental/io/AsyncIO.cpp | 181 ++++++++++--------- folly/experimental/io/AsyncIO.h | 191 ++++++++++++--------- folly/experimental/io/test/AsyncIOTest.cpp | 81 ++++++++- 3 files changed, 290 insertions(+), 163 deletions(-) diff --git a/folly/experimental/io/AsyncIO.cpp b/folly/experimental/io/AsyncIO.cpp index 1b3b07cb..d064affc 100644 --- a/folly/experimental/io/AsyncIO.cpp +++ b/folly/experimental/io/AsyncIO.cpp @@ -18,6 +18,7 @@ #include +#include #include #include "folly/Exception.h" @@ -27,6 +28,77 @@ namespace folly { +AsyncIOOp::AsyncIOOp(NotificationCallback cb) + : cb_(std::move(cb)), + state_(UNINITIALIZED), + result_(-EINVAL) { + memset(&iocb_, 0, sizeof(iocb_)); +} + +void AsyncIOOp::reset(NotificationCallback cb) { + CHECK_NE(state_, PENDING); + cb_ = std::move(cb); + state_ = UNINITIALIZED; + result_ = -EINVAL; + memset(&iocb_, 0, sizeof(iocb_)); +} + +AsyncIOOp::~AsyncIOOp() { + CHECK_NE(state_, PENDING); +} + +void AsyncIOOp::start() { + DCHECK_EQ(state_, INITIALIZED); + state_ = PENDING; +} + +void AsyncIOOp::complete(ssize_t result) { + DCHECK_EQ(state_, PENDING); + state_ = COMPLETED; + result_ = result; + if (cb_) { + cb_(this); + } +} + +ssize_t AsyncIOOp::result() const { + CHECK_EQ(state_, COMPLETED); + return result_; +} + +void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) { + init(); + io_prep_pread(&iocb_, fd, buf, size, start); +} + +void AsyncIOOp::pread(int fd, Range range, off_t start) { + pread(fd, range.begin(), range.size(), start); +} + +void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) { + init(); + io_prep_preadv(&iocb_, fd, iov, iovcnt, start); +} + +void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) { + init(); + io_prep_pwrite(&iocb_, fd, const_cast(buf), size, start); +} + +void AsyncIOOp::pwrite(int fd, Range range, off_t start) { + pwrite(fd, range.begin(), range.size(), start); +} + +void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) { + init(); + io_prep_pwritev(&iocb_, fd, iov, iovcnt, start); +} + +void AsyncIOOp::init() { + CHECK_EQ(state_, UNINITIALIZED); + state_ = INITIALIZED; +} + AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : ctx_(0), pending_(0), @@ -51,43 +123,6 @@ AsyncIO::~AsyncIO() { } } -void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) { - iocb cb; - io_prep_pread(&cb, fd, buf, size, start); - submit(op, &cb); -} - -void AsyncIO::pread(Op* op, int fd, Range range, - off_t start) { - pread(op, fd, range.begin(), range.size(), start); -} - -void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt, - off_t start) { - iocb cb; - io_prep_preadv(&cb, fd, iov, iovcnt, start); - submit(op, &cb); -} - -void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size, - off_t start) { - iocb cb; - io_prep_pwrite(&cb, fd, const_cast(buf), size, start); - submit(op, &cb); -} - -void AsyncIO::pwrite(Op* op, int fd, Range range, - off_t start) { - pwrite(op, fd, range.begin(), range.size(), start); -} - -void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt, - off_t start) { - iocb cb; - io_prep_pwritev(&cb, fd, iov, iovcnt, start); - submit(op, &cb); -} - void AsyncIO::initializeContext() { if (!ctx_) { int rc = io_queue_init(capacity_, &ctx_); @@ -97,11 +132,12 @@ void AsyncIO::initializeContext() { } } -void AsyncIO::submit(Op* op, iocb* cb) { - CHECK_EQ(op->state(), Op::UNINITIALIZED); +void AsyncIO::submit(Op* op) { + CHECK_EQ(op->state(), Op::INITIALIZED); CHECK_LT(pending_, capacity_) << "too many pending requests"; initializeContext(); // on demand - cb->data = op; + iocb* cb = &op->iocb_; + cb->data = nullptr; // unused if (pollFd_ != -1) { io_set_eventfd(cb, pollFd_); } @@ -158,62 +194,53 @@ Range AsyncIO::doWait(size_t minRequests, size_t maxRequests) { } for (size_t i = 0; i < count; ++i) { - Op* op = static_cast(events[i].data); - DCHECK(op); + DCHECK(events[i].obj); + Op* op = boost::intrusive::get_parent_from_member( + events[i].obj, &AsyncIOOp::iocb_); + --pending_; op->complete(events[i].res); completed_.push_back(op); } - pending_ -= count; return folly::Range(&completed_.front(), count); } -AsyncIO::Op::Op() - : state_(UNINITIALIZED), - result_(-EINVAL) { +AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) + : asyncIO_(asyncIO) { } -void AsyncIO::Op::reset() { - CHECK_NE(state_, PENDING); - state_ = UNINITIALIZED; - result_ = -EINVAL; -} - -AsyncIO::Op::~Op() { - CHECK_NE(state_, PENDING); +AsyncIOQueue::~AsyncIOQueue() { + CHECK_EQ(asyncIO_->pending(), 0); } -void AsyncIO::Op::start() { - DCHECK_EQ(state_, UNINITIALIZED); - state_ = PENDING; +void AsyncIOQueue::submit(AsyncIOOp* op) { + submit([op]() { return op; }); } -void AsyncIO::Op::complete(ssize_t result) { - DCHECK_EQ(state_, PENDING); - state_ = COMPLETED; - result_ = result; - onCompleted(); +void AsyncIOQueue::submit(OpFactory op) { + queue_.push_back(op); + maybeDequeue(); } -void AsyncIO::Op::onCompleted() { } // default: do nothing - -ssize_t AsyncIO::Op::result() const { - CHECK_EQ(state_, COMPLETED); - return result_; +void AsyncIOQueue::onCompleted(AsyncIOOp* op) { + maybeDequeue(); } -CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { } - -CallbackOp::~CallbackOp() { } +void AsyncIOQueue::maybeDequeue() { + while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) { + auto& opFactory = queue_.front(); + auto op = opFactory(); + queue_.pop_front(); -CallbackOp* CallbackOp::make(Callback&& callback) { - // Ensure created on the heap - return new CallbackOp(std::move(callback)); -} + // Interpose our completion callback + auto& nextCb = op->notificationCallback(); + op->setNotificationCallback([this, nextCb](AsyncIOOp* op) { + this->onCompleted(op); + if (nextCb) nextCb(op); + }); -void CallbackOp::onCompleted() { - callback_(result()); - delete this; + asyncIO_->submit(op); + } } } // namespace folly diff --git a/folly/experimental/io/AsyncIO.h b/folly/experimental/io/AsyncIO.h index 18dec376..3c84ebb0 100644 --- a/folly/experimental/io/AsyncIO.h +++ b/folly/experimental/io/AsyncIO.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -33,11 +34,87 @@ namespace folly { +/** + * An AsyncIOOp represents a pending operation. You may set a notification + * callback or you may use this class's methods directly. + * + * The op must remain allocated until completion. + */ +class AsyncIOOp : private boost::noncopyable { + friend class AsyncIO; + public: + typedef std::function NotificationCallback; + + explicit AsyncIOOp(NotificationCallback cb = NotificationCallback()); + ~AsyncIOOp(); + + // There would be a cancel() method here if Linux AIO actually implemented + // it. But let's not get your hopes up. + + enum State { + UNINITIALIZED, + INITIALIZED, + PENDING, + COMPLETED + }; + + /** + * Initiate a read request. + */ + void pread(int fd, void* buf, size_t size, off_t start); + void pread(int fd, Range range, off_t start); + void preadv(int fd, const iovec* iov, int iovcnt, off_t start); + + /** + * Initiate a write request. + */ + void pwrite(int fd, const void* buf, size_t size, off_t start); + void pwrite(int fd, Range range, off_t start); + void pwritev(int fd, const iovec* iov, int iovcnt, off_t start); + + /** + * Return the current operation state. + */ + State state() const { return state_; } + + /** + * Reset the operation for reuse. It is an error to call reset() on + * an Op that is still pending. + */ + void reset(NotificationCallback cb = NotificationCallback()); + + void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); } + const NotificationCallback& notificationCallback() const { return cb_; } + + /** + * Retrieve the result of this operation. Returns >=0 on success, + * -errno on failure (that is, using the Linux kernel error reporting + * conventions). Use checkKernelError (folly/Exception.h) on the result to + * throw a std::system_error in case of error instead. + * + * It is an error to call this if the Op hasn't yet started or is still + * pending. + */ + ssize_t result() const; + + private: + void init(); + void start(); + void complete(ssize_t result); + + NotificationCallback cb_; + iocb iocb_; + State state_; + ssize_t result_; +}; + /** * C++ interface around Linux Async IO. */ class AsyncIO : private boost::noncopyable { public: + typedef AsyncIOOp Op; + enum PollMode { NOT_POLLABLE, POLLABLE @@ -60,74 +137,6 @@ class AsyncIO : private boost::noncopyable { explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE); ~AsyncIO(); - /** - * An Op represents a pending operation. You may inherit from Op (and - * override onCompleted) in order to be notified of completion (see - * CallbackOp below for an example), or you may use Op's methods directly. - * - * The Op must remain allocated until completion. - */ - class Op : private boost::noncopyable { - friend class AsyncIO; - public: - Op(); - virtual ~Op(); - - // There would be a cancel() method here if Linux AIO actually implemented - // it. But let's not get your hopes up. - - enum State { - UNINITIALIZED, - PENDING, - COMPLETED - }; - - /** - * Return the current operation state. - */ - State state() const { return state_; } - - /** - * Reset the operation for reuse. It is an error to call reset() on - * an Op that is still pending. - */ - void reset(); - - /** - * Retrieve the result of this operation. Returns >=0 on success, - * -errno on failure (that is, using the Linux kernel error reporting - * conventions). Use checkKernelError (folly/Exception.h) on the result to - * throw a std::system_error in case of error instead. - * - * It is an error to call this if the Op hasn't yet started or is still - * pending. - */ - ssize_t result() const; - - private: - void start(); - void complete(ssize_t result); - - virtual void onCompleted(); - - State state_; - ssize_t result_; - }; - - /** - * Initiate a read request. - */ - void pread(Op* op, int fd, void* buf, size_t size, off_t start); - void pread(Op* op, int fd, Range range, off_t start); - void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start); - - /** - * Initiate a write request. - */ - void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start); - void pwrite(Op* op, int fd, Range range, off_t start); - void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start); - /** * Wait for at least minRequests to complete. Returns the requests that * have completed; the returned range is valid until the next call to @@ -160,9 +169,13 @@ class AsyncIO : private boost::noncopyable { */ Range pollCompleted(); + /** + * Submit an op for execution. + */ + void submit(Op* op); + private: void initializeContext(); - void submit(Op* op, iocb* cb); Range doWait(size_t minRequests, size_t maxRequests); io_context_t ctx_; @@ -173,20 +186,42 @@ class AsyncIO : private boost::noncopyable { }; /** - * Implementation of AsyncIO::Op that calls a callback and then deletes - * itself. + * Wrapper around AsyncIO that allows you to schedule more requests than + * the AsyncIO's object capacity. Other requests are queued and processed + * in a FIFO order. */ -class CallbackOp : public AsyncIO::Op { +class AsyncIOQueue { public: - typedef std::function Callback; - static CallbackOp* make(Callback&& callback); + /** + * Create a queue, using the given AsyncIO object. + * The AsyncIO object may not be used by anything else until the + * queue is destroyed. + */ + explicit AsyncIOQueue(AsyncIO* asyncIO); + ~AsyncIOQueue(); + size_t queued() const { return queue_.size(); } + + /** + * Submit an op to the AsyncIO queue. The op will be queued until + * the AsyncIO object has room. + */ + void submit(AsyncIOOp* op); + + /** + * Submit a delayed op to the AsyncIO queue; this allows you to postpone + * creation of the Op (which may require allocating memory, etc) until + * the AsyncIO object has room. + */ + typedef std::function OpFactory; + void submit(OpFactory op); private: - explicit CallbackOp(Callback&& callback); - ~CallbackOp(); - void onCompleted() FOLLY_OVERRIDE; + void onCompleted(AsyncIOOp* op); + void maybeDequeue(); + + AsyncIO* asyncIO_; - Callback callback_; + std::deque queue_; }; } // namespace folly diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp index c8601309..a513913d 100644 --- a/folly/experimental/io/test/AsyncIOTest.cpp +++ b/folly/experimental/io/test/AsyncIOTest.cpp @@ -36,6 +36,7 @@ namespace fs = folly::fs; using folly::AsyncIO; +using folly::AsyncIOQueue; namespace { @@ -116,6 +117,14 @@ TemporaryFile::~TemporaryFile() { TemporaryFile tempFile(6 << 20); // 6MiB +typedef std::unique_ptr ManagedBuffer; +ManagedBuffer allocateAligned(size_t size) { + void* buf; + int rc = posix_memalign(&buf, 512, size); + CHECK_EQ(rc, 0) << strerror(rc); + return ManagedBuffer(reinterpret_cast(buf), free); +} + void testReadsSerially(const std::vector& specs, AsyncIO::PollMode pollMode) { AsyncIO aioReader(1, pollMode); @@ -127,8 +136,9 @@ void testReadsSerially(const std::vector& specs, }; for (int i = 0; i < specs.size(); i++) { - std::unique_ptr buf(new char[specs[i].size]); - aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start); + auto buf = allocateAligned(specs[i].size); + op.pread(fd, buf.get(), specs[i].size, specs[i].start); + aioReader.submit(&op); EXPECT_EQ(aioReader.pending(), 1); auto ops = readerWait(&aioReader); EXPECT_EQ(1, ops.size()); @@ -145,7 +155,7 @@ void testReadsParallel(const std::vector& specs, AsyncIO::PollMode pollMode) { AsyncIO aioReader(specs.size(), pollMode); std::unique_ptr ops(new AsyncIO::Op[specs.size()]); - std::vector> bufs(specs.size()); + std::vector bufs; int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); PCHECK(fd != -1); @@ -153,9 +163,9 @@ void testReadsParallel(const std::vector& specs, ::close(fd); }; for (int i = 0; i < specs.size(); i++) { - bufs[i].reset(new char[specs[i].size]); - aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size, - specs[i].start); + bufs.push_back(allocateAligned(specs[i].size)); + ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start); + aioReader.submit(&ops[i]); } std::vector pending(specs.size(), true); @@ -184,10 +194,63 @@ void testReadsParallel(const std::vector& specs, } } +void testReadsQueued(const std::vector& specs, + AsyncIO::PollMode pollMode) { + size_t readerCapacity = std::max(specs.size() / 2, size_t(1)); + AsyncIO aioReader(readerCapacity, pollMode); + AsyncIOQueue aioQueue(&aioReader); + std::unique_ptr ops(new AsyncIO::Op[specs.size()]); + std::vector bufs; + + int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); + PCHECK(fd != -1); + SCOPE_EXIT { + ::close(fd); + }; + for (int i = 0; i < specs.size(); i++) { + bufs.push_back(allocateAligned(specs[i].size)); + ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start); + aioQueue.submit(&ops[i]); + } + std::vector pending(specs.size(), true); + + size_t remaining = specs.size(); + while (remaining != 0) { + if (remaining >= readerCapacity) { + EXPECT_EQ(readerCapacity, aioReader.pending()); + EXPECT_EQ(remaining - readerCapacity, aioQueue.queued()); + } else { + EXPECT_EQ(remaining, aioReader.pending()); + EXPECT_EQ(0, aioQueue.queued()); + } + auto completed = readerWait(&aioReader); + size_t nrRead = completed.size(); + EXPECT_NE(nrRead, 0); + remaining -= nrRead; + + for (int i = 0; i < nrRead; i++) { + int id = completed[i] - ops.get(); + EXPECT_GE(id, 0); + EXPECT_LT(id, specs.size()); + EXPECT_TRUE(pending[id]); + pending[id] = false; + ssize_t res = ops[id].result(); + EXPECT_LE(0, res) << folly::errnoStr(-res); + EXPECT_EQ(specs[id].size, res); + } + } + EXPECT_EQ(aioReader.pending(), 0); + EXPECT_EQ(aioQueue.queued(), 0); + for (int i = 0; i < pending.size(); i++) { + EXPECT_FALSE(pending[i]); + } +} + void testReads(const std::vector& specs, AsyncIO::PollMode pollMode) { testReadsSerially(specs, pollMode); testReadsParallel(specs, pollMode); + testReadsQueued(specs, pollMode); } } // anonymous namespace @@ -275,8 +338,9 @@ TEST(AsyncIO, NonBlockingWait) { ::close(fd); }; size_t size = 1024; - std::unique_ptr buf(new char[size]); - aioReader.pread(&op, fd, buf.get(), size, 0); + auto buf = allocateAligned(size); + op.pread(fd, buf.get(), size, 0); + aioReader.submit(&op); EXPECT_EQ(aioReader.pending(), 1); folly::Range completed; @@ -293,3 +357,4 @@ TEST(AsyncIO, NonBlockingWait) { EXPECT_EQ(aioReader.pending(), 0); } + -- 2.34.1