From 8ff287cadddda3206afcea38178860af6ba6dea6 Mon Sep 17 00:00:00 2001 From: Peizhao Ou Date: Wed, 19 Mar 2014 23:01:13 -0700 Subject: [PATCH] save --- benchmark/chase-lev-deque-bugfix/deque.c | 38 +-- benchmark/chase-lev-deque-bugfix/deque.h | 11 +- benchmark/chase-lev-deque-bugfix/main.c | 5 +- benchmark/mpmc-queue/mpmc-queue.cc | 6 +- benchmark/mpmc-queue/mpmc-queue.h | 293 ++--------------------- 5 files changed, 54 insertions(+), 299 deletions(-) diff --git a/benchmark/chase-lev-deque-bugfix/deque.c b/benchmark/chase-lev-deque-bugfix/deque.c index 2cfbb76..7c32656 100644 --- a/benchmark/chase-lev-deque-bugfix/deque.c +++ b/benchmark/chase-lev-deque-bugfix/deque.c @@ -35,7 +35,8 @@ int take(Deque *q) { int x; if (t <= b) { /* Non-empty queue. */ - x = atomic_load_explicit(&a->buffer[b % atomic_load_explicit(&a->size,memory_order_relaxed)], memory_order_relaxed); + int size = atomic_load_explicit(&a->size,memory_order_relaxed); + x = atomic_load_explicit(&a->buffer[b % size], memory_order_relaxed); /** @Begin @Commit_point_define_check: t != b @@ -48,17 +49,10 @@ int take(Deque *q) { 1, memory_order_seq_cst, memory_order_relaxed); /** @Begin - @Commit_point_define_check: succ == true + @Commit_point_define_check: succ @Label: Take_Point3 @End */ - - /** - @Begin - @Commit_point_define_check: succ == false - @Label: Take_Point4 - @End - */ if (!succ) { /* Failed race. */ x = EMPTY; @@ -102,15 +96,18 @@ void push(Deque *q, int x) { //Bug in paper...should have next line... a = (Array *) atomic_load_explicit(&q->array, memory_order_relaxed); } - atomic_store_explicit(&a->buffer[b % atomic_load_explicit(&a->size, memory_order_relaxed)], x, memory_order_relaxed); - atomic_thread_fence(memory_order_release); + int size = atomic_load_explicit(&a->size, memory_order_relaxed); + atomic_store_explicit(&a->buffer[b % size], x, memory_order_relaxed); /** @Begin @Commit_point_define_check: true @Label: Push_Point @End */ + atomic_thread_fence(memory_order_release); + atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); + } /** @@ -132,19 +129,28 @@ int steal(Deque *q) { if (t < b) { /* Non-empty queue. */ Array *a = (Array *) atomic_load_explicit(&q->array, memory_order_acquire); - x = atomic_load_explicit(&a->buffer[t % atomic_load_explicit(&a->size, memory_order_relaxed)], memory_order_relaxed); + int size = atomic_load_explicit(&a->size, memory_order_relaxed); + x = atomic_load_explicit(&a->buffer[t % size], memory_order_relaxed); + /** + @Begin + @Potential_commit_point_define: true + @Label: Potential_Steal + @End + */ + bool succ = atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed); /** @Begin - @Commit_point_define_check: succ == true - @Label: Steal_Point2 + @Commit_point_define_check: !succ + @Label: Steal_Point4 @End */ - + /** @Begin - @Commit_point_define_check: succ == false + @Commit_point_define: succ + @Potential_commit_point_label: Potential_Steal @Label: Steal_Point3 @End */ diff --git a/benchmark/chase-lev-deque-bugfix/deque.h b/benchmark/chase-lev-deque-bugfix/deque.h index fa76d8d..d8114d5 100644 --- a/benchmark/chase-lev-deque-bugfix/deque.h +++ b/benchmark/chase-lev-deque-bugfix/deque.h @@ -1,6 +1,13 @@ #ifndef DEQUE_H #define DEQUE_H +#include +#include +#include +#include +#include +#include "common.h" + typedef struct { atomic_size_t size; atomic_int buffer[]; @@ -59,7 +66,7 @@ void resize(Deque *q); /** @Begin @Interface: Take - @Commit_point_set: Take_Point1 | Take_Point2 | Take_Point3 | Take_Point4 + @Commit_point_set: Take_Point1 | Take_Point2 | Take_Point3 @ID: size(__deque) == 0 ? DEFAULT_CALL_ID : get_id(back(__deque)) @Action: int _Old_Val = EMPTY; @@ -88,7 +95,7 @@ void push(Deque *q, int x); /** @Begin @Interface: Steal - @Commit_point_set: Steal_Point1 | Steal_Point2 | Steal_Point3 + @Commit_point_set: Steal_Point1 | Steal_Point2 @ID: size(__deque) == 0 ? DEFAULT_CALL_ID : get_id(front(__deque)) @Action: int _Old_Val = EMPTY; diff --git a/benchmark/chase-lev-deque-bugfix/main.c b/benchmark/chase-lev-deque-bugfix/main.c index 7fb831f..782539e 100644 --- a/benchmark/chase-lev-deque-bugfix/main.c +++ b/benchmark/chase-lev-deque-bugfix/main.c @@ -14,9 +14,6 @@ int b; int c; static void task(void * param) { - // FIXME: Add the following take() will expose an Uninitialzied Load bug... - //a=take(q); - a=steal(q); } @@ -48,7 +45,7 @@ int user_main(int argc, char **argv) correct=false; if (!correct) printf("a=%d b=%d c=%d\n",a,b,c); - MODEL_ASSERT(correct); + //MODEL_ASSERT(correct); return 0; } diff --git a/benchmark/mpmc-queue/mpmc-queue.cc b/benchmark/mpmc-queue/mpmc-queue.cc index cfa82f2..0785711 100644 --- a/benchmark/mpmc-queue/mpmc-queue.cc +++ b/benchmark/mpmc-queue/mpmc-queue.cc @@ -13,7 +13,7 @@ void threadA(struct mpmc_boundq_1_alt *queue) int32_t *bin = queue->write_prepare(); store_32(bin, 1); printf("write_bin %d, val %d\n", bin, 1); - queue->write_publish(); + queue->write_publish(bin); } void threadB(struct mpmc_boundq_1_alt *queue) @@ -30,7 +30,7 @@ void threadC(struct mpmc_boundq_1_alt *queue) { int32_t *bin = queue->write_prepare(); store_32(bin, 1); - queue->write_publish(); + queue->write_publish(bin); while (bin = queue->read_fetch()) { printf("Read: %d\n", load_32(bin)); @@ -117,7 +117,7 @@ int user_main(int argc, char **argv) int32_t *bin = queue.write_prepare(); store_32(bin, 17); printf("init_write_bin %d, val %d\n", bin, 17); - queue.write_publish(); + queue.write_publish(bin); #endif printf("Start threads\n"); diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h index cd9d430..a0797bc 100644 --- a/benchmark/mpmc-queue/mpmc-queue.h +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -49,209 +49,12 @@ public: LANG = CPP; CLASS = mpmc_boundq_1_alt; @Global_define: - @DeclareStruct: - typedef struct elem { - t_element *pos; - bool written; - thread_id_t tid; - thread_id_t fetch_tid; - call_id_t id; - } elem; @DeclareVar: - spec_list *list; id_tag_t *tag; @InitVar: - list = new_spec_list(); - tag = new_id_tag(); - @DefineFunc: - elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { - elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); - e->pos = pos; - e->written = false; - e->id = id; - e->tid = tid; - e->fetch_tid = -1; - } - @DefineFunc: - elem* get_elem_by_pos(t_element *pos) { - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - if (e->pos == pos) { - return e; - } - } - return NULL; - } - @DefineFunc: - void show_list() { - //model_print("Status:\n"); - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); - } - } - @DefineFunc: - elem* get_elem_by_tid(thread_id_t tid) { - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - if (e->tid== tid) { - return e; - } - } - return NULL; - } - @DefineFunc: - elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) { - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - if (e->fetch_tid== fetch_tid) { - return e; - } - } - return NULL; - } - @DefineFunc: - int elem_idx_by_pos(t_element *pos) { - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (pos == existing->pos) { - return i; - } - } - return -1; - } - @DefineFunc: - int elem_idx_by_tid(thread_id_t tid) { - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (tid == existing->tid) { - return i; - } - } - return -1; - } - @DefineFunc: - int elem_idx_by_fetch_tid(thread_id_t fetch_tid) { - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (fetch_tid == existing->fetch_tid) { - return i; - } - } - return -1; - } - @DefineFunc: - int elem_num(t_element *pos) { - int cnt = 0; - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (pos == existing->pos) { - cnt++; - } - } - return cnt; - } - @DefineFunc: - call_id_t prepare_id() { - call_id_t res = get_and_inc(tag); - //model_print("prepare_id: %d\n", res); - return res; - } - @DefineFunc: - bool prepare_check(t_element *pos, thread_id_t tid) { - show_list(); - elem *e = get_elem_by_pos(pos); - //model_print("prepare_check: e %d\n", e); - return NULL == e; - } - @DefineFunc: - void prepare(call_id_t id, t_element *pos, thread_id_t tid) { - //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid); - elem *e = new_elem(pos, id, tid); - push_back(list, e); - } - @DefineFunc: - call_id_t publish_id(thread_id_t tid) { - elem *e = get_elem_by_tid(tid); - //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id); - if (NULL == e) - return DEFAULT_CALL_ID; - return e->id; - } - @DefineFunc: - bool publish_check(thread_id_t tid) { - show_list(); - elem *e = get_elem_by_tid(tid); - //model_print("publish_check: tid %d\n", tid); - if (NULL == e) - return false; - if (elem_num(e->pos) > 1) - return false; - return !e->written; - } - @DefineFunc: - void publish(thread_id_t tid) { - //model_print("publish: tid %d\n", tid); - elem *e = get_elem_by_tid(tid); - e->written = true; - } - @DefineFunc: - call_id_t fetch_id(t_element *pos) { - elem *e = get_elem_by_pos(pos); - //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id); - if (NULL == e) - return DEFAULT_CALL_ID; - return e->id; - } - @DefineFunc: - bool fetch_check(t_element *pos) { - show_list(); - if (pos == NULL) return true; - elem *e = get_elem_by_pos(pos); - //model_print("fetch_check: pos %d, e %d\n", pos, e); - if (e == NULL) return false; - if (elem_num(e->pos) > 1) - return false; - return true; - } - @DefineFunc: - void fetch(t_element *pos, thread_id_t tid) { - if (pos == NULL) return; - elem *e = (elem*) get_elem_by_pos(pos); - //model_print("fetch: pos %d, tid %d\n", pos, tid); - // Remember the thread that fetches the position - e->fetch_tid = tid; - } - @DefineFunc: - bool consume_check(thread_id_t tid) { - show_list(); - elem *e = get_elem_by_fetch_tid(tid); - //model_print("consume_check: tid %d, e %d\n", tid, e); - if (NULL == e) - return false; - if (elem_num(e->pos) > 1) - return false; - return e->written; - } - @DefineFunc: - call_id_t consume_id(thread_id_t tid) { - elem *e = get_elem_by_fetch_tid(tid); - //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id); - if (NULL == e) - return DEFAULT_CALL_ID; - return e->id; - } - @DefineFunc: - void consume(thread_id_t tid) { - //model_print("consume: tid %d\n", tid); - int idx = elem_idx_by_fetch_tid(tid); - if (idx == -1) - return; - remove_at_index(list, idx); - } + tag = NULL; @Happens_before: - Prepare -> Fetch - Publish -> Consume + Publish -> Fetch @End */ @@ -260,20 +63,16 @@ public: /** @Begin @Interface: Fetch - @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point - @ID: fetch_id(__RET__) - @Check: - fetch_check(__RET__) - @Action: - fetch(__RET__, __TID__); + @Commit_point_set: Fetch_Succ_Point + @ID: (call_id_t) __RET__ @End */ t_element * read_fetch() { unsigned int rdwr = m_rdwr.load(mo_acquire); /** @Begin - @Potential_commit_point_define: true - @Label: Fetch_Potential_Point + @Commit_point_define_check: (rdwr>>16) & 0xFFFF == rdwr & 0xFFFF + @Label: Fetch_Succ_Point1 @End */ unsigned int rd,wr; @@ -282,23 +81,11 @@ public: wr = rdwr & 0xFFFF; if ( wr == rd ) { // empty - /** - @Begin - @Commit_point_define: true - @Potential_commit_point_label: Fetch_Potential_Point - @Label: Fetch_Empty_Point - @End - */ return false; } bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel); - /** - @Begin - @Commit_point_define_check: succ == true - @Label: Fetch_Succ_Point - @End - */ + if (succ) break; else @@ -307,7 +94,16 @@ public: // (*1) rl::backoff bo; - while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) { + while ( true ) { + unsigned int tmp = m_written.load(mo_acquire); + /** + @Begin + @Commit_point_define_check: (tmp & 0xFFFF) == wr + @Label: Fetch_Succ_Point2 + @End + */ + if ((tmp & 0xFFFF) == wr) + break; thrd_yield(); } @@ -316,72 +112,25 @@ public: return p; } - /** - @Begin - @Interface: Consume - @Commit_point_set: Consume_Point - @ID: consume_id(__TID__) - @Check: - consume_check(__TID__) - @Action: - consume(__TID__); - @End - */ void read_consume() { m_read.fetch_add(1,mo_release); - /** - @Begin - @Commit_point_define_check: true - @Label: Consume_Point - @End - */ } //----------------------------------------------------- - /** - @Begin - @Interface: Prepare - @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point - @ID: prepare_id() - @Check: - prepare_check(__RET__, __TID__) - @Action: - prepare(__ID__, __RET__, __TID__); - @End - */ t_element * write_prepare() { unsigned int rdwr = m_rdwr.load(mo_acquire); - /** - @Begin - @Potential_commit_point_define: true - @Label: Prepare_Potential_Point - @End - */ unsigned int rd,wr; for(;;) { rd = (rdwr>>16) & 0xFFFF; wr = rdwr & 0xFFFF; if ( wr == ((rd + t_size)&0xFFFF) ) { // full - /** - @Begin - @Commit_point_define: true - @Potential_commit_point_label: Prepare_Potential_Point - @Label: Prepare_Full_Point - @End - */ return NULL; } bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel); - /** - @Begin - @Commit_point_define_check: succ == true - @Label: Prepare_Succ_Point - @End - */ if (succ) break; else @@ -403,14 +152,10 @@ public: @Begin @Interface: Publish @Commit_point_set: Publish_Point - @ID: publish_id(__TID__) - @Check: - publish_check(__TID__) - @Action: - publish(__TID__); + @ID: (uint64_t) elem @End */ - void write_publish() + void write_publish(t_element *elem) { m_written.fetch_add(1,mo_release); /** -- 2.34.1