From 430a2ed17f17aef40438c38c820747989f4e9bbc Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Tue, 25 Nov 2014 08:13:37 -0800 Subject: [PATCH] AsyncUDPSocket Summary: Move AsyncUDPSocket to folly. There is also one under realtime/voip/async that looks functionaly equivalent? I think this one is only used in gangplank currently. Test Plan: contbuild Reviewed By: hans@fb.com Subscribers: trunkagent, doug, alandau, bmatheny, njormrod, mshneer, folly-diffs@ FB internal diff: D1710675 Tasks: 5788116 Signature: t1:1710675:1417477000:9aebb466757554a5fa49d7c36cb504b4d8711b68 --- folly/Makefile.am | 3 + folly/io/async/AsyncSocketException.h | 1 + folly/io/async/AsyncUDPServerSocket.h | 198 +++++++++++++ folly/io/async/AsyncUDPSocket.cpp | 243 ++++++++++++++++ folly/io/async/AsyncUDPSocket.h | 162 +++++++++++ folly/io/async/test/AsyncUDPSocketTest.cpp | 305 +++++++++++++++++++++ 6 files changed, 912 insertions(+) create mode 100644 folly/io/async/AsyncUDPServerSocket.h create mode 100644 folly/io/async/AsyncUDPSocket.cpp create mode 100644 folly/io/async/AsyncUDPSocket.h create mode 100644 folly/io/async/test/AsyncUDPSocketTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index bd5cc48d..4f2d8da7 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -156,6 +156,8 @@ nobase_follyinclude_HEADERS = \ io/ShutdownSocketSet.h \ io/async/AsyncTimeout.h \ io/async/AsyncTransport.h \ + io/async/AsyncUDPServerSocket.h \ + io/async/AsyncUDPSocket.h \ io/async/AsyncServerSocket.h \ io/async/AsyncSSLServerSocket.h \ io/async/AsyncSocket.h \ @@ -292,6 +294,7 @@ libfolly_la_SOURCES = \ io/RecordIO.cpp \ io/ShutdownSocketSet.cpp \ io/async/AsyncTimeout.cpp \ + io/async/AsyncUDPSocket.cpp \ io/async/AsyncServerSocket.cpp \ io/async/AsyncSSLServerSocket.cpp \ io/async/AsyncSocket.cpp \ diff --git a/folly/io/async/AsyncSocketException.h b/folly/io/async/AsyncSocketException.h index 762e9bc0..f18bfed9 100644 --- a/folly/io/async/AsyncSocketException.h +++ b/folly/io/async/AsyncSocketException.h @@ -16,6 +16,7 @@ #pragma once +#include #include namespace folly { diff --git a/folly/io/async/AsyncUDPServerSocket.h b/folly/io/async/AsyncUDPServerSocket.h new file mode 100644 index 00000000..f2935334 --- /dev/null +++ b/folly/io/async/AsyncUDPServerSocket.h @@ -0,0 +1,198 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace folly { + +/** + * UDP server socket + * + * It wraps a UDP socket waiting for packets and distributes them among + * a set of event loops in round robin fashion. + * + * NOTE: At the moment it is designed to work with single packet protocols + * in mind. We distribute incoming packets among all the listeners in + * round-robin fashion. So, any protocol that expects to send/recv + * more than 1 packet will not work because they will end up with + * different event base to process. + */ +class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback { + public: + class Callback { + public: + /** + * Invoked when we start reading data from socket. It is invoked in + * each acceptors/listeners event base thread. + */ + virtual void onListenStarted() noexcept = 0; + + /** + * Invoked when the server socket is closed. It is invoked in each + * acceptors/listeners event base thread. + */ + virtual void onListenStopped() noexcept = 0; + + /** + * Invoked when a new packet is received + */ + virtual void onDataAvailable(const folly::SocketAddress& addr, + std::unique_ptr buf, + bool truncated) noexcept = 0; + + virtual ~Callback() {} + }; + + /** + * Create a new UDP server socket + * + * Note about packet size - We allocate buffer of packetSize_ size to read. + * If packet are larger than this value, as per UDP protocol, remaining data + * is dropped and you get `truncated = true` in onDataAvailable callback + */ + explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500) + : evb_(evb), + packetSize_(sz), + nextListener_(0) { + } + + ~AsyncUDPServerSocket() { + if (socket_) { + close(); + } + } + + void bind(const folly::SocketAddress& address) { + CHECK(!socket_); + + socket_ = folly::make_unique(evb_); + socket_->bind(address); + } + + folly::SocketAddress address() const { + CHECK(socket_); + return socket_->address(); + } + + /** + * Add a listener to the round robin list + */ + void addListener(EventBase* evb, Callback* callback) { + listeners_.emplace_back(evb, callback); + } + + void listen() { + CHECK(socket_) << "Need to bind before listening"; + + for (auto& listener: listeners_) { + auto callback = listener.second; + + listener.first->runInEventBaseThread([callback] () mutable { + callback->onListenStarted(); + }); + } + + socket_->resumeRead(this); + } + + int getFD() { + CHECK(socket_) << "Need to bind before getting FD"; + return socket_->getFD(); + } + + void close() { + CHECK(socket_) << "Need to bind before closing"; + socket_.reset(); + } + + private: + // AsyncUDPSocket::ReadCallback + void getReadBuffer(void** buf, size_t* len) noexcept { + std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_); + } + + void onDataAvailable(const folly::SocketAddress& clientAddress, + size_t len, + bool truncated) noexcept { + buf_.postallocate(len); + auto data = buf_.split(len); + + if (listeners_.empty()) { + LOG(WARNING) << "UDP server socket dropping packet, " + << "no listener registered"; + return; + } + + if (nextListener_ >= listeners_.size()) { + nextListener_ = 0; + } + + auto client = clientAddress; + auto callback = listeners_[nextListener_].second; + auto mvp = + folly::MoveWrapper< + std::unique_ptr>(std::move(data)); + + // Schedule it in the listener's eventbase + // XXX: Speed this up + std::function f = [client, callback, mvp, truncated] () mutable { + callback->onDataAvailable(client, std::move(*mvp), truncated); + }; + + listeners_[nextListener_].first->runInEventBaseThread(f); + ++nextListener_; + } + + void onReadError(const AsyncSocketException& ex) noexcept { + LOG(ERROR) << ex.what(); + + // Lets register to continue listening for packets + socket_->resumeRead(this); + } + + void onReadClosed() noexcept { + for (auto& listener: listeners_) { + auto callback = listener.second; + + listener.first->runInEventBaseThread([callback] () mutable { + callback->onListenStopped(); + }); + } + } + + EventBase* const evb_; + const size_t packetSize_; + + std::unique_ptr socket_; + + // List of listener to distribute packets among + typedef std::pair Listener; + std::vector listeners_; + + // Next listener to send packet to + uint32_t nextListener_; + + // Temporary buffer for data + folly::IOBufQueue buf_; +}; + +} // Namespace diff --git a/folly/io/async/AsyncUDPSocket.cpp b/folly/io/async/AsyncUDPSocket.cpp new file mode 100644 index 00000000..979c2b75 --- /dev/null +++ b/folly/io/async/AsyncUDPSocket.cpp @@ -0,0 +1,243 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include +#include +#include + +namespace folly { + +AsyncUDPSocket::AsyncUDPSocket(EventBase* evb) + : EventHandler(CHECK_NOTNULL(evb)), + eventBase_(evb), + fd_(-1), + readCallback_(nullptr) { + DCHECK(evb->isInEventBaseThread()); +} + +AsyncUDPSocket::~AsyncUDPSocket() { + if (fd_ != -1) { + close(); + } +} + +void AsyncUDPSocket::bind(const folly::SocketAddress& address) { + int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP); + if (socket == -1) { + throw AsyncSocketException(AsyncSocketException::NOT_OPEN, + "error creating async udp socket", + errno); + } + + auto g = folly::makeGuard([&] { ::close(socket); }); + + // put the socket in non-blocking mode + int ret = fcntl(socket, F_SETFL, O_NONBLOCK); + if (ret != 0) { + throw AsyncSocketException(AsyncSocketException::NOT_OPEN, + "failed to put socket in non-blocking mode", + errno); + } + + // put the socket in reuse mode + int value = 1; + if (setsockopt(socket, + SOL_SOCKET, + SO_REUSEADDR, + &value, + sizeof(value)) != 0) { + throw AsyncSocketException(AsyncSocketException::NOT_OPEN, + "failed to put socket in reuse mode", + errno); + } + + // bind to the address + sockaddr_storage addrStorage; + address.getAddress(&addrStorage); + sockaddr* saddr = reinterpret_cast(&addrStorage); + if (::bind(socket, saddr, address.getActualSize()) != 0) { + throw AsyncSocketException( + AsyncSocketException::NOT_OPEN, + "failed to bind the async udp socket for:" + address.describe(), + errno); + } + + // success + g.dismiss(); + fd_ = socket; + ownership_ = FDOwnership::OWNS; + + // attach to EventHandler + EventHandler::changeHandlerFD(fd_); + + if (address.getPort() != 0) { + localAddress_ = address; + } else { + localAddress_.setFromLocalAddress(fd_); + } +} + +void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) { + CHECK_EQ(-1, fd_) << "Already bound to another FD"; + + fd_ = fd; + ownership_ = ownership; + + EventHandler::changeHandlerFD(fd_); + localAddress_.setFromLocalAddress(fd_); +} + +ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address, + const std::unique_ptr& buf) { + CHECK_NE(-1, fd_) << "Socket not yet bound"; + + // XXX: Use `sendmsg` instead of coalescing here + buf->coalesce(); + + sockaddr_storage addrStorage; + address.getAddress(&addrStorage); + sockaddr* saddr = reinterpret_cast(&addrStorage); + + return ::sendto(fd_, + buf->data(), + buf->length(), + MSG_DONTWAIT, + saddr, + address.getActualSize()); +} + +void AsyncUDPSocket::resumeRead(ReadCallback* cob) { + CHECK(!readCallback_) << "Another read callback already installed"; + CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address"; + + readCallback_ = CHECK_NOTNULL(cob); + if (!updateRegistration()) { + AsyncSocketException ex(AsyncSocketException::NOT_OPEN, + "failed to register for accept events"); + + readCallback_ = nullptr; + cob->onReadError(ex); + return; + } +} + +void AsyncUDPSocket::pauseRead() { + // It is ok to pause an already paused socket + readCallback_ = nullptr; + updateRegistration(); +} + +void AsyncUDPSocket::close() { + DCHECK(eventBase_->isInEventBaseThread()); + + if (readCallback_) { + auto cob = readCallback_; + readCallback_ = nullptr; + + cob->onReadClosed(); + } + + // Unregister any events we are registered for + unregisterHandler(); + + if (fd_ != -1 && ownership_ == FDOwnership::OWNS) { + ::close(fd_); + } + + fd_ = -1; +} + +void AsyncUDPSocket::handlerReady(uint16_t events) noexcept { + if (events & EventHandler::READ) { + DCHECK(readCallback_); + handleRead(); + } +} + +void AsyncUDPSocket::handleRead() noexcept { + void* buf{nullptr}; + size_t len{0}; + + readCallback_->getReadBuffer(&buf, &len); + if (buf == nullptr || len == 0) { + AsyncSocketException ex( + AsyncSocketException::BAD_ARGS, + "AsyncUDPSocket::getReadBuffer() returned empty buffer"); + + + auto cob = readCallback_; + readCallback_ = nullptr; + + cob->onReadError(ex); + updateRegistration(); + return; + } + + struct sockaddr_storage addrStorage; + socklen_t addrLen = sizeof(addrStorage); + memset(&addrStorage, 0, addrLen); + struct sockaddr* rawAddr = reinterpret_cast(&addrStorage); + rawAddr->sa_family = localAddress_.getFamily(); + + ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen); + if (bytesRead >= 0) { + clientAddress_.setFromSockaddr(rawAddr, addrLen); + + if (bytesRead > 0) { + bool truncated = false; + if ((size_t)bytesRead > len) { + truncated = true; + bytesRead = len; + } + + readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated); + } + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // No data could be read without blocking the socket + return; + } + + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + "::recvfrom() failed", + errno); + + // In case of UDP we can continue reading from the socket + // even if the current request fails. We notify the user + // so that he can do some logging/stats collection if he wants. + auto cob = readCallback_; + readCallback_ = nullptr; + + cob->onReadError(ex); + updateRegistration(); + } +} + +bool AsyncUDPSocket::updateRegistration() noexcept { + uint16_t flags = NONE; + + if (readCallback_) { + flags |= READ; + } + + return registerHandler(flags | PERSIST); +} + +} // Namespace diff --git a/folly/io/async/AsyncUDPSocket.h b/folly/io/async/AsyncUDPSocket.h new file mode 100644 index 00000000..1c5d3f0a --- /dev/null +++ b/folly/io/async/AsyncUDPSocket.h @@ -0,0 +1,162 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace folly { + +/** + * UDP socket + */ +class AsyncUDPSocket : public EventHandler { + public: + enum class FDOwnership { + OWNS, + SHARED + }; + + class ReadCallback { + public: + /** + * Invoked when the socket becomes readable and we want buffer + * to write to. + * + * NOTE: From socket we will end up reading at most `len` bytes + * and if there were more bytes in datagram, we will end up + * dropping them. + */ + virtual void getReadBuffer(void** buf, size_t* len) noexcept = 0; + + /** + * Invoked when a new datagraom is available on the socket. `len` + * is the number of bytes read and `truncated` is true if we had + * to drop few bytes because of running out of buffer space. + */ + virtual void onDataAvailable(const folly::SocketAddress& client, + size_t len, + bool truncated) noexcept = 0; + + /** + * Invoked when there is an error reading from the socket. + * + * NOTE: Since UDP is connectionless, you can still read from the socket. + * But you have to re-register readCallback yourself after + * onReadError. + */ + virtual void onReadError(const AsyncSocketException& ex) + noexcept = 0; + + /** + * Invoked when socket is closed and a read callback is registered. + */ + virtual void onReadClosed() noexcept = 0; + + virtual ~ReadCallback() {} + }; + + /** + * Create a new UDP socket that will run in the + * given eventbase + */ + explicit AsyncUDPSocket(EventBase* evb); + ~AsyncUDPSocket(); + + /** + * Returns the address server is listening on + */ + const folly::SocketAddress& address() const { + CHECK_NE(-1, fd_) << "Server not yet bound to an address"; + return localAddress_; + } + + /** + * Bind the socket to the following address. If port is not + * set in the `address` an ephemeral port is chosen and you can + * use `address()` method above to get it after this method successfully + * returns. + */ + void bind(const folly::SocketAddress& address); + + /** + * Use an already bound file descriptor. You can either transfer ownership + * of this FD by using ownership = FDOwnership::OWNS or share it using + * FDOwnership::SHARED. In case FD is shared, it will not be `close`d in + * destructor. + */ + void setFD(int fd, FDOwnership ownership); + + /** + * Send the data in buffer to destination. Returns the return code from + * ::sendto. + */ + ssize_t write(const folly::SocketAddress& address, + const std::unique_ptr& buf); + + /** + * Start reading datagrams + */ + void resumeRead(ReadCallback* cob); + + /** + * Pause reading datagrams + */ + void pauseRead(); + + /** + * Stop listening on the socket. + */ + void close(); + + /** + * Get internal FD used by this socket + */ + int getFD() const { + CHECK_NE(-1, fd_) << "Need to bind before getting FD out"; + return fd_; + } + private: + AsyncUDPSocket(const AsyncUDPSocket&) = delete; + AsyncUDPSocket& operator=(const AsyncUDPSocket&) = delete; + + // EventHandler + void handlerReady(uint16_t events) noexcept; + + void handleRead() noexcept; + bool updateRegistration() noexcept; + + EventBase* eventBase_; + folly::SocketAddress localAddress_; + + int fd_; + FDOwnership ownership_; + + // Temp space to receive client address + folly::SocketAddress clientAddress_; + + // Non-null only when we are reading + ReadCallback* readCallback_; +}; + +} // Namespace diff --git a/folly/io/async/test/AsyncUDPSocketTest.cpp b/folly/io/async/test/AsyncUDPSocketTest.cpp new file mode 100644 index 00000000..feadc49c --- /dev/null +++ b/folly/io/async/test/AsyncUDPSocketTest.cpp @@ -0,0 +1,305 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include + +using folly::AsyncUDPSocket; +using folly::AsyncUDPServerSocket; +using folly::AsyncTimeout; +using folly::EventBase; +using folly::SocketAddress; +using folly::IOBuf; + +class UDPAcceptor + : public AsyncUDPServerSocket::Callback { + public: + UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) { + } + + void onListenStarted() noexcept { + } + + void onListenStopped() noexcept { + } + + void onDataAvailable(const folly::SocketAddress& client, + std::unique_ptr data, + bool truncated) noexcept { + + lastClient_ = client; + lastMsg_ = data->moveToFbString().toStdString(); + + auto len = data->computeChainDataLength(); + VLOG(4) << "Worker " << n_ << " read " << len << " bytes " + << "(trun:" << truncated << ") from " << client.describe() + << " - " << lastMsg_; + + sendPong(); + } + + void sendPong() noexcept { + try { + AsyncUDPSocket socket(evb_); + socket.bind(folly::SocketAddress("127.0.0.1", 0)); + socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_)); + } catch (const std::exception& ex) { + VLOG(4) << "Failed to send PONG " << ex.what(); + } + } + + private: + EventBase* const evb_{nullptr}; + const int n_{-1}; + + folly::SocketAddress lastClient_; + std::string lastMsg_; +}; + +class UDPServer { + public: + UDPServer(EventBase* evb, folly::SocketAddress addr, int n) + : evb_(evb), addr_(addr), evbs_(n) { + } + + void start() { + CHECK(evb_->isInEventBaseThread()); + + socket_ = folly::make_unique( + evb_, + 1500); + + try { + socket_->bind(addr_); + VLOG(4) << "Server listening on " << socket_->address().describe(); + } catch (const std::exception& ex) { + LOG(FATAL) << ex.what(); + } + + acceptors_.reserve(evbs_.size()); + threads_.reserve(evbs_.size()); + + // Add numWorkers thread + int i = 0; + for (auto& evb: evbs_) { + acceptors_.emplace_back(&evb, i); + + std::thread t([&] () { + evb.loopForever(); + }); + + auto r = std::make_shared(2); + evb.runInEventBaseThread([r] () { + r->wait(); + }); + r->wait(); + + socket_->addListener(&evb, &acceptors_[i]); + threads_.emplace_back(std::move(t)); + ++i; + } + + socket_->listen(); + } + + folly::SocketAddress address() const { + return socket_->address(); + } + + void shutdown() { + CHECK(evb_->isInEventBaseThread()); + socket_->close(); + socket_.reset(); + + for (auto& evb: evbs_) { + evb.terminateLoopSoon(); + } + + for (auto& t: threads_) { + t.join(); + } + } + + private: + EventBase* const evb_{nullptr}; + const folly::SocketAddress addr_; + + std::unique_ptr socket_; + std::vector threads_; + std::vector evbs_; + std::vector acceptors_; +}; + +class UDPClient + : private AsyncUDPSocket::ReadCallback, + private AsyncTimeout { + public: + explicit UDPClient(EventBase* evb) + : AsyncTimeout(evb), + evb_(evb) { + } + + void start(const folly::SocketAddress& server, int n) { + CHECK(evb_->isInEventBaseThread()); + + server_ = server; + socket_ = folly::make_unique(evb_); + + try { + socket_->bind(folly::SocketAddress("127.0.0.1", 0)); + VLOG(4) << "Client bound to " << socket_->address().describe(); + } catch (const std::exception& ex) { + LOG(FATAL) << ex.what(); + } + + socket_->resumeRead(this); + + n_ = n; + + // Start playing ping pong + sendPing(); + } + + void shutdown() { + CHECK(evb_->isInEventBaseThread()); + socket_->pauseRead(); + socket_->close(); + socket_.reset(); + evb_->terminateLoopSoon(); + } + + void sendPing() { + if (n_ == 0) { + shutdown(); + return; + } + + --n_; + scheduleTimeout(5); + socket_->write( + server_, + folly::IOBuf::copyBuffer(folly::to("PING ", n_))); + } + + void getReadBuffer(void** buf, size_t* len) noexcept { + *buf = buf_; + *len = 1024; + } + + void onDataAvailable(const folly::SocketAddress& client, + size_t len, + bool truncated) noexcept { + VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from " + << client.describe() << " - " << std::string(buf_, len); + VLOG(4) << n_ << " left"; + + ++pongRecvd_; + + sendPing(); + } + + void onReadError(const folly::AsyncSocketException& ex) noexcept { + VLOG(4) << ex.what(); + + // Start listening for next PONG + socket_->resumeRead(this); + } + + void onReadClosed() noexcept { + CHECK(false) << "We unregister reads before closing"; + } + + void timeoutExpired() noexcept { + VLOG(4) << "Timeout expired"; + sendPing(); + } + + int pongRecvd() const { + return pongRecvd_; + } + + private: + EventBase* const evb_{nullptr}; + + folly::SocketAddress server_; + std::unique_ptr socket_; + + int pongRecvd_{0}; + + int n_{0}; + char buf_[1024]; +}; + +TEST(AsyncSocketTest, PingPong) { + folly::EventBase sevb; + UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4); + boost::barrier barrier(2); + + // Start event loop in a separate thread + auto serverThread = std::thread([&sevb] () { + sevb.loopForever(); + }); + + // Wait for event loop to start + sevb.runInEventBaseThread([&] () { barrier.wait(); }); + barrier.wait(); + + // Start the server + sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); }); + barrier.wait(); + + folly::EventBase cevb; + UDPClient client(&cevb); + + // Start event loop in a separate thread + auto clientThread = std::thread([&cevb] () { + cevb.loopForever(); + }); + + // Wait for event loop to start + cevb.runInEventBaseThread([&] () { barrier.wait(); }); + barrier.wait(); + + // Send ping + cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); }); + + // Wait for client to finish + clientThread.join(); + + // Check that some PING/PONGS were exchanged. Out of 1000 transactions + // at least 1 should succeed + CHECK_GT(client.pongRecvd(), 0); + + // Shutdown server + sevb.runInEventBaseThread([&] () { + server.shutdown(); + sevb.terminateLoopSoon(); + }); + + // Wait for server thread to joib + serverThread.join(); +} -- 2.34.1