From b597b4f413812a6dcba5ef6823a5fce07b2cbe5c Mon Sep 17 00:00:00 2001 From: Peizhao Ou Date: Wed, 11 Feb 2015 22:27:44 -0800 Subject: [PATCH] changes to mpmp --- mpmc-queue/Makefile | 9 +- mpmc-queue/mpmc-queue-wildcard.h | 128 ++++++++++++++++++++++++++++ mpmc-queue/testcase1.cc | 140 +++++++++++++++++++++++++++++++ mpmc-queue/testcase2.cc | 140 +++++++++++++++++++++++++++++++ 4 files changed, 415 insertions(+), 2 deletions(-) create mode 100644 mpmc-queue/mpmc-queue-wildcard.h create mode 100644 mpmc-queue/testcase1.cc create mode 100644 mpmc-queue/testcase2.cc diff --git a/mpmc-queue/Makefile b/mpmc-queue/Makefile index 8d9ad1e..25ff7a1 100644 --- a/mpmc-queue/Makefile +++ b/mpmc-queue/Makefile @@ -4,7 +4,9 @@ TESTNAME = mpmc-queue TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit -all: $(TESTS) +WILDCARD_TESTS = testcase1 testcase2 + +all: $(TESTS) $(WILDCARD_TESTS) mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2 mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2 @@ -18,5 +20,8 @@ mpmc-rdwr-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -D $(TESTS): $(TESTNAME).cc $(TESTNAME).h $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) +$(WILDCARD_TESTS): % : %.cc $(TESTNAME)-wildcard.h + $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) + clean: - rm -f $(TESTS) *.o + rm -f $(TESTS) $(WILDCARD_TESTS) *.o diff --git a/mpmc-queue/mpmc-queue-wildcard.h b/mpmc-queue/mpmc-queue-wildcard.h new file mode 100644 index 0000000..d8711dc --- /dev/null +++ b/mpmc-queue/mpmc-queue-wildcard.h @@ -0,0 +1,128 @@ +#include +#include +#include "wildcard.h" + +template +struct mpmc_boundq_1_alt +{ +private: + + // elements should generally be cache-line-size padded : + t_element m_array[t_size]; + + // rdwr counts the reads & writes that have started + atomic m_rdwr; + // "read" and "written" count the number completed + atomic m_read; + atomic m_written; + +public: + + mpmc_boundq_1_alt() + { + m_rdwr = 0; + m_read = 0; + m_written = 0; + } + + + /** + @Global_define: + Order_queue spec_queue; + */ + + //----------------------------------------------------- + + t_element * read_fetch() { + unsigned int rdwr = m_rdwr.load(wildcard(1)); // acquire + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == rd ) { // empty + return false; + } + + // acq_rel + if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),wildcard(2)) ) + break; + else + thrd_yield(); + } + + // (*1) + rl::backoff bo; + // acquire + while ( (m_written.load(wildcard(3)) & 0xFFFF) != wr ) { + thrd_yield(); + } + + t_element * p = & ( m_array[ rd % t_size ] ); + + /** + @Commit_point_Check: true + @Label: ANY + @Check: + spec_queue.peak() == p + */ + return p; + } + + void read_consume() { + m_read.fetch_add(1,wildcard(4)); // release + /** + @Commit_point_define: true + @Label: Read_Consume_Success + @Check: + spec_queue.size() > 0 + @Action: + spec_queue.remove(); + */ + } + + //----------------------------------------------------- + + t_element * write_prepare() { + unsigned int rdwr = m_rdwr.load(wildcard(5)); // acquire + unsigned int rd,wr; + for(;;) { + rd = (rdwr>>16) & 0xFFFF; + wr = rdwr & 0xFFFF; + + if ( wr == ((rd + t_size)&0xFFFF) ) // full + return NULL; + // acq_rel + if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),wildcard(6)) ) + break; + else + thrd_yield(); + } + + // (*1) + rl::backoff bo; + while ( (m_read.load(wildcard(7)) & 0xFFFF) != rd ) { // acquire + thrd_yield(); + } + + + t_element * p = & ( m_array[ wr % t_size ] ); + + /** + @Commit_point_check: ANY + @Action: spec_queue.add(p); + */ + return p; + } + + void write_publish() + { + m_written.fetch_add(1,wildcard(8)); // release + } + + //----------------------------------------------------- + + +}; + +typedef struct mpmc_boundq_1_alt queue_t; diff --git a/mpmc-queue/testcase1.cc b/mpmc-queue/testcase1.cc new file mode 100644 index 0000000..071c55d --- /dev/null +++ b/mpmc-queue/testcase1.cc @@ -0,0 +1,140 @@ +#include +#include +#include +#include +#include + +#include + +#include "mpmc-queue-wildcard.h" + +void threadA(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin = queue->write_prepare(); + store_32(bin, 1); + queue->write_publish(); +} + +void threadB(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + while ((bin = queue->read_fetch()) != NULL) { + printf("Read: %d\n", load_32(bin)); + queue->read_consume(); + } +} + +void threadC(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin = queue->write_prepare(); + store_32(bin, 1); + queue->write_publish(); + + while ((bin = queue->read_fetch()) != NULL) { + printf("Read: %d\n", load_32(bin)); + queue->read_consume(); + } +} + +#define MAXREADERS 3 +#define MAXWRITERS 3 +#define MAXRDWR 3 + +#ifdef CONFIG_MPMC_READERS +#define DEFAULT_READERS (CONFIG_MPMC_READERS) +#else +#define DEFAULT_READERS 2 +#endif + +#ifdef CONFIG_MPMC_WRITERS +#define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS) +#else +#define DEFAULT_WRITERS 2 +#endif + +#ifdef CONFIG_MPMC_RDWR +#define DEFAULT_RDWR (CONFIG_MPMC_RDWR) +#else +#define DEFAULT_RDWR 0 +#endif + +int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR; + +void print_usage() +{ + printf("Error: use the following options\n" + " -r Choose number of reader threads\n" + " -w Choose number of writer threads\n"); + exit(EXIT_FAILURE); +} + +void process_params(int argc, char **argv) +{ + const char *shortopts = "hr:w:"; + int opt; + bool error = false; + + while (!error && (opt = getopt(argc, argv, shortopts)) != -1) { + switch (opt) { + case 'h': + print_usage(); + break; + case 'r': + readers = atoi(optarg); + break; + case 'w': + writers = atoi(optarg); + break; + default: /* '?' */ + error = true; + break; + } + } + + if (writers < 1 || writers > MAXWRITERS) + error = true; + if (readers < 1 || readers > MAXREADERS) + error = true; + + if (error) + print_usage(); +} + +int user_main(int argc, char **argv) +{ + struct mpmc_boundq_1_alt queue; + thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR]; + + /* Note: optarg() / optind is broken in model-checker - workaround is + * to just copy&paste this test a few times */ + //process_params(argc, argv); + printf("%d reader(s), %d writer(s)\n", readers, writers); + +#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT + printf("Adding initial element\n"); + int32_t *bin = queue.write_prepare(); + store_32(bin, 17); + queue.write_publish(); +#endif + + printf("Start threads\n"); + + for (int i = 0; i < writers; i++) + thrd_create(&A[i], (thrd_start_t)&threadA, &queue); + for (int i = 0; i < readers; i++) + thrd_create(&B[i], (thrd_start_t)&threadB, &queue); + + for (int i = 0; i < rdwr; i++) + thrd_create(&C[i], (thrd_start_t)&threadC, &queue); + + for (int i = 0; i < writers; i++) + thrd_join(A[i]); + for (int i = 0; i < readers; i++) + thrd_join(B[i]); + for (int i = 0; i < rdwr; i++) + thrd_join(C[i]); + + printf("Threads complete\n"); + + return 0; +} diff --git a/mpmc-queue/testcase2.cc b/mpmc-queue/testcase2.cc new file mode 100644 index 0000000..071c55d --- /dev/null +++ b/mpmc-queue/testcase2.cc @@ -0,0 +1,140 @@ +#include +#include +#include +#include +#include + +#include + +#include "mpmc-queue-wildcard.h" + +void threadA(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin = queue->write_prepare(); + store_32(bin, 1); + queue->write_publish(); +} + +void threadB(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin; + while ((bin = queue->read_fetch()) != NULL) { + printf("Read: %d\n", load_32(bin)); + queue->read_consume(); + } +} + +void threadC(struct mpmc_boundq_1_alt *queue) +{ + int32_t *bin = queue->write_prepare(); + store_32(bin, 1); + queue->write_publish(); + + while ((bin = queue->read_fetch()) != NULL) { + printf("Read: %d\n", load_32(bin)); + queue->read_consume(); + } +} + +#define MAXREADERS 3 +#define MAXWRITERS 3 +#define MAXRDWR 3 + +#ifdef CONFIG_MPMC_READERS +#define DEFAULT_READERS (CONFIG_MPMC_READERS) +#else +#define DEFAULT_READERS 2 +#endif + +#ifdef CONFIG_MPMC_WRITERS +#define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS) +#else +#define DEFAULT_WRITERS 2 +#endif + +#ifdef CONFIG_MPMC_RDWR +#define DEFAULT_RDWR (CONFIG_MPMC_RDWR) +#else +#define DEFAULT_RDWR 0 +#endif + +int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR; + +void print_usage() +{ + printf("Error: use the following options\n" + " -r Choose number of reader threads\n" + " -w Choose number of writer threads\n"); + exit(EXIT_FAILURE); +} + +void process_params(int argc, char **argv) +{ + const char *shortopts = "hr:w:"; + int opt; + bool error = false; + + while (!error && (opt = getopt(argc, argv, shortopts)) != -1) { + switch (opt) { + case 'h': + print_usage(); + break; + case 'r': + readers = atoi(optarg); + break; + case 'w': + writers = atoi(optarg); + break; + default: /* '?' */ + error = true; + break; + } + } + + if (writers < 1 || writers > MAXWRITERS) + error = true; + if (readers < 1 || readers > MAXREADERS) + error = true; + + if (error) + print_usage(); +} + +int user_main(int argc, char **argv) +{ + struct mpmc_boundq_1_alt queue; + thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR]; + + /* Note: optarg() / optind is broken in model-checker - workaround is + * to just copy&paste this test a few times */ + //process_params(argc, argv); + printf("%d reader(s), %d writer(s)\n", readers, writers); + +#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT + printf("Adding initial element\n"); + int32_t *bin = queue.write_prepare(); + store_32(bin, 17); + queue.write_publish(); +#endif + + printf("Start threads\n"); + + for (int i = 0; i < writers; i++) + thrd_create(&A[i], (thrd_start_t)&threadA, &queue); + for (int i = 0; i < readers; i++) + thrd_create(&B[i], (thrd_start_t)&threadB, &queue); + + for (int i = 0; i < rdwr; i++) + thrd_create(&C[i], (thrd_start_t)&threadC, &queue); + + for (int i = 0; i < writers; i++) + thrd_join(A[i]); + for (int i = 0; i < readers; i++) + thrd_join(B[i]); + for (int i = 0; i < rdwr; i++) + thrd_join(C[i]); + + printf("Threads complete\n"); + + return 0; +} -- 2.34.1