1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #ifndef JUNCTION_DETAILS_LEAPFROG_H
14 #define JUNCTION_DETAILS_LEAPFROG_H
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <turf/Mutex.h>
19 #include <turf/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
27 // Enable this to force migration overflows (for test purposes):
28 #define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0
33 TURF_TRACE_DECLARE(LeapFrog, 34)
// Type aliases imported from the Map's policy traits: the hash/value storage
// types and the key/value trait classes that define NullHash/NullValue/Redirect.
37 typedef typename Map::Hash Hash;
38 typedef typename Map::Value Value;
39 typedef typename Map::KeyTraits KeyTraits;
40 typedef typename Map::ValueTraits ValueTraits;
// Tuning constants:
// - InitialSize: smallest table (cells) ever created.
// - TableMigrationUnitSize: number of source cells one worker claims per chunk.
// - LinearSearchLimit: cap on linear probing in insert(); must fit in the u8
//   delta links stored in CellGroup (hence the < 256 static assert below).
// - CellsInUseSample: how many cells beginTableMigration() samples to estimate
//   occupancy when sizing the destination table.
42 static const ureg InitialSize = 8;
43 static const ureg TableMigrationUnitSize = 32;
44 static const ureg LinearSearchLimit = 128;
45 static const ureg CellsInUseSample = LinearSearchLimit;
46 TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
47 TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
// A single slot: hash and value are each stored atomically so readers can
// race with inserters/erasers using relaxed/acquire loads (see find/insert).
50 turf::Atomic<Hash> hash;
51 turf::Atomic<Value> value;
// CellGroup packs 4 cells plus their probe-chain links.
// NOTE(review): the cells[4] member itself is not visible in this excerpt —
// confirm against upstream that deltas[0-3] are "first link" and deltas[4-7]
// the "next link" for the corresponding cell, as described below.
55 // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
56 // Each cell in the probe chain is located within the table itself.
57 // "deltas" determines the index of the next cell in the probe chain.
58 // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
59 // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
60 // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket.
61 turf::Atomic<u8> deltas[8];
// Table header; the CellGroup array is allocated contiguously after it
// (see create() and getCellGroups()).
66 const ureg sizeMask; // a power of two minus one
67 turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
68 SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
70 Table(ureg sizeMask) : sizeMask(sizeMask) {
// Allocate a Table plus tableSize/4 CellGroups in one raw heap block, then
// placement-new the header and fill every cell with NullHash/NullValue and
// zeroed links. tableSize must be a power of 2 and at least 4.
73 static Table* create(ureg tableSize) {
74 TURF_ASSERT(turf::util::isPowerOf2(tableSize));
75 TURF_ASSERT(tableSize >= 4);
76 ureg numGroups = tableSize >> 2;
77 Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
78 new (table) Table(tableSize - 1);
79 for (ureg i = 0; i < numGroups; i++) {
80 CellGroup* group = table->getCellGroups() + i;
81 for (ureg j = 0; j < 4; j++) {
82 group->deltas[j].storeNonatomic(0);
83 group->deltas[j + 4].storeNonatomic(0);
84 group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
85 group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
// Explicit destructor call mirrors the placement-new in create(); the raw
// block is then presumably returned to TURF_HEAP (free call not visible here).
92 this->Table::~Table();
// The CellGroup array lives immediately after this header in the same allocation.
96 CellGroup* getCellGroups() const {
97 return (CellGroup*) (this + 1);
// Number of TableMigrationUnitSize-sized chunks needed to cover the whole table
// (ceiling division, since sizeMask == tableSize - 1).
100 ureg getNumMigrationUnits() const {
101 return sizeMask / TableMigrationUnitSize + 1;
// A migration job: moves all live cells from one or more source tables into a
// single destination table. Run cooperatively by any thread blocked on the
// table's jobCoordinator.
105 class TableMigration : public SimpleJobCoordinator::Job {
// Per-source cursor: workers fetchAdd this to claim migration chunks.
109 turf::Atomic<ureg> sourceIndex;
113 Table* m_destination;
114 turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
115 turf::Atomic<bool> m_overflowed;
116 turf::Atomic<sreg> m_unitsRemaining;
119 TableMigration(Map& map) : m_map(map) {
// Like Table::create, allocates the object and its trailing Source array in
// one block; the caller must fill in the sources and destination afterwards.
122 static TableMigration* create(Map& map, ureg numSources) {
123 TableMigration* migration =
124 (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
125 new (migration) TableMigration(map);
126 migration->m_workerStatus.storeNonatomic(0);
127 migration->m_overflowed.storeNonatomic(false);
128 migration->m_unitsRemaining.storeNonatomic(0);
129 migration->m_numSources = numSources;
130 // Caller is responsible for filling in sources & destination
134 virtual ~TableMigration() TURF_OVERRIDE {
// Tear down: destroy any source tables still owned (a follow-up migration may
// have taken ownership by nulling entries — see run()), then self-destruct,
// mirroring the placement-new in create().
138 // Destroy all source tables.
139 for (ureg i = 0; i < m_numSources; i++)
140 if (getSources()[i].table)
141 getSources()[i].table->destroy();
142 // Delete the migration object itself.
143 this->TableMigration::~TableMigration();
144 TURF_HEAP.free(this);
// The Source array lives immediately after this object in the same allocation.
147 Source* getSources() const {
148 return (Source*) (this + 1);
151 bool migrateRange(Table* srcTable, ureg startIdx);
152 virtual void run() TURF_OVERRIDE;
// Lock-free lookup. Returns the cell holding `hash`, or (in elided lines of
// this excerpt) NULL when the hash is absent. Uses relaxed loads only: a
// racing insert may not be visible yet, which callers must tolerate.
155 static Cell* find(Hash hash, Table* table) {
156 TURF_TRACE(LeapFrog, 0, "[find] called", uptr(table), hash);
158 TURF_ASSERT(hash != KeyTraits::NullHash);
159 ureg sizeMask = table->sizeMask;
160 // Optimistically check hashed cell even though it might belong to another bucket
161 ureg idx = hash & sizeMask;
162 CellGroup* group = table->getCellGroups() + (idx >> 2);
163 Cell* cell = group->cells + (idx & 3);
164 Hash probeHash = cell->hash.load(turf::Relaxed);
165 if (probeHash == hash) {
166 TURF_TRACE(LeapFrog, 1, "[find] found existing cell optimistically", uptr(table), idx);
// NullHash in the first cell means the bucket's chain was never started.
168 } else if (probeHash == KeyTraits::NullHash) {
171 // Follow probe chain for our bucket
// First link (deltas[0-3]); a zero delta terminates the chain (handling elided here).
172 u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
// Hop delta cells forward (wrapping via sizeMask) and re-read the cell there.
174 idx = (idx + delta) & sizeMask;
175 group = table->getCellGroups() + (idx >> 2);
176 cell = group->cells + (idx & 3);
177 Hash probeHash = cell->hash.load(turf::Relaxed);
178 // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
179 // but we don't check for it. We just follow the probe chain.
180 if (probeHash == hash) {
181 TURF_TRACE(LeapFrog, 2, "[find] found existing cell", uptr(table), idx);
// Subsequent links use the second bank (deltas[4-7]).
184 delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
186 // End of probe chain, not found
// Lock-free insert/reserve. On return, `cell` points at the cell holding
// `hash`: AlreadyFound if another thread owns it, InsertedNew if we reserved
// it (caller then stores the value), or Overflow with `overflowIdx` set so the
// caller can kick off a migration. Only the hash is written here; the value is
// the caller's responsibility.
190 // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
191 enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
192 static InsertResult insert(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
193 TURF_TRACE(LeapFrog, 3, "[insert] called", uptr(table), hash);
195 TURF_ASSERT(hash != KeyTraits::NullHash);
196 ureg sizeMask = table->sizeMask;
197 ureg idx = ureg(hash);
199 // Check hashed cell first, though it may not even belong to the bucket.
200 CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
201 cell = group->cells + (idx & 3);
202 Hash probeHash = cell->hash.load(turf::Relaxed);
203 if (probeHash == KeyTraits::NullHash) {
// CAS reloads probeHash on failure, so the same-hash check below still works.
204 if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
205 TURF_TRACE(LeapFrog, 4, "[insert] reserved first cell", uptr(table), idx);
206 // There are no links to set. We're done.
207 return InsertResult_InsertedNew;
209 TURF_TRACE(LeapFrog, 5, "[insert] race to reserve first cell", uptr(table), idx);
210 // Fall through to check if it was the same hash...
213 if (probeHash == hash) {
214 TURF_TRACE(LeapFrog, 6, "[insert] found in first cell", uptr(table), idx);
215 return InsertResult_AlreadyFound;
218 // Follow the link chain for this bucket.
// maxIdx caps how far linear probing may wander (one full table length).
219 ureg maxIdx = idx + sizeMask;
221 turf::Atomic<u8>* prevLink;
// linkLevel selects deltas[0-3] for the first hop, deltas[4-7] afterwards
// (its assignment is elided in this excerpt — verify against upstream).
224 prevLink = group->deltas + ((idx & 3) + linkLevel);
226 u8 probeDelta = prevLink->load(turf::Relaxed);
229 // Check the hash for this cell.
230 group = table->getCellGroups() + ((idx & sizeMask) >> 2);
231 cell = group->cells + (idx & 3);
232 probeHash = cell->hash.load(turf::Relaxed);
233 if (probeHash == KeyTraits::NullHash) {
234 // Cell was linked, but hash is not visible yet.
235 // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
236 // just poll until it becomes visible.
237 TURF_TRACE(LeapFrog, 7, "[insert] race to read hash", uptr(table), idx);
239 probeHash = cell->hash.load(turf::Acquire);
240 } while (probeHash == KeyTraits::NullHash);
242 TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
243 if (probeHash == hash) {
244 TURF_TRACE(LeapFrog, 8, "[insert] found in probe chain", uptr(table), idx);
245 return InsertResult_AlreadyFound;
248 // Reached the end of the link chain for this bucket.
249 // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
250 ureg prevLinkIdx = idx;
251 TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
252 ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
253 while (linearProbesRemaining-- > 0) {
255 group = table->getCellGroups() + ((idx & sizeMask) >> 2);
256 cell = group->cells + (idx & 3);
257 probeHash = cell->hash.load(turf::Relaxed);
258 if (probeHash == KeyTraits::NullHash) {
259 // It's an empty cell. Try to reserve it.
260 if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
261 // Success. We've reserved the cell. Link it to previous cell in same bucket.
262 TURF_TRACE(LeapFrog, 9, "[insert] reserved cell", uptr(table), idx);
263 TURF_ASSERT(probeDelta == 0);
264 u8 desiredDelta = idx - prevLinkIdx;
// With asserts on, use exchange so we can verify nobody set a conflicting link.
265 #if TURF_WITH_ASSERTS
266 probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
267 TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
269 prevLink->store(desiredDelta, turf::Relaxed);
271 return InsertResult_InsertedNew;
273 TURF_TRACE(LeapFrog, 10, "[insert] race to reserve cell", uptr(table), idx);
274 // Fall through to check if it's the same hash...
// x == 0 means identical hash; x nonzero but (x & sizeMask) == 0 means a
// different hash that maps to the same bucket.
277 Hash x = (probeHash ^ hash);
278 // Check for same hash.
280 TURF_TRACE(LeapFrog, 11, "[insert] found outside probe chain", uptr(table), idx);
281 return InsertResult_AlreadyFound;
283 // Check for same bucket.
284 if ((x & sizeMask) == 0) {
285 TURF_TRACE(LeapFrog, 12, "[insert] found late-arriving cell in same bucket", uptr(table), idx);
286 // Attempt to set the link on behalf of the late-arriving cell.
287 // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
288 // there's no guarantee that our own link chain will be well-formed by the time this function returns.
289 // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
290 u8 desiredDelta = idx - prevLinkIdx;
291 #if TURF_WITH_ASSERTS
292 probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
293 TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
295 TURF_TRACE(LeapFrog, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
297 prevLink->store(desiredDelta, turf::Relaxed);
299 goto followLink; // Try to follow link chain for the bucket again.
301 // Continue linear search...
303 // Table is too full to insert.
304 overflowIdx = idx + 1;
305 TURF_TRACE(LeapFrog, 14, "[insert] overflow", uptr(table), overflowIdx);
306 return InsertResult_Overflow;
// Publish a migration job to `table`'s jobCoordinator, creating it at most
// once via double-checked locking (fast-path loadConsume, then mutex + re-check).
// If a job already exists, this is a no-op; the caller participates on return.
311 static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
312 // Create new migration by DCLI.
313 TURF_TRACE(LeapFrog, 15, "[beginTableMigrationToSize] called", 0, 0);
314 SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
316 TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
318 turf::LockGuard<turf::Mutex> guard(table->mutex);
319 job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
321 TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
323 // Create new migration.
// Single source (the current table), destination sized by the caller.
324 TableMigration* migration = TableMigration::create(map, 1);
325 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
326 migration->getSources()[0].table = table;
327 migration->getSources()[0].sourceIndex.storeNonatomic(0);
328 migration->m_destination = Table::create(nextTableSize);
329 // Publish the new migration.
330 table->jobCoordinator.storeRelease(migration);
// Decide the destination table size, then delegate to beginTableMigrationToSize.
// If mustDouble is set, unconditionally double; otherwise estimate live cells
// by sampling CellsInUseSample cells ending at the overflow point.
335 static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) {
338 TURF_TRACE(LeapFrog, 18, "[beginTableMigration] forced to double", 0, 0);
339 nextTableSize = (table->sizeMask + 1) * 2;
341 // Estimate number of cells in use based on a small sample.
342 ureg sizeMask = table->sizeMask;
343 ureg idx = overflowIdx - CellsInUseSample;
345 for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
346 CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
347 Cell* cell = group->cells + (idx & 3);
348 Value value = cell->value.load(turf::Relaxed);
349 if (value == Value(ValueTraits::Redirect)) {
350 // Another thread kicked off the jobCoordinator. The caller will participate upon return.
351 TURF_TRACE(LeapFrog, 19, "[beginTableMigration] redirected while determining table size", 0, 0);
// Count any non-null value as in use (inUseCells increment elided in excerpt).
354 if (value != Value(ValueTraits::NullValue))
// Size the new table at ~2x the estimated live count, rounded up to a power
// of 2 and never below InitialSize.
358 float inUseRatio = float(inUseCells) / CellsInUseSample;
359 float estimatedInUse = (sizeMask + 1) * inUseRatio;
360 #if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
361 // Periodically underestimate the number of cells in use.
362 // This exercises the code that handles overflow during migration.
363 static ureg counter = 1;
364 if ((++counter & 3) == 0) {
368 nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
370 beginTableMigrationToSize(map, table, nextTableSize);
// Migrate one chunk of TableMigrationUnitSize cells from srcTable starting at
// startIdx. Every source cell ends up holding Redirect. Returns true on
// success, false if the destination table overflowed (caller restarts with a
// bigger destination). Safe against concurrent inserts/erases on the source.
375 bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
376 ureg srcSizeMask = srcTable->sizeMask;
377 ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
378 // Iterate over source range.
379 for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
380 CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
381 Cell* srcCell = srcGroup->cells + (srcIdx & 3);
// Inner retry loop (opening elided in excerpt): re-read hash/value until the
// cell is either redirected or captured for migration.
384 // Fetch the srcHash and srcValue.
386 srcHash = srcCell->hash.load(turf::Relaxed);
387 if (srcHash == KeyTraits::NullHash) {
388 // An unused cell. Try to put a Redirect marker in its value.
390 srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
391 if (srcValue == Value(ValueTraits::Redirect)) {
392 // srcValue is already marked Redirect due to previous incomplete migration.
393 TURF_TRACE(LeapFrog, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
396 if (srcValue == Value(ValueTraits::NullValue))
397 break; // Redirect has been placed. Break inner loop, continue outer loop.
398 TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
399 // Otherwise, somebody just claimed the cell. Read srcHash again...
401 // Check for deleted/uninitialized value.
402 srcValue = srcCell->value.load(turf::Relaxed);
403 if (srcValue == Value(ValueTraits::NullValue)) {
404 // Try to put a Redirect marker.
405 if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
406 break; // Redirect has been placed. Break inner loop, continue outer loop.
407 TURF_TRACE(LeapFrog, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
408 if (srcValue == Value(ValueTraits::Redirect)) {
409 // FIXME: I don't think this will happen. Investigate & change to assert
410 TURF_TRACE(LeapFrog, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
413 } else if (srcValue == Value(ValueTraits::Redirect)) {
414 // srcValue is already marked Redirect due to previous incomplete migration.
415 TURF_TRACE(LeapFrog, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
419 // We've got a key/value pair to migrate.
420 // Reserve a destination cell in the destination.
421 TURF_ASSERT(srcHash != KeyTraits::NullHash);
422 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
423 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
426 InsertResult result = insert(srcHash, m_destination, dstCell, overflowIdx);
427 // During migration, a hash can only exist in one place among all the source tables,
428 // and it is only migrated by one thread. Therefore, the hash will never already exist
429 // in the destination table:
430 TURF_ASSERT(result != InsertResult_AlreadyFound);
431 if (result == InsertResult_Overflow) {
432 // Destination overflow.
433 // This can happen for several reasons. For example, the source table could have
434 // consisted of all deleted cells when it overflowed, resulting in a small destination
435 // table size, but then another thread could re-insert all the same hashes
436 // before the migration completed.
437 // Caller will cancel the current migration and begin a new one.
440 // Migrate the old value to the new cell.
// Retry loop (elided opening): copy, then CAS the Redirect into the source;
// if a writer raced us, migrate the newer value and try again.
442 // Copy srcValue to the destination.
443 dstCell->value.store(srcValue, turf::Relaxed);
444 // Try to place a Redirect marker in srcValue.
445 Value doubleCheckedSrcValue =
446 srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
447 TURF_ASSERT(doubleCheckedSrcValue !=
448 Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
449 if (doubleCheckedSrcValue == srcValue) {
450 // No racing writes to the src. We've successfully placed the Redirect marker.
451 // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
452 // by a late-arriving erase.
453 if (srcValue == Value(ValueTraits::NullValue))
454 TURF_TRACE(LeapFrog, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
457 // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
458 TURF_TRACE(LeapFrog, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
459 srcValue = doubleCheckedSrcValue;
461 // Cell successfully migrated. Proceed to next source cell.
466 // Range has been migrated successfully.
// Worker entry point. Any thread may call this; m_workerStatus packs an
// "ended" flag in bit 0 and a worker count in the remaining bits. Workers
// claim chunks via fetchAdd on each source's sourceIndex, and the last worker
// to leave performs the post-migration step (publish on success, or start a
// bigger replacement migration on destination overflow).
471 void LeapFrog<Map>::TableMigration::run() {
472 // Conditionally increment the shared # of workers.
473 ureg probeStatus = m_workerStatus.load(turf::Relaxed);
475 if (probeStatus & 1) {
476 // End flag is already set, so do nothing.
477 TURF_TRACE(LeapFrog, 27, "[TableMigration::run] already ended", uptr(this), 0);
480 } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
481 // # of workers has been incremented, and the end flag is clear.
482 TURF_ASSERT((probeStatus & 1) == 0);
484 // Iterate over all source tables.
485 for (ureg s = 0; s < m_numSources; s++) {
486 Source& source = getSources()[s];
487 // Loop over all migration units in this source table.
// Bail out early if some other worker set the end flag (success or overflow).
489 if (m_workerStatus.load(turf::Relaxed) & 1) {
490 TURF_TRACE(LeapFrog, 28, "[TableMigration::run] detected end flag set", uptr(this), 0);
// Claim the next chunk of this source table.
493 ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
494 if (startIdx >= source.table->sizeMask + 1)
495 break; // No more migration units in this table. Try next source table.
496 bool overflowed = !migrateRange(source.table, startIdx);
498 // *** FAILED MIGRATION ***
499 // TableMigration failed due to destination table overflow.
500 // No other thread can declare the migration successful at this point, because *this* unit will never complete,
501 // hence m_unitsRemaining won't reach zero.
502 // However, multiple threads can independently detect a failed migration at the same time.
503 TURF_TRACE(LeapFrog, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
504 // The reason we store overflowed in a shared variable is because we must flush all the worker threads before
505 // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
507 that deals with it.
508 bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
510 TURF_TRACE(LeapFrog, 30, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
511 uptr(oldOverflowed));
// Set the end flag so all workers wind down.
512 m_workerStatus.fetchOr(1, turf::Relaxed);
// Chunk done: decrement the shared remaining-units counter.
515 sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
516 TURF_ASSERT(prevRemaining > 0);
517 if (prevRemaining == 1) {
518 // *** SUCCESSFUL MIGRATION ***
519 // That was the last chunk to migrate.
520 m_workerStatus.fetchOr(1, turf::Relaxed);
525 TURF_TRACE(LeapFrog, 31, "[TableMigration::run] out of migration units", uptr(this), 0);
528 // Decrement the shared # of workers.
529 probeStatus = m_workerStatus.fetchSub(
530 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
531 if (probeStatus >= 4) {
532 // There are other workers remaining. Return here so that only the very last worker will proceed.
533 TURF_TRACE(LeapFrog, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
537 // We're the very last worker thread.
538 // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
// probeStatus == 3 (pre-decrement): one worker (us) + end flag set.
539 TURF_ASSERT(probeStatus == 3);
540 bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
542 // The migration succeeded. This is the most likely outcome. Publish the new subtree.
543 m_map.publishTableMigration(this);
544 // End the jobCoodinator.
545 getSources()[0].table->jobCoordinator.end();
547 // The migration failed due to the overflow of the destination table.
// Under the original table's mutex, double-check we are still the active job
// before publishing a replacement migration (another thread may have already).
548 Table* origTable = getSources()[0].table;
549 turf::LockGuard<turf::Mutex> guard(origTable->mutex);
550 SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
551 if (checkedJob != this) {
552 TURF_TRACE(LeapFrog, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
// New migration takes all of our sources plus our failed destination as an
// extra source, with a destination twice the size of the failed one.
555 TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
556 // Double the destination table size.
557 migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
558 // Transfer source tables to the new migration.
559 for (ureg i = 0; i < m_numSources; i++) {
560 migration->getSources()[i].table = getSources()[i].table;
// Nulling our entry transfers ownership, so our destructor won't destroy it.
561 getSources()[i].table = NULL;
562 migration->getSources()[i].sourceIndex.storeNonatomic(0);
564 migration->getSources()[m_numSources].table = m_destination;
565 migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
566 // Calculate total number of migration units to move.
567 ureg unitsRemaining = 0;
568 for (ureg s = 0; s < migration->m_numSources; s++)
569 unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
570 migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
571 // Publish the new migration.
572 origTable->jobCoordinator.storeRelease(migration);
576 // We're done with this TableMigration. Queue it for GC.
// Deferred reclamation: readers may still hold pointers into this migration's
// tables, so destruction waits for a QSBR quiescent period.
577 DefaultQSBR.enqueue(&TableMigration::destroy, this);
580 } // namespace details
581 } // namespace junction
583 #endif // JUNCTION_DETAILS_LEAPFROG_H