template <typename Pipeline>
class ServerAcceptor
: public Acceptor
- , public folly::wangle::HandlerAdapter<void*, std::exception> {
+ , public folly::wangle::InboundHandler<void*> {
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
Acceptor::addConnection(connection);
}
- folly::Future<void> write(Context* ctx, std::exception e) {
- return ctx->fireWrite(e);
- }
-
/* See Acceptor::onNewConnection for details */
void onNewConnection(
AsyncSocket::UniquePtr transport, const SocketAddress* address,
typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
BytesToBytesHandler;
-typedef InboundHandler<IOBufQueue&>
+typedef InboundHandler<IOBufQueue&, std::unique_ptr<IOBuf>>
InboundBytesToBytesHandler;
typedef OutboundHandler<std::unique_ptr<IOBuf>>
}
void read(R msg) {
+ if (!front_) {
+ throw std::invalid_argument("read(): no inbound handler in Pipeline");
+ }
front_->read(std::forward<R>(msg));
}
void readEOF() {
+ if (!front_) {
+ throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
+ }
front_->readEOF();
}
void readException(exception_wrapper e) {
+ if (!front_) {
+ throw std::invalid_argument(
+ "readException(): no inbound handler in Pipeline");
+ }
front_->readException(std::move(e));
}
Future<void> write(W msg) {
+ if (!back_) {
+ throw std::invalid_argument("write(): no outbound handler in Pipeline");
+ }
return back_->write(std::forward<W>(msg));
}
Future<void> close() {
+ if (!back_) {
+ throw std::invalid_argument("close(): no outbound handler in Pipeline");
+ }
return back_->close();
}
}
if (!front_) {
- throw std::invalid_argument("no inbound handler in Pipeline");
+ LOG(WARNING) << "No inbound handler in Pipeline, "
+ "inbound operations will throw std::invalid_argument";
}
if (!back_) {
- throw std::invalid_argument("no outbound handler in Pipeline");
+ LOG(WARNING) << "No outbound handler in Pipeline, "
+ "outbound operations will throw std::invalid_argument";
}
for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
EXPECT_CALL(handler, detachPipeline(_)).Times(2);
}
+TEST(PipelineTest, NoDetachOnOwner) {
+ IntHandler handler;
+ EXPECT_CALL(handler, attachPipeline(_));
+ StaticPipeline<int, int, IntHandler> pipeline(&handler);
+ pipeline.setOwner(&handler);
+}
+
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
class ConcreteHandler : public Handler<Rin, Rout, Win, Wout> {
typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
typedef ConcreteHandler<int, std::string> IntToStringHandler;
typedef ConcreteHandler<std::string, int> StringToIntHandler;
-TEST(Pipeline, DynamicConstruction) {
- {
- Pipeline<int, int> pipeline;
- EXPECT_THROW(
- pipeline
- .addBack(HandlerAdapter<std::string, std::string>{})
- .finalize(), std::invalid_argument);
- }
- {
- Pipeline<int, int> pipeline;
- EXPECT_THROW(
- pipeline
- .addFront(HandlerAdapter<std::string, std::string>{})
- .finalize(),
+TEST(Pipeline, MissingInboundOrOutbound) {
+ Pipeline<int, int> pipeline;
+ pipeline
+ .addBack(HandlerAdapter<std::string, std::string>{})
+ .finalize();
+ EXPECT_THROW(pipeline.read(0), std::invalid_argument);
+ EXPECT_THROW(pipeline.readEOF(), std::invalid_argument);
+ EXPECT_THROW(
+ pipeline.readException(exception_wrapper(std::runtime_error("blah"))),
std::invalid_argument);
- }
+ EXPECT_THROW(pipeline.write(0), std::invalid_argument);
+ EXPECT_THROW(pipeline.close(), std::invalid_argument);
+}
+
+TEST(Pipeline, DynamicConstruction) {
{
StaticPipeline<std::string, std::string, StringHandler, StringHandler>
pipeline{StringHandler(), StringHandler()};
while (true) {
result = decode(ctx, q, needed);
if (result) {
- q_.append(std::move(result));
- ctx->fireRead(q_);
+ ctx->fireRead(std::move(result));
} else {
break;
}
Context* ctx, IOBufQueue& buf, size_t&) = 0;
void read(Context* ctx, IOBufQueue& q);
-
- private:
- IOBufQueue q_;
};
}}
using namespace folly::io;
class FrameTester
- : public BytesToBytesHandler {
+ : public InboundHandler<std::unique_ptr<IOBuf>> {
public:
- FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
+ explicit FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
: test_(test) {}
- void read(Context* ctx, IOBufQueue& q) {
- test_(q.move());
+ void read(Context* ctx, std::unique_ptr<IOBuf> buf) {
+ test_(std::move(buf));
}
void readException(Context* ctx, exception_wrapper w) {
* only one request is allowed at a time.
*/
template <typename Pipeline, typename Req, typename Resp = Req>
-class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
+class SerialClientDispatcher : public InboundHandler<Req>
, public Service<Req, Resp> {
public:
- typedef typename HandlerAdapter<Req, Resp>::Context Context;
+ typedef typename InboundHandler<Req>::Context Context;
void setPipeline(Pipeline* pipeline) {
pipeline_ = pipeline;