2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/io/async/AsyncTransport.h>
19 #include <folly/io/async/EventHandler.h>
20 #include <folly/io/async/DelayedDestruction.h>
21 #include <folly/io/IOBufQueue.h>
24 #include <system_error>
28 class AsyncSocketException;
31 * Read from a pipe in an async manner.
33 class AsyncPipeReader : public EventHandler,
35 public DelayedDestruction {
37 typedef std::unique_ptr<AsyncPipeReader,
38 folly::DelayedDestruction::Destructor> UniquePtr;
40 template <typename... Args>
41 static UniquePtr newReader(Args&&... args) {
42 return UniquePtr(new AsyncPipeReader(std::forward<Args>(args)...));
45 AsyncPipeReader(folly::EventBase* eventBase, int pipeFd)
46 : EventHandler(eventBase, pipeFd),
50 * Set the read callback and automatically install/uninstall the handler
53 void setReadCB(AsyncReader::ReadCallback* callback) override {
54 if (callback == readCallback_) {
57 readCallback_ = callback;
58 if (readCallback_ && !isHandlerRegistered()) {
59 registerHandler(EventHandler::READ | EventHandler::PERSIST);
60 } else if (!readCallback_ && isHandlerRegistered()) {
66 * Get the read callback
68 AsyncReader::ReadCallback* getReadCallback() const override {
73 * Set a special hook to close the socket (otherwise, will call close())
75 void setCloseCallback(std::function<void(int)> closeCb) {
82 void handlerReady(uint16_t events) noexcept override;
83 void failRead(const AsyncSocketException& ex);
87 AsyncReader::ReadCallback* readCallback_{nullptr};
88 std::function<void(int)> closeCb_;
92 * Write to a pipe in an async manner.
94 class AsyncPipeWriter : public EventHandler,
96 public DelayedDestruction {
98 typedef std::unique_ptr<AsyncPipeWriter,
99 folly::DelayedDestruction::Destructor> UniquePtr;
101 template <typename... Args>
102 static UniquePtr newWriter(Args&&... args) {
103 return UniquePtr(new AsyncPipeWriter(std::forward<Args>(args)...));
106 AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd)
107 : EventHandler(eventBase, pipeFd),
111 * Asynchronously write the given iobuf to this pipe, and invoke the callback
114 void write(std::unique_ptr<folly::IOBuf> iob,
115 AsyncWriter::WriteCallback* wcb = nullptr);
118 * Set a special hook to close the socket (otherwise, will call close())
120 void setCloseCallback(std::function<void(int)> closeCb) {
125 * Returns true if the pipe is closed
127 bool closed() const {
128 return (fd_ < 0 || closeOnEmpty_);
132 * Notify the pipe to close as soon as all pending writes complete
137 * Close the pipe immediately, and fail all pending writes
142 * Return true if there are currently writes pending (eg: the pipe is blocked
145 bool hasPendingWrites() const {
146 return !queue_.empty();
149 // AsyncWriter methods
150 void write(folly::AsyncWriter::WriteCallback* callback,
153 WriteFlags flags = WriteFlags::NONE) override {
154 writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
156 void writev(folly::AsyncWriter::WriteCallback*,
159 WriteFlags = WriteFlags::NONE) override {
160 throw std::runtime_error("writev is not supported. Please use writeChain.");
162 void writeChain(folly::AsyncWriter::WriteCallback* callback,
163 std::unique_ptr<folly::IOBuf>&& buf,
164 WriteFlags flags = WriteFlags::NONE) override;
167 void handlerReady(uint16_t events) noexcept override;
169 void failAllWrites(const AsyncSocketException& ex);
172 std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
173 bool closeOnEmpty_{false};
174 std::function<void(int)> closeCb_;