From 8f45b8d51e9fe5cab83d5e62aeae24aa19c3ae80 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Mon, 4 Feb 2013 18:38:14 -0800 Subject: [PATCH] AsyncIO in folly Summary: Interface extended and cleaned up. Also, now actually allows you to retrieve IO errors. Also moved some useful functions out of Subprocess.cpp into a separate header file. Test Plan: async_io_test, subprocess_test Reviewed By: philipp@fb.com FB internal diff: D698412 --- folly/Exception.h | 73 +++++ folly/Subprocess.cpp | 34 +-- folly/experimental/io/AsyncIO.cpp | 239 +++++++++++++++++ folly/experimental/io/AsyncIO.h | 195 ++++++++++++++ folly/experimental/io/test/AsyncIOTest.cpp | 295 +++++++++++++++++++++ 5 files changed, 803 insertions(+), 33 deletions(-) create mode 100644 folly/Exception.h create mode 100644 folly/experimental/io/AsyncIO.cpp create mode 100644 folly/experimental/io/AsyncIO.h create mode 100644 folly/experimental/io/test/AsyncIOTest.cpp diff --git a/folly/Exception.h b/folly/Exception.h new file mode 100644 index 00000000..5820f68b --- /dev/null +++ b/folly/Exception.h @@ -0,0 +1,73 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_EXCEPTION_H_ +#define FOLLY_EXCEPTION_H_ + +#include + +#include +#include + +#include "folly/Likely.h" + +namespace folly { + +// Helper to throw std::system_error +void throwSystemError(int err, const char* msg) __attribute__((noreturn)); +inline void throwSystemError(int err, const char* msg) { + throw std::system_error(err, std::system_category(), msg); +} + +// Helper to throw std::system_error from errno +void throwSystemError(const char* msg) __attribute__((noreturn)); +inline void throwSystemError(const char* msg) { + throwSystemError(errno, msg); +} + +// Check a Posix return code (0 on success, error number on error), throw +// on error. +inline void checkPosixError(int err, const char* msg) { + if (UNLIKELY(err != 0)) { + throwSystemError(err, msg); + } +} + +// Check a Linux kernel-style return code (>= 0 on success, negative error +// number on error), throw on error. +inline void checkKernelError(ssize_t ret, const char* msg) { + if (UNLIKELY(ret < 0)) { + throwSystemError(-ret, msg); + } +} + +// Check a traditional Uinx return code (-1 and sets errno on error), throw +// on error. +inline void checkUnixError(ssize_t ret, const char* msg) { + if (UNLIKELY(ret == -1)) { + throwSystemError(msg); + } +} +inline void checkUnixError(ssize_t ret, int savedErrno, const char* msg) { + if (UNLIKELY(ret == -1)) { + throwSystemError(savedErrno, msg); + } +} + +} // namespace folly + +#endif /* FOLLY_EXCEPTION_H_ */ + diff --git a/folly/Subprocess.cpp b/folly/Subprocess.cpp index 1470a706..e2d5f7bd 100644 --- a/folly/Subprocess.cpp +++ b/folly/Subprocess.cpp @@ -31,6 +31,7 @@ #include #include "folly/Conv.h" +#include "folly/Exception.h" #include "folly/ScopeGuard.h" #include "folly/String.h" #include "folly/io/Cursor.h" @@ -101,39 +102,6 @@ std::unique_ptr cloneStrings(const std::vector& s) { return d; } -// Helper to throw std::system_error -void throwSystemError(int err, const char* msg) __attribute__((noreturn)); -void throwSystemError(int err, const char* msg) { - throw std::system_error(err, std::system_category(), msg); -} - -// Helper to throw std::system_error from errno -void throwSystemError(const char* msg) __attribute__((noreturn)); -void throwSystemError(const char* msg) { - throwSystemError(errno, msg); -} - -// Check a Posix return code (0 on success, error number on error), throw -// on error. -void checkPosixError(int err, const char* msg) { - if (err != 0) { - throwSystemError(err, msg); - } -} - -// Check a traditional Uinx return code (-1 and sets errno on error), throw -// on error. -void checkUnixError(ssize_t ret, const char* msg) { - if (ret == -1) { - throwSystemError(msg); - } -} -void checkUnixError(ssize_t ret, int savedErrno, const char* msg) { - if (ret == -1) { - throwSystemError(savedErrno, msg); - } -} - // Check a wait() status, throw on non-successful void checkStatus(ProcessReturnCode returnCode) { if (returnCode.state() != ProcessReturnCode::EXITED || diff --git a/folly/experimental/io/AsyncIO.cpp b/folly/experimental/io/AsyncIO.cpp new file mode 100644 index 00000000..2080b720 --- /dev/null +++ b/folly/experimental/io/AsyncIO.cpp @@ -0,0 +1,239 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/experimental/io/AsyncIO.h" + +#include + +#include + +#include "folly/Exception.h" +#include "folly/Likely.h" +#include "folly/String.h" +#include "folly/eventfd.h" + +namespace folly { + +AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) + : ctx_(0), + pending_(0), + capacity_(capacity), + pollFd_(-1) { + if (UNLIKELY(capacity_ == 0)) { + throw std::out_of_range("AsyncIO: capacity must not be 0"); + } + completed_.reserve(capacity_); + if (pollMode == POLLABLE) { + pollFd_ = eventfd(0, EFD_NONBLOCK); + checkUnixError(pollFd_, "AsyncIO: eventfd creation failed"); + } +} + +AsyncIO::~AsyncIO() { + CHECK_EQ(pending_, 0); + if (ctx_) { + int rc = io_queue_release(ctx_); + CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc); + } + if (pollFd_ != -1) { + CHECK_ERR(close(pollFd_)); + } +} + +void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) { + iocb cb; + io_prep_pread(&cb, fd, buf, size, start); + submit(op, &cb); +} + +void AsyncIO::pread(Op* op, int fd, Range range, + off_t start) { + pread(op, fd, range.begin(), range.size(), start); +} + +void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt, + off_t start) { + iocb cb; + io_prep_preadv(&cb, fd, iov, iovcnt, start); + submit(op, &cb); +} + +void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size, + off_t start) { + iocb cb; + io_prep_pwrite(&cb, fd, const_cast(buf), size, start); + submit(op, &cb); +} + +void AsyncIO::pwrite(Op* op, int fd, Range range, + off_t start) { + pwrite(op, fd, range.begin(), range.size(), start); +} + +void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt, + off_t start) { + iocb cb; + io_prep_pwritev(&cb, fd, iov, iovcnt, start); + submit(op, &cb); +} + +void AsyncIO::initializeContext() { + if (!ctx_) { + int rc = io_queue_init(capacity_, &ctx_); + // returns negative errno + checkKernelError(rc, "AsyncIO: io_queue_init failed"); + DCHECK(ctx_); + } +} + +void AsyncIO::submit(Op* op, iocb* cb) { + if (UNLIKELY(pending_ >= capacity_)) { + throw std::out_of_range("AsyncIO: too many pending requests"); + } + if (UNLIKELY(op->state() != Op::UNINITIALIZED)) { + throw std::logic_error("AsyncIO: Invalid Op state in submit"); + } + initializeContext(); // on demand + cb->data = op; + if (pollFd_ != -1) { + io_set_eventfd(cb, pollFd_); + } + int rc = io_submit(ctx_, 1, &cb); + checkKernelError(rc, "AsyncIO: io_submit failed"); + DCHECK_EQ(rc, 1); + op->start(); + ++pending_; +} + +Range AsyncIO::wait(size_t minRequests) { + if (UNLIKELY(!ctx_)) { + throw std::logic_error("AsyncIO: wait called with no requests"); + } + if (UNLIKELY(pollFd_ != -1)) { + throw std::logic_error("AsyncIO: wait not allowed on pollable object"); + } + return doWait(minRequests, pending_); +} + +Range AsyncIO::pollCompleted() { + if (UNLIKELY(!ctx_)) { + throw std::logic_error("AsyncIO: pollCompleted called with no requests"); + } + if (UNLIKELY(pollFd_ == -1)) { + throw std::logic_error( + "AsyncIO: pollCompleted not allowed on non-pollable object"); + } + uint64_t numEvents; + // This sets the eventFd counter to 0, see + // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html + ssize_t rc; + do { + rc = ::read(pollFd_, &numEvents, 8); + } while (rc == -1 && errno == EINTR); + if (UNLIKELY(rc == -1 && errno == EAGAIN)) { + return Range(); // nothing completed + } + checkUnixError(rc, "AsyncIO: read from event fd failed"); + DCHECK_EQ(rc, 8); + + DCHECK_GT(numEvents, 0); + DCHECK_LE(numEvents, pending_); + + // Don't reap more than numEvents, as we've just reset the counter to 0. + return doWait(numEvents, numEvents); +} + +Range AsyncIO::doWait(size_t minRequests, size_t maxRequests) { + io_event events[pending_]; + int count; + do { + // Wait forever + count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr); + } while (count == -EINTR); + checkKernelError(count, "AsyncIO: io_getevents failed"); + DCHECK_GE(count, minRequests); // the man page says so + DCHECK_LE(count, pending_); + + completed_.clear(); + if (count == 0) { + return folly::Range(); + } + + for (size_t i = 0; i < count; ++i) { + Op* op = static_cast(events[i].data); + DCHECK(op); + op->complete(events[i].res); + completed_.push_back(op); + } + pending_ -= count; + + return folly::Range(&completed_.front(), count); +} + +AsyncIO::Op::Op() + : state_(UNINITIALIZED), + result_(-EINVAL) { +} + +void AsyncIO::Op::reset() { + if (UNLIKELY(state_ == PENDING)) { + throw std::logic_error("AsyncIO: invalid state for reset"); + } + state_ = UNINITIALIZED; + result_ = -EINVAL; +} + +AsyncIO::Op::~Op() { + CHECK_NE(state_, PENDING); +} + +void AsyncIO::Op::start() { + DCHECK_EQ(state_, UNINITIALIZED); + state_ = PENDING; +} + +void AsyncIO::Op::complete(ssize_t result) { + DCHECK_EQ(state_, PENDING); + state_ = COMPLETED; + result_ = result; + onCompleted(); +} + +void AsyncIO::Op::onCompleted() { } // default: do nothing + +ssize_t AsyncIO::Op::result() const { + if (UNLIKELY(state_ != COMPLETED)) { + throw std::logic_error("AsyncIO: Invalid Op state in result"); + } + return result_; +} + +CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { } + +CallbackOp::~CallbackOp() { } + +CallbackOp* CallbackOp::make(Callback&& callback) { + // Ensure created on the heap + return new CallbackOp(std::move(callback)); +} + +void CallbackOp::onCompleted() { + callback_(result()); + delete this; +} + +} // namespace folly + diff --git a/folly/experimental/io/AsyncIO.h b/folly/experimental/io/AsyncIO.h new file mode 100644 index 00000000..81ed94dc --- /dev/null +++ b/folly/experimental/io/AsyncIO.h @@ -0,0 +1,195 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_IO_ASYNCIO_H_ +#define FOLLY_IO_ASYNCIO_H_ + +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include "folly/Portability.h" +#include "folly/Range.h" + +namespace folly { + +/** + * C++ interface around Linux Async IO. + */ +class AsyncIO : private boost::noncopyable { + public: + enum PollMode { + NOT_POLLABLE, + POLLABLE + }; + + /** + * Create an AsyncIO context capacble of holding at most 'capacity' pending + * requests at the same time. As requests complete, others can be scheduled, + * as long as this limit is not exceeded. + * + * Note: the maximum number of allowed concurrent requests is controlled + * by the fs.aio-max-nr sysctl, the default value is usually 64K. + * + * If pollMode is POLLABLE, pollFd() will return a file descriptor that + * can be passed to poll / epoll / select and will become readable when + * any IOs on this AioReader have completed. If you do this, you must use + * pollCompleted() instead of wait() -- do not read from the pollFd() + * file descriptor directly. + */ + explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE); + ~AsyncIO(); + + /** + * An Op represents a pending operation. You may inherit from Op (and + * override onCompleted) in order to be notified of completion (see + * CallbackOp below for an example), or you may use Op's methods directly. + * + * The Op must remain allocated until completion. + */ + class Op : private boost::noncopyable { + friend class AsyncIO; + public: + Op(); + virtual ~Op(); + + // There would be a cancel() method here if Linux AIO actually implemented + // it. But let's not get your hopes up. + + enum State { + UNINITIALIZED, + PENDING, + COMPLETED + }; + + /** + * Return the current operation state. + */ + State state() const { return state_; } + + /** + * Reset the operation for reuse. It is an error to call reset() on + * an Op that is still pending. + */ + void reset(); + + /** + * Retrieve the result of this operation. Returns >=0 on success, + * -errno on failure (that is, using the Linux kernel error reporting + * conventions). Use checkKernelError (folly/Exception.h) on the result to + * throw a std::system_error in case of error instead. + * + * It is an error to call this if the Op hasn't yet started or is still + * pending. + */ + ssize_t result() const; + + private: + void start(); + void complete(ssize_t result); + + virtual void onCompleted(); + + State state_; + ssize_t result_; + }; + + /** + * Initiate a read request. + */ + void pread(Op* op, int fd, void* buf, size_t size, off_t start); + void pread(Op* op, int fd, Range range, off_t start); + void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start); + + /** + * Initiate a write request. + */ + void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start); + void pwrite(Op* op, int fd, Range range, off_t start); + void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start); + + /** + * Wait for at least minRequests to complete. Returns the requests that + * have completed; the returned range is valid until the next call to + * wait(). minRequests may be 0 to not block. + */ + Range wait(size_t minRequests); + + /** + * Return the number of pending requests. + */ + size_t pending() const { return pending_; } + + /** + * Return the maximum number of requests that can be kept outstanding + * at any one time. + */ + size_t capacity() const { return capacity_; } + + /** + * If POLLABLE, return a file descriptor that can be passed to poll / epoll + * and will become readable when any async IO operations have completed. + * If NOT_POLLABLE, return -1. + */ + int pollFd() const { return pollFd_; } + + /** + * If POLLABLE, call instead of wait after the file descriptor returned + * by pollFd() became readable. The returned range is valid until the next + * call to pollCompleted(). + */ + Range pollCompleted(); + + private: + void initializeContext(); + void submit(Op* op, iocb* cb); + Range doWait(size_t minRequests, size_t maxRequests); + + io_context_t ctx_; + size_t pending_; + size_t capacity_; + int pollFd_; + std::vector completed_; +}; + +/** + * Implementation of AsyncIO::Op that calls a callback and then deletes + * itself. + */ +class CallbackOp : public AsyncIO::Op { + public: + typedef std::function Callback; + static CallbackOp* make(Callback&& callback); + + private: + explicit CallbackOp(Callback&& callback); + ~CallbackOp(); + void onCompleted() FOLLY_OVERRIDE; + + Callback callback_; +}; + +} // namespace folly + +#endif /* FOLLY_IO_ASYNCIO_H_ */ + diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp new file mode 100644 index 00000000..77595c66 --- /dev/null +++ b/folly/experimental/io/test/AsyncIOTest.cpp @@ -0,0 +1,295 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/experimental/io/AsyncIO.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "folly/experimental/io/FsUtil.h" +#include "folly/ScopeGuard.h" +#include "folly/String.h" + +namespace fs = folly::fs; +using folly::AsyncIO; + +namespace { + +constexpr size_t kAlignment = 512; // align reads to 512 B (for O_DIRECT) + +struct TestSpec { + off_t start; + size_t size; +}; + +void waitUntilReadable(int fd) { + pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + + int r; + do { + r = poll(&pfd, 1, -1); // wait forever + } while (r == -1 && errno == EINTR); + PCHECK(r == 1); + CHECK_EQ(pfd.revents, POLLIN); // no errors etc +} + +folly::Range readerWait(AsyncIO* reader) { + int fd = reader->pollFd(); + if (fd == -1) { + return reader->wait(1); + } else { + waitUntilReadable(fd); + return reader->pollCompleted(); + } +} + +// Temporary file that is NOT kept open but is deleted on exit. +// Generate random-looking but reproduceable data. +class TemporaryFile { + public: + explicit TemporaryFile(size_t size); + ~TemporaryFile(); + + const fs::path path() const { return path_; } + + private: + fs::path path_; +}; + +TemporaryFile::TemporaryFile(size_t size) + : path_(fs::temp_directory_path() / fs::unique_path()) { + CHECK_EQ(size % sizeof(uint32_t), 0); + size /= sizeof(uint32_t); + const uint32_t seed = 42; + std::mt19937 rnd(seed); + + const size_t bufferSize = 1U << 16; + uint32_t buffer[bufferSize]; + + FILE* fp = ::fopen(path_.c_str(), "wb"); + PCHECK(fp != nullptr); + while (size) { + size_t n = std::min(size, bufferSize); + for (size_t i = 0; i < n; ++i) { + buffer[i] = rnd(); + } + size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp); + PCHECK(written == n); + size -= written; + } + PCHECK(::fclose(fp) == 0); +} + +TemporaryFile::~TemporaryFile() { + try { + fs::remove(path_); + } catch (const fs::filesystem_error& e) { + LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e); + } +} + +TemporaryFile thisBinary(6 << 20); // 6MiB + +void testReadsSerially(const std::vector& specs, + AsyncIO::PollMode pollMode) { + AsyncIO aioReader(1, pollMode); + AsyncIO::Op op; + int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY); + PCHECK(fd != -1); + SCOPE_EXIT { + ::close(fd); + }; + + for (int i = 0; i < specs.size(); i++) { + std::unique_ptr buf(new char[specs[i].size]); + aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start); + EXPECT_EQ(aioReader.pending(), 1); + auto ops = readerWait(&aioReader); + EXPECT_EQ(1, ops.size()); + EXPECT_TRUE(ops[0] == &op); + EXPECT_EQ(aioReader.pending(), 0); + ssize_t res = op.result(); + EXPECT_LE(0, res) << folly::errnoStr(-res); + EXPECT_EQ(specs[i].size, res); + op.reset(); + } +} + +void testReadsParallel(const std::vector& specs, + AsyncIO::PollMode pollMode) { + AsyncIO aioReader(specs.size(), pollMode); + std::unique_ptr ops(new AsyncIO::Op[specs.size()]); + std::vector> bufs(specs.size()); + + int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY); + PCHECK(fd != -1); + SCOPE_EXIT { + ::close(fd); + }; + for (int i = 0; i < specs.size(); i++) { + bufs[i].reset(new char[specs[i].size]); + aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size, + specs[i].start); + } + std::vector pending(specs.size(), true); + + size_t remaining = specs.size(); + while (remaining != 0) { + EXPECT_EQ(remaining, aioReader.pending()); + auto completed = readerWait(&aioReader); + size_t nrRead = completed.size(); + EXPECT_NE(nrRead, 0); + remaining -= nrRead; + + for (int i = 0; i < nrRead; i++) { + int id = completed[i] - ops.get(); + EXPECT_GE(id, 0); + EXPECT_LT(id, specs.size()); + EXPECT_TRUE(pending[id]); + pending[id] = false; + ssize_t res = ops[id].result(); + EXPECT_LE(0, res) << folly::errnoStr(-res); + EXPECT_EQ(specs[id].size, res); + } + } + EXPECT_EQ(aioReader.pending(), 0); + for (int i = 0; i < pending.size(); i++) { + EXPECT_FALSE(pending[i]); + } +} + +void testReads(const std::vector& specs, + AsyncIO::PollMode pollMode) { + testReadsSerially(specs, pollMode); + testReadsParallel(specs, pollMode); +} + +} // anonymous namespace + +TEST(AsyncIO, ZeroAsyncDataNotPollable) { + testReads({{0, 0}}, AsyncIO::NOT_POLLABLE); +} + +TEST(AsyncIO, ZeroAsyncDataPollable) { + testReads({{0, 0}}, AsyncIO::POLLABLE); +} + +TEST(AsyncIO, SingleAsyncDataNotPollable) { + testReads({{0, 512}}, AsyncIO::NOT_POLLABLE); + testReads({{0, 512}}, AsyncIO::NOT_POLLABLE); +} + +TEST(AsyncIO, SingleAsyncDataPollable) { + testReads({{0, 512}}, AsyncIO::POLLABLE); + testReads({{0, 512}}, AsyncIO::POLLABLE); +} + +TEST(AsyncIO, MultipleAsyncDataNotPollable) { + testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE); + testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE); + + testReads({ + {0, 5*1024*1024}, + {512, 5*1024*1024}, + }, AsyncIO::NOT_POLLABLE); + + testReads({ + {512, 0}, + {512, 512}, + {512, 1024}, + {512, 10*1024}, + {512, 1024*1024}, + }, AsyncIO::NOT_POLLABLE); +} + +TEST(AsyncIO, MultipleAsyncDataPollable) { + testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE); + testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE); + + testReads({ + {0, 5*1024*1024}, + {512, 5*1024*1024}, + }, AsyncIO::POLLABLE); + + testReads({ + {512, 0}, + {512, 512}, + {512, 1024}, + {512, 10*1024}, + {512, 1024*1024}, + }, AsyncIO::POLLABLE); +} + +TEST(AsyncIO, ManyAsyncDataNotPollable) { + { + std::vector v; + for (int i = 0; i < 1000; i++) { + v.push_back({512 * i, 512}); + } + testReads(v, AsyncIO::NOT_POLLABLE); + } +} + +TEST(AsyncIO, ManyAsyncDataPollable) { + { + std::vector v; + for (int i = 0; i < 1000; i++) { + v.push_back({512 * i, 512}); + } + testReads(v, AsyncIO::POLLABLE); + } +} + +TEST(AsyncIO, NonBlockingWait) { + AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE); + AsyncIO::Op op; + int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY); + PCHECK(fd != -1); + SCOPE_EXIT { + ::close(fd); + }; + size_t size = 1024; + std::unique_ptr buf(new char[size]); + aioReader.pread(&op, fd, buf.get(), size, 0); + EXPECT_EQ(aioReader.pending(), 1); + + folly::Range completed; + while (completed.empty()) { + // poll without blocking until the read request completes. + completed = aioReader.wait(0); + } + EXPECT_EQ(completed.size(), 1); + + EXPECT_TRUE(completed[0] == &op); + ssize_t res = op.result(); + EXPECT_LE(0, res) << folly::errnoStr(-res); + EXPECT_EQ(size, res); + EXPECT_EQ(aioReader.pending(), 0); +} + -- 2.34.1