From df7a2f8103818afb0f76654073f1505e83539c96 Mon Sep 17 00:00:00 2001
From: Nathan Bronson
Date: Wed, 15 Jan 2014 15:15:31 -0800
Subject: [PATCH] IndexedMemPool - pool allocator tailored for lock-free data
 structures

Summary:
Instances of IndexedMemPool dynamically allocate and then pool their
element type (T), returning 4-byte integer indices that can be passed
to the pool's operator[] method to access or obtain pointers to the
actual elements. Once they are constructed, elements are never
destroyed. These two features are useful for lock-free algorithms.
The indexing behavior makes it easy to build tagged pointer-like-things,
since a large number of elements can be managed using fewer bits than a
full pointer. The pooling behavior makes it safe to read from T-s even
after they have been recycled.

Test Plan:
1. unit tests
2. unit tests using DeterministicSchedule
3. this code is moved from tao/queues where it is in production use

Reviewed By: davejwatson@fb.com

FB internal diff: D1089053
---
 folly/IndexedMemPool.h            | 427 ++++++++++++++++++++++++++++++
 folly/Makefile.am                 |   1 +
 folly/test/IndexedMemPoolTest.cpp | 162 ++++++++++++
 3 files changed, 590 insertions(+)
 create mode 100644 folly/IndexedMemPool.h
 create mode 100644 folly/test/IndexedMemPoolTest.cpp

diff --git a/folly/IndexedMemPool.h b/folly/IndexedMemPool.h
new file mode 100644
index 00000000..cef4d0ea
--- /dev/null
+++ b/folly/IndexedMemPool.h
@@ -0,0 +1,427 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_DETAIL_INDEXEDMEMPOOL_H
+#define FOLLY_DETAIL_INDEXEDMEMPOOL_H
+
+#include <stdint.h>
+#include <assert.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <boost/noncopyable.hpp>
+#include <folly/AtomicStruct.h>
+#include <folly/detail/CacheLocality.h>
+
+namespace folly {
+
+namespace detail {
+template <typename Pool>
+struct IndexedMemPoolRecycler;
+}
+
+/// Instances of IndexedMemPool dynamically allocate and then pool
+/// their element type (T), returning 4-byte integer indices that can be
+/// passed to the pool's operator[] method to access or obtain pointers
+/// to the actual elements. Once they are constructed, elements are
+/// never destroyed. These two features are useful for lock-free
+/// algorithms. The indexing behavior makes it easy to build tagged
+/// pointer-like-things, since a large number of elements can be managed
+/// using fewer bits than a full pointer. The pooling behavior makes
+/// it safe to read from T-s even after they have been recycled, since
+/// it is guaranteed that the memory won't have been returned to the OS
+/// and unmapped (the algorithm must still use a mechanism to validate
+/// that the read was correct, but it doesn't have to worry about page
+/// faults), and if the elements use internal sequence numbers it can be
+/// guaranteed that there won't be an ABA match due to the element being
+/// overwritten with a different type that has the same bit pattern.
+///
+/// IMPORTANT: Space for extra elements is allocated to account for those
+/// that are inaccessible because they are in other local lists, so the
+/// actual number of items that can be allocated ranges from capacity to
+/// capacity + (NumLocalLists_-1)*LocalListLimit_. This is important if
+/// you are trying to maximize the capacity of the pool while constraining
+/// the bit size of the resulting pointers, because the pointers will
+/// actually range up to the boosted capacity. See maxIndexForCapacity
+/// and capacityForMaxIndex.
+///
+/// To avoid contention, NumLocalLists_ free lists of limited (less than
+/// or equal to LocalListLimit_) size are maintained, and each thread
+/// retrieves and returns entries from its associated local list. If the
+/// local list becomes too large then elements are placed in bulk in a
+/// global free list. This allows items to be efficiently recirculated
+/// from consumers to producers. AccessSpreader is used to access the
+/// local lists, so there is no performance advantage to having more
+/// local lists than L1 caches.
+///
+/// The pool mmap-s the entire necessary address space when the pool is
+/// constructed, but delays element construction. This means that only
+/// elements that are actually returned to the caller get paged into the
+/// process's resident set (RSS).
+template <typename T,
+          int NumLocalLists_ = 32,
+          int LocalListLimit_ = 200,
+          template<typename> class Atom = std::atomic>
+struct IndexedMemPool : boost::noncopyable {
+  typedef T value_type;
+
+  typedef std::unique_ptr<T, detail::IndexedMemPoolRecycler<IndexedMemPool>>
+      UniquePtr;
+
+  static_assert(LocalListLimit_ <= 255, "LocalListLimit must fit in 8 bits");
+  enum {
+    NumLocalLists = NumLocalLists_,
+    LocalListLimit = LocalListLimit_
+  };
+
+
+  // these are public because clients may need to reason about the number
+  // of bits required to hold indices from a pool, given its capacity
+
+  static constexpr uint32_t maxIndexForCapacity(uint32_t capacity) {
+    // index of uint32_t(-1) == UINT32_MAX is reserved for isAllocated tracking
+    return std::min(uint64_t(capacity) + (NumLocalLists - 1) * LocalListLimit,
+                    uint64_t(uint32_t(-1) - 1));
+  }
+
+  static constexpr uint32_t capacityForMaxIndex(uint32_t maxIndex) {
+    return maxIndex - (NumLocalLists - 1) * LocalListLimit;
+  }
+
+
+  /// Constructs a pool that can allocate at least _capacity_ elements,
+  /// even if all the local lists are full
+  explicit IndexedMemPool(uint32_t capacity)
+    : actualCapacity_(maxIndexForCapacity(capacity))
+    , size_(0)
+    , globalHead_(TaggedPtr{})
+  {
+    const size_t needed = sizeof(Slot) * (actualCapacity_ + 1);
+    long pagesize = sysconf(_SC_PAGESIZE);
+    mmapLength_ = ((needed - 1) & ~(pagesize - 1)) + pagesize;
+    assert(needed <= mmapLength_ && mmapLength_ < needed + pagesize);
+    assert((mmapLength_ % pagesize) == 0);
+
+    slots_ = static_cast<Slot*>(mmap(nullptr, mmapLength_,
+                                     PROT_READ | PROT_WRITE,
+                                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
+    if (slots_ == MAP_FAILED) {
+      assert(errno == ENOMEM);
+      throw std::bad_alloc();
+    }
+  }
+
+  /// Destroys all of the contained elements
+  ~IndexedMemPool() {
+    for (size_t i = size_; i > 0; --i) {
+      slots_[i].~Slot();
+    }
+    munmap(slots_, mmapLength_);
+  }
+
+  /// Returns a lower bound on the number of elements that may be
+  /// simultaneously allocated and not yet recycled. Because of the
+  /// local lists it is possible that more elements than this are returned
+  /// successfully
+  size_t capacity() {
+    return capacityForMaxIndex(actualCapacity_);
+  }
+
+  /// Grants ownership of (*this)[retval], or returns 0 if no elements
+  /// are available
+  uint32_t allocIndex() {
+    return localPop(localHead());
+  }
+
+  /// If an element is available, returns a std::unique_ptr to it that will
+  /// recycle the element to the pool when it is reclaimed, otherwise returns
+  /// a null (falsy) std::unique_ptr
+  UniquePtr allocElem() {
+    auto idx = allocIndex();
+    auto ptr = idx == 0 ? nullptr : &slot(idx).elem;
+    return UniquePtr(ptr, typename UniquePtr::deleter_type(this));
+  }
+
+  /// Gives up ownership previously granted by alloc()
+  void recycleIndex(uint32_t idx) {
+    assert(isAllocated(idx));
+    localPush(localHead(), idx);
+  }
+
+  /// Provides access to the pooled element referenced by idx
+  T& operator[](uint32_t idx) {
+    return slot(idx).elem;
+  }
+
+  /// Provides access to the pooled element referenced by idx
+  const T& operator[](uint32_t idx) const {
+    return slot(idx).elem;
+  }
+
+  /// If elem == &pool[idx], then pool.locateElem(elem) == idx. Also,
+  /// pool.locateElem(nullptr) == 0
+  uint32_t locateElem(const T* elem) const {
+    if (!elem) {
+      return 0;
+    }
+
+    static_assert(std::is_standard_layout<T>::value, "offsetof needs POD");
+
+    auto slot = reinterpret_cast<const Slot*>(
+        reinterpret_cast<const char*>(elem) - offsetof(Slot, elem));
+    auto rv = slot - slots_;
+
+    // this assert also tests that rv is in range
+    assert(elem == &(*this)[rv]);
+    return rv;
+  }
+
+  /// Returns true iff idx has been alloc()ed and not recycleIndex()ed
+  bool isAllocated(uint32_t idx) const {
+    return slot(idx).localNext == uint32_t(-1);
+  }
+
+
+ private:
+  ///////////// types
+
+  struct Slot {
+    T elem;
+    uint32_t localNext;
+    uint32_t globalNext;
+
+    Slot() : localNext{}, globalNext{} {}
+  };
+
+  struct TaggedPtr {
+    uint32_t idx;
+
+    // size is bottom 8 bits, tag in top 24. g++'s code generation for
+    // bitfields seems to depend on the phase of the moon, plus we can
+    // do better because we can rely on other checks to avoid masking
+    uint32_t tagAndSize;
+
+    enum : uint32_t {
+      SizeBits = 8,
+      SizeMask = (1U << SizeBits) - 1,
+      TagIncr = 1U << SizeBits,
+    };
+
+    uint32_t size() const {
+      return tagAndSize & SizeMask;
+    }
+
+    TaggedPtr withSize(uint32_t repl) const {
+      assert(repl <= LocalListLimit);
+      return TaggedPtr{ idx, (tagAndSize & ~SizeMask) | repl };
+    }
+
+    TaggedPtr withSizeIncr() const {
+      assert(size() < LocalListLimit);
+      return TaggedPtr{ idx, tagAndSize + 1 };
+    }
+
+    TaggedPtr withSizeDecr() const {
+      assert(size() > 0);
+      return TaggedPtr{ idx, tagAndSize - 1 };
+    }
+
+    TaggedPtr withIdx(uint32_t repl) const {
+      return TaggedPtr{ repl, tagAndSize + TagIncr };
+    }
+
+    TaggedPtr withEmpty() const {
+      return withIdx(0).withSize(0);
+    }
+  };
+
+  struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING LocalList {
+    AtomicStruct<TaggedPtr,Atom> head;
+
+    LocalList() : head(TaggedPtr{}) {}
+  };
+
+  ////////// fields
+
+  /// the actual number of slots that we will allocate, to guarantee
+  /// that we will satisfy the capacity requested at construction time.
+  /// They will be numbered 1..actualCapacity_ (note the 1-based counting),
+  /// and occupy slots_[1..actualCapacity_].
+  size_t actualCapacity_;
+
+  /// the number of bytes allocated from mmap, which is a multiple of
+  /// the page size of the machine
+  size_t mmapLength_;
+
+  /// this records the number of slots that have actually been constructed.
+  /// To allow use of atomic ++ instead of CAS, we let this overflow.
+  /// The actual number of constructed elements is min(actualCapacity_,
+  /// size_)
+  std::atomic<uint32_t> size_;
+
+  /// raw storage, only 1..min(size_,actualCapacity_) (inclusive) are
+  /// actually constructed. Note that slots_[0] is not constructed or used
+  Slot* FOLLY_ALIGN_TO_AVOID_FALSE_SHARING slots_;
+
+  /// use AccessSpreader to find your list. We use stripes instead of
+  /// thread-local to avoid the need to grow or shrink on thread start
+  /// or join. These are heads of lists chained with localNext
+  LocalList local_[NumLocalLists];
+
+  /// this is the head of a list of nodes chained by globalNext, that are
+  /// themselves each the head of a list chained by localNext
+  AtomicStruct<TaggedPtr,Atom> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING globalHead_;
+
+  ///////////// private methods
+
+  size_t slotIndex(uint32_t idx) const {
+    assert(0 < idx &&
+           idx <= actualCapacity_ &&
+           idx <= size_.load(std::memory_order_acquire));
+    return idx;
+  }
+
+  Slot& slot(uint32_t idx) {
+    return slots_[slotIndex(idx)];
+  }
+
+  const Slot& slot(uint32_t idx) const {
+    return slots_[slotIndex(idx)];
+  }
+
+  // localHead references a full list chained by localNext. s should
+  // reference slot(localHead), it is passed as a micro-optimization
+  void globalPush(Slot& s, uint32_t localHead) {
+    while (true) {
+      TaggedPtr gh = globalHead_.load(std::memory_order_acquire);
+      s.globalNext = gh.idx;
+      if (globalHead_.compare_exchange_strong(gh, gh.withIdx(localHead))) {
+        // success
+        return;
+      }
+    }
+  }
+
+  // idx references a single node
+  void localPush(AtomicStruct<TaggedPtr,Atom>& head, uint32_t idx) {
+    Slot& s = slot(idx);
+    TaggedPtr h = head.load(std::memory_order_acquire);
+    while (true) {
+      s.localNext = h.idx;
+
+      if (h.size() == LocalListLimit) {
+        // push will overflow local list, steal it instead
+        if (head.compare_exchange_strong(h, h.withEmpty())) {
+          // steal was successful, put everything in the global list
+          globalPush(s, idx);
+          return;
+        }
+      } else {
+        // local list has space
+        if (head.compare_exchange_strong(h, h.withIdx(idx).withSizeIncr())) {
+          // success
+          return;
+        }
+      }
+      // h was updated by failing CAS
+    }
+  }
+
+  // returns 0 if empty
+  uint32_t globalPop() {
+    while (true) {
+      TaggedPtr gh = globalHead_.load(std::memory_order_acquire);
+      if (gh.idx == 0 || globalHead_.compare_exchange_strong(
+              gh, gh.withIdx(slot(gh.idx).globalNext))) {
+        // global list is empty, or pop was successful
+        return gh.idx;
+      }
+    }
+  }
+
+  // returns 0 if allocation failed
+  uint32_t localPop(AtomicStruct<TaggedPtr,Atom>& head) {
+    while (true) {
+      TaggedPtr h = head.load(std::memory_order_acquire);
+      if (h.idx != 0) {
+        // local list is non-empty, try to pop
+        Slot& s = slot(h.idx);
+        if (head.compare_exchange_strong(
+                h, h.withIdx(s.localNext).withSizeDecr())) {
+          // success
+          s.localNext = uint32_t(-1);
+          return h.idx;
+        }
+        continue;
+      }
+
+      uint32_t idx = globalPop();
+      if (idx == 0) {
+        // global list is empty, allocate and construct new slot
+        if (size_.load(std::memory_order_relaxed) >= actualCapacity_ ||
+            (idx = ++size_) > actualCapacity_) {
+          // allocation failed
+          return 0;
+        }
+        // construct it
+        new (&slot(idx)) T();
+        slot(idx).localNext = uint32_t(-1);
+        return idx;
+      }
+
+      Slot& s = slot(idx);
+      if (head.compare_exchange_strong(
+              h, h.withIdx(s.localNext).withSize(LocalListLimit))) {
+        // global list moved to local list, keep head for us
+        s.localNext = uint32_t(-1);
+        return idx;
+      }
+      // local bulk push failed, return idx to the global list and try again
+      globalPush(s, idx);
+    }
+  }
+
+  AtomicStruct<TaggedPtr,Atom>& localHead() {
+    auto stripe = detail::AccessSpreader<Atom>::current(NumLocalLists);
+    return local_[stripe].head;
+  }
+};
+
+namespace detail {
+
+/// This is a stateful Deleter functor, which allows std::unique_ptr
+/// to track elements allocated from an IndexedMemPool by tracking the
+/// associated pool. See IndexedMemPool::allocElem.
+template <typename Pool>
+struct IndexedMemPoolRecycler {
+  Pool* pool;
+
+  explicit IndexedMemPoolRecycler(Pool* pool) : pool(pool) {}
+
+  IndexedMemPoolRecycler(const IndexedMemPoolRecycler& rhs)
+      = default;
+  IndexedMemPoolRecycler& operator= (const IndexedMemPoolRecycler& rhs)
+      = default;
+
+  void operator()(typename Pool::value_type* elem) const {
+    pool->recycleIndex(pool->locateElem(elem));
+  }
+};
+
+}
+
+} // namespace folly
+
+#endif
diff --git a/folly/Makefile.am b/folly/Makefile.am
index f0a34935..ead0045f 100644
--- a/folly/Makefile.am
+++ b/folly/Makefile.am
@@ -65,6 +65,7 @@ nobase_follyinclude_HEADERS = \
 	Format-inl.h \
 	GroupVarint.h \
 	Hash.h \
+	IndexedMemPool.h \
 	IntrusiveList.h \
 	io/Cursor.h \
 	io/IOBuf.h \
diff --git a/folly/test/IndexedMemPoolTest.cpp b/folly/test/IndexedMemPoolTest.cpp
new file mode 100644
index 00000000..40f3dd13
--- /dev/null
+++ b/folly/test/IndexedMemPoolTest.cpp
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include +#include +#include +#include +#include +#include +#include + +using namespace folly; +using namespace folly::test; + +TEST(IndexedMemPool, unique_ptr) { + typedef IndexedMemPool Pool; + Pool pool(100); + + for (size_t i = 0; i < 100000; ++i) { + auto ptr = pool.allocElem(); + EXPECT_TRUE(!!ptr); + *ptr = i; + } + + std::vector leak; + while (true) { + auto ptr = pool.allocElem(); + if (!ptr) { + // good, we finally ran out + break; + } + leak.emplace_back(std::move(ptr)); + EXPECT_LT(leak.size(), 10000); + } +} + +TEST(IndexedMemPool, no_starvation) { + const int count = 1000; + const int poolSize = 100; + + typedef DeterministicSchedule Sched; + Sched sched(Sched::uniform(0)); + + typedef IndexedMemPool Pool; + Pool pool(poolSize); + + for (auto pass = 0; pass < 10; ++pass) { + int fd[2]; + EXPECT_EQ(pipe(fd), 0); + + // makes sure we wait for available nodes, rather than fail allocIndex + sem_t allocSem; + sem_init(&allocSem, 0, poolSize); + + // this semaphore is only needed for deterministic replay, so that we + // always block in an Sched:: operation rather than in a read() syscall + sem_t readSem; + sem_init(&readSem, 0, 0); + + std::thread produce = Sched::thread([&]() { + for (auto i = 0; i < count; ++i) { + Sched::wait(&allocSem); + uint32_t idx = pool.allocIndex(); + EXPECT_NE(idx, 0); + EXPECT_LE(idx, + poolSize + (pool.NumLocalLists - 1) * pool.LocalListLimit); + pool[idx] = i; + EXPECT_EQ(write(fd[1], &idx, sizeof(idx)), sizeof(idx)); + Sched::post(&readSem); + } + }); + + std::thread consume = Sched::thread([&]() { + for (auto i = 0; i < count; ++i) { + uint32_t idx; + Sched::wait(&readSem); + EXPECT_EQ(read(fd[0], &idx, sizeof(idx)), sizeof(idx)); + EXPECT_NE(idx, 0); + EXPECT_GE(idx, 1); + EXPECT_LE(idx, + poolSize + (Pool::NumLocalLists - 1) * Pool::LocalListLimit); + EXPECT_EQ(pool[idx], i); + pool.recycleIndex(idx); + Sched::post(&allocSem); + } + }); + + Sched::join(produce); + Sched::join(consume); + close(fd[0]); + close(fd[1]); + } +} + +TEST(IndexedMemPool, st_capacity) { + // only one local list => capacity is exact + typedef IndexedMemPool Pool; + Pool pool(10); + + EXPECT_EQ(pool.capacity(), 10); + EXPECT_EQ(Pool::maxIndexForCapacity(10), 10); + for (auto i = 0; i < 10; ++i) { + EXPECT_NE(pool.allocIndex(), 0); + } + EXPECT_EQ(pool.allocIndex(), 0); +} + +TEST(IndexedMemPool, mt_capacity) { + typedef IndexedMemPool Pool; + Pool pool(1000); + + std::thread threads[10]; + for (auto i = 0; i < 10; ++i) { + threads[i] = std::thread([&]() { + for (auto j = 0; j < 100; ++j) { + uint32_t idx = pool.allocIndex(); + EXPECT_NE(idx, 0); + } + }); + } + + for (auto i = 0; i < 10; ++i) { + threads[i].join(); + } + + for (auto i = 0; i < 16 * 32; ++i) { + pool.allocIndex(); + } + EXPECT_EQ(pool.allocIndex(), 0); +} + +TEST(IndexedMemPool, locate_elem) { + IndexedMemPool pool(1000); + + for (auto i = 0; i < 1000; ++i) { + auto idx = pool.allocIndex(); + EXPECT_FALSE(idx == 0); + int* elem = &pool[idx]; + EXPECT_TRUE(idx == pool.locateElem(elem)); + } + + EXPECT_EQ(pool.locateElem(nullptr), 0); +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +} -- 2.34.1