namespace junction {
-TURF_TRACE_DECLARE(ConcurrentMap_Linear, 18)
+TURF_TRACE_DECLARE(ConcurrentMap_Linear, 17)
template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
class ConcurrentMap_Linear {
turf::Atomic<typename Details::Table*> m_root;
public:
- ConcurrentMap_Linear(ureg capacity = Details::InitialSize) {
- ureg limitNumValues = capacity * 3 / 4;
- m_root.storeNonatomic(Details::Table::create(capacity, limitNumValues));
+ ConcurrentMap_Linear(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) {
}
~ConcurrentMap_Linear() {
// There are no racing calls to this function.
typename Details::Table* oldRoot = m_root.loadNonatomic();
m_root.store(migration->m_destination, turf::Release);
- TURF_ASSERT(oldRoot == migration->m_source);
+ TURF_ASSERT(oldRoot == migration->getSources()[0].table);
// Caller will GC the TableMigration and the source table.
}
if (m_value != Value(ValueTraits::Redirect))
return; // Found an existing value
// We've encountered a Redirect value. Help finish the migration.
- TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), uptr(0));
+ TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), 0);
m_table->jobCoordinator.participate();
// Try again using the latest root.
}
TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
for (;;) {
Value oldValue = m_value;
- s32 prevRemainingValues = s32(-1);
- if (oldValue == Value(ValueTraits::NullValue)) {
- // It's a deleted or newly initialized cell.
- // Decrement remainingValues to ensure we have permission to (re)insert a value.
- prevRemainingValues = m_table->valuesRemaining.fetchSub(1, turf::Relaxed);
- if (prevRemainingValues <= 0) {
- TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table),
- prevRemainingValues);
- // Can't (re)insert any more values.
- // There are two ways this can happen. One with a TableMigration already in progress, and one without:
- // 1. A TableMigration puts a cap on the number of values late-arriving threads are allowed to insert.
- // 2. Two threads race to insert the same key, and it's the last free cell in the table.
- // (Note: We could get tid of the valuesRemaining counter by handling the possibility of migration
- // failure,
- // as LeapFrog and Grampa do...)
- m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
- // Since we don't know whether there's already a TableMigration in progress, always attempt to start one
- // here:
- Details::beginTableMigration(m_map, m_table);
- goto helpMigrate;
- }
- }
if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
// Exchange was successful. Return previous value.
- TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
+ TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
Value result = m_value;
m_value = desired; // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
if (m_value != Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
+ TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
uptr(m_value));
if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value",
+ TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] racing write inserted new value",
uptr(m_table), uptr(m_value));
- m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
}
// There was a racing write (or erase) to this cell.
// Pretend we exchanged with ourselves, and just let the racing write win.
return desired;
}
// We've encountered a Redirect value. Help finish the migration.
- TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
- helpMigrate:
- // Help migrate to new table.
+ TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
Hash hash = m_cell->hash.load(turf::Relaxed);
for (;;) {
// Help complete the migration.
case Details::InsertResult_AlreadyFound:
m_value = m_cell->value.load(turf::Consume);
if (m_value == Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
+ TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
uptr(m_value));
break;
}
case Details::InsertResult_InsertedNew:
goto breakOuter;
case Details::InsertResult_Overflow:
- TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
- 0);
+ TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), 0);
Details::beginTableMigration(m_map, m_table);
break;
}
Value eraseValue() {
TURF_ASSERT(m_cell); // Cell must have been found or inserted
- TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells());
+ TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells());
for (;;) {
if (m_value == Value(ValueTraits::NullValue))
return Value(m_value);
if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
// Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
- m_table->valuesRemaining.fetchAdd(1, turf::Relaxed);
Value result = m_value;
m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
- TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
+ TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
m_cell - m_table->getCells());
if (m_value != Value(ValueTraits::Redirect)) {
// There was a racing write (or erase) to this cell.
return Value(ValueTraits::NullValue);
}
// We've been redirected to a new table.
- TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table),
+ TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] was redirected", uptr(m_table),
m_cell - m_table->getCells());
Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
for (;;) {
m_value = m_cell->value.load(turf::Relaxed);
if (m_value != Value(ValueTraits::Redirect))
break;
- TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
+ TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
m_cell - m_table->getCells());
}
}
// Lookup without creating a temporary Mutator.
Value get(Key key) {
Hash hash = KeyTraits::hash(key);
- TURF_TRACE(ConcurrentMap_Linear, 16, "[get] called", uptr(this), uptr(hash));
+ TURF_TRACE(ConcurrentMap_Linear, 15, "[get] called", uptr(this), uptr(hash));
for (;;) {
typename Details::Table* table = m_root.load(turf::Consume);
typename Details::Cell* cell = Details::find(hash, table);
if (value != Value(ValueTraits::Redirect))
return value; // Found an existing value
// We've been redirected to a new table. Help with the migration.
- TURF_TRACE(ConcurrentMap_Linear, 17, "[get] was redirected", uptr(table), uptr(cell));
+ TURF_TRACE(ConcurrentMap_Linear, 16, "[get] was redirected", uptr(table), uptr(cell));
table->jobCoordinator.participate();
// Try again in the new table.
}
#include <junction/SimpleJobCoordinator.h>
#include <junction/QSBR.h>
+// Enable this to force migration overflows (for test purposes):
+#define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
+
namespace junction {
namespace details {
-TURF_TRACE_DECLARE(Linear, 22)
+TURF_TRACE_DECLARE(Linear, 26)
template <class Map>
struct Linear {
static const ureg InitialSize = 8;
static const ureg TableMigrationUnitSize = 32;
- static const ureg LinearSearchLimit = 128;
- static const ureg CellsInUseSample = LinearSearchLimit;
- TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
- TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
+ static const ureg CellsInUseSample = 256;
struct Cell {
turf::Atomic<Hash> hash;
struct Table {
const ureg sizeMask; // a power of two minus one
- const ureg limitNumValues;
turf::Atomic<sreg> cellsRemaining;
- turf::Atomic<sreg> valuesRemaining;
turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
- Table(ureg sizeMask, ureg limitNumValues)
- : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues),
- valuesRemaining(limitNumValues) {
+ Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
}
- static Table* create(ureg tableSize, ureg limitNumValues) {
+ static Table* create(ureg tableSize) {
TURF_ASSERT(turf::util::isPowerOf2(tableSize));
Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
- new (table) Table(tableSize - 1, limitNumValues);
+ new (table) Table(tableSize - 1);
for (ureg j = 0; j < tableSize; j++) {
table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
class TableMigration : public SimpleJobCoordinator::Job {
public:
+ struct Source {
+ Table* table;
+ turf::Atomic<ureg> sourceIndex;
+ };
+
Map& m_map;
- Table* m_source;
- turf::Atomic<ureg> m_sourceIndex;
Table* m_destination;
turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+ turf::Atomic<bool> m_overflowed;
turf::Atomic<sreg> m_unitsRemaining;
+ ureg m_numSources;
+
+ TableMigration(Map& map) : m_map(map) {
+ }
- TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) {
- // Caller is responsible for filling in source & destination
+ static TableMigration* create(Map& map, ureg numSources) {
+ TableMigration* migration =
+ (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
+ new (migration) TableMigration(map);
+ migration->m_workerStatus.storeNonatomic(0);
+ migration->m_overflowed.storeNonatomic(false);
+ migration->m_unitsRemaining.storeNonatomic(0);
+ migration->m_numSources = numSources;
+ // Caller is responsible for filling in sources & destination
+ return migration;
}
virtual ~TableMigration() TURF_OVERRIDE {
- // Destroy source table.
- m_source->destroy();
}
void destroy() {
- delete this;
+ // Destroy all source tables.
+ for (ureg i = 0; i < m_numSources; i++)
+ if (getSources()[i].table)
+ getSources()[i].table->destroy();
+ // Delete the migration object itself.
+ this->TableMigration::~TableMigration();
+ TURF_HEAP.free(this);
+ }
+
+ Source* getSources() const {
+ return (Source*) (this + 1);
}
- bool migrateRange(ureg startIdx);
+ bool migrateRange(Table* srcTable, ureg startIdx);
virtual void run() TURF_OVERRIDE;
};
}
if (probeHash == KeyTraits::NullHash) {
// It's an empty cell. Try to reserve it.
- // But first, decrement cellsRemaining to ensure we have permission to create new getCells().
+ // But first, decrement cellsRemaining to ensure we have permission to create new cells.
s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
if (prevCellsRemaining <= 0) {
// Table is overpopulated.
}
}
- static void beginTableMigration(Map& map, Table* table) {
+ static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
// Create new migration by DCLI.
- TURF_TRACE(Linear, 8, "[beginTableMigration] called", 0, 0);
+ TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
if (job) {
- TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0);
+ TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
} else {
turf::LockGuard<turf::Mutex> guard(table->mutex);
job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
if (job) {
- TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0);
+ TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
} else {
- // Determine new migration size and cap the number of values that can be added concurrent to the migration.
- sreg oldValuesLimit = table->limitNumValues;
- sreg oldValuesRemaining = table->valuesRemaining.load(turf::Relaxed);
- sreg oldValuesInUse = oldValuesLimit - oldValuesRemaining;
- calculateNextTableSize:
- sreg nextTableSize = turf::util::roundUpPowerOf2(oldValuesInUse * 2);
- sreg nextLimitNumValues = nextTableSize * 3 / 4;
- if (nextLimitNumValues < oldValuesLimit) {
- // Set the new limitNumValues on the *current* table.
- // This prevents other threads, while the migration is in progress, from concurrently
- // re-inserting more values than the new table can hold.
- // To set the new limitNumValues on the current table in an atomic fashion,
- // we update its valuesRemaining via CAS loop:
- for (;;) {
- // We must recalculate desiredValuesRemaining on each iteration of the CAS loop
- oldValuesInUse = oldValuesLimit - oldValuesRemaining;
- sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse;
- if (desiredValuesRemaining < 0) {
- TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues,
- desiredValuesRemaining);
- // Must recalculate nextTableSize. Goto, baby!
- goto calculateNextTableSize;
- }
- if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed,
- turf::Relaxed))
- break; // Success!
- // CAS failed because table->valuesRemaining was modified by another thread.
- // An updated value has been reloaded into oldValuesRemaining (modified by reference).
- // Recalculate desiredValuesRemaining to account for the updated value, and try again.
- TURF_TRACE(Linear, 12, "[table] valuesRemaining CAS failed", oldValuesRemaining, desiredValuesRemaining);
- }
- }
- // Now we are assured that the new table will not become overpopulated during the migration process.
// Create new migration.
- TableMigration* migration = new TableMigration(map);
- migration->m_source = table;
- migration->m_destination = Table::create(nextTableSize, nextLimitNumValues);
+ TableMigration* migration = TableMigration::create(map, 1);
migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
+ migration->getSources()[0].table = table;
+ migration->getSources()[0].sourceIndex.storeNonatomic(0);
+ migration->m_destination = Table::create(nextTableSize);
// Publish the new migration.
table->jobCoordinator.storeRelease(migration);
}
}
}
+
+ static void beginTableMigration(Map& map, Table* table) {
+ // Estimate number of cells in use based on a small sample.
+ ureg idx = 0;
+ ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
+ ureg inUseCells = 0;
+ for (; idx < sampleSize; idx++) {
+ Cell* cell = table->getCells() + idx;
+ Value value = cell->value.load(turf::Relaxed);
+ if (value == Value(ValueTraits::Redirect)) {
+ // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+ TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0);
+ return;
+ }
+ if (value != Value(ValueTraits::NullValue))
+ inUseCells++;
+ }
+ float inUseRatio = float(inUseCells) / sampleSize;
+ float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
+#if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
+ // Periodically underestimate the number of cells in use.
+ // This exercises the code that handles overflow during migration.
+ static ureg counter = 1;
+ if ((++counter & 3) == 0) {
+ estimatedInUse /= 4;
+ }
+#endif
+ ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+ beginTableMigrationToSize(map, table, nextTableSize);
+ }
}; // Linear
template <class Map>
-bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
- ureg srcSizeMask = m_source->sizeMask;
+bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
+ ureg srcSizeMask = srcTable->sizeMask;
ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
sreg valuesMigrated = 0;
// Iterate over source range.
for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
- Cell* srcCell = m_source->getCells() + (srcIdx & srcSizeMask);
+ Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
Hash srcHash;
Value srcValue;
// Fetch the srcHash and srcValue.
srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx);
+ TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx);
+ TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
// Check for deleted/uninitialized value.
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx);
+ TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+ if (srcValue == Value(ValueTraits::Redirect)) {
+ // FIXME: I don't think this will happen. Investigate & change to assert
+ TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+ break;
+ }
+ } else if (srcValue == Value(ValueTraits::Redirect)) {
+ // srcValue is already marked Redirect due to previous incomplete migration.
+ TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+ break;
}
// We've got a key/value pair to migrate.
// Reserve a destination cell in the destination.
TURF_ASSERT(srcHash != KeyTraits::NullHash);
TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
- TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
+ TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
Cell* dstCell;
InsertResult result = insert(srcHash, m_destination, dstCell);
// During migration, a hash can only exist in one place among all the source tables,
// and it is only migrated by one thread. Therefore, the hash will never already exist
// in the destination table:
TURF_ASSERT(result != InsertResult_AlreadyFound);
- TURF_ASSERT(result != InsertResult_Overflow);
+ if (result == InsertResult_Overflow) {
+ // Destination overflow.
+ // This can happen for several reasons. For example, the source table could have
+ // existed of all deleted cells when it overflowed, resulting in a small destination
+ // table size, but then another thread could re-insert all the same hashes
+ // before the migration completed.
+ // Caller will cancel the current migration and begin a new one.
+ return false;
+ }
// Migrate the old value to the new cell.
for (;;) {
// Copy srcValue to the destination.
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
// by a late-arriving erase.
if (srcValue == Value(ValueTraits::NullValue))
- TURF_TRACE(Linear, 16, "[migrateRange] racing update was erase", uptr(m_source), srcIdx);
- else
- valuesMigrated++;
+ TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
break;
}
// There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
- TURF_TRACE(Linear, 17, "[migrateRange] race to update migrated value", uptr(m_source), srcIdx);
+ TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
srcValue = doubleCheckedSrcValue;
}
// Cell successfully migrated. Proceed to next source cell.
}
}
}
- sreg prevValuesRemaining = m_destination->valuesRemaining.fetchSub(valuesMigrated, turf::Relaxed);
- TURF_ASSERT(valuesMigrated <= prevValuesRemaining);
- TURF_UNUSED(prevValuesRemaining);
// Range has been migrated successfully.
return true;
}
do {
if (probeStatus & 1) {
// End flag is already set, so do nothing.
- TURF_TRACE(Linear, 18, "[TableMigration::run] already ended", uptr(this), 0);
+ TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0);
return;
}
} while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
// # of workers has been incremented, and the end flag is clear.
TURF_ASSERT((probeStatus & 1) == 0);
- // Loop over all migration units in the source table.
- for (;;) {
- if (m_workerStatus.load(turf::Relaxed) & 1) {
- TURF_TRACE(Linear, 19, "[TableMigration::run] detected end flag set", uptr(this), 0);
- goto endMigration;
- }
- ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
- if (startIdx >= m_source->sizeMask + 1)
- break; // No more migration units.
- migrateRange(startIdx);
- sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
- TURF_ASSERT(prevRemaining > 0);
- if (prevRemaining == 1) {
- // That was the last chunk to migrate.
- m_workerStatus.fetchOr(1, turf::Relaxed);
- goto endMigration;
+ // Iterate over all source tables.
+ for (ureg s = 0; s < m_numSources; s++) {
+ Source& source = getSources()[s];
+ // Loop over all migration units in this source table.
+ for (;;) {
+ if (m_workerStatus.load(turf::Relaxed) & 1) {
+ TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
+ goto endMigration;
+ }
+ ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
+ if (startIdx >= source.table->sizeMask + 1)
+ break; // No more migration units in this table. Try next source table.
+ bool overflowed = !migrateRange(source.table, startIdx);
+ if (overflowed) {
+ // *** FAILED MIGRATION ***
+ // TableMigration failed due to destination table overflow.
+ // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+ // hence m_unitsRemaining won't reach zero.
+ // However, multiple threads can independently detect a failed migration at the same time.
+ TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+ // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
+ // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
+ // the thread
+ // that deals with it.
+ bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
+ if (oldOverflowed)
+ TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+ uptr(oldOverflowed));
+ m_workerStatus.fetchOr(1, turf::Relaxed);
+ goto endMigration;
+ }
+ sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
+ TURF_ASSERT(prevRemaining > 0);
+ if (prevRemaining == 1) {
+ // *** SUCCESSFUL MIGRATION ***
+ // That was the last chunk to migrate.
+ m_workerStatus.fetchOr(1, turf::Relaxed);
+ goto endMigration;
+ }
}
}
- TURF_TRACE(Linear, 20, "[TableMigration::run] out of migration units", uptr(this), 0);
+ TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0);
endMigration:
// Decrement the shared # of workers.
2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
- TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+ TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
return;
}
// We're the very last worker thread.
- // Publish the new subtree.
+ // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
TURF_ASSERT(probeStatus == 3);
- m_map.publishTableMigration(this);
- // End the jobCoodinator.
- m_source->jobCoordinator.end();
+ bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
+ if (!overflowed) {
+ // The migration succeeded. This is the most likely outcome. Publish the new subtree.
+ m_map.publishTableMigration(this);
+ // End the jobCoodinator.
+ getSources()[0].table->jobCoordinator.end();
+ } else {
+ // The migration failed due to the overflow of the destination table.
+ Table* origTable = getSources()[0].table;
+ turf::LockGuard<turf::Mutex> guard(origTable->mutex);
+ SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
+ if (checkedJob != this) {
+ TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ uptr(checkedJob));
+ } else {
+ TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
+ // Double the destination table size.
+ migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
+ // Transfer source tables to the new migration.
+ for (ureg i = 0; i < m_numSources; i++) {
+ migration->getSources()[i].table = getSources()[i].table;
+ getSources()[i].table = NULL;
+ migration->getSources()[i].sourceIndex.storeNonatomic(0);
+ }
+ migration->getSources()[m_numSources].table = m_destination;
+ migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
+ // Calculate total number of migration units to move.
+ ureg unitsRemaining = 0;
+ for (ureg s = 0; s < migration->m_numSources; s++)
+ unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
+ migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
+ // Publish the new migration.
+ origTable->jobCoordinator.storeRelease(migration);
+ }
+ }
// We're done with this TableMigration. Queue it for GC.
DefaultQSBR.enqueue(&TableMigration::destroy, this);