#include "ArbitrationRound.h"
#include "Commit.h"
+#include "CommitPart.h"
ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset<Abort *> *_abortsBefore) :
abortsBefore(_abortsBefore),
ArbitrationRound::~ArbitrationRound() {
delete abortsBefore;
+ uint partsSize = parts->size();
+ for (uint i = 0; i < partsSize; i++) {
+ Entry * part = parts->get(i);
+ part->releaseRef();
+ }
delete parts;
if (commit != NULL)
delete commit;
if (didGenerateParts) {
return;
}
+ uint partsSize = parts->size();
+ for (uint i = 0; i < partsSize; i++) {
+ Entry * part = parts->get(i);
+ part->releaseRef();
+ }
parts->clear();
SetIterator<Abort *, Abort *> *abit = abortsBefore->iterator();
while (abit->hasNext())
Vector<CommitPart *> *cParts = commit->getParts();
uint cPartsSize = cParts->size();
for (uint i = 0; i < cPartsSize; i++) {
- parts->add((Entry *)cParts->get(i));
+ CommitPart * part = cParts->get(i);
+ part->acquireRef();
+ parts->add((Entry *)part);
}
}
}
}
void ArbitrationRound::removeParts(Vector<Entry *> *removeParts) {
- parts->removeAll(removeParts);
+ uint size = removeParts->size();
+ for(uint i=0; i < size; i++) {
+ Entry * e = removeParts->get(i);
+ if (parts->remove(e))
+ e->releaseRef();
+ }
didSendPart = true;
}
sequenceNumber(-1),
machineId(-1),
transactionSequenceNumber(-1),
+ dataBytes(NULL),
liveKeys(new Hashset<IoTString *>()) {
}
sequenceNumber(_sequenceNumber),
machineId(_machineId),
transactionSequenceNumber(_transactionSequenceNumber),
+ dataBytes(NULL),
liveKeys(new Hashset<IoTString *>()) {
}
Commit::~Commit() {
- delete parts;
+ {
+ uint Size = parts->size();
+ for(uint i=0;i<Size; i++)
+ parts->get(i)->releaseRef();
+ delete parts;
+ }
{
SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
while(keyit->hasNext()) {
delete liveKeys;
if (missingParts != NULL)
delete missingParts;
+ if (dataBytes != NULL)
+ delete dataBytes;
}
void Commit::addPartDecode(CommitPart *newPart) {
return;
}
+ newPart->acquireRef();
CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
if (previouslySeenPart == NULL)
partCount++;
if (previouslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
previouslySeenPart->setDead();
+ previouslySeenPart->releaseRef();
} else if (newPart->isLastPart()) {
missingParts = new Hashset<int32_t>();
hasLastPart = true;
// Make all the parts of this transaction dead
for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
CommitPart *part = parts->get(partNumber);
- if (parts != NULL)
- part->setDead();
+ part->setDead();
}
}
}
-CommitPart *Commit::getPart(int index) {
- return parts->get(index);
-}
-
void Commit::createCommitParts() {
+ uint Size = parts->size();
+ for(uint i=0;i < Size; i++) {
+ Entry * e=parts->get(i);
+ e->releaseRef();
+ }
parts->clear();
partCount = 0;
// Convert to chars
int64_t machineId;
int64_t transactionSequenceNumber;
Hashset<IoTString *> *liveKeys;
+ Array<char> *dataBytes;
Array<char> *convertDataToBytes();
void setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs);
bool isComplete() { return fldisComplete; }
bool isLive() { return !isDead; }
void setDead();
- CommitPart *getPart(int32_t index);
void createCommitParts();
void decodeCommitData();
friend Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber);
transactionSequenceNumber(_transactionSequenceNumber),
partNumber(_partNumber),
fldisLastPart(_isLastPart),
+ refCount(1),
data(_data),
partId(Pair<int64_t, int32_t>(sequenceNumber, partNumber)),
commitId(Pair<int64_t, int64_t>(machineId, sequenceNumber)) {
int64_t transactionSequenceNumber;
int32_t partNumber; // Parts position in the
bool fldisLastPart;
+ int32_t refCount;
Array<char> *data;
Pair<int64_t, int32_t> partId;
void encode(ByteBuffer *bb);
char getType();
Entry *getCopy(Slot *s);
+ void releaseRef() {if ((--refCount)==0) delete this;}
+ void acquireRef() {refCount++;}
};
Entry *CommitPart_decode(Slot *s, ByteBuffer *bb);
* array.
*/
virtual int getSize() = 0;
-
+ virtual void releaseRef() {delete this;}
+ virtual void acquireRef() {}
/**
* Returns a copy of the Entry that can be added to a different slot.
TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
newTransaction->addPartEncode(part);
-
+ part->releaseRef();
+
// Update position, count and remaining
currentPosition += copySize;
transactionPartCount++;
delete hmac;
delete prevhmac;
for(uint i=0; i< entries->size(); i++)
- delete entries->get(i);
+ entries->get(i)->releaseRef();
delete entries;
if (fakeLastMessage)
delete fakeLastMessage;
while (partsit->hasNext()) {
int64_t machineId = partsit->next();
Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+ SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
+ while(pit->hasNext()) {
+ Pair<int64_t, int32_t> * pair=pit->next();
+ pit->currVal()->releaseRef();
+ }
+ delete pit;
+
delete parts;
}
delete partsit;
while (partsit->hasNext()) {
int64_t machineId = partsit->next();
Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+ SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
+ while(pit->hasNext()) {
+ Pair<int64_t, int32_t> * pair=pit->next();
+ pit->currVal()->releaseRef();
+ }
+ delete pit;
delete parts;
}
delete partsit;
SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
while (tpit->hasNext()) {
int64_t machineId = tpit->next();
- Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+ Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
// Iterate through all the parts for that machine Id
if (lastTransactionNumber >= part->getSequenceNumber()) {
// Set dead the transaction part
part->setDead();
+ part->releaseRef();
continue;
}
}
liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
}
+ part->releaseRef();
}
delete ptit;
}
SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
- newCommit->addKV(kv->getCopy());
+ newCommit->addKV(kv);
}
delete kvit;
SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
while (pairit->hasNext()) {
Pair<int64_t, int32_t> *partId = pairit->next();
- CommitPart *part = parts->get(partId);
+ CommitPart *part = pairit->currVal();
// Get the transaction object for that sequence number
Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
// Add that part to the commit
commit->addPartDecode(part);
+ part->releaseRef();
}
delete pairit;
delete parts;
// Update the part and set dead ones we have already seen (got a
// rescued version)
+ entry->acquireRef();
TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
if (previouslySeenPart != NULL) {
+ previouslySeenPart->releaseRef();
previouslySeenPart->setDead();
}
}
}
// Update the part and set dead ones we have already seen (got a
// rescued version)
+ entry->acquireRef();
CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
if (previouslySeenPart != NULL) {
previouslySeenPart->setDead();
+ previouslySeenPart->releaseRef();
}
}
if (missingParts)
delete missingParts;
{
+ uint Size = parts->size();
+ for(uint i=0; i<Size; i++)
+ parts->get(i)->releaseRef();
delete parts;
}
{
}
void Transaction::addPartEncode(TransactionPart *newPart) {
+ newPart->acquireRef();
+ printf("Add part %d\n", newPart->getPartNumber());
TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
if (old == NULL) {
partCount++;
- } else
- delete old;
+ } else {
+ old->releaseRef();
+ }
partsPendingSend->add(newPart->getPartNumber());
sequenceNumber = newPart->getSequenceNumber();
newPart->setDead();
return;
}
-
+ newPart->acquireRef();
sequenceNumber = newPart->getSequenceNumber();
arbitratorId = newPart->getArbitratorId();
transactionId = newPart->getTransactionId();
if (previouslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
+ previouslySeenPart->releaseRef();
previouslySeenPart->setDead();
} else if (newPart->isLastPart()) {
missingParts = new Hashset<int32_t>();
}
}
-TransactionPart *Transaction::getPart(int index) {
- return parts->get(index);
-}
-
void Transaction::decodeTransactionData() {
// Calculate the size of the data section
int dataSize = 0;
bool isComplete();
Pair<int64_t, int64_t> *getId();
void setDead();
- TransactionPart *getPart(int32_t index);
bool evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable);
};
#endif
int64_t clientLocalSequenceNumber; // Sequence number of the transaction that this is a part of
int32_t partNumber; // Parts position in the
bool fldisLastPart;
-
+ int32_t refCount;
Pair<int64_t, int64_t> transactionId;
Pair<int64_t, int32_t> partId;
Array<char> *data;
-
+
public:
TransactionPart(Slot *s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, Array<char> *_data, bool _isLastPart) : Entry(s),
sequenceNumber(-1),
clientLocalSequenceNumber(_clientLocalSequenceNumber),
partNumber(_partNumber),
fldisLastPart(_isLastPart),
+ refCount(1),
transactionId(Pair<int64_t, int64_t>(machineId, clientLocalSequenceNumber)),
partId(Pair<int64_t, int32_t>(clientLocalSequenceNumber, partNumber)),
data(_data) {
void setSequenceNumber(int64_t _sequenceNumber);
void encode(ByteBuffer *bb);
char getType();
+ void releaseRef() {if ((--refCount)==0) delete this;}
+ void acquireRef() {refCount++;}
Entry *getCopy(Slot *s);
};
fldsize--;
}
- void remove(type t) {
+ bool remove(type t) {
for (uint i = 0; i < fldsize; i++) {
if (array[i] == t) {
for (i++; i < fldsize; i++) {
array[i - 1] = array[i];
}
fldsize--;
- break;
+ return true;
}
}
+ return false;
}
void removeIndex(uint i) {