2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <system_error>
22 #include <folly/io/IOBufQueue.h>
23 #include <folly/io/async/AsyncTransport.h>
24 #include <folly/io/async/DelayedDestruction.h>
25 #include <folly/io/async/EventHandler.h>
29 class AsyncSocketException;
32 * Read from a pipe in an async manner.
34 class AsyncPipeReader : public EventHandler,
36 public DelayedDestruction {
38 typedef std::unique_ptr<AsyncPipeReader,
39 folly::DelayedDestruction::Destructor> UniquePtr;
41 template <typename... Args>
42 static UniquePtr newReader(Args&&... args) {
43 return UniquePtr(new AsyncPipeReader(std::forward<Args>(args)...));
46 AsyncPipeReader(folly::EventBase* eventBase, int pipeFd)
47 : EventHandler(eventBase, pipeFd),
51 * Set the read callback and automatically install/uninstall the handler
54 void setReadCB(AsyncReader::ReadCallback* callback) override {
55 if (callback == readCallback_) {
58 readCallback_ = callback;
59 if (readCallback_ && !isHandlerRegistered()) {
60 registerHandler(EventHandler::READ | EventHandler::PERSIST);
61 } else if (!readCallback_ && isHandlerRegistered()) {
67 * Get the read callback
69 AsyncReader::ReadCallback* getReadCallback() const override {
74 * Set a special hook to close the socket (otherwise, will call close())
76 void setCloseCallback(std::function<void(int)> closeCb) {
81 ~AsyncPipeReader() override;
83 void handlerReady(uint16_t events) noexcept override;
84 void failRead(const AsyncSocketException& ex);
88 AsyncReader::ReadCallback* readCallback_{nullptr};
89 std::function<void(int)> closeCb_;
93 * Write to a pipe in an async manner.
95 class AsyncPipeWriter : public EventHandler,
97 public DelayedDestruction {
99 typedef std::unique_ptr<AsyncPipeWriter,
100 folly::DelayedDestruction::Destructor> UniquePtr;
102 template <typename... Args>
103 static UniquePtr newWriter(Args&&... args) {
104 return UniquePtr(new AsyncPipeWriter(std::forward<Args>(args)...));
107 AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd)
108 : EventHandler(eventBase, pipeFd),
112 * Asynchronously write the given iobuf to this pipe, and invoke the callback
115 void write(std::unique_ptr<folly::IOBuf> iob,
116 AsyncWriter::WriteCallback* wcb = nullptr);
119 * Set a special hook to close the socket (otherwise, will call close())
121 void setCloseCallback(std::function<void(int)> closeCb) {
126 * Returns true if the pipe is closed
128 bool closed() const {
129 return (fd_ < 0 || closeOnEmpty_);
133 * Notify the pipe to close as soon as all pending writes complete
138 * Close the pipe immediately, and fail all pending writes
143 * Return true if there are currently writes pending (eg: the pipe is blocked
146 bool hasPendingWrites() const {
147 return !queue_.empty();
150 // AsyncWriter methods
151 void write(folly::AsyncWriter::WriteCallback* callback,
154 WriteFlags flags = WriteFlags::NONE) override {
155 writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
157 void writev(folly::AsyncWriter::WriteCallback*,
160 WriteFlags = WriteFlags::NONE) override {
161 throw std::runtime_error("writev is not supported. Please use writeChain.");
163 void writeChain(folly::AsyncWriter::WriteCallback* callback,
164 std::unique_ptr<folly::IOBuf>&& buf,
165 WriteFlags flags = WriteFlags::NONE) override;
168 void handlerReady(uint16_t events) noexcept override;
170 void failAllWrites(const AsyncSocketException& ex);
173 std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
174 bool closeOnEmpty_{false};
175 std::function<void(int)> closeCb_;
177 ~AsyncPipeWriter() override {