2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/AsyncUDPSocket.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/Likely.h>
21 #include <folly/portability/Fcntl.h>
22 #include <folly/portability/Sockets.h>
23 #include <folly/portability/Unistd.h>
27 // Due to the way kernel headers are included, this may or may not be defined.
28 // Number pulled from 3.10 kernel headers.
30 #define SO_REUSEPORT 15
33 namespace fsp = folly::portability::sockets;
37 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
38 : EventHandler(CHECK_NOTNULL(evb)),
41 readCallback_(nullptr) {
42 DCHECK(evb->isInEventBaseThread());
45 AsyncUDPSocket::~AsyncUDPSocket() {
51 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
52 int socket = fsp::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
54 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
55 "error creating async udp socket",
59 auto g = folly::makeGuard([&] { ::close(socket); });
61 // put the socket in non-blocking mode
62 int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
64 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
65 "failed to put socket in non-blocking mode",
70 // put the socket in reuse mode
72 if (setsockopt(socket,
76 sizeof(value)) != 0) {
77 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
78 "failed to put socket in reuse mode",
84 // put the socket in port reuse mode
86 if (setsockopt(socket,
90 sizeof(value)) != 0) {
91 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
92 "failed to put socket in reuse_port mode",
98 // If we're using IPv6, make sure we don't accept V4-mapped connections
99 if (address.getFamily() == AF_INET6) {
101 if (setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) {
102 throw AsyncSocketException(
103 AsyncSocketException::NOT_OPEN,
104 "Failed to set IPV6_V6ONLY",
109 // bind to the address
110 sockaddr_storage addrStorage;
111 address.getAddress(&addrStorage);
112 sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
113 if (fsp::bind(socket, saddr, address.getActualSize()) != 0) {
114 throw AsyncSocketException(
115 AsyncSocketException::NOT_OPEN,
116 "failed to bind the async udp socket for:" + address.describe(),
123 ownership_ = FDOwnership::OWNS;
125 // attach to EventHandler
126 EventHandler::changeHandlerFD(fd_);
128 if (address.getPort() != 0) {
129 localAddress_ = address;
131 localAddress_.setFromLocalAddress(fd_);
135 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
136 CHECK_EQ(-1, fd_) << "Already bound to another FD";
139 ownership_ = ownership;
141 EventHandler::changeHandlerFD(fd_);
142 localAddress_.setFromLocalAddress(fd_);
145 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
146 const std::unique_ptr<folly::IOBuf>& buf) {
147 // UDP's typical MTU size is 1500, so high number of buffers
148 // really do not make sense. Optimze for buffer chains with
149 // buffers less than 16, which is the highest I can think of
150 // for a real use case.
152 size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
153 if (UNLIKELY(iovec_len == 0)) {
155 vec[0].iov_base = const_cast<uint8_t*>(buf->data());
156 vec[0].iov_len = buf->length();
160 return writev(address, vec, iovec_len);
163 ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
164 const struct iovec* vec, size_t iovec_len) {
165 CHECK_NE(-1, fd_) << "Socket not yet bound";
167 sockaddr_storage addrStorage;
168 address.getAddress(&addrStorage);
171 msg.msg_name = reinterpret_cast<void*>(&addrStorage);
172 msg.msg_namelen = address.getActualSize();
173 msg.msg_iov = const_cast<struct iovec*>(vec);
174 msg.msg_iovlen = iovec_len;
175 msg.msg_control = nullptr;
176 msg.msg_controllen = 0;
179 return ::sendmsg(fd_, &msg, 0);
182 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
183 CHECK(!readCallback_) << "Another read callback already installed";
184 CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
186 readCallback_ = CHECK_NOTNULL(cob);
187 if (!updateRegistration()) {
188 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
189 "failed to register for accept events");
191 readCallback_ = nullptr;
192 cob->onReadError(ex);
197 void AsyncUDPSocket::pauseRead() {
198 // It is ok to pause an already paused socket
199 readCallback_ = nullptr;
200 updateRegistration();
203 void AsyncUDPSocket::close() {
204 DCHECK(eventBase_->isInEventBaseThread());
207 auto cob = readCallback_;
208 readCallback_ = nullptr;
213 // Unregister any events we are registered for
216 if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
223 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
224 if (events & EventHandler::READ) {
225 DCHECK(readCallback_);
230 void AsyncUDPSocket::handleRead() noexcept {
234 readCallback_->getReadBuffer(&buf, &len);
235 if (buf == nullptr || len == 0) {
236 AsyncSocketException ex(
237 AsyncSocketException::BAD_ARGS,
238 "AsyncUDPSocket::getReadBuffer() returned empty buffer");
241 auto cob = readCallback_;
242 readCallback_ = nullptr;
244 cob->onReadError(ex);
245 updateRegistration();
249 struct sockaddr_storage addrStorage;
250 socklen_t addrLen = sizeof(addrStorage);
251 memset(&addrStorage, 0, size_t(addrLen));
252 struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
253 rawAddr->sa_family = localAddress_.getFamily();
255 ssize_t bytesRead = recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
256 if (bytesRead >= 0) {
257 clientAddress_.setFromSockaddr(rawAddr, addrLen);
260 bool truncated = false;
261 if ((size_t)bytesRead > len) {
263 bytesRead = ssize_t(len);
266 readCallback_->onDataAvailable(
267 clientAddress_, size_t(bytesRead), truncated);
270 if (errno == EAGAIN || errno == EWOULDBLOCK) {
271 // No data could be read without blocking the socket
275 AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
276 "::recvfrom() failed",
279 // In case of UDP we can continue reading from the socket
280 // even if the current request fails. We notify the user
281 // so that he can do some logging/stats collection if he wants.
282 auto cob = readCallback_;
283 readCallback_ = nullptr;
285 cob->onReadError(ex);
286 updateRegistration();
290 bool AsyncUDPSocket::updateRegistration() noexcept {
291 uint16_t flags = NONE;
297 return registerHandler(uint16_t(flags | PERSIST));