+++ /dev/null
-#include "PendingTransaction.h"
-#include "KeyValue.h"
-#include "IoTString.h"
-#include "Transaction.h"
-#include "TransactionPart.h"
-#include "ByteBuffer.h"
-
-PendingTransaction::PendingTransaction(int64_t _machineId) :
- keyValueUpdateSet(new Hashset<KeyValue *>()),
- keyValueGuardSet(new Hashset<KeyValue *>()),
- arbitrator(-1),
- clientLocalSequenceNumber(-1),
- machineId(_machineId),
- currentDataSize(0) {
-}
-
-PendingTransaction::~PendingTransaction() {
- delete keyValueUpdateSet;
- delete keyValueGuardSet;
-}
-
-/**
- * Add a new key value to the updates
- *
- */
-void PendingTransaction::addKV(KeyValue *newKV) {
- KeyValue *rmKV = NULL;
-
- // Make sure there are no duplicates
- SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- if (kv->getKey()->equals(newKV->getKey())) {
- // Remove key if we are adding a newer version of the same key
- rmKV = kv;
- break;
- }
- }
- delete kvit;
-
- // Remove key if we are adding a newer version of the same key
- if (rmKV != NULL) {
- keyValueUpdateSet->remove(rmKV);
- currentDataSize -= rmKV->getSize();
- }
-
- // Add the key to the hash set
- keyValueUpdateSet->add(newKV);
- currentDataSize += newKV->getSize();
-}
-
-/**
- * Add a new key value to the guard set
- *
- */
-void PendingTransaction::addKVGuard(KeyValue *newKV) {
- // Add the key to the hash set
- keyValueGuardSet->add(newKV);
- currentDataSize += newKV->getSize();
-}
-
-/**
- * Checks if the arbitrator is the same
- */
-bool PendingTransaction::checkArbitrator(int64_t arb) {
- if (arbitrator == -1) {
- arbitrator = arb;
- return true;
- }
- return arb == arbitrator;
-}
-
-bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *keyValTableCommitted, Hashtable<IoTString *, KeyValue *> *keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> *keyValTablePendingTransSpeculative) {
- SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kvGuard = kvit->next();
- // First check if the key is in the speculative table, this is the
- // value of the latest assumption
- KeyValue *kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
-
-
- if (kv == NULL) {
- // if it is not in the pending trans table then check the
- // speculative table and use that value as our latest assumption
- kv = keyValTableSpeculative->get(kvGuard->getKey());
- }
-
-
- if (kv == NULL) {
- // if it is not in the speculative table then check the
- // committed table and use that value as our latest assumption
- kv = keyValTableCommitted->get(kvGuard->getKey());
- }
-
- if (kvGuard->getValue() != NULL) {
- if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
- delete kvit;
- return false;
- }
- } else {
- if (kv != NULL) {
- delete kvit;
- return false;
- }
- }
- }
- delete kvit;
- return true;
-}
-
-Transaction *PendingTransaction::createTransaction() {
- Transaction *newTransaction = new Transaction();
- int transactionPartCount = 0;
-
- // Convert all the data into a char array so we can start partitioning
- Array<char> *charData = convertDataToBytes();
-
- int currentPosition = 0;
- for (int remaining = charData->length(); remaining > 0;) {
- bool isLastPart = false;
- // determine how much to copy
- int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
- if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true;//last bit of data so last part
- }
-
- // Copy to a smaller version
- Array<char> *partData = new Array<char>(copySize);
- System_arraycopy(charData, currentPosition, partData, 0, copySize);
-
- TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
- newTransaction->addPartEncode(part);
- part->releaseRef();
-
- // Update position, count and remaining
- currentPosition += copySize;
- transactionPartCount++;
- remaining -= copySize;
- }
- delete charData;
-
- // Add the Guard Conditions
- SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- newTransaction->addGuardKV(kv);
- }
- delete kvit;
-
- // Add the updates
- kvit = keyValueUpdateSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- newTransaction->addUpdateKV(kv);
- }
- delete kvit;
- return newTransaction;
-}
-
-Array<char> *PendingTransaction::convertDataToBytes() {
- // Calculate the size of the data
- int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
- sizeOfData += currentDataSize;
-
- // Data handlers and storage
- Array<char> *dataArray = new Array<char>(sizeOfData);
- ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
-
- // Encode the size of the updates and guard sets
- bbEncode->putInt(keyValueGuardSet->size());
- bbEncode->putInt(keyValueUpdateSet->size());
-
- // Encode all the guard conditions
- SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- kv->encode(bbEncode);
- }
- delete kvit;
-
- // Encode all the updates
- kvit = keyValueUpdateSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- kv->encode(bbEncode);
- }
- delete kvit;
-
- Array<char> *array = bbEncode->array();
- bbEncode->releaseArray();
- delete bbEncode;
- return array;
-}