From 3f03aef15982f1fd4af0d277c152e1642adad1df Mon Sep 17 00:00:00 2001 From: bdemsky Date: Fri, 19 Jan 2018 23:50:45 -0800 Subject: [PATCH] edits --- version2/src/C/PendingTransaction.cc | 4 +- version2/src/C/Slot.cc | 8 +- version2/src/C/Table.cc | 275 ++++++++++++--------------- 3 files changed, 130 insertions(+), 157 deletions(-) diff --git a/version2/src/C/PendingTransaction.cc b/version2/src/C/PendingTransaction.cc index be7575f..b1f6db3 100644 --- a/version2/src/C/PendingTransaction.cc +++ b/version2/src/C/PendingTransaction.cc @@ -116,7 +116,7 @@ Transaction *PendingTransaction::createTransaction() { Array *charData = convertDataToBytes(); int currentPosition = 0; - for(int remaining = charData->length(); remaining > 0;) { + for (int remaining = charData->length(); remaining > 0;) { bool isLastPart = false; // determine how much to copy int copySize = TransactionPart_MAX_NON_HEADER_SIZE; @@ -124,7 +124,7 @@ Transaction *PendingTransaction::createTransaction() { copySize = remaining; isLastPart = true;//last bit of data so last part } - + // Copy to a smaller version Array *partData = new Array(copySize); System_arraycopy(charData, currentPosition, partData, 0, copySize); diff --git a/version2/src/C/Slot.cc b/version2/src/C/Slot.cc index 3f75226..18d1a5d 100644 --- a/version2/src/C/Slot.cc +++ b/version2/src/C/Slot.cc @@ -112,8 +112,8 @@ Array *Slot::encode(Mac *mac) { bb->putLong(seqnum); bb->putLong(machineid); bb->putInt(entries->size()); - for(uint ei=0; ei < entries->size(); ei++) { - Entry * entry = entries->get(ei); + for (uint ei = 0; ei < entries->size(); ei++) { + Entry *entry = entries->get(ei); entry->encode(bb); } /* Compute our HMAC */ @@ -134,8 +134,8 @@ Array *Slot::encode(Mac *mac) { Vector *Slot::getLiveEntries(bool resize) { Vector *liveEntries = new Vector(); - for(uint ei=0; ei < entries->size(); ei++) { - Entry * entry = entries->get(ei); + for (uint ei = 0; ei < entries->size(); ei++) { + Entry *entry = entries->get(ei); if (entry->isLive()) { if (!resize || entry->getType() != TypeTableStatus) liveEntries->add(entry); diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 59c049a..2c8b8e8 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -1331,19 +1331,21 @@ search: */ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { - // The cloud communication layer has checked slot HMACs already before decoding + // The cloud communication layer has checked slot HMACs already + // before decoding if (newSlots->length() == 0) { return; } - // Make sure all slots are newer than the last largest slot this client has seen - int64_t firstSeqNum = newSlots[0]->getSequenceNumber(); + // Make sure all slots are newer than the last largest slot this + // client has seen + int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber(); if (firstSeqNum <= sequenceNumber) { throw new Error("Server Error: Sent older slots!"); } - // Create an object that can access both new slots and slots in our local chain - // without committing slots to our local chain + // Create an object that can access both new slots and slots in our + // local chain without committing slots to our local chain SlotIndexer *indexer = new SlotIndexer(newSlots, buffer); // Check that the HMAC chain is not broken @@ -1359,15 +1361,17 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal updateExpectedSize(); } - // If there is a gap, check to see if the server sent us everything-> + // If there is a gap, check to see if the server sent us + // everything-> if (firstSeqNum != (sequenceNumber + 1)) { // Check the size of the slots that were sent down by the server-> // Can only check the size if there was a gap checkNumSlots(newSlots->length); - // Since there was a gap every machine must have pushed a slot or must have - // a last message message-> If not then the server is hiding slots + // Since there was a gap every machine must have pushed a slot or + // must have a last message message-> If not then the server is + // hiding slots if (!machineSet->isEmpty()) { throw new Error("Missing record for machines: " + machineSet); } @@ -1382,13 +1386,13 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Insert this slot into our local block chain copy-> buffer->putSlot(slot); - // Keep track of how many slots are currently live (have live data in them)-> + // Keep track of how many slots are currently live (have live data + // in them)-> liveSlotCount++; } // Get the sequence number of the latest slot in the system - sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber(); - + sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); updateLiveStateFromServer(); // No Need to remember after we pulled from the server @@ -1429,23 +1433,13 @@ void Table::updateLiveStateFromLocal() { } void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { - // if (didFindTableStatus) { - // return; - // } int64_t prevslots = firstSequenceNumber; - if (didFindTableStatus) { - // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize; - // System->out->println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); - } else { expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; - // System->out->println("Here: " + expectedsize); } - // System->out->println(numberOfSlots); - didFindTableStatus = true; currMaxSize = numberOfSlots; } @@ -1460,10 +1454,11 @@ void Table::updateExpectedSize() { /** - * Check the size of the block chain to make sure there are enough slots sent back by the server-> - * This is only called when we have a gap between the slots that we have locally and the slots - * sent by the server therefore in the slots sent by the server there will be at least 1 Table - * status message + * Check the size of the block chain to make sure there are enough + * slots sent back by the server-> This is only called when we have a + * gap between the slots that we have locally and the slots sent by + * the server therefore in the slots sent by the server there will be + * at least 1 Table status message */ void Table::checkNumSlots(int numberOfSlots) { if (numberOfSlots != expectedsize) { @@ -1490,13 +1485,14 @@ void Table::commitNewMaxSize() { // Change the number of local slots to the new size numberOfSlots = (int32_t)currMaxSize; - - // Recalculate the resize threshold since the size of the local buffer has changed + // Recalculate the resize threshold since the size of the local + // buffer has changed setResizeThreshold(); } /** - * Process the new transaction parts from this latest round of slots received from the server + * Process the new transaction parts from this latest round of slots + * received from the server */ void Table::processNewTransactionParts() { @@ -1505,7 +1501,8 @@ void Table::processNewTransactionParts() { return; } - // Iterate through all the machine Ids that we received new parts for + // Iterate through all the machine Ids that we received new parts + // for for (int64_t machineId : newTransactionParts->keySet()) { Hashtable, TransactionPart *> *parts = newTransactionParts->get(machineId); @@ -1537,7 +1534,8 @@ void Table::processNewTransactionParts() { } } - // Clear all the new transaction parts in preparation for the next time the server sends slots + // Clear all the new transaction parts in preparation for the next + // time the server sends slots newTransactionParts->clear(); } @@ -1564,7 +1562,8 @@ void Table::arbitrateFromServer() { - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + // Check if this machine arbitrates for this transaction if not + // then we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { continue; } @@ -1608,7 +1607,6 @@ void Table::arbitrateFromServer() { lastTransactionCommitted = transactionSequenceNumber; } else { // Guard evaluated was false so create abort - // Create the abort Abort *newAbort = new Abort(NULL, transaction->getClientLocalSequenceNumber(), @@ -1617,7 +1615,6 @@ void Table::arbitrateFromServer() { transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - generatedAborts->add(newAbort); // Insert the abort so we can process @@ -1625,15 +1622,12 @@ void Table::arbitrateFromServer() { } lastSeqNumArbOn = transactionSequenceNumber; - - // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber); } Commit *newCommit = NULL; // If there is something to commit if (speculativeTableTmp->size() != 0) { - // Create the commit and increment the commit sequence number newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); localArbitrationSequenceNumber++; @@ -1646,8 +1640,8 @@ void Table::arbitrateFromServer() { // create the commit parts newCommit->createCommitParts(); - // Append all the commit parts to the end of the pending queue waiting for sending to the server - + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server // Insert the commit so we can process it for (CommitPart *commitPart : newCommit->getParts()->values()) { processEntry(commitPart); @@ -1671,7 +1665,8 @@ void Table::arbitrateFromServer() { Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + // Check if this machine arbitrates for this transaction if not then + // we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { return Pair(false, false); } @@ -1693,9 +1688,8 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) { - // Guard evaluated as true - - // Create the commit and increment the commit sequence number + // Guard evaluated as true Create the commit and increment the + // commit sequence number Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); localArbitrationSequenceNumber++; @@ -1707,7 +1701,8 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { // create the commit parts newCommit->createCommitParts(); - // Append all the commit parts to the end of the pending queue waiting for sending to the server + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); pendingSendArbitrationRounds->add(arbitrationRound); @@ -1733,10 +1728,8 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { updateLiveStateFromLocal(); return Pair(true, true); } else { - if (transaction->getMachineId() == localMachineId) { // For locally created messages update the status - // Guard evaluated was false so create abort TransactionStatus status = transaction->getTransactionStatus(); if (status != NULL) { @@ -1745,7 +1738,6 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } else { Hashset *addAbortSet = new Hashset(); - // Create the abort Abort *newAbort = new Abort(NULL, transaction->getClientLocalSequenceNumber(), @@ -1754,11 +1746,10 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - addAbortSet->add(newAbort); - - // Append all the commit parts to the end of the pending queue waiting for sending to the server + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet); pendingSendArbitrationRounds->add(arbitrationRound); @@ -1776,10 +1767,11 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } /** - * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates + * Compacts the arbitration data my merging commits and aggregating + * aborts so that a single large push of commits can be done instead + * of many small updates */ bool Table::compactArbitrationData() { - if (pendingSendArbitrationRounds->size() < 2) { // Nothing to compact so do nothing return false; @@ -1798,12 +1790,12 @@ bool Table::compactArbitrationData() { ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); if (round->isFull() || round->didSendPart()) { - // Stop since there is a part that cannot be compacted and we need to compact in order + // Stop since there is a part that cannot be compacted and we + // need to compact in order break; } if (round->getCommit() == NULL) { - // Try compacting aborts only int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); if (newSize > ArbitrationRound->MAX_PARTS) { @@ -1812,7 +1804,6 @@ bool Table::compactArbitrationData() { } lastRound->addAborts(round->getAborts()); } else { - // Create a new larger commit Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; @@ -1841,7 +1832,6 @@ bool Table::compactArbitrationData() { if (numberToDelete != 1) { // If there is a compaction - // Delete the previous pieces that are now in the new compacted piece if (numberToDelete == pendingSendArbitrationRounds->size()) { pendingSendArbitrationRounds->clear(); @@ -1862,12 +1852,10 @@ bool Table::compactArbitrationData() { return false; } -// bool compactArbitrationData() { -// return false; -// } /** - * Update all the commits and the committed tables, sets dead the dead transactions + * Update all the commits and the committed tables, sets dead the dead + * transactions */ bool Table::updateCommittedTable() { @@ -1908,7 +1896,8 @@ bool Table::updateCommittedTable() { } } - // Clear all the new commits parts in preparation for the next time the server sends slots + // Clear all the new commits parts in preparation for the next time + // the server sends slots newCommitParts->clear(); // If we process a new commit keep track of it for future use @@ -1938,10 +1927,13 @@ bool Table::updateCommittedTable() { // Special processing if a commit is not complete if (!commit->isComplete()) { if (i == (commitSequenceNumbers->size() - 1)) { - // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits + // If there is an incomplete commit and this commit is the + // latest one seen then this commit cannot be processed and + // there are no other commits break; } else { - // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)-> + // This is a commit that was already dead but parts of it + // are still in the block chain (not flushed out yet)-> // Delete it and move on commit->setDead(); commitForClientTable->remove(commit->getSequenceNumber()); @@ -1972,7 +1964,8 @@ bool Table::updateCommittedTable() { lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); } - // We have already seen this commit before so need to do the full processing on this commit + // We have already seen this commit before so need to do the + // full processing on this commit if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { // Update the last transaction that was updated if we can @@ -1988,9 +1981,10 @@ bool Table::updateCommittedTable() { continue; } - // If we got here then this is a brand new commit and needs full processing - - // Get what commits should be edited, these are the commits that have live values for their keys + // If we got here then this is a brand new commit and needs full + // processing + // Get what commits should be edited, these are the commits that + // have live values for their keys Hashset *commitsToEdit = new Hashset(); for (KeyValue *kv : commit->getKeyValueUpdateSet()) { commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); @@ -2039,7 +2033,8 @@ bool Table::updateCommittedTable() { } /** - * Create the speculative table from transactions that are still live and have come from the cloud + * Create the speculative table from transactions that are still live + * and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { @@ -2047,7 +2042,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { return false; } - // Create a list of the transaction sequence numbers and sort them from oldest to newest + // Create a list of the transaction sequence numbers and sort them + // from oldest to newest Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); Collections->sort(transactionSequenceNumbersSorted); @@ -2055,8 +2051,10 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { - // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction - // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch + // If there is a gap in the transaction sequence numbers then + // there was a commit or an abort of a transaction OR there was a + // new commit (Could be from offline commit) so a redo the + // speculation from scratch // Start from scratch speculatedKeyValueTable->clear(); @@ -2084,8 +2082,9 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); if (!transaction->isComplete()) { - // If there is an incomplete transaction then there is nothing we can do - // add this transactions arbitrator to the list of arbitrators we should ignore + // If there is an incomplete transaction then there is nothing + // we can do add this transactions arbitrator to the list of + // arbitrators we should ignore incompleteTransactionArbitrator->add(transaction->getArbitrator()); didSkip = true; continue; @@ -2116,7 +2115,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { } /** - * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer + * Create the pending transaction speculative table from transactions + * that are still in the pending transaction buffer */ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { if (pendingTransactionQueue->size() == 0) { @@ -2154,7 +2154,8 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr } /** - * Set dead and remove from the live transaction tables the transactions that are dead + * Set dead and remove from the live transaction tables the + * transactions that are dead */ void Table::updateLiveTransactionsAndStatus() { @@ -2203,35 +2204,27 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo // Process each entry in the slot for (Entry *entry : slot->getEntries()) { switch (entry->getType()) { - case TypeCommitPart: processEntry((CommitPart *)entry); break; - case TypeAbort: processEntry((Abort *)entry); break; - case TypeTransactionPart: processEntry((TransactionPart *)entry); break; - case TypeNewKey: processEntry((NewKey *)entry); break; - case TypeLastMessage: processEntry((LastMessage *)entry, machineSet); break; - case TypeRejectedMessage: processEntry((RejectedMessage *)entry, indexer); break; - case TypeTableStatus: processEntry((TableStatus *)entry, slot->getSequenceNumber()); break; - default: throw new Error("Unrecognized type: " + entry->getType()); } @@ -2247,10 +2240,10 @@ void Table::processEntry(LastMessage *entry, Hashset *machineSet) { } /** - * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) + * Add the new key to the arbitrators table and update the set of live + * new keys (in case of a rescued new key message) */ void Table::processEntry(NewKey *entry) { - // Update the arbitrator table with the new key information arbitratorTable->put(entry->getKey(), entry->getMachineID()); @@ -2263,18 +2256,19 @@ void Table::processEntry(NewKey *entry) { } /** - * Process new table status entries and set dead the old ones as new ones come in-> - * keeps track of the largest and smallest table status seen in this current round - * of updating the local copy of the block chain + * Process new table status entries and set dead the old ones as new + * ones come in-> keeps track of the largest and smallest table status + * seen in this current round of updating the local copy of the block + * chain */ void Table::processEntry(TableStatus entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); - initExpectedSize(seq, newNumSlots); if (liveTableStatus != NULL) { - // We have a larger table status so the old table status is no int64_ter alive + // We have a larger table status so the old table status is no + // int64_ter alive liveTableStatus->setDead(); } @@ -2283,7 +2277,8 @@ void Table::processEntry(TableStatus entry, int64_t seq) { } /** - * Check old messages to see if there is a block chain violation-> Also + * Check old messages to see if there is a block chain violation-> + * Also */ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t oldSeqNum = entry->getOldSeqNum(); @@ -2292,16 +2287,15 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t machineId = entry->getMachineID(); int64_t seq = entry->getSequenceNumber(); - - // Check if we have messages that were supposed to be rejected in our local block chain + // Check if we have messages that were supposed to be rejected in + // our local block chain for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { - // Get the slot Slot *slot = indexer->getSlot(seqNum); if (slot != NULL) { - // If we have this slot make sure that it was not supposed to be a rejected slot - + // If we have this slot make sure that it was not supposed to be + // a rejected slot int64_t slotMachineId = slot->getMachineID(); if (isequal != (slotMachineId == machineId)) { throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); @@ -2309,11 +2303,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { } } - - // Create a list of clients to watch until they see this rejected message entry-> + // Create a list of clients to watch until they see this rejected + // message entry-> Hashset *deviceWatchSet = new Hashset(); for (Map->Entry > *lastMessageEntry : lastMessageTable->entrySet()) { - // Machine ID for the last message entry int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); @@ -2327,15 +2320,14 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { - - // Add this rejected message to the set of messages that this machine ID did not see yet + // Add this rejected message to the set of messages that this + // machine ID did not see yet addWatchVector(lastMessageEntryMachineId, entry); - - // This client did not see this rejected message yet so add it to the watch set to monitor + // This client did not see this rejected message yet so add it + // to the watch set to monitor deviceWatchSet->add(lastMessageEntryMachineId); } } - if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); @@ -2346,12 +2338,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { } /** - * Check if this abort is live, if not then save it so we can kill it later-> - * update the last transaction number that was arbitrated on-> + * Check if this abort is live, if not then save it so we can kill it + * later-> update the last transaction number that was arbitrated on-> */ void Table::processEntry(Abort *entry) { - - if (entry->getTransactionSequenceNumber() != -1) { // update the transaction status if it was sent to the server TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); @@ -2360,7 +2350,8 @@ void Table::processEntry(Abort *entry) { } } - // Abort has not been seen by the client it is for yet so we need to keep track of it + // Abort has not been seen by the client it is for yet so we need to + // keep track of it Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); if (previouslySeenAbort != NULL) { previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version @@ -2371,7 +2362,6 @@ void Table::processEntry(Abort *entry) { } if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { - // The machine already saw this so it is dead entry->setDead(); liveAbortTable->remove(entry->getAbortId()); @@ -2379,13 +2369,11 @@ void Table::processEntry(Abort *entry) { if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); } - return; } // Update the last arbitration data that we have seen so far if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) { - int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -2396,17 +2384,16 @@ void Table::processEntry(Abort *entry) { lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); } - // Set dead a transaction if we can Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber())); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); } - // Update the last transaction sequence number that the arbitrator arbitrated on + // Update the last transaction sequence number that the arbitrator + // arbitrated on int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()); if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { - // Is a valid one if (entry->getTransactionSequenceNumber() != -1) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); @@ -2415,10 +2402,12 @@ void Table::processEntry(Abort *entry) { } /** - * Set dead the transaction part if that transaction is dead and keep track of all new parts + * Set dead the transaction part if that transaction is dead and keep + * track of all new parts */ void Table::processEntry(TransactionPart *entry) { - // Check if we have already seen this transaction and set it dead OR if it is not alive + // Check if we have already seen this transaction and set it dead OR + // if it is not alive int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { // This transaction is dead, it was already committed or aborted @@ -2435,7 +2424,8 @@ void Table::processEntry(TransactionPart *entry) { newTransactionParts->put(entry->getMachineId(), transactionPart); } - // Update the part and set dead ones we have already seen (got a rescued version) + // Update the part and set dead ones we have already seen (got a + // rescued version) TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); @@ -2449,25 +2439,21 @@ void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on + // Update the last transaction sequence number that the arbitrator + // arbitrated on if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - - - Hashtable, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); - if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one commitPart = new Hashtable, CommitPart *>(); newCommitParts->put(entry->getMachineId(), commitPart); } - - // Update the part and set dead ones we have already seen (got a rescued version) + // Update the part and set dead ones we have already seen (got a + // rescued version) CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); @@ -2475,33 +2461,29 @@ void Table::processEntry(CommitPart *entry) { } /** - * Update the last message seen table-> Update and set dead the appropriate RejectedMessages as clients see them-> - * Updates the live aborts, removes those that are dead and sets them dead-> - * Check that the last message seen is correct and that there is no mismatch of our own last message or that - * other clients have not had a rollback on the last message-> + * Update the last message seen table-> Update and set dead the + * appropriate RejectedMessages as clients see them-> Updates the live + * aborts, removes those that are dead and sets them dead-> Check that + * the last message seen is correct and that there is no mismatch of + * our own last message or that other clients have not had a rollback + * on the last message-> */ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { - // We have seen this machine ID machineSet->remove(machineId); // Get the set of rejected messages that this machine Id is has not seen yet Hashset *watchset = rejectedMessageWatchVectorTable->get(machineId); - // If there is a rejected message that this machine Id has not seen yet if (watchset != NULL) { - - // Go through each rejected message that this machine Id has not seen yet + // Go through each rejected message that this machine Id has not + // seen yet for (Iterator *rmit = watchset->iterator(); rmit->hasNext(); ) { - RejectedMessage *rm = rmit->next(); - // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { - // Remove it from our watchlist rmit->remove(); - // Decrement machines that need to see this notification rm->removeWatcher(machineId); } @@ -2511,19 +2493,14 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven // Set dead the abort for (IteratorEntry, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { Abort *abort = i->next()->getValue(); - if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); i->remove(); - if (abort->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); } } } - - - if (machineId == localMachineId) { // Our own messages are immediately dead-> if (liveness instanceof LastMessage) { @@ -2534,7 +2511,6 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven throw new Error("Unrecognized type"); } } - // Get the old last message for this device Pair lastMessageEntry = lastMessageTable->put(machineId, Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { @@ -2555,16 +2531,13 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven throw new Error("Unrecognized type"); } } - // Make sure the server is not playing any games if (machineId == localMachineId) { - if (hadPartialSendToServer) { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); } - } else { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { @@ -2579,8 +2552,9 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } /** - * Add a rejected message entry to the watch set to keep track of which clients have seen that - * rejected message entry and which have not-> + * Add a rejected message entry to the watch set to keep track of + * which clients have seen that rejected message entry and which have + * not. */ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { Hashset *entries = rejectedMessageWatchVectorTable->get(machineId); @@ -2596,12 +2570,11 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { * Check if the HMAC chain is not violated */ void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { - for (int i = 0; i < newSlots->length; i++) { - Slot *currSlot = newSlots[i]; + for (int i = 0; i < newSlots->length(); i++) { + Slot *currSlot = newSlots->get(i); Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); if (prevSlot != NULL && !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC())) throw new Error("Server Error: Invalid HMAC Chain"); } } - -- 2.34.1