Summary:
It would be great to have I/O stats tracking built into AsyncIO. It would
enable upper-layer applications to better track the number of I/O operations
that were submitted and completed.
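
For example, an upper layer could periodically sample the counters to derive
simple stats (a minimal sketch; logAsyncIOStats() and the derived "completed"
figure are illustrative only and not part of this diff):

    #include <folly/experimental/io/AsyncIO.h>  // include path may vary by folly version
    #include <glog/logging.h>

    // Hypothetical helper: sample the counters exposed by AsyncIO.
    void logAsyncIOStats(folly::AsyncIO& aio) {
      size_t submitted = aio.totalSubmits();  // cumulative since construction
      size_t inFlight  = aio.pending();       // currently outstanding
      // Approximate: a concurrent submit() may be reflected in pending()
      // slightly before it is counted in totalSubmits().
      size_t completed = submitted >= inFlight ? submitted - inFlight : 0;
      LOG(INFO) << "AsyncIO stats: submitted=" << submitted
                << " pending=" << inFlight
                << " completed~=" << completed;
    }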
Test Plan:
$ fbmake runtests
Test Results Summary:
Passed: 1734
100% successful
Reviewed By: philipp@fb.com
Subscribers: njormrod, schen, stanislav
FB internal diff:
D1557472
Tasks:
5008299
: ctx_(0),
ctxSet_(false),
pending_(0),
+ submitted_(0),
capacity_(capacity),
pollFd_(-1) {
CHECK_GT(capacity_, 0);
}
void AsyncIO::decrementPending() {
- ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel);
+ auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
DCHECK_GE(p, 1);
}
initializeContext(); // on demand
// We can increment past capacity, but we'll clean up after ourselves.
- ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel);
+ auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
if (p >= capacity_) {
decrementPending();
throw std::range_error("AsyncIO: too many pending requests");
}
if (rc < 0) {
decrementPending();
throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
}
+ submitted_++;
DCHECK_EQ(rc, 1);
op->start();
}
Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
CHECK(ctx_);
CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
- ssize_t p = pending_.load(std::memory_order_acquire);
+ auto p = pending_.load(std::memory_order_acquire);
CHECK_LE(minRequests, p);
return doWait(minRequests, p);
}
*/
size_t capacity() const { return capacity_; }
+ /**
+ * Return the cumulative number of I/O operations submitted since this
+ * object was created.
+ */
+ size_t totalSubmits() const { return submitted_; }
+
/**
* If POLLABLE, return a file descriptor that can be passed to poll / epoll
* and will become readable when any async IO operations have completed.
std::atomic<bool> ctxSet_;
std::mutex initMutex_;
- std::atomic<ssize_t> pending_;
- const ssize_t capacity_;
+ std::atomic<size_t> pending_;
+ std::atomic<size_t> submitted_;
+ const size_t capacity_;
int pollFd_;
std::vector<Op*> completed_;
};
auto buf = allocateAligned(specs[i].size);
op.pread(fd, buf.get(), specs[i].size, specs[i].start);
aioReader.submit(&op);
+ EXPECT_EQ((i + 1), aioReader.totalSubmits());
EXPECT_EQ(aioReader.pending(), 1);
auto ops = readerWait(&aioReader);
EXPECT_EQ(1, ops.size());
EXPECT_EQ(specs[id].size, res);
}
}
+ EXPECT_EQ(specs.size(), aioReader.totalSubmits());
+
EXPECT_EQ(aioReader.pending(), 0);
for (int i = 0; i < pending.size(); i++) {
EXPECT_FALSE(pending[i]);
EXPECT_EQ(specs[id].size, res);
}
}
+ EXPECT_EQ(specs.size(), aioReader.totalSubmits());
EXPECT_EQ(aioReader.pending(), 0);
EXPECT_EQ(aioQueue.queued(), 0);
for (int i = 0; i < pending.size(); i++) {