--- /dev/null
+#include "Transaction.h"
+#include "TransactionPart.h"
+#include "KeyValue.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
+#include "TransactionStatus.h"
+
+Transaction::Transaction() :
+ parts(new Vector<TransactionPart *>()),
+ partCount(0),
+ missingParts(NULL),
+ partsPendingSend(new Vector<int32_t>()),
+ fldisComplete(false),
+ hasLastPart(false),
+ keyValueGuardSet(new Hashset<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *>()),
+ isDead(false),
+ sequenceNumber(-1),
+ clientLocalSequenceNumber(-1),
+ arbitratorId(-1),
+ machineId(-1),
+ transactionId(Pair<int64_t, int64_t>(0,0)),
+ nextPartToSend(0),
+ flddidSendAPartToServer(false),
+ transactionStatus(NULL),
+ hadServerFailure(false) {
+}
+
+Transaction::~Transaction() {
+ if (missingParts)
+ delete missingParts;
+ {
+ uint Size = parts->size();
+ for(uint i=0; i<Size; i++)
+ parts->get(i)->releaseRef();
+ delete parts;
+ }
+ {
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kvGuard = kvit->next();
+ delete kvGuard;
+ }
+ delete kvit;
+ delete keyValueGuardSet;
+ }
+ {
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kvUpdate = kvit->next();
+ delete kvUpdate;
+ }
+ delete kvit;
+ delete keyValueUpdateSet;
+ }
+ delete partsPendingSend;
+}
+
+void Transaction::addPartEncode(TransactionPart *newPart) {
+ newPart->acquireRef();
+ printf("Add part %d\n", newPart->getPartNumber());
+ TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (old == NULL) {
+ partCount++;
+ } else {
+ old->releaseRef();
+ }
+ partsPendingSend->add(newPart->getPartNumber());
+
+ sequenceNumber = newPart->getSequenceNumber();
+ arbitratorId = newPart->getArbitratorId();
+ transactionId = newPart->getTransactionId();
+ clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+ machineId = newPart->getMachineId();
+ fldisComplete = true;
+}
+
+void Transaction::addPartDecode(TransactionPart *newPart) {
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart->setDead();
+ return;
+ }
+ newPart->acquireRef();
+ sequenceNumber = newPart->getSequenceNumber();
+ arbitratorId = newPart->getArbitratorId();
+ transactionId = newPart->getTransactionId();
+ clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+ machineId = newPart->getMachineId();
+
+ TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (previouslySeenPart == NULL)
+ partCount++;
+
+ if (previouslySeenPart != NULL) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previouslySeenPart->releaseRef();
+ previouslySeenPart->setDead();
+ } else if (newPart->isLastPart()) {
+ missingParts = new Hashset<int32_t>();
+ hasLastPart = true;
+
+ for (int i = 0; i < newPart->getPartNumber(); i++) {
+ if (parts->get(i) == NULL) {
+ missingParts->add(i);
+ }
+ }
+ }
+
+ if (!fldisComplete && hasLastPart) {
+
+ // We have seen this part so remove it from the set of missing parts
+ missingParts->remove(newPart->getPartNumber());
+
+ // Check if all the parts have been seen
+ if (missingParts->size() == 0) {
+
+ // We have all the parts
+ fldisComplete = true;
+
+ // Decode all the parts and create the key value guard and update sets
+ decodeTransactionData();
+ }
+ }
+}
+
+void Transaction::addUpdateKV(KeyValue *kv) {
+ keyValueUpdateSet->add(kv);
+}
+
+void Transaction::addGuardKV(KeyValue *kv) {
+ keyValueGuardSet->add(kv);
+}
+
+
+int64_t Transaction::getSequenceNumber() {
+ return sequenceNumber;
+}
+
+void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
+ sequenceNumber = _sequenceNumber;
+
+ for (uint32_t i = 0; i < parts->size(); i++) {
+ TransactionPart *tp = parts->get(i);
+ if (tp != NULL)
+ tp->setSequenceNumber(sequenceNumber);
+ }
+}
+
+int64_t Transaction::getClientLocalSequenceNumber() {
+ return clientLocalSequenceNumber;
+}
+
+Vector<TransactionPart *> *Transaction::getParts() {
+ return parts;
+}
+
+bool Transaction::didSendAPartToServer() {
+ return flddidSendAPartToServer;
+}
+
+void Transaction::resetNextPartToSend() {
+ nextPartToSend = 0;
+}
+
+TransactionPart *Transaction::getNextPartToSend() {
+ if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
+ return NULL;
+ }
+ TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
+ nextPartToSend++;
+ return part;
+}
+
+
+void Transaction::setServerFailure() {
+ hadServerFailure = true;
+}
+
+bool Transaction::getServerFailure() {
+ return hadServerFailure;
+}
+
+
+void Transaction::resetServerFailure() {
+ hadServerFailure = false;
+}
+
+
+void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
+ transactionStatus = _transactionStatus;
+}
+
+TransactionStatus *Transaction::getTransactionStatus() {
+ return transactionStatus;
+}
+
+void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
+ nextPartToSend = 0;
+ bool changed = false;
+ uint lastusedindex = 0;
+ for (uint i = 0; i < partsPendingSend->size(); i++) {
+ int32_t parti = partsPendingSend->get(i);
+ for (uint j = 0; j < sentParts->size(); j++) {
+ int32_t partj = sentParts->get(j);
+ if (parti == partj) {
+ changed = true;
+ goto NextElement;
+ }
+ }
+ partsPendingSend->set(lastusedindex++, parti);
+NextElement:
+ ;
+ }
+ if (changed) {
+ partsPendingSend->setSize(lastusedindex);
+ flddidSendAPartToServer = true;
+ transactionStatus->setTransactionSequenceNumber(sequenceNumber);
+ }
+}
+
+bool Transaction::didSendAllParts() {
+ return partsPendingSend->isEmpty();
+}
+
+Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+}
+
+int Transaction::getNumberOfParts() {
+ return partCount;
+}
+
+int64_t Transaction::getMachineId() {
+ return machineId;
+}
+
+int64_t Transaction::getArbitrator() {
+ return arbitratorId;
+}
+
+bool Transaction::isComplete() {
+ return fldisComplete;
+}
+
+Pair<int64_t, int64_t> *Transaction::getId() {
+ return &transactionId;
+}
+
+void Transaction::setDead() {
+ if (!isDead) {
+ // Set dead
+ isDead = true;
+ // Make all the parts of this transaction dead
+ for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+ TransactionPart *part = parts->get(partNumber);
+ if (part != NULL)
+ part->setDead();
+ }
+ }
+}
+
+void Transaction::decodeTransactionData() {
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (uint i = 0; i < parts->size(); i++) {
+ TransactionPart *tp = parts->get(i);
+ dataSize += tp->getDataSize();
+ }
+
+ Array<char> *combinedData = new Array<char>(dataSize);
+ int currentPosition = 0;
+
+ // Stitch all the data sections together
+ for (uint i = 0; i < parts->size(); i++) {
+ TransactionPart *tp = parts->get(i);
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
+
+ // Decoder Object
+ ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
+
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVGuards = bbDecode->getInt();
+ int numberOfKVUpdates = bbDecode->getInt();
+
+ // Decode all the guard key values
+ for (int i = 0; i < numberOfKVGuards; i++) {
+ KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+ keyValueGuardSet->add(kv);
+ }
+
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+ keyValueUpdateSet->add(kv);
+ }
+ delete bbDecode;
+}
+
+bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kvGuard = kvit->next();
+ // First check if the key is in the speculative table, this is the value of the latest assumption
+ KeyValue *kv = NULL;
+
+ // If we have a speculation table then use it first
+ if (pendingTransactionSpeculatedKeyValueTable != NULL) {
+ kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
+ }
+
+ // If we have a speculation table then use it first
+ if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
+ kv = speculatedKeyValueTable->get(kvGuard->getKey());
+ }
+
+ if (kv == NULL) {
+ // if it is not in the speculative table then check the committed table and use that
+ // value as our latest assumption
+ kv = committedKeyValueTable->get(kvGuard->getKey());
+ }
+
+ if (kvGuard->getValue() != NULL) {
+ if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
+
+
+ if (kv != NULL) {
+ printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
+ } else {
+ printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray());
+ }
+ delete kvit;
+ return false;
+ }
+ } else {
+ if (kv != NULL) {
+ delete kvit;
+ return false;
+ }
+ }
+ }
+ delete kvit;
+ return true;
+}
+