2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncPipe.h>
18 #include <folly/FileUtil.h>
19 #include <folly/io/async/AsyncSocketException.h>
22 using std::unique_ptr;
24 using folly::IOBufQueue;
28 AsyncPipeReader::~AsyncPipeReader() {
32 void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
33 VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
34 "): failed while reading: " << ex.what();
36 DCHECK(readCallback_ != nullptr);
37 AsyncReader::ReadCallback* callback = readCallback_;
38 readCallback_ = nullptr;
39 callback->readErr(ex);
43 void AsyncPipeReader::close() {
57 void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
58 DestructorGuard dg(this);
59 CHECK(events & EventHandler::READ);
61 VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62 assert(readCallback_ != nullptr);
64 while (readCallback_) {
65 // Get the buffer to read into.
69 readCallback_->getReadBuffer(&buf, &buflen);
70 } catch (const std::exception& ex) {
71 AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
72 string("ReadCallback::getReadBuffer() "
73 "threw exception: ") + ex.what());
77 AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
78 string("ReadCallback::getReadBuffer() "
79 "threw non-exception type"));
83 if (buf == nullptr || buflen == 0) {
84 AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
85 string("ReadCallback::getReadBuffer() "
86 "returned empty buffer"));
92 ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
94 readCallback_->readDataAvailable(bytesRead);
95 // Fall through and continue around the loop if the read
96 // completely filled the available buffer.
97 // Note that readCallback_ may have been uninstalled or changed inside
98 // readDataAvailable().
99 if (static_cast<size_t>(bytesRead) < buflen) {
102 } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
103 // No more data to read right now.
105 } else if (bytesRead < 0) {
106 AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
107 "read failed", errno);
111 assert(bytesRead == 0);
115 AsyncReader::ReadCallback* callback = readCallback_;
116 readCallback_ = nullptr;
120 // Max reads per loop?
125 void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
126 AsyncWriter::WriteCallback* callback) {
129 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
130 "attempt to write to closed pipe");
131 callback->writeErr(0, ex);
135 bool wasEmpty = (queue_.empty());
136 folly::IOBufQueue iobq;
137 iobq.append(std::move(buf));
138 std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
139 std::move(iobq), callback);
140 queue_.emplace_back(std::move(p));
144 CHECK(!queue_.empty());
145 CHECK(isHandlerRegistered());
149 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
150 std::unique_ptr<folly::IOBuf>&& buf,
152 write(std::move(buf), callback);
155 void AsyncPipeWriter::closeOnEmpty() {
156 VLOG(5) << "close on empty";
157 if (queue_.empty()) {
160 closeOnEmpty_ = true;
161 CHECK(isHandlerRegistered());
165 void AsyncPipeWriter::closeNow() {
166 VLOG(5) << "close now";
167 if (!queue_.empty()) {
168 failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
169 "closed with pending writes"));
183 void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
184 DestructorGuard dg(this);
185 while (!queue_.empty()) {
186 // the first entry of the queue could have had a partial write, but needs to
188 if (queue_.front().second) {
189 queue_.front().second->writeErr(0, ex);
196 void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
197 CHECK(events & EventHandler::WRITE);
202 void AsyncPipeWriter::handleWrite() {
203 DestructorGuard dg(this);
204 assert(!queue_.empty());
206 auto& front = queue_.front();
207 folly::IOBufQueue &curQueue = front.first;
208 DCHECK(!curQueue.empty());
209 // someday, support writev. The logic for partial writes is a bit complex
210 const IOBuf* head = curQueue.front();
211 CHECK(head->length());
212 ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
214 if (errno == EAGAIN || errno == EWOULDBLOCK) {
216 VLOG(5) << "write blocked";
217 registerHandler(EventHandler::WRITE);
220 failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
221 "write failed", errno));
225 } else if (rc == 0) {
226 registerHandler(EventHandler::WRITE);
229 curQueue.trimStart(rc);
230 if (curQueue.empty()) {
231 auto cb = front.second;
237 VLOG(5) << "partial write blocked";
239 } while (!queue_.empty());