+++ /dev/null
-#include "Transaction.h"
-#include "TransactionPart.h"
-#include "KeyValue.h"
-#include "ByteBuffer.h"
-#include "IoTString.h"
-#include "TransactionStatus.h"
-
-Transaction::Transaction() :
- parts(new Vector<TransactionPart *>()),
- partCount(0),
- missingParts(NULL),
- partsPendingSend(new Vector<int32_t>()),
- fldisComplete(false),
- hasLastPart(false),
- keyValueGuardSet(new Hashset<KeyValue *>()),
- keyValueUpdateSet(new Hashset<KeyValue *>()),
- isDead(false),
- sequenceNumber(-1),
- clientLocalSequenceNumber(-1),
- arbitratorId(-1),
- machineId(-1),
- transactionId(Pair<int64_t, int64_t>(0,0)),
- nextPartToSend(0),
- flddidSendAPartToServer(false),
- transactionStatus(NULL),
- hadServerFailure(false) {
-}
-
-Transaction::~Transaction() {
- if (missingParts)
- delete missingParts;
- {
- uint Size = parts->size();
- for(uint i=0; i<Size; i++)
- parts->get(i)->releaseRef();
- delete parts;
- }
- {
- SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kvGuard = kvit->next();
- delete kvGuard;
- }
- delete kvit;
- delete keyValueGuardSet;
- }
- {
- SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kvUpdate = kvit->next();
- delete kvUpdate;
- }
- delete kvit;
- delete keyValueUpdateSet;
- }
- delete partsPendingSend;
-}
-
-void Transaction::addPartEncode(TransactionPart *newPart) {
- newPart->acquireRef();
- printf("Add part %d\n", newPart->getPartNumber());
- TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
- if (old == NULL) {
- partCount++;
- } else {
- old->releaseRef();
- }
- partsPendingSend->add(newPart->getPartNumber());
-
- sequenceNumber = newPart->getSequenceNumber();
- arbitratorId = newPart->getArbitratorId();
- transactionId = newPart->getTransactionId();
- clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
- machineId = newPart->getMachineId();
- fldisComplete = true;
-}
-
-void Transaction::addPartDecode(TransactionPart *newPart) {
- if (isDead) {
- // If dead then just kill this part and move on
- newPart->setDead();
- return;
- }
- newPart->acquireRef();
- sequenceNumber = newPart->getSequenceNumber();
- arbitratorId = newPart->getArbitratorId();
- transactionId = newPart->getTransactionId();
- clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
- machineId = newPart->getMachineId();
-
- TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
- if (previouslySeenPart == NULL)
- partCount++;
-
- if (previouslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previouslySeenPart->releaseRef();
- previouslySeenPart->setDead();
- } else if (newPart->isLastPart()) {
- missingParts = new Hashset<int32_t>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart->getPartNumber(); i++) {
- if (parts->get(i) == NULL) {
- missingParts->add(i);
- }
- }
- }
-
- if (!fldisComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts->remove(newPart->getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts->size() == 0) {
-
- // We have all the parts
- fldisComplete = true;
-
- // Decode all the parts and create the key value guard and update sets
- decodeTransactionData();
- }
- }
-}
-
-void Transaction::addUpdateKV(KeyValue *kv) {
- keyValueUpdateSet->add(kv);
-}
-
-void Transaction::addGuardKV(KeyValue *kv) {
- keyValueGuardSet->add(kv);
-}
-
-
-int64_t Transaction::getSequenceNumber() {
- return sequenceNumber;
-}
-
-void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
- sequenceNumber = _sequenceNumber;
-
- for (uint32_t i = 0; i < parts->size(); i++) {
- TransactionPart *tp = parts->get(i);
- if (tp != NULL)
- tp->setSequenceNumber(sequenceNumber);
- }
-}
-
-int64_t Transaction::getClientLocalSequenceNumber() {
- return clientLocalSequenceNumber;
-}
-
-Vector<TransactionPart *> *Transaction::getParts() {
- return parts;
-}
-
-bool Transaction::didSendAPartToServer() {
- return flddidSendAPartToServer;
-}
-
-void Transaction::resetNextPartToSend() {
- nextPartToSend = 0;
-}
-
-TransactionPart *Transaction::getNextPartToSend() {
- if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
- return NULL;
- }
- TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
- nextPartToSend++;
- return part;
-}
-
-
-void Transaction::setServerFailure() {
- hadServerFailure = true;
-}
-
-bool Transaction::getServerFailure() {
- return hadServerFailure;
-}
-
-
-void Transaction::resetServerFailure() {
- hadServerFailure = false;
-}
-
-
-void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
- transactionStatus = _transactionStatus;
-}
-
-TransactionStatus *Transaction::getTransactionStatus() {
- return transactionStatus;
-}
-
-void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
- nextPartToSend = 0;
- bool changed = false;
- uint lastusedindex = 0;
- for (uint i = 0; i < partsPendingSend->size(); i++) {
- int32_t parti = partsPendingSend->get(i);
- for (uint j = 0; j < sentParts->size(); j++) {
- int32_t partj = sentParts->get(j);
- if (parti == partj) {
- changed = true;
- goto NextElement;
- }
- }
- partsPendingSend->set(lastusedindex++, parti);
-NextElement:
- ;
- }
- if (changed) {
- partsPendingSend->setSize(lastusedindex);
- flddidSendAPartToServer = true;
- transactionStatus->setTransactionSequenceNumber(sequenceNumber);
- }
-}
-
-bool Transaction::didSendAllParts() {
- return partsPendingSend->isEmpty();
-}
-
-Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
- return keyValueUpdateSet;
-}
-
-int Transaction::getNumberOfParts() {
- return partCount;
-}
-
-int64_t Transaction::getMachineId() {
- return machineId;
-}
-
-int64_t Transaction::getArbitrator() {
- return arbitratorId;
-}
-
-bool Transaction::isComplete() {
- return fldisComplete;
-}
-
-Pair<int64_t, int64_t> *Transaction::getId() {
- return &transactionId;
-}
-
-void Transaction::setDead() {
- if (!isDead) {
- // Set dead
- isDead = true;
- // Make all the parts of this transaction dead
- for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
- TransactionPart *part = parts->get(partNumber);
- if (part != NULL)
- part->setDead();
- }
- }
-}
-
-void Transaction::decodeTransactionData() {
- // Calculate the size of the data section
- int dataSize = 0;
- for (uint i = 0; i < parts->size(); i++) {
- TransactionPart *tp = parts->get(i);
- dataSize += tp->getDataSize();
- }
-
- Array<char> *combinedData = new Array<char>(dataSize);
- int currentPosition = 0;
-
- // Stitch all the data sections together
- for (uint i = 0; i < parts->size(); i++) {
- TransactionPart *tp = parts->get(i);
- System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
- currentPosition += tp->getDataSize();
- }
-
- // Decoder Object
- ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
-
- // Decode how many key value pairs need to be decoded
- int numberOfKVGuards = bbDecode->getInt();
- int numberOfKVUpdates = bbDecode->getInt();
-
- // Decode all the guard key values
- for (int i = 0; i < numberOfKVGuards; i++) {
- KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
- keyValueGuardSet->add(kv);
- }
-
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
- keyValueUpdateSet->add(kv);
- }
- delete bbDecode;
-}
-
-bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
- SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kvGuard = kvit->next();
- // First check if the key is in the speculative table, this is the value of the latest assumption
- KeyValue *kv = NULL;
-
- // If we have a speculation table then use it first
- if (pendingTransactionSpeculatedKeyValueTable != NULL) {
- kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
- }
-
- // If we have a speculation table then use it first
- if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
- kv = speculatedKeyValueTable->get(kvGuard->getKey());
- }
-
- if (kv == NULL) {
- // if it is not in the speculative table then check the committed table and use that
- // value as our latest assumption
- kv = committedKeyValueTable->get(kvGuard->getKey());
- }
-
- if (kvGuard->getValue() != NULL) {
- if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
-
-
- if (kv != NULL) {
- printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
- } else {
- printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray());
- }
- delete kvit;
- return false;
- }
- } else {
- if (kv != NULL) {
- delete kvit;
- return false;
- }
- }
- }
- delete kvit;
- return true;
-}
-