Udp Acceptor
[folly.git] / folly / wangle / bootstrap / ServerBootstrap-inl.h
index ac18f314bd946cf50ce4a88715bc8bf279b33983..a6ffd26884b94f79085b9405ee90ba1aefed2a98 100644 (file)
 #pragma once
 
 #include <folly/wangle/acceptor/Acceptor.h>
+#include <folly/wangle/bootstrap/ServerSocketFactory.h>
 #include <folly/io/async/EventBaseManager.h>
 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
 #include <folly/wangle/acceptor/ManagedConnection.h>
 #include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/ChannelHandler.h>
 
 namespace folly {
 
 template <typename Pipeline>
-class ServerAcceptor : public Acceptor {
+class ServerAcceptor
+    : public Acceptor
+    , public folly::wangle::ChannelHandlerAdapter<void*, std::exception> {
   typedef std::unique_ptr<Pipeline,
                           folly::DelayedDestruction::Destructor> PipelinePtr;
 
@@ -55,21 +59,26 @@ class ServerAcceptor : public Acceptor {
 
  public:
   explicit ServerAcceptor(
-    std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
-    EventBase* base)
+        std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
+        std::shared_ptr<folly::wangle::ChannelPipeline<
+                          void*, std::exception>> acceptorPipeline,
+        EventBase* base)
       : Acceptor(ServerSocketConfig())
-      , pipelineFactory_(pipelineFactory) {
-    Acceptor::init(nullptr, base);
+      , base_(base)
+      , childPipelineFactory_(pipelineFactory)
+      , acceptorPipeline_(acceptorPipeline) {
+    Acceptor::init(nullptr, base_);
+    CHECK(acceptorPipeline_);
+
+    acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr<ServerAcceptor, false>(this));
+    acceptorPipeline_->finalize();
   }
 
-  /* See Acceptor::onNewConnection for details */
-  void onNewConnection(
-    AsyncSocket::UniquePtr transport, const SocketAddress* address,
-    const std::string& nextProtocolName, const TransportInfo& tinfo) {
-
+  void read(Context* ctx, void* conn) {
+    AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
       std::unique_ptr<Pipeline,
                        folly::DelayedDestruction::Destructor>
-      pipeline(pipelineFactory_->newPipeline(
+      pipeline(childPipelineFactory_->newPipeline(
         std::shared_ptr<AsyncSocket>(
           transport.release(),
           folly::DelayedDestruction::Destructor())));
@@ -77,22 +86,53 @@ class ServerAcceptor : public Acceptor {
     Acceptor::addConnection(connection);
   }
 
+  folly::Future<void> write(Context* ctx, std::exception e) {
+    return ctx->fireWrite(e);
+  }
+
+  /* See Acceptor::onNewConnection for details */
+  void onNewConnection(
+    AsyncSocket::UniquePtr transport, const SocketAddress* address,
+    const std::string& nextProtocolName, const TransportInfo& tinfo) {
+    acceptorPipeline_->read(transport.release());
+  }
+
+  // UDP thunk
+  void onDataAvailable(const folly::SocketAddress& addr,
+                       std::unique_ptr<folly::IOBuf> buf,
+                       bool truncated) noexcept {
+    acceptorPipeline_->read(buf.release());
+  }
+
  private:
-  std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+  EventBase* base_;
+
+  std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
+  std::shared_ptr<folly::wangle::ChannelPipeline<
+    void*, std::exception>> acceptorPipeline_;
 };
 
 template <typename Pipeline>
 class ServerAcceptorFactory : public AcceptorFactory {
  public:
   explicit ServerAcceptorFactory(
-      std::shared_ptr<PipelineFactory<Pipeline>> factory)
-    : factory_(factory) {}
-
-  std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
-    return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
+    std::shared_ptr<PipelineFactory<Pipeline>> factory,
+    std::shared_ptr<PipelineFactory<folly::wangle::ChannelPipeline<
+    void*, std::exception>>> pipeline)
+    : factory_(factory)
+    , pipeline_(pipeline) {}
+
+  std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
+    std::shared_ptr<folly::wangle::ChannelPipeline<
+                      void*, std::exception>> pipeline(
+                        pipeline_->newPipeline(nullptr));
+    return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
   }
  private:
   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
+  std::shared_ptr<PipelineFactory<
+    folly::wangle::ChannelPipeline<
+      void*, std::exception>>> pipeline_;
 };
 
 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
@@ -100,10 +140,12 @@ class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
   explicit ServerWorkerPool(
     std::shared_ptr<AcceptorFactory> acceptorFactory,
     folly::wangle::IOThreadPoolExecutor* exec,
-    std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
+    std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets,
+    std::shared_ptr<ServerSocketFactory> socketFactory)
       : acceptorFactory_(acceptorFactory)
       , exec_(exec)
-      , sockets_(sockets) {
+      , sockets_(sockets)
+      , socketFactory_(socketFactory) {
     CHECK(exec);
   }
 
@@ -128,7 +170,8 @@ class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
            std::shared_ptr<Acceptor>> workers_;
   std::shared_ptr<AcceptorFactory> acceptorFactory_;
   folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
-  std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
+  std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets_;
+  std::shared_ptr<ServerSocketFactory> socketFactory_;
 };
 
 template <typename F>
@@ -138,4 +181,16 @@ void ServerWorkerPool::forEachWorker(F&& f) const {
   }
 }
 
+class DefaultAcceptPipelineFactory
+    : public PipelineFactory<wangle::ChannelPipeline<void*, std::exception>> {
+  typedef wangle::ChannelPipeline<
+      void*,
+      std::exception> AcceptPipeline;
+
+ public:
+  AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+    return new AcceptPipeline;
+  }
+};
+
 } // namespace