From 4a46ffa2a182e6d70c57abd97188f7698dfd7a50 Mon Sep 17 00:00:00 2001 From: Yang Chi Date: Mon, 14 Dec 2015 15:33:49 -0800 Subject: [PATCH] Add a per-socket buffer callback Summary: this is way simpler than D2623385 + D2709121. There will be a followup diff to clean the existing per-write call BufferCallback. The new one is on per-socket basis, much straightforward. I will only setup this in HTTPUpstreamSession. Reviewed By: afrind Differential Revision: D2723493 fb-gh-sync-id: 6b1c21a719281b9693330b6a4074f7149d7c342a --- folly/io/async/AsyncSocket.cpp | 16 +++++++++++- folly/io/async/AsyncSocket.h | 4 +++ folly/io/async/AsyncTransport.h | 7 +++++ folly/io/async/test/AsyncSocketTest.h | 17 ++++++++++++ folly/io/async/test/AsyncSocketTest2.cpp | 33 ++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 1 deletion(-) diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 782a4ec4..fa475faa 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -688,7 +688,11 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, callback->writeSuccess(); } return; - } // else { continue writing the next writeReq } + } else { // continue writing the next writeReq + if (bufferCallback_) { + bufferCallback_->onEgressBuffered(); + } + } mustRegister = true; } } else if (!connecting()) { @@ -1505,6 +1509,9 @@ void AsyncSocket::handleWrite() noexcept { // We'll continue around the loop, trying to write another request } else { // Partial write. + if (bufferCallback_) { + bufferCallback_->onEgressBuffered(); + } writeReqHead_->consume(); // Stop after a partial write; it's highly likely that a subsequent write // attempt will just return EAGAIN. @@ -1528,6 +1535,9 @@ void AsyncSocket::handleWrite() noexcept { return; } } + if (!writeReqHead_ && bufferCallback_) { + bufferCallback_->onEgressBufferCleared(); + } } void AsyncSocket::checkForImmediateRead() noexcept { @@ -2049,4 +2059,8 @@ std::string AsyncSocket::withAddr(const std::string& s) { return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")"; } +void AsyncSocket::setBufferCallback(BufferCallback* cb) { + bufferCallback_ = cb; +} + } // folly diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index e813216f..ceaaaf94 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -510,6 +510,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ERROR }; + void setBufferCallback(BufferCallback* cb); + /** * A WriteRequest object tracks information about a pending write operation. */ @@ -813,6 +815,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { bool persistentCork_{false}; // Whether we've applied the TCP_CORK option to the socket bool corked_{false}; + + BufferCallback* bufferCallback_{nullptr}; }; diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h index 40dca58b..f3eebfc7 100644 --- a/folly/io/async/AsyncTransport.h +++ b/folly/io/async/AsyncTransport.h @@ -332,6 +332,13 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase { virtual size_t getAppBytesReceived() const = 0; virtual size_t getRawBytesReceived() const = 0; + class BufferCallback { + public: + virtual ~BufferCallback() {} + virtual void onEgressBuffered() = 0; + virtual void onEgressBufferCleared() = 0; + }; + protected: virtual ~AsyncTransport() = default; }; diff --git a/folly/io/async/test/AsyncSocketTest.h b/folly/io/async/test/AsyncSocketTest.h index 51230014..722be94f 100644 --- a/folly/io/async/test/AsyncSocketTest.h +++ b/folly/io/async/test/AsyncSocketTest.h @@ -185,6 +185,23 @@ class ReadCallback : public AsyncTransportWrapper::ReadCallback { const size_t maxBufferSz; }; +class BufferCallback : public AsyncTransport::BufferCallback { + public: + BufferCallback() : buffered_(false), bufferCleared_(false) {} + + void onEgressBuffered() override { buffered_ = true; } + + void onEgressBufferCleared() override { bufferCleared_ = true; } + + bool hasBuffered() const { return buffered_; } + + bool hasBufferCleared() const { return bufferCleared_; } + + private: + bool buffered_{false}; + bool bufferCleared_{false}; +}; + class ReadVerifier { }; diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index 1a5ebeb8..7ee36a8c 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -2238,3 +2238,36 @@ TEST(AsyncSocketTest, NumPendingMessagesInQueue) { eventBase.loop(); } + +/** + * Test AsyncTransport::BufferCallback + */ +TEST(AsyncSocketTest, BufferTest) { + TestServer server; + + EventBase evb; + AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}}; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30, option); + + char buf[100 * 1024]; + memset(buf, 'c', sizeof(buf)); + WriteCallback wcb; + BufferCallback bcb; + socket->setBufferCallback(&bcb); + socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE); + + evb.loop(); + CHECK_EQ(ccb.state, STATE_SUCCEEDED); + CHECK_EQ(wcb.state, STATE_SUCCEEDED); + + ASSERT_TRUE(bcb.hasBuffered()); + ASSERT_TRUE(bcb.hasBufferCleared()); + + socket->close(); + server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); +} -- 2.34.1