AsyncIO::cancel
authorPhilip Pronin <philipp@fb.com>
Thu, 11 May 2017 19:51:11 +0000 (12:51 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 11 May 2017 20:07:37 +0000 (13:07 -0700)
Summary:
It should be implemented with `io_cancel`, but it is not
supported (D682836), so still have to drain events, ignoring only
op callbacks.

Reviewed By: luciang, ot

Differential Revision: D5044020

fbshipit-source-id: 0bcd04c91a437fccaf2189ccf771a1cb61c68942

folly/experimental/io/AsyncIO.cpp
folly/experimental/io/AsyncIO.h
folly/experimental/io/test/AsyncIOTest.cpp

index 9128d8b3d88b876b4f8190762f5ed1c16322b98a..e5674bff9e1915fd2eaf01126fb10646e4cff786 100644 (file)
@@ -66,6 +66,11 @@ void AsyncIOOp::complete(ssize_t result) {
   }
 }
 
+void AsyncIOOp::cancel() {
+  DCHECK_EQ(state_, State::PENDING);
+  state_ = State::CANCELED;
+}
+
 ssize_t AsyncIOOp::result() const {
   CHECK_EQ(state_, State::COMPLETED);
   return result_;
@@ -104,13 +109,7 @@ void AsyncIOOp::init() {
   state_ = State::INITIALIZED;
 }
 
-AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
-  : ctx_(0),
-    ctxSet_(false),
-    pending_(0),
-    submitted_(0),
-    capacity_(capacity),
-    pollFd_(-1) {
+AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
   CHECK_GT(capacity_, 0);
   completed_.reserve(capacity_);
   if (pollMode == POLLABLE) {
@@ -194,7 +193,15 @@ Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
   auto p = pending_.load(std::memory_order_acquire);
   CHECK_LE(minRequests, p);
-  return doWait(minRequests, p);
+  doWait(WaitType::COMPLETE, minRequests, p, &completed_);
+  return Range<Op**>(completed_.data(), completed_.size());
+}
+
+size_t AsyncIO::cancel() {
+  CHECK(ctx_);
+  auto p = pending_.load(std::memory_order_acquire);
+  doWait(WaitType::CANCEL, p, p, nullptr);
+  return p;
 }
 
 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
@@ -217,12 +224,19 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
   DCHECK_LE(numEvents, pending_);
 
   // Don't reap more than numEvents, as we've just reset the counter to 0.
-  return doWait(numEvents, numEvents);
+  doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_);
+  return Range<Op**>(completed_.data(), completed_.size());
 }
 
-Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
+void AsyncIO::doWait(
+    WaitType type,
+    size_t minRequests,
+    size_t maxRequests,
+    std::vector<Op*>* result) {
   io_event events[maxRequests];
 
+  // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
+  // WaitType::CANCEL we have to wait for IO completion.
   size_t count = 0;
   do {
     int ret;
@@ -237,27 +251,32 @@ Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
                          /* timeout */ nullptr);  // wait forever
     } while (ret == -EINTR);
     // Check as may not be able to recover without leaking events.
-    CHECK_GE(ret, 0)
-      << "AsyncIO: io_getevents failed with error " << errnoStr(-ret);
+    CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
+                     << errnoStr(-ret);
     count += ret;
   } while (count < minRequests);
   DCHECK_LE(count, maxRequests);
 
-  completed_.clear();
-  if (count == 0) {
-    return folly::Range<Op**>();
+  if (result != nullptr) {
+    result->clear();
   }
-
   for (size_t i = 0; i < count; ++i) {
     DCHECK(events[i].obj);
     Op* op = boost::intrusive::get_parent_from_member(
         events[i].obj, &AsyncIOOp::iocb_);
     decrementPending();
-    op->complete(events[i].res);
-    completed_.push_back(op);
+    switch (type) {
+      case WaitType::COMPLETE:
+        op->complete(events[i].res);
+        break;
+      case WaitType::CANCEL:
+        op->cancel();
+        break;
+    }
+    if (result != nullptr) {
+      result->push_back(op);
+    }
   }
-
-  return folly::Range<Op**>(&completed_.front(), count);
 }
 
 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
@@ -308,6 +327,7 @@ const char* asyncIoOpStateToString(AsyncIOOp::State state) {
     X(AsyncIOOp::State::INITIALIZED);
     X(AsyncIOOp::State::PENDING);
     X(AsyncIOOp::State::COMPLETED);
+    X(AsyncIOOp::State::CANCELED);
   }
   return "<INVALID AsyncIOOp::State>";
 }
index faa5e27fc4b4e075d36f023bba1e5d72cae15dcc..6702c53fda07ab6423c53cd3fbe80b19a3b934fe 100644 (file)
@@ -40,25 +40,24 @@ namespace folly {
  * An AsyncIOOp represents a pending operation.  You may set a notification
  * callback or you may use this class's methods directly.
  *
- * The op must remain allocated until completion.
+ * The op must remain allocated until it is completed or canceled.
  */
 class AsyncIOOp : private boost::noncopyable {
   friend class AsyncIO;
   friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
+
  public:
   typedef std::function<void(AsyncIOOp*)> NotificationCallback;
 
   explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
   ~AsyncIOOp();
 
-  // There would be a cancel() method here if Linux AIO actually implemented
-  // it.  But let's not get your hopes up.
-
   enum class State {
     UNINITIALIZED,
     INITIALIZED,
     PENDING,
-    COMPLETED
+    COMPLETED,
+    CANCELED,
   };
 
   /**
@@ -95,8 +94,7 @@ class AsyncIOOp : private boost::noncopyable {
    * conventions).  Use checkKernelError (folly/Exception.h) on the result to
    * throw a std::system_error in case of error instead.
    *
-   * It is an error to call this if the Op hasn't yet started or is still
-   * pending.
+   * It is an error to call this if the Op hasn't completed.
    */
   ssize_t result() const;
 
@@ -104,6 +102,7 @@ class AsyncIOOp : private boost::noncopyable {
   void init();
   void start();
   void complete(ssize_t result);
+  void cancel();
 
   NotificationCallback cb_;
   iocb iocb_;
@@ -123,7 +122,7 @@ class AsyncIO : private boost::noncopyable {
 
   enum PollMode {
     NOT_POLLABLE,
-    POLLABLE
+    POLLABLE,
   };
 
   /**
@@ -141,12 +140,12 @@ class AsyncIO : private boost::noncopyable {
    * file descriptor directly.
    *
    * You may use the same AsyncIO object from multiple threads, as long as
-   * there is only one concurrent caller of wait() / pollCompleted() (perhaps
-   * by always calling it from the same thread, or by providing appropriate
-   * mutual exclusion)  In this case, pending() returns a snapshot
+   * there is only one concurrent caller of wait() / pollCompleted() / cancel()
+   * (perhaps by always calling it from the same thread, or by providing
+   * appropriate mutual exclusion).  In this case, pending() returns a snapshot
    * of the current number of pending requests.
    */
-  explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
+  explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
   ~AsyncIO();
 
   /**
@@ -156,6 +155,11 @@ class AsyncIO : private boost::noncopyable {
    */
   Range<Op**> wait(size_t minRequests);
 
+  /**
+   * Cancel all pending requests and return their number.
+   */
+  size_t cancel();
+
   /**
    * Return the number of pending requests.
    */
@@ -196,16 +200,21 @@ class AsyncIO : private boost::noncopyable {
   void decrementPending();
   void initializeContext();
 
-  Range<Op**> doWait(size_t minRequests, size_t maxRequests);
+  enum class WaitType { COMPLETE, CANCEL };
+  void doWait(
+      WaitType type,
+      size_t minRequests,
+      size_t maxRequests,
+      std::vector<Op*>* result);
 
-  io_context_t ctx_;
-  std::atomic<bool> ctxSet_;
+  io_context_t ctx_{nullptr};
+  std::atomic<bool> ctxSet_{false};
   std::mutex initMutex_;
 
-  std::atomic<size_t> pending_;
-  std::atomic<size_t> submitted_;
+  std::atomic<size_t> pending_{0};
+  std::atomic<size_t> submitted_{0};
   const size_t capacity_;
-  int pollFd_;
+  int pollFd_{-1};
   std::vector<Op*> completed_;
 };
 
index 39e51a1d2729a250024a4cbcc35164c23f74384c..7c776ef05b7f094cde60c704affe88280dd4e9d5 100644 (file)
@@ -36,7 +36,9 @@
 #include <folly/portability/Sockets.h>
 
 namespace fs = folly::fs;
+
 using folly::AsyncIO;
+using folly::AsyncIOOp;
 using folly::AsyncIOQueue;
 
 namespace {
@@ -85,7 +87,7 @@ class TemporaryFile {
 };
 
 TemporaryFile::TemporaryFile(size_t size)
-  : path_(fs::temp_directory_path() / fs::unique_path()) {
+    : path_(fs::temp_directory_path() / fs::unique_path()) {
   CHECK_EQ(size % sizeof(uint32_t), 0);
   size /= sizeof(uint32_t);
   const uint32_t seed = 42;
@@ -370,7 +372,7 @@ TEST(AsyncIO, NonBlockingWait) {
   SCOPE_EXIT {
     ::close(fd);
   };
-  size_t size = 2*kAlign;
+  size_t size = 2 * kAlign;
   auto buf = allocateAligned(size);
   op.pread(fd, buf.get(), size, 0);
   aioReader.submit(&op);
@@ -389,3 +391,50 @@ TEST(AsyncIO, NonBlockingWait) {
   EXPECT_EQ(size, res);
   EXPECT_EQ(aioReader.pending(), 0);
 }
+
+TEST(AsyncIO, Cancel) {
+  constexpr size_t kNumOps = 10;
+
+  AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE);
+  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+  PCHECK(fd != -1);
+  SCOPE_EXIT {
+    ::close(fd);
+  };
+
+  std::vector<AsyncIO::Op> ops(kNumOps);
+  std::vector<ManagedBuffer> bufs;
+
+  size_t completed = 0;
+  for (auto& op : ops) {
+    const size_t size = 2 * kAlign;
+    bufs.push_back(allocateAligned(size));
+    op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
+    op.pread(fd, bufs.back().get(), size, 0);
+    aioReader.submit(&op);
+  }
+
+  EXPECT_EQ(aioReader.pending(), kNumOps);
+  EXPECT_EQ(completed, 0);
+
+  {
+    auto result = aioReader.wait(1);
+    EXPECT_EQ(result.size(), 1);
+  }
+  EXPECT_EQ(completed, 1);
+  EXPECT_EQ(aioReader.pending(), kNumOps - 1);
+
+  EXPECT_EQ(aioReader.cancel(), kNumOps - 1);
+  EXPECT_EQ(aioReader.pending(), 0);
+  EXPECT_EQ(completed, 1);
+
+  completed = 0;
+  for (auto& op : ops) {
+    if (op.state() == AsyncIOOp::State::COMPLETED) {
+      ++completed;
+    } else {
+      EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op;
+    }
+  }
+  EXPECT_EQ(completed, 1);
+}