2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/Handler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
26 namespace folly { namespace wangle {
28 // This handler may only be used in a single Pipeline
29 class AsyncSocketHandler
30 : public folly::wangle::BytesToBytesHandler,
31 public AsyncSocket::ReadCallback {
33 explicit AsyncSocketHandler(
34 std::shared_ptr<AsyncSocket> socket)
35 : socket_(std::move(socket)) {}
37 AsyncSocketHandler(AsyncSocketHandler&&) = default;
39 ~AsyncSocketHandler() {
43 void attachReadCallback() {
44 socket_->setReadCB(socket_->good() ? this : nullptr);
47 void detachReadCallback() {
48 if (socket_ && socket_->getReadCallback() == this) {
49 socket_->setReadCB(nullptr);
51 auto ctx = getContext();
52 if (ctx && !firedInactive_) {
53 firedInactive_ = true;
54 ctx->fireTransportInactive();
58 void attachEventBase(folly::EventBase* eventBase) {
59 if (eventBase && !socket_->getEventBase()) {
60 socket_->attachEventBase(eventBase);
64 void detachEventBase() {
66 if (socket_->getEventBase()) {
67 socket_->detachEventBase();
71 void transportActive(Context* ctx) override {
72 ctx->getPipeline()->setTransport(socket_);
74 ctx->fireTransportActive();
77 void detachPipeline(Context* ctx) override {
81 folly::Future<void> write(
83 std::unique_ptr<folly::IOBuf> buf) override {
85 return folly::makeFuture();
88 if (!socket_->good()) {
89 VLOG(5) << "socket is closed in write()";
90 return folly::makeFuture<void>(AsyncSocketException(
91 AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
92 "socket is closed in write()"));
95 auto cb = new WriteCallback();
96 auto future = cb->promise_.getFuture();
97 socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
101 folly::Future<void> close(Context* ctx) override {
103 detachReadCallback();
106 ctx->getPipeline()->deletePipeline();
107 return folly::makeFuture();
110 // Must override to avoid warnings about hidden overloaded virtual due to
111 // AsyncSocket::ReadCallback::readEOF()
112 void readEOF(Context* ctx) override {
116 void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
117 const auto readBufferSettings = getContext()->getReadBufferSettings();
118 const auto ret = bufQueue_.preallocate(
119 readBufferSettings.first,
120 readBufferSettings.second);
121 *bufReturn = ret.first;
122 *lenReturn = ret.second;
125 void readDataAvailable(size_t len) noexcept override {
126 bufQueue_.postallocate(len);
127 getContext()->fireRead(bufQueue_);
130 void readEOF() noexcept override {
131 getContext()->fireReadEOF();
134 void readErr(const AsyncSocketException& ex)
136 getContext()->fireReadException(
137 make_exception_wrapper<AsyncSocketException>(ex));
141 class WriteCallback : private AsyncSocket::WriteCallback {
142 void writeSuccess() noexcept override {
147 void writeErr(size_t bytesWritten,
148 const AsyncSocketException& ex)
150 promise_.setException(ex);
155 friend class AsyncSocketHandler;
156 folly::Promise<void> promise_;
159 folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
160 std::shared_ptr<AsyncSocket> socket_{nullptr};
161 bool firedInactive_{false};