Bug fixes + tabbing
[iotcloud.git] / version2 / src / C / Table.cc
index d70a590890b170fac9417fc6ab049437304de286..152b3778cb81003ece942947f1e20d95227d986a 100644 (file)
@@ -46,6 +46,7 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i
        oldestLiveSlotSequenceNumver(1),
        localMachineId(_localMachineId),
        sequenceNumber(0),
+       localSequenceNumber(0),
        localTransactionSequenceNumber(0),
        lastTransactionSequenceNumberSpeculatedOn(0),
        oldestTransactionSequenceNumberSpeculatedOn(0),
@@ -108,6 +109,7 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
        oldestLiveSlotSequenceNumver(1),
        localMachineId(_localMachineId),
        sequenceNumber(0),
+       localSequenceNumber(0),
        localTransactionSequenceNumber(0),
        lastTransactionSequenceNumberSpeculatedOn(0),
        oldestTransactionSequenceNumberSpeculatedOn(0),
@@ -156,6 +158,40 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
        init();
 }
 
+Table::~Table() {
+       delete cloud;
+       delete random;
+       delete buffer;
+       // init data structs
+       delete committedKeyValueTable;
+       delete speculatedKeyValueTable;
+       delete pendingTransactionSpeculatedKeyValueTable;
+       delete liveNewKeyTable;
+       delete lastMessageTable;
+       delete rejectedMessageWatchVectorTable;
+       delete arbitratorTable;
+       delete liveAbortTable;
+       delete newTransactionParts;
+       delete newCommitParts;
+       delete lastArbitratedTransactionNumberByArbitratorTable;
+       delete liveTransactionBySequenceNumberTable;
+       delete liveTransactionByTransactionIdTable;
+       delete liveCommitsTable;
+       delete liveCommitsByKeyTable;
+       delete lastCommitSeenSequenceNumberByArbitratorTable;
+       delete rejectedSlotVector;
+       delete pendingTransactionQueue;
+       delete pendingSendArbitrationEntriesToDelete;
+       delete transactionPartsSent;
+       delete outstandingTransactionStatus;
+       delete liveAbortsGeneratedByLocal;
+       delete offlineTransactionsCommittedAndAtServer;
+       delete localCommunicationTable;
+       delete lastTransactionSeenFromMachineFromServer;
+       delete pendingSendArbitrationRounds;
+       delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+}
+
 /**
  * Init all the stuff needed for for table usage
  */
@@ -171,7 +207,7 @@ void Table::init() {
        liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
        lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
        rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
-       arbitratorTable = new Hashtable<IoTString *, int64_t>();
+       arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
        liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
        newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
        newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
@@ -350,7 +386,7 @@ bool Table::update()  {
 
 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
        while (true) {
-               if (!arbitratorTable->contains(keyName)) {
+               if (arbitratorTable->contains(keyName)) {
                        // There is already an arbitrator
                        return false;
                }
@@ -420,7 +456,7 @@ TransactionStatus *Table::commitTransaction() {
                Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
                uint size = pendingTransactionQueue->size();
                uint oldindex = 0;
-               for (int iter = 0; iter < size; iter++) {
+               for (uint iter = 0; iter < size; iter++) {
                        Transaction *transaction = pendingTransactionQueue->get(iter);
                        pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
 
@@ -1123,7 +1159,7 @@ ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int
 
        Array<Slot *> *array = cloud->putSlot(slot, newSize);
        if (array == NULL) {
-               array = new Array<Slot *>();
+               array = new Array<Slot *>(1);
                array->set(0, slot);
                rejectedSlotVector->clear();
                inserted = true;
@@ -1307,7 +1343,7 @@ void Table::doRejectedMessages(Slot *s) {
                        s->addEntry(rm);
                } else {
                        int64_t prev_seqn = -1;
-                       int i = 0;
+                       uint i = 0;
                        /* Go through list of missing messages */
                        for (; i < rejectedSlotVector->size(); i++) {
                                int64_t curr_seqn = rejectedSlotVector->get(i);
@@ -1917,7 +1953,7 @@ bool Table::compactArbitrationData() {
        bool hadCommit = (lastRound->getCommit() == NULL);
        bool gotNewCommit = false;
 
-       int numberToDelete = 1;
+       uint numberToDelete = 1;
        while (numberToDelete < pendingSendArbitrationRounds->size()) {
                ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
 
@@ -1968,7 +2004,7 @@ bool Table::compactArbitrationData() {
                if (numberToDelete == pendingSendArbitrationRounds->size()) {
                        pendingSendArbitrationRounds->clear();
                } else {
-                       for (int i = 0; i < numberToDelete; i++) {
+                       for (uint i = 0; i < numberToDelete; i++) {
                                pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
                        }
                }
@@ -2067,7 +2103,7 @@ bool Table::updateCommittedTable() {
                }
 
                // Go through each new commit one by one
-               for (int i = 0; i < commitSequenceNumbers->size(); i++) {
+               for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
                        int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
                        Commit *commit = commitForClientTable->get(commitSequenceNumber);
 
@@ -2130,7 +2166,7 @@ bool Table::updateCommittedTable() {
                        // have live values for their keys
                        Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
                        {
-                               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+                               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
                                while (kvit->hasNext()) {
                                        KeyValue *kv = kvit->next();
                                        Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
@@ -2150,7 +2186,7 @@ bool Table::updateCommittedTable() {
 
                                        // Update which keys in the old commits are still live
                                        {
-                                               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+                                               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
                                                while (kvit->hasNext()) {
                                                        KeyValue *kv = kvit->next();
                                                        previousCommit->invalidateKey(kv->getKey());
@@ -2180,7 +2216,7 @@ bool Table::updateCommittedTable() {
 
                        // Update the committed table of keys and which commit is using which key
                        {
-                               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+                               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
                                while (kvit->hasNext()) {
                                        KeyValue *kv = kvit->next();
                                        committedKeyValueTable->put(kv->getKey(), kv);
@@ -2236,7 +2272,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
        oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
 
        // Find where to start arbitration from
-       int startIndex = 0;
+       uint startIndex = 0;
 
        for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
                if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
@@ -2251,7 +2287,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
        Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
        bool didSkip = true;
 
-       for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
+       for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
                int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
                Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
 
@@ -2311,7 +2347,7 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr
        }
 
        // Find where to start arbitration from
-       int startIndex = 0;
+       uint startIndex = 0;
 
        for (; startIndex < pendingTransactionQueue->size(); startIndex++)
                if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
@@ -2322,7 +2358,7 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr
                return;
        }
 
-       for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
+       for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
                Transaction *transaction = pendingTransactionQueue->get(i);
 
                lastPendingTransactionSpeculatedOn = transaction;
@@ -2779,7 +2815,7 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
  * Check if the HMAC chain is not violated
  */
 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
-       for (int i = 0; i < newSlots->length(); i++) {
+       for (uint i = 0; i < newSlots->length(); i++) {
                Slot *currSlot = newSlots->get(i);
                Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
                if (prevSlot != NULL &&