2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/io/async/EventHandler.h>
22 #include <folly/io/async/test/SocketPair.h>
23 #include <folly/io/async/test/Util.h>
39 using std::chrono::milliseconds;
40 using std::chrono::microseconds;
41 using std::chrono::duration_cast;
43 using namespace folly;
45 ///////////////////////////////////////////////////////////////////////////
46 // Tests for read and write events
47 ///////////////////////////////////////////////////////////////////////////
49 enum { BUF_SIZE = 4096 };
51 ssize_t writeToFD(int fd, size_t length) {
52 // write an arbitrary amount of data to the fd
54 memset(buf, 'a', sizeof(buf));
55 ssize_t rc = write(fd, buf, sizeof(buf));
60 size_t writeUntilFull(int fd) {
61 // Write to the fd until EAGAIN is returned
62 size_t bytesWritten = 0;
64 memset(buf, 'a', sizeof(buf));
66 ssize_t rc = write(fd, buf, sizeof(buf));
68 CHECK_EQ(errno, EAGAIN);
77 ssize_t readFromFD(int fd, size_t length) {
78 // write an arbitrary amount of data to the fd
80 return read(fd, buf, sizeof(buf));
83 size_t readUntilEmpty(int fd) {
84 // Read from the fd until EAGAIN is returned
88 int rc = read(fd, buf, sizeof(buf));
90 CHECK(false) << "unexpected EOF";
92 CHECK_EQ(errno, EAGAIN);
101 void checkReadUntilEmpty(int fd, size_t expectedLength) {
102 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
105 struct ScheduledEvent {
111 void perform(int fd) {
112 if (events & EventHandler::READ) {
114 result = readUntilEmpty(fd);
116 result = readFromFD(fd, length);
119 if (events & EventHandler::WRITE) {
121 result = writeUntilFull(fd);
123 result = writeToFD(fd, length);
129 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
130 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
131 eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
136 class TestHandler : public EventHandler {
138 TestHandler(EventBase* eventBase, int fd)
139 : EventHandler(eventBase, fd), fd_(fd) {}
141 virtual void handlerReady(uint16_t events) noexcept {
142 ssize_t bytesRead = 0;
143 ssize_t bytesWritten = 0;
145 // Read all available data, so EventBase will stop calling us
146 // until new data becomes available
147 bytesRead = readUntilEmpty(fd_);
149 if (events & WRITE) {
150 // Write until the pipe buffer is full, so EventBase will stop calling
151 // us until the other end has read some data
152 bytesWritten = writeUntilFull(fd_);
155 log.push_back(EventRecord(events, bytesRead, bytesWritten));
159 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
162 , bytesRead(bytesRead)
163 , bytesWritten(bytesWritten) {}
168 ssize_t bytesWritten;
171 deque<EventRecord> log;
180 TEST(EventBaseTest, ReadEvent) {
184 // Register for read events
185 TestHandler handler(&eb, sp[0]);
186 handler.registerHandler(EventHandler::READ);
188 // Register timeouts to perform two write events
189 ScheduledEvent events[] = {
190 { 10, EventHandler::WRITE, 2345 },
191 { 160, EventHandler::WRITE, 99 },
194 scheduleEvents(&eb, sp[1], events);
201 // Since we didn't use the EventHandler::PERSIST flag, the handler should
202 // have received the first read, then unregistered itself. Check that only
203 // the first chunk of data was received.
204 ASSERT_EQ(handler.log.size(), 1);
205 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
206 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
207 milliseconds(events[0].milliseconds), milliseconds(90));
208 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
209 ASSERT_EQ(handler.log[0].bytesWritten, 0);
210 T_CHECK_TIMEOUT(start, end,
211 milliseconds(events[1].milliseconds), milliseconds(30));
213 // Make sure the second chunk of data is still waiting to be read.
214 size_t bytesRemaining = readUntilEmpty(sp[0]);
215 ASSERT_EQ(bytesRemaining, events[1].length);
219 * Test (READ | PERSIST)
221 TEST(EventBaseTest, ReadPersist) {
225 // Register for read events
226 TestHandler handler(&eb, sp[0]);
227 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
229 // Register several timeouts to perform writes
230 ScheduledEvent events[] = {
231 { 10, EventHandler::WRITE, 1024 },
232 { 20, EventHandler::WRITE, 2211 },
233 { 30, EventHandler::WRITE, 4096 },
234 { 100, EventHandler::WRITE, 100 },
237 scheduleEvents(&eb, sp[1], events);
239 // Schedule a timeout to unregister the handler after the third write
240 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
247 // The handler should have received the first 3 events,
248 // then been unregistered after that.
249 ASSERT_EQ(handler.log.size(), 3);
250 for (int n = 0; n < 3; ++n) {
251 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
252 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
253 milliseconds(events[n].milliseconds));
254 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
255 ASSERT_EQ(handler.log[n].bytesWritten, 0);
257 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
259 // Make sure the data from the last write is still waiting to be read
260 size_t bytesRemaining = readUntilEmpty(sp[0]);
261 ASSERT_EQ(bytesRemaining, events[3].length);
265 * Test registering for READ when the socket is immediately readable
267 TEST(EventBaseTest, ReadImmediate) {
271 // Write some data to the socket so the other end will
272 // be immediately readable
273 size_t dataLength = 1234;
274 writeToFD(sp[1], dataLength);
276 // Register for read events
277 TestHandler handler(&eb, sp[0]);
278 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
280 // Register a timeout to perform another write
281 ScheduledEvent events[] = {
282 { 10, EventHandler::WRITE, 2345 },
285 scheduleEvents(&eb, sp[1], events);
287 // Schedule a timeout to unregister the handler
288 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
295 ASSERT_EQ(handler.log.size(), 2);
297 // There should have been 1 event for immediate readability
298 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
299 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
300 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
301 ASSERT_EQ(handler.log[0].bytesWritten, 0);
303 // There should be another event after the timeout wrote more data
304 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
305 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
306 milliseconds(events[0].milliseconds));
307 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
308 ASSERT_EQ(handler.log[1].bytesWritten, 0);
310 T_CHECK_TIMEOUT(start, end, milliseconds(20));
316 TEST(EventBaseTest, WriteEvent) {
320 // Fill up the write buffer before starting
321 size_t initialBytesWritten = writeUntilFull(sp[0]);
323 // Register for write events
324 TestHandler handler(&eb, sp[0]);
325 handler.registerHandler(EventHandler::WRITE);
327 // Register timeouts to perform two reads
328 ScheduledEvent events[] = {
329 { 10, EventHandler::READ, 0 },
330 { 60, EventHandler::READ, 0 },
333 scheduleEvents(&eb, sp[1], events);
340 // Since we didn't use the EventHandler::PERSIST flag, the handler should
341 // have only been able to write once, then unregistered itself.
342 ASSERT_EQ(handler.log.size(), 1);
343 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
344 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
345 milliseconds(events[0].milliseconds));
346 ASSERT_EQ(handler.log[0].bytesRead, 0);
347 ASSERT_GT(handler.log[0].bytesWritten, 0);
348 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
350 ASSERT_EQ(events[0].result, initialBytesWritten);
351 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
355 * Test (WRITE | PERSIST)
357 TEST(EventBaseTest, WritePersist) {
361 // Fill up the write buffer before starting
362 size_t initialBytesWritten = writeUntilFull(sp[0]);
364 // Register for write events
365 TestHandler handler(&eb, sp[0]);
366 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
368 // Register several timeouts to read from the socket at several intervals
369 ScheduledEvent events[] = {
370 { 10, EventHandler::READ, 0 },
371 { 40, EventHandler::READ, 0 },
372 { 70, EventHandler::READ, 0 },
373 { 100, EventHandler::READ, 0 },
376 scheduleEvents(&eb, sp[1], events);
378 // Schedule a timeout to unregister the handler after the third read
379 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
386 // The handler should have received the first 3 events,
387 // then been unregistered after that.
388 ASSERT_EQ(handler.log.size(), 3);
389 ASSERT_EQ(events[0].result, initialBytesWritten);
390 for (int n = 0; n < 3; ++n) {
391 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
392 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
393 milliseconds(events[n].milliseconds));
394 ASSERT_EQ(handler.log[n].bytesRead, 0);
395 ASSERT_GT(handler.log[n].bytesWritten, 0);
396 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
398 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
402 * Test registering for WRITE when the socket is immediately writable
404 TEST(EventBaseTest, WriteImmediate) {
408 // Register for write events
409 TestHandler handler(&eb, sp[0]);
410 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
412 // Register a timeout to perform a read
413 ScheduledEvent events[] = {
414 { 10, EventHandler::READ, 0 },
417 scheduleEvents(&eb, sp[1], events);
419 // Schedule a timeout to unregister the handler
420 int64_t unregisterTimeout = 40;
421 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
429 ASSERT_EQ(handler.log.size(), 2);
431 // Since the socket buffer was initially empty,
432 // there should have been 1 event for immediate writability
433 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
434 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
435 ASSERT_EQ(handler.log[0].bytesRead, 0);
436 ASSERT_GT(handler.log[0].bytesWritten, 0);
438 // There should be another event after the timeout wrote more data
439 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
440 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
441 milliseconds(events[0].milliseconds));
442 ASSERT_EQ(handler.log[1].bytesRead, 0);
443 ASSERT_GT(handler.log[1].bytesWritten, 0);
445 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
449 * Test (READ | WRITE) when the socket becomes readable first
451 TEST(EventBaseTest, ReadWrite) {
455 // Fill up the write buffer before starting
456 size_t sock0WriteLength = writeUntilFull(sp[0]);
458 // Register for read and write events
459 TestHandler handler(&eb, sp[0]);
460 handler.registerHandler(EventHandler::READ_WRITE);
462 // Register timeouts to perform a write then a read.
463 ScheduledEvent events[] = {
464 { 10, EventHandler::WRITE, 2345 },
465 { 40, EventHandler::READ, 0 },
468 scheduleEvents(&eb, sp[1], events);
475 // Since we didn't use the EventHandler::PERSIST flag, the handler should
476 // have only noticed readability, then unregistered itself. Check that only
477 // one event was logged.
478 ASSERT_EQ(handler.log.size(), 1);
479 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
480 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
481 milliseconds(events[0].milliseconds));
482 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
483 ASSERT_EQ(handler.log[0].bytesWritten, 0);
484 ASSERT_EQ(events[1].result, sock0WriteLength);
485 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
489 * Test (READ | WRITE) when the socket becomes writable first
491 TEST(EventBaseTest, WriteRead) {
495 // Fill up the write buffer before starting
496 size_t sock0WriteLength = writeUntilFull(sp[0]);
498 // Register for read and write events
499 TestHandler handler(&eb, sp[0]);
500 handler.registerHandler(EventHandler::READ_WRITE);
502 // Register timeouts to perform a read then a write.
503 size_t sock1WriteLength = 2345;
504 ScheduledEvent events[] = {
505 { 10, EventHandler::READ, 0 },
506 { 40, EventHandler::WRITE, sock1WriteLength },
509 scheduleEvents(&eb, sp[1], events);
516 // Since we didn't use the EventHandler::PERSIST flag, the handler should
517 // have only noticed writability, then unregistered itself. Check that only
518 // one event was logged.
519 ASSERT_EQ(handler.log.size(), 1);
520 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
521 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
522 milliseconds(events[0].milliseconds));
523 ASSERT_EQ(handler.log[0].bytesRead, 0);
524 ASSERT_GT(handler.log[0].bytesWritten, 0);
525 ASSERT_EQ(events[0].result, sock0WriteLength);
526 ASSERT_EQ(events[1].result, sock1WriteLength);
527 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
529 // Make sure the written data is still waiting to be read.
530 size_t bytesRemaining = readUntilEmpty(sp[0]);
531 ASSERT_EQ(bytesRemaining, events[1].length);
535 * Test (READ | WRITE) when the socket becomes readable and writable
538 TEST(EventBaseTest, ReadWriteSimultaneous) {
542 // Fill up the write buffer before starting
543 size_t sock0WriteLength = writeUntilFull(sp[0]);
545 // Register for read and write events
546 TestHandler handler(&eb, sp[0]);
547 handler.registerHandler(EventHandler::READ_WRITE);
549 // Register a timeout to perform a read and write together
550 ScheduledEvent events[] = {
551 { 10, EventHandler::READ | EventHandler::WRITE, 0 },
554 scheduleEvents(&eb, sp[1], events);
561 // It's not strictly required that the EventBase register us about both
562 // events in the same call. So, it's possible that if the EventBase
563 // implementation changes this test could start failing, and it wouldn't be
564 // considered breaking the API. However for now it's nice to exercise this
566 ASSERT_EQ(handler.log.size(), 1);
567 ASSERT_EQ(handler.log[0].events,
568 EventHandler::READ | EventHandler::WRITE);
569 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
570 milliseconds(events[0].milliseconds));
571 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
572 ASSERT_GT(handler.log[0].bytesWritten, 0);
573 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
577 * Test (READ | WRITE | PERSIST)
579 TEST(EventBaseTest, ReadWritePersist) {
583 // Register for read and write events
584 TestHandler handler(&eb, sp[0]);
585 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
586 EventHandler::PERSIST);
588 // Register timeouts to perform several reads and writes
589 ScheduledEvent events[] = {
590 { 10, EventHandler::WRITE, 2345 },
591 { 20, EventHandler::READ, 0 },
592 { 35, EventHandler::WRITE, 200 },
593 { 45, EventHandler::WRITE, 15 },
594 { 55, EventHandler::READ, 0 },
595 { 120, EventHandler::WRITE, 2345 },
598 scheduleEvents(&eb, sp[1], events);
600 // Schedule a timeout to unregister the handler
601 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
608 ASSERT_EQ(handler.log.size(), 6);
610 // Since we didn't fill up the write buffer immediately, there should
611 // be an immediate event for writability.
612 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
613 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
614 ASSERT_EQ(handler.log[0].bytesRead, 0);
615 ASSERT_GT(handler.log[0].bytesWritten, 0);
617 // Events 1 through 5 should correspond to the scheduled events
618 for (int n = 1; n < 6; ++n) {
619 ScheduledEvent* event = &events[n - 1];
620 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
621 milliseconds(event->milliseconds));
622 if (event->events == EventHandler::READ) {
623 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
624 ASSERT_EQ(handler.log[n].bytesRead, 0);
625 ASSERT_GT(handler.log[n].bytesWritten, 0);
627 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
628 ASSERT_EQ(handler.log[n].bytesRead, event->length);
629 ASSERT_EQ(handler.log[n].bytesWritten, 0);
633 // The timeout should have unregistered the handler before the last write.
634 // Make sure that data is still waiting to be read
635 size_t bytesRemaining = readUntilEmpty(sp[0]);
636 ASSERT_EQ(bytesRemaining, events[5].length);
640 class PartialReadHandler : public TestHandler {
642 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
643 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
645 virtual void handlerReady(uint16_t events) noexcept {
646 assert(events == EventHandler::READ);
647 ssize_t bytesRead = readFromFD(fd_, readLength_);
648 log.push_back(EventRecord(events, bytesRead, 0));
657 * Test reading only part of the available data when a read event is fired.
658 * When PERSIST is used, make sure the handler gets notified again the next
659 * time around the loop.
661 TEST(EventBaseTest, ReadPartial) {
665 // Register for read events
666 size_t readLength = 100;
667 PartialReadHandler handler(&eb, sp[0], readLength);
668 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
670 // Register a timeout to perform a single write,
671 // with more data than PartialReadHandler will read at once
672 ScheduledEvent events[] = {
673 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
676 scheduleEvents(&eb, sp[1], events);
678 // Schedule a timeout to unregister the handler
679 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
686 ASSERT_EQ(handler.log.size(), 4);
688 // The first 3 invocations should read readLength bytes each
689 for (int n = 0; n < 3; ++n) {
690 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
691 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
692 milliseconds(events[0].milliseconds));
693 ASSERT_EQ(handler.log[n].bytesRead, readLength);
694 ASSERT_EQ(handler.log[n].bytesWritten, 0);
696 // The last read only has readLength/2 bytes
697 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
698 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
699 milliseconds(events[0].milliseconds));
700 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
701 ASSERT_EQ(handler.log[3].bytesWritten, 0);
705 class PartialWriteHandler : public TestHandler {
707 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
708 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
710 virtual void handlerReady(uint16_t events) noexcept {
711 assert(events == EventHandler::WRITE);
712 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
713 log.push_back(EventRecord(events, 0, bytesWritten));
722 * Test writing without completely filling up the write buffer when the fd
723 * becomes writable. When PERSIST is used, make sure the handler gets
724 * notified again the next time around the loop.
726 TEST(EventBaseTest, WritePartial) {
730 // Fill up the write buffer before starting
731 size_t initialBytesWritten = writeUntilFull(sp[0]);
733 // Register for write events
734 size_t writeLength = 100;
735 PartialWriteHandler handler(&eb, sp[0], writeLength);
736 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
738 // Register a timeout to read, so that more data can be written
739 ScheduledEvent events[] = {
740 { 10, EventHandler::READ, 0 },
743 scheduleEvents(&eb, sp[1], events);
745 // Schedule a timeout to unregister the handler
746 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
753 // Depending on how big the socket buffer is, there will be multiple writes
754 // Only check the first 5
756 ASSERT_GE(handler.log.size(), numChecked);
757 ASSERT_EQ(events[0].result, initialBytesWritten);
759 // The first 3 invocations should read writeLength bytes each
760 for (int n = 0; n < numChecked; ++n) {
761 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
762 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
763 milliseconds(events[0].milliseconds));
764 ASSERT_EQ(handler.log[n].bytesRead, 0);
765 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
771 * Test destroying a registered EventHandler
773 TEST(EventBaseTest, DestroyHandler) {
774 class DestroyHandler : public AsyncTimeout {
776 DestroyHandler(EventBase* eb, EventHandler* h)
780 virtual void timeoutExpired() noexcept {
785 EventHandler* handler_;
791 // Fill up the write buffer before starting
792 size_t initialBytesWritten = writeUntilFull(sp[0]);
794 // Register for write events
795 TestHandler* handler = new TestHandler(&eb, sp[0]);
796 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
798 // After 10ms, read some data, so that the handler
799 // will be notified that it can write.
800 eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
803 // Start a timer to destroy the handler after 25ms
804 // This mainly just makes sure the code doesn't break or assert
805 DestroyHandler dh(&eb, handler);
806 dh.scheduleTimeout(25);
812 // Make sure the EventHandler was uninstalled properly when it was
813 // destroyed, and the EventBase loop exited
814 T_CHECK_TIMEOUT(start, end, milliseconds(25));
816 // Make sure that the handler wrote data to the socket
817 // before it was destroyed
818 size_t bytesRemaining = readUntilEmpty(sp[1]);
819 ASSERT_GT(bytesRemaining, 0);
823 ///////////////////////////////////////////////////////////////////////////
824 // Tests for timeout events
825 ///////////////////////////////////////////////////////////////////////////
827 TEST(EventBaseTest, RunAfterDelay) {
830 TimePoint timestamp1(false);
831 TimePoint timestamp2(false);
832 TimePoint timestamp3(false);
833 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
834 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
835 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
841 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
842 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
843 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
844 T_CHECK_TIMEOUT(start, end, milliseconds(40));
848 * Test the behavior of runAfterDelay() when some timeouts are
849 * still scheduled when the EventBase is destroyed.
851 TEST(EventBaseTest, RunAfterDelayDestruction) {
852 TimePoint timestamp1(false);
853 TimePoint timestamp2(false);
854 TimePoint timestamp3(false);
855 TimePoint timestamp4(false);
856 TimePoint start(false);
857 TimePoint end(false);
862 // Run two normal timeouts
863 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
864 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
866 // Schedule a timeout to stop the event loop after 40ms
867 eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
869 // Schedule 2 timeouts that would fire after the event loop stops
870 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
871 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
878 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
879 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
880 T_CHECK_TIMEOUT(start, end, milliseconds(40));
882 ASSERT_TRUE(timestamp3.isUnset());
883 ASSERT_TRUE(timestamp4.isUnset());
885 // Ideally this test should be run under valgrind to ensure that no
889 class TestTimeout : public AsyncTimeout {
891 explicit TestTimeout(EventBase* eventBase)
892 : AsyncTimeout(eventBase)
893 , timestamp(false) {}
895 virtual void timeoutExpired() noexcept {
902 TEST(EventBaseTest, BasicTimeouts) {
908 t1.scheduleTimeout(10);
909 t2.scheduleTimeout(20);
910 t3.scheduleTimeout(40);
916 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
917 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
918 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
919 T_CHECK_TIMEOUT(start, end, milliseconds(40));
922 class ReschedulingTimeout : public AsyncTimeout {
924 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
926 , timeouts_(timeouts)
927 , iterator_(timeouts_.begin()) {}
933 virtual void timeoutExpired() noexcept {
934 timestamps.push_back(TimePoint());
939 if (iterator_ != timeouts_.end()) {
940 uint32_t timeout = *iterator_;
942 scheduleTimeout(timeout);
946 vector<TimePoint> timestamps;
949 vector<uint32_t> timeouts_;
950 vector<uint32_t>::const_iterator iterator_;
954 * Test rescheduling the same timeout multiple times
956 TEST(EventBaseTest, ReuseTimeout) {
959 vector<uint32_t> timeouts;
960 timeouts.push_back(10);
961 timeouts.push_back(30);
962 timeouts.push_back(15);
964 ReschedulingTimeout t(&eb, timeouts);
971 // Use a higher tolerance than usual. We're waiting on 3 timeouts
972 // consecutively. In general, each timeout may go over by a few
973 // milliseconds, and we're tripling this error by witing on 3 timeouts.
974 milliseconds tolerance{6};
976 ASSERT_EQ(timeouts.size(), t.timestamps.size());
978 for (size_t n = 0; n < timeouts.size(); ++n) {
979 total += timeouts[n];
980 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
982 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
986 * Test rescheduling a timeout before it has fired
988 TEST(EventBaseTest, RescheduleTimeout) {
995 t1.scheduleTimeout(15);
996 t2.scheduleTimeout(30);
997 t3.scheduleTimeout(30);
999 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1000 &AsyncTimeout::scheduleTimeout);
1002 // after 10ms, reschedule t2 to run sooner than originally scheduled
1003 eb.runAfterDelay(std::bind(f, &t2, 10), 10);
1004 // after 10ms, reschedule t3 to run later than originally scheduled
1005 eb.runAfterDelay(std::bind(f, &t3, 40), 10);
1011 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1012 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1013 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1014 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1018 * Test cancelling a timeout
1020 TEST(EventBaseTest, CancelTimeout) {
1023 vector<uint32_t> timeouts;
1024 timeouts.push_back(10);
1025 timeouts.push_back(30);
1026 timeouts.push_back(25);
1028 ReschedulingTimeout t(&eb, timeouts);
1030 eb.runAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1036 ASSERT_EQ(t.timestamps.size(), 2);
1037 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1038 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1039 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1043 * Test destroying a scheduled timeout object
1045 TEST(EventBaseTest, DestroyTimeout) {
1046 class DestroyTimeout : public AsyncTimeout {
1048 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1052 virtual void timeoutExpired() noexcept {
1057 AsyncTimeout* timeout_;
1062 TestTimeout* t1 = new TestTimeout(&eb);
1063 t1->scheduleTimeout(30);
1065 DestroyTimeout dt(&eb, t1);
1066 dt.scheduleTimeout(10);
1072 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1076 ///////////////////////////////////////////////////////////////////////////
1077 // Test for runInThreadTestFunc()
1078 ///////////////////////////////////////////////////////////////////////////
1080 struct RunInThreadData {
1081 RunInThreadData(int numThreads, int opsPerThread)
1082 : opsPerThread(opsPerThread)
1083 , opsToGo(numThreads*opsPerThread) {}
1086 deque< pair<int, int> > values;
1092 struct RunInThreadArg {
1093 RunInThreadArg(RunInThreadData* data,
1100 RunInThreadData* data;
1105 void runInThreadTestFunc(RunInThreadArg* arg) {
1106 arg->data->values.push_back(make_pair(arg->thread, arg->value));
1107 RunInThreadData* data = arg->data;
1110 if(--data->opsToGo == 0) {
1111 // Break out of the event base loop if we are the last thread running
1112 data->evb.terminateLoopSoon();
1116 TEST(EventBaseTest, RunInThread) {
1117 uint32_t numThreads = 50;
1118 uint32_t opsPerThread = 100;
1119 RunInThreadData data(numThreads, opsPerThread);
1121 deque<std::thread> threads;
1122 for (uint32_t i = 0; i < numThreads; ++i) {
1123 threads.emplace_back([i, &data] {
1124 for (int n = 0; n < data.opsPerThread; ++n) {
1125 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1126 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1132 // Add a timeout event to run after 3 seconds.
1133 // Otherwise loop() will return immediately since there are no events to run.
1134 // Once the last thread exits, it will stop the loop(). However, this
1135 // timeout also stops the loop in case there is a bug performing the normal
1137 data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1144 // Verify that the loop exited because all threads finished and requested it
1145 // to stop. This should happen much sooner than the 3 second timeout.
1146 // Assert that it happens in under a second. (This is still tons of extra
1149 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1150 end.getTime() - start.getTime());
1151 ASSERT_LT(timeTaken.count(), 1000);
1152 VLOG(11) << "Time taken: " << timeTaken.count();
1154 // Verify that we have all of the events from every thread
1155 int expectedValues[numThreads];
1156 for (uint32_t n = 0; n < numThreads; ++n) {
1157 expectedValues[n] = 0;
1159 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1160 it != data.values.end();
1162 int threadID = it->first;
1163 int value = it->second;
1164 ASSERT_EQ(expectedValues[threadID], value);
1165 ++expectedValues[threadID];
1167 for (uint32_t n = 0; n < numThreads; ++n) {
1168 ASSERT_EQ(expectedValues[n], opsPerThread);
1171 // Wait on all of the threads.
1172 for (auto& thread: threads) {
1177 // This test simulates some calls, and verifies that the waiting happens by
1178 // triggering what otherwise would be race conditions, and trying to detect
1179 // whether any of the race conditions happened.
1180 TEST(EventBaseTest, RunInEventLoopThreadAndWait) {
1181 const size_t c = 256;
1182 vector<atomic<size_t>> atoms(c);
1183 for (size_t i = 0; i < c; ++i) {
1184 auto& atom = atoms.at(i);
1187 vector<thread> threads(c);
1188 for (size_t i = 0; i < c; ++i) {
1189 auto& atom = atoms.at(i);
1190 auto& th = threads.at(i);
1191 th = thread([&atom] {
1193 auto ebth = thread([&]{ eb.loopForever(); });
1194 eb.waitUntilRunning();
1195 eb.runInEventBaseThreadAndWait([&] {
1197 atom.compare_exchange_weak(
1198 x, 1, std::memory_order_release, std::memory_order_relaxed);
1201 atom.compare_exchange_weak(
1202 x, 2, std::memory_order_release, std::memory_order_relaxed);
1203 eb.terminateLoopSoon();
1207 for (size_t i = 0; i < c; ++i) {
1208 auto& th = threads.at(i);
1212 for (auto& atom : atoms) sum += atom;
1216 ///////////////////////////////////////////////////////////////////////////
1217 // Tests for runInLoop()
1218 ///////////////////////////////////////////////////////////////////////////
1220 class CountedLoopCallback : public EventBase::LoopCallback {
1222 CountedLoopCallback(EventBase* eventBase,
1224 std::function<void()> action =
1225 std::function<void()>())
1226 : eventBase_(eventBase)
1228 , action_(action) {}
1230 virtual void runLoopCallback() noexcept {
1233 eventBase_->runInLoop(this);
1234 } else if (action_) {
1239 unsigned int getCount() const {
1244 EventBase* eventBase_;
1245 unsigned int count_;
1246 std::function<void()> action_;
1249 // Test that EventBase::loop() doesn't exit while there are
1250 // still LoopCallbacks remaining to be invoked.
1251 TEST(EventBaseTest, RepeatedRunInLoop) {
1252 EventBase eventBase;
1254 CountedLoopCallback c(&eventBase, 10);
1255 eventBase.runInLoop(&c);
1256 // The callback shouldn't have run immediately
1257 ASSERT_EQ(c.getCount(), 10);
1260 // loop() should loop until the CountedLoopCallback stops
1261 // re-installing itself.
1262 ASSERT_EQ(c.getCount(), 0);
1265 // Test runInLoop() calls with terminateLoopSoon()
1266 TEST(EventBaseTest, RunInLoopStopLoop) {
1267 EventBase eventBase;
1269 CountedLoopCallback c1(&eventBase, 20);
1270 CountedLoopCallback c2(&eventBase, 10,
1271 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1273 eventBase.runInLoop(&c1);
1274 eventBase.runInLoop(&c2);
1275 ASSERT_EQ(c1.getCount(), 20);
1276 ASSERT_EQ(c2.getCount(), 10);
1278 eventBase.loopForever();
1280 // c2 should have stopped the loop after 10 iterations
1281 ASSERT_EQ(c2.getCount(), 0);
1283 // We allow the EventBase to run the loop callbacks in whatever order it
1284 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1285 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1288 // (With the current code, c1 will always run 10 times, but we don't consider
1289 // this a hard API requirement.)
1290 ASSERT_GE(c1.getCount(), 10);
1291 ASSERT_LE(c1.getCount(), 11);
1294 // Test cancelling runInLoop() callbacks
1295 TEST(EventBaseTest, CancelRunInLoop) {
1296 EventBase eventBase;
1298 CountedLoopCallback c1(&eventBase, 20);
1299 CountedLoopCallback c2(&eventBase, 20);
1300 CountedLoopCallback c3(&eventBase, 20);
1302 std::function<void()> cancelC1Action =
1303 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1304 std::function<void()> cancelC2Action =
1305 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1307 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1308 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1310 // Install cancelC1 after c1
1311 eventBase.runInLoop(&c1);
1312 eventBase.runInLoop(&cancelC1);
1314 // Install cancelC2 before c2
1315 eventBase.runInLoop(&cancelC2);
1316 eventBase.runInLoop(&c2);
1319 eventBase.runInLoop(&c3);
1321 ASSERT_EQ(c1.getCount(), 20);
1322 ASSERT_EQ(c2.getCount(), 20);
1323 ASSERT_EQ(c3.getCount(), 20);
1324 ASSERT_EQ(cancelC1.getCount(), 10);
1325 ASSERT_EQ(cancelC2.getCount(), 10);
1330 // cancelC1 and cancelC3 should have both fired after 10 iterations and
1331 // stopped re-installing themselves
1332 ASSERT_EQ(cancelC1.getCount(), 0);
1333 ASSERT_EQ(cancelC2.getCount(), 0);
1334 // c3 should have continued on for the full 20 iterations
1335 ASSERT_EQ(c3.getCount(), 0);
1337 // c1 and c2 should have both been cancelled on the 10th iteration.
1339 // Callbacks are always run in the order they are installed,
1340 // so c1 should have fired 10 times, and been canceled after it ran on the
1341 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1342 // have run before it on the 10th iteration, and cancelled it before it
1344 ASSERT_EQ(c1.getCount(), 10);
1345 ASSERT_EQ(c2.getCount(), 11);
1348 class TerminateTestCallback : public EventBase::LoopCallback,
1349 public EventHandler {
1351 TerminateTestCallback(EventBase* eventBase, int fd)
1352 : EventHandler(eventBase, fd),
1353 eventBase_(eventBase),
1354 loopInvocations_(0),
1355 maxLoopInvocations_(0),
1356 eventInvocations_(0),
1357 maxEventInvocations_(0) {}
1359 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1360 loopInvocations_ = 0;
1361 maxLoopInvocations_ = maxLoopInvocations;
1362 eventInvocations_ = 0;
1363 maxEventInvocations_ = maxEventInvocations;
1365 cancelLoopCallback();
1366 unregisterHandler();
1369 virtual void handlerReady(uint16_t events) noexcept {
1370 // We didn't register with PERSIST, so we will have been automatically
1371 // unregistered already.
1372 ASSERT_FALSE(isHandlerRegistered());
1374 ++eventInvocations_;
1375 if (eventInvocations_ >= maxEventInvocations_) {
1379 eventBase_->runInLoop(this);
1381 virtual void runLoopCallback() noexcept {
1383 if (loopInvocations_ >= maxLoopInvocations_) {
1387 registerHandler(READ);
1390 uint32_t getLoopInvocations() const {
1391 return loopInvocations_;
1393 uint32_t getEventInvocations() const {
1394 return eventInvocations_;
1398 EventBase* eventBase_;
1399 uint32_t loopInvocations_;
1400 uint32_t maxLoopInvocations_;
1401 uint32_t eventInvocations_;
1402 uint32_t maxEventInvocations_;
1406 * Test that EventBase::loop() correctly detects when there are no more events
1409 * This uses a single callback, which alternates registering itself as a loop
1410 * callback versus a EventHandler callback. This exercises a regression where
1411 * EventBase::loop() incorrectly exited if there were no more fd handlers
1412 * registered, but a loop callback installed a new fd handler.
1414 TEST(EventBaseTest, LoopTermination) {
1415 EventBase eventBase;
1417 // Open a pipe and close the write end,
1418 // so the read endpoint will be readable
1420 int rc = pipe(pipeFds);
1423 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1425 // Test once where the callback will exit after a loop callback
1426 callback.reset(10, 100);
1427 eventBase.runInLoop(&callback);
1429 ASSERT_EQ(callback.getLoopInvocations(), 10);
1430 ASSERT_EQ(callback.getEventInvocations(), 9);
1432 // Test once where the callback will exit after an fd event callback
1433 callback.reset(100, 7);
1434 eventBase.runInLoop(&callback);
1436 ASSERT_EQ(callback.getLoopInvocations(), 7);
1437 ASSERT_EQ(callback.getEventInvocations(), 7);
1442 ///////////////////////////////////////////////////////////////////////////
1443 // Tests for latency calculations
1444 ///////////////////////////////////////////////////////////////////////////
1446 class IdleTimeTimeoutSeries : public AsyncTimeout {
1450 explicit IdleTimeTimeoutSeries(EventBase *base,
1451 std::deque<std::uint64_t>& timeout) :
1458 virtual ~IdleTimeTimeoutSeries() {}
1460 void timeoutExpired() noexcept {
1463 if(timeout_.empty()){
1466 uint64_t sleepTime = timeout_.front();
1467 timeout_.pop_front();
1475 int getTimeouts() const {
1481 std::deque<uint64_t>& timeout_;
1485 * Verify that idle time is correctly accounted for when decaying our loop
1488 * This works by creating a high loop time (via usleep), expecting a latency
1489 * callback with known value, and then scheduling a timeout for later. This
1490 * later timeout is far enough in the future that the idle time should have
1491 * caused the loop time to decay.
1493 TEST(EventBaseTest, IdleTime) {
1494 EventBase eventBase;
1495 eventBase.setLoadAvgMsec(1000);
1496 eventBase.resetLoadAvg(5900.0);
1497 std::deque<uint64_t> timeouts0(4, 8080);
1498 timeouts0.push_front(8000);
1499 timeouts0.push_back(14000);
1500 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1501 std::deque<uint64_t> timeouts(20, 20);
1502 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1503 int64_t testStart = duration_cast<microseconds>(
1504 std::chrono::steady_clock::now().time_since_epoch()).count();
1505 bool hostOverloaded = false;
1507 int latencyCallbacks = 0;
1508 eventBase.setMaxLatency(6000, [&]() {
1511 switch (latencyCallbacks) {
1513 if (tos0.getTimeouts() < 6) {
1514 // This could only happen if the host this test is running
1515 // on is heavily loaded.
1516 int64_t maxLatencyReached = duration_cast<microseconds>(
1517 std::chrono::steady_clock::now().time_since_epoch()).count();
1518 ASSERT_LE(43800, maxLatencyReached - testStart);
1519 hostOverloaded = true;
1522 ASSERT_EQ(6, tos0.getTimeouts());
1523 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1524 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1525 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1529 FAIL() << "Unexpected latency callback";
1534 // Kick things off with an "immedite" timeout
1535 tos0.scheduleTimeout(1);
1539 if (hostOverloaded) {
1543 ASSERT_EQ(1, latencyCallbacks);
1544 ASSERT_EQ(7, tos0.getTimeouts());
1545 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1546 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1548 ASSERT_EQ(21, tos->getTimeouts());
1552 * Test that thisLoop functionality works with terminateLoopSoon
1554 TEST(EventBaseTest, ThisLoop) {
1556 bool runInLoop = false;
1557 bool runThisLoop = false;
1560 eb.terminateLoopSoon();
1561 eb.runInLoop([&]() {
1564 eb.runInLoop([&]() {
1571 ASSERT_FALSE(runInLoop);
1572 // Should work with thisLoop
1573 ASSERT_TRUE(runThisLoop);
1576 TEST(EventBaseTest, EventBaseThreadLoop) {
1580 base.runInEventBaseThread([&](){
1585 ASSERT_EQ(true, ran);
1588 TEST(EventBaseTest, EventBaseThreadName) {
1590 base.setName("foo");
1593 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1595 pthread_getname_np(pthread_self(), name, 16);
1596 ASSERT_EQ(0, strcmp("foo", name));
1600 TEST(EventBaseTest, RunBeforeLoop) {
1602 CountedLoopCallback cb(&base, 1, [&](){
1603 base.terminateLoopSoon();
1605 base.runBeforeLoop(&cb);
1607 ASSERT_EQ(cb.getCount(), 0);
1610 TEST(EventBaseTest, RunBeforeLoopWait) {
1612 CountedLoopCallback cb(&base, 1);
1613 base.runAfterDelay([&](){
1614 base.terminateLoopSoon();
1616 base.runBeforeLoop(&cb);
1619 // Check that we only ran once, and did not loop multiple times.
1620 ASSERT_EQ(cb.getCount(), 0);
1623 class PipeHandler : public EventHandler {
1625 PipeHandler(EventBase* eventBase, int fd)
1626 : EventHandler(eventBase, fd) {}
1628 void handlerReady(uint16_t events) noexcept {
1633 TEST(EventBaseTest, StopBeforeLoop) {
1636 // Give the evb something to do.
1638 ASSERT_EQ(0, pipe(p));
1639 PipeHandler handler(&evb, p[0]);
1640 handler.registerHandler(EventHandler::READ);
1642 // It's definitely not running yet
1643 evb.terminateLoopSoon();
1645 // let it run, it should exit quickly.
1646 std::thread t([&] { evb.loop(); });
1649 handler.unregisterHandler();
1656 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1661 base.runInEventBaseThread([&](){