2 * Copyright 2017-present Facebook, Inc.
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
21 #include <folly/Memory.h>
22 #include <folly/ScopeGuard.h>
24 #include <folly/io/async/AsyncTimeout.h>
25 #include <folly/io/async/EventBase.h>
26 #include <folly/io/async/EventHandler.h>
27 #include <folly/io/async/test/SocketPair.h>
28 #include <folly/io/async/test/Util.h>
29 #include <folly/portability/Unistd.h>
31 #include <folly/futures/Promise.h>
42 using std::unique_ptr;
47 using std::chrono::milliseconds;
48 using std::chrono::microseconds;
49 using std::chrono::duration_cast;
51 using namespace std::chrono_literals;
53 using namespace folly;
55 ///////////////////////////////////////////////////////////////////////////
56 // Tests for read and write events
57 ///////////////////////////////////////////////////////////////////////////
59 enum { BUF_SIZE = 4096 };
61 ssize_t writeToFD(int fd, size_t length) {
62 // write an arbitrary amount of data to the fd
63 auto bufv = vector<char>(length);
64 auto buf = bufv.data();
65 memset(buf, 'a', length);
66 ssize_t rc = write(fd, buf, length);
71 size_t writeUntilFull(int fd) {
72 // Write to the fd until EAGAIN is returned
73 size_t bytesWritten = 0;
75 memset(buf, 'a', sizeof(buf));
77 ssize_t rc = write(fd, buf, sizeof(buf));
79 CHECK_EQ(errno, EAGAIN);
88 ssize_t readFromFD(int fd, size_t length) {
89 // read an arbitrary amount of data from the fd
90 auto buf = vector<char>(length);
91 return read(fd, buf.data(), length);
94 size_t readUntilEmpty(int fd) {
95 // Read from the fd until EAGAIN is returned
99 int rc = read(fd, buf, sizeof(buf));
101 CHECK(false) << "unexpected EOF";
103 CHECK_EQ(errno, EAGAIN);
112 void checkReadUntilEmpty(int fd, size_t expectedLength) {
113 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
116 struct ScheduledEvent {
122 void perform(int fd) {
123 if (events & EventHandler::READ) {
125 result = readUntilEmpty(fd);
127 result = readFromFD(fd, length);
130 if (events & EventHandler::WRITE) {
132 result = writeUntilFull(fd);
134 result = writeToFD(fd, length);
140 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
141 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
142 eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
147 class TestHandler : public EventHandler {
149 TestHandler(EventBase* eventBase, int fd)
150 : EventHandler(eventBase, fd), fd_(fd) {}
152 void handlerReady(uint16_t events) noexcept override {
153 ssize_t bytesRead = 0;
154 ssize_t bytesWritten = 0;
156 // Read all available data, so EventBase will stop calling us
157 // until new data becomes available
158 bytesRead = readUntilEmpty(fd_);
160 if (events & WRITE) {
161 // Write until the pipe buffer is full, so EventBase will stop calling
162 // us until the other end has read some data
163 bytesWritten = writeUntilFull(fd_);
166 log.emplace_back(events, bytesRead, bytesWritten);
170 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
173 , bytesRead(bytesRead)
174 , bytesWritten(bytesWritten) {}
179 ssize_t bytesWritten;
182 deque<EventRecord> log;
191 TEST(EventBaseTest, ReadEvent) {
195 // Register for read events
196 TestHandler handler(&eb, sp[0]);
197 handler.registerHandler(EventHandler::READ);
199 // Register timeouts to perform two write events
200 ScheduledEvent events[] = {
201 { 10, EventHandler::WRITE, 2345, 0 },
202 { 160, EventHandler::WRITE, 99, 0 },
205 scheduleEvents(&eb, sp[1], events);
212 // Since we didn't use the EventHandler::PERSIST flag, the handler should
213 // have received the first read, then unregistered itself. Check that only
214 // the first chunk of data was received.
215 ASSERT_EQ(handler.log.size(), 1);
216 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
217 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
218 milliseconds(events[0].milliseconds), milliseconds(90));
219 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
220 ASSERT_EQ(handler.log[0].bytesWritten, 0);
221 T_CHECK_TIMEOUT(start, end,
222 milliseconds(events[1].milliseconds), milliseconds(30));
224 // Make sure the second chunk of data is still waiting to be read.
225 size_t bytesRemaining = readUntilEmpty(sp[0]);
226 ASSERT_EQ(bytesRemaining, events[1].length);
230 * Test (READ | PERSIST)
232 TEST(EventBaseTest, ReadPersist) {
236 // Register for read events
237 TestHandler handler(&eb, sp[0]);
238 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
240 // Register several timeouts to perform writes
241 ScheduledEvent events[] = {
242 { 10, EventHandler::WRITE, 1024, 0 },
243 { 20, EventHandler::WRITE, 2211, 0 },
244 { 30, EventHandler::WRITE, 4096, 0 },
245 { 100, EventHandler::WRITE, 100, 0 },
248 scheduleEvents(&eb, sp[1], events);
250 // Schedule a timeout to unregister the handler after the third write
251 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
258 // The handler should have received the first 3 events,
259 // then been unregistered after that.
260 ASSERT_EQ(handler.log.size(), 3);
261 for (int n = 0; n < 3; ++n) {
262 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
263 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
264 milliseconds(events[n].milliseconds));
265 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
266 ASSERT_EQ(handler.log[n].bytesWritten, 0);
268 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
270 // Make sure the data from the last write is still waiting to be read
271 size_t bytesRemaining = readUntilEmpty(sp[0]);
272 ASSERT_EQ(bytesRemaining, events[3].length);
276 * Test registering for READ when the socket is immediately readable
278 TEST(EventBaseTest, ReadImmediate) {
282 // Write some data to the socket so the other end will
283 // be immediately readable
284 size_t dataLength = 1234;
285 writeToFD(sp[1], dataLength);
287 // Register for read events
288 TestHandler handler(&eb, sp[0]);
289 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
291 // Register a timeout to perform another write
292 ScheduledEvent events[] = {
293 { 10, EventHandler::WRITE, 2345, 0 },
296 scheduleEvents(&eb, sp[1], events);
298 // Schedule a timeout to unregister the handler
299 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
306 ASSERT_EQ(handler.log.size(), 2);
308 // There should have been 1 event for immediate readability
309 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
310 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
311 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
312 ASSERT_EQ(handler.log[0].bytesWritten, 0);
314 // There should be another event after the timeout wrote more data
315 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
316 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
317 milliseconds(events[0].milliseconds));
318 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
319 ASSERT_EQ(handler.log[1].bytesWritten, 0);
321 T_CHECK_TIMEOUT(start, end, milliseconds(20));
327 TEST(EventBaseTest, WriteEvent) {
331 // Fill up the write buffer before starting
332 size_t initialBytesWritten = writeUntilFull(sp[0]);
334 // Register for write events
335 TestHandler handler(&eb, sp[0]);
336 handler.registerHandler(EventHandler::WRITE);
338 // Register timeouts to perform two reads
339 ScheduledEvent events[] = {
340 { 10, EventHandler::READ, 0, 0 },
341 { 60, EventHandler::READ, 0, 0 },
344 scheduleEvents(&eb, sp[1], events);
351 // Since we didn't use the EventHandler::PERSIST flag, the handler should
352 // have only been able to write once, then unregistered itself.
353 ASSERT_EQ(handler.log.size(), 1);
354 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
355 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
356 milliseconds(events[0].milliseconds));
357 ASSERT_EQ(handler.log[0].bytesRead, 0);
358 ASSERT_GT(handler.log[0].bytesWritten, 0);
359 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
361 ASSERT_EQ(events[0].result, initialBytesWritten);
362 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
366 * Test (WRITE | PERSIST)
368 TEST(EventBaseTest, WritePersist) {
372 // Fill up the write buffer before starting
373 size_t initialBytesWritten = writeUntilFull(sp[0]);
375 // Register for write events
376 TestHandler handler(&eb, sp[0]);
377 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
379 // Register several timeouts to read from the socket at several intervals
380 ScheduledEvent events[] = {
381 { 10, EventHandler::READ, 0, 0 },
382 { 40, EventHandler::READ, 0, 0 },
383 { 70, EventHandler::READ, 0, 0 },
384 { 100, EventHandler::READ, 0, 0 },
387 scheduleEvents(&eb, sp[1], events);
389 // Schedule a timeout to unregister the handler after the third read
390 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
397 // The handler should have received the first 3 events,
398 // then been unregistered after that.
399 ASSERT_EQ(handler.log.size(), 3);
400 ASSERT_EQ(events[0].result, initialBytesWritten);
401 for (int n = 0; n < 3; ++n) {
402 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
403 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
404 milliseconds(events[n].milliseconds));
405 ASSERT_EQ(handler.log[n].bytesRead, 0);
406 ASSERT_GT(handler.log[n].bytesWritten, 0);
407 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
409 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
413 * Test registering for WRITE when the socket is immediately writable
415 TEST(EventBaseTest, WriteImmediate) {
419 // Register for write events
420 TestHandler handler(&eb, sp[0]);
421 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
423 // Register a timeout to perform a read
424 ScheduledEvent events[] = {
425 { 10, EventHandler::READ, 0, 0 },
428 scheduleEvents(&eb, sp[1], events);
430 // Schedule a timeout to unregister the handler
431 int64_t unregisterTimeout = 40;
432 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
440 ASSERT_EQ(handler.log.size(), 2);
442 // Since the socket buffer was initially empty,
443 // there should have been 1 event for immediate writability
444 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
445 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
446 ASSERT_EQ(handler.log[0].bytesRead, 0);
447 ASSERT_GT(handler.log[0].bytesWritten, 0);
449 // There should be another event after the timeout read the data
450 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
451 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
452 milliseconds(events[0].milliseconds));
453 ASSERT_EQ(handler.log[1].bytesRead, 0);
454 ASSERT_GT(handler.log[1].bytesWritten, 0);
456 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
460 * Test (READ | WRITE) when the socket becomes readable first
462 TEST(EventBaseTest, ReadWrite) {
466 // Fill up the write buffer before starting
467 size_t sock0WriteLength = writeUntilFull(sp[0]);
469 // Register for read and write events
470 TestHandler handler(&eb, sp[0]);
471 handler.registerHandler(EventHandler::READ_WRITE);
473 // Register timeouts to perform a write then a read.
474 ScheduledEvent events[] = {
475 { 10, EventHandler::WRITE, 2345, 0 },
476 { 40, EventHandler::READ, 0, 0 },
479 scheduleEvents(&eb, sp[1], events);
486 // Since we didn't use the EventHandler::PERSIST flag, the handler should
487 // have only noticed readability, then unregistered itself. Check that only
488 // one event was logged.
489 ASSERT_EQ(handler.log.size(), 1);
490 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
491 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
492 milliseconds(events[0].milliseconds));
493 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
494 ASSERT_EQ(handler.log[0].bytesWritten, 0);
495 ASSERT_EQ(events[1].result, sock0WriteLength);
496 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
500 * Test (READ | WRITE) when the socket becomes writable first
502 TEST(EventBaseTest, WriteRead) {
506 // Fill up the write buffer before starting
507 size_t sock0WriteLength = writeUntilFull(sp[0]);
509 // Register for read and write events
510 TestHandler handler(&eb, sp[0]);
511 handler.registerHandler(EventHandler::READ_WRITE);
513 // Register timeouts to perform a read then a write.
514 size_t sock1WriteLength = 2345;
515 ScheduledEvent events[] = {
516 { 10, EventHandler::READ, 0, 0 },
517 { 40, EventHandler::WRITE, sock1WriteLength, 0 },
520 scheduleEvents(&eb, sp[1], events);
527 // Since we didn't use the EventHandler::PERSIST flag, the handler should
528 // have only noticed writability, then unregistered itself. Check that only
529 // one event was logged.
530 ASSERT_EQ(handler.log.size(), 1);
531 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
532 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
533 milliseconds(events[0].milliseconds));
534 ASSERT_EQ(handler.log[0].bytesRead, 0);
535 ASSERT_GT(handler.log[0].bytesWritten, 0);
536 ASSERT_EQ(events[0].result, sock0WriteLength);
537 ASSERT_EQ(events[1].result, sock1WriteLength);
538 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
540 // Make sure the written data is still waiting to be read.
541 size_t bytesRemaining = readUntilEmpty(sp[0]);
542 ASSERT_EQ(bytesRemaining, events[1].length);
546 * Test (READ | WRITE) when the socket becomes readable and writable
549 TEST(EventBaseTest, ReadWriteSimultaneous) {
553 // Fill up the write buffer before starting
554 size_t sock0WriteLength = writeUntilFull(sp[0]);
556 // Register for read and write events
557 TestHandler handler(&eb, sp[0]);
558 handler.registerHandler(EventHandler::READ_WRITE);
560 // Register a timeout to perform a read and write together
561 ScheduledEvent events[] = {
562 { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
565 scheduleEvents(&eb, sp[1], events);
572 // It's not strictly required that the EventBase notify us about both
573 // events in the same call. So, it's possible that if the EventBase
574 // implementation changes this test could start failing, and it wouldn't be
575 // considered breaking the API. However, for now it's nice to exercise this code path.
577 ASSERT_EQ(handler.log.size(), 1);
578 ASSERT_EQ(handler.log[0].events,
579 EventHandler::READ | EventHandler::WRITE);
580 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
581 milliseconds(events[0].milliseconds));
582 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
583 ASSERT_GT(handler.log[0].bytesWritten, 0);
584 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
588 * Test (READ | WRITE | PERSIST)
590 TEST(EventBaseTest, ReadWritePersist) {
594 // Register for read and write events
595 TestHandler handler(&eb, sp[0]);
596 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
597 EventHandler::PERSIST);
599 // Register timeouts to perform several reads and writes
600 ScheduledEvent events[] = {
601 { 10, EventHandler::WRITE, 2345, 0 },
602 { 20, EventHandler::READ, 0, 0 },
603 { 35, EventHandler::WRITE, 200, 0 },
604 { 45, EventHandler::WRITE, 15, 0 },
605 { 55, EventHandler::READ, 0, 0 },
606 { 120, EventHandler::WRITE, 2345, 0 },
609 scheduleEvents(&eb, sp[1], events);
611 // Schedule a timeout to unregister the handler
612 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
619 ASSERT_EQ(handler.log.size(), 6);
621 // Since we didn't fill up the write buffer immediately, there should
622 // be an immediate event for writability.
623 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
624 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
625 ASSERT_EQ(handler.log[0].bytesRead, 0);
626 ASSERT_GT(handler.log[0].bytesWritten, 0);
628 // Events 1 through 5 should correspond to the scheduled events
629 for (int n = 1; n < 6; ++n) {
630 ScheduledEvent* event = &events[n - 1];
631 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
632 milliseconds(event->milliseconds));
633 if (event->events == EventHandler::READ) {
634 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
635 ASSERT_EQ(handler.log[n].bytesRead, 0);
636 ASSERT_GT(handler.log[n].bytesWritten, 0);
638 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
639 ASSERT_EQ(handler.log[n].bytesRead, event->length);
640 ASSERT_EQ(handler.log[n].bytesWritten, 0);
644 // The timeout should have unregistered the handler before the last write.
645 // Make sure that data is still waiting to be read
646 size_t bytesRemaining = readUntilEmpty(sp[0]);
647 ASSERT_EQ(bytesRemaining, events[5].length);
651 class PartialReadHandler : public TestHandler {
653 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
654 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
656 void handlerReady(uint16_t events) noexcept override {
657 assert(events == EventHandler::READ);
658 ssize_t bytesRead = readFromFD(fd_, readLength_);
659 log.emplace_back(events, bytesRead, 0);
668 * Test reading only part of the available data when a read event is fired.
669 * When PERSIST is used, make sure the handler gets notified again the next
670 * time around the loop.
672 TEST(EventBaseTest, ReadPartial) {
676 // Register for read events
677 size_t readLength = 100;
678 PartialReadHandler handler(&eb, sp[0], readLength);
679 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
681 // Register a timeout to perform a single write,
682 // with more data than PartialReadHandler will read at once
683 ScheduledEvent events[] = {
684 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
687 scheduleEvents(&eb, sp[1], events);
689 // Schedule a timeout to unregister the handler
690 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
697 ASSERT_EQ(handler.log.size(), 4);
699 // The first 3 invocations should read readLength bytes each
700 for (int n = 0; n < 3; ++n) {
701 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
702 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
703 milliseconds(events[0].milliseconds));
704 ASSERT_EQ(handler.log[n].bytesRead, readLength);
705 ASSERT_EQ(handler.log[n].bytesWritten, 0);
707 // The last read only has readLength/2 bytes
708 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
709 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
710 milliseconds(events[0].milliseconds));
711 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
712 ASSERT_EQ(handler.log[3].bytesWritten, 0);
716 class PartialWriteHandler : public TestHandler {
718 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
719 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
721 void handlerReady(uint16_t events) noexcept override {
722 assert(events == EventHandler::WRITE);
723 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
724 log.emplace_back(events, 0, bytesWritten);
733 * Test writing without completely filling up the write buffer when the fd
734 * becomes writable. When PERSIST is used, make sure the handler gets
735 * notified again the next time around the loop.
737 TEST(EventBaseTest, WritePartial) {
741 // Fill up the write buffer before starting
742 size_t initialBytesWritten = writeUntilFull(sp[0]);
744 // Register for write events
745 size_t writeLength = 100;
746 PartialWriteHandler handler(&eb, sp[0], writeLength);
747 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
749 // Register a timeout to read, so that more data can be written
750 ScheduledEvent events[] = {
751 { 10, EventHandler::READ, 0, 0 },
754 scheduleEvents(&eb, sp[1], events);
756 // Schedule a timeout to unregister the handler
757 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
764 // Depending on how big the socket buffer is, there will be multiple writes
765 // Only check the first 5
767 ASSERT_GE(handler.log.size(), numChecked);
768 ASSERT_EQ(events[0].result, initialBytesWritten);
770 // The first numChecked invocations should write writeLength bytes each
771 for (int n = 0; n < numChecked; ++n) {
772 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
773 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
774 milliseconds(events[0].milliseconds));
775 ASSERT_EQ(handler.log[n].bytesRead, 0);
776 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
782 * Test destroying a registered EventHandler
784 TEST(EventBaseTest, DestroyHandler) {
785 class DestroyHandler : public AsyncTimeout {
787 DestroyHandler(EventBase* eb, EventHandler* h)
791 void timeoutExpired() noexcept override { delete handler_; }
794 EventHandler* handler_;
800 // Fill up the write buffer before starting
801 size_t initialBytesWritten = writeUntilFull(sp[0]);
803 // Register for write events
804 TestHandler* handler = new TestHandler(&eb, sp[0]);
805 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
807 // After 10ms, read some data, so that the handler
808 // will be notified that it can write.
809 eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
812 // Start a timer to destroy the handler after 25ms
813 // This mainly just makes sure the code doesn't break or assert
814 DestroyHandler dh(&eb, handler);
815 dh.scheduleTimeout(25);
821 // Make sure the EventHandler was uninstalled properly when it was
822 // destroyed, and the EventBase loop exited
823 T_CHECK_TIMEOUT(start, end, milliseconds(25));
825 // Make sure that the handler wrote data to the socket
826 // before it was destroyed
827 size_t bytesRemaining = readUntilEmpty(sp[1]);
828 ASSERT_GT(bytesRemaining, 0);
832 ///////////////////////////////////////////////////////////////////////////
833 // Tests for timeout events
834 ///////////////////////////////////////////////////////////////////////////
836 TEST(EventBaseTest, RunAfterDelay) {
839 TimePoint timestamp1(false);
840 TimePoint timestamp2(false);
841 TimePoint timestamp3(false);
842 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
843 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
844 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
850 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
851 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
852 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
853 T_CHECK_TIMEOUT(start, end, milliseconds(40));
857 * Test the behavior of tryRunAfterDelay() when some timeouts are
858 * still scheduled when the EventBase is destroyed.
860 TEST(EventBaseTest, RunAfterDelayDestruction) {
861 TimePoint timestamp1(false);
862 TimePoint timestamp2(false);
863 TimePoint timestamp3(false);
864 TimePoint timestamp4(false);
865 TimePoint start(false);
866 TimePoint end(false);
871 // Run two normal timeouts
872 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
873 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
875 // Schedule a timeout to stop the event loop after 40ms
876 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
878 // Schedule 2 timeouts that would fire after the event loop stops
879 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
880 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
887 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
888 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
889 T_CHECK_TIMEOUT(start, end, milliseconds(40));
891 ASSERT_TRUE(timestamp3.isUnset());
892 ASSERT_TRUE(timestamp4.isUnset());
894 // Ideally this test should be run under valgrind to ensure that no memory is leaked.
898 class TestTimeout : public AsyncTimeout {
900 explicit TestTimeout(EventBase* eventBase)
901 : AsyncTimeout(eventBase)
902 , timestamp(false) {}
904 void timeoutExpired() noexcept override { timestamp.reset(); }
909 TEST(EventBaseTest, BasicTimeouts) {
915 t1.scheduleTimeout(10);
916 t2.scheduleTimeout(20);
917 t3.scheduleTimeout(40);
923 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
924 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
925 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
926 T_CHECK_TIMEOUT(start, end, milliseconds(40));
929 class ReschedulingTimeout : public AsyncTimeout {
931 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
933 , timeouts_(timeouts)
934 , iterator_(timeouts_.begin()) {}
940 void timeoutExpired() noexcept override {
941 timestamps.emplace_back();
946 if (iterator_ != timeouts_.end()) {
947 uint32_t timeout = *iterator_;
949 scheduleTimeout(timeout);
953 vector<TimePoint> timestamps;
956 vector<uint32_t> timeouts_;
957 vector<uint32_t>::const_iterator iterator_;
961 * Test rescheduling the same timeout multiple times
963 TEST(EventBaseTest, ReuseTimeout) {
966 vector<uint32_t> timeouts;
967 timeouts.push_back(10);
968 timeouts.push_back(30);
969 timeouts.push_back(15);
971 ReschedulingTimeout t(&eb, timeouts);
978 // Use a higher tolerance than usual. We're waiting on 3 timeouts
979 // consecutively. In general, each timeout may go over by a few
980 // milliseconds, and we're tripling this error by waiting on 3 timeouts.
981 milliseconds tolerance{6};
983 ASSERT_EQ(timeouts.size(), t.timestamps.size());
985 for (size_t n = 0; n < timeouts.size(); ++n) {
986 total += timeouts[n];
987 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
989 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
993 * Test rescheduling a timeout before it has fired
995 TEST(EventBaseTest, RescheduleTimeout) {
1000 TestTimeout t3(&eb);
1002 t1.scheduleTimeout(15);
1003 t2.scheduleTimeout(30);
1004 t3.scheduleTimeout(30);
1006 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1007 &AsyncTimeout::scheduleTimeout);
1009 // after 10ms, reschedule t2 to run sooner than originally scheduled
1010 eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1011 // after 10ms, reschedule t3 to run later than originally scheduled
1012 eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1018 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1019 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1020 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1021 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1025 * Test cancelling a timeout
1027 TEST(EventBaseTest, CancelTimeout) {
1030 vector<uint32_t> timeouts;
1031 timeouts.push_back(10);
1032 timeouts.push_back(30);
1033 timeouts.push_back(25);
1035 ReschedulingTimeout t(&eb, timeouts);
1037 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1043 ASSERT_EQ(t.timestamps.size(), 2);
1044 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1045 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1046 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1050 * Test destroying a scheduled timeout object
1052 TEST(EventBaseTest, DestroyTimeout) {
1053 class DestroyTimeout : public AsyncTimeout {
1055 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1059 void timeoutExpired() noexcept override { delete timeout_; }
1062 AsyncTimeout* timeout_;
1067 TestTimeout* t1 = new TestTimeout(&eb);
1068 t1->scheduleTimeout(30);
1070 DestroyTimeout dt(&eb, t1);
1071 dt.scheduleTimeout(10);
1077 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1081 ///////////////////////////////////////////////////////////////////////////
1082 // Tests for runInEventBaseThread()
1083 ///////////////////////////////////////////////////////////////////////////
1085 struct RunInThreadData {
1086 RunInThreadData(int numThreads, int opsPerThread)
1087 : opsPerThread(opsPerThread)
1088 , opsToGo(numThreads*opsPerThread) {}
1091 deque< pair<int, int> > values;
1097 struct RunInThreadArg {
1098 RunInThreadArg(RunInThreadData* data,
1105 RunInThreadData* data;
1110 void runInThreadTestFunc(RunInThreadArg* arg) {
1111 arg->data->values.emplace_back(arg->thread, arg->value);
1112 RunInThreadData* data = arg->data;
1115 if(--data->opsToGo == 0) {
1116 // Break out of the event base loop if we are the last thread running
1117 data->evb.terminateLoopSoon();
1121 TEST(EventBaseTest, RunInThread) {
1122 constexpr uint32_t numThreads = 50;
1123 constexpr uint32_t opsPerThread = 100;
1124 RunInThreadData data(numThreads, opsPerThread);
1126 deque<std::thread> threads;
1128 // Wait on all of the threads.
1129 for (auto& thread : threads) {
1134 for (uint32_t i = 0; i < numThreads; ++i) {
1135 threads.emplace_back([i, &data] {
1136 for (int n = 0; n < data.opsPerThread; ++n) {
1137 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1138 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1144 // Add a timeout event to run after 3 seconds.
1145 // Otherwise loop() will return immediately since there are no events to run.
1146 // Once the last thread exits, it will stop the loop(). However, this
1147 // timeout also stops the loop in case there is a bug performing the normal stop.
1149 data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1156 // Verify that the loop exited because all threads finished and requested it
1157 // to stop. This should happen much sooner than the 3 second timeout.
1158 // Assert that it happens in under a second. (This is still tons of extra padding.)
1161 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1162 end.getTime() - start.getTime());
1163 ASSERT_LT(timeTaken.count(), 1000);
1164 VLOG(11) << "Time taken: " << timeTaken.count();
1166 // Verify that we have all of the events from every thread
1167 int expectedValues[numThreads];
1168 for (uint32_t n = 0; n < numThreads; ++n) {
1169 expectedValues[n] = 0;
1171 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1172 it != data.values.end();
1174 int threadID = it->first;
1175 int value = it->second;
1176 ASSERT_EQ(expectedValues[threadID], value);
1177 ++expectedValues[threadID];
1179 for (uint32_t n = 0; n < numThreads; ++n) {
1180 ASSERT_EQ(expectedValues[n], opsPerThread);
1184 // This test simulates some calls, and verifies that the waiting happens by
1185 // triggering what otherwise would be race conditions, and trying to detect
1186 // whether any of the race conditions happened.
1187 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1188 const size_t c = 256;
1189 vector<unique_ptr<atomic<size_t>>> atoms(c);
1190 for (size_t i = 0; i < c; ++i) {
1191 auto& atom = atoms.at(i);
1192 atom = make_unique<atomic<size_t>>(0);
1194 vector<thread> threads;
1195 for (size_t i = 0; i < c; ++i) {
1196 threads.emplace_back([&atoms, i] {
1198 auto& atom = *atoms.at(i);
1199 auto ebth = thread([&] { eb.loopForever(); });
1200 eb.waitUntilRunning();
1201 eb.runInEventBaseThreadAndWait([&] {
1203 atom.compare_exchange_weak(
1204 x, 1, std::memory_order_release, std::memory_order_relaxed);
1207 atom.compare_exchange_weak(
1208 x, 2, std::memory_order_release, std::memory_order_relaxed);
1209 eb.terminateLoopSoon();
1213 for (size_t i = 0; i < c; ++i) {
1214 auto& th = threads.at(i);
1218 for (auto& atom : atoms) sum += *atom;
1222 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1224 thread th(&EventBase::loopForever, &eb);
1226 eb.terminateLoopSoon();
1229 auto mutated = false;
1230 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1233 EXPECT_TRUE(mutated);
// Same-thread case: runImmediatelyOrRunInEventBaseThreadAndWait() is issued
// from a callback that was itself handed to eb.runInEventBaseThreadAndWait(),
// while thread `th` runs EventBase::loopForever() on `eb`.
// NOTE(review): the declaration of `eb` and the inner callback body are not
// visible here; presumably the inner callback sets `mutated` - confirm.
1236 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1238 thread th(&EventBase::loopForever, &eb);
1240 eb.terminateLoopSoon();
1243 eb.runInEventBaseThreadAndWait([&] {
// `mutated` is local to the outer callback, so the expectation below is
// evaluated inside that callback as well.
1244 auto mutated = false;
1245 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1248 EXPECT_TRUE(mutated);
// Not-looping case: no loop thread is started in the visible lines; the
// callback is handed to runImmediatelyOrRunInEventBaseThreadAndWait() and
// `mutated` is expected to be true afterwards.
// NOTE(review): the declaration of `eb` and the callback body are not visible
// here; presumably the callback sets `mutated` - confirm.
1252 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1254 auto mutated = false;
1255 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1258 EXPECT_TRUE(mutated);
1261 ///////////////////////////////////////////////////////////////////////////
1262 // Tests for runInLoop()
1263 ///////////////////////////////////////////////////////////////////////////
// Loop callback used by the runInLoop() tests. It holds the owning EventBase,
// an unsigned counter (count_) exposed through getCount(), and an optional
// action. The tests below construct it with an initial count and expect
// getCount() to read 0 once the loop has finished with it.
1265 class CountedLoopCallback : public EventBase::LoopCallback {
// `action` defaults to an empty std::function, i.e. "no action".
1267 CountedLoopCallback(EventBase* eventBase,
1269 std::function<void()> action =
1270 std::function<void()>())
1271 : eventBase_(eventBase)
1273 , action_(action) {}
// Two visible outcomes per invocation: re-install this callback on the event
// base, or take the action_ branch when an action was supplied.
// NOTE(review): the condition guarding the re-install and the body of the
// action_ branch are not visible here; presumably the counter is decremented
// and the action invoked once it reaches zero - confirm.
1275 void runLoopCallback() noexcept override {
1278 eventBase_->runInLoop(this);
1279 } else if (action_) {
1284 unsigned int getCount() const {
1289 EventBase* eventBase_;
1290 unsigned int count_;
1291 std::function<void()> action_;
1294 // Test that EventBase::loop() doesn't exit while there are
1295 // still LoopCallbacks remaining to be invoked.
1296 TEST(EventBaseTest, RepeatedRunInLoop) {
1297 EventBase eventBase;
// A callback with an initial count of 10.
1299 CountedLoopCallback c(&eventBase, 10);
1300 eventBase.runInLoop(&c);
1301 // The callback shouldn't have run immediately
1302 ASSERT_EQ(c.getCount(), 10);
// NOTE(review): the statement that actually runs the loop is not visible in
// this view; it sits between the two assertions.
1305 // loop() should loop until the CountedLoopCallback stops
1306 // re-installing itself.
1307 ASSERT_EQ(c.getCount(), 0);
1310 // Test that EventBase::loop() works as expected without time measurements.
1311 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
// The `false` constructor argument is what distinguishes this test from
// RepeatedRunInLoop; presumably it disables time measurement - confirm
// against the EventBase constructor.
1312 EventBase eventBase(false);
// A callback with an initial count of 10.
1314 CountedLoopCallback c(&eventBase, 10);
1315 eventBase.runInLoop(&c);
1316 // The callback shouldn't have run immediately
1317 ASSERT_EQ(c.getCount(), 10);
// NOTE(review): the statement that actually runs the loop is not visible in
// this view; it sits between the two assertions.
1320 // loop() should loop until the CountedLoopCallback stops
1321 // re-installing itself.
1322 ASSERT_EQ(c.getCount(), 0);
1325 // Test runInLoop() calls with terminateLoopSoon()
1326 TEST(EventBaseTest, RunInLoopStopLoop) {
1327 EventBase eventBase;
// c1 starts at 20 with no action; c2 starts at 10 and carries an action that
// calls terminateLoopSoon() on the same event base.
1329 CountedLoopCallback c1(&eventBase, 20);
1330 CountedLoopCallback c2(&eventBase, 10,
1331 std::bind(&EventBase::terminateLoopSoon, &eventBase));
// c1 is installed before c2.
1333 eventBase.runInLoop(&c1);
1334 eventBase.runInLoop(&c2);
// Nothing has run yet: both counters still hold their initial values.
1335 ASSERT_EQ(c1.getCount(), 20);
1336 ASSERT_EQ(c2.getCount(), 10);
// loopForever() only returns here because c2's action asks the loop to
// terminate.
1338 eventBase.loopForever();
1340 // c2 should have stopped the loop after 10 iterations
1341 ASSERT_EQ(c2.getCount(), 0);
1343 // We allow the EventBase to run the loop callbacks in whatever order it
1344 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1345 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1348 // (With the current code, c1 will always run 10 times, but we don't consider
1349 // this a hard API requirement.)
// Accept any remaining count for c1 in the closed range [10, 11].
1350 ASSERT_GE(c1.getCount(), 10);
1351 ASSERT_LE(c1.getCount(), 11);
// Runs a loop that is terminated by its only callback, then hands another
// function to runInEventBaseThread() after loopForever() has returned.
// NOTE(review): the body of that function and the assertions that follow are
// not visible in this view.
1354 TEST(EventBaseTest, TryRunningAfterTerminate) {
1355 EventBase eventBase;
// c1 starts at 1 and carries an action that calls terminateLoopSoon().
1356 CountedLoopCallback c1(&eventBase, 1,
1357 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1358 eventBase.runInLoop(&c1);
1359 eventBase.loopForever();
// Scheduled after the loop above has already returned.
1361 eventBase.runInEventBaseThread([&]() {
1368 // Test cancelling runInLoop() callbacks
1369 TEST(EventBaseTest, CancelRunInLoop) {
1370 EventBase eventBase;
// Three plain callbacks, each starting at 20.
1372 CountedLoopCallback c1(&eventBase, 20);
1373 CountedLoopCallback c2(&eventBase, 20);
1374 CountedLoopCallback c3(&eventBase, 20);
// Actions that cancel c1 and c2 respectively via
// LoopCallback::cancelLoopCallback().
1376 std::function<void()> cancelC1Action =
1377 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1378 std::function<void()> cancelC2Action =
1379 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
// The cancelling callbacks start at 10, so they fire their action while the
// plain callbacks (starting at 20) are still re-installing themselves.
1381 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1382 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1384 // Install cancelC1 after c1
1385 eventBase.runInLoop(&c1);
1386 eventBase.runInLoop(&cancelC1);
1388 // Install cancelC2 before c2
1389 eventBase.runInLoop(&cancelC2);
1390 eventBase.runInLoop(&c2);
// c3 is never cancelled; it serves as the control.
1393 eventBase.runInLoop(&c3);
// Nothing has run yet: every counter still holds its initial value.
1395 ASSERT_EQ(c1.getCount(), 20);
1396 ASSERT_EQ(c2.getCount(), 20);
1397 ASSERT_EQ(c3.getCount(), 20);
1398 ASSERT_EQ(cancelC1.getCount(), 10);
1399 ASSERT_EQ(cancelC2.getCount(), 10);
// NOTE(review): the statement that actually runs the loop is not visible in
// this view; it sits between the two groups of assertions.
1404 // cancelC1 and cancelC2 should have both fired after 10 iterations and
1405 // stopped re-installing themselves
1406 ASSERT_EQ(cancelC1.getCount(), 0);
1407 ASSERT_EQ(cancelC2.getCount(), 0);
1408 // c3 should have continued on for the full 20 iterations
1409 ASSERT_EQ(c3.getCount(), 0);
1411 // c1 and c2 should have both been cancelled on the 10th iteration.
1413 // Callbacks are always run in the order they are installed,
1414 // so c1 should have fired 10 times, and been canceled after it ran on the
1415 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1416 // have run before it on the 10th iteration, and cancelled it before it
// Remaining counts: 20 - 10 = 10 for c1 and 20 - 9 = 11 for c2.
1418 ASSERT_EQ(c1.getCount(), 10);
1419 ASSERT_EQ(c2.getCount(), 11);
// Callback that is both a loop callback and an fd event handler. It counts
// how often each of its two entry points is invoked and compares each counter
// against a configurable maximum. The visible code shows handlerReady()
// scheduling the object as a loop callback and runLoopCallback() registering
// it for READ events, so the two entry points can feed each other.
1422 class TerminateTestCallback : public EventBase::LoopCallback,
1423 public EventHandler {
// All counters and limits start at zero; call reset() before use.
1425 TerminateTestCallback(EventBase* eventBase, int fd)
1426 : EventHandler(eventBase, fd),
1427 eventBase_(eventBase),
1428 loopInvocations_(0),
1429 maxLoopInvocations_(0),
1430 eventInvocations_(0),
1431 maxEventInvocations_(0) {}
// Zeroes both counters, installs new limits, and drops any pending loop
// callback or fd registration left over from a previous run.
1433 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1434 loopInvocations_ = 0;
1435 maxLoopInvocations_ = maxLoopInvocations;
1436 eventInvocations_ = 0;
1437 maxEventInvocations_ = maxEventInvocations;
1439 cancelLoopCallback();
1440 unregisterHandler();
// fd-event entry point: counts the event and compares against the limit.
// NOTE(review): the body of the limit branch is not visible here; presumably
// it returns without rescheduling - confirm.
1443 void handlerReady(uint16_t /* events */) noexcept override {
1444 // We didn't register with PERSIST, so we will have been automatically
1445 // unregistered already.
1446 ASSERT_FALSE(isHandlerRegistered());
1448 ++eventInvocations_;
1449 if (eventInvocations_ >= maxEventInvocations_) {
1453 eventBase_->runInLoop(this);
// Loop-callback entry point: compares against the loop limit and registers
// for READ events.
// NOTE(review): the increment of loopInvocations_ and the body of the limit
// branch are not visible here.
1455 void runLoopCallback() noexcept override {
1457 if (loopInvocations_ >= maxLoopInvocations_) {
1461 registerHandler(READ);
// Number of loop-callback invocations since the last reset().
1464 uint32_t getLoopInvocations() const {
1465 return loopInvocations_;
// Number of fd-event invocations since the last reset().
1467 uint32_t getEventInvocations() const {
1468 return eventInvocations_;
1472 EventBase* eventBase_;
1473 uint32_t loopInvocations_;
1474 uint32_t maxLoopInvocations_;
1475 uint32_t eventInvocations_;
1476 uint32_t maxEventInvocations_;
1480 * Test that EventBase::loop() correctly detects when there are no more events
1483 * This uses a single callback, which alternates registering itself as a loop
1484 * callback versus an EventHandler callback. This exercises a regression where
1485 * EventBase::loop() incorrectly exited if there were no more fd handlers
1486 * registered, but a loop callback installed a new fd handler.
1488 TEST(EventBaseTest, LoopTermination) {
1489 EventBase eventBase;
1491 // Open a pipe and close the write end,
1492 // so the read endpoint will be readable
1494 int rc = pipe(pipeFds);
// The callback watches the read end of the pipe.
1497 TerminateTestCallback callback(&eventBase, pipeFds[0]);
// NOTE(review): the statements that run the loop after each runInLoop() call
// are not visible in this view.
1499 // Test once where the callback will exit after a loop callback
// Limits: 10 loop invocations, 100 fd events - the loop limit is hit first.
1500 callback.reset(10, 100);
1501 eventBase.runInLoop(&callback);
// 10 loop invocations with 9 fd events in between.
1503 ASSERT_EQ(callback.getLoopInvocations(), 10);
1504 ASSERT_EQ(callback.getEventInvocations(), 9);
1506 // Test once where the callback will exit after an fd event callback
// Limits: 100 loop invocations, 7 fd events - the event limit is hit first.
1507 callback.reset(100, 7);
1508 eventBase.runInLoop(&callback);
// Both counters end at 7.
1510 ASSERT_EQ(callback.getLoopInvocations(), 7);
1511 ASSERT_EQ(callback.getEventInvocations(), 7);
1516 ///////////////////////////////////////////////////////////////////////////
1517 // Tests for latency calculations
1518 ///////////////////////////////////////////////////////////////////////////
// AsyncTimeout subclass driven by a caller-owned deque of values. The deque is
// held by reference (timeout_), so it must outlive this object. Each expiry
// that finds the deque non-empty consumes the front entry.
// NOTE(review): the constructor body, the use made of `sleepTime`, the
// rescheduling logic and the counter behind getTimeouts() are not visible in
// this view.
1520 class IdleTimeTimeoutSeries : public AsyncTimeout {
1524 explicit IdleTimeTimeoutSeries(EventBase *base,
1525 std::deque<std::uint64_t>& timeout) :
1532 ~IdleTimeTimeoutSeries() override {}
// Expiry hook: handles the empty-deque case first, otherwise pops the front
// entry into `sleepTime`.
1534 void timeoutExpired() noexcept override {
1537 if(timeout_.empty()){
1540 uint64_t sleepTime = timeout_.front();
1541 timeout_.pop_front();
// Accessor used by the IdleTime test to check how many timeouts have been
// processed (per its name and the assertions there; body not visible).
1549 int getTimeouts() const {
1555 std::deque<uint64_t>& timeout_;
1559 * Verify that idle time is correctly accounted for when decaying our loop
1562 * This works by creating a high loop time (via usleep), expecting a latency
1563 * callback with known value, and then scheduling a timeout for later. This
1564 * later timeout is far enough in the future that the idle time should have
1565 * caused the loop time to decay.
1567 TEST(EventBaseTest, IdleTime) {
1568 EventBase eventBase;
// Seed the load-average tracking: 1000ms window, starting value 5900.
1569 eventBase.setLoadAvgMsec(1000ms);
1570 eventBase.resetLoadAvg(5900.0);
// timeouts0 ends up as {8000, 8080, 8080, 8080, 8080, 14000}: six entries.
1571 std::deque<uint64_t> timeouts0(4, 8080);
1572 timeouts0.push_front(8000);
1573 timeouts0.push_back(14000);
1574 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
// Second series: 20 entries of 20. It is only created (as `tos`) from inside
// the latency callback below.
1575 std::deque<uint64_t> timeouts(20, 20);
1576 std::unique_ptr<IdleTimeTimeoutSeries> tos;
// Start timestamp in microseconds on the steady clock.
1577 int64_t testStart = duration_cast<microseconds>(
1578 std::chrono::steady_clock::now().time_since_epoch()).count();
1579 bool hostOverloaded = false;
1581 int latencyCallbacks = 0;
// Latency callback with a 6000us threshold. It is expected to fire exactly
// once; any other count fails the test.
// NOTE(review): the increment of latencyCallbacks is not visible here.
1582 eventBase.setMaxLatency(6000us, [&]() {
1584 if (latencyCallbacks != 1) {
1585 FAIL() << "Unexpected latency callback";
1588 if (tos0.getTimeouts() < 6) {
1589 // This could only happen if the host this test is running
1590 // on is heavily loaded.
1591 int64_t maxLatencyReached = duration_cast<microseconds>(
1592 std::chrono::steady_clock::now().time_since_epoch()).count();
// At least 43800us must have elapsed since testStart in this case.
1593 ASSERT_LE(43800, maxLatencyReached - testStart);
1594 hostOverloaded = true;
// Normal case: all six tos0 entries consumed and the average loop time is
// within 1200 of 6100.
1597 ASSERT_EQ(6, tos0.getTimeouts());
1598 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1599 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1600 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1603 // Kick things off with an "immediate" timeout
1604 tos0.scheduleTimeout(1);
// NOTE(review): the statement that runs the loop and the body of the
// hostOverloaded branch are not visible here; presumably the test returns
// early on an overloaded host - confirm.
1608 if (hostOverloaded) {
// Final state: one latency callback, seven tos0 timeouts, average loop time
// back within 1200 of 5900, and 21 timeouts on the second series.
1612 ASSERT_EQ(1, latencyCallbacks);
1613 ASSERT_EQ(7, tos0.getTimeouts());
1614 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1615 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1617 ASSERT_EQ(21, tos->getTimeouts());
1621 * Test that thisLoop functionality works with terminateLoopSoon
1623 TEST(EventBaseTest, ThisLoop) {
// Two flags, one per scheduling flavour; the expectations at the end are that
// `runInLoop` stays false while `runThisLoop` becomes true.
1625 bool runInLoop = false;
1626 bool runThisLoop = false;
// NOTE(review): the declaration of `eb`, the bodies and trailing arguments of
// the two runInLoop() calls, and the statement that runs the loop are not
// visible in this view.
1629 eb.terminateLoopSoon();
1630 eb.runInLoop([&]() {
1633 eb.runInLoop([&]() {
1640 ASSERT_FALSE(runInLoop);
1641 // Should work with thisLoop
1642 ASSERT_TRUE(runThisLoop);
// Hands a function to runInEventBaseThread() and expects `ran` to be true
// afterwards.
// NOTE(review): the declarations of `base` and `ran`, the function body, and
// the statement that runs the loop are not visible in this view.
1645 TEST(EventBaseTest, EventBaseThreadLoop) {
1649 base.runInEventBaseThread([&](){
1654 ASSERT_EQ(true, ran);
// Sets the event base name to "foo" and, where the glibc check below passes,
// reads the calling thread's name back and compares it with "foo".
// NOTE(review): the declarations of `base` and `name` and the statement that
// runs the loop are not visible in this view.
1657 TEST(EventBaseTest, EventBaseThreadName) {
1659 base.setName("foo");
// NOTE(review): this check tests major and minor independently, so a glibc
// with major > 2 and minor < 12 would be excluded - confirm whether intended.
1662 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
// Reads the current thread's name into `name`, passing 16 as the buffer
// length.
1664 pthread_getname_np(pthread_self(), name, 16);
1665 ASSERT_EQ(0, strcmp("foo", name));
// Installs a CountedLoopCallback (initial count 1, action: terminate the loop)
// through runBeforeLoop() and expects its count to be 0 afterwards.
// NOTE(review): the declaration of `base` and the statement that runs the
// loop are not visible in this view.
1669 TEST(EventBaseTest, RunBeforeLoop) {
1671 CountedLoopCallback cb(&base, 1, [&](){
1672 base.terminateLoopSoon();
1674 base.runBeforeLoop(&cb);
1676 ASSERT_EQ(cb.getCount(), 0);
// Variant of RunBeforeLoop in which the callback has no action; termination is
// requested from a function scheduled with tryRunAfterDelay() instead.
// NOTE(review): the declaration of `base`, the delay argument, and the
// statement that runs the loop are not visible in this view.
1679 TEST(EventBaseTest, RunBeforeLoopWait) {
1681 CountedLoopCallback cb(&base, 1);
1682 base.tryRunAfterDelay([&](){
1683 base.terminateLoopSoon();
1685 base.runBeforeLoop(&cb);
1688 // Check that we only ran once, and did not loop multiple times.
1689 ASSERT_EQ(cb.getCount(), 0);
// EventHandler whose readiness hook aborts the process: any test using it
// must never let the handler actually fire.
1692 class PipeHandler : public EventHandler {
1694 PipeHandler(EventBase* eventBase, int fd)
1695 : EventHandler(eventBase, fd) {}
// Firing is treated as a fatal test failure.
1697 void handlerReady(uint16_t /* events */) noexcept override { abort(); }
// Calls terminateLoopSoon() before the loop has started, then runs loop() on
// a separate thread with a registered (aborting) PipeHandler.
// NOTE(review): the declarations of `evb` and `p`, the join of `t`, and any
// cleanup of the pipe fds are not visible in this view.
1700 TEST(EventBaseTest, StopBeforeLoop) {
1703 // Give the evb something to do.
1705 ASSERT_EQ(0, pipe(p));
// Watches the read end of the pipe; nothing visible here writes to the pipe.
1706 PipeHandler handler(&evb, p[0]);
1707 handler.registerHandler(EventHandler::READ);
1709 // It's definitely not running yet
1710 evb.terminateLoopSoon();
1712 // let it run, it should exit quickly.
1713 std::thread t([&] { evb.loop(); });
1716 handler.unregisterHandler();
// Hands a function to base.runInEventBaseThread().
// NOTE(review): only these two lines of the test are visible in this view;
// per the test name the function is presumably expected to run when the
// event base is destroyed - confirm against the full body.
1723 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1728 base.runInEventBaseThread([&](){
// A helper thread holds a keep-alive token obtained from `evb`, sleeps for
// 100ms, then schedules a function on the event base that sets `done`, moving
// the token into that function.
// NOTE(review): the declarations of `evb` and `done`, the statement that runs
// the loop, the final assertion and the join of `t` are not visible here.
1736 TEST(EventBaseTest, LoopKeepAlive) {
// The token is captured by value at thread creation (init-capture).
1740 std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1741 /* sleep override */ std::this_thread::sleep_for(
1742 std::chrono::milliseconds(100));
1743 evb.runInEventBaseThread(
1744 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Same scenario as LoopKeepAlive, except that the helper thread (and therefore
// the keep-alive token) is created from inside a function scheduled with
// runInEventBaseThread().
// NOTE(review): the declarations of `evb`, `done` and `t`, the statement that
// runs the loop, the final assertion and the join are not visible here.
1754 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1760 evb.runInEventBaseThread([&] {
// The token is captured by value when the thread is created.
1761 t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1762 /* sleep override */ std::this_thread::sleep_for(
1763 std::chrono::milliseconds(100));
1764 evb.runInEventBaseThread(
1765 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Checks that `done` stays false after terminateLoopSoon() for as long as a
// keep-alive token is held.
// NOTE(review): the declaration of `done`, the body of `evThread` and the
// final join/assertion are not visible in this view.
1776 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1777 std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1781 std::thread evThread([&] {
1788 auto* ev = evb.get();
// Obtain the token from within the event base thread and wait for that to
// complete.
1789 Executor::KeepAlive keepAlive;
1790 ev->runInEventBaseThreadAndWait(
1791 [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
1792 ASSERT_FALSE(done) << "Loop finished before we asked it to";
1793 ev->terminateLoopSoon();
// Give the loop 30ms to (incorrectly) exit while the token is still held.
1794 /* sleep override */
1795 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1796 ASSERT_FALSE(done) << "Loop terminated early";
// Move the token into an empty function scheduled on the event base, so this
// scope no longer owns it.
1797 ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
// Keep-alive scenario with a heap-allocated EventBase: a token is captured,
// and after a 100ms sleep a function that sets `done` is scheduled through
// `evbPtr`, taking the token with it.
// NOTE(review): the declarations of `done` and `evbPtr`, the thread creation,
// the destruction of `evb` and the final assertion are not visible here.
1804 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1805 auto evb = folly::make_unique<EventBase>();
1811 loopKeepAlive = evb->getKeepAliveToken(),
1814 /* sleep override */ std::this_thread::sleep_for(
1815 std::chrono::milliseconds(100));
1816 evbPtr->runInEventBaseThread(
1817 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Stress variant: kNumThreads threads each take kNumTasks keep-alive tokens
// from the same event base, and each token is later moved into a scheduled
// function that increments `done`.
// NOTE(review): the declaration of `done`, the baton post/wait calls, the
// destruction of `evb` and the thread joins are not visible in this view.
1827 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1828 auto evb = folly::make_unique<EventBase>();
1830 static constexpr size_t kNumThreads = 100;
1831 static constexpr size_t kNumTasks = 100;
1833 std::vector<std::thread> ts;
1834 std::vector<std::unique_ptr<Baton<>>> batons;
// One Baton per worker thread.
1837 for (size_t i = 0; i < kNumThreads; ++i) {
1838 batons.emplace_back(folly::make_unique<Baton<>>());
1841 for (size_t i = 0; i < kNumThreads; ++i) {
// Each worker gets raw pointers to the event base and to its own baton.
1842 ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1843 std::vector<Executor::KeepAlive> keepAlives;
1844 for (size_t j = 0; j < kNumTasks; ++j) {
1845 keepAlives.emplace_back(evbPtr->getKeepAliveToken());
1850 /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
// One scheduled function per token; the token travels with the function.
1852 for (auto& keepAlive : keepAlives) {
1853 evbPtr->runInEventBaseThread(
1854 [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1859 for (auto& baton : batons) {
// Every one of the kNumThreads * kNumTasks functions must have run.
1865 EXPECT_EQ(kNumThreads * kNumTasks, done);
1867 for (auto& t : ts) {
// Exercises EventBase as a drivable executor together with folly futures:
// a promise is fulfilled from a function scheduled on `base`, and a second
// promise from a delayed function.
// NOTE(review): the declaration of `base`, the thread that performs the
// 10-microsecond sleep, the assignment to `finished` and the wait on `f2`
// are not visible in this view.
1872 TEST(EventBaseTest, DrivableExecutorTest) {
1873 folly::Promise<bool> p;
1874 auto f = p.getFuture();
1876 bool finished = false;
1879 /* sleep override */
1880 std::this_thread::sleep_for(std::chrono::microseconds(10));
// Fulfil the first promise from a function scheduled on `base`.
1882 base.runInEventBaseThread([&]() { p.setValue(true); });
1885 // Ensure drive does not busy wait
1886 base.drive(); // TODO: fix notification queue init() extra wakeup
1888 EXPECT_TRUE(finished);
1890 folly::Promise<bool> p2;
1891 auto f2 = p2.getFuture();
1892 // Ensure waitVia gets woken up properly, even from
1893 // a separate thread.
// Fulfil the second promise after a delay argument of 10.
1894 base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1896 EXPECT_TRUE(f2.isReady());
1901 TEST(EventBaseTest, RequestContextTest) {
1903 auto defaultCtx = RequestContext::get();
1906 RequestContextScopeGuard rctx;
1907 auto context = RequestContext::get();
1908 EXPECT_NE(defaultCtx, context);
1909 evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1912 EXPECT_EQ(defaultCtx, RequestContext::get());
1914 EXPECT_EQ(defaultCtx, RequestContext::get());