assert(readCallback_ != nullptr);
while (readCallback_) {
+ // - What API does callback support?
+ const auto movable = readCallback_->isBufferMovable(); // noexcept
+
// Get the buffer to read into.
void* buf = nullptr;
size_t buflen = 0;
- try {
- readCallback_->getReadBuffer(&buf, &buflen);
- } catch (const std::exception& ex) {
- AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
- string("ReadCallback::getReadBuffer() "
- "threw exception: ") + ex.what());
- failRead(aex);
- return;
- } catch (...) {
- AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
- string("ReadCallback::getReadBuffer() "
- "threw non-exception type"));
- failRead(ex);
- return;
- }
- if (buf == nullptr || buflen == 0) {
- AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
- string("ReadCallback::getReadBuffer() "
- "returned empty buffer"));
- failRead(ex);
- return;
+ std::unique_ptr<IOBuf> ioBuf;
+
+ if (movable) {
+ ioBuf = IOBuf::create(readCallback_->maxBufferSize());
+ buf = ioBuf->writableBuffer();
+ buflen = ioBuf->capacity();
+ } else {
+ try {
+ readCallback_->getReadBuffer(&buf, &buflen);
+ } catch (const std::exception& ex) {
+ AsyncSocketException aex(
+ AsyncSocketException::BAD_ARGS,
+ string("ReadCallback::getReadBuffer() "
+ "threw exception: ") +
+ ex.what());
+ failRead(aex);
+ return;
+ } catch (...) {
+ AsyncSocketException aex(
+ AsyncSocketException::BAD_ARGS,
+ string("ReadCallback::getReadBuffer() "
+ "threw non-exception type"));
+ failRead(aex);
+ return;
+ }
+ if (buf == nullptr || buflen == 0) {
+ AsyncSocketException aex(
+ AsyncSocketException::INVALID_STATE,
+ string("ReadCallback::getReadBuffer() "
+ "returned empty buffer"));
+ failRead(aex);
+ return;
+ }
}
// Perform the read
ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
+
if (bytesRead > 0) {
- readCallback_->readDataAvailable(bytesRead);
+ if (movable) {
+ ioBuf->append(bytesRead);
+ readCallback_->readBufferAvailable(std::move(ioBuf));
+ } else {
+ readCallback_->readDataAvailable(bytesRead);
+ }
// Fall through and continue around the loop if the read
// completely filled the available buffer.
// Note that readCallback_ may have been uninstalled or changed inside
class TestReadCallback : public folly::AsyncReader::ReadCallback {
public:
+ bool isBufferMovable() noexcept override {
+ return movable_;
+ }
+ void setMovable(bool movable) {
+ movable_ = movable;
+ }
+
+ void readBufferAvailable(
+ std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
+ readBuffer_.append(std::move(readBuf));
+ }
+
void readDataAvailable(size_t len) noexcept override {
readBuffer_.postallocate(len);
}
return std::string((char *)buf->data(), buf->length());
}
+ void reset() {
+ movable_ = false;
+ error_ = false;
+ readBuffer_.clear();
+ }
+
folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
bool error_{false};
+ bool movable_{false};
};
class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
error_ = true;
}
+ void reset() {
+ writes_ = 0;
+ error_ = false;
+ }
+
uint32_t writes_{0};
bool error_{false};
};
class AsyncPipeTest: public Test {
public:
- void SetUp() override {
+ void reset(bool movable) {
+ reader_.reset();
+ readCallback_.reset();
+ writer_.reset();
+ writeCallback_.reset();
+
int rc = pipe(pipeFds_);
EXPECT_EQ(rc, 0);
&eventBase_, pipeFds_[0]);
writer_ = folly::AsyncPipeWriter::newWriter(
&eventBase_, pipeFds_[1]);
+
+ readCallback_.setMovable(movable);
}
protected:
TEST_F(AsyncPipeTest, simple) {
- reader_->setReadCB(&readCallback_);
- writer_->write(getBuf("hello"), &writeCallback_);
- writer_->closeOnEmpty();
- eventBase_.loop();
- EXPECT_EQ(readCallback_.getData(), "hello");
- EXPECT_FALSE(readCallback_.error_);
- EXPECT_EQ(writeCallback_.writes_, 1);
- EXPECT_FALSE(writeCallback_.error_);
+ for (int pass = 0; pass < 2; ++pass) {
+ reset(pass % 2 != 0);
+ reader_->setReadCB(&readCallback_);
+ writer_->write(getBuf("hello"), &writeCallback_);
+ writer_->closeOnEmpty();
+ eventBase_.loop();
+ EXPECT_EQ(readCallback_.getData(), "hello");
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, 1);
+ EXPECT_FALSE(writeCallback_.error_);
+ }
}
TEST_F(AsyncPipeTest, blocked_writes) {
- uint32_t writeAttempts = 0;
- do {
- ++writeAttempts;
- writer_->write(getBuf("hello"), &writeCallback_);
- } while (writeCallback_.writes_ == writeAttempts);
- // there is one blocked write
- writer_->closeOnEmpty();
-
- reader_->setReadCB(&readCallback_);
-
- eventBase_.loop();
- std::string expected;
- for (uint32_t i = 0; i < writeAttempts; i++) {
- expected += "hello";
+ for (int pass = 0; pass < 2; ++pass) {
+ reset(pass % 2 != 0);
+ uint32_t writeAttempts = 0;
+ do {
+ ++writeAttempts;
+ writer_->write(getBuf("hello"), &writeCallback_);
+ } while (writeCallback_.writes_ == writeAttempts);
+ // there is one blocked write
+ writer_->closeOnEmpty();
+
+ reader_->setReadCB(&readCallback_);
+
+ eventBase_.loop();
+ std::string expected;
+ for (uint32_t i = 0; i < writeAttempts; i++) {
+ expected += "hello";
+ }
+ EXPECT_EQ(readCallback_.getData(), expected);
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, writeAttempts);
+ EXPECT_FALSE(writeCallback_.error_);
}
- EXPECT_EQ(readCallback_.getData(), expected);
- EXPECT_FALSE(readCallback_.error_);
- EXPECT_EQ(writeCallback_.writes_, writeAttempts);
- EXPECT_FALSE(writeCallback_.error_);
}
TEST_F(AsyncPipeTest, writeOnClose) {
- reader_->setReadCB(&readCallback_);
- writer_->write(getBuf("hello"), &writeCallback_);
- writer_->closeOnEmpty();
- writer_->write(getBuf("hello"), &writeCallback_);
- eventBase_.loop();
- EXPECT_EQ(readCallback_.getData(), "hello");
- EXPECT_FALSE(readCallback_.error_);
- EXPECT_EQ(writeCallback_.writes_, 1);
- EXPECT_TRUE(writeCallback_.error_);
+ for (int pass = 0; pass < 2; ++pass) {
+ reset(pass % 2 != 0);
+ reader_->setReadCB(&readCallback_);
+ writer_->write(getBuf("hello"), &writeCallback_);
+ writer_->closeOnEmpty();
+ writer_->write(getBuf("hello"), &writeCallback_);
+ eventBase_.loop();
+ EXPECT_EQ(readCallback_.getData(), "hello");
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, 1);
+ EXPECT_TRUE(writeCallback_.error_);
+ }
}