From 793c445e17d24c146e7e820a5ecb5f02a5e00ea3 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Tue, 13 Mar 2018 11:57:20 -0700 Subject: [PATCH] edits --- version2/src/C/ByteBuffer.h | 5 +- version2/src/C/CloudComm.cc | 14 +++-- version2/src/C/IoTString.h | 14 ++++- version2/src/C/NewKey.cc | 6 +- version2/src/C/Slot.cc | 5 +- version2/src/C/Table.cc | 108 +++++++++++++++++++++++++++++----- version2/src/C/Table.h | 2 +- version2/src/C/Test.C | 30 ++++++++-- version2/src/C/Transaction.cc | 4 +- 9 files changed, 155 insertions(+), 33 deletions(-) diff --git a/version2/src/C/ByteBuffer.h b/version2/src/C/ByteBuffer.h index 7b8d2e4..06c309b 100644 --- a/version2/src/C/ByteBuffer.h +++ b/version2/src/C/ByteBuffer.h @@ -13,8 +13,11 @@ public: char get(); void get(Array *array); void position(int32_t newPosition); + void releaseArray() {buffer = NULL;} Array *array(); -private: + ~ByteBuffer() {if (buffer != NULL) delete buffer;} + + private: ByteBuffer(Array *array); friend ByteBuffer *ByteBuffer_wrap(Array *array); friend ByteBuffer *ByteBuffer_allocate(uint size); diff --git a/version2/src/C/CloudComm.cc b/version2/src/C/CloudComm.cc index c20488d..2a7d233 100644 --- a/version2/src/C/CloudComm.cc +++ b/version2/src/C/CloudComm.cc @@ -499,13 +499,10 @@ Array *AESDecrypt(Array *ivBytes, AESKey *key, Array *data) { Array *CloudComm::encryptSlotAndPrependIV(Array *rawData, Array *ivBytes) { try { Array *encryptedBytes = AESEncrypt(ivBytes, key, rawData); - Array *origBytes = AESDecrypt(ivBytes, key, encryptedBytes); - if (!rawData->equals(origBytes)) - throw new Error("BAD"); Array *chars = new Array(encryptedBytes->length() + CloudComm_IV_SIZE); System_arraycopy(ivBytes, 0, chars, 0, ivBytes->length()); System_arraycopy(encryptedBytes, 0, chars, CloudComm_IV_SIZE, encryptedBytes->length()); - + delete encryptedBytes; return chars; } catch (Exception *e) { throw new Error("Failed To Encrypt"); @@ -518,7 +515,10 @@ Array *CloudComm::stripIVAndDecryptSlot(Array *rawData) { Array *encryptedBytes = new Array(rawData->length() - CloudComm_IV_SIZE); System_arraycopy(rawData, 0, ivBytes, 0, CloudComm_IV_SIZE); System_arraycopy(rawData, CloudComm_IV_SIZE, encryptedBytes, 0, encryptedBytes->length()); - return AESDecrypt(ivBytes, key, encryptedBytes); + Array * data = AESDecrypt(ivBytes, key, encryptedBytes); + delete encryptedBytes; + delete ivBytes; + return data; } catch (Exception *e) { throw new Error("Failed To Decrypt"); } @@ -542,10 +542,12 @@ Array *CloudComm::putSlot(Slot *slot, int max) { int64_t sequencenumber = slot->getSequenceNumber(); Array *slotBytes = slot->encode(mac); Array *chars = encryptSlotAndPrependIV(slotBytes, slot->getSlotCryptIV()); + delete slotBytes; IoTString *url = buildRequest(true, sequencenumber, max); timer->startTime(); wc = openURL(url); writeURLDataAndClose(&wc, chars); + delete chars; timer->endTime(); } catch (ServerException *e) { timer->endTime(); @@ -652,7 +654,9 @@ Array *CloudComm::processSlots(WebConnection *wc) { Array *rawData = new Array(sizesofslots->get(i)); readURLData(wc, rawData); Array *data = stripIVAndDecryptSlot(rawData); + delete rawData; slots->set(i, Slot_decode(table, data, mac)); + delete data; } return slots; } diff --git a/version2/src/C/IoTString.h b/version2/src/C/IoTString.h index 1fb527f..583a098 100644 --- a/version2/src/C/IoTString.h +++ b/version2/src/C/IoTString.h @@ -3,6 +3,7 @@ #include "array.h" #include +#include /** * IoTString wraps the underlying char string. * @author Brian Demsky @@ -13,7 +14,7 @@ inline unsigned int hashCharArray(Array *array) { uint len = array->length(); unsigned int hash = 0; for (uint i = 0; i < len; i++) { - hash = 31 * hash + array->get(i); + hash = 31 * hash + ((unsigned int) array->get(i)); } return hash; } @@ -42,14 +43,19 @@ public: } IoTString(IoTString *string) : - array(new Array(string->array)), - hashvalue(hashCharArray(array)) { + array(new Array(string->array)), + hashvalue(string->hashvalue) { } ~IoTString() { delete array; } + void print() { + for(uint i=0; i < array->length(); i++) + printf("%c", array->get(i)); + }; + char get(uint i) {return array->get(i);} /** @@ -69,6 +75,7 @@ public: * Returns the length in chars of the IoTString. */ + bool equals(IoTString *str) { uint strlength = str->array->length(); uint thislength = array->length(); @@ -87,6 +94,7 @@ public: inline IoTString *IoTString_shallow(Array *_array) { IoTString *str = new IoTString(); str->array = _array; + str->hashvalue = hashCharArray(_array); return str; } diff --git a/version2/src/C/NewKey.cc b/version2/src/C/NewKey.cc index a302a78..a3a1a87 100644 --- a/version2/src/C/NewKey.cc +++ b/version2/src/C/NewKey.cc @@ -17,8 +17,10 @@ Entry *NewKey_decode(Slot *slot, ByteBuffer *bb) { Array *key = new Array(keylength); bb->get(key); int64_t machineid = bb->getLong(); - - return new NewKey(slot, IoTString_shallow(key), machineid); + IoTString *str = IoTString_shallow(key); + NewKey *newkey = new NewKey(slot, str, machineid); + delete str; + return newkey; } Entry *NewKey::getCopy(Slot *s) { return new NewKey(s, new IoTString(key), machineid); } diff --git a/version2/src/C/Slot.cc b/version2/src/C/Slot.cc index 10c92de..03aa3cc 100644 --- a/version2/src/C/Slot.cc +++ b/version2/src/C/Slot.cc @@ -93,7 +93,8 @@ Slot *Slot_decode(Table *table, Array *array, Mac *mac) { for (int i = 0; i < numentries; i++) { slot->addShallowEntry(Entry_decode(slot, bb)); } - + bb->releaseArray(); + delete bb; return slot; } @@ -120,6 +121,8 @@ Array *Slot::encode(Mac *mac) { hmac = realmac; bb->position(0); bb->put(realmac); + bb->releaseArray(); + delete bb; return array; } diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 88f43ee..a566009 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -168,15 +168,68 @@ Table::~Table() { delete pendingTransactionSpeculatedKeyValueTable; delete liveNewKeyTable; delete lastMessageTable; - delete rejectedMessageWatchVectorTable; + { + SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); + while(rmit->hasNext()) { + int64_t machineid = rmit->next(); + Hashset * rmset = rejectedMessageWatchVectorTable->get(machineid); + SetIterator * mit = rmset->iterator(); + while (mit->hasNext()) { + RejectedMessage * rm = mit->next(); + delete rm; + } + delete mit; + delete rmset; + } + delete rmit; + delete rejectedMessageWatchVectorTable; + } delete arbitratorTable; delete liveAbortTable; - delete newTransactionParts; - delete newCommitParts; + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + delete newTransactionParts; + } + { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); + delete parts; + } + delete partsit; + delete newCommitParts; + } delete lastArbitratedTransactionNumberByArbitratorTable; delete liveTransactionBySequenceNumberTable; delete liveTransactionByTransactionIdTable; - delete liveCommitsTable; + { + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); + + // Get all the commits for a specific arbitrator + Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) { + int64_t id = clientit->next(); + delete commitForClientTable->get(id); + } + delete clientit; + } + + delete commitForClientTable; + } + delete liveit; + delete liveCommitsTable; + } delete liveCommitsByKeyTable; delete lastCommitSeenSequenceNumberByArbitratorTable; delete rejectedSlotVector; @@ -189,6 +242,8 @@ Table::~Table() { delete localCommunicationTable; delete lastTransactionSeenFromMachineFromServer; delete pendingSendArbitrationRounds; + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; } @@ -253,10 +308,13 @@ void Table::initTable() { array->set(0, s); // update local block chain validateAndUpdate(array, true); + delete array; } else if (array->length() == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); + delete array; } else { + delete array; throw new Error("Error on initialization"); } } @@ -269,6 +327,7 @@ void Table::rebuild() { // Just pull the latest slots from the server Array *newslots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); + delete newslots; sendToServer(NULL); updateLiveTransactionsAndStatus(); } @@ -289,7 +348,7 @@ IoTString *Table::getCommitted(IoTString *key) { KeyValue *kv = committedKeyValueTable->get(key); if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -307,7 +366,7 @@ IoTString *Table::getSpeculative(IoTString *key) { } if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -328,7 +387,7 @@ IoTString *Table::getCommittedAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -358,7 +417,7 @@ IoTString *Table::getSpeculativeAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -369,6 +428,7 @@ bool Table::update() { try { Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); + delete newSlots; sendToServer(NULL); updateLiveTransactionsAndStatus(); return true; @@ -404,8 +464,7 @@ void Table::startTransaction() { pendingTransactionBuilder = new PendingTransaction(localMachineId); } -void Table::addKV(IoTString *key, IoTString *value) { - +void Table::put(IoTString *key, IoTString *value) { // Make sure it is a valid key if (!arbitratorTable->contains(key)) { throw new Error("Key not Found."); @@ -418,7 +477,7 @@ void Table::addKV(IoTString *key, IoTString *value) { } // Add the key value to this transaction - KeyValue *kv = new KeyValue(key, value); + KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value)); pendingTransactionBuilder->addKV(kv); } @@ -590,7 +649,6 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { // insert into the local block chain validateAndUpdate(newSlots, true); } - // continue; } else { if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { @@ -642,6 +700,7 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { // insert into the local block chain validateAndUpdate(newSlots, true); } + delete newSlots; return newKey; } @@ -698,6 +757,8 @@ bool Table::sendToServer(NewKey *newKey) { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; lastTransactionPartsSent = transactionPartsSent->clone(); lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); @@ -772,6 +833,7 @@ bool Table::sendToServer(NewKey *newKey) { // insert into the local block chain validateAndUpdate(newSlots, true); } + delete newSlots; } } catch (ServerException *e) { if (e->getType() != ServerException_TypeInputTimeout) { @@ -1426,7 +1488,8 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal updateExpectedSize(); } } - + delete indexer; + // If there is a gap, check to see if the server sent us // everything-> if (firstSeqNum != (sequenceNumber + 1)) { @@ -1607,7 +1670,16 @@ void Table::processNewTransactionParts() { delete tpit; // Clear all the new transaction parts in preparation for the next // time the server sends slots - newTransactionParts->clear(); + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + newTransactionParts->clear(); + } } void Table::arbitrateFromServer() { @@ -1718,7 +1790,7 @@ void Table::arbitrateFromServer() { newCommit->addKV(kv); } delete spit; - + // create the commit parts newCommit->createCommitParts(); @@ -1732,6 +1804,7 @@ void Table::arbitrateFromServer() { processEntry(commitPart); } } + delete speculativeTableTmp; if ((newCommit != NULL) || (generatedAborts->size() > 0)) { ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); @@ -1999,6 +2072,7 @@ bool Table::updateCommittedTable() { commit->addPartDecode(part); } delete pairit; + delete parts; } delete partsit; @@ -2052,6 +2126,7 @@ bool Table::updateCommittedTable() { // Delete it and move on commit->setDead(); commitForClientTable->remove(commit->getSequenceNumber()); + delete commit; continue; } } @@ -2129,6 +2204,7 @@ bool Table::updateCommittedTable() { // if the commit is now dead then remove it if (!previousCommit->isLive()) { commitForClientTable->remove(previousCommit->getSequenceNumber()); + delete previousCommit; } } } @@ -2328,6 +2404,7 @@ void Table::updateLiveTransactionsAndStatus() { // Remove the transaction from the live table iter->remove(); liveTransactionByTransactionIdTable->remove(transaction->getId()); + delete transaction; } } delete iter; @@ -2655,6 +2732,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven rmit->remove(); // Decrement machines that need to see this notification rm->removeWatcher(machineId); + delete rm; } } delete rmit; diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index 59ba472..dfe1886 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -267,7 +267,7 @@ public: bool update(); bool createNewKey(IoTString *keyName, int64_t machineId); void startTransaction(); - void addKV(IoTString *key, IoTString *value); + void put(IoTString *key, IoTString *value); TransactionStatus *commitTransaction(); /** diff --git a/version2/src/C/Test.C b/version2/src/C/Test.C index 14fec0d..7b7733c 100644 --- a/version2/src/C/Test.C +++ b/version2/src/C/Test.C @@ -37,6 +37,10 @@ int main(int numargs, char ** args) { t1->createNewKey(ib, 351); t2->createNewKey(ic, 321); t2->createNewKey(id, 351); + delete ia; + delete ib; + delete ic; + delete id; } // Do Updates for the keys @@ -61,20 +65,26 @@ int main(int numargs, char ** args) { IoTString * iValueD = new IoTString(buffer); t1->startTransaction(); - t1->addKV(iKeyA, iValueA); + t1->put(iKeyA, iValueA); transStatusList->add(t1->commitTransaction()); + delete iKeyA; delete iValueA; + t1->startTransaction(); - t1->addKV(iKeyB, iValueB); + t1->put(iKeyB, iValueB); transStatusList->add(t1->commitTransaction()); + delete iKeyB; delete iValueB; t2->startTransaction(); - t2->addKV(iKeyC, iValueC); + t2->put(iKeyC, iValueC); transStatusList->add(t2->commitTransaction()); + delete iKeyC; delete iValueC; t2->startTransaction(); - t2->addKV(iKeyD, iValueD); + t2->put(iKeyD, iValueD); transStatusList->add(t2->commitTransaction()); + delete iKeyD; delete iValueD; } + printf("Updating Clients...\n"); t1->update(); t2->update(); @@ -147,6 +157,14 @@ int main(int numargs, char ** args) { printf("Key-Value t2 incorrect: keyD testValD2\n"); foundError = true; } + delete iKeyA; delete iValueA; + delete iKeyB; delete iValueB; + delete iKeyC; delete iValueC; + delete iKeyD; delete iValueD; + delete testValA1; delete testValA2; + delete testValB1; delete testValB2; + delete testValC1; delete testValC2; + delete testValD1; delete testValD2; } for (uint i = 0; i < transStatusList->size(); i++) { @@ -162,5 +180,9 @@ int main(int numargs, char ** args) { } else { printf("No Errors Found...\n"); } + + delete transStatusList; + delete t1; + delete t2; } diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 1f862df..d316aa1 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -20,6 +20,9 @@ Transaction::Transaction() : arbitratorId(-1), machineId(-1), transactionId(Pair(0,0)), + nextPartToSend(0), + flddidSendAPartToServer(false), + transactionStatus(NULL), hadServerFailure(false) { } @@ -34,7 +37,6 @@ void Transaction::addPartEncode(TransactionPart *newPart) { transactionId = newPart->getTransactionId(); clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); machineId = newPart->getMachineId(); - fldisComplete = true; } -- 2.34.1