Wait for all threads to finish executing before leaving in an EventBase test
[folly.git] / folly / io / async / test / EventBaseTest.cpp
index cfea59919a66b759b807e14e26d3c00b15914c88..11f61187fa74cd8919798c8a4ea8a7e746fe1b02 100644 (file)
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 #include <folly/Memory.h>
+#include <folly/ScopeGuard.h>
 
 #include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/EventBase.h>
@@ -25,6 +27,8 @@
 #include <folly/io/async/test/Util.h>
 #include <folly/portability/Unistd.h>
 
+#include <folly/futures/Promise.h>
+
 #include <atomic>
 #include <iostream>
 #include <memory>
@@ -1112,11 +1116,18 @@ void runInThreadTestFunc(RunInThreadArg* arg) {
 }
 
 TEST(EventBaseTest, RunInThread) {
-  uint32_t numThreads = 50;
-  uint32_t opsPerThread = 100;
+  constexpr uint32_t numThreads = 50;
+  constexpr uint32_t opsPerThread = 100;
   RunInThreadData data(numThreads, opsPerThread);
 
   deque<std::thread> threads;
+  SCOPE_EXIT {
+    // Wait on all of the threads.
+    for (auto& thread : threads) {
+      thread.join();
+    }
+  };
+
   for (uint32_t i = 0; i < numThreads; ++i) {
     threads.emplace_back([i, &data] {
         for (int n = 0; n < data.opsPerThread; ++n) {
@@ -1182,24 +1193,23 @@ TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
     auto& atom = atoms.at(i);
     atom = make_unique<atomic<size_t>>(0);
   }
-  vector<thread> threads(c);
+  vector<thread> threads;
   for (size_t i = 0; i < c; ++i) {
-    auto& atom = *atoms.at(i);
-    auto& th = threads.at(i);
-    th = thread([&atom] {
-        EventBase eb;
-        auto ebth = thread([&]{ eb.loopForever(); });
-        eb.waitUntilRunning();
-        eb.runInEventBaseThreadAndWait([&] {
-          size_t x = 0;
-          atom.compare_exchange_weak(
-              x, 1, std::memory_order_release, std::memory_order_relaxed);
-        });
+    threads.emplace_back([&atoms, i] {
+      EventBase eb;
+      auto& atom = *atoms.at(i);
+      auto ebth = thread([&] { eb.loopForever(); });
+      eb.waitUntilRunning();
+      eb.runInEventBaseThreadAndWait([&] {
         size_t x = 0;
         atom.compare_exchange_weak(
-            x, 2, std::memory_order_release, std::memory_order_relaxed);
-        eb.terminateLoopSoon();
-        ebth.join();
+            x, 1, std::memory_order_release, std::memory_order_relaxed);
+      });
+      size_t x = 0;
+      atom.compare_exchange_weak(
+          x, 2, std::memory_order_release, std::memory_order_relaxed);
+      eb.terminateLoopSoon();
+      ebth.join();
     });
   }
   for (size_t i = 0; i < c; ++i) {
@@ -1820,3 +1830,48 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) {
 
   t.join();
 }
+
+TEST(EventBaseTest, DrivableExecutorTest) {
+  folly::Promise<bool> p;
+  auto f = p.getFuture();
+  EventBase base;
+  bool finished = false;
+
+  std::thread t([&] {
+    /* sleep override */
+    std::this_thread::sleep_for(std::chrono::microseconds(10));
+    finished = true;
+    base.runInEventBaseThread([&]() { p.setValue(true); });
+  });
+
+  // Ensure drive does not busy wait
+  base.drive(); // TODO: fix notification queue init() extra wakeup
+  base.drive();
+  EXPECT_TRUE(finished);
+
+  folly::Promise<bool> p2;
+  auto f2 = p2.getFuture();
+  // Ensure waitVia gets woken up properly, even from
+  // a separate thread.
+  base.runAfterDelay([&]() { p2.setValue(true); }, 10);
+  f2.waitVia(&base);
+  EXPECT_TRUE(f2.isReady());
+
+  t.join();
+}
+
+TEST(EventBaseTest, RequestContextTest) {
+  EventBase evb;
+  auto defaultCtx = RequestContext::get();
+
+  {
+    RequestContextScopeGuard rctx;
+    auto context = RequestContext::get();
+    EXPECT_NE(defaultCtx, context);
+    evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
+  }
+
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+  evb.loop();
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+}