4 template <typename t_element, size_t t_size>
5 struct mpmc_boundq_1_alt
9 // elements should generally be cache-line-size padded :
10 t_element m_array[t_size];
12 // rdwr counts the reads & writes that have started
13 atomic<unsigned int> m_rdwr;
14 // "read" and "written" count the number completed
15 atomic<unsigned int> m_read;
16 atomic<unsigned int> m_written;
32 CLASS = mpmc_boundq_1_alt;
44 list = new_spec_list();
47 elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
48 elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
55 elem* get_elem_by_pos(t_element *pos) {
56 for (int i = 0; i < size(list); i++) {
57 elem *e = (elem*) elem_at_index(list, i);
65 elem* get_elem_by_tid(thread_id_t tid) {
66 for (int i = 0; i < size(list); i++) {
67 elem *e = (elem*) elem_at_index(list, i);
75 int elem_idx_by_pos(t_element *pos) {
76 for (int i = 0; i < size(list); i++) {
77 elem *existing = (elem*) elem_at_index(list, i);
78 if (pos == existing->pos) {
85 int elem_idx_by_tid(thread_id_t tid) {
86 for (int i = 0; i < size(list); i++) {
87 elem *existing = (elem*) elem_at_index(list, i);
88 if (tid == existing->tid) {
95 call_id_t prepare_id() {
96 return get_and_inc(tag);
99 bool prepare_check(t_element *pos, thread_id_t tid) {
100 elem *e = get_elem_by_tid(tid);
104 void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
105 call_id_t id = get_and_inc(tag);
106 elem *e = new_elem(pos, id, tid);
110 call_id_t publish_id(thread_id_t tid) {
111 elem *e = get_elem_by_tid(tid);
113 return DEFAULT_CALL_ID;
117 bool publish_check(thread_id_t tid) {
118 elem *e = get_elem_by_tid(tid);
124 void publish(thread_id_t tid) {
125 elem *e = get_elem_by_tid(tid);
129 call_id_t fetch_id(t_element *pos) {
130 elem *e = get_elem_by_pos(pos);
132 return DEFAULT_CALL_ID;
136 bool fetch_check(t_element *pos) {
137 int idx = elem_idx_by_pos(pos);
144 void fetch(t_element *pos) {
145 int idx = elem_idx_by_pos(pos);
148 remove_at_index(list, idx);
151 bool consume_check(thread_id_t tid) {
152 elem *e = get_elem_by_tid(tid);
158 call_id_t consume_id(thread_id_t tid) {
159 elem *e = get_elem_by_tid(tid);
161 return DEFAULT_CALL_ID;
165 void consume(thread_id_t tid) {
166 int idx = elem_idx_by_tid(tid);
169 remove_at_index(list, idx);
177 //-----------------------------------------------------
182 @Commit_point_set: Fetch_Point
183 @ID: fetch_id(__RET__)
190 t_element * read_fetch() {
191 unsigned int rdwr = m_rdwr.load(mo_acquire);
194 rd = (rdwr>>16) & 0xFFFF;
197 if ( wr == rd ) { // empty
201 if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
209 while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
213 t_element * p = & ( m_array[ rd % t_size ] );
221 @Commit_point_set: Consume_Point
222 @ID: consume_id(__TID__)
224 consume_check(__TID__)
229 void read_consume() {
230 m_read.fetch_add(1,mo_release);
233 //-----------------------------------------------------
238 @Commit_point_set: Prepare_Point
239 @ID: prepare_id(__RET__)
241 prepare_check(__RET__)
246 t_element * write_prepare() {
247 unsigned int rdwr = m_rdwr.load(mo_acquire);
250 rd = (rdwr>>16) & 0xFFFF;
253 if ( wr == ((rd + t_size)&0xFFFF) ) // full
256 if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
264 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
269 t_element * p = & ( m_array[ wr % t_size ] );
277 @Commit_point_set: Publish_Point
278 @ID: publish_id(__TID__)
280 publish_check(__TID__)
287 m_written.fetch_add(1,mo_release);
290 //-----------------------------------------------------