3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Comparator with the signature expected by qsort(): both arguments are
// reinterpreted as pointers to int64_t. The comparison and the return
// statements are not visible in this excerpt.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructs a Table that creates its own CloudComm from the given base URL,
// password and listening port. The initializer list only sets defaults: the
// pointer members shown are NULL, flags are false, and the counters get their
// starting values. The constructor body is not visible in this excerpt.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
// The CloudComm is handed a back-pointer to this Table.
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
// Counters: slot and local-sequence numbering start at 1/0 as listed below.
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(1),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(1),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
// Container members start as NULL here; allocations assigned to these same
// member names appear later in this file.
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
// Constructs a Table around a CloudComm supplied by the caller. The
// initializer for the cloud member itself is not visible in this excerpt
// (presumably it stores _cloud -- confirm). All other members shown get the
// same defaults as in the URL/password constructor: NULL pointers, false
// flags and the starting counter values.
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
// Counters: slot and local-sequence numbering start at 1/0 as listed below.
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localSequenceNumber(0),
112 localTransactionSequenceNumber(1),
113 lastTransactionSequenceNumberSpeculatedOn(0),
114 oldestTransactionSequenceNumberSpeculatedOn(0),
115 localArbitrationSequenceNumber(1),
116 hadPartialSendToServer(false),
117 attemptedToSendToServer(false),
119 didFindTableStatus(false),
121 lastSlotAttemptedToSend(NULL),
124 lastTransactionPartsSent(NULL),
// Container members start as NULL here; allocations assigned to these same
// member names appear later in this file.
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
// Teardown sequence for the Table's containers. NOTE(review): the enclosing
// function header is not visible in this excerpt; this is presumably the body
// of Table's destructor -- confirm. Several loops below walk a container
// before deleting it; where the per-element cleanup statement is not visible
// the comment says so.
164 delete committedKeyValueTable;
165 delete speculatedKeyValueTable;
166 delete pendingTransactionSpeculatedKeyValueTable;
167 delete liveNewKeyTable;
// Visit every entry of lastMessageTable before deleting the table itself
// (what is done with each fetched pair is not visible here).
169 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170 while (lmit->hasNext()) {
171 Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
175 delete lastMessageTable;
// The explicit NULL check is redundant in C++ (delete on a null pointer is a
// no-op) but harmless.
177 if (pendingTransactionBuilder != NULL)
178 delete pendingTransactionBuilder;
// Walk each machine's set of rejected messages, then delete the outer table
// (the handling of each RejectedMessage and each set is not visible here).
180 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
181 while(rmit->hasNext()) {
182 int64_t machineid = rmit->next();
183 Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
184 SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185 while (mit->hasNext()) {
186 RejectedMessage * rm = mit->next();
193 delete rejectedMessageWatchVectorTable;
195 delete arbitratorTable;
196 delete liveAbortTable;
// newTransactionParts is a two-level table (machine id -> parts table). Each
// TransactionPart has releaseRef() called on it rather than being deleted
// directly.
198 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199 while (partsit->hasNext()) {
200 int64_t machineId = partsit->next();
201 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
202 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
203 while(pit->hasNext()) {
204 Pair<int64_t, int32_t> * pair=pit->next();
205 pit->currVal()->releaseRef();
212 delete newTransactionParts;
// Same two-level walk for newCommitParts, calling releaseRef() on each
// CommitPart.
215 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
216 while (partsit->hasNext()) {
217 int64_t machineId = partsit->next();
218 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
219 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
220 while(pit->hasNext()) {
221 Pair<int64_t, int32_t> * pair=pit->next();
222 pit->currVal()->releaseRef();
228 delete newCommitParts;
230 delete lastArbitratedTransactionNumberByArbitratorTable;
231 delete liveTransactionBySequenceNumberTable;
232 delete liveTransactionByTransactionIdTable;
// liveCommitsTable maps arbitrator id -> (id -> Commit). Every Commit is
// deleted, then each inner table, then the outer table.
234 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
235 while (liveit->hasNext()) {
236 int64_t arbitratorId = liveit->next();
238 // Get all the commits for a specific arbitrator
239 Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
241 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
242 while (clientit->hasNext()) {
243 int64_t id = clientit->next();
244 delete commitForClientTable->get(id);
249 delete commitForClientTable;
252 delete liveCommitsTable;
254 delete liveCommitsByKeyTable;
255 delete lastCommitSeenSequenceNumberByArbitratorTable;
256 delete rejectedSlotVector;
// Delete every Transaction still in the pending queue, then the queue.
258 uint size = pendingTransactionQueue->size();
259 for (uint iter = 0; iter < size; iter++) {
260 delete pendingTransactionQueue->get(iter);
262 delete pendingTransactionQueue;
264 delete pendingSendArbitrationEntriesToDelete;
// Only the Vector values of transactionPartsSent are deleted in this loop;
// the Transaction keys are not.
266 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
267 while (trit->hasNext()) {
268 Transaction *transaction = trit->next();
269 delete trit->currVal();
272 delete transactionPartsSent;
274 delete outstandingTransactionStatus;
275 delete liveAbortsGeneratedByLocal;
276 delete offlineTransactionsCommittedAndAtServer;
277 delete localCommunicationTable;
278 delete lastTransactionSeenFromMachineFromServer;
// Delete every ArbitrationRound still waiting to be sent, then the vector.
280 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
281 delete pendingSendArbitrationRounds->get(i);
283 delete pendingSendArbitrationRounds;
285 if (lastTransactionPartsSent != NULL)
286 delete lastTransactionPartsSent;
287 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
293 * Init all the stuff needed for table usage
// Allocates the helper objects and every container member used by the Table,
// then derives numberOfSlots from the slot buffer and computes the resize
// threshold. NOTE(review): the function header is not visible in this
// excerpt; presumably this is Table::init() -- confirm.
296 // Init helper objects
297 random = new SecureRandom();
298 buffer = new SlotBuffer();
// Tables keyed by IoTString use hashString/StringEquals; tables keyed by a
// Pair pointer use pairHashFunction/pairEquals.
301 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
302 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
303 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
304 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
305 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
306 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
307 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
308 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
309 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
310 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
311 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
312 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
313 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
314 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
315 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
316 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
317 rejectedSlotVector = new Vector<int64_t>();
318 pendingTransactionQueue = new Vector<Transaction *>();
319 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
320 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
321 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
322 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
323 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
324 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
325 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
326 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
327 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count comes from the freshly created buffer; setResizeThreshold()
// reads numberOfSlots and random, so it must run after both are set.
330 numberOfSlots = buffer->capacity();
331 setResizeThreshold();
335 * Initialize the table by inserting a table status as the first entry
336 * into the table. Also initialize the crypto stuff.
// Initializes security on the cloud connection, then builds slot number 1
// containing a TableStatus entry and pushes it with cloud->putSlot(). The
// result of putSlot() selects between the branches below; not every branch
// condition is visible in this excerpt. Throws a heap-allocated Error
// (thrown as a pointer) when initialization cannot be completed.
338 void Table::initTable() {
339 cloud->initSecurity();
341 // Create the first insertion into the block chain which is the table status
342 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
343 localSequenceNumber++;
344 TableStatus *status = new TableStatus(s, numberOfSlots);
345 s->addShallowEntry(status);
346 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// A fresh one-element array is built here (the guarding condition and the
// statement that fills it are not visible in this excerpt).
349 array = new Array<Slot *>(1);
351 // update local block chain
352 validateAndUpdate(array, true);
354 } else if (array->length() == 1) {
355 // in case we did push the slot BUT we failed to init it
356 validateAndUpdate(array, true);
// Callers must catch Error by pointer.
362 throw new Error("Error on initialization");
367 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the cloud, applies
// them with validateAndUpdate(..., true), and then refreshes live
// transactions and status.
370 void Table::rebuild() {
371 // Just pull the latest slots from the server
372 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
373 validateAndUpdate(newslots, true);
376 updateLiveTransactionsAndStatus();
// Records the host name and port to use for the given arbitrator id. A new
// Pair is heap-allocated for every call and stored in localCommunicationTable
// under that id. hostName is stored as passed, without acquiring a reference.
379 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
380 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns whatever arbitratorTable holds for the given key. No contains()
// check is made first, so the result for an unknown key depends on
// Hashtable::get -- confirm against its definition.
383 int64_t Table::getArbitrator(IoTString *key) {
384 return arbitratorTable->get(key);
// Closes the table. The body is not visible in this excerpt.
387 void Table::close() {
// Looks the key up in committedKeyValueTable and returns its value after
// acquireRef(), i.e. the caller receives its own reference. The handling of a
// missing key is not visible in this excerpt.
391 IoTString *Table::getCommitted(IoTString *key) {
392 KeyValue *kv = committedKeyValueTable->get(key);
395 return kv->getValue()->acquireRef();
// Looks the key up in three tables in this order:
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable,
// committedKeyValueTable. The value found is returned after acquireRef().
// The conditions that trigger each fallback, and the not-found result, are
// not visible in this excerpt.
401 IoTString *Table::getSpeculative(IoTString *key) {
402 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
405 kv = speculatedKeyValueTable->get(key);
409 kv = committedKeyValueTable->get(key);
413 return kv->getValue()->acquireRef();
// Reads the committed value for a key as part of the transaction being built.
// The key and the value read (or NULL) are added to pendingTransactionBuilder
// as a guard. Throws a heap-allocated Error (as a pointer) when the key has
// no arbitrator, or when its arbitrator fails
// pendingTransactionBuilder->checkArbitrator(). Dereferences
// pendingTransactionBuilder without a NULL check.
419 IoTString *Table::getCommittedAtomic(IoTString *key) {
420 KeyValue *kv = committedKeyValueTable->get(key);
422 if (!arbitratorTable->contains(key)) {
423 throw new Error("Key not Found.");
426 // Make sure new key value pair matches the current arbitrator
427 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
428 // TODO: Maybe not throw an error
429 throw new Error("Not all Key Values Match Arbitrator.");
// NOTE(review): the guard KeyValue receives key and value without
// acquireRef(), whereas put() acquires references on both -- confirm the
// intended ownership.
433 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
434 return kv->getValue()->acquireRef();
// Guard with a NULL value (the branch condition is not visible in this
// excerpt).
436 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Reads the speculative value for a key as part of the transaction being
// built. Lookup order is pendingTransactionSpeculatedKeyValueTable, then
// speculatedKeyValueTable, then committedKeyValueTable. The key and the value
// read (or NULL) are added to pendingTransactionBuilder as a guard. Throws a
// heap-allocated Error (as a pointer) when the key has no arbitrator, or when
// its arbitrator fails pendingTransactionBuilder->checkArbitrator().
441 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
442 if (!arbitratorTable->contains(key)) {
443 throw new Error("Key not Found.");
446 // Make sure new key value pair matches the current arbitrator
447 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
448 // TODO: Maybe not throw an error
449 throw new Error("Not all Key Values Match Arbitrator.");
// The conditions that trigger each fallback lookup are not visible in this
// excerpt.
452 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
455 kv = speculatedKeyValueTable->get(key);
459 kv = committedKeyValueTable->get(key);
// NOTE(review): the guard KeyValue receives key and value without
// acquireRef(), whereas put() acquires references on both -- confirm the
// intended ownership.
463 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
464 return kv->getValue()->acquireRef();
466 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls slots newer than sequenceNumber from the cloud, applies them with
// validateAndUpdate(..., false) and refreshes live transactions and status.
// If an Exception (thrown as a pointer) is caught, the handler walks every
// machine id in localCommunicationTable. What is done per machine is not
// visible in this excerpt (presumably a local update attempt -- confirm).
// The return values are not visible either.
471 bool Table::update() {
473 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
474 validateAndUpdate(newSlots, false);
477 updateLiveTransactionsAndStatus();
479 } catch (Exception *e) {
480 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
481 while (kit->hasNext()) {
482 int64_t m = kit->next();
// Tries to register keyName with machineId as its arbitrator. When
// arbitratorTable already contains the key, the early branch is taken.
// Otherwise a NewKey entry is built and handed to sendToServer(). The values
// returned on each path are not visible in this excerpt.
491 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
493 if (arbitratorTable->contains(keyName)) {
494 // There is already an arbitrator
497 NewKey *newKey = new NewKey(NULL, keyName, machineId);
499 if (sendToServer(newKey)) {
500 // If successfully inserted
// Begins a new transaction: any existing pendingTransactionBuilder is deleted
// (discarding whatever it had accumulated) and replaced by a fresh
// PendingTransaction for localMachineId.
506 void Table::startTransaction() {
507 // Create a new transaction, invalidates any old pending transactions.
// The NULL check is redundant in C++ (delete on a null pointer is a no-op).
508 if (pendingTransactionBuilder != NULL)
509 delete pendingTransactionBuilder;
510 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the transaction being built. Throws a
// heap-allocated Error (as a pointer) when the key has no arbitrator, or when
// its arbitrator fails pendingTransactionBuilder->checkArbitrator(). The
// stored KeyValue holds its own references to key and value (acquireRef() is
// called on both). pendingTransactionBuilder is dereferenced without a NULL
// check.
513 void Table::put(IoTString *key, IoTString *value) {
514 // Make sure it is a valid key
515 if (!arbitratorTable->contains(key)) {
516 throw new Error("Key not Found.");
519 // Make sure new key value pair matches the current arbitrator
520 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
521 // TODO: Maybe not throw an error
522 throw new Error("Not all Key Values Match Arbitrator.");
525 // Add the key value to this transaction
526 KeyValue *kv = new KeyValue(key->acquireRef(), value->acquireRef());
527 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction held by pendingTransactionBuilder and returns its
// TransactionStatus.
//  - No key/value updates: returns a new StatusNoEffect status immediately.
//  - Otherwise the transaction gets the next local transaction sequence
//    number and a StatusPending status. It is queued when its arbitrator is
//    another machine; the local arbitration path is also shown below.
//  - The builder is then replaced by a fresh PendingTransaction.
//  - In the ServerException handler the pending queue is walked, each
//    transaction is offered to sendTransactionToLocal(), and the queue is
//    compacted in place.
// Several branch and try lines are not visible in this excerpt, so the exact
// nesting of the statements below cannot be confirmed from here.
530 TransactionStatus *Table::commitTransaction() {
531 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
532 // transaction with no updates will have no effect on the system
533 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
536 // Set the local transaction sequence number and increment
537 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
538 localTransactionSequenceNumber++;
540 // Create the transaction status
541 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
543 // Create the new transaction
544 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
545 newTransaction->setTransactionStatus(transactionStatus);
547 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
548 // Add it to the queue and invalidate the builder for safety
549 pendingTransactionQueue->add(newTransaction);
// Local arbitration path: the transaction object is deleted right after
// arbitration, while transactionStatus is still returned at the end.
551 arbitrateOnLocalTransaction(newTransaction);
552 delete newTransaction;
553 updateLiveStateFromLocal();
555 if (pendingTransactionBuilder != NULL)
556 delete pendingTransactionBuilder;
558 pendingTransactionBuilder = new PendingTransaction(localMachineId);
562 } catch (ServerException *e) {
564 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
565 uint size = pendingTransactionQueue->size();
// In-place compaction: each visited transaction is first copied to slot
// oldindex (declared on a line not visible here), and the queue is truncated
// to oldindex after the loop.
567 for (uint iter = 0; iter < size; iter++) {
568 Transaction *transaction = pendingTransactionQueue->get(iter);
569 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
571 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
572 // Already contacted this client so ignore all attempts to contact this client
573 // to preserve ordering for arbitrator
577 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
579 if (sendReturn.getFirst()) {
580 // Failed to contact over local
581 arbitratorTriedAndFailed->add(transaction->getArbitrator());
583 // Successful contact or should not contact
585 if (sendReturn.getSecond()) {
592 pendingTransactionQueue->setSize(oldindex);
595 updateLiveStateFromLocal();
597 return transactionStatus;
601 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots. resizeLower is
// Table_RESIZE_THRESHOLD * numberOfSlots truncated to int; the threshold is
// resizeLower - 1 plus a random offset drawn with
// random->nextInt(numberOfSlots - resizeLower). Assumes nextInt(k) returns a
// value in [0, k) -- confirm against SecureRandom. Requires random and
// numberOfSlots to be initialized before the call.
603 void Table::setResizeThreshold() {
604 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
605 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
608 int64_t Table::getLocalSequenceNumber() {
609 return localSequenceNumber;
// Post-send bookkeeping for every transaction recorded in
// lastTransactionPartsSent. For each one it clears the server-failure flag,
// removes the parts listed as sent, registers the status in
// outstandingTransactionStatus and marks it StatusSentPartial.
//  - If all parts are now sent, the status becomes StatusSentFully and the
//    transaction is removed from pendingTransactionQueue.
//  - Otherwise, when handlePartial is true, a transaction that has not sent
//    any part to the server has its sequence number reset to -1.
// lastTransactionPartsSent is dereferenced without a NULL check.
612 void Table::processTransactionList(bool handlePartial) {
613 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
614 while (trit->hasNext()) {
615 Transaction *transaction = trit->next();
616 transaction->resetServerFailure();
617 // Update which transactions parts still need to be sent
618 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
619 // Add the transaction status to the outstanding list
620 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
622 // Update the transaction status
623 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
625 // Check if all the transaction parts were successfully
626 // sent and if so then remove it from pending
627 if (transaction->didSendAllParts()) {
628 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
629 pendingTransactionQueue->remove(transaction);
631 } else if (handlePartial) {
632 transaction->resetServerFailure();
633 // Set the transaction sequence number back to nothing
634 if (!transaction->didSendAPartToServer()) {
635 transaction->setSequenceNumber(-1);
// Recovery path for a previous send that was never acknowledged. It asks the
// cloud for slots newer than sequenceNumber.
//  - None returned: the last attempted slot is re-sent with the parameters
//    saved from that attempt (lastNewSize, lastIsNewKey).
//  - Some returned: checkSend() decides whether the last attempted slot is
//    among them.
// In each outcome the transactions in lastTransactionPartsSent are either
// processed with processTransactionList() or have their failure flag and
// sequence number reset, and any new slots are applied with
// validateAndUpdate(). Several branch lines and all return statements are not
// visible in this excerpt, so the exact nesting cannot be confirmed from
// here.
// NOTE(review): the checks below compare the results of getKey() with ==. If
// getKey() returns IoTString *, that is pointer identity rather than string
// equality -- confirm this is intended.
642 NewKey * Table::handlePartialSend(NewKey * newKey) {
643 //Didn't receive acknowledgement for last send
644 //See if the server has received a newer slot
646 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
647 if (newSlots->length() == 0) {
648 //Retry sending old slot
649 bool wasInserted = false;
650 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
652 if (sendSlotsReturn) {
// The retry was accepted: the saved slot pointer is cleared (set to NULL,
// not deleted on the visible lines).
653 lastSlotAttemptedToSend = NULL;
654 if (newKey != NULL) {
655 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
660 processTransactionList(false);
662 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
663 if (newKey != NULL) {
664 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
669 processTransactionList(true);
// Roll back per-transaction send state: clear the failure flag and, when no
// part reached the server, reset the sequence number to -1.
673 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
674 while (trit->hasNext()) {
675 Transaction *transaction = trit->next();
676 transaction->resetServerFailure();
677 // Set the transaction sequence number back to nothing
678 if (!transaction->didSendAPartToServer()) {
679 transaction->setSequenceNumber(-1);
684 if (newSlots->length() != 0) {
685 // insert into the local block chain
686 validateAndUpdate(newSlots, true);
689 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
690 if (newKey != NULL) {
691 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
697 processTransactionList(true);
// Same rollback as above for this path.
699 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
700 while (trit->hasNext()) {
701 Transaction *transaction = trit->next();
702 transaction->resetServerFailure();
703 // Set the transaction sequence number back to nothing
704 if (!transaction->didSendAPartToServer()) {
705 transaction->setSequenceNumber(-1);
711 // insert into the local block chain
712 validateAndUpdate(newSlots, true);
// Resets the per-send bookkeeping: empties
// pendingSendArbitrationEntriesToDelete, deletes every Vector stored as a
// value in transactionPartsSent, then clears that table. The Transaction keys
// are not deleted.
718 void Table::clearSentParts() {
719 // Clear the sent data since we are trying again
720 pendingSendArbitrationEntriesToDelete->clear();
721 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
722 while (trit->hasNext()) {
723 Transaction *transaction = trit->next();
724 delete trit->currVal();
727 transactionPartsSent->clear();
// Sends pending data to the server: queued transactions, pending arbitration
// rounds and, optionally, a new key. Returns true when newKey is NULL at the
// end (see the final return).
// Outline of the visible code:
//  1. If the previous send was only partial, handlePartialSend() runs first
//     and may replace newKey.
//  2. While anything remains to send, a new Slot is built for
//     sequenceNumber + 1 and filled via fillSlot(). On the resize path the
//     per-transaction send state is reset and fillSlot() is called again
//     with true as its second argument.
//  3. The slot and its parameters are saved in the last* members so a later
//     partial-send recovery can retry, then sendSlotsToServer() is called.
//  4. On success the finished ArbitrationRounds are removed and
//     processTransactionList(false) runs; on the other branch the
//     per-transaction send state is reset. Any slots returned by the server
//     are applied with validateAndUpdate().
//  5. In the ServerException handler, transactions are reset, and
//     hadPartialSendToServer is set on the partial-send path.
// Many branch, try and else lines are not visible in this excerpt, so the
// exact nesting of these steps cannot be confirmed from here.
730 bool Table::sendToServer(NewKey *newKey) {
731 if (hadPartialSendToServer) {
732 newKey = handlePartialSend(newKey);
736 // While we have stuff that needs inserting into the block chain
737 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
738 if (hadPartialSendToServer) {
// Thrown as a heap-allocated pointer; callers must catch Error by pointer.
739 throw new Error("Should Be error free");
742 // If there is a new key with same name then end
743 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot is constructed with an Array<char> built from the HMAC of the
// slot currently stored at sequenceNumber, and it consumes one local sequence
// number.
749 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
750 localSequenceNumber++;
752 // Try to fill the slot with data
754 bool insertedNewKey = false;
755 bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
758 // Reset which transaction to send
759 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
760 while (trit->hasNext()) {
761 Transaction *transaction = trit->next();
762 transaction->resetNextPartToSend();
764 // Set the transaction sequence number back to nothing
765 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
766 transaction->setSequenceNumber(-1);
771 // Clear the sent data since we are trying again
774 // We needed a resize so try again
775 fillSlot(slot, true, newKey, newSize, insertedNewKey);
// Remember this attempt. The previously saved slot, if any, is deleted before
// being replaced.
777 if (lastSlotAttemptedToSend != NULL)
778 delete lastSlotAttemptedToSend;
780 lastSlotAttemptedToSend = slot;
781 lastIsNewKey = (newKey != NULL);
782 lastInsertedNewKey = insertedNewKey;
783 lastNewSize = newSize;
// The statement guarded by this condition is not visible in this excerpt.
784 if (( newKey != lastNewKey) && (lastNewKey != NULL))
// Snapshot of what this attempt tried to send, taken with clone().
787 if (lastTransactionPartsSent != NULL)
788 delete lastTransactionPartsSent;
789 lastTransactionPartsSent = transactionPartsSent->clone();
791 Array<Slot *> * newSlots = NULL;
792 bool wasInserted = false;
793 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
795 if (sendSlotsReturn) {
796 lastSlotAttemptedToSend = NULL;
797 // Did insert into the block chain
798 if (insertedNewKey) {
799 // This slot was what was inserted not a previous slot
800 // New Key was successfully inserted into the block chain so dont want to insert it again
804 // Remove the aborts and commit parts that were sent from the pending to send queue
805 uint size = pendingSendArbitrationRounds->size();
// In-place compaction: rounds that still have data to send are moved to slot
// oldcount (declared on a line not visible here) and the vector is truncated
// to oldcount after the loop. The branch that leads to the delete below is
// not visible.
807 for (uint i = 0; i < size; i++) {
808 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
809 round->removeParts(pendingSendArbitrationEntriesToDelete);
811 if (!round->isDoneSending()) {
813 pendingSendArbitrationRounds->set(oldcount++,
814 pendingSendArbitrationRounds->get(i));
816 delete pendingSendArbitrationRounds->get(i);
818 pendingSendArbitrationRounds->setSize(oldcount);
819 processTransactionList(false);
821 // Reset which transaction to send
822 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
823 while (trit->hasNext()) {
824 Transaction *transaction = trit->next();
825 transaction->resetNextPartToSend();
827 // Set the transaction sequence number back to nothing
828 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
829 transaction->setSequenceNumber(-1);
835 // Clear the sent data in preparation for next send
838 if (newSlots->length() != 0) {
839 // insert into the local block chain
840 validateAndUpdate(newSlots, true);
844 } catch (ServerException *e) {
845 if (e->getType() != ServerException_TypeInputTimeout) {
846 // Nothing was able to be sent to the server so just clear these data structures
847 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
848 while (trit->hasNext()) {
849 Transaction *transaction = trit->next();
850 transaction->resetNextPartToSend();
852 // Set the transaction sequence number back to nothing
853 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
854 transaction->setSequenceNumber(-1);
859 // There was a partial send to the server
860 hadPartialSendToServer = true;
// On this path each transaction is flagged with setServerFailure(), so the
// sequence-number resets above skip it.
862 // Nothing was able to be sent to the server so just clear these data structures
863 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
864 while (trit->hasNext()) {
865 Transaction *transaction = trit->next();
866 transaction->resetNextPartToSend();
867 transaction->setServerFailure();
877 return newKey == NULL;
// Asks another machine directly (over the local channel) for arbitration data
// this table has not seen yet. The request buffer is sized for one int64_t
// plus one int32_t. Only the putLong() of the last-seen sequence number is
// visible here; the write of the remaining field is not. The reply is decoded
// as a count followed by typed entries (Abort or CommitPart), after which
// updateLiveStateFromLocal() is called. The return values are not visible in
// this excerpt.
880 bool Table::updateFromLocal(int64_t machineId) {
// No known local endpoint for this machine (the early exit itself is not
// visible in this excerpt).
881 if (!localCommunicationTable->contains(machineId))
884 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
886 // Get the size of the send data
887 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// -1 is sent when nothing has been recorded for this arbitrator yet.
889 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
890 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
891 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
894 Array<char> *sendData = new Array<char>(sendDataSize);
895 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
898 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// Each local request consumes one local sequence number.
902 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
903 localSequenceNumber++;
905 if (returnData == NULL) {
906 // Could not contact server
911 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
912 int numberOfEntries = bbDecode->getInt();
// Each entry starts with a one-byte type tag. The handling of a decoded Abort
// is not visible here; CommitParts go to processEntry().
914 for (int i = 0; i < numberOfEntries; i++) {
915 char type = bbDecode->get();
916 if (type == TypeAbort) {
917 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
919 } else if (type == TypeCommitPart) {
920 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
921 processEntry(commitPart);
925 updateLiveStateFromLocal();
// Sends a whole transaction directly to its arbitrator over the local channel
// and applies the arbitration result returned.
// Return value, as shown by the visible returns:
//  - (true, false)  when the arbitrator has no local endpoint or could not be
//                   contacted;
//  - (false, true)  at the end of the normal path.
// Request layout from the visible writes: the last-seen arbitration sequence
// number (long), the part count (int), then each encoded part. The buffer is
// sized as int32 + int64 + the sum of the part sizes.
// Reply layout from the visible reads: didCommit byte, couldArbitrate byte,
// entry count, then typed entries (Abort or CommitPart).
930 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
932 // Get the devices local communications
933 if (!localCommunicationTable->contains(transaction->getArbitrator()))
934 return Pair<bool, bool>(true, false);
936 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
938 // Get the size of the send data
939 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
941 Vector<TransactionPart *> *tParts = transaction->getParts();
942 uint tPartsSize = tParts->size();
943 for (uint i = 0; i < tPartsSize; i++) {
944 TransactionPart *part = tParts->get(i);
945 sendDataSize += part->getSize();
// -1 is sent when nothing has been recorded for this arbitrator yet.
949 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
950 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
951 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
954 // Make the send data size
955 Array<char> *sendData = new Array<char>(sendDataSize);
956 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
959 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
960 bbEncode->putInt(transaction->getParts()->size());
// tParts/tPartsSize are declared a second time here; presumably each
// declaration sits in its own brace scope on lines not visible in this
// excerpt.
962 Vector<TransactionPart *> *tParts = transaction->getParts();
963 uint tPartsSize = tParts->size();
964 for (uint i = 0; i < tPartsSize; i++) {
965 TransactionPart *part = tParts->get(i);
966 part->encode(bbEncode);
// Each local request consumes one local sequence number.
971 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
972 localSequenceNumber++;
974 if (returnData == NULL) {
975 // Could not contact server
976 return Pair<bool, bool>(true, false);
980 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
981 bool didCommit = bbDecode->get() == 1;
982 bool couldArbitrate = bbDecode->get() == 1;
983 int numberOfEntries = bbDecode->getInt();
984 bool foundAbort = false;
986 for (int i = 0; i < numberOfEntries; i++) {
987 char type = bbDecode->get();
988 if (type == TypeAbort) {
989 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// An abort matches this transaction when both the originating machine id and
// the client-local sequence number agree (the action taken on a match is not
// visible in this excerpt).
991 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
996 } else if (type == TypeCommitPart) {
997 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
998 processEntry(commitPart);
1002 updateLiveStateFromLocal();
// Final status: the conditions that choose between Committed and Aborted in
// each branch are not visible in this excerpt.
1004 if (couldArbitrate) {
1005 TransactionStatus *status = transaction->getTransactionStatus();
1007 status->setStatus(TransactionStatus_StatusCommitted);
1009 status->setStatus(TransactionStatus_StatusAborted);
1012 TransactionStatus *status = transaction->getTransactionStatus();
1014 status->setStatus(TransactionStatus_StatusAborted);
1016 status->setStatus(TransactionStatus_StatusCommitted);
1020 return Pair<bool, bool>(false, true);
// Handles an arbitration request supplied as a raw byte buffer and builds
// the reply buffer.
// Request layout, as decoded below: int64 lastArbitratedSequenceNumberSeen,
// int32 numberOfParts, followed by numberOfParts encoded TransactionParts.
// When parts are present they are assembled into a Transaction and handed to
// arbitrateOnLocalTransaction(). The reply then carries the entries gathered
// from liveAbortsGeneratedByLocal and from this machine's commits.
// NOTE(review): this listing omits short lines (closing braces, else
// branches, the final return); the comments describe only what is visible.
1023 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1025 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1026 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1027 int numberOfParts = bbDecode->getInt();
1029 // If we did commit a transaction or not
1030 bool didCommit = false;
1031 bool couldArbitrate = false;
1033 if (numberOfParts != 0) {
1035 // decode the transaction
1036 Transaction *transaction = new Transaction();
1037 for (int i = 0; i < numberOfParts; i++) {
1039 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1040 transaction->addPartDecode(newPart);
1043 // Arbitrate on transaction and pull relevant return data
// First element of the pair: whether arbitration could be performed;
// second element: whether the transaction was committed.
1044 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1045 couldArbitrate = localArbitrateReturn.getFirst();
1046 didCommit = localArbitrateReturn.getSecond();
1048 updateLiveStateFromLocal();
1050 // Transaction was sent to the server so keep track of it to prevent double commit
1051 if (transaction->getSequenceNumber() != -1) {
1052 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1056 // The data to send back
1057 int returnDataSize = 0;
1058 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1060 // Get the aborts to send back
1061 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1063 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1064 while (abortit->hasNext())
1065 abortLocalSequenceNumbers->add(abortit->next());
// Sort the abort keys with compareInt64 so they are visited in that order.
1069 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1071 uint asize = abortLocalSequenceNumbers->size();
1072 for (uint i = 0; i < asize; i++) {
1073 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// Body of this check is not shown in the listing; presumably it skips
// aborts the requester has already seen -- TODO confirm.
1074 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1078 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1079 unseenArbitrations->add(abort);
1080 returnDataSize += abort->getSize();
1083 // Get the commits to send back
1084 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1085 if (commitForClientTable != NULL) {
1086 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1088 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1089 while (commitit->hasNext())
1090 commitLocalSequenceNumbers->add(commitit->next());
1093 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1095 uint clsSize = commitLocalSequenceNumbers->size();
1096 for (uint clsi = 0; clsi < clsSize; clsi++) {
1097 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1098 Commit *commit = commitForClientTable->get(localSequenceNumber);
// Body not shown; presumably skips commits already seen -- TODO confirm.
1100 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// Every part of the commit is queued for the reply and counted in the size.
1105 Vector<CommitPart *> *parts = commit->getParts();
1106 uint nParts = parts->size();
1107 for (uint i = 0; i < nParts; i++) {
1108 CommitPart *commitPart = parts->get(i);
1109 unseenArbitrations->add(commitPart);
1110 returnDataSize += commitPart->getSize();
1116 // Number of arbitration entries to decode
// NOTE(review): space for two int32 values is reserved here, but only one
// putInt() is visible below -- confirm whether the extra bytes are intended.
1117 returnDataSize += 2 * sizeof(int32_t);
1119 // bool of did commit or not
1120 if (numberOfParts != 0) {
1121 returnDataSize += sizeof(char);
1124 // Data to send Back
1125 Array<char> *returnData = new Array<char>(returnDataSize);
1126 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1128 if (numberOfParts != 0) {
// The condition choosing between the next two puts is not shown in the
// listing; presumably it is didCommit -- TODO confirm.
1130 bbEncode->put((char)1);
1132 bbEncode->put((char)0);
1134 if (couldArbitrate) {
1135 bbEncode->put((char)1);
1137 bbEncode->put((char)0);
// Entry count followed by each encoded entry.
1141 bbEncode->putInt(unseenArbitrations->size());
1142 uint size = unseenArbitrations->size();
1143 for (uint i = 0; i < size; i++) {
1144 Entry *entry = unseenArbitrations->get(i);
1145 entry->encode(bbEncode);
// Presumably the Table member localSequenceNumber (the same-named locals
// above are declared inside the loops) -- TODO confirm.
1148 localSequenceNumber++;
1152 /** Checks whether a given slot was sent using new slots in
1153 array. Returns true if sent and false otherwise. */
// Two passes over `array` are visible:
//  1. look for a slot with checkSlot's sequence number whose machine id is
//     localMachineId;
//  2. look inside every slot for a LastMessage entry that names
//     localMachineId with checkSlot's sequence number.
// NOTE(review): the return statements are not shown in this listing;
// per the doc comment above, a match presumably returns true.
1155 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1156 uint size = array->length();
1157 for (uint i = 0; i < size; i++) {
1158 Slot *s = array->get(i);
1159 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1164 //Also need to see if other machines acknowledged our message
1165 for (uint i = 0; i < size; i++) {
1166 Slot *s = array->get(i);
1168 // Process each entry in the slot
1169 Vector<Entry *> *entries = s->getEntries();
1170 uint eSize = entries->size();
1171 for (uint ei = 0; ei < eSize; ei++) {
1172 Entry *entry = entries->get(ei);
// Only LastMessage entries are inspected; the cast relies on getType().
1174 if (entry->getType() == TypeLastMessage) {
1175 LastMessage *lastMessage = (LastMessage *)entry;
1177 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1187 /** Method tries to send slot to server. Returns status in tuple.
1188 isInserted returns whether last un-acked send (if any) was
1189 successful. Returns whether send was confirmed.
// Pushes `slot` to the server through cloud->putSlot() and reports the
// outcome through the out-parameters.
//   slot       - slot to send
//   newSize    - forwarded to cloud->putSlot()
//   isNewKey   - not referenced in the lines visible here
//   isInserted - out: set to false or to the result of checkSend()
//   array      - out: slots returned by putSlot(), or a one-element array
//                holding `slot` when putSlot() returned NULL
// Throws (as a heap-allocated Error) when putSlot() returns an empty array.
// NOTE(review): return statements and else lines are not shown in this
// listing, so the returned value per branch cannot be stated from here.
1192 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1193 attemptedToSendToServer = true;
1195 *array = cloud->putSlot(slot, newSize);
// NULL result: hand back just our own slot and forget earlier rejections.
1196 if (*array == NULL) {
1197 *array = new Array<Slot *>(1);
1198 (*array)->set(0, slot);
1199 rejectedSlotVector->clear();
1200 *isInserted = false;
1203 if ((*array)->length() == 0) {
1204 throw new Error("Server Error: Did not send any slots");
// After an earlier partial send, check the returned slots to see whether
// this slot did get through; if not, record it as rejected.
1207 if (hadPartialSendToServer) {
1208 *isInserted = checkSend(*array, slot);
1210 if (!(*isInserted)) {
1211 rejectedSlotVector->add(slot->getSequenceNumber());
// Presumably the branch without a prior partial send: the slot is recorded
// as rejected -- TODO confirm against the full source.
1216 rejectedSlotVector->add(slot->getSequenceNumber());
1217 *isInserted = false;
1224 * Returns true if a resize was needed but not done.
// Fills `slot` with everything that should go into the next send. Visible
// order: optional TableStatus for a resize, rejected-message entries,
// mandatory rescue, the new-key entry, pending arbitration data, the next
// part of the first pending transaction, and finally optional rescue.
//   slot        - slot being filled
//   resize      - request a resize; forced to true when liveSlotCount
//                 exceeds bufferResizeThreshold
//   newKeyEntry - optional NewKey entry to insert (may be NULL)
//   newSize     - out: 0 means no resize, otherwise
//                 numberOfSlots * Table_RESIZE_MULTIPLE
//   insertedKey - out: initialised to false before the new-key attempt
// NOTE(review): several lines (closing braces, returns, breaks) are not
// shown in this listing; comments below describe only the visible code.
1226 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1227 newSize = 0;//special value to indicate no resize
1228 if (liveSlotCount > bufferResizeThreshold) {
1229 resize = true;//Resize is forced
// Presumably guarded by a check of `resize` on a line not shown: announce
// the new size through a TableStatus entry.
1233 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1234 TableStatus *status = new TableStatus(slot, newSize);
1235 slot->addShallowEntry(status);
1238 // Fill with rejected slots first before doing anything else
1239 doRejectedMessages(slot);
1241 // Do mandatory rescue of entries
1242 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1244 // Extract working variables
1245 bool needsResize = mandatoryRescueReturn.getFirst();
1246 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1247 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1249 if (needsResize && !resize) {
1250 // We need to resize but we are not resizing so return true to force on retry
1254 insertedKey = false;
1255 if (newKeyEntry != NULL) {
1256 newKeyEntry->setSlot(slot);
1257 if (slot->hasSpace(newKeyEntry)) {
1258 slot->addEntry(newKeyEntry);
1263 // Clear the transactions, aborts and commits that were sent previously
// Walk every pending arbitration round and copy its parts into the slot
// while they fit.
1265 uint size = pendingSendArbitrationRounds->size();
1266 for (uint i = 0; i < size; i++) {
1267 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1268 bool isFull = false;
1269 round->generateParts();
1270 Vector<Entry *> *parts = round->getParts();
1272 // Insert pending arbitration data
1273 uint vsize = parts->size();
1274 for (uint vi = 0; vi < vsize; vi++) {
1275 Entry *arbitrationData = parts->get(vi);
1277 // If it is an abort then we need to set some information
1278 if (arbitrationData->getType() == TypeAbort) {
1279 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1282 if (!slot->hasSpace(arbitrationData)) {
1283 // No space so cant do anything else with these data entries
1288 // Add to this current slot and add it to entries to delete
1289 slot->addEntry(arbitrationData);
1290 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered.
1298 if (pendingTransactionQueue->size() > 0) {
1299 Transaction *transaction = pendingTransactionQueue->get(0);
1300 // Set the transaction sequence number if it has yet to be inserted into the block chain
1301 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1302 transaction->setSequenceNumber(slot->getSequenceNumber());
1306 TransactionPart *part = transaction->getNextPartToSend();
1308 // Ran out of parts to send for this transaction so move on
1312 if (slot->hasSpace(part)) {
1313 slot->addEntry(part);
// Record which part numbers of this transaction went into the slot.
1314 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1315 if (partsSent == NULL) {
1316 partsSent = new Vector<int32_t>();
1317 transactionPartsSent->put(transaction, partsSent);
1319 partsSent->add(part->getPartNumber());
1320 transactionPartsSent->put(transaction, partsSent);
1327 // Fill the remainder of the slot with rescue data
1328 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
// Adds RejectedMessage entries to slot `s` for the sequence numbers recorded
// in rejectedSlotVector. Does nothing when that vector is empty.
// NOTE(review): the declaration of `i`, several closing braces and any
// early exits are not shown in this listing.
1333 void Table::doRejectedMessages(Slot *s) {
1334 if (!rejectedSlotVector->isEmpty()) {
1335 /* TODO: We should avoid generating a rejected message entry if
1336 * there is already a sufficient entry in the queue (e.g.,
1337 * equalsto value of true and same sequence number). */
1339 int64_t old_seqn = rejectedSlotVector->get(0);
// Above the threshold: a single entry covering first..last rejected
// sequence number, attributed to localMachineId.
1340 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1341 int64_t new_seqn = rejectedSlotVector->lastElement();
1342 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1343 s->addShallowEntry(rm);
// Presumably the below-threshold branch starts here -- TODO confirm.
1345 int64_t prev_seqn = -1;
1347 /* Go through list of missing messages */
1348 for (; i < rejectedSlotVector->size(); i++) {
1349 int64_t curr_seqn = rejectedSlotVector->get(i);
1350 Slot *s_msg = buffer->getSlot(curr_seqn);
1353 prev_seqn = curr_seqn;
1355 /* Generate rejected message entry for missing messages */
1356 if (prev_seqn != -1) {
1357 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1358 s->addShallowEntry(rm);
1360 /* Generate rejected message entries for present messages */
// One entry per remaining sequence number, attributed to the machine id
// stored in the buffered slot.
1361 for (; i < rejectedSlotVector->size(); i++) {
1362 int64_t curr_seqn = rejectedSlotVector->get(i);
1363 Slot *s_msg = buffer->getSlot(curr_seqn);
1364 int64_t machineid = s_msg->getMachineID();
1365 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1366 s->addShallowEntry(rm);
// Copies live entries from the oldest buffered slots into `slot`, scanning
// from oldestLiveSlotSequenceNumver up to (but excluding) `threshold`.
//   slot   - slot receiving the rescued entries
//   resize - forwarded to Slot::getLiveEntries()
// Returns a ThreeTuple:
//   first  - true when an entry of the slot at `firstIfFull` did not fit
//   second - whether a live slot was encountered
//   third  - sequence number the scan stopped at
// NOTE(review): closing braces and the statement inside the !isLive()
// check are not shown in this listing.
1372 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1373 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1374 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning below the oldest sequence number still buffered.
1375 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1376 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1379 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1380 bool seenLiveSlot = false;
1381 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1382 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1386 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1387 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1388 // Push slot number forward
1389 if (!seenLiveSlot) {
1390 oldestLiveSlotSequenceNumver = currentSequenceNumber;
// Body not shown; presumably moves on to the next slot -- TODO confirm.
1393 if (!previousSlot->isLive()) {
1397 // We have seen a live slot
1398 seenLiveSlot = true;
1400 // Get all the live entries for a slot
1401 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1403 // Iterate over all the live entries and try to rescue them
1404 uint lESize = liveEntries->size();
1405 for (uint i = 0; i < lESize; i++) {
1406 Entry *liveEntry = liveEntries->get(i);
1407 if (slot->hasSpace(liveEntry)) {
1408 // Enough space to rescue the entry
1409 slot->addEntry(liveEntry);
1410 } else if (currentSequenceNumber == firstIfFull) {
1411 //if there's no space but the entry is about to fall off the queue
1412 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1418 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Best-effort continuation of the rescue: starting at `seqn`, copies live
// entries of buffered slots into `s` while they fit.
//   s            - slot receiving the entries
//   seenliveslot - whether a live slot was already encountered
//   seqn         - first sequence number to examine
//   resize       - forwarded to Slot::getLiveEntries()
// NOTE(review): the declaration and updates of `skipcount`, the guard in
// front of the oldestLiveSlotSequenceNumver update, and the loop exits are
// not shown in this listing.
1421 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1422 /* now go through live entries from least to greatest sequence number until
1423 * either all live slots added, or the slot doesn't have enough room
1424 * for SKIP_THRESHOLD consecutive entries*/
1426 int64_t newestseqnum = buffer->getNewestSeqNum();
1427 for (; seqn <= newestseqnum; seqn++) {
1428 Slot *prevslot = buffer->getSlot(seqn);
1429 //Push slot number forward
1431 oldestLiveSlotSequenceNumver = seqn;
1433 if (!prevslot->isLive())
1435 seenliveslot = true;
1436 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1437 uint lESize = liveentries->size();
1438 for (uint i = 0; i < lESize; i++) {
1439 Entry *liveentry = liveentries->get(i);
1440 if (s->hasSpace(liveentry))
1441 s->addEntry(liveentry);
// Gives up once more than Table_SKIP_THRESHOLD entries were skipped
// (the action taken is not shown here).
1444 if (skipcount > Table_SKIP_THRESHOLD) {
1457 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then commits
// them to the local buffer.
//   newSlots             - slots received; an empty array is handled first
//   acceptUpdatesToLocal - forwarded to processSlot()
// Throws (heap-allocated Error) when the first slot is not newer than
// sequenceNumber, or when machines remain unaccounted for after a gap.
// NOTE(review): closing braces, the early return for an empty array and
// the calls on lines missing from this listing are not shown.
1459 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1460 // The cloud communication layer has checked slot HMACs already
1462 if (newSlots->length() == 0) {
1466 // Make sure all slots are newer than the last largest slot this
1468 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1469 if (firstSeqNum <= sequenceNumber) {
1470 throw new Error("Server Error: Sent older slots!");
1473 // Create an object that can access both new slots and slots in our
1474 // local chain without committing slots to our local chain
1475 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1477 // Check that the HMAC chain is not broken
1478 checkHMACChain(indexer, newSlots);
1480 // Set to keep track of messages from clients
// Seeded with every key of lastMessageTable.
1481 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1483 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1484 while (lmit->hasNext())
1485 machineSet->add(lmit->next());
1489 // Process each slots data
1491 uint numSlots = newSlots->length();
1492 for (uint i = 0; i < numSlots; i++) {
1493 Slot *slot = newSlots->get(i);
1494 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1495 updateExpectedSize();
1500 // If there is a gap, check to see if the server sent us
1502 if (firstSeqNum != (sequenceNumber + 1)) {
1504 // Check the size of the slots that were sent down by the server.
1505 // Can only check the size if there was a gap
1506 checkNumSlots(newSlots->length());
1508 // Since there was a gap every machine must have pushed a slot or
1509 // must have a last message message. If not then the server is
1511 if (!machineSet->isEmpty()) {
1513 throw new Error("Missing record for machines: ");
1517 // Update the size of our local block chain.
1520 // Commit the new slots to the local block chain.
1522 uint numSlots = newSlots->length();
1523 for (uint i = 0; i < numSlots; i++) {
1524 Slot *slot = newSlots->get(i);
1526 // Insert this slot into our local block chain copy.
1527 buffer->putSlot(slot);
1529 // Keep track of how many slots are currently live (have live data
1534 // Get the sequence number of the latest slot in the system
1535 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1536 updateLiveStateFromServer();
1538 // No Need to remember after we pulled from the server
1539 offlineTransactionsCommittedAndAtServer->clear();
1541 // This is invalidated now
1542 hadPartialSendToServer = false;
// Refreshes the derived live state after slots arrived from the server.
// Steps run in this fixed order: process new transaction parts, arbitrate,
// update the committed table, prune live transactions and status, then
// refresh the two speculative tables.
1545 void Table::updateLiveStateFromServer() {
1546 // Process the new transaction parts
1547 processNewTransactionParts();
1549 // Do arbitration on new transactions that were received
1550 arbitrateFromServer();
1552 // Update all the committed keys
1553 bool didCommitOrSpeculate = updateCommittedTable();
1555 // Delete the transactions that are now dead
1556 updateLiveTransactionsAndStatus();
// The flag accumulates: it stays true once either step reported a change
// and is then passed on to the pending-transaction speculative update.
1559 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1560 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived live state after a local change. Same sequence as
// updateLiveStateFromServer() minus the first two steps: no new transaction
// parts are processed and no server-side arbitration is run.
1563 void Table::updateLiveStateFromLocal() {
1564 // Update all the committed keys
1565 bool didCommitOrSpeculate = updateCommittedTable();
1567 // Delete the transactions that are now dead
1568 updateLiveTransactionsAndStatus();
// The flag stays true once either step reported a change.
1571 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1572 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Records the table size announced by a table status entry.
//   firstSequenceNumber - used as the number of preceding slots
//   numberOfSlots       - announced maximum size; stored in currMaxSize
// expectedsize is set to the smaller of the two values.
// NOTE(review): lines between the didFindTableStatus check and the
// assignment are not shown in this listing, so it is unclear from here
// which branch the expectedsize assignment belongs to -- TODO confirm.
1575 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1576 int64_t prevslots = firstSequenceNumber;
1578 if (didFindTableStatus) {
1580 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1583 didFindTableStatus = true;
1584 currMaxSize = numberOfSlots;
// Keeps expectedsize from exceeding currMaxSize.
// NOTE(review): a statement before the check is not shown in this listing
// (presumably the one that grows expectedsize) -- TODO confirm.
1587 void Table::updateExpectedSize() {
1590 if (expectedsize > currMaxSize) {
1591 expectedsize = currMaxSize;
1597 * Check the size of the block chain to make sure there are enough
1598 * slots sent back by the server. This is only called when we have a
1599 * gap between the slots that we have locally and the slots sent by
1600 * the server therefore in the slots sent by the server there will be
1601 * at least 1 Table status message
// Verifies that the server sent exactly `expectedsize` slots.
//   numberOfSlots - number of slots actually received
// Throws (heap-allocated Error) on any mismatch.
// NOTE(review): the "->" inside the message text looks like the result of
// a "." to "->" search-and-replace; left untouched here because it is a
// runtime string -- confirm before changing it.
1603 void Table::checkNumSlots(int numberOfSlots) {
1604 if (numberOfSlots != expectedsize) {
1605 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1610 * Update the size of the local buffer if it is needed.
// Applies the size recorded in currMaxSize to the local slot buffer and
// clears didFindTableStatus for the next round.
1612 void Table::commitNewMaxSize() {
1613 didFindTableStatus = false;
1615 // Resize the local slot buffer
// The buffer is only touched when the size actually changed.
1616 if (numberOfSlots != currMaxSize) {
1617 buffer->resize((int32_t)currMaxSize);
1620 // Change the number of local slots to the new size
1621 numberOfSlots = (int32_t)currMaxSize;
1623 // Recalculate the resize threshold since the size of the local
1624 // buffer has changed
1625 setResizeThreshold();
1629 * Process the new transaction parts from this latest round of slots
1630 * received from the server
// Folds the transaction parts collected in newTransactionParts (keyed by
// machine id, then by part id) into the live transaction tables, then
// empties newTransactionParts.
// NOTE(review): closing braces, the early return, the action taken for
// already-arbitrated parts and the body of the cleanup loop are not shown
// in this listing.
1632 void Table::processNewTransactionParts() {
1634 if (newTransactionParts->size() == 0) {
1635 // Nothing new to process
1639 // Iterate through all the machine Ids that we received new parts
1641 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1642 while (tpit->hasNext()) {
1643 int64_t machineId = tpit->next();
1644 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
1646 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1647 // Iterate through all the parts for that machine Id
1648 while (ptit->hasNext()) {
1649 Pair<int64_t, int32_t> *partId = ptit->next();
1650 TransactionPart *part = parts->get(partId);
// A part whose sequence number is not above the last transaction number
// arbitrated by its arbitrator is treated as stale.
1652 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1653 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1654 if (lastTransactionNumber >= part->getSequenceNumber()) {
1655 // Set dead the transaction part
1662 // Get the transaction object for that sequence number
1663 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1665 if (transaction == NULL) {
1666 // This is a new transaction that we dont have so make a new one
1667 transaction = new Transaction();
1669 // Add that part to the transaction
1670 transaction->addPartDecode(part);
1672 // Insert this new transaction into the live tables
1673 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1674 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1681 // Clear all the new transaction parts in preparation for the next
1682 // time the server sends slots
1684 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1685 while (partsit->hasNext()) {
1686 int64_t machineId = partsit->next();
1687 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1691 newTransactionParts->clear();
// Arbitrates the live transactions for which this machine is the
// arbitrator, in ascending sequence-number order. Transactions whose guard
// holds are accumulated into one batch Commit; the others produce Aborts.
// The result is queued as an ArbitrationRound in
// pendingSendArbitrationRounds.
// NOTE(review): closing braces, else lines and the continue/break/return
// statements inside the checks are not shown in this listing.
1695 void Table::arbitrateFromServer() {
1696 if (liveTransactionBySequenceNumberTable->size() == 0) {
1697 // Nothing to arbitrate on so move on
1701 // Get the transaction sequence numbers and sort from oldest to newest
1702 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1704 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1705 while (trit->hasNext())
1706 transactionSequenceNumbers->add(trit->next());
1709 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1711 // Collection of key value pairs that are
// Scratch table holding the key/value updates of transactions accepted in
// this pass; it is also passed to evaluateGuard() for later transactions.
1712 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1714 // The last transaction arbitrated on
1715 int64_t lastTransactionCommitted = -1;
1716 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1717 uint tsnSize = transactionSequenceNumbers->size();
1718 for (uint i = 0; i < tsnSize; i++) {
1719 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1720 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1722 // Check if this machine arbitrates for this transaction if not
1723 // then we cant arbitrate this transaction
1724 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below lastSeqNumArbOn were handled in an earlier pass.
1728 if (transactionSequenceNumber < lastSeqNumArbOn) {
1732 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1733 // We have seen this already locally so dont commit again
1737 if (!transaction->isComplete()) {
1738 // Will arbitrate in incorrect order if we continue so just break
1743 // update the largest transaction seen by arbitrator from server
1744 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1745 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1747 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1748 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1749 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1753 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1754 // Guard evaluated as true
1755 // Update the local changes so we can make the commit
1756 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1757 while (kvit->hasNext()) {
1758 KeyValue *kv = kvit->next();
1759 speculativeTableTmp->put(kv->getKey(), kv);
1763 // Update what the last transaction committed was for use in batch commit
1764 lastTransactionCommitted = transactionSequenceNumber;
1766 // Guard evaluated was false so create abort
// Each abort consumes one localArbitrationSequenceNumber.
1768 Abort *newAbort = new Abort(NULL,
1769 transaction->getClientLocalSequenceNumber(),
1770 transaction->getSequenceNumber(),
1771 transaction->getMachineId(),
1772 transaction->getArbitrator(),
1773 localArbitrationSequenceNumber);
1774 localArbitrationSequenceNumber++;
1775 generatedAborts->add(newAbort);
1777 // Insert the abort so we can process
1778 processEntry(newAbort);
1781 lastSeqNumArbOn = transactionSequenceNumber;
1784 delete transactionSequenceNumbers;
1786 Commit *newCommit = NULL;
1788 // If there is something to commit
1789 if (speculativeTableTmp->size() != 0) {
1790 // Create the commit and increment the commit sequence number
1791 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1792 localArbitrationSequenceNumber++;
1794 // Add all the new keys to the commit
1795 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1796 while (spit->hasNext()) {
1797 IoTString *string = spit->next();
1798 KeyValue *kv = speculativeTableTmp->get(string);
1799 newCommit->addKV(kv);
1803 // create the commit parts
1804 newCommit->createCommitParts();
1806 // Append all the commit parts to the end of the pending queue
1807 // waiting for sending to the server
1808 // Insert the commit so we can process it
1809 Vector<CommitPart *> *parts = newCommit->getParts();
1810 uint partsSize = parts->size();
1811 for (uint i = 0; i < partsSize; i++) {
1812 CommitPart *commitPart = parts->get(i);
1813 processEntry(commitPart);
1816 delete speculativeTableTmp;
// Queue a round only when this pass produced a commit or at least one abort.
1818 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1819 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1820 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, the commit parts of the last queued round
// are processed again, guarded by a NULL check on its commit.
1822 if (compactArbitrationData()) {
1823 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1824 if (newArbitrationRound->getCommit() != NULL) {
1825 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1826 uint partsSize = parts->size();
1827 for (uint i = 0; i < partsSize; i++) {
1828 CommitPart *commitPart = parts->get(i);
1829 processEntry(commitPart);
// NOTE(review): generatedAborts was handed to the ArbitrationRound
// constructor above; which branch this delete sits in is not visible
// here -- confirm it cannot free a set the round still uses.
1834 delete generatedAborts;
// Arbitrates a single transaction immediately, without going through the
// server.
// Returns Pair(couldArbitrate, didCommit):
//   (false, false) - not the arbitrator, transaction incomplete, or (for a
//                    remote transaction) a newer one from that machine was
//                    already seen from the server
//   (true, true)   - guard held; a Commit was created and queued
//   (true, false)  - guard failed; an Abort was created and queued
// NOTE(review): closing braces and else lines are not shown in this
// listing; which statements sit in which branch is inferred from the
// visible comments and return statements.
1838 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1840 // Check if this machine arbitrates for this transaction if not then
1841 // we cant arbitrate this transaction
1842 if (transaction->getArbitrator() != localMachineId) {
1843 return Pair<bool, bool>(false, false);
1846 if (!transaction->isComplete()) {
1847 // Will arbitrate in incorrect order if we continue so just break
1849 return Pair<bool, bool>(false, false);
1852 if (transaction->getMachineId() != localMachineId) {
1853 // dont do this check for local transactions
1854 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1855 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1856 // We've have already seen this from the server
1857 return Pair<bool, bool>(false, false);
1862 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1863 // Guard evaluated as true Create the commit and increment the
1864 // commit sequence number
1865 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1866 localArbitrationSequenceNumber++;
1868 // Update the local changes so we can make the commit
1869 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1870 while (kvit->hasNext()) {
1871 KeyValue *kv = kvit->next();
1872 newCommit->addKV(kv);
1876 // create the commit parts
1877 newCommit->createCommitParts();
1879 // Append all the commit parts to the end of the pending queue
1880 // waiting for sending to the server
1881 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1882 pendingSendArbitrationRounds->add(arbitrationRound);
// If compaction reports true, process the parts of the last queued round's
// commit; the loop further down processes the parts of newCommit itself
// (presumably in the non-compacted branch -- TODO confirm).
1884 if (compactArbitrationData()) {
1885 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1886 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1887 uint partsSize = parts->size();
1888 for (uint i = 0; i < partsSize; i++) {
1889 CommitPart *commitPart = parts->get(i);
1890 processEntry(commitPart);
1893 // Insert the commit so we can process it
1894 Vector<CommitPart *> *parts = newCommit->getParts();
1895 uint partsSize = parts->size();
1896 for (uint i = 0; i < partsSize; i++) {
1897 CommitPart *commitPart = parts->get(i);
1898 processEntry(commitPart);
// The status object is only updated for transactions created on this
// machine, and only when one is attached.
1902 if (transaction->getMachineId() == localMachineId) {
1903 TransactionStatus *status = transaction->getTransactionStatus();
1904 if (status != NULL) {
1905 status->setStatus(TransactionStatus_StatusCommitted);
1909 updateLiveStateFromLocal();
1910 return Pair<bool, bool>(true, true);
1912 if (transaction->getMachineId() == localMachineId) {
1913 // For locally created messages update the status
1914 // Guard evaluated was false so create abort
1915 TransactionStatus *status = transaction->getTransactionStatus();
1916 if (status != NULL) {
1917 status->setStatus(TransactionStatus_StatusAborted);
1920 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
// One constructor argument sits on a line not shown in this listing.
1923 Abort *newAbort = new Abort(NULL,
1924 transaction->getClientLocalSequenceNumber(),
1926 transaction->getMachineId(),
1927 transaction->getArbitrator(),
1928 localArbitrationSequenceNumber);
1929 localArbitrationSequenceNumber++;
1930 addAbortSet->add(newAbort);
1932 // Append all the commit parts to the end of the pending queue
1933 // waiting for sending to the server
1934 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1935 pendingSendArbitrationRounds->add(arbitrationRound);
1937 if (compactArbitrationData()) {
1938 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): the round queued above was built with a NULL commit, and
// no NULL check on getCommit() is visible here (arbitrateFromServer()
// guards the equivalent call). Confirm compactArbitrationData() can only
// report true when the last round ends up holding a commit.
1940 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1941 uint partsSize = parts->size();
1942 for (uint i = 0; i < partsSize; i++) {
1943 CommitPart *commitPart = parts->get(i);
1944 processEntry(commitPart);
1949 updateLiveStateFromLocal();
1950 return Pair<bool, bool>(true, false);
1955 * Compacts the arbitration data by merging commits and aggregating
1956 * aborts so that a single large push of commits can be done instead
1957 * of many small updates
// Tries to merge the newest pending ArbitrationRound with the rounds queued
// before it, walking backwards from the end of
// pendingSendArbitrationRounds. Rounds that were merged are deleted and the
// combined round is re-appended.
// NOTE(review): the return statements, break statements, else lines and
// the increment of numberToDelete are not shown in this listing, so the
// exact return value per path cannot be stated from here.
1959 bool Table::compactArbitrationData() {
1960 if (pendingSendArbitrationRounds->size() < 2) {
1961 // Nothing to compact so do nothing
1965 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// Body not shown; a round that already sent a part is presumably left
// alone -- TODO confirm.
1966 if (lastRound->getDidSendPart()) {
// NOTE(review): despite its name, hadCommit is true when lastRound has NO
// commit (comparison is == NULL) -- confirm this is intended.
1970 bool hadCommit = (lastRound->getCommit() == NULL);
1971 bool gotNewCommit = false;
1973 uint numberToDelete = 1;
1975 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Candidate round: numberToDelete positions before the last one.
1976 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1978 if (round->isFull() || round->getDidSendPart()) {
1979 // Stop since there is a part that cannot be compacted and we
1980 // need to compact in order
1984 if (round->getCommit() == NULL) {
1985 // Try compacting aborts only
1986 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1987 if (newSize > ArbitrationRound_MAX_PARTS) {
1988 // Cant compact since it would be too large
1991 lastRound->addAborts(round->getAborts());
1993 // Create a new larger commit
// Merging consumes one localArbitrationSequenceNumber even if the merged
// commit later turns out to be too large.
1994 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1995 localArbitrationSequenceNumber++;
1997 // Create the commit parts so that we can count them
1998 newCommit->createCommitParts();
2000 // Calculate the new size of the parts
2001 int newSize = newCommit->getNumberOfParts();
2002 newSize += lastRound->getAbortsCount();
2003 newSize += round->getAbortsCount();
2005 if (newSize > ArbitrationRound_MAX_PARTS) {
2006 // Can't compact since it would be too large
// The pointer comparisons suggest Commit_merge may return one of its
// inputs; the statement guarded by this check is not shown here.
2007 if (lastRound->getCommit() != newCommit &&
2008 round->getCommit() != newCommit)
2012 // Set the new compacted part
// Detach newCommit from whichever round already points at it before it is
// installed on lastRound below.
2013 if (lastRound->getCommit() == newCommit)
2014 lastRound->setCommit(NULL);
2015 if (round->getCommit() == newCommit)
2016 round->setCommit(NULL);
2018 if (lastRound->getCommit() != NULL) {
2019 Commit * oldcommit = lastRound->getCommit();
2020 lastRound->setCommit(NULL);
2023 lastRound->setCommit(newCommit);
2024 lastRound->addAborts(round->getAborts());
2025 gotNewCommit = true;
2031 if (numberToDelete != 1) {
2032 // If there is a compaction
2033 // Delete the previous pieces that are now in the new compacted piece
// Starts at 2 so the last element (lastRound itself) is not deleted.
2034 for (uint i = 2; i <= numberToDelete; i++) {
2035 delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2037 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2039 pendingSendArbitrationRounds->add(lastRound);
2041 // Should reinsert into the commit processor
2042 if (hadCommit && gotNewCommit) {
2051 * Update all the commits and the committed tables, sets dead the dead
// Folds the commit parts collected in newCommitParts into liveCommitsTable,
// then walks every arbitrator's commits in ascending sequence-number order
// and applies the ones not seen before to committedKeyValueTable and
// liveCommitsByKeyTable.
// Returns didProcessANewCommit, which is set once a commit goes through the
// full processing path below.
2054 bool Table::updateCommittedTable() {
2055 if (newCommitParts->size() == 0) {
2056 // Nothing new to process
2060 // Iterate through all the machine Ids that we received new parts for
2061 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2062 while (partsit->hasNext()) {
2063 int64_t machineId = partsit->next();
2064 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2066 // Iterate through all the parts for that machine Id
2067 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2068 while (pairit->hasNext()) {
// NOTE(review): partId is not referenced again in the visible code; next()
// is still needed to advance the iterator before currVal() is read.
2069 Pair<int64_t, int32_t> *partId = pairit->next();
2070 CommitPart *part = pairit->currVal();
2072 // Get the table of live commits for the machine that produced this part
2073 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2075 if (commitForClientTable == NULL) {
2076 // This is the first commit from this device
2077 commitForClientTable = new Hashtable<int64_t, Commit *>();
2078 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2081 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2083 if (commit == NULL) {
2084 // This is a new commit that we dont have so make a new one
2085 commit = new Commit();
2087 // Insert this new commit into the live tables
2088 commitForClientTable->put(part->getSequenceNumber(), commit);
2091 // Add that part to the commit
2092 commit->addPartDecode(part);
2100 // Clear all the new commits parts in preparation for the next time
2101 // the server sends slots
2102 newCommitParts->clear();
2104 // If we process a new commit keep track of it for future use
2105 bool didProcessANewCommit = false;
2107 // Process the commits one by one
2108 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2109 while (liveit->hasNext()) {
2110 int64_t arbitratorId = liveit->next();
2111 // Get all the commits for a specific arbitrator
2112 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2114 // Sort the commits in order
2115 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2117 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2118 while (clientit->hasNext())
2119 commitSequenceNumbers->add(clientit->next());
// Sort the collected sequence numbers in place with the compareInt64 comparator
2123 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2125 // Get the last commit seen from this arbitrator
2126 int64_t lastCommitSeenSequenceNumber = -1;
2127 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2128 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2131 // Go through each new commit one by one
2132 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2133 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2134 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2135 // Special processing if a commit is not complete
2136 if (!commit->isComplete()) {
2137 if (i == (commitSequenceNumbers->size() - 1)) {
2138 // If there is an incomplete commit and this commit is the
2139 // latest one seen then this commit cannot be processed and
2140 // there are no other commits
2143 // This is a commit that was already dead but parts of it
2144 // are still in the block chain (not flushed out yet).
2145 // Delete it and move on
2147 commitForClientTable->remove(commit->getSequenceNumber());
2153 // Update the last transaction that was updated if we can
2154 if (commit->getTransactionSequenceNumber() != -1) {
2155 // Update the last transaction sequence number that the arbitrator arbitrated on
2156 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2157 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2161 // Update the last arbitration data that we have seen so far
2162 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2163 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2164 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2166 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2169 // Never seen any data from this arbitrator so record the first one
2170 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2173 // We have already seen this commit before so there is no need to do
2174 // the full processing on this commit
2175 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2176 // Update the last transaction that was updated if we can
2177 if (commit->getTransactionSequenceNumber() != -1) {
// NOTE(review): lastTransactionNumber is not used in the visible code, and
// this get() runs before the contains() check on the next line.
2178 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2179 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2180 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2181 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2187 // If we got here then this is a brand new commit and needs full
2189 // Get what commits should be edited, these are the commits that
2190 // have live values for their keys
2191 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2193 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2194 while (kvit->hasNext()) {
2195 KeyValue *kv = kvit->next();
// NOTE(review): this local shadows the outer 'commit' that is being processed
2196 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2198 commitsToEdit->add(commit);
2203 // Update each previous commit that needs to be updated
2204 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2205 while (commitit->hasNext()) {
2206 Commit *previousCommit = commitit->next();
2208 // Only bother with live commits (TODO: Maybe remove this check)
2209 if (previousCommit->isLive()) {
2211 // Update which keys in the old commits are still live
2213 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2214 while (kvit->hasNext()) {
2215 KeyValue *kv = kvit->next();
2216 previousCommit->invalidateKey(kv->getKey());
2221 // if the commit is now dead then remove it
2222 if (!previousCommit->isLive()) {
// NOTE(review): previousCommit came from liveCommitsByKeyTable but is removed
// from the current arbitrator's table; confirm it cannot belong to another
// arbitrator's table before it is deleted.
2223 commitForClientTable->remove(previousCommit->getSequenceNumber());
2224 delete previousCommit;
2229 delete commitsToEdit;
2231 // Update the last seen sequence number from this arbitrator
2232 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2233 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2234 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2237 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2240 // We processed a new commit that we havent seen before
2241 didProcessANewCommit = true;
2243 // Update the committed table of keys and which commit is using which key
2245 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2246 while (kvit->hasNext()) {
2247 KeyValue *kv = kvit->next();
2248 committedKeyValueTable->put(kv->getKey(), kv);
2249 liveCommitsByKeyTable->put(kv->getKey(), commit);
2254 delete commitSequenceNumbers;
2258 return didProcessANewCommit;
2262 * Create the speculative table from transactions that are still live
2263 * and have come from the cloud
// Rebuilds or extends speculatedKeyValueTable from the transactions held in
// liveTransactionBySequenceNumberTable, visiting them in ascending
// sequence-number order.
// didProcessNewCommits: when true (or when the oldest live transaction is not
// the one remembered from the previous run) the speculation restarts from
// scratch.
// Returns false on the visible "did not speculate" path; the other return
// statements are not visible in this listing.
2265 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2266 if (liveTransactionBySequenceNumberTable->size() == 0) {
2267 // There is nothing to speculate on
2271 // Create a list of the transaction sequence numbers and sort them
2272 // from oldest to newest
2273 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2275 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2276 while (trit->hasNext())
2277 transactionSequenceNumbersSorted->add(trit->next());
// Sort in place with the compareInt64 comparator
2281 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
// A different oldest sequence number than last time means the front of the
// live transaction list changed
2283 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2286 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2287 // If there is a gap in the transaction sequence numbers then
2288 // there was a commit or an abort of a transaction OR there was a
2289 // new commit (Could be from offline commit) so redo the
2290 // speculation from scratch
2292 // Start from scratch
2293 speculatedKeyValueTable->clear();
2294 lastTransactionSequenceNumberSpeculatedOn = -1;
2295 oldestTransactionSequenceNumberSpeculatedOn = -1;
2298 // Remember the front of the transaction list
2299 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2301 // Find where to start speculating from: locate the last transaction
2301 // that was speculated on in the sorted list
2302 uint startIndex = 0;
2304 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2305 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2309 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2310 // Make sure we are not out of bounds
2311 delete transactionSequenceNumbersSorted;
2312 return false; // did not speculate
// Arbitrators that have an incomplete transaction; their later transactions
// are checked against this set below
2315 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2316 bool didSkip = true;
2318 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2319 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2320 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2322 if (!transaction->isComplete()) {
2323 // If there is an incomplete transaction then there is nothing
2324 // we can do; add this transaction's arbitrator to the list of
2325 // arbitrators we should ignore
2326 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2331 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2335 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The guard is evaluated against the committed and speculated tables only
// (NULL is passed for the third table)
2337 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2338 // Guard evaluated to true so update the speculative table
2340 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2341 while (kvit->hasNext()) {
2342 KeyValue *kv = kvit->next();
2343 speculatedKeyValueTable->put(kv->getKey(), kv);
2350 delete transactionSequenceNumbersSorted;
2353 // Since there was a skip we need to redo the speculation next time around
2354 lastTransactionSequenceNumberSpeculatedOn = -1;
2355 oldestTransactionSequenceNumberSpeculatedOn = -1;
2358 // We did some speculation
2363 * Create the pending transaction speculative table from transactions
2364 * that are still in the pending transaction buffer
// Applies the transactions waiting in pendingTransactionQueue on top of the
// committed and speculated tables, writing the results into
// pendingTransactionSpeculatedKeyValueTable.
// didProcessNewCommitsOrSpeculate: when true (or when the head of the queue
// changed) the pending speculation state is reset before continuing.
2366 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2367 if (pendingTransactionQueue->size() == 0) {
2368 // There is nothing to speculate on
2372 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2373 // need to reset on the pending speculation
2374 lastPendingTransactionSpeculatedOn = NULL;
2375 firstPendingTransaction = pendingTransactionQueue->get(0);
2376 pendingTransactionSpeculatedKeyValueTable->clear();
2379 // Find where to start speculating from
// NOTE(review): the search key is firstPendingTransaction, not
// lastPendingTransactionSpeculatedOn - confirm this is the intended anchor.
2380 uint startIndex = 0;
2382 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2383 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2386 if (startIndex >= pendingTransactionQueue->size()) {
2387 // Make sure we are not out of bounds
2391 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2392 Transaction *transaction = pendingTransactionQueue->get(i);
2394 lastPendingTransactionSpeculatedOn = transaction;
// The guard sees all three tables: committed, speculated and pending-speculated
2396 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2397 // Guard evaluated to true so update the speculative table
2398 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2399 while (kvit->hasNext()) {
2400 KeyValue *kv = kvit->next();
2401 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2409 * Set dead and remove from the live transaction tables the
2410 * transactions that are dead
// Two passes over the bookkeeping tables. The first pass sets dead every live
// transaction whose arbitrator has already arbitrated at or past its sequence
// number and drops it from liveTransactionByTransactionIdTable. The second
// pass marks outstanding transaction statuses that satisfy the same test as
// TransactionStatus_StatusCommitted.
2412 void Table::updateLiveTransactionsAndStatus() {
2413 // Go through each of the transactions
2415 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2416 while (iter->hasNext()) {
2417 int64_t key = iter->next();
2418 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2420 // Check if the transaction is dead
2421 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2422 && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2423 // Set dead the transaction
2424 transaction->setDead();
2426 // Remove the transaction from the live table
2428 liveTransactionByTransactionIdTable->remove(transaction->getId());
2435 // Go through each of the outstanding transaction statuses
2437 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2438 while (iter->hasNext()) {
2439 int64_t key = iter->next();
2440 TransactionStatus *status = outstandingTransactionStatus->get(key);
2442 // Check if the transaction is dead
2443 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2444 && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
// The arbitrator has moved past this transaction, so record it as committed
2446 status->setStatus(TransactionStatus_StatusCommitted);
2457 * Process this slot, entry by entry. Also update the latest message sent by slot
// Applies one slot to the local state: the slot itself is first recorded as
// the latest message from its machine, then each entry is dispatched to the
// processEntry overload matching entry->getType().
// indexer is forwarded to the RejectedMessage handler; machineSet is
// forwarded to updateLastMessage and the LastMessage handler;
// acceptUpdatesToLocal is forwarded to updateLastMessage.
2459 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2461 // Update the last message seen
2462 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2464 // Process each entry in the slot
2465 Vector<Entry *> *entries = slot->getEntries();
2466 uint eSize = entries->size();
2467 for (uint ei = 0; ei < eSize; ei++) {
2468 Entry *entry = entries->get(ei);
// The type tag selects the concrete entry class that the pointer is cast to
2469 switch (entry->getType()) {
2470 case TypeCommitPart:
2471 processEntry((CommitPart *)entry);
2474 processEntry((Abort *)entry);
2476 case TypeTransactionPart:
2477 processEntry((TransactionPart *)entry);
2480 processEntry((NewKey *)entry);
2482 case TypeLastMessage:
2483 processEntry((LastMessage *)entry, machineSet);
2485 case TypeRejectedMessage:
2486 processEntry((RejectedMessage *)entry, indexer);
2488 case TypeTableStatus:
// The table status handler also needs the sequence number of the enclosing slot
2489 processEntry((TableStatus *)entry, slot->getSequenceNumber());
// Thrown as a heap-allocated Error pointer
2492 throw new Error("Unrecognized type: ");
2498 * Update the last message that was sent for a machine Id
2500 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2501 // Update what the last message received by a machine was
2502 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2506 * Add the new key to the arbitrators table and update the set of live
2507 * new keys (in case of a rescued new key message)
2509 void Table::processEntry(NewKey *entry) {
2510 // Update the arbitrator table with the new key information
2511 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2513 // Update what the latest live new key is
2514 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2515 if (oldNewKey != NULL) {
2516 // Delete the old new key messages
2517 oldNewKey->setDead();
2522 * Process new table status entries and set dead the old ones as new
2523 * ones come in-> keeps track of the largest and smallest table status
2524 * seen in this current round of updating the local copy of the block
2527 void Table::processEntry(TableStatus *entry, int64_t seq) {
2528 int newNumSlots = entry->getMaxSlots();
2529 updateCurrMaxSize(newNumSlots);
2530 initExpectedSize(seq, newNumSlots);
2532 if (liveTableStatus != NULL) {
2533 // We have a larger table status so the old table status is no
2535 liveTableStatus->setDead();
2538 // Make this new table status the latest alive table status
2539 liveTableStatus = entry;
2543 * Check old messages to see if there is a block chain violation.
// Handles a RejectedMessage entry. First the slots in the entry's
// [oldSeqNum, newSeqNum] range that the indexer returns are checked against
// the entry (an Error pointer is thrown on a mismatch). Then every other
// machine whose last seen message is older than this entry is recorded as a
// watcher, so the entry can be tracked until all of them have seen it.
2546 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2547 int64_t oldSeqNum = entry->getOldSeqNum();
2548 int64_t newSeqNum = entry->getNewSeqNum();
2549 bool isequal = entry->getEqual();
2550 int64_t machineId = entry->getMachineID();
2551 int64_t seq = entry->getSequenceNumber();
2553 // Check if we have messages that were supposed to be rejected in
2554 // our local block chain
2555 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2557 Slot *slot = indexer->getSlot(seqNum);
2560 // If we have this slot make sure that it was not supposed to be
2562 int64_t slotMachineId = slot->getMachineID();
// isequal states whether the slot's machine is expected to match the entry's
// machine; any disagreement is reported as a server error
2563 if (isequal != (slotMachineId == machineId)) {
2564 throw new Error("Server Error: Trying to insert rejected message for slot ");
2569 // Create a list of clients to watch until they see this rejected
2571 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2572 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2573 while (iter->hasNext()) {
2574 // Machine ID for the last message entry
2575 int64_t lastMessageEntryMachineId = iter->next();
2577 // We've seen it, don't need to continue to watch. Our next
2578 // message will implicitly acknowledge it.
2579 if (lastMessageEntryMachineId == localMachineId) {
2583 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2584 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2586 if (entrySequenceNumber < seq) {
2587 // Add this rejected message to the set of messages that this
2588 // machine ID did not see yet
2589 addWatchVector(lastMessageEntryMachineId, entry);
2590 // This client did not see this rejected message yet so add it
2591 // to the watch set to monitor
2592 deviceWatchSet->add(lastMessageEntryMachineId);
2597 if (deviceWatchSet->isEmpty()) {
2598 // This rejected message has been seen by all the clients so
2600 delete deviceWatchSet;
2602 // We need to watch this rejected message
2603 entry->setWatchSet(deviceWatchSet);
2608 * Check if this abort is live, if not then save it so we can kill it
2609 * later. Update the last transaction number that was arbitrated on.
// Handles an Abort entry: marks the matching outstanding transaction status
// as aborted, records the abort in liveAbortTable (and in
// liveAbortsGeneratedByLocal when this machine was the arbitrator), removes it
// again when the target machine has already seen it, and updates the
// per-arbitrator sequence-number bookkeeping and the live transaction tables.
2611 void Table::processEntry(Abort *entry) {
2612 if (entry->getTransactionSequenceNumber() != -1) {
2613 // update the transaction status if it was sent to the server
2614 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2615 if (status != NULL) {
2616 status->setStatus(TransactionStatus_StatusAborted);
2620 // Abort has not been seen by the client it is for yet so we need to
// NOTE(review): a new Pair key is heap-allocated on every call; confirm who
// owns it when put() replaces an existing mapping.
2623 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2624 if (previouslySeenAbort != NULL) {
2625 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2628 if (entry->getTransactionArbitrator() == localMachineId) {
2629 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get() is dereferenced without
// a NULL check; confirm an entry always exists for the transaction's machine.
2632 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2633 // The machine already saw this so it is dead
2635 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2636 liveAbortTable->remove(&abortid);
2638 if (entry->getTransactionArbitrator() == localMachineId) {
2639 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2644 // Update the last arbitration data that we have seen so far
2645 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2646 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2647 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2649 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2652 // Never seen any data from this arbitrator so record the first one
2653 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2656 // Set dead a transaction if we can
// The aborted transaction is identified by (machine id, client-local sequence number)
2657 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2659 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2660 if (transactionToSetDead != NULL) {
2661 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2664 // Update the last transaction sequence number that the arbitrator
2666 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2667 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
// A transaction sequence number of -1 is never recorded
2669 if (entry->getTransactionSequenceNumber() != -1) {
2670 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2676 * Set dead the transaction part if that transaction is dead and keep
2677 * track of all new parts
2679 void Table::processEntry(TransactionPart *entry) {
2680 // Check if we have already seen this transaction and set it dead OR
2681 // if it is not alive
2682 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2683 // This transaction is dead, it was already committed or aborted
2688 // This part is still alive
2689 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2691 if (transactionPart == NULL) {
2692 // Dont have a table for this machine Id yet so make one
2693 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2694 newTransactionParts->put(entry->getMachineId(), transactionPart);
2697 // Update the part and set dead ones we have already seen (got a
2699 entry->acquireRef();
2700 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2701 if (previouslySeenPart != NULL) {
2702 previouslySeenPart->releaseRef();
2703 previouslySeenPart->setDead();
2708 * Process new commit entries and save them for future use-> Delete duplicates
2710 void Table::processEntry(CommitPart *entry) {
2711 // Update the last transaction that was updated if we can
2712 if (entry->getTransactionSequenceNumber() != -1) {
2713 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2714 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2715 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2719 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2720 if (commitPart == NULL) {
2721 // Don't have a table for this machine Id yet so make one
2722 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2723 newCommitParts->put(entry->getMachineId(), commitPart);
2725 // Update the part and set dead ones we have already seen (got a
2727 entry->acquireRef();
2728 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2729 if (previouslySeenPart != NULL) {
2730 previouslySeenPart->setDead();
2731 previouslySeenPart->releaseRef();
2736 * Update the last message seen table. Update and set dead the
2737 * appropriate RejectedMessages as clients see them. Updates the live
2738 * aborts, removes those that are dead and sets them dead. Check that
2739 * the last message seen is correct and that there is no mismatch of
2740 * our own last message or that other clients have not had a rollback
2741 * on the last message.
// Records (seqNum, liveness) as the latest message seen from machineId and
// performs the follow-up bookkeeping visible below: removes machineId from
// machineSet, acknowledges watched rejected messages and live aborts that the
// machine has now seen, sets the superseded last-message object dead, and
// throws an Error pointer when the sequence numbers are inconsistent.
// liveness must be a LastMessage or a Slot (checked via getType()).
// acceptUpdatesToLocal suppresses the local-machine mismatch checks.
2743 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2744 // We have seen this machine ID
2745 machineSet->remove(machineId);
2747 // Get the set of rejected messages that this machine Id has not seen yet
2748 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2749 // If there is a rejected message that this machine Id has not seen yet
2750 if (watchset != NULL) {
2751 // Go through each rejected message that this machine Id has not
2754 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2755 while (rmit->hasNext()) {
2756 RejectedMessage *rm = rmit->next();
2757 // If this machine Id has seen this rejected message...
2758 if (rm->getSequenceNumber() <= seqNum) {
2759 // Remove it from our watchlist
2761 // Decrement machines that need to see this notification
2762 rm->removeWatcher(machineId);
2768 // Set dead the abort
2769 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2771 while (abortit->hasNext()) {
2772 Pair<int64_t, int64_t> *key = abortit->next();
2773 Abort *abort = liveAbortTable->get(key);
// An abort is acknowledged once the machine it targets reports a sequence
// number at or past the abort's own
2774 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2777 if (abort->getTransactionArbitrator() == localMachineId) {
2778 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2783 if (machineId == localMachineId) {
2784 // Our own messages are immediately dead.
2785 char livenessType = liveness->getType();
2786 if (livenessType == TypeLastMessage) {
2787 ((LastMessage *)liveness)->setDead();
2788 } else if (livenessType == TypeSlot) {
2789 ((Slot *)liveness)->setDead();
2791 throw new Error("Unrecognized type");
2794 // Store the new last message and get the old one for this device
2795 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2796 if (lastMessageEntry == NULL) {
2797 // If no last message then there is nothing else to process
// Copy both fields out before the old pair is deleted
2801 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2802 Liveness *lastEntry = lastMessageEntry->getSecond();
2803 delete lastMessageEntry;
2805 // If it is not our machine Id since we already set ours to dead
2806 if (machineId != localMachineId) {
2807 char lastEntryType = lastEntry->getType();
2809 if (lastEntryType == TypeLastMessage) {
2810 ((LastMessage *)lastEntry)->setDead();
2811 } else if (lastEntryType == TypeSlot) {
2812 ((Slot *)lastEntry)->setDead();
2814 throw new Error("Unrecognized type");
2817 // Make sure the server is not playing any games
2818 if (machineId == localMachineId) {
2819 if (hadPartialSendToServer) {
2820 // We were not making any updates and we had a machine mismatch
2821 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2822 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2825 // We were not making any updates and we had a machine mismatch
2826 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2827 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
// A remote machine's sequence number must never move backwards
2831 if (lastMessageSeqNum > seqNum) {
2832 throw new Error("Server Error: Rollback on remote machine sequence number");
2838 * Add a rejected message entry to the watch set to keep track of
2839 * which clients have seen that rejected message entry and which have
2842 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2843 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2844 if (entries == NULL) {
2845 // There is no set for this machine ID yet so create one
2846 entries = new Hashset<RejectedMessage *>();
2847 rejectedMessageWatchVectorTable->put(machineId, entries);
2849 entries->add(entry);
2853 * Check if the HMAC chain is not violated
// Verifies that each new slot links to its predecessor: for every slot in
// newSlots, the slot with the preceding sequence number is looked up through
// the indexer and, when present, its HMAC must equal the new slot's
// previous-HMAC field. Throws an Error pointer on the first mismatch.
2855 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2856 for (uint i = 0; i < newSlots->length(); i++) {
2857 Slot *currSlot = newSlots->get(i);
2858 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor (NULL) is not treated as a violation
2859 if (prevSlot != NULL &&
2860 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2861 throw new Error("Server Error: Invalid HMAC Chain");