/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
manager.addTask([&]() {
Baton baton;
- auto res = baton.timed_wait(std::chrono::milliseconds(230));
+ auto res = baton.try_wait_for(std::chrono::milliseconds(230));
EXPECT_FALSE(res);
EXPECT_EQ(5, iterations);
manager.addTask([&]() {
Baton baton;
- auto res = baton.timed_wait(std::chrono::milliseconds(130));
+ auto res = baton.try_wait_for(std::chrono::milliseconds(130));
EXPECT_FALSE(res);
EXPECT_EQ(3, iterations);
Baton baton;
baton_ptr = &baton;
- auto res = baton.timed_wait(std::chrono::milliseconds(130));
+ auto res = baton.try_wait_for(std::chrono::milliseconds(130));
EXPECT_TRUE(res);
EXPECT_EQ(2, iterations);
Baton baton;
auto start = EventBaseLoopController::Clock::now();
- auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
+ auto res = baton.try_wait_for(std::chrono::milliseconds(timeout_ms));
auto finish = EventBaseLoopController::Clock::now();
EXPECT_FALSE(res);
evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
auto start = EventBaseLoopController::Clock::now();
- auto res = baton.timed_wait(std::chrono::milliseconds(130));
+ auto res = baton.try_wait_for(std::chrono::milliseconds(130));
auto finish = EventBaseLoopController::Clock::now();
EXPECT_TRUE(res);
getFiberManager(evb)
.addTaskFuture([&] {
EXPECT_THROW(
- await([](Promise<int> p) {
+ await([](Promise<int> p) {
p.setValue(42);
throw ExpectedException();
}),
- ExpectedException
- );
+ ExpectedException);
EXPECT_THROW(
- await([&](Promise<int> p) {
+ await([&](Promise<int> p) {
evb.runInEventBaseThread([p = std::move(p)]() mutable {
- p.setValue(42);
- });
+ p.setValue(42);
+ });
throw ExpectedException();
}),
- ExpectedException);
+ ExpectedException);
})
.waitVia(&evb);
}
manager.addTask([&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
- funcs.push_back([i, &pendingFibers]() {
+ funcs.push_back([&pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
manager.addTask([&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
- funcs.push_back([i, &pendingFibers]() -> size_t {
+ funcs.push_back([&pendingFibers]() -> size_t {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
manager.addTask([&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
- funcs.push_back([i, &pendingFibers]() {
+ funcs.push_back([&pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
manager.addTask([&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
- funcs.push_back([i, &pendingFibers]() {
+ funcs.push_back([&pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
manager.addTask([&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
- funcs.push_back([i, &pendingFibers]() {
+ funcs.push_back([&pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
int here;
/* 2 pages is a good guess */
- constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
+ constexpr auto const kHereToFiberMaxDist = 0x2000 / sizeof(int);
+
+ // With ASAN's detect_stack_use_after_return=1, this must be much larger
+ // I measured 410028 on x86_64, so allow for quadruple that, just in case.
+ constexpr auto const kHereToMainMaxDist =
+ folly::kIsSanitizeAddress ? 4 * 410028 : kHereToFiberMaxDist;
+
if (fiberLocation) {
- EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
+ EXPECT_GT(std::abs(&here - fiberLocation), kHereToFiberMaxDist);
}
if (mainLocation) {
- EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
+ EXPECT_LT(std::abs(&here - mainLocation), kHereToMainMaxDist);
}
EXPECT_FALSE(ran);
ran = true;
}
-}
+} // namespace
TEST(FiberManager, runInMainContext) {
FiberManager manager(std::make_unique<SimpleLoopController>());
}
TEST(FiberManager, semaphore) {
- constexpr size_t kTasks = 10;
- constexpr size_t kIterations = 10000;
- constexpr size_t kNumTokens = 10;
+ static constexpr size_t kTasks = 10;
+ static constexpr size_t kIterations = 10000;
+ static constexpr size_t kNumTokens = 10;
Semaphore sem(kNumTokens);
int counterA = 0;
int counterB = 0;
- auto task = [&sem, kTasks, kIterations, kNumTokens](
- int& counter, folly::fibers::Baton& baton) {
+ auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
FiberManager manager(std::make_unique<EventBaseLoopController>());
folly::EventBase evb;
dynamic_cast<EventBaseLoopController&>(manager.loopController())
std::vector<int>,
std::vector<std::string>,
ExecutorT>
- batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
- std::vector<std::vector<std::string>> results;
- int numberOfElements = 0;
- for (auto& unit : batch) {
- numberOfElements += unit.size();
- std::vector<std::string> result;
- for (auto& element : unit) {
- result.push_back(folly::to<std::string>(element));
- }
- results.push_back(std::move(result));
- }
- EXPECT_EQ(totalNumberOfElements, numberOfElements);
- return results;
- });
+ batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+ std::vector<std::vector<std::string>> results;
+ int numberOfElements = 0;
+ for (auto& unit : batch) {
+ numberOfElements += unit.size();
+ std::vector<std::string> result;
+ for (auto& element : unit) {
+ result.push_back(folly::to<std::string>(element));
+ }
+ results.push_back(std::move(result));
+ }
+ EXPECT_EQ(totalNumberOfElements, numberOfElements);
+ return results;
+ });
return batchDispatcher.add(std::move(input));
}
ExecutorT& executor,
int totalNumberOfElements,
int index) {
- thread_local BatchDispatcher<int, std::string, ExecutorT>
- batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
- EXPECT_EQ(totalNumberOfElements, batch.size());
- std::vector<std::string> results;
- std::vector<folly::Future<std::vector<std::string>>>
- innerDispatchResultFutures;
-
- std::vector<int> group;
- for (auto unit : batch) {
- group.push_back(unit);
- if (group.size() == 5) {
- auto localGroup = group;
- group.clear();
-
- innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
- executor, totalNumberOfElements, localGroup));
- }
- }
-
- folly::collectAll(
- innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
- .then([&](
- std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
- for (auto& unit : innerDispatchResults) {
- for (auto& element : unit.value()) {
- results.push_back(element);
- }
+ thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+ executor, [=, &executor](std::vector<int>&& batch) {
+ EXPECT_EQ(totalNumberOfElements, batch.size());
+ std::vector<std::string> results;
+ std::vector<folly::Future<std::vector<std::string>>>
+ innerDispatchResultFutures;
+
+ std::vector<int> group;
+ for (auto unit : batch) {
+ group.push_back(unit);
+ if (group.size() == 5) {
+ auto localGroup = group;
+ group.clear();
+
+ innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+ executor, totalNumberOfElements, localGroup));
}
- })
- .get();
- return results;
- });
+ }
+
+ folly::collectAll(
+ innerDispatchResultFutures.begin(),
+ innerDispatchResultFutures.end())
+ .then([&](std::vector<Try<std::vector<std::string>>>
+ innerDispatchResults) {
+ for (auto& unit : innerDispatchResults) {
+ for (auto& element : unit.value()) {
+ results.push_back(element);
+ }
+ }
+ })
+ .get();
+ return results;
+ });
auto indexCopy = index;
auto result = batchDispatcher.add(std::move(indexCopy));
template <typename ExecutorT>
void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
- executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
+ executor, [](std::vector<int> &&) -> std::vector<int> {
throw std::runtime_error("Surprise!!");
});
EXPECT_EQ(numResultsFilled, expectedNumResults);
}
-} // AtomicBatchDispatcherTesting
+} // namespace AtomicBatchDispatcherTesting
#define SET_UP_TEST_FUNC \
using namespace AtomicBatchDispatcherTesting; \
getFiberManager(*evb1).addTaskRemote([&] {
Baton baton;
- baton.timed_wait(std::chrono::milliseconds{100});
+ baton.try_wait_for(std::chrono::milliseconds{100});
done1 = true;
});
getFiberManager(evb2).addTaskRemote([&] {
Baton baton;
- baton.timed_wait(std::chrono::milliseconds{200});
+ baton.try_wait_for(std::chrono::milliseconds{200});
done2 = true;
});
EXPECT_TRUE(done2);
}
+TEST(TimedMutex, ThreadsAndFibersDontDeadlock) {
+ folly::EventBase evb;
+ auto& fm = getFiberManager(evb);
+ TimedMutex mutex;
+ std::thread testThread([&] {
+ for (int i = 0; i < 100; i++) {
+ mutex.lock();
+ mutex.unlock();
+ {
+ Baton b;
+ b.try_wait_for(std::chrono::milliseconds(1));
+ }
+ }
+ });
+
+ for (int numFibers = 0; numFibers < 100; numFibers++) {
+ fm.addTask([&] {
+ for (int i = 0; i < 20; i++) {
+ mutex.lock();
+ {
+ Baton b;
+ b.try_wait_for(std::chrono::milliseconds(1));
+ }
+ mutex.unlock();
+ {
+ Baton b;
+ b.try_wait_for(std::chrono::milliseconds(1));
+ }
+ }
+ });
+ }
+
+ evb.loop();
+ EXPECT_EQ(0, fm.hasTasks());
+ testThread.join();
+}
+
TEST(TimedMutex, ThreadFiberDeadlockOrder) {
folly::EventBase evb;
auto& fm = getFiberManager(evb);
*/
#ifndef FOLLY_SANITIZE_ADDRESS
TEST(FiberManager, recordStack) {
- std::thread([] {
+ auto f = [] {
folly::fibers::FiberManager::Options opts;
opts.recordStackEvery = 1;
// Check that we properly accounted fiber stack usage.
EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
- }).join();
+ };
+ std::thread(f).join();
}
#endif