TESTS = mpmc-queue mpmc-1r2w mpmc-2r1w mpmc-queue-rdwr
TESTS += mpmc-queue-noinit mpmc-1r2w-noinit mpmc-2r1w-noinit mpmc-rdwr-noinit
-all: $(TESTS)
+WILDCARD_TESTS = testcase1 testcase2
+
+all: $(TESTS) $(WILDCARD_TESTS)
mpmc-queue: CPPFLAGS += -DCONFIG_MPMC_READERS=2 -DCONFIG_MPMC_WRITERS=2
mpmc-queue-rdwr: CPPFLAGS += -DCONFIG_MPMC_READERS=0 -DCONFIG_MPMC_WRITERS=0 -DCONFIG_MPMC_RDWR=2
$(TESTS): $(TESTNAME).cc $(TESTNAME).h
$(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
+$(WILDCARD_TESTS): % : %.cc $(TESTNAME)-wildcard.h
+ $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS)
+
clean:
- rm -f $(TESTS) *.o
+ rm -f $(TESTS) $(WILDCARD_TESTS) *.o
--- /dev/null
+#include <stdatomic.h>
+#include <unrelacy.h>
+#include "wildcard.h"
+
+template <typename t_element, size_t t_size>
+struct mpmc_boundq_1_alt
+{
+private:
+
+ // elements should generally be cache-line-size padded :
+ t_element m_array[t_size];
+
+ // rdwr counts the reads & writes that have started
+ atomic<unsigned int> m_rdwr;
+ // "read" and "written" count the number completed
+ atomic<unsigned int> m_read;
+ atomic<unsigned int> m_written;
+
+public:
+
+ mpmc_boundq_1_alt()
+ {
+ m_rdwr = 0;
+ m_read = 0;
+ m_written = 0;
+ }
+
+
+ /**
+ @Global_define:
+ Order_queue<unsigned int*> spec_queue;
+ */
+
+ //-----------------------------------------------------
+
+ t_element * read_fetch() {
+ unsigned int rdwr = m_rdwr.load(wildcard(1)); // acquire
+ unsigned int rd,wr;
+ for(;;) {
+ rd = (rdwr>>16) & 0xFFFF;
+ wr = rdwr & 0xFFFF;
+
+ if ( wr == rd ) { // empty
+ return false;
+ }
+
+ // acq_rel
+ if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),wildcard(2)) )
+ break;
+ else
+ thrd_yield();
+ }
+
+ // (*1)
+ rl::backoff bo;
+ // acquire
+ while ( (m_written.load(wildcard(3)) & 0xFFFF) != wr ) {
+ thrd_yield();
+ }
+
+ t_element * p = & ( m_array[ rd % t_size ] );
+
+ /**
+ @Commit_point_Check: true
+ @Label: ANY
+ @Check:
+ spec_queue.peak() == p
+ */
+ return p;
+ }
+
+ void read_consume() {
+ m_read.fetch_add(1,wildcard(4)); // release
+ /**
+ @Commit_point_define: true
+ @Label: Read_Consume_Success
+ @Check:
+ spec_queue.size() > 0
+ @Action:
+ spec_queue.remove();
+ */
+ }
+
+ //-----------------------------------------------------
+
+ t_element * write_prepare() {
+ unsigned int rdwr = m_rdwr.load(wildcard(5)); // acquire
+ unsigned int rd,wr;
+ for(;;) {
+ rd = (rdwr>>16) & 0xFFFF;
+ wr = rdwr & 0xFFFF;
+
+ if ( wr == ((rd + t_size)&0xFFFF) ) // full
+ return NULL;
+ // acq_rel
+ if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),wildcard(6)) )
+ break;
+ else
+ thrd_yield();
+ }
+
+ // (*1)
+ rl::backoff bo;
+ while ( (m_read.load(wildcard(7)) & 0xFFFF) != rd ) { // acquire
+ thrd_yield();
+ }
+
+
+ t_element * p = & ( m_array[ wr % t_size ] );
+
+ /**
+ @Commit_point_check: ANY
+ @Action: spec_queue.add(p);
+ */
+ return p;
+ }
+
+ void write_publish()
+ {
+ m_written.fetch_add(1,wildcard(8)); // release
+ }
+
+ //-----------------------------------------------------
+
+
+};
+
+typedef struct mpmc_boundq_1_alt<int, 4> queue_t;
--- /dev/null
+#include <inttypes.h>
+#include <threads.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <librace.h>
+
+#include "mpmc-queue-wildcard.h"
+
+void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+ int32_t *bin = queue->write_prepare();
+ store_32(bin, 1);
+ queue->write_publish();
+}
+
+void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+ int32_t *bin;
+ while ((bin = queue->read_fetch()) != NULL) {
+ printf("Read: %d\n", load_32(bin));
+ queue->read_consume();
+ }
+}
+
+void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+ int32_t *bin = queue->write_prepare();
+ store_32(bin, 1);
+ queue->write_publish();
+
+ while ((bin = queue->read_fetch()) != NULL) {
+ printf("Read: %d\n", load_32(bin));
+ queue->read_consume();
+ }
+}
+
+#define MAXREADERS 3
+#define MAXWRITERS 3
+#define MAXRDWR 3
+
+#ifdef CONFIG_MPMC_READERS
+#define DEFAULT_READERS (CONFIG_MPMC_READERS)
+#else
+#define DEFAULT_READERS 2
+#endif
+
+#ifdef CONFIG_MPMC_WRITERS
+#define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS)
+#else
+#define DEFAULT_WRITERS 2
+#endif
+
+#ifdef CONFIG_MPMC_RDWR
+#define DEFAULT_RDWR (CONFIG_MPMC_RDWR)
+#else
+#define DEFAULT_RDWR 0
+#endif
+
+int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR;
+
+void print_usage()
+{
+ printf("Error: use the following options\n"
+ " -r <num> Choose number of reader threads\n"
+ " -w <num> Choose number of writer threads\n");
+ exit(EXIT_FAILURE);
+}
+
+void process_params(int argc, char **argv)
+{
+ const char *shortopts = "hr:w:";
+ int opt;
+ bool error = false;
+
+ while (!error && (opt = getopt(argc, argv, shortopts)) != -1) {
+ switch (opt) {
+ case 'h':
+ print_usage();
+ break;
+ case 'r':
+ readers = atoi(optarg);
+ break;
+ case 'w':
+ writers = atoi(optarg);
+ break;
+ default: /* '?' */
+ error = true;
+ break;
+ }
+ }
+
+ if (writers < 1 || writers > MAXWRITERS)
+ error = true;
+ if (readers < 1 || readers > MAXREADERS)
+ error = true;
+
+ if (error)
+ print_usage();
+}
+
+int user_main(int argc, char **argv)
+{
+ struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> queue;
+ thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR];
+
+ /* Note: optarg() / optind is broken in model-checker - workaround is
+ * to just copy&paste this test a few times */
+ //process_params(argc, argv);
+ printf("%d reader(s), %d writer(s)\n", readers, writers);
+
+#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT
+ printf("Adding initial element\n");
+ int32_t *bin = queue.write_prepare();
+ store_32(bin, 17);
+ queue.write_publish();
+#endif
+
+ printf("Start threads\n");
+
+ for (int i = 0; i < writers; i++)
+ thrd_create(&A[i], (thrd_start_t)&threadA, &queue);
+ for (int i = 0; i < readers; i++)
+ thrd_create(&B[i], (thrd_start_t)&threadB, &queue);
+
+ for (int i = 0; i < rdwr; i++)
+ thrd_create(&C[i], (thrd_start_t)&threadC, &queue);
+
+ for (int i = 0; i < writers; i++)
+ thrd_join(A[i]);
+ for (int i = 0; i < readers; i++)
+ thrd_join(B[i]);
+ for (int i = 0; i < rdwr; i++)
+ thrd_join(C[i]);
+
+ printf("Threads complete\n");
+
+ return 0;
+}
--- /dev/null
+#include <inttypes.h>
+#include <threads.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <librace.h>
+
+#include "mpmc-queue-wildcard.h"
+
+void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+ int32_t *bin = queue->write_prepare();
+ store_32(bin, 1);
+ queue->write_publish();
+}
+
+void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+ int32_t *bin;
+ while ((bin = queue->read_fetch()) != NULL) {
+ printf("Read: %d\n", load_32(bin));
+ queue->read_consume();
+ }
+}
+
+void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
+{
+ int32_t *bin = queue->write_prepare();
+ store_32(bin, 1);
+ queue->write_publish();
+
+ while ((bin = queue->read_fetch()) != NULL) {
+ printf("Read: %d\n", load_32(bin));
+ queue->read_consume();
+ }
+}
+
+#define MAXREADERS 3
+#define MAXWRITERS 3
+#define MAXRDWR 3
+
+#ifdef CONFIG_MPMC_READERS
+#define DEFAULT_READERS (CONFIG_MPMC_READERS)
+#else
+#define DEFAULT_READERS 2
+#endif
+
+#ifdef CONFIG_MPMC_WRITERS
+#define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS)
+#else
+#define DEFAULT_WRITERS 2
+#endif
+
+#ifdef CONFIG_MPMC_RDWR
+#define DEFAULT_RDWR (CONFIG_MPMC_RDWR)
+#else
+#define DEFAULT_RDWR 0
+#endif
+
+int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR;
+
+void print_usage()
+{
+ printf("Error: use the following options\n"
+ " -r <num> Choose number of reader threads\n"
+ " -w <num> Choose number of writer threads\n");
+ exit(EXIT_FAILURE);
+}
+
+void process_params(int argc, char **argv)
+{
+ const char *shortopts = "hr:w:";
+ int opt;
+ bool error = false;
+
+ while (!error && (opt = getopt(argc, argv, shortopts)) != -1) {
+ switch (opt) {
+ case 'h':
+ print_usage();
+ break;
+ case 'r':
+ readers = atoi(optarg);
+ break;
+ case 'w':
+ writers = atoi(optarg);
+ break;
+ default: /* '?' */
+ error = true;
+ break;
+ }
+ }
+
+ if (writers < 1 || writers > MAXWRITERS)
+ error = true;
+ if (readers < 1 || readers > MAXREADERS)
+ error = true;
+
+ if (error)
+ print_usage();
+}
+
+int user_main(int argc, char **argv)
+{
+ struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> queue;
+ thrd_t A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR];
+
+ /* Note: optarg() / optind is broken in model-checker - workaround is
+ * to just copy&paste this test a few times */
+ //process_params(argc, argv);
+ printf("%d reader(s), %d writer(s)\n", readers, writers);
+
+#ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT
+ printf("Adding initial element\n");
+ int32_t *bin = queue.write_prepare();
+ store_32(bin, 17);
+ queue.write_publish();
+#endif
+
+ printf("Start threads\n");
+
+ for (int i = 0; i < writers; i++)
+ thrd_create(&A[i], (thrd_start_t)&threadA, &queue);
+ for (int i = 0; i < readers; i++)
+ thrd_create(&B[i], (thrd_start_t)&threadB, &queue);
+
+ for (int i = 0; i < rdwr; i++)
+ thrd_create(&C[i], (thrd_start_t)&threadC, &queue);
+
+ for (int i = 0; i < writers; i++)
+ thrd_join(A[i]);
+ for (int i = 0; i < readers; i++)
+ thrd_join(B[i]);
+ for (int i = 0; i < rdwr; i++)
+ thrd_join(C[i]);
+
+ printf("Threads complete\n");
+
+ return 0;
+}