X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FCommit.cc;fp=version2%2Fsrc%2FC%2FCommit.cc;h=0000000000000000000000000000000000000000;hb=786e40250f31eff04eec25bbcaae3cd916fedb14;hp=766584175f86a82c00442dd0bb3adf119e1a7420;hpb=3f24bffc82ebfe2730308b63100af08645316577;p=iotcloud.git diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc deleted file mode 100644 index 7665841..0000000 --- a/version2/src/C/Commit.cc +++ /dev/null @@ -1,300 +0,0 @@ -#include "Commit.h" -#include "CommitPart.h" -#include "ByteBuffer.h" -#include "IoTString.h" - -Commit::Commit() : - parts(new Vector()), - partCount(0), - missingParts(NULL), - fldisComplete(false), - hasLastPart(false), - keyValueUpdateSet(new Hashset()), - isDead(false), - sequenceNumber(-1), - machineId(-1), - transactionSequenceNumber(-1), - dataBytes(NULL), - liveKeys(new Hashset()) { -} - -Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) : - parts(new Vector()), - partCount(0), - missingParts(NULL), - fldisComplete(true), - hasLastPart(false), - keyValueUpdateSet(new Hashset()), - isDead(false), - sequenceNumber(_sequenceNumber), - machineId(_machineId), - transactionSequenceNumber(_transactionSequenceNumber), - dataBytes(NULL), - liveKeys(new Hashset()) { -} - -Commit::~Commit() { - { - uint Size = parts->size(); - for(uint i=0;iget(i)->releaseRef(); - delete parts; - } - { - SetIterator * keyit = keyValueUpdateSet->iterator(); - while(keyit->hasNext()) { - delete keyit->next(); - } - delete keyit; - delete keyValueUpdateSet; - } - delete liveKeys; - if (missingParts != NULL) - delete missingParts; - if (dataBytes != NULL) - delete dataBytes; -} - -void Commit::addPartDecode(CommitPart *newPart) { - if (isDead) { - // If dead then just kill this part and move on - newPart->setDead(); - return; - } - - newPart->acquireRef(); - CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); - if (previouslySeenPart == NULL) - partCount++; - - if (previouslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previouslySeenPart->setDead(); - previouslySeenPart->releaseRef(); - } else if (newPart->isLastPart()) { - missingParts = new Hashset(); - hasLastPart = true; - - for (int i = 0; i < newPart->getPartNumber(); i++) { - if (parts->get(i) == NULL) { - missingParts->add(i); - } - } - } - - if (!fldisComplete && hasLastPart) { - - // We have seen this part so remove it from the set of missing parts - missingParts->remove(newPart->getPartNumber()); - - // Check if all the parts have been seen - if (missingParts->size() == 0) { - - // We have all the parts - fldisComplete = true; - - // Decode all the parts and create the key value guard and update sets - decodeCommitData(); - - // Get the sequence number and arbitrator of this transaction - sequenceNumber = parts->get(0)->getSequenceNumber(); - machineId = parts->get(0)->getMachineId(); - transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber(); - } - } -} - -int64_t Commit::getSequenceNumber() { - return sequenceNumber; -} - -int64_t Commit::getTransactionSequenceNumber() { - return transactionSequenceNumber; -} - -Vector *Commit::getParts() { - return parts; -} - -void Commit::addKV(KeyValue *kv) { - KeyValue * kvcopy = kv->getCopy(); - keyValueUpdateSet->add(kvcopy); - liveKeys->add(kvcopy->getKey()); -} - -void Commit::invalidateKey(IoTString *key) { - liveKeys->remove(key); - - if (liveKeys->size() == 0) { - setDead(); - } -} - -Hashset *Commit::getKeyValueUpdateSet() { - return keyValueUpdateSet; -} - -int32_t Commit::getNumberOfParts() { - return partCount; -} - -void Commit::setDead() { - if (!isDead) { - isDead = true; - // Make all the parts of this transaction dead - for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) { - CommitPart *part = parts->get(partNumber); - part->setDead(); - } - } -} - -void Commit::createCommitParts() { - uint Size = parts->size(); - for(uint i=0;i < Size; i++) { - Entry * e=parts->get(i); - e->releaseRef(); - } - parts->clear(); - partCount = 0; - // Convert to chars - Array *charData = convertDataToBytes(); - - int commitPartCount = 0; - int currentPosition = 0; - int remaining = charData->length(); - - while (remaining > 0) { - bool isLastPart = false; - // determine how much to copy - int copySize = CommitPart_MAX_NON_HEADER_SIZE; - if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) { - copySize = remaining; - isLastPart = true;// last bit of data so last part - } - - // Copy to a smaller version - Array *partData = new Array(copySize); - System_arraycopy(charData, currentPosition, partData, 0, copySize); - - CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); - parts->setExpand(part->getPartNumber(), part); - - // Update position, count and remaining - currentPosition += copySize; - commitPartCount++; - remaining -= copySize; - } - delete charData; -} - -void Commit::decodeCommitData() { - // Calculate the size of the data section - int dataSize = 0; - for (uint i = 0; i < parts->size(); i++) { - CommitPart *tp = parts->get(i); - if (tp != NULL) - dataSize += tp->getDataSize(); - } - - Array *combinedData = new Array(dataSize); - int currentPosition = 0; - - // Stitch all the data sections together - for (uint i = 0; i < parts->size(); i++) { - CommitPart *tp = parts->get(i); - if (tp != NULL) { - System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); - currentPosition += tp->getDataSize(); - } - } - - // Decoder Object - ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData); - - // Decode how many key value pairs need to be decoded - int numberOfKVUpdates = bbDecode->getInt(); - - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); - keyValueUpdateSet->add(kv); - liveKeys->add(kv->getKey()); - } - delete bbDecode; -} - -Array *Commit::convertDataToBytes() { - // Calculate the size of the data - int sizeOfData = sizeof(int32_t); // Number of Update KV's - SetIterator *kvit = keyValueUpdateSet->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - sizeOfData += kv->getSize(); - } - delete kvit; - - // Data handlers and storage - Array *dataArray = new Array(sizeOfData); - ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray); - - // Encode the size of the updates and guard sets - bbEncode->putInt(keyValueUpdateSet->size()); - - // Encode all the updates - kvit = keyValueUpdateSet->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - kv->encode(bbEncode); - } - delete kvit; - Array * array = bbEncode->array(); - bbEncode->releaseArray(); - delete bbEncode; - return array; -} - -void Commit::setKVsMap(Hashset *newKVs) { - keyValueUpdateSet->clear(); - liveKeys->clear(); - SetIterator *kvit = newKVs->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - KeyValue *kvcopy = kv->getCopy(); - liveKeys->add(kvcopy->getKey()); - keyValueUpdateSet->add(kvcopy); - } - delete kvit; -} - -Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { - if (older == NULL) { - return newer; - } else if (newer == NULL) { - return older; - } - Hashset *kvSet = new Hashset(); - SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - kvSet->add(kv); - } - delete kvit; - kvit = newer->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - kvSet->add(kv); - } - delete kvit; - - int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber(); - if (transactionSequenceNumber == -1) { - transactionSequenceNumber = older->getTransactionSequenceNumber(); - } - - Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber); - newCommit->setKVsMap(kvSet); - - delete kvSet; - return newCommit; -}