Merge commit '64f2f2734ad853784bdd260bcf31e065c47c0741' into fix-configure-pthread...
[folly.git] / folly / io / async / AsyncUDPSocket.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20
21 #include <errno.h>
22 #include <unistd.h>
23 #include <fcntl.h>
24
25 namespace folly {
26
27 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
28     : EventHandler(CHECK_NOTNULL(evb)),
29       eventBase_(evb),
30       fd_(-1),
31       readCallback_(nullptr) {
32   DCHECK(evb->isInEventBaseThread());
33 }
34
35 AsyncUDPSocket::~AsyncUDPSocket() {
36   if (fd_ != -1) {
37     close();
38   }
39 }
40
41 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
42   int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
43   if (socket == -1) {
44     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
45                               "error creating async udp socket",
46                               errno);
47   }
48
49   auto g = folly::makeGuard([&] { ::close(socket); });
50
51   // put the socket in non-blocking mode
52   int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
53   if (ret != 0) {
54     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
55                               "failed to put socket in non-blocking mode",
56                               errno);
57   }
58
59   // put the socket in reuse mode
60   int value = 1;
61   if (setsockopt(socket,
62                  SOL_SOCKET,
63                  SO_REUSEADDR,
64                  &value,
65                  sizeof(value)) != 0) {
66     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
67                               "failed to put socket in reuse mode",
68                               errno);
69   }
70
71   // bind to the address
72   sockaddr_storage addrStorage;
73   address.getAddress(&addrStorage);
74   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
75   if (::bind(socket, saddr, address.getActualSize()) != 0) {
76     throw AsyncSocketException(
77         AsyncSocketException::NOT_OPEN,
78         "failed to bind the async udp socket for:" + address.describe(),
79         errno);
80   }
81
82   // success
83   g.dismiss();
84   fd_ = socket;
85   ownership_ = FDOwnership::OWNS;
86
87   // attach to EventHandler
88   EventHandler::changeHandlerFD(fd_);
89
90   if (address.getPort() != 0) {
91     localAddress_ = address;
92   } else {
93     localAddress_.setFromLocalAddress(fd_);
94   }
95 }
96
97 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
98   CHECK_EQ(-1, fd_) << "Already bound to another FD";
99
100   fd_ = fd;
101   ownership_ = ownership;
102
103   EventHandler::changeHandlerFD(fd_);
104   localAddress_.setFromLocalAddress(fd_);
105 }
106
107 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
108                                const std::unique_ptr<folly::IOBuf>& buf) {
109   CHECK_NE(-1, fd_) << "Socket not yet bound";
110
111   // XXX: Use `sendmsg` instead of coalescing here
112   buf->coalesce();
113
114   sockaddr_storage addrStorage;
115   address.getAddress(&addrStorage);
116   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
117
118   return ::sendto(fd_,
119                   buf->data(),
120                   buf->length(),
121                   MSG_DONTWAIT,
122                   saddr,
123                   address.getActualSize());
124 }
125
126 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
127   CHECK(!readCallback_) << "Another read callback already installed";
128   CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
129
130   readCallback_ = CHECK_NOTNULL(cob);
131   if (!updateRegistration()) {
132     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
133                            "failed to register for accept events");
134
135     readCallback_ = nullptr;
136     cob->onReadError(ex);
137     return;
138   }
139 }
140
141 void AsyncUDPSocket::pauseRead() {
142   // It is ok to pause an already paused socket
143   readCallback_ = nullptr;
144   updateRegistration();
145 }
146
147 void AsyncUDPSocket::close() {
148   DCHECK(eventBase_->isInEventBaseThread());
149
150   if (readCallback_) {
151     auto cob = readCallback_;
152     readCallback_ = nullptr;
153
154     cob->onReadClosed();
155   }
156
157   // Unregister any events we are registered for
158   unregisterHandler();
159
160   if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
161     ::close(fd_);
162   }
163
164   fd_ = -1;
165 }
166
167 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
168   if (events & EventHandler::READ) {
169     DCHECK(readCallback_);
170     handleRead();
171   }
172 }
173
174 void AsyncUDPSocket::handleRead() noexcept {
175   void* buf{nullptr};
176   size_t len{0};
177
178   readCallback_->getReadBuffer(&buf, &len);
179   if (buf == nullptr || len == 0) {
180     AsyncSocketException ex(
181         AsyncSocketException::BAD_ARGS,
182         "AsyncUDPSocket::getReadBuffer() returned empty buffer");
183
184
185     auto cob = readCallback_;
186     readCallback_ = nullptr;
187
188     cob->onReadError(ex);
189     updateRegistration();
190     return;
191   }
192
193   struct sockaddr_storage addrStorage;
194   socklen_t addrLen = sizeof(addrStorage);
195   memset(&addrStorage, 0, addrLen);
196   struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
197   rawAddr->sa_family = localAddress_.getFamily();
198
199   ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
200   if (bytesRead >= 0) {
201     clientAddress_.setFromSockaddr(rawAddr, addrLen);
202
203     if (bytesRead > 0) {
204       bool truncated = false;
205       if ((size_t)bytesRead > len) {
206         truncated = true;
207         bytesRead = len;
208       }
209
210       readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
211     }
212   } else {
213     if (errno == EAGAIN || errno == EWOULDBLOCK) {
214       // No data could be read without blocking the socket
215       return;
216     }
217
218     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
219                            "::recvfrom() failed",
220                            errno);
221
222     // In case of UDP we can continue reading from the socket
223     // even if the current request fails. We notify the user
224     // so that he can do some logging/stats collection if he wants.
225     auto cob = readCallback_;
226     readCallback_ = nullptr;
227
228     cob->onReadError(ex);
229     updateRegistration();
230   }
231 }
232
233 bool AsyncUDPSocket::updateRegistration() noexcept {
234   uint16_t flags = NONE;
235
236   if (readCallback_) {
237     flags |= READ;
238   }
239
240   return registerHandler(flags | PERSIST);
241 }
242
243 } // Namespace