2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 #include <folly/Memory.h>
21 #include <folly/io/async/AsyncTimeout.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/test/SocketPair.h>
25 #include <folly/io/async/test/Util.h>
37 using std::unique_ptr;
42 using std::chrono::milliseconds;
43 using std::chrono::microseconds;
44 using std::chrono::duration_cast;
46 using namespace folly;
48 ///////////////////////////////////////////////////////////////////////////
49 // Tests for read and write events
50 ///////////////////////////////////////////////////////////////////////////
52 enum { BUF_SIZE = 4096 };
54 ssize_t writeToFD(int fd, size_t length) {
55 // write an arbitrary amount of data to the fd
57 memset(buf, 'a', sizeof(buf));
58 ssize_t rc = write(fd, buf, sizeof(buf));
63 size_t writeUntilFull(int fd) {
64 // Write to the fd until EAGAIN is returned
65 size_t bytesWritten = 0;
67 memset(buf, 'a', sizeof(buf));
69 ssize_t rc = write(fd, buf, sizeof(buf));
71 CHECK_EQ(errno, EAGAIN);
80 ssize_t readFromFD(int fd, size_t length) {
81 // read an arbitrary amount of data from the fd
83 return read(fd, buf, sizeof(buf));
86 size_t readUntilEmpty(int fd) {
87 // Read from the fd until EAGAIN is returned
91 ssize_t rc = read(fd, buf, sizeof(buf)); // read() returns ssize_t; keep its full width (matches writeUntilFull)
93 CHECK(false) << "unexpected EOF";
95 CHECK_EQ(errno, EAGAIN);
104 void checkReadUntilEmpty(int fd, size_t expectedLength) {
105 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
108 struct ScheduledEvent {
114 void perform(int fd) {
115 if (events & EventHandler::READ) {
117 result = readUntilEmpty(fd);
119 result = readFromFD(fd, length);
122 if (events & EventHandler::WRITE) {
124 result = writeUntilFull(fd);
126 result = writeToFD(fd, length);
132 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
133 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
134 eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
139 class TestHandler : public EventHandler {
141 TestHandler(EventBase* eventBase, int fd)
142 : EventHandler(eventBase, fd), fd_(fd) {}
144 void handlerReady(uint16_t events) noexcept override { // appends one EventRecord to log per invocation
145 ssize_t bytesRead = 0;
146 ssize_t bytesWritten = 0;
148 // Read all available data, so EventBase will stop calling us
149 // until new data becomes available
150 bytesRead = readUntilEmpty(fd_);
152 if (events & WRITE) {
153 // Write until the pipe buffer is full, so EventBase will stop calling
154 // us until the other end has read some data
155 bytesWritten = writeUntilFull(fd_);
158 log.push_back(EventRecord(events, bytesRead, bytesWritten));
162 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
165 , bytesRead(bytesRead)
166 , bytesWritten(bytesWritten) {}
171 ssize_t bytesWritten;
174 deque<EventRecord> log;
183 TEST(EventBaseTest, ReadEvent) {
187 // Register for read events
188 TestHandler handler(&eb, sp[0]);
189 handler.registerHandler(EventHandler::READ);
191 // Register timeouts to perform two write events
192 ScheduledEvent events[] = {
193 { 10, EventHandler::WRITE, 2345 },
194 { 160, EventHandler::WRITE, 99 },
197 scheduleEvents(&eb, sp[1], events);
204 // Since we didn't use the EventHandler::PERSIST flag, the handler should
205 // have received the first read, then unregistered itself. Check that only
206 // the first chunk of data was received.
207 ASSERT_EQ(handler.log.size(), 1);
208 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
209 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
210 milliseconds(events[0].milliseconds), milliseconds(90));
211 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
212 ASSERT_EQ(handler.log[0].bytesWritten, 0);
213 T_CHECK_TIMEOUT(start, end,
214 milliseconds(events[1].milliseconds), milliseconds(30));
216 // Make sure the second chunk of data is still waiting to be read.
217 size_t bytesRemaining = readUntilEmpty(sp[0]);
218 ASSERT_EQ(bytesRemaining, events[1].length);
222 * Test (READ | PERSIST)
224 TEST(EventBaseTest, ReadPersist) {
228 // Register for read events
229 TestHandler handler(&eb, sp[0]);
230 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
232 // Register several timeouts to perform writes
233 ScheduledEvent events[] = {
234 { 10, EventHandler::WRITE, 1024 },
235 { 20, EventHandler::WRITE, 2211 },
236 { 30, EventHandler::WRITE, 4096 },
237 { 100, EventHandler::WRITE, 100 },
240 scheduleEvents(&eb, sp[1], events);
242 // Schedule a timeout to unregister the handler after the third write
243 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
250 // The handler should have received the first 3 events,
251 // then been unregistered after that.
252 ASSERT_EQ(handler.log.size(), 3);
253 for (int n = 0; n < 3; ++n) {
254 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
255 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
256 milliseconds(events[n].milliseconds));
257 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
258 ASSERT_EQ(handler.log[n].bytesWritten, 0);
260 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
262 // Make sure the data from the last write is still waiting to be read
263 size_t bytesRemaining = readUntilEmpty(sp[0]);
264 ASSERT_EQ(bytesRemaining, events[3].length);
268 * Test registering for READ when the socket is immediately readable
270 TEST(EventBaseTest, ReadImmediate) {
274 // Write some data to the socket so the other end will
275 // be immediately readable
276 size_t dataLength = 1234;
277 writeToFD(sp[1], dataLength);
279 // Register for read events
280 TestHandler handler(&eb, sp[0]);
281 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
283 // Register a timeout to perform another write
284 ScheduledEvent events[] = {
285 { 10, EventHandler::WRITE, 2345 },
288 scheduleEvents(&eb, sp[1], events);
290 // Schedule a timeout to unregister the handler
291 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
298 ASSERT_EQ(handler.log.size(), 2);
300 // There should have been 1 event for immediate readability
301 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
302 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
303 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
304 ASSERT_EQ(handler.log[0].bytesWritten, 0);
306 // There should be another event after the timeout wrote more data
307 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
308 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
309 milliseconds(events[0].milliseconds));
310 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
311 ASSERT_EQ(handler.log[1].bytesWritten, 0);
313 T_CHECK_TIMEOUT(start, end, milliseconds(20));
319 TEST(EventBaseTest, WriteEvent) {
323 // Fill up the write buffer before starting
324 size_t initialBytesWritten = writeUntilFull(sp[0]);
326 // Register for write events
327 TestHandler handler(&eb, sp[0]);
328 handler.registerHandler(EventHandler::WRITE);
330 // Register timeouts to perform two reads
331 ScheduledEvent events[] = {
332 { 10, EventHandler::READ, 0 },
333 { 60, EventHandler::READ, 0 },
336 scheduleEvents(&eb, sp[1], events);
343 // Since we didn't use the EventHandler::PERSIST flag, the handler should
344 // have only been able to write once, then unregistered itself.
345 ASSERT_EQ(handler.log.size(), 1);
346 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
347 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
348 milliseconds(events[0].milliseconds));
349 ASSERT_EQ(handler.log[0].bytesRead, 0);
350 ASSERT_GT(handler.log[0].bytesWritten, 0);
351 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
353 ASSERT_EQ(events[0].result, initialBytesWritten);
354 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
358 * Test (WRITE | PERSIST)
360 TEST(EventBaseTest, WritePersist) {
364 // Fill up the write buffer before starting
365 size_t initialBytesWritten = writeUntilFull(sp[0]);
367 // Register for write events
368 TestHandler handler(&eb, sp[0]);
369 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
371 // Register several timeouts to read from the socket at several intervals
372 ScheduledEvent events[] = {
373 { 10, EventHandler::READ, 0 },
374 { 40, EventHandler::READ, 0 },
375 { 70, EventHandler::READ, 0 },
376 { 100, EventHandler::READ, 0 },
379 scheduleEvents(&eb, sp[1], events);
381 // Schedule a timeout to unregister the handler after the third read
382 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
389 // The handler should have received the first 3 events,
390 // then been unregistered after that.
391 ASSERT_EQ(handler.log.size(), 3);
392 ASSERT_EQ(events[0].result, initialBytesWritten);
393 for (int n = 0; n < 3; ++n) {
394 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
395 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
396 milliseconds(events[n].milliseconds));
397 ASSERT_EQ(handler.log[n].bytesRead, 0);
398 ASSERT_GT(handler.log[n].bytesWritten, 0);
399 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
401 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
405 * Test registering for WRITE when the socket is immediately writable
407 TEST(EventBaseTest, WriteImmediate) {
411 // Register for write events
412 TestHandler handler(&eb, sp[0]);
413 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
415 // Register a timeout to perform a read
416 ScheduledEvent events[] = {
417 { 10, EventHandler::READ, 0 },
420 scheduleEvents(&eb, sp[1], events);
422 // Schedule a timeout to unregister the handler
423 int64_t unregisterTimeout = 40;
424 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
432 ASSERT_EQ(handler.log.size(), 2);
434 // Since the socket buffer was initially empty,
435 // there should have been 1 event for immediate writability
436 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
437 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
438 ASSERT_EQ(handler.log[0].bytesRead, 0);
439 ASSERT_GT(handler.log[0].bytesWritten, 0);
441 // There should be another event after the timeout read data from the other end
442 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
443 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
444 milliseconds(events[0].milliseconds));
445 ASSERT_EQ(handler.log[1].bytesRead, 0);
446 ASSERT_GT(handler.log[1].bytesWritten, 0);
448 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
452 * Test (READ | WRITE) when the socket becomes readable first
454 TEST(EventBaseTest, ReadWrite) {
458 // Fill up the write buffer before starting
459 size_t sock0WriteLength = writeUntilFull(sp[0]);
461 // Register for read and write events
462 TestHandler handler(&eb, sp[0]);
463 handler.registerHandler(EventHandler::READ_WRITE);
465 // Register timeouts to perform a write then a read.
466 ScheduledEvent events[] = {
467 { 10, EventHandler::WRITE, 2345 },
468 { 40, EventHandler::READ, 0 },
471 scheduleEvents(&eb, sp[1], events);
478 // Since we didn't use the EventHandler::PERSIST flag, the handler should
479 // have only noticed readability, then unregistered itself. Check that only
480 // one event was logged.
481 ASSERT_EQ(handler.log.size(), 1);
482 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
483 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
484 milliseconds(events[0].milliseconds));
485 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
486 ASSERT_EQ(handler.log[0].bytesWritten, 0);
487 ASSERT_EQ(events[1].result, sock0WriteLength);
488 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
492 * Test (READ | WRITE) when the socket becomes writable first
494 TEST(EventBaseTest, WriteRead) {
498 // Fill up the write buffer before starting
499 size_t sock0WriteLength = writeUntilFull(sp[0]);
501 // Register for read and write events
502 TestHandler handler(&eb, sp[0]);
503 handler.registerHandler(EventHandler::READ_WRITE);
505 // Register timeouts to perform a read then a write.
506 size_t sock1WriteLength = 2345;
507 ScheduledEvent events[] = {
508 { 10, EventHandler::READ, 0 },
509 { 40, EventHandler::WRITE, sock1WriteLength },
512 scheduleEvents(&eb, sp[1], events);
519 // Since we didn't use the EventHandler::PERSIST flag, the handler should
520 // have only noticed writability, then unregistered itself. Check that only
521 // one event was logged.
522 ASSERT_EQ(handler.log.size(), 1);
523 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
524 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
525 milliseconds(events[0].milliseconds));
526 ASSERT_EQ(handler.log[0].bytesRead, 0);
527 ASSERT_GT(handler.log[0].bytesWritten, 0);
528 ASSERT_EQ(events[0].result, sock0WriteLength);
529 ASSERT_EQ(events[1].result, sock1WriteLength);
530 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
532 // Make sure the written data is still waiting to be read.
533 size_t bytesRemaining = readUntilEmpty(sp[0]);
534 ASSERT_EQ(bytesRemaining, events[1].length);
538 * Test (READ | WRITE) when the socket becomes readable and writable
541 TEST(EventBaseTest, ReadWriteSimultaneous) {
545 // Fill up the write buffer before starting
546 size_t sock0WriteLength = writeUntilFull(sp[0]);
548 // Register for read and write events
549 TestHandler handler(&eb, sp[0]);
550 handler.registerHandler(EventHandler::READ_WRITE);
552 // Register a timeout to perform a read and write together
553 ScheduledEvent events[] = {
554 { 10, EventHandler::READ | EventHandler::WRITE, 0 },
557 scheduleEvents(&eb, sp[1], events);
564 // It's not strictly required that the EventBase notify us about both
565 // events in the same call. So, it's possible that if the EventBase
566 // implementation changes, this test could start failing, and it wouldn't be
567 // considered breaking the API. However, for now it's nice to exercise this
569 ASSERT_EQ(handler.log.size(), 1);
570 ASSERT_EQ(handler.log[0].events,
571 EventHandler::READ | EventHandler::WRITE);
572 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
573 milliseconds(events[0].milliseconds));
574 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
575 ASSERT_GT(handler.log[0].bytesWritten, 0);
576 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
580 * Test (READ | WRITE | PERSIST)
582 TEST(EventBaseTest, ReadWritePersist) {
586 // Register for read and write events
587 TestHandler handler(&eb, sp[0]);
588 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
589 EventHandler::PERSIST);
591 // Register timeouts to perform several reads and writes
592 ScheduledEvent events[] = {
593 { 10, EventHandler::WRITE, 2345 },
594 { 20, EventHandler::READ, 0 },
595 { 35, EventHandler::WRITE, 200 },
596 { 45, EventHandler::WRITE, 15 },
597 { 55, EventHandler::READ, 0 },
598 { 120, EventHandler::WRITE, 2345 },
601 scheduleEvents(&eb, sp[1], events);
603 // Schedule a timeout to unregister the handler
604 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
611 ASSERT_EQ(handler.log.size(), 6);
613 // Since we didn't fill up the write buffer immediately, there should
614 // be an immediate event for writability.
615 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
616 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
617 ASSERT_EQ(handler.log[0].bytesRead, 0);
618 ASSERT_GT(handler.log[0].bytesWritten, 0);
620 // Events 1 through 5 should correspond to the scheduled events
621 for (int n = 1; n < 6; ++n) {
622 ScheduledEvent* event = &events[n - 1];
623 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
624 milliseconds(event->milliseconds));
625 if (event->events == EventHandler::READ) {
626 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
627 ASSERT_EQ(handler.log[n].bytesRead, 0);
628 ASSERT_GT(handler.log[n].bytesWritten, 0);
630 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
631 ASSERT_EQ(handler.log[n].bytesRead, event->length);
632 ASSERT_EQ(handler.log[n].bytesWritten, 0);
636 // The timeout should have unregistered the handler before the last write.
637 // Make sure that data is still waiting to be read
638 size_t bytesRemaining = readUntilEmpty(sp[0]);
639 ASSERT_EQ(bytesRemaining, events[5].length);
643 class PartialReadHandler : public TestHandler {
645 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
646 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
648 void handlerReady(uint16_t events) noexcept override { // replaces TestHandler's drain-everything behavior
649 assert(events == EventHandler::READ);
650 ssize_t bytesRead = readFromFD(fd_, readLength_); // a single read of readLength_ bytes, not until empty
651 log.push_back(EventRecord(events, bytesRead, 0));
660 * Test reading only part of the available data when a read event is fired.
661 * When PERSIST is used, make sure the handler gets notified again the next
662 * time around the loop.
664 TEST(EventBaseTest, ReadPartial) {
668 // Register for read events
669 size_t readLength = 100;
670 PartialReadHandler handler(&eb, sp[0], readLength);
671 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
673 // Register a timeout to perform a single write,
674 // with more data than PartialReadHandler will read at once
675 ScheduledEvent events[] = {
676 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
679 scheduleEvents(&eb, sp[1], events);
681 // Schedule a timeout to unregister the handler
682 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
689 ASSERT_EQ(handler.log.size(), 4);
691 // The first 3 invocations should read readLength bytes each
692 for (int n = 0; n < 3; ++n) {
693 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
694 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
695 milliseconds(events[0].milliseconds));
696 ASSERT_EQ(handler.log[n].bytesRead, readLength);
697 ASSERT_EQ(handler.log[n].bytesWritten, 0);
699 // The last read only has readLength/2 bytes
700 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
701 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
702 milliseconds(events[0].milliseconds));
703 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
704 ASSERT_EQ(handler.log[3].bytesWritten, 0);
708 class PartialWriteHandler : public TestHandler {
710 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
711 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
713 void handlerReady(uint16_t events) noexcept override { // replaces TestHandler's fill-the-buffer behavior
714 assert(events == EventHandler::WRITE);
715 ssize_t bytesWritten = writeToFD(fd_, writeLength_); // a single write of writeLength_ bytes, not until full
716 log.push_back(EventRecord(events, 0, bytesWritten));
725 * Test writing without completely filling up the write buffer when the fd
726 * becomes writable. When PERSIST is used, make sure the handler gets
727 * notified again the next time around the loop.
729 TEST(EventBaseTest, WritePartial) {
733 // Fill up the write buffer before starting
734 size_t initialBytesWritten = writeUntilFull(sp[0]);
736 // Register for write events
737 size_t writeLength = 100;
738 PartialWriteHandler handler(&eb, sp[0], writeLength);
739 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
741 // Register a timeout to read, so that more data can be written
742 ScheduledEvent events[] = {
743 { 10, EventHandler::READ, 0 },
746 scheduleEvents(&eb, sp[1], events);
748 // Schedule a timeout to unregister the handler
749 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
756 // Depending on how big the socket buffer is, there will be multiple writes
757 // Only check the first 5
759 ASSERT_GE(handler.log.size(), numChecked);
760 ASSERT_EQ(events[0].result, initialBytesWritten);
762 // The first numChecked invocations should write writeLength bytes each
763 for (int n = 0; n < numChecked; ++n) {
764 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
765 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
766 milliseconds(events[0].milliseconds));
767 ASSERT_EQ(handler.log[n].bytesRead, 0);
768 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
774 * Test destroying a registered EventHandler
776 TEST(EventBaseTest, DestroyHandler) {
777 class DestroyHandler : public AsyncTimeout {
779 DestroyHandler(EventBase* eb, EventHandler* h)
783 virtual void timeoutExpired() noexcept {
788 EventHandler* handler_;
794 // Fill up the write buffer before starting
795 size_t initialBytesWritten = writeUntilFull(sp[0]);
797 // Register for write events
798 TestHandler* handler = new TestHandler(&eb, sp[0]);
799 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
801 // After 10ms, read some data, so that the handler
802 // will be notified that it can write.
803 eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
806 // Start a timer to destroy the handler after 25ms
807 // This mainly just makes sure the code doesn't break or assert
808 DestroyHandler dh(&eb, handler);
809 dh.scheduleTimeout(25);
815 // Make sure the EventHandler was uninstalled properly when it was
816 // destroyed, and the EventBase loop exited
817 T_CHECK_TIMEOUT(start, end, milliseconds(25));
819 // Make sure that the handler wrote data to the socket
820 // before it was destroyed
821 size_t bytesRemaining = readUntilEmpty(sp[1]);
822 ASSERT_GT(bytesRemaining, 0);
826 ///////////////////////////////////////////////////////////////////////////
827 // Tests for timeout events
828 ///////////////////////////////////////////////////////////////////////////
830 TEST(EventBaseTest, RunAfterDelay) {
833 TimePoint timestamp1(false);
834 TimePoint timestamp2(false);
835 TimePoint timestamp3(false);
836 eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10); // presumably reset() stamps the time the callback fires
837 eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
838 eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
844 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
845 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
846 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
847 T_CHECK_TIMEOUT(start, end, milliseconds(40));
851 * Test the behavior of runAfterDelay() when some timeouts are
852 * still scheduled when the EventBase is destroyed.
854 TEST(EventBaseTest, RunAfterDelayDestruction) {
855 TimePoint timestamp1(false);
856 TimePoint timestamp2(false);
857 TimePoint timestamp3(false);
858 TimePoint timestamp4(false);
859 TimePoint start(false);
860 TimePoint end(false);
865 // Run two normal timeouts
866 eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
867 eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
869 // Schedule a timeout to stop the event loop after 40ms
870 eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
872 // Schedule 2 timeouts that would fire after the event loop stops
873 eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
874 eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
881 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
882 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
883 T_CHECK_TIMEOUT(start, end, milliseconds(40));
885 ASSERT_TRUE(timestamp3.isUnset()); // the 80ms and 160ms callbacks must never have run
886 ASSERT_TRUE(timestamp4.isUnset());
888 // Ideally this test should be run under valgrind to ensure that no
892 class TestTimeout : public AsyncTimeout {
894 explicit TestTimeout(EventBase* eventBase)
895 : AsyncTimeout(eventBase)
896 , timestamp(false) {}
898 void timeoutExpired() noexcept override { // AsyncTimeout expiry callback
905 TEST(EventBaseTest, BasicTimeouts) {
911 t1.scheduleTimeout(10);
912 t2.scheduleTimeout(20);
913 t3.scheduleTimeout(40);
919 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
920 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
921 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
922 T_CHECK_TIMEOUT(start, end, milliseconds(40));
925 class ReschedulingTimeout : public AsyncTimeout {
927 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
929 , timeouts_(timeouts)
930 , iterator_(timeouts_.begin()) {}
936 void timeoutExpired() noexcept override { // records the expiry, then re-arms with the next delay
937 timestamps.push_back(TimePoint());
942 if (iterator_ != timeouts_.end()) {
943 uint32_t timeout = *iterator_;
945 scheduleTimeout(timeout);
949 vector<TimePoint> timestamps;
952 vector<uint32_t> timeouts_;
953 vector<uint32_t>::const_iterator iterator_;
957 * Test rescheduling the same timeout multiple times
959 TEST(EventBaseTest, ReuseTimeout) {
962 vector<uint32_t> timeouts;
963 timeouts.push_back(10);
964 timeouts.push_back(30);
965 timeouts.push_back(15);
967 ReschedulingTimeout t(&eb, timeouts);
974 // Use a higher tolerance than usual. We're waiting on 3 timeouts
975 // consecutively. In general, each timeout may go over by a few
976 // milliseconds, and we're tripling this error by waiting on 3 timeouts.
977 milliseconds tolerance{6};
979 ASSERT_EQ(timeouts.size(), t.timestamps.size());
981 for (size_t n = 0; n < timeouts.size(); ++n) {
982 total += timeouts[n];
983 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
985 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
989 * Test rescheduling a timeout before it has fired
991 TEST(EventBaseTest, RescheduleTimeout) {
998 t1.scheduleTimeout(15);
999 t2.scheduleTimeout(30);
1000 t3.scheduleTimeout(30);
1002 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1003 &AsyncTimeout::scheduleTimeout);
1005 // after 10ms, reschedule t2 to run sooner than originally scheduled
1006 eb.runAfterDelay(std::bind(f, &t2, 10), 10);
1007 // after 10ms, reschedule t3 to run later than originally scheduled
1008 eb.runAfterDelay(std::bind(f, &t3, 40), 10);
1014 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1015 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1016 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1017 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1021 * Test cancelling a timeout
1023 TEST(EventBaseTest, CancelTimeout) {
1026 vector<uint32_t> timeouts;
1027 timeouts.push_back(10);
1028 timeouts.push_back(30);
1029 timeouts.push_back(25);
1031 ReschedulingTimeout t(&eb, timeouts);
1033 eb.runAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1039 ASSERT_EQ(t.timestamps.size(), 2);
1040 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1041 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1042 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1046 * Test destroying a scheduled timeout object
1048 TEST(EventBaseTest, DestroyTimeout) {
1049 class DestroyTimeout : public AsyncTimeout {
1051 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1055 virtual void timeoutExpired() noexcept {
1060 AsyncTimeout* timeout_;
1065 TestTimeout* t1 = new TestTimeout(&eb);
1066 t1->scheduleTimeout(30);
1068 DestroyTimeout dt(&eb, t1);
1069 dt.scheduleTimeout(10);
1075 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1079 ///////////////////////////////////////////////////////////////////////////
1080 // Tests for runInEventBaseThread()
1081 ///////////////////////////////////////////////////////////////////////////
1083 struct RunInThreadData {
1084 RunInThreadData(int numThreads, int opsPerThread)
1085 : opsPerThread(opsPerThread)
1086 , opsToGo(numThreads*opsPerThread) {}
1089 deque< pair<int, int> > values;
1095 struct RunInThreadArg {
1096 RunInThreadArg(RunInThreadData* data,
1103 RunInThreadData* data;
1108 void runInThreadTestFunc(RunInThreadArg* arg) {
1109 arg->data->values.push_back(make_pair(arg->thread, arg->value));
1110 RunInThreadData* data = arg->data;
1113 if(--data->opsToGo == 0) {
1114 // Break out of the event base loop if we are the last thread running
1115 data->evb.terminateLoopSoon();
1119 TEST(EventBaseTest, RunInThread) {
1120 uint32_t numThreads = 50;
1121 uint32_t opsPerThread = 100;
1122 RunInThreadData data(numThreads, opsPerThread);
1124 deque<std::thread> threads;
1125 for (uint32_t i = 0; i < numThreads; ++i) {
1126 threads.emplace_back([i, &data] {
1127 for (int n = 0; n < data.opsPerThread; ++n) {
1128 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1129 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1135 // Add a timeout event to run after 3 seconds.
1136 // Otherwise loop() will return immediately since there are no events to run.
1137 // Once the last thread exits, it will stop the loop(). However, this
1138 // timeout also stops the loop in case there is a bug performing the normal
1140 data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1147 // Verify that the loop exited because all threads finished and requested it
1148 // to stop. This should happen much sooner than the 3 second timeout.
1149 // Assert that it happens in under a second. (This is still tons of extra
1152 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1153 end.getTime() - start.getTime());
1154 ASSERT_LT(timeTaken.count(), 1000);
1155 VLOG(11) << "Time taken: " << timeTaken.count();
1157 // Verify that we have all of the events from every thread
1158 std::vector<int> expectedValues(numThreads); // zero-initialized; a runtime-sized array is not standard C++
1159 for (uint32_t n = 0; n < numThreads; ++n) {
1160 expectedValues[n] = 0;
1162 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1163 it != data.values.end();
1165 int threadID = it->first;
1166 int value = it->second;
1167 ASSERT_EQ(expectedValues[threadID], value); // each thread's values must arrive in order 0, 1, 2, ...
1168 ++expectedValues[threadID];
1170 for (uint32_t n = 0; n < numThreads; ++n) {
1171 ASSERT_EQ(expectedValues[n], opsPerThread);
1174 // Wait on all of the threads.
1175 for (auto& thread: threads) {
1180 // This test simulates some calls, and verifies that the waiting happens by
1181 // triggering what otherwise would be race conditions, and trying to detect
1182 // whether any of the race conditions happened.
1183 TEST(EventBaseTest, RunInEventLoopThreadAndWait) {
1184 const size_t c = 256;
1185 vector<unique_ptr<atomic<size_t>>> atoms(c);
1186 for (size_t i = 0; i < c; ++i) {
1187 auto& atom = atoms.at(i);
1188 atom = make_unique<atomic<size_t>>(0);
1190 vector<thread> threads(c);
1191 for (size_t i = 0; i < c; ++i) {
1192 auto& atom = *atoms.at(i);
1193 auto& th = threads.at(i);
1194 th = thread([&atom] {
1196 auto ebth = thread([&]{ eb.loopForever(); });
1197 eb.waitUntilRunning();
1198 eb.runInEventBaseThreadAndWait([&] {
1200 atom.compare_exchange_weak(
1201 x, 1, std::memory_order_release, std::memory_order_relaxed);
1204 atom.compare_exchange_weak(
1205 x, 2, std::memory_order_release, std::memory_order_relaxed);
1206 eb.terminateLoopSoon();
1210 for (size_t i = 0; i < c; ++i) {
1211 auto& th = threads.at(i);
1215 for (auto& atom : atoms) sum += *atom;
1219 ///////////////////////////////////////////////////////////////////////////
1220 // Tests for runInLoop()
1221 ///////////////////////////////////////////////////////////////////////////
1223 class CountedLoopCallback : public EventBase::LoopCallback {
1225 CountedLoopCallback(EventBase* eventBase,
1227 std::function<void()> action =
1228 std::function<void()>())
1229 : eventBase_(eventBase)
1231 , action_(action) {}
1233 void runLoopCallback() noexcept override { // re-installs itself via runInLoop(), else runs action_ if set
1236 eventBase_->runInLoop(this);
1237 } else if (action_) {
1242 unsigned int getCount() const {
1247 EventBase* eventBase_;
1248 unsigned int count_;
1249 std::function<void()> action_;
1252 // Test that EventBase::loop() doesn't exit while there are
1253 // still LoopCallbacks remaining to be invoked.
1254 TEST(EventBaseTest, RepeatedRunInLoop) {
1255 EventBase eventBase;
// A single self-re-installing callback with an initial count of 10.
1257 CountedLoopCallback c(&eventBase, 10);
1258 eventBase.runInLoop(&c);
1259 // The callback shouldn't have run immediately
1260 ASSERT_EQ(c.getCount(), 10);
1263 // loop() should loop until the CountedLoopCallback stops
1264 // re-installing itself.
1265 ASSERT_EQ(c.getCount(), 0);
1268 // Test runInLoop() calls with terminateLoopSoon()
1269 TEST(EventBaseTest, RunInLoopStopLoop) {
1270 EventBase eventBase;
// c1 has no action; c2's action is terminateLoopSoon() on the same EventBase.
1272 CountedLoopCallback c1(&eventBase, 20);
1273 CountedLoopCallback c2(&eventBase, 10,
1274 std::bind(&EventBase::terminateLoopSoon, &eventBase));
// c1 is queued ahead of c2. Neither may run before the loop is driven.
1276 eventBase.runInLoop(&c1);
1277 eventBase.runInLoop(&c2);
1278 ASSERT_EQ(c1.getCount(), 20);
1279 ASSERT_EQ(c2.getCount(), 10);
// loopForever() only returns because c2 asks the loop to terminate.
1281 eventBase.loopForever();
1283 // c2 should have stopped the loop after 10 iterations
1284 ASSERT_EQ(c2.getCount(), 0);
1286 // We allow the EventBase to run the loop callbacks in whatever order it
1287 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1288 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
// before c1 got to run on that iteration).
1291 // (With the current code, c1 will always run 10 times, but we don't consider
1292 // this a hard API requirement.)
1293 ASSERT_GE(c1.getCount(), 10);
1294 ASSERT_LE(c1.getCount(), 11);
1297 // Test cancelling runInLoop() callbacks
1298 TEST(EventBaseTest, CancelRunInLoop) {
1299 EventBase eventBase;
// Three plain counters; c1 and c2 are the cancellation targets, c3 is the
// control that nobody cancels.
1301 CountedLoopCallback c1(&eventBase, 20);
1302 CountedLoopCallback c2(&eventBase, 20);
1303 CountedLoopCallback c3(&eventBase, 20);
// Actions that call cancelLoopCallback() on c1 and c2 respectively.
1305 std::function<void()> cancelC1Action =
1306 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1307 std::function<void()> cancelC2Action =
1308 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
// The cancellers start at a count of 10, half of the targets' 20.
1310 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1311 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1313 // Install cancelC1 after c1
1314 eventBase.runInLoop(&c1);
1315 eventBase.runInLoop(&cancelC1);
1317 // Install cancelC2 before c2
1318 eventBase.runInLoop(&cancelC2);
1319 eventBase.runInLoop(&c2);
1322 eventBase.runInLoop(&c3);
// Nothing has run yet: every counter still holds its initial value.
1324 ASSERT_EQ(c1.getCount(), 20);
1325 ASSERT_EQ(c2.getCount(), 20);
1326 ASSERT_EQ(c3.getCount(), 20);
1327 ASSERT_EQ(cancelC1.getCount(), 10);
1328 ASSERT_EQ(cancelC2.getCount(), 10);
1333 // cancelC1 and cancelC2 should have both fired after 10 iterations and
1334 // stopped re-installing themselves
1335 ASSERT_EQ(cancelC1.getCount(), 0);
1336 ASSERT_EQ(cancelC2.getCount(), 0);
1337 // c3 should have continued on for the full 20 iterations
1338 ASSERT_EQ(c3.getCount(), 0);
1340 // c1 and c2 should have both been cancelled on the 10th iteration.
1342 // Callbacks are always run in the order they are installed,
1343 // so c1 should have fired 10 times, and been canceled after it ran on the
1344 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1345 // have run before it on the 10th iteration, and cancelled it before it
// got a chance to run. Remaining counts are therefore 20 - 10 and 20 - 9.
1347 ASSERT_EQ(c1.getCount(), 10);
1348 ASSERT_EQ(c2.getCount(), 11);
// Object that is both a LoopCallback and an EventHandler on the same fd.
// The fd-event side (handlerReady) queues the object as a loop callback and
// the loop-callback side (runLoopCallback) registers it for READ events.
// Each side counts its invocations and compares the count against a limit
// configured through reset().
1351 class TerminateTestCallback : public EventBase::LoopCallback,
1352 public EventHandler {
// All counters and limits start at zero; call reset() before use.
1354 TerminateTestCallback(EventBase* eventBase, int fd)
1355 : EventHandler(eventBase, fd),
1356 eventBase_(eventBase),
1357 loopInvocations_(0),
1358 maxLoopInvocations_(0),
1359 eventInvocations_(0),
1360 maxEventInvocations_(0) {}
// Zeroes both counters, installs new limits, and detaches the object from the
// EventBase (cancels the pending loop callback, unregisters the fd handler).
1362 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1363 loopInvocations_ = 0;
1364 maxLoopInvocations_ = maxLoopInvocations;
1365 eventInvocations_ = 0;
1366 maxEventInvocations_ = maxEventInvocations;
1368 cancelLoopCallback();
1369 unregisterHandler();
// fd-event side: counts the event, checks the event limit, and hands control
// to the loop-callback side via runInLoop(this).
1372 virtual void handlerReady(uint16_t events) noexcept {
1373 // We didn't register with PERSIST, so we will have been automatically
1374 // unregistered already.
1375 ASSERT_FALSE(isHandlerRegistered());
1377 ++eventInvocations_;
1378 if (eventInvocations_ >= maxEventInvocations_) {
1382 eventBase_->runInLoop(this);
// Loop-callback side: checks the loop limit and re-arms the fd handler with a
// non-persistent READ registration.
1384 virtual void runLoopCallback() noexcept {
1386 if (loopInvocations_ >= maxLoopInvocations_) {
1390 registerHandler(READ);
// Accessors for the two counters the test asserts on.
1393 uint32_t getLoopInvocations() const {
1394 return loopInvocations_;
1396 uint32_t getEventInvocations() const {
1397 return eventInvocations_;
// Not owned.
1401 EventBase* eventBase_;
1402 uint32_t loopInvocations_;
1403 uint32_t maxLoopInvocations_;
1404 uint32_t eventInvocations_;
1405 uint32_t maxEventInvocations_;
1409 * Test that EventBase::loop() correctly detects when there are no more events
1412 * This uses a single callback, which alternates registering itself as a loop
1413 * callback versus an EventHandler callback. This exercises a regression where
1414 * EventBase::loop() incorrectly exited if there were no more fd handlers
1415 * registered, but a loop callback installed a new fd handler.
1417 TEST(EventBaseTest, LoopTermination) {
1418 EventBase eventBase;
1420 // Open a pipe and close the write end,
1421 // so the read endpoint will be readable
1423 int rc = pipe(pipeFds);
// The callback watches the read end of the pipe.
1426 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1428 // Test once where the callback will exit after a loop callback
// reset(maxLoopInvocations, maxEventInvocations): the loop limit (10) is the
// one that is reached first here.
1429 callback.reset(10, 100);
1430 eventBase.runInLoop(&callback);
// The chain starts with a loop callback, so it ends one fd event short.
1432 ASSERT_EQ(callback.getLoopInvocations(), 10);
1433 ASSERT_EQ(callback.getEventInvocations(), 9);
1435 // Test once where the callback will exit after an fd event callback
// This time the event limit (7) is the one that is reached first.
1436 callback.reset(100, 7);
1437 eventBase.runInLoop(&callback);
1439 ASSERT_EQ(callback.getLoopInvocations(), 7);
1440 ASSERT_EQ(callback.getEventInvocations(), 7);
1445 ///////////////////////////////////////////////////////////////////////////
1446 // Tests for latency calculations
1447 ///////////////////////////////////////////////////////////////////////////
// AsyncTimeout that consumes a caller-supplied queue of values, one per
// expiry. Used by the IdleTime test, which reads the number of expirations
// through getTimeouts().
1449 class IdleTimeTimeoutSeries : public AsyncTimeout {
// timeout: queue of per-expiry values. It is held by reference (see timeout_),
// so the caller's deque must outlive this object and is drained by it.
1453 explicit IdleTimeTimeoutSeries(EventBase *base,
1454 std::deque<std::uint64_t>& timeout) :
1461 virtual ~IdleTimeTimeoutSeries() {}
// Expiry hook: when the queue is not empty, takes the front value as the
// sleep time for this expiry and removes it from the queue.
// NOTE(review): the enclosing test describes the delay as a usleep(), which
// suggests the values are microseconds -- confirm.
1463 void timeoutExpired() noexcept {
1466 if(timeout_.empty()){
1469 uint64_t sleepTime = timeout_.front();
1470 timeout_.pop_front();
// Count read by the IdleTime test (compared against 6, 7 and 21 there).
1478 int getTimeouts() const {
// Reference to the caller's deque; not owned.
1484 std::deque<uint64_t>& timeout_;
1488 * Verify that idle time is correctly accounted for when decaying our loop
1491 * This works by creating a high loop time (via usleep), expecting a latency
1492 * callback with known value, and then scheduling a timeout for later. This
1493 * later timeout is far enough in the future that the idle time should have
1494 * caused the loop time to decay.
1496 TEST(EventBaseTest, IdleTime) {
1497 EventBase eventBase;
// Configure the load-average window and seed the average loop time at 5900.
1498 eventBase.setLoadAvgMsec(1000);
1499 eventBase.resetLoadAvg(5900.0);
// First series: 8000, then four entries of 8080, then 14000 (six in total).
1500 std::deque<uint64_t> timeouts0(4, 8080);
1501 timeouts0.push_front(8000);
1502 timeouts0.push_back(14000);
1503 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
// Second series: twenty entries of 20. It is only instantiated from inside
// the latency callback below.
1504 std::deque<uint64_t> timeouts(20, 20);
1505 std::unique_ptr<IdleTimeTimeoutSeries> tos;
// Start timestamp in microseconds, used to recognise an overloaded host.
1506 int64_t testStart = duration_cast<microseconds>(
1507 std::chrono::steady_clock::now().time_since_epoch()).count();
1508 bool hostOverloaded = false;
1510 int latencyCallbacks = 0;
// Latency callback, installed with a max latency of 6000.
1511 eventBase.setMaxLatency(6000, [&]() {
1514 switch (latencyCallbacks) {
1516 if (tos0.getTimeouts() < 6) {
1517 // This could only happen if the host this test is running
1518 // on is heavily loaded.
1519 int64_t maxLatencyReached = duration_cast<microseconds>(
1520 std::chrono::steady_clock::now().time_since_epoch()).count();
1521 ASSERT_LE(43800, maxLatencyReached - testStart);
1522 hostOverloaded = true;
// Expected path: all six entries of the first series have been consumed and
// the average loop time is within 1200 of 6100.
1525 ASSERT_EQ(6, tos0.getTimeouts());
1526 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1527 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
// Start the second series now that the first callback has fired.
1528 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1532 FAIL() << "Unexpected latency callback";
1537 // Kick things off with an "immediate" timeout
1538 tos0.scheduleTimeout(1);
// The strict expectations below are bypassed on an overloaded host.
1542 if (hostOverloaded) {
// Exactly one latency callback, and the average loop time has decayed back to
// within 1200 of 5900.
1546 ASSERT_EQ(1, latencyCallbacks);
1547 ASSERT_EQ(7, tos0.getTimeouts());
1548 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1549 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1551 ASSERT_EQ(21, tos->getTimeouts());
1555 * Test that thisLoop functionality works with terminateLoopSoon
1557 TEST(EventBaseTest, ThisLoop) {
// Flags recording which of the two queued callbacks ran.
1559 bool runInLoop = false;
1560 bool runThisLoop = false;
// Termination is requested first; two callbacks are then queued with
// runInLoop().
1563 eb.terminateLoopSoon();
1564 eb.runInLoop([&]() {
1567 eb.runInLoop([&]() {
// The plain callback must not have run, the this-loop one must have.
1574 ASSERT_FALSE(runInLoop);
1575 // Should work with thisLoop
1576 ASSERT_TRUE(runThisLoop);
// Hands a lambda to runInEventBaseThread() and asserts afterwards that the
// `ran` flag is true.
1579 TEST(EventBaseTest, EventBaseThreadLoop) {
1583 base.runInEventBaseThread([&](){
1588 ASSERT_EQ(true, ran);
// Sets the EventBase name to "foo" and, on glibc builds that provide
// pthread_getname_np(), checks that the calling thread carries that name.
1591 TEST(EventBaseTest, EventBaseThreadName) {
1593 base.setName("foo");
// pthread_getname_np() first appeared in glibc 2.12, hence the guard.
// NOTE(review): major and minor are compared independently, so a future
// major version with a minor below 12 would skip the check; the
// __GLIBC_PREREQ(2, 12) macro expresses "2.12 or newer" directly.
1596 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
// 16 is the buffer length passed for `name`.
1598 pthread_getname_np(pthread_self(), name, 16);
1599 ASSERT_EQ(0, strcmp("foo", name));
// Installs a count-1 callback with runBeforeLoop(); its action asks the loop
// to terminate. The final assertion requires the count to have reached 0.
1603 TEST(EventBaseTest, RunBeforeLoop) {
1605 CountedLoopCallback cb(&base, 1, [&](){
1606 base.terminateLoopSoon();
1608 base.runBeforeLoop(&cb);
1610 ASSERT_EQ(cb.getCount(), 0);
// Variant of RunBeforeLoop in which the callback has no action of its own;
// termination is requested from a separate runAfterDelay() callback instead.
1613 TEST(EventBaseTest, RunBeforeLoopWait) {
1615 CountedLoopCallback cb(&base, 1);
1616 base.runAfterDelay([&](){
1617 base.terminateLoopSoon();
1619 base.runBeforeLoop(&cb);
1622 // Check that we only ran once, and did not loop multiple times.
1623 ASSERT_EQ(cb.getCount(), 0);
// Minimal EventHandler bound to a file descriptor; StopBeforeLoop registers
// it on the read end of a pipe so the EventBase has a handler installed.
1626 class PipeHandler : public EventHandler {
// Forwards the EventBase and the fd straight to the EventHandler base.
1628 PipeHandler(EventBase* eventBase, int fd)
1629 : EventHandler(eventBase, fd) {}
// Readiness callback required by EventHandler.
1631 void handlerReady(uint16_t events) noexcept {
// Calls terminateLoopSoon() before loop() has been started and then runs
// loop() on a separate thread, expecting it to return promptly.
1636 TEST(EventBaseTest, StopBeforeLoop) {
1639 // Give the evb something to do.
1641 ASSERT_EQ(0, pipe(p));
// Watch the read end of the pipe for READ events.
1642 PipeHandler handler(&evb, p[0]);
1643 handler.registerHandler(EventHandler::READ);
1645 // It's definitely not running yet
1646 evb.terminateLoopSoon();
1648 // let it run, it should exit quickly.
1649 std::thread t([&] { evb.loop(); });
// Detach the handler from the EventBase once the loop has been exercised.
1652 handler.unregisterHandler();
1659 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1664 base.runInEventBaseThread([&](){