2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncPipe.h>
18 #include <folly/FileUtil.h>
19 #include <folly/io/async/AsyncSocketException.h>
22 using std::unique_ptr;
24 using folly::IOBufQueue;
// AsyncPipeReader destructor. Only the opening line is present in this
// excerpt; the body is not visible here, so its behavior is not described.
28 AsyncPipeReader::~AsyncPipeReader() {
// Reports a read failure to the currently installed read callback.
//
// Logs the failure at verbose level 5 (with `this`, fd_ and ex.what()),
// detaches the callback from the reader, then invokes readErr(ex) on it.
//
// @param ex  the error forwarded unchanged to ReadCallback::readErr().
32 void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
33 VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
34 "): failed while reading: " << ex.what();
// A callback is expected to be installed here; this is checked with DCHECK.
36 DCHECK(readCallback_ != nullptr);
// Copy the pointer and clear the member first, so readCallback_ is already
// null by the time readErr() runs.
// NOTE(review): presumably this lets readErr() install a new callback or tear
// the reader down safely — confirm against the ReadCallback contract.
37 AsyncReader::ReadCallback* callback = readCallback_;
38 readCallback_ = nullptr;
39 callback->readErr(ex);
// AsyncPipeReader::close(). Only the opening line is present in this excerpt;
// the body is not visible here, so its behavior is not described.
43 void AsyncPipeReader::close() {
// Event-handler entry point for the reader; called with the set of ready
// events.
//
// While a read callback is installed, each loop iteration obtains a
// destination buffer, reads from fd_ into it, and hands the result to the
// callback. Several branch headers, `try` lines, `return`s and closing braces
// of this function are not present in this excerpt, so the notes below
// describe only the visible statements.
//
// @param events  bitmask of ready events; must include EventHandler::READ.
57 void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
// NOTE(review): presumably keeps this object alive across the callbacks
// invoked below — confirm against DestructorGuard's documentation.
58 DestructorGuard dg(this);
// This handler only services read readiness.
59 CHECK(events & EventHandler::READ);
61 VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62 assert(readCallback_ != nullptr);
// The callback can be cleared or replaced during an iteration (see the note
// before the short-read check below), so it is re-tested on every pass.
64 while (readCallback_) {
65 // - What API does callback support?
66 const auto movable = readCallback_->isBufferMovable(); // noexcept
68 // Get the buffer to read into.
71 std::unique_ptr<IOBuf> ioBuf;
// Path A: allocate an IOBuf sized by the callback's maxBufferSize() and read
// straight into its writable region, using its capacity as the length.
// NOTE(review): the condition selecting path A vs. path B is not visible in
// this excerpt; presumably it is `movable` — confirm.
74 ioBuf = IOBuf::create(readCallback_->maxBufferSize());
75 buf = ioBuf->writableBuffer();
76 buflen = ioBuf->capacity();
// Path B: ask the callback to supply the destination buffer.
79 readCallback_->getReadBuffer(&buf, &buflen);
// A std::exception thrown by getReadBuffer() is turned into a BAD_ARGS
// AsyncSocketException. What is done with `aex` afterwards is not visible in
// this excerpt.
80 } catch (const std::exception& ex) {
81 AsyncSocketException aex(
82 AsyncSocketException::BAD_ARGS,
83 string("ReadCallback::getReadBuffer() "
84 "threw exception: ") +
// Second handler (its `catch` line is not visible in this excerpt): per the
// message, getReadBuffer() threw something that is not a std::exception.
89 AsyncSocketException aex(
90 AsyncSocketException::BAD_ARGS,
91 string("ReadCallback::getReadBuffer() "
92 "threw non-exception type"));
// A null or zero-length buffer is rejected as INVALID_STATE.
96 if (buf == nullptr || buflen == 0) {
97 AsyncSocketException aex(
98 AsyncSocketException::INVALID_STATE,
99 string("ReadCallback::getReadBuffer() "
100 "returned empty buffer"));
// Read from the pipe descriptor into the chosen buffer.
// NOTE(review): readNoInt presumably retries on EINTR — confirm in
// folly/FileUtil.h.
107 ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
// Delivery via the IOBuf API: record the bytes read in the IOBuf, then move it
// to the callback. The branch header guarding these lines is not visible.
111 ioBuf->append(bytesRead);
112 readCallback_->readBufferAvailable(std::move(ioBuf));
// Delivery via the caller-supplied-buffer API: report only the byte count.
114 readCallback_->readDataAvailable(bytesRead);
116 // Fall through and continue around the loop if the read
117 // completely filled the available buffer.
118 // Note that readCallback_ may have been uninstalled or changed inside
119 // readDataAvailable().
// Short read: fewer bytes than the buffer could hold.
120 if (static_cast<size_t>(bytesRead) < buflen) {
123 } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
124 // No more data to read right now.
// Any other negative result is reported as INVALID_STATE carrying errno.
126 } else if (bytesRead < 0) {
127 AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
128 "read failed", errno);
// Remaining case: the read returned exactly 0.
// NOTE(review): presumably end-of-file on the pipe — confirm.
132 assert(bytesRead == 0);
// Detach the callback from the reader before using it. The call made through
// `callback` is not visible in this excerpt.
136 AsyncReader::ReadCallback* callback = readCallback_;
137 readCallback_ = nullptr;
141 // Max reads per loop?
// Queues `buf` for writing to the pipe, paired with an optional completion
// callback.
//
// @param buf       data to write; ownership is taken and moved into the
//                  pending-write queue.
// @param callback  stored alongside the data.
//
// Some lines of this function (the condition guarding the closed-pipe error
// path, and whatever consumes `wasEmpty`) are not present in this excerpt.
146 void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
147 AsyncWriter::WriteCallback* callback) {
// Error path: report NOT_OPEN to the callback with 0 bytes written.
// NOTE(review): failAllWrites() null-checks the stored callback before calling
// writeErr(); confirm the elided lines guard `callback` the same way here.
150 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
151 "attempt to write to closed pipe");
152 callback->writeErr(0, ex);
// Capture emptiness BEFORE enqueueing; the use of this flag is not visible in
// this excerpt.
156 bool wasEmpty = (queue_.empty());
// Wrap the buffer in its own IOBufQueue and append {queue, callback} as one
// pending-write entry.
157 folly::IOBufQueue iobq;
158 iobq.append(std::move(buf));
159 std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
160 std::move(iobq), callback);
161 queue_.emplace_back(std::move(p));
// Invariants at this point: something is pending and the write handler is
// registered.
165 CHECK(!queue_.empty());
166 CHECK(isHandlerRegistered());
// Forwards to write(), passing `buf` (moved) and `callback` with the argument
// order swapped to match write()'s signature.
// The final line of the parameter list is not present in this excerpt; the
// visible body forwards only `buf` and `callback`.
170 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
171 std::unique_ptr<folly::IOBuf>&& buf,
173 write(std::move(buf), callback);
// Handles a close request that depends on whether writes are still queued.
//
// Visible behavior: logs at verbose level 5, tests whether the queue is empty
// (the body of that branch is not present in this excerpt), sets
// closeOnEmpty_ and checks that the write handler is registered.
// NOTE(review): the flag assignment presumably sits in the `else` branch of
// the emptiness test — confirm against the full source.
176 void AsyncPipeWriter::closeOnEmpty() {
177 VLOG(5) << "close on empty";
178 if (queue_.empty()) {
// Record that a close was requested while writes may still be pending.
181 closeOnEmpty_ = true;
182 CHECK(isHandlerRegistered());
// Handles an immediate close request.
//
// Visible behavior: logs at verbose level 5 and, if writes are still queued,
// fails all of them with a NOT_OPEN error ("closed with pending writes").
// The remainder of the function is not present in this excerpt.
186 void AsyncPipeWriter::closeNow() {
187 VLOG(5) << "close now";
188 if (!queue_.empty()) {
189 failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
190 "closed with pending writes"));
// Fails pending writes with the given error.
//
// Loops while the queue is non-empty and, for the front entry, calls
// writeErr(0, ex) on its callback if one was supplied. The reported byte count
// is always 0. The statement that removes the entry from the queue is not
// present in this excerpt.
//
// @param ex  the error passed to each callback's writeErr().
204 void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
// NOTE(review): presumably keeps this object alive while callbacks run —
// confirm against DestructorGuard's documentation.
205 DestructorGuard dg(this);
206 while (!queue_.empty()) {
207 // the first entry of the queue could have had a partial write, but needs to
// Entries may carry a null callback; only notify when one is present.
209 if (queue_.front().second) {
210 queue_.front().second->writeErr(0, ex);
// Event-handler entry point for the writer.
//
// @param events  bitmask of ready events; must include EventHandler::WRITE.
//
// Only the readiness check is present in this excerpt; the rest of the body
// is not visible here.
217 void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
218 CHECK(events & EventHandler::WRITE);
// Writes pending data from the front of the queue to fd_.
//
// Each pass takes the front entry, writes its first IOBuf with one
// writeNoInt() call, and trims the written bytes from that entry. The pass
// repeats while the queue is non-empty. Several lines of this function (the
// loop opener, the `rc < 0` test, `return`s, the dequeue/notify step and
// everything after the loop) are not present in this excerpt.
223 void AsyncPipeWriter::handleWrite() {
// NOTE(review): presumably keeps this object alive while callbacks run —
// confirm against DestructorGuard's documentation.
224 DestructorGuard dg(this);
225 assert(!queue_.empty());
// Each queue entry is {IOBufQueue of data, callback}.
227 auto& front = queue_.front();
228 folly::IOBufQueue &curQueue = front.first;
229 DCHECK(!curQueue.empty());
230 // someday, support writev. The logic for partial writes is a bit complex
// Only the first buffer of the entry is written per call; it must be
// non-empty.
231 const IOBuf* head = curQueue.front();
232 CHECK(head->length());
233 ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
// The write would block: log it and (re-)register for write readiness.
235 if (errno == EAGAIN || errno == EWOULDBLOCK) {
237 VLOG(5) << "write blocked";
238 registerHandler(EventHandler::WRITE);
// Any other errno: fail every pending write with INTERNAL_ERROR.
241 failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
242 "write failed", errno));
// Nothing was written: register for write readiness.
246 } else if (rc == 0) {
247 registerHandler(EventHandler::WRITE);
// Drop the bytes that were written from the front of this entry.
250 curQueue.trimStart(rc);
// Entry fully written: take its callback. What is done with `cb` is not
// visible in this excerpt.
251 if (curQueue.empty()) {
252 auto cb = front.second;
// Otherwise data remains in this entry after the write.
258 VLOG(5) << "partial write blocked";
// Repeat while entries remain queued.
260 } while (!queue_.empty());