X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=cdschecker_modified_benchmarks%2Fmpmc-queue%2Fmpmc-queue.h;fp=cdschecker_modified_benchmarks%2Fmpmc-queue%2Fmpmc-queue.h;h=f86bec424c81a652316c04ebcd0640926b2d9f3d;hb=1491fc3840762ae57661f247ae363be83e0132a2;hp=0000000000000000000000000000000000000000;hpb=6e2a27bd9e36f65acd47ac764bb0373b76c386a3;p=c11concurrency-benchmarks.git diff --git a/cdschecker_modified_benchmarks/mpmc-queue/mpmc-queue.h b/cdschecker_modified_benchmarks/mpmc-queue/mpmc-queue.h new file mode 100644 index 0000000..f86bec4 --- /dev/null +++ b/cdschecker_modified_benchmarks/mpmc-queue/mpmc-queue.h @@ -0,0 +1,97 @@ +#include "cds_atomic.h" +#include + +template +struct mpmc_boundq_1_alt +{ +private: + + // elements should generally be cache-line-size padded : + t_element m_array[t_size]; + + // rdwr counts the reads & writes that have started + atomic m_rdwr; + // "read" and "written" count the number completed + atomic m_read; + atomic m_written; + +public: + + mpmc_boundq_1_alt() + { + m_rdwr = 0; + m_read = 0; + m_written = 0; + } + + //----------------------------------------------------- + + t_element * read_fetch() { + unsigned int rdwr = m_rdwr.load(mo_acquire); + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == rd ) // empty + return NULL; + + if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acquire) ) + break; + else + thrd_yield(); + } + + // (*1) + rl::backoff bo; + while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) { + thrd_yield(); + } + + t_element * p = & ( m_array[ rd % t_size ] ); + + return p; + } + + void read_consume() { + m_read.fetch_add(1,mo_release); + } + + //----------------------------------------------------- + + t_element * write_prepare() { + unsigned int rdwr = m_rdwr.load(mo_acquire); + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == ((rd + t_size)&0xFFFF) ) // full + return NULL; + + if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) ) + break; + else + thrd_yield(); + } + + // (*1) + rl::backoff bo; + while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) { + thrd_yield(); + } + + t_element * p = & ( m_array[ wr % t_size ] ); + + return p; + } + + void write_publish() + { + m_written.fetch_add(1,mo_relaxed); + } + + //----------------------------------------------------- + + +};