/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <folly/ScopeGuard.h>
#include <folly/io/IOBuf.h>

#include <algorithm>
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

namespace folly {
/**
 * An IOBufQueue encapsulates a chain of IOBufs and provides
 * convenience functions to append data to the back of the chain
 * and remove data from the front.
 *
 * You may also prepend data into the headroom of the first buffer in the
 * chain, if any.
 */
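/*
 * Example usage (an illustrative sketch, not part of the original header).
 * readSomeBytes() and sendChain() stand in for the caller's own I/O routines.
 *
 *   folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength());
 *   queue.append("HTTP/1.1 200 OK\r\n");          // copy into the tail
 *   auto space = queue.preallocate(1024, 4096);   // writable tail block
 *   size_t n = readSomeBytes(space.first, space.second);
 *   queue.postallocate(n);                        // commit the bytes written
 *   std::unique_ptr<folly::IOBuf> chain = queue.move();  // take ownership
 *   sendChain(std::move(chain));
 */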
class IOBufQueue {
 private:
  /**
   * This guard should be taken by any method that intends to make changes
   * to the underlying data (e.g. appending to it).
   *
   * It flushes the writable tail cache and refills it on destruction.
   */
  auto updateGuard() {
    flushCache();
    return folly::makeGuard([this] { updateWritableTailCache(); });
  }
  struct WritableRangeCacheData {
    std::pair<uint8_t*, uint8_t*> cachedRange;
    bool attached{false};

    WritableRangeCacheData() = default;

    WritableRangeCacheData(WritableRangeCacheData&& other)
        : cachedRange(other.cachedRange), attached(other.attached) {
      other.cachedRange = {};
      other.attached = false;
    }

    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
      cachedRange = other.cachedRange;
      attached = other.attached;

      other.cachedRange = {};
      other.attached = false;

      return *this;
    }

    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
  };
 public:
  struct Options {
    Options() : cacheChainLength(false) {}
    bool cacheChainLength;
  };

  /**
   * Commonly used Options, currently the only possible value other than
   * the default.
   */
  static Options cacheChainLength() {
    Options options;
    options.cacheChainLength = true;
    return options;
  }
  /**
   * WritableRangeCache represents a cache of the current writable tail and
   * provides a cheap and simple interface for appending to it that avoids
   * paying the cost of a preallocate/postallocate pair (i.e. indirections and
   * checks).
   *
   * The cache is flushed on destruction/copy/move and on non-const accesses to
   * the underlying IOBufQueue.
   *
   * Note: there can be only one active cache for a given IOBufQueue, i.e. when
   *       you fill a cache object it automatically invalidates the other
   *       cache (if any).
   */
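  /*
   * Example usage (an illustrative sketch, not part of the original header):
   * appending many small records through the cache rather than through
   * repeated preallocate()/postallocate() calls. encodeRecord(), records and
   * kMaxRecord are hypothetical.
   *
   *   IOBufQueue::WritableRangeCache cache(&queue);
   *   for (const auto& rec : records) {
   *     if (cache.length() < kMaxRecord) {
   *       queue.preallocate(kMaxRecord, 4096);  // grow the tail...
   *       cache.fillCache();                    // ...and re-cache it
   *     }
   *     cache.append(encodeRecord(rec, cache.writableData()));
   *   }
   */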
  class WritableRangeCache {
   public:
    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
      if (queue_) {
        fillCache();
      }
    }
    /**
     * Move constructor/assignment can move the cached range, but must update
     * the reference in IOBufQueue.
     */
    WritableRangeCache(WritableRangeCache&& other)
        : data_(std::move(other.data_)), queue_(other.queue_) {
      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }
    }

    WritableRangeCache& operator=(WritableRangeCache&& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      data_ = std::move(other.data_);
      queue_ = other.queue_;

      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }

      return *this;
    }
    /**
     * Copy constructor/assignment cannot copy the cached range.
     */
    WritableRangeCache(const WritableRangeCache& other)
        : queue_(other.queue_) {}

    WritableRangeCache& operator=(const WritableRangeCache& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = other.queue_;

      return *this;
    }
    ~WritableRangeCache() {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }
    }
    /**
     * Reset the underlying IOBufQueue; flushes the current cache if present.
     */
    void reset(IOBufQueue* q) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = q;

      if (queue_) {
        fillCache();
      }
    }
    /**
     * Get a pointer to the underlying IOBufQueue object.
     */
    IOBufQueue* queue() {
      return queue_;
    }
    /**
     * Return a pointer to the start of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    uint8_t* writableData() {
      return data_.cachedRange.first;
    }
    /**
     * Return the length of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    size_t length() {
      return data_.cachedRange.second - data_.cachedRange.first;
    }
    /**
     * Mark n bytes as occupied (e.g. postallocate).
     */
    void append(size_t n) {
      // The slow path can happen only if somebody is misusing the interface,
      // e.g. calling append after touching the IOBufQueue or without checking
      // length() first.
      if (LIKELY(data_.cachedRange.first != nullptr)) {
        DCHECK_LE(n, length());
        data_.cachedRange.first += n;
      } else {
        appendSlow(n);
      }
    }
    /**
     * Same as append(n), but avoids checking if there is a cache.
     * The caller must guarantee that the cache is set (e.g. the caller just
     * called fillCache or checked that it's not empty).
     */
    void appendUnsafe(size_t n) {
      data_.cachedRange.first += n;
    }
    /**
     * Fill the cache of the writable tail from the underlying IOBufQueue.
     */
    void fillCache() {
      queue_->fillWritableRangeCache(data_);
    }
   private:
    WritableRangeCacheData data_;
    IOBufQueue* queue_;
    FOLLY_NOINLINE void appendSlow(size_t n) {
      queue_->postallocate(n);
    }
    void dcheckIntegrity() {
      // Tail start should always be less than tail end.
      DCHECK_LE(
          (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
      DCHECK(
          data_.cachedRange.first != nullptr ||
          data_.cachedRange.second == nullptr);

      // The cached range should always be empty if the cache is not attached.
      DCHECK(
          data_.attached ||
          (data_.cachedRange.first == nullptr &&
           data_.cachedRange.second == nullptr));

      // We cannot be in attached state if the queue_ is not set.
      DCHECK(queue_ != nullptr || !data_.attached);

      // If we're attached and the cache is not empty, then it should coincide
      // with the tail buffer.
      DCHECK(
          !data_.attached || data_.cachedRange.first == nullptr ||
          (queue_->head_ != nullptr &&
           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
           data_.cachedRange.second ==
               queue_->head_->prev()->writableTail() +
                   queue_->head_->prev()->tailroom()));
    }
  };
  explicit IOBufQueue(const Options& options = Options());
  /**
   * Return a space to prepend bytes and the amount of headroom available.
   */
  std::pair<void*, uint64_t> headroom();

  /**
   * Indicate that n bytes from the headroom have been used.
   */
  void markPrepended(uint64_t n);

  /**
   * Prepend an existing range; throws std::overflow_error if there is not
   * enough room.
   */
  void prepend(const void* buf, uint64_t n);
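  /*
   * Example usage (an illustrative sketch, not part of the original header):
   * length-prefix framing by reserving headroom up front and prepending the
   * final size once the payload is known. payload/payloadLen are hypothetical.
   *
   *   auto head = folly::IOBuf::create(1024);
   *   head->advance(sizeof(uint32_t));  // turn 4 bytes of tailroom into headroom
   *   IOBufQueue queue(IOBufQueue::cacheChainLength());
   *   queue.append(std::move(head));
   *   queue.append(payload, payloadLen);
   *   uint32_t prefix = uint32_t(queue.chainLength());
   *   queue.prepend(&prefix, sizeof(prefix));  // throws if headroom is exhausted
   */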
  /**
   * Add a buffer or buffer chain to the end of this queue. The
   * queue takes ownership of buf.
   *
   * If pack is true, we try to reduce wastage at the end of this queue
   * by copying some data from the first buffers in the buf chain (and
   * releasing the buffers), if possible. If pack is false, we leave
   * the chain topology unchanged.
   */
  void append(std::unique_ptr<folly::IOBuf>&& buf,
              bool pack=false);

  /**
   * Add a queue to the end of this queue. The queue takes ownership of
   * all buffers from the other queue.
   */
  void append(IOBufQueue& other, bool pack=false);
  void append(IOBufQueue&& other, bool pack=false) {
    append(other, pack); // call lvalue reference overload, above
  }
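  /*
   * Example usage (an illustrative sketch, not part of the original header):
   * draining a staging queue into an output queue. With pack == true the tail
   * of the destination may absorb small leading buffers from the source to
   * reduce wasted tailroom. "output" is a hypothetical second IOBufQueue.
   *
   *   IOBufQueue staging;
   *   staging.append("chunk", 5);
   *   output.append(std::move(staging), true);  // output now owns the data
   */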
  /**
   * Copy len bytes, starting at buf, to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(const void* buf, size_t len);

  /**
   * Copy a string to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(StringPiece sp) {
    append(sp.data(), sp.size());
  }
  /**
   * Append a chain of IOBuf objects that point to consecutive regions
   * within buf.
   *
   * Just like IOBuf::wrapBuffer, this should only be used when the caller
   * knows ahead of time and can ensure that all IOBuf objects that will point
   * to this buffer will be destroyed before the buffer itself is destroyed;
   * all other caveats from wrapBuffer also apply.
   *
   * Every buffer except for the last will wrap exactly blockSize bytes.
   * Importantly, this method may be used to wrap buffers larger than 4GB.
   */
  void wrapBuffer(const void* buf, size_t len,
                  uint64_t blockSize=(1U << 31)); // default block size: 2GB
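  /*
   * Example usage (an illustrative sketch, not part of the original header):
   * wrapping an externally owned region without copying it. mappedBase and
   * mappedLength are hypothetical, and the region must outlive every IOBuf
   * that points into it.
   *
   *   queue.wrapBuffer(mappedBase, mappedLength);  // zero-copy append
   */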
  /**
   * Obtain a writable block of contiguous bytes at the end of this
   * queue, allocating more space if necessary. The amount of space
   * reserved will be at least min. If min contiguous space is not
   * available at the end of the queue, an IOBuf with size newAllocationSize
   * is appended to the chain and returned. The actual available space
   * may be larger than newAllocationSize, but will be truncated to max,
   * if specified.
   *
   * If the caller subsequently writes anything into the returned space,
   * it must call the postallocate() method.
   *
   * @return The starting address of the block and the length in bytes.
   *
   * @note The point of the preallocate()/postallocate() mechanism is
   *       to support I/O APIs such as Thrift's TAsyncSocket::ReadCallback
   *       that request a buffer from the application and then, in a later
   *       callback, tell the application how much of the buffer they've
   *       filled with data.
   */
  std::pair<void*,uint64_t> preallocate(
      uint64_t min, uint64_t newAllocationSize,
      uint64_t max = std::numeric_limits<uint64_t>::max()) {
    dcheckCacheIntegrity();

    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
      return std::make_pair(
          writableTail(), std::min<uint64_t>(max, tailroom()));
    }

    return preallocateSlow(min, newAllocationSize, max);
  }
  /**
   * Tell the queue that the caller has written data into the first n
   * bytes provided by the previous preallocate() call.
   *
   * @note n should be less than or equal to the size returned by
   *       preallocate(). If n is zero, the caller may skip the call
   *       to postallocate(). If n is nonzero, the caller must not
   *       invoke any other non-const methods on this IOBufQueue between
   *       the call to preallocate and the call to postallocate().
   */
  void postallocate(uint64_t n) {
    dcheckCacheIntegrity();
    DCHECK_LE(
        (void*)(cachePtr_->cachedRange.first + n),
        (void*)cachePtr_->cachedRange.second);
    cachePtr_->cachedRange.first += n;
  }
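  /*
   * Example usage (an illustrative sketch, not part of the original header):
   * the read-callback pattern described in the preallocate() note above.
   * getReadBuffer()/readDataAvailable() mirror an AsyncSocket-style callback
   * interface, and queue_ is a hypothetical member of the callback object.
   *
   *   void getReadBuffer(void** buf, size_t* len) {
   *     auto space = queue_.preallocate(64, 4096);
   *     *buf = space.first;
   *     *len = space.second;
   *   }
   *   void readDataAvailable(size_t n) noexcept {
   *     queue_.postallocate(n);  // n bytes of the reserved space were filled
   *   }
   */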
  /**
   * Obtain a writable block of n contiguous bytes, allocating more space
   * if necessary, and mark it as used. The caller can fill it later.
   */
  void* allocate(uint64_t n) {
    void* p = preallocate(n, n).first;
    postallocate(n);
    return p;
  }
  void* writableTail() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.first;
  }

  size_t tailroom() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
  }
  /**
   * Split off the first n bytes of the queue into a separate IOBuf chain,
   * and transfer ownership of the new chain to the caller. The IOBufQueue
   * retains ownership of everything after the split point.
   *
   * @warning If the split point lies in the middle of some IOBuf within
   *          the chain, this function may, as an implementation detail,
   *          clone that IOBuf.
   *
   * @throws std::underflow_error if n exceeds the number of bytes
   *         in the queue.
   */
  std::unique_ptr<folly::IOBuf> split(size_t n) {
    return split(n, true);
  }

  /**
   * Similar to split, but will return the entire queue instead of throwing
   * if n exceeds the number of bytes in the queue.
   */
  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
    return split(n, false);
  }
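  /*
   * Example usage (an illustrative sketch, not part of the original header):
   * consuming one fixed-size frame at a time, assuming the queue was built
   * with the cacheChainLength option. kFrameSize and handleFrame() are
   * hypothetical.
   *
   *   while (queue.chainLength() >= kFrameSize) {
   *     handleFrame(queue.split(kFrameSize));
   *   }
   */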
  /**
   * Similar to IOBuf::trimStart, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimStart(size_t amount);

  /**
   * Similar to trimStart, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimStartAtMost(size_t amount);

  /**
   * Similar to IOBuf::trimEnd, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimEnd(size_t amount);

  /**
   * Similar to trimEnd, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimEndAtMost(size_t amount);
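  /*
   * Example usage (an illustrative sketch, not part of the original header):
   * dropping a header that has already been parsed in place, without
   * splitting off a separate chain. headerSize is hypothetical.
   *
   *   queue.trimStart(headerSize);  // pops fully trimmed buffers as needed
   */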
  /**
   * Transfer ownership of the queue's entire IOBuf chain to the caller.
   */
  std::unique_ptr<folly::IOBuf> move() {
    auto guard = updateGuard();
    std::unique_ptr<folly::IOBuf> res = std::move(head_);
    chainLength_ = 0;
    return res;
  }
  /**
   * Access the front IOBuf.
   *
   * Note: caller will see the current state of the chain, but may not see
   *       future updates immediately, due to the presence of a tail cache.
   * Note: the caller may potentially clone the chain, thus marking all buffers
   *       as shared. We may still continue writing to the tail of the last
   *       IOBuf without checking if it's shared, but this is fine, since the
   *       cloned IOBufs won't reference that data.
   */
  const folly::IOBuf* front() const {
    flushCache();
    return head_.get();
  }
  /**
   * Returns the first IOBuf in the chain and removes it from the chain.
   *
   * @return first IOBuf in the chain or nullptr if none.
   */
  std::unique_ptr<folly::IOBuf> pop_front();
  /**
   * Total chain length, only valid if cacheChainLength was specified in the
   * constructor.
   */
  size_t chainLength() const {
    if (UNLIKELY(!options_.cacheChainLength)) {
      throw std::invalid_argument("IOBufQueue: chain length not cached");
    }
    dcheckCacheIntegrity();
    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
  }
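  /*
   * Example usage (an illustrative sketch, not part of the original header):
   *
   *   IOBufQueue counted(IOBufQueue::cacheChainLength());
   *   counted.append("abc", 3);
   *   assert(counted.chainLength() == 3);
   *
   *   IOBufQueue plain;  // default Options: chainLength() would throw
   */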
  /**
   * Returns true iff the IOBuf chain length is 0.
   */
  bool empty() const {
    dcheckCacheIntegrity();
    return !head_ ||
        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
  }
  const Options& options() const {
    return options_;
  }
  /**
   * Clear the queue. Note that this does not release the buffers; it
   * just sets their length to zero, which is useful if you want to reuse
   * the same queue without reallocating.
   */
  void clear();
  /**
   * Append the queue to a std::string. Non-destructive.
   */
  void appendToString(std::string& out) const;
  /**
   * Calls IOBuf::gather() on the head of the queue, if it exists.
   */
  void gather(uint64_t maxLength);
  IOBufQueue(IOBufQueue&&) noexcept;
  IOBufQueue& operator=(IOBufQueue&&);
 private:
  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);

  static const size_t kChainLengthNotCached = (size_t)-1;
  IOBufQueue(const IOBufQueue&) = delete;
  IOBufQueue& operator=(const IOBufQueue&) = delete;
  Options options_;

  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
  // because doing it unchecked in postallocate() is faster (no (mis)predicted
  // branch).
  mutable size_t chainLength_{0};
  /**
   * Everything that has been appended but not yet discarded or moved out.
   * Note: anything that needs to operate on a tail should either call
   * flushCache() or grab updateGuard() (it will flush the cache itself).
   */
  std::unique_ptr<folly::IOBuf> head_;
  mutable uint8_t* tailStart_{nullptr};
  WritableRangeCacheData* cachePtr_{nullptr};
  WritableRangeCacheData localCache_;
  void dcheckCacheIntegrity() const {
    // Tail start should always be less than tail end.
    DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
    DCHECK_LE(
        (void*)cachePtr_->cachedRange.first,
        (void*)cachePtr_->cachedRange.second);
    DCHECK(
        cachePtr_->cachedRange.first != nullptr ||
        cachePtr_->cachedRange.second == nullptr);

    // There is always an attached cache instance.
    DCHECK(cachePtr_->attached);

    // Either the cache is empty or it coincides with the tail.
    DCHECK(
        cachePtr_->cachedRange.first == nullptr ||
        (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
         tailStart_ <= cachePtr_->cachedRange.first &&
         cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
         cachePtr_->cachedRange.second ==
             head_->prev()->writableTail() + head_->prev()->tailroom()));
  }
  /**
   * Populate dest with the writable tail range cache.
   */
  void fillWritableRangeCache(WritableRangeCacheData& dest) {
    dcheckCacheIntegrity();
    if (cachePtr_ != &dest) {
      dest = std::move(*cachePtr_);
      cachePtr_ = &dest;
    }
  }
  /**
   * Clear the current writable tail cache and reset it to localCache_.
   */
  void clearWritableRangeCache() {
    flushCache();

    if (cachePtr_ != &localCache_) {
      localCache_ = std::move(*cachePtr_);
      cachePtr_ = &localCache_;
    }

    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
  }
  /**
   * Commit any pending changes to the tail of the queue.
   */
  void flushCache() const {
    dcheckCacheIntegrity();

    if (tailStart_ != cachePtr_->cachedRange.first) {
      auto buf = head_->prev();
      DCHECK_EQ(
          (void*)(buf->writableTail() + buf->tailroom()),
          (void*)cachePtr_->cachedRange.second);
      auto len = cachePtr_->cachedRange.first - tailStart_;
      buf->append(len);
      chainLength_ += len;
      tailStart_ += len;
    }
  }
  // For WritableRangeCache move assignment/construction.
  void updateCacheRef(WritableRangeCacheData& newRef) {
    cachePtr_ = &newRef;
  }
  /**
   * Update the cached writable tail range. Called by updateGuard().
   */
  void updateWritableTailCache() {
    if (LIKELY(head_ != nullptr)) {
      IOBuf* buf = head_->prev();
      if (LIKELY(!buf->isSharedOne())) {
        tailStart_ = buf->writableTail();
        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
            tailStart_, tailStart_ + buf->tailroom());
        return;
      }
    }
    tailStart_ = nullptr;
    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
  }
  std::pair<void*, uint64_t>
  preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max);
};

} // namespace folly