2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/NotificationQueue.h>
19 #include <sys/types.h>
25 #include <folly/Baton.h>
26 #include <folly/io/async/ScopedEventBaseThread.h>
27 #include <folly/portability/GTest.h>
34 using namespace folly;
// Queue of int messages shared by the QueueConsumer and QueueTest helpers below.
36 typedef NotificationQueue<int> IntQueue;
// Test consumer that records every message it is handed.
38 class QueueConsumer : public IntQueue::Consumer {
// Appends the received value to `messages`.
// NOTE(review): `fn` is presumably invoked from here as a per-message hook;
// the call is not visible in this excerpt -- confirm.
42 void messageAvailable(int&& value) noexcept override {
43 messages.push_back(value);
// Optional per-test callback; the tests assign lambdas to it.
49 std::function<void(int)> fn;
// Every value passed to messageAvailable(), in arrival order.
50 std::deque<int> messages;
// Constructs both queues with the same maximum size and fd type.
55 explicit QueueTest(uint32_t maxSize, IntQueue::FdType type)
56 : queue(maxSize, type), terminationQueue(maxSize, type) {}
// Test scenario; defined out of line below.
63 void destroyCallback();
// Second queue; multiConsumer() uses it so that consumers can report that
// they have stopped.
67 IntQueue terminationQueue;
// Puts a single message (5) on the queue from a second EventBase thread and
// checks that the consumer received exactly that one message.
71 void QueueTest::sendOne() {
72 // Create a notification queue and a callback in this thread
75 QueueConsumer consumer;
76 consumer.fn = [&](int) {
77 // Stop consuming after we receive 1 message
78 consumer.stopConsuming();
80 consumer.startConsuming(&eventBase, &queue);
82 // Start a new EventBase thread to put a message on our queue
83 ScopedEventBaseThread t1;
84 t1.getEventBase()->runInEventBaseThread([&] {
85 this->queue.putMessage(5);
88 // Loop until we receive the message
// Exactly one message must have arrived, and it must be the 5 sent above.
91 const auto& messages = consumer.messages;
92 EXPECT_EQ(1, messages.size());
93 EXPECT_EQ(5, messages.at(0));
// Exercises the iterator-range putMessages() overload with two consumers:
// `consumer` runs first and hands the queue over to `consumer2` from inside
// its callback. The messages recorded by each consumer are then compared
// element by element with the expected sequences.
96 void QueueTest::putMessages() {
99 QueueConsumer consumer;
100 QueueConsumer consumer2;
101 consumer.fn = [&](int msg) {
102 // Stop consuming after we receive a message with value 0, and start
105 consumer.stopConsuming();
106 consumer2.startConsuming(&eventBase, &queue);
109 consumer2.fn = [&](int msg) {
110 // Stop consuming after we receive a message with value 0
112 consumer2.stopConsuming();
115 consumer.startConsuming(&eventBase, &queue);
117 list<int> msgList = { 1, 2, 3, 4 };
118 vector<int> msgVector = { 5, 0, 9, 8, 7, 6, 7, 7,
119 8, 8, 2, 9, 6, 6, 10, 2, 0 };
120 // Call putMessages() several times to add messages to the queue
121 queue.putMessages(msgList.begin(), msgList.end());
// Half-open range [begin() + 2, begin() + 4): enqueues msgVector[2] and
// msgVector[3] only.
122 queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
123 // Test sending 17 messages, the pipe-based queue calls write in 16 byte
125 queue.putMessages(msgVector.begin(), msgVector.end());
127 // Loop until the consumer has stopped
// NOTE(review): going by the three putMessages() calls visible above, the
// queue holds 1, 2, 3, 4, 9, 8 followed by all of msgVector. The extra 7 in
// expectedMessages and the single 6 (msgVector has two in a row) near the end
// of expectedMessages2 do not obviously match that -- confirm against the
// complete file before relying on these expectations.
130 vector<int> expectedMessages = { 1, 2, 3, 4, 9, 8, 7, 5, 0 };
131 vector<int> expectedMessages2 = { 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0 };
// Compare sizes first, then each element; at() range-checks the index.
132 EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
133 for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
134 EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
136 EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
137 for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
138 EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
// Runs numConsumers consumers, each on its own EventBase thread, against the
// shared queue. Afterwards it checks that every consumer reported stopping
// exactly once and that each message value was received the expected number
// of times across all consumers.
142 void QueueTest::multiConsumer() {
143 uint32_t numConsumers = 8;
144 uint32_t numMessages = 10000;
146 // Create several consumers each running in their own EventBase thread
147 vector<QueueConsumer> consumers(numConsumers);
148 vector<ScopedEventBaseThread> threads(numConsumers);
150 for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
151 QueueConsumer* consumer = &consumers[consumerIdx];
// Captures are by value: the callback runs on the consumer's own thread.
153 consumer->fn = [consumer, consumerIdx, this](int value) {
154 // Treat 0 as a signal to stop.
156 consumer->stopConsuming();
157 // Put a message on the terminationQueue to indicate we have stopped
158 terminationQueue.putMessage(consumerIdx);
// Register the consumer from inside its own EventBase thread.
162 EventBase* eventBase = threads[consumerIdx].getEventBase();
163 eventBase->runInEventBaseThread([eventBase, consumer, this] {
164 consumer->startConsuming(eventBase, &queue);
168 // Now add a number of messages from this thread
169 // Start at 1 rather than 0, since 0 is the signal to stop.
170 for (uint32_t n = 1; n < numMessages; ++n) {
173 // Now add a 0 for each consumer, to signal them to stop
174 for (uint32_t n = 0; n < numConsumers; ++n) {
178 // Wait until we get notified that all of the consumers have stopped
179 // We use a separate notification queue for this.
180 QueueConsumer terminationConsumer;
// consumersStopped[i] counts the stop notifications received from consumer i.
181 vector<uint32_t> consumersStopped(numConsumers, 0);
182 uint32_t consumersRemaining = numConsumers;
183 terminationConsumer.fn = [&](int consumerIdx) {
184 --consumersRemaining;
185 if (consumersRemaining == 0) {
186 terminationConsumer.stopConsuming();
// The index arrives as an int message; range-check it before using it as a
// vector index.
189 EXPECT_GE(consumerIdx, 0);
190 EXPECT_LT(consumerIdx, numConsumers);
191 ++consumersStopped[consumerIdx];
194 terminationConsumer.startConsuming(&eventBase, &terminationQueue);
197 // Verify that we saw exactly 1 stop message for each consumer
198 for (uint32_t n = 0; n < numConsumers; ++n) {
199 EXPECT_EQ(1, consumersStopped[n]);
202 // Validate that every message sent to the main queue was received exactly
// messageCount[v] is the number of times value v was seen by any consumer.
204 vector<int> messageCount(numMessages, 0);
205 for (uint32_t n = 0; n < numConsumers; ++n) {
206 for (int msg : consumers[n].messages) {
208 EXPECT_LT(msg, numMessages);
213 // 0 is the signal to stop, and should have been received once by each
215 EXPECT_EQ(numConsumers, messageCount[0]);
216 // All other messages should have been received exactly once
217 for (uint32_t n = 1; n < numMessages; ++n) {
218 EXPECT_EQ(1, messageCount[n]);
// Exercises the maximum-size limit: once the queue is full, tryPutMessage()
// throws std::overflow_error and tryPutMessageNoThrow() returns false, while
// tryConsume() frees a slot again. The values popped are checked one by one.
222 void QueueTest::maxQueueSize() {
223 // Create a queue with a maximum size of 5, and fill it up
225 for (int n = 0; n < 5; ++n) {
226 queue.tryPutMessage(n);
229 // Calling tryPutMessage() now should fail
230 EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
// The non-throwing variant reports the same failure through its return value,
// for both the copy and the move overload.
232 EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
234 EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
236 // Pop a message from the queue
238 EXPECT_TRUE(queue.tryConsume(result));
239 EXPECT_EQ(0, result);
241 // We should be able to write another message now that we popped one off.
242 queue.tryPutMessage(5);
243 // But now we are full again.
244 EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
245 // putMessage() should let us exceed the maximum
248 // Pull another message off
249 EXPECT_TRUE(queue.tryConsume(result));
250 EXPECT_EQ(1, result);
252 // tryPutMessage() should still fail since putMessage() actually put us over
254 EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
256 // Pull another message off and try again
257 EXPECT_TRUE(queue.tryConsume(result));
258 EXPECT_EQ(2, result);
259 queue.tryPutMessage(7);
261 // Now pull all the remaining messages off
262 EXPECT_TRUE(queue.tryConsume(result));
263 EXPECT_EQ(3, result);
264 EXPECT_TRUE(queue.tryConsume(result));
265 EXPECT_EQ(4, result);
266 EXPECT_TRUE(queue.tryConsume(result));
267 EXPECT_EQ(5, result);
268 EXPECT_TRUE(queue.tryConsume(result));
269 EXPECT_EQ(6, result);
270 EXPECT_TRUE(queue.tryConsume(result));
271 EXPECT_EQ(7, result);
273 // There should be no messages left
// NOTE(review): result is presumably reset to -1 on the line not shown here,
// so the final check proves a failed tryConsume() leaves it alone -- confirm.
275 EXPECT_TRUE(!queue.tryConsume(result));
276 EXPECT_EQ(-1, result);
// Verifies that setMaxReadAtOnce() bounds how many messages a consumer is
// handed per event-loop iteration: with 100 queued messages the test expects
// 5 loops of 10 messages followed by 50 loops of 1 message, with all 100
// messages delivered in order.
280 void QueueTest::maxReadAtOnce() {
281 // Add 100 messages to the queue
282 for (int n = 0; n < 100; ++n) {
288 // Record how many messages were processed each loop iteration.
289 uint32_t messagesThisLoop = 0;
290 std::vector<uint32_t> messagesPerLoop;
// loopFinished re-schedules itself, so it is stored in a std::function that
// the lambda can refer to by reference.
291 std::function<void()> loopFinished = [&] {
292 // Record the current number of messages read this loop
293 messagesPerLoop.push_back(messagesThisLoop);
294 // Reset messagesThisLoop to 0 for the next loop
295 messagesThisLoop = 0;
297 // To prevent use-after-free bugs when eventBase destructs,
298 // prevent calling runInLoop any more after the test is finished.
299 // 55 == number of times loop should run.
300 if (messagesPerLoop.size() != 55) {
301 // Reschedule ourself to run at the end of the next loop
302 eventBase.runInLoop(loopFinished);
305 // Schedule the first call to loopFinished
306 eventBase.runInLoop(loopFinished);
308 QueueConsumer consumer;
309 // Read the first 50 messages 10 at a time.
310 consumer.setMaxReadAtOnce(10);
311 consumer.fn = [&](int value) {
313 // After 50 messages, drop to reading only 1 message at a time.
315 consumer.setMaxReadAtOnce(1);
317 // Terminate the loop when we reach the end of the messages.
319 eventBase.terminateLoopSoon();
322 consumer.startConsuming(&eventBase, &queue);
324 // Run the event loop until the consumer terminates it
327 // The consumer should have read all 100 messages in order
328 EXPECT_EQ(100, consumer.messages.size());
329 for (int n = 0; n < 100; ++n) {
330 EXPECT_EQ(n, consumer.messages.at(n));
333 // Currently EventBase happens to still run the loop callbacks even after
334 // terminateLoopSoon() is called. However, we don't really want to depend on
335 // this behavior. In case this ever changes in the future, add
336 // messagesThisLoop to messagesPerLoop in case the loop callback isn't invoked for the
337 // last loop iteration.
338 if (messagesThisLoop > 0) {
339 messagesPerLoop.push_back(messagesThisLoop);
340 messagesThisLoop = 0;
343 // For the first 5 loops it should have read 10 messages each time.
344 // After that it should have read 1 message per loop for the next 50 loops.
345 EXPECT_EQ(55, messagesPerLoop.size());
346 for (int n = 0; n < 5; ++n) {
347 EXPECT_EQ(10, messagesPerLoop.at(n));
349 for (int n = 5; n < 55; ++n) {
350 EXPECT_EQ(1, messagesPerLoop.at(n));
// Verifies that consumers can be destroyed from inside a messageAvailable()
// callback: after the loop runs, both consumers must be gone and the second
// queued message must still be in the queue.
355 void QueueTest::destroyCallback() {
356 // Rather than using QueueConsumer, define a separate class for the destroy
357 // test. The DestroyTestConsumer will delete itself inside the
358 // messageAvailable() callback. With a regular QueueConsumer this would
359 // destroy the std::function object while the function is running, which we
360 // should probably avoid doing. This uses a pointer to a std::function to
361 // avoid destroying the function object.
362 class DestroyTestConsumer : public IntQueue::Consumer {
364 void messageAvailable(int&& value) noexcept override {
// Guard taken for the duration of the callback, since the callback may
// request destruction of this very object.
365 DestructorGuard g(this);
// Non-owning pointer to the callback (see the comment above the class).
371 std::function<void(int)> *fn;
373 ~DestroyTestConsumer() override = default;
377 // Create a queue and add 2 messages to it
381 // Create two QueueConsumers allocated on the heap.
382 // Have whichever one gets called first destroy both of the QueueConsumers.
383 // This way one consumer will be destroyed from inside its messageAvailable()
384 // callback, and one consumer will be destroyed when it isn't inside
385 // messageAvailable().
386 std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
387 consumer1(new DestroyTestConsumer);
388 std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
389 consumer2(new DestroyTestConsumer);
390 std::function<void(int)> fn = [&](int) {
397 consumer1->startConsuming(&eventBase, &queue);
398 consumer2->startConsuming(&eventBase, &queue);
400 // Run the event loop.
403 // One of the consumers should have fired, received the message,
404 // then destroyed both consumers.
405 EXPECT_TRUE(!consumer1);
406 EXPECT_TRUE(!consumer2);
407 // One message should be left in the queue
409 EXPECT_TRUE(queue.tryConsume(result));
410 EXPECT_EQ(2, result);
// Checks consumeUntilDrained(): all 20 queued messages are delivered despite
// setMaxReadAtOnce(10), puts attempted from inside the callback are rejected,
// and of two consumers draining concurrently exactly one reports having
// consumed messages.
413 TEST(NotificationQueueTest, ConsumeUntilDrained) {
414 // Basic tests: make sure we
415 // - drain all the messages
416 // - ignore any maxReadAtOnce
417 // - can't add messages during draining
420 QueueConsumer consumer;
// Every flavour of put must fail while the queue is being drained.
421 consumer.fn = [&](int i) {
422 EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
423 EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
424 EXPECT_THROW(queue.putMessage(i), std::runtime_error);
425 std::vector<int> ints{1, 2, 3};
427 queue.putMessages(ints.begin(), ints.end()),
430 consumer.setMaxReadAtOnce(10); // We should ignore this
431 consumer.startConsuming(&eventBase, &queue);
432 for (int i = 0; i < 20; i++) {
435 EXPECT_TRUE(consumer.consumeUntilDrained());
436 EXPECT_EQ(20, consumer.messages.size());
438 // Make sure there can only be one drainer at once
439 folly::Baton<> callbackBaton, threadStartBaton;
// Block inside the callback until the main thread posts callbackBaton.
440 consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
441 QueueConsumer competingConsumer;
442 competingConsumer.startConsuming(&eventBase, &queue);
// raceA / raceB record whether each drainer succeeded AND consumed at least
// one message.
444 atomic<bool> raceA {false};
445 atomic<bool> raceB {false};
448 auto thread = std::thread([&]{
449 threadStartBaton.post();
450 raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
// Wait for the helper thread to start before draining from this thread.
452 threadStartBaton.wait();
453 raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
454 callbackBaton.post();
// Exactly one of the two drainers may have consumed messages.
456 EXPECT_FALSE(raceA && raceB);
457 EXPECT_TRUE(raceA || raceB);
458 EXPECT_TRUE(raceA ^ raceB);
// Repeats the ConsumeUntilDrained scenario many times in a loop to shake out
// races between the two competing drainers.
461 TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
// `<<` binds tighter than `<`, so this runs 1 << 8 == 256 iterations.
462 for (size_t i = 0; i < 1 << 8; ++i) {
463 // Basic tests: make sure we
464 // - drain all the messages
465 // - ignore any maxReadAtOnce
466 // - can't add messages during draining
469 QueueConsumer consumer;
// Every flavour of put must fail while the queue is being drained.
470 consumer.fn = [&](int j) {
471 EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
472 EXPECT_FALSE(queue.tryPutMessageNoThrow(j));
473 EXPECT_THROW(queue.putMessage(j), std::runtime_error);
474 std::vector<int> ints{1, 2, 3};
476 queue.putMessages(ints.begin(), ints.end()),
479 consumer.setMaxReadAtOnce(10); // We should ignore this
480 consumer.startConsuming(&eventBase, &queue);
481 for (int j = 0; j < 20; j++) {
484 EXPECT_TRUE(consumer.consumeUntilDrained());
485 EXPECT_EQ(20, consumer.messages.size());
487 // Make sure there can only be one drainer at once
488 folly::Baton<> callbackBaton, threadStartBaton;
// Block inside the callback until the main thread posts callbackBaton.
489 consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
490 QueueConsumer competingConsumer;
491 competingConsumer.startConsuming(&eventBase, &queue);
// raceA / raceB record whether each drainer succeeded AND consumed at least
// one message.
493 atomic<bool> raceA {false};
494 atomic<bool> raceB {false};
497 auto thread = std::thread([&]{
498 threadStartBaton.post();
499 raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
// Wait for the helper thread to start before draining from this thread.
501 threadStartBaton.wait();
502 raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
503 callbackBaton.post();
// Exactly one of the two drainers may have consumed messages.
505 EXPECT_FALSE(raceA && raceB);
506 EXPECT_TRUE(raceA || raceB);
507 EXPECT_TRUE(raceA ^ raceB);
// The tests below build a QueueTest on an EVENTFD-type queue; they are
// compiled only when FOLLY_HAVE_EVENTFD is defined.
// NOTE(review): a maxSize of 0 presumably means "no limit" -- confirm against
// the NotificationQueue constructor.
511 #ifdef FOLLY_HAVE_EVENTFD
512 TEST(NotificationQueueTest, SendOneEventFD) {
513 QueueTest qt(0, IntQueue::FdType::EVENTFD);
517 TEST(NotificationQueueTest, PutMessagesEventFD) {
518 QueueTest qt(0, IntQueue::FdType::EVENTFD);
522 TEST(NotificationQueueTest, MultiConsumerEventFD) {
523 QueueTest qt(0, IntQueue::FdType::EVENTFD);
// The only variant with a size limit: maxQueueSize() fills a queue of 5.
527 TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
528 QueueTest qt(5, IntQueue::FdType::EVENTFD);
532 TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
533 QueueTest qt(0, IntQueue::FdType::EVENTFD);
537 TEST(NotificationQueueTest, DestroyCallbackEventFD) {
538 QueueTest qt(0, IntQueue::FdType::EVENTFD);
539 qt.destroyCallback();
// The same set of scenarios again, this time on a PIPE-type queue.
// NOTE(review): a maxSize of 0 presumably means "no limit" -- confirm against
// the NotificationQueue constructor.
543 TEST(NotificationQueueTest, SendOnePipe) {
544 QueueTest qt(0, IntQueue::FdType::PIPE);
548 TEST(NotificationQueueTest, PutMessagesPipe) {
549 QueueTest qt(0, IntQueue::FdType::PIPE);
553 TEST(NotificationQueueTest, MultiConsumerPipe) {
554 QueueTest qt(0, IntQueue::FdType::PIPE);
// The only variant with a size limit: maxQueueSize() fills a queue of 5.
558 TEST(NotificationQueueTest, MaxQueueSizePipe) {
559 QueueTest qt(5, IntQueue::FdType::PIPE);
563 TEST(NotificationQueueTest, MaxReadAtOncePipe) {
564 QueueTest qt(0, IntQueue::FdType::PIPE);
568 TEST(NotificationQueueTest, DestroyCallbackPipe) {
569 QueueTest qt(0, IntQueue::FdType::PIPE);
570 qt.destroyCallback();
575 * Test code that creates a NotificationQueue, then forks, and incorrectly
576 * tries to send a message to the queue from the child process.
578 * The child process should crash in this scenario, since the child code has a
579 * bug. (Older versions of NotificationQueue didn't catch this in the child,
580 * resulting in a crash in the parent process.)
// Checks that a child process which calls putMessage() on a queue created
// before the fork dies with SIGABRT, and that the parent's consumer receives
// only the two messages sent by the parent itself.
582 TEST(NotificationQueueTest, UseAfterFork) {
585 QueueConsumer consumer;
587 // Boost sets a custom SIGCHLD handler, which fails the test if a child
588 // process exits abnormally. We don't want this.
589 signal(SIGCHLD, SIG_DFL);
591 // Log some info so users reading the test output aren't confused
592 // by the child process' crash log messages.
593 LOG(INFO) << "This test makes sure the child process crashes. "
594 << "Error log messagges and a backtrace are expected.";
597 // Start a separate thread consuming from the queue
598 ScopedEventBaseThread t1;
599 t1.getEventBase()->runInEventBaseThread([&] {
600 consumer.startConsuming(t1.getEventBase(), &queue);
603 // Send a message to it, just for sanity checking
604 queue.putMessage(1234);
// NOTE(review): the fork() call and the child/parent branch are not visible
// in this excerpt; the lines up to putMessage(9876) presumably run in the
// child only -- confirm.
609 // The boost test framework installs signal handlers to catch errors.
610 // We only want to catch in the parent. In the child let SIGABRT crash
612 signal(SIGABRT, SIG_DFL);
615 // We're horrible people, so we try to send a message to the queue
616 // that is being consumed in the parent process.
618 // The putMessage() call should catch this error, and crash our process.
619 queue.putMessage(9876);
620 // We shouldn't reach here.
625 // Parent. Wait for the child to exit.
626 auto waited = waitpid(pid, &childStatus, 0);
627 EXPECT_EQ(pid, waited);
629 // Send another message to the queue before we terminate the thread.
630 queue.putMessage(5678);
633 // The child process should have crashed when it tried to call putMessage()
634 // on our NotificationQueue.
635 EXPECT_TRUE(WIFSIGNALED(childStatus));
636 EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
638 // Make sure the parent saw the expected messages.
639 // It should have gotten 1234 and 5678 from the parent process, but not
640 // 9876 from the child.
641 EXPECT_EQ(2, consumer.messages.size());
642 EXPECT_EQ(1234, consumer.messages.front());
643 consumer.messages.pop_front();
644 EXPECT_EQ(5678, consumer.messages.front());
645 consumer.messages.pop_front();
// Checks that a consumer built with Consumer::make() from a lambda is handed
// a message put on the queue: the lambda stores the message in `value`,
// which must equal the value that was sent.
649 TEST(NotificationQueueConsumer, make) {
// Queue constructed with a maximum size of 32.
652 NotificationQueue<int> queue(32);
// The lambda takes the message by rvalue reference and is noexcept, the same
// shape as Consumer::messageAvailable() in the consumers above.
654 auto consumer = decltype(queue)::Consumer::make([&](
655 int&& msg) noexcept { value = msg; });
657 consumer->startConsuming(&evb, &queue);
659 int const newValue = 10;
660 queue.tryPutMessage(newValue);
664 EXPECT_EQ(newValue, value);