CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
auto p = pending_.load(std::memory_order_acquire);
CHECK_LE(minRequests, p);
- doWait(WaitType::COMPLETE, minRequests, p, &completed_);
- return Range<Op**>(completed_.data(), completed_.size());
+ return doWait(WaitType::COMPLETE, minRequests, p, completed_);
}
-size_t AsyncIO::cancel() {
+Range<AsyncIO::Op**> AsyncIO::cancel() {
CHECK(ctx_);
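+ // Linux AIO doesn't implement io_cancel (see doWait()), so this reaps
+ // the remaining completion events and marks each op as canceled.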
auto p = pending_.load(std::memory_order_acquire);
- doWait(WaitType::CANCEL, p, p, nullptr);
- return p;
+ return doWait(WaitType::CANCEL, p, p, canceled_);
}
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
DCHECK_LE(numEvents, pending_);
// Don't reap more than numEvents, as we've just reset the counter to 0.
- doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_);
- return Range<Op**>(completed_.data(), completed_.size());
+ return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
}
-void AsyncIO::doWait(
+Range<AsyncIO::Op**> AsyncIO::doWait(
WaitType type,
size_t minRequests,
size_t maxRequests,
- std::vector<Op*>* result) {
+ std::vector<Op*>& result) {
io_event events[maxRequests];
// Unfortunately, Linux AIO doesn't implement io_cancel, so even for
// WaitType::CANCEL we have to wait for IO completion.
} while (count < minRequests);
DCHECK_LE(count, maxRequests);
- if (result != nullptr) {
- result->clear();
- }
+ result.clear();
for (size_t i = 0; i < count; ++i) {
DCHECK(events[i].obj);
Op* op = boost::intrusive::get_parent_from_member(
op->cancel();
break;
}
- if (result != nullptr) {
- result->push_back(op);
- }
+ result.push_back(op);
}
+
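+ // The returned range points into the caller-provided result vector and
+ // stays valid until that vector is modified by a later doWait() call.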
+ return range(result);
}
AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
Range<Op**> wait(size_t minRequests);
/**
- * Cancel all pending requests and return their number.
+ * Cancel all pending requests and return them; the returned range is
+ * valid until the next call to cancel().
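+ *
+ * A minimal usage sketch (the capacity value and the submitted ops are
+ * illustrative only):
+ *
+ *   AsyncIO aio(10, AsyncIO::NOT_POLLABLE);
+ *   // ... submit ops ...
+ *   for (AsyncIO::Op* op : aio.cancel()) {
+ *     CHECK(op->state() == AsyncIOOp::State::CANCELED);
+ *   }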
*/
- size_t cancel();
+ Range<Op**> cancel();
/**
* Return the number of pending requests.
void initializeContext();
enum class WaitType { COMPLETE, CANCEL };
- void doWait(
+ Range<AsyncIO::Op**> doWait(
WaitType type,
size_t minRequests,
size_t maxRequests,
- std::vector<Op*>* result);
+ std::vector<Op*>& result);
io_context_t ctx_{nullptr};
std::atomic<bool> ctxSet_{false};
const size_t capacity_;
int pollFd_{-1};
std::vector<Op*> completed_;
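+ // Canceled ops are collected separately from completed ops so that a
+ // range returned by wait() or pollCompleted() (which points into
+ // completed_) remains valid across a subsequent cancel() call.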
+ std::vector<Op*> canceled_;
};
/**
}
TEST(AsyncIO, Cancel) {
- constexpr size_t kNumOps = 10;
+ constexpr size_t kNumOpsBatch1 = 10;
+ constexpr size_t kNumOpsBatch2 = 10;
- AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE);
+ AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
SCOPE_EXIT {
::close(fd);
};
- std::vector<AsyncIO::Op> ops(kNumOps);
+ size_t completed = 0;
+
+ std::vector<std::unique_ptr<AsyncIO::Op>> ops;
std::vector<ManagedBuffer> bufs;
- size_t completed = 0;
- for (auto& op : ops) {
- const size_t size = 2 * kAlign;
- bufs.push_back(allocateAligned(size));
- op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
- op.pread(fd, bufs.back().get(), size, 0);
- aioReader.submit(&op);
- }
- EXPECT_EQ(aioReader.pending(), kNumOps);
+ const auto schedule = [&](size_t n) {
+ for (size_t i = 0; i < n; ++i) {
+ const size_t size = 2 * kAlign;
+ bufs.push_back(allocateAligned(size));
+ ops.push_back(std::make_unique<AsyncIO::Op>());
+ auto& op = *ops.back();
+ op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
+ op.pread(fd, bufs.back().get(), size, 0);
+ aioReader.submit(&op);
+ }
+ };
+
+ // Mix completed and canceled operations in this test. To achieve that,
+ // schedule the ops in two batches and do a partial wait() after the
+ // first one.
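+ // Note that wait(1) may reap more than one completion, so the checks
+ // below are expressed relative to result.size() rather than constants.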
+
+ schedule(kNumOpsBatch1);
+ EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
EXPECT_EQ(completed, 0);
- {
- auto result = aioReader.wait(1);
- EXPECT_EQ(result.size(), 1);
- }
- EXPECT_EQ(completed, 1);
- EXPECT_EQ(aioReader.pending(), kNumOps - 1);
- EXPECT_EQ(aioReader.cancel(), kNumOps - 1);
+ auto result = aioReader.wait(1);
+ EXPECT_GE(result.size(), 1);
+ EXPECT_EQ(completed, result.size());
+ EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());
+
+ schedule(kNumOpsBatch2);
+ EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
+ EXPECT_EQ(completed, result.size());
+
+ auto canceled = aioReader.cancel();
+ EXPECT_EQ(canceled.size(), ops.size() - result.size());
EXPECT_EQ(aioReader.pending(), 0);
- EXPECT_EQ(completed, 1);
+ EXPECT_EQ(completed, result.size());
- completed = 0;
+ size_t foundCompleted = 0;
for (auto& op : ops) {
- if (op.state() == AsyncIOOp::State::COMPLETED) {
- ++completed;
+ if (op->state() == AsyncIOOp::State::COMPLETED) {
+ ++foundCompleted;
} else {
- EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op;
+ EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
}
}
- EXPECT_EQ(completed, 1);
+ EXPECT_EQ(foundCompleted, completed);
}