final class Table {
/* Constants */
- static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
+ static final int FREE_SLOTS = 2;// Number of slots that should be kept free // 10
static final int SKIP_THRESHOLD = 10;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
CloudComm cloud = NULL;
Random random = NULL;
TableStatus liveTableStatus = NULL;
- PendingTransaction pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
- Transaction lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
- Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
+ PendingTransaction pendingTransactionBuilder = NULL;// Pending Transaction used in building a Pending Transaction
+ Transaction lastPendingTransactionSpeculatedOn = NULL;// Last transaction that was speculated on from the pending transaction
+ Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
/* Variables */
- int numberOfSlots = 0; // Number of slots stored in buffer
- int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
- int64_t liveSlotCount = 0; // Number of currently live slots
+ int numberOfSlots = 0; // Number of slots stored in buffer
+ int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
+ int64_t liveSlotCount = 0;// Number of currently live slots
int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
- int64_t localMachineId = 0; // Machine ID of this client device
- int64_t sequenceNumber = 0; // Largest sequence number a client has received
+ int64_t localMachineId = 0; // Machine ID of this client device
+ int64_t sequenceNumber = 0; // Largest sequence number a client has received
int64_t localSequenceNumber = 0;
// int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
+ int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
+ int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
+ int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
int64_t localArbitrationSequenceNumber = 0;
bool hadPartialSendToServer = false;
bool attemptedToSendToServer = false;
Slot lastSlotAttemptedToSend = NULL;
bool lastIsNewKey = false;
int lastNewSize = 0;
- Hashtable<Transaction, Vector<int32_t>> lastTransactionPartsSent = NULL;
+ Hashtable<Transaction, Vector<int32_t> > lastTransactionPartsSent = NULL;
Vector<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
NewKey lastNewKey = NULL;
/* Data Structures */
- Hashtable<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
- Hashtable<IoTString, KeyValue> speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
- Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Hashtable<IoTString, NewKey> liveNewKeyTable = NULL; // Table of live new keys
- Hashtable<int64_t Pair<int64_t Liveness>> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- Hashtable<int64_t HashSet<RejectedMessage>> rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
- Hashtable<IoTString, Long> arbitratorTable = NULL; // Table of keys and their arbitrators
- Hashtable<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
- Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Hashtable<int64_t Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
- Hashtable<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Hashtable<int64_t Hashtable<int64_t Commit>> liveCommitsTable = NULL;
+ Hashtable<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
+ Hashtable<IoTString, KeyValue> speculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString, NewKey> liveNewKeyTable = NULL;// Table of live new keys
+ Hashtable<int64_t Pair<int64_t Liveness> > lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t HashSet<RejectedMessage> > rejectedMessageWatchVectorTable = NULL;// Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<IoTString, Long> arbitratorTable = NULL;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> > newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> > newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL;// Last transaction sequence number that an arbitrator arbitrated on
+ Hashtable<int64_t Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
+ Hashtable<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL;// live transaction grouped by the transaction ID
+ Hashtable<int64_t Hashtable<int64_t Commit> > liveCommitsTable = NULL;
Hashtable<IoTString, Commit> liveCommitsByKeyTable = NULL;
Hashtable<int64_t, int64_t> lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<Long> rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Long> rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
Vector<Transaction> pendingTransactionQueue = NULL;
Vector<ArbitrationRound> pendingSendArbitrationRounds = NULL;
Vector<Entry> pendingSendArbitrationEntriesToDelete = NULL;
- Hashtable<Transaction, Vector<int32_t>> transactionPartsSent = NULL;
+ Hashtable<Transaction, Vector<int32_t> > transactionPartsSent = NULL;
Hashtable<int64_t TransactionStatus> outstandingTransactionStatus = NULL;
Hashtable<int64_t Abort> liveAbortsGeneratedByLocal = NULL;
- Set<Pair<int64_t, int64_t>> offlineTransactionsCommittedAndAtServer = NULL;
- Hashtable<int64_t Pair<String, int32_t>> localCommunicationTable = NULL;
+ Set<Pair<int64_t, int64_t> > offlineTransactionsCommittedAndAtServer = NULL;
+ Hashtable<int64_t Pair<String, int32_t> > localCommunicationTable = NULL;
Hashtable<int64_t, int64_t> lastTransactionSeenFromMachineFromServer = NULL;
Hashtable<int64_t, int64_t> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
liveNewKeyTable = new Hashtable<IoTString, NewKey>();
- lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness>>();
- rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage>>();
+ lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
+ rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
arbitratorTable = new Hashtable<IoTString, Long>();
liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
- newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart>>();
- newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart>>();
+ newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
+ newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> >();
lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
liveTransactionBySequenceNumberTable = new Hashtable<int64_t Transaction>();
liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
- liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit>>();
+ liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit> >();
liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
rejectedSlotVector = new Vector<Long>();
pendingTransactionQueue = new Vector<Transaction>();
pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
- transactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>();
+ transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
- offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t>>();
- localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t>>();
+ offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
+ localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
}
// String toString() {
- // String retString = " Committed Table: \n";
- // retString += "---------------------------\n";
- // retString += commitedTable.toString();
+ // String retString = " Committed Table: \n";
+ // retString += "---------------------------\n";
+ // retString += commitedTable.toString();
- // retString += "\n\n";
+ // retString += "\n\n";
- // retString += " Speculative Table: \n";
- // retString += "---------------------------\n";
- // retString += speculativeTable.toString();
+ // retString += " Speculative Table: \n";
+ // retString += "---------------------------\n";
+ // retString += speculativeTable.toString();
- // return retString;
+ // return retString;
// }
synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
continue;
}
- Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
+ Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
if (sendReturn.getFirst()) {
// Failed to contact over local
Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
if (newSlots.length == 0) {
fromRetry = true;
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+ ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
if (sendSlotsReturn.getFirst()) {
if (newKey != NULL) {
localSequenceNumber++;
// Try to fill the slot with data
- ThreeTuple<Boolean, int32_t, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
+ ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
bool needsResize = fillSlotsReturn.getFirst();
int newSize = fillSlotsReturn.getSecond();
- Boolean insertedNewKey = fillSlotsReturn.getThird();
+ bool insertedNewKey = fillSlotsReturn.getThird();
if (needsResize) {
// Reset which transaction to send
lastInsertedNewKey = insertedNewKey;
lastNewSize = newSize;
lastNewKey = newKey;
- lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
+ lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >(transactionPartsSent);
lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
+ ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
if (sendSlotsReturn.getFirst()) {
} else {
// if (!sendSlotsReturn.getSecond()) {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
- // }
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+ // }
// } else {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
- // // Update which transactions parts still need to be sent
- // transaction.removeSentParts(transactionPartsSent.get(transaction));
+ // // Update which transactions parts still need to be sent
+ // transaction.removeSentParts(transactionPartsSent.get(transaction));
- // // Add the transaction status to the outstanding list
- // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+ // // Add the transaction status to the outstanding list
+ // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
- // // Update the transaction status
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+ // // Update the transaction status
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
- // // Check if all the transaction parts were successfully sent and if so then remove it from pending
- // if (transaction.didSendAllParts()) {
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- // pendingTransactionQueue.remove(transaction);
+ // // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ // if (transaction.didSendAllParts()) {
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ // pendingTransactionQueue.remove(transaction);
- // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
- // }
- // }
- // }
+ // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
+ // }
+ // }
+ // }
// }
// Reset which transaction to send
// if (!fromRetry) {
- // lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
- // lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
+ // lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
+ // lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
// }
// Nothing was able to be sent to the server so just clear these data structures
// Get the size of the send data
int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
- Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
+ Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
}
return true;
}
- Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
+ Pair<bool, bool> sendTransactionToLocal(Transaction transaction) {
// Get the devices local communications
Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
if (localCommunicationInformation == NULL) {
// Cant talk to that device locally so do nothing
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
// Get the size of the send data
sendDataSize += part.getSize();
}
- Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
+ Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
}
if (returnData == NULL) {
// Could not contact server
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
// Decode the data
}
}
- return new Pair<Boolean, Boolean>(false, true);
+ return new Pair<bool, bool>(false, true);
}
synchronized char[] acceptDataFromLocal(char[] data) {
}
// Arbitrate on transaction and pull relevant return data
- Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
+ Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
couldArbitrate = localArbitrateReturn.getFirst();
didCommit = localArbitrateReturn.getSecond();
// Number of arbitration entries to decode
returnDataSize += 2 * sizeof(int32_t);
- // Boolean of did commit or not
+ // bool of did commit or not
if (numberOfParts != 0) {
returnDataSize += sizeof(char);
}
return returnData;
}
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
+ ThreeTuple<bool, bool, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
bool attemptedToSendToServerTmp = attemptedToSendToServer;
attemptedToSendToServer = true;
array = new Slot[] {slot};
rejectedSlotVector.clear();
inserted = true;
- } else {
+ } else {
if (array.length == 0) {
throw new Error("Server Error: Did not send any slots");
}
}
}
- return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
+ return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
}
/**
* Returns false if a resize was needed
*/
- ThreeTuple<Boolean, int32_t, Boolean> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
+ ThreeTuple<bool, int32_t, bool> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
int newSize = 0;
if (liveSlotCount > bufferResizeThreshold) {
- resize = true; //Resize is forced
+ resize = true;//Resize is forced
}
doRejectedMessages(slot);
// Do mandatory rescue of entries
- ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
+ ThreeTuple<bool, bool, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
// Extract working variables
bool needsResize = mandatoryRescueReturn.getFirst();
if (needsResize && !resize) {
// We need to resize but we are not resizing so return false
- return new ThreeTuple<Boolean, int32_t, Boolean>(true, NULL, NULL);
+ return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
}
bool inserted = false;
// Set the transaction sequence number if it has yet to be inserted into the block chain
// if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
- // transaction.setSequenceNumber(slot.getSequenceNumber());
+ // transaction.setSequenceNumber(slot.getSequenceNumber());
// }
if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
// Fill the remainder of the slot with rescue data
doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
- return new ThreeTuple<Boolean, int32_t, Boolean>(false, newSize, inserted);
+ return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
}
void doRejectedMessages(Slot s) {
- if (! rejectedSlotVector.isEmpty()) {
+ if (!rejectedSlotVector.isEmpty()) {
/* TODO: We should avoid generating a rejected message entry if
* there is already a sufficient entry in the queue (e.g.,
* equalsto value of true and same sequence number). */
}
}
- ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, bool resize) {
+ ThreeTuple<bool, bool, Long> doMandatoryResuce(Slot slot, bool resize) {
int64_t newestSequenceNumber = buffer.getNewestSeqNum();
int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
Slot previousSlot = buffer.getSlot(currentSequenceNumber);
// Push slot number forward
- if (! seenLiveSlot) {
+ if (!seenLiveSlot) {
oldestLiveSlotSequenceNumver = currentSequenceNumber;
}
slot.addEntry(liveEntry);
} else if (currentSequenceNumber == firstIfFull) {
//if there's no space but the entry is about to fall off the queue
- System.out.println("B"); //?
- return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
+ System.out.println("B");//?
+ return new ThreeTuple<bool, bool, Long>(true, seenLiveSlot, currentSequenceNumber);
}
}
}
// Did not resize
- return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
+ return new ThreeTuple<bool, bool, Long>(false, seenLiveSlot, currentSequenceNumber);
}
void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
* for SKIP_THRESHOLD consecutive entries*/
int skipcount = 0;
int64_t newestseqnum = buffer.getNewestSeqNum();
- search:
+search:
for (; seqn <= newestseqnum; seqn++) {
Slot prevslot = buffer.getSlot(seqn);
//Push slot number forward
// Create the abort
Abort newAbort = new Abort(NULL,
- transaction.getClientLocalSequenceNumber(),
- transaction.getSequenceNumber(),
- transaction.getMachineId(),
- transaction.getArbitrator(),
- localArbitrationSequenceNumber);
+ transaction.getClientLocalSequenceNumber(),
+ transaction.getSequenceNumber(),
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
localArbitrationSequenceNumber++;
generatedAborts.add(newAbort);
}
}
- Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
+ Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction) {
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
if (!transaction.isComplete()) {
// Will arbitrate in incorrect order if we continue so just break
// Most likely this
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
if (transaction.getMachineId() != localMachineId) {
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
// We've have already seen this from the server
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
}
}
}
updateLiveStateFromLocal();
- return new Pair<Boolean, Boolean>(true, true);
+ return new Pair<bool, bool>(true, true);
} else {
if (transaction.getMachineId() == localMachineId) {
// Create the abort
Abort newAbort = new Abort(NULL,
- transaction.getClientLocalSequenceNumber(),
- -1,
- transaction.getMachineId(),
- transaction.getArbitrator(),
- localArbitrationSequenceNumber);
+ transaction.getClientLocalSequenceNumber(),
+ -1,
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
localArbitrationSequenceNumber++;
addAbortSet.add(newAbort);
}
updateLiveStateFromLocal();
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
}
return false;
}
// bool compactArbitrationData() {
- // return false;
+ // return false;
// }
/**
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
}
- commitsToEdit.remove(NULL); // remove NULL since it could be in this set
+ commitsToEdit.remove(NULL); // remove NULL since it could be in this set
// Update each previous commit that needs to be updated
for (Commit previousCommit : commitsToEdit) {
if (startIndex >= transactionSequenceNumbersSorted.size()) {
// Make sure we are not out of bounds
- return false; // did not speculate
+ return false; // did not speculate
}
Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
void updateLiveTransactionsAndStatus() {
// Go through each of the transactions
- for (Iterator<Map.Entry<int64_t Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t Transaction> > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
Transaction transaction = iter.next().getValue();
// Check if the transaction is dead
}
// Go through each of the transactions
- for (Iterator<Map.Entry<int64_t TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t TransactionStatus> > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
TransactionStatus status = iter.next().getValue();
// Check if the transaction is dead
// Create a list of clients to watch until they see this rejected message entry.
HashSet<Long> deviceWatchSet = new HashSet<Long>();
- for (Map.Entry<int64_t Pair<int64_t Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
+ for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
// Machine ID for the last message entry
int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
// Abort has not been seen by the client it is for yet so we need to keep track of it
Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
if (previouslySeenAbort != NULL) {
- previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
+ previouslySeenAbort.setDead();// Delete old version of the abort since we got a rescued newer version
}
if (entry.getTransactionArbitrator() == localMachineId) {
}
// Set dead the abort
- for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
+ for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
Abort abort = i.next().getValue();
if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
Slot currSlot = newSlots[i];
Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
if (prevSlot != NULL &&
- !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
+ !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
}
}