3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
24 int compareInt64(const void * a, const void *b) {
25 const int64_t * pa = (const int64_t *) a;
26 const int64_t * pb = (const int64_t *) b;
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localTransactionSequenceNumber(0),
50 lastTransactionSequenceNumberSpeculatedOn(0),
51 oldestTransactionSequenceNumberSpeculatedOn(0),
52 localArbitrationSequenceNumber(0),
53 hadPartialSendToServer(false),
54 attemptedToSendToServer(false),
56 didFindTableStatus(false),
58 lastSlotAttemptedToSend(NULL),
61 lastTransactionPartsSent(NULL),
62 lastPendingSendArbitrationEntriesToDelete(NULL),
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localTransactionSequenceNumber(0),
112 lastTransactionSequenceNumberSpeculatedOn(0),
113 oldestTransactionSequenceNumberSpeculatedOn(0),
114 localArbitrationSequenceNumber(0),
115 hadPartialSendToServer(false),
116 attemptedToSendToServer(false),
118 didFindTableStatus(false),
120 lastSlotAttemptedToSend(NULL),
123 lastTransactionPartsSent(NULL),
124 lastPendingSendArbitrationEntriesToDelete(NULL),
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
160 * Init all the stuff needed for for table usage
163 // Init helper objects
164 random = new Random();
165 buffer = new SlotBuffer();
168 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174 arbitratorTable = new Hashtable<IoTString *, int64_t>();
175 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184 rejectedSlotVector = new Vector<int64_t>();
185 pendingTransactionQueue = new Vector<Transaction *>();
186 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
197 numberOfSlots = buffer->capacity();
198 setResizeThreshold();
202 * Initialize the table by inserting a table status as the first entry
203 * into the table status also initialize the crypto stuff.
205 void Table::initTable() {
206 cloud->initSecurity();
208 // Create the first insertion into the block chain which is the table status
209 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210 localSequenceNumber++;
211 TableStatus *status = new TableStatus(s, numberOfSlots);
213 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
216 array = new Array<Slot *>(1);
218 // update local block chain
219 validateAndUpdate(array, true);
220 } else if (array->length() == 1) {
221 // in case we did push the slot BUT we failed to init it
222 validateAndUpdate(array, true);
224 throw new Error("Error on initialization");
229 * Rebuild the table from scratch by pulling the latest block chain
232 void Table::rebuild() {
233 // Just pull the latest slots from the server
234 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235 validateAndUpdate(newslots, true);
237 updateLiveTransactionsAndStatus();
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
244 int64_t Table::getArbitrator(IoTString *key) {
245 return arbitratorTable->get(key);
248 void Table::close() {
252 IoTString *Table::getCommitted(IoTString *key) {
253 KeyValue *kv = committedKeyValueTable->get(key);
256 return kv->getValue();
262 IoTString *Table::getSpeculative(IoTString *key) {
263 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
266 kv = speculatedKeyValueTable->get(key);
270 kv = committedKeyValueTable->get(key);
274 return kv->getValue();
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281 KeyValue *kv = committedKeyValueTable->get(key);
283 if (!arbitratorTable->contains(key)) {
284 throw new Error("Key not Found.");
287 // Make sure new key value pair matches the current arbitrator
288 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289 // TODO: Maybe not throw en error
290 throw new Error("Not all Key Values Match Arbitrator.");
294 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295 return kv->getValue();
297 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303 if (!arbitratorTable->contains(key)) {
304 throw new Error("Key not Found.");
307 // Make sure new key value pair matches the current arbitrator
308 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309 // TODO: Maybe not throw en error
310 throw new Error("Not all Key Values Match Arbitrator.");
313 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
316 kv = speculatedKeyValueTable->get(key);
320 kv = committedKeyValueTable->get(key);
324 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325 return kv->getValue();
327 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
332 bool Table::update() {
334 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335 validateAndUpdate(newSlots, false);
337 updateLiveTransactionsAndStatus();
339 } catch (Exception *e) {
340 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341 while (kit->hasNext()) {
342 int64_t m = kit->next();
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
353 if (!arbitratorTable->contains(keyName)) {
354 // There is already an arbitrator
357 NewKey *newKey = new NewKey(NULL, keyName, machineId);
359 if (sendToServer(newKey)) {
360 // If successfully inserted
366 void Table::startTransaction() {
367 // Create a new transaction, invalidates any old pending transactions.
368 pendingTransactionBuilder = new PendingTransaction(localMachineId);
371 void Table::addKV(IoTString *key, IoTString *value) {
373 // Make sure it is a valid key
374 if (!arbitratorTable->contains(key)) {
375 throw new Error("Key not Found.");
378 // Make sure new key value pair matches the current arbitrator
379 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380 // TODO: Maybe not throw en error
381 throw new Error("Not all Key Values Match Arbitrator.");
384 // Add the key value to this transaction
385 KeyValue *kv = new KeyValue(key, value);
386 pendingTransactionBuilder->addKV(kv);
389 TransactionStatus *Table::commitTransaction() {
390 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391 // transaction with no updates will have no effect on the system
392 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
395 // Set the local transaction sequence number and increment
396 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397 localTransactionSequenceNumber++;
399 // Create the transaction status
400 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
402 // Create the new transaction
403 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404 newTransaction->setTransactionStatus(transactionStatus);
406 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407 // Add it to the queue and invalidate the builder for safety
408 pendingTransactionQueue->add(newTransaction);
410 arbitrateOnLocalTransaction(newTransaction);
411 updateLiveStateFromLocal();
414 pendingTransactionBuilder = new PendingTransaction(localMachineId);
418 } catch (ServerException *e) {
420 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421 uint size = pendingTransactionQueue->size();
423 for (int iter = 0; iter < size; iter++) {
424 Transaction *transaction = pendingTransactionQueue->get(iter);
425 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
427 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428 // Already contacted this client so ignore all attempts to contact this client
429 // to preserve ordering for arbitrator
433 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
435 if (sendReturn.getFirst()) {
436 // Failed to contact over local
437 arbitratorTriedAndFailed->add(transaction->getArbitrator());
439 // Successful contact or should not contact
441 if (sendReturn.getSecond()) {
447 pendingTransactionQueue->setSize(oldindex);
450 updateLiveStateFromLocal();
452 return transactionStatus;
456 * Recalculate the new resize threshold
458 void Table::setResizeThreshold() {
459 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
463 int64_t Table::getLocalSequenceNumber() {
464 return localSequenceNumber;
467 bool Table::sendToServer(NewKey *newKey) {
468 bool fromRetry = false;
470 if (hadPartialSendToServer) {
471 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
472 if (newSlots->length() == 0) {
474 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
476 if (sendSlotsReturn.getFirst()) {
477 if (newKey != NULL) {
478 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
483 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484 while (trit->hasNext()) {
485 Transaction *transaction = trit->next();
486 transaction->resetServerFailure();
487 // Update which transactions parts still need to be sent
488 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489 // Add the transaction status to the outstanding list
490 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
492 // Update the transaction status
493 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
495 // Check if all the transaction parts were successfully
496 // sent and if so then remove it from pending
497 if (transaction->didSendAllParts()) {
498 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499 pendingTransactionQueue->remove(transaction);
504 newSlots = sendSlotsReturn.getThird();
505 bool isInserted = false;
506 for (uint si = 0; si < newSlots->length(); si++) {
507 Slot *s = newSlots->get(si);
508 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
514 for (uint si = 0; si < newSlots->length(); si++) {
515 Slot *s = newSlots->get(si);
520 // Process each entry in the slot
521 Vector<Entry *> *ventries = s->getEntries();
522 uint vesize = ventries->size();
523 for (uint vei = 0; vei < vesize; vei++) {
524 Entry *entry = ventries->get(vei);
525 if (entry->getType() == TypeLastMessage) {
526 LastMessage *lastMessage = (LastMessage *)entry;
527 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
536 if (newKey != NULL) {
537 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
542 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543 while (trit->hasNext()) {
544 Transaction *transaction = trit->next();
545 transaction->resetServerFailure();
547 // Update which transactions parts still need to be sent
548 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
550 // Add the transaction status to the outstanding list
551 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
553 // Update the transaction status
554 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
556 // Check if all the transaction parts were successfully sent and if so then remove it from pending
557 if (transaction->didSendAllParts()) {
558 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559 pendingTransactionQueue->remove(transaction);
561 transaction->resetServerFailure();
562 // Set the transaction sequence number back to nothing
563 if (!transaction->didSendAPartToServer()) {
564 transaction->setSequenceNumber(-1);
572 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573 while (trit->hasNext()) {
574 Transaction *transaction = trit->next();
575 transaction->resetServerFailure();
576 // Set the transaction sequence number back to nothing
577 if (!transaction->didSendAPartToServer()) {
578 transaction->setSequenceNumber(-1);
583 if (sendSlotsReturn.getThird()->length() != 0) {
584 // insert into the local block chain
585 validateAndUpdate(sendSlotsReturn.getThird(), true);
589 bool isInserted = false;
590 for (uint si = 0; si < newSlots->length(); si++) {
591 Slot *s = newSlots->get(si);
592 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
598 for (uint si = 0; si < newSlots->length(); si++) {
599 Slot *s = newSlots->get(si);
604 // Process each entry in the slot
605 Vector<Entry *> *entries = s->getEntries();
606 uint eSize = entries->size();
607 for(uint ei=0; ei < eSize; ei++) {
608 Entry * entry = entries->get(ei);
610 if (entry->getType() == TypeLastMessage) {
611 LastMessage *lastMessage = (LastMessage *)entry;
612 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
621 if (newKey != NULL) {
622 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
627 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628 while (trit->hasNext()) {
629 Transaction *transaction = trit->next();
630 transaction->resetServerFailure();
632 // Update which transactions parts still need to be sent
633 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
635 // Add the transaction status to the outstanding list
636 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
638 // Update the transaction status
639 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
641 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642 if (transaction->didSendAllParts()) {
643 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644 pendingTransactionQueue->remove(transaction);
646 transaction->resetServerFailure();
647 // Set the transaction sequence number back to nothing
648 if (!transaction->didSendAPartToServer()) {
649 transaction->setSequenceNumber(-1);
655 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656 while (trit->hasNext()) {
657 Transaction *transaction = trit->next();
658 transaction->resetServerFailure();
659 // Set the transaction sequence number back to nothing
660 if (!transaction->didSendAPartToServer()) {
661 transaction->setSequenceNumber(-1);
667 // insert into the local block chain
668 validateAndUpdate(newSlots, true);
671 } catch (ServerException *e) {
678 // While we have stuff that needs inserting into the block chain
679 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
683 if (hadPartialSendToServer) {
684 throw new Error("Should Be error free");
689 // If there is a new key with same name then end
690 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
695 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696 localSequenceNumber++;
698 // Try to fill the slot with data
699 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700 bool needsResize = fillSlotsReturn.getFirst();
701 int newSize = fillSlotsReturn.getSecond();
702 bool insertedNewKey = fillSlotsReturn.getThird();
705 // Reset which transaction to send
706 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707 while (trit->hasNext()) {
708 Transaction *transaction = trit->next();
709 transaction->resetNextPartToSend();
711 // Set the transaction sequence number back to nothing
712 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713 transaction->setSequenceNumber(-1);
718 // Clear the sent data since we are trying again
719 pendingSendArbitrationEntriesToDelete->clear();
720 transactionPartsSent->clear();
722 // We needed a resize so try again
723 fillSlot(slot, true, newKey);
726 lastSlotAttemptedToSend = slot;
727 lastIsNewKey = (newKey != NULL);
728 lastInsertedNewKey = insertedNewKey;
729 lastNewSize = newSize;
731 lastTransactionPartsSent = transactionPartsSent->clone();
732 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
734 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
736 if (sendSlotsReturn.getFirst()) {
738 // Did insert into the block chain
740 if (insertedNewKey) {
741 // This slot was what was inserted not a previous slot
743 // New Key was successfully inserted into the block chain so dont want to insert it again
747 // Remove the aborts and commit parts that were sent from the pending to send queue
748 uint size = pendingSendArbitrationRounds->size();
750 for (uint i = 0; i < size; i++) {
751 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752 round->removeParts(pendingSendArbitrationEntriesToDelete);
754 if (!round->isDoneSending()) {
755 // Sent all the parts
756 pendingSendArbitrationRounds->set(oldcount++,
757 pendingSendArbitrationRounds->get(i));
760 pendingSendArbitrationRounds->setSize(oldcount);
762 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763 while (trit->hasNext()) {
764 Transaction *transaction = trit->next();
765 transaction->resetServerFailure();
767 // Update which transactions parts still need to be sent
768 transaction->removeSentParts(transactionPartsSent->get(transaction));
770 // Add the transaction status to the outstanding list
771 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
773 // Update the transaction status
774 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
776 // Check if all the transaction parts were successfully sent and if so then remove it from pending
777 if (transaction->didSendAllParts()) {
778 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779 pendingTransactionQueue->remove(transaction);
784 // Reset which transaction to send
785 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786 while (trit->hasNext()) {
787 Transaction *transaction = trit->next();
788 transaction->resetNextPartToSend();
790 // Set the transaction sequence number back to nothing
791 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792 transaction->setSequenceNumber(-1);
798 // Clear the sent data in preparation for next send
799 pendingSendArbitrationEntriesToDelete->clear();
800 transactionPartsSent->clear();
802 if (sendSlotsReturn.getThird()->length() != 0) {
803 // insert into the local block chain
804 validateAndUpdate(sendSlotsReturn.getThird(), true);
808 } catch (ServerException *e) {
809 if (e->getType() != ServerException_TypeInputTimeout) {
810 // Nothing was able to be sent to the server so just clear these data structures
811 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812 while (trit->hasNext()) {
813 Transaction *transaction = trit->next();
814 transaction->resetNextPartToSend();
816 // Set the transaction sequence number back to nothing
817 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818 transaction->setSequenceNumber(-1);
823 // There was a partial send to the server
824 hadPartialSendToServer = true;
826 // Nothing was able to be sent to the server so just clear these data structures
827 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828 while (trit->hasNext()) {
829 Transaction *transaction = trit->next();
830 transaction->resetNextPartToSend();
831 transaction->setServerFailure();
836 pendingSendArbitrationEntriesToDelete->clear();
837 transactionPartsSent->clear();
842 return newKey == NULL;
845 bool Table::updateFromLocal(int64_t machineId) {
846 if (!localCommunicationTable->contains(machineId))
849 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
851 // Get the size of the send data
852 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
854 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
859 Array<char> *sendData = new Array<char>(sendDataSize);
860 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
863 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
867 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868 localSequenceNumber++;
870 if (returnData == NULL) {
871 // Could not contact server
876 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877 int numberOfEntries = bbDecode->getInt();
879 for (int i = 0; i < numberOfEntries; i++) {
880 char type = bbDecode->get();
881 if (type == TypeAbort) {
882 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
884 } else if (type == TypeCommitPart) {
885 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886 processEntry(commitPart);
890 updateLiveStateFromLocal();
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
897 // Get the devices local communications
898 if (!localCommunicationTable->contains(transaction->getArbitrator()))
899 return Pair<bool, bool>(true, false);
901 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
903 // Get the size of the send data
904 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
906 Vector<TransactionPart *> * tParts = transaction->getParts();
907 uint tPartsSize = tParts->size();
908 for (uint i = 0; i < tPartsSize; i++) {
909 TransactionPart * part = tParts->get(i);
910 sendDataSize += part->getSize();
914 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
919 // Make the send data size
920 Array<char> *sendData = new Array<char>(sendDataSize);
921 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
924 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925 bbEncode->putInt(transaction->getParts()->size());
927 Vector<TransactionPart *> * tParts = transaction->getParts();
928 uint tPartsSize = tParts->size();
929 for (uint i = 0; i < tPartsSize; i++) {
930 TransactionPart * part = tParts->get(i);
931 part->encode(bbEncode);
936 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937 localSequenceNumber++;
939 if (returnData == NULL) {
940 // Could not contact server
941 return Pair<bool, bool>(true, false);
945 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946 bool didCommit = bbDecode->get() == 1;
947 bool couldArbitrate = bbDecode->get() == 1;
948 int numberOfEntries = bbDecode->getInt();
949 bool foundAbort = false;
951 for (int i = 0; i < numberOfEntries; i++) {
952 char type = bbDecode->get();
953 if (type == TypeAbort) {
954 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
956 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
961 } else if (type == TypeCommitPart) {
962 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963 processEntry(commitPart);
967 updateLiveStateFromLocal();
969 if (couldArbitrate) {
970 TransactionStatus * status = transaction->getTransactionStatus();
972 status->setStatus(TransactionStatus_StatusCommitted);
974 status->setStatus(TransactionStatus_StatusAborted);
977 TransactionStatus * status = transaction->getTransactionStatus();
979 status->setStatus(TransactionStatus_StatusAborted);
981 status->setStatus(TransactionStatus_StatusCommitted);
985 return Pair<bool, bool>(false, true);
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
990 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992 int numberOfParts = bbDecode->getInt();
994 // If we did commit a transaction or not
995 bool didCommit = false;
996 bool couldArbitrate = false;
998 if (numberOfParts != 0) {
1000 // decode the transaction
1001 Transaction *transaction = new Transaction();
1002 for (int i = 0; i < numberOfParts; i++) {
1004 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005 transaction->addPartDecode(newPart);
1008 // Arbitrate on transaction and pull relevant return data
1009 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010 couldArbitrate = localArbitrateReturn.getFirst();
1011 didCommit = localArbitrateReturn.getSecond();
1013 updateLiveStateFromLocal();
1015 // Transaction was sent to the server so keep track of it to prevent double commit
1016 if (transaction->getSequenceNumber() != -1) {
1017 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1021 // The data to send back
1022 int returnDataSize = 0;
1023 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1025 // Get the aborts to send back
1026 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1028 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029 while(abortit->hasNext())
1030 abortLocalSequenceNumbers->add(abortit->next());
1034 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1036 uint asize = abortLocalSequenceNumbers->size();
1037 for(uint i=0; i<asize; i++) {
1038 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1043 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044 unseenArbitrations->add(abort);
1045 returnDataSize += abort->getSize();
1048 // Get the commits to send back
1049 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050 if (commitForClientTable != NULL) {
1051 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1053 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054 while(commitit->hasNext())
1055 commitLocalSequenceNumbers->add(commitit->next());
1058 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1060 uint clsSize = commitLocalSequenceNumbers->size();
1061 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063 Commit *commit = commitForClientTable->get(localSequenceNumber);
1065 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1070 Vector<CommitPart *> * parts = commit->getParts();
1071 uint nParts = parts->size();
1072 for(uint i=0; i<nParts; i++) {
1073 CommitPart * commitPart = parts->get(i);
1074 unseenArbitrations->add(commitPart);
1075 returnDataSize += commitPart->getSize();
1081 // Number of arbitration entries to decode
1082 returnDataSize += 2 * sizeof(int32_t);
1084 // bool of did commit or not
1085 if (numberOfParts != 0) {
1086 returnDataSize += sizeof(char);
1089 // Data to send Back
1090 Array<char> *returnData = new Array<char>(returnDataSize);
1091 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1093 if (numberOfParts != 0) {
1095 bbEncode->put((char)1);
1097 bbEncode->put((char)0);
1099 if (couldArbitrate) {
1100 bbEncode->put((char)1);
1102 bbEncode->put((char)0);
1106 bbEncode->putInt(unseenArbitrations->size());
1107 uint size = unseenArbitrations->size();
1108 for (uint i = 0; i < size; i++) {
1109 Entry *entry = unseenArbitrations->get(i);
1110 entry->encode(bbEncode);
1113 localSequenceNumber++;
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1118 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119 attemptedToSendToServer = true;
1121 bool inserted = false;
1122 bool lastTryInserted = false;
1124 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125 if (array == NULL) {
1126 array = new Array<Slot *>();
1127 array->set(0, slot);
1128 rejectedSlotVector->clear();
1131 if (array->length() == 0) {
1132 throw new Error("Server Error: Did not send any slots");
1135 // if (attemptedToSendToServerTmp) {
1136 if (hadPartialSendToServer) {
1138 bool isInserted = false;
1139 uint size = array->length();
1140 for (uint i = 0; i < size; i++) {
1141 Slot *s = array->get(i);
1142 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1148 for (uint i = 0; i < size; i++) {
1149 Slot *s = array->get(i);
1154 // Process each entry in the slot
1155 Vector<Entry *> *entries = s->getEntries();
1156 uint eSize = entries->size();
1157 for(uint ei=0; ei < eSize; ei++) {
1158 Entry * entry = entries->get(ei);
1160 if (entry->getType() == TypeLastMessage) {
1161 LastMessage *lastMessage = (LastMessage *)entry;
1163 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1172 rejectedSlotVector->add(slot->getSequenceNumber());
1173 lastTryInserted = false;
1175 lastTryInserted = true;
1178 rejectedSlotVector->add(slot->getSequenceNumber());
1179 lastTryInserted = false;
1183 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1187 * Returns false if a resize was needed
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1191 if (liveSlotCount > bufferResizeThreshold) {
1192 resize = true;//Resize is forced
1196 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197 TableStatus *status = new TableStatus(slot, newSize);
1198 slot->addEntry(status);
1201 // Fill with rejected slots first before doing anything else
1202 doRejectedMessages(slot);
1204 // Do mandatory rescue of entries
1205 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1207 // Extract working variables
1208 bool needsResize = mandatoryRescueReturn.getFirst();
1209 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1212 if (needsResize && !resize) {
1213 // We need to resize but we are not resizing so return false
1214 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1217 bool inserted = false;
1218 if (newKeyEntry != NULL) {
1219 newKeyEntry->setSlot(slot);
1220 if (slot->hasSpace(newKeyEntry)) {
1221 slot->addEntry(newKeyEntry);
1226 // Clear the transactions, aborts and commits that were sent previously
1227 transactionPartsSent->clear();
1228 pendingSendArbitrationEntriesToDelete->clear();
1229 uint size = pendingSendArbitrationRounds->size();
1230 for (uint i = 0; i < size; i++) {
1231 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232 bool isFull = false;
1233 round->generateParts();
1234 Vector<Entry *> *parts = round->getParts();
1236 // Insert pending arbitration data
1237 uint vsize = parts->size();
1238 for (uint vi = 0; vi < vsize; vi++) {
1239 Entry *arbitrationData = parts->get(vi);
1241 // If it is an abort then we need to set some information
1242 if (arbitrationData->getType() == TypeAbort) {
1243 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1246 if (!slot->hasSpace(arbitrationData)) {
1247 // No space so cant do anything else with these data entries
1252 // Add to this current slot and add it to entries to delete
1253 slot->addEntry(arbitrationData);
1254 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1262 if (pendingTransactionQueue->size() > 0) {
1263 Transaction *transaction = pendingTransactionQueue->get(0);
1264 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266 transaction->setSequenceNumber(slot->getSequenceNumber());
1270 TransactionPart *part = transaction->getNextPartToSend();
1272 // Ran out of parts to send for this transaction so move on
1276 if (slot->hasSpace(part)) {
1277 slot->addEntry(part);
1278 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279 if (partsSent == NULL) {
1280 partsSent = new Vector<int32_t>();
1281 transactionPartsSent->put(transaction, partsSent);
1283 partsSent->add(part->getPartNumber());
1284 transactionPartsSent->put(transaction, partsSent);
1291 // Fill the remainder of the slot with rescue data
1292 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1294 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1297 void Table::doRejectedMessages(Slot *s) {
1298 if (!rejectedSlotVector->isEmpty()) {
1299 /* TODO: We should avoid generating a rejected message entry if
1300 * there is already a sufficient entry in the queue (e->g->,
1301 * equalsto value of true and same sequence number)-> */
1303 int64_t old_seqn = rejectedSlotVector->get(0);
1304 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305 int64_t new_seqn = rejectedSlotVector->lastElement();
1306 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1309 int64_t prev_seqn = -1;
1311 /* Go through list of missing messages */
1312 for (; i < rejectedSlotVector->size(); i++) {
1313 int64_t curr_seqn = rejectedSlotVector->get(i);
1314 Slot *s_msg = buffer->getSlot(curr_seqn);
1317 prev_seqn = curr_seqn;
1319 /* Generate rejected message entry for missing messages */
1320 if (prev_seqn != -1) {
1321 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1324 /* Generate rejected message entries for present messages */
1325 for (; i < rejectedSlotVector->size(); i++) {
1326 int64_t curr_seqn = rejectedSlotVector->get(i);
1327 Slot *s_msg = buffer->getSlot(curr_seqn);
1328 int64_t machineid = s_msg->getMachineID();
1329 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1343 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344 bool seenLiveSlot = false;
1345 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1346 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1350 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352 // Push slot number forward
1353 if (!seenLiveSlot) {
1354 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1357 if (!previousSlot->isLive()) {
1361 // We have seen a live slot
1362 seenLiveSlot = true;
1364 // Get all the live entries for a slot
1365 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1367 // Iterate over all the live entries and try to rescue them
1368 uint lESize = liveEntries->size();
1369 for (uint i=0; i< lESize; i++) {
1370 Entry * liveEntry = liveEntries->get(i);
1371 if (slot->hasSpace(liveEntry)) {
1372 // Enough space to rescue the entry
1373 slot->addEntry(liveEntry);
1374 } else if (currentSequenceNumber == firstIfFull) {
1375 //if there's no space but the entry is about to fall off the queue
1376 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1382 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386 /* now go through live entries from least to greatest sequence number until
1387 * either all live slots added, or the slot doesn't have enough room
1388 * for SKIP_THRESHOLD consecutive entries*/
1390 int64_t newestseqnum = buffer->getNewestSeqNum();
1391 for (; seqn <= newestseqnum; seqn++) {
1392 Slot *prevslot = buffer->getSlot(seqn);
1393 //Push slot number forward
1395 oldestLiveSlotSequenceNumver = seqn;
1397 if (!prevslot->isLive())
1399 seenliveslot = true;
1400 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1401 uint lESize = liveentries->size();
1402 for (uint i=0; i< lESize; i++) {
1403 Entry * liveentry = liveentries->get(i);
1404 if (s->hasSpace(liveentry))
1405 s->addEntry(liveentry);
1408 if (skipcount > Table_SKIP_THRESHOLD)
1418 * Checks for malicious activity and updates the local copy of the block chain->
1420 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1421 // The cloud communication layer has checked slot HMACs already
1423 if (newSlots->length() == 0) {
1427 // Make sure all slots are newer than the last largest slot this
1429 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1430 if (firstSeqNum <= sequenceNumber) {
1431 throw new Error("Server Error: Sent older slots!");
1434 // Create an object that can access both new slots and slots in our
1435 // local chain without committing slots to our local chain
1436 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1438 // Check that the HMAC chain is not broken
1439 checkHMACChain(indexer, newSlots);
1441 // Set to keep track of messages from clients
1442 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1444 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1445 while(lmit->hasNext())
1446 machineSet->add(lmit->next());
1450 // Process each slots data
1452 uint numSlots = newSlots->length();
1453 for(uint i=0; i<numSlots; i++) {
1454 Slot *slot = newSlots->get(i);
1455 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1456 updateExpectedSize();
1460 // If there is a gap, check to see if the server sent us
1462 if (firstSeqNum != (sequenceNumber + 1)) {
1464 // Check the size of the slots that were sent down by the server->
1465 // Can only check the size if there was a gap
1466 checkNumSlots(newSlots->length());
1468 // Since there was a gap every machine must have pushed a slot or
1469 // must have a last message message-> If not then the server is
1471 if (!machineSet->isEmpty()) {
1472 throw new Error("Missing record for machines: ");
1476 // Update the size of our local block chain->
1479 // Commit new to slots to the local block chain->
1481 uint numSlots = newSlots->length();
1482 for(uint i=0; i<numSlots; i++) {
1483 Slot *slot = newSlots->get(i);
1485 // Insert this slot into our local block chain copy->
1486 buffer->putSlot(slot);
1488 // Keep track of how many slots are currently live (have live data
1493 // Get the sequence number of the latest slot in the system
1494 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1495 updateLiveStateFromServer();
1497 // No Need to remember after we pulled from the server
1498 offlineTransactionsCommittedAndAtServer->clear();
1500 // This is invalidated now
1501 hadPartialSendToServer = false;
1504 void Table::updateLiveStateFromServer() {
1505 // Process the new transaction parts
1506 processNewTransactionParts();
1508 // Do arbitration on new transactions that were received
1509 arbitrateFromServer();
1511 // Update all the committed keys
1512 bool didCommitOrSpeculate = updateCommittedTable();
1514 // Delete the transactions that are now dead
1515 updateLiveTransactionsAndStatus();
1518 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1519 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1522 void Table::updateLiveStateFromLocal() {
1523 // Update all the committed keys
1524 bool didCommitOrSpeculate = updateCommittedTable();
1526 // Delete the transactions that are now dead
1527 updateLiveTransactionsAndStatus();
1530 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1531 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1534 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1535 int64_t prevslots = firstSequenceNumber;
1537 if (didFindTableStatus) {
1539 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1542 didFindTableStatus = true;
1543 currMaxSize = numberOfSlots;
1546 void Table::updateExpectedSize() {
1549 if (expectedsize > currMaxSize) {
1550 expectedsize = currMaxSize;
1556 * Check the size of the block chain to make sure there are enough
1557 * slots sent back by the server-> This is only called when we have a
1558 * gap between the slots that we have locally and the slots sent by
1559 * the server therefore in the slots sent by the server there will be
1560 * at least 1 Table status message
1562 void Table::checkNumSlots(int numberOfSlots) {
1563 if (numberOfSlots != expectedsize) {
1564 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1569 * Update the size of of the local buffer if it is needed->
1571 void Table::commitNewMaxSize() {
1572 didFindTableStatus = false;
1574 // Resize the local slot buffer
1575 if (numberOfSlots != currMaxSize) {
1576 buffer->resize((int32_t)currMaxSize);
1579 // Change the number of local slots to the new size
1580 numberOfSlots = (int32_t)currMaxSize;
1582 // Recalculate the resize threshold since the size of the local
1583 // buffer has changed
1584 setResizeThreshold();
1588 * Process the new transaction parts from this latest round of slots
1589 * received from the server
1591 void Table::processNewTransactionParts() {
1593 if (newTransactionParts->size() == 0) {
1594 // Nothing new to process
1598 // Iterate through all the machine Ids that we received new parts
1600 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1601 while(tpit->hasNext()) {
1602 int64_t machineId = tpit->next();
1603 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1605 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1606 // Iterate through all the parts for that machine Id
1607 while(ptit->hasNext()) {
1608 Pair<int64_t, int32_t> * partId = ptit->next();
1609 TransactionPart *part = parts->get(partId);
1611 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1612 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1613 if (lastTransactionNumber >= part->getSequenceNumber()) {
1614 // Set dead the transaction part
1620 // Get the transaction object for that sequence number
1621 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1623 if (transaction == NULL) {
1624 // This is a new transaction that we dont have so make a new one
1625 transaction = new Transaction();
1627 // Insert this new transaction into the live tables
1628 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1629 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1632 // Add that part to the transaction
1633 transaction->addPartDecode(part);
1638 // Clear all the new transaction parts in preparation for the next
1639 // time the server sends slots
1640 newTransactionParts->clear();
1643 void Table::arbitrateFromServer() {
1645 if (liveTransactionBySequenceNumberTable->size() == 0) {
1646 // Nothing to arbitrate on so move on
1650 // Get the transaction sequence numbers and sort from oldest to newest
1651 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1653 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1654 while(trit->hasNext())
1655 transactionSequenceNumbers->add(trit->next());
1658 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1660 // Collection of key value pairs that are
1661 Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1663 // The last transaction arbitrated on
1664 int64_t lastTransactionCommitted = -1;
1665 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1666 uint tsnSize = transactionSequenceNumbers->size();
1667 for(uint i=0; i<tsnSize; i++) {
1668 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1669 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1671 // Check if this machine arbitrates for this transaction if not
1672 // then we cant arbitrate this transaction
1673 if (transaction->getArbitrator() != localMachineId) {
1677 if (transactionSequenceNumber < lastSeqNumArbOn) {
1681 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1682 // We have seen this already locally so dont commit again
1687 if (!transaction->isComplete()) {
1688 // Will arbitrate in incorrect order if we continue so just break
1694 // update the largest transaction seen by arbitrator from server
1695 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1696 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1698 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1699 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1700 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1704 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1705 // Guard evaluated as true
1707 // Update the local changes so we can make the commit
1708 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1709 while (kvit->hasNext()) {
1710 KeyValue *kv = kvit->next();
1711 speculativeTableTmp->put(kv->getKey(), kv);
1715 // Update what the last transaction committed was for use in batch commit
1716 lastTransactionCommitted = transactionSequenceNumber;
1718 // Guard evaluated was false so create abort
1720 Abort *newAbort = new Abort(NULL,
1721 transaction->getClientLocalSequenceNumber(),
1722 transaction->getSequenceNumber(),
1723 transaction->getMachineId(),
1724 transaction->getArbitrator(),
1725 localArbitrationSequenceNumber);
1726 localArbitrationSequenceNumber++;
1727 generatedAborts->add(newAbort);
1729 // Insert the abort so we can process
1730 processEntry(newAbort);
1733 lastSeqNumArbOn = transactionSequenceNumber;
1736 Commit *newCommit = NULL;
1738 // If there is something to commit
1739 if (speculativeTableTmp->size() != 0) {
1740 // Create the commit and increment the commit sequence number
1741 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1742 localArbitrationSequenceNumber++;
1744 // Add all the new keys to the commit
1745 SetIterator<IoTString *, KeyValue *> * spit = getKeyIterator(speculativeTableTmp);
1746 while(spit->hasNext()) {
1747 IoTString * string = spit->next();
1748 KeyValue * kv = speculativeTableTmp->get(string);
1749 newCommit->addKV(kv);
1753 // create the commit parts
1754 newCommit->createCommitParts();
1756 // Append all the commit parts to the end of the pending queue
1757 // waiting for sending to the server
1758 // Insert the commit so we can process it
1759 Vector<CommitPart *> * parts = newCommit->getParts();
1760 uint partsSize = parts->size();
1761 for(uint i=0; i<partsSize; i++) {
1762 CommitPart * commitPart = parts->get(i);
1763 processEntry(commitPart);
1767 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1768 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1769 pendingSendArbitrationRounds->add(arbitrationRound);
1771 if (compactArbitrationData()) {
1772 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1773 if (newArbitrationRound->getCommit() != NULL) {
1774 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1775 uint partsSize = parts->size();
1776 for(uint i=0; i<partsSize; i++) {
1777 CommitPart * commitPart = parts->get(i);
1778 processEntry(commitPart);
1785 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1787 // Check if this machine arbitrates for this transaction if not then
1788 // we cant arbitrate this transaction
1789 if (transaction->getArbitrator() != localMachineId) {
1790 return Pair<bool, bool>(false, false);
1793 if (!transaction->isComplete()) {
1794 // Will arbitrate in incorrect order if we continue so just break
1796 return Pair<bool, bool>(false, false);
1799 if (transaction->getMachineId() != localMachineId) {
1800 // dont do this check for local transactions
1801 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1802 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1803 // We've have already seen this from the server
1804 return Pair<bool, bool>(false, false);
1809 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1810 // Guard evaluated as true Create the commit and increment the
1811 // commit sequence number
1812 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1813 localArbitrationSequenceNumber++;
1815 // Update the local changes so we can make the commit
1816 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1817 while (kvit->hasNext()) {
1818 KeyValue *kv = kvit->next();
1819 newCommit->addKV(kv);
1823 // create the commit parts
1824 newCommit->createCommitParts();
1826 // Append all the commit parts to the end of the pending queue
1827 // waiting for sending to the server
1828 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1829 pendingSendArbitrationRounds->add(arbitrationRound);
1831 if (compactArbitrationData()) {
1832 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1833 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1834 uint partsSize = parts->size();
1835 for(uint i=0; i<partsSize; i++) {
1836 CommitPart * commitPart = parts->get(i);
1837 processEntry(commitPart);
1840 // Insert the commit so we can process it
1841 Vector<CommitPart *> * parts = newCommit->getParts();
1842 uint partsSize = parts->size();
1843 for(uint i=0; i<partsSize; i++) {
1844 CommitPart * commitPart = parts->get(i);
1845 processEntry(commitPart);
1849 if (transaction->getMachineId() == localMachineId) {
1850 TransactionStatus *status = transaction->getTransactionStatus();
1851 if (status != NULL) {
1852 status->setStatus(TransactionStatus_StatusCommitted);
1856 updateLiveStateFromLocal();
1857 return Pair<bool, bool>(true, true);
1859 if (transaction->getMachineId() == localMachineId) {
1860 // For locally created messages update the status
1861 // Guard evaluated was false so create abort
1862 TransactionStatus * status = transaction->getTransactionStatus();
1863 if (status != NULL) {
1864 status->setStatus(TransactionStatus_StatusAborted);
1867 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1870 Abort *newAbort = new Abort(NULL,
1871 transaction->getClientLocalSequenceNumber(),
1873 transaction->getMachineId(),
1874 transaction->getArbitrator(),
1875 localArbitrationSequenceNumber);
1876 localArbitrationSequenceNumber++;
1877 addAbortSet->add(newAbort);
1879 // Append all the commit parts to the end of the pending queue
1880 // waiting for sending to the server
1881 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1882 pendingSendArbitrationRounds->add(arbitrationRound);
1884 if (compactArbitrationData()) {
1885 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1887 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1888 uint partsSize = parts->size();
1889 for(uint i=0; i<partsSize; i++) {
1890 CommitPart * commitPart = parts->get(i);
1891 processEntry(commitPart);
1896 updateLiveStateFromLocal();
1897 return Pair<bool, bool>(true, false);
1902 * Compacts the arbitration data my merging commits and aggregating
1903 * aborts so that a single large push of commits can be done instead
1904 * of many small updates
1906 bool Table::compactArbitrationData() {
1907 if (pendingSendArbitrationRounds->size() < 2) {
1908 // Nothing to compact so do nothing
1912 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1913 if (lastRound->getDidSendPart()) {
1917 bool hadCommit = (lastRound->getCommit() == NULL);
1918 bool gotNewCommit = false;
1920 int numberToDelete = 1;
1921 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1922 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1924 if (round->isFull() || round->getDidSendPart()) {
1925 // Stop since there is a part that cannot be compacted and we
1926 // need to compact in order
1930 if (round->getCommit() == NULL) {
1931 // Try compacting aborts only
1932 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1933 if (newSize > ArbitrationRound_MAX_PARTS) {
1934 // Cant compact since it would be too large
1937 lastRound->addAborts(round->getAborts());
1939 // Create a new larger commit
1940 Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1941 localArbitrationSequenceNumber++;
1943 // Create the commit parts so that we can count them
1944 newCommit->createCommitParts();
1946 // Calculate the new size of the parts
1947 int newSize = newCommit->getNumberOfParts();
1948 newSize += lastRound->getAbortsCount();
1949 newSize += round->getAbortsCount();
1951 if (newSize > ArbitrationRound_MAX_PARTS) {
1952 // Cant compact since it would be too large
1956 // Set the new compacted part
1957 lastRound->setCommit(newCommit);
1958 lastRound->addAborts(round->getAborts());
1959 gotNewCommit = true;
1965 if (numberToDelete != 1) {
1966 // If there is a compaction
1967 // Delete the previous pieces that are now in the new compacted piece
1968 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1969 pendingSendArbitrationRounds->clear();
1971 for (int i = 0; i < numberToDelete; i++) {
1972 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1976 // Add the new compacted into the pending to send list
1977 pendingSendArbitrationRounds->add(lastRound);
1979 // Should reinsert into the commit processor
1980 if (hadCommit && gotNewCommit) {
1989 * Update all the commits and the committed tables, sets dead the dead
1992 bool Table::updateCommittedTable() {
1994 if (newCommitParts->size() == 0) {
1995 // Nothing new to process
1999 // Iterate through all the machine Ids that we received new parts for
2000 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts);
2001 while(partsit->hasNext()) {
2002 int64_t machineId = partsit->next();
2003 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2005 // Iterate through all the parts for that machine Id
2006 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts);
2007 while(pairit->hasNext()) {
2008 Pair<int64_t, int32_t> * partId = pairit->next();
2009 CommitPart *part = parts->get(partId);
2011 // Get the transaction object for that sequence number
2012 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2014 if (commitForClientTable == NULL) {
2015 // This is the first commit from this device
2016 commitForClientTable = new Hashtable<int64_t, Commit *>();
2017 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2020 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2022 if (commit == NULL) {
2023 // This is a new commit that we dont have so make a new one
2024 commit = new Commit();
2026 // Insert this new commit into the live tables
2027 commitForClientTable->put(part->getSequenceNumber(), commit);
2030 // Add that part to the commit
2031 commit->addPartDecode(part);
2037 // Clear all the new commits parts in preparation for the next time
2038 // the server sends slots
2039 newCommitParts->clear();
2041 // If we process a new commit keep track of it for future use
2042 bool didProcessANewCommit = false;
2044 // Process the commits one by one
2045 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> * liveit = getKeyIterator(liveCommitsTable);
2046 while (liveit->hasNext()) {
2047 int64_t arbitratorId = liveit->next();
2049 // Get all the commits for a specific arbitrator
2050 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2052 // Sort the commits in order
2053 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2055 SetIterator<int64_t, Commit *> * clientit = getKeyIterator(commitForClientTable);
2056 while(clientit->hasNext())
2057 commitSequenceNumbers->add(clientit->next());
2061 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2063 // Get the last commit seen from this arbitrator
2064 int64_t lastCommitSeenSequenceNumber = -1;
2065 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2066 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2069 // Go through each new commit one by one
2070 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2071 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2072 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2074 // Special processing if a commit is not complete
2075 if (!commit->isComplete()) {
2076 if (i == (commitSequenceNumbers->size() - 1)) {
2077 // If there is an incomplete commit and this commit is the
2078 // latest one seen then this commit cannot be processed and
2079 // there are no other commits
2082 // This is a commit that was already dead but parts of it
2083 // are still in the block chain (not flushed out yet)->
2084 // Delete it and move on
2086 commitForClientTable->remove(commit->getSequenceNumber());
2091 // Update the last transaction that was updated if we can
2092 if (commit->getTransactionSequenceNumber() != -1) {
2093 // Update the last transaction sequence number that the arbitrator arbitrated on1
2094 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2095 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2099 // Update the last arbitration data that we have seen so far
2100 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2101 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2102 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2104 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2107 // Never seen any data from this arbitrator so record the first one
2108 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2111 // We have already seen this commit before so need to do the
2112 // full processing on this commit
2113 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2115 // Update the last transaction that was updated if we can
2116 if (commit->getTransactionSequenceNumber() != -1) {
2117 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2118 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2119 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2120 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2127 // If we got here then this is a brand new commit and needs full
2129 // Get what commits should be edited, these are the commits that
2130 // have live values for their keys
2131 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2133 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2134 while (kvit->hasNext()) {
2135 KeyValue *kv = kvit->next();
2136 Commit * commit = liveCommitsByKeyTable->get(kv->getKey());
2138 commitsToEdit->add(commit);
2143 // Update each previous commit that needs to be updated
2144 SetIterator<Commit *, Commit *> * commitit = commitsToEdit->iterator();
2145 while(commitit->hasNext()) {
2146 Commit *previousCommit = commitit->next();
2148 // Only bother with live commits (TODO: Maybe remove this check)
2149 if (previousCommit->isLive()) {
2151 // Update which keys in the old commits are still live
2153 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2154 while (kvit->hasNext()) {
2155 KeyValue *kv = kvit->next();
2156 previousCommit->invalidateKey(kv->getKey());
2161 // if the commit is now dead then remove it
2162 if (!previousCommit->isLive()) {
2163 commitForClientTable->remove(previousCommit->getSequenceNumber());
2169 // Update the last seen sequence number from this arbitrator
2170 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2171 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2172 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2175 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2178 // We processed a new commit that we havent seen before
2179 didProcessANewCommit = true;
2181 // Update the committed table of keys and which commit is using which key
2183 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2184 while (kvit->hasNext()) {
2185 KeyValue *kv = kvit->next();
2186 committedKeyValueTable->put(kv->getKey(), kv);
2187 liveCommitsByKeyTable->put(kv->getKey(), commit);
2195 return didProcessANewCommit;
2199 * Create the speculative table from transactions that are still live
2200 * and have come from the cloud
2202 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2203 if (liveTransactionBySequenceNumberTable->size() == 0) {
2204 // There is nothing to speculate on
2208 // Create a list of the transaction sequence numbers and sort them
2209 // from oldest to newest
2210 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2212 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2213 while(trit->hasNext())
2214 transactionSequenceNumbersSorted->add(trit->next());
2218 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2220 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2223 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2224 // If there is a gap in the transaction sequence numbers then
2225 // there was a commit or an abort of a transaction OR there was a
2226 // new commit (Could be from offline commit) so a redo the
2227 // speculation from scratch
2229 // Start from scratch
2230 speculatedKeyValueTable->clear();
2231 lastTransactionSequenceNumberSpeculatedOn = -1;
2232 oldestTransactionSequenceNumberSpeculatedOn = -1;
2235 // Remember the front of the transaction list
2236 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2238 // Find where to start arbitration from
2241 for(; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2242 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2246 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2247 // Make sure we are not out of bounds
2248 return false; // did not speculate
2251 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2252 bool didSkip = true;
2254 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2255 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2256 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2258 if (!transaction->isComplete()) {
2259 // If there is an incomplete transaction then there is nothing
2260 // we can do add this transactions arbitrator to the list of
2261 // arbitrators we should ignore
2262 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2267 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2271 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2273 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2274 // Guard evaluated to true so update the speculative table
2276 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2277 while (kvit->hasNext()) {
2278 KeyValue *kv = kvit->next();
2279 speculatedKeyValueTable->put(kv->getKey(), kv);
2287 // Since there was a skip we need to redo the speculation next time around
2288 lastTransactionSequenceNumberSpeculatedOn = -1;
2289 oldestTransactionSequenceNumberSpeculatedOn = -1;
2292 // We did some speculation
2297 * Create the pending transaction speculative table from transactions
2298 * that are still in the pending transaction buffer
2300 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2301 if (pendingTransactionQueue->size() == 0) {
2302 // There is nothing to speculate on
2306 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2307 // need to reset on the pending speculation
2308 lastPendingTransactionSpeculatedOn = NULL;
2309 firstPendingTransaction = pendingTransactionQueue->get(0);
2310 pendingTransactionSpeculatedKeyValueTable->clear();
2313 // Find where to start arbitration from
2316 for(; startIndex < pendingTransactionQueue->size(); startIndex++)
2317 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2320 if (startIndex >= pendingTransactionQueue->size()) {
2321 // Make sure we are not out of bounds
2325 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2326 Transaction *transaction = pendingTransactionQueue->get(i);
2328 lastPendingTransactionSpeculatedOn = transaction;
2330 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2331 // Guard evaluated to true so update the speculative table
2332 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2333 while (kvit->hasNext()) {
2334 KeyValue *kv = kvit->next();
2335 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2343 * Set dead and remove from the live transaction tables the
2344 * transactions that are dead
2346 void Table::updateLiveTransactionsAndStatus() {
2347 // Go through each of the transactions
2349 SetIterator<int64_t, Transaction *> * iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2350 while(iter->hasNext()) {
2351 int64_t key = iter->next();
2352 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2354 // Check if the transaction is dead
2355 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2356 // Set dead the transaction
2357 transaction->setDead();
2359 // Remove the transaction from the live table
2361 liveTransactionByTransactionIdTable->remove(transaction->getId());
2367 // Go through each of the transactions
2369 SetIterator<int64_t, TransactionStatus *> * iter = getKeyIterator(outstandingTransactionStatus);
2370 while(iter->hasNext()) {
2371 int64_t key = iter->next();
2372 TransactionStatus *status = outstandingTransactionStatus->get(key);
2374 // Check if the transaction is dead
2375 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2378 status->setStatus(TransactionStatus_StatusCommitted);
2389 * Process this slot, entry by entry-> Also update the latest message sent by slot
2391 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2393 // Update the last message seen
2394 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2396 // Process each entry in the slot
2397 Vector<Entry *> *entries = slot->getEntries();
2398 uint eSize = entries->size();
2399 for(uint ei=0; ei < eSize; ei++) {
2400 Entry * entry = entries->get(ei);
2401 switch (entry->getType()) {
2402 case TypeCommitPart:
2403 processEntry((CommitPart *)entry);
2406 processEntry((Abort *)entry);
2408 case TypeTransactionPart:
2409 processEntry((TransactionPart *)entry);
2412 processEntry((NewKey *)entry);
2414 case TypeLastMessage:
2415 processEntry((LastMessage *)entry, machineSet);
2417 case TypeRejectedMessage:
2418 processEntry((RejectedMessage *)entry, indexer);
2420 case TypeTableStatus:
2421 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2424 throw new Error("Unrecognized type: ");
2430 * Update the last message that was sent for a machine Id
2432 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2433 // Update what the last message received by a machine was
2434 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2438 * Add the new key to the arbitrators table and update the set of live
2439 * new keys (in case of a rescued new key message)
2441 void Table::processEntry(NewKey *entry) {
2442 // Update the arbitrator table with the new key information
2443 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2445 // Update what the latest live new key is
2446 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2447 if (oldNewKey != NULL) {
2448 // Delete the old new key messages
2449 oldNewKey->setDead();
2454 * Process new table status entries and set dead the old ones as new
2455 * ones come in-> keeps track of the largest and smallest table status
2456 * seen in this current round of updating the local copy of the block
2459 void Table::processEntry(TableStatus * entry, int64_t seq) {
2460 int newNumSlots = entry->getMaxSlots();
2461 updateCurrMaxSize(newNumSlots);
2462 initExpectedSize(seq, newNumSlots);
2464 if (liveTableStatus != NULL) {
2465 // We have a larger table status so the old table status is no
2467 liveTableStatus->setDead();
2470 // Make this new table status the latest alive table status
2471 liveTableStatus = entry;
2475 * Check old messages to see if there is a block chain violation->
2478 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2479 int64_t oldSeqNum = entry->getOldSeqNum();
2480 int64_t newSeqNum = entry->getNewSeqNum();
2481 bool isequal = entry->getEqual();
2482 int64_t machineId = entry->getMachineID();
2483 int64_t seq = entry->getSequenceNumber();
2485 // Check if we have messages that were supposed to be rejected in
2486 // our local block chain
2487 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2489 Slot *slot = indexer->getSlot(seqNum);
2492 // If we have this slot make sure that it was not supposed to be
2494 int64_t slotMachineId = slot->getMachineID();
2495 if (isequal != (slotMachineId == machineId)) {
2496 throw new Error("Server Error: Trying to insert rejected message for slot ");
2501 // Create a list of clients to watch until they see this rejected
2503 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2504 SetIterator<int64_t, Pair<int64_t, Liveness*> *> * iter = getKeyIterator(lastMessageTable);
2505 while(iter->hasNext()) {
2506 // Machine ID for the last message entry
2507 int64_t lastMessageEntryMachineId = iter->next();
2509 // We've seen it, don't need to continue to watch-> Our next
2510 // message will implicitly acknowledge it->
2511 if (lastMessageEntryMachineId == localMachineId) {
2515 Pair<int64_t, Liveness *> * lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2516 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2518 if (entrySequenceNumber < seq) {
2519 // Add this rejected message to the set of messages that this
2520 // machine ID did not see yet
2521 addWatchVector(lastMessageEntryMachineId, entry);
2522 // This client did not see this rejected message yet so add it
2523 // to the watch set to monitor
2524 deviceWatchSet->add(lastMessageEntryMachineId);
2529 if (deviceWatchSet->isEmpty()) {
2530 // This rejected message has been seen by all the clients so
2533 // We need to watch this rejected message
2534 entry->setWatchSet(deviceWatchSet);
2539 * Check if this abort is live, if not then save it so we can kill it
2540 * later-> update the last transaction number that was arbitrated on->
2542 void Table::processEntry(Abort *entry) {
2543 if (entry->getTransactionSequenceNumber() != -1) {
2544 // update the transaction status if it was sent to the server
2545 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2546 if (status != NULL) {
2547 status->setStatus(TransactionStatus_StatusAborted);
2551 // Abort has not been seen by the client it is for yet so we need to
2554 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2555 if (previouslySeenAbort != NULL) {
2556 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2559 if (entry->getTransactionArbitrator() == localMachineId) {
2560 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2563 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2564 // The machine already saw this so it is dead
2566 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2567 liveAbortTable->remove(&abortid);
2569 if (entry->getTransactionArbitrator() == localMachineId) {
2570 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2575 // Update the last arbitration data that we have seen so far
2576 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2577 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2578 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2580 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2583 // Never seen any data from this arbitrator so record the first one
2584 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2587 // Set dead a transaction if we can
2588 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2590 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2591 if (transactionToSetDead != NULL) {
2592 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2595 // Update the last transaction sequence number that the arbitrator
2597 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2598 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2600 if (entry->getTransactionSequenceNumber() != -1) {
2601 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2607 * Set dead the transaction part if that transaction is dead and keep
2608 * track of all new parts
2610 void Table::processEntry(TransactionPart *entry) {
2611 // Check if we have already seen this transaction and set it dead OR
2612 // if it is not alive
2613 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2614 // This transaction is dead, it was already committed or aborted
2619 // This part is still alive
2620 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2622 if (transactionPart == NULL) {
2623 // Dont have a table for this machine Id yet so make one
2624 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2625 newTransactionParts->put(entry->getMachineId(), transactionPart);
2628 // Update the part and set dead ones we have already seen (got a
2630 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2631 if (previouslySeenPart != NULL) {
2632 previouslySeenPart->setDead();
2637 * Process new commit entries and save them for future use-> Delete duplicates
2639 void Table::processEntry(CommitPart *entry) {
2640 // Update the last transaction that was updated if we can
2641 if (entry->getTransactionSequenceNumber() != -1) {
2642 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2643 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2647 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2648 if (commitPart == NULL) {
2649 // Don't have a table for this machine Id yet so make one
2650 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2651 newCommitParts->put(entry->getMachineId(), commitPart);
2653 // Update the part and set dead ones we have already seen (got a
2655 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2656 if (previouslySeenPart != NULL) {
2657 previouslySeenPart->setDead();
2662 * Update the last message seen table-> Update and set dead the
2663 * appropriate RejectedMessages as clients see them-> Updates the live
2664 * aborts, removes those that are dead and sets them dead-> Check that
2665 * the last message seen is correct and that there is no mismatch of
2666 * our own last message or that other clients have not had a rollback
2667 * on the last message->
2669 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2670 // We have seen this machine ID
2671 machineSet->remove(machineId);
2673 // Get the set of rejected messages that this machine Id is has not seen yet
2674 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2675 // If there is a rejected message that this machine Id has not seen yet
2676 if (watchset != NULL) {
2677 // Go through each rejected message that this machine Id has not
2680 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2681 while(rmit->hasNext()) {
2682 RejectedMessage *rm = rmit->next();
2683 // If this machine Id has seen this rejected message->->->
2684 if (rm->getSequenceNumber() <= seqNum) {
2685 // Remove it from our watchlist
2687 // Decrement machines that need to see this notification
2688 rm->removeWatcher(machineId);
2694 // Set dead the abort
2695 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> * abortit = getKeyIterator(liveAbortTable);
2697 while(abortit->hasNext()) {
2698 Pair<int64_t, int64_t> * key = abortit->next();
2699 Abort *abort = liveAbortTable->get(key);
2700 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2703 if (abort->getTransactionArbitrator() == localMachineId) {
2704 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2709 if (machineId == localMachineId) {
2710 // Our own messages are immediately dead->
2711 char livenessType = liveness->getType();
2712 if (livenessType==TypeLastMessage) {
2713 ((LastMessage *)liveness)->setDead();
2714 } else if (livenessType == TypeSlot) {
2715 ((Slot *)liveness)->setDead();
2717 throw new Error("Unrecognized type");
2720 // Get the old last message for this device
2721 Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2722 if (lastMessageEntry == NULL) {
2723 // If no last message then there is nothing else to process
2727 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2728 Liveness *lastEntry = lastMessageEntry->getSecond();
2729 delete lastMessageEntry;
2731 // If it is not our machine Id since we already set ours to dead
2732 if (machineId != localMachineId) {
2733 char lastEntryType = lastEntry->getType();
2735 if (lastEntryType == TypeLastMessage) {
2736 ((LastMessage *)lastEntry)->setDead();
2737 } else if (lastEntryType == TypeSlot) {
2738 ((Slot *)lastEntry)->setDead();
2740 throw new Error("Unrecognized type");
2743 // Make sure the server is not playing any games
2744 if (machineId == localMachineId) {
2745 if (hadPartialSendToServer) {
2746 // We were not making any updates and we had a machine mismatch
2747 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2748 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2751 // We were not making any updates and we had a machine mismatch
2752 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2753 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2757 if (lastMessageSeqNum > seqNum) {
2758 throw new Error("Server Error: Rollback on remote machine sequence number");
2764 * Add a rejected message entry to the watch set to keep track of
2765 * which clients have seen that rejected message entry and which have
2768 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2769 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2770 if (entries == NULL) {
2771 // There is no set for this machine ID yet so create one
2772 entries = new Hashset<RejectedMessage *>();
2773 rejectedMessageWatchVectorTable->put(machineId, entries);
2775 entries->add(entry);
2779 * Check if the HMAC chain is not violated
2781 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2782 for (int i = 0; i < newSlots->length(); i++) {
2783 Slot *currSlot = newSlots->get(i);
2784 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2785 if (prevSlot != NULL &&
2786 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2787 throw new Error("Server Error: Invalid HMAC Chain");