From adba2ec8eeee40e88b722c77d606b29144fea666 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 4 May 2015 09:58:49 -0700 Subject: [PATCH] telnet server Summary: similar to https://github.com/netty/netty/tree/master/example/src/main/java/io/netty/example/telnet Test Plan: fbconfig folly/wangle/example/telnet; fbmake dbg _bin/folly/wangle/example/telnet_server --port=8080 telnet localhost 8080 Still a couple sharp edges: * No easy way to wait for ServerBootstrap termination. * Pipelines always have to call attachReadCallback * a bunch of missing methods in pipeline still, like channelActive Reviewed By: hans@fb.com Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D1959172 Signature: t1:1959172:1427993978:463f237036996451187e3ef3983cf2b4e89685ef --- folly/wangle/bootstrap/BootstrapTest.cpp | 19 +++++++--- folly/wangle/bootstrap/ClientBootstrap.h | 6 ++-- folly/wangle/bootstrap/ServerBootstrap-inl.h | 37 +++++++++++++------- folly/wangle/bootstrap/ServerBootstrap.h | 18 ++++++++++ folly/wangle/channel/AsyncSocketHandler.h | 1 + folly/wangle/channel/HandlerContext-inl.h | 12 +++++++ folly/wangle/channel/HandlerContext.h | 8 +++++ folly/wangle/channel/Pipeline.h | 28 +++++++++++++-- folly/wangle/codec/StringCodec.h | 7 ++-- folly/wangle/service/ServiceTest.cpp | 26 ++++++++++---- 10 files changed, 129 insertions(+), 33 deletions(-) diff --git a/folly/wangle/bootstrap/BootstrapTest.cpp b/folly/wangle/bootstrap/BootstrapTest.cpp index 3e8908b8..0696f71f 100644 --- a/folly/wangle/bootstrap/BootstrapTest.cpp +++ b/folly/wangle/bootstrap/BootstrapTest.cpp @@ -32,7 +32,8 @@ typedef ClientBootstrap TestClient; class TestClientPipelineFactory : public PipelineFactory { public: - BytesPipeline* newPipeline(std::shared_ptr sock) { + std::unique_ptr + newPipeline(std::shared_ptr sock) { CHECK(sock->good()); // We probably aren't connected immedately, check after a small delay @@ -45,9 +46,12 @@ class TestClientPipelineFactory : public PipelineFactory { class TestPipelineFactory : public PipelineFactory { public: - BytesPipeline* newPipeline(std::shared_ptr sock) { + std::unique_ptr newPipeline( + std::shared_ptr sock) { + pipelines++; - return new BytesPipeline(); + return std::unique_ptr( + new BytesPipeline()); } std::atomic pipelines{0}; }; @@ -279,8 +283,13 @@ template class TestHandlerPipelineFactory : public PipelineFactory::AcceptPipeline> { public: - ServerBootstrap::AcceptPipeline* newPipeline(std::shared_ptr) { - auto pipeline = new ServerBootstrap::AcceptPipeline; + std::unique_ptr::AcceptPipeline, + folly::DelayedDestruction::Destructor> + newPipeline(std::shared_ptr) { + + std::unique_ptr::AcceptPipeline, + folly::DelayedDestruction::Destructor> pipeline( + new ServerBootstrap::AcceptPipeline); pipeline->addBack(HandlerPipeline()); return pipeline; } diff --git a/folly/wangle/bootstrap/ClientBootstrap.h b/folly/wangle/bootstrap/ClientBootstrap.h index 2b3f3be2..94e6789f 100644 --- a/folly/wangle/bootstrap/ClientBootstrap.h +++ b/folly/wangle/bootstrap/ClientBootstrap.h @@ -34,10 +34,10 @@ class ClientBootstrap { } ClientBootstrap* connect(SocketAddress address) { DCHECK(pipelineFactory_); - pipeline_.reset( + pipeline_= pipelineFactory_->newPipeline( - AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address) - )); + AsyncSocket::newSocket( + EventBaseManager::get()->getEventBase(), address)); return this; } diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h index 543a655f..85342646 100644 --- a/folly/wangle/bootstrap/ServerBootstrap-inl.h +++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h @@ -32,27 +32,35 @@ class ServerAcceptor typedef std::unique_ptr PipelinePtr; - class ServerConnection : public wangle::ManagedConnection { + class ServerConnection : public wangle::ManagedConnection, + public wangle::PipelineManager { public: explicit ServerConnection(PipelinePtr pipeline) - : pipeline_(std::move(pipeline)) {} - - ~ServerConnection() { + : pipeline_(std::move(pipeline)) { + pipeline_->setPipelineManager(this); } - void timeoutExpired() noexcept { + ~ServerConnection() {} + + void timeoutExpired() noexcept override { } - void describe(std::ostream& os) const {} - bool isBusy() const { + void describe(std::ostream& os) const override {} + bool isBusy() const override { return false; } - void notifyPendingShutdown() {} - void closeWhenIdle() {} - void dropConnection() { + void notifyPendingShutdown() override {} + void closeWhenIdle() override {} + void dropConnection() override { delete this; } - void dumpConnectionState(uint8_t loglevel) {} + void dumpConnectionState(uint8_t loglevel) override {} + + void deletePipeline(wangle::PipelineBase* p) override { + CHECK(p == pipeline_.get()); + delete this; + } + private: PipelinePtr pipeline_; }; @@ -178,8 +186,11 @@ class DefaultAcceptPipelineFactory typedef wangle::Pipeline AcceptPipeline; public: - AcceptPipeline* newPipeline(std::shared_ptr) { - return new AcceptPipeline; + std::unique_ptr + newPipeline(std::shared_ptr) { + + return std::unique_ptr + (new AcceptPipeline); } }; diff --git a/folly/wangle/bootstrap/ServerBootstrap.h b/folly/wangle/bootstrap/ServerBootstrap.h index 3de9c85c..4940e67b 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.h +++ b/folly/wangle/bootstrap/ServerBootstrap.h @@ -284,6 +284,13 @@ class ServerBootstrap { } sockets_->clear(); } + if (!stopped_) { + stopped_ = true; + // stopBaton_ may be null if ServerBootstrap has been std::move'd + if (stopBaton_) { + stopBaton_->post(); + } + } } void join() { @@ -295,6 +302,13 @@ class ServerBootstrap { } } + void waitForStop() { + if (!stopped_) { + CHECK(stopBaton_); + stopBaton_->wait(); + } + } + /* * Get the list of listening sockets */ @@ -328,6 +342,10 @@ class ServerBootstrap { std::make_shared()}; std::shared_ptr socketFactory_{ std::make_shared()}; + + std::unique_ptr> stopBaton_{ + folly::make_unique>()}; + bool stopped_{false}; }; } // namespace diff --git a/folly/wangle/channel/AsyncSocketHandler.h b/folly/wangle/channel/AsyncSocketHandler.h index dca22236..04491bbd 100644 --- a/folly/wangle/channel/AsyncSocketHandler.h +++ b/folly/wangle/channel/AsyncSocketHandler.h @@ -94,6 +94,7 @@ class AsyncSocketHandler detachReadCallback(); socket_->closeNow(); } + ctx->getPipeline()->deletePipeline(); return folly::makeFuture(); } diff --git a/folly/wangle/channel/HandlerContext-inl.h b/folly/wangle/channel/HandlerContext-inl.h index 3d472467..49f2517f 100644 --- a/folly/wangle/channel/HandlerContext-inl.h +++ b/folly/wangle/channel/HandlerContext-inl.h @@ -201,6 +201,10 @@ class ContextImpl } } + PipelineBase* getPipeline() override { + return this->pipeline_; + } + std::shared_ptr getTransport() override { return this->pipeline_->getTransport(); } @@ -306,6 +310,10 @@ class InboundContextImpl } } + PipelineBase* getPipeline() override { + return this->pipeline_; + } + std::shared_ptr getTransport() override { return this->pipeline_->getTransport(); } @@ -375,6 +383,10 @@ class OutboundContextImpl } } + PipelineBase* getPipeline() override { + return this->pipeline_; + } + std::shared_ptr getTransport() override { return this->pipeline_->getTransport(); } diff --git a/folly/wangle/channel/HandlerContext.h b/folly/wangle/channel/HandlerContext.h index f95a80f0..2173ab35 100644 --- a/folly/wangle/channel/HandlerContext.h +++ b/folly/wangle/channel/HandlerContext.h @@ -22,6 +22,8 @@ namespace folly { namespace wangle { +class PipelineBase; + template class HandlerContext { public: @@ -34,6 +36,8 @@ class HandlerContext { virtual Future fireWrite(Out msg) = 0; virtual Future fireClose() = 0; + virtual PipelineBase* getPipeline() = 0; + virtual std::shared_ptr getTransport() = 0; virtual void setWriteFlags(WriteFlags flags) = 0; @@ -64,6 +68,8 @@ class InboundHandlerContext { virtual void fireReadEOF() = 0; virtual void fireReadException(exception_wrapper e) = 0; + virtual PipelineBase* getPipeline() = 0; + virtual std::shared_ptr getTransport() = 0; // TODO Need get/set writeFlags, readBufferSettings? Probably not. @@ -79,6 +85,8 @@ class OutboundHandlerContext { virtual Future fireWrite(Out msg) = 0; virtual Future fireClose() = 0; + virtual PipelineBase* getPipeline() = 0; + virtual std::shared_ptr getTransport() = 0; }; diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index fa039fbe..14a586ff 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -25,6 +25,28 @@ namespace folly { namespace wangle { +class PipelineManager { + public: + virtual ~PipelineManager() {} + virtual void deletePipeline(PipelineBase* pipeline) = 0; +}; + +class PipelineBase { + public: + void setPipelineManager(PipelineManager* manager) { + manager_ = manager; + } + + void deletePipeline() { + if (manager_) { + manager_->deletePipeline(this); + } + } + + private: + PipelineManager* manager_{nullptr}; +}; + struct Nothing{}; /* @@ -36,7 +58,7 @@ struct Nothing{}; * If W is Nothing, write() and close() will be disabled. */ template -class Pipeline : public DelayedDestruction { +class Pipeline : public PipelineBase, public DelayedDestruction { public: Pipeline(); ~Pipeline(); @@ -137,7 +159,9 @@ class AsyncSocket; template class PipelineFactory { public: - virtual Pipeline* newPipeline(std::shared_ptr) = 0; + virtual std::unique_ptr + newPipeline(std::shared_ptr) = 0; + virtual ~PipelineFactory() {} }; diff --git a/folly/wangle/codec/StringCodec.h b/folly/wangle/codec/StringCodec.h index 226a3677..cbe0e843 100644 --- a/folly/wangle/codec/StringCodec.h +++ b/folly/wangle/codec/StringCodec.h @@ -23,15 +23,14 @@ namespace folly { namespace wangle { /* * StringCodec converts a pipeline from IOBufs to std::strings. */ -class StringCodec : public Handler, std::string, std::string, std::unique_ptr> { public: typedef typename Handler< - IOBufQueue&, std::string, + std::unique_ptr, std::string, std::string, std::unique_ptr>::Context Context; - void read(Context* ctx, IOBufQueue& q) override { - auto buf = q.pop_front(); + void read(Context* ctx, std::unique_ptr buf) override { buf->coalesce(); std::string data((const char*)buf->data(), buf->length()); diff --git a/folly/wangle/service/ServiceTest.cpp b/folly/wangle/service/ServiceTest.cpp index be018049..9d58e09a 100644 --- a/folly/wangle/service/ServiceTest.cpp +++ b/folly/wangle/service/ServiceTest.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -26,6 +27,14 @@ using namespace wangle; typedef Pipeline ServicePipeline; +class SimpleDecode : public ByteToMessageCodec { + public: + virtual std::unique_ptr decode( + Context* ctx, IOBufQueue& buf, size_t&) { + return buf.move(); + } +}; + class EchoService : public Service { public: virtual Future operator()(std::string req) override { @@ -45,10 +54,12 @@ class ServerPipelineFactory : public PipelineFactory { public: - ServicePipeline* newPipeline( - std::shared_ptr socket) override { - auto pipeline = new ServicePipeline(); + std::unique_ptr + newPipeline(std::shared_ptr socket) override { + std::unique_ptr pipeline( + new ServicePipeline()); pipeline->addBack(AsyncSocketHandler(socket)); + pipeline->addBack(SimpleDecode()); pipeline->addBack(StringCodec()); pipeline->addBack(SerialServerDispatcher(&service_)); pipeline->finalize(); @@ -63,11 +74,14 @@ template class ClientPipelineFactory : public PipelineFactory { public: - ServicePipeline* newPipeline( - std::shared_ptr socket) override { - auto pipeline = new ServicePipeline(); + std::unique_ptr + newPipeline(std::shared_ptr socket) override { + std::unique_ptr pipeline( + new ServicePipeline()); pipeline->addBack(AsyncSocketHandler(socket)); + pipeline->addBack(SimpleDecode()); pipeline->addBack(StringCodec()); + pipeline->finalize(); return pipeline; } }; -- 2.34.1