rename files
[iotcloud.git] / version2 / src / C / Transaction.cpp
diff --git a/version2/src/C/Transaction.cpp b/version2/src/C/Transaction.cpp
new file mode 100644 (file)
index 0000000..b0d3c5b
--- /dev/null
@@ -0,0 +1,347 @@
+#include "Transaction.h"
+#include "TransactionPart.h"
+#include "KeyValue.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
+#include "TransactionStatus.h"
+
+Transaction::Transaction() :
+       parts(new Vector<TransactionPart *>()),
+       partCount(0),
+       missingParts(NULL),
+       partsPendingSend(new Vector<int32_t>()),
+       fldisComplete(false),
+       hasLastPart(false),
+       keyValueGuardSet(new Hashset<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *>()),
+       isDead(false),
+       sequenceNumber(-1),
+       clientLocalSequenceNumber(-1),
+       arbitratorId(-1),
+       machineId(-1),
+       transactionId(Pair<int64_t, int64_t>(0,0)),
+       nextPartToSend(0),
+       flddidSendAPartToServer(false),
+       transactionStatus(NULL),
+       hadServerFailure(false) {
+}
+
+Transaction::~Transaction() {
+       if (missingParts)
+               delete missingParts;
+       {
+               uint Size = parts->size();
+               for(uint i=0; i<Size; i++)
+                       parts->get(i)->releaseRef();
+               delete parts;
+       }
+       {
+               SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+               while (kvit->hasNext()) {
+                       KeyValue *kvGuard = kvit->next();
+                       delete kvGuard;
+               }
+               delete kvit;
+               delete keyValueGuardSet;
+       }
+       {
+               SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
+               while (kvit->hasNext()) {
+                       KeyValue *kvUpdate = kvit->next();
+                       delete kvUpdate;
+               }
+               delete kvit;
+               delete keyValueUpdateSet;
+       }
+       delete partsPendingSend;
+}
+
+void Transaction::addPartEncode(TransactionPart *newPart) {
+       newPart->acquireRef();
+       printf("Add part %d\n", newPart->getPartNumber());
+       TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (old == NULL) {
+               partCount++;
+       } else {
+               old->releaseRef();
+       }
+       partsPendingSend->add(newPart->getPartNumber());
+
+       sequenceNumber = newPart->getSequenceNumber();
+       arbitratorId = newPart->getArbitratorId();
+       transactionId = newPart->getTransactionId();
+       clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+       machineId = newPart->getMachineId();
+       fldisComplete = true;
+}
+
+void Transaction::addPartDecode(TransactionPart *newPart) {
+       if (isDead) {
+               // If dead then just kill this part and move on
+               newPart->setDead();
+               return;
+       }
+       newPart->acquireRef();
+       sequenceNumber = newPart->getSequenceNumber();
+       arbitratorId = newPart->getArbitratorId();
+       transactionId = newPart->getTransactionId();
+       clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+       machineId = newPart->getMachineId();
+
+       TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (previouslySeenPart == NULL)
+               partCount++;
+
+       if (previouslySeenPart != NULL) {
+               // Set dead the old one since the new one is a rescued version of this part
+               previouslySeenPart->releaseRef();
+               previouslySeenPart->setDead();
+       } else if (newPart->isLastPart()) {
+               missingParts = new Hashset<int32_t>();
+               hasLastPart = true;
+
+               for (int i = 0; i < newPart->getPartNumber(); i++) {
+                       if (parts->get(i) == NULL) {
+                               missingParts->add(i);
+                       }
+               }
+       }
+
+       if (!fldisComplete && hasLastPart) {
+
+               // We have seen this part so remove it from the set of missing parts
+               missingParts->remove(newPart->getPartNumber());
+
+               // Check if all the parts have been seen
+               if (missingParts->size() == 0) {
+
+                       // We have all the parts
+                       fldisComplete = true;
+
+                       // Decode all the parts and create the key value guard and update sets
+                       decodeTransactionData();
+               }
+       }
+}
+
+void Transaction::addUpdateKV(KeyValue *kv) {
+       keyValueUpdateSet->add(kv);
+}
+
+void Transaction::addGuardKV(KeyValue *kv) {
+       keyValueGuardSet->add(kv);
+}
+
+
+int64_t Transaction::getSequenceNumber() {
+       return sequenceNumber;
+}
+
+void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
+       sequenceNumber = _sequenceNumber;
+
+       for (uint32_t i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
+               if (tp != NULL)
+                       tp->setSequenceNumber(sequenceNumber);
+       }
+}
+
+int64_t Transaction::getClientLocalSequenceNumber() {
+       return clientLocalSequenceNumber;
+}
+
+Vector<TransactionPart *> *Transaction::getParts() {
+       return parts;
+}
+
+bool Transaction::didSendAPartToServer() {
+       return flddidSendAPartToServer;
+}
+
+void Transaction::resetNextPartToSend() {
+       nextPartToSend = 0;
+}
+
+TransactionPart *Transaction::getNextPartToSend() {
+       if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
+               return NULL;
+       }
+       TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
+       nextPartToSend++;
+       return part;
+}
+
+
+void Transaction::setServerFailure() {
+       hadServerFailure = true;
+}
+
+bool Transaction::getServerFailure() {
+       return hadServerFailure;
+}
+
+
+void Transaction::resetServerFailure() {
+       hadServerFailure = false;
+}
+
+
+void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
+       transactionStatus = _transactionStatus;
+}
+
+TransactionStatus *Transaction::getTransactionStatus() {
+       return transactionStatus;
+}
+
+void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
+       nextPartToSend = 0;
+       bool changed = false;
+       uint lastusedindex = 0;
+       for (uint i = 0; i < partsPendingSend->size(); i++) {
+               int32_t parti = partsPendingSend->get(i);
+               for (uint j = 0; j < sentParts->size(); j++) {
+                       int32_t partj = sentParts->get(j);
+                       if (parti == partj) {
+                               changed = true;
+                               goto NextElement;
+                       }
+               }
+               partsPendingSend->set(lastusedindex++, parti);
+NextElement:
+               ;
+       }
+       if (changed) {
+               partsPendingSend->setSize(lastusedindex);
+               flddidSendAPartToServer = true;
+               transactionStatus->setTransactionSequenceNumber(sequenceNumber);
+       }
+}
+
+bool Transaction::didSendAllParts() {
+       return partsPendingSend->isEmpty();
+}
+
+Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
+       return keyValueUpdateSet;
+}
+
+int Transaction::getNumberOfParts() {
+       return partCount;
+}
+
+int64_t Transaction::getMachineId() {
+       return machineId;
+}
+
+int64_t Transaction::getArbitrator() {
+       return arbitratorId;
+}
+
+bool Transaction::isComplete() {
+       return fldisComplete;
+}
+
+Pair<int64_t, int64_t> *Transaction::getId() {
+       return &transactionId;
+}
+
+void Transaction::setDead() {
+       if (!isDead) {
+               // Set dead
+               isDead = true;
+               // Make all the parts of this transaction dead
+               for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+                       TransactionPart *part = parts->get(partNumber);
+                       if (part != NULL)
+                               part->setDead();
+               }
+       }
+}
+
+void Transaction::decodeTransactionData() {
+       // Calculate the size of the data section
+       int dataSize = 0;
+       for (uint i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
+               dataSize += tp->getDataSize();
+       }
+
+       Array<char> *combinedData = new Array<char>(dataSize);
+       int currentPosition = 0;
+
+       // Stitch all the data sections together
+       for (uint i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
+               System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+               currentPosition += tp->getDataSize();
+       }
+
+       // Decoder Object
+       ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
+
+       // Decode how many key value pairs need to be decoded
+       int numberOfKVGuards = bbDecode->getInt();
+       int numberOfKVUpdates = bbDecode->getInt();
+
+       // Decode all the guard key values
+       for (int i = 0; i < numberOfKVGuards; i++) {
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueGuardSet->add(kv);
+       }
+
+       // Decode all the updates key values
+       for (int i = 0; i < numberOfKVUpdates; i++) {
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueUpdateSet->add(kv);
+       }
+       delete bbDecode;
+}
+
+bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
+       SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kvGuard = kvit->next();
+               // First check if the key is in the speculative table, this is the value of the latest assumption
+               KeyValue *kv = NULL;
+
+               // If we have a speculation table then use it first
+               if (pendingTransactionSpeculatedKeyValueTable != NULL) {
+                       kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
+               }
+
+               // If we have a speculation table then use it first
+               if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
+                       kv = speculatedKeyValueTable->get(kvGuard->getKey());
+               }
+
+               if (kv == NULL) {
+                       // if it is not in the speculative table then check the committed table and use that
+                       // value as our latest assumption
+                       kv = committedKeyValueTable->get(kvGuard->getKey());
+               }
+
+               if (kvGuard->getValue() != NULL) {
+                       if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
+
+
+                               if (kv != NULL) {
+                                       printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
+                               } else {
+                                       printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
+                               }
+                               delete kvit;
+                               return false;
+                       }
+               } else {
+                       if (kv != NULL) {
+                               delete kvit;
+                               return false;
+                       }
+               }
+       }
+       delete kvit;
+       return true;
+}
+