2 * Copyright 2014-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/Memory.h>
18 #include <folly/ScopeGuard.h>
20 #include <folly/io/async/AsyncTimeout.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventHandler.h>
23 #include <folly/io/async/test/SocketPair.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/portability/Unistd.h>
27 #include <folly/futures/Promise.h>
38 using std::unique_ptr;
43 using std::chrono::milliseconds;
44 using std::chrono::microseconds;
45 using std::chrono::duration_cast;
47 using namespace std::chrono_literals;
49 using namespace folly;
51 ///////////////////////////////////////////////////////////////////////////
52 // Tests for read and write events
53 ///////////////////////////////////////////////////////////////////////////
55 enum { BUF_SIZE = 4096 };
57 ssize_t writeToFD(int fd, size_t length) {
58 // write an arbitrary amount of data to the fd
59 auto bufv = vector<char>(length);
60 auto buf = bufv.data();
61 memset(buf, 'a', length);
62 ssize_t rc = write(fd, buf, length);
67 size_t writeUntilFull(int fd) {
68 // Write to the fd until EAGAIN is returned
69 size_t bytesWritten = 0;
71 memset(buf, 'a', sizeof(buf));
73 ssize_t rc = write(fd, buf, sizeof(buf));
75 CHECK_EQ(errno, EAGAIN);
84 ssize_t readFromFD(int fd, size_t length) {
85 // read an arbitrary amount of data from the fd
86 auto buf = vector<char>(length);
87 return read(fd, buf.data(), length);
90 size_t readUntilEmpty(int fd) {
91 // Read from the fd until EAGAIN is returned
95 int rc = read(fd, buf, sizeof(buf));
97 CHECK(false) << "unexpected EOF";
99 CHECK_EQ(errno, EAGAIN);
108 void checkReadUntilEmpty(int fd, size_t expectedLength) {
109 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
112 struct ScheduledEvent {
118 void perform(int fd) {
119 if (events & EventHandler::READ) {
121 result = readUntilEmpty(fd);
123 result = readFromFD(fd, length);
126 if (events & EventHandler::WRITE) {
128 result = writeUntilFull(fd);
130 result = writeToFD(fd, length);
136 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
137 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
138 eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
143 class TestHandler : public EventHandler {
145 TestHandler(EventBase* eventBase, int fd)
146 : EventHandler(eventBase, fd), fd_(fd) {}
148 void handlerReady(uint16_t events) noexcept override {
149 ssize_t bytesRead = 0;
150 ssize_t bytesWritten = 0;
152 // Read all available data, so EventBase will stop calling us
153 // until new data becomes available
154 bytesRead = readUntilEmpty(fd_);
156 if (events & WRITE) {
157 // Write until the pipe buffer is full, so EventBase will stop calling
158 // us until the other end has read some data
159 bytesWritten = writeUntilFull(fd_);
162 log.emplace_back(events, bytesRead, bytesWritten);
166 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
169 , bytesRead(bytesRead)
170 , bytesWritten(bytesWritten) {}
175 ssize_t bytesWritten;
178 deque<EventRecord> log;
187 TEST(EventBaseTest, ReadEvent) {
191 // Register for read events
192 TestHandler handler(&eb, sp[0]);
193 handler.registerHandler(EventHandler::READ);
195 // Register timeouts to perform two write events
196 ScheduledEvent events[] = {
197 { 10, EventHandler::WRITE, 2345, 0 },
198 { 160, EventHandler::WRITE, 99, 0 },
201 scheduleEvents(&eb, sp[1], events);
208 // Since we didn't use the EventHandler::PERSIST flag, the handler should
209 // have received the first read, then unregistered itself. Check that only
210 // the first chunk of data was received.
211 ASSERT_EQ(handler.log.size(), 1);
212 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
213 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
214 milliseconds(events[0].milliseconds), milliseconds(90));
215 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
216 ASSERT_EQ(handler.log[0].bytesWritten, 0);
217 T_CHECK_TIMEOUT(start, end,
218 milliseconds(events[1].milliseconds), milliseconds(30));
220 // Make sure the second chunk of data is still waiting to be read.
221 size_t bytesRemaining = readUntilEmpty(sp[0]);
222 ASSERT_EQ(bytesRemaining, events[1].length);
226 * Test (READ | PERSIST)
228 TEST(EventBaseTest, ReadPersist) {
232 // Register for read events
233 TestHandler handler(&eb, sp[0]);
234 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
236 // Register several timeouts to perform writes
237 ScheduledEvent events[] = {
238 { 10, EventHandler::WRITE, 1024, 0 },
239 { 20, EventHandler::WRITE, 2211, 0 },
240 { 30, EventHandler::WRITE, 4096, 0 },
241 { 100, EventHandler::WRITE, 100, 0 },
244 scheduleEvents(&eb, sp[1], events);
246 // Schedule a timeout to unregister the handler after the third write
247 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
254 // The handler should have received the first 3 events,
255 // then been unregistered after that.
256 ASSERT_EQ(handler.log.size(), 3);
257 for (int n = 0; n < 3; ++n) {
258 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
259 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
260 milliseconds(events[n].milliseconds));
261 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
262 ASSERT_EQ(handler.log[n].bytesWritten, 0);
264 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
266 // Make sure the data from the last write is still waiting to be read
267 size_t bytesRemaining = readUntilEmpty(sp[0]);
268 ASSERT_EQ(bytesRemaining, events[3].length);
272 * Test registering for READ when the socket is immediately readable
274 TEST(EventBaseTest, ReadImmediate) {
278 // Write some data to the socket so the other end will
279 // be immediately readable
280 size_t dataLength = 1234;
281 writeToFD(sp[1], dataLength);
283 // Register for read events
284 TestHandler handler(&eb, sp[0]);
285 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
287 // Register a timeout to perform another write
288 ScheduledEvent events[] = {
289 { 10, EventHandler::WRITE, 2345, 0 },
292 scheduleEvents(&eb, sp[1], events);
294 // Schedule a timeout to unregister the handler
295 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
302 ASSERT_EQ(handler.log.size(), 2);
304 // There should have been 1 event for immediate readability
305 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
306 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
307 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
308 ASSERT_EQ(handler.log[0].bytesWritten, 0);
310 // There should be another event after the timeout wrote more data
311 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
312 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
313 milliseconds(events[0].milliseconds));
314 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
315 ASSERT_EQ(handler.log[1].bytesWritten, 0);
317 T_CHECK_TIMEOUT(start, end, milliseconds(20));
323 TEST(EventBaseTest, WriteEvent) {
327 // Fill up the write buffer before starting
328 size_t initialBytesWritten = writeUntilFull(sp[0]);
330 // Register for write events
331 TestHandler handler(&eb, sp[0]);
332 handler.registerHandler(EventHandler::WRITE);
334 // Register timeouts to perform two reads
335 ScheduledEvent events[] = {
336 { 10, EventHandler::READ, 0, 0 },
337 { 60, EventHandler::READ, 0, 0 },
340 scheduleEvents(&eb, sp[1], events);
347 // Since we didn't use the EventHandler::PERSIST flag, the handler should
348 // have only been able to write once, then unregistered itself.
349 ASSERT_EQ(handler.log.size(), 1);
350 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
351 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
352 milliseconds(events[0].milliseconds));
353 ASSERT_EQ(handler.log[0].bytesRead, 0);
354 ASSERT_GT(handler.log[0].bytesWritten, 0);
355 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
357 ASSERT_EQ(events[0].result, initialBytesWritten);
358 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
362 * Test (WRITE | PERSIST)
364 TEST(EventBaseTest, WritePersist) {
368 // Fill up the write buffer before starting
369 size_t initialBytesWritten = writeUntilFull(sp[0]);
371 // Register for write events
372 TestHandler handler(&eb, sp[0]);
373 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
375 // Register several timeouts to read from the socket at several intervals
376 ScheduledEvent events[] = {
377 { 10, EventHandler::READ, 0, 0 },
378 { 40, EventHandler::READ, 0, 0 },
379 { 70, EventHandler::READ, 0, 0 },
380 { 100, EventHandler::READ, 0, 0 },
383 scheduleEvents(&eb, sp[1], events);
385 // Schedule a timeout to unregister the handler after the third read
386 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
393 // The handler should have received the first 3 events,
394 // then been unregistered after that.
395 ASSERT_EQ(handler.log.size(), 3);
396 ASSERT_EQ(events[0].result, initialBytesWritten);
397 for (int n = 0; n < 3; ++n) {
398 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
399 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
400 milliseconds(events[n].milliseconds));
401 ASSERT_EQ(handler.log[n].bytesRead, 0);
402 ASSERT_GT(handler.log[n].bytesWritten, 0);
403 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
405 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
409 * Test registering for WRITE when the socket is immediately writable
411 TEST(EventBaseTest, WriteImmediate) {
415 // Register for write events
416 TestHandler handler(&eb, sp[0]);
417 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
419 // Register a timeout to perform a read
420 ScheduledEvent events[] = {
421 { 10, EventHandler::READ, 0, 0 },
424 scheduleEvents(&eb, sp[1], events);
426 // Schedule a timeout to unregister the handler
427 int64_t unregisterTimeout = 40;
428 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
436 ASSERT_EQ(handler.log.size(), 2);
438 // Since the socket buffer was initially empty,
439 // there should have been 1 event for immediate writability
440 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
441 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
442 ASSERT_EQ(handler.log[0].bytesRead, 0);
443 ASSERT_GT(handler.log[0].bytesWritten, 0);
445 // There should be another event after the timeout read data from the peer
446 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
447 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
448 milliseconds(events[0].milliseconds));
449 ASSERT_EQ(handler.log[1].bytesRead, 0);
450 ASSERT_GT(handler.log[1].bytesWritten, 0);
452 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
456 * Test (READ | WRITE) when the socket becomes readable first
458 TEST(EventBaseTest, ReadWrite) {
462 // Fill up the write buffer before starting
463 size_t sock0WriteLength = writeUntilFull(sp[0]);
465 // Register for read and write events
466 TestHandler handler(&eb, sp[0]);
467 handler.registerHandler(EventHandler::READ_WRITE);
469 // Register timeouts to perform a write then a read.
470 ScheduledEvent events[] = {
471 { 10, EventHandler::WRITE, 2345, 0 },
472 { 40, EventHandler::READ, 0, 0 },
475 scheduleEvents(&eb, sp[1], events);
482 // Since we didn't use the EventHandler::PERSIST flag, the handler should
483 // have only noticed readability, then unregistered itself. Check that only
484 // one event was logged.
485 ASSERT_EQ(handler.log.size(), 1);
486 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
487 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
488 milliseconds(events[0].milliseconds));
489 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
490 ASSERT_EQ(handler.log[0].bytesWritten, 0);
491 ASSERT_EQ(events[1].result, sock0WriteLength);
492 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
496 * Test (READ | WRITE) when the socket becomes writable first
498 TEST(EventBaseTest, WriteRead) {
502 // Fill up the write buffer before starting
503 size_t sock0WriteLength = writeUntilFull(sp[0]);
505 // Register for read and write events
506 TestHandler handler(&eb, sp[0]);
507 handler.registerHandler(EventHandler::READ_WRITE);
509 // Register timeouts to perform a read then a write.
510 size_t sock1WriteLength = 2345;
511 ScheduledEvent events[] = {
512 { 10, EventHandler::READ, 0, 0 },
513 { 40, EventHandler::WRITE, sock1WriteLength, 0 },
516 scheduleEvents(&eb, sp[1], events);
523 // Since we didn't use the EventHandler::PERSIST flag, the handler should
524 // have only noticed writability, then unregistered itself. Check that only
525 // one event was logged.
526 ASSERT_EQ(handler.log.size(), 1);
527 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
528 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
529 milliseconds(events[0].milliseconds));
530 ASSERT_EQ(handler.log[0].bytesRead, 0);
531 ASSERT_GT(handler.log[0].bytesWritten, 0);
532 ASSERT_EQ(events[0].result, sock0WriteLength);
533 ASSERT_EQ(events[1].result, sock1WriteLength);
534 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
536 // Make sure the written data is still waiting to be read.
537 size_t bytesRemaining = readUntilEmpty(sp[0]);
538 ASSERT_EQ(bytesRemaining, events[1].length);
542 * Test (READ | WRITE) when the socket becomes readable and writable
545 TEST(EventBaseTest, ReadWriteSimultaneous) {
549 // Fill up the write buffer before starting
550 size_t sock0WriteLength = writeUntilFull(sp[0]);
552 // Register for read and write events
553 TestHandler handler(&eb, sp[0]);
554 handler.registerHandler(EventHandler::READ_WRITE);
556 // Register a timeout to perform a read and write together
557 ScheduledEvent events[] = {
558 { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
561 scheduleEvents(&eb, sp[1], events);
568 // It's not strictly required that the EventBase notify us about both
569 // events in the same call. So, it's possible that if the EventBase
570 // implementation changes this test could start failing, and it wouldn't be
571 // considered breaking the API. However for now it's nice to exercise this
573 ASSERT_EQ(handler.log.size(), 1);
574 ASSERT_EQ(handler.log[0].events,
575 EventHandler::READ | EventHandler::WRITE);
576 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
577 milliseconds(events[0].milliseconds));
578 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
579 ASSERT_GT(handler.log[0].bytesWritten, 0);
580 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
584 * Test (READ | WRITE | PERSIST)
586 TEST(EventBaseTest, ReadWritePersist) {
590 // Register for read and write events
591 TestHandler handler(&eb, sp[0]);
592 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
593 EventHandler::PERSIST);
595 // Register timeouts to perform several reads and writes
596 ScheduledEvent events[] = {
597 { 10, EventHandler::WRITE, 2345, 0 },
598 { 20, EventHandler::READ, 0, 0 },
599 { 35, EventHandler::WRITE, 200, 0 },
600 { 45, EventHandler::WRITE, 15, 0 },
601 { 55, EventHandler::READ, 0, 0 },
602 { 120, EventHandler::WRITE, 2345, 0 },
605 scheduleEvents(&eb, sp[1], events);
607 // Schedule a timeout to unregister the handler
608 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
615 ASSERT_EQ(handler.log.size(), 6);
617 // Since we didn't fill up the write buffer immediately, there should
618 // be an immediate event for writability.
619 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
620 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
621 ASSERT_EQ(handler.log[0].bytesRead, 0);
622 ASSERT_GT(handler.log[0].bytesWritten, 0);
624 // Events 1 through 5 should correspond to the scheduled events
625 for (int n = 1; n < 6; ++n) {
626 ScheduledEvent* event = &events[n - 1];
627 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
628 milliseconds(event->milliseconds));
629 if (event->events == EventHandler::READ) {
630 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
631 ASSERT_EQ(handler.log[n].bytesRead, 0);
632 ASSERT_GT(handler.log[n].bytesWritten, 0);
634 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
635 ASSERT_EQ(handler.log[n].bytesRead, event->length);
636 ASSERT_EQ(handler.log[n].bytesWritten, 0);
640 // The timeout should have unregistered the handler before the last write.
641 // Make sure that data is still waiting to be read
642 size_t bytesRemaining = readUntilEmpty(sp[0]);
643 ASSERT_EQ(bytesRemaining, events[5].length);
647 class PartialReadHandler : public TestHandler {
649 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
650 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
652 void handlerReady(uint16_t events) noexcept override {
653 assert(events == EventHandler::READ);
654 ssize_t bytesRead = readFromFD(fd_, readLength_);
655 log.emplace_back(events, bytesRead, 0);
664 * Test reading only part of the available data when a read event is fired.
665 * When PERSIST is used, make sure the handler gets notified again the next
666 * time around the loop.
668 TEST(EventBaseTest, ReadPartial) {
672 // Register for read events
673 size_t readLength = 100;
674 PartialReadHandler handler(&eb, sp[0], readLength);
675 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
677 // Register a timeout to perform a single write,
678 // with more data than PartialReadHandler will read at once
679 ScheduledEvent events[] = {
680 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
683 scheduleEvents(&eb, sp[1], events);
685 // Schedule a timeout to unregister the handler
686 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
693 ASSERT_EQ(handler.log.size(), 4);
695 // The first 3 invocations should read readLength bytes each
696 for (int n = 0; n < 3; ++n) {
697 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
698 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
699 milliseconds(events[0].milliseconds));
700 ASSERT_EQ(handler.log[n].bytesRead, readLength);
701 ASSERT_EQ(handler.log[n].bytesWritten, 0);
703 // The last read only has readLength/2 bytes
704 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
705 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
706 milliseconds(events[0].milliseconds));
707 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
708 ASSERT_EQ(handler.log[3].bytesWritten, 0);
712 class PartialWriteHandler : public TestHandler {
714 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
715 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
717 void handlerReady(uint16_t events) noexcept override {
718 assert(events == EventHandler::WRITE);
719 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
720 log.emplace_back(events, 0, bytesWritten);
729 * Test writing without completely filling up the write buffer when the fd
730 * becomes writable. When PERSIST is used, make sure the handler gets
731 * notified again the next time around the loop.
733 TEST(EventBaseTest, WritePartial) {
737 // Fill up the write buffer before starting
738 size_t initialBytesWritten = writeUntilFull(sp[0]);
740 // Register for write events
741 size_t writeLength = 100;
742 PartialWriteHandler handler(&eb, sp[0], writeLength);
743 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
745 // Register a timeout to read, so that more data can be written
746 ScheduledEvent events[] = {
747 { 10, EventHandler::READ, 0, 0 },
750 scheduleEvents(&eb, sp[1], events);
752 // Schedule a timeout to unregister the handler
753 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
760 // Depending on how big the socket buffer is, there will be multiple writes
761 // Only check the first 5
763 ASSERT_GE(handler.log.size(), numChecked);
764 ASSERT_EQ(events[0].result, initialBytesWritten);
766 // The first numChecked invocations should write writeLength bytes each
767 for (int n = 0; n < numChecked; ++n) {
768 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
769 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
770 milliseconds(events[0].milliseconds));
771 ASSERT_EQ(handler.log[n].bytesRead, 0);
772 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
778 * Test destroying a registered EventHandler
780 TEST(EventBaseTest, DestroyHandler) {
781 class DestroyHandler : public AsyncTimeout {
783 DestroyHandler(EventBase* eb, EventHandler* h)
787 void timeoutExpired() noexcept override { delete handler_; }
790 EventHandler* handler_;
796 // Fill up the write buffer before starting
797 size_t initialBytesWritten = writeUntilFull(sp[0]);
799 // Register for write events
800 TestHandler* handler = new TestHandler(&eb, sp[0]);
801 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
803 // After 10ms, read some data, so that the handler
804 // will be notified that it can write.
805 eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
808 // Start a timer to destroy the handler after 25ms
809 // This mainly just makes sure the code doesn't break or assert
810 DestroyHandler dh(&eb, handler);
811 dh.scheduleTimeout(25);
817 // Make sure the EventHandler was uninstalled properly when it was
818 // destroyed, and the EventBase loop exited
819 T_CHECK_TIMEOUT(start, end, milliseconds(25));
821 // Make sure that the handler wrote data to the socket
822 // before it was destroyed
823 size_t bytesRemaining = readUntilEmpty(sp[1]);
824 ASSERT_GT(bytesRemaining, 0);
828 ///////////////////////////////////////////////////////////////////////////
829 // Tests for timeout events
830 ///////////////////////////////////////////////////////////////////////////
832 TEST(EventBaseTest, RunAfterDelay) {
835 TimePoint timestamp1(false);
836 TimePoint timestamp2(false);
837 TimePoint timestamp3(false);
838 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
839 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
840 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
846 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
847 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
848 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
849 T_CHECK_TIMEOUT(start, end, milliseconds(40));
853 * Test the behavior of tryRunAfterDelay() when some timeouts are
854 * still scheduled when the EventBase is destroyed.
856 TEST(EventBaseTest, RunAfterDelayDestruction) {
857 TimePoint timestamp1(false);
858 TimePoint timestamp2(false);
859 TimePoint timestamp3(false);
860 TimePoint timestamp4(false);
861 TimePoint start(false);
862 TimePoint end(false);
867 // Run two normal timeouts
868 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
869 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
871 // Schedule a timeout to stop the event loop after 40ms
872 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
874 // Schedule 2 timeouts that would fire after the event loop stops
875 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
876 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
883 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
884 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
885 T_CHECK_TIMEOUT(start, end, milliseconds(40));
887 ASSERT_TRUE(timestamp3.isUnset());
888 ASSERT_TRUE(timestamp4.isUnset());
890 // Ideally this test should be run under valgrind to ensure that no
894 class TestTimeout : public AsyncTimeout {
896 explicit TestTimeout(EventBase* eventBase)
897 : AsyncTimeout(eventBase)
898 , timestamp(false) {}
900 void timeoutExpired() noexcept override { timestamp.reset(); }
905 TEST(EventBaseTest, BasicTimeouts) {
911 t1.scheduleTimeout(10);
912 t2.scheduleTimeout(20);
913 t3.scheduleTimeout(40);
919 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
920 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
921 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
922 T_CHECK_TIMEOUT(start, end, milliseconds(40));
925 class ReschedulingTimeout : public AsyncTimeout {
927 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
929 , timeouts_(timeouts)
930 , iterator_(timeouts_.begin()) {}
936 void timeoutExpired() noexcept override {
937 timestamps.emplace_back();
942 if (iterator_ != timeouts_.end()) {
943 uint32_t timeout = *iterator_;
945 scheduleTimeout(timeout);
949 vector<TimePoint> timestamps;
952 vector<uint32_t> timeouts_;
953 vector<uint32_t>::const_iterator iterator_;
957 * Test rescheduling the same timeout multiple times
959 TEST(EventBaseTest, ReuseTimeout) {
962 vector<uint32_t> timeouts;
963 timeouts.push_back(10);
964 timeouts.push_back(30);
965 timeouts.push_back(15);
967 ReschedulingTimeout t(&eb, timeouts);
974 // Use a higher tolerance than usual. We're waiting on 3 timeouts
975 // consecutively. In general, each timeout may go over by a few
976 // milliseconds, and we're tripling this error by waiting on 3 timeouts.
977 milliseconds tolerance{6};
979 ASSERT_EQ(timeouts.size(), t.timestamps.size());
981 for (size_t n = 0; n < timeouts.size(); ++n) {
982 total += timeouts[n];
983 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
985 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
989 * Test rescheduling a timeout before it has fired
991 TEST(EventBaseTest, RescheduleTimeout) {
998 t1.scheduleTimeout(15);
999 t2.scheduleTimeout(30);
1000 t3.scheduleTimeout(30);
1002 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1003 &AsyncTimeout::scheduleTimeout);
1005 // after 10ms, reschedule t2 to run sooner than originally scheduled
1006 eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1007 // after 10ms, reschedule t3 to run later than originally scheduled
1008 eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1014 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1015 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1016 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1017 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1021 * Test cancelling a timeout
1023 TEST(EventBaseTest, CancelTimeout) {
1026 vector<uint32_t> timeouts;
1027 timeouts.push_back(10);
1028 timeouts.push_back(30);
1029 timeouts.push_back(25);
1031 ReschedulingTimeout t(&eb, timeouts);
1033 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1039 ASSERT_EQ(t.timestamps.size(), 2);
1040 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1041 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1042 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1046 * Test destroying a scheduled timeout object
1048 TEST(EventBaseTest, DestroyTimeout) {
1049 class DestroyTimeout : public AsyncTimeout {
1051 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1055 void timeoutExpired() noexcept override { delete timeout_; }
1058 AsyncTimeout* timeout_;
1063 TestTimeout* t1 = new TestTimeout(&eb);
1064 t1->scheduleTimeout(30);
1066 DestroyTimeout dt(&eb, t1);
1067 dt.scheduleTimeout(10);
1073 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1077 ///////////////////////////////////////////////////////////////////////////
1078 // Test for runInThreadTestFunc()
1079 ///////////////////////////////////////////////////////////////////////////
1081 struct RunInThreadData {
1082 RunInThreadData(int numThreads, int opsPerThread)
1083 : opsPerThread(opsPerThread)
1084 , opsToGo(numThreads*opsPerThread) {}
1087 deque< pair<int, int> > values;
1093 struct RunInThreadArg {
1094 RunInThreadArg(RunInThreadData* data,
1101 RunInThreadData* data;
1106 void runInThreadTestFunc(RunInThreadArg* arg) {
1107 arg->data->values.emplace_back(arg->thread, arg->value);
1108 RunInThreadData* data = arg->data;
1111 if(--data->opsToGo == 0) {
1112 // Break out of the event base loop if we are the last thread running
1113 data->evb.terminateLoopSoon();
1117 TEST(EventBaseTest, RunInThread) {
1118 constexpr uint32_t numThreads = 50;
1119 constexpr uint32_t opsPerThread = 100;
1120 RunInThreadData data(numThreads, opsPerThread);
1122 deque<std::thread> threads;
1124 // Wait on all of the threads.
1125 for (auto& thread : threads) {
1130 for (uint32_t i = 0; i < numThreads; ++i) {
1131 threads.emplace_back([i, &data] {
1132 for (int n = 0; n < data.opsPerThread; ++n) {
1133 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1134 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1140 // Add a timeout event to run after 3 seconds.
1141 // Otherwise loop() will return immediately since there are no events to run.
1142 // Once the last thread exits, it will stop the loop(). However, this
1143 // timeout also stops the loop in case there is a bug performing the normal
1145 data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1152 // Verify that the loop exited because all threads finished and requested it
1153 // to stop. This should happen much sooner than the 3 second timeout.
1154 // Assert that it happens in under a second. (This is still tons of extra
1157 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1158 end.getTime() - start.getTime());
1159 ASSERT_LT(timeTaken.count(), 1000);
1160 VLOG(11) << "Time taken: " << timeTaken.count();
1162 // Verify that we have all of the events from every thread
1163 int expectedValues[numThreads];
1164 for (uint32_t n = 0; n < numThreads; ++n) {
1165 expectedValues[n] = 0;
1167 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1168 it != data.values.end();
1170 int threadID = it->first;
1171 int value = it->second;
1172 ASSERT_EQ(expectedValues[threadID], value);
1173 ++expectedValues[threadID];
1175 for (uint32_t n = 0; n < numThreads; ++n) {
1176 ASSERT_EQ(expectedValues[n], opsPerThread);
1180 // This test simulates some calls, and verifies that the waiting happens by
1181 // triggering what otherwise would be race conditions, and trying to detect
1182 // whether any of the race conditions happened.
1183 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1184 const size_t c = 256;
1185 vector<unique_ptr<atomic<size_t>>> atoms(c);
1186 for (size_t i = 0; i < c; ++i) {
1187 auto& atom = atoms.at(i);
1188 atom = std::make_unique<atomic<size_t>>(0);
1190 vector<thread> threads;
1191 for (size_t i = 0; i < c; ++i) {
1192 threads.emplace_back([&atoms, i] {
1194 auto& atom = *atoms.at(i);
1195 auto ebth = thread([&] { eb.loopForever(); });
1196 eb.waitUntilRunning();
1197 eb.runInEventBaseThreadAndWait([&] {
1199 atom.compare_exchange_weak(
1200 x, 1, std::memory_order_release, std::memory_order_relaxed);
1203 atom.compare_exchange_weak(
1204 x, 2, std::memory_order_release, std::memory_order_relaxed);
1205 eb.terminateLoopSoon();
1209 for (size_t i = 0; i < c; ++i) {
1210 auto& th = threads.at(i);
1214 for (auto& atom : atoms) {
1220 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1222 thread th(&EventBase::loopForever, &eb);
1224 eb.terminateLoopSoon();
1227 auto mutated = false;
1228 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1231 EXPECT_TRUE(mutated);
1234 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1236 thread th(&EventBase::loopForever, &eb);
1238 eb.terminateLoopSoon();
1241 eb.runInEventBaseThreadAndWait([&] {
1242 auto mutated = false;
1243 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1246 EXPECT_TRUE(mutated);
// No loop thread is started here: the test calls
// runImmediatelyOrRunInEventBaseThreadAndWait() directly and then expects
// `mutated` to be true.
1250 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1252 auto mutated = false;
1253 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1256 EXPECT_TRUE(mutated);
1259 ///////////////////////////////////////////////////////////////////////////
1260 // Tests for runInLoop()
1261 ///////////////////////////////////////////////////////////////////////////
// LoopCallback helper shared by the runInLoop() tests.
//
// It stores the EventBase pointer it was constructed with, an unsigned
// counter (count_), and an optional std::function action (action_), which
// defaults to an empty std::function.
// NOTE(review): callers pass the starting count as the second constructor
// argument and later expect getCount() to reach 0 -- presumably count_ is
// decremented once per runLoopCallback() invocation; confirm.
1263 class CountedLoopCallback : public EventBase::LoopCallback {
1265 CountedLoopCallback(EventBase* eventBase,
1267 std::function<void()> action =
1268 std::function<void()>())
1269 : eventBase_(eventBase)
1271 , action_(action) {}
// One branch re-queues this object on the same EventBase; the else-branch is
// entered only when action_ holds a callable.
1273 void runLoopCallback() noexcept override {
1276 eventBase_->runInLoop(this);
1277 } else if (action_) {
1282 unsigned int getCount() const {
1287 EventBase* eventBase_;
1288 unsigned int count_;
1289 std::function<void()> action_;
1292 // Test that EventBase::loop() doesn't exit while there are
1293 // still LoopCallbacks remaining to be invoked.
//
// The callback is constructed with a count of 10; getCount() is checked once
// right after runInLoop() (still 10) and once at the end (0).
1294 TEST(EventBaseTest, RepeatedRunInLoop) {
1295 EventBase eventBase;
1297 CountedLoopCallback c(&eventBase, 10);
1298 eventBase.runInLoop(&c);
1299 // The callback shouldn't have run immediately
1300 ASSERT_EQ(c.getCount(), 10);
1303 // loop() should loop until the CountedLoopCallback stops
1304 // re-installing itself.
1305 ASSERT_EQ(c.getCount(), 0);
1308 // Test that EventBase::loop() works as expected without time measurements.
//
// Same count-down check as a default-constructed EventBase would get, except
// that the EventBase here is built with `false` as its constructor argument.
1309 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1310 EventBase eventBase(false);
1312 CountedLoopCallback c(&eventBase, 10);
1313 eventBase.runInLoop(&c);
1314 // The callback shouldn't have run immediately
1315 ASSERT_EQ(c.getCount(), 10);
1318 // loop() should loop until the CountedLoopCallback stops
1319 // re-installing itself.
1320 ASSERT_EQ(c.getCount(), 0);
1323 // Test runInLoop() calls with terminateLoopSoon()
//
// c1 starts at 20 and has no action.  c2 starts at 10 and carries an action
// bound to EventBase::terminateLoopSoon on the same EventBase, so c2 is the
// only thing in this test that asks loopForever() to return.
1324 TEST(EventBaseTest, RunInLoopStopLoop) {
1325 EventBase eventBase;
1327 CountedLoopCallback c1(&eventBase, 20);
1328 CountedLoopCallback c2(&eventBase, 10,
1329 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1331 eventBase.runInLoop(&c1);
1332 eventBase.runInLoop(&c2);
// Nothing has run yet: both counters still hold their initial values.
1333 ASSERT_EQ(c1.getCount(), 20);
1334 ASSERT_EQ(c2.getCount(), 10);
1336 eventBase.loopForever();
1338 // c2 should have stopped the loop after 10 iterations
1339 ASSERT_EQ(c2.getCount(), 0);
1341 // We allow the EventBase to run the loop callbacks in whatever order it
1342 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1343 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1346 // (With the current code, c1 will always run 10 times, but we don't consider
1347 // this a hard API requirement.)
// Accept a remaining count in the closed range [10, 11].
1348 ASSERT_GE(c1.getCount(), 10);
1349 ASSERT_LE(c1.getCount(), 11);
// Death test: `deadManWalking` creates an EventBase, submits a callback that
// throws std::runtime_error through runInEventBaseThread(), and then runs
// loopForever().  EXPECT_DEATH is given the pattern ".*", so the content of
// the death message is not checked.
1352 TEST(EventBaseTest, messageAvailableException) {
1353 auto deadManWalking = [] {
1354 EventBase eventBase;
1356 // Call this from another thread to force use of NotificationQueue in
1357 // runInEventBaseThread
1358 eventBase.runInEventBaseThread(
1359 []() { throw std::runtime_error("boom"); });
1362 eventBase.loopForever();
1364 EXPECT_DEATH(deadManWalking(), ".*");
// Runs loopForever() with a single callback (count 1) whose action is bound
// to terminateLoopSoon, and then submits further work with
// runInEventBaseThread() after loopForever() has returned.
1367 TEST(EventBaseTest, TryRunningAfterTerminate) {
1368 EventBase eventBase;
1369 CountedLoopCallback c1(&eventBase, 1,
1370 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1371 eventBase.runInLoop(&c1);
1372 eventBase.loopForever();
1374 eventBase.runInEventBaseThread([&]() {
1381 // Test cancelling runInLoop() callbacks
//
// Five callbacks take part: c1, c2 and c3 start at 20; cancelC1 and cancelC2
// start at 10 and carry actions bound to cancelLoopCallback() on c1 and c2
// respectively.
1382 TEST(EventBaseTest, CancelRunInLoop) {
1383 EventBase eventBase;
1385 CountedLoopCallback c1(&eventBase, 20);
1386 CountedLoopCallback c2(&eventBase, 20);
1387 CountedLoopCallback c3(&eventBase, 20);
1389 std::function<void()> cancelC1Action =
1390 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1391 std::function<void()> cancelC2Action =
1392 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1394 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1395 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1397 // Install cancelC1 after c1
1398 eventBase.runInLoop(&c1);
1399 eventBase.runInLoop(&cancelC1);
1401 // Install cancelC2 before c2
1402 eventBase.runInLoop(&cancelC2);
1403 eventBase.runInLoop(&c2);
1406 eventBase.runInLoop(&c3);
// Before the loop is driven every counter still holds its initial value.
1408 ASSERT_EQ(c1.getCount(), 20);
1409 ASSERT_EQ(c2.getCount(), 20);
1410 ASSERT_EQ(c3.getCount(), 20);
1411 ASSERT_EQ(cancelC1.getCount(), 10);
1412 ASSERT_EQ(cancelC2.getCount(), 10);
1417 // cancelC1 and cancelC2 should have both fired after 10 iterations and
1418 // stopped re-installing themselves
1419 ASSERT_EQ(cancelC1.getCount(), 0);
1420 ASSERT_EQ(cancelC2.getCount(), 0);
1421 // c3 should have continued on for the full 20 iterations
1422 ASSERT_EQ(c3.getCount(), 0);
1424 // c1 and c2 should have both been cancelled on the 10th iteration.
1426 // Callbacks are always run in the order they are installed,
1427 // so c1 should have fired 10 times, and been canceled after it ran on the
1428 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1429 // have run before it on the 10th iteration, and cancelled it before it
// Remaining counts: 20 - 10 runs for c1 and 20 - 9 runs for c2 (assuming one
// decrement per run).
1431 ASSERT_EQ(c1.getCount(), 10);
1432 ASSERT_EQ(c2.getCount(), 11);
// Helper that is both an EventBase::LoopCallback and an EventHandler for the
// fd given to the constructor.  It counts how often each entry point has run
// (loopInvocations_ / eventInvocations_) and compares those counts against
// the limits supplied through reset().
1435 class TerminateTestCallback : public EventBase::LoopCallback,
1436 public EventHandler {
// All counters and limits start at zero; call reset() to arm the object.
1438 TerminateTestCallback(EventBase* eventBase, int fd)
1439 : EventHandler(eventBase, fd),
1440 eventBase_(eventBase),
1441 loopInvocations_(0),
1442 maxLoopInvocations_(0),
1443 eventInvocations_(0),
1444 maxEventInvocations_(0) {}
// Zero both counters, store the new limits, and drop any pending loop
// callback / handler registration.
1446 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1447 loopInvocations_ = 0;
1448 maxLoopInvocations_ = maxLoopInvocations;
1449 eventInvocations_ = 0;
1450 maxEventInvocations_ = maxEventInvocations;
1452 cancelLoopCallback();
1453 unregisterHandler();
// fd-event entry point: counts the event, checks the event limit, and queues
// this object as a loop callback.
1456 void handlerReady(uint16_t /* events */) noexcept override {
1457 // We didn't register with PERSIST, so we will have been automatically
1458 // unregistered already.
1459 ASSERT_FALSE(isHandlerRegistered());
1461 ++eventInvocations_;
1462 if (eventInvocations_ >= maxEventInvocations_) {
1466 eventBase_->runInLoop(this);
// Loop-callback entry point: checks the loop limit and registers for READ
// (without PERSIST) on the fd.
1468 void runLoopCallback() noexcept override {
1470 if (loopInvocations_ >= maxLoopInvocations_) {
1474 registerHandler(READ);
1477 uint32_t getLoopInvocations() const {
1478 return loopInvocations_;
1480 uint32_t getEventInvocations() const {
1481 return eventInvocations_;
1485 EventBase* eventBase_;
1486 uint32_t loopInvocations_;
1487 uint32_t maxLoopInvocations_;
1488 uint32_t eventInvocations_;
1489 uint32_t maxEventInvocations_;
1493 * Test that EventBase::loop() correctly detects when there are no more events
1496 * This uses a single callback, which alternates registering itself as a loop
1497 * callback versus a EventHandler callback. This exercises a regression where
1498 * EventBase::loop() incorrectly exited if there were no more fd handlers
1499 * registered, but a loop callback installed a new fd handler.
// Two phases share one TerminateTestCallback on the read end of a pipe:
//   reset(10, 100) -> expects 10 loop invocations and 9 event invocations;
//   reset(100, 7)  -> expects 7 loop invocations and 7 event invocations.
1501 TEST(EventBaseTest, LoopTermination) {
1502 EventBase eventBase;
1504 // Open a pipe and close the write end,
1505 // so the read endpoint will be readable
1507 int rc = pipe(pipeFds);
1510 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1512 // Test once where the callback will exit after a loop callback
1513 callback.reset(10, 100);
1514 eventBase.runInLoop(&callback);
1516 ASSERT_EQ(callback.getLoopInvocations(), 10);
1517 ASSERT_EQ(callback.getEventInvocations(), 9);
1519 // Test once where the callback will exit after an fd event callback
1520 callback.reset(100, 7);
1521 eventBase.runInLoop(&callback);
1523 ASSERT_EQ(callback.getLoopInvocations(), 7);
1524 ASSERT_EQ(callback.getEventInvocations(), 7);
1529 ///////////////////////////////////////////////////////////////////////////
1530 // Tests for latency calculations
1531 ///////////////////////////////////////////////////////////////////////////
// AsyncTimeout that works through a caller-supplied deque of uint64_t values:
// each expiry checks whether the deque is empty and otherwise pops the front
// entry into `sleepTime`.
// NOTE(review): timeout_ is a reference member, presumably bound to the
// constructor argument, so that deque has to outlive this object -- confirm.
1533 class IdleTimeTimeoutSeries : public AsyncTimeout {
1537 explicit IdleTimeTimeoutSeries(EventBase *base,
1538 std::deque<std::uint64_t>& timeout) :
1545 ~IdleTimeTimeoutSeries() override {}
1547 void timeoutExpired() noexcept override {
1550 if(timeout_.empty()){
1553 uint64_t sleepTime = timeout_.front();
1554 timeout_.pop_front();
1562 int getTimeouts() const {
1568 std::deque<uint64_t>& timeout_;
1572 * Verify that idle time is correctly accounted for when decaying our loop
1575 * This works by creating a high loop time (via usleep), expecting a latency
1576 * callback with known value, and then scheduling a timeout for later. This
1577 * later timeout is far enough in the future that the idle time should have
1578 * caused the loop time to decay.
// Drives two timeout series through one EventBase and checks the average
// loop time both inside the max-latency callback and at the end of the test.
1580 TEST(EventBaseTest, IdleTime) {
1581 EventBase eventBase;
1582 eventBase.setLoadAvgMsec(1000ms);
1583 eventBase.resetLoadAvg(5900.0);
// timeouts0 ends up with six entries: 8000, 8080 x 4, 14000.
1584 std::deque<uint64_t> timeouts0(4, 8080);
1585 timeouts0.push_front(8000);
1586 timeouts0.push_back(14000);
1587 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
// Second series: twenty entries of 20; its owner `tos` is created later,
// from inside the latency callback.
1588 std::deque<uint64_t> timeouts(20, 20);
1589 std::unique_ptr<IdleTimeTimeoutSeries> tos;
// Start time in microseconds on the steady clock, used for the overload check.
1590 int64_t testStart = duration_cast<microseconds>(
1591 std::chrono::steady_clock::now().time_since_epoch()).count();
1592 bool hostOverloaded = false;
1594 int latencyCallbacks = 0;
1595 eventBase.setMaxLatency(6000us, [&]() {
// Any state other than "exactly one latency callback so far" is a failure.
1597 if (latencyCallbacks != 1) {
1598 FAIL() << "Unexpected latency callback";
1601 if (tos0.getTimeouts() < 6) {
1602 // This could only happen if the host this test is running
1603 // on is heavily loaded.
1604 int64_t maxLatencyReached = duration_cast<microseconds>(
1605 std::chrono::steady_clock::now().time_since_epoch()).count();
1606 ASSERT_LE(43800, maxLatencyReached - testStart);
1607 hostOverloaded = true;
1610 ASSERT_EQ(6, tos0.getTimeouts());
// Together these two checks require 4900 <= getAvgLoopTime() <= 7300.
1611 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1612 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1613 tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
1616 // Kick things off with an "immediate" timeout
1617 tos0.scheduleTimeout(1);
1621 if (hostOverloaded) {
1625 ASSERT_EQ(1, latencyCallbacks);
1626 ASSERT_EQ(7, tos0.getTimeouts());
// Together these two checks require 4700 <= getAvgLoopTime() <= 7100.
1627 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1628 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1630 ASSERT_EQ(21, tos->getTimeouts());
1634 * Test that thisLoop functionality works with terminateLoopSoon
// Calls terminateLoopSoon() and queues two runInLoop() callbacks; at the end
// `runInLoop` must still be false while `runThisLoop` must be true.
1636 TEST(EventBaseTest, ThisLoop) {
1638 bool runInLoop = false;
1639 bool runThisLoop = false;
1642 eb.terminateLoopSoon();
1643 eb.runInLoop([&]() {
1646 eb.runInLoop([&]() {
1653 ASSERT_FALSE(runInLoop);
1654 // Should work with thisLoop
1655 ASSERT_TRUE(runThisLoop);
// Submits a callback to the EventBase with runInEventBaseThread().
1658 TEST(EventBaseTest, EventBaseThreadLoop) {
1662 base.runInEventBaseThread([&](){
// Names the EventBase "foo" and, when the glibc version check passes, reads
// the current thread's name with pthread_getname_np() (buffer length 16) and
// expects it to equal "foo".
1670 TEST(EventBaseTest, EventBaseThreadName) {
1672 base.setName("foo");
// NOTE(review): comparing major and minor separately evaluates to false for
// any glibc whose major is > 2 but whose minor is < 12; __GLIBC_PREREQ(2, 12)
// expresses "2.12 or newer" without that gap -- consider switching.
1675 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1677 pthread_getname_np(pthread_self(), name, 16);
1678 ASSERT_EQ(0, strcmp("foo", name));
// Registers a CountedLoopCallback (count 1, action = terminateLoopSoon) with
// runBeforeLoop() and expects its count to be 0 afterwards.
1682 TEST(EventBaseTest, RunBeforeLoop) {
1684 CountedLoopCallback cb(&base, 1, [&](){
1685 base.terminateLoopSoon();
1687 base.runBeforeLoop(&cb);
1689 ASSERT_EQ(cb.getCount(), 0);
// Like the action-driven variant, but here the callback has no action: the
// loop is instead stopped by a delayed task scheduled with
// tryRunAfterDelay() that calls terminateLoopSoon().
1692 TEST(EventBaseTest, RunBeforeLoopWait) {
1694 CountedLoopCallback cb(&base, 1);
1695 base.tryRunAfterDelay([&](){
1696 base.terminateLoopSoon();
1698 base.runBeforeLoop(&cb);
1701 // Check that we only ran once, and did not loop multiple times.
1702 ASSERT_EQ(cb.getCount(), 0);
// EventHandler whose handlerReady() calls abort(): any event delivered to
// this handler terminates the process, so tests using it fail hard if the
// handler ever fires.
1705 class PipeHandler : public EventHandler {
1707 PipeHandler(EventBase* eventBase, int fd)
1708 : EventHandler(eventBase, fd) {}
1710 void handlerReady(uint16_t /* events */) noexcept override { abort(); }
// Calls terminateLoopSoon() before loop() has been started, then runs loop()
// on thread `t`.  A PipeHandler registered for READ on the pipe's read end
// gives the EventBase a registered handler while it runs.
1713 TEST(EventBaseTest, StopBeforeLoop) {
1716 // Give the evb something to do.
1718 ASSERT_EQ(0, pipe(p));
1719 PipeHandler handler(&evb, p[0]);
1720 handler.registerHandler(EventHandler::READ);
1722 // It's definitely not running yet
1723 evb.terminateLoopSoon();
1725 // let it run, it should exit quickly.
1726 std::thread t([&] { evb.loop(); });
1729 handler.unregisterHandler();
// Submits a callback to the EventBase with runInEventBaseThread().
// NOTE(review): per the test name, the callback is presumably expected to
// run when the EventBase is destroyed -- confirm against the full test body.
1736 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1741 base.runInEventBaseThread([&](){
// Thread `t` takes a keep-alive token from `evb` at capture time, sleeps for
// 100ms, and then submits a callback that sets `done`; the token is moved
// into that callback.
1749 TEST(EventBaseTest, LoopKeepAlive) {
1753 std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1754 /* sleep override */ std::this_thread::sleep_for(
1755 std::chrono::milliseconds(100));
1756 evb.runInEventBaseThread(
1757 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Variant in which the token-holding thread `t` is created from inside a
// callback submitted with runInEventBaseThread(); the thread then sleeps for
// 100ms and submits the callback that sets `done`, moving the token into it.
1767 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1773 evb.runInEventBaseThread([&] {
1774 t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1775 /* sleep override */ std::this_thread::sleep_for(
1776 std::chrono::milliseconds(100));
1777 evb.runInEventBaseThread(
1778 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Obtains a keep-alive token through runInEventBaseThreadAndWait(), calls
// terminateLoopSoon(), and checks that `done` is still false both before the
// terminate request and 30ms after it.  The token is finally moved into an
// empty callback submitted with runInEventBaseThread().
1789 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1790 std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
1794 std::thread evThread([&] {
1801 auto* ev = evb.get();
1802 Executor::KeepAlive keepAlive;
1803 ev->runInEventBaseThreadAndWait(
1804 [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
1805 ASSERT_FALSE(done) << "Loop finished before we asked it to";
1806 ev->terminateLoopSoon();
1807 /* sleep override */
1808 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1809 ASSERT_FALSE(done) << "Loop terminated early";
// Hand the token to a no-op callback; it leaves this scope via the move.
1810 ev->runInEventBaseThread([keepAlive = std::move(keepAlive)]{});
// Uses a heap-allocated EventBase.  A keep-alive token is taken at capture
// time; after a 100ms sleep a callback that sets `done` is submitted through
// the raw pointer `evbPtr`, with the token moved into that callback.
1817 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1818 auto evb = std::make_unique<EventBase>();
1824 loopKeepAlive = evb->getKeepAliveToken(),
1827 /* sleep override */ std::this_thread::sleep_for(
1828 std::chrono::milliseconds(100));
1829 evbPtr->runInEventBaseThread(
1830 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Stress variant: kNumThreads threads each collect kNumTasks keep-alive
// tokens, sleep for one second, and then submit one `++done` callback per
// token (the token is moved into its callback).  The final check expects
// `done` to equal kNumThreads * kNumTasks.
1840 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1841 auto evb = std::make_unique<EventBase>();
1843 static constexpr size_t kNumThreads = 100;
1844 static constexpr size_t kNumTasks = 100;
1846 std::vector<std::thread> ts;
1847 std::vector<std::unique_ptr<Baton<>>> batons;
// One Baton per worker thread.
1850 for (size_t i = 0; i < kNumThreads; ++i) {
1851 batons.emplace_back(std::make_unique<Baton<>>());
1854 for (size_t i = 0; i < kNumThreads; ++i) {
// Raw pointers are captured by value; `done` is shared by reference.
1855 ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1856 std::vector<Executor::KeepAlive> keepAlives;
1857 for (size_t j = 0; j < kNumTasks; ++j) {
1858 keepAlives.emplace_back(evbPtr->getKeepAliveToken());
1863 /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1865 for (auto& keepAlive : keepAlives) {
1866 evbPtr->runInEventBaseThread(
1867 [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1872 for (auto& baton : batons) {
1878 EXPECT_EQ(kNumThreads * kNumTasks, done);
1880 for (auto& t : ts) {
// Exercises the EventBase together with folly::Promise / Future: promise `p`
// is fulfilled from a callback submitted with runInEventBaseThread() after a
// 10us sleep, and promise `p2` is fulfilled from a runAfterDelay() callback
// scheduled with a delay argument of 10.
1885 TEST(EventBaseTest, DrivableExecutorTest) {
1886 folly::Promise<bool> p;
1887 auto f = p.getFuture();
1889 bool finished = false;
1892 /* sleep override */
1893 std::this_thread::sleep_for(std::chrono::microseconds(10));
1895 base.runInEventBaseThread([&]() { p.setValue(true); });
1898 // Ensure drive does not busy wait
1899 base.drive(); // TODO: fix notification queue init() extra wakeup
1901 EXPECT_TRUE(finished);
1903 folly::Promise<bool> p2;
1904 auto f2 = p2.getFuture();
1905 // Ensure waitVia gets woken up properly, even from
1906 // a separate thread.
1907 base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1909 EXPECT_TRUE(f2.isReady());
// Records the RequestContext in effect at the start, installs a new one with
// RequestContextScopeGuard, and queues a runInLoop() callback that expects to
// observe that new context.  Afterwards the weak_ptr taken from
// saveContext() must have expired and RequestContext::get() must again
// return the context recorded at the start.
1914 TEST(EventBaseTest, RequestContextTest) {
1916 auto defaultCtx = RequestContext::get();
1917 std::weak_ptr<RequestContext> rctx_weak_ptr;
1920 RequestContextScopeGuard rctx;
1921 rctx_weak_ptr = RequestContext::saveContext();
1922 auto context = RequestContext::get();
1923 EXPECT_NE(defaultCtx, context);
// `context` is captured by value so the callback can compare against it.
1924 evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1928 // Ensure that RequestContext created for the scope has been released and
1930 EXPECT_EQ(rctx_weak_ptr.expired(), true);
1932 EXPECT_EQ(defaultCtx, RequestContext::get());
// Installs a new RequestContext with RequestContextScopeGuard, cancels the
// CountedLoopCallback `c` via cancelLoopCallback(), and then expects the
// weak_ptr taken from saveContext() to have expired and
// RequestContext::get() to return the context recorded at the start.
1935 TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
1937 CountedLoopCallback c(&evb, 1);
1939 auto defaultCtx = RequestContext::get();
1940 EXPECT_EQ(defaultCtx, RequestContext::get());
1941 std::weak_ptr<RequestContext> rctx_weak_ptr;
1944 RequestContextScopeGuard rctx;
1945 rctx_weak_ptr = RequestContext::saveContext();
1946 auto context = RequestContext::get();
1947 EXPECT_NE(defaultCtx, context);
1949 c.cancelLoopCallback();
1952 // Ensure that RequestContext created for the scope has been released and
1954 EXPECT_EQ(rctx_weak_ptr.expired(), true);
1956 EXPECT_EQ(defaultCtx, RequestContext::get());