--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/experimental/wangle/channel/ChannelHandler.h>
+#include <folly/io/async/AsyncSocket.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventBaseManager.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/IOBufQueue.h>
+
+namespace folly { namespace wangle {
+
+class AsyncSocketHandler
+ : public folly::wangle::BytesToBytesHandler,
+ public AsyncSocket::ReadCallback {
+ public:
+ explicit AsyncSocketHandler(
+ std::shared_ptr<AsyncSocket> socket)
+ : socket_(std::move(socket)) {}
+
+ AsyncSocketHandler(AsyncSocketHandler&&) = default;
+
+ ~AsyncSocketHandler() {
+ if (socket_) {
+ detachReadCallback();
+ }
+ }
+
+ void attachReadCallback() {
+ socket_->setReadCB(socket_->good() ? this : nullptr);
+ }
+
+ void detachReadCallback() {
+ if (socket_->getReadCallback() == this) {
+ socket_->setReadCB(nullptr);
+ }
+ }
+
+ void attachEventBase(folly::EventBase* eventBase) {
+ if (eventBase && !socket_->getEventBase()) {
+ socket_->attachEventBase(eventBase);
+ }
+ }
+
+ void detachEventBase() {
+ detachReadCallback();
+ if (socket_->getEventBase()) {
+ socket_->detachEventBase();
+ }
+ }
+
+ void attachPipeline(Context* ctx) override {
+ CHECK(!ctx_);
+ ctx_ = ctx;
+ }
+
+ folly::wangle::Future<void> write(
+ Context* ctx,
+ std::unique_ptr<folly::IOBuf> buf) override {
+ if (UNLIKELY(!buf)) {
+ return folly::wangle::makeFuture();
+ }
+
+ if (!socket_->good()) {
+ VLOG(5) << "socket is closed in write()";
+ return folly::wangle::makeFuture<void>(AsyncSocketException(
+ AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
+ "socket is closed in write()"));
+ }
+
+ auto cb = new WriteCallback();
+ auto future = cb->promise_.getFuture();
+ socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
+ return future;
+ };
+
+ folly::wangle::Future<void> close(Context* ctx) {
+ if (socket_) {
+ detachReadCallback();
+ socket_->closeNow();
+ }
+ return folly::wangle::makeFuture();
+ }
+
+ // Must override to avoid warnings about hidden overloaded virtual due to
+ // AsyncSocket::ReadCallback::readEOF()
+ void readEOF(Context* ctx) override {
+ ctx->fireReadEOF();
+ }
+
+ void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
+ const auto readBufferSettings = ctx_->getReadBufferSettings();
+ const auto ret = bufQueue_.preallocate(
+ readBufferSettings.first,
+ readBufferSettings.second);
+ *bufReturn = ret.first;
+ *lenReturn = ret.second;
+ }
+
+ void readDataAvailable(size_t len) noexcept override {
+ bufQueue_.postallocate(len);
+ ctx_->fireRead(bufQueue_);
+ }
+
+ void readEOF() noexcept override {
+ ctx_->fireReadEOF();
+ }
+
+ void readErr(const AsyncSocketException& ex)
+ noexcept override {
+ ctx_->fireReadException(ex);
+ }
+
+ private:
+ class WriteCallback : private AsyncSocket::WriteCallback {
+ void writeSuccess() noexcept override {
+ promise_.setValue();
+ delete this;
+ }
+
+ void writeErr(size_t bytesWritten,
+ const AsyncSocketException& ex)
+ noexcept override {
+ promise_.setException(ex);
+ delete this;
+ }
+
+ private:
+ friend class AsyncSocketHandler;
+ folly::wangle::Promise<void> promise_;
+ };
+
+ Context* ctx_{nullptr};
+ folly::IOBufQueue bufQueue_;
+ std::shared_ptr<AsyncSocket> socket_{nullptr};
+};
+
+}}
*
* If getReadBuffer() throws an exception, returns a nullptr buffer, or
* returns a 0 length, the ReadCallback will be uninstalled and its
- * readError() method will be invoked.
+ * readErr() method will be invoked.
*
* getReadBuffer() is not allowed to change the transport state before it
* returns. (For example, it should never uninstall the read callback, or
virtual void readEOF() noexcept = 0;
/**
- * readError() will be invoked if an error occurs reading from the
+ * readErr() will be invoked if an error occurs reading from the
* transport.
*
* The read callback will be automatically uninstalled immediately before
- * readError() is invoked.
+ * readErr() is invoked.
*
* @param ex An exception describing the error that occurred.
*/
virtual void writeSuccess() noexcept = 0;
/**
- * writeError() will be invoked if an error occurs writing the data.
+ * writeErr() will be invoked if an error occurs writing the data.
*
* @param bytesWritten The number of bytes that were successfull
* @param ex An exception describing the error that occurred.