From: Jeff Preshing Date: Thu, 11 Feb 2016 19:27:32 +0000 (-0500) Subject: Fix possible infinite loop X-Git-Url: http://demsky.eecs.uci.edu/git/?p=junction.git;a=commitdiff_plain;h=e0633b87f411d099eab8f801122b9ef58a003913 Fix possible infinite loop If the keys are badly distributed, beginTableMigration's estimatedInUse may be too low. In extreme cases, exchangeValue could loop infinitely migrating to the same size. Encountered once while testing ConcurrentMap_Linear after previous commit. Fix is to force a doubling on second loop iteration. --- diff --git a/junction/ConcurrentMap_Grampa.h b/junction/ConcurrentMap_Grampa.h index 8d67f22..cc2de0f 100644 --- a/junction/ConcurrentMap_Grampa.h +++ b/junction/ConcurrentMap_Grampa.h @@ -295,6 +295,7 @@ public: TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key)); Hash hash = KeyTraits::hash(key); + bool mustDouble = false; for (;;) { if (!m_map.locateTable(m_table, m_sizeMask, hash)) { m_map.createInitialTable(Details::MinTableSize); @@ -316,13 +317,15 @@ public: return; // Found an existing value } case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table, overflowIdx); + Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble); break; } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); } + // If we still overflow after this, avoid an infinite loop by forcing the next table to double. + mustDouble = true; // Try again using the latest root. } } @@ -337,6 +340,7 @@ public: TURF_ASSERT(desired != Value(ValueTraits::NullValue)); TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Grampa, 14, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); + bool mustDouble = false; for (;;) { Value oldValue = m_value; if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { @@ -367,7 +371,7 @@ public: m_table->jobCoordinator.participate(); // Try again in the latest table. // FIXME: locateTable() could return false if the map is concurrently cleared (m_root set to 0). - // This is not concern yet since clear() is not implemented. + // This is not a concern yet since clear() is not implemented. bool exists = m_map.locateTable(m_table, m_sizeMask, hash); TURF_ASSERT(exists); TURF_UNUSED(exists); @@ -387,12 +391,14 @@ public: case Details::InsertResult_Overflow: TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx); - Details::beginTableMigration(m_map, m_table, overflowIdx); + Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble); break; } // We were redirected... again } breakOuter:; + // If we still overflow after this, avoid an infinite loop by forcing the next table to double. + mustDouble = true; // Try again in the new table. } } diff --git a/junction/ConcurrentMap_LeapFrog.h b/junction/ConcurrentMap_LeapFrog.h index 389ce40..8b6c0f3 100644 --- a/junction/ConcurrentMap_LeapFrog.h +++ b/junction/ConcurrentMap_LeapFrog.h @@ -96,6 +96,7 @@ public: : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) { TURF_TRACE(ConcurrentMap_LeapFrog, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key)); Hash hash = KeyTraits::hash(key); + bool mustDouble = false; for (;;) { m_table = m_map.m_root.load(turf::Consume); ureg overflowIdx; @@ -115,12 +116,14 @@ public: return; // Found an existing value } case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table, overflowIdx); + Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble); break; } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); + // If we still overflow after this, avoid an infinite loop by forcing the next table to double. + mustDouble = true; // Try again using the latest root. } } @@ -135,6 +138,7 @@ public: TURF_ASSERT(desired != Value(ValueTraits::NullValue)); TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_LeapFrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); + bool mustDouble = false; for (;;) { Value oldValue = m_value; if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { @@ -181,12 +185,14 @@ public: case Details::InsertResult_Overflow: TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx); - Details::beginTableMigration(m_map, m_table, overflowIdx); + Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble); break; } // We were redirected... again } breakOuter:; + // If we still overflow after this, avoid an infinite loop by forcing the next table to double. + mustDouble = true; // Try again in the new table. } } diff --git a/junction/ConcurrentMap_Linear.h b/junction/ConcurrentMap_Linear.h index bf6c1ea..3450807 100644 --- a/junction/ConcurrentMap_Linear.h +++ b/junction/ConcurrentMap_Linear.h @@ -96,6 +96,7 @@ public: : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) { TURF_TRACE(ConcurrentMap_Linear, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key)); Hash hash = KeyTraits::hash(key); + bool mustDouble = false; for (;;) { m_table = m_map.m_root.load(turf::Consume); switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell @@ -114,12 +115,14 @@ public: return; // Found an existing value } case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table); + Details::beginTableMigration(m_map, m_table, mustDouble); break; } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); + // If we still overflow after this, avoid an infinite loop by forcing the next table to double. + mustDouble = true; // Try again using the latest root. } } @@ -134,6 +137,7 @@ public: TURF_ASSERT(desired != Value(ValueTraits::NullValue)); TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); + bool mustDouble = false; for (;;) { Value oldValue = m_value; if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { @@ -177,9 +181,11 @@ public: goto breakOuter; case Details::InsertResult_Overflow: TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), 0); - Details::beginTableMigration(m_map, m_table); + Details::beginTableMigration(m_map, m_table, mustDouble); break; } + // If we still overflow after this, avoid an infinite loop by forcing the next table to double. + mustDouble = true; // We were redirected... again } breakOuter:; diff --git a/junction/details/Grampa.cpp b/junction/details/Grampa.cpp index 3b33fa5..1950a9d 100644 --- a/junction/details/Grampa.cpp +++ b/junction/details/Grampa.cpp @@ -21,7 +21,7 @@ namespace details { GrampaStats GrampaStats::Instance; #endif -TURF_TRACE_DEFINE_BEGIN(Grampa, 37) // autogenerated by TidySource.py +TURF_TRACE_DEFINE_BEGIN(Grampa, 38) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[find] called") TURF_TRACE_DEFINE("[find] found existing cell optimistically") TURF_TRACE_DEFINE("[find] found existing cell") @@ -40,6 +40,7 @@ TURF_TRACE_DEFINE("[insert] overflow") TURF_TRACE_DEFINE("[beginTableMigrationToSize] called") TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists") TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)") +TURF_TRACE_DEFINE("[beginTableMigration] forced to double") TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size") TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected") TURF_TRACE_DEFINE("[migrateRange] race to insert key") @@ -59,7 +60,7 @@ TURF_TRACE_DEFINE("[TableMigration::run] overflow occured in a small map") TURF_TRACE_DEFINE("[TableMigration::run] doubling subtree size after failure") TURF_TRACE_DEFINE("[TableMigration::run] keeping same subtree size after failure") TURF_TRACE_DEFINE("[FlatTreeMigration::run] already ended") -TURF_TRACE_DEFINE_END(Grampa, 37) +TURF_TRACE_DEFINE_END(Grampa, 38) } // namespace details } // namespace junction diff --git a/junction/details/Grampa.h b/junction/details/Grampa.h index 2fe2cc2..de7a7b9 100644 --- a/junction/details/Grampa.h +++ b/junction/details/Grampa.h @@ -53,7 +53,7 @@ struct GrampaStats { }; #endif -TURF_TRACE_DECLARE(Grampa, 37) +TURF_TRACE_DECLARE(Grampa, 38) template struct Grampa { @@ -511,29 +511,35 @@ struct Grampa { } } - static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) { - // Estimate number of cells in use based on a small sample. - ureg sizeMask = table->sizeMask; - ureg idx = overflowIdx - CellsInUseSample; - ureg inUseCells = 0; - for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { - CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); - Cell* cell = group->cells + (idx & 3); - Value value = cell->value.load(turf::Relaxed); - if (value == Value(ValueTraits::Redirect)) { - // Another thread kicked off the jobCoordinator. The caller will participate upon return. - TURF_TRACE(Grampa, 18, "[beginTableMigration] redirected while determining table size", 0, 0); - return; + static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) { + ureg nextTableSize; + if (mustDouble) { + TURF_TRACE(Grampa, 18, "[beginTableMigration] forced to double", 0, 0); + nextTableSize = (table->sizeMask + 1) * 2; + } else { + // Estimate number of cells in use based on a small sample. + ureg sizeMask = table->sizeMask; + ureg idx = overflowIdx - CellsInUseSample; + ureg inUseCells = 0; + for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { + CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); + Cell* cell = group->cells + (idx & 3); + Value value = cell->value.load(turf::Relaxed); + if (value == Value(ValueTraits::Redirect)) { + // Another thread kicked off the jobCoordinator. The caller will participate upon return. + TURF_TRACE(Grampa, 19, "[beginTableMigration] redirected while determining table size", 0, 0); + return; + } + if (value != Value(ValueTraits::NullValue)) + inUseCells++; + idx++; } - if (value != Value(ValueTraits::NullValue)) - inUseCells++; - idx++; - } - float inUseRatio = float(inUseCells) / CellsInUseSample; - float estimatedInUse = (sizeMask + 1) * inUseRatio; - ureg nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)); - // FIXME: Support migrating to smaller tables. - nextTableSize = turf::util::max(nextTableSize, sizeMask + 1); + float inUseRatio = float(inUseCells) / CellsInUseSample; + float estimatedInUse = (sizeMask + 1) * inUseRatio; + nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)); + // FIXME: Support migrating to smaller tables. + nextTableSize = turf::util::max(nextTableSize, sizeMask + 1); + } // Split into multiple tables if necessary. ureg splitShift = 0; while (nextTableSize > LeafSize) { @@ -581,12 +587,12 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(Grampa, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Grampa, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { // Check for deleted/uninitialized value. @@ -595,15 +601,15 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Grampa, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert - TURF_TRACE(Grampa, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); break; } } else if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(Grampa, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); break; } @@ -644,11 +650,11 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL // by a late-arriving erase. if (srcValue == Value(ValueTraits::NullValue)) - TURF_TRACE(Grampa, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); break; } // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. - TURF_TRACE(Grampa, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); srcValue = doubleCheckedSrcValue; } // Cell successfully migrated. Proceed to next source cell. @@ -667,7 +673,7 @@ void Grampa::TableMigration::run() { do { if (probeStatus & 1) { // End flag is already set, so do nothing. - TURF_TRACE(Grampa, 26, "[TableMigration::run] already ended", uptr(this), 0); + TURF_TRACE(Grampa, 27, "[TableMigration::run] already ended", uptr(this), 0); return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); @@ -681,7 +687,7 @@ void Grampa::TableMigration::run() { // Loop over all migration units in this source table. for (;;) { if (m_workerStatus.load(turf::Relaxed) & 1) { - TURF_TRACE(Grampa, 27, "[TableMigration::run] detected end flag set", uptr(this), 0); + TURF_TRACE(Grampa, 28, "[TableMigration::run] detected end flag set", uptr(this), 0); goto endMigration; } ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); @@ -694,14 +700,14 @@ void Grampa::TableMigration::run() { // No other thread can declare the migration successful at this point, because *this* unit will never complete, // hence m_unitsRemaining won't reach zero. // However, multiple threads can independently detect a failed migration at the same time. - TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); + TURF_TRACE(Grampa, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads // before we can safely deal with the overflow. Therefore, the thread that detects the failure is often // different from the thread that deals with it. // Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins. sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed); if (oldIndex >= 0) - TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), + TURF_TRACE(Grampa, 30, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), uptr(oldIndex)); m_workerStatus.fetchOr(1, turf::Relaxed); goto endMigration; @@ -716,7 +722,7 @@ void Grampa::TableMigration::run() { } } } - TURF_TRACE(Grampa, 30, "[TableMigration::run] out of migration units", uptr(this), 0); + TURF_TRACE(Grampa, 31, "[TableMigration::run] out of migration units", uptr(this), 0); endMigration: // Decrement the shared # of workers. @@ -724,7 +730,7 @@ endMigration: m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. - TURF_TRACE(Grampa, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); + TURF_TRACE(Grampa, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); return; } @@ -746,14 +752,14 @@ endMigration: turf::LockGuard guard(origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { - TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + TURF_TRACE(Grampa, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob)); } else { TableMigration* migration; Table* overflowedTable = getDestinations()[overflowTableIndex]; if (overflowedTable->sizeMask + 1 < LeafSize) { // The entire map is contained in a small table. - TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable), + TURF_TRACE(Grampa, 34, "[TableMigration::run] overflow occured in a small map", uptr(origTable), uptr(checkedJob)); TURF_ASSERT(overflowedTable->unsafeRangeShift == sizeof(Hash) * 8); TURF_ASSERT(overflowedTable->baseHash == 0); @@ -768,7 +774,7 @@ endMigration: } else { // The overflowed table is already the size of a leaf. Split it into two ranges. if (count == 1) { - TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), + TURF_TRACE(Grampa, 35, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), uptr(checkedJob)); migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations * 2); migration->m_baseHash = m_baseHash; @@ -779,7 +785,7 @@ endMigration: } count = 2; } else { - TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), + TURF_TRACE(Grampa, 36, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), uptr(checkedJob)); migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations); migration->m_baseHash = m_baseHash; @@ -827,7 +833,7 @@ void Grampa::FlatTreeMigration::run() { do { if (probeStatus & 1) { // End flag is already set, so do nothing. - TURF_TRACE(Grampa, 36, "[FlatTreeMigration::run] already ended", uptr(this), 0); + TURF_TRACE(Grampa, 37, "[FlatTreeMigration::run] already ended", uptr(this), 0); return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); diff --git a/junction/details/LeapFrog.cpp b/junction/details/LeapFrog.cpp index 3a7011e..be5d453 100644 --- a/junction/details/LeapFrog.cpp +++ b/junction/details/LeapFrog.cpp @@ -17,7 +17,7 @@ namespace junction { namespace details { -TURF_TRACE_DEFINE_BEGIN(LeapFrog, 33) // autogenerated by TidySource.py +TURF_TRACE_DEFINE_BEGIN(LeapFrog, 34) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[find] called") TURF_TRACE_DEFINE("[find] found existing cell optimistically") TURF_TRACE_DEFINE("[find] found existing cell") @@ -36,6 +36,7 @@ TURF_TRACE_DEFINE("[insert] overflow") TURF_TRACE_DEFINE("[beginTableMigrationToSize] called") TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists") TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)") +TURF_TRACE_DEFINE("[beginTableMigration] forced to double") TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size") TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected") TURF_TRACE_DEFINE("[migrateRange] race to insert key") @@ -51,7 +52,7 @@ TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed") TURF_TRACE_DEFINE("[TableMigration::run] out of migration units") TURF_TRACE_DEFINE("[TableMigration::run] not the last worker") TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started") -TURF_TRACE_DEFINE_END(LeapFrog, 33) +TURF_TRACE_DEFINE_END(LeapFrog, 34) } // namespace details } // namespace junction diff --git a/junction/details/LeapFrog.h b/junction/details/LeapFrog.h index 75d91fb..b93208a 100644 --- a/junction/details/LeapFrog.h +++ b/junction/details/LeapFrog.h @@ -30,7 +30,7 @@ namespace junction { namespace details { -TURF_TRACE_DECLARE(LeapFrog, 33) +TURF_TRACE_DECLARE(LeapFrog, 34) template struct LeapFrog { @@ -332,35 +332,41 @@ struct LeapFrog { } } - static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) { - // Estimate number of cells in use based on a small sample. - ureg sizeMask = table->sizeMask; - ureg idx = overflowIdx - CellsInUseSample; - ureg inUseCells = 0; - for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { - CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); - Cell* cell = group->cells + (idx & 3); - Value value = cell->value.load(turf::Relaxed); - if (value == Value(ValueTraits::Redirect)) { - // Another thread kicked off the jobCoordinator. The caller will participate upon return. - TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0); - return; + static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) { + ureg nextTableSize; + if (mustDouble) { + TURF_TRACE(LeapFrog, 18, "[beginTableMigration] forced to double", 0, 0); + nextTableSize = (table->sizeMask + 1) * 2; + } else { + // Estimate number of cells in use based on a small sample. + ureg sizeMask = table->sizeMask; + ureg idx = overflowIdx - CellsInUseSample; + ureg inUseCells = 0; + for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { + CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); + Cell* cell = group->cells + (idx & 3); + Value value = cell->value.load(turf::Relaxed); + if (value == Value(ValueTraits::Redirect)) { + // Another thread kicked off the jobCoordinator. The caller will participate upon return. + TURF_TRACE(LeapFrog, 19, "[beginTableMigration] redirected while determining table size", 0, 0); + return; + } + if (value != Value(ValueTraits::NullValue)) + inUseCells++; + idx++; } - if (value != Value(ValueTraits::NullValue)) - inUseCells++; - idx++; - } - float inUseRatio = float(inUseCells) / CellsInUseSample; - float estimatedInUse = (sizeMask + 1) * inUseRatio; + float inUseRatio = float(inUseCells) / CellsInUseSample; + float estimatedInUse = (sizeMask + 1) * inUseRatio; #if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS - // Periodically underestimate the number of cells in use. - // This exercises the code that handles overflow during migration. - static ureg counter = 1; - if ((++counter & 3) == 0) { - estimatedInUse /= 4; - } + // Periodically underestimate the number of cells in use. + // This exercises the code that handles overflow during migration. + static ureg counter = 1; + if ((++counter & 3) == 0) { + estimatedInUse /= 4; + } #endif - ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + } beginTableMigrationToSize(map, table, nextTableSize); } }; // LeapFrog @@ -384,12 +390,12 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { // Check for deleted/uninitialized value. @@ -398,15 +404,15 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert - TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); break; } } else if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); break; } @@ -445,11 +451,11 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL // by a late-arriving erase. if (srcValue == Value(ValueTraits::NullValue)) - TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); break; } // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. - TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); srcValue = doubleCheckedSrcValue; } // Cell successfully migrated. Proceed to next source cell. @@ -468,7 +474,7 @@ void LeapFrog::TableMigration::run() { do { if (probeStatus & 1) { // End flag is already set, so do nothing. - TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0); + TURF_TRACE(LeapFrog, 27, "[TableMigration::run] already ended", uptr(this), 0); return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); @@ -481,7 +487,7 @@ void LeapFrog::TableMigration::run() { // Loop over all migration units in this source table. for (;;) { if (m_workerStatus.load(turf::Relaxed) & 1) { - TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0); + TURF_TRACE(LeapFrog, 28, "[TableMigration::run] detected end flag set", uptr(this), 0); goto endMigration; } ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); @@ -494,14 +500,14 @@ void LeapFrog::TableMigration::run() { // No other thread can declare the migration successful at this point, because *this* unit will never complete, // hence m_unitsRemaining won't reach zero. // However, multiple threads can independently detect a failed migration at the same time. - TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); + TURF_TRACE(LeapFrog, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from // the thread // that deals with it. bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed); if (oldOverflowed) - TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), + TURF_TRACE(LeapFrog, 30, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), uptr(oldOverflowed)); m_workerStatus.fetchOr(1, turf::Relaxed); goto endMigration; @@ -516,7 +522,7 @@ void LeapFrog::TableMigration::run() { } } } - TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0); + TURF_TRACE(LeapFrog, 31, "[TableMigration::run] out of migration units", uptr(this), 0); endMigration: // Decrement the shared # of workers. @@ -524,7 +530,7 @@ endMigration: 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. - TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); + TURF_TRACE(LeapFrog, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); return; } @@ -543,7 +549,7 @@ endMigration: turf::LockGuard guard(origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { - TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + TURF_TRACE(LeapFrog, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob)); } else { TableMigration* migration = TableMigration::create(m_map, m_numSources + 1); diff --git a/junction/details/Linear.cpp b/junction/details/Linear.cpp index 299f654..739fc44 100644 --- a/junction/details/Linear.cpp +++ b/junction/details/Linear.cpp @@ -17,7 +17,7 @@ namespace junction { namespace details { -TURF_TRACE_DEFINE_BEGIN(Linear, 26) // autogenerated by TidySource.py +TURF_TRACE_DEFINE_BEGIN(Linear, 27) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[find] called") TURF_TRACE_DEFINE("[find] found existing cell") TURF_TRACE_DEFINE("[insert] called") @@ -29,6 +29,7 @@ TURF_TRACE_DEFINE("[insert] race reserved same hash") TURF_TRACE_DEFINE("[beginTableMigrationToSize] called") TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists") TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)") +TURF_TRACE_DEFINE("[beginTableMigration] forced to double") TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size") TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected") TURF_TRACE_DEFINE("[migrateRange] race to insert key") @@ -44,7 +45,7 @@ TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed") TURF_TRACE_DEFINE("[TableMigration::run] out of migration units") TURF_TRACE_DEFINE("[TableMigration::run] not the last worker") TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started") -TURF_TRACE_DEFINE_END(Linear, 26) +TURF_TRACE_DEFINE_END(Linear, 27) } // namespace details } // namespace junction diff --git a/junction/details/Linear.h b/junction/details/Linear.h index 40fc450..590d390 100644 --- a/junction/details/Linear.h +++ b/junction/details/Linear.h @@ -30,7 +30,7 @@ namespace junction { namespace details { -TURF_TRACE_DECLARE(Linear, 26) +TURF_TRACE_DECLARE(Linear, 27) template struct Linear { @@ -221,33 +221,39 @@ struct Linear { } } - static void beginTableMigration(Map& map, Table* table) { - // Estimate number of cells in use based on a small sample. - ureg idx = 0; - ureg sampleSize = turf::util::min(table->sizeMask + 1, CellsInUseSample); - ureg inUseCells = 0; - for (; idx < sampleSize; idx++) { - Cell* cell = table->getCells() + idx; - Value value = cell->value.load(turf::Relaxed); - if (value == Value(ValueTraits::Redirect)) { - // Another thread kicked off the jobCoordinator. The caller will participate upon return. - TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0); - return; + static void beginTableMigration(Map& map, Table* table, bool mustDouble) { + ureg nextTableSize; + if (mustDouble) { + TURF_TRACE(Linear, 11, "[beginTableMigration] forced to double", 0, 0); + nextTableSize = (table->sizeMask + 1) * 2; + } else { + // Estimate number of cells in use based on a small sample. + ureg idx = 0; + ureg sampleSize = turf::util::min(table->sizeMask + 1, CellsInUseSample); + ureg inUseCells = 0; + for (; idx < sampleSize; idx++) { + Cell* cell = table->getCells() + idx; + Value value = cell->value.load(turf::Relaxed); + if (value == Value(ValueTraits::Redirect)) { + // Another thread kicked off the jobCoordinator. The caller will participate upon return. + TURF_TRACE(Linear, 12, "[beginTableMigration] redirected while determining table size", 0, 0); + return; + } + if (value != Value(ValueTraits::NullValue)) + inUseCells++; } - if (value != Value(ValueTraits::NullValue)) - inUseCells++; - } - float inUseRatio = float(inUseCells) / sampleSize; - float estimatedInUse = (table->sizeMask + 1) * inUseRatio; + float inUseRatio = float(inUseCells) / sampleSize; + float estimatedInUse = (table->sizeMask + 1) * inUseRatio; #if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS - // Periodically underestimate the number of cells in use. - // This exercises the code that handles overflow during migration. - static ureg counter = 1; - if ((++counter & 3) == 0) { - estimatedInUse /= 4; - } + // Periodically underestimate the number of cells in use. + // This exercises the code that handles overflow during migration. + static ureg counter = 1; + if ((++counter & 3) == 0) { + estimatedInUse /= 4; + } #endif - ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + } beginTableMigrationToSize(map, table, nextTableSize); } }; // Linear @@ -271,12 +277,12 @@ bool Linear::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); + TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { // Check for deleted/uninitialized value. @@ -285,15 +291,15 @@ bool Linear::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); + TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert - TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); + TURF_TRACE(Linear, 16, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); break; } } else if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(Linear, 17, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); break; } @@ -331,11 +337,11 @@ bool Linear::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL // by a late-arriving erase. if (srcValue == Value(ValueTraits::NullValue)) - TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); + TURF_TRACE(Linear, 18, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); break; } // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. - TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); + TURF_TRACE(Linear, 19, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); srcValue = doubleCheckedSrcValue; } // Cell successfully migrated. Proceed to next source cell. @@ -354,7 +360,7 @@ void Linear::TableMigration::run() { do { if (probeStatus & 1) { // End flag is already set, so do nothing. - TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0); + TURF_TRACE(Linear, 20, "[TableMigration::run] already ended", uptr(this), 0); return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); @@ -367,7 +373,7 @@ void Linear::TableMigration::run() { // Loop over all migration units in this source table. for (;;) { if (m_workerStatus.load(turf::Relaxed) & 1) { - TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0); + TURF_TRACE(Linear, 21, "[TableMigration::run] detected end flag set", uptr(this), 0); goto endMigration; } ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); @@ -380,14 +386,14 @@ void Linear::TableMigration::run() { // No other thread can declare the migration successful at this point, because *this* unit will never complete, // hence m_unitsRemaining won't reach zero. // However, multiple threads can independently detect a failed migration at the same time. - TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); + TURF_TRACE(Linear, 22, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from // the thread // that deals with it. bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed); if (oldOverflowed) - TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), + TURF_TRACE(Linear, 23, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), uptr(oldOverflowed)); m_workerStatus.fetchOr(1, turf::Relaxed); goto endMigration; @@ -402,7 +408,7 @@ void Linear::TableMigration::run() { } } } - TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0); + TURF_TRACE(Linear, 24, "[TableMigration::run] out of migration units", uptr(this), 0); endMigration: // Decrement the shared # of workers. @@ -410,7 +416,7 @@ endMigration: 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. - TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); + TURF_TRACE(Linear, 25, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); return; } @@ -429,7 +435,7 @@ endMigration: turf::LockGuard guard(origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { - TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + TURF_TRACE(Linear, 26, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob)); } else { TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);