AsyncIO in folly
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <fcntl.h>
22 #include <poll.h>
23
24 #include <cstdlib>
25 #include <cstdio>
26 #include <memory>
27 #include <random>
28 #include <vector>
29
30 #include <glog/logging.h>
31 #include <gtest/gtest.h>
32
33 #include "folly/experimental/io/FsUtil.h"
34 #include "folly/ScopeGuard.h"
35 #include "folly/String.h"
36
37 namespace fs = folly::fs;
38 using folly::AsyncIO;
39
40 namespace {
41
42 constexpr size_t kAlignment = 512;  // align reads to 512 B (for O_DIRECT)
43
44 struct TestSpec {
45   off_t start;
46   size_t size;
47 };
48
49 void waitUntilReadable(int fd) {
50   pollfd pfd;
51   pfd.fd = fd;
52   pfd.events = POLLIN;
53
54   int r;
55   do {
56     r = poll(&pfd, 1, -1);  // wait forever
57   } while (r == -1 && errno == EINTR);
58   PCHECK(r == 1);
59   CHECK_EQ(pfd.revents, POLLIN);  // no errors etc
60 }
61
62 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
63   int fd = reader->pollFd();
64   if (fd == -1) {
65     return reader->wait(1);
66   } else {
67     waitUntilReadable(fd);
68     return reader->pollCompleted();
69   }
70 }
71
72 // Temporary file that is NOT kept open but is deleted on exit.
73 // Generate random-looking but reproduceable data.
74 class TemporaryFile {
75  public:
76   explicit TemporaryFile(size_t size);
77   ~TemporaryFile();
78
79   const fs::path path() const { return path_; }
80
81  private:
82   fs::path path_;
83 };
84
85 TemporaryFile::TemporaryFile(size_t size)
86   : path_(fs::temp_directory_path() / fs::unique_path()) {
87   CHECK_EQ(size % sizeof(uint32_t), 0);
88   size /= sizeof(uint32_t);
89   const uint32_t seed = 42;
90   std::mt19937 rnd(seed);
91
92   const size_t bufferSize = 1U << 16;
93   uint32_t buffer[bufferSize];
94
95   FILE* fp = ::fopen(path_.c_str(), "wb");
96   PCHECK(fp != nullptr);
97   while (size) {
98     size_t n = std::min(size, bufferSize);
99     for (size_t i = 0; i < n; ++i) {
100       buffer[i] = rnd();
101     }
102     size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
103     PCHECK(written == n);
104     size -= written;
105   }
106   PCHECK(::fclose(fp) == 0);
107 }
108
109 TemporaryFile::~TemporaryFile() {
110   try {
111     fs::remove(path_);
112   } catch (const fs::filesystem_error& e) {
113     LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
114   }
115 }
116
117 TemporaryFile thisBinary(6 << 20);  // 6MiB
118
119 void testReadsSerially(const std::vector<TestSpec>& specs,
120                        AsyncIO::PollMode pollMode) {
121   AsyncIO aioReader(1, pollMode);
122   AsyncIO::Op op;
123   int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
124   PCHECK(fd != -1);
125   SCOPE_EXIT {
126     ::close(fd);
127   };
128
129   for (int i = 0; i < specs.size(); i++) {
130     std::unique_ptr<char[]> buf(new char[specs[i].size]);
131     aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
132     EXPECT_EQ(aioReader.pending(), 1);
133     auto ops = readerWait(&aioReader);
134     EXPECT_EQ(1, ops.size());
135     EXPECT_TRUE(ops[0] == &op);
136     EXPECT_EQ(aioReader.pending(), 0);
137     ssize_t res = op.result();
138     EXPECT_LE(0, res) << folly::errnoStr(-res);
139     EXPECT_EQ(specs[i].size, res);
140     op.reset();
141   }
142 }
143
144 void testReadsParallel(const std::vector<TestSpec>& specs,
145                        AsyncIO::PollMode pollMode) {
146   AsyncIO aioReader(specs.size(), pollMode);
147   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
148   std::vector<std::unique_ptr<char[]>> bufs(specs.size());
149
150   int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
151   PCHECK(fd != -1);
152   SCOPE_EXIT {
153     ::close(fd);
154   };
155   for (int i = 0; i < specs.size(); i++) {
156     bufs[i].reset(new char[specs[i].size]);
157     aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
158                     specs[i].start);
159   }
160   std::vector<bool> pending(specs.size(), true);
161
162   size_t remaining = specs.size();
163   while (remaining != 0) {
164     EXPECT_EQ(remaining, aioReader.pending());
165     auto completed = readerWait(&aioReader);
166     size_t nrRead = completed.size();
167     EXPECT_NE(nrRead, 0);
168     remaining -= nrRead;
169
170     for (int i = 0; i < nrRead; i++) {
171       int id = completed[i] - ops.get();
172       EXPECT_GE(id, 0);
173       EXPECT_LT(id, specs.size());
174       EXPECT_TRUE(pending[id]);
175       pending[id] = false;
176       ssize_t res = ops[id].result();
177       EXPECT_LE(0, res) << folly::errnoStr(-res);
178       EXPECT_EQ(specs[id].size, res);
179     }
180   }
181   EXPECT_EQ(aioReader.pending(), 0);
182   for (int i = 0; i < pending.size(); i++) {
183     EXPECT_FALSE(pending[i]);
184   }
185 }
186
187 void testReads(const std::vector<TestSpec>& specs,
188                AsyncIO::PollMode pollMode) {
189   testReadsSerially(specs, pollMode);
190   testReadsParallel(specs, pollMode);
191 }
192
193 }  // anonymous namespace
194
195 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
196   testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
197 }
198
199 TEST(AsyncIO, ZeroAsyncDataPollable) {
200   testReads({{0, 0}}, AsyncIO::POLLABLE);
201 }
202
203 TEST(AsyncIO, SingleAsyncDataNotPollable) {
204   testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
205   testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
206 }
207
208 TEST(AsyncIO, SingleAsyncDataPollable) {
209   testReads({{0, 512}}, AsyncIO::POLLABLE);
210   testReads({{0, 512}}, AsyncIO::POLLABLE);
211 }
212
213 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
214   testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
215   testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
216
217   testReads({
218     {0, 5*1024*1024},
219     {512, 5*1024*1024},
220   }, AsyncIO::NOT_POLLABLE);
221
222   testReads({
223     {512, 0},
224     {512, 512},
225     {512, 1024},
226     {512, 10*1024},
227     {512, 1024*1024},
228   }, AsyncIO::NOT_POLLABLE);
229 }
230
231 TEST(AsyncIO, MultipleAsyncDataPollable) {
232   testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
233   testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
234
235   testReads({
236     {0, 5*1024*1024},
237     {512, 5*1024*1024},
238   }, AsyncIO::POLLABLE);
239
240   testReads({
241     {512, 0},
242     {512, 512},
243     {512, 1024},
244     {512, 10*1024},
245     {512, 1024*1024},
246   }, AsyncIO::POLLABLE);
247 }
248
249 TEST(AsyncIO, ManyAsyncDataNotPollable) {
250   {
251     std::vector<TestSpec> v;
252     for (int i = 0; i < 1000; i++) {
253       v.push_back({512 * i, 512});
254     }
255     testReads(v, AsyncIO::NOT_POLLABLE);
256   }
257 }
258
259 TEST(AsyncIO, ManyAsyncDataPollable) {
260   {
261     std::vector<TestSpec> v;
262     for (int i = 0; i < 1000; i++) {
263       v.push_back({512 * i, 512});
264     }
265     testReads(v, AsyncIO::POLLABLE);
266   }
267 }
268
269 TEST(AsyncIO, NonBlockingWait) {
270   AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
271   AsyncIO::Op op;
272   int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
273   PCHECK(fd != -1);
274   SCOPE_EXIT {
275     ::close(fd);
276   };
277   size_t size = 1024;
278   std::unique_ptr<char[]> buf(new char[size]);
279   aioReader.pread(&op, fd, buf.get(), size, 0);
280   EXPECT_EQ(aioReader.pending(), 1);
281
282   folly::Range<AsyncIO::Op**> completed;
283   while (completed.empty()) {
284     // poll without blocking until the read request completes.
285     completed = aioReader.wait(0);
286   }
287   EXPECT_EQ(completed.size(), 1);
288
289   EXPECT_TRUE(completed[0] == &op);
290   ssize_t res = op.result();
291   EXPECT_LE(0, res) << folly::errnoStr(-res);
292   EXPECT_EQ(size, res);
293   EXPECT_EQ(aioReader.pending(), 0);
294 }
295