void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags,
- BufferCallback*) {
+ WriteFlags) {
write(std::move(buf), callback);
}
}
// AsyncWriter methods
- void write(folly::AsyncWriter::WriteCallback* callback, const void* buf,
- size_t bytes, WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override {
- writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags, bufCallback);
+ void write(folly::AsyncWriter::WriteCallback* callback,
+ const void* buf,
+ size_t bytes,
+ WriteFlags flags = WriteFlags::NONE) override {
+ writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
}
- void writev(folly::AsyncWriter::WriteCallback*, const iovec*,
- size_t, WriteFlags = WriteFlags::NONE,
- BufferCallback* = nullptr) override {
+ void writev(folly::AsyncWriter::WriteCallback*,
+ const iovec*,
+ size_t,
+ WriteFlags = WriteFlags::NONE) override {
throw std::runtime_error("writev is not supported. Please use writeChain.");
}
void writeChain(folly::AsyncWriter::WriteCallback* callback,
std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override;
+ WriteFlags flags = WriteFlags::NONE) override;
private:
void handlerReady(uint16_t events) noexcept override;
*/
class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
public:
- static BytesWriteRequest* newRequest(
- AsyncSocket* socket,
- WriteCallback* callback,
- const iovec* ops,
- uint32_t opCount,
- uint32_t partialWritten,
- uint32_t bytesWritten,
- unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags,
- BufferCallback* bufferCallback = nullptr) {
+ static BytesWriteRequest* newRequest(AsyncSocket* socket,
+ WriteCallback* callback,
+ const iovec* ops,
+ uint32_t opCount,
+ uint32_t partialWritten,
+ uint32_t bytesWritten,
+ unique_ptr<IOBuf>&& ioBuf,
+ WriteFlags flags) {
assert(opCount > 0);
// Since we put a variable size iovec array at the end
// of each BytesWriteRequest, we have to manually allocate the memory.
return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
partialWritten, bytesWritten,
- std::move(ioBuf), flags, bufferCallback);
+ std::move(ioBuf), flags);
}
void destroy() override {
uint32_t partialBytes,
uint32_t bytesWritten,
unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags,
- BufferCallback* bufferCallback = nullptr)
- : AsyncSocket::WriteRequest(socket, callback, bufferCallback)
+ WriteFlags flags)
+ : AsyncSocket::WriteRequest(socket, callback)
, opCount_(opCount)
, opIndex_(0)
, flags_(flags)
}
void AsyncSocket::write(WriteCallback* callback,
- const void* buf, size_t bytes, WriteFlags flags,
- BufferCallback* bufCallback) {
+ const void* buf, size_t bytes, WriteFlags flags) {
iovec op;
op.iov_base = const_cast<void*>(buf);
op.iov_len = bytes;
- writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags, bufCallback);
+ writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writev(WriteCallback* callback,
const iovec* vec,
size_t count,
- WriteFlags flags,
- BufferCallback* bufCallback) {
- writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags, bufCallback);
+ WriteFlags flags) {
+ writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
- WriteFlags flags, BufferCallback* bufCallback) {
+ WriteFlags flags) {
constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
if (count <= kSmallSizeMax) {
iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
- writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
+ writeChainImpl(callback, vec, count, std::move(buf), flags);
} else {
iovec* vec = new iovec[count];
- writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
+ writeChainImpl(callback, vec, count, std::move(buf), flags);
delete[] vec;
}
}
void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
- size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags,
- BufferCallback* bufCallback) {
+ size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
size_t veclen = buf->fillIov(vec, count);
- writeImpl(callback, vec, veclen, std::move(buf), flags, bufCallback);
+ writeImpl(callback, vec, veclen, std::move(buf), flags);
}
void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
size_t count, unique_ptr<IOBuf>&& buf,
- WriteFlags flags, BufferCallback* bufCallback) {
+ WriteFlags flags) {
VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
<< ", callback=" << callback << ", count=" << count
<< ", state=" << state_;
callback->writeSuccess();
}
return;
- } else { // continue writing the next writeReq
- if (bufCallback) {
- bufCallback->onEgressBuffered();
- }
- }
+ } // else { continue writing the next writeReq }
mustRegister = true;
}
} else if (!connecting()) {
try {
req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
count - countWritten, partialWritten,
- bytesWritten, std::move(ioBuf), flags,
- bufCallback);
+ bytesWritten, std::move(ioBuf), flags);
} catch (const std::exception& ex) {
// we mainly expect to catch std::bad_alloc here
AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
}
// We'll continue around the loop, trying to write another request
} else {
- // Notify BufferCallback:
- BufferCallback* bufferCallback = writeReqHead_->getBufferCallback();
- if (bufferCallback) {
- bufferCallback->onEgressBuffered();
- }
// Partial write.
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
ReadCallback* getReadCallback() const override;
void write(WriteCallback* callback, const void* buf, size_t bytes,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override;
+ WriteFlags flags = WriteFlags::NONE) override;
void writev(WriteCallback* callback, const iovec* vec, size_t count,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override;
+ WriteFlags flags = WriteFlags::NONE) override;
void writeChain(WriteCallback* callback,
std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override;
+ WriteFlags flags = WriteFlags::NONE) override;
class WriteRequest;
virtual void writeRequest(WriteRequest* req);
*/
class WriteRequest {
public:
- WriteRequest(
- AsyncSocket* socket,
- WriteCallback* callback,
- BufferCallback* bufferCallback = nullptr) :
- socket_(socket), callback_(callback), bufferCallback_(bufferCallback) {}
+ WriteRequest(AsyncSocket* socket, WriteCallback* callback) :
+ socket_(socket), callback_(callback) {}
virtual void start() {};
socket_->appBytesWritten_ += count;
}
- BufferCallback* getBufferCallback() const {
- return bufferCallback_;
- }
-
protected:
// protected destructor, to ensure callers use destroy()
virtual ~WriteRequest() {}
WriteRequest* next_{nullptr}; ///< pointer to next WriteRequest
WriteCallback* callback_; ///< completion callback
uint32_t totalBytesWritten_{0}; ///< total bytes written
- BufferCallback* bufferCallback_{nullptr};
};
protected:
/**
* Populate an iovec array from an IOBuf and attempt to write it.
*
- * @param callback Write completion/error callback.
- * @param vec Target iovec array; caller retains ownership.
- * @param count Number of IOBufs to write, beginning at start of buf.
- * @param buf Chain of iovecs.
- * @param flags set of flags for the underlying write calls, like cork
- * @param bufCallback Callback when egress data begins to buffer
+ * @param callback Write completion/error callback.
+ * @param vec Target iovec array; caller retains ownership.
+ * @param count Number of IOBufs to write, beginning at start of buf.
+ * @param buf Chain of iovecs.
+ * @param flags set of flags for the underlying write calls, like cork
*/
void writeChainImpl(WriteCallback* callback, iovec* vec,
size_t count, std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags, BufferCallback* bufCallback = nullptr);
+ WriteFlags flags);
/**
* Write as much data as possible to the socket without blocking,
* and queue up any leftover data to send when the socket can
* handle writes again.
*
- * @param callback The callback to invoke when the write is completed.
- * @param vec Array of buffers to write; this method will make a
- * copy of the vector (but not the buffers themselves)
- * if the write has to be completed asynchronously.
- * @param count Number of elements in vec.
- * @param buf The IOBuf that manages the buffers referenced by
- * vec, or a pointer to nullptr if the buffers are not
- * associated with an IOBuf. Note that ownership of
- * the IOBuf is transferred here; upon completion of
- * the write, the AsyncSocket deletes the IOBuf.
- * @param flags Set of write flags.
- * @param bufCallback Callback when egress data buffers up
+ * @param callback The callback to invoke when the write is completed.
+ * @param vec Array of buffers to write; this method will make a
+ * copy of the vector (but not the buffers themselves)
+ * if the write has to be completed asynchronously.
+ * @param count Number of elements in vec.
+ * @param buf The IOBuf that manages the buffers referenced by
+ * vec, or a pointer to nullptr if the buffers are not
+ * associated with an IOBuf. Note that ownership of
+ * the IOBuf is transferred here; upon completion of
+ * the write, the AsyncSocket deletes the IOBuf.
+ * @param flags Set of write flags.
*/
void writeImpl(WriteCallback* callback, const iovec* vec, size_t count,
std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr);
+ WriteFlags flags = WriteFlags::NONE);
/**
* Attempt to write to the socket.
class AsyncWriter {
public:
- class BufferCallback {
- public:
- virtual ~BufferCallback() {}
- virtual void onEgressBuffered() = 0;
- };
-
class WriteCallback {
public:
virtual ~WriteCallback() = default;
// Write methods that aren't part of AsyncTransport
virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) = 0;
+ WriteFlags flags = WriteFlags::NONE) = 0;
virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) = 0;
+ WriteFlags flags = WriteFlags::NONE) = 0;
virtual void writeChain(WriteCallback* callback,
std::unique_ptr<IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) = 0;
+ WriteFlags flags = WriteFlags::NONE) = 0;
protected:
virtual ~AsyncWriter() = default;
// to keep compatibility.
using ReadCallback = AsyncReader::ReadCallback;
using WriteCallback = AsyncWriter::WriteCallback;
- using BufferCallback = AsyncWriter::BufferCallback;
virtual void setReadCB(ReadCallback* callback) override = 0;
virtual ReadCallback* getReadCallback() const override = 0;
virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override = 0;
+ WriteFlags flags = WriteFlags::NONE) override = 0;
virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override = 0;
+ WriteFlags flags = WriteFlags::NONE) override = 0;
virtual void writeChain(WriteCallback* callback,
std::unique_ptr<IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE,
- BufferCallback* bufCallback = nullptr) override = 0;
+ WriteFlags flags = WriteFlags::NONE) override = 0;
/**
* The transport wrapper may wrap another transport. This returns the
* transport that is wrapped. It returns nullptr if there is no wrapped
VoidCallback errorCallback;
};
-class BufferCallback : public AsyncTransportWrapper::BufferCallback {
- public:
- BufferCallback()
- : buffered_(false) {}
-
- void onEgressBuffered() override {
- buffered_ = true;
- }
-
- bool hasBuffered() const {
- return buffered_;
- }
-
- private:
- bool buffered_{false};
-};
-
class WriteCallback : public AsyncTransportWrapper::WriteCallback {
public:
WriteCallback()
eventBase.loop();
}
-
-TEST(AsyncSocketTest, BufferTest) {
- TestServer server;
-
- EventBase evb;
- AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
- std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
- ConnCallback ccb;
- socket->connect(&ccb, server.getAddress(), 30, option);
-
-
- char buf[100 * 1024];
- memset(buf, 'c', sizeof(buf));
- WriteCallback wcb;
- BufferCallback bcb;
- socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE, &bcb);
-
- evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
-
- ASSERT_TRUE(bcb.hasBuffered());
-
- socket->close();
- server.verifyConnection(buf, sizeof(buf));
-
- ASSERT_TRUE(socket->isClosedBySelf());
- ASSERT_FALSE(socket->isClosedByPeer());
-}
MOCK_METHOD1(setReadCB, void(ReadCallback*));
MOCK_CONST_METHOD0(getReadCallback, ReadCallback*());
MOCK_CONST_METHOD0(getReadCB, ReadCallback*());
- MOCK_METHOD5(write, void(WriteCallback*,
- const void*, size_t,
- WriteFlags,
- BufferCallback*));
- MOCK_METHOD5(writev, void(WriteCallback*,
- const iovec*, size_t,
- WriteFlags,
- BufferCallback*));
- MOCK_METHOD4(writeChain,
- void(WriteCallback*,
- std::shared_ptr<folly::IOBuf>,
- WriteFlags,
- BufferCallback*));
-
+ MOCK_METHOD4(write, void(WriteCallback*, const void*, size_t, WriteFlags));
+ MOCK_METHOD4(writev, void(WriteCallback*, const iovec*, size_t, WriteFlags));
+ MOCK_METHOD3(writeChain,
+ void(WriteCallback*, std::shared_ptr<folly::IOBuf>, WriteFlags));
void writeChain(WriteCallback* callback,
std::unique_ptr<folly::IOBuf>&& iob,
- WriteFlags flags =
- WriteFlags::NONE,
- BufferCallback* bufCB = nullptr) override {
- writeChain(
- callback,
- std::shared_ptr<folly::IOBuf>(iob.release()),
- flags,
- bufCB);
+ WriteFlags flags = WriteFlags::NONE) override {
+ writeChain(callback, std::shared_ptr<folly::IOBuf>(iob.release()), flags);
}
MOCK_METHOD0(close, void());