1 // ============================================================================
2 // Copyright (c) 2009-2010 Faustino Frechilla
3 // All rights reserved.
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are met:
8 // 1. Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 // 2. Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // 3. The name of the author may not be used to endorse or promote products
14 // derived from this software without specific prior written permission.
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
28 /// @file q_blocking_queue_impl.h
29 /// @brief Implementation of a thread-safe queue based on glib system calls
30 /// It internally contains a std::queue which is protected from concurrent
31 /// access by glib mutexes and condition variables
33 /// @author Faustino Frechilla
36 /// Faustino Frechilla 04-May-2009 Original development (based on pthreads)
37 /// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
40 // ============================================================================
42 #ifndef _GBLOCKINGQUEUEIMPL_H_
43 #define _GBLOCKINGQUEUEIMPL_H_
// Number of nanoseconds in one second (10^9). No line visible in this listing
// references this macro.
47 #define NANOSECONDS_PER_SECOND 1000000000
/// @brief Sets the queue's size limit and creates its synchronization objects.
///
/// Stores a_maxSize in m_maximumSize, then creates the glib mutex (m_mutex)
/// and condition variable (m_cond) that the other member functions lock and
/// wait on.
///
/// @param a_maxSize value copied into m_maximumSize. Push() waits and
///        TryPush() skips the insertion while the queue holds at least this
///        many elements.
///
/// NOTE(review): this listing omits some original lines (the embedded
/// numbering jumps, e.g. 55 -> 59), so the statement executed when
/// g_thread_supported() is false is not visible here -- presumably a
/// g_thread_init() call; confirm against the full file.
50 BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :
51 m_maximumSize(a_maxSize)
53 if (!g_thread_supported ())
55 // glib thread system hasn't been initialized yet
// Create the mutex and condition variable used by every queue operation.
59 m_mutex = g_mutex_new();
60 m_cond = g_cond_new();
// Sanity checks only: assert() expands to nothing when NDEBUG is defined.
62 assert(m_mutex != NULL);
63 assert(m_cond != NULL);
/// @brief Releases the glib objects owned by the queue.
///
/// NOTE(review): only the release of m_mutex is visible. Original line 69 is
/// absent from this listing and presumably frees m_cond -- confirm against the
/// full file before assuming the condition variable is released.
67 BlockingQueue<T>::~BlockingQueue()
70 g_mutex_free(m_mutex);
/// @brief Checks whether the internal queue currently holds no elements.
///
/// The emptiness test runs with m_mutex held. The result is only a snapshot:
/// the mutex is released before the function finishes, so the queue may change
/// right afterwards.
///
/// @return presumably the snapshot stored in rv -- the declaration of rv and
///         the return statement are not visible in this listing; confirm.
74 bool BlockingQueue<T>::IsEmpty()
78 g_mutex_lock(m_mutex);
79 rv = m_theQueue.empty();
80 g_mutex_unlock(m_mutex);
/// @brief Appends a copy of a_elem, waiting while the queue is at capacity.
///
/// Locks m_mutex, waits on m_cond for as long as the queue holds at least
/// m_maximumSize elements, pushes a_elem, broadcasts on m_cond and unlocks.
///
/// @param a_elem element to copy into the queue.
/// @return not visible in this listing (the return statement is missing);
///         confirm against the full file.
///
/// NOTE(review): if m_maximumSize is 0 the wait condition below is always
/// true, so this call would never leave the loop.
86 bool BlockingQueue<T>::Push(const T &a_elem)
88 g_mutex_lock(m_mutex);
// Re-check the condition after every wake-up: m_cond is also broadcast by the
// pop functions and by other pushers, so a wake-up does not guarantee room.
90 while (m_theQueue.size() >= m_maximumSize)
92 g_cond_wait(m_cond, m_mutex);
// Sampled before the insertion, i.e. "was the queue empty before this push".
95 bool queueEmpty = m_theQueue.empty();
97 m_theQueue.push(a_elem);
// NOTE(review): original lines 98-100 are absent here; queueEmpty presumably
// guards the broadcast below so waiters are only woken on the empty ->
// non-empty transition -- confirm.
101 // wake up threads waiting for stuff
102 g_cond_broadcast(m_cond);
105 g_mutex_unlock(m_mutex);
/// @brief Non-waiting variant of Push(): inserts a_elem only if there is room.
///
/// Locks m_mutex and pushes a copy of a_elem when the queue holds fewer than
/// m_maximumSize elements. This function never waits on m_cond.
///
/// @param a_elem element to copy into the queue.
/// @return not visible in this listing; presumably reports whether the element
///         was inserted -- confirm against the full file.
110 template <typename T>
111 bool BlockingQueue<T>::TryPush(const T &a_elem)
113 g_mutex_lock(m_mutex);
// Sampled before the insertion, i.e. "was the queue empty before this push".
116 bool queueEmpty = m_theQueue.empty();
118 if (m_theQueue.size() < m_maximumSize)
120 m_theQueue.push(a_elem);
// NOTE(review): the lines between the push and the broadcast are absent from
// this listing; queueEmpty presumably guards the broadcast -- confirm.
126 // wake up threads waiting for stuff
127 g_cond_broadcast(m_cond);
130 g_mutex_unlock(m_mutex);
/// @brief Copies the front element into out_data, waiting while the queue is empty.
///
/// Locks m_mutex, waits on m_cond for as long as the queue is empty, copies
/// the front element into out_data, broadcasts on m_cond and unlocks.
///
/// @param out_data receives a copy of the element at the front of the queue.
///
/// NOTE(review): the statement that removes the front element (original line
/// 148) is absent from this listing -- presumably m_theQueue.pop(); confirm.
135 template <typename T>
136 void BlockingQueue<T>::Pop(T &out_data)
138 g_mutex_lock(m_mutex);
// Re-check after every wake-up: m_cond is shared with the push functions and
// other poppers, so a wake-up does not guarantee an element is available.
140 while (m_theQueue.empty())
142 g_cond_wait(m_cond, m_mutex);
// Sampled before the element is taken, i.e. "was the queue at capacity".
145 bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
147 out_data = m_theQueue.front();
// NOTE(review): queueFull presumably guards the broadcast below so blocked
// pushers are only woken when room has just appeared -- the guard line is not
// visible here; confirm.
152 // wake up threads waiting for stuff
153 g_cond_broadcast(m_cond);
156 g_mutex_unlock(m_mutex);
/// @brief Non-waiting variant of Pop(): takes the front element only if one exists.
///
/// Locks m_mutex and, when the queue is not empty, copies the front element
/// into out_data. This function never waits on m_cond.
///
/// @param out_data receives a copy of the front element when the queue is not
///        empty; it is not assigned by any visible line otherwise.
/// @return not visible in this listing; presumably reports whether an element
///         was retrieved -- confirm against the full file.
///
/// NOTE(review): the statement that removes the front element is absent from
/// this listing -- presumably m_theQueue.pop(); confirm.
159 template <typename T>
160 bool BlockingQueue<T>::TryPop(T &out_data)
162 g_mutex_lock(m_mutex);
165 if (!m_theQueue.empty())
// Sampled before the element is taken, i.e. "was the queue at capacity".
167 bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
169 out_data = m_theQueue.front();
// NOTE(review): queueFull presumably guards the broadcast below -- the guard
// line is not visible here; confirm.
174 // wake up threads waiting for stuff
175 g_cond_broadcast(m_cond);
181 g_mutex_unlock(m_mutex);
/// @brief Like Pop(), but gives up once a deadline has passed.
///
/// Locks m_mutex, computes an absolute deadline of "now + microsecs", and
/// waits on m_cond while the queue is empty and the timed wait has not
/// reported a timeout. If the last wait did not time out, the front element is
/// copied into data.
///
/// @param data receives a copy of the front element when the wait did not time
///        out; it is not assigned by any visible line otherwise.
/// @param microsecs amount added to the current time via g_time_val_add() to
///        form the deadline.
/// @return not visible in this listing; presumably reports whether an element
///         was retrieved -- confirm against the full file.
///
/// NOTE(review): the declaration of abs_time and the statement that removes
/// the front element are absent from this listing -- confirm against the full
/// file.
186 template <typename T>
187 bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)
189 g_mutex_lock(m_mutex);
// The deadline is computed once, before the loop, so repeated wake-ups do not
// extend the total time spent waiting.
191 // adding microsecs to now
193 g_get_current_time(&abs_time);
194 g_time_val_add(&abs_time, microsecs);
// retcode starts as TRUE so that a non-empty queue skips the wait entirely and
// falls through to the retrieval branch.
196 gboolean retcode = TRUE;
197 while (m_theQueue.empty() && (retcode != FALSE))
199 // Returns TRUE if cond was signalled, or FALSE on timeout
200 retcode = g_cond_timed_wait(m_cond, m_mutex, &abs_time);
// Sampled before the element is taken, i.e. "was the queue at capacity".
204 bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
// NOTE(review): the decision is based on retcode, not on the queue state, so
// an element that arrives just as the wait times out is left in the queue.
205 if (retcode != FALSE)
207 data = m_theQueue.front();
// NOTE(review): queueFull presumably guards the broadcast below -- the guard
// line is not visible here; confirm.
215 // wake up threads waiting for stuff
216 g_cond_broadcast(m_cond);
219 g_mutex_unlock(m_mutex);
224 #endif /* _GBLOCKINGQUEUEIMPL_H_ */