From: bdemsky Date: Fri, 16 Mar 2018 15:08:34 +0000 (-0700) Subject: edits X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=0173578905303681df8ea5f3c35b3ead109c8ba8;p=iotcloud.git edits --- diff --git a/version2/src/C/ArbitrationRound.cc b/version2/src/C/ArbitrationRound.cc index 543c2d0..159aeae 100644 --- a/version2/src/C/ArbitrationRound.cc +++ b/version2/src/C/ArbitrationRound.cc @@ -28,7 +28,7 @@ void ArbitrationRound::generateParts() { if (didGenerateParts) { return; } - parts = new Vector(); + parts->clear(); SetIterator *abit = abortsBefore->iterator(); while (abit->hasNext()) parts->add((Entry *)abit->next()); diff --git a/version2/src/C/CloudComm.cc b/version2/src/C/CloudComm.cc index c229259..8fe0cee 100644 --- a/version2/src/C/CloudComm.cc +++ b/version2/src/C/CloudComm.cc @@ -46,7 +46,7 @@ void *threadWrapper(void *cloud) { * Constructor for actual use. Takes in the url and password. */ CloudComm::CloudComm(Table *_table, IoTString *_baseurl, IoTString *_password, int _listeningPort) : - baseurl(_baseurl), + baseurl(new IoTString(_baseurl)), key(NULL), mac(NULL), password(new IoTString(_password)), @@ -74,6 +74,10 @@ CloudComm::~CloudComm() { delete random; if (baseurl) delete baseurl; + if (mac) + delete mac; + if (key) + delete key; } /** @@ -135,6 +139,7 @@ IoTString *CloudComm::buildRequest(bool isput, int64_t sequencenumber, int64_t m if (maxentries != 0) sprintf(&buffer[offset], "&max=%" PRId64, maxentries); IoTString *urlstr = new IoTString(buffer); + free(buffer); return urlstr; } @@ -225,6 +230,7 @@ WebConnection openURL(IoTString *url) { /* send the request */ int total = strlen(message); loopWrite(sockfd, message, total); + free(message); return (WebConnection) {sockfd, -1}; } @@ -411,6 +417,7 @@ void CloudComm::setSalt() { timer->startTime(); wc = openURL(urlstr); + delete urlstr; writeURLDataAndClose(&wc, saltTmp); int responsecode = getResponseCode(&wc); @@ -554,11 +561,14 @@ Array *CloudComm::putSlot(Slot *slot, int max) { int64_t sequencenumber = slot->getSequenceNumber(); Array *slotBytes = slot->encode(mac); - Array *chars = encryptSlotAndPrependIV(slotBytes, slot->getSlotCryptIV()); + Array * ivBytes = slot->getSlotCryptIV(); + Array *chars = encryptSlotAndPrependIV(slotBytes, ivBytes); + delete ivBytes; delete slotBytes; IoTString *url = buildRequest(true, sequencenumber, max); timer->startTime(); wc = openURL(url); + delete url; writeURLDataAndClose(&wc, chars); delete chars; timer->endTime(); @@ -625,6 +635,7 @@ Array *CloudComm::getSlots(int64_t sequencenumber) { IoTString *url = buildRequest(false, sequencenumber, 0); timer->startTime(); wc = openURL(url); + delete url; closeURLReq(&wc); timer->endTime(); } catch (SocketTimeoutException *e) { diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index a24e014..2ee208d 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -33,8 +33,15 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction Commit::~Commit() { delete parts; - delete keyValueUpdateSet; - delete liveKeys; + { + SetIterator * keyit = keyValueUpdateSet->iterator(); + while(keyit->hasNext()) { + delete keyit->next(); + } + delete keyit; + delete keyValueUpdateSet; + } + delete liveKeys; if (missingParts != NULL) delete missingParts; } @@ -99,8 +106,9 @@ Vector *Commit::getParts() { } void Commit::addKV(KeyValue *kv) { - keyValueUpdateSet->add(kv); - liveKeys->add(kv->getKey()); + KeyValue * kvcopy = kv->getCopy(); + keyValueUpdateSet->add(kvcopy); + liveKeys->add(kvcopy->getKey()); } void Commit::invalidateKey(IoTString *key) { @@ -166,6 +174,7 @@ void Commit::createCommitParts() { commitPartCount++; remaining -= copySize; } + delete charData; } void Commit::decodeCommitData() { @@ -201,6 +210,7 @@ void Commit::decodeCommitData() { keyValueUpdateSet->add(kv); liveKeys->add(kv->getKey()); } + delete bbDecode; } Array *Commit::convertDataToBytes() { @@ -227,8 +237,10 @@ Array *Commit::convertDataToBytes() { kv->encode(bbEncode); } delete kvit; - - return bbEncode->array(); + Array * array = bbEncode->array(); + bbEncode->releaseArray(); + delete bbEncode; + return array; } void Commit::setKVsMap(Hashset *newKVs) { @@ -237,8 +249,9 @@ void Commit::setKVsMap(Hashset *kvit = newKVs->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - liveKeys->add(kv->getKey()); - keyValueUpdateSet->add(kv); + KeyValue *kvcopy = kv->getCopy(); + liveKeys->add(kvcopy->getKey()); + keyValueUpdateSet->add(kvcopy); } delete kvit; } diff --git a/version2/src/C/Entry.h b/version2/src/C/Entry.h index acdd04a..2a0c9f4 100644 --- a/version2/src/C/Entry.h +++ b/version2/src/C/Entry.h @@ -29,7 +29,7 @@ protected: public: Entry(Slot *_parentslot) : islive(true), parentslot(_parentslot) {} - + virtual ~Entry() {} /** * Returns true if the Entry object is still live. */ diff --git a/version2/src/C/NewKey.cc b/version2/src/C/NewKey.cc index a3a1a87..0b35933 100644 --- a/version2/src/C/NewKey.cc +++ b/version2/src/C/NewKey.cc @@ -23,7 +23,7 @@ Entry *NewKey_decode(Slot *slot, ByteBuffer *bb) { return newkey; } -Entry *NewKey::getCopy(Slot *s) { return new NewKey(s, new IoTString(key), machineid); } +Entry *NewKey::getCopy(Slot *s) { return new NewKey(s, key, machineid); } void NewKey::encode(ByteBuffer *bb) { bb->put(TypeNewKey); diff --git a/version2/src/C/PendingTransaction.cc b/version2/src/C/PendingTransaction.cc index f453219..45602a0 100644 --- a/version2/src/C/PendingTransaction.cc +++ b/version2/src/C/PendingTransaction.cc @@ -137,7 +137,8 @@ Transaction *PendingTransaction::createTransaction() { transactionPartCount++; remaining -= copySize; } - + delete charData; + // Add the Guard Conditions SetIterator *kvit = keyValueGuardSet->iterator(); while (kvit->hasNext()) { @@ -185,5 +186,8 @@ Array *PendingTransaction::convertDataToBytes() { } delete kvit; - return bbEncode->array(); + Array *array = bbEncode->array(); + bbEncode->releaseArray(); + delete bbEncode; + return array; } diff --git a/version2/src/C/Slot.cc b/version2/src/C/Slot.cc index 4a6f268..644854e 100644 --- a/version2/src/C/Slot.cc +++ b/version2/src/C/Slot.cc @@ -17,7 +17,8 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_pre seqnumlive(true), freespace(SLOT_SIZE - getBaseSize()), table(_table), - localSequenceNumber(_localSequenceNumber) { + localSequenceNumber(_localSequenceNumber), + fakeLastMessage(NULL) { } Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, int64_t _localSequenceNumber) : @@ -30,7 +31,8 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_pre seqnumlive(true), freespace(SLOT_SIZE - getBaseSize()), table(_table), - localSequenceNumber(_localSequenceNumber) { + localSequenceNumber(_localSequenceNumber), + fakeLastMessage(NULL) { } Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) : @@ -43,7 +45,8 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSeq seqnumlive(true), freespace(SLOT_SIZE - getBaseSize()), table(_table), - localSequenceNumber(_localSequenceNumber) { + localSequenceNumber(_localSequenceNumber), + fakeLastMessage(NULL) { } Slot::~Slot() { @@ -53,6 +56,8 @@ Slot::~Slot() { for(uint i=0; i< entries->size(); i++) delete entries->get(i); delete entries; + if (fakeLastMessage) + delete fakeLastMessage; } Entry *Slot::addEntry(Entry *e) { @@ -153,9 +158,11 @@ Vector *Slot::getLiveEntries(bool resize) { } } - if (seqnumlive && !resize) - liveEntries->add(new LastMessage(this, machineid, seqnum)); - + if (seqnumlive && !resize) { + if (! fakeLastMessage) + fakeLastMessage = new LastMessage(this, machineid, seqnum); + liveEntries->add(fakeLastMessage); + } return liveEntries; } @@ -186,5 +193,8 @@ Array *Slot::getSlotCryptIV() { buffer->putLong(machineid); int64_t localSequenceNumberShift = localSequenceNumber << 16; buffer->putLong(localSequenceNumberShift); - return buffer->array(); + Array * array = buffer->array(); + buffer->releaseArray(); + delete buffer; + return array; } diff --git a/version2/src/C/Slot.h b/version2/src/C/Slot.h index 56732a4..d00cf4e 100644 --- a/version2/src/C/Slot.h +++ b/version2/src/C/Slot.h @@ -28,9 +28,9 @@ private: int freespace; /** Reference to Table */ Table *table; - + LastMessage * fakeLastMessage; + int64_t localSequenceNumber; - void addShallowEntry(Entry *e); public: Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, Array *_hmac, int64_t _localSequenceNumber); @@ -41,6 +41,7 @@ public: Array *getHMAC() { return hmac; } Array *getPrevHMAC() { return prevhmac; } Entry *addEntry(Entry *e); + void addShallowEntry(Entry *e); bool hasSpace(Entry *e); Vector *getEntries(); Array *encode(Mac *mac); diff --git a/version2/src/C/SlotBuffer.h b/version2/src/C/SlotBuffer.h index 4122cd1..0f6d625 100644 --- a/version2/src/C/SlotBuffer.h +++ b/version2/src/C/SlotBuffer.h @@ -9,7 +9,7 @@ * @version 1.0 */ -#define SlotBuffer_DEFAULT_SIZE 16 +#define SlotBuffer_DEFAULT_SIZE 32 class SlotBuffer { private: diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 531337c..1a33f76 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -60,7 +60,6 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i lastIsNewKey(false), lastNewSize(0), lastTransactionPartsSent(NULL), - lastPendingSendArbitrationEntriesToDelete(NULL), lastNewKey(NULL), committedKeyValueTable(NULL), speculatedKeyValueTable(NULL), @@ -123,7 +122,6 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : lastIsNewKey(false), lastNewSize(0), lastTransactionPartsSent(NULL), - lastPendingSendArbitrationEntriesToDelete(NULL), lastNewKey(NULL), committedKeyValueTable(NULL), speculatedKeyValueTable(NULL), @@ -171,6 +169,7 @@ Table::~Table() { SetIterator *> *lmit = getKeyIterator(lastMessageTable); while (lmit->hasNext()) { Pair * pair = lastMessageTable->get(lmit->next()); + delete pair; } delete lmit; delete lastMessageTable; @@ -199,7 +198,7 @@ Table::~Table() { SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); while (partsit->hasNext()) { int64_t machineId = partsit->next(); - Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); delete parts; } delete partsit; @@ -209,7 +208,7 @@ Table::~Table() { SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); while (partsit->hasNext()) { int64_t machineId = partsit->next(); - Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); delete parts; } delete partsit; @@ -224,7 +223,7 @@ Table::~Table() { int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator - Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); + Hashtable *commitForClientTable = liveit->currVal(); { SetIterator *clientit = getKeyIterator(commitForClientTable); while (clientit->hasNext()) { @@ -250,7 +249,15 @@ Table::~Table() { delete pendingTransactionQueue; } delete pendingSendArbitrationEntriesToDelete; - delete transactionPartsSent; + { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + delete transactionPartsSent; + } delete outstandingTransactionStatus; delete liveAbortsGeneratedByLocal; delete offlineTransactionsCommittedAndAtServer; @@ -265,6 +272,8 @@ Table::~Table() { if (lastTransactionPartsSent != NULL) delete lastTransactionPartsSent; delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; + if (lastNewKey) + delete lastNewKey; } /** @@ -320,7 +329,7 @@ void Table::initTable() { Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber); localSequenceNumber++; TableStatus *status = new TableStatus(s, numberOfSlots); - s->addEntry(status); + s->addShallowEntry(status); Array *array = cloud->putSlot(s, numberOfSlots); if (array == NULL) { @@ -332,8 +341,10 @@ void Table::initTable() { } else if (array->length() == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); + delete s; delete array; } else { + delete s; delete array; throw new Error("Error on initialization"); } @@ -585,6 +596,36 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } +void Table::processTransactionList(bool handlePartial) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + delete transaction; + } else if (handlePartial) { + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + } + delete trit; +} + NewKey * Table::handlePartialSend(NewKey * newKey) { //Didn't receive acknowledgement for last send //See if the server has received a newer slot @@ -596,67 +637,23 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully - // sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; + processTransactionList(false); } else { if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; + processTransactionList(true); } } @@ -679,37 +676,12 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; + + processTransactionList(true); } else { SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { @@ -730,6 +702,18 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { return newKey; } +void Table::clearSentParts() { + // Clear the sent data since we are trying again + pendingSendArbitrationEntriesToDelete->clear(); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + transactionPartsSent->clear(); +} + bool Table::sendToServer(NewKey *newKey) { if (hadPartialSendToServer) { newKey = handlePartialSend(newKey); @@ -744,6 +728,7 @@ bool Table::sendToServer(NewKey *newKey) { // If there is a new key with same name then end if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { + delete newKey; return false; } @@ -771,28 +756,31 @@ bool Table::sendToServer(NewKey *newKey) { delete trit; // Clear the sent data since we are trying again - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); - + clearSentParts(); + // We needed a resize so try again fillSlot(slot, true, newKey, newSize, insertedNewKey); } - + if (lastSlotAttemptedToSend != NULL) + delete lastSlotAttemptedToSend; + lastSlotAttemptedToSend = slot; lastIsNewKey = (newKey != NULL); lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; + if (( newKey != lastNewKey) && (lastNewKey != NULL)) + delete lastNewKey; lastNewKey = newKey; if (lastTransactionPartsSent != NULL) delete lastTransactionPartsSent; lastTransactionPartsSent = transactionPartsSent->clone(); - lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); Array * newSlots = NULL; bool wasInserted = false; bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; // Did insert into the block chain if (insertedNewKey) { // This slot was what was inserted not a previous slot @@ -815,28 +803,7 @@ bool Table::sendToServer(NewKey *newKey) { delete pendingSendArbitrationRounds->get(i); } pendingSendArbitrationRounds->setSize(oldcount); - - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(transactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; + processTransactionList(false); } else { // Reset which transaction to send SetIterator *> *trit = getKeyIterator(transactionPartsSent); @@ -853,8 +820,7 @@ bool Table::sendToServer(NewKey *newKey) { } // Clear the sent data in preparation for next send - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); + clearSentParts(); if (newSlots->length() != 0) { // insert into the local block chain @@ -890,8 +856,7 @@ bool Table::sendToServer(NewKey *newKey) { delete trit; } - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); + clearSentParts(); throw e; } @@ -1254,14 +1219,14 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize if (resize) { newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE); TableStatus *status = new TableStatus(slot, newSize); - slot->addEntry(status); + slot->addShallowEntry(status); } // Fill with rejected slots first before doing anything else doRejectedMessages(slot); // Do mandatory rescue of entries - ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); + ThreeTuple mandatoryRescueReturn = doMandatoryRescue(slot, resize); // Extract working variables bool needsResize = mandatoryRescueReturn.getFirst(); @@ -1283,8 +1248,7 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize } // Clear the transactions, aborts and commits that were sent previously - transactionPartsSent->clear(); - pendingSendArbitrationEntriesToDelete->clear(); + clearSentParts(); uint size = pendingSendArbitrationRounds->size(); for (uint i = 0; i < size; i++) { ArbitrationRound *round = pendingSendArbitrationRounds->get(i); @@ -1363,7 +1327,7 @@ void Table::doRejectedMessages(Slot *s) { if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); - s->addEntry(rm); + s->addShallowEntry(rm); } else { int64_t prev_seqn = -1; uint i = 0; @@ -1378,7 +1342,7 @@ void Table::doRejectedMessages(Slot *s) { /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); - s->addEntry(rm); + s->addShallowEntry(rm); } /* Generate rejected message entries for present messages */ for (; i < rejectedSlotVector->size(); i++) { @@ -1386,13 +1350,13 @@ void Table::doRejectedMessages(Slot *s) { Slot *s_msg = buffer->getSlot(curr_seqn); int64_t machineid = s_msg->getMachineID(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); - s->addEntry(rm); + s->addShallowEntry(rm); } } } } -ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize) { +ThreeTuple Table::doMandatoryRescue(Slot *slot, bool resize) { int64_t newestSequenceNumber = buffer->getNewestSeqNum(); int64_t oldestSequenceNumber = buffer->getOldestSeqNum(); if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { @@ -1802,6 +1766,8 @@ void Table::arbitrateFromServer() { lastSeqNumArbOn = transactionSequenceNumber; } + delete transactionSequenceNumbers; + Commit *newCommit = NULL; // If there is something to commit @@ -1849,6 +1815,8 @@ void Table::arbitrateFromServer() { } } } + } else { + delete generatedAborts; } } @@ -1886,7 +1854,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - newCommit->addKV(kv); + newCommit->addKV(kv->getCopy()); } delete kvit; @@ -2267,6 +2235,7 @@ bool Table::updateCommittedTable() { delete kvit; } } + delete commitSequenceNumbers; } delete liveit; @@ -2323,6 +2292,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds + delete transactionSequenceNumbersSorted; return false; // did not speculate } @@ -2361,6 +2331,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { } } + delete transactionSequenceNumbersSorted; + if (didSkip) { // Since there was a skip we need to redo the speculation next time around lastTransactionSequenceNumberSpeculatedOn = -1; @@ -2708,7 +2680,7 @@ void Table::processEntry(TransactionPart *entry) { // Update the part and set dead ones we have already seen (got a // rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); + TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index dfe1886..7133277 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -57,7 +57,8 @@ private: Hashtable *> *lastTransactionPartsSent; Vector *lastPendingSendArbitrationEntriesToDelete; NewKey *lastNewKey; - + void processTransactionList(bool handlePartial); + void clearSentParts(); /* Data Structures */ Hashtable *committedKeyValueTable;// Table of committed key value pairs @@ -109,7 +110,7 @@ private: bool fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey); void doRejectedMessages(Slot *s); - ThreeTuple doMandatoryResuce(Slot *slot, bool resize); + ThreeTuple doMandatoryRescue(Slot *slot, bool resize); void doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize); /** diff --git a/version2/src/C/Test.C b/version2/src/C/Test.C index 7b7733c..dd3fc62 100644 --- a/version2/src/C/Test.C +++ b/version2/src/C/Test.C @@ -12,14 +12,18 @@ int main(int numargs, char ** args) { Vector * transStatusList = new Vector(); // Setup the 2 clients - Table * t1 = new Table(new IoTString("http://dc-6.calit2.uci.edu/test.iotcloud/"), new IoTString("reallysecret"), 321, -1); + IoTString *baseurl = new IoTString("http://dc-6.calit2.uci.edu/test.iotcloud/"); + IoTString * password = new IoTString("reallysecret"); + Table * t1 = new Table(baseurl, password, 321, -1); t1->initTable(); printf("T1 Ready\n"); - Table * t2 = new Table(new IoTString("http://dc-6.calit2.uci.edu/test.iotcloud/"), new IoTString("reallysecret"), 351, -1); + Table * t2 = new Table(baseurl, password, 351, -1); t2->update(); printf("T2 Ready\n"); + delete baseurl; delete password; + // Make the Keys printf("Setting up keys\n"); for (int i = 0; i < NUMBER_OF_TESTS; i++) { diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 80e70cd..258c22b 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -27,15 +27,38 @@ Transaction::Transaction() : } Transaction::~Transaction() { - delete parts; - delete keyValueGuardSet; - delete keyValueUpdateSet; + if (missingParts) + delete missingParts; + { + delete parts; + } + { + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvGuard = kvit->next(); + delete kvGuard; + } + delete kvit; + delete keyValueGuardSet; + } + { + SetIterator *kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvUpdate = kvit->next(); + delete kvUpdate; + } + delete kvit; + delete keyValueUpdateSet; + } + delete partsPendingSend; } void Transaction::addPartEncode(TransactionPart *newPart) { TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart); - if (old == NULL) + if (old == NULL) { partCount++; + } else + delete old; partsPendingSend->add(newPart->getPartNumber()); sequenceNumber = newPart->getSequenceNumber(); @@ -270,6 +293,7 @@ void Transaction::decodeTransactionData() { KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); keyValueUpdateSet->add(kv); } + delete bbDecode; } bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { diff --git a/version2/src/C/TransactionPart.cc b/version2/src/C/TransactionPart.cc index fc13ead..e8f4356 100644 --- a/version2/src/C/TransactionPart.cc +++ b/version2/src/C/TransactionPart.cc @@ -8,6 +8,10 @@ int TransactionPart::getSize() { return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length(); } +TransactionPart::~TransactionPart() { + delete data; +} + Pair TransactionPart::getTransactionId() { return transactionId; } @@ -16,8 +20,8 @@ int64_t TransactionPart::getArbitratorId() { return arbitratorId; } -Pair TransactionPart::getPartId() { - return partId; +Pair * TransactionPart::getPartId() { + return & partId; } int TransactionPart::getPartNumber() { @@ -93,7 +97,7 @@ char TransactionPart::getType() { } Entry *TransactionPart::getCopy(Slot *s) { - TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, fldisLastPart); + TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, new Array(data), fldisLastPart); copyTransaction->setSequenceNumber(sequenceNumber); return copyTransaction; diff --git a/version2/src/C/TransactionPart.h b/version2/src/C/TransactionPart.h index c8aa649..c254049 100644 --- a/version2/src/C/TransactionPart.h +++ b/version2/src/C/TransactionPart.h @@ -33,11 +33,11 @@ public: partId(Pair(clientLocalSequenceNumber, partNumber)), data(_data) { } - + ~TransactionPart(); int getSize(); Pair getTransactionId(); int64_t getArbitratorId(); - Pair getPartId(); + Pair * getPartId(); int getPartNumber(); int getDataSize(); Array *getData(); diff --git a/version2/src/C/hashset.h b/version2/src/C/hashset.h index a8143db..5c81425 100644 --- a/version2/src/C/hashset.h +++ b/version2/src/C/hashset.h @@ -47,7 +47,11 @@ public: return curr != NULL; } - _Key next() { + _Val currVal() { + return last->val; + } + + _Key next() { _Key k = curr->key; last = curr; curr = curr->next;