X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fexperimental%2Fio%2Ftest%2FAsyncIOTest.cpp;h=f4ee54b6b91b464d1ad70c8993b2c8a24bacd94a;hb=46de7098b46d000be284450d582c14608cab6f93;hp=77595c66be7b3aa1d3b9c30c0275ddb6319ffc09;hpb=8f45b8d51e9fe5cab83d5e62aeae24aa19c3ae80;p=folly.git diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp index 77595c66..f4ee54b6 100644 --- a/folly/experimental/io/test/AsyncIOTest.cpp +++ b/folly/experimental/io/test/AsyncIOTest.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -36,10 +37,11 @@ namespace fs = folly::fs; using folly::AsyncIO; +using folly::AsyncIOQueue; namespace { -constexpr size_t kAlignment = 512; // align reads to 512 B (for O_DIRECT) +constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT) struct TestSpec { off_t start; @@ -114,21 +116,30 @@ TemporaryFile::~TemporaryFile() { } } -TemporaryFile thisBinary(6 << 20); // 6MiB +TemporaryFile tempFile(6 << 20); // 6MiB + +typedef std::unique_ptr ManagedBuffer; +ManagedBuffer allocateAligned(size_t size) { + void* buf; + int rc = posix_memalign(&buf, kAlign, size); + CHECK_EQ(rc, 0) << strerror(rc); + return ManagedBuffer(reinterpret_cast(buf), free); +} void testReadsSerially(const std::vector& specs, AsyncIO::PollMode pollMode) { AsyncIO aioReader(1, pollMode); AsyncIO::Op op; - int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY); + int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); PCHECK(fd != -1); SCOPE_EXIT { ::close(fd); }; for (int i = 0; i < specs.size(); i++) { - std::unique_ptr buf(new char[specs[i].size]); - aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start); + auto buf = allocateAligned(specs[i].size); + op.pread(fd, buf.get(), specs[i].size, specs[i].start); + aioReader.submit(&op); EXPECT_EQ(aioReader.pending(), 1); auto ops = readerWait(&aioReader); EXPECT_EQ(1, ops.size()); @@ -142,20 +153,39 @@ void testReadsSerially(const std::vector& specs, } void testReadsParallel(const std::vector& specs, - AsyncIO::PollMode pollMode) { + AsyncIO::PollMode pollMode, + bool multithreaded) { AsyncIO aioReader(specs.size(), pollMode); std::unique_ptr ops(new AsyncIO::Op[specs.size()]); - std::vector> bufs(specs.size()); + std::vector bufs; + bufs.reserve(specs.size()); - int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY); + int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); PCHECK(fd != -1); SCOPE_EXIT { ::close(fd); }; + + std::vector threads; + if (multithreaded) { + threads.reserve(specs.size()); + } + for (int i = 0; i < specs.size(); i++) { + bufs.push_back(allocateAligned(specs[i].size)); + } + auto submit = [&] (int i) { + ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start); + aioReader.submit(&ops[i]); + }; for (int i = 0; i < specs.size(); i++) { - bufs[i].reset(new char[specs[i].size]); - aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size, - specs[i].start); + if (multithreaded) { + threads.emplace_back([&submit, i] { submit(i); }); + } else { + submit(i); + } + } + for (auto& t : threads) { + t.join(); } std::vector pending(specs.size(), true); @@ -184,10 +214,64 @@ void testReadsParallel(const std::vector& specs, } } +void testReadsQueued(const std::vector& specs, + AsyncIO::PollMode pollMode) { + size_t readerCapacity = std::max(specs.size() / 2, size_t(1)); + AsyncIO aioReader(readerCapacity, pollMode); + AsyncIOQueue aioQueue(&aioReader); + std::unique_ptr ops(new AsyncIO::Op[specs.size()]); + std::vector bufs; + + int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); + PCHECK(fd != -1); + SCOPE_EXIT { + ::close(fd); + }; + for (int i = 0; i < specs.size(); i++) { + bufs.push_back(allocateAligned(specs[i].size)); + ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start); + aioQueue.submit(&ops[i]); + } + std::vector pending(specs.size(), true); + + size_t remaining = specs.size(); + while (remaining != 0) { + if (remaining >= readerCapacity) { + EXPECT_EQ(readerCapacity, aioReader.pending()); + EXPECT_EQ(remaining - readerCapacity, aioQueue.queued()); + } else { + EXPECT_EQ(remaining, aioReader.pending()); + EXPECT_EQ(0, aioQueue.queued()); + } + auto completed = readerWait(&aioReader); + size_t nrRead = completed.size(); + EXPECT_NE(nrRead, 0); + remaining -= nrRead; + + for (int i = 0; i < nrRead; i++) { + int id = completed[i] - ops.get(); + EXPECT_GE(id, 0); + EXPECT_LT(id, specs.size()); + EXPECT_TRUE(pending[id]); + pending[id] = false; + ssize_t res = ops[id].result(); + EXPECT_LE(0, res) << folly::errnoStr(-res); + EXPECT_EQ(specs[id].size, res); + } + } + EXPECT_EQ(aioReader.pending(), 0); + EXPECT_EQ(aioQueue.queued(), 0); + for (int i = 0; i < pending.size(); i++) { + EXPECT_FALSE(pending[i]); + } +} + void testReads(const std::vector& specs, AsyncIO::PollMode pollMode) { testReadsSerially(specs, pollMode); - testReadsParallel(specs, pollMode); + testReadsParallel(specs, pollMode, false); + testReadsParallel(specs, pollMode, true); + testReadsQueued(specs, pollMode); } } // anonymous namespace @@ -201,56 +285,64 @@ TEST(AsyncIO, ZeroAsyncDataPollable) { } TEST(AsyncIO, SingleAsyncDataNotPollable) { - testReads({{0, 512}}, AsyncIO::NOT_POLLABLE); - testReads({{0, 512}}, AsyncIO::NOT_POLLABLE); + testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE); + testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE); } TEST(AsyncIO, SingleAsyncDataPollable) { - testReads({{0, 512}}, AsyncIO::POLLABLE); - testReads({{0, 512}}, AsyncIO::POLLABLE); + testReads({{0, kAlign}}, AsyncIO::POLLABLE); + testReads({{0, kAlign}}, AsyncIO::POLLABLE); } TEST(AsyncIO, MultipleAsyncDataNotPollable) { - testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE); - testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE); + testReads( + {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}}, + AsyncIO::NOT_POLLABLE); + testReads( + {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}}, + AsyncIO::NOT_POLLABLE); testReads({ {0, 5*1024*1024}, - {512, 5*1024*1024}, + {kAlign, 5*1024*1024} }, AsyncIO::NOT_POLLABLE); testReads({ - {512, 0}, - {512, 512}, - {512, 1024}, - {512, 10*1024}, - {512, 1024*1024}, + {kAlign, 0}, + {kAlign, kAlign}, + {kAlign, 2*kAlign}, + {kAlign, 20*kAlign}, + {kAlign, 1024*1024}, }, AsyncIO::NOT_POLLABLE); } TEST(AsyncIO, MultipleAsyncDataPollable) { - testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE); - testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE); + testReads( + {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}}, + AsyncIO::POLLABLE); + testReads( + {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}}, + AsyncIO::POLLABLE); testReads({ {0, 5*1024*1024}, - {512, 5*1024*1024}, - }, AsyncIO::POLLABLE); + {kAlign, 5*1024*1024} + }, AsyncIO::NOT_POLLABLE); testReads({ - {512, 0}, - {512, 512}, - {512, 1024}, - {512, 10*1024}, - {512, 1024*1024}, - }, AsyncIO::POLLABLE); + {kAlign, 0}, + {kAlign, kAlign}, + {kAlign, 2*kAlign}, + {kAlign, 20*kAlign}, + {kAlign, 1024*1024}, + }, AsyncIO::NOT_POLLABLE); } TEST(AsyncIO, ManyAsyncDataNotPollable) { { std::vector v; for (int i = 0; i < 1000; i++) { - v.push_back({512 * i, 512}); + v.push_back({off_t(kAlign * i), kAlign}); } testReads(v, AsyncIO::NOT_POLLABLE); } @@ -260,7 +352,7 @@ TEST(AsyncIO, ManyAsyncDataPollable) { { std::vector v; for (int i = 0; i < 1000; i++) { - v.push_back({512 * i, 512}); + v.push_back({off_t(kAlign * i), kAlign}); } testReads(v, AsyncIO::POLLABLE); } @@ -269,14 +361,15 @@ TEST(AsyncIO, ManyAsyncDataPollable) { TEST(AsyncIO, NonBlockingWait) { AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE); AsyncIO::Op op; - int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY); + int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); PCHECK(fd != -1); SCOPE_EXIT { ::close(fd); }; - size_t size = 1024; - std::unique_ptr buf(new char[size]); - aioReader.pread(&op, fd, buf.get(), size, 0); + size_t size = 2*kAlign; + auto buf = allocateAligned(size); + op.pread(fd, buf.get(), size, 0); + aioReader.submit(&op); EXPECT_EQ(aioReader.pending(), 1); folly::Range completed; @@ -293,3 +386,4 @@ TEST(AsyncIO, NonBlockingWait) { EXPECT_EQ(aioReader.pending(), 0); } +