typedef DeterministicSchedule DSched;
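+// Thread body for run_mt_sequencer_test below: thread i takes turns
+// i, i + numThreads, i + 2*numThreads, ..., so across all threads every
+// op in [0, numOps) executes exactly once, in increasing turn order.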
+template <template<typename> class Atom>
+void run_mt_sequencer_thread(
+ int numThreads,
+ int numOps,
+ uint32_t init,
+ TurnSequencer<Atom>& seq,
+ Atom<int>& spinThreshold,
+ int& prev,
+ int i) {
+ for (int op = i; op < numOps; op += numThreads) {
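+ // block until the sequencer grants turn (init + op); the final
+ // argument asks TurnSequencer to recalibrate its adaptive spin
+ // cutoff only on every 32nd turn, keeping that bookkeeping cheap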
+ seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
+ EXPECT_EQ(prev, op - 1);
+ prev = op;
+ seq.completeTurn(init + op);
+ }
+}
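+// Drives numThreads threads through numOps turns of a shared
+// TurnSequencer, e.g. run_mt_sequencer_test<std::atomic>(32, 1000, 0).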
template <template<typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
TurnSequencer<Atom> seq(init);
Atom<int> spinThreshold(0);

int prev = -1;
std::vector<std::thread> threads(numThreads);
for (int i = 0; i < numThreads; ++i) {
- threads[i] = DSched::thread([&, i]{
- for (int op = i; op < numOps; op += numThreads) {
- seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
- EXPECT_EQ(prev, op - 1);
- prev = op;
- seq.completeTurn(init + op);
- }
- });
+ threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
+ numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
+ std::ref(prev), i));
}
for (auto& thr : threads) {
DSched::join(thr);
}

EXPECT_EQ(prev, numOps - 1);
}
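+// Thread body for runTryEnqDeqTest below: interleave non-blocking
+// write() and read() calls until this thread has enqueued and dequeued
+// its share of the n ops, then fold what it read into the shared sum.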
+template <template<typename> class Atom>
+void runTryEnqDeqThread(
+ int numThreads,
+ int n, /*numOps*/
+ MPMCQueue<int, Atom>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
+ uint64_t threadSum = 0;
+ int src = t;
+ // received doesn't reflect any actual values; we just start with
+ // t and increment by numThreads to get the rounding of termination
+ // correct when numThreads doesn't evenly divide numOps
+ int received = t;
+ while (src < n || received < n) {
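+ // try to publish our next value; write() returns false if the
+ // queue is full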
+ if (src < n && cq.write(src)) {
+ src += numThreads;
+ }
+
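+ // try to consume one value; read() returns false if the queue is
+ // empty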
+ int dst;
+ if (received < n && cq.read(dst)) {
+ received += numThreads;
+ threadSum += dst;
+ }
+ }
+ sum += threadSum;
+}
+
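+// Aggregate-correctness test for the non-blocking write()/read() pair,
+// e.g. runTryEnqDeqTest<std::atomic>(4, 10000).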
template <template<typename> class Atom>
void runTryEnqDeqTest(int numThreads, int numOps) {
// write and read aren't linearizable, so we don't have hard guarantees
// on their individual behavior.  We can still test correctness in
// aggregate
MPMCQueue<int, Atom> cq(numThreads);

uint64_t n = numOps;
std::vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread([&,t]{
- uint64_t threadSum = 0;
- int src = t;
- // received doesn't reflect any actual values, we just start with
- // t and increment by numThreads to get the rounding of termination
- // correct if numThreads doesn't evenly divide numOps
- int received = t;
- while (src < n || received < n) {
- if (src < n && cq.write(src)) {
- src += numThreads;
- }
-
- int dst;
- if (received < n && cq.read(dst)) {
- received += numThreads;
- threadSum += dst;
- }
- }
- sum += threadSum;
- });
+ threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+ numThreads, n, std::ref(cq), std::ref(sum), t));
}
for (auto& t : threads) {
DSched::join(t);
}

// each value in [0, n) is enqueued exactly once and dequeued exactly
// once, so the values read across all threads must sum to n*(n-1)/2
EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
}
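+// Thread body for runNeverFailTest below: each iteration enqueues one
+// element and immediately dequeues one, so the queue never holds more
+// than numThreads elements and the IfNot variants must succeed.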
+template <template<typename> class Atom>
+void runNeverFailThread(
+ int numThreads,
+ int n, /*numOps*/
+ MPMCQueue<int, Atom>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
+ uint64_t threadSum = 0;
+ for (int i = t; i < n; i += numThreads) {
+ // enq + deq
+ EXPECT_TRUE(cq.writeIfNotFull(i));
+
+ int dest = -1;
+ EXPECT_TRUE(cq.readIfNotEmpty(dest));
+ EXPECT_TRUE(dest >= 0);
+ threadSum += dest;
+ }
+ sum += threadSum;
+}
+
template <template<typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq, so with a capacity of numThreads neither
// writeIfNotFull nor readIfNotEmpty should ever fail
MPMCQueue<int, Atom> cq(numThreads);

uint64_t n = numOps;
std::vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread([&,t]{
- uint64_t threadSum = 0;
- for (int i = t; i < n; i += numThreads) {
- // enq + deq
- EXPECT_TRUE(cq.writeIfNotFull(i));
-
- int dest = -1;
- EXPECT_TRUE(cq.readIfNotEmpty(dest));
- EXPECT_TRUE(dest >= 0);
- threadSum += dest;
- }
- sum += threadSum;
- });
+ threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+ numThreads, n, std::ref(cq), std::ref(sum), t));
}
for (auto& t : threads) {
DSched::join(t);