From 2ba26b052ea53cfa4126981de54677c2af623f66 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Tue, 21 May 2013 10:59:39 -0700 Subject: [PATCH] Add QueueAppender Summary: Add an Appender that appends to a IOBufQueue. You can pass in the maximum append size if you know it, and it will throw on overflow, and will never allocate IOBufs larger than the remaining size. Test Plan: test added Reviewed By: davejwatson@fb.com FB internal diff: D820160 --- folly/io/Cursor.h | 93 +++++++++++++++++++++++++++++++ folly/io/IOBuf.cpp | 16 ++++++ folly/io/IOBuf.h | 7 +++ folly/io/IOBufQueue.cpp | 14 +++++ folly/io/IOBufQueue.h | 7 +++ folly/io/test/IOBufCursorTest.cpp | 28 ++++++++++ 6 files changed, 165 insertions(+) diff --git a/folly/io/Cursor.h b/folly/io/Cursor.h index 790b9cca..ce737051 100644 --- a/folly/io/Cursor.h +++ b/folly/io/Cursor.h @@ -25,6 +25,7 @@ #include "folly/Bits.h" #include "folly/io/IOBuf.h" +#include "folly/io/IOBufQueue.h" #include "folly/Likely.h" /** @@ -596,6 +597,98 @@ class Appender : public detail::Writable { uint32_t growth_; }; +class QueueAppender : public detail::Writable { + public: + /** + * Create an Appender that writes to a IOBufQueue. When we allocate + * space in the queue, we grow no more than growth bytes at once + * (unless you call ensure() with a bigger value yourself). + * Throw if we ever need to allocate more than maxTotalGrowth. + */ + QueueAppender(IOBufQueue* queue, + uint32_t growth, + size_t maxTotalGrowth = std::numeric_limits::max()) { + reset(queue, growth, maxTotalGrowth); + } + + void reset(IOBufQueue* queue, + uint32_t growth, + size_t maxTotalGrowth = std::numeric_limits::max()) { + queue_ = queue; + growth_ = growth; + remainingGrowth_ = maxTotalGrowth; + next_ = nullptr; + available_ = 0; + } + + uint8_t* writableData() { return next_; } + + size_t length() const { return available_; } + + void append(size_t n) { + assert(n <= available_); + assert(n <= remainingGrowth_); + queue_->postallocate(n); + next_ += n; + available_ -= n; + remainingGrowth_ -= n; + } + + // Ensure at least n contiguous; can go above growth_, throws if + // not enough room. + void ensure(uint32_t n) { + if (UNLIKELY(n > remainingGrowth_)) { + throw std::out_of_range("overflow"); + } + + if (LIKELY(length() >= n)) { + return; + } + + size_t desired = std::min(growth_, remainingGrowth_ - n); + + // Grab some more. + auto p = queue_->preallocate(n, desired); + + next_ = static_cast(p.first); + available_ = p.second; + } + + size_t pushAtMost(const uint8_t* buf, size_t len) { + if (UNLIKELY(len > remainingGrowth_)) { + len = remainingGrowth_; + } + + size_t remaining = len; + while (remaining != 0) { + ensure(std::min(remaining, growth_)); + size_t n = std::min(remaining, available_); + memcpy(next_, buf, n); + buf += n; + remaining -= n; + append(n); + } + + return len; + } + + // insert doesn't count towards maxTotalGrowth + void insert(std::unique_ptr buf) { + if (buf) { + queue_->append(std::move(buf), true); + next_ = nullptr; + available_ = 0; + } + } + + private: + folly::IOBufQueue* queue_; + size_t growth_; + size_t remainingGrowth_; + uint8_t* next_; + size_t available_; +}; + }} // folly::io #endif // FOLLY_CURSOR_H diff --git a/folly/io/IOBuf.cpp b/folly/io/IOBuf.cpp index e9b9151b..8fc040a5 100644 --- a/folly/io/IOBuf.cpp +++ b/folly/io/IOBuf.cpp @@ -116,6 +116,22 @@ unique_ptr IOBuf::create(uint32_t capacity) { } } +unique_ptr IOBuf::createChain( + size_t totalCapacity, uint32_t maxBufCapacity) { + unique_ptr out = create( + std::min(totalCapacity, size_t(maxBufCapacity))); + size_t allocatedCapacity = out->capacity(); + + while (allocatedCapacity < totalCapacity) { + unique_ptr newBuf = create( + std::min(totalCapacity - allocatedCapacity, size_t(maxBufCapacity))); + allocatedCapacity += newBuf->capacity(); + out->prependChain(std::move(newBuf)); + } + + return out; +} + unique_ptr IOBuf::takeOwnership(void* buf, uint32_t capacity, uint32_t length, FreeFunction freeFn, diff --git a/folly/io/IOBuf.h b/folly/io/IOBuf.h index 007c3841..935f82b6 100644 --- a/folly/io/IOBuf.h +++ b/folly/io/IOBuf.h @@ -218,6 +218,13 @@ class IOBuf { */ static std::unique_ptr create(uint32_t capacity); + /** + * Allocate a new IOBuf chain with the requested total capacity, allocating + * no more than maxBufCapacity to each buffer. + */ + static std::unique_ptr createChain( + size_t totalCapacity, uint32_t maxBufCapacity); + /** * Create a new IOBuf pointing to an existing data buffer. * diff --git a/folly/io/IOBufQueue.cpp b/folly/io/IOBufQueue.cpp index 27c5420a..c3137256 100644 --- a/folly/io/IOBufQueue.cpp +++ b/folly/io/IOBufQueue.cpp @@ -297,4 +297,18 @@ std::unique_ptr IOBufQueue::pop_front() { return retBuf; } +void IOBufQueue::clear() { + if (!head_) { + return; + } + IOBuf* buf = head_.get(); + do { + buf->clear(); + buf = buf->next(); + } while (buf != head_.get()); + if (options_.cacheChainLength) { + chainLength_ = 0; + } +} + } // folly diff --git a/folly/io/IOBufQueue.h b/folly/io/IOBufQueue.h index 7c32875f..2a63fbf9 100644 --- a/folly/io/IOBufQueue.h +++ b/folly/io/IOBufQueue.h @@ -226,6 +226,13 @@ class IOBufQueue { return options_; } + /** + * Clear the queue. Note that this does not release the buffers, it + * just sets their length to zero; useful if you want to reuse the + * same queue without reallocating. + */ + void clear(); + /** Movable */ IOBufQueue(IOBufQueue&&); IOBufQueue& operator=(IOBufQueue&&); diff --git a/folly/io/test/IOBufCursorTest.cpp b/folly/io/test/IOBufCursorTest.cpp index 4c7d622c..4a5366dd 100644 --- a/folly/io/test/IOBufCursorTest.cpp +++ b/folly/io/test/IOBufCursorTest.cpp @@ -326,6 +326,34 @@ TEST(IOBuf, Appender) { EXPECT_EQ("hello world", toString(*head)); } +TEST(IOBuf, QueueAppender) { + folly::IOBufQueue queue; + + // Allocate 100 bytes at once, but don't grow past 1024 + QueueAppender app(&queue, 100, 1024); + size_t n = 1024 / sizeof(uint32_t); + for (uint32_t i = 0; i < n; ++i) { + app.writeBE(i); + } + + EXPECT_THROW({app.writeBE(0);}, std::out_of_range); + + // There must be a goodMallocSize between 100 and 1024... + EXPECT_LT(1, queue.front()->countChainElements()); + const IOBuf* buf = queue.front(); + do { + EXPECT_LE(100, buf->capacity()); + buf = buf->next(); + } while (buf != queue.front()); + + Cursor cursor(queue.front()); + for (uint32_t i = 0; i < n; ++i) { + EXPECT_EQ(i, cursor.readBE()); + } + + EXPECT_THROW({cursor.readBE();}, std::out_of_range); +} + TEST(IOBuf, CursorOperators) { // Test operators on a single-item chain { -- 2.34.1