From: Peizhao Ou Date: Mon, 24 Mar 2014 14:59:27 +0000 (-0700) Subject: svae X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;ds=sidebyside;h=95decc5aa30758c46e77b644db67178c346752ec;p=cdsspec-compiler.git svae --- diff --git a/benchmark/cliffc-hashtable/cliffc_hashtable.h b/benchmark/cliffc-hashtable/cliffc_hashtable.h index 645531f..8aecd66 100644 --- a/benchmark/cliffc-hashtable/cliffc_hashtable.h +++ b/benchmark/cliffc-hashtable/cliffc_hashtable.h @@ -54,7 +54,7 @@ struct kvs_data { for (i = 2; i < real_size; i++) { _data[i].store(NULL, memory_order_relaxed); } - _data[1].store(hashes, memory_order_release); + _data[1].store(hashes, memory_order_relaxed); } ~kvs_data() { @@ -206,7 +206,7 @@ friend class CHM; _slots.store(0, memory_order_relaxed); _copy_idx.store(0, memory_order_relaxed); - _copy_done.store(0, memory_order_release); + _copy_done.store(0, memory_order_relaxed); } ~CHM() {} @@ -221,7 +221,7 @@ friend class CHM; } kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) { - //model_print("resizing...\n"); + model_print("resizing...\n"); kvs_data *newkvs = _newkvs.load(memory_order_acquire); if (newkvs != NULL) return newkvs; @@ -245,11 +245,13 @@ friend class CHM; // Last check cause the 'new' below is expensive newkvs = _newkvs.load(memory_order_acquire); + model_print("hey1\n"); if (newkvs != NULL) return newkvs; newkvs = new kvs_data(newsz); void *chm = (void*) new CHM(sz); - newkvs->_data[0].store(chm, memory_order_release); + model_print("hey2\n"); + newkvs->_data[0].store(chm, memory_order_relaxed); kvs_data *cur_newkvs; // Another check after the slow allocation @@ -277,10 +279,10 @@ friend class CHM; // Just follow Cliff Click's code here int panic_start = -1; int copyidx; - while (_copy_done.load(memory_order_acquire) < oldlen) { - copyidx = _copy_idx.load(memory_order_acquire); + while (_copy_done.load(memory_order_relaxed) < oldlen) { + copyidx = _copy_idx.load(memory_order_relaxed); if (panic_start == -1) { // No painc - copyidx = _copy_idx.load(memory_order_acquire); + copyidx = _copy_idx.load(memory_order_relaxed); while (copyidx < (oldlen << 1) && !_copy_idx.compare_exchange_strong(copyidx, copyidx + min_copy_work, memory_order_release, memory_order_relaxed)) @@ -307,7 +309,7 @@ friend class CHM; kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data *oldkvs, int idx, void *should_help) { - kvs_data *newkvs = _newkvs.load(memory_order_acquire); + kvs_data *newkvs = _newkvs.load(memory_order_relaxed); // We're only here cause the caller saw a Prime if (copy_slot(topmap, idx, oldkvs, newkvs)) copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied @@ -329,8 +331,8 @@ friend class CHM; // Promote the new table to the current table if (copyDone + workdone == oldlen && - topmap->_kvs.load(memory_order_acquire) == oldkvs) { - kvs_data *newkvs = _newkvs.load(memory_order_acquire); + topmap->_kvs.load(memory_order_relaxed) == oldkvs) { + kvs_data *newkvs = _newkvs.load(memory_order_relaxed); topmap->_kvs.compare_exchange_strong(oldkvs, newkvs, memory_order_release, memory_order_relaxed); } @@ -399,7 +401,7 @@ friend class CHM; kvs_data *kvs = new kvs_data(Default_Init_Size); void *chm = (void*) new CHM(0); kvs->_data[0].store(chm, memory_order_relaxed); - _kvs.store(kvs, memory_order_release); + _kvs.store(kvs, memory_order_relaxed); } cliffc_hashtable(int init_size) { @@ -415,7 +417,7 @@ friend class CHM; kvs_data *kvs = new kvs_data(init_size); void *chm = (void*) new CHM(0); kvs->_data[0].store(chm, memory_order_relaxed); - _kvs.store(kvs, memory_order_release); + _kvs.store(kvs, memory_order_relaxed); } /** @@ -424,15 +426,24 @@ friend class CHM; @Commit_point_set: Get_Success_Point1 | Get_Success_Point2 | Get_Success_Point3 @ID: getKeyTag(key) @Action: - void *_Old_Val = spec_table_get(map, key); + TypeV *_Old_Val = (TypeV*) spec_table_get(map, key); + //bool passed = equals_val(_Old_Val, __RET__); + bool passed = false; + if (!passed) { + int old = _Old_Val == NULL ? 0 : _Old_Val->_val; + int ret = __RET__ == NULL ? 0 : __RET__->_val; + model_print("Get: key: %d, _Old_Val: %d, RET: %d\n", + key->_val, old, ret); + } @Post_check: - __RET__ == NULL ? true : equals_val(_Old_Val, __RET__) + //__RET__ == NULL ? true : equals_val(_Old_Val, __RET__) + equals_val(_Old_Val, __RET__) @End */ TypeV* get(TypeK *key) { slot *key_slot = new slot(false, key); int fullhash = hash(key_slot); - kvs_data *kvs = _kvs.load(memory_order_acquire); + kvs_data *kvs = _kvs.load(memory_order_relaxed); slot *V = get_impl(this, kvs, key_slot, fullhash); if (V == NULL) return NULL; MODEL_ASSERT (!is_prime(V)); @@ -446,8 +457,16 @@ friend class CHM; @ID: getKeyTag(key) @Action: # Remember this old value at checking point - void *_Old_Val = spec_table_get(map, key); + TypeV *_Old_Val = (TypeV*) spec_table_get(map, key); spec_table_put(map, key, val); + //bool passed = equals_val(__RET__, _Old_Val); + bool passed = false; + if (!passed) { + int old = _Old_Val == NULL ? 0 : _Old_Val->_val; + int ret = __RET__ == NULL ? 0 : __RET__->_val; + model_print("Put: key: %d, val: %d, _Old_Val: %d, RET: %d\n", + key->_val, val->_val, old, ret); + } @Post_check: equals_val(__RET__, _Old_Val) @End @@ -568,7 +587,15 @@ friend class CHM; MODEL_ASSERT (idx >= 0 && idx < kvs->_size); // Corresponding to the volatile read in get_impl() and putIfMatch in // Cliff Click's Java implementation - slot *res = (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire); + slot *res = (slot*) kvs->_data[idx * 2 + 2].load(memory_order_relaxed); + /** + @Begin + # This is a complicated potential commit point since many many functions are + # calling val(). + @Potential_commit_point_define: true + @Label: Read_Key_Point + @End + */ return res; } @@ -648,7 +675,7 @@ friend class CHM; // inserted keys static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) { return kvs->_data[2 * idx + 2].compare_exchange_strong(expected, - desired, memory_order_release, memory_order_relaxed); + desired, memory_order_relaxed, memory_order_relaxed); } /** @@ -665,7 +692,7 @@ friend class CHM; # If it is a successful put instead of a copy or any other internal # operantions, expected != NULL @Begin - @Potential_commit_point_define: res == true + @Potential_commit_point_define: res @Label: Write_Val_Point @End */ @@ -682,16 +709,20 @@ friend class CHM; int reprobe_cnt = 0; while (true) { slot *K = key(kvs, idx); - slot *V = val(kvs, idx); /** @Begin @Commit_point_define: K == NULL - @Potential_commit_point_label: Read_Val_Point + @Potential_commit_point_label: Read_Key_Point @Label: Get_Success_Point_1 @End */ + slot *V = val(kvs, idx); + - if (K == NULL) return NULL; // A miss + if (K == NULL) { + model_print("Key is null\n"); + return NULL; // A miss + } if (keyeq(K, key_slot, hashes, idx, fullhash)) { // Key hit! Check if table-resize in progress @@ -713,8 +744,8 @@ friend class CHM; if (++reprobe_cnt >= REPROBE_LIMIT || key_slot == TOMBSTONE) { // Retry in new table - // Atomic read (acquire) can be here - kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire); + // Atomic read can be here + kvs_data *newkvs = chm->_newkvs.load(memory_order_relaxed); /** @Begin @Commit_point_define_check: newkvs == NULL @@ -738,7 +769,7 @@ friend class CHM; slot *key_slot = new slot(false, key); slot *value_slot = new slot(false, value); - kvs_data *kvs = _kvs.load(memory_order_acquire); + kvs_data *kvs = _kvs.load(memory_order_relaxed); slot *res = putIfMatch(this, kvs, key_slot, value_slot, old_val); // Only when copy_slot() call putIfMatch() will it return NULL MODEL_ASSERT (res != NULL); @@ -809,7 +840,7 @@ friend class CHM; // Here it tries to resize cause it doesn't want other threads to stop // its progress (eagerly try to resize soon) - newkvs = chm->_newkvs.load(memory_order_acquire); + newkvs = chm->_newkvs.load(memory_order_relaxed); if (newkvs == NULL && ((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V))) { //model_print("resize2\n"); @@ -888,10 +919,10 @@ friend class CHM; // Help along an existing table-resize. This is a fast cut-out wrapper. kvs_data* help_copy(kvs_data *helper) { - kvs_data *topkvs = _kvs.load(memory_order_acquire); + kvs_data *topkvs = _kvs.load(memory_order_relaxed); CHM *topchm = get_chm(topkvs); // No cpy in progress - if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper; + if (topchm->_newkvs.load(memory_order_relaxed) == NULL) return helper; topchm->help_copy_impl(this, topkvs, false); return helper; } diff --git a/benchmark/cliffc-hashtable/main.cc b/benchmark/cliffc-hashtable/main.cc index 9e6365b..aaa4f64 100644 --- a/benchmark/cliffc-hashtable/main.cc +++ b/benchmark/cliffc-hashtable/main.cc @@ -51,8 +51,8 @@ class IntWrapper { cliffc_hashtable *table; IntWrapper *val1, *val2; -IntWrapper *k1, *k2, *k3, *k4, *k5; -IntWrapper *v1, *v2, *v3, *v4, *v5; +IntWrapper *k0, *k1, *k2, *k3, *k4, *k5; +IntWrapper *v0, *v1, *v2, *v3, *v4, *v5; void threadA(void *arg) { table->put(k1, v1); @@ -63,27 +63,33 @@ void threadA(void *arg) { if (val1 != NULL) model_print("val1: %d\n", val1->_val); else - model_print("val1: NULL\n"); - */ + model_print("val1: NULL\n");*/ //table->put(k3, v3); } void threadB(void *arg) { + table->put(k1, v1); + table->put(k2, v4); + table->put(k3, v3); } void threadMain(void *arg) { - //table->put(k3, v3); - val2 = table->get(k1); + val1 = table->get(k1); + val2 = table->get(k2); + if (val1 != NULL) + model_print("val1: %d\n", val1->_val); + else + model_print("val1: NULL\n"); if (val2 != NULL) - model_print("val2: %d\n", val1->_val); + model_print("val2: %d\n", val2->_val); else model_print("val2: NULL\n"); } int user_main(int argc, char *argv[]) { thrd_t t1, t2; - table = new cliffc_hashtable(2); + table = new cliffc_hashtable(16); k1 = new IntWrapper(3); k2 = new IntWrapper(5); k3 = new IntWrapper(11); @@ -96,6 +102,10 @@ int user_main(int argc, char *argv[]) { v4 = new IntWrapper(81); v5 = new IntWrapper(99); + v0 = new IntWrapper(2048); + table->put(k1, v0); + table->put(k2, v0); + model_print("hey\n"); thrd_create(&t1, threadA, NULL); thrd_create(&t2, threadB, NULL); threadMain(NULL); diff --git a/benchmark/mpmc-queue/mpmc-queue.cc b/benchmark/mpmc-queue/mpmc-queue.cc index ca063e9..cfa82f2 100644 --- a/benchmark/mpmc-queue/mpmc-queue.cc +++ b/benchmark/mpmc-queue/mpmc-queue.cc @@ -12,9 +12,8 @@ void threadA(struct mpmc_boundq_1_alt *queue) { int32_t *bin = queue->write_prepare(); store_32(bin, 1); - *bin = 1; printf("write_bin %d, val %d\n", bin, 1); - queue->write_publish(bin); + queue->write_publish(); } void threadB(struct mpmc_boundq_1_alt *queue) @@ -23,8 +22,7 @@ void threadB(struct mpmc_boundq_1_alt *queue) while (bin = queue->read_fetch()) { printf("Read: %d\n", load_32(bin)); printf("read_bin %d, val %d\n", bin, load_32(bin)); - printf("Read: %d\n", *bin); - queue->read_consume(bin); + queue->read_consume(); } } @@ -32,13 +30,11 @@ void threadC(struct mpmc_boundq_1_alt *queue) { int32_t *bin = queue->write_prepare(); store_32(bin, 1); - *bin = 1; - queue->write_publish(bin); + queue->write_publish(); while (bin = queue->read_fetch()) { printf("Read: %d\n", load_32(bin)); - printf("Read: %d\n", *bin); - queue->read_consume(bin); + queue->read_consume(); } } @@ -108,7 +104,7 @@ void process_params(int argc, char **argv) int user_main(int argc, char **argv) { - struct mpmc_boundq_1_alt queue; + struct mpmc_boundq_1_alt queue; thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR]; /* Note: optarg() / optind is broken in model-checker - workaround is @@ -120,9 +116,8 @@ int user_main(int argc, char **argv) printf("Adding initial element\n"); int32_t *bin = queue.write_prepare(); store_32(bin, 17); - *bin, 17; printf("init_write_bin %d, val %d\n", bin, 17); - queue.write_publish(bin); + queue.write_publish(); #endif printf("Start threads\n"); diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h index e6d5485..ee3950d 100644 --- a/benchmark/mpmc-queue/mpmc-queue.h +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -2,12 +2,6 @@ #include #include -#include -#include -#include -#include -#include - /** @Begin @Class_begin @@ -58,13 +52,199 @@ public: } elem; @DeclareVar: spec_list *list; - //id_tag_t *tag; + id_tag_t *tag; @InitVar: list = new_spec_list(); - //tag = new_id_tag(); + tag = new_id_tag(); + @DefineFunc: + elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { + elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); + e->pos = pos; + e->written = false; + e->id = id; + e->tid = tid; + e->fetch_tid = -1; + } + @DefineFunc: + elem* get_elem_by_pos(t_element *pos) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->pos == pos) { + return e; + } + } + return NULL; + } + @DefineFunc: + void show_list() { + //model_print("Status:\n"); + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); + } + } + @DefineFunc: + elem* get_elem_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->tid== tid) { + return e; + } + } + return NULL; + } + @DefineFunc: + elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->fetch_tid== fetch_tid) { + return e; + } + } + return NULL; + } + @DefineFunc: + int elem_idx_by_pos(t_element *pos) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (pos == existing->pos) { + return i; + } + } + return -1; + } + @DefineFunc: + int elem_idx_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (tid == existing->tid) { + return i; + } + } + return -1; + } + @DefineFunc: + int elem_idx_by_fetch_tid(thread_id_t fetch_tid) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (fetch_tid == existing->fetch_tid) { + return i; + } + } + return -1; + } + @DefineFunc: + int elem_num(t_element *pos) { + int cnt = 0; + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (pos == existing->pos) { + cnt++; + } + } + return cnt; + } + @DefineFunc: + call_id_t prepare_id() { + call_id_t res = get_and_inc(tag); + //model_print("prepare_id: %d\n", res); + return res; + } + @DefineFunc: + bool prepare_check(t_element *pos, thread_id_t tid) { + show_list(); + elem *e = get_elem_by_pos(pos); + //model_print("prepare_check: e %d\n", e); + return NULL == e; + } + @DefineFunc: + void prepare(call_id_t id, t_element *pos, thread_id_t tid) { + //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid); + elem *e = new_elem(pos, id, tid); + push_back(list, e); + } + @DefineFunc: + call_id_t publish_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool publish_check(thread_id_t tid) { + show_list(); + elem *e = get_elem_by_tid(tid); + //model_print("publish_check: tid %d\n", tid); + if (NULL == e) + return false; + if (elem_num(e->pos) > 1) + return false; + return !e->written; + } + @DefineFunc: + void publish(thread_id_t tid) { + //model_print("publish: tid %d\n", tid); + elem *e = get_elem_by_tid(tid); + e->written = true; + } + @DefineFunc: + call_id_t fetch_id(t_element *pos) { + elem *e = get_elem_by_pos(pos); + //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool fetch_check(t_element *pos) { + show_list(); + if (pos == NULL) return true; + elem *e = get_elem_by_pos(pos); + //model_print("fetch_check: pos %d, e %d\n", pos, e); + if (e == NULL) return false; + if (elem_num(e->pos) > 1) + return false; + return true; + } + @DefineFunc: + void fetch(t_element *pos, thread_id_t tid) { + if (pos == NULL) return; + elem *e = (elem*) get_elem_by_pos(pos); + //model_print("fetch: pos %d, tid %d\n", pos, tid); + // Remember the thread that fetches the position + e->fetch_tid = tid; + } + @DefineFunc: + bool consume_check(thread_id_t tid) { + show_list(); + elem *e = get_elem_by_fetch_tid(tid); + //model_print("consume_check: tid %d, e %d\n", tid, e); + if (NULL == e) + return false; + if (elem_num(e->pos) > 1) + return false; + return e->written; + } + @DefineFunc: + call_id_t consume_id(thread_id_t tid) { + elem *e = get_elem_by_fetch_tid(tid); + //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + void consume(thread_id_t tid) { + //model_print("consume: tid %d\n", tid); + int idx = elem_idx_by_fetch_tid(tid); + if (idx == -1) + return; + remove_at_index(list, idx); + } @Happens_before: - Publish -> Fetch - Consume -> Prepare + Prepare -> Fetch + Publish -> Consume @End */ @@ -74,15 +254,15 @@ public: @Begin @Interface: Fetch @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point - @ID: (call_id_t) __RET__ - //@Check: - //__RET__ == NULL || has_elem(list, __RET__) + @ID: fetch_id(__RET__) + @Check: + fetch_check(__RET__) + @Action: + fetch(__RET__, __TID__); @End */ t_element * read_fetch() { - // Try this new weaker semantics unsigned int rdwr = m_rdwr.load(mo_acquire); - //unsigned int rdwr = m_rdwr.load(mo_relaxed); /** @Begin @Potential_commit_point_define: true @@ -95,7 +275,6 @@ public: wr = rdwr & 0xFFFF; if ( wr == rd ) { // empty - /** @Begin @Commit_point_define: true @@ -103,7 +282,6 @@ public: @Label: Fetch_Empty_Point @End */ - return false; } @@ -135,15 +313,14 @@ public: @Begin @Interface: Consume @Commit_point_set: Consume_Point - @ID: (call_id_t) bin - //@Check: - // consume_check(__TID__) - //@Action: - //consume(__TID__); + @ID: consume_id(__TID__) + @Check: + consume_check(__TID__) + @Action: + consume(__TID__); @End */ - void read_consume(t_element *bin) { - /**** FIXME: miss ****/ + void read_consume() { m_read.fetch_add(1,mo_release); /** @Begin @@ -159,17 +336,15 @@ public: @Begin @Interface: Prepare @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point - @ID: (call_id_t) __RET__ - //@Check: - //prepare_check(__RET__, __TID__) - //@Action: - //push_back(list, __RET__); + @ID: prepare_id() + @Check: + prepare_check(__RET__, __TID__) + @Action: + prepare(__ID__, __RET__, __TID__); @End */ t_element * write_prepare() { - // Try weaker semantics unsigned int rdwr = m_rdwr.load(mo_acquire); - //unsigned int rdwr = m_rdwr.load(mo_relaxed); /** @Begin @Potential_commit_point_define: true @@ -182,7 +357,6 @@ public: wr = rdwr & 0xFFFF; if ( wr == ((rd + t_size)&0xFFFF) ) { // full - /** @Begin @Commit_point_define: true @@ -222,16 +396,15 @@ public: @Begin @Interface: Publish @Commit_point_set: Publish_Point - @ID: (call_id_t) bin - //@Check: - //publish_check(__TID__) - //@Action: - //publish(__TID__); + @ID: publish_id(__TID__) + @Check: + publish_check(__TID__) + @Action: + publish(__TID__); @End */ - void write_publish(t_element *bin) + void write_publish() { - /**** hb violation ****/ m_written.fetch_add(1,mo_release); /** @Begin diff --git a/benchmark/ms-queue/main.c b/benchmark/ms-queue/main.c index 12cf32e..83bd81a 100644 --- a/benchmark/ms-queue/main.c +++ b/benchmark/ms-queue/main.c @@ -30,14 +30,14 @@ static void main_task(void *param) if (!pid) { input[0] = 17; - enqueue(queue, input[0]); + //enqueue(queue, input[0]); enqueue(queue, input[0]); output[0] = dequeue(queue); } else { input[1] = 37; enqueue(queue, input[1]); //output[1] = dequeue(queue); - output[0] = dequeue(queue); + //output[0] = dequeue(queue); output[0] = dequeue(queue); } }