add function to push data from another cursor
[folly.git] / folly / io / Cursor.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_CURSOR_H
18 #define FOLLY_CURSOR_H
19
20 #include <assert.h>
21 #include <stdexcept>
22 #include <string.h>
23 #include <type_traits>
24 #include <memory>
25
26 #include <folly/Bits.h>
27 #include <folly/io/IOBuf.h>
28 #include <folly/io/IOBufQueue.h>
29 #include <folly/Likely.h>
30 #include <folly/Memory.h>
31
32 /**
33  * Cursor class for fast iteration over IOBuf chains.
34  *
35  * Cursor - Read-only access
36  *
37  * RWPrivateCursor - Read-write access, assumes private access to IOBuf chain
38  * RWUnshareCursor - Read-write access, calls unshare on write (COW)
39  * Appender        - Write access, assumes private access to IOBuf chian
40  *
41  * Note that RW cursors write in the preallocated part of buffers (that is,
42  * between the buffer's data() and tail()), while Appenders append to the end
43  * of the buffer (between the buffer's tail() and bufferEnd()).  Appenders
44  * automatically adjust the buffer pointers, so you may only use one
45  * Appender with a buffer chain; for this reason, Appenders assume private
46  * access to the buffer (you need to call unshare() yourself if necessary).
47  **/
48 namespace folly { namespace io {
49 namespace detail {
50
51 template <class Derived, typename BufType>
52 class CursorBase {
53  public:
54   const uint8_t* data() const {
55     return crtBuf_->data() + offset_;
56   }
57
58   /*
59    * Return the remaining space available in the current IOBuf.
60    *
61    * May return 0 if the cursor is at the end of an IOBuf.  Use peek() instead
62    * if you want to avoid this.  peek() will advance to the next non-empty
63    * IOBuf (up to the end of the chain) if the cursor is currently pointing at
64    * the end of a buffer.
65    */
66   size_t length() const {
67     return crtBuf_->length() - offset_;
68   }
69
70   /*
71    * Return the space available until the end of the entire IOBuf chain.
72    */
73   size_t totalLength() const {
74     if (crtBuf_ == buffer_) {
75       return crtBuf_->computeChainDataLength() - offset_;
76     }
77     CursorBase end(buffer_->prev());
78     end.offset_ = end.buffer_->length();
79     return end - *this;
80   }
81
82   Derived& operator+=(size_t offset) {
83     Derived* p = static_cast<Derived*>(this);
84     p->skip(offset);
85     return *p;
86   }
87   Derived operator+(size_t offset) const {
88     Derived other(*this);
89     other.skip(offset);
90     return other;
91   }
92
93   /**
94    * Compare cursors for equality/inequality.
95    *
96    * Two cursors are equal if they are pointing to the same location in the
97    * same IOBuf chain.
98    */
99   bool operator==(const Derived& other) const {
100     return (offset_ == other.offset_) && (crtBuf_ == other.crtBuf_);
101   }
102   bool operator!=(const Derived& other) const {
103     return !operator==(other);
104   }
105
106   template <class T>
107   typename std::enable_if<std::is_arithmetic<T>::value, T>::type
108   read() {
109     T val;
110     pull(&val, sizeof(T));
111     return val;
112   }
113
114   template <class T>
115   T readBE() {
116     return Endian::big(read<T>());
117   }
118
119   template <class T>
120   T readLE() {
121     return Endian::little(read<T>());
122   }
123
124   /**
125    * Read a fixed-length string.
126    *
127    * The std::string-based APIs should probably be avoided unless you
128    * ultimately want the data to live in an std::string. You're better off
129    * using the pull() APIs to copy into a raw buffer otherwise.
130    */
131   std::string readFixedString(size_t len) {
132     std::string str;
133
134     str.reserve(len);
135     for (;;) {
136       // Fast path: it all fits in one buffer.
137       size_t available = length();
138       if (LIKELY(available >= len)) {
139         str.append(reinterpret_cast<const char*>(data()), len);
140         offset_ += len;
141         return str;
142       }
143
144       str.append(reinterpret_cast<const char*>(data()), available);
145       if (UNLIKELY(!tryAdvanceBuffer())) {
146         throw std::out_of_range("string underflow");
147       }
148       len -= available;
149     }
150   }
151
152   /**
153    * Read a string consisting of bytes until the given terminator character is
154    * seen. Raises an std::length_error if maxLength bytes have been processed
155    * before the terminator is seen.
156    *
157    * See comments in readFixedString() about when it's appropriate to use this
158    * vs. using pull().
159    */
160   std::string readTerminatedString(
161     char termChar = '\0',
162     size_t maxLength = std::numeric_limits<size_t>::max()) {
163     std::string str;
164
165     for (;;) {
166       const uint8_t* buf = data();
167       size_t buflen = length();
168
169       size_t i = 0;
170       while (i < buflen && buf[i] != termChar) {
171         ++i;
172
173         // Do this check after incrementing 'i', as even though we start at the
174         // 0 byte, it still represents a single character
175         if (str.length() + i >= maxLength) {
176           throw std::length_error("string overflow");
177         }
178       }
179
180       str.append(reinterpret_cast<const char*>(buf), i);
181       if (i < buflen) {
182         skip(i + 1);
183         return str;
184       }
185
186       skip(i);
187
188       if (UNLIKELY(!tryAdvanceBuffer())) {
189         throw std::out_of_range("string underflow");
190       }
191     }
192   }
193
194   explicit CursorBase(BufType* buf)
195     : crtBuf_(buf)
196     , offset_(0)
197     , buffer_(buf) {}
198
199   // Make all the templated classes friends for copy constructor.
200   template <class D, typename B> friend class CursorBase;
201
202   /*
203    * Copy constructor.
204    *
205    * This also allows constructing a CursorBase from other derived types.
206    * For instance, this allows constructing a Cursor from an RWPrivateCursor.
207    */
208   template <class OtherDerived, class OtherBuf>
209   explicit CursorBase(const CursorBase<OtherDerived, OtherBuf>& cursor)
210     : crtBuf_(cursor.crtBuf_),
211       offset_(cursor.offset_),
212       buffer_(cursor.buffer_) {}
213
214   // reset cursor to point to a new buffer.
215   void reset(BufType* buf) {
216     crtBuf_ = buf;
217     buffer_ = buf;
218     offset_ = 0;
219   }
220
221   /**
222    * Return the available data in the current buffer.
223    * If you want to gather more data from the chain into a contiguous region
224    * (for hopefully zero-copy access), use gather() before peek().
225    */
226   std::pair<const uint8_t*, size_t> peek() {
227     // Ensure that we're pointing to valid data
228     size_t available = length();
229     while (UNLIKELY(available == 0 && tryAdvanceBuffer())) {
230       available = length();
231     }
232
233     return std::make_pair(data(), available);
234   }
235
236   void pull(void* buf, size_t len) {
237     if (UNLIKELY(pullAtMost(buf, len) != len)) {
238       throw std::out_of_range("underflow");
239     }
240   }
241
242   void clone(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
243     if (UNLIKELY(cloneAtMost(buf, len) != len)) {
244       throw std::out_of_range("underflow");
245     }
246   }
247
248   void clone(folly::IOBuf& buf, size_t len) {
249     if (UNLIKELY(cloneAtMost(buf, len) != len)) {
250       throw std::out_of_range("underflow");
251     }
252   }
253
254   void skip(size_t len) {
255     if (UNLIKELY(skipAtMost(len) != len)) {
256       throw std::out_of_range("underflow");
257     }
258   }
259
260   size_t pullAtMost(void* buf, size_t len) {
261     uint8_t* p = reinterpret_cast<uint8_t*>(buf);
262     size_t copied = 0;
263     for (;;) {
264       // Fast path: it all fits in one buffer.
265       size_t available = length();
266       if (LIKELY(available >= len)) {
267         memcpy(p, data(), len);
268         offset_ += len;
269         return copied + len;
270       }
271
272       memcpy(p, data(), available);
273       copied += available;
274       if (UNLIKELY(!tryAdvanceBuffer())) {
275         return copied;
276       }
277       p += available;
278       len -= available;
279     }
280   }
281
282   size_t cloneAtMost(folly::IOBuf& buf, size_t len) {
283     buf = folly::IOBuf();
284
285     std::unique_ptr<folly::IOBuf> tmp;
286     size_t copied = 0;
287     for (int loopCount = 0; true; ++loopCount) {
288       // Fast path: it all fits in one buffer.
289       size_t available = length();
290       if (LIKELY(available >= len)) {
291         if (loopCount == 0) {
292           crtBuf_->cloneOneInto(buf);
293           buf.trimStart(offset_);
294           buf.trimEnd(buf.length() - len);
295         } else {
296           tmp = crtBuf_->cloneOne();
297           tmp->trimStart(offset_);
298           tmp->trimEnd(tmp->length() - len);
299           buf.prependChain(std::move(tmp));
300         }
301
302         offset_ += len;
303         return copied + len;
304       }
305
306
307       if (loopCount == 0) {
308         crtBuf_->cloneOneInto(buf);
309         buf.trimStart(offset_);
310       } else {
311         tmp = crtBuf_->cloneOne();
312         tmp->trimStart(offset_);
313         buf.prependChain(std::move(tmp));
314       }
315
316       copied += available;
317       if (UNLIKELY(!tryAdvanceBuffer())) {
318         return copied;
319       }
320       len -= available;
321     }
322   }
323
324   size_t cloneAtMost(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
325     if (!buf) {
326       buf = make_unique<folly::IOBuf>();
327     }
328
329     return cloneAtMost(*buf, len);
330   }
331
332   size_t skipAtMost(size_t len) {
333     size_t skipped = 0;
334     for (;;) {
335       // Fast path: it all fits in one buffer.
336       size_t available = length();
337       if (LIKELY(available >= len)) {
338         offset_ += len;
339         return skipped + len;
340       }
341
342       skipped += available;
343       if (UNLIKELY(!tryAdvanceBuffer())) {
344         return skipped;
345       }
346       len -= available;
347     }
348   }
349
350   /**
351    * Return the distance between two cursors.
352    */
353   size_t operator-(const CursorBase& other) const {
354     BufType *otherBuf = other.crtBuf_;
355     size_t len = 0;
356
357     if (otherBuf != crtBuf_) {
358       len += otherBuf->length() - other.offset_;
359
360       for (otherBuf = otherBuf->next();
361            otherBuf != crtBuf_ && otherBuf != other.buffer_;
362            otherBuf = otherBuf->next()) {
363         len += otherBuf->length();
364       }
365
366       if (otherBuf == other.buffer_) {
367         throw std::out_of_range("wrap-around");
368       }
369
370       len += offset_;
371     } else {
372       if (offset_ < other.offset_) {
373         throw std::out_of_range("underflow");
374       }
375
376       len += offset_ - other.offset_;
377     }
378
379     return len;
380   }
381
382   /**
383    * Return the distance from the given IOBuf to the this cursor.
384    */
385   size_t operator-(const BufType* buf) const {
386     size_t len = 0;
387
388     BufType *curBuf = buf;
389     while (curBuf != crtBuf_) {
390       len += curBuf->length();
391       curBuf = curBuf->next();
392       if (curBuf == buf || curBuf == buffer_) {
393         throw std::out_of_range("wrap-around");
394       }
395     }
396
397     len += offset_;
398     return len;
399   }
400
401  protected:
402   BufType* crtBuf_;
403   size_t offset_;
404
405   ~CursorBase(){}
406
407   BufType* head() {
408     return buffer_;
409   }
410
411   bool tryAdvanceBuffer() {
412     BufType* nextBuf = crtBuf_->next();
413     if (UNLIKELY(nextBuf == buffer_)) {
414       offset_ = crtBuf_->length();
415       return false;
416     }
417
418     offset_ = 0;
419     crtBuf_ = nextBuf;
420     static_cast<Derived*>(this)->advanceDone();
421     return true;
422   }
423
424  private:
425   void advanceDone() {
426   }
427
428   BufType* buffer_;
429 };
430
431 } //namespace detail
432
433 class Cursor : public detail::CursorBase<Cursor, const IOBuf> {
434  public:
435   explicit Cursor(const IOBuf* buf)
436     : detail::CursorBase<Cursor, const IOBuf>(buf) {}
437
438   template <class OtherDerived, class OtherBuf>
439   explicit Cursor(const detail::CursorBase<OtherDerived, OtherBuf>& cursor)
440     : detail::CursorBase<Cursor, const IOBuf>(cursor) {}
441 };
442
443 namespace detail {
444
445 template <class Derived>
446 class Writable {
447  public:
448   template <class T>
449   typename std::enable_if<std::is_arithmetic<T>::value>::type
450   write(T value) {
451     const uint8_t* u8 = reinterpret_cast<const uint8_t*>(&value);
452     Derived* d = static_cast<Derived*>(this);
453     d->push(u8, sizeof(T));
454   }
455
456   template <class T>
457   void writeBE(T value) {
458     Derived* d = static_cast<Derived*>(this);
459     d->write(Endian::big(value));
460   }
461
462   template <class T>
463   void writeLE(T value) {
464     Derived* d = static_cast<Derived*>(this);
465     d->write(Endian::little(value));
466   }
467
468   void push(const uint8_t* buf, size_t len) {
469     Derived* d = static_cast<Derived*>(this);
470     if (d->pushAtMost(buf, len) != len) {
471       throw std::out_of_range("overflow");
472     }
473   }
474
475   /**
476    * push len bytes of data from input cursor, data could be in an IOBuf chain.
477    * If input cursor contains less than len bytes, or this cursor has less than
478    * len bytes writable space, an out_of_range exception will be thrown.
479    */
480   void push(Cursor cursor, size_t len) {
481     if (this->pushAtMost(cursor, len) != len) {
482       throw std::out_of_range("overflow");
483     }
484   }
485
486   size_t pushAtMost(Cursor cursor, size_t len) {
487     size_t written = 0;
488     for(;;) {
489       auto currentBuffer = cursor.peek();
490       const uint8_t* crtData = currentBuffer.first;
491       size_t available = currentBuffer.second;
492       if (available == 0) {
493         // end of buffer chain
494         return written;
495       }
496       // all data is in current buffer
497       if (available >= len) {
498         this->push(crtData, len);
499         cursor.skip(len);
500         return written + len;
501       }
502
503       // write the whole current IOBuf
504       this->push(crtData, available);
505       cursor.skip(available);
506       written += available;
507       len -= available;
508     }
509   }
510
511 };
512
513 } // namespace detail
514
515 enum class CursorAccess {
516   PRIVATE,
517   UNSHARE
518 };
519
520 template <CursorAccess access>
521 class RWCursor
522   : public detail::CursorBase<RWCursor<access>, IOBuf>,
523     public detail::Writable<RWCursor<access>> {
524   friend class detail::CursorBase<RWCursor<access>, IOBuf>;
525  public:
526   explicit RWCursor(IOBuf* buf)
527     : detail::CursorBase<RWCursor<access>, IOBuf>(buf),
528       maybeShared_(true) {}
529
530   template <class OtherDerived, class OtherBuf>
531   explicit RWCursor(const detail::CursorBase<OtherDerived, OtherBuf>& cursor)
532     : detail::CursorBase<RWCursor<access>, IOBuf>(cursor),
533       maybeShared_(true) {}
534   /**
535    * Gather at least n bytes contiguously into the current buffer,
536    * by coalescing subsequent buffers from the chain as necessary.
537    */
538   void gather(size_t n) {
539     // Forbid attempts to gather beyond the end of this IOBuf chain.
540     // Otherwise we could try to coalesce the head of the chain and end up
541     // accidentally freeing it, invalidating the pointer owned by external
542     // code.
543     //
544     // If crtBuf_ == head() then IOBuf::gather() will perform all necessary
545     // checking.  We only have to perform an explicit check here when calling
546     // gather() on a non-head element.
547     if (this->crtBuf_ != this->head() && this->totalLength() < n) {
548       throw std::overflow_error("cannot gather() past the end of the chain");
549     }
550     this->crtBuf_->gather(this->offset_ + n);
551   }
552   void gatherAtMost(size_t n) {
553     size_t size = std::min(n, this->totalLength());
554     return this->crtBuf_->gather(this->offset_ + size);
555   }
556
557   size_t pushAtMost(const uint8_t* buf, size_t len) {
558     size_t copied = 0;
559     for (;;) {
560       // Fast path: the current buffer is big enough.
561       size_t available = this->length();
562       if (LIKELY(available >= len)) {
563         if (access == CursorAccess::UNSHARE) {
564           maybeUnshare();
565         }
566         memcpy(writableData(), buf, len);
567         this->offset_ += len;
568         return copied + len;
569       }
570
571       if (access == CursorAccess::UNSHARE) {
572         maybeUnshare();
573       }
574       memcpy(writableData(), buf, available);
575       copied += available;
576       if (UNLIKELY(!this->tryAdvanceBuffer())) {
577         return copied;
578       }
579       buf += available;
580       len -= available;
581     }
582   }
583
584   void insert(std::unique_ptr<folly::IOBuf> buf) {
585     folly::IOBuf* nextBuf;
586     if (this->offset_ == 0) {
587       // Can just prepend
588       nextBuf = this->crtBuf_;
589       this->crtBuf_->prependChain(std::move(buf));
590     } else {
591       std::unique_ptr<folly::IOBuf> remaining;
592       if (this->crtBuf_->length() - this->offset_ > 0) {
593         // Need to split current IOBuf in two.
594         remaining = this->crtBuf_->cloneOne();
595         remaining->trimStart(this->offset_);
596         nextBuf = remaining.get();
597         buf->prependChain(std::move(remaining));
598       } else {
599         // Can just append
600         nextBuf = this->crtBuf_->next();
601       }
602       this->crtBuf_->trimEnd(this->length());
603       this->crtBuf_->appendChain(std::move(buf));
604     }
605     // Jump past the new links
606     this->offset_ = 0;
607     this->crtBuf_ = nextBuf;
608   }
609
610   uint8_t* writableData() {
611     return this->crtBuf_->writableData() + this->offset_;
612   }
613
614  private:
615   void maybeUnshare() {
616     if (UNLIKELY(maybeShared_)) {
617       this->crtBuf_->unshareOne();
618       maybeShared_ = false;
619     }
620   }
621
622   void advanceDone() {
623     maybeShared_ = true;
624   }
625
626   bool maybeShared_;
627 };
628
629 typedef RWCursor<CursorAccess::PRIVATE> RWPrivateCursor;
630 typedef RWCursor<CursorAccess::UNSHARE> RWUnshareCursor;
631
632 /**
633  * Append to the end of a buffer chain, growing the chain (by allocating new
634  * buffers) in increments of at least growth bytes every time.  Won't grow
635  * (and push() and ensure() will throw) if growth == 0.
636  *
637  * TODO(tudorb): add a flavor of Appender that reallocates one IOBuf instead
638  * of chaining.
639  */
640 class Appender : public detail::Writable<Appender> {
641  public:
642   Appender(IOBuf* buf, uint64_t growth)
643     : buffer_(buf),
644       crtBuf_(buf->prev()),
645       growth_(growth) {
646   }
647
648   uint8_t* writableData() {
649     return crtBuf_->writableTail();
650   }
651
652   size_t length() const {
653     return crtBuf_->tailroom();
654   }
655
656   /**
657    * Mark n bytes (must be <= length()) as appended, as per the
658    * IOBuf::append() method.
659    */
660   void append(size_t n) {
661     crtBuf_->append(n);
662   }
663
664   /**
665    * Ensure at least n contiguous bytes available to write.
666    * Postcondition: length() >= n.
667    */
668   void ensure(uint64_t n) {
669     if (LIKELY(length() >= n)) {
670       return;
671     }
672
673     // Waste the rest of the current buffer and allocate a new one.
674     // Don't make it too small, either.
675     if (growth_ == 0) {
676       throw std::out_of_range("can't grow buffer chain");
677     }
678
679     n = std::max(n, growth_);
680     buffer_->prependChain(IOBuf::create(n));
681     crtBuf_ = buffer_->prev();
682   }
683
684   size_t pushAtMost(const uint8_t* buf, size_t len) {
685     size_t copied = 0;
686     for (;;) {
687       // Fast path: it all fits in one buffer.
688       size_t available = length();
689       if (LIKELY(available >= len)) {
690         memcpy(writableData(), buf, len);
691         append(len);
692         return copied + len;
693       }
694
695       memcpy(writableData(), buf, available);
696       append(available);
697       copied += available;
698       if (UNLIKELY(!tryGrowChain())) {
699         return copied;
700       }
701       buf += available;
702       len -= available;
703     }
704   }
705
706  private:
707   bool tryGrowChain() {
708     assert(crtBuf_->next() == buffer_);
709     if (growth_ == 0) {
710       return false;
711     }
712
713     buffer_->prependChain(IOBuf::create(growth_));
714     crtBuf_ = buffer_->prev();
715     return true;
716   }
717
718   IOBuf* buffer_;
719   IOBuf* crtBuf_;
720   uint64_t growth_;
721 };
722
723 class QueueAppender : public detail::Writable<QueueAppender> {
724  public:
725   /**
726    * Create an Appender that writes to a IOBufQueue.  When we allocate
727    * space in the queue, we grow no more than growth bytes at once
728    * (unless you call ensure() with a bigger value yourself).
729    */
730   QueueAppender(IOBufQueue* queue, uint64_t growth) {
731     reset(queue, growth);
732   }
733
734   void reset(IOBufQueue* queue, uint64_t growth) {
735     queue_ = queue;
736     growth_ = growth;
737   }
738
739   uint8_t* writableData() {
740     return static_cast<uint8_t*>(queue_->writableTail());
741   }
742
743   size_t length() const { return queue_->tailroom(); }
744
745   void append(size_t n) { queue_->postallocate(n); }
746
747   // Ensure at least n contiguous; can go above growth_, throws if
748   // not enough room.
749   void ensure(uint64_t n) { queue_->preallocate(n, growth_); }
750
751   template <class T>
752   typename std::enable_if<std::is_arithmetic<T>::value>::type
753   write(T value) {
754     // We can't fail.
755     auto p = queue_->preallocate(sizeof(T), growth_);
756     storeUnaligned(p.first, value);
757     queue_->postallocate(sizeof(T));
758   }
759
760
761   size_t pushAtMost(const uint8_t* buf, size_t len) {
762     size_t remaining = len;
763     while (remaining != 0) {
764       auto p = queue_->preallocate(std::min(remaining, growth_),
765                                    growth_,
766                                    remaining);
767       memcpy(p.first, buf, p.second);
768       queue_->postallocate(p.second);
769       buf += p.second;
770       remaining -= p.second;
771     }
772
773     return len;
774   }
775
776   void insert(std::unique_ptr<folly::IOBuf> buf) {
777     if (buf) {
778       queue_->append(std::move(buf), true);
779     }
780   }
781
782  private:
783   folly::IOBufQueue* queue_;
784   size_t growth_;
785 };
786
787 }}  // folly::io
788
789 #endif // FOLLY_CURSOR_H