*/
class LogWriter {
public:
+ /**
+ * Bit flag values for use with writeMessage()
+ */
+ enum Flags : uint32_t {
+ NO_FLAGS = 0x00,
+ /**
+ * Ensure that this log message never gets discarded.
+ *
+ * Some LogWriter implementations may discard messages when messages are
+ * being received faster than they can be written. This flag ensures that
+ * this message will never be discarded.
+ *
+ * This flag is used to ensure that LOG(FATAL) messages never get
+ * discarded, so we always report the reason for a crash.
+ */
+ NEVER_DISCARD = 0x01,
+ };
+
virtual ~LogWriter() {}
/**
* Write a serialized log message.
+ *
+ * The flags parameter is a bitwise-ORed set of Flag values defined above.
*/
- virtual void writeMessage(folly::StringPiece buffer) = 0;
+ virtual void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) = 0;
/**
* Write a serialized message.
* writeMessage(), but subclasses may override this implementation if
* desired.
*/
- virtual void writeMessage(std::string&& buffer) {
- writeMessage(folly::StringPiece{buffer});
+ virtual void writeMessage(std::string&& buffer, uint32_t flags = 0) {
+ writeMessage(folly::StringPiece{buffer}, flags);
}
};
}
"the AsyncFileWriter.discard test");
using namespace folly;
+using namespace std::literals::chrono_literals;
using folly::test::TemporaryFile;
TEST(AsyncFileWriter, noMessages) {
/**
* writeThread() writes a series of messages to the AsyncFileWriter
*/
-void writeThread(AsyncFileWriter* writer, size_t id, size_t numMessages) {
+void writeThread(
+ AsyncFileWriter* writer,
+ size_t id,
+ size_t numMessages,
+ uint32_t flags) {
for (size_t n = 0; n < numMessages; ++n) {
writer->writeMessage(
- folly::to<std::string>("thread ", id, " message ", n + 1, '\n'));
+ folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
}
}
class ReadStats {
public:
+ ReadStats()
+ : readSleepUS_{static_cast<uint64_t>(
+ std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
+
+ void clearSleepDuration() {
+ readSleepUS_.store(0);
+ }
+ std::chrono::microseconds getSleepUS() const {
+ return std::chrono::microseconds{readSleepUS_.load()};
+ }
+
void check(size_t numThreads, size_t messagesPerThread) {
EXPECT_EQ("", trailingData_);
EXPECT_EQ(numThreads, writers_.size());
numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
}
+ /**
+ * Check that no messages were dropped from the specified thread.
+ */
+ void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
+ EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
+ EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
+ }
+
void messageReceived(StringPiece msg) {
if (msg.endsWith(" log messages discarded: "
"logging faster than we can write")) {
size_t numUnableToParse_{0};
size_t numOutOfOrder_{0};
size_t numDiscarded_{0};
+
+ std::atomic<uint64_t> readSleepUS_;
};
/**
size_t bufferIdx = 0;
while (true) {
/* sleep override */
- usleep(FLAGS_async_discard_read_sleep_usec);
+ usleep(stats->getSleepUS().count());
auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
folly::File readPipe{fds[0], true};
folly::File writePipe{fds[1], true};
+ // This test should always be run with at least 2 writer threads.
+ // The last one will use the NEVER_DISCARD flag, and we want at least
+ // one that does discard messages.
+ ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
+
ReadStats readStats;
std::thread reader(readThread, std::move(readPipe), &readStats);
{
std::vector<std::thread> writeThreads;
for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
+ uint32_t flags = 0;
+ // Configure the last writer thread to never drop messages
+ if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
+ flags = LogWriter::NEVER_DISCARD;
+ }
+
writeThreads.emplace_back(
- writeThread, &writer, n, FLAGS_async_discard_messages_per_writer);
+ writeThread,
+ &writer,
+ n,
+ FLAGS_async_discard_messages_per_writer,
+ flags);
}
for (auto& t : writeThreads) {
}
fprintf(stderr, "writers done\n");
}
+ // Clear the read sleep duration so the reader will finish quickly now
+ readStats.clearSleepDuration();
reader.join();
readStats.check(
FLAGS_async_discard_num_writer_threads,
FLAGS_async_discard_messages_per_writer);
+ // Check that no messages were dropped from the thread using the
+ // NEVER_DISCARD flag.
+ readStats.checkNoDrops(
+ FLAGS_async_discard_num_writer_threads - 1,
+ FLAGS_async_discard_messages_per_writer);
}