10 template <typename t_element, size_t t_size>
11 struct mpmc_boundq_1_alt
15 // elements should generally be cache-line-size padded :
16 t_element m_array[t_size];
18 // m_rdwr packs the counts of reads (high 16 bits) and writes (low 16 bits) that have started
19 atomic<unsigned int> m_rdwr;
20 // m_read and m_written count the reads and writes that have completed
21 atomic<unsigned int> m_read;
22 atomic<unsigned int> m_written;
43 CLASS = mpmc_boundq_1_alt;
50 thread_id_t fetch_tid;
57 list = new_spec_list();
60 elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
61 elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
69 elem* get_elem_by_pos(t_element *pos) {
70 for (int i = 0; i < size(list); i++) {
71 elem *e = (elem*) elem_at_index(list, i);
80 //model_print("Status:\n");
81 for (int i = 0; i < size(list); i++) {
82 elem *e = (elem*) elem_at_index(list, i);
83 //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id);
87 elem* get_elem_by_tid(thread_id_t tid) {
88 for (int i = 0; i < size(list); i++) {
89 elem *e = (elem*) elem_at_index(list, i);
97 elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
98 for (int i = 0; i < size(list); i++) {
99 elem *e = (elem*) elem_at_index(list, i);
100 if (e->fetch_tid== fetch_tid) {
107 int elem_idx_by_pos(t_element *pos) {
108 for (int i = 0; i < size(list); i++) {
109 elem *existing = (elem*) elem_at_index(list, i);
110 if (pos == existing->pos) {
117 int elem_idx_by_tid(thread_id_t tid) {
118 for (int i = 0; i < size(list); i++) {
119 elem *existing = (elem*) elem_at_index(list, i);
120 if (tid == existing->tid) {
127 int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
128 for (int i = 0; i < size(list); i++) {
129 elem *existing = (elem*) elem_at_index(list, i);
130 if (fetch_tid == existing->fetch_tid) {
137 int elem_num(t_element *pos) {
139 for (int i = 0; i < size(list); i++) {
140 elem *existing = (elem*) elem_at_index(list, i);
141 if (pos == existing->pos) {
148 call_id_t prepare_id() {
149 call_id_t res = get_and_inc(tag);
150 //model_print("prepare_id: %d\n", res);
154 bool prepare_check(t_element *pos, thread_id_t tid) {
156 elem *e = get_elem_by_pos(pos);
157 //model_print("prepare_check: e %d\n", e);
161 void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
162 //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
163 elem *e = new_elem(pos, id, tid);
167 call_id_t publish_id(thread_id_t tid) {
168 elem *e = get_elem_by_tid(tid);
169 //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
171 return DEFAULT_CALL_ID;
175 bool publish_check(thread_id_t tid) {
177 elem *e = get_elem_by_tid(tid);
178 //model_print("publish_check: tid %d\n", tid);
181 if (elem_num(e->pos) > 1)
186 void publish(thread_id_t tid) {
187 //model_print("publish: tid %d\n", tid);
188 elem *e = get_elem_by_tid(tid);
192 call_id_t fetch_id(t_element *pos) {
193 elem *e = get_elem_by_pos(pos);
194 //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
196 return DEFAULT_CALL_ID;
200 bool fetch_check(t_element *pos) {
202 if (pos == NULL) return true;
203 elem *e = get_elem_by_pos(pos);
204 //model_print("fetch_check: pos %d, e %d\n", pos, e);
205 if (e == NULL) return false;
206 if (elem_num(e->pos) > 1)
211 void fetch(t_element *pos, thread_id_t tid) {
212 if (pos == NULL) return;
213 elem *e = (elem*) get_elem_by_pos(pos);
214 //model_print("fetch: pos %d, tid %d\n", pos, tid);
215 // Remember the thread that fetches the position
219 bool consume_check(thread_id_t tid) {
221 elem *e = get_elem_by_fetch_tid(tid);
222 //model_print("consume_check: tid %d, e %d\n", tid, e);
225 if (elem_num(e->pos) > 1)
230 call_id_t consume_id(thread_id_t tid) {
231 elem *e = get_elem_by_fetch_tid(tid);
232 //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
234 return DEFAULT_CALL_ID;
238 void consume(thread_id_t tid) {
239 //model_print("consume: tid %d\n", tid);
240 int idx = elem_idx_by_fetch_tid(tid);
243 remove_at_index(list, idx);
251 //-----------------------------------------------------
256 @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
257 @ID: fetch_id(__RET__)
261 fetch(__RET__, __TID__);
264 t_element * read_fetch() {
265 unsigned int rdwr = m_rdwr.load(mo_acquire);
268 @Potential_commit_point_define: true
269 @Label: Fetch_Potential_Point
274 rd = (rdwr>>16) & 0xFFFF;
277 if ( wr == rd ) { // empty
280 @Commit_point_define: true
281 @Potential_commit_point_label: Fetch_Potential_Point
282 @Label: Fetch_Empty_Point
288 bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
291 @Commit_point_define_check: succ == true
292 @Label: Fetch_Succ_Point
303 while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
307 t_element * p = & ( m_array[ rd % t_size ] );
315 @Commit_point_set: Consume_Point
316 @ID: consume_id(__TID__)
318 consume_check(__TID__)
323 void read_consume() {
324 m_read.fetch_add(1,mo_release);
327 @Commit_point_define_check: true
328 @Label: Consume_Point
333 //-----------------------------------------------------
338 @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
341 prepare_check(__RET__, __TID__)
343 prepare(__ID__, __RET__, __TID__);
346 t_element * write_prepare() {
347 unsigned int rdwr = m_rdwr.load(mo_acquire);
350 @Potential_commit_point_define: true
351 @Label: Prepare_Potential_Point
356 rd = (rdwr>>16) & 0xFFFF;
359 if ( wr == ((rd + t_size)&0xFFFF) ) { // full
362 @Commit_point_define: true
363 @Potential_commit_point_label: Prepare_Potential_Point
364 @Label: Prepare_Full_Point
370 bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
371 ((wr+1)&0xFFFF),mo_acq_rel);
374 @Commit_point_define_check: succ == true
375 @Label: Prepare_Succ_Point
386 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
390 t_element * p = & ( m_array[ wr % t_size ] );
398 @Commit_point_set: Publish_Point
399 @ID: publish_id(__TID__)
401 publish_check(__TID__)
408 m_written.fetch_add(1,mo_release);
411 @Commit_point_define_check: true
412 @Label: Publish_Point
417 //-----------------------------------------------------