AsyncUDPSocket
authorDave Watson <davejwatson@fb.com>
Tue, 25 Nov 2014 16:13:37 +0000 (08:13 -0800)
committerJoelMarcey <joelm@fb.com>
Thu, 18 Dec 2014 20:29:40 +0000 (12:29 -0800)
Summary:
Move AsyncUDPSocket to folly.

There is also one under realtime/voip/async that looks functionaly equivalent?  I think this one is only used in gangplank currently.

Test Plan: contbuild

Reviewed By: hans@fb.com

Subscribers: trunkagent, doug, alandau, bmatheny, njormrod, mshneer, folly-diffs@

FB internal diff: D1710675

Tasks: 5788116

Signature: t1:1710675:1417477000:9aebb466757554a5fa49d7c36cb504b4d8711b68

folly/Makefile.am
folly/io/async/AsyncSocketException.h
folly/io/async/AsyncUDPServerSocket.h [new file with mode: 0644]
folly/io/async/AsyncUDPSocket.cpp [new file with mode: 0644]
folly/io/async/AsyncUDPSocket.h [new file with mode: 0644]
folly/io/async/test/AsyncUDPSocketTest.cpp [new file with mode: 0644]

index bd5cc48dd1532f3e7255653cff63904215cc7c38..4f2d8da708daeedf817edebb93b862bb44944b98 100644 (file)
@@ -156,6 +156,8 @@ nobase_follyinclude_HEADERS = \
        io/ShutdownSocketSet.h \
        io/async/AsyncTimeout.h \
        io/async/AsyncTransport.h \
+       io/async/AsyncUDPServerSocket.h \
+       io/async/AsyncUDPSocket.h \
        io/async/AsyncServerSocket.h \
        io/async/AsyncSSLServerSocket.h \
        io/async/AsyncSocket.h \
@@ -292,6 +294,7 @@ libfolly_la_SOURCES = \
        io/RecordIO.cpp \
        io/ShutdownSocketSet.cpp \
        io/async/AsyncTimeout.cpp \
+       io/async/AsyncUDPSocket.cpp \
        io/async/AsyncServerSocket.cpp \
        io/async/AsyncSSLServerSocket.cpp \
        io/async/AsyncSocket.cpp \
index 762e9bc021d8c99d234b8778a58bfb5f9f71ccd8..f18bfed9f1f0f5aa17c5e3baaeb9f42e058c723e 100644 (file)
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <folly/Format.h>
 #include <folly/io/async/DelayedDestruction.h>
 
 namespace folly {
diff --git a/folly/io/async/AsyncUDPServerSocket.h b/folly/io/async/AsyncUDPServerSocket.h
new file mode 100644 (file)
index 0000000..f293533
--- /dev/null
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/MoveWrapper.h>
+#include <folly/io/IOBufQueue.h>
+#include <folly/Memory.h>
+#include <folly/io/async/AsyncUDPSocket.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly {
+
+/**
+ * UDP server socket
+ *
+ * It wraps a UDP socket waiting for packets and distributes them among
+ * a set of event loops in round robin fashion.
+ *
+ * NOTE: At the moment it is designed to work with single packet protocols
+ *       in mind. We distribute incoming packets among all the listeners in
+ *       round-robin fashion. So, any protocol that expects to send/recv
+ *       more than 1 packet will not work because they will end up with
+ *       different event base to process.
+ */
+class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
+ public:
+  class Callback {
+   public:
+    /**
+     * Invoked when we start reading data from socket. It is invoked in
+     * each acceptors/listeners event base thread.
+     */
+     virtual void onListenStarted() noexcept = 0;
+
+    /**
+     * Invoked when the server socket is closed. It is invoked in each
+     * acceptors/listeners event base thread.
+     */
+     virtual void onListenStopped() noexcept = 0;
+
+    /**
+     * Invoked when a new packet is received
+     */
+    virtual void onDataAvailable(const folly::SocketAddress& addr,
+                                 std::unique_ptr<folly::IOBuf> buf,
+                                 bool truncated) noexcept = 0;
+
+    virtual ~Callback() {}
+  };
+
+  /**
+   * Create a new UDP server socket
+   *
+   * Note about packet size - We allocate buffer of packetSize_ size to read.
+   * If packet are larger than this value, as per UDP protocol, remaining data
+   * is dropped and you get `truncated = true` in onDataAvailable callback
+   */
+  explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
+      : evb_(evb),
+        packetSize_(sz),
+        nextListener_(0) {
+  }
+
+  ~AsyncUDPServerSocket() {
+    if (socket_) {
+      close();
+    }
+  }
+
+  void bind(const folly::SocketAddress& address) {
+    CHECK(!socket_);
+
+    socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
+    socket_->bind(address);
+  }
+
+  folly::SocketAddress address() const {
+    CHECK(socket_);
+    return socket_->address();
+  }
+
+  /**
+   * Add a listener to the round robin list
+   */
+  void addListener(EventBase* evb, Callback* callback) {
+    listeners_.emplace_back(evb, callback);
+  }
+
+  void listen() {
+    CHECK(socket_) << "Need to bind before listening";
+
+    for (auto& listener: listeners_) {
+      auto callback = listener.second;
+
+      listener.first->runInEventBaseThread([callback] () mutable {
+        callback->onListenStarted();
+      });
+    }
+
+    socket_->resumeRead(this);
+  }
+
+  int getFD() {
+    CHECK(socket_) << "Need to bind before getting FD";
+    return socket_->getFD();
+  }
+
+  void close() {
+    CHECK(socket_) << "Need to bind before closing";
+    socket_.reset();
+  }
+
+ private:
+  // AsyncUDPSocket::ReadCallback
+  void getReadBuffer(void** buf, size_t* len) noexcept {
+    std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
+  }
+
+  void onDataAvailable(const folly::SocketAddress& clientAddress,
+                       size_t len,
+                       bool truncated) noexcept {
+    buf_.postallocate(len);
+    auto data = buf_.split(len);
+
+    if (listeners_.empty()) {
+      LOG(WARNING) << "UDP server socket dropping packet, "
+                   << "no listener registered";
+      return;
+    }
+
+    if (nextListener_ >= listeners_.size()) {
+      nextListener_ = 0;
+    }
+
+    auto client = clientAddress;
+    auto callback = listeners_[nextListener_].second;
+    auto mvp =
+        folly::MoveWrapper<
+            std::unique_ptr<folly::IOBuf>>(std::move(data));
+
+    // Schedule it in the listener's eventbase
+    // XXX: Speed this up
+    std::function<void()> f = [client, callback, mvp, truncated] () mutable {
+      callback->onDataAvailable(client, std::move(*mvp), truncated);
+    };
+
+    listeners_[nextListener_].first->runInEventBaseThread(f);
+    ++nextListener_;
+  }
+
+  void onReadError(const AsyncSocketException& ex) noexcept {
+    LOG(ERROR) << ex.what();
+
+    // Lets register to continue listening for packets
+    socket_->resumeRead(this);
+  }
+
+  void onReadClosed() noexcept {
+    for (auto& listener: listeners_) {
+      auto callback = listener.second;
+
+      listener.first->runInEventBaseThread([callback] () mutable {
+        callback->onListenStopped();
+      });
+    }
+  }
+
+  EventBase* const evb_;
+  const size_t packetSize_;
+
+  std::unique_ptr<AsyncUDPSocket> socket_;
+
+  // List of listener to distribute packets among
+  typedef std::pair<EventBase*, Callback*> Listener;
+  std::vector<Listener> listeners_;
+
+  // Next listener to send packet to
+  uint32_t nextListener_;
+
+  // Temporary buffer for data
+  folly::IOBufQueue buf_;
+};
+
+} // Namespace
diff --git a/folly/io/async/AsyncUDPSocket.cpp b/folly/io/async/AsyncUDPSocket.cpp
new file mode 100644 (file)
index 0000000..979c2b7
--- /dev/null
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/AsyncUDPSocket.h>
+
+#include <folly/io/async/EventBase.h>
+
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+namespace folly {
+
+AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
+    : EventHandler(CHECK_NOTNULL(evb)),
+      eventBase_(evb),
+      fd_(-1),
+      readCallback_(nullptr) {
+  DCHECK(evb->isInEventBaseThread());
+}
+
+AsyncUDPSocket::~AsyncUDPSocket() {
+  if (fd_ != -1) {
+    close();
+  }
+}
+
+void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
+  int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
+  if (socket == -1) {
+    throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                              "error creating async udp socket",
+                              errno);
+  }
+
+  auto g = folly::makeGuard([&] { ::close(socket); });
+
+  // put the socket in non-blocking mode
+  int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
+  if (ret != 0) {
+    throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                              "failed to put socket in non-blocking mode",
+                              errno);
+  }
+
+  // put the socket in reuse mode
+  int value = 1;
+  if (setsockopt(socket,
+                 SOL_SOCKET,
+                 SO_REUSEADDR,
+                 &value,
+                 sizeof(value)) != 0) {
+    throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                              "failed to put socket in reuse mode",
+                              errno);
+  }
+
+  // bind to the address
+  sockaddr_storage addrStorage;
+  address.getAddress(&addrStorage);
+  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+  if (::bind(socket, saddr, address.getActualSize()) != 0) {
+    throw AsyncSocketException(
+        AsyncSocketException::NOT_OPEN,
+        "failed to bind the async udp socket for:" + address.describe(),
+        errno);
+  }
+
+  // success
+  g.dismiss();
+  fd_ = socket;
+  ownership_ = FDOwnership::OWNS;
+
+  // attach to EventHandler
+  EventHandler::changeHandlerFD(fd_);
+
+  if (address.getPort() != 0) {
+    localAddress_ = address;
+  } else {
+    localAddress_.setFromLocalAddress(fd_);
+  }
+}
+
+void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
+  CHECK_EQ(-1, fd_) << "Already bound to another FD";
+
+  fd_ = fd;
+  ownership_ = ownership;
+
+  EventHandler::changeHandlerFD(fd_);
+  localAddress_.setFromLocalAddress(fd_);
+}
+
+ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
+                               const std::unique_ptr<folly::IOBuf>& buf) {
+  CHECK_NE(-1, fd_) << "Socket not yet bound";
+
+  // XXX: Use `sendmsg` instead of coalescing here
+  buf->coalesce();
+
+  sockaddr_storage addrStorage;
+  address.getAddress(&addrStorage);
+  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+
+  return ::sendto(fd_,
+                  buf->data(),
+                  buf->length(),
+                  MSG_DONTWAIT,
+                  saddr,
+                  address.getActualSize());
+}
+
+void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
+  CHECK(!readCallback_) << "Another read callback already installed";
+  CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
+
+  readCallback_ = CHECK_NOTNULL(cob);
+  if (!updateRegistration()) {
+    AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
+                           "failed to register for accept events");
+
+    readCallback_ = nullptr;
+    cob->onReadError(ex);
+    return;
+  }
+}
+
+void AsyncUDPSocket::pauseRead() {
+  // It is ok to pause an already paused socket
+  readCallback_ = nullptr;
+  updateRegistration();
+}
+
+void AsyncUDPSocket::close() {
+  DCHECK(eventBase_->isInEventBaseThread());
+
+  if (readCallback_) {
+    auto cob = readCallback_;
+    readCallback_ = nullptr;
+
+    cob->onReadClosed();
+  }
+
+  // Unregister any events we are registered for
+  unregisterHandler();
+
+  if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
+    ::close(fd_);
+  }
+
+  fd_ = -1;
+}
+
+void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
+  if (events & EventHandler::READ) {
+    DCHECK(readCallback_);
+    handleRead();
+  }
+}
+
+void AsyncUDPSocket::handleRead() noexcept {
+  void* buf{nullptr};
+  size_t len{0};
+
+  readCallback_->getReadBuffer(&buf, &len);
+  if (buf == nullptr || len == 0) {
+    AsyncSocketException ex(
+        AsyncSocketException::BAD_ARGS,
+        "AsyncUDPSocket::getReadBuffer() returned empty buffer");
+
+
+    auto cob = readCallback_;
+    readCallback_ = nullptr;
+
+    cob->onReadError(ex);
+    updateRegistration();
+    return;
+  }
+
+  struct sockaddr_storage addrStorage;
+  socklen_t addrLen = sizeof(addrStorage);
+  memset(&addrStorage, 0, addrLen);
+  struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
+  rawAddr->sa_family = localAddress_.getFamily();
+
+  ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
+  if (bytesRead >= 0) {
+    clientAddress_.setFromSockaddr(rawAddr, addrLen);
+
+    if (bytesRead > 0) {
+      bool truncated = false;
+      if ((size_t)bytesRead > len) {
+        truncated = true;
+        bytesRead = len;
+      }
+
+      readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
+    }
+  } else {
+    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+      // No data could be read without blocking the socket
+      return;
+    }
+
+    AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+                           "::recvfrom() failed",
+                           errno);
+
+    // In case of UDP we can continue reading from the socket
+    // even if the current request fails. We notify the user
+    // so that he can do some logging/stats collection if he wants.
+    auto cob = readCallback_;
+    readCallback_ = nullptr;
+
+    cob->onReadError(ex);
+    updateRegistration();
+  }
+}
+
+bool AsyncUDPSocket::updateRegistration() noexcept {
+  uint16_t flags = NONE;
+
+  if (readCallback_) {
+    flags |= READ;
+  }
+
+  return registerHandler(flags | PERSIST);
+}
+
+} // Namespace
diff --git a/folly/io/async/AsyncUDPSocket.h b/folly/io/async/AsyncUDPSocket.h
new file mode 100644 (file)
index 0000000..1c5d3f0
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/IOBuf.h>
+#include <folly/ScopeGuard.h>
+#include <folly/io/async/AsyncSocketException.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/SocketAddress.h>
+
+#include <memory>
+
+namespace folly {
+
+/**
+ * UDP socket
+ */
+class AsyncUDPSocket : public EventHandler {
+ public:
+  enum class FDOwnership {
+    OWNS,
+    SHARED
+  };
+
+  class ReadCallback {
+   public:
+    /**
+     * Invoked when the socket becomes readable and we want buffer
+     * to write to.
+     *
+     * NOTE: From socket we will end up reading at most `len` bytes
+     *       and if there were more bytes in datagram, we will end up
+     *       dropping them.
+     */
+     virtual void getReadBuffer(void** buf, size_t* len) noexcept = 0;
+
+    /**
+     * Invoked when a new datagraom is available on the socket. `len`
+     * is the number of bytes read and `truncated` is true if we had
+     * to drop few bytes because of running out of buffer space.
+     */
+    virtual void onDataAvailable(const folly::SocketAddress& client,
+                                 size_t len,
+                                 bool truncated) noexcept = 0;
+
+    /**
+     * Invoked when there is an error reading from the socket.
+     *
+     * NOTE: Since UDP is connectionless, you can still read from the socket.
+     *       But you have to re-register readCallback yourself after
+     *       onReadError.
+     */
+    virtual void onReadError(const AsyncSocketException& ex)
+        noexcept = 0;
+
+    /**
+     * Invoked when socket is closed and a read callback is registered.
+     */
+    virtual void onReadClosed() noexcept = 0;
+
+    virtual ~ReadCallback() {}
+  };
+
+  /**
+   * Create a new UDP socket that will run in the
+   * given eventbase
+   */
+  explicit AsyncUDPSocket(EventBase* evb);
+  ~AsyncUDPSocket();
+
+  /**
+   * Returns the address server is listening on
+   */
+  const folly::SocketAddress& address() const {
+    CHECK_NE(-1, fd_) << "Server not yet bound to an address";
+    return localAddress_;
+  }
+
+  /**
+   * Bind the socket to the following address. If port is not
+   * set in the `address` an ephemeral port is chosen and you can
+   * use `address()` method above to get it after this method successfully
+   * returns.
+   */
+  void bind(const folly::SocketAddress& address);
+
+  /**
+   * Use an already bound file descriptor. You can either transfer ownership
+   * of this FD by using ownership = FDOwnership::OWNS or share it using
+   * FDOwnership::SHARED. In case FD is shared, it will not be `close`d in
+   * destructor.
+   */
+  void setFD(int fd, FDOwnership ownership);
+
+  /**
+   * Send the data in buffer to destination. Returns the return code from
+   * ::sendto.
+   */
+  ssize_t write(const folly::SocketAddress& address,
+                const std::unique_ptr<folly::IOBuf>& buf);
+
+  /**
+   * Start reading datagrams
+   */
+  void resumeRead(ReadCallback* cob);
+
+  /**
+   * Pause reading datagrams
+   */
+  void pauseRead();
+
+  /**
+   * Stop listening on the socket.
+   */
+  void close();
+
+  /**
+   * Get internal FD used by this socket
+   */
+  int getFD() const {
+    CHECK_NE(-1, fd_) << "Need to bind before getting FD out";
+    return fd_;
+  }
+ private:
+  AsyncUDPSocket(const AsyncUDPSocket&) = delete;
+  AsyncUDPSocket& operator=(const AsyncUDPSocket&) = delete;
+
+  // EventHandler
+  void handlerReady(uint16_t events) noexcept;
+
+  void handleRead() noexcept;
+  bool updateRegistration() noexcept;
+
+  EventBase* eventBase_;
+  folly::SocketAddress localAddress_;
+
+  int fd_;
+  FDOwnership ownership_;
+
+  // Temp space to receive client address
+  folly::SocketAddress clientAddress_;
+
+  // Non-null only when we are reading
+  ReadCallback* readCallback_;
+};
+
+} // Namespace
diff --git a/folly/io/async/test/AsyncUDPSocketTest.cpp b/folly/io/async/test/AsyncUDPSocketTest.cpp
new file mode 100644 (file)
index 0000000..feadc49
--- /dev/null
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/AsyncUDPSocket.h>
+#include <folly/io/async/AsyncUDPServerSocket.h>
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/SocketAddress.h>
+
+#include <boost/thread/barrier.hpp>
+
+#include <folly/io/IOBuf.h>
+
+#include <thread>
+
+#include <gtest/gtest.h>
+
+using folly::AsyncUDPSocket;
+using folly::AsyncUDPServerSocket;
+using folly::AsyncTimeout;
+using folly::EventBase;
+using folly::SocketAddress;
+using folly::IOBuf;
+
+class UDPAcceptor
+    : public AsyncUDPServerSocket::Callback {
+ public:
+  UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
+  }
+
+  void onListenStarted() noexcept {
+  }
+
+  void onListenStopped() noexcept {
+  }
+
+  void onDataAvailable(const folly::SocketAddress& client,
+                       std::unique_ptr<folly::IOBuf> data,
+                       bool truncated) noexcept {
+
+    lastClient_ = client;
+    lastMsg_ = data->moveToFbString().toStdString();
+
+    auto len = data->computeChainDataLength();
+    VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
+            << "(trun:" << truncated << ") from " << client.describe()
+            << " - " << lastMsg_;
+
+    sendPong();
+  }
+
+  void sendPong() noexcept {
+    try {
+      AsyncUDPSocket socket(evb_);
+      socket.bind(folly::SocketAddress("127.0.0.1", 0));
+      socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
+    } catch (const std::exception& ex) {
+      VLOG(4) << "Failed to send PONG " << ex.what();
+    }
+  }
+
+ private:
+  EventBase* const evb_{nullptr};
+  const int n_{-1};
+
+  folly::SocketAddress lastClient_;
+  std::string lastMsg_;
+};
+
+class UDPServer {
+ public:
+  UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
+      : evb_(evb), addr_(addr), evbs_(n) {
+  }
+
+  void start() {
+    CHECK(evb_->isInEventBaseThread());
+
+    socket_ = folly::make_unique<AsyncUDPServerSocket>(
+        evb_,
+        1500);
+
+    try {
+      socket_->bind(addr_);
+      VLOG(4) << "Server listening on " << socket_->address().describe();
+    } catch (const std::exception& ex) {
+      LOG(FATAL) << ex.what();
+    }
+
+    acceptors_.reserve(evbs_.size());
+    threads_.reserve(evbs_.size());
+
+    // Add numWorkers thread
+    int i = 0;
+    for (auto& evb: evbs_) {
+      acceptors_.emplace_back(&evb, i);
+
+      std::thread t([&] () {
+        evb.loopForever();
+      });
+
+      auto r = std::make_shared<boost::barrier>(2);
+      evb.runInEventBaseThread([r] () {
+        r->wait();
+      });
+      r->wait();
+
+      socket_->addListener(&evb, &acceptors_[i]);
+      threads_.emplace_back(std::move(t));
+      ++i;
+    }
+
+    socket_->listen();
+  }
+
+  folly::SocketAddress address() const {
+    return socket_->address();
+  }
+
+  void shutdown() {
+    CHECK(evb_->isInEventBaseThread());
+    socket_->close();
+    socket_.reset();
+
+    for (auto& evb: evbs_) {
+      evb.terminateLoopSoon();
+    }
+
+    for (auto& t: threads_) {
+      t.join();
+    }
+  }
+
+ private:
+  EventBase* const evb_{nullptr};
+  const folly::SocketAddress addr_;
+
+  std::unique_ptr<AsyncUDPServerSocket> socket_;
+  std::vector<std::thread> threads_;
+  std::vector<folly::EventBase> evbs_;
+  std::vector<UDPAcceptor> acceptors_;
+};
+
+class UDPClient
+    : private AsyncUDPSocket::ReadCallback,
+      private AsyncTimeout {
+ public:
+  explicit UDPClient(EventBase* evb)
+      : AsyncTimeout(evb),
+        evb_(evb) {
+  }
+
+  void start(const folly::SocketAddress& server, int n) {
+    CHECK(evb_->isInEventBaseThread());
+
+    server_ = server;
+    socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
+
+    try {
+      socket_->bind(folly::SocketAddress("127.0.0.1", 0));
+      VLOG(4) << "Client bound to " << socket_->address().describe();
+    } catch (const std::exception& ex) {
+      LOG(FATAL) << ex.what();
+    }
+
+    socket_->resumeRead(this);
+
+    n_ = n;
+
+    // Start playing ping pong
+    sendPing();
+  }
+
+  void shutdown() {
+    CHECK(evb_->isInEventBaseThread());
+    socket_->pauseRead();
+    socket_->close();
+    socket_.reset();
+    evb_->terminateLoopSoon();
+  }
+
+  void sendPing() {
+    if (n_ == 0) {
+      shutdown();
+      return;
+    }
+
+    --n_;
+    scheduleTimeout(5);
+    socket_->write(
+        server_,
+        folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
+  }
+
+  void getReadBuffer(void** buf, size_t* len) noexcept {
+    *buf = buf_;
+    *len = 1024;
+  }
+
+  void onDataAvailable(const folly::SocketAddress& client,
+                       size_t len,
+                       bool truncated) noexcept {
+    VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
+              << client.describe() << " - " << std::string(buf_, len);
+    VLOG(4) << n_ << " left";
+
+    ++pongRecvd_;
+
+    sendPing();
+  }
+
+  void onReadError(const folly::AsyncSocketException& ex) noexcept {
+    VLOG(4) << ex.what();
+
+    // Start listening for next PONG
+    socket_->resumeRead(this);
+  }
+
+  void onReadClosed() noexcept {
+    CHECK(false) << "We unregister reads before closing";
+  }
+
+  void timeoutExpired() noexcept {
+    VLOG(4) << "Timeout expired";
+    sendPing();
+  }
+
+  int pongRecvd() const {
+    return pongRecvd_;
+  }
+
+ private:
+  EventBase* const evb_{nullptr};
+
+  folly::SocketAddress server_;
+  std::unique_ptr<AsyncUDPSocket> socket_;
+
+  int pongRecvd_{0};
+
+  int n_{0};
+  char buf_[1024];
+};
+
+TEST(AsyncSocketTest, PingPong) {
+  folly::EventBase sevb;
+  UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
+  boost::barrier barrier(2);
+
+  // Start event loop in a separate thread
+  auto serverThread = std::thread([&sevb] () {
+    sevb.loopForever();
+  });
+
+  // Wait for event loop to start
+  sevb.runInEventBaseThread([&] () { barrier.wait(); });
+  barrier.wait();
+
+  // Start the server
+  sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); });
+  barrier.wait();
+
+  folly::EventBase cevb;
+  UDPClient client(&cevb);
+
+  // Start event loop in a separate thread
+  auto clientThread = std::thread([&cevb] () {
+    cevb.loopForever();
+  });
+
+  // Wait for event loop to start
+  cevb.runInEventBaseThread([&] () { barrier.wait(); });
+  barrier.wait();
+
+  // Send ping
+  cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
+
+  // Wait for client to finish
+  clientThread.join();
+
+  // Check that some PING/PONGS were exchanged. Out of 1000 transactions
+  // at least 1 should succeed
+  CHECK_GT(client.pongRecvd(), 0);
+
+  // Shutdown server
+  sevb.runInEventBaseThread([&] () {
+    server.shutdown();
+    sevb.terminateLoopSoon();
+  });
+
+  // Wait for server thread to joib
+  serverThread.join();
+}