#include <cerrno>
+#include <boost/intrusive/parent_from_member.hpp>
#include <glog/logging.h>
#include "folly/Exception.h"
namespace folly {
+AsyncIOOp::AsyncIOOp(NotificationCallback cb)  // cb may be empty
+ : cb_(std::move(cb)),
+ state_(UNINITIALIZED),
+ result_(-EINVAL) {  // placeholder; result() CHECKs COMPLETED before returning
+ memset(&iocb_, 0, sizeof(iocb_));  // blank until a pread/pwrite call fills it
+}
+
+void AsyncIOOp::reset(NotificationCallback cb) {  // back to the constructed state
+ CHECK_NE(state_, PENDING);  // resetting an op that is still pending is an error
+ cb_ = std::move(cb);  // replaces the previous callback, possibly with an empty one
+ state_ = UNINITIALIZED;
+ result_ = -EINVAL;
+ memset(&iocb_, 0, sizeof(iocb_));
+}
+
+AsyncIOOp::~AsyncIOOp() {
+ CHECK_NE(state_, PENDING);  // the op must stay allocated until it completes
+}
+
+void AsyncIOOp::start() {  // marks a prepared op as in flight
+ DCHECK_EQ(state_, INITIALIZED);  // a pread/pwrite call must have prepared it
+ state_ = PENDING;
+}
+
+void AsyncIOOp::complete(ssize_t result) {
+  // Only an op that has been started can complete.
+  DCHECK_EQ(state_, PENDING);
+
+  // Record the outcome before notifying, so the callback can call result().
+  result_ = result;
+  state_ = COMPLETED;
+
+  if (!cb_) {
+    return;  // no notification callback installed
+  }
+  cb_(this);
+}
+
+ssize_t AsyncIOOp::result() const {  // value stored by complete()
+ CHECK_EQ(state_, COMPLETED);  // not valid before the op has completed
+ return result_;
+}
+
+void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
+ init();  // CHECKs that the op is UNINITIALIZED, then marks it INITIALIZED
+ io_prep_pread(&iocb_, fd, buf, size, start);  // fills iocb_; nothing is submitted here
+}
+
+void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
+ pread(fd, range.begin(), range.size(), start);  // forwards to the (buf, size) overload
+}
+
+void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
+ init();  // CHECKs that the op is UNINITIALIZED, then marks it INITIALIZED
+ io_prep_preadv(&iocb_, fd, iov, iovcnt, start);  // vectored read; fills iocb_ only
+}
+
+void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
+ init();  // CHECKs that the op is UNINITIALIZED, then marks it INITIALIZED
+ io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);  // libaio takes void*
+}
+
+void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
+ pwrite(fd, range.begin(), range.size(), start);  // forwards to the (buf, size) overload
+}
+
+void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
+ init();  // CHECKs that the op is UNINITIALIZED, then marks it INITIALIZED
+ io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);  // vectored write; fills iocb_ only
+}
+
+void AsyncIOOp::init() {  // shared first step of every pread/pwrite variant
+ CHECK_EQ(state_, UNINITIALIZED);  // an op must be fresh or reset() before reuse
+ state_ = INITIALIZED;
+}
+
AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
: ctx_(0),
pending_(0),
}
}
-void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
- iocb cb;
- io_prep_pread(&cb, fd, buf, size, start);
- submit(op, &cb);
-}
-
-void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
- off_t start) {
- pread(op, fd, range.begin(), range.size(), start);
-}
-
-void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
- off_t start) {
- iocb cb;
- io_prep_preadv(&cb, fd, iov, iovcnt, start);
- submit(op, &cb);
-}
-
-void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
- off_t start) {
- iocb cb;
- io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
- submit(op, &cb);
-}
-
-void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
- off_t start) {
- pwrite(op, fd, range.begin(), range.size(), start);
-}
-
-void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
- off_t start) {
- iocb cb;
- io_prep_pwritev(&cb, fd, iov, iovcnt, start);
- submit(op, &cb);
-}
-
void AsyncIO::initializeContext() {
if (!ctx_) {
int rc = io_queue_init(capacity_, &ctx_);
}
}
-void AsyncIO::submit(Op* op, iocb* cb) {
- CHECK_EQ(op->state(), Op::UNINITIALIZED);
+void AsyncIO::submit(Op* op) {
+ CHECK_EQ(op->state(), Op::INITIALIZED);
CHECK_LT(pending_, capacity_) << "too many pending requests";
initializeContext(); // on demand
- cb->data = op;
+ iocb* cb = &op->iocb_;
+ cb->data = nullptr; // unused
if (pollFd_ != -1) {
io_set_eventfd(cb, pollFd_);
}
}
for (size_t i = 0; i < count; ++i) {
- Op* op = static_cast<Op*>(events[i].data);
- DCHECK(op);
+ DCHECK(events[i].obj);
+ Op* op = boost::intrusive::get_parent_from_member(
+ events[i].obj, &AsyncIOOp::iocb_);
+ --pending_;
op->complete(events[i].res);
completed_.push_back(op);
}
- pending_ -= count;
return folly::Range<Op**>(&completed_.front(), count);
}
-AsyncIO::Op::Op()
- : state_(UNINITIALIZED),
- result_(-EINVAL) {
+AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)  // only stores the pointer
+ : asyncIO_(asyncIO) {
}
-void AsyncIO::Op::reset() {
- CHECK_NE(state_, PENDING);
- state_ = UNINITIALIZED;
- result_ = -EINVAL;
-}
-
-AsyncIO::Op::~Op() {
- CHECK_NE(state_, PENDING);
+AsyncIOQueue::~AsyncIOQueue() {
+ CHECK_EQ(asyncIO_->pending(), 0);  // no op may still be in flight at destruction
}
-void AsyncIO::Op::start() {
- DCHECK_EQ(state_, UNINITIALIZED);
- state_ = PENDING;
+void AsyncIOQueue::submit(AsyncIOOp* op) {  // existing op: wrap it in a trivial factory
+ submit([op]() { return op; });  // resolves to the OpFactory overload
}
-void AsyncIO::Op::complete(ssize_t result) {
- DCHECK_EQ(state_, PENDING);
- state_ = COMPLETED;
- result_ = result;
- onCompleted();
+// Enqueues a deferred op. The factory is invoked from maybeDequeue() once the
+// AsyncIO has spare capacity, which may be immediately, during this call.
+void AsyncIOQueue::submit(OpFactory op) {
+  // `op` is a by-value sink parameter: move it into the queue rather than
+  // copying the std::function (and whatever it captured) a second time.
+  queue_.push_back(std::move(op));
+  maybeDequeue();
}
-void AsyncIO::Op::onCompleted() { } // default: do nothing
-
-ssize_t AsyncIO::Op::result() const {
- CHECK_EQ(state_, COMPLETED);
- return result_;
+void AsyncIOQueue::onCompleted(AsyncIOOp* op) {  // op itself is not used here
+ maybeDequeue();  // a completion may have made room for a queued op
}
-CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
-
-CallbackOp::~CallbackOp() { }
+void AsyncIOQueue::maybeDequeue() {  // submit queued ops while the AsyncIO has room
+ while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
+ auto& opFactory = queue_.front();
+ auto op = opFactory();  // the op is only produced now that there is room for it
+ queue_.pop_front();  // opFactory dangles from here on; it is not used again
-CallbackOp* CallbackOp::make(Callback&& callback) {
- // Ensure created on the heap
- return new CallbackOp(std::move(callback));
-}
+ // Interpose our completion callback
+ auto& nextCb = op->notificationCallback();  // the op's current callback, may be empty
+ op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {  // nextCb captured by copy
+ this->onCompleted(op);  // refill from the queue first
+ if (nextCb) nextCb(op);  // then run the callback the op had before interposition
+ });
-void CallbackOp::onCompleted() {
- callback_(result());
- delete this;
+ asyncIO_->submit(op);
+ }
}
} // namespace folly
#include <libaio.h>
#include <cstdint>
+#include <deque>
#include <functional>
#include <utility>
#include <vector>
namespace folly {
+/**
+ * An AsyncIOOp represents a pending operation. You may set a notification
+ * callback or you may use this class's methods directly.
+ *
+ * State transitions: UNINITIALIZED -> INITIALIZED (pread/pwrite family) ->
+ * PENDING (start) -> COMPLETED (complete). reset() returns an op that is
+ * not PENDING to UNINITIALIZED.
+ *
+ * The op must remain allocated until completion.
+ */
+class AsyncIOOp : private boost::noncopyable {
+ friend class AsyncIO;  // AsyncIO::submit reads iocb_ directly
+ public:
+ typedef std::function<void(AsyncIOOp*)> NotificationCallback;
+
+ explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
+ ~AsyncIOOp();
+
+ // There would be a cancel() method here if Linux AIO actually implemented
+ // it. But let's not get your hopes up.
+
+ enum State {
+ UNINITIALIZED,  // freshly constructed or reset(); no request prepared
+ INITIALIZED,  // request prepared by a pread/pwrite call
+ PENDING,  // start() has run; complete() has not
+ COMPLETED  // complete() has run; result() may be called
+ };
+
+ /**
+ * Prepare a read request: fills in this op's iocb and moves the op from
+ * UNINITIALIZED to INITIALIZED. Nothing is issued by these calls; pass
+ * the op to AsyncIO::submit() afterwards.
+ */
+ void pread(int fd, void* buf, size_t size, off_t start);
+ void pread(int fd, Range<unsigned char*> range, off_t start);
+ void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
+
+ /**
+ * Prepare a write request: fills in this op's iocb and moves the op from
+ * UNINITIALIZED to INITIALIZED. Nothing is issued by these calls; pass
+ * the op to AsyncIO::submit() afterwards.
+ */
+ void pwrite(int fd, const void* buf, size_t size, off_t start);
+ void pwrite(int fd, Range<const unsigned char*> range, off_t start);
+ void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
+
+ /**
+ * Return the current operation state.
+ */
+ State state() const { return state_; }
+
+ /**
+ * Reset the operation for reuse. It is an error to call reset() on
+ * an Op that is still pending. The notification callback is replaced by
+ * cb (by default an empty callback).
+ */
+ void reset(NotificationCallback cb = NotificationCallback());
+
+ void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
+ const NotificationCallback& notificationCallback() const { return cb_; }
+
+ /**
+ * Retrieve the result of this operation. Returns >=0 on success,
+ * -errno on failure (that is, using the Linux kernel error reporting
+ * conventions). Use checkKernelError (folly/Exception.h) on the result to
+ * throw a std::system_error in case of error instead.
+ *
+ * It is an error to call this if the Op hasn't yet started or is still
+ * pending.
+ */
+ ssize_t result() const;
+
+ private:
+ void init();  // CHECKs UNINITIALIZED, then marks the op INITIALIZED
+ void start();  // INITIALIZED -> PENDING
+ void complete(ssize_t result);  // PENDING -> COMPLETED; stores result, runs cb_
+
+ NotificationCallback cb_;  // may be empty
+ iocb iocb_;  // libaio control block filled in by the pread/pwrite family
+ State state_;
+ ssize_t result_;  // -EINVAL until complete() stores the real result
+};
+
/**
* C++ interface around Linux Async IO.
*/
class AsyncIO : private boost::noncopyable {
public:
+ typedef AsyncIOOp Op;
+
enum PollMode {
NOT_POLLABLE,
POLLABLE
explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
~AsyncIO();
- /**
- * An Op represents a pending operation. You may inherit from Op (and
- * override onCompleted) in order to be notified of completion (see
- * CallbackOp below for an example), or you may use Op's methods directly.
- *
- * The Op must remain allocated until completion.
- */
- class Op : private boost::noncopyable {
- friend class AsyncIO;
- public:
- Op();
- virtual ~Op();
-
- // There would be a cancel() method here if Linux AIO actually implemented
- // it. But let's not get your hopes up.
-
- enum State {
- UNINITIALIZED,
- PENDING,
- COMPLETED
- };
-
- /**
- * Return the current operation state.
- */
- State state() const { return state_; }
-
- /**
- * Reset the operation for reuse. It is an error to call reset() on
- * an Op that is still pending.
- */
- void reset();
-
- /**
- * Retrieve the result of this operation. Returns >=0 on success,
- * -errno on failure (that is, using the Linux kernel error reporting
- * conventions). Use checkKernelError (folly/Exception.h) on the result to
- * throw a std::system_error in case of error instead.
- *
- * It is an error to call this if the Op hasn't yet started or is still
- * pending.
- */
- ssize_t result() const;
-
- private:
- void start();
- void complete(ssize_t result);
-
- virtual void onCompleted();
-
- State state_;
- ssize_t result_;
- };
-
- /**
- * Initiate a read request.
- */
- void pread(Op* op, int fd, void* buf, size_t size, off_t start);
- void pread(Op* op, int fd, Range<unsigned char*> range, off_t start);
- void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
-
- /**
- * Initiate a write request.
- */
- void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start);
- void pwrite(Op* op, int fd, Range<const unsigned char*> range, off_t start);
- void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
-
/**
* Wait for at least minRequests to complete. Returns the requests that
* have completed; the returned range is valid until the next call to
*/
Range<Op**> pollCompleted();
+ /**
+ * Submit an op for execution.
+ */
+ void submit(Op* op);
+
private:
void initializeContext();
- void submit(Op* op, iocb* cb);
Range<Op**> doWait(size_t minRequests, size_t maxRequests);
io_context_t ctx_;
};
/**
- * Implementation of AsyncIO::Op that calls a callback and then deletes
- * itself.
+ * Wrapper around AsyncIO that allows you to schedule more requests than
+ * the AsyncIO object's capacity. Excess requests are queued and processed
+ * in FIFO order.
*/
-class CallbackOp : public AsyncIO::Op {
+class AsyncIOQueue {
public:
- typedef std::function<void(ssize_t)> Callback;
- static CallbackOp* make(Callback&& callback);
+ /**
+ * Create a queue, using the given AsyncIO object.
+ * The AsyncIO object may not be used by anything else until the
+ * queue is destroyed. The destructor CHECKs that the AsyncIO object
+ * has no pending ops.
+ */
+ explicit AsyncIOQueue(AsyncIO* asyncIO);
+ ~AsyncIOQueue();
+ size_t queued() const { return queue_.size(); }  // ops waiting, not yet submitted
+
+ /**
+ * Submit an op to the AsyncIO queue. The op will be queued until
+ * the AsyncIO object has room. When the op is handed to the AsyncIO
+ * object, its notification callback is wrapped: the queue is notified
+ * of completion first, then the op's previous callback (if any) runs.
+ */
+ void submit(AsyncIOOp* op);
+
+ /**
+ * Submit a delayed op to the AsyncIO queue; this allows you to postpone
+ * creation of the Op (which may require allocating memory, etc) until
+ * the AsyncIO object has room.
+ */
+ typedef std::function<AsyncIOOp*()> OpFactory;
+ void submit(OpFactory op);
private:
- explicit CallbackOp(Callback&& callback);
- ~CallbackOp();
- void onCompleted() FOLLY_OVERRIDE;
+ void onCompleted(AsyncIOOp* op);  // completion hook; calls maybeDequeue()
+ void maybeDequeue();  // submits queued ops while the AsyncIO has capacity
+
+ AsyncIO* asyncIO_;  // the AsyncIO object passed to the constructor
- Callback callback_;
+ std::deque<OpFactory> queue_;  // factories for ops not yet submitted, oldest first
};
} // namespace folly
namespace fs = folly::fs;
using folly::AsyncIO;
+using folly::AsyncIOQueue;
namespace {
TemporaryFile tempFile(6 << 20); // 6MiB
+typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
+// Allocates `size` bytes aligned to a 512-byte boundary and returns them in a
+// ManagedBuffer that releases the memory with free(). CHECK-fails if
+// posix_memalign reports an error.
+// NOTE(review): the alignment is presumably for the O_DIRECT reads in these
+// tests -- confirm 512 is sufficient for the target filesystem.
+ManagedBuffer allocateAligned(size_t size) {
+  void* buf = nullptr;  // never leave the pointer indeterminate
+  const int rc = posix_memalign(&buf, 512, size);
+  CHECK_EQ(rc, 0) << strerror(rc);  // rc is an errno value, not -1/errno
+  // void* -> char* is exactly what static_cast is for; no reinterpret_cast.
+  return ManagedBuffer(static_cast<char*>(buf), free);
+}
+
void testReadsSerially(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
AsyncIO aioReader(1, pollMode);
};
for (int i = 0; i < specs.size(); i++) {
- std::unique_ptr<char[]> buf(new char[specs[i].size]);
- aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
+ auto buf = allocateAligned(specs[i].size);
+ op.pread(fd, buf.get(), specs[i].size, specs[i].start);
+ aioReader.submit(&op);
EXPECT_EQ(aioReader.pending(), 1);
auto ops = readerWait(&aioReader);
EXPECT_EQ(1, ops.size());
AsyncIO::PollMode pollMode) {
AsyncIO aioReader(specs.size(), pollMode);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
- std::vector<std::unique_ptr<char[]>> bufs(specs.size());
+ std::vector<ManagedBuffer> bufs;
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
::close(fd);
};
for (int i = 0; i < specs.size(); i++) {
- bufs[i].reset(new char[specs[i].size]);
- aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
- specs[i].start);
+ bufs.push_back(allocateAligned(specs[i].size));
+ ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+ aioReader.submit(&ops[i]);
}
std::vector<bool> pending(specs.size(), true);
}
}
+void testReadsQueued(const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {  // reads every spec through a capacity-limited queue
+ size_t readerCapacity = std::max(specs.size() / 2, size_t(1));  // forces queuing
+ AsyncIO aioReader(readerCapacity, pollMode);
+ AsyncIOQueue aioQueue(&aioReader);
+ std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
+ std::vector<ManagedBuffer> bufs;
+
+ int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+ PCHECK(fd != -1);
+ SCOPE_EXIT {
+ ::close(fd);
+ };
+ for (int i = 0; i < specs.size(); i++) {
+ bufs.push_back(allocateAligned(specs[i].size));
+ ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+ aioQueue.submit(&ops[i]);  // goes to the AsyncIO if it has room, else waits in the queue
+ }
+ std::vector<bool> pending(specs.size(), true);  // pending[i]: op i has not completed yet
+
+ size_t remaining = specs.size();  // ops not yet seen as completed
+ while (remaining != 0) {
+ if (remaining >= readerCapacity) {  // enough work left to keep the AsyncIO full
+ EXPECT_EQ(readerCapacity, aioReader.pending());
+ EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
+ } else {  // tail: everything left is already in flight
+ EXPECT_EQ(remaining, aioReader.pending());
+ EXPECT_EQ(0, aioQueue.queued());
+ }
+ auto completed = readerWait(&aioReader);  // helper defined outside this view
+ size_t nrRead = completed.size();
+ EXPECT_NE(nrRead, 0);
+ remaining -= nrRead;
+
+ for (int i = 0; i < nrRead; i++) {
+ int id = completed[i] - ops.get();  // index of the completed op within ops
+ EXPECT_GE(id, 0);
+ EXPECT_LT(id, specs.size());
+ EXPECT_TRUE(pending[id]);  // each op must be reported exactly once
+ pending[id] = false;
+ ssize_t res = ops[id].result();
+ EXPECT_LE(0, res) << folly::errnoStr(-res);  // a negative result is -errno
+ EXPECT_EQ(specs[id].size, res);  // the whole requested range was read
+ }
+ }
+ EXPECT_EQ(aioReader.pending(), 0);
+ EXPECT_EQ(aioQueue.queued(), 0);
+ for (int i = 0; i < pending.size(); i++) {
+ EXPECT_FALSE(pending[i]);
+ }
+}
+
void testReads(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
testReadsSerially(specs, pollMode);
testReadsParallel(specs, pollMode);
+ testReadsQueued(specs, pollMode);
}
} // anonymous namespace
::close(fd);
};
size_t size = 1024;
- std::unique_ptr<char[]> buf(new char[size]);
- aioReader.pread(&op, fd, buf.get(), size, 0);
+ auto buf = allocateAligned(size);
+ op.pread(fd, buf.get(), size, 0);
+ aioReader.submit(&op);
EXPECT_EQ(aioReader.pending(), 1);
folly::Range<AsyncIO::Op**> completed;
EXPECT_EQ(aioReader.pending(), 0);
}
+