edits
authorbdemsky <bdemsky@uci.edu>
Tue, 13 Mar 2018 18:57:20 +0000 (11:57 -0700)
committerbdemsky <bdemsky@uci.edu>
Tue, 13 Mar 2018 18:57:20 +0000 (11:57 -0700)
version2/src/C/ByteBuffer.h
version2/src/C/CloudComm.cc
version2/src/C/IoTString.h
version2/src/C/NewKey.cc
version2/src/C/Slot.cc
version2/src/C/Table.cc
version2/src/C/Table.h
version2/src/C/Test.C
version2/src/C/Transaction.cc

index 7b8d2e4ed9c71295af4e6761aaedd7cdc6d7e039..06c309bf885a088ff0521f4da6754f0e4f36b86d 100644 (file)
@@ -13,8 +13,11 @@ public:
        char get();
        void get(Array<char> *array);
        void position(int32_t newPosition);
+       void releaseArray() {buffer = NULL;}
        Array<char> *array();
-private:
+       ~ByteBuffer() {if (buffer != NULL) delete buffer;}
+       
+ private:
        ByteBuffer(Array<char> *array);
        friend ByteBuffer *ByteBuffer_wrap(Array<char> *array);
        friend ByteBuffer *ByteBuffer_allocate(uint size);
index c20488dd268d59632d09634068961b918a9a878c..2a7d233608d9498679c1348bbf3530324ebb73bb 100644 (file)
@@ -499,13 +499,10 @@ Array<char> *AESDecrypt(Array<char> *ivBytes, AESKey *key, Array<char> *data) {
 Array<char> *CloudComm::encryptSlotAndPrependIV(Array<char> *rawData, Array<char> *ivBytes) {
        try {
                Array<char> *encryptedBytes = AESEncrypt(ivBytes, key, rawData);
-               Array<char> *origBytes = AESDecrypt(ivBytes, key, encryptedBytes);
-               if (!rawData->equals(origBytes))
-                       throw new Error("BAD");
                Array<char> *chars = new Array<char>(encryptedBytes->length() + CloudComm_IV_SIZE);
                System_arraycopy(ivBytes, 0, chars, 0, ivBytes->length());
                System_arraycopy(encryptedBytes, 0, chars, CloudComm_IV_SIZE, encryptedBytes->length());
-
+               delete encryptedBytes;
                return chars;
        } catch (Exception *e) {
                throw new Error("Failed To Encrypt");
@@ -518,7 +515,10 @@ Array<char> *CloudComm::stripIVAndDecryptSlot(Array<char> *rawData) {
                Array<char> *encryptedBytes = new Array<char>(rawData->length() - CloudComm_IV_SIZE);
                System_arraycopy(rawData, 0, ivBytes, 0, CloudComm_IV_SIZE);
                System_arraycopy(rawData, CloudComm_IV_SIZE, encryptedBytes, 0, encryptedBytes->length());
-               return AESDecrypt(ivBytes, key, encryptedBytes);
+               Array<char> * data = AESDecrypt(ivBytes, key, encryptedBytes);
+               delete encryptedBytes;
+               delete ivBytes;
+               return data;
        } catch (Exception *e) {
                throw new Error("Failed To Decrypt");
        }
@@ -542,10 +542,12 @@ Array<Slot *> *CloudComm::putSlot(Slot *slot, int max) {
                int64_t sequencenumber = slot->getSequenceNumber();
                Array<char> *slotBytes = slot->encode(mac);
                Array<char> *chars = encryptSlotAndPrependIV(slotBytes, slot->getSlotCryptIV());
+               delete slotBytes;
                IoTString *url = buildRequest(true, sequencenumber, max);
                timer->startTime();
                wc = openURL(url);
                writeURLDataAndClose(&wc, chars);
+               delete chars;
                timer->endTime();
        } catch (ServerException *e) {
                timer->endTime();
@@ -652,7 +654,9 @@ Array<Slot *> *CloudComm::processSlots(WebConnection *wc) {
                Array<char> *rawData = new Array<char>(sizesofslots->get(i));
                readURLData(wc, rawData);
                Array<char> *data = stripIVAndDecryptSlot(rawData);
+               delete rawData;
                slots->set(i, Slot_decode(table, data, mac));
+               delete data;
        }
        return slots;
 }
index 1fb527f03a9977233377a1796c0c2c47e656993c..583a098a07df5aff5f663d4e8299d56c1258497a 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "array.h"
 #include <string.h>
+#include <stdio.h>
 /**
  * IoTString wraps the underlying char string.
  * @author Brian Demsky <bdemsky@uci.edu>
@@ -13,7 +14,7 @@ inline unsigned int hashCharArray(Array<char> *array) {
        uint len = array->length();
        unsigned int hash = 0;
        for (uint i = 0; i < len; i++) {
-               hash = 31 * hash + array->get(i);
+               hash = 31 * hash + ((unsigned int) array->get(i));
        }
        return hash;
 }
@@ -42,14 +43,19 @@ public:
        }
 
        IoTString(IoTString *string) :
-               array(new Array<char>(string->array)),
-               hashvalue(hashCharArray(array)) {
+         array(new Array<char>(string->array)),
+               hashvalue(string->hashvalue) {
        }
 
        ~IoTString() {
                delete array;
        }
 
+       void print() {
+               for(uint i=0; i < array->length(); i++)
+                       printf("%c", array->get(i));
+       };
+       
        char get(uint i) {return array->get(i);}
 
        /**
@@ -69,6 +75,7 @@ public:
         * Returns the length in chars of the IoTString.
         */
 
+       
        bool equals(IoTString *str) {
                uint strlength = str->array->length();
                uint thislength = array->length();
@@ -87,6 +94,7 @@ public:
 inline IoTString *IoTString_shallow(Array<char> *_array) {
        IoTString *str = new IoTString();
        str->array = _array;
+       str->hashvalue = hashCharArray(_array);
        return str;
 }
 
index a302a78d40877bc6c436691520aacb4036e9ba3e..a3a1a87c8dbe81007c5711b0b1a219b83c85116d 100644 (file)
@@ -17,8 +17,10 @@ Entry *NewKey_decode(Slot *slot, ByteBuffer *bb) {
        Array<char> *key = new Array<char>(keylength);
        bb->get(key);
        int64_t machineid = bb->getLong();
-
-       return new NewKey(slot, IoTString_shallow(key), machineid);
+       IoTString *str = IoTString_shallow(key);
+       NewKey *newkey = new NewKey(slot, str, machineid);
+       delete str;
+       return newkey;
 }
 
 Entry *NewKey::getCopy(Slot *s) { return new NewKey(s, new IoTString(key), machineid); }
index 10c92ded4e5a592518ab3e7dba9f3c89c945690c..03aa3cc1a8f12553774e16e7e7cb5965ed4447de 100644 (file)
@@ -93,7 +93,8 @@ Slot *Slot_decode(Table *table, Array<char> *array, Mac *mac) {
        for (int i = 0; i < numentries; i++) {
                slot->addShallowEntry(Entry_decode(slot, bb));
        }
-
+       bb->releaseArray();
+       delete bb;
        return slot;
 }
 
@@ -120,6 +121,8 @@ Array<char> *Slot::encode(Mac *mac) {
        hmac = realmac;
        bb->position(0);
        bb->put(realmac);
+       bb->releaseArray();
+       delete bb;
        return array;
 }
 
index 88f43ee75ddef8c3feb817c91074c07fbcec71ec..a5660091fd614018bc10f7966e0b8cee66c83308 100644 (file)
@@ -168,15 +168,68 @@ Table::~Table() {
        delete pendingTransactionSpeculatedKeyValueTable;
        delete liveNewKeyTable;
        delete lastMessageTable;
-       delete rejectedMessageWatchVectorTable;
+       {
+               SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
+               while(rmit->hasNext()) {
+                       int64_t machineid = rmit->next();
+                       Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
+                       SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
+                       while (mit->hasNext()) {
+                               RejectedMessage * rm = mit->next();
+                               delete rm;
+                       }
+                       delete mit;
+                       delete rmset;
+               }
+               delete rmit;
+               delete rejectedMessageWatchVectorTable;
+       }
        delete arbitratorTable;
        delete liveAbortTable;
-       delete newTransactionParts;
-       delete newCommitParts;
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+                       delete parts;
+               }
+               delete partsit;
+               delete newTransactionParts;
+       }
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
+                       delete parts;
+               }
+               delete partsit;
+               delete newCommitParts;
+       }
        delete lastArbitratedTransactionNumberByArbitratorTable;
        delete liveTransactionBySequenceNumberTable;
        delete liveTransactionByTransactionIdTable;
-       delete liveCommitsTable;
+       {
+               SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
+               while (liveit->hasNext()) {
+                       int64_t arbitratorId = liveit->next();
+                       
+                       // Get all the commits for a specific arbitrator
+                       Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
+                       {
+                               SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
+                               while (clientit->hasNext()) {
+                                       int64_t id = clientit->next();
+                                       delete commitForClientTable->get(id);
+                               }
+                               delete clientit;
+                       }
+                       
+                       delete commitForClientTable;
+               }
+               delete liveit;
+               delete liveCommitsTable;
+       }
        delete liveCommitsByKeyTable;
        delete lastCommitSeenSequenceNumberByArbitratorTable;
        delete rejectedSlotVector;
@@ -189,6 +242,8 @@ Table::~Table() {
        delete localCommunicationTable;
        delete lastTransactionSeenFromMachineFromServer;
        delete pendingSendArbitrationRounds;
+       if (lastTransactionPartsSent != NULL)
+               delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
 }
 
@@ -253,10 +308,13 @@ void Table::initTable() {
                array->set(0, s);
                // update local block chain
                validateAndUpdate(array, true);
+               delete array;
        } else if (array->length() == 1) {
                // in case we did push the slot BUT we failed to init it
                validateAndUpdate(array, true);
+               delete array;
        } else {
+               delete array;
                throw new Error("Error on initialization");
        }
 }
@@ -269,6 +327,7 @@ void Table::rebuild() {
        // Just pull the latest slots from the server
        Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
        validateAndUpdate(newslots, true);
+       delete newslots;
        sendToServer(NULL);
        updateLiveTransactionsAndStatus();
 }
@@ -289,7 +348,7 @@ IoTString *Table::getCommitted(IoTString *key)  {
        KeyValue *kv = committedKeyValueTable->get(key);
 
        if (kv != NULL) {
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                return NULL;
        }
@@ -307,7 +366,7 @@ IoTString *Table::getSpeculative(IoTString *key) {
        }
 
        if (kv != NULL) {
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                return NULL;
        }
@@ -328,7 +387,7 @@ IoTString *Table::getCommittedAtomic(IoTString *key) {
 
        if (kv != NULL) {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
                return NULL;
@@ -358,7 +417,7 @@ IoTString *Table::getSpeculativeAtomic(IoTString *key) {
 
        if (kv != NULL) {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
                return NULL;
@@ -369,6 +428,7 @@ bool Table::update()  {
        try {
                Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
                validateAndUpdate(newSlots, false);
+               delete newSlots;
                sendToServer(NULL);
                updateLiveTransactionsAndStatus();
                return true;
@@ -404,8 +464,7 @@ void Table::startTransaction() {
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 }
 
-void Table::addKV(IoTString *key, IoTString *value) {
-
+void Table::put(IoTString *key, IoTString *value) {
        // Make sure it is a valid key
        if (!arbitratorTable->contains(key)) {
                throw new Error("Key not Found.");
@@ -418,7 +477,7 @@ void Table::addKV(IoTString *key, IoTString *value) {
        }
 
        // Add the key value to this transaction
-       KeyValue *kv = new KeyValue(key, value);
+       KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
        pendingTransactionBuilder->addKV(kv);
 }
 
@@ -590,7 +649,6 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                        // insert into the local block chain
                        validateAndUpdate(newSlots, true);
                }
-               // continue;
        } else {
                if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                        if (newKey != NULL) {
@@ -642,6 +700,7 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                // insert into the local block chain
                validateAndUpdate(newSlots, true);
        }
+       delete newSlots;
        return newKey;
 }
 
@@ -698,6 +757,8 @@ bool Table::sendToServer(NewKey *newKey) {
                        lastInsertedNewKey = insertedNewKey;
                        lastNewSize = newSize;
                        lastNewKey = newKey;
+                       if (lastTransactionPartsSent != NULL)
+                               delete lastTransactionPartsSent;
                        lastTransactionPartsSent = transactionPartsSent->clone();
                        lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
 
@@ -772,6 +833,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                // insert into the local block chain
                                validateAndUpdate(newSlots, true);
                        }
+                       delete newSlots;
                }
        } catch (ServerException *e) {
                if (e->getType() != ServerException_TypeInputTimeout) {
@@ -1426,7 +1488,8 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                        updateExpectedSize();
                }
        }
-
+       delete indexer;
+       
        // If there is a gap, check to see if the server sent us
        // everything->
        if (firstSeqNum != (sequenceNumber + 1)) {
@@ -1607,7 +1670,16 @@ void Table::processNewTransactionParts() {
        delete tpit;
        // Clear all the new transaction parts in preparation for the next
        // time the server sends slots
-       newTransactionParts->clear();
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+                       delete parts;
+               }
+               delete partsit;
+               newTransactionParts->clear();
+       }
 }
 
 void Table::arbitrateFromServer() {
@@ -1718,7 +1790,7 @@ void Table::arbitrateFromServer() {
                        newCommit->addKV(kv);
                }
                delete spit;
-
+               
                // create the commit parts
                newCommit->createCommitParts();
 
@@ -1732,6 +1804,7 @@ void Table::arbitrateFromServer() {
                        processEntry(commitPart);
                }
        }
+       delete speculativeTableTmp;
 
        if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
                ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
@@ -1999,6 +2072,7 @@ bool Table::updateCommittedTable() {
                        commit->addPartDecode(part);
                }
                delete pairit;
+               delete parts;
        }
        delete partsit;
 
@@ -2052,6 +2126,7 @@ bool Table::updateCommittedTable() {
                                        // Delete it and move on
                                        commit->setDead();
                                        commitForClientTable->remove(commit->getSequenceNumber());
+                                       delete commit;
                                        continue;
                                }
                        }
@@ -2129,6 +2204,7 @@ bool Table::updateCommittedTable() {
                                        // if the commit is now dead then remove it
                                        if (!previousCommit->isLive()) {
                                                commitForClientTable->remove(previousCommit->getSequenceNumber());
+                                               delete previousCommit;
                                        }
                                }
                        }
@@ -2328,6 +2404,7 @@ void Table::updateLiveTransactionsAndStatus() {
                                // Remove the transaction from the live table
                                iter->remove();
                                liveTransactionByTransactionIdTable->remove(transaction->getId());
+                               delete transaction;
                        }
                }
                delete iter;
@@ -2655,6 +2732,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                                rmit->remove();
                                // Decrement machines that need to see this notification
                                rm->removeWatcher(machineId);
+                               delete rm;
                        }
                }
                delete rmit;
index 59ba47216191b268189894ee76604287316d2599..dfe188652d4192c95a3c13455685896c647ec5c8 100644 (file)
@@ -267,7 +267,7 @@ public:
        bool update();
        bool createNewKey(IoTString *keyName, int64_t machineId);
        void startTransaction();
-       void addKV(IoTString *key, IoTString *value);
+       void put(IoTString *key, IoTString *value);
        TransactionStatus *commitTransaction();
 
        /**
index 14fec0d44d627a113e088561e5060815f1c1f7d0..7b7733ca287a6bbe538548c214f4da302c22dfb9 100644 (file)
@@ -37,6 +37,10 @@ int main(int numargs, char ** args) {
                t1->createNewKey(ib, 351);
                t2->createNewKey(ic, 321);
                t2->createNewKey(id, 351);
+               delete ia;
+               delete ib;
+               delete ic;
+               delete id;
        }
        
        // Do Updates for the keys
@@ -61,20 +65,26 @@ int main(int numargs, char ** args) {
                IoTString * iValueD = new IoTString(buffer);
 
                t1->startTransaction();
-               t1->addKV(iKeyA, iValueA);
+               t1->put(iKeyA, iValueA);
                transStatusList->add(t1->commitTransaction());
+               delete iKeyA; delete iValueA;
+               
                t1->startTransaction();
-               t1->addKV(iKeyB, iValueB);
+               t1->put(iKeyB, iValueB);
                transStatusList->add(t1->commitTransaction());
+               delete iKeyB; delete iValueB;
                
                t2->startTransaction();
-               t2->addKV(iKeyC, iValueC);
+               t2->put(iKeyC, iValueC);
                transStatusList->add(t2->commitTransaction());
+               delete iKeyC; delete iValueC;
                
                t2->startTransaction();
-               t2->addKV(iKeyD, iValueD);
+               t2->put(iKeyD, iValueD);
                transStatusList->add(t2->commitTransaction());
+               delete iKeyD; delete iValueD;
        }
+       
        printf("Updating Clients...\n");
        t1->update();
        t2->update();
@@ -147,6 +157,14 @@ int main(int numargs, char ** args) {
                        printf("Key-Value t2 incorrect: keyD     testValD2\n");
                        foundError = true;
                }
+               delete iKeyA; delete iValueA;
+               delete iKeyB; delete iValueB;
+               delete iKeyC; delete iValueC;
+               delete iKeyD; delete iValueD;
+               delete testValA1; delete testValA2;
+               delete testValB1; delete testValB2;
+               delete testValC1; delete testValC2;
+               delete testValD1; delete testValD2;
        }
 
        for (uint i = 0; i < transStatusList->size(); i++) {
@@ -162,5 +180,9 @@ int main(int numargs, char ** args) {
        } else {
                printf("No Errors Found...\n");
        }
+
+       delete transStatusList;
+       delete t1;
+       delete t2;
 }
 
index 1f862df5c42a46e392f0e4663737427baa7c60bd..d316aa10985723556b2c7c2ccc3c9ab10696d854 100644 (file)
@@ -20,6 +20,9 @@ Transaction::Transaction() :
        arbitratorId(-1),
        machineId(-1),
        transactionId(Pair<int64_t, int64_t>(0,0)),
+       nextPartToSend(0),
+       flddidSendAPartToServer(false),
+       transactionStatus(NULL),
        hadServerFailure(false) {
 }
 
@@ -34,7 +37,6 @@ void Transaction::addPartEncode(TransactionPart *newPart) {
        transactionId = newPart->getTransactionId();
        clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
        machineId = newPart->getMachineId();
-
        fldisComplete = true;
 }