TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)),
uptr(key));
Hash hash = KeyTraits::hash(key);
+ bool mustDouble = false;
for (;;) {
if (!m_map.locateTable(m_table, m_sizeMask, hash)) {
m_map.createInitialTable(Details::MinTableSize);
return; // Found an existing value
}
case Details::InsertResult_Overflow: {
- Details::beginTableMigration(m_map, m_table, overflowIdx);
+ Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble);
break;
}
}
// A migration has been started (either by us, or another thread). Participate until it's complete.
m_table->jobCoordinator.participate();
}
+ // If we still overflow after this, avoid an infinite loop by forcing the next table to double.
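+ // (Without this, beginTableMigration's size estimate could choose the same table size again and this loop could spin forever.)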
+ mustDouble = true;
// Try again using the latest root.
}
}
TURF_ASSERT(desired != Value(ValueTraits::NullValue));
TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_Grampa, 14, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
+ bool mustDouble = false;
for (;;) {
Value oldValue = m_value;
if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
m_table->jobCoordinator.participate();
// Try again in the latest table.
// FIXME: locateTable() could return false if the map is concurrently cleared (m_root set to 0).
- // This is not concern yet since clear() is not implemented.
+ // This is not a concern yet since clear() is not implemented.
bool exists = m_map.locateTable(m_table, m_sizeMask, hash);
TURF_ASSERT(exists);
TURF_UNUSED(exists);
case Details::InsertResult_Overflow:
TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
overflowIdx);
- Details::beginTableMigration(m_map, m_table, overflowIdx);
+ Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble);
break;
}
// We were redirected... again
}
breakOuter:;
+ // If we still overflow after this, avoid an infinite loop by forcing the next table to double.
+ mustDouble = true;
// Try again in the new table.
}
}
: m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
TURF_TRACE(ConcurrentMap_LeapFrog, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key));
Hash hash = KeyTraits::hash(key);
+ bool mustDouble = false;
for (;;) {
m_table = m_map.m_root.load(turf::Consume);
ureg overflowIdx;
return; // Found an existing value
}
case Details::InsertResult_Overflow: {
- Details::beginTableMigration(m_map, m_table, overflowIdx);
+ Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble);
break;
}
}
// A migration has been started (either by us, or another thread). Participate until it's complete.
m_table->jobCoordinator.participate();
+ // If we still overflow after this, avoid an infinite loop by forcing the next table to double.
+ mustDouble = true;
// Try again using the latest root.
}
}
TURF_ASSERT(desired != Value(ValueTraits::NullValue));
TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_LeapFrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
+ bool mustDouble = false;
for (;;) {
Value oldValue = m_value;
if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
case Details::InsertResult_Overflow:
TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
overflowIdx);
- Details::beginTableMigration(m_map, m_table, overflowIdx);
+ Details::beginTableMigration(m_map, m_table, overflowIdx, mustDouble);
break;
}
// We were redirected... again
}
breakOuter:;
+ // If we still overflow after this, avoid an infinite loop by forcing the next table to double.
+ mustDouble = true;
// Try again in the new table.
}
}
: m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
TURF_TRACE(ConcurrentMap_Linear, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key));
Hash hash = KeyTraits::hash(key);
+ bool mustDouble = false;
for (;;) {
m_table = m_map.m_root.load(turf::Consume);
switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
return; // Found an existing value
}
case Details::InsertResult_Overflow: {
- Details::beginTableMigration(m_map, m_table);
+ Details::beginTableMigration(m_map, m_table, mustDouble);
break;
}
}
// A migration has been started (either by us, or another thread). Participate until it's complete.
m_table->jobCoordinator.participate();
+ // If we still overflow after this, avoid an infinite loop by forcing the next table to double.
+ mustDouble = true;
// Try again using the latest root.
}
}
TURF_ASSERT(desired != Value(ValueTraits::NullValue));
TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
+ bool mustDouble = false;
for (;;) {
Value oldValue = m_value;
if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
goto breakOuter;
case Details::InsertResult_Overflow:
TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), 0);
- Details::beginTableMigration(m_map, m_table);
+ Details::beginTableMigration(m_map, m_table, mustDouble);
break;
}
+ // If we still overflow after this, avoid an infinite loop by forcing the next table to double.
+ mustDouble = true;
// We were redirected... again
}
breakOuter:;
GrampaStats GrampaStats::Instance;
#endif
-TURF_TRACE_DEFINE_BEGIN(Grampa, 37) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE_BEGIN(Grampa, 38) // autogenerated by TidySource.py
TURF_TRACE_DEFINE("[find] called")
TURF_TRACE_DEFINE("[find] found existing cell optimistically")
TURF_TRACE_DEFINE("[find] found existing cell")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
+TURF_TRACE_DEFINE("[beginTableMigration] forced to double")
TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
TURF_TRACE_DEFINE("[migrateRange] race to insert key")
TURF_TRACE_DEFINE("[TableMigration::run] doubling subtree size after failure")
TURF_TRACE_DEFINE("[TableMigration::run] keeping same subtree size after failure")
TURF_TRACE_DEFINE("[FlatTreeMigration::run] already ended")
-TURF_TRACE_DEFINE_END(Grampa, 37)
+TURF_TRACE_DEFINE_END(Grampa, 38)
} // namespace details
} // namespace junction
};
#endif
-TURF_TRACE_DECLARE(Grampa, 37)
+TURF_TRACE_DECLARE(Grampa, 38)
template <class Map>
struct Grampa {
}
}
- static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
- // Estimate number of cells in use based on a small sample.
- ureg sizeMask = table->sizeMask;
- ureg idx = overflowIdx - CellsInUseSample;
- ureg inUseCells = 0;
- for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
- CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
- Cell* cell = group->cells + (idx & 3);
- Value value = cell->value.load(turf::Relaxed);
- if (value == Value(ValueTraits::Redirect)) {
- // Another thread kicked off the jobCoordinator. The caller will participate upon return.
- TURF_TRACE(Grampa, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
- return;
+ static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) {
+ ureg nextTableSize;
+ if (mustDouble) {
+ TURF_TRACE(Grampa, 18, "[beginTableMigration] forced to double", 0, 0);
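+ // Skip the occupancy estimate entirely; sizeMask + 1 is the current table size, so this guarantees strictly more capacity.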
+ nextTableSize = (table->sizeMask + 1) * 2;
+ } else {
+ // Estimate number of cells in use based on a small sample.
+ ureg sizeMask = table->sizeMask;
+ ureg idx = overflowIdx - CellsInUseSample;
+ ureg inUseCells = 0;
+ for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
+ CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+ Cell* cell = group->cells + (idx & 3);
+ Value value = cell->value.load(turf::Relaxed);
+ if (value == Value(ValueTraits::Redirect)) {
+ // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+ TURF_TRACE(Grampa, 19, "[beginTableMigration] redirected while determining table size", 0, 0);
+ return;
+ }
+ if (value != Value(ValueTraits::NullValue))
+ inUseCells++;
+ idx++;
}
- if (value != Value(ValueTraits::NullValue))
- inUseCells++;
- idx++;
- }
- float inUseRatio = float(inUseCells) / CellsInUseSample;
- float estimatedInUse = (sizeMask + 1) * inUseRatio;
- ureg nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2));
- // FIXME: Support migrating to smaller tables.
- nextTableSize = turf::util::max(nextTableSize, sizeMask + 1);
+ float inUseRatio = float(inUseCells) / CellsInUseSample;
+ float estimatedInUse = (sizeMask + 1) * inUseRatio;
+ nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2));
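+ // For example, if half of the sampled cells are in use, estimatedInUse * 2 equals the current size and the next table stays the same size.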
+ // FIXME: Support migrating to smaller tables.
+ nextTableSize = turf::util::max(nextTableSize, sizeMask + 1);
+ }
// Split into multiple tables if necessary.
ureg splitShift = 0;
while (nextTableSize > LeafSize) {
srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(Grampa, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(Grampa, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(Grampa, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
+ TURF_TRACE(Grampa, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
// Check for deleted/uninitialized value.
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(Grampa, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+ TURF_TRACE(Grampa, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
- TURF_TRACE(Grampa, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+ TURF_TRACE(Grampa, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
break;
}
} else if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(Grampa, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(Grampa, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
break;
}
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
// by a late-arriving erase.
if (srcValue == Value(ValueTraits::NullValue))
- TURF_TRACE(Grampa, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
+ TURF_TRACE(Grampa, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
break;
}
// There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
- TURF_TRACE(Grampa, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
+ TURF_TRACE(Grampa, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
srcValue = doubleCheckedSrcValue;
}
// Cell successfully migrated. Proceed to next source cell.
do {
if (probeStatus & 1) {
// End flag is already set, so do nothing.
- TURF_TRACE(Grampa, 26, "[TableMigration::run] already ended", uptr(this), 0);
+ TURF_TRACE(Grampa, 27, "[TableMigration::run] already ended", uptr(this), 0);
return;
}
} while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
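+ // At this point we are registered as a worker: m_workerStatus packs the worker count in its upper bits (each worker adds 2) and the end flag in bit 0.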
// Loop over all migration units in this source table.
for (;;) {
if (m_workerStatus.load(turf::Relaxed) & 1) {
- TURF_TRACE(Grampa, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
+ TURF_TRACE(Grampa, 28, "[TableMigration::run] detected end flag set", uptr(this), 0);
goto endMigration;
}
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
// No other thread can declare the migration successful at this point, because *this* unit will never complete,
// hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
- TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+ TURF_TRACE(Grampa, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
// The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads
// before we can safely deal with the overflow. Therefore, the thread that detects the failure is often
// different from the thread that deals with it.
// Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins.
sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed);
if (oldIndex >= 0)
- TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex),
+ TURF_TRACE(Grampa, 30, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex),
uptr(oldIndex));
m_workerStatus.fetchOr(1, turf::Relaxed);
goto endMigration;
}
}
}
- TURF_TRACE(Grampa, 30, "[TableMigration::run] out of migration units", uptr(this), 0);
+ TURF_TRACE(Grampa, 31, "[TableMigration::run] out of migration units", uptr(this), 0);
endMigration:
// Decrement the shared # of workers.
m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
- TURF_TRACE(Grampa, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+ TURF_TRACE(Grampa, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
return;
}
turf::LockGuard<junction::striped::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
- TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ TURF_TRACE(Grampa, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
uptr(checkedJob));
} else {
TableMigration* migration;
Table* overflowedTable = getDestinations()[overflowTableIndex];
if (overflowedTable->sizeMask + 1 < LeafSize) {
// The entire map is contained in a small table.
- TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable),
+ TURF_TRACE(Grampa, 34, "[TableMigration::run] overflow occured in a small map", uptr(origTable),
uptr(checkedJob));
TURF_ASSERT(overflowedTable->unsafeRangeShift == sizeof(Hash) * 8);
TURF_ASSERT(overflowedTable->baseHash == 0);
} else {
// The overflowed table is already the size of a leaf. Split it into two ranges.
if (count == 1) {
- TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable),
+ TURF_TRACE(Grampa, 35, "[TableMigration::run] doubling subtree size after failure", uptr(origTable),
uptr(checkedJob));
migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations * 2);
migration->m_baseHash = m_baseHash;
}
count = 2;
} else {
- TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable),
+ TURF_TRACE(Grampa, 36, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable),
uptr(checkedJob));
migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations);
migration->m_baseHash = m_baseHash;
do {
if (probeStatus & 1) {
// End flag is already set, so do nothing.
- TURF_TRACE(Grampa, 36, "[FlatTreeMigration::run] already ended", uptr(this), 0);
+ TURF_TRACE(Grampa, 37, "[FlatTreeMigration::run] already ended", uptr(this), 0);
return;
}
} while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
namespace junction {
namespace details {
-TURF_TRACE_DEFINE_BEGIN(LeapFrog, 33) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE_BEGIN(LeapFrog, 34) // autogenerated by TidySource.py
TURF_TRACE_DEFINE("[find] called")
TURF_TRACE_DEFINE("[find] found existing cell optimistically")
TURF_TRACE_DEFINE("[find] found existing cell")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
+TURF_TRACE_DEFINE("[beginTableMigration] forced to double")
TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
TURF_TRACE_DEFINE("[migrateRange] race to insert key")
TURF_TRACE_DEFINE("[TableMigration::run] out of migration units")
TURF_TRACE_DEFINE("[TableMigration::run] not the last worker")
TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started")
-TURF_TRACE_DEFINE_END(LeapFrog, 33)
+TURF_TRACE_DEFINE_END(LeapFrog, 34)
} // namespace details
} // namespace junction
namespace junction {
namespace details {
-TURF_TRACE_DECLARE(LeapFrog, 33)
+TURF_TRACE_DECLARE(LeapFrog, 34)
template <class Map>
struct LeapFrog {
}
}
- static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
- // Estimate number of cells in use based on a small sample.
- ureg sizeMask = table->sizeMask;
- ureg idx = overflowIdx - CellsInUseSample;
- ureg inUseCells = 0;
- for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
- CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
- Cell* cell = group->cells + (idx & 3);
- Value value = cell->value.load(turf::Relaxed);
- if (value == Value(ValueTraits::Redirect)) {
- // Another thread kicked off the jobCoordinator. The caller will participate upon return.
- TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
- return;
+ static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) {
+ ureg nextTableSize;
+ if (mustDouble) {
+ TURF_TRACE(LeapFrog, 18, "[beginTableMigration] forced to double", 0, 0);
+ nextTableSize = (table->sizeMask + 1) * 2;
+ } else {
+ // Estimate number of cells in use based on a small sample.
+ ureg sizeMask = table->sizeMask;
+ ureg idx = overflowIdx - CellsInUseSample;
+ ureg inUseCells = 0;
+ for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
+ CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+ Cell* cell = group->cells + (idx & 3);
+ Value value = cell->value.load(turf::Relaxed);
+ if (value == Value(ValueTraits::Redirect)) {
+ // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+ TURF_TRACE(LeapFrog, 19, "[beginTableMigration] redirected while determining table size", 0, 0);
+ return;
+ }
+ if (value != Value(ValueTraits::NullValue))
+ inUseCells++;
+ idx++;
}
- if (value != Value(ValueTraits::NullValue))
- inUseCells++;
- idx++;
- }
- float inUseRatio = float(inUseCells) / CellsInUseSample;
- float estimatedInUse = (sizeMask + 1) * inUseRatio;
+ float inUseRatio = float(inUseCells) / CellsInUseSample;
+ float estimatedInUse = (sizeMask + 1) * inUseRatio;
#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
- // Periodically underestimate the number of cells in use.
- // This exercises the code that handles overflow during migration.
- static ureg counter = 1;
- if ((++counter & 3) == 0) {
- estimatedInUse /= 4;
- }
+ // Periodically underestimate the number of cells in use.
+ // This exercises the code that handles overflow during migration.
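+ // Underestimating here can produce a next table that is the same size or smaller, which is exactly the case the mustDouble flag guards against.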
+ static ureg counter = 1;
+ if ((++counter & 3) == 0) {
+ estimatedInUse /= 4;
+ }
#endif
- ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+ nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+ }
beginTableMigrationToSize(map, table, nextTableSize);
}
}; // LeapFrog
srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
// Check for deleted/uninitialized value.
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
- TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
break;
}
} else if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
break;
}
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
// by a late-arriving erase.
if (srcValue == Value(ValueTraits::NullValue))
- TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
break;
}
// There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
- TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
srcValue = doubleCheckedSrcValue;
}
// Cell successfully migrated. Proceed to next source cell.
do {
if (probeStatus & 1) {
// End flag is already set, so do nothing.
- TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
+ TURF_TRACE(LeapFrog, 27, "[TableMigration::run] already ended", uptr(this), 0);
return;
}
} while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
// Loop over all migration units in this source table.
for (;;) {
if (m_workerStatus.load(turf::Relaxed) & 1) {
- TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
+ TURF_TRACE(LeapFrog, 28, "[TableMigration::run] detected end flag set", uptr(this), 0);
goto endMigration;
}
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
// No other thread can declare the migration successful at this point, because *this* unit will never complete,
// hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
- TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+ TURF_TRACE(LeapFrog, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
// The reason we store overflowed in a shared variable is because we must flush all the worker threads before
// we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
// the thread that deals with it.
bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
if (oldOverflowed)
- TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+ TURF_TRACE(LeapFrog, 30, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
uptr(oldOverflowed));
m_workerStatus.fetchOr(1, turf::Relaxed);
goto endMigration;
}
}
}
- TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);
+ TURF_TRACE(LeapFrog, 31, "[TableMigration::run] out of migration units", uptr(this), 0);
endMigration:
// Decrement the shared # of workers.
2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
- TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+ TURF_TRACE(LeapFrog, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
return;
}
turf::LockGuard<turf::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
- TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ TURF_TRACE(LeapFrog, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
uptr(checkedJob));
} else {
TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
namespace junction {
namespace details {
-TURF_TRACE_DEFINE_BEGIN(Linear, 26) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE_BEGIN(Linear, 27) // autogenerated by TidySource.py
TURF_TRACE_DEFINE("[find] called")
TURF_TRACE_DEFINE("[find] found existing cell")
TURF_TRACE_DEFINE("[insert] called")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
+TURF_TRACE_DEFINE("[beginTableMigration] forced to double")
TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
TURF_TRACE_DEFINE("[migrateRange] race to insert key")
TURF_TRACE_DEFINE("[TableMigration::run] out of migration units")
TURF_TRACE_DEFINE("[TableMigration::run] not the last worker")
TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started")
-TURF_TRACE_DEFINE_END(Linear, 26)
+TURF_TRACE_DEFINE_END(Linear, 27)
} // namespace details
} // namespace junction
namespace junction {
namespace details {
-TURF_TRACE_DECLARE(Linear, 26)
+TURF_TRACE_DECLARE(Linear, 27)
template <class Map>
struct Linear {
}
}
- static void beginTableMigration(Map& map, Table* table) {
- // Estimate number of cells in use based on a small sample.
- ureg idx = 0;
- ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
- ureg inUseCells = 0;
- for (; idx < sampleSize; idx++) {
- Cell* cell = table->getCells() + idx;
- Value value = cell->value.load(turf::Relaxed);
- if (value == Value(ValueTraits::Redirect)) {
- // Another thread kicked off the jobCoordinator. The caller will participate upon return.
- TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0);
- return;
+ static void beginTableMigration(Map& map, Table* table, bool mustDouble) {
+ ureg nextTableSize;
+ if (mustDouble) {
+ TURF_TRACE(Linear, 11, "[beginTableMigration] forced to double", 0, 0);
+ nextTableSize = (table->sizeMask + 1) * 2;
+ } else {
+ // Estimate number of cells in use based on a small sample.
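+ // Unlike Grampa and LeapFrog, there is no overflow index here, so the sample is taken from the start of the table.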
+ ureg idx = 0;
+ ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
+ ureg inUseCells = 0;
+ for (; idx < sampleSize; idx++) {
+ Cell* cell = table->getCells() + idx;
+ Value value = cell->value.load(turf::Relaxed);
+ if (value == Value(ValueTraits::Redirect)) {
+ // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+ TURF_TRACE(Linear, 12, "[beginTableMigration] redirected while determining table size", 0, 0);
+ return;
+ }
+ if (value != Value(ValueTraits::NullValue))
+ inUseCells++;
}
- if (value != Value(ValueTraits::NullValue))
- inUseCells++;
- }
- float inUseRatio = float(inUseCells) / sampleSize;
- float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
+ float inUseRatio = float(inUseCells) / sampleSize;
+ float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
#if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
- // Periodically underestimate the number of cells in use.
- // This exercises the code that handles overflow during migration.
- static ureg counter = 1;
- if ((++counter & 3) == 0) {
- estimatedInUse /= 4;
- }
+ // Periodically underestimate the number of cells in use.
+ // This exercises the code that handles overflow during migration.
+ static ureg counter = 1;
+ if ((++counter & 3) == 0) {
+ estimatedInUse /= 4;
+ }
#endif
- ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+ nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+ }
beginTableMigrationToSize(map, table, nextTableSize);
}
}; // Linear
srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
+ TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
// Check for deleted/uninitialized value.
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+ TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
- TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+ TURF_TRACE(Linear, 16, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
break;
}
} else if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(Linear, 17, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
break;
}
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
// by a late-arriving erase.
if (srcValue == Value(ValueTraits::NullValue))
- TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
+ TURF_TRACE(Linear, 18, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
break;
}
// There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
- TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
+ TURF_TRACE(Linear, 19, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
srcValue = doubleCheckedSrcValue;
}
// Cell successfully migrated. Proceed to next source cell.
do {
if (probeStatus & 1) {
// End flag is already set, so do nothing.
- TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0);
+ TURF_TRACE(Linear, 20, "[TableMigration::run] already ended", uptr(this), 0);
return;
}
} while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
// Loop over all migration units in this source table.
for (;;) {
if (m_workerStatus.load(turf::Relaxed) & 1) {
- TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
+ TURF_TRACE(Linear, 21, "[TableMigration::run] detected end flag set", uptr(this), 0);
goto endMigration;
}
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
// No other thread can declare the migration successful at this point, because *this* unit will never complete,
// hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
- TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+ TURF_TRACE(Linear, 22, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
// The reason we store overflowed in a shared variable is because we must flush all the worker threads before
// we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
// the thread that deals with it.
bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
if (oldOverflowed)
- TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+ TURF_TRACE(Linear, 23, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
uptr(oldOverflowed));
m_workerStatus.fetchOr(1, turf::Relaxed);
goto endMigration;
}
}
}
- TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0);
+ TURF_TRACE(Linear, 24, "[TableMigration::run] out of migration units", uptr(this), 0);
endMigration:
// Decrement the shared # of workers.
2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
- TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+ TURF_TRACE(Linear, 25, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
return;
}
turf::LockGuard<turf::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
- TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ TURF_TRACE(Linear, 26, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
uptr(checkedJob));
} else {
TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);