const size_t MIN_ALLOC_SIZE = 2000;
const size_t MAX_ALLOC_SIZE = 8000; // Must fit within a uint32_t
+const size_t MAX_PACK_COPY = 4096;
/**
* Convenience function to append chain src to chain dst.
*/
void
-appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src) {
+appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src, bool pack) {
if (dst == NULL) {
dst = std::move(src);
} else {
- dst->prev()->appendChain(std::move(src));
+ IOBuf* tail = dst->prev();
+ if (pack) {
+ // Copy up to MAX_PACK_COPY bytes if we can free buffers; this helps
+ // reduce wastage (the tail's tailroom and the head's headroom) when
+ // joining two IOBufQueues together.
+ size_t copyRemaining = MAX_PACK_COPY;
+ uint32_t n;
+ while (src &&
+ (n = src->length()) < copyRemaining &&
+ n < tail->tailroom()) {
+ memcpy(tail->writableTail(), src->data(), n);
+ tail->append(n);
+ copyRemaining -= n;
+ src = src->pop();
+ }
+ }
+ if (src) {
+ tail->appendChain(std::move(src));
+ }
}
}
}
void
-IOBufQueue::append(unique_ptr<IOBuf>&& buf) {
+IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
if (!buf) {
return;
}
if (options_.cacheChainLength) {
chainLength_ += buf->computeChainDataLength();
}
- appendToChain(head_, std::move(buf));
+ appendToChain(head_, std::move(buf), pack);
}
void
-IOBufQueue::append(IOBufQueue& other) {
+IOBufQueue::append(IOBufQueue& other, bool pack) {
if (!other.head_) {
return;
}
chainLength_ += other.head_->computeChainDataLength();
}
}
- appendToChain(head_, std::move(other.head_));
+ appendToChain(head_, std::move(other.head_), pack);
other.chainLength_ = 0;
}
(head_->prev()->tailroom() == 0)) {
appendToChain(head_, std::move(
IOBuf::create(std::max(MIN_ALLOC_SIZE,
- std::min(len, MAX_ALLOC_SIZE)))));
+ std::min(len, MAX_ALLOC_SIZE)))),
+ false);
}
IOBuf* last = head_->prev();
uint32_t copyLen = std::min(len, (size_t)last->tailroom());
}
// Allocate a new buffer of the requested max size.
unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
- appendToChain(head_, std::move(newBuf));
+ appendToChain(head_, std::move(newBuf), false);
IOBuf* last = head_->prev();
return make_pair(last->writableTail(),
std::min(max, last->tailroom()));
chainLength_ -= head_->length();
}
unique_ptr<IOBuf> remainder = head_->pop();
- appendToChain(result, std::move(head_));
+ appendToChain(result, std::move(head_), false);
head_ = std::move(remainder);
} else {
unique_ptr<IOBuf> clone = head_->cloneOne();
clone->trimEnd(clone->length() - n);
- appendToChain(result, std::move(clone));
+ appendToChain(result, std::move(clone), false);
head_->trimStart(n);
if (options_.cacheChainLength) {
chainLength_ -= n;
if (options_.cacheChainLength) {
chainLength_ -= head_->prev()->length();
}
- unique_ptr<IOBuf> b = head_->prev()->unlink();
- // Null queue if we unlinked the head.
- if (b.get() == head_.get()) {
+ if (head_->isChained()) {
+ head_->prev()->unlink();
+ } else {
head_.reset();
}
}
/**
* Add a buffer or buffer chain to the end of this queue. The
* queue takes ownership of buf.
+ *
+ * If pack is true, we try to reduce wastage at the end of this queue
+ * by copying some data from the first buffers in the buf chain (and
+ * releasing the buffers), if possible. If pack is false, we leave
+ * the chain topology unchanged.
*/
- void append(std::unique_ptr<folly::IOBuf>&& buf);
+ void append(std::unique_ptr<folly::IOBuf>&& buf,
+ bool pack=false);
/**
* Add a queue to the end of this queue. The queue takes ownership of
* all buffers from the other queue.
*/
- void append(IOBufQueue& other);
- void append(IOBufQueue&& other) {
- append(other); // call lvalue reference overload, above
+ void append(IOBufQueue& other, bool pack=false);
+ void append(IOBufQueue&& other, bool pack=false) {
+ append(other, pack); // call lvalue reference overload, above
}
/**
iob->length()));
}
-TEST(IOBufQueue, trim) {
+TEST(IOBufQueue, Trim) {
IOBufQueue queue(clOptions);
unique_ptr<IOBuf> a = IOBuf::create(4);
a->append(4);
checkConsistency(queue);
}
+TEST(IOBufQueue, TrimPack) {
+ IOBufQueue queue(clOptions);
+ unique_ptr<IOBuf> a = IOBuf::create(64);
+ a->append(4);
+ queue.append(std::move(a), true);
+ checkConsistency(queue);
+ a = IOBuf::create(6);
+ a->append(6);
+ queue.append(std::move(a), true);
+ checkConsistency(queue);
+ a = IOBuf::create(8);
+ a->append(8);
+ queue.append(std::move(a), true);
+ checkConsistency(queue);
+ a = IOBuf::create(10);
+ a->append(10);
+ queue.append(std::move(a), true);
+ checkConsistency(queue);
+
+ EXPECT_EQ(1, queue.front()->countChainElements());
+ EXPECT_EQ(28, queue.front()->computeChainDataLength());
+ EXPECT_EQ(28, queue.front()->length());
+
+ queue.trimStart(1);
+ checkConsistency(queue);
+ EXPECT_EQ(1, queue.front()->countChainElements());
+ EXPECT_EQ(27, queue.front()->computeChainDataLength());
+ EXPECT_EQ(27, queue.front()->length());
+
+ queue.trimStart(5);
+ checkConsistency(queue);
+ EXPECT_EQ(1, queue.front()->countChainElements());
+ EXPECT_EQ(22, queue.front()->computeChainDataLength());
+ EXPECT_EQ(22, queue.front()->length());
+
+ queue.trimEnd(1);
+ checkConsistency(queue);
+ EXPECT_EQ(1, queue.front()->countChainElements());
+ EXPECT_EQ(21, queue.front()->computeChainDataLength());
+ EXPECT_EQ(21, queue.front()->prev()->length());
+
+ queue.trimEnd(20);
+ checkConsistency(queue);
+ EXPECT_EQ(1, queue.front()->countChainElements());
+ EXPECT_EQ(1, queue.front()->computeChainDataLength());
+ EXPECT_EQ(1, queue.front()->prev()->length());
+
+ queue.trimEnd(1);
+ checkConsistency(queue);
+ EXPECT_EQ(NULL, queue.front());
+
+ EXPECT_THROW(queue.trimStart(2), std::underflow_error);
+ checkConsistency(queue);
+
+ EXPECT_THROW(queue.trimEnd(30), std::underflow_error);
+ checkConsistency(queue);
+}
+
TEST(IOBufQueue, Prepend) {
folly::IOBufQueue queue;