+#include "PendingTransaction.h"
+#include "KeyValue.h"
+#include "IoTString.h"
+#include "Transaction.h"
+#include "TransactionPart.h"
+#include "ByteBuffer.h"
+
+PendingTransaction::PendingTransaction(int64_t _machineId) :
+ keyValueUpdateSet(new Hashset<KeyValue *>()),
+ keyValueGuardSet(new Hashset<KeyValue *>()),
+ arbitrator(-1),
+ clientLocalSequenceNumber(-1),
+ machineId(_machineId),
+ currentDataSize(0) {
+}
+
+PendingTransaction::~PendingTransaction() {
+ delete keyValueUpdateSet;
+ delete keyValueGuardSet;
+}
+
+/**
+ * Add a new key value to the updates
+ *
+ */
+void PendingTransaction::addKV(KeyValue *newKV) {
+ KeyValue *rmKV = NULL;
+
+ // Make sure there are no duplicates
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ if (kv->getKey()->equals(newKV->getKey())) {
+ // Remove key if we are adding a newer version of the same key
+ rmKV = kv;
+ break;
+ }
+ }
+ delete kvit;
+
+ // Remove key if we are adding a newer version of the same key
+ if (rmKV != NULL) {
+ keyValueUpdateSet->remove(rmKV);
+ currentDataSize -= rmKV->getSize();
+ }
+
+ // Add the key to the hash set
+ keyValueUpdateSet->add(newKV);
+ currentDataSize += newKV->getSize();
+}
+
+/**
+ * Add a new key value to the guard set
+ *
+ */
+void PendingTransaction::addKVGuard(KeyValue *newKV) {
+ // Add the key to the hash set
+ keyValueGuardSet->add(newKV);
+ currentDataSize += newKV->getSize();
+}
+
+/**
+ * Checks if the arbitrator is the same
+ */
+bool PendingTransaction::checkArbitrator(int64_t arb) {
+ if (arbitrator == -1) {
+ arbitrator = arb;
+ return true;
+ }
+ return arb == arbitrator;
+}
+
+bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *keyValTableCommitted, Hashtable<IoTString *, KeyValue *> *keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> *keyValTablePendingTransSpeculative) {
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kvGuard = kvit->next();
+ // First check if the key is in the speculative table, this is the
+ // value of the latest assumption
+ KeyValue *kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
+
+
+ if (kv == NULL) {
+ // if it is not in the pending trans table then check the
+ // speculative table and use that value as our latest assumption
+ kv = keyValTableSpeculative->get(kvGuard->getKey());
+ }
+
+
+ if (kv == NULL) {
+ // if it is not in the speculative table then check the
+ // committed table and use that value as our latest assumption
+ kv = keyValTableCommitted->get(kvGuard->getKey());
+ }
+
+ if (kvGuard->getValue() != NULL) {
+ if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
+ delete kvit;
+ return false;
+ }
+ } else {
+ if (kv != NULL) {
+ delete kvit;
+ return false;
+ }
+ }
+ }
+ delete kvit;
+ return true;
+}
+
+Transaction *PendingTransaction::createTransaction() {
+ Transaction *newTransaction = new Transaction();
+ int transactionPartCount = 0;
+
+ // Convert all the data into a char array so we can start partitioning
+ Array<char> *charData = convertDataToBytes();
+
+ int currentPosition = 0;
+ for (int remaining = charData->length(); remaining > 0;) {
+ bool isLastPart = false;
+ // determine how much to copy
+ int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
+ if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true;//last bit of data so last part
+ }
+
+ // Copy to a smaller version
+ Array<char> *partData = new Array<char>(copySize);
+ System_arraycopy(charData, currentPosition, partData, 0, copySize);
+
+ TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
+ newTransaction->addPartEncode(part);
+ part->releaseRef();
+
+ // Update position, count and remaining
+ currentPosition += copySize;
+ transactionPartCount++;
+ remaining -= copySize;
+ }
+ delete charData;
+
+ // Add the Guard Conditions
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ newTransaction->addGuardKV(kv);
+ }
+ delete kvit;
+
+ // Add the updates
+ kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ newTransaction->addUpdateKV(kv);
+ }
+ delete kvit;
+ return newTransaction;
+}
+
+Array<char> *PendingTransaction::convertDataToBytes() {
+ // Calculate the size of the data
+ int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
+ sizeOfData += currentDataSize;
+
+ // Data handlers and storage
+ Array<char> *dataArray = new Array<char>(sizeOfData);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
+
+ // Encode the size of the updates and guard sets
+ bbEncode->putInt(keyValueGuardSet->size());
+ bbEncode->putInt(keyValueUpdateSet->size());
+
+ // Encode all the guard conditions
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kv->encode(bbEncode);
+ }
+ delete kvit;
+
+ // Encode all the updates
+ kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kv->encode(bbEncode);
+ }
+ delete kvit;
+
+ Array<char> *array = bbEncode->array();
+ bbEncode->releaseArray();
+ delete bbEncode;
+ return array;
+}