localArbitrationSequenceNumber++;
// Add all the new keys to the commit
- for (KeyValue *kv : speculativeTableTmp->values()) {
+ SetIterator<IoTString *, KeyValue *> * spit = getKeyIterator(speculativeTableTmp);
+ while(spit->hasNext()) {
+ IoTString * string = spit->next();
+ KeyValue * kv = speculativeTableTmp->get(string);
newCommit->addKV(kv);
}
-
+ delete spit;
+
// create the commit parts
newCommit->createCommitParts();
// Append all the commit parts to the end of the pending queue
// waiting for sending to the server
// Insert the commit so we can process it
- for (CommitPart *commitPart : newCommit->getParts()->values()) {
+ Vector<CommitPart *> * parts = newCommit->getParts();
+ uint partsSize = parts->size();
+ for(uint i=0; i<partsSize; i++) {
+ CommitPart * commitPart = parts->get(i);
processEntry(commitPart);
}
}
if (compactArbitrationData()) {
ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
if (newArbitrationRound->getCommit() != NULL) {
- for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
+ Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
+ uint partsSize = parts->size();
+ for(uint i=0; i<partsSize; i++) {
+ CommitPart * commitPart = parts->get(i);
processEntry(commitPart);
}
}
if (transaction->getMachineId() != localMachineId) {
// dont do this check for local transactions
- if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
+ if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
// We've have already seen this from the server
return Pair<bool, bool>(false, false);
if (compactArbitrationData()) {
ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
- for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
+ Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
+ uint partsSize = parts->size();
+ for(uint i=0; i<partsSize; i++) {
+ CommitPart * commitPart = parts->get(i);
processEntry(commitPart);
}
} else {
// Insert the commit so we can process it
- for (CommitPart *commitPart : newCommit->getParts()->values()) {
+ Vector<CommitPart *> * parts = newCommit->getParts();
+ uint partsSize = parts->size();
+ for(uint i=0; i<partsSize; i++) {
+ CommitPart * commitPart = parts->get(i);
processEntry(commitPart);
}
}
if (compactArbitrationData()) {
ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
- for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
+
+ Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
+ uint partsSize = parts->size();
+ for(uint i=0; i<partsSize; i++) {
+ CommitPart * commitPart = parts->get(i);
processEntry(commitPart);
}
}
}
// Iterate through all the machine Ids that we received new parts for
- for (int64_t machineId : newCommitParts->keySet()) {
- Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
+ SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts);
+ while(partsit->hasNext()) {
+ int64_t machineId = partsit->next();
+ Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
// Iterate through all the parts for that machine Id
- for (Pair<int64_t, int32_t> partId : parts->keySet()) {
+ SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts);
+ while(pairit->hasNext()) {
+ Pair<int64_t, int32_t> * partId = pairit->next();
CommitPart *part = parts->get(partId);
// Get the transaction object for that sequence number
// Add that part to the commit
commit->addPartDecode(part);
}
+ delete pairit;
}
-
+ delete partsit;
+
// Clear all the new commits parts in preparation for the next time
// the server sends slots
newCommitParts->clear();
bool didProcessANewCommit = false;
// Process the commits one by one
- for (int64_T arbitratorId : liveCommitsTable->keySet()) {
+ for (int64_t arbitratorId : liveCommitsTable->keySet()) {
// Get all the commits for a specific arbitrator
Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
// Update the last transaction that was updated if we can
if (commit->getTransactionSequenceNumber() != -1) {
- int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
-
- // Update the last transaction sequence number that the arbitrator arbitrated on
- if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
+ // Update the last transaction sequence number that the arbitrator arbitrated on1
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber())) {
lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
}
}
commitsToEdit->remove(NULL); // remove NULL since it could be in this set
// Update each previous commit that needs to be updated
- for (Commit *previousCommit : commitsToEdit) {
+ SetIterator<Commit *, Commit *> * commitit = commitsToEdit->iterator();
+ while(commitit->hasNext()) {
+ Commit *previousCommit = commitit->next();
// Only bother with live commits (TODO: Maybe remove this check)
if (previousCommit->isLive()) {
}
}
}
+ delete commitit;
// Update the last seen sequence number from this arbitrator
- if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
+ if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
}
// Update the part and set dead ones we have already seen (got a
// rescued version)
- TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
+ TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
if (previouslySeenPart != NULL) {
previouslySeenPart->setDead();
}
void Table::processEntry(CommitPart *entry) {
// Update the last transaction that was updated if we can
if (entry->getTransactionSequenceNumber() != -1) {
- int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
- // Update the last transaction sequence number that the arbitrator
- // arbitrated on
- if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
}
}
- Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
+ Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
if (commitPart == NULL) {
// Don't have a table for this machine Id yet so make one
- commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
+ commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
newCommitParts->put(entry->getMachineId(), commitPart);
}
// Update the part and set dead ones we have already seen (got a
// rescued version)
- CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
+ CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
if (previouslySeenPart != NULL) {
previouslySeenPart->setDead();
}