From: Philip Pronin Date: Fri, 12 May 2017 23:07:57 +0000 (-0700) Subject: return range from AsyncIO::cancel(), fix test X-Git-Tag: v2017.05.15.00~1 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=7728c4b3b4a0122c2728e3ef548aa4d356ba7adf;p=folly.git return range from AsyncIO::cancel(), fix test Summary: Return not just number of cancelled ops, but all of them as well. Test was incorrectly assuming `wait(1)` will return exactly one operation, fix that as well. Reviewed By: ot Differential Revision: D5054684 fbshipit-source-id: 1c53c3f7ba855d1fcfeac8b1b27f90f0872d2c21 --- diff --git a/folly/experimental/io/AsyncIO.cpp b/folly/experimental/io/AsyncIO.cpp index e5674bff..0b51128a 100644 --- a/folly/experimental/io/AsyncIO.cpp +++ b/folly/experimental/io/AsyncIO.cpp @@ -193,15 +193,13 @@ Range AsyncIO::wait(size_t minRequests) { CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object"; auto p = pending_.load(std::memory_order_acquire); CHECK_LE(minRequests, p); - doWait(WaitType::COMPLETE, minRequests, p, &completed_); - return Range(completed_.data(), completed_.size()); + return doWait(WaitType::COMPLETE, minRequests, p, completed_); } -size_t AsyncIO::cancel() { +Range AsyncIO::cancel() { CHECK(ctx_); auto p = pending_.load(std::memory_order_acquire); - doWait(WaitType::CANCEL, p, p, nullptr); - return p; + return doWait(WaitType::CANCEL, p, p, canceled_); } Range AsyncIO::pollCompleted() { @@ -224,15 +222,14 @@ Range AsyncIO::pollCompleted() { DCHECK_LE(numEvents, pending_); // Don't reap more than numEvents, as we've just reset the counter to 0. - doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_); - return Range(completed_.data(), completed_.size()); + return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_); } -void AsyncIO::doWait( +Range AsyncIO::doWait( WaitType type, size_t minRequests, size_t maxRequests, - std::vector* result) { + std::vector& result) { io_event events[maxRequests]; // Unfortunately, Linux AIO doesn't implement io_cancel, so even for @@ -257,9 +254,7 @@ void AsyncIO::doWait( } while (count < minRequests); DCHECK_LE(count, maxRequests); - if (result != nullptr) { - result->clear(); - } + result.clear(); for (size_t i = 0; i < count; ++i) { DCHECK(events[i].obj); Op* op = boost::intrusive::get_parent_from_member( @@ -273,10 +268,10 @@ void AsyncIO::doWait( op->cancel(); break; } - if (result != nullptr) { - result->push_back(op); - } + result.push_back(op); } + + return range(result); } AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) diff --git a/folly/experimental/io/AsyncIO.h b/folly/experimental/io/AsyncIO.h index 6702c53f..3af437a0 100644 --- a/folly/experimental/io/AsyncIO.h +++ b/folly/experimental/io/AsyncIO.h @@ -156,9 +156,10 @@ class AsyncIO : private boost::noncopyable { Range wait(size_t minRequests); /** - * Cancel all pending requests and return their number. + * Cancel all pending requests and return them; the returned range is + * valid until the next call to cancel(). */ - size_t cancel(); + Range cancel(); /** * Return the number of pending requests. @@ -201,11 +202,11 @@ class AsyncIO : private boost::noncopyable { void initializeContext(); enum class WaitType { COMPLETE, CANCEL }; - void doWait( + Range doWait( WaitType type, size_t minRequests, size_t maxRequests, - std::vector* result); + std::vector& result); io_context_t ctx_{nullptr}; std::atomic ctxSet_{false}; @@ -216,6 +217,7 @@ class AsyncIO : private boost::noncopyable { const size_t capacity_; int pollFd_{-1}; std::vector completed_; + std::vector canceled_; }; /** diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp index 7c776ef0..d7b32732 100644 --- a/folly/experimental/io/test/AsyncIOTest.cpp +++ b/folly/experimental/io/test/AsyncIOTest.cpp @@ -393,48 +393,63 @@ TEST(AsyncIO, NonBlockingWait) { } TEST(AsyncIO, Cancel) { - constexpr size_t kNumOps = 10; + constexpr size_t kNumOpsBatch1 = 10; + constexpr size_t kNumOpsBatch2 = 10; - AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE); + AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); PCHECK(fd != -1); SCOPE_EXIT { ::close(fd); }; - std::vector ops(kNumOps); + size_t completed = 0; + + std::vector> ops; std::vector bufs; + const auto schedule = [&](size_t n) { + for (size_t i = 0; i < n; ++i) { + const size_t size = 2 * kAlign; + bufs.push_back(allocateAligned(size)); - size_t completed = 0; - for (auto& op : ops) { - const size_t size = 2 * kAlign; - bufs.push_back(allocateAligned(size)); - op.setNotificationCallback([&](AsyncIOOp*) { ++completed; }); - op.pread(fd, bufs.back().get(), size, 0); - aioReader.submit(&op); - } + ops.push_back(std::make_unique()); + auto& op = *ops.back(); - EXPECT_EQ(aioReader.pending(), kNumOps); + op.setNotificationCallback([&](AsyncIOOp*) { ++completed; }); + op.pread(fd, bufs.back().get(), size, 0); + aioReader.submit(&op); + } + }; + + // Mix completed and canceled operations for this test. + // In order to achieve that, schedule in two batches and do partial + // wait() after the first one. + + schedule(kNumOpsBatch1); + EXPECT_EQ(aioReader.pending(), kNumOpsBatch1); EXPECT_EQ(completed, 0); - { - auto result = aioReader.wait(1); - EXPECT_EQ(result.size(), 1); - } - EXPECT_EQ(completed, 1); - EXPECT_EQ(aioReader.pending(), kNumOps - 1); + auto result = aioReader.wait(1); + EXPECT_GE(result.size(), 1); + EXPECT_EQ(completed, result.size()); + EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size()); + + schedule(kNumOpsBatch2); + EXPECT_EQ(aioReader.pending(), ops.size() - result.size()); + EXPECT_EQ(completed, result.size()); - EXPECT_EQ(aioReader.cancel(), kNumOps - 1); + auto canceled = aioReader.cancel(); + EXPECT_EQ(canceled.size(), ops.size() - result.size()); EXPECT_EQ(aioReader.pending(), 0); - EXPECT_EQ(completed, 1); + EXPECT_EQ(completed, result.size()); - completed = 0; + size_t foundCompleted = 0; for (auto& op : ops) { - if (op.state() == AsyncIOOp::State::COMPLETED) { - ++completed; + if (op->state() == AsyncIOOp::State::COMPLETED) { + ++foundCompleted; } else { - EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op; + EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op; } } - EXPECT_EQ(completed, 1); + EXPECT_EQ(foundCompleted, completed); }