X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FCommit.cpp;fp=version2%2Fsrc%2FC%2FCommit.cpp;h=766584175f86a82c00442dd0bb3adf119e1a7420;hb=786e40250f31eff04eec25bbcaae3cd916fedb14;hp=0000000000000000000000000000000000000000;hpb=3f24bffc82ebfe2730308b63100af08645316577;p=iotcloud.git diff --git a/version2/src/C/Commit.cpp b/version2/src/C/Commit.cpp new file mode 100644 index 0000000..7665841 --- /dev/null +++ b/version2/src/C/Commit.cpp @@ -0,0 +1,300 @@ +#include "Commit.h" +#include "CommitPart.h" +#include "ByteBuffer.h" +#include "IoTString.h" + +Commit::Commit() : + parts(new Vector()), + partCount(0), + missingParts(NULL), + fldisComplete(false), + hasLastPart(false), + keyValueUpdateSet(new Hashset()), + isDead(false), + sequenceNumber(-1), + machineId(-1), + transactionSequenceNumber(-1), + dataBytes(NULL), + liveKeys(new Hashset()) { +} + +Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) : + parts(new Vector()), + partCount(0), + missingParts(NULL), + fldisComplete(true), + hasLastPart(false), + keyValueUpdateSet(new Hashset()), + isDead(false), + sequenceNumber(_sequenceNumber), + machineId(_machineId), + transactionSequenceNumber(_transactionSequenceNumber), + dataBytes(NULL), + liveKeys(new Hashset()) { +} + +Commit::~Commit() { + { + uint Size = parts->size(); + for(uint i=0;iget(i)->releaseRef(); + delete parts; + } + { + SetIterator * keyit = keyValueUpdateSet->iterator(); + while(keyit->hasNext()) { + delete keyit->next(); + } + delete keyit; + delete keyValueUpdateSet; + } + delete liveKeys; + if (missingParts != NULL) + delete missingParts; + if (dataBytes != NULL) + delete dataBytes; +} + +void Commit::addPartDecode(CommitPart *newPart) { + if (isDead) { + // If dead then just kill this part and move on + newPart->setDead(); + return; + } + + newPart->acquireRef(); + CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); + if (previouslySeenPart == NULL) + partCount++; + + if (previouslySeenPart != NULL) { + // Set dead the old one since the new one is a rescued version of this part + previouslySeenPart->setDead(); + previouslySeenPart->releaseRef(); + } else if (newPart->isLastPart()) { + missingParts = new Hashset(); + hasLastPart = true; + + for (int i = 0; i < newPart->getPartNumber(); i++) { + if (parts->get(i) == NULL) { + missingParts->add(i); + } + } + } + + if (!fldisComplete && hasLastPart) { + + // We have seen this part so remove it from the set of missing parts + missingParts->remove(newPart->getPartNumber()); + + // Check if all the parts have been seen + if (missingParts->size() == 0) { + + // We have all the parts + fldisComplete = true; + + // Decode all the parts and create the key value guard and update sets + decodeCommitData(); + + // Get the sequence number and arbitrator of this transaction + sequenceNumber = parts->get(0)->getSequenceNumber(); + machineId = parts->get(0)->getMachineId(); + transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber(); + } + } +} + +int64_t Commit::getSequenceNumber() { + return sequenceNumber; +} + +int64_t Commit::getTransactionSequenceNumber() { + return transactionSequenceNumber; +} + +Vector *Commit::getParts() { + return parts; +} + +void Commit::addKV(KeyValue *kv) { + KeyValue * kvcopy = kv->getCopy(); + keyValueUpdateSet->add(kvcopy); + liveKeys->add(kvcopy->getKey()); +} + +void Commit::invalidateKey(IoTString *key) { + liveKeys->remove(key); + + if (liveKeys->size() == 0) { + setDead(); + } +} + +Hashset *Commit::getKeyValueUpdateSet() { + return keyValueUpdateSet; +} + +int32_t Commit::getNumberOfParts() { + return partCount; +} + +void Commit::setDead() { + if (!isDead) { + isDead = true; + // Make all the parts of this transaction dead + for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) { + CommitPart *part = parts->get(partNumber); + part->setDead(); + } + } +} + +void Commit::createCommitParts() { + uint Size = parts->size(); + for(uint i=0;i < Size; i++) { + Entry * e=parts->get(i); + e->releaseRef(); + } + parts->clear(); + partCount = 0; + // Convert to chars + Array *charData = convertDataToBytes(); + + int commitPartCount = 0; + int currentPosition = 0; + int remaining = charData->length(); + + while (remaining > 0) { + bool isLastPart = false; + // determine how much to copy + int copySize = CommitPart_MAX_NON_HEADER_SIZE; + if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) { + copySize = remaining; + isLastPart = true;// last bit of data so last part + } + + // Copy to a smaller version + Array *partData = new Array(copySize); + System_arraycopy(charData, currentPosition, partData, 0, copySize); + + CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); + parts->setExpand(part->getPartNumber(), part); + + // Update position, count and remaining + currentPosition += copySize; + commitPartCount++; + remaining -= copySize; + } + delete charData; +} + +void Commit::decodeCommitData() { + // Calculate the size of the data section + int dataSize = 0; + for (uint i = 0; i < parts->size(); i++) { + CommitPart *tp = parts->get(i); + if (tp != NULL) + dataSize += tp->getDataSize(); + } + + Array *combinedData = new Array(dataSize); + int currentPosition = 0; + + // Stitch all the data sections together + for (uint i = 0; i < parts->size(); i++) { + CommitPart *tp = parts->get(i); + if (tp != NULL) { + System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); + currentPosition += tp->getDataSize(); + } + } + + // Decoder Object + ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData); + + // Decode how many key value pairs need to be decoded + int numberOfKVUpdates = bbDecode->getInt(); + + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueUpdateSet->add(kv); + liveKeys->add(kv->getKey()); + } + delete bbDecode; +} + +Array *Commit::convertDataToBytes() { + // Calculate the size of the data + int sizeOfData = sizeof(int32_t); // Number of Update KV's + SetIterator *kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + sizeOfData += kv->getSize(); + } + delete kvit; + + // Data handlers and storage + Array *dataArray = new Array(sizeOfData); + ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray); + + // Encode the size of the updates and guard sets + bbEncode->putInt(keyValueUpdateSet->size()); + + // Encode all the updates + kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + kv->encode(bbEncode); + } + delete kvit; + Array * array = bbEncode->array(); + bbEncode->releaseArray(); + delete bbEncode; + return array; +} + +void Commit::setKVsMap(Hashset *newKVs) { + keyValueUpdateSet->clear(); + liveKeys->clear(); + SetIterator *kvit = newKVs->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + KeyValue *kvcopy = kv->getCopy(); + liveKeys->add(kvcopy->getKey()); + keyValueUpdateSet->add(kvcopy); + } + delete kvit; +} + +Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { + if (older == NULL) { + return newer; + } else if (newer == NULL) { + return older; + } + Hashset *kvSet = new Hashset(); + SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + kvSet->add(kv); + } + delete kvit; + kvit = newer->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + kvSet->add(kv); + } + delete kvit; + + int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber(); + if (transactionSequenceNumber == -1) { + transactionSequenceNumber = older->getTransactionSequenceNumber(); + } + + Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber); + newCommit->setKVsMap(kvSet); + + delete kvSet; + return newCommit; +}