#include <folly/File.h>
#include <folly/FileUtil.h>
#include <folly/String.h>
+#include <folly/Synchronized.h>
#include <folly/experimental/TestUtil.h>
#include <folly/experimental/logging/AsyncFileWriter.h>
+#include <folly/experimental/logging/Init.h>
#include <folly/experimental/logging/LoggerDB.h>
+#include <folly/experimental/logging/xlog.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
+#include <folly/init/Init.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Unistd.h>
+DEFINE_string(logging, "", "folly::logging configuration");
DEFINE_int64(
- async_discard_num_writer_threads,
- 32,
- "number of threads to use to generate log messages during "
+ async_discard_num_normal_writers,
+ 30,
+ "number of threads to use to generate normal log messages during "
"the AsyncFileWriter.discard test");
DEFINE_int64(
- async_discard_messages_per_writer,
- 200000,
- "number of messages each writer threads should generate in "
+ async_discard_num_nodiscard_writers,
+ 2,
+ "number of threads to use to generate non-discardable log messages during "
"the AsyncFileWriter.discard test");
DEFINE_int64(
async_discard_read_sleep_usec,
500,
"how long the read thread should sleep between reads in "
"the AsyncFileWriter.discard test");
+DEFINE_int64(
+ async_discard_timeout_msec,
+ 10000,
+ "A timeout for the AsyncFileWriter.discard test if it cannot generate "
+ "enough discards");
+DEFINE_int64(
+ async_discard_num_events,
+ 10,
+ "The number of discard events to wait for in the AsyncFileWriter.discard "
+ "test");
using namespace folly;
using namespace std::literals::chrono_literals;
using folly::test::TemporaryFile;
+using std::chrono::steady_clock;
+using std::chrono::milliseconds;
TEST(AsyncFileWriter, noMessages) {
TemporaryFile tmpFile{"logging_test"};
totalBytes += bytesWritten;
}
}
- fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
+ XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
}
#endif
-/**
- * writeThread() writes a series of messages to the AsyncFileWriter
- */
-void writeThread(
- AsyncFileWriter* writer,
- size_t id,
- size_t numMessages,
- uint32_t flags) {
- for (size_t n = 0; n < numMessages; ++n) {
- writer->writeMessage(
- folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
- }
-}
+// A large-ish message suffix, just to consume space and help fill up
+// log buffers faster.
+static constexpr StringPiece kMsgSuffix{
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
class ReadStats {
public:
ReadStats()
- : readSleepUS_{static_cast<uint64_t>(
+ : deadline_{steady_clock::now() +
+ milliseconds{FLAGS_async_discard_timeout_msec}},
+ readSleepUS_{static_cast<uint64_t>(
std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
void clearSleepDuration() {
return std::chrono::microseconds{readSleepUS_.load()};
}
- void check(size_t numThreads, size_t messagesPerThread) {
- EXPECT_EQ("", trailingData_);
- EXPECT_EQ(numThreads, writers_.size());
- size_t totalMessagesReceived = 0;
- for (const auto& writerData : writers_) {
- EXPECT_LE(writerData.numMessages, messagesPerThread);
- EXPECT_LE(writerData.lastId, messagesPerThread);
- totalMessagesReceived += writerData.numMessages;
+ bool shouldWriterStop() const {
+ // Stop after we have seen the required number of separate discard events.
+ // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
+ // ensures the async writer blocks and then makes progress again multiple
+ // times.
+ if (FLAGS_async_discard_num_events > 0 &&
+ discardEventsSeen_.load() >
+ static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
+ return true;
}
+ // Stop after a timeout, even if we don't hit the number of requested
+ // discards.
+ return steady_clock::now() > deadline_;
+ }
+ void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
+ auto map = perThreadWriteData_.wlock();
+ assert(map->find(threadID) == map->end());
+ auto& data = (*map)[threadID];
+ data.numMessagesWritten = messagesWritten;
+ data.flags = flags;
+ }
+
+ void check() {
+ auto writeDataMap = perThreadWriteData_.wlock();
+
+ EXPECT_EQ("", trailingData_);
EXPECT_EQ(0, numUnableToParse_);
EXPECT_EQ(0, numOutOfOrder_);
- EXPECT_EQ(
- numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
- }
- /**
- * Check that no messages were dropped from the specified thread.
- */
- void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
- EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
- EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
+ // Check messages received from each writer thread
+ size_t readerStatsChecked = 0;
+ size_t totalMessagesWritten = 0;
+ size_t totalMessagesRead = 0;
+ for (const auto& writeEntry : *writeDataMap) {
+ const auto& writeInfo = writeEntry.second;
+ totalMessagesWritten += writeInfo.numMessagesWritten;
+
+ auto iter = perThreadReadData_.find(writeEntry.first);
+ if (iter == perThreadReadData_.end()) {
+ // We never received any messages from this writer thread.
+ // This is okay as long as this is not a NEVER_DISCARD writer.
+ EXPECT_EQ(0, writeInfo.flags);
+ continue;
+ }
+ const auto& readInfo = iter->second;
+ ++readerStatsChecked;
+ totalMessagesRead += readInfo.numMessagesRead;
+ if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
+ // Non-discarding threads should never discard anything
+ EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
+ EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
+ } else {
+ // Other threads may have discarded some messages
+ EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
+ EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
+ }
+ }
+
+ EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
+ EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
+
+ // This test is intended to check the discard behavior.
+ // Fail the test if we didn't actually trigger any discards before we timed
+ // out.
+ EXPECT_GT(numDiscarded_, 0);
+
+ XLOG(DBG1) << totalMessagesWritten << " messages written, "
+ << totalMessagesRead << " messages read, " << numDiscarded_
+ << " messages discarded";
}
void messageReceived(StringPiece msg) {
if (msg.endsWith(" log messages discarded: "
"logging faster than we can write")) {
auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
- fprintf(stderr, "received discard notification: %zu\n", discardCount);
+ XLOG(DBG3, "received discard notification: ", discardCount);
numDiscarded_ += discardCount;
+ ++discardEventsSeen_;
return;
}
parseMessage(msg, &threadID, &messageIndex);
} catch (const std::exception& ex) {
++numUnableToParse_;
- fprintf(
- stderr,
- "unable to parse log message: %s\n",
- folly::humanify(msg.str()).c_str());
+ XLOG(ERR, "unable to parse log message: ", msg);
return;
}
- if (threadID >= writers_.size()) {
- writers_.resize(threadID + 1);
- }
- writers_[threadID].numMessages++;
- if (messageIndex > writers_[threadID].lastId) {
- writers_[threadID].lastId = messageIndex;
+ auto& data = perThreadReadData_[threadID];
+ data.numMessagesRead++;
+ if (messageIndex > data.lastId) {
+ data.lastId = messageIndex;
} else {
++numOutOfOrder_;
- fprintf(
- stderr,
- "received out-of-order messages from writer %zu: "
- "%zu received after %zu\n",
- threadID,
- messageIndex,
- writers_[threadID].lastId);
+ XLOG(ERR) << "received out-of-order messages from writer " << threadID
+ << ": " << messageIndex << " received after " << data.lastId;
}
}
}
private:
- struct WriterStats {
- size_t numMessages{0};
+ struct ReaderData {
+ size_t numMessagesRead{0};
size_t lastId{0};
};
+ struct WriterData {
+ size_t numMessagesWritten{0};
+ int flags{0};
+ };
void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
+ // Validate and strip off the message prefix and suffix
constexpr StringPiece prefix{"thread "};
- constexpr StringPiece middle{" message "};
if (!msg.startsWith(prefix)) {
throw std::runtime_error("bad message prefix");
}
+ msg.advance(prefix.size());
+ if (!msg.endsWith(kMsgSuffix)) {
+ throw std::runtime_error("bad message suffix");
+ }
+ msg.subtract(kMsgSuffix.size());
- auto idx = prefix.size();
- auto end = msg.find(' ', idx);
- if (end == StringPiece::npos) {
+ // Parse then strip off the thread index
+ auto threadIDEnd = msg.find(' ');
+ if (threadIDEnd == StringPiece::npos) {
throw std::runtime_error("no middle found");
}
+ *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
+ msg.advance(threadIDEnd);
- *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
- auto rest = msg.subpiece(end);
- if (!rest.startsWith(middle)) {
+ // Validate that the middle of the message is what we expect,
+ // then strip it off
+ constexpr StringPiece middle{" message "};
+ if (!msg.startsWith(middle)) {
throw std::runtime_error("bad message middle");
}
+ msg.advance(middle.size());
- rest.advance(middle.size());
- *messageIndex = folly::to<size_t>(rest);
+ // Parse the message index
+ *messageIndex = folly::to<size_t>(msg);
}
- std::vector<WriterStats> writers_;
+ /**
+ * Data about each writer thread, as recorded by the reader thread.
+ *
+ * At the end of the test we will compare perThreadReadData_ (recorded by the
+ * reader) with perThreadWriteData_ (recorded by the writers) to make sure
+ * the data matches up.
+ *
+ * This is a map from writer_thread_id to ReaderData.
+ * The writer_thread_id is extracted from the received messages.
+ *
+ * This field does not need locking as it is only updated by the single
+ * reader thread.
+ */
+ std::unordered_map<size_t, ReaderData> perThreadReadData_;
+
+ /*
+ * Additional information recorded by the reader thread.
+ */
std::string trailingData_;
size_t numUnableToParse_{0};
size_t numOutOfOrder_{0};
size_t numDiscarded_{0};
- std::atomic<uint64_t> readSleepUS_;
+ /**
+ * deadline_ is a maximum end time for the test.
+ *
+ * The writer threads quit if the deadline is reached even if they have not
+ * produced the desired number of discard events yet.
+ */
+ const std::chrono::steady_clock::time_point deadline_;
+
+ /**
+ * How long the reader thread should sleep between each read event.
+ *
+ * This is initially set to a non-zero value (read from the
+ * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
+ * slowly, which will fill up the pipe buffer and cause discard events.
+ *
+ * Once we have produce enough discards and are ready to finish the test the
+ * main thread reduces readSleepUS_ to 0, so the reader will finish the
+ * remaining message backlog quickly.
+ */
+ std::atomic<uint64_t> readSleepUS_{0};
+
+ /**
+ * A count of how many discard events have been seen so far.
+ *
+ * The reader increments discardEventsSeen_ each time it sees a discard
+ * notification message. A "discard event" basically corresponds to a single
+ * group of dropped messages. Once the reader pulls some messages off out of
+ * the pipe the writers should be able to send more data, but the buffer will
+ * eventually fill up again, producing another discard event.
+ */
+ std::atomic<uint64_t> discardEventsSeen_{0};
+
+ /**
+ * Data about each writer thread, as recorded by the writers.
+ *
+ * When each writer thread finishes it records how many messages it wrote,
+ * plus the flags it used to write the messages.
+ */
+ folly::Synchronized<std::unordered_map<size_t, WriterData>>
+ perThreadWriteData_;
};
/**
auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
if (readResult < 0) {
- fprintf(stderr, "error reading from pipe: %d\n", errno);
+ XLOG(ERR, "error reading from pipe: ", errno);
return;
}
if (readResult == 0) {
- fprintf(stderr, "read EOF\n");
+ XLOG(DBG2, "read EOF");
break;
}
}
}
+/**
+ * writeThread() writes a series of messages to the AsyncFileWriter
+ */
+void writeThread(
+ AsyncFileWriter* writer,
+ size_t id,
+ uint32_t flags,
+ ReadStats* readStats) {
+ size_t msgID = 0;
+ while (true) {
+ ++msgID;
+ writer->writeMessage(
+ folly::to<std::string>(
+ "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
+ flags);
+
+ // Break out once the reader has seen enough discards
+ if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
+ readStats->writerFinished(id, msgID, flags);
+ break;
+ }
+ }
+}
+
/*
* The discard test spawns a number of threads that each write a large number
* of messages quickly. The AsyncFileWriter writes to a pipe, an a separate
folly::File readPipe{fds[0], true};
folly::File writePipe{fds[1], true};
- // This test should always be run with at least 2 writer threads.
- // The last one will use the NEVER_DISCARD flag, and we want at least
- // one that does discard messages.
- ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
-
ReadStats readStats;
std::thread reader(readThread, std::move(readPipe), &readStats);
{
AsyncFileWriter writer{std::move(writePipe)};
std::vector<std::thread> writeThreads;
- for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
+ size_t numThreads = FLAGS_async_discard_num_normal_writers +
+ FLAGS_async_discard_num_nodiscard_writers;
+
+ for (size_t n = 0; n < numThreads; ++n) {
uint32_t flags = 0;
- // Configure the last writer thread to never drop messages
- if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
+ if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
flags = LogWriter::NEVER_DISCARD;
}
+ XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
- writeThreads.emplace_back(
- writeThread,
- &writer,
- n,
- FLAGS_async_discard_messages_per_writer,
- flags);
+ writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
}
for (auto& t : writeThreads) {
t.join();
}
- fprintf(stderr, "writers done\n");
+ XLOG(DBG2, "writers done");
}
// Clear the read sleep duration so the reader will finish quickly now
readStats.clearSleepDuration();
reader.join();
- readStats.check(
- FLAGS_async_discard_num_writer_threads,
- FLAGS_async_discard_messages_per_writer);
- // Check that no messages were dropped from the thread using the
- // NEVER_DISCARD flag.
- readStats.checkNoDrops(
- FLAGS_async_discard_num_writer_threads - 1,
- FLAGS_async_discard_messages_per_writer);
+ readStats.check();
+}
+
+int main(int argc, char* argv[]) {
+ testing::InitGoogleTest(&argc, argv);
+ folly::init(&argc, &argv);
+ // Don't use async logging in the async logging tests :-)
+ folly::initLoggingGlogStyle(FLAGS_logging, LogLevel::INFO, /* async */ false);
+
+ return RUN_ALL_TESTS();
}