From: bdemsky Date: Thu, 1 Mar 2018 14:24:34 +0000 (-0800) Subject: edits X-Git-Url: http://demsky.eecs.uci.edu/git/?p=iotcloud.git;a=commitdiff_plain;h=186eba16218f9a43229c2a08432fbd4704a74f53 edits --- diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index ce70324..f4280aa 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -2043,18 +2043,27 @@ bool Table::updateCommittedTable() { bool didProcessANewCommit = false; // Process the commits one by one - for (int64_t arbitratorId : liveCommitsTable->keySet()) { + SetIterator *> * liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); + Vector *commitSequenceNumbers = new Vector(); + { + SetIterator * clientit = getKeyIterator(commitForClientTable); + while(clientit->hasNext()) + commitSequenceNumbers->add(clientit->next()); + delete clientit; + } + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); } @@ -2083,7 +2092,7 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { // Update the last transaction sequence number that the arbitrator arbitrated on1 - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -2107,9 +2116,8 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -2126,11 +2134,12 @@ bool Table::updateCommittedTable() { SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + Commit * commit = liveCommitsByKeyTable->get(kv->getKey()); + if (commit != NULL) + commitsToEdit->add(commit); } delete kvit; } - commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated SetIterator * commitit = commitsToEdit->iterator(); @@ -2152,7 +2161,7 @@ bool Table::updateCommittedTable() { // if the commit is now dead then remove it if (!previousCommit->isLive()) { - commitForClientTable->remove(previousCommit); + commitForClientTable->remove(previousCommit->getSequenceNumber()); } } } @@ -2180,8 +2189,9 @@ bool Table::updateCommittedTable() { } delete kvit; } - } - } + } +} +delete liveit; return didProcessANewCommit; } @@ -2191,15 +2201,22 @@ bool Table::updateCommittedTable() { * and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { + if (liveTransactionBySequenceNumberTable->size() == 0) { // There is nothing to speculate on return false; } // Create a list of the transaction sequence numbers and sort them // from oldest to newest - Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); + Vector *transactionSequenceNumbersSorted = new Vector(); + { + SetIterator * trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while(trit->hasNext()) + transactionSequenceNumbersSorted->add(trit->next()); + delete trit; + } + + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2214,15 +2231,19 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { speculatedKeyValueTable->clear(); lastTransactionSequenceNumberSpeculatedOn = -1; oldestTransactionSequenceNumberSpeculatedOn = -1; - } // Remember the front of the transaction list oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + int startIndex = 0; + for(; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) + break; + startIndex++; + if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds return false; // did not speculate @@ -2291,8 +2312,12 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr } // Find where to start arbitration from - int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1; + int startIndex = 0; + for(; startIndex < pendingTransactionQueue->size(); startIndex++) + if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) + break; + if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds return; @@ -2320,38 +2345,44 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr * transactions that are dead */ void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - for (IteratorEntry > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { - Transaction *transaction = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) { - - // Set dead the transaction - transaction->setDead(); - - // Remove the transaction from the live table - iter->remove(); - liveTransactionByTransactionIdTable->remove(transaction->getId()); + { + SetIterator * iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while(iter->hasNext()) { + int64_t key = iter->next(); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) { + // Set dead the transaction + transaction->setDead(); + + // Remove the transaction from the live table + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); + } } + delete iter; } - + // Go through each of the transactions - for (IteratorEntry > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { - TransactionStatus *status = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) { - - // Set committed - status->setStatus(TransactionStatus_StatusCommitted); - - // Remove - iter->remove(); + { + SetIterator * iter = getKeyIterator(outstandingTransactionStatus); + while(iter->hasNext()) { + int64_t key = iter->next(); + TransactionStatus *status = outstandingTransactionStatus->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { + + // Set committed + status->setStatus(TransactionStatus_StatusCommitted); + + // Remove + iter->remove(); + } } + delete iter; } } @@ -2471,9 +2502,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { // Create a list of clients to watch until they see this rejected // message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map->Entry > *lastMessageEntry : lastMessageTable->entrySet()) { + SetIterator *> * iter = getKeyIterator(lastMessageTable); + while(iter->hasNext()) { // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); + int64_t lastMessageEntryMachineId = iter->next(); // We've seen it, don't need to continue to watch-> Our next // message will implicitly acknowledge it-> @@ -2481,8 +2513,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { continue; } - Pair lastMessageValue = lastMessageEntry->getValue(); - int64_t entrySequenceNumber = lastMessageValue.getFirst(); + Pair * lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); + int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this @@ -2493,6 +2525,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { deviceWatchSet->add(lastMessageEntryMachineId); } } + delete iter; + if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); @@ -2517,7 +2551,7 @@ void Table::processEntry(Abort *entry) { // Abort has not been seen by the client it is for yet so we need to // keep track of it - Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); if (previouslySeenAbort != NULL) { previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } @@ -2526,7 +2560,7 @@ void Table::processEntry(Abort *entry) { liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); } - if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) { + if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { // The machine already saw this so it is dead entry->setDead(); liveAbortTable->remove(&entry->getAbortId());