#include <limits.h>
#include <pthread.h>
-#include <list>
+
+#include <mutex>
#include <string>
#include <vector>
-#include <boost/thread/locks.hpp>
-#include <boost/thread/mutex.hpp>
-
#include <glog/logging.h>
-#include "folly/Exception.h"
#include "folly/Foreach.h"
+#include "folly/Exception.h"
#include "folly/Malloc.h"
+// TODO(tudorb): Remove this declaration after Malloc.h is pushed to
+// third-party.
+extern "C" int allocm(void**, size_t*, size_t, int)
+__attribute__((weak));
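+// (With weak linkage the symbol resolves to nullptr at runtime when
+// jemalloc is not linked in, so callers can test it before use.)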
+
namespace folly {
namespace threadlocal_detail {
int nextId_;
std::vector<int> freeIds_;
- boost::mutex lock_;
+ std::mutex lock_;
pthread_key_t pthreadKey_;
ThreadEntry head_;
// We wouldn't call pthread_setspecific unless we actually called get()
DCHECK_NE(threadEntry_.elementsCapacity, 0);
{
- boost::lock_guard<boost::mutex> g(meta.lock_);
+ std::lock_guard<std::mutex> g(meta.lock_);
meta.erase(&threadEntry_);
// No need to hold the lock any longer; threadEntry_ is private to this
// thread now that it's been removed from meta.
static int create() {
int id;
auto & meta = instance();
- boost::lock_guard<boost::mutex> g(meta.lock_);
+ std::lock_guard<std::mutex> g(meta.lock_);
if (!meta.freeIds_.empty()) {
id = meta.freeIds_.back();
meta.freeIds_.pop_back();
// Elements in other threads that use this id.
std::vector<ElementWrapper> elements;
{
- boost::lock_guard<boost::mutex> g(meta.lock_);
+ std::lock_guard<std::mutex> g(meta.lock_);
for (ThreadEntry* e = meta.head_.next; e != &meta.head_; e = e->next) {
if (id < e->elementsCapacity && e->elements[id].ptr) {
elements.push_back(e->elements[id]);
* @id to fit in.
*/
static void reserve(int id) {
- size_t prevSize = threadEntry_.elementsCapacity;
- size_t newSize = static_cast<size_t>((id + 5) * 1.7);
+ size_t prevCapacity = threadEntry_.elementsCapacity;
+ // Growth factor < 2, see folly/docs/FBVector.md; + 5 to prevent
+ // very slow start.
+ size_t newCapacity = static_cast<size_t>((id + 5) * 1.7);
+ assert(newCapacity > prevCapacity);
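+  // E.g., with ids reserved in increasing order the capacity sequence is
+  // 8, 22, 45, 85, 153, ..., settling toward 1.7x growth after the first
+  // few steps.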
auto& meta = instance();
- ElementWrapper* ptr = nullptr;
-
- // Rely on jemalloc to zero the memory if possible -- maybe it knows
- // it's already zeroed and saves us some work.
- if (!usingJEMalloc() ||
- prevSize < jemallocMinInPlaceExpandable ||
- (rallocm(
- static_cast<void**>(static_cast<void*>(&threadEntry_.elements)),
- nullptr, newSize * sizeof(ElementWrapper), 0,
- ALLOCM_NO_MOVE | ALLOCM_ZERO) != ALLOCM_SUCCESS)) {
- // Sigh, must realloc, but we can't call realloc here, as elements is
- // still linked in meta, so another thread might access invalid memory
- // after realloc succeeds. We'll copy by hand and update threadEntry_
- // under the lock.
+ ElementWrapper* reallocated = nullptr;
+
+ // Need to grow. Note that we can't call realloc, as elements is
+ // still linked in meta, so another thread might access invalid memory
+ // after realloc succeeds. We'll copy by hand and update threadEntry_
+ // under the lock.
+ if (usingJEMalloc()) {
+ bool success = false;
+ size_t newByteSize = newCapacity * sizeof(ElementWrapper);
+ size_t realByteSize = 0;
+
+ // Try to grow in place.
//
- // Note that we're using calloc instead of malloc in order to zero
- // the entire region. rallocm (ALLOCM_ZERO) will only zero newly
- // allocated memory, so if a previous allocation allocated more than
- // we requested, it's our responsibility to guarantee that the tail
- // is zeroed. calloc() is simpler than malloc() followed by memset(),
- // and potentially faster when dealing with a lot of memory, as
- // it can get already-zeroed pages from the kernel.
- ptr = static_cast<ElementWrapper*>(
- calloc(newSize, sizeof(ElementWrapper))
- );
- if (!ptr) throw std::bad_alloc();
+ // Note that rallocm(ALLOCM_ZERO) will only zero newly allocated memory,
+ // even if a previous allocation allocated more than we requested.
+ // This is fine; we always use ALLOCM_ZERO with jemalloc and we
+ // always expand our allocation to the real size.
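+    //
+    // In-place expansion is only attempted once the allocation is at
+    // least jemallocMinInPlaceExpandable bytes; smaller allocations are
+    // served from size-class bins that rarely have room to grow in place.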
+ if (prevCapacity * sizeof(ElementWrapper) >=
+ jemallocMinInPlaceExpandable) {
+ success = (rallocm(reinterpret_cast<void**>(&threadEntry_.elements),
+ &realByteSize,
+ newByteSize,
+ 0,
+ ALLOCM_NO_MOVE | ALLOCM_ZERO) == ALLOCM_SUCCESS);
+ }
+
+ // In-place growth failed.
+ if (!success) {
+      // Note that, unlike calloc, allocm(... ALLOCM_ZERO) zeros all
+      // allocated bytes (*realByteSize) and not just the requested
+      // bytes (newByteSize).
+ success = (allocm(reinterpret_cast<void**>(&reallocated),
+ &realByteSize,
+ newByteSize,
+ ALLOCM_ZERO) == ALLOCM_SUCCESS);
+ }
+
+ if (success) {
+ // Expand to real size
+ assert(realByteSize / sizeof(ElementWrapper) >= newCapacity);
+ newCapacity = realByteSize / sizeof(ElementWrapper);
+ } else {
+ throw std::bad_alloc();
+ }
+ } else { // no jemalloc
+ // calloc() is simpler than malloc() followed by memset(), and
+ // potentially faster when dealing with a lot of memory, as it can get
+ // already-zeroed pages from the kernel.
+ reallocated = static_cast<ElementWrapper*>(
+ calloc(newCapacity, sizeof(ElementWrapper)));
+ if (!reallocated) {
+ throw std::bad_alloc();
+ }
}
// Success, update the entry
{
- boost::lock_guard<boost::mutex> g(meta.lock_);
+ std::lock_guard<std::mutex> g(meta.lock_);
- if (prevSize == 0) {
+ if (prevCapacity == 0) {
meta.push_back(&threadEntry_);
}
- if (ptr) {
+ if (reallocated) {
/*
* Note: we need to hold the meta lock when copying data out of
* the old vector, because some other thread might be
* destructing a ThreadLocal and writing to the elements vector
* of this thread.
*/
- memcpy(ptr, threadEntry_.elements, sizeof(ElementWrapper) * prevSize);
+ memcpy(reallocated, threadEntry_.elements,
+ sizeof(ElementWrapper) * prevCapacity);
using std::swap;
- swap(ptr, threadEntry_.elements);
- threadEntry_.elementsCapacity = newSize;
+ swap(reallocated, threadEntry_.elements);
}
+ threadEntry_.elementsCapacity = newCapacity;
}
- free(ptr);
+ free(reallocated);
- if (prevSize == 0) {
+ if (prevCapacity == 0) {
pthread_setspecific(meta.pthreadKey_, &meta);
}
}
static ElementWrapper& get(int id) {
if (UNLIKELY(threadEntry_.elementsCapacity <= id)) {
reserve(id);
+ assert(threadEntry_.elementsCapacity > id);
}
return threadEntry_.elements[id];
}
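
The copy-under-lock, free-outside-the-lock sequence above is the core of
reserve(): another thread may still write into the old elements array (for
example a ThreadLocal destructor clearing its slot) until threadEntry_ is
repointed under the lock, and only then can the old storage be released. A
minimal sketch of the same pattern, using hypothetical names (Table, grow)
rather than the folly types:

#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <mutex>
#include <new>

struct Table {
  std::mutex lock;
  int* data = nullptr;
  std::size_t capacity = 0;
};

void grow(Table& t, std::size_t newCapacity) {
  // Allocate and zero outside the lock; no other thread can see this
  // buffer yet.
  int* fresh = static_cast<int*>(std::calloc(newCapacity, sizeof(int)));
  if (!fresh) throw std::bad_alloc();
  int* stale = nullptr;
  {
    std::lock_guard<std::mutex> g(t.lock);
    if (t.data) {
      // Copy under the lock: another thread may still write t.data[i]
      // until the old buffer is unlinked below.
      std::memcpy(fresh, t.data, t.capacity * sizeof(int));
    }
    stale = t.data;
    t.data = fresh;
    t.capacity = newCapacity;
  }
  std::free(stale);  // safe: the old buffer is no longer reachable through t
}

Releasing the old buffer outside the critical section keeps the lock hold
time bounded by the copy rather than by allocator work.
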
#include <sys/types.h>
#include <sys/wait.h>
-#include <map>
-#include <unordered_map>
-#include <set>
+#include <unistd.h>
+
+#include <array>
#include <atomic>
-#include <mutex>
+#include <chrono>
#include <condition_variable>
+#include <map>
+#include <mutex>
+#include <set>
#include <thread>
-#include <unistd.h>
+#include <unordered_map>
+
#include <boost/thread/tss.hpp>
-#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include <gtest/gtest.h>
+
#include "folly/Benchmark.h"
using namespace folly;
EXPECT_EQ(4, tls.size());
}
+namespace {
+
+constexpr size_t kFillObjectSize = 300;
+
+std::atomic<uint64_t> gDestroyed;
+
+/**
+ * Fill a chunk of memory with a unique-ish pattern that includes the thread id
+ * (so deleting one of these from another thread would cause a failure).
+ *
+ * Verify it explicitly and on destruction.
+ */
+class FillObject {
+ public:
+ explicit FillObject(uint64_t idx) : idx_(idx) {
+ uint64_t v = val();
+ for (size_t i = 0; i < kFillObjectSize; ++i) {
+ data_[i] = v;
+ }
+ }
+
+ void check() {
+ uint64_t v = val();
+ for (size_t i = 0; i < kFillObjectSize; ++i) {
+ CHECK_EQ(v, data_[i]);
+ }
+ }
+
+  ~FillObject() {
+    check();
+    ++gDestroyed;
+  }
+
+ private:
+ uint64_t val() const {
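+    // Object index in the high bits, owning thread id in the low bits,
+    // so the pattern differs across objects and across threads.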
+ return (idx_ << 40) | uint64_t(pthread_self());
+ }
+
+ uint64_t idx_;
+ uint64_t data_[kFillObjectSize];
+};
+
+} // namespace
+
+TEST(ThreadLocal, Stress) {
+ constexpr size_t numFillObjects = 250;
+ std::array<ThreadLocalPtr<FillObject>, numFillObjects> objects;
+
+ constexpr size_t numThreads = 32;
+ constexpr size_t numReps = 20;
+
+ std::vector<std::thread> threads;
+ threads.reserve(numThreads);
+
+ for (size_t i = 0; i < numThreads; ++i) {
+ threads.emplace_back([&objects] {
+ for (size_t rep = 0; rep < numReps; ++rep) {
+ for (size_t i = 0; i < objects.size(); ++i) {
+ objects[i].reset(new FillObject(rep * objects.size() + i));
+ std::this_thread::sleep_for(std::chrono::microseconds(100));
+ }
+ for (size_t i = 0; i < objects.size(); ++i) {
+ objects[i]->check();
+ }
+ }
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
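+  // Each reset() destroys the previous FillObject and thread exit destroys
+  // the last generation, so every object created is destroyed exactly once.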
+  EXPECT_EQ(numFillObjects * numThreads * numReps, gDestroyed.load());
+}
+
// Yes, threads and fork don't mix
// (http://cppwisdom.quora.com/Why-threads-and-fork-dont-mix) but if you're
// stupid or desperate enough to try, we shouldn't stand in your way.