2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Conv.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/AsyncUDPServerSocket.h>
24 #include <folly/io/async/AsyncUDPSocket.h>
25 #include <folly/io/async/EventBase.h>
26 #include <folly/portability/GMock.h>
27 #include <folly/portability/GTest.h>
29 using folly::AsyncUDPSocket;
30 using folly::AsyncUDPServerSocket;
31 using folly::AsyncTimeout;
32 using folly::EventBase;
33 using folly::SocketAddress;
35 using namespace testing;
// UDPAcceptor: packet callback that can be registered with an
// AsyncUDPServerSocket (it derives from AsyncUDPServerSocket::Callback).
// NOTE(review): several lines of this class, including its declaration line,
// are not visible in this view; the comments below describe only the visible
// code.
38 : public AsyncUDPServerSocket::Callback {
// Stores the event base pointer and the integer n; n is printed as the
// "Worker" number in the log statement of onDataAvailable().
40 UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
// Listen start/stop notifications have empty bodies (no-ops).
43 void onListenStarted() noexcept override {}
45 void onListenStopped() noexcept override {}
// Handles one received datagram: keeps the payload as a std::string in
// lastMsg_ and logs the worker number, byte count, truncation flag and sender
// address at VLOG(4). The socket argument is unused.
47 void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> /* socket */,
48 const folly::SocketAddress& client,
49 std::unique_ptr<folly::IOBuf> data,
50 bool truncated) noexcept override {
53 lastMsg_ = data->moveToFbString().toStdString();
// NOTE(review): len is computed after moveToFbString() was called on the same
// buffer. If that call leaves the IOBuf empty, the logged byte count is always
// 0 -- confirm, and if so compute the length before moving the data out.
55 auto len = data->computeChainDataLength();
56 VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
57 << "(trun:" << truncated << ") from " << client.describe()
// Sends lastMsg_ to lastClient_ from a newly constructed AsyncUDPSocket on
// evb_, bound to 127.0.0.1 with port 0. Any std::exception raised while doing
// so is caught and only logged at VLOG(4); it does not propagate.
63 void sendPong() noexcept {
65 AsyncUDPSocket socket(evb_);
66 socket.bind(folly::SocketAddress("127.0.0.1", 0));
67 socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
68 } catch (const std::exception& ex) {
69 VLOG(4) << "Failed to send PONG " << ex.what();
// Event base passed to the constructor; used to create the reply socket in
// sendPong().
74 EventBase* const evb_{nullptr};
// Destination address used by sendPong(). Where it is assigned is not visible
// in this view.
77 folly::SocketAddress lastClient_;
// UDPServer: test helper that owns an AsyncUDPServerSocket plus a set of
// worker EventBases, threads and UDPAcceptor callbacks.
// NOTE(review): the class declaration line, some method headers and some loop
// bodies are not visible in this view; comments describe only the visible
// code.
//
// Constructor: stores the driving event base and the address, and creates n
// EventBase objects in evbs_ (one per worker).
83 UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
84 : evb_(evb), addr_(addr), evbs_(n) {
// The code below must run on evb_'s own thread; the CHECK aborts otherwise.
// (Presumably this is the body of start(), which the test invokes through
// runInEventBaseThreadAndWait -- the method header is not visible here.)
88 CHECK(evb_->isInEventBaseThread());
// NOTE(review): 1500 is the second constructor argument of
// AsyncUDPServerSocket; presumably a maximum read/packet size -- confirm
// against the AsyncUDPServerSocket header.
90 socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
94 VLOG(4) << "Server listening on " << socket_->address().describe();
// Any std::exception from the (partly hidden) setup above is fatal.
95 } catch (const std::exception& ex) {
96 LOG(FATAL) << ex.what();
// Reserve capacity for one acceptor and one thread per worker event base.
// Pointers to acceptors_ elements are handed to addListener() below, and
// reserving up front means the emplace_back calls in the loop do not
// reallocate the vector and invalidate those pointers.
99 acceptors_.reserve(evbs_.size());
100 threads_.reserve(evbs_.size());
102 // Add numWorkers threads (one per entry in evbs_)
104 for (auto& evb: evbs_) {
// i is the worker index; its declaration and increment are not visible here.
105 acceptors_.emplace_back(&evb, i);
// The thread lambda captures by reference; its body is not visible here.
107 std::thread t([&] () {
// Block until the worker's event loop is running before registering it as a
// listener on the server socket.
111 evb.waitUntilRunning();
113 socket_->addListener(&evb, &acceptors_[i]);
114 threads_.emplace_back(std::move(t));
// Returns the address reported by the server socket. socket_ is only assigned
// in the start-up code above, so this must not be called before that has run.
121 folly::SocketAddress address() const {
122 return socket_->address();
// Teardown code (method header not visible): must also run on evb_'s thread.
126 CHECK(evb_->isInEventBaseThread());
// Ask every worker event loop to terminate.
130 for (auto& evb: evbs_) {
131 evb.terminateLoopSoon();
// Iterates over the worker threads; the loop body is not visible here
// (presumably joins each thread -- confirm).
134 for (auto& t: threads_) {
// Event base that drives the server socket itself.
140 EventBase* const evb_{nullptr};
// Address passed to the constructor.
141 const folly::SocketAddress addr_;
// Server socket; created in the start-up code above.
143 std::unique_ptr<AsyncUDPServerSocket> socket_;
// Worker threads, worker event bases and per-worker callbacks. The loop above
// adds one thread and one acceptor per entry of evbs_.
144 std::vector<std::thread> threads_;
145 std::vector<folly::EventBase> evbs_;
146 std::vector<UDPAcceptor> acceptors_;
// UDPClient: test helper that acts both as the read callback of its own
// AsyncUDPSocket and as an AsyncTimeout (both bases are private).
// NOTE(review): the class declaration line, the constructor body, several
// method headers and some members are not visible in this view; comments
// describe only the visible code.
150 : private AsyncUDPSocket::ReadCallback,
151 private AsyncTimeout {
// Single-argument constructor taking the event base; the initializer list and
// body are not visible here.
153 explicit UDPClient(EventBase* evb)
// Starts the client against the given server address. Must run on evb_'s
// thread (CHECK). Creates the socket, binds it to 127.0.0.1 with port 0 and
// registers this object as the read callback. A std::exception during the
// visible setup is fatal (LOG(FATAL)). How n is used is not visible here.
158 void start(const folly::SocketAddress& server, int n) {
159 CHECK(evb_->isInEventBaseThread());
162 socket_ = std::make_unique<AsyncUDPSocket>(evb_);
165 socket_->bind(folly::SocketAddress("127.0.0.1", 0));
166 VLOG(4) << "Client bound to " << socket_->address().describe();
167 } catch (const std::exception& ex) {
168 LOG(FATAL) << ex.what();
171 socket_->resumeRead(this);
175 // Start playing ping pong
// Teardown code (method header not visible): must run on evb_'s thread; stops
// reading from the socket and asks the event loop to terminate.
180 CHECK(evb_->isInEventBaseThread());
181 socket_->pauseRead();
184 evb_->terminateLoopSoon();
// Payload of an outgoing message: the text "PING " followed by the current
// value of n_. The call this argument belongs to is not visible here.
197 folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
// ReadCallback hook that supplies the receive buffer through the two
// out-parameters; the body is not visible here.
200 void getReadBuffer(void** buf, size_t* len) noexcept override {
// ReadCallback hook invoked for a received datagram. The visible part logs the
// byte count, truncation flag, sender address and the first len bytes of buf_
// as text, followed by the current n_ value.
205 void onDataAvailable(const folly::SocketAddress& client,
207 bool truncated) noexcept override {
208 VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
209 << client.describe() << " - " << std::string(buf_, len);
210 VLOG(4) << n_ << " left";
// A read error is only logged at VLOG(4); reading is then re-armed so the
// client keeps waiting for replies.
217 void onReadError(const folly::AsyncSocketException& ex) noexcept override {
218 VLOG(4) << ex.what();
220 // Start listening for next PONG
221 socket_->resumeRead(this);
// This callback is treated as a fatal error: CHECK(false) aborts the process
// with the message below if it is ever invoked.
224 void onReadClosed() noexcept override {
225 CHECK(false) << "We unregister reads before closing";
// AsyncTimeout hook; the visible part only logs. Any follow-up action is not
// visible here.
228 void timeoutExpired() noexcept override {
229 VLOG(4) << "Timeout expired";
// Accessor read by the test to check that at least one reply arrived; the
// body is not visible here.
233 int pongRecvd() const {
// Event base the client's socket is created on.
238 EventBase* const evb_{nullptr};
// Server address; where it is assigned is not visible here (presumably from
// the `server` argument of start() -- confirm).
240 folly::SocketAddress server_;
// Client socket; created in start().
241 std::unique_ptr<AsyncUDPSocket> socket_;
// End-to-end check: starts a UDPServer with 4 worker event bases and a
// UDPClient on separate event-loop threads, runs the client with n = 1000 and
// requires that at least one reply was received.
// NOTE(review): the thread lambda bodies and the thread joins are not visible
// in this view.
249 TEST(AsyncSocketTest, PingPong) {
250 folly::EventBase sevb;
251 UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
253 // Start event loop in a separate thread
254 auto serverThread = std::thread([&sevb] () {
258 // Wait for event loop to start
259 sevb.waitUntilRunning();
// Start the server on its own event-base thread and block until start() has
// finished, so server.address() can be read below.
262 sevb.runInEventBaseThreadAndWait([&]() { server.start(); });
264 folly::EventBase cevb;
265 UDPClient client(&cevb);
267 // Start event loop in a separate thread
268 auto clientThread = std::thread([&cevb] () {
272 // Wait for event loop to start
273 cevb.waitUntilRunning();
// Schedule the client start on its event-base thread without waiting for it.
276 cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
278 // Wait for client to finish
281 // Check that some PING/PONGS were exchanged. Out of 1000 transactions
282 // at least 1 should succeed
// NOTE(review): CHECK_GT is a glog fatal check, not a gtest assertion; on
// failure it aborts the process instead of recording a test failure. Consider
// whether ASSERT_GT/EXPECT_GT was intended.
283 CHECK_GT(client.pongRecvd(), 0);
// Server teardown is scheduled on the server's event-base thread; the visible
// part asks that loop to terminate.
286 sevb.runInEventBaseThread([&] () {
288 sevb.terminateLoopSoon();
291 // Wait for server thread to join
295 class TestAsyncUDPSocket : public AsyncUDPSocket {
297 explicit TestAsyncUDPSocket(EventBase* evb) : AsyncUDPSocket(evb) {}
299 MOCK_METHOD3(sendmsg, ssize_t(int, const struct msghdr*, int));