Telnet client
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 4 May 2015 19:15:16 +0000 (12:15 -0700)
committerPraveen Kumar Ramakrishnan <praveenr@fb.com>
Tue, 12 May 2015 00:02:12 +0000 (17:02 -0700)
Summary:
A client example to match telnet server.

Required a couple additions:
* future result when socket actually connects, similar to netty
* clients support IOThreadPoolExecutor groups
* a pipeline stage to make sure everything runs in the right eventbase thread.

Test Plan:
fbconfig follg/wangle/example/telnet && fbmake dbg
telnet_server
telnet_client

Reviewed By: hans@fb.com

Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2010289

Signature: t1:2010289:1430766232:65c6f946e454000f6ea9f41b49197ddbeea5ba3f

folly/Makefile.am
folly/wangle/bootstrap/BootstrapTest.cpp
folly/wangle/bootstrap/ClientBootstrap.h
folly/wangle/channel/EventBaseHandler.h [new file with mode: 0644]

index b11be4502c5166ee846593dbbe32f2ded8d5fecf..ad215492296154bea5207e0f7fde886884ed0178 100644 (file)
@@ -276,6 +276,7 @@ nobase_follyinclude_HEADERS = \
        wangle/bootstrap/ServerSocketFactory.h \
        wangle/bootstrap/ClientBootstrap.h \
        wangle/channel/AsyncSocketHandler.h \
+       wangle/channel/EventBaseHandler.h \
        wangle/channel/Handler.h \
        wangle/channel/HandlerContext.h \
        wangle/channel/HandlerContext-inl.h \
index 0696f71fd4fa8a2a1618c470afb9ef80dfdd847f..724afec93f24a5b693276372bee8284038bec557 100644 (file)
@@ -34,10 +34,9 @@ class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
  public:
   std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
   newPipeline(std::shared_ptr<AsyncSocket> sock) {
-    CHECK(sock->good());
-
     // We probably aren't connected immedately, check after a small delay
     EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
+      CHECK(sock->good());
       CHECK(sock->readable());
     }, 100);
     return nullptr;
index 94e6789f2f0cd6a3b602df75729a26c52b82c6d0..3ae2ce4f294770bd79179eec4d281f8b757584a9 100644 (file)
@@ -16,6 +16,9 @@
 #pragma once
 
 #include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
+#include <folly/io/async/AsyncSocket.h>
+#include <folly/io/async/EventBaseManager.h>
 
 namespace folly {
 
@@ -25,20 +28,60 @@ namespace folly {
  */
 template <typename Pipeline>
 class ClientBootstrap {
+
+  class ConnectCallback : public AsyncSocket::ConnectCallback {
+   public:
+    ConnectCallback(Promise<Pipeline*> promise, ClientBootstrap* bootstrap)
+        : promise_(std::move(promise))
+        , bootstrap_(bootstrap) {}
+
+    void connectSuccess() noexcept override {
+      promise_.setValue(bootstrap_->getPipeline());
+      delete this;
+    }
+
+    void connectErr(const AsyncSocketException& ex) noexcept override {
+      promise_.setException(
+        folly::make_exception_wrapper<AsyncSocketException>(ex));
+      delete this;
+    }
+   private:
+    Promise<Pipeline*> promise_;
+    ClientBootstrap* bootstrap_;
+  };
+
  public:
   ClientBootstrap() {
   }
+
+  ClientBootstrap* group(
+      std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group) {
+    group_ = group;
+    return this;
+  }
   ClientBootstrap* bind(int port) {
     port_ = port;
     return this;
   }
-  ClientBootstrap* connect(SocketAddress address) {
+  Future<Pipeline*> connect(SocketAddress address) {
     DCHECK(pipelineFactory_);
-    pipeline_=
-      pipelineFactory_->newPipeline(
-        AsyncSocket::newSocket(
-          EventBaseManager::get()->getEventBase(), address));
-    return this;
+    auto base = EventBaseManager::get()->getEventBase();
+    if (group_) {
+      base = group_->getEventBase();
+    }
+    Future<Pipeline*> retval((Pipeline*)nullptr);
+    base->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
+      auto socket = AsyncSocket::newSocket(base);
+      Promise<Pipeline*> promise;
+      retval = promise.getFuture();
+      socket->connect(
+        new ConnectCallback(std::move(promise), this), address);
+      pipeline_ = pipelineFactory_->newPipeline(socket);
+      if (pipeline_) {
+        pipeline_->attachTransport(socket);
+      }
+    });
+    return retval;
   }
 
   ClientBootstrap* pipelineFactory(
@@ -60,6 +103,7 @@ class ClientBootstrap {
   int port_;
 
   std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+  std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group_;
 };
 
 } // namespace
diff --git a/folly/wangle/channel/EventBaseHandler.h b/folly/wangle/channel/EventBaseHandler.h
new file mode 100644 (file)
index 0000000..55290de
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly { namespace wangle {
+
+class EventBaseHandler : public OutboundBytesToBytesHandler {
+ public:
+  folly::Future<void> write(
+      Context* ctx,
+      std::unique_ptr<folly::IOBuf> buf) override {
+    folly::Future<void> retval;
+    DCHECK(ctx->getTransport());
+    DCHECK(ctx->getTransport()->getEventBase());
+    ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
+        retval = ctx->fireWrite(std::move(buf));
+    });
+    return retval;
+  }
+
+  Future<void> close(Context* ctx) override {
+    DCHECK(ctx->getTransport());
+    DCHECK(ctx->getTransport()->getEventBase());
+    Future<void> retval;
+    ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
+        retval = ctx->fireClose();
+    });
+    return retval;
+  }
+};
+
+}} // namespace