rename files
[iotcloud.git] / version2 / src / C / PendingTransaction.cpp
diff --git a/version2/src/C/PendingTransaction.cpp b/version2/src/C/PendingTransaction.cpp
new file mode 100644 (file)
index 0000000..c0d32a3
--- /dev/null
@@ -0,0 +1,194 @@
+#include "PendingTransaction.h"
+#include "KeyValue.h"
+#include "IoTString.h"
+#include "Transaction.h"
+#include "TransactionPart.h"
+#include "ByteBuffer.h"
+
+PendingTransaction::PendingTransaction(int64_t _machineId) :
+       keyValueUpdateSet(new Hashset<KeyValue *>()),
+       keyValueGuardSet(new Hashset<KeyValue *>()),
+       arbitrator(-1),
+       clientLocalSequenceNumber(-1),
+       machineId(_machineId),
+       currentDataSize(0) {
+}
+
+PendingTransaction::~PendingTransaction() {
+       delete keyValueUpdateSet;
+       delete keyValueGuardSet;
+}
+
+/**
+ * Add a new key value to the updates
+ *
+ */
+void PendingTransaction::addKV(KeyValue *newKV) {
+       KeyValue *rmKV = NULL;
+
+       // Make sure there are no duplicates
+       SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               if (kv->getKey()->equals(newKV->getKey())) {
+                       // Remove key if we are adding a newer version of the same key
+                       rmKV = kv;
+                       break;
+               }
+       }
+       delete kvit;
+
+       // Remove key if we are adding a newer version of the same key
+       if (rmKV != NULL) {
+               keyValueUpdateSet->remove(rmKV);
+               currentDataSize -= rmKV->getSize();
+       }
+
+       // Add the key to the hash set
+       keyValueUpdateSet->add(newKV);
+       currentDataSize += newKV->getSize();
+}
+
+/**
+ * Add a new key value to the guard set
+ *
+ */
+void PendingTransaction::addKVGuard(KeyValue *newKV) {
+       // Add the key to the hash set
+       keyValueGuardSet->add(newKV);
+       currentDataSize += newKV->getSize();
+}
+
+/**
+ * Checks if the arbitrator is the same
+ */
+bool PendingTransaction::checkArbitrator(int64_t arb) {
+       if (arbitrator == -1) {
+               arbitrator = arb;
+               return true;
+       }
+       return arb == arbitrator;
+}
+
+bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *keyValTableCommitted, Hashtable<IoTString *, KeyValue *> *keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> *keyValTablePendingTransSpeculative) {
+       SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kvGuard = kvit->next();
+               // First check if the key is in the speculative table, this is the
+               // value of the latest assumption
+               KeyValue *kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
+
+
+               if (kv == NULL) {
+                       // if it is not in the pending trans table then check the
+                       // speculative table and use that value as our latest assumption
+                       kv = keyValTableSpeculative->get(kvGuard->getKey());
+               }
+
+
+               if (kv == NULL) {
+                       // if it is not in the speculative table then check the
+                       // committed table and use that value as our latest assumption
+                       kv = keyValTableCommitted->get(kvGuard->getKey());
+               }
+
+               if (kvGuard->getValue() != NULL) {
+                       if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
+                               delete kvit;
+                               return false;
+                       }
+               } else {
+                       if (kv != NULL) {
+                               delete kvit;
+                               return false;
+                       }
+               }
+       }
+       delete kvit;
+       return true;
+}
+
+Transaction *PendingTransaction::createTransaction() {
+       Transaction *newTransaction = new Transaction();
+       int transactionPartCount = 0;
+
+       // Convert all the data into a char array so we can start partitioning
+       Array<char> *charData = convertDataToBytes();
+
+       int currentPosition = 0;
+       for (int remaining = charData->length(); remaining > 0;) {
+               bool isLastPart = false;
+               // determine how much to copy
+               int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
+               if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
+                       copySize = remaining;
+                       isLastPart = true;//last bit of data so last part
+               }
+
+               // Copy to a smaller version
+               Array<char> *partData = new Array<char>(copySize);
+               System_arraycopy(charData, currentPosition, partData, 0, copySize);
+
+               TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
+               newTransaction->addPartEncode(part);
+               part->releaseRef();
+               
+               // Update position, count and remaining
+               currentPosition += copySize;
+               transactionPartCount++;
+               remaining -= copySize;
+       }
+       delete charData;
+       
+       // Add the Guard Conditions
+       SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               newTransaction->addGuardKV(kv);
+       }
+       delete kvit;
+
+       //  Add the updates
+       kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               newTransaction->addUpdateKV(kv);
+       }
+       delete kvit;
+       return newTransaction;
+}
+
+Array<char> *PendingTransaction::convertDataToBytes() {
+       // Calculate the size of the data
+       int sizeOfData = 2 * sizeof(int32_t);   // Number of Update KV's and Guard KV's
+       sizeOfData += currentDataSize;
+
+       // Data handlers and storage
+       Array<char> *dataArray = new Array<char>(sizeOfData);
+       ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
+
+       // Encode the size of the updates and guard sets
+       bbEncode->putInt(keyValueGuardSet->size());
+       bbEncode->putInt(keyValueUpdateSet->size());
+
+       // Encode all the guard conditions
+       SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kv->encode(bbEncode);
+       }
+       delete kvit;
+
+       // Encode all the updates
+       kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kv->encode(bbEncode);
+       }
+       delete kvit;
+
+       Array<char> *array = bbEncode->array();
+       bbEncode->releaseArray();
+       delete bbEncode;
+       return array;
+}