callback->writeSuccess();
}
return;
- } // else { continue writing the next writeReq }
+ } else { // continue writing the next writeReq
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
+ }
mustRegister = true;
}
} else if (!connecting()) {
// We'll continue around the loop, trying to write another request
} else {
// Partial write.
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
return;
}
}
+ if (!writeReqHead_ && bufferCallback_) {
+ bufferCallback_->onEgressBufferCleared();
+ }
}
void AsyncSocket::checkForImmediateRead() noexcept {
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+ bufferCallback_ = cb;
+}
+
} // folly
ERROR
};
+ void setBufferCallback(BufferCallback* cb);
+
/**
* A WriteRequest object tracks information about a pending write operation.
*/
bool persistentCork_{false};
// Whether we've applied the TCP_CORK option to the socket
bool corked_{false};
+
+ BufferCallback* bufferCallback_{nullptr};
};
virtual size_t getAppBytesReceived() const = 0;
virtual size_t getRawBytesReceived() const = 0;
+ class BufferCallback {
+ public:
+ virtual ~BufferCallback() {}
+ virtual void onEgressBuffered() = 0;
+ virtual void onEgressBufferCleared() = 0;
+ };
+
protected:
virtual ~AsyncTransport() = default;
};
const size_t maxBufferSz;
};
+class BufferCallback : public AsyncTransport::BufferCallback {
+ public:
+ BufferCallback() : buffered_(false), bufferCleared_(false) {}
+
+ void onEgressBuffered() override { buffered_ = true; }
+
+ void onEgressBufferCleared() override { bufferCleared_ = true; }
+
+ bool hasBuffered() const { return buffered_; }
+
+ bool hasBufferCleared() const { return bufferCleared_; }
+
+ private:
+ bool buffered_{false};
+ bool bufferCleared_{false};
+};
+
class ReadVerifier {
};
eventBase.loop();
}
+
+/**
+ * Test AsyncTransport::BufferCallback
+ */
+TEST(AsyncSocketTest, BufferTest) {
+ TestServer server;
+
+ EventBase evb;
+ AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30, option);
+
+ char buf[100 * 1024];
+ memset(buf, 'c', sizeof(buf));
+ WriteCallback wcb;
+ BufferCallback bcb;
+ socket->setBufferCallback(&bcb);
+ socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+ evb.loop();
+ CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(bcb.hasBuffered());
+ ASSERT_TRUE(bcb.hasBufferCleared());
+
+ socket->close();
+ server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}