return range from AsyncIO::cancel(), fix test
authorPhilip Pronin <philipp@fb.com>
Fri, 12 May 2017 23:07:57 +0000 (16:07 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 12 May 2017 23:19:53 +0000 (16:19 -0700)
Summary:
Return not just number of cancelled ops, but all of them as well.
Test was incorrectly assuming `wait(1)` will return exactly one operation, fix
that as well.

Reviewed By: ot

Differential Revision: D5054684

fbshipit-source-id: 1c53c3f7ba855d1fcfeac8b1b27f90f0872d2c21

folly/experimental/io/AsyncIO.cpp
folly/experimental/io/AsyncIO.h
folly/experimental/io/test/AsyncIOTest.cpp

index e5674bff9e1915fd2eaf01126fb10646e4cff786..0b51128aed74d846f80f3026baf0a7fc894776fc 100644 (file)
@@ -193,15 +193,13 @@ Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
   auto p = pending_.load(std::memory_order_acquire);
   CHECK_LE(minRequests, p);
-  doWait(WaitType::COMPLETE, minRequests, p, &completed_);
-  return Range<Op**>(completed_.data(), completed_.size());
+  return doWait(WaitType::COMPLETE, minRequests, p, completed_);
 }
 
-size_t AsyncIO::cancel() {
+Range<AsyncIO::Op**> AsyncIO::cancel() {
   CHECK(ctx_);
   auto p = pending_.load(std::memory_order_acquire);
-  doWait(WaitType::CANCEL, p, p, nullptr);
-  return p;
+  return doWait(WaitType::CANCEL, p, p, canceled_);
 }
 
 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
@@ -224,15 +222,14 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
   DCHECK_LE(numEvents, pending_);
 
   // Don't reap more than numEvents, as we've just reset the counter to 0.
-  doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_);
-  return Range<Op**>(completed_.data(), completed_.size());
+  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
 }
 
-void AsyncIO::doWait(
+Range<AsyncIO::Op**> AsyncIO::doWait(
     WaitType type,
     size_t minRequests,
     size_t maxRequests,
-    std::vector<Op*>* result) {
+    std::vector<Op*>& result) {
   io_event events[maxRequests];
 
   // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
@@ -257,9 +254,7 @@ void AsyncIO::doWait(
   } while (count < minRequests);
   DCHECK_LE(count, maxRequests);
 
-  if (result != nullptr) {
-    result->clear();
-  }
+  result.clear();
   for (size_t i = 0; i < count; ++i) {
     DCHECK(events[i].obj);
     Op* op = boost::intrusive::get_parent_from_member(
@@ -273,10 +268,10 @@ void AsyncIO::doWait(
         op->cancel();
         break;
     }
-    if (result != nullptr) {
-      result->push_back(op);
-    }
+    result.push_back(op);
   }
+
+  return range(result);
 }
 
 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
index 6702c53fda07ab6423c53cd3fbe80b19a3b934fe..3af437a0b00ab5ef6d6feaee1dbc612ea454d1ea 100644 (file)
@@ -156,9 +156,10 @@ class AsyncIO : private boost::noncopyable {
   Range<Op**> wait(size_t minRequests);
 
   /**
-   * Cancel all pending requests and return their number.
+   * Cancel all pending requests and return them; the returned range is
+   * valid until the next call to cancel().
    */
-  size_t cancel();
+  Range<Op**> cancel();
 
   /**
    * Return the number of pending requests.
@@ -201,11 +202,11 @@ class AsyncIO : private boost::noncopyable {
   void initializeContext();
 
   enum class WaitType { COMPLETE, CANCEL };
-  void doWait(
+  Range<AsyncIO::Op**> doWait(
       WaitType type,
       size_t minRequests,
       size_t maxRequests,
-      std::vector<Op*>* result);
+      std::vector<Op*>& result);
 
   io_context_t ctx_{nullptr};
   std::atomic<bool> ctxSet_{false};
@@ -216,6 +217,7 @@ class AsyncIO : private boost::noncopyable {
   const size_t capacity_;
   int pollFd_{-1};
   std::vector<Op*> completed_;
+  std::vector<Op*> canceled_;
 };
 
 /**
index 7c776ef05b7f094cde60c704affe88280dd4e9d5..d7b3273224af0047f7db824e63b64c4b295f7ed4 100644 (file)
@@ -393,48 +393,63 @@ TEST(AsyncIO, NonBlockingWait) {
 }
 
 TEST(AsyncIO, Cancel) {
-  constexpr size_t kNumOps = 10;
+  constexpr size_t kNumOpsBatch1 = 10;
+  constexpr size_t kNumOpsBatch2 = 10;
 
-  AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE);
+  AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
   PCHECK(fd != -1);
   SCOPE_EXIT {
     ::close(fd);
   };
 
-  std::vector<AsyncIO::Op> ops(kNumOps);
+  size_t completed = 0;
+
+  std::vector<std::unique_ptr<AsyncIO::Op>> ops;
   std::vector<ManagedBuffer> bufs;
+  const auto schedule = [&](size_t n) {
+    for (size_t i = 0; i < n; ++i) {
+      const size_t size = 2 * kAlign;
+      bufs.push_back(allocateAligned(size));
 
-  size_t completed = 0;
-  for (auto& op : ops) {
-    const size_t size = 2 * kAlign;
-    bufs.push_back(allocateAligned(size));
-    op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
-    op.pread(fd, bufs.back().get(), size, 0);
-    aioReader.submit(&op);
-  }
+      ops.push_back(std::make_unique<AsyncIO::Op>());
+      auto& op = *ops.back();
 
-  EXPECT_EQ(aioReader.pending(), kNumOps);
+      op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
+      op.pread(fd, bufs.back().get(), size, 0);
+      aioReader.submit(&op);
+    }
+  };
+
+  // Mix completed and canceled operations for this test.
+  // In order to achieve that, schedule in two batches and do partial
+  // wait() after the first one.
+
+  schedule(kNumOpsBatch1);
+  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
   EXPECT_EQ(completed, 0);
 
-  {
-    auto result = aioReader.wait(1);
-    EXPECT_EQ(result.size(), 1);
-  }
-  EXPECT_EQ(completed, 1);
-  EXPECT_EQ(aioReader.pending(), kNumOps - 1);
+  auto result = aioReader.wait(1);
+  EXPECT_GE(result.size(), 1);
+  EXPECT_EQ(completed, result.size());
+  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());
+
+  schedule(kNumOpsBatch2);
+  EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
+  EXPECT_EQ(completed, result.size());
 
-  EXPECT_EQ(aioReader.cancel(), kNumOps - 1);
+  auto canceled = aioReader.cancel();
+  EXPECT_EQ(canceled.size(), ops.size() - result.size());
   EXPECT_EQ(aioReader.pending(), 0);
-  EXPECT_EQ(completed, 1);
+  EXPECT_EQ(completed, result.size());
 
-  completed = 0;
+  size_t foundCompleted = 0;
   for (auto& op : ops) {
-    if (op.state() == AsyncIOOp::State::COMPLETED) {
-      ++completed;
+    if (op->state() == AsyncIOOp::State::COMPLETED) {
+      ++foundCompleted;
     } else {
-      EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op;
+      EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
     }
   }
-  EXPECT_EQ(completed, 1);
+  EXPECT_EQ(foundCompleted, completed);
 }