--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_EXCEPTION_H_
+#define FOLLY_EXCEPTION_H_
+
+#include <errno.h>
+
+#include <stdexcept>
+#include <system_error>
+
+#include "folly/Likely.h"
+
+namespace folly {
+
+// Helper to throw std::system_error
+void throwSystemError(int err, const char* msg) __attribute__((noreturn));
+inline void throwSystemError(int err, const char* msg) {
+ throw std::system_error(err, std::system_category(), msg);
+}
+
+// Helper to throw std::system_error from errno
+void throwSystemError(const char* msg) __attribute__((noreturn));
+inline void throwSystemError(const char* msg) {
+ throwSystemError(errno, msg);
+}
+
+// Check a Posix return code (0 on success, error number on error), throw
+// on error.
+inline void checkPosixError(int err, const char* msg) {
+ if (UNLIKELY(err != 0)) {
+ throwSystemError(err, msg);
+ }
+}
+
+// Check a Linux kernel-style return code (>= 0 on success, negative error
+// number on error), throw on error.
+inline void checkKernelError(ssize_t ret, const char* msg) {
+ if (UNLIKELY(ret < 0)) {
+ throwSystemError(-ret, msg);
+ }
+}
+
+// Check a traditional Uinx return code (-1 and sets errno on error), throw
+// on error.
+inline void checkUnixError(ssize_t ret, const char* msg) {
+ if (UNLIKELY(ret == -1)) {
+ throwSystemError(msg);
+ }
+}
+inline void checkUnixError(ssize_t ret, int savedErrno, const char* msg) {
+ if (UNLIKELY(ret == -1)) {
+ throwSystemError(savedErrno, msg);
+ }
+}
+
+} // namespace folly
+
+#endif /* FOLLY_EXCEPTION_H_ */
+
#include <glog/logging.h>
#include "folly/Conv.h"
+#include "folly/Exception.h"
#include "folly/ScopeGuard.h"
#include "folly/String.h"
#include "folly/io/Cursor.h"
return d;
}
-// Helper to throw std::system_error
-void throwSystemError(int err, const char* msg) __attribute__((noreturn));
-void throwSystemError(int err, const char* msg) {
- throw std::system_error(err, std::system_category(), msg);
-}
-
-// Helper to throw std::system_error from errno
-void throwSystemError(const char* msg) __attribute__((noreturn));
-void throwSystemError(const char* msg) {
- throwSystemError(errno, msg);
-}
-
-// Check a Posix return code (0 on success, error number on error), throw
-// on error.
-void checkPosixError(int err, const char* msg) {
- if (err != 0) {
- throwSystemError(err, msg);
- }
-}
-
-// Check a traditional Uinx return code (-1 and sets errno on error), throw
-// on error.
-void checkUnixError(ssize_t ret, const char* msg) {
- if (ret == -1) {
- throwSystemError(msg);
- }
-}
-void checkUnixError(ssize_t ret, int savedErrno, const char* msg) {
- if (ret == -1) {
- throwSystemError(savedErrno, msg);
- }
-}
-
// Check a wait() status, throw on non-successful
void checkStatus(ProcessReturnCode returnCode) {
if (returnCode.state() != ProcessReturnCode::EXITED ||
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/AsyncIO.h"
+
+#include <cerrno>
+
+#include <glog/logging.h>
+
+#include "folly/Exception.h"
+#include "folly/Likely.h"
+#include "folly/String.h"
+#include "folly/eventfd.h"
+
+namespace folly {
+
+AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
+ : ctx_(0),
+ pending_(0),
+ capacity_(capacity),
+ pollFd_(-1) {
+ if (UNLIKELY(capacity_ == 0)) {
+ throw std::out_of_range("AsyncIO: capacity must not be 0");
+ }
+ completed_.reserve(capacity_);
+ if (pollMode == POLLABLE) {
+ pollFd_ = eventfd(0, EFD_NONBLOCK);
+ checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
+ }
+}
+
+AsyncIO::~AsyncIO() {
+ CHECK_EQ(pending_, 0);
+ if (ctx_) {
+ int rc = io_queue_release(ctx_);
+ CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
+ }
+ if (pollFd_ != -1) {
+ CHECK_ERR(close(pollFd_));
+ }
+}
+
+void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
+ iocb cb;
+ io_prep_pread(&cb, fd, buf, size, start);
+ submit(op, &cb);
+}
+
+void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
+ off_t start) {
+ pread(op, fd, range.begin(), range.size(), start);
+}
+
+void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
+ off_t start) {
+ iocb cb;
+ io_prep_preadv(&cb, fd, iov, iovcnt, start);
+ submit(op, &cb);
+}
+
+void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
+ off_t start) {
+ iocb cb;
+ io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
+ submit(op, &cb);
+}
+
+void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
+ off_t start) {
+ pwrite(op, fd, range.begin(), range.size(), start);
+}
+
+void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
+ off_t start) {
+ iocb cb;
+ io_prep_pwritev(&cb, fd, iov, iovcnt, start);
+ submit(op, &cb);
+}
+
+void AsyncIO::initializeContext() {
+ if (!ctx_) {
+ int rc = io_queue_init(capacity_, &ctx_);
+ // returns negative errno
+ checkKernelError(rc, "AsyncIO: io_queue_init failed");
+ DCHECK(ctx_);
+ }
+}
+
+void AsyncIO::submit(Op* op, iocb* cb) {
+ if (UNLIKELY(pending_ >= capacity_)) {
+ throw std::out_of_range("AsyncIO: too many pending requests");
+ }
+ if (UNLIKELY(op->state() != Op::UNINITIALIZED)) {
+ throw std::logic_error("AsyncIO: Invalid Op state in submit");
+ }
+ initializeContext(); // on demand
+ cb->data = op;
+ if (pollFd_ != -1) {
+ io_set_eventfd(cb, pollFd_);
+ }
+ int rc = io_submit(ctx_, 1, &cb);
+ checkKernelError(rc, "AsyncIO: io_submit failed");
+ DCHECK_EQ(rc, 1);
+ op->start();
+ ++pending_;
+}
+
+Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
+ if (UNLIKELY(!ctx_)) {
+ throw std::logic_error("AsyncIO: wait called with no requests");
+ }
+ if (UNLIKELY(pollFd_ != -1)) {
+ throw std::logic_error("AsyncIO: wait not allowed on pollable object");
+ }
+ return doWait(minRequests, pending_);
+}
+
+Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
+ if (UNLIKELY(!ctx_)) {
+ throw std::logic_error("AsyncIO: pollCompleted called with no requests");
+ }
+ if (UNLIKELY(pollFd_ == -1)) {
+ throw std::logic_error(
+ "AsyncIO: pollCompleted not allowed on non-pollable object");
+ }
+ uint64_t numEvents;
+ // This sets the eventFd counter to 0, see
+ // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
+ ssize_t rc;
+ do {
+ rc = ::read(pollFd_, &numEvents, 8);
+ } while (rc == -1 && errno == EINTR);
+ if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
+ return Range<Op**>(); // nothing completed
+ }
+ checkUnixError(rc, "AsyncIO: read from event fd failed");
+ DCHECK_EQ(rc, 8);
+
+ DCHECK_GT(numEvents, 0);
+ DCHECK_LE(numEvents, pending_);
+
+ // Don't reap more than numEvents, as we've just reset the counter to 0.
+ return doWait(numEvents, numEvents);
+}
+
+Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
+ io_event events[pending_];
+ int count;
+ do {
+ // Wait forever
+ count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
+ } while (count == -EINTR);
+ checkKernelError(count, "AsyncIO: io_getevents failed");
+ DCHECK_GE(count, minRequests); // the man page says so
+ DCHECK_LE(count, pending_);
+
+ completed_.clear();
+ if (count == 0) {
+ return folly::Range<Op**>();
+ }
+
+ for (size_t i = 0; i < count; ++i) {
+ Op* op = static_cast<Op*>(events[i].data);
+ DCHECK(op);
+ op->complete(events[i].res);
+ completed_.push_back(op);
+ }
+ pending_ -= count;
+
+ return folly::Range<Op**>(&completed_.front(), count);
+}
+
+AsyncIO::Op::Op()
+ : state_(UNINITIALIZED),
+ result_(-EINVAL) {
+}
+
+void AsyncIO::Op::reset() {
+ if (UNLIKELY(state_ == PENDING)) {
+ throw std::logic_error("AsyncIO: invalid state for reset");
+ }
+ state_ = UNINITIALIZED;
+ result_ = -EINVAL;
+}
+
+AsyncIO::Op::~Op() {
+ CHECK_NE(state_, PENDING);
+}
+
+void AsyncIO::Op::start() {
+ DCHECK_EQ(state_, UNINITIALIZED);
+ state_ = PENDING;
+}
+
+void AsyncIO::Op::complete(ssize_t result) {
+ DCHECK_EQ(state_, PENDING);
+ state_ = COMPLETED;
+ result_ = result;
+ onCompleted();
+}
+
+void AsyncIO::Op::onCompleted() { } // default: do nothing
+
+ssize_t AsyncIO::Op::result() const {
+ if (UNLIKELY(state_ != COMPLETED)) {
+ throw std::logic_error("AsyncIO: Invalid Op state in result");
+ }
+ return result_;
+}
+
+CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
+
+CallbackOp::~CallbackOp() { }
+
+CallbackOp* CallbackOp::make(Callback&& callback) {
+ // Ensure created on the heap
+ return new CallbackOp(std::move(callback));
+}
+
+void CallbackOp::onCompleted() {
+ callback_(result());
+ delete this;
+}
+
+} // namespace folly
+
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_ASYNCIO_H_
+#define FOLLY_IO_ASYNCIO_H_
+
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <libaio.h>
+
+#include <cstdint>
+#include <functional>
+#include <utility>
+#include <vector>
+
+#include <boost/noncopyable.hpp>
+
+#include "folly/Portability.h"
+#include "folly/Range.h"
+
+namespace folly {
+
+/**
+ * C++ interface around Linux Async IO.
+ */
+class AsyncIO : private boost::noncopyable {
+ public:
+ enum PollMode {
+ NOT_POLLABLE,
+ POLLABLE
+ };
+
+ /**
+ * Create an AsyncIO context capacble of holding at most 'capacity' pending
+ * requests at the same time. As requests complete, others can be scheduled,
+ * as long as this limit is not exceeded.
+ *
+ * Note: the maximum number of allowed concurrent requests is controlled
+ * by the fs.aio-max-nr sysctl, the default value is usually 64K.
+ *
+ * If pollMode is POLLABLE, pollFd() will return a file descriptor that
+ * can be passed to poll / epoll / select and will become readable when
+ * any IOs on this AioReader have completed. If you do this, you must use
+ * pollCompleted() instead of wait() -- do not read from the pollFd()
+ * file descriptor directly.
+ */
+ explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
+ ~AsyncIO();
+
+ /**
+ * An Op represents a pending operation. You may inherit from Op (and
+ * override onCompleted) in order to be notified of completion (see
+ * CallbackOp below for an example), or you may use Op's methods directly.
+ *
+ * The Op must remain allocated until completion.
+ */
+ class Op : private boost::noncopyable {
+ friend class AsyncIO;
+ public:
+ Op();
+ virtual ~Op();
+
+ // There would be a cancel() method here if Linux AIO actually implemented
+ // it. But let's not get your hopes up.
+
+ enum State {
+ UNINITIALIZED,
+ PENDING,
+ COMPLETED
+ };
+
+ /**
+ * Return the current operation state.
+ */
+ State state() const { return state_; }
+
+ /**
+ * Reset the operation for reuse. It is an error to call reset() on
+ * an Op that is still pending.
+ */
+ void reset();
+
+ /**
+ * Retrieve the result of this operation. Returns >=0 on success,
+ * -errno on failure (that is, using the Linux kernel error reporting
+ * conventions). Use checkKernelError (folly/Exception.h) on the result to
+ * throw a std::system_error in case of error instead.
+ *
+ * It is an error to call this if the Op hasn't yet started or is still
+ * pending.
+ */
+ ssize_t result() const;
+
+ private:
+ void start();
+ void complete(ssize_t result);
+
+ virtual void onCompleted();
+
+ State state_;
+ ssize_t result_;
+ };
+
+ /**
+ * Initiate a read request.
+ */
+ void pread(Op* op, int fd, void* buf, size_t size, off_t start);
+ void pread(Op* op, int fd, Range<unsigned char*> range, off_t start);
+ void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
+
+ /**
+ * Initiate a write request.
+ */
+ void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start);
+ void pwrite(Op* op, int fd, Range<const unsigned char*> range, off_t start);
+ void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
+
+ /**
+ * Wait for at least minRequests to complete. Returns the requests that
+ * have completed; the returned range is valid until the next call to
+ * wait(). minRequests may be 0 to not block.
+ */
+ Range<Op**> wait(size_t minRequests);
+
+ /**
+ * Return the number of pending requests.
+ */
+ size_t pending() const { return pending_; }
+
+ /**
+ * Return the maximum number of requests that can be kept outstanding
+ * at any one time.
+ */
+ size_t capacity() const { return capacity_; }
+
+ /**
+ * If POLLABLE, return a file descriptor that can be passed to poll / epoll
+ * and will become readable when any async IO operations have completed.
+ * If NOT_POLLABLE, return -1.
+ */
+ int pollFd() const { return pollFd_; }
+
+ /**
+ * If POLLABLE, call instead of wait after the file descriptor returned
+ * by pollFd() became readable. The returned range is valid until the next
+ * call to pollCompleted().
+ */
+ Range<Op**> pollCompleted();
+
+ private:
+ void initializeContext();
+ void submit(Op* op, iocb* cb);
+ Range<Op**> doWait(size_t minRequests, size_t maxRequests);
+
+ io_context_t ctx_;
+ size_t pending_;
+ size_t capacity_;
+ int pollFd_;
+ std::vector<Op*> completed_;
+};
+
+/**
+ * Implementation of AsyncIO::Op that calls a callback and then deletes
+ * itself.
+ */
+class CallbackOp : public AsyncIO::Op {
+ public:
+ typedef std::function<void(ssize_t)> Callback;
+ static CallbackOp* make(Callback&& callback);
+
+ private:
+ explicit CallbackOp(Callback&& callback);
+ ~CallbackOp();
+ void onCompleted() FOLLY_OVERRIDE;
+
+ Callback callback_;
+};
+
+} // namespace folly
+
+#endif /* FOLLY_IO_ASYNCIO_H_ */
+
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/AsyncIO.h"
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include <cstdlib>
+#include <cstdio>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/experimental/io/FsUtil.h"
+#include "folly/ScopeGuard.h"
+#include "folly/String.h"
+
+namespace fs = folly::fs;
+using folly::AsyncIO;
+
+namespace {
+
+constexpr size_t kAlignment = 512; // align reads to 512 B (for O_DIRECT)
+
+struct TestSpec {
+ off_t start;
+ size_t size;
+};
+
+void waitUntilReadable(int fd) {
+ pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+
+ int r;
+ do {
+ r = poll(&pfd, 1, -1); // wait forever
+ } while (r == -1 && errno == EINTR);
+ PCHECK(r == 1);
+ CHECK_EQ(pfd.revents, POLLIN); // no errors etc
+}
+
+folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
+ int fd = reader->pollFd();
+ if (fd == -1) {
+ return reader->wait(1);
+ } else {
+ waitUntilReadable(fd);
+ return reader->pollCompleted();
+ }
+}
+
+// Temporary file that is NOT kept open but is deleted on exit.
+// Generate random-looking but reproduceable data.
+class TemporaryFile {
+ public:
+ explicit TemporaryFile(size_t size);
+ ~TemporaryFile();
+
+ const fs::path path() const { return path_; }
+
+ private:
+ fs::path path_;
+};
+
+TemporaryFile::TemporaryFile(size_t size)
+ : path_(fs::temp_directory_path() / fs::unique_path()) {
+ CHECK_EQ(size % sizeof(uint32_t), 0);
+ size /= sizeof(uint32_t);
+ const uint32_t seed = 42;
+ std::mt19937 rnd(seed);
+
+ const size_t bufferSize = 1U << 16;
+ uint32_t buffer[bufferSize];
+
+ FILE* fp = ::fopen(path_.c_str(), "wb");
+ PCHECK(fp != nullptr);
+ while (size) {
+ size_t n = std::min(size, bufferSize);
+ for (size_t i = 0; i < n; ++i) {
+ buffer[i] = rnd();
+ }
+ size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
+ PCHECK(written == n);
+ size -= written;
+ }
+ PCHECK(::fclose(fp) == 0);
+}
+
+TemporaryFile::~TemporaryFile() {
+ try {
+ fs::remove(path_);
+ } catch (const fs::filesystem_error& e) {
+ LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
+ }
+}
+
+TemporaryFile thisBinary(6 << 20); // 6MiB
+
+void testReadsSerially(const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
+ AsyncIO aioReader(1, pollMode);
+ AsyncIO::Op op;
+ int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+ PCHECK(fd != -1);
+ SCOPE_EXIT {
+ ::close(fd);
+ };
+
+ for (int i = 0; i < specs.size(); i++) {
+ std::unique_ptr<char[]> buf(new char[specs[i].size]);
+ aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
+ EXPECT_EQ(aioReader.pending(), 1);
+ auto ops = readerWait(&aioReader);
+ EXPECT_EQ(1, ops.size());
+ EXPECT_TRUE(ops[0] == &op);
+ EXPECT_EQ(aioReader.pending(), 0);
+ ssize_t res = op.result();
+ EXPECT_LE(0, res) << folly::errnoStr(-res);
+ EXPECT_EQ(specs[i].size, res);
+ op.reset();
+ }
+}
+
+void testReadsParallel(const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
+ AsyncIO aioReader(specs.size(), pollMode);
+ std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
+ std::vector<std::unique_ptr<char[]>> bufs(specs.size());
+
+ int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+ PCHECK(fd != -1);
+ SCOPE_EXIT {
+ ::close(fd);
+ };
+ for (int i = 0; i < specs.size(); i++) {
+ bufs[i].reset(new char[specs[i].size]);
+ aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
+ specs[i].start);
+ }
+ std::vector<bool> pending(specs.size(), true);
+
+ size_t remaining = specs.size();
+ while (remaining != 0) {
+ EXPECT_EQ(remaining, aioReader.pending());
+ auto completed = readerWait(&aioReader);
+ size_t nrRead = completed.size();
+ EXPECT_NE(nrRead, 0);
+ remaining -= nrRead;
+
+ for (int i = 0; i < nrRead; i++) {
+ int id = completed[i] - ops.get();
+ EXPECT_GE(id, 0);
+ EXPECT_LT(id, specs.size());
+ EXPECT_TRUE(pending[id]);
+ pending[id] = false;
+ ssize_t res = ops[id].result();
+ EXPECT_LE(0, res) << folly::errnoStr(-res);
+ EXPECT_EQ(specs[id].size, res);
+ }
+ }
+ EXPECT_EQ(aioReader.pending(), 0);
+ for (int i = 0; i < pending.size(); i++) {
+ EXPECT_FALSE(pending[i]);
+ }
+}
+
+void testReads(const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
+ testReadsSerially(specs, pollMode);
+ testReadsParallel(specs, pollMode);
+}
+
+} // anonymous namespace
+
+TEST(AsyncIO, ZeroAsyncDataNotPollable) {
+ testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
+}
+
+TEST(AsyncIO, ZeroAsyncDataPollable) {
+ testReads({{0, 0}}, AsyncIO::POLLABLE);
+}
+
+TEST(AsyncIO, SingleAsyncDataNotPollable) {
+ testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
+ testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
+}
+
+TEST(AsyncIO, SingleAsyncDataPollable) {
+ testReads({{0, 512}}, AsyncIO::POLLABLE);
+ testReads({{0, 512}}, AsyncIO::POLLABLE);
+}
+
+TEST(AsyncIO, MultipleAsyncDataNotPollable) {
+ testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
+ testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
+
+ testReads({
+ {0, 5*1024*1024},
+ {512, 5*1024*1024},
+ }, AsyncIO::NOT_POLLABLE);
+
+ testReads({
+ {512, 0},
+ {512, 512},
+ {512, 1024},
+ {512, 10*1024},
+ {512, 1024*1024},
+ }, AsyncIO::NOT_POLLABLE);
+}
+
+TEST(AsyncIO, MultipleAsyncDataPollable) {
+ testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
+ testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
+
+ testReads({
+ {0, 5*1024*1024},
+ {512, 5*1024*1024},
+ }, AsyncIO::POLLABLE);
+
+ testReads({
+ {512, 0},
+ {512, 512},
+ {512, 1024},
+ {512, 10*1024},
+ {512, 1024*1024},
+ }, AsyncIO::POLLABLE);
+}
+
+TEST(AsyncIO, ManyAsyncDataNotPollable) {
+ {
+ std::vector<TestSpec> v;
+ for (int i = 0; i < 1000; i++) {
+ v.push_back({512 * i, 512});
+ }
+ testReads(v, AsyncIO::NOT_POLLABLE);
+ }
+}
+
+TEST(AsyncIO, ManyAsyncDataPollable) {
+ {
+ std::vector<TestSpec> v;
+ for (int i = 0; i < 1000; i++) {
+ v.push_back({512 * i, 512});
+ }
+ testReads(v, AsyncIO::POLLABLE);
+ }
+}
+
+TEST(AsyncIO, NonBlockingWait) {
+ AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
+ AsyncIO::Op op;
+ int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+ PCHECK(fd != -1);
+ SCOPE_EXIT {
+ ::close(fd);
+ };
+ size_t size = 1024;
+ std::unique_ptr<char[]> buf(new char[size]);
+ aioReader.pread(&op, fd, buf.get(), size, 0);
+ EXPECT_EQ(aioReader.pending(), 1);
+
+ folly::Range<AsyncIO::Op**> completed;
+ while (completed.empty()) {
+ // poll without blocking until the read request completes.
+ completed = aioReader.wait(0);
+ }
+ EXPECT_EQ(completed.size(), 1);
+
+ EXPECT_TRUE(completed[0] == &op);
+ ssize_t res = op.result();
+ EXPECT_LE(0, res) << folly::errnoStr(-res);
+ EXPECT_EQ(size, res);
+ EXPECT_EQ(aioReader.pending(), 0);
+}
+