Add IOBufQueue::wrapBuffer, which handles buffers > 4GB.
[folly.git] / folly / experimental / io / IOBufQueue.cpp
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/IOBufQueue.h"
18
19 #include <string.h>
20
21 #include <stdexcept>
22
23 using std::make_pair;
24 using std::pair;
25 using std::unique_ptr;
26
27 namespace {
28
29 using folly::IOBuf;
30
31 const size_t MIN_ALLOC_SIZE = 2000;
32 const size_t MAX_ALLOC_SIZE = 8000; // Must fit within a uint32_t
33
34 /**
35  * Convenience function to append chain src to chain dst.
36  */
37 void
38 appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src) {
39   if (dst == NULL) {
40     dst = std::move(src);
41   } else {
42     dst->prev()->appendChain(std::move(src));
43   }
44 }
45
46 } // anonymous namespace
47
48 namespace folly {
49
50 IOBufQueue::IOBufQueue(const Options& options)
51   : options_(options),
52     chainLength_(0) {
53 }
54
55 IOBufQueue::IOBufQueue(IOBufQueue&& other)
56   : options_(other.options_),
57     chainLength_(other.chainLength_),
58     head_(std::move(other.head_)) {
59   other.chainLength_ = 0;
60 }
61
62 IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
63   if (&other != this) {
64     options_ = other.options_;
65     chainLength_ = other.chainLength_;
66     head_ = std::move(other.head_);
67     other.chainLength_ = 0;
68   }
69   return *this;
70 }
71
72 void
73 IOBufQueue::append(unique_ptr<IOBuf>&& buf) {
74   if (!buf) {
75     return;
76   }
77   if (options_.cacheChainLength) {
78     chainLength_ += buf->computeChainDataLength();
79   }
80   appendToChain(head_, std::move(buf));
81 }
82
83 void
84 IOBufQueue::append(IOBufQueue& other) {
85   if (!other.head_) {
86     return;
87   }
88   if (options_.cacheChainLength) {
89     if (other.options_.cacheChainLength) {
90       chainLength_ += other.chainLength_;
91     } else {
92       chainLength_ += other.head_->computeChainDataLength();
93     }
94   }
95   appendToChain(head_, std::move(other.head_));
96   other.chainLength_ = 0;
97 }
98
99 void
100 IOBufQueue::append(const void* buf, size_t len) {
101   auto src = static_cast<const uint8_t*>(buf);
102   while (len != 0) {
103     if ((head_ == NULL) || head_->prev()->isSharedOne() ||
104         (head_->prev()->tailroom() == 0)) {
105       appendToChain(head_, std::move(
106           IOBuf::create(std::max(MIN_ALLOC_SIZE,
107               std::min(len, MAX_ALLOC_SIZE)))));
108     }
109     IOBuf* last = head_->prev();
110     uint32_t copyLen = std::min(len, (size_t)last->tailroom());
111     memcpy(last->writableTail(), src, copyLen);
112     src += copyLen;
113     last->append(copyLen);
114     if (options_.cacheChainLength) {
115       chainLength_ += copyLen;
116     }
117     len -= copyLen;
118   }
119 }
120
121 void
122 IOBufQueue::wrapBuffer(const void* buf, size_t len, uint32_t blockSize) {
123   auto src = static_cast<const uint8_t*>(buf);
124   while (len != 0) {
125     size_t n = std::min(len, size_t(blockSize));
126     append(IOBuf::wrapBuffer(src, n));
127     src += n;
128     len -= n;
129   }
130 }
131
132 pair<void*,uint32_t>
133 IOBufQueue::preallocate(uint32_t min, uint32_t max) {
134   if (head_ != NULL) {
135     // If there's enough space left over at the end of the queue, use that.
136     IOBuf* last = head_->prev();
137     if (!last->isSharedOne()) {
138       uint32_t avail = last->tailroom();
139       if (avail >= min) {
140         return make_pair(
141             last->writableTail(), std::min(max, avail));
142       }
143     }
144   }
145   // Allocate a new buffer of the requested max size.
146   unique_ptr<IOBuf> newBuf(IOBuf::create(max));
147   appendToChain(head_, std::move(newBuf));
148   IOBuf* last = head_->prev();
149   return make_pair(last->writableTail(),
150       std::min(max, last->tailroom()));
151 }
152
153 void
154 IOBufQueue::postallocate(uint32_t n) {
155   head_->prev()->append(n);
156   if (options_.cacheChainLength) {
157     chainLength_ += n;
158   }
159 }
160
161 unique_ptr<IOBuf>
162 IOBufQueue::split(size_t n) {
163   unique_ptr<IOBuf> result;
164   while (n != 0) {
165     if (head_ == NULL) {
166       throw std::underflow_error(
167           "Attempt to remove more bytes than are present in IOBufQueue");
168     } else if (head_->length() <= n) {
169       n -= head_->length();
170       if (options_.cacheChainLength) {
171         chainLength_ -= head_->length();
172       }
173       unique_ptr<IOBuf> remainder = head_->pop();
174       appendToChain(result, std::move(head_));
175       head_ = std::move(remainder);
176     } else {
177       unique_ptr<IOBuf> clone = head_->cloneOne();
178       clone->trimEnd(clone->length() - n);
179       appendToChain(result, std::move(clone));
180       head_->trimStart(n);
181       if (options_.cacheChainLength) {
182         chainLength_ -= n;
183       }
184       break;
185     }
186   }
187   return std::move(result);
188 }
189
190 void IOBufQueue::trimStart(size_t amount) {
191   while (amount > 0) {
192     if (!head_) {
193       throw std::underflow_error(
194         "Attempt to trim more bytes than are present in IOBufQueue");
195     }
196     if (head_->length() > amount) {
197       head_->trimStart(amount);
198       if (options_.cacheChainLength) {
199         chainLength_ -= amount;
200       }
201       break;
202     }
203     amount -= head_->length();
204     if (options_.cacheChainLength) {
205       chainLength_ -= head_->length();
206     }
207     head_ = head_->pop();
208   }
209 }
210
211 void IOBufQueue::trimEnd(size_t amount) {
212   while (amount > 0) {
213     if (!head_) {
214       throw std::underflow_error(
215         "Attempt to trim more bytes than are present in IOBufQueue");
216     }
217     if (head_->prev()->length() > amount) {
218       head_->prev()->trimEnd(amount);
219       if (options_.cacheChainLength) {
220         chainLength_ -= amount;
221       }
222       break;
223     }
224     amount -= head_->prev()->length();
225     if (options_.cacheChainLength) {
226       chainLength_ -= head_->prev()->length();
227     }
228     unique_ptr<IOBuf> b = head_->prev()->unlink();
229
230     // Null queue if we unlinked the head.
231     if (b.get() == head_.get()) {
232       head_.reset();
233     }
234   }
235 }
236
237 } // folly