oldestLiveSlotSequenceNumver(1),
localMachineId(_localMachineId),
sequenceNumber(0),
+ localSequenceNumber(0),
localTransactionSequenceNumber(0),
lastTransactionSequenceNumberSpeculatedOn(0),
oldestTransactionSequenceNumberSpeculatedOn(0),
oldestLiveSlotSequenceNumver(1),
localMachineId(_localMachineId),
sequenceNumber(0),
+ localSequenceNumber(0),
localTransactionSequenceNumber(0),
lastTransactionSequenceNumberSpeculatedOn(0),
oldestTransactionSequenceNumberSpeculatedOn(0),
init();
}
+Table::~Table() {
+ delete cloud;
+ delete random;
+ delete buffer;
+ // init data structs
+ delete committedKeyValueTable;
+ delete speculatedKeyValueTable;
+ delete pendingTransactionSpeculatedKeyValueTable;
+ delete liveNewKeyTable;
+ delete lastMessageTable;
+ delete rejectedMessageWatchVectorTable;
+ delete arbitratorTable;
+ delete liveAbortTable;
+ delete newTransactionParts;
+ delete newCommitParts;
+ delete lastArbitratedTransactionNumberByArbitratorTable;
+ delete liveTransactionBySequenceNumberTable;
+ delete liveTransactionByTransactionIdTable;
+ delete liveCommitsTable;
+ delete liveCommitsByKeyTable;
+ delete lastCommitSeenSequenceNumberByArbitratorTable;
+ delete rejectedSlotVector;
+ delete pendingTransactionQueue;
+ delete pendingSendArbitrationEntriesToDelete;
+ delete transactionPartsSent;
+ delete outstandingTransactionStatus;
+ delete liveAbortsGeneratedByLocal;
+ delete offlineTransactionsCommittedAndAtServer;
+ delete localCommunicationTable;
+ delete lastTransactionSeenFromMachineFromServer;
+ delete pendingSendArbitrationRounds;
+ delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+}
+
/**
* Init all the stuff needed for for table usage
*/
liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
- arbitratorTable = new Hashtable<IoTString *, int64_t>();
+ arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
while (true) {
- if (!arbitratorTable->contains(keyName)) {
+ if (arbitratorTable->contains(keyName)) {
// There is already an arbitrator
return false;
}
Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
uint size = pendingTransactionQueue->size();
uint oldindex = 0;
- for (int iter = 0; iter < size; iter++) {
+ for (uint iter = 0; iter < size; iter++) {
Transaction *transaction = pendingTransactionQueue->get(iter);
pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
Array<Slot *> *array = cloud->putSlot(slot, newSize);
if (array == NULL) {
- array = new Array<Slot *>();
+ array = new Array<Slot *>(1);
array->set(0, slot);
rejectedSlotVector->clear();
inserted = true;
s->addEntry(rm);
} else {
int64_t prev_seqn = -1;
- int i = 0;
+ uint i = 0;
/* Go through list of missing messages */
for (; i < rejectedSlotVector->size(); i++) {
int64_t curr_seqn = rejectedSlotVector->get(i);
bool hadCommit = (lastRound->getCommit() == NULL);
bool gotNewCommit = false;
- int numberToDelete = 1;
+ uint numberToDelete = 1;
while (numberToDelete < pendingSendArbitrationRounds->size()) {
ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
if (numberToDelete == pendingSendArbitrationRounds->size()) {
pendingSendArbitrationRounds->clear();
} else {
- for (int i = 0; i < numberToDelete; i++) {
+ for (uint i = 0; i < numberToDelete; i++) {
pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
}
}
}
// Go through each new commit one by one
- for (int i = 0; i < commitSequenceNumbers->size(); i++) {
+ for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
Commit *commit = commitForClientTable->get(commitSequenceNumber);
// have live values for their keys
Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
{
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
// Update which keys in the old commits are still live
{
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
previousCommit->invalidateKey(kv->getKey());
// Update the committed table of keys and which commit is using which key
{
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
committedKeyValueTable->put(kv->getKey(), kv);
oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
// Find where to start arbitration from
- int startIndex = 0;
+ uint startIndex = 0;
for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
bool didSkip = true;
- for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
+ for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
}
// Find where to start arbitration from
- int startIndex = 0;
+ uint startIndex = 0;
for (; startIndex < pendingTransactionQueue->size(); startIndex++)
if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
return;
}
- for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
+ for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
Transaction *transaction = pendingTransactionQueue->get(i);
lastPendingTransactionSpeculatedOn = transaction;
* Check if the HMAC chain is not violated
*/
void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
- for (int i = 0; i < newSlots->length(); i++) {
+ for (uint i = 0; i < newSlots->length(); i++) {
Slot *currSlot = newSlots->get(i);
Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
if (prevSlot != NULL &&