/*------------------------------------------------------------------------
    Junction: Concurrent data structures in C++
    Copyright (c) 2016 Jeff Preshing
    Distributed under the Simplified BSD License.
    Original location: https://github.com/preshing/junction
    This software is distributed WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the LICENSE file for more information.
------------------------------------------------------------------------*/
#include <junction/Core.h>
#include <turf/CPUTimer.h>
#include <turf/Util.h>
#include <turf/extra/UniqueSequence.h>
#include <turf/extra/JobDispatcher.h>
#include <turf/extra/Options.h>
#include <junction/extra/MapAdapter.h>
#include <algorithm>
#include <vector>
#include <stdio.h>
using namespace turf::intTypes;
typedef junction::extra::MapAdapter MapAdapter;
static const ureg NumKeysPerThread = 16384;
static const ureg DefaultReadsPerWrite = 19;
static const ureg DefaultItersPerChunk = 128;
static const ureg DefaultChunks = 10;
static const u32 Prime = 0x4190ab09;
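// Keys are generated by multiplying a per-thread counter by Prime. Because
// Prime is odd, multiplication modulo 2^32 is invertible, so sequential
// counter values map to distinct keys scattered across the 32-bit key space.
// The Delay helper below burns a random number of "work units" between map
// operations; delayFactor sets how many, and is what main() sweeps.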
class Delay {
private:
    turf::extra::Random m_rand;
    u32 m_threshold;

public:
    Delay(float ratio) {
        m_threshold = u32(double(0xffffffffu) * ratio);
    }

    void delay(ureg& workUnits) {
        // Keep burning work units while random draws fall below the
        // threshold; a higher delayFactor means more work between map ops.
        while (m_rand.next32() < m_threshold) {
            volatile ureg v = m_rand.next32();
            TURF_UNUSED(v);
            workUnits++;
        }
    }
};
struct SharedState {
    MapAdapter& adapter;
    MapAdapter::Map* map;
    ureg numKeysPerThread;
    ureg numThreads;
    ureg readsPerWrite;
    ureg itersPerChunk;
    float delayFactor;
    turf::extra::SpinKicker spinKicker;
    turf::Atomic<u32> doneFlag;

    SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
        : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite),
          itersPerChunk(itersPerChunk) {
        delayFactor = 0.5f;
        doneFlag.storeNonatomic(0);
    }
};
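// Each ThreadState owns a disjoint slice [m_rangeLo, m_rangeHi) of the key
// counter space and treats it as a ring: m_addIndex is where the next key is
// inserted, m_removeIndex is where the oldest key is erased, and lookups walk
// the live window in between, so roughly numKeysPerThread keys stay resident
// per thread at any time.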
class ThreadState {
public:
    SharedState* m_shared;
    MapAdapter::ThreadContext m_threadCtx;
    ureg m_threadIndex;
    u32 m_rangeLo;
    u32 m_rangeHi;

    u32 m_addIndex;
    u32 m_removeIndex;

    struct Stats {
        ureg workUnitsDone;
        ureg mapOpsDone;
        double duration;

        Stats() {
            workUnitsDone = 0;
            mapOpsDone = 0;
            duration = 0;
        }

        Stats& operator+=(const Stats& other) {
            workUnitsDone += other.workUnitsDone;
            mapOpsDone += other.mapOpsDone;
            duration += other.duration;
            return *this;
        }

        bool operator<(const Stats& other) const {
            return duration < other.duration;
        }
    };

    Stats m_stats;

    ThreadState(SharedState* shared, ureg threadIndex, u32 rangeLo, u32 rangeHi)
        : m_shared(shared), m_threadCtx(shared->adapter, threadIndex) {
        m_threadIndex = threadIndex;
        m_rangeLo = rangeLo;
        m_rangeHi = rangeHi;
        m_addIndex = rangeLo;
        m_removeIndex = rangeLo;
    }
    void registerThread() {
        m_threadCtx.registerThread();
    }

    void unregisterThread() {
        m_threadCtx.unregisterThread();
    }
    void initialPopulate() {
        TURF_ASSERT(m_addIndex == m_removeIndex);
        MapAdapter::Map* map = m_shared->map;
        for (ureg i = 0; i < m_shared->numKeysPerThread; i++) {
            u32 key = m_addIndex * Prime;
            map->assign(key, (void*) (key & ~uptr(3)));
            if (++m_addIndex == m_rangeHi)
                m_addIndex = m_rangeLo;
        }
    }
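    // Chunk protocol for run(): thread 0 kicks the other threads, runs exactly
    // itersPerChunk iterations, then raises doneFlag; the other threads spin
    // until kicked and iterate until they observe doneFlag, so every thread's
    // timed region covers roughly the same wall-clock interval.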
    void run() {
        MapAdapter::Map* map = m_shared->map;
        turf::CPUTimer::Converter converter;
        Delay delay(m_shared->delayFactor);
        Stats stats;
        ureg lookupIndex = m_rangeLo;
        ureg remaining = m_shared->itersPerChunk;
        if (m_threadIndex == 0)
            m_shared->spinKicker.kick(m_shared->numThreads - 1);
        else {
            remaining = ureg(-1);
            m_shared->spinKicker.waitForKick();
        }

        turf::CPUTimer::Point start = turf::CPUTimer::get();
        for (; remaining > 0; remaining--) {
            // Add
            delay.delay(stats.workUnitsDone);
            if (m_shared->doneFlag.load(turf::Relaxed))
                break;
            u32 key = m_addIndex * Prime;
            if (key >= 2) { // Skip reserved keys 0 and 1
                map->assign(key, (void*) uptr(key));
                stats.mapOpsDone++;
            }
            if (++m_addIndex == m_rangeHi)
                m_addIndex = m_rangeLo;

            // Lookup
            if (s32(lookupIndex - m_removeIndex) < 0) // Signed compare handles ring wraparound
                lookupIndex = m_removeIndex;
            for (ureg l = 0; l < m_shared->readsPerWrite; l++) {
                delay.delay(stats.workUnitsDone);
                if (m_shared->doneFlag.load(turf::Relaxed))
                    break;
                key = lookupIndex * Prime;
                if (key >= 2) {
                    volatile void* value = map->get(key);
                    TURF_UNUSED(value);
                    stats.mapOpsDone++;
                }
                if (++lookupIndex == m_rangeHi)
                    lookupIndex = m_rangeLo;
                if (lookupIndex == m_addIndex)
                    lookupIndex = m_removeIndex;
            }

            // Remove
            delay.delay(stats.workUnitsDone);
            if (m_shared->doneFlag.load(turf::Relaxed))
                break;
            key = m_removeIndex * Prime;
            if (key >= 2) {
                map->erase(key);
                stats.mapOpsDone++;
            }
            if (++m_removeIndex == m_rangeHi)
                m_removeIndex = m_rangeLo;

            // Lookup
            if (s32(lookupIndex - m_removeIndex) < 0)
                lookupIndex = m_removeIndex;
            for (ureg l = 0; l < m_shared->readsPerWrite; l++) {
                delay.delay(stats.workUnitsDone);
                if (m_shared->doneFlag.load(turf::Relaxed))
                    break;
                key = lookupIndex * Prime;
                if (key >= 2) {
                    volatile void* value = map->get(key);
                    TURF_UNUSED(value);
                    stats.mapOpsDone++;
                }
                if (++lookupIndex == m_rangeHi)
                    lookupIndex = m_rangeLo;
                if (lookupIndex == m_addIndex)
                    lookupIndex = m_removeIndex;
            }
        }
        if (m_threadIndex == 0)
            m_shared->doneFlag.store(1, turf::Relaxed);
        m_threadCtx.update();
        turf::CPUTimer::Point end = turf::CPUTimer::get();

        stats.duration = converter.toSeconds(end - start);
        m_stats = stats;
    }
};
static const turf::extra::Option Options[] = {
    {"readsPerWrite", 'r', true, "number of reads per write"},
    {"itersPerChunk", 'i', true, "number of iterations per chunk"},
    {"chunks", 'c', true, "number of chunks to execute"},
    {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"},
};
int main(int argc, const char** argv) {
    turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
    options.parse(argc, argv);
    ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
    ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
    ureg chunks = options.getInteger("chunks", DefaultChunks);
    double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);

    turf::extra::JobDispatcher dispatcher;
    ureg numThreads = dispatcher.getNumPhysicalCores();
    MapAdapter adapter(numThreads);
    // Create shared state and register threads
    SharedState shared(adapter, numThreads, NumKeysPerThread, readsPerWrite, itersPerChunk);
    std::vector<ThreadState> threads;
    threads.reserve(numThreads);
    for (ureg t = 0; t < numThreads; t++) {
        // Give each thread a disjoint 1/numThreads slice of the 32-bit counter space
        u32 rangeLo = 0xffffffffu / numThreads * t + 1;
        u32 rangeHi = 0xffffffffu / numThreads * (t + 1) + 1;
        threads.push_back(ThreadState(&shared, t, rangeLo, rangeHi));
    }
    dispatcher.kickMulti(&ThreadState::registerThread, &threads[0], threads.size());
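    // The map lives in its own scope below so that it is destroyed before the
    // worker threads unregister; junction's lifetime rules (QSBR-based memory
    // reclamation) presumably require every registered ThreadContext to
    // outlive the map it touches.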
    {
        MapAdapter::Map map(MapAdapter::getInitialCapacity(numThreads * NumKeysPerThread));
        shared.map = &map;
        dispatcher.kickMulti(&ThreadState::initialPopulate, &threads[0], threads.size());

        printf("{\n");
        printf("'mapType': '%s',\n", MapAdapter::getMapName());
        printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
        printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
        printf("'chunks': %d,\n", (int) chunks);
        printf("'keepChunkFraction': %f,\n", keepChunkFraction);
        printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n");
        printf("'points': [\n");
        for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f) {
            shared.delayFactor = delayFactor;

            std::vector<ThreadState::Stats> kickTotals;
            for (ureg c = 0; c < chunks; c++) {
                shared.doneFlag.storeNonatomic(0);
                dispatcher.kickMulti(&ThreadState::run, &threads[0], threads.size());

                ThreadState::Stats kickTotal;
                for (ureg t = 0; t < numThreads; t++)
                    kickTotal += threads[t].m_stats;
                kickTotals.push_back(kickTotal);
            }
            // Keep only the fastest keepChunkFraction of chunk timings
            std::sort(kickTotals.begin(), kickTotals.end());
            ThreadState::Stats totals;
            for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
                totals += kickTotals[t];
            }

            printf(" (%f, %d, %d, %f),\n", shared.delayFactor, int(totals.workUnitsDone), int(totals.mapOpsDone),
                   totals.duration);
        }
        printf("],\n");
        printf("}\n");
    }

    dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
    return 0;
}
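// Example invocation (the executable name is hypothetical and depends on how
// this sample target is built; the short flags come from the Options table
// above, assuming each takes a value):
//
//     ./MapPerformanceTester -r 19 -i 128 -c 10 -k 0.9
//
// The output is a Python-style dict literal containing one (delayFactor,
// workUnitsDone, mapOpsDone, totalTime) tuple per delayFactor step,
// presumably meant to be eval'd by a plotting script.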