namespace folly {
AsyncIOOp::AsyncIOOp(NotificationCallback cb)
- : cb_(std::move(cb)),
- state_(State::UNINITIALIZED),
- result_(-EINVAL) {
+ : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
memset(&iocb_, 0, sizeof(iocb_));
}
// returns negative errno
if (rc == -EAGAIN) {
long aio_nr, aio_max;
- std::unique_ptr<FILE, int(*)(FILE*)>
- fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
+ std::unique_ptr<FILE, int (*)(FILE*)> fp(
+ fopen("/proc/sys/fs/aio-nr", "r"), fclose);
PCHECK(fp);
CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
- std::unique_ptr<FILE, int(*)(FILE*)>
- aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
+ std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
+ fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
PCHECK(aio_max_fp);
CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
void AsyncIO::submit(Op* op) {
CHECK_EQ(op->state(), Op::State::INITIALIZED);
- initializeContext(); // on demand
+ initializeContext(); // on demand
// We can increment past capacity, but we'll clean up after ourselves.
auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
throw std::range_error("AsyncIO: too many pending requests");
}
iocb* cb = &op->iocb_;
- cb->data = nullptr; // unused
+ cb->data = nullptr; // unused
if (pollFd_ != -1) {
io_set_eventfd(cb, pollFd_);
}
rc = ::read(pollFd_, &numEvents, 8);
} while (rc == -1 && errno == EINTR);
if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
- return Range<Op**>(); // nothing completed
+ return Range<Op**>(); // nothing completed
}
checkUnixError(rc, "AsyncIO: read from event fd failed");
DCHECK_EQ(rc, 8);
// GOTCHA: io_getevents() may returns less than min_nr results if
// interrupted after some events have been read (if before, -EINTR
// is returned).
- ret = io_getevents(ctx_,
- minRequests - count,
- maxRequests - count,
- events + count,
- /* timeout */ nullptr); // wait forever
+ ret = io_getevents(
+ ctx_,
+ minRequests - count,
+ maxRequests - count,
+ events + count,
+ /* timeout */ nullptr); // wait forever
} while (ret == -EINTR);
// Check as may not be able to recover without leaking events.
CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
return range(result);
}
-AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
- : asyncIO_(asyncIO) {
-}
+AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {}
AsyncIOQueue::~AsyncIOQueue() {
CHECK_EQ(asyncIO_->pending(), 0);
maybeDequeue();
}
-void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
+void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) {
+ maybeDequeue();
+}
void AsyncIOQueue::maybeDequeue() {
while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
auto& nextCb = op->notificationCallback();
op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
this->onCompleted(op2);
- if (nextCb) nextCb(op2);
+ if (nextCb) {
+ nextCb(op2);
+ }
});
asyncIO_->submit(op);
namespace {
-#define X(c) case c: return #c
+#define X(c) \
+ case c: \
+ return #c
const char* asyncIoOpStateToString(AsyncIOOp::State state) {
switch (state) {
std::string path = folly::to<std::string>("/proc/self/fd/", fd);
char link[PATH_MAX];
const ssize_t length =
- std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
+ std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
return path.assign(link, length);
}
std::ostream& operator<<(std::ostream& os, const iocb& cb) {
os << folly::format(
- "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
- cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
- cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
+ "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
+ cb.data,
+ cb.key,
+ iocbCmdToString(cb.aio_lio_opcode),
+ cb.aio_reqprio,
+ cb.aio_fildes,
+ fd2name(cb.aio_fildes));
switch (cb.aio_lio_opcode) {
case IO_CMD_PREAD:
case IO_CMD_PWRITE:
- os << folly::format("buf={}, offset={}, nbytes={}, ",
- cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
+ os << folly::format(
+ "buf={}, offset={}, nbytes={}, ",
+ cb.u.c.buf,
+ cb.u.c.offset,
+ cb.u.c.nbytes);
break;
default:
os << "[TODO: write debug string for "
return os;
}
-} // anonymous namespace
+} // anonymous namespace
std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
os << "{" << op.state_ << ", ";
return os << asyncIoOpStateToString(state);
}
-} // namespace folly
+} // namespace folly
/**
* Return the current operation state.
*/
- State state() const { return state_; }
+ State state() const {
+ return state_;
+ }
/**
* Reset the operation for reuse. It is an error to call reset() on
*/
void reset(NotificationCallback cb = NotificationCallback());
- void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
- const NotificationCallback& notificationCallback() const { return cb_; }
+ void setNotificationCallback(NotificationCallback cb) {
+ cb_ = std::move(cb);
+ }
+ const NotificationCallback& notificationCallback() const {
+ return cb_;
+ }
/**
* Retrieve the result of this operation. Returns >=0 on success,
/**
* Return the number of pending requests.
*/
- size_t pending() const { return pending_; }
+ size_t pending() const {
+ return pending_;
+ }
/**
* Return the maximum number of requests that can be kept outstanding
* at any one time.
*/
- size_t capacity() const { return capacity_; }
+ size_t capacity() const {
+ return capacity_;
+ }
/**
* Return the accumulative number of submitted I/O, since this object
* has been created.
*/
- size_t totalSubmits() const { return submitted_; }
+ size_t totalSubmits() const {
+ return submitted_;
+ }
/**
* If POLLABLE, return a file descriptor that can be passed to poll / epoll
* and will become readable when any async IO operations have completed.
* If NOT_POLLABLE, return -1.
*/
- int pollFd() const { return pollFd_; }
+ int pollFd() const {
+ return pollFd_;
+ }
/**
* If POLLABLE, call instead of wait after the file descriptor returned
explicit AsyncIOQueue(AsyncIO* asyncIO);
~AsyncIOQueue();
- size_t queued() const { return queue_.size(); }
+ size_t queued() const {
+ return queue_.size();
+ }
/**
* Submit an op to the AsyncIO queue. The op will be queued until
std::deque<OpFactory> queue_;
};
-} // namespace folly
+} // namespace folly
}
return true;
}
-} // namespace
+} // namespace
bool starts_with(const path& pth, const path& prefix) {
path::const_iterator it;
if (!skipPrefix(pth, prefix, it)) {
throw filesystem_error(
"Path does not start with prefix",
- pth, prefix,
+ pth,
+ prefix,
bsys::errc::make_error_code(bsys::errc::invalid_argument));
}
return read_symlink("/proc/self/exe");
}
-} // namespace fs
-} // namespace folly
+} // namespace fs
+} // namespace folly
*/
path executable_path();
-} // namespace fs
-} // namespace folly
+} // namespace fs
+} // namespace folly
}
}
-} // namespace
+} // namespace
-
-int main(int argc, char *argv[]) {
+int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_cp) {
- if (argc != 3) usage(argv[0]);
+ if (argc != 3) {
+ usage(argv[0]);
+ }
copy(argv[1], argv[2]);
} else {
- if (argc != 1) usage(argv[0]);
+ if (argc != 1) {
+ usage(argv[0]);
+ }
list();
}
return 0;
size_t pageSize = 0;
boost::cmatch match;
- bool error = gen::byLine("/proc/meminfo") |
- [&] (StringPiece line) -> bool {
- if (boost::regex_match(line.begin(), line.end(), match, regex)) {
- StringPiece numStr(
- line.begin() + match.position(1), size_t(match.length(1)));
- pageSize = to<size_t>(numStr) * 1024; // in KiB
- return false; // stop
- }
- return true;
- };
+ bool error = gen::byLine("/proc/meminfo") | [&](StringPiece line) -> bool {
+ if (boost::regex_match(line.begin(), line.end(), match, regex)) {
+ StringPiece numStr(
+ line.begin() + match.position(1), size_t(match.length(1)));
+ pageSize = to<size_t>(numStr) * 1024; // in KiB
+ return false; // stop
+ }
+ return true;
+ };
if (error) {
throw std::runtime_error("Can't find default huge page size");
HugePageSizeVec readHugePageSizes() {
HugePageSizeVec sizeVec = readRawHugePageSizes();
if (sizeVec.empty()) {
- return sizeVec; // nothing to do
+ return sizeVec; // nothing to do
}
std::sort(sizeVec.begin(), sizeVec.end());
std::vector<StringPiece> options;
gen::byLine("/proc/mounts") | gen::eachAs<StringPiece>() |
- [&](StringPiece line) {
- parts.clear();
- split(" ", line, parts);
- // device path fstype options uid gid
- if (parts.size() != 6) {
- throw std::runtime_error("Invalid /proc/mounts line");
- }
- if (parts[2] != "hugetlbfs") {
- return; // we only care about hugetlbfs
- }
-
- options.clear();
- split(",", parts[3], options);
- size_t pageSize = defaultHugePageSize;
- // Search for the "pagesize" option, which must have a value
- for (auto& option : options) {
- // key=value
- const char* p = static_cast<const char*>(
- memchr(option.data(), '=', option.size()));
- if (!p) {
- continue;
+ [&](StringPiece line) {
+ parts.clear();
+ split(" ", line, parts);
+ // device path fstype options uid gid
+ if (parts.size() != 6) {
+ throw std::runtime_error("Invalid /proc/mounts line");
+ }
+ if (parts[2] != "hugetlbfs") {
+ return; // we only care about hugetlbfs
+ }
+
+ options.clear();
+ split(",", parts[3], options);
+ size_t pageSize = defaultHugePageSize;
+ // Search for the "pagesize" option, which must have a value
+ for (auto& option : options) {
+ // key=value
+ const char* p = static_cast<const char*>(
+ memchr(option.data(), '=', option.size()));
+ if (!p) {
+ continue;
+ }
+ if (StringPiece(option.data(), p) != "pagesize") {
+ continue;
+ }
+ pageSize = parsePageSizeValue(StringPiece(p + 1, option.end()));
+ break;
+ }
+
+ auto pos = std::lower_bound(
+ sizeVec.begin(), sizeVec.end(), pageSize, PageSizeLess());
+ if (pos == sizeVec.end() || pos->size != pageSize) {
+ throw std::runtime_error("Mount page size not found");
}
- if (StringPiece(option.data(), p) != "pagesize") {
- continue;
+ if (!pos->mountPoint.empty()) {
+ // Only one mount point per page size is allowed
+ return;
+ }
+
+ // Store mount point
+ fs::path path(parts[1].begin(), parts[1].end());
+ struct stat st;
+ const int ret = stat(path.string().c_str(), &st);
+ if (ret == -1 && errno == ENOENT) {
+ return;
}
- pageSize = parsePageSizeValue(StringPiece(p + 1, option.end()));
- break;
- }
-
- auto pos = std::lower_bound(sizeVec.begin(), sizeVec.end(), pageSize,
- PageSizeLess());
- if (pos == sizeVec.end() || pos->size != pageSize) {
- throw std::runtime_error("Mount page size not found");
- }
- if (!pos->mountPoint.empty()) {
- // Only one mount point per page size is allowed
- return;
- }
-
- // Store mount point
- fs::path path(parts[1].begin(), parts[1].end());
- struct stat st;
- const int ret = stat(path.string().c_str(), &st);
- if (ret == -1 && errno == ENOENT) {
- return;
- }
- checkUnixError(ret, "stat hugepage mountpoint failed");
- pos->mountPoint = fs::canonical(path);
- pos->device = st.st_dev;
- };
+ checkUnixError(ret, "stat hugepage mountpoint failed");
+ pos->mountPoint = fs::canonical(path);
+ pos->device = st.st_dev;
+ };
return sizeVec;
}
-} // namespace
+} // namespace
const HugePageSizeVec& getHugePageSizes() {
static HugePageSizeVec sizes = readHugePageSizes();
return nullptr;
}
-} // namespace folly
+} // namespace folly
namespace folly {
struct HugePageSize : private boost::totally_ordered<HugePageSize> {
- explicit HugePageSize(size_t s) : size(s) { }
+ explicit HugePageSize(size_t s) : size(s) {}
fs::path filePath(const fs::path& relpath) const {
return mountPoint / relpath;
*/
const HugePageSize* getHugePageSizeForDevice(dev_t device);
-} // namespace folly
+} // namespace folly
namespace {
-constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
+constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
struct TestSpec {
off_t start;
int r;
do {
- r = poll(&pfd, 1, -1); // wait forever
+ r = poll(&pfd, 1, -1); // wait forever
} while (r == -1 && errno == EINTR);
PCHECK(r == 1);
- CHECK_EQ(pfd.revents, POLLIN); // no errors etc
+ CHECK_EQ(pfd.revents, POLLIN); // no errors etc
}
folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
explicit TemporaryFile(size_t size);
~TemporaryFile();
- const fs::path path() const { return path_; }
+ const fs::path path() const {
+ return path_;
+ }
private:
fs::path path_;
}
}
-TemporaryFile tempFile(6 << 20); // 6MiB
+TemporaryFile tempFile(6 << 20); // 6MiB
-typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
+typedef std::unique_ptr<char, void (*)(void*)> ManagedBuffer;
ManagedBuffer allocateAligned(size_t size) {
void* buf;
int rc = posix_memalign(&buf, kAlign, size);
return ManagedBuffer(reinterpret_cast<char*>(buf), free);
}
-void testReadsSerially(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+void testReadsSerially(
+ const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
AsyncIO aioReader(1, pollMode);
AsyncIO::Op op;
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
}
}
-void testReadsParallel(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode,
- bool multithreaded) {
+void testReadsParallel(
+ const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode,
+ bool multithreaded) {
AsyncIO aioReader(specs.size(), pollMode);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
std::vector<ManagedBuffer> bufs;
for (size_t i = 0; i < specs.size(); i++) {
bufs.push_back(allocateAligned(specs[i].size));
}
- auto submit = [&] (size_t i) {
+ auto submit = [&](size_t i) {
ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
aioReader.submit(&ops[i]);
};
}
}
-void testReadsQueued(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+void testReadsQueued(
+ const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
AsyncIO aioReader(readerCapacity, pollMode);
AsyncIOQueue aioQueue(&aioReader);
}
}
-void testReads(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+void testReads(const std::vector<TestSpec>& specs, AsyncIO::PollMode pollMode) {
testReadsSerially(specs, pollMode);
testReadsParallel(specs, pollMode, false);
testReadsParallel(specs, pollMode, true);
testReadsQueued(specs, pollMode);
}
-} // anonymous namespace
+} // anonymous namespace
TEST(AsyncIO, ZeroAsyncDataNotPollable) {
testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
TEST(AsyncIO, MultipleAsyncDataNotPollable) {
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::NOT_POLLABLE);
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::NOT_POLLABLE);
- testReads({
- {0, 5*1024*1024},
- {kAlign, 5*1024*1024}
- }, AsyncIO::NOT_POLLABLE);
-
- testReads({
- {kAlign, 0},
- {kAlign, kAlign},
- {kAlign, 2*kAlign},
- {kAlign, 20*kAlign},
- {kAlign, 1024*1024},
- }, AsyncIO::NOT_POLLABLE);
+ testReads(
+ {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
+
+ testReads(
+ {
+ {kAlign, 0},
+ {kAlign, kAlign},
+ {kAlign, 2 * kAlign},
+ {kAlign, 20 * kAlign},
+ {kAlign, 1024 * 1024},
+ },
+ AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, MultipleAsyncDataPollable) {
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::POLLABLE);
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::POLLABLE);
- testReads({
- {0, 5*1024*1024},
- {kAlign, 5*1024*1024}
- }, AsyncIO::NOT_POLLABLE);
-
- testReads({
- {kAlign, 0},
- {kAlign, kAlign},
- {kAlign, 2*kAlign},
- {kAlign, 20*kAlign},
- {kAlign, 1024*1024},
- }, AsyncIO::NOT_POLLABLE);
+ testReads(
+ {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
+
+ testReads(
+ {
+ {kAlign, 0},
+ {kAlign, kAlign},
+ {kAlign, 2 * kAlign},
+ {kAlign, 20 * kAlign},
+ {kAlign, 1024 * 1024},
+ },
+ AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, ManyAsyncDataNotPollable) {
void expectPathEq(const path& a, const path& b) {
EXPECT_TRUE(a == b) << "expected path=" << a << "\nactual path=" << b;
}
-} // namespace
+} // namespace
TEST(Simple, Path) {
path root("/");
EXPECT_TRUE(starts_with(abs1, root));
EXPECT_FALSE(starts_with(rel1, root));
expectPathEq(path("hello/world"), remove_prefix(abs1, root));
- EXPECT_THROW({remove_prefix(rel1, root);}, filesystem_error);
+ EXPECT_THROW({ remove_prefix(rel1, root); }, filesystem_error);
path abs2("/hello");
path abs3("/hello/");
expectPathEq(path("world"), remove_prefix(abs1, abs2));
expectPathEq(path("world"), remove_prefix(abs1, abs3));
expectPathEq(path(), remove_prefix(abs1, abs4));
- EXPECT_THROW({remove_prefix(abs1, abs5);}, filesystem_error);
- EXPECT_THROW({remove_prefix(abs1, abs6);}, filesystem_error);
+ EXPECT_THROW({ remove_prefix(abs1, abs5); }, filesystem_error);
+ EXPECT_THROW({ remove_prefix(abs1, abs6); }, filesystem_error);
}
TEST(Simple, CanonicalizeParent) {
expectPathEq(a, canonical_parent(b));
expectPathEq(a, canonical(b));
expectPathEq(a, canonical_parent(b));
- EXPECT_THROW({canonical(c);}, filesystem_error);
- EXPECT_THROW({canonical(d);}, filesystem_error);
+ EXPECT_THROW({ canonical(c); }, filesystem_error);
+ EXPECT_THROW({ canonical(d); }, filesystem_error);
expectPathEq(c, canonical_parent(c));
expectPathEq(c, canonical_parent(d));
}