1 #include "PendingTransaction.h"
// Constructor taking the machine id. The visible part of the member-initializer
// list heap-allocates the update set and the guard set and initialises
// clientLocalSequenceNumber to -1.
// NOTE(review): the embedded line numbers jump (5 -> 7 -> 13), so further
// initializers and the constructor body are not visible in this listing; confirm
// against the complete file before editing.
// NOTE(review): `Hashset` on the next line differs in case from `HashSet` on the
// line after it; one of the two is presumably a typo. Verify against the
// container header.
3 PendingTransaction::PendingTransaction(int64_t _machineId) :
4 keyValueUpdateSet(new Hashset<KeyValue *>()),
5 keyValueGuardSet(new HashSet<KeyValue *>()),
7 clientLocalSequenceNumber(-1),
13 * Add a new key value to the updates
// Adds newKV to the update set. The update set is first scanned for an entry
// whose key equals newKV's key; the visible lines then remove rmKV from the set
// and subtract its size from currentDataSize before inserting newKV and adding
// its size.
// NOTE(review): rmKV is used below but its declaration, its assignment inside
// the loop and the guard around the removal are not visible here (embedded
// numbering skips 17-19, 25-29 and 31). Confirm against the complete file.
// NOTE(review): newKV is a KeyValue* and the sets are created with `new`, yet
// members are reached with `.` and the loop variable is a KeyValue by value.
// This looks like an unfinished Java-to-C++ port; confirm the intended syntax.
16 void PendingTransaction::addKV(KeyValue *newKV) {
20 // Make sure there are no duplicates
21 for (KeyValue kv : keyValueUpdateSet) {
22 if (kv.getKey().equals(newKV.getKey())) {
24 // Remove key if we are adding a newer version of the same key
30 // Remove key if we are adding a newer version of the same key
// Drop the superseded entry and give back the space it was accounted for.
32 keyValueUpdateSet.remove(rmKV);
33 currentDataSize -= rmKV.getSize();
36 // Add the key to the hash set
37 keyValueUpdateSet.add(newKV);
// Keep the running payload size in step with the set contents.
38 currentDataSize += newKV.getSize();
42 * Add a new key value to the guard set
// Adds newKV to the guard set and increases currentDataSize by newKV's size.
// The visible lines perform no duplicate-key scan before inserting.
// NOTE(review): newKV is a KeyValue* but is dereferenced with `.`, and the guard
// set is allocated with `new` but also accessed with `.`. Confirm the intended
// pointer syntax.
45 void PendingTransaction::addKVGuard(KeyValue *newKV) {
46 // Add the key to the hash set
47 keyValueGuardSet.add(newKV);
48 currentDataSize += newKV.getSize();
52 * Checks if the arbitrator is the same
// Returns whether arb equals the stored arbitrator value.
// A separate branch runs while arbitrator still holds -1; the body of that
// branch (embedded lines 56-59) is not visible in this listing.
// NOTE(review): presumably the -1 branch records arb as the arbitrator and
// returns early. Confirm against the complete file.
54 bool PendingTransaction::checkArbitrator(int64_t arb) {
55 if (arbitrator == -1) {
60 return arb == arbitrator;
// Checks every key-value in the guard set against three lookup tables.
// For each guard entry the key is looked up in keyValTablePendingTransSpeculative,
// then keyValTableSpeculative, then keyValTableCommitted. When the guard carries a
// non-NULL value, a missing entry or a value that does not equal the guard's value
// enters the innermost branch.
// NOTE(review): the conditions between the three lookups, the body of the
// innermost branch and all return statements are not visible here (embedded
// numbering skips 68-70, 74-77, 81-82 and 85-95). Presumably a mismatch returns
// false and falling through returns true; confirm against the complete file.
// NOTE(review): the three Hashtable parameters are declared by value, not by
// pointer or reference. Confirm this is intended.
63 bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> keyValTableCommitted, Hashtable<IoTString *, KeyValue *> keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> keyValTablePendingTransSpeculative) {
64 for (KeyValue kvGuard : keyValueGuardSet) {
66 // First check if the key is in the speculative table, this is the value of the latest assumption
67 KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey());
71 // if it is not in the pending trans table then check the speculative table and use that
72 // value as our latest assumption
73 kv = keyValTableSpeculative.get(kvGuard.getKey());
78 // if it is not in the speculative table then check the committed table and use that
79 // value as our latest assumption
80 kv = keyValTableCommitted.get(kvGuard.getKey());
// Only guards that carry a value are compared against the looked-up entry.
83 if (kvGuard.getValue() != NULL) {
84 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
// Builds and returns a new Transaction from this pending transaction.
// The data from convertDataToBytes() is cut into consecutive chunks of at most
// TransactionPart.MAX_NON_HEADER_SIZE chars; each chunk becomes a TransactionPart
// with an increasing part index, and the final chunk is flagged as the last part.
// Afterwards every guard key-value and every update key-value is added to the
// transaction.
// NOTE(review): the new Transaction is allocated with `new` and returned as a raw
// pointer; who deletes it is not visible here.
// NOTE(review): several Java constructs remain (`char[] ... = new char[...]`,
// `System.arraycopy`, `charData.length` on a pointer, `.` on newTransaction, and
// `TransactionPart.MAX_NON_HEADER_SIZE` instead of `::`). This looks like an
// unfinished port; confirm the intended C++ forms.
// NOTE(review): closing braces and some lines are missing from this listing
// (embedded numbering has gaps, e.g. 114-115, 127-128, 132-134).
96 Transaction *PendingTransaction::createTransaction() {
97 Transaction *newTransaction = new Transaction();
98 int transactionPartCount = 0;
100 // Convert all the data into a char array so we can start partitioning
101 Array<char> *charData = convertDataToBytes();
103 int currentPosition = 0;
104 int remaining = charData.length;
// One iteration per TransactionPart until every char has been handed out.
106 while (remaining > 0) {
108 bool isLastPart = false;
109 // determine how much to copy
110 int copySize = TransactionPart.MAX_NON_HEADER_SIZE;
111 if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) {
112 copySize = remaining;
113 isLastPart = true;// last bit of data so last part
116 // Copy to a smaller version
117 char[] partData = new char[copySize];
118 System.arraycopy(charData, currentPosition, partData, 0, copySize);
// The first constructor argument is passed as NULL; its meaning is not visible here.
120 TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
121 newTransaction.addPartEncode(part);
123 // Update position, count and remaining
124 currentPosition += copySize;
125 transactionPartCount++;
126 remaining -= copySize;
129 // Add the Guard Conditions
130 for (KeyValue kv : keyValueGuardSet) {
131 newTransaction.addGuardKV(kv);
// Then attach every update key-value as well.
135 for (KeyValue kv : keyValueUpdateSet) {
136 newTransaction.addUpdateKV(kv);
139 return newTransaction;
// Serialises the guard and update sets into a freshly allocated char array.
// The array size is two int32_t counters plus currentDataSize. Written through a
// ByteBuffer wrapping the array, in order: the guard-set size, the update-set
// size, every guard key-value, then every update key-value.
// Returns the array obtained from the ByteBuffer.
// NOTE(review): `Arrar<char>` in the return type looks like a typo for
// `Array<char>`, the type used for dataArray below. Confirm and fix.
// NOTE(review): the loop variables are declared as KeyValue by value but used
// with `->`, and the sets are accessed with `.` although they are allocated with
// `new`. Confirm the intended pointer syntax.
// NOTE(review): dataArray and bbEncode are allocated here; ownership and release
// of both are not visible in this listing.
142 Arrar<char> *PendingTransaction::convertDataToBytes() {
143 // Calculate the size of the data
144 int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
145 sizeOfData += currentDataSize;
147 // Data handlers and storage
148 Array<char> *dataArray = new Array<char>(sizeOfData);
149 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
151 // Encode the size of the updates and guard sets
// The guard count is written first, then the update count.
152 bbEncode->putInt(keyValueGuardSet.size());
153 bbEncode->putInt(keyValueUpdateSet.size());
155 // Encode all the guard conditions
156 for (KeyValue kv : keyValueGuardSet) {
157 kv->encode(bbEncode);
160 // Encode all the updates
161 for (KeyValue kv : keyValueUpdateSet) {
162 kv->encode(bbEncode);
165 return bbEncode->array();