1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <junction/Averager.h>
19 #include <turf/extra/Options.h>
20 #include <junction/extra/MapAdapter.h>
22 using namespace turf::intTypes;  // brings the unqualified ureg / u32 / s32 used throughout this file into scope
23 typedef junction::extra::MapAdapter MapAdapter;  // map implementation under test; its MapName is printed in the report
25 static const ureg NumKeysPerThread = 2000;  // keys each thread inserts in initialPopulate(); reported population = numCores * this
26 static const ureg DefaultReadsPerWrite = 4;  // default for the 'readsPerWrite' option: map->get() calls per write in the benchmark loop
27 static const ureg DefaultItersPerChunk = 10000;  // default for the 'itersPerChunk' option: iterations of the timed loop per chunk
28 static const ureg DefaultChunks = 200;  // default for the 'chunks' option: number of timed runs for each thread count
29 static const u32 Prime = 0x4190ab09;  // key = index * Prime (wraps mod 2^32); odd, so distinct u32 indices give distinct keys
34 ureg numKeysPerThread;
38 turf::extra::SpinKicker spinKicker;
39 turf::Atomic<u32> doneFlag;
41 SharedState(MapAdapter& adapter, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
42 : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
43 doneFlag.storeNonatomic(0);
50 SharedState& m_shared;
51 MapAdapter::ThreadContext m_threadCtx;
68 Stats& operator+=(const Stats& other) {
69 mapOpsDone += other.mapOpsDone;
70 duration += other.duration;
74 bool operator<(const Stats& other) const {
75 return duration < other.duration;
81 ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
82 m_threadIndex = threadIndex;
86 m_removeIndex = rangeLo;
89 void registerThread() {
90 m_threadCtx.registerThread();
93 void unregisterThread() {
94 m_threadCtx.unregisterThread();
97 void initialPopulate() {
98 TURF_ASSERT(m_addIndex == m_removeIndex);
99 MapAdapter::Map *map = m_shared.map;
100 for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
101 u32 key = m_addIndex * Prime;
103 map->insert(key, (void*) uptr(key));
104 if (++m_addIndex == m_rangeHi)
105 m_addIndex = m_rangeLo;
110 MapAdapter::Map *map = m_shared.map;
111 turf::CPUTimer::Converter converter;
113 ureg lookupIndex = m_rangeLo;
114 ureg remaining = m_shared.itersPerChunk;
115 if (m_threadIndex == 0)
116 m_shared.spinKicker.kick(m_shared.numThreads - 1);
119 m_shared.spinKicker.waitForKick();
123 turf::CPUTimer::Point start = turf::CPUTimer::get();
124 for (; remaining > 0; remaining--) {
126 if (m_shared.doneFlag.load(turf::Relaxed))
128 u32 key = m_addIndex * Prime;
130 map->insert(key, (void*) uptr(key));
133 if (++m_addIndex == m_rangeHi)
134 m_addIndex = m_rangeLo;
137 if (s32(lookupIndex - m_removeIndex) < 0)
138 lookupIndex = m_removeIndex;
139 for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
140 if (m_shared.doneFlag.load(turf::Relaxed))
142 key = lookupIndex * Prime;
144 volatile void* value = map->get(key);
148 if (++lookupIndex == m_rangeHi)
149 lookupIndex = m_rangeLo;
150 if (lookupIndex == m_addIndex)
151 lookupIndex = m_removeIndex;
155 if (m_shared.doneFlag.load(turf::Relaxed))
157 key = m_removeIndex * Prime;
162 if (++m_removeIndex == m_rangeHi)
163 m_removeIndex = m_rangeLo;
166 if (s32(lookupIndex - m_removeIndex) < 0)
167 lookupIndex = m_removeIndex;
168 for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
169 if (m_shared.doneFlag.load(turf::Relaxed))
171 key = lookupIndex * Prime;
173 volatile void* value = map->get(key);
177 if (++lookupIndex == m_rangeHi)
178 lookupIndex = m_rangeLo;
179 if (lookupIndex == m_addIndex)
180 lookupIndex = m_removeIndex;
183 if (m_threadIndex == 0)
184 m_shared.doneFlag.store(1, turf::Relaxed);
185 m_threadCtx.update();
186 turf::CPUTimer::Point end = turf::CPUTimer::get();
189 stats.duration = converter.toSeconds(end - start);
194 static const turf::extra::Option Options[] = {
195 { "readsPerWrite", 'r', true, "number of reads per write" },
196 { "itersPerChunk", 'i', true, "number of iterations per chunk" },
197 { "chunks", 'c', true, "number of chunks to execute" },
198 { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
201 int main(int argc, const char** argv) {
202 turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
203 options.parse(argc, argv);
204 ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
205 ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
206 ureg chunks = options.getInteger("chunks", DefaultChunks);
207 double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
209 turf::extra::JobDispatcher dispatcher;
210 ureg numCores = dispatcher.getNumPhysicalCores();
211 TURF_ASSERT(numCores > 0);
212 MapAdapter adapter(numCores);
214 // Create shared state and register first thread
215 SharedState shared(adapter, NumKeysPerThread, readsPerWrite, itersPerChunk);
216 std::vector<ThreadState> threads;
217 threads.reserve(numCores);
218 for (ureg t = 0; t < numCores; t++) {
219 u32 rangeLo = 0xffffffffu / numCores * t + 1;
220 u32 rangeHi = 0xffffffffu / numCores * (t + 1) + 1;
221 threads.emplace_back(shared, t, rangeLo, rangeHi);
223 dispatcher.kickOne(0, &ThreadState::registerThread, threads[0]);
226 // Create the map and populate it entirely from main thread
227 MapAdapter::Map map(MapAdapter::getInitialCapacity(numCores * NumKeysPerThread));
229 for (ureg t = 0; t < numCores; t++) {
230 threads[t].initialPopulate();
234 printf("'mapType': '%s',\n", MapAdapter::MapName);
235 printf("'population': %d,\n", (int) (numCores * NumKeysPerThread));
236 printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
237 printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
238 printf("'chunks': %d,\n", (int) chunks);
239 printf("'keepChunkFraction': %f,\n", keepChunkFraction);
240 printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n"),
241 printf("'points': [\n");
242 for (shared.numThreads = 1; shared.numThreads <= numCores; shared.numThreads++) {
243 if (shared.numThreads > 1) {
244 // Spawn and register a new thread
245 dispatcher.kickOne(shared.numThreads - 1, &ThreadState::registerThread, threads[shared.numThreads - 1]);
248 std::vector<ThreadState::Stats> kickTotals;
249 for (ureg c = 0; c < chunks; c++) {
250 shared.doneFlag.storeNonatomic(false);
251 dispatcher.kickMulti(&ThreadState::run, &threads[0], shared.numThreads);
253 ThreadState::Stats kickTotal;
254 for (ureg t = 0; t < shared.numThreads; t++)
255 kickTotal += threads[t].m_stats;
256 kickTotals.push_back(kickTotal);
259 std::sort(kickTotals.begin(), kickTotals.end());
260 ThreadState::Stats totals;
261 for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
262 totals += kickTotals[t];
265 printf(" (%d, %d, %f),\n",
266 int(shared.numThreads),
267 int(totals.mapOpsDone),
276 dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());