From: Yedidya Feldblum Date: Thu, 2 Jul 2015 20:58:58 +0000 (-0700) Subject: Lift thrift/lib/cpp/test/TNotificationQueueTest. X-Git-Tag: v0.49.0~4 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=4f6b99265ace26691d34b78ce51acddbf598d2ab;p=folly.git Lift thrift/lib/cpp/test/TNotificationQueueTest. Summary: [Folly] Lift thrift/lib/cpp/test/TNotificationQueueTest. `NotificationQueue` is already moved into folly; move its accompanying test suite as well. Reviewed By: @simpkins Differential Revision: D2207104 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index f0eaa9c9..ce8b6a78 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -201,6 +201,7 @@ nobase_follyinclude_HEADERS = \ io/async/HHWheelTimer.h \ io/async/Request.h \ io/async/SSLContext.h \ + io/async/ScopedEventBaseThread.h \ io/async/TimeoutManager.h \ io/async/test/AsyncSSLSocketTest.h \ io/async/test/BlockingSocket.h \ @@ -385,6 +386,7 @@ libfolly_la_SOURCES = \ io/async/EventBaseManager.cpp \ io/async/EventHandler.cpp \ io/async/SSLContext.cpp \ + io/async/ScopedEventBaseThread.cpp \ io/async/HHWheelTimer.cpp \ io/async/test/TimeUtil.cpp \ json.cpp \ diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h index 6bdf16fa..46c6a891 100644 --- a/folly/io/async/NotificationQueue.h +++ b/folly/io/async/NotificationQueue.h @@ -120,7 +120,7 @@ class NotificationQueue { * @returns true if the queue was drained, false otherwise. In practice, * this will only fail if someone else is already draining the queue. */ - bool consumeUntilDrained() noexcept; + bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept; /** * Get the NotificationQueue that this consumer is currently consuming @@ -165,7 +165,7 @@ class NotificationQueue { * * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation. */ - void consumeMessages(bool isDrain) noexcept; + void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept; void setActive(bool active, bool shouldLock = false) { if (!queue_) { @@ -595,11 +595,16 @@ void NotificationQueue::Consumer::handlerReady(uint16_t events) template void NotificationQueue::Consumer::consumeMessages( - bool isDrain) noexcept { + bool isDrain, size_t* numConsumed) noexcept { uint32_t numProcessed = 0; bool firstRun = true; setActive(true); SCOPE_EXIT { setActive(false, /* shouldLock = */ true); }; + SCOPE_EXIT { + if (numConsumed != nullptr) { + *numConsumed = numProcessed; + } + }; while (true) { // Try to decrement the eventfd. // @@ -760,7 +765,8 @@ void NotificationQueue::Consumer::stopConsuming() { } template -bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { +bool NotificationQueue::Consumer::consumeUntilDrained( + size_t* numConsumed) noexcept { { folly::SpinLockGuard g(queue_->spinlock_); if (queue_->draining_) { @@ -768,7 +774,7 @@ bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { } queue_->draining_ = true; } - consumeMessages(true); + consumeMessages(true, numConsumed); { folly::SpinLockGuard g(queue_->spinlock_); queue_->draining_ = false; diff --git a/folly/io/async/ScopedEventBaseThread.cpp b/folly/io/async/ScopedEventBaseThread.cpp new file mode 100644 index 00000000..29a44592 --- /dev/null +++ b/folly/io/async/ScopedEventBaseThread.cpp @@ -0,0 +1,66 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +using namespace std; + +namespace folly { + +ScopedEventBaseThread::ScopedEventBaseThread(bool autostart) { + if (autostart) { + start(); + } +} + +ScopedEventBaseThread::~ScopedEventBaseThread() { + stop(); +} + +ScopedEventBaseThread::ScopedEventBaseThread( + ScopedEventBaseThread&& other) noexcept = default; + +ScopedEventBaseThread& ScopedEventBaseThread::operator=( + ScopedEventBaseThread&& other) noexcept = default; + +void ScopedEventBaseThread::start() { + if (running()) { + return; + } + eventBase_ = make_unique(); + thread_ = make_unique(&EventBase::loopForever, &*eventBase_); + eventBase_->waitUntilRunning(); +} + +void ScopedEventBaseThread::stop() { + if (!running()) { + return; + } + eventBase_->terminateLoopSoon(); + thread_->join(); + eventBase_ = nullptr; + thread_ = nullptr; +} + +bool ScopedEventBaseThread::running() { + CHECK(bool(eventBase_) == bool(thread_)); + return eventBase_ && thread_; +} + +} diff --git a/folly/io/async/ScopedEventBaseThread.h b/folly/io/async/ScopedEventBaseThread.h new file mode 100644 index 00000000..60d62d79 --- /dev/null +++ b/folly/io/async/ScopedEventBaseThread.h @@ -0,0 +1,62 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace std { +class thread; +} + +namespace folly { + +/** + * A helper class to start a new thread running a TEventBase loop. + * + * The new thread will be started by the ScopedEventBaseThread constructor. + * When the ScopedEventBaseThread object is destroyed, the thread will be + * stopped. + */ +class ScopedEventBaseThread { + public: + explicit ScopedEventBaseThread(bool autostart = true); + ~ScopedEventBaseThread(); + + ScopedEventBaseThread(ScopedEventBaseThread&& other) noexcept; + ScopedEventBaseThread &operator=(ScopedEventBaseThread&& other) noexcept; + + /** + * Get a pointer to the TEventBase driving this thread. + */ + EventBase* getEventBase() const { + return eventBase_.get(); + } + + void start(); + void stop(); + bool running(); + + private: + ScopedEventBaseThread(const ScopedEventBaseThread& other) = delete; + ScopedEventBaseThread& operator=(const ScopedEventBaseThread& other) = delete; + + std::unique_ptr eventBase_; + std::unique_ptr thread_; +}; + +} diff --git a/folly/io/async/test/NotificationQueueTest.cpp b/folly/io/async/test/NotificationQueueTest.cpp new file mode 100644 index 00000000..c831ee10 --- /dev/null +++ b/folly/io/async/test/NotificationQueueTest.cpp @@ -0,0 +1,642 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +using namespace std; +using namespace folly; + +typedef NotificationQueue IntQueue; + +class QueueConsumer : public IntQueue::Consumer { + public: + QueueConsumer() {} + + void messageAvailable(int&& value) override { + messages.push_back(value); + if (fn) { + fn(value); + } + } + + std::function fn; + std::deque messages; +}; + +class QueueTest { + public: + explicit QueueTest(uint32_t maxSize = 0, + IntQueue::FdType type = IntQueue::FdType::EVENTFD) : + queue(maxSize, type), + terminationQueue(maxSize, type) + {} + + void sendOne(); + void putMessages(); + void multiConsumer(); + void maxQueueSize(); + void maxReadAtOnce(); + void destroyCallback(); + void useAfterFork(); + + IntQueue queue; + IntQueue terminationQueue; + +}; + +void QueueTest::sendOne() { + // Create a notification queue and a callback in this thread + EventBase eventBase; + + QueueConsumer consumer; + consumer.fn = [&](int) { + // Stop consuming after we receive 1 message + consumer.stopConsuming(); + }; + consumer.startConsuming(&eventBase, &queue); + + // Start a new EventBase thread to put a message on our queue + ScopedEventBaseThread t1; + t1.getEventBase()->runInEventBaseThread([&] { + queue.putMessage(5); + }); + + // Loop until we receive the message + eventBase.loop(); + + const auto& messages = consumer.messages; + EXPECT_EQ(1, messages.size()); + EXPECT_EQ(5, messages.at(0)); +} + +void QueueTest::putMessages() { + EventBase eventBase; + + QueueConsumer consumer; + QueueConsumer consumer2; + consumer.fn = [&](int msg) { + // Stop consuming after we receive a message with value 0, and start + // consumer2 + if (msg == 0) { + consumer.stopConsuming(); + consumer2.startConsuming(&eventBase, &queue); + } + }; + consumer2.fn = [&](int msg) { + // Stop consuming after we receive a message with value 0 + if (msg == 0) { + consumer2.stopConsuming(); + } + }; + consumer.startConsuming(&eventBase, &queue); + + list msgList = { 1, 2, 3, 4 }; + vector msgVector = { 5, 0, 9, 8, 7, 6, 7, 7, + 8, 8, 2, 9, 6, 6, 10, 2, 0 }; + // Call putMessages() several times to add messages to the queue + queue.putMessages(msgList.begin(), msgList.end()); + queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4); + // Test sending 17 messages, the pipe-based queue calls write in 16 byte + // chunks + queue.putMessages(msgVector.begin(), msgVector.end()); + + // Loop until the consumer has stopped + eventBase.loop(); + + vector expectedMessages = { 1, 2, 3, 4, 9, 8, 7, 5, 0 }; + vector expectedMessages2 = { 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0 }; + EXPECT_EQ(expectedMessages.size(), consumer.messages.size()); + for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) { + EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx)); + } + EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size()); + for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) { + EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx)); + } +} + +void QueueTest::multiConsumer() { + uint32_t numConsumers = 8; + uint32_t numMessages = 10000; + + // Create several consumers each running in their own EventBase thread + vector consumers(numConsumers); + vector threads(numConsumers); + + for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) { + QueueConsumer* consumer = &consumers[consumerIdx]; + + consumer->fn = [consumer, consumerIdx, this](int value) { + // Treat 0 as a signal to stop. + if (value == 0) { + consumer->stopConsuming(); + // Put a message on the terminationQueue to indicate we have stopped + terminationQueue.putMessage(consumerIdx); + } + }; + + EventBase* eventBase = threads[consumerIdx].getEventBase(); + eventBase->runInEventBaseThread([eventBase, consumer, this] { + consumer->startConsuming(eventBase, &queue); + }); + } + + // Now add a number of messages from this thread + // Start at 1 rather than 0, since 0 is the signal to stop. + for (uint32_t n = 1; n < numMessages; ++n) { + queue.putMessage(n); + } + // Now add a 0 for each consumer, to signal them to stop + for (uint32_t n = 0; n < numConsumers; ++n) { + queue.putMessage(0); + } + + // Wait until we get notified that all of the consumers have stopped + // We use a separate notification queue for this. + QueueConsumer terminationConsumer; + vector consumersStopped(numConsumers, 0); + uint32_t consumersRemaining = numConsumers; + terminationConsumer.fn = [&](int consumerIdx) { + --consumersRemaining; + if (consumersRemaining == 0) { + terminationConsumer.stopConsuming(); + } + + EXPECT_GE(consumerIdx, 0); + EXPECT_LT(consumerIdx, numConsumers); + ++consumersStopped[consumerIdx]; + }; + EventBase eventBase; + terminationConsumer.startConsuming(&eventBase, &terminationQueue); + eventBase.loop(); + + // Verify that we saw exactly 1 stop message for each consumer + for (uint32_t n = 0; n < numConsumers; ++n) { + EXPECT_EQ(1, consumersStopped[n]); + } + + // Validate that every message sent to the main queue was received exactly + // once. + vector messageCount(numMessages, 0); + for (uint32_t n = 0; n < numConsumers; ++n) { + for (int msg : consumers[n].messages) { + EXPECT_GE(msg, 0); + EXPECT_LT(msg, numMessages); + ++messageCount[msg]; + } + } + + // 0 is the signal to stop, and should have been received once by each + // consumer + EXPECT_EQ(numConsumers, messageCount[0]); + // All other messages should have been received exactly once + for (uint32_t n = 1; n < numMessages; ++n) { + EXPECT_EQ(1, messageCount[n]); + } +} + +void QueueTest::maxQueueSize() { + // Create a queue with a maximum size of 5, and fill it up + + for (int n = 0; n < 5; ++n) { + queue.tryPutMessage(n); + } + + // Calling tryPutMessage() now should fail + EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error); + + EXPECT_FALSE(queue.tryPutMessageNoThrow(5)); + int val = 5; + EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val))); + + // Pop a message from the queue + int result = -1; + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(0, result); + + // We should be able to write another message now that we popped one off. + queue.tryPutMessage(5); + // But now we are full again. + EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error); + // putMessage() should let us exceed the maximum + queue.putMessage(6); + + // Pull another mesage off + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(1, result); + + // tryPutMessage() should still fail since putMessage() actually put us over + // the max. + EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error); + + // Pull another message off and try again + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(2, result); + queue.tryPutMessage(7); + + // Now pull all the remaining messages off + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(3, result); + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(4, result); + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(5, result); + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(6, result); + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(7, result); + + // There should be no messages left + result = -1; + EXPECT_TRUE(!queue.tryConsume(result)); + EXPECT_EQ(-1, result); +} + + +void QueueTest::maxReadAtOnce() { + // Add 100 messages to the queue + for (int n = 0; n < 100; ++n) { + queue.putMessage(n); + } + + EventBase eventBase; + + // Record how many messages were processed each loop iteration. + uint32_t messagesThisLoop = 0; + std::vector messagesPerLoop; + std::function loopFinished = [&] { + // Record the current number of messages read this loop + messagesPerLoop.push_back(messagesThisLoop); + // Reset messagesThisLoop to 0 for the next loop + messagesThisLoop = 0; + + // To prevent use-after-free bugs when eventBase destructs, + // prevent calling runInLoop any more after the test is finished. + // 55 == number of times loop should run. + if (messagesPerLoop.size() != 55) { + // Reschedule ourself to run at the end of the next loop + eventBase.runInLoop(loopFinished); + } + }; + // Schedule the first call to loopFinished + eventBase.runInLoop(loopFinished); + + QueueConsumer consumer; + // Read the first 50 messages 10 at a time. + consumer.setMaxReadAtOnce(10); + consumer.fn = [&](int value) { + ++messagesThisLoop; + // After 50 messages, drop to reading only 1 message at a time. + if (value == 50) { + consumer.setMaxReadAtOnce(1); + } + // Terminate the loop when we reach the end of the messages. + if (value == 99) { + eventBase.terminateLoopSoon(); + } + }; + consumer.startConsuming(&eventBase, &queue); + + // Run the event loop until the consumer terminates it + eventBase.loop(); + + // The consumer should have read all 100 messages in order + EXPECT_EQ(100, consumer.messages.size()); + for (int n = 0; n < 100; ++n) { + EXPECT_EQ(n, consumer.messages.at(n)); + } + + // Currently EventBase happens to still run the loop callbacks even after + // terminateLoopSoon() is called. However, we don't really want to depend on + // this behavior. In case this ever changes in the future, add + // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the + // last loop iteration. + if (messagesThisLoop > 0) { + messagesPerLoop.push_back(messagesThisLoop); + messagesThisLoop = 0; + } + + // For the first 5 loops it should have read 10 messages each time. + // After that it should have read 1 messages per loop for the next 50 loops. + EXPECT_EQ(55, messagesPerLoop.size()); + for (int n = 0; n < 5; ++n) { + EXPECT_EQ(10, messagesPerLoop.at(n)); + } + for (int n = 5; n < 55; ++n) { + EXPECT_EQ(1, messagesPerLoop.at(n)); + } +} + + +void QueueTest::destroyCallback() { + // Rather than using QueueConsumer, define a separate class for the destroy + // test. The DestroyTestConsumer will delete itself inside the + // messageAvailable() callback. With a regular QueueConsumer this would + // destroy the std::function object while the function is running, which we + // should probably avoid doing. This uses a pointer to a std::function to + // avoid destroying the function object. + class DestroyTestConsumer : public IntQueue::Consumer { + public: + DestroyTestConsumer() {} + + void messageAvailable(int&& value) override { + if (fn && *fn) { + (*fn)(value); + } + } + + std::function *fn; + }; + + EventBase eventBase; + // Create a queue and add 2 messages to it + queue.putMessage(1); + queue.putMessage(2); + + // Create two QueueConsumers allocated on the heap. + // Have whichever one gets called first destroy both of the QueueConsumers. + // This way one consumer will be destroyed from inside its messageAvailable() + // callback, and one consume will be destroyed when it isn't inside + // messageAvailable(). + std::unique_ptr consumer1(new DestroyTestConsumer); + std::unique_ptr consumer2(new DestroyTestConsumer); + std::function fn = [&](int) { + consumer1.reset(); + consumer2.reset(); + }; + consumer1->fn = &fn; + consumer2->fn = &fn; + + consumer1->startConsuming(&eventBase, &queue); + consumer2->startConsuming(&eventBase, &queue); + + // Run the event loop. + eventBase.loop(); + + // One of the consumers should have fired, received the message, + // then destroyed both consumers. + EXPECT_TRUE(!consumer1); + EXPECT_TRUE(!consumer2); + // One message should be left in the queue + int result = 1; + EXPECT_TRUE(queue.tryConsume(result)); + EXPECT_EQ(2, result); +} + +TEST(NotificationQueueTest, ConsumeUntilDrained) { + // Basic tests: make sure we + // - drain all the messages + // - ignore any maxReadAtOnce + // - can't add messages during draining + EventBase eventBase; + IntQueue queue; + QueueConsumer consumer; + consumer.fn = [&](int i) { + EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error); + EXPECT_FALSE(queue.tryPutMessageNoThrow(i)); + EXPECT_THROW(queue.putMessage(i), std::runtime_error); + std::vector ints{1, 2, 3}; + EXPECT_THROW( + queue.putMessages(ints.begin(), ints.end()), + std::runtime_error); + }; + consumer.setMaxReadAtOnce(10); // We should ignore this + consumer.startConsuming(&eventBase, &queue); + for (int i = 0; i < 20; i++) { + queue.putMessage(i); + } + EXPECT_TRUE(consumer.consumeUntilDrained()); + EXPECT_EQ(20, consumer.messages.size()); + + // Make sure there can only be one drainer at once + folly::Baton<> callbackBaton, threadStartBaton; + consumer.fn = [&](int i) { + callbackBaton.wait(); + }; + QueueConsumer competingConsumer; + competingConsumer.startConsuming(&eventBase, &queue); + queue.putMessage(1); + atomic raceA {false}; + atomic raceB {false}; + size_t numConsA = 0; + size_t numConsB = 0; + auto thread = std::thread([&]{ + threadStartBaton.post(); + raceB = consumer.consumeUntilDrained(&numConsB) && numConsB; + }); + threadStartBaton.wait(); + raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA; + callbackBaton.post(); + thread.join(); + EXPECT_FALSE(raceA && raceB); + EXPECT_TRUE(raceA || raceB); + EXPECT_TRUE(raceA ^ raceB); +} + +TEST(NotificationQueueTest, ConsumeUntilDrainedStress) { + for (size_t i = 0; i < 1 << 8; ++i) { + // Basic tests: make sure we + // - drain all the messages + // - ignore any maxReadAtOnce + // - can't add messages during draining + EventBase eventBase; + IntQueue queue; + QueueConsumer consumer; + consumer.fn = [&](int i) { + EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error); + EXPECT_FALSE(queue.tryPutMessageNoThrow(i)); + EXPECT_THROW(queue.putMessage(i), std::runtime_error); + std::vector ints{1, 2, 3}; + EXPECT_THROW( + queue.putMessages(ints.begin(), ints.end()), + std::runtime_error); + }; + consumer.setMaxReadAtOnce(10); // We should ignore this + consumer.startConsuming(&eventBase, &queue); + for (int i = 0; i < 20; i++) { + queue.putMessage(i); + } + EXPECT_TRUE(consumer.consumeUntilDrained()); + EXPECT_EQ(20, consumer.messages.size()); + + // Make sure there can only be one drainer at once + folly::Baton<> callbackBaton, threadStartBaton; + consumer.fn = [&](int i) { + callbackBaton.wait(); + }; + QueueConsumer competingConsumer; + competingConsumer.startConsuming(&eventBase, &queue); + queue.putMessage(1); + atomic raceA {false}; + atomic raceB {false}; + size_t numConsA = 0; + size_t numConsB = 0; + auto thread = std::thread([&]{ + threadStartBaton.post(); + raceB = consumer.consumeUntilDrained(&numConsB) && numConsB; + }); + threadStartBaton.wait(); + raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA; + callbackBaton.post(); + thread.join(); + EXPECT_FALSE(raceA && raceB); + EXPECT_TRUE(raceA || raceB); + EXPECT_TRUE(raceA ^ raceB); + } +} + +TEST(NotificationQueueTest, SendOne) { + QueueTest qt; + qt.sendOne(); +} + +TEST(NotificationQueueTest, PutMessages) { + QueueTest qt; + qt.sendOne(); +} + +TEST(NotificationQueueTest, MultiConsumer) { + QueueTest qt; + qt.multiConsumer(); +} + +TEST(NotificationQueueTest, MaxQueueSize) { + QueueTest qt(5); + qt.maxQueueSize(); +} + +TEST(NotificationQueueTest, MaxReadAtOnce) { + QueueTest qt; + qt.maxReadAtOnce(); +} + +TEST(NotificationQueueTest, DestroyCallback) { + QueueTest qt; + qt.destroyCallback(); +} + +TEST(NotificationQueueTest, SendOnePipe) { + QueueTest qt(0, IntQueue::FdType::PIPE); + qt.sendOne(); +} + +TEST(NotificationQueueTest, PutMessagesPipe) { + QueueTest qt(0, IntQueue::FdType::PIPE); + qt.sendOne(); +} + +TEST(NotificationQueueTest, MultiConsumerPipe) { + QueueTest qt(0, IntQueue::FdType::PIPE); + qt.multiConsumer(); +} + +TEST(NotificationQueueTest, MaxQueueSizePipe) { + QueueTest qt(5, IntQueue::FdType::PIPE); + qt.maxQueueSize(); +} + +TEST(NotificationQueueTest, MaxReadAtOncePipe) { + QueueTest qt(0, IntQueue::FdType::PIPE); + qt.maxReadAtOnce(); +} + +TEST(NotificationQueueTest, DestroyCallbackPipe) { + QueueTest qt(0, IntQueue::FdType::PIPE); + qt.destroyCallback(); +} + +/* + * Test code that creates a TNotificationQueue, then forks, and incorrectly + * tries to send a message to the queue from the child process. + * + * The child process should crash in this scenario, since the child code has a + * bug. (Older versions of TNotificationQueue didn't catch this in the child, + * resulting in a crash in the parent process.) + */ +TEST(NotificationQueueTest, UseAfterFork) { + IntQueue queue; + int childStatus = 0; + QueueConsumer consumer; + + // Boost sets a custom SIGCHLD handler, which fails the test if a child + // process exits abnormally. We don't want this. + signal(SIGCHLD, SIG_DFL); + + // Log some info so users reading the test output aren't confused + // by the child process' crash log messages. + LOG(INFO) << "This test makes sure the child process crashes. " + << "Error log messagges and a backtrace are expected."; + + { + // Start a separate thread consuming from the queue + ScopedEventBaseThread t1; + t1.getEventBase()->runInEventBaseThread([&] { + consumer.startConsuming(t1.getEventBase(), &queue); + }); + + // Send a message to it, just for sanity checking + queue.putMessage(1234); + + // Fork + pid_t pid = fork(); + if (pid == 0) { + // The boost test framework installs signal handlers to catch errors. + // We only want to catch in the parent. In the child let SIGABRT crash + // us normally. + signal(SIGABRT, SIG_DFL); + + // Child. + // We're horrible people, so we try to send a message to the queue + // that is being consumed in the parent process. + // + // The putMessage() call should catch this error, and crash our process. + queue.putMessage(9876); + // We shouldn't reach here. + _exit(0); + } + + // Parent. Wait for the child to exit. + auto waited = waitpid(pid, &childStatus, 0); + EXPECT_EQ(pid, waited); + + // Send another message to the queue before we terminate the thread. + queue.putMessage(5678); + } + + // The child process should have crashed when it tried to call putMessage() + // on our TNotificationQueue. + EXPECT_TRUE(WIFSIGNALED(childStatus)); + EXPECT_EQ(SIGABRT, WTERMSIG(childStatus)); + + // Make sure the parent saw the expected messages. + // It should have gotten 1234 and 5678 from the parent process, but not + // 9876 from the child. + EXPECT_EQ(2, consumer.messages.size()); + EXPECT_EQ(1234, consumer.messages.front()); + consumer.messages.pop_front(); + EXPECT_EQ(5678, consumer.messages.front()); + consumer.messages.pop_front(); +} diff --git a/folly/io/async/test/ScopedEventBaseThreadTest.cpp b/folly/io/async/test/ScopedEventBaseThreadTest.cpp new file mode 100644 index 00000000..ba94518a --- /dev/null +++ b/folly/io/async/test/ScopedEventBaseThreadTest.cpp @@ -0,0 +1,79 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include + +using namespace std; +using namespace std::chrono; +using namespace folly; + +class ScopedEventBaseThreadTest : public testing::Test {}; + +TEST_F(ScopedEventBaseThreadTest, example) { + ScopedEventBaseThread sebt; + + Baton<> done; + sebt.getEventBase()->runInEventBaseThread([&] { done.post(); }); + done.timed_wait(steady_clock::now() + milliseconds(100)); +} + +TEST_F(ScopedEventBaseThreadTest, start_stop) { + ScopedEventBaseThread sebt(false); + + for (size_t i = 0; i < 4; ++i) { + EXPECT_EQ(nullptr, sebt.getEventBase()); + sebt.start(); + EXPECT_NE(nullptr, sebt.getEventBase()); + + Baton<> done; + sebt.getEventBase()->runInEventBaseThread([&] { done.post(); }); + done.timed_wait(steady_clock::now() + milliseconds(100)); + + EXPECT_NE(nullptr, sebt.getEventBase()); + sebt.stop(); + EXPECT_EQ(nullptr, sebt.getEventBase()); + } +} + +TEST_F(ScopedEventBaseThreadTest, move) { + auto sebt0 = ScopedEventBaseThread(); + auto sebt1 = std::move(sebt0); + auto sebt2 = std::move(sebt1); + + EXPECT_EQ(nullptr, sebt0.getEventBase()); + EXPECT_EQ(nullptr, sebt1.getEventBase()); + EXPECT_NE(nullptr, sebt2.getEventBase()); + + Baton<> done; + sebt2.getEventBase()->runInEventBaseThread([&] { done.post(); }); + done.timed_wait(steady_clock::now() + milliseconds(100)); +} + +TEST_F(ScopedEventBaseThreadTest, self_move) { + ScopedEventBaseThread sebt; + sebt = std::move(sebt); + + EXPECT_NE(nullptr, sebt.getEventBase()); + + Baton<> done; + sebt.getEventBase()->runInEventBaseThread([&] { done.post(); }); + done.timed_wait(steady_clock::now() + milliseconds(100)); +}