X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FProducerConsumerQueue.h;h=57d41cca84b9599604bbca4581b3cf862b423fe2;hb=19db503e08e4ea46a8b4d9a272605006b6245f88;hp=69f3fd5a79019f7daeae66b6806a7df404708934;hpb=321542683a01c3f334047531e9b487f047129775;p=folly.git diff --git a/folly/ProducerConsumerQueue.h b/folly/ProducerConsumerQueue.h index 69f3fd5a..57d41cca 100644 --- a/folly/ProducerConsumerQueue.h +++ b/folly/ProducerConsumerQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2012-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,23 +17,25 @@ // @author Bo Hu (bhu@fb.com) // @author Jordan DeLong (delong.j@fb.com) -#ifndef PRODUCER_CONSUMER_QUEUE_H_ -#define PRODUCER_CONSUMER_QUEUE_H_ +#pragma once #include #include #include +#include #include #include #include +#include + namespace folly { /* * ProducerConsumerQueue is a one producer and one consumer queue * without locks. */ -template +template struct ProducerConsumerQueue { typedef T value_type; @@ -62,12 +64,12 @@ struct ProducerConsumerQueue { // (No real synchronization needed at destructor time: only one // thread can be doing this.) if (!std::is_trivially_destructible::value) { - size_t read = readIndex_; - size_t end = writeIndex_; - while (read != end) { - records_[read].~T(); - if (++read == size_) { - read = 0; + size_t readIndex = readIndex_; + size_t endIndex = writeIndex_; + while (readIndex != endIndex) { + records_[readIndex].~T(); + if (++readIndex == size_) { + readIndex = 0; } } } @@ -75,7 +77,7 @@ struct ProducerConsumerQueue { std::free(records_); } - template + template bool write(Args&&... recordArgs) { auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); auto nextRecord = currentWrite + 1; @@ -135,16 +137,16 @@ struct ProducerConsumerQueue { } bool isEmpty() const { - return readIndex_.load(std::memory_order_consume) == - writeIndex_.load(std::memory_order_consume); + return readIndex_.load(std::memory_order_acquire) == + writeIndex_.load(std::memory_order_acquire); } bool isFull() const { - auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1; + auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; if (nextRecord == size_) { nextRecord = 0; } - if (nextRecord != readIndex_.load(std::memory_order_consume)) { + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { return false; } // queue is full @@ -157,22 +159,30 @@ struct ProducerConsumerQueue { // be removing items concurrently). // * It is undefined to call this from any other thread. size_t sizeGuess() const { - int ret = writeIndex_.load(std::memory_order_consume) - - readIndex_.load(std::memory_order_consume); + int ret = writeIndex_.load(std::memory_order_acquire) - + readIndex_.load(std::memory_order_acquire); if (ret < 0) { ret += size_; } return ret; } -private: + // maximum number of items in the queue. + size_t capacity() const { + return size_ - 1; + } + + private: + char pad0_[hardware_destructive_interference_size]; const uint32_t size_; T* const records_; - std::atomic readIndex_; - std::atomic writeIndex_; -}; + alignas(hardware_destructive_interference_size) + std::atomic readIndex_; + alignas(hardware_destructive_interference_size) + std::atomic writeIndex_; -} + char pad1_[hardware_destructive_interference_size - sizeof(writeIndex_)]; +}; -#endif +} // namespace folly