changes to mpmp
authorPeizhao Ou <peizhaoo@uci.edu>
Thu, 12 Feb 2015 06:27:44 +0000 (22:27 -0800)
committerPeizhao Ou <peizhaoo@uci.edu>
Thu, 12 Feb 2015 06:27:44 +0000 (22:27 -0800)
mpmc-queue/Makefile
mpmc-queue/mpmc-queue-wildcard.h [new file with mode: 0644]
mpmc-queue/testcase1.cc [new file with mode: 0644]
mpmc-queue/testcase2.cc [new file with mode: 0644]

index 8d9ad1e1014f4f2e83a85c74f6e2995339229209..25ff7a11203c3bb64838150b9d75e82b289b6d3a 100644 (file)
@@ -4,7 +4,9 @@ TESTNAME = mpmc-queue
 TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr
 TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit
 
-all: $(TESTS)
+WILDCARD_TESTS = testcase1 testcase2
+
+all: $(TESTS) $(WILDCARD_TESTS)
 
 mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2
 mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2
@@ -18,5 +20,8 @@ mpmc-rdwr-noinit: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -D
 $(TESTS): $(TESTNAME).cc $(TESTNAME).h
        $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
 
+$(WILDCARD_TESTS): % : %.cc $(TESTNAME)-wildcard.h
+       $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
+
 clean:
-       rm -f $(TESTS) *.o
+       rm -f $(TESTS) $(WILDCARD_TESTS) *.o
diff --git a/mpmc-queue/mpmc-queue-wildcard.h b/mpmc-queue/mpmc-queue-wildcard.h
new file mode 100644 (file)
index 0000000..d8711dc
--- /dev/null
@@ -0,0 +1,128 @@
+#include <stdatomic.h>
+#include <unrelacy.h>
+#include "wildcard.h"
+
+template <typename t_element, size_t t_size>
+struct mpmc_boundq_1_alt
+{
+private:
+
+       // elements should generally be cache-line-size padded :
+       t_element               m_array[t_size];
+
+       // rdwr counts the reads & writes that have started
+       atomic<unsigned int>    m_rdwr;
+       // "read" and "written" count the number completed
+       atomic<unsigned int>    m_read;
+       atomic<unsigned int>    m_written;
+
+public:
+
+       mpmc_boundq_1_alt()
+       {
+               m_rdwr = 0;
+               m_read = 0;
+               m_written = 0;
+       }
+       
+
+       /**
+       @Global_define:
+       Order_queue<unsigned int*> spec_queue;
+       */
+
+       //-----------------------------------------------------
+
+       t_element * read_fetch() {
+               unsigned int rdwr = m_rdwr.load(wildcard(1)); // acquire
+               unsigned int rd,wr;
+               for(;;) {
+                       rd = (rdwr>>16) & 0xFFFF;
+                       wr = rdwr & 0xFFFF;
+
+                       if ( wr == rd ) { // empty
+                               return false;
+                       }
+
+                       // acq_rel
+                       if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),wildcard(2)) )
+                               break;
+                       else
+                               thrd_yield();
+               }
+
+               // (*1)
+               rl::backoff bo;
+               // acquire
+               while ( (m_written.load(wildcard(3)) & 0xFFFF) != wr ) {
+                       thrd_yield();
+               }
+
+               t_element * p = & ( m_array[ rd % t_size ] );
+               
+               /**
+                       @Commit_point_Check: true
+                       @Label: ANY
+                       @Check:
+                               spec_queue.peak() == p
+               */
+               return p;
+       }
+
+       void read_consume() {
+               m_read.fetch_add(1,wildcard(4)); // release
+               /**
+                       @Commit_point_define: true
+                       @Label: Read_Consume_Success
+                       @Check:
+                               spec_queue.size() > 0
+                       @Action:
+                               spec_queue.remove();
+               */
+       }
+
+       //-----------------------------------------------------
+
+       t_element * write_prepare() {
+               unsigned int rdwr = m_rdwr.load(wildcard(5)); // acquire
+               unsigned int rd,wr;
+               for(;;) {
+                       rd = (rdwr>>16) & 0xFFFF;
+                       wr = rdwr & 0xFFFF;
+
+                       if ( wr == ((rd + t_size)&0xFFFF) ) // full
+                               return NULL;
+                       // acq_rel
+                       if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),wildcard(6)) )
+                               break;
+                       else
+                               thrd_yield();
+               }
+
+               // (*1)
+               rl::backoff bo;
+               while ( (m_read.load(wildcard(7)) & 0xFFFF) != rd ) { // acquire
+                       thrd_yield();
+               }
+
+
+               t_element * p = & ( m_array[ wr % t_size ] );
+
+               /**
+                       @Commit_point_check: ANY
+                       @Action: spec_queue.add(p);
+               */
+               return p;
+       }
+
+       void write_publish()
+       {
+               m_written.fetch_add(1,wildcard(8)); // release
+       }
+
+       //-----------------------------------------------------
+
+
+};
+
+typedef struct mpmc_boundq_1_alt<int, 4> queue_t;
diff --git a/mpmc-queue/testcase1.cc b/mpmc-queue/testcase1.cc
new file mode 100644 (file)
index 0000000..071c55d
--- /dev/null
@@ -0,0 +1,140 @@
+#include <inttypes.h>
+#include <threads.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <librace.h>
+
+#include "mpmc-queue-wildcard.h"
+
+void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+       int32_t *bin = queue->write_prepare();
+       store_32(bin, 1);
+       queue->write_publish();
+}
+
+void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+       int32_t *bin;
+       while ((bin = queue->read_fetch()) != NULL) {
+               printf("Read: %d\n", load_32(bin));
+               queue->read_consume();
+       }
+}
+
+void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+       int32_t *bin = queue->write_prepare();
+       store_32(bin, 1);
+       queue->write_publish();
+
+       while ((bin = queue->read_fetch()) != NULL) {
+               printf("Read: %d\n", load_32(bin));
+               queue->read_consume();
+       }
+}
+
+#define MAXREADERS 3
+#define MAXWRITERS 3
+#define MAXRDWR 3
+
+#ifdef CONFIG_MPMC_READERS
+#define DEFAULT_READERS (CONFIG_MPMC_READERS)
+#else
+#define DEFAULT_READERS 2
+#endif
+
+#ifdef CONFIG_MPMC_WRITERS
+#define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS)
+#else
+#define DEFAULT_WRITERS 2
+#endif
+
+#ifdef CONFIG_MPMC_RDWR
+#define DEFAULT_RDWR (CONFIG_MPMC_RDWR)
+#else
+#define DEFAULT_RDWR 0
+#endif
+
+int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR;
+
+void print_usage()
+{
+       printf("Error: use the following options\n"
+               " -r <num>              Choose number of reader threads\n"
+               " -w <num>              Choose number of writer threads\n");
+       exit(EXIT_FAILURE);
+}
+
+void process_params(int argc, char **argv)
+{
+       const char *shortopts = "hr:w:";
+       int opt;
+       bool error = false;
+
+       while (!error && (opt = getopt(argc, argv, shortopts)) != -1) {
+               switch (opt) {
+               case 'h':
+                       print_usage();
+                       break;
+               case 'r':
+                       readers = atoi(optarg);
+                       break;
+               case 'w':
+                       writers = atoi(optarg);
+                       break;
+               default: /* '?' */
+                       error = true;
+                       break;
+               }
+       }
+
+       if (writers < 1 || writers > MAXWRITERS)
+               error = true;
+       if (readers < 1 || readers > MAXREADERS)
+               error = true;
+
+       if (error)
+               print_usage();
+}
+
+int user_main(int argc, char **argv)
+{
+       struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> queue;
+       thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR];
+
+       /* Note: optarg() / optind is broken in model-checker - workaround is
+        * to just copy&paste this test a few times */
+       //process_params(argc, argv);
+       printf("%d reader(s), %d writer(s)\n", readers, writers);
+
+#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT
+       printf("Adding initial element\n");
+       int32_t *bin = queue.write_prepare();
+       store_32(bin, 17);
+       queue.write_publish();
+#endif
+
+       printf("Start threads\n");
+
+       for (int i = 0; i < writers; i++)
+               thrd_create(&A[i], (thrd_start_t)&threadA, &queue);
+       for (int i = 0; i < readers; i++)
+               thrd_create(&B[i], (thrd_start_t)&threadB, &queue);
+
+       for (int i = 0; i < rdwr; i++)
+               thrd_create(&C[i], (thrd_start_t)&threadC, &queue);
+
+       for (int i = 0; i < writers; i++)
+               thrd_join(A[i]);
+       for (int i = 0; i < readers; i++)
+               thrd_join(B[i]);
+       for (int i = 0; i < rdwr; i++)
+               thrd_join(C[i]);
+
+       printf("Threads complete\n");
+
+       return 0;
+}
diff --git a/mpmc-queue/testcase2.cc b/mpmc-queue/testcase2.cc
new file mode 100644 (file)
index 0000000..071c55d
--- /dev/null
@@ -0,0 +1,140 @@
+#include <inttypes.h>
+#include <threads.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <librace.h>
+
+#include "mpmc-queue-wildcard.h"
+
+void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+       int32_t *bin = queue->write_prepare();
+       store_32(bin, 1);
+       queue->write_publish();
+}
+
+void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+       int32_t *bin;
+       while ((bin = queue->read_fetch()) != NULL) {
+               printf("Read: %d\n", load_32(bin));
+               queue->read_consume();
+       }
+}
+
+void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+       int32_t *bin = queue->write_prepare();
+       store_32(bin, 1);
+       queue->write_publish();
+
+       while ((bin = queue->read_fetch()) != NULL) {
+               printf("Read: %d\n", load_32(bin));
+               queue->read_consume();
+       }
+}
+
+#define MAXREADERS 3
+#define MAXWRITERS 3
+#define MAXRDWR 3
+
+#ifdef CONFIG_MPMC_READERS
+#define DEFAULT_READERS (CONFIG_MPMC_READERS)
+#else
+#define DEFAULT_READERS 2
+#endif
+
+#ifdef CONFIG_MPMC_WRITERS
+#define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS)
+#else
+#define DEFAULT_WRITERS 2
+#endif
+
+#ifdef CONFIG_MPMC_RDWR
+#define DEFAULT_RDWR (CONFIG_MPMC_RDWR)
+#else
+#define DEFAULT_RDWR 0
+#endif
+
+int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR;
+
+void print_usage()
+{
+       printf("Error: use the following options\n"
+               " -r <num>              Choose number of reader threads\n"
+               " -w <num>              Choose number of writer threads\n");
+       exit(EXIT_FAILURE);
+}
+
+void process_params(int argc, char **argv)
+{
+       const char *shortopts = "hr:w:";
+       int opt;
+       bool error = false;
+
+       while (!error && (opt = getopt(argc, argv, shortopts)) != -1) {
+               switch (opt) {
+               case 'h':
+                       print_usage();
+                       break;
+               case 'r':
+                       readers = atoi(optarg);
+                       break;
+               case 'w':
+                       writers = atoi(optarg);
+                       break;
+               default: /* '?' */
+                       error = true;
+                       break;
+               }
+       }
+
+       if (writers < 1 || writers > MAXWRITERS)
+               error = true;
+       if (readers < 1 || readers > MAXREADERS)
+               error = true;
+
+       if (error)
+               print_usage();
+}
+
+int user_main(int argc, char **argv)
+{
+       struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> queue;
+       thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR];
+
+       /* Note: optarg() / optind is broken in model-checker - workaround is
+        * to just copy&paste this test a few times */
+       //process_params(argc, argv);
+       printf("%d reader(s), %d writer(s)\n", readers, writers);
+
+#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT
+       printf("Adding initial element\n");
+       int32_t *bin = queue.write_prepare();
+       store_32(bin, 17);
+       queue.write_publish();
+#endif
+
+       printf("Start threads\n");
+
+       for (int i = 0; i < writers; i++)
+               thrd_create(&A[i], (thrd_start_t)&threadA, &queue);
+       for (int i = 0; i < readers; i++)
+               thrd_create(&B[i], (thrd_start_t)&threadB, &queue);
+
+       for (int i = 0; i < rdwr; i++)
+               thrd_create(&C[i], (thrd_start_t)&threadC, &queue);
+
+       for (int i = 0; i < writers; i++)
+               thrd_join(A[i]);
+       for (int i = 0; i < readers; i++)
+               thrd_join(B[i]);
+       for (int i = 0; i < rdwr; i++)
+               thrd_join(C[i]);
+
+       printf("Threads complete\n");
+
+       return 0;
+}