, bootstrap_(bootstrap) {}
void connectSuccess() noexcept override {
+ if (bootstrap_->getPipeline()) {
+ bootstrap_->getPipeline()->transportActive();
+ }
promise_.setValue(bootstrap_->getPipeline());
delete this;
}
socket->connect(
new ConnectCallback(std::move(promise), this), address);
pipeline_ = pipelineFactory_->newPipeline(socket);
- if (pipeline_) {
- pipeline_->attachTransport(socket);
- }
});
return retval;
}
std::shared_ptr<AsyncSocket>(
transport.release(),
folly::DelayedDestruction::Destructor())));
+ pipeline->transportActive();
auto connection = new ServerConnection(std::move(pipeline));
Acceptor::addConnection(connection);
}
AsyncSocketHandler(AsyncSocketHandler&&) = default;
~AsyncSocketHandler() {
- if (socket_) {
- detachReadCallback();
- }
+ detachReadCallback();
}
void attachReadCallback() {
}
void detachReadCallback() {
- if (socket_->getReadCallback() == this) {
+ if (socket_ && socket_->getReadCallback() == this) {
socket_->setReadCB(nullptr);
}
+ auto ctx = getContext();
+ if (ctx && !firedInactive_) {
+ firedInactive_ = true;
+ ctx->fireTransportInactive();
+ }
}
void attachEventBase(folly::EventBase* eventBase) {
}
}
- void attachPipeline(Context* ctx) override {
+ void transportActive(Context* ctx) override {
+ ctx->getPipeline()->setTransport(socket_);
attachReadCallback();
+ ctx->fireTransportActive();
+ }
+
+ void detachPipeline(Context* ctx) override {
+ detachReadCallback();
}
folly::Future<void> write(
folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
std::shared_ptr<AsyncSocket> socket_{nullptr};
+ bool firedInactive_{false};
};
}}
virtual ~HandlerBase() {}
virtual void attachPipeline(Context* ctx) {}
- virtual void attachTransport(Context* ctx) {}
-
virtual void detachPipeline(Context* ctx) {}
- virtual void detachTransport(Context* ctx) {}
Context* getContext() {
if (attachCount_ != 1) {
virtual void readException(Context* ctx, exception_wrapper e) {
ctx->fireReadException(std::move(e));
}
+ virtual void transportActive(Context* ctx) {
+ ctx->fireTransportActive();
+ }
+ virtual void transportInactive(Context* ctx) {
+ ctx->fireTransportInactive();
+ }
virtual Future<void> write(Context* ctx, Win msg) = 0;
virtual Future<void> close(Context* ctx) {
exception_wrapper e) {}
virtual void channelRegistered(HandlerContext* ctx) {}
virtual void channelUnregistered(HandlerContext* ctx) {}
- virtual void channelActive(HandlerContext* ctx) {}
- virtual void channelInactive(HandlerContext* ctx) {}
virtual void channelReadComplete(HandlerContext* ctx) {}
virtual void userEventTriggered(HandlerContext* ctx, void* evt) {}
virtual void channelWritabilityChanged(HandlerContext* ctx) {}
virtual void readException(Context* ctx, exception_wrapper e) {
ctx->fireReadException(std::move(e));
}
+ virtual void transportActive(Context* ctx) {
+ ctx->fireTransportActive();
+ }
+ virtual void transportInactive(Context* ctx) {
+ ctx->fireTransportInactive();
+ }
};
template <class Win, class Wout = Win>
virtual void attachPipeline() = 0;
virtual void detachPipeline() = 0;
- virtual void attachTransport() = 0;
- virtual void detachTransport() = 0;
-
template <class H, class HandlerContext>
void attachContext(H* handler, HandlerContext* ctx) {
if (++handler->attachCount_ == 1) {
virtual void read(In msg) = 0;
virtual void readEOF() = 0;
virtual void readException(exception_wrapper e) = 0;
+ virtual void transportActive() = 0;
+ virtual void transportInactive() = 0;
};
template <class Out>
attached_ = false;
}
- void attachTransport() override {
- DestructorGuard dg(pipeline_);
- handler_->attachTransport(impl_);
- }
-
- void detachTransport() override {
- DestructorGuard dg(pipeline_);
- handler_->detachTransport(impl_);
- }
-
void setNextIn(PipelineContext* ctx) override {
auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
if (nextIn) {
}
}
+ void fireTransportActive() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->transportActive();
+ }
+ }
+
+ void fireTransportInactive() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->transportInactive();
+ }
+ }
+
Future<void> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->pipeline_;
}
- std::shared_ptr<AsyncTransport> getTransport() override {
- return this->pipeline_->getTransport();
- }
-
void setWriteFlags(WriteFlags flags) override {
this->pipeline_->setWriteFlags(flags);
}
this->handler_->readException(this, std::move(e));
}
+ void transportActive() override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->transportActive(this);
+ }
+
+ void transportInactive() override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->transportInactive(this);
+ }
+
// OutboundLink overrides
Future<void> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
}
}
- PipelineBase* getPipeline() override {
- return this->pipeline_;
+ void fireTransportActive() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->transportActive();
+ }
}
- std::shared_ptr<AsyncTransport> getTransport() override {
- return this->pipeline_->getTransport();
+ void fireTransportInactive() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->transportInactive();
+ }
+ }
+
+ PipelineBase* getPipeline() override {
+ return this->pipeline_;
}
// InboundLink overrides
this->handler_->readException(this, std::move(e));
}
+ void transportActive() override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->transportActive(this);
+ }
+
+ void transportInactive() override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->transportInactive(this);
+ }
+
private:
using DestructorGuard = typename P::DestructorGuard;
};
return this->pipeline_;
}
- std::shared_ptr<AsyncTransport> getTransport() override {
- return this->pipeline_->getTransport();
- }
-
// OutboundLink overrides
Future<void> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
virtual void fireRead(In msg) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
+ virtual void fireTransportActive() = 0;
+ virtual void fireTransportInactive() = 0;
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
virtual PipelineBase* getPipeline() = 0;
-
- virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+ std::shared_ptr<AsyncTransport> getTransport() {
+ return getPipeline()->getTransport();
+ }
virtual void setWriteFlags(WriteFlags flags) = 0;
virtual WriteFlags getWriteFlags() = 0;
virtual void fireRead(In msg) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
+ virtual void fireTransportActive() = 0;
+ virtual void fireTransportInactive() = 0;
virtual PipelineBase* getPipeline() = 0;
-
- virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+ std::shared_ptr<AsyncTransport> getTransport() {
+ return getPipeline()->getTransport();
+ }
// TODO Need get/set writeFlags, readBufferSettings? Probably not.
// Do we even really need them stored in the pipeline at all?
virtual Future<void> fireClose() = 0;
virtual PipelineBase* getPipeline() = 0;
-
- virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+ std::shared_ptr<AsyncTransport> getTransport() {
+ return getPipeline()->getTransport();
+ }
};
enum class HandlerDir {
}
}
-template <class R, class W>
-std::shared_ptr<AsyncTransport> Pipeline<R, W>::getTransport() {
- return transport_;
-}
-
template <class R, class W>
void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
writeFlags_ = flags;
front_->readEOF();
}
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+Pipeline<R, W>::transportActive() {
+ if (front_) {
+ front_->transportActive();
+ }
+}
+
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+Pipeline<R, W>::transportInactive() {
+ if (front_) {
+ front_->transportInactive();
+ }
+}
+
template <class R, class W>
template <class T>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
return false;
}
-template <class R, class W>
-void Pipeline<R, W>::attachTransport(
- std::shared_ptr<AsyncTransport> transport) {
- transport_ = std::move(transport);
- for (auto& ctx : ctxs_) {
- ctx->attachTransport();
- }
-}
-
-template <class R, class W>
-void Pipeline<R, W>::detachTransport() {
- transport_ = nullptr;
- for (auto& ctx : ctxs_) {
- ctx->detachTransport();
- }
-}
-
template <class R, class W>
template <class Context>
void Pipeline<R, W>::addContextFront(Context* ctx) {
}
}
+ void setTransport(std::shared_ptr<AsyncTransport> transport) {
+ transport_ = transport;
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() {
+ return transport_;
+ }
+
private:
PipelineManager* manager_{nullptr};
+ std::shared_ptr<AsyncTransport> transport_;
};
struct Nothing{};
Pipeline();
~Pipeline();
- std::shared_ptr<AsyncTransport> getTransport();
-
void setWriteFlags(WriteFlags flags);
WriteFlags getWriteFlags();
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
readException(exception_wrapper e);
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+ transportActive();
+
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+ transportInactive();
+
template <class T = W>
typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
write(W msg);
template <class H>
bool setOwner(H* handler);
- void attachTransport(std::shared_ptr<AsyncTransport> transport);
-
- void detachTransport();
-
protected:
explicit Pipeline(bool isStatic);
template <class Context>
Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
- std::shared_ptr<AsyncTransport> transport_;
WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
EventBase eb;
auto socket = AsyncSocket::newSocket(&eb);
- EXPECT_CALL(mockHandler, attachTransport(_));
- pipeline.attachTransport(socket);
+ pipeline.setTransport(socket);
// Buffering should prevent writes until the EB loops, and the writes should
// be batched into one write call.
.finalize());
}
}
-
-TEST(Pipeline, AttachTransport) {
- IntHandler handler;
- EXPECT_CALL(handler, attachPipeline(_));
- StaticPipeline<int, int, IntHandler>
- pipeline(&handler);
-
- EventBase eb;
- auto socket = AsyncSocket::newSocket(&eb);
-
- EXPECT_CALL(handler, attachTransport(_));
- pipeline.attachTransport(socket);
-
- EXPECT_CALL(handler, detachTransport(_));
- pipeline.detachTransport();
-
- EXPECT_CALL(handler, detachPipeline(_));
-}