From: Peizhao Ou Date: Fri, 21 Mar 2014 21:06:33 +0000 (-0700) Subject: injection result for ms-queue X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=2b614cd83e5ae43993bae6b8133d055f1091ef3d;p=cdsspec-compiler.git injection result for ms-queue --- diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h index 0c94b6e..ee3950d 100644 --- a/benchmark/mpmc-queue/mpmc-queue.h +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -2,13 +2,6 @@ #include #include -#include -#include -#include -#include -#include -#include "common.h" - /** @Begin @Class_begin @@ -49,12 +42,209 @@ public: LANG = CPP; CLASS = mpmc_boundq_1_alt; @Global_define: + @DeclareStruct: + typedef struct elem { + t_element *pos; + bool written; + thread_id_t tid; + thread_id_t fetch_tid; + call_id_t id; + } elem; @DeclareVar: + spec_list *list; id_tag_t *tag; @InitVar: - tag = NULL; + list = new_spec_list(); + tag = new_id_tag(); + @DefineFunc: + elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { + elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); + e->pos = pos; + e->written = false; + e->id = id; + e->tid = tid; + e->fetch_tid = -1; + } + @DefineFunc: + elem* get_elem_by_pos(t_element *pos) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->pos == pos) { + return e; + } + } + return NULL; + } + @DefineFunc: + void show_list() { + //model_print("Status:\n"); + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); + } + } + @DefineFunc: + elem* get_elem_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->tid== tid) { + return e; + } + } + return NULL; + } + @DefineFunc: + elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->fetch_tid== fetch_tid) { + return e; + } + } + return NULL; + } + @DefineFunc: + int elem_idx_by_pos(t_element *pos) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (pos == existing->pos) { + return i; + } + } + return -1; + } + @DefineFunc: + int elem_idx_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (tid == existing->tid) { + return i; + } + } + return -1; + } + @DefineFunc: + int elem_idx_by_fetch_tid(thread_id_t fetch_tid) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (fetch_tid == existing->fetch_tid) { + return i; + } + } + return -1; + } + @DefineFunc: + int elem_num(t_element *pos) { + int cnt = 0; + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (pos == existing->pos) { + cnt++; + } + } + return cnt; + } + @DefineFunc: + call_id_t prepare_id() { + call_id_t res = get_and_inc(tag); + //model_print("prepare_id: %d\n", res); + return res; + } + @DefineFunc: + bool prepare_check(t_element *pos, thread_id_t tid) { + show_list(); + elem *e = get_elem_by_pos(pos); + //model_print("prepare_check: e %d\n", e); + return NULL == e; + } + @DefineFunc: + void prepare(call_id_t id, t_element *pos, thread_id_t tid) { + //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid); + elem *e = new_elem(pos, id, tid); + push_back(list, e); + } + @DefineFunc: + call_id_t publish_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool publish_check(thread_id_t tid) { + show_list(); + elem *e = get_elem_by_tid(tid); + //model_print("publish_check: tid %d\n", tid); + if (NULL == e) + return false; + if (elem_num(e->pos) > 1) + return false; + return !e->written; + } + @DefineFunc: + void publish(thread_id_t tid) { + //model_print("publish: tid %d\n", tid); + elem *e = get_elem_by_tid(tid); + e->written = true; + } + @DefineFunc: + call_id_t fetch_id(t_element *pos) { + elem *e = get_elem_by_pos(pos); + //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool fetch_check(t_element *pos) { + show_list(); + if (pos == NULL) return true; + elem *e = get_elem_by_pos(pos); + //model_print("fetch_check: pos %d, e %d\n", pos, e); + if (e == NULL) return false; + if (elem_num(e->pos) > 1) + return false; + return true; + } + @DefineFunc: + void fetch(t_element *pos, thread_id_t tid) { + if (pos == NULL) return; + elem *e = (elem*) get_elem_by_pos(pos); + //model_print("fetch: pos %d, tid %d\n", pos, tid); + // Remember the thread that fetches the position + e->fetch_tid = tid; + } + @DefineFunc: + bool consume_check(thread_id_t tid) { + show_list(); + elem *e = get_elem_by_fetch_tid(tid); + //model_print("consume_check: tid %d, e %d\n", tid, e); + if (NULL == e) + return false; + if (elem_num(e->pos) > 1) + return false; + return e->written; + } + @DefineFunc: + call_id_t consume_id(thread_id_t tid) { + elem *e = get_elem_by_fetch_tid(tid); + //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + void consume(thread_id_t tid) { + //model_print("consume: tid %d\n", tid); + int idx = elem_idx_by_fetch_tid(tid); + if (idx == -1) + return; + remove_at_index(list, idx); + } @Happens_before: - Publish -> Fetch + Prepare -> Fetch + Publish -> Consume @End */ @@ -63,16 +253,20 @@ public: /** @Begin @Interface: Fetch - @Commit_point_set: Fetch_Succ_Point | Fetch_Fail_Point - @ID: (call_id_t) __RET__ + @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point + @ID: fetch_id(__RET__) + @Check: + fetch_check(__RET__) + @Action: + fetch(__RET__, __TID__); @End */ t_element * read_fetch() { unsigned int rdwr = m_rdwr.load(mo_acquire); /** @Begin - @Commit_point_define_check: (unsigned int) ((rdwr>>16) & 0xFFFF) == (unsigned int) (rdwr & 0xFFFF) - @Label: Fetch_Fail_Point + @Potential_commit_point_define: true + @Label: Fetch_Potential_Point @End */ unsigned int rd,wr; @@ -80,23 +274,24 @@ public: rd = (rdwr>>16) & 0xFFFF; wr = rdwr & 0xFFFF; - //model_print("cond: %d\n", (unsigned int) ((rdwr>>16) & 0xFFFF) == - //(unsigned int) (rdwr & 0xFFFF)); - //model_print("cond: %d\n", wr == rd); if ( wr == rd ) { // empty /** - //@Begin + @Begin @Commit_point_define: true @Potential_commit_point_label: Fetch_Potential_Point - @Label: Fetch_Fail_Point + @Label: Fetch_Empty_Point @End */ - model_print("in cond\n"); return false; } bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel); - + /** + @Begin + @Commit_point_define_check: succ == true + @Label: Fetch_Succ_Point + @End + */ if (succ) break; else @@ -105,17 +300,7 @@ public: // (*1) rl::backoff bo; - - while ( true ) { - if ((m_written.load(mo_acquire) & 0xFFFF) == wr) { - /** - @Begin - @Commit_point_define_check: true - @Label: Fetch_Succ_Point - @End - */ - break; - } + while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) { thrd_yield(); } @@ -124,25 +309,72 @@ public: return p; } + /** + @Begin + @Interface: Consume + @Commit_point_set: Consume_Point + @ID: consume_id(__TID__) + @Check: + consume_check(__TID__) + @Action: + consume(__TID__); + @End + */ void read_consume() { m_read.fetch_add(1,mo_release); + /** + @Begin + @Commit_point_define_check: true + @Label: Consume_Point + @End + */ } //----------------------------------------------------- + /** + @Begin + @Interface: Prepare + @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point + @ID: prepare_id() + @Check: + prepare_check(__RET__, __TID__) + @Action: + prepare(__ID__, __RET__, __TID__); + @End + */ t_element * write_prepare() { unsigned int rdwr = m_rdwr.load(mo_acquire); + /** + @Begin + @Potential_commit_point_define: true + @Label: Prepare_Potential_Point + @End + */ unsigned int rd,wr; for(;;) { rd = (rdwr>>16) & 0xFFFF; wr = rdwr & 0xFFFF; if ( wr == ((rd + t_size)&0xFFFF) ) { // full + /** + @Begin + @Commit_point_define: true + @Potential_commit_point_label: Prepare_Potential_Point + @Label: Prepare_Full_Point + @End + */ return NULL; } bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel); + /** + @Begin + @Commit_point_define_check: succ == true + @Label: Prepare_Succ_Point + @End + */ if (succ) break; else @@ -164,10 +396,14 @@ public: @Begin @Interface: Publish @Commit_point_set: Publish_Point - @ID: (uint64_t) elem + @ID: publish_id(__TID__) + @Check: + publish_check(__TID__) + @Action: + publish(__TID__); @End */ - void write_publish(t_element *elem) + void write_publish() { m_written.fetch_add(1,mo_release); /** diff --git a/benchmark/ms-queue/my_queue.c b/benchmark/ms-queue/my_queue.c index ae8d2b7..017670b 100644 --- a/benchmark/ms-queue/my_queue.c +++ b/benchmark/ms-queue/my_queue.c @@ -101,7 +101,9 @@ void enqueue(queue_t *q, unsigned int val) atomic_store_explicit(&q->nodes[node].next, tmp, relaxed); while (!success) { + /****FIXME: detected UL ****/ tail = atomic_load_explicit(&q->tail, acquire); + /****FIXME: miss ****/ next = atomic_load_explicit(&q->nodes[get_ptr(tail)].next, acquire); if (tail == atomic_load_explicit(&q->tail, relaxed)) { @@ -110,6 +112,7 @@ void enqueue(queue_t *q, unsigned int val) if (get_ptr(next) == 0) { // == NULL pointer value = MAKE_POINTER(node, get_count(next) + 1); + /****FIXME: first release UL, second release miss ****/ success = atomic_compare_exchange_strong_explicit(&q->nodes[get_ptr(tail)].next, &next, value, release, release); /** @@ -120,15 +123,18 @@ void enqueue(queue_t *q, unsigned int val) */ } if (!success) { + /****FIXME: detected UL ****/ unsigned int ptr = get_ptr(atomic_load_explicit(&q->nodes[get_ptr(tail)].next, acquire)); pointer value = MAKE_POINTER(ptr, get_count(tail) + 1); + /****FIXME: both miss ****/ atomic_compare_exchange_strong_explicit(&q->tail, &tail, value, release, release); thrd_yield(); } } } + /****FIXME: first UL, second miss ****/ atomic_compare_exchange_strong_explicit(&q->tail, &tail, MAKE_POINTER(node, get_count(tail) + 1), @@ -149,8 +155,10 @@ unsigned int dequeue(queue_t *q) pointer next; while (!success) { + /****FIXME: detected correctness error ****/ head = atomic_load_explicit(&q->head, acquire); tail = atomic_load_explicit(&q->tail, relaxed); + /****FIXME: miss ****/ next = atomic_load_explicit(&q->nodes[get_ptr(head)].next, acquire); if (atomic_load_explicit(&q->head, relaxed) == head) { if (get_ptr(head) == get_ptr(tail)) { @@ -167,6 +175,7 @@ unsigned int dequeue(queue_t *q) */ return 0; // NULL } + /****FIXME: both miss ****/ atomic_compare_exchange_strong_explicit(&q->tail, &tail, MAKE_POINTER(get_ptr(next), get_count(tail) + 1), @@ -175,6 +184,7 @@ unsigned int dequeue(queue_t *q) } else { //value = load_32(&q->nodes[get_ptr(next)].value); value = q->nodes[get_ptr(next)].value; + /****FIXME: first correctness error, second miss ****/ success = atomic_compare_exchange_strong_explicit(&q->head, &head, MAKE_POINTER(get_ptr(next), get_count(head) + 1),