lastIsNewKey(false),
lastNewSize(0),
lastTransactionPartsSent(NULL),
- lastPendingSendArbitrationEntriesToDelete(NULL),
lastNewKey(NULL),
committedKeyValueTable(NULL),
speculatedKeyValueTable(NULL),
lastIsNewKey(false),
lastNewSize(0),
lastTransactionPartsSent(NULL),
- lastPendingSendArbitrationEntriesToDelete(NULL),
lastNewKey(NULL),
committedKeyValueTable(NULL),
speculatedKeyValueTable(NULL),
SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
while (lmit->hasNext()) {
Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+ delete pair;
}
delete lmit;
delete lastMessageTable;
SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
while (partsit->hasNext()) {
int64_t machineId = partsit->next();
- Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+ Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
delete parts;
}
delete partsit;
SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
while (partsit->hasNext()) {
int64_t machineId = partsit->next();
- Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
+ Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
delete parts;
}
delete partsit;
int64_t arbitratorId = liveit->next();
// Get all the commits for a specific arbitrator
- Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
+ Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
{
SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
while (clientit->hasNext()) {
delete pendingTransactionQueue;
}
delete pendingSendArbitrationEntriesToDelete;
- delete transactionPartsSent;
+ {
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ delete trit->currVal();
+ }
+ delete trit;
+ delete transactionPartsSent;
+ }
delete outstandingTransactionStatus;
delete liveAbortsGeneratedByLocal;
delete offlineTransactionsCommittedAndAtServer;
if (lastTransactionPartsSent != NULL)
delete lastTransactionPartsSent;
delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+ if (lastNewKey)
+ delete lastNewKey;
}
/**
Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
localSequenceNumber++;
TableStatus *status = new TableStatus(s, numberOfSlots);
- s->addEntry(status);
+ s->addShallowEntry(status);
Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
if (array == NULL) {
} else if (array->length() == 1) {
// in case we did push the slot BUT we failed to init it
validateAndUpdate(array, true);
+ delete s;
delete array;
} else {
+ delete s;
delete array;
throw new Error("Error on initialization");
}
return localSequenceNumber;
}
+void Table::processTransactionList(bool handlePartial) {
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+ // Update which transactions parts still need to be sent
+ transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+
+ // Update the transaction status
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+
+ // Check if all the transaction parts were successfully
+ // sent and if so then remove it from pending
+ if (transaction->didSendAllParts()) {
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+ pendingTransactionQueue->remove(transaction);
+ delete transaction;
+ } else if (handlePartial) {
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ }
+ delete trit;
+}
+
NewKey * Table::handlePartialSend(NewKey * newKey) {
//Didn't receive acknowledgement for last send
//See if the server has received a newer slot
bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
if (sendSlotsReturn) {
+ lastSlotAttemptedToSend = NULL;
if (newKey != NULL) {
if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ delete newKey;
newKey = NULL;
}
}
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully
- // sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- }
- }
- delete trit;
+ processTransactionList(false);
} else {
if (checkSend(newSlots, lastSlotAttemptedToSend)) {
if (newKey != NULL) {
if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ delete newKey;
newKey = NULL;
}
}
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- } else {
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
- }
- }
- delete trit;
+ processTransactionList(true);
}
}
if (checkSend(newSlots, lastSlotAttemptedToSend)) {
if (newKey != NULL) {
if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ delete newKey;
newKey = NULL;
}
}
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- } else {
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
- }
- }
- delete trit;
+
+ processTransactionList(true);
} else {
SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
while (trit->hasNext()) {
return newKey;
}
+void Table::clearSentParts() {
+ // Clear the sent data since we are trying again
+ pendingSendArbitrationEntriesToDelete->clear();
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ delete trit->currVal();
+ }
+ delete trit;
+ transactionPartsSent->clear();
+}
+
bool Table::sendToServer(NewKey *newKey) {
if (hadPartialSendToServer) {
newKey = handlePartialSend(newKey);
// If there is a new key with same name then end
if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
+ delete newKey;
return false;
}
delete trit;
// Clear the sent data since we are trying again
- pendingSendArbitrationEntriesToDelete->clear();
- transactionPartsSent->clear();
-
+ clearSentParts();
+
// We needed a resize so try again
fillSlot(slot, true, newKey, newSize, insertedNewKey);
}
-
+ if (lastSlotAttemptedToSend != NULL)
+ delete lastSlotAttemptedToSend;
+
lastSlotAttemptedToSend = slot;
lastIsNewKey = (newKey != NULL);
lastInsertedNewKey = insertedNewKey;
lastNewSize = newSize;
+ if (( newKey != lastNewKey) && (lastNewKey != NULL))
+ delete lastNewKey;
lastNewKey = newKey;
if (lastTransactionPartsSent != NULL)
delete lastTransactionPartsSent;
lastTransactionPartsSent = transactionPartsSent->clone();
- lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
Array<Slot *> * newSlots = NULL;
bool wasInserted = false;
bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
if (sendSlotsReturn) {
+ lastSlotAttemptedToSend = NULL;
// Did insert into the block chain
if (insertedNewKey) {
// This slot was what was inserted not a previous slot
delete pendingSendArbitrationRounds->get(i);
}
pendingSendArbitrationRounds->setSize(oldcount);
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(transactionPartsSent->get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- }
- }
- delete trit;
+ processTransactionList(false);
} else {
// Reset which transaction to send
SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
}
// Clear the sent data in preparation for next send
- pendingSendArbitrationEntriesToDelete->clear();
- transactionPartsSent->clear();
+ clearSentParts();
if (newSlots->length() != 0) {
// insert into the local block chain
delete trit;
}
- pendingSendArbitrationEntriesToDelete->clear();
- transactionPartsSent->clear();
+ clearSentParts();
throw e;
}
if (resize) {
newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
TableStatus *status = new TableStatus(slot, newSize);
- slot->addEntry(status);
+ slot->addShallowEntry(status);
}
// Fill with rejected slots first before doing anything else
doRejectedMessages(slot);
// Do mandatory rescue of entries
- ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
+ ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
// Extract working variables
bool needsResize = mandatoryRescueReturn.getFirst();
}
// Clear the transactions, aborts and commits that were sent previously
- transactionPartsSent->clear();
- pendingSendArbitrationEntriesToDelete->clear();
+ clearSentParts();
uint size = pendingSendArbitrationRounds->size();
for (uint i = 0; i < size; i++) {
ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
int64_t new_seqn = rejectedSlotVector->lastElement();
RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
- s->addEntry(rm);
+ s->addShallowEntry(rm);
} else {
int64_t prev_seqn = -1;
uint i = 0;
/* Generate rejected message entry for missing messages */
if (prev_seqn != -1) {
RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
- s->addEntry(rm);
+ s->addShallowEntry(rm);
}
/* Generate rejected message entries for present messages */
for (; i < rejectedSlotVector->size(); i++) {
Slot *s_msg = buffer->getSlot(curr_seqn);
int64_t machineid = s_msg->getMachineID();
RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
- s->addEntry(rm);
+ s->addShallowEntry(rm);
}
}
}
}
-ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
+ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
int64_t newestSequenceNumber = buffer->getNewestSeqNum();
int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
lastSeqNumArbOn = transactionSequenceNumber;
}
+ delete transactionSequenceNumbers;
+
Commit *newCommit = NULL;
// If there is something to commit
}
}
}
+ } else {
+ delete generatedAborts;
}
}
SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
- newCommit->addKV(kv);
+ newCommit->addKV(kv->getCopy());
}
delete kvit;
delete kvit;
}
}
+ delete commitSequenceNumbers;
}
delete liveit;
if (startIndex >= transactionSequenceNumbersSorted->size()) {
// Make sure we are not out of bounds
+ delete transactionSequenceNumbersSorted;
return false; // did not speculate
}
}
}
+ delete transactionSequenceNumbersSorted;
+
if (didSkip) {
// Since there was a skip we need to redo the speculation next time around
lastTransactionSequenceNumberSpeculatedOn = -1;
// Update the part and set dead ones we have already seen (got a
// rescued version)
- TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+ TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
if (previouslySeenPart != NULL) {
previouslySeenPart->setDead();
}