+#ifndef _MPMC_QUEUE_H
+#define _MPMC_QUEUE_H
+
#include <stdatomic.h>
#include <unrelacy.h>
-template <typename t_element, size_t t_size>
+template <typename t_element>
struct mpmc_boundq_1_alt
{
private:
-
+ // buffer capacity
+ size_t t_size;
// elements should generally be cache-line-size padded :
- t_element m_array[t_size];
+ t_element *m_array;
// rdwr counts the reads & writes that have started
atomic<unsigned int> m_rdwr;
public:
- mpmc_boundq_1_alt()
+ mpmc_boundq_1_alt(int size)
{
+ t_size = size;
+ m_array = new t_element[size];
m_rdwr = 0;
m_read = 0;
m_written = 0;
//-----------------------------------------------------
- t_element * read_fetch() {
- unsigned int rdwr = m_rdwr.load(mo_acquire);
- unsigned int rd,wr;
- for(;;) {
- rd = (rdwr>>16) & 0xFFFF;
- wr = rdwr & 0xFFFF;
-
- if ( wr == rd ) // empty
- return NULL;
-
- if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
- break;
- else
- thrd_yield();
- }
-
- // (*1)
- rl::backoff bo;
- while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
- thrd_yield();
- }
-
- t_element * p = & ( m_array[ rd % t_size ] );
-
- return p;
- }
-
- void read_consume() {
- m_read.fetch_add(1,mo_release);
- }
+ t_element * read_fetch();
- //-----------------------------------------------------
+ void read_consume(t_element *bin);
- t_element * write_prepare() {
- unsigned int rdwr = m_rdwr.load(mo_acquire);
- unsigned int rd,wr;
- for(;;) {
- rd = (rdwr>>16) & 0xFFFF;
- wr = rdwr & 0xFFFF;
+ t_element * write_prepare();
- if ( wr == ((rd + t_size)&0xFFFF) ) // full
- return NULL;
+ void write_publish(t_element *bin);
+};
- if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
- break;
- else
- thrd_yield();
- }
+mpmc_boundq_1_alt<int32_t>* createMPMC(int size);
- // (*1)
- rl::backoff bo;
- while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
- thrd_yield();
- }
+void destroyMPMC(mpmc_boundq_1_alt<int32_t> *q);
- t_element * p = & ( m_array[ wr % t_size ] );
+int32_t * read_fetch(mpmc_boundq_1_alt<int32_t> *q);
- return p;
- }
+void read_consume(mpmc_boundq_1_alt<int32_t> *q, int32_t *bin);
- void write_publish()
- {
- m_written.fetch_add(1,mo_release);
- }
-
- //-----------------------------------------------------
+int32_t * write_prepare(mpmc_boundq_1_alt<int32_t> *q);
+void write_publish(mpmc_boundq_1_alt<int32_t> *q, int32_t *bin);
-};
+#endif