3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Comparator with the signature qsort() expects; elsewhere in this file it is
// passed to qsort() to order vectors of int64_t sequence numbers.
// Both untyped arguments are viewed as pointers to int64_t.
// NOTE(review): the comparison and return statements are not visible in this excerpt.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructs a Table that owns a freshly created CloudComm built from the given
// base URL, password and listening port.
//   baseurl          - passed through to the CloudComm constructor
//   password         - passed through to the CloudComm constructor
//   _localMachineId  - stored as localMachineId
//   listeningPort    - passed through to the CloudComm constructor
// Every container pointer starts out NULL and every counter/flag at its zero
// value (oldestLiveSlotSequenceNumver starts at 1); the containers are
// allocated later by the initialization code that `new`s each of them.
// NOTE(review): parts of the initializer list are not visible in this excerpt.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localTransactionSequenceNumber(0),
50 lastTransactionSequenceNumberSpeculatedOn(0),
51 oldestTransactionSequenceNumberSpeculatedOn(0),
52 localArbitrationSequenceNumber(0),
53 hadPartialSendToServer(false),
54 attemptedToSendToServer(false),
56 didFindTableStatus(false),
58 lastSlotAttemptedToSend(NULL),
61 lastTransactionPartsSent(NULL),
62 lastPendingSendArbitrationEntriesToDelete(NULL),
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
// Constructs a Table around a CloudComm supplied by the caller instead of
// creating one.
//   _cloud           - existing CloudComm; presumably stored in `cloud`
//                      (that initializer is not visible here - TODO confirm)
//   _localMachineId  - stored as localMachineId
// All other members receive the same starting values as in the other
// constructor: container pointers NULL, counters 0, flags false, and
// oldestLiveSlotSequenceNumver set to 1.
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localTransactionSequenceNumber(0),
112 lastTransactionSequenceNumberSpeculatedOn(0),
113 oldestTransactionSequenceNumberSpeculatedOn(0),
114 localArbitrationSequenceNumber(0),
115 hadPartialSendToServer(false),
116 attemptedToSendToServer(false),
118 didFindTableStatus(false),
120 lastSlotAttemptedToSend(NULL),
123 lastTransactionPartsSent(NULL),
124 lastPendingSendArbitrationEntriesToDelete(NULL),
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
// Cleanup of the container objects that the initialization code allocates with
// `new`. Both constructors start these pointers at NULL, and `delete` on a null
// pointer does nothing, so this sequence is safe even if the containers were
// never allocated.
// NOTE(review): only the container objects themselves are deleted here; whether
// the elements they hold (and cloud/random/buffer) are released is not visible
// in this excerpt - confirm ownership.
164 delete committedKeyValueTable;
165 delete speculatedKeyValueTable;
166 delete pendingTransactionSpeculatedKeyValueTable;
167 delete liveNewKeyTable;
168 delete lastMessageTable;
169 delete rejectedMessageWatchVectorTable;
170 delete arbitratorTable;
171 delete liveAbortTable;
172 delete newTransactionParts;
173 delete newCommitParts;
174 delete lastArbitratedTransactionNumberByArbitratorTable;
175 delete liveTransactionBySequenceNumberTable;
176 delete liveTransactionByTransactionIdTable;
177 delete liveCommitsTable;
178 delete liveCommitsByKeyTable;
179 delete lastCommitSeenSequenceNumberByArbitratorTable;
180 delete rejectedSlotVector;
181 delete pendingTransactionQueue;
182 delete pendingSendArbitrationEntriesToDelete;
183 delete transactionPartsSent;
184 delete outstandingTransactionStatus;
185 delete liveAbortsGeneratedByLocal;
186 delete offlineTransactionsCommittedAndAtServer;
187 delete localCommunicationTable;
188 delete lastTransactionSeenFromMachineFromServer;
189 delete pendingSendArbitrationRounds;
190 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
194 * Init all the stuff needed for table usage
// Allocates the helper objects and every container the Table uses, then derives
// the slot count and resize threshold from the freshly created buffer.
// NOTE(review): the enclosing function header is not visible in this excerpt.
197 // Init helper objects
198 random = new SecureRandom();
199 buffer = new SlotBuffer();
// Containers keyed by Pair<...> pointers are instantiated with
// pairHashFunction / pairEquals as their hash and equality template arguments.
202 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
203 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
204 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
205 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
206 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
207 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
208 arbitratorTable = new Hashtable<IoTString *, int64_t>();
209 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
210 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
211 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
212 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
213 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
214 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
215 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
216 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
217 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
218 rejectedSlotVector = new Vector<int64_t>();
219 pendingTransactionQueue = new Vector<Transaction *>();
220 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
221 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
222 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
223 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
224 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
225 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
226 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
227 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
228 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// setResizeThreshold() reads both numberOfSlots and random, so it has to run
// after they are assigned above.
231 numberOfSlots = buffer->capacity();
232 setResizeThreshold();
236 * Initialize the table by inserting a table status as the first entry
237 * into the table; also initialize the crypto stuff.
// Bootstraps a new table: initializes the CloudComm security state, builds the
// first slot (sequence number 1) with a TableStatus for it, pushes the slot to
// the cloud, and feeds the resulting slot array into validateAndUpdate().
// Throws a heap-allocated Error (i.e. an `Error *`) with the message
// "Error on initialization" when neither visible success path applies.
// NOTE(review): the conditions that select between the branches below are not
// visible in this excerpt.
239 void Table::initTable() {
240 cloud->initSecurity();
242 // Create the first insertion into the block chain which is the table status
243 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
244 localSequenceNumber++;
245 TableStatus *status = new TableStatus(s, numberOfSlots);
247 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// A one-element array is created locally for this branch; how it is filled is
// not visible here.
250 array = new Array<Slot *>(1);
252 // update local block chain
253 validateAndUpdate(array, true);
254 } else if (array->length() == 1) {
255 // in case we did push the slot BUT we failed to init it
256 validateAndUpdate(array, true);
258 throw new Error("Error on initialization");
263 * Rebuild the table from scratch by pulling the latest block chain
// Requests every slot after the current sequenceNumber from the cloud, applies
// them through validateAndUpdate() (second argument true), and then refreshes
// the live transactions and status.
266 void Table::rebuild() {
267 // Just pull the latest slots from the server
268 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
269 validateAndUpdate(newslots, true);
271 updateLiveTransactionsAndStatus();
// Records the host name and port to use for direct (local) communication with
// the given arbitrator machine id.
//   arbitrator  - key under which the entry is stored in localCommunicationTable
//   hostName    - stored as the first element of the new Pair
//   portNumber  - stored as the second element of the new Pair
// NOTE(review): a new Pair is heap-allocated on every call; whether put()
// releases a previous entry for the same arbitrator is not visible - confirm.
274 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
275 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the value stored for `key` in arbitratorTable.
// NOTE(review): there is no contains() check here; the value returned for an
// unknown key depends on Hashtable::get, which is not visible - confirm.
278 int64_t Table::getArbitrator(IoTString *key) {
279 return arbitratorTable->get(key);
// Closes the table. NOTE(review): the body is not visible in this excerpt.
282 void Table::close() {
// Looks `key` up in committedKeyValueTable and returns the value of the entry
// found there.
// NOTE(review): the handling of a missing entry sits on lines not visible here.
286 IoTString *Table::getCommitted(IoTString *key) {
287 KeyValue *kv = committedKeyValueTable->get(key);
290 return kv->getValue();
// Looks `key` up in three tables, in this order:
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable,
// committedKeyValueTable, and returns the value of the selected entry.
// NOTE(review): the conditions guarding each fallback are not visible here;
// presumably each later lookup runs only when the earlier one found nothing.
296 IoTString *Table::getSpeculative(IoTString *key) {
297 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
300 kv = speculatedKeyValueTable->get(key);
304 kv = committedKeyValueTable->get(key);
308 return kv->getValue();
// Reads the committed value for `key` and registers a guard for it on the
// pending transaction builder (addKVGuard), using either the value read or
// NULL as the guarded value.
// Throws `Error *` ("Key not Found.") when arbitratorTable has no entry for the
// key, and `Error *` ("Not all Key Values Match Arbitrator.") when the builder's
// checkArbitrator() rejects the key's arbitrator.
// NOTE(review): the condition choosing between the two addKVGuard calls is not
// visible; presumably it is whether `kv` is non-null.
314 IoTString *Table::getCommittedAtomic(IoTString *key) {
315 KeyValue *kv = committedKeyValueTable->get(key);
317 if (!arbitratorTable->contains(key)) {
318 throw new Error("Key not Found.");
321 // Make sure new key value pair matches the current arbitrator
322 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
323 // TODO: Maybe not throw an error
324 throw new Error("Not all Key Values Match Arbitrator.");
328 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
329 return kv->getValue();
331 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of the committed atomic read: validates the key and
// its arbitrator first, then looks the key up in
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable and
// committedKeyValueTable (in that order) and registers a guard on the pending
// transaction builder with either the value read or NULL.
// Throws `Error *` ("Key not Found.") when arbitratorTable has no entry for the
// key, and `Error *` ("Not all Key Values Match Arbitrator.") when the builder's
// checkArbitrator() rejects the key's arbitrator.
// NOTE(review): the conditions between the lookups and around the two
// addKVGuard calls are not visible in this excerpt.
336 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
337 if (!arbitratorTable->contains(key)) {
338 throw new Error("Key not Found.");
341 // Make sure new key value pair matches the current arbitrator
342 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
343 // TODO: Maybe not throw an error
344 throw new Error("Not all Key Values Match Arbitrator.");
347 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
350 kv = speculatedKeyValueTable->get(key);
354 kv = committedKeyValueTable->get(key);
358 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
359 return kv->getValue();
361 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after the current sequenceNumber from the cloud, applies them
// with validateAndUpdate() (second argument false) and refreshes the live
// transactions and status.
// If an `Exception *` is caught, the handler walks every machine id that has an
// entry in localCommunicationTable.
// NOTE(review): what is done with each machine id `m`, and the boolean results
// returned, are on lines not visible in this excerpt.
366 bool Table::update() {
368 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
369 validateAndUpdate(newSlots, false);
371 updateLiveTransactionsAndStatus();
373 } catch (Exception *e) {
374 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
375 while (kit->hasNext()) {
376 int64_t m = kit->next();
// Attempts to register `keyName` with `machineId` by wrapping them in a NewKey
// entry and handing it to sendToServer().
// NOTE(review): the visible condition is `!arbitratorTable->contains(keyName)`,
// yet the comment inside the branch says "There is already an arbitrator".
// Other code in this file treats contains() == true as "key exists", so the
// negation looks inverted - confirm against the branch body, which is not
// visible here.
385 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
387 if (!arbitratorTable->contains(keyName)) {
388 // There is already an arbitrator
391 NewKey *newKey = new NewKey(NULL, keyName, machineId);
393 if (sendToServer(newKey)) {
394 // If successfully inserted
// Begins a new transaction by replacing pendingTransactionBuilder with a fresh
// PendingTransaction for this machine.
// NOTE(review): the previous builder object is not deleted on the visible
// lines - confirm who owns it.
400 void Table::startTransaction() {
401 // Create a new transaction, invalidates any old pending transactions.
402 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the transaction currently being built.
//   key    - must already have an entry in arbitratorTable
//   value  - value to associate with the key in this transaction
// Throws `Error *` ("Key not Found.") for an unknown key and `Error *`
// ("Not all Key Values Match Arbitrator.") when the builder's checkArbitrator()
// rejects the key's arbitrator.
405 void Table::addKV(IoTString *key, IoTString *value) {
407 // Make sure it is a valid key
408 if (!arbitratorTable->contains(key)) {
409 throw new Error("Key not Found.");
412 // Make sure new key value pair matches the current arbitrator
413 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
414 // TODO: Maybe not throw an error
415 throw new Error("Not all Key Values Match Arbitrator.");
418 // Add the key value to this transaction
419 KeyValue *kv = new KeyValue(key, value);
420 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction held by pendingTransactionBuilder and returns its
// TransactionStatus.
//  - With no key/value updates, a new StatusNoEffect status is returned at once.
//  - Otherwise the transaction gets the next local transaction sequence number
//    and a StatusPending status. It is appended to pendingTransactionQueue when
//    its arbitrator is another machine; the local-arbitration calls below
//    presumably run in the other case (the `else` line is not visible).
//  - A fresh builder is installed afterwards.
//  - When a `ServerException *` is caught, the queued transactions are offered
//    to their arbitrators directly via sendTransactionToLocal().
// NOTE(review): several control-flow lines (try, else, continue, closing
// braces) are not visible in this excerpt.
423 TransactionStatus *Table::commitTransaction() {
424 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
425 // transaction with no updates will have no effect on the system
426 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
429 // Set the local transaction sequence number and increment
430 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
431 localTransactionSequenceNumber++;
433 // Create the transaction status
434 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
436 // Create the new transaction
437 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
438 newTransaction->setTransactionStatus(transactionStatus);
440 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
441 // Add it to the queue and invalidate the builder for safety
442 pendingTransactionQueue->add(newTransaction);
444 arbitrateOnLocalTransaction(newTransaction);
445 updateLiveStateFromLocal();
448 pendingTransactionBuilder = new PendingTransaction(localMachineId);
452 } catch (ServerException *e) {
// Arbitrators that could not be reached during this pass are remembered so that
// later transactions for the same arbitrator are not attempted out of order.
454 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
455 uint size = pendingTransactionQueue->size();
// The queue is compacted in place: each visited entry is rewritten at position
// `oldindex`, and the queue is cut to `oldindex` entries after the loop.
457 for (uint iter = 0; iter < size; iter++) {
458 Transaction *transaction = pendingTransactionQueue->get(iter);
459 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
461 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
462 // Already contacted this client so ignore all attempts to contact this client
463 // to preserve ordering for arbitrator
467 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
469 if (sendReturn.getFirst()) {
470 // Failed to contact over local
471 arbitratorTriedAndFailed->add(transaction->getArbitrator());
473 // Successful contact or should not contact
475 if (sendReturn.getSecond()) {
481 pendingTransactionQueue->setSize(oldindex);
484 updateLiveStateFromLocal();
486 return transactionStatus;
490 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from the current numberOfSlots:
// resizeLower is Table_RESIZE_THRESHOLD * numberOfSlots truncated to int, and
// the threshold is resizeLower - 1 plus the result of
// random->nextInt(numberOfSlots - resizeLower).
// Requires `random` and `numberOfSlots` to be set before it is called.
492 void Table::setResizeThreshold() {
493 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
494 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber without modifying it.
497 int64_t Table::getLocalSequenceNumber() {
498 return localSequenceNumber;
// Pushes pending work (queued transactions, pending arbitration rounds and,
// optionally, a new key) to the server, one slot per loop iteration.
//   newKey - NewKey entry to insert, or NULL when only transactions and
//            arbitration data need to be sent.
// Returns `newKey == NULL` as evaluated at the end of the function.
// NOTE(review): the line that would clear `newKey` after a successful insert is
// not visible in this excerpt - confirm.
//
// Rough structure, as far as the visible lines show:
//  1. If an earlier send was only partially delivered (hadPartialSendToServer),
//     work out whether lastSlotAttemptedToSend reached the server and settle the
//     bookkeeping of the transactions recorded in lastTransactionPartsSent.
//  2. While anything is pending: build a slot, fill it, send it, and update the
//     status of the transactions whose parts were included.
//  3. A `ServerException *` either counts as "nothing was sent" or, for
//     ServerException_TypeInputTimeout, marks the send as partial.
// NOTE(review): many control-flow lines (try, else, break, closing braces) are
// not visible in this excerpt, so nesting cannot be confirmed from here.
501 bool Table::sendToServer(NewKey *newKey) {
502 bool fromRetry = false;
504 if (hadPartialSendToServer) {
505 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
506 if (newSlots->length() == 0) {
508 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
510 if (sendSlotsReturn.getFirst()) {
511 if (newKey != NULL) {
// NOTE(review): getKey() results are compared with `==`. The same result is used
// as a key of arbitratorTable, whose key type is IoTString *, so this looks like
// a pointer-identity comparison rather than a content comparison - confirm this
// is intended. The same expression appears again further down.
512 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
517 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
518 while (trit->hasNext()) {
519 Transaction *transaction = trit->next();
520 transaction->resetServerFailure();
521 // Update which transactions parts still need to be sent
522 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
523 // Add the transaction status to the outstanding list
524 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
526 // Update the transaction status
527 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
529 // Check if all the transaction parts were successfully
530 // sent and if so then remove it from pending
531 if (transaction->didSendAllParts()) {
532 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
533 pendingTransactionQueue->remove(transaction);
// The resend did not report success: inspect the slots the server returned to
// see whether the previously attempted slot is among them.
538 newSlots = sendSlotsReturn.getThird();
539 bool isInserted = false;
540 for (uint si = 0; si < newSlots->length(); si++) {
541 Slot *s = newSlots->get(si);
542 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
548 for (uint si = 0; si < newSlots->length(); si++) {
549 Slot *s = newSlots->get(si);
554 // Process each entry in the slot
555 Vector<Entry *> *ventries = s->getEntries();
556 uint vesize = ventries->size();
557 for (uint vei = 0; vei < vesize; vei++) {
558 Entry *entry = ventries->get(vei);
559 if (entry->getType() == TypeLastMessage) {
560 LastMessage *lastMessage = (LastMessage *)entry;
561 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
570 if (newKey != NULL) {
571 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
576 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
577 while (trit->hasNext()) {
578 Transaction *transaction = trit->next();
579 transaction->resetServerFailure();
581 // Update which transactions parts still need to be sent
582 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
584 // Add the transaction status to the outstanding list
585 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
587 // Update the transaction status
588 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
590 // Check if all the transaction parts were successfully sent and if so then remove it from pending
591 if (transaction->didSendAllParts()) {
592 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
593 pendingTransactionQueue->remove(transaction);
595 transaction->resetServerFailure();
596 // Set the transaction sequence number back to nothing
597 if (!transaction->didSendAPartToServer()) {
598 transaction->setSequenceNumber(-1);
606 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
607 while (trit->hasNext()) {
608 Transaction *transaction = trit->next();
609 transaction->resetServerFailure();
610 // Set the transaction sequence number back to nothing
611 if (!transaction->didSendAPartToServer()) {
612 transaction->setSequenceNumber(-1);
617 if (sendSlotsReturn.getThird()->length() != 0) {
618 // insert into the local block chain
619 validateAndUpdate(sendSlotsReturn.getThird(), true);
// The server already had slots newer than ours: repeat the same
// "was the attempted slot inserted?" check against the slots fetched above.
623 bool isInserted = false;
624 for (uint si = 0; si < newSlots->length(); si++) {
625 Slot *s = newSlots->get(si);
626 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
632 for (uint si = 0; si < newSlots->length(); si++) {
633 Slot *s = newSlots->get(si);
638 // Process each entry in the slot
639 Vector<Entry *> *entries = s->getEntries();
640 uint eSize = entries->size();
641 for (uint ei = 0; ei < eSize; ei++) {
642 Entry *entry = entries->get(ei);
644 if (entry->getType() == TypeLastMessage) {
645 LastMessage *lastMessage = (LastMessage *)entry;
646 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
655 if (newKey != NULL) {
656 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
661 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
662 while (trit->hasNext()) {
663 Transaction *transaction = trit->next();
664 transaction->resetServerFailure();
666 // Update which transactions parts still need to be sent
667 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
669 // Add the transaction status to the outstanding list
670 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
672 // Update the transaction status
673 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
675 // Check if all the transaction parts were successfully sent and if so then remove it from pending
676 if (transaction->didSendAllParts()) {
677 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
678 pendingTransactionQueue->remove(transaction);
680 transaction->resetServerFailure();
681 // Set the transaction sequence number back to nothing
682 if (!transaction->didSendAPartToServer()) {
683 transaction->setSequenceNumber(-1);
689 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
690 while (trit->hasNext()) {
691 Transaction *transaction = trit->next();
692 transaction->resetServerFailure();
693 // Set the transaction sequence number back to nothing
694 if (!transaction->didSendAPartToServer()) {
695 transaction->setSequenceNumber(-1);
701 // insert into the local block chain
702 validateAndUpdate(newSlots, true);
705 } catch (ServerException *e) {
712 // While we have stuff that needs inserting into the block chain
713 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
717 if (hadPartialSendToServer) {
718 throw new Error("Should Be error free");
723 // If there is a new key with same name then end
724 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot follows the current head: sequenceNumber + 1, carrying the HMAC
// of the slot stored at sequenceNumber.
729 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
730 localSequenceNumber++;
732 // Try to fill the slot with data
733 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
734 bool needsResize = fillSlotsReturn.getFirst();
735 int newSize = fillSlotsReturn.getSecond();
736 bool insertedNewKey = fillSlotsReturn.getThird();
739 // Reset which transaction to send
740 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
741 while (trit->hasNext()) {
742 Transaction *transaction = trit->next();
743 transaction->resetNextPartToSend();
745 // Set the transaction sequence number back to nothing
746 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
747 transaction->setSequenceNumber(-1);
752 // Clear the sent data since we are trying again
753 pendingSendArbitrationEntriesToDelete->clear();
754 transactionPartsSent->clear();
756 // We needed a resize so try again
757 fillSlot(slot, true, newKey);
// Remember everything about this attempt so the retry path at the top of the
// function can reconcile it if the send turns out to be partial.
760 lastSlotAttemptedToSend = slot;
761 lastIsNewKey = (newKey != NULL);
762 lastInsertedNewKey = insertedNewKey;
763 lastNewSize = newSize;
765 lastTransactionPartsSent = transactionPartsSent->clone();
766 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
768 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
770 if (sendSlotsReturn.getFirst()) {
772 // Did insert into the block chain
774 if (insertedNewKey) {
775 // This slot was what was inserted not a previous slot
777 // New Key was successfully inserted into the block chain so don't want to insert it again
781 // Remove the aborts and commit parts that were sent from the pending to send queue
782 uint size = pendingSendArbitrationRounds->size();
784 for (uint i = 0; i < size; i++) {
785 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
786 round->removeParts(pendingSendArbitrationEntriesToDelete);
788 if (!round->isDoneSending()) {
789 // Round is not done sending, so keep it (compacted toward the front)
790 pendingSendArbitrationRounds->set(oldcount++,
791 pendingSendArbitrationRounds->get(i));
794 pendingSendArbitrationRounds->setSize(oldcount);
796 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
797 while (trit->hasNext()) {
798 Transaction *transaction = trit->next();
799 transaction->resetServerFailure();
801 // Update which transactions parts still need to be sent
802 transaction->removeSentParts(transactionPartsSent->get(transaction));
804 // Add the transaction status to the outstanding list
805 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
807 // Update the transaction status
808 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
810 // Check if all the transaction parts were successfully sent and if so then remove it from pending
811 if (transaction->didSendAllParts()) {
812 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
813 pendingTransactionQueue->remove(transaction);
818 // Reset which transaction to send
819 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
820 while (trit->hasNext()) {
821 Transaction *transaction = trit->next();
822 transaction->resetNextPartToSend();
824 // Set the transaction sequence number back to nothing
825 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
826 transaction->setSequenceNumber(-1);
832 // Clear the sent data in preparation for next send
833 pendingSendArbitrationEntriesToDelete->clear();
834 transactionPartsSent->clear();
836 if (sendSlotsReturn.getThird()->length() != 0) {
837 // insert into the local block chain
838 validateAndUpdate(sendSlotsReturn.getThird(), true);
842 } catch (ServerException *e) {
843 if (e->getType() != ServerException_TypeInputTimeout) {
844 // Nothing was able to be sent to the server so just clear these data structures
845 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
846 while (trit->hasNext()) {
847 Transaction *transaction = trit->next();
848 transaction->resetNextPartToSend();
850 // Set the transaction sequence number back to nothing
851 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
852 transaction->setSequenceNumber(-1);
857 // There was a partial send to the server
858 hadPartialSendToServer = true;
860 // Reset the part cursor and flag each transaction as having hit a server failure
861 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
862 while (trit->hasNext()) {
863 Transaction *transaction = trit->next();
864 transaction->resetNextPartToSend();
865 transaction->setServerFailure();
870 pendingSendArbitrationEntriesToDelete->clear();
871 transactionPartsSent->clear();
876 return newKey == NULL;
// Asks the machine `machineId` directly (over the local connection registered in
// localCommunicationTable) for arbitration data this table has not seen yet and
// applies the entries it returns.
//   machineId - machine to contact; must have an entry in localCommunicationTable.
// The request buffer is sized for one int64_t plus one int32_t; the last
// arbitration sequence number seen from that machine (or -1 if none is recorded)
// is written into it.
// The reply starts with an entry count followed by that many typed entries
// (TypeAbort or TypeCommitPart).
// NOTE(review): the boolean results and the handling of decoded Abort entries
// are on lines not visible in this excerpt.
879 bool Table::updateFromLocal(int64_t machineId) {
880 if (!localCommunicationTable->contains(machineId))
883 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
885 // Get the size of the send data
886 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
888 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
889 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
890 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
893 Array<char> *sendData = new Array<char>(sendDataSize);
894 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
897 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// localSequenceNumber is passed along with the request and advanced after
// every send.
901 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
902 localSequenceNumber++;
904 if (returnData == NULL) {
905 // Could not contact server
910 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
911 int numberOfEntries = bbDecode->getInt();
913 for (int i = 0; i < numberOfEntries; i++) {
914 char type = bbDecode->get();
915 if (type == TypeAbort) {
916 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
918 } else if (type == TypeCommitPart) {
919 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
920 processEntry(commitPart);
924 updateLiveStateFromLocal();
// Sends a whole transaction straight to its arbitrator over the local
// connection and applies the arbitration result the arbitrator sends back.
//   transaction - transaction to send; its arbitrator id selects the entry in
//                 localCommunicationTable.
// Returns a Pair<bool, bool>:
//   (true, false)  - no local communication info for the arbitrator, or the
//                    arbitrator could not be reached (sendLocalData gave NULL);
//                    the caller treats first == true as "failed to contact".
//   (false, true)  - the exchange completed and the transaction status was set.
// Reply layout as decoded below: one byte didCommit, one byte couldArbitrate,
// an int entry count, then that many typed entries (TypeAbort / TypeCommitPart).
// NOTE(review): the conditions selecting Committed vs Aborted in the final
// section are not visible in this excerpt.
929 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
931 // Get the devices local communications
932 if (!localCommunicationTable->contains(transaction->getArbitrator()))
933 return Pair<bool, bool>(true, false);
935 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
937 // Get the size of the send data
938 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// Header size above, plus the encoded size of every transaction part.
940 Vector<TransactionPart *> *tParts = transaction->getParts();
941 uint tPartsSize = tParts->size();
942 for (uint i = 0; i < tPartsSize; i++) {
943 TransactionPart *part = tParts->get(i);
944 sendDataSize += part->getSize();
948 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
949 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
950 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
953 // Make the send data size
954 Array<char> *sendData = new Array<char>(sendDataSize);
955 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
958 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
959 bbEncode->putInt(transaction->getParts()->size());
961 Vector<TransactionPart *> *tParts = transaction->getParts();
962 uint tPartsSize = tParts->size();
963 for (uint i = 0; i < tPartsSize; i++) {
964 TransactionPart *part = tParts->get(i);
965 part->encode(bbEncode);
970 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
971 localSequenceNumber++;
973 if (returnData == NULL) {
974 // Could not contact server
975 return Pair<bool, bool>(true, false);
979 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
980 bool didCommit = bbDecode->get() == 1;
981 bool couldArbitrate = bbDecode->get() == 1;
982 int numberOfEntries = bbDecode->getInt();
983 bool foundAbort = false;
985 for (int i = 0; i < numberOfEntries; i++) {
986 char type = bbDecode->get();
987 if (type == TypeAbort) {
988 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// Detects an abort that refers to this very transaction (same machine id and
// same client-local sequence number).
990 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
995 } else if (type == TypeCommitPart) {
996 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
997 processEntry(commitPart);
1001 updateLiveStateFromLocal();
1003 if (couldArbitrate) {
1004 TransactionStatus *status = transaction->getTransactionStatus();
1006 status->setStatus(TransactionStatus_StatusCommitted);
1008 status->setStatus(TransactionStatus_StatusAborted);
1011 TransactionStatus *status = transaction->getTransactionStatus();
1013 status->setStatus(TransactionStatus_StatusAborted);
1015 status->setStatus(TransactionStatus_StatusCommitted);
1019 return Pair<bool, bool>(false, true);
// Decodes a serialized arbitration request (`data`) and builds the serialized
// reply.
//
// Read from `data`, in order: an int64 (highest arbitration sequence number
// the requester has already seen), an int32 part count, then that many
// TransactionParts. When parts are present they are assembled into a
// Transaction and handed to arbitrateOnLocalTransaction().
//
// The reply gathers locally generated aborts and this machine's commit parts
// into unseenArbitrations and encodes them after the flag byte(s) and an int32
// entry count.
1022 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1024 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1025 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1026 int numberOfParts = bbDecode->getInt();
1028 // If we did commit a transaction or not
1029 bool didCommit = false;
1030 bool couldArbitrate = false;
// A part count of zero means the request carries no transaction to arbitrate
1032 if (numberOfParts != 0) {
1034 // decode the transaction
1035 Transaction *transaction = new Transaction();
1036 for (int i = 0; i < numberOfParts; i++) {
1038 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1039 transaction->addPartDecode(newPart);
1042 // Arbitrate on transaction and pull relevant return data
// Pair layout: first = could this machine arbitrate, second = did it commit
1043 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1044 couldArbitrate = localArbitrateReturn.getFirst();
1045 didCommit = localArbitrateReturn.getSecond();
1047 updateLiveStateFromLocal();
1049 // Transaction was sent to the server so keep track of it to prevent double commit
// A sequence number other than -1 is used here as the "already sent" marker
1050 if (transaction->getSequenceNumber() != -1) {
1051 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1055 // The data to send back
1056 int returnDataSize = 0;
1057 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1059 // Get the aborts to send back
1060 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1062 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1063 while (abortit->hasNext())
1064 abortLocalSequenceNumbers->add(abortit->next());
// Sort the collected keys with compareInt64 before walking them
1068 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1070 uint asize = abortLocalSequenceNumbers->size();
1071 for (uint i = 0; i < asize; i++) {
1072 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// Entries at or below what the requester has seen are handled separately
// (presumably skipped as already known to the requester -- confirm)
1073 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1077 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1078 unseenArbitrations->add(abort);
1079 returnDataSize += abort->getSize();
1082 // Get the commits to send back
// Only commits stored under this machine's id in liveCommitsTable are considered
1083 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1084 if (commitForClientTable != NULL) {
1085 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1087 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1088 while (commitit->hasNext())
1089 commitLocalSequenceNumbers->add(commitit->next());
1092 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1094 uint clsSize = commitLocalSequenceNumbers->size();
1095 for (uint clsi = 0; clsi < clsSize; clsi++) {
1096 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1097 Commit *commit = commitForClientTable->get(localSequenceNumber);
// Same "already seen by the requester" check as for the aborts above
1099 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// A commit is sent as its individual CommitParts, each sized separately
1104 Vector<CommitPart *> *parts = commit->getParts();
1105 uint nParts = parts->size();
1106 for (uint i = 0; i < nParts; i++) {
1107 CommitPart *commitPart = parts->get(i);
1108 unseenArbitrations->add(commitPart);
1109 returnDataSize += commitPart->getSize();
1115 // Number of arbitration entries to decode
// NOTE(review): this budgets two int32 values plus (below) one char, while the
// encoder further down writes a single int32 entry count and flag bytes --
// confirm the budget matches the bytes actually written.
1116 returnDataSize += 2 * sizeof(int32_t);
1118 // bool of did commit or not
1119 if (numberOfParts != 0) {
1120 returnDataSize += sizeof(char);
1123 // Data to send Back
1124 Array<char> *returnData = new Array<char>(returnDataSize);
1125 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Flag bytes are only emitted when the request carried a transaction
1127 if (numberOfParts != 0) {
1129 bbEncode->put((char)1);
1131 bbEncode->put((char)0);
1133 if (couldArbitrate) {
1134 bbEncode->put((char)1);
1136 bbEncode->put((char)0);
// Entry count followed by each entry's own encoding
1140 bbEncode->putInt(unseenArbitrations->size());
1141 uint size = unseenArbitrations->size();
1142 for (uint i = 0; i < size; i++) {
1143 Entry *entry = unseenArbitrations->get(i);
1144 entry->encode(bbEncode);
// NOTE(review): the loops above declare locals named localSequenceNumber; this
// statement presumably refers to a member of the same name -- confirm.
1147 localSequenceNumber++;
// Pushes `slot` to the server through cloud->putSlot(slot, newSize) and
// inspects the slots that come back.
//
// Returns a tuple (inserted, lastTryInserted, array) where `array` is the set
// of slots obtained from putSlot, or a substitute array built around `slot`
// when putSlot returned NULL.
// Throws Error when the resulting array is empty.
// NOTE(review): `isNewKey` is not referenced in the statements shown here --
// confirm whether it is still needed.
1151 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
// The previous flag value is saved, but its only use below is commented out
1152 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1153 attemptedToSendToServer = true;
1155 bool inserted = false;
1156 bool lastTryInserted = false;
1158 Array<Slot *> *array = cloud->putSlot(slot, newSize);
// NULL from putSlot: fall back to an array holding just our slot and forget
// previously rejected slots
1159 if (array == NULL) {
// NOTE(review): the array is default-constructed and then index 0 is written;
// if the default constructor yields length 0 this is out of range (and the
// length check below would throw) -- confirm Array's default size.
1160 array = new Array<Slot *>();
1161 array->set(0, slot);
1162 rejectedSlotVector->clear();
1165 if (array->length() == 0) {
1166 throw new Error("Server Error: Did not send any slots");
1169 // if (attemptedToSendToServerTmp) {
// After a partial send, look through the returned slots for our own slot
// (same sequence number, our machine id)
1170 if (hadPartialSendToServer) {
1172 bool isInserted = false;
1173 uint size = array->length();
1174 for (uint i = 0; i < size; i++) {
1175 Slot *s = array->get(i);
1176 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1182 for (uint i = 0; i < size; i++) {
1183 Slot *s = array->get(i);
1188 // Process each entry in the slot
1189 Vector<Entry *> *entries = s->getEntries();
1190 uint eSize = entries->size();
1191 for (uint ei = 0; ei < eSize; ei++) {
1192 Entry *entry = entries->get(ei);
// A LastMessage naming our machine id and this sequence number also
// identifies our slot
1194 if (entry->getType() == TypeLastMessage) {
1195 LastMessage *lastMessage = (LastMessage *)entry;
1197 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// Remember the sequence number of a slot that did not make it in
1206 rejectedSlotVector->add(slot->getSequenceNumber());
1207 lastTryInserted = false;
1209 lastTryInserted = true;
1212 rejectedSlotVector->add(slot->getSequenceNumber());
1213 lastTryInserted = false;
1217 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1221 * The first element of the returned tuple is true if a resize was needed
// Fills `slot` with the entries that should go out in the next send, in this
// order: an optional TableStatus (on resize), rejected-message entries,
// mandatory rescue entries, the optional new-key entry, pending arbitration
// data, parts of the first pending transaction, and finally optional rescue
// entries.
//
// Returns (true, NULL, NULL) when mandatory rescue reports that a resize is
// needed but `resize` is false; otherwise (false, newSize, inserted).
1223 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1225 if (liveSlotCount > bufferResizeThreshold) {
1226 resize = true;//Resize is forced
// Grow by Table_RESIZE_MULTIPLE and announce the new size via a TableStatus entry
1230 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1231 TableStatus *status = new TableStatus(slot, newSize);
1232 slot->addEntry(status);
1235 // Fill with rejected slots first before doing anything else
1236 doRejectedMessages(slot);
1238 // Do mandatory rescue of entries
1239 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1241 // Extract working variables
1242 bool needsResize = mandatoryRescueReturn.getFirst();
1243 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1244 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1246 if (needsResize && !resize) {
1247 // We need to resize but we are not resizing, so report that via the first tuple element
// NOTE(review): NULL is passed for the int32_t and bool slots; this relies on
// NULL converting to 0/false -- literal 0 and false would be clearer.
1248 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1251 bool inserted = false;
1252 if (newKeyEntry != NULL) {
1253 newKeyEntry->setSlot(slot);
1254 if (slot->hasSpace(newKeyEntry)) {
1255 slot->addEntry(newKeyEntry);
1260 // Clear the transactions, aborts and commits that were sent previously
1261 transactionPartsSent->clear();
1262 pendingSendArbitrationEntriesToDelete->clear();
1263 uint size = pendingSendArbitrationRounds->size();
1264 for (uint i = 0; i < size; i++) {
1265 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1266 bool isFull = false;
// Parts are (re)generated for each round before they are read back
1267 round->generateParts();
1268 Vector<Entry *> *parts = round->getParts();
1270 // Insert pending arbitration data
1271 uint vsize = parts->size();
1272 for (uint vi = 0; vi < vsize; vi++) {
1273 Entry *arbitrationData = parts->get(vi);
1275 // If it is an abort then we need to set some information
1276 if (arbitrationData->getType() == TypeAbort) {
1277 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1280 if (!slot->hasSpace(arbitrationData)) {
1281 // No space so cant do anything else with these data entries
1286 // Add to this current slot and add it to entries to delete
1287 slot->addEntry(arbitrationData);
1288 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is looked up here
1296 if (pendingTransactionQueue->size() > 0) {
1297 Transaction *transaction = pendingTransactionQueue->get(0);
1298 // Set the transaction sequence number if it has yet to be inserted into the block chain
1299 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1300 transaction->setSequenceNumber(slot->getSequenceNumber());
1304 TransactionPart *part = transaction->getNextPartToSend();
1306 // Ran out of parts to send for this transaction so move on
1310 if (slot->hasSpace(part)) {
1311 slot->addEntry(part);
// Record which part numbers of this transaction went into the slot
1312 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1313 if (partsSent == NULL) {
1314 partsSent = new Vector<int32_t>();
1315 transactionPartsSent->put(transaction, partsSent);
1317 partsSent->add(part->getPartNumber());
1318 transactionPartsSent->put(transaction, partsSent);
1325 // Fill the remainder of the slot with rescue data
1326 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1328 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` describing the sequence numbers
// recorded in rejectedSlotVector. Does nothing when that vector is empty.
1331 void Table::doRejectedMessages(Slot *s) {
1332 if (!rejectedSlotVector->isEmpty()) {
1333 /* TODO: We should avoid generating a rejected message entry if
1334 * there is already a sufficient entry in the queue (e.g.,
1335 * equalsto value of true and same sequence number). */
// Oldest rejected sequence number; used as the lower bound of range entries
1337 int64_t old_seqn = rejectedSlotVector->get(0);
// More rejections than Table_REJECTED_THRESHOLD: cover the whole range from
// the first to the last rejected sequence number with one entry (flag false)
1338 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1339 int64_t new_seqn = rejectedSlotVector->lastElement();
1340 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
// -1 marks "no missing message recorded yet" (checked below)
1343 int64_t prev_seqn = -1;
1345 /* Go through list of missing messages */
1346 for (; i < rejectedSlotVector->size(); i++) {
1347 int64_t curr_seqn = rejectedSlotVector->get(i);
1348 Slot *s_msg = buffer->getSlot(curr_seqn);
1351 prev_seqn = curr_seqn;
1353 /* Generate rejected message entry for missing messages */
1354 if (prev_seqn != -1) {
1355 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1358 /* Generate rejected message entries for present messages */
// The same index `i` continues from where the previous loop stopped
1359 for (; i < rejectedSlotVector->size(); i++) {
1360 int64_t curr_seqn = rejectedSlotVector->get(i);
1361 Slot *s_msg = buffer->getSlot(curr_seqn);
// For a slot found in the buffer, its own machine id is reported (flag true)
1362 int64_t machineid = s_msg->getMachineID();
1363 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest slots of the buffer into `slot` so they
// are not lost, walking sequence numbers from oldestLiveSlotSequenceNumver up
// to (but not including) `threshold`.
//
// Returns (true, seenLiveSlot, currentSequenceNumber) when a live entry of the
// slot at firstIfFull does not fit into `slot`; otherwise
// (false, seenLiveSlot, currentSequenceNumber). The caller treats the first
// element as "needs resize".
1370 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1371 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1372 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning below the oldest slot the buffer still holds
1373 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1374 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1377 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1378 bool seenLiveSlot = false;
1379 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1380 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1384 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1385 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1386 // Push slot number forward
// The oldest-live marker only advances while no live slot has been met
1387 if (!seenLiveSlot) {
1388 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1391 if (!previousSlot->isLive()) {
1395 // We have seen a live slot
1396 seenLiveSlot = true;
1398 // Get all the live entries for a slot
1399 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1401 // Iterate over all the live entries and try to rescue them
1402 uint lESize = liveEntries->size();
1403 for (uint i = 0; i < lESize; i++) {
1404 Entry *liveEntry = liveEntries->get(i);
1405 if (slot->hasSpace(liveEntry)) {
1406 // Enough space to rescue the entry
1407 slot->addEntry(liveEntry);
1408 } else if (currentSequenceNumber == firstIfFull) {
1409 //if there's no space but the entry is about to fall off the queue
1410 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1416 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Uses whatever room is left in slot `s` to copy further live entries from
// the buffer, starting at sequence number `seqn` and going up to the newest
// slot. `seenliveslot` carries over whether a live slot was already met
// during the mandatory rescue; `resize` is forwarded to getLiveEntries().
1419 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1420 /* now go through live entries from least to greatest sequence number until
1421 * either all live slots added, or the slot doesn't have enough room
1422 * for SKIP_THRESHOLD consecutive entries*/
1424 int64_t newestseqnum = buffer->getNewestSeqNum();
1425 for (; seqn <= newestseqnum; seqn++) {
1426 Slot *prevslot = buffer->getSlot(seqn);
1427 //Push slot number forward
1429 oldestLiveSlotSequenceNumver = seqn;
1431 if (!prevslot->isLive())
1433 seenliveslot = true;
1434 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1435 uint lESize = liveentries->size();
1436 for (uint i = 0; i < lESize; i++) {
1437 Entry *liveentry = liveentries->get(i);
// Entries are copied only while they still fit into the slot
1438 if (s->hasSpace(liveentry))
1439 s->addEntry(liveentry);
// Give up once more than Table_SKIP_THRESHOLD entries were counted in skipcount
1442 if (skipcount > Table_SKIP_THRESHOLD)
1452 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and appends them to the
// local buffer.
//
// Visible steps: reject batches whose first slot is not newer than
// sequenceNumber, check the HMAC chain, process every slot, run the extra
// gap checks when the batch does not directly follow sequenceNumber, store
// the slots in `buffer`, advance sequenceNumber to the last slot, and refresh
// the live state via updateLiveStateFromServer().
//
// Throws Error for older slots and, in the gap case, when machineSet is not
// empty after processing.
1454 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1455 // The cloud communication layer has checked slot HMACs already
1457 if (newSlots->length() == 0) {
1461 // Make sure all slots are newer than the last largest slot this
1463 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1464 if (firstSeqNum <= sequenceNumber) {
1465 throw new Error("Server Error: Sent older slots!");
1468 // Create an object that can access both new slots and slots in our
1469 // local chain without committing slots to our local chain
1470 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1472 // Check that the HMAC chain is not broken
1473 checkHMACChain(indexer, newSlots);
1475 // Set to keep track of messages from clients
// Seeded with every machine id currently present in lastMessageTable
1476 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1478 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1479 while (lmit->hasNext())
1480 machineSet->add(lmit->next());
1484 // Process each slots data
1486 uint numSlots = newSlots->length();
1487 for (uint i = 0; i < numSlots; i++) {
1488 Slot *slot = newSlots->get(i);
1489 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1490 updateExpectedSize();
1494 // If there is a gap, check to see if the server sent us
1496 if (firstSeqNum != (sequenceNumber + 1)) {
1498 // Check the size of the slots that were sent down by the server.
1499 // Can only check the size if there was a gap
1500 checkNumSlots(newSlots->length());
1502 // Since there was a gap every machine must have pushed a slot or
1503 // must have a last message message. If not then the server is
1505 if (!machineSet->isEmpty()) {
1506 throw new Error("Missing record for machines: ");
1510 // Update the size of our local block chain.
1513 // Commit new to slots to the local block chain.
1515 uint numSlots = newSlots->length();
1516 for (uint i = 0; i < numSlots; i++) {
1517 Slot *slot = newSlots->get(i);
1519 // Insert this slot into our local block chain copy.
1520 buffer->putSlot(slot);
1522 // Keep track of how many slots are currently live (have live data
1527 // Get the sequence number of the latest slot in the system
1528 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1529 updateLiveStateFromServer();
1531 // No Need to remember after we pulled from the server
1532 offlineTransactionsCommittedAndAtServer->clear();
1534 // This is invalidated now
1535 hadPartialSendToServer = false;
// Refreshes the derived state after new slots arrived from the server:
// assembles new transaction parts, arbitrates, updates the committed table,
// prunes live transactions, then refreshes both speculative tables.
// Differs from updateLiveStateFromLocal() by the two extra leading steps.
1538 void Table::updateLiveStateFromServer() {
1539 // Process the new transaction parts
1540 processNewTransactionParts();
1542 // Do arbitration on new transactions that were received
1543 arbitrateFromServer();
1545 // Update all the committed keys
1546 bool didCommitOrSpeculate = updateCommittedTable();
1548 // Delete the transactions that are now dead
1549 updateLiveTransactionsAndStatus();
// The flag accumulates: it is passed into the speculative update and OR-ed
// with that call's result before being handed to the pending-transaction update
1552 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1553 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived state after a locally handled change: updates the
// committed table, prunes live transactions, then refreshes both speculative
// tables. Unlike updateLiveStateFromServer() it neither processes new
// transaction parts nor runs server-side arbitration.
1556 void Table::updateLiveStateFromLocal() {
1557 // Update all the committed keys
1558 bool didCommitOrSpeculate = updateCommittedTable();
1560 // Delete the transactions that are now dead
1561 updateLiveTransactionsAndStatus();
// The flag accumulates: it is passed into the speculative update and OR-ed
// with that call's result before being handed to the pending-transaction update
1564 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1565 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Initializes the expected slot count from a table-size announcement:
// expectedsize becomes the smaller of firstSequenceNumber and numberOfSlots,
// currMaxSize becomes numberOfSlots, and didFindTableStatus is set.
// Note: the parameter `numberOfSlots` shadows the member of the same name
// that commitNewMaxSize() reads and writes.
1568 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1569 int64_t prevslots = firstSequenceNumber;
// NOTE(review): guarded by didFindTableStatus, presumably so that only the
// first announcement since the last commitNewMaxSize() takes effect -- confirm.
1571 if (didFindTableStatus) {
1573 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1576 didFindTableStatus = true;
1577 currMaxSize = numberOfSlots;
// Adjusts expectedsize after a slot has been processed, keeping it capped at
// currMaxSize.
1580 void Table::updateExpectedSize() {
1583 if (expectedsize > currMaxSize) {
1584 expectedsize = currMaxSize;
1590 * Check the size of the block chain to make sure there are enough
1591 * slots sent back by the server. This is only called when we have a
1592 * gap between the slots that we have locally and the slots sent by
1593 * the server therefore in the slots sent by the server there will be
1594 * at least 1 Table status message
// Verifies that the number of slots received equals expectedsize.
// Throws Error when the two differ.
1596 void Table::checkNumSlots(int numberOfSlots) {
1597 if (numberOfSlots != expectedsize) {
1598 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1603 * Update the size of the local buffer if it is needed.
// Applies currMaxSize as the local buffer size: resizes `buffer` when the
// size changed, stores the new slot count, and recomputes the resize
// threshold. Also resets didFindTableStatus.
1605 void Table::commitNewMaxSize() {
1606 didFindTableStatus = false;
1608 // Resize the local slot buffer
1609 if (numberOfSlots != currMaxSize) {
1610 buffer->resize((int32_t)currMaxSize);
1613 // Change the number of local slots to the new size
1614 numberOfSlots = (int32_t)currMaxSize;
1616 // Recalculate the resize threshold since the size of the local
1617 // buffer has changed
1618 setResizeThreshold();
1622 * Process the new transaction parts from this latest round of slots
1623 * received from the server
// Folds the transaction parts collected in newTransactionParts into the live
// transaction tables, creating Transaction objects on first sight of a
// sequence number, and then empties newTransactionParts.
1625 void Table::processNewTransactionParts() {
1627 if (newTransactionParts->size() == 0) {
1628 // Nothing new to process
1632 // Iterate through all the machine Ids that we received new parts
1634 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1635 while (tpit->hasNext()) {
1636 int64_t machineId = tpit->next();
1637 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1639 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1640 // Iterate through all the parts for that machine Id
1641 while (ptit->hasNext()) {
1642 Pair<int64_t, int32_t> *partId = ptit->next();
1643 TransactionPart *part = parts->get(partId);
// A part whose sequence number is not above the last one its arbitrator
// already arbitrated is stale
1645 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1646 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1647 if (lastTransactionNumber >= part->getSequenceNumber()) {
1648 // Set dead the transaction part
1654 // Get the transaction object for that sequence number
1655 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1657 if (transaction == NULL) {
1658 // This is a new transaction that we dont have so make a new one
1659 transaction = new Transaction();
1661 // Insert this new transaction into the live tables
// Indexed twice: by sequence number and by transaction id
1662 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1663 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1666 // Add that part to the transaction
1667 transaction->addPartDecode(part);
1672 // Clear all the new transaction parts in preparation for the next
1673 // time the server sends slots
1674 newTransactionParts->clear();
// Arbitrates over the live transactions received from the server, in sorted
// sequence-number order, for which this machine is the arbitrator.
//
// Transactions whose guard passes contribute their key-value updates to a
// temporary table that is turned into a single Commit; each transaction whose
// guard fails produces an Abort. The resulting commit and aborts are wrapped
// in an ArbitrationRound appended to pendingSendArbitrationRounds, after
// which compactArbitrationData() gets a chance to merge pending rounds.
1677 void Table::arbitrateFromServer() {
1679 if (liveTransactionBySequenceNumberTable->size() == 0) {
1680 // Nothing to arbitrate on so move on
1684 // Get the transaction sequence numbers and sort from oldest to newest
1685 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1687 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1688 while (trit->hasNext())
1689 transactionSequenceNumbers->add(trit->next());
1692 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1694 // Key-value updates accumulated from the transactions accepted in this round
1695 Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1697 // The last transaction arbitrated on
1698 int64_t lastTransactionCommitted = -1;
1699 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1700 uint tsnSize = transactionSequenceNumbers->size();
1701 for (uint i = 0; i < tsnSize; i++) {
1702 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1703 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1705 // Check if this machine arbitrates for this transaction if not
1706 // then we cant arbitrate this transaction
1707 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below the last one already arbitrated on are filtered here
1711 if (transactionSequenceNumber < lastSeqNumArbOn) {
1715 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1716 // We have seen this already locally so dont commit again
1721 if (!transaction->isComplete()) {
1722 // Will arbitrate in incorrect order if we continue so just break
1728 // update the largest transaction seen by arbitrator from server
1729 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1730 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1732 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1733 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1734 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard is evaluated against the committed table plus the updates already
// accepted earlier in this same round
1738 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1739 // Guard evaluated as true
1741 // Update the local changes so we can make the commit
1742 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1743 while (kvit->hasNext()) {
1744 KeyValue *kv = kvit->next();
1745 speculativeTableTmp->put(kv->getKey(), kv);
1749 // Update what the last transaction committed was for use in batch commit
1750 lastTransactionCommitted = transactionSequenceNumber;
1752 // Guard evaluated was false so create abort
// Each abort consumes one local arbitration sequence number
1754 Abort *newAbort = new Abort(NULL,
1755 transaction->getClientLocalSequenceNumber(),
1756 transaction->getSequenceNumber(),
1757 transaction->getMachineId(),
1758 transaction->getArbitrator(),
1759 localArbitrationSequenceNumber);
1760 localArbitrationSequenceNumber++;
1761 generatedAborts->add(newAbort);
1763 // Insert the abort so we can process
1764 processEntry(newAbort);
1767 lastSeqNumArbOn = transactionSequenceNumber;
1770 Commit *newCommit = NULL;
1772 // If there is something to commit
1773 if (speculativeTableTmp->size() != 0) {
1774 // Create the commit and increment the commit sequence number
1775 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1776 localArbitrationSequenceNumber++;
1778 // Add all the new keys to the commit
1779 SetIterator<IoTString *, KeyValue *> *spit = getKeyIterator(speculativeTableTmp);
1780 while (spit->hasNext()) {
1781 IoTString *string = spit->next();
1782 KeyValue *kv = speculativeTableTmp->get(string);
1783 newCommit->addKV(kv);
1787 // create the commit parts
1788 newCommit->createCommitParts();
1790 // Append all the commit parts to the end of the pending queue
1791 // waiting for sending to the server
1792 // Insert the commit so we can process it
1793 Vector<CommitPart *> *parts = newCommit->getParts();
1794 uint partsSize = parts->size();
1795 for (uint i = 0; i < partsSize; i++) {
1796 CommitPart *commitPart = parts->get(i);
1797 processEntry(commitPart);
// Queue a round only when this pass produced a commit or at least one abort
1801 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1802 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1803 pendingSendArbitrationRounds->add(arbitrationRound);
// After a successful compaction the last pending round may hold a different
// commit, so its parts are processed again
1805 if (compactArbitrationData()) {
1806 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1807 if (newArbitrationRound->getCommit() != NULL) {
1808 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1809 uint partsSize = parts->size();
1810 for (uint i = 0; i < partsSize; i++) {
1811 CommitPart *commitPart = parts->get(i);
1812 processEntry(commitPart);
// Arbitrates a single transaction directly on this machine.
//
// Returns a pair (couldArbitrate, didCommit):
//   (false, false) - this machine is not the arbitrator, the transaction is
//                    incomplete, or a newer transaction from the same remote
//                    machine was already seen from the server;
//   (true,  true)  - the guard passed and a Commit was created;
//   (true,  false) - the guard failed and an Abort was created.
// In both arbitrated cases an ArbitrationRound is queued on
// pendingSendArbitrationRounds and updateLiveStateFromLocal() is called.
1819 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1821 // Check if this machine arbitrates for this transaction if not then
1822 // we cant arbitrate this transaction
1823 if (transaction->getArbitrator() != localMachineId) {
1824 return Pair<bool, bool>(false, false);
1827 if (!transaction->isComplete()) {
1828 // Will arbitrate in incorrect order if we continue so just break
1830 return Pair<bool, bool>(false, false);
1833 if (transaction->getMachineId() != localMachineId) {
1834 // dont do this check for local transactions
1835 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1836 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1837 // We've have already seen this from the server
1838 return Pair<bool, bool>(false, false);
// Unlike arbitrateFromServer(), the guard is checked against the committed
// table only (no temporary speculative table is passed)
1843 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1844 // Guard evaluated as true Create the commit and increment the
1845 // commit sequence number
1846 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1847 localArbitrationSequenceNumber++;
1849 // Update the local changes so we can make the commit
1850 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1851 while (kvit->hasNext()) {
1852 KeyValue *kv = kvit->next();
1853 newCommit->addKV(kv);
1857 // create the commit parts
1858 newCommit->createCommitParts();
1860 // Append all the commit parts to the end of the pending queue
1861 // waiting for sending to the server
1862 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1863 pendingSendArbitrationRounds->add(arbitrationRound);
// If compaction succeeded, process the parts of the commit now held by the
// last pending round; the parts of newCommit itself are processed below
1865 if (compactArbitrationData()) {
1866 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1867 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1868 uint partsSize = parts->size();
1869 for (uint i = 0; i < partsSize; i++) {
1870 CommitPart *commitPart = parts->get(i);
1871 processEntry(commitPart);
1874 // Insert the commit so we can process it
1875 Vector<CommitPart *> *parts = newCommit->getParts();
1876 uint partsSize = parts->size();
1877 for (uint i = 0; i < partsSize; i++) {
1878 CommitPart *commitPart = parts->get(i);
1879 processEntry(commitPart);
// Status is only tracked for transactions created on this machine
1883 if (transaction->getMachineId() == localMachineId) {
1884 TransactionStatus *status = transaction->getTransactionStatus();
1885 if (status != NULL) {
1886 status->setStatus(TransactionStatus_StatusCommitted);
1890 updateLiveStateFromLocal();
1891 return Pair<bool, bool>(true, true);
1893 if (transaction->getMachineId() == localMachineId) {
1894 // For locally created messages update the status
1895 // Guard evaluated was false so create abort
1896 TransactionStatus *status = transaction->getTransactionStatus();
1897 if (status != NULL) {
1898 status->setStatus(TransactionStatus_StatusAborted);
1901 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1904 Abort *newAbort = new Abort(NULL,
1905 transaction->getClientLocalSequenceNumber(),
1907 transaction->getMachineId(),
1908 transaction->getArbitrator(),
1909 localArbitrationSequenceNumber);
1910 localArbitrationSequenceNumber++;
1911 addAbortSet->add(newAbort);
1913 // Append the abort-only round to the end of the pending queue
1914 // waiting for sending to the server
1915 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1916 pendingSendArbitrationRounds->add(arbitrationRound);
// NOTE(review): getCommit() is dereferenced without a NULL check here; this is
// only safe if compactArbitrationData() returns true solely when the last
// round received a merged commit -- confirm.
1918 if (compactArbitrationData()) {
1919 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1921 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1922 uint partsSize = parts->size();
1923 for (uint i = 0; i < partsSize; i++) {
1924 CommitPart *commitPart = parts->get(i);
1925 processEntry(commitPart);
1930 updateLiveStateFromLocal();
1931 return Pair<bool, bool>(true, false);
1936 * Compacts the arbitration data by merging commits and aggregating
1937 * aborts so that a single large push of commits can be done instead
1938 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one.
//
// Walks backwards from the second-to-last round in
// pendingSendArbitrationRounds, merging each round's aborts (and commit, via
// Commit_merge) into the last round as long as the combined part count stays
// within ArbitrationRound_MAX_PARTS and the round is neither full nor
// partially sent. Merged rounds are removed and the last round is re-added.
//
// Callers treat a true result as "re-process the commit parts of the last
// pending round"; the result depends on hadCommit and gotNewCommit.
1940 bool Table::compactArbitrationData() {
1941 if (pendingSendArbitrationRounds->size() < 2) {
1942 // Nothing to compact so do nothing
1946 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// A round that already had a part sent is not compacted further
1947 if (lastRound->getDidSendPart()) {
// NOTE(review): despite its name, hadCommit is true when the last round has
// NO commit (getCommit() == NULL) -- confirm that this is the intent.
1951 bool hadCommit = (lastRound->getCommit() == NULL);
1952 bool gotNewCommit = false;
// Counts the last round itself plus every earlier round merged into it
1954 uint numberToDelete = 1;
1955 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1956 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1958 if (round->isFull() || round->getDidSendPart()) {
1959 // Stop since there is a part that cannot be compacted and we
1960 // need to compact in order
1964 if (round->getCommit() == NULL) {
1965 // Try compacting aborts only
1966 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1967 if (newSize > ArbitrationRound_MAX_PARTS) {
1968 // Cant compact since it would be too large
1971 lastRound->addAborts(round->getAborts());
1973 // Create a new larger commit
// The merged commit gets a fresh local arbitration sequence number
1974 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1975 localArbitrationSequenceNumber++;
1977 // Create the commit parts so that we can count them
1978 newCommit->createCommitParts();
1980 // Calculate the new size of the parts
1981 int newSize = newCommit->getNumberOfParts();
1982 newSize += lastRound->getAbortsCount();
1983 newSize += round->getAbortsCount();
1985 if (newSize > ArbitrationRound_MAX_PARTS) {
1986 // Cant compact since it would be too large
1990 // Set the new compacted part
1991 lastRound->setCommit(newCommit);
1992 lastRound->addAborts(round->getAborts());
1993 gotNewCommit = true;
// numberToDelete still at 1 means nothing was merged
1999 if (numberToDelete != 1) {
2000 // If there is a compaction
2001 // Delete the previous pieces that are now in the new compacted piece
2002 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2003 pendingSendArbitrationRounds->clear();
// Otherwise drop only the trailing numberToDelete rounds, one at a time
2005 for (uint i = 0; i < numberToDelete; i++) {
2006 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2010 // Add the new compacted into the pending to send list
2011 pendingSendArbitrationRounds->add(lastRound);
2013 // Should reinsert into the commit processor
2014 if (hadCommit && gotNewCommit) {
2023 * Update all the commits and the committed tables, sets dead the dead
// Merges the commit parts collected in newCommitParts into liveCommitsTable,
// then walks every arbitrator's commits in sorted sequence-number order and
// applies them: the arbitration bookkeeping tables are advanced, keys that a
// commit overwrites are invalidated in the earlier commits that held them, and
// committedKeyValueTable / liveCommitsByKeyTable are pointed at the new values.
//
// Returns didProcessANewCommit, which is set to true when a commit reaches the
// full-processing path below.
//
// NOTE(review): parts of this function's control flow (closing braces and the
// exits of some branches) are not visible in this listing, so the inline
// comments describe only the statements that are shown.
2026 bool Table::updateCommittedTable() {
2028 if (newCommitParts->size() == 0) {
2029 // Nothing new to process
2033 // Iterate through all the machine Ids that we received new parts for
2034 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2035 while (partsit->hasNext()) {
2036 int64_t machineId = partsit->next();
2037 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2039 // Iterate through all the parts for that machine Id
2040 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2041 while (pairit->hasNext()) {
2042 Pair<int64_t, int32_t> *partId = pairit->next();
2043 CommitPart *part = parts->get(partId);
2045 // Get the table of live commits kept for the machine that produced this part
2046 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2048 if (commitForClientTable == NULL) {
2049 // This is the first commit from this device
2050 commitForClientTable = new Hashtable<int64_t, Commit *>();
2051 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
// Look up the commit this part belongs to by the part's sequence number
2054 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2056 if (commit == NULL) {
2057 // This is a new commit that we dont have so make a new one
2058 commit = new Commit();
2060 // Insert this new commit into the live tables
2061 commitForClientTable->put(part->getSequenceNumber(), commit);
2064 // Add that part to the commit
2065 commit->addPartDecode(part);
2071 // Clear all the new commits parts in preparation for the next time
2072 // the server sends slots
2073 newCommitParts->clear();
2075 // If we process a new commit keep track of it for future use
2076 bool didProcessANewCommit = false;
2078 // Process the commits one by one
2079 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2080 while (liveit->hasNext()) {
2081 int64_t arbitratorId = liveit->next();
2083 // Get all the commits for a specific arbitrator
2084 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2086 // Sort the commits in order
2087 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2089 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2090 while (clientit->hasNext())
2091 commitSequenceNumbers->add(clientit->next());
// Sort the collected sequence numbers with compareInt64 as the comparator
2095 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2097 // Get the last commit seen from this arbitrator
// (-1 is kept when the table has no entry for this arbitrator)
2098 int64_t lastCommitSeenSequenceNumber = -1;
2099 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2100 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2103 // Go through each new commit one by one
2104 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2105 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2106 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2108 // Special processing if a commit is not complete
2109 if (!commit->isComplete()) {
2110 if (i == (commitSequenceNumbers->size() - 1)) {
2111 // If there is an incomplete commit and this commit is the
2112 // latest one seen then this commit cannot be processed and
2113 // there are no other commits
2116 // This is a commit that was already dead but parts of it
2117 // are still in the block chain (not flushed out yet).
2118 // Delete it and move on
2120 commitForClientTable->remove(commit->getSequenceNumber());
2125 // Update the last transaction that was updated if we can
2126 if (commit->getTransactionSequenceNumber() != -1) {
2127 // Update the last transaction sequence number that the arbitrator arbitrated on
// (only ever moves forward: written when absent or when the stored value is smaller)
2128 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2129 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2133 // Update the last arbitration data that we have seen so far
2134 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2135 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2136 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2138 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2141 // Never seen any data from this arbitrator so record the first one
2142 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
// NOTE(review): the comment below probably means "no need to do the full
// processing" for a commit that was already seen — confirm against the
// branch exit, which is not visible in this listing.
2145 // We have already seen this commit before so need to do the
2146 // full processing on this commit
2147 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2149 // Update the last transaction that was updated if we can
2150 if (commit->getTransactionSequenceNumber() != -1) {
// NOTE(review): this value is read before the contains() check and is not
// used by any of the visible lines — verify whether it is needed.
2151 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2152 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2153 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2154 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2161 // If we got here then this is a brand new commit and needs full
2163 // Get what commits should be edited, these are the commits that
2164 // have live values for their keys
2165 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2167 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2168 while (kvit->hasNext()) {
2169 KeyValue *kv = kvit->next();
// NOTE(review): this local `commit` shadows the commit being processed by the
// enclosing loop; it is the commit currently recorded as owning this key.
2170 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2172 commitsToEdit->add(commit);
2177 // Update each previous commit that needs to be updated
2178 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2179 while (commitit->hasNext()) {
2180 Commit *previousCommit = commitit->next();
2182 // Only bother with live commits (TODO: Maybe remove this check)
2183 if (previousCommit->isLive()) {
2185 // Update which keys in the old commits are still live
// (each key written by the commit being processed is invalidated in previousCommit)
2187 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2188 while (kvit->hasNext()) {
2189 KeyValue *kv = kvit->next();
2190 previousCommit->invalidateKey(kv->getKey());
2195 // if the commit is now dead then remove it
2196 if (!previousCommit->isLive()) {
2197 commitForClientTable->remove(previousCommit->getSequenceNumber());
2203 // Update the last seen sequence number from this arbitrator
2204 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2205 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2206 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2209 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2212 // We processed a new commit that we havent seen before
2213 didProcessANewCommit = true;
2215 // Update the committed table of keys and which commit is using which key
2217 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2218 while (kvit->hasNext()) {
2219 KeyValue *kv = kvit->next();
2220 committedKeyValueTable->put(kv->getKey(), kv);
2221 liveCommitsByKeyTable->put(kv->getKey(), commit);
2229 return didProcessANewCommit;
2233 * Create the speculative table from transactions that are still live
2234 * and have come from the cloud
// Rebuilds or extends speculatedKeyValueTable from the transactions held in
// liveTransactionBySequenceNumberTable, visiting them in sorted
// sequence-number order and applying the key/value updates of each
// transaction whose guard evaluates to true.
//
// didProcessNewCommits: when true the speculation is restarted from scratch;
//   the same happens when the first sorted live transaction differs from
//   oldestTransactionSequenceNumberSpeculatedOn.
// Returns false when the computed start index is out of bounds ("did not
//   speculate"); the other return statements are not visible in this listing.
2236 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2237 if (liveTransactionBySequenceNumberTable->size() == 0) {
2238 // There is nothing to speculate on
2242 // Create a list of the transaction sequence numbers and sort them
2243 // from oldest to newest
2244 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2246 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2247 while (trit->hasNext())
2248 transactionSequenceNumbersSorted->add(trit->next());
2252 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
// A "gap" here means the first sorted live transaction is not the one that
// was at the front the last time speculation ran.
2254 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2257 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2258 // If there is a gap in the transaction sequence numbers then
2259 // there was a commit or an abort of a transaction OR there was a
2260 // new commit (Could be from offline commit) so a redo the
2261 // speculation from scratch
2263 // Start from scratch
2264 speculatedKeyValueTable->clear();
2265 lastTransactionSequenceNumberSpeculatedOn = -1;
2266 oldestTransactionSequenceNumberSpeculatedOn = -1;
2269 // Remember the front of the transaction list
2270 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2272 // Find where to start arbitration from
2273 uint startIndex = 0;
// NOTE(review): this scans for lastTransactionSequenceNumberSpeculatedOn,
// which is -1 after a reset; confirm that the lines not visible here handle
// the "not found" case so that a fresh speculation starts at index 0.
2275 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2276 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2280 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2281 // Make sure we are not out of bounds
2282 return false; // did not speculate
// Arbitrators that have at least one incomplete transaction in this pass
2285 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
// NOTE(review): didSkip starts out true; the lines that update or test it
// are not visible in this listing.
2286 bool didSkip = true;
2288 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2289 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2290 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2292 if (!transaction->isComplete()) {
2293 // If there is an incomplete transaction then there is nothing
2294 // we can do add this transactions arbitrator to the list of
2295 // arbitrators we should ignore
2296 incompleteTransactionArbitrator->add(transaction->getArbitrator());
// Transactions from an arbitrator already marked incomplete get their own branch
2301 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2305 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The guard is evaluated against the committed and speculated tables only
// (NULL is passed for the third table argument).
2307 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2308 // Guard evaluated to true so update the speculative table
2310 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2311 while (kvit->hasNext()) {
2312 KeyValue *kv = kvit->next();
2313 speculatedKeyValueTable->put(kv->getKey(), kv);
2321 // Since there was a skip we need to redo the speculation next time around
2322 lastTransactionSequenceNumberSpeculatedOn = -1;
2323 oldestTransactionSequenceNumberSpeculatedOn = -1;
2326 // We did some speculation
2331 * Create the pending transaction speculative table from transactions
2332 * that are still in the pending transaction buffer
// Recomputes pendingTransactionSpeculatedKeyValueTable from the transactions
// waiting in pendingTransactionQueue.
//
// didProcessNewCommitsOrSpeculate: when true, or when the head of the queue is
//   no longer firstPendingTransaction, the pending speculation state is reset
//   (table cleared, firstPendingTransaction re-read from the queue head)
//   before any transaction is evaluated.
2334 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2335 if (pendingTransactionQueue->size() == 0) {
2336 // There is nothing to speculate on
2340 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2341 // need to reset on the pending speculation
2342 lastPendingTransactionSpeculatedOn = NULL;
2343 firstPendingTransaction = pendingTransactionQueue->get(0);
2344 pendingTransactionSpeculatedKeyValueTable->clear();
2347 // Find where to start arbitration from
2348 uint startIndex = 0;
// Scan the queue for the position of firstPendingTransaction
2350 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2351 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2354 if (startIndex >= pendingTransactionQueue->size()) {
2355 // Make sure we are not out of bounds
2359 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2360 Transaction *transaction = pendingTransactionQueue->get(i);
// Remember the most recent pending transaction that was evaluated
2362 lastPendingTransactionSpeculatedOn = transaction;
// Unlike the cloud-side speculation, the guard here also sees the pending
// speculative table, so earlier pending transactions influence later ones.
2364 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2365 // Guard evaluated to true so update the speculative table
2366 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2367 while (kvit->hasNext()) {
2368 KeyValue *kv = kvit->next();
2369 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2377 * Set dead and remove from the live transaction tables the
2378 * transactions that are dead
// Retires transactions that their arbitrator has already decided on.
//
// First pass: every transaction in liveTransactionBySequenceNumberTable whose
// arbitrator has arbitrated a transaction number at or beyond the
// transaction's own sequence number is set dead and removed from
// liveTransactionByTransactionIdTable.
// Second pass: every entry of outstandingTransactionStatus that satisfies the
// same test is marked TransactionStatus_StatusCommitted.
//
// NOTE(review): some lines of this function (e.g. closing braces) are not
// visible in this listing; only the statements shown are documented.
2380 void Table::updateLiveTransactionsAndStatus() {
2381 // Go through each of the transactions
2383 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2384 while (iter->hasNext()) {
2385 int64_t key = iter->next();
2386 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2388 // Check if the transaction is dead
// The table is keyed by arbitrator id; the value looked up for that
// arbitrator is what gets compared with the transaction's sequence number,
// so the comparison has to stay outside the get() call.
2389 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber())) {
2390 // Set dead the transaction
2391 transaction->setDead();
2393 // Remove the transaction from the live table
2395 liveTransactionByTransactionIdTable->remove(transaction->getId());
2401 // Go through each of the transactions
2403 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2404 while (iter->hasNext()) {
2405 int64_t key = iter->next();
2406 TransactionStatus *status = outstandingTransactionStatus->get(key);
2408 // Check if the transaction is dead
2409 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
// The arbitrator has moved past this transaction, so report it as committed
2412 status->setStatus(TransactionStatus_StatusCommitted);
2423 * Process this slot, entry by entry. Also update the latest message sent by slot
// Processes one slot: first records the slot itself as the latest message of
// the machine that wrote it, then dispatches every entry in the slot to the
// processEntry overload matching the entry's type.
//
// indexer: forwarded to the RejectedMessage handler.
// slot: the slot whose entries are processed.
// acceptUpdatesToLocal: forwarded to updateLastMessage for the slot itself.
// machineSet: forwarded to updateLastMessage and to the LastMessage handler.
// Throws (as Error *) "Unrecognized type: " for an entry type with no handler.
//
// NOTE(review): some case labels and the break statements of this switch are
// not visible in this listing.
2425 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2427 // Update the last message seen
2428 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2430 // Process each entry in the slot
2431 Vector<Entry *> *entries = slot->getEntries();
2432 uint eSize = entries->size();
2433 for (uint ei = 0; ei < eSize; ei++) {
2434 Entry *entry = entries->get(ei);
// The cast in each case selects the processEntry overload for that entry type
2435 switch (entry->getType()) {
2436 case TypeCommitPart:
2437 processEntry((CommitPart *)entry);
2440 processEntry((Abort *)entry);
2442 case TypeTransactionPart:
2443 processEntry((TransactionPart *)entry);
2446 processEntry((NewKey *)entry);
2448 case TypeLastMessage:
2449 processEntry((LastMessage *)entry, machineSet);
2451 case TypeRejectedMessage:
2452 processEntry((RejectedMessage *)entry, indexer);
2454 case TypeTableStatus:
// The table status handler also needs the sequence number of the enclosing slot
2455 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2458 throw new Error("Unrecognized type: ");
2464 * Update the last message that was sent for a machine Id
// Handles a LastMessage entry by forwarding its machine id and sequence number
// to updateLastMessage, with the entry itself as the liveness object.
//
// entry: the LastMessage entry being processed.
// machineSet: forwarded unchanged to updateLastMessage.
2466 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2467 // Update what the last message received by a machine was
// acceptUpdatesToLocal is always passed as false on this path
2468 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2472 * Add the new key to the arbitrators table and update the set of live
2473 * new keys (in case of a rescued new key message)
// Handles a NewKey entry: records the entry's machine id in arbitratorTable
// under the entry's key, and makes this entry the live NewKey for that key.
//
// entry: the NewKey entry being processed.
2475 void Table::processEntry(NewKey *entry) {
2476 // Update the arbitrator table with the new key information
2477 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2479 // Update what the latest live new key is
// put() hands back the entry previously stored for this key, if there was one
2480 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2481 if (oldNewKey != NULL) {
2482 // Delete the old new key messages
2483 oldNewKey->setDead();
2488 * Process new table status entries and set dead the old ones as new
2489 * ones come in. Keeps track of the largest and smallest table status
2490 * seen in this current round of updating the local copy of the block
// Handles a TableStatus entry: feeds the entry's maximum slot count into
// updateCurrMaxSize and initExpectedSize, then makes this entry the live
// table status, setting the previous live one (if any) dead.
//
// entry: the TableStatus entry being processed.
// seq: sequence number of the slot containing the entry (see processSlot);
//   passed on to initExpectedSize.
2493 void Table::processEntry(TableStatus *entry, int64_t seq) {
2494 int newNumSlots = entry->getMaxSlots();
2495 updateCurrMaxSize(newNumSlots);
2496 initExpectedSize(seq, newNumSlots);
2498 if (liveTableStatus != NULL) {
2499 // We have a larger table status so the old table status is no
2501 liveTableStatus->setDead();
2504 // Make this new table status the latest alive table status
2505 liveTableStatus = entry;
2509 * Check old messages to see if there is a block chain violation.
// Handles a RejectedMessage entry in two steps:
//  1. checks the slots in the rejected range [oldSeqNum, newSeqNum] that the
//     indexer returns against the entry and throws on an inconsistency;
//  2. builds the set of machines whose last recorded message is older than
//     this entry and registers the entry on their watch lists.
//
// entry: the RejectedMessage entry being processed.
// indexer: used to look up slots by sequence number.
// Throws (as Error *) "Server Error: Trying to insert rejected message for
//   slot " when a slot contradicts the entry.
//
// NOTE(review): some lines (null check on the slot, loop exits, cleanup) are
// not visible in this listing.
2512 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2513 int64_t oldSeqNum = entry->getOldSeqNum();
2514 int64_t newSeqNum = entry->getNewSeqNum();
2515 bool isequal = entry->getEqual();
2516 int64_t machineId = entry->getMachineID();
2517 int64_t seq = entry->getSequenceNumber();
2519 // Check if we have messages that were supposed to be rejected in
2520 // our local block chain
2521 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2523 Slot *slot = indexer->getSlot(seqNum);
2526 // If we have this slot make sure that it was not supposed to be
2528 int64_t slotMachineId = slot->getMachineID();
// The entry's "equal" flag must agree with whether the slot was written by
// the machine named in the entry; any disagreement is treated as a server error.
2529 if (isequal != (slotMachineId == machineId)) {
2530 throw new Error("Server Error: Trying to insert rejected message for slot ");
2535 // Create a list of clients to watch until they see this rejected
2537 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2538 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2539 while (iter->hasNext()) {
2540 // Machine ID for the last message entry
2541 int64_t lastMessageEntryMachineId = iter->next();
2543 // We've seen it, don't need to continue to watch. Our next
2544 // message will implicitly acknowledge it.
2545 if (lastMessageEntryMachineId == localMachineId) {
2549 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
// First element of the pair is the sequence number of that machine's last message
2550 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2552 if (entrySequenceNumber < seq) {
2553 // Add this rejected message to the set of messages that this
2554 // machine ID did not see yet
2555 addWatchVector(lastMessageEntryMachineId, entry);
2556 // This client did not see this rejected message yet so add it
2557 // to the watch set to monitor
2558 deviceWatchSet->add(lastMessageEntryMachineId);
2563 if (deviceWatchSet->isEmpty()) {
2564 // This rejected message has been seen by all the clients so
2567 // We need to watch this rejected message
2568 entry->setWatchSet(deviceWatchSet);
2573 * Check if this abort is live, if not then save it so we can kill it
2574 * later. Update the last transaction number that was arbitrated on.
// Handles an Abort entry:
//  - marks the matching outstanding transaction status (if any) as aborted;
//  - records the abort in liveAbortTable (and in liveAbortsGeneratedByLocal
//    when the local machine was the arbitrator), replacing an older copy;
//  - drops it again from those tables when the target machine's last message
//    already covers the abort's sequence number;
//  - advances the per-arbitrator bookkeeping tables;
//  - removes the aborted transaction from the live transaction tables.
//
// entry: the Abort entry being processed.
//
// NOTE(review): some lines (closing braces, branch exits) are not visible in
// this listing.
2576 void Table::processEntry(Abort *entry) {
2577 if (entry->getTransactionSequenceNumber() != -1) {
2578 // update the transaction status if it was sent to the server
2579 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2580 if (status != NULL) {
2581 status->setStatus(TransactionStatus_StatusAborted);
2585 // Abort has not been seen by the client it is for yet so we need to
// NOTE(review): the key Pair is heap-allocated on every call; confirm who
// owns it when an entry with an equal key was already present.
2588 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2589 if (previouslySeenAbort != NULL) {
2590 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2593 if (entry->getTransactionArbitrator() == localMachineId) {
2594 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get(...) is dereferenced
// without a NULL check — verify that an entry always exists for the
// transaction's machine at this point.
2597 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2598 // The machine already saw this so it is dead
2600 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2601 liveAbortTable->remove(&abortid);
2603 if (entry->getTransactionArbitrator() == localMachineId) {
2604 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2609 // Update the last arbitration data that we have seen so far
2610 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2611 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2612 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2614 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2617 // Never seen any data from this arbitrator so record the first one
2618 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2621 // Set dead a transaction if we can
// The live transaction is identified by (machine id, client-local sequence number)
2622 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2624 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2625 if (transactionToSetDead != NULL) {
2626 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2629 // Update the last transaction sequence number that the arbitrator
// (written only when absent or smaller, and never with the -1 sentinel)
2631 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2632 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2634 if (entry->getTransactionSequenceNumber() != -1) {
2635 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2641 * Set dead the transaction part if that transaction is dead and keep
2642 * track of all new parts
// Handles a TransactionPart entry. A part whose arbitrator has already
// arbitrated a transaction number at or beyond the part's sequence number is
// treated as belonging to a dead transaction; any other part is stored in
// newTransactionParts under its machine id and part id, and an older copy of
// the same part is set dead.
//
// entry: the TransactionPart entry being processed.
//
// NOTE(review): the body of the "dead" branch is not visible in this listing.
2644 void Table::processEntry(TransactionPart *entry) {
2645 // Check if we have already seen this transaction and set it dead OR
2646 // if it is not alive
2647 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2648 // This transaction is dead, it was already committed or aborted
2653 // This part is still alive
2654 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2656 if (transactionPart == NULL) {
2657 // Dont have a table for this machine Id yet so make one
2658 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2659 newTransactionParts->put(entry->getMachineId(), transactionPart);
2662 // Update the part and set dead ones we have already seen (got a
// The table key is a freshly allocated copy of the entry's part id
2664 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2665 if (previouslySeenPart != NULL) {
2666 previouslySeenPart->setDead();
2671 * Process new commit entries and save them for future use. Delete duplicates
// Handles a CommitPart entry: advances the last arbitrated transaction number
// recorded for the part's machine, then stores the part in newCommitParts
// under its machine id and part id, setting dead any older copy of the same
// part.
//
// entry: the CommitPart entry being processed.
//
// NOTE(review): some lines of this function (e.g. closing braces) are not
// visible in this listing; only the statements shown are documented.
2673 void Table::processEntry(CommitPart *entry) {
2674 // Update the last transaction that was updated if we can
2675 if (entry->getTransactionSequenceNumber() != -1) {
// Only move the recorded number forward: write when the machine has no
// entry yet, or when the stored number is smaller than this part's. Both
// the contains() and the get() lookups must be keyed by the machine id alone.
2676 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2677 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2681 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2682 if (commitPart == NULL) {
2683 // Don't have a table for this machine Id yet so make one
2684 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2685 newCommitParts->put(entry->getMachineId(), commitPart);
2687 // Update the part and set dead ones we have already seen (got a
// The table key is a freshly allocated copy of the entry's part id
2689 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2690 if (previouslySeenPart != NULL) {
2691 previouslySeenPart->setDead();
2696 * Update the last message seen table. Update and set dead the
2697 * appropriate RejectedMessages as clients see them. Updates the live
2698 * aborts, removes those that are dead and sets them dead. Check that
2699 * the last message seen is correct and that there is no mismatch of
2700 * our own last message or that other clients have not had a rollback
2701 * on the last message.
// Records that machine `machineId` is now known at sequence number `seqNum`
// and performs the follow-up bookkeeping: acknowledges watched rejected
// messages, retires aborts the machine has now seen, replaces the machine's
// entry in lastMessageTable, and checks the new sequence number against the
// previous one.
//
// machineId: machine the message belongs to; removed from machineSet.
// seqNum: sequence number carried by the message.
// liveness: the Slot or LastMessage carrying the information; stored in
//   lastMessageTable together with seqNum.
// acceptUpdatesToLocal: when true, a sequence-number mismatch for the local
//   machine is not reported as a server error.
// machineSet: set from which machineId is removed.
// Throws (as Error *): "Unrecognized type" for a liveness object that is
//   neither a LastMessage nor a Slot; "Server Error: ..." when the local
//   sequence number mismatches or a remote machine's sequence number goes
//   backwards.
//
// NOTE(review): some lines (closing braces, else branches, removals inside
// the loops) are not visible in this listing.
2703 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2704 // We have seen this machine ID
2705 machineSet->remove(machineId);
2707 // Get the set of rejected messages that this machine Id is has not seen yet
2708 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2709 // If there is a rejected message that this machine Id has not seen yet
2710 if (watchset != NULL) {
2711 // Go through each rejected message that this machine Id has not
2714 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2715 while (rmit->hasNext()) {
2716 RejectedMessage *rm = rmit->next();
2717 // If this machine Id has seen this rejected message...
2718 if (rm->getSequenceNumber() <= seqNum) {
2719 // Remove it from our watchlist
2721 // Decrement machines that need to see this notification
2722 rm->removeWatcher(machineId);
2728 // Set dead the abort
2729 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2731 while (abortit->hasNext()) {
2732 Pair<int64_t, int64_t> *key = abortit->next();
2733 Abort *abort = liveAbortTable->get(key);
// An abort addressed to this machine is covered once the machine's sequence
// number has reached the abort's sequence number
2734 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2737 if (abort->getTransactionArbitrator() == localMachineId) {
2738 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2743 if (machineId == localMachineId) {
2744 // Our own messages are immediately dead.
2745 char livenessType = liveness->getType();
2746 if (livenessType == TypeLastMessage) {
2747 ((LastMessage *)liveness)->setDead();
2748 } else if (livenessType == TypeSlot) {
2749 ((Slot *)liveness)->setDead();
2751 throw new Error("Unrecognized type");
2754 // Get the old last message for this device
// put() stores the new (seqNum, liveness) pair and hands back the previous one
2755 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2756 if (lastMessageEntry == NULL) {
2757 // If no last message then there is nothing else to process
2761 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2762 Liveness *lastEntry = lastMessageEntry->getSecond();
// The replaced pair is no longer referenced by the table, so it is freed here
2763 delete lastMessageEntry;
2765 // If it is not our machine Id since we already set ours to dead
2766 if (machineId != localMachineId) {
2767 char lastEntryType = lastEntry->getType();
2769 if (lastEntryType == TypeLastMessage) {
2770 ((LastMessage *)lastEntry)->setDead();
2771 } else if (lastEntryType == TypeSlot) {
2772 ((Slot *)lastEntry)->setDead();
2774 throw new Error("Unrecognized type");
2777 // Make sure the server is not playing any games
2778 if (machineId == localMachineId) {
2779 if (hadPartialSendToServer) {
2780 // We were not making any updates and we had a machine mismatch
// After a partial send only a sequence number going backwards is an error
2781 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2782 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2785 // We were not making any updates and we had a machine mismatch
// Otherwise the local sequence number has to match exactly
2786 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2787 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2791 if (lastMessageSeqNum > seqNum) {
2792 throw new Error("Server Error: Rollback on remote machine sequence number");
2798 * Add a rejected message entry to the watch set to keep track of
2799 * which clients have seen that rejected message entry and which have
// Adds `entry` to the set of rejected messages recorded for `machineId` in
// rejectedMessageWatchVectorTable, creating that set on first use.
//
// machineId: machine whose watch set is extended.
// entry: the rejected message to add to the set.
2802 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2803 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2804 if (entries == NULL) {
2805 // There is no set for this machine ID yet so create one
2806 entries = new Hashset<RejectedMessage *>();
2807 rejectedMessageWatchVectorTable->put(machineId, entries);
2809 entries->add(entry);
2813 * Check if the HMAC chain is not violated
// Verifies that each new slot links to its predecessor: for every slot in
// newSlots, the slot with the preceding sequence number is looked up through
// the indexer and, when present, its HMAC must equal the new slot's
// previous-HMAC field.
//
// indexer: used to look up the predecessor slot by sequence number.
// newSlots: the slots to verify.
// Throws (as Error *) "Server Error: Invalid HMAC Chain" on the first mismatch.
2815 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2816 for (uint i = 0; i < newSlots->length(); i++) {
2817 Slot *currSlot = newSlots->get(i);
2818 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor (NULL) is not treated as a violation
2819 if (prevSlot != NULL &&
2820 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2821 throw new Error("Server Error: Invalid HMAC Chain");