X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FProducerConsumerQueue.h;h=16de57d35486a8207fcb1def59c48d59a1360fd3;hb=24db104019844e325def3f12c9f06ee48b8cfce6;hp=670f725436da5be8bcf64b179df7204966e4a3f0;hpb=fdbbdb8f25443a74ec7f351abd9b7f13b6b78e13;p=folly.git diff --git a/folly/ProducerConsumerQueue.h b/folly/ProducerConsumerQueue.h index 670f7254..16de57d3 100644 --- a/folly/ProducerConsumerQueue.h +++ b/folly/ProducerConsumerQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2012 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +17,17 @@ // @author Bo Hu (bhu@fb.com) // @author Jordan DeLong (delong.j@fb.com) -#ifndef PRODUCER_CONSUMER_QUEUE_H_ -#define PRODUCER_CONSUMER_QUEUE_H_ +#pragma once #include #include #include +#include #include #include #include -#include + +#include namespace folly { @@ -34,10 +35,13 @@ namespace folly { * ProducerConsumerQueue is a one producer and one consumer queue * without locks. */ -template -struct ProducerConsumerQueue : private boost::noncopyable { +template +struct ProducerConsumerQueue { typedef T value_type; + ProducerConsumerQueue(const ProducerConsumerQueue&) = delete; + ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete; + // size must be >= 2. // // Also, note that the number of usable slots in the queue at any @@ -59,13 +63,13 @@ struct ProducerConsumerQueue : private boost::noncopyable { // We need to destruct anything that may still exist in our queue. // (No real synchronization needed at destructor time: only one // thread can be doing this.) - if (!std::has_trivial_destructor::value) { - int read = readIndex_; - int end = writeIndex_; - while (read != end) { - records_[read].~T(); - if (++read == size_) { - read = 0; + if (!std::is_trivially_destructible::value) { + size_t readIndex = readIndex_; + size_t endIndex = writeIndex_; + while (readIndex != endIndex) { + records_[readIndex].~T(); + if (++readIndex == size_) { + readIndex = 0; } } } @@ -73,7 +77,7 @@ struct ProducerConsumerQueue : private boost::noncopyable { std::free(records_); } - template + template bool write(Args&&... recordArgs) { auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); auto nextRecord = currentWrite + 1; @@ -133,16 +137,16 @@ struct ProducerConsumerQueue : private boost::noncopyable { } bool isEmpty() const { - return readIndex_.load(std::memory_order_consume) == - writeIndex_.load(std::memory_order_consume); + return readIndex_.load(std::memory_order_acquire) == + writeIndex_.load(std::memory_order_acquire); } bool isFull() const { - auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1; + auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; if (nextRecord == size_) { nextRecord = 0; } - if (nextRecord != readIndex_.load(std::memory_order_consume)) { + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { return false; } // queue is full @@ -155,22 +159,28 @@ struct ProducerConsumerQueue : private boost::noncopyable { // be removing items concurrently). // * It is undefined to call this from any other thread. size_t sizeGuess() const { - int ret = writeIndex_.load(std::memory_order_consume) - - readIndex_.load(std::memory_order_consume); + int ret = writeIndex_.load(std::memory_order_acquire) - + readIndex_.load(std::memory_order_acquire); if (ret < 0) { ret += size_; } return ret; } -private: + // maximum number of items in the queue. + size_t capacity() const { + return size_ - 1; + } + + private: + char pad0_[CacheLocality::kFalseSharingRange]; const uint32_t size_; T* const records_; - std::atomic readIndex_; - std::atomic writeIndex_; -}; + FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic readIndex_; + FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic writeIndex_; -} + char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)]; +}; -#endif +} // namespace folly