Add QueueAppender
authorTudor Bosman <tudorb@fb.com>
Tue, 21 May 2013 17:59:39 +0000 (10:59 -0700)
committerOwen Yamauchi <oyamauchi@fb.com>
Mon, 3 Jun 2013 19:20:53 +0000 (12:20 -0700)
Summary:
Add an Appender that appends to a IOBufQueue.  You can pass in the maximum
append size if you know it, and it will throw on overflow, and will never
allocate IOBufs larger than the remaining size.

Test Plan: test added

Reviewed By: davejwatson@fb.com

FB internal diff: D820160

folly/io/Cursor.h
folly/io/IOBuf.cpp
folly/io/IOBuf.h
folly/io/IOBufQueue.cpp
folly/io/IOBufQueue.h
folly/io/test/IOBufCursorTest.cpp

index 790b9cca4a9e22994f38462dfd3af7a95cd7d10c..ce73705125957ec1ddb4d33a502b98d337326f72 100644 (file)
@@ -25,6 +25,7 @@
 
 #include "folly/Bits.h"
 #include "folly/io/IOBuf.h"
+#include "folly/io/IOBufQueue.h"
 #include "folly/Likely.h"
 
 /**
@@ -596,6 +597,98 @@ class Appender : public detail::Writable<Appender> {
   uint32_t growth_;
 };
 
+class QueueAppender : public detail::Writable<QueueAppender> {
+ public:
+  /**
+   * Create an Appender that writes to a IOBufQueue.  When we allocate
+   * space in the queue, we grow no more than growth bytes at once
+   * (unless you call ensure() with a bigger value yourself).
+   * Throw if we ever need to allocate more than maxTotalGrowth.
+   */
+  QueueAppender(IOBufQueue* queue,
+                uint32_t growth,
+                size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
+    reset(queue, growth, maxTotalGrowth);
+  }
+
+  void reset(IOBufQueue* queue,
+             uint32_t growth,
+             size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
+    queue_ = queue;
+    growth_ = growth;
+    remainingGrowth_ = maxTotalGrowth;
+    next_ = nullptr;
+    available_ = 0;
+  }
+
+  uint8_t* writableData() { return next_; }
+
+  size_t length() const { return available_; }
+
+  void append(size_t n) {
+    assert(n <= available_);
+    assert(n <= remainingGrowth_);
+    queue_->postallocate(n);
+    next_ += n;
+    available_ -= n;
+    remainingGrowth_ -= n;
+  }
+
+  // Ensure at least n contiguous; can go above growth_, throws if
+  // not enough room.
+  void ensure(uint32_t n) {
+    if (UNLIKELY(n > remainingGrowth_)) {
+      throw std::out_of_range("overflow");
+    }
+
+    if (LIKELY(length() >= n)) {
+      return;
+    }
+
+    size_t desired = std::min(growth_, remainingGrowth_ - n);
+
+    // Grab some more.
+    auto p = queue_->preallocate(n, desired);
+
+    next_ = static_cast<uint8_t*>(p.first);
+    available_ = p.second;
+  }
+
+  size_t pushAtMost(const uint8_t* buf, size_t len) {
+    if (UNLIKELY(len > remainingGrowth_)) {
+      len = remainingGrowth_;
+    }
+
+    size_t remaining = len;
+    while (remaining != 0) {
+      ensure(std::min(remaining, growth_));
+      size_t n = std::min(remaining, available_);
+      memcpy(next_, buf, n);
+      buf += n;
+      remaining -= n;
+      append(n);
+    }
+
+    return len;
+  }
+
+  // insert doesn't count towards maxTotalGrowth
+  void insert(std::unique_ptr<folly::IOBuf> buf) {
+    if (buf) {
+      queue_->append(std::move(buf), true);
+      next_ = nullptr;
+      available_ = 0;
+    }
+  }
+
+ private:
+  folly::IOBufQueue* queue_;
+  size_t growth_;
+  size_t remainingGrowth_;
+  uint8_t* next_;
+  size_t available_;
+};
+
 }}  // folly::io
 
 #endif // FOLLY_CURSOR_H
index e9b9151b13dc93a29d1e31a715f4c4994bec7d48..8fc040a57103ec8d391386800329140375fad3c0 100644 (file)
@@ -116,6 +116,22 @@ unique_ptr<IOBuf> IOBuf::create(uint32_t capacity) {
   }
 }
 
+unique_ptr<IOBuf> IOBuf::createChain(
+    size_t totalCapacity, uint32_t maxBufCapacity) {
+  unique_ptr<IOBuf> out = create(
+      std::min(totalCapacity, size_t(maxBufCapacity)));
+  size_t allocatedCapacity = out->capacity();
+
+  while (allocatedCapacity < totalCapacity) {
+    unique_ptr<IOBuf> newBuf = create(
+        std::min(totalCapacity - allocatedCapacity, size_t(maxBufCapacity)));
+    allocatedCapacity += newBuf->capacity();
+    out->prependChain(std::move(newBuf));
+  }
+
+  return out;
+}
+
 unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint32_t capacity,
                                        uint32_t length,
                                        FreeFunction freeFn,
index 007c38414279c5a3e3e49bb16210f3a4a32a45ba..935f82b6ff51a518621799b9a11f77b90d0e0ae3 100644 (file)
@@ -218,6 +218,13 @@ class IOBuf {
    */
   static std::unique_ptr<IOBuf> create(uint32_t capacity);
 
+  /**
+   * Allocate a new IOBuf chain with the requested total capacity, allocating
+   * no more than maxBufCapacity to each buffer.
+   */
+  static std::unique_ptr<IOBuf> createChain(
+      size_t totalCapacity, uint32_t maxBufCapacity);
+
   /**
    * Create a new IOBuf pointing to an existing data buffer.
    *
index 27c5420ae50a13d0377bf7c085a4675099462b9d..c313725603b7549a7e5faa4a289773b58406248c 100644 (file)
@@ -297,4 +297,18 @@ std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
   return retBuf;
 }
 
+void IOBufQueue::clear() {
+  if (!head_) {
+    return;
+  }
+  IOBuf* buf = head_.get();
+  do {
+    buf->clear();
+    buf = buf->next();
+  } while (buf != head_.get());
+  if (options_.cacheChainLength) {
+    chainLength_ = 0;
+  }
+}
+
 } // folly
index 7c32875f2c9068bcdd579a20dd217957d1b639de..2a63fbf91fe57bc2b96fd34a09daeae77ad0cf6b 100644 (file)
@@ -226,6 +226,13 @@ class IOBufQueue {
     return options_;
   }
 
+  /**
+   * Clear the queue.  Note that this does not release the buffers, it
+   * just sets their length to zero; useful if you want to reuse the
+   * same queue without reallocating.
+   */
+  void clear();
+
   /** Movable */
   IOBufQueue(IOBufQueue&&);
   IOBufQueue& operator=(IOBufQueue&&);
index 4c7d622cdf36e6ada7c07e05c4dd95adeaf3b241..4a5366ddbe4cb90151c96201aa15becadfd7979c 100644 (file)
@@ -326,6 +326,34 @@ TEST(IOBuf, Appender) {
   EXPECT_EQ("hello world", toString(*head));
 }
 
+TEST(IOBuf, QueueAppender) {
+  folly::IOBufQueue queue;
+
+  // Allocate 100 bytes at once, but don't grow past 1024
+  QueueAppender app(&queue, 100, 1024);
+  size_t n = 1024 / sizeof(uint32_t);
+  for (uint32_t i = 0; i < n; ++i) {
+    app.writeBE(i);
+  }
+
+  EXPECT_THROW({app.writeBE(0);}, std::out_of_range);
+
+  // There must be a goodMallocSize between 100 and 1024...
+  EXPECT_LT(1, queue.front()->countChainElements());
+  const IOBuf* buf = queue.front();
+  do {
+    EXPECT_LE(100, buf->capacity());
+    buf = buf->next();
+  } while (buf != queue.front());
+
+  Cursor cursor(queue.front());
+  for (uint32_t i = 0; i < n; ++i) {
+    EXPECT_EQ(i, cursor.readBE<uint32_t>());
+  }
+
+  EXPECT_THROW({cursor.readBE<uint32_t>();}, std::out_of_range);
+}
+
 TEST(IOBuf, CursorOperators) {
   // Test operators on a single-item chain
   {