2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/test/DeterministicSchedule.h>
23 #include <unordered_map>
26 DECLARE_ACCESS_SPREADER_TYPE(folly::test::DeterministicAtomic)
31 FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
32 FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
33 FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
35 // access is protected by futexLock
36 static std::unordered_map<detail::Futex<DeterministicAtomic>*,
37 std::list<std::pair<uint32_t, bool*>>> futexQueues;
39 static std::mutex futexLock;
41 DeterministicSchedule::DeterministicSchedule(
42 const std::function<int(int)>& scheduler)
43 : scheduler_(scheduler), nextThreadId_(1) {
44 assert(tls_sem == nullptr);
45 assert(tls_sched == nullptr);
48 sem_init(tls_sem, 0, 1);
49 sems_.push_back(tls_sem);
54 DeterministicSchedule::~DeterministicSchedule() {
55 assert(tls_sched == this);
56 assert(sems_.size() == 1);
57 assert(sems_[0] == tls_sem);
61 std::function<int(int)> DeterministicSchedule::uniform(long seed) {
62 auto rand = std::make_shared<std::ranlux48>(seed);
63 return [rand](size_t numActive) {
64 auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
69 struct UniformSubset {
70 UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
71 : uniform_(DeterministicSchedule::uniform(seed)),
72 subsetSize_(subsetSize),
73 stepsBetweenSelect_(stepsBetweenSelect),
76 size_t operator()(size_t numActive) {
77 adjustPermSize(numActive);
78 if (stepsLeft_-- == 0) {
79 stepsLeft_ = stepsBetweenSelect_ - 1;
82 return perm_[uniform_(std::min(numActive, subsetSize_))];
86 std::function<int(int)> uniform_;
87 const size_t subsetSize_;
88 const int stepsBetweenSelect_;
91 // only the first subsetSize_ is properly randomized
92 std::vector<int> perm_;
94 void adjustPermSize(size_t numActive) {
95 if (perm_.size() > numActive) {
96 perm_.erase(std::remove_if(perm_.begin(),
98 [=](size_t x) { return x >= numActive; }),
101 while (perm_.size() < numActive) {
102 perm_.push_back(perm_.size());
105 assert(perm_.size() == numActive);
108 void shufflePrefix() {
109 for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
110 int j = uniform_(perm_.size() - i) + i;
111 std::swap(perm_[i], perm_[j]);
116 std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
119 auto gen = std::make_shared<UniformSubset>(seed, n, m);
120 return [=](size_t numActive) { return (*gen)(numActive); };
123 void DeterministicSchedule::beforeSharedAccess() {
129 void DeterministicSchedule::afterSharedAccess() {
130 auto sched = tls_sched;
135 sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
138 int DeterministicSchedule::getRandNumber(int n) {
140 return tls_sched->scheduler_(n);
142 return std::rand() % n;
145 int DeterministicSchedule::getcpu(unsigned* cpu,
147 void* /* unused */) {
148 if (!tls_threadId && tls_sched) {
149 beforeSharedAccess();
150 tls_threadId = tls_sched->nextThreadId_++;
157 *node = tls_threadId;
162 sem_t* DeterministicSchedule::beforeThreadCreate() {
163 sem_t* s = new sem_t;
165 beforeSharedAccess();
171 void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
172 assert(tls_sem == nullptr);
173 assert(tls_sched == nullptr);
176 bool started = false;
178 beforeSharedAccess();
179 if (active_.count(std::this_thread::get_id()) == 1) {
186 void DeterministicSchedule::beforeThreadExit() {
187 assert(tls_sched == this);
188 beforeSharedAccess();
189 sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
190 active_.erase(std::this_thread::get_id());
191 if (sems_.size() > 0) {
192 FOLLY_TEST_DSCHED_VLOG("exiting");
195 sem_destroy(tls_sem);
201 void DeterministicSchedule::join(std::thread& child) {
202 auto sched = tls_sched;
206 beforeSharedAccess();
207 done = !sched->active_.count(child.get_id());
209 FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
217 void DeterministicSchedule::post(sem_t* sem) {
218 beforeSharedAccess();
220 FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
224 bool DeterministicSchedule::tryWait(sem_t* sem) {
225 beforeSharedAccess();
226 int rv = sem_trywait(sem);
227 int e = rv == 0 ? 0 : errno;
228 FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
239 void DeterministicSchedule::wait(sem_t* sem) {
240 while (!tryWait(sem)) {
241 // we're not busy waiting because this is a deterministic schedule
250 using namespace test;
251 using namespace std::chrono;
254 FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
256 time_point<system_clock>* absSystemTimeout,
257 time_point<steady_clock>* absSteadyTimeout,
259 bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
261 FutexResult result = FutexResult::AWOKEN;
263 DeterministicSchedule::beforeSharedAccess();
264 FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
265 << ", .., " << std::hex << waitMask
268 if (data == expected) {
269 auto& queue = futexQueues[this];
270 queue.emplace_back(waitMask, &awoken);
271 auto ours = queue.end();
275 DeterministicSchedule::afterSharedAccess();
276 DeterministicSchedule::beforeSharedAccess();
279 // Simulate spurious wake-ups, timeouts each time with
280 // a 10% probability if we haven't been woken up already
281 if (!awoken && hasTimeout &&
282 DeterministicSchedule::getRandNumber(100) < 10) {
283 assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
286 futexQueues.erase(this);
288 // Simulate ETIMEDOUT 90% of the time and other failures
290 result = DeterministicSchedule::getRandNumber(100) >= 10
291 ? FutexResult::TIMEDOUT
292 : FutexResult::INTERRUPTED;
297 result = FutexResult::VALUE_CHANGED;
301 char const* resultStr = "?";
303 case FutexResult::AWOKEN:
304 resultStr = "AWOKEN";
306 case FutexResult::TIMEDOUT:
307 resultStr = "TIMEDOUT";
309 case FutexResult::INTERRUPTED:
310 resultStr = "INTERRUPTED";
312 case FutexResult::VALUE_CHANGED:
313 resultStr = "VALUE_CHANGED";
316 FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
317 << ", .., " << std::hex << waitMask << ") -> "
319 DeterministicSchedule::afterSharedAccess();
324 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
326 DeterministicSchedule::beforeSharedAccess();
328 if (futexQueues.count(this) > 0) {
329 auto& queue = futexQueues[this];
330 auto iter = queue.begin();
331 while (iter != queue.end() && rv < count) {
333 if ((cur->first & wakeMask) != 0) {
334 *(cur->second) = true;
340 futexQueues.erase(this);
344 FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
345 << wakeMask << ") -> " << rv);
346 DeterministicSchedule::afterSharedAccess();
351 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
352 static CacheLocality cache(CacheLocality::uniform(16));
357 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
358 return &DeterministicSchedule::getcpu;