1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #ifndef JUNCTION_DETAILS_LINEAR_H
14 #define JUNCTION_DETAILS_LINEAR_H
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <turf/Mutex.h>
19 #include <turf/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
27 // Enable this to force migration overflows (for test purposes):
// When nonzero, the "#if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS" section of beginTableMigration
// is compiled in; it periodically underestimates the number of cells in use so that the code
// handling destination overflow during migration gets exercised.
28 #define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
// Declares the trace points used by the TURF_TRACE calls in this file (ids 0 through 26 are used).
// NOTE(review): presumably the second argument is the number of trace points and has to be
// bumped whenever a new TURF_TRACE id is added -- confirm against turf/Trace.h.
33 TURF_TRACE_DECLARE(Linear, 27)
// Aliases for the types supplied by the Map policy, so the rest of this file can use them unqualified.
37 typedef typename Map::Hash Hash;
38 typedef typename Map::Value Value;
39 typedef typename Map::KeyTraits KeyTraits;
40 typedef typename Map::ValueTraits ValueTraits;
// Lower bound for the table size chosen by beginTableMigration.
42 static const ureg InitialSize = 8;
// Number of source cells covered by one migration unit (one migrateRange call).
43 static const ureg TableMigrationUnitSize = 32;
// Maximum number of cells beginTableMigration samples when estimating how many cells are in use.
44 static const ureg CellsInUseSample = 256;
// Fields of one table cell (the enclosing declaration is not visible in this excerpt).
// hash is KeyTraits::NullHash until insertOrFind reserves the cell with a compare-exchange.
47 turf::Atomic<Hash> hash;
// value starts as ValueTraits::NullValue (see Table::create); migrateRange replaces it with
// ValueTraits::Redirect once the cell has been handled by a migration.
48 turf::Atomic<Value> value;
// Table state. The cells themselves are not a member: they live in the same heap allocation,
// directly after the Table object (see create() and getCells()).
52 const ureg sizeMask; // a power of two minus one
// Budget of cells that may still be reserved. insertOrFind decrements it before claiming an
// empty cell and reports InsertResult_Overflow once the budget is used up.
53 turf::Atomic<sreg> cellsRemaining;
54 turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
55 SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
// The budget starts at 75% of sizeMask (truncated to an integer), so the table reports
// overflow before every cell is occupied.
57 Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
// Allocates a Table followed by tableSize cells in a single TURF_HEAP block, constructs the
// Table in place and marks every cell as empty (NullHash / NullValue).
// tableSize must be a power of two (asserted).
60 static Table* create(ureg tableSize) {
61 TURF_ASSERT(turf::util::isPowerOf2(tableSize));
62 Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
63 new (table) Table(tableSize - 1);
// No other thread can see the table yet, hence the non-atomic stores.
64 for (ureg j = 0; j < tableSize; j++) {
65 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
66 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
// Explicit destructor call, needed because the object was created with placement new.
// NOTE(review): the enclosing function header is not visible in this excerpt; presumably this
// is destroy(), which TableMigration's cleanup code calls on its source tables -- confirm.
72 this->Table::~Table();
// Returns the cell array, which begins at the first byte past this Table object.
76 Cell* getCells() const {
77 return (Cell*) (this + 1);
// Number of migration units needed to cover the whole table: one per TableMigrationUnitSize
// cells, and never fewer than one.
80 ureg getNumMigrationUnits() const {
81 return sizeMask / TableMigrationUnitSize + 1;
// A job that moves the contents of one or more source tables into a destination table.
// It derives from SimpleJobCoordinator::Job and is published through the jobCoordinator of a
// Table (see beginTableMigrationToSize). The Source entries are stored in the same heap block,
// directly after the TableMigration object (see create() and getSources()).
85 class TableMigration : public SimpleJobCoordinator::Job {
// Per-source cursor (accessed as getSources()[i].sourceIndex): the next cell index to hand out.
// run() advances it by TableMigrationUnitSize for every unit it claims.
89 turf::Atomic<ureg> sourceIndex;
// Bit 0 is the end flag; every active worker contributes 2 (see run()).
94 turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
// Set by run() when migrateRange reports that the destination table overflowed.
95 turf::Atomic<bool> m_overflowed;
// Migration units that have not been completed yet; the worker that completes the last one
// sets the end flag.
96 turf::Atomic<sreg> m_unitsRemaining;
99 TableMigration(Map& map) : m_map(map) {
// Allocates a TableMigration followed by numSources Source slots in one TURF_HEAP block,
// constructs it in place and resets the shared counters.
102 static TableMigration* create(Map& map, ureg numSources) {
103 TableMigration* migration =
104 (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
105 new (migration) TableMigration(map);
106 migration->m_workerStatus.storeNonatomic(0);
107 migration->m_overflowed.storeNonatomic(false);
108 migration->m_unitsRemaining.storeNonatomic(0);
109 migration->m_numSources = numSources;
110 // Caller is responsible for filling in sources & destination
114 virtual ~TableMigration() TURF_OVERRIDE {
// NOTE(review): the header of the function containing the following cleanup code is not visible
// in this excerpt; presumably it is destroy(), which run() hands to DefaultQSBR.enqueue -- confirm.
118 // Destroy all source tables.
// Entries that run() reset to NULL (after transferring them to a follow-up migration) are skipped.
119 for (ureg i = 0; i < m_numSources; i++)
120 if (getSources()[i].table)
121 getSources()[i].table->destroy();
122 // Delete the migration object itself.
123 this->TableMigration::~TableMigration();
124 TURF_HEAP.free(this);
// Returns the Source array stored directly after this object.
127 Source* getSources() const {
128 return (Source*) (this + 1);
// Migrates one unit of srcTable starting at startIdx; run() treats a false result as a
// destination overflow.
131 bool migrateRange(Table* srcTable, ureg startIdx);
// Worker entry point, overriding the Job interface.
132 virtual void run() TURF_OVERRIDE;
// Looks for the cell that holds `hash` in `table` by probing consecutive cells, starting at the
// index derived from the hash. `hash` must not be KeyTraits::NullHash (asserted).
// NOTE(review): the return statements and the index wrap-around are not visible in this excerpt;
// presumably the matching cell is returned on a hit and NULL once an empty cell is reached -- confirm.
135 static Cell* find(Hash hash, Table* table) {
136 TURF_TRACE(Linear, 0, "[find] called", uptr(table), hash);
138 TURF_ASSERT(hash != KeyTraits::NullHash);
139 ureg sizeMask = table->sizeMask;
140 for (ureg idx = ureg(hash);; idx++) {
142 Cell* cell = table->getCells() + idx;
143 // Load the hash that was there.
144 Hash probeHash = cell->hash.load(turf::Relaxed);
145 if (probeHash == hash) {
146 TURF_TRACE(Linear, 1, "[find] found existing cell", uptr(table), idx);
// An empty cell ends the probe sequence: the hash is not in this table.
148 } else if (probeHash == KeyTraits::NullHash) {
154 // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
// Outcome of insertOrFind:
//   InsertResult_AlreadyFound - a cell with this hash already exists; `cell` refers to it.
//   InsertResult_InsertedNew  - an empty cell was reserved for this hash; `cell` refers to it.
//   InsertResult_Overflow     - the table's cellsRemaining budget is exhausted; nothing was reserved.
155 enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
// Finds the cell holding `hash` in `table`, or reserves an empty cell for it.
// Probes consecutive cells starting at the index derived from the hash. The last probed cell is
// written to the out-parameter `cell`. `hash` must not be KeyTraits::NullHash (asserted).
// Reserving a cell first takes one unit from table->cellsRemaining; the unit is given back
// whenever the reservation does not happen (overflow, or another thread won the race).
156 static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell) {
157 TURF_TRACE(Linear, 2, "[insertOrFind] called", uptr(table), hash);
159 TURF_ASSERT(hash != KeyTraits::NullHash);
160 ureg sizeMask = table->sizeMask;
162 for (ureg idx = ureg(hash);; idx++) {
164 cell = table->getCells() + idx;
165 // Load the existing hash.
166 Hash probeHash = cell->hash.load(turf::Relaxed);
167 if (probeHash == hash) {
168 TURF_TRACE(Linear, 3, "[insertOrFind] found existing cell", uptr(table), idx);
169 return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
171 if (probeHash == KeyTraits::NullHash) {
172 // It's an empty cell. Try to reserve it.
173 // But first, decrement cellsRemaining to ensure we have permission to create new cells.
// cellsRemaining is an Atomic<sreg>, so keep the result at full width: narrowing it to a 32-bit
// type could misreport the remaining budget on targets where sreg is wider than 32 bits.
174 sreg prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
175 if (prevCellsRemaining <= 0) {
176 // Table is overpopulated.
177 TURF_TRACE(Linear, 4, "[insertOrFind] ran out of cellsRemaining", prevCellsRemaining, 0);
178 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
179 return InsertResult_Overflow;
181 // Try to reserve this cell.
182 Hash prevHash = cell->hash.compareExchange(KeyTraits::NullHash, hash, turf::Relaxed);
183 if (prevHash == KeyTraits::NullHash) {
184 // Success. We reserved a new cell.
185 TURF_TRACE(Linear, 5, "[insertOrFind] reserved cell", prevCellsRemaining, idx);
186 return InsertResult_InsertedNew;
188 // There was a race and another thread reserved that cell from under us.
189 TURF_TRACE(Linear, 6, "[insertOrFind] detected race to reserve cell", ureg(hash), idx);
190 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
191 if (prevHash == hash) {
192 TURF_TRACE(Linear, 7, "[insertOrFind] race reserved same hash", ureg(hash), idx);
193 return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
196 // Try again in the next cell.
// Starts a migration of `table` into a freshly created table of nextTableSize cells, unless a
// job is already stored in table->jobCoordinator. The check is performed once without the lock
// and once more while holding table->mutex, so only one thread creates the migration.
// The new migration has `table` as its single source and is published with storeRelease.
200 static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
201 // Create new migration by DCLI.
202 TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
203 SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
205 TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
207 turf::LockGuard<turf::Mutex> guard(table->mutex);
208 job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
210 TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
212 // Create new migration.
213 TableMigration* migration = TableMigration::create(map, 1);
// One unit per TableMigrationUnitSize cells of the source table.
214 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
215 migration->getSources()[0].table = table;
216 migration->getSources()[0].sourceIndex.storeNonatomic(0);
217 migration->m_destination = Table::create(nextTableSize);
218 // Publish the new migration.
219 table->jobCoordinator.storeRelease(migration);
// Chooses the size of the next table and starts a migration to it via beginTableMigrationToSize.
// With mustDouble set, the next size is twice the current size. Otherwise the number of cells in
// use is estimated from a sample of at most CellsInUseSample cells at the start of the table, and
// the next size is twice that estimate rounded up to a power of two, but never below InitialSize.
224 static void beginTableMigration(Map& map, Table* table, bool mustDouble) {
227 TURF_TRACE(Linear, 11, "[beginTableMigration] forced to double", 0, 0);
228 nextTableSize = (table->sizeMask + 1) * 2;
230 // Estimate number of cells in use based on a small sample.
232 ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
234 for (; idx < sampleSize; idx++) {
235 Cell* cell = table->getCells() + idx;
236 Value value = cell->value.load(turf::Relaxed);
237 if (value == Value(ValueTraits::Redirect)) {
238 // Another thread kicked off the jobCoordinator. The caller will participate upon return.
239 TURF_TRACE(Linear, 12, "[beginTableMigration] redirected while determining table size", 0, 0);
// Any value other than NullValue counts the cell as in use.
242 if (value != Value(ValueTraits::NullValue))
// Scale the sampled ratio up to the full table size.
245 float inUseRatio = float(inUseCells) / sampleSize;
246 float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
247 #if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
248 // Periodically underestimate the number of cells in use.
249 // This exercises the code that handles overflow during migration.
250 static ureg counter = 1;
251 if ((++counter & 3) == 0) {
255 nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
257 beginTableMigrationToSize(map, table, nextTableSize);
// Migrates one unit of srcTable -- up to TableMigrationUnitSize cells starting at startIdx, clamped
// to the end of the table -- into m_destination. Every handled source cell ends up with a Redirect
// marker in its value; cells that hold a real value are first copied into a cell reserved in the
// destination with insertOrFind. Racing writers are handled by re-reading and retrying.
// run() interprets a false return value as "the destination table overflowed".
// NOTE(review): the return statements themselves are not visible in this excerpt.
262 bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
263 ureg srcSizeMask = srcTable->sizeMask;
264 ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
265 // Iterate over source range.
266 for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
267 Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
270 // Fetch the srcHash and srcValue.
272 srcHash = srcCell->hash.load(turf::Relaxed);
273 if (srcHash == KeyTraits::NullHash) {
274 // An unused cell. Try to put a Redirect marker in its value.
276 srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
277 if (srcValue == Value(ValueTraits::Redirect)) {
278 // srcValue is already marked Redirect due to previous incomplete migration.
279 TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
282 if (srcValue == Value(ValueTraits::NullValue))
283 break; // Redirect has been placed. Break inner loop, continue outer loop.
284 TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
285 // Otherwise, somebody just claimed the cell. Read srcHash again...
287 // Check for deleted/uninitialized value.
288 srcValue = srcCell->value.load(turf::Relaxed);
289 if (srcValue == Value(ValueTraits::NullValue)) {
290 // Try to put a Redirect marker.
291 if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
292 break; // Redirect has been placed. Break inner loop, continue outer loop.
293 TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
294 if (srcValue == Value(ValueTraits::Redirect)) {
295 // FIXME: I don't think this will happen. Investigate & change to assert
296 TURF_TRACE(Linear, 16, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
299 } else if (srcValue == Value(ValueTraits::Redirect)) {
300 // srcValue is already marked Redirect due to previous incomplete migration.
301 TURF_TRACE(Linear, 17, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
305 // We've got a key/value pair to migrate.
306 // Reserve a destination cell in the destination.
307 TURF_ASSERT(srcHash != KeyTraits::NullHash);
308 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
309 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
311 InsertResult result = insertOrFind(srcHash, m_destination, dstCell);
312 // During migration, a hash can only exist in one place among all the source tables,
313 // and it is only migrated by one thread. Therefore, the hash will never already exist
314 // in the destination table:
315 TURF_ASSERT(result != InsertResult_AlreadyFound);
316 if (result == InsertResult_Overflow) {
317 // Destination overflow.
318 // This can happen for several reasons. For example, the source table could have
319 // consisted of all deleted cells when it overflowed, resulting in a small destination
320 // table size, but then another thread could re-insert all the same hashes
321 // before the migration completed.
322 // Caller will cancel the current migration and begin a new one.
325 // Migrate the old value to the new cell.
327 // Copy srcValue to the destination.
328 dstCell->value.store(srcValue, turf::Relaxed);
329 // Try to place a Redirect marker in srcValue.
330 Value doubleCheckedSrcValue =
331 srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
332 TURF_ASSERT(doubleCheckedSrcValue !=
333 Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
334 if (doubleCheckedSrcValue == srcValue) {
335 // No racing writes to the src. We've successfully placed the Redirect marker.
336 // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
337 // by a late-arriving erase.
338 if (srcValue == Value(ValueTraits::NullValue))
339 TURF_TRACE(Linear, 18, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
342 // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
343 TURF_TRACE(Linear, 19, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
344 srcValue = doubleCheckedSrcValue;
346 // Cell successfully migrated. Proceed to next source cell.
351 // Range has been migrated successfully.
// Worker entry point of a migration; several threads may execute it for the same TableMigration.
//  1. Registers as a worker by adding 2 to m_workerStatus, unless the end flag (bit 0) is already set.
//  2. Claims migration units from each source table through its sourceIndex and migrates them with
//     migrateRange. A destination overflow is recorded in m_overflowed and sets the end flag;
//     completing the last remaining unit sets the end flag as well.
//  3. Deregisters by subtracting 2. Only the last worker to leave continues: it either publishes
//     the finished migration through m_map.publishTableMigration, or, after an overflow, creates a
//     follow-up migration whose destination is twice as large and whose sources are the old sources
//     plus the old destination.
//  4. Finally the last worker queues this object for destruction through DefaultQSBR.
356 void Linear<Map>::TableMigration::run() {
357 // Conditionally increment the shared # of workers.
358 ureg probeStatus = m_workerStatus.load(turf::Relaxed);
360 if (probeStatus & 1) {
361 // End flag is already set, so do nothing.
362 TURF_TRACE(Linear, 20, "[TableMigration::run] already ended", uptr(this), 0);
365 } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
366 // # of workers has been incremented, and the end flag is clear.
367 TURF_ASSERT((probeStatus & 1) == 0);
369 // Iterate over all source tables.
370 for (ureg s = 0; s < m_numSources; s++) {
371 Source& source = getSources()[s];
372 // Loop over all migration units in this source table.
374 if (m_workerStatus.load(turf::Relaxed) & 1) {
375 TURF_TRACE(Linear, 21, "[TableMigration::run] detected end flag set", uptr(this), 0);
// Claim the next unit; fetchAdd gives every worker a distinct start index.
378 ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
379 if (startIdx >= source.table->sizeMask + 1)
380 break; // No more migration units in this table. Try next source table.
381 bool overflowed = !migrateRange(source.table, startIdx);
383 // *** FAILED MIGRATION ***
384 // TableMigration failed due to destination table overflow.
385 // No other thread can declare the migration successful at this point, because *this* unit will never complete,
386 // hence m_unitsRemaining won't reach zero.
387 // However, multiple threads can independently detect a failed migration at the same time.
388 TURF_TRACE(Linear, 22, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
389 // The reason we store overflowed in a shared variable is because we must flush all the worker threads before
390 // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
392 // the thread that deals with it.
393 bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
395 TURF_TRACE(Linear, 23, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
396 uptr(oldOverflowed));
// Set the end flag so that the other workers stop claiming units.
397 m_workerStatus.fetchOr(1, turf::Relaxed);
400 sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
401 TURF_ASSERT(prevRemaining > 0);
402 if (prevRemaining == 1) {
403 // *** SUCCESSFUL MIGRATION ***
404 // That was the last chunk to migrate.
405 m_workerStatus.fetchOr(1, turf::Relaxed);
410 TURF_TRACE(Linear, 24, "[TableMigration::run] out of migration units", uptr(this), 0);
413 // Decrement the shared # of workers.
414 probeStatus = m_workerStatus.fetchSub(
415 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
// fetchSub returns the previous value: 4 or more means at least one other worker is still registered.
416 if (probeStatus >= 4) {
417 // There are other workers remaining. Return here so that only the very last worker will proceed.
418 TURF_TRACE(Linear, 25, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
422 // We're the very last worker thread.
423 // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
// Previous value 3 == one worker (2) plus the end flag (1).
424 TURF_ASSERT(probeStatus == 3);
425 bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
427 // The migration succeeded. This is the most likely outcome. Publish the new subtree.
428 m_map.publishTableMigration(this);
429 // End the jobCoordinator.
430 getSources()[0].table->jobCoordinator.end();
432 // The migration failed due to the overflow of the destination table.
433 Table* origTable = getSources()[0].table;
434 turf::LockGuard<turf::Mutex> guard(origTable->mutex);
435 SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
436 if (checkedJob != this) {
437 TURF_TRACE(Linear, 26, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
// One extra source slot: the overflowed destination becomes a source of the new migration.
440 TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
441 // Double the destination table size.
442 migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
443 // Transfer source tables to the new migration.
444 for (ureg i = 0; i < m_numSources; i++) {
445 migration->getSources()[i].table = getSources()[i].table;
// Clear our own entry so the cleanup of this migration does not destroy the transferred table.
446 getSources()[i].table = NULL;
447 migration->getSources()[i].sourceIndex.storeNonatomic(0);
449 migration->getSources()[m_numSources].table = m_destination;
450 migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
451 // Calculate total number of migration units to move.
452 ureg unitsRemaining = 0;
453 for (ureg s = 0; s < migration->m_numSources; s++)
454 unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
455 migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
456 // Publish the new migration.
457 origTable->jobCoordinator.storeRelease(migration);
461 // We're done with this TableMigration. Queue it for GC.
462 DefaultQSBR.enqueue(&TableMigration::destroy, this);
465 } // namespace details
466 } // namespace junction
468 #endif // JUNCTION_DETAILS_LINEAR_H