class ReadCallback : public AsyncTransportWrapper::ReadCallback {
public:
- ReadCallback()
+ explicit ReadCallback(size_t _maxBufferSz = 4096)
: state(STATE_WAITING)
, exception(AsyncSocketException::UNKNOWN, "none")
- , buffers() {}
+ , buffers()
+ , maxBufferSz(_maxBufferSz) {}
~ReadCallback() {
for (std::vector<Buffer>::iterator it = buffers.begin();
void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
if (!currentBuffer.buffer) {
- currentBuffer.allocate(4096);
+ currentBuffer.allocate(maxBufferSz);
}
*bufReturn = currentBuffer.buffer;
*lenReturn = currentBuffer.length;
CHECK_EQ(offset, expectedLen);
}
+ size_t dataRead() const {
+ size_t ret = 0;
+ for (const auto& buf : buffers) {
+ ret += buf.length;
+ }
+ return ret;
+ }
+
class Buffer {
public:
Buffer() : buffer(nullptr), length(0) {}
std::vector<Buffer> buffers;
Buffer currentBuffer;
VoidCallback dataAvailableCallback;
+ const size_t maxBufferSz;
};
class ReadVerifier {
}
}
+///////////////////////////////////////////////////////////////////////////
+// ImmediateRead related tests
+///////////////////////////////////////////////////////////////////////////
+
+/* AsyncSocket use to verify immediate read works */
+class AsyncSocketImmediateRead : public folly::AsyncSocket {
+ public:
+ bool immediateReadCalled = false;
+ explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
+ protected:
+ virtual void checkForImmediateRead() noexcept override {
+ immediateReadCalled = true;
+ AsyncSocket::handleRead();
+ }
+};
+
+TEST(AsyncSocket, ConnectReadImmediateRead) {
+ TestServer server;
+
+ const size_t maxBufferSz = 100;
+ const size_t maxReadsPerEvent = 1;
+ const size_t expectedDataSz = maxBufferSz * 3;
+ char expectedData[expectedDataSz];
+ memset(expectedData, 'j', expectedDataSz);
+
+ EventBase evb;
+ ReadCallback rcb(maxBufferSz);
+ AsyncSocketImmediateRead socket(&evb);
+ socket.connect(nullptr, server.getAddress(), 30);
+
+ evb.loop(); // loop until the socket is connected
+
+ socket.setReadCB(&rcb);
+ socket.setMaxReadsPerEvent(maxReadsPerEvent);
+ socket.immediateReadCalled = false;
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback rcbServer;
+ WriteCallback wcbServer;
+ rcbServer.dataAvailableCallback = [&]() {
+ if (rcbServer.dataRead() == expectedDataSz) {
+ // write back all data read
+ rcbServer.verifyData(expectedData, expectedDataSz);
+ acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
+ acceptedSocket->close();
+ }
+ };
+ acceptedSocket->setReadCB(&rcbServer);
+
+ // write data
+ WriteCallback wcb1;
+ socket.write(&wcb1, expectedData, expectedDataSz);
+ evb.loop();
+ CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
+ rcb.verifyData(expectedData, expectedDataSz);
+ CHECK_EQ(socket.immediateReadCalled, true);
+}
+
+TEST(AsyncSocket, ConnectReadUninstallRead) {
+ TestServer server;
+
+ const size_t maxBufferSz = 100;
+ const size_t maxReadsPerEvent = 1;
+ const size_t expectedDataSz = maxBufferSz * 3;
+ char expectedData[expectedDataSz];
+ memset(expectedData, 'k', expectedDataSz);
+
+ EventBase evb;
+ ReadCallback rcb(maxBufferSz);
+ AsyncSocketImmediateRead socket(&evb);
+ socket.connect(nullptr, server.getAddress(), 30);
+
+ evb.loop(); // loop until the socket is connected
+
+ socket.setReadCB(&rcb);
+ socket.setMaxReadsPerEvent(maxReadsPerEvent);
+ socket.immediateReadCalled = false;
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback rcbServer;
+ WriteCallback wcbServer;
+ rcbServer.dataAvailableCallback = [&]() {
+ if (rcbServer.dataRead() == expectedDataSz) {
+ // write back all data read
+ rcbServer.verifyData(expectedData, expectedDataSz);
+ acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
+ acceptedSocket->close();
+ }
+ };
+ acceptedSocket->setReadCB(&rcbServer);
+
+ rcb.dataAvailableCallback = [&]() {
+ // we read data and reset readCB
+ socket.setReadCB(nullptr);
+ };
+
+ // write data
+ WriteCallback wcb;
+ socket.write(&wcb, expectedData, expectedDataSz);
+ evb.loop();
+ CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+
+ /* we shoud've only read maxBufferSz data since readCallback_
+ * was reset in dataAvailableCallback */
+ CHECK_EQ(rcb.dataRead(), maxBufferSz);
+ CHECK_EQ(socket.immediateReadCalled, false);
+}
// TODO:
// - Test connect() and have the connect callback set the read callback