X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=junction%2Fdetails%2FLeapFrog.h;h=b93208a6965da10765ae674b0c06e1bb42078640;hb=e0633b87f411d099eab8f801122b9ef58a003913;hp=75d91fbeb34e5fd7c2e8a4ed4773d5960320e127;hpb=c7fafdd7d1e574307901bb196685acf787ca23e8;p=junction.git diff --git a/junction/details/LeapFrog.h b/junction/details/LeapFrog.h index 75d91fb..b93208a 100644 --- a/junction/details/LeapFrog.h +++ b/junction/details/LeapFrog.h @@ -30,7 +30,7 @@ namespace junction { namespace details { -TURF_TRACE_DECLARE(LeapFrog, 33) +TURF_TRACE_DECLARE(LeapFrog, 34) template struct LeapFrog { @@ -332,35 +332,41 @@ struct LeapFrog { } } - static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) { - // Estimate number of cells in use based on a small sample. - ureg sizeMask = table->sizeMask; - ureg idx = overflowIdx - CellsInUseSample; - ureg inUseCells = 0; - for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { - CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); - Cell* cell = group->cells + (idx & 3); - Value value = cell->value.load(turf::Relaxed); - if (value == Value(ValueTraits::Redirect)) { - // Another thread kicked off the jobCoordinator. The caller will participate upon return. - TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0); - return; + static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) { + ureg nextTableSize; + if (mustDouble) { + TURF_TRACE(LeapFrog, 18, "[beginTableMigration] forced to double", 0, 0); + nextTableSize = (table->sizeMask + 1) * 2; + } else { + // Estimate number of cells in use based on a small sample. + ureg sizeMask = table->sizeMask; + ureg idx = overflowIdx - CellsInUseSample; + ureg inUseCells = 0; + for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { + CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); + Cell* cell = group->cells + (idx & 3); + Value value = cell->value.load(turf::Relaxed); + if (value == Value(ValueTraits::Redirect)) { + // Another thread kicked off the jobCoordinator. The caller will participate upon return. + TURF_TRACE(LeapFrog, 19, "[beginTableMigration] redirected while determining table size", 0, 0); + return; + } + if (value != Value(ValueTraits::NullValue)) + inUseCells++; + idx++; } - if (value != Value(ValueTraits::NullValue)) - inUseCells++; - idx++; - } - float inUseRatio = float(inUseCells) / CellsInUseSample; - float estimatedInUse = (sizeMask + 1) * inUseRatio; + float inUseRatio = float(inUseCells) / CellsInUseSample; + float estimatedInUse = (sizeMask + 1) * inUseRatio; #if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS - // Periodically underestimate the number of cells in use. - // This exercises the code that handles overflow during migration. - static ureg counter = 1; - if ((++counter & 3) == 0) { - estimatedInUse /= 4; - } + // Periodically underestimate the number of cells in use. + // This exercises the code that handles overflow during migration. + static ureg counter = 1; + if ((++counter & 3) == 0) { + estimatedInUse /= 4; + } #endif - ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + } beginTableMigrationToSize(map, table, nextTableSize); } }; // LeapFrog @@ -384,12 +390,12 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { // Check for deleted/uninitialized value. @@ -398,15 +404,15 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert - TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); break; } } else if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); break; } @@ -445,11 +451,11 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL // by a late-arriving erase. if (srcValue == Value(ValueTraits::NullValue)) - TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); break; } // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. - TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); + TURF_TRACE(LeapFrog, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); srcValue = doubleCheckedSrcValue; } // Cell successfully migrated. Proceed to next source cell. @@ -468,7 +474,7 @@ void LeapFrog::TableMigration::run() { do { if (probeStatus & 1) { // End flag is already set, so do nothing. - TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0); + TURF_TRACE(LeapFrog, 27, "[TableMigration::run] already ended", uptr(this), 0); return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); @@ -481,7 +487,7 @@ void LeapFrog::TableMigration::run() { // Loop over all migration units in this source table. for (;;) { if (m_workerStatus.load(turf::Relaxed) & 1) { - TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0); + TURF_TRACE(LeapFrog, 28, "[TableMigration::run] detected end flag set", uptr(this), 0); goto endMigration; } ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); @@ -494,14 +500,14 @@ void LeapFrog::TableMigration::run() { // No other thread can declare the migration successful at this point, because *this* unit will never complete, // hence m_unitsRemaining won't reach zero. // However, multiple threads can independently detect a failed migration at the same time. - TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); + TURF_TRACE(LeapFrog, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from // the thread // that deals with it. bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed); if (oldOverflowed) - TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), + TURF_TRACE(LeapFrog, 30, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), uptr(oldOverflowed)); m_workerStatus.fetchOr(1, turf::Relaxed); goto endMigration; @@ -516,7 +522,7 @@ void LeapFrog::TableMigration::run() { } } } - TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0); + TURF_TRACE(LeapFrog, 31, "[TableMigration::run] out of migration units", uptr(this), 0); endMigration: // Decrement the shared # of workers. @@ -524,7 +530,7 @@ endMigration: 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. - TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); + TURF_TRACE(LeapFrog, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); return; } @@ -543,7 +549,7 @@ endMigration: turf::LockGuard guard(origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { - TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + TURF_TRACE(LeapFrog, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob)); } else { TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);