# A (work-in-progress) test script for running our benchmarks
# Runs all tests, with timing information
-## Unfinished benchmarks - do not use
-# queue williams-queue
-
DATECMD="date +%Y-%m-%d-%R"
DATE="`${DATECMD}`"
+++ /dev/null
-// ============================================================================
-// Copyright (c) 2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file array_lock_free_queue.h
-/// @brief Definition of a circular array based lock-free queue
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 11-Jul-2010 Original development
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
-#define __ARRAY_LOCK_FREE_QUEUE_H__
-
-#include <stdint.h> // uint32_t
-#include "atomic_ops.h" // atomic operations wrappers
-
-#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // (2^16)
-
-// define this variable if calls to "size" must return the real size of the
-// queue. If it is undefined that function will try to take a snapshot of
-// the queue, but returned value might be bogus
-#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
-//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1
-
-
-/// @brief Lock-free queue based on a circular array
-/// No allocation of extra memory for the nodes handling is needed, but it has to add
-/// extra overhead (extra CAS operation) when inserting to ensure the thread-safety of the queue
-/// ELEM_T represents the type of elementes pushed and popped from the queue
-/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure
-/// indexes in the circular queue keep stable when the uint32_t
-/// variable that holds the current position rolls over from FFFFFFFF
-/// to 0. For instance
-/// 2 -> 0x02
-/// 4 -> 0x04
-/// 8 -> 0x08
-/// 16 -> 0x10
-/// (...)
-/// 1024 -> 0x400
-/// 2048 -> 0x800
-///
-/// if queue size is not defined as requested, let's say, for
-/// instance 100, when current position is FFFFFFFF (4,294,967,295)
-/// index in the circular array is 4,294,967,295 % 100 = 95.
-/// When that value is incremented it will be set to 0, that is the
-/// last 4 elements of the queue are not used when the counter rolls
-/// over to 0
-template <typename ELEM_T, uint32_t Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
-class ArrayLockFreeQueue
-{
-public:
- /// @brief constructor of the class
- ArrayLockFreeQueue();
- virtual ~ArrayLockFreeQueue();
-
- /// @brief returns the current number of items in the queue
- /// It tries to take a snapshot of the size of the queue, but in busy environments
- /// this function might return bogus values.
- ///
- /// If a reliable queue size must be kept you might want to have a look at
- /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE'
- /// it enables a reliable size though it hits overall performance of the queue
- /// (when the reliable size variable is on it's got an impact of about 20% in time)
- uint32_t size();
-
- /// @brief push an element at the tail of the queue
- /// @param the element to insert in the queue
- /// Note that the element is not a pointer or a reference, so if you are using large data
- /// structures to be inserted in the queue you should think of instantiate the template
- /// of the queue as a pointer to that large structure
- /// @returns true if the element was inserted in the queue. False if the queue was full
- bool push(const ELEM_T &a_data);
-
- /// @brief pop the element at the head of the queue
- /// @param a reference where the element in the head of the queue will be saved to
- /// Note that the a_data parameter might contain rubbish if the function returns false
- /// @returns true if the element was successfully extracted from the queue. False if the queue was empty
- bool pop(ELEM_T &a_data);
-
-private:
- /// @brief array to keep the elements
- ELEM_T m_theQueue[Q_SIZE];
-
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- /// @brief number of elements in the queue
- volatile uint32_t m_count;
-#endif
-
- /// @brief where a new element will be inserted
- volatile uint32_t m_writeIndex;
-
- /// @brief where the next element where be extracted from
- volatile uint32_t m_readIndex;
-
- /// @brief maximum read index for multiple producer queues
- /// If it's not the same as m_writeIndex it means
- /// there are writes pending to be "committed" to the queue, that means,
- /// the place for the data was reserved (the index in the array) but
- /// data is still not in the queue, so the thread trying to read will have
- /// to wait for those other threads to save the data into the queue
- ///
- /// note this index is only used for MultipleProducerThread queues
- volatile uint32_t m_maximumReadIndex;
-
- /// @brief calculate the index in the circular array that corresponds
- /// to a particular "count" value
- inline uint32_t countToIndex(uint32_t a_count);
-};
-
-// include the implementation file
-#include "array_lock_free_queue_impl.h"
-
-#endif // __ARRAY_LOCK_FREE_QUEUE_H__
+++ /dev/null
-// ============================================================================
-// Copyright (c) 2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file array_lock_free_queue_impl.h
-/// @brief Implementation of a circular array based lock-free queue
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 11-Jul-2010 Original development
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
-#define __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
-
-#include <assert.h> // assert()
-#include <sched.h> // sched_yield()
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
- m_writeIndex(0),
- m_readIndex(0),
- m_maximumReadIndex(0) // only for MultipleProducerThread queues
-{
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- m_count = 0;
-#endif
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
-{
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-inline
-uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
-{
- // if Q_SIZE is a power of 2 this statement could be also written as
- // return (a_count & (Q_SIZE - 1));
- return (a_count % Q_SIZE);
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
-{
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- return m_count;
-#else
- uint32_t currentWriteIndex = m_writeIndex;
- uint32_t currentReadIndex = m_readIndex;
-
- // let's think of a scenario where this function returns bogus data
- // 1. when the statement 'currentWriteIndex = m_writeIndex' is run
- // m_writeIndex is 3 and m_readIndex is 2. Real size is 1
- // 2. afterwards this thread is preemted. While this thread is inactive 2
- // elements are inserted and removed from the queue, so m_writeIndex is 5
- // m_readIndex 4. Real size is still 1
- // 3. Now the current thread comes back from preemption and reads m_readIndex.
- // currentReadIndex is 4
- // 4. currentReadIndex is bigger than currentWriteIndex, so
- // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
- // it returns that the queue is almost full, when it is almost empty
-
- if (currentWriteIndex >= currentReadIndex)
- {
- return (currentWriteIndex - currentReadIndex);
- }
- else
- {
- return (Q_SIZE + currentWriteIndex - currentReadIndex);
- }
-#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)
-{
- uint32_t currentReadIndex;
- uint32_t currentWriteIndex;
-
- do
- {
- currentWriteIndex = m_writeIndex;
- currentReadIndex = m_readIndex;
- if (countToIndex(currentWriteIndex + 1) ==
- countToIndex(currentReadIndex))
- {
- // the queue is full
- return false;
- }
-
- } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
-
- // We know now that this index is reserved for us. Use it to save the data
- m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-
- // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
- // inserting in the queue. It might fail if there are more than 1 producer threads because this
- // operation has to be done in the same order as the previous CAS
- while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
- {
- // this is a good place to yield the thread in case there are more
- // software threads than hardware processors and you have more
- // than 1 producer thread
- // have a look at sched_yield (POSIX.1b)
- sched_yield();
- }
-
- // The value was successfully inserted into the queue
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicAdd(&m_count, 1);
-#endif
-
- return true;
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)
-{
- uint32_t currentMaximumReadIndex;
- uint32_t currentReadIndex;
-
- do
- {
- // to ensure thread-safety when there is more than 1 producer thread
- // a second index is defined (m_maximumReadIndex)
- currentReadIndex = m_readIndex;
- currentMaximumReadIndex = m_maximumReadIndex;
-
- if (countToIndex(currentReadIndex) ==
- countToIndex(currentMaximumReadIndex))
- {
- // the queue is empty or
- // a producer thread has allocate space in the queue but is
- // waiting to commit the data into it
- return false;
- }
-
- // retrieve the data from the queue
- a_data = m_theQueue[countToIndex(currentReadIndex)];
-
- // try to perfrom now the CAS operation on the read index. If we succeed
- // a_data already contains what m_readIndex pointed to before we
- // increased it
- if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
- {
- // got here. The value was retrieved from the queue. Note that the
- // data inside the m_queue array is not deleted nor reseted
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicSub(&m_count, 1);
-#endif
- return true;
- }
-
- // it failed retrieving the element off the queue. Someone else must
- // have read the element stored at countToIndex(currentReadIndex)
- // before we could perform the CAS operation
-
- } while(1); // keep looping to try again!
-
- // Something went wrong. it shouldn't be possible to reach here
- assert(0);
-
- // Add this return statement to avoid compiler warnings
- return false;
-}
-
-#endif // __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
-
+++ /dev/null
-// ============================================================================
-// Copyright (c) 2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file array_lock_free_queue_single_producer.h
-/// @brief Definition of a circular array based lock-free queue
-///
-/// WARNING: This queue is not thread safe when several threads try to insert
-/// elements into the queue. It is allowed to use as many consumers
-/// as needed though.
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 11-Jul-2010 Original development
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
-#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
-
-#include <stdint.h> // uint32_t
-#include "atomic_ops.h" // atomic operations wrappers
-
-#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65536 // 2^16 = 65,536 elements by default
-
-// define this variable if calls to "size" must return the real size of the
-// queue. If it is undefined that function will try to take a snapshot of
-// the queue, but returned value might be bogus
-#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
-//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1
-
-
-/// @brief especialisation of the ArrayLockFreeQueue to be used when there is
-/// only one producer thread
-/// No allocation of extra memory for the nodes handling is needed
-/// WARNING: This queue is not thread safe when several threads try to insert elements
-/// into the queue
-/// ELEM_T represents the type of elementes pushed and popped from the queue
-/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure
-/// indexes in the circular queue keep stable when the uint32_t
-/// variable that holds the current position rolls over from FFFFFFFF
-/// to 0. For instance
-/// 2 -> 0x02
-/// 4 -> 0x04
-/// 8 -> 0x08
-/// 16 -> 0x10
-/// (...)
-/// 1024 -> 0x400
-/// 2048 -> 0x800
-///
-/// if queue size is not defined as requested, let's say, for
-/// instance 100, when current position is FFFFFFFF (4,294,967,295)
-/// index in the circular array is 4,294,967,295 % 100 = 95.
-/// When that value is incremented it will be set to 0, that is the
-/// last 4 elements of the queue are not used when the counter rolls
-/// over to 0
-template <typename ELEM_T, uint32_t Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
-class ArrayLockFreeQueueSingleProducer
-{
-public:
- /// @brief constructor of the class
- ArrayLockFreeQueueSingleProducer();
- virtual ~ArrayLockFreeQueueSingleProducer();
-
- /// @brief returns the current number of items in the queue
- /// It tries to take a snapshot of the size of the queue, but in busy environments
- /// this function might return bogus values.
- ///
- /// If a reliable queue size must be kept you might want to have a look at
- /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE'
- /// it enables a reliable size though it hits overall performance of the queue
- /// (when the reliable size variable is on it's got an impact of about 20% in time)
- uint32_t size();
-
- /// @brief push an element at the tail of the queue
- /// @param the element to insert in the queue
- /// Note that the element is not a pointer or a reference, so if you are using large data
- /// structures to be inserted in the queue you should think of instantiate the template
- /// of the queue as a pointer to that large structure
- /// @returns true if the element was inserted in the queue. False if the queue was full
- bool push(const ELEM_T &a_data);
-
- /// @brief pop the element at the head of the queue
- /// @param a reference where the element in the head of the queue will be saved to
- /// Note that the a_data parameter might contain rubbish if the function returns false
- /// @returns true if the element was successfully extracted from the queue. False if the queue was empty
- bool pop(ELEM_T &a_data);
-
-private:
- /// @brief array to keep the elements
- ELEM_T m_theQueue[Q_SIZE];
-
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- /// @brief number of elements in the queue
- volatile uint32_t m_count;
-#endif
-
- /// @brief where a new element will be inserted
- volatile uint32_t m_writeIndex;
-
- /// @brief where the next element where be extracted from
- volatile uint32_t m_readIndex;
-
- /// @brief calculate the index in the circular array that corresponds
- /// to a particular "count" value
- inline uint32_t countToIndex(uint32_t a_count);
-};
-
-// include the implementation file
-#include "array_lock_free_queue_single_producer_impl.h"
-
-#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
+++ /dev/null
-// ============================================================================
-// Copyright (c) 2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file array_lock_free_queue_single_producer_impl.h
-/// @brief Implementation of a circular array based lock-free queue
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 11-Jul-2010 Original development
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
-#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
-
-#include <assert.h> // assert()
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::ArrayLockFreeQueueSingleProducer() :
- m_writeIndex(0),
- m_readIndex(0)
-{
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- m_count = 0;
-#endif
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::~ArrayLockFreeQueueSingleProducer()
-{
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-inline
-uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
-{
- // if Q_SIZE is a power of 2 this statement could be also written as
- // return (a_count & (Q_SIZE - 1));
- return (a_count % Q_SIZE);
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::size()
-{
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- return m_count;
-#else
- uint32_t currentWriteIndex = m_writeIndex;
- uint32_t currentReadIndex = m_readIndex;
-
- // let's think of a scenario where this function returns bogus data
- // 1. when the statement 'currentWriteIndex = m_writeIndex' is run
- // m_writeIndex is 3 and m_readIndex is 2. Real size is 1
- // 2. afterwards this thread is preemted. While this thread is inactive 2
- // elements are inserted and removed from the queue, so m_writeIndex is 5
- // m_readIndex 4. Real size is still 1
- // 3. Now the current thread comes back from preemption and reads m_readIndex.
- // currentReadIndex is 4
- // 4. currentReadIndex is bigger than currentWriteIndex, so
- // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
- // it returns that the queue is almost full, when it is almost empty
-
- if (currentWriteIndex >= currentReadIndex)
- {
- return (currentWriteIndex - currentReadIndex);
- }
- else
- {
- return (Q_SIZE + currentWriteIndex - currentReadIndex);
- }
-#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)
-{
- uint32_t currentReadIndex;
- uint32_t currentWriteIndex;
-
- currentWriteIndex = m_writeIndex;
- currentReadIndex = m_readIndex;
- if (countToIndex(currentWriteIndex + 1) ==
- countToIndex(currentReadIndex))
- {
- // the queue is full
- return false;
- }
-
- // save the date into the q
- m_theQueue[countToIndex(currentWriteIndex)] = a_data;
- // increment atomically write index. Now a consumer thread can read
- // the piece of data that was just stored
- AtomicAdd(&m_writeIndex, 1);
-
- // The value was successfully inserted into the queue
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicAdd(&m_count, 1);
-#endif
-
- return true;
-}
-
-template <typename ELEM_T, uint32_t Q_SIZE>
-bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)
-{
- uint32_t currentMaximumReadIndex;
- uint32_t currentReadIndex;
-
- do
- {
- // m_maximumReadIndex doesn't exist when the queue is set up as
- // single-producer. The maximum read index is described by the current
- // write index
- currentReadIndex = m_readIndex;
- currentMaximumReadIndex = m_writeIndex;
-
- if (countToIndex(currentReadIndex) ==
- countToIndex(currentMaximumReadIndex))
- {
- // the queue is empty or
- // a producer thread has allocate space in the queue but is
- // waiting to commit the data into it
- return false;
- }
-
- // retrieve the data from the queue
- a_data = m_theQueue[countToIndex(currentReadIndex)];
-
- // try to perfrom now the CAS operation on the read index. If we succeed
- // a_data already contains what m_readIndex pointed to before we
- // increased it
- if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
- {
- // got here. The value was retrieved from the queue. Note that the
- // data inside the m_queue array is not deleted nor reseted
-#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicSub(&m_count, 1);
-#endif
- return true;
- }
-
- // it failed retrieving the element off the queue. Someone else must
- // have read the element stored at countToIndex(currentReadIndex)
- // before we could perform the CAS operation
-
- } while(1); // keep looping to try again!
-
- // Something went wrong. it shouldn't be possible to reach here
- assert(0);
-
- // Add this return statement to avoid compiler warnings
- return false;
-}
-
-#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
-
+++ /dev/null
-// ============================================================================
-// Copyright (c) 2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file atomic_ops.h
-/// @brief This file contains functions to wrap the built-in atomic operations
-/// defined for your compiler
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 11-Jul-2010 Original development. GCC support
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef __ATOMIC_OPS_H
-#define __ATOMIC_OPS_H
-
-#ifdef __GNUC__
-// Atomic functions in GCC are present from version 4.1.0 on
-// http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html
-
-// Test for GCC >= 4.1.0
-#if (__GNUC__ < 4) || \
- ((__GNUC__ == 4) && ((__GNUC_MINOR__ < 1) || \
- ((__GNUC_MINOR__ == 1) && \
- (__GNUC_PATCHLEVEL__ < 0))) )
-
-#error Atomic built-in functions are only available in GCC in versions >= 4.1.0
-#endif // end of check for GCC 4.1.0
-
-/// @brief atomically adds a_count to the variable pointed by a_ptr
-/// @return the value that had previously been in memory
-#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
-
-/// @brief atomically substracts a_count from the variable pointed by a_ptr
-/// @return the value that had previously been in memory
-#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
-
-/// @brief Compare And Swap
-/// If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
-/// @return true if the comparison is successful and a_newVal was written
-#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
-
-/// @brief Compare And Swap
-/// If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
-/// @return the contents of *a_ptr before the operation
-#define CASVal(a_ptr, a_oldVal, a_newVal) __sync_val_compare_and_swap(a_ptr, a_oldVal, a_newVal)
-
-#else
-#error Atomic functions such as CAS or AtomicAdd are not defined for your compiler. Please add them in atomic_ops.h
-#endif // __GNUC__
-
-
-#endif // __ATOMIC_OPS_H
+++ /dev/null
-// ============================================================================
-// Copyright (c) 2009-2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file q_blocking_queue.h
-/// @brief Definition of a thread-safe queue based on glib system calls
-/// It internally contains a std::queue which is protected from concurrent
-/// access by glib mutextes and conditional variables
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 04-May-2009 Original development (based on pthreads)
-/// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef _GBLOCKINGQUEUE_H_
-#define _GBLOCKINGQUEUE_H_
-
-#include <glib.h>
-#include <queue>
-#include <limits> // std::numeric_limits<>::max
-
-#define BLOCKING_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits<std::size_t >::max()
-
-/// @brief blocking thread-safe queue
-/// It uses a mutex+condition variables to protect the internal queue
-/// implementation. Inserting or reading elements use the same mutex
-template <typename T>
-class BlockingQueue
-{
-public:
- BlockingQueue(std::size_t a_maxSize = BLOCKING_QUEUE_DEFAULT_MAX_SIZE);
- ~BlockingQueue();
-
- /// @brief Check if the queue is empty
- /// This call can block if another thread owns the lock that protects the
- /// queue
- /// @return true if the queue is empty. False otherwise
- bool IsEmpty();
-
- /// @brief inserts an element into queue queue
- /// This call can block if another thread owns the lock that protects the
- /// queue. If the queue is full The thread will be blocked in this queue
- /// until someone else gets an element from the queue
- /// @param element to insert into the queue
- /// @return True if the elem was successfully inserted into the queue.
- /// False otherwise
- bool Push(const T &a_elem);
-
- /// @brief inserts an element into queue queue
- /// This call can block if another thread owns the lock that protects the
- /// queue. If the queue is full The call will return false and the element
- /// won't be inserted
- /// @param element to insert into the queue
- /// @return True if the elem was successfully inserted into the queue.
- /// False otherwise
- bool TryPush(const T &a_elem);
-
- /// @brief extracts an element from the queue (and deletes it from the q)
- /// If the queue is empty this call will block the thread until there is
- /// something in the queue to be extracted
- /// @param a reference where the element from the queue will be saved to
- void Pop(T &out_data);
-
- /// @brief extracts an element from the queue (and deletes it from the q)
- /// This call gets the block that protects the queue. It will extract the
- /// element from the queue only if there are elements in it
- /// @param reference to the variable where the result will be saved
- /// @return True if the element was retrieved from the queue.
- /// False if the queue was empty
- bool TryPop(T &out_data);
-
- /// @brief extracts an element from the queue (and deletes it from the q)
- /// If the queue is empty this call will block the thread until there
- /// is something in the queue to be extracted or until the timer
- /// (2nd parameter) expires
- /// @param reference to the variable where the result will be saved
- /// @param microsecondsto wait before returning if the queue was empty
- /// @return True if the element was retrieved from the queue.
- /// False if the timeout was reached
- bool TimedWaitPop(T &data, glong microsecs);
-
-protected:
- std::queue<T> m_theQueue;
- /// maximum number of elements for the queue
- std::size_t m_maximumSize;
- /// Mutex to protect the queue
- GMutex* m_mutex;
- /// Conditional variable to wake up threads
- GCond* m_cond;
-};
-
-// include the implementation file
-#include "g_blocking_queue_impl.h"
-
-#endif /* _GBLOCKINGQUEUE_H_ */
+++ /dev/null
-// ============================================================================
-// Copyright (c) 2009-2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file q_blocking_queue_impl.h
-/// @brief Implementation of a thread-safe queue based on glib system calls
-/// It internally contains a std::queue which is protected from concurrent
-/// access by glib mutextes and conditional variables
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 04-May-2009 Original development (based on pthreads)
-/// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef _GBLOCKINGQUEUEIMPL_H_
-#define _GBLOCKINGQUEUEIMPL_H_
-
-#include <assert.h>
-
-#define NANOSECONDS_PER_SECOND 1000000000
-
-template <typename T>
-BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :
- m_maximumSize(a_maxSize)
-{
- if (!g_thread_supported ())
- {
- // glib thread system hasn't been initialized yet
- g_thread_init(NULL);
- }
-
- m_mutex = g_mutex_new();
- m_cond = g_cond_new();
-
- assert(m_mutex != NULL);
- assert(m_cond != NULL);
-}
-
-template <typename T>
-BlockingQueue<T>::~BlockingQueue()
-{
- g_cond_free(m_cond);
- g_mutex_free(m_mutex);
-}
-
-template <typename T>
-bool BlockingQueue<T>::IsEmpty()
-{
- bool rv;
-
- g_mutex_lock(m_mutex);
- rv = m_theQueue.empty();
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-template <typename T>
-bool BlockingQueue<T>::Push(const T &a_elem)
-{
- g_mutex_lock(m_mutex);
-
- while (m_theQueue.size() >= m_maximumSize)
- {
- g_cond_wait(m_cond, m_mutex);
- }
-
- bool queueEmpty = m_theQueue.empty();
-
- m_theQueue.push(a_elem);
-
- if (queueEmpty)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-
- return true;
-}
-
-template <typename T>
-bool BlockingQueue<T>::TryPush(const T &a_elem)
-{
- g_mutex_lock(m_mutex);
-
- bool rv = false;
- bool queueEmpty = m_theQueue.empty();
-
- if (m_theQueue.size() < m_maximumSize)
- {
- m_theQueue.push(a_elem);
- rv = true;
- }
-
- if (queueEmpty)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-template <typename T>
-void BlockingQueue<T>::Pop(T &out_data)
-{
- g_mutex_lock(m_mutex);
-
- while (m_theQueue.empty())
- {
- g_cond_wait(m_cond, m_mutex);
- }
-
- bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
-
- out_data = m_theQueue.front();
- m_theQueue.pop();
-
- if (queueFull)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-}
-
-template <typename T>
-bool BlockingQueue<T>::TryPop(T &out_data)
-{
- g_mutex_lock(m_mutex);
-
- bool rv = false;
- if (!m_theQueue.empty())
- {
- bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
-
- out_data = m_theQueue.front();
- m_theQueue.pop();
-
- if (queueFull)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- rv = true;
- }
-
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-template <typename T>
-bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)
-{
- g_mutex_lock(m_mutex);
-
- // adding microsecs to now
- GTimeVal abs_time;
- g_get_current_time(&abs_time);
- g_time_val_add(&abs_time, microsecs);
-
- gboolean retcode = TRUE;
- while (m_theQueue.empty() && (retcode != FALSE))
- {
- // Returns TRUE if cond was signalled, or FALSE on timeout
- retcode = g_cond_timed_wait(m_cond, m_mutex, &abs_time);
- }
-
- bool rv = false;
- bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
- if (retcode != FALSE)
- {
- data = m_theQueue.front();
- m_theQueue.pop();
-
- rv = true;
- }
-
- if (rv && queueFull)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-#endif /* _GBLOCKINGQUEUEIMPL_H_ */
+++ /dev/null
-# -fno-schedule-insns -fno-rerun-loop-opt are a workaround for a compiler error in 4.2
-# -Wno-unused-parameter
-
-CC = g++
-CFLAGS = -g -O3 -fopenmp -fno-schedule-insns -fno-schedule-insns2 -W -Wall #-Wno-unused-parameter
-CFLAGS += `pkg-config --cflags glib-2.0`
-#CFLAGS += -march=i686
-#CFLAGS += -march=core2
-LDFLAGS = -lgomp
-LDFLAGS+= `pkg-config --libs glib-2.0`
-# g_blocking_queue also depends on gthread-2.0
-CFLAGS_GTHREAD = `pkg-config gthread-2.0`
-LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0`
-
-#compile-time parameters
-ifdef N_PRODUCERS
-CFLAGS += -DN_PRODUCERS=$(N_PRODUCERS)
-endif
-ifdef N_CONSUMERS
-CFLAGS += -DN_CONSUMERS=$(N_CONSUMERS)
-endif
-ifdef N_ITERATIONS
-CFLAGS += -DN_ITERATIONS=$(N_ITERATIONS)
-endif
-ifdef QUEUE_SIZE
-CFLAGS += -DQUEUE_SIZE=$(QUEUE_SIZE)
-endif
-
-
-LOCK_FREE_Q_INCLUDE = \
- array_lock_free_queue.h \
- array_lock_free_queue_impl.h
-
-BLOCKING_Q_INCLUDE = \
- g_blocking_queue.h \
- g_blocking_queue_impl.h
-
-LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \
- array_lock_free_queue_single_producer.h \
- array_lock_free_queue_single_producer_impl.h
-
-SHARED_INCLUDE = \
- atomic_ops.h
-
-all : test_lock_free_q test_lock_free_single_producer_q test_blocking_q
-
-test_lock_free_q : test_lock_free_q.o
- $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS)
-
-test_blocking_q : test_blocking_q.o
- $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS) $(LDFLAGS_GTHREAD)
-
-test_lock_free_single_producer_q : test_lock_free_single_producer_q.o
- $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS)
-
-test_lock_free_q.o : test_lock_free_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_Q_INCLUDE)
- $(CC) -c $< $(CFLAGS)
-
-test_lock_free_single_producer_q.o : test_lock_free_single_producer_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE)
- $(CC) -c $< $(CFLAGS)
-
-test_blocking_q.o: test_blocking_q.cpp $(SHARED_INCLUDE) $(BLOCKING_Q_INCLUDE)
- $(CC) -c $< $(CFLAGS) $(CFLAGS_GTHREAD)
-
-clean:
- rm test_lock_free_q test_blocking_q test_lock_free_single_producer_q; rm *.o
-
+++ /dev/null
-!_TAG_FILE_FORMAT 2 /extended format; --format=1 will not append ;" to lines/
-!_TAG_FILE_SORTED 1 /0=unsorted, 1=sorted, 2=foldcase/
-!_TAG_PROGRAM_AUTHOR Darren Hiebert /dhiebert@users.sourceforge.net/
-!_TAG_PROGRAM_NAME Exuberant Ctags //
-!_TAG_PROGRAM_URL http://ctags.sourceforge.net /official site/
-!_TAG_PROGRAM_VERSION 5.9~svn20110310 //
-ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue.h 45;" d
-ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue_single_producer.h 49;" d
-ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE array_lock_free_queue.h 50;" d
-ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE array_lock_free_queue_single_producer.h 54;" d
-ArrayLockFreeQueue array_lock_free_queue.h /^class ArrayLockFreeQueue$/;" c
-ArrayLockFreeQueue array_lock_free_queue_impl.h /^ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :$/;" f class:ArrayLockFreeQueue
-ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer.h /^class ArrayLockFreeQueueSingleProducer$/;" c
-ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer_impl.h /^ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::ArrayLockFreeQueueSingleProducer() :$/;" f class:ArrayLockFreeQueueSingleProducer
-AtomicAdd atomic_ops.h 51;" d
-AtomicAdd atomic_ops.h 87;" d
-AtomicSub atomic_ops.h 55;" d
-AtomicSub atomic_ops.h 91;" d
-BLOCKING_QUEUE_DEFAULT_MAX_SIZE g_blocking_queue.h 49;" d
-BLOCKING_Q_INCLUDE makefile /^BLOCKING_Q_INCLUDE = \\$/;" m
-BlockingQueue g_blocking_queue.h /^class BlockingQueue$/;" c
-BlockingQueue g_blocking_queue_impl.h /^BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :$/;" f class:BlockingQueue
-CAS atomic_ops.h 60;" d
-CAS atomic_ops.h 96;" d
-CASVal atomic_ops.h 101;" d
-CASVal atomic_ops.h 65;" d
-CC makefile /^CC = g++$/;" m
-CFLAGS_GTHREAD makefile /^CFLAGS_GTHREAD = `pkg-config gthread-2.0`$/;" m
-IsEmpty g_blocking_queue_impl.h /^bool BlockingQueue<T>::IsEmpty()$/;" f class:BlockingQueue
-LDFLAGS_GTHREAD makefile /^LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0`$/;" m
-LOCK_FREE_Q_INCLUDE makefile /^LOCK_FREE_Q_INCLUDE = \\$/;" m
-LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE makefile /^LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \\$/;" m
-NANOSECONDS_PER_SECOND g_blocking_queue_impl.h 47;" d
-N_CONSUMERS test_blocking_q.cpp 17;" d file:
-N_CONSUMERS test_lock_free_q.cpp 17;" d file:
-N_CONSUMERS test_lock_free_single_producer_q.cpp 13;" d file:
-N_ITERATIONS test_blocking_q.cpp 21;" d file:
-N_ITERATIONS test_lock_free_q.cpp 21;" d file:
-N_ITERATIONS test_lock_free_single_producer_q.cpp 17;" d file:
-N_PRODUCERS test_blocking_q.cpp 13;" d file:
-N_PRODUCERS test_lock_free_q.cpp 13;" d file:
-Pop g_blocking_queue_impl.h /^void BlockingQueue<T>::Pop(T &out_data)$/;" f class:BlockingQueue
-Push g_blocking_queue_impl.h /^bool BlockingQueue<T>::Push(const T &a_elem)$/;" f class:BlockingQueue
-QUEUE_SIZE test_blocking_q.cpp 25;" d file:
-QUEUE_SIZE test_lock_free_q.cpp 25;" d file:
-QUEUE_SIZE test_lock_free_single_producer_q.cpp 21;" d file:
-SHARED_INCLUDE makefile /^SHARED_INCLUDE = \\$/;" m
-TestBlockingQueue test_blocking_q.cpp /^void TestBlockingQueue()$/;" f
-TestLockFreeQueue test_lock_free_q.cpp /^void TestLockFreeQueue()$/;" f
-TestLockFreeQueue test_lock_free_single_producer_q.cpp /^void TestLockFreeQueue()$/;" f
-TimedWaitPop g_blocking_queue_impl.h /^bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)$/;" f class:BlockingQueue
-TryPop g_blocking_queue_impl.h /^bool BlockingQueue<T>::TryPop(T &out_data)$/;" f class:BlockingQueue
-TryPush g_blocking_queue_impl.h /^bool BlockingQueue<T>::TryPush(const T &a_elem)$/;" f class:BlockingQueue
-_GBLOCKINGQUEUEIMPL_H_ g_blocking_queue_impl.h 43;" d
-_GBLOCKINGQUEUE_H_ g_blocking_queue.h 43;" d
-__ARRAY_LOCK_FREE_QUEUE_H__ array_lock_free_queue.h 40;" d
-__ARRAY_LOCK_FREE_QUEUE_IMPL_H__ array_lock_free_queue_impl.h 40;" d
-__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__ array_lock_free_queue_single_producer.h 44;" d
-__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__ array_lock_free_queue_single_producer_impl.h 40;" d
-__ATOMIC_OPS_H atomic_ops.h 41;" d
-atomic32 atomic_ops.h /^typedef atomic_uint atomic32;$/;" t
-atomic32 atomic_ops.h /^typedef volatile uint32 atomic32;$/;" t
-countToIndex array_lock_free_queue_impl.h /^uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)$/;" f class:ArrayLockFreeQueue
-countToIndex array_lock_free_queue_single_producer_impl.h /^uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)$/;" f class:ArrayLockFreeQueueSingleProducer
-m_cond g_blocking_queue.h /^ GCond* m_cond;$/;" m class:BlockingQueue
-m_count array_lock_free_queue.h /^ atomic32 m_count;$/;" m class:ArrayLockFreeQueue
-m_count array_lock_free_queue_single_producer.h /^ atomic32 m_count;$/;" m class:ArrayLockFreeQueueSingleProducer
-m_maximumReadIndex array_lock_free_queue.h /^ atomic32 m_maximumReadIndex;$/;" m class:ArrayLockFreeQueue
-m_maximumSize g_blocking_queue.h /^ std::size_t m_maximumSize;$/;" m class:BlockingQueue
-m_mutex g_blocking_queue.h /^ GMutex* m_mutex;$/;" m class:BlockingQueue
-m_readIndex array_lock_free_queue.h /^ atomic32 m_readIndex;$/;" m class:ArrayLockFreeQueue
-m_readIndex array_lock_free_queue_single_producer.h /^ atomic32 m_readIndex;$/;" m class:ArrayLockFreeQueueSingleProducer
-m_theQueue array_lock_free_queue.h /^ ELEM_T m_theQueue[Q_SIZE];$/;" m class:ArrayLockFreeQueue
-m_theQueue array_lock_free_queue_single_producer.h /^ ELEM_T m_theQueue[Q_SIZE];$/;" m class:ArrayLockFreeQueueSingleProducer
-m_theQueue g_blocking_queue.h /^ std::queue<T> m_theQueue;$/;" m class:BlockingQueue
-m_writeIndex array_lock_free_queue.h /^ atomic32 m_writeIndex;$/;" m class:ArrayLockFreeQueue
-m_writeIndex array_lock_free_queue_single_producer.h /^ atomic32 m_writeIndex;$/;" m class:ArrayLockFreeQueueSingleProducer
-main test_blocking_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f
-main test_lock_free_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f
-main test_lock_free_single_producer_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f
-pop array_lock_free_queue_impl.h /^bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)$/;" f class:ArrayLockFreeQueue
-pop array_lock_free_queue_single_producer_impl.h /^bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)$/;" f class:ArrayLockFreeQueueSingleProducer
-push array_lock_free_queue_impl.h /^bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)$/;" f class:ArrayLockFreeQueue
-push array_lock_free_queue_single_producer_impl.h /^bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)$/;" f class:ArrayLockFreeQueueSingleProducer
-size array_lock_free_queue_impl.h /^uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()$/;" f class:ArrayLockFreeQueue
-size array_lock_free_queue_single_producer_impl.h /^uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::size()$/;" f class:ArrayLockFreeQueueSingleProducer
-~ArrayLockFreeQueue array_lock_free_queue_impl.h /^ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()$/;" f class:ArrayLockFreeQueue
-~ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer_impl.h /^ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::~ArrayLockFreeQueueSingleProducer()$/;" f class:ArrayLockFreeQueueSingleProducer
-~BlockingQueue g_blocking_queue_impl.h /^BlockingQueue<T>::~BlockingQueue()$/;" f class:BlockingQueue
+++ /dev/null
-// ============================================================================
-/// @file test_blocking_q.cpp
-/// @brief Benchmark blocking queue
-// ============================================================================
-
-
-#include <iostream>
-#include <glib.h> // GTimeVal + g_get_current_time
-#include <omp.h> // parallel processing support in gcc
-#include "g_blocking_queue.h"
-
-#ifndef N_PRODUCERS
-#define N_PRODUCERS 1
-#endif
-
-#ifndef N_CONSUMERS
-#define N_CONSUMERS 1
-#endif
-
-#ifndef N_ITERATIONS
-#define N_ITERATIONS 10000000
-#endif
-
-#ifndef QUEUE_SIZE
-#define QUEUE_SIZE 1000
-#endif
-
-void TestBlockingQueue()
-{
- BlockingQueue<int> theQueue(QUEUE_SIZE);
- GTimeVal iniTimestamp;
- GTimeVal endTimestamp;
-
- std::cout << "=== Start of testing blocking queue ===" << std::endl;
- g_get_current_time(&iniTimestamp);
- #pragma omp parallel shared(theQueue) num_threads (2)
- {
- if (omp_get_thread_num() == 0)
- {
- if (!omp_get_nested())
- {
- std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
- std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
- }
- }
-
- #pragma omp sections //nowait
- {
- #pragma omp section
- {
- // producer section
- #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS)
- {
- int i;
- #pragma omp for schedule(static) private(i) nowait
- for (i = 0 ; i < N_ITERATIONS ; i++)
- {
- while(!theQueue.Push(i))
- {
- // queue full
- }
- }
- }
- }
-
- #pragma omp section
- {
- // consumer section
- #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
- {
- int i;
- int result;
- #pragma omp for schedule(static) private(i, result) nowait
- for (i = 0 ; i < N_ITERATIONS ; i++)
- {
- // this call will block if the queue is empty until
- // some other thread pushes something into it
- theQueue.Pop(result);
- }
- }
- }
- }
-
- } // #pragma omp parallel
-
- g_get_current_time(&endTimestamp);
-
- // calculate elapsed time
- GTimeVal elapsedTime;
- if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
- {
- elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec;
- elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
- }
- else
- {
- elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1;
- elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
- }
-
- std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
- std::cout << "=== End of testing blocking queue ===" << std::endl;
-}
-
-int main(int /*argc*/, char** /*argv*/)
-{
- TestBlockingQueue();
- std::cout << "Done!!!" << std::endl;
-
- return 0;
-}
-
+++ /dev/null
-// ============================================================================
-/// @file test_lock_free_q.cpp
-/// @brief Benchmark lock free queue
-// ============================================================================
-
-
-#include <iostream>
-#include <glib.h> // GTimeVal + g_get_current_time
-#include <omp.h> // parallel processing support in gcc
-#include "array_lock_free_queue.h"
-
-#ifndef N_PRODUCERS
-#define N_PRODUCERS 1
-#endif
-
-#ifndef N_CONSUMERS
-#define N_CONSUMERS 1
-#endif
-
-#ifndef N_ITERATIONS
-#define N_ITERATIONS 10000000
-#endif
-
-#ifndef QUEUE_SIZE
-#define QUEUE_SIZE 1024
-#endif
-
-void TestLockFreeQueue()
-{
- ArrayLockFreeQueue<int, QUEUE_SIZE> theQueue;
- GTimeVal iniTimestamp;
- GTimeVal endTimestamp;
-
- std::cout << "=== Start of testing lock-free queue ===" << std::endl;
- g_get_current_time(&iniTimestamp);
- #pragma omp parallel shared(theQueue) num_threads (2)
- {
- if (omp_get_thread_num() == 0)
- {
- //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl;
-
- if (!omp_get_nested())
- {
- std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
- std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
- }
- }
-
- #pragma omp sections //nowait
- {
- #pragma omp section
- {
- // producer section
- #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS)
- {
- //if (omp_get_thread_num() == 0)
- //{
- // std::cout << "\t Producers: " << omp_get_num_threads() << std::endl;
- //}
- int i;
- #pragma omp for schedule(static) private(i) nowait
- for (i = 0 ; i < N_ITERATIONS ; i++)
- {
- while(!theQueue.push(i))
- {
- // queue full
- }
- }
- }
- }
-
- #pragma omp section
- {
- // consumer section
- #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
- {
- //if (omp_get_thread_num() == 0)
- //{
- // std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl;
- //}
-
- int i;
- int result;
- #pragma omp for schedule(static) private(i, result) nowait
- for (i = 0 ; i < N_ITERATIONS ; i++)
- {
- while (!theQueue.pop(result))
- {
- // queue empty
- }
-
-#if (N_CONSUMERS == 1 && N_PRODUCERS == 1)
- if (i != result)
- {
- std::cout << "FAILED i=" << i << " result=" << result << std::endl;
- }
-#endif
- }
- }
- }
- }
-
- } // #pragma omp parallel
-
- g_get_current_time(&endTimestamp);
-
- // calculate elapsed time
- GTimeVal elapsedTime;
- if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
- {
- elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec;
- elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
- }
- else
- {
- elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1;
- elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
- }
-
- std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
- std::cout << "=== End of testing lock-free queue ===" << std::endl;
-}
-
-int main(int /*argc*/, char** /*argv*/)
-{
-
- TestLockFreeQueue();
- std::cout << "Done!!!" << std::endl;
-
- return 0;
-}
-
+++ /dev/null
-// ============================================================================
-/// @file test_lock_free_q.cpp
-/// @brief Benchmark lock free queue
-// ============================================================================
-
-
-#include <iostream>
-#include <glib.h> // GTimeVal + g_get_current_time
-#include <omp.h> // parallel processing support in gcc
-#include "array_lock_free_queue_single_producer.h"
-
-#ifndef N_CONSUMERS
-#define N_CONSUMERS 1
-#endif
-
-#ifndef N_ITERATIONS
-#define N_ITERATIONS 10000000
-#endif
-
-#ifndef QUEUE_SIZE
-#define QUEUE_SIZE 1000
-#endif
-
-void TestLockFreeQueue()
-{
- ArrayLockFreeQueueSingleProducer<int, QUEUE_SIZE> theQueue;
- GTimeVal iniTimestamp;
- GTimeVal endTimestamp;
-
- std::cout << "=== Start of testing lock-free queue ===" << std::endl;
- g_get_current_time(&iniTimestamp);
- #pragma omp parallel shared(theQueue) num_threads (2)
- {
- if (omp_get_thread_num() == 0)
- {
- //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl;
-
- if (!omp_get_nested())
- {
- std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
- std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
- }
- }
-
- #pragma omp sections //nowait
- {
- #pragma omp section
- {
- // producer section. only 1 thread
- //#pragma omp parallel shared(theQueue) num_threads (1)
- {
- //if (omp_get_thread_num() == 0)
- //{
- // std::cout << "\t Producers: " << omp_get_num_threads() << std::endl;
- //}
- int i;
- for (i = 0 ; i < N_ITERATIONS ; i++)
- {
- while(!theQueue.push(i))
- {
- // queue full
- ;
- }
- }
- }
- }
-
- #pragma omp section
- {
- // consumer section
- #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
- {
- //if (omp_get_thread_num() == 0)
- //{
- // std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl;
- //}
-
- int i;
- int result;
- #pragma omp for schedule(static) private(i, result) nowait
- for (i = 0 ; i < N_ITERATIONS ; i++)
- {
- while (!theQueue.pop(result))
- {
- // queue empty
- ;
- }
-
-#if N_CONSUMERS == 1
- // if there are several consumers this test will fail
- if (i != result)
- {
- std::cout << "FAILED i=" << i << " result=" << result << std::endl;
- }
-#endif
- }
- }
- }
- }
-
- } // #pragma omp parallel
-
- g_get_current_time(&endTimestamp);
-
- // calculate elapsed time
- GTimeVal elapsedTime;
- if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
- {
- elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec;
- elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
- }
- else
- {
- elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1;
- elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
- }
-
- std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
- std::cout << "=== End of testing lock-free queue ===" << std::endl;
-}
-
-int main(int /*argc*/, char** /*argv*/)
-{
-
- TestLockFreeQueue();
- std::cout << "Done!!!" << std::endl;
-
- return 0;
-}
-
+++ /dev/null
-williams-queue
+++ /dev/null
-include ../benchmarks.mk
-
-TESTNAME = williams-queue
-
-all: $(TESTNAME)
-
-$(TESTNAME): $(TESTNAME).cc $(TESTNAME).h
- $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
-
-clean:
- rm -f $(TESTNAME) *.o
+++ /dev/null
-//#include <threads.h>
-#include <thread>
-
-#include "williams-queue.h"
-
-lock_free_queue<int> *queue;
-
-void threadA(void *arg)
-{
-}
-
-#define user_main main
-
-int user_main(int argc, char **argv)
-{
- /*thrd_t A, B;
-
- thrd_create(&A, &threadA, NULL);
- thrd_join(A);*/
- queue = new lock_free_queue<int>();
- std::thread t(threadA, (void *)NULL);
- t.join();
-
- return 0;
-}
+++ /dev/null
-/*
- * Lock-free queue code from
- * "C++ Concurrency in Action: Practical Multithreading", by Anthony Williams
- *
- * Code taken from:
- * http://www.manning.com/williams/CCiA_SourceCode.zip
- * http://www.manning.com/williams/
- */
-
-#include <memory>
-#include <atomic>
-
-template<typename T>
-class lock_free_queue
-{
-private:
- struct node;
- struct node_counter;
- node* pop_head()
- {
- node* const old_head=head.load();
- if(old_head==tail.load())
- {
- return nullptr;
- }
- head.store(old_head->next);
- return old_head;
- }
-
- struct counted_node_ptr
- {
- int external_count;
- node* ptr;
- };
- std::atomic<counted_node_ptr> head;
- std::atomic<counted_node_ptr> tail;
- struct node_counter
- {
- unsigned internal_count:30;
- unsigned external_counters:2;
- };
-
- struct node
- {
- std::atomic<T*> data;
- std::atomic<node_counter> count;
- std::atomic<counted_node_ptr> next;
- node()
- {
- node_counter new_count;
- new_count.internal_count=0;
- new_count.external_counters=2;
- count.store(new_count);
-
- counted_node_ptr emptynode = {0, nullptr};
- next = emptynode;
- }
- void release_ref()
- {
- node_counter old_counter=
- count.load(std::memory_order_relaxed);
- node_counter new_counter;
- do
- {
- new_counter=old_counter;
- --new_counter.internal_count;
- }
- while(!count.compare_exchange_strong(
- old_counter,new_counter,
- std::memory_order_acquire,std::memory_order_relaxed));
- if(!new_counter.internal_count &&
- !new_counter.external_counters)
- {
- delete this;
- }
- }
- };
-
- static void increase_external_count(
- std::atomic<counted_node_ptr>& counter,
- counted_node_ptr& old_counter)
- {
- counted_node_ptr new_counter;
- do
- {
- new_counter=old_counter;
- ++new_counter.external_count;
- }
- while(!counter.compare_exchange_strong(
- old_counter,new_counter,
- std::memory_order_acquire,std::memory_order_relaxed));
- old_counter.external_count=new_counter.external_count;
- }
-
- static void free_external_counter(counted_node_ptr &old_node_ptr)
- {
- node* const ptr=old_node_ptr.ptr;
- int const count_increase=old_node_ptr.external_count-2;
- node_counter old_counter=
- ptr->count.load(std::memory_order_relaxed);
- node_counter new_counter;
- do
- {
- new_counter=old_counter;
- --new_counter.external_counters;
- new_counter.internal_count+=count_increase;
- }
- while(!ptr->count.compare_exchange_strong(
- old_counter,new_counter,
- std::memory_order_acquire,std::memory_order_relaxed));
- if(!new_counter.internal_count &&
- !new_counter.external_counters)
- {
- delete ptr;
- }
- }
-public:
- std::unique_ptr<T> pop()
- {
- counted_node_ptr old_head=head.load(std::memory_order_relaxed);
- for(;;)
- {
- increase_external_count(head,old_head);
- node* const ptr=old_head.ptr;
- if(ptr==tail.load().ptr)
- {
- return std::unique_ptr<T>();
- }
- counted_node_ptr next=ptr->next.load();
- if(head.compare_exchange_strong(old_head,next))
- {
- T* const res=ptr->data.exchange(nullptr);
- free_external_counter(old_head);
- return std::unique_ptr<T>(res);
- }
- ptr->release_ref();
- }
- }
-
-private:
- void set_new_tail(counted_node_ptr &old_tail,
- counted_node_ptr const &new_tail)
- {
- node* const current_tail_ptr=old_tail.ptr;
- while(!tail.compare_exchange_weak(old_tail,new_tail) &&
- old_tail.ptr==current_tail_ptr);
- if(old_tail.ptr==current_tail_ptr)
- free_external_counter(old_tail);
- else
- current_tail_ptr->release_ref();
- }
-public:
- lock_free_queue()
- {
- counted_node_ptr newnode = {0, new node};
- head = newnode;
- tail = head.load();
- }
- // lock_free_queue(const lock_free_queue& other)=delete;
- // lock_free_queue& operator=(const lock_free_queue& other)=delete;
- ~lock_free_queue()
- {
- while(node* const old_head=head.load())
- {
- head.store(old_head->next);
- delete old_head;
- }
- }
-
- void push(T new_value)
- {
- std::unique_ptr<T> new_data(new T(new_value));
- counted_node_ptr new_next;
- new_next.ptr=new node;
- new_next.external_count=1;
- counted_node_ptr old_tail=tail.load();
- for(;;)
- {
- increase_external_count(tail,old_tail);
- T* old_data=nullptr;
- if(old_tail.ptr->data.compare_exchange_strong(
- old_data,new_data.get()))
- {
- counted_node_ptr old_next={0};
- if(!old_tail.ptr->next.compare_exchange_strong(
- old_next,new_next))
- {
- delete new_next.ptr;
- new_next=old_next;
- }
- set_new_tail(old_tail, new_next);
- new_data.release();
- break;
- }
- else
- {
- counted_node_ptr old_next={0};
- if(old_tail.ptr->next.compare_exchange_strong(
- old_next,new_next))
- {
- old_next=new_next;
- new_next.ptr=new node;
- }
- set_new_tail(old_tail, old_next);
- }
- }
- }
-};