From: Shao-Chuan Wang Date: Tue, 16 Sep 2014 05:56:18 +0000 (-0700) Subject: Adding IO stats in AsyncIO. X-Git-Tag: v0.22.0~361 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=28f7ed4fd1e1939bb8dfcf5bf139d8edb0594345;p=folly.git Adding IO stats in AsyncIO. Summary: It would be great if we have IO stats tracking built within AsyncIO. It would enable upper layer application to better track the number of I/O that was submitted and completed Test Plan: $ fbmake runtests Test Results Summary: Passed: 1734 100% successful Reviewed By: philipp@fb.com Subscribers: njormrod, schen, stanislav FB internal diff: D1557472 Tasks: 5008299 --- diff --git a/folly/experimental/io/AsyncIO.cpp b/folly/experimental/io/AsyncIO.cpp index 7e7be271..766da907 100644 --- a/folly/experimental/io/AsyncIO.cpp +++ b/folly/experimental/io/AsyncIO.cpp @@ -108,6 +108,7 @@ AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : ctx_(0), ctxSet_(false), pending_(0), + submitted_(0), capacity_(capacity), pollFd_(-1) { CHECK_GT(capacity_, 0); @@ -130,7 +131,7 @@ AsyncIO::~AsyncIO() { } void AsyncIO::decrementPending() { - ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel); + auto p = pending_.fetch_add(-1, std::memory_order_acq_rel); DCHECK_GE(p, 1); } @@ -168,7 +169,7 @@ void AsyncIO::submit(Op* op) { initializeContext(); // on demand // We can increment past capacity, but we'll clean up after ourselves. - ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel); + auto p = pending_.fetch_add(1, std::memory_order_acq_rel); if (p >= capacity_) { decrementPending(); throw std::range_error("AsyncIO: too many pending requests"); @@ -183,6 +184,7 @@ void AsyncIO::submit(Op* op) { decrementPending(); throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed"); } + submitted_++; DCHECK_EQ(rc, 1); op->start(); } @@ -190,7 +192,7 @@ void AsyncIO::submit(Op* op) { Range AsyncIO::wait(size_t minRequests) { CHECK(ctx_); CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object"; - ssize_t p = pending_.load(std::memory_order_acquire); + auto p = pending_.load(std::memory_order_acquire); CHECK_LE(minRequests, p); return doWait(minRequests, p); } diff --git a/folly/experimental/io/AsyncIO.h b/folly/experimental/io/AsyncIO.h index 6b629326..ae1f4531 100644 --- a/folly/experimental/io/AsyncIO.h +++ b/folly/experimental/io/AsyncIO.h @@ -168,6 +168,12 @@ class AsyncIO : private boost::noncopyable { */ size_t capacity() const { return capacity_; } + /** + * Return the accumulative number of submitted I/O, since this object + * has been created. + */ + size_t totalSubmits() const { return submitted_; } + /** * If POLLABLE, return a file descriptor that can be passed to poll / epoll * and will become readable when any async IO operations have completed. @@ -197,8 +203,9 @@ class AsyncIO : private boost::noncopyable { std::atomic ctxSet_; std::mutex initMutex_; - std::atomic pending_; - const ssize_t capacity_; + std::atomic pending_; + std::atomic submitted_; + const size_t capacity_; int pollFd_; std::vector completed_; }; diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp index 7025eff5..dabb250a 100644 --- a/folly/experimental/io/test/AsyncIOTest.cpp +++ b/folly/experimental/io/test/AsyncIOTest.cpp @@ -140,6 +140,7 @@ void testReadsSerially(const std::vector& specs, auto buf = allocateAligned(specs[i].size); op.pread(fd, buf.get(), specs[i].size, specs[i].start); aioReader.submit(&op); + EXPECT_EQ((i + 1), aioReader.totalSubmits()); EXPECT_EQ(aioReader.pending(), 1); auto ops = readerWait(&aioReader); EXPECT_EQ(1, ops.size()); @@ -208,6 +209,8 @@ void testReadsParallel(const std::vector& specs, EXPECT_EQ(specs[id].size, res); } } + EXPECT_EQ(specs.size(), aioReader.totalSubmits()); + EXPECT_EQ(aioReader.pending(), 0); for (int i = 0; i < pending.size(); i++) { EXPECT_FALSE(pending[i]); @@ -259,6 +262,7 @@ void testReadsQueued(const std::vector& specs, EXPECT_EQ(specs[id].size, res); } } + EXPECT_EQ(specs.size(), aioReader.totalSubmits()); EXPECT_EQ(aioReader.pending(), 0); EXPECT_EQ(aioQueue.queued(), 0); for (int i = 0; i < pending.size(); i++) {