2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/io/async/DelayedDestruction.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/NotificationQueue.h>
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/async/AsyncSocketBase.h>
25 #include <folly/io/ShutdownSocketSet.h>
26 #include <folly/SocketAddress.h>
32 #include <sys/socket.h>
35 // Due to the way kernel headers are included, this may or may not be defined.
36 // Number pulled from 3.10 kernel headers.
38 #define SO_REUSEPORT 15
44 * A listening socket that asynchronously informs a callback whenever a new
45 * connection has been accepted.
47 * Unlike most async interfaces that always invoke their callback in the same
48 * EventBase thread, AsyncServerSocket is unusual in that it can distribute
49 * the callbacks across multiple EventBase threads.
51 * This supports a common use case for network servers to distribute incoming
52 * connections across a number of EventBase threads. (Servers typically run
53 * with one EventBase thread per CPU.)
55 * Despite being able to invoke callbacks in multiple EventBase threads,
56 * AsyncServerSocket still has one "primary" EventBase. Operations that
57 * modify the AsyncServerSocket state may only be performed from the primary
60 class AsyncServerSocket : public DelayedDestruction
61 , public AsyncSocketBase {
63 typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
64 // Disallow copying and moving (deleting the move constructor also suppresses the implicit copy operations).
65 AsyncServerSocket(AsyncServerSocket&&) = delete;
67 class AcceptCallback {
69 virtual ~AcceptCallback() = default;
72 * connectionAccepted() is called whenever a new client connection is
75 * The AcceptCallback will remain installed after connectionAccepted()
78 * @param fd The newly accepted client socket. The AcceptCallback
79 * assumes ownership of this socket, and is responsible
80 * for closing it when done. The newly accepted file
81 * descriptor will have already been put into
83 * @param clientAddr A reference to a SocketAddress struct containing the
84 * client's address. This struct is only guaranteed to
85 * remain valid until connectionAccepted() returns.
87 virtual void connectionAccepted(int fd,
88 const SocketAddress& clientAddr)
92 * acceptError() is called if an error occurs while accepting.
94 * The AcceptCallback will remain installed even after an accept error,
95 * as the errors are typically somewhat transient, such as being out of
96 * file descriptors. The server socket must be explicitly stopped if you
97 * wish to stop accepting after an error.
99 * @param ex An exception representing the error.
101 virtual void acceptError(const std::exception& ex) noexcept = 0;
104 * acceptStarted() will be called in the callback's EventBase thread
105 * after this callback has been added to the AsyncServerSocket.
107 * acceptStarted() will be called before any calls to connectionAccepted()
108 * or acceptError() are made on this callback.
110 * acceptStarted() makes it easier for callbacks to perform initialization
111 * inside the callback thread. (The call to addAcceptCallback() must
112 * always be made from the AsyncServerSocket's primary EventBase thread.
113 * acceptStarted() provides a hook that will always be invoked in the
114 * callback's thread.)
116 * Note that the call to acceptStarted() is made once the callback is
117 * added, regardless of whether or not the AsyncServerSocket is actually
118 * accepting at the moment. acceptStarted() will be called even if the
119 * AsyncServerSocket is paused when the callback is added (including if
120 * the initial call to startAccepting() on the AsyncServerSocket has not
123 virtual void acceptStarted() noexcept {}
126 * acceptStopped() will be called when this AcceptCallback is removed from
127 * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
128 * whichever occurs first.
130 * No more calls to connectionAccepted() or acceptError() will be made
131 * after acceptStopped() is invoked.
133 virtual void acceptStopped() noexcept {}
136 static const uint32_t kDefaultMaxAcceptAtOnce = 30; // NOTE(review): presumably the initial value of maxAcceptAtOnce_ (see setMaxAcceptAtOnce()) - confirm in the constructor
137 static const uint32_t kDefaultCallbackAcceptAtOnce = 5; // default for the maxAtOnce argument of addAcceptCallback()
138 static const uint32_t kDefaultMaxMessagesInQueue = 0; // NOTE(review): presumably the initial value of maxNumMsgsInQueue_ - confirm in the constructor
140 * Create a new AsyncServerSocket with the specified EventBase.
142 * @param eventBase The EventBase to use for driving the asynchronous I/O.
143 * If this parameter is nullptr, attachEventBase() must be
144 * called before this socket can begin accepting
147 explicit AsyncServerSocket(EventBase* eventBase = nullptr);
150 * Helper function to create a shared_ptr<AsyncServerSocket>.
152 * This passes in the correct destructor object, since AsyncServerSocket's
153 * destructor is protected and cannot be invoked directly.
155 static std::shared_ptr<AsyncServerSocket>
156 newSocket(EventBase* evb = nullptr) {
157 return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
161 void setShutdownSocketSet(ShutdownSocketSet* newSS);
164 * Destroy the socket.
166 * AsyncServerSocket::destroy() must be called to destroy the socket.
167 * The normal destructor is protected, and should not be invoked directly.
168 * This prevents callers from deleting an AsyncServerSocket while it is
169 * invoking a callback.
171 * destroy() must be invoked from the socket's primary EventBase thread.
173 * If there are AcceptCallbacks still installed when destroy() is called,
174 * acceptStopped() will be called on these callbacks to notify them that
175 * accepting has stopped. Accept callbacks being driven by other EventBase
176 * threads may continue to receive new accept callbacks for a brief period of
177 * time after destroy() returns. They will not receive any more callback
178 * invocations once acceptStopped() is invoked.
180 virtual void destroy();
183 * Attach this AsyncServerSocket to its primary EventBase.
185 * This may only be called if the AsyncServerSocket is not already attached
186 * to an EventBase. The AsyncServerSocket must be attached to an EventBase
187 * before it can begin accepting connections.
189 void attachEventBase(EventBase *eventBase);
192 * Detach the AsyncServerSocket from its primary EventBase.
194 * detachEventBase() may only be called if the AsyncServerSocket is not
195 * currently accepting connections.
197 void detachEventBase();
200 * Get the EventBase used by this socket.
202 EventBase* getEventBase() const {
207 * Create an AsyncServerSocket from an existing socket file descriptor.
209 * useExistingSocket() will cause the AsyncServerSocket to take ownership of
210 * the specified file descriptor, and use it to listen for new connections.
211 * The AsyncServerSocket will close the file descriptor when it is
214 * useExistingSocket() must be called before bind() or listen().
216 * The supplied file descriptor will automatically be put into non-blocking
217 * mode. The caller may have already directly called bind() and possibly
218 * listen on the file descriptor. If so the caller should skip calling the
219 * corresponding AsyncServerSocket::bind() and listen() methods.
221 * On error a TTransportException will be thrown and the caller will retain
222 * ownership of the file descriptor.
224 void useExistingSocket(int fd);
225 void useExistingSockets(const std::vector<int>& fds);
228 * Return the underlying file descriptors (one per socket in sockets_)
230 std::vector<int> getSockets() const {
231 std::vector<int> sockets;
232 for (auto& handler : sockets_) {
233 sockets.push_back(handler.socket_);
239 * Backwards compatible getSocket, warns if > 1 socket
241 int getSocket() const {
242 if (sockets_.size() > 1) {
243 VLOG(2) << "Warning: getSocket can return multiple fds, " <<
244 "but getSockets was not called, so only returning the first";
246 if (sockets_.size() == 0) {
249 return sockets_[0].socket_;
254 * Bind to the specified address.
256 * This must be called from the primary EventBase thread.
258 * Throws TTransportException on error.
260 virtual void bind(const SocketAddress& address);
263 * Bind to the specified port for the specified addresses.
265 * This must be called from the primary EventBase thread.
267 * Throws TTransportException on error.
270 const std::vector<IPAddress>& ipAddresses,
274 * Bind to the specified port.
276 * This must be called from the primary EventBase thread.
278 * Throws TTransportException on error.
280 virtual void bind(uint16_t port);
283 * Get the local address to which the socket is bound.
285 * Throws TTransportException on error.
287 void getAddress(SocketAddress* addressReturn) const;
290 * Get all the local addresses to which the socket is bound.
292 * Throws TTransportException on error.
294 std::vector<SocketAddress> getAddresses() const;
297 * Begin listening for connections.
299 * This calls ::listen() with the specified backlog.
301 * Once listen() is invoked the socket will actually be open so that remote
302 * clients may establish connections. (Clients that attempt to connect
303 * before listen() is called will receive a connection refused error.)
305 * At least one callback must be set and startAccepting() must be called to
306 * actually begin notifying the accept callbacks of newly accepted
307 * connections. The backlog parameter controls how many connections the
308 * kernel will accept and buffer internally while the accept callbacks are
309 * paused (or if accepting is enabled but the callbacks cannot keep up).
311 * bind() must be called before calling listen().
312 * listen() must be called from the primary EventBase thread.
314 * Throws TTransportException on error.
316 virtual void listen(int backlog);
319 * Add an AcceptCallback.
321 * When a new socket is accepted, one of the AcceptCallbacks will be invoked
322 * with the new socket. The AcceptCallbacks are invoked in a round-robin
323 * fashion. This allows the accepted sockets to be distributed among a pool of
324 * threads, each running its own EventBase object. This is a common model,
325 * since most asynchronous-style servers typically run one EventBase thread
328 * The EventBase object associated with each AcceptCallback must be running
329 * its loop. If the EventBase loop is not running, sockets will still be
330 * scheduled for the callback, but the callback cannot actually get invoked
331 * until the loop runs.
333 * This method must be invoked from the AsyncServerSocket's primary
336 * Note that startAccepting() must be called on the AsyncServerSocket to
337 * cause it to actually start accepting sockets once callbacks have been
340 * @param callback The callback to invoke.
341 * @param eventBase The EventBase to use to invoke the callback. This
342 * parameter may be nullptr, in which case the callback will be invoked in
343 * the AsyncServerSocket's primary EventBase.
344 * @param maxAtOnce The maximum number of connections to accept in this
345 * callback on a single iteration of the event base loop.
346 * This only takes effect when eventBase is non-nullptr.
347 * When using a nullptr eventBase for the callback, the
348 * setMaxAcceptAtOnce() method controls how many
349 * connections the main event base will accept at once.
351 virtual void addAcceptCallback(
352 AcceptCallback *callback,
353 EventBase *eventBase,
354 uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
357 * Remove an AcceptCallback.
359 * This allows a single AcceptCallback to be removed from the round-robin
362 * This method must be invoked from the AsyncServerSocket's primary
363 * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the
364 * operation in the correct EventBase if your code is not in the server
365 * socket's primary EventBase.
367 * Given that the accept callback is being driven by a different EventBase,
368 * the AcceptCallback may continue to be invoked for a short period of time
369 * after removeAcceptCallback() returns in this thread. Once the other
370 * EventBase thread receives the notification to stop, it will call
371 * acceptStopped() on the callback to inform it that it is fully stopped and
372 * will not receive any new sockets.
374 * If the last accept callback is removed while the socket is accepting,
375 * the socket will implicitly pause accepting. If a callback is later added,
376 * it will resume accepting immediately, without requiring startAccepting()
379 * @param callback The callback to uninstall.
380 * @param eventBase The EventBase associated with this callback. This must
381 * be the same EventBase that was used when the callback was installed
382 * with addAcceptCallback().
384 void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
387 * Begin accepting connections on this socket.
389 * bind() and listen() must be called before calling startAccepting().
391 * When an AsyncServerSocket is initially created, it will not begin
392 * accepting connections until at least one callback has been added and
393 * startAccepting() has been called. startAccepting() can also be used to
394 * resume accepting connections after a call to pauseAccepting().
396 * If startAccepting() is called when there are no accept callbacks
397 * installed, the socket will not actually begin accepting until an accept
400 * This method may only be called from the primary EventBase thread.
402 virtual void startAccepting();
405 * Pause accepting connections.
407 * startAccepting() may be called to resume accepting.
409 * This method may only be called from the primary EventBase thread.
410 * If there are AcceptCallbacks being driven by other EventBase threads they
411 * may continue to receive callbacks for a short period of time after
412 * pauseAccepting() returns.
414 * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
415 * called on the AcceptCallback objects simply due to a temporary pause. If
416 * the server socket is later destroyed while paused, acceptStopped() will be
417 * called on all of the installed AcceptCallbacks.
419 void pauseAccepting();
422 * Shutdown the listen socket and notify all callbacks that accept has
423 * stopped, but don't close the socket. This invokes shutdown(2) with the
424 * supplied argument. Passing -1 will close the socket now. Otherwise, the
425 * close will be delayed until this object is destroyed.
427 * Only use this if you have reason to pass special flags to shutdown.
428 * Otherwise just destroy the socket.
430 * This method has no effect when a ShutdownSocketSet option is used.
432 * Returns the result of shutdown on sockets_[n-1]
434 int stopAccepting(int shutdownFlags = -1);
437 * Get the maximum number of connections that will be accepted each time
438 * around the event loop.
440 uint32_t getMaxAcceptAtOnce() const {
441 return maxAcceptAtOnce_;
445 * Set the maximum number of connections that will be accepted each time
446 * around the event loop.
448 * This provides a very coarse-grained way of controlling how fast the
449 * AsyncServerSocket will accept connections. If you find that when your
450 * server is overloaded AsyncServerSocket accepts connections more quickly
451 * than your code can process them, you can try lowering this number so that
452 * fewer connections will be accepted each event loop iteration.
454 * For more explicit control over the accept rate, you can also use
455 * pauseAccepting() to temporarily pause accepting when your server is
456 * overloaded, and then use startAccepting() later to resume accepting.
458 void setMaxAcceptAtOnce(uint32_t numConns) {
459 maxAcceptAtOnce_ = numConns;
463 * Get the maximum number of unprocessed messages which a NotificationQueue
466 uint32_t getMaxNumMessagesInQueue() const {
467 return maxNumMsgsInQueue_;
471 * Set the maximum number of unprocessed messages in NotificationQueue.
472 * No new message will be sent to that NotificationQueue if there are more
473 * than such number of unprocessed messages in that queue.
475 * Only works if called before addAcceptCallback.
477 void setMaxNumMessagesInQueue(uint32_t num) {
478 maxNumMsgsInQueue_ = num;
482 * Get the speed of adjusting connection accept rate.
484 double getAcceptRateAdjustSpeed() const {
485 return acceptRateAdjustSpeed_;
489 * Set the speed of adjusting connection accept rate.
491 void setAcceptRateAdjustSpeed(double speed) {
492 acceptRateAdjustSpeed_ = speed;
496 * Get the number of connections dropped by the AsyncServerSocket
498 uint64_t getNumDroppedConnections() const {
499 return numDroppedConnections_;
503 * Set whether or not SO_KEEPALIVE should be enabled on the server socket
504 * (and thus on all subsequently-accepted connections). By default, keepalive
507 * Note that TCP keepalive usually only kicks in after the connection has
508 * been idle for several hours. Applications should almost always have their
509 * own, shorter idle timeout.
511 void setKeepAliveEnabled(bool enabled) {
512 keepAliveEnabled_ = enabled;
514 for (auto& handler : sockets_) {
515 if (handler.socket_ < 0) {
519 int val = (enabled) ? 1 : 0;
520 if (setsockopt(handler.socket_, SOL_SOCKET,
521 SO_KEEPALIVE, &val, sizeof(val)) != 0) {
522 LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
529 * Get whether or not SO_KEEPALIVE is enabled on the server socket.
531 bool getKeepAliveEnabled() const {
532 return keepAliveEnabled_;
536 * Set whether or not SO_REUSEPORT should be enabled on the server socket,
537 * allowing multiple binds to the same port
539 void setReusePortEnabled(bool enabled) {
540 reusePortEnabled_ = enabled;
542 for (auto& handler : sockets_) {
543 if (handler.socket_ < 0) {
547 int val = (enabled) ? 1 : 0;
548 if (setsockopt(handler.socket_, SOL_SOCKET,
549 SO_REUSEPORT, &val, sizeof(val)) != 0) {
551 "failed to set SO_REUSEPORT on async server socket " << errno;
552 folly::throwSystemError(errno,
553 "failed to bind to async server socket");
559 * Get whether or not SO_REUSEPORT is enabled on the server socket.
561 bool getReusePortEnabled_() const {
562 return reusePortEnabled_;
566 * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
567 * default, this is enabled
569 void setCloseOnExec(bool closeOnExec) {
570 closeOnExec_ = closeOnExec;
574 * Get whether or not FD_CLOEXEC is enabled on the server socket.
576 bool getCloseOnExec() const {
581 * Get whether or not the socket is accepting new connections
583 bool getAccepting() const {
589 * Protected destructor.
591 * Invoke destroy() instead to destroy the AsyncServerSocket.
593 virtual ~AsyncServerSocket();
596 enum class MessageType {
601 struct QueueMessage {
605 SocketAddress address;
610 * A class to receive notifications to invoke AcceptCallback objects
611 * in other EventBase threads.
613 * A RemoteAcceptor object is created for each AcceptCallback that
614 * is installed in a separate EventBase thread. The RemoteAcceptor
615 * receives notification of new sockets via a NotificationQueue,
616 * and then invokes the AcceptCallback.
619 : private NotificationQueue<QueueMessage>::Consumer {
621 explicit RemoteAcceptor(AcceptCallback *callback)
622 : callback_(callback) {}
624 ~RemoteAcceptor() = default;
626 void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
627 void stop(EventBase* eventBase, AcceptCallback* callback);
629 virtual void messageAvailable(QueueMessage&& message);
631 NotificationQueue<QueueMessage>* getQueue() {
636 AcceptCallback *callback_;
638 NotificationQueue<QueueMessage> queue_;
642 * A struct to keep track of the callbacks associated with this server
645 struct CallbackInfo {
646 CallbackInfo(AcceptCallback *cb, EventBase *evb)
651 AcceptCallback *callback;
652 EventBase *eventBase;
654 RemoteAcceptor* consumer;
657 class BackoffTimeout;
659 virtual void handlerReady(
660 uint16_t events, int socket, sa_family_t family) noexcept;
662 int createSocket(int family);
663 void setupSocket(int fd);
664 void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket);
665 void dispatchSocket(int socket, SocketAddress&& address);
666 void dispatchError(const char *msg, int errnoValue);
668 void backoffTimeoutExpired();
670 CallbackInfo* nextCallback() {
671 CallbackInfo* info = &callbacks_[callbackIndex_];
674 if (callbackIndex_ >= callbacks_.size()) {
681 struct ServerEventHandler : public EventHandler {
682 ServerEventHandler(EventBase* eventBase, int socket,
683 AsyncServerSocket* parent,
684 sa_family_t addressFamily)
685 : EventHandler(eventBase, socket)
686 , eventBase_(eventBase)
689 , addressFamily_(addressFamily) {}
691 ServerEventHandler(const ServerEventHandler& other)
692 : EventHandler(other.eventBase_, other.socket_)
693 , eventBase_(other.eventBase_)
694 , socket_(other.socket_)
695 , parent_(other.parent_)
696 , addressFamily_(other.addressFamily_) {}
698 ServerEventHandler& operator=(
699 const ServerEventHandler& other) {
700 if (this != &other) {
701 eventBase_ = other.eventBase_;
702 socket_ = other.socket_;
703 parent_ = other.parent_;
704 addressFamily_ = other.addressFamily_;
707 attachEventBase(other.eventBase_);
708 changeHandlerFD(other.socket_);
713 // Inherited from EventHandler
714 virtual void handlerReady(uint16_t events) noexcept {
715 parent_->handlerReady(events, socket_, addressFamily_);
718 EventBase* eventBase_;
720 AsyncServerSocket* parent_;
721 sa_family_t addressFamily_;
724 EventBase *eventBase_; // NOTE(review): presumably the primary EventBase managed by attachEventBase()/detachEventBase() - confirm
725 std::vector<ServerEventHandler> sockets_; // one handler per socket fd; iterated by getSockets(), setKeepAliveEnabled() and setReusePortEnabled()
726 std::vector<int> pendingCloseSockets_; // NOTE(review): presumably fds whose close is deferred after stopAccepting() - confirm
728 uint32_t maxAcceptAtOnce_; // accessed through getMaxAcceptAtOnce() / setMaxAcceptAtOnce()
729 uint32_t maxNumMsgsInQueue_; // accessed through getMaxNumMessagesInQueue() / setMaxNumMessagesInQueue()
730 double acceptRateAdjustSpeed_; // 0 to disable auto adjust; accessed through get/setAcceptRateAdjustSpeed()
732 std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_; // steady-clock time point; NOTE(review): identifier is spelled "Accep" (missing 't')
733 uint64_t numDroppedConnections_; // returned by getNumDroppedConnections()
734 uint32_t callbackIndex_; // index into callbacks_ read by nextCallback()
735 BackoffTimeout *backoffTimeout_; // NOTE(review): raw pointer; ownership is not visible in this header section - confirm
736 std::vector<CallbackInfo> callbacks_; // callback entries selected by nextCallback() via callbackIndex_
737 bool keepAliveEnabled_; // last value passed to setKeepAliveEnabled(); returned by getKeepAliveEnabled()
738 bool reusePortEnabled_{false}; // last value passed to setReusePortEnabled(); initialized to false
740 ShutdownSocketSet* shutdownSocketSet_; // NOTE(review): presumably assigned by setShutdownSocketSet() - confirm