X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=junction%2Fdetails%2FGrampa.h;fp=junction%2Fdetails%2FGrampa.h;h=2fe2cc226f992a3ad73bdd93cbae5188d33917f1;hb=28fa77bb667643e2ed0a9eb43331a07fe8cb1ba8;hp=de7a7b91e70bc03f69014e2b90a1e446fa9d40d6;hpb=e0633b87f411d099eab8f801122b9ef58a003913;p=junction.git diff --git a/junction/details/Grampa.h b/junction/details/Grampa.h index de7a7b9..2fe2cc2 100644 --- a/junction/details/Grampa.h +++ b/junction/details/Grampa.h @@ -53,7 +53,7 @@ struct GrampaStats { }; #endif -TURF_TRACE_DECLARE(Grampa, 38) +TURF_TRACE_DECLARE(Grampa, 37) template struct Grampa { @@ -511,35 +511,29 @@ struct Grampa { } } - static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) { - ureg nextTableSize; - if (mustDouble) { - TURF_TRACE(Grampa, 18, "[beginTableMigration] forced to double", 0, 0); - nextTableSize = (table->sizeMask + 1) * 2; - } else { - // Estimate number of cells in use based on a small sample. - ureg sizeMask = table->sizeMask; - ureg idx = overflowIdx - CellsInUseSample; - ureg inUseCells = 0; - for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { - CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); - Cell* cell = group->cells + (idx & 3); - Value value = cell->value.load(turf::Relaxed); - if (value == Value(ValueTraits::Redirect)) { - // Another thread kicked off the jobCoordinator. The caller will participate upon return. - TURF_TRACE(Grampa, 19, "[beginTableMigration] redirected while determining table size", 0, 0); - return; - } - if (value != Value(ValueTraits::NullValue)) - inUseCells++; - idx++; + static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) { + // Estimate number of cells in use based on a small sample. + ureg sizeMask = table->sizeMask; + ureg idx = overflowIdx - CellsInUseSample; + ureg inUseCells = 0; + for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { + CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); + Cell* cell = group->cells + (idx & 3); + Value value = cell->value.load(turf::Relaxed); + if (value == Value(ValueTraits::Redirect)) { + // Another thread kicked off the jobCoordinator. The caller will participate upon return. + TURF_TRACE(Grampa, 18, "[beginTableMigration] redirected while determining table size", 0, 0); + return; } - float inUseRatio = float(inUseCells) / CellsInUseSample; - float estimatedInUse = (sizeMask + 1) * inUseRatio; - nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)); - // FIXME: Support migrating to smaller tables. - nextTableSize = turf::util::max(nextTableSize, sizeMask + 1); - } + if (value != Value(ValueTraits::NullValue)) + inUseCells++; + idx++; + } + float inUseRatio = float(inUseCells) / CellsInUseSample; + float estimatedInUse = (sizeMask + 1) * inUseRatio; + ureg nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)); + // FIXME: Support migrating to smaller tables. + nextTableSize = turf::util::max(nextTableSize, sizeMask + 1); // Split into multiple tables if necessary. ureg splitShift = 0; while (nextTableSize > LeafSize) { @@ -587,12 +581,12 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(Grampa, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Grampa, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { // Check for deleted/uninitialized value. @@ -601,15 +595,15 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Grampa, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert - TURF_TRACE(Grampa, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); break; } } else if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(Grampa, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); break; } @@ -650,11 +644,11 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL // by a late-arriving erase. if (srcValue == Value(ValueTraits::NullValue)) - TURF_TRACE(Grampa, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); break; } // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. - TURF_TRACE(Grampa, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); + TURF_TRACE(Grampa, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); srcValue = doubleCheckedSrcValue; } // Cell successfully migrated. Proceed to next source cell. @@ -673,7 +667,7 @@ void Grampa::TableMigration::run() { do { if (probeStatus & 1) { // End flag is already set, so do nothing. - TURF_TRACE(Grampa, 27, "[TableMigration::run] already ended", uptr(this), 0); + TURF_TRACE(Grampa, 26, "[TableMigration::run] already ended", uptr(this), 0); return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); @@ -687,7 +681,7 @@ void Grampa::TableMigration::run() { // Loop over all migration units in this source table. for (;;) { if (m_workerStatus.load(turf::Relaxed) & 1) { - TURF_TRACE(Grampa, 28, "[TableMigration::run] detected end flag set", uptr(this), 0); + TURF_TRACE(Grampa, 27, "[TableMigration::run] detected end flag set", uptr(this), 0); goto endMigration; } ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); @@ -700,14 +694,14 @@ void Grampa::TableMigration::run() { // No other thread can declare the migration successful at this point, because *this* unit will never complete, // hence m_unitsRemaining won't reach zero. // However, multiple threads can independently detect a failed migration at the same time. - TURF_TRACE(Grampa, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); + TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads // before we can safely deal with the overflow. Therefore, the thread that detects the failure is often // different from the thread that deals with it. // Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins. sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed); if (oldIndex >= 0) - TURF_TRACE(Grampa, 30, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), + TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), uptr(oldIndex)); m_workerStatus.fetchOr(1, turf::Relaxed); goto endMigration; @@ -722,7 +716,7 @@ void Grampa::TableMigration::run() { } } } - TURF_TRACE(Grampa, 31, "[TableMigration::run] out of migration units", uptr(this), 0); + TURF_TRACE(Grampa, 30, "[TableMigration::run] out of migration units", uptr(this), 0); endMigration: // Decrement the shared # of workers. @@ -730,7 +724,7 @@ endMigration: m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. - TURF_TRACE(Grampa, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); + TURF_TRACE(Grampa, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); return; } @@ -752,14 +746,14 @@ endMigration: turf::LockGuard guard(origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { - TURF_TRACE(Grampa, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob)); } else { TableMigration* migration; Table* overflowedTable = getDestinations()[overflowTableIndex]; if (overflowedTable->sizeMask + 1 < LeafSize) { // The entire map is contained in a small table. - TURF_TRACE(Grampa, 34, "[TableMigration::run] overflow occured in a small map", uptr(origTable), + TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable), uptr(checkedJob)); TURF_ASSERT(overflowedTable->unsafeRangeShift == sizeof(Hash) * 8); TURF_ASSERT(overflowedTable->baseHash == 0); @@ -774,7 +768,7 @@ endMigration: } else { // The overflowed table is already the size of a leaf. Split it into two ranges. if (count == 1) { - TURF_TRACE(Grampa, 35, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), + TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), uptr(checkedJob)); migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations * 2); migration->m_baseHash = m_baseHash; @@ -785,7 +779,7 @@ endMigration: } count = 2; } else { - TURF_TRACE(Grampa, 36, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), + TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), uptr(checkedJob)); migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations); migration->m_baseHash = m_baseHash; @@ -833,7 +827,7 @@ void Grampa::FlatTreeMigration::run() { do { if (probeStatus & 1) { // End flag is already set, so do nothing. - TURF_TRACE(Grampa, 37, "[FlatTreeMigration::run] already ended", uptr(this), 0); + TURF_TRACE(Grampa, 36, "[FlatTreeMigration::run] already ended", uptr(this), 0); return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));