ad76e08ea4584902c70ff2663c583a5e338fb166
[folly.git] / folly / wangle / channel / AsyncSocketHandler.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/Handler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25
26 namespace folly { namespace wangle {
27
28 class AsyncSocketHandler
29   : public folly::wangle::BytesToBytesHandler,
30     public AsyncSocket::ReadCallback {
31  public:
32   explicit AsyncSocketHandler(
33       std::shared_ptr<AsyncSocket> socket)
34     : socket_(std::move(socket)) {}
35
36   AsyncSocketHandler(AsyncSocketHandler&&) = default;
37
38   ~AsyncSocketHandler() {
39     if (socket_) {
40       detachReadCallback();
41     }
42   }
43
44   void attachReadCallback() {
45     socket_->setReadCB(socket_->good() ? this : nullptr);
46   }
47
48   void detachReadCallback() {
49     if (socket_->getReadCallback() == this) {
50       socket_->setReadCB(nullptr);
51     }
52   }
53
54   void attachEventBase(folly::EventBase* eventBase) {
55     if (eventBase && !socket_->getEventBase()) {
56       socket_->attachEventBase(eventBase);
57     }
58   }
59
60   void detachEventBase() {
61     detachReadCallback();
62     if (socket_->getEventBase()) {
63       socket_->detachEventBase();
64     }
65   }
66
67   void attachPipeline(Context* ctx) override {
68     CHECK(!ctx_);
69     ctx_ = ctx;
70     attachReadCallback();
71   }
72
73   folly::Future<void> write(
74       Context* ctx,
75       std::unique_ptr<folly::IOBuf> buf) override {
76     if (UNLIKELY(!buf)) {
77       return folly::makeFuture();
78     }
79
80     if (!socket_->good()) {
81       VLOG(5) << "socket is closed in write()";
82       return folly::makeFuture<void>(AsyncSocketException(
83           AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
84           "socket is closed in write()"));
85     }
86
87     auto cb = new WriteCallback();
88     auto future = cb->promise_.getFuture();
89     socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
90     return future;
91   };
92
93   folly::Future<void> close(Context* ctx) override {
94     if (socket_) {
95       detachReadCallback();
96       socket_->closeNow();
97     }
98     return folly::makeFuture();
99   }
100
101   // Must override to avoid warnings about hidden overloaded virtual due to
102   // AsyncSocket::ReadCallback::readEOF()
103   void readEOF(Context* ctx) override {
104     ctx->fireReadEOF();
105   }
106
107   void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
108     const auto readBufferSettings = ctx_->getReadBufferSettings();
109     const auto ret = bufQueue_.preallocate(
110         readBufferSettings.first,
111         readBufferSettings.second);
112     *bufReturn = ret.first;
113     *lenReturn = ret.second;
114   }
115
116   void readDataAvailable(size_t len) noexcept override {
117     bufQueue_.postallocate(len);
118     ctx_->fireRead(bufQueue_);
119   }
120
121   void readEOF() noexcept override {
122     ctx_->fireReadEOF();
123   }
124
125   void readErr(const AsyncSocketException& ex)
126     noexcept override {
127     ctx_->fireReadException(make_exception_wrapper<AsyncSocketException>(ex));
128   }
129
130  private:
131   class WriteCallback : private AsyncSocket::WriteCallback {
132     void writeSuccess() noexcept override {
133       promise_.setValue();
134       delete this;
135     }
136
137     void writeErr(size_t bytesWritten,
138                     const AsyncSocketException& ex)
139       noexcept override {
140       promise_.setException(ex);
141       delete this;
142     }
143
144    private:
145     friend class AsyncSocketHandler;
146     folly::Promise<void> promise_;
147   };
148
149   Context* ctx_{nullptr};
150   folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
151   std::shared_ptr<AsyncSocket> socket_{nullptr};
152 };
153
154 }}