PointerAlignment: Left
SpaceAfterCStyleCast: true
AllowShortFunctionsOnASingleLine: None
+AlwaysBreakTemplateDeclarations: true
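# Illustrative effect of these options (assumed clang-format behavior, not part of
# the change itself): PointerAlignment: Left with SpaceAfterCStyleCast: true turns
#     Cell *cell = (Cell*)ptr;
# into
#     Cell* cell = (Cell*) ptr;
# and AlwaysBreakTemplateDeclarations forces "template <class Map>" onto its own line.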
#include <junction/ConcurrentMap_Grampa.h>
namespace junction {
-
+
TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Grampa, 27) // autogenerated by TidySource.py
TURF_TRACE_DEFINE("[locateTable] flattree lookup redirected")
TURF_TRACE_DEFINE("[createInitialTable] race to create initial table")
// This TableMigration replaces the entire map with a single table.
TURF_ASSERT(migration->m_baseHash == 0);
TURF_ASSERT(migration->m_numDestinations == 1);
- ureg oldRoot = m_root.loadNonatomic(); // There are no racing writes to m_root.
+ ureg oldRoot = m_root.loadNonatomic(); // There are no racing writes to m_root.
// Store the single table in m_root directly.
typename Details::Table* newTable = migration->getDestinations()[0];
- m_root.store(uptr(newTable), turf::Release); // Make table contents visible
+ m_root.store(uptr(newTable), turf::Release); // Make table contents visible
newTable->isPublished.signal();
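        // Decoding sketch for the root tag (illustrative, not part of this change): the low bit
        // of m_root distinguishes the two root representations, so a reader does roughly
        //     ureg root = m_root.load(turf::Consume);
        //     if (root & 1)
        //         visit((typename Details::FlatTree*) (root & ~ureg(1))); // tagged flattree
        //     else
        //         visit((typename Details::Table*) root);                 // plain single table
        // where visit() is a hypothetical callback.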
if ((oldRoot & 1) == 0) {
- TURF_TRACE(ConcurrentMap_Grampa, 3, "[publishTableMigration] replacing single root with single root", uptr(migration), 0);
+ TURF_TRACE(ConcurrentMap_Grampa, 3, "[publishTableMigration] replacing single root with single root",
+ uptr(migration), 0);
// If oldRoot is a table, it must be the original source of the migration.
TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table);
// Don't GC it here. The caller will GC it since it's a source of the TableMigration.
} else {
- TURF_TRACE(ConcurrentMap_Grampa, 4, "[publishTableMigration] replacing flattree with single root", uptr(migration), 0);
+ TURF_TRACE(ConcurrentMap_Grampa, 4, "[publishTableMigration] replacing flattree with single root",
+ uptr(migration), 0);
// The entire previous flattree is being replaced.
Details::garbageCollectFlatTree((typename Details::FlatTree*) (oldRoot & ~ureg(1)));
}
} else {
// We are either publishing a subtree of one or more tables, or replacing the entire map with multiple tables.
// In either case, there will be a flattree after this function returns.
- TURF_ASSERT(migration->m_safeShift < sizeof(Hash) * 8); // If m_numDestinations > 1, some index bits must remain after shifting
+ TURF_ASSERT(migration->m_safeShift <
+ sizeof(Hash) * 8); // If m_numDestinations > 1, some index bits must remain after shifting
ureg oldRoot = m_root.load(turf::Consume);
if ((oldRoot & 1) == 0) {
// There's no flattree yet. This means the TableMigration is publishing the full range of hashes.
TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table);
// Furthermore, it is guaranteed that there are no racing writes to m_root.
// Create a new flattree and store it to m_root.
- TURF_TRACE(ConcurrentMap_Grampa, 5, "[publishTableMigration] replacing single root with flattree", uptr(migration), 0);
+ TURF_TRACE(ConcurrentMap_Grampa, 5, "[publishTableMigration] replacing single root with flattree",
+ uptr(migration), 0);
typename Details::FlatTree* flatTree = Details::FlatTree::create(migration->m_safeShift);
typename Details::Table* prevTable = NULL;
for (ureg i = 0; i < migration->m_numDestinations; i++) {
prevTable = newTable;
}
}
- m_root.store(uptr(flatTree) | 1, turf::Release); // Ensure visibility of flatTree->tables
+ m_root.store(uptr(flatTree) | 1, turf::Release); // Ensure visibility of flatTree->tables
// Caller will GC the TableMigration.
// Caller will also GC the old oldRoot since it's a source of the TableMigration.
} else {
// There is an existing flattree, and we are publishing one or more tables to it.
// Attempt to publish the subtree in a loop.
// The loop is necessary because we might get redirected in the middle of publishing.
- TURF_TRACE(ConcurrentMap_Grampa, 6, "[publishTableMigration] publishing subtree to existing flattree", uptr(migration), 0);
+ TURF_TRACE(ConcurrentMap_Grampa, 6, "[publishTableMigration] publishing subtree to existing flattree",
+ uptr(migration), 0);
typename Details::FlatTree* flatTree = (typename Details::FlatTree*) (oldRoot & ~ureg(1));
ureg subTreeEntriesPublished = 0;
typename Details::Table* tableToReplace = migration->getSources()[0].table;
// Otherwise, there will be a race between a subtree and its own children.
// (If all ManualResetEvent objects supported isPublished(), we could add a TURF_TRACE counter for this.
// In previous tests, such a counter does in fact get hit.)
- tableToReplace->isPublished.wait();
+ tableToReplace->isPublished.wait();
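        // Ordering sketch (illustrative): if tableToReplace was itself created by a still-running
        // migration, replacing it now could publish the child subtree before the parent entry is
        // visible; wait() blocks until the parent publish has signaled isPublished.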
typename Details::Table* prevTable = NULL;
for (;;) {
publishLoop:
// First, try to create a FlatTreeMigration with the necessary properties.
// This will fail if an existing FlatTreeMigration has already been created using the same source.
// In that case, we'll help complete the existing FlatTreeMigration, then we'll retry the loop.
- TURF_TRACE(ConcurrentMap_Grampa, 7, "[publishTableMigration] existing flattree too small", uptr(migration), 0);
- typename Details::FlatTreeMigration* flatTreeMigration = Details::createFlatTreeMigration(*this, flatTree, migration->m_safeShift);
+ TURF_TRACE(ConcurrentMap_Grampa, 7, "[publishTableMigration] existing flattree too small",
+ uptr(migration), 0);
+ typename Details::FlatTreeMigration* flatTreeMigration =
+ Details::createFlatTreeMigration(*this, flatTree, migration->m_safeShift);
tableToReplace->jobCoordinator.runOne(flatTreeMigration);
- flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible
+ flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible
flatTree = flatTreeMigration->m_destination;
// The FlatTreeMigration has already been GC'ed by the last worker.
// Retry the loop.
// The subtree we're about to publish fits inside the flattree.
TURF_ASSERT(dstStartIndex + migration->m_numDestinations * repeat - 1 <= Hash(-1) >> flatTree->safeShift);
// If a previous attempt to publish got redirected, resume publishing into the new flattree,
- // starting with the first subtree entry that has not yet been fully published, as given by subTreeEntriesPublished.
+ // starting with the first subtree entry that has not yet been fully published, as given by
+ // subTreeEntriesPublished.
// (Note: We could, in fact, restart the publish operation starting at entry 0. That would be valid too.
// We are the only thread that can modify this particular range of the flattree at this time.)
- turf::Atomic<typename Details::Table*>* dstLeaf = flatTree->getTables() + dstStartIndex + (subTreeEntriesPublished * repeat);
+ turf::Atomic<typename Details::Table*>* dstLeaf =
+ flatTree->getTables() + dstStartIndex + (subTreeEntriesPublished * repeat);
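            // Index sketch (illustrative numbers): with dstStartIndex == 8, repeat == 2 and
            // subTreeEntriesPublished == 3, publishing resumes at flattree leaf 8 + 3 * 2 == 14,
            // just past the three entries already written to two leaves each.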
typename Details::Table** subFlatTree = migration->getDestinations();
while (subTreeEntriesPublished < migration->m_numDestinations) {
typename Details::Table* srcTable = subFlatTree[subTreeEntriesPublished];
if (ureg(probeTable) == Details::RedirectFlatTree) {
// We've been redirected.
// Help with the FlatTreeMigration, then try again.
- TURF_TRACE(ConcurrentMap_Grampa, 8, "[publishTableMigration] redirected", uptr(migration), uptr(dstLeaf));
- typename Details::FlatTreeMigration* flatTreeMigration = Details::getExistingFlatTreeMigration(flatTree);
+ TURF_TRACE(ConcurrentMap_Grampa, 8, "[publishTableMigration] redirected", uptr(migration),
+ uptr(dstLeaf));
+ typename Details::FlatTreeMigration* flatTreeMigration =
+ Details::getExistingFlatTreeMigration(flatTree);
tableToReplace->jobCoordinator.runOne(flatTreeMigration);
- flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible
+ flatTreeMigration->m_completed.wait();
+ // flatTreeMigration->m_destination becomes entirely visible
flatTree = flatTreeMigration->m_destination;
goto publishLoop;
}
- // The only other possibility is that we were previously redirected, and the subtree entry got partially published.
- TURF_TRACE(ConcurrentMap_Grampa, 9, "[publishTableMigration] recovering from partial publish", uptr(migration), 0);
+ // The only other possibility is that we were previously redirected, and the subtree entry got
+ // partially published.
+ TURF_TRACE(ConcurrentMap_Grampa, 9, "[publishTableMigration] recovering from partial publish",
+ uptr(migration), 0);
TURF_ASSERT(probeTable == srcTable);
}
        // The caller will GC the table(s) being replaced since they're sources of the TableMigration.
// Constructor: Find existing cell
Mutator(ConcurrentMap_Grampa& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_Grampa, 10, "[Mutator] find constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key));
+ TURF_TRACE(ConcurrentMap_Grampa, 10, "[Mutator] find constructor called", uptr(map.m_root.load(turf::Relaxed)),
+ uptr(key));
Hash hash = KeyTraits::hash(key);
for (;;) {
if (!m_map.locateTable(m_table, m_sizeMask, hash))
return;
m_value = m_cell->value.load(turf::Consume);
if (m_value != Value(ValueTraits::Redirect))
- return; // Found an existing value
+ return; // Found an existing value
// We've encountered a Redirect value. Help finish the migration.
TURF_TRACE(ConcurrentMap_Grampa, 11, "[Mutator] find was redirected", uptr(m_table), 0);
m_table->jobCoordinator.participate();
// Constructor: Insert cell
Mutator(ConcurrentMap_Grampa& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key));
+ TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)),
+ uptr(key));
Hash hash = KeyTraits::hash(key);
for (;;) {
if (!m_map.locateTable(m_table, m_sizeMask, hash)) {
m_map.createInitialTable(Details::MinTableSize);
} else {
ureg overflowIdx;
- switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell
- case Details::InsertResult_InsertedNew: {
- // We've inserted a new cell. Don't load m_cell->value.
- return;
- }
- case Details::InsertResult_AlreadyFound: {
- // The hash was already found in the table.
- m_value = m_cell->value.load(turf::Consume);
- if (m_value == Value(ValueTraits::Redirect)) {
- // We've encountered a Redirect value.
- TURF_TRACE(ConcurrentMap_Grampa, 13, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
- break; // Help finish the migration.
- }
- return; // Found an existing value
- }
- case Details::InsertResult_Overflow: {
- Details::beginTableMigration(m_map, m_table, overflowIdx);
- break;
+ switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell
+ case Details::InsertResult_InsertedNew: {
+ // We've inserted a new cell. Don't load m_cell->value.
+ return;
+ }
+ case Details::InsertResult_AlreadyFound: {
+ // The hash was already found in the table.
+ m_value = m_cell->value.load(turf::Consume);
+ if (m_value == Value(ValueTraits::Redirect)) {
+ // We've encountered a Redirect value.
+ TURF_TRACE(ConcurrentMap_Grampa, 13, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
+ break; // Help finish the migration.
}
+ return; // Found an existing value
+ }
+ case Details::InsertResult_Overflow: {
+ Details::beginTableMigration(m_map, m_table, overflowIdx);
+ break;
+ }
}
// A migration has been started (either by us, or another thread). Participate until it's complete.
m_table->jobCoordinator.participate();
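    // Usage sketch (hypothetical caller): construct a Mutator with the insert constructor
    // above, then drive it through the methods below, e.g.
    //     Mutator mut(map, key);
    //     Value prev = mut.exchangeValue(desired); // returns the displaced value
    //     Value old = mut.eraseValue();            // returns the erased value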
Value exchangeValue(Value desired) {
TURF_ASSERT(desired != Value(ValueTraits::NullValue));
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_Grampa, 14, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
for (;;) {
Value oldValue = m_value;
if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
// Exchange was successful. Return previous value.
- TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
+ TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value),
+ uptr(desired));
Value result = m_value;
- m_value = desired; // Leave the mutator in a valid state
+ m_value = desired; // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
if (m_value != Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
+ uptr(m_value));
if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value",
+ uptr(m_table), uptr(m_value));
}
// There was a racing write (or erase) to this cell.
// Pretend we exchanged with ourselves, and just let the racing write win.
TURF_UNUSED(exists);
m_value = Value(ValueTraits::NullValue);
ureg overflowIdx;
- switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell
+ switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell
case Details::InsertResult_AlreadyFound:
m_value = m_cell->value.load(turf::Consume);
if (m_value == Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
+ uptr(m_value));
break;
}
goto breakOuter;
case Details::InsertResult_InsertedNew:
goto breakOuter;
case Details::InsertResult_Overflow:
- TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx);
+ TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
+ overflowIdx);
Details::beginTableMigration(m_map, m_table, overflowIdx);
break;
}
// We were redirected... again
}
- breakOuter:
- ;
+ breakOuter:;
// Try again in the new table.
}
}
}
Value eraseValue() {
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_Grampa, 21, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_value));
for (;;) {
if (m_value == Value(ValueTraits::NullValue))
return m_value;
- TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
+ TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
// Exchange was successful and a non-NullValue value was erased and returned by reference in m_value.
- TURF_ASSERT(m_value != Value(ValueTraits::NullValue)); // Implied by the test at the start of the loop.
+ TURF_ASSERT(m_value != Value(ValueTraits::NullValue)); // Implied by the test at the start of the loop.
Value result = m_value;
- m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
+ m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
- TURF_TRACE(ConcurrentMap_Grampa, 22, "[Mutator::eraseValue] detected race to write value", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_Grampa, 22, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
+ uptr(m_value));
if (m_value != Value(ValueTraits::Redirect)) {
// There was a racing write (or erase) to this cell.
// Pretend we erased nothing, and just let the racing write win.
}
// We've been redirected to a new table.
TURF_TRACE(ConcurrentMap_Grampa, 23, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell));
- Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
+ Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
for (;;) {
// Help complete the migration.
m_table->jobCoordinator.participate();
return Value(ValueTraits::NullValue);
Value value = cell->value.load(turf::Consume);
if (value != Value(ValueTraits::Redirect))
- return value; // Found an existing value
+ return value; // Found an existing value
// We've been redirected to a new table. Help with the migration.
TURF_TRACE(ConcurrentMap_Grampa, 26, "[get] was redirected", uptr(table), 0);
table->jobCoordinator.participate();
void next() {
TURF_ASSERT(m_table);
- TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
+ TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
for (;;) {
searchInTable:
m_idx++;
if (m_idx <= m_table->sizeMask) {
// Index still inside range of table.
typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
- typename Details::Cell *cell = group->cells + (m_idx & 3);
+ typename Details::Cell* cell = group->cells + (m_idx & 3);
m_hash = cell->hash.load(turf::Relaxed);
if (m_hash != KeyTraits::NullHash) {
// Cell has been reserved.
m_value = cell->value.load(turf::Relaxed);
TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
if (m_value != Value(ValueTraits::NullValue))
- return; // Yield this cell.
+ return; // Yield this cell.
}
} else {
// We've advanced past the end of this table.
// Found the next table.
m_table = nextTable;
m_idx = -1;
- goto searchInTable; // Continue iterating in this table.
+ goto searchInTable; // Continue iterating in this table.
}
}
}
#include <junction/ConcurrentMap_LeapFrog.h>
namespace junction {
-
+
TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_LeapFrog, 17) // autogenerated by TidySource.py
TURF_TRACE_DEFINE("[Mutator] find constructor called")
TURF_TRACE_DEFINE("[Mutator] find was redirected")
return;
m_value = m_cell->value.load(turf::Consume);
if (m_value != Value(ValueTraits::Redirect))
- return; // Found an existing value
+ return; // Found an existing value
// We've encountered a Redirect value. Help finish the migration.
TURF_TRACE(ConcurrentMap_LeapFrog, 1, "[Mutator] find was redirected", uptr(m_table), 0);
m_table->jobCoordinator.participate();
}
// Constructor: Insert cell
- Mutator(ConcurrentMap_LeapFrog& map, Key key) : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
+ Mutator(ConcurrentMap_LeapFrog& map, Key key)
+ : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
TURF_TRACE(ConcurrentMap_LeapFrog, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key));
Hash hash = KeyTraits::hash(key);
for (;;) {
m_table = m_map.m_root.load(turf::Consume);
ureg overflowIdx;
- switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
- case Details::InsertResult_InsertedNew: {
- // We've inserted a new cell. Don't load m_cell->value.
- return;
- }
- case Details::InsertResult_AlreadyFound: {
- // The hash was already found in the table.
- m_value = m_cell->value.load(turf::Consume);
- if (m_value == Value(ValueTraits::Redirect)) {
- // We've encountered a Redirect value.
- TURF_TRACE(ConcurrentMap_LeapFrog, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
- break; // Help finish the migration.
- }
- return; // Found an existing value
- }
- case Details::InsertResult_Overflow: {
- Details::beginTableMigration(m_map, m_table, overflowIdx);
- break;
+ switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
+ case Details::InsertResult_InsertedNew: {
+ // We've inserted a new cell. Don't load m_cell->value.
+ return;
+ }
+ case Details::InsertResult_AlreadyFound: {
+ // The hash was already found in the table.
+ m_value = m_cell->value.load(turf::Consume);
+ if (m_value == Value(ValueTraits::Redirect)) {
+ // We've encountered a Redirect value.
+ TURF_TRACE(ConcurrentMap_LeapFrog, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
+ break; // Help finish the migration.
}
+ return; // Found an existing value
+ }
+ case Details::InsertResult_Overflow: {
+ Details::beginTableMigration(m_map, m_table, overflowIdx);
+ break;
+ }
}
// A migration has been started (either by us, or another thread). Participate until it's complete.
m_table->jobCoordinator.participate();
Value exchangeValue(Value desired) {
TURF_ASSERT(desired != Value(ValueTraits::NullValue));
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_LeapFrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
for (;;) {
Value oldValue = m_value;
if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
// Exchange was successful. Return previous value.
- TURF_TRACE(ConcurrentMap_LeapFrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
+ TURF_TRACE(ConcurrentMap_LeapFrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value),
+ uptr(desired));
Value result = m_value;
- m_value = desired; // Leave the mutator in a valid state
+ m_value = desired; // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
if (m_value != Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_LeapFrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
+ uptr(m_value));
if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 7, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_LeapFrog, 7, "[Mutator::exchangeValue] racing write inserted new value",
+ uptr(m_table), uptr(m_value));
}
// There was a racing write (or erase) to this cell.
// Pretend we exchanged with ourselves, and just let the racing write win.
m_table = m_map.m_root.load(turf::Consume);
m_value = Value(ValueTraits::NullValue);
ureg overflowIdx;
- switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
+ switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
case Details::InsertResult_AlreadyFound:
m_value = m_cell->value.load(turf::Consume);
if (m_value == Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_LeapFrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
+ uptr(m_value));
break;
}
goto breakOuter;
case Details::InsertResult_InsertedNew:
goto breakOuter;
case Details::InsertResult_Overflow:
- TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx);
+ TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
+ overflowIdx);
Details::beginTableMigration(m_map, m_table, overflowIdx);
break;
}
// We were redirected... again
}
- breakOuter:
- ;
+ breakOuter:;
// Try again in the new table.
}
}
}
Value eraseValue() {
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_LeapFrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell));
for (;;) {
if (m_value == Value(ValueTraits::NullValue))
return Value(m_value);
- TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
+ TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
// Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
- TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
+ TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
Value result = m_value;
- m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
+ m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
- TURF_TRACE(ConcurrentMap_LeapFrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table), uptr(m_cell));
+ TURF_TRACE(ConcurrentMap_LeapFrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
+ uptr(m_cell));
if (m_value != Value(ValueTraits::Redirect)) {
// There was a racing write (or erase) to this cell.
// Pretend we erased nothing, and just let the racing write win.
}
// We've been redirected to a new table.
TURF_TRACE(ConcurrentMap_LeapFrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell));
- Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
+ Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
for (;;) {
// Help complete the migration.
m_table->jobCoordinator.participate();
m_value = m_cell->value.load(turf::Relaxed);
if (m_value != Value(ValueTraits::Redirect))
break;
- TURF_TRACE(ConcurrentMap_LeapFrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table), uptr(m_cell));
+ TURF_TRACE(ConcurrentMap_LeapFrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
+ uptr(m_cell));
}
}
}
return Value(ValueTraits::NullValue);
Value value = cell->value.load(turf::Consume);
if (value != Value(ValueTraits::Redirect))
- return value; // Found an existing value
+ return value; // Found an existing value
// We've been redirected to a new table. Help with the migration.
TURF_TRACE(ConcurrentMap_LeapFrog, 16, "[get] was redirected", uptr(table), uptr(hash));
table->jobCoordinator.participate();
void next() {
TURF_ASSERT(m_table);
- TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
+ TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
while (++m_idx <= m_table->sizeMask) {
// Index still inside range of table.
typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
- typename Details::Cell *cell = group->cells + (m_idx & 3);
+ typename Details::Cell* cell = group->cells + (m_idx & 3);
m_hash = cell->hash.load(turf::Relaxed);
if (m_hash != KeyTraits::NullHash) {
// Cell has been reserved.
m_value = cell->value.load(turf::Relaxed);
TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
if (m_value != Value(ValueTraits::NullValue))
- return; // Yield this cell.
+ return; // Yield this cell.
}
}
// That's the end of the map.
#include <junction/ConcurrentMap_Linear.h>
namespace junction {
-
+
TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Linear, 18) // autogenerated by TidySource.py
TURF_TRACE_DEFINE("[Mutator] find constructor called")
TURF_TRACE_DEFINE("[Mutator] find was redirected")
return;
m_value = m_cell->value.load(turf::Consume);
if (m_value != Value(ValueTraits::Redirect))
- return; // Found an existing value
+ return; // Found an existing value
// We've encountered a Redirect value. Help finish the migration.
TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), uptr(0));
m_table->jobCoordinator.participate();
}
// Constructor: Insert cell
- Mutator(ConcurrentMap_Linear& map, Key key) : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
+ Mutator(ConcurrentMap_Linear& map, Key key)
+ : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
TURF_TRACE(ConcurrentMap_Linear, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key));
Hash hash = KeyTraits::hash(key);
for (;;) {
m_table = m_map.m_root.load(turf::Consume);
- switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
- case Details::InsertResult_InsertedNew: {
- // We've inserted a new cell. Don't load m_cell->value.
- return;
- }
- case Details::InsertResult_AlreadyFound: {
- // The hash was already found in the table.
- m_value = m_cell->value.load(turf::Consume);
- if (m_value == Value(ValueTraits::Redirect)) {
- // We've encountered a Redirect value.
- TURF_TRACE(ConcurrentMap_Linear, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
- break; // Help finish the migration.
- }
- return; // Found an existing value
- }
- case Details::InsertResult_Overflow: {
- Details::beginTableMigration(m_map, m_table);
- break;
+ switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
+ case Details::InsertResult_InsertedNew: {
+ // We've inserted a new cell. Don't load m_cell->value.
+ return;
+ }
+ case Details::InsertResult_AlreadyFound: {
+ // The hash was already found in the table.
+ m_value = m_cell->value.load(turf::Consume);
+ if (m_value == Value(ValueTraits::Redirect)) {
+ // We've encountered a Redirect value.
+ TURF_TRACE(ConcurrentMap_Linear, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
+ break; // Help finish the migration.
}
+ return; // Found an existing value
+ }
+ case Details::InsertResult_Overflow: {
+ Details::beginTableMigration(m_map, m_table);
+ break;
+ }
}
// A migration has been started (either by us, or another thread). Participate until it's complete.
m_table->jobCoordinator.participate();
Value exchangeValue(Value desired) {
TURF_ASSERT(desired != Value(ValueTraits::NullValue));
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
for (;;) {
Value oldValue = m_value;
            // Decrement valuesRemaining to ensure we have permission to (re)insert a value.
prevRemainingValues = m_table->valuesRemaining.fetchSub(1, turf::Relaxed);
if (prevRemainingValues <= 0) {
- TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table), prevRemainingValues);
+ TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table),
+ prevRemainingValues);
// Can't (re)insert any more values.
// There are two ways this can happen. One with a TableMigration already in progress, and one without:
// 1. A TableMigration puts a cap on the number of values late-arriving threads are allowed to insert.
// 2. Two threads race to insert the same key, and it's the last free cell in the table.
-                    // (Note: We could get rid of the valuesRemaining counter by handling the possibility of migration failure,
+                    // (Note: We could get rid of the valuesRemaining counter by handling the possibility of
+                    // migration failure,
// as LeapFrog and Grampa do...)
- m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
- // Since we don't know whether there's already a TableMigration in progress, always attempt to start one here:
+ m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
+                    // Since we don't know whether there's already a TableMigration in progress, always attempt to
+                    // start one here:
Details::beginTableMigration(m_map, m_table);
goto helpMigrate;
}
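                // Counter sketch (illustrative numbers): if valuesRemaining is 1 and two inserters
                // race, fetchSub(1) hands them prevRemainingValues of 1 and 0; the second undoes its
                // decrement with fetchAdd(1) and takes this path, starting/joining a migration
                // instead of inserting.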
// Exchange was successful. Return previous value.
TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
Value result = m_value;
- m_value = desired; // Leave the mutator in a valid state
+ m_value = desired; // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
if (m_value != Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
+ uptr(m_value));
if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value));
- m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
+ TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value",
+ uptr(m_table), uptr(m_value));
+ m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
}
// There was a racing write (or erase) to this cell.
// Pretend we exchanged with ourselves, and just let the racing write win.
// Try again in the new table.
m_table = m_map.m_root.load(turf::Consume);
m_value = Value(ValueTraits::NullValue);
- switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
+ switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
case Details::InsertResult_AlreadyFound:
m_value = m_cell->value.load(turf::Consume);
if (m_value == Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value));
+ TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
+ uptr(m_value));
break;
}
goto breakOuter;
case Details::InsertResult_InsertedNew:
goto breakOuter;
case Details::InsertResult_Overflow:
- TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), 0);
+ TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
+ 0);
Details::beginTableMigration(m_map, m_table);
break;
}
// We were redirected... again
}
- breakOuter:
- ;
+ breakOuter:;
// Try again in the new table.
}
}
}
Value eraseValue() {
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells());
for (;;) {
if (m_value == Value(ValueTraits::NullValue))
return Value(m_value);
- TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
+ TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
// Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
- TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
+ TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
m_table->valuesRemaining.fetchAdd(1, turf::Relaxed);
Value result = m_value;
- m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
+ m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
return result;
}
// The CAS failed and m_value has been updated with the latest value.
- TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table), m_cell - m_table->getCells());
+ TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
+ m_cell - m_table->getCells());
if (m_value != Value(ValueTraits::Redirect)) {
// There was a racing write (or erase) to this cell.
// Pretend we erased nothing, and just let the racing write win.
return Value(ValueTraits::NullValue);
}
// We've been redirected to a new table.
- TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table), m_cell - m_table->getCells());
- Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
+ TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table),
+ m_cell - m_table->getCells());
+ Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
for (;;) {
// Help complete the migration.
m_table->jobCoordinator.participate();
m_value = m_cell->value.load(turf::Relaxed);
if (m_value != Value(ValueTraits::Redirect))
break;
- TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table), m_cell - m_table->getCells());
+ TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
+ m_cell - m_table->getCells());
}
}
}
return Value(ValueTraits::NullValue);
Value value = cell->value.load(turf::Consume);
if (value != Value(ValueTraits::Redirect))
- return value; // Found an existing value
+ return value; // Found an existing value
// We've been redirected to a new table. Help with the migration.
TURF_TRACE(ConcurrentMap_Linear, 17, "[get] was redirected", uptr(table), uptr(cell));
table->jobCoordinator.participate();
void next() {
TURF_ASSERT(m_table);
- TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
+ TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
while (++m_idx <= m_table->sizeMask) {
// Index still inside range of table.
- typename Details::Cell *cell = m_table->getCells() + m_idx;
+ typename Details::Cell* cell = m_table->getCells() + m_idx;
m_hash = cell->hash.load(turf::Relaxed);
if (m_hash != KeyTraits::NullHash) {
// Cell has been reserved.
m_value = cell->value.load(turf::Relaxed);
TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
if (m_value != Value(ValueTraits::NullValue))
- return; // Yield this cell.
+ return; // Yield this cell.
}
}
// That's the end of the map.
#define JUNCTION_CORE_H
//-----------------------------------------------
-#include "junction_config.h" // junction_config.h generated by CMake.
+#include "junction_config.h" // junction_config.h generated by CMake.
// Default to true in case junction_config.h is missing entirely:
#ifndef JUNCTION_USE_STRIPING
// This is like saying that all contexts are quiescent,
// so we can issue all actions at once.
// No lock is taken.
- TURF_RACE_DETECT_GUARD(m_flushRaceDetector); // There should be no concurrent operations
+ TURF_RACE_DETECT_GUARD(m_flushRaceDetector); // There should be no concurrent operations
for (ureg i = 0; i < m_pendingActions.size(); i++)
m_pendingActions[i]();
m_pendingActions.clear();
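    // Model sketch (illustrative): this flush is the quiescent-state reclamation step. Once
    // every registered context has passed a quiescent point, no thread can still hold a
    // pointer into retired tables, so the deferred destroy actions run safely.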
private:
struct Action {
void (*func)(void*);
- uptr param[4]; // Size limit found experimentally. Verified by assert below.
+ uptr param[4]; // Size limit found experimentally. Verified by assert below.
Action(void (*f)(void*), void* p, ureg paramSize) : func(f) {
- TURF_ASSERT(paramSize <= sizeof(param)); // Verify size limit.
+ TURF_ASSERT(paramSize <= sizeof(param)); // Verify size limit.
        memcpy(&param, p, paramSize);
}
void operator()() {
void (T::*pmf)();
T* target;
static void thunk(void* param) {
- Closure* self = (Closure*) param;
- TURF_CALL_MEMBER(*self->target, self->pmf)();
+ Closure* self = (Closure*) param;
+ TURF_CALL_MEMBER (*self->target, self->pmf)();
}
};
- Closure closure = { pmf, target };
+ Closure closure = {pmf, target};
turf::LockGuard<turf::Mutex> guard(m_mutex);
TURF_RACE_DETECT_GUARD(m_flushRaceDetector);
m_deferredActions.push_back(Action(Closure::thunk, &closure, sizeof(closure)));
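    // Pattern note (illustrative): this is manual type erasure. A call such as
    //     qsbr.enqueue(&Table::destroy, table);
    // packs {pmf, target} into Action::param via memcpy and replays it later through
    // Closure::thunk, avoiding a per-action heap allocation.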
if (job == prevJob) {
turf::LockGuard<turf::Mutex> guard(pair.mutex);
for (;;) {
- job = m_job.loadNonatomic(); // No concurrent writes inside lock
+ job = m_job.loadNonatomic(); // No concurrent writes inside lock
if (job != prevJob)
break;
pair.condVar.wait(guard);
static Cell* createTable(ureg size) {
Cell* cells = (Cell*) TURF_HEAP.alloc(sizeof(Cell) * size);
for (ureg i = 0; i < size; i++)
- new(cells + i) Cell(KeyTraits::NullHash, Value(ValueTraits::NullValue));
+ new (cells + i) Cell(KeyTraits::NullHash, Value(ValueTraits::NullValue));
return cells;
}
idx &= map.m_sizeMask;
m_cell = map.m_cells + idx;
if (m_cell->hash == hash)
- return; // Key found in table.
+ return; // Key found in table.
if (m_cell->hash != KeyTraits::NullHash)
- continue; // Slot is taken by another key. Try next slot.
- m_cell = NULL; // Insert not allowed & key not found.
+ continue; // Slot is taken by another key. Try next slot.
+ m_cell = NULL; // Insert not allowed & key not found.
return;
}
}
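    // Probe-sequence sketch (illustrative): with m_sizeMask == 7, a key hashing to 5 is
    // searched at slots 5, 6, 7, 0, ... until its hash matches (hit), a NullHash slot is
    // reached (miss), or -- in the insert variant below -- a free slot is reserved.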
idx &= map.m_sizeMask;
m_cell = map.m_cells + idx;
if (m_cell->hash == hash)
- return; // Key found in table.
+ return; // Key found in table.
if (m_cell->hash != KeyTraits::NullHash)
- continue; // Slot is taken by another key. Try next slot.
+ continue; // Slot is taken by another key. Try next slot.
// Insert is allowed. Reserve this cell.
if (isOverpopulated(map.m_population, map.m_sizeMask)) {
map.migrateToNewTable((map.m_sizeMask + 1) * 2);
- break; // Retry in new table.
+ break; // Retry in new table.
}
map.m_population++;
m_cell->hash = hash;
}
~Iterator() {
- TURF_ASSERT(!m_cell || m_cell->value != NULL); // Forbid leaving a cell half-inserted.
+ TURF_ASSERT(!m_cell || m_cell->value != NULL); // Forbid leaving a cell half-inserted.
}
public:
Value exchangeValue(Value desired) {
TURF_ASSERT(m_cell);
- TURF_ASSERT(desired != NULL); // Use eraseValue()
+ TURF_ASSERT(desired != NULL); // Use eraseValue()
Value oldValue = m_cell->value;
m_cell->value = desired;
return oldValue;
Value erase() {
TURF_ASSERT(m_cell);
- TURF_ASSERT(m_cell->value != NULL); // Forbid erasing a cell that's not actually used.
+ TURF_ASSERT(m_cell->value != NULL); // Forbid erasing a cell that's not actually used.
Value oldValue = m_cell->value;
// Remove this cell by shuffling neighboring cells so there are no gaps in anyone's probe chain
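        // Worked sketch (illustrative): suppose cells 5..7 form one probe chain and cell 5 is
        // erased. Each neighbor whose ideal (hash-derived) slot is at or before the new gap is
        // moved back into it, so a later lookup probing from its home slot never hits an empty
        // cell before reaching its key.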
- ureg cellIdx = m_cell - m_map.m_cells;
+ ureg cellIdx = m_cell - m_map.m_cells;
for (ureg neighborIdx = cellIdx + 1;; neighborIdx++) {
neighborIdx &= m_map.m_sizeMask;
Cell* neighbor = m_map.m_cells + neighborIdx;
GrampaCounter numFlatTrees;
GrampaCounter numFlatTreeMigrations;
- static GrampaStats Instance; // Zero-initialized
+ static GrampaStats Instance; // Zero-initialized
};
#endif
TURF_TRACE_DECLARE(Grampa, 37)
-template<class Map>
+template <class Map>
struct Grampa {
typedef typename Map::Hash Hash;
typedef typename Map::Value Value;
static const ureg FlatTreeMigrationUnitSize = 32;
static const ureg LinearSearchLimit = 128;
static const ureg CellsInUseSample = LinearSearchLimit;
- TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
+ TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
static const ureg MinTableSize = 8;
    // e.g. If the entire map is stored in a single table, then Table::shift == HASH_BITS.
// If the entire map is stored in two tables, then Table::shift == (HASH_BITS - 1) for each table.
// FlatTree::shift is always <= Table::shift for all the tables it contains.
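    // Example (illustrative, 32-bit Hash): one table covering everything has shift == 32
    // ("unsafe" to actually shift by); splitting into four tables gives each a shift of 30,
    // and a flattree indexed by the top two hash bits also has safeShift == 30.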
- const ureg sizeMask; // a power of two minus one
+ const ureg sizeMask; // a power of two minus one
const Hash baseHash;
const ureg unsafeRangeShift;
- junction::striped::ManualResetEvent isPublished; // To prevent publishing a subtree before its parent is published (happened in testing)
- junction::striped::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
- SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+ junction::striped::ManualResetEvent
+ isPublished; // To prevent publishing a subtree before its parent is published (happened in testing)
+ junction::striped::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
+ SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
- Table(ureg sizeMask, Hash baseHash, ureg unsafeRangeShift) : sizeMask(sizeMask), baseHash(baseHash), unsafeRangeShift(unsafeRangeShift) {
+ Table(ureg sizeMask, Hash baseHash, ureg unsafeRangeShift)
+ : sizeMask(sizeMask), baseHash(baseHash), unsafeRangeShift(unsafeRangeShift) {
}
static Table* create(ureg tableSize, ureg baseHash, ureg unsafeShift) {
TURF_ASSERT(tableSize >= 4);
ureg numGroups = tableSize >> 2;
Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
- new(table) Table(tableSize - 1, baseHash, (u8) unsafeShift);
+ new (table) Table(tableSize - 1, baseHash, (u8) unsafeShift);
for (ureg i = 0; i < numGroups; i++) {
CellGroup* group = table->getCellGroups() + i;
for (ureg j = 0; j < 4; j++) {
};
Map& m_map;
- Hash m_baseHash; // The lowest possible hash value in this subtree; determines index in flattree.
+ Hash m_baseHash; // The lowest possible hash value in this subtree; determines index in flattree.
// If m_numDestinations == 1, m_shift == 0.
- // Otherwise, m_shift tells (indirectly) the size of the flattree in which our subtree would exactly fit: 1 << (HASH_BITS - m_shift).
+    // Otherwise, m_shift tells (indirectly) the size of the flattree in which our subtree would exactly fit:
+    // 1 << (HASH_BITS - m_shift).
// This ensures that m_shift is always less than sizeof(Hash) * 8, so that shifting by m_shift is not undefined behavior.
// To determine the subtree index for a hash during migration, we use: (hash >> m_shift) & (m_numDestinations - 1)
// A mask is used since we are only migrating a subtree -- not necessarily the entire map.
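    // Worked example (illustrative, 32-bit Hash): replacing the entire map with
    // m_numDestinations == 4 tables means the subtree exactly fits a flattree of size
    // 1 << (32 - 30) == 4, so m_shift == 30 and hash 0xC0000000 migrates to subtree index
    // (0xC0000000 >> 30) & 3 == 3.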
ureg m_safeShift;
- turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+ turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
turf::Atomic<sreg> m_overflowTableIndex;
turf::Atomic<sreg> m_unitsRemaining;
ureg m_numSources;
- ureg m_numDestinations; // The size of the subtree being created. Some table pointers may be repeated.
+ ureg m_numDestinations; // The size of the subtree being created. Some table pointers may be repeated.
TableMigration(Map& map) : m_map(map) {
}
static TableMigration* create(Map& map, ureg numSources, ureg numDestinations) {
- TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources + sizeof(Table*) * numDestinations);
- new(migration) TableMigration(map);
+ TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(
+ sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources + sizeof(Table*) * numDestinations);
+ new (migration) TableMigration(map);
migration->m_workerStatus.storeNonatomic(0);
migration->m_overflowTableIndex.storeNonatomic(-1);
migration->m_unitsRemaining.storeNonatomic(0);
migration->m_numSources = numSources;
migration->m_numDestinations = numDestinations;
- // Caller is responsible for filling in source & destination pointers
#if JUNCTION_TRACK_GRAMPA_STATS
GrampaStats::Instance.numTableMigrations.increment();
#endif
+ // Caller is responsible for filling in source & destination pointers
return migration;
}
// Each time the flattree doubles in size, shift decreases by 1.
const ureg safeShift;
junction::striped::Mutex mutex;
- FlatTreeMigration* migration; // Protected by mutex
+ FlatTreeMigration* migration; // Protected by mutex
FlatTree(ureg safeShift) : safeShift(safeShift), migration(NULL) {
// A FlatTree always has at least two tables, so the shift is always safe.
TURF_ASSERT(safeShift < sizeof(Hash) * 8);
ureg numLeaves = (Hash(-1) >> safeShift) + 1;
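        // Arithmetic sketch (illustrative, 32-bit Hash): safeShift == 30 gives
        // numLeaves == (0xFFFFFFFF >> 30) + 1 == 4, i.e. one leaf per distinct value of the
        // top two hash bits.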
FlatTree* flatTree = (FlatTree*) TURF_HEAP.alloc(sizeof(FlatTree) + sizeof(turf::Atomic<Table*>) * numLeaves);
- new(flatTree) FlatTree(safeShift);
- // Caller will initialize flatTree->getTables()
+ new (flatTree) FlatTree(safeShift);
#if JUNCTION_TRACK_GRAMPA_STATS
GrampaStats::Instance.numFlatTrees.increment();
#endif
+ // Caller will initialize flatTree->getTables()
return flatTree;
}
}
// FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
- enum InsertResult {
- InsertResult_AlreadyFound,
- InsertResult_InsertedNew,
- InsertResult_Overflow
- };
+ enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
static InsertResult insert(Hash hash, Table* table, ureg sizeMask, Cell*& cell, ureg& overflowIdx) {
TURF_TRACE(Grampa, 3, "[insert] called", uptr(table), hash);
TURF_ASSERT(table);
probeHash = cell->hash.load(turf::Acquire);
} while (probeHash == KeyTraits::NullHash);
}
- TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
+ TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
if (probeHash == hash) {
TURF_TRACE(Grampa, 8, "[insert] found in probe chain", uptr(table), idx);
return InsertResult_AlreadyFound;
// Reached the end of the link chain for this bucket.
// Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
ureg prevLinkIdx = idx;
- TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
+ TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
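                // Bound sketch (illustrative): if only 40 cells remain before maxIdx, the scan
                // is capped at min(40, LinearSearchLimit) == 40 probes; exhausting the budget
                // makes insert() report InsertResult_Overflow for this bucket.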
while (linearProbesRemaining-- > 0) {
idx++;
TURF_TRACE(Grampa, 9, "[insert] reserved cell", uptr(table), idx);
TURF_ASSERT(probeDelta == 0);
u8 desiredDelta = idx - prevLinkIdx;
+#if TURF_WITH_ASSERTS
// Note: another thread could actually set the link on our behalf (see below).
-#if TURF_WITH_ASSERTS
probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
#else
prevLink->store(desiredDelta, turf::Relaxed);
-#endif
+#endif
return InsertResult_InsertedNew;
} else {
TURF_TRACE(Grampa, 10, "[insert] race to reserve cell", uptr(table), idx);
// there's no guarantee that our own link chain will be well-formed by the time this function returns.
// (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
+#if TURF_WITH_ASSERTS
probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
if (probeDelta == 0)
TURF_TRACE(Grampa, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
#else
prevLink->store(desiredDelta, turf::Relaxed);
-#endif
- goto followLink; // Try to follow link chain for the bucket again.
+#endif
+ goto followLink; // Try to follow link chain for the bucket again.
}
// Continue linear search...
}
TURF_TRACE(Grampa, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
} else {
turf::LockGuard<junction::striped::Mutex> guard(table->mutex);
- job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
+ job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
if (job) {
TURF_TRACE(Grampa, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
} else {
migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
migration->getSources()[0].table = table;
migration->getSources()[0].sourceIndex.storeNonatomic(0);
- ureg subRangeShift = table->unsafeRangeShift - splitShift; // subRangeShift is also "unsafe" (possibly represents entire range)
+ ureg subRangeShift =
+ table->unsafeRangeShift - splitShift; // subRangeShift is also "unsafe" (possibly represents entire range)
ureg hashOffsetDelta = subRangeShift < (sizeof(Hash) * 8) ? (ureg(1) << subRangeShift) : 0;
for (ureg i = 0; i < numDestinations; i++) {
- migration->getDestinations()[i] = Table::create(nextTableSize, table->baseHash + hashOffsetDelta * i, subRangeShift);
+ migration->getDestinations()[i] =
+ Table::create(nextTableSize, table->baseHash + hashOffsetDelta * i, subRangeShift);
}
// Publish the new migration.
table->jobCoordinator.storeRelease(migration);
}
beginTableMigrationToSize(map, table, nextTableSize, splitShift);
}
-
+
static FlatTreeMigration* createFlatTreeMigration(Map& map, FlatTree* flatTree, ureg shift) {
turf::LockGuard<junction::striped::Mutex> guard(flatTree->mutex);
if (!flatTree->migration) {
static FlatTreeMigration* getExistingFlatTreeMigration(FlatTree* flatTree) {
turf::LockGuard<junction::striped::Mutex> guard(flatTree->mutex);
- TURF_ASSERT(flatTree->migration); // Must already exist!
+ TURF_ASSERT(flatTree->migration); // Must already exist!
return flatTree->migration;
}
}; // Grampa
// Return index of the destination table that overflowed, or -1 if none
-template<class Map>
+template <class Map>
sreg Grampa<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
ureg srcSizeMask = srcTable->sizeMask;
ureg safeShift = m_safeShift;
srcHash = srcCell->hash.load(turf::Relaxed);
if (srcHash == KeyTraits::NullHash) {
// An unused cell. Try to put a Redirect marker in its value.
- srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+ srcValue =
+ srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
TURF_TRACE(Grampa, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Grampa, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
if (srcValue == Value(ValueTraits::NullValue)) {
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Grampa, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
TURF_TRACE(Grampa, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
break;
}
-
+
// We've got a key/value pair to migrate.
// Reserve a destination cell in dstTable.
TURF_ASSERT(srcHash != KeyTraits::NullHash);
// Copy srcValue to the destination.
dstCell->value.store(srcValue, turf::Relaxed);
// Try to place a Redirect marker in srcValue.
- Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
- TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+ Value doubleCheckedSrcValue =
+ srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+ TURF_ASSERT(doubleCheckedSrcValue !=
+ Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
if (doubleCheckedSrcValue == srcValue) {
// No racing writes to the src. We've successfully placed the Redirect marker.
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
}
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
if (startIdx >= source.table->sizeMask + 1)
- break; // No more migration units in this table. Try next source table.
+ break; // No more migration units in this table. Try next source table.
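            // Work-splitting sketch (illustrative, assuming TableMigrationUnitSize == 32): four
            // workers on a 128-cell source each fetchAdd(32) and claim disjoint start indices
            // 0, 32, 64, 96; a fifth claim returns 128, fails this bound check, and moves on to
            // the next source table.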
sreg overflowTableIndex = migrateRange(source.table, startIdx);
- if (overflowTableIndex >= 0) {
+ if (overflowTableIndex >= 0) {
// *** FAILED MIGRATION ***
// TableMigration failed due to destination table overflow.
- // No other thread can declare the migration successful at this point, because *this* unit will never complete, hence m_unitsRemaining won't reach zero.
+ // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+ // hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
- // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads before
- // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from the thread
+                // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker
+                // threads before we can safely deal with the overflow. Therefore, the thread that detects the failure
+                // is often different from the thread
// that deals with it.
// Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins.
sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed);
if (oldIndex >= 0)
- TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), uptr(oldIndex));
+ TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex),
+ uptr(oldIndex));
m_workerStatus.fetchOr(1, turf::Relaxed);
goto endMigration;
}
endMigration:
// Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish
+ probeStatus =
+ m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
TURF_TRACE(Grampa, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
// We're the very last worker thread.
// Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
TURF_ASSERT(probeStatus == 3);
- sreg overflowTableIndex = m_overflowTableIndex.loadNonatomic(); // No racing writes at this point
+ sreg overflowTableIndex = m_overflowTableIndex.loadNonatomic(); // No racing writes at this point
if (overflowTableIndex < 0) {
// The migration succeeded. This is the most likely outcome. Publish the new subtree.
m_map.publishTableMigration(this);
turf::LockGuard<junction::striped::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
- TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ uptr(checkedJob));
} else {
TableMigration* migration;
Table* overflowedTable = getDestinations()[overflowTableIndex];
if (overflowedTable->sizeMask + 1 < LeafSize) {
// The entire map is contained in a small table.
- TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable), uptr(checkedJob));
+                    TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occurred in a small map", uptr(origTable),
+                               uptr(checkedJob));
TURF_ASSERT(overflowedTable->unsafeRangeShift == sizeof(Hash) * 8);
TURF_ASSERT(overflowedTable->baseHash == 0);
TURF_ASSERT(m_numDestinations == 1);
migration->m_baseHash = 0;
migration->m_safeShift = 0;
// Double the destination table size.
- migration->getDestinations()[0] = Table::create((overflowedTable->sizeMask + 1) * 2, overflowedTable->baseHash, overflowedTable->unsafeRangeShift);
+ migration->getDestinations()[0] = Table::create((overflowedTable->sizeMask + 1) * 2, overflowedTable->baseHash,
+ overflowedTable->unsafeRangeShift);
} else {
// The overflowed table is already the size of a leaf. Split it into two ranges.
if (count == 1) {
- TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable),
+ uptr(checkedJob));
migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations * 2);
migration->m_baseHash = m_baseHash;
migration->m_safeShift = getUnsafeShift() - 1;
}
count = 2;
} else {
- TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable),
+ uptr(checkedJob));
migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations);
migration->m_baseHash = m_baseHash;
migration->m_safeShift = m_safeShift;
migration->getDestinations()[lo + i] = splitTable1;
}
ureg halfNumHashes = ureg(1) << (origTable->unsafeRangeShift - 1);
- Table* splitTable2 = Table::create(LeafSize, origTable->baseHash + halfNumHashes, origTable->unsafeRangeShift - 1);
+ Table* splitTable2 =
+ Table::create(LeafSize, origTable->baseHash + halfNumHashes, origTable->unsafeRangeShift - 1);
for (; i < count; i++) {
migration->getDestinations()[lo + i] = splitTable2;
}
DefaultQSBR.enqueue(&TableMigration::destroy, this);
}
-template<class Map>
+template <class Map>
void Grampa<Map>::FlatTreeMigration::run() {
// Conditionally increment the shared # of workers.
ureg probeStatus = m_workerStatus.load(turf::Relaxed);
for (;;) {
ureg srcStart = m_sourceIndex.fetchAdd(FlatTreeMigrationUnitSize, turf::Relaxed);
if (srcStart >= srcSize)
- break; // No more migration units in this flattree.
+ break; // No more migration units in this flattree.
// Migrate this range
ureg srcEnd = turf::util::min(srcSize, srcStart + FlatTreeMigrationUnitSize);
ureg dst = srcStart * repeat;
}
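
// A sketch of the unit-claiming idiom used by the loop above (hypothetical names):
// worker threads carve the source range into fixed-size units with one fetch_add
// each, so every unit is processed exactly once without any locking.
#include <atomic>
#include <cstddef>

void drainUnits(std::atomic<std::size_t>& sourceIndex, std::size_t srcSize, std::size_t unitSize) {
    for (;;) {
        std::size_t start = sourceIndex.fetch_add(unitSize, std::memory_order_relaxed);
        if (start >= srcSize)
            break; // every unit is claimed; other workers may still be migrating theirs
        std::size_t end = start + unitSize < srcSize ? start + unitSize : srcSize;
        for (std::size_t i = start; i < end; i++) {
            // ... migrate entry i ...
        }
    }
}
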
// Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
+ probeStatus = m_workerStatus.fetchSub(
+ 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
return;
// We're the very last worker thread.
// Publish the new flattree.
- TURF_ASSERT(probeStatus == 3); // End flag must be set
+ TURF_ASSERT(probeStatus == 3); // End flag must be set
m_map.publishFlatTreeMigration(this);
m_completed.signal();
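
// A sketch of the worker-status word that both migration run() loops rely on
// (illustrative names, assuming the layout implied by the asserts above): bit 0 is
// the end flag, the remaining bits count active workers in steps of 2, and
// fetch_sub returns the value *before* the decrement, so a previous value below 4
// means no other worker was still registered.
#include <atomic>
#include <cstddef>

void enterWorker(std::atomic<std::size_t>& status) {
    status.fetch_add(2, std::memory_order_relaxed); // register one more worker
}

void flagEnd(std::atomic<std::size_t>& status) {
    status.fetch_or(1, std::memory_order_relaxed); // set the end flag (bit 0)
}

// True only for the very last worker to leave, so exactly one thread publishes.
bool leaveAndCheckLast(std::atomic<std::size_t>& status) {
    return status.fetch_sub(2, std::memory_order_acq_rel) < 4;
}
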
TURF_TRACE_DECLARE(LeapFrog, 33)
-template<class Map>
+template <class Map>
struct LeapFrog {
typedef typename Map::Hash Hash;
typedef typename Map::Value Value;
static const ureg TableMigrationUnitSize = 32;
static const ureg LinearSearchLimit = 128;
static const ureg CellsInUseSample = LinearSearchLimit;
- TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
+ TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
struct Cell {
};
struct Table {
- const ureg sizeMask; // a power of two minus one
- turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
- SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+ const ureg sizeMask; // a power of two minus one
+ turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
+ SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
Table(ureg sizeMask) : sizeMask(sizeMask) {
}
TURF_ASSERT(tableSize >= 4);
ureg numGroups = tableSize >> 2;
Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
- new(table) Table(tableSize - 1);
+ new (table) Table(tableSize - 1);
for (ureg i = 0; i < numGroups; i++) {
CellGroup* group = table->getCellGroups() + i;
for (ureg j = 0; j < 4; j++) {
Map& m_map;
Table* m_destination;
- turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+ turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
turf::Atomic<bool> m_overflowed;
turf::Atomic<sreg> m_unitsRemaining;
ureg m_numSources;
}
static TableMigration* create(Map& map, ureg numSources) {
- TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
- new(migration) TableMigration(map);
+ TableMigration* migration =
+ (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
+ new (migration) TableMigration(map);
migration->m_workerStatus.storeNonatomic(0);
migration->m_overflowed.storeNonatomic(false);
migration->m_unitsRemaining.storeNonatomic(0);
}
// FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
- enum InsertResult {
- InsertResult_AlreadyFound,
- InsertResult_InsertedNew,
- InsertResult_Overflow
- };
+ enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
static InsertResult insert(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
TURF_TRACE(LeapFrog, 3, "[insert] called", uptr(table), hash);
TURF_ASSERT(table);
probeHash = cell->hash.load(turf::Acquire);
} while (probeHash == KeyTraits::NullHash);
}
- TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
+ TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
if (probeHash == hash) {
TURF_TRACE(LeapFrog, 8, "[insert] found in probe chain", uptr(table), idx);
return InsertResult_AlreadyFound;
// Reached the end of the link chain for this bucket.
// Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
ureg prevLinkIdx = idx;
- TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
+ TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
while (linearProbesRemaining-- > 0) {
idx++;
TURF_TRACE(LeapFrog, 9, "[insert] reserved cell", uptr(table), idx);
TURF_ASSERT(probeDelta == 0);
u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
+#if TURF_WITH_ASSERTS
probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
#else
// there's no guarantee that our own link chain will be well-formed by the time this function returns.
// (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
+#if TURF_WITH_ASSERTS
probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
if (probeDelta == 0)
TURF_TRACE(LeapFrog, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
#else
prevLink->store(desiredDelta, turf::Relaxed);
-#endif
- goto followLink; // Try to follow link chain for the bucket again.
+#endif
+ goto followLink; // Try to follow link chain for the bucket again.
}
// Continue linear search...
}
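
// A sketch of the link update above (illustrative; not Junction's exact CellGroup
// layout): links are u8 deltas from the previous cell in a bucket's probe chain.
// Two threads may set the same link on behalf of a late-arriving cell, so under
// asserts an exchange verifies that both racers computed the same delta.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

void setLinkDelta(std::atomic<std::uint8_t>& link, std::size_t prevLinkIdx, std::size_t idx) {
    std::uint8_t desiredDelta = static_cast<std::uint8_t>(idx - prevLinkIdx);
    std::uint8_t probeDelta = link.exchange(desiredDelta, std::memory_order_relaxed);
    assert(probeDelta == 0 || probeDelta == desiredDelta); // racers must agree
}
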
TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
} else {
turf::LockGuard<turf::Mutex> guard(table->mutex);
- job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
+ job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
if (job) {
TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
} else {
}
}; // LeapFrog
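
// A sketch of the double-checked locking idiom (DCLI) used by beginTableMigration
// above: read the job once without the mutex, then re-check under it so that only
// one thread allocates the migration. A std::atomic pointer stands in for the
// jobCoordinator; Job and getOrCreateJob are illustrative names.
#include <atomic>
#include <mutex>

struct Job {};

Job* getOrCreateJob(std::atomic<Job*>& slot, std::mutex& mutex) {
    Job* job = slot.load(std::memory_order_acquire); // cheap unlocked check
    if (!job) {
        std::lock_guard<std::mutex> guard(mutex);
        job = slot.load(std::memory_order_relaxed); // double-check under the lock
        if (!job) {
            job = new Job;
            slot.store(job, std::memory_order_release); // publish to unlocked readers
        }
    }
    return job;
}
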
-template<class Map>
+template <class Map>
bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
ureg srcSizeMask = srcTable->sizeMask;
ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
srcHash = srcCell->hash.load(turf::Relaxed);
if (srcHash == KeyTraits::NullHash) {
// An unused cell. Try to put a Redirect marker in its value.
- srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+ srcValue =
+ srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
if (srcValue == Value(ValueTraits::NullValue)) {
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
break;
}
-
+
// We've got a key/value pair to migrate.
// Reserve a destination cell in the destination.
TURF_ASSERT(srcHash != KeyTraits::NullHash);
// Copy srcValue to the destination.
dstCell->value.store(srcValue, turf::Relaxed);
// Try to place a Redirect marker in srcValue.
- Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
- TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+ Value doubleCheckedSrcValue =
+ srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+ TURF_ASSERT(doubleCheckedSrcValue !=
+ Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
if (doubleCheckedSrcValue == srcValue) {
// No racing writes to the src. We've successfully placed the Redirect marker.
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
}
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
if (startIdx >= source.table->sizeMask + 1)
- break; // No more migration units in this table. Try next source table.
+ break; // No more migration units in this table. Try next source table.
bool overflowed = !migrateRange(source.table, startIdx);
if (overflowed) {
// *** FAILED MIGRATION ***
// TableMigration failed due to destination table overflow.
- // No other thread can declare the migration successful at this point, because *this* unit will never complete, hence m_unitsRemaining won't reach zero.
+ // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+ // hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
            // The reason we store overflowed in a shared variable is because we must flush all the worker threads before
-            // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from the thread
+            // we can safely deal with the overflow. Therefore, the thread that detects the failure is often
+            // different from the thread
// that deals with it.
bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
if (oldOverflowed)
- TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), uptr(oldOverflowed));
+ TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+ uptr(oldOverflowed));
m_workerStatus.fetchOr(1, turf::Relaxed);
goto endMigration;
}
endMigration:
// Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
+ probeStatus = m_workerStatus.fetchSub(
+ 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
// We're the very last worker thread.
// Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
TURF_ASSERT(probeStatus == 3);
- bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
+ bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
if (!overflowed) {
// The migration succeeded. This is the most likely outcome. Publish the new subtree.
m_map.publishTableMigration(this);
turf::LockGuard<turf::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
- TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ uptr(checkedJob));
} else {
TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
// Double the destination table size.
TURF_TRACE_DECLARE(Linear, 22)
-template<class Map>
+template <class Map>
struct Linear {
typedef typename Map::Hash Hash;
typedef typename Map::Value Value;
static const ureg TableMigrationUnitSize = 32;
static const ureg LinearSearchLimit = 128;
static const ureg CellsInUseSample = LinearSearchLimit;
- TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
+ TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
struct Cell {
};
struct Table {
- const ureg sizeMask; // a power of two minus one
+ const ureg sizeMask; // a power of two minus one
const ureg limitNumValues;
turf::Atomic<sreg> cellsRemaining;
turf::Atomic<sreg> valuesRemaining;
- turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
- SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+ turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
+ SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
- Table(ureg sizeMask, ureg limitNumValues) : sizeMask(sizeMask), limitNumValues(limitNumValues),
- cellsRemaining(limitNumValues), valuesRemaining(limitNumValues) {
+ Table(ureg sizeMask, ureg limitNumValues)
+ : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues),
+ valuesRemaining(limitNumValues) {
}
static Table* create(ureg tableSize, ureg limitNumValues) {
TURF_ASSERT(turf::util::isPowerOf2(tableSize));
Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
- new(table) Table(tableSize - 1, limitNumValues);
+ new (table) Table(tableSize - 1, limitNumValues);
for (ureg j = 0; j < tableSize; j++) {
table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
Table* m_source;
turf::Atomic<ureg> m_sourceIndex;
Table* m_destination;
- turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+ turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
turf::Atomic<sreg> m_unitsRemaining;
TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) {
}
// FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
- enum InsertResult {
- InsertResult_AlreadyFound,
- InsertResult_InsertedNew,
- InsertResult_Overflow
- };
+ enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
static InsertResult insert(Hash hash, Table* table, Cell*& cell) {
TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash);
TURF_ASSERT(table);
uptr probeHash = cell->hash.load(turf::Relaxed);
if (probeHash == hash) {
TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx);
- return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
+ return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
}
if (probeHash == KeyTraits::NullHash) {
// It's an empty cell. Try to reserve it.
if (prevCellsRemaining <= 0) {
// Table is overpopulated.
TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0);
- table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
+ table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
return InsertResult_Overflow;
}
// Try to reserve this cell.
}
// There was a race and another thread reserved that cell from under us.
TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx);
- table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
+ table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
if (prevHash == hash) {
TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx);
- return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
+ return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
}
}
// Try again in the next cell.
TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0);
} else {
turf::LockGuard<turf::Mutex> guard(table->mutex);
- job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
+ job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
if (job) {
TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0);
} else {
// re-inserting more values than the new table can hold.
// To set the new limitNumValues on the current table in an atomic fashion,
// we update its valuesRemaining via CAS loop:
- for(;;) {
+ for (;;) {
// We must recalculate desiredValuesRemaining on each iteration of the CAS loop
oldValuesInUse = oldValuesLimit - oldValuesRemaining;
sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse;
if (desiredValuesRemaining < 0) {
- TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues, desiredValuesRemaining);
+ TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues,
+ desiredValuesRemaining);
// Must recalculate nextTableSize. Goto, baby!
goto calculateNextTableSize;
}
- if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed, turf::Relaxed))
+ if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed,
+ turf::Relaxed))
break; // Success!
// CAS failed because table->valuesRemaining was modified by another thread.
// An updated value has been reloaded into oldValuesRemaining (modified by reference).
}
}; // Linear
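
// A sketch of the valuesRemaining CAS loop above (illustrative signature): the
// desired count must be recomputed from the freshly loaded old value on every
// iteration, because a failed compare_exchange_weak reloads it by reference.
#include <atomic>

// Returns false when the new limit cannot absorb the values already in use,
// mirroring the "goto calculateNextTableSize" restart above.
bool adjustValuesRemaining(std::atomic<long>& remaining, long oldLimit, long newLimit) {
    long oldRemaining = remaining.load(std::memory_order_relaxed);
    for (;;) {
        long inUse = oldLimit - oldRemaining;
        long desired = newLimit - inUse;
        if (desired < 0)
            return false; // caller must pick a larger table size and retry
        if (remaining.compare_exchange_weak(oldRemaining, desired, std::memory_order_relaxed))
            return true; // success
        // CAS failure updated oldRemaining; loop to recompute desired.
    }
}
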
-template<class Map>
+template <class Map>
bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
ureg srcSizeMask = m_source->sizeMask;
ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
srcHash = srcCell->hash.load(turf::Relaxed);
if (srcHash == KeyTraits::NullHash) {
// An unused cell. Try to put a Redirect marker in its value.
- srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+ srcValue =
+ srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
if (srcValue == Value(ValueTraits::NullValue)) {
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx);
}
-
+
// We've got a key/value pair to migrate.
// Reserve a destination cell in the destination.
TURF_ASSERT(srcHash != KeyTraits::NullHash);
TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
- TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
+ TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
Cell* dstCell;
InsertResult result = insert(srcHash, m_destination, dstCell);
// During migration, a hash can only exist in one place among all the source tables,
// Copy srcValue to the destination.
dstCell->value.store(srcValue, turf::Relaxed);
// Try to place a Redirect marker in srcValue.
- Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
- TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+ Value doubleCheckedSrcValue =
+ srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+ TURF_ASSERT(doubleCheckedSrcValue !=
+ Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
if (doubleCheckedSrcValue == srcValue) {
// No racing writes to the src. We've successfully placed the Redirect marker.
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
}
ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
if (startIdx >= m_source->sizeMask + 1)
- break; // No more migration units.
+ break; // No more migration units.
migrateRange(startIdx);
sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
TURF_ASSERT(prevRemaining > 0);
endMigration:
// Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
+ probeStatus = m_workerStatus.fetchSub(
+ 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
#if JUNCTION_WITH_NBDS && NBDS_USE_TURF_HEAP
extern "C" {
-void mem_init (void) {
+void mem_init(void) {
}
-void *nbd_malloc (size_t n) {
+void* nbd_malloc(size_t n) {
return TURF_HEAP.alloc(n);
}
-void nbd_free (void *x) {
+void nbd_free(void* x) {
TURF_HEAP.free(x);
}
} // extern "C"
return TURF_HEAP.alloc(size);
}
-void tbbWrap_free(void* ptr) {
+void tbbWrap_free(void* ptr) {
TURF_HEAP.free(ptr);
}
return TURF_HEAP.allocAligned(size, alignment);
}
-void tbbWrap_padded_free(void* ptr) {
+void tbbWrap_padded_free(void* ptr) {
TURF_HEAP.free(ptr);
}
#endif // JUNCTION_WITH_TBB && TBB_USE_TURF_HEAP
#include <junction/MapTraits.h>
#include <cds/init.h>
#include <cds/gc/hp.h>
-#include <memory.h> // memcpy required by cuckoo_map.h
+#include <memory.h> // memcpy required by cuckoo_map.h
#include <cds/container/cuckoo_map.h>
namespace junction {
class MapAdapter {
public:
static TURF_CONSTEXPR const char* MapName = "CDS CuckooMap";
-
- cds::gc::HP *m_hpGC;
+
+ cds::gc::HP* m_hpGC;
MapAdapter(ureg) {
cds::Initialize();
void* get(u32 key) {
void* result = NULL;
- m_map.find(key, [&result](std::pair<const u32, void*>& item){ result = item.second; });
+ m_map.find(key, [&result](std::pair<const u32, void*>& item) { result = item.second; });
return result;
}
public:
static TURF_CONSTEXPR const char* MapName = "CDS MichaelKVList";
- cds::gc::HP *m_hpGC;
+ cds::gc::HP* m_hpGC;
MapAdapter(ureg) {
cds::Initialize();
class Map {
private:
// List traits based on std::less predicate
- struct ListTraits : public cds::container::michael_list::traits
- {
- typedef std::less<u32> less;
+ struct ListTraits : public cds::container::michael_list::traits {
+ typedef std::less<u32> less;
};
// Ordered list
- typedef cds::container::MichaelKVList< cds::gc::HP, u32, void*, ListTraits> OrderedList;
+ typedef cds::container::MichaelKVList<cds::gc::HP, u32, void*, ListTraits> OrderedList;
// Map traits
- struct MapTraits : public cds::container::michael_map::traits
- {
+ struct MapTraits : public cds::container::michael_map::traits {
struct hash {
- size_t operator()( u32 i ) const
- {
- return cds::opt::v::hash<u32>()( i );
+ size_t operator()(u32 i) const {
+ return cds::opt::v::hash<u32>()(i);
}
};
};
void* get(u32 key) {
void* result = NULL;
- m_map.find(key, [&result](std::pair<const u32, void*>& item){ result = item.second; });
+ m_map.find(key, [&result](std::pair<const u32, void*>& item) { result = item.second; });
return result;
}
class MapAdapter {
public:
static TURF_CONSTEXPR const char* MapName = "Junction Grampa map";
-
+
MapAdapter(ureg) {
}
class ThreadContext {
private:
QSBR::Context m_qsbrContext;
-
+
public:
ThreadContext(MapAdapter&, ureg) {
}
class MapAdapter {
public:
static TURF_CONSTEXPR const char* MapName = "Junction LeapFrog map";
-
+
MapAdapter(ureg) {
}
class ThreadContext {
private:
QSBR::Context m_qsbrContext;
-
+
public:
ThreadContext(MapAdapter&, ureg) {
}
-
+
void registerThread() {
m_qsbrContext = DefaultQSBR.createContext();
}
-
+
void unregisterThread() {
DefaultQSBR.destroyContext(m_qsbrContext);
}
-
+
void update() {
DefaultQSBR.update(m_qsbrContext);
}
};
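
// A sketch of how a benchmark thread is expected to drive the QSBR context above
// (hypothetical driver, assuming the junction/turf types used throughout this
// adapter): register once, call update() at quiescent points between map
// operations, and unregister on exit so deferred frees from migrations proceed.
void exerciseMap(MapAdapter& adapter, ureg threadIndex, MapAdapter::Map& map) {
    MapAdapter::ThreadContext ctx(adapter, threadIndex);
    ctx.registerThread();
    for (u32 key = 2; key < 1002; key++) { // the tests avoid keys 0 and 1
        map.insert(key, (void*) uptr(key));
        ctx.update(); // quiescent point: no map references are held across this call
    }
    ctx.unregisterThread();
}
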
-
+
typedef ConcurrentMap_LeapFrog<u32, void*> Map;
static ureg getInitialCapacity(ureg maxPopulation) {
class MapAdapter {
public:
static TURF_CONSTEXPR const char* MapName = "Junction Linear map";
-
+
MapAdapter(ureg) {
}
class ThreadContext {
private:
QSBR::Context m_qsbrContext;
-
+
public:
ThreadContext(MapAdapter&, ureg) {
}
-
+
void registerThread() {
m_qsbrContext = DefaultQSBR.createContext();
}
-
+
void unregisterThread() {
DefaultQSBR.destroyContext(m_qsbrContext);
}
-
+
void update() {
DefaultQSBR.update(m_qsbrContext);
}
};
-
+
typedef ConcurrentMap_Linear<u32, void*> Map;
static ureg getInitialCapacity(ureg maxPopulation) {
turf::LockGuard<turf::Mutex> guard(m_mutex);
return m_map.get(key);
}
-
+
void* erase(u32 key) {
turf::LockGuard<turf::Mutex> guard(m_mutex);
return m_map.erase(key);
turf::SharedLockGuard<turf::RWLock> guard(m_rwLock);
return m_map.get(key);
}
-
+
void erase(u32 key) {
turf::ExclusiveLockGuard<turf::RWLock> guard(m_rwLock);
m_map.erase(key);
class MapAdapter {
public:
static TURF_CONSTEXPR const char* MapName = "Null";
-
+
MapAdapter(ureg) {
}
MapType::iterator iter = m_map.find(key);
return (iter == m_map.end()) ? NULL : iter->second;
}
-
+
void erase(u32 key) {
std::lock_guard<std::mutex> guard(m_mutex);
m_map.erase(key);
} // namespace junction
#define JUNCTION_STRIPED_CONDITIONBANK_DEFINE_MEMBER()
-#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) (junction::striped::DefaultConditionBank.get(objectPtr))
+#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) (junction::striped::DefaultConditionBank.get(objectPtr))
#else // JUNCTION_USE_STRIPING
//-----------------------------------
// Striping disabled
//-----------------------------------
-#define JUNCTION_STRIPED_CONDITIONBANK_DEFINE_MEMBER() junction::striped::ConditionPair m_conditionPair;
-#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) ((objectPtr)->m_conditionPair)
+#define JUNCTION_STRIPED_CONDITIONBANK_DEFINE_MEMBER() junction::striped::ConditionPair m_conditionPair;
+#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) ((objectPtr)->m_conditionPair)
#endif // JUNCTION_USE_STRIPING
}
void signal() {
- u8 prevState = m_state.fetchOr(Signaled, turf::Release); // Synchronizes-with the load in wait (fast path)
+ u8 prevState = m_state.fetchOr(Signaled, turf::Release); // Synchronizes-with the load in wait (fast path)
if (prevState & HasWaiters) {
ConditionPair& pair = JUNCTION_STRIPED_CONDITIONBANK_GET(this);
- turf::LockGuard<turf::Mutex> guard(pair.mutex); // Prevents the wake from occuring in the middle of wait()'s critical section
+ turf::LockGuard<turf::Mutex> guard(
+                pair.mutex); // Prevents the wake from occurring in the middle of wait()'s critical section
pair.condVar.wakeAll();
}
}
}
void wait() {
- u8 state = m_state.load(turf::Acquire); // Synchronizes-with the fetchOr in signal (fast path)
+ u8 state = m_state.load(turf::Acquire); // Synchronizes-with the fetchOr in signal (fast path)
if ((state & Signaled) == 0) {
ConditionPair& pair = JUNCTION_STRIPED_CONDITIONBANK_GET(this);
turf::LockGuard<turf::Mutex> guard(pair.mutex);
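
// A sketch of the two-flag event used by signal()/wait() above, with std::atomic,
// std::mutex and std::condition_variable standing in for turf's types (the exact
// waiter bookkeeping here is an assumption): Signaled is the lock-free fast path,
// and HasWaiters tells signal() whether it must take the mutex and wake sleepers.
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct SketchEvent {
    static const std::uint8_t Signaled = 1;
    static const std::uint8_t HasWaiters = 2;
    std::atomic<std::uint8_t> state{0};
    std::mutex mutex;
    std::condition_variable condVar;

    void signal() {
        std::uint8_t prevState = state.fetch_or(Signaled, std::memory_order_release);
        if (prevState & HasWaiters) {
            std::lock_guard<std::mutex> guard(mutex); // don't wake mid-wait()
            condVar.notify_all();
        }
    }

    void wait() {
        if ((state.load(std::memory_order_acquire) & Signaled) == 0) {
            std::unique_lock<std::mutex> lock(mutex);
            state.fetch_or(HasWaiters, std::memory_order_relaxed);
            condVar.wait(lock, [this] { return (state.load(std::memory_order_relaxed) & Signaled) != 0; });
        }
    }
};
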
bool tryLock() {
return (m_status.compareExchange(-1, 0, turf::Acquire) < 0);
}
-
+
void unlock() {
if (m_status.exchange(-1, turf::Release) > 0)
m_event.signal();
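
// A sketch of the lightweight mutex that tryLock()/unlock() above belong to
// (illustrative; lock() here spins where the original parks contenders on
// m_event): m_status is -1 when unlocked, 0 when held, and greater than 0 when
// held with waiters registered.
#include <atomic>
#include <thread>

struct SketchMutex {
    std::atomic<int> status{-1};

    bool tryLock() {
        int expected = -1;
        return status.compare_exchange_strong(expected, 0, std::memory_order_acquire);
    }

    void lock() {
        while (!tryLock()) // the original fetch-adds status and blocks on an event
            std::this_thread::yield();
    }

    void unlock() {
        if (status.exchange(-1, std::memory_order_release) > 0) {
            // > 0 means contenders registered as waiters; the original wakes one
            // via m_event.signal() here.
        }
    }
};
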
#include "TestInsertDifferentKeys.h"
#include "TestChurn.h"
#include <turf/extra/Options.h>
-#include <junction/details/Grampa.h> // for GrampaStats
+#include <junction/details/Grampa.h> // for GrampaStats
static const ureg IterationsPerLog = 100;
junction::DefaultQSBR.flush();
junction::details::GrampaStats& stats = junction::details::GrampaStats::Instance;
printf("---------------------------\n");
- printf("numTables: %d/%d\n", (int) stats.numTables.current.load(turf::Relaxed), (int) stats.numTables.total.load(turf::Relaxed));
- printf("numTableMigrations: %d/%d\n", (int) stats.numTableMigrations.current.load(turf::Relaxed), (int) stats.numTableMigrations.total.load(turf::Relaxed));
- printf("numFlatTrees: %d/%d\n", (int) stats.numFlatTrees.current.load(turf::Relaxed), (int) stats.numFlatTrees.total.load(turf::Relaxed));
- printf("numFlatTreeMigrations: %d/%d\n", (int) stats.numFlatTreeMigrations.current.load(turf::Relaxed), (int) stats.numFlatTreeMigrations.total.load(turf::Relaxed));
+ printf("numTables: %d/%d\n", (int) stats.numTables.current.load(turf::Relaxed),
+ (int) stats.numTables.total.load(turf::Relaxed));
+ printf("numTableMigrations: %d/%d\n", (int) stats.numTableMigrations.current.load(turf::Relaxed),
+ (int) stats.numTableMigrations.total.load(turf::Relaxed));
+ printf("numFlatTrees: %d/%d\n", (int) stats.numFlatTrees.current.load(turf::Relaxed),
+ (int) stats.numFlatTrees.total.load(turf::Relaxed));
+ printf("numFlatTreeMigrations: %d/%d\n", (int) stats.numFlatTreeMigrations.current.load(turf::Relaxed),
+ (int) stats.numFlatTreeMigrations.total.load(turf::Relaxed));
#endif
}
u32 m_relativePrime;
std::vector<ThreadInfo> m_threads;
- TestChurn(TestEnvironment& env) : m_env(env), m_map(MapAdapter::getInitialCapacity(KeysInBlock * BlocksToMaintain * env.numThreads)) {
+ TestChurn(TestEnvironment& env)
+ : m_env(env), m_map(MapAdapter::getInitialCapacity(KeysInBlock * BlocksToMaintain * env.numThreads)) {
m_threads.resize(m_env.numThreads);
- m_rangePerThread = u32(-3) / m_env.numThreads; // from 2 to 0xffffffff inclusive
+ m_rangePerThread = u32(-3) / m_env.numThreads; // from 2 to 0xffffffff inclusive
TURF_ASSERT(KeysInBlock * (BlocksToMaintain + BlocksToLookup + 1) < m_rangePerThread);
u32 startIndex = 2;
for (ureg i = 0; i < m_env.numThreads; i++) {
TURF_ASSERT(thread.insertIndex != thread.eraseIndex);
for (sreg stepsRemaining = StepsPerIteration; stepsRemaining > 0; stepsRemaining--) {
switch (thread.phase) {
- case Phase_Insert: {
- for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) {
- u32 key = thread.insertIndex * m_relativePrime;
- key = key ^ (key >> 16);
- if (key >= 2) {
- m_map.insert(key, (void*) uptr(key));
- }
- if (++thread.insertIndex >= thread.rangeHi)
- thread.insertIndex = thread.rangeLo;
- TURF_ASSERT(thread.insertIndex != thread.eraseIndex);
+ case Phase_Insert: {
+ for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) {
+ u32 key = thread.insertIndex * m_relativePrime;
+ key = key ^ (key >> 16);
+ if (key >= 2) {
+ m_map.insert(key, (void*) uptr(key));
}
- thread.phase = Phase_Lookup;
- thread.lookupIndex = thread.insertIndex;
- thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1)));
- break;
+ if (++thread.insertIndex >= thread.rangeHi)
+ thread.insertIndex = thread.rangeLo;
+ TURF_ASSERT(thread.insertIndex != thread.eraseIndex);
}
- case Phase_Lookup: {
- sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock);
- thread.keysToCheck -= keysRemaining;
- for (; keysRemaining > 0; keysRemaining--) {
- if (thread.lookupIndex == thread.rangeLo)
- thread.lookupIndex = thread.rangeHi;
- thread.lookupIndex--;
- u32 key = thread.lookupIndex * m_relativePrime;
- key = key ^ (key >> 16);
- if (key >= 2) {
- if (m_map.get(key) != (void*) uptr(key))
- TURF_DEBUG_BREAK();
- }
- }
- if (thread.keysToCheck == 0) {
- thread.phase = Phase_Erase;
+ thread.phase = Phase_Lookup;
+ thread.lookupIndex = thread.insertIndex;
+ thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1)));
+ break;
+ }
+ case Phase_Lookup: {
+ sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock);
+ thread.keysToCheck -= keysRemaining;
+ for (; keysRemaining > 0; keysRemaining--) {
+ if (thread.lookupIndex == thread.rangeLo)
+ thread.lookupIndex = thread.rangeHi;
+ thread.lookupIndex--;
+ u32 key = thread.lookupIndex * m_relativePrime;
+ key = key ^ (key >> 16);
+ if (key >= 2) {
+ if (m_map.get(key) != (void*) uptr(key))
+ TURF_DEBUG_BREAK();
}
- break;
}
- case Phase_Erase: {
- for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) {
- u32 key = thread.eraseIndex * m_relativePrime;
- key = key ^ (key >> 16);
- if (key >= 2) {
- m_map.erase(key);
- }
- if (++thread.eraseIndex >= thread.rangeHi)
- thread.eraseIndex = thread.rangeLo;
- TURF_ASSERT(thread.insertIndex != thread.eraseIndex);
- }
- thread.phase = Phase_LookupDeleted;
- thread.lookupIndex = thread.eraseIndex;
- thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1)));
- break;
+ if (thread.keysToCheck == 0) {
+ thread.phase = Phase_Erase;
}
- case Phase_LookupDeleted: {
- sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock);
- thread.keysToCheck -= keysRemaining;
- for (; keysRemaining > 0; keysRemaining--) {
- if (thread.lookupIndex == thread.rangeLo)
- thread.lookupIndex = thread.rangeHi;
- thread.lookupIndex--;
- u32 key = thread.lookupIndex * m_relativePrime;
- key = key ^ (key >> 16);
- if (key >= 2) {
- if (m_map.get(key))
- TURF_DEBUG_BREAK();
- }
+ break;
+ }
+ case Phase_Erase: {
+ for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) {
+ u32 key = thread.eraseIndex * m_relativePrime;
+ key = key ^ (key >> 16);
+ if (key >= 2) {
+ m_map.erase(key);
}
- if (thread.keysToCheck == 0) {
- thread.phase = Phase_Insert;
+ if (++thread.eraseIndex >= thread.rangeHi)
+ thread.eraseIndex = thread.rangeLo;
+ TURF_ASSERT(thread.insertIndex != thread.eraseIndex);
+ }
+ thread.phase = Phase_LookupDeleted;
+ thread.lookupIndex = thread.eraseIndex;
+ thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1)));
+ break;
+ }
+ case Phase_LookupDeleted: {
+ sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock);
+ thread.keysToCheck -= keysRemaining;
+ for (; keysRemaining > 0; keysRemaining--) {
+ if (thread.lookupIndex == thread.rangeLo)
+ thread.lookupIndex = thread.rangeHi;
+ thread.lookupIndex--;
+ u32 key = thread.lookupIndex * m_relativePrime;
+ key = key ^ (key >> 16);
+ if (key >= 2) {
+ if (m_map.get(key))
+ TURF_DEBUG_BREAK();
}
- break;
}
+ if (thread.keysToCheck == 0) {
+ thread.phase = Phase_Insert;
+ }
+ break;
+ }
}
}
m_env.threads[threadIndex].update();
}
};
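
// A sketch of the key scheme these tests share: multiplying a sequential index by
// an odd constant is a bijection on u32 (every odd number is relatively prime to
// 2^32, hence the member name m_relativePrime), and the xor-shift is also
// invertible, so distinct indices always yield distinct keys. makeKey is an
// illustrative name.
#include <cstdint>

std::uint32_t makeKey(std::uint32_t index, std::uint32_t relativePrime) {
    std::uint32_t key = index * relativePrime; // bijective for odd relativePrime
    return key ^ (key >> 16);                  // invertible scramble of the low bits
}
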
-
#endif // SAMPLES_MAPCORRECTNESSTESTS_TESTENVIRONMENT_H
public:
static const ureg KeysToInsert = 2048;
TestEnvironment& m_env;
- MapAdapter::Map *m_map;
+ MapAdapter::Map* m_map;
turf::extra::Random m_random;
u32 m_startIndex;
u32 m_relativePrime;
while (keysRemaining > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
m_map->insert(key, (void*) uptr(key));
keysRemaining--;
}
while (keysRemaining > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
m_map->erase(key);
keysRemaining--;
}
while (leftToCheck > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
if (m_map->get(key) != (void*) uptr(key))
TURF_DEBUG_BREAK();
actualChecksum += key;
for (MapAdapter::Map::Iterator iter(*m_map); iter.isValid(); iter.next()) {
TURF_DEBUG_BREAK();
}
-
+
for (ureg i = 0; i < m_env.numThreads; i++) {
u32 index = m_startIndex + i * (KeysToInsert + 2);
sreg leftToCheck = KeysToInsert;
while (leftToCheck > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
if (m_map->get(key))
TURF_DEBUG_BREAK();
leftToCheck--;
public:
static const ureg KeysToInsert = 2048;
TestEnvironment& m_env;
- MapAdapter::Map *m_map;
+ MapAdapter::Map* m_map;
turf::extra::Random m_random;
u32 m_startIndex;
u32 m_relativePrime;
while (keysRemaining > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
m_map->insert(key, (void*) uptr(key));
keysRemaining--;
}
while (keysRemaining > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
m_map->erase(key);
keysRemaining--;
}
while (leftToCheck > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
if (m_map->get(key) != (void*) uptr(key))
TURF_DEBUG_BREAK();
actualChecksum += key;
for (MapAdapter::Map::Iterator iter(*m_map); iter.isValid(); iter.next()) {
TURF_DEBUG_BREAK();
}
-
+
u32 index = m_startIndex;
sreg leftToCheck = KeysToInsert;
while (leftToCheck > 0) {
u32 key = index * m_relativePrime;
key = key ^ (key >> 16);
- if (key >= 2) { // Don't insert 0 or 1
+ if (key >= 2) { // Don't insert 0 or 1
if (m_map->get(key))
TURF_DEBUG_BREAK();
leftToCheck--;
turf::Atomic<u32> doneFlag;
SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
- : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
+ : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite),
+ itersPerChunk(itersPerChunk) {
delayFactor = 0.5f;
doneFlag.storeNonatomic(0);
}
Stats m_stats;
- ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
+ ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi)
+ : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
m_threadIndex = threadIndex;
m_rangeLo = rangeLo;
m_rangeHi = rangeHi;
void initialPopulate() {
TURF_ASSERT(m_addIndex == m_removeIndex);
- MapAdapter::Map *map = m_shared.map;
+ MapAdapter::Map* map = m_shared.map;
for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
u32 key = m_addIndex * Prime;
map->insert(key, (void*) (key & ~uptr(3)));
}
void run() {
- MapAdapter::Map *map = m_shared.map;
+ MapAdapter::Map* map = m_shared.map;
turf::CPUTimer::Converter converter;
Delay delay(m_shared.delayFactor);
Stats stats;
};
static const turf::extra::Option Options[] = {
- { "readsPerWrite", 'r', true, "number of reads per write" },
- { "itersPerChunk", 'i', true, "number of iterations per chunk" },
- { "chunks", 'c', true, "number of chunks to execute" },
- { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
+ {"readsPerWrite", 'r', true, "number of reads per write"},
+ {"itersPerChunk", 'i', true, "number of iterations per chunk"},
+ {"chunks", 'c', true, "number of chunks to execute"},
+ {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"},
};
int main(int argc, const char** argv) {
printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
printf("'chunks': %d,\n", (int) chunks);
printf("'keepChunkFraction': %f,\n", keepChunkFraction);
- printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"),
- printf("'points': [\n");
- for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f)
- {
+    printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n");
+    printf("'points': [\n");
+ for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f) {
shared.delayFactor = delayFactor;
std::vector<ThreadState::Stats> kickTotals;
totals += kickTotals[t];
}
- printf(" (%f, %d, %d, %f),\n",
- shared.delayFactor,
- int(totals.workUnitsDone),
- int(totals.mapOpsDone),
- totals.duration);
+ printf(" (%f, %d, %d, %f),\n", shared.delayFactor, int(totals.workUnitsDone), int(totals.mapOpsDone),
+ totals.duration);
}
printf("],\n");
printf("}\n");
turf::Atomic<u32> doneFlag;
SharedState(MapAdapter& adapter, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
- : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
+ : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite),
+ itersPerChunk(itersPerChunk) {
doneFlag.storeNonatomic(0);
numThreads = 0;
}
Stats m_stats;
- ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
+ ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi)
+ : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
m_threadIndex = threadIndex;
m_rangeLo = rangeLo;
m_rangeHi = rangeHi;
void initialPopulate() {
TURF_ASSERT(m_addIndex == m_removeIndex);
- MapAdapter::Map *map = m_shared.map;
+ MapAdapter::Map* map = m_shared.map;
for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
u32 key = m_addIndex * Prime;
if (key >= 2)
}
void run() {
- MapAdapter::Map *map = m_shared.map;
+ MapAdapter::Map* map = m_shared.map;
turf::CPUTimer::Converter converter;
Stats stats;
ureg lookupIndex = m_rangeLo;
};
static const turf::extra::Option Options[] = {
- { "readsPerWrite", 'r', true, "number of reads per write" },
- { "itersPerChunk", 'i', true, "number of iterations per chunk" },
- { "chunks", 'c', true, "number of chunks to execute" },
- { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
+ {"readsPerWrite", 'r', true, "number of reads per write"},
+ {"itersPerChunk", 'i', true, "number of iterations per chunk"},
+ {"chunks", 'c', true, "number of chunks to execute"},
+ {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"},
};
int main(int argc, const char** argv) {
turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
options.parse(argc, argv);
- ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
+ ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
ureg chunks = options.getInteger("chunks", DefaultChunks);
double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
printf("'chunks': %d,\n", (int) chunks);
printf("'keepChunkFraction': %f,\n", keepChunkFraction);
- printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n"),
- printf("'points': [\n");
+    printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n");
+    printf("'points': [\n");
for (shared.numThreads = 1; shared.numThreads <= numCores; shared.numThreads++) {
if (shared.numThreads > 1) {
// Spawn and register a new thread
totals += kickTotals[t];
}
- printf(" (%d, %d, %f),\n",
- int(shared.numThreads),
- int(totals.mapOpsDone),
- totals.duration);
+ printf(" (%d, %d, %f),\n", int(shared.numThreads), int(totals.mapOpsDone), totals.duration);
}
printf("],\n");
printf("}\n");