From a3dc6b17a3b30e3295a201c3f7a185fd14d450f1 Mon Sep 17 00:00:00 2001 From: Peizhao Ou Date: Wed, 18 Nov 2015 04:50:08 -0800 Subject: [PATCH] full benchmard works --- benchmark/mpmc-queue/Makefile | 15 +- benchmark/mpmc-queue/mpmc-queue.h | 100 +++-- benchmark/mpmc-queue/mpmc-queue.h.backup | 425 ------------------ benchmark/mpmc-queue/testcase1.cc | 64 +++ benchmark/mpmc-queue/testcase2.cc | 99 ++++ benchmark/mpmc-queue/testcase3.cc | 119 +++++ output/mpmc-queue/Makefile | 15 +- .../codeGenerator/CodeGenerator.java | 3 + 8 files changed, 342 insertions(+), 498 deletions(-) delete mode 100644 benchmark/mpmc-queue/mpmc-queue.h.backup create mode 100644 benchmark/mpmc-queue/testcase1.cc create mode 100644 benchmark/mpmc-queue/testcase2.cc create mode 100644 benchmark/mpmc-queue/testcase3.cc diff --git a/benchmark/mpmc-queue/Makefile b/benchmark/mpmc-queue/Makefile index 8d9ad1e..7c20177 100644 --- a/benchmark/mpmc-queue/Makefile +++ b/benchmark/mpmc-queue/Makefile @@ -1,21 +1,10 @@ include ../benchmarks.mk -TESTNAME = mpmc-queue -TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr -TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit +TESTS = mpmc-queue testcase1 testcase2 testcase3 all: $(TESTS) -mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 -mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 -mpmc-1r2w: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 -mpmc-2r1w: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 -mpmc-queue-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT -mpmc-1r2w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT -mpmc-2r1w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 -DCONFIG_MPMC_NO_INITIAL_ELEMENT -mpmc-rdwr-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT - -$(TESTS): $(TESTNAME).cc $(TESTNAME).h +$(TESTS): % : %.cc mpmc-queue.h $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) clean: diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h index aa10129..dff6777 100644 --- a/benchmark/mpmc-queue/mpmc-queue.h +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -17,6 +17,8 @@ template struct mpmc_boundq_1_alt { private: + + unsigned int MASK; // elements should generally be cache-line-size padded : t_element m_array[t_size]; @@ -39,6 +41,8 @@ public: m_rdwr = 0; m_read = 0; m_written = 0; + // For this we want MASK = 1; MASK wrap around + MASK = 0x1; // 11 } @@ -48,26 +52,28 @@ public: LANG = CPP; CLASS = mpmc_boundq_1_alt; @Global_define: - //@DeclareStruct: - //typedef struct elem { - // t_element *pos; - // bool written; - // thread_id_t tid; - // thread_id_t fetch_tid; - // call_id_t id; - // } elem; - // @DeclareVar: - // spec_list *list; - //id_tag_t *tag; - // @InitVar: - // list = new_spec_list(); - //tag = new_id_tag(); - // @Cleanup: -// if (list) -// free_spec_list(); - @Happens_before: - Publish -> Fetch - Consume -> Prepare + @Happens_before: + Publish -> Fetch + Consume -> Prepare + @Commutativity: Prepare <-> Prepare: _Method1.__RET__ != + _Method2.__RET__ || !_Method1.__RET__ || !_Method2.__RET__ + @Commutativity: Prepare <-> Publish: _Method1.__RET__ != _Method2.bin || + !_Method1.__RET__ + @Commutativity: Prepare <-> Fetch: _Method1.__RET__ != _Method2.__RET__ + || !_Method1.__RET__ || !_Method2.__RET__ + @Commutativity: Prepare <-> Consume : _Method1.__RET__ != _Method2.bin || !_Method1.__RET__ + + @Commutativity: Publish <-> Publish: _Method1.bin != _Method2.bin + @Commutativity: Publish <-> Fetch: _Method1.bin != _Method2.__RET__ || + !_Method2.__RET__ + @Commutativity: Publish <-> Consume : _Method1.bin != _Method2.bin + + @Commutativity: Fetch <-> Fetch: _Method1.__RET__ != _Method2.__RET__ || + !_Method1.__RET__ || !_Method2.__RET__ + @Commutativity: Fetch <-> Consume : _Method1.__RET__ != _Method2.bin || !_Method1.__RET__ + + @Commutativity: Consume <-> Consume : _Method1.bin != _Method2.bin + @End */ @@ -78,12 +84,11 @@ public: @Interface: Fetch @Commit_point_set: Fetch_RW_Load_Empty | Fetch_RW_RMW | Fetch_W_Load @ID: (call_id_t) __RET__ - //@Check: - //__RET__ == NULL || has_elem(list, __RET__) + //@Action: model_print("Fetch: %d\n", __RET__); @End */ t_element * read_fetch() { - // Try this new weaker semantics + // Since we have a lool to CAS the value of m_rdwr, this can be relaxed unsigned int rdwr = m_rdwr.load(mo_acquire); //unsigned int rdwr = m_rdwr.load(mo_relaxed); /** @@ -94,8 +99,8 @@ public: */ unsigned int rd,wr; for(;;) { - rd = (rdwr>>16) & 0xFFFF; - wr = rdwr & 0xFFFF; + rd = (rdwr>>16) & MASK; + wr = rdwr & MASK; if ( wr == rd ) { // empty @@ -110,7 +115,9 @@ public: return false; } - bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel); + /**** Inadmissibility (testcase2.cc, MASK = 1, size = 1) ****/ + bool succ = + m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel); /** @Begin @Commit_point_define_check: succ @@ -126,6 +133,7 @@ public: // (*1) rl::backoff bo; while (true) { + /**** Inadmissibility ****/ int written = m_written.load(mo_acquire); /** @Begin @@ -133,9 +141,10 @@ public: @Label: Fetch_Potential_W_Load @End */ - if ((written & 0xFFFF) != wr) { + if ((written & MASK) != wr) { thrd_yield(); } else { + printf("Fetch: m_written=%d\n", written); break; } } @@ -158,14 +167,11 @@ public: @Interface: Consume @Commit_point_set: Consume_R_RMW @ID: (call_id_t) bin - //@Check: - // consume_check(__TID__) - //@Action: - //consume(__TID__); + //@Action: model_print("Consume: %d\n", bin); @End */ void read_consume(t_element *bin) { - /**** FIXME: miss ****/ + /**** Inadmissibility ****/ m_read.fetch_add(1,mo_release); /** @Begin @@ -182,14 +188,12 @@ public: @Interface: Prepare @Commit_point_set: Prepare_RW_Load_Full | Prepare_RW_RMW | Prepare_R_Load @ID: (call_id_t) __RET__ - //@Check: - //prepare_check(__RET__, __TID__) - //@Action: - //push_back(list, __RET__); + //@Action: model_print("Prepare: %d\n", __RET__); @End */ t_element * write_prepare() { // Try weaker semantics + // Since we have a lool to CAS the value of m_rdwr, this can be relaxed unsigned int rdwr = m_rdwr.load(mo_acquire); //unsigned int rdwr = m_rdwr.load(mo_relaxed); /** @@ -200,10 +204,11 @@ public: */ unsigned int rd,wr; for(;;) { - rd = (rdwr>>16) & 0xFFFF; - wr = rdwr & 0xFFFF; + rd = (rdwr>>16) & MASK; + wr = rdwr & MASK; + //printf("write_prepare: rd=%d, wr=%d\n", rd, wr); - if ( wr == ((rd + t_size)&0xFFFF) ) { // full + if ( wr == ((rd + t_size)&MASK) ) { // full /** @Begin @@ -215,14 +220,16 @@ public: return NULL; } + /**** Inadmissibility (testcase3.cc, MASK = 1, size = 1) ****/ bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | - ((wr+1)&0xFFFF),mo_acq_rel); + ((wr+1)&MASK),mo_acq_rel); /** @Begin @Commit_point_define_check: succ @Label: Prepare_RW_RMW @End */ + //printf("wr=%d\n", (wr+1)&MASK); if (succ) break; else @@ -232,6 +239,7 @@ public: // (*1) rl::backoff bo; while (true) { + /**** Inadmissibility ****/ int read = m_read.load(mo_acquire); /** @Begin @@ -239,7 +247,7 @@ public: @Label: Prepare_Potential_R_Load @End */ - if ((read & 0xFFFF) != rd) + if ((read & MASK) != rd) thrd_yield(); else break; @@ -263,22 +271,20 @@ public: @Interface: Publish @Commit_point_set: Publish_W_RMW @ID: (call_id_t) bin - //@Check: - //publish_check(__TID__) - //@Action: - //publish(__TID__); + //@Action: model_print("Publish: %d\n", bin); @End */ void write_publish(t_element *bin) { - /**** hb violation ****/ - m_written.fetch_add(1,mo_release); + /**** Inadmissibility ****/ + int tmp = m_written.fetch_add(1,mo_release); /** @Begin @Commit_point_define_check: true @Label: Publish_W_RMW @End */ + printf("publish: m_written=%d\n", tmp + 1); } //----------------------------------------------------- diff --git a/benchmark/mpmc-queue/mpmc-queue.h.backup b/benchmark/mpmc-queue/mpmc-queue.h.backup deleted file mode 100644 index ee3950d..0000000 --- a/benchmark/mpmc-queue/mpmc-queue.h.backup +++ /dev/null @@ -1,425 +0,0 @@ -#include -#include -#include - -/** - @Begin - @Class_begin - @End -*/ -template -struct mpmc_boundq_1_alt -{ -private: - - // elements should generally be cache-line-size padded : - t_element m_array[t_size]; - - // rdwr counts the reads & writes that have started - atomic m_rdwr; - // "read" and "written" count the number completed - atomic m_read; - atomic m_written; - -public: - - mpmc_boundq_1_alt() - { - /** - @Begin - @Entry_point - @End - */ - m_rdwr = 0; - m_read = 0; - m_written = 0; - } - - - /** - @Begin - @Options: - LANG = CPP; - CLASS = mpmc_boundq_1_alt; - @Global_define: - @DeclareStruct: - typedef struct elem { - t_element *pos; - bool written; - thread_id_t tid; - thread_id_t fetch_tid; - call_id_t id; - } elem; - @DeclareVar: - spec_list *list; - id_tag_t *tag; - @InitVar: - list = new_spec_list(); - tag = new_id_tag(); - @DefineFunc: - elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { - elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); - e->pos = pos; - e->written = false; - e->id = id; - e->tid = tid; - e->fetch_tid = -1; - } - @DefineFunc: - elem* get_elem_by_pos(t_element *pos) { - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - if (e->pos == pos) { - return e; - } - } - return NULL; - } - @DefineFunc: - void show_list() { - //model_print("Status:\n"); - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); - } - } - @DefineFunc: - elem* get_elem_by_tid(thread_id_t tid) { - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - if (e->tid== tid) { - return e; - } - } - return NULL; - } - @DefineFunc: - elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) { - for (int i = 0; i < size(list); i++) { - elem *e = (elem*) elem_at_index(list, i); - if (e->fetch_tid== fetch_tid) { - return e; - } - } - return NULL; - } - @DefineFunc: - int elem_idx_by_pos(t_element *pos) { - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (pos == existing->pos) { - return i; - } - } - return -1; - } - @DefineFunc: - int elem_idx_by_tid(thread_id_t tid) { - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (tid == existing->tid) { - return i; - } - } - return -1; - } - @DefineFunc: - int elem_idx_by_fetch_tid(thread_id_t fetch_tid) { - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (fetch_tid == existing->fetch_tid) { - return i; - } - } - return -1; - } - @DefineFunc: - int elem_num(t_element *pos) { - int cnt = 0; - for (int i = 0; i < size(list); i++) { - elem *existing = (elem*) elem_at_index(list, i); - if (pos == existing->pos) { - cnt++; - } - } - return cnt; - } - @DefineFunc: - call_id_t prepare_id() { - call_id_t res = get_and_inc(tag); - //model_print("prepare_id: %d\n", res); - return res; - } - @DefineFunc: - bool prepare_check(t_element *pos, thread_id_t tid) { - show_list(); - elem *e = get_elem_by_pos(pos); - //model_print("prepare_check: e %d\n", e); - return NULL == e; - } - @DefineFunc: - void prepare(call_id_t id, t_element *pos, thread_id_t tid) { - //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid); - elem *e = new_elem(pos, id, tid); - push_back(list, e); - } - @DefineFunc: - call_id_t publish_id(thread_id_t tid) { - elem *e = get_elem_by_tid(tid); - //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id); - if (NULL == e) - return DEFAULT_CALL_ID; - return e->id; - } - @DefineFunc: - bool publish_check(thread_id_t tid) { - show_list(); - elem *e = get_elem_by_tid(tid); - //model_print("publish_check: tid %d\n", tid); - if (NULL == e) - return false; - if (elem_num(e->pos) > 1) - return false; - return !e->written; - } - @DefineFunc: - void publish(thread_id_t tid) { - //model_print("publish: tid %d\n", tid); - elem *e = get_elem_by_tid(tid); - e->written = true; - } - @DefineFunc: - call_id_t fetch_id(t_element *pos) { - elem *e = get_elem_by_pos(pos); - //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id); - if (NULL == e) - return DEFAULT_CALL_ID; - return e->id; - } - @DefineFunc: - bool fetch_check(t_element *pos) { - show_list(); - if (pos == NULL) return true; - elem *e = get_elem_by_pos(pos); - //model_print("fetch_check: pos %d, e %d\n", pos, e); - if (e == NULL) return false; - if (elem_num(e->pos) > 1) - return false; - return true; - } - @DefineFunc: - void fetch(t_element *pos, thread_id_t tid) { - if (pos == NULL) return; - elem *e = (elem*) get_elem_by_pos(pos); - //model_print("fetch: pos %d, tid %d\n", pos, tid); - // Remember the thread that fetches the position - e->fetch_tid = tid; - } - @DefineFunc: - bool consume_check(thread_id_t tid) { - show_list(); - elem *e = get_elem_by_fetch_tid(tid); - //model_print("consume_check: tid %d, e %d\n", tid, e); - if (NULL == e) - return false; - if (elem_num(e->pos) > 1) - return false; - return e->written; - } - @DefineFunc: - call_id_t consume_id(thread_id_t tid) { - elem *e = get_elem_by_fetch_tid(tid); - //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id); - if (NULL == e) - return DEFAULT_CALL_ID; - return e->id; - } - @DefineFunc: - void consume(thread_id_t tid) { - //model_print("consume: tid %d\n", tid); - int idx = elem_idx_by_fetch_tid(tid); - if (idx == -1) - return; - remove_at_index(list, idx); - } - @Happens_before: - Prepare -> Fetch - Publish -> Consume - @End - */ - - //----------------------------------------------------- - - /** - @Begin - @Interface: Fetch - @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point - @ID: fetch_id(__RET__) - @Check: - fetch_check(__RET__) - @Action: - fetch(__RET__, __TID__); - @End - */ - t_element * read_fetch() { - unsigned int rdwr = m_rdwr.load(mo_acquire); - /** - @Begin - @Potential_commit_point_define: true - @Label: Fetch_Potential_Point - @End - */ - unsigned int rd,wr; - for(;;) { - rd = (rdwr>>16) & 0xFFFF; - wr = rdwr & 0xFFFF; - - if ( wr == rd ) { // empty - /** - @Begin - @Commit_point_define: true - @Potential_commit_point_label: Fetch_Potential_Point - @Label: Fetch_Empty_Point - @End - */ - return false; - } - - bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel); - /** - @Begin - @Commit_point_define_check: succ == true - @Label: Fetch_Succ_Point - @End - */ - if (succ) - break; - else - thrd_yield(); - } - - // (*1) - rl::backoff bo; - while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) { - thrd_yield(); - } - - t_element * p = & ( m_array[ rd % t_size ] ); - - return p; - } - - /** - @Begin - @Interface: Consume - @Commit_point_set: Consume_Point - @ID: consume_id(__TID__) - @Check: - consume_check(__TID__) - @Action: - consume(__TID__); - @End - */ - void read_consume() { - m_read.fetch_add(1,mo_release); - /** - @Begin - @Commit_point_define_check: true - @Label: Consume_Point - @End - */ - } - - //----------------------------------------------------- - - /** - @Begin - @Interface: Prepare - @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point - @ID: prepare_id() - @Check: - prepare_check(__RET__, __TID__) - @Action: - prepare(__ID__, __RET__, __TID__); - @End - */ - t_element * write_prepare() { - unsigned int rdwr = m_rdwr.load(mo_acquire); - /** - @Begin - @Potential_commit_point_define: true - @Label: Prepare_Potential_Point - @End - */ - unsigned int rd,wr; - for(;;) { - rd = (rdwr>>16) & 0xFFFF; - wr = rdwr & 0xFFFF; - - if ( wr == ((rd + t_size)&0xFFFF) ) { // full - /** - @Begin - @Commit_point_define: true - @Potential_commit_point_label: Prepare_Potential_Point - @Label: Prepare_Full_Point - @End - */ - return NULL; - } - - bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | - ((wr+1)&0xFFFF),mo_acq_rel); - /** - @Begin - @Commit_point_define_check: succ == true - @Label: Prepare_Succ_Point - @End - */ - if (succ) - break; - else - thrd_yield(); - } - - // (*1) - rl::backoff bo; - while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) { - thrd_yield(); - } - - t_element * p = & ( m_array[ wr % t_size ] ); - - return p; - } - - /** - @Begin - @Interface: Publish - @Commit_point_set: Publish_Point - @ID: publish_id(__TID__) - @Check: - publish_check(__TID__) - @Action: - publish(__TID__); - @End - */ - void write_publish() - { - m_written.fetch_add(1,mo_release); - /** - @Begin - @Commit_point_define_check: true - @Label: Publish_Point - @End - */ - } - - //----------------------------------------------------- - - -}; -/** - @Begin - @Class_end - @End -*/ diff --git a/benchmark/mpmc-queue/testcase1.cc b/benchmark/mpmc-queue/testcase1.cc new file mode 100644 index 0000000..594564f --- /dev/null +++ b/benchmark/mpmc-queue/testcase1.cc @@ -0,0 +1,64 @@ +#include +#include +#include +#include +#include + +#include + +#include "mpmc-queue.h" + +void threadA(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + bin = queue->write_prepare(); + if (bin) { + *bin = 1; + printf("write_bin %d, val %d\n", bin, 1); + queue->write_publish(bin); + } else { + printf("write failed\n"); + } +} + +void threadB(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + bin = queue->read_fetch(); + if (bin) { + printf("read_bin: %d, val %d\n", bin, *bin); + queue->read_consume(bin); + } else { + printf("Read failed\n"); + } +} + +int user_main(int argc, char **argv) +{ + struct mpmc_boundq_1_alt queue; + thrd_t A, B; + + printf("Adding initial element\n"); + int32_t *bin; + bin = queue.write_prepare(); + *bin = 17; + printf("init_write_bin %d, val %d\n", bin, 17); + queue.write_publish(bin); + + bin = queue.write_prepare(); + *bin = 27; + printf("init_write_bin %d, val %d\n", bin, 27); + queue.write_publish(bin); + + + printf("Start threads\n"); + + thrd_create(&A, (thrd_start_t)&threadA, &queue); + thrd_create(&B, (thrd_start_t)&threadB, &queue); + + thrd_join(A); + thrd_join(B); + printf("Threads complete\n"); + + return 0; +} diff --git a/benchmark/mpmc-queue/testcase2.cc b/benchmark/mpmc-queue/testcase2.cc new file mode 100644 index 0000000..a26f981 --- /dev/null +++ b/benchmark/mpmc-queue/testcase2.cc @@ -0,0 +1,99 @@ +#include +#include +#include +#include +#include + +#include + +#include "mpmc-queue.h" + +void threadA(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + /* + bin = queue->write_prepare(); + if (bin) { + *bin = 1; + printf("write_bin %d, val %d\n", bin, 1); + queue->write_publish(bin); + } else { + printf("write failed\n"); + } + */ + + for (int i = 0; i < 1; i++) { + bin = queue->write_prepare(); + if (bin) { + *bin = 1; + queue->write_publish(bin); + printf("write_bin %d, val %d\n", bin, 1); + } else { + printf("write failed\n"); + } + + bin = queue->read_fetch(); + if (bin) { + printf("read_bin: %d, val %d\n", bin, *bin); + queue->read_consume(bin); + } else { + printf("read failed\n"); + } + } + +} + +void threadB(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + for (int i = 0; i < 1; i++) { + bin = queue->read_fetch(); + if (bin) { + printf("read_bin: %d, val %d\n", bin, *bin); + queue->read_consume(bin); + } else { + printf("read failed\n"); + } + } + + +} + +int user_main(int argc, char **argv) +{ + struct mpmc_boundq_1_alt queue; + thrd_t A, A1, B; + + printf("Adding initial element\n"); + int32_t *bin; + for (int i = 0; i < 1; i++) { + printf("#%d, \n", i); + bin = queue.write_prepare(); + *bin = 17; + printf("init_write_bin %d, val %d\n", bin, 17); + queue.write_publish(bin); + + bin = queue.read_fetch(); + if (bin) { + printf("init_read: %d, val %d\n", bin, *bin); + queue.read_consume(bin); + } + } + + for (int i = 0; i < 3; i++) { + + } + + printf("Start threads\n"); + + thrd_create(&A, (thrd_start_t)&threadA, &queue); + thrd_create(&A1, (thrd_start_t)&threadA, &queue); + thrd_create(&B, (thrd_start_t)&threadB, &queue); + + thrd_join(A); + thrd_join(A1); + thrd_join(B); + printf("Threads complete\n"); + + return 0; +} diff --git a/benchmark/mpmc-queue/testcase3.cc b/benchmark/mpmc-queue/testcase3.cc new file mode 100644 index 0000000..b171c75 --- /dev/null +++ b/benchmark/mpmc-queue/testcase3.cc @@ -0,0 +1,119 @@ +#include +#include +#include +#include +#include + +#include + +#include "mpmc-queue.h" + +void threadA(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + /* + bin = queue->write_prepare(); + if (bin) { + *bin = 1; + printf("write_bin %d, val %d\n", bin, 1); + queue->write_publish(bin); + } else { + printf("write failed\n"); + } + */ + + for (int i = 0; i < 1; i++) { + bin = queue->write_prepare(); + if (bin) { + *bin = 1; + queue->write_publish(bin); + printf("write_bin %d, val %d\n", bin, 1); + } else { + printf("write failed\n"); + } + + bin = queue->read_fetch(); + if (bin) { + printf("read_bin: %d, val %d\n", bin, *bin); + queue->read_consume(bin); + } else { + printf("read failed\n"); + } + } + +} + +void threadB(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + for (int i = 0; i < 1; i++) { + bin = queue->read_fetch(); + if (bin) { + printf("read_bin: %d, val %d\n", bin, *bin); + queue->read_consume(bin); + } else { + printf("read failed\n"); + } + } + + +} + +void threadC(struct mpmc_boundq_1_alt *queue) +{ + int *bin; + bin = queue->write_prepare(); + if (bin) { + *bin = 1; + queue->write_publish(bin); + printf("write_bin %d, val %d\n", bin, 1); + } else { + printf("write failed\n"); + } +} + +int user_main(int argc, char **argv) +{ + struct mpmc_boundq_1_alt queue; + thrd_t A, A1, B, B1, C, C1; + + printf("Adding initial element\n"); + int32_t *bin; + for (int i = 0; i < 0; i++) { + printf("#%d, \n", i); + bin = queue.write_prepare(); + *bin = 17; + printf("init_write_bin %d, val %d\n", bin, 17); + queue.write_publish(bin); +/* + bin = queue.read_fetch(); + if (bin) { + printf("init_read: %d, val %d\n", bin, *bin); + queue.read_consume(bin); + } + */ + } + + for (int i = 0; i < 3; i++) { + + } + + printf("Start threads\n"); + + thrd_create(&A, (thrd_start_t)&threadB, &queue); + thrd_create(&A1, (thrd_start_t)&threadC, &queue); + thrd_create(&B, (thrd_start_t)&threadB, &queue); + thrd_create(&B1, (thrd_start_t)&threadC, &queue); + thrd_create(&C, (thrd_start_t)&threadB, &queue); + thrd_create(&C1, (thrd_start_t)&threadC, &queue); + + thrd_join(A); + thrd_join(A1); + thrd_join(B); + thrd_join(B1); + thrd_join(C); + thrd_join(C1); + printf("Threads complete\n"); + + return 0; +} diff --git a/output/mpmc-queue/Makefile b/output/mpmc-queue/Makefile index 8d9ad1e..7c20177 100644 --- a/output/mpmc-queue/Makefile +++ b/output/mpmc-queue/Makefile @@ -1,21 +1,10 @@ include ../benchmarks.mk -TESTNAME = mpmc-queue -TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr -TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit +TESTS = mpmc-queue testcase1 testcase2 testcase3 all: $(TESTS) -mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 -mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 -mpmc-1r2w: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 -mpmc-2r1w: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 -mpmc-queue-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT -mpmc-1r2w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT -mpmc-2r1w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 -DCONFIG_MPMC_NO_INITIAL_ELEMENT -mpmc-rdwr-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT - -$(TESTS): $(TESTNAME).cc $(TESTNAME).h +$(TESTS): % : %.cc mpmc-queue.h $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) clean: diff --git a/src/edu/uci/eecs/specCompiler/codeGenerator/CodeGenerator.java b/src/edu/uci/eecs/specCompiler/codeGenerator/CodeGenerator.java index 492263f..7491e70 100644 --- a/src/edu/uci/eecs/specCompiler/codeGenerator/CodeGenerator.java +++ b/src/edu/uci/eecs/specCompiler/codeGenerator/CodeGenerator.java @@ -336,6 +336,9 @@ public class CodeGenerator { File[] srcMPMCQueue = { new File(homeDir + "/benchmark/mpmc-queue/mpmc-queue.h"), + new File(homeDir + "/benchmark/mpmc-queue/testcase1.cc"), + new File(homeDir + "/benchmark/mpmc-queue/testcase2.cc"), + new File(homeDir + "/benchmark/mpmc-queue/testcase3.cc"), new File(homeDir + "/benchmark/mpmc-queue/mpmc-queue.cc") }; // // File[][] sources = {srcLinuxRWLock1 , srcMSQueue, srcRCU, -- 2.34.1