edits
[iotcloud.git] / version2 / src / C / Transaction.cc
1 #include "Transaction.h"
2 #include "TransactionPart.h"
3 #include "KeyValue.h"
4 #include "ByteBuffer.h"
5 #include "IoTString.h"
6 #include "TransactionStatus.h"
7
8 Transaction::Transaction() :
9         parts(new Vector<TransactionPart *>()),
10         partCount(0),
11         missingParts(NULL),
12         partsPendingSend(new Vector<int32_t>()),
13         fldisComplete(false),
14         hasLastPart(false),
15         keyValueGuardSet(new Hashset<KeyValue *>()),
16         keyValueUpdateSet(new Hashset<KeyValue *>()),
17         isDead(false),
18         sequenceNumber(-1),
19         clientLocalSequenceNumber(-1),
20         arbitratorId(-1),
21         machineId(-1),
22         transactionId(Pair<int64_t, int64_t>(0,0)),
23         nextPartToSend(0),
24         flddidSendAPartToServer(false),
25         transactionStatus(NULL),
26         hadServerFailure(false) {
27 }
28
29 Transaction::~Transaction() {
30         if (missingParts)
31                 delete missingParts;
32         {
33                 delete parts;
34         }
35         {
36                 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
37                 while (kvit->hasNext()) {
38                         KeyValue *kvGuard = kvit->next();
39                         delete kvGuard;
40                 }
41                 delete kvit;
42                 delete keyValueGuardSet;
43         }
44         {
45                 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
46                 while (kvit->hasNext()) {
47                         KeyValue *kvUpdate = kvit->next();
48                         delete kvUpdate;
49                 }
50                 delete kvit;
51                 delete keyValueUpdateSet;
52         }
53         delete partsPendingSend;
54 }
55
56 void Transaction::addPartEncode(TransactionPart *newPart) {
57         TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
58         if (old == NULL) {
59                 partCount++;
60         } else
61                 delete old;
62         partsPendingSend->add(newPart->getPartNumber());
63
64         sequenceNumber = newPart->getSequenceNumber();
65         arbitratorId = newPart->getArbitratorId();
66         transactionId = newPart->getTransactionId();
67         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
68         machineId = newPart->getMachineId();
69         fldisComplete = true;
70 }
71
72 void Transaction::addPartDecode(TransactionPart *newPart) {
73         if (isDead) {
74                 // If dead then just kill this part and move on
75                 newPart->setDead();
76                 return;
77         }
78
79         sequenceNumber = newPart->getSequenceNumber();
80         arbitratorId = newPart->getArbitratorId();
81         transactionId = newPart->getTransactionId();
82         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
83         machineId = newPart->getMachineId();
84
85         TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
86         if (previouslySeenPart == NULL)
87                 partCount++;
88
89         if (previouslySeenPart != NULL) {
90                 // Set dead the old one since the new one is a rescued version of this part
91                 previouslySeenPart->setDead();
92         } else if (newPart->isLastPart()) {
93                 missingParts = new Hashset<int32_t>();
94                 hasLastPart = true;
95
96                 for (int i = 0; i < newPart->getPartNumber(); i++) {
97                         if (parts->get(i) == NULL) {
98                                 missingParts->add(i);
99                         }
100                 }
101         }
102
103         if (!fldisComplete && hasLastPart) {
104
105                 // We have seen this part so remove it from the set of missing parts
106                 missingParts->remove(newPart->getPartNumber());
107
108                 // Check if all the parts have been seen
109                 if (missingParts->size() == 0) {
110
111                         // We have all the parts
112                         fldisComplete = true;
113
114                         // Decode all the parts and create the key value guard and update sets
115                         decodeTransactionData();
116                 }
117         }
118 }
119
120 void Transaction::addUpdateKV(KeyValue *kv) {
121         keyValueUpdateSet->add(kv);
122 }
123
124 void Transaction::addGuardKV(KeyValue *kv) {
125         keyValueGuardSet->add(kv);
126 }
127
128
129 int64_t Transaction::getSequenceNumber() {
130         return sequenceNumber;
131 }
132
133 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
134         sequenceNumber = _sequenceNumber;
135
136         for (uint32_t i = 0; i < parts->size(); i++) {
137                 TransactionPart *tp = parts->get(i);
138                 if (tp != NULL)
139                         tp->setSequenceNumber(sequenceNumber);
140         }
141 }
142
143 int64_t Transaction::getClientLocalSequenceNumber() {
144         return clientLocalSequenceNumber;
145 }
146
147 Vector<TransactionPart *> *Transaction::getParts() {
148         return parts;
149 }
150
151 bool Transaction::didSendAPartToServer() {
152         return flddidSendAPartToServer;
153 }
154
155 void Transaction::resetNextPartToSend() {
156         nextPartToSend = 0;
157 }
158
159 TransactionPart *Transaction::getNextPartToSend() {
160         if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
161                 return NULL;
162         }
163         TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
164         nextPartToSend++;
165         return part;
166 }
167
168
169 void Transaction::setServerFailure() {
170         hadServerFailure = true;
171 }
172
173 bool Transaction::getServerFailure() {
174         return hadServerFailure;
175 }
176
177
178 void Transaction::resetServerFailure() {
179         hadServerFailure = false;
180 }
181
182
183 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
184         transactionStatus = _transactionStatus;
185 }
186
187 TransactionStatus *Transaction::getTransactionStatus() {
188         return transactionStatus;
189 }
190
191 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
192         nextPartToSend = 0;
193         bool changed = false;
194         uint lastusedindex = 0;
195         for (uint i = 0; i < partsPendingSend->size(); i++) {
196                 int32_t parti = partsPendingSend->get(i);
197                 for (uint j = 0; j < sentParts->size(); j++) {
198                         int32_t partj = sentParts->get(j);
199                         if (parti == partj) {
200                                 changed = true;
201                                 goto NextElement;
202                         }
203                 }
204                 partsPendingSend->set(lastusedindex++, parti);
205 NextElement:
206                 ;
207         }
208         if (changed) {
209                 partsPendingSend->setSize(lastusedindex);
210                 flddidSendAPartToServer = true;
211                 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
212         }
213 }
214
215 bool Transaction::didSendAllParts() {
216         return partsPendingSend->isEmpty();
217 }
218
219 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
220         return keyValueUpdateSet;
221 }
222
223 int Transaction::getNumberOfParts() {
224         return partCount;
225 }
226
227 int64_t Transaction::getMachineId() {
228         return machineId;
229 }
230
231 int64_t Transaction::getArbitrator() {
232         return arbitratorId;
233 }
234
235 bool Transaction::isComplete() {
236         return fldisComplete;
237 }
238
239 Pair<int64_t, int64_t> *Transaction::getId() {
240         return &transactionId;
241 }
242
243 void Transaction::setDead() {
244         if (!isDead) {
245                 // Set dead
246                 isDead = true;
247                 // Make all the parts of this transaction dead
248                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
249                         TransactionPart *part = parts->get(partNumber);
250                         if (part != NULL)
251                                 part->setDead();
252                 }
253         }
254 }
255
256 TransactionPart *Transaction::getPart(int index) {
257         return parts->get(index);
258 }
259
260 void Transaction::decodeTransactionData() {
261         // Calculate the size of the data section
262         int dataSize = 0;
263         for (uint i = 0; i < parts->size(); i++) {
264                 TransactionPart *tp = parts->get(i);
265                 dataSize += tp->getDataSize();
266         }
267
268         Array<char> *combinedData = new Array<char>(dataSize);
269         int currentPosition = 0;
270
271         // Stitch all the data sections together
272         for (uint i = 0; i < parts->size(); i++) {
273                 TransactionPart *tp = parts->get(i);
274                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
275                 currentPosition += tp->getDataSize();
276         }
277
278         // Decoder Object
279         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
280
281         // Decode how many key value pairs need to be decoded
282         int numberOfKVGuards = bbDecode->getInt();
283         int numberOfKVUpdates = bbDecode->getInt();
284
285         // Decode all the guard key values
286         for (int i = 0; i < numberOfKVGuards; i++) {
287                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
288                 keyValueGuardSet->add(kv);
289         }
290
291         // Decode all the updates key values
292         for (int i = 0; i < numberOfKVUpdates; i++) {
293                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
294                 keyValueUpdateSet->add(kv);
295         }
296         delete bbDecode;
297 }
298
299 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
300         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
301         while (kvit->hasNext()) {
302                 KeyValue *kvGuard = kvit->next();
303                 // First check if the key is in the speculative table, this is the value of the latest assumption
304                 KeyValue *kv = NULL;
305
306                 // If we have a speculation table then use it first
307                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
308                         kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
309                 }
310
311                 // If we have a speculation table then use it first
312                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
313                         kv = speculatedKeyValueTable->get(kvGuard->getKey());
314                 }
315
316                 if (kv == NULL) {
317                         // if it is not in the speculative table then check the committed table and use that
318                         // value as our latest assumption
319                         kv = committedKeyValueTable->get(kvGuard->getKey());
320                 }
321
322                 if (kvGuard->getValue() != NULL) {
323                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
324
325
326                                 if (kv != NULL) {
327                                         printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
328                                 } else {
329                                         printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
330                                 }
331                                 delete kvit;
332                                 return false;
333                         }
334                 } else {
335                         if (kv != NULL) {
336                                 delete kvit;
337                                 return false;
338                         }
339                 }
340         }
341         delete kvit;
342         return true;
343 }
344