2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/Handler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
26 namespace folly { namespace wangle {
// Pipeline handler backed by an AsyncSocket. It derives from
// BytesToBytesHandler (pipeline side) and from AsyncSocket::ReadCallback
// (socket side), so the same object receives the socket's read notifications
// and forwards them into the pipeline through its Context.
28 class AsyncSocketHandler
29 : public folly::wangle::BytesToBytesHandler,
30 public AsyncSocket::ReadCallback {
// Constructs a handler around an existing socket. The shared_ptr is taken by
// value and moved into socket_, so the handler shares ownership of the socket.
32 explicit AsyncSocketHandler(
33 std::shared_ptr<AsyncSocket> socket)
34 : socket_(std::move(socket)) {}
// Defaulted move constructor: the data members are moved member-wise.
// NOTE(review): attachReadCallback() registers `this` with the socket, and a
// defaulted move does not re-register the new object. Confirm handlers are
// only moved before the read callback is attached.
36 AsyncSocketHandler(AsyncSocketHandler&&) = default;
// Destructor.
// NOTE(review): presumably undoes the event-base / read-callback attachment
// this handler made on the socket. TODO: confirm and document the teardown.
38 ~AsyncSocketHandler() {
// Registers this handler as the socket's read callback when the socket
// reports good(); otherwise sets the read callback to nullptr.
44 void attachReadCallback() {
45 socket_->setReadCB(socket_->good() ? this : nullptr);
// Clears the socket's read callback, but only if the callback currently
// registered on the socket is this handler; another callback is left alone.
48 void detachReadCallback() {
49 if (socket_->getReadCallback() == this) {
50 socket_->setReadCB(nullptr);
// Attaches the socket to `eventBase`.
// The attach happens only when `eventBase` is non-null and the socket does
// not already report an event base.
54 void attachEventBase(folly::EventBase* eventBase) {
55 if (eventBase && !socket_->getEventBase()) {
56 socket_->attachEventBase(eventBase);
// Detaches the socket from its event base when it currently has one.
60 void detachEventBase() {
62 if (socket_->getEventBase()) {
63 socket_->detachEventBase();
// Pipeline-attach hook (override); receives this handler's Context.
// NOTE(review): presumably stores `ctx` in ctx_, which the read callbacks
// dereference. TODO: confirm.
67 void attachPipeline(Context* ctx) override {
// Write hook (override): hands `buf` to the socket and reports the result as
// a folly::Future<void>.
73 folly::Future<void> write(
75 std::unique_ptr<folly::IOBuf> buf) override {
// Early exit with an already-completed future.
// NOTE(review): presumably taken when there is nothing to write; confirm.
77 return folly::makeFuture();
// A socket that is not good() is treated as closed: log at VLOG(5) and return
// a future holding a NOT_OPEN AsyncSocketException.
80 if (!socket_->good()) {
81 VLOG(5) << "socket is closed in write()";
82 return folly::makeFuture<void>(AsyncSocketException(
83 AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
84 "socket is closed in write()"));
// The callback is heap-allocated and given to the socket as a raw pointer;
// its promise_ backs the future obtained on the next line.
// NOTE(review): nothing here frees `cb`; presumably the callback releases
// itself once the write completes. Confirm.
87 auto cb = new WriteCallback();
88 auto future = cb->promise_.getFuture();
// Pass the buffer chain to the socket together with the context's write flags.
89 socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
// Close hook (override). Returns an already-completed future.
// NOTE(review): presumably shuts the socket down before returning; confirm
// and document the shutdown steps.
93 folly::Future<void> close(Context* ctx) override {
98 return folly::makeFuture();
// Pipeline-side EOF hook taking the handler Context. It is distinct from the
// no-argument readEOF() inherited from AsyncSocket::ReadCallback.
101 // Must override to avoid warnings about hidden overloaded virtual due to
102 // AsyncSocket::ReadCallback::readEOF()
103 void readEOF(Context* ctx) override {
// AsyncSocket::ReadCallback hook: supplies the socket with memory to read
// into.
//   bufReturn - out: start of the writable region
//   lenReturn - out: number of bytes available at *bufReturn
107 void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
// The context supplies a pair of sizing values; they are passed, in order, as
// the two arguments of IOBufQueue::preallocate.
108 const auto readBufferSettings = ctx_->getReadBufferSettings();
109 const auto ret = bufQueue_.preallocate(
110 readBufferSettings.first,
111 readBufferSettings.second);
// preallocate returns a (pointer, length) pair describing the writable space.
112 *bufReturn = ret.first;
113 *lenReturn = ret.second;
// AsyncSocket::ReadCallback hook: called after `len` bytes were written into
// the buffer handed out by getReadBuffer().
116 void readDataAvailable(size_t len) noexcept override {
// Commit the freshly written bytes into the queue, then forward the whole
// queue through the context with fireRead.
117 bufQueue_.postallocate(len);
118 ctx_->fireRead(bufQueue_);
// AsyncSocket::ReadCallback hook for end-of-stream on the socket (noexcept).
// NOTE(review): presumably forwards the EOF into the pipeline via ctx_;
// confirm.
121 void readEOF() noexcept override {
// AsyncSocket::ReadCallback hook for a read failure on the socket.
125 void readErr(const AsyncSocketException& ex)
// Wrap the exception with make_exception_wrapper and forward it through the
// context as a read exception.
127 ctx_->fireReadException(make_exception_wrapper<AsyncSocketException>(ex));
// Per-write completion callback. It inherits privately from
// AsyncSocket::WriteCallback and carries the promise whose future write()
// hands out.
131 class WriteCallback : private AsyncSocket::WriteCallback {
// Invoked when the write succeeds.
// NOTE(review): presumably fulfils promise_; confirm.
132 void writeSuccess() noexcept override {
// Invoked when the write fails; the socket exception is stored in promise_.
137 void writeErr(size_t bytesWritten,
138 const AsyncSocketException& ex)
140 promise_.setException(ex);
// AsyncSocketHandler is a friend so that write() can reach promise_ and pass
// this object to the socket despite the private base class.
145 friend class AsyncSocketHandler;
146 folly::Promise<void> promise_;
// Pipeline context used by the read callbacks; null until assigned.
149 Context* ctx_{nullptr};
// Queue that receives bytes read from the socket, constructed with the
// options returned by IOBufQueue::cacheChainLength().
150 folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
// Socket this handler reads from and writes to; held by shared_ptr.
151 std::shared_ptr<AsyncSocket> socket_{nullptr};