Add a per-socket buffer callback
authorYang Chi <yangchi@fb.com>
Mon, 14 Dec 2015 23:33:49 +0000 (15:33 -0800)
committerfacebook-github-bot-1 <folly-bot@fb.com>
Tue, 15 Dec 2015 00:20:25 +0000 (16:20 -0800)
Summary: this is way simpler than D2623385 + D2709121. There will be a followup diff to clean the existing per-write call BufferCallback. The new one is on per-socket basis, much straightforward. I will only setup this in HTTPUpstreamSession.

Reviewed By: afrind

Differential Revision: D2723493

fb-gh-sync-id: 6b1c21a719281b9693330b6a4074f7149d7c342a

folly/io/async/AsyncSocket.cpp
folly/io/async/AsyncSocket.h
folly/io/async/AsyncTransport.h
folly/io/async/test/AsyncSocketTest.h
folly/io/async/test/AsyncSocketTest2.cpp

index 782a4ec41e161f9d3312d7616809c358a1e3babe..fa475faa7ba85de7ea7caf7f4efbbc704f1a2e7b 100644 (file)
@@ -688,7 +688,11 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
           callback->writeSuccess();
         }
         return;
-      } // else { continue writing the next writeReq }
+      } else { // continue writing the next writeReq
+        if (bufferCallback_) {
+          bufferCallback_->onEgressBuffered();
+        }
+      }
       mustRegister = true;
     }
   } else if (!connecting()) {
@@ -1505,6 +1509,9 @@ void AsyncSocket::handleWrite() noexcept {
       // We'll continue around the loop, trying to write another request
     } else {
       // Partial write.
+      if (bufferCallback_) {
+        bufferCallback_->onEgressBuffered();
+      }
       writeReqHead_->consume();
       // Stop after a partial write; it's highly likely that a subsequent write
       // attempt will just return EAGAIN.
@@ -1528,6 +1535,9 @@ void AsyncSocket::handleWrite() noexcept {
       return;
     }
   }
+  if (!writeReqHead_ && bufferCallback_) {
+    bufferCallback_->onEgressBufferCleared();
+  }
 }
 
 void AsyncSocket::checkForImmediateRead() noexcept {
@@ -2049,4 +2059,8 @@ std::string AsyncSocket::withAddr(const std::string& s) {
   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
 }
 
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+  bufferCallback_ = cb;
+}
+
 } // folly
index e813216f6405df368da50588ca303c2b6e6a8d62..ceaaaf94286784a4ac4b8b072219141f5b108d15 100644 (file)
@@ -510,6 +510,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
     ERROR
   };
 
+  void setBufferCallback(BufferCallback* cb);
+
   /**
    * A WriteRequest object tracks information about a pending write operation.
    */
@@ -813,6 +815,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
   bool persistentCork_{false};
   // Whether we've applied the TCP_CORK option to the socket
   bool corked_{false};
+
+  BufferCallback* bufferCallback_{nullptr};
 };
 
 
index 40dca58b8e18acb6f61303008268cf7d445490e6..f3eebfc757d4f71f2d1912065528a90e59bd7fdd 100644 (file)
@@ -332,6 +332,13 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
   virtual size_t getAppBytesReceived() const = 0;
   virtual size_t getRawBytesReceived() const = 0;
 
+  class BufferCallback {
+   public:
+    virtual ~BufferCallback() {}
+    virtual void onEgressBuffered() = 0;
+    virtual void onEgressBufferCleared() = 0;
+  };
+
  protected:
   virtual ~AsyncTransport() = default;
 };
index 51230014203c038cda8fabb1872c61a1b6623ff5..722be94f35015762e8a1399eb42cba9842e68a46 100644 (file)
@@ -185,6 +185,23 @@ class ReadCallback : public AsyncTransportWrapper::ReadCallback {
   const size_t maxBufferSz;
 };
 
+class BufferCallback : public AsyncTransport::BufferCallback {
+ public:
+  BufferCallback() : buffered_(false), bufferCleared_(false) {}
+
+  void onEgressBuffered() override { buffered_ = true; }
+
+  void onEgressBufferCleared() override { bufferCleared_ = true; }
+
+  bool hasBuffered() const { return buffered_; }
+
+  bool hasBufferCleared() const { return bufferCleared_; }
+
+ private:
+  bool buffered_{false};
+  bool bufferCleared_{false};
+};
+
 class ReadVerifier {
 };
 
index 1a5ebeb84e1d79f9d5a18f62161e90b7a6cd1495..7ee36a8cb97525a707b709f63c60715a4c4e0487 100644 (file)
@@ -2238,3 +2238,36 @@ TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
 
   eventBase.loop();
 }
+
+/**
+ * Test AsyncTransport::BufferCallback
+ */
+TEST(AsyncSocketTest, BufferTest) {
+  TestServer server;
+
+  EventBase evb;
+  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30, option);
+
+  char buf[100 * 1024];
+  memset(buf, 'c', sizeof(buf));
+  WriteCallback wcb;
+  BufferCallback bcb;
+  socket->setBufferCallback(&bcb);
+  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+  evb.loop();
+  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+
+  ASSERT_TRUE(bcb.hasBuffered());
+  ASSERT_TRUE(bcb.hasBufferCleared());
+
+  socket->close();
+  server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}