checkpoint
[iotcloud.git] / version2 / src / C / Table.cpp
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
/**
 * Three-way comparison of two int64_t values passed by address.  The
 * signature matches what qsort() expects of a comparator.
 *
 * @param a points to the left-hand int64_t
 * @param b points to the right-hand int64_t
 * @return -1 if *a < *b, 1 if *a > *b, 0 if they are equal
 */
int compareInt64(const void *a, const void *b) {
	const int64_t lhs = *(const int64_t *) a;
	const int64_t rhs = *(const int64_t *) b;
	// Each comparison yields 0 or 1, so the difference is exactly -1, 0 or 1
	// and cannot overflow the way subtracting the values themselves could.
	return (lhs > rhs) - (lhs < rhs);
}
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localSequenceNumber(0),
112         localTransactionSequenceNumber(0),
113         lastTransactionSequenceNumberSpeculatedOn(0),
114         oldestTransactionSequenceNumberSpeculatedOn(0),
115         localArbitrationSequenceNumber(0),
116         hadPartialSendToServer(false),
117         attemptedToSendToServer(false),
118         expectedsize(0),
119         didFindTableStatus(false),
120         currMaxSize(0),
121         lastSlotAttemptedToSend(NULL),
122         lastIsNewKey(false),
123         lastNewSize(0),
124         lastTransactionPartsSent(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 Table::~Table() {
160         delete cloud;
161         delete random;
162         delete buffer;
163         // init data structs
164         delete committedKeyValueTable;
165         delete speculatedKeyValueTable;
166         delete pendingTransactionSpeculatedKeyValueTable;
167         delete liveNewKeyTable;
168         {
169                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170                 while (lmit->hasNext()) {
171                         Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
172                         delete pair;
173                 }
174                 delete lmit;
175                 delete lastMessageTable;
176         }
177         if (pendingTransactionBuilder != NULL)
178                 delete pendingTransactionBuilder;
179         {
180                 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
181                 while(rmit->hasNext()) {
182                         int64_t machineid = rmit->next();
183                         Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
184                         SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185                         while (mit->hasNext()) {
186                                 RejectedMessage * rm = mit->next();
187                                 delete rm;
188                         }
189                         delete mit;
190                         delete rmset;
191                 }
192                 delete rmit;
193                 delete rejectedMessageWatchVectorTable;
194         }
195         delete arbitratorTable;
196         delete liveAbortTable;
197         {
198                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199                 while (partsit->hasNext()) {
200                         int64_t machineId = partsit->next();
201                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
202                         SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
203                         while(pit->hasNext()) {
204                                 Pair<int64_t, int32_t> * pair=pit->next();
205                                 pit->currVal()->releaseRef();
206                         }
207                         delete pit;
208                         
209                         delete parts;
210                 }
211                 delete partsit;
212                 delete newTransactionParts;
213         }
214         {
215                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
216                 while (partsit->hasNext()) {
217                         int64_t machineId = partsit->next();
218                         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
219                         SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
220                         while(pit->hasNext()) {
221                                 Pair<int64_t, int32_t> * pair=pit->next();
222                                 pit->currVal()->releaseRef();
223                         }
224                         delete pit;
225                         delete parts;
226                 }
227                 delete partsit;
228                 delete newCommitParts;
229         }
230         delete lastArbitratedTransactionNumberByArbitratorTable;
231         delete liveTransactionBySequenceNumberTable;
232         delete liveTransactionByTransactionIdTable;
233         {
234                 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
235                 while (liveit->hasNext()) {
236                         int64_t arbitratorId = liveit->next();
237                         
238                         // Get all the commits for a specific arbitrator
239                         Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
240                         {
241                                 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
242                                 while (clientit->hasNext()) {
243                                         int64_t id = clientit->next();
244                                         delete commitForClientTable->get(id);
245                                 }
246                                 delete clientit;
247                         }
248                         
249                         delete commitForClientTable;
250                 }
251                 delete liveit;
252                 delete liveCommitsTable;
253         }
254         delete liveCommitsByKeyTable;
255         delete lastCommitSeenSequenceNumberByArbitratorTable;
256         delete rejectedSlotVector;
257         {
258                 uint size = pendingTransactionQueue->size();
259                 for (uint iter = 0; iter < size; iter++) {
260                         delete pendingTransactionQueue->get(iter);
261                 }
262                 delete pendingTransactionQueue;
263         }
264         delete pendingSendArbitrationEntriesToDelete;
265         {
266                 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
267                 while (trit->hasNext()) {
268                         Transaction *transaction = trit->next();
269                         delete trit->currVal();
270                 }
271                 delete trit;
272                 delete transactionPartsSent;
273         }
274         delete outstandingTransactionStatus;
275         delete liveAbortsGeneratedByLocal;
276         delete offlineTransactionsCommittedAndAtServer;
277         delete localCommunicationTable;
278         delete lastTransactionSeenFromMachineFromServer;
279         {
280                 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
281                         delete pendingSendArbitrationRounds->get(i);
282                 }
283                 delete pendingSendArbitrationRounds;
284         }
285         if (lastTransactionPartsSent != NULL)
286                 delete lastTransactionPartsSent;
287         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
288         if (lastNewKey)
289                 delete lastNewKey;
290 }
291
292 /**
293  * Init all the stuff needed for for table usage
294  */
295 void Table::init() {
296         // Init helper objects
297         random = new SecureRandom();
298         buffer = new SlotBuffer();
299
300         // init data structs
301         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
302         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
303         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
304         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
305         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
306         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
307         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
308         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
309         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
310         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
311         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
312         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
313         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
314         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
315         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
316         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
317         rejectedSlotVector = new Vector<int64_t>();
318         pendingTransactionQueue = new Vector<Transaction *>();
319         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
320         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
321         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
322         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
323         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
324         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
325         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
326         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
327         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
328
329         // Other init stuff
330         numberOfSlots = buffer->capacity();
331         setResizeThreshold();
332 }
333
334 /**
335  * Initialize the table by inserting a table status as the first entry
336  * into the table status also initialize the crypto stuff.
337  */
338 void Table::initTable() {
339         cloud->initSecurity();
340
341         // Create the first insertion into the block chain which is the table status
342         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
343         localSequenceNumber++;
344         TableStatus *status = new TableStatus(s, numberOfSlots);
345         s->addShallowEntry(status);
346         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
347
348         if (array == NULL) {
349                 array = new Array<Slot *>(1);
350                 array->set(0, s);
351                 // update local block chain
352                 validateAndUpdate(array, true);
353                 delete array;
354         } else if (array->length() == 1) {
355                 // in case we did push the slot BUT we failed to init it
356                 validateAndUpdate(array, true);
357                 delete s;
358                 delete array;
359         } else {
360                 delete s;
361                 delete array;
362                 //throw new Error("Error on initialization");
363                 myerror("Error on initialization\n");
364         }
365 }
366
367 /**
368  * Rebuild the table from scratch by pulling the latest block chain
369  * from the server.
370  */
371 void Table::rebuild() {
372         // Just pull the latest slots from the server
373         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
374         validateAndUpdate(newslots, true);
375         delete newslots;
376         sendToServer(NULL);
377         updateLiveTransactionsAndStatus();
378 }
379
380 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
381         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
382 }
383
384 int64_t Table::getArbitrator(IoTString *key) {
385         return arbitratorTable->get(key);
386 }
387
388 void Table::close() {
389         cloud->closeCloud();
390 }
391
392 IoTString *Table::getCommitted(IoTString *key)  {
393         KeyValue *kv = committedKeyValueTable->get(key);
394
395         if (kv != NULL) {
396                 return new IoTString(kv->getValue());
397         } else {
398                 return NULL;
399         }
400 }
401
402 IoTString *Table::getSpeculative(IoTString *key) {
403         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
404
405         if (kv == NULL) {
406                 kv = speculatedKeyValueTable->get(key);
407         }
408
409         if (kv == NULL) {
410                 kv = committedKeyValueTable->get(key);
411         }
412
413         if (kv != NULL) {
414                 return new IoTString(kv->getValue());
415         } else {
416                 return NULL;
417         }
418 }
419
420 IoTString *Table::getCommittedAtomic(IoTString *key) {
421         KeyValue *kv = committedKeyValueTable->get(key);
422
423         if (!arbitratorTable->contains(key)) {
424                 //              throw new Error("Key not Found.");
425                 myerror("Key not found!\n");
426         }
427
428         // Make sure new key value pair matches the current arbitrator
429         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
430                 // TODO: Maybe not throw en error
431                 //throw new Error("Not all Key Values Match Arbitrator.");
432                 myerror("Not all key values match arbitrator\n");
433         }
434
435         if (kv != NULL) {
436                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
437                 return new IoTString(kv->getValue());
438         } else {
439                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
440                 return NULL;
441         }
442 }
443
444 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
445         if (!arbitratorTable->contains(key)) {
446                 //throw new Error("Key not Found.");
447                 myerror("Key not found\n");
448         }
449
450         // Make sure new key value pair matches the current arbitrator
451         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
452                 // TODO: Maybe not throw en error
453                 //throw new Error("Not all Key Values Match Arbitrator.");
454                 myerror("Not all Key Values Match Arbitrator.\n");
455         }
456
457         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
458
459         if (kv == NULL) {
460                 kv = speculatedKeyValueTable->get(key);
461         }
462
463         if (kv == NULL) {
464                 kv = committedKeyValueTable->get(key);
465         }
466
467         if (kv != NULL) {
468                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
469                 return new IoTString(kv->getValue());
470         } else {
471                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
472                 return NULL;
473         }
474 }
475
476 bool Table::update()  {
477         //try {
478                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
479                 validateAndUpdate(newSlots, false);
480                 delete newSlots;
481                 sendToServer(NULL);
482                 updateLiveTransactionsAndStatus();
483                 return true;
484                 /*      } catch (Exception *e) {
485                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
486                 while (kit->hasNext()) {
487                         int64_t m = kit->next();
488                         updateFromLocal(m);
489                 }
490                 delete kit;
491                 }*/
492
493         return false;
494 }
495
496 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
497         while (true) {
498                 if (arbitratorTable->contains(keyName)) {
499                         // There is already an arbitrator
500                         return false;
501                 }
502                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
503
504                 if (sendToServer(newKey)) {
505                         // If successfully inserted
506                         return true;
507                 }
508         }
509 }
510
511 void Table::startTransaction() {
512         // Create a new transaction, invalidates any old pending transactions.
513         if (pendingTransactionBuilder != NULL)
514                 delete pendingTransactionBuilder;
515         pendingTransactionBuilder = new PendingTransaction(localMachineId);
516 }
517
518 void Table::put(IoTString *key, IoTString *value) {
519         // Make sure it is a valid key
520         if (!arbitratorTable->contains(key)) {
521                 //throw new Error("Key not Found.");
522                 myerror("Key not Found.\n");
523         }
524
525         // Make sure new key value pair matches the current arbitrator
526         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
527                 // TODO: Maybe not throw en error
528                 //throw new Error("Not all Key Values Match Arbitrator.");
529                 myerror("Not all Key Values Match Arbitrator.\n");
530         }
531
532         // Add the key value to this transaction
533         KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
534         pendingTransactionBuilder->addKV(kv);
535 }
536
537 TransactionStatus *Table::commitTransaction() {
538         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
539                 // transaction with no updates will have no effect on the system
540                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
541         }
542
543         // Set the local transaction sequence number and increment
544         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
545         localTransactionSequenceNumber++;
546
547         // Create the transaction status
548         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
549
550         // Create the new transaction
551         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
552         newTransaction->setTransactionStatus(transactionStatus);
553
554         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
555                 // Add it to the queue and invalidate the builder for safety
556                 pendingTransactionQueue->add(newTransaction);
557         } else {
558                 arbitrateOnLocalTransaction(newTransaction);
559                 delete newTransaction;
560                 updateLiveStateFromLocal();
561         }
562         if (pendingTransactionBuilder != NULL)
563                 delete pendingTransactionBuilder;
564         
565         pendingTransactionBuilder = new PendingTransaction(localMachineId);
566
567         //      try {
568                 sendToServer(NULL);
569                 /*      } catch (ServerException *e) {
570
571                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
572                 uint size = pendingTransactionQueue->size();
573                 uint oldindex = 0;
574                 for (uint iter = 0; iter < size; iter++) {
575                         Transaction *transaction = pendingTransactionQueue->get(iter);
576                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
577
578                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
579                                 // Already contacted this client so ignore all attempts to contact this client
580                                 // to preserve ordering for arbitrator
581                                 continue;
582                         }
583
584                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
585
586                         if (sendReturn.getFirst()) {
587                                 // Failed to contact over local
588                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
589                         } else {
590                                 // Successful contact or should not contact
591
592                                 if (sendReturn.getSecond()) {
593                                         // did arbitrate
594                                         delete transaction;
595                                         oldindex--;
596                                 }
597                         }
598                 }
599                 pendingTransactionQueue->setSize(oldindex);
600                 }*/
601
602         updateLiveStateFromLocal();
603
604         return transactionStatus;
605 }
606
607 /**
608  * Recalculate the new resize threshold
609  */
610 void Table::setResizeThreshold() {
611         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
612         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
613 }
614
615 int64_t Table::getLocalSequenceNumber() {
616         return localSequenceNumber;
617 }
618
619 void Table::processTransactionList(bool handlePartial) {
620         SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *)getKeyIterator(lastTransactionPartsSent);
621         while (trit->hasNext()) {
622                 Transaction *transaction = trit->next();
623                 transaction->resetServerFailure();
624                 // Update which transactions parts still need to be sent
625                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
626                 // Add the transaction status to the outstanding list
627                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
628                 
629                 // Update the transaction status
630                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
631                 
632                 // Check if all the transaction parts were successfully
633                 // sent and if so then remove it from pending
634                 if (transaction->didSendAllParts()) {
635                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
636                         pendingTransactionQueue->remove(transaction);
637                         delete transaction;
638                 } else if (handlePartial) {
639                         transaction->resetServerFailure();
640                         // Set the transaction sequence number back to nothing
641                         if (!transaction->didSendAPartToServer()) {
642                                 transaction->setSequenceNumber(-1);
643                         }
644                 }
645         }
646         delete trit;
647 }
648
649 NewKey * Table::handlePartialSend(NewKey * newKey) {
650         //Didn't receive acknowledgement for last send
651         //See if the server has received a newer slot
652         
653         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
654         if (newSlots->length() == 0) {
655                 //Retry sending old slot
656                 bool wasInserted = false;
657                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
658                 
659                 if (sendSlotsReturn) {
660                         lastSlotAttemptedToSend = NULL;
661                         if (newKey != NULL) {
662                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
663                                         delete newKey;
664                                         newKey = NULL;
665                                 }
666                         }
667                         processTransactionList(false);
668                 } else {
669                         if (checkSend(newSlots, lastSlotAttemptedToSend)) {
670                                 if (newKey != NULL) {
671                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
672                                                 delete newKey;
673                                                 newKey = NULL;
674                                         }
675                                 }
676                                 processTransactionList(true);
677                         }
678                 }
679                 
680                 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *)getKeyIterator(lastTransactionPartsSent);
681                 while (trit->hasNext()) {
682                         Transaction *transaction = trit->next();
683                         transaction->resetServerFailure();
684                         // Set the transaction sequence number back to nothing
685                         if (!transaction->didSendAPartToServer()) {
686                                 transaction->setSequenceNumber(-1);
687                         }
688                 }
689                 delete trit;
690                 
691                 if (newSlots->length() != 0) {
692                         // insert into the local block chain
693                         validateAndUpdate(newSlots, true);
694                 }
695         } else {
696                 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
697                         if (newKey != NULL) {
698                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
699                                         delete newKey;
700                                         newKey = NULL;
701                                 }
702                         }
703
704                         processTransactionList(true);
705                 } else {
706                         SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(lastTransactionPartsSent);
707                         while (trit->hasNext()) {
708                                 Transaction *transaction = trit->next();
709                                 transaction->resetServerFailure();
710                                 // Set the transaction sequence number back to nothing
711                                 if (!transaction->didSendAPartToServer()) {
712                                         transaction->setSequenceNumber(-1);
713                                 }
714                         }
715                         delete trit;
716                 }
717                 
718                 // insert into the local block chain
719                 validateAndUpdate(newSlots, true);
720         }
721         delete newSlots;
722         return newKey;
723 }
724
725 void Table::clearSentParts() {
726         // Clear the sent data since we are trying again
727         pendingSendArbitrationEntriesToDelete->clear();
728         SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
729         while (trit->hasNext()) {
730                 Transaction *transaction = trit->next();
731                 delete trit->currVal();
732         }
733         delete trit;
734         transactionPartsSent->clear();
735 }
736
737 bool Table::sendToServer(NewKey *newKey) {
738         if (hadPartialSendToServer) {
739                 newKey = handlePartialSend(newKey);
740         }
741
742         //      try {
743                 // While we have stuff that needs inserting into the block chain
744                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
745                         if (hadPartialSendToServer) {
746                                 //                              throw new Error("Should Be error free");
747                                 myerror("Should Be error free\n");
748                         }
749                         
750                         // If there is a new key with same name then end
751                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
752                                 delete newKey;
753                                 return false;
754                         }
755
756                         // Create the slot
757                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
758                         localSequenceNumber++;
759
760                         // Try to fill the slot with data
761                         int newSize = 0;
762                         bool insertedNewKey = false;
763                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
764
765                         if (needsResize) {
766                                 // Reset which transaction to send
767                                 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
768                                 while (trit->hasNext()) {
769                                         Transaction *transaction = trit->next();
770                                         transaction->resetNextPartToSend();
771
772                                         // Set the transaction sequence number back to nothing
773                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
774                                                 transaction->setSequenceNumber(-1);
775                                         }
776                                 }
777                                 delete trit;
778
779                                 // Clear the sent data since we are trying again
780                                 clearSentParts();
781                                         
782                                 // We needed a resize so try again
783                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
784                         }
785                         if (lastSlotAttemptedToSend != NULL)
786                                 delete lastSlotAttemptedToSend;
787                         
788                         lastSlotAttemptedToSend = slot;
789                         lastIsNewKey = (newKey != NULL);
790                         lastInsertedNewKey = insertedNewKey;
791                         lastNewSize = newSize;
792                         if (( newKey != lastNewKey) && (lastNewKey != NULL))
793                                 delete lastNewKey;
794                         lastNewKey = newKey;
795                         if (lastTransactionPartsSent != NULL)
796                                 delete lastTransactionPartsSent;
797                         lastTransactionPartsSent = transactionPartsSent->clone();
798
799                         Array<Slot *> * newSlots = NULL;
800                         bool wasInserted = false;
801                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
802
803                         if (sendSlotsReturn) {
804                                 lastSlotAttemptedToSend = NULL;
805                                 // Did insert into the block chain
806                                 if (insertedNewKey) {
807                                         // This slot was what was inserted not a previous slot
808                                         // New Key was successfully inserted into the block chain so dont want to insert it again
809                                         newKey = NULL;
810                                 }
811
812                                 // Remove the aborts and commit parts that were sent from the pending to send queue
813                                 uint size = pendingSendArbitrationRounds->size();
814                                 uint oldcount = 0;
815                                 for (uint i = 0; i < size; i++) {
816                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
817                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
818
819                                         if (!round->isDoneSending()) {
820                                                 //Add part back in
821                                                 pendingSendArbitrationRounds->set(oldcount++,
822                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
823                                         } else
824                                                 delete pendingSendArbitrationRounds->get(i);
825                                 }
826                                 pendingSendArbitrationRounds->setSize(oldcount);
827                                 processTransactionList(false);
828                         } else {
829                                 // Reset which transaction to send
830                                 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
831                                 while (trit->hasNext()) {
832                                         Transaction *transaction = trit->next();
833                                         transaction->resetNextPartToSend();
834
835                                         // Set the transaction sequence number back to nothing
836                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
837                                                 transaction->setSequenceNumber(-1);
838                                         }
839                                 }
840                                 delete trit;
841                         }
842
843                         // Clear the sent data in preparation for next send
844                         clearSentParts();
845
846                         if (newSlots->length() != 0) {
847                                 // insert into the local block chain
848                                 validateAndUpdate(newSlots, true);
849                         }
850                         delete newSlots;
851                 }
852                 /*      } catch (ServerException *e) {
853                 if (e->getType() != ServerException_TypeInputTimeout) {
854                         // Nothing was able to be sent to the server so just clear these data structures
855                         SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
856                         while (trit->hasNext()) {
857                                 Transaction *transaction = trit->next();
858                                 transaction->resetNextPartToSend();
859
860                                 // Set the transaction sequence number back to nothing
861                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
862                                         transaction->setSequenceNumber(-1);
863                                 }
864                         }
865                         delete trit;
866                 } else {
867                         // There was a partial send to the server
868                         hadPartialSendToServer = true;
869
870                         // Nothing was able to be sent to the server so just clear these data structures
871                         SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
872                         while (trit->hasNext()) {
873                                 Transaction *transaction = trit->next();
874                                 transaction->resetNextPartToSend();
875                                 transaction->setServerFailure();
876                         }
877                         delete trit;
878                 }
879
880                 clearSentParts();
881
882                 throw e;
883                 }*/
884
885         return newKey == NULL;
886 }
887
888 bool Table::updateFromLocal(int64_t machineId) {
889         if (!localCommunicationTable->contains(machineId))
890                 return false;
891
892         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
893
894         // Get the size of the send data
895         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
896
897         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
898         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
899                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
900         }
901
902         Array<char> *sendData = new Array<char>(sendDataSize);
903         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
904
905         // Encode the data
906         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
907         bbEncode->putInt(0);
908
909         // Send by local
910         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
911         localSequenceNumber++;
912
913         if (returnData == NULL) {
914                 // Could not contact server
915                 return false;
916         }
917
918         // Decode the data
919         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
920         int numberOfEntries = bbDecode->getInt();
921
922         for (int i = 0; i < numberOfEntries; i++) {
923                 char type = bbDecode->get();
924                 if (type == TypeAbort) {
925                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
926                         processEntry(abort);
927                 } else if (type == TypeCommitPart) {
928                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
929                         processEntry(commitPart);
930                 }
931         }
932
933         updateLiveStateFromLocal();
934
935         return true;
936 }
937
938 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
939
940         // Get the devices local communications
941         if (!localCommunicationTable->contains(transaction->getArbitrator()))
942                 return Pair<bool, bool>(true, false);
943
944         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
945
946         // Get the size of the send data
947         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
948         {
949                 Vector<TransactionPart *> *tParts = transaction->getParts();
950                 uint tPartsSize = tParts->size();
951                 for (uint i = 0; i < tPartsSize; i++) {
952                         TransactionPart *part = tParts->get(i);
953                         sendDataSize += part->getSize();
954                 }
955         }
956
957         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
958         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
959                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
960         }
961
962         // Make the send data size
963         Array<char> *sendData = new Array<char>(sendDataSize);
964         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
965
966         // Encode the data
967         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
968         bbEncode->putInt(transaction->getParts()->size());
969         {
970                 Vector<TransactionPart *> *tParts = transaction->getParts();
971                 uint tPartsSize = tParts->size();
972                 for (uint i = 0; i < tPartsSize; i++) {
973                         TransactionPart *part = tParts->get(i);
974                         part->encode(bbEncode);
975                 }
976         }
977
978         // Send by local
979         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
980         localSequenceNumber++;
981
982         if (returnData == NULL) {
983                 // Could not contact server
984                 return Pair<bool, bool>(true, false);
985         }
986
987         // Decode the data
988         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
989         bool didCommit = bbDecode->get() == 1;
990         bool couldArbitrate = bbDecode->get() == 1;
991         int numberOfEntries = bbDecode->getInt();
992         bool foundAbort = false;
993
994         for (int i = 0; i < numberOfEntries; i++) {
995                 char type = bbDecode->get();
996                 if (type == TypeAbort) {
997                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
998
999                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
1000                                 foundAbort = true;
1001                         }
1002
1003                         processEntry(abort);
1004                 } else if (type == TypeCommitPart) {
1005                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
1006                         processEntry(commitPart);
1007                 }
1008         }
1009
1010         updateLiveStateFromLocal();
1011
1012         if (couldArbitrate) {
1013                 TransactionStatus *status =  transaction->getTransactionStatus();
1014                 if (didCommit) {
1015                         status->setStatus(TransactionStatus_StatusCommitted);
1016                 } else {
1017                         status->setStatus(TransactionStatus_StatusAborted);
1018                 }
1019         } else {
1020                 TransactionStatus *status =  transaction->getTransactionStatus();
1021                 if (foundAbort) {
1022                         status->setStatus(TransactionStatus_StatusAborted);
1023                 } else {
1024                         status->setStatus(TransactionStatus_StatusCommitted);
1025                 }
1026         }
1027
1028         return Pair<bool, bool>(false, true);
1029 }
1030
1031 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1032         // Decode the data
1033         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1034         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1035         int numberOfParts = bbDecode->getInt();
1036
1037         // If we did commit a transaction or not
1038         bool didCommit = false;
1039         bool couldArbitrate = false;
1040
1041         if (numberOfParts != 0) {
1042
1043                 // decode the transaction
1044                 Transaction *transaction = new Transaction();
1045                 for (int i = 0; i < numberOfParts; i++) {
1046                         bbDecode->get();
1047                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1048                         transaction->addPartDecode(newPart);
1049                 }
1050
1051                 // Arbitrate on transaction and pull relevant return data
1052                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1053                 couldArbitrate = localArbitrateReturn.getFirst();
1054                 didCommit = localArbitrateReturn.getSecond();
1055
1056                 updateLiveStateFromLocal();
1057
1058                 // Transaction was sent to the server so keep track of it to prevent double commit
1059                 if (transaction->getSequenceNumber() != -1) {
1060                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1061                 }
1062         }
1063
1064         // The data to send back
1065         int returnDataSize = 0;
1066         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1067
1068         // Get the aborts to send back
1069         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1070         {
1071                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1072                 while (abortit->hasNext())
1073                         abortLocalSequenceNumbers->add(abortit->next());
1074                 delete abortit;
1075         }
1076
1077         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1078
1079         uint asize = abortLocalSequenceNumbers->size();
1080         for (uint i = 0; i < asize; i++) {
1081                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1082                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1083                         continue;
1084                 }
1085
1086                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1087                 unseenArbitrations->add(abort);
1088                 returnDataSize += abort->getSize();
1089         }
1090
1091         // Get the commits to send back
1092         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1093         if (commitForClientTable != NULL) {
1094                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1095                 {
1096                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1097                         while (commitit->hasNext())
1098                                 commitLocalSequenceNumbers->add(commitit->next());
1099                         delete commitit;
1100                 }
1101                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1102
1103                 uint clsSize = commitLocalSequenceNumbers->size();
1104                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1105                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1106                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1107
1108                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1109                                 continue;
1110                         }
1111
1112                         {
1113                                 Vector<CommitPart *> *parts = commit->getParts();
1114                                 uint nParts = parts->size();
1115                                 for (uint i = 0; i < nParts; i++) {
1116                                         CommitPart *commitPart = parts->get(i);
1117                                         unseenArbitrations->add(commitPart);
1118                                         returnDataSize += commitPart->getSize();
1119                                 }
1120                         }
1121                 }
1122         }
1123
1124         // Number of arbitration entries to decode
1125         returnDataSize += 2 * sizeof(int32_t);
1126
1127         // bool of did commit or not
1128         if (numberOfParts != 0) {
1129                 returnDataSize += sizeof(char);
1130         }
1131
1132         // Data to send Back
1133         Array<char> *returnData = new Array<char>(returnDataSize);
1134         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1135
1136         if (numberOfParts != 0) {
1137                 if (didCommit) {
1138                         bbEncode->put((char)1);
1139                 } else {
1140                         bbEncode->put((char)0);
1141                 }
1142                 if (couldArbitrate) {
1143                         bbEncode->put((char)1);
1144                 } else {
1145                         bbEncode->put((char)0);
1146                 }
1147         }
1148
1149         bbEncode->putInt(unseenArbitrations->size());
1150         uint size = unseenArbitrations->size();
1151         for (uint i = 0; i < size; i++) {
1152                 Entry *entry = unseenArbitrations->get(i);
1153                 entry->encode(bbEncode);
1154         }
1155
1156         localSequenceNumber++;
1157         return returnData;
1158 }
1159
1160 /** Checks whether a given slot was sent using new slots in
1161                 array. Returns true if sent and false otherwise.  */
1162
1163 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1164         uint size = array->length();
1165         for (uint i = 0; i < size; i++) {
1166                 Slot *s = array->get(i);
1167                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1168                         return true;
1169                 }
1170         }
1171         
1172         //Also need to see if other machines acknowledged our message
1173         for (uint i = 0; i < size; i++) {
1174                 Slot *s = array->get(i);
1175                 
1176                 // Process each entry in the slot
1177                 Vector<Entry *> *entries = s->getEntries();
1178                 uint eSize = entries->size();
1179                 for (uint ei = 0; ei < eSize; ei++) {
1180                         Entry *entry = entries->get(ei);
1181                         
1182                         if (entry->getType() == TypeLastMessage) {
1183                                 LastMessage *lastMessage = (LastMessage *)entry;
1184                                 
1185                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1186                                         return true;
1187                                 }
1188                         }
1189                 }
1190         }
1191         //Not found
1192         return false;
1193 }
1194
1195 /** Method tries to send slot to server.  Returns status in tuple.
1196                 isInserted returns whether last un-acked send (if any) was
1197                 successful.  Returns whether send was confirmed.x
1198  */
1199
1200 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1201         attemptedToSendToServer = true;
1202
1203         *array = cloud->putSlot(slot, newSize);
1204         if (*array == NULL) {
1205                 *array = new Array<Slot *>(1);
1206                 (*array)->set(0, slot);
1207                 rejectedSlotVector->clear();
1208                 *isInserted = false;
1209                 return true;
1210         } else {
1211                 if ((*array)->length() == 0) {
1212                         //                      throw new Error("Server Error: Did not send any slots");
1213                         myerror("Server Error: Did not send any slots\n");
1214                 }
1215
1216                 if (hadPartialSendToServer) {
1217                         *isInserted = checkSend(*array, slot);
1218
1219                         if (!(*isInserted)) {
1220                                 rejectedSlotVector->add(slot->getSequenceNumber());
1221                         }
1222                         
1223                         return false;
1224                 } else {
1225                         rejectedSlotVector->add(slot->getSequenceNumber());
1226                         *isInserted = false;
1227                         return false;
1228                 }
1229         }
1230 }
1231
1232 /**
1233  * Returns true if a resize was needed but not done.
1234  */
1235 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1236         newSize = 0;//special value to indicate no resize
1237         if (liveSlotCount > bufferResizeThreshold) {
1238                 resize = true;//Resize is forced
1239         }
1240
1241         if (resize) {
1242                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1243                 TableStatus *status = new TableStatus(slot, newSize);
1244                 slot->addShallowEntry(status);
1245         }
1246
1247         // Fill with rejected slots first before doing anything else
1248         doRejectedMessages(slot);
1249
1250         // Do mandatory rescue of entries
1251         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1252
1253         // Extract working variables
1254         bool needsResize = mandatoryRescueReturn.getFirst();
1255         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1256         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1257
1258         if (needsResize && !resize) {
1259                 // We need to resize but we are not resizing so return true to force on retry
1260                 return true;
1261         }
1262
1263         insertedKey = false;
1264         if (newKeyEntry != NULL) {
1265                 newKeyEntry->setSlot(slot);
1266                 if (slot->hasSpace(newKeyEntry)) {
1267                         slot->addEntry(newKeyEntry);
1268                         insertedKey = true;
1269                 }
1270         }
1271
1272         // Clear the transactions, aborts and commits that were sent previously
1273         clearSentParts();
1274         uint size = pendingSendArbitrationRounds->size();
1275         for (uint i = 0; i < size; i++) {
1276                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1277                 bool isFull = false;
1278                 round->generateParts();
1279                 Vector<Entry *> *parts = round->getParts();
1280
1281                 // Insert pending arbitration data
1282                 uint vsize = parts->size();
1283                 for (uint vi = 0; vi < vsize; vi++) {
1284                         Entry *arbitrationData = parts->get(vi);
1285
1286                         // If it is an abort then we need to set some information
1287                         if (arbitrationData->getType() == TypeAbort) {
1288                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1289                         }
1290
1291                         if (!slot->hasSpace(arbitrationData)) {
1292                                 // No space so cant do anything else with these data entries
1293                                 isFull = true;
1294                                 break;
1295                         }
1296
1297                         // Add to this current slot and add it to entries to delete
1298                         slot->addEntry(arbitrationData);
1299                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1300                 }
1301
1302                 if (isFull) {
1303                         break;
1304                 }
1305         }
1306
1307         if (pendingTransactionQueue->size() > 0) {
1308                 Transaction *transaction = pendingTransactionQueue->get(0);
1309                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1310                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1311                         transaction->setSequenceNumber(slot->getSequenceNumber());
1312                 }
1313
1314                 while (true) {
1315                         TransactionPart *part = transaction->getNextPartToSend();
1316                         if (part == NULL) {
1317                                 // Ran out of parts to send for this transaction so move on
1318                                 break;
1319                         }
1320
1321                         if (slot->hasSpace(part)) {
1322                                 slot->addEntry(part);
1323                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1324                                 if (partsSent == NULL) {
1325                                         partsSent = new Vector<int32_t>();
1326                                         transactionPartsSent->put(transaction, partsSent);
1327                                 }
1328                                 partsSent->add(part->getPartNumber());
1329                                 transactionPartsSent->put(transaction, partsSent);
1330                         } else {
1331                                 break;
1332                         }
1333                 }
1334         }
1335
1336         // Fill the remainder of the slot with rescue data
1337         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1338
1339         return false;
1340 }
1341
1342 void Table::doRejectedMessages(Slot *s) {
1343         if (!rejectedSlotVector->isEmpty()) {
1344                 /* TODO: We should avoid generating a rejected message entry if
1345                  * there is already a sufficient entry in the queue (e->g->,
1346                  * equalsto value of true and same sequence number)->  */
1347
1348                 int64_t old_seqn = rejectedSlotVector->get(0);
1349                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1350                         int64_t new_seqn = rejectedSlotVector->lastElement();
1351                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1352                         s->addShallowEntry(rm);
1353                 } else {
1354                         int64_t prev_seqn = -1;
1355                         uint i = 0;
1356                         /* Go through list of missing messages */
1357                         for (; i < rejectedSlotVector->size(); i++) {
1358                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1359                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1360                                 if (s_msg != NULL)
1361                                         break;
1362                                 prev_seqn = curr_seqn;
1363                         }
1364                         /* Generate rejected message entry for missing messages */
1365                         if (prev_seqn != -1) {
1366                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1367                                 s->addShallowEntry(rm);
1368                         }
1369                         /* Generate rejected message entries for present messages */
1370                         for (; i < rejectedSlotVector->size(); i++) {
1371                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1372                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1373                                 int64_t machineid = s_msg->getMachineID();
1374                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1375                                 s->addShallowEntry(rm);
1376                         }
1377                 }
1378         }
1379 }
1380
1381 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1382         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1383         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1384         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1385                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1386         }
1387
1388         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1389         bool seenLiveSlot = false;
1390         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1391         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1392
1393
1394         // Mandatory Rescue
1395         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1396                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1397                 // Push slot number forward
1398                 if (!seenLiveSlot) {
1399                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1400                 }
1401
1402                 if (!previousSlot->isLive()) {
1403                         continue;
1404                 }
1405
1406                 // We have seen a live slot
1407                 seenLiveSlot = true;
1408
1409                 // Get all the live entries for a slot
1410                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1411
1412                 // Iterate over all the live entries and try to rescue them
1413                 uint lESize = liveEntries->size();
1414                 for (uint i = 0; i < lESize; i++) {
1415                         Entry *liveEntry = liveEntries->get(i);
1416                         if (slot->hasSpace(liveEntry)) {
1417                                 // Enough space to rescue the entry
1418                                 slot->addEntry(liveEntry);
1419                         } else if (currentSequenceNumber == firstIfFull) {
1420                                 //if there's no space but the entry is about to fall off the queue
1421                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1422                         }
1423                 }
1424         }
1425
1426         // Did not resize
1427         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1428 }
1429
1430 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1431         /* now go through live entries from least to greatest sequence number until
1432          * either all live slots added, or the slot doesn't have enough room
1433          * for SKIP_THRESHOLD consecutive entries*/
1434         int skipcount = 0;
1435         int64_t newestseqnum = buffer->getNewestSeqNum();
1436         for (; seqn <= newestseqnum; seqn++) {
1437                 Slot *prevslot = buffer->getSlot(seqn);
1438                 //Push slot number forward
1439                 if (!seenliveslot)
1440                         oldestLiveSlotSequenceNumver = seqn;
1441
1442                 if (!prevslot->isLive())
1443                         continue;
1444                 seenliveslot = true;
1445                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1446                 uint lESize = liveentries->size();
1447                 for (uint i = 0; i < lESize; i++) {
1448                         Entry *liveentry = liveentries->get(i);
1449                         if (s->hasSpace(liveentry))
1450                                 s->addEntry(liveentry);
1451                         else {
1452                                 skipcount++;
1453                                 if (skipcount > Table_SKIP_THRESHOLD) {
1454                                         delete liveentries;
1455                                         goto donesearch;
1456                                 }
1457                         }
1458                 }
1459                 delete liveentries;
1460         }
1461 donesearch:
1462         ;
1463 }
1464
1465 /**
1466  * Checks for malicious activity and updates the local copy of the block chain->
1467  */
1468 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1469         // The cloud communication layer has checked slot HMACs already
1470         // before decoding
1471         if (newSlots->length() == 0) {
1472                 return;
1473         }
1474
1475         // Make sure all slots are newer than the last largest slot this
1476         // client has seen
1477         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1478         if (firstSeqNum <= sequenceNumber) {
1479                 //              throw new Error("Server Error: Sent older slots!");
1480                 myerror("Server Error: Sent older slots!\n");
1481         }
1482
1483         // Create an object that can access both new slots and slots in our
1484         // local chain without committing slots to our local chain
1485         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1486
1487         // Check that the HMAC chain is not broken
1488         checkHMACChain(indexer, newSlots);
1489
1490         // Set to keep track of messages from clients
1491         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1492         {
1493                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1494                 while (lmit->hasNext())
1495                         machineSet->add(lmit->next());
1496                 delete lmit;
1497         }
1498
1499         // Process each slots data
1500         {
1501                 uint numSlots = newSlots->length();
1502                 for (uint i = 0; i < numSlots; i++) {
1503                         Slot *slot = newSlots->get(i);
1504                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1505                         updateExpectedSize();
1506                 }
1507         }
1508         delete indexer;
1509         
1510         // If there is a gap, check to see if the server sent us
1511         // everything->
1512         if (firstSeqNum != (sequenceNumber + 1)) {
1513
1514                 // Check the size of the slots that were sent down by the server->
1515                 // Can only check the size if there was a gap
1516                 checkNumSlots(newSlots->length());
1517
1518                 // Since there was a gap every machine must have pushed a slot or
1519                 // must have a last message message-> If not then the server is
1520                 // hiding slots
1521                 if (!machineSet->isEmpty()) {
1522                         delete machineSet;
1523                         //throw new Error("Missing record for machines: ");
1524                         myerror("Missing record for machines: \n");
1525                 }
1526         }
1527         delete machineSet;
1528         // Update the size of our local block chain->
1529         commitNewMaxSize();
1530
1531         // Commit new to slots to the local block chain->
1532         {
1533                 uint numSlots = newSlots->length();
1534                 for (uint i = 0; i < numSlots; i++) {
1535                         Slot *slot = newSlots->get(i);
1536
1537                         // Insert this slot into our local block chain copy->
1538                         buffer->putSlot(slot);
1539
1540                         // Keep track of how many slots are currently live (have live data
1541                         // in them)->
1542                         liveSlotCount++;
1543                 }
1544         }
1545         // Get the sequence number of the latest slot in the system
1546         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1547         updateLiveStateFromServer();
1548
1549         // No Need to remember after we pulled from the server
1550         offlineTransactionsCommittedAndAtServer->clear();
1551
1552         // This is invalidated now
1553         hadPartialSendToServer = false;
1554 }
1555
1556 void Table::updateLiveStateFromServer() {
1557         // Process the new transaction parts
1558         processNewTransactionParts();
1559
1560         // Do arbitration on new transactions that were received
1561         arbitrateFromServer();
1562
1563         // Update all the committed keys
1564         bool didCommitOrSpeculate = updateCommittedTable();
1565
1566         // Delete the transactions that are now dead
1567         updateLiveTransactionsAndStatus();
1568
1569         // Do speculations
1570         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1571         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1572 }
1573
1574 void Table::updateLiveStateFromLocal() {
1575         // Update all the committed keys
1576         bool didCommitOrSpeculate = updateCommittedTable();
1577
1578         // Delete the transactions that are now dead
1579         updateLiveTransactionsAndStatus();
1580
1581         // Do speculations
1582         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1583         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1584 }
1585
1586 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1587         int64_t prevslots = firstSequenceNumber;
1588
1589         if (didFindTableStatus) {
1590         } else {
1591                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1592         }
1593
1594         didFindTableStatus = true;
1595         currMaxSize = numberOfSlots;
1596 }
1597
1598 void Table::updateExpectedSize() {
1599         expectedsize++;
1600
1601         if (expectedsize > currMaxSize) {
1602                 expectedsize = currMaxSize;
1603         }
1604 }
1605
1606
1607 /**
1608  * Check the size of the block chain to make sure there are enough
1609  * slots sent back by the server-> This is only called when we have a
1610  * gap between the slots that we have locally and the slots sent by
1611  * the server therefore in the slots sent by the server there will be
1612  * at least 1 Table status message
1613  */
1614 void Table::checkNumSlots(int numberOfSlots) {
1615         if (numberOfSlots != expectedsize) {
1616                 //throw new Error("Server Error: Server did not send all slots->  Expected: ");
1617                 myerror("Server Error: Server did not send all slots->  Expected: \n");
1618         }
1619 }
1620
1621 /**
1622  * Update the size of of the local buffer if it is needed->
1623  */
1624 void Table::commitNewMaxSize() {
1625         didFindTableStatus = false;
1626
1627         // Resize the local slot buffer
1628         if (numberOfSlots != currMaxSize) {
1629                 buffer->resize((int32_t)currMaxSize);
1630         }
1631
1632         // Change the number of local slots to the new size
1633         numberOfSlots = (int32_t)currMaxSize;
1634
1635         // Recalculate the resize threshold since the size of the local
1636         // buffer has changed
1637         setResizeThreshold();
1638 }
1639
1640 /**
1641  * Process the new transaction parts from this latest round of slots
1642  * received from the server
1643  */
1644 void Table::processNewTransactionParts() {
1645
1646         if (newTransactionParts->size() == 0) {
1647                 // Nothing new to process
1648                 return;
1649         }
1650
1651         // Iterate through all the machine Ids that we received new parts
1652         // for
1653         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1654         while (tpit->hasNext()) {
1655                 int64_t machineId = tpit->next();
1656                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
1657
1658                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1659                 // Iterate through all the parts for that machine Id
1660                 while (ptit->hasNext()) {
1661                         Pair<int64_t, int32_t> *partId = ptit->next();
1662                         TransactionPart *part = parts->get(partId);
1663
1664                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1665                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1666                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1667                                         // Set dead the transaction part
1668                                         part->setDead();
1669                                         part->releaseRef();
1670                                         continue;
1671                                 }
1672                         }
1673
1674                         // Get the transaction object for that sequence number
1675                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1676
1677                         if (transaction == NULL) {
1678                                 // This is a new transaction that we dont have so make a new one
1679                                 transaction = new Transaction();
1680                                 
1681                                 // Add that part to the transaction
1682                                 transaction->addPartDecode(part);
1683
1684                                 // Insert this new transaction into the live tables
1685                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1686                                 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1687                         }
1688                         part->releaseRef();
1689                 }
1690                 delete ptit;
1691         }
1692         delete tpit;
1693         // Clear all the new transaction parts in preparation for the next
1694         // time the server sends slots
1695         {
1696                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1697                 while (partsit->hasNext()) {
1698                         int64_t machineId = partsit->next();
1699                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1700                         delete parts;
1701                 }
1702                 delete partsit;
1703                 newTransactionParts->clear();
1704         }
1705 }
1706
1707 void Table::arbitrateFromServer() {
1708         if (liveTransactionBySequenceNumberTable->size() == 0) {
1709                 // Nothing to arbitrate on so move on
1710                 return;
1711         }
1712
1713         // Get the transaction sequence numbers and sort from oldest to newest
1714         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1715         {
1716                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1717                 while (trit->hasNext())
1718                         transactionSequenceNumbers->add(trit->next());
1719                 delete trit;
1720         }
1721         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1722
1723         // Collection of key value pairs that are
1724         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1725
1726         // The last transaction arbitrated on
1727         int64_t lastTransactionCommitted = -1;
1728         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1729         uint tsnSize = transactionSequenceNumbers->size();
1730         for (uint i = 0; i < tsnSize; i++) {
1731                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1732                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1733
1734                 // Check if this machine arbitrates for this transaction if not
1735                 // then we cant arbitrate this transaction
1736                 if (transaction->getArbitrator() != localMachineId) {
1737                         continue;
1738                 }
1739
1740                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1741                         continue;
1742                 }
1743
1744                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1745                         // We have seen this already locally so dont commit again
1746                         continue;
1747                 }
1748
1749                 if (!transaction->isComplete()) {
1750                         // Will arbitrate in incorrect order if we continue so just break
1751                         // Most likely this
1752                         break;
1753                 }
1754
1755                 // update the largest transaction seen by arbitrator from server
1756                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1757                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1758                 } else {
1759                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1760                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1761                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1762                         }
1763                 }
1764
1765                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1766                         // Guard evaluated as true
1767                         // Update the local changes so we can make the commit
1768                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1769                         while (kvit->hasNext()) {
1770                                 KeyValue *kv = kvit->next();
1771                                 speculativeTableTmp->put(kv->getKey(), kv);
1772                         }
1773                         delete kvit;
1774
1775                         // Update what the last transaction committed was for use in batch commit
1776                         lastTransactionCommitted = transactionSequenceNumber;
1777                 } else {
1778                         // Guard evaluated was false so create abort
1779                         // Create the abort
1780                         Abort *newAbort = new Abort(NULL,
1781                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1782                                                                                                                                         transaction->getSequenceNumber(),
1783                                                                                                                                         transaction->getMachineId(),
1784                                                                                                                                         transaction->getArbitrator(),
1785                                                                                                                                         localArbitrationSequenceNumber);
1786                         localArbitrationSequenceNumber++;
1787                         generatedAborts->add(newAbort);
1788
1789                         // Insert the abort so we can process
1790                         processEntry(newAbort);
1791                 }
1792
1793                 lastSeqNumArbOn = transactionSequenceNumber;
1794         }
1795
1796         delete transactionSequenceNumbers;
1797
1798         Commit *newCommit = NULL;
1799
1800         // If there is something to commit
1801         if (speculativeTableTmp->size() != 0) {
1802                 // Create the commit and increment the commit sequence number
1803                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1804                 localArbitrationSequenceNumber++;
1805
1806                 // Add all the new keys to the commit
1807                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1808                 while (spit->hasNext()) {
1809                         IoTString *string = spit->next();
1810                         KeyValue *kv = speculativeTableTmp->get(string);
1811                         newCommit->addKV(kv);
1812                 }
1813                 delete spit;
1814                 
1815                 // create the commit parts
1816                 newCommit->createCommitParts();
1817
1818                 // Append all the commit parts to the end of the pending queue
1819                 // waiting for sending to the server
1820                 // Insert the commit so we can process it
1821                 Vector<CommitPart *> *parts = newCommit->getParts();
1822                 uint partsSize = parts->size();
1823                 for (uint i = 0; i < partsSize; i++) {
1824                         CommitPart *commitPart = parts->get(i);
1825                         processEntry(commitPart);
1826                 }
1827         }
1828         delete speculativeTableTmp;
1829
1830         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1831                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1832                 pendingSendArbitrationRounds->add(arbitrationRound);
1833
1834                 if (compactArbitrationData()) {
1835                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1836                         if (newArbitrationRound->getCommit() != NULL) {
1837                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1838                                 uint partsSize = parts->size();
1839                                 for (uint i = 0; i < partsSize; i++) {
1840                                         CommitPart *commitPart = parts->get(i);
1841                                         processEntry(commitPart);
1842                                 }
1843                         }
1844                 }
1845         } else {
1846                 delete generatedAborts;
1847         }
1848 }
1849
1850 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1851
1852         // Check if this machine arbitrates for this transaction if not then
1853         // we cant arbitrate this transaction
1854         if (transaction->getArbitrator() != localMachineId) {
1855                 return Pair<bool, bool>(false, false);
1856         }
1857
1858         if (!transaction->isComplete()) {
1859                 // Will arbitrate in incorrect order if we continue so just break
1860                 // Most likely this
1861                 return Pair<bool, bool>(false, false);
1862         }
1863
1864         if (transaction->getMachineId() != localMachineId) {
1865                 // dont do this check for local transactions
1866                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1867                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1868                                 // We've have already seen this from the server
1869                                 return Pair<bool, bool>(false, false);
1870                         }
1871                 }
1872         }
1873
1874         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1875                 // Guard evaluated as true Create the commit and increment the
1876                 // commit sequence number
1877                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1878                 localArbitrationSequenceNumber++;
1879
1880                 // Update the local changes so we can make the commit
1881                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1882                 while (kvit->hasNext()) {
1883                         KeyValue *kv = kvit->next();
1884                         newCommit->addKV(kv);
1885                 }
1886                 delete kvit;
1887
1888                 // create the commit parts
1889                 newCommit->createCommitParts();
1890
1891                 // Append all the commit parts to the end of the pending queue
1892                 // waiting for sending to the server
1893                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1894                 pendingSendArbitrationRounds->add(arbitrationRound);
1895
1896                 if (compactArbitrationData()) {
1897                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1898                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1899                         uint partsSize = parts->size();
1900                         for (uint i = 0; i < partsSize; i++) {
1901                                 CommitPart *commitPart = parts->get(i);
1902                                 processEntry(commitPart);
1903                         }
1904                 } else {
1905                         // Insert the commit so we can process it
1906                         Vector<CommitPart *> *parts = newCommit->getParts();
1907                         uint partsSize = parts->size();
1908                         for (uint i = 0; i < partsSize; i++) {
1909                                 CommitPart *commitPart = parts->get(i);
1910                                 processEntry(commitPart);
1911                         }
1912                 }
1913
1914                 if (transaction->getMachineId() == localMachineId) {
1915                         TransactionStatus *status = transaction->getTransactionStatus();
1916                         if (status != NULL) {
1917                                 status->setStatus(TransactionStatus_StatusCommitted);
1918                         }
1919                 }
1920
1921                 updateLiveStateFromLocal();
1922                 return Pair<bool, bool>(true, true);
1923         } else {
1924                 if (transaction->getMachineId() == localMachineId) {
1925                         // For locally created messages update the status
1926                         // Guard evaluated was false so create abort
1927                         TransactionStatus *status = transaction->getTransactionStatus();
1928                         if (status != NULL) {
1929                                 status->setStatus(TransactionStatus_StatusAborted);
1930                         }
1931                 } else {
1932                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1933
1934                         // Create the abort
1935                         Abort *newAbort = new Abort(NULL,
1936                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1937                                                                                                                                         -1,
1938                                                                                                                                         transaction->getMachineId(),
1939                                                                                                                                         transaction->getArbitrator(),
1940                                                                                                                                         localArbitrationSequenceNumber);
1941                         localArbitrationSequenceNumber++;
1942                         addAbortSet->add(newAbort);
1943
1944                         // Append all the commit parts to the end of the pending queue
1945                         // waiting for sending to the server
1946                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1947                         pendingSendArbitrationRounds->add(arbitrationRound);
1948
1949                         if (compactArbitrationData()) {
1950                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1951
1952                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1953                                 uint partsSize = parts->size();
1954                                 for (uint i = 0; i < partsSize; i++) {
1955                                         CommitPart *commitPart = parts->get(i);
1956                                         processEntry(commitPart);
1957                                 }
1958                         }
1959                 }
1960
1961                 updateLiveStateFromLocal();
1962                 return Pair<bool, bool>(true, false);
1963         }
1964 }
1965
1966 /**
1967  * Compacts the arbitration data by merging commits and aggregating
1968  * aborts so that a single large push of commits can be done instead
1969  * of many small updates
1970  */
1971 bool Table::compactArbitrationData() {
1972         if (pendingSendArbitrationRounds->size() < 2) {
1973                 // Nothing to compact so do nothing
1974                 return false;
1975         }
1976
1977         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1978         if (lastRound->getDidSendPart()) {
1979                 return false;
1980         }
1981
1982         bool hadCommit = (lastRound->getCommit() == NULL);
1983         bool gotNewCommit = false;
1984
1985         uint numberToDelete = 1;
1986         
1987         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1988                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1989
1990                 if (round->isFull() || round->getDidSendPart()) {
1991                         // Stop since there is a part that cannot be compacted and we
1992                         // need to compact in order
1993                         break;
1994                 }
1995
1996                 if (round->getCommit() == NULL) {
1997                         // Try compacting aborts only
1998                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1999                         if (newSize > ArbitrationRound_MAX_PARTS) {
2000                                 // Cant compact since it would be too large
2001                                 break;
2002                         }
2003                         lastRound->addAborts(round->getAborts());
2004                 } else {
2005                         // Create a new larger commit
2006                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
2007                         localArbitrationSequenceNumber++;
2008
2009                         // Create the commit parts so that we can count them
2010                         newCommit->createCommitParts();
2011
2012                         // Calculate the new size of the parts
2013                         int newSize = newCommit->getNumberOfParts();
2014                         newSize += lastRound->getAbortsCount();
2015                         newSize += round->getAbortsCount();
2016
2017                         if (newSize > ArbitrationRound_MAX_PARTS) {
2018                                 // Can't compact since it would be too large
2019                                 if (lastRound->getCommit() != newCommit &&
2020                                                 round->getCommit() != newCommit)
2021                                         delete newCommit;
2022                                 break;
2023                         }
2024                         // Set the new compacted part
2025                         if (lastRound->getCommit() == newCommit)
2026                                 lastRound->setCommit(NULL);
2027                         if (round->getCommit() == newCommit)
2028                                 round->setCommit(NULL);
2029                         
2030                         if (lastRound->getCommit() != NULL) {
2031                                 Commit * oldcommit = lastRound->getCommit();
2032                                 lastRound->setCommit(NULL);
2033                                 delete oldcommit;
2034                         }
2035                         lastRound->setCommit(newCommit);
2036                         lastRound->addAborts(round->getAborts());
2037                         gotNewCommit = true;
2038                 }
2039
2040                 numberToDelete++;
2041         }
2042
2043         if (numberToDelete != 1) {
2044                 // If there is a compaction
2045                 // Delete the previous pieces that are now in the new compacted piece
2046                 for (uint i = 2; i <= numberToDelete; i++) {
2047                         delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2048                 }
2049                 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2050
2051                 pendingSendArbitrationRounds->add(lastRound);
2052
2053                 // Should reinsert into the commit processor
2054                 if (hadCommit && gotNewCommit) {
2055                         return true;
2056                 }
2057         }
2058
2059         return false;
2060 }
2061
2062 /**
2063  * Update all the commits and the committed tables, sets dead the dead
2064  * transactions
2065  */
2066 bool Table::updateCommittedTable() {
2067         if (newCommitParts->size() == 0) {
2068                 // Nothing new to process
2069                 return false;
2070         }
2071
2072         // Iterate through all the machine Ids that we received new parts for
2073         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2074         while (partsit->hasNext()) {
2075                 int64_t machineId = partsit->next();
2076                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2077
2078                 // Iterate through all the parts for that machine Id
2079                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2080                 while (pairit->hasNext()) {
2081                         Pair<int64_t, int32_t> *partId = pairit->next();
2082                         CommitPart *part = pairit->currVal();
2083
2084                         // Get the transaction object for that sequence number
2085                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2086
2087                         if (commitForClientTable == NULL) {
2088                                 // This is the first commit from this device
2089                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2090                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2091                         }
2092
2093                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2094
2095                         if (commit == NULL) {
2096                                 // This is a new commit that we dont have so make a new one
2097                                 commit = new Commit();
2098
2099                                 // Insert this new commit into the live tables
2100                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2101                         }
2102
2103                         // Add that part to the commit
2104                         commit->addPartDecode(part);
2105                         part->releaseRef();
2106                 }
2107                 delete pairit;
2108                 delete parts;
2109         }
2110         delete partsit;
2111
2112         // Clear all the new commits parts in preparation for the next time
2113         // the server sends slots
2114         newCommitParts->clear();
2115
2116         // If we process a new commit keep track of it for future use
2117         bool didProcessANewCommit = false;
2118
2119         // Process the commits one by one
2120         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2121         while (liveit->hasNext()) {
2122                 int64_t arbitratorId = liveit->next();
2123                 // Get all the commits for a specific arbitrator
2124                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2125
2126                 // Sort the commits in order
2127                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2128                 {
2129                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2130                         while (clientit->hasNext())
2131                                 commitSequenceNumbers->add(clientit->next());
2132                         delete clientit;
2133                 }
2134
2135                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2136
2137                 // Get the last commit seen from this arbitrator
2138                 int64_t lastCommitSeenSequenceNumber = -1;
2139                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2140                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2141                 }
2142
2143                 // Go through each new commit one by one
2144                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2145                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2146                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2147                         // Special processing if a commit is not complete
2148                         if (!commit->isComplete()) {
2149                                 if (i == (commitSequenceNumbers->size() - 1)) {
2150                                         // If there is an incomplete commit and this commit is the
2151                                         // latest one seen then this commit cannot be processed and
2152                                         // there are no other commits
2153                                         break;
2154                                 } else {
2155                                         // This is a commit that was already dead but parts of it
2156                                         // are still in the block chain (not flushed out yet)->
2157                                         // Delete it and move on
2158                                         commit->setDead();
2159                                         commitForClientTable->remove(commit->getSequenceNumber());
2160                                         delete commit;
2161                                         continue;
2162                                 }
2163                         }
2164
2165                         // Update the last transaction that was updated if we can
2166                         if (commit->getTransactionSequenceNumber() != -1) {
2167                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2168                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2169                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2170                                 }
2171                         }
2172
2173                         // Update the last arbitration data that we have seen so far
2174                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2175                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2176                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2177                                         // Is larger
2178                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2179                                 }
2180                         } else {
2181                                 // Never seen any data from this arbitrator so record the first one
2182                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2183                         }
2184
2185                         // We have already seen this commit before so need to do the
2186                         // full processing on this commit
2187                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2188                                 // Update the last transaction that was updated if we can
2189                                 if (commit->getTransactionSequenceNumber() != -1) {
2190                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2191                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2192                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2193                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2194                                         }
2195                                 }
2196                                 continue;
2197                         }
2198
2199                         // If we got here then this is a brand new commit and needs full
2200                         // processing
2201                         // Get what commits should be edited, these are the commits that
2202                         // have live values for their keys
2203                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2204                         {
2205                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2206                                 while (kvit->hasNext()) {
2207                                         KeyValue *kv = kvit->next();
2208                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2209                                         if (commit != NULL)
2210                                                 commitsToEdit->add(commit);
2211                                 }
2212                                 delete kvit;
2213                         }
2214
2215                         // Update each previous commit that needs to be updated
2216                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2217                         while (commitit->hasNext()) {
2218                                 Commit *previousCommit = commitit->next();
2219
2220                                 // Only bother with live commits (TODO: Maybe remove this check)
2221                                 if (previousCommit->isLive()) {
2222
2223                                         // Update which keys in the old commits are still live
2224                                         {
2225                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2226                                                 while (kvit->hasNext()) {
2227                                                         KeyValue *kv = kvit->next();
2228                                                         previousCommit->invalidateKey(kv->getKey());
2229                                                 }
2230                                                 delete kvit;
2231                                         }
2232
2233                                         // if the commit is now dead then remove it
2234                                         if (!previousCommit->isLive()) {
2235                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2236                                                 delete previousCommit;
2237                                         }
2238                                 }
2239                         }
2240                         delete commitit;
2241                         delete commitsToEdit;
2242
2243                         // Update the last seen sequence number from this arbitrator
2244                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2245                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2246                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2247                                 }
2248                         } else {
2249                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2250                         }
2251
2252                         // We processed a new commit that we havent seen before
2253                         didProcessANewCommit = true;
2254
2255                         // Update the committed table of keys and which commit is using which key
2256                         {
2257                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2258                                 while (kvit->hasNext()) {
2259                                         KeyValue *kv = kvit->next();
2260                                         committedKeyValueTable->put(kv->getKey(), kv);
2261                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2262                                 }
2263                                 delete kvit;
2264                         }
2265                 }
2266                 delete commitSequenceNumbers;
2267         }
2268         delete liveit;
2269
2270         return didProcessANewCommit;
2271 }
2272
2273 /**
2274  * Create the speculative table from transactions that are still live
2275  * and have come from the cloud
2276  */
2277 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2278         if (liveTransactionBySequenceNumberTable->size() == 0) {
2279                 // There is nothing to speculate on
2280                 return false;
2281         }
2282
2283         // Create a list of the transaction sequence numbers and sort them
2284         // from oldest to newest
2285         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2286         {
2287                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2288                 while (trit->hasNext())
2289                         transactionSequenceNumbersSorted->add(trit->next());
2290                 delete trit;
2291         }
2292
2293         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2294
2295         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2296
2297
2298         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2299                 // If there is a gap in the transaction sequence numbers then
2300                 // there was a commit or an abort of a transaction OR there was a
2301                 // new commit (Could be from offline commit) so a redo the
2302                 // speculation from scratch
2303
2304                 // Start from scratch
2305                 speculatedKeyValueTable->clear();
2306                 lastTransactionSequenceNumberSpeculatedOn = -1;
2307                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2308         }
2309
2310         // Remember the front of the transaction list
2311         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2312
2313         // Find where to start arbitration from
2314         uint startIndex = 0;
2315
2316         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2317                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2318                         break;
2319         startIndex++;
2320
2321         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2322                 // Make sure we are not out of bounds
2323                 delete transactionSequenceNumbersSorted;
2324                 return false;           // did not speculate
2325         }
2326
2327         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2328         bool didSkip = true;
2329
2330         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2331                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2332                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2333
2334                 if (!transaction->isComplete()) {
2335                         // If there is an incomplete transaction then there is nothing
2336                         // we can do add this transactions arbitrator to the list of
2337                         // arbitrators we should ignore
2338                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2339                         didSkip = true;
2340                         continue;
2341                 }
2342
2343                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2344                         continue;
2345                 }
2346
2347                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2348
2349                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2350                         // Guard evaluated to true so update the speculative table
2351                         {
2352                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2353                                 while (kvit->hasNext()) {
2354                                         KeyValue *kv = kvit->next();
2355                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2356                                 }
2357                                 delete kvit;
2358                         }
2359                 }
2360         }
2361
2362         delete transactionSequenceNumbersSorted;
2363         
2364         if (didSkip) {
2365                 // Since there was a skip we need to redo the speculation next time around
2366                 lastTransactionSequenceNumberSpeculatedOn = -1;
2367                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2368         }
2369
2370         // We did some speculation
2371         return true;
2372 }
2373
2374 /**
2375  * Create the pending transaction speculative table from transactions
2376  * that are still in the pending transaction buffer
2377  */
2378 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2379         if (pendingTransactionQueue->size() == 0) {
2380                 // There is nothing to speculate on
2381                 return;
2382         }
2383
2384         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2385                 // need to reset on the pending speculation
2386                 lastPendingTransactionSpeculatedOn = NULL;
2387                 firstPendingTransaction = pendingTransactionQueue->get(0);
2388                 pendingTransactionSpeculatedKeyValueTable->clear();
2389         }
2390
2391         // Find where to start arbitration from
2392         uint startIndex = 0;
2393
2394         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2395                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2396                         break;
2397
2398         if (startIndex >= pendingTransactionQueue->size()) {
2399                 // Make sure we are not out of bounds
2400                 return;
2401         }
2402
2403         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2404                 Transaction *transaction = pendingTransactionQueue->get(i);
2405
2406                 lastPendingTransactionSpeculatedOn = transaction;
2407
2408                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2409                         // Guard evaluated to true so update the speculative table
2410                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2411                         while (kvit->hasNext()) {
2412                                 KeyValue *kv = kvit->next();
2413                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2414                         }
2415                         delete kvit;
2416                 }
2417         }
2418 }
2419
2420 /**
2421  * Set dead and remove from the live transaction tables the
2422  * transactions that are dead
2423  */
2424 void Table::updateLiveTransactionsAndStatus() {
2425         // Go through each of the transactions
2426         {
2427                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2428                 while (iter->hasNext()) {
2429                         int64_t key = iter->next();
2430                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2431
2432                         // Check if the transaction is dead
2433                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2434                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2435                                 // Set dead the transaction
2436                                 transaction->setDead();
2437
2438                                 // Remove the transaction from the live table
2439                                 iter->remove();
2440                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2441                                 delete transaction;
2442                         }
2443                 }
2444                 delete iter;
2445         }
2446
2447         // Go through each of the transactions
2448         {
2449                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2450                 while (iter->hasNext()) {
2451                         int64_t key = iter->next();
2452                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2453
2454                         // Check if the transaction is dead
2455                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2456                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2457                                 // Set committed
2458                                 status->setStatus(TransactionStatus_StatusCommitted);
2459
2460                                 // Remove
2461                                 iter->remove();
2462                         }
2463                 }
2464                 delete iter;
2465         }
2466 }
2467
2468 /**
2469  * Process this slot, entry by entry->  Also update the latest message sent by slot
2470  */
2471 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2472
2473         // Update the last message seen
2474         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2475
2476         // Process each entry in the slot
2477         Vector<Entry *> *entries = slot->getEntries();
2478         uint eSize = entries->size();
2479         for (uint ei = 0; ei < eSize; ei++) {
2480                 Entry *entry = entries->get(ei);
2481                 switch (entry->getType()) {
2482                 case TypeCommitPart:
2483                         processEntry((CommitPart *)entry);
2484                         break;
2485                 case TypeAbort:
2486                         processEntry((Abort *)entry);
2487                         break;
2488                 case TypeTransactionPart:
2489                         processEntry((TransactionPart *)entry);
2490                         break;
2491                 case TypeNewKey:
2492                         processEntry((NewKey *)entry);
2493                         break;
2494                 case TypeLastMessage:
2495                         processEntry((LastMessage *)entry, machineSet);
2496                         break;
2497                 case TypeRejectedMessage:
2498                         processEntry((RejectedMessage *)entry, indexer);
2499                         break;
2500                 case TypeTableStatus:
2501                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2502                         break;
2503                 default:
2504                         //throw new Error("Unrecognized type: ");
2505                         myerror("Unrecognized type: \n");
2506                 }
2507         }
2508 }
2509
2510 /**
2511  * Update the last message that was sent for a machine Id
2512  */
2513 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2514         // Update what the last message received by a machine was
2515         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2516 }
2517
2518 /**
2519  * Add the new key to the arbitrators table and update the set of live
2520  * new keys (in case of a rescued new key message)
2521  */
2522 void Table::processEntry(NewKey *entry) {
2523         // Update the arbitrator table with the new key information
2524         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2525
2526         // Update what the latest live new key is
2527         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2528         if (oldNewKey != NULL) {
2529                 // Delete the old new key messages
2530                 oldNewKey->setDead();
2531         }
2532 }
2533
2534 /**
2535  * Process new table status entries and set dead the old ones as new
2536  * ones come in-> keeps track of the largest and smallest table status
2537  * seen in this current round of updating the local copy of the block
2538  * chain
2539  */
2540 void Table::processEntry(TableStatus *entry, int64_t seq) {
2541         int newNumSlots = entry->getMaxSlots();
2542         updateCurrMaxSize(newNumSlots);
2543         initExpectedSize(seq, newNumSlots);
2544
2545         if (liveTableStatus != NULL) {
2546                 // We have a larger table status so the old table status is no
2547                 // int64_ter alive
2548                 liveTableStatus->setDead();
2549         }
2550
2551         // Make this new table status the latest alive table status
2552         liveTableStatus = entry;
2553 }
2554
2555 /**
2556  * Check old messages to see if there is a block chain violation->
2557  * Also
2558  */
2559 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2560         int64_t oldSeqNum = entry->getOldSeqNum();
2561         int64_t newSeqNum = entry->getNewSeqNum();
2562         bool isequal = entry->getEqual();
2563         int64_t machineId = entry->getMachineID();
2564         int64_t seq = entry->getSequenceNumber();
2565
2566         // Check if we have messages that were supposed to be rejected in
2567         // our local block chain
2568         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2569                 // Get the slot
2570                 Slot *slot = indexer->getSlot(seqNum);
2571
2572                 if (slot != NULL) {
2573                         // If we have this slot make sure that it was not supposed to be
2574                         // a rejected slot
2575                         int64_t slotMachineId = slot->getMachineID();
2576                         if (isequal != (slotMachineId == machineId)) {
2577                                 //throw new Error("Server Error: Trying to insert rejected message for slot ");
2578                                 myerror("Server Error: Trying to insert rejected message for slot\n");
2579                         }
2580                 }
2581         }
2582
2583         // Create a list of clients to watch until they see this rejected
2584         // message entry->
2585         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2586         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2587         while (iter->hasNext()) {
2588                 // Machine ID for the last message entry
2589                 int64_t lastMessageEntryMachineId = iter->next();
2590
2591                 // We've seen it, don't need to continue to watch->  Our next
2592                 // message will implicitly acknowledge it->
2593                 if (lastMessageEntryMachineId == localMachineId) {
2594                         continue;
2595                 }
2596
2597                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2598                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2599
2600                 if (entrySequenceNumber < seq) {
2601                         // Add this rejected message to the set of messages that this
2602                         // machine ID did not see yet
2603                         addWatchVector(lastMessageEntryMachineId, entry);
2604                         // This client did not see this rejected message yet so add it
2605                         // to the watch set to monitor
2606                         deviceWatchSet->add(lastMessageEntryMachineId);
2607                 }
2608         }
2609         delete iter;
2610
2611         if (deviceWatchSet->isEmpty()) {
2612                 // This rejected message has been seen by all the clients so
2613                 entry->setDead();
2614                 delete deviceWatchSet;
2615         } else {
2616                 // We need to watch this rejected message
2617                 entry->setWatchSet(deviceWatchSet);
2618         }
2619 }
2620
2621 /**
2622  * Check if this abort is live, if not then save it so we can kill it
2623  * later-> update the last transaction number that was arbitrated on->
2624  */
2625 void Table::processEntry(Abort *entry) {
2626         if (entry->getTransactionSequenceNumber() != -1) {
2627                 // update the transaction status if it was sent to the server
2628                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2629                 if (status != NULL) {
2630                         status->setStatus(TransactionStatus_StatusAborted);
2631                 }
2632         }
2633
2634         // Abort has not been seen by the client it is for yet so we need to
2635         // keep track of it
2636
2637         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2638         if (previouslySeenAbort != NULL) {
2639                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2640         }
2641
2642         if (entry->getTransactionArbitrator() == localMachineId) {
2643                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2644         }
2645
2646         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2647                 // The machine already saw this so it is dead
2648                 entry->setDead();
2649                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2650                 liveAbortTable->remove(&abortid);
2651
2652                 if (entry->getTransactionArbitrator() == localMachineId) {
2653                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2654                 }
2655                 return;
2656         }
2657
2658         // Update the last arbitration data that we have seen so far
2659         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2660                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2661                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2662                         // Is larger
2663                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2664                 }
2665         } else {
2666                 // Never seen any data from this arbitrator so record the first one
2667                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2668         }
2669
2670         // Set dead a transaction if we can
2671         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2672
2673         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2674         if (transactionToSetDead != NULL) {
2675                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2676         }
2677
2678         // Update the last transaction sequence number that the arbitrator
2679         // arbitrated on
2680         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2681                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2682                 // Is a valid one
2683                 if (entry->getTransactionSequenceNumber() != -1) {
2684                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2685                 }
2686         }
2687 }
2688
2689 /**
2690  * Set dead the transaction part if that transaction is dead and keep
2691  * track of all new parts
2692  */
2693 void Table::processEntry(TransactionPart *entry) {
2694         // Check if we have already seen this transaction and set it dead OR
2695         // if it is not alive
2696         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2697                 // This transaction is dead, it was already committed or aborted
2698                 entry->setDead();
2699                 return;
2700         }
2701
2702         // This part is still alive
2703         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2704
2705         if (transactionPart == NULL) {
2706                 // Dont have a table for this machine Id yet so make one
2707                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2708                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2709         }
2710
2711         // Update the part and set dead ones we have already seen (got a
2712         // rescued version)
2713         entry->acquireRef();
2714         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2715         if (previouslySeenPart != NULL) {
2716                 previouslySeenPart->releaseRef();
2717                 previouslySeenPart->setDead();
2718         }
2719 }
2720
2721 /**
2722  * Process new commit entries and save them for future use->  Delete duplicates
2723  */
2724 void Table::processEntry(CommitPart *entry) {
2725         // Update the last transaction that was updated if we can
2726         if (entry->getTransactionSequenceNumber() != -1) {
2727                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2728                                 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2729                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2730                 }
2731         }
2732
2733         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2734         if (commitPart == NULL) {
2735                 // Don't have a table for this machine Id yet so make one
2736                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2737                 newCommitParts->put(entry->getMachineId(), commitPart);
2738         }
2739         // Update the part and set dead ones we have already seen (got a
2740         // rescued version)
2741         entry->acquireRef();
2742         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2743         if (previouslySeenPart != NULL) {
2744                 previouslySeenPart->setDead();
2745                 previouslySeenPart->releaseRef();
2746         }
2747 }
2748
2749 /**
2750  * Update the last message seen table-> Update and set dead the
2751  * appropriate RejectedMessages as clients see them-> Updates the live
2752  * aborts, removes those that are dead and sets them dead-> Check that
2753  * the last message seen is correct and that there is no mismatch of
2754  * our own last message or that other clients have not had a rollback
2755  * on the last message->
2756  */
2757 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2758         // We have seen this machine ID
2759         machineSet->remove(machineId);
2760
2761         // Get the set of rejected messages that this machine Id is has not seen yet
2762         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2763         // If there is a rejected message that this machine Id has not seen yet
2764         if (watchset != NULL) {
2765                 // Go through each rejected message that this machine Id has not
2766                 // seen yet
2767
2768                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2769                 while (rmit->hasNext()) {
2770                         RejectedMessage *rm = rmit->next();
2771                         // If this machine Id has seen this rejected message->->->
2772                         if (rm->getSequenceNumber() <= seqNum) {
2773                                 // Remove it from our watchlist
2774                                 rmit->remove();
2775                                 // Decrement machines that need to see this notification
2776                                 rm->removeWatcher(machineId);
2777                         }
2778                 }
2779                 delete rmit;
2780         }
2781
2782         // Set dead the abort
2783         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2784
2785         while (abortit->hasNext()) {
2786                 Pair<int64_t, int64_t> *key = abortit->next();
2787                 Abort *abort = liveAbortTable->get(key);
2788                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2789                         abort->setDead();
2790                         abortit->remove();
2791                         if (abort->getTransactionArbitrator() == localMachineId) {
2792                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2793                         }
2794                 }
2795         }
2796         delete abortit;
2797         if (machineId == localMachineId) {
2798                 // Our own messages are immediately dead->
2799                 char livenessType = liveness->getType();
2800                 if (livenessType == TypeLastMessage) {
2801                         ((LastMessage *)liveness)->setDead();
2802                 } else if (livenessType == TypeSlot) {
2803                         ((Slot *)liveness)->setDead();
2804                 } else {
2805                         //throw new Error("Unrecognized type");
2806                         myerror("Unrecognized type\n");
2807                 }
2808         }
2809         // Get the old last message for this device
2810         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2811         if (lastMessageEntry == NULL) {
2812                 // If no last message then there is nothing else to process
2813                 return;
2814         }
2815
2816         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2817         Liveness *lastEntry = lastMessageEntry->getSecond();
2818         delete lastMessageEntry;
2819
2820         // If it is not our machine Id since we already set ours to dead
2821         if (machineId != localMachineId) {
2822                 char lastEntryType = lastEntry->getType();
2823
2824                 if (lastEntryType == TypeLastMessage) {
2825                         ((LastMessage *)lastEntry)->setDead();
2826                 } else if (lastEntryType == TypeSlot) {
2827                         ((Slot *)lastEntry)->setDead();
2828                 } else {
2829                         //throw new Error("Unrecognized type");
2830                         myerror("Unrecognized type\n");
2831                 }
2832         }
2833         // Make sure the server is not playing any games
2834         if (machineId == localMachineId) {
2835                 if (hadPartialSendToServer) {
2836                         // We were not making any updates and we had a machine mismatch
2837                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2838                                 //throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2839                                 myerror("Server Error: Mismatch on local machine sequence number, needed at least: \n");
2840                         }
2841                 } else {
2842                         // We were not making any updates and we had a machine mismatch
2843                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2844                                 //throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2845                                 myerror("Server Error: Mismatch on local machine sequence number, needed: \n");
2846                         }
2847                 }
2848         } else {
2849                 if (lastMessageSeqNum > seqNum) {
2850                         //throw new Error("Server Error: Rollback on remote machine sequence number");
2851                         myerror("Server Error: Rollback on remote machine sequence number\n");
2852                 }
2853         }
2854 }
2855
2856 /**
2857  * Add a rejected message entry to the watch set to keep track of
2858  * which clients have seen that rejected message entry and which have
2859  * not.
2860  */
2861 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2862         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2863         if (entries == NULL) {
2864                 // There is no set for this machine ID yet so create one
2865                 entries = new Hashset<RejectedMessage *>();
2866                 rejectedMessageWatchVectorTable->put(machineId, entries);
2867         }
2868         entries->add(entry);
2869 }
2870
2871 /**
2872  * Check if the HMAC chain is not violated
2873  */
2874 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2875         for (uint i = 0; i < newSlots->length(); i++) {
2876                 Slot *currSlot = newSlots->get(i);
2877                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2878                 if (prevSlot != NULL &&
2879                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2880                         //                      throw new Error("Server Error: Invalid HMAC Chain");
2881                         myerror("Server Error: Invalid HMAC Chain\n");
2882         }
2883 }