1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #ifndef JUNCTION_DETAILS_LINEAR_H
14 #define JUNCTION_DETAILS_LINEAR_H
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <turf/Mutex.h>
19 #include <turf/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
27 // Enable this to force migration overflows (for test purposes):
28 #define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
33 TURF_TRACE_DECLARE(Linear, 27)
// Type aliases imported from the Map's configuration: the hash and value
// types plus their traits, which supply the sentinel constants used below
// (KeyTraits::NullHash, ValueTraits::NullValue, ValueTraits::Redirect).
typedef typename Map::Hash Hash;
typedef typename Map::Value Value;
typedef typename Map::KeyTraits KeyTraits;
typedef typename Map::ValueTraits ValueTraits;
// Tuning constants:
// - InitialSize: smallest table size (cells) ever created.
// - TableMigrationUnitSize: number of cells one worker claims per migration chunk.
// - CellsInUseSample: sample size used to estimate occupancy when sizing a new table.
static const ureg InitialSize = 8;
static const ureg TableMigrationUnitSize = 32;
static const ureg CellsInUseSample = 256;
// A single slot: hash and value are separate atomics so concurrent threads
// can probe/claim the hash and publish the value independently.
turf::Atomic<Hash> hash;
turf::Atomic<Value> value;
// Table bookkeeping. cellsRemaining is the remaining insertion budget (a
// load-factor cap); it is decremented before each new cell is reserved and
// an overflow is reported when it drops to zero — see insert() below.
const ureg sizeMask; // a power of two minus one
turf::Atomic<sreg> cellsRemaining;
turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
// Constructor: the insertion budget is initialized to 75% of sizeMask,
// capping the table's effective load factor before a migration is triggered.
Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
// Allocates the Table header and its cell array as one contiguous heap
// block (the cells live immediately after the header — see getCells()),
// placement-constructs the Table, and fills every cell with the Null
// sentinels so probes see "empty".
static Table* create(ureg tableSize) {
TURF_ASSERT(turf::util::isPowerOf2(tableSize));
Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
new (table) Table(tableSize - 1);
for (ureg j = 0; j < tableSize; j++) {
table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
// Explicit destructor call mirroring the placement-new in create(); the
// raw block is presumably freed right after via TURF_HEAP — the free call
// is not visible in this view of the file.
this->Table::~Table();
// The cell array was allocated immediately after the Table header in the
// same block (see create()), so "this + 1" is its start address.
Cell* getCells() const {
return (Cell*) (this + 1);
// Number of fixed-size migration chunks (TableMigrationUnitSize cells each)
// needed to cover this whole table; rounds up via the +1.
ureg getNumMigrationUnits() const {
return sizeMask / TableMigrationUnitSize + 1;
// A migration job: cooperatively moves all live cells from one or more
// source tables into a larger destination table. Any thread blocked on the
// jobCoordinator participates by calling run().
class TableMigration : public SimpleJobCoordinator::Job {
// Per-source progress cursor (inside a Source struct whose declaration is
// not visible in this view); workers fetchAdd it to claim migration units.
turf::Atomic<ureg> sourceIndex;
turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
turf::Atomic<bool> m_overflowed;
turf::Atomic<sreg> m_unitsRemaining;
// Constructor only binds the owning map; all other state is initialized
// non-atomically in create() before the object is published.
TableMigration(Map& map) : m_map(map) {
// Allocates the TableMigration header and its Source array as one heap
// block (sources live immediately after the header — see getSources()),
// then zero-initializes the shared counters. Safe to use storeNonatomic
// here: no other thread can see the object until it is published.
static TableMigration* create(Map& map, ureg numSources) {
TableMigration* migration =
(TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
new (migration) TableMigration(map);
migration->m_workerStatus.storeNonatomic(0);
migration->m_overflowed.storeNonatomic(false);
migration->m_unitsRemaining.storeNonatomic(0);
migration->m_numSources = numSources;
// Caller is responsible for filling in sources & destination
virtual ~TableMigration() TURF_OVERRIDE {
// NOTE(review): the statements below appear to belong to a destroy()
// helper whose signature line is not visible in this view — confirm
// against the full file. They tear the migration down: any source table
// still owned (not transferred to a successor migration) is destroyed,
// then the object itself is destructed and its raw block freed,
// mirroring the placement-new in create().
// Destroy all source tables.
for (ureg i = 0; i < m_numSources; i++)
if (getSources()[i].table)
getSources()[i].table->destroy();
// Delete the migration object itself.
this->TableMigration::~TableMigration();
TURF_HEAP.free(this);
// The Source array was allocated immediately after this header in the same
// block (see create()), so "this + 1" is its start address.
Source* getSources() const {
return (Source*) (this + 1);
// Worker entry points, defined out-of-line below: migrateRange() moves one
// chunk of cells; run() is the SimpleJobCoordinator::Job callback.
bool migrateRange(Table* srcTable, ureg startIdx);
virtual void run() TURF_OVERRIDE;
// Linear-probe lookup: walks cells starting at index hash until it either
// sees the requested hash (hit) or a NullHash cell (miss). The return
// statements for both outcomes lie outside this view of the file.
static Cell* find(Hash hash, Table* table) {
TURF_TRACE(Linear, 0, "[find] called", uptr(table), hash);
TURF_ASSERT(hash != KeyTraits::NullHash);
ureg sizeMask = table->sizeMask;
for (ureg idx = ureg(hash);; idx++) {
// NOTE(review): probing presumably wraps idx with sizeMask before
// indexing; the masking statement is not visible in this view — confirm
// against the full file.
Cell* cell = table->getCells() + idx;
// Load the hash that was there.
Hash probeHash = cell->hash.load(turf::Relaxed);
if (probeHash == hash) {
TURF_TRACE(Linear, 1, "[find] found existing cell", uptr(table), idx);
} else if (probeHash == KeyTraits::NullHash) {
// FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
// Linear-probe insert. On return, `cell` points at the cell for `hash`:
// - AlreadyFound: the key was already present (cell belongs to it).
// - InsertedNew: an empty cell was successfully reserved for the key.
// - Overflow: the cellsRemaining budget is exhausted; the caller must
//   trigger (or join) a migration to a larger table.
static InsertResult insert(Hash hash, Table* table, Cell*& cell) {
TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash);
TURF_ASSERT(hash != KeyTraits::NullHash);
ureg sizeMask = table->sizeMask;
for (ureg idx = ureg(hash);; idx++) {
cell = table->getCells() + idx;
// Load the existing hash.
Hash probeHash = cell->hash.load(turf::Relaxed);
if (probeHash == hash) {
TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx);
return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
if (probeHash == KeyTraits::NullHash) {
// It's an empty cell. Try to reserve it.
// But first, decrement cellsRemaining to ensure we have permission to create new cells.
// NOTE(review): prevCellsRemaining is s32 while cellsRemaining is
// turf::Atomic<sreg>; if sreg is 64-bit this narrows — confirm intended.
s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
if (prevCellsRemaining <= 0) {
// Table is overpopulated.
TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0);
table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
return InsertResult_Overflow;
// Try to reserve this cell.
Hash prevHash = cell->hash.compareExchange(KeyTraits::NullHash, hash, turf::Relaxed);
if (prevHash == KeyTraits::NullHash) {
// Success. We reserved a new cell.
TURF_TRACE(Linear, 5, "[insert] reserved cell", prevCellsRemaining, idx);
return InsertResult_InsertedNew;
// There was a race and another thread reserved that cell from under us.
TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx);
table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
if (prevHash == hash) {
TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx);
return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
// Try again in the next cell.
// Starts a migration of `table` into a fresh table of `nextTableSize` cells,
// using double-checked locking: a lock-free check of the jobCoordinator, then
// the table mutex, then a re-check, so at most one thread creates the
// TableMigration and the others simply find it already published.
static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
// Create new migration by DCLI.
TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
turf::LockGuard<turf::Mutex> guard(table->mutex);
job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
// Create new migration.
TableMigration* migration = TableMigration::create(map, 1);
migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
migration->getSources()[0].table = table;
migration->getSources()[0].sourceIndex.storeNonatomic(0);
migration->m_destination = Table::create(nextTableSize);
// Publish the new migration.
// storeRelease makes the fully-initialized migration visible to the
// lock-free loadConsume readers above.
table->jobCoordinator.storeRelease(migration);
// Chooses the size of the next table and kicks off the migration. When
// mustDouble is set the size is simply doubled; otherwise occupancy is
// estimated from a small sample of cells and the new size is at least twice
// the estimated number of live cells (several declarations and branch lines
// of this function are not visible in this view of the file).
static void beginTableMigration(Map& map, Table* table, bool mustDouble) {
TURF_TRACE(Linear, 11, "[beginTableMigration] forced to double", 0, 0);
nextTableSize = (table->sizeMask + 1) * 2;
// Estimate number of cells in use based on a small sample.
ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
for (; idx < sampleSize; idx++) {
Cell* cell = table->getCells() + idx;
Value value = cell->value.load(turf::Relaxed);
if (value == Value(ValueTraits::Redirect)) {
// Another thread kicked off the jobCoordinator. The caller will participate upon return.
TURF_TRACE(Linear, 12, "[beginTableMigration] redirected while determining table size", 0, 0);
if (value != Value(ValueTraits::NullValue))
float inUseRatio = float(inUseCells) / sampleSize;
float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
#if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
// Periodically underestimate the number of cells in use.
// This exercises the code that handles overflow during migration.
static ureg counter = 1;
if ((++counter & 3) == 0) {
// New size: at least InitialSize, and a power of two at least twice the
// estimated live-cell count (keeps the post-migration load factor low).
nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
beginTableMigrationToSize(map, table, nextTableSize);
// Migrates one chunk of up to TableMigrationUnitSize cells from srcTable into
// m_destination. For each source cell it either (a) seals an empty/deleted
// cell with a Redirect marker, or (b) copies the live key/value into the
// destination and then seals the source with Redirect, retrying if a racing
// writer updates the source value in between. Per the overflow comment below,
// the function reports failure when the destination overflows; the actual
// return statements (and the declarations of srcHash/srcValue and the inner
// retry loop header) are not visible in this view of the file.
bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
ureg srcSizeMask = srcTable->sizeMask;
ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
sreg valuesMigrated = 0;
// Iterate over source range.
for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
// Fetch the srcHash and srcValue.
srcHash = srcCell->hash.load(turf::Relaxed);
if (srcHash == KeyTraits::NullHash) {
// An unused cell. Try to put a Redirect marker in its value.
srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::NullValue))
break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
// Check for deleted/uninitialized value.
srcValue = srcCell->value.load(turf::Relaxed);
if (srcValue == Value(ValueTraits::NullValue)) {
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
TURF_TRACE(Linear, 16, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
} else if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
TURF_TRACE(Linear, 17, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
// We've got a key/value pair to migrate.
// Reserve a destination cell in the destination.
TURF_ASSERT(srcHash != KeyTraits::NullHash);
TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
InsertResult result = insert(srcHash, m_destination, dstCell);
// During migration, a hash can only exist in one place among all the source tables,
// and it is only migrated by one thread. Therefore, the hash will never already exist
// in the destination table:
TURF_ASSERT(result != InsertResult_AlreadyFound);
if (result == InsertResult_Overflow) {
// Destination overflow.
// This can happen for several reasons. For example, the source table could have
// consisted entirely of deleted cells when it overflowed, resulting in a small destination
// table size, but then another thread could re-insert all the same hashes
// before the migration completed.
// Caller will cancel the current migration and begin a new one.
// Migrate the old value to the new cell.
// Copy srcValue to the destination.
dstCell->value.store(srcValue, turf::Relaxed);
// Try to place a Redirect marker in srcValue.
Value doubleCheckedSrcValue =
srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
TURF_ASSERT(doubleCheckedSrcValue !=
Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
if (doubleCheckedSrcValue == srcValue) {
// No racing writes to the src. We've successfully placed the Redirect marker.
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
// by a late-arriving erase.
if (srcValue == Value(ValueTraits::NullValue))
TURF_TRACE(Linear, 18, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
// There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
TURF_TRACE(Linear, 19, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
srcValue = doubleCheckedSrcValue;
// Cell successfully migrated. Proceed to next source cell.
// Range has been migrated successfully.
// Job callback executed by every participating thread. Each worker
// registers itself in m_workerStatus (count in the upper bits, "ended" flag
// in bit 0), repeatedly claims TableMigrationUnitSize-cell chunks via the
// per-source sourceIndex cursors, and migrates them. The last worker to
// deregister performs the post-migration step: publish the destination on
// success, or (on destination overflow) start a bigger replacement
// migration under the original table's mutex. Several loop-header and
// closing-brace lines of this function are not visible in this view.
void Linear<Map>::TableMigration::run() {
// Conditionally increment the shared # of workers.
ureg probeStatus = m_workerStatus.load(turf::Relaxed);
if (probeStatus & 1) {
// End flag is already set, so do nothing.
TURF_TRACE(Linear, 20, "[TableMigration::run] already ended", uptr(this), 0);
} while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
// # of workers has been incremented, and the end flag is clear.
TURF_ASSERT((probeStatus & 1) == 0);
// Iterate over all source tables.
for (ureg s = 0; s < m_numSources; s++) {
Source& source = getSources()[s];
// Loop over all migration units in this source table.
if (m_workerStatus.load(turf::Relaxed) & 1) {
TURF_TRACE(Linear, 21, "[TableMigration::run] detected end flag set", uptr(this), 0);
// Claim the next chunk of this source table.
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
if (startIdx >= source.table->sizeMask + 1)
break; // No more migration units in this table. Try next source table.
bool overflowed = !migrateRange(source.table, startIdx);
// *** FAILED MIGRATION ***
// TableMigration failed due to destination table overflow.
// No other thread can declare the migration successful at this point, because *this* unit will never complete,
// hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
TURF_TRACE(Linear, 22, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
// The reason we store overflowed in a shared variable is because we must flush all the worker threads before
// we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
// the one that deals with it.
bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
TURF_TRACE(Linear, 23, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
uptr(oldOverflowed));
// Set the end flag so the other workers stop claiming chunks.
m_workerStatus.fetchOr(1, turf::Relaxed);
sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
TURF_ASSERT(prevRemaining > 0);
if (prevRemaining == 1) {
// *** SUCCESSFUL MIGRATION ***
// That was the last chunk to migrate.
m_workerStatus.fetchOr(1, turf::Relaxed);
TURF_TRACE(Linear, 24, "[TableMigration::run] out of migration units", uptr(this), 0);
// Decrement the shared # of workers.
probeStatus = m_workerStatus.fetchSub(
2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
TURF_TRACE(Linear, 25, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
// We're the very last worker thread.
// Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
// probeStatus == 3: end flag set (1) + exactly one worker left (2) — us.
TURF_ASSERT(probeStatus == 3);
bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
// The migration succeeded. This is the most likely outcome. Publish the new subtree.
m_map.publishTableMigration(this);
// End the jobCoordinator.
getSources()[0].table->jobCoordinator.end();
// The migration failed due to the overflow of the destination table.
Table* origTable = getSources()[0].table;
turf::LockGuard<turf::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
TURF_TRACE(Linear, 26, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
// Build a replacement migration that includes the old sources plus the
// (overflowed) destination, targeting a table twice as large.
TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
// Double the destination table size.
migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
// Transfer source tables to the new migration.
for (ureg i = 0; i < m_numSources; i++) {
migration->getSources()[i].table = getSources()[i].table;
getSources()[i].table = NULL;
migration->getSources()[i].sourceIndex.storeNonatomic(0);
migration->getSources()[m_numSources].table = m_destination;
migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
// Calculate total number of migration units to move.
ureg unitsRemaining = 0;
for (ureg s = 0; s < migration->m_numSources; s++)
unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
// Publish the new migration.
origTable->jobCoordinator.storeRelease(migration);
// We're done with this TableMigration. Queue it for GC.
DefaultQSBR.enqueue(&TableMigration::destroy, this);
466 } // namespace details
467 } // namespace junction
469 #endif // JUNCTION_DETAILS_LINEAR_H