AsyncSocketHandler
authorJames Sedgwick <jsedgwick@fb.com>
Fri, 21 Nov 2014 20:59:49 +0000 (12:59 -0800)
committerDave Watson <davejwatson@fb.com>
Thu, 11 Dec 2014 15:58:38 +0000 (07:58 -0800)
Summary:
mostly copypasta from TAsyncTransportHandler

Test Plan: compiles

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, njormrod, folly-diffs@

FB internal diff: D1690973

Signature: t1:1690973:1416529528:4feb187a68ad5405662b9b0efb160edd253a2977

folly/experimental/wangle/channel/AsyncSocketHandler.h [new file with mode: 0644]
folly/experimental/wangle/channel/ChannelTest.cpp
folly/io/async/AsyncSocket.h

diff --git a/folly/experimental/wangle/channel/AsyncSocketHandler.h b/folly/experimental/wangle/channel/AsyncSocketHandler.h
new file mode 100644 (file)
index 0000000..91277b6
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/experimental/wangle/channel/ChannelHandler.h>
+#include <folly/io/async/AsyncSocket.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventBaseManager.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/IOBufQueue.h>
+
+namespace folly { namespace wangle {
+
+class AsyncSocketHandler
+  : public folly::wangle::BytesToBytesHandler,
+    public AsyncSocket::ReadCallback {
+ public:
+  explicit AsyncSocketHandler(
+      std::shared_ptr<AsyncSocket> socket)
+    : socket_(std::move(socket)) {}
+
+  AsyncSocketHandler(AsyncSocketHandler&&) = default;
+
+  ~AsyncSocketHandler() {
+    if (socket_) {
+      detachReadCallback();
+    }
+  }
+
+  void attachReadCallback() {
+    socket_->setReadCB(socket_->good() ? this : nullptr);
+  }
+
+  void detachReadCallback() {
+    if (socket_->getReadCallback() == this) {
+      socket_->setReadCB(nullptr);
+    }
+  }
+
+  void attachEventBase(folly::EventBase* eventBase) {
+    if (eventBase && !socket_->getEventBase()) {
+      socket_->attachEventBase(eventBase);
+    }
+  }
+
+  void detachEventBase() {
+    detachReadCallback();
+    if (socket_->getEventBase()) {
+      socket_->detachEventBase();
+    }
+  }
+
+  void attachPipeline(Context* ctx) override {
+    CHECK(!ctx_);
+    ctx_ = ctx;
+  }
+
+  folly::wangle::Future<void> write(
+      Context* ctx,
+      std::unique_ptr<folly::IOBuf> buf) override {
+    if (UNLIKELY(!buf)) {
+      return folly::wangle::makeFuture();
+    }
+
+    if (!socket_->good()) {
+      VLOG(5) << "socket is closed in write()";
+      return folly::wangle::makeFuture<void>(AsyncSocketException(
+          AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
+          "socket is closed in write()"));
+    }
+
+    auto cb = new WriteCallback();
+    auto future = cb->promise_.getFuture();
+    socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
+    return future;
+  };
+
+  folly::wangle::Future<void> close(Context* ctx) {
+    if (socket_) {
+      detachReadCallback();
+      socket_->closeNow();
+    }
+    return folly::wangle::makeFuture();
+  }
+
+  // Must override to avoid warnings about hidden overloaded virtual due to
+  // AsyncSocket::ReadCallback::readEOF()
+  void readEOF(Context* ctx) override {
+    ctx->fireReadEOF();
+  }
+
+  void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
+    const auto readBufferSettings = ctx_->getReadBufferSettings();
+    const auto ret = bufQueue_.preallocate(
+        readBufferSettings.first,
+        readBufferSettings.second);
+    *bufReturn = ret.first;
+    *lenReturn = ret.second;
+  }
+
+  void readDataAvailable(size_t len) noexcept override {
+    bufQueue_.postallocate(len);
+    ctx_->fireRead(bufQueue_);
+  }
+
+  void readEOF() noexcept override {
+    ctx_->fireReadEOF();
+  }
+
+  void readErr(const AsyncSocketException& ex)
+    noexcept override {
+    ctx_->fireReadException(ex);
+  }
+
+ private:
+  class WriteCallback : private AsyncSocket::WriteCallback {
+    void writeSuccess() noexcept override {
+      promise_.setValue();
+      delete this;
+    }
+
+    void writeErr(size_t bytesWritten,
+                    const AsyncSocketException& ex)
+      noexcept override {
+      promise_.setException(ex);
+      delete this;
+    }
+
+   private:
+    friend class AsyncSocketHandler;
+    folly::wangle::Promise<void> promise_;
+  };
+
+  Context* ctx_{nullptr};
+  folly::IOBufQueue bufQueue_;
+  std::shared_ptr<AsyncSocket> socket_{nullptr};
+};
+
+}}
index 6b7ec89744ad63a6eb1a72941ed6838ebc520cf4..2f155226f5db2a7501893338b4db786052ade7b2 100644 (file)
@@ -16,6 +16,8 @@
 
 #include <folly/experimental/wangle/channel/ChannelHandler.h>
 #include <folly/experimental/wangle/channel/ChannelPipeline.h>
+#include <folly/experimental/wangle/channel/AsyncSocketHandler.h>
+#include <folly/experimental/wangle/channel/OutputBufferingHandler.h>
 #include <folly/io/IOBufQueue.h>
 #include <folly/Memory.h>
 #include <folly/Conv.h>
@@ -103,3 +105,13 @@ TEST(ChannelTest, PlzCompile2) {
     .finalize();
   pipeline.read(42);
 }
+
+TEST(ChannelTest, HandlersCompile) {
+  EventBase eb;
+  auto socket = AsyncSocket::newSocket(&eb);
+  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  pipeline
+    .addBack(AsyncSocketHandler(socket))
+    .addBack(OutputBufferingHandler())
+    .finalize();
+}
index 33924b6d807ff27cad9105c8ffb67b0367f19b6a..f8eb8e5572fa3f1cebd3d79ab8d62dae9bf480b3 100644 (file)
@@ -106,7 +106,7 @@ class AsyncSocket : virtual public AsyncTransport {
      *
      * If getReadBuffer() throws an exception, returns a nullptr buffer, or
      * returns a 0 length, the ReadCallback will be uninstalled and its
-     * readError() method will be invoked.
+     * readErr() method will be invoked.
      *
      * getReadBuffer() is not allowed to change the transport state before it
      * returns.  (For example, it should never uninstall the read callback, or
@@ -144,11 +144,11 @@ class AsyncSocket : virtual public AsyncTransport {
     virtual void readEOF() noexcept = 0;
 
     /**
-     * readError() will be invoked if an error occurs reading from the
+     * readErr() will be invoked if an error occurs reading from the
      * transport.
      *
      * The read callback will be automatically uninstalled immediately before
-     * readError() is invoked.
+     * readErr() is invoked.
      *
      * @param ex        An exception describing the error that occurred.
      */
@@ -174,7 +174,7 @@ class AsyncSocket : virtual public AsyncTransport {
     virtual void writeSuccess() noexcept = 0;
 
     /**
-     * writeError() will be invoked if an error occurs writing the data.
+     * writeErr() will be invoked if an error occurs writing the data.
      *
      * @param bytesWritten      The number of bytes that were successfull
      * @param ex                An exception describing the error that occurred.