folly: accommodate use of -Wshadow in other projects
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
index 77595c66be7b3aa1d3b9c30c0275ddb6319ffc09..f4ee54b6b91b464d1ad70c8993b2c8a24bacd94a 100644 (file)
@@ -25,6 +25,7 @@
 #include <cstdio>
 #include <memory>
 #include <random>
+#include <thread>
 #include <vector>
 
 #include <glog/logging.h>
 
 namespace fs = folly::fs;
 using folly::AsyncIO;
+using folly::AsyncIOQueue;
 
 namespace {
 
-constexpr size_t kAlignment = 512;  // align reads to 512 B (for O_DIRECT)
+constexpr size_t kAlign = 4096;  // align reads to 4096 B (for O_DIRECT)
 
 struct TestSpec {
   off_t start;
@@ -114,21 +116,30 @@ TemporaryFile::~TemporaryFile() {
   }
 }
 
-TemporaryFile thisBinary(6 << 20);  // 6MiB
+TemporaryFile tempFile(6 << 20);  // 6MiB
+
+typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
+ManagedBuffer allocateAligned(size_t size) {
+  void* buf;
+  int rc = posix_memalign(&buf, kAlign, size);
+  CHECK_EQ(rc, 0) << strerror(rc);
+  return ManagedBuffer(reinterpret_cast<char*>(buf), free);
+}
 
 void testReadsSerially(const std::vector<TestSpec>& specs,
                        AsyncIO::PollMode pollMode) {
   AsyncIO aioReader(1, pollMode);
   AsyncIO::Op op;
-  int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
   PCHECK(fd != -1);
   SCOPE_EXIT {
     ::close(fd);
   };
 
   for (int i = 0; i < specs.size(); i++) {
-    std::unique_ptr<char[]> buf(new char[specs[i].size]);
-    aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
+    auto buf = allocateAligned(specs[i].size);
+    op.pread(fd, buf.get(), specs[i].size, specs[i].start);
+    aioReader.submit(&op);
     EXPECT_EQ(aioReader.pending(), 1);
     auto ops = readerWait(&aioReader);
     EXPECT_EQ(1, ops.size());
@@ -142,20 +153,39 @@ void testReadsSerially(const std::vector<TestSpec>& specs,
 }
 
 void testReadsParallel(const std::vector<TestSpec>& specs,
-                       AsyncIO::PollMode pollMode) {
+                       AsyncIO::PollMode pollMode,
+                       bool multithreaded) {
   AsyncIO aioReader(specs.size(), pollMode);
   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
-  std::vector<std::unique_ptr<char[]>> bufs(specs.size());
+  std::vector<ManagedBuffer> bufs;
+  bufs.reserve(specs.size());
 
-  int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
   PCHECK(fd != -1);
   SCOPE_EXIT {
     ::close(fd);
   };
+
+  std::vector<std::thread> threads;
+  if (multithreaded) {
+    threads.reserve(specs.size());
+  }
+  for (int i = 0; i < specs.size(); i++) {
+    bufs.push_back(allocateAligned(specs[i].size));
+  }
+  auto submit = [&] (int i) {
+    ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+    aioReader.submit(&ops[i]);
+  };
   for (int i = 0; i < specs.size(); i++) {
-    bufs[i].reset(new char[specs[i].size]);
-    aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
-                    specs[i].start);
+    if (multithreaded) {
+      threads.emplace_back([&submit, i] { submit(i); });
+    } else {
+      submit(i);
+    }
+  }
+  for (auto& t : threads) {
+    t.join();
   }
   std::vector<bool> pending(specs.size(), true);
 
@@ -184,10 +214,64 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
   }
 }
 
+void testReadsQueued(const std::vector<TestSpec>& specs,
+                     AsyncIO::PollMode pollMode) {
+  size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
+  AsyncIO aioReader(readerCapacity, pollMode);
+  AsyncIOQueue aioQueue(&aioReader);
+  std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
+  std::vector<ManagedBuffer> bufs;
+
+  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+  PCHECK(fd != -1);
+  SCOPE_EXIT {
+    ::close(fd);
+  };
+  for (int i = 0; i < specs.size(); i++) {
+    bufs.push_back(allocateAligned(specs[i].size));
+    ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+    aioQueue.submit(&ops[i]);
+  }
+  std::vector<bool> pending(specs.size(), true);
+
+  size_t remaining = specs.size();
+  while (remaining != 0) {
+    if (remaining >= readerCapacity) {
+      EXPECT_EQ(readerCapacity, aioReader.pending());
+      EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
+    } else {
+      EXPECT_EQ(remaining, aioReader.pending());
+      EXPECT_EQ(0, aioQueue.queued());
+    }
+    auto completed = readerWait(&aioReader);
+    size_t nrRead = completed.size();
+    EXPECT_NE(nrRead, 0);
+    remaining -= nrRead;
+
+    for (int i = 0; i < nrRead; i++) {
+      int id = completed[i] - ops.get();
+      EXPECT_GE(id, 0);
+      EXPECT_LT(id, specs.size());
+      EXPECT_TRUE(pending[id]);
+      pending[id] = false;
+      ssize_t res = ops[id].result();
+      EXPECT_LE(0, res) << folly::errnoStr(-res);
+      EXPECT_EQ(specs[id].size, res);
+    }
+  }
+  EXPECT_EQ(aioReader.pending(), 0);
+  EXPECT_EQ(aioQueue.queued(), 0);
+  for (int i = 0; i < pending.size(); i++) {
+    EXPECT_FALSE(pending[i]);
+  }
+}
+
 void testReads(const std::vector<TestSpec>& specs,
                AsyncIO::PollMode pollMode) {
   testReadsSerially(specs, pollMode);
-  testReadsParallel(specs, pollMode);
+  testReadsParallel(specs, pollMode, false);
+  testReadsParallel(specs, pollMode, true);
+  testReadsQueued(specs, pollMode);
 }
 
 }  // anonymous namespace
@@ -201,56 +285,64 @@ TEST(AsyncIO, ZeroAsyncDataPollable) {
 }
 
 TEST(AsyncIO, SingleAsyncDataNotPollable) {
-  testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
-  testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
+  testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
+  testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
 }
 
 TEST(AsyncIO, SingleAsyncDataPollable) {
-  testReads({{0, 512}}, AsyncIO::POLLABLE);
-  testReads({{0, 512}}, AsyncIO::POLLABLE);
+  testReads({{0, kAlign}}, AsyncIO::POLLABLE);
+  testReads({{0, kAlign}}, AsyncIO::POLLABLE);
 }
 
 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
-  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
-  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
+  testReads(
+      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      AsyncIO::NOT_POLLABLE);
+  testReads(
+      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      AsyncIO::NOT_POLLABLE);
 
   testReads({
     {0, 5*1024*1024},
-    {512, 5*1024*1024},
+    {kAlign, 5*1024*1024}
   }, AsyncIO::NOT_POLLABLE);
 
   testReads({
-    {512, 0},
-    {512, 512},
-    {512, 1024},
-    {512, 10*1024},
-    {512, 1024*1024},
+    {kAlign, 0},
+    {kAlign, kAlign},
+    {kAlign, 2*kAlign},
+    {kAlign, 20*kAlign},
+    {kAlign, 1024*1024},
   }, AsyncIO::NOT_POLLABLE);
 }
 
 TEST(AsyncIO, MultipleAsyncDataPollable) {
-  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
-  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
+  testReads(
+      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      AsyncIO::POLLABLE);
+  testReads(
+      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      AsyncIO::POLLABLE);
 
   testReads({
     {0, 5*1024*1024},
-    {512, 5*1024*1024},
-  }, AsyncIO::POLLABLE);
+    {kAlign, 5*1024*1024}
+  }, AsyncIO::NOT_POLLABLE);
 
   testReads({
-    {512, 0},
-    {512, 512},
-    {512, 1024},
-    {512, 10*1024},
-    {512, 1024*1024},
-  }, AsyncIO::POLLABLE);
+    {kAlign, 0},
+    {kAlign, kAlign},
+    {kAlign, 2*kAlign},
+    {kAlign, 20*kAlign},
+    {kAlign, 1024*1024},
+  }, AsyncIO::NOT_POLLABLE);
 }
 
 TEST(AsyncIO, ManyAsyncDataNotPollable) {
   {
     std::vector<TestSpec> v;
     for (int i = 0; i < 1000; i++) {
-      v.push_back({512 * i, 512});
+      v.push_back({off_t(kAlign * i), kAlign});
     }
     testReads(v, AsyncIO::NOT_POLLABLE);
   }
@@ -260,7 +352,7 @@ TEST(AsyncIO, ManyAsyncDataPollable) {
   {
     std::vector<TestSpec> v;
     for (int i = 0; i < 1000; i++) {
-      v.push_back({512 * i, 512});
+      v.push_back({off_t(kAlign * i), kAlign});
     }
     testReads(v, AsyncIO::POLLABLE);
   }
@@ -269,14 +361,15 @@ TEST(AsyncIO, ManyAsyncDataPollable) {
 TEST(AsyncIO, NonBlockingWait) {
   AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
   AsyncIO::Op op;
-  int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
   PCHECK(fd != -1);
   SCOPE_EXIT {
     ::close(fd);
   };
-  size_t size = 1024;
-  std::unique_ptr<char[]> buf(new char[size]);
-  aioReader.pread(&op, fd, buf.get(), size, 0);
+  size_t size = 2*kAlign;
+  auto buf = allocateAligned(size);
+  op.pread(fd, buf.get(), size, 0);
+  aioReader.submit(&op);
   EXPECT_EQ(aioReader.pending(), 1);
 
   folly::Range<AsyncIO::Op**> completed;
@@ -293,3 +386,4 @@ TEST(AsyncIO, NonBlockingWait) {
   EXPECT_EQ(aioReader.pending(), 0);
 }
 
+