typename std::enable_if<std::is_integral<T>::value>::type
write(T value) {
const uint8_t* u8 = reinterpret_cast<const uint8_t*>(&value);
+ Derived* d = static_cast<Derived*>(this);
- push(u8, sizeof(T));
+ d->push(u8, sizeof(T));
}
template <class T>
void writeBE(T value) {
- write(Endian::big(value));
+ Derived* d = static_cast<Derived*>(this);
+ d->write(Endian::big(value));
}
template <class T>
void writeLE(T value) {
- write(Endian::little(value));
+ Derived* d = static_cast<Derived*>(this);
+ d->write(Endian::little(value));
}
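
The `Derived* d = static_cast<Derived*>(this)` lines added above are the CRTP hook: the base class's writeBE()/writeLE() helpers now forward to whatever write() the concrete appender defines (for example QueueAppender's integral overload further down), with no virtual dispatch. A minimal stand-alone sketch of the pattern, using illustrative names that are not folly's:

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    template <class Derived>
    struct WriterBase {
      template <class T>
      void writeBE(T value) {
        // Forward through Derived so an overriding write() is chosen
        // statically (byte-swapping omitted in this sketch).
        static_cast<Derived*>(this)->write(value);
      }
    };

    struct CountingWriter : WriterBase<CountingWriter> {
      std::size_t bytes = 0;
      template <class T>
      void write(T) { bytes += sizeof(T); }
    };

    int main() {
      CountingWriter w;
      w.writeBE(std::uint32_t(1));    // calls CountingWriter::write via CRTP
      w.writeBE(std::uint16_t(2));
      std::printf("%zu\n", w.bytes);  // prints 6
    }
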
void push(const uint8_t* buf, size_t len) {
* Create an Appender that writes to an IOBufQueue. When we allocate
* space in the queue, we grow no more than growth bytes at once
* (unless you call ensure() with a bigger value yourself).
- * Throw if we ever need to allocate more than maxTotalGrowth.
*/
- QueueAppender(IOBufQueue* queue,
- uint32_t growth,
- size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
- reset(queue, growth, maxTotalGrowth);
+ QueueAppender(IOBufQueue* queue, uint32_t growth) {
+ reset(queue, growth);
}
- void reset(IOBufQueue* queue,
- uint32_t growth,
- size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
+ void reset(IOBufQueue* queue, uint32_t growth) {
queue_ = queue;
growth_ = growth;
- remainingGrowth_ = maxTotalGrowth;
- next_ = nullptr;
- available_ = 0;
}
- uint8_t* writableData() { return next_; }
+ uint8_t* writableData() {
+ return static_cast<uint8_t*>(queue_->writableTail());
+ }
- size_t length() const { return available_; }
+ size_t length() const { return queue_->tailroom(); }
- void append(size_t n) {
- assert(n <= available_);
- assert(n <= remainingGrowth_);
- queue_->postallocate(n);
- next_ += n;
- available_ -= n;
- remainingGrowth_ -= n;
- }
+ void append(size_t n) { queue_->postallocate(n); }
- // Ensure at least n contiguous; can go above growth_, throws if
- // not enough room.
+ // Ensure at least n contiguous writable bytes; a single allocation may
+ // exceed growth_ if n is larger.
- void ensure(uint32_t n) {
- if (UNLIKELY(n > remainingGrowth_)) {
- throw std::out_of_range("overflow");
- }
-
- if (LIKELY(length() >= n)) {
- return;
- }
+ void ensure(uint32_t n) { queue_->preallocate(n, growth_); }
- size_t desired = std::min(growth_, remainingGrowth_ - n);
-
- // Grab some more.
- auto p = queue_->preallocate(n, desired);
-
- next_ = static_cast<uint8_t*>(p.first);
- available_ = p.second;
+ template <class T>
+ typename std::enable_if<std::is_integral<T>::value>::type
+ write(T value) {
+ // preallocate() returns at least sizeof(T) contiguous bytes, so this can't fail.
+ auto p = queue_->preallocate(sizeof(T), growth_);
+ storeUnaligned(p.first, value);
+ queue_->postallocate(sizeof(T));
}
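
With QueueAppender's members now thin wrappers over IOBufQueue, the usual ensure()/writableData()/append() protocol still works unchanged; a short usage sketch (the helper function is illustrative, not part of folly):

    #include <cstring>
    #include <folly/io/Cursor.h>  // header path may differ by folly version

    // Illustrative helper: copy n raw bytes through a QueueAppender.
    void writeRaw(folly::io::QueueAppender& app,
                  const uint8_t* src, uint32_t n) {
      app.ensure(n);                            // >= n contiguous writable bytes
      std::memcpy(app.writableData(), src, n);  // app.length() >= n here
      app.append(n);                            // commit the bytes written
    }
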
- size_t pushAtMost(const uint8_t* buf, size_t len) {
- if (UNLIKELY(len > remainingGrowth_)) {
- len = remainingGrowth_;
- }
+ size_t pushAtMost(const uint8_t* buf, size_t len) {
size_t remaining = len;
while (remaining != 0) {
- ensure(std::min(remaining, growth_));
- size_t n = std::min(remaining, available_);
- memcpy(next_, buf, n);
- buf += n;
- remaining -= n;
- append(n);
+ auto p = queue_->preallocate(std::min(remaining, growth_),
+ growth_,
+ remaining);
+ memcpy(p.first, buf, p.second);
+ queue_->postallocate(p.second);
+ buf += p.second;
+ remaining -= p.second;
}
return len;
}
- // insert doesn't count towards maxTotalGrowth
void insert(std::unique_ptr<folly::IOBuf> buf) {
if (buf) {
queue_->append(std::move(buf), true);
- next_ = nullptr;
- available_ = 0;
}
}
private:
folly::IOBufQueue* queue_;
size_t growth_;
- size_t remainingGrowth_;
- uint8_t* next_;
- size_t available_;
};
}} // folly::io
unique_ptr<IOBuf> IOBuf::cloneOne() const {
if (flags_ & kFlagExt) {
+ if (ext_.sharedInfo) {
+ flags_ |= kFlagMaybeShared;
+ }
unique_ptr<IOBuf> iobuf(new IOBuf(static_cast<ExtBufTypeEnum>(ext_.type),
flags_, ext_.buf, ext_.capacity,
data_, length_,
void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom;
- CHECK_LT(newCapacity, UINT32_MAX);
+ DCHECK_LT(newCapacity, UINT32_MAX);
// We'll need to reallocate the buffer.
// There are a few options.
* This does not modify any actual data in the buffer.
*/
void prepend(uint32_t amount) {
- CHECK(amount <= headroom());
+ DCHECK_LE(amount, headroom());
data_ -= amount;
length_ += amount;
}
* This does not modify any actual data in the buffer.
*/
void append(uint32_t amount) {
- CHECK(amount <= tailroom());
+ DCHECK_LE(amount, tailroom());
length_ += amount;
}
* This does not modify any actual data in the buffer.
*/
void trimStart(uint32_t amount) {
- CHECK(amount <= length_);
+ DCHECK_LE(amount, length_);
data_ += amount;
length_ -= amount;
}
* This does not modify any actual data in the buffer.
*/
void trimEnd(uint32_t amount) {
- CHECK(amount <= length_);
+ DCHECK_LE(amount, length_);
length_ -= amount;
}
* This only checks the current IOBuf, and not other IOBufs in the chain.
*/
bool isSharedOne() const {
+ if (LIKELY((flags_ & (kFlagUserOwned | kFlagMaybeShared)) == 0)) {
+ return false;
+ }
+
// If this is a user-owned buffer, it is always considered shared
if (flags_ & kFlagUserOwned) {
return true;
}
- if (flags_ & kFlagExt) {
- return ext_.sharedInfo->refcount.load(std::memory_order_acquire) > 1;
- } else {
- return false;
+ // an internal buffer wouldn't have kFlagMaybeShared or kFlagUserOwned
+ // so we would have returned false already. The only remaining case
+ // is an external buffer which may be shared, so we need to read
+ // the reference count.
+ assert((flags_ & (kFlagExt | kFlagMaybeShared)) ==
+ (kFlagExt | kFlagMaybeShared));
+
+ bool shared =
+ ext_.sharedInfo->refcount.load(std::memory_order_acquire) > 1;
+ if (!shared) {
+ // we're the last one left
+ flags_ &= ~kFlagMaybeShared;
}
+ return shared;
}
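
The net effect is that isSharedOne() only touches the atomic reference count for buffers that have actually been cloned, and stops touching it once it has observed a count of one. A small sketch of that behavior against the IOBuf API (the assertions restate the logic above):

    #include <cassert>
    #include <folly/io/IOBuf.h>  // header path may differ by folly version

    void maybeSharedSketch() {
      auto buf = folly::IOBuf::create(64);
      assert(!buf->isSharedOne());  // never cloned: fast path, no atomic load

      auto copy = buf->cloneOne();  // sets kFlagMaybeShared on both buffers
      assert(buf->isSharedOne());   // refcount is 2

      copy.reset();                 // drop the only clone
      assert(!buf->isSharedOne());  // sees refcount 1, clears kFlagMaybeShared
    }
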
/**
Iterator end() const;
private:
- enum FlagsEnum {
+ enum FlagsEnum : uint32_t {
kFlagExt = 0x1,
kFlagUserOwned = 0x2,
kFlagFreeSharedInfo = 0x4,
+ kFlagMaybeShared = 0x8,
};
// Values for the ExternalBuf type field.
*/
uint8_t* data_;
uint32_t length_;
- uint32_t flags_;
+ mutable uint32_t flags_;
union {
ExternalBuf ext_;
std::unique_ptr<IOBuf>>::type
IOBuf::takeOwnership(UniquePtr&& buf, size_t count) {
size_t size = count * sizeof(typename UniquePtr::element_type);
- CHECK_LT(size, size_t(std::numeric_limits<uint32_t>::max()));
+ DCHECK_LT(size, size_t(std::numeric_limits<uint32_t>::max()));
auto deleter = new UniquePtrDeleter<UniquePtr>(buf.get_deleter());
return takeOwnership(buf.release(),
size,
}
assert(head_);
head_->prepend(n);
- if (options_.cacheChainLength) {
- chainLength_ += n;
- }
+ chainLength_ += n;
}
void
memcpy(last->writableTail(), src, copyLen);
src += copyLen;
last->append(copyLen);
- if (options_.cacheChainLength) {
- chainLength_ += copyLen;
- }
+ chainLength_ += copyLen;
len -= copyLen;
}
}
}
pair<void*,uint32_t>
-IOBufQueue::preallocate(uint32_t min, uint32_t newAllocationSize,
- uint32_t max) {
- if (head_ != nullptr) {
- // If there's enough space left over at the end of the queue, use that.
- IOBuf* last = head_->prev();
- if (!last->isSharedOne()) {
- uint32_t avail = last->tailroom();
- if (avail >= min) {
- return make_pair(last->writableTail(), std::min(max, avail));
- }
- }
- }
+IOBufQueue::preallocateSlow(uint32_t min, uint32_t newAllocationSize,
+ uint32_t max) {
// Allocate a new buffer of the requested max size.
unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
appendToChain(head_, std::move(newBuf), false);
IOBuf* last = head_->prev();
return make_pair(last->writableTail(),
                 std::min(max, last->tailroom()));
}
-void
-IOBufQueue::postallocate(uint32_t n) {
- head_->prev()->append(n);
- if (options_.cacheChainLength) {
- chainLength_ += n;
- }
-}
-
unique_ptr<IOBuf>
IOBufQueue::split(size_t n) {
unique_ptr<IOBuf> result;
"Attempt to remove more bytes than are present in IOBufQueue");
} else if (head_->length() <= n) {
n -= head_->length();
- if (options_.cacheChainLength) {
- chainLength_ -= head_->length();
- }
+ chainLength_ -= head_->length();
unique_ptr<IOBuf> remainder = head_->pop();
appendToChain(result, std::move(head_), false);
head_ = std::move(remainder);
clone->trimEnd(clone->length() - n);
appendToChain(result, std::move(clone), false);
head_->trimStart(n);
- if (options_.cacheChainLength) {
- chainLength_ -= n;
- }
+ chainLength_ -= n;
break;
}
}
}
if (head_->length() > amount) {
head_->trimStart(amount);
- if (options_.cacheChainLength) {
- chainLength_ -= amount;
- }
+ chainLength_ -= amount;
break;
}
amount -= head_->length();
- if (options_.cacheChainLength) {
- chainLength_ -= head_->length();
- }
+ chainLength_ -= head_->length();
head_ = head_->pop();
}
}
}
if (head_->prev()->length() > amount) {
head_->prev()->trimEnd(amount);
- if (options_.cacheChainLength) {
- chainLength_ -= amount;
- }
+ chainLength_ -= amount;
break;
}
amount -= head_->prev()->length();
- if (options_.cacheChainLength) {
- chainLength_ -= head_->prev()->length();
- }
+ chainLength_ -= head_->prev()->length();
if (head_->isChained()) {
head_->prev()->unlink();
if (!head_) {
return nullptr;
}
- if (options_.cacheChainLength) {
- chainLength_ -= head_->length();
- }
+ chainLength_ -= head_->length();
std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
head_ = retBuf->pop();
return retBuf;
buf->clear();
buf = buf->next();
} while (buf != head_.get());
- if (options_.cacheChainLength) {
- chainLength_ = 0;
- }
+ chainLength_ = 0;
}
} // folly
*/
std::pair<void*,uint32_t> preallocate(
uint32_t min, uint32_t newAllocationSize,
- uint32_t max = std::numeric_limits<uint32_t>::max());
+ uint32_t max = std::numeric_limits<uint32_t>::max()) {
+ auto buf = tailBuf();
+ if (LIKELY(buf && buf->tailroom() >= min)) {
+ return std::make_pair(buf->writableTail(),
+ std::min(max, buf->tailroom()));
+ }
+
+ return preallocateSlow(min, newAllocationSize, max);
+ }
/**
* Tell the queue that the caller has written data into the first n
* invoke any other non-const methods on this IOBufQueue between
* the call to preallocate and the call to postallocate().
*/
- void postallocate(uint32_t n);
+ void postallocate(uint32_t n) {
+ head_->prev()->append(n);
+ chainLength_ += n;
+ }
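
Both calls are now cheap enough to use directly from tight write loops. A minimal sketch of the preallocate()/postallocate() protocol described above (the helper and the 4096 growth size are illustrative, not part of folly):

    #include <cstdint>
    #include <cstring>
    #include <folly/io/IOBufQueue.h>  // header path may differ by folly version

    // Illustrative helper: append len bytes to the queue in one contiguous block.
    void appendBytes(folly::IOBufQueue& queue, const void* data, uint32_t len) {
      // Ask for at least len contiguous writable bytes; allocate roughly 4096
      // if a new buffer is needed.
      auto p = queue.preallocate(len, 4096);
      std::memcpy(p.first, data, len);
      // Commit only what was written (len <= p.second); no other non-const
      // method may be called on the queue between the two calls.
      queue.postallocate(len);
    }
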
/**
* Obtain a writable block of n contiguous bytes, allocating more space
return p;
}
+ void* writableTail() const {
+ auto buf = tailBuf();
+ return buf ? buf->writableTail() : nullptr;
+ }
+
+ size_t tailroom() const {
+ auto buf = tailBuf();
+ return buf ? buf->tailroom() : 0;
+ }
+
/**
* Split off the first n bytes of the queue into a separate IOBuf chain,
* and transfer ownership of the new chain to the caller. The IOBufQueue
* constructor.
*/
size_t chainLength() const {
- if (!options_.cacheChainLength) {
+ if (UNLIKELY(!options_.cacheChainLength)) {
throw std::invalid_argument("IOBufQueue: chain length not cached");
}
return chainLength_;
IOBufQueue& operator=(IOBufQueue&&);
private:
+ IOBuf* tailBuf() const {
+ if (UNLIKELY(!head_)) return nullptr;
+ IOBuf* buf = head_->prev();
+ return LIKELY(!buf->isSharedOne()) ? buf : nullptr;
+ }
+ std::pair<void*,uint32_t> preallocateSlow(
+ uint32_t min, uint32_t newAllocationSize, uint32_t max);
+
static const size_t kChainLengthNotCached = (size_t)-1;
/** Not copyable */
IOBufQueue(const IOBufQueue&) = delete;
IOBufQueue& operator=(const IOBufQueue&) = delete;
Options options_;
+
+ // NOTE: chainLength_ is updated even if !options_.cacheChainLength, because
+ // doing it unconditionally in postallocate() is faster (no possibly
+ // mispredicted branch).
size_t chainLength_;
/** Everything that has been appended but not yet discarded or moved out */
std::unique_ptr<folly::IOBuf> head_;
folly::IOBufQueue queue;
- // Allocate 100 bytes at once, but don't grow past 1024
+ // Allocate 100 bytes at a time
- QueueAppender app(&queue, 100, 1024);
+ QueueAppender app(&queue, 100);
size_t n = 1024 / sizeof(uint32_t);
for (uint32_t i = 0; i < n; ++i) {
app.writeBE(i);
}
- EXPECT_THROW({app.writeBE(0);}, std::out_of_range);
-
// There must be a goodMallocSize between 100 and 1024...
EXPECT_LT(1, queue.front()->countChainElements());
const IOBuf* buf = queue.front();