* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include <thread>
+
#include <folly/Conv.h>
#include <folly/Exception.h>
#include <folly/File.h>
#include <folly/experimental/TestUtil.h>
#include <folly/experimental/logging/AsyncFileWriter.h>
#include <folly/experimental/logging/LoggerDB.h>
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
EXPECT_GT(logErrors.size(), 0);
EXPECT_LE(logErrors.size(), numMessages);
}
+
+namespace {
+size_t fillUpPipe(int fd) {
+ int flags = fcntl(fd, F_GETFL);
+ folly::checkUnixError(flags, "failed get file descriptor flags");
+ auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
+ std::vector<char> data;
+ data.resize(4000);
+ size_t totalBytes = 0;
+ size_t bytesToWrite = data.size();
+ while (true) {
+ auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
+ if (bytesWritten < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // We blocked. Keep trying smaller writes, until we get down to a
+ // single byte, just to make sure the logging code really won't be able
+ // to write anything to the pipe.
+ if (bytesToWrite <= 1) {
+ break;
+ } else {
+ bytesToWrite /= 2;
+ }
+ } else {
+ throwSystemError("error writing to pipe");
+ }
+ } else {
+ totalBytes += bytesWritten;
+ }
+ }
+ fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
+
+ rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+ folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
+
+ return totalBytes;
+}
+}
+
+TEST(AsyncFileWriter, flush) {
+ // Set up a pipe(), then write data to the write endpoint until it fills up
+ // and starts blocking.
+ std::array<int, 2> fds;
+ auto rc = pipe(fds.data());
+ folly::checkUnixError(rc, "failed to create pipe");
+ File readPipe{fds[0], true};
+ File writePipe{fds[1], true};
+
+ auto paddingSize = fillUpPipe(writePipe.fd());
+
+ // Now set up an AsyncFileWriter pointing at the write end of the pipe
+ AsyncFileWriter writer{std::move(writePipe)};
+
+ // Write a message
+ writer.writeMessage(std::string{"test message"});
+
+ // Call flush(). Use a separate thread, since this should block until we
+ // consume data from the pipe.
+ Promise<Unit> promise;
+ auto future = promise.getFuture();
+ auto flushFunction = [&] { writer.flush(); };
+ std::thread flushThread{
+ [&]() { promise.setTry(makeTryWith(flushFunction)); }};
+
+ // Sleep briefly, and make sure flush() still hasn't completed.
+ /* sleep override */
+ std::this_thread::sleep_for(10ms);
+ EXPECT_FALSE(future.isReady());
+
+ // Now read from the pipe
+ std::vector<char> buf;
+ buf.resize(paddingSize);
+ readFull(readPipe.fd(), buf.data(), buf.size());
+
+ // Make sure flush completes successfully now
+ future.get(10ms);
+ flushThread.join();
+}
#endif
/**
size_t bufferIdx = 0;
while (true) {
/* sleep override */
- usleep(stats->getSleepUS().count());
+ std::this_thread::sleep_for(stats->getSleepUS());
auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);