include ../benchmarks.mk
-TESTNAME = mpmc-queue
-TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr
-TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit
+TESTS = mpmc-queue testcase1 testcase2 testcase3
all: $(TESTS)
-mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2
-mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2
-mpmc-1r2w: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2
-mpmc-2r1w: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1
-mpmc-queue-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-mpmc-1r2w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-mpmc-2r1w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-mpmc-rdwr-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-
-$(TESTS): $(TESTNAME).cc $(TESTNAME).h
+$(TESTS): % : %.cc mpmc-queue.h
$(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
clean:
struct mpmc_boundq_1_alt
{
private:
+
+ unsigned int MASK;
// elements should generally be cache-line-size padded :
t_element m_array[t_size];
m_rdwr = 0;
m_read = 0;
m_written = 0;
+ // For this we want MASK = 1; MASK wrap around
+ MASK = 0x1; // 11
}
LANG = CPP;
CLASS = mpmc_boundq_1_alt;
@Global_define:
- //@DeclareStruct:
- //typedef struct elem {
- // t_element *pos;
- // bool written;
- // thread_id_t tid;
- // thread_id_t fetch_tid;
- // call_id_t id;
- // } elem;
- // @DeclareVar:
- // spec_list *list;
- //id_tag_t *tag;
- // @InitVar:
- // list = new_spec_list();
- //tag = new_id_tag();
- // @Cleanup:
-// if (list)
-// free_spec_list();
- @Happens_before:
- Publish -> Fetch
- Consume -> Prepare
+ @Happens_before:
+ Publish -> Fetch
+ Consume -> Prepare
+ @Commutativity: Prepare <-> Prepare: _Method1.__RET__ !=
+ _Method2.__RET__ || !_Method1.__RET__ || !_Method2.__RET__
+ @Commutativity: Prepare <-> Publish: _Method1.__RET__ != _Method2.bin ||
+ !_Method1.__RET__
+ @Commutativity: Prepare <-> Fetch: _Method1.__RET__ != _Method2.__RET__
+ || !_Method1.__RET__ || !_Method2.__RET__
+ @Commutativity: Prepare <-> Consume : _Method1.__RET__ != _Method2.bin || !_Method1.__RET__
+
+ @Commutativity: Publish <-> Publish: _Method1.bin != _Method2.bin
+ @Commutativity: Publish <-> Fetch: _Method1.bin != _Method2.__RET__ ||
+ !_Method2.__RET__
+ @Commutativity: Publish <-> Consume : _Method1.bin != _Method2.bin
+
+ @Commutativity: Fetch <-> Fetch: _Method1.__RET__ != _Method2.__RET__ ||
+ !_Method1.__RET__ || !_Method2.__RET__
+ @Commutativity: Fetch <-> Consume : _Method1.__RET__ != _Method2.bin || !_Method1.__RET__
+
+ @Commutativity: Consume <-> Consume : _Method1.bin != _Method2.bin
+
@End
*/
@Interface: Fetch
@Commit_point_set: Fetch_RW_Load_Empty | Fetch_RW_RMW | Fetch_W_Load
@ID: (call_id_t) __RET__
- //@Check:
- //__RET__ == NULL || has_elem(list, __RET__)
+ //@Action: model_print("Fetch: %d\n", __RET__);
@End
*/
t_element * read_fetch() {
- // Try this new weaker semantics
+ // Since we have a lool to CAS the value of m_rdwr, this can be relaxed
unsigned int rdwr = m_rdwr.load(mo_acquire);
//unsigned int rdwr = m_rdwr.load(mo_relaxed);
/**
*/
unsigned int rd,wr;
for(;;) {
- rd = (rdwr>>16) & 0xFFFF;
- wr = rdwr & 0xFFFF;
+ rd = (rdwr>>16) & MASK;
+ wr = rdwr & MASK;
if ( wr == rd ) { // empty
return false;
}
- bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
+ /**** Inadmissibility (testcase2.cc, MASK = 1, size = 1) ****/
+ bool succ =
+ m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
/**
@Begin
@Commit_point_define_check: succ
// (*1)
rl::backoff bo;
while (true) {
+ /**** Inadmissibility ****/
int written = m_written.load(mo_acquire);
/**
@Begin
@Label: Fetch_Potential_W_Load
@End
*/
- if ((written & 0xFFFF) != wr) {
+ if ((written & MASK) != wr) {
thrd_yield();
} else {
+ printf("Fetch: m_written=%d\n", written);
break;
}
}
@Interface: Consume
@Commit_point_set: Consume_R_RMW
@ID: (call_id_t) bin
- //@Check:
- // consume_check(__TID__)
- //@Action:
- //consume(__TID__);
+ //@Action: model_print("Consume: %d\n", bin);
@End
*/
void read_consume(t_element *bin) {
- /**** FIXME: miss ****/
+ /**** Inadmissibility ****/
m_read.fetch_add(1,mo_release);
/**
@Begin
@Interface: Prepare
@Commit_point_set: Prepare_RW_Load_Full | Prepare_RW_RMW | Prepare_R_Load
@ID: (call_id_t) __RET__
- //@Check:
- //prepare_check(__RET__, __TID__)
- //@Action:
- //push_back(list, __RET__);
+ //@Action: model_print("Prepare: %d\n", __RET__);
@End
*/
t_element * write_prepare() {
// Try weaker semantics
+ // Since we have a lool to CAS the value of m_rdwr, this can be relaxed
unsigned int rdwr = m_rdwr.load(mo_acquire);
//unsigned int rdwr = m_rdwr.load(mo_relaxed);
/**
*/
unsigned int rd,wr;
for(;;) {
- rd = (rdwr>>16) & 0xFFFF;
- wr = rdwr & 0xFFFF;
+ rd = (rdwr>>16) & MASK;
+ wr = rdwr & MASK;
+ //printf("write_prepare: rd=%d, wr=%d\n", rd, wr);
- if ( wr == ((rd + t_size)&0xFFFF) ) { // full
+ if ( wr == ((rd + t_size)&MASK) ) { // full
/**
@Begin
return NULL;
}
+ /**** Inadmissibility (testcase3.cc, MASK = 1, size = 1) ****/
bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
- ((wr+1)&0xFFFF),mo_acq_rel);
+ ((wr+1)&MASK),mo_acq_rel);
/**
@Begin
@Commit_point_define_check: succ
@Label: Prepare_RW_RMW
@End
*/
+ //printf("wr=%d\n", (wr+1)&MASK);
if (succ)
break;
else
// (*1)
rl::backoff bo;
while (true) {
+ /**** Inadmissibility ****/
int read = m_read.load(mo_acquire);
/**
@Begin
@Label: Prepare_Potential_R_Load
@End
*/
- if ((read & 0xFFFF) != rd)
+ if ((read & MASK) != rd)
thrd_yield();
else
break;
@Interface: Publish
@Commit_point_set: Publish_W_RMW
@ID: (call_id_t) bin
- //@Check:
- //publish_check(__TID__)
- //@Action:
- //publish(__TID__);
+ //@Action: model_print("Publish: %d\n", bin);
@End
*/
void write_publish(t_element *bin)
{
- /**** hb violation ****/
- m_written.fetch_add(1,mo_release);
+ /**** Inadmissibility ****/
+ int tmp = m_written.fetch_add(1,mo_release);
/**
@Begin
@Commit_point_define_check: true
@Label: Publish_W_RMW
@End
*/
+ printf("publish: m_written=%d\n", tmp + 1);
}
//-----------------------------------------------------
+++ /dev/null
-#include <stdatomic.h>
-#include <unrelacy.h>
-#include <common.h>
-
-/**
- @Begin
- @Class_begin
- @End
-*/
-template <typename t_element, size_t t_size>
-struct mpmc_boundq_1_alt
-{
-private:
-
- // elements should generally be cache-line-size padded :
- t_element m_array[t_size];
-
- // rdwr counts the reads & writes that have started
- atomic<unsigned int> m_rdwr;
- // "read" and "written" count the number completed
- atomic<unsigned int> m_read;
- atomic<unsigned int> m_written;
-
-public:
-
- mpmc_boundq_1_alt()
- {
- /**
- @Begin
- @Entry_point
- @End
- */
- m_rdwr = 0;
- m_read = 0;
- m_written = 0;
- }
-
-
- /**
- @Begin
- @Options:
- LANG = CPP;
- CLASS = mpmc_boundq_1_alt;
- @Global_define:
- @DeclareStruct:
- typedef struct elem {
- t_element *pos;
- bool written;
- thread_id_t tid;
- thread_id_t fetch_tid;
- call_id_t id;
- } elem;
- @DeclareVar:
- spec_list *list;
- id_tag_t *tag;
- @InitVar:
- list = new_spec_list();
- tag = new_id_tag();
- @DefineFunc:
- elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
- elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
- e->pos = pos;
- e->written = false;
- e->id = id;
- e->tid = tid;
- e->fetch_tid = -1;
- }
- @DefineFunc:
- elem* get_elem_by_pos(t_element *pos) {
- for (int i = 0; i < size(list); i++) {
- elem *e = (elem*) elem_at_index(list, i);
- if (e->pos == pos) {
- return e;
- }
- }
- return NULL;
- }
- @DefineFunc:
- void show_list() {
- //model_print("Status:\n");
- for (int i = 0; i < size(list); i++) {
- elem *e = (elem*) elem_at_index(list, i);
- //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id);
- }
- }
- @DefineFunc:
- elem* get_elem_by_tid(thread_id_t tid) {
- for (int i = 0; i < size(list); i++) {
- elem *e = (elem*) elem_at_index(list, i);
- if (e->tid== tid) {
- return e;
- }
- }
- return NULL;
- }
- @DefineFunc:
- elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
- for (int i = 0; i < size(list); i++) {
- elem *e = (elem*) elem_at_index(list, i);
- if (e->fetch_tid== fetch_tid) {
- return e;
- }
- }
- return NULL;
- }
- @DefineFunc:
- int elem_idx_by_pos(t_element *pos) {
- for (int i = 0; i < size(list); i++) {
- elem *existing = (elem*) elem_at_index(list, i);
- if (pos == existing->pos) {
- return i;
- }
- }
- return -1;
- }
- @DefineFunc:
- int elem_idx_by_tid(thread_id_t tid) {
- for (int i = 0; i < size(list); i++) {
- elem *existing = (elem*) elem_at_index(list, i);
- if (tid == existing->tid) {
- return i;
- }
- }
- return -1;
- }
- @DefineFunc:
- int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
- for (int i = 0; i < size(list); i++) {
- elem *existing = (elem*) elem_at_index(list, i);
- if (fetch_tid == existing->fetch_tid) {
- return i;
- }
- }
- return -1;
- }
- @DefineFunc:
- int elem_num(t_element *pos) {
- int cnt = 0;
- for (int i = 0; i < size(list); i++) {
- elem *existing = (elem*) elem_at_index(list, i);
- if (pos == existing->pos) {
- cnt++;
- }
- }
- return cnt;
- }
- @DefineFunc:
- call_id_t prepare_id() {
- call_id_t res = get_and_inc(tag);
- //model_print("prepare_id: %d\n", res);
- return res;
- }
- @DefineFunc:
- bool prepare_check(t_element *pos, thread_id_t tid) {
- show_list();
- elem *e = get_elem_by_pos(pos);
- //model_print("prepare_check: e %d\n", e);
- return NULL == e;
- }
- @DefineFunc:
- void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
- //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
- elem *e = new_elem(pos, id, tid);
- push_back(list, e);
- }
- @DefineFunc:
- call_id_t publish_id(thread_id_t tid) {
- elem *e = get_elem_by_tid(tid);
- //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
- if (NULL == e)
- return DEFAULT_CALL_ID;
- return e->id;
- }
- @DefineFunc:
- bool publish_check(thread_id_t tid) {
- show_list();
- elem *e = get_elem_by_tid(tid);
- //model_print("publish_check: tid %d\n", tid);
- if (NULL == e)
- return false;
- if (elem_num(e->pos) > 1)
- return false;
- return !e->written;
- }
- @DefineFunc:
- void publish(thread_id_t tid) {
- //model_print("publish: tid %d\n", tid);
- elem *e = get_elem_by_tid(tid);
- e->written = true;
- }
- @DefineFunc:
- call_id_t fetch_id(t_element *pos) {
- elem *e = get_elem_by_pos(pos);
- //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
- if (NULL == e)
- return DEFAULT_CALL_ID;
- return e->id;
- }
- @DefineFunc:
- bool fetch_check(t_element *pos) {
- show_list();
- if (pos == NULL) return true;
- elem *e = get_elem_by_pos(pos);
- //model_print("fetch_check: pos %d, e %d\n", pos, e);
- if (e == NULL) return false;
- if (elem_num(e->pos) > 1)
- return false;
- return true;
- }
- @DefineFunc:
- void fetch(t_element *pos, thread_id_t tid) {
- if (pos == NULL) return;
- elem *e = (elem*) get_elem_by_pos(pos);
- //model_print("fetch: pos %d, tid %d\n", pos, tid);
- // Remember the thread that fetches the position
- e->fetch_tid = tid;
- }
- @DefineFunc:
- bool consume_check(thread_id_t tid) {
- show_list();
- elem *e = get_elem_by_fetch_tid(tid);
- //model_print("consume_check: tid %d, e %d\n", tid, e);
- if (NULL == e)
- return false;
- if (elem_num(e->pos) > 1)
- return false;
- return e->written;
- }
- @DefineFunc:
- call_id_t consume_id(thread_id_t tid) {
- elem *e = get_elem_by_fetch_tid(tid);
- //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
- if (NULL == e)
- return DEFAULT_CALL_ID;
- return e->id;
- }
- @DefineFunc:
- void consume(thread_id_t tid) {
- //model_print("consume: tid %d\n", tid);
- int idx = elem_idx_by_fetch_tid(tid);
- if (idx == -1)
- return;
- remove_at_index(list, idx);
- }
- @Happens_before:
- Prepare -> Fetch
- Publish -> Consume
- @End
- */
-
- //-----------------------------------------------------
-
- /**
- @Begin
- @Interface: Fetch
- @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
- @ID: fetch_id(__RET__)
- @Check:
- fetch_check(__RET__)
- @Action:
- fetch(__RET__, __TID__);
- @End
- */
- t_element * read_fetch() {
- unsigned int rdwr = m_rdwr.load(mo_acquire);
- /**
- @Begin
- @Potential_commit_point_define: true
- @Label: Fetch_Potential_Point
- @End
- */
- unsigned int rd,wr;
- for(;;) {
- rd = (rdwr>>16) & 0xFFFF;
- wr = rdwr & 0xFFFF;
-
- if ( wr == rd ) { // empty
- /**
- @Begin
- @Commit_point_define: true
- @Potential_commit_point_label: Fetch_Potential_Point
- @Label: Fetch_Empty_Point
- @End
- */
- return false;
- }
-
- bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
- /**
- @Begin
- @Commit_point_define_check: succ == true
- @Label: Fetch_Succ_Point
- @End
- */
- if (succ)
- break;
- else
- thrd_yield();
- }
-
- // (*1)
- rl::backoff bo;
- while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
- thrd_yield();
- }
-
- t_element * p = & ( m_array[ rd % t_size ] );
-
- return p;
- }
-
- /**
- @Begin
- @Interface: Consume
- @Commit_point_set: Consume_Point
- @ID: consume_id(__TID__)
- @Check:
- consume_check(__TID__)
- @Action:
- consume(__TID__);
- @End
- */
- void read_consume() {
- m_read.fetch_add(1,mo_release);
- /**
- @Begin
- @Commit_point_define_check: true
- @Label: Consume_Point
- @End
- */
- }
-
- //-----------------------------------------------------
-
- /**
- @Begin
- @Interface: Prepare
- @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
- @ID: prepare_id()
- @Check:
- prepare_check(__RET__, __TID__)
- @Action:
- prepare(__ID__, __RET__, __TID__);
- @End
- */
- t_element * write_prepare() {
- unsigned int rdwr = m_rdwr.load(mo_acquire);
- /**
- @Begin
- @Potential_commit_point_define: true
- @Label: Prepare_Potential_Point
- @End
- */
- unsigned int rd,wr;
- for(;;) {
- rd = (rdwr>>16) & 0xFFFF;
- wr = rdwr & 0xFFFF;
-
- if ( wr == ((rd + t_size)&0xFFFF) ) { // full
- /**
- @Begin
- @Commit_point_define: true
- @Potential_commit_point_label: Prepare_Potential_Point
- @Label: Prepare_Full_Point
- @End
- */
- return NULL;
- }
-
- bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
- ((wr+1)&0xFFFF),mo_acq_rel);
- /**
- @Begin
- @Commit_point_define_check: succ == true
- @Label: Prepare_Succ_Point
- @End
- */
- if (succ)
- break;
- else
- thrd_yield();
- }
-
- // (*1)
- rl::backoff bo;
- while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
- thrd_yield();
- }
-
- t_element * p = & ( m_array[ wr % t_size ] );
-
- return p;
- }
-
- /**
- @Begin
- @Interface: Publish
- @Commit_point_set: Publish_Point
- @ID: publish_id(__TID__)
- @Check:
- publish_check(__TID__)
- @Action:
- publish(__TID__);
- @End
- */
- void write_publish()
- {
- m_written.fetch_add(1,mo_release);
- /**
- @Begin
- @Commit_point_define_check: true
- @Label: Publish_Point
- @End
- */
- }
-
- //-----------------------------------------------------
-
-
-};
-/**
- @Begin
- @Class_end
- @End
-*/
--- /dev/null
+#include <inttypes.h>
+#include <threads.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <librace.h>
+
+#include "mpmc-queue.h"
+
+void threadA(struct mpmc_boundq_1_alt<int32_t, 2> *queue)
+{
+ int32_t *bin;
+ bin = queue->write_prepare();
+ if (bin) {
+ *bin = 1;
+ printf("write_bin %d, val %d\n", bin, 1);
+ queue->write_publish(bin);
+ } else {
+ printf("write failed\n");
+ }
+}
+
+void threadB(struct mpmc_boundq_1_alt<int32_t, 2> *queue)
+{
+ int32_t *bin;
+ bin = queue->read_fetch();
+ if (bin) {
+ printf("read_bin: %d, val %d\n", bin, *bin);
+ queue->read_consume(bin);
+ } else {
+ printf("Read failed\n");
+ }
+}
+
+int user_main(int argc, char **argv)
+{
+ struct mpmc_boundq_1_alt<int32_t, 2> queue;
+ thrd_t A, B;
+
+ printf("Adding initial element\n");
+ int32_t *bin;
+ bin = queue.write_prepare();
+ *bin = 17;
+ printf("init_write_bin %d, val %d\n", bin, 17);
+ queue.write_publish(bin);
+
+ bin = queue.write_prepare();
+ *bin = 27;
+ printf("init_write_bin %d, val %d\n", bin, 27);
+ queue.write_publish(bin);
+
+
+ printf("Start threads\n");
+
+ thrd_create(&A, (thrd_start_t)&threadA, &queue);
+ thrd_create(&B, (thrd_start_t)&threadB, &queue);
+
+ thrd_join(A);
+ thrd_join(B);
+ printf("Threads complete\n");
+
+ return 0;
+}
--- /dev/null
+#include <inttypes.h>
+#include <threads.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <librace.h>
+
+#include "mpmc-queue.h"
+
+void threadA(struct mpmc_boundq_1_alt<int32_t, 1> *queue)
+{
+ int32_t *bin;
+ /*
+ bin = queue->write_prepare();
+ if (bin) {
+ *bin = 1;
+ printf("write_bin %d, val %d\n", bin, 1);
+ queue->write_publish(bin);
+ } else {
+ printf("write failed\n");
+ }
+ */
+
+ for (int i = 0; i < 1; i++) {
+ bin = queue->write_prepare();
+ if (bin) {
+ *bin = 1;
+ queue->write_publish(bin);
+ printf("write_bin %d, val %d\n", bin, 1);
+ } else {
+ printf("write failed\n");
+ }
+
+ bin = queue->read_fetch();
+ if (bin) {
+ printf("read_bin: %d, val %d\n", bin, *bin);
+ queue->read_consume(bin);
+ } else {
+ printf("read failed\n");
+ }
+ }
+
+}
+
+void threadB(struct mpmc_boundq_1_alt<int32_t, 1> *queue)
+{
+ int32_t *bin;
+ for (int i = 0; i < 1; i++) {
+ bin = queue->read_fetch();
+ if (bin) {
+ printf("read_bin: %d, val %d\n", bin, *bin);
+ queue->read_consume(bin);
+ } else {
+ printf("read failed\n");
+ }
+ }
+
+
+}
+
+int user_main(int argc, char **argv)
+{
+ struct mpmc_boundq_1_alt<int32_t, 1> queue;
+ thrd_t A, A1, B;
+
+ printf("Adding initial element\n");
+ int32_t *bin;
+ for (int i = 0; i < 1; i++) {
+ printf("#%d, \n", i);
+ bin = queue.write_prepare();
+ *bin = 17;
+ printf("init_write_bin %d, val %d\n", bin, 17);
+ queue.write_publish(bin);
+
+ bin = queue.read_fetch();
+ if (bin) {
+ printf("init_read: %d, val %d\n", bin, *bin);
+ queue.read_consume(bin);
+ }
+ }
+
+ for (int i = 0; i < 3; i++) {
+
+ }
+
+ printf("Start threads\n");
+
+ thrd_create(&A, (thrd_start_t)&threadA, &queue);
+ thrd_create(&A1, (thrd_start_t)&threadA, &queue);
+ thrd_create(&B, (thrd_start_t)&threadB, &queue);
+
+ thrd_join(A);
+ thrd_join(A1);
+ thrd_join(B);
+ printf("Threads complete\n");
+
+ return 0;
+}
--- /dev/null
+#include <inttypes.h>
+#include <threads.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <librace.h>
+
+#include "mpmc-queue.h"
+
+void threadA(struct mpmc_boundq_1_alt<int32_t, 1> *queue)
+{
+ int32_t *bin;
+ /*
+ bin = queue->write_prepare();
+ if (bin) {
+ *bin = 1;
+ printf("write_bin %d, val %d\n", bin, 1);
+ queue->write_publish(bin);
+ } else {
+ printf("write failed\n");
+ }
+ */
+
+ for (int i = 0; i < 1; i++) {
+ bin = queue->write_prepare();
+ if (bin) {
+ *bin = 1;
+ queue->write_publish(bin);
+ printf("write_bin %d, val %d\n", bin, 1);
+ } else {
+ printf("write failed\n");
+ }
+
+ bin = queue->read_fetch();
+ if (bin) {
+ printf("read_bin: %d, val %d\n", bin, *bin);
+ queue->read_consume(bin);
+ } else {
+ printf("read failed\n");
+ }
+ }
+
+}
+
+void threadB(struct mpmc_boundq_1_alt<int32_t, 1> *queue)
+{
+ int32_t *bin;
+ for (int i = 0; i < 1; i++) {
+ bin = queue->read_fetch();
+ if (bin) {
+ printf("read_bin: %d, val %d\n", bin, *bin);
+ queue->read_consume(bin);
+ } else {
+ printf("read failed\n");
+ }
+ }
+
+
+}
+
+void threadC(struct mpmc_boundq_1_alt<int32_t, 1> *queue)
+{
+ int *bin;
+ bin = queue->write_prepare();
+ if (bin) {
+ *bin = 1;
+ queue->write_publish(bin);
+ printf("write_bin %d, val %d\n", bin, 1);
+ } else {
+ printf("write failed\n");
+ }
+}
+
+int user_main(int argc, char **argv)
+{
+ struct mpmc_boundq_1_alt<int32_t, 1> queue;
+ thrd_t A, A1, B, B1, C, C1;
+
+ printf("Adding initial element\n");
+ int32_t *bin;
+ for (int i = 0; i < 0; i++) {
+ printf("#%d, \n", i);
+ bin = queue.write_prepare();
+ *bin = 17;
+ printf("init_write_bin %d, val %d\n", bin, 17);
+ queue.write_publish(bin);
+/*
+ bin = queue.read_fetch();
+ if (bin) {
+ printf("init_read: %d, val %d\n", bin, *bin);
+ queue.read_consume(bin);
+ }
+ */
+ }
+
+ for (int i = 0; i < 3; i++) {
+
+ }
+
+ printf("Start threads\n");
+
+ thrd_create(&A, (thrd_start_t)&threadB, &queue);
+ thrd_create(&A1, (thrd_start_t)&threadC, &queue);
+ thrd_create(&B, (thrd_start_t)&threadB, &queue);
+ thrd_create(&B1, (thrd_start_t)&threadC, &queue);
+ thrd_create(&C, (thrd_start_t)&threadB, &queue);
+ thrd_create(&C1, (thrd_start_t)&threadC, &queue);
+
+ thrd_join(A);
+ thrd_join(A1);
+ thrd_join(B);
+ thrd_join(B1);
+ thrd_join(C);
+ thrd_join(C1);
+ printf("Threads complete\n");
+
+ return 0;
+}
include ../benchmarks.mk
-TESTNAME = mpmc-queue
-TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr
-TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit
+TESTS = mpmc-queue testcase1 testcase2 testcase3
all: $(TESTS)
-mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2
-mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2
-mpmc-1r2w: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2
-mpmc-2r1w: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1
-mpmc-queue-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-mpmc-1r2w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=1 -DCONFIG_MPMC_WRITERS=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-mpmc-2r1w-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=1 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-mpmc-rdwr-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 -DCONFIG_MPMC_NO_INITIAL_ELEMENT
-
-$(TESTS): $(TESTNAME).cc $(TESTNAME).h
+$(TESTS): % : %.cc mpmc-queue.h
$(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
clean:
File[] srcMPMCQueue = {
new File(homeDir + "/benchmark/mpmc-queue/mpmc-queue.h"),
+ new File(homeDir + "/benchmark/mpmc-queue/testcase1.cc"),
+ new File(homeDir + "/benchmark/mpmc-queue/testcase2.cc"),
+ new File(homeDir + "/benchmark/mpmc-queue/testcase3.cc"),
new File(homeDir + "/benchmark/mpmc-queue/mpmc-queue.cc") };
//
// File[][] sources = {srcLinuxRWLock1 , srcMSQueue, srcRCU,