wangle/channel/AsyncSocketHandler.h \
wangle/channel/Handler.h \
wangle/channel/HandlerContext.h \
+ wangle/channel/HandlerContext-inl.h \
wangle/channel/OutputBufferingHandler.h \
wangle/channel/Pipeline.h \
+ wangle/channel/Pipeline-inl.h \
wangle/channel/StaticPipeline.h \
wangle/concurrent/BlockingQueue.h \
wangle/concurrent/Codel.h \
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace folly { namespace wangle {
+
+class PipelineContext {
+ public:
+ virtual ~PipelineContext() {}
+
+ virtual void attachPipeline() = 0;
+ virtual void detachPipeline() = 0;
+
+ virtual void attachTransport() = 0;
+ virtual void detachTransport() = 0;
+
+ template <class H, class HandlerContext>
+ void attachContext(H* handler, HandlerContext* ctx) {
+ if (++handler->attachCount_ == 1) {
+ handler->ctx_ = ctx;
+ } else {
+ handler->ctx_ = nullptr;
+ }
+ }
+
+ virtual void setNextIn(PipelineContext* ctx) = 0;
+ virtual void setNextOut(PipelineContext* ctx) = 0;
+};
+
+template <class In>
+class InboundLink {
+ public:
+ virtual ~InboundLink() {}
+ virtual void read(In msg) = 0;
+ virtual void readEOF() = 0;
+ virtual void readException(exception_wrapper e) = 0;
+};
+
+template <class Out>
+class OutboundLink {
+ public:
+ virtual ~OutboundLink() {}
+ virtual Future<void> write(Out msg) = 0;
+ virtual Future<void> close() = 0;
+};
+
+template <class P, class H, class Context>
+class ContextImplBase : public PipelineContext {
+ public:
+ ~ContextImplBase() {}
+
+ H* getHandler() {
+ return handler_.get();
+ }
+
+ void initialize(P* pipeline, std::shared_ptr<H> handler) {
+ pipeline_ = pipeline;
+ handler_ = std::move(handler);
+ }
+
+ // PipelineContext overrides
+ void attachPipeline() override {
+ if (!attached_) {
+ this->attachContext(handler_.get(), impl_);
+ handler_->attachPipeline(impl_);
+ attached_ = true;
+ }
+ }
+
+ void detachPipeline() override {
+ handler_->detachPipeline(impl_);
+ attached_ = false;
+ }
+
+ void attachTransport() override {
+ DestructorGuard dg(pipeline_);
+ handler_->attachTransport(impl_);
+ }
+
+ void detachTransport() override {
+ DestructorGuard dg(pipeline_);
+ handler_->detachTransport(impl_);
+ }
+
+ void setNextIn(PipelineContext* ctx) override {
+ auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
+ if (nextIn) {
+ nextIn_ = nextIn;
+ } else {
+ throw std::invalid_argument("inbound type mismatch");
+ }
+ }
+
+ void setNextOut(PipelineContext* ctx) override {
+ auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
+ if (nextOut) {
+ nextOut_ = nextOut;
+ } else {
+ throw std::invalid_argument("outbound type mismatch");
+ }
+ }
+
+ protected:
+ Context* impl_;
+ P* pipeline_;
+ std::shared_ptr<H> handler_;
+ InboundLink<typename H::rout>* nextIn_{nullptr};
+ OutboundLink<typename H::wout>* nextOut_{nullptr};
+
+ private:
+ bool attached_{false};
+ using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class P, class H>
+class ContextImpl
+ : public HandlerContext<typename H::rout,
+ typename H::wout>,
+ public InboundLink<typename H::rin>,
+ public OutboundLink<typename H::win>,
+ public ContextImplBase<P, H, HandlerContext<typename H::rout,
+ typename H::wout>> {
+ public:
+ typedef typename H::rin Rin;
+ typedef typename H::rout Rout;
+ typedef typename H::win Win;
+ typedef typename H::wout Wout;
+ static const HandlerDir dir = HandlerDir::BOTH;
+
+ explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+ this->impl_ = this;
+ this->initialize(pipeline, std::move(handler));
+ }
+
+ // For StaticPipeline
+ ContextImpl() {
+ this->impl_ = this;
+ }
+
+ ~ContextImpl() {}
+
+ // HandlerContext overrides
+ void fireRead(Rout msg) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->read(std::forward<Rout>(msg));
+ } else {
+ LOG(WARNING) << "read reached end of pipeline";
+ }
+ }
+
+ void fireReadEOF() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->readEOF();
+ } else {
+ LOG(WARNING) << "readEOF reached end of pipeline";
+ }
+ }
+
+ void fireReadException(exception_wrapper e) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->readException(std::move(e));
+ } else {
+ LOG(WARNING) << "readException reached end of pipeline";
+ }
+ }
+
+ Future<void> fireWrite(Wout msg) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextOut_) {
+ return this->nextOut_->write(std::forward<Wout>(msg));
+ } else {
+ LOG(WARNING) << "write reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ Future<void> fireClose() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextOut_) {
+ return this->nextOut_->close();
+ } else {
+ LOG(WARNING) << "close reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() override {
+ return this->pipeline_->getTransport();
+ }
+
+ void setWriteFlags(WriteFlags flags) override {
+ this->pipeline_->setWriteFlags(flags);
+ }
+
+ WriteFlags getWriteFlags() override {
+ return this->pipeline_->getWriteFlags();
+ }
+
+ void setReadBufferSettings(
+ uint64_t minAvailable,
+ uint64_t allocationSize) override {
+ this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
+ }
+
+ std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
+ return this->pipeline_->getReadBufferSettings();
+ }
+
+ // InboundLink overrides
+ void read(Rin msg) override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->read(this, std::forward<Rin>(msg));
+ }
+
+ void readEOF() override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->readEOF(this);
+ }
+
+ void readException(exception_wrapper e) override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->readException(this, std::move(e));
+ }
+
+ // OutboundLink overrides
+ Future<void> write(Win msg) override {
+ DestructorGuard dg(this->pipeline_);
+ return this->handler_->write(this, std::forward<Win>(msg));
+ }
+
+ Future<void> close() override {
+ DestructorGuard dg(this->pipeline_);
+ return this->handler_->close(this);
+ }
+
+ private:
+ using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class P, class H>
+class InboundContextImpl
+ : public InboundHandlerContext<typename H::rout>,
+ public InboundLink<typename H::rin>,
+ public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
+ public:
+ typedef typename H::rin Rin;
+ typedef typename H::rout Rout;
+ typedef typename H::win Win;
+ typedef typename H::wout Wout;
+ static const HandlerDir dir = HandlerDir::IN;
+
+ explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+ this->impl_ = this;
+ this->initialize(pipeline, std::move(handler));
+ }
+
+ // For StaticPipeline
+ InboundContextImpl() {
+ this->impl_ = this;
+ }
+
+ ~InboundContextImpl() {}
+
+ // InboundHandlerContext overrides
+ void fireRead(Rout msg) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->read(std::forward<Rout>(msg));
+ } else {
+ LOG(WARNING) << "read reached end of pipeline";
+ }
+ }
+
+ void fireReadEOF() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->readEOF();
+ } else {
+ LOG(WARNING) << "readEOF reached end of pipeline";
+ }
+ }
+
+ void fireReadException(exception_wrapper e) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->readException(std::move(e));
+ } else {
+ LOG(WARNING) << "readException reached end of pipeline";
+ }
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() override {
+ return this->pipeline_->getTransport();
+ }
+
+ // InboundLink overrides
+ void read(Rin msg) override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->read(this, std::forward<Rin>(msg));
+ }
+
+ void readEOF() override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->readEOF(this);
+ }
+
+ void readException(exception_wrapper e) override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->readException(this, std::move(e));
+ }
+
+ private:
+ using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class P, class H>
+class OutboundContextImpl
+ : public OutboundHandlerContext<typename H::wout>,
+ public OutboundLink<typename H::win>,
+ public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
+ public:
+ typedef typename H::rin Rin;
+ typedef typename H::rout Rout;
+ typedef typename H::win Win;
+ typedef typename H::wout Wout;
+ static const HandlerDir dir = HandlerDir::OUT;
+
+ explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+ this->impl_ = this;
+ this->initialize(pipeline, std::move(handler));
+ }
+
+ // For StaticPipeline
+ OutboundContextImpl() {
+ this->impl_ = this;
+ }
+
+ ~OutboundContextImpl() {}
+
+ // OutboundHandlerContext overrides
+ Future<void> fireWrite(Wout msg) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextOut_) {
+ return this->nextOut_->write(std::forward<Wout>(msg));
+ } else {
+ LOG(WARNING) << "write reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ Future<void> fireClose() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextOut_) {
+ return this->nextOut_->close();
+ } else {
+ LOG(WARNING) << "close reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() override {
+ return this->pipeline_->getTransport();
+ }
+
+ // OutboundLink overrides
+ Future<void> write(Win msg) override {
+ DestructorGuard dg(this->pipeline_);
+ return this->handler_->write(this, std::forward<Win>(msg));
+ }
+
+ Future<void> close() override {
+ DestructorGuard dg(this->pipeline_);
+ return this->handler_->close(this);
+ }
+
+ private:
+ using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class Handler, class Pipeline>
+struct ContextType {
+ typedef typename std::conditional<
+ Handler::dir == HandlerDir::BOTH,
+ ContextImpl<Pipeline, Handler>,
+ typename std::conditional<
+ Handler::dir == HandlerDir::IN,
+ InboundContextImpl<Pipeline, Handler>,
+ OutboundContextImpl<Pipeline, Handler>
+ >::type>::type
+ type;
+};
+
+}} // folly::wangle
BOTH
};
-class PipelineContext {
- public:
- virtual ~PipelineContext() {}
-
- virtual void attachPipeline() = 0;
- virtual void detachPipeline() = 0;
-
- virtual void attachTransport() = 0;
- virtual void detachTransport() = 0;
-
- template <class H, class HandlerContext>
- void attachContext(H* handler, HandlerContext* ctx) {
- if (++handler->attachCount_ == 1) {
- handler->ctx_ = ctx;
- } else {
- handler->ctx_ = nullptr;
- }
- }
-
- virtual void setNextIn(PipelineContext* ctx) = 0;
- virtual void setNextOut(PipelineContext* ctx) = 0;
-};
-
-template <class In>
-class InboundLink {
- public:
- virtual ~InboundLink() {}
- virtual void read(In msg) = 0;
- virtual void readEOF() = 0;
- virtual void readException(exception_wrapper e) = 0;
-};
-
-template <class Out>
-class OutboundLink {
- public:
- virtual ~OutboundLink() {}
- virtual Future<void> write(Out msg) = 0;
- virtual Future<void> close() = 0;
-};
-
-template <class P, class H, class Context>
-class ContextImplBase : public PipelineContext {
- public:
- ~ContextImplBase() {}
-
- H* getHandler() {
- return handler_.get();
- }
-
- void initialize(P* pipeline, std::shared_ptr<H> handler) {
- pipeline_ = pipeline;
- handler_ = std::move(handler);
- }
-
- // PipelineContext overrides
- void attachPipeline() override {
- if (!attached_) {
- this->attachContext(handler_.get(), impl_);
- handler_->attachPipeline(impl_);
- attached_ = true;
- }
- }
-
- void detachPipeline() override {
- handler_->detachPipeline(impl_);
- attached_ = false;
- }
-
- void attachTransport() override {
- DestructorGuard dg(pipeline_);
- handler_->attachTransport(impl_);
- }
-
- void detachTransport() override {
- DestructorGuard dg(pipeline_);
- handler_->detachTransport(impl_);
- }
-
- void setNextIn(PipelineContext* ctx) override {
- auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
- if (nextIn) {
- nextIn_ = nextIn;
- } else {
- throw std::invalid_argument("inbound type mismatch");
- }
- }
-
- void setNextOut(PipelineContext* ctx) override {
- auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
- if (nextOut) {
- nextOut_ = nextOut;
- } else {
- throw std::invalid_argument("outbound type mismatch");
- }
- }
-
- protected:
- Context* impl_;
- P* pipeline_;
- std::shared_ptr<H> handler_;
- InboundLink<typename H::rout>* nextIn_{nullptr};
- OutboundLink<typename H::wout>* nextOut_{nullptr};
-
- private:
- bool attached_{false};
- using DestructorGuard = typename P::DestructorGuard;
-};
-
-template <class P, class H>
-class ContextImpl
- : public HandlerContext<typename H::rout,
- typename H::wout>,
- public InboundLink<typename H::rin>,
- public OutboundLink<typename H::win>,
- public ContextImplBase<P, H, HandlerContext<typename H::rout,
- typename H::wout>> {
- public:
- typedef typename H::rin Rin;
- typedef typename H::rout Rout;
- typedef typename H::win Win;
- typedef typename H::wout Wout;
- static const HandlerDir dir = HandlerDir::BOTH;
-
- explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
- this->impl_ = this;
- this->initialize(pipeline, std::move(handler));
- }
-
- // For StaticPipeline
- ContextImpl() {
- this->impl_ = this;
- }
-
- ~ContextImpl() {}
-
- // HandlerContext overrides
- void fireRead(Rout msg) override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextIn_) {
- this->nextIn_->read(std::forward<Rout>(msg));
- } else {
- LOG(WARNING) << "read reached end of pipeline";
- }
- }
-
- void fireReadEOF() override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextIn_) {
- this->nextIn_->readEOF();
- } else {
- LOG(WARNING) << "readEOF reached end of pipeline";
- }
- }
-
- void fireReadException(exception_wrapper e) override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextIn_) {
- this->nextIn_->readException(std::move(e));
- } else {
- LOG(WARNING) << "readException reached end of pipeline";
- }
- }
-
- Future<void> fireWrite(Wout msg) override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextOut_) {
- return this->nextOut_->write(std::forward<Wout>(msg));
- } else {
- LOG(WARNING) << "write reached end of pipeline";
- return makeFuture();
- }
- }
-
- Future<void> fireClose() override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextOut_) {
- return this->nextOut_->close();
- } else {
- LOG(WARNING) << "close reached end of pipeline";
- return makeFuture();
- }
- }
-
- std::shared_ptr<AsyncTransport> getTransport() override {
- return this->pipeline_->getTransport();
- }
-
- void setWriteFlags(WriteFlags flags) override {
- this->pipeline_->setWriteFlags(flags);
- }
-
- WriteFlags getWriteFlags() override {
- return this->pipeline_->getWriteFlags();
- }
-
- void setReadBufferSettings(
- uint64_t minAvailable,
- uint64_t allocationSize) override {
- this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
- }
-
- std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
- return this->pipeline_->getReadBufferSettings();
- }
-
- // InboundLink overrides
- void read(Rin msg) override {
- DestructorGuard dg(this->pipeline_);
- this->handler_->read(this, std::forward<Rin>(msg));
- }
-
- void readEOF() override {
- DestructorGuard dg(this->pipeline_);
- this->handler_->readEOF(this);
- }
-
- void readException(exception_wrapper e) override {
- DestructorGuard dg(this->pipeline_);
- this->handler_->readException(this, std::move(e));
- }
-
- // OutboundLink overrides
- Future<void> write(Win msg) override {
- DestructorGuard dg(this->pipeline_);
- return this->handler_->write(this, std::forward<Win>(msg));
- }
-
- Future<void> close() override {
- DestructorGuard dg(this->pipeline_);
- return this->handler_->close(this);
- }
-
- private:
- using DestructorGuard = typename P::DestructorGuard;
-};
-
-template <class P, class H>
-class InboundContextImpl
- : public InboundHandlerContext<typename H::rout>,
- public InboundLink<typename H::rin>,
- public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
- public:
- typedef typename H::rin Rin;
- typedef typename H::rout Rout;
- typedef typename H::win Win;
- typedef typename H::wout Wout;
- static const HandlerDir dir = HandlerDir::IN;
-
- explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
- this->impl_ = this;
- this->initialize(pipeline, std::move(handler));
- }
-
- // For StaticPipeline
- InboundContextImpl() {
- this->impl_ = this;
- }
-
- ~InboundContextImpl() {}
-
- // InboundHandlerContext overrides
- void fireRead(Rout msg) override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextIn_) {
- this->nextIn_->read(std::forward<Rout>(msg));
- } else {
- LOG(WARNING) << "read reached end of pipeline";
- }
- }
-
- void fireReadEOF() override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextIn_) {
- this->nextIn_->readEOF();
- } else {
- LOG(WARNING) << "readEOF reached end of pipeline";
- }
- }
-
- void fireReadException(exception_wrapper e) override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextIn_) {
- this->nextIn_->readException(std::move(e));
- } else {
- LOG(WARNING) << "readException reached end of pipeline";
- }
- }
-
- std::shared_ptr<AsyncTransport> getTransport() override {
- return this->pipeline_->getTransport();
- }
-
- // InboundLink overrides
- void read(Rin msg) override {
- DestructorGuard dg(this->pipeline_);
- this->handler_->read(this, std::forward<Rin>(msg));
- }
-
- void readEOF() override {
- DestructorGuard dg(this->pipeline_);
- this->handler_->readEOF(this);
- }
-
- void readException(exception_wrapper e) override {
- DestructorGuard dg(this->pipeline_);
- this->handler_->readException(this, std::move(e));
- }
-
- private:
- using DestructorGuard = typename P::DestructorGuard;
-};
-
-template <class P, class H>
-class OutboundContextImpl
- : public OutboundHandlerContext<typename H::wout>,
- public OutboundLink<typename H::win>,
- public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
- public:
- typedef typename H::rin Rin;
- typedef typename H::rout Rout;
- typedef typename H::win Win;
- typedef typename H::wout Wout;
- static const HandlerDir dir = HandlerDir::OUT;
-
- explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
- this->impl_ = this;
- this->initialize(pipeline, std::move(handler));
- }
-
- // For StaticPipeline
- OutboundContextImpl() {
- this->impl_ = this;
- }
-
- ~OutboundContextImpl() {}
-
- // OutboundHandlerContext overrides
- Future<void> fireWrite(Wout msg) override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextOut_) {
- return this->nextOut_->write(std::forward<Wout>(msg));
- } else {
- LOG(WARNING) << "write reached end of pipeline";
- return makeFuture();
- }
- }
-
- Future<void> fireClose() override {
- DestructorGuard dg(this->pipeline_);
- if (this->nextOut_) {
- return this->nextOut_->close();
- } else {
- LOG(WARNING) << "close reached end of pipeline";
- return makeFuture();
- }
- }
-
- std::shared_ptr<AsyncTransport> getTransport() override {
- return this->pipeline_->getTransport();
- }
-
- // OutboundLink overrides
- Future<void> write(Win msg) override {
- DestructorGuard dg(this->pipeline_);
- return this->handler_->write(this, std::forward<Win>(msg));
- }
-
- Future<void> close() override {
- DestructorGuard dg(this->pipeline_);
- return this->handler_->close(this);
- }
-
- private:
- using DestructorGuard = typename P::DestructorGuard;
-};
-
-template <class Handler, class Pipeline>
-struct ContextType {
- typedef typename std::conditional<
- Handler::dir == HandlerDir::BOTH,
- ContextImpl<Pipeline, Handler>,
- typename std::conditional<
- Handler::dir == HandlerDir::IN,
- InboundContextImpl<Pipeline, Handler>,
- OutboundContextImpl<Pipeline, Handler>
- >::type>::type
- type;
-};
+}} // folly::wangle
-}}
+#include <folly/wangle/channel/HandlerContext-inl.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+
+namespace folly { namespace wangle {
+
+template <class R, class W>
+Pipeline<R, W>::Pipeline() : isStatic_(false) {}
+
+template <class R, class W>
+Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
+ CHECK(isStatic_);
+}
+
+template <class R, class W>
+Pipeline<R, W>::~Pipeline() {
+ if (!isStatic_) {
+ detachHandlers();
+ }
+}
+
+template <class R, class W>
+std::shared_ptr<AsyncTransport> Pipeline<R, W>::getTransport() {
+ return transport_;
+}
+
+template <class R, class W>
+void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
+ writeFlags_ = flags;
+}
+
+template <class R, class W>
+WriteFlags Pipeline<R, W>::getWriteFlags() {
+ return writeFlags_;
+}
+
+template <class R, class W>
+void Pipeline<R, W>::setReadBufferSettings(
+ uint64_t minAvailable,
+ uint64_t allocationSize) {
+ readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
+}
+
+template <class R, class W>
+std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
+ return readBufferSettings_;
+}
+
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+Pipeline<R, W>::read(R msg) {
+ if (!front_) {
+ throw std::invalid_argument("read(): no inbound handler in Pipeline");
+ }
+ front_->read(std::forward<R>(msg));
+}
+
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+Pipeline<R, W>::readEOF() {
+ if (!front_) {
+ throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
+ }
+ front_->readEOF();
+}
+
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+Pipeline<R, W>::readException(exception_wrapper e) {
+ if (!front_) {
+ throw std::invalid_argument(
+ "readException(): no inbound handler in Pipeline");
+ }
+ front_->readException(std::move(e));
+}
+
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
+Pipeline<R, W>::write(W msg) {
+ if (!back_) {
+ throw std::invalid_argument("write(): no outbound handler in Pipeline");
+ }
+ return back_->write(std::forward<W>(msg));
+}
+
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
+Pipeline<R, W>::close() {
+ if (!back_) {
+ throw std::invalid_argument("close(): no outbound handler in Pipeline");
+ }
+ return back_->close();
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
+ typedef typename ContextType<H, Pipeline<R, W>>::type Context;
+ return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
+ return addBack(std::make_shared<H>(std::forward<H>(handler)));
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
+ return addBack(std::shared_ptr<H>(handler, [](H*){}));
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
+ typedef typename ContextType<H, Pipeline<R, W>>::type Context;
+ return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
+ return addFront(std::make_shared<H>(std::forward<H>(handler)));
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
+ return addFront(std::shared_ptr<H>(handler, [](H*){}));
+}
+
+template <class R, class W>
+template <class H>
+H* Pipeline<R, W>::getHandler(int i) {
+ typedef typename ContextType<H, Pipeline<R, W>>::type Context;
+ auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
+ CHECK(ctx);
+ return ctx->getHandler();
+}
+
+namespace detail {
+
+template <class T>
+inline void logWarningIfNotNothing(const std::string& warning) {
+ LOG(WARNING) << warning;
+}
+
+template <>
+inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
+ // do nothing
+}
+
+} // detail
+
+// TODO Have read/write/etc check that pipeline has been finalized
+template <class R, class W>
+void Pipeline<R, W>::finalize() {
+ if (!inCtxs_.empty()) {
+ front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
+ for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
+ inCtxs_[i]->setNextIn(inCtxs_[i+1]);
+ }
+ }
+
+ if (!outCtxs_.empty()) {
+ back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
+ for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
+ outCtxs_[i]->setNextOut(outCtxs_[i-1]);
+ }
+ }
+
+ if (!front_) {
+ detail::logWarningIfNotNothing<R>(
+ "No inbound handler in Pipeline, inbound operations will throw "
+ "std::invalid_argument");
+ }
+ if (!back_) {
+ detail::logWarningIfNotNothing<W>(
+ "No outbound handler in Pipeline, outbound operations will throw "
+ "std::invalid_argument");
+ }
+
+ for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
+ (*it)->attachPipeline();
+ }
+}
+
+template <class R, class W>
+template <class H>
+bool Pipeline<R, W>::setOwner(H* handler) {
+ typedef typename ContextType<H, Pipeline<R, W>>::type Context;
+ for (auto& ctx : ctxs_) {
+ auto ctxImpl = dynamic_cast<Context*>(ctx.get());
+ if (ctxImpl && ctxImpl->getHandler() == handler) {
+ owner_ = ctx;
+ return true;
+ }
+ }
+ return false;
+}
+
+template <class R, class W>
+void Pipeline<R, W>::attachTransport(
+ std::shared_ptr<AsyncTransport> transport) {
+ transport_ = std::move(transport);
+ for (auto& ctx : ctxs_) {
+ ctx->attachTransport();
+ }
+}
+
+template <class R, class W>
+void Pipeline<R, W>::detachTransport() {
+ transport_ = nullptr;
+ for (auto& ctx : ctxs_) {
+ ctx->detachTransport();
+ }
+}
+
+template <class R, class W>
+template <class Context>
+void Pipeline<R, W>::addContextFront(Context* ctx) {
+ addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
+}
+
+template <class R, class W>
+void Pipeline<R, W>::detachHandlers() {
+ for (auto& ctx : ctxs_) {
+ if (ctx != owner_) {
+ ctx->detachPipeline();
+ }
+ }
+}
+
+template <class R, class W>
+template <class Context>
+Pipeline<R, W>& Pipeline<R, W>::addHelper(
+ std::shared_ptr<Context>&& ctx,
+ bool front) {
+ ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
+ if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
+ inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
+ }
+ if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
+ outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
+ }
+ return *this;
+}
+
+}} // folly::wangle
#include <folly/io/async/DelayedDestruction.h>
#include <folly/ExceptionWrapper.h>
#include <folly/Memory.h>
-#include <glog/logging.h>
namespace folly { namespace wangle {
-// See Pipeline docblock for purpose
struct Nothing{};
-namespace detail {
-
-template <class T>
-inline void logWarningIfNotNothing(const std::string& warning) {
- LOG(WARNING) << warning;
-}
-
-template <>
-inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
- // do nothing
-}
-
-} // detail
-
/*
* R is the inbound type, i.e. inbound calls start with pipeline.read(R)
* W is the outbound type, i.e. outbound calls start with pipeline.write(W)
template <class R, class W = Nothing>
class Pipeline : public DelayedDestruction {
public:
- Pipeline() : isStatic_(false) {}
-
- ~Pipeline() {
- if (!isStatic_) {
- detachHandlers();
- }
- }
+ Pipeline();
+ ~Pipeline();
- std::shared_ptr<AsyncTransport> getTransport() {
- return transport_;
- }
+ std::shared_ptr<AsyncTransport> getTransport();
- void setWriteFlags(WriteFlags flags) {
- writeFlags_ = flags;
- }
+ void setWriteFlags(WriteFlags flags);
+ WriteFlags getWriteFlags();
- WriteFlags getWriteFlags() {
- return writeFlags_;
- }
-
- void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
- readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
- }
-
- std::pair<uint64_t, uint64_t> getReadBufferSettings() {
- return readBufferSettings_;
- }
+ void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
+ std::pair<uint64_t, uint64_t> getReadBufferSettings();
template <class T = R>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
- read(R msg) {
- if (!front_) {
- throw std::invalid_argument("read(): no inbound handler in Pipeline");
- }
- front_->read(std::forward<R>(msg));
- }
+ read(R msg);
template <class T = R>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
- readEOF() {
- if (!front_) {
- throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
- }
- front_->readEOF();
- }
+ readEOF();
template <class T = R>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
- readException(exception_wrapper e) {
- if (!front_) {
- throw std::invalid_argument(
- "readException(): no inbound handler in Pipeline");
- }
- front_->readException(std::move(e));
- }
+ readException(exception_wrapper e);
template <class T = W>
typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
- write(W msg) {
- if (!back_) {
- throw std::invalid_argument("write(): no outbound handler in Pipeline");
- }
- return back_->write(std::forward<W>(msg));
- }
+ write(W msg);
template <class T = W>
typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
- close() {
- if (!back_) {
- throw std::invalid_argument("close(): no outbound handler in Pipeline");
- }
- return back_->close();
- }
+ close();
template <class H>
- Pipeline& addBack(std::shared_ptr<H> handler) {
- typedef typename ContextType<H, Pipeline>::type Context;
- return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
- }
+ Pipeline& addBack(std::shared_ptr<H> handler);
template <class H>
- Pipeline& addBack(H&& handler) {
- return addBack(std::make_shared<H>(std::forward<H>(handler)));
- }
+ Pipeline& addBack(H&& handler);
template <class H>
- Pipeline& addBack(H* handler) {
- return addBack(std::shared_ptr<H>(handler, [](H*){}));
- }
+ Pipeline& addBack(H* handler);
template <class H>
- Pipeline& addFront(std::shared_ptr<H> handler) {
- typedef typename ContextType<H, Pipeline>::type Context;
- return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
- }
+ Pipeline& addFront(std::shared_ptr<H> handler);
template <class H>
- Pipeline& addFront(H&& handler) {
- return addFront(std::make_shared<H>(std::forward<H>(handler)));
- }
+ Pipeline& addFront(H&& handler);
template <class H>
- Pipeline& addFront(H* handler) {
- return addFront(std::shared_ptr<H>(handler, [](H*){}));
- }
+ Pipeline& addFront(H* handler);
template <class H>
- H* getHandler(int i) {
- typedef typename ContextType<H, Pipeline>::type Context;
- auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
- CHECK(ctx);
- return ctx->getHandler();
- }
-
- // TODO Have read/write/etc check that pipeline has been finalized
- void finalize() {
- if (!inCtxs_.empty()) {
- front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
- for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
- inCtxs_[i]->setNextIn(inCtxs_[i+1]);
- }
- }
-
- if (!outCtxs_.empty()) {
- back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
- for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
- outCtxs_[i]->setNextOut(outCtxs_[i-1]);
- }
- }
-
- if (!front_) {
- detail::logWarningIfNotNothing<R>(
- "No inbound handler in Pipeline, inbound operations will throw "
- "std::invalid_argument");
- }
- if (!back_) {
- detail::logWarningIfNotNothing<W>(
- "No outbound handler in Pipeline, outbound operations will throw "
- "std::invalid_argument");
- }
-
- for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
- (*it)->attachPipeline();
- }
- }
+ H* getHandler(int i);
+
+ void finalize();
// If one of the handlers owns the pipeline itself, use setOwner to ensure
// that the pipeline doesn't try to detach the handler during destruction,
// lest destruction ordering issues occur.
// See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
template <class H>
- bool setOwner(H* handler) {
- typedef typename ContextType<H, Pipeline>::type Context;
- for (auto& ctx : ctxs_) {
- auto ctxImpl = dynamic_cast<Context*>(ctx.get());
- if (ctxImpl && ctxImpl->getHandler() == handler) {
- owner_ = ctx;
- return true;
- }
- }
- return false;
- }
-
- void attachTransport(
- std::shared_ptr<AsyncTransport> transport) {
- transport_ = std::move(transport);
- for (auto& ctx : ctxs_) {
- ctx->attachTransport();
- }
- }
-
- void detachTransport() {
- transport_ = nullptr;
- for (auto& ctx : ctxs_) {
- ctx->detachTransport();
- }
- }
+ bool setOwner(H* handler);
+
+ void attachTransport(std::shared_ptr<AsyncTransport> transport);
+
+ void detachTransport();
protected:
- explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
- CHECK(isStatic_);
- }
+ explicit Pipeline(bool isStatic);
template <class Context>
- void addContextFront(Context* ctx) {
- addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
- }
-
- void detachHandlers() {
- for (auto& ctx : ctxs_) {
- if (ctx != owner_) {
- ctx->detachPipeline();
- }
- }
- }
+ void addContextFront(Context* ctx);
+
+ void detachHandlers();
private:
template <class Context>
- Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front) {
- ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
- if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
- inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
- }
- if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
- outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
- }
- return *this;
- }
+ Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
std::shared_ptr<AsyncTransport> transport_;
WriteFlags writeFlags_{WriteFlags::NONE};
};
}
+
+#include <folly/wangle/channel/Pipeline-inl.h>