edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
/**
 * Three-way comparison of two int64_t values passed by address, using the
 * qsort()-style comparator signature.
 *
 * @param a address of the left-hand int64_t
 * @param b address of the right-hand int64_t
 * @return -1 if *a < *b, 1 if *a > *b, 0 if they are equal
 */
int compareInt64(const void *a, const void *b) {
        const int64_t lhs = *(const int64_t *) a;
        const int64_t rhs = *(const int64_t *) b;

        if (lhs == rhs)
                return 0;
        // Compare rather than subtract so extreme values cannot overflow
        return (lhs < rhs) ? -1 : 1;
}
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastPendingSendArbitrationEntriesToDelete(NULL),
64         lastNewKey(NULL),
65         committedKeyValueTable(NULL),
66         speculatedKeyValueTable(NULL),
67         pendingTransactionSpeculatedKeyValueTable(NULL),
68         liveNewKeyTable(NULL),
69         lastMessageTable(NULL),
70         rejectedMessageWatchVectorTable(NULL),
71         arbitratorTable(NULL),
72         liveAbortTable(NULL),
73         newTransactionParts(NULL),
74         newCommitParts(NULL),
75         lastArbitratedTransactionNumberByArbitratorTable(NULL),
76         liveTransactionBySequenceNumberTable(NULL),
77         liveTransactionByTransactionIdTable(NULL),
78         liveCommitsTable(NULL),
79         liveCommitsByKeyTable(NULL),
80         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81         rejectedSlotVector(NULL),
82         pendingTransactionQueue(NULL),
83         pendingSendArbitrationRounds(NULL),
84         pendingSendArbitrationEntriesToDelete(NULL),
85         transactionPartsSent(NULL),
86         outstandingTransactionStatus(NULL),
87         liveAbortsGeneratedByLocal(NULL),
88         offlineTransactionsCommittedAndAtServer(NULL),
89         localCommunicationTable(NULL),
90         lastTransactionSeenFromMachineFromServer(NULL),
91         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92         lastInsertedNewKey(false),
93         lastSeqNumArbOn(0)
94 {
95         init();
96 }
97
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
99         buffer(NULL),
100         cloud(_cloud),
101         random(NULL),
102         liveTableStatus(NULL),
103         pendingTransactionBuilder(NULL),
104         lastPendingTransactionSpeculatedOn(NULL),
105         firstPendingTransaction(NULL),
106         numberOfSlots(0),
107         bufferResizeThreshold(0),
108         liveSlotCount(0),
109         oldestLiveSlotSequenceNumver(1),
110         localMachineId(_localMachineId),
111         sequenceNumber(0),
112         localSequenceNumber(0),
113         localTransactionSequenceNumber(0),
114         lastTransactionSequenceNumberSpeculatedOn(0),
115         oldestTransactionSequenceNumberSpeculatedOn(0),
116         localArbitrationSequenceNumber(0),
117         hadPartialSendToServer(false),
118         attemptedToSendToServer(false),
119         expectedsize(0),
120         didFindTableStatus(false),
121         currMaxSize(0),
122         lastSlotAttemptedToSend(NULL),
123         lastIsNewKey(false),
124         lastNewSize(0),
125         lastTransactionPartsSent(NULL),
126         lastPendingSendArbitrationEntriesToDelete(NULL),
127         lastNewKey(NULL),
128         committedKeyValueTable(NULL),
129         speculatedKeyValueTable(NULL),
130         pendingTransactionSpeculatedKeyValueTable(NULL),
131         liveNewKeyTable(NULL),
132         lastMessageTable(NULL),
133         rejectedMessageWatchVectorTable(NULL),
134         arbitratorTable(NULL),
135         liveAbortTable(NULL),
136         newTransactionParts(NULL),
137         newCommitParts(NULL),
138         lastArbitratedTransactionNumberByArbitratorTable(NULL),
139         liveTransactionBySequenceNumberTable(NULL),
140         liveTransactionByTransactionIdTable(NULL),
141         liveCommitsTable(NULL),
142         liveCommitsByKeyTable(NULL),
143         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144         rejectedSlotVector(NULL),
145         pendingTransactionQueue(NULL),
146         pendingSendArbitrationRounds(NULL),
147         pendingSendArbitrationEntriesToDelete(NULL),
148         transactionPartsSent(NULL),
149         outstandingTransactionStatus(NULL),
150         liveAbortsGeneratedByLocal(NULL),
151         offlineTransactionsCommittedAndAtServer(NULL),
152         localCommunicationTable(NULL),
153         lastTransactionSeenFromMachineFromServer(NULL),
154         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155         lastInsertedNewKey(false),
156         lastSeqNumArbOn(0)
157 {
158         init();
159 }
160
161 Table::~Table() {
162         delete cloud;
163         delete random;
164         delete buffer;
165         // init data structs
166         delete committedKeyValueTable;
167         delete speculatedKeyValueTable;
168         delete pendingTransactionSpeculatedKeyValueTable;
169         delete liveNewKeyTable;
170         delete lastMessageTable;
171         delete rejectedMessageWatchVectorTable;
172         delete arbitratorTable;
173         delete liveAbortTable;
174         delete newTransactionParts;
175         delete newCommitParts;
176         delete lastArbitratedTransactionNumberByArbitratorTable;
177         delete liveTransactionBySequenceNumberTable;
178         delete liveTransactionByTransactionIdTable;
179         delete liveCommitsTable;
180         delete liveCommitsByKeyTable;
181         delete lastCommitSeenSequenceNumberByArbitratorTable;
182         delete rejectedSlotVector;
183         delete pendingTransactionQueue;
184         delete pendingSendArbitrationEntriesToDelete;
185         delete transactionPartsSent;
186         delete outstandingTransactionStatus;
187         delete liveAbortsGeneratedByLocal;
188         delete offlineTransactionsCommittedAndAtServer;
189         delete localCommunicationTable;
190         delete lastTransactionSeenFromMachineFromServer;
191         delete pendingSendArbitrationRounds;
192         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
193 }
194
195 /**
196  * Init all the stuff needed for for table usage
197  */
198 void Table::init() {
199         // Init helper objects
200         random = new SecureRandom();
201         buffer = new SlotBuffer();
202
203         // init data structs
204         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
205         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
206         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
207         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
208         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
209         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
210         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
211         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
212         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
213         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
214         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
215         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
216         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
217         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
218         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
219         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
220         rejectedSlotVector = new Vector<int64_t>();
221         pendingTransactionQueue = new Vector<Transaction *>();
222         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
223         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
224         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
225         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
226         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
227         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
228         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
229         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
230         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
231
232         // Other init stuff
233         numberOfSlots = buffer->capacity();
234         setResizeThreshold();
235 }
236
237 /**
238  * Initialize the table by inserting a table status as the first entry
239  * into the table status also initialize the crypto stuff.
240  */
241 void Table::initTable() {
242         cloud->initSecurity();
243
244         // Create the first insertion into the block chain which is the table status
245         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
246         localSequenceNumber++;
247         TableStatus *status = new TableStatus(s, numberOfSlots);
248         s->addEntry(status);
249         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
250
251         if (array == NULL) {
252                 array = new Array<Slot *>(1);
253                 array->set(0, s);
254                 // update local block chain
255                 validateAndUpdate(array, true);
256         } else if (array->length() == 1) {
257                 // in case we did push the slot BUT we failed to init it
258                 validateAndUpdate(array, true);
259         } else {
260                 throw new Error("Error on initialization");
261         }
262 }
263
264 /**
265  * Rebuild the table from scratch by pulling the latest block chain
266  * from the server.
267  */
268 void Table::rebuild() {
269         // Just pull the latest slots from the server
270         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
271         validateAndUpdate(newslots, true);
272         sendToServer(NULL);
273         updateLiveTransactionsAndStatus();
274 }
275
276 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
277         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
278 }
279
280 int64_t Table::getArbitrator(IoTString *key) {
281         return arbitratorTable->get(key);
282 }
283
/**
 * Closes the cloud connection by delegating to CloudComm::closeCloud().
 * The CloudComm object itself is not freed here; that happens in ~Table().
 */
void Table::close() {
        cloud->closeCloud();
}
287
288 IoTString *Table::getCommitted(IoTString *key)  {
289         KeyValue *kv = committedKeyValueTable->get(key);
290
291         if (kv != NULL) {
292                 return kv->getValue();
293         } else {
294                 return NULL;
295         }
296 }
297
298 IoTString *Table::getSpeculative(IoTString *key) {
299         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
300
301         if (kv == NULL) {
302                 kv = speculatedKeyValueTable->get(key);
303         }
304
305         if (kv == NULL) {
306                 kv = committedKeyValueTable->get(key);
307         }
308
309         if (kv != NULL) {
310                 return kv->getValue();
311         } else {
312                 return NULL;
313         }
314 }
315
316 IoTString *Table::getCommittedAtomic(IoTString *key) {
317         KeyValue *kv = committedKeyValueTable->get(key);
318
319         if (!arbitratorTable->contains(key)) {
320                 throw new Error("Key not Found.");
321         }
322
323         // Make sure new key value pair matches the current arbitrator
324         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
325                 // TODO: Maybe not throw en error
326                 throw new Error("Not all Key Values Match Arbitrator.");
327         }
328
329         if (kv != NULL) {
330                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
331                 return kv->getValue();
332         } else {
333                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
334                 return NULL;
335         }
336 }
337
338 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
339         if (!arbitratorTable->contains(key)) {
340                 throw new Error("Key not Found.");
341         }
342
343         // Make sure new key value pair matches the current arbitrator
344         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
345                 // TODO: Maybe not throw en error
346                 throw new Error("Not all Key Values Match Arbitrator.");
347         }
348
349         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
350
351         if (kv == NULL) {
352                 kv = speculatedKeyValueTable->get(key);
353         }
354
355         if (kv == NULL) {
356                 kv = committedKeyValueTable->get(key);
357         }
358
359         if (kv != NULL) {
360                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
361                 return kv->getValue();
362         } else {
363                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
364                 return NULL;
365         }
366 }
367
368 bool Table::update()  {
369         try {
370                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
371                 validateAndUpdate(newSlots, false);
372                 sendToServer(NULL);
373                 updateLiveTransactionsAndStatus();
374                 return true;
375         } catch (Exception *e) {
376                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
377                 while (kit->hasNext()) {
378                         int64_t m = kit->next();
379                         updateFromLocal(m);
380                 }
381                 delete kit;
382         }
383
384         return false;
385 }
386
387 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
388         while (true) {
389                 if (arbitratorTable->contains(keyName)) {
390                         // There is already an arbitrator
391                         return false;
392                 }
393                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
394
395                 if (sendToServer(newKey)) {
396                         // If successfully inserted
397                         return true;
398                 }
399         }
400 }
401
402 void Table::startTransaction() {
403         // Create a new transaction, invalidates any old pending transactions.
404         pendingTransactionBuilder = new PendingTransaction(localMachineId);
405 }
406
407 void Table::addKV(IoTString *key, IoTString *value) {
408
409         // Make sure it is a valid key
410         if (!arbitratorTable->contains(key)) {
411                 throw new Error("Key not Found.");
412         }
413
414         // Make sure new key value pair matches the current arbitrator
415         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
416                 // TODO: Maybe not throw en error
417                 throw new Error("Not all Key Values Match Arbitrator.");
418         }
419
420         // Add the key value to this transaction
421         KeyValue *kv = new KeyValue(key, value);
422         pendingTransactionBuilder->addKV(kv);
423 }
424
425 TransactionStatus *Table::commitTransaction() {
426         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
427                 // transaction with no updates will have no effect on the system
428                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
429         }
430
431         // Set the local transaction sequence number and increment
432         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
433         localTransactionSequenceNumber++;
434
435         // Create the transaction status
436         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
437
438         // Create the new transaction
439         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
440         newTransaction->setTransactionStatus(transactionStatus);
441
442         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
443                 // Add it to the queue and invalidate the builder for safety
444                 pendingTransactionQueue->add(newTransaction);
445         } else {
446                 arbitrateOnLocalTransaction(newTransaction);
447                 updateLiveStateFromLocal();
448         }
449
450         pendingTransactionBuilder = new PendingTransaction(localMachineId);
451
452         try {
453                 sendToServer(NULL);
454         } catch (ServerException *e) {
455
456                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
457                 uint size = pendingTransactionQueue->size();
458                 uint oldindex = 0;
459                 for (uint iter = 0; iter < size; iter++) {
460                         Transaction *transaction = pendingTransactionQueue->get(iter);
461                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
462
463                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
464                                 // Already contacted this client so ignore all attempts to contact this client
465                                 // to preserve ordering for arbitrator
466                                 continue;
467                         }
468
469                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
470
471                         if (sendReturn.getFirst()) {
472                                 // Failed to contact over local
473                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
474                         } else {
475                                 // Successful contact or should not contact
476
477                                 if (sendReturn.getSecond()) {
478                                         // did arbitrate
479                                         oldindex--;
480                                 }
481                         }
482                 }
483                 pendingTransactionQueue->setSize(oldindex);
484         }
485
486         updateLiveStateFromLocal();
487
488         return transactionStatus;
489 }
490
491 /**
492  * Recalculate the new resize threshold
493  */
494 void Table::setResizeThreshold() {
495         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
496         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
497 }
498
/**
 * Returns the current value of localSequenceNumber, the counter that is
 * passed to each new Slot this table creates and then incremented.
 */
int64_t Table::getLocalSequenceNumber() {
        return localSequenceNumber;
}
502
/**
 * Recovers from a send whose outcome is unknown; sendToServer() calls this
 * first whenever hadPartialSendToServer is set.
 *
 * The slots after sequenceNumber are fetched from the cloud:
 *  - If none came back, lastSlotAttemptedToSend is sent again with the
 *    remembered size and new-key flag.
 *  - Otherwise checkSend() is asked about the fetched slots and
 *    lastSlotAttemptedToSend (presumably whether our slot is among them --
 *    confirm against checkSend).
 *
 * Whenever the last slot is found to have been stored, every transaction in
 * lastTransactionPartsSent has the sent parts removed, its status recorded
 * in outstandingTransactionStatus and set to SentPartial or SentFully, and
 * fully sent transactions are removed from pendingTransactionQueue.
 * Transactions that have not sent any part get their sequence number reset
 * to -1. Fetched slots are applied with validateAndUpdate().
 *
 * NOTE(review): the key check below compares getKey() results with ==.
 * getKey() values are used as IoTString * keys elsewhere in this file, so
 * this is a pointer-identity comparison, not a content comparison --
 * confirm that this is intended.
 *
 * @param newKey the new-key entry the caller still wants to insert, or NULL
 * @return newKey unchanged, or NULL if the last slot turned out to have
 *         been stored and had inserted a NewKey with the same key pointer
 *         and machine id
 */
NewKey * Table::handlePartialSend(NewKey * newKey) {
        //Didn't receive acknowledgement for last send
        //See if the server has received a newer slot

        Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
        if (newSlots->length() == 0) {
                //Retry sending old slot
                bool wasInserted = false;
                // newSlots is passed by address, so the call can replace it
                bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);

                if (sendSlotsReturn) {
                        // The retry went through
                        if (newKey != NULL) {
                                // The last slot already carried this new key: nothing left to insert
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
                                        newKey = NULL;
                                }
                        }

                        SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                        while (trit->hasNext()) {
                                Transaction *transaction = trit->next();
                                transaction->resetServerFailure();
                                // Update which transactions parts still need to be sent
                                transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
                                // Add the transaction status to the outstanding list
                                outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());

                                // Update the transaction status
                                transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);

                                // Check if all the transaction parts were successfully
                                // sent and if so then remove it from pending
                                if (transaction->didSendAllParts()) {
                                        transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
                                        pendingTransactionQueue->remove(transaction);
                                }
                        }
                        delete trit;
                } else {
                        // The retry was not accepted; see whether the slot is
                        // nevertheless present in what the server returned
                        if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                                if (newKey != NULL) {
                                        // The last slot already carried this new key: nothing left to insert
                                        if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
                                                newKey = NULL;
                                        }
                                }

                                SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                                while (trit->hasNext()) {
                                        Transaction *transaction = trit->next();
                                        transaction->resetServerFailure();

                                        // Update which transactions parts still need to be sent
                                        transaction->removeSentParts(lastTransactionPartsSent->get(transaction));

                                        // Add the transaction status to the outstanding list
                                        outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());

                                        // Update the transaction status
                                        transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);

                                        // Check if all the transaction parts were successfully sent and if so then remove it from pending
                                        if (transaction->didSendAllParts()) {
                                                transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
                                                pendingTransactionQueue->remove(transaction);
                                        } else {
                                                transaction->resetServerFailure();
                                                // Set the transaction sequence number back to nothing
                                                if (!transaction->didSendAPartToServer()) {
                                                        transaction->setSequenceNumber(-1);
                                                }
                                        }
                                }
                                delete trit;
                        }
                }

                // Runs after both outcomes of the retry: clear the failure flag
                // on every transaction of the last send and reset the sequence
                // number of those that have not sent any part
                SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                while (trit->hasNext()) {
                        Transaction *transaction = trit->next();
                        transaction->resetServerFailure();
                        // Set the transaction sequence number back to nothing
                        if (!transaction->didSendAPartToServer()) {
                                transaction->setSequenceNumber(-1);
                        }
                }
                delete trit;

                if (newSlots->length() != 0) {
                        // insert into the local block chain
                        validateAndUpdate(newSlots, true);
                }
                // continue;
        } else {
                // The server already holds newer slots; check them against the
                // slot we tried to send
                if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                        if (newKey != NULL) {
                                // The last slot already carried this new key: nothing left to insert
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
                                        newKey = NULL;
                                }
                        }

                        SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                        while (trit->hasNext()) {
                                Transaction *transaction = trit->next();
                                transaction->resetServerFailure();

                                // Update which transactions parts still need to be sent
                                transaction->removeSentParts(lastTransactionPartsSent->get(transaction));

                                // Add the transaction status to the outstanding list
                                outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());

                                // Update the transaction status
                                transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);

                                // Check if all the transaction parts were successfully sent and if so then remove it from pending
                                if (transaction->didSendAllParts()) {
                                        transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
                                        pendingTransactionQueue->remove(transaction);
                                } else {
                                        transaction->resetServerFailure();
                                        // Set the transaction sequence number back to nothing
                                        if (!transaction->didSendAPartToServer()) {
                                                transaction->setSequenceNumber(-1);
                                        }
                                }
                        }
                        delete trit;
                } else {
                        // checkSend() returned false: only clear the failure flags
                        // and reset transactions that have not sent any part
                        SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                        while (trit->hasNext()) {
                                Transaction *transaction = trit->next();
                                transaction->resetServerFailure();
                                // Set the transaction sequence number back to nothing
                                if (!transaction->didSendAPartToServer()) {
                                        transaction->setSequenceNumber(-1);
                                }
                        }
                        delete trit;
                }

                // insert into the local block chain
                validateAndUpdate(newSlots, true);
        }
        return newKey;
}
647
/**
 * Pushes pending work to the server, one slot per loop iteration.
 *
 * The loop runs while pendingTransactionQueue or
 * pendingSendArbitrationRounds is non-empty, or newKey is non-NULL. Each
 * pass creates a Slot, fills it with fillSlot() (a second time with resize
 * forced if the first fill reports that a resize is needed), records the
 * attempt in the last* members, sends it with sendSlotsToServer(), and then
 * either commits or rewinds the per-transaction send state depending on the
 * result. The slots handed back by sendSlotsToServer() are passed to
 * validateAndUpdate().
 *
 * @param newKey new-key entry to insert, or NULL. If hadPartialSendToServer
 *        is set on entry it is first replaced by whatever
 *        handlePartialSend() returns.
 * @return false as soon as arbitratorTable already contains newKey's key.
 *         Otherwise (newKey == NULL) once the loop ends; since the loop only
 *         ends when newKey is NULL, that normal return is true.
 * @throws ServerException* caught from the loop body is rethrown after the
 *         send bookkeeping has been reset (presumably raised by the cloud
 *         call -- confirm). Error* is thrown if hadPartialSendToServer is
 *         still set inside the loop.
 */
648 bool Table::sendToServer(NewKey *newKey) {
            // A previous call ended in a partial send: deal with it first and
            // continue with the newKey value that handlePartialSend() returns.
649         if (hadPartialSendToServer) {
650                 newKey = handlePartialSend(newKey);
651         }
652
653         try {
654                 // While we have stuff that needs inserting into the block chain
655                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
656                         if (hadPartialSendToServer) {
657                                 throw new Error("Should Be error free");
658                         }
659                         
660                         // If there is a new key with same name then end
661                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
662                                 return false;
663                         }
664
665                         // Create the slot
                            // It takes sequence number sequenceNumber + 1 and is given the HMAC
                            // of the slot currently stored at sequenceNumber.
666                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
667                         localSequenceNumber++;
668
669                         // Try to fill the slot with data
                            // First attempt does not request a resize; fillSlot() returns true
                            // when a resize was needed but not performed.
670                         int newSize = 0;
671                         bool insertedNewKey = false;
672                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
673
674                         if (needsResize) {
675                                 // Reset which transaction to send
676                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
677                                 while (trit->hasNext()) {
678                                         Transaction *transaction = trit->next();
679                                         transaction->resetNextPartToSend();
680
681                                         // Set the transaction sequence number back to nothing
682                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
683                                                 transaction->setSequenceNumber(-1);
684                                         }
685                                 }
686                                 delete trit;
687
688                                 // Clear the sent data since we are trying again
689                                 pendingSendArbitrationEntriesToDelete->clear();
690                                 transactionPartsSent->clear();
691
692                                 // We needed a resize so try again
                                    // The return value is ignored on this second, resize-forced fill.
693                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
694                         }
695
                            // Remember everything about this attempt in the last* members.
                            // lastTransactionPartsSent is read by the partial-send handling
                            // earlier in this file; the other last* members are presumably
                            // consumed there as well -- confirm.
696                         lastSlotAttemptedToSend = slot;
697                         lastIsNewKey = (newKey != NULL);
698                         lastInsertedNewKey = insertedNewKey;
699                         lastNewSize = newSize;
700                         lastNewKey = newKey;
701                         lastTransactionPartsSent = transactionPartsSent->clone();
702                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
703
704                         Array<Slot *> * newSlots = NULL;
705                         bool wasInserted = false;
706                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
707
708                         if (sendSlotsReturn) {
709                                 // Did insert into the block chain
710                                 if (insertedNewKey) {
711                                         // This slot was what was inserted not a previous slot
712                                         // New Key was successfully inserted into the block chain so dont want to insert it again
713                                         newKey = NULL;
714                                 }
715
716                                 // Remove the aborts and commit parts that were sent from the pending to send queue
                                    // The vector is compacted in place: rounds that still have parts to
                                    // send are moved to the front and the size is then cut to that count.
717                                 uint size = pendingSendArbitrationRounds->size();
718                                 uint oldcount = 0;
719                                 for (uint i = 0; i < size; i++) {
720                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
721                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
722
723                                         if (!round->isDoneSending()) {
724                                                 //Add part back in
725                                                 pendingSendArbitrationRounds->set(oldcount++,
726                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
727                                         }
728                                 }
729                                 pendingSendArbitrationRounds->setSize(oldcount);
730
731                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
732                                 while (trit->hasNext()) {
733                                         Transaction *transaction = trit->next();
734                                         transaction->resetServerFailure();
735
736                                         // Update which transactions parts still need to be sent
737                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
738
739                                         // Add the transaction status to the outstanding list
740                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
741
742                                         // Update the transaction status
743                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
744
745                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
746                                         if (transaction->didSendAllParts()) {
747                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
748                                                 pendingTransactionQueue->remove(transaction);
749                                         }
750                                 }
751                                 delete trit;
752                         } else {
753                                 // Reset which transaction to send
754                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
755                                 while (trit->hasNext()) {
756                                         Transaction *transaction = trit->next();
757                                         transaction->resetNextPartToSend();
758
759                                         // Set the transaction sequence number back to nothing
760                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
761                                                 transaction->setSequenceNumber(-1);
762                                         }
763                                 }
764                                 delete trit;
765                         }
766
767                         // Clear the sent data in preparation for next send
768                         pendingSendArbitrationEntriesToDelete->clear();
769                         transactionPartsSent->clear();
770
                            // newSlots is dereferenced without a NULL check: both non-throwing
                            // paths of sendSlotsToServer() store a non-NULL array in it.
771                         if (newSlots->length() != 0) {
772                                 // insert into the local block chain
773                                 validateAndUpdate(newSlots, true);
774                         }
775                 }
776         } catch (ServerException *e) {
                    // Every exception type except ServerException_TypeInputTimeout is
                    // handled as "nothing reached the server"; the timeout type is handled
                    // as a partial send that the next call has to resolve.
777                 if (e->getType() != ServerException_TypeInputTimeout) {
778                         // Nothing was able to be sent to the server so just clear these data structures
779                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
780                         while (trit->hasNext()) {
781                                 Transaction *transaction = trit->next();
782                                 transaction->resetNextPartToSend();
783
784                                 // Set the transaction sequence number back to nothing
785                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
786                                         transaction->setSequenceNumber(-1);
787                                 }
788                         }
789                         delete trit;
790                 } else {
791                         // There was a partial send to the server
792                         hadPartialSendToServer = true;
793
794                         // Rewind every transaction of this attempt and flag it as having hit a server failure
795                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
796                         while (trit->hasNext()) {
797                                 Transaction *transaction = trit->next();
798                                 transaction->resetNextPartToSend();
799                                 transaction->setServerFailure();
800                         }
801                         delete trit;
802                 }
803
804                 pendingSendArbitrationEntriesToDelete->clear();
805                 transactionPartsSent->clear();
806
                    // Hand the same exception object on to the caller.
807                 throw e;
808         }
809
810         return newKey == NULL;
811 }
812
813 bool Table::updateFromLocal(int64_t machineId) {
814         if (!localCommunicationTable->contains(machineId))
815                 return false;
816
817         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
818
819         // Get the size of the send data
820         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
821
822         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
823         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
824                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
825         }
826
827         Array<char> *sendData = new Array<char>(sendDataSize);
828         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
829
830         // Encode the data
831         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
832         bbEncode->putInt(0);
833
834         // Send by local
835         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
836         localSequenceNumber++;
837
838         if (returnData == NULL) {
839                 // Could not contact server
840                 return false;
841         }
842
843         // Decode the data
844         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
845         int numberOfEntries = bbDecode->getInt();
846
847         for (int i = 0; i < numberOfEntries; i++) {
848                 char type = bbDecode->get();
849                 if (type == TypeAbort) {
850                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
851                         processEntry(abort);
852                 } else if (type == TypeCommitPart) {
853                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
854                         processEntry(commitPart);
855                 }
856         }
857
858         updateLiveStateFromLocal();
859
860         return true;
861 }
862
863 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
864
865         // Get the devices local communications
866         if (!localCommunicationTable->contains(transaction->getArbitrator()))
867                 return Pair<bool, bool>(true, false);
868
869         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
870
871         // Get the size of the send data
872         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
873         {
874                 Vector<TransactionPart *> *tParts = transaction->getParts();
875                 uint tPartsSize = tParts->size();
876                 for (uint i = 0; i < tPartsSize; i++) {
877                         TransactionPart *part = tParts->get(i);
878                         sendDataSize += part->getSize();
879                 }
880         }
881
882         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
883         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
884                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
885         }
886
887         // Make the send data size
888         Array<char> *sendData = new Array<char>(sendDataSize);
889         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
890
891         // Encode the data
892         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
893         bbEncode->putInt(transaction->getParts()->size());
894         {
895                 Vector<TransactionPart *> *tParts = transaction->getParts();
896                 uint tPartsSize = tParts->size();
897                 for (uint i = 0; i < tPartsSize; i++) {
898                         TransactionPart *part = tParts->get(i);
899                         part->encode(bbEncode);
900                 }
901         }
902
903         // Send by local
904         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
905         localSequenceNumber++;
906
907         if (returnData == NULL) {
908                 // Could not contact server
909                 return Pair<bool, bool>(true, false);
910         }
911
912         // Decode the data
913         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
914         bool didCommit = bbDecode->get() == 1;
915         bool couldArbitrate = bbDecode->get() == 1;
916         int numberOfEntries = bbDecode->getInt();
917         bool foundAbort = false;
918
919         for (int i = 0; i < numberOfEntries; i++) {
920                 char type = bbDecode->get();
921                 if (type == TypeAbort) {
922                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
923
924                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
925                                 foundAbort = true;
926                         }
927
928                         processEntry(abort);
929                 } else if (type == TypeCommitPart) {
930                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
931                         processEntry(commitPart);
932                 }
933         }
934
935         updateLiveStateFromLocal();
936
937         if (couldArbitrate) {
938                 TransactionStatus *status =  transaction->getTransactionStatus();
939                 if (didCommit) {
940                         status->setStatus(TransactionStatus_StatusCommitted);
941                 } else {
942                         status->setStatus(TransactionStatus_StatusAborted);
943                 }
944         } else {
945                 TransactionStatus *status =  transaction->getTransactionStatus();
946                 if (foundAbort) {
947                         status->setStatus(TransactionStatus_StatusAborted);
948                 } else {
949                         status->setStatus(TransactionStatus_StatusCommitted);
950                 }
951         }
952
953         return Pair<bool, bool>(false, true);
954 }
955
956 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
957         // Decode the data
958         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
959         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
960         int numberOfParts = bbDecode->getInt();
961
962         // If we did commit a transaction or not
963         bool didCommit = false;
964         bool couldArbitrate = false;
965
966         if (numberOfParts != 0) {
967
968                 // decode the transaction
969                 Transaction *transaction = new Transaction();
970                 for (int i = 0; i < numberOfParts; i++) {
971                         bbDecode->get();
972                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
973                         transaction->addPartDecode(newPart);
974                 }
975
976                 // Arbitrate on transaction and pull relevant return data
977                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
978                 couldArbitrate = localArbitrateReturn.getFirst();
979                 didCommit = localArbitrateReturn.getSecond();
980
981                 updateLiveStateFromLocal();
982
983                 // Transaction was sent to the server so keep track of it to prevent double commit
984                 if (transaction->getSequenceNumber() != -1) {
985                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
986                 }
987         }
988
989         // The data to send back
990         int returnDataSize = 0;
991         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
992
993         // Get the aborts to send back
994         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
995         {
996                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
997                 while (abortit->hasNext())
998                         abortLocalSequenceNumbers->add(abortit->next());
999                 delete abortit;
1000         }
1001
1002         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1003
1004         uint asize = abortLocalSequenceNumbers->size();
1005         for (uint i = 0; i < asize; i++) {
1006                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1007                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1008                         continue;
1009                 }
1010
1011                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1012                 unseenArbitrations->add(abort);
1013                 returnDataSize += abort->getSize();
1014         }
1015
1016         // Get the commits to send back
1017         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1018         if (commitForClientTable != NULL) {
1019                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1020                 {
1021                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1022                         while (commitit->hasNext())
1023                                 commitLocalSequenceNumbers->add(commitit->next());
1024                         delete commitit;
1025                 }
1026                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1027
1028                 uint clsSize = commitLocalSequenceNumbers->size();
1029                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1030                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1031                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1032
1033                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1034                                 continue;
1035                         }
1036
1037                         {
1038                                 Vector<CommitPart *> *parts = commit->getParts();
1039                                 uint nParts = parts->size();
1040                                 for (uint i = 0; i < nParts; i++) {
1041                                         CommitPart *commitPart = parts->get(i);
1042                                         unseenArbitrations->add(commitPart);
1043                                         returnDataSize += commitPart->getSize();
1044                                 }
1045                         }
1046                 }
1047         }
1048
1049         // Number of arbitration entries to decode
1050         returnDataSize += 2 * sizeof(int32_t);
1051
1052         // bool of did commit or not
1053         if (numberOfParts != 0) {
1054                 returnDataSize += sizeof(char);
1055         }
1056
1057         // Data to send Back
1058         Array<char> *returnData = new Array<char>(returnDataSize);
1059         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1060
1061         if (numberOfParts != 0) {
1062                 if (didCommit) {
1063                         bbEncode->put((char)1);
1064                 } else {
1065                         bbEncode->put((char)0);
1066                 }
1067                 if (couldArbitrate) {
1068                         bbEncode->put((char)1);
1069                 } else {
1070                         bbEncode->put((char)0);
1071                 }
1072         }
1073
1074         bbEncode->putInt(unseenArbitrations->size());
1075         uint size = unseenArbitrations->size();
1076         for (uint i = 0; i < size; i++) {
1077                 Entry *entry = unseenArbitrations->get(i);
1078                 entry->encode(bbEncode);
1079         }
1080
1081         localSequenceNumber++;
1082         return returnData;
1083 }
1084
1085 /** Checks whether a given slot was sent using new slots in
1086                 array. Returns true if sent and false otherwise.  */
1087
1088 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1089         uint size = array->length();
1090         for (uint i = 0; i < size; i++) {
1091                 Slot *s = array->get(i);
1092                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1093                         return true;
1094                 }
1095         }
1096         
1097         //Also need to see if other machines acknowledged our message
1098         for (uint i = 0; i < size; i++) {
1099                 Slot *s = array->get(i);
1100                 
1101                 // Process each entry in the slot
1102                 Vector<Entry *> *entries = s->getEntries();
1103                 uint eSize = entries->size();
1104                 for (uint ei = 0; ei < eSize; ei++) {
1105                         Entry *entry = entries->get(ei);
1106                         
1107                         if (entry->getType() == TypeLastMessage) {
1108                                 LastMessage *lastMessage = (LastMessage *)entry;
1109                                 
1110                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1111                                         return true;
1112                                 }
1113                         }
1114                 }
1115         }
1116         //Not found
1117         return false;
1118 }
1119
1120 /** Method tries to send slot to server.  Returns status in tuple.
1121                 isInserted returns whether last un-acked send (if any) was
1122                 successful.  Returns whether send was confirmed.x
1123  */
1124
1125 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1126         attemptedToSendToServer = true;
1127
1128         *array = cloud->putSlot(slot, newSize);
1129         if (*array == NULL) {
1130                 *array = new Array<Slot *>(1);
1131                 (*array)->set(0, slot);
1132                 rejectedSlotVector->clear();
1133                 *isInserted = false;
1134                 return true;
1135         } else {
1136                 if ((*array)->length() == 0) {
1137                         throw new Error("Server Error: Did not send any slots");
1138                 }
1139
1140                 if (hadPartialSendToServer) {
1141                         *isInserted = checkSend(*array, slot);
1142
1143                         if (!(*isInserted)) {
1144                                 rejectedSlotVector->add(slot->getSequenceNumber());
1145                         }
1146                         
1147                         return false;
1148                 } else {
1149                         rejectedSlotVector->add(slot->getSequenceNumber());
1150                         *isInserted = false;
1151                         return false;
1152                 }
1153         }
1154 }
1155
1156 /**
1157  * Returns true if a resize was needed but not done.
      *
      * Populates `slot` in this order: an optional TableStatus entry (when
      * resizing), rejected-message entries, mandatory rescue entries, the new
      * key entry, pending arbitration parts, parts of the first pending
      * transaction, and finally optional rescue entries.
      *
      * @param slot slot to fill.
      * @param resize request a resize; it is forced to true when liveSlotCount
      *        exceeds bufferResizeThreshold.
      * @param newKeyEntry new-key entry to add if it fits, or NULL.
      * @param newSize out: 0 when no resize is done, otherwise numberOfSlots
      *        multiplied by Table_RESIZE_MULTIPLE.
      * @param insertedKey out: whether newKeyEntry was added to the slot. It is
      *        not written when the function returns true early.
      * @return true if the mandatory rescue reported that a resize is needed
      *         while no resize was being done; false otherwise.
      *
      * Side effects visible here: transactionPartsSent and
      * pendingSendArbitrationEntriesToDelete are cleared and refilled with what
      * was placed into `slot`.
1158  */
1159 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1160         newSize = 0;//special value to indicate no resize
1161         if (liveSlotCount > bufferResizeThreshold) {
1162                 resize = true;//Resize is forced
1163         }
1164
1165         if (resize) {
                     // Announce the new size through a TableStatus entry in this slot
1166                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1167                 TableStatus *status = new TableStatus(slot, newSize);
1168                 slot->addEntry(status);
1169         }
1170
1171         // Fill with rejected slots first before doing anything else
1172         doRejectedMessages(slot);
1173
1174         // Do mandatory rescue of entries
1175         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1176
1177         // Extract working variables
1178         bool needsResize = mandatoryRescueReturn.getFirst();
1179         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1180         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1181
1182         if (needsResize && !resize) {
1183                 // We need to resize but we are not resizing so return true to force on retry
                     // Note: insertedKey has not been assigned on this path.
1184                 return true;
1185         }
1186
1187         insertedKey = false;
1188         if (newKeyEntry != NULL) {
                     // The entry is re-pointed at this slot even if it ends up not fitting
1189                 newKeyEntry->setSlot(slot);
1190                 if (slot->hasSpace(newKeyEntry)) {
1191                         slot->addEntry(newKeyEntry);
1192                         insertedKey = true;
1193                 }
1194         }
1195
1196         // Clear the transactions, aborts and commits that were sent previously
1197         transactionPartsSent->clear();
1198         pendingSendArbitrationEntriesToDelete->clear();
1199         uint size = pendingSendArbitrationRounds->size();
1200         for (uint i = 0; i < size; i++) {
1201                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1202                 bool isFull = false;
1203                 round->generateParts();
1204                 Vector<Entry *> *parts = round->getParts();
1205
1206                 // Insert pending arbitration data
1207                 uint vsize = parts->size();
1208                 for (uint vi = 0; vi < vsize; vi++) {
1209                         Entry *arbitrationData = parts->get(vi);
1210
1211                         // If it is an abort then we need to set some information
                             // (this happens before the space check, so it is also done for an
                             // abort that turns out not to fit)
1212                         if (arbitrationData->getType() == TypeAbort) {
1213                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1214                         }
1215
1216                         if (!slot->hasSpace(arbitrationData)) {
1217                                 // No space so cant do anything else with these data entries
1218                                 isFull = true;
1219                                 break;
1220                         }
1221
1222                         // Add to this current slot and add it to entries to delete
1223                         slot->addEntry(arbitrationData);
1224                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1225                 }
1226
                     // Stop looking at further rounds once one entry did not fit
1227                 if (isFull) {
1228                         break;
1229                 }
1230         }
1231
             // Only the transaction at the head of the pending queue is considered
1232         if (pendingTransactionQueue->size() > 0) {
1233                 Transaction *transaction = pendingTransactionQueue->get(0);
1234                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1235                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1236                         transaction->setSequenceNumber(slot->getSequenceNumber());
1237                 }
1238
                     // Add parts until the transaction has none left or the slot is full
1239                 while (true) {
1240                         TransactionPart *part = transaction->getNextPartToSend();
1241                         if (part == NULL) {
1242                                 // Ran out of parts to send for this transaction so move on
1243                                 break;
1244                         }
1245
1246                         if (slot->hasSpace(part)) {
1247                                 slot->addEntry(part);
                                     // Record the part number under this transaction
1248                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1249                                 if (partsSent == NULL) {
1250                                         partsSent = new Vector<int32_t>();
1251                                         transactionPartsSent->put(transaction, partsSent);
1252                                 }
1253                                 partsSent->add(part->getPartNumber());
                                     // NOTE(review): partsSent is already mapped to this transaction at
                                     // this point, so this put looks redundant -- confirm before removing.
1254                                 transactionPartsSent->put(transaction, partsSent);
1255                         } else {
1256                                 break;
1257                         }
1258                 }
1259         }
1260
1261         // Fill the remainder of the slot with rescue data
1262         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1263
1264         return false;
1265 }
1266
1267 void Table::doRejectedMessages(Slot *s) {
1268         if (!rejectedSlotVector->isEmpty()) {
1269                 /* TODO: We should avoid generating a rejected message entry if
1270                  * there is already a sufficient entry in the queue (e->g->,
1271                  * equalsto value of true and same sequence number)->  */
1272
1273                 int64_t old_seqn = rejectedSlotVector->get(0);
1274                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1275                         int64_t new_seqn = rejectedSlotVector->lastElement();
1276                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1277                         s->addEntry(rm);
1278                 } else {
1279                         int64_t prev_seqn = -1;
1280                         uint i = 0;
1281                         /* Go through list of missing messages */
1282                         for (; i < rejectedSlotVector->size(); i++) {
1283                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1284                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1285                                 if (s_msg != NULL)
1286                                         break;
1287                                 prev_seqn = curr_seqn;
1288                         }
1289                         /* Generate rejected message entry for missing messages */
1290                         if (prev_seqn != -1) {
1291                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1292                                 s->addEntry(rm);
1293                         }
1294                         /* Generate rejected message entries for present messages */
1295                         for (; i < rejectedSlotVector->size(); i++) {
1296                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1297                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1298                                 int64_t machineid = s_msg->getMachineID();
1299                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1300                                 s->addEntry(rm);
1301                         }
1302                 }
1303         }
1304 }
1305
1306 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1307         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1308         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1309         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1310                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1311         }
1312
1313         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1314         bool seenLiveSlot = false;
1315         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1316         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1317
1318
1319         // Mandatory Rescue
1320         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1321                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1322                 // Push slot number forward
1323                 if (!seenLiveSlot) {
1324                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1325                 }
1326
1327                 if (!previousSlot->isLive()) {
1328                         continue;
1329                 }
1330
1331                 // We have seen a live slot
1332                 seenLiveSlot = true;
1333
1334                 // Get all the live entries for a slot
1335                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1336
1337                 // Iterate over all the live entries and try to rescue them
1338                 uint lESize = liveEntries->size();
1339                 for (uint i = 0; i < lESize; i++) {
1340                         Entry *liveEntry = liveEntries->get(i);
1341                         if (slot->hasSpace(liveEntry)) {
1342                                 // Enough space to rescue the entry
1343                                 slot->addEntry(liveEntry);
1344                         } else if (currentSequenceNumber == firstIfFull) {
1345                                 //if there's no space but the entry is about to fall off the queue
1346                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1347                         }
1348                 }
1349         }
1350
1351         // Did not resize
1352         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1353 }
1354
1355 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1356         /* now go through live entries from least to greatest sequence number until
1357          * either all live slots added, or the slot doesn't have enough room
1358          * for SKIP_THRESHOLD consecutive entries*/
1359         int skipcount = 0;
1360         int64_t newestseqnum = buffer->getNewestSeqNum();
1361         for (; seqn <= newestseqnum; seqn++) {
1362                 Slot *prevslot = buffer->getSlot(seqn);
1363                 //Push slot number forward
1364                 if (!seenliveslot)
1365                         oldestLiveSlotSequenceNumver = seqn;
1366
1367                 if (!prevslot->isLive())
1368                         continue;
1369                 seenliveslot = true;
1370                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1371                 uint lESize = liveentries->size();
1372                 for (uint i = 0; i < lESize; i++) {
1373                         Entry *liveentry = liveentries->get(i);
1374                         if (s->hasSpace(liveentry))
1375                                 s->addEntry(liveentry);
1376                         else {
1377                                 skipcount++;
1378                                 if (skipcount > Table_SKIP_THRESHOLD)
1379                                         goto donesearch;
1380                         }
1381                 }
1382         }
1383 donesearch:
1384         ;
1385 }
1386
1387 /**
1388  * Checks for malicious activity and updates the local copy of the block chain->
1389  */
1390 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1391         // The cloud communication layer has checked slot HMACs already
1392         // before decoding
1393         if (newSlots->length() == 0) {
1394                 return;
1395         }
1396
1397         // Make sure all slots are newer than the last largest slot this
1398         // client has seen
1399         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1400         if (firstSeqNum <= sequenceNumber) {
1401                 throw new Error("Server Error: Sent older slots!");
1402         }
1403
1404         // Create an object that can access both new slots and slots in our
1405         // local chain without committing slots to our local chain
1406         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1407
1408         // Check that the HMAC chain is not broken
1409         checkHMACChain(indexer, newSlots);
1410
1411         // Set to keep track of messages from clients
1412         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1413         {
1414                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1415                 while (lmit->hasNext())
1416                         machineSet->add(lmit->next());
1417                 delete lmit;
1418         }
1419
1420         // Process each slots data
1421         {
1422                 uint numSlots = newSlots->length();
1423                 for (uint i = 0; i < numSlots; i++) {
1424                         Slot *slot = newSlots->get(i);
1425                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1426                         updateExpectedSize();
1427                 }
1428         }
1429
1430         // If there is a gap, check to see if the server sent us
1431         // everything->
1432         if (firstSeqNum != (sequenceNumber + 1)) {
1433
1434                 // Check the size of the slots that were sent down by the server->
1435                 // Can only check the size if there was a gap
1436                 checkNumSlots(newSlots->length());
1437
1438                 // Since there was a gap every machine must have pushed a slot or
1439                 // must have a last message message-> If not then the server is
1440                 // hiding slots
1441                 if (!machineSet->isEmpty()) {
1442                         throw new Error("Missing record for machines: ");
1443                 }
1444         }
1445
1446         // Update the size of our local block chain->
1447         commitNewMaxSize();
1448
1449         // Commit new to slots to the local block chain->
1450         {
1451                 uint numSlots = newSlots->length();
1452                 for (uint i = 0; i < numSlots; i++) {
1453                         Slot *slot = newSlots->get(i);
1454
1455                         // Insert this slot into our local block chain copy->
1456                         buffer->putSlot(slot);
1457
1458                         // Keep track of how many slots are currently live (have live data
1459                         // in them)->
1460                         liveSlotCount++;
1461                 }
1462         }
1463         // Get the sequence number of the latest slot in the system
1464         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1465         updateLiveStateFromServer();
1466
1467         // No Need to remember after we pulled from the server
1468         offlineTransactionsCommittedAndAtServer->clear();
1469
1470         // This is invalidated now
1471         hadPartialSendToServer = false;
1472 }
1473
1474 void Table::updateLiveStateFromServer() {
1475         // Process the new transaction parts
1476         processNewTransactionParts();
1477
1478         // Do arbitration on new transactions that were received
1479         arbitrateFromServer();
1480
1481         // Update all the committed keys
1482         bool didCommitOrSpeculate = updateCommittedTable();
1483
1484         // Delete the transactions that are now dead
1485         updateLiveTransactionsAndStatus();
1486
1487         // Do speculations
1488         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1489         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1490 }
1491
1492 void Table::updateLiveStateFromLocal() {
1493         // Update all the committed keys
1494         bool didCommitOrSpeculate = updateCommittedTable();
1495
1496         // Delete the transactions that are now dead
1497         updateLiveTransactionsAndStatus();
1498
1499         // Do speculations
1500         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1501         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1502 }
1503
1504 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1505         int64_t prevslots = firstSequenceNumber;
1506
1507         if (didFindTableStatus) {
1508         } else {
1509                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1510         }
1511
1512         didFindTableStatus = true;
1513         currMaxSize = numberOfSlots;
1514 }
1515
1516 void Table::updateExpectedSize() {
1517         expectedsize++;
1518
1519         if (expectedsize > currMaxSize) {
1520                 expectedsize = currMaxSize;
1521         }
1522 }
1523
1524
1525 /**
1526  * Check the size of the block chain to make sure there are enough
1527  * slots sent back by the server-> This is only called when we have a
1528  * gap between the slots that we have locally and the slots sent by
1529  * the server therefore in the slots sent by the server there will be
1530  * at least 1 Table status message
1531  */
1532 void Table::checkNumSlots(int numberOfSlots) {
1533         if (numberOfSlots != expectedsize) {
1534                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1535         }
1536 }
1537
1538 /**
1539  * Update the size of of the local buffer if it is needed->
1540  */
1541 void Table::commitNewMaxSize() {
1542         didFindTableStatus = false;
1543
1544         // Resize the local slot buffer
1545         if (numberOfSlots != currMaxSize) {
1546                 buffer->resize((int32_t)currMaxSize);
1547         }
1548
1549         // Change the number of local slots to the new size
1550         numberOfSlots = (int32_t)currMaxSize;
1551
1552         // Recalculate the resize threshold since the size of the local
1553         // buffer has changed
1554         setResizeThreshold();
1555 }
1556
1557 /**
1558  * Process the new transaction parts from this latest round of slots
1559  * received from the server
1560  */
1561 void Table::processNewTransactionParts() {
1562
1563         if (newTransactionParts->size() == 0) {
1564                 // Nothing new to process
1565                 return;
1566         }
1567
1568         // Iterate through all the machine Ids that we received new parts
1569         // for
1570         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1571         while (tpit->hasNext()) {
1572                 int64_t machineId = tpit->next();
1573                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1574
1575                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1576                 // Iterate through all the parts for that machine Id
1577                 while (ptit->hasNext()) {
1578                         Pair<int64_t, int32_t> *partId = ptit->next();
1579                         TransactionPart *part = parts->get(partId);
1580
1581                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1582                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1583                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1584                                         // Set dead the transaction part
1585                                         part->setDead();
1586                                         continue;
1587                                 }
1588                         }
1589
1590                         // Get the transaction object for that sequence number
1591                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1592
1593                         if (transaction == NULL) {
1594                                 // This is a new transaction that we dont have so make a new one
1595                                 transaction = new Transaction();
1596
1597                                 // Insert this new transaction into the live tables
1598                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1599                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1600                         }
1601
1602                         // Add that part to the transaction
1603                         transaction->addPartDecode(part);
1604                 }
1605                 delete ptit;
1606         }
1607         delete tpit;
1608         // Clear all the new transaction parts in preparation for the next
1609         // time the server sends slots
1610         newTransactionParts->clear();
1611 }
1612
1613 void Table::arbitrateFromServer() {
1614         if (liveTransactionBySequenceNumberTable->size() == 0) {
1615                 // Nothing to arbitrate on so move on
1616                 return;
1617         }
1618
1619         // Get the transaction sequence numbers and sort from oldest to newest
1620         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1621         {
1622                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1623                 while (trit->hasNext())
1624                         transactionSequenceNumbers->add(trit->next());
1625                 delete trit;
1626         }
1627         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1628
1629         // Collection of key value pairs that are
1630         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1631
1632         // The last transaction arbitrated on
1633         int64_t lastTransactionCommitted = -1;
1634         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1635         uint tsnSize = transactionSequenceNumbers->size();
1636         for (uint i = 0; i < tsnSize; i++) {
1637                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1638                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1639
1640                 // Check if this machine arbitrates for this transaction if not
1641                 // then we cant arbitrate this transaction
1642                 if (transaction->getArbitrator() != localMachineId) {
1643                         continue;
1644                 }
1645
1646                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1647                         continue;
1648                 }
1649
1650                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1651                         // We have seen this already locally so dont commit again
1652                         continue;
1653                 }
1654
1655
1656                 if (!transaction->isComplete()) {
1657                         // Will arbitrate in incorrect order if we continue so just break
1658                         // Most likely this
1659                         break;
1660                 }
1661
1662
1663                 // update the largest transaction seen by arbitrator from server
1664                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1665                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1666                 } else {
1667                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1668                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1669                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1670                         }
1671                 }
1672
1673                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1674                         // Guard evaluated as true
1675
1676                         // Update the local changes so we can make the commit
1677                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1678                         while (kvit->hasNext()) {
1679                                 KeyValue *kv = kvit->next();
1680                                 speculativeTableTmp->put(kv->getKey(), kv);
1681                         }
1682                         delete kvit;
1683
1684                         // Update what the last transaction committed was for use in batch commit
1685                         lastTransactionCommitted = transactionSequenceNumber;
1686                 } else {
1687                         // Guard evaluated was false so create abort
1688                         // Create the abort
1689                         Abort *newAbort = new Abort(NULL,
1690                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1691                                                                                                                                         transaction->getSequenceNumber(),
1692                                                                                                                                         transaction->getMachineId(),
1693                                                                                                                                         transaction->getArbitrator(),
1694                                                                                                                                         localArbitrationSequenceNumber);
1695                         localArbitrationSequenceNumber++;
1696                         generatedAborts->add(newAbort);
1697
1698                         // Insert the abort so we can process
1699                         processEntry(newAbort);
1700                 }
1701
1702                 lastSeqNumArbOn = transactionSequenceNumber;
1703         }
1704
1705         Commit *newCommit = NULL;
1706
1707         // If there is something to commit
1708         if (speculativeTableTmp->size() != 0) {
1709                 // Create the commit and increment the commit sequence number
1710                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1711                 localArbitrationSequenceNumber++;
1712
1713                 // Add all the new keys to the commit
1714                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1715                 while (spit->hasNext()) {
1716                         IoTString *string = spit->next();
1717                         KeyValue *kv = speculativeTableTmp->get(string);
1718                         newCommit->addKV(kv);
1719                 }
1720                 delete spit;
1721
1722                 // create the commit parts
1723                 newCommit->createCommitParts();
1724
1725                 // Append all the commit parts to the end of the pending queue
1726                 // waiting for sending to the server
1727                 // Insert the commit so we can process it
1728                 Vector<CommitPart *> *parts = newCommit->getParts();
1729                 uint partsSize = parts->size();
1730                 for (uint i = 0; i < partsSize; i++) {
1731                         CommitPart *commitPart = parts->get(i);
1732                         processEntry(commitPart);
1733                 }
1734         }
1735
1736         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1737                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1738                 pendingSendArbitrationRounds->add(arbitrationRound);
1739
1740                 if (compactArbitrationData()) {
1741                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1742                         if (newArbitrationRound->getCommit() != NULL) {
1743                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1744                                 uint partsSize = parts->size();
1745                                 for (uint i = 0; i < partsSize; i++) {
1746                                         CommitPart *commitPart = parts->get(i);
1747                                         processEntry(commitPart);
1748                                 }
1749                         }
1750                 }
1751         }
1752 }
1753
1754 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1755
1756         // Check if this machine arbitrates for this transaction if not then
1757         // we cant arbitrate this transaction
1758         if (transaction->getArbitrator() != localMachineId) {
1759                 return Pair<bool, bool>(false, false);
1760         }
1761
1762         if (!transaction->isComplete()) {
1763                 // Will arbitrate in incorrect order if we continue so just break
1764                 // Most likely this
1765                 return Pair<bool, bool>(false, false);
1766         }
1767
1768         if (transaction->getMachineId() != localMachineId) {
1769                 // dont do this check for local transactions
1770                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1771                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1772                                 // We've have already seen this from the server
1773                                 return Pair<bool, bool>(false, false);
1774                         }
1775                 }
1776         }
1777
1778         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1779                 // Guard evaluated as true Create the commit and increment the
1780                 // commit sequence number
1781                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1782                 localArbitrationSequenceNumber++;
1783
1784                 // Update the local changes so we can make the commit
1785                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1786                 while (kvit->hasNext()) {
1787                         KeyValue *kv = kvit->next();
1788                         newCommit->addKV(kv);
1789                 }
1790                 delete kvit;
1791
1792                 // create the commit parts
1793                 newCommit->createCommitParts();
1794
1795                 // Append all the commit parts to the end of the pending queue
1796                 // waiting for sending to the server
1797                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1798                 pendingSendArbitrationRounds->add(arbitrationRound);
1799
1800                 if (compactArbitrationData()) {
1801                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1802                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1803                         uint partsSize = parts->size();
1804                         for (uint i = 0; i < partsSize; i++) {
1805                                 CommitPart *commitPart = parts->get(i);
1806                                 processEntry(commitPart);
1807                         }
1808                 } else {
1809                         // Insert the commit so we can process it
1810                         Vector<CommitPart *> *parts = newCommit->getParts();
1811                         uint partsSize = parts->size();
1812                         for (uint i = 0; i < partsSize; i++) {
1813                                 CommitPart *commitPart = parts->get(i);
1814                                 processEntry(commitPart);
1815                         }
1816                 }
1817
1818                 if (transaction->getMachineId() == localMachineId) {
1819                         TransactionStatus *status = transaction->getTransactionStatus();
1820                         if (status != NULL) {
1821                                 status->setStatus(TransactionStatus_StatusCommitted);
1822                         }
1823                 }
1824
1825                 updateLiveStateFromLocal();
1826                 return Pair<bool, bool>(true, true);
1827         } else {
1828                 if (transaction->getMachineId() == localMachineId) {
1829                         // For locally created messages update the status
1830                         // Guard evaluated was false so create abort
1831                         TransactionStatus *status = transaction->getTransactionStatus();
1832                         if (status != NULL) {
1833                                 status->setStatus(TransactionStatus_StatusAborted);
1834                         }
1835                 } else {
1836                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1837
1838                         // Create the abort
1839                         Abort *newAbort = new Abort(NULL,
1840                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1841                                                                                                                                         -1,
1842                                                                                                                                         transaction->getMachineId(),
1843                                                                                                                                         transaction->getArbitrator(),
1844                                                                                                                                         localArbitrationSequenceNumber);
1845                         localArbitrationSequenceNumber++;
1846                         addAbortSet->add(newAbort);
1847
1848                         // Append all the commit parts to the end of the pending queue
1849                         // waiting for sending to the server
1850                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1851                         pendingSendArbitrationRounds->add(arbitrationRound);
1852
1853                         if (compactArbitrationData()) {
1854                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1855
1856                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1857                                 uint partsSize = parts->size();
1858                                 for (uint i = 0; i < partsSize; i++) {
1859                                         CommitPart *commitPart = parts->get(i);
1860                                         processEntry(commitPart);
1861                                 }
1862                         }
1863                 }
1864
1865                 updateLiveStateFromLocal();
1866                 return Pair<bool, bool>(true, false);
1867         }
1868 }
1869
1870 /**
1871  * Compacts the arbitration data by merging commits and aggregating
1872  * aborts so that a single large push of commits can be done instead
1873  * of many small updates
1874  */
1875 bool Table::compactArbitrationData() {
1876         if (pendingSendArbitrationRounds->size() < 2) {
1877                 // Nothing to compact so do nothing
1878                 return false;
1879         }
1880
1881         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1882         if (lastRound->getDidSendPart()) {
1883                 return false;
1884         }
1885
1886         bool hadCommit = (lastRound->getCommit() == NULL);
1887         bool gotNewCommit = false;
1888
1889         uint numberToDelete = 1;
1890         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1891                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1892
1893                 if (round->isFull() || round->getDidSendPart()) {
1894                         // Stop since there is a part that cannot be compacted and we
1895                         // need to compact in order
1896                         break;
1897                 }
1898
1899                 if (round->getCommit() == NULL) {
1900                         // Try compacting aborts only
1901                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1902                         if (newSize > ArbitrationRound_MAX_PARTS) {
1903                                 // Cant compact since it would be too large
1904                                 break;
1905                         }
1906                         lastRound->addAborts(round->getAborts());
1907                 } else {
1908                         // Create a new larger commit
1909                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1910                         localArbitrationSequenceNumber++;
1911
1912                         // Create the commit parts so that we can count them
1913                         newCommit->createCommitParts();
1914
1915                         // Calculate the new size of the parts
1916                         int newSize = newCommit->getNumberOfParts();
1917                         newSize += lastRound->getAbortsCount();
1918                         newSize += round->getAbortsCount();
1919
1920                         if (newSize > ArbitrationRound_MAX_PARTS) {
1921                                 // Cant compact since it would be too large
1922                                 break;
1923                         }
1924
1925                         // Set the new compacted part
1926                         lastRound->setCommit(newCommit);
1927                         lastRound->addAborts(round->getAborts());
1928                         gotNewCommit = true;
1929                 }
1930
1931                 numberToDelete++;
1932         }
1933
1934         if (numberToDelete != 1) {
1935                 // If there is a compaction
1936                 // Delete the previous pieces that are now in the new compacted piece
1937                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1938                         pendingSendArbitrationRounds->clear();
1939                 } else {
1940                         for (uint i = 0; i < numberToDelete; i++) {
1941                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1942                         }
1943                 }
1944
1945                 // Add the new compacted into the pending to send list
1946                 pendingSendArbitrationRounds->add(lastRound);
1947
1948                 // Should reinsert into the commit processor
1949                 if (hadCommit && gotNewCommit) {
1950                         return true;
1951                 }
1952         }
1953
1954         return false;
1955 }
1956
1957 /**
1958  * Update all the commits and the committed tables, sets dead the dead
1959  * transactions
1960  */
1961 bool Table::updateCommittedTable() {
1962         if (newCommitParts->size() == 0) {
1963                 // Nothing new to process
1964                 return false;
1965         }
1966
1967         // Iterate through all the machine Ids that we received new parts for
1968         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
1969         while (partsit->hasNext()) {
1970                 int64_t machineId = partsit->next();
1971                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
1972
1973                 // Iterate through all the parts for that machine Id
1974                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
1975                 while (pairit->hasNext()) {
1976                         Pair<int64_t, int32_t> *partId = pairit->next();
1977                         CommitPart *part = parts->get(partId);
1978
1979                         // Get the transaction object for that sequence number
1980                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1981
1982                         if (commitForClientTable == NULL) {
1983                                 // This is the first commit from this device
1984                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1985                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1986                         }
1987
1988                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1989
1990                         if (commit == NULL) {
1991                                 // This is a new commit that we dont have so make a new one
1992                                 commit = new Commit();
1993
1994                                 // Insert this new commit into the live tables
1995                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1996                         }
1997
1998                         // Add that part to the commit
1999                         commit->addPartDecode(part);
2000                 }
2001                 delete pairit;
2002         }
2003         delete partsit;
2004
2005         // Clear all the new commits parts in preparation for the next time
2006         // the server sends slots
2007         newCommitParts->clear();
2008
2009         // If we process a new commit keep track of it for future use
2010         bool didProcessANewCommit = false;
2011
2012         // Process the commits one by one
2013         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2014         while (liveit->hasNext()) {
2015                 int64_t arbitratorId = liveit->next();
2016
2017                 // Get all the commits for a specific arbitrator
2018                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2019
2020                 // Sort the commits in order
2021                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2022                 {
2023                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2024                         while (clientit->hasNext())
2025                                 commitSequenceNumbers->add(clientit->next());
2026                         delete clientit;
2027                 }
2028
2029                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2030
2031                 // Get the last commit seen from this arbitrator
2032                 int64_t lastCommitSeenSequenceNumber = -1;
2033                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2034                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2035                 }
2036
2037                 // Go through each new commit one by one
2038                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2039                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2040                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2041
2042                         // Special processing if a commit is not complete
2043                         if (!commit->isComplete()) {
2044                                 if (i == (commitSequenceNumbers->size() - 1)) {
2045                                         // If there is an incomplete commit and this commit is the
2046                                         // latest one seen then this commit cannot be processed and
2047                                         // there are no other commits
2048                                         break;
2049                                 } else {
2050                                         // This is a commit that was already dead but parts of it
2051                                         // are still in the block chain (not flushed out yet)->
2052                                         // Delete it and move on
2053                                         commit->setDead();
2054                                         commitForClientTable->remove(commit->getSequenceNumber());
2055                                         continue;
2056                                 }
2057                         }
2058
2059                         // Update the last transaction that was updated if we can
2060                         if (commit->getTransactionSequenceNumber() != -1) {
2061                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2062                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2063                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2064                                 }
2065                         }
2066
2067                         // Update the last arbitration data that we have seen so far
2068                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2069                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2070                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2071                                         // Is larger
2072                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2073                                 }
2074                         } else {
2075                                 // Never seen any data from this arbitrator so record the first one
2076                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2077                         }
2078
2079                         // We have already seen this commit before so need to do the
2080                         // full processing on this commit
2081                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2082
2083                                 // Update the last transaction that was updated if we can
2084                                 if (commit->getTransactionSequenceNumber() != -1) {
2085                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2086                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2087                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2088                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2089                                         }
2090                                 }
2091
2092                                 continue;
2093                         }
2094
2095                         // If we got here then this is a brand new commit and needs full
2096                         // processing
2097                         // Get what commits should be edited, these are the commits that
2098                         // have live values for their keys
2099                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2100                         {
2101                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2102                                 while (kvit->hasNext()) {
2103                                         KeyValue *kv = kvit->next();
2104                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2105                                         if (commit != NULL)
2106                                                 commitsToEdit->add(commit);
2107                                 }
2108                                 delete kvit;
2109                         }
2110
2111                         // Update each previous commit that needs to be updated
2112                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2113                         while (commitit->hasNext()) {
2114                                 Commit *previousCommit = commitit->next();
2115
2116                                 // Only bother with live commits (TODO: Maybe remove this check)
2117                                 if (previousCommit->isLive()) {
2118
2119                                         // Update which keys in the old commits are still live
2120                                         {
2121                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2122                                                 while (kvit->hasNext()) {
2123                                                         KeyValue *kv = kvit->next();
2124                                                         previousCommit->invalidateKey(kv->getKey());
2125                                                 }
2126                                                 delete kvit;
2127                                         }
2128
2129                                         // if the commit is now dead then remove it
2130                                         if (!previousCommit->isLive()) {
2131                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2132                                         }
2133                                 }
2134                         }
2135                         delete commitit;
2136
2137                         // Update the last seen sequence number from this arbitrator
2138                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2139                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2140                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2141                                 }
2142                         } else {
2143                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2144                         }
2145
2146                         // We processed a new commit that we havent seen before
2147                         didProcessANewCommit = true;
2148
2149                         // Update the committed table of keys and which commit is using which key
2150                         {
2151                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2152                                 while (kvit->hasNext()) {
2153                                         KeyValue *kv = kvit->next();
2154                                         committedKeyValueTable->put(kv->getKey(), kv);
2155                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2156                                 }
2157                                 delete kvit;
2158                         }
2159                 }
2160         }
2161         delete liveit;
2162
2163         return didProcessANewCommit;
2164 }
2165
2166 /**
2167  * Create the speculative table from transactions that are still live
2168  * and have come from the cloud
2169  */
2170 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2171         if (liveTransactionBySequenceNumberTable->size() == 0) {
2172                 // There is nothing to speculate on
2173                 return false;
2174         }
2175
2176         // Create a list of the transaction sequence numbers and sort them
2177         // from oldest to newest
2178         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2179         {
2180                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2181                 while (trit->hasNext())
2182                         transactionSequenceNumbersSorted->add(trit->next());
2183                 delete trit;
2184         }
2185
2186         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2187
2188         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2189
2190
2191         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2192                 // If there is a gap in the transaction sequence numbers then
2193                 // there was a commit or an abort of a transaction OR there was a
2194                 // new commit (Could be from offline commit) so a redo the
2195                 // speculation from scratch
2196
2197                 // Start from scratch
2198                 speculatedKeyValueTable->clear();
2199                 lastTransactionSequenceNumberSpeculatedOn = -1;
2200                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2201         }
2202
2203         // Remember the front of the transaction list
2204         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2205
2206         // Find where to start arbitration from
2207         uint startIndex = 0;
2208
2209         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2210                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2211                         break;
2212         startIndex++;
2213
2214         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2215                 // Make sure we are not out of bounds
2216                 return false;           // did not speculate
2217         }
2218
2219         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2220         bool didSkip = true;
2221
2222         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2223                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2224                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2225
2226                 if (!transaction->isComplete()) {
2227                         // If there is an incomplete transaction then there is nothing
2228                         // we can do add this transactions arbitrator to the list of
2229                         // arbitrators we should ignore
2230                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2231                         didSkip = true;
2232                         continue;
2233                 }
2234
2235                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2236                         continue;
2237                 }
2238
2239                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2240
2241                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2242                         // Guard evaluated to true so update the speculative table
2243                         {
2244                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2245                                 while (kvit->hasNext()) {
2246                                         KeyValue *kv = kvit->next();
2247                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2248                                 }
2249                                 delete kvit;
2250                         }
2251                 }
2252         }
2253
2254         if (didSkip) {
2255                 // Since there was a skip we need to redo the speculation next time around
2256                 lastTransactionSequenceNumberSpeculatedOn = -1;
2257                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2258         }
2259
2260         // We did some speculation
2261         return true;
2262 }
2263
2264 /**
2265  * Create the pending transaction speculative table from transactions
2266  * that are still in the pending transaction buffer
2267  */
2268 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2269         if (pendingTransactionQueue->size() == 0) {
2270                 // There is nothing to speculate on
2271                 return;
2272         }
2273
2274         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2275                 // need to reset on the pending speculation
2276                 lastPendingTransactionSpeculatedOn = NULL;
2277                 firstPendingTransaction = pendingTransactionQueue->get(0);
2278                 pendingTransactionSpeculatedKeyValueTable->clear();
2279         }
2280
2281         // Find where to start arbitration from
2282         uint startIndex = 0;
2283
2284         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2285                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2286                         break;
2287
2288         if (startIndex >= pendingTransactionQueue->size()) {
2289                 // Make sure we are not out of bounds
2290                 return;
2291         }
2292
2293         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2294                 Transaction *transaction = pendingTransactionQueue->get(i);
2295
2296                 lastPendingTransactionSpeculatedOn = transaction;
2297
2298                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2299                         // Guard evaluated to true so update the speculative table
2300                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2301                         while (kvit->hasNext()) {
2302                                 KeyValue *kv = kvit->next();
2303                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2304                         }
2305                         delete kvit;
2306                 }
2307         }
2308 }
2309
2310 /**
2311  * Set dead and remove from the live transaction tables the
2312  * transactions that are dead
2313  */
2314 void Table::updateLiveTransactionsAndStatus() {
2315         // Go through each of the transactions
2316         {
2317                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2318                 while (iter->hasNext()) {
2319                         int64_t key = iter->next();
2320                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2321
2322                         // Check if the transaction is dead
2323                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2324                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2325                                 // Set dead the transaction
2326                                 transaction->setDead();
2327
2328                                 // Remove the transaction from the live table
2329                                 iter->remove();
2330                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2331                         }
2332                 }
2333                 delete iter;
2334         }
2335
2336         // Go through each of the transactions
2337         {
2338                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2339                 while (iter->hasNext()) {
2340                         int64_t key = iter->next();
2341                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2342
2343                         // Check if the transaction is dead
2344                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2345                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2346                                 // Set committed
2347                                 status->setStatus(TransactionStatus_StatusCommitted);
2348
2349                                 // Remove
2350                                 iter->remove();
2351                         }
2352                 }
2353                 delete iter;
2354         }
2355 }
2356
2357 /**
2358  * Process this slot, entry by entry->  Also update the latest message sent by slot
2359  */
2360 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2361
2362         // Update the last message seen
2363         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2364
2365         // Process each entry in the slot
2366         Vector<Entry *> *entries = slot->getEntries();
2367         uint eSize = entries->size();
2368         for (uint ei = 0; ei < eSize; ei++) {
2369                 Entry *entry = entries->get(ei);
2370                 switch (entry->getType()) {
2371                 case TypeCommitPart:
2372                         processEntry((CommitPart *)entry);
2373                         break;
2374                 case TypeAbort:
2375                         processEntry((Abort *)entry);
2376                         break;
2377                 case TypeTransactionPart:
2378                         processEntry((TransactionPart *)entry);
2379                         break;
2380                 case TypeNewKey:
2381                         processEntry((NewKey *)entry);
2382                         break;
2383                 case TypeLastMessage:
2384                         processEntry((LastMessage *)entry, machineSet);
2385                         break;
2386                 case TypeRejectedMessage:
2387                         processEntry((RejectedMessage *)entry, indexer);
2388                         break;
2389                 case TypeTableStatus:
2390                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2391                         break;
2392                 default:
2393                         throw new Error("Unrecognized type: ");
2394                 }
2395         }
2396 }
2397
2398 /**
2399  * Update the last message that was sent for a machine Id
2400  */
2401 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2402         // Update what the last message received by a machine was
2403         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2404 }
2405
2406 /**
2407  * Add the new key to the arbitrators table and update the set of live
2408  * new keys (in case of a rescued new key message)
2409  */
2410 void Table::processEntry(NewKey *entry) {
2411         // Update the arbitrator table with the new key information
2412         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2413
2414         // Update what the latest live new key is
2415         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2416         if (oldNewKey != NULL) {
2417                 // Delete the old new key messages
2418                 oldNewKey->setDead();
2419         }
2420 }
2421
2422 /**
2423  * Process new table status entries and set dead the old ones as new
2424  * ones come in-> keeps track of the largest and smallest table status
2425  * seen in this current round of updating the local copy of the block
2426  * chain
2427  */
2428 void Table::processEntry(TableStatus *entry, int64_t seq) {
2429         int newNumSlots = entry->getMaxSlots();
2430         updateCurrMaxSize(newNumSlots);
2431         initExpectedSize(seq, newNumSlots);
2432
2433         if (liveTableStatus != NULL) {
2434                 // We have a larger table status so the old table status is no
2435                 // int64_ter alive
2436                 liveTableStatus->setDead();
2437         }
2438
2439         // Make this new table status the latest alive table status
2440         liveTableStatus = entry;
2441 }
2442
2443 /**
2444  * Check old messages to see if there is a block chain violation->
2445  * Also
2446  */
2447 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2448         int64_t oldSeqNum = entry->getOldSeqNum();
2449         int64_t newSeqNum = entry->getNewSeqNum();
2450         bool isequal = entry->getEqual();
2451         int64_t machineId = entry->getMachineID();
2452         int64_t seq = entry->getSequenceNumber();
2453
2454         // Check if we have messages that were supposed to be rejected in
2455         // our local block chain
2456         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2457                 // Get the slot
2458                 Slot *slot = indexer->getSlot(seqNum);
2459
2460                 if (slot != NULL) {
2461                         // If we have this slot make sure that it was not supposed to be
2462                         // a rejected slot
2463                         int64_t slotMachineId = slot->getMachineID();
2464                         if (isequal != (slotMachineId == machineId)) {
2465                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2466                         }
2467                 }
2468         }
2469
2470         // Create a list of clients to watch until they see this rejected
2471         // message entry->
2472         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2473         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2474         while (iter->hasNext()) {
2475                 // Machine ID for the last message entry
2476                 int64_t lastMessageEntryMachineId = iter->next();
2477
2478                 // We've seen it, don't need to continue to watch->  Our next
2479                 // message will implicitly acknowledge it->
2480                 if (lastMessageEntryMachineId == localMachineId) {
2481                         continue;
2482                 }
2483
2484                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2485                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2486
2487                 if (entrySequenceNumber < seq) {
2488                         // Add this rejected message to the set of messages that this
2489                         // machine ID did not see yet
2490                         addWatchVector(lastMessageEntryMachineId, entry);
2491                         // This client did not see this rejected message yet so add it
2492                         // to the watch set to monitor
2493                         deviceWatchSet->add(lastMessageEntryMachineId);
2494                 }
2495         }
2496         delete iter;
2497
2498         if (deviceWatchSet->isEmpty()) {
2499                 // This rejected message has been seen by all the clients so
2500                 entry->setDead();
2501         } else {
2502                 // We need to watch this rejected message
2503                 entry->setWatchSet(deviceWatchSet);
2504         }
2505 }
2506
2507 /**
2508  * Check if this abort is live, if not then save it so we can kill it
2509  * later-> update the last transaction number that was arbitrated on->
2510  */
2511 void Table::processEntry(Abort *entry) {
2512         if (entry->getTransactionSequenceNumber() != -1) {
2513                 // update the transaction status if it was sent to the server
2514                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2515                 if (status != NULL) {
2516                         status->setStatus(TransactionStatus_StatusAborted);
2517                 }
2518         }
2519
2520         // Abort has not been seen by the client it is for yet so we need to
2521         // keep track of it
2522
2523         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2524         if (previouslySeenAbort != NULL) {
2525                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2526         }
2527
2528         if (entry->getTransactionArbitrator() == localMachineId) {
2529                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2530         }
2531
2532         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2533                 // The machine already saw this so it is dead
2534                 entry->setDead();
2535                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2536                 liveAbortTable->remove(&abortid);
2537
2538                 if (entry->getTransactionArbitrator() == localMachineId) {
2539                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2540                 }
2541                 return;
2542         }
2543
2544         // Update the last arbitration data that we have seen so far
2545         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2546                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2547                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2548                         // Is larger
2549                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2550                 }
2551         } else {
2552                 // Never seen any data from this arbitrator so record the first one
2553                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2554         }
2555
2556         // Set dead a transaction if we can
2557         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2558
2559         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2560         if (transactionToSetDead != NULL) {
2561                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2562         }
2563
2564         // Update the last transaction sequence number that the arbitrator
2565         // arbitrated on
2566         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2567                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2568                 // Is a valid one
2569                 if (entry->getTransactionSequenceNumber() != -1) {
2570                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2571                 }
2572         }
2573 }
2574
2575 /**
2576  * Set dead the transaction part if that transaction is dead and keep
2577  * track of all new parts
2578  */
2579 void Table::processEntry(TransactionPart *entry) {
2580         // Check if we have already seen this transaction and set it dead OR
2581         // if it is not alive
2582         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2583                 // This transaction is dead, it was already committed or aborted
2584                 entry->setDead();
2585                 return;
2586         }
2587
2588         // This part is still alive
2589         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2590
2591         if (transactionPart == NULL) {
2592                 // Dont have a table for this machine Id yet so make one
2593                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2594                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2595         }
2596
2597         // Update the part and set dead ones we have already seen (got a
2598         // rescued version)
2599         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2600         if (previouslySeenPart != NULL) {
2601                 previouslySeenPart->setDead();
2602         }
2603 }
2604
2605 /**
2606  * Process new commit entries and save them for future use->  Delete duplicates
2607  */
2608 void Table::processEntry(CommitPart *entry) {
2609         // Update the last transaction that was updated if we can
2610         if (entry->getTransactionSequenceNumber() != -1) {
2611                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2612                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2613                 }
2614         }
2615
2616         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2617         if (commitPart == NULL) {
2618                 // Don't have a table for this machine Id yet so make one
2619                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2620                 newCommitParts->put(entry->getMachineId(), commitPart);
2621         }
2622         // Update the part and set dead ones we have already seen (got a
2623         // rescued version)
2624         CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2625         if (previouslySeenPart != NULL) {
2626                 previouslySeenPart->setDead();
2627         }
2628 }
2629
2630 /**
2631  * Update the last message seen table-> Update and set dead the
2632  * appropriate RejectedMessages as clients see them-> Updates the live
2633  * aborts, removes those that are dead and sets them dead-> Check that
2634  * the last message seen is correct and that there is no mismatch of
2635  * our own last message or that other clients have not had a rollback
2636  * on the last message->
2637  */
2638 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2639         // We have seen this machine ID
2640         machineSet->remove(machineId);
2641
2642         // Get the set of rejected messages that this machine Id is has not seen yet
2643         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2644         // If there is a rejected message that this machine Id has not seen yet
2645         if (watchset != NULL) {
2646                 // Go through each rejected message that this machine Id has not
2647                 // seen yet
2648
2649                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2650                 while (rmit->hasNext()) {
2651                         RejectedMessage *rm = rmit->next();
2652                         // If this machine Id has seen this rejected message->->->
2653                         if (rm->getSequenceNumber() <= seqNum) {
2654                                 // Remove it from our watchlist
2655                                 rmit->remove();
2656                                 // Decrement machines that need to see this notification
2657                                 rm->removeWatcher(machineId);
2658                         }
2659                 }
2660                 delete rmit;
2661         }
2662
2663         // Set dead the abort
2664         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2665
2666         while (abortit->hasNext()) {
2667                 Pair<int64_t, int64_t> *key = abortit->next();
2668                 Abort *abort = liveAbortTable->get(key);
2669                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2670                         abort->setDead();
2671                         abortit->remove();
2672                         if (abort->getTransactionArbitrator() == localMachineId) {
2673                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2674                         }
2675                 }
2676         }
2677         delete abortit;
2678         if (machineId == localMachineId) {
2679                 // Our own messages are immediately dead->
2680                 char livenessType = liveness->getType();
2681                 if (livenessType == TypeLastMessage) {
2682                         ((LastMessage *)liveness)->setDead();
2683                 } else if (livenessType == TypeSlot) {
2684                         ((Slot *)liveness)->setDead();
2685                 } else {
2686                         throw new Error("Unrecognized type");
2687                 }
2688         }
2689         // Get the old last message for this device
2690         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2691         if (lastMessageEntry == NULL) {
2692                 // If no last message then there is nothing else to process
2693                 return;
2694         }
2695
2696         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2697         Liveness *lastEntry = lastMessageEntry->getSecond();
2698         delete lastMessageEntry;
2699
2700         // If it is not our machine Id since we already set ours to dead
2701         if (machineId != localMachineId) {
2702                 char lastEntryType = lastEntry->getType();
2703
2704                 if (lastEntryType == TypeLastMessage) {
2705                         ((LastMessage *)lastEntry)->setDead();
2706                 } else if (lastEntryType == TypeSlot) {
2707                         ((Slot *)lastEntry)->setDead();
2708                 } else {
2709                         throw new Error("Unrecognized type");
2710                 }
2711         }
2712         // Make sure the server is not playing any games
2713         if (machineId == localMachineId) {
2714                 if (hadPartialSendToServer) {
2715                         // We were not making any updates and we had a machine mismatch
2716                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2717                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2718                         }
2719                 } else {
2720                         // We were not making any updates and we had a machine mismatch
2721                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2722                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2723                         }
2724                 }
2725         } else {
2726                 if (lastMessageSeqNum > seqNum) {
2727                         throw new Error("Server Error: Rollback on remote machine sequence number");
2728                 }
2729         }
2730 }
2731
2732 /**
2733  * Add a rejected message entry to the watch set to keep track of
2734  * which clients have seen that rejected message entry and which have
2735  * not.
2736  */
2737 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2738         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2739         if (entries == NULL) {
2740                 // There is no set for this machine ID yet so create one
2741                 entries = new Hashset<RejectedMessage *>();
2742                 rejectedMessageWatchVectorTable->put(machineId, entries);
2743         }
2744         entries->add(entry);
2745 }
2746
2747 /**
2748  * Check if the HMAC chain is not violated
2749  */
2750 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2751         for (uint i = 0; i < newSlots->length(); i++) {
2752                 Slot *currSlot = newSlots->get(i);
2753                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2754                 if (prevSlot != NULL &&
2755                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2756                         throw new Error("Server Error: Invalid HMAC Chain");
2757         }
2758 }