3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Comparison callback with the signature qsort() expects. Both arguments are
// reinterpreted as pointers to int64_t values. It is handed to qsort() in
// Table::acceptDataFromLocal to sort vectors of int64_t sequence numbers.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructs a Table that creates its own CloudComm.
//
// baseurl, password, listeningPort: forwarded unchanged, together with
//     `this`, to the CloudComm constructor.
// _localMachineId: stored as localMachineId.
//
// Only scalar state is set up in this initializer list: pointer members are
// initialized to NULL, sequence counters to 0 (oldestLiveSlotSequenceNumver
// starts at 1) and status flags to false.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(0),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(0),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
63 lastPendingSendArbitrationEntriesToDelete(NULL),
65 committedKeyValueTable(NULL),
66 speculatedKeyValueTable(NULL),
67 pendingTransactionSpeculatedKeyValueTable(NULL),
68 liveNewKeyTable(NULL),
69 lastMessageTable(NULL),
70 rejectedMessageWatchVectorTable(NULL),
71 arbitratorTable(NULL),
73 newTransactionParts(NULL),
75 lastArbitratedTransactionNumberByArbitratorTable(NULL),
76 liveTransactionBySequenceNumberTable(NULL),
77 liveTransactionByTransactionIdTable(NULL),
78 liveCommitsTable(NULL),
79 liveCommitsByKeyTable(NULL),
80 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81 rejectedSlotVector(NULL),
82 pendingTransactionQueue(NULL),
83 pendingSendArbitrationRounds(NULL),
84 pendingSendArbitrationEntriesToDelete(NULL),
85 transactionPartsSent(NULL),
86 outstandingTransactionStatus(NULL),
87 liveAbortsGeneratedByLocal(NULL),
88 offlineTransactionsCommittedAndAtServer(NULL),
89 localCommunicationTable(NULL),
90 lastTransactionSeenFromMachineFromServer(NULL),
91 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92 lastInsertedNewKey(false),
// Overload that takes a caller-supplied CloudComm instead of the
// URL/password/port triple.
//
// _cloud: the CloudComm to use (presumably stored as `cloud` -- confirm).
// _localMachineId: stored as localMachineId.
//
// As in the other constructor, pointer members are initialized to NULL,
// sequence counters to 0 (oldestLiveSlotSequenceNumver starts at 1) and
// status flags to false.
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
102 liveTableStatus(NULL),
103 pendingTransactionBuilder(NULL),
104 lastPendingTransactionSpeculatedOn(NULL),
105 firstPendingTransaction(NULL),
107 bufferResizeThreshold(0),
109 oldestLiveSlotSequenceNumver(1),
110 localMachineId(_localMachineId),
112 localSequenceNumber(0),
113 localTransactionSequenceNumber(0),
114 lastTransactionSequenceNumberSpeculatedOn(0),
115 oldestTransactionSequenceNumberSpeculatedOn(0),
116 localArbitrationSequenceNumber(0),
117 hadPartialSendToServer(false),
118 attemptedToSendToServer(false),
120 didFindTableStatus(false),
122 lastSlotAttemptedToSend(NULL),
125 lastTransactionPartsSent(NULL),
126 lastPendingSendArbitrationEntriesToDelete(NULL),
128 committedKeyValueTable(NULL),
129 speculatedKeyValueTable(NULL),
130 pendingTransactionSpeculatedKeyValueTable(NULL),
131 liveNewKeyTable(NULL),
132 lastMessageTable(NULL),
133 rejectedMessageWatchVectorTable(NULL),
134 arbitratorTable(NULL),
135 liveAbortTable(NULL),
136 newTransactionParts(NULL),
137 newCommitParts(NULL),
138 lastArbitratedTransactionNumberByArbitratorTable(NULL),
139 liveTransactionBySequenceNumberTable(NULL),
140 liveTransactionByTransactionIdTable(NULL),
141 liveCommitsTable(NULL),
142 liveCommitsByKeyTable(NULL),
143 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144 rejectedSlotVector(NULL),
145 pendingTransactionQueue(NULL),
146 pendingSendArbitrationRounds(NULL),
147 pendingSendArbitrationEntriesToDelete(NULL),
148 transactionPartsSent(NULL),
149 outstandingTransactionStatus(NULL),
150 liveAbortsGeneratedByLocal(NULL),
151 offlineTransactionsCommittedAndAtServer(NULL),
152 localCommunicationTable(NULL),
153 lastTransactionSeenFromMachineFromServer(NULL),
154 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155 lastInsertedNewKey(false),
// Releases the container objects themselves (tables, sets and vectors).
// Each pointer deleted here is one that the allocation code assigns with
// `new Hashtable/Hashset/Vector`.
// NOTE(review): presumably this is the destructor body -- confirm. Only the
// containers are deleted in these statements; whether the elements they
// hold are freed elsewhere should be verified.
166 delete committedKeyValueTable;
167 delete speculatedKeyValueTable;
168 delete pendingTransactionSpeculatedKeyValueTable;
169 delete liveNewKeyTable;
170 delete lastMessageTable;
171 delete rejectedMessageWatchVectorTable;
172 delete arbitratorTable;
173 delete liveAbortTable;
174 delete newTransactionParts;
175 delete newCommitParts;
176 delete lastArbitratedTransactionNumberByArbitratorTable;
177 delete liveTransactionBySequenceNumberTable;
178 delete liveTransactionByTransactionIdTable;
179 delete liveCommitsTable;
180 delete liveCommitsByKeyTable;
181 delete lastCommitSeenSequenceNumberByArbitratorTable;
182 delete rejectedSlotVector;
183 delete pendingTransactionQueue;
184 delete pendingSendArbitrationEntriesToDelete;
185 delete transactionPartsSent;
186 delete outstandingTransactionStatus;
187 delete liveAbortsGeneratedByLocal;
188 delete offlineTransactionsCommittedAndAtServer;
189 delete localCommunicationTable;
190 delete lastTransactionSeenFromMachineFromServer;
191 delete pendingSendArbitrationRounds;
192 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
196 * Init all the stuff needed for table usage
// Allocates the helper objects and every container the table works with,
// then derives the slot count and resize threshold from the new buffer.
199 // Init helper objects
200 random = new SecureRandom();
201 buffer = new SlotBuffer();
// Key/value views and per-key metadata. Tables keyed by IoTString * are
// instantiated with hashString and StringEquals (presumably the hash and
// equality functions used for lookups -- confirm against Hashtable).
204 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
205 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
206 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
207 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
208 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
209 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
210 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
// Tables keyed by Pair pointers are instantiated with pairHashFunction and
// pairEquals.
211 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
212 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
213 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
214 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
215 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
216 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
217 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
218 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
219 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
220 rejectedSlotVector = new Vector<int64_t>();
221 pendingTransactionQueue = new Vector<Transaction *>();
222 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
223 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
224 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
225 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
226 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
227 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
228 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
229 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
230 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count starts at the buffer's capacity; setResizeThreshold() reads
// numberOfSlots and random, so it has to run after both are set.
233 numberOfSlots = buffer->capacity();
234 setResizeThreshold();
238 * Initialize the table by inserting a table status as the first entry
239 * into the table; also initialize the crypto stuff.
// Initializes security on the cloud connection and pushes the very first
// slot (sequence number 1), which carries a TableStatus holding
// numberOfSlots. The slots handed to validateAndUpdate() are then applied to
// the local copy of the block chain.
//
// Throws a heap-allocated Error ("Error on initialization") on the path not
// handled by the two validateAndUpdate() branches.
241 void Table::initTable() {
242 cloud->initSecurity();
244 // Create the first insertion into the block chain which is the table status
245 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
// The local sequence number is consumed by the slot just created.
246 localSequenceNumber++;
247 TableStatus *status = new TableStatus(s, numberOfSlots);
249 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
252 array = new Array<Slot *>(1);
254 // update local block chain
255 validateAndUpdate(array, true);
256 } else if (array->length() == 1) {
257 // in case we did push the slot BUT we failed to init it
258 validateAndUpdate(array, true);
260 throw new Error("Error on initialization");
265 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the cloud,
// applies them via validateAndUpdate() (second argument true), and then
// refreshes the live transactions and status.
268 void Table::rebuild() {
269 // Just pull the latest slots from the server
270 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
271 validateAndUpdate(newslots, true);
273 updateLiveTransactionsAndStatus();
// Registers how to reach an arbitrator directly: stores a newly allocated
// (hostName, portNumber) pair in localCommunicationTable under the
// arbitrator's id. The hostName pointer is stored as given, not copied.
276 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
277 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns whatever arbitratorTable holds for `key` (the id of the key's
// arbitrator). NOTE(review): the result for a key that is not in the table
// depends on Hashtable::get -- confirm.
280 int64_t Table::getArbitrator(IoTString *key) {
281 return arbitratorTable->get(key);
// Closes the table. NOTE(review): presumably shuts down the CloudComm
// connection -- confirm against the body.
284 void Table::close() {
// Looks `key` up in committedKeyValueTable and returns the value of the
// stored KeyValue. NOTE(review): presumably a missing key yields NULL
// rather than dereferencing kv -- confirm.
288 IoTString *Table::getCommitted(IoTString *key) {
289 KeyValue *kv = committedKeyValueTable->get(key);
292 return kv->getValue();
// Returns the value for `key` as seen speculatively. Lookups are made in
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable and
// committedKeyValueTable, in that order (presumably each later table is
// consulted only when the earlier lookup found nothing -- confirm).
298 IoTString *Table::getSpeculative(IoTString *key) {
299 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
302 kv = speculatedKeyValueTable->get(key);
306 kv = committedKeyValueTable->get(key);
310 return kv->getValue();
// Reads the committed value of `key` and records it as a guard on the
// transaction being built, via pendingTransactionBuilder->addKVGuard().
//
// Returns the committed value on the path that found a KeyValue. The other
// path registers a guard with a NULL value (presumably when the key has no
// committed value -- confirm).
//
// Throws a heap-allocated Error:
//   "Key not Found." when arbitratorTable has no entry for `key`;
//   "Not all Key Values Match Arbitrator." when the builder's
//   checkArbitrator() rejects the key's arbitrator.
316 IoTString *Table::getCommittedAtomic(IoTString *key) {
317 KeyValue *kv = committedKeyValueTable->get(key);
319 if (!arbitratorTable->contains(key)) {
320 throw new Error("Key not Found.");
323 // Make sure new key value pair matches the current arbitrator
324 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
325 // TODO: Maybe not throw an error
326 throw new Error("Not all Key Values Match Arbitrator.");
// Guard on the exact value that is handed back to the caller.
330 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
331 return kv->getValue();
// Guard with a NULL value.
333 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic(): reads the value of `key`
// and records it as a guard on the transaction being built.
//
// Lookups are made in pendingTransactionSpeculatedKeyValueTable,
// speculatedKeyValueTable and committedKeyValueTable, in that order
// (presumably falling through only when the earlier lookup found nothing --
// confirm). The path that found a KeyValue returns its value; the other
// path registers a guard with a NULL value.
//
// Throws a heap-allocated Error:
//   "Key not Found." when arbitratorTable has no entry for `key`;
//   "Not all Key Values Match Arbitrator." when the builder's
//   checkArbitrator() rejects the key's arbitrator.
338 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
339 if (!arbitratorTable->contains(key)) {
340 throw new Error("Key not Found.");
343 // Make sure new key value pair matches the current arbitrator
344 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
345 // TODO: Maybe not throw an error
346 throw new Error("Not all Key Values Match Arbitrator.");
349 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
352 kv = speculatedKeyValueTable->get(key);
356 kv = committedKeyValueTable->get(key);
// Guard on the exact value that is handed back to the caller.
360 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
361 return kv->getValue();
// Guard with a NULL value.
363 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after the current sequenceNumber from the cloud, applies
// them with validateAndUpdate() (second argument false) and refreshes the
// live transactions and status.
//
// If an Exception * is caught, the handler walks the machine ids stored as
// keys of localCommunicationTable (presumably to update from each of those
// machines directly -- confirm).
368 bool Table::update() {
370 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
371 validateAndUpdate(newSlots, false);
373 updateLiveTransactionsAndStatus();
375 } catch (Exception *e) {
376 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
377 while (kit->hasNext()) {
378 int64_t m = kit->next();
// Tries to register `keyName` with `machineId` as its arbitrator.
//
// A key that already has an entry in arbitratorTable takes the early
// "already an arbitrator" path. Otherwise a NewKey entry is built (with a
// NULL first constructor argument) and handed to sendToServer(), whose
// return value decides whether the insertion succeeded.
387 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
389 if (arbitratorTable->contains(keyName)) {
390 // There is already an arbitrator
393 NewKey *newKey = new NewKey(NULL, keyName, machineId);
395 if (sendToServer(newKey)) {
396 // If successfully inserted
// Begins a new transaction by replacing pendingTransactionBuilder with a
// fresh PendingTransaction for localMachineId.
// NOTE(review): the previous builder is overwritten without a delete in
// this statement -- confirm who owns it.
402 void Table::startTransaction() {
403 // Create a new transaction, invalidates any old pending transactions.
404 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds the update key -> value to the transaction being built.
//
// Throws a heap-allocated Error:
//   "Key not Found." when arbitratorTable has no entry for `key`;
//   "Not all Key Values Match Arbitrator." when the builder's
//   checkArbitrator() rejects the key's arbitrator.
407 void Table::addKV(IoTString *key, IoTString *value) {
409 // Make sure it is a valid key
410 if (!arbitratorTable->contains(key)) {
411 throw new Error("Key not Found.");
414 // Make sure new key value pair matches the current arbitrator
415 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
416 // TODO: Maybe not throw an error
417 throw new Error("Not all Key Values Match Arbitrator.");
420 // Add the key value to this transaction
421 KeyValue *kv = new KeyValue(key, value);
422 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction held by pendingTransactionBuilder and returns
// its TransactionStatus.
//
// A builder without key-value updates yields a new StatusNoEffect status
// with arbitrator -1. Otherwise the status is created as StatusPending for
// the builder's arbitrator and attached to the new Transaction. The builder
// is then replaced by a fresh PendingTransaction.
425 TransactionStatus *Table::commitTransaction() {
426 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
427 // transaction with no updates will have no effect on the system
428 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
431 // Set the local transaction sequence number and increment
432 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
433 localTransactionSequenceNumber++;
435 // Create the transaction status
436 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
438 // Create the new transaction
439 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
440 newTransaction->setTransactionStatus(transactionStatus);
// A transaction arbitrated by another machine is queued for sending.
442 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
443 // Add it to the queue and invalidate the builder for safety
444 pendingTransactionQueue->add(newTransaction);
// Local arbitration followed by a refresh of the live state (presumably the
// branch where this machine is the arbitrator -- confirm).
446 arbitrateOnLocalTransaction(newTransaction);
447 updateLiveStateFromLocal();
450 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Fallback when a ServerException was caught: walk the queued transactions
// and try sendTransactionToLocal() on them.
454 } catch (ServerException *e) {
// Arbitrators recorded here failed a local send during this pass.
456 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
457 uint size = pendingTransactionQueue->size();
459 for (uint iter = 0; iter < size; iter++) {
460 Transaction *transaction = pendingTransactionQueue->get(iter);
// oldindex is the write position of an in-place compaction of the queue;
// setSize(oldindex) after the loop truncates it to that length.
461 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
463 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
464 // Already contacted this client so ignore all attempts to contact this client
465 // to preserve ordering for arbitrator
469 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
471 if (sendReturn.getFirst()) {
472 // Failed to contact over local
473 arbitratorTriedAndFailed->add(transaction->getArbitrator());
475 // Successful contact or should not contact
477 if (sendReturn.getSecond()) {
483 pendingTransactionQueue->setSize(oldindex);
486 updateLiveStateFromLocal();
488 return transactionStatus;
492 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots.
//
// resizeLower is Table_RESIZE_THRESHOLD * numberOfSlots truncated to int.
// The threshold is resizeLower - 1 plus a random offset obtained from
// random->nextInt(numberOfSlots - resizeLower).
// NOTE(review): presumably nextInt(n) returns a value in [0, n), which
// would put the threshold in [resizeLower - 1, numberOfSlots - 2] --
// confirm against SecureRandom.
494 void Table::setResizeThreshold() {
495 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
496 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of the localSequenceNumber counter.
499 int64_t Table::getLocalSequenceNumber() {
500 return localSequenceNumber;
// Recovers from a send whose acknowledgement was never received, working
// from the state remembered about that send: lastSlotAttemptedToSend,
// lastNewSize, lastIsNewKey, lastInsertedNewKey, lastNewKey and
// lastTransactionPartsSent.
//
// newKey: the new-key entry the caller still wants inserted, or NULL.
// Returns a NewKey * that sendToServer() assigns back to its own newKey
// (presumably NULL once the earlier send is found to have inserted the same
// key -- confirm).
//
// Two cases are distinguished by asking the cloud for slots after
// sequenceNumber:
//   - none returned: the remembered slot is sent again;
//   - some returned: checkSend() is used to find out whether the remembered
//     slot is among them.
// When the slot turns out to have been sent, each transaction in
// lastTransactionPartsSent gets its sent parts removed and its status
// advanced. Otherwise transactions that never sent a part to the server get
// their sequence number reset to -1.
//
// NOTE(review): the new-key checks compare the getKey() results with `==`.
// If getKey() returns IoTString *, that is pointer identity rather than
// string equality -- confirm this is intended.
503 NewKey * Table::handlePartialSend(NewKey * newKey) {
504 //Didn't receive acknowledgement for last send
505 //See if the server has received a newer slot
507 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
508 if (newSlots->length() == 0) {
509 //Retry sending old slot
510 bool wasInserted = false;
511 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
513 if (sendSlotsReturn) {
514 if (newKey != NULL) {
// Same key name and machine id as the new key carried by the earlier send.
515 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
520 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
521 while (trit->hasNext()) {
522 Transaction *transaction = trit->next();
523 transaction->resetServerFailure();
524 // Update which transactions parts still need to be sent
525 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
526 // Add the transaction status to the outstanding list
527 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
529 // Update the transaction status
530 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
532 // Check if all the transaction parts were successfully
533 // sent and if so then remove it from pending
534 if (transaction->didSendAllParts()) {
535 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
536 pendingTransactionQueue->remove(transaction);
// checkSend() reports whether the remembered slot is among newSlots.
541 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
542 if (newKey != NULL) {
543 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
548 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
549 while (trit->hasNext()) {
550 Transaction *transaction = trit->next();
551 transaction->resetServerFailure();
553 // Update which transactions parts still need to be sent
554 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
556 // Add the transaction status to the outstanding list
557 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
559 // Update the transaction status
560 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
562 // Check if all the transaction parts were successfully sent and if so then remove it from pending
563 if (transaction->didSendAllParts()) {
564 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
565 pendingTransactionQueue->remove(transaction);
567 transaction->resetServerFailure();
568 // Set the transaction sequence number back to nothing
569 if (!transaction->didSendAPartToServer()) {
570 transaction->setSequenceNumber(-1);
578 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
579 while (trit->hasNext()) {
580 Transaction *transaction = trit->next();
581 transaction->resetServerFailure();
582 // Set the transaction sequence number back to nothing
583 if (!transaction->didSendAPartToServer()) {
584 transaction->setSequenceNumber(-1);
589 if (newSlots->length() != 0) {
590 // insert into the local block chain
591 validateAndUpdate(newSlots, true);
// Handling for the slots returned by the initial getSlots() call
// (presumably the else branch of the length() == 0 test -- confirm).
595 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
596 if (newKey != NULL) {
597 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
602 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
603 while (trit->hasNext()) {
604 Transaction *transaction = trit->next();
605 transaction->resetServerFailure();
607 // Update which transactions parts still need to be sent
608 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
610 // Add the transaction status to the outstanding list
611 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
613 // Update the transaction status
614 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
616 // Check if all the transaction parts were successfully sent and if so then remove it from pending
617 if (transaction->didSendAllParts()) {
618 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
619 pendingTransactionQueue->remove(transaction);
621 transaction->resetServerFailure();
622 // Set the transaction sequence number back to nothing
623 if (!transaction->didSendAPartToServer()) {
624 transaction->setSequenceNumber(-1);
630 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
631 while (trit->hasNext()) {
632 Transaction *transaction = trit->next();
633 transaction->resetServerFailure();
634 // Set the transaction sequence number back to nothing
635 if (!transaction->didSendAPartToServer()) {
636 transaction->setSequenceNumber(-1);
642 // insert into the local block chain
643 validateAndUpdate(newSlots, true);
// Pushes pending work to the server: queued transactions, pending
// arbitration rounds and, if given, a new key.
//
// newKey: new-key entry to insert, or NULL when only queued data is sent.
// Returns true when newKey is NULL at the final return statement.
// Throws a heap-allocated Error ("Should Be error free") if
// hadPartialSendToServer is set inside the send loop.
//
// If the previous send was only partially acknowledged, that is resolved
// first through handlePartialSend(). Each loop iteration then builds one
// slot, fills it, remembers what was attempted (the last* members, which
// handlePartialSend() reads) and sends it with sendSlotsToServer().
648 bool Table::sendToServer(NewKey *newKey) {
649 if (hadPartialSendToServer) {
650 newKey = handlePartialSend(newKey);
654 // While we have stuff that needs inserting into the block chain
655 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
656 if (hadPartialSendToServer) {
657 throw new Error("Should Be error free");
660 // If there is a new key with same name then end
661 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot takes the next sequence number and is constructed with the
// HMAC of the slot currently at sequenceNumber in the buffer.
666 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
667 localSequenceNumber++;
669 // Try to fill the slot with data
671 bool insertedNewKey = false;
672 bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
675 // Reset which transaction to send
676 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
677 while (trit->hasNext()) {
678 Transaction *transaction = trit->next();
679 transaction->resetNextPartToSend();
681 // Set the transaction sequence number back to nothing
682 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
683 transaction->setSequenceNumber(-1);
688 // Clear the sent data since we are trying again
689 pendingSendArbitrationEntriesToDelete->clear();
690 transactionPartsSent->clear();
692 // We needed a resize so try again
693 fillSlot(slot, true, newKey, newSize, insertedNewKey);
// Remember this attempt so a missing acknowledgement can be resolved later
// by handlePartialSend().
696 lastSlotAttemptedToSend = slot;
697 lastIsNewKey = (newKey != NULL);
698 lastInsertedNewKey = insertedNewKey;
699 lastNewSize = newSize;
701 lastTransactionPartsSent = transactionPartsSent->clone();
702 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
704 Array<Slot *> * newSlots = NULL;
705 bool wasInserted = false;
706 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
708 if (sendSlotsReturn) {
709 // Did insert into the block chain
710 if (insertedNewKey) {
711 // This slot was what was inserted not a previous slot
712 // New Key was successfully inserted into the block chain so dont want to insert it again
716 // Remove the aborts and commit parts that were sent from the pending to send queue
717 uint size = pendingSendArbitrationRounds->size();
// In-place compaction: rounds that are not done sending are moved to the
// front (index oldcount) and setSize(oldcount) drops the rest.
719 for (uint i = 0; i < size; i++) {
720 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
721 round->removeParts(pendingSendArbitrationEntriesToDelete);
723 if (!round->isDoneSending()) {
725 pendingSendArbitrationRounds->set(oldcount++,
726 pendingSendArbitrationRounds->get(i));
729 pendingSendArbitrationRounds->setSize(oldcount);
731 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
732 while (trit->hasNext()) {
733 Transaction *transaction = trit->next();
734 transaction->resetServerFailure();
736 // Update which transactions parts still need to be sent
737 transaction->removeSentParts(transactionPartsSent->get(transaction));
739 // Add the transaction status to the outstanding list
740 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
742 // Update the transaction status
743 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
745 // Check if all the transaction parts were successfully sent and if so then remove it from pending
746 if (transaction->didSendAllParts()) {
747 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
748 pendingTransactionQueue->remove(transaction);
753 // Reset which transaction to send
754 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
755 while (trit->hasNext()) {
756 Transaction *transaction = trit->next();
757 transaction->resetNextPartToSend();
759 // Set the transaction sequence number back to nothing
760 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
761 transaction->setSequenceNumber(-1);
767 // Clear the sent data in preparation for next send
768 pendingSendArbitrationEntriesToDelete->clear();
769 transactionPartsSent->clear();
771 if (newSlots->length() != 0) {
772 // insert into the local block chain
773 validateAndUpdate(newSlots, true);
// Error path: the handler tests whether the ServerException is of type
// ServerException_TypeInputTimeout.
776 } catch (ServerException *e) {
777 if (e->getType() != ServerException_TypeInputTimeout) {
778 // Nothing was able to be sent to the server so just clear these data structures
779 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
780 while (trit->hasNext()) {
781 Transaction *transaction = trit->next();
782 transaction->resetNextPartToSend();
784 // Set the transaction sequence number back to nothing
785 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
786 transaction->setSequenceNumber(-1);
791 // There was a partial send to the server
792 hadPartialSendToServer = true;
// Unlike the branch above, the affected transactions are flagged with
// setServerFailure() instead of having their sequence number reset.
794 // Nothing was able to be sent to the server so just clear these data structures
795 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
796 while (trit->hasNext()) {
797 Transaction *transaction = trit->next();
798 transaction->resetNextPartToSend();
799 transaction->setServerFailure();
804 pendingSendArbitrationEntriesToDelete->clear();
805 transactionPartsSent->clear();
810 return newKey == NULL;
// Contacts machine `machineId` directly through cloud->sendLocalData(),
// using the host/port registered in localCommunicationTable, and applies
// the arbitration entries in its reply.
//
// The request carries lastArbitrationDataLocalSequenceNumber: the value
// recorded for machineId in
// lastArbitrationDataLocalSequenceNumberSeenFromArbitrator, or -1 when
// nothing is recorded.
//
// NOTE(review): presumably returns false when no local route is known or
// the peer cannot be reached, and true otherwise -- confirm.
813 bool Table::updateFromLocal(int64_t machineId) {
814 if (!localCommunicationTable->contains(machineId))
817 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
819 // Get the size of the send data
// Room for one int32 and one int64 field.
820 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
822 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
823 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
824 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
827 Array<char> *sendData = new Array<char>(sendDataSize);
828 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
831 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
835 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
// The local sequence number is consumed by the send, whatever its outcome.
836 localSequenceNumber++;
838 if (returnData == NULL) {
839 // Could not contact server
// Reply layout as decoded here: an int entry count, then per entry a type
// byte followed by the encoded Abort or CommitPart.
844 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
845 int numberOfEntries = bbDecode->getInt();
847 for (int i = 0; i < numberOfEntries; i++) {
848 char type = bbDecode->get();
849 if (type == TypeAbort) {
850 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
852 } else if (type == TypeCommitPart) {
853 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
854 processEntry(commitPart);
858 updateLiveStateFromLocal();
// Sends `transaction` straight to its arbitrator over local communication
// and applies the arbitration entries in the reply.
//
// Returns a Pair<bool, bool>:
//   (true, false)  when localCommunicationTable has no entry for the
//                  transaction's arbitrator, or sendLocalData() returned
//                  NULL;
//   (false, true)  at the end of the function, after the reply was
//                  processed.
// The caller in commitTransaction() treats a true first element as "failed
// to contact over local".
//
// Request layout as encoded here: the last arbitration sequence number seen
// from this arbitrator (int64, -1 if none), the number of parts (int32),
// then every TransactionPart.
// Reply layout as decoded here: a didCommit byte, a couldArbitrate byte, an
// int entry count, then per entry a type byte followed by the encoded Abort
// or CommitPart.
863 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
865 // Get the devices local communications
866 if (!localCommunicationTable->contains(transaction->getArbitrator()))
867 return Pair<bool, bool>(true, false);
869 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
871 // Get the size of the send data
// Fixed header (one int32 + one int64) plus the size of every part.
872 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
874 Vector<TransactionPart *> *tParts = transaction->getParts();
875 uint tPartsSize = tParts->size();
876 for (uint i = 0; i < tPartsSize; i++) {
877 TransactionPart *part = tParts->get(i);
878 sendDataSize += part->getSize();
882 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
883 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
884 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
887 // Make the send data size
888 Array<char> *sendData = new Array<char>(sendDataSize);
889 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
892 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
893 bbEncode->putInt(transaction->getParts()->size());
895 Vector<TransactionPart *> *tParts = transaction->getParts();
896 uint tPartsSize = tParts->size();
897 for (uint i = 0; i < tPartsSize; i++) {
898 TransactionPart *part = tParts->get(i);
899 part->encode(bbEncode);
904 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
// The local sequence number is consumed by the send, whatever its outcome.
905 localSequenceNumber++;
907 if (returnData == NULL) {
908 // Could not contact server
909 return Pair<bool, bool>(true, false);
913 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
914 bool didCommit = bbDecode->get() == 1;
915 bool couldArbitrate = bbDecode->get() == 1;
916 int numberOfEntries = bbDecode->getInt();
917 bool foundAbort = false;
919 for (int i = 0; i < numberOfEntries; i++) {
920 char type = bbDecode->get();
921 if (type == TypeAbort) {
922 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// An abort addressed to exactly this transaction: same machine id and same
// client-local sequence number.
924 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
929 } else if (type == TypeCommitPart) {
930 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
931 processEntry(commitPart);
935 updateLiveStateFromLocal();
// Final status. NOTE(review): presumably didCommit selects the status when
// the peer could arbitrate and foundAbort selects it otherwise -- confirm.
937 if (couldArbitrate) {
938 TransactionStatus *status = transaction->getTransactionStatus();
940 status->setStatus(TransactionStatus_StatusCommitted);
942 status->setStatus(TransactionStatus_StatusAborted);
945 TransactionStatus *status = transaction->getTransactionStatus();
947 status->setStatus(TransactionStatus_StatusAborted);
949 status->setStatus(TransactionStatus_StatusCommitted);
953 return Pair<bool, bool>(false, true);
// Handles a request received from another machine over local communication
// and builds the reply buffer.
//
// data: request bytes. As decoded here: an int64 with the last arbitration
//     sequence number the sender has seen, an int part count, then that
//     many encoded TransactionParts.
// Returns an Array<char> * (presumably the returnData buffer built below --
// confirm).
//
// If the request carries parts, they are assembled into a Transaction that
// is arbitrated locally. The reply then collects this machine's aborts from
// liveAbortsGeneratedByLocal and the commit parts stored for localMachineId
// in liveCommitsTable, each visited in ascending sequence-number order.
// Reply layout as encoded here: for requests with parts, a didCommit byte
// and a couldArbitrate byte; then an int entry count; then each entry.
956 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
958 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
959 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
960 int numberOfParts = bbDecode->getInt();
962 // If we did commit a transaction or not
963 bool didCommit = false;
964 bool couldArbitrate = false;
966 if (numberOfParts != 0) {
968 // decode the transaction
969 Transaction *transaction = new Transaction();
970 for (int i = 0; i < numberOfParts; i++) {
972 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
973 transaction->addPartDecode(newPart);
976 // Arbitrate on transaction and pull relevant return data
977 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
978 couldArbitrate = localArbitrateReturn.getFirst();
979 didCommit = localArbitrateReturn.getSecond();
981 updateLiveStateFromLocal();
983 // Transaction was sent to the server so keep track of it to prevent double commit
984 if (transaction->getSequenceNumber() != -1) {
985 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
989 // The data to send back
990 int returnDataSize = 0;
991 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
993 // Get the aborts to send back
994 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
996 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
997 while (abortit->hasNext())
998 abortLocalSequenceNumbers->add(abortit->next());
// Sort the collected keys ascending before they are visited.
1002 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1004 uint asize = abortLocalSequenceNumbers->size();
1005 for (uint i = 0; i < asize; i++) {
// This local shadows the member localSequenceNumber inside the loop body.
1006 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// Entries at or below the sender's last-seen number are tested for here
// (presumably skipped -- confirm).
1007 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1011 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1012 unseenArbitrations->add(abort);
1013 returnDataSize += abort->getSize();
1016 // Get the commits to send back
1017 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1018 if (commitForClientTable != NULL) {
1019 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1021 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1022 while (commitit->hasNext())
1023 commitLocalSequenceNumbers->add(commitit->next());
1026 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1028 uint clsSize = commitLocalSequenceNumbers->size();
1029 for (uint clsi = 0; clsi < clsSize; clsi++) {
// Shadows the member localSequenceNumber, as in the abort loop.
1030 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1031 Commit *commit = commitForClientTable->get(localSequenceNumber);
1033 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1038 Vector<CommitPart *> *parts = commit->getParts();
1039 uint nParts = parts->size();
1040 for (uint i = 0; i < nParts; i++) {
1041 CommitPart *commitPart = parts->get(i);
1042 unseenArbitrations->add(commitPart);
1043 returnDataSize += commitPart->getSize();
// NOTE(review): the size below reserves two int32 fields plus one char,
// while the encoder further down writes one int and two chars. The buffer
// is large enough either way, but the accounting looks inconsistent --
// confirm.
1049 // Number of arbitration entries to decode
1050 returnDataSize += 2 * sizeof(int32_t);
1052 // bool of did commit or not
1053 if (numberOfParts != 0) {
1054 returnDataSize += sizeof(char);
1057 // Data to send Back
1058 Array<char> *returnData = new Array<char>(returnDataSize);
1059 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Flag bytes are written only when the request carried a transaction.
1061 if (numberOfParts != 0) {
1063 bbEncode->put((char)1);
1065 bbEncode->put((char)0);
1067 if (couldArbitrate) {
1068 bbEncode->put((char)1);
1070 bbEncode->put((char)0);
1074 bbEncode->putInt(unseenArbitrations->size());
1075 uint size = unseenArbitrations->size();
1076 for (uint i = 0; i < size; i++) {
1077 Entry *entry = unseenArbitrations->get(i);
1078 entry->encode(bbEncode);
// Outside the loops above, so this increments the member counter.
1081 localSequenceNumber++;
1085 /** Checks whether a given slot was sent using new slots in
1086 array. Returns true if sent and false otherwise. */
// Looks through the slots in `array` for evidence that `checkSlot` was stored:
// first a slot with the same sequence number authored by this machine, then a
// LastMessage entry in any slot that names this machine and that sequence number.
1088 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1089 uint size = array->length();
// Pass 1: is one of the returned slots our own slot?
1090 for (uint i = 0; i < size; i++) {
1091 Slot *s = array->get(i);
1092 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1097 // Pass 2: did another machine acknowledge our message via a LastMessage entry?
1098 for (uint i = 0; i < size; i++) {
1099 Slot *s = array->get(i);
1101 // Inspect every entry of the slot
1102 Vector<Entry *> *entries = s->getEntries();
1103 uint eSize = entries->size();
1104 for (uint ei = 0; ei < eSize; ei++) {
1105 Entry *entry = entries->get(ei);
// Only LastMessage entries can acknowledge a slot
1107 if (entry->getType() == TypeLastMessage) {
1108 LastMessage *lastMessage = (LastMessage *)entry;
// The acknowledgement must name this machine and the checked sequence number
1110 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1120 /** Method tries to send slot to server. Returns status in tuple.
1121 isInserted returns whether last un-acked send (if any) was
1122 successful. Returns whether send was confirmed.
// Pushes `slot` to the server through cloud->putSlot and reports what came back.
// Out-parameters: *array receives the slots handed back (or an array holding only
// `slot` when putSlot returns NULL); *isInserted is set as described below.
// Throws Error when the server hands back an empty slot array.
1125 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
// Record that a send was attempted, whatever the outcome
1126 attemptedToSendToServer = true;
1128 *array = cloud->putSlot(slot, newSize);
// NULL result: hand back an array with just `slot` and reset the rejected-slot
// list (NOTE(review): presumably the "server accepted the slot" case - confirm)
1129 if (*array == NULL) {
1130 *array = new Array<Slot *>(1);
1131 (*array)->set(0, slot);
1132 rejectedSlotVector->clear();
1133 *isInserted = false;
// A non-NULL but empty answer is treated as a server error
1136 if ((*array)->length() == 0) {
1137 throw new Error("Server Error: Did not send any slots");
// After an earlier partial send the slot may already be stored, so look
// for it in the slots the server returned
1140 if (hadPartialSendToServer) {
1141 *isInserted = checkSend(*array, slot);
// Not found: remember the slot as rejected
1143 if (!(*isInserted)) {
1144 rejectedSlotVector->add(slot->getSequenceNumber());
// Remaining path: the slot is recorded as rejected and reported as not inserted
1149 rejectedSlotVector->add(slot->getSequenceNumber());
1150 *isInserted = false;
1157 * Returns true if a resize was needed but not done.
// Populates `slot` with everything that should go to the server next, in this
// order: an optional TableStatus (resize), rejected-message entries, mandatory
// rescue of old live entries, the new-key entry, pending arbitration data,
// parts of the first pending transaction, and finally optional rescue data.
// Out-parameters: newSize (0 means "no resize") and insertedKey.
1159 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1160 newSize = 0;//special value to indicate no resize
// More live slots than the threshold allows forces a resize
1161 if (liveSlotCount > bufferResizeThreshold) {
1162 resize = true;//Resize is forced
// New capacity is the current slot count scaled by Table_RESIZE_MULTIPLE;
// it is announced through a TableStatus entry in the slot
1166 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1167 TableStatus *status = new TableStatus(slot, newSize);
1168 slot->addEntry(status);
1171 // Rejected-slot records go in before anything else
1172 doRejectedMessages(slot);
1174 // Rescue the entries that must be carried forward
1175 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1177 // Unpack: (needs resize, saw a live slot, sequence number where rescue stopped)
1178 bool needsResize = mandatoryRescueReturn.getFirst();
1179 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1180 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1182 if (needsResize && !resize) {
1183 // We need to resize but we are not resizing so return true to force on retry
// Try to place the new-key entry, if one was supplied
1187 insertedKey = false;
1188 if (newKeyEntry != NULL) {
1189 newKeyEntry->setSlot(slot);
1190 if (slot->hasSpace(newKeyEntry)) {
1191 slot->addEntry(newKeyEntry);
1196 // Forget the bookkeeping of the previous send: transaction parts and
// arbitration entries recorded as sent
1197 transactionPartsSent->clear();
1198 pendingSendArbitrationEntriesToDelete->clear();
1199 uint size = pendingSendArbitrationRounds->size();
1200 for (uint i = 0; i < size; i++) {
1201 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1202 bool isFull = false;
// Regenerate the round's entries before reading them
1203 round->generateParts();
1204 Vector<Entry *> *parts = round->getParts();
1206 // Insert pending arbitration data
1207 uint vsize = parts->size();
1208 for (uint vi = 0; vi < vsize; vi++) {
1209 Entry *arbitrationData = parts->get(vi);
1211 // An abort is stamped with the sequence number of the slot that carries it
1212 if (arbitrationData->getType() == TypeAbort) {
1213 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1216 if (!slot->hasSpace(arbitrationData)) {
1217 // No space so cant do anything else with these data entries
1222 // Add to this slot and remember the entry for deletion once the send succeeds
1223 slot->addEntry(arbitrationData);
1224 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered
1232 if (pendingTransactionQueue->size() > 0) {
1233 Transaction *transaction = pendingTransactionQueue->get(0);
1234 // Set the transaction sequence number if it has yet to be inserted into the block chain
1235 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1236 transaction->setSequenceNumber(slot->getSequenceNumber());
1240 TransactionPart *part = transaction->getNextPartToSend();
1242 // Ran out of parts to send for this transaction so move on
1246 if (slot->hasSpace(part)) {
1247 slot->addEntry(part);
// Record the part number under its transaction in transactionPartsSent,
// creating the per-transaction vector on first use
1248 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1249 if (partsSent == NULL) {
1250 partsSent = new Vector<int32_t>();
1251 transactionPartsSent->put(transaction, partsSent);
1253 partsSent->add(part->getPartNumber());
1254 transactionPartsSent->put(transaction, partsSent);
1261 // Fill the remainder of the slot with rescue data
1262 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
// Builds RejectedMessage records for the sequence numbers held in
// rejectedSlotVector, using slot `s` as the carrier. Does nothing when the
// vector is empty.
1267 void Table::doRejectedMessages(Slot *s) {
1268 if (!rejectedSlotVector->isEmpty()) {
1269 /* TODO: We should avoid generating a rejected message entry if
1270 * there is already a sufficient entry in the queue (e.g.,
1271 * equalsto value of true and same sequence number). */
// Oldest rejected sequence number; start of every range built below
1273 int64_t old_seqn = rejectedSlotVector->get(0);
// Past the threshold a single record spanning first..last rejected number is built
1274 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1275 int64_t new_seqn = rejectedSlotVector->lastElement();
1276 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
// -1 means "no missing message recorded yet"
1279 int64_t prev_seqn = -1;
1281 /* Go through list of missing messages */
1282 for (; i < rejectedSlotVector->size(); i++) {
1283 int64_t curr_seqn = rejectedSlotVector->get(i);
1284 Slot *s_msg = buffer->getSlot(curr_seqn);
1287 prev_seqn = curr_seqn;
1289 /* Generate rejected message entry for missing messages */
1290 if (prev_seqn != -1) {
1291 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1294 /* Generate rejected message entries for present messages:
 * one record per slot, attributed to the machine that wrote the slot */
1295 for (; i < rejectedSlotVector->size(); i++) {
1296 int64_t curr_seqn = rejectedSlotVector->get(i);
1297 Slot *s_msg = buffer->getSlot(curr_seqn);
1298 int64_t machineid = s_msg->getMachineID();
1299 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest slots of the buffer into `slot` so that
// the range below `threshold` holds no live data.
// Returns (needs resize, saw a live slot, sequence number where the scan stopped).
// The first element is true when an entry of the slot at `firstIfFull` did not fit.
1306 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1307 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1308 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning below the oldest slot the buffer still holds
1309 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1310 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1313 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1314 bool seenLiveSlot = false;
1315 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1316 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
// Walk the slots that must be cleared of live entries
1320 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1321 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1322 // Until a live slot is met, keep advancing the oldest-live marker
1323 if (!seenLiveSlot) {
1324 oldestLiveSlotSequenceNumver = currentSequenceNumber;
// Slots without live data need no rescue
1327 if (!previousSlot->isLive()) {
1331 // We have seen a live slot
1332 seenLiveSlot = true;
1334 // Get all the live entries for a slot
1335 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1337 // Iterate over all the live entries and try to rescue them
1338 uint lESize = liveEntries->size();
1339 for (uint i = 0; i < lESize; i++) {
1340 Entry *liveEntry = liveEntries->get(i);
1341 if (slot->hasSpace(liveEntry)) {
1342 // Enough space to rescue the entry
1343 slot->addEntry(liveEntry);
1344 } else if (currentSequenceNumber == firstIfFull) {
1345 //if there's no space but the entry is about to fall off the queue
1346 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
// Scan finished without an entry at firstIfFull being left behind
1352 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Best-effort continuation of the rescue: starting at `seqn`, copies live entries
// of later slots into `s` while they fit. `seenliveslot` carries over whether a
// live slot was already met; `resize` is forwarded to getLiveEntries.
1355 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1356 /* now go through live entries from least to greatest sequence number until
1357 * either all live slots added, or the slot doesn't have enough room
1358 * for SKIP_THRESHOLD consecutive entries*/
1360 int64_t newestseqnum = buffer->getNewestSeqNum();
1361 for (; seqn <= newestseqnum; seqn++) {
1362 Slot *prevslot = buffer->getSlot(seqn);
1363 //Push slot number forward
1365 oldestLiveSlotSequenceNumver = seqn;
// Slots without live data are passed over
1367 if (!prevslot->isLive())
1369 seenliveslot = true;
1370 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1371 uint lESize = liveentries->size();
1372 for (uint i = 0; i < lESize; i++) {
1373 Entry *liveentry = liveentries->get(i);
// Rescue the entry only when the slot can still hold it
1374 if (s->hasSpace(liveentry))
1375 s->addEntry(liveentry);
// Skip counter compared against Table_SKIP_THRESHOLD, the limit named in the comment above
1378 if (skipcount > Table_SKIP_THRESHOLD)
1388 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and folds it into the
// local state: checks ordering and the HMAC chain, processes every slot, checks
// for withheld slots when there is a gap, stores the slots in the local buffer
// and refreshes the live state.
// Throws Error on old slots or when a machine record is missing after a gap.
1390 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1391 // The cloud communication layer has checked slot HMACs already
// Empty batch: nothing to validate
1393 if (newSlots->length() == 0) {
1397 // Make sure all slots are newer than the newest slot recorded locally (sequenceNumber)
1399 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1400 if (firstSeqNum <= sequenceNumber) {
1401 throw new Error("Server Error: Sent older slots!");
1404 // Create an object that can access both new slots and slots in our
1405 // local chain without committing slots to our local chain
1406 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1408 // Check that the HMAC chain is not broken
1409 checkHMACChain(indexer, newSlots);
1411 // Start from every machine id known in lastMessageTable; the set is handed
// to processSlot and must end up empty when there was a gap (see below)
1412 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1414 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1415 while (lmit->hasNext())
1416 machineSet->add(lmit->next());
1420 // Process each slot's data and keep the expected size up to date
1422 uint numSlots = newSlots->length();
1423 for (uint i = 0; i < numSlots; i++) {
1424 Slot *slot = newSlots->get(i);
1425 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1426 updateExpectedSize();
1430 // A gap exists when the first new slot does not directly follow our newest slot;
// in that case verify that the server sent everything
1432 if (firstSeqNum != (sequenceNumber + 1)) {
1434 // Check the size of the slots that were sent down by the server.
1435 // Can only check the size if there was a gap
1436 checkNumSlots(newSlots->length());
1438 // Since there was a gap every machine must have pushed a slot or
1439 // must have a last message entry. If not, the server is withholding data
1441 if (!machineSet->isEmpty()) {
1442 throw new Error("Missing record for machines: ");
1446 // Update the size of our local block chain.
1449 // Commit the new slots to the local block chain.
1451 uint numSlots = newSlots->length();
1452 for (uint i = 0; i < numSlots; i++) {
1453 Slot *slot = newSlots->get(i);
1455 // Insert this slot into our local block chain copy.
1456 buffer->putSlot(slot);
1458 // Keep track of how many slots are currently live (have live data in them)
1463 // Get the sequence number of the latest slot in the system
1464 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1465 updateLiveStateFromServer();
1467 // No Need to remember after we pulled from the server
1468 offlineTransactionsCommittedAndAtServer->clear();
1470 // This is invalidated now
1471 hadPartialSendToServer = false;
// Refreshes all derived state after slots were received from the server:
// assembles new transaction parts, arbitrates, then updates the committed,
// speculative and pending-transaction speculative tables in that order.
1474 void Table::updateLiveStateFromServer() {
1475 // Process the new transaction parts
1476 processNewTransactionParts();
1478 // Do arbitration on new transactions that were received
1479 arbitrateFromServer();
1481 // Update all the committed keys
1482 bool didCommitOrSpeculate = updateCommittedTable();
1484 // Delete the transactions that are now dead
1485 updateLiveTransactionsAndStatus();
// Each later stage is told whether an earlier stage changed anything
1488 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1489 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived state after a local arbitration. Same sequence as
// updateLiveStateFromServer, minus processing of new transaction parts and
// server-side arbitration.
1492 void Table::updateLiveStateFromLocal() {
1493 // Update all the committed keys
1494 bool didCommitOrSpeculate = updateCommittedTable();
1496 // Delete the transactions that are now dead
1497 updateLiveTransactionsAndStatus();
// Each later stage is told whether an earlier stage changed anything
1500 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1501 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Initializes the expected number of slots from a table-status record.
// Note: the parameter numberOfSlots shadows the member of the same name here.
1504 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1505 int64_t prevslots = firstSequenceNumber;
// NOTE(review): presumably expectedsize is only computed the first time a
// table status is seen in a round - confirm against the branch structure
1507 if (didFindTableStatus) {
// Expected count: the first sequence number while it is below the table size,
// otherwise the table size
1509 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
// Remember that a table status was found and adopt its size as the new maximum
1512 didFindTableStatus = true;
1513 currMaxSize = numberOfSlots;
// Adjusts the expected slot count, keeping it capped at currMaxSize.
1516 void Table::updateExpectedSize() {
// Clamp: the expectation never exceeds the current maximum table size
1519 if (expectedsize > currMaxSize) {
1520 expectedsize = currMaxSize;
1526 * Check the size of the block chain to make sure there are enough
1527 * slots sent back by the server. This is only called when we have a
1528 * gap between the slots that we have locally and the slots sent by
1529 * the server therefore in the slots sent by the server there will be
1530 * at least 1 Table status message
// Verifies that the number of slots received equals expectedsize.
// The parameter numberOfSlots (count received) shadows the member of the same name.
// Throws Error when the counts differ.
1532 void Table::checkNumSlots(int numberOfSlots) {
1533 if (numberOfSlots != expectedsize) {
1534 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1539 * Update the size of the local buffer if it is needed.
// Applies currMaxSize to the local buffer and resets the table-status flag.
1541 void Table::commitNewMaxSize() {
// Allow the next round to pick up a fresh table status
1542 didFindTableStatus = false;
1544 // Resize the local slot buffer only when the size actually changed
1545 if (numberOfSlots != currMaxSize) {
1546 buffer->resize((int32_t)currMaxSize);
1549 // Change the number of local slots to the new size
1550 numberOfSlots = (int32_t)currMaxSize;
1552 // Recalculate the resize threshold since the size of the local
1553 // buffer has changed
1554 setResizeThreshold();
1558 * Process the new transaction parts from this latest round of slots
1559 * received from the server
// Moves the transaction parts gathered in newTransactionParts into the live
// transaction tables, creating Transaction objects on demand, and then empties
// newTransactionParts.
1561 void Table::processNewTransactionParts() {
1563 if (newTransactionParts->size() == 0) {
1564 // Nothing new to process
1568 // Iterate through all the machine ids that we received new parts for
1570 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1571 while (tpit->hasNext()) {
1572 int64_t machineId = tpit->next();
// Parts of one machine, keyed by a Pair<int64_t, int32_t> part id
1573 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1575 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1576 // Iterate through all the parts for that machine Id
1577 while (ptit->hasNext()) {
1578 Pair<int64_t, int32_t> *partId = ptit->next();
1579 TransactionPart *part = parts->get(partId);
// Parts of transactions the arbitrator has already arbitrated are obsolete
1581 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1582 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1583 if (lastTransactionNumber >= part->getSequenceNumber()) {
1584 // Set dead the transaction part
1590 // Get the transaction object for that sequence number
1591 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1593 if (transaction == NULL) {
1594 // This is a new transaction that we dont have so make a new one
1595 transaction = new Transaction();
1597 // Register the new transaction under both its sequence number and its id
1598 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1599 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1602 // Add that part to the transaction
1603 transaction->addPartDecode(part);
1608 // Clear all the new transaction parts in preparation for the next
1609 // time the server sends slots
1610 newTransactionParts->clear();
// Arbitrates, in ordered sequence, the live transactions this machine is the
// arbitrator for. Transactions whose guard holds are merged into one batch
// commit; the others produce aborts. The result is queued as an
// ArbitrationRound for sending and fed back through processEntry.
1613 void Table::arbitrateFromServer() {
1614 if (liveTransactionBySequenceNumberTable->size() == 0) {
1615 // Nothing to arbitrate on so move on
1619 // Collect the transaction sequence numbers and order them with compareInt64
1620 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1622 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1623 while (trit->hasNext())
1624 transactionSequenceNumbers->add(trit->next());
1627 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1629 // Key/value updates of the transactions accepted in this round; consulted by
// later guard evaluations and turned into the batch commit below
1630 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1632 // Sequence number of the last transaction accepted in this round (-1: none)
1633 int64_t lastTransactionCommitted = -1;
1634 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1635 uint tsnSize = transactionSequenceNumbers->size();
1636 for (uint i = 0; i < tsnSize; i++) {
1637 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1638 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1640 // Check if this machine arbitrates for this transaction if not
1641 // then we cant arbitrate this transaction
1642 if (transaction->getArbitrator() != localMachineId) {
// Transactions below lastSeqNumArbOn were handled by an earlier pass
1646 if (transactionSequenceNumber < lastSeqNumArbOn) {
1650 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1651 // We have seen this already locally so dont commit again
1656 if (!transaction->isComplete()) {
1657 // Will arbitrate in incorrect order if we continue so just break
1663 // Track the largest client-local sequence number seen per originating machine
1664 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1665 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1667 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1668 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1669 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard is evaluated against committed state plus this round's accepted updates
1673 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1674 // Guard evaluated as true
1676 // Record the transaction's updates so they become part of the commit
1677 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1678 while (kvit->hasNext()) {
1679 KeyValue *kv = kvit->next();
1680 speculativeTableTmp->put(kv->getKey(), kv);
1684 // Update what the last transaction committed was for use in batch commit
1685 lastTransactionCommitted = transactionSequenceNumber;
1687 // Guard evaluated as false so create an abort
1689 Abort *newAbort = new Abort(NULL,
1690 transaction->getClientLocalSequenceNumber(),
1691 transaction->getSequenceNumber(),
1692 transaction->getMachineId(),
1693 transaction->getArbitrator(),
1694 localArbitrationSequenceNumber);
// Every abort consumes one arbitration sequence number
1695 localArbitrationSequenceNumber++;
1696 generatedAborts->add(newAbort);
1698 // Insert the abort so we can process
1699 processEntry(newAbort);
// Remember how far arbitration has progressed
1702 lastSeqNumArbOn = transactionSequenceNumber;
1705 Commit *newCommit = NULL;
1707 // If there is something to commit
1708 if (speculativeTableTmp->size() != 0) {
1709 // Create the commit and increment the commit sequence number
1710 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1711 localArbitrationSequenceNumber++;
1713 // Add all the new keys to the commit
1714 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1715 while (spit->hasNext()) {
1716 IoTString *string = spit->next();
1717 KeyValue *kv = speculativeTableTmp->get(string);
1718 newCommit->addKV(kv);
1722 // create the commit parts
1723 newCommit->createCommitParts();
1725 // Feed every commit part through processEntry so the commit is
1726 // reflected locally; queuing for the server happens below
1727 // via the arbitration round
1728 Vector<CommitPart *> *parts = newCommit->getParts();
1729 uint partsSize = parts->size();
1730 for (uint i = 0; i < partsSize; i++) {
1731 CommitPart *commitPart = parts->get(i);
1732 processEntry(commitPart);
// Queue the round when it produced a commit or at least one abort
1736 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1737 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1738 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports a change, process the commit parts of the round
// now at the tail of the queue
1740 if (compactArbitrationData()) {
1741 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1742 if (newArbitrationRound->getCommit() != NULL) {
1743 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1744 uint partsSize = parts->size();
1745 for (uint i = 0; i < partsSize; i++) {
1746 CommitPart *commitPart = parts->get(i);
1747 processEntry(commitPart);
// Arbitrates a single transaction received over the local path.
// Returns (couldArbitrate, didCommit): (false, false) when this machine cannot
// or should not arbitrate it, (true, true) after a commit, (true, false) after
// an abort.
1754 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1756 // Check if this machine arbitrates for this transaction if not then
1757 // we cant arbitrate this transaction
1758 if (transaction->getArbitrator() != localMachineId) {
1759 return Pair<bool, bool>(false, false);
1762 if (!transaction->isComplete()) {
1763 // An incomplete transaction cannot be arbitrated
1765 return Pair<bool, bool>(false, false);
1768 if (transaction->getMachineId() != localMachineId) {
1769 // dont do this check for local transactions
1770 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1771 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1772 // A newer transaction of that machine already arrived through the server
1773 return Pair<bool, bool>(false, false);
// The guard is evaluated against the committed table only
1778 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1779 // Guard evaluated as true. Create the commit and increment the
1780 // commit sequence number; -1 is passed as the transaction sequence number
1781 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1782 localArbitrationSequenceNumber++;
1784 // Copy the transaction's updates into the commit
1785 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1786 while (kvit->hasNext()) {
1787 KeyValue *kv = kvit->next();
1788 newCommit->addKV(kv);
1792 // create the commit parts
1793 newCommit->createCommitParts();
1795 // Queue the commit as an arbitration round (with an empty abort set)
1796 // waiting to be sent to the server
1797 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1798 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports a change, process the parts of the round now at the tail
1800 if (compactArbitrationData()) {
1801 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1802 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1803 uint partsSize = parts->size();
1804 for (uint i = 0; i < partsSize; i++) {
1805 CommitPart *commitPart = parts->get(i);
1806 processEntry(commitPart);
1809 // Insert the parts of the commit created above so we can process it
// (NOTE(review): presumably the branch taken when no compaction happened - confirm)
1810 Vector<CommitPart *> *parts = newCommit->getParts();
1811 uint partsSize = parts->size();
1812 for (uint i = 0; i < partsSize; i++) {
1813 CommitPart *commitPart = parts->get(i);
1814 processEntry(commitPart);
// Transactions created on this machine get their status object updated
1818 if (transaction->getMachineId() == localMachineId) {
1819 TransactionStatus *status = transaction->getTransactionStatus();
1820 if (status != NULL) {
1821 status->setStatus(TransactionStatus_StatusCommitted);
1825 updateLiveStateFromLocal();
1826 return Pair<bool, bool>(true, true);
1828 if (transaction->getMachineId() == localMachineId) {
1829 // For locally created messages update the status
1830 // Guard evaluated as false, so the transaction is aborted
1831 TransactionStatus *status = transaction->getTransactionStatus();
1832 if (status != NULL) {
1833 status->setStatus(TransactionStatus_StatusAborted);
1836 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1839 Abort *newAbort = new Abort(NULL,
1840 transaction->getClientLocalSequenceNumber(),
1842 transaction->getMachineId(),
1843 transaction->getArbitrator(),
1844 localArbitrationSequenceNumber);
// Every abort consumes one arbitration sequence number
1845 localArbitrationSequenceNumber++;
1846 addAbortSet->add(newAbort);
1848 // Queue the abort as an arbitration round (no commit)
1849 // waiting to be sent to the server
1850 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1851 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports a change, process the commit parts of the tail round
1853 if (compactArbitrationData()) {
1854 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1856 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1857 uint partsSize = parts->size();
1858 for (uint i = 0; i < partsSize; i++) {
1859 CommitPart *commitPart = parts->get(i);
1860 processEntry(commitPart);
1865 updateLiveStateFromLocal();
1866 return Pair<bool, bool>(true, false);
1871 * Compacts the arbitration data by merging commits and aggregating
1872 * aborts so that a single large push of commits can be done instead
1873 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one.
// Walks backwards from the newest round, merging aborts and commits until a
// round is full, has already been partly sent, or the merged result would
// exceed ArbitrationRound_MAX_PARTS.
1875 bool Table::compactArbitrationData() {
1876 if (pendingSendArbitrationRounds->size() < 2) {
1877 // Nothing to compact so do nothing
// The newest round is the merge target; it must not have been partly sent
1881 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1882 if (lastRound->getDidSendPart()) {
// NOTE(review): despite its name, hadCommit is true when the newest round
// has NO commit - confirm the intended meaning
1886 bool hadCommit = (lastRound->getCommit() == NULL);
1887 bool gotNewCommit = false;
// Counts the rounds covered by the compaction, the newest round included
1889 uint numberToDelete = 1;
1890 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Next older round, counted from the tail of the queue
1891 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1893 if (round->isFull() || round->getDidSendPart()) {
1894 // Stop since there is a part that cannot be compacted and we
1895 // need to compact in order
1899 if (round->getCommit() == NULL) {
1900 // Older round has no commit: only aborts need to be merged
1901 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1902 if (newSize > ArbitrationRound_MAX_PARTS) {
1903 // Cant compact since it would be too large
1906 lastRound->addAborts(round->getAborts());
1908 // Create a new larger commit out of both rounds' commits
1909 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1910 localArbitrationSequenceNumber++;
1912 // Create the commit parts so that we can count them
1913 newCommit->createCommitParts();
1915 // New size: parts of the merged commit plus the aborts of both rounds
1916 int newSize = newCommit->getNumberOfParts();
1917 newSize += lastRound->getAbortsCount();
1918 newSize += round->getAbortsCount();
1920 if (newSize > ArbitrationRound_MAX_PARTS) {
1921 // Cant compact since it would be too large
1925 // Set the new compacted part
1926 lastRound->setCommit(newCommit);
1927 lastRound->addAborts(round->getAborts());
1928 gotNewCommit = true;
// A value other than 1 means at least one older round was merged
1934 if (numberToDelete != 1) {
1935 // If there is a compaction
1936 // Delete the previous pieces that are now in the new compacted piece
1937 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1938 pendingSendArbitrationRounds->clear();
// Otherwise drop the merged rounds one by one from the tail
1940 for (uint i = 0; i < numberToDelete; i++) {
1941 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1945 // Add the new compacted round into the pending-to-send list
1946 pendingSendArbitrationRounds->add(lastRound);
1948 // Callers use this condition to decide whether to reinsert the commit parts
1949 if (hadCommit && gotNewCommit) {
1958 * Update all the commits and the committed tables, sets dead the dead
1961 bool Table::updateCommittedTable() {
1962 if (newCommitParts->size() == 0) {
1963 // Nothing new to process
1967 // Iterate through all the machine Ids that we received new parts for
1968 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
1969 while (partsit->hasNext()) {
1970 int64_t machineId = partsit->next();
1971 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
1973 // Iterate through all the parts for that machine Id
1974 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
1975 while (pairit->hasNext()) {
1976 Pair<int64_t, int32_t> *partId = pairit->next();
1977 CommitPart *part = parts->get(partId);
1979 // Get the transaction object for that sequence number
1980 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1982 if (commitForClientTable == NULL) {
1983 // This is the first commit from this device
1984 commitForClientTable = new Hashtable<int64_t, Commit *>();
1985 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1988 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1990 if (commit == NULL) {
1991 // This is a new commit that we dont have so make a new one
1992 commit = new Commit();
1994 // Insert this new commit into the live tables
1995 commitForClientTable->put(part->getSequenceNumber(), commit);
1998 // Add that part to the commit
1999 commit->addPartDecode(part);
2005 // Clear all the new commits parts in preparation for the next time
2006 // the server sends slots
2007 newCommitParts->clear();
2009 // If we process a new commit keep track of it for future use
2010 bool didProcessANewCommit = false;
2012 // Process the commits one by one
2013 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2014 while (liveit->hasNext()) {
2015 int64_t arbitratorId = liveit->next();
2017 // Get all the commits for a specific arbitrator
2018 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2020 // Sort the commits in order
2021 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2023 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2024 while (clientit->hasNext())
2025 commitSequenceNumbers->add(clientit->next());
2029 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2031 // Get the last commit seen from this arbitrator
2032 int64_t lastCommitSeenSequenceNumber = -1;
2033 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2034 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2037 // Go through each new commit one by one
2038 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2039 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2040 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2042 // Special processing if a commit is not complete
2043 if (!commit->isComplete()) {
2044 if (i == (commitSequenceNumbers->size() - 1)) {
2045 // If there is an incomplete commit and this commit is the
2046 // latest one seen then this commit cannot be processed and
2047 // there are no other commits
2050 // This is a commit that was already dead but parts of it
2051 // are still in the block chain (not flushed out yet)->
2052 // Delete it and move on
2054 commitForClientTable->remove(commit->getSequenceNumber());
2059 // Update the last transaction that was updated if we can
2060 if (commit->getTransactionSequenceNumber() != -1) {
2061 // Update the last transaction sequence number that the arbitrator arbitrated on1
2062 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2063 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2067 // Update the last arbitration data that we have seen so far
2068 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2069 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2070 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2072 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2075 // Never seen any data from this arbitrator so record the first one
2076 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2079 // We have already seen this commit before so need to do the
2080 // full processing on this commit
2081 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2083 // Update the last transaction that was updated if we can
2084 if (commit->getTransactionSequenceNumber() != -1) {
2085 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2086 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2087 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2088 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2095 // If we got here then this is a brand new commit and needs full
2097 // Get what commits should be edited, these are the commits that
2098 // have live values for their keys
2099 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2101 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2102 while (kvit->hasNext()) {
2103 KeyValue *kv = kvit->next();
2104 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2106 commitsToEdit->add(commit);
2111 // Update each previous commit that needs to be updated
2112 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2113 while (commitit->hasNext()) {
2114 Commit *previousCommit = commitit->next();
2116 // Only bother with live commits (TODO: Maybe remove this check)
2117 if (previousCommit->isLive()) {
2119 // Update which keys in the old commits are still live
2121 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2122 while (kvit->hasNext()) {
2123 KeyValue *kv = kvit->next();
2124 previousCommit->invalidateKey(kv->getKey());
2129 // if the commit is now dead then remove it
2130 if (!previousCommit->isLive()) {
2131 commitForClientTable->remove(previousCommit->getSequenceNumber());
2137 // Update the last seen sequence number from this arbitrator
2138 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2139 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2140 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2143 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2146 // We processed a new commit that we havent seen before
2147 didProcessANewCommit = true;
2149 // Update the committed table of keys and which commit is using which key
2151 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2152 while (kvit->hasNext()) {
2153 KeyValue *kv = kvit->next();
2154 committedKeyValueTable->put(kv->getKey(), kv);
2155 liveCommitsByKeyTable->put(kv->getKey(), commit);
2163 return didProcessANewCommit;
2167 * Create the speculative table from transactions that are still live
2168 * and have come from the cloud
// Evaluates the live transactions in liveTransactionBySequenceNumberTable,
// in sorted sequence-number order, and writes the key/values of every
// transaction whose guard holds into speculatedKeyValueTable.
//
// @param didProcessNewCommits when true the previous speculation is thrown
//        away (table cleared, bookkeeping reset to -1) before the
//        transactions are re-evaluated.
// @return false on the "did not speculate" path shown below; the other
//         return statements are not visible in this listing.
//
// NOTE(review): this listing omits the function's shortest lines (closing
// braces, bare return/break/continue statements, iterator cleanup), so the
// control flow between the visible lines must be confirmed against the
// full file before changing anything here.
2170 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2171 if (liveTransactionBySequenceNumberTable->size() == 0) {
2172 // There is nothing to speculate on
2176 // Create a list of the transaction sequence numbers and sort them
2177 // from oldest to newest
// NOTE(review): allocated with new; no matching delete is visible in this
// listing -- confirm it is released on every return path.
2178 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2180 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2181 while (trit->hasNext())
2182 transactionSequenceNumbersSorted->add(trit->next());
// Sort the vector's exposed backing array in place; the ordering is
// whatever compareInt64 defines.
2186 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
// Compare the front of the sorted list with the front remembered from the
// previous call; a difference is treated as a "gap".
2188 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2191 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2192 // If there is a gap in the transaction sequence numbers then
2193 // there was a commit or an abort of a transaction OR there was a
2194 // new commit (could be from an offline commit), so redo the
2195 // speculation from scratch
2197 // Start from scratch
2198 speculatedKeyValueTable->clear();
2199 lastTransactionSequenceNumberSpeculatedOn = -1;
2200 oldestTransactionSequenceNumberSpeculatedOn = -1;
2203 // Remember the front of the transaction list
2204 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2206 // Find where to start arbitration from
2207 uint startIndex = 0;
// Linear search for the last transaction speculated on.
// NOTE(review): the statement(s) following this search are not visible;
// confirm how startIndex is adjusted when the value is found / not found.
2209 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2210 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2214 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2215 // Make sure we are not out of bounds
2216 return false; // did not speculate
// Arbitrators that own at least one incomplete transaction seen so far in
// the loop below.
// NOTE(review): allocated with new; no matching delete is visible here.
2219 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
// NOTE(review): initialised to true and never set to false in any visible
// line; if the reset near the end of the function is guarded by this flag
// it would run on every call -- confirm that is intended.
2220 bool didSkip = true;
2222 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2223 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2224 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2226 if (!transaction->isComplete()) {
2227 // If there is an incomplete transaction then there is nothing
2228 // we can do; add this transaction's arbitrator to the list of
2229 // arbitrators we should ignore
2230 incompleteTransactionArbitrator->add(transaction->getArbitrator());
// Transactions from an arbitrator already marked above are tested here
// (the body of this branch is not visible in this listing).
2235 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2239 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The third argument is NULL here, whereas
// updatePendingTransactionSpeculativeTable passes its pending table.
2241 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2242 // Guard evaluated to true so update the speculative table
2244 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2245 while (kvit->hasNext()) {
2246 KeyValue *kv = kvit->next();
2247 speculatedKeyValueTable->put(kv->getKey(), kv);
2255 // Since there was a skip we need to redo the speculation next time around
2256 lastTransactionSequenceNumberSpeculatedOn = -1;
2257 oldestTransactionSequenceNumberSpeculatedOn = -1;
2260 // We did some speculation
2265 * Create the pending transaction speculative table from transactions
2266 * that are still in the pending transaction buffer
// Evaluates the transactions waiting in pendingTransactionQueue and stores
// the key/values of those whose guard holds in
// pendingTransactionSpeculatedKeyValueTable.
//
// @param didProcessNewCommitsOrSpeculate when true the pending speculation
//        is reset (table cleared, bookkeeping re-seeded from the queue)
//        before evaluation; the same reset happens when the front of the
//        queue differs from firstPendingTransaction.
//
// NOTE(review): this listing omits the function's shortest lines (closing
// braces, bare return/break statements, iterator cleanup); confirm control
// flow against the full file.
2268 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2269 if (pendingTransactionQueue->size() == 0) {
2270 // There is nothing to speculate on
2274 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2275 // need to reset on the pending speculation
2276 lastPendingTransactionSpeculatedOn = NULL;
2277 firstPendingTransaction = pendingTransactionQueue->get(0);
2278 pendingTransactionSpeculatedKeyValueTable->clear();
2281 // Find where to start arbitration from
2282 uint startIndex = 0;
// NOTE(review): the search key is firstPendingTransaction, not
// lastPendingTransactionSpeculatedOn (compare updateSpeculativeTable, which
// searches for the last item speculated on). Confirm this is the intended
// starting point and how startIndex is adjusted after the loop.
2284 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2285 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2288 if (startIndex >= pendingTransactionQueue->size()) {
2289 // Make sure we are not out of bounds
2293 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2294 Transaction *transaction = pendingTransactionQueue->get(i);
2296 lastPendingTransactionSpeculatedOn = transaction;
// The guard is evaluated against all three tables: committed, speculated
// and pending-speculated.
2298 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2299 // Guard evaluated to true so update the speculative table
2300 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2301 while (kvit->hasNext()) {
2302 KeyValue *kv = kvit->next();
2303 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2311 * Set dead and remove from the live transaction tables the
2312 * transactions that are dead
// Two passes driven by lastArbitratedTransactionNumberByArbitratorTable:
//  1. every live transaction whose arbitrator has already arbitrated a
//     sequence number >= the transaction's own is set dead and removed from
//     liveTransactionByTransactionIdTable;
//  2. every outstanding TransactionStatus that satisfies the same test
//     (using the status' arbitrator and sequence number) is marked
//     TransactionStatus_StatusCommitted.
//
// NOTE(review): this listing omits the function's shortest lines. The two
// `iter` declarations must live in separate scopes, and any removal from
// the table being iterated or iterator cleanup is not visible here --
// confirm against the full file.
2314 void Table::updateLiveTransactionsAndStatus() {
2315 // Go through each of the transactions
2317 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2318 while (iter->hasNext()) {
2319 int64_t key = iter->next();
2320 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2322 // Check if the transaction is dead
2323 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2324 && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2325 // Set dead the transaction
2326 transaction->setDead();
2328 // Remove the transaction from the live table
2330 liveTransactionByTransactionIdTable->remove(transaction->getId());
2336 // Go through each of the outstanding transaction statuses
2338 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2339 while (iter->hasNext()) {
2340 int64_t key = iter->next();
2341 TransactionStatus *status = outstandingTransactionStatus->get(key);
2343 // Check if the transaction is dead
2344 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2345 && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2347 status->setStatus(TransactionStatus_StatusCommitted);
2358 * Process this slot, entry by entry. Also update the latest message sent by slot
// Records the slot as the newest message from its sending machine, then
// hands every entry in the slot to the processEntry overload that matches
// the value returned by entry->getType().
//
// @param indexer              passed on to the RejectedMessage handler
// @param slot                 slot whose entries are processed
// @param acceptUpdatesToLocal forwarded to updateLastMessage for the slot
// @param machineSet           forwarded to updateLastMessage and to the
//                             LastMessage handler
// Throws a heap-allocated Error (thrown by pointer) at the
// "Unrecognized type: " line.
//
// NOTE(review): the break statements and the labels in front of the Abort
// and NewKey calls are not visible in this listing; confirm them against
// the full file.
2360 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2362 // Update the last message seen
2363 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2365 // Process each entry in the slot
2366 Vector<Entry *> *entries = slot->getEntries();
// Size is read once before the loop.
2367 uint eSize = entries->size();
2368 for (uint ei = 0; ei < eSize; ei++) {
2369 Entry *entry = entries->get(ei);
// Each branch downcasts with an unchecked C-style cast, so the concrete
// type is trusted to agree with the tag returned by getType().
2370 switch (entry->getType()) {
2371 case TypeCommitPart:
2372 processEntry((CommitPart *)entry);
2375 processEntry((Abort *)entry);
2377 case TypeTransactionPart:
2378 processEntry((TransactionPart *)entry);
2381 processEntry((NewKey *)entry);
2383 case TypeLastMessage:
2384 processEntry((LastMessage *)entry, machineSet);
2386 case TypeRejectedMessage:
2387 processEntry((RejectedMessage *)entry, indexer);
2389 case TypeTableStatus:
2390 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2393 throw new Error("Unrecognized type: ");
2399 * Update the last message that was sent for a machine Id
2401 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2402 // Update what the last message received by a machine was
2403 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2407 * Add the new key to the arbitrators table and update the set of live
2408 * new keys (in case of a rescued new key message)
2410 void Table::processEntry(NewKey *entry) {
2411 // Update the arbitrator table with the new key information
2412 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2414 // Update what the latest live new key is
2415 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2416 if (oldNewKey != NULL) {
2417 // Delete the old new key messages
2418 oldNewKey->setDead();
2423 * Process new table status entries and set dead the old ones as new
2424 * ones come in. Keeps track of the largest and smallest table status
2425 * seen in this current round of updating the local copy of the block
2428 void Table::processEntry(TableStatus *entry, int64_t seq) {
2429 int newNumSlots = entry->getMaxSlots();
2430 updateCurrMaxSize(newNumSlots);
2431 initExpectedSize(seq, newNumSlots);
2433 if (liveTableStatus != NULL) {
2434 // We have a larger table status so the old table status is no
2436 liveTableStatus->setDead();
2439 // Make this new table status the latest alive table status
2440 liveTableStatus = entry;
2444 * Check old messages to see if there is a block chain violation.
// Handles a RejectedMessage entry in two steps:
//  1. checks the locally held slots with sequence numbers in
//     [oldSeqNum, newSeqNum] against the entry and throws if one of them
//     contradicts it;
//  2. collects the machines whose last recorded message is older than the
//     entry so the entry can be watched until they have seen it.
//
// @param entry   the RejectedMessage entry taken from a slot
// @param indexer used to look up local slots by sequence number
// Throws a heap-allocated Error (thrown by pointer) with the
// "Server Error: Trying to insert rejected message for slot " text.
//
// NOTE(review): this listing omits the function's shortest lines (closing
// braces, continue, else, iterator cleanup); confirm control flow against
// the full file.
2447 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2448 int64_t oldSeqNum = entry->getOldSeqNum();
2449 int64_t newSeqNum = entry->getNewSeqNum();
2450 bool isequal = entry->getEqual();
2451 int64_t machineId = entry->getMachineID();
2452 int64_t seq = entry->getSequenceNumber();
2454 // Check if we have messages that were supposed to be rejected in
2455 // our local block chain
2456 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
// NOTE(review): slot is dereferenced below; the comment that follows
// implies a NULL guard, but that line is not visible here -- confirm.
2458 Slot *slot = indexer->getSlot(seqNum);
2461 // If we have this slot make sure that it was not supposed to be
2463 int64_t slotMachineId = slot->getMachineID();
// When isequal is true the slot must come from machineId; when it is false
// the slot must come from some other machine. Anything else is an error.
2464 if (isequal != (slotMachineId == machineId)) {
2465 throw new Error("Server Error: Trying to insert rejected message for slot ");
2470 // Create a list of clients to watch until they see this rejected
// NOTE(review): allocated with new; it is handed to the entry via
// setWatchSet() below, but what happens to it when the set stays empty is
// not visible in this listing.
2472 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2473 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2474 while (iter->hasNext()) {
2475 // Machine ID for the last message entry
2476 int64_t lastMessageEntryMachineId = iter->next();
2478 // We've seen it, don't need to continue to watch. Our next
2479 // message will implicitly acknowledge it.
2480 if (lastMessageEntryMachineId == localMachineId) {
2484 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2485 int64_t entrySequenceNumber = lastMessageValue->getFirst();
// A machine whose last recorded sequence number is below this entry's
// sequence number is treated as not having seen the entry yet.
2487 if (entrySequenceNumber < seq) {
2488 // Add this rejected message to the set of messages that this
2489 // machine ID did not see yet
2490 addWatchVector(lastMessageEntryMachineId, entry);
2491 // This client did not see this rejected message yet so add it
2492 // to the watch set to monitor
2493 deviceWatchSet->add(lastMessageEntryMachineId);
2498 if (deviceWatchSet->isEmpty()) {
2499 // This rejected message has been seen by all the clients so
2502 // We need to watch this rejected message
2503 entry->setWatchSet(deviceWatchSet);
2508 * Check if this abort is live, if not then save it so we can kill it
2509 * later. Update the last transaction number that was arbitrated on.
// Handles an Abort entry. In the visible lines it:
//  - marks the matching outstanding TransactionStatus as aborted,
//  - registers the abort in liveAbortTable (and in
//    liveAbortsGeneratedByLocal when this machine was the arbitrator),
//    setting dead any abort previously stored under the same id,
//  - unregisters it again when the target machine's last message already
//    covers the abort's sequence number,
//  - advances lastArbitrationDataLocalSequenceNumberSeenFromArbitrator,
//  - removes the aborted transaction from both live-transaction tables,
//  - advances lastArbitratedTransactionNumberByArbitratorTable.
//
// @param entry the Abort entry taken from a slot
//
// NOTE(review): this listing omits the function's shortest lines (closing
// braces, returns, else branches and any short statements such as setDead
// calls); confirm control flow against the full file.
2511 void Table::processEntry(Abort *entry) {
2512 if (entry->getTransactionSequenceNumber() != -1) {
2513 // update the transaction status if it was sent to the server
2514 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2515 if (status != NULL) {
2516 status->setStatus(TransactionStatus_StatusAborted);
2520 // Abort has not been seen by the client it is for yet so we need to
// The key is a heap-allocated copy of the abort id.
// NOTE(review): who frees this key (on replacement and on the remove()
// below) is not visible here -- confirm against the Hashtable contract.
2523 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2524 if (previouslySeenAbort != NULL) {
2525 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2528 if (entry->getTransactionArbitrator() == localMachineId) {
2529 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get() is dereferenced
// without a NULL check -- confirm an entry always exists for the
// transaction's machine id at this point.
2532 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2533 // The machine already saw this so it is dead
// A stack copy of the id is used as the lookup key here.
2535 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2536 liveAbortTable->remove(&abortid);
2538 if (entry->getTransactionArbitrator() == localMachineId) {
2539 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2544 // Update the last arbitration data that we have seen so far
2545 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2546 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
// Only move the recorded value forward, never backward.
2547 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2549 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2552 // Never seen any data from this arbitrator so record the first one
2553 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2556 // Set dead a transaction if we can
2557 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2559 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2560 if (transactionToSetDead != NULL) {
2561 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2564 // Update the last transaction sequence number that the arbitrator
// Advance only when nothing is recorded for this arbitrator yet or the
// recorded number is older; a transaction sequence number of -1 is never
// stored.
2566 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2567 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2569 if (entry->getTransactionSequenceNumber() != -1) {
2570 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2576 * Set dead the transaction part if that transaction is dead and keep
2577 * track of all new parts
// Handles a TransactionPart entry. A part whose arbitrator has already
// arbitrated a sequence number >= the part's own belongs to a finished
// transaction; any other part is stored in newTransactionParts under the
// sending machine's id, keyed by its part id, and a part previously stored
// under the same part id is set dead.
//
// @param entry the TransactionPart entry taken from a slot
//
// NOTE(review): the body of the "transaction is dead" branch and the
// closing braces are not visible in this listing; confirm against the
// full file.
2579 void Table::processEntry(TransactionPart *entry) {
2580 // Check if we have already seen this transaction and set it dead OR
2581 // if it is not alive
2582 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2583 // This transaction is dead, it was already committed or aborted
2588 // This part is still alive
2589 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2591 if (transactionPart == NULL) {
2592 // Dont have a table for this machine Id yet so make one
2593 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2594 newTransactionParts->put(entry->getMachineId(), transactionPart);
2597 // Update the part and set dead ones we have already seen (got a
// The key is a heap-allocated copy of the part id.
// NOTE(review): ownership of that key when an older part is replaced is
// not visible here -- confirm against the Hashtable contract.
2599 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2600 if (previouslySeenPart != NULL) {
2601 previouslySeenPart->setDead();
2606 * Process new commit entries and save them for future use. Delete duplicates
2608 void Table::processEntry(CommitPart *entry) {
2609 // Update the last transaction that was updated if we can
2610 if (entry->getTransactionSequenceNumber() != -1) {
2611 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2612 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2616 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2617 if (commitPart == NULL) {
2618 // Don't have a table for this machine Id yet so make one
2619 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2620 newCommitParts->put(entry->getMachineId(), commitPart);
2622 // Update the part and set dead ones we have already seen (got a
2624 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2625 if (previouslySeenPart != NULL) {
2626 previouslySeenPart->setDead();
2631 * Update the last message seen table. Update and set dead the
2632 * appropriate RejectedMessages as clients see them. Updates the live
2633 * aborts, removes those that are dead and sets them dead. Check that
2634 * the last message seen is correct and that there is no mismatch of
2635 * our own last message or that other clients have not had a rollback
2636 * on the last message.
// Records (machineId, seqNum, liveness) as the newest message known for a
// machine and performs the bookkeeping that follows from it:
//  - machineId is removed from machineSet,
//  - rejected messages watched for this machine with a sequence number
//    <= seqNum drop this machine as a watcher,
//  - live aborts aimed at this machine with a sequence number <= seqNum
//    are cleaned up,
//  - the previously recorded last message is replaced and checked against
//    the new one for mismatches / rollbacks.
//
// @param machineId            machine the message came from
// @param seqNum               sequence number reported for that machine
// @param liveness             the Slot or LastMessage carrying the report
// @param acceptUpdatesToLocal when true, the mismatch checks for the local
//                             machine are suppressed
// @param machineSet           set from which machineId is removed
// Throws a heap-allocated Error (thrown by pointer) for an unrecognised
// liveness type and for the "Server Error" conditions below.
//
// NOTE(review): this listing omits the function's shortest lines (closing
// braces, returns, else branches, iterator removal/cleanup and short calls
// such as setDead); confirm control flow against the full file.
2638 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2639 // We have seen this machine ID
2640 machineSet->remove(machineId);
2642 // Get the set of rejected messages that this machine Id has not seen yet
2643 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2644 // If there is a rejected message that this machine Id has not seen yet
2645 if (watchset != NULL) {
2646 // Go through each rejected message that this machine Id has not
2649 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2650 while (rmit->hasNext()) {
2651 RejectedMessage *rm = rmit->next();
2652 // If this machine Id has seen this rejected message...
2653 if (rm->getSequenceNumber() <= seqNum) {
2654 // Remove it from our watchlist
2656 // Decrement machines that need to see this notification
2657 rm->removeWatcher(machineId);
2663 // Set dead the abort
2664 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2666 while (abortit->hasNext()) {
2667 Pair<int64_t, int64_t> *key = abortit->next();
2668 Abort *abort = liveAbortTable->get(key);
// An abort aimed at this machine with a sequence number <= seqNum has been
// seen by it.
2669 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2672 if (abort->getTransactionArbitrator() == localMachineId) {
2673 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2678 if (machineId == localMachineId) {
2679 // Our own messages are immediately dead.
// Liveness is downcast by its type tag; the casts are unchecked.
2680 char livenessType = liveness->getType();
2681 if (livenessType == TypeLastMessage) {
2682 ((LastMessage *)liveness)->setDead();
2683 } else if (livenessType == TypeSlot) {
2684 ((Slot *)liveness)->setDead();
2686 throw new Error("Unrecognized type");
2689 // Get the old last message for this device
// put() stores a newly allocated pair; its return value is treated as the
// pair that was stored before (NULL when there was none).
2690 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2691 if (lastMessageEntry == NULL) {
2692 // If no last message then there is nothing else to process
// Copy both fields out before the old pair is deleted.
2696 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2697 Liveness *lastEntry = lastMessageEntry->getSecond();
2698 delete lastMessageEntry;
2700 // If it is not our machine Id since we already set ours to dead
2701 if (machineId != localMachineId) {
2702 char lastEntryType = lastEntry->getType();
2704 if (lastEntryType == TypeLastMessage) {
2705 ((LastMessage *)lastEntry)->setDead();
2706 } else if (lastEntryType == TypeSlot) {
2707 ((Slot *)lastEntry)->setDead();
2709 throw new Error("Unrecognized type");
2712 // Make sure the server is not playing any games
2713 if (machineId == localMachineId) {
2714 if (hadPartialSendToServer) {
2715 // We were not making any updates and we had a machine mismatch
// After a partial send only a lower sequence number is an error.
2716 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2717 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2720 // We were not making any updates and we had a machine mismatch
// This check requires an exact match.
2721 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2722 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
// A sequence number lower than the one previously recorded is reported as
// a rollback on a remote machine.
2726 if (lastMessageSeqNum > seqNum) {
2727 throw new Error("Server Error: Rollback on remote machine sequence number");
2733 * Add a rejected message entry to the watch set to keep track of
2734 * which clients have seen that rejected message entry and which have
2737 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2738 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2739 if (entries == NULL) {
2740 // There is no set for this machine ID yet so create one
2741 entries = new Hashset<RejectedMessage *>();
2742 rejectedMessageWatchVectorTable->put(machineId, entries);
2744 entries->add(entry);
2748 * Check if the HMAC chain is not violated
2750 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2751 for (uint i = 0; i < newSlots->length(); i++) {
2752 Slot *currSlot = newSlots->get(i);
2753 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2754 if (prevSlot != NULL &&
2755 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2756 throw new Error("Server Error: Invalid HMAC Chain");