2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/RecordIO.h>
19 #include <sys/types.h>
22 #include <folly/Exception.h>
23 #include <folly/FileUtil.h>
24 #include <folly/Memory.h>
25 #include <folly/Portability.h>
26 #include <folly/ScopeGuard.h>
27 #include <folly/String.h>
31 using namespace recordio_helpers;
// Constructs a writer on top of `file`.
//
// The file is moved into file_. The constructor then makes a single
// try_lock() attempt on writeLock_ and throws std::runtime_error if that
// attempt fails. Finally it fstat()s the descriptor and stores the current
// file size in filePos_, which write() uses as the offset of the next record.
33 RecordIOWriter::RecordIOWriter(File file, uint32_t fileId)
34 : file_(std::move(file)),
// Constructed with std::defer_lock; the actual lock attempt is the
// try_lock() call below.
36 writeLock_(file_, std::defer_lock),
38 if (!writeLock_.try_lock()) {
39 throw std::runtime_error("RecordIOWriter: file locked by another process");
// fstat()'s return value is handed to checkUnixError together with a message
// (presumably it throws on failure -- confirm against folly/Exception.h).
43 checkUnixError(fstat(file_.fd(), &st), "fstat() failed");
// New records start at the current end of the file.
45 filePos_ = st.st_size;
// Writes the contents of `buf` to the file as one record.
//
// prependHeader() places a record header in front of `buf` and returns the
// total number of bytes (header + data); it returns 0 for empty data, in
// which case nothing is written. The write offset is obtained by advancing
// filePos_ by the record's total length, and the bytes are written at that
// offset with pwritevFull() or pwriteFull().
48 void RecordIOWriter::write(std::unique_ptr<IOBuf> buf) {
49 size_t totalLength = prependHeader(buf, fileId_);
50 if (totalLength == 0) {
51 return; // nothing to do
// Sanity check: the chain now holds exactly header + data.
54 DCHECK_EQ(buf->computeChainDataLength(), totalLength);
56 // We're going to write. Reserve space for ourselves.
// fetch_add yields the previous value of filePos_, which becomes this
// record's offset (filePos_ is presumably a std::atomic -- confirm in header).
57 off_t pos = filePos_.fetch_add(totalLength);
// With pwritev available, the whole buffer chain is written as an iovec array.
59 #if FOLLY_HAVE_PWRITEV
60 auto iov = buf->getIov();
61 ssize_t bytes = pwritevFull(file_.fd(), iov.data(), iov.size(), pos);
// NOTE(review): second declaration of `bytes`; presumably the non-pwritev
// alternative of the #if above. It writes only buf->data()/buf->length(),
// i.e. a single buffer -- confirm the chain is coalesced first.
65 ssize_t bytes = pwriteFull(file_.fd(), buf->data(), buf->length(), pos);
// The message says "pwrite() failed" for either write path.
68 checkUnixError(bytes, "pwrite() failed");
69 DCHECK_EQ(bytes, totalLength);
// Constructs a reader; `file` is moved into map_.
72 RecordIOReader::RecordIOReader(File file, uint32_t fileId)
73 : map_(std::move(file)),
// Constructs an iterator for the given byte range, file id and starting
// offset `pos`.
//
// recordAndPos_ starts out as (empty ByteRange, 0). A `pos` that is not
// inside the range is recorded as position off_t(-1), the same value
// advanceToValid() stores when it marks the end.
77 RecordIOReader::Iterator::Iterator(ByteRange range, uint32_t fileId, off_t pos)
80 recordAndPos_(ByteRange(), 0) {
// The cast to size_t turns a negative pos into a very large value, so one
// comparison rejects both negative and past-the-end offsets.
81 if (size_t(pos) >= range_.size()) {
82 // Note that this branch can execute if pos is negative as well.
83 recordAndPos_.second = off_t(-1);
// NOTE(review): presumably the in-range alternative of the check above --
// the starting offset is remembered as the current position.
86 recordAndPos_.second = pos;
// Repositions the iterator using the record that findRecord() reports for
// the remaining range_ and fileId_.
//
// Two outcomes are visible: an end state, (empty ByteRange, off_t(-1)) with
// range_ cleared, and a found state, where range_ and the stored position
// are both advanced by the number of bytes skipped.
92 void RecordIOReader::Iterator::advanceToValid() {
93 ByteRange record = findRecord(range_, fileId_).record;
// End marker: empty record and position -1.
// NOTE(review): presumably taken when `record` is empty -- confirm.
95 recordAndPos_ = std::make_pair(ByteRange(), off_t(-1));
96 range_.clear(); // at end
// Distance from the start of the remaining range to the record's data.
98 size_t skipped = record.begin() - range_.begin();
// The record data is expected to sit at least headerSize() bytes into the
// range; the DCHECK guards the unsigned subtraction below against underflow.
99 DCHECK_GE(skipped, headerSize());
// Exclude the header so that `skipped` counts only the bytes before it.
100 skipped -= headerSize();
101 range_.advance(skipped);
102 recordAndPos_.first = record;
// Keep the reported position in step with the bytes just skipped.
103 recordAndPos_.second += skipped;
107 namespace recordio_helpers {
109 using namespace detail;
// Seed value handed to the SpookyHashV2 calls in this file.
113 constexpr uint32_t kHashSeed = 0xdeadbeef; // for mcurtiss
// Returns a 32-bit SpookyHashV2 hash of `header`.
//
// The length passed is offsetof(Header, headerHash), so only the bytes that
// precede the headerHash field are hashed; the stored hash is not part of
// its own input.
115 uint32_t headerHash(const Header& header) {
116 return hash::SpookyHashV2::Hash32(&header, offsetof(Header, headerHash),
// Computes the data length and a hash of the data held in `buf`.
//
// Each element produced by iterating *buf is fed to a SpookyHashV2 hasher
// initialised with kHashSeed for both seed arguments.
//
// Returns the pair (len, hash1); Final() also produces hash2, which is not
// returned.
// Throws std::invalid_argument when len + headerSize() is greater than or
// equal to the maximum uint32_t value.
120 std::pair<size_t, uint64_t> dataLengthAndHash(const IOBuf* buf) {
122 hash::SpookyHashV2 hasher;
123 hasher.Init(kHashSeed, kHashSeed);
124 for (auto br : *buf) {
126 hasher.Update(br.data(), br.size());
130 hasher.Final(&hash1, &hash2);
// Header plus data has to stay below the 32-bit limit.
131 if (len + headerSize() >= std::numeric_limits<uint32_t>::max()) {
132 throw std::invalid_argument("Record length must fit in 32 bits");
134 return std::make_pair(len, hash1);
// Returns the 64-bit SpookyHashV2 hash of the bytes in `range`, seeded with
// kHashSeed. validateRecord() compares this value with Header::dataHash.
137 uint64_t dataHash(ByteRange range) {
138 return hash::SpookyHashV2::Hash64(range.data(), range.size(), kHashSeed);
// Puts a record header in front of the data in `buf`.
//
// The header is zero-filled and then given the magic value, `fileId`, the
// data length and the data hash; the header hash is filled in last.
//
// Returns the total record size (data length + headerSize()), or 0 when the
// data is empty, in which case no header is added.
// Throws std::invalid_argument("invalid file id") for a rejected fileId, and
// propagates the std::invalid_argument raised by dataLengthAndHash() when
// the record is too long.
143 size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId) {
// NOTE(review): presumably guarded by a fileId == 0 check (validateRecord
// treats a fileId argument of 0 as "match any") -- confirm.
145 throw std::invalid_argument("invalid file id");
147 auto lengthAndHash = dataLengthAndHash(buf.get());
148 if (lengthAndHash.first == 0) {
149 return 0; // empty, nothing to do, no zero-length records
152 // Prepend to the first buffer in the chain if we have room, otherwise
153 // prepend a new buffer.
154 if (buf->headroom() >= headerSize()) {
156 buf->prepend(headerSize());
// Not enough headroom: build a header-sized buffer and chain the original
// data behind it.
158 auto b = IOBuf::create(headerSize());
159 b->append(headerSize());
160 b->appendChain(std::move(buf));
// NOTE(review): `buf` must refer to the buffer that carries the header at
// this point; after appendChain(std::move(buf)) above that requires `buf` to
// be reassigned to the new head -- confirm.
163 detail::Header* header =
164 reinterpret_cast<detail::Header*>(buf->writableData());
// Zero everything first so fields not assigned below are 0, the value
// validateRecord() requires for version, hashFunction and flags.
165 memset(header, 0, sizeof(Header));
166 header->magic = detail::Header::kMagic;
167 header->fileId = fileId;
168 header->dataLength = lengthAndHash.first;
169 header->dataHash = lengthAndHash.second;
// Computed last: headerHash() covers the header bytes that precede the
// headerHash field, so those fields must already be final.
170 header->headerHash = headerHash(*header);
172 return lengthAndHash.first + headerSize();
// Checks whether `range` begins with a well-formed record.
//
// Checks made, in order:
//   1. the range is larger than headerSize();
//   2. the header fields: magic, version/hashFunction/flags all 0, fileId
//      (only when the `fileId` argument is non-zero), and dataLength no
//      larger than the bytes left after the header;
//   3. the stored header hash equals headerHash(*header);
//   4. the stored data hash equals dataHash() of the data bytes.
//
// On success returns {header->fileId, range}, where `range` has been narrowed
// to exactly the record's data bytes.
// NOTE(review): the failure result is presumably a RecordInfo with an empty
// record, since findRecord() tests r.record.empty() -- confirm.
175 RecordInfo validateRecord(ByteRange range, uint32_t fileId) {
176 if (range.size() <= headerSize()) { // records may not be empty
179 const Header* header = reinterpret_cast<const Header*>(range.begin());
// From here on `range` starts at the first byte after the header.
180 range.advance(sizeof(Header));
181 if (header->magic != Header::kMagic ||
182 header->version != 0 ||
183 header->hashFunction != 0 ||
184 header->flags != 0 ||
// A fileId argument of 0 disables the file id comparison.
185 (fileId != 0 && header->fileId != fileId) ||
// The declared data length must fit in what is left of the range.
186 header->dataLength > range.size()) {
189 if (headerHash(*header) != header->headerHash) {
// Narrow the range to the data bytes before hashing them.
192 range.reset(range.begin(), header->dataLength);
193 if (dataHash(range) != header->dataHash) {
196 return {header->fileId, range};
199 RecordInfo findRecord(ByteRange searchRange,
200 ByteRange wholeRange,
202 static const uint32_t magic = Header::kMagic;
203 static const ByteRange magicRange(reinterpret_cast<const uint8_t*>(&magic),
206 DCHECK_GE(searchRange.begin(), wholeRange.begin());
207 DCHECK_LE(searchRange.end(), wholeRange.end());
209 const uint8_t* start = searchRange.begin();
210 const uint8_t* end = std::min(searchRange.end(),
211 wholeRange.end() - sizeof(Header));
212 // end-1: the last place where a Header could start
213 while (start < end) {
214 auto p = ByteRange(start, end + sizeof(magic)).find(magicRange);
215 if (p == ByteRange::npos) {
220 auto r = validateRecord(ByteRange(start, wholeRange.end()), fileId);
221 if (!r.record.empty()) {
225 // No repeated prefix in magic, so we can do better than start++
226 start += sizeof(magic);