#include "folly/Bits.h"
#include "folly/io/IOBuf.h"
+#include "folly/io/IOBufQueue.h"
#include "folly/Likely.h"
/**
uint32_t growth_;
};
+class QueueAppender : public detail::Writable<QueueAppender> {
+ public:
+ /**
+ * Create an Appender that writes to a IOBufQueue. When we allocate
+ * space in the queue, we grow no more than growth bytes at once
+ * (unless you call ensure() with a bigger value yourself).
+ * Throw if we ever need to allocate more than maxTotalGrowth.
+ */
+ QueueAppender(IOBufQueue* queue,
+ uint32_t growth,
+ size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
+ reset(queue, growth, maxTotalGrowth);
+ }
+
+ void reset(IOBufQueue* queue,
+ uint32_t growth,
+ size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
+ queue_ = queue;
+ growth_ = growth;
+ remainingGrowth_ = maxTotalGrowth;
+ next_ = nullptr;
+ available_ = 0;
+ }
+
+ uint8_t* writableData() { return next_; }
+
+ size_t length() const { return available_; }
+
+ void append(size_t n) {
+ assert(n <= available_);
+ assert(n <= remainingGrowth_);
+ queue_->postallocate(n);
+ next_ += n;
+ available_ -= n;
+ remainingGrowth_ -= n;
+ }
+
+ // Ensure at least n contiguous; can go above growth_, throws if
+ // not enough room.
+ void ensure(uint32_t n) {
+ if (UNLIKELY(n > remainingGrowth_)) {
+ throw std::out_of_range("overflow");
+ }
+
+ if (LIKELY(length() >= n)) {
+ return;
+ }
+
+ size_t desired = std::min(growth_, remainingGrowth_ - n);
+
+ // Grab some more.
+ auto p = queue_->preallocate(n, desired);
+
+ next_ = static_cast<uint8_t*>(p.first);
+ available_ = p.second;
+ }
+
+ size_t pushAtMost(const uint8_t* buf, size_t len) {
+ if (UNLIKELY(len > remainingGrowth_)) {
+ len = remainingGrowth_;
+ }
+
+ size_t remaining = len;
+ while (remaining != 0) {
+ ensure(std::min(remaining, growth_));
+ size_t n = std::min(remaining, available_);
+ memcpy(next_, buf, n);
+ buf += n;
+ remaining -= n;
+ append(n);
+ }
+
+ return len;
+ }
+
+ // insert doesn't count towards maxTotalGrowth
+ void insert(std::unique_ptr<folly::IOBuf> buf) {
+ if (buf) {
+ queue_->append(std::move(buf), true);
+ next_ = nullptr;
+ available_ = 0;
+ }
+ }
+
+ private:
+ folly::IOBufQueue* queue_;
+ size_t growth_;
+ size_t remainingGrowth_;
+ uint8_t* next_;
+ size_t available_;
+};
+
}} // folly::io
#endif // FOLLY_CURSOR_H
}
}
+unique_ptr<IOBuf> IOBuf::createChain(
+ size_t totalCapacity, uint32_t maxBufCapacity) {
+ unique_ptr<IOBuf> out = create(
+ std::min(totalCapacity, size_t(maxBufCapacity)));
+ size_t allocatedCapacity = out->capacity();
+
+ while (allocatedCapacity < totalCapacity) {
+ unique_ptr<IOBuf> newBuf = create(
+ std::min(totalCapacity - allocatedCapacity, size_t(maxBufCapacity)));
+ allocatedCapacity += newBuf->capacity();
+ out->prependChain(std::move(newBuf));
+ }
+
+ return out;
+}
+
unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint32_t capacity,
uint32_t length,
FreeFunction freeFn,
*/
static std::unique_ptr<IOBuf> create(uint32_t capacity);
+ /**
+ * Allocate a new IOBuf chain with the requested total capacity, allocating
+ * no more than maxBufCapacity to each buffer.
+ */
+ static std::unique_ptr<IOBuf> createChain(
+ size_t totalCapacity, uint32_t maxBufCapacity);
+
/**
* Create a new IOBuf pointing to an existing data buffer.
*
return retBuf;
}
+void IOBufQueue::clear() {
+ if (!head_) {
+ return;
+ }
+ IOBuf* buf = head_.get();
+ do {
+ buf->clear();
+ buf = buf->next();
+ } while (buf != head_.get());
+ if (options_.cacheChainLength) {
+ chainLength_ = 0;
+ }
+}
+
} // folly
return options_;
}
+ /**
+ * Clear the queue. Note that this does not release the buffers, it
+ * just sets their length to zero; useful if you want to reuse the
+ * same queue without reallocating.
+ */
+ void clear();
+
/** Movable */
IOBufQueue(IOBufQueue&&);
IOBufQueue& operator=(IOBufQueue&&);
EXPECT_EQ("hello world", toString(*head));
}
+TEST(IOBuf, QueueAppender) {
+ folly::IOBufQueue queue;
+
+ // Allocate 100 bytes at once, but don't grow past 1024
+ QueueAppender app(&queue, 100, 1024);
+ size_t n = 1024 / sizeof(uint32_t);
+ for (uint32_t i = 0; i < n; ++i) {
+ app.writeBE(i);
+ }
+
+ EXPECT_THROW({app.writeBE(0);}, std::out_of_range);
+
+ // There must be a goodMallocSize between 100 and 1024...
+ EXPECT_LT(1, queue.front()->countChainElements());
+ const IOBuf* buf = queue.front();
+ do {
+ EXPECT_LE(100, buf->capacity());
+ buf = buf->next();
+ } while (buf != queue.front());
+
+ Cursor cursor(queue.front());
+ for (uint32_t i = 0; i < n; ++i) {
+ EXPECT_EQ(i, cursor.readBE<uint32_t>());
+ }
+
+ EXPECT_THROW({cursor.readBE<uint32_t>();}, std::out_of_range);
+}
+
TEST(IOBuf, CursorOperators) {
// Test operators on a single-item chain
{