2 #include "cds_threads.h"
9 #include "mpmc-queue.h"
// Writer thread body (started once per `writers` in user_main).
// Obtains an element pointer from write_prepare() and then calls
// write_publish() on the shared queue.
// NOTE(review): no store into `bin` is visible between the two calls below —
// confirm which value is written before the element is published.
11 void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
13 // std::this_thread::sleep_for(std::chrono::milliseconds(10));
14 int32_t *bin = queue->write_prepare();
16 queue->write_publish();
// Reader thread body (started once per `readers` in user_main).
// Keeps calling read_fetch() until it returns NULL; for every non-NULL
// element it prints the value read through load_32() and then calls
// read_consume().
// NOTE(review): the declaration of `bin` is not visible here; it is
// presumably an int32_t* like in threadA — confirm.
19 void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
21 // std::this_thread::sleep_for(std::chrono::milliseconds(10));
// The loop ends on the first NULL returned by read_fetch().
23 while ((bin = queue->read_fetch()) != NULL) {
24 printf("Read: %d\n", load_32(bin));
25 queue->read_consume();
// Combined writer/reader thread body (started once per `rdwr` in user_main).
// First performs the same write_prepare()/write_publish() sequence as
// threadA, then runs the same read_fetch()/read_consume() loop as threadB.
// NOTE(review): as in threadA, no store into `bin` is visible before
// write_publish() — confirm which value is written.
29 void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
31 // std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Write phase.
32 int32_t *bin = queue->write_prepare();
34 queue->write_publish();
// Read phase: `bin` is reused as the fetch cursor until read_fetch()
// returns NULL.
36 while ((bin = queue->read_fetch()) != NULL) {
37 printf("Read: %d\n", load_32(bin));
38 queue->read_consume();
// Default thread counts. Each DEFAULT_* value comes from the matching
// CONFIG_MPMC_* macro when that macro is defined, and otherwise from the
// literal fallback below (2 readers, 2 writers, 0 reader/writer threads).
// NOTE(review): the #else / #endif lines separating the two definitions of
// each macro are not visible here — confirm they are present.
46 #ifdef CONFIG_MPMC_READERS
47 #define DEFAULT_READERS (CONFIG_MPMC_READERS)
49 #define DEFAULT_READERS 2
52 #ifdef CONFIG_MPMC_WRITERS
53 #define DEFAULT_WRITERS (CONFIG_MPMC_WRITERS)
55 #define DEFAULT_WRITERS 2
58 #ifdef CONFIG_MPMC_RDWR
59 #define DEFAULT_RDWR (CONFIG_MPMC_RDWR)
61 #define DEFAULT_RDWR 0
// File-scope thread counts, initialised from the defaults above.
// process_params() assigns `readers` and `writers`; user_main() reads all
// three to decide how many threads of each kind to start.
64 int readers = DEFAULT_READERS, writers = DEFAULT_WRITERS, rdwr = DEFAULT_RDWR;
// Help text describing the -r and -w command-line options.
// NOTE(review): the option string in process_params ("hr:w:") also accepts
// -h, which is not listed here; there is likewise no option for `rdwr`.
68 printf("Error: use the following options\n"
69 " -r <num> Choose number of reader threads\n"
70 " -w <num> Choose number of writer threads\n");
// Parses the command line with getopt() and updates the file-scope
// `readers` and `writers` counts.
//   argc, argv - forwarded unchanged to getopt().
// NOTE(review): the case labels that select between the assignments below
// and the actions taken when a range check fails are not visible here.
74 void process_params(int argc, char **argv)
// 'h' takes no argument; 'r' and 'w' each require one (trailing ':').
76 const char *shortopts = "hr:w:";
// Stop at the end of the options (getopt() returns -1) or as soon as
// `error` has been set.
80 while (!error && (opt = getopt(argc, argv, shortopts)) != -1) {
// atoi() yields 0 for non-numeric text, which the "< 1" checks below reject.
86 readers = atoi(optarg);
89 writers = atoi(optarg);
// Range validation against the compile-time limits MAXWRITERS / MAXREADERS.
// NOTE(review): `rdwr` is neither settable nor range-checked in the
// visible lines.
97 if (writers < 1 || writers > MAXWRITERS)
99 if (readers < 1 || readers > MAXREADERS)
// Test entry point: optionally seeds the queue with one element, starts
// `writers` threadA, `readers` threadB and `rdwr` threadC threads that all
// share one queue, and prints progress messages along the way.
//   argc, argv - only referenced by the commented-out process_params() call.
// NOTE(review): the return statement is not visible here.
106 int user_main(int argc, char **argv)
108 struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> queue;
// Fixed-size thread handle arrays, one per thread kind.
109 std::thread A[MAXWRITERS], B[MAXREADERS], C[MAXRDWR];
111 /* Note: optarg() / optind is broken in model-checker - workaround is
112 * to just copy&paste this test a few times */
113 //process_params(argc, argv);
// Because process_params() is disabled above, `readers`, `writers` and
// `rdwr` keep their DEFAULT_* initial values.
// NOTE(review): no visible line checks those values against MAXREADERS /
// MAXWRITERS / MAXRDWR before they are used as loop bounds over A, B and C
// below — confirm the CONFIG_MPMC_* values cannot exceed the array sizes.
114 printf("%d reader(s), %d writer(s)\n", readers, writers);
// Unless CONFIG_MPMC_NO_INITIAL_ELEMENT is defined, write one element
// before any thread starts.
// NOTE(review): the store into `bin` and the closing #endif are not
// visible here.
116 #ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT
117 printf("Adding initial element\n");
118 int32_t *bin = queue.write_prepare();
120 queue.write_publish();
123 printf("Start threads\n");
// Spawn order: all writers, then all readers, then all reader/writers.
// Every thread receives a pointer to the same local `queue`.
125 for (int i = 0; i < writers; i++)
126 A[i] = std::thread(threadA, &queue);
127 for (int i = 0; i < readers; i++)
128 B[i] = std::thread(threadB, &queue);
130 for (int i = 0; i < rdwr; i++)
131 C[i] = std::thread(threadC, &queue);
// One loop per thread array, in the same order as the spawn loops.
// NOTE(review): the loop bodies are not visible here; presumably each one
// joins the corresponding thread — confirm.
133 for (int i = 0; i < writers; i++)
135 for (int i = 0; i < readers; i++)
137 for (int i = 0; i < rdwr; i++)
140 printf("Threads complete\n");