X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;fp=version2%2Fsrc%2FC%2FTable.cc;h=1a33f76f1f1bc98f7e8c267e90e4ddc515655bc0;hb=0173578905303681df8ea5f3c35b3ead109c8ba8;hp=531337cc5336f1c69dac6aed68018b2899355c98;hpb=b7ed1849727b50e226f3b9d1c432d3071d739368;p=iotcloud.git diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 531337c..1a33f76 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -60,7 +60,6 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i lastIsNewKey(false), lastNewSize(0), lastTransactionPartsSent(NULL), - lastPendingSendArbitrationEntriesToDelete(NULL), lastNewKey(NULL), committedKeyValueTable(NULL), speculatedKeyValueTable(NULL), @@ -123,7 +122,6 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : lastIsNewKey(false), lastNewSize(0), lastTransactionPartsSent(NULL), - lastPendingSendArbitrationEntriesToDelete(NULL), lastNewKey(NULL), committedKeyValueTable(NULL), speculatedKeyValueTable(NULL), @@ -171,6 +169,7 @@ Table::~Table() { SetIterator *> *lmit = getKeyIterator(lastMessageTable); while (lmit->hasNext()) { Pair * pair = lastMessageTable->get(lmit->next()); + delete pair; } delete lmit; delete lastMessageTable; @@ -199,7 +198,7 @@ Table::~Table() { SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); while (partsit->hasNext()) { int64_t machineId = partsit->next(); - Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); delete parts; } delete partsit; @@ -209,7 +208,7 @@ Table::~Table() { SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); while (partsit->hasNext()) { int64_t machineId = partsit->next(); - Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); delete parts; } delete partsit; @@ -224,7 +223,7 @@ Table::~Table() { int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator - Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); + Hashtable *commitForClientTable = liveit->currVal(); { SetIterator *clientit = getKeyIterator(commitForClientTable); while (clientit->hasNext()) { @@ -250,7 +249,15 @@ Table::~Table() { delete pendingTransactionQueue; } delete pendingSendArbitrationEntriesToDelete; - delete transactionPartsSent; + { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + delete transactionPartsSent; + } delete outstandingTransactionStatus; delete liveAbortsGeneratedByLocal; delete offlineTransactionsCommittedAndAtServer; @@ -265,6 +272,8 @@ Table::~Table() { if (lastTransactionPartsSent != NULL) delete lastTransactionPartsSent; delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; + if (lastNewKey) + delete lastNewKey; } /** @@ -320,7 +329,7 @@ void Table::initTable() { Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber); localSequenceNumber++; TableStatus *status = new TableStatus(s, numberOfSlots); - s->addEntry(status); + s->addShallowEntry(status); Array *array = cloud->putSlot(s, numberOfSlots); if (array == NULL) { @@ -332,8 +341,10 @@ void Table::initTable() { } else if (array->length() == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); + delete s; delete array; } else { + delete s; delete array; throw new Error("Error on initialization"); } @@ -585,6 +596,36 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } +void Table::processTransactionList(bool handlePartial) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + delete transaction; + } else if (handlePartial) { + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + } + delete trit; +} + NewKey * Table::handlePartialSend(NewKey * newKey) { //Didn't receive acknowledgement for last send //See if the server has received a newer slot @@ -596,67 +637,23 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully - // sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; + processTransactionList(false); } else { if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; + processTransactionList(true); } } @@ -679,37 +676,12 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; + + processTransactionList(true); } else { SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { @@ -730,6 +702,18 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { return newKey; } +void Table::clearSentParts() { + // Clear the sent data since we are trying again + pendingSendArbitrationEntriesToDelete->clear(); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + transactionPartsSent->clear(); +} + bool Table::sendToServer(NewKey *newKey) { if (hadPartialSendToServer) { newKey = handlePartialSend(newKey); @@ -744,6 +728,7 @@ bool Table::sendToServer(NewKey *newKey) { // If there is a new key with same name then end if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { + delete newKey; return false; } @@ -771,28 +756,31 @@ bool Table::sendToServer(NewKey *newKey) { delete trit; // Clear the sent data since we are trying again - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); - + clearSentParts(); + // We needed a resize so try again fillSlot(slot, true, newKey, newSize, insertedNewKey); } - + if (lastSlotAttemptedToSend != NULL) + delete lastSlotAttemptedToSend; + lastSlotAttemptedToSend = slot; lastIsNewKey = (newKey != NULL); lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; + if (( newKey != lastNewKey) && (lastNewKey != NULL)) + delete lastNewKey; lastNewKey = newKey; if (lastTransactionPartsSent != NULL) delete lastTransactionPartsSent; lastTransactionPartsSent = transactionPartsSent->clone(); - lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); Array * newSlots = NULL; bool wasInserted = false; bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; // Did insert into the block chain if (insertedNewKey) { // This slot was what was inserted not a previous slot @@ -815,28 +803,7 @@ bool Table::sendToServer(NewKey *newKey) { delete pendingSendArbitrationRounds->get(i); } pendingSendArbitrationRounds->setSize(oldcount); - - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(transactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; + processTransactionList(false); } else { // Reset which transaction to send SetIterator *> *trit = getKeyIterator(transactionPartsSent); @@ -853,8 +820,7 @@ bool Table::sendToServer(NewKey *newKey) { } // Clear the sent data in preparation for next send - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); + clearSentParts(); if (newSlots->length() != 0) { // insert into the local block chain @@ -890,8 +856,7 @@ bool Table::sendToServer(NewKey *newKey) { delete trit; } - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); + clearSentParts(); throw e; } @@ -1254,14 +1219,14 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize if (resize) { newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE); TableStatus *status = new TableStatus(slot, newSize); - slot->addEntry(status); + slot->addShallowEntry(status); } // Fill with rejected slots first before doing anything else doRejectedMessages(slot); // Do mandatory rescue of entries - ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); + ThreeTuple mandatoryRescueReturn = doMandatoryRescue(slot, resize); // Extract working variables bool needsResize = mandatoryRescueReturn.getFirst(); @@ -1283,8 +1248,7 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize } // Clear the transactions, aborts and commits that were sent previously - transactionPartsSent->clear(); - pendingSendArbitrationEntriesToDelete->clear(); + clearSentParts(); uint size = pendingSendArbitrationRounds->size(); for (uint i = 0; i < size; i++) { ArbitrationRound *round = pendingSendArbitrationRounds->get(i); @@ -1363,7 +1327,7 @@ void Table::doRejectedMessages(Slot *s) { if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); - s->addEntry(rm); + s->addShallowEntry(rm); } else { int64_t prev_seqn = -1; uint i = 0; @@ -1378,7 +1342,7 @@ void Table::doRejectedMessages(Slot *s) { /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); - s->addEntry(rm); + s->addShallowEntry(rm); } /* Generate rejected message entries for present messages */ for (; i < rejectedSlotVector->size(); i++) { @@ -1386,13 +1350,13 @@ void Table::doRejectedMessages(Slot *s) { Slot *s_msg = buffer->getSlot(curr_seqn); int64_t machineid = s_msg->getMachineID(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); - s->addEntry(rm); + s->addShallowEntry(rm); } } } } -ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize) { +ThreeTuple Table::doMandatoryRescue(Slot *slot, bool resize) { int64_t newestSequenceNumber = buffer->getNewestSeqNum(); int64_t oldestSequenceNumber = buffer->getOldestSeqNum(); if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { @@ -1802,6 +1766,8 @@ void Table::arbitrateFromServer() { lastSeqNumArbOn = transactionSequenceNumber; } + delete transactionSequenceNumbers; + Commit *newCommit = NULL; // If there is something to commit @@ -1849,6 +1815,8 @@ void Table::arbitrateFromServer() { } } } + } else { + delete generatedAborts; } } @@ -1886,7 +1854,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - newCommit->addKV(kv); + newCommit->addKV(kv->getCopy()); } delete kvit; @@ -2267,6 +2235,7 @@ bool Table::updateCommittedTable() { delete kvit; } } + delete commitSequenceNumbers; } delete liveit; @@ -2323,6 +2292,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds + delete transactionSequenceNumbersSorted; return false; // did not speculate } @@ -2361,6 +2331,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { } } + delete transactionSequenceNumbersSorted; + if (didSkip) { // Since there was a skip we need to redo the speculation next time around lastTransactionSequenceNumberSpeculatedOn = -1; @@ -2708,7 +2680,7 @@ void Table::processEntry(TransactionPart *entry) { // Update the part and set dead ones we have already seen (got a // rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); + TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); }