#pragma once
-#include <folly/wangle/channel/HandlerContext.h>
#include <folly/futures/Future.h>
#include <folly/futures/Unit.h>
#include <folly/io/async/AsyncTransport.h>
#include <folly/io/async/DelayedDestruction.h>
+#include <folly/wangle/channel/HandlerContext.h>
#include <folly/ExceptionWrapper.h>
#include <folly/Memory.h>
namespace folly { namespace wangle {
+class PipelineBase;
+
class PipelineManager {
public:
virtual ~PipelineManager() = default;
return transport_;
}
- private:
- PipelineManager* manager_{nullptr};
- std::shared_ptr<AsyncTransport> transport_;
-};
-
-/*
- * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
- * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
- *
- * Use Unit for one of the types if your pipeline is unidirectional.
- * If R is Unit, read(), readEOF(), and readException() will be disabled.
- * If W is Unit, write() and close() will be disabled.
- */
-template <class R, class W = Unit>
-class Pipeline : public PipelineBase {
- public:
- Pipeline();
- ~Pipeline();
-
void setWriteFlags(WriteFlags flags);
WriteFlags getWriteFlags();
void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
std::pair<uint64_t, uint64_t> getReadBufferSettings();
- template <class T = R>
- typename std::enable_if<!std::is_same<T, Unit>::value>::type
- read(R msg);
-
- template <class T = R>
- typename std::enable_if<!std::is_same<T, Unit>::value>::type
- readEOF();
-
- template <class T = R>
- typename std::enable_if<!std::is_same<T, Unit>::value>::type
- readException(exception_wrapper e);
-
- template <class T = R>
- typename std::enable_if<!std::is_same<T, Unit>::value>::type
- transportActive();
-
- template <class T = R>
- typename std::enable_if<!std::is_same<T, Unit>::value>::type
- transportInactive();
-
- template <class T = W>
- typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
- write(W msg);
-
- template <class T = W>
- typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
- close();
-
template <class H>
- Pipeline& addBack(std::shared_ptr<H> handler);
+ PipelineBase& addBack(std::shared_ptr<H> handler);
template <class H>
- Pipeline& addBack(H&& handler);
+ PipelineBase& addBack(H&& handler);
template <class H>
- Pipeline& addBack(H* handler);
+ PipelineBase& addBack(H* handler);
template <class H>
- Pipeline& addFront(std::shared_ptr<H> handler);
+ PipelineBase& addFront(std::shared_ptr<H> handler);
template <class H>
- Pipeline& addFront(H&& handler);
+ PipelineBase& addFront(H&& handler);
template <class H>
- Pipeline& addFront(H* handler);
+ PipelineBase& addFront(H* handler);
template <class H>
- Pipeline& remove(H* handler);
+ PipelineBase& remove(H* handler);
template <class H>
- Pipeline& remove();
+ PipelineBase& remove();
- Pipeline& removeFront();
+ PipelineBase& removeFront();
- Pipeline& removeBack();
+ PipelineBase& removeBack();
template <class H>
H* getHandler(int i);
- void finalize();
-
// If one of the handlers owns the pipeline itself, use setOwner to ensure
// that the pipeline doesn't try to detach the handler during destruction,
// lest destruction ordering issues occur.
template <class H>
bool setOwner(H* handler);
- protected:
- explicit Pipeline(bool isStatic);
+ virtual void finalize() = 0;
+ protected:
template <class Context>
void addContextFront(Context* ctx);
void detachHandlers();
+ std::vector<std::shared_ptr<PipelineContext>> ctxs_;
+ std::vector<PipelineContext*> inCtxs_;
+ std::vector<PipelineContext*> outCtxs_;
+
private:
+ PipelineManager* manager_{nullptr};
+ std::shared_ptr<AsyncTransport> transport_;
+
template <class Context>
- Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
+ PipelineBase& addHelper(std::shared_ptr<Context>&& ctx, bool front);
template <class H>
- Pipeline& removeHelper(H* handler, bool checkEqual);
+ PipelineBase& removeHelper(H* handler, bool checkEqual);
typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
ContextIterator;
WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
- bool isStatic_{false};
std::shared_ptr<PipelineContext> owner_;
- std::vector<std::shared_ptr<PipelineContext>> ctxs_;
- std::vector<PipelineContext*> inCtxs_;
- std::vector<PipelineContext*> outCtxs_;
+};
+
+/*
+ * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
+ * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
+ *
+ * Use Unit for one of the types if your pipeline is unidirectional.
+ * If R is Unit, read(), readEOF(), and readException() will be disabled.
+ * If W is Unit, write() and close() will be disabled.
+ */
+template <class R, class W = Unit>
+class Pipeline : public PipelineBase {
+ public:
+ Pipeline();
+ ~Pipeline();
+
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Unit>::value>::type
+ read(R msg);
+
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Unit>::value>::type
+ readEOF();
+
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Unit>::value>::type
+ readException(exception_wrapper e);
+
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Unit>::value>::type
+ transportActive();
+
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Unit>::value>::type
+ transportInactive();
+
+ template <class T = W>
+ typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+ write(W msg);
+
+ template <class T = W>
+ typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+ close();
+
+ void finalize() override;
+
+ protected:
+ explicit Pipeline(bool isStatic);
+
+ private:
+ bool isStatic_{false};
+
InboundLink<R>* front_{nullptr};
OutboundLink<W>* back_{nullptr};
};