telnet server
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 4 May 2015 16:58:49 +0000 (09:58 -0700)
committerPraveen Kumar Ramakrishnan <praveenr@fb.com>
Tue, 12 May 2015 00:02:11 +0000 (17:02 -0700)
Summary: similar to https://github.com/netty/netty/tree/master/example/src/main/java/io/netty/example/telnet

Test Plan:
fbconfig folly/wangle/example/telnet; fbmake dbg
_bin/folly/wangle/example/telnet_server --port=8080
telnet localhost 8080

Still a couple sharp edges:
* No easy way to wait for ServerBootstrap termination.
* Pipelines always have to call attachReadCallback
* a bunch of missing methods in pipeline still, like channelActive

Reviewed By: hans@fb.com

Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D1959172

Signature: t1:1959172:1427993978:463f237036996451187e3ef3983cf2b4e89685ef

folly/wangle/bootstrap/BootstrapTest.cpp
folly/wangle/bootstrap/ClientBootstrap.h
folly/wangle/bootstrap/ServerBootstrap-inl.h
folly/wangle/bootstrap/ServerBootstrap.h
folly/wangle/channel/AsyncSocketHandler.h
folly/wangle/channel/HandlerContext-inl.h
folly/wangle/channel/HandlerContext.h
folly/wangle/channel/Pipeline.h
folly/wangle/codec/StringCodec.h
folly/wangle/service/ServiceTest.cpp

index 3e8908b87488f458ae6e80fbe09c209fc5b75635..0696f71fd4fa8a2a1618c470afb9ef80dfdd847f 100644 (file)
@@ -32,7 +32,8 @@ typedef ClientBootstrap<BytesPipeline> TestClient;
 
 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
  public:
-  BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+  std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
+  newPipeline(std::shared_ptr<AsyncSocket> sock) {
     CHECK(sock->good());
 
     // We probably aren't connected immedately, check after a small delay
@@ -45,9 +46,12 @@ class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
 
 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
  public:
-  BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+  std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor> newPipeline(
+    std::shared_ptr<AsyncSocket> sock) {
+
     pipelines++;
-    return new BytesPipeline();
+    return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
+      new BytesPipeline());
   }
   std::atomic<int> pipelines{0};
 };
@@ -279,8 +283,13 @@ template <typename HandlerPipeline>
 class TestHandlerPipelineFactory
     : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
  public:
-  ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
-    auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
+  std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
+                  folly::DelayedDestruction::Destructor>
+  newPipeline(std::shared_ptr<AsyncSocket>) {
+
+    std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
+                    folly::DelayedDestruction::Destructor> pipeline(
+                      new ServerBootstrap<BytesPipeline>::AcceptPipeline);
     pipeline->addBack(HandlerPipeline());
     return pipeline;
   }
index 2b3f3be2798dab34abe97941fe93cc9abc51fad5..94e6789f2f0cd6a3b602df75729a26c52b82c6d0 100644 (file)
@@ -34,10 +34,10 @@ class ClientBootstrap {
   }
   ClientBootstrap* connect(SocketAddress address) {
     DCHECK(pipelineFactory_);
-    pipeline_.reset(
+    pipeline_=
       pipelineFactory_->newPipeline(
-        AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
-      ));
+        AsyncSocket::newSocket(
+          EventBaseManager::get()->getEventBase(), address));
     return this;
   }
 
index 543a655f46081fce30e9aec8bc168741ff2ad889..85342646c3c30f77af678ce570818e0aef876a33 100644 (file)
@@ -32,27 +32,35 @@ class ServerAcceptor
   typedef std::unique_ptr<Pipeline,
                           folly::DelayedDestruction::Destructor> PipelinePtr;
 
-  class ServerConnection : public wangle::ManagedConnection {
+  class ServerConnection : public wangle::ManagedConnection,
+                           public wangle::PipelineManager {
    public:
     explicit ServerConnection(PipelinePtr pipeline)
-        : pipeline_(std::move(pipeline)) {}
-
-    ~ServerConnection() {
+        : pipeline_(std::move(pipeline)) {
+      pipeline_->setPipelineManager(this);
     }
 
-    void timeoutExpired() noexcept {
+    ~ServerConnection() {}
+
+    void timeoutExpired() noexcept override {
     }
 
-    void describe(std::ostream& os) const {}
-    bool isBusy() const {
+    void describe(std::ostream& os) const override {}
+    bool isBusy() const override {
       return false;
     }
-    void notifyPendingShutdown() {}
-    void closeWhenIdle() {}
-    void dropConnection() {
+    void notifyPendingShutdown() override {}
+    void closeWhenIdle() override {}
+    void dropConnection() override {
       delete this;
     }
-    void dumpConnectionState(uint8_t loglevel) {}
+    void dumpConnectionState(uint8_t loglevel) override {}
+
+    void deletePipeline(wangle::PipelineBase* p) override {
+      CHECK(p == pipeline_.get());
+      delete this;
+    }
+
    private:
     PipelinePtr pipeline_;
   };
@@ -178,8 +186,11 @@ class DefaultAcceptPipelineFactory
   typedef wangle::Pipeline<void*> AcceptPipeline;
 
  public:
-  AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
-    return new AcceptPipeline;
+  std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
+    newPipeline(std::shared_ptr<AsyncSocket>) {
+
+    return std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
+      (new AcceptPipeline);
   }
 };
 
index 3de9c85c9f84425f3c809d9716ebaf35ad37be48..4940e67b504509b571c7e73e1c3cf5af0b76910a 100644 (file)
@@ -284,6 +284,13 @@ class ServerBootstrap {
       }
       sockets_->clear();
     }
+    if (!stopped_) {
+      stopped_ = true;
+      // stopBaton_ may be null if ServerBootstrap has been std::move'd
+      if (stopBaton_) {
+        stopBaton_->post();
+      }
+    }
   }
 
   void join() {
@@ -295,6 +302,13 @@ class ServerBootstrap {
     }
   }
 
+  void waitForStop() {
+    if (!stopped_) {
+      CHECK(stopBaton_);
+      stopBaton_->wait();
+    }
+  }
+
   /*
    * Get the list of listening sockets
    */
@@ -328,6 +342,10 @@ class ServerBootstrap {
     std::make_shared<DefaultAcceptPipelineFactory>()};
   std::shared_ptr<ServerSocketFactory> socketFactory_{
     std::make_shared<AsyncServerSocketFactory>()};
+
+  std::unique_ptr<folly::Baton<>> stopBaton_{
+    folly::make_unique<folly::Baton<>>()};
+  bool stopped_{false};
 };
 
 } // namespace
index dca22236d4fe2a7f20832c61d416254158d08516..04491bbd5c51992836ea2921eea4cc7998a114bb 100644 (file)
@@ -94,6 +94,7 @@ class AsyncSocketHandler
       detachReadCallback();
       socket_->closeNow();
     }
+    ctx->getPipeline()->deletePipeline();
     return folly::makeFuture();
   }
 
index 3d472467d6e9762842ac1e59692f14e135ab26cd..49f2517f1eeaedea94fdd9ffb54adfa1f3f744f5 100644 (file)
@@ -201,6 +201,10 @@ class ContextImpl
     }
   }
 
+  PipelineBase* getPipeline() override {
+    return this->pipeline_;
+  }
+
   std::shared_ptr<AsyncTransport> getTransport() override {
     return this->pipeline_->getTransport();
   }
@@ -306,6 +310,10 @@ class InboundContextImpl
     }
   }
 
+  PipelineBase* getPipeline() override {
+    return this->pipeline_;
+  }
+
   std::shared_ptr<AsyncTransport> getTransport() override {
     return this->pipeline_->getTransport();
   }
@@ -375,6 +383,10 @@ class OutboundContextImpl
     }
   }
 
+  PipelineBase* getPipeline() override {
+    return this->pipeline_;
+  }
+
   std::shared_ptr<AsyncTransport> getTransport() override {
     return this->pipeline_->getTransport();
   }
index f95a80f03c1571b80d03b541b1dbd30ab52dc945..2173ab351eb47af4a294b64dfd710888b97fc8ee 100644 (file)
@@ -22,6 +22,8 @@
 
 namespace folly { namespace wangle {
 
+class PipelineBase;
+
 template <class In, class Out>
 class HandlerContext {
  public:
@@ -34,6 +36,8 @@ class HandlerContext {
   virtual Future<void> fireWrite(Out msg) = 0;
   virtual Future<void> fireClose() = 0;
 
+  virtual PipelineBase* getPipeline() = 0;
+
   virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
 
   virtual void setWriteFlags(WriteFlags flags) = 0;
@@ -64,6 +68,8 @@ class InboundHandlerContext {
   virtual void fireReadEOF() = 0;
   virtual void fireReadException(exception_wrapper e) = 0;
 
+  virtual PipelineBase* getPipeline() = 0;
+
   virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
 
   // TODO Need get/set writeFlags, readBufferSettings? Probably not.
@@ -79,6 +85,8 @@ class OutboundHandlerContext {
   virtual Future<void> fireWrite(Out msg) = 0;
   virtual Future<void> fireClose() = 0;
 
+  virtual PipelineBase* getPipeline() = 0;
+
   virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
 };
 
index fa039fbe11c7c6e269b85f1f84cd404c94e5b354..14a586ffa813f76b7d3aa94cc9577ee6c09e2d96 100644 (file)
 
 namespace folly { namespace wangle {
 
+class PipelineManager {
+ public:
+  virtual ~PipelineManager() {}
+  virtual void deletePipeline(PipelineBase* pipeline) = 0;
+};
+
+class PipelineBase {
+ public:
+  void setPipelineManager(PipelineManager* manager) {
+    manager_ = manager;
+  }
+
+  void deletePipeline() {
+    if (manager_) {
+      manager_->deletePipeline(this);
+    }
+  }
+
+ private:
+  PipelineManager* manager_{nullptr};
+};
+
 struct Nothing{};
 
 /*
@@ -36,7 +58,7 @@ struct Nothing{};
  * If W is Nothing, write() and close() will be disabled.
  */
 template <class R, class W = Nothing>
-class Pipeline : public DelayedDestruction {
+class Pipeline : public PipelineBase, public DelayedDestruction {
  public:
   Pipeline();
   ~Pipeline();
@@ -137,7 +159,9 @@ class AsyncSocket;
 template <typename Pipeline>
 class PipelineFactory {
  public:
-  virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+  virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
+  newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+
   virtual ~PipelineFactory() {}
 };
 
index 226a3677098e3298ef737c4186754241cea3c17f..cbe0e843b25c722d2da0a3923107fa02b012eec4 100644 (file)
@@ -23,15 +23,14 @@ namespace folly { namespace wangle {
 /*
  * StringCodec converts a pipeline from IOBufs to std::strings.
  */
-class StringCodec : public Handler<IOBufQueue&, std::string,
+class StringCodec : public Handler<std::unique_ptr<IOBuf>, std::string,
                                    std::string, std::unique_ptr<IOBuf>> {
  public:
   typedef typename Handler<
-   IOBufQueue&, std::string,
+   std::unique_ptr<IOBuf>, std::string,
    std::string, std::unique_ptr<IOBuf>>::Context Context;
 
-  void read(Context* ctx, IOBufQueue& q) override {
-    auto buf = q.pop_front();
+  void read(Context* ctx, std::unique_ptr<IOBuf> buf) override {
     buf->coalesce();
     std::string data((const char*)buf->data(), buf->length());
 
index be01804995772f316c9aa4ca133f7bf9e8f75a31..9d58e09a4a66e6cc437072d4ffdba031673770ba 100644 (file)
@@ -16,6 +16,7 @@
 #include <gtest/gtest.h>
 
 #include <folly/wangle/codec/StringCodec.h>
+#include <folly/wangle/codec/ByteToMessageCodec.h>
 #include <folly/wangle/service/ClientDispatcher.h>
 #include <folly/wangle/service/ServerDispatcher.h>
 #include <folly/wangle/service/Service.h>
@@ -26,6 +27,14 @@ using namespace wangle;
 
 typedef Pipeline<IOBufQueue&, std::string> ServicePipeline;
 
+class SimpleDecode : public ByteToMessageCodec {
+ public:
+  virtual std::unique_ptr<IOBuf> decode(
+    Context* ctx, IOBufQueue& buf, size_t&) {
+    return buf.move();
+  }
+};
+
 class EchoService : public Service<std::string, std::string> {
  public:
   virtual Future<std::string> operator()(std::string req) override {
@@ -45,10 +54,12 @@ class ServerPipelineFactory
     : public PipelineFactory<ServicePipeline> {
  public:
 
-  ServicePipeline* newPipeline(
-      std::shared_ptr<AsyncSocket> socket) override {
-    auto pipeline = new ServicePipeline();
+  std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor>
+  newPipeline(std::shared_ptr<AsyncSocket> socket) override {
+    std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor> pipeline(
+      new ServicePipeline());
     pipeline->addBack(AsyncSocketHandler(socket));
+    pipeline->addBack(SimpleDecode());
     pipeline->addBack(StringCodec());
     pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
     pipeline->finalize();
@@ -63,11 +74,14 @@ template <typename Req, typename Resp>
 class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
  public:
 
-  ServicePipeline* newPipeline(
-      std::shared_ptr<AsyncSocket> socket) override {
-    auto pipeline = new ServicePipeline();
+  std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor>
+  newPipeline(std::shared_ptr<AsyncSocket> socket) override {
+    std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor> pipeline(
+      new ServicePipeline());
     pipeline->addBack(AsyncSocketHandler(socket));
+    pipeline->addBack(SimpleDecode());
     pipeline->addBack(StringCodec());
+    pipeline->finalize();
     return pipeline;
    }
 };