2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/test/DeterministicSchedule.h>
23 #include <unordered_map>
29 FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
30 FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
31 FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
33 // access is protected by futexLock
34 static std::unordered_map<detail::Futex<DeterministicAtomic>*,
35 std::list<std::pair<uint32_t, bool*>>> futexQueues;
37 static std::mutex futexLock;
39 DeterministicSchedule::DeterministicSchedule(
40 const std::function<int(int)>& scheduler)
41 : scheduler_(scheduler), nextThreadId_(1) {
42 assert(tls_sem == nullptr);
43 assert(tls_sched == nullptr);
46 sem_init(tls_sem, 0, 1);
47 sems_.push_back(tls_sem);
52 DeterministicSchedule::~DeterministicSchedule() {
53 assert(tls_sched == this);
54 assert(sems_.size() == 1);
55 assert(sems_[0] == tls_sem);
59 std::function<int(int)> DeterministicSchedule::uniform(long seed) {
60 auto rand = std::make_shared<std::ranlux48>(seed);
61 return [rand](size_t numActive) {
62 auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
67 struct UniformSubset {
68 UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
69 : uniform_(DeterministicSchedule::uniform(seed)),
70 subsetSize_(subsetSize),
71 stepsBetweenSelect_(stepsBetweenSelect),
74 size_t operator()(size_t numActive) {
75 adjustPermSize(numActive);
76 if (stepsLeft_-- == 0) {
77 stepsLeft_ = stepsBetweenSelect_ - 1;
80 return perm_[uniform_(std::min(numActive, subsetSize_))];
84 std::function<int(int)> uniform_;
85 const size_t subsetSize_;
86 const int stepsBetweenSelect_;
89 // only the first subsetSize_ is properly randomized
90 std::vector<int> perm_;
92 void adjustPermSize(size_t numActive) {
93 if (perm_.size() > numActive) {
94 perm_.erase(std::remove_if(perm_.begin(), perm_.end(), [=](size_t x) {
95 return x >= numActive;
98 while (perm_.size() < numActive) {
99 perm_.push_back(perm_.size());
102 assert(perm_.size() == numActive);
105 void shufflePrefix() {
106 for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
107 int j = uniform_(perm_.size() - i) + i;
108 std::swap(perm_[i], perm_[j]);
113 std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
116 auto gen = std::make_shared<UniformSubset>(seed, n, m);
117 return [=](size_t numActive) { return (*gen)(numActive); };
120 void DeterministicSchedule::beforeSharedAccess() {
126 void DeterministicSchedule::afterSharedAccess() {
127 auto sched = tls_sched;
132 sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
135 int DeterministicSchedule::getRandNumber(int n) {
137 return tls_sched->scheduler_(n);
139 return std::rand() % n;
142 int DeterministicSchedule::getcpu(unsigned* cpu, unsigned* node, void* unused) {
143 if (!tls_threadId && tls_sched) {
144 beforeSharedAccess();
145 tls_threadId = tls_sched->nextThreadId_++;
152 *node = tls_threadId;
157 sem_t* DeterministicSchedule::beforeThreadCreate() {
158 sem_t* s = new sem_t;
160 beforeSharedAccess();
166 void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
167 assert(tls_sem == nullptr);
168 assert(tls_sched == nullptr);
171 bool started = false;
173 beforeSharedAccess();
174 if (active_.count(std::this_thread::get_id()) == 1) {
181 void DeterministicSchedule::beforeThreadExit() {
182 assert(tls_sched == this);
183 beforeSharedAccess();
184 sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
185 active_.erase(std::this_thread::get_id());
186 if (sems_.size() > 0) {
187 FOLLY_TEST_DSCHED_VLOG("exiting");
190 sem_destroy(tls_sem);
196 void DeterministicSchedule::join(std::thread& child) {
197 auto sched = tls_sched;
201 beforeSharedAccess();
202 done = !sched->active_.count(child.get_id());
204 FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
212 void DeterministicSchedule::post(sem_t* sem) {
213 beforeSharedAccess();
215 FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
219 bool DeterministicSchedule::tryWait(sem_t* sem) {
220 beforeSharedAccess();
221 int rv = sem_trywait(sem);
222 int e = rv == 0 ? 0 : errno;
223 FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
234 void DeterministicSchedule::wait(sem_t* sem) {
235 while (!tryWait(sem)) {
236 // we're not busy waiting because this is a deterministic schedule
245 using namespace test;
246 using namespace std::chrono;
249 FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
251 time_point<system_clock>* absSystemTimeout,
252 time_point<steady_clock>* absSteadyTimeout,
254 bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
256 FutexResult result = FutexResult::AWOKEN;
258 DeterministicSchedule::beforeSharedAccess();
259 FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
260 << ", .., " << std::hex << waitMask
263 if (data == expected) {
264 auto& queue = futexQueues[this];
265 queue.emplace_back(waitMask, &awoken);
266 auto ours = queue.end();
270 DeterministicSchedule::afterSharedAccess();
271 DeterministicSchedule::beforeSharedAccess();
274 // Simulate spurious wake-ups, timeouts each time with
275 // a 10% probability if we haven't been woken up already
276 if (!awoken && hasTimeout &&
277 DeterministicSchedule::getRandNumber(100) < 10) {
278 assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
281 futexQueues.erase(this);
283 // Simulate ETIMEDOUT 90% of the time and other failures
285 result = DeterministicSchedule::getRandNumber(100) >= 10
286 ? FutexResult::TIMEDOUT
287 : FutexResult::INTERRUPTED;
292 result = FutexResult::VALUE_CHANGED;
296 char const* resultStr = "?";
298 case FutexResult::AWOKEN:
299 resultStr = "AWOKEN";
301 case FutexResult::TIMEDOUT:
302 resultStr = "TIMEDOUT";
304 case FutexResult::INTERRUPTED:
305 resultStr = "INTERRUPTED";
307 case FutexResult::VALUE_CHANGED:
308 resultStr = "VALUE_CHANGED";
311 FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
312 << ", .., " << std::hex << waitMask << ") -> "
314 DeterministicSchedule::afterSharedAccess();
319 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
321 DeterministicSchedule::beforeSharedAccess();
323 if (futexQueues.count(this) > 0) {
324 auto& queue = futexQueues[this];
325 auto iter = queue.begin();
326 while (iter != queue.end() && rv < count) {
328 if ((cur->first & wakeMask) != 0) {
329 *(cur->second) = true;
335 futexQueues.erase(this);
339 FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
340 << wakeMask << ") -> " << rv);
341 DeterministicSchedule::afterSharedAccess();
346 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
347 static CacheLocality cache(CacheLocality::uniform(16));
352 const AccessSpreader<test::DeterministicAtomic>
353 AccessSpreader<test::DeterministicAtomic>::stripeByCore(
354 CacheLocality::system<>().numCachesByLevel.front());
357 const AccessSpreader<test::DeterministicAtomic>
358 AccessSpreader<test::DeterministicAtomic>::stripeByChip(
359 CacheLocality::system<>().numCachesByLevel.back());
362 AccessSpreaderArray<test::DeterministicAtomic, 128>
363 AccessSpreaderArray<test::DeterministicAtomic, 128>::sharedInstance = {};
366 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(
368 return &DeterministicSchedule::getcpu;