#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/Request.h>
#include <folly/Likely.h>
#include <folly/ScopeGuard.h>
/**
* A callback interface for consuming messages from the queue as they arrive.
*/
- class Consumer : private EventHandler {
+ class Consumer : public DelayedDestruction, private EventHandler {
public:
enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
destroyedFlagPtr_(nullptr),
maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
- virtual ~Consumer();
-
/**
* messageAvailable() will be invoked whenever a new
* message is available from the pipe.
return base_;
}
- virtual void handlerReady(uint16_t events) noexcept;
+ void handlerReady(uint16_t events) noexcept override;
+
+ protected:
+
+ void destroy() override;
+
+ virtual ~Consumer() {}
private:
/**
};
template<typename MessageT>
-NotificationQueue<MessageT>::Consumer::~Consumer() {
+void NotificationQueue<MessageT>::Consumer::destroy() {
// If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
// will be non-nullptr. Mark the value that it points to, so that
// handlerReady() will know the callback is destroyed, and that it cannot
if (destroyedFlagPtr_) {
*destroyedFlagPtr_ = true;
}
+ stopConsuming();
+ DelayedDestruction::destroy();
}
template<typename MessageT>
template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::consumeMessages(
bool isDrain, size_t* numConsumed) noexcept {
+ DestructorGuard dg(this);
uint32_t numProcessed = 0;
bool firstRun = true;
setActive(true);
CHECK(destroyedFlagPtr_ == nullptr);
destroyedFlagPtr_ = &callbackDestroyed;
messageAvailable(std::move(msg));
+ destroyedFlagPtr_ = nullptr;
RequestContext::setContext(old_ctx);
if (callbackDestroyed) {
return;
}
- destroyedFlagPtr_ = nullptr;
// If the callback is no longer installed, we are done.
if (queue_ == nullptr) {
template<typename MessageT>
bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
size_t* numConsumed) noexcept {
+ DestructorGuard dg(this);
{
folly::SpinLockGuard g(queue_->spinlock_);
if (queue_->draining_) {
// avoid destroying the function object.
class DestroyTestConsumer : public IntQueue::Consumer {
public:
- DestroyTestConsumer() {}
-
void messageAvailable(int&& value) override {
+ DestructorGuard g(this);
if (fn && *fn) {
(*fn)(value);
}
}
std::function<void(int)> *fn;
+ protected:
+ virtual ~DestroyTestConsumer() = default;
};
EventBase eventBase;
// This way one consumer will be destroyed from inside its messageAvailable()
// callback, and one consume will be destroyed when it isn't inside
// messageAvailable().
- std::unique_ptr<DestroyTestConsumer> consumer1(new DestroyTestConsumer);
- std::unique_ptr<DestroyTestConsumer> consumer2(new DestroyTestConsumer);
+ std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
+ consumer1(new DestroyTestConsumer);
+ std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
+ consumer2(new DestroyTestConsumer);
std::function<void(int)> fn = [&](int) {
- consumer1.reset();
- consumer2.reset();
+ consumer1 = nullptr;
+ consumer2 = nullptr;
};
consumer1->fn = &fn;
consumer2->fn = &fn;
// We shouldn't reach here.
_exit(0);
}
+ PCHECK(pid > 0);
// Parent. Wait for the child to exit.
auto waited = waitpid(pid, &childStatus, 0);