Convert newlines in folly/portability/PThread.cpp
[folly.git] / folly / io / async / test / NotificationQueueTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/NotificationQueue.h>
18
19 #include <sys/types.h>
20
21 #include <iostream>
22 #include <list>
23 #include <thread>
24
25 #include <folly/io/async/ScopedEventBaseThread.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/synchronization/Baton.h>
28
29 #ifndef _WIN32
30 #include <sys/wait.h>
31 #endif
32
33 using namespace std;
34 using namespace folly;
35
36 typedef NotificationQueue<int> IntQueue;
37
38 class QueueConsumer : public IntQueue::Consumer {
39  public:
40   QueueConsumer() {}
41
42   void messageAvailable(int&& value) noexcept override {
43     messages.push_back(value);
44     if (fn) {
45       fn(value);
46     }
47   }
48
49   std::function<void(int)> fn;
50   std::deque<int> messages;
51 };
52
53 class QueueTest {
54  public:
55   explicit QueueTest(uint32_t maxSize, IntQueue::FdType type)
56       : queue(maxSize, type), terminationQueue(maxSize, type) {}
57
58   void sendOne();
59   void putMessages();
60   void multiConsumer();
61   void maxQueueSize();
62   void maxReadAtOnce();
63   void destroyCallback();
64   void useAfterFork();
65
66   IntQueue queue;
67   IntQueue terminationQueue;
68
69 };
70
71 void QueueTest::sendOne() {
72   // Create a notification queue and a callback in this thread
73   EventBase eventBase;
74
75   QueueConsumer consumer;
76   consumer.fn = [&](int) {
77     // Stop consuming after we receive 1 message
78     consumer.stopConsuming();
79   };
80   consumer.startConsuming(&eventBase, &queue);
81
82   // Start a new EventBase thread to put a message on our queue
83   ScopedEventBaseThread t1;
84   t1.getEventBase()->runInEventBaseThread([&] {
85     this->queue.putMessage(5);
86   });
87
88   // Loop until we receive the message
89   eventBase.loop();
90
91   const auto& messages = consumer.messages;
92   EXPECT_EQ(1, messages.size());
93   EXPECT_EQ(5, messages.at(0));
94 }
95
96 void QueueTest::putMessages() {
97   EventBase eventBase;
98
99   QueueConsumer consumer;
100   QueueConsumer consumer2;
101   consumer.fn = [&](int msg) {
102     // Stop consuming after we receive a message with value 0, and start
103     // consumer2
104     if (msg == 0) {
105       consumer.stopConsuming();
106       consumer2.startConsuming(&eventBase, &queue);
107     }
108   };
109   consumer2.fn = [&](int msg) {
110     // Stop consuming after we receive a message with value 0
111     if (msg == 0) {
112       consumer2.stopConsuming();
113     }
114   };
115   consumer.startConsuming(&eventBase, &queue);
116
117   list<int> msgList = { 1, 2, 3, 4 };
118   vector<int> msgVector = { 5, 0, 9, 8, 7, 6, 7, 7,
119                             8, 8, 2, 9, 6, 6, 10, 2, 0 };
120   // Call putMessages() several times to add messages to the queue
121   queue.putMessages(msgList.begin(), msgList.end());
122   queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
123   // Test sending 17 messages, the pipe-based queue calls write in 16 byte
124   // chunks
125   queue.putMessages(msgVector.begin(), msgVector.end());
126
127   // Loop until the consumer has stopped
128   eventBase.loop();
129
130   vector<int> expectedMessages = { 1, 2, 3, 4, 9, 8, 7, 5, 0 };
131   vector<int> expectedMessages2 = { 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0 };
132   EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
133   for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
134     EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
135   }
136   EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
137   for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
138     EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
139   }
140 }
141
142 void QueueTest::multiConsumer() {
143   uint32_t numConsumers = 8;
144   uint32_t numMessages = 10000;
145
146   // Create several consumers each running in their own EventBase thread
147   vector<QueueConsumer> consumers(numConsumers);
148   vector<ScopedEventBaseThread> threads(numConsumers);
149
150   for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
151     QueueConsumer* consumer = &consumers[consumerIdx];
152
153     consumer->fn = [consumer, consumerIdx, this](int value) {
154       // Treat 0 as a signal to stop.
155       if (value == 0) {
156         consumer->stopConsuming();
157         // Put a message on the terminationQueue to indicate we have stopped
158         terminationQueue.putMessage(consumerIdx);
159       }
160     };
161
162     EventBase* eventBase = threads[consumerIdx].getEventBase();
163     eventBase->runInEventBaseThread([eventBase, consumer, this] {
164       consumer->startConsuming(eventBase, &queue);
165     });
166   }
167
168   // Now add a number of messages from this thread
169   // Start at 1 rather than 0, since 0 is the signal to stop.
170   for (uint32_t n = 1; n < numMessages; ++n) {
171     queue.putMessage(n);
172   }
173   // Now add a 0 for each consumer, to signal them to stop
174   for (uint32_t n = 0; n < numConsumers; ++n) {
175     queue.putMessage(0);
176   }
177
178   // Wait until we get notified that all of the consumers have stopped
179   // We use a separate notification queue for this.
180   QueueConsumer terminationConsumer;
181   vector<uint32_t> consumersStopped(numConsumers, 0);
182   uint32_t consumersRemaining = numConsumers;
183   terminationConsumer.fn = [&](int consumerIdx) {
184     --consumersRemaining;
185     if (consumersRemaining == 0) {
186       terminationConsumer.stopConsuming();
187     }
188
189     EXPECT_GE(consumerIdx, 0);
190     EXPECT_LT(consumerIdx, numConsumers);
191     ++consumersStopped[consumerIdx];
192   };
193   EventBase eventBase;
194   terminationConsumer.startConsuming(&eventBase, &terminationQueue);
195   eventBase.loop();
196
197   // Verify that we saw exactly 1 stop message for each consumer
198   for (uint32_t n = 0; n < numConsumers; ++n) {
199     EXPECT_EQ(1, consumersStopped[n]);
200   }
201
202   // Validate that every message sent to the main queue was received exactly
203   // once.
204   vector<int> messageCount(numMessages, 0);
205   for (uint32_t n = 0; n < numConsumers; ++n) {
206     for (int msg : consumers[n].messages) {
207       EXPECT_GE(msg, 0);
208       EXPECT_LT(msg, numMessages);
209       ++messageCount[msg];
210     }
211   }
212
213   // 0 is the signal to stop, and should have been received once by each
214   // consumer
215   EXPECT_EQ(numConsumers, messageCount[0]);
216   // All other messages should have been received exactly once
217   for (uint32_t n = 1; n < numMessages; ++n) {
218     EXPECT_EQ(1, messageCount[n]);
219   }
220 }
221
222 void QueueTest::maxQueueSize() {
223   // Create a queue with a maximum size of 5, and fill it up
224
225   for (int n = 0; n < 5; ++n) {
226     queue.tryPutMessage(n);
227   }
228
229   // Calling tryPutMessage() now should fail
230   EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
231
232   EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
233   int val = 5;
234   EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
235
236   // Pop a message from the queue
237   int result = -1;
238   EXPECT_TRUE(queue.tryConsume(result));
239   EXPECT_EQ(0, result);
240
241   // We should be able to write another message now that we popped one off.
242   queue.tryPutMessage(5);
243   // But now we are full again.
244   EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
245   // putMessage() should let us exceed the maximum
246   queue.putMessage(6);
247
248   // Pull another mesage off
249   EXPECT_TRUE(queue.tryConsume(result));
250   EXPECT_EQ(1, result);
251
252   // tryPutMessage() should still fail since putMessage() actually put us over
253   // the max.
254   EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
255
256   // Pull another message off and try again
257   EXPECT_TRUE(queue.tryConsume(result));
258   EXPECT_EQ(2, result);
259   queue.tryPutMessage(7);
260
261   // Now pull all the remaining messages off
262   EXPECT_TRUE(queue.tryConsume(result));
263   EXPECT_EQ(3, result);
264   EXPECT_TRUE(queue.tryConsume(result));
265   EXPECT_EQ(4, result);
266   EXPECT_TRUE(queue.tryConsume(result));
267   EXPECT_EQ(5, result);
268   EXPECT_TRUE(queue.tryConsume(result));
269   EXPECT_EQ(6, result);
270   EXPECT_TRUE(queue.tryConsume(result));
271   EXPECT_EQ(7, result);
272
273   // There should be no messages left
274   result = -1;
275   EXPECT_TRUE(!queue.tryConsume(result));
276   EXPECT_EQ(-1, result);
277 }
278
279
280 void QueueTest::maxReadAtOnce() {
281   // Add 100 messages to the queue
282   for (int n = 0; n < 100; ++n) {
283     queue.putMessage(n);
284   }
285
286   EventBase eventBase;
287
288   // Record how many messages were processed each loop iteration.
289   uint32_t messagesThisLoop = 0;
290   std::vector<uint32_t> messagesPerLoop;
291   std::function<void()> loopFinished = [&] {
292     // Record the current number of messages read this loop
293     messagesPerLoop.push_back(messagesThisLoop);
294     // Reset messagesThisLoop to 0 for the next loop
295     messagesThisLoop = 0;
296
297     // To prevent use-after-free bugs when eventBase destructs,
298     // prevent calling runInLoop any more after the test is finished.
299     // 55 == number of times loop should run.
300     if (messagesPerLoop.size() != 55) {
301       // Reschedule ourself to run at the end of the next loop
302       eventBase.runInLoop(loopFinished);
303     }
304   };
305   // Schedule the first call to loopFinished
306   eventBase.runInLoop(loopFinished);
307
308   QueueConsumer consumer;
309   // Read the first 50 messages 10 at a time.
310   consumer.setMaxReadAtOnce(10);
311   consumer.fn = [&](int value) {
312     ++messagesThisLoop;
313     // After 50 messages, drop to reading only 1 message at a time.
314     if (value == 50) {
315       consumer.setMaxReadAtOnce(1);
316     }
317     // Terminate the loop when we reach the end of the messages.
318     if (value == 99) {
319       eventBase.terminateLoopSoon();
320     }
321   };
322   consumer.startConsuming(&eventBase, &queue);
323
324   // Run the event loop until the consumer terminates it
325   eventBase.loop();
326
327   // The consumer should have read all 100 messages in order
328   EXPECT_EQ(100, consumer.messages.size());
329   for (int n = 0; n < 100; ++n) {
330     EXPECT_EQ(n, consumer.messages.at(n));
331   }
332
333   // Currently EventBase happens to still run the loop callbacks even after
334   // terminateLoopSoon() is called.  However, we don't really want to depend on
335   // this behavior.  In case this ever changes in the future, add
336   // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
337   // last loop iteration.
338   if (messagesThisLoop > 0) {
339     messagesPerLoop.push_back(messagesThisLoop);
340     messagesThisLoop = 0;
341   }
342
343   // For the first 5 loops it should have read 10 messages each time.
344   // After that it should have read 1 messages per loop for the next 50 loops.
345   EXPECT_EQ(55, messagesPerLoop.size());
346   for (int n = 0; n < 5; ++n) {
347     EXPECT_EQ(10, messagesPerLoop.at(n));
348   }
349   for (int n = 5; n < 55; ++n) {
350     EXPECT_EQ(1, messagesPerLoop.at(n));
351   }
352 }
353
354
355 void QueueTest::destroyCallback() {
356   // Rather than using QueueConsumer, define a separate class for the destroy
357   // test.  The DestroyTestConsumer will delete itself inside the
358   // messageAvailable() callback.  With a regular QueueConsumer this would
359   // destroy the std::function object while the function is running, which we
360   // should probably avoid doing.  This uses a pointer to a std::function to
361   // avoid destroying the function object.
362   class DestroyTestConsumer : public IntQueue::Consumer {
363    public:
364     void messageAvailable(int&& value) noexcept override {
365       DestructorGuard g(this);
366       if (fn && *fn) {
367         (*fn)(value);
368       }
369     }
370
371     std::function<void(int)> *fn;
372    protected:
373     ~DestroyTestConsumer() override = default;
374   };
375
376   EventBase eventBase;
377   // Create a queue and add 2 messages to it
378   queue.putMessage(1);
379   queue.putMessage(2);
380
381   // Create two QueueConsumers allocated on the heap.
382   // Have whichever one gets called first destroy both of the QueueConsumers.
383   // This way one consumer will be destroyed from inside its messageAvailable()
384   // callback, and one consume will be destroyed when it isn't inside
385   // messageAvailable().
386   std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
387     consumer1(new DestroyTestConsumer);
388   std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
389     consumer2(new DestroyTestConsumer);
390   std::function<void(int)> fn = [&](int) {
391     consumer1 = nullptr;
392     consumer2 = nullptr;
393   };
394   consumer1->fn = &fn;
395   consumer2->fn = &fn;
396
397   consumer1->startConsuming(&eventBase, &queue);
398   consumer2->startConsuming(&eventBase, &queue);
399
400   // Run the event loop.
401   eventBase.loop();
402
403   // One of the consumers should have fired, received the message,
404   // then destroyed both consumers.
405   EXPECT_TRUE(!consumer1);
406   EXPECT_TRUE(!consumer2);
407   // One message should be left in the queue
408   int result = 1;
409   EXPECT_TRUE(queue.tryConsume(result));
410   EXPECT_EQ(2, result);
411 }
412
413 TEST(NotificationQueueTest, ConsumeUntilDrained) {
414   // Basic tests: make sure we
415   // - drain all the messages
416   // - ignore any maxReadAtOnce
417   // - can't add messages during draining
418   EventBase eventBase;
419   IntQueue queue;
420   QueueConsumer consumer;
421   consumer.fn = [&](int i) {
422     EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
423     EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
424     EXPECT_THROW(queue.putMessage(i), std::runtime_error);
425     std::vector<int> ints{1, 2, 3};
426     EXPECT_THROW(
427         queue.putMessages(ints.begin(), ints.end()),
428         std::runtime_error);
429   };
430   consumer.setMaxReadAtOnce(10); // We should ignore this
431   consumer.startConsuming(&eventBase, &queue);
432   for (int i = 0; i < 20; i++) {
433     queue.putMessage(i);
434   }
435   EXPECT_TRUE(consumer.consumeUntilDrained());
436   EXPECT_EQ(20, consumer.messages.size());
437
438   // Make sure there can only be one drainer at once
439   folly::Baton<> callbackBaton, threadStartBaton;
440   consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
441   QueueConsumer competingConsumer;
442   competingConsumer.startConsuming(&eventBase, &queue);
443   queue.putMessage(1);
444   atomic<bool> raceA {false};
445   atomic<bool> raceB {false};
446   size_t numConsA = 0;
447   size_t numConsB = 0;
448   auto thread = std::thread([&]{
449     threadStartBaton.post();
450     raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
451   });
452   threadStartBaton.wait();
453   raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
454   callbackBaton.post();
455   thread.join();
456   EXPECT_FALSE(raceA && raceB);
457   EXPECT_TRUE(raceA || raceB);
458   EXPECT_TRUE(raceA ^ raceB);
459 }
460
461 TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
462   for (size_t i = 0; i < 1 << 8; ++i) {
463     // Basic tests: make sure we
464     // - drain all the messages
465     // - ignore any maxReadAtOnce
466     // - can't add messages during draining
467     EventBase eventBase;
468     IntQueue queue;
469     QueueConsumer consumer;
470     consumer.fn = [&](int j) {
471       EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
472       EXPECT_FALSE(queue.tryPutMessageNoThrow(j));
473       EXPECT_THROW(queue.putMessage(j), std::runtime_error);
474       std::vector<int> ints{1, 2, 3};
475       EXPECT_THROW(
476           queue.putMessages(ints.begin(), ints.end()),
477           std::runtime_error);
478     };
479     consumer.setMaxReadAtOnce(10); // We should ignore this
480     consumer.startConsuming(&eventBase, &queue);
481     for (int j = 0; j < 20; j++) {
482       queue.putMessage(j);
483     }
484     EXPECT_TRUE(consumer.consumeUntilDrained());
485     EXPECT_EQ(20, consumer.messages.size());
486
487     // Make sure there can only be one drainer at once
488     folly::Baton<> callbackBaton, threadStartBaton;
489     consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
490     QueueConsumer competingConsumer;
491     competingConsumer.startConsuming(&eventBase, &queue);
492     queue.putMessage(1);
493     atomic<bool> raceA {false};
494     atomic<bool> raceB {false};
495     size_t numConsA = 0;
496     size_t numConsB = 0;
497     auto thread = std::thread([&]{
498       threadStartBaton.post();
499       raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
500     });
501     threadStartBaton.wait();
502     raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
503     callbackBaton.post();
504     thread.join();
505     EXPECT_FALSE(raceA && raceB);
506     EXPECT_TRUE(raceA || raceB);
507     EXPECT_TRUE(raceA ^ raceB);
508   }
509 }
510
511 #ifdef FOLLY_HAVE_EVENTFD
512 TEST(NotificationQueueTest, SendOneEventFD) {
513   QueueTest qt(0, IntQueue::FdType::EVENTFD);
514   qt.sendOne();
515 }
516
517 TEST(NotificationQueueTest, PutMessagesEventFD) {
518   QueueTest qt(0, IntQueue::FdType::EVENTFD);
519   qt.sendOne();
520 }
521
522 TEST(NotificationQueueTest, MultiConsumerEventFD) {
523   QueueTest qt(0, IntQueue::FdType::EVENTFD);
524   qt.multiConsumer();
525 }
526
527 TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
528   QueueTest qt(5, IntQueue::FdType::EVENTFD);
529   qt.maxQueueSize();
530 }
531
532 TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
533   QueueTest qt(0, IntQueue::FdType::EVENTFD);
534   qt.maxReadAtOnce();
535 }
536
537 TEST(NotificationQueueTest, DestroyCallbackEventFD) {
538   QueueTest qt(0, IntQueue::FdType::EVENTFD);
539   qt.destroyCallback();
540 }
541 #endif
542
543 TEST(NotificationQueueTest, SendOnePipe) {
544   QueueTest qt(0, IntQueue::FdType::PIPE);
545   qt.sendOne();
546 }
547
548 TEST(NotificationQueueTest, PutMessagesPipe) {
549   QueueTest qt(0, IntQueue::FdType::PIPE);
550   qt.sendOne();
551 }
552
553 TEST(NotificationQueueTest, MultiConsumerPipe) {
554   QueueTest qt(0, IntQueue::FdType::PIPE);
555   qt.multiConsumer();
556 }
557
558 TEST(NotificationQueueTest, MaxQueueSizePipe) {
559   QueueTest qt(5, IntQueue::FdType::PIPE);
560   qt.maxQueueSize();
561 }
562
563 TEST(NotificationQueueTest, MaxReadAtOncePipe) {
564   QueueTest qt(0, IntQueue::FdType::PIPE);
565   qt.maxReadAtOnce();
566 }
567
568 TEST(NotificationQueueTest, DestroyCallbackPipe) {
569   QueueTest qt(0, IntQueue::FdType::PIPE);
570   qt.destroyCallback();
571 }
572
573 #ifndef _WIN32
574 /*
575  * Test code that creates a NotificationQueue, then forks, and incorrectly
576  * tries to send a message to the queue from the child process.
577  *
578  * The child process should crash in this scenario, since the child code has a
579  * bug.  (Older versions of NotificationQueue didn't catch this in the child,
580  * resulting in a crash in the parent process.)
581  */
582 TEST(NotificationQueueTest, UseAfterFork) {
583   IntQueue queue;
584   int childStatus = 0;
585   QueueConsumer consumer;
586
587   // Boost sets a custom SIGCHLD handler, which fails the test if a child
588   // process exits abnormally.  We don't want this.
589   signal(SIGCHLD, SIG_DFL);
590
591   // Log some info so users reading the test output aren't confused
592   // by the child process' crash log messages.
593   LOG(INFO) << "This test makes sure the child process crashes.  "
594     << "Error log messagges and a backtrace are expected.";
595
596   {
597     // Start a separate thread consuming from the queue
598     ScopedEventBaseThread t1;
599     t1.getEventBase()->runInEventBaseThread([&] {
600       consumer.startConsuming(t1.getEventBase(), &queue);
601     });
602
603     // Send a message to it, just for sanity checking
604     queue.putMessage(1234);
605
606     // Fork
607     pid_t pid = fork();
608     if (pid == 0) {
609       // The boost test framework installs signal handlers to catch errors.
610       // We only want to catch in the parent.  In the child let SIGABRT crash
611       // us normally.
612       signal(SIGABRT, SIG_DFL);
613
614       // Child.
615       // We're horrible people, so we try to send a message to the queue
616       // that is being consumed in the parent process.
617       //
618       // The putMessage() call should catch this error, and crash our process.
619       queue.putMessage(9876);
620       // We shouldn't reach here.
621       _exit(0);
622     }
623     PCHECK(pid > 0);
624
625     // Parent.  Wait for the child to exit.
626     auto waited = waitpid(pid, &childStatus, 0);
627     EXPECT_EQ(pid, waited);
628
629     // Send another message to the queue before we terminate the thread.
630     queue.putMessage(5678);
631   }
632
633   // The child process should have crashed when it tried to call putMessage()
634   // on our NotificationQueue.
635   EXPECT_TRUE(WIFSIGNALED(childStatus));
636   EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
637
638   // Make sure the parent saw the expected messages.
639   // It should have gotten 1234 and 5678 from the parent process, but not
640   // 9876 from the child.
641   EXPECT_EQ(2, consumer.messages.size());
642   EXPECT_EQ(1234, consumer.messages.front());
643   consumer.messages.pop_front();
644   EXPECT_EQ(5678, consumer.messages.front());
645   consumer.messages.pop_front();
646 }
647 #endif
648
649 TEST(NotificationQueueConsumer, make) {
650   int value = 0;
651   EventBase evb;
652   NotificationQueue<int> queue(32);
653
654   auto consumer = decltype(queue)::Consumer::make([&](
655       int&& msg) noexcept { value = msg; });
656
657   consumer->startConsuming(&evb, &queue);
658
659   int const newValue = 10;
660   queue.tryPutMessage(newValue);
661
662   evb.loopOnce();
663
664   EXPECT_EQ(newValue, value);
665 }