From: Andre Pinto Date: Tue, 29 Sep 2015 21:13:54 +0000 (-0700) Subject: Open source AsyncPipe X-Git-Tag: deprecate-dynamic-initializer~372 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=2773b36d0fc36a9e2ca04a7600526086d3b4c0a2;p=folly.git Open source AsyncPipe Summary: AsyncPipeReader and AsyncPipeWriter are classes to asynchronously read and write to pipes. Reviewed By: @djwatson Differential Revision: D2479514 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 2d90adde..a4e2e483 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -186,6 +186,7 @@ nobase_follyinclude_HEADERS = \ io/RecordIO-inl.h \ io/TypedIOBuf.h \ io/ShutdownSocketSet.h \ + io/async/AsyncPipe.h \ io/async/AsyncTimeout.h \ io/async/AsyncTransport.h \ io/async/AsyncUDPServerSocket.h \ @@ -336,6 +337,7 @@ libfolly_la_SOURCES = \ io/IOBufQueue.cpp \ io/RecordIO.cpp \ io/ShutdownSocketSet.cpp \ + io/async/AsyncPipe.cpp \ io/async/AsyncTimeout.cpp \ io/async/AsyncUDPSocket.cpp \ io/async/AsyncServerSocket.cpp \ diff --git a/folly/io/async/AsyncPipe.cpp b/folly/io/async/AsyncPipe.cpp new file mode 100644 index 00000000..7937e400 --- /dev/null +++ b/folly/io/async/AsyncPipe.cpp @@ -0,0 +1,249 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include + +using std::string; +using std::unique_ptr; +using folly::IOBuf; +using folly::IOBufQueue; + +namespace folly { + +AsyncPipeReader::~AsyncPipeReader() { + close(); +} + +void AsyncPipeReader::failRead(const AsyncSocketException& ex) { + VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ << + "): failed while reading: " << ex.what(); + + DCHECK(readCallback_ != nullptr); + AsyncReader::ReadCallback* callback = readCallback_; + readCallback_ = nullptr; + callback->readErr(ex); + close(); +} + +void AsyncPipeReader::close() { + unregisterHandler(); + if (fd_ >= 0) { + changeHandlerFD(-1); + + if (closeCb_) { + closeCb_(fd_); + } else { + ::close(fd_); + } + fd_ = -1; + } +} + +void AsyncPipeReader::handlerReady(uint16_t events) noexcept { + DestructorGuard dg(this); + CHECK(events & EventHandler::READ); + + VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_; + assert(readCallback_ != nullptr); + + uint16_t numReads = 0; + while (readCallback_) { + // Get the buffer to read into. + void* buf = nullptr; + size_t buflen = 0; + try { + readCallback_->getReadBuffer(&buf, &buflen); + } catch (const std::exception& ex) { + AsyncSocketException aex(AsyncSocketException::BAD_ARGS, + string("ReadCallback::getReadBuffer() " + "threw exception: ") + ex.what()); + failRead(aex); + return; + } catch (...) { + AsyncSocketException ex(AsyncSocketException::BAD_ARGS, + string("ReadCallback::getReadBuffer() " + "threw non-exception type")); + failRead(ex); + return; + } + if (buf == nullptr || buflen == 0) { + AsyncSocketException ex(AsyncSocketException::INVALID_STATE, + string("ReadCallback::getReadBuffer() " + "returned empty buffer")); + failRead(ex); + return; + } + + // Perform the read + ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen); + if (bytesRead > 0) { + readCallback_->readDataAvailable(bytesRead); + // Fall through and continue around the loop if the read + // completely filled the available buffer. + // Note that readCallback_ may have been uninstalled or changed inside + // readDataAvailable(). + if (static_cast(bytesRead) < buflen) { + return; + } + } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // No more data to read right now. + return; + } else if (bytesRead < 0) { + AsyncSocketException ex(AsyncSocketException::INVALID_STATE, + "read failed", errno); + failRead(ex); + return; + } else { + assert(bytesRead == 0); + // EOF + + unregisterHandler(); + AsyncReader::ReadCallback* callback = readCallback_; + readCallback_ = nullptr; + callback->readEOF(); + return; + } + // Max reads per loop? + } +} + + +void AsyncPipeWriter::write(unique_ptr buf, + AsyncWriter::WriteCallback* callback) { + if (closed()) { + if (callback) { + AsyncSocketException ex(AsyncSocketException::NOT_OPEN, + "attempt to write to closed pipe"); + callback->writeErr(0, ex); + } + return; + } + bool wasEmpty = (queue_.empty()); + folly::IOBufQueue iobq; + iobq.append(std::move(buf)); + std::pair p( + std::move(iobq), callback); + queue_.emplace_back(std::move(p)); + if (wasEmpty) { + handleWrite(); + } else { + CHECK(!queue_.empty()); + CHECK(isHandlerRegistered()); + } +} + +void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback, + std::unique_ptr&& buf, + WriteFlags) { + write(std::move(buf), callback); +} + +void AsyncPipeWriter::closeOnEmpty() { + VLOG(5) << "close on empty"; + if (queue_.empty()) { + closeNow(); + } else { + closeOnEmpty_ = true; + CHECK(isHandlerRegistered()); + } +} + +void AsyncPipeWriter::closeNow() { + VLOG(5) << "close now"; + if (!queue_.empty()) { + failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN, + "closed with pending writes")); + } + if (fd_ >= 0) { + unregisterHandler(); + changeHandlerFD(-1); + if (closeCb_) { + closeCb_(fd_); + } else { + close(fd_); + } + fd_ = -1; + } +} + +void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) { + DestructorGuard dg(this); + while (!queue_.empty()) { + // the first entry of the queue could have had a partial write, but needs to + // be tracked. + if (queue_.front().second) { + queue_.front().second->writeErr(0, ex); + } + queue_.pop_front(); + } +} + + +void AsyncPipeWriter::handlerReady(uint16_t events) noexcept { + CHECK(events & EventHandler::WRITE); + + handleWrite(); +} + +void AsyncPipeWriter::handleWrite() { + DestructorGuard dg(this); + assert(!queue_.empty()); + do { + auto& front = queue_.front(); + folly::IOBufQueue &curQueue = front.first; + DCHECK(!curQueue.empty()); + // someday, support writev. The logic for partial writes is a bit complex + const IOBuf* head = curQueue.front(); + CHECK(head->length()); + ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length()); + if (rc < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // pipe is full + VLOG(5) << "write blocked"; + registerHandler(EventHandler::WRITE); + return; + } else { + failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, + "write failed", errno)); + closeNow(); + return; + } + } else if (rc == 0) { + registerHandler(EventHandler::WRITE); + return; + } + curQueue.trimStart(rc); + if (curQueue.empty()) { + auto cb = front.second; + queue_.pop_front(); + if (cb) { + cb->writeSuccess(); + } + } else { + VLOG(5) << "partial write blocked"; + } + } while (!queue_.empty()); + + if (closeOnEmpty_) { + closeNow(); + } else { + unregisterHandler(); + } +} + +} // folly diff --git a/folly/io/async/AsyncPipe.h b/folly/io/async/AsyncPipe.h new file mode 100644 index 00000000..63e73acb --- /dev/null +++ b/folly/io/async/AsyncPipe.h @@ -0,0 +1,167 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace folly { + +class AsyncSocketException; + +/** + * Read from a pipe in an async manner. + */ +class AsyncPipeReader : public EventHandler, + public AsyncReader, + public DelayedDestruction { + public: + typedef std::unique_ptr UniquePtr; + + AsyncPipeReader(folly::EventBase* eventBase, int pipeFd) + : EventHandler(eventBase, pipeFd), + fd_(pipeFd) {} + + /** + * Set the read callback and automatically install/uninstall the handler + * for events. + */ + void setReadCB(AsyncReader::ReadCallback* callback) override { + if (callback == readCallback_) { + return; + } + readCallback_ = callback; + if (readCallback_ && !isHandlerRegistered()) { + registerHandler(EventHandler::READ | EventHandler::PERSIST); + } else if (!readCallback_ && isHandlerRegistered()) { + unregisterHandler(); + } + } + + /** + * Get the read callback + */ + AsyncReader::ReadCallback* getReadCallback() const override { + return readCallback_; + } + + /** + * Set a special hook to close the socket (otherwise, will call close()) + */ + void setCloseCallback(std::function closeCb) { + closeCb_ = closeCb; + } + + private: + ~AsyncPipeReader(); + + void handlerReady(uint16_t events) noexcept override; + void failRead(const AsyncSocketException& ex); + void close(); + + int fd_; + AsyncReader::ReadCallback* readCallback_{nullptr}; + std::function closeCb_; +}; + +/** + * Write to a pipe in an async manner. + */ +class AsyncPipeWriter : public EventHandler, + public AsyncWriter, + public DelayedDestruction { + public: + typedef std::unique_ptr UniquePtr; + + AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd) + : EventHandler(eventBase, pipeFd), + fd_(pipeFd) {} + + /** + * Asynchronously write the given iobuf to this pipe, and invoke the callback + * on success/error. + */ + void write(std::unique_ptr iob, + AsyncWriter::WriteCallback* wcb = nullptr); + + /** + * Set a special hook to close the socket (otherwise, will call close()) + */ + void setCloseCallback(std::function closeCb) { + closeCb_ = closeCb; + } + + /** + * Returns true if the pipe is closed + */ + bool closed() const { + return (fd_ < 0 || closeOnEmpty_); + } + + /** + * Notify the pipe to close as soon as all pending writes complete + */ + void closeOnEmpty(); + + /** + * Close the pipe immediately, and fail all pending writes + */ + void closeNow(); + + /** + * Return true if there are currently writes pending (eg: the pipe is blocked + * for writing) + */ + bool hasPendingWrites() const { + return !queue_.empty(); + } + + // AsyncWriter methods + void write(folly::AsyncWriter::WriteCallback* callback, const void* buf, + size_t bytes, WriteFlags flags = WriteFlags::NONE) override { + writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags); + } + void writev(folly::AsyncWriter::WriteCallback*, const iovec*, + size_t, WriteFlags = WriteFlags::NONE) override { + throw std::runtime_error("writev is not supported. Please use writeChain."); + } + void writeChain(folly::AsyncWriter::WriteCallback* callback, + std::unique_ptr&& buf, + WriteFlags flags = WriteFlags::NONE) override; + + private: + void handlerReady(uint16_t events) noexcept override; + void handleWrite(); + void failAllWrites(const AsyncSocketException& ex); + + int fd_; + std::list> queue_; + bool closeOnEmpty_{false}; + std::function closeCb_; + + ~AsyncPipeWriter() { + closeNow(); + } +}; + +} // folly diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h index b45ed96d..e4a6c29b 100644 --- a/folly/io/async/AsyncTransport.h +++ b/folly/io/async/AsyncTransport.h @@ -331,12 +331,8 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase { virtual ~AsyncTransport() = default; }; -// Transitional intermediate interface. This is deprecated. -// Wrapper around folly::AsyncTransport, that includes read/write callbacks -class AsyncTransportWrapper : virtual public AsyncTransport { +class AsyncReader { public: - typedef std::unique_ptr UniquePtr; - class ReadCallback { public: virtual ~ReadCallback() = default; @@ -453,6 +449,16 @@ class AsyncTransportWrapper : virtual public AsyncTransport { virtual void readErr(const AsyncSocketException& ex) noexcept = 0; }; + // Read methods that aren't part of AsyncTransport. + virtual void setReadCB(ReadCallback* callback) = 0; + virtual ReadCallback* getReadCallback() const = 0; + + protected: + virtual ~AsyncReader() = default; +}; + +class AsyncWriter { + public: class WriteCallback { public: virtual ~WriteCallback() = default; @@ -480,10 +486,7 @@ class AsyncTransportWrapper : virtual public AsyncTransport { const AsyncSocketException& ex) noexcept = 0; }; - // Read/write methods that aren't part of AsyncTransport - virtual void setReadCB(ReadCallback* callback) = 0; - virtual ReadCallback* getReadCallback() const = 0; - + // Write methods that aren't part of AsyncTransport virtual void write(WriteCallback* callback, const void* buf, size_t bytes, WriteFlags flags = WriteFlags::NONE) = 0; virtual void writev(WriteCallback* callback, const iovec* vec, size_t count, @@ -491,6 +494,32 @@ class AsyncTransportWrapper : virtual public AsyncTransport { virtual void writeChain(WriteCallback* callback, std::unique_ptr&& buf, WriteFlags flags = WriteFlags::NONE) = 0; + + protected: + virtual ~AsyncWriter() = default; +}; + +// Transitional intermediate interface. This is deprecated. +// Wrapper around folly::AsyncTransport, that includes read/write callbacks +class AsyncTransportWrapper : virtual public AsyncTransport, + virtual public AsyncReader, + virtual public AsyncWriter { + public: + using UniquePtr = std::unique_ptr; + + // Alias for inherited members from AsyncReader and AsyncWriter + // to keep compatibility. + using ReadCallback = AsyncReader::ReadCallback; + using WriteCallback = AsyncWriter::WriteCallback; + virtual void setReadCB(ReadCallback* callback) override = 0; + virtual ReadCallback* getReadCallback() const override = 0; + virtual void write(WriteCallback* callback, const void* buf, size_t bytes, + WriteFlags flags = WriteFlags::NONE) override = 0; + virtual void writev(WriteCallback* callback, const iovec* vec, size_t count, + WriteFlags flags = WriteFlags::NONE) override = 0; + virtual void writeChain(WriteCallback* callback, + std::unique_ptr&& buf, + WriteFlags flags = WriteFlags::NONE) override = 0; }; } // folly diff --git a/folly/io/async/README.md b/folly/io/async/README.md index ce61f64e..ed03085d 100644 --- a/folly/io/async/README.md +++ b/folly/io/async/README.md @@ -256,8 +256,6 @@ clarity, we don't reuse the same fd as a socket to receive signals. ### AsyncPipe -TODO: not currently open souce - Async reads/writes to a unix pipe, to send data between processes. Why don't you just use AsyncSocket for now? @@ -353,4 +351,4 @@ Some best practices we've found: notify of overload, such as timeouts, or CPU usage. For sync systems, you are almost always limited by the number of threads. For more details see [No Time for - Asynchrony](https://www.usenix.org/legacy/event/hotos09/tech/full_papers/aguilera/aguilera.pdf) \ No newline at end of file + Asynchrony](https://www.usenix.org/legacy/event/hotos09/tech/full_papers/aguilera/aguilera.pdf) diff --git a/folly/io/async/test/AsyncPipeTest.cpp b/folly/io/async/test/AsyncPipeTest.cpp new file mode 100644 index 00000000..750e9aa9 --- /dev/null +++ b/folly/io/async/test/AsyncPipeTest.cpp @@ -0,0 +1,144 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +using namespace testing; + +namespace { + +class TestReadCallback : public folly::AsyncReader::ReadCallback { + public: + void readDataAvailable(size_t len) noexcept override { + readBuffer_.postallocate(len); + } + + void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override { + auto res = readBuffer_.preallocate(4000, 65000); + *bufReturn = res.first; + *lenReturn = res.second; + } + + void readEOF() noexcept override {} + + void readErr(const folly::AsyncSocketException&) noexcept override { + error_ = true; + } + + std::string getData() { + auto buf = readBuffer_.move(); + buf->coalesce(); + return std::string((char *)buf->data(), buf->length()); + } + + folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()}; + bool error_{false}; +}; + +class TestWriteCallback : public folly::AsyncWriter::WriteCallback { + public: + void writeSuccess() noexcept override { writes_++; } + + void writeErr(size_t, const folly::AsyncSocketException&) noexcept override { + error_ = true; + } + + uint32_t writes_{0}; + bool error_{false}; +}; + +class AsyncPipeTest: public Test { + public: + void SetUp() override { + int rc = pipe(pipeFds_); + EXPECT_EQ(rc, 0); + + EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0); + EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0); + reader_ = folly::make_unique( + &eventBase_, pipeFds_[0]); + writer_ = folly::make_unique( + &eventBase_, pipeFds_[1]); + } + + protected: + folly::EventBase eventBase_; + int pipeFds_[2]; + folly::AsyncPipeReader::UniquePtr reader_; + folly::AsyncPipeWriter::UniquePtr writer_; + TestReadCallback readCallback_; + TestWriteCallback writeCallback_; +}; + +std::unique_ptr getBuf(const std::string& data) { + auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length()); + return buf; +} + +} // anonymous namespace + + +TEST_F(AsyncPipeTest, simple) { + reader_->setReadCB(&readCallback_); + writer_->write(getBuf("hello"), &writeCallback_); + writer_->closeOnEmpty(); + eventBase_.loop(); + EXPECT_EQ(readCallback_.getData(), "hello"); + EXPECT_FALSE(readCallback_.error_); + EXPECT_EQ(writeCallback_.writes_, 1); + EXPECT_FALSE(writeCallback_.error_); +} + +TEST_F(AsyncPipeTest, blocked_writes) { + uint32_t writeAttempts = 0; + do { + ++writeAttempts; + writer_->write(getBuf("hello"), &writeCallback_); + } while (writeCallback_.writes_ == writeAttempts); + // there is one blocked write + writer_->closeOnEmpty(); + + reader_->setReadCB(&readCallback_); + + eventBase_.loop(); + std::string expected; + for (uint32_t i = 0; i < writeAttempts; i++) { + expected += "hello"; + } + EXPECT_EQ(readCallback_.getData(), expected); + EXPECT_FALSE(readCallback_.error_); + EXPECT_EQ(writeCallback_.writes_, writeAttempts); + EXPECT_FALSE(writeCallback_.error_); +} + +TEST_F(AsyncPipeTest, writeOnClose) { + reader_->setReadCB(&readCallback_); + writer_->write(getBuf("hello"), &writeCallback_); + writer_->closeOnEmpty(); + writer_->write(getBuf("hello"), &writeCallback_); + eventBase_.loop(); + EXPECT_EQ(readCallback_.getData(), "hello"); + EXPECT_FALSE(readCallback_.error_); + EXPECT_EQ(writeCallback_.writes_, 1); + EXPECT_TRUE(writeCallback_.error_); +}