io/ShutdownSocketSet.h \
io/async/AsyncTimeout.h \
io/async/AsyncTransport.h \
+ io/async/AsyncUDPServerSocket.h \
+ io/async/AsyncUDPSocket.h \
io/async/AsyncServerSocket.h \
io/async/AsyncSSLServerSocket.h \
io/async/AsyncSocket.h \
io/RecordIO.cpp \
io/ShutdownSocketSet.cpp \
io/async/AsyncTimeout.cpp \
+ io/async/AsyncUDPSocket.cpp \
io/async/AsyncServerSocket.cpp \
io/async/AsyncSSLServerSocket.cpp \
io/async/AsyncSocket.cpp \
#pragma once
+#include <folly/Format.h>
#include <folly/io/async/DelayedDestruction.h>
namespace folly {
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/MoveWrapper.h>
+#include <folly/io/IOBufQueue.h>
+#include <folly/Memory.h>
+#include <folly/io/async/AsyncUDPSocket.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly {
+
+/**
+ * UDP server socket
+ *
+ * It wraps a UDP socket waiting for packets and distributes them among
+ * a set of event loops in round robin fashion.
+ *
+ * NOTE: At the moment it is designed to work with single packet protocols
+ * in mind. We distribute incoming packets among all the listeners in
+ * round-robin fashion. So, any protocol that expects to send/recv
+ * more than 1 packet will not work because they will end up with
+ * different event base to process.
+ */
+class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
+ public:
+ class Callback {
+ public:
+ /**
+ * Invoked when we start reading data from socket. It is invoked in
+ * each acceptors/listeners event base thread.
+ */
+ virtual void onListenStarted() noexcept = 0;
+
+ /**
+ * Invoked when the server socket is closed. It is invoked in each
+ * acceptors/listeners event base thread.
+ */
+ virtual void onListenStopped() noexcept = 0;
+
+ /**
+ * Invoked when a new packet is received
+ */
+ virtual void onDataAvailable(const folly::SocketAddress& addr,
+ std::unique_ptr<folly::IOBuf> buf,
+ bool truncated) noexcept = 0;
+
+ virtual ~Callback() {}
+ };
+
+ /**
+ * Create a new UDP server socket
+ *
+ * Note about packet size - We allocate buffer of packetSize_ size to read.
+ * If packet are larger than this value, as per UDP protocol, remaining data
+ * is dropped and you get `truncated = true` in onDataAvailable callback
+ */
+ explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
+ : evb_(evb),
+ packetSize_(sz),
+ nextListener_(0) {
+ }
+
+ ~AsyncUDPServerSocket() {
+ if (socket_) {
+ close();
+ }
+ }
+
+ void bind(const folly::SocketAddress& address) {
+ CHECK(!socket_);
+
+ socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
+ socket_->bind(address);
+ }
+
+ folly::SocketAddress address() const {
+ CHECK(socket_);
+ return socket_->address();
+ }
+
+ /**
+ * Add a listener to the round robin list
+ */
+ void addListener(EventBase* evb, Callback* callback) {
+ listeners_.emplace_back(evb, callback);
+ }
+
+ void listen() {
+ CHECK(socket_) << "Need to bind before listening";
+
+ for (auto& listener: listeners_) {
+ auto callback = listener.second;
+
+ listener.first->runInEventBaseThread([callback] () mutable {
+ callback->onListenStarted();
+ });
+ }
+
+ socket_->resumeRead(this);
+ }
+
+ int getFD() {
+ CHECK(socket_) << "Need to bind before getting FD";
+ return socket_->getFD();
+ }
+
+ void close() {
+ CHECK(socket_) << "Need to bind before closing";
+ socket_.reset();
+ }
+
+ private:
+ // AsyncUDPSocket::ReadCallback
+ void getReadBuffer(void** buf, size_t* len) noexcept {
+ std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
+ }
+
+ void onDataAvailable(const folly::SocketAddress& clientAddress,
+ size_t len,
+ bool truncated) noexcept {
+ buf_.postallocate(len);
+ auto data = buf_.split(len);
+
+ if (listeners_.empty()) {
+ LOG(WARNING) << "UDP server socket dropping packet, "
+ << "no listener registered";
+ return;
+ }
+
+ if (nextListener_ >= listeners_.size()) {
+ nextListener_ = 0;
+ }
+
+ auto client = clientAddress;
+ auto callback = listeners_[nextListener_].second;
+ auto mvp =
+ folly::MoveWrapper<
+ std::unique_ptr<folly::IOBuf>>(std::move(data));
+
+ // Schedule it in the listener's eventbase
+ // XXX: Speed this up
+ std::function<void()> f = [client, callback, mvp, truncated] () mutable {
+ callback->onDataAvailable(client, std::move(*mvp), truncated);
+ };
+
+ listeners_[nextListener_].first->runInEventBaseThread(f);
+ ++nextListener_;
+ }
+
+ void onReadError(const AsyncSocketException& ex) noexcept {
+ LOG(ERROR) << ex.what();
+
+ // Lets register to continue listening for packets
+ socket_->resumeRead(this);
+ }
+
+ void onReadClosed() noexcept {
+ for (auto& listener: listeners_) {
+ auto callback = listener.second;
+
+ listener.first->runInEventBaseThread([callback] () mutable {
+ callback->onListenStopped();
+ });
+ }
+ }
+
+ EventBase* const evb_;
+ const size_t packetSize_;
+
+ std::unique_ptr<AsyncUDPSocket> socket_;
+
+ // List of listener to distribute packets among
+ typedef std::pair<EventBase*, Callback*> Listener;
+ std::vector<Listener> listeners_;
+
+ // Next listener to send packet to
+ uint32_t nextListener_;
+
+ // Temporary buffer for data
+ folly::IOBufQueue buf_;
+};
+
+} // Namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/AsyncUDPSocket.h>
+
+#include <folly/io/async/EventBase.h>
+
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+namespace folly {
+
+AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
+ : EventHandler(CHECK_NOTNULL(evb)),
+ eventBase_(evb),
+ fd_(-1),
+ readCallback_(nullptr) {
+ DCHECK(evb->isInEventBaseThread());
+}
+
+AsyncUDPSocket::~AsyncUDPSocket() {
+ if (fd_ != -1) {
+ close();
+ }
+}
+
+void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
+ int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
+ if (socket == -1) {
+ throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+ "error creating async udp socket",
+ errno);
+ }
+
+ auto g = folly::makeGuard([&] { ::close(socket); });
+
+ // put the socket in non-blocking mode
+ int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
+ if (ret != 0) {
+ throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+ "failed to put socket in non-blocking mode",
+ errno);
+ }
+
+ // put the socket in reuse mode
+ int value = 1;
+ if (setsockopt(socket,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ &value,
+ sizeof(value)) != 0) {
+ throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+ "failed to put socket in reuse mode",
+ errno);
+ }
+
+ // bind to the address
+ sockaddr_storage addrStorage;
+ address.getAddress(&addrStorage);
+ sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+ if (::bind(socket, saddr, address.getActualSize()) != 0) {
+ throw AsyncSocketException(
+ AsyncSocketException::NOT_OPEN,
+ "failed to bind the async udp socket for:" + address.describe(),
+ errno);
+ }
+
+ // success
+ g.dismiss();
+ fd_ = socket;
+ ownership_ = FDOwnership::OWNS;
+
+ // attach to EventHandler
+ EventHandler::changeHandlerFD(fd_);
+
+ if (address.getPort() != 0) {
+ localAddress_ = address;
+ } else {
+ localAddress_.setFromLocalAddress(fd_);
+ }
+}
+
+void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
+ CHECK_EQ(-1, fd_) << "Already bound to another FD";
+
+ fd_ = fd;
+ ownership_ = ownership;
+
+ EventHandler::changeHandlerFD(fd_);
+ localAddress_.setFromLocalAddress(fd_);
+}
+
+ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
+ const std::unique_ptr<folly::IOBuf>& buf) {
+ CHECK_NE(-1, fd_) << "Socket not yet bound";
+
+ // XXX: Use `sendmsg` instead of coalescing here
+ buf->coalesce();
+
+ sockaddr_storage addrStorage;
+ address.getAddress(&addrStorage);
+ sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+
+ return ::sendto(fd_,
+ buf->data(),
+ buf->length(),
+ MSG_DONTWAIT,
+ saddr,
+ address.getActualSize());
+}
+
+void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
+ CHECK(!readCallback_) << "Another read callback already installed";
+ CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
+
+ readCallback_ = CHECK_NOTNULL(cob);
+ if (!updateRegistration()) {
+ AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
+ "failed to register for accept events");
+
+ readCallback_ = nullptr;
+ cob->onReadError(ex);
+ return;
+ }
+}
+
+void AsyncUDPSocket::pauseRead() {
+ // It is ok to pause an already paused socket
+ readCallback_ = nullptr;
+ updateRegistration();
+}
+
+void AsyncUDPSocket::close() {
+ DCHECK(eventBase_->isInEventBaseThread());
+
+ if (readCallback_) {
+ auto cob = readCallback_;
+ readCallback_ = nullptr;
+
+ cob->onReadClosed();
+ }
+
+ // Unregister any events we are registered for
+ unregisterHandler();
+
+ if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
+ ::close(fd_);
+ }
+
+ fd_ = -1;
+}
+
+void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
+ if (events & EventHandler::READ) {
+ DCHECK(readCallback_);
+ handleRead();
+ }
+}
+
+void AsyncUDPSocket::handleRead() noexcept {
+ void* buf{nullptr};
+ size_t len{0};
+
+ readCallback_->getReadBuffer(&buf, &len);
+ if (buf == nullptr || len == 0) {
+ AsyncSocketException ex(
+ AsyncSocketException::BAD_ARGS,
+ "AsyncUDPSocket::getReadBuffer() returned empty buffer");
+
+
+ auto cob = readCallback_;
+ readCallback_ = nullptr;
+
+ cob->onReadError(ex);
+ updateRegistration();
+ return;
+ }
+
+ struct sockaddr_storage addrStorage;
+ socklen_t addrLen = sizeof(addrStorage);
+ memset(&addrStorage, 0, addrLen);
+ struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
+ rawAddr->sa_family = localAddress_.getFamily();
+
+ ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
+ if (bytesRead >= 0) {
+ clientAddress_.setFromSockaddr(rawAddr, addrLen);
+
+ if (bytesRead > 0) {
+ bool truncated = false;
+ if ((size_t)bytesRead > len) {
+ truncated = true;
+ bytesRead = len;
+ }
+
+ readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
+ }
+ } else {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // No data could be read without blocking the socket
+ return;
+ }
+
+ AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+ "::recvfrom() failed",
+ errno);
+
+ // In case of UDP we can continue reading from the socket
+ // even if the current request fails. We notify the user
+ // so that he can do some logging/stats collection if he wants.
+ auto cob = readCallback_;
+ readCallback_ = nullptr;
+
+ cob->onReadError(ex);
+ updateRegistration();
+ }
+}
+
+bool AsyncUDPSocket::updateRegistration() noexcept {
+ uint16_t flags = NONE;
+
+ if (readCallback_) {
+ flags |= READ;
+ }
+
+ return registerHandler(flags | PERSIST);
+}
+
+} // Namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/IOBuf.h>
+#include <folly/ScopeGuard.h>
+#include <folly/io/async/AsyncSocketException.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/SocketAddress.h>
+
+#include <memory>
+
+namespace folly {
+
+/**
+ * UDP socket
+ */
+class AsyncUDPSocket : public EventHandler {
+ public:
+ enum class FDOwnership {
+ OWNS,
+ SHARED
+ };
+
+ class ReadCallback {
+ public:
+ /**
+ * Invoked when the socket becomes readable and we want buffer
+ * to write to.
+ *
+ * NOTE: From socket we will end up reading at most `len` bytes
+ * and if there were more bytes in datagram, we will end up
+ * dropping them.
+ */
+ virtual void getReadBuffer(void** buf, size_t* len) noexcept = 0;
+
+ /**
+ * Invoked when a new datagraom is available on the socket. `len`
+ * is the number of bytes read and `truncated` is true if we had
+ * to drop few bytes because of running out of buffer space.
+ */
+ virtual void onDataAvailable(const folly::SocketAddress& client,
+ size_t len,
+ bool truncated) noexcept = 0;
+
+ /**
+ * Invoked when there is an error reading from the socket.
+ *
+ * NOTE: Since UDP is connectionless, you can still read from the socket.
+ * But you have to re-register readCallback yourself after
+ * onReadError.
+ */
+ virtual void onReadError(const AsyncSocketException& ex)
+ noexcept = 0;
+
+ /**
+ * Invoked when socket is closed and a read callback is registered.
+ */
+ virtual void onReadClosed() noexcept = 0;
+
+ virtual ~ReadCallback() {}
+ };
+
+ /**
+ * Create a new UDP socket that will run in the
+ * given eventbase
+ */
+ explicit AsyncUDPSocket(EventBase* evb);
+ ~AsyncUDPSocket();
+
+ /**
+ * Returns the address server is listening on
+ */
+ const folly::SocketAddress& address() const {
+ CHECK_NE(-1, fd_) << "Server not yet bound to an address";
+ return localAddress_;
+ }
+
+ /**
+ * Bind the socket to the following address. If port is not
+ * set in the `address` an ephemeral port is chosen and you can
+ * use `address()` method above to get it after this method successfully
+ * returns.
+ */
+ void bind(const folly::SocketAddress& address);
+
+ /**
+ * Use an already bound file descriptor. You can either transfer ownership
+ * of this FD by using ownership = FDOwnership::OWNS or share it using
+ * FDOwnership::SHARED. In case FD is shared, it will not be `close`d in
+ * destructor.
+ */
+ void setFD(int fd, FDOwnership ownership);
+
+ /**
+ * Send the data in buffer to destination. Returns the return code from
+ * ::sendto.
+ */
+ ssize_t write(const folly::SocketAddress& address,
+ const std::unique_ptr<folly::IOBuf>& buf);
+
+ /**
+ * Start reading datagrams
+ */
+ void resumeRead(ReadCallback* cob);
+
+ /**
+ * Pause reading datagrams
+ */
+ void pauseRead();
+
+ /**
+ * Stop listening on the socket.
+ */
+ void close();
+
+ /**
+ * Get internal FD used by this socket
+ */
+ int getFD() const {
+ CHECK_NE(-1, fd_) << "Need to bind before getting FD out";
+ return fd_;
+ }
+ private:
+ AsyncUDPSocket(const AsyncUDPSocket&) = delete;
+ AsyncUDPSocket& operator=(const AsyncUDPSocket&) = delete;
+
+ // EventHandler
+ void handlerReady(uint16_t events) noexcept;
+
+ void handleRead() noexcept;
+ bool updateRegistration() noexcept;
+
+ EventBase* eventBase_;
+ folly::SocketAddress localAddress_;
+
+ int fd_;
+ FDOwnership ownership_;
+
+ // Temp space to receive client address
+ folly::SocketAddress clientAddress_;
+
+ // Non-null only when we are reading
+ ReadCallback* readCallback_;
+};
+
+} // Namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/AsyncUDPSocket.h>
+#include <folly/io/async/AsyncUDPServerSocket.h>
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/SocketAddress.h>
+
+#include <boost/thread/barrier.hpp>
+
+#include <folly/io/IOBuf.h>
+
+#include <thread>
+
+#include <gtest/gtest.h>
+
+using folly::AsyncUDPSocket;
+using folly::AsyncUDPServerSocket;
+using folly::AsyncTimeout;
+using folly::EventBase;
+using folly::SocketAddress;
+using folly::IOBuf;
+
+class UDPAcceptor
+ : public AsyncUDPServerSocket::Callback {
+ public:
+ UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
+ }
+
+ void onListenStarted() noexcept {
+ }
+
+ void onListenStopped() noexcept {
+ }
+
+ void onDataAvailable(const folly::SocketAddress& client,
+ std::unique_ptr<folly::IOBuf> data,
+ bool truncated) noexcept {
+
+ lastClient_ = client;
+ lastMsg_ = data->moveToFbString().toStdString();
+
+ auto len = data->computeChainDataLength();
+ VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
+ << "(trun:" << truncated << ") from " << client.describe()
+ << " - " << lastMsg_;
+
+ sendPong();
+ }
+
+ void sendPong() noexcept {
+ try {
+ AsyncUDPSocket socket(evb_);
+ socket.bind(folly::SocketAddress("127.0.0.1", 0));
+ socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
+ } catch (const std::exception& ex) {
+ VLOG(4) << "Failed to send PONG " << ex.what();
+ }
+ }
+
+ private:
+ EventBase* const evb_{nullptr};
+ const int n_{-1};
+
+ folly::SocketAddress lastClient_;
+ std::string lastMsg_;
+};
+
+class UDPServer {
+ public:
+ UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
+ : evb_(evb), addr_(addr), evbs_(n) {
+ }
+
+ void start() {
+ CHECK(evb_->isInEventBaseThread());
+
+ socket_ = folly::make_unique<AsyncUDPServerSocket>(
+ evb_,
+ 1500);
+
+ try {
+ socket_->bind(addr_);
+ VLOG(4) << "Server listening on " << socket_->address().describe();
+ } catch (const std::exception& ex) {
+ LOG(FATAL) << ex.what();
+ }
+
+ acceptors_.reserve(evbs_.size());
+ threads_.reserve(evbs_.size());
+
+ // Add numWorkers thread
+ int i = 0;
+ for (auto& evb: evbs_) {
+ acceptors_.emplace_back(&evb, i);
+
+ std::thread t([&] () {
+ evb.loopForever();
+ });
+
+ auto r = std::make_shared<boost::barrier>(2);
+ evb.runInEventBaseThread([r] () {
+ r->wait();
+ });
+ r->wait();
+
+ socket_->addListener(&evb, &acceptors_[i]);
+ threads_.emplace_back(std::move(t));
+ ++i;
+ }
+
+ socket_->listen();
+ }
+
+ folly::SocketAddress address() const {
+ return socket_->address();
+ }
+
+ void shutdown() {
+ CHECK(evb_->isInEventBaseThread());
+ socket_->close();
+ socket_.reset();
+
+ for (auto& evb: evbs_) {
+ evb.terminateLoopSoon();
+ }
+
+ for (auto& t: threads_) {
+ t.join();
+ }
+ }
+
+ private:
+ EventBase* const evb_{nullptr};
+ const folly::SocketAddress addr_;
+
+ std::unique_ptr<AsyncUDPServerSocket> socket_;
+ std::vector<std::thread> threads_;
+ std::vector<folly::EventBase> evbs_;
+ std::vector<UDPAcceptor> acceptors_;
+};
+
+class UDPClient
+ : private AsyncUDPSocket::ReadCallback,
+ private AsyncTimeout {
+ public:
+ explicit UDPClient(EventBase* evb)
+ : AsyncTimeout(evb),
+ evb_(evb) {
+ }
+
+ void start(const folly::SocketAddress& server, int n) {
+ CHECK(evb_->isInEventBaseThread());
+
+ server_ = server;
+ socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
+
+ try {
+ socket_->bind(folly::SocketAddress("127.0.0.1", 0));
+ VLOG(4) << "Client bound to " << socket_->address().describe();
+ } catch (const std::exception& ex) {
+ LOG(FATAL) << ex.what();
+ }
+
+ socket_->resumeRead(this);
+
+ n_ = n;
+
+ // Start playing ping pong
+ sendPing();
+ }
+
+ void shutdown() {
+ CHECK(evb_->isInEventBaseThread());
+ socket_->pauseRead();
+ socket_->close();
+ socket_.reset();
+ evb_->terminateLoopSoon();
+ }
+
+ void sendPing() {
+ if (n_ == 0) {
+ shutdown();
+ return;
+ }
+
+ --n_;
+ scheduleTimeout(5);
+ socket_->write(
+ server_,
+ folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
+ }
+
+ void getReadBuffer(void** buf, size_t* len) noexcept {
+ *buf = buf_;
+ *len = 1024;
+ }
+
+ void onDataAvailable(const folly::SocketAddress& client,
+ size_t len,
+ bool truncated) noexcept {
+ VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
+ << client.describe() << " - " << std::string(buf_, len);
+ VLOG(4) << n_ << " left";
+
+ ++pongRecvd_;
+
+ sendPing();
+ }
+
+ void onReadError(const folly::AsyncSocketException& ex) noexcept {
+ VLOG(4) << ex.what();
+
+ // Start listening for next PONG
+ socket_->resumeRead(this);
+ }
+
+ void onReadClosed() noexcept {
+ CHECK(false) << "We unregister reads before closing";
+ }
+
+ void timeoutExpired() noexcept {
+ VLOG(4) << "Timeout expired";
+ sendPing();
+ }
+
+ int pongRecvd() const {
+ return pongRecvd_;
+ }
+
+ private:
+ EventBase* const evb_{nullptr};
+
+ folly::SocketAddress server_;
+ std::unique_ptr<AsyncUDPSocket> socket_;
+
+ int pongRecvd_{0};
+
+ int n_{0};
+ char buf_[1024];
+};
+
+TEST(AsyncSocketTest, PingPong) {
+ folly::EventBase sevb;
+ UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
+ boost::barrier barrier(2);
+
+ // Start event loop in a separate thread
+ auto serverThread = std::thread([&sevb] () {
+ sevb.loopForever();
+ });
+
+ // Wait for event loop to start
+ sevb.runInEventBaseThread([&] () { barrier.wait(); });
+ barrier.wait();
+
+ // Start the server
+ sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); });
+ barrier.wait();
+
+ folly::EventBase cevb;
+ UDPClient client(&cevb);
+
+ // Start event loop in a separate thread
+ auto clientThread = std::thread([&cevb] () {
+ cevb.loopForever();
+ });
+
+ // Wait for event loop to start
+ cevb.runInEventBaseThread([&] () { barrier.wait(); });
+ barrier.wait();
+
+ // Send ping
+ cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
+
+ // Wait for client to finish
+ clientThread.join();
+
+ // Check that some PING/PONGS were exchanged. Out of 1000 transactions
+ // at least 1 should succeed
+ CHECK_GT(client.pongRecvd(), 0);
+
+ // Shutdown server
+ sevb.runInEventBaseThread([&] () {
+ server.shutdown();
+ sevb.terminateLoopSoon();
+ });
+
+ // Wait for server thread to joib
+ serverThread.join();
+}