2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/MoveWrapper.h>
20 #include <folly/io/IOBufQueue.h>
21 #include <folly/Memory.h>
22 #include <folly/io/async/AsyncUDPSocket.h>
23 #include <folly/io/async/EventBase.h>
30 * It wraps a UDP socket waiting for packets and distributes them among
31 * a set of event loops in round robin fashion.
33 * NOTE: At the moment it is designed with single-packet protocols
34 * in mind. We distribute incoming packets among all the listeners in
35 * round-robin fashion, so any protocol that expects to send/recv
36 * more than one packet will not work: its packets would end up being
37 * processed on different event bases.
39 class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
40 , public AsyncSocketBase {
45 * Invoked when we start reading data from the socket. It is invoked in
46 * each acceptor's/listener's event base thread.
48 virtual void onListenStarted() noexcept = 0;
51 * Invoked when the server socket is closed. It is invoked in each
52 * acceptor's/listener's event base thread.
54 virtual void onListenStopped() noexcept = 0;
57 * Invoked when a new packet is received
59 virtual void onDataAvailable(
60 std::shared_ptr<AsyncUDPSocket> socket,
61 const folly::SocketAddress& addr,
62 std::unique_ptr<folly::IOBuf> buf,
63 bool truncated) noexcept = 0;
65 virtual ~Callback() = default;
69 * Create a new UDP server socket
71 * Note about packet size - We allocate a buffer of size packetSize_ to read.
72 * If a packet is larger than this value, as per the UDP protocol, the remaining
73 * data is dropped and you get `truncated = true` in the onDataAvailable callback.
75 explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
81 ~AsyncUDPServerSocket() {
87 void bind(const folly::SocketAddress& addy) {
90 socket_ = std::make_shared<AsyncUDPSocket>(evb_);
91 socket_->setReusePort(reusePort_);
95 void setReusePort(bool reusePort) {
96 reusePort_ = reusePort;
99 folly::SocketAddress address() const {
101 return socket_->address();
104 void getAddress(SocketAddress* a) const {
109 * Add a listener to the round robin list
111 void addListener(EventBase* evb, Callback* callback) {
112 listeners_.emplace_back(evb, callback);
116 CHECK(socket_) << "Need to bind before listening";
118 for (auto& listener: listeners_) {
119 auto callback = listener.second;
121 listener.first->runInEventBaseThread([callback] () mutable {
122 callback->onListenStarted();
126 socket_->resumeRead(this);
130 CHECK(socket_) << "Need to bind before getting FD";
131 return socket_->getFD();
135 CHECK(socket_) << "Need to bind before closing";
140 EventBase* getEventBase() const {
145 // AsyncUDPSocket::ReadCallback
146 void getReadBuffer(void** buf, size_t* len) noexcept {
147 std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
150 void onDataAvailable(const folly::SocketAddress& clientAddress,
152 bool truncated) noexcept {
153 buf_.postallocate(len);
154 auto data = buf_.split(len);
156 if (listeners_.empty()) {
157 LOG(WARNING) << "UDP server socket dropping packet, "
158 << "no listener registered";
162 if (nextListener_ >= listeners_.size()) {
166 auto client = clientAddress;
167 auto callback = listeners_[nextListener_].second;
170 std::unique_ptr<folly::IOBuf>>(std::move(data));
171 auto socket = socket_;
173 // Schedule it in the listener's eventbase
174 // XXX: Speed this up
175 std::function<void()> f = [socket, client, callback, mvp, truncated] () mutable {
176 callback->onDataAvailable(socket, client, std::move(*mvp), truncated);
179 listeners_[nextListener_].first->runInEventBaseThread(f);
183 void onReadError(const AsyncSocketException& ex) noexcept {
184 LOG(ERROR) << ex.what();
186 // Let's register to continue listening for packets
187 socket_->resumeRead(this);
190 void onReadClosed() noexcept {
191 for (auto& listener: listeners_) {
192 auto callback = listener.second;
194 listener.first->runInEventBaseThread([callback] () mutable {
195 callback->onListenStopped();
200 EventBase* const evb_;
201 const size_t packetSize_;
203 std::shared_ptr<AsyncUDPSocket> socket_;
205 // List of listeners to distribute packets among
206 typedef std::pair<EventBase*, Callback*> Listener;
207 std::vector<Listener> listeners_;
209 // Next listener to send packet to
210 uint32_t nextListener_;
212 // Temporary buffer for data
213 folly::IOBufQueue buf_;
215 bool reusePort_{false};