std::atomic<int> connections{0};
-class TestHandlerPipeline
- : public HandlerAdapter<void*,
- std::exception> {
+class TestHandlerPipeline : public InboundHandler<void*> {
public:
void read(Context* ctx, void* conn) {
connections++;
return ctx->fireRead(conn);
}
-
- Future<void> write(Context* ctx, std::exception e) {
- return ctx->fireWrite(e);
- }
};
template <typename HandlerPipeline>
CHECK(connections == 1);
}
-class TestUDPPipeline
- : public HandlerAdapter<void*,
- std::exception> {
+class TestUDPPipeline : public InboundHandler<void*> {
public:
void read(Context* ctx, void* conn) {
connections++;
}
-
- Future<void> write(Context* ctx, std::exception e) {
- return ctx->fireWrite(e);
- }
};
TEST(Bootstrap, UDP) {
public:
explicit ServerAcceptor(
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
- std::shared_ptr<folly::wangle::Pipeline<
- void*, std::exception>> acceptorPipeline,
+ std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline,
EventBase* base)
: Acceptor(ServerSocketConfig())
, base_(base)
EventBase* base_;
std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
- std::shared_ptr<folly::wangle::Pipeline<
- void*, std::exception>> acceptorPipeline_;
+ std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline_;
};
template <typename Pipeline>
public:
explicit ServerAcceptorFactory(
std::shared_ptr<PipelineFactory<Pipeline>> factory,
- std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<
- void*, std::exception>>> pipeline)
+ std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<void*>>> pipeline)
: factory_(factory)
, pipeline_(pipeline) {}
std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
- std::shared_ptr<folly::wangle::Pipeline<
- void*, std::exception>> pipeline(
- pipeline_->newPipeline(nullptr));
+ std::shared_ptr<folly::wangle::Pipeline<void*>> pipeline(
+ pipeline_->newPipeline(nullptr));
return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
}
private:
std::shared_ptr<PipelineFactory<Pipeline>> factory_;
std::shared_ptr<PipelineFactory<
- folly::wangle::Pipeline<
- void*, std::exception>>> pipeline_;
+ folly::wangle::Pipeline<void*>>> pipeline_;
};
class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
}
class DefaultAcceptPipelineFactory
- : public PipelineFactory<wangle::Pipeline<void*, std::exception>> {
- typedef wangle::Pipeline<
- void*,
- std::exception> AcceptPipeline;
+ : public PipelineFactory<wangle::Pipeline<void*>> {
+ typedef wangle::Pipeline<void*> AcceptPipeline;
public:
AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
join();
}
- typedef wangle::Pipeline<
- void*,
- std::exception> AcceptPipeline;
+ typedef wangle::Pipeline<void*> AcceptPipeline;
/*
* Pipeline used to add connections to event bases.
* This is used for UDP or for load balancing
*/
};
-struct Unit{};
-
template <class Rin, class Rout = Rin>
class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
public:
typedef Rin rin;
typedef Rout rout;
- typedef Unit win;
- typedef Unit wout;
+ typedef Nothing win;
+ typedef Nothing wout;
typedef InboundHandlerContext<Rout> Context;
virtual ~InboundHandler() {}
public:
static const HandlerDir dir = HandlerDir::OUT;
- typedef Unit rin;
- typedef Unit rout;
+ typedef Nothing rin;
+ typedef Nothing rout;
typedef Win win;
typedef Wout wout;
typedef OutboundHandlerContext<Wout> Context;
namespace folly { namespace wangle {
+// See Pipeline docblock for purpose
+struct Nothing{};
+
+namespace detail {
+
+template <class T>
+inline void logWarningIfNotNothing(const std::string& warning) {
+ LOG(WARNING) << warning;
+}
+
+template <>
+inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
+ // do nothing
+}
+
+} // detail
+
/*
* R is the inbound type, i.e. inbound calls start with pipeline.read(R)
* W is the outbound type, i.e. outbound calls start with pipeline.write(W)
+ *
+ * Use Nothing for one of the types if your pipeline is unidirectional.
+ * If R is Nothing, read(), readEOF(), and readException() will be disabled.
+ * If W is Nothing, write() and close() will be disabled.
*/
-template <class R, class W>
+template <class R, class W = Nothing>
class Pipeline : public DelayedDestruction {
public:
Pipeline() : isStatic_(false) {}
return readBufferSettings_;
}
- void read(R msg) {
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+ read(R msg) {
if (!front_) {
throw std::invalid_argument("read(): no inbound handler in Pipeline");
}
front_->read(std::forward<R>(msg));
}
- void readEOF() {
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+ readEOF() {
if (!front_) {
throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
}
front_->readEOF();
}
- void readException(exception_wrapper e) {
+ template <class T = R>
+ typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+ readException(exception_wrapper e) {
if (!front_) {
throw std::invalid_argument(
"readException(): no inbound handler in Pipeline");
front_->readException(std::move(e));
}
- Future<void> write(W msg) {
+ template <class T = W>
+ typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
+ write(W msg) {
if (!back_) {
throw std::invalid_argument("write(): no outbound handler in Pipeline");
}
return back_->write(std::forward<W>(msg));
}
- Future<void> close() {
+ template <class T = W>
+ typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
+ close() {
if (!back_) {
throw std::invalid_argument("close(): no outbound handler in Pipeline");
}
}
if (!front_) {
- LOG(WARNING) << "No inbound handler in Pipeline, "
- "inbound operations will throw std::invalid_argument";
+ detail::logWarningIfNotNothing<R>(
+ "No inbound handler in Pipeline, inbound operations will throw "
+ "std::invalid_argument");
}
if (!back_) {
- LOG(WARNING) << "No outbound handler in Pipeline, "
- "outbound operations will throw std::invalid_argument";
+ detail::logWarningIfNotNothing<W>(
+ "No outbound handler in Pipeline, outbound operations will throw "
+ "std::invalid_argument");
}
for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {