--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/test/SocketPair.h>
+#include <folly/io/async/test/Util.h>
+
+#include <iostream>
+#include <unistd.h>
+#include <memory>
+
+using std::deque;
+using std::pair;
+using std::vector;
+using std::make_pair;
+using std::cerr;
+using std::endl;
+
+using namespace folly;
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for read and write events
+///////////////////////////////////////////////////////////////////////////
+
+enum { BUF_SIZE = 4096 };
+
+ssize_t writeToFD(int fd, size_t length) {
+ // write an arbitrary amount of data to the fd
+ char buf[length];
+ memset(buf, 'a', sizeof(buf));
+ ssize_t rc = write(fd, buf, sizeof(buf));
+ CHECK_EQ(rc, length);
+ return rc;
+}
+
+size_t writeUntilFull(int fd) {
+ // Write to the fd until EAGAIN is returned
+ size_t bytesWritten = 0;
+ char buf[BUF_SIZE];
+ memset(buf, 'a', sizeof(buf));
+ while (true) {
+ ssize_t rc = write(fd, buf, sizeof(buf));
+ if (rc < 0) {
+ CHECK_EQ(errno, EAGAIN);
+ break;
+ } else {
+ bytesWritten += rc;
+ }
+ }
+ return bytesWritten;
+}
+
+ssize_t readFromFD(int fd, size_t length) {
+ // write an arbitrary amount of data to the fd
+ char buf[length];
+ return read(fd, buf, sizeof(buf));
+}
+
+size_t readUntilEmpty(int fd) {
+ // Read from the fd until EAGAIN is returned
+ char buf[BUF_SIZE];
+ size_t bytesRead = 0;
+ while (true) {
+ int rc = read(fd, buf, sizeof(buf));
+ if (rc == 0) {
+ CHECK(false) << "unexpected EOF";
+ } else if (rc < 0) {
+ CHECK_EQ(errno, EAGAIN);
+ break;
+ } else {
+ bytesRead += rc;
+ }
+ }
+ return bytesRead;
+}
+
+void checkReadUntilEmpty(int fd, size_t expectedLength) {
+ ASSERT_EQ(readUntilEmpty(fd), expectedLength);
+}
+
+struct ScheduledEvent {
+ int milliseconds;
+ uint16_t events;
+ size_t length;
+ ssize_t result;
+
+ void perform(int fd) {
+ if (events & EventHandler::READ) {
+ if (length == 0) {
+ result = readUntilEmpty(fd);
+ } else {
+ result = readFromFD(fd, length);
+ }
+ }
+ if (events & EventHandler::WRITE) {
+ if (length == 0) {
+ result = writeUntilFull(fd);
+ } else {
+ result = writeToFD(fd, length);
+ }
+ }
+ }
+};
+
+void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
+ for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
+ eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
+ ev->milliseconds);
+ }
+}
+
+class TestHandler : public EventHandler {
+ public:
+ TestHandler(EventBase* eventBase, int fd)
+ : EventHandler(eventBase, fd), fd_(fd) {}
+
+ virtual void handlerReady(uint16_t events) noexcept {
+ ssize_t bytesRead = 0;
+ ssize_t bytesWritten = 0;
+ if (events & READ) {
+ // Read all available data, so EventBase will stop calling us
+ // until new data becomes available
+ bytesRead = readUntilEmpty(fd_);
+ }
+ if (events & WRITE) {
+ // Write until the pipe buffer is full, so EventBase will stop calling
+ // us until the other end has read some data
+ bytesWritten = writeUntilFull(fd_);
+ }
+
+ log.push_back(EventRecord(events, bytesRead, bytesWritten));
+ }
+
+ struct EventRecord {
+ EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
+ : events(events)
+ , timestamp()
+ , bytesRead(bytesRead)
+ , bytesWritten(bytesWritten) {}
+
+ uint16_t events;
+ TimePoint timestamp;
+ ssize_t bytesRead;
+ ssize_t bytesWritten;
+ };
+
+ deque<EventRecord> log;
+
+ private:
+ int fd_;
+};
+
+/**
+ * Test a READ event
+ */
+TEST(EventBaseTest, ReadEvent) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Register for read events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::READ);
+
+ // Register timeouts to perform two write events
+ ScheduledEvent events[] = {
+ { 10, EventHandler::WRITE, 2345 },
+ { 160, EventHandler::WRITE, 99 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // Since we didn't use the EventHandler::PERSIST flag, the handler should
+ // have received the first read, then unregistered itself. Check that only
+ // the first chunk of data was received.
+ ASSERT_EQ(handler.log.size(), 1);
+ ASSERT_EQ(handler.log[0].events, EventHandler::READ);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90);
+ ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
+ ASSERT_EQ(handler.log[0].bytesWritten, 0);
+ T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30);
+
+ // Make sure the second chunk of data is still waiting to be read.
+ size_t bytesRemaining = readUntilEmpty(sp[0]);
+ ASSERT_EQ(bytesRemaining, events[1].length);
+}
+
+/**
+ * Test (READ | PERSIST)
+ */
+TEST(EventBaseTest, ReadPersist) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Register for read events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
+
+ // Register several timeouts to perform writes
+ ScheduledEvent events[] = {
+ { 10, EventHandler::WRITE, 1024 },
+ { 20, EventHandler::WRITE, 2211 },
+ { 30, EventHandler::WRITE, 4096 },
+ { 100, EventHandler::WRITE, 100 },
+ { 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Schedule a timeout to unregister the handler after the third write
+ eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // The handler should have received the first 3 events,
+ // then been unregistered after that.
+ ASSERT_EQ(handler.log.size(), 3);
+ for (int n = 0; n < 3; ++n) {
+ ASSERT_EQ(handler.log[n].events, EventHandler::READ);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+ ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
+ ASSERT_EQ(handler.log[n].bytesWritten, 0);
+ }
+ T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+
+ // Make sure the data from the last write is still waiting to be read
+ size_t bytesRemaining = readUntilEmpty(sp[0]);
+ ASSERT_EQ(bytesRemaining, events[3].length);
+}
+
+/**
+ * Test registering for READ when the socket is immediately readable
+ */
+TEST(EventBaseTest, ReadImmediate) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Write some data to the socket so the other end will
+ // be immediately readable
+ size_t dataLength = 1234;
+ writeToFD(sp[1], dataLength);
+
+ // Register for read events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
+
+ // Register a timeout to perform another write
+ ScheduledEvent events[] = {
+ { 10, EventHandler::WRITE, 2345 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Schedule a timeout to unregister the handler
+ eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ ASSERT_EQ(handler.log.size(), 2);
+
+ // There should have been 1 event for immediate readability
+ ASSERT_EQ(handler.log[0].events, EventHandler::READ);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+ ASSERT_EQ(handler.log[0].bytesRead, dataLength);
+ ASSERT_EQ(handler.log[0].bytesWritten, 0);
+
+ // There should be another event after the timeout wrote more data
+ ASSERT_EQ(handler.log[1].events, EventHandler::READ);
+ T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
+ ASSERT_EQ(handler.log[1].bytesWritten, 0);
+
+ T_CHECK_TIMEOUT(start, end, 20);
+}
+
+/**
+ * Test a WRITE event
+ */
+TEST(EventBaseTest, WriteEvent) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Fill up the write buffer before starting
+ size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+ // Register for write events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::WRITE);
+
+ // Register timeouts to perform two reads
+ ScheduledEvent events[] = {
+ { 10, EventHandler::READ, 0 },
+ { 60, EventHandler::READ, 0 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // Since we didn't use the EventHandler::PERSIST flag, the handler should
+ // have only been able to write once, then unregistered itself.
+ ASSERT_EQ(handler.log.size(), 1);
+ ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[0].bytesRead, 0);
+ ASSERT_GT(handler.log[0].bytesWritten, 0);
+ T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+
+ ASSERT_EQ(events[0].result, initialBytesWritten);
+ ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
+}
+
+/**
+ * Test (WRITE | PERSIST)
+ */
+TEST(EventBaseTest, WritePersist) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Fill up the write buffer before starting
+ size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+ // Register for write events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+ // Register several timeouts to read from the socket at several intervals
+ ScheduledEvent events[] = {
+ { 10, EventHandler::READ, 0 },
+ { 40, EventHandler::READ, 0 },
+ { 70, EventHandler::READ, 0 },
+ { 100, EventHandler::READ, 0 },
+ { 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Schedule a timeout to unregister the handler after the third read
+ eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // The handler should have received the first 3 events,
+ // then been unregistered after that.
+ ASSERT_EQ(handler.log.size(), 3);
+ ASSERT_EQ(events[0].result, initialBytesWritten);
+ for (int n = 0; n < 3; ++n) {
+ ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+ ASSERT_EQ(handler.log[n].bytesRead, 0);
+ ASSERT_GT(handler.log[n].bytesWritten, 0);
+ ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
+ }
+ T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+}
+
+/**
+ * Test registering for WRITE when the socket is immediately writable
+ */
+TEST(EventBaseTest, WriteImmediate) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Register for write events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+ // Register a timeout to perform a read
+ ScheduledEvent events[] = {
+ { 10, EventHandler::READ, 0 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Schedule a timeout to unregister the handler
+ int64_t unregisterTimeout = 40;
+ eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
+ unregisterTimeout);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ ASSERT_EQ(handler.log.size(), 2);
+
+ // Since the socket buffer was initially empty,
+ // there should have been 1 event for immediate writability
+ ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+ ASSERT_EQ(handler.log[0].bytesRead, 0);
+ ASSERT_GT(handler.log[0].bytesWritten, 0);
+
+ // There should be another event after the timeout wrote more data
+ ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[1].bytesRead, 0);
+ ASSERT_GT(handler.log[1].bytesWritten, 0);
+
+ T_CHECK_TIMEOUT(start, end, unregisterTimeout);
+}
+
+/**
+ * Test (READ | WRITE) when the socket becomes readable first
+ */
+TEST(EventBaseTest, ReadWrite) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Fill up the write buffer before starting
+ size_t sock0WriteLength = writeUntilFull(sp[0]);
+
+ // Register for read and write events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::READ_WRITE);
+
+ // Register timeouts to perform a write then a read.
+ ScheduledEvent events[] = {
+ { 10, EventHandler::WRITE, 2345 },
+ { 40, EventHandler::READ, 0 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // Since we didn't use the EventHandler::PERSIST flag, the handler should
+ // have only noticed readability, then unregistered itself. Check that only
+ // one event was logged.
+ ASSERT_EQ(handler.log.size(), 1);
+ ASSERT_EQ(handler.log[0].events, EventHandler::READ);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
+ ASSERT_EQ(handler.log[0].bytesWritten, 0);
+ ASSERT_EQ(events[1].result, sock0WriteLength);
+ T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+}
+
+/**
+ * Test (READ | WRITE) when the socket becomes writable first
+ */
+TEST(EventBaseTest, WriteRead) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Fill up the write buffer before starting
+ size_t sock0WriteLength = writeUntilFull(sp[0]);
+
+ // Register for read and write events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::READ_WRITE);
+
+ // Register timeouts to perform a read then a write.
+ size_t sock1WriteLength = 2345;
+ ScheduledEvent events[] = {
+ { 10, EventHandler::READ, 0 },
+ { 40, EventHandler::WRITE, sock1WriteLength },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // Since we didn't use the EventHandler::PERSIST flag, the handler should
+ // have only noticed writability, then unregistered itself. Check that only
+ // one event was logged.
+ ASSERT_EQ(handler.log.size(), 1);
+ ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[0].bytesRead, 0);
+ ASSERT_GT(handler.log[0].bytesWritten, 0);
+ ASSERT_EQ(events[0].result, sock0WriteLength);
+ ASSERT_EQ(events[1].result, sock1WriteLength);
+ T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+
+ // Make sure the written data is still waiting to be read.
+ size_t bytesRemaining = readUntilEmpty(sp[0]);
+ ASSERT_EQ(bytesRemaining, events[1].length);
+}
+
+/**
+ * Test (READ | WRITE) when the socket becomes readable and writable
+ * at the same time.
+ */
+TEST(EventBaseTest, ReadWriteSimultaneous) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Fill up the write buffer before starting
+ size_t sock0WriteLength = writeUntilFull(sp[0]);
+
+ // Register for read and write events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::READ_WRITE);
+
+ // Register a timeout to perform a read and write together
+ ScheduledEvent events[] = {
+ { 10, EventHandler::READ | EventHandler::WRITE, 0 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // It's not strictly required that the EventBase register us about both
+ // events in the same call. So, it's possible that if the EventBase
+ // implementation changes this test could start failing, and it wouldn't be
+ // considered breaking the API. However for now it's nice to exercise this
+ // code path.
+ ASSERT_EQ(handler.log.size(), 1);
+ ASSERT_EQ(handler.log[0].events,
+ EventHandler::READ | EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
+ ASSERT_GT(handler.log[0].bytesWritten, 0);
+ T_CHECK_TIMEOUT(start, end, events[0].milliseconds);
+}
+
+/**
+ * Test (READ | WRITE | PERSIST)
+ */
+TEST(EventBaseTest, ReadWritePersist) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Register for read and write events
+ TestHandler handler(&eb, sp[0]);
+ handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
+ EventHandler::PERSIST);
+
+ // Register timeouts to perform several reads and writes
+ ScheduledEvent events[] = {
+ { 10, EventHandler::WRITE, 2345 },
+ { 20, EventHandler::READ, 0 },
+ { 35, EventHandler::WRITE, 200 },
+ { 45, EventHandler::WRITE, 15 },
+ { 55, EventHandler::READ, 0 },
+ { 120, EventHandler::WRITE, 2345 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Schedule a timeout to unregister the handler
+ eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ ASSERT_EQ(handler.log.size(), 6);
+
+ // Since we didn't fill up the write buffer immediately, there should
+ // be an immediate event for writability.
+ ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+ ASSERT_EQ(handler.log[0].bytesRead, 0);
+ ASSERT_GT(handler.log[0].bytesWritten, 0);
+
+ // Events 1 through 5 should correspond to the scheduled events
+ for (int n = 1; n < 6; ++n) {
+ ScheduledEvent* event = &events[n - 1];
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds);
+ if (event->events == EventHandler::READ) {
+ ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
+ ASSERT_EQ(handler.log[n].bytesRead, 0);
+ ASSERT_GT(handler.log[n].bytesWritten, 0);
+ } else {
+ ASSERT_EQ(handler.log[n].events, EventHandler::READ);
+ ASSERT_EQ(handler.log[n].bytesRead, event->length);
+ ASSERT_EQ(handler.log[n].bytesWritten, 0);
+ }
+ }
+
+ // The timeout should have unregistered the handler before the last write.
+ // Make sure that data is still waiting to be read
+ size_t bytesRemaining = readUntilEmpty(sp[0]);
+ ASSERT_EQ(bytesRemaining, events[5].length);
+}
+
+
+class PartialReadHandler : public TestHandler {
+ public:
+ PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
+ : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
+
+ virtual void handlerReady(uint16_t events) noexcept {
+ assert(events == EventHandler::READ);
+ ssize_t bytesRead = readFromFD(fd_, readLength_);
+ log.push_back(EventRecord(events, bytesRead, 0));
+ }
+
+ private:
+ int fd_;
+ size_t readLength_;
+};
+
+/**
+ * Test reading only part of the available data when a read event is fired.
+ * When PERSIST is used, make sure the handler gets notified again the next
+ * time around the loop.
+ */
+TEST(EventBaseTest, ReadPartial) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Register for read events
+ size_t readLength = 100;
+ PartialReadHandler handler(&eb, sp[0], readLength);
+ handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
+
+ // Register a timeout to perform a single write,
+ // with more data than PartialReadHandler will read at once
+ ScheduledEvent events[] = {
+ { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Schedule a timeout to unregister the handler
+ eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ ASSERT_EQ(handler.log.size(), 4);
+
+ // The first 3 invocations should read readLength bytes each
+ for (int n = 0; n < 3; ++n) {
+ ASSERT_EQ(handler.log[n].events, EventHandler::READ);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[n].bytesRead, readLength);
+ ASSERT_EQ(handler.log[n].bytesWritten, 0);
+ }
+ // The last read only has readLength/2 bytes
+ ASSERT_EQ(handler.log[3].events, EventHandler::READ);
+ T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
+ ASSERT_EQ(handler.log[3].bytesWritten, 0);
+}
+
+
+class PartialWriteHandler : public TestHandler {
+ public:
+ PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
+ : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
+
+ virtual void handlerReady(uint16_t events) noexcept {
+ assert(events == EventHandler::WRITE);
+ ssize_t bytesWritten = writeToFD(fd_, writeLength_);
+ log.push_back(EventRecord(events, 0, bytesWritten));
+ }
+
+ private:
+ int fd_;
+ size_t writeLength_;
+};
+
+/**
+ * Test writing without completely filling up the write buffer when the fd
+ * becomes writable. When PERSIST is used, make sure the handler gets
+ * notified again the next time around the loop.
+ */
+TEST(EventBaseTest, WritePartial) {
+ EventBase eb;
+ SocketPair sp;
+
+ // Fill up the write buffer before starting
+ size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+ // Register for write events
+ size_t writeLength = 100;
+ PartialWriteHandler handler(&eb, sp[0], writeLength);
+ handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+ // Register a timeout to read, so that more data can be written
+ ScheduledEvent events[] = {
+ { 10, EventHandler::READ, 0 },
+ { 0, 0, 0 },
+ };
+ scheduleEvents(&eb, sp[1], events);
+
+ // Schedule a timeout to unregister the handler
+ eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+
+ // Loop
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // Depending on how big the socket buffer is, there will be multiple writes
+ // Only check the first 5
+ int numChecked = 5;
+ ASSERT_GE(handler.log.size(), numChecked);
+ ASSERT_EQ(events[0].result, initialBytesWritten);
+
+ // The first 3 invocations should read writeLength bytes each
+ for (int n = 0; n < numChecked; ++n) {
+ ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+ ASSERT_EQ(handler.log[n].bytesRead, 0);
+ ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
+ }
+}
+
+
+/**
+ * Test destroying a registered EventHandler
+ */
+TEST(EventBaseTest, DestroyHandler) {
+ class DestroyHandler : public TAsyncTimeout {
+ public:
+ DestroyHandler(EventBase* eb, EventHandler* h)
+ : TAsyncTimeout(eb)
+ , handler_(h) {}
+
+ virtual void timeoutExpired() noexcept {
+ delete handler_;
+ }
+
+ private:
+ EventHandler* handler_;
+ };
+
+ EventBase eb;
+ SocketPair sp;
+
+ // Fill up the write buffer before starting
+ size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+ // Register for write events
+ TestHandler* handler = new TestHandler(&eb, sp[0]);
+ handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+ // After 10ms, read some data, so that the handler
+ // will be notified that it can write.
+ eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
+ 10);
+
+ // Start a timer to destroy the handler after 25ms
+ // This mainly just makes sure the code doesn't break or assert
+ DestroyHandler dh(&eb, handler);
+ dh.scheduleTimeout(25);
+
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // Make sure the EventHandler was uninstalled properly when it was
+ // destroyed, and the EventBase loop exited
+ T_CHECK_TIMEOUT(start, end, 25);
+
+ // Make sure that the handler wrote data to the socket
+ // before it was destroyed
+ size_t bytesRemaining = readUntilEmpty(sp[1]);
+ ASSERT_GT(bytesRemaining, 0);
+}
+
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for timeout events
+///////////////////////////////////////////////////////////////////////////
+
+TEST(EventBaseTest, RunAfterDelay) {
+ EventBase eb;
+
+ TimePoint timestamp1(false);
+ TimePoint timestamp2(false);
+ TimePoint timestamp3(false);
+ eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
+ eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
+ eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
+
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ T_CHECK_TIMEOUT(start, timestamp1, 10);
+ T_CHECK_TIMEOUT(start, timestamp2, 20);
+ T_CHECK_TIMEOUT(start, timestamp3, 40);
+ T_CHECK_TIMEOUT(start, end, 40);
+}
+
+/**
+ * Test the behavior of runAfterDelay() when some timeouts are
+ * still scheduled when the EventBase is destroyed.
+ */
+TEST(EventBaseTest, RunAfterDelayDestruction) {
+ TimePoint timestamp1(false);
+ TimePoint timestamp2(false);
+ TimePoint timestamp3(false);
+ TimePoint timestamp4(false);
+ TimePoint start(false);
+ TimePoint end(false);
+
+ {
+ EventBase eb;
+
+ // Run two normal timeouts
+ eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
+ eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
+
+ // Schedule a timeout to stop the event loop after 40ms
+ eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
+
+ // Schedule 2 timeouts that would fire after the event loop stops
+ eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
+ eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
+
+ start.reset();
+ eb.loop();
+ end.reset();
+ }
+
+ T_CHECK_TIMEOUT(start, timestamp1, 10);
+ T_CHECK_TIMEOUT(start, timestamp2, 20);
+ T_CHECK_TIMEOUT(start, end, 40);
+
+ ASSERT_TRUE(timestamp3.isUnset());
+ ASSERT_TRUE(timestamp4.isUnset());
+
+ // Ideally this test should be run under valgrind to ensure that no
+ // memory is leaked.
+}
+
+class TestTimeout : public TAsyncTimeout {
+ public:
+ explicit TestTimeout(EventBase* eventBase)
+ : TAsyncTimeout(eventBase)
+ , timestamp(false) {}
+
+ virtual void timeoutExpired() noexcept {
+ timestamp.reset();
+ }
+
+ TimePoint timestamp;
+};
+
+TEST(EventBaseTest, BasicTimeouts) {
+ EventBase eb;
+
+ TestTimeout t1(&eb);
+ TestTimeout t2(&eb);
+ TestTimeout t3(&eb);
+ t1.scheduleTimeout(10);
+ t2.scheduleTimeout(20);
+ t3.scheduleTimeout(40);
+
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ T_CHECK_TIMEOUT(start, t1.timestamp, 10);
+ T_CHECK_TIMEOUT(start, t2.timestamp, 20);
+ T_CHECK_TIMEOUT(start, t3.timestamp, 40);
+ T_CHECK_TIMEOUT(start, end, 40);
+}
+
+class ReschedulingTimeout : public TAsyncTimeout {
+ public:
+ ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
+ : TAsyncTimeout(evb)
+ , timeouts_(timeouts)
+ , iterator_(timeouts_.begin()) {}
+
+ void start() {
+ reschedule();
+ }
+
+ virtual void timeoutExpired() noexcept {
+ timestamps.push_back(TimePoint());
+ reschedule();
+ }
+
+ void reschedule() {
+ if (iterator_ != timeouts_.end()) {
+ uint32_t timeout = *iterator_;
+ ++iterator_;
+ scheduleTimeout(timeout);
+ }
+ }
+
+ vector<TimePoint> timestamps;
+
+ private:
+ vector<uint32_t> timeouts_;
+ vector<uint32_t>::const_iterator iterator_;
+};
+
+/**
+ * Test rescheduling the same timeout multiple times
+ */
+TEST(EventBaseTest, ReuseTimeout) {
+ EventBase eb;
+
+ vector<uint32_t> timeouts;
+ timeouts.push_back(10);
+ timeouts.push_back(30);
+ timeouts.push_back(15);
+
+ ReschedulingTimeout t(&eb, timeouts);
+ t.start();
+
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ // Use a higher tolerance than usual. We're waiting on 3 timeouts
+ // consecutively. In general, each timeout may go over by a few
+ // milliseconds, and we're tripling this error by witing on 3 timeouts.
+ int64_t tolerance = 6;
+
+ ASSERT_EQ(timeouts.size(), t.timestamps.size());
+ uint32_t total = 0;
+ for (int n = 0; n < timeouts.size(); ++n) {
+ total += timeouts[n];
+ T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance);
+ }
+ T_CHECK_TIMEOUT(start, end, total, tolerance);
+}
+
+/**
+ * Test rescheduling a timeout before it has fired
+ */
+TEST(EventBaseTest, RescheduleTimeout) {
+ EventBase eb;
+
+ TestTimeout t1(&eb);
+ TestTimeout t2(&eb);
+ TestTimeout t3(&eb);
+
+ t1.scheduleTimeout(15);
+ t2.scheduleTimeout(30);
+ t3.scheduleTimeout(30);
+
+ auto f = static_cast<bool(TAsyncTimeout::*)(uint32_t)>(
+ &TAsyncTimeout::scheduleTimeout);
+
+ // after 10ms, reschedule t2 to run sooner than originally scheduled
+ eb.runAfterDelay(std::bind(f, &t2, 10), 10);
+ // after 10ms, reschedule t3 to run later than originally scheduled
+ eb.runAfterDelay(std::bind(f, &t3, 40), 10);
+
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ T_CHECK_TIMEOUT(start, t1.timestamp, 15);
+ T_CHECK_TIMEOUT(start, t2.timestamp, 20);
+ T_CHECK_TIMEOUT(start, t3.timestamp, 50);
+ T_CHECK_TIMEOUT(start, end, 50);
+}
+
+/**
+ * Test cancelling a timeout
+ */
+TEST(EventBaseTest, CancelTimeout) {
+ EventBase eb;
+
+ vector<uint32_t> timeouts;
+ timeouts.push_back(10);
+ timeouts.push_back(30);
+ timeouts.push_back(25);
+
+ ReschedulingTimeout t(&eb, timeouts);
+ t.start();
+ eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50);
+
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ ASSERT_EQ(t.timestamps.size(), 2);
+ T_CHECK_TIMEOUT(start, t.timestamps[0], 10);
+ T_CHECK_TIMEOUT(start, t.timestamps[1], 40);
+ T_CHECK_TIMEOUT(start, end, 50);
+}
+
+/**
+ * Test destroying a scheduled timeout object
+ */
+TEST(EventBaseTest, DestroyTimeout) {
+ class DestroyTimeout : public TAsyncTimeout {
+ public:
+ DestroyTimeout(EventBase* eb, TAsyncTimeout* t)
+ : TAsyncTimeout(eb)
+ , timeout_(t) {}
+
+ virtual void timeoutExpired() noexcept {
+ delete timeout_;
+ }
+
+ private:
+ TAsyncTimeout* timeout_;
+ };
+
+ EventBase eb;
+
+ TestTimeout* t1 = new TestTimeout(&eb);
+ t1->scheduleTimeout(30);
+
+ DestroyTimeout dt(&eb, t1);
+ dt.scheduleTimeout(10);
+
+ TimePoint start;
+ eb.loop();
+ TimePoint end;
+
+ T_CHECK_TIMEOUT(start, end, 10);
+}
+
+
+///////////////////////////////////////////////////////////////////////////
+// Test for runInThreadTestFunc()
+///////////////////////////////////////////////////////////////////////////
+
+struct RunInThreadData {
+ RunInThreadData(int numThreads, int opsPerThread)
+ : opsPerThread(opsPerThread)
+ , opsToGo(numThreads*opsPerThread) {}
+
+ EventBase evb;
+ deque< pair<int, int> > values;
+
+ int opsPerThread;
+ int opsToGo;
+};
+
+struct RunInThreadArg {
+ RunInThreadArg(RunInThreadData* data,
+ int threadId,
+ int value)
+ : data(data)
+ , thread(threadId)
+ , value(value) {}
+
+ RunInThreadData* data;
+ int thread;
+ int value;
+};
+
+void runInThreadTestFunc(RunInThreadArg* arg) {
+ arg->data->values.push_back(make_pair(arg->thread, arg->value));
+ RunInThreadData* data = arg->data;
+ delete arg;
+
+ if(--data->opsToGo == 0) {
+ // Break out of the event base loop if we are the last thread running
+ data->evb.terminateLoopSoon();
+ }
+}
+
+class RunInThreadTester : public concurrency::Runnable {
+ public:
+ RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {}
+
+ void run() {
+ // Call evb->runInThread() a number of times
+ {
+ for (int n = 0; n < data_->opsPerThread; ++n) {
+ RunInThreadArg* arg = new RunInThreadArg(data_, id_, n);
+ data_->evb.runInEventBaseThread(runInThreadTestFunc, arg);
+ usleep(10);
+ }
+ }
+ }
+
+ private:
+ int id_;
+ RunInThreadData* data_;
+};
+
+TEST(EventBaseTest, RunInThread) {
+ uint32_t numThreads = 50;
+ uint32_t opsPerThread = 100;
+ RunInThreadData data(numThreads, opsPerThread);
+
+ PosixThreadFactory threadFactory;
+ threadFactory.setDetached(false);
+ deque< std::shared_ptr<Thread> > threads;
+ for (int n = 0; n < numThreads; ++n) {
+ std::shared_ptr<RunInThreadTester> runner(new RunInThreadTester(n, &data));
+ std::shared_ptr<Thread> thread = threadFactory.newThread(runner);
+ threads.push_back(thread);
+ thread->start();
+ }
+
+ // Add a timeout event to run after 3 seconds.
+ // Otherwise loop() will return immediately since there are no events to run.
+ // Once the last thread exits, it will stop the loop(). However, this
+ // timeout also stops the loop in case there is a bug performing the normal
+ // stop.
+ data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
+ 3000);
+
+ TimePoint start;
+ data.evb.loop();
+ TimePoint end;
+
+ // Verify that the loop exited because all threads finished and requested it
+ // to stop. This should happen much sooner than the 3 second timeout.
+ // Assert that it happens in under a second. (This is still tons of extra
+ // padding.)
+ int64_t timeTaken = end.getTime() - start.getTime();
+ ASSERT_LT(timeTaken, 1000);
+ VLOG(11) << "Time taken: " << timeTaken;
+
+ // Verify that we have all of the events from every thread
+ int expectedValues[numThreads];
+ for (int n = 0; n < numThreads; ++n) {
+ expectedValues[n] = 0;
+ }
+ for (deque< pair<int, int> >::const_iterator it = data.values.begin();
+ it != data.values.end();
+ ++it) {
+ int threadID = it->first;
+ int value = it->second;
+ ASSERT_EQ(expectedValues[threadID], value);
+ ++expectedValues[threadID];
+ }
+ for (int n = 0; n < numThreads; ++n) {
+ ASSERT_EQ(expectedValues[n], opsPerThread);
+ }
+
+ // Wait on all of the threads. Otherwise we can exit and clean up
+ // RunInThreadData before the last thread exits, while it is still holding
+ // the RunInThreadData's mutex.
+ for (deque< std::shared_ptr<Thread> >::const_iterator it = threads.begin();
+ it != threads.end();
+ ++it) {
+ (*it)->join();
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for runInLoop()
+///////////////////////////////////////////////////////////////////////////
+
+class CountedLoopCallback : public EventBase::LoopCallback {
+ public:
+ CountedLoopCallback(EventBase* eventBase,
+ unsigned int count,
+ std::function<void()> action =
+ std::function<void()>())
+ : eventBase_(eventBase)
+ , count_(count)
+ , action_(action) {}
+
+ virtual void runLoopCallback() noexcept {
+ --count_;
+ if (count_ > 0) {
+ eventBase_->runInLoop(this);
+ } else if (action_) {
+ action_();
+ }
+ }
+
+ unsigned int getCount() const {
+ return count_;
+ }
+
+ private:
+ EventBase* eventBase_;
+ unsigned int count_;
+ std::function<void()> action_;
+};
+
+// Test that EventBase::loop() doesn't exit while there are
+// still LoopCallbacks remaining to be invoked.
+TEST(EventBaseTest, RepeatedRunInLoop) {
+ EventBase eventBase;
+
+ CountedLoopCallback c(&eventBase, 10);
+ eventBase.runInLoop(&c);
+ // The callback shouldn't have run immediately
+ ASSERT_EQ(c.getCount(), 10);
+ eventBase.loop();
+
+ // loop() should loop until the CountedLoopCallback stops
+ // re-installing itself.
+ ASSERT_EQ(c.getCount(), 0);
+}
+
+// Test runInLoop() calls with terminateLoopSoon()
+TEST(EventBaseTest, RunInLoopStopLoop) {
+ EventBase eventBase;
+
+ CountedLoopCallback c1(&eventBase, 20);
+ CountedLoopCallback c2(&eventBase, 10,
+ std::bind(&EventBase::terminateLoopSoon, &eventBase));
+
+ eventBase.runInLoop(&c1);
+ eventBase.runInLoop(&c2);
+ ASSERT_EQ(c1.getCount(), 20);
+ ASSERT_EQ(c2.getCount(), 10);
+
+ eventBase.loopForever();
+
+ // c2 should have stopped the loop after 10 iterations
+ ASSERT_EQ(c2.getCount(), 0);
+
+ // We allow the EventBase to run the loop callbacks in whatever order it
+ // chooses. We'll accept c1's count being either 10 (if the loop terminated
+ // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
+ // before c1 ran).
+ //
+ // (With the current code, c1 will always run 10 times, but we don't consider
+ // this a hard API requirement.)
+ ASSERT_GE(c1.getCount(), 10);
+ ASSERT_LE(c1.getCount(), 11);
+}
+
+// Test cancelling runInLoop() callbacks
+TEST(EventBaseTest, CancelRunInLoop) {
+ EventBase eventBase;
+
+ CountedLoopCallback c1(&eventBase, 20);
+ CountedLoopCallback c2(&eventBase, 20);
+ CountedLoopCallback c3(&eventBase, 20);
+
+ std::function<void()> cancelC1Action =
+ std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
+ std::function<void()> cancelC2Action =
+ std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
+
+ CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
+ CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
+
+ // Install cancelC1 after c1
+ eventBase.runInLoop(&c1);
+ eventBase.runInLoop(&cancelC1);
+
+ // Install cancelC2 before c2
+ eventBase.runInLoop(&cancelC2);
+ eventBase.runInLoop(&c2);
+
+ // Install c3
+ eventBase.runInLoop(&c3);
+
+ ASSERT_EQ(c1.getCount(), 20);
+ ASSERT_EQ(c2.getCount(), 20);
+ ASSERT_EQ(c3.getCount(), 20);
+ ASSERT_EQ(cancelC1.getCount(), 10);
+ ASSERT_EQ(cancelC2.getCount(), 10);
+
+ // Run the loop
+ eventBase.loop();
+
+ // cancelC1 and cancelC3 should have both fired after 10 iterations and
+ // stopped re-installing themselves
+ ASSERT_EQ(cancelC1.getCount(), 0);
+ ASSERT_EQ(cancelC2.getCount(), 0);
+ // c3 should have continued on for the full 20 iterations
+ ASSERT_EQ(c3.getCount(), 0);
+
+ // c1 and c2 should have both been cancelled on the 10th iteration.
+ //
+ // Callbacks are always run in the order they are installed,
+ // so c1 should have fired 10 times, and been canceled after it ran on the
+ // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
+ // have run before it on the 10th iteration, and cancelled it before it
+ // fired.
+ ASSERT_EQ(c1.getCount(), 10);
+ ASSERT_EQ(c2.getCount(), 11);
+}
+
+class TerminateTestCallback : public EventBase::LoopCallback,
+ public EventHandler {
+ public:
+ TerminateTestCallback(EventBase* eventBase, int fd)
+ : EventHandler(eventBase, fd),
+ eventBase_(eventBase),
+ loopInvocations_(0),
+ maxLoopInvocations_(0),
+ eventInvocations_(0),
+ maxEventInvocations_(0) {}
+
+ void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
+ loopInvocations_ = 0;
+ maxLoopInvocations_ = maxLoopInvocations;
+ eventInvocations_ = 0;
+ maxEventInvocations_ = maxEventInvocations;
+
+ cancelLoopCallback();
+ unregisterHandler();
+ }
+
+ virtual void handlerReady(uint16_t events) noexcept {
+ // We didn't register with PERSIST, so we will have been automatically
+ // unregistered already.
+ ASSERT_FALSE(isHandlerRegistered());
+
+ ++eventInvocations_;
+ if (eventInvocations_ >= maxEventInvocations_) {
+ return;
+ }
+
+ eventBase_->runInLoop(this);
+ }
+ virtual void runLoopCallback() noexcept {
+ ++loopInvocations_;
+ if (loopInvocations_ >= maxLoopInvocations_) {
+ return;
+ }
+
+ registerHandler(READ);
+ }
+
+ uint32_t getLoopInvocations() const {
+ return loopInvocations_;
+ }
+ uint32_t getEventInvocations() const {
+ return eventInvocations_;
+ }
+
+ private:
+ EventBase* eventBase_;
+ uint32_t loopInvocations_;
+ uint32_t maxLoopInvocations_;
+ uint32_t eventInvocations_;
+ uint32_t maxEventInvocations_;
+};
+
+/**
+ * Test that EventBase::loop() correctly detects when there are no more events
+ * left to run.
+ *
+ * This uses a single callback, which alternates registering itself as a loop
+ * callback versus a EventHandler callback. This exercises a regression where
+ * EventBase::loop() incorrectly exited if there were no more fd handlers
+ * registered, but a loop callback installed a new fd handler.
+ */
+TEST(EventBaseTest, LoopTermination) {
+ EventBase eventBase;
+
+ // Open a pipe and close the write end,
+ // so the read endpoint will be readable
+ int pipeFds[2];
+ int rc = pipe(pipeFds);
+ ASSERT_EQ(rc, 0);
+ close(pipeFds[1]);
+ TerminateTestCallback callback(&eventBase, pipeFds[0]);
+
+ // Test once where the callback will exit after a loop callback
+ callback.reset(10, 100);
+ eventBase.runInLoop(&callback);
+ eventBase.loop();
+ ASSERT_EQ(callback.getLoopInvocations(), 10);
+ ASSERT_EQ(callback.getEventInvocations(), 9);
+
+ // Test once where the callback will exit after an fd event callback
+ callback.reset(100, 7);
+ eventBase.runInLoop(&callback);
+ eventBase.loop();
+ ASSERT_EQ(callback.getLoopInvocations(), 7);
+ ASSERT_EQ(callback.getEventInvocations(), 7);
+
+ close(pipeFds[0]);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for latency calculations
+///////////////////////////////////////////////////////////////////////////
+
+class IdleTimeTimeoutSeries : public TAsyncTimeout {
+
+ public:
+
+ explicit IdleTimeTimeoutSeries(EventBase *base,
+ std::deque<std::uint64_t>& timeout) :
+ TAsyncTimeout(base),
+ timeouts_(0),
+ timeout_(timeout) {
+ scheduleTimeout(1);
+ }
+
+ virtual ~IdleTimeTimeoutSeries() {}
+
+ void timeoutExpired() noexcept {
+ ++timeouts_;
+
+ if(timeout_.empty()){
+ cancelTimeout();
+ } else {
+ uint64_t sleepTime = timeout_.front();
+ timeout_.pop_front();
+ if (sleepTime) {
+ usleep(sleepTime);
+ }
+ scheduleTimeout(1);
+ }
+ }
+
+ int getTimeouts() const {
+ return timeouts_;
+ }
+
+ private:
+ int timeouts_;
+ std::deque<uint64_t>& timeout_;
+};
+
+/**
+ * Verify that idle time is correctly accounted for when decaying our loop
+ * time.
+ *
+ * This works by creating a high loop time (via usleep), expecting a latency
+ * callback with known value, and then scheduling a timeout for later. This
+ * later timeout is far enough in the future that the idle time should have
+ * caused the loop time to decay.
+ */
+TEST(EventBaseTest, IdleTime) {
+ EventBase eventBase;
+ eventBase.setLoadAvgMsec(1000);
+ eventBase.resetLoadAvg(5900.0);
+ std::deque<uint64_t> timeouts0(4, 8080);
+ timeouts0.push_front(8000);
+ timeouts0.push_back(14000);
+ IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
+ std::deque<uint64_t> timeouts(20, 20);
+ std::unique_ptr<IdleTimeTimeoutSeries> tos;
+
+ int latencyCallbacks = 0;
+ eventBase.setMaxLatency(6000, [&]() {
+ ++latencyCallbacks;
+
+ switch (latencyCallbacks) {
+ case 1:
+ ASSERT_EQ(6, tos0.getTimeouts());
+ ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
+ ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
+ tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
+ break;
+
+ default:
+ FAIL() << "Unexpected latency callback";
+ break;
+ }
+ });
+
+ // Kick things off with an "immedite" timeout
+ tos0.scheduleTimeout(1);
+
+ eventBase.loop();
+
+ ASSERT_EQ(1, latencyCallbacks);
+ ASSERT_EQ(7, tos0.getTimeouts());
+ ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
+ ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
+ ASSERT_EQ(21, tos->getTimeouts());
+}
+
+/**
+ * Test that thisLoop functionality works with terminateLoopSoon
+ */
+TEST(EventBaseTest, ThisLoop) {
+ EventBase eb;
+ bool runInLoop = false;
+ bool runThisLoop = false;
+
+ eb.runInLoop([&](){
+ eb.terminateLoopSoon();
+ eb.runInLoop([&]() {
+ runInLoop = true;
+ });
+ eb.runInLoop([&]() {
+ runThisLoop = true;
+ }, true);
+ }, true);
+ eb.loopForever();
+
+ // Should not work
+ ASSERT_FALSE(runInLoop);
+ // Should work with thisLoop
+ ASSERT_TRUE(runThisLoop);
+}
+
+TEST(EventBaseTest, EventBaseThreadLoop) {
+ EventBase base;
+ bool ran = false;
+
+ base.runInEventBaseThread([&](){
+ ran = true;
+ });
+ base.loop();
+
+ ASSERT_EQ(true, ran);
+}
+
+TEST(EventBaseTest, EventBaseThreadName) {
+ EventBase base;
+ base.setName("foo");
+ base.loop();
+
+#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
+ char name[16];
+ pthread_getname_np(pthread_self(), name, 16);
+ ASSERT_EQ(0, strcmp("foo", name));
+#endif
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <folly/io/async/HHWheelTimer.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/test/UndelayedDestruction.h>
+#include <folly/io/async/test/Util.h>
+
+#include <gtest/gtest.h>
+#include <vector>
+
+using namespace folly;
+using std::chrono::milliseconds;
+
+typedef UndelayedDestruction<HHWheelTimer> StackWheelTimer;
+
+class TestTimeout : public HHWheelTimer::Callback {
+ public:
+ TestTimeout() {}
+ TestTimeout(HHWheelTimer* t, milliseconds timeout) {
+ t->scheduleTimeout(this, timeout);
+ }
+ virtual void timeoutExpired() noexcept {
+ timestamps.push_back(TimePoint());
+ if (fn) {
+ fn();
+ }
+ }
+
+ std::deque<TimePoint> timestamps;
+ std::function<void()> fn;
+};
+
+/*
+ * Test firing some simple timeouts that are fired once and never rescheduled
+ */
+TEST(HHWheelTimerTest, FireOnce) {
+ EventBase eventBase;
+ StackWheelTimer t(&eventBase, milliseconds(1));
+
+ const HHWheelTimer::Callback* nullCallback = nullptr;
+
+ TestTimeout t1;
+ TestTimeout t2;
+ TestTimeout t3;
+
+ ASSERT_EQ(t.count(), 0);
+
+ t.scheduleTimeout(&t1, milliseconds(5));
+ t.scheduleTimeout(&t2, milliseconds(5));
+ // Verify scheduling it twice cancels, then schedules.
+ // Should only get one callback.
+ t.scheduleTimeout(&t2, milliseconds(5));
+ t.scheduleTimeout(&t3, milliseconds(10));
+
+ ASSERT_EQ(t.count(), 3);
+
+ TimePoint start;
+ eventBase.loop();
+ TimePoint end;
+
+ ASSERT_EQ(t1.timestamps.size(), 1);
+ ASSERT_EQ(t2.timestamps.size(), 1);
+ ASSERT_EQ(t3.timestamps.size(), 1);
+
+ ASSERT_EQ(t.count(), 0);
+
+ T_CHECK_TIMEOUT(start, t1.timestamps[0], 5);
+ T_CHECK_TIMEOUT(start, t2.timestamps[0], 5);
+ T_CHECK_TIMEOUT(start, t3.timestamps[0], 10);
+ T_CHECK_TIMEOUT(start, end, 10);
+}
+
+/*
+ * Test cancelling a timeout when it is scheduled to be fired right away.
+ */
+
+TEST(HHWheelTimerTest, CancelTimeout) {
+ EventBase eventBase;
+ StackWheelTimer t(&eventBase, milliseconds(1));
+
+ // Create several timeouts that will all fire in 5ms.
+ TestTimeout t5_1(&t, milliseconds(5));
+ TestTimeout t5_2(&t, milliseconds(5));
+ TestTimeout t5_3(&t, milliseconds(5));
+ TestTimeout t5_4(&t, milliseconds(5));
+ TestTimeout t5_5(&t, milliseconds(5));
+
+ // Also create a few timeouts to fire in 10ms
+ TestTimeout t10_1(&t, milliseconds(10));
+ TestTimeout t10_2(&t, milliseconds(10));
+ TestTimeout t10_3(&t, milliseconds(10));
+
+ TestTimeout t20_1(&t, milliseconds(20));
+ TestTimeout t20_2(&t, milliseconds(20));
+
+ // Have t5_1 cancel t5_2 and t5_4.
+ //
+ // Cancelling t5_2 will test cancelling a timeout that is at the head of the
+ // list and ready to be fired.
+ //
+ // Cancelling t5_4 will test cancelling a timeout in the middle of the list
+ t5_1.fn = [&] {
+ t5_2.cancelTimeout();
+ t5_4.cancelTimeout();
+ };
+
+ // Have t5_3 cancel t5_5.
+ // This will test cancelling the last remaining timeout.
+ //
+ // Then have t5_3 reschedule itself.
+ t5_3.fn = [&] {
+ t5_5.cancelTimeout();
+ // Reset our function so we won't continually reschedule ourself
+ std::function<void()> fnDtorGuard;
+ t5_3.fn.swap(fnDtorGuard);
+ t.scheduleTimeout(&t5_3, milliseconds(5));
+
+ // Also test cancelling timeouts in another timeset that isn't ready to
+ // fire yet.
+ //
+ // Cancel the middle timeout in ts10.
+ t10_2.cancelTimeout();
+ // Cancel both the timeouts in ts20.
+ t20_1.cancelTimeout();
+ t20_2.cancelTimeout();
+ };
+
+ TimePoint start;
+ eventBase.loop();
+ TimePoint end;
+
+ ASSERT_EQ(t5_1.timestamps.size(), 1);
+ T_CHECK_TIMEOUT(start, t5_1.timestamps[0], 5);
+
+ ASSERT_EQ(t5_3.timestamps.size(), 2);
+ T_CHECK_TIMEOUT(start, t5_3.timestamps[0], 5);
+ T_CHECK_TIMEOUT(t5_3.timestamps[0], t5_3.timestamps[1], 5);
+
+ ASSERT_EQ(t10_1.timestamps.size(), 1);
+ T_CHECK_TIMEOUT(start, t10_1.timestamps[0], 10);
+ ASSERT_EQ(t10_3.timestamps.size(), 1);
+ T_CHECK_TIMEOUT(start, t10_3.timestamps[0], 10);
+
+ // Cancelled timeouts
+ ASSERT_EQ(t5_2.timestamps.size(), 0);
+ ASSERT_EQ(t5_4.timestamps.size(), 0);
+ ASSERT_EQ(t5_5.timestamps.size(), 0);
+ ASSERT_EQ(t10_2.timestamps.size(), 0);
+ ASSERT_EQ(t20_1.timestamps.size(), 0);
+ ASSERT_EQ(t20_2.timestamps.size(), 0);
+
+ T_CHECK_TIMEOUT(start, end, 10);
+}
+
+/*
+ * Test destroying a HHWheelTimer with timeouts outstanding
+ */
+
+TEST(HHWheelTimerTest, DestroyTimeoutSet) {
+ EventBase eventBase;
+
+ HHWheelTimer::UniquePtr t(
+ new HHWheelTimer(&eventBase, milliseconds(1)));
+
+ TestTimeout t5_1(t.get(), milliseconds(5));
+ TestTimeout t5_2(t.get(), milliseconds(5));
+ TestTimeout t5_3(t.get(), milliseconds(5));
+
+ TestTimeout t10_1(t.get(), milliseconds(10));
+ TestTimeout t10_2(t.get(), milliseconds(10));
+
+ // Have t5_2 destroy t
+ // Note that this will call destroy() inside t's timeoutExpired()
+ // method.
+ t5_2.fn = [&] {
+ t5_3.cancelTimeout();
+ t5_1.cancelTimeout();
+ t10_1.cancelTimeout();
+ t10_2.cancelTimeout();
+ t.reset();};
+
+ TimePoint start;
+ eventBase.loop();
+ TimePoint end;
+
+ ASSERT_EQ(t5_1.timestamps.size(), 1);
+ T_CHECK_TIMEOUT(start, t5_1.timestamps[0], 5);
+ ASSERT_EQ(t5_2.timestamps.size(), 1);
+ T_CHECK_TIMEOUT(start, t5_2.timestamps[0], 5);
+
+ ASSERT_EQ(t5_3.timestamps.size(), 0);
+ ASSERT_EQ(t10_1.timestamps.size(), 0);
+ ASSERT_EQ(t10_2.timestamps.size(), 0);
+
+ T_CHECK_TIMEOUT(start, end, 5);
+}
+
+/*
+ * Test the tick interval parameter
+ */
+TEST(HHWheelTimerTest, AtMostEveryN) {
+ EventBase eventBase;
+
+ // Create a timeout set with a 10ms interval, to fire no more than once
+ // every 3ms.
+ milliseconds interval(25);
+ milliseconds atMostEveryN(6);
+ StackWheelTimer t(&eventBase, atMostEveryN);
+ t.setCatchupEveryN(70);
+
+ // Create 60 timeouts to be added to ts10 at 1ms intervals.
+ uint32_t numTimeouts = 60;
+ std::vector<TestTimeout> timeouts(numTimeouts);
+
+ // Create a scheduler timeout to add the timeouts 1ms apart.
+ uint32_t index = 0;
+ StackWheelTimer ts1(&eventBase, milliseconds(1));
+ TestTimeout scheduler(&ts1, milliseconds(1));
+ scheduler.fn = [&] {
+ if (index >= numTimeouts) {
+ return;
+ }
+ // Call timeoutExpired() on the timeout so it will record a timestamp.
+ // This is done only so we can record when we scheduled the timeout.
+ // This way if ts1 starts to fall behind a little over time we will still
+ // be comparing the ts10 timeouts to when they were first scheduled (rather
+ // than when we intended to schedule them). The scheduler may fall behind
+ // eventually since we don't really schedule it once every millisecond.
+ // Each time it finishes we schedule it for 1 millisecond in the future.
+ // The amount of time it takes to run, and any delays it encounters
+ // getting scheduled may eventually add up over time.
+ timeouts[index].timeoutExpired();
+
+ // Schedule the new timeout
+ t.scheduleTimeout(&timeouts[index], interval);
+ // Reschedule ourself
+ ts1.scheduleTimeout(&scheduler, milliseconds(1));
+ ++index;
+ };
+
+ // Go ahead and schedule the first timeout now.
+ //scheduler.fn();
+
+ TimePoint start;
+ eventBase.loop();
+ TimePoint end;
+
+ // We scheduled timeouts 1ms apart, when the HHWheelTimer is only allowed
+ // to wake up at most once every 3ms. It will therefore wake up every 3ms
+ // and fire groups of approximately 3 timeouts at a time.
+ //
+ // This is "approximately 3" since it may get slightly behind and fire 4 in
+ // one interval, etc. T_CHECK_TIMEOUT normally allows a few milliseconds of
+ // tolerance. We have to add the same into our checking algorithm here.
+ for (uint32_t idx = 0; idx < numTimeouts; ++idx) {
+ ASSERT_EQ(timeouts[idx].timestamps.size(), 2);
+
+ TimePoint scheduledTime(timeouts[idx].timestamps[0]);
+ TimePoint firedTime(timeouts[idx].timestamps[1]);
+
+ // Assert that the timeout fired at roughly the right time.
+ // T_CHECK_TIMEOUT() normally has a tolerance of 5ms. Allow an additional
+ // atMostEveryN.
+ milliseconds tolerance = milliseconds(5) + interval;
+ T_CHECK_TIMEOUT(scheduledTime, firedTime, atMostEveryN.count(),
+ tolerance.count());
+
+ // Assert that the difference between the previous timeout and now was
+ // either very small (fired in the same event loop), or larger than
+ // atMostEveryN.
+ if (idx == 0) {
+ // no previous value
+ continue;
+ }
+ TimePoint prev(timeouts[idx - 1].timestamps[1]);
+
+ milliseconds delta((firedTime.getTimeStart() - prev.getTimeEnd()) -
+ (firedTime.getTimeWaiting() - prev.getTimeWaiting()));
+ if (delta > milliseconds(1)) {
+ T_CHECK_TIMEOUT(prev, firedTime, atMostEveryN.count()); }
+ }
+}
+
+/*
+ * Test an event loop that is blocking
+ */
+
+TEST(HHWheelTimerTest, SlowLoop) {
+ EventBase eventBase;
+ StackWheelTimer t(&eventBase, milliseconds(1));
+
+ TestTimeout t1;
+ TestTimeout t2;
+
+ ASSERT_EQ(t.count(), 0);
+
+ eventBase.runInLoop([](){usleep(10000);});
+ t.scheduleTimeout(&t1, milliseconds(5));
+
+ ASSERT_EQ(t.count(), 1);
+
+ TimePoint start;
+ eventBase.loop();
+ TimePoint end;
+
+ ASSERT_EQ(t1.timestamps.size(), 1);
+ ASSERT_EQ(t.count(), 0);
+
+ // Check that the timeout was delayed by sleep
+ T_CHECK_TIMEOUT(start, t1.timestamps[0], 15, 1);
+ T_CHECK_TIMEOUT(start, end, 15, 1);
+
+ // Try it again, this time with catchup timing every loop
+ t.setCatchupEveryN(1);
+
+ eventBase.runInLoop([](){usleep(10000);});
+ t.scheduleTimeout(&t2, milliseconds(5));
+
+ ASSERT_EQ(t.count(), 1);
+
+ TimePoint start2;
+ eventBase.loop();
+ TimePoint end2;
+
+ ASSERT_EQ(t2.timestamps.size(), 1);
+ ASSERT_EQ(t.count(), 0);
+
+ // Check that the timeout was NOT delayed by sleep
+ T_CHECK_TIMEOUT(start2, t2.timestamps[0], 10, 1);
+ T_CHECK_TIMEOUT(start2, end2, 10, 1);
+}