X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTransaction.cpp;fp=version2%2Fsrc%2FC%2FTransaction.cpp;h=b0d3c5bd50d4f0ccdcb69c320c4809907cd4ac42;hb=786e40250f31eff04eec25bbcaae3cd916fedb14;hp=0000000000000000000000000000000000000000;hpb=3f24bffc82ebfe2730308b63100af08645316577;p=iotcloud.git diff --git a/version2/src/C/Transaction.cpp b/version2/src/C/Transaction.cpp new file mode 100644 index 0000000..b0d3c5b --- /dev/null +++ b/version2/src/C/Transaction.cpp @@ -0,0 +1,347 @@ +#include "Transaction.h" +#include "TransactionPart.h" +#include "KeyValue.h" +#include "ByteBuffer.h" +#include "IoTString.h" +#include "TransactionStatus.h" + +Transaction::Transaction() : + parts(new Vector()), + partCount(0), + missingParts(NULL), + partsPendingSend(new Vector()), + fldisComplete(false), + hasLastPart(false), + keyValueGuardSet(new Hashset()), + keyValueUpdateSet(new Hashset()), + isDead(false), + sequenceNumber(-1), + clientLocalSequenceNumber(-1), + arbitratorId(-1), + machineId(-1), + transactionId(Pair(0,0)), + nextPartToSend(0), + flddidSendAPartToServer(false), + transactionStatus(NULL), + hadServerFailure(false) { +} + +Transaction::~Transaction() { + if (missingParts) + delete missingParts; + { + uint Size = parts->size(); + for(uint i=0; iget(i)->releaseRef(); + delete parts; + } + { + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvGuard = kvit->next(); + delete kvGuard; + } + delete kvit; + delete keyValueGuardSet; + } + { + SetIterator *kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvUpdate = kvit->next(); + delete kvUpdate; + } + delete kvit; + delete keyValueUpdateSet; + } + delete partsPendingSend; +} + +void Transaction::addPartEncode(TransactionPart *newPart) { + newPart->acquireRef(); + printf("Add part %d\n", newPart->getPartNumber()); + TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart); + if (old == NULL) { + partCount++; + } else { + old->releaseRef(); + } + partsPendingSend->add(newPart->getPartNumber()); + + sequenceNumber = newPart->getSequenceNumber(); + arbitratorId = newPart->getArbitratorId(); + transactionId = newPart->getTransactionId(); + clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); + machineId = newPart->getMachineId(); + fldisComplete = true; +} + +void Transaction::addPartDecode(TransactionPart *newPart) { + if (isDead) { + // If dead then just kill this part and move on + newPart->setDead(); + return; + } + newPart->acquireRef(); + sequenceNumber = newPart->getSequenceNumber(); + arbitratorId = newPart->getArbitratorId(); + transactionId = newPart->getTransactionId(); + clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); + machineId = newPart->getMachineId(); + + TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); + if (previouslySeenPart == NULL) + partCount++; + + if (previouslySeenPart != NULL) { + // Set dead the old one since the new one is a rescued version of this part + previouslySeenPart->releaseRef(); + previouslySeenPart->setDead(); + } else if (newPart->isLastPart()) { + missingParts = new Hashset(); + hasLastPart = true; + + for (int i = 0; i < newPart->getPartNumber(); i++) { + if (parts->get(i) == NULL) { + missingParts->add(i); + } + } + } + + if (!fldisComplete && hasLastPart) { + + // We have seen this part so remove it from the set of missing parts + missingParts->remove(newPart->getPartNumber()); + + // Check if all the parts have been seen + if (missingParts->size() == 0) { + + // We have all the parts + fldisComplete = true; + + // Decode all the parts and create the key value guard and update sets + decodeTransactionData(); + } + } +} + +void Transaction::addUpdateKV(KeyValue *kv) { + keyValueUpdateSet->add(kv); +} + +void Transaction::addGuardKV(KeyValue *kv) { + keyValueGuardSet->add(kv); +} + + +int64_t Transaction::getSequenceNumber() { + return sequenceNumber; +} + +void Transaction::setSequenceNumber(int64_t _sequenceNumber) { + sequenceNumber = _sequenceNumber; + + for (uint32_t i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + if (tp != NULL) + tp->setSequenceNumber(sequenceNumber); + } +} + +int64_t Transaction::getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; +} + +Vector *Transaction::getParts() { + return parts; +} + +bool Transaction::didSendAPartToServer() { + return flddidSendAPartToServer; +} + +void Transaction::resetNextPartToSend() { + nextPartToSend = 0; +} + +TransactionPart *Transaction::getNextPartToSend() { + if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) { + return NULL; + } + TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend)); + nextPartToSend++; + return part; +} + + +void Transaction::setServerFailure() { + hadServerFailure = true; +} + +bool Transaction::getServerFailure() { + return hadServerFailure; +} + + +void Transaction::resetServerFailure() { + hadServerFailure = false; +} + + +void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) { + transactionStatus = _transactionStatus; +} + +TransactionStatus *Transaction::getTransactionStatus() { + return transactionStatus; +} + +void Transaction::removeSentParts(Vector *sentParts) { + nextPartToSend = 0; + bool changed = false; + uint lastusedindex = 0; + for (uint i = 0; i < partsPendingSend->size(); i++) { + int32_t parti = partsPendingSend->get(i); + for (uint j = 0; j < sentParts->size(); j++) { + int32_t partj = sentParts->get(j); + if (parti == partj) { + changed = true; + goto NextElement; + } + } + partsPendingSend->set(lastusedindex++, parti); +NextElement: + ; + } + if (changed) { + partsPendingSend->setSize(lastusedindex); + flddidSendAPartToServer = true; + transactionStatus->setTransactionSequenceNumber(sequenceNumber); + } +} + +bool Transaction::didSendAllParts() { + return partsPendingSend->isEmpty(); +} + +Hashset *Transaction::getKeyValueUpdateSet() { + return keyValueUpdateSet; +} + +int Transaction::getNumberOfParts() { + return partCount; +} + +int64_t Transaction::getMachineId() { + return machineId; +} + +int64_t Transaction::getArbitrator() { + return arbitratorId; +} + +bool Transaction::isComplete() { + return fldisComplete; +} + +Pair *Transaction::getId() { + return &transactionId; +} + +void Transaction::setDead() { + if (!isDead) { + // Set dead + isDead = true; + // Make all the parts of this transaction dead + for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) { + TransactionPart *part = parts->get(partNumber); + if (part != NULL) + part->setDead(); + } + } +} + +void Transaction::decodeTransactionData() { + // Calculate the size of the data section + int dataSize = 0; + for (uint i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + dataSize += tp->getDataSize(); + } + + Array *combinedData = new Array(dataSize); + int currentPosition = 0; + + // Stitch all the data sections together + for (uint i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); + currentPosition += tp->getDataSize(); + } + + // Decoder Object + ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData); + + // Decode how many key value pairs need to be decoded + int numberOfKVGuards = bbDecode->getInt(); + int numberOfKVUpdates = bbDecode->getInt(); + + // Decode all the guard key values + for (int i = 0; i < numberOfKVGuards; i++) { + KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueGuardSet->add(kv); + } + + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueUpdateSet->add(kv); + } + delete bbDecode; +} + +bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvGuard = kvit->next(); + // First check if the key is in the speculative table, this is the value of the latest assumption + KeyValue *kv = NULL; + + // If we have a speculation table then use it first + if (pendingTransactionSpeculatedKeyValueTable != NULL) { + kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey()); + } + + // If we have a speculation table then use it first + if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { + kv = speculatedKeyValueTable->get(kvGuard->getKey()); + } + + if (kv == NULL) { + // if it is not in the speculative table then check the committed table and use that + // value as our latest assumption + kv = committedKeyValueTable->get(kvGuard->getKey()); + } + + if (kvGuard->getValue() != NULL) { + if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) { + + + if (kv != NULL) { + printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray()); + } else { + printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray()); + } + delete kvit; + return false; + } + } else { + if (kv != NULL) { + delete kvit; + return false; + } + } + } + delete kvit; + return true; +} +