From 0703c0070b211897f6515656f2e80884ec52b8ee Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Wed, 17 Sep 2014 10:20:31 -0700 Subject: [PATCH] move connectionmanager/managed connection to wangle Summary: Moar code sharing between proxygen/thrift. Code must be moved to a common library: The only one we currently have is folly, but if someone wants to argue for a separate lib, that's cool too Test Plan: fbconfig -r proxygen; fbmake dev Reviewed By: dcsommer@fb.com Subscribers: doug, fugalh, bmatheny, njormrod FB internal diff: D1519919 Tasks: 5002343 --- .../experimental/wangle/ConnectionManager.cpp | 175 ++++++++++++++++ folly/experimental/wangle/ConnectionManager.h | 191 ++++++++++++++++++ .../experimental/wangle/ManagedConnection.cpp | 57 ++++++ folly/experimental/wangle/ManagedConnection.h | 115 +++++++++++ 4 files changed, 538 insertions(+) create mode 100644 folly/experimental/wangle/ConnectionManager.cpp create mode 100644 folly/experimental/wangle/ConnectionManager.h create mode 100644 folly/experimental/wangle/ManagedConnection.cpp create mode 100644 folly/experimental/wangle/ManagedConnection.h diff --git a/folly/experimental/wangle/ConnectionManager.cpp b/folly/experimental/wangle/ConnectionManager.cpp new file mode 100644 index 00000000..8489fa08 --- /dev/null +++ b/folly/experimental/wangle/ConnectionManager.cpp @@ -0,0 +1,175 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +using folly::HHWheelTimer; +using std::chrono::milliseconds; + +namespace folly { namespace wangle { + +ConnectionManager::ConnectionManager(EventBase* eventBase, + milliseconds timeout, Callback* callback) + : connTimeouts_(new HHWheelTimer(eventBase)), + callback_(callback), + eventBase_(eventBase), + idleIterator_(conns_.end()), + idleLoopCallback_(this), + timeout_(timeout) { + +} + +void +ConnectionManager::addConnection(ManagedConnection* connection, + bool timeout) { + CHECK_NOTNULL(connection); + ConnectionManager* oldMgr = connection->getConnectionManager(); + if (oldMgr != this) { + if (oldMgr) { + // 'connection' was being previously managed in a different thread. + // We must remove it from that manager before adding it to this one. + oldMgr->removeConnection(connection); + } + conns_.push_back(*connection); + connection->setConnectionManager(this); + if (callback_) { + callback_->onConnectionAdded(*this); + } + } + if (timeout) { + scheduleTimeout(connection); + } +} + +void +ConnectionManager::scheduleTimeout(ManagedConnection* connection) { + if (timeout_ > std::chrono::milliseconds(0)) { + connTimeouts_->scheduleTimeout(connection, timeout_); + } +} + +void ConnectionManager::scheduleTimeout( + folly::HHWheelTimer::Callback* callback, + std::chrono::milliseconds timeout) { + connTimeouts_->scheduleTimeout(callback, timeout); +} + +void +ConnectionManager::removeConnection(ManagedConnection* connection) { + if (connection->getConnectionManager() == this) { + connection->cancelTimeout(); + connection->setConnectionManager(nullptr); + + // Un-link the connection from our list, being careful to keep the iterator + // that we're using for idle shedding valid + auto it = conns_.iterator_to(*connection); + if (it == idleIterator_) { + ++idleIterator_; + } + conns_.erase(it); + + if (callback_) { + callback_->onConnectionRemoved(*this); + if (getNumConnections() == 0) { + callback_->onEmpty(*this); + } + } + } +} + +void +ConnectionManager::initiateGracefulShutdown( + std::chrono::milliseconds idleGrace) { + if (idleGrace.count() > 0) { + idleLoopCallback_.scheduleTimeout(idleGrace); + VLOG(3) << "Scheduling idle grace period of " << idleGrace.count() << "ms"; + } else { + action_ = ShutdownAction::DRAIN2; + VLOG(3) << "proceeding directly to closing idle connections"; + } + drainAllConnections(); +} + +void +ConnectionManager::drainAllConnections() { + DestructorGuard g(this); + size_t numCleared = 0; + size_t numKept = 0; + + auto it = idleIterator_ == conns_.end() ? + conns_.begin() : idleIterator_; + + while (it != conns_.end() && (numKept + numCleared) < 64) { + ManagedConnection& conn = *it++; + if (action_ == ShutdownAction::DRAIN1) { + conn.notifyPendingShutdown(); + } else { + // Second time around: close idle sessions. If they aren't idle yet, + // have them close when they are idle + if (conn.isBusy()) { + numKept++; + } else { + numCleared++; + } + conn.closeWhenIdle(); + } + } + + if (action_ == ShutdownAction::DRAIN2) { + VLOG(2) << "Idle connections cleared: " << numCleared << + ", busy conns kept: " << numKept; + } + if (it != conns_.end()) { + idleIterator_ = it; + eventBase_->runInLoop(&idleLoopCallback_); + } else { + action_ = ShutdownAction::DRAIN2; + } +} + +void +ConnectionManager::dropAllConnections() { + DestructorGuard g(this); + + // Iterate through our connection list, and drop each connection. + VLOG(3) << "connections to drop: " << conns_.size(); + idleLoopCallback_.cancelTimeout(); + unsigned i = 0; + while (!conns_.empty()) { + ManagedConnection& conn = conns_.front(); + conns_.pop_front(); + conn.cancelTimeout(); + conn.setConnectionManager(nullptr); + // For debugging purposes, dump information about the first few + // connections. + static const unsigned MAX_CONNS_TO_DUMP = 2; + if (++i <= MAX_CONNS_TO_DUMP) { + conn.dumpConnectionState(3); + } + conn.dropConnection(); + } + idleIterator_ = conns_.end(); + idleLoopCallback_.cancelLoopCallback(); + + if (callback_) { + callback_->onEmpty(*this); + } +} + +}} // folly::wangle diff --git a/folly/experimental/wangle/ConnectionManager.h b/folly/experimental/wangle/ConnectionManager.h new file mode 100644 index 00000000..30117e43 --- /dev/null +++ b/folly/experimental/wangle/ConnectionManager.h @@ -0,0 +1,191 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +namespace folly { namespace wangle { + +/** + * A ConnectionManager keeps track of ManagedConnections. + */ +class ConnectionManager: public folly::DelayedDestruction { + public: + + /** + * Interface for an optional observer that's notified about + * various events in a ConnectionManager + */ + class Callback { + public: + virtual ~Callback() {} + + /** + * Invoked when the number of connections managed by the + * ConnectionManager changes from nonzero to zero. + */ + virtual void onEmpty(const ConnectionManager& cm) = 0; + + /** + * Invoked when a connection is added to the ConnectionManager. + */ + virtual void onConnectionAdded(const ConnectionManager& cm) = 0; + + /** + * Invoked when a connection is removed from the ConnectionManager. + */ + virtual void onConnectionRemoved(const ConnectionManager& cm) = 0; + }; + + typedef std::unique_ptr UniquePtr; + + /** + * Returns a new instance of ConnectionManager wrapped in a unique_ptr + */ + template + static UniquePtr makeUnique(Args&&... args) { + return folly::make_unique( + std::forward(args)...); + } + + /** + * Constructor not to be used by itself. + */ + ConnectionManager(folly::EventBase* eventBase, + std::chrono::milliseconds timeout, + Callback* callback = nullptr); + + /** + * Add a connection to the set of connections managed by this + * ConnectionManager. + * + * @param connection The connection to add. + * @param timeout Whether to immediately register this connection + * for an idle timeout callback. + */ + void addConnection(ManagedConnection* connection, + bool timeout = false); + + /** + * Schedule a timeout callback for a connection. + */ + void scheduleTimeout(ManagedConnection* connection); + + /* + * Schedule a callback on the wheel timer + */ + void scheduleTimeout(folly::HHWheelTimer::Callback* callback, + std::chrono::milliseconds timeout); + + /** + * Remove a connection from this ConnectionManager and, if + * applicable, cancel the pending timeout callback that the + * ConnectionManager has scheduled for the connection. + * + * @note This method does NOT destroy the connection. + */ + void removeConnection(ManagedConnection* connection); + + /* Begin gracefully shutting down connections in this ConnectionManager. + * Notify all connections of pending shutdown, and after idleGrace, + * begin closing idle connections. + */ + void initiateGracefulShutdown(std::chrono::milliseconds idleGrace); + + /** + * Destroy all connections Managed by this ConnectionManager, even + * the ones that are busy. + */ + void dropAllConnections(); + + size_t getNumConnections() const { return conns_.size(); } + + private: + class CloseIdleConnsCallback : + public folly::EventBase::LoopCallback, + public folly::AsyncTimeout { + public: + explicit CloseIdleConnsCallback(ConnectionManager* manager) + : folly::AsyncTimeout(manager->eventBase_), + manager_(manager) {} + + void runLoopCallback() noexcept override { + VLOG(3) << "Draining more conns from loop callback"; + manager_->drainAllConnections(); + } + + void timeoutExpired() noexcept override { + VLOG(3) << "Idle grace expired"; + manager_->drainAllConnections(); + } + + private: + ConnectionManager* manager_; + }; + + enum class ShutdownAction : uint8_t { + /** + * Drain part 1: inform remote that you will soon reject new requests. + */ + DRAIN1 = 0, + /** + * Drain part 2: start rejecting new requests. + */ + DRAIN2 = 1, + }; + + ~ConnectionManager() {} + + ConnectionManager(const ConnectionManager&) = delete; + ConnectionManager& operator=(ConnectionManager&) = delete; + + /** + * Destroy all connections managed by this ConnectionManager that + * are currently idle, as determined by a call to each ManagedConnection's + * isBusy() method. + */ + void drainAllConnections(); + + /** All connections */ + folly::CountedIntrusiveList< + ManagedConnection,&ManagedConnection::listHook_> conns_; + + /** Connections that currently are registered for timeouts */ + folly::HHWheelTimer::UniquePtr connTimeouts_; + + /** Optional callback to notify of state changes */ + Callback* callback_; + + /** Event base in which we run */ + folly::EventBase* eventBase_; + + /** Iterator to the next connection to shed; used by drainAllConnections() */ + folly::CountedIntrusiveList< + ManagedConnection,&ManagedConnection::listHook_>::iterator idleIterator_; + CloseIdleConnsCallback idleLoopCallback_; + ShutdownAction action_{ShutdownAction::DRAIN1}; + std::chrono::milliseconds timeout_; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/ManagedConnection.cpp b/folly/experimental/wangle/ManagedConnection.cpp new file mode 100644 index 00000000..66db04f2 --- /dev/null +++ b/folly/experimental/wangle/ManagedConnection.cpp @@ -0,0 +1,57 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +namespace folly { namespace wangle { + +ManagedConnection::ManagedConnection() + : connectionManager_(nullptr) { +} + +ManagedConnection::~ManagedConnection() { + if (connectionManager_) { + connectionManager_->removeConnection(this); + } +} + +void +ManagedConnection::resetTimeout() { + if (connectionManager_) { + connectionManager_->scheduleTimeout(this); + } +} + +void +ManagedConnection::scheduleTimeout( + folly::HHWheelTimer::Callback* callback, + std::chrono::milliseconds timeout) { + if (connectionManager_) { + connectionManager_->scheduleTimeout(callback, timeout); + } +} + +////////////////////// Globals ///////////////////// + +std::ostream& +operator<<(std::ostream& os, const ManagedConnection& conn) { + conn.describe(os); + return os; +} + +}} // folly::wangle diff --git a/folly/experimental/wangle/ManagedConnection.h b/folly/experimental/wangle/ManagedConnection.h new file mode 100644 index 00000000..50e7c057 --- /dev/null +++ b/folly/experimental/wangle/ManagedConnection.h @@ -0,0 +1,115 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace folly { namespace wangle { + +class ConnectionManager; + +/** + * Interface describing a connection that can be managed by a + * container such as an Acceptor. + */ +class ManagedConnection: + public folly::HHWheelTimer::Callback, + public folly::DelayedDestruction { + public: + + ManagedConnection(); + + // HHWheelTimer::Callback API (left for subclasses to implement). + virtual void timeoutExpired() noexcept = 0; + + /** + * Print a human-readable description of the connection. + * @param os Destination stream. + */ + virtual void describe(std::ostream& os) const = 0; + + /** + * Check whether the connection has any requests outstanding. + */ + virtual bool isBusy() const = 0; + + /** + * Notify the connection that a shutdown is pending. This method will be + * called at the beginning of graceful shutdown. + */ + virtual void notifyPendingShutdown() = 0; + + /** + * Instruct the connection that it should shutdown as soon as it is + * safe. This is called after notifyPendingShutdown(). + */ + virtual void closeWhenIdle() = 0; + + /** + * Forcibly drop a connection. + * + * If a request is in progress, this should cause the connection to be + * closed with a reset. + */ + virtual void dropConnection() = 0; + + /** + * Dump the state of the connection to the log + */ + virtual void dumpConnectionState(uint8_t loglevel) = 0; + + /** + * If the connection has a connection manager, reset the timeout + * countdown. + * @note If the connection manager doesn't have the connection scheduled + * for a timeout already, this method will schedule one. If the + * connection manager does have the connection connection scheduled + * for a timeout, this method will push back the timeout to N msec + * from now, where N is the connection manager's timer interval. + */ + virtual void resetTimeout(); + + // Schedule an arbitrary timeout on the HHWheelTimer + virtual void scheduleTimeout( + folly::HHWheelTimer::Callback* callback, + std::chrono::milliseconds timeout); + + ConnectionManager* getConnectionManager() { + return connectionManager_; + } + + protected: + virtual ~ManagedConnection(); + + private: + friend class ConnectionManager; + + void setConnectionManager(ConnectionManager* mgr) { + connectionManager_ = mgr; + } + + ConnectionManager* connectionManager_; + + folly::SafeIntrusiveListHook listHook_; +}; + +std::ostream& operator<<(std::ostream& os, const ManagedConnection& conn); + +}} // folly::wangle -- 2.34.1