--- /dev/null
+/*
+ * Copyright 2004-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/logging/AsyncFileWriter.h>
+
+#include <folly/Exception.h>
+#include <folly/FileUtil.h>
+#include <folly/experimental/logging/LoggerDB.h>
+
+using folly::File;
+using folly::StringPiece;
+
+namespace folly {
+
+AsyncFileWriter::AsyncFileWriter(StringPiece path)
+ : AsyncFileWriter{File{path.str(), O_WRONLY | O_APPEND | O_CREAT}} {}
+
+AsyncFileWriter::AsyncFileWriter(folly::File&& file)
+ : file_{std::move(file)}, ioThread_([this] { ioThread(); }) {}
+
+AsyncFileWriter::~AsyncFileWriter() {
+ data_->stop = true;
+ messageReady_.notify_one();
+ ioThread_.join();
+}
+
+void AsyncFileWriter::writeMessage(StringPiece buffer) {
+ return writeMessage(buffer.str());
+}
+
+void AsyncFileWriter::writeMessage(std::string&& buffer) {
+ auto data = data_.lock();
+ if (data->currentBufferSize >= data->maxBufferBytes) {
+ ++data->numDiscarded;
+ return;
+ }
+
+ data->currentBufferSize += buffer.size();
+ auto* queue = data->getCurrentQueue();
+ queue->emplace_back(std::move(buffer));
+ messageReady_.notify_one();
+}
+
+void AsyncFileWriter::flush() {
+ auto data = data_.lock();
+ auto start = data->ioThreadCounter;
+
+ // Wait until ioThreadCounter increments by at least two.
+ // Waiting for a single increment is not sufficient, as this happens after
+ // the I/O thread has swapped the queues, which is before it has actually
+ // done the I/O.
+ while (data->ioThreadCounter < start + 2) {
+ if (data->ioThreadDone) {
+ return;
+ }
+
+ // Enqueue an empty string and wake the I/O thread.
+ // The empty string ensures that the I/O thread will break out of its wait
+ // loop and increment the ioThreadCounter, even if there is no other work
+ // to do.
+ data->getCurrentQueue()->emplace_back();
+ messageReady_.notify_one();
+
+ // Wait for notification from the I/O thread that it has done work.
+ ioCV_.wait(data.getUniqueLock());
+ }
+}
+
+void AsyncFileWriter::ioThread() {
+ while (true) {
+ // With the lock held, grab a pointer to the current queue, then increment
+ // the ioThreadCounter index so that other threads will write into the
+ // other queue as we process this one.
+ std::vector<std::string>* ioQueue;
+ size_t numDiscarded;
+ bool stop;
+ {
+ auto data = data_.lock();
+ ioQueue = data->getCurrentQueue();
+ while (ioQueue->empty() && !data->stop) {
+ messageReady_.wait(data.getUniqueLock());
+ }
+
+ ++data->ioThreadCounter;
+ numDiscarded = data->numDiscarded;
+ data->numDiscarded = 0;
+ data->currentBufferSize = 0;
+ stop = data->stop;
+ }
+ ioCV_.notify_all();
+
+ // Write the log messages now that we have released the lock
+ try {
+ performIO(ioQueue);
+ } catch (const std::exception& ex) {
+ onIoError(ex);
+ }
+
+ // clear() empties the vector, but the allocated capacity remains so we can
+ // just reuse it without having to re-allocate in most cases.
+ ioQueue->clear();
+
+ if (numDiscarded > 0) {
+ auto msg = getNumDiscardedMsg(numDiscarded);
+ if (!msg.empty()) {
+ auto ret = folly::writeFull(file_.fd(), msg.data(), msg.size());
+ // We currently ignore errors from writeFull() here.
+ // There's not much we can really do.
+ (void)ret;
+ }
+ }
+
+ if (stop) {
+ data_->ioThreadDone = true;
+ break;
+ }
+ }
+}
+
+void AsyncFileWriter::performIO(std::vector<std::string>* ioQueue) {
+ // kNumIovecs controls the maximum number of strings we write at once in a
+ // single writev() call.
+ constexpr int kNumIovecs = 64;
+ std::array<iovec, kNumIovecs> iovecs;
+
+ size_t idx = 0;
+ while (idx < ioQueue->size()) {
+ int numIovecs = 0;
+ while (numIovecs < kNumIovecs && idx < ioQueue->size()) {
+ const auto& str = (*ioQueue)[idx];
+ iovecs[numIovecs].iov_base = const_cast<char*>(str.data());
+ iovecs[numIovecs].iov_len = str.size();
+ ++numIovecs;
+ ++idx;
+ }
+
+ auto ret = folly::writevFull(file_.fd(), iovecs.data(), numIovecs);
+ folly::checkUnixError(ret, "writeFull() failed");
+ }
+}
+
+void AsyncFileWriter::onIoError(const std::exception& ex) {
+ LoggerDB::internalWarning(
+ __FILE__,
+ __LINE__,
+ "error writing to log file ",
+ file_.fd(),
+ " in AsyncFileWriter: ",
+ folly::exceptionStr(ex));
+}
+
+std::string AsyncFileWriter::getNumDiscardedMsg(size_t numDiscarded) {
+ // We may want to make this customizable in the future (e.g., to allow it to
+ // conform to the LogFormatter style being used).
+ // For now just return a simple fixed message.
+ return folly::to<std::string>(
+ numDiscarded,
+ " log messages discarded: logging faster than we can write\n");
+}
+}
--- /dev/null
+/*
+ * Copyright 2004-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+#include <folly/File.h>
+#include <folly/Range.h>
+#include <folly/Synchronized.h>
+#include <folly/experimental/logging/LogWriter.h>
+
+namespace folly {
+
+/**
+ * A LogWriter implementation that asynchronously writes to a file descriptor.
+ *
+ * This class performs the log I/O in a separarate thread.
+ *
+ * The advantage of this class over ImmediateFileWriter is that logging I/O can
+ * never slow down or block your normal program operation. If log messages are
+ * generated faster than they can be written, messages will be dropped (and an
+ * indication of how many messages were dropped will be written to the log file
+ * when we are able to catch up a bit.)
+ *
+ * However, one downside is that if your program crashes, not all log messages
+ * may have been written, so you may lose messages generated immediately before
+ * the crash.
+ */
+class AsyncFileWriter : public LogWriter {
+ public:
+ /**
+ * Construct an AsyncFileWriter that appends to the file at the specified
+ * path.
+ */
+ explicit AsyncFileWriter(folly::StringPiece path);
+
+ /**
+ * Construct an AsyncFileWriter that writes to the specified File object.
+ */
+ explicit AsyncFileWriter(folly::File&& file);
+
+ ~AsyncFileWriter();
+
+ void writeMessage(folly::StringPiece buffer) override;
+ void writeMessage(std::string&& buffer) override;
+
+ /**
+ * Block until the I/O thread has finished writing all messages that
+ * were already enqueued when flush() was called.
+ */
+ void flush();
+
+ private:
+ /*
+ * A simple implementation using two queues.
+ * All writer threads enqueue into one queue while the I/O thread is
+ * processing the other.
+ *
+ * We could potentially also provide an implementation using folly::MPMCQueue
+ * in the future, which may improve contention under very high write loads.
+ */
+ struct Data {
+ std::array<std::vector<std::string>, 2> queues;
+ bool stop{false};
+ bool ioThreadDone{false};
+ uint64_t ioThreadCounter{0};
+ size_t maxBufferBytes{1024 * 1024};
+ size_t currentBufferSize{0};
+ size_t numDiscarded{0};
+
+ std::vector<std::string>* getCurrentQueue() {
+ return &queues[ioThreadCounter & 0x1];
+ }
+ };
+
+ void ioThread();
+ void performIO(std::vector<std::string>* ioQueue);
+
+ void onIoError(const std::exception& ex);
+ std::string getNumDiscardedMsg(size_t numDiscarded);
+
+ folly::File file_;
+ folly::Synchronized<Data, std::mutex> data_;
+ /**
+ * messageReady_ is signaled by writer threads whenever they add a new
+ * message to the current queue.
+ */
+ std::condition_variable messageReady_;
+ /**
+ * ioCV_ is signaled by the I/O thread each time it increments
+ * the ioThreadCounter (once each time around its loop).
+ */
+ std::condition_variable ioCV_;
+
+ /**
+ * The I/O thread.
+ *
+ * This should come last, since all other member variables need to be
+ * constructed before the I/O thread starts.
+ */
+ std::thread ioThread_;
+};
+}
--- /dev/null
+/*
+ * Copyright 2004-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/Conv.h>
+#include <folly/Exception.h>
+#include <folly/File.h>
+#include <folly/FileUtil.h>
+#include <folly/String.h>
+#include <folly/experimental/TestUtil.h>
+#include <folly/experimental/logging/AsyncFileWriter.h>
+#include <folly/experimental/logging/LoggerDB.h>
+#include <folly/portability/GFlags.h>
+#include <folly/portability/GMock.h>
+#include <folly/portability/GTest.h>
+#include <folly/portability/Unistd.h>
+
+DEFINE_int64(
+ async_discard_num_writer_threads,
+ 32,
+ "number of threads to use to generate log messages during "
+ "the AsyncFileWriter.discard test");
+DEFINE_int64(
+ async_discard_messages_per_writer,
+ 200000,
+ "number of messages each writer threads should generate in "
+ "the AsyncFileWriter.discard test");
+DEFINE_int64(
+ async_discard_read_sleep_usec,
+ 500,
+ "how long the read thread should sleep between reads in "
+ "the AsyncFileWriter.discard test");
+
+using namespace folly;
+using folly::test::TemporaryFile;
+
+TEST(AsyncFileWriter, noMessages) {
+ TemporaryFile tmpFile{"logging_test"};
+
+ // Test the simple construction and destruction of an AsyncFileWriter
+ // without ever writing any messages. This still exercises the I/O
+ // thread start-up and shutdown code.
+ AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
+}
+
+TEST(AsyncFileWriter, simpleMessages) {
+ TemporaryFile tmpFile{"logging_test"};
+
+ {
+ AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
+ for (int n = 0; n < 10; ++n) {
+ writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
+ sched_yield();
+ }
+ }
+
+ std::string data;
+ auto ret = folly::readFile(tmpFile.path().native().c_str(), data);
+ ASSERT_TRUE(ret);
+
+ std::string expected =
+ "message 0\n"
+ "message 1\n"
+ "message 2\n"
+ "message 3\n"
+ "message 4\n"
+ "message 5\n"
+ "message 6\n"
+ "message 7\n"
+ "message 8\n"
+ "message 9\n";
+ EXPECT_EQ(expected, data);
+}
+
+#ifndef _WIN32
+namespace {
+static std::vector<std::string>* internalWarnings;
+
+void handleLoggingError(
+ StringPiece /* file */,
+ int /* lineNumber */,
+ std::string&& msg) {
+ internalWarnings->emplace_back(std::move(msg));
+}
+}
+
+TEST(AsyncFileWriter, ioError) {
+ // Set the LoggerDB internal warning handler so we can record the messages
+ std::vector<std::string> logErrors;
+ internalWarnings = &logErrors;
+ LoggerDB::setInternalWarningHandler(handleLoggingError);
+
+ // Create an AsyncFileWriter that refers to a pipe whose read end is closed
+ std::array<int, 2> fds;
+ auto rc = pipe(fds.data());
+ folly::checkUnixError(rc, "failed to create pipe");
+ signal(SIGPIPE, SIG_IGN);
+ ::close(fds[0]);
+
+ // Log a bunch of messages to the writer
+ size_t numMessages = 100;
+ {
+ AsyncFileWriter writer{folly::File{fds[1], true}};
+ for (size_t n = 0; n < numMessages; ++n) {
+ writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
+ sched_yield();
+ }
+ }
+
+ LoggerDB::setInternalWarningHandler(nullptr);
+
+ // AsyncFileWriter should have some internal warning messages about the
+ // log failures. This will generally be many fewer than the number of
+ // messages we wrote, though, since it performs write batching.
+ for (const auto& msg : logErrors) {
+ EXPECT_THAT(
+ msg,
+ testing::ContainsRegex(
+ "error writing to log file .* in AsyncFileWriter.*: Broken pipe"));
+ }
+ EXPECT_GT(logErrors.size(), 0);
+ EXPECT_LE(logErrors.size(), numMessages);
+}
+#endif
+
+/**
+ * writeThread() writes a series of messages to the AsyncFileWriter
+ */
+void writeThread(AsyncFileWriter* writer, size_t id, size_t numMessages) {
+ for (size_t n = 0; n < numMessages; ++n) {
+ writer->writeMessage(
+ folly::to<std::string>("thread ", id, " message ", n + 1, '\n'));
+ }
+}
+
+class ReadStats {
+ public:
+ void check(size_t numThreads, size_t messagesPerThread) {
+ EXPECT_EQ("", trailingData_);
+ EXPECT_EQ(numThreads, writers_.size());
+ size_t totalMessagesReceived = 0;
+ for (const auto& writerData : writers_) {
+ EXPECT_LE(writerData.numMessages, messagesPerThread);
+ EXPECT_LE(writerData.lastId, messagesPerThread);
+ totalMessagesReceived += writerData.numMessages;
+ }
+
+ EXPECT_EQ(0, numUnableToParse_);
+ EXPECT_EQ(0, numOutOfOrder_);
+ EXPECT_EQ(
+ numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
+ }
+
+ void messageReceived(StringPiece msg) {
+ if (msg.endsWith(" log messages discarded: "
+ "logging faster than we can write")) {
+ auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
+ fprintf(stderr, "received discard notification: %zu\n", discardCount);
+ numDiscarded_ += discardCount;
+ return;
+ }
+
+ size_t threadID = 0;
+ size_t messageIndex = 0;
+ try {
+ parseMessage(msg, &threadID, &messageIndex);
+ } catch (const std::exception& ex) {
+ ++numUnableToParse_;
+ fprintf(
+ stderr,
+ "unable to parse log message: %s\n",
+ folly::humanify(msg.str()).c_str());
+ return;
+ }
+
+ if (threadID >= writers_.size()) {
+ writers_.resize(threadID + 1);
+ }
+ writers_[threadID].numMessages++;
+ if (messageIndex > writers_[threadID].lastId) {
+ writers_[threadID].lastId = messageIndex;
+ } else {
+ ++numOutOfOrder_;
+ fprintf(
+ stderr,
+ "received out-of-order messages from writer %zu: "
+ "%zu received after %zu\n",
+ threadID,
+ messageIndex,
+ writers_[threadID].lastId);
+ }
+ }
+
+ void trailingData(StringPiece data) {
+ trailingData_ = data.str();
+ }
+
+ private:
+ struct WriterStats {
+ size_t numMessages{0};
+ size_t lastId{0};
+ };
+
+ void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
+ constexpr StringPiece prefix{"thread "};
+ constexpr StringPiece middle{" message "};
+ if (!msg.startsWith(prefix)) {
+ throw std::runtime_error("bad message prefix");
+ }
+
+ auto idx = prefix.size();
+ auto end = msg.find(' ', idx);
+ if (end == StringPiece::npos) {
+ throw std::runtime_error("no middle found");
+ }
+
+ *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
+ auto rest = msg.subpiece(end);
+ if (!rest.startsWith(middle)) {
+ throw std::runtime_error("bad message middle");
+ }
+
+ rest.advance(middle.size());
+ *messageIndex = folly::to<size_t>(rest);
+ }
+
+ std::vector<WriterStats> writers_;
+ std::string trailingData_;
+ size_t numUnableToParse_{0};
+ size_t numOutOfOrder_{0};
+ size_t numDiscarded_{0};
+};
+
+/**
+ * readThread() reads messages slowly from a pipe. This helps test the
+ * AsyncFileWriter behavior when I/O is slow.
+ */
+void readThread(folly::File&& file, ReadStats* stats) {
+ std::vector<char> buffer;
+ buffer.resize(1024);
+
+ size_t bufferIdx = 0;
+ while (true) {
+ /* sleep override */
+ usleep(FLAGS_async_discard_read_sleep_usec);
+
+ auto readResult = folly::readNoInt(
+ file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
+ if (readResult < 0) {
+ fprintf(stderr, "error reading from pipe: %d\n", errno);
+ return;
+ }
+ if (readResult == 0) {
+ fprintf(stderr, "read EOF\n");
+ break;
+ }
+
+ auto logDataLen = bufferIdx + readResult;
+ StringPiece logData{buffer.data(), logDataLen};
+ auto idx = 0;
+ while (true) {
+ auto end = logData.find('\n', idx);
+ if (end == StringPiece::npos) {
+ bufferIdx = logDataLen - idx;
+ memmove(buffer.data(), buffer.data() + idx, bufferIdx);
+ break;
+ }
+
+ StringPiece logMsg{logData.data() + idx, end - idx};
+ stats->messageReceived(logMsg);
+ idx = end + 1;
+ }
+ }
+
+ if (bufferIdx != 0) {
+ stats->trailingData(StringPiece{buffer.data(), bufferIdx});
+ }
+}
+
+/*
+ * The discard test spawns a number of threads that each write a large number
+ * of messages quickly. The AsyncFileWriter writes to a pipe, an a separate
+ * thread reads from it slowly, causing a backlog to build up.
+ *
+ * The test then checks that:
+ * - The read thread always receives full messages (no partial log messages)
+ * - Messages that are received are received in order
+ * - The number of messages received plus the number reported in discard
+ * notifications matches the number of messages sent.
+ */
+TEST(AsyncFileWriter, discard) {
+ std::array<int, 2> fds;
+ auto pipeResult = pipe(fds.data());
+ folly::checkUnixError(pipeResult, "pipe failed");
+ folly::File readPipe{fds[0], true};
+ folly::File writePipe{fds[1], true};
+
+ ReadStats readStats;
+ std::thread reader(readThread, std::move(readPipe), &readStats);
+ {
+ AsyncFileWriter writer{std::move(writePipe)};
+
+ std::vector<std::thread> writeThreads;
+ for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
+ writeThreads.emplace_back(
+ writeThread, &writer, n, FLAGS_async_discard_messages_per_writer);
+ }
+
+ for (auto& t : writeThreads) {
+ t.join();
+ }
+ fprintf(stderr, "writers done\n");
+ }
+ reader.join();
+ readStats.check(
+ FLAGS_async_discard_num_writer_threads,
+ FLAGS_async_discard_messages_per_writer);
+}