--- /dev/null
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// 3. The name of the author may not be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue.h
+/// @brief Definition of a circular array based lock-free queue
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref Who When What
+/// Faustino Frechilla 11-Jul-2010 Original development
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
+#define __ARRAY_LOCK_FREE_QUEUE_H__
+
+#include <stdint.h> // uint32_t
+#include "atomic_ops.h" // atomic operations wrappers
+
+#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // (2^16)
+
+// define this variable if calls to "size" must return the real size of the
+// queue. If it is undefined that function will try to take a snapshot of
+// the queue, but returned value might be bogus
+#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1
+
+
+/// @brief Lock-free queue based on a circular array
+/// No allocation of extra memory for the nodes handling is needed, but it has to add
+/// extra overhead (extra CAS operation) when inserting to ensure the thread-safety of the queue
+/// ELEM_T represents the type of elementes pushed and popped from the queue
+/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure
+/// indexes in the circular queue keep stable when the uint32_t
+/// variable that holds the current position rolls over from FFFFFFFF
+/// to 0. For instance
+/// 2 -> 0x02
+/// 4 -> 0x04
+/// 8 -> 0x08
+/// 16 -> 0x10
+/// (...)
+/// 1024 -> 0x400
+/// 2048 -> 0x800
+///
+/// if queue size is not defined as requested, let's say, for
+/// instance 100, when current position is FFFFFFFF (4,294,967,295)
+/// index in the circular array is 4,294,967,295 % 100 = 95.
+/// When that value is incremented it will be set to 0, that is the
+/// last 4 elements of the queue are not used when the counter rolls
+/// over to 0
+template <typename ELEM_T, uint32_t Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
+class ArrayLockFreeQueue
+{
+public:
+ /// @brief constructor of the class
+ ArrayLockFreeQueue();
+ virtual ~ArrayLockFreeQueue();
+
+ /// @brief returns the current number of items in the queue
+ /// It tries to take a snapshot of the size of the queue, but in busy environments
+ /// this function might return bogus values.
+ ///
+ /// If a reliable queue size must be kept you might want to have a look at
+ /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE'
+ /// it enables a reliable size though it hits overall performance of the queue
+ /// (when the reliable size variable is on it's got an impact of about 20% in time)
+ uint32_t size();
+
+ /// @brief push an element at the tail of the queue
+ /// @param the element to insert in the queue
+ /// Note that the element is not a pointer or a reference, so if you are using large data
+ /// structures to be inserted in the queue you should think of instantiate the template
+ /// of the queue as a pointer to that large structure
+ /// @returns true if the element was inserted in the queue. False if the queue was full
+ bool push(const ELEM_T &a_data);
+
+ /// @brief pop the element at the head of the queue
+ /// @param a reference where the element in the head of the queue will be saved to
+ /// Note that the a_data parameter might contain rubbish if the function returns false
+ /// @returns true if the element was successfully extracted from the queue. False if the queue was empty
+ bool pop(ELEM_T &a_data);
+
+private:
+ /// @brief array to keep the elements
+ ELEM_T m_theQueue[Q_SIZE];
+
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ /// @brief number of elements in the queue
+ volatile uint32_t m_count;
+#endif
+
+ /// @brief where a new element will be inserted
+ volatile uint32_t m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ volatile uint32_t m_readIndex;
+
+ /// @brief maximum read index for multiple producer queues
+ /// If it's not the same as m_writeIndex it means
+ /// there are writes pending to be "committed" to the queue, that means,
+ /// the place for the data was reserved (the index in the array) but
+ /// data is still not in the queue, so the thread trying to read will have
+ /// to wait for those other threads to save the data into the queue
+ ///
+ /// note this index is only used for MultipleProducerThread queues
+ volatile uint32_t m_maximumReadIndex;
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+};
+
+// include the implementation file
+#include "array_lock_free_queue_impl.h"
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_H__
--- /dev/null
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// 3. The name of the author may not be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue_impl.h
+/// @brief Implementation of a circular array based lock-free queue
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref Who When What
+/// Faustino Frechilla 11-Jul-2010 Original development
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
+#define __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
+
+#include <assert.h> // assert()
+#include <sched.h> // sched_yield()
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
+ m_writeIndex(0),
+ m_readIndex(0),
+ m_maximumReadIndex(0) // only for MultipleProducerThread queues
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ m_count = 0;
+#endif
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
+{
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
+{
+ // if Q_SIZE is a power of 2 this statement could be also written as
+ // return (a_count & (Q_SIZE - 1));
+ return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ return m_count;
+#else
+ uint32_t currentWriteIndex = m_writeIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ // let's think of a scenario where this function returns bogus data
+ // 1. when the statement 'currentWriteIndex = m_writeIndex' is run
+ // m_writeIndex is 3 and m_readIndex is 2. Real size is 1
+ // 2. afterwards this thread is preemted. While this thread is inactive 2
+ // elements are inserted and removed from the queue, so m_writeIndex is 5
+ // m_readIndex 4. Real size is still 1
+ // 3. Now the current thread comes back from preemption and reads m_readIndex.
+ // currentReadIndex is 4
+ // 4. currentReadIndex is bigger than currentWriteIndex, so
+ // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+ // it returns that the queue is almost full, when it is almost empty
+
+ if (currentWriteIndex >= currentReadIndex)
+ {
+ return (currentWriteIndex - currentReadIndex);
+ }
+ else
+ {
+ return (Q_SIZE + currentWriteIndex - currentReadIndex);
+ }
+#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)
+{
+ uint32_t currentReadIndex;
+ uint32_t currentWriteIndex;
+
+ do
+ {
+ currentWriteIndex = m_writeIndex;
+ currentReadIndex = m_readIndex;
+ if (countToIndex(currentWriteIndex + 1) ==
+ countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return false;
+ }
+
+ } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+ // We know now that this index is reserved for us. Use it to save the data
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+ // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+ // inserting in the queue. It might fail if there are more than 1 producer threads because this
+ // operation has to be done in the same order as the previous CAS
+ while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
+ {
+ // this is a good place to yield the thread in case there are more
+ // software threads than hardware processors and you have more
+ // than 1 producer thread
+ // have a look at sched_yield (POSIX.1b)
+ sched_yield();
+ }
+
+ // The value was successfully inserted into the queue
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ AtomicAdd(&m_count, 1);
+#endif
+
+ return true;
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)
+{
+ uint32_t currentMaximumReadIndex;
+ uint32_t currentReadIndex;
+
+ do
+ {
+ // to ensure thread-safety when there is more than 1 producer thread
+ // a second index is defined (m_maximumReadIndex)
+ currentReadIndex = m_readIndex;
+ currentMaximumReadIndex = m_maximumReadIndex;
+
+ if (countToIndex(currentReadIndex) ==
+ countToIndex(currentMaximumReadIndex))
+ {
+ // the queue is empty or
+ // a producer thread has allocate space in the queue but is
+ // waiting to commit the data into it
+ return false;
+ }
+
+ // retrieve the data from the queue
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+ // try to perfrom now the CAS operation on the read index. If we succeed
+ // a_data already contains what m_readIndex pointed to before we
+ // increased it
+ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+ {
+ // got here. The value was retrieved from the queue. Note that the
+ // data inside the m_queue array is not deleted nor reseted
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ AtomicSub(&m_count, 1);
+#endif
+ return true;
+ }
+
+ // it failed retrieving the element off the queue. Someone else must
+ // have read the element stored at countToIndex(currentReadIndex)
+ // before we could perform the CAS operation
+
+ } while(1); // keep looping to try again!
+
+ // Something went wrong. it shouldn't be possible to reach here
+ assert(0);
+
+ // Add this return statement to avoid compiler warnings
+ return false;
+}
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
+
--- /dev/null
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// 3. The name of the author may not be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue_single_producer.h
+/// @brief Definition of a circular array based lock-free queue
+///
+/// WARNING: This queue is not thread safe when several threads try to insert
+/// elements into the queue. It is allowed to use as many consumers
+/// as needed though.
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref Who When What
+/// Faustino Frechilla 11-Jul-2010 Original development
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
+#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
+
+#include <stdint.h> // uint32_t
+#include "atomic_ops.h" // atomic operations wrappers
+
+#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65536 // 2^16 = 65,536 elements by default
+
+// define this variable if calls to "size" must return the real size of the
+// queue. If it is undefined that function will try to take a snapshot of
+// the queue, but returned value might be bogus
+#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1
+
+
+/// @brief especialisation of the ArrayLockFreeQueue to be used when there is
+/// only one producer thread
+/// No allocation of extra memory for the nodes handling is needed
+/// WARNING: This queue is not thread safe when several threads try to insert elements
+/// into the queue
+/// ELEM_T represents the type of elementes pushed and popped from the queue
+/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure
+/// indexes in the circular queue keep stable when the uint32_t
+/// variable that holds the current position rolls over from FFFFFFFF
+/// to 0. For instance
+/// 2 -> 0x02
+/// 4 -> 0x04
+/// 8 -> 0x08
+/// 16 -> 0x10
+/// (...)
+/// 1024 -> 0x400
+/// 2048 -> 0x800
+///
+/// if queue size is not defined as requested, let's say, for
+/// instance 100, when current position is FFFFFFFF (4,294,967,295)
+/// index in the circular array is 4,294,967,295 % 100 = 95.
+/// When that value is incremented it will be set to 0, that is the
+/// last 4 elements of the queue are not used when the counter rolls
+/// over to 0
+template <typename ELEM_T, uint32_t Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
+class ArrayLockFreeQueueSingleProducer
+{
+public:
+ /// @brief constructor of the class
+ ArrayLockFreeQueueSingleProducer();
+ virtual ~ArrayLockFreeQueueSingleProducer();
+
+ /// @brief returns the current number of items in the queue
+ /// It tries to take a snapshot of the size of the queue, but in busy environments
+ /// this function might return bogus values.
+ ///
+ /// If a reliable queue size must be kept you might want to have a look at
+ /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE'
+ /// it enables a reliable size though it hits overall performance of the queue
+ /// (when the reliable size variable is on it's got an impact of about 20% in time)
+ uint32_t size();
+
+ /// @brief push an element at the tail of the queue
+ /// @param the element to insert in the queue
+ /// Note that the element is not a pointer or a reference, so if you are using large data
+ /// structures to be inserted in the queue you should think of instantiate the template
+ /// of the queue as a pointer to that large structure
+ /// @returns true if the element was inserted in the queue. False if the queue was full
+ bool push(const ELEM_T &a_data);
+
+ /// @brief pop the element at the head of the queue
+ /// @param a reference where the element in the head of the queue will be saved to
+ /// Note that the a_data parameter might contain rubbish if the function returns false
+ /// @returns true if the element was successfully extracted from the queue. False if the queue was empty
+ bool pop(ELEM_T &a_data);
+
+private:
+ /// @brief array to keep the elements
+ ELEM_T m_theQueue[Q_SIZE];
+
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ /// @brief number of elements in the queue
+ volatile uint32_t m_count;
+#endif
+
+ /// @brief where a new element will be inserted
+ volatile uint32_t m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ volatile uint32_t m_readIndex;
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+};
+
+// include the implementation file
+#include "array_lock_free_queue_single_producer_impl.h"
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
--- /dev/null
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// 3. The name of the author may not be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue_single_producer_impl.h
+/// @brief Implementation of a circular array based lock-free queue
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref Who When What
+/// Faustino Frechilla 11-Jul-2010 Original development
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
+#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
+
+#include <assert.h> // assert()
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::ArrayLockFreeQueueSingleProducer() :
+ m_writeIndex(0),
+ m_readIndex(0)
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ m_count = 0;
+#endif
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::~ArrayLockFreeQueueSingleProducer()
+{
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+inline
+uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
+{
+ // if Q_SIZE is a power of 2 this statement could be also written as
+ // return (a_count & (Q_SIZE - 1));
+ return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::size()
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ return m_count;
+#else
+ uint32_t currentWriteIndex = m_writeIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ // let's think of a scenario where this function returns bogus data
+ // 1. when the statement 'currentWriteIndex = m_writeIndex' is run
+ // m_writeIndex is 3 and m_readIndex is 2. Real size is 1
+ // 2. afterwards this thread is preemted. While this thread is inactive 2
+ // elements are inserted and removed from the queue, so m_writeIndex is 5
+ // m_readIndex 4. Real size is still 1
+ // 3. Now the current thread comes back from preemption and reads m_readIndex.
+ // currentReadIndex is 4
+ // 4. currentReadIndex is bigger than currentWriteIndex, so
+ // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+ // it returns that the queue is almost full, when it is almost empty
+
+ if (currentWriteIndex >= currentReadIndex)
+ {
+ return (currentWriteIndex - currentReadIndex);
+ }
+ else
+ {
+ return (Q_SIZE + currentWriteIndex - currentReadIndex);
+ }
+#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)
+{
+ uint32_t currentReadIndex;
+ uint32_t currentWriteIndex;
+
+ currentWriteIndex = m_writeIndex;
+ currentReadIndex = m_readIndex;
+ if (countToIndex(currentWriteIndex + 1) ==
+ countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return false;
+ }
+
+ // save the date into the q
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+ // increment atomically write index. Now a consumer thread can read
+ // the piece of data that was just stored
+ AtomicAdd(&m_writeIndex, 1);
+
+ // The value was successfully inserted into the queue
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ AtomicAdd(&m_count, 1);
+#endif
+
+ return true;
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)
+{
+ uint32_t currentMaximumReadIndex;
+ uint32_t currentReadIndex;
+
+ do
+ {
+ // m_maximumReadIndex doesn't exist when the queue is set up as
+ // single-producer. The maximum read index is described by the current
+ // write index
+ currentReadIndex = m_readIndex;
+ currentMaximumReadIndex = m_writeIndex;
+
+ if (countToIndex(currentReadIndex) ==
+ countToIndex(currentMaximumReadIndex))
+ {
+ // the queue is empty or
+ // a producer thread has allocate space in the queue but is
+ // waiting to commit the data into it
+ return false;
+ }
+
+ // retrieve the data from the queue
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+ // try to perfrom now the CAS operation on the read index. If we succeed
+ // a_data already contains what m_readIndex pointed to before we
+ // increased it
+ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+ {
+ // got here. The value was retrieved from the queue. Note that the
+ // data inside the m_queue array is not deleted nor reseted
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+ AtomicSub(&m_count, 1);
+#endif
+ return true;
+ }
+
+ // it failed retrieving the element off the queue. Someone else must
+ // have read the element stored at countToIndex(currentReadIndex)
+ // before we could perform the CAS operation
+
+ } while(1); // keep looping to try again!
+
+ // Something went wrong. it shouldn't be possible to reach here
+ assert(0);
+
+ // Add this return statement to avoid compiler warnings
+ return false;
+}
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
+
--- /dev/null
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// 3. The name of the author may not be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file atomic_ops.h
+/// @brief This file contains functions to wrap the built-in atomic operations
+/// defined for your compiler
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref Who When What
+/// Faustino Frechilla 11-Jul-2010 Original development. GCC support
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef __ATOMIC_OPS_H
+#define __ATOMIC_OPS_H
+
+#ifdef __GNUC__
+// Atomic functions in GCC are present from version 4.1.0 on
+// http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html
+
+// Test for GCC >= 4.1.0
+#if (__GNUC__ < 4) || \
+ ((__GNUC__ == 4) && ((__GNUC_MINOR__ < 1) || \
+ ((__GNUC_MINOR__ == 1) && \
+ (__GNUC_PATCHLEVEL__ < 0))) )
+
+#error Atomic built-in functions are only available in GCC in versions >= 4.1.0
+#endif // end of check for GCC 4.1.0
+
+/// @brief atomically adds a_count to the variable pointed by a_ptr
+/// @return the value that had previously been in memory
+#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
+
+/// @brief atomically substracts a_count from the variable pointed by a_ptr
+/// @return the value that had previously been in memory
+#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
+
+/// @brief Compare And Swap
+/// If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
+/// @return true if the comparison is successful and a_newVal was written
+#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
+
+/// @brief Compare And Swap
+/// If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
+/// @return the contents of *a_ptr before the operation
+#define CASVal(a_ptr, a_oldVal, a_newVal) __sync_val_compare_and_swap(a_ptr, a_oldVal, a_newVal)
+
+#else
+#error Atomic functions such as CAS or AtomicAdd are not defined for your compiler. Please add them in atomic_ops.h
+#endif // __GNUC__
+
+
+#endif // __ATOMIC_OPS_H
--- /dev/null
+// ============================================================================
+// Copyright (c) 2009-2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// 3. The name of the author may not be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file q_blocking_queue.h
+/// @brief Definition of a thread-safe queue based on glib system calls
+/// It internally contains a std::queue which is protected from concurrent
+/// access by glib mutextes and conditional variables
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref Who When What
+/// Faustino Frechilla 04-May-2009 Original development (based on pthreads)
+/// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef _GBLOCKINGQUEUE_H_
+#define _GBLOCKINGQUEUE_H_
+
+#include <glib.h>
+#include <queue>
+#include <limits> // std::numeric_limits<>::max
+
+#define BLOCKING_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits<std::size_t >::max()
+
+/// @brief blocking thread-safe queue
+/// It uses a mutex+condition variables to protect the internal queue
+/// implementation. Inserting or reading elements use the same mutex
+template <typename T>
+class BlockingQueue
+{
+public:
+ BlockingQueue(std::size_t a_maxSize = BLOCKING_QUEUE_DEFAULT_MAX_SIZE);
+ ~BlockingQueue();
+
+ /// @brief Check if the queue is empty
+ /// This call can block if another thread owns the lock that protects the
+ /// queue
+ /// @return true if the queue is empty. False otherwise
+ bool IsEmpty();
+
+ /// @brief inserts an element into queue queue
+ /// This call can block if another thread owns the lock that protects the
+ /// queue. If the queue is full The thread will be blocked in this queue
+ /// until someone else gets an element from the queue
+ /// @param element to insert into the queue
+ /// @return True if the elem was successfully inserted into the queue.
+ /// False otherwise
+ bool Push(const T &a_elem);
+
+ /// @brief inserts an element into queue queue
+ /// This call can block if another thread owns the lock that protects the
+ /// queue. If the queue is full The call will return false and the element
+ /// won't be inserted
+ /// @param element to insert into the queue
+ /// @return True if the elem was successfully inserted into the queue.
+ /// False otherwise
+ bool TryPush(const T &a_elem);
+
+ /// @brief extracts an element from the queue (and deletes it from the q)
+ /// If the queue is empty this call will block the thread until there is
+ /// something in the queue to be extracted
+ /// @param a reference where the element from the queue will be saved to
+ void Pop(T &out_data);
+
+ /// @brief extracts an element from the queue (and deletes it from the q)
+ /// This call gets the block that protects the queue. It will extract the
+ /// element from the queue only if there are elements in it
+ /// @param reference to the variable where the result will be saved
+ /// @return True if the element was retrieved from the queue.
+ /// False if the queue was empty
+ bool TryPop(T &out_data);
+
+ /// @brief extracts an element from the queue (and deletes it from the q)
+ /// If the queue is empty this call will block the thread until there
+ /// is something in the queue to be extracted or until the timer
+ /// (2nd parameter) expires
+ /// @param reference to the variable where the result will be saved
+ /// @param microsecondsto wait before returning if the queue was empty
+ /// @return True if the element was retrieved from the queue.
+ /// False if the timeout was reached
+ bool TimedWaitPop(T &data, glong microsecs);
+
+protected:
+ std::queue<T> m_theQueue;
+ /// maximum number of elements for the queue
+ std::size_t m_maximumSize;
+ /// Mutex to protect the queue
+ GMutex* m_mutex;
+ /// Conditional variable to wake up threads
+ GCond* m_cond;
+};
+
+// include the implementation file
+#include "g_blocking_queue_impl.h"
+
+#endif /* _GBLOCKINGQUEUE_H_ */
--- /dev/null
+// ============================================================================
+// Copyright (c) 2009-2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// 3. The name of the author may not be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file q_blocking_queue_impl.h
+/// @brief Implementation of a thread-safe queue based on glib system calls
+/// It internally contains a std::queue which is protected from concurrent
+/// access by glib mutextes and conditional variables
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref Who When What
+/// Faustino Frechilla 04-May-2009 Original development (based on pthreads)
+/// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef _GBLOCKINGQUEUEIMPL_H_
+#define _GBLOCKINGQUEUEIMPL_H_
+
+#include <assert.h>
+
+#define NANOSECONDS_PER_SECOND 1000000000
+
+template <typename T>
+BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :
+ m_maximumSize(a_maxSize)
+{
+ if (!g_thread_supported ())
+ {
+ // glib thread system hasn't been initialized yet
+ g_thread_init(NULL);
+ }
+
+ m_mutex = g_mutex_new();
+ m_cond = g_cond_new();
+
+ assert(m_mutex != NULL);
+ assert(m_cond != NULL);
+}
+
+template <typename T>
+BlockingQueue<T>::~BlockingQueue()
+{
+ g_cond_free(m_cond);
+ g_mutex_free(m_mutex);
+}
+
+template <typename T>
+bool BlockingQueue<T>::IsEmpty()
+{
+ bool rv;
+
+ g_mutex_lock(m_mutex);
+ rv = m_theQueue.empty();
+ g_mutex_unlock(m_mutex);
+
+ return rv;
+}
+
+template <typename T>
+bool BlockingQueue<T>::Push(const T &a_elem)
+{
+ g_mutex_lock(m_mutex);
+
+ while (m_theQueue.size() >= m_maximumSize)
+ {
+ g_cond_wait(m_cond, m_mutex);
+ }
+
+ bool queueEmpty = m_theQueue.empty();
+
+ m_theQueue.push(a_elem);
+
+ if (queueEmpty)
+ {
+ // wake up threads waiting for stuff
+ g_cond_broadcast(m_cond);
+ }
+
+ g_mutex_unlock(m_mutex);
+
+ return true;
+}
+
+template <typename T>
+bool BlockingQueue<T>::TryPush(const T &a_elem)
+{
+ g_mutex_lock(m_mutex);
+
+ bool rv = false;
+ bool queueEmpty = m_theQueue.empty();
+
+ if (m_theQueue.size() < m_maximumSize)
+ {
+ m_theQueue.push(a_elem);
+ rv = true;
+ }
+
+ if (queueEmpty)
+ {
+ // wake up threads waiting for stuff
+ g_cond_broadcast(m_cond);
+ }
+
+ g_mutex_unlock(m_mutex);
+
+ return rv;
+}
+
+template <typename T>
+void BlockingQueue<T>::Pop(T &out_data)
+{
+ g_mutex_lock(m_mutex);
+
+ while (m_theQueue.empty())
+ {
+ g_cond_wait(m_cond, m_mutex);
+ }
+
+ bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+
+ out_data = m_theQueue.front();
+ m_theQueue.pop();
+
+ if (queueFull)
+ {
+ // wake up threads waiting for stuff
+ g_cond_broadcast(m_cond);
+ }
+
+ g_mutex_unlock(m_mutex);
+}
+
+template <typename T>
+bool BlockingQueue<T>::TryPop(T &out_data)
+{
+ g_mutex_lock(m_mutex);
+
+ bool rv = false;
+ if (!m_theQueue.empty())
+ {
+ bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+
+ out_data = m_theQueue.front();
+ m_theQueue.pop();
+
+ if (queueFull)
+ {
+ // wake up threads waiting for stuff
+ g_cond_broadcast(m_cond);
+ }
+
+ rv = true;
+ }
+
+ g_mutex_unlock(m_mutex);
+
+ return rv;
+}
+
+template <typename T>
+bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)
+{
+ g_mutex_lock(m_mutex);
+
+ // adding microsecs to now
+ GTimeVal abs_time;
+ g_get_current_time(&abs_time);
+ g_time_val_add(&abs_time, microsecs);
+
+ gboolean retcode = TRUE;
+ while (m_theQueue.empty() && (retcode != FALSE))
+ {
+ // Returns TRUE if cond was signalled, or FALSE on timeout
+ retcode = g_cond_timed_wait(m_cond, m_mutex, &abs_time);
+ }
+
+ bool rv = false;
+ bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+ if (retcode != FALSE)
+ {
+ data = m_theQueue.front();
+ m_theQueue.pop();
+
+ rv = true;
+ }
+
+ if (rv && queueFull)
+ {
+ // wake up threads waiting for stuff
+ g_cond_broadcast(m_cond);
+ }
+
+ g_mutex_unlock(m_mutex);
+
+ return rv;
+}
+
+#endif /* _GBLOCKINGQUEUEIMPL_H_ */
--- /dev/null
+# -fno-schedule-insns -fno-rerun-loop-opt are a workaround for a compiler error in 4.2
+# -Wno-unused-parameter
+
+CC = g++
+CFLAGS = -g -O3 -fopenmp -fno-schedule-insns -fno-schedule-insns2 -W -Wall #-Wno-unused-parameter
+CFLAGS += `pkg-config --cflags glib-2.0`
+#CFLAGS += -march=i686
+#CFLAGS += -march=core2
+LDFLAGS = -lgomp
+LDFLAGS+= `pkg-config --libs glib-2.0`
+# g_blocking_queue also depends on gthread-2.0
+CFLAGS_GTHREAD = `pkg-config gthread-2.0`
+LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0`
+
+#compile-time parameters
+ifdef N_PRODUCERS
+CFLAGS += -DN_PRODUCERS=$(N_PRODUCERS)
+endif
+ifdef N_CONSUMERS
+CFLAGS += -DN_CONSUMERS=$(N_CONSUMERS)
+endif
+ifdef N_ITERATIONS
+CFLAGS += -DN_ITERATIONS=$(N_ITERATIONS)
+endif
+ifdef QUEUE_SIZE
+CFLAGS += -DQUEUE_SIZE=$(QUEUE_SIZE)
+endif
+
+
+LOCK_FREE_Q_INCLUDE = \
+ array_lock_free_queue.h \
+ array_lock_free_queue_impl.h
+
+BLOCKING_Q_INCLUDE = \
+ g_blocking_queue.h \
+ g_blocking_queue_impl.h
+
+LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \
+ array_lock_free_queue_single_producer.h \
+ array_lock_free_queue_single_producer_impl.h
+
+SHARED_INCLUDE = \
+ atomic_ops.h
+
+all : test_lock_free_q test_lock_free_single_producer_q test_blocking_q
+
+test_lock_free_q : test_lock_free_q.o
+ $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS)
+
+test_blocking_q : test_blocking_q.o
+ $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS) $(LDFLAGS_GTHREAD)
+
+test_lock_free_single_producer_q : test_lock_free_single_producer_q.o
+ $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS)
+
+test_lock_free_q.o : test_lock_free_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_Q_INCLUDE)
+ $(CC) -c $< $(CFLAGS)
+
+test_lock_free_single_producer_q.o : test_lock_free_single_producer_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE)
+ $(CC) -c $< $(CFLAGS)
+
+test_blocking_q.o: test_blocking_q.cpp $(SHARED_INCLUDE) $(BLOCKING_Q_INCLUDE)
+ $(CC) -c $< $(CFLAGS) $(CFLAGS_GTHREAD)
+
+clean:
+ rm test_lock_free_q test_blocking_q test_lock_free_single_producer_q; rm *.o
+
--- /dev/null
+!_TAG_FILE_FORMAT 2 /extended format; --format=1 will not append ;" to lines/
+!_TAG_FILE_SORTED 1 /0=unsorted, 1=sorted, 2=foldcase/
+!_TAG_PROGRAM_AUTHOR Darren Hiebert /dhiebert@users.sourceforge.net/
+!_TAG_PROGRAM_NAME Exuberant Ctags //
+!_TAG_PROGRAM_URL http://ctags.sourceforge.net /official site/
+!_TAG_PROGRAM_VERSION 5.9~svn20110310 //
+ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue.h 45;" d
+ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue_single_producer.h 49;" d
+ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE array_lock_free_queue.h 50;" d
+ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE array_lock_free_queue_single_producer.h 54;" d
+ArrayLockFreeQueue array_lock_free_queue.h /^class ArrayLockFreeQueue$/;" c
+ArrayLockFreeQueue array_lock_free_queue_impl.h /^ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :$/;" f class:ArrayLockFreeQueue
+ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer.h /^class ArrayLockFreeQueueSingleProducer$/;" c
+ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer_impl.h /^ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::ArrayLockFreeQueueSingleProducer() :$/;" f class:ArrayLockFreeQueueSingleProducer
+AtomicAdd atomic_ops.h 51;" d
+AtomicAdd atomic_ops.h 87;" d
+AtomicSub atomic_ops.h 55;" d
+AtomicSub atomic_ops.h 91;" d
+BLOCKING_QUEUE_DEFAULT_MAX_SIZE g_blocking_queue.h 49;" d
+BLOCKING_Q_INCLUDE makefile /^BLOCKING_Q_INCLUDE = \\$/;" m
+BlockingQueue g_blocking_queue.h /^class BlockingQueue$/;" c
+BlockingQueue g_blocking_queue_impl.h /^BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :$/;" f class:BlockingQueue
+CAS atomic_ops.h 60;" d
+CAS atomic_ops.h 96;" d
+CASVal atomic_ops.h 101;" d
+CASVal atomic_ops.h 65;" d
+CC makefile /^CC = g++$/;" m
+CFLAGS_GTHREAD makefile /^CFLAGS_GTHREAD = `pkg-config gthread-2.0`$/;" m
+IsEmpty g_blocking_queue_impl.h /^bool BlockingQueue<T>::IsEmpty()$/;" f class:BlockingQueue
+LDFLAGS_GTHREAD makefile /^LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0`$/;" m
+LOCK_FREE_Q_INCLUDE makefile /^LOCK_FREE_Q_INCLUDE = \\$/;" m
+LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE makefile /^LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \\$/;" m
+NANOSECONDS_PER_SECOND g_blocking_queue_impl.h 47;" d
+N_CONSUMERS test_blocking_q.cpp 17;" d file:
+N_CONSUMERS test_lock_free_q.cpp 17;" d file:
+N_CONSUMERS test_lock_free_single_producer_q.cpp 13;" d file:
+N_ITERATIONS test_blocking_q.cpp 21;" d file:
+N_ITERATIONS test_lock_free_q.cpp 21;" d file:
+N_ITERATIONS test_lock_free_single_producer_q.cpp 17;" d file:
+N_PRODUCERS test_blocking_q.cpp 13;" d file:
+N_PRODUCERS test_lock_free_q.cpp 13;" d file:
+Pop g_blocking_queue_impl.h /^void BlockingQueue<T>::Pop(T &out_data)$/;" f class:BlockingQueue
+Push g_blocking_queue_impl.h /^bool BlockingQueue<T>::Push(const T &a_elem)$/;" f class:BlockingQueue
+QUEUE_SIZE test_blocking_q.cpp 25;" d file:
+QUEUE_SIZE test_lock_free_q.cpp 25;" d file:
+QUEUE_SIZE test_lock_free_single_producer_q.cpp 21;" d file:
+SHARED_INCLUDE makefile /^SHARED_INCLUDE = \\$/;" m
+TestBlockingQueue test_blocking_q.cpp /^void TestBlockingQueue()$/;" f
+TestLockFreeQueue test_lock_free_q.cpp /^void TestLockFreeQueue()$/;" f
+TestLockFreeQueue test_lock_free_single_producer_q.cpp /^void TestLockFreeQueue()$/;" f
+TimedWaitPop g_blocking_queue_impl.h /^bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)$/;" f class:BlockingQueue
+TryPop g_blocking_queue_impl.h /^bool BlockingQueue<T>::TryPop(T &out_data)$/;" f class:BlockingQueue
+TryPush g_blocking_queue_impl.h /^bool BlockingQueue<T>::TryPush(const T &a_elem)$/;" f class:BlockingQueue
+_GBLOCKINGQUEUEIMPL_H_ g_blocking_queue_impl.h 43;" d
+_GBLOCKINGQUEUE_H_ g_blocking_queue.h 43;" d
+__ARRAY_LOCK_FREE_QUEUE_H__ array_lock_free_queue.h 40;" d
+__ARRAY_LOCK_FREE_QUEUE_IMPL_H__ array_lock_free_queue_impl.h 40;" d
+__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__ array_lock_free_queue_single_producer.h 44;" d
+__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__ array_lock_free_queue_single_producer_impl.h 40;" d
+__ATOMIC_OPS_H atomic_ops.h 41;" d
+atomic32 atomic_ops.h /^typedef atomic_uint atomic32;$/;" t
+atomic32 atomic_ops.h /^typedef volatile uint32 atomic32;$/;" t
+countToIndex array_lock_free_queue_impl.h /^uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)$/;" f class:ArrayLockFreeQueue
+countToIndex array_lock_free_queue_single_producer_impl.h /^uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)$/;" f class:ArrayLockFreeQueueSingleProducer
+m_cond g_blocking_queue.h /^ GCond* m_cond;$/;" m class:BlockingQueue
+m_count array_lock_free_queue.h /^ atomic32 m_count;$/;" m class:ArrayLockFreeQueue
+m_count array_lock_free_queue_single_producer.h /^ atomic32 m_count;$/;" m class:ArrayLockFreeQueueSingleProducer
+m_maximumReadIndex array_lock_free_queue.h /^ atomic32 m_maximumReadIndex;$/;" m class:ArrayLockFreeQueue
+m_maximumSize g_blocking_queue.h /^ std::size_t m_maximumSize;$/;" m class:BlockingQueue
+m_mutex g_blocking_queue.h /^ GMutex* m_mutex;$/;" m class:BlockingQueue
+m_readIndex array_lock_free_queue.h /^ atomic32 m_readIndex;$/;" m class:ArrayLockFreeQueue
+m_readIndex array_lock_free_queue_single_producer.h /^ atomic32 m_readIndex;$/;" m class:ArrayLockFreeQueueSingleProducer
+m_theQueue array_lock_free_queue.h /^ ELEM_T m_theQueue[Q_SIZE];$/;" m class:ArrayLockFreeQueue
+m_theQueue array_lock_free_queue_single_producer.h /^ ELEM_T m_theQueue[Q_SIZE];$/;" m class:ArrayLockFreeQueueSingleProducer
+m_theQueue g_blocking_queue.h /^ std::queue<T> m_theQueue;$/;" m class:BlockingQueue
+m_writeIndex array_lock_free_queue.h /^ atomic32 m_writeIndex;$/;" m class:ArrayLockFreeQueue
+m_writeIndex array_lock_free_queue_single_producer.h /^ atomic32 m_writeIndex;$/;" m class:ArrayLockFreeQueueSingleProducer
+main test_blocking_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f
+main test_lock_free_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f
+main test_lock_free_single_producer_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f
+pop array_lock_free_queue_impl.h /^bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)$/;" f class:ArrayLockFreeQueue
+pop array_lock_free_queue_single_producer_impl.h /^bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)$/;" f class:ArrayLockFreeQueueSingleProducer
+push array_lock_free_queue_impl.h /^bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)$/;" f class:ArrayLockFreeQueue
+push array_lock_free_queue_single_producer_impl.h /^bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)$/;" f class:ArrayLockFreeQueueSingleProducer
+size array_lock_free_queue_impl.h /^uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()$/;" f class:ArrayLockFreeQueue
+size array_lock_free_queue_single_producer_impl.h /^uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::size()$/;" f class:ArrayLockFreeQueueSingleProducer
+~ArrayLockFreeQueue array_lock_free_queue_impl.h /^ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()$/;" f class:ArrayLockFreeQueue
+~ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer_impl.h /^ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::~ArrayLockFreeQueueSingleProducer()$/;" f class:ArrayLockFreeQueueSingleProducer
+~BlockingQueue g_blocking_queue_impl.h /^BlockingQueue<T>::~BlockingQueue()$/;" f class:BlockingQueue
--- /dev/null
+// ============================================================================
+/// @file test_blocking_q.cpp
+/// @brief Benchmark blocking queue
+// ============================================================================
+
+
+#include <iostream>
+#include <glib.h> // GTimeVal + g_get_current_time
+#include <omp.h> // parallel processing support in gcc
+#include "g_blocking_queue.h"
+
+#ifndef N_PRODUCERS
+#define N_PRODUCERS 1
+#endif
+
+#ifndef N_CONSUMERS
+#define N_CONSUMERS 1
+#endif
+
+#ifndef N_ITERATIONS
+#define N_ITERATIONS 10000000
+#endif
+
+#ifndef QUEUE_SIZE
+#define QUEUE_SIZE 1000
+#endif
+
+void TestBlockingQueue()
+{
+ BlockingQueue<int> theQueue(QUEUE_SIZE);
+ GTimeVal iniTimestamp;
+ GTimeVal endTimestamp;
+
+ std::cout << "=== Start of testing blocking queue ===" << std::endl;
+ g_get_current_time(&iniTimestamp);
+ #pragma omp parallel shared(theQueue) num_threads (2)
+ {
+ if (omp_get_thread_num() == 0)
+ {
+ if (!omp_get_nested())
+ {
+ std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
+ std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
+ }
+ }
+
+ #pragma omp sections //nowait
+ {
+ #pragma omp section
+ {
+ // producer section
+ #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS)
+ {
+ int i;
+ #pragma omp for schedule(static) private(i) nowait
+ for (i = 0 ; i < N_ITERATIONS ; i++)
+ {
+ while(!theQueue.Push(i))
+ {
+ // queue full
+ }
+ }
+ }
+ }
+
+ #pragma omp section
+ {
+ // consumer section
+ #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
+ {
+ int i;
+ int result;
+ #pragma omp for schedule(static) private(i, result) nowait
+ for (i = 0 ; i < N_ITERATIONS ; i++)
+ {
+ // this call will block if the queue is empty until
+ // some other thread pushes something into it
+ theQueue.Pop(result);
+ }
+ }
+ }
+ }
+
+ } // #pragma omp parallel
+
+ g_get_current_time(&endTimestamp);
+
+ // calculate elapsed time
+ GTimeVal elapsedTime;
+ if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
+ {
+ elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec;
+ elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
+ }
+ else
+ {
+ elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1;
+ elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
+ }
+
+ std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
+ std::cout << "=== End of testing blocking queue ===" << std::endl;
+}
+
+int main(int /*argc*/, char** /*argv*/)
+{
+ TestBlockingQueue();
+ std::cout << "Done!!!" << std::endl;
+
+ return 0;
+}
+
--- /dev/null
+// ============================================================================
+/// @file test_lock_free_q.cpp
+/// @brief Benchmark lock free queue
+// ============================================================================
+
+
+#include <iostream>
+#include <glib.h> // GTimeVal + g_get_current_time
+#include <omp.h> // parallel processing support in gcc
+#include "array_lock_free_queue.h"
+
+#ifndef N_PRODUCERS
+#define N_PRODUCERS 1
+#endif
+
+#ifndef N_CONSUMERS
+#define N_CONSUMERS 1
+#endif
+
+#ifndef N_ITERATIONS
+#define N_ITERATIONS 10000000
+#endif
+
+#ifndef QUEUE_SIZE
+#define QUEUE_SIZE 1024
+#endif
+
+void TestLockFreeQueue()
+{
+ ArrayLockFreeQueue<int, QUEUE_SIZE> theQueue;
+ GTimeVal iniTimestamp;
+ GTimeVal endTimestamp;
+
+ std::cout << "=== Start of testing lock-free queue ===" << std::endl;
+ g_get_current_time(&iniTimestamp);
+ #pragma omp parallel shared(theQueue) num_threads (2)
+ {
+ if (omp_get_thread_num() == 0)
+ {
+ //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl;
+
+ if (!omp_get_nested())
+ {
+ std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
+ std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
+ }
+ }
+
+ #pragma omp sections //nowait
+ {
+ #pragma omp section
+ {
+ // producer section
+ #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS)
+ {
+ //if (omp_get_thread_num() == 0)
+ //{
+ // std::cout << "\t Producers: " << omp_get_num_threads() << std::endl;
+ //}
+ int i;
+ #pragma omp for schedule(static) private(i) nowait
+ for (i = 0 ; i < N_ITERATIONS ; i++)
+ {
+ while(!theQueue.push(i))
+ {
+ // queue full
+ }
+ }
+ }
+ }
+
+ #pragma omp section
+ {
+ // consumer section
+ #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
+ {
+ //if (omp_get_thread_num() == 0)
+ //{
+ // std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl;
+ //}
+
+ int i;
+ int result;
+ #pragma omp for schedule(static) private(i, result) nowait
+ for (i = 0 ; i < N_ITERATIONS ; i++)
+ {
+ while (!theQueue.pop(result))
+ {
+ // queue empty
+ }
+
+#if (N_CONSUMERS == 1 && N_PRODUCERS == 1)
+ if (i != result)
+ {
+ std::cout << "FAILED i=" << i << " result=" << result << std::endl;
+ }
+#endif
+ }
+ }
+ }
+ }
+
+ } // #pragma omp parallel
+
+ g_get_current_time(&endTimestamp);
+
+ // calculate elapsed time
+ GTimeVal elapsedTime;
+ if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
+ {
+ elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec;
+ elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
+ }
+ else
+ {
+ elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1;
+ elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
+ }
+
+ std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
+ std::cout << "=== End of testing lock-free queue ===" << std::endl;
+}
+
+int main(int /*argc*/, char** /*argv*/)
+{
+
+ TestLockFreeQueue();
+ std::cout << "Done!!!" << std::endl;
+
+ return 0;
+}
+
--- /dev/null
+// ============================================================================
+/// @file test_lock_free_q.cpp
+/// @brief Benchmark lock free queue
+// ============================================================================
+
+
+#include <iostream>
+#include <glib.h> // GTimeVal + g_get_current_time
+#include <omp.h> // parallel processing support in gcc
+#include "array_lock_free_queue_single_producer.h"
+
+#ifndef N_CONSUMERS
+#define N_CONSUMERS 1
+#endif
+
+#ifndef N_ITERATIONS
+#define N_ITERATIONS 10000000
+#endif
+
+#ifndef QUEUE_SIZE
+#define QUEUE_SIZE 1000
+#endif
+
+void TestLockFreeQueue()
+{
+ ArrayLockFreeQueueSingleProducer<int, QUEUE_SIZE> theQueue;
+ GTimeVal iniTimestamp;
+ GTimeVal endTimestamp;
+
+ std::cout << "=== Start of testing lock-free queue ===" << std::endl;
+ g_get_current_time(&iniTimestamp);
+ #pragma omp parallel shared(theQueue) num_threads (2)
+ {
+ if (omp_get_thread_num() == 0)
+ {
+ //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl;
+
+ if (!omp_get_nested())
+ {
+ std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
+ std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
+ }
+ }
+
+ #pragma omp sections //nowait
+ {
+ #pragma omp section
+ {
+ // producer section. only 1 thread
+ //#pragma omp parallel shared(theQueue) num_threads (1)
+ {
+ //if (omp_get_thread_num() == 0)
+ //{
+ // std::cout << "\t Producers: " << omp_get_num_threads() << std::endl;
+ //}
+ int i;
+ for (i = 0 ; i < N_ITERATIONS ; i++)
+ {
+ while(!theQueue.push(i))
+ {
+ // queue full
+ ;
+ }
+ }
+ }
+ }
+
+ #pragma omp section
+ {
+ // consumer section
+ #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
+ {
+ //if (omp_get_thread_num() == 0)
+ //{
+ // std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl;
+ //}
+
+ int i;
+ int result;
+ #pragma omp for schedule(static) private(i, result) nowait
+ for (i = 0 ; i < N_ITERATIONS ; i++)
+ {
+ while (!theQueue.pop(result))
+ {
+ // queue empty
+ ;
+ }
+
+#if N_CONSUMERS == 1
+ // if there are several consumers this test will fail
+ if (i != result)
+ {
+ std::cout << "FAILED i=" << i << " result=" << result << std::endl;
+ }
+#endif
+ }
+ }
+ }
+ }
+
+ } // #pragma omp parallel
+
+ g_get_current_time(&endTimestamp);
+
+ // calculate elapsed time
+ GTimeVal elapsedTime;
+ if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
+ {
+ elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec;
+ elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
+ }
+ else
+ {
+ elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1;
+ elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
+ }
+
+ std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
+ std::cout << "=== End of testing lock-free queue ===" << std::endl;
+}
+
+int main(int /*argc*/, char** /*argv*/)
+{
+
+ TestLockFreeQueue();
+ std::cout << "Done!!!" << std::endl;
+
+ return 0;
+}
+