Updates
[iotcloud.git] / version2 / src / C / Table.cpp
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void *a, const void *b) {
25         const int64_t *pa = (const int64_t *) a;
26         const int64_t *pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(1),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(1),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchMyVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotMyVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localSequenceNumber(0),
112         localTransactionSequenceNumber(1),
113         lastTransactionSequenceNumberSpeculatedOn(0),
114         oldestTransactionSequenceNumberSpeculatedOn(0),
115         localArbitrationSequenceNumber(1),
116         hadPartialSendToServer(false),
117         attemptedToSendToServer(false),
118         expectedsize(0),
119         didFindTableStatus(false),
120         currMaxSize(0),
121         lastSlotAttemptedToSend(NULL),
122         lastIsNewKey(false),
123         lastNewSize(0),
124         lastTransactionPartsSent(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchMyVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotMyVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 Table::~Table() {
160         delete cloud;
161         delete random;
162         delete buffer;
163         // init data structs
164         delete committedKeyValueTable;
165         delete speculatedKeyValueTable;
166         delete pendingTransactionSpeculatedKeyValueTable;
167         delete liveNewKeyTable;
168         {
169                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170                 while (lmit->hasNext()) {
171                         Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
172                         delete pair;
173                 }
174                 delete lmit;
175                 delete lastMessageTable;
176         }
177         if (pendingTransactionBuilder != NULL)
178                 delete pendingTransactionBuilder;
179         {
180                 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchMyVectorTable);
181                 while(rmit->hasNext()) {
182                         int64_t machineid = rmit->next();
183                         Hashset<RejectedMessage *> * rmset = rejectedMessageWatchMyVectorTable->get(machineid);
184                         SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185                         while (mit->hasNext()) {
186                                 RejectedMessage * rm = mit->next();
187                                 delete rm;
188                         }
189                         delete mit;
190                         delete rmset;
191                 }
192                 delete rmit;
193                 delete rejectedMessageWatchMyVectorTable;
194         }
195         delete arbitratorTable;
196         delete liveAbortTable;
197         {
198                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199                 while (partsit->hasNext()) {
200                         int64_t machineId = partsit->next();
201                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
202                         SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
203                         while(pit->hasNext()) {
204                                 Pair<int64_t, int32_t> * pair=pit->next();
205                                 pit->currVal()->releaseRef();
206                         }
207                         delete pit;
208                         
209                         delete parts;
210                 }
211                 delete partsit;
212                 delete newTransactionParts;
213         }
214         {
215                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
216                 while (partsit->hasNext()) {
217                         int64_t machineId = partsit->next();
218                         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
219                         SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
220                         while(pit->hasNext()) {
221                                 Pair<int64_t, int32_t> * pair=pit->next();
222                                 pit->currVal()->releaseRef();
223                         }
224                         delete pit;
225                         delete parts;
226                 }
227                 delete partsit;
228                 delete newCommitParts;
229         }
230         delete lastArbitratedTransactionNumberByArbitratorTable;
231         delete liveTransactionBySequenceNumberTable;
232         delete liveTransactionByTransactionIdTable;
233         {
234                 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
235                 while (liveit->hasNext()) {
236                         int64_t arbitratorId = liveit->next();
237                         
238                         // Get all the commits for a specific arbitrator
239                         Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
240                         {
241                                 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
242                                 while (clientit->hasNext()) {
243                                         int64_t id = clientit->next();
244                                         delete commitForClientTable->get(id);
245                                 }
246                                 delete clientit;
247                         }
248                         
249                         delete commitForClientTable;
250                 }
251                 delete liveit;
252                 delete liveCommitsTable;
253         }
254         delete liveCommitsByKeyTable;
255         delete lastCommitSeenSequenceNumberByArbitratorTable;
256         delete rejectedSlotMyVector;
257         {
258                 uint size = pendingTransactionQueue->size();
259                 for (uint iter = 0; iter < size; iter++) {
260                         delete pendingTransactionQueue->get(iter);
261                 }
262                 delete pendingTransactionQueue;
263         }
264         delete pendingSendArbitrationEntriesToDelete;
265         {
266                 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
267                 while (trit->hasNext()) {
268                         Transaction *transaction = trit->next();
269                         delete trit->currVal();
270                 }
271                 delete trit;
272                 delete transactionPartsSent;
273         }
274         delete outstandingTransactionStatus;
275         delete liveAbortsGeneratedByLocal;
276         delete offlineTransactionsCommittedAndAtServer;
277         delete localCommunicationTable;
278         delete lastTransactionSeenFromMachineFromServer;
279         {
280                 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
281                         delete pendingSendArbitrationRounds->get(i);
282                 }
283                 delete pendingSendArbitrationRounds;
284         }
285         if (lastTransactionPartsSent != NULL)
286                 delete lastTransactionPartsSent;
287         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
288         if (lastNewKey)
289                 delete lastNewKey;
290 }
291
292 /**
293  * Init all the stuff needed for for table usage
294  */
295 void Table::init() {
296         // Init helper objects
297         random = new SecureRandom();
298         buffer = new SlotBuffer();
299
300         // init data structs
301         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
302         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
303         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
304         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
305         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
306         rejectedMessageWatchMyVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
307         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
308         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
309         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
310         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
311         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
312         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
313         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
314         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
315         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
316         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
317         rejectedSlotMyVector = new MyVector<int64_t>();
318         pendingTransactionQueue = new MyVector<Transaction *>();
319         pendingSendArbitrationEntriesToDelete = new MyVector<Entry *>();
320         transactionPartsSent = new Hashtable<Transaction *, MyVector<int32_t> *>();
321         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
322         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
323         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
324         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
325         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
326         pendingSendArbitrationRounds = new MyVector<ArbitrationRound *>();
327         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
328
329         // Other init stuff
330         numberOfSlots = buffer->capacity();
331         setResizeThreshold();
332 }
333
334 /**
335  * Initialize the table by inserting a table status as the first entry
336  * into the table status also initialize the crypto stuff.
337  */
338 void Table::initTable() {
339         cloud->initSecurity();
340         // Create the first insertion into the block chain which is the table status
341         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
342         localSequenceNumber++;
343         TableStatus *status = new TableStatus(s, numberOfSlots);
344         s->addShallowEntry(status);
345         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
346         if (array == NULL) {
347                 array = new Array<Slot *>(1);
348                 array->set(0, s);
349                 // update local block chain
350                 validateAndUpdate(array, true);
351                 delete array;
352         } else if (array->length() == 1) {
353                 // in case we did push the slot BUT we failed to init it
354                 validateAndUpdate(array, true);
355                 delete s;
356                 delete array;
357         } else {
358                 delete s;
359                 delete array;
360                 //throw new Error("Error on initialization");
361                 myerror("Error on initialization\n");
362         }
363 }
364
365 /**
366  * Rebuild the table from scratch by pulling the latest block chain
367  * from the server.
368  */
369 void Table::rebuild() {
370         // Just pull the latest slots from the server
371         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
372         validateAndUpdate(newslots, true);
373         delete newslots;
374         sendToServer(NULL);
375         updateLiveTransactionsAndStatus();
376 }
377
378 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
379         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
380 }
381
382 int64_t Table::getArbitrator(IoTString *key) {
383         return arbitratorTable->get(key);
384 }
385
386 void Table::close() {
387         cloud->closeCloud();
388 }
389
390 IoTString *Table::getCommitted(IoTString *key)  {
391         KeyValue *kv = committedKeyValueTable->get(key);
392
393         if (kv != NULL) {
394                 return kv->getValue()->acquireRef();
395         } else {
396                 return NULL;
397         }
398 }
399
400 IoTString *Table::getSpeculative(IoTString *key) {
401         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
402
403         if (kv == NULL) {
404                 kv = speculatedKeyValueTable->get(key);
405         }
406
407         if (kv == NULL) {
408                 kv = committedKeyValueTable->get(key);
409         }
410
411         if (kv != NULL) {
412                 return kv->getValue()->acquireRef();
413         } else {
414                 return NULL;
415         }
416 }
417
418 IoTString *Table::getCommittedAtomic(IoTString *key) {
419         KeyValue *kv = committedKeyValueTable->get(key);
420
421         if (!arbitratorTable->contains(key)) {
422                 //              throw new Error("Key not Found.");
423                 myerror("Key not found!\n");
424         }
425
426         // Make sure new key value pair matches the current arbitrator
427         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
428                 // TODO: Maybe not throw en error
429                 //throw new Error("Not all Key Values Match Arbitrator.");
430                 myerror("Not all key values match arbitrator\n");
431         }
432
433         if (kv != NULL) {
434                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
435                 return kv->getValue()->acquireRef();
436         } else {
437                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
438                 return NULL;
439         }
440 }
441
442 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
443         if (!arbitratorTable->contains(key)) {
444                 //throw new Error("Key not Found.");
445                 myerror("Key not found\n");
446         }
447
448         // Make sure new key value pair matches the current arbitrator
449         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
450                 // TODO: Maybe not throw en error
451                 //throw new Error("Not all Key Values Match Arbitrator.");
452                 myerror("Not all Key Values Match Arbitrator.\n");
453         }
454
455         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
456
457         if (kv == NULL) {
458                 kv = speculatedKeyValueTable->get(key);
459         }
460
461         if (kv == NULL) {
462                 kv = committedKeyValueTable->get(key);
463         }
464
465         if (kv != NULL) {
466                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
467                 return kv->getValue()->acquireRef();
468         } else {
469                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
470                 return NULL;
471         }
472 }
473
474 bool Table::update()  {
475         //try {
476                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
477                 validateAndUpdate(newSlots, false);
478                 delete newSlots;
479                 sendToServer(NULL);
480                 updateLiveTransactionsAndStatus();
481                 return true;
482                 /*      } catch (Exception *e) {
483                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
484                 while (kit->hasNext()) {
485                         int64_t m = kit->next();
486                         updateFromLocal(m);
487                 }
488                 delete kit;
489                 }*/
490
491         return false;
492 }
493
494 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
495         while (true) {
496                 if (arbitratorTable->contains(keyName)) {
497                         // There is already an arbitrator
498                         return false;
499                 }
500                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
501
502                 if (sendToServer(newKey)) {
503                         // If successfully inserted
504                         return true;
505                 }
506         }
507 }
508
509 void Table::startTransaction() {
510         // Create a new transaction, invalidates any old pending transactions.
511         if (pendingTransactionBuilder != NULL)
512                 delete pendingTransactionBuilder;
513         pendingTransactionBuilder = new PendingTransaction(localMachineId);
514 }
515
516 void Table::put(IoTString *key, IoTString *value) {
517         // Make sure it is a valid key
518         if (!arbitratorTable->contains(key)) {
519                 //throw new Error("Key not Found.");
520                 myerror("Key not Found.\n");
521         }
522
523         // Make sure new key value pair matches the current arbitrator
524         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
525                 // TODO: Maybe not throw en error
526                 //throw new Error("Not all Key Values Match Arbitrator.");
527                 myerror("Not all Key Values Match Arbitrator.\n");
528         }
529
530         // Add the key value to this transaction
531         KeyValue *kv = new KeyValue(key->acquireRef(), value->acquireRef());
532         pendingTransactionBuilder->addKV(kv);
533 }
534
535 TransactionStatus *Table::commitTransaction() {
536         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
537                 // transaction with no updates will have no effect on the system
538                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
539         }
540
541         // Set the local transaction sequence number and increment
542         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
543         localTransactionSequenceNumber++;
544
545         // Create the transaction status
546         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
547
548         // Create the new transaction
549         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
550         newTransaction->setTransactionStatus(transactionStatus);
551
552         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
553                 // Add it to the queue and invalidate the builder for safety
554                 pendingTransactionQueue->add(newTransaction);
555         } else {
556                 arbitrateOnLocalTransaction(newTransaction);
557                 delete newTransaction;
558                 updateLiveStateFromLocal();
559         }
560         if (pendingTransactionBuilder != NULL)
561                 delete pendingTransactionBuilder;
562         
563         pendingTransactionBuilder = new PendingTransaction(localMachineId);
564
565         //      try {
566                 sendToServer(NULL);
567                 /*      } catch (ServerException *e) {
568
569                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
570                 uint size = pendingTransactionQueue->size();
571                 uint oldindex = 0;
572                 for (uint iter = 0; iter < size; iter++) {
573                         Transaction *transaction = pendingTransactionQueue->get(iter);
574                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
575
576                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
577                                 // Already contacted this client so ignore all attempts to contact this client
578                                 // to preserve ordering for arbitrator
579                                 continue;
580                         }
581
582                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
583
584                         if (sendReturn.getFirst()) {
585                                 // Failed to contact over local
586                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
587                         } else {
588                                 // Successful contact or should not contact
589
590                                 if (sendReturn.getSecond()) {
591                                         // did arbitrate
592                                         delete transaction;
593                                         oldindex--;
594                                 }
595                         }
596                 }
597                 pendingTransactionQueue->setSize(oldindex);
598                 }*/
599
600         updateLiveStateFromLocal();
601
602         return transactionStatus;
603 }
604
605 /**
606  * Recalculate the new resize threshold
607  */
608 void Table::setResizeThreshold() {
609         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
610         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
611 }
612
613 int64_t Table::getLocalSequenceNumber() {
614         return localSequenceNumber;
615 }
616
617 void Table::processTransactionList(bool handlePartial) {
618         SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *)getKeyIterator(lastTransactionPartsSent);
619         while (trit->hasNext()) {
620                 Transaction *transaction = trit->next();
621                 transaction->resetServerFailure();
622                 // Update which transactions parts still need to be sent
623                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
624                 // Add the transaction status to the outstanding list
625                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
626                 
627                 // Update the transaction status
628                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
629                 
630                 // Check if all the transaction parts were successfully
631                 // sent and if so then remove it from pending
632                 if (transaction->didSendAllParts()) {
633                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
634                         pendingTransactionQueue->remove(transaction);
635                         delete transaction;
636                 } else if (handlePartial) {
637                         transaction->resetServerFailure();
638                         // Set the transaction sequence number back to nothing
639                         if (!transaction->didSendAPartToServer()) {
640                                 transaction->setSequenceNumber(-1);
641                         }
642                 }
643         }
644         delete trit;
645 }
646
647 NewKey * Table::handlePartialSend(NewKey * newKey) {
648         //Didn't receive acknowledgement for last send
649         //See if the server has received a newer slot
650         
651         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
652         if (newSlots->length() == 0) {
653                 //Retry sending old slot
654                 bool wasInserted = false;
655                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
656                 
657                 if (sendSlotsReturn) {
658                         lastSlotAttemptedToSend = NULL;
659                         if (newKey != NULL) {
660                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
661                                         delete newKey;
662                                         newKey = NULL;
663                                 }
664                         }
665                         processTransactionList(false);
666                 } else {
667                         if (checkSend(newSlots, lastSlotAttemptedToSend)) {
668                                 if (newKey != NULL) {
669                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
670                                                 delete newKey;
671                                                 newKey = NULL;
672                                         }
673                                 }
674                                 processTransactionList(true);
675                         }
676                 }
677                 
678                 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *)getKeyIterator(lastTransactionPartsSent);
679                 while (trit->hasNext()) {
680                         Transaction *transaction = trit->next();
681                         transaction->resetServerFailure();
682                         // Set the transaction sequence number back to nothing
683                         if (!transaction->didSendAPartToServer()) {
684                                 transaction->setSequenceNumber(-1);
685                         }
686                 }
687                 delete trit;
688                 
689                 if (newSlots->length() != 0) {
690                         // insert into the local block chain
691                         validateAndUpdate(newSlots, true);
692                 }
693         } else {
694                 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
695                         if (newKey != NULL) {
696                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
697                                         delete newKey;
698                                         newKey = NULL;
699                                 }
700                         }
701
702                         processTransactionList(true);
703                 } else {
704                         SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(lastTransactionPartsSent);
705                         while (trit->hasNext()) {
706                                 Transaction *transaction = trit->next();
707                                 transaction->resetServerFailure();
708                                 // Set the transaction sequence number back to nothing
709                                 if (!transaction->didSendAPartToServer()) {
710                                         transaction->setSequenceNumber(-1);
711                                 }
712                         }
713                         delete trit;
714                 }
715                 
716                 // insert into the local block chain
717                 validateAndUpdate(newSlots, true);
718         }
719         delete newSlots;
720         return newKey;
721 }
722
723 void Table::clearSentParts() {
724         // Clear the sent data since we are trying again
725         pendingSendArbitrationEntriesToDelete->clear();
726         SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
727         while (trit->hasNext()) {
728                 Transaction *transaction = trit->next();
729                 delete trit->currVal();
730         }
731         delete trit;
732         transactionPartsSent->clear();
733 }
734
735 bool Table::sendToServer(NewKey *newKey) {
736         if (hadPartialSendToServer) {
737                 newKey = handlePartialSend(newKey);
738         }
739
740         //      try {
741                 // While we have stuff that needs inserting into the block chain
742                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
743                         if (hadPartialSendToServer) {
744                                 //                              throw new Error("Should Be error free");
745                                 myerror("Should Be error free\n");
746                         }
747                         
748                         // If there is a new key with same name then end
749                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
750                                 delete newKey;
751                                 return false;
752                         }
753
754                         // Create the slot
755                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
756                         localSequenceNumber++;
757
758                         // Try to fill the slot with data
759                         int newSize = 0;
760                         bool insertedNewKey = false;
761                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
762
763                         if (needsResize) {
764                                 // Reset which transaction to send
765                                 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
766                                 while (trit->hasNext()) {
767                                         Transaction *transaction = trit->next();
768                                         transaction->resetNextPartToSend();
769
770                                         // Set the transaction sequence number back to nothing
771                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772                                                 transaction->setSequenceNumber(-1);
773                                         }
774                                 }
775                                 delete trit;
776
777                                 // Clear the sent data since we are trying again
778                                 clearSentParts();
779                                         
780                                 // We needed a resize so try again
781                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
782                         }
783                         if (lastSlotAttemptedToSend != NULL)
784                                 delete lastSlotAttemptedToSend;
785                         
786                         lastSlotAttemptedToSend = slot;
787                         lastIsNewKey = (newKey != NULL);
788                         lastInsertedNewKey = insertedNewKey;
789                         lastNewSize = newSize;
790                         if (( newKey != lastNewKey) && (lastNewKey != NULL))
791                                 delete lastNewKey;
792                         lastNewKey = newKey;
793                         if (lastTransactionPartsSent != NULL)
794                                 delete lastTransactionPartsSent;
795                         lastTransactionPartsSent = transactionPartsSent->clone();
796
797                         Array<Slot *> * newSlots = NULL;
798                         bool wasInserted = false;
799                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
800
801                         if (sendSlotsReturn) {
802                                 lastSlotAttemptedToSend = NULL;
803                                 // Did insert into the block chain
804                                 if (insertedNewKey) {
805                                         // This slot was what was inserted not a previous slot
806                                         // New Key was successfully inserted into the block chain so dont want to insert it again
807                                         newKey = NULL;
808                                 }
809
810                                 // Remove the aborts and commit parts that were sent from the pending to send queue
811                                 uint size = pendingSendArbitrationRounds->size();
812                                 uint oldcount = 0;
813                                 for (uint i = 0; i < size; i++) {
814                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
815                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
816
817                                         if (!round->isDoneSending()) {
818                                                 //Add part back in
819                                                 pendingSendArbitrationRounds->set(oldcount++,
820                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
821                                         } else
822                                                 delete pendingSendArbitrationRounds->get(i);
823                                 }
824                                 pendingSendArbitrationRounds->setSize(oldcount);
825                                 processTransactionList(false);
826                         } else {
827                                 // Reset which transaction to send
828                                 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
829                                 while (trit->hasNext()) {
830                                         Transaction *transaction = trit->next();
831                                         transaction->resetNextPartToSend();
832
833                                         // Set the transaction sequence number back to nothing
834                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
835                                                 transaction->setSequenceNumber(-1);
836                                         }
837                                 }
838                                 delete trit;
839                         }
840
841                         // Clear the sent data in preparation for next send
842                         clearSentParts();
843
844                         if (newSlots->length() != 0) {
845                                 // insert into the local block chain
846                                 validateAndUpdate(newSlots, true);
847                         }
848                         delete newSlots;
849                 }
850                 /*      } catch (ServerException *e) {
851                 if (e->getType() != ServerException_TypeInputTimeout) {
852                         // Nothing was able to be sent to the server so just clear these data structures
853                         SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
854                         while (trit->hasNext()) {
855                                 Transaction *transaction = trit->next();
856                                 transaction->resetNextPartToSend();
857
858                                 // Set the transaction sequence number back to nothing
859                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
860                                         transaction->setSequenceNumber(-1);
861                                 }
862                         }
863                         delete trit;
864                 } else {
865                         // There was a partial send to the server
866                         hadPartialSendToServer = true;
867
868                         // Nothing was able to be sent to the server so just clear these data structures
869                         SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
870                         while (trit->hasNext()) {
871                                 Transaction *transaction = trit->next();
872                                 transaction->resetNextPartToSend();
873                                 transaction->setServerFailure();
874                         }
875                         delete trit;
876                 }
877
878                 clearSentParts();
879
880                 throw e;
881                 }*/
882
883         return newKey == NULL;
884 }
885
886 bool Table::updateFromLocal(int64_t machineId) {
887         if (!localCommunicationTable->contains(machineId))
888                 return false;
889
890         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
891
892         // Get the size of the send data
893         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
894
895         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
896         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
897                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
898         }
899
900         Array<char> *sendData = new Array<char>(sendDataSize);
901         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
902
903         // Encode the data
904         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
905         bbEncode->putInt(0);
906
907         // Send by local
908         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
909         localSequenceNumber++;
910
911         if (returnData == NULL) {
912                 // Could not contact server
913                 return false;
914         }
915
916         // Decode the data
917         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
918         int numberOfEntries = bbDecode->getInt();
919
920         for (int i = 0; i < numberOfEntries; i++) {
921                 char type = bbDecode->get();
922                 if (type == TypeAbort) {
923                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
924                         processEntry(abort);
925                 } else if (type == TypeCommitPart) {
926                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
927                         processEntry(commitPart);
928                 }
929         }
930
931         updateLiveStateFromLocal();
932
933         return true;
934 }
935
936 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
937
938         // Get the devices local communications
939         if (!localCommunicationTable->contains(transaction->getArbitrator()))
940                 return Pair<bool, bool>(true, false);
941
942         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
943
944         // Get the size of the send data
945         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
946         {
947                 MyVector<TransactionPart *> *tParts = transaction->getParts();
948                 uint tPartsSize = tParts->size();
949                 for (uint i = 0; i < tPartsSize; i++) {
950                         TransactionPart *part = tParts->get(i);
951                         sendDataSize += part->getSize();
952                 }
953         }
954
955         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
956         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
957                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
958         }
959
960         // Make the send data size
961         Array<char> *sendData = new Array<char>(sendDataSize);
962         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
963
964         // Encode the data
965         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
966         bbEncode->putInt(transaction->getParts()->size());
967         {
968                 MyVector<TransactionPart *> *tParts = transaction->getParts();
969                 uint tPartsSize = tParts->size();
970                 for (uint i = 0; i < tPartsSize; i++) {
971                         TransactionPart *part = tParts->get(i);
972                         part->encode(bbEncode);
973                 }
974         }
975
976         // Send by local
977         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
978         localSequenceNumber++;
979
980         if (returnData == NULL) {
981                 // Could not contact server
982                 return Pair<bool, bool>(true, false);
983         }
984
985         // Decode the data
986         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
987         bool didCommit = bbDecode->get() == 1;
988         bool couldArbitrate = bbDecode->get() == 1;
989         int numberOfEntries = bbDecode->getInt();
990         bool foundAbort = false;
991
992         for (int i = 0; i < numberOfEntries; i++) {
993                 char type = bbDecode->get();
994                 if (type == TypeAbort) {
995                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
996
997                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
998                                 foundAbort = true;
999                         }
1000
1001                         processEntry(abort);
1002                 } else if (type == TypeCommitPart) {
1003                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
1004                         processEntry(commitPart);
1005                 }
1006         }
1007
1008         updateLiveStateFromLocal();
1009
1010         if (couldArbitrate) {
1011                 TransactionStatus *status =  transaction->getTransactionStatus();
1012                 if (didCommit) {
1013                         status->setStatus(TransactionStatus_StatusCommitted);
1014                 } else {
1015                         status->setStatus(TransactionStatus_StatusAborted);
1016                 }
1017         } else {
1018                 TransactionStatus *status =  transaction->getTransactionStatus();
1019                 if (foundAbort) {
1020                         status->setStatus(TransactionStatus_StatusAborted);
1021                 } else {
1022                         status->setStatus(TransactionStatus_StatusCommitted);
1023                 }
1024         }
1025
1026         return Pair<bool, bool>(false, true);
1027 }
1028
1029 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1030         // Decode the data
1031         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1032         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1033         int numberOfParts = bbDecode->getInt();
1034
1035         // If we did commit a transaction or not
1036         bool didCommit = false;
1037         bool couldArbitrate = false;
1038
1039         if (numberOfParts != 0) {
1040
1041                 // decode the transaction
1042                 Transaction *transaction = new Transaction();
1043                 for (int i = 0; i < numberOfParts; i++) {
1044                         bbDecode->get();
1045                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1046                         transaction->addPartDecode(newPart);
1047                 }
1048
1049                 // Arbitrate on transaction and pull relevant return data
1050                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1051                 couldArbitrate = localArbitrateReturn.getFirst();
1052                 didCommit = localArbitrateReturn.getSecond();
1053
1054                 updateLiveStateFromLocal();
1055
1056                 // Transaction was sent to the server so keep track of it to prevent double commit
1057                 if (transaction->getSequenceNumber() != -1) {
1058                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1059                 }
1060         }
1061
1062         // The data to send back
1063         int returnDataSize = 0;
1064         MyVector<Entry *> *unseenArbitrations = new MyVector<Entry *>();
1065
1066         // Get the aborts to send back
1067         MyVector<int64_t> *abortLocalSequenceNumbers = new MyVector<int64_t>();
1068         {
1069                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1070                 while (abortit->hasNext())
1071                         abortLocalSequenceNumbers->add(abortit->next());
1072                 delete abortit;
1073         }
1074
1075         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1076
1077         uint asize = abortLocalSequenceNumbers->size();
1078         for (uint i = 0; i < asize; i++) {
1079                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1080                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1081                         continue;
1082                 }
1083
1084                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1085                 unseenArbitrations->add(abort);
1086                 returnDataSize += abort->getSize();
1087         }
1088
1089         // Get the commits to send back
1090         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1091         if (commitForClientTable != NULL) {
1092                 MyVector<int64_t> *commitLocalSequenceNumbers = new MyVector<int64_t>();
1093                 {
1094                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1095                         while (commitit->hasNext())
1096                                 commitLocalSequenceNumbers->add(commitit->next());
1097                         delete commitit;
1098                 }
1099                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1100
1101                 uint clsSize = commitLocalSequenceNumbers->size();
1102                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1103                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1104                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1105
1106                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1107                                 continue;
1108                         }
1109
1110                         {
1111                                 MyVector<CommitPart *> *parts = commit->getParts();
1112                                 uint nParts = parts->size();
1113                                 for (uint i = 0; i < nParts; i++) {
1114                                         CommitPart *commitPart = parts->get(i);
1115                                         unseenArbitrations->add(commitPart);
1116                                         returnDataSize += commitPart->getSize();
1117                                 }
1118                         }
1119                 }
1120         }
1121
1122         // Number of arbitration entries to decode
1123         returnDataSize += 2 * sizeof(int32_t);
1124
1125         // bool of did commit or not
1126         if (numberOfParts != 0) {
1127                 returnDataSize += sizeof(char);
1128         }
1129
1130         // Data to send Back
1131         Array<char> *returnData = new Array<char>(returnDataSize);
1132         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1133
1134         if (numberOfParts != 0) {
1135                 if (didCommit) {
1136                         bbEncode->put((char)1);
1137                 } else {
1138                         bbEncode->put((char)0);
1139                 }
1140                 if (couldArbitrate) {
1141                         bbEncode->put((char)1);
1142                 } else {
1143                         bbEncode->put((char)0);
1144                 }
1145         }
1146
1147         bbEncode->putInt(unseenArbitrations->size());
1148         uint size = unseenArbitrations->size();
1149         for (uint i = 0; i < size; i++) {
1150                 Entry *entry = unseenArbitrations->get(i);
1151                 entry->encode(bbEncode);
1152         }
1153
1154         localSequenceNumber++;
1155         return returnData;
1156 }
1157
1158 /** Checks whether a given slot was sent using new slots in
1159                 array. Returns true if sent and false otherwise.  */
1160
1161 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1162         uint size = array->length();
1163         for (uint i = 0; i < size; i++) {
1164                 Slot *s = array->get(i);
1165                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1166                         return true;
1167                 }
1168         }
1169         
1170         //Also need to see if other machines acknowledged our message
1171         for (uint i = 0; i < size; i++) {
1172                 Slot *s = array->get(i);
1173                 
1174                 // Process each entry in the slot
1175                 MyVector<Entry *> *entries = s->getEntries();
1176                 uint eSize = entries->size();
1177                 for (uint ei = 0; ei < eSize; ei++) {
1178                         Entry *entry = entries->get(ei);
1179                         
1180                         if (entry->getType() == TypeLastMessage) {
1181                                 LastMessage *lastMessage = (LastMessage *)entry;
1182                                 
1183                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1184                                         return true;
1185                                 }
1186                         }
1187                 }
1188         }
1189         //Not found
1190         return false;
1191 }
1192
1193 /** Method tries to send slot to server.  Returns status in tuple.
1194                 isInserted returns whether last un-acked send (if any) was
1195                 successful.  Returns whether send was confirmed.x
1196  */
1197
1198 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1199         attemptedToSendToServer = true;
1200
1201         *array = cloud->putSlot(slot, newSize);
1202         if (*array == NULL) {
1203                 *array = new Array<Slot *>(1);
1204                 (*array)->set(0, slot);
1205                 rejectedSlotMyVector->clear();
1206                 *isInserted = false;
1207                 return true;
1208         } else {
1209                 if ((*array)->length() == 0) {
1210                         //                      throw new Error("Server Error: Did not send any slots");
1211                         myerror("Server Error: Did not send any slots\n");
1212                 }
1213
1214                 if (hadPartialSendToServer) {
1215                         *isInserted = checkSend(*array, slot);
1216
1217                         if (!(*isInserted)) {
1218                                 rejectedSlotMyVector->add(slot->getSequenceNumber());
1219                         }
1220                         
1221                         return false;
1222                 } else {
1223                         rejectedSlotMyVector->add(slot->getSequenceNumber());
1224                         *isInserted = false;
1225                         return false;
1226                 }
1227         }
1228 }
1229
1230 /**
1231  * Returns true if a resize was needed but not done.
1232  */
1233 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1234         newSize = 0;//special value to indicate no resize
1235         if (liveSlotCount > bufferResizeThreshold) {
1236                 resize = true;//Resize is forced
1237         }
1238
1239         if (resize) {
1240                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1241                 TableStatus *status = new TableStatus(slot, newSize);
1242                 slot->addShallowEntry(status);
1243         }
1244
1245         // Fill with rejected slots first before doing anything else
1246         doRejectedMessages(slot);
1247
1248         // Do mandatory rescue of entries
1249         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1250
1251         // Extract working variables
1252         bool needsResize = mandatoryRescueReturn.getFirst();
1253         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1254         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1255
1256         if (needsResize && !resize) {
1257                 // We need to resize but we are not resizing so return true to force on retry
1258                 return true;
1259         }
1260
1261         insertedKey = false;
1262         if (newKeyEntry != NULL) {
1263                 newKeyEntry->setSlot(slot);
1264                 if (slot->hasSpace(newKeyEntry)) {
1265                         slot->addEntry(newKeyEntry);
1266                         insertedKey = true;
1267                 }
1268         }
1269
1270         // Clear the transactions, aborts and commits that were sent previously
1271         clearSentParts();
1272         uint size = pendingSendArbitrationRounds->size();
1273         for (uint i = 0; i < size; i++) {
1274                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1275                 bool isFull = false;
1276                 round->generateParts();
1277                 MyVector<Entry *> *parts = round->getParts();
1278
1279                 // Insert pending arbitration data
1280                 uint vsize = parts->size();
1281                 for (uint vi = 0; vi < vsize; vi++) {
1282                         Entry *arbitrationData = parts->get(vi);
1283
1284                         // If it is an abort then we need to set some information
1285                         if (arbitrationData->getType() == TypeAbort) {
1286                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1287                         }
1288
1289                         if (!slot->hasSpace(arbitrationData)) {
1290                                 // No space so cant do anything else with these data entries
1291                                 isFull = true;
1292                                 break;
1293                         }
1294
1295                         // Add to this current slot and add it to entries to delete
1296                         slot->addEntry(arbitrationData);
1297                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1298                 }
1299
1300                 if (isFull) {
1301                         break;
1302                 }
1303         }
1304
1305         if (pendingTransactionQueue->size() > 0) {
1306                 Transaction *transaction = pendingTransactionQueue->get(0);
1307                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1308                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1309                         transaction->setSequenceNumber(slot->getSequenceNumber());
1310                 }
1311
1312                 while (true) {
1313                         TransactionPart *part = transaction->getNextPartToSend();
1314                         if (part == NULL) {
1315                                 // Ran out of parts to send for this transaction so move on
1316                                 break;
1317                         }
1318
1319                         if (slot->hasSpace(part)) {
1320                                 slot->addEntry(part);
1321                                 MyVector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1322                                 if (partsSent == NULL) {
1323                                         partsSent = new MyVector<int32_t>();
1324                                         transactionPartsSent->put(transaction, partsSent);
1325                                 }
1326                                 partsSent->add(part->getPartNumber());
1327                                 transactionPartsSent->put(transaction, partsSent);
1328                         } else {
1329                                 break;
1330                         }
1331                 }
1332         }
1333
1334         // Fill the remainder of the slot with rescue data
1335         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1336
1337         return false;
1338 }
1339
1340 void Table::doRejectedMessages(Slot *s) {
1341         if (!rejectedSlotMyVector->isEmpty()) {
1342                 /* TODO: We should avoid generating a rejected message entry if
1343                  * there is already a sufficient entry in the queue (e->g->,
1344                  * equalsto value of true and same sequence number)->  */
1345
1346                 int64_t old_seqn = rejectedSlotMyVector->get(0);
1347                 if (rejectedSlotMyVector->size() > Table_REJECTED_THRESHOLD) {
1348                         int64_t new_seqn = rejectedSlotMyVector->lastElement();
1349                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1350                         s->addShallowEntry(rm);
1351                 } else {
1352                         int64_t prev_seqn = -1;
1353                         uint i = 0;
1354                         /* Go through list of missing messages */
1355                         for (; i < rejectedSlotMyVector->size(); i++) {
1356                                 int64_t curr_seqn = rejectedSlotMyVector->get(i);
1357                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1358                                 if (s_msg != NULL)
1359                                         break;
1360                                 prev_seqn = curr_seqn;
1361                         }
1362                         /* Generate rejected message entry for missing messages */
1363                         if (prev_seqn != -1) {
1364                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1365                                 s->addShallowEntry(rm);
1366                         }
1367                         /* Generate rejected message entries for present messages */
1368                         for (; i < rejectedSlotMyVector->size(); i++) {
1369                                 int64_t curr_seqn = rejectedSlotMyVector->get(i);
1370                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1371                                 int64_t machineid = s_msg->getMachineID();
1372                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1373                                 s->addShallowEntry(rm);
1374                         }
1375                 }
1376         }
1377 }
1378
1379 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1380         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1381         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1382         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1383                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1384         }
1385
1386         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1387         bool seenLiveSlot = false;
1388         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1389         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1390
1391
1392         // Mandatory Rescue
1393         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1394                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1395                 // Push slot number forward
1396                 if (!seenLiveSlot) {
1397                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1398                 }
1399
1400                 if (!previousSlot->isLive()) {
1401                         continue;
1402                 }
1403
1404                 // We have seen a live slot
1405                 seenLiveSlot = true;
1406
1407                 // Get all the live entries for a slot
1408                 MyVector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1409
1410                 // Iterate over all the live entries and try to rescue them
1411                 uint lESize = liveEntries->size();
1412                 for (uint i = 0; i < lESize; i++) {
1413                         Entry *liveEntry = liveEntries->get(i);
1414                         if (slot->hasSpace(liveEntry)) {
1415                                 // Enough space to rescue the entry
1416                                 slot->addEntry(liveEntry);
1417                         } else if (currentSequenceNumber == firstIfFull) {
1418                                 //if there's no space but the entry is about to fall off the queue
1419                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1420                         }
1421                 }
1422         }
1423
1424         // Did not resize
1425         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1426 }
1427
1428 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1429         /* now go through live entries from least to greatest sequence number until
1430          * either all live slots added, or the slot doesn't have enough room
1431          * for SKIP_THRESHOLD consecutive entries*/
1432         int skipcount = 0;
1433         int64_t newestseqnum = buffer->getNewestSeqNum();
1434         for (; seqn <= newestseqnum; seqn++) {
1435                 Slot *prevslot = buffer->getSlot(seqn);
1436                 //Push slot number forward
1437                 if (!seenliveslot)
1438                         oldestLiveSlotSequenceNumver = seqn;
1439
1440                 if (!prevslot->isLive())
1441                         continue;
1442                 seenliveslot = true;
1443                 MyVector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1444                 uint lESize = liveentries->size();
1445                 for (uint i = 0; i < lESize; i++) {
1446                         Entry *liveentry = liveentries->get(i);
1447                         if (s->hasSpace(liveentry))
1448                                 s->addEntry(liveentry);
1449                         else {
1450                                 skipcount++;
1451                                 if (skipcount > Table_SKIP_THRESHOLD) {
1452                                         delete liveentries;
1453                                         goto donesearch;
1454                                 }
1455                         }
1456                 }
1457                 delete liveentries;
1458         }
1459 donesearch:
1460         ;
1461 }
1462
1463 /**
1464  * Checks for malicious activity and updates the local copy of the block chain->
1465  */
1466 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1467         // The cloud communication layer has checked slot HMACs already
1468         // before decoding
1469         if (newSlots->length() == 0) {
1470                 return;
1471         }
1472
1473         // Make sure all slots are newer than the last largest slot this
1474         // client has seen
1475         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1476         if (firstSeqNum <= sequenceNumber) {
1477                 //              throw new Error("Server Error: Sent older slots!");
1478                 myerror("Server Error: Sent older slots!\n");
1479         }
1480
1481         // Create an object that can access both new slots and slots in our
1482         // local chain without committing slots to our local chain
1483         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1484
1485         // Check that the HMAC chain is not broken
1486         checkHMACChain(indexer, newSlots);
1487
1488         // Set to keep track of messages from clients
1489         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1490         {
1491                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1492                 while (lmit->hasNext())
1493                         machineSet->add(lmit->next());
1494                 delete lmit;
1495         }
1496
1497         // Process each slots data
1498         {
1499                 uint numSlots = newSlots->length();
1500                 for (uint i = 0; i < numSlots; i++) {
1501                         Slot *slot = newSlots->get(i);
1502                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1503                         updateExpectedSize();
1504                 }
1505         }
1506         delete indexer;
1507         
1508         // If there is a gap, check to see if the server sent us
1509         // everything->
1510         if (firstSeqNum != (sequenceNumber + 1)) {
1511
1512                 // Check the size of the slots that were sent down by the server->
1513                 // Can only check the size if there was a gap
1514                 checkNumSlots(newSlots->length());
1515
1516                 // Since there was a gap every machine must have pushed a slot or
1517                 // must have a last message message-> If not then the server is
1518                 // hiding slots
1519                 if (!machineSet->isEmpty()) {
1520                         delete machineSet;
1521                         //throw new Error("Missing record for machines: ");
1522                         myerror("Missing record for machines: \n");
1523                 }
1524         }
1525         delete machineSet;
1526         // Update the size of our local block chain->
1527         commitNewMaxSize();
1528
1529         // Commit new to slots to the local block chain->
1530         {
1531                 uint numSlots = newSlots->length();
1532                 for (uint i = 0; i < numSlots; i++) {
1533                         Slot *slot = newSlots->get(i);
1534
1535                         // Insert this slot into our local block chain copy->
1536                         buffer->putSlot(slot);
1537
1538                         // Keep track of how many slots are currently live (have live data
1539                         // in them)->
1540                         liveSlotCount++;
1541                 }
1542         }
1543         // Get the sequence number of the latest slot in the system
1544         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1545         updateLiveStateFromServer();
1546
1547         // No Need to remember after we pulled from the server
1548         offlineTransactionsCommittedAndAtServer->clear();
1549
1550         // This is invalidated now
1551         hadPartialSendToServer = false;
1552 }
1553
1554 void Table::updateLiveStateFromServer() {
1555         // Process the new transaction parts
1556         processNewTransactionParts();
1557
1558         // Do arbitration on new transactions that were received
1559         arbitrateFromServer();
1560
1561         // Update all the committed keys
1562         bool didCommitOrSpeculate = updateCommittedTable();
1563
1564         // Delete the transactions that are now dead
1565         updateLiveTransactionsAndStatus();
1566
1567         // Do speculations
1568         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1569         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1570 }
1571
1572 void Table::updateLiveStateFromLocal() {
1573         // Update all the committed keys
1574         bool didCommitOrSpeculate = updateCommittedTable();
1575
1576         // Delete the transactions that are now dead
1577         updateLiveTransactionsAndStatus();
1578
1579         // Do speculations
1580         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1581         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1582 }
1583
1584 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1585         int64_t prevslots = firstSequenceNumber;
1586
1587         if (didFindTableStatus) {
1588         } else {
1589                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1590         }
1591
1592         didFindTableStatus = true;
1593         currMaxSize = numberOfSlots;
1594 }
1595
1596 void Table::updateExpectedSize() {
1597         expectedsize++;
1598
1599         if (expectedsize > currMaxSize) {
1600                 expectedsize = currMaxSize;
1601         }
1602 }
1603
1604
1605 /**
1606  * Check the size of the block chain to make sure there are enough
1607  * slots sent back by the server-> This is only called when we have a
1608  * gap between the slots that we have locally and the slots sent by
1609  * the server therefore in the slots sent by the server there will be
1610  * at least 1 Table status message
1611  */
1612 void Table::checkNumSlots(int numberOfSlots) {
1613         if (numberOfSlots != expectedsize) {
1614                 //throw new Error("Server Error: Server did not send all slots->  Expected: ");
1615                 myerror("Server Error: Server did not send all slots->  Expected: \n");
1616         }
1617 }
1618
1619 /**
1620  * Update the size of of the local buffer if it is needed->
1621  */
1622 void Table::commitNewMaxSize() {
1623         didFindTableStatus = false;
1624
1625         // Resize the local slot buffer
1626         if (numberOfSlots != currMaxSize) {
1627                 buffer->resize((int32_t)currMaxSize);
1628         }
1629
1630         // Change the number of local slots to the new size
1631         numberOfSlots = (int32_t)currMaxSize;
1632
1633         // Recalculate the resize threshold since the size of the local
1634         // buffer has changed
1635         setResizeThreshold();
1636 }
1637
1638 /**
1639  * Process the new transaction parts from this latest round of slots
1640  * received from the server
1641  */
1642 void Table::processNewTransactionParts() {
1643
1644         if (newTransactionParts->size() == 0) {
1645                 // Nothing new to process
1646                 return;
1647         }
1648
1649         // Iterate through all the machine Ids that we received new parts
1650         // for
1651         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1652         while (tpit->hasNext()) {
1653                 int64_t machineId = tpit->next();
1654                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
1655
1656                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1657                 // Iterate through all the parts for that machine Id
1658                 while (ptit->hasNext()) {
1659                         Pair<int64_t, int32_t> *partId = ptit->next();
1660                         TransactionPart *part = parts->get(partId);
1661
1662                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1663                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1664                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1665                                         // Set dead the transaction part
1666                                         part->setDead();
1667                                         part->releaseRef();
1668                                         continue;
1669                                 }
1670                         }
1671
1672                         // Get the transaction object for that sequence number
1673                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1674
1675                         if (transaction == NULL) {
1676                                 // This is a new transaction that we dont have so make a new one
1677                                 transaction = new Transaction();
1678                                 
1679                                 // Add that part to the transaction
1680                                 transaction->addPartDecode(part);
1681
1682                                 // Insert this new transaction into the live tables
1683                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1684                                 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1685                         }
1686                         part->releaseRef();
1687                 }
1688                 delete ptit;
1689         }
1690         delete tpit;
1691         // Clear all the new transaction parts in preparation for the next
1692         // time the server sends slots
1693         {
1694                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1695                 while (partsit->hasNext()) {
1696                         int64_t machineId = partsit->next();
1697                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1698                         delete parts;
1699                 }
1700                 delete partsit;
1701                 newTransactionParts->clear();
1702         }
1703 }
1704
1705 void Table::arbitrateFromServer() {
1706         if (liveTransactionBySequenceNumberTable->size() == 0) {
1707                 // Nothing to arbitrate on so move on
1708                 return;
1709         }
1710
1711         // Get the transaction sequence numbers and sort from oldest to newest
1712         MyVector<int64_t> *transactionSequenceNumbers = new MyVector<int64_t>();
1713         {
1714                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1715                 while (trit->hasNext())
1716                         transactionSequenceNumbers->add(trit->next());
1717                 delete trit;
1718         }
1719         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1720
1721         // Collection of key value pairs that are
1722         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1723
1724         // The last transaction arbitrated on
1725         int64_t lastTransactionCommitted = -1;
1726         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1727         uint tsnSize = transactionSequenceNumbers->size();
1728         for (uint i = 0; i < tsnSize; i++) {
1729                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1730                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1731
1732                 // Check if this machine arbitrates for this transaction if not
1733                 // then we cant arbitrate this transaction
1734                 if (transaction->getArbitrator() != localMachineId) {
1735                         continue;
1736                 }
1737
1738                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1739                         continue;
1740                 }
1741
1742                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1743                         // We have seen this already locally so dont commit again
1744                         continue;
1745                 }
1746
1747                 if (!transaction->isComplete()) {
1748                         // Will arbitrate in incorrect order if we continue so just break
1749                         // Most likely this
1750                         break;
1751                 }
1752
1753                 // update the largest transaction seen by arbitrator from server
1754                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1755                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1756                 } else {
1757                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1758                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1759                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1760                         }
1761                 }
1762
1763                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1764                         // Guard evaluated as true
1765                         // Update the local changes so we can make the commit
1766                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1767                         while (kvit->hasNext()) {
1768                                 KeyValue *kv = kvit->next();
1769                                 speculativeTableTmp->put(kv->getKey(), kv);
1770                         }
1771                         delete kvit;
1772
1773                         // Update what the last transaction committed was for use in batch commit
1774                         lastTransactionCommitted = transactionSequenceNumber;
1775                 } else {
1776                         // Guard evaluated was false so create abort
1777                         // Create the abort
1778                         Abort *newAbort = new Abort(NULL,
1779                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1780                                                                                                                                         transaction->getSequenceNumber(),
1781                                                                                                                                         transaction->getMachineId(),
1782                                                                                                                                         transaction->getArbitrator(),
1783                                                                                                                                         localArbitrationSequenceNumber);
1784                         localArbitrationSequenceNumber++;
1785                         generatedAborts->add(newAbort);
1786
1787                         // Insert the abort so we can process
1788                         processEntry(newAbort);
1789                 }
1790
1791                 lastSeqNumArbOn = transactionSequenceNumber;
1792         }
1793
1794         delete transactionSequenceNumbers;
1795
1796         Commit *newCommit = NULL;
1797
1798         // If there is something to commit
1799         if (speculativeTableTmp->size() != 0) {
1800                 // Create the commit and increment the commit sequence number
1801                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1802                 localArbitrationSequenceNumber++;
1803
1804                 // Add all the new keys to the commit
1805                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1806                 while (spit->hasNext()) {
1807                         IoTString *string = spit->next();
1808                         KeyValue *kv = speculativeTableTmp->get(string);
1809                         newCommit->addKV(kv);
1810                 }
1811                 delete spit;
1812                 
1813                 // create the commit parts
1814                 newCommit->createCommitParts();
1815
1816                 // Append all the commit parts to the end of the pending queue
1817                 // waiting for sending to the server
1818                 // Insert the commit so we can process it
1819                 MyVector<CommitPart *> *parts = newCommit->getParts();
1820                 uint partsSize = parts->size();
1821                 for (uint i = 0; i < partsSize; i++) {
1822                         CommitPart *commitPart = parts->get(i);
1823                         processEntry(commitPart);
1824                 }
1825         }
1826         delete speculativeTableTmp;
1827
1828         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1829                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1830                 pendingSendArbitrationRounds->add(arbitrationRound);
1831
1832                 if (compactArbitrationData()) {
1833                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1834                         if (newArbitrationRound->getCommit() != NULL) {
1835                                 MyVector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1836                                 uint partsSize = parts->size();
1837                                 for (uint i = 0; i < partsSize; i++) {
1838                                         CommitPart *commitPart = parts->get(i);
1839                                         processEntry(commitPart);
1840                                 }
1841                         }
1842                 }
1843         } else {
1844                 delete generatedAborts;
1845         }
1846 }
1847
1848 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1849
1850         // Check if this machine arbitrates for this transaction if not then
1851         // we cant arbitrate this transaction
1852         if (transaction->getArbitrator() != localMachineId) {
1853                 return Pair<bool, bool>(false, false);
1854         }
1855
1856         if (!transaction->isComplete()) {
1857                 // Will arbitrate in incorrect order if we continue so just break
1858                 // Most likely this
1859                 return Pair<bool, bool>(false, false);
1860         }
1861
1862         if (transaction->getMachineId() != localMachineId) {
1863                 // dont do this check for local transactions
1864                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1865                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1866                                 // We've have already seen this from the server
1867                                 return Pair<bool, bool>(false, false);
1868                         }
1869                 }
1870         }
1871
1872         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1873                 // Guard evaluated as true Create the commit and increment the
1874                 // commit sequence number
1875                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1876                 localArbitrationSequenceNumber++;
1877
1878                 // Update the local changes so we can make the commit
1879                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1880                 while (kvit->hasNext()) {
1881                         KeyValue *kv = kvit->next();
1882                         newCommit->addKV(kv);
1883                 }
1884                 delete kvit;
1885
1886                 // create the commit parts
1887                 newCommit->createCommitParts();
1888
1889                 // Append all the commit parts to the end of the pending queue
1890                 // waiting for sending to the server
1891                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1892                 pendingSendArbitrationRounds->add(arbitrationRound);
1893
1894                 if (compactArbitrationData()) {
1895                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1896                         MyVector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1897                         uint partsSize = parts->size();
1898                         for (uint i = 0; i < partsSize; i++) {
1899                                 CommitPart *commitPart = parts->get(i);
1900                                 processEntry(commitPart);
1901                         }
1902                 } else {
1903                         // Insert the commit so we can process it
1904                         MyVector<CommitPart *> *parts = newCommit->getParts();
1905                         uint partsSize = parts->size();
1906                         for (uint i = 0; i < partsSize; i++) {
1907                                 CommitPart *commitPart = parts->get(i);
1908                                 processEntry(commitPart);
1909                         }
1910                 }
1911
1912                 if (transaction->getMachineId() == localMachineId) {
1913                         TransactionStatus *status = transaction->getTransactionStatus();
1914                         if (status != NULL) {
1915                                 status->setStatus(TransactionStatus_StatusCommitted);
1916                         }
1917                 }
1918
1919                 updateLiveStateFromLocal();
1920                 return Pair<bool, bool>(true, true);
1921         } else {
1922                 if (transaction->getMachineId() == localMachineId) {
1923                         // For locally created messages update the status
1924                         // Guard evaluated was false so create abort
1925                         TransactionStatus *status = transaction->getTransactionStatus();
1926                         if (status != NULL) {
1927                                 status->setStatus(TransactionStatus_StatusAborted);
1928                         }
1929                 } else {
1930                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1931
1932                         // Create the abort
1933                         Abort *newAbort = new Abort(NULL,
1934                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1935                                                                                                                                         -1,
1936                                                                                                                                         transaction->getMachineId(),
1937                                                                                                                                         transaction->getArbitrator(),
1938                                                                                                                                         localArbitrationSequenceNumber);
1939                         localArbitrationSequenceNumber++;
1940                         addAbortSet->add(newAbort);
1941
1942                         // Append all the commit parts to the end of the pending queue
1943                         // waiting for sending to the server
1944                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1945                         pendingSendArbitrationRounds->add(arbitrationRound);
1946
1947                         if (compactArbitrationData()) {
1948                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1949
1950                                 MyVector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1951                                 uint partsSize = parts->size();
1952                                 for (uint i = 0; i < partsSize; i++) {
1953                                         CommitPart *commitPart = parts->get(i);
1954                                         processEntry(commitPart);
1955                                 }
1956                         }
1957                 }
1958
1959                 updateLiveStateFromLocal();
1960                 return Pair<bool, bool>(true, false);
1961         }
1962 }
1963
1964 /**
1965  * Compacts the arbitration data by merging commits and aggregating
1966  * aborts so that a single large push of commits can be done instead
1967  * of many small updates
1968  */
1969 bool Table::compactArbitrationData() {
1970         if (pendingSendArbitrationRounds->size() < 2) {
1971                 // Nothing to compact so do nothing
1972                 return false;
1973         }
1974
1975         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1976         if (lastRound->getDidSendPart()) {
1977                 return false;
1978         }
1979
1980         bool hadCommit = (lastRound->getCommit() == NULL);
1981         bool gotNewCommit = false;
1982
1983         uint numberToDelete = 1;
1984         
1985         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1986                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1987
1988                 if (round->isFull() || round->getDidSendPart()) {
1989                         // Stop since there is a part that cannot be compacted and we
1990                         // need to compact in order
1991                         break;
1992                 }
1993
1994                 if (round->getCommit() == NULL) {
1995                         // Try compacting aborts only
1996                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1997                         if (newSize > ArbitrationRound_MAX_PARTS) {
1998                                 // Cant compact since it would be too large
1999                                 break;
2000                         }
2001                         lastRound->addAborts(round->getAborts());
2002                 } else {
2003                         // Create a new larger commit
2004                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
2005                         localArbitrationSequenceNumber++;
2006
2007                         // Create the commit parts so that we can count them
2008                         newCommit->createCommitParts();
2009
2010                         // Calculate the new size of the parts
2011                         int newSize = newCommit->getNumberOfParts();
2012                         newSize += lastRound->getAbortsCount();
2013                         newSize += round->getAbortsCount();
2014
2015                         if (newSize > ArbitrationRound_MAX_PARTS) {
2016                                 // Can't compact since it would be too large
2017                                 if (lastRound->getCommit() != newCommit &&
2018                                                 round->getCommit() != newCommit)
2019                                         delete newCommit;
2020                                 break;
2021                         }
2022                         // Set the new compacted part
2023                         if (lastRound->getCommit() == newCommit)
2024                                 lastRound->setCommit(NULL);
2025                         if (round->getCommit() == newCommit)
2026                                 round->setCommit(NULL);
2027                         
2028                         if (lastRound->getCommit() != NULL) {
2029                                 Commit * oldcommit = lastRound->getCommit();
2030                                 lastRound->setCommit(NULL);
2031                                 delete oldcommit;
2032                         }
2033                         lastRound->setCommit(newCommit);
2034                         lastRound->addAborts(round->getAborts());
2035                         gotNewCommit = true;
2036                 }
2037
2038                 numberToDelete++;
2039         }
2040
2041         if (numberToDelete != 1) {
2042                 // If there is a compaction
2043                 // Delete the previous pieces that are now in the new compacted piece
2044                 for (uint i = 2; i <= numberToDelete; i++) {
2045                         delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2046                 }
2047                 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2048
2049                 pendingSendArbitrationRounds->add(lastRound);
2050
2051                 // Should reinsert into the commit processor
2052                 if (hadCommit && gotNewCommit) {
2053                         return true;
2054                 }
2055         }
2056
2057         return false;
2058 }
2059
2060 /**
2061  * Update all the commits and the committed tables, sets dead the dead
2062  * transactions
2063  */
2064 bool Table::updateCommittedTable() {
2065         if (newCommitParts->size() == 0) {
2066                 // Nothing new to process
2067                 return false;
2068         }
2069
2070         // Iterate through all the machine Ids that we received new parts for
2071         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2072         while (partsit->hasNext()) {
2073                 int64_t machineId = partsit->next();
2074                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2075
2076                 // Iterate through all the parts for that machine Id
2077                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2078                 while (pairit->hasNext()) {
2079                         Pair<int64_t, int32_t> *partId = pairit->next();
2080                         CommitPart *part = pairit->currVal();
2081
2082                         // Get the transaction object for that sequence number
2083                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2084
2085                         if (commitForClientTable == NULL) {
2086                                 // This is the first commit from this device
2087                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2088                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2089                         }
2090
2091                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2092
2093                         if (commit == NULL) {
2094                                 // This is a new commit that we dont have so make a new one
2095                                 commit = new Commit();
2096
2097                                 // Insert this new commit into the live tables
2098                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2099                         }
2100
2101                         // Add that part to the commit
2102                         commit->addPartDecode(part);
2103                         part->releaseRef();
2104                 }
2105                 delete pairit;
2106                 delete parts;
2107         }
2108         delete partsit;
2109
2110         // Clear all the new commits parts in preparation for the next time
2111         // the server sends slots
2112         newCommitParts->clear();
2113
2114         // If we process a new commit keep track of it for future use
2115         bool didProcessANewCommit = false;
2116
2117         // Process the commits one by one
2118         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2119         while (liveit->hasNext()) {
2120                 int64_t arbitratorId = liveit->next();
2121                 // Get all the commits for a specific arbitrator
2122                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2123
2124                 // Sort the commits in order
2125                 MyVector<int64_t> *commitSequenceNumbers = new MyVector<int64_t>();
2126                 {
2127                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2128                         while (clientit->hasNext())
2129                                 commitSequenceNumbers->add(clientit->next());
2130                         delete clientit;
2131                 }
2132
2133                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2134
2135                 // Get the last commit seen from this arbitrator
2136                 int64_t lastCommitSeenSequenceNumber = -1;
2137                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2138                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2139                 }
2140
2141                 // Go through each new commit one by one
2142                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2143                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2144                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2145                         // Special processing if a commit is not complete
2146                         if (!commit->isComplete()) {
2147                                 if (i == (commitSequenceNumbers->size() - 1)) {
2148                                         // If there is an incomplete commit and this commit is the
2149                                         // latest one seen then this commit cannot be processed and
2150                                         // there are no other commits
2151                                         break;
2152                                 } else {
2153                                         // This is a commit that was already dead but parts of it
2154                                         // are still in the block chain (not flushed out yet)->
2155                                         // Delete it and move on
2156                                         commit->setDead();
2157                                         commitForClientTable->remove(commit->getSequenceNumber());
2158                                         delete commit;
2159                                         continue;
2160                                 }
2161                         }
2162
2163                         // Update the last transaction that was updated if we can
2164                         if (commit->getTransactionSequenceNumber() != -1) {
2165                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2166                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2167                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2168                                 }
2169                         }
2170
2171                         // Update the last arbitration data that we have seen so far
2172                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2173                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2174                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2175                                         // Is larger
2176                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2177                                 }
2178                         } else {
2179                                 // Never seen any data from this arbitrator so record the first one
2180                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2181                         }
2182
2183                         // We have already seen this commit before so need to do the
2184                         // full processing on this commit
2185                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2186                                 // Update the last transaction that was updated if we can
2187                                 if (commit->getTransactionSequenceNumber() != -1) {
2188                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2189                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2190                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2191                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2192                                         }
2193                                 }
2194                                 continue;
2195                         }
2196
2197                         // If we got here then this is a brand new commit and needs full
2198                         // processing
2199                         // Get what commits should be edited, these are the commits that
2200                         // have live values for their keys
2201                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2202                         {
2203                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2204                                 while (kvit->hasNext()) {
2205                                         KeyValue *kv = kvit->next();
2206                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2207                                         if (commit != NULL)
2208                                                 commitsToEdit->add(commit);
2209                                 }
2210                                 delete kvit;
2211                         }
2212
2213                         // Update each previous commit that needs to be updated
2214                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2215                         while (commitit->hasNext()) {
2216                                 Commit *previousCommit = commitit->next();
2217
2218                                 // Only bother with live commits (TODO: Maybe remove this check)
2219                                 if (previousCommit->isLive()) {
2220
2221                                         // Update which keys in the old commits are still live
2222                                         {
2223                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2224                                                 while (kvit->hasNext()) {
2225                                                         KeyValue *kv = kvit->next();
2226                                                         previousCommit->invalidateKey(kv->getKey());
2227                                                 }
2228                                                 delete kvit;
2229                                         }
2230
2231                                         // if the commit is now dead then remove it
2232                                         if (!previousCommit->isLive()) {
2233                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2234                                                 delete previousCommit;
2235                                         }
2236                                 }
2237                         }
2238                         delete commitit;
2239                         delete commitsToEdit;
2240
2241                         // Update the last seen sequence number from this arbitrator
2242                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2243                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2244                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2245                                 }
2246                         } else {
2247                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2248                         }
2249
2250                         // We processed a new commit that we havent seen before
2251                         didProcessANewCommit = true;
2252
2253                         // Update the committed table of keys and which commit is using which key
2254                         {
2255                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2256                                 while (kvit->hasNext()) {
2257                                         KeyValue *kv = kvit->next();
2258                                         committedKeyValueTable->put(kv->getKey(), kv);
2259                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2260                                 }
2261                                 delete kvit;
2262                         }
2263                 }
2264                 delete commitSequenceNumbers;
2265         }
2266         delete liveit;
2267
2268         return didProcessANewCommit;
2269 }
2270
2271 /**
2272  * Create the speculative table from transactions that are still live
2273  * and have come from the cloud
2274  */
2275 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2276         if (liveTransactionBySequenceNumberTable->size() == 0) {
2277                 // There is nothing to speculate on
2278                 return false;
2279         }
2280
2281         // Create a list of the transaction sequence numbers and sort them
2282         // from oldest to newest
2283         MyVector<int64_t> *transactionSequenceNumbersSorted = new MyVector<int64_t>();
2284         {
2285                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2286                 while (trit->hasNext())
2287                         transactionSequenceNumbersSorted->add(trit->next());
2288                 delete trit;
2289         }
2290
2291         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2292
2293         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2294
2295
2296         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2297                 // If there is a gap in the transaction sequence numbers then
2298                 // there was a commit or an abort of a transaction OR there was a
2299                 // new commit (Could be from offline commit) so a redo the
2300                 // speculation from scratch
2301
2302                 // Start from scratch
2303                 speculatedKeyValueTable->clear();
2304                 lastTransactionSequenceNumberSpeculatedOn = -1;
2305                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2306         }
2307
2308         // Remember the front of the transaction list
2309         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2310
2311         // Find where to start arbitration from
2312         uint startIndex = 0;
2313
2314         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2315                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2316                         break;
2317         startIndex++;
2318
2319         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2320                 // Make sure we are not out of bounds
2321                 delete transactionSequenceNumbersSorted;
2322                 return false;           // did not speculate
2323         }
2324
2325         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2326         bool didSkip = true;
2327
2328         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2329                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2330                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2331
2332                 if (!transaction->isComplete()) {
2333                         // If there is an incomplete transaction then there is nothing
2334                         // we can do add this transactions arbitrator to the list of
2335                         // arbitrators we should ignore
2336                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2337                         didSkip = true;
2338                         continue;
2339                 }
2340
2341                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2342                         continue;
2343                 }
2344
2345                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2346
2347                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2348                         // Guard evaluated to true so update the speculative table
2349                         {
2350                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2351                                 while (kvit->hasNext()) {
2352                                         KeyValue *kv = kvit->next();
2353                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2354                                 }
2355                                 delete kvit;
2356                         }
2357                 }
2358         }
2359
2360         delete transactionSequenceNumbersSorted;
2361         
2362         if (didSkip) {
2363                 // Since there was a skip we need to redo the speculation next time around
2364                 lastTransactionSequenceNumberSpeculatedOn = -1;
2365                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2366         }
2367
2368         // We did some speculation
2369         return true;
2370 }
2371
2372 /**
2373  * Create the pending transaction speculative table from transactions
2374  * that are still in the pending transaction buffer
2375  */
2376 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2377         if (pendingTransactionQueue->size() == 0) {
2378                 // There is nothing to speculate on
2379                 return;
2380         }
2381
2382         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2383                 // need to reset on the pending speculation
2384                 lastPendingTransactionSpeculatedOn = NULL;
2385                 firstPendingTransaction = pendingTransactionQueue->get(0);
2386                 pendingTransactionSpeculatedKeyValueTable->clear();
2387         }
2388
2389         // Find where to start arbitration from
2390         uint startIndex = 0;
2391
2392         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2393                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2394                         break;
2395
2396         if (startIndex >= pendingTransactionQueue->size()) {
2397                 // Make sure we are not out of bounds
2398                 return;
2399         }
2400
2401         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2402                 Transaction *transaction = pendingTransactionQueue->get(i);
2403
2404                 lastPendingTransactionSpeculatedOn = transaction;
2405
2406                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2407                         // Guard evaluated to true so update the speculative table
2408                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2409                         while (kvit->hasNext()) {
2410                                 KeyValue *kv = kvit->next();
2411                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2412                         }
2413                         delete kvit;
2414                 }
2415         }
2416 }
2417
2418 /**
2419  * Set dead and remove from the live transaction tables the
2420  * transactions that are dead
2421  */
2422 void Table::updateLiveTransactionsAndStatus() {
2423         // Go through each of the transactions
2424         {
2425                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2426                 while (iter->hasNext()) {
2427                         int64_t key = iter->next();
2428                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2429
2430                         // Check if the transaction is dead
2431                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2432                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2433                                 // Set dead the transaction
2434                                 transaction->setDead();
2435
2436                                 // Remove the transaction from the live table
2437                                 iter->remove();
2438                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2439                                 delete transaction;
2440                         }
2441                 }
2442                 delete iter;
2443         }
2444
2445         // Go through each of the transactions
2446         {
2447                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2448                 while (iter->hasNext()) {
2449                         int64_t key = iter->next();
2450                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2451
2452                         // Check if the transaction is dead
2453                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2454                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2455                                 // Set committed
2456                                 status->setStatus(TransactionStatus_StatusCommitted);
2457
2458                                 // Remove
2459                                 iter->remove();
2460                         }
2461                 }
2462                 delete iter;
2463         }
2464 }
2465
2466 /**
2467  * Process this slot, entry by entry->  Also update the latest message sent by slot
2468  */
2469 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2470
2471         // Update the last message seen
2472         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2473
2474         // Process each entry in the slot
2475         MyVector<Entry *> *entries = slot->getEntries();
2476         uint eSize = entries->size();
2477         for (uint ei = 0; ei < eSize; ei++) {
2478                 Entry *entry = entries->get(ei);
2479                 switch (entry->getType()) {
2480                 case TypeCommitPart:
2481                         processEntry((CommitPart *)entry);
2482                         break;
2483                 case TypeAbort:
2484                         processEntry((Abort *)entry);
2485                         break;
2486                 case TypeTransactionPart:
2487                         processEntry((TransactionPart *)entry);
2488                         break;
2489                 case TypeNewKey:
2490                         processEntry((NewKey *)entry);
2491                         break;
2492                 case TypeLastMessage:
2493                         processEntry((LastMessage *)entry, machineSet);
2494                         break;
2495                 case TypeRejectedMessage:
2496                         processEntry((RejectedMessage *)entry, indexer);
2497                         break;
2498                 case TypeTableStatus:
2499                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2500                         break;
2501                 default:
2502                         //throw new Error("Unrecognized type: ");
2503                         myerror("Unrecognized type: \n");
2504                 }
2505         }
2506 }
2507
2508 /**
2509  * Update the last message that was sent for a machine Id
2510  */
2511 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2512         // Update what the last message received by a machine was
2513         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2514 }
2515
2516 /**
2517  * Add the new key to the arbitrators table and update the set of live
2518  * new keys (in case of a rescued new key message)
2519  */
2520 void Table::processEntry(NewKey *entry) {
2521         // Update the arbitrator table with the new key information
2522         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2523
2524         // Update what the latest live new key is
2525         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2526         if (oldNewKey != NULL) {
2527                 // Delete the old new key messages
2528                 oldNewKey->setDead();
2529         }
2530 }
2531
2532 /**
2533  * Process new table status entries and set dead the old ones as new
2534  * ones come in-> keeps track of the largest and smallest table status
2535  * seen in this current round of updating the local copy of the block
2536  * chain
2537  */
2538 void Table::processEntry(TableStatus *entry, int64_t seq) {
2539         int newNumSlots = entry->getMaxSlots();
2540         updateCurrMaxSize(newNumSlots);
2541         initExpectedSize(seq, newNumSlots);
2542
2543         if (liveTableStatus != NULL) {
2544                 // We have a larger table status so the old table status is no
2545                 // int64_ter alive
2546                 liveTableStatus->setDead();
2547         }
2548
2549         // Make this new table status the latest alive table status
2550         liveTableStatus = entry;
2551 }
2552
2553 /**
2554  * Check old messages to see if there is a block chain violation->
2555  * Also
2556  */
2557 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2558         int64_t oldSeqNum = entry->getOldSeqNum();
2559         int64_t newSeqNum = entry->getNewSeqNum();
2560         bool isequal = entry->getEqual();
2561         int64_t machineId = entry->getMachineID();
2562         int64_t seq = entry->getSequenceNumber();
2563
2564         // Check if we have messages that were supposed to be rejected in
2565         // our local block chain
2566         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2567                 // Get the slot
2568                 Slot *slot = indexer->getSlot(seqNum);
2569
2570                 if (slot != NULL) {
2571                         // If we have this slot make sure that it was not supposed to be
2572                         // a rejected slot
2573                         int64_t slotMachineId = slot->getMachineID();
2574                         if (isequal != (slotMachineId == machineId)) {
2575                                 //throw new Error("Server Error: Trying to insert rejected message for slot ");
2576                                 myerror("Server Error: Trying to insert rejected message for slot\n");
2577                         }
2578                 }
2579         }
2580
2581         // Create a list of clients to watch until they see this rejected
2582         // message entry->
2583         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2584         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2585         while (iter->hasNext()) {
2586                 // Machine ID for the last message entry
2587                 int64_t lastMessageEntryMachineId = iter->next();
2588
2589                 // We've seen it, don't need to continue to watch->  Our next
2590                 // message will implicitly acknowledge it->
2591                 if (lastMessageEntryMachineId == localMachineId) {
2592                         continue;
2593                 }
2594
2595                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2596                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2597
2598                 if (entrySequenceNumber < seq) {
2599                         // Add this rejected message to the set of messages that this
2600                         // machine ID did not see yet
2601                         addWatchMyVector(lastMessageEntryMachineId, entry);
2602                         // This client did not see this rejected message yet so add it
2603                         // to the watch set to monitor
2604                         deviceWatchSet->add(lastMessageEntryMachineId);
2605                 }
2606         }
2607         delete iter;
2608
2609         if (deviceWatchSet->isEmpty()) {
2610                 // This rejected message has been seen by all the clients so
2611                 entry->setDead();
2612                 delete deviceWatchSet;
2613         } else {
2614                 // We need to watch this rejected message
2615                 entry->setWatchSet(deviceWatchSet);
2616         }
2617 }
2618
2619 /**
2620  * Check if this abort is live, if not then save it so we can kill it
2621  * later-> update the last transaction number that was arbitrated on->
2622  */
2623 void Table::processEntry(Abort *entry) {
2624         if (entry->getTransactionSequenceNumber() != -1) {
2625                 // update the transaction status if it was sent to the server
2626                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2627                 if (status != NULL) {
2628                         status->setStatus(TransactionStatus_StatusAborted);
2629                 }
2630         }
2631
2632         // Abort has not been seen by the client it is for yet so we need to
2633         // keep track of it
2634
2635         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2636         if (previouslySeenAbort != NULL) {
2637                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2638         }
2639
2640         if (entry->getTransactionArbitrator() == localMachineId) {
2641                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2642         }
2643
2644         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2645                 // The machine already saw this so it is dead
2646                 entry->setDead();
2647                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2648                 liveAbortTable->remove(&abortid);
2649
2650                 if (entry->getTransactionArbitrator() == localMachineId) {
2651                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2652                 }
2653                 return;
2654         }
2655
2656         // Update the last arbitration data that we have seen so far
2657         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2658                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2659                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2660                         // Is larger
2661                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2662                 }
2663         } else {
2664                 // Never seen any data from this arbitrator so record the first one
2665                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2666         }
2667
2668         // Set dead a transaction if we can
2669         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2670
2671         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2672         if (transactionToSetDead != NULL) {
2673                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2674         }
2675
2676         // Update the last transaction sequence number that the arbitrator
2677         // arbitrated on
2678         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2679                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2680                 // Is a valid one
2681                 if (entry->getTransactionSequenceNumber() != -1) {
2682                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2683                 }
2684         }
2685 }
2686
2687 /**
2688  * Set dead the transaction part if that transaction is dead and keep
2689  * track of all new parts
2690  */
2691 void Table::processEntry(TransactionPart *entry) {
2692         // Check if we have already seen this transaction and set it dead OR
2693         // if it is not alive
2694         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2695                 // This transaction is dead, it was already committed or aborted
2696                 entry->setDead();
2697                 return;
2698         }
2699
2700         // This part is still alive
2701         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2702
2703         if (transactionPart == NULL) {
2704                 // Dont have a table for this machine Id yet so make one
2705                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2706                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2707         }
2708
2709         // Update the part and set dead ones we have already seen (got a
2710         // rescued version)
2711         entry->acquireRef();
2712         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2713         if (previouslySeenPart != NULL) {
2714                 previouslySeenPart->releaseRef();
2715                 previouslySeenPart->setDead();
2716         }
2717 }
2718
2719 /**
2720  * Process new commit entries and save them for future use->  Delete duplicates
2721  */
2722 void Table::processEntry(CommitPart *entry) {
2723         // Update the last transaction that was updated if we can
2724         if (entry->getTransactionSequenceNumber() != -1) {
2725                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2726                                 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2727                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2728                 }
2729         }
2730
2731         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2732         if (commitPart == NULL) {
2733                 // Don't have a table for this machine Id yet so make one
2734                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2735                 newCommitParts->put(entry->getMachineId(), commitPart);
2736         }
2737         // Update the part and set dead ones we have already seen (got a
2738         // rescued version)
2739         entry->acquireRef();
2740         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2741         if (previouslySeenPart != NULL) {
2742                 previouslySeenPart->setDead();
2743                 previouslySeenPart->releaseRef();
2744         }
2745 }
2746
2747 /**
2748  * Update the last message seen table-> Update and set dead the
2749  * appropriate RejectedMessages as clients see them-> Updates the live
2750  * aborts, removes those that are dead and sets them dead-> Check that
2751  * the last message seen is correct and that there is no mismatch of
2752  * our own last message or that other clients have not had a rollback
2753  * on the last message->
2754  */
2755 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2756         // We have seen this machine ID
2757         machineSet->remove(machineId);
2758
2759         // Get the set of rejected messages that this machine Id is has not seen yet
2760         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchMyVectorTable->get(machineId);
2761         // If there is a rejected message that this machine Id has not seen yet
2762         if (watchset != NULL) {
2763                 // Go through each rejected message that this machine Id has not
2764                 // seen yet
2765
2766                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2767                 while (rmit->hasNext()) {
2768                         RejectedMessage *rm = rmit->next();
2769                         // If this machine Id has seen this rejected message->->->
2770                         if (rm->getSequenceNumber() <= seqNum) {
2771                                 // Remove it from our watchlist
2772                                 rmit->remove();
2773                                 // Decrement machines that need to see this notification
2774                                 rm->removeWatcher(machineId);
2775                         }
2776                 }
2777                 delete rmit;
2778         }
2779
2780         // Set dead the abort
2781         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2782
2783         while (abortit->hasNext()) {
2784                 Pair<int64_t, int64_t> *key = abortit->next();
2785                 Abort *abort = liveAbortTable->get(key);
2786                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2787                         abort->setDead();
2788                         abortit->remove();
2789                         if (abort->getTransactionArbitrator() == localMachineId) {
2790                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2791                         }
2792                 }
2793         }
2794         delete abortit;
2795         if (machineId == localMachineId) {
2796                 // Our own messages are immediately dead->
2797                 char livenessType = liveness->getType();
2798                 if (livenessType == TypeLastMessage) {
2799                         ((LastMessage *)liveness)->setDead();
2800                 } else if (livenessType == TypeSlot) {
2801                         ((Slot *)liveness)->setDead();
2802                 } else {
2803                         //throw new Error("Unrecognized type");
2804                         myerror("Unrecognized type\n");
2805                 }
2806         }
2807         // Get the old last message for this device
2808         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2809         if (lastMessageEntry == NULL) {
2810                 // If no last message then there is nothing else to process
2811                 return;
2812         }
2813
2814         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2815         Liveness *lastEntry = lastMessageEntry->getSecond();
2816         delete lastMessageEntry;
2817
2818         // If it is not our machine Id since we already set ours to dead
2819         if (machineId != localMachineId) {
2820                 char lastEntryType = lastEntry->getType();
2821
2822                 if (lastEntryType == TypeLastMessage) {
2823                         ((LastMessage *)lastEntry)->setDead();
2824                 } else if (lastEntryType == TypeSlot) {
2825                         ((Slot *)lastEntry)->setDead();
2826                 } else {
2827                         //throw new Error("Unrecognized type");
2828                         myerror("Unrecognized type\n");
2829                 }
2830         }
2831         // Make sure the server is not playing any games
2832         if (machineId == localMachineId) {
2833                 if (hadPartialSendToServer) {
2834                         // We were not making any updates and we had a machine mismatch
2835                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2836                                 //throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2837                                 myerror("Server Error: Mismatch on local machine sequence number, needed at least: \n");
2838                         }
2839                 } else {
2840                         // We were not making any updates and we had a machine mismatch
2841                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2842                                 //throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2843                                 myerror("Server Error: Mismatch on local machine sequence number, needed: \n");
2844                         }
2845                 }
2846         } else {
2847                 if (lastMessageSeqNum > seqNum) {
2848                         //throw new Error("Server Error: Rollback on remote machine sequence number");
2849                         myerror("Server Error: Rollback on remote machine sequence number\n");
2850                 }
2851         }
2852 }
2853
2854 /**
2855  * Add a rejected message entry to the watch set to keep track of
2856  * which clients have seen that rejected message entry and which have
2857  * not.
2858  */
2859 void Table::addWatchMyVector(int64_t machineId, RejectedMessage *entry) {
2860         Hashset<RejectedMessage *> *entries = rejectedMessageWatchMyVectorTable->get(machineId);
2861         if (entries == NULL) {
2862                 // There is no set for this machine ID yet so create one
2863                 entries = new Hashset<RejectedMessage *>();
2864                 rejectedMessageWatchMyVectorTable->put(machineId, entries);
2865         }
2866         entries->add(entry);
2867 }
2868
2869 /**
2870  * Check if the HMAC chain is not violated
2871  */
2872 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2873         for (uint i = 0; i < newSlots->length(); i++) {
2874                 Slot *currSlot = newSlots->get(i);
2875                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2876                 if (prevSlot != NULL &&
2877                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2878                         //                      throw new Error("Server Error: Invalid HMAC Chain");
2879                         myerror("Server Error: Invalid HMAC Chain\n");
2880         }
2881 }