edits
authorbdemsky <bdemsky@uci.edu>
Fri, 16 Mar 2018 15:08:34 +0000 (08:08 -0700)
committerbdemsky <bdemsky@uci.edu>
Fri, 16 Mar 2018 15:08:34 +0000 (08:08 -0700)
16 files changed:
version2/src/C/ArbitrationRound.cc
version2/src/C/CloudComm.cc
version2/src/C/Commit.cc
version2/src/C/Entry.h
version2/src/C/NewKey.cc
version2/src/C/PendingTransaction.cc
version2/src/C/Slot.cc
version2/src/C/Slot.h
version2/src/C/SlotBuffer.h
version2/src/C/Table.cc
version2/src/C/Table.h
version2/src/C/Test.C
version2/src/C/Transaction.cc
version2/src/C/TransactionPart.cc
version2/src/C/TransactionPart.h
version2/src/C/hashset.h

index 543c2d08385edd4d0e9090d3fea8a95c6c72f7c8..159aeaea39ccb1bcac682154a4469b2046407c53 100644 (file)
@@ -28,7 +28,7 @@ void ArbitrationRound::generateParts() {
        if (didGenerateParts) {
                return;
        }
-       parts = new Vector<Entry *>();
+       parts->clear();
        SetIterator<Abort *, Abort *> *abit = abortsBefore->iterator();
        while (abit->hasNext())
                parts->add((Entry *)abit->next());
index c229259140501fa1384cb74eb1fa49824b58a5c9..8fe0ceeb256eddc802a076f3e6289798f0e5a90d 100644 (file)
@@ -46,7 +46,7 @@ void *threadWrapper(void *cloud) {
  * Constructor for actual use. Takes in the url and password.
  */
 CloudComm::CloudComm(Table *_table,  IoTString *_baseurl, IoTString *_password, int _listeningPort) :
-       baseurl(_baseurl),
+       baseurl(new IoTString(_baseurl)),
        key(NULL),
        mac(NULL),
        password(new IoTString(_password)),
@@ -74,6 +74,10 @@ CloudComm::~CloudComm() {
                delete random;
        if (baseurl)
                delete baseurl;
+       if (mac)
+               delete mac;
+       if (key)
+               delete key;
 }
 
 /**
@@ -135,6 +139,7 @@ IoTString *CloudComm::buildRequest(bool isput, int64_t sequencenumber, int64_t m
        if (maxentries != 0)
                sprintf(&buffer[offset], "&max=%" PRId64, maxentries);
        IoTString *urlstr = new IoTString(buffer);
+       free(buffer);
        return urlstr;
 }
 
@@ -225,6 +230,7 @@ WebConnection openURL(IoTString *url) {
        /* send the request */
        int total = strlen(message);
        loopWrite(sockfd, message, total);
+       free(message);
        return (WebConnection) {sockfd, -1};
 }
 
@@ -411,6 +417,7 @@ void CloudComm::setSalt() {
 
                timer->startTime();
                wc = openURL(urlstr);
+               delete urlstr;
                writeURLDataAndClose(&wc, saltTmp);
 
                int responsecode = getResponseCode(&wc);
@@ -554,11 +561,14 @@ Array<Slot *> *CloudComm::putSlot(Slot *slot, int max) {
 
                int64_t sequencenumber = slot->getSequenceNumber();
                Array<char> *slotBytes = slot->encode(mac);
-               Array<char> *chars = encryptSlotAndPrependIV(slotBytes, slot->getSlotCryptIV());
+               Array<char> * ivBytes = slot->getSlotCryptIV();
+               Array<char> *chars = encryptSlotAndPrependIV(slotBytes, ivBytes);
+               delete ivBytes;
                delete slotBytes;
                IoTString *url = buildRequest(true, sequencenumber, max);
                timer->startTime();
                wc = openURL(url);
+               delete url;
                writeURLDataAndClose(&wc, chars);
                delete chars;
                timer->endTime();
@@ -625,6 +635,7 @@ Array<Slot *> *CloudComm::getSlots(int64_t sequencenumber) {
                IoTString *url = buildRequest(false, sequencenumber, 0);
                timer->startTime();
                wc = openURL(url);
+               delete url;
                closeURLReq(&wc);
                timer->endTime();
        } catch (SocketTimeoutException *e) {
index a24e0148df376b6180c7bd574ff499c6c6b37519..2ee208d5fd7397995e5f39e452a9ec44825e7308 100644 (file)
@@ -33,8 +33,15 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction
 
 Commit::~Commit() {
        delete parts;
-       delete keyValueUpdateSet;
-       delete liveKeys;
+       {
+               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
+               while(keyit->hasNext()) {
+                       delete keyit->next();
+               }
+               delete keyit;
+               delete keyValueUpdateSet;
+       }
+       delete liveKeys;
        if (missingParts != NULL)
                delete missingParts;
 }
@@ -99,8 +106,9 @@ Vector<CommitPart *> *Commit::getParts() {
 }
 
 void Commit::addKV(KeyValue *kv) {
-       keyValueUpdateSet->add(kv);
-       liveKeys->add(kv->getKey());
+       KeyValue * kvcopy = kv->getCopy();
+       keyValueUpdateSet->add(kvcopy);
+       liveKeys->add(kvcopy->getKey());
 }
 
 void Commit::invalidateKey(IoTString *key) {
@@ -166,6 +174,7 @@ void Commit::createCommitParts() {
                commitPartCount++;
                remaining -= copySize;
        }
+       delete charData;
 }
 
 void Commit::decodeCommitData() {
@@ -201,6 +210,7 @@ void Commit::decodeCommitData() {
                keyValueUpdateSet->add(kv);
                liveKeys->add(kv->getKey());
        }
+       delete bbDecode;
 }
 
 Array<char> *Commit::convertDataToBytes() {
@@ -227,8 +237,10 @@ Array<char> *Commit::convertDataToBytes() {
                kv->encode(bbEncode);
        }
        delete kvit;
-
-       return bbEncode->array();
+       Array<char> * array = bbEncode->array();
+       bbEncode->releaseArray();
+       delete bbEncode;
+       return array;
 }
 
 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
@@ -237,8 +249,9 @@ void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueE
        SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
        while (kvit->hasNext()) {
                KeyValue *kv = kvit->next();
-               liveKeys->add(kv->getKey());
-               keyValueUpdateSet->add(kv);
+               KeyValue *kvcopy = kv->getCopy();
+               liveKeys->add(kvcopy->getKey());
+               keyValueUpdateSet->add(kvcopy);
        }
        delete kvit;
 }
index acdd04a8a0174d431516ac13866a4ee45ab0e054..2a0c9f42a8c230fea2a2d3d8bcdb7704b5e3670c 100644 (file)
@@ -29,7 +29,7 @@ protected:
 
 public:
        Entry(Slot *_parentslot) : islive(true), parentslot(_parentslot) {}
-
+       virtual ~Entry() {}
        /**
         * Returns true if the Entry object is still live.
         */
index a3a1a87c8dbe81007c5711b0b1a219b83c85116d..0b35933b85750ba375414a8b18a64be8987c4559 100644 (file)
@@ -23,7 +23,7 @@ Entry *NewKey_decode(Slot *slot, ByteBuffer *bb) {
        return newkey;
 }
 
-Entry *NewKey::getCopy(Slot *s) { return new NewKey(s, new IoTString(key), machineid); }
+Entry *NewKey::getCopy(Slot *s) { return new NewKey(s, key, machineid); }
 
 void NewKey::encode(ByteBuffer *bb) {
        bb->put(TypeNewKey);
index f4532190bca1e43d490cc4717c200f1827c1a98d..45602a0e9ae361a195bfbcb7e54c4ddc1375430f 100644 (file)
@@ -137,7 +137,8 @@ Transaction *PendingTransaction::createTransaction() {
                transactionPartCount++;
                remaining -= copySize;
        }
-
+       delete charData;
+       
        // Add the Guard Conditions
        SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
        while (kvit->hasNext()) {
@@ -185,5 +186,8 @@ Array<char> *PendingTransaction::convertDataToBytes() {
        }
        delete kvit;
 
-       return bbEncode->array();
+       Array<char> *array = bbEncode->array();
+       bbEncode->releaseArray();
+       delete bbEncode;
+       return array;
 }
index 4a6f268d016ac2d4418b0bb100274b7449c8a0ea..644854e56d6bdc145b5b7577ff7b647b18560f0a 100644 (file)
@@ -17,7 +17,8 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_pre
        seqnumlive(true),
        freespace(SLOT_SIZE - getBaseSize()),
        table(_table),
-       localSequenceNumber(_localSequenceNumber) {
+       localSequenceNumber(_localSequenceNumber),
+       fakeLastMessage(NULL) {
 }
 
 Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, int64_t _localSequenceNumber) :
@@ -30,7 +31,8 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_pre
        seqnumlive(true),
        freespace(SLOT_SIZE - getBaseSize()),
        table(_table),
-       localSequenceNumber(_localSequenceNumber) {
+       localSequenceNumber(_localSequenceNumber),
+       fakeLastMessage(NULL) {
 }
 
 Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) :
@@ -43,7 +45,8 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSeq
        seqnumlive(true),
        freespace(SLOT_SIZE - getBaseSize()),
        table(_table),
-       localSequenceNumber(_localSequenceNumber) {
+       localSequenceNumber(_localSequenceNumber),
+       fakeLastMessage(NULL) {
 }
 
 Slot::~Slot() {
@@ -53,6 +56,8 @@ Slot::~Slot() {
        for(uint i=0; i< entries->size(); i++)
                delete entries->get(i);
        delete entries;
+       if (fakeLastMessage)
+               delete fakeLastMessage;
 }
 
 Entry *Slot::addEntry(Entry *e) {
@@ -153,9 +158,11 @@ Vector<Entry *> *Slot::getLiveEntries(bool resize) {
                }
        }
 
-       if (seqnumlive && !resize)
-               liveEntries->add(new LastMessage(this, machineid, seqnum));
-
+       if (seqnumlive && !resize) {
+               if (! fakeLastMessage)
+                       fakeLastMessage = new LastMessage(this, machineid, seqnum);
+               liveEntries->add(fakeLastMessage);
+       }
        return liveEntries;
 }
 
@@ -186,5 +193,8 @@ Array<char> *Slot::getSlotCryptIV() {
        buffer->putLong(machineid);
        int64_t localSequenceNumberShift = localSequenceNumber << 16;
        buffer->putLong(localSequenceNumberShift);
-       return buffer->array();
+       Array<char> * array = buffer->array();
+       buffer->releaseArray();
+       delete buffer;
+       return array;
 }
index 56732a40bb981527490a42ecb1bd1fc8edd4674b..d00cf4e3d470a7acd144cb27ef25c3643e144c7c 100644 (file)
@@ -28,9 +28,9 @@ private:
        int freespace;
        /** Reference to Table */
        Table *table;
-
+       LastMessage * fakeLastMessage;
+       
        int64_t localSequenceNumber;
-       void addShallowEntry(Entry *e);
 
 public:
        Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, Array<char> *_hmac, int64_t _localSequenceNumber);
@@ -41,6 +41,7 @@ public:
        Array<char> *getHMAC() { return hmac; }
        Array<char> *getPrevHMAC() { return prevhmac; }
        Entry *addEntry(Entry *e);
+       void addShallowEntry(Entry *e);
        bool hasSpace(Entry *e);
        Vector<Entry *> *getEntries();
        Array<char> *encode(Mac *mac);
index 4122cd1745114011c188511a8da88dbc877e255f..0f6d6259bd48e91b45387c359db3a7f9cee8caaf 100644 (file)
@@ -9,7 +9,7 @@
  * @version 1.0
  */
 
-#define SlotBuffer_DEFAULT_SIZE 16
+#define SlotBuffer_DEFAULT_SIZE 32
 
 class SlotBuffer {
 private:
index 531337cc5336f1c69dac6aed68018b2899355c98..1a33f76f1f1bc98f7e8c267e90e4ddc515655bc0 100644 (file)
@@ -60,7 +60,6 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i
        lastIsNewKey(false),
        lastNewSize(0),
        lastTransactionPartsSent(NULL),
-       lastPendingSendArbitrationEntriesToDelete(NULL),
        lastNewKey(NULL),
        committedKeyValueTable(NULL),
        speculatedKeyValueTable(NULL),
@@ -123,7 +122,6 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
        lastIsNewKey(false),
        lastNewSize(0),
        lastTransactionPartsSent(NULL),
-       lastPendingSendArbitrationEntriesToDelete(NULL),
        lastNewKey(NULL),
        committedKeyValueTable(NULL),
        speculatedKeyValueTable(NULL),
@@ -171,6 +169,7 @@ Table::~Table() {
                SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
                while (lmit->hasNext()) {
                        Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+                       delete pair;
                }
                delete lmit;
                delete lastMessageTable;
@@ -199,7 +198,7 @@ Table::~Table() {
                SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
                while (partsit->hasNext()) {
                        int64_t machineId = partsit->next();
-                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
                        delete parts;
                }
                delete partsit;
@@ -209,7 +208,7 @@ Table::~Table() {
                SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
                while (partsit->hasNext()) {
                        int64_t machineId = partsit->next();
-                       Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
+                       Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
                        delete parts;
                }
                delete partsit;
@@ -224,7 +223,7 @@ Table::~Table() {
                        int64_t arbitratorId = liveit->next();
                        
                        // Get all the commits for a specific arbitrator
-                       Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
+                       Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
                        {
                                SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
                                while (clientit->hasNext()) {
@@ -250,7 +249,15 @@ Table::~Table() {
                delete pendingTransactionQueue;
        }
        delete pendingSendArbitrationEntriesToDelete;
-       delete transactionPartsSent;
+       {
+               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+               while (trit->hasNext()) {
+                       Transaction *transaction = trit->next();
+                       delete trit->currVal();
+               }
+               delete trit;
+               delete transactionPartsSent;
+       }
        delete outstandingTransactionStatus;
        delete liveAbortsGeneratedByLocal;
        delete offlineTransactionsCommittedAndAtServer;
@@ -265,6 +272,8 @@ Table::~Table() {
        if (lastTransactionPartsSent != NULL)
                delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+       if (lastNewKey)
+               delete lastNewKey;
 }
 
 /**
@@ -320,7 +329,7 @@ void Table::initTable() {
        Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
        localSequenceNumber++;
        TableStatus *status = new TableStatus(s, numberOfSlots);
-       s->addEntry(status);
+       s->addShallowEntry(status);
        Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
 
        if (array == NULL) {
@@ -332,8 +341,10 @@ void Table::initTable() {
        } else if (array->length() == 1) {
                // in case we did push the slot BUT we failed to init it
                validateAndUpdate(array, true);
+               delete s;
                delete array;
        } else {
+               delete s;
                delete array;
                throw new Error("Error on initialization");
        }
@@ -585,6 +596,36 @@ int64_t Table::getLocalSequenceNumber() {
        return localSequenceNumber;
 }
 
+void Table::processTransactionList(bool handlePartial) {
+       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+       while (trit->hasNext()) {
+               Transaction *transaction = trit->next();
+               transaction->resetServerFailure();
+               // Update which transactions parts still need to be sent
+               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+               // Add the transaction status to the outstanding list
+               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+               
+               // Update the transaction status
+               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+               
+               // Check if all the transaction parts were successfully
+               // sent and if so then remove it from pending
+               if (transaction->didSendAllParts()) {
+                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+                       pendingTransactionQueue->remove(transaction);
+                       delete transaction;
+               } else if (handlePartial) {
+                       transaction->resetServerFailure();
+                       // Set the transaction sequence number back to nothing
+                       if (!transaction->didSendAPartToServer()) {
+                               transaction->setSequenceNumber(-1);
+                       }
+               }
+       }
+       delete trit;
+}
+
 NewKey * Table::handlePartialSend(NewKey * newKey) {
        //Didn't receive acknowledgement for last send
        //See if the server has received a newer slot
@@ -596,67 +637,23 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
                
                if (sendSlotsReturn) {
+                       lastSlotAttemptedToSend = NULL;
                        if (newKey != NULL) {
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                       delete newKey;
                                        newKey = NULL;
                                }
                        }
-                       
-                       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                       while (trit->hasNext()) {
-                               Transaction *transaction = trit->next();
-                               transaction->resetServerFailure();
-                               // Update which transactions parts still need to be sent
-                               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                               // Add the transaction status to the outstanding list
-                               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                               
-                               // Update the transaction status
-                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                               
-                               // Check if all the transaction parts were successfully
-                               // sent and if so then remove it from pending
-                               if (transaction->didSendAllParts()) {
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                       pendingTransactionQueue->remove(transaction);
-                               }
-                       }
-                       delete trit;
+                       processTransactionList(false);
                } else {
                        if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                                if (newKey != NULL) {
                                        if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                               delete newKey;
                                                newKey = NULL;
                                        }
                                }
-                               
-                               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                               while (trit->hasNext()) {
-                                       Transaction *transaction = trit->next();
-                                       transaction->resetServerFailure();
-                                       
-                                       // Update which transactions parts still need to be sent
-                                       transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                                       
-                                       // Add the transaction status to the outstanding list
-                                       outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                                       
-                                       // Update the transaction status
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                                       
-                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                                       if (transaction->didSendAllParts()) {
-                                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                               pendingTransactionQueue->remove(transaction);
-                                       } else {
-                                               transaction->resetServerFailure();
-                                               // Set the transaction sequence number back to nothing
-                                               if (!transaction->didSendAPartToServer()) {
-                                                       transaction->setSequenceNumber(-1);
-                                               }
-                                       }
-                               }
-                               delete trit;
+                               processTransactionList(true);
                        }
                }
                
@@ -679,37 +676,12 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                        if (newKey != NULL) {
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                       delete newKey;
                                        newKey = NULL;
                                }
                        }
-                       
-                       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                       while (trit->hasNext()) {
-                               Transaction *transaction = trit->next();
-                               transaction->resetServerFailure();
-                               
-                               // Update which transactions parts still need to be sent
-                               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                               
-                               // Add the transaction status to the outstanding list
-                               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                               
-                               // Update the transaction status
-                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                               
-                               // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                               if (transaction->didSendAllParts()) {
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                       pendingTransactionQueue->remove(transaction);
-                               } else {
-                                       transaction->resetServerFailure();
-                                       // Set the transaction sequence number back to nothing
-                                       if (!transaction->didSendAPartToServer()) {
-                                               transaction->setSequenceNumber(-1);
-                                       }
-                               }
-                       }
-                       delete trit;
+
+                       processTransactionList(true);
                } else {
                        SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                        while (trit->hasNext()) {
@@ -730,6 +702,18 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
        return newKey;
 }
 
+void Table::clearSentParts() {
+       // Clear the sent data since we are trying again
+       pendingSendArbitrationEntriesToDelete->clear();
+       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+       while (trit->hasNext()) {
+               Transaction *transaction = trit->next();
+               delete trit->currVal();
+       }
+       delete trit;
+       transactionPartsSent->clear();
+}
+
 bool Table::sendToServer(NewKey *newKey) {
        if (hadPartialSendToServer) {
                newKey = handlePartialSend(newKey);
@@ -744,6 +728,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        
                        // If there is a new key with same name then end
                        if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
+                               delete newKey;
                                return false;
                        }
 
@@ -771,28 +756,31 @@ bool Table::sendToServer(NewKey *newKey) {
                                delete trit;
 
                                // Clear the sent data since we are trying again
-                               pendingSendArbitrationEntriesToDelete->clear();
-                               transactionPartsSent->clear();
-
+                               clearSentParts();
+                                       
                                // We needed a resize so try again
                                fillSlot(slot, true, newKey, newSize, insertedNewKey);
                        }
-
+                       if (lastSlotAttemptedToSend != NULL)
+                               delete lastSlotAttemptedToSend;
+                       
                        lastSlotAttemptedToSend = slot;
                        lastIsNewKey = (newKey != NULL);
                        lastInsertedNewKey = insertedNewKey;
                        lastNewSize = newSize;
+                       if (( newKey != lastNewKey) && (lastNewKey != NULL))
+                               delete lastNewKey;
                        lastNewKey = newKey;
                        if (lastTransactionPartsSent != NULL)
                                delete lastTransactionPartsSent;
                        lastTransactionPartsSent = transactionPartsSent->clone();
-                       lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
 
                        Array<Slot *> * newSlots = NULL;
                        bool wasInserted = false;
                        bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
 
                        if (sendSlotsReturn) {
+                               lastSlotAttemptedToSend = NULL;
                                // Did insert into the block chain
                                if (insertedNewKey) {
                                        // This slot was what was inserted not a previous slot
@@ -815,28 +803,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                                delete pendingSendArbitrationRounds->get(i);
                                }
                                pendingSendArbitrationRounds->setSize(oldcount);
-
-                               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
-                               while (trit->hasNext()) {
-                                       Transaction *transaction = trit->next();
-                                       transaction->resetServerFailure();
-
-                                       // Update which transactions parts still need to be sent
-                                       transaction->removeSentParts(transactionPartsSent->get(transaction));
-
-                                       // Add the transaction status to the outstanding list
-                                       outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
-                                       // Update the transaction status
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
-                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                                       if (transaction->didSendAllParts()) {
-                                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                               pendingTransactionQueue->remove(transaction);
-                                       }
-                               }
-                               delete trit;
+                               processTransactionList(false);
                        } else {
                                // Reset which transaction to send
                                SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
@@ -853,8 +820,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        }
 
                        // Clear the sent data in preparation for next send
-                       pendingSendArbitrationEntriesToDelete->clear();
-                       transactionPartsSent->clear();
+                       clearSentParts();
 
                        if (newSlots->length() != 0) {
                                // insert into the local block chain
@@ -890,8 +856,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        delete trit;
                }
 
-               pendingSendArbitrationEntriesToDelete->clear();
-               transactionPartsSent->clear();
+               clearSentParts();
 
                throw e;
        }
@@ -1254,14 +1219,14 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize
        if (resize) {
                newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
                TableStatus *status = new TableStatus(slot, newSize);
-               slot->addEntry(status);
+               slot->addShallowEntry(status);
        }
 
        // Fill with rejected slots first before doing anything else
        doRejectedMessages(slot);
 
        // Do mandatory rescue of entries
-       ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
+       ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
 
        // Extract working variables
        bool needsResize = mandatoryRescueReturn.getFirst();
@@ -1283,8 +1248,7 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize
        }
 
        // Clear the transactions, aborts and commits that were sent previously
-       transactionPartsSent->clear();
-       pendingSendArbitrationEntriesToDelete->clear();
+       clearSentParts();
        uint size = pendingSendArbitrationRounds->size();
        for (uint i = 0; i < size; i++) {
                ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
@@ -1363,7 +1327,7 @@ void Table::doRejectedMessages(Slot *s) {
                if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
                        int64_t new_seqn = rejectedSlotVector->lastElement();
                        RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
-                       s->addEntry(rm);
+                       s->addShallowEntry(rm);
                } else {
                        int64_t prev_seqn = -1;
                        uint i = 0;
@@ -1378,7 +1342,7 @@ void Table::doRejectedMessages(Slot *s) {
                        /* Generate rejected message entry for missing messages */
                        if (prev_seqn != -1) {
                                RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
-                               s->addEntry(rm);
+                               s->addShallowEntry(rm);
                        }
                        /* Generate rejected message entries for present messages */
                        for (; i < rejectedSlotVector->size(); i++) {
@@ -1386,13 +1350,13 @@ void Table::doRejectedMessages(Slot *s) {
                                Slot *s_msg = buffer->getSlot(curr_seqn);
                                int64_t machineid = s_msg->getMachineID();
                                RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
-                               s->addEntry(rm);
+                               s->addShallowEntry(rm);
                        }
                }
        }
 }
 
-ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
+ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
        int64_t newestSequenceNumber = buffer->getNewestSeqNum();
        int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
        if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
@@ -1802,6 +1766,8 @@ void Table::arbitrateFromServer() {
                lastSeqNumArbOn = transactionSequenceNumber;
        }
 
+       delete transactionSequenceNumbers;
+
        Commit *newCommit = NULL;
 
        // If there is something to commit
@@ -1849,6 +1815,8 @@ void Table::arbitrateFromServer() {
                                }
                        }
                }
+       } else {
+               delete generatedAborts;
        }
 }
 
@@ -1886,7 +1854,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                while (kvit->hasNext()) {
                        KeyValue *kv = kvit->next();
-                       newCommit->addKV(kv);
+                       newCommit->addKV(kv->getCopy());
                }
                delete kvit;
 
@@ -2267,6 +2235,7 @@ bool Table::updateCommittedTable() {
                                delete kvit;
                        }
                }
+               delete commitSequenceNumbers;
        }
        delete liveit;
 
@@ -2323,6 +2292,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
 
        if (startIndex >= transactionSequenceNumbersSorted->size()) {
                // Make sure we are not out of bounds
+               delete transactionSequenceNumbersSorted;
                return false;           // did not speculate
        }
 
@@ -2361,6 +2331,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
                }
        }
 
+       delete transactionSequenceNumbersSorted;
+       
        if (didSkip) {
                // Since there was a skip we need to redo the speculation next time around
                lastTransactionSequenceNumberSpeculatedOn = -1;
@@ -2708,7 +2680,7 @@ void Table::processEntry(TransactionPart *entry) {
 
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
-       TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+       TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }
index dfe188652d4192c95a3c13455685896c647ec5c8..713327736c19700a4d8831ccb0a84152c602a51e 100644 (file)
@@ -57,7 +57,8 @@ private:
        Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
        Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
        NewKey *lastNewKey;
-
+       void processTransactionList(bool handlePartial);
+       void clearSentParts();
 
        /* Data Structures  */
        Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable;// Table of committed key value pairs
@@ -109,7 +110,7 @@ private:
        bool fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey);
        void doRejectedMessages(Slot *s);
 
-       ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
+       ThreeTuple<bool, bool, int64_t> doMandatoryRescue(Slot *slot, bool resize);
 
        void  doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
        /**
index 7b7733ca287a6bbe538548c214f4da302c22dfb9..dd3fc6216d3cd7794776fa200b8170fbb65b56c4 100644 (file)
@@ -12,14 +12,18 @@ int main(int numargs, char ** args) {
        Vector<TransactionStatus *> * transStatusList = new Vector<TransactionStatus *>();
 
        // Setup the 2 clients
-       Table * t1 = new Table(new IoTString("http://dc-6.calit2.uci.edu/test.iotcloud/"), new IoTString("reallysecret"), 321, -1);
+       IoTString *baseurl = new IoTString("http://dc-6.calit2.uci.edu/test.iotcloud/");
+       IoTString * password = new IoTString("reallysecret");
+       Table * t1 = new Table(baseurl, password, 321, -1);
        t1->initTable();
        printf("T1 Ready\n");
 
-       Table * t2 = new Table(new IoTString("http://dc-6.calit2.uci.edu/test.iotcloud/"), new IoTString("reallysecret"), 351, -1);
+       Table * t2 = new Table(baseurl, password, 351, -1);
        t2->update();
        printf("T2 Ready\n");
 
+       delete baseurl; delete password;
+       
        // Make the Keys
        printf("Setting up keys\n");
        for (int i = 0; i < NUMBER_OF_TESTS; i++) {
index 80e70cd1dc456c2992fd9c17c58f1f9b8b39871e..258c22b6d61c43f113aa4f3f6363024c4b11e7d1 100644 (file)
@@ -27,15 +27,38 @@ Transaction::Transaction() :
 }
 
 Transaction::~Transaction() {
-       delete parts;
-       delete keyValueGuardSet;
-       delete keyValueUpdateSet;
+       if (missingParts)
+               delete missingParts;
+       {
+               delete parts;
+       }
+       {
+               SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+               while (kvit->hasNext()) {
+                       KeyValue *kvGuard = kvit->next();
+                       delete kvGuard;
+               }
+               delete kvit;
+               delete keyValueGuardSet;
+       }
+       {
+               SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
+               while (kvit->hasNext()) {
+                       KeyValue *kvUpdate = kvit->next();
+                       delete kvUpdate;
+               }
+               delete kvit;
+               delete keyValueUpdateSet;
+       }
+       delete partsPendingSend;
 }
 
 void Transaction::addPartEncode(TransactionPart *newPart) {
        TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
-       if (old == NULL)
+       if (old == NULL) {
                partCount++;
+       } else
+               delete old;
        partsPendingSend->add(newPart->getPartNumber());
 
        sequenceNumber = newPart->getSequenceNumber();
@@ -270,6 +293,7 @@ void Transaction::decodeTransactionData() {
                KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
                keyValueUpdateSet->add(kv);
        }
+       delete bbDecode;
 }
 
 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
index fc13ead0c4bfe35097784bb540cec31f7af1024e..e8f43566b75a09de750340b26d5aaf77b90cce12 100644 (file)
@@ -8,6 +8,10 @@ int TransactionPart::getSize() {
        return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length();
 }
 
+TransactionPart::~TransactionPart() {
+       delete data;
+}
+
 Pair<int64_t, int64_t> TransactionPart::getTransactionId() {
        return transactionId;
 }
@@ -16,8 +20,8 @@ int64_t TransactionPart::getArbitratorId() {
        return arbitratorId;
 }
 
-Pair<int64_t, int32_t> TransactionPart::getPartId() {
-       return partId;
+Pair<int64_t, int32_t> TransactionPart::getPartId() {
+       return partId;
 }
 
 int TransactionPart::getPartNumber() {
@@ -93,7 +97,7 @@ char TransactionPart::getType() {
 }
 
 Entry *TransactionPart::getCopy(Slot *s) {
-       TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, fldisLastPart);
+       TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, new Array<char>(data), fldisLastPart);
        copyTransaction->setSequenceNumber(sequenceNumber);
 
        return copyTransaction;
index c8aa649d685be8e42ae4afc1092a89b1567f3c8e..c254049fab45f1a7687a46394c77bfcd4d59fad4 100644 (file)
@@ -33,11 +33,11 @@ public:
                partId(Pair<int64_t, int32_t>(clientLocalSequenceNumber, partNumber)),
                data(_data) {
        }
-
+       ~TransactionPart();
        int getSize();
        Pair<int64_t, int64_t> getTransactionId();
        int64_t getArbitratorId();
-       Pair<int64_t, int32_t> getPartId();
+       Pair<int64_t, int32_t> getPartId();
        int getPartNumber();
        int getDataSize();
        Array<char> *getData();
index a8143dba9fbccc134dcc3f89b175629a50ee674c..5c81425b5b26ca9ad69a5587c806cf8b01e5c128 100644 (file)
@@ -47,7 +47,11 @@ public:
                return curr != NULL;
        }
 
-       _Key next() {
+  _Val currVal() {
+         return last->val;
+  }
+
+  _Key next() {
                _Key k = curr->key;
                last = curr;
                curr = curr->next;