2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/experimental/io/AsyncIO.h"
20 #include <sys/types.h>
30 #include <glog/logging.h>
31 #include <gtest/gtest.h>
33 #include "folly/experimental/io/FsUtil.h"
34 #include "folly/ScopeGuard.h"
35 #include "folly/String.h"
// Short alias for the folly::fs namespace used throughout this file.
37 namespace fs = folly::fs;
// Alignment, in bytes, associated with O_DIRECT reads (see trailing comment).
// NOTE(review): no use of kAlignment is visible in this excerpt -- confirm
// that read buffers and offsets actually honor it.
42 constexpr size_t kAlignment = 512; // align reads to 512 B (for O_DIRECT)
// Waits, with no timeout, until poll() reports an event on `fd`.
// poll() is retried when it fails with EINTR. Afterwards the function
// CHECK-fails unless the returned events are exactly POLLIN.
// NOTE(review): the pollfd setup and the opening of the do/while loop are not
// visible in this excerpt.
49 void waitUntilReadable(int fd) {
56 r = poll(&pfd, 1, -1); // wait forever
57 } while (r == -1 && errno == EINTR);
59 CHECK_EQ(pfd.revents, POLLIN); // no errors etc
// Returns the range of completed operations for `reader`.
// Two paths are visible: calling reader->wait(1) directly, or waiting for the
// reader's poll fd to become readable and then calling pollCompleted().
// NOTE(review): the condition selecting between the two paths is not visible
// here; presumably it depends on whether pollFd() returned a usable
// descriptor -- confirm.
62 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
63 int fd = reader->pollFd();
65 return reader->wait(1);
67 waitUntilReadable(fd);
68 return reader->pollCompleted();
72 // Temporary file that is NOT kept open but is deleted on exit.
73 // Generate random-looking but reproducible data.
// Creates the temporary file. `size` is the file size in bytes; the
// constructor CHECKs that it is a multiple of sizeof(uint32_t).
76 explicit TemporaryFile(size_t size);
// Returns the file's path by value (a copy of path_).
79 const fs::path path() const { return path_; }
// Creates a uniquely named file under the system temp directory and writes
// `size` bytes of data to it in chunks of uint32_t words.
// `size` must be a multiple of sizeof(uint32_t); it is converted to a word
// count before writing. All stdio failures are fatal (PCHECK).
// NOTE(review): the outer loop that repeats the chunked write and the
// statement that fills `buffer` are not visible in this excerpt; the buffer
// is presumably filled from `rnd` -- confirm.
85 TemporaryFile::TemporaryFile(size_t size)
86 : path_(fs::temp_directory_path() / fs::unique_path()) {
87 CHECK_EQ(size % sizeof(uint32_t), 0);
// From here on `size` counts uint32_t words, not bytes.
88 size /= sizeof(uint32_t);
// A fixed seed makes the generator produce the same sequence on every run.
89 const uint32_t seed = 42;
90 std::mt19937 rnd(seed);
// Staging buffer of 65536 words (256 KiB with 4-byte uint32_t), on the stack.
92 const size_t bufferSize = 1U << 16;
93 uint32_t buffer[bufferSize];
95 FILE* fp = ::fopen(path_.c_str(), "wb");
96 PCHECK(fp != nullptr);
// Each chunk holds at most one buffer's worth of words.
98 size_t n = std::min(size, bufferSize);
99 for (size_t i = 0; i < n; ++i) {
// fwrite returns the number of items written; a short write is fatal.
102 size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
103 PCHECK(written == n);
106 PCHECK(::fclose(fp) == 0);
// Destructor. A fs::filesystem_error raised during cleanup is caught and
// logged at ERROR level instead of propagating out of the destructor.
// NOTE(review): the try block body is not visible in this excerpt; the log
// prefix suggests it calls fs::remove on the file -- confirm.
109 TemporaryFile::~TemporaryFile() {
112 } catch (const fs::filesystem_error& e) {
113 LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
// File shared by the tests in this file; they open it via tempFile.path().
117 TemporaryFile tempFile(6 << 20); // 6MiB
// Performs the reads described by `specs` one at a time against tempFile,
// opened with O_DIRECT | O_RDONLY, through a single AsyncIO::Op.
// For every spec it submits a pread, waits for completion via readerWait(),
// and checks that exactly this op completed, that nothing is left pending,
// and that the result is non-negative and equal to the requested size.
// NOTE(review): the declaration of `op` and any validation/closing of `fd`
// are not visible in this excerpt.
119 void testReadsSerially(const std::vector<TestSpec>& specs,
120 AsyncIO::PollMode pollMode) {
// First argument is 1 (presumably the reader's capacity -- confirm).
121 AsyncIO aioReader(1, pollMode);
123 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// NOTE(review): `i` is int while specs.size() is unsigned (size_t).
129 for (int i = 0; i < specs.size(); i++) {
// NOTE(review): the buffer comes from plain new char[]; nothing visible here
// aligns it to kAlignment -- confirm this is acceptable with O_DIRECT.
130 std::unique_ptr<char[]> buf(new char[specs[i].size]);
131 aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
132 EXPECT_EQ(aioReader.pending(), 1);
133 auto ops = readerWait(&aioReader);
134 EXPECT_EQ(1, ops.size());
135 EXPECT_TRUE(ops[0] == &op);
136 EXPECT_EQ(aioReader.pending(), 0);
137 ssize_t res = op.result();
// On failure the negated result is rendered with errnoStr for the message.
138 EXPECT_LE(0, res) << folly::errnoStr(-res);
139 EXPECT_EQ(specs[i].size, res);
// Submits all reads described by `specs` up front (one Op and one buffer per
// spec), then repeatedly collects completions until none remain, checking
// each completed op's result against its spec.
// NOTE(review): the tail of the pread call, any validation/closing of `fd`,
// and the statements that update `pending`/`remaining` after a completion
// are not visible in this excerpt.
144 void testReadsParallel(const std::vector<TestSpec>& specs,
145 AsyncIO::PollMode pollMode) {
// First argument is specs.size() (presumably the reader's capacity -- confirm).
146 AsyncIO aioReader(specs.size(), pollMode);
147 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
148 std::vector<std::unique_ptr<char[]>> bufs(specs.size());
150 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
155 for (int i = 0; i < specs.size(); i++) {
// NOTE(review): plain new char[] buffer; nothing visible here aligns it to
// kAlignment -- confirm this is acceptable with O_DIRECT.
156 bufs[i].reset(new char[specs[i].size]);
157 aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
// One flag per spec, all initially true.
160 std::vector<bool> pending(specs.size(), true);
162 size_t remaining = specs.size();
163 while (remaining != 0) {
164 EXPECT_EQ(remaining, aioReader.pending());
165 auto completed = readerWait(&aioReader);
166 size_t nrRead = completed.size();
167 EXPECT_NE(nrRead, 0);
170 for (int i = 0; i < nrRead; i++) {
// Recover the spec index from the Op's position within the `ops` array.
171 int id = completed[i] - ops.get();
173 EXPECT_LT(id, specs.size());
// An op must not be reported as completed more than once.
174 EXPECT_TRUE(pending[id]);
176 ssize_t res = ops[id].result();
177 EXPECT_LE(0, res) << folly::errnoStr(-res);
178 EXPECT_EQ(specs[id].size, res);
// After the loop nothing may be pending, in the reader or in the flags.
181 EXPECT_EQ(aioReader.pending(), 0);
182 for (int i = 0; i < pending.size(); i++) {
183 EXPECT_FALSE(pending[i]);
// Runs `specs` through the serial test and then the parallel test, both with
// the given `pollMode`.
187 void testReads(const std::vector<TestSpec>& specs,
188 AsyncIO::PollMode pollMode) {
189 testReadsSerially(specs, pollMode);
190 testReadsParallel(specs, pollMode);
193 } // anonymous namespace
// A single spec whose fields are both 0 (a zero-length read), NOT_POLLABLE mode.
195 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
196 testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
// A single spec whose fields are both 0 (a zero-length read), POLLABLE mode.
199 TEST(AsyncIO, ZeroAsyncDataPollable) {
200 testReads({{0, 0}}, AsyncIO::POLLABLE);
// A single {0, 512} spec (presumably {start, size} -- TestSpec is not visible
// here), NOT_POLLABLE mode. testReads is invoked twice with the same input.
203 TEST(AsyncIO, SingleAsyncDataNotPollable) {
204 testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
205 testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
// A single {0, 512} spec (presumably {start, size} -- TestSpec is not visible
// here), POLLABLE mode. testReads is invoked twice with the same input.
208 TEST(AsyncIO, SingleAsyncDataPollable) {
209 testReads({{0, 512}}, AsyncIO::POLLABLE);
210 testReads({{0, 512}}, AsyncIO::POLLABLE);
// Several specs per testReads call, NOT_POLLABLE mode. The first two calls
// use the same three specs, two of which are identical.
// NOTE(review): the spec lists of the last two testReads calls are not
// visible in this excerpt; only their closing lines are.
213 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
214 testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
215 testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
220 }, AsyncIO::NOT_POLLABLE);
228 }, AsyncIO::NOT_POLLABLE);
// Several specs per testReads call, POLLABLE mode. The first two calls use
// the same three specs, two of which are identical.
// NOTE(review): the spec lists of the last two testReads calls are not
// visible in this excerpt; only their closing lines are.
231 TEST(AsyncIO, MultipleAsyncDataPollable) {
232 testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
233 testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
238 }, AsyncIO::POLLABLE);
246 }, AsyncIO::POLLABLE);
// Builds 1000 specs of the form {512 * i, 512} for i in [0, 1000) and runs
// them in NOT_POLLABLE mode.
249 TEST(AsyncIO, ManyAsyncDataNotPollable) {
251 std::vector<TestSpec> v;
252 for (int i = 0; i < 1000; i++) {
253 v.push_back({512 * i, 512});
255 testReads(v, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs of the form {512 * i, 512} for i in [0, 1000) and runs
// them in POLLABLE mode.
259 TEST(AsyncIO, ManyAsyncDataPollable) {
261 std::vector<TestSpec> v;
262 for (int i = 0; i < 1000; i++) {
263 v.push_back({512 * i, 512});
265 testReads(v, AsyncIO::POLLABLE);
// Submits one read at offset 0 and repeatedly calls wait(0) until it returns
// a non-empty range, then checks that the single completed op is the one
// submitted, that its result equals `size`, and that nothing is pending.
// NOTE(review): the declarations of `op` and `size` and any validation/closing
// of `fd` are not visible in this excerpt.
269 TEST(AsyncIO, NonBlockingWait) {
270 AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
272 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// NOTE(review): plain new char[] buffer; nothing visible here aligns it to
// kAlignment -- confirm this is acceptable with O_DIRECT.
278 std::unique_ptr<char[]> buf(new char[size]);
279 aioReader.pread(&op, fd, buf.get(), size, 0);
280 EXPECT_EQ(aioReader.pending(), 1);
282 folly::Range<AsyncIO::Op**> completed;
// Busy-loop: keep polling while wait(0) returns an empty range.
283 while (completed.empty()) {
284 // poll without blocking until the read request completes.
285 completed = aioReader.wait(0);
287 EXPECT_EQ(completed.size(), 1);
289 EXPECT_TRUE(completed[0] == &op);
290 ssize_t res = op.result();
291 EXPECT_LE(0, res) << folly::errnoStr(-res);
292 EXPECT_EQ(size, res);
293 EXPECT_EQ(aioReader.pending(), 0);