}
}
+void AsyncIOOp::cancel() {
+ DCHECK_EQ(state_, State::PENDING);
+ state_ = State::CANCELED;
+}
+
ssize_t AsyncIOOp::result() const {
CHECK_EQ(state_, State::COMPLETED);
return result_;
state_ = State::INITIALIZED;
}
-AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
- : ctx_(0),
- ctxSet_(false),
- pending_(0),
- submitted_(0),
- capacity_(capacity),
- pollFd_(-1) {
+AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
CHECK_GT(capacity_, 0);
completed_.reserve(capacity_);
if (pollMode == POLLABLE) {
CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
auto p = pending_.load(std::memory_order_acquire);
CHECK_LE(minRequests, p);
- return doWait(minRequests, p);
+ doWait(WaitType::COMPLETE, minRequests, p, &completed_);
+ return Range<Op**>(completed_.data(), completed_.size());
+}
+
+size_t AsyncIO::cancel() {
+ CHECK(ctx_);
+ auto p = pending_.load(std::memory_order_acquire);
+ doWait(WaitType::CANCEL, p, p, nullptr);
+ return p;
}
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
DCHECK_LE(numEvents, pending_);
// Don't reap more than numEvents, as we've just reset the counter to 0.
- return doWait(numEvents, numEvents);
+ doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_);
+ return Range<Op**>(completed_.data(), completed_.size());
}
-Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
+void AsyncIO::doWait(
+ WaitType type,
+ size_t minRequests,
+ size_t maxRequests,
+ std::vector<Op*>* result) {
io_event events[maxRequests];
+ // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
+ // WaitType::CANCEL we have to wait for IO completion.
size_t count = 0;
do {
int ret;
/* timeout */ nullptr); // wait forever
} while (ret == -EINTR);
// Check as may not be able to recover without leaking events.
- CHECK_GE(ret, 0)
- << "AsyncIO: io_getevents failed with error " << errnoStr(-ret);
+ CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
+ << errnoStr(-ret);
count += ret;
} while (count < minRequests);
DCHECK_LE(count, maxRequests);
- completed_.clear();
- if (count == 0) {
- return folly::Range<Op**>();
+ if (result != nullptr) {
+ result->clear();
}
-
for (size_t i = 0; i < count; ++i) {
DCHECK(events[i].obj);
Op* op = boost::intrusive::get_parent_from_member(
events[i].obj, &AsyncIOOp::iocb_);
decrementPending();
- op->complete(events[i].res);
- completed_.push_back(op);
+ switch (type) {
+ case WaitType::COMPLETE:
+ op->complete(events[i].res);
+ break;
+ case WaitType::CANCEL:
+ op->cancel();
+ break;
+ }
+ if (result != nullptr) {
+ result->push_back(op);
+ }
}
-
- return folly::Range<Op**>(&completed_.front(), count);
}
AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
X(AsyncIOOp::State::INITIALIZED);
X(AsyncIOOp::State::PENDING);
X(AsyncIOOp::State::COMPLETED);
+ X(AsyncIOOp::State::CANCELED);
}
return "<INVALID AsyncIOOp::State>";
}
* An AsyncIOOp represents a pending operation. You may set a notification
* callback or you may use this class's methods directly.
*
- * The op must remain allocated until completion.
+ * The op must remain allocated until it is completed or canceled.
*/
class AsyncIOOp : private boost::noncopyable {
friend class AsyncIO;
friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
+
public:
typedef std::function<void(AsyncIOOp*)> NotificationCallback;
explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
~AsyncIOOp();
- // There would be a cancel() method here if Linux AIO actually implemented
- // it. But let's not get your hopes up.
-
enum class State {
UNINITIALIZED,
INITIALIZED,
PENDING,
- COMPLETED
+ COMPLETED,
+ CANCELED,
};
/**
* conventions). Use checkKernelError (folly/Exception.h) on the result to
* throw a std::system_error in case of error instead.
*
- * It is an error to call this if the Op hasn't yet started or is still
- * pending.
+ * It is an error to call this if the Op hasn't completed.
*/
ssize_t result() const;
void init();
void start();
void complete(ssize_t result);
+ void cancel();
NotificationCallback cb_;
iocb iocb_;
enum PollMode {
NOT_POLLABLE,
- POLLABLE
+ POLLABLE,
};
/**
* file descriptor directly.
*
* You may use the same AsyncIO object from multiple threads, as long as
- * there is only one concurrent caller of wait() / pollCompleted() (perhaps
- * by always calling it from the same thread, or by providing appropriate
- * mutual exclusion) In this case, pending() returns a snapshot
+ * there is only one concurrent caller of wait() / pollCompleted() / cancel()
+ * (perhaps by always calling it from the same thread, or by providing
+ * appropriate mutual exclusion). In this case, pending() returns a snapshot
* of the current number of pending requests.
*/
- explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
+ explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
~AsyncIO();
/**
*/
Range<Op**> wait(size_t minRequests);
+ /**
+ * Cancel all pending requests and return their number.
+ */
+ size_t cancel();
+
/**
* Return the number of pending requests.
*/
void decrementPending();
void initializeContext();
- Range<Op**> doWait(size_t minRequests, size_t maxRequests);
+ enum class WaitType { COMPLETE, CANCEL };
+ void doWait(
+ WaitType type,
+ size_t minRequests,
+ size_t maxRequests,
+ std::vector<Op*>* result);
- io_context_t ctx_;
- std::atomic<bool> ctxSet_;
+ io_context_t ctx_{nullptr};
+ std::atomic<bool> ctxSet_{false};
std::mutex initMutex_;
- std::atomic<size_t> pending_;
- std::atomic<size_t> submitted_;
+ std::atomic<size_t> pending_{0};
+ std::atomic<size_t> submitted_{0};
const size_t capacity_;
- int pollFd_;
+ int pollFd_{-1};
std::vector<Op*> completed_;
};
#include <folly/portability/Sockets.h>
namespace fs = folly::fs;
+
using folly::AsyncIO;
+using folly::AsyncIOOp;
using folly::AsyncIOQueue;
namespace {
};
TemporaryFile::TemporaryFile(size_t size)
- : path_(fs::temp_directory_path() / fs::unique_path()) {
+ : path_(fs::temp_directory_path() / fs::unique_path()) {
CHECK_EQ(size % sizeof(uint32_t), 0);
size /= sizeof(uint32_t);
const uint32_t seed = 42;
SCOPE_EXIT {
::close(fd);
};
- size_t size = 2*kAlign;
+ size_t size = 2 * kAlign;
auto buf = allocateAligned(size);
op.pread(fd, buf.get(), size, 0);
aioReader.submit(&op);
EXPECT_EQ(size, res);
EXPECT_EQ(aioReader.pending(), 0);
}
+
+TEST(AsyncIO, Cancel) {
+ constexpr size_t kNumOps = 10;
+
+ AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE);
+ int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+ PCHECK(fd != -1);
+ SCOPE_EXIT {
+ ::close(fd);
+ };
+
+ std::vector<AsyncIO::Op> ops(kNumOps);
+ std::vector<ManagedBuffer> bufs;
+
+ size_t completed = 0;
+ for (auto& op : ops) {
+ const size_t size = 2 * kAlign;
+ bufs.push_back(allocateAligned(size));
+ op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
+ op.pread(fd, bufs.back().get(), size, 0);
+ aioReader.submit(&op);
+ }
+
+ EXPECT_EQ(aioReader.pending(), kNumOps);
+ EXPECT_EQ(completed, 0);
+
+ {
+ auto result = aioReader.wait(1);
+ EXPECT_EQ(result.size(), 1);
+ }
+ EXPECT_EQ(completed, 1);
+ EXPECT_EQ(aioReader.pending(), kNumOps - 1);
+
+ EXPECT_EQ(aioReader.cancel(), kNumOps - 1);
+ EXPECT_EQ(aioReader.pending(), 0);
+ EXPECT_EQ(completed, 1);
+
+ completed = 0;
+ for (auto& op : ops) {
+ if (op.state() == AsyncIOOp::State::COMPLETED) {
+ ++completed;
+ } else {
+ EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op;
+ }
+ }
+ EXPECT_EQ(completed, 1);
+}