2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/AsyncUDPSocket.h>
18 #include <folly/io/async/AsyncUDPServerSocket.h>
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/SocketAddress.h>
23 #include <boost/thread/barrier.hpp>
25 #include <folly/io/IOBuf.h>
29 #include <gtest/gtest.h>
31 using folly::AsyncUDPSocket;
32 using folly::AsyncUDPServerSocket;
33 using folly::AsyncTimeout;
34 using folly::EventBase;
35 using folly::SocketAddress;
39 : public AsyncUDPServerSocket::Callback {
41 UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
44 void onListenStarted() noexcept override {}
46 void onListenStopped() noexcept override {}
48 void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> /* socket */,
49 const folly::SocketAddress& client,
50 std::unique_ptr<folly::IOBuf> data,
51 bool truncated) noexcept override {
54 lastMsg_ = data->moveToFbString().toStdString();
56 auto len = data->computeChainDataLength();
57 VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
58 << "(trun:" << truncated << ") from " << client.describe()
64 void sendPong() noexcept {
66 AsyncUDPSocket socket(evb_);
67 socket.bind(folly::SocketAddress("127.0.0.1", 0));
68 socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
69 } catch (const std::exception& ex) {
70 VLOG(4) << "Failed to send PONG " << ex.what();
75 EventBase* const evb_{nullptr};
78 folly::SocketAddress lastClient_;
84 UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
85 : evb_(evb), addr_(addr), evbs_(n) {
89 CHECK(evb_->isInEventBaseThread());
91 socket_ = folly::make_unique<AsyncUDPServerSocket>(
97 VLOG(4) << "Server listening on " << socket_->address().describe();
98 } catch (const std::exception& ex) {
99 LOG(FATAL) << ex.what();
102 acceptors_.reserve(evbs_.size());
103 threads_.reserve(evbs_.size());
105 // Add numWorkers thread
107 for (auto& evb: evbs_) {
108 acceptors_.emplace_back(&evb, i);
110 std::thread t([&] () {
114 auto r = std::make_shared<boost::barrier>(2);
115 evb.runInEventBaseThread([r] () {
120 socket_->addListener(&evb, &acceptors_[i]);
121 threads_.emplace_back(std::move(t));
128 folly::SocketAddress address() const {
129 return socket_->address();
133 CHECK(evb_->isInEventBaseThread());
137 for (auto& evb: evbs_) {
138 evb.terminateLoopSoon();
141 for (auto& t: threads_) {
147 EventBase* const evb_{nullptr};
148 const folly::SocketAddress addr_;
150 std::unique_ptr<AsyncUDPServerSocket> socket_;
151 std::vector<std::thread> threads_;
152 std::vector<folly::EventBase> evbs_;
153 std::vector<UDPAcceptor> acceptors_;
157 : private AsyncUDPSocket::ReadCallback,
158 private AsyncTimeout {
160 explicit UDPClient(EventBase* evb)
165 void start(const folly::SocketAddress& server, int n) {
166 CHECK(evb_->isInEventBaseThread());
169 socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
172 socket_->bind(folly::SocketAddress("127.0.0.1", 0));
173 VLOG(4) << "Client bound to " << socket_->address().describe();
174 } catch (const std::exception& ex) {
175 LOG(FATAL) << ex.what();
178 socket_->resumeRead(this);
182 // Start playing ping pong
187 CHECK(evb_->isInEventBaseThread());
188 socket_->pauseRead();
191 evb_->terminateLoopSoon();
204 folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
207 void getReadBuffer(void** buf, size_t* len) noexcept override {
212 void onDataAvailable(const folly::SocketAddress& client,
214 bool truncated) noexcept override {
215 VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
216 << client.describe() << " - " << std::string(buf_, len);
217 VLOG(4) << n_ << " left";
224 void onReadError(const folly::AsyncSocketException& ex) noexcept override {
225 VLOG(4) << ex.what();
227 // Start listening for next PONG
228 socket_->resumeRead(this);
231 void onReadClosed() noexcept override {
232 CHECK(false) << "We unregister reads before closing";
235 void timeoutExpired() noexcept override {
236 VLOG(4) << "Timeout expired";
240 int pongRecvd() const {
245 EventBase* const evb_{nullptr};
247 folly::SocketAddress server_;
248 std::unique_ptr<AsyncUDPSocket> socket_;
256 TEST(AsyncSocketTest, PingPong) {
257 folly::EventBase sevb;
258 UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
259 boost::barrier barrier(2);
261 // Start event loop in a separate thread
262 auto serverThread = std::thread([&sevb] () {
266 // Wait for event loop to start
267 sevb.runInEventBaseThread([&] () { barrier.wait(); });
271 sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); });
274 folly::EventBase cevb;
275 UDPClient client(&cevb);
277 // Start event loop in a separate thread
278 auto clientThread = std::thread([&cevb] () {
282 // Wait for event loop to start
283 cevb.runInEventBaseThread([&] () { barrier.wait(); });
287 cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
289 // Wait for client to finish
292 // Check that some PING/PONGS were exchanged. Out of 1000 transactions
293 // at least 1 should succeed
294 CHECK_GT(client.pongRecvd(), 0);
297 sevb.runInEventBaseThread([&] () {
299 sevb.terminateLoopSoon();
302 // Wait for server thread to joib