From: Daniel Sommermann Date: Thu, 13 Nov 2014 18:49:31 +0000 (-0800) Subject: Move some tests to folly X-Git-Tag: v0.22.0~164 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=5e80856a1c8a52fc32a9d2eb776aaace0ca61182;p=folly.git Move some tests to folly Summary: These lived in fbthrift previously, but they should move with the main code too. Test Plan: unit tests Reviewed By: davejwatson@fb.com Subscribers: doug, alandau, bmatheny, njormrod, mshneer, folly-diffs@ FB internal diff: D1683229 Signature: t1:1683229:1416010730:36fb7e4c9916ae7a9b5972cd476f82014c5f4c78 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 66f62e98..4aca7068 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -167,6 +167,9 @@ nobase_follyinclude_HEADERS = \ io/async/Request.h \ io/async/SSLContext.h \ io/async/TimeoutManager.h \ + io/async/test/TimeUtil.h \ + io/async/test/UndelayedDestruction.h \ + io/async/test/Util.h \ json.h \ Lazy.h \ LifoSem.h \ @@ -296,6 +299,7 @@ libfolly_la_SOURCES = \ io/async/Request.cpp \ io/async/SSLContext.cpp \ io/async/HHWheelTimer.cpp \ + io/async/test/TimeUtil.cpp \ json.cpp \ detail/MemoryIdler.cpp \ MacAddress.cpp \ diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp new file mode 100644 index 00000000..ddf70a06 --- /dev/null +++ b/folly/io/async/test/EventBaseTest.cpp @@ -0,0 +1,1542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include +#include + +#include +#include +#include + +using std::deque; +using std::pair; +using std::vector; +using std::make_pair; +using std::cerr; +using std::endl; + +using namespace folly; + +/////////////////////////////////////////////////////////////////////////// +// Tests for read and write events +/////////////////////////////////////////////////////////////////////////// + +enum { BUF_SIZE = 4096 }; + +ssize_t writeToFD(int fd, size_t length) { + // write an arbitrary amount of data to the fd + char buf[length]; + memset(buf, 'a', sizeof(buf)); + ssize_t rc = write(fd, buf, sizeof(buf)); + CHECK_EQ(rc, length); + return rc; +} + +size_t writeUntilFull(int fd) { + // Write to the fd until EAGAIN is returned + size_t bytesWritten = 0; + char buf[BUF_SIZE]; + memset(buf, 'a', sizeof(buf)); + while (true) { + ssize_t rc = write(fd, buf, sizeof(buf)); + if (rc < 0) { + CHECK_EQ(errno, EAGAIN); + break; + } else { + bytesWritten += rc; + } + } + return bytesWritten; +} + +ssize_t readFromFD(int fd, size_t length) { + // write an arbitrary amount of data to the fd + char buf[length]; + return read(fd, buf, sizeof(buf)); +} + +size_t readUntilEmpty(int fd) { + // Read from the fd until EAGAIN is returned + char buf[BUF_SIZE]; + size_t bytesRead = 0; + while (true) { + int rc = read(fd, buf, sizeof(buf)); + if (rc == 0) { + CHECK(false) << "unexpected EOF"; + } else if (rc < 0) { + CHECK_EQ(errno, EAGAIN); + break; + } else { + bytesRead += rc; + } + } + return bytesRead; +} + +void checkReadUntilEmpty(int fd, size_t expectedLength) { + ASSERT_EQ(readUntilEmpty(fd), expectedLength); +} + +struct ScheduledEvent { + int milliseconds; + uint16_t events; + size_t length; + ssize_t result; + + void perform(int fd) { + if (events & EventHandler::READ) { + if (length == 0) { + result = readUntilEmpty(fd); + } else { + result = readFromFD(fd, length); + } + } + if (events & EventHandler::WRITE) { + if (length == 0) { + result = writeUntilFull(fd); + } else { + result = writeToFD(fd, length); + } + } + } +}; + +void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) { + for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) { + eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd), + ev->milliseconds); + } +} + +class TestHandler : public EventHandler { + public: + TestHandler(EventBase* eventBase, int fd) + : EventHandler(eventBase, fd), fd_(fd) {} + + virtual void handlerReady(uint16_t events) noexcept { + ssize_t bytesRead = 0; + ssize_t bytesWritten = 0; + if (events & READ) { + // Read all available data, so EventBase will stop calling us + // until new data becomes available + bytesRead = readUntilEmpty(fd_); + } + if (events & WRITE) { + // Write until the pipe buffer is full, so EventBase will stop calling + // us until the other end has read some data + bytesWritten = writeUntilFull(fd_); + } + + log.push_back(EventRecord(events, bytesRead, bytesWritten)); + } + + struct EventRecord { + EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten) + : events(events) + , timestamp() + , bytesRead(bytesRead) + , bytesWritten(bytesWritten) {} + + uint16_t events; + TimePoint timestamp; + ssize_t bytesRead; + ssize_t bytesWritten; + }; + + deque log; + + private: + int fd_; +}; + +/** + * Test a READ event + */ +TEST(EventBaseTest, ReadEvent) { + EventBase eb; + SocketPair sp; + + // Register for read events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::READ); + + // Register timeouts to perform two write events + ScheduledEvent events[] = { + { 10, EventHandler::WRITE, 2345 }, + { 160, EventHandler::WRITE, 99 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // Since we didn't use the EventHandler::PERSIST flag, the handler should + // have received the first read, then unregistered itself. Check that only + // the first chunk of data was received. + ASSERT_EQ(handler.log.size(), 1); + ASSERT_EQ(handler.log[0].events, EventHandler::READ); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90); + ASSERT_EQ(handler.log[0].bytesRead, events[0].length); + ASSERT_EQ(handler.log[0].bytesWritten, 0); + T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30); + + // Make sure the second chunk of data is still waiting to be read. + size_t bytesRemaining = readUntilEmpty(sp[0]); + ASSERT_EQ(bytesRemaining, events[1].length); +} + +/** + * Test (READ | PERSIST) + */ +TEST(EventBaseTest, ReadPersist) { + EventBase eb; + SocketPair sp; + + // Register for read events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::READ | EventHandler::PERSIST); + + // Register several timeouts to perform writes + ScheduledEvent events[] = { + { 10, EventHandler::WRITE, 1024 }, + { 20, EventHandler::WRITE, 2211 }, + { 30, EventHandler::WRITE, 4096 }, + { 100, EventHandler::WRITE, 100 }, + { 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Schedule a timeout to unregister the handler after the third write + eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // The handler should have received the first 3 events, + // then been unregistered after that. + ASSERT_EQ(handler.log.size(), 3); + for (int n = 0; n < 3; ++n) { + ASSERT_EQ(handler.log[n].events, EventHandler::READ); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds); + ASSERT_EQ(handler.log[n].bytesRead, events[n].length); + ASSERT_EQ(handler.log[n].bytesWritten, 0); + } + T_CHECK_TIMEOUT(start, end, events[3].milliseconds); + + // Make sure the data from the last write is still waiting to be read + size_t bytesRemaining = readUntilEmpty(sp[0]); + ASSERT_EQ(bytesRemaining, events[3].length); +} + +/** + * Test registering for READ when the socket is immediately readable + */ +TEST(EventBaseTest, ReadImmediate) { + EventBase eb; + SocketPair sp; + + // Write some data to the socket so the other end will + // be immediately readable + size_t dataLength = 1234; + writeToFD(sp[1], dataLength); + + // Register for read events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::READ | EventHandler::PERSIST); + + // Register a timeout to perform another write + ScheduledEvent events[] = { + { 10, EventHandler::WRITE, 2345 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Schedule a timeout to unregister the handler + eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + ASSERT_EQ(handler.log.size(), 2); + + // There should have been 1 event for immediate readability + ASSERT_EQ(handler.log[0].events, EventHandler::READ); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0); + ASSERT_EQ(handler.log[0].bytesRead, dataLength); + ASSERT_EQ(handler.log[0].bytesWritten, 0); + + // There should be another event after the timeout wrote more data + ASSERT_EQ(handler.log[1].events, EventHandler::READ); + T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[1].bytesRead, events[0].length); + ASSERT_EQ(handler.log[1].bytesWritten, 0); + + T_CHECK_TIMEOUT(start, end, 20); +} + +/** + * Test a WRITE event + */ +TEST(EventBaseTest, WriteEvent) { + EventBase eb; + SocketPair sp; + + // Fill up the write buffer before starting + size_t initialBytesWritten = writeUntilFull(sp[0]); + + // Register for write events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::WRITE); + + // Register timeouts to perform two reads + ScheduledEvent events[] = { + { 10, EventHandler::READ, 0 }, + { 60, EventHandler::READ, 0 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // Since we didn't use the EventHandler::PERSIST flag, the handler should + // have only been able to write once, then unregistered itself. + ASSERT_EQ(handler.log.size(), 1); + ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[0].bytesRead, 0); + ASSERT_GT(handler.log[0].bytesWritten, 0); + T_CHECK_TIMEOUT(start, end, events[1].milliseconds); + + ASSERT_EQ(events[0].result, initialBytesWritten); + ASSERT_EQ(events[1].result, handler.log[0].bytesWritten); +} + +/** + * Test (WRITE | PERSIST) + */ +TEST(EventBaseTest, WritePersist) { + EventBase eb; + SocketPair sp; + + // Fill up the write buffer before starting + size_t initialBytesWritten = writeUntilFull(sp[0]); + + // Register for write events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST); + + // Register several timeouts to read from the socket at several intervals + ScheduledEvent events[] = { + { 10, EventHandler::READ, 0 }, + { 40, EventHandler::READ, 0 }, + { 70, EventHandler::READ, 0 }, + { 100, EventHandler::READ, 0 }, + { 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Schedule a timeout to unregister the handler after the third read + eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // The handler should have received the first 3 events, + // then been unregistered after that. + ASSERT_EQ(handler.log.size(), 3); + ASSERT_EQ(events[0].result, initialBytesWritten); + for (int n = 0; n < 3; ++n) { + ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds); + ASSERT_EQ(handler.log[n].bytesRead, 0); + ASSERT_GT(handler.log[n].bytesWritten, 0); + ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result); + } + T_CHECK_TIMEOUT(start, end, events[3].milliseconds); +} + +/** + * Test registering for WRITE when the socket is immediately writable + */ +TEST(EventBaseTest, WriteImmediate) { + EventBase eb; + SocketPair sp; + + // Register for write events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST); + + // Register a timeout to perform a read + ScheduledEvent events[] = { + { 10, EventHandler::READ, 0 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Schedule a timeout to unregister the handler + int64_t unregisterTimeout = 40; + eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), + unregisterTimeout); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + ASSERT_EQ(handler.log.size(), 2); + + // Since the socket buffer was initially empty, + // there should have been 1 event for immediate writability + ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0); + ASSERT_EQ(handler.log[0].bytesRead, 0); + ASSERT_GT(handler.log[0].bytesWritten, 0); + + // There should be another event after the timeout wrote more data + ASSERT_EQ(handler.log[1].events, EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[1].bytesRead, 0); + ASSERT_GT(handler.log[1].bytesWritten, 0); + + T_CHECK_TIMEOUT(start, end, unregisterTimeout); +} + +/** + * Test (READ | WRITE) when the socket becomes readable first + */ +TEST(EventBaseTest, ReadWrite) { + EventBase eb; + SocketPair sp; + + // Fill up the write buffer before starting + size_t sock0WriteLength = writeUntilFull(sp[0]); + + // Register for read and write events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::READ_WRITE); + + // Register timeouts to perform a write then a read. + ScheduledEvent events[] = { + { 10, EventHandler::WRITE, 2345 }, + { 40, EventHandler::READ, 0 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // Since we didn't use the EventHandler::PERSIST flag, the handler should + // have only noticed readability, then unregistered itself. Check that only + // one event was logged. + ASSERT_EQ(handler.log.size(), 1); + ASSERT_EQ(handler.log[0].events, EventHandler::READ); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[0].bytesRead, events[0].length); + ASSERT_EQ(handler.log[0].bytesWritten, 0); + ASSERT_EQ(events[1].result, sock0WriteLength); + T_CHECK_TIMEOUT(start, end, events[1].milliseconds); +} + +/** + * Test (READ | WRITE) when the socket becomes writable first + */ +TEST(EventBaseTest, WriteRead) { + EventBase eb; + SocketPair sp; + + // Fill up the write buffer before starting + size_t sock0WriteLength = writeUntilFull(sp[0]); + + // Register for read and write events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::READ_WRITE); + + // Register timeouts to perform a read then a write. + size_t sock1WriteLength = 2345; + ScheduledEvent events[] = { + { 10, EventHandler::READ, 0 }, + { 40, EventHandler::WRITE, sock1WriteLength }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // Since we didn't use the EventHandler::PERSIST flag, the handler should + // have only noticed writability, then unregistered itself. Check that only + // one event was logged. + ASSERT_EQ(handler.log.size(), 1); + ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[0].bytesRead, 0); + ASSERT_GT(handler.log[0].bytesWritten, 0); + ASSERT_EQ(events[0].result, sock0WriteLength); + ASSERT_EQ(events[1].result, sock1WriteLength); + T_CHECK_TIMEOUT(start, end, events[1].milliseconds); + + // Make sure the written data is still waiting to be read. + size_t bytesRemaining = readUntilEmpty(sp[0]); + ASSERT_EQ(bytesRemaining, events[1].length); +} + +/** + * Test (READ | WRITE) when the socket becomes readable and writable + * at the same time. + */ +TEST(EventBaseTest, ReadWriteSimultaneous) { + EventBase eb; + SocketPair sp; + + // Fill up the write buffer before starting + size_t sock0WriteLength = writeUntilFull(sp[0]); + + // Register for read and write events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::READ_WRITE); + + // Register a timeout to perform a read and write together + ScheduledEvent events[] = { + { 10, EventHandler::READ | EventHandler::WRITE, 0 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // It's not strictly required that the EventBase register us about both + // events in the same call. So, it's possible that if the EventBase + // implementation changes this test could start failing, and it wouldn't be + // considered breaking the API. However for now it's nice to exercise this + // code path. + ASSERT_EQ(handler.log.size(), 1); + ASSERT_EQ(handler.log[0].events, + EventHandler::READ | EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength); + ASSERT_GT(handler.log[0].bytesWritten, 0); + T_CHECK_TIMEOUT(start, end, events[0].milliseconds); +} + +/** + * Test (READ | WRITE | PERSIST) + */ +TEST(EventBaseTest, ReadWritePersist) { + EventBase eb; + SocketPair sp; + + // Register for read and write events + TestHandler handler(&eb, sp[0]); + handler.registerHandler(EventHandler::READ | EventHandler::WRITE | + EventHandler::PERSIST); + + // Register timeouts to perform several reads and writes + ScheduledEvent events[] = { + { 10, EventHandler::WRITE, 2345 }, + { 20, EventHandler::READ, 0 }, + { 35, EventHandler::WRITE, 200 }, + { 45, EventHandler::WRITE, 15 }, + { 55, EventHandler::READ, 0 }, + { 120, EventHandler::WRITE, 2345 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Schedule a timeout to unregister the handler + eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + ASSERT_EQ(handler.log.size(), 6); + + // Since we didn't fill up the write buffer immediately, there should + // be an immediate event for writability. + ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0); + ASSERT_EQ(handler.log[0].bytesRead, 0); + ASSERT_GT(handler.log[0].bytesWritten, 0); + + // Events 1 through 5 should correspond to the scheduled events + for (int n = 1; n < 6; ++n) { + ScheduledEvent* event = &events[n - 1]; + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds); + if (event->events == EventHandler::READ) { + ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); + ASSERT_EQ(handler.log[n].bytesRead, 0); + ASSERT_GT(handler.log[n].bytesWritten, 0); + } else { + ASSERT_EQ(handler.log[n].events, EventHandler::READ); + ASSERT_EQ(handler.log[n].bytesRead, event->length); + ASSERT_EQ(handler.log[n].bytesWritten, 0); + } + } + + // The timeout should have unregistered the handler before the last write. + // Make sure that data is still waiting to be read + size_t bytesRemaining = readUntilEmpty(sp[0]); + ASSERT_EQ(bytesRemaining, events[5].length); +} + + +class PartialReadHandler : public TestHandler { + public: + PartialReadHandler(EventBase* eventBase, int fd, size_t readLength) + : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {} + + virtual void handlerReady(uint16_t events) noexcept { + assert(events == EventHandler::READ); + ssize_t bytesRead = readFromFD(fd_, readLength_); + log.push_back(EventRecord(events, bytesRead, 0)); + } + + private: + int fd_; + size_t readLength_; +}; + +/** + * Test reading only part of the available data when a read event is fired. + * When PERSIST is used, make sure the handler gets notified again the next + * time around the loop. + */ +TEST(EventBaseTest, ReadPartial) { + EventBase eb; + SocketPair sp; + + // Register for read events + size_t readLength = 100; + PartialReadHandler handler(&eb, sp[0], readLength); + handler.registerHandler(EventHandler::READ | EventHandler::PERSIST); + + // Register a timeout to perform a single write, + // with more data than PartialReadHandler will read at once + ScheduledEvent events[] = { + { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Schedule a timeout to unregister the handler + eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + ASSERT_EQ(handler.log.size(), 4); + + // The first 3 invocations should read readLength bytes each + for (int n = 0; n < 3; ++n) { + ASSERT_EQ(handler.log[n].events, EventHandler::READ); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[n].bytesRead, readLength); + ASSERT_EQ(handler.log[n].bytesWritten, 0); + } + // The last read only has readLength/2 bytes + ASSERT_EQ(handler.log[3].events, EventHandler::READ); + T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[3].bytesRead, readLength / 2); + ASSERT_EQ(handler.log[3].bytesWritten, 0); +} + + +class PartialWriteHandler : public TestHandler { + public: + PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength) + : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {} + + virtual void handlerReady(uint16_t events) noexcept { + assert(events == EventHandler::WRITE); + ssize_t bytesWritten = writeToFD(fd_, writeLength_); + log.push_back(EventRecord(events, 0, bytesWritten)); + } + + private: + int fd_; + size_t writeLength_; +}; + +/** + * Test writing without completely filling up the write buffer when the fd + * becomes writable. When PERSIST is used, make sure the handler gets + * notified again the next time around the loop. + */ +TEST(EventBaseTest, WritePartial) { + EventBase eb; + SocketPair sp; + + // Fill up the write buffer before starting + size_t initialBytesWritten = writeUntilFull(sp[0]); + + // Register for write events + size_t writeLength = 100; + PartialWriteHandler handler(&eb, sp[0], writeLength); + handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST); + + // Register a timeout to read, so that more data can be written + ScheduledEvent events[] = { + { 10, EventHandler::READ, 0 }, + { 0, 0, 0 }, + }; + scheduleEvents(&eb, sp[1], events); + + // Schedule a timeout to unregister the handler + eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); + + // Loop + TimePoint start; + eb.loop(); + TimePoint end; + + // Depending on how big the socket buffer is, there will be multiple writes + // Only check the first 5 + int numChecked = 5; + ASSERT_GE(handler.log.size(), numChecked); + ASSERT_EQ(events[0].result, initialBytesWritten); + + // The first 3 invocations should read writeLength bytes each + for (int n = 0; n < numChecked; ++n) { + ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds); + ASSERT_EQ(handler.log[n].bytesRead, 0); + ASSERT_EQ(handler.log[n].bytesWritten, writeLength); + } +} + + +/** + * Test destroying a registered EventHandler + */ +TEST(EventBaseTest, DestroyHandler) { + class DestroyHandler : public TAsyncTimeout { + public: + DestroyHandler(EventBase* eb, EventHandler* h) + : TAsyncTimeout(eb) + , handler_(h) {} + + virtual void timeoutExpired() noexcept { + delete handler_; + } + + private: + EventHandler* handler_; + }; + + EventBase eb; + SocketPair sp; + + // Fill up the write buffer before starting + size_t initialBytesWritten = writeUntilFull(sp[0]); + + // Register for write events + TestHandler* handler = new TestHandler(&eb, sp[0]); + handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST); + + // After 10ms, read some data, so that the handler + // will be notified that it can write. + eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), + 10); + + // Start a timer to destroy the handler after 25ms + // This mainly just makes sure the code doesn't break or assert + DestroyHandler dh(&eb, handler); + dh.scheduleTimeout(25); + + TimePoint start; + eb.loop(); + TimePoint end; + + // Make sure the EventHandler was uninstalled properly when it was + // destroyed, and the EventBase loop exited + T_CHECK_TIMEOUT(start, end, 25); + + // Make sure that the handler wrote data to the socket + // before it was destroyed + size_t bytesRemaining = readUntilEmpty(sp[1]); + ASSERT_GT(bytesRemaining, 0); +} + + +/////////////////////////////////////////////////////////////////////////// +// Tests for timeout events +/////////////////////////////////////////////////////////////////////////// + +TEST(EventBaseTest, RunAfterDelay) { + EventBase eb; + + TimePoint timestamp1(false); + TimePoint timestamp2(false); + TimePoint timestamp3(false); + eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); + eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); + eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40); + + TimePoint start; + eb.loop(); + TimePoint end; + + T_CHECK_TIMEOUT(start, timestamp1, 10); + T_CHECK_TIMEOUT(start, timestamp2, 20); + T_CHECK_TIMEOUT(start, timestamp3, 40); + T_CHECK_TIMEOUT(start, end, 40); +} + +/** + * Test the behavior of runAfterDelay() when some timeouts are + * still scheduled when the EventBase is destroyed. + */ +TEST(EventBaseTest, RunAfterDelayDestruction) { + TimePoint timestamp1(false); + TimePoint timestamp2(false); + TimePoint timestamp3(false); + TimePoint timestamp4(false); + TimePoint start(false); + TimePoint end(false); + + { + EventBase eb; + + // Run two normal timeouts + eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); + eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); + + // Schedule a timeout to stop the event loop after 40ms + eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40); + + // Schedule 2 timeouts that would fire after the event loop stops + eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80); + eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160); + + start.reset(); + eb.loop(); + end.reset(); + } + + T_CHECK_TIMEOUT(start, timestamp1, 10); + T_CHECK_TIMEOUT(start, timestamp2, 20); + T_CHECK_TIMEOUT(start, end, 40); + + ASSERT_TRUE(timestamp3.isUnset()); + ASSERT_TRUE(timestamp4.isUnset()); + + // Ideally this test should be run under valgrind to ensure that no + // memory is leaked. +} + +class TestTimeout : public TAsyncTimeout { + public: + explicit TestTimeout(EventBase* eventBase) + : TAsyncTimeout(eventBase) + , timestamp(false) {} + + virtual void timeoutExpired() noexcept { + timestamp.reset(); + } + + TimePoint timestamp; +}; + +TEST(EventBaseTest, BasicTimeouts) { + EventBase eb; + + TestTimeout t1(&eb); + TestTimeout t2(&eb); + TestTimeout t3(&eb); + t1.scheduleTimeout(10); + t2.scheduleTimeout(20); + t3.scheduleTimeout(40); + + TimePoint start; + eb.loop(); + TimePoint end; + + T_CHECK_TIMEOUT(start, t1.timestamp, 10); + T_CHECK_TIMEOUT(start, t2.timestamp, 20); + T_CHECK_TIMEOUT(start, t3.timestamp, 40); + T_CHECK_TIMEOUT(start, end, 40); +} + +class ReschedulingTimeout : public TAsyncTimeout { + public: + ReschedulingTimeout(EventBase* evb, const vector& timeouts) + : TAsyncTimeout(evb) + , timeouts_(timeouts) + , iterator_(timeouts_.begin()) {} + + void start() { + reschedule(); + } + + virtual void timeoutExpired() noexcept { + timestamps.push_back(TimePoint()); + reschedule(); + } + + void reschedule() { + if (iterator_ != timeouts_.end()) { + uint32_t timeout = *iterator_; + ++iterator_; + scheduleTimeout(timeout); + } + } + + vector timestamps; + + private: + vector timeouts_; + vector::const_iterator iterator_; +}; + +/** + * Test rescheduling the same timeout multiple times + */ +TEST(EventBaseTest, ReuseTimeout) { + EventBase eb; + + vector timeouts; + timeouts.push_back(10); + timeouts.push_back(30); + timeouts.push_back(15); + + ReschedulingTimeout t(&eb, timeouts); + t.start(); + + TimePoint start; + eb.loop(); + TimePoint end; + + // Use a higher tolerance than usual. We're waiting on 3 timeouts + // consecutively. In general, each timeout may go over by a few + // milliseconds, and we're tripling this error by witing on 3 timeouts. + int64_t tolerance = 6; + + ASSERT_EQ(timeouts.size(), t.timestamps.size()); + uint32_t total = 0; + for (int n = 0; n < timeouts.size(); ++n) { + total += timeouts[n]; + T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance); + } + T_CHECK_TIMEOUT(start, end, total, tolerance); +} + +/** + * Test rescheduling a timeout before it has fired + */ +TEST(EventBaseTest, RescheduleTimeout) { + EventBase eb; + + TestTimeout t1(&eb); + TestTimeout t2(&eb); + TestTimeout t3(&eb); + + t1.scheduleTimeout(15); + t2.scheduleTimeout(30); + t3.scheduleTimeout(30); + + auto f = static_cast( + &TAsyncTimeout::scheduleTimeout); + + // after 10ms, reschedule t2 to run sooner than originally scheduled + eb.runAfterDelay(std::bind(f, &t2, 10), 10); + // after 10ms, reschedule t3 to run later than originally scheduled + eb.runAfterDelay(std::bind(f, &t3, 40), 10); + + TimePoint start; + eb.loop(); + TimePoint end; + + T_CHECK_TIMEOUT(start, t1.timestamp, 15); + T_CHECK_TIMEOUT(start, t2.timestamp, 20); + T_CHECK_TIMEOUT(start, t3.timestamp, 50); + T_CHECK_TIMEOUT(start, end, 50); +} + +/** + * Test cancelling a timeout + */ +TEST(EventBaseTest, CancelTimeout) { + EventBase eb; + + vector timeouts; + timeouts.push_back(10); + timeouts.push_back(30); + timeouts.push_back(25); + + ReschedulingTimeout t(&eb, timeouts); + t.start(); + eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50); + + TimePoint start; + eb.loop(); + TimePoint end; + + ASSERT_EQ(t.timestamps.size(), 2); + T_CHECK_TIMEOUT(start, t.timestamps[0], 10); + T_CHECK_TIMEOUT(start, t.timestamps[1], 40); + T_CHECK_TIMEOUT(start, end, 50); +} + +/** + * Test destroying a scheduled timeout object + */ +TEST(EventBaseTest, DestroyTimeout) { + class DestroyTimeout : public TAsyncTimeout { + public: + DestroyTimeout(EventBase* eb, TAsyncTimeout* t) + : TAsyncTimeout(eb) + , timeout_(t) {} + + virtual void timeoutExpired() noexcept { + delete timeout_; + } + + private: + TAsyncTimeout* timeout_; + }; + + EventBase eb; + + TestTimeout* t1 = new TestTimeout(&eb); + t1->scheduleTimeout(30); + + DestroyTimeout dt(&eb, t1); + dt.scheduleTimeout(10); + + TimePoint start; + eb.loop(); + TimePoint end; + + T_CHECK_TIMEOUT(start, end, 10); +} + + +/////////////////////////////////////////////////////////////////////////// +// Test for runInThreadTestFunc() +/////////////////////////////////////////////////////////////////////////// + +struct RunInThreadData { + RunInThreadData(int numThreads, int opsPerThread) + : opsPerThread(opsPerThread) + , opsToGo(numThreads*opsPerThread) {} + + EventBase evb; + deque< pair > values; + + int opsPerThread; + int opsToGo; +}; + +struct RunInThreadArg { + RunInThreadArg(RunInThreadData* data, + int threadId, + int value) + : data(data) + , thread(threadId) + , value(value) {} + + RunInThreadData* data; + int thread; + int value; +}; + +void runInThreadTestFunc(RunInThreadArg* arg) { + arg->data->values.push_back(make_pair(arg->thread, arg->value)); + RunInThreadData* data = arg->data; + delete arg; + + if(--data->opsToGo == 0) { + // Break out of the event base loop if we are the last thread running + data->evb.terminateLoopSoon(); + } +} + +class RunInThreadTester : public concurrency::Runnable { + public: + RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {} + + void run() { + // Call evb->runInThread() a number of times + { + for (int n = 0; n < data_->opsPerThread; ++n) { + RunInThreadArg* arg = new RunInThreadArg(data_, id_, n); + data_->evb.runInEventBaseThread(runInThreadTestFunc, arg); + usleep(10); + } + } + } + + private: + int id_; + RunInThreadData* data_; +}; + +TEST(EventBaseTest, RunInThread) { + uint32_t numThreads = 50; + uint32_t opsPerThread = 100; + RunInThreadData data(numThreads, opsPerThread); + + PosixThreadFactory threadFactory; + threadFactory.setDetached(false); + deque< std::shared_ptr > threads; + for (int n = 0; n < numThreads; ++n) { + std::shared_ptr runner(new RunInThreadTester(n, &data)); + std::shared_ptr thread = threadFactory.newThread(runner); + threads.push_back(thread); + thread->start(); + } + + // Add a timeout event to run after 3 seconds. + // Otherwise loop() will return immediately since there are no events to run. + // Once the last thread exits, it will stop the loop(). However, this + // timeout also stops the loop in case there is a bug performing the normal + // stop. + data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb), + 3000); + + TimePoint start; + data.evb.loop(); + TimePoint end; + + // Verify that the loop exited because all threads finished and requested it + // to stop. This should happen much sooner than the 3 second timeout. + // Assert that it happens in under a second. (This is still tons of extra + // padding.) + int64_t timeTaken = end.getTime() - start.getTime(); + ASSERT_LT(timeTaken, 1000); + VLOG(11) << "Time taken: " << timeTaken; + + // Verify that we have all of the events from every thread + int expectedValues[numThreads]; + for (int n = 0; n < numThreads; ++n) { + expectedValues[n] = 0; + } + for (deque< pair >::const_iterator it = data.values.begin(); + it != data.values.end(); + ++it) { + int threadID = it->first; + int value = it->second; + ASSERT_EQ(expectedValues[threadID], value); + ++expectedValues[threadID]; + } + for (int n = 0; n < numThreads; ++n) { + ASSERT_EQ(expectedValues[n], opsPerThread); + } + + // Wait on all of the threads. Otherwise we can exit and clean up + // RunInThreadData before the last thread exits, while it is still holding + // the RunInThreadData's mutex. + for (deque< std::shared_ptr >::const_iterator it = threads.begin(); + it != threads.end(); + ++it) { + (*it)->join(); + } +} + +/////////////////////////////////////////////////////////////////////////// +// Tests for runInLoop() +/////////////////////////////////////////////////////////////////////////// + +class CountedLoopCallback : public EventBase::LoopCallback { + public: + CountedLoopCallback(EventBase* eventBase, + unsigned int count, + std::function action = + std::function()) + : eventBase_(eventBase) + , count_(count) + , action_(action) {} + + virtual void runLoopCallback() noexcept { + --count_; + if (count_ > 0) { + eventBase_->runInLoop(this); + } else if (action_) { + action_(); + } + } + + unsigned int getCount() const { + return count_; + } + + private: + EventBase* eventBase_; + unsigned int count_; + std::function action_; +}; + +// Test that EventBase::loop() doesn't exit while there are +// still LoopCallbacks remaining to be invoked. +TEST(EventBaseTest, RepeatedRunInLoop) { + EventBase eventBase; + + CountedLoopCallback c(&eventBase, 10); + eventBase.runInLoop(&c); + // The callback shouldn't have run immediately + ASSERT_EQ(c.getCount(), 10); + eventBase.loop(); + + // loop() should loop until the CountedLoopCallback stops + // re-installing itself. + ASSERT_EQ(c.getCount(), 0); +} + +// Test runInLoop() calls with terminateLoopSoon() +TEST(EventBaseTest, RunInLoopStopLoop) { + EventBase eventBase; + + CountedLoopCallback c1(&eventBase, 20); + CountedLoopCallback c2(&eventBase, 10, + std::bind(&EventBase::terminateLoopSoon, &eventBase)); + + eventBase.runInLoop(&c1); + eventBase.runInLoop(&c2); + ASSERT_EQ(c1.getCount(), 20); + ASSERT_EQ(c2.getCount(), 10); + + eventBase.loopForever(); + + // c2 should have stopped the loop after 10 iterations + ASSERT_EQ(c2.getCount(), 0); + + // We allow the EventBase to run the loop callbacks in whatever order it + // chooses. We'll accept c1's count being either 10 (if the loop terminated + // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop + // before c1 ran). + // + // (With the current code, c1 will always run 10 times, but we don't consider + // this a hard API requirement.) + ASSERT_GE(c1.getCount(), 10); + ASSERT_LE(c1.getCount(), 11); +} + +// Test cancelling runInLoop() callbacks +TEST(EventBaseTest, CancelRunInLoop) { + EventBase eventBase; + + CountedLoopCallback c1(&eventBase, 20); + CountedLoopCallback c2(&eventBase, 20); + CountedLoopCallback c3(&eventBase, 20); + + std::function cancelC1Action = + std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1); + std::function cancelC2Action = + std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2); + + CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action); + CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action); + + // Install cancelC1 after c1 + eventBase.runInLoop(&c1); + eventBase.runInLoop(&cancelC1); + + // Install cancelC2 before c2 + eventBase.runInLoop(&cancelC2); + eventBase.runInLoop(&c2); + + // Install c3 + eventBase.runInLoop(&c3); + + ASSERT_EQ(c1.getCount(), 20); + ASSERT_EQ(c2.getCount(), 20); + ASSERT_EQ(c3.getCount(), 20); + ASSERT_EQ(cancelC1.getCount(), 10); + ASSERT_EQ(cancelC2.getCount(), 10); + + // Run the loop + eventBase.loop(); + + // cancelC1 and cancelC3 should have both fired after 10 iterations and + // stopped re-installing themselves + ASSERT_EQ(cancelC1.getCount(), 0); + ASSERT_EQ(cancelC2.getCount(), 0); + // c3 should have continued on for the full 20 iterations + ASSERT_EQ(c3.getCount(), 0); + + // c1 and c2 should have both been cancelled on the 10th iteration. + // + // Callbacks are always run in the order they are installed, + // so c1 should have fired 10 times, and been canceled after it ran on the + // 10th iteration. c2 should have only fired 9 times, because cancelC2 will + // have run before it on the 10th iteration, and cancelled it before it + // fired. + ASSERT_EQ(c1.getCount(), 10); + ASSERT_EQ(c2.getCount(), 11); +} + +class TerminateTestCallback : public EventBase::LoopCallback, + public EventHandler { + public: + TerminateTestCallback(EventBase* eventBase, int fd) + : EventHandler(eventBase, fd), + eventBase_(eventBase), + loopInvocations_(0), + maxLoopInvocations_(0), + eventInvocations_(0), + maxEventInvocations_(0) {} + + void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) { + loopInvocations_ = 0; + maxLoopInvocations_ = maxLoopInvocations; + eventInvocations_ = 0; + maxEventInvocations_ = maxEventInvocations; + + cancelLoopCallback(); + unregisterHandler(); + } + + virtual void handlerReady(uint16_t events) noexcept { + // We didn't register with PERSIST, so we will have been automatically + // unregistered already. + ASSERT_FALSE(isHandlerRegistered()); + + ++eventInvocations_; + if (eventInvocations_ >= maxEventInvocations_) { + return; + } + + eventBase_->runInLoop(this); + } + virtual void runLoopCallback() noexcept { + ++loopInvocations_; + if (loopInvocations_ >= maxLoopInvocations_) { + return; + } + + registerHandler(READ); + } + + uint32_t getLoopInvocations() const { + return loopInvocations_; + } + uint32_t getEventInvocations() const { + return eventInvocations_; + } + + private: + EventBase* eventBase_; + uint32_t loopInvocations_; + uint32_t maxLoopInvocations_; + uint32_t eventInvocations_; + uint32_t maxEventInvocations_; +}; + +/** + * Test that EventBase::loop() correctly detects when there are no more events + * left to run. + * + * This uses a single callback, which alternates registering itself as a loop + * callback versus a EventHandler callback. This exercises a regression where + * EventBase::loop() incorrectly exited if there were no more fd handlers + * registered, but a loop callback installed a new fd handler. + */ +TEST(EventBaseTest, LoopTermination) { + EventBase eventBase; + + // Open a pipe and close the write end, + // so the read endpoint will be readable + int pipeFds[2]; + int rc = pipe(pipeFds); + ASSERT_EQ(rc, 0); + close(pipeFds[1]); + TerminateTestCallback callback(&eventBase, pipeFds[0]); + + // Test once where the callback will exit after a loop callback + callback.reset(10, 100); + eventBase.runInLoop(&callback); + eventBase.loop(); + ASSERT_EQ(callback.getLoopInvocations(), 10); + ASSERT_EQ(callback.getEventInvocations(), 9); + + // Test once where the callback will exit after an fd event callback + callback.reset(100, 7); + eventBase.runInLoop(&callback); + eventBase.loop(); + ASSERT_EQ(callback.getLoopInvocations(), 7); + ASSERT_EQ(callback.getEventInvocations(), 7); + + close(pipeFds[0]); +} + +/////////////////////////////////////////////////////////////////////////// +// Tests for latency calculations +/////////////////////////////////////////////////////////////////////////// + +class IdleTimeTimeoutSeries : public TAsyncTimeout { + + public: + + explicit IdleTimeTimeoutSeries(EventBase *base, + std::deque& timeout) : + TAsyncTimeout(base), + timeouts_(0), + timeout_(timeout) { + scheduleTimeout(1); + } + + virtual ~IdleTimeTimeoutSeries() {} + + void timeoutExpired() noexcept { + ++timeouts_; + + if(timeout_.empty()){ + cancelTimeout(); + } else { + uint64_t sleepTime = timeout_.front(); + timeout_.pop_front(); + if (sleepTime) { + usleep(sleepTime); + } + scheduleTimeout(1); + } + } + + int getTimeouts() const { + return timeouts_; + } + + private: + int timeouts_; + std::deque& timeout_; +}; + +/** + * Verify that idle time is correctly accounted for when decaying our loop + * time. + * + * This works by creating a high loop time (via usleep), expecting a latency + * callback with known value, and then scheduling a timeout for later. This + * later timeout is far enough in the future that the idle time should have + * caused the loop time to decay. + */ +TEST(EventBaseTest, IdleTime) { + EventBase eventBase; + eventBase.setLoadAvgMsec(1000); + eventBase.resetLoadAvg(5900.0); + std::deque timeouts0(4, 8080); + timeouts0.push_front(8000); + timeouts0.push_back(14000); + IdleTimeTimeoutSeries tos0(&eventBase, timeouts0); + std::deque timeouts(20, 20); + std::unique_ptr tos; + + int latencyCallbacks = 0; + eventBase.setMaxLatency(6000, [&]() { + ++latencyCallbacks; + + switch (latencyCallbacks) { + case 1: + ASSERT_EQ(6, tos0.getTimeouts()); + ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200); + ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200); + tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts)); + break; + + default: + FAIL() << "Unexpected latency callback"; + break; + } + }); + + // Kick things off with an "immedite" timeout + tos0.scheduleTimeout(1); + + eventBase.loop(); + + ASSERT_EQ(1, latencyCallbacks); + ASSERT_EQ(7, tos0.getTimeouts()); + ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200); + ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200); + ASSERT_EQ(21, tos->getTimeouts()); +} + +/** + * Test that thisLoop functionality works with terminateLoopSoon + */ +TEST(EventBaseTest, ThisLoop) { + EventBase eb; + bool runInLoop = false; + bool runThisLoop = false; + + eb.runInLoop([&](){ + eb.terminateLoopSoon(); + eb.runInLoop([&]() { + runInLoop = true; + }); + eb.runInLoop([&]() { + runThisLoop = true; + }, true); + }, true); + eb.loopForever(); + + // Should not work + ASSERT_FALSE(runInLoop); + // Should work with thisLoop + ASSERT_TRUE(runThisLoop); +} + +TEST(EventBaseTest, EventBaseThreadLoop) { + EventBase base; + bool ran = false; + + base.runInEventBaseThread([&](){ + ran = true; + }); + base.loop(); + + ASSERT_EQ(true, ran); +} + +TEST(EventBaseTest, EventBaseThreadName) { + EventBase base; + base.setName("foo"); + base.loop(); + +#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12) + char name[16]; + pthread_getname_np(pthread_self(), name, 16); + ASSERT_EQ(0, strcmp("foo", name)); +#endif +} diff --git a/folly/io/async/test/HHWheelTimerTest.cpp b/folly/io/async/test/HHWheelTimerTest.cpp new file mode 100644 index 00000000..05a84c8f --- /dev/null +++ b/folly/io/async/test/HHWheelTimerTest.cpp @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include + +#include +#include + +using namespace folly; +using std::chrono::milliseconds; + +typedef UndelayedDestruction StackWheelTimer; + +class TestTimeout : public HHWheelTimer::Callback { + public: + TestTimeout() {} + TestTimeout(HHWheelTimer* t, milliseconds timeout) { + t->scheduleTimeout(this, timeout); + } + virtual void timeoutExpired() noexcept { + timestamps.push_back(TimePoint()); + if (fn) { + fn(); + } + } + + std::deque timestamps; + std::function fn; +}; + +/* + * Test firing some simple timeouts that are fired once and never rescheduled + */ +TEST(HHWheelTimerTest, FireOnce) { + EventBase eventBase; + StackWheelTimer t(&eventBase, milliseconds(1)); + + const HHWheelTimer::Callback* nullCallback = nullptr; + + TestTimeout t1; + TestTimeout t2; + TestTimeout t3; + + ASSERT_EQ(t.count(), 0); + + t.scheduleTimeout(&t1, milliseconds(5)); + t.scheduleTimeout(&t2, milliseconds(5)); + // Verify scheduling it twice cancels, then schedules. + // Should only get one callback. + t.scheduleTimeout(&t2, milliseconds(5)); + t.scheduleTimeout(&t3, milliseconds(10)); + + ASSERT_EQ(t.count(), 3); + + TimePoint start; + eventBase.loop(); + TimePoint end; + + ASSERT_EQ(t1.timestamps.size(), 1); + ASSERT_EQ(t2.timestamps.size(), 1); + ASSERT_EQ(t3.timestamps.size(), 1); + + ASSERT_EQ(t.count(), 0); + + T_CHECK_TIMEOUT(start, t1.timestamps[0], 5); + T_CHECK_TIMEOUT(start, t2.timestamps[0], 5); + T_CHECK_TIMEOUT(start, t3.timestamps[0], 10); + T_CHECK_TIMEOUT(start, end, 10); +} + +/* + * Test cancelling a timeout when it is scheduled to be fired right away. + */ + +TEST(HHWheelTimerTest, CancelTimeout) { + EventBase eventBase; + StackWheelTimer t(&eventBase, milliseconds(1)); + + // Create several timeouts that will all fire in 5ms. + TestTimeout t5_1(&t, milliseconds(5)); + TestTimeout t5_2(&t, milliseconds(5)); + TestTimeout t5_3(&t, milliseconds(5)); + TestTimeout t5_4(&t, milliseconds(5)); + TestTimeout t5_5(&t, milliseconds(5)); + + // Also create a few timeouts to fire in 10ms + TestTimeout t10_1(&t, milliseconds(10)); + TestTimeout t10_2(&t, milliseconds(10)); + TestTimeout t10_3(&t, milliseconds(10)); + + TestTimeout t20_1(&t, milliseconds(20)); + TestTimeout t20_2(&t, milliseconds(20)); + + // Have t5_1 cancel t5_2 and t5_4. + // + // Cancelling t5_2 will test cancelling a timeout that is at the head of the + // list and ready to be fired. + // + // Cancelling t5_4 will test cancelling a timeout in the middle of the list + t5_1.fn = [&] { + t5_2.cancelTimeout(); + t5_4.cancelTimeout(); + }; + + // Have t5_3 cancel t5_5. + // This will test cancelling the last remaining timeout. + // + // Then have t5_3 reschedule itself. + t5_3.fn = [&] { + t5_5.cancelTimeout(); + // Reset our function so we won't continually reschedule ourself + std::function fnDtorGuard; + t5_3.fn.swap(fnDtorGuard); + t.scheduleTimeout(&t5_3, milliseconds(5)); + + // Also test cancelling timeouts in another timeset that isn't ready to + // fire yet. + // + // Cancel the middle timeout in ts10. + t10_2.cancelTimeout(); + // Cancel both the timeouts in ts20. + t20_1.cancelTimeout(); + t20_2.cancelTimeout(); + }; + + TimePoint start; + eventBase.loop(); + TimePoint end; + + ASSERT_EQ(t5_1.timestamps.size(), 1); + T_CHECK_TIMEOUT(start, t5_1.timestamps[0], 5); + + ASSERT_EQ(t5_3.timestamps.size(), 2); + T_CHECK_TIMEOUT(start, t5_3.timestamps[0], 5); + T_CHECK_TIMEOUT(t5_3.timestamps[0], t5_3.timestamps[1], 5); + + ASSERT_EQ(t10_1.timestamps.size(), 1); + T_CHECK_TIMEOUT(start, t10_1.timestamps[0], 10); + ASSERT_EQ(t10_3.timestamps.size(), 1); + T_CHECK_TIMEOUT(start, t10_3.timestamps[0], 10); + + // Cancelled timeouts + ASSERT_EQ(t5_2.timestamps.size(), 0); + ASSERT_EQ(t5_4.timestamps.size(), 0); + ASSERT_EQ(t5_5.timestamps.size(), 0); + ASSERT_EQ(t10_2.timestamps.size(), 0); + ASSERT_EQ(t20_1.timestamps.size(), 0); + ASSERT_EQ(t20_2.timestamps.size(), 0); + + T_CHECK_TIMEOUT(start, end, 10); +} + +/* + * Test destroying a HHWheelTimer with timeouts outstanding + */ + +TEST(HHWheelTimerTest, DestroyTimeoutSet) { + EventBase eventBase; + + HHWheelTimer::UniquePtr t( + new HHWheelTimer(&eventBase, milliseconds(1))); + + TestTimeout t5_1(t.get(), milliseconds(5)); + TestTimeout t5_2(t.get(), milliseconds(5)); + TestTimeout t5_3(t.get(), milliseconds(5)); + + TestTimeout t10_1(t.get(), milliseconds(10)); + TestTimeout t10_2(t.get(), milliseconds(10)); + + // Have t5_2 destroy t + // Note that this will call destroy() inside t's timeoutExpired() + // method. + t5_2.fn = [&] { + t5_3.cancelTimeout(); + t5_1.cancelTimeout(); + t10_1.cancelTimeout(); + t10_2.cancelTimeout(); + t.reset();}; + + TimePoint start; + eventBase.loop(); + TimePoint end; + + ASSERT_EQ(t5_1.timestamps.size(), 1); + T_CHECK_TIMEOUT(start, t5_1.timestamps[0], 5); + ASSERT_EQ(t5_2.timestamps.size(), 1); + T_CHECK_TIMEOUT(start, t5_2.timestamps[0], 5); + + ASSERT_EQ(t5_3.timestamps.size(), 0); + ASSERT_EQ(t10_1.timestamps.size(), 0); + ASSERT_EQ(t10_2.timestamps.size(), 0); + + T_CHECK_TIMEOUT(start, end, 5); +} + +/* + * Test the tick interval parameter + */ +TEST(HHWheelTimerTest, AtMostEveryN) { + EventBase eventBase; + + // Create a timeout set with a 10ms interval, to fire no more than once + // every 3ms. + milliseconds interval(25); + milliseconds atMostEveryN(6); + StackWheelTimer t(&eventBase, atMostEveryN); + t.setCatchupEveryN(70); + + // Create 60 timeouts to be added to ts10 at 1ms intervals. + uint32_t numTimeouts = 60; + std::vector timeouts(numTimeouts); + + // Create a scheduler timeout to add the timeouts 1ms apart. + uint32_t index = 0; + StackWheelTimer ts1(&eventBase, milliseconds(1)); + TestTimeout scheduler(&ts1, milliseconds(1)); + scheduler.fn = [&] { + if (index >= numTimeouts) { + return; + } + // Call timeoutExpired() on the timeout so it will record a timestamp. + // This is done only so we can record when we scheduled the timeout. + // This way if ts1 starts to fall behind a little over time we will still + // be comparing the ts10 timeouts to when they were first scheduled (rather + // than when we intended to schedule them). The scheduler may fall behind + // eventually since we don't really schedule it once every millisecond. + // Each time it finishes we schedule it for 1 millisecond in the future. + // The amount of time it takes to run, and any delays it encounters + // getting scheduled may eventually add up over time. + timeouts[index].timeoutExpired(); + + // Schedule the new timeout + t.scheduleTimeout(&timeouts[index], interval); + // Reschedule ourself + ts1.scheduleTimeout(&scheduler, milliseconds(1)); + ++index; + }; + + // Go ahead and schedule the first timeout now. + //scheduler.fn(); + + TimePoint start; + eventBase.loop(); + TimePoint end; + + // We scheduled timeouts 1ms apart, when the HHWheelTimer is only allowed + // to wake up at most once every 3ms. It will therefore wake up every 3ms + // and fire groups of approximately 3 timeouts at a time. + // + // This is "approximately 3" since it may get slightly behind and fire 4 in + // one interval, etc. T_CHECK_TIMEOUT normally allows a few milliseconds of + // tolerance. We have to add the same into our checking algorithm here. + for (uint32_t idx = 0; idx < numTimeouts; ++idx) { + ASSERT_EQ(timeouts[idx].timestamps.size(), 2); + + TimePoint scheduledTime(timeouts[idx].timestamps[0]); + TimePoint firedTime(timeouts[idx].timestamps[1]); + + // Assert that the timeout fired at roughly the right time. + // T_CHECK_TIMEOUT() normally has a tolerance of 5ms. Allow an additional + // atMostEveryN. + milliseconds tolerance = milliseconds(5) + interval; + T_CHECK_TIMEOUT(scheduledTime, firedTime, atMostEveryN.count(), + tolerance.count()); + + // Assert that the difference between the previous timeout and now was + // either very small (fired in the same event loop), or larger than + // atMostEveryN. + if (idx == 0) { + // no previous value + continue; + } + TimePoint prev(timeouts[idx - 1].timestamps[1]); + + milliseconds delta((firedTime.getTimeStart() - prev.getTimeEnd()) - + (firedTime.getTimeWaiting() - prev.getTimeWaiting())); + if (delta > milliseconds(1)) { + T_CHECK_TIMEOUT(prev, firedTime, atMostEveryN.count()); } + } +} + +/* + * Test an event loop that is blocking + */ + +TEST(HHWheelTimerTest, SlowLoop) { + EventBase eventBase; + StackWheelTimer t(&eventBase, milliseconds(1)); + + TestTimeout t1; + TestTimeout t2; + + ASSERT_EQ(t.count(), 0); + + eventBase.runInLoop([](){usleep(10000);}); + t.scheduleTimeout(&t1, milliseconds(5)); + + ASSERT_EQ(t.count(), 1); + + TimePoint start; + eventBase.loop(); + TimePoint end; + + ASSERT_EQ(t1.timestamps.size(), 1); + ASSERT_EQ(t.count(), 0); + + // Check that the timeout was delayed by sleep + T_CHECK_TIMEOUT(start, t1.timestamps[0], 15, 1); + T_CHECK_TIMEOUT(start, end, 15, 1); + + // Try it again, this time with catchup timing every loop + t.setCatchupEveryN(1); + + eventBase.runInLoop([](){usleep(10000);}); + t.scheduleTimeout(&t2, milliseconds(5)); + + ASSERT_EQ(t.count(), 1); + + TimePoint start2; + eventBase.loop(); + TimePoint end2; + + ASSERT_EQ(t2.timestamps.size(), 1); + ASSERT_EQ(t.count(), 0); + + // Check that the timeout was NOT delayed by sleep + T_CHECK_TIMEOUT(start2, t2.timestamps[0], 10, 1); + T_CHECK_TIMEOUT(start2, end2, 10, 1); +} diff --git a/folly/io/async/test/SocketPair.cpp b/folly/io/async/test/SocketPair.cpp new file mode 100644 index 00000000..73df89b7 --- /dev/null +++ b/folly/io/async/test/SocketPair.cpp @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace folly { + +SocketPair::SocketPair(Mode mode) { + if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds_) != 0) { + throw std::runtime_error( + folly::to("test::SocketPair: failed create socket pair", + errno)); + } + + if (mode == NONBLOCKING) { + if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) != 0) { + throw std::runtime_error( + folly::to("test::SocketPair: failed to set non-blocking " + "read mode", errno)); + } + if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) != 0) { + throw std::runtime_error( + folly::to("test::SocketPair: failed to set non-blocking " + "write mode", errno)); + } + } +} + +SocketPair::~SocketPair() { + closeFD0(); + closeFD1(); +} + +void SocketPair::closeFD0() { + if (fds_[0] >= 0) { + close(fds_[0]); + fds_[0] = -1; + } +} + +void SocketPair::closeFD1() { + if (fds_[1] >= 0) { + close(fds_[1]); + fds_[1] = -1; + } +} + +} diff --git a/folly/io/async/test/SocketPair.h b/folly/io/async/test/SocketPair.h new file mode 100644 index 00000000..81cf6b56 --- /dev/null +++ b/folly/io/async/test/SocketPair.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +namespace folly { + +class SocketPair { + public: + enum Mode { + BLOCKING, + NONBLOCKING + }; + + explicit SocketPair(Mode mode = NONBLOCKING); + ~SocketPair(); + + int operator[](int index) const { + return fds_[index]; + } + + void closeFD0(); + void closeFD1(); + + int extractFD0() { + return extractFD(0); + } + int extractFD1() { + return extractFD(1); + } + int extractFD(int index) { + int fd = fds_[index]; + fds_[index] = -1; + return fd; + } + + private: + int fds_[2]; +}; + +} diff --git a/folly/io/async/test/TimeUtil.cpp b/folly/io/async/test/TimeUtil.cpp new file mode 100644 index 00000000..9a60a0e5 --- /dev/null +++ b/folly/io/async/test/TimeUtil.cpp @@ -0,0 +1,237 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define __STDC_FORMAT_MACROS + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using std::string; +using namespace std::chrono; + +namespace folly { + +/** + * glibc doesn't provide gettid(), so define it ourselves. + */ +static pid_t gettid() { + return syscall(SYS_gettid); +} + +/** + * The /proc//schedstat file reports time values in jiffies. + * + * Determine how many jiffies are in a second. + * Returns -1 if the number of jiffies/second cannot be determined. + */ +static int64_t determineJiffiesHZ() { + // It seems like the only real way to figure out the CONFIG_HZ value used by + // this kernel is to look it up in the config file. + // + // Look in /boot/config- + struct utsname unameInfo; + if (uname(&unameInfo) != 0) { + LOG(ERROR) << "unable to determine jiffies/second: uname failed: %s" + << strerror(errno); + return -1; + } + + char configPath[256]; + snprintf(configPath, sizeof(configPath), "/boot/config-%s", + unameInfo.release); + + FILE* f = fopen(configPath, "r"); + if (f == nullptr) { + LOG(ERROR) << "unable to determine jiffies/second: " + "cannot open kernel config file %s" << configPath; + return -1; + } + + int64_t hz = -1; + char buf[1024]; + while (fgets(buf, sizeof(buf), f) != nullptr) { + if (strcmp(buf, "CONFIG_NO_HZ=y\n") == 0) { + // schedstat info seems to be reported in nanoseconds on tickless + // kernels. + // + // The CONFIG_HZ value doesn't matter for our purposes, + // so return as soon as we see CONFIG_NO_HZ. + fclose(f); + return 1000000000; + } else if (strcmp(buf, "CONFIG_HZ=1000\n") == 0) { + hz = 1000; + } else if (strcmp(buf, "CONFIG_HZ=300\n") == 0) { + hz = 300; + } else if (strcmp(buf, "CONFIG_HZ=250\n") == 0) { + hz = 250; + } else if (strcmp(buf, "CONFIG_HZ=100\n") == 0) { + hz = 100; + } + } + fclose(f); + + if (hz == -1) { + LOG(ERROR) << "unable to determine jiffies/second: no CONFIG_HZ setting " + "found in %s" << configPath; + return -1; + } + + return hz; +} + +/** + * Determine how long this process has spent waiting to get scheduled on the + * CPU. + * + * Returns the number of milliseconds spent waiting, or -1 if the amount of + * time cannot be determined. + */ +static milliseconds getTimeWaitingMS(pid_t tid) { + static int64_t jiffiesHZ = 0; + if (jiffiesHZ == 0) { + jiffiesHZ = determineJiffiesHZ(); + } + + if (jiffiesHZ < 0) { + // We couldn't figure out how many jiffies there are in a second. + // Don't bother reading the schedstat info if we can't interpret it. + return milliseconds(0); + } + + int fd = -1; + try { + char schedstatFile[256]; + snprintf(schedstatFile, sizeof(schedstatFile), + "/proc/%d/schedstat", tid); + fd = open(schedstatFile, O_RDONLY); + if (fd < 0) { + throw std::runtime_error( + folly::to("failed to open process schedstat file", errno)); + } + + char buf[512]; + ssize_t bytesReadRet = read(fd, buf, sizeof(buf) - 1); + if (bytesReadRet <= 0) { + throw std::runtime_error( + folly::to("failed to read process schedstat file", errno)); + } + size_t bytesRead = size_t(bytesReadRet); + + if (buf[bytesRead - 1] != '\n') { + throw std::runtime_error("expected newline at end of schedstat data"); + } + assert(bytesRead < sizeof(buf)); + buf[bytesRead] = '\0'; + + uint64_t activeJiffies = 0; + uint64_t waitingJiffies = 0; + uint64_t numTasks = 0; + int rc = sscanf(buf, "%" PRIu64 " %" PRIu64 " %" PRIu64 "\n", + &activeJiffies, &waitingJiffies, &numTasks); + if (rc != 3) { + throw std::runtime_error("failed to parse schedstat data"); + } + + close(fd); + return milliseconds((waitingJiffies * 1000) / jiffiesHZ); + } catch (const std::runtime_error& e) { + if (fd >= 0) { + close(fd); + } + LOG(ERROR) << "error determining process wait time: %s" << e.what(); + return milliseconds(0); + } +} + +void TimePoint::reset() { + // Remember the current time + timeStart_ = system_clock::now(); + + // Remember how long this process has spent waiting to be scheduled + tid_ = gettid(); + timeWaiting_ = getTimeWaitingMS(tid_); + + // In case it took a while to read the schedstat info, + // also record the time after the schedstat check + timeEnd_ = system_clock::now(); +} + +std::ostream& operator<<(std::ostream& os, const TimePoint& timePoint) { + os << "TimePoint(" << timePoint.getTimeStart().time_since_epoch().count() + << ", " << timePoint.getTimeEnd().time_since_epoch().count() << ", " + << timePoint.getTimeWaiting().count() << ")"; + return os; +} + +bool +checkTimeout(const TimePoint& start, const TimePoint& end, + milliseconds expectedMS, bool allowSmaller, + milliseconds tolerance) { + auto elapsedMS = end.getTimeStart() - start.getTimeEnd(); + + if (!allowSmaller) { + // Timeouts should never fire before the time was up. + // Allow 1ms of wiggle room for rounding errors. + if (elapsedMS < expectedMS - milliseconds(1)) { + return false; + } + } + + // Check that the event fired within a reasonable time of the timout. + // + // If the system is under heavy load, our process may have had to wait for a + // while to be run. The time spent waiting for the processor shouldn't + // count against us, so exclude this time from the check. + milliseconds excludedMS; + if (end.getTid() != start.getTid()) { + // We can only correctly compute the amount of time waiting to be scheduled + // if both TimePoints were set in the same thread. + excludedMS = milliseconds(0); + } else { + excludedMS = end.getTimeWaiting() - start.getTimeWaiting(); + assert(end.getTimeWaiting() >= start.getTimeWaiting()); + // Add a tolerance here due to precision issues on linux, see below note. + assert( (elapsedMS + tolerance) >= excludedMS); + } + + milliseconds effectiveElapsedMS = milliseconds(0); + if (elapsedMS > excludedMS) { + effectiveElapsedMS = duration_cast(elapsedMS) - excludedMS; + } + + // On x86 Linux, sleep calls generally have precision only to the nearest + // millisecond. The tolerance parameter lets users allow a few ms of slop. + milliseconds overrun = effectiveElapsedMS - expectedMS; + if (overrun > tolerance) { + return false; + } + + return true; +} + +} diff --git a/folly/io/async/test/TimeUtil.h b/folly/io/async/test/TimeUtil.h new file mode 100644 index 00000000..ca9c0437 --- /dev/null +++ b/folly/io/async/test/TimeUtil.h @@ -0,0 +1,76 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { + +class TimePoint { + public: + explicit TimePoint(bool set = true) + : tid_(0) { + if (set) { + reset(); + } + } + + void reset(); + + bool isUnset() const { + return (timeStart_.time_since_epoch().count() == 0 && + timeEnd_.time_since_epoch().count() == 0 && + timeWaiting_.count() == 0); + } + + std::chrono::system_clock::time_point getTime() const { + return timeStart_; + } + + std::chrono::system_clock::time_point getTimeStart() const { + return timeStart_; + } + + std::chrono::system_clock::time_point getTimeEnd() const { + return timeStart_; + } + + std::chrono::milliseconds getTimeWaiting() const { + return timeWaiting_; + } + + pid_t getTid() const { + return tid_; + } + + private: + std::chrono::system_clock::time_point timeStart_; + std::chrono::system_clock::time_point timeEnd_; + std::chrono::milliseconds timeWaiting_; + pid_t tid_; +}; + +std::ostream& operator<<(std::ostream& os, const TimePoint& timePoint); + +bool checkTimeout(const TimePoint& start, + const TimePoint& end, + std::chrono::milliseconds expectedMS, + bool allowSmaller, + std::chrono::milliseconds tolerance = + std::chrono::milliseconds(5)); + +} diff --git a/folly/io/async/test/UndelayedDestruction.h b/folly/io/async/test/UndelayedDestruction.h new file mode 100644 index 00000000..952b38ce --- /dev/null +++ b/folly/io/async/test/UndelayedDestruction.h @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include + +namespace folly { + +/** + * A helper class to allow a DelayedDestruction object to be instantiated on + * the stack. + * + * This class derives from an existing DelayedDestruction type and makes the + * destructor public again. This allows objects of this type to be declared on + * the stack or directly inside another class. Normally DelayedDestruction + * objects must be dynamically allocated on the heap. + * + * However, the trade-off is that you lose some of the protections provided by + * DelayedDestruction::destroy(). DelayedDestruction::destroy() will + * automatically delay destruction of the object until it is safe to do so. + * If you use UndelayedDestruction, you become responsible for ensuring that + * you only destroy the object where it is safe to do so. Attempting to + * destroy a UndelayedDestruction object while it has a non-zero destructor + * guard count will abort the program. + */ +template +class UndelayedDestruction : public TDD { + public: + // We could just use constructor inheritance, but not all compilers + // support that. So, just use a forwarding constructor. + // + // Ideally we would use std::enable_if<> and std::is_constructible<> to + // provide only constructor methods that are valid for our parent class. + // Unfortunately std::is_constructible<> doesn't work for types that aren't + // destructible. In gcc-4.6 it results in a compiler error. In the latest + // gcc code it looks like it has been fixed to return false. (The language + // in the standard seems to indicate that returning false is the correct + // behavior for non-destructible types, which is unfortunate.) + template + explicit UndelayedDestruction(Args&& ...args) + : TDD(std::forward(args)...) {} + + /** + * Public destructor. + * + * The caller is responsible for ensuring that the object is only destroyed + * where it is safe to do so. (i.e., when the destructor guard count is 0). + * + * The exact conditions for meeting this may be dependant upon your class + * semantics. Typically you are only guaranteed that it is safe to destroy + * the object directly from the event loop (e.g., directly from a + * TEventBase::LoopCallback), or when the event loop is stopped. + */ + virtual ~UndelayedDestruction() { + // Crash if the caller is destroying us with outstanding destructor guards. + if (this->getDestructorGuardCount() != 0) { + abort(); + } + // Invoke destroy. This is necessary since our base class may have + // implemented custom behavior in destroy(). + this->destroy(); + } + + protected: + /** + * Override our parent's destroy() method to make it protected. + * Callers should use the normal destructor instead of destroy + */ + virtual void destroy() { + this->TDD::destroy(); + } + + virtual void destroyNow(bool delayed) { + // Do nothing. This will always be invoked from the call to destroy inside + // our destructor. + assert(!delayed); + // prevent unused variable warnings when asserts are compiled out. + (void)delayed; + } + + private: + // Forbidden copy constructor and assignment operator + UndelayedDestruction(UndelayedDestruction const &) = delete; + UndelayedDestruction& operator=(UndelayedDestruction const &) = delete; +}; + +} diff --git a/folly/io/async/test/Util.h b/folly/io/async/test/Util.h new file mode 100644 index 00000000..e0b7360e --- /dev/null +++ b/folly/io/async/test/Util.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +/** + * Check how long a timeout took to fire. + * + * This method verifies: + * - that the timeout did not fire too early (never less than expectedMS) + * - that the timeout fired within a reasonable period of the expected + * duration. It must fire within the specified tolerance, excluding time + * that this process spent waiting to be scheduled. + * + * @param start A TimePoint object set just before the timeout + * was scheduled. + * @param end A TimePoint object set when the timeout fired. + * @param expectedMS The timeout duration, in milliseconds + * @param tolerance The tolerance, in milliseconds. + */ +#define T_CHECK_TIMEOUT(start, end, expectedMS, ...) \ + EXPECT_TRUE(::folly::checkTimeout((start), (end), \ + (expectedMS), false, \ + ##__VA_ARGS__)) + +/** + * Verify that an event took less than a specified amount of time. + * + * This is similar to T_CHECK_TIMEOUT, but does not fail if the event took less + * than the allowed time. + */ +#define T_CHECK_TIME_LT(start, end, expectedMS, ...) \ + EXPECT_TRUE(::folly::checkTimeout((start), (end), \ + (expectedMS), true, \ + ##__VA_ARGS__))