1 #include "PendingTransaction.h"
4 #include "Transaction.h"
5 #include "TransactionPart.h"
6 #include "ByteBuffer.h"
8 PendingTransaction::PendingTransaction(int64_t _machineId) :
9 keyValueUpdateSet(new Hashset<KeyValue *>()),
10 keyValueGuardSet(new Hashset<KeyValue *>()),
12 clientLocalSequenceNumber(-1),
13 machineId(_machineId),
17 PendingTransaction::~PendingTransaction() {
18 delete keyValueUpdateSet;
19 delete keyValueGuardSet;
23 * Add a new key value to the updates
26 void PendingTransaction::addKV(KeyValue *newKV) {
27 KeyValue *rmKV = NULL;
29 // Make sure there are no duplicates
30 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
31 while (kvit->hasNext()) {
32 KeyValue *kv = kvit->next();
33 if (kv->getKey()->equals(newKV->getKey())) {
34 // Remove key if we are adding a newer version of the same key
41 // Remove key if we are adding a newer version of the same key
43 keyValueUpdateSet->remove(rmKV);
44 currentDataSize -= rmKV->getSize();
47 // Add the key to the hash set
48 keyValueUpdateSet->add(newKV);
49 currentDataSize += newKV->getSize();
53 * Add a new key value to the guard set
56 void PendingTransaction::addKVGuard(KeyValue *newKV) {
57 // Add the key to the hash set
58 keyValueGuardSet->add(newKV);
59 currentDataSize += newKV->getSize();
63 * Checks if the arbitrator is the same
65 bool PendingTransaction::checkArbitrator(int64_t arb) {
66 if (arbitrator == -1) {
70 return arb == arbitrator;
73 bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *keyValTableCommitted, Hashtable<IoTString *, KeyValue *> *keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> *keyValTablePendingTransSpeculative) {
74 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
75 while (kvit->hasNext()) {
76 KeyValue *kvGuard = kvit->next();
77 // First check if the key is in the speculative table, this is the
78 // value of the latest assumption
79 KeyValue *kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
83 // if it is not in the pending trans table then check the
84 // speculative table and use that value as our latest assumption
85 kv = keyValTableSpeculative->get(kvGuard->getKey());
90 // if it is not in the speculative table then check the
91 // committed table and use that value as our latest assumption
92 kv = keyValTableCommitted->get(kvGuard->getKey());
95 if (kvGuard->getValue() != NULL) {
96 if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
111 Transaction *PendingTransaction::createTransaction() {
112 Transaction *newTransaction = new Transaction();
113 int transactionPartCount = 0;
115 // Convert all the data into a char array so we can start partitioning
116 Array<char> *charData = convertDataToBytes();
118 int currentPosition = 0;
119 for (int remaining = charData->length(); remaining > 0;) {
120 bool isLastPart = false;
121 // determine how much to copy
122 int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
123 if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
124 copySize = remaining;
125 isLastPart = true;//last bit of data so last part
128 // Copy to a smaller version
129 Array<char> *partData = new Array<char>(copySize);
130 System_arraycopy(charData, currentPosition, partData, 0, copySize);
132 TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
133 newTransaction->addPartEncode(part);
135 // Update position, count and remaining
136 currentPosition += copySize;
137 transactionPartCount++;
138 remaining -= copySize;
142 // Add the Guard Conditions
143 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
144 while (kvit->hasNext()) {
145 KeyValue *kv = kvit->next();
146 newTransaction->addGuardKV(kv);
151 kvit = keyValueUpdateSet->iterator();
152 while (kvit->hasNext()) {
153 KeyValue *kv = kvit->next();
154 newTransaction->addUpdateKV(kv);
157 return newTransaction;
160 Array<char> *PendingTransaction::convertDataToBytes() {
161 // Calculate the size of the data
162 int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
163 sizeOfData += currentDataSize;
165 // Data handlers and storage
166 Array<char> *dataArray = new Array<char>(sizeOfData);
167 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
169 // Encode the size of the updates and guard sets
170 bbEncode->putInt(keyValueGuardSet->size());
171 bbEncode->putInt(keyValueUpdateSet->size());
173 // Encode all the guard conditions
174 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
175 while (kvit->hasNext()) {
176 KeyValue *kv = kvit->next();
177 kv->encode(bbEncode);
181 // Encode all the updates
182 kvit = keyValueUpdateSet->iterator();
183 while (kvit->hasNext()) {
184 KeyValue *kv = kvit->next();
185 kv->encode(bbEncode);
189 Array<char> *array = bbEncode->array();
190 bbEncode->releaseArray();