Rework folly::AsyncIO interface to make it easier for other classes to use Op
authorTudor Bosman <tudorb@fb.com>
Thu, 14 Feb 2013 23:26:26 +0000 (15:26 -0800)
committerJordan DeLong <jdelong@fb.com>
Tue, 19 Mar 2013 00:07:33 +0000 (17:07 -0700)
Summary:
AsyncIOOp no longer requires derivation to be able to use callbacks; the
callback is passed in.  This makes composition easier (see AsyncIOQueue, added
in this diff).

Test Plan: async_io_test, test added

Reviewed By: lucian@fb.com

FB internal diff: D709648

folly/experimental/io/AsyncIO.cpp
folly/experimental/io/AsyncIO.h
folly/experimental/io/test/AsyncIOTest.cpp

index 1b3b07cb8f54826af0a51fd5717f7d0889977784..d064affc9176796e861bd1afecd3b08077704693 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <cerrno>
 
+#include <boost/intrusive/parent_from_member.hpp>
 #include <glog/logging.h>
 
 #include "folly/Exception.h"
 
 namespace folly {
 
+AsyncIOOp::AsyncIOOp(NotificationCallback cb)
+  : cb_(std::move(cb)),
+    state_(UNINITIALIZED),
+    result_(-EINVAL) {
+  memset(&iocb_, 0, sizeof(iocb_));
+}
+
+void AsyncIOOp::reset(NotificationCallback cb) {
+  CHECK_NE(state_, PENDING);
+  cb_ = std::move(cb);
+  state_ = UNINITIALIZED;
+  result_ = -EINVAL;
+  memset(&iocb_, 0, sizeof(iocb_));
+}
+
+AsyncIOOp::~AsyncIOOp() {
+  CHECK_NE(state_, PENDING);
+}
+
+void AsyncIOOp::start() {
+  DCHECK_EQ(state_, INITIALIZED);
+  state_ = PENDING;
+}
+
+void AsyncIOOp::complete(ssize_t result) {
+  DCHECK_EQ(state_, PENDING);
+  state_ = COMPLETED;
+  result_ = result;
+  if (cb_) {
+    cb_(this);
+  }
+}
+
+ssize_t AsyncIOOp::result() const {
+  CHECK_EQ(state_, COMPLETED);
+  return result_;
+}
+
+void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
+  init();
+  io_prep_pread(&iocb_, fd, buf, size, start);
+}
+
+void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
+  pread(fd, range.begin(), range.size(), start);
+}
+
+void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
+  init();
+  io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
+}
+
+void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
+  init();
+  io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
+}
+
+void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
+  pwrite(fd, range.begin(), range.size(), start);
+}
+
+void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
+  init();
+  io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
+}
+
+void AsyncIOOp::init() {
+  CHECK_EQ(state_, UNINITIALIZED);
+  state_ = INITIALIZED;
+}
+
 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
   : ctx_(0),
     pending_(0),
@@ -51,43 +123,6 @@ AsyncIO::~AsyncIO() {
   }
 }
 
-void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
-  iocb cb;
-  io_prep_pread(&cb, fd, buf, size, start);
-  submit(op, &cb);
-}
-
-void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
-                    off_t start) {
-  pread(op, fd, range.begin(), range.size(), start);
-}
-
-void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
-                     off_t start) {
-  iocb cb;
-  io_prep_preadv(&cb, fd, iov, iovcnt, start);
-  submit(op, &cb);
-}
-
-void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
-                     off_t start) {
-  iocb cb;
-  io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
-  submit(op, &cb);
-}
-
-void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
-                     off_t start) {
-  pwrite(op, fd, range.begin(), range.size(), start);
-}
-
-void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
-                      off_t start) {
-  iocb cb;
-  io_prep_pwritev(&cb, fd, iov, iovcnt, start);
-  submit(op, &cb);
-}
-
 void AsyncIO::initializeContext() {
   if (!ctx_) {
     int rc = io_queue_init(capacity_, &ctx_);
@@ -97,11 +132,12 @@ void AsyncIO::initializeContext() {
   }
 }
 
-void AsyncIO::submit(Op* op, iocb* cb) {
-  CHECK_EQ(op->state(), Op::UNINITIALIZED);
+void AsyncIO::submit(Op* op) {
+  CHECK_EQ(op->state(), Op::INITIALIZED);
   CHECK_LT(pending_, capacity_) << "too many pending requests";
   initializeContext();  // on demand
-  cb->data = op;
+  iocb* cb = &op->iocb_;
+  cb->data = nullptr;  // unused
   if (pollFd_ != -1) {
     io_set_eventfd(cb, pollFd_);
   }
@@ -158,62 +194,53 @@ Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
   }
 
   for (size_t i = 0; i < count; ++i) {
-    Op* op = static_cast<Op*>(events[i].data);
-    DCHECK(op);
+    DCHECK(events[i].obj);
+    Op* op = boost::intrusive::get_parent_from_member(
+        events[i].obj, &AsyncIOOp::iocb_);
+    --pending_;
     op->complete(events[i].res);
     completed_.push_back(op);
   }
-  pending_ -= count;
 
   return folly::Range<Op**>(&completed_.front(), count);
 }
 
-AsyncIO::Op::Op()
-  : state_(UNINITIALIZED),
-    result_(-EINVAL) {
+AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
+  : asyncIO_(asyncIO) {
 }
 
-void AsyncIO::Op::reset() {
-  CHECK_NE(state_, PENDING);
-  state_ = UNINITIALIZED;
-  result_ = -EINVAL;
-}
-
-AsyncIO::Op::~Op() {
-  CHECK_NE(state_, PENDING);
+AsyncIOQueue::~AsyncIOQueue() {
+  CHECK_EQ(asyncIO_->pending(), 0);
 }
 
-void AsyncIO::Op::start() {
-  DCHECK_EQ(state_, UNINITIALIZED);
-  state_ = PENDING;
+void AsyncIOQueue::submit(AsyncIOOp* op) {
+  submit([op]() { return op; });
 }
 
-void AsyncIO::Op::complete(ssize_t result) {
-  DCHECK_EQ(state_, PENDING);
-  state_ = COMPLETED;
-  result_ = result;
-  onCompleted();
+void AsyncIOQueue::submit(OpFactory op) {
+  queue_.push_back(op);
+  maybeDequeue();
 }
 
-void AsyncIO::Op::onCompleted() { }  // default: do nothing
-
-ssize_t AsyncIO::Op::result() const {
-  CHECK_EQ(state_, COMPLETED);
-  return result_;
+void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
+  maybeDequeue();
 }
 
-CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
-
-CallbackOp::~CallbackOp() { }
+void AsyncIOQueue::maybeDequeue() {
+  while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
+    auto& opFactory = queue_.front();
+    auto op = opFactory();
+    queue_.pop_front();
 
-CallbackOp* CallbackOp::make(Callback&& callback) {
-  // Ensure created on the heap
-  return new CallbackOp(std::move(callback));
-}
+    // Interpose our completion callback
+    auto& nextCb = op->notificationCallback();
+    op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
+      this->onCompleted(op);
+      if (nextCb) nextCb(op);
+    });
 
-void CallbackOp::onCompleted() {
-  callback_(result());
-  delete this;
+    asyncIO_->submit(op);
+  }
 }
 
 }  // namespace folly
index 18dec376a6bdb1ab91e42342e9891a88372858d7..3c84ebb0c4f2be331669ffb88b379fdfb8fbbfc3 100644 (file)
@@ -22,6 +22,7 @@
 #include <libaio.h>
 
 #include <cstdint>
+#include <deque>
 #include <functional>
 #include <utility>
 #include <vector>
 
 namespace folly {
 
+/**
+ * An AsyncIOOp represents a pending operation.  You may set a notification
+ * callback or you may use this class's methods directly.
+ *
+ * The op must remain allocated until completion.
+ */
+class AsyncIOOp : private boost::noncopyable {
+  friend class AsyncIO;
+ public:
+  typedef std::function<void(AsyncIOOp*)> NotificationCallback;
+
+  explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
+  ~AsyncIOOp();
+
+  // There would be a cancel() method here if Linux AIO actually implemented
+  // it.  But let's not get your hopes up.
+
+  enum State {
+    UNINITIALIZED,
+    INITIALIZED,
+    PENDING,
+    COMPLETED
+  };
+
+  /**
+   * Initiate a read request.
+   */
+  void pread(int fd, void* buf, size_t size, off_t start);
+  void pread(int fd, Range<unsigned char*> range, off_t start);
+  void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
+
+  /**
+   * Initiate a write request.
+   */
+  void pwrite(int fd, const void* buf, size_t size, off_t start);
+  void pwrite(int fd, Range<const unsigned char*> range, off_t start);
+  void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
+
+  /**
+   * Return the current operation state.
+   */
+  State state() const { return state_; }
+
+  /**
+   * Reset the operation for reuse.  It is an error to call reset() on
+   * an Op that is still pending.
+   */
+  void reset(NotificationCallback cb = NotificationCallback());
+
+  void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
+  const NotificationCallback& notificationCallback() const { return cb_; }
+
+  /**
+   * Retrieve the result of this operation.  Returns >=0 on success,
+   * -errno on failure (that is, using the Linux kernel error reporting
+   * conventions).  Use checkKernelError (folly/Exception.h) on the result to
+   * throw a std::system_error in case of error instead.
+   *
+   * It is an error to call this if the Op hasn't yet started or is still
+   * pending.
+   */
+  ssize_t result() const;
+
+ private:
+  void init();
+  void start();
+  void complete(ssize_t result);
+
+  NotificationCallback cb_;
+  iocb iocb_;
+  State state_;
+  ssize_t result_;
+};
+
 /**
  * C++ interface around Linux Async IO.
  */
 class AsyncIO : private boost::noncopyable {
  public:
+  typedef AsyncIOOp Op;
+
   enum PollMode {
     NOT_POLLABLE,
     POLLABLE
@@ -60,74 +137,6 @@ class AsyncIO : private boost::noncopyable {
   explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
   ~AsyncIO();
 
-  /**
-   * An Op represents a pending operation.  You may inherit from Op (and
-   * override onCompleted) in order to be notified of completion (see
-   * CallbackOp below for an example), or you may use Op's methods directly.
-   *
-   * The Op must remain allocated until completion.
-   */
-  class Op : private boost::noncopyable {
-    friend class AsyncIO;
-   public:
-    Op();
-    virtual ~Op();
-
-    // There would be a cancel() method here if Linux AIO actually implemented
-    // it.  But let's not get your hopes up.
-
-    enum State {
-      UNINITIALIZED,
-      PENDING,
-      COMPLETED
-    };
-
-    /**
-     * Return the current operation state.
-     */
-    State state() const { return state_; }
-
-    /**
-     * Reset the operation for reuse.  It is an error to call reset() on
-     * an Op that is still pending.
-     */
-    void reset();
-
-    /**
-     * Retrieve the result of this operation.  Returns >=0 on success,
-     * -errno on failure (that is, using the Linux kernel error reporting
-     * conventions).  Use checkKernelError (folly/Exception.h) on the result to
-     * throw a std::system_error in case of error instead.
-     *
-     * It is an error to call this if the Op hasn't yet started or is still
-     * pending.
-     */
-    ssize_t result() const;
-
-   private:
-    void start();
-    void complete(ssize_t result);
-
-    virtual void onCompleted();
-
-    State state_;
-    ssize_t result_;
-  };
-
-  /**
-   * Initiate a read request.
-   */
-  void pread(Op* op, int fd, void* buf, size_t size, off_t start);
-  void pread(Op* op, int fd, Range<unsigned char*> range, off_t start);
-  void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
-
-  /**
-   * Initiate a write request.
-   */
-  void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start);
-  void pwrite(Op* op, int fd, Range<const unsigned char*> range, off_t start);
-  void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
-
   /**
    * Wait for at least minRequests to complete.  Returns the requests that
    * have completed; the returned range is valid until the next call to
@@ -160,9 +169,13 @@ class AsyncIO : private boost::noncopyable {
    */
   Range<Op**> pollCompleted();
 
+  /**
+   * Submit an op for execution.
+   */
+  void submit(Op* op);
+
  private:
   void initializeContext();
-  void submit(Op* op, iocb* cb);
   Range<Op**> doWait(size_t minRequests, size_t maxRequests);
 
   io_context_t ctx_;
@@ -173,20 +186,42 @@ class AsyncIO : private boost::noncopyable {
 };
 
 /**
- * Implementation of AsyncIO::Op that calls a callback and then deletes
- * itself.
+ * Wrapper around AsyncIO that allows you to schedule more requests than
+ * the AsyncIO's object capacity.  Other requests are queued and processed
+ * in a FIFO order.
  */
-class CallbackOp : public AsyncIO::Op {
+class AsyncIOQueue {
  public:
-  typedef std::function<void(ssize_t)> Callback;
-  static CallbackOp* make(Callback&& callback);
+  /**
+   * Create a queue, using the given AsyncIO object.
+   * The AsyncIO object may not be used by anything else until the
+   * queue is destroyed.
+   */
+  explicit AsyncIOQueue(AsyncIO* asyncIO);
+  ~AsyncIOQueue();
 
+  size_t queued() const { return queue_.size(); }
+
+  /**
+   * Submit an op to the AsyncIO queue.  The op will be queued until
+   * the AsyncIO object has room.
+   */
+  void submit(AsyncIOOp* op);
+
+  /**
+   * Submit a delayed op to the AsyncIO queue; this allows you to postpone
+   * creation of the Op (which may require allocating memory, etc) until
+   * the AsyncIO object has room.
+   */
+  typedef std::function<AsyncIOOp*()> OpFactory;
+  void submit(OpFactory op);
  private:
-  explicit CallbackOp(Callback&& callback);
-  ~CallbackOp();
-  void onCompleted() FOLLY_OVERRIDE;
+  void onCompleted(AsyncIOOp* op);
+  void maybeDequeue();
+
+  AsyncIO* asyncIO_;
 
-  Callback callback_;
+  std::deque<OpFactory> queue_;
 };
 
 }  // namespace folly
index c8601309b30b4dab48292f427e6af87bb566a599..a513913dadab8be6155b38bf35647d0a4200d081 100644 (file)
@@ -36,6 +36,7 @@
 
 namespace fs = folly::fs;
 using folly::AsyncIO;
+using folly::AsyncIOQueue;
 
 namespace {
 
@@ -116,6 +117,14 @@ TemporaryFile::~TemporaryFile() {
 
 TemporaryFile tempFile(6 << 20);  // 6MiB
 
+typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
+ManagedBuffer allocateAligned(size_t size) {
+  void* buf;
+  int rc = posix_memalign(&buf, 512, size);
+  CHECK_EQ(rc, 0) << strerror(rc);
+  return ManagedBuffer(reinterpret_cast<char*>(buf), free);
+}
+
 void testReadsSerially(const std::vector<TestSpec>& specs,
                        AsyncIO::PollMode pollMode) {
   AsyncIO aioReader(1, pollMode);
@@ -127,8 +136,9 @@ void testReadsSerially(const std::vector<TestSpec>& specs,
   };
 
   for (int i = 0; i < specs.size(); i++) {
-    std::unique_ptr<char[]> buf(new char[specs[i].size]);
-    aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
+    auto buf = allocateAligned(specs[i].size);
+    op.pread(fd, buf.get(), specs[i].size, specs[i].start);
+    aioReader.submit(&op);
     EXPECT_EQ(aioReader.pending(), 1);
     auto ops = readerWait(&aioReader);
     EXPECT_EQ(1, ops.size());
@@ -145,7 +155,7 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
                        AsyncIO::PollMode pollMode) {
   AsyncIO aioReader(specs.size(), pollMode);
   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
-  std::vector<std::unique_ptr<char[]>> bufs(specs.size());
+  std::vector<ManagedBuffer> bufs;
 
   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
   PCHECK(fd != -1);
@@ -153,9 +163,9 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
     ::close(fd);
   };
   for (int i = 0; i < specs.size(); i++) {
-    bufs[i].reset(new char[specs[i].size]);
-    aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
-                    specs[i].start);
+    bufs.push_back(allocateAligned(specs[i].size));
+    ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+    aioReader.submit(&ops[i]);
   }
   std::vector<bool> pending(specs.size(), true);
 
@@ -184,10 +194,63 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
   }
 }
 
+void testReadsQueued(const std::vector<TestSpec>& specs,
+                     AsyncIO::PollMode pollMode) {
+  size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
+  AsyncIO aioReader(readerCapacity, pollMode);
+  AsyncIOQueue aioQueue(&aioReader);
+  std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
+  std::vector<ManagedBuffer> bufs;
+
+  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+  PCHECK(fd != -1);
+  SCOPE_EXIT {
+    ::close(fd);
+  };
+  for (int i = 0; i < specs.size(); i++) {
+    bufs.push_back(allocateAligned(specs[i].size));
+    ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+    aioQueue.submit(&ops[i]);
+  }
+  std::vector<bool> pending(specs.size(), true);
+
+  size_t remaining = specs.size();
+  while (remaining != 0) {
+    if (remaining >= readerCapacity) {
+      EXPECT_EQ(readerCapacity, aioReader.pending());
+      EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
+    } else {
+      EXPECT_EQ(remaining, aioReader.pending());
+      EXPECT_EQ(0, aioQueue.queued());
+    }
+    auto completed = readerWait(&aioReader);
+    size_t nrRead = completed.size();
+    EXPECT_NE(nrRead, 0);
+    remaining -= nrRead;
+
+    for (int i = 0; i < nrRead; i++) {
+      int id = completed[i] - ops.get();
+      EXPECT_GE(id, 0);
+      EXPECT_LT(id, specs.size());
+      EXPECT_TRUE(pending[id]);
+      pending[id] = false;
+      ssize_t res = ops[id].result();
+      EXPECT_LE(0, res) << folly::errnoStr(-res);
+      EXPECT_EQ(specs[id].size, res);
+    }
+  }
+  EXPECT_EQ(aioReader.pending(), 0);
+  EXPECT_EQ(aioQueue.queued(), 0);
+  for (int i = 0; i < pending.size(); i++) {
+    EXPECT_FALSE(pending[i]);
+  }
+}
+
 void testReads(const std::vector<TestSpec>& specs,
                AsyncIO::PollMode pollMode) {
   testReadsSerially(specs, pollMode);
   testReadsParallel(specs, pollMode);
+  testReadsQueued(specs, pollMode);
 }
 
 }  // anonymous namespace
@@ -275,8 +338,9 @@ TEST(AsyncIO, NonBlockingWait) {
     ::close(fd);
   };
   size_t size = 1024;
-  std::unique_ptr<char[]> buf(new char[size]);
-  aioReader.pread(&op, fd, buf.get(), size, 0);
+  auto buf = allocateAligned(size);
+  op.pread(fd, buf.get(), size, 0);
+  aioReader.submit(&op);
   EXPECT_EQ(aioReader.pending(), 1);
 
   folly::Range<AsyncIO::Op**> completed;
@@ -293,3 +357,4 @@ TEST(AsyncIO, NonBlockingWait) {
   EXPECT_EQ(aioReader.pending(), 0);
 }
 
+