3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// qsort-style comparator: both arguments are treated as pointers to int64_t.
// It is handed to qsort() in acceptDataFromLocal to order the abort sequence
// numbers held in a Vector<int64_t>.
// NOTE(review): the comparison/return statements are not visible in this
// listing -- presumably an ascending three-way compare; confirm.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructs a Table that creates its own CloudComm from baseurl, password and
// listeningPort (passing `this` to the CloudComm constructor) and records
// _localMachineId as localMachineId.
// Every pointer member listed below starts NULL, the counters start at 0, the
// flags start false, and oldestLiveSlotSequenceNumver starts at 1. The
// containers themselves are allocated by the `new Hashtable/Vector/Hashset`
// assignments that appear later in this file.
// NOTE(review): some initializers and the constructor body are not visible in
// this listing.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
// No table status, transaction builder or speculation state yet.
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
// Numeric state: threshold and sequence counters.
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(0),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(0),
// Send-state flags start cleared; nothing has been attempted yet.
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
// All tables, vectors and sets start NULL.
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
// Constructs a Table around a caller-supplied CloudComm and records
// _localMachineId as localMachineId.
// The remaining members get the same starting values as in the other
// constructor: pointers NULL, counters 0, flags false, and
// oldestLiveSlotSequenceNumver 1.
// NOTE(review): the initializer that stores _cloud and the constructor body
// are not visible in this listing -- presumably `cloud(_cloud)`; confirm.
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
// No table status, transaction builder or speculation state yet.
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
// Numeric state: threshold and sequence counters.
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localSequenceNumber(0),
112 localTransactionSequenceNumber(0),
113 lastTransactionSequenceNumberSpeculatedOn(0),
114 oldestTransactionSequenceNumberSpeculatedOn(0),
115 localArbitrationSequenceNumber(0),
// Send-state flags start cleared; nothing has been attempted yet.
116 hadPartialSendToServer(false),
117 attemptedToSendToServer(false),
119 didFindTableStatus(false),
121 lastSlotAttemptedToSend(NULL),
124 lastTransactionPartsSent(NULL),
// All tables, vectors and sets start NULL.
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
// Teardown sequence for the Table's heap-allocated state: each container is
// deleted, and where a container holds pointers the elements are visited first.
// NOTE(review): the enclosing function header is not visible in this listing --
// presumably this is the body of Table::~Table(); confirm. Several loop bodies
// and closing braces are also missing from the listing.
164 delete committedKeyValueTable;
165 delete speculatedKeyValueTable;
166 delete pendingTransactionSpeculatedKeyValueTable;
167 delete liveNewKeyTable;
// Visit every entry of lastMessageTable before deleting the table itself; the
// per-entry cleanup statements are not visible here.
169 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170 while (lmit->hasNext()) {
171 Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
175 delete lastMessageTable;
// Guarded delete: the builder is NULL until a transaction has been started.
177 if (pendingTransactionBuilder != NULL)
178 delete pendingTransactionBuilder;
// For each machine id, walk its set of RejectedMessage objects (the per-message
// cleanup is not visible here), then delete the outer table.
180 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
181 while(rmit->hasNext()) {
182 int64_t machineid = rmit->next();
183 Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
184 SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185 while (mit->hasNext()) {
186 RejectedMessage * rm = mit->next();
193 delete rejectedMessageWatchVectorTable;
195 delete arbitratorTable;
196 delete liveAbortTable;
// Transaction parts are released through releaseRef() rather than deleted
// directly: iterate every machine's inner table and drop one reference per part.
198 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199 while (partsit->hasNext()) {
200 int64_t machineId = partsit->next();
201 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
202 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
203 while(pit->hasNext()) {
204 Pair<int64_t, int32_t> * pair=pit->next();
205 pit->currVal()->releaseRef();
212 delete newTransactionParts;
// Same pattern for commit parts.
// NOTE(review): `partsit` is declared a second time here; presumably the two
// loops sit in separate brace scopes whose braces are missing from this listing.
215 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
216 while (partsit->hasNext()) {
217 int64_t machineId = partsit->next();
218 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
219 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
220 while(pit->hasNext()) {
221 Pair<int64_t, int32_t> * pair=pit->next();
222 pit->currVal()->releaseRef();
228 delete newCommitParts;
230 delete lastArbitratedTransactionNumberByArbitratorTable;
231 delete liveTransactionBySequenceNumberTable;
232 delete liveTransactionByTransactionIdTable;
// liveCommitsTable is two-level (arbitrator id -> table of Commit objects):
// delete every Commit, then each inner table, then the outer table.
234 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
235 while (liveit->hasNext()) {
236 int64_t arbitratorId = liveit->next();
238 // Get all the commits for a specific arbitrator
239 Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
241 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
242 while (clientit->hasNext()) {
243 int64_t id = clientit->next();
244 delete commitForClientTable->get(id);
249 delete commitForClientTable;
252 delete liveCommitsTable;
254 delete liveCommitsByKeyTable;
255 delete lastCommitSeenSequenceNumberByArbitratorTable;
256 delete rejectedSlotVector;
// Transactions still waiting in the queue are deleted along with the queue.
258 uint size = pendingTransactionQueue->size();
259 for (uint iter = 0; iter < size; iter++) {
260 delete pendingTransactionQueue->get(iter);
262 delete pendingTransactionQueue;
264 delete pendingSendArbitrationEntriesToDelete;
// Only the Vector values of transactionPartsSent are deleted in this loop; the
// Transaction keys are not deleted here.
266 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
267 while (trit->hasNext()) {
268 Transaction *transaction = trit->next();
269 delete trit->currVal();
272 delete transactionPartsSent;
274 delete outstandingTransactionStatus;
275 delete liveAbortsGeneratedByLocal;
276 delete offlineTransactionsCommittedAndAtServer;
277 delete localCommunicationTable;
278 delete lastTransactionSeenFromMachineFromServer;
// Arbitration rounds that were never fully sent are deleted with their vector.
280 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
281 delete pendingSendArbitrationRounds->get(i);
283 delete pendingSendArbitrationRounds;
// Guarded delete: the snapshot only exists after a send attempt has cloned it.
285 if (lastTransactionPartsSent != NULL)
286 delete lastTransactionPartsSent;
287 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
293 * Init all the stuff needed for table usage
// Allocates the helper objects and every container the Table uses, then
// derives the slot count and resize threshold from the new buffer.
// NOTE(review): the enclosing function header is not visible in this listing --
// presumably Table::init(); confirm.
296 // Init helper objects
297 random = new SecureRandom();
298 buffer = new SlotBuffer();
// Key/value state keyed by IoTString (hashString / StringEquals): the committed
// values plus the two speculative tables.
301 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
302 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
303 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
304 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
305 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
306 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
307 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
// Tables keyed by a Pair pointer use pairHashFunction / pairEquals.
308 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
309 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
310 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
311 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
312 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
313 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
314 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
315 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
316 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
317 rejectedSlotVector = new Vector<int64_t>();
// Queues and bookkeeping used while sending to the server.
318 pendingTransactionQueue = new Vector<Transaction *>();
319 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
320 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
321 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
322 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
323 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
324 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
325 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
326 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
327 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// numberOfSlots mirrors the buffer capacity; setResizeThreshold() reads
// numberOfSlots and random, so it must run after both are set.
330 numberOfSlots = buffer->capacity();
331 setResizeThreshold();
335 * Initialize the table by inserting a table status as the first entry
336 * into the table; this also initializes the crypto stuff.
// Bootstraps a brand-new table: initializes CloudComm security, then tries to
// publish slot 1 holding a TableStatus entry and applies the server's answer
// through validateAndUpdate.
// NOTE(review): the conditions guarding the first and last branches are not
// visible in this listing.
338 void Table::initTable() {
339 cloud->initSecurity();
341 // Create the first insertion into the block chain which is the table status
// The slot gets sequence number 1 and consumes one local sequence number.
342 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
343 localSequenceNumber++;
344 TableStatus *status = new TableStatus(s, numberOfSlots);
345 s->addShallowEntry(status);
346 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// presumably reached when putSlot returned NULL (slot accepted), in which case
// a one-element array is built for validateAndUpdate -- TODO confirm
349 array = new Array<Slot *>(1);
351 // update local block chain
352 validateAndUpdate(array, true);
354 } else if (array->length() == 1) {
355 // in case we did push the slot BUT we failed to init it
356 validateAndUpdate(array, true);
// Remaining outcome: reported through myerror instead of a thrown Error.
362 //throw new Error("Error on initialization");
363 myerror("Error on initialization\n");
368 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the server, applies
// them with validateAndUpdate(..., true) and then refreshes live transaction
// state via updateLiveTransactionsAndStatus().
371 void Table::rebuild() {
372 // Just pull the latest slots from the server
373 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
374 validateAndUpdate(newslots, true);
377 updateLiveTransactionsAndStatus();
// Records the host name and port to use when contacting the given arbitrator
// directly: stores a newly allocated (hostName, portNumber) Pair in
// localCommunicationTable under the arbitrator id. hostName is stored as the
// pointer passed in (no copy is made here).
380 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
381 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the arbitrator id stored for key in arbitratorTable.
// NOTE(review): the result for an unknown key depends on Hashtable::get, which
// is not visible here; callers in this file check contains() first.
384 int64_t Table::getArbitrator(IoTString *key) {
385 return arbitratorTable->get(key);
// Shuts the table down.
// NOTE(review): the body is not visible in this listing, so what is actually
// closed cannot be stated from here -- confirm against the full file.
388 void Table::close() {
// Looks key up in committedKeyValueTable and returns a newly allocated
// IoTString copy of the stored value.
// NOTE(review): the handling of a missing key is not visible in this listing --
// presumably NULL is returned when kv is NULL; confirm.
392 IoTString *Table::getCommitted(IoTString *key) {
393 KeyValue *kv = committedKeyValueTable->get(key);
396 return new IoTString(kv->getValue());
// Returns a newly allocated copy of the value for key, consulting the tables
// in this order: pendingTransactionSpeculatedKeyValueTable,
// speculatedKeyValueTable, committedKeyValueTable.
// NOTE(review): the checks between the lookups and the not-found path are not
// visible -- presumably each later table is consulted only when the previous
// lookup returned NULL; confirm.
402 IoTString *Table::getSpeculative(IoTString *key) {
403 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
406 kv = speculatedKeyValueTable->get(key);
410 kv = committedKeyValueTable->get(key);
414 return new IoTString(kv->getValue());
// Reads the committed value of key as part of the transaction being built:
// besides returning a newly allocated copy of the value, it registers a guard
// KeyValue on pendingTransactionBuilder (with the value read, or NULL).
// Errors (unknown key, arbitrator mismatch) are reported through myerror.
// NOTE(review): the condition choosing between the two guard forms is not
// visible -- presumably whether kv is non-NULL; confirm. Uses
// pendingTransactionBuilder without a NULL check in the visible lines.
420 IoTString *Table::getCommittedAtomic(IoTString *key) {
421 KeyValue *kv = committedKeyValueTable->get(key);
// A key is only valid once it has an arbitrator.
423 if (!arbitratorTable->contains(key)) {
424 // throw new Error("Key not Found.");
425 myerror("Key not found!\n");
428 // Make sure new key value pair matches the current arbitrator
429 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
430 // TODO: Maybe not throw an error
431 //throw new Error("Not all Key Values Match Arbitrator.");
432 myerror("Not all key values match arbitrator\n");
// Guard on the value that was read, and hand a copy back to the caller.
436 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
437 return new IoTString(kv->getValue());
// Guard with a NULL value.
439 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic: validates the key and its
// arbitrator, looks the value up in pendingTransactionSpeculatedKeyValueTable,
// then speculatedKeyValueTable, then committedKeyValueTable, registers a guard
// KeyValue on pendingTransactionBuilder and returns a newly allocated copy of
// the value.
// NOTE(review): the checks between the lookups and the condition choosing the
// guard form are not visible -- presumably NULL tests on kv; confirm.
444 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
// A key is only valid once it has an arbitrator.
445 if (!arbitratorTable->contains(key)) {
446 //throw new Error("Key not Found.");
447 myerror("Key not found\n");
450 // Make sure new key value pair matches the current arbitrator
451 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
452 // TODO: Maybe not throw an error
453 //throw new Error("Not all Key Values Match Arbitrator.");
454 myerror("Not all Key Values Match Arbitrator.\n");
// Lookup order: pending-transaction speculation, speculation, committed.
457 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
460 kv = speculatedKeyValueTable->get(key);
464 kv = committedKeyValueTable->get(key);
// Guard on the value that was read, and hand a copy back to the caller.
468 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
469 return new IoTString(kv->getValue());
// Guard with a NULL value.
471 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls slots newer than sequenceNumber from the server, applies them with
// validateAndUpdate(..., false) and refreshes live transaction state.
// NOTE(review): the return statements are not visible in this listing. A block
// comment opens below (the disabled exception handler, which iterates
// localCommunicationTable) and its terminator is not visible here.
476 bool Table::update() {
478 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
479 validateAndUpdate(newSlots, false);
482 updateLiveTransactionsAndStatus();
484 /* } catch (Exception *e) {
485 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
486 while (kit->hasNext()) {
487 int64_t m = kit->next();
// Tries to register keyName with machineId as its arbitrator by sending a
// NewKey entry to the server.
// NOTE(review): the return statements and any surrounding retry loop are not
// visible in this listing -- presumably false when the key already has an
// arbitrator and true once the NewKey is inserted; confirm.
496 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
498 if (arbitratorTable->contains(keyName)) {
499 // There is already an arbitrator
// The NewKey is created with a NULL first constructor argument.
502 NewKey *newKey = new NewKey(NULL, keyName, machineId);
504 if (sendToServer(newKey)) {
505 // If successfully inserted
// Begins a new transaction: any existing pendingTransactionBuilder is deleted
// (discarding whatever it had accumulated) and replaced by a fresh
// PendingTransaction for localMachineId.
511 void Table::startTransaction() {
512 // Create a new transaction, invalidates any old pending transactions.
513 if (pendingTransactionBuilder != NULL)
514 delete pendingTransactionBuilder;
515 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the transaction being built.
// The key must already have an arbitrator, and that arbitrator must be
// accepted by pendingTransactionBuilder->checkArbitrator; violations are
// reported through myerror. The stored KeyValue holds copies of key and value
// (new IoTString(...)), so the caller keeps ownership of its arguments.
// NOTE(review): uses pendingTransactionBuilder without a NULL check in the
// visible lines -- presumably startTransaction() must have been called first.
518 void Table::put(IoTString *key, IoTString *value) {
519 // Make sure it is a valid key
520 if (!arbitratorTable->contains(key)) {
521 //throw new Error("Key not Found.");
522 myerror("Key not Found.\n");
525 // Make sure new key value pair matches the current arbitrator
526 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
527 // TODO: Maybe not throw an error
528 //throw new Error("Not all Key Values Match Arbitrator.");
529 myerror("Not all Key Values Match Arbitrator.\n");
532 // Add the key value to this transaction
533 KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
534 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction held in pendingTransactionBuilder and returns its
// TransactionStatus.
//  - With no key/value updates, a StatusNoEffect status (arbitrator -1) is
//    returned immediately.
//  - Otherwise the next local transaction sequence number is assigned and a
//    Transaction is created with a StatusPending status. If the arbitrator is
//    another machine the transaction is queued in pendingTransactionQueue; the
//    local path arbitrates on it directly and deletes it.
// The builder is then replaced with a fresh PendingTransaction.
// NOTE(review): the `else` between the two arbitration paths and the send call
// are not visible in this listing. A block comment opens further down and its
// terminator is not visible, so it is unclear from here how much of the retry
// logic after it is live code.
537 TransactionStatus *Table::commitTransaction() {
538 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
539 // transaction with no updates will have no effect on the system
540 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
543 // Set the local transaction sequence number and increment
544 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
545 localTransactionSequenceNumber++;
547 // Create the transaction status
548 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
550 // Create the new transaction
551 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
552 newTransaction->setTransactionStatus(transactionStatus);
// Remote arbitrator: queue for sending. Otherwise arbitrate locally.
554 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
555 // Add it to the queue and invalidate the builder for safety
556 pendingTransactionQueue->add(newTransaction);
558 arbitrateOnLocalTransaction(newTransaction);
559 delete newTransaction;
560 updateLiveStateFromLocal();
// Replace the builder so the next transaction starts from a clean state.
562 if (pendingTransactionBuilder != NULL)
563 delete pendingTransactionBuilder;
565 pendingTransactionBuilder = new PendingTransaction(localMachineId);
569 /* } catch (ServerException *e) {
571 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
572 uint size = pendingTransactionQueue->size();
574 for (uint iter = 0; iter < size; iter++) {
575 Transaction *transaction = pendingTransactionQueue->get(iter);
576 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
578 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
579 // Already contacted this client so ignore all attempts to contact this client
580 // to preserve ordering for arbitrator
584 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
586 if (sendReturn.getFirst()) {
587 // Failed to contact over local
588 arbitratorTriedAndFailed->add(transaction->getArbitrator());
590 // Successful contact or should not contact
592 if (sendReturn.getSecond()) {
599 pendingTransactionQueue->setSize(oldindex);
602 updateLiveStateFromLocal();
604 return transactionStatus;
608 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots:
// resizeLower is Table_RESIZE_THRESHOLD * numberOfSlots truncated to int, and
// the threshold is resizeLower - 1 plus a random offset obtained from
// random->nextInt(numberOfSlots - resizeLower).
// NOTE(review): presumably randomized so that clients do not all resize at the
// same fill level -- confirm.
610 void Table::setResizeThreshold() {
611 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
612 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber, the counter that is
// incremented after each slot creation or local-data send in this file.
615 int64_t Table::getLocalSequenceNumber() {
616 return localSequenceNumber;
// Post-send bookkeeping for every transaction in lastTransactionPartsSent (the
// snapshot of which parts went out in the last slot): clears the server
// failure flag, removes the sent parts, records the transaction's status in
// outstandingTransactionStatus, and moves the status to SentPartial or, when
// all parts are out, SentFully (also removing it from pendingTransactionQueue).
// handlePartial: when true, a transaction that is not fully sent is reset
// again and, if no part of it reached the server, loses its sequence number
// (set to -1).
619 void Table::processTransactionList(bool handlePartial) {
620 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *)getKeyIterator(lastTransactionPartsSent);
621 while (trit->hasNext()) {
622 Transaction *transaction = trit->next();
623 transaction->resetServerFailure();
624 // Update which transactions parts still need to be sent
625 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
626 // Add the transaction status to the outstanding list
627 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
629 // Update the transaction status
630 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
632 // Check if all the transaction parts were successfully
633 // sent and if so then remove it from pending
634 if (transaction->didSendAllParts()) {
635 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
636 pendingTransactionQueue->remove(transaction);
638 } else if (handlePartial) {
639 transaction->resetServerFailure();
640 // Set the transaction sequence number back to nothing
641 if (!transaction->didSendAPartToServer()) {
642 transaction->setSequenceNumber(-1);
// Recovers from a send whose acknowledgement never arrived.
// Asks the server for slots newer than sequenceNumber. If there are none, the
// last attempted slot is re-sent; otherwise checkSend() is used to find out
// whether the earlier attempt actually landed. In each path the transactions
// in lastTransactionPartsSent are either processed as sent
// (processTransactionList) or reset for a fresh attempt, and newly received
// slots are applied with validateAndUpdate.
// Returns the NewKey the caller (sendToServer) should continue with.
// NOTE(review): the branch headers (`else`), the statements that change
// newKey, and the return statements are not visible in this listing --
// presumably newKey becomes NULL when the earlier slot already inserted the
// same key; confirm.
// NOTE(review): the key comparison uses `==` on the getKey() results; if
// getKey() returns IoTString *, that is pointer identity rather than string
// equality -- confirm this is intended.
649 NewKey * Table::handlePartialSend(NewKey * newKey) {
650 //Didn't receive acknowledgement for last send
651 //See if the server has received a newer slot
653 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
654 if (newSlots->length() == 0) {
655 //Retry sending old slot
656 bool wasInserted = false;
657 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
// Retry succeeded: the remembered slot is no longer pending.
659 if (sendSlotsReturn) {
660 lastSlotAttemptedToSend = NULL;
661 if (newKey != NULL) {
662 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
667 processTransactionList(false);
// Retry not acknowledged: check whether the slot shows up in what the server
// returned.
669 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
670 if (newKey != NULL) {
671 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
676 processTransactionList(true);
// Slot did not land: reset each transaction so it can be sent again.
680 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *)getKeyIterator(lastTransactionPartsSent);
681 while (trit->hasNext()) {
682 Transaction *transaction = trit->next();
683 transaction->resetServerFailure();
684 // Set the transaction sequence number back to nothing
685 if (!transaction->didSendAPartToServer()) {
686 transaction->setSequenceNumber(-1);
691 if (newSlots->length() != 0) {
692 // insert into the local block chain
693 validateAndUpdate(newSlots, true);
// Server already had newer slots: same check-and-process sequence.
696 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
697 if (newKey != NULL) {
698 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
704 processTransactionList(true);
706 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(lastTransactionPartsSent);
707 while (trit->hasNext()) {
708 Transaction *transaction = trit->next();
709 transaction->resetServerFailure();
710 // Set the transaction sequence number back to nothing
711 if (!transaction->didSendAPartToServer()) {
712 transaction->setSequenceNumber(-1);
718 // insert into the local block chain
719 validateAndUpdate(newSlots, true);
// Forgets what was recorded for the current send attempt: empties
// pendingSendArbitrationEntriesToDelete, deletes every Vector value stored in
// transactionPartsSent and then clears that table. The Transaction keys
// themselves are not deleted here.
725 void Table::clearSentParts() {
726 // Clear the sent data since we are trying again
727 pendingSendArbitrationEntriesToDelete->clear();
728 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
729 while (trit->hasNext()) {
730 Transaction *transaction = trit->next();
731 delete trit->currVal();
734 transactionPartsSent->clear();
// Pushes pending work (queued transactions, pending arbitration rounds and an
// optional NewKey) to the server, one slot per loop iteration.
// newKey may be NULL when only transactions/arbitration data need sending.
// Returns true when newKey is NULL at the end (newKey == NULL).
// Per iteration: build a slot that follows sequenceNumber, fill it (retrying
// with the resize flag when fillSlot asks for it), remember the attempt in the
// last* members so handlePartialSend can recover, send it, then either prune
// what was sent or reset the transactions for another try, and finally apply
// any slots the server returned.
// NOTE(review): many branch headers, the clearSentParts-style cleanup calls and
// closing braces are not visible in this listing. A block comment opens near
// the end (the disabled ServerException handler) and its terminator is not
// visible, so it is unclear from here how much of the code after it is live.
737 bool Table::sendToServer(NewKey *newKey) {
// A previous send was left unacknowledged: resolve it first; this may change
// which NewKey still needs sending.
738 if (hadPartialSendToServer) {
739 newKey = handlePartialSend(newKey);
743 // While we have stuff that needs inserting into the block chain
744 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
745 if (hadPartialSendToServer) {
746 // throw new Error("Should Be error free");
747 myerror("Should Be error free\n");
750 // If there is a new key with same name then end
751 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot takes sequenceNumber + 1 and carries a copy of the HMAC of the
// slot currently at sequenceNumber; it consumes one local sequence number.
757 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
758 localSequenceNumber++;
760 // Try to fill the slot with data
762 bool insertedNewKey = false;
763 bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
766 // Reset which transaction to send
767 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
768 while (trit->hasNext()) {
769 Transaction *transaction = trit->next();
770 transaction->resetNextPartToSend();
772 // Set the transaction sequence number back to nothing
773 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
774 transaction->setSequenceNumber(-1);
779 // Clear the sent data since we are trying again
782 // We needed a resize so try again
783 fillSlot(slot, true, newKey, newSize, insertedNewKey);
// Remember this attempt (slot, flags, size, parts snapshot) so that a lost
// acknowledgement can be resolved later by handlePartialSend.
785 if (lastSlotAttemptedToSend != NULL)
786 delete lastSlotAttemptedToSend;
788 lastSlotAttemptedToSend = slot;
789 lastIsNewKey = (newKey != NULL);
790 lastInsertedNewKey = insertedNewKey;
791 lastNewSize = newSize;
792 if (( newKey != lastNewKey) && (lastNewKey != NULL))
795 if (lastTransactionPartsSent != NULL)
796 delete lastTransactionPartsSent;
797 lastTransactionPartsSent = transactionPartsSent->clone();
799 Array<Slot *> * newSlots = NULL;
800 bool wasInserted = false;
801 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
803 if (sendSlotsReturn) {
804 lastSlotAttemptedToSend = NULL;
805 // Did insert into the block chain
806 if (insertedNewKey) {
807 // This slot was what was inserted not a previous slot
808 // New Key was successfully inserted into the block chain so dont want to insert it again
812 // Remove the aborts and commit parts that were sent from the pending to send queue
// Compact the vector in place: rounds that still have parts to send are moved
// down to index oldcount, finished rounds are deleted, then the size is trimmed.
813 uint size = pendingSendArbitrationRounds->size();
815 for (uint i = 0; i < size; i++) {
816 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
817 round->removeParts(pendingSendArbitrationEntriesToDelete);
819 if (!round->isDoneSending()) {
821 pendingSendArbitrationRounds->set(oldcount++,
822 pendingSendArbitrationRounds->get(i));
824 delete pendingSendArbitrationRounds->get(i);
826 pendingSendArbitrationRounds->setSize(oldcount);
827 processTransactionList(false);
829 // Reset which transaction to send
830 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
831 while (trit->hasNext()) {
832 Transaction *transaction = trit->next();
833 transaction->resetNextPartToSend();
835 // Set the transaction sequence number back to nothing
836 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
837 transaction->setSequenceNumber(-1);
843 // Clear the sent data in preparation for next send
846 if (newSlots->length() != 0) {
847 // insert into the local block chain
848 validateAndUpdate(newSlots, true);
852 /* } catch (ServerException *e) {
853 if (e->getType() != ServerException_TypeInputTimeout) {
854 // Nothing was able to be sent to the server so just clear these data structures
855 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
856 while (trit->hasNext()) {
857 Transaction *transaction = trit->next();
858 transaction->resetNextPartToSend();
860 // Set the transaction sequence number back to nothing
861 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
862 transaction->setSequenceNumber(-1);
867 // There was a partial send to the server
868 hadPartialSendToServer = true;
870 // Nothing was able to be sent to the server so just clear these data structures
871 SetIterator<Transaction *, Vector<int> *> *trit = (SetIterator<Transaction *, Vector<int> *> *) getKeyIterator(transactionPartsSent);
872 while (trit->hasNext()) {
873 Transaction *transaction = trit->next();
874 transaction->resetNextPartToSend();
875 transaction->setServerFailure();
885 return newKey == NULL;
// Asks the machine `machineId` directly (over the host/port registered in
// localCommunicationTable) for arbitration results this client has not seen.
// The request is an int64 (the last arbitration sequence number seen from that
// machine, or -1 if none) plus room for one int32; the reply starts with an
// entry count followed by typed entries (TypeAbort / TypeCommitPart) that are
// decoded, after which updateLiveStateFromLocal() runs.
// NOTE(review): the return statements, the write of the int32 field and the
// handling of decoded Abort entries are not visible in this listing --
// presumably false is returned when the machine is unknown or unreachable.
888 bool Table::updateFromLocal(int64_t machineId) {
889 if (!localCommunicationTable->contains(machineId))
892 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
894 // Get the size of the send data
895 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// -1 means nothing has been seen from this arbitrator yet.
897 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
898 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
899 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
902 Array<char> *sendData = new Array<char>(sendDataSize);
903 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
906 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// Each local send consumes one local sequence number.
910 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
911 localSequenceNumber++;
913 if (returnData == NULL) {
914 // Could not contact server
// Reply layout as read here: int entry count, then per entry a type byte
// followed by the encoded entry.
919 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
920 int numberOfEntries = bbDecode->getInt();
922 for (int i = 0; i < numberOfEntries; i++) {
923 char type = bbDecode->get();
924 if (type == TypeAbort) {
925 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
927 } else if (type == TypeCommitPart) {
928 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
929 processEntry(commitPart);
933 updateLiveStateFromLocal();
// Sends a whole transaction directly to its arbitrator (host/port taken from
// localCommunicationTable) and applies the arbitrator's reply.
// Request layout as written here: int64 last arbitration sequence number seen
// from that arbitrator (-1 if none), int32 part count, then every encoded
// TransactionPart. Reply layout as read here: didCommit byte, couldArbitrate
// byte, int entry count, then typed entries (TypeAbort / TypeCommitPart) that
// are passed to processEntry.
// Returns (true, false) when the arbitrator has no local communication info or
// did not answer, and (false, true) after a reply has been processed; the
// caller in commitTransaction treats a true first element as "failed to
// contact over local".
// NOTE(review): the conditions selecting Committed vs Aborted and the update
// of foundAbort are not visible in this listing -- presumably didCommit in the
// couldArbitrate branch and foundAbort otherwise; confirm.
// NOTE(review): tParts/tPartsSize are declared twice; presumably each loop
// sits in its own brace scope whose braces are missing from the listing.
938 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
940 // Get the devices local communications
941 if (!localCommunicationTable->contains(transaction->getArbitrator()))
942 return Pair<bool, bool>(true, false);
944 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
946 // Get the size of the send data
// Fixed header (int32 + int64) plus the encoded size of every part.
947 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
949 Vector<TransactionPart *> *tParts = transaction->getParts();
950 uint tPartsSize = tParts->size();
951 for (uint i = 0; i < tPartsSize; i++) {
952 TransactionPart *part = tParts->get(i);
953 sendDataSize += part->getSize();
// -1 means nothing has been seen from this arbitrator yet.
957 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
958 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
959 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
962 // Make the send data size
963 Array<char> *sendData = new Array<char>(sendDataSize);
964 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
967 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
968 bbEncode->putInt(transaction->getParts()->size());
970 Vector<TransactionPart *> *tParts = transaction->getParts();
971 uint tPartsSize = tParts->size();
972 for (uint i = 0; i < tPartsSize; i++) {
973 TransactionPart *part = tParts->get(i);
974 part->encode(bbEncode);
// Each local send consumes one local sequence number.
979 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
980 localSequenceNumber++;
982 if (returnData == NULL) {
983 // Could not contact server
984 return Pair<bool, bool>(true, false);
988 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
989 bool didCommit = bbDecode->get() == 1;
990 bool couldArbitrate = bbDecode->get() == 1;
991 int numberOfEntries = bbDecode->getInt();
992 bool foundAbort = false;
994 for (int i = 0; i < numberOfEntries; i++) {
995 char type = bbDecode->get();
996 if (type == TypeAbort) {
997 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// An abort matches this transaction when both the machine id and the client
// local sequence number agree.
999 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
1003 processEntry(abort);
1004 } else if (type == TypeCommitPart) {
1005 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
1006 processEntry(commitPart);
1010 updateLiveStateFromLocal();
// Translate the reply into the transaction's final status.
1012 if (couldArbitrate) {
1013 TransactionStatus *status = transaction->getTransactionStatus();
1015 status->setStatus(TransactionStatus_StatusCommitted);
1017 status->setStatus(TransactionStatus_StatusAborted);
1020 TransactionStatus *status = transaction->getTransactionStatus();
1022 status->setStatus(TransactionStatus_StatusAborted);
1024 status->setStatus(TransactionStatus_StatusCommitted);
1028 return Pair<bool, bool>(false, true);
// Processes an encoded request buffer: reads the requester's last-seen
// arbitration sequence number and an optional transaction, arbitrates that
// transaction locally, and builds a reply buffer holding arbitration entries
// (aborts generated locally and commit parts for this machine).
// NOTE(review): the reply appears to be [flag bytes][int entry count][entries];
// confirm against the code that decodes it.
1031 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
// Request header: a long followed by an int count of transaction parts.
1033 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1034 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1035 int numberOfParts = bbDecode->getInt();
1037 // Outcome of arbitrating the transaction carried by the request (if any)
1038 bool didCommit = false;
1039 bool couldArbitrate = false;
// A part count of zero means the request carries no transaction.
1041 if (numberOfParts != 0) {
1043 // Decode the transaction part by part
1044 Transaction *transaction = new Transaction();
1045 for (int i = 0; i < numberOfParts; i++) {
1047 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1048 transaction->addPartDecode(newPart);
1051 // Arbitrate on transaction and pull relevant return data
// The pair is (couldArbitrate, didCommit), in that order.
1052 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1053 couldArbitrate = localArbitrateReturn.getFirst();
1054 didCommit = localArbitrateReturn.getSecond();
1056 updateLiveStateFromLocal();
1058 // Transaction was sent to the server so keep track of it to prevent double commit
// presumably -1 marks "no sequence number assigned yet" -- TODO confirm
1059 if (transaction->getSequenceNumber() != -1) {
1060 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1064 // The data to send back
1065 int returnDataSize = 0;
1066 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1068 // Get the aborts to send back
1069 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
// Collect the keys of liveAbortsGeneratedByLocal, then sort them with
// compareInt64 so the aborts are visited in key order.
1071 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1072 while (abortit->hasNext())
1073 abortLocalSequenceNumbers->add(abortit->next());
1077 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1079 uint asize = abortLocalSequenceNumbers->size();
1080 for (uint i = 0; i < asize; i++) {
// Note: this loop-local name shadows the member localSequenceNumber.
1081 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// Entries at or below what the requester reported are tested here
// (presumably so they are not sent again -- TODO confirm).
1082 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1086 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1087 unseenArbitrations->add(abort);
1088 returnDataSize += abort->getSize();
1091 // Get the commits to send back
// liveCommitsTable is indexed by machine id; only this machine's commits are used.
1092 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1093 if (commitForClientTable != NULL) {
1094 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1096 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1097 while (commitit->hasNext())
1098 commitLocalSequenceNumbers->add(commitit->next());
1101 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1103 uint clsSize = commitLocalSequenceNumbers->size();
1104 for (uint clsi = 0; clsi < clsSize; clsi++) {
// Shadows the member localSequenceNumber, as in the abort loop.
1105 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1106 Commit *commit = commitForClientTable->get(localSequenceNumber);
1108 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// Every part of the commit is queued and counted individually.
1113 Vector<CommitPart *> *parts = commit->getParts();
1114 uint nParts = parts->size();
1115 for (uint i = 0; i < nParts; i++) {
1116 CommitPart *commitPart = parts->get(i);
1117 unseenArbitrations->add(commitPart);
1118 returnDataSize += commitPart->getSize();
1124 // Number of arbitration entries to decode
// NOTE(review): two int32 widths are reserved although a single putInt
// (the entry count) follows -- confirm what the second one accounts for.
1125 returnDataSize += 2 * sizeof(int32_t);
1127 // bool of did commit or not
1128 if (numberOfParts != 0) {
1129 returnDataSize += sizeof(char);
1132 // Data to send back
1133 Array<char> *returnData = new Array<char>(returnDataSize);
1134 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Status flags are written as single bytes (1 or 0) when a transaction was
// present in the request.
1136 if (numberOfParts != 0) {
1138 bbEncode->put((char)1);
1140 bbEncode->put((char)0);
1142 if (couldArbitrate) {
1143 bbEncode->put((char)1);
1145 bbEncode->put((char)0);
// Entry count, followed by each entry's own encoding.
1149 bbEncode->putInt(unseenArbitrations->size());
1150 uint size = unseenArbitrations->size();
1151 for (uint i = 0; i < size; i++) {
1152 Entry *entry = unseenArbitrations->get(i);
1153 entry->encode(bbEncode);
// Advances the member counter; the loop-local shadows above are out of scope here.
1156 localSequenceNumber++;
1160 /** Checks whether a given slot (checkSlot) was sent, using the slots in
1161 array. Returns true if sent and false otherwise.
    Two kinds of evidence are looked for: a slot in the array with checkSlot's
    sequence number and this machine's id, or a LastMessage entry in any slot
    that names this machine and checkSlot's sequence number. */
1163 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1164 uint size = array->length();
// First pass: look for the slot itself, written by this machine.
1165 for (uint i = 0; i < size; i++) {
1166 Slot *s = array->get(i);
1167 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1172 // Also need to see if other machines acknowledged our message
// Second pass: scan every entry of every slot for a matching LastMessage.
1173 for (uint i = 0; i < size; i++) {
1174 Slot *s = array->get(i);
1176 // Process each entry in the slot
1177 Vector<Entry *> *entries = s->getEntries();
1178 uint eSize = entries->size();
1179 for (uint ei = 0; ei < eSize; ei++) {
1180 Entry *entry = entries->get(ei);
// Only LastMessage entries are relevant; the cast relies on the type tag.
1182 if (entry->getType() == TypeLastMessage) {
1183 LastMessage *lastMessage = (LastMessage *)entry;
1185 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1195 /** Method tries to send slot to server. Returns status in tuple.
1196 isInserted returns whether last un-acked send (if any) was
1197 successful. Returns whether send was confirmed.
// Attempts to push `slot` to the server through cloud->putSlot().
//   slot       - slot to send
//   newSize    - forwarded unchanged to putSlot()
//   isNewKey   - not referenced in the statements below (TODO confirm it is unused)
//   isInserted - out: set from checkSend() or to false, depending on the path taken
//   array      - out: slots handed back by putSlot(), or a one-element array
//                holding `slot` when putSlot() returned NULL
1200 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
// Record that a send was tried, whatever the outcome.
1201 attemptedToSendToServer = true;
1203 *array = cloud->putSlot(slot, newSize);
// NULL result: hand the caller an array containing just the slot we sent,
// and drop any previously recorded rejections.
1204 if (*array == NULL) {
1205 *array = new Array<Slot *>(1);
1206 (*array)->set(0, slot);
1207 rejectedSlotVector->clear();
1208 *isInserted = false;
// Non-NULL result: an empty array is reported as a server error.
1211 if ((*array)->length() == 0) {
1212 // throw new Error("Server Error: Did not send any slots");
1213 myerror("Server Error: Did not send any slots\n");
// After an earlier partial send, check whether this slot shows up in the
// returned slots before recording it as rejected.
1216 if (hadPartialSendToServer) {
1217 *isInserted = checkSend(*array, slot);
1219 if (!(*isInserted)) {
1220 rejectedSlotVector->add(slot->getSequenceNumber());
// Otherwise the slot's sequence number is recorded as rejected.
1225 rejectedSlotVector->add(slot->getSequenceNumber());
1226 *isInserted = false;
1233 * Returns true if a resize was needed but not done.
// Populates `slot` with entries in a fixed order: table status (when
// resizing), rejected-message entries, mandatory rescues, the optional new
// key, pending arbitration data, pending transaction parts, and finally
// optional rescues.
//   slot        - slot being built
//   resize      - whether the caller asks for a resize; forced to true when
//                 liveSlotCount exceeds bufferResizeThreshold
//   newKeyEntry - optional NewKey entry to insert (may be NULL)
//   newSize     - out: 0 means "no resize", otherwise the new slot count
//   insertedKey - out: reset to false before the new key is attempted
1235 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1236 newSize = 0;//special value to indicate no resize
1237 if (liveSlotCount > bufferResizeThreshold) {
1238 resize = true;//Resize is forced
// The new size is the current slot count scaled by Table_RESIZE_MULTIPLE and
// is announced through a TableStatus entry in the slot.
1242 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1243 TableStatus *status = new TableStatus(slot, newSize);
1244 slot->addShallowEntry(status);
1247 // Fill with rejected slots first before doing anything else
1248 doRejectedMessages(slot);
1250 // Do mandatory rescue of entries
1251 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1253 // Extract working variables
1254 bool needsResize = mandatoryRescueReturn.getFirst();
1255 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1256 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1258 if (needsResize && !resize) {
1259 // We need to resize but we are not resizing so return true to force on retry
// Try to place the new key entry, if one was supplied and it fits.
1263 insertedKey = false;
1264 if (newKeyEntry != NULL) {
1265 newKeyEntry->setSlot(slot);
1266 if (slot->hasSpace(newKeyEntry)) {
1267 slot->addEntry(newKeyEntry);
1272 // Clear the transactions, aborts and commits that were sent previously
// Walk the pending arbitration rounds in order and add their parts.
1274 uint size = pendingSendArbitrationRounds->size();
1275 for (uint i = 0; i < size; i++) {
1276 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1277 bool isFull = false;
1278 round->generateParts();
1279 Vector<Entry *> *parts = round->getParts();
1281 // Insert pending arbitration data
1282 uint vsize = parts->size();
1283 for (uint vi = 0; vi < vsize; vi++) {
1284 Entry *arbitrationData = parts->get(vi);
1286 // If it is an abort then we need to set some information
// Aborts are stamped with the sequence number of the slot carrying them.
1287 if (arbitrationData->getType() == TypeAbort) {
1288 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1291 if (!slot->hasSpace(arbitrationData)) {
1292 // No space so cant do anything else with these data entries
1297 // Add to this current slot and add it to entries to delete
1298 slot->addEntry(arbitrationData);
1299 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered.
1307 if (pendingTransactionQueue->size() > 0) {
1308 Transaction *transaction = pendingTransactionQueue->get(0);
1309 // Set the transaction sequence number if it has yet to be inserted into the block chain
1310 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1311 transaction->setSequenceNumber(slot->getSequenceNumber());
1315 TransactionPart *part = transaction->getNextPartToSend();
1317 // Ran out of parts to send for this transaction so move on
// Record which part numbers of this transaction went into the slot.
1321 if (slot->hasSpace(part)) {
1322 slot->addEntry(part);
1323 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1324 if (partsSent == NULL) {
1325 partsSent = new Vector<int32_t>();
1326 transactionPartsSent->put(transaction, partsSent);
1328 partsSent->add(part->getPartNumber());
1329 transactionPartsSent->put(transaction, partsSent);
1336 // Fill the remainder of the slot with rescue data
1337 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
// Adds RejectedMessage entries to slot `s` describing the sequence numbers
// currently held in rejectedSlotVector. Does nothing when that vector is
// empty.
1342 void Table::doRejectedMessages(Slot *s) {
1343 if (!rejectedSlotVector->isEmpty()) {
1344 /* TODO: We should avoid generating a rejected message entry if
1345 * there is already a sufficient entry in the queue (e.g.,
1346 * equalsto value of true and same sequence number). */
1348 int64_t old_seqn = rejectedSlotVector->get(0);
// Above the threshold a single entry spanning the first through the last
// rejected sequence number is emitted.
1349 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1350 int64_t new_seqn = rejectedSlotVector->lastElement();
1351 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1352 s->addShallowEntry(rm);
// Otherwise the rejected sequence numbers are examined one by one against
// the local buffer.
1354 int64_t prev_seqn = -1;
1356 /* Go through list of missing messages */
1357 for (; i < rejectedSlotVector->size(); i++) {
1358 int64_t curr_seqn = rejectedSlotVector->get(i);
1359 Slot *s_msg = buffer->getSlot(curr_seqn);
1362 prev_seqn = curr_seqn;
1364 /* Generate rejected message entry for missing messages */
// prev_seqn stays -1 when the loop above never advanced it.
1365 if (prev_seqn != -1) {
1366 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1367 s->addShallowEntry(rm);
1369 /* Generate rejected message entries for present messages */
// Continues from where the previous loop stopped (the index is shared); each
// entry covers one sequence number and carries the machine id of the slot
// found in the buffer.
1370 for (; i < rejectedSlotVector->size(); i++) {
1371 int64_t curr_seqn = rejectedSlotVector->get(i);
1372 Slot *s_msg = buffer->getSlot(curr_seqn);
1373 int64_t machineid = s_msg->getMachineID();
1374 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1375 s->addShallowEntry(rm);
// Copies live entries from the oldest slots of the local buffer into `slot`
// so that the range below `threshold` no longer holds the only copy of live
// data.
// Returns (needsResize, seenLiveSlot, sequence number reached):
//   - first is true when a live entry in the slot at `firstIfFull` did not fit
//   - second says whether any live slot was met
//   - third is where the scan stopped; fillSlot() passes it on to
//     doOptionalRescue()
1381 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1382 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1383 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning before the oldest slot the buffer still holds.
1384 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1385 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1388 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1389 bool seenLiveSlot = false;
1390 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1391 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1395 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1396 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1397 // Push slot number forward
// The oldest-live marker follows the scan only until a live slot is seen.
1398 if (!seenLiveSlot) {
1399 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1402 if (!previousSlot->isLive()) {
1406 // We have seen a live slot
1407 seenLiveSlot = true;
1409 // Get all the live entries for a slot
1410 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1412 // Iterate over all the live entries and try to rescue them
1413 uint lESize = liveEntries->size();
1414 for (uint i = 0; i < lESize; i++) {
1415 Entry *liveEntry = liveEntries->get(i);
1416 if (slot->hasSpace(liveEntry)) {
1417 // Enough space to rescue the entry
1418 slot->addEntry(liveEntry);
1419 } else if (currentSequenceNumber == firstIfFull) {
1420 // If there's no space but the entry is about to fall off the queue
// Report that a resize is needed.
1421 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
// Scan finished without hitting the "about to fall off" case.
1427 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Uses whatever room is left in slot `s` to copy further live entries from
// the local buffer, starting at sequence number `seqn`.
//   s            - slot being filled
//   seenliveslot - whether a live slot has already been met (by-value copy,
//                  updated locally during the scan)
//   seqn         - first sequence number to examine
//   resize       - forwarded to Slot::getLiveEntries()
1430 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1431 /* Now go through live entries from least to greatest sequence number until
1432 * either all live slots are added, or the slot doesn't have enough room
1433 * for SKIP_THRESHOLD consecutive entries. */
1435 int64_t newestseqnum = buffer->getNewestSeqNum();
1436 for (; seqn <= newestseqnum; seqn++) {
1437 Slot *prevslot = buffer->getSlot(seqn);
1438 // Push slot number forward
1440 oldestLiveSlotSequenceNumver = seqn;
1442 if (!prevslot->isLive())
1444 seenliveslot = true;
1445 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1446 uint lESize = liveentries->size();
1447 for (uint i = 0; i < lESize; i++) {
1448 Entry *liveentry = liveentries->get(i);
// Entries are copied only while they fit in the slot.
1449 if (s->hasSpace(liveentry))
1450 s->addEntry(liveentry);
// Compared against Table_SKIP_THRESHOLD, matching the stop condition
// described in the comment at the top of this function.
1453 if (skipcount > Table_SKIP_THRESHOLD) {
1466 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then commits them
// to the local buffer.
//   newSlots             - slots received; an empty array is checked for first
//   acceptUpdatesToLocal - forwarded to processSlot() for every slot
// Failures are reported through myerror().
1468 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1469 // The cloud communication layer has checked slot HMACs already
1471 if (newSlots->length() == 0) {
1475 // Make sure all slots are newer than the last largest slot this
// Only the first slot's sequence number is compared against sequenceNumber.
1477 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1478 if (firstSeqNum <= sequenceNumber) {
1479 // throw new Error("Server Error: Sent older slots!");
1480 myerror("Server Error: Sent older slots!\n");
1483 // Create an object that can access both new slots and slots in our
1484 // local chain without committing slots to our local chain
1485 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1487 // Check that the HMAC chain is not broken
1488 checkHMACChain(indexer, newSlots);
1490 // Set to keep track of messages from clients
// Seeded with every machine id that is a key of lastMessageTable.
1491 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1493 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1494 while (lmit->hasNext())
1495 machineSet->add(lmit->next());
1499 // Process each slot's data
1501 uint numSlots = newSlots->length();
1502 for (uint i = 0; i < numSlots; i++) {
1503 Slot *slot = newSlots->get(i);
1504 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1505 updateExpectedSize();
1510 // If there is a gap, check to see if the server sent us
// A gap means the first new slot does not directly follow the last one known locally.
1512 if (firstSeqNum != (sequenceNumber + 1)) {
1514 // Check the size of the slots that were sent down by the server.
1515 // Can only check the size if there was a gap
1516 checkNumSlots(newSlots->length());
1518 // Since there was a gap every machine must have pushed a slot or
1519 // must have a last-message message. If not then the server is
// Any id still left in machineSet is reported as a missing record.
1521 if (!machineSet->isEmpty()) {
1523 //throw new Error("Missing record for machines: ");
1524 myerror("Missing record for machines: \n");
1528 // Update the size of our local block chain.
1531 // Commit new slots to the local block chain.
1533 uint numSlots = newSlots->length();
1534 for (uint i = 0; i < numSlots; i++) {
1535 Slot *slot = newSlots->get(i);
1537 // Insert this slot into our local block chain copy.
1538 buffer->putSlot(slot);
1540 // Keep track of how many slots are currently live (have live data
1545 // Get the sequence number of the latest slot in the system
1546 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1547 updateLiveStateFromServer();
1549 // No Need to remember after we pulled from the server
1550 offlineTransactionsCommittedAndAtServer->clear();
1552 // This is invalidated now
1553 hadPartialSendToServer = false;
// Refreshes the derived live state after new slots were taken from the
// server: assembles transactions, arbitrates them, then updates the
// committed, speculative and pending-speculative tables in that order.
1556 void Table::updateLiveStateFromServer() {
1557 // Process the new transaction parts
1558 processNewTransactionParts();
1560 // Do arbitration on new transactions that were received
1561 arbitrateFromServer();
1563 // Update all the committed keys
1564 bool didCommitOrSpeculate = updateCommittedTable();
1566 // Delete the transactions that are now dead
1567 updateLiveTransactionsAndStatus();
// The flag accumulates: it is passed into, and OR-ed with the result of, the
// speculative-table update before being handed to the pending-table update.
1570 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1571 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived live state after a local change. Performs the same
// table updates as updateLiveStateFromServer() but without the
// transaction-part processing and server arbitration steps.
1574 void Table::updateLiveStateFromLocal() {
1575 // Update all the committed keys
1576 bool didCommitOrSpeculate = updateCommittedTable();
1578 // Delete the transactions that are now dead
1579 updateLiveTransactionsAndStatus();
// The flag is fed into the speculative update and OR-ed with its result.
1582 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1583 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Initializes the expected-slot-count bookkeeping (expectedsize, currMaxSize,
// didFindTableStatus) from a first sequence number and a slot count.
// Note: the parameter numberOfSlots shadows the member of the same name.
1586 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1587 int64_t prevslots = firstSequenceNumber;
// The flag is checked before the values below are (re)computed.
1589 if (didFindTableStatus) {
// expectedsize is the smaller of prevslots and numberOfSlots.
1591 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1594 didFindTableStatus = true;
1595 currMaxSize = numberOfSlots;
// Adjusts expectedsize as slots are processed; called once per slot from
// validateAndUpdate().
1598 void Table::updateExpectedSize() {
// Clamp: expectedsize never exceeds currMaxSize.
1601 if (expectedsize > currMaxSize) {
1602 expectedsize = currMaxSize;
1608 * Check the size of the block chain to make sure there are enough
1609 * slots sent back by the server. This is only called when we have a
1610 * gap between the slots that we have locally and the slots sent by
1611 * the server; therefore, in the slots sent by the server there will be
1612 * at least 1 Table status message.
// Verifies that the number of slots received equals expectedsize and reports
// a server error through myerror() otherwise.
// Note: the parameter numberOfSlots shadows the member of the same name.
1614 void Table::checkNumSlots(int numberOfSlots) {
1615 if (numberOfSlots != expectedsize) {
1616 //throw new Error("Server Error: Server did not send all slots-> Expected: ");
1617 myerror("Server Error: Server did not send all slots-> Expected: \n");
1622 * Update the size of the local buffer if it is needed.
// Applies currMaxSize as the size of the local slot buffer and resets the
// table-status flag.
1624 void Table::commitNewMaxSize() {
1625 didFindTableStatus = false;
1627 // Resize the local slot buffer
// The buffer is only resized when the size actually changed.
1628 if (numberOfSlots != currMaxSize) {
1629 buffer->resize((int32_t)currMaxSize);
1632 // Change the number of local slots to the new size
1633 numberOfSlots = (int32_t)currMaxSize;
1635 // Recalculate the resize threshold since the size of the local
1636 // buffer has changed
1637 setResizeThreshold();
1641 * Process the new transaction parts from this latest round of slots
1642 * received from the server
// Folds the transaction parts collected in newTransactionParts into
// Transaction objects registered in the live-transaction tables, then empties
// newTransactionParts.
1644 void Table::processNewTransactionParts() {
1646 if (newTransactionParts->size() == 0) {
1647 // Nothing new to process
1651 // Iterate through all the machine Ids that we received new parts
// newTransactionParts: machine id -> table of (Pair<int64_t, int32_t> part id -> part).
1653 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1654 while (tpit->hasNext()) {
1655 int64_t machineId = tpit->next();
1656 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
1658 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1659 // Iterate through all the parts for that machine Id
1660 while (ptit->hasNext()) {
1661 Pair<int64_t, int32_t> *partId = ptit->next();
1662 TransactionPart *part = parts->get(partId);
// Parts whose sequence number is at or below the last transaction number
// recorded for their arbitrator are treated as dead.
1664 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1665 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1666 if (lastTransactionNumber >= part->getSequenceNumber()) {
1667 // Set dead the transaction part
1674 // Get the transaction object for that sequence number
1675 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1677 if (transaction == NULL) {
1678 // This is a new transaction that we dont have so make a new one
1679 transaction = new Transaction();
1681 // Add that part to the transaction
1682 transaction->addPartDecode(part);
1684 // Insert this new transaction into the live tables
// Indexed both by sequence number and by transaction id.
1685 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1686 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1693 // Clear all the new transaction parts in preparation for the next
1694 // time the server sends slots
// Each per-machine table is fetched before the outer table is cleared.
1696 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1697 while (partsit->hasNext()) {
1698 int64_t machineId = partsit->next();
1699 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1703 newTransactionParts->clear();
// Arbitrates, in sequence-number order, the live transactions for which this
// machine is the arbitrator. Transactions whose guard holds are batched into
// one Commit; the others each produce an Abort. The resulting commit/aborts
// are queued as an ArbitrationRound for sending.
1707 void Table::arbitrateFromServer() {
1708 if (liveTransactionBySequenceNumberTable->size() == 0) {
1709 // Nothing to arbitrate on so move on
1713 // Get the transaction sequence numbers and sort from oldest to newest
1714 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1716 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1717 while (trit->hasNext())
1718 transactionSequenceNumbers->add(trit->next());
1721 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1723 // Key/value updates accumulated from the transactions accepted in this pass
1724 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1726 // The last transaction arbitrated on
1727 int64_t lastTransactionCommitted = -1;
1728 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1729 uint tsnSize = transactionSequenceNumbers->size();
1730 for (uint i = 0; i < tsnSize; i++) {
1731 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1732 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1734 // Check if this machine arbitrates for this transaction if not
1735 // then we cant arbitrate this transaction
1736 if (transaction->getArbitrator() != localMachineId) {
// Transactions older than the last one already arbitrated are tested here.
1740 if (transactionSequenceNumber < lastSeqNumArbOn) {
1744 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1745 // We have seen this already locally so dont commit again
1749 if (!transaction->isComplete()) {
1750 // Will arbitrate in incorrect order if we continue so just break
1755 // update the largest transaction seen by arbitrator from server
// Tracks, per originating machine, the highest client-local sequence number seen.
1756 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1757 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1759 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1760 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1761 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard is evaluated against the committed table plus the updates
// accepted earlier in this same pass.
1765 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1766 // Guard evaluated as true
1767 // Update the local changes so we can make the commit
1768 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1769 while (kvit->hasNext()) {
1770 KeyValue *kv = kvit->next();
1771 speculativeTableTmp->put(kv->getKey(), kv);
1775 // Update what the last transaction committed was for use in batch commit
1776 lastTransactionCommitted = transactionSequenceNumber;
1778 // Guard evaluated was false so create abort
// Each abort consumes one local arbitration sequence number.
1780 Abort *newAbort = new Abort(NULL,
1781 transaction->getClientLocalSequenceNumber(),
1782 transaction->getSequenceNumber(),
1783 transaction->getMachineId(),
1784 transaction->getArbitrator(),
1785 localArbitrationSequenceNumber);
1786 localArbitrationSequenceNumber++;
1787 generatedAborts->add(newAbort);
1789 // Insert the abort so we can process
1790 processEntry(newAbort);
1793 lastSeqNumArbOn = transactionSequenceNumber;
1796 delete transactionSequenceNumbers;
1798 Commit *newCommit = NULL;
1800 // If there is something to commit
1801 if (speculativeTableTmp->size() != 0) {
1802 // Create the commit and increment the commit sequence number
1803 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1804 localArbitrationSequenceNumber++;
1806 // Add all the new keys to the commit
1807 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1808 while (spit->hasNext()) {
1809 IoTString *string = spit->next();
1810 KeyValue *kv = speculativeTableTmp->get(string);
1811 newCommit->addKV(kv);
1815 // create the commit parts
1816 newCommit->createCommitParts();
1818 // Append all the commit parts to the end of the pending queue
1819 // waiting for sending to the server
1820 // Insert the commit so we can process it
1821 Vector<CommitPart *> *parts = newCommit->getParts();
1822 uint partsSize = parts->size();
1823 for (uint i = 0; i < partsSize; i++) {
1824 CommitPart *commitPart = parts->get(i);
1825 processEntry(commitPart);
1828 delete speculativeTableTmp;
// Queue an arbitration round only when this pass produced a commit or aborts.
1830 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1831 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1832 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, the commit parts of the (now last) round are
// processed.
1834 if (compactArbitrationData()) {
1835 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1836 if (newArbitrationRound->getCommit() != NULL) {
1837 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1838 uint partsSize = parts->size();
1839 for (uint i = 0; i < partsSize; i++) {
1840 CommitPart *commitPart = parts->get(i);
1841 processEntry(commitPart);
1846 delete generatedAborts;
// Arbitrates a single transaction on this machine without going through the
// server.
// Returns a pair (couldArbitrate, didCommit):
//   (false, false) - not arbitrated (wrong arbitrator, incomplete, or already
//                    seen from the server)
//   (true,  true)  - guard held; a Commit was created and queued
//   (true,  false) - guard failed; an Abort was created and queued
1850 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1852 // Check if this machine arbitrates for this transaction if not then
1853 // we cant arbitrate this transaction
1854 if (transaction->getArbitrator() != localMachineId) {
1855 return Pair<bool, bool>(false, false);
1858 if (!transaction->isComplete()) {
1859 // Will arbitrate in incorrect order if we continue so just break
1861 return Pair<bool, bool>(false, false);
1864 if (transaction->getMachineId() != localMachineId) {
1865 // dont do this check for local transactions
// Refuse when the server path has already shown a later transaction from the
// same machine.
1866 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1867 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1868 // We have already seen this from the server
1869 return Pair<bool, bool>(false, false);
// The guard is evaluated against the committed table only.
1874 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1875 // Guard evaluated as true Create the commit and increment the
1876 // commit sequence number
1877 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1878 localArbitrationSequenceNumber++;
1880 // Update the local changes so we can make the commit
1881 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1882 while (kvit->hasNext()) {
1883 KeyValue *kv = kvit->next();
1884 newCommit->addKV(kv);
1888 // create the commit parts
1889 newCommit->createCommitParts();
1891 // Append all the commit parts to the end of the pending queue
1892 // waiting for sending to the server
1893 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1894 pendingSendArbitrationRounds->add(arbitrationRound);
// After a successful compaction the parts of the last round's commit are
// processed; the new commit's own parts are processed by the second loop.
1896 if (compactArbitrationData()) {
1897 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1898 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1899 uint partsSize = parts->size();
1900 for (uint i = 0; i < partsSize; i++) {
1901 CommitPart *commitPart = parts->get(i);
1902 processEntry(commitPart);
1905 // Insert the commit so we can process it
1906 Vector<CommitPart *> *parts = newCommit->getParts();
1907 uint partsSize = parts->size();
1908 for (uint i = 0; i < partsSize; i++) {
1909 CommitPart *commitPart = parts->get(i);
1910 processEntry(commitPart);
// Only transactions created on this machine get their status updated here,
// and only when a status object is attached.
1914 if (transaction->getMachineId() == localMachineId) {
1915 TransactionStatus *status = transaction->getTransactionStatus();
1916 if (status != NULL) {
1917 status->setStatus(TransactionStatus_StatusCommitted);
1921 updateLiveStateFromLocal();
1922 return Pair<bool, bool>(true, true);
1924 if (transaction->getMachineId() == localMachineId) {
1925 // For locally created messages update the status
1926 // Guard evaluated was false so create abort
1927 TransactionStatus *status = transaction->getTransactionStatus();
1928 if (status != NULL) {
1929 status->setStatus(TransactionStatus_StatusAborted);
1932 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
// The abort consumes one local arbitration sequence number.
1935 Abort *newAbort = new Abort(NULL,
1936 transaction->getClientLocalSequenceNumber(),
1938 transaction->getMachineId(),
1939 transaction->getArbitrator(),
1940 localArbitrationSequenceNumber);
1941 localArbitrationSequenceNumber++;
1942 addAbortSet->add(newAbort);
1944 // Append the abort to the end of the pending queue
1945 // waiting for sending to the server
// This round carries aborts only (its commit is NULL).
1946 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1947 pendingSendArbitrationRounds->add(arbitrationRound);
1949 if (compactArbitrationData()) {
1950 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): getCommit() is dereferenced here; confirm it cannot be NULL
// on this path (arbitrateFromServer guards the equivalent access).
1952 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1953 uint partsSize = parts->size();
1954 for (uint i = 0; i < partsSize; i++) {
1955 CommitPart *commitPart = parts->get(i);
1956 processEntry(commitPart);
1961 updateLiveStateFromLocal();
1962 return Pair<bool, bool>(true, false);
1967 * Compacts the arbitration data by merging commits and aggregating
1968 * aborts so that a single large push of commits can be done instead
1969 * of many small updates
// Tries to merge the most recently queued ArbitrationRound with the rounds
// queued before it, walking backwards from the end of
// pendingSendArbitrationRounds. Rounds absorbed into the last round are
// deleted and the last round is re-appended.
// Callers in this file use a true result as the signal to re-process the
// commit parts of the last pending round.
1971 bool Table::compactArbitrationData() {
1972 if (pendingSendArbitrationRounds->size() < 2) {
1973 // Nothing to compact so do nothing
1977 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// A last round that has already had a part sent is tested here.
1978 if (lastRound->getDidSendPart()) {
// NOTE(review): hadCommit is true when the last round has NO commit, which
// reads inverted relative to the name -- confirm the intent.
1982 bool hadCommit = (lastRound->getCommit() == NULL);
1983 bool gotNewCommit = false;
// Counts the last round itself plus every earlier round merged into it.
1985 uint numberToDelete = 1;
1987 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Next candidate: the round just before those already merged.
1988 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1990 if (round->isFull() || round->getDidSendPart()) {
1991 // Stop since there is a part that cannot be compacted and we
1992 // need to compact in order
1996 if (round->getCommit() == NULL) {
1997 // Try compacting aborts only
1998 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1999 if (newSize > ArbitrationRound_MAX_PARTS) {
2000 // Cant compact since it would be too large
2003 lastRound->addAborts(round->getAborts());
2005 // Create a new larger commit
// The merged commit consumes one local arbitration sequence number.
2006 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
2007 localArbitrationSequenceNumber++;
2009 // Create the commit parts so that we can count them
2010 newCommit->createCommitParts();
2012 // Calculate the new size of the parts
2013 int newSize = newCommit->getNumberOfParts();
2014 newSize += lastRound->getAbortsCount();
2015 newSize += round->getAbortsCount();
2017 if (newSize > ArbitrationRound_MAX_PARTS) {
2018 // Can't compact since it would be too large
// The identity checks here and below imply Commit_merge may hand back one of
// its inputs (TODO confirm); that case is distinguished from a fresh object.
2019 if (lastRound->getCommit() != newCommit &&
2020 round->getCommit() != newCommit)
2024 // Set the new compacted part
// Detach newCommit from whichever round already holds it before the old
// commit of lastRound is handled.
2025 if (lastRound->getCommit() == newCommit)
2026 lastRound->setCommit(NULL);
2027 if (round->getCommit() == newCommit)
2028 round->setCommit(NULL);
2030 if (lastRound->getCommit() != NULL) {
2031 Commit * oldcommit = lastRound->getCommit();
2032 lastRound->setCommit(NULL);
2035 lastRound->setCommit(newCommit);
2036 lastRound->addAborts(round->getAborts());
2037 gotNewCommit = true;
2043 if (numberToDelete != 1) {
2044 // If there is a compaction
2045 // Delete the previous pieces that are now in the new compacted piece
// Starts at 2 so the last round itself (offset 1 from the end) is kept alive.
2046 for (uint i = 2; i <= numberToDelete; i++) {
2047 delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
// Truncate the vector (dropping lastRound's slot too), then append lastRound again.
2049 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2051 pendingSendArbitrationRounds->add(lastRound);
2053 // Should reinsert into the commit processor
2054 if (hadCommit && gotNewCommit) {
2063 * Update all the commits and the committed tables, sets dead the dead
2066 bool Table::updateCommittedTable() {
// Purpose: fold the commit parts collected in newCommitParts into Commit
// objects stored in liveCommitsTable, then visit every arbitrator's commits
// in the order produced by qsort()/compareInt64 and refresh the
// last-arbitrated, last-seen, committed key/value and live-commit-by-key
// tables.
// Returns: didProcessANewCommit, which is set once a commit reaches the
// full-processing path near the end of the per-commit loop.
2067 if (newCommitParts->size() == 0) {
2068 // Nothing new to process
2072 // Iterate through all the machine Ids that we received new parts for
2073 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2074 while (partsit->hasNext()) {
2075 int64_t machineId = partsit->next();
2076 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2078 // Iterate through all the parts for that machine Id
2079 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2080 while (pairit->hasNext()) {
// NOTE(review): partId is not referenced again in the visible code; the
// part itself is fetched through currVal().
2081 Pair<int64_t, int32_t> *partId = pairit->next();
2082 CommitPart *part = pairit->currVal();
2084 // Get the table of live commits kept for the machine that sent this part
2085 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2087 if (commitForClientTable == NULL) {
2088 // This is the first commit from this device
2089 commitForClientTable = new Hashtable<int64_t, Commit *>();
2090 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
// Commits are keyed by the sequence number carried in the part
2093 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2095 if (commit == NULL) {
2096 // This is a new commit that we dont have so make a new one
2097 commit = new Commit();
2099 // Insert this new commit into the live tables
2100 commitForClientTable->put(part->getSequenceNumber(), commit);
2103 // Add that part to the commit
2104 commit->addPartDecode(part);
2112 // Clear all the new commits parts in preparation for the next time
2113 // the server sends slots
2114 newCommitParts->clear();
2116 // If we process a new commit keep track of it for future use
2117 bool didProcessANewCommit = false;
2119 // Process the commits one by one
2120 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2121 while (liveit->hasNext()) {
2122 int64_t arbitratorId = liveit->next();
2123 // Get all the commits for a specific arbitrator
2124 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2126 // Sort the commits in order
// The keys are copied into a heap-allocated Vector (released at the end of
// this iteration) so they can be sorted in place through expose().
2127 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2129 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2130 while (clientit->hasNext())
2131 commitSequenceNumbers->add(clientit->next());
2135 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2137 // Get the last commit seen from this arbitrator
// -1 stands for "nothing seen yet from this arbitrator"
2138 int64_t lastCommitSeenSequenceNumber = -1;
2139 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2140 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2143 // Go through each new commit one by one
2144 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2145 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2146 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2147 // Special processing if a commit is not complete
2148 if (!commit->isComplete()) {
2149 if (i == (commitSequenceNumbers->size() - 1)) {
2150 // If there is an incomplete commit and this commit is the
2151 // latest one seen then this commit cannot be processed and
2152 // there are no other commits
2155 // This is a commit that was already dead but parts of it
2156 // are still in the block chain (not flushed out yet).
2157 // Delete it and move on
2159 commitForClientTable->remove(commit->getSequenceNumber());
2165 // Update the last transaction that was updated if we can
2166 if (commit->getTransactionSequenceNumber() != -1) {
2167 // Update the last transaction sequence number that the arbitrator arbitrated on
2168 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2169 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2173 // Update the last arbitration data that we have seen so far
2174 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2175 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2176 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2178 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2181 // Never seen any data from this arbitrator so record the first one
2182 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2185 // A commit at or below the last sequence number already seen from this
2186 // arbitrator only refreshes the last arbitrated transaction number here
2187 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2188 // Update the last transaction that was updated if we can
2189 if (commit->getTransactionSequenceNumber() != -1) {
// NOTE(review): lastTransactionNumber is fetched before the contains()
// check below and is not used in the visible code.
2190 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2191 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2192 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2193 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2199 // If we got here then this is a brand new commit and needs full
2201 // Get what commits should be edited, these are the commits that
2202 // have live values for their keys
2203 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2205 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2206 while (kvit->hasNext()) {
2207 KeyValue *kv = kvit->next();
// This inner `commit` (the commit currently owning the key) shadows the
// loop's `commit` for the rest of the enclosing scope.
2208 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2210 commitsToEdit->add(commit);
2215 // Update each previous commit that needs to be updated
2216 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2217 while (commitit->hasNext()) {
2218 Commit *previousCommit = commitit->next();
2220 // Only bother with live commits (TODO: Maybe remove this check)
2221 if (previousCommit->isLive()) {
2223 // Update which keys in the old commits are still live
// Every key written by the new commit is invalidated in the older commit
2225 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2226 while (kvit->hasNext()) {
2227 KeyValue *kv = kvit->next();
2228 previousCommit->invalidateKey(kv->getKey());
2233 // if the commit is now dead then remove it
// NOTE(review): the removal goes through the table of the arbitrator being
// processed; confirm previousCommit always belongs to that table.
2234 if (!previousCommit->isLive()) {
2235 commitForClientTable->remove(previousCommit->getSequenceNumber());
2236 delete previousCommit;
2241 delete commitsToEdit;
2243 // Update the last seen sequence number from this arbitrator
2244 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2245 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2246 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2249 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2252 // We processed a new commit that we havent seen before
2253 didProcessANewCommit = true;
2255 // Update the committed table of keys and which commit is using which key
2257 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2258 while (kvit->hasNext()) {
2259 KeyValue *kv = kvit->next();
2260 committedKeyValueTable->put(kv->getKey(), kv);
2261 liveCommitsByKeyTable->put(kv->getKey(), commit);
// The sorted key list was only needed for this arbitrator
2266 delete commitSequenceNumbers;
2270 return didProcessANewCommit;
2274 * Create the speculative table from transactions that are still live
2275 * and have come from the cloud
2277 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
// Purpose: rebuild or extend speculatedKeyValueTable from the transactions
// held in liveTransactionBySequenceNumberTable, visiting them in the order
// produced by qsort()/compareInt64.
// Param didProcessNewCommits: when true, the speculation state is reset and
// rebuilt from the first live transaction.
// Returns: false when no start position is found in the sorted list
// (visible below); the trailing comment indicates the normal path reports
// that speculation happened.
2278 if (liveTransactionBySequenceNumberTable->size() == 0) {
2279 // There is nothing to speculate on
2283 // Create a list of the transaction sequence numbers and sort them
2284 // from oldest to newest
2285 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2287 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2288 while (trit->hasNext())
2289 transactionSequenceNumbersSorted->add(trit->next());
2293 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
// A "gap" means the oldest live transaction is no longer the one that the
// previous speculation started from.
2295 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2298 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2299 // If there is a gap in the transaction sequence numbers then
2300 // there was a commit or an abort of a transaction OR there was a
2301 // new commit (Could be from offline commit) so redo the
2302 // speculation from scratch
2304 // Start from scratch
2305 speculatedKeyValueTable->clear();
2306 lastTransactionSequenceNumberSpeculatedOn = -1;
2307 oldestTransactionSequenceNumberSpeculatedOn = -1;
2310 // Remember the front of the transaction list
2311 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2313 // Find where to start arbitration from
// Scans for the entry equal to lastTransactionSequenceNumberSpeculatedOn
2314 uint startIndex = 0;
2316 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2317 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2321 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2322 // Make sure we are not out of bounds
2323 delete transactionSequenceNumbersSorted;
2324 return false; // did not speculate
// Arbitrators that have an incomplete transaction; per the comments below,
// their later transactions are to be ignored in this pass.
2327 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
// NOTE(review): didSkip starts out true; a flag that begins true cannot
// record whether a skip actually happened. Confirm this is intended.
2328 bool didSkip = true;
2330 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2331 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2332 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2334 if (!transaction->isComplete()) {
2335 // If there is an incomplete transaction then there is nothing
2336 // we can do; add this transaction's arbitrator to the list of
2337 // arbitrators we should ignore
2338 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2343 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2347 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The guard is evaluated against the committed and speculated tables only;
// the third argument (NULL here) is the pending-transaction table slot.
2349 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2350 // Guard evaluated to true so update the speculative table
2352 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2353 while (kvit->hasNext()) {
2354 KeyValue *kv = kvit->next();
2355 speculatedKeyValueTable->put(kv->getKey(), kv);
2362 delete transactionSequenceNumbersSorted;
2365 // Since there was a skip we need to redo the speculation next time around
2366 lastTransactionSequenceNumberSpeculatedOn = -1;
2367 oldestTransactionSequenceNumberSpeculatedOn = -1;
2370 // We did some speculation
2375 * Create the pending transaction speculative table from transactions
2376 * that are still in the pending transaction buffer
2378 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
// Purpose: apply the key/value updates of the transactions queued in
// pendingTransactionQueue to pendingTransactionSpeculatedKeyValueTable,
// for every transaction whose guard evaluates to true.
// Param didProcessNewCommitsOrSpeculate: when true (or when the head of the
// queue changed) the pending speculation state is reset first.
2379 if (pendingTransactionQueue->size() == 0) {
2380 // There is nothing to speculate on
2384 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2385 // need to reset on the pending speculation
2386 lastPendingTransactionSpeculatedOn = NULL;
2387 firstPendingTransaction = pendingTransactionQueue->get(0);
2388 pendingTransactionSpeculatedKeyValueTable->clear();
2391 // Find where to start arbitration from
// NOTE(review): the scan looks for firstPendingTransaction, while
// lastPendingTransactionSpeculatedOn is the member that records progress
// further down; confirm which one is meant to mark the restart point.
2392 uint startIndex = 0;
2394 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2395 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2398 if (startIndex >= pendingTransactionQueue->size()) {
2399 // Make sure we are not out of bounds
2403 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2404 Transaction *transaction = pendingTransactionQueue->get(i);
2406 lastPendingTransactionSpeculatedOn = transaction;
// Unlike the cloud-side speculation, the guard here also sees the pending
// speculated table as its third argument.
2408 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2409 // Guard evaluated to true so update the speculative table
2410 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2411 while (kvit->hasNext()) {
2412 KeyValue *kv = kvit->next();
2413 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2421 * Set dead and remove from the live transaction tables the
2422 * transactions that are dead
2424 void Table::updateLiveTransactionsAndStatus() {
// Purpose: two sweeps driven by lastArbitratedTransactionNumberByArbitratorTable.
// The first marks dead every live transaction whose sequence number is at or
// below the number recorded for its arbitrator and drops it from
// liveTransactionByTransactionIdTable; the second sets the status of every
// outstanding TransactionStatus that meets the same test to
// TransactionStatus_StatusCommitted.
2425 // Go through each of the transactions
2427 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2428 while (iter->hasNext()) {
2429 int64_t key = iter->next();
2430 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2432 // Check if the transaction is dead
// i.e. its arbitrator has already arbitrated this sequence number or a later one
2433 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2434 && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2435 // Set dead the transaction
2436 transaction->setDead();
2438 // Remove the transaction from the live table
2440 liveTransactionByTransactionIdTable->remove(transaction->getId());
2447 // Go through each of the outstanding transaction statuses
2449 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2450 while (iter->hasNext()) {
2451 int64_t key = iter->next();
2452 TransactionStatus *status = outstandingTransactionStatus->get(key);
2454 // Check if the transaction is dead
2455 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2456 && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
// An arbitrated transaction still outstanding at this point is recorded as committed
2458 status->setStatus(TransactionStatus_StatusCommitted);
2469 * Process this slot, entry by entry. Also update the latest message sent by slot
2471 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
// Purpose: record the slot as the latest message from its machine and then
// hand each entry it contains to the processEntry overload that matches the
// entry's type tag.
// Params: indexer is passed on to the RejectedMessage handler;
// acceptUpdatesToLocal and machineSet are passed on to updateLastMessage
// (machineSet also to the LastMessage handler).
2473 // Update the last message seen
2474 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2476 // Process each entry in the slot
2477 Vector<Entry *> *entries = slot->getEntries();
// The entry count is read once, before the loop
2478 uint eSize = entries->size();
2479 for (uint ei = 0; ei < eSize; ei++) {
2480 Entry *entry = entries->get(ei);
// getType() selects the concrete class each entry is cast to below
2481 switch (entry->getType()) {
2482 case TypeCommitPart:
2483 processEntry((CommitPart *)entry);
2486 processEntry((Abort *)entry);
2488 case TypeTransactionPart:
2489 processEntry((TransactionPart *)entry);
2492 processEntry((NewKey *)entry);
2494 case TypeLastMessage:
2495 processEntry((LastMessage *)entry, machineSet);
2497 case TypeRejectedMessage:
2498 processEntry((RejectedMessage *)entry, indexer);
2500 case TypeTableStatus:
// Table status entries also need the sequence number of the enclosing slot
2501 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2504 //throw new Error("Unrecognized type: ");
2505 myerror("Unrecognized type: \n");
2511 * Update the last message that was sent for a machine Id
2513 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2514 // Update what the last message received by a machine was
2515 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2519 * Add the new key to the arbitrators table and update the set of live
2520 * new keys (in case of a rescued new key message)
2522 void Table::processEntry(NewKey *entry) {
2523 // Update the arbitrator table with the new key information
2524 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2526 // Update what the latest live new key is
2527 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2528 if (oldNewKey != NULL) {
2529 // Delete the old new key messages
2530 oldNewKey->setDead();
2535 * Process new table status entries and set dead the old ones as new
2536 * ones come in. Keeps track of the largest and smallest table status
2537 * seen in this current round of updating the local copy of the block
2540 void Table::processEntry(TableStatus *entry, int64_t seq) {
2541 int newNumSlots = entry->getMaxSlots();
2542 updateCurrMaxSize(newNumSlots);
2543 initExpectedSize(seq, newNumSlots);
2545 if (liveTableStatus != NULL) {
2546 // We have a larger table status so the old table status is no
2548 liveTableStatus->setDead();
2551 // Make this new table status the latest alive table status
2552 liveTableStatus = entry;
2556 * Check old messages to see if there is a block chain violation.
2559 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
// Purpose: (1) cross-check the rejected sequence-number range against the
// slots reachable through the indexer and report an inconsistency via
// myerror(); (2) collect the machines from lastMessageTable whose last
// recorded sequence number is below this entry's, register the entry on
// each of their watch sets, and attach the collected set to the entry.
2560 int64_t oldSeqNum = entry->getOldSeqNum();
2561 int64_t newSeqNum = entry->getNewSeqNum();
2562 bool isequal = entry->getEqual();
2563 int64_t machineId = entry->getMachineID();
2564 int64_t seq = entry->getSequenceNumber();
2566 // Check if we have messages that were supposed to be rejected in
2567 // our local block chain
// The range is inclusive on both ends
2568 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2570 Slot *slot = indexer->getSlot(seqNum);
2573 // If we have this slot make sure that it was not supposed to be
// A held slot is consistent with the rejection only when "slot came from
// the entry's machine" matches the entry's equal flag
2575 int64_t slotMachineId = slot->getMachineID();
2576 if (isequal != (slotMachineId == machineId)) {
2577 //throw new Error("Server Error: Trying to insert rejected message for slot ");
2578 myerror("Server Error: Trying to insert rejected message for slot\n");
2583 // Create a list of clients to watch until they see this rejected
2585 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2586 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2587 while (iter->hasNext()) {
2588 // Machine ID for the last message entry
2589 int64_t lastMessageEntryMachineId = iter->next();
2591 // We've seen it, don't need to continue to watch. Our next
2592 // message will implicitly acknowledge it.
2593 if (lastMessageEntryMachineId == localMachineId) {
// First element of the pair is the last sequence number recorded for that machine
2597 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2598 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2600 if (entrySequenceNumber < seq) {
2601 // Add this rejected message to the set of messages that this
2602 // machine ID did not see yet
2603 addWatchVector(lastMessageEntryMachineId, entry);
2604 // This client did not see this rejected message yet so add it
2605 // to the watch set to monitor
2606 deviceWatchSet->add(lastMessageEntryMachineId);
2611 if (deviceWatchSet->isEmpty()) {
2612 // This rejected message has been seen by all the clients so
// The empty set is not handed to the entry, so it is freed here
2614 delete deviceWatchSet;
2616 // We need to watch this rejected message
// The set is passed to the entry through setWatchSet()
2617 entry->setWatchSet(deviceWatchSet);
2622 * Check if this abort is live, if not then save it so we can kill it
2623 * later. Update the last transaction number that was arbitrated on.
2625 void Table::processEntry(Abort *entry) {
// Purpose: account for an abort entry. Marks the matching outstanding
// transaction status as aborted, tracks the abort in liveAbortTable (and in
// liveAbortsGeneratedByLocal when this machine was the arbitrator), untracks
// it again when the transaction's machine has already reached the abort's
// sequence number, and refreshes the per-arbitrator bookkeeping tables.
// -1 is used throughout as the "no sequence number" marker.
2626 if (entry->getTransactionSequenceNumber() != -1) {
2627 // update the transaction status if it was sent to the server
// remove() both looks up and drops the outstanding status in one step
2628 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2629 if (status != NULL) {
2630 status->setStatus(TransactionStatus_StatusAborted);
2634 // Abort has not been seen by the client it is for yet so we need to
// The table key is a heap-allocated copy of the abort id
2637 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2638 if (previouslySeenAbort != NULL) {
2639 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2642 if (entry->getTransactionArbitrator() == localMachineId) {
2643 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): lastMessageTable->get() is dereferenced without a NULL
// check; confirm an entry always exists for the transaction's machine.
2646 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2647 // The machine already saw this so it is dead
// The lookup key for removal is a stack copy of the abort id
2649 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2650 liveAbortTable->remove(&abortid);
2652 if (entry->getTransactionArbitrator() == localMachineId) {
2653 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2658 // Update the last arbitration data that we have seen so far
2659 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2660 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2661 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2663 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2666 // Never seen any data from this arbitrator so record the first one
2667 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2670 // Set dead a transaction if we can
// The aborted transaction is identified by (machine id, client-local sequence number)
2671 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2673 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2674 if (transactionToSetDead != NULL) {
2675 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2678 // Update the last transaction sequence number that the arbitrator
2680 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2681 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
// Aborts without a transaction sequence number (-1) never advance the table
2683 if (entry->getTransactionSequenceNumber() != -1) {
2684 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2690 * Set dead the transaction part if that transaction is dead and keep
2691 * track of all new parts
2693 void Table::processEntry(TransactionPart *entry) {
2694 // Check if we have already seen this transaction and set it dead OR
2695 // if it is not alive
2696 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2697 // This transaction is dead, it was already committed or aborted
2702 // This part is still alive
2703 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2705 if (transactionPart == NULL) {
2706 // Dont have a table for this machine Id yet so make one
2707 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2708 newTransactionParts->put(entry->getMachineId(), transactionPart);
2711 // Update the part and set dead ones we have already seen (got a
2713 entry->acquireRef();
2714 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2715 if (previouslySeenPart != NULL) {
2716 previouslySeenPart->releaseRef();
2717 previouslySeenPart->setDead();
2722 * Process new commit entries and save them for future use. Delete duplicates
2724 void Table::processEntry(CommitPart *entry) {
2725 // Update the last transaction that was updated if we can
2726 if (entry->getTransactionSequenceNumber() != -1) {
2727 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2728 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2729 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2733 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2734 if (commitPart == NULL) {
2735 // Don't have a table for this machine Id yet so make one
2736 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2737 newCommitParts->put(entry->getMachineId(), commitPart);
2739 // Update the part and set dead ones we have already seen (got a
2741 entry->acquireRef();
2742 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2743 if (previouslySeenPart != NULL) {
2744 previouslySeenPart->setDead();
2745 previouslySeenPart->releaseRef();
2750 * Update the last message seen table. Update and set dead the
2751 * appropriate RejectedMessages as clients see them. Updates the live
2752 * aborts, removes those that are dead and sets them dead. Check that
2753 * the last message seen is correct and that there is no mismatch of
2754 * our own last message or that other clients have not had a rollback
2755 * on the last message.
2757 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
// Purpose: record (seqNum, liveness) as the last message of machineId and
// perform the bookkeeping that follows from that machine having reached
// seqNum.
// Params:
//   machineId            machine the message belongs to
//   seqNum               sequence number that machine has now reached
//   liveness             the Slot or LastMessage that carries the information
//   acceptUpdatesToLocal suppresses the mismatch errors for the local machine
//   machineSet           machineId is removed from this set
// Inconsistencies are reported through myerror().
2758 // We have seen this machine ID
2759 machineSet->remove(machineId);
2761 // Get the set of rejected messages that this machine Id has not seen yet
2762 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2763 // If there is a rejected message that this machine Id has not seen yet
2764 if (watchset != NULL) {
2765 // Go through each rejected message that this machine Id has not
2768 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2769 while (rmit->hasNext()) {
2770 RejectedMessage *rm = rmit->next();
2771 // If this machine Id has seen this rejected message...
2772 if (rm->getSequenceNumber() <= seqNum) {
2773 // Remove it from our watchlist
2775 // Decrement machines that need to see this notification
2776 rm->removeWatcher(machineId);
2782 // Set dead the abort
// Considers aborts addressed to this machine whose sequence number it has reached
2783 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2785 while (abortit->hasNext()) {
2786 Pair<int64_t, int64_t> *key = abortit->next();
2787 Abort *abort = liveAbortTable->get(key);
2788 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
// Aborts arbitrated by this machine are also tracked in liveAbortsGeneratedByLocal
2791 if (abort->getTransactionArbitrator() == localMachineId) {
2792 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2797 if (machineId == localMachineId) {
2798 // Our own messages are immediately dead.
// The type tag decides which concrete class setDead() is invoked through
2799 char livenessType = liveness->getType();
2800 if (livenessType == TypeLastMessage) {
2801 ((LastMessage *)liveness)->setDead();
2802 } else if (livenessType == TypeSlot) {
2803 ((Slot *)liveness)->setDead();
2805 //throw new Error("Unrecognized type");
2806 myerror("Unrecognized type\n");
2809 // Get the old last message for this device
// put() stores a freshly allocated pair and returns the pair it replaced
2810 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2811 if (lastMessageEntry == NULL) {
2812 // If no last message then there is nothing else to process
// Copy both fields out before the replaced pair is freed
2816 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2817 Liveness *lastEntry = lastMessageEntry->getSecond();
2818 delete lastMessageEntry;
2820 // If it is not our machine Id since we already set ours to dead
2821 if (machineId != localMachineId) {
2822 char lastEntryType = lastEntry->getType();
2824 if (lastEntryType == TypeLastMessage) {
2825 ((LastMessage *)lastEntry)->setDead();
2826 } else if (lastEntryType == TypeSlot) {
2827 ((Slot *)lastEntry)->setDead();
2829 //throw new Error("Unrecognized type");
2830 myerror("Unrecognized type\n");
2833 // Make sure the server is not playing any games
2834 if (machineId == localMachineId) {
// After a partial send only a rollback (older seqNum) is an error;
// otherwise any difference from the recorded number is an error
2835 if (hadPartialSendToServer) {
2836 // We were not making any updates and we had a machine mismatch
2837 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2838 //throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2839 myerror("Server Error: Mismatch on local machine sequence number, needed at least: \n");
2842 // We were not making any updates and we had a machine mismatch
2843 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2844 //throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2845 myerror("Server Error: Mismatch on local machine sequence number, needed: \n");
// Remote machines: the recorded sequence number must never move backwards
2849 if (lastMessageSeqNum > seqNum) {
2850 //throw new Error("Server Error: Rollback on remote machine sequence number");
2851 myerror("Server Error: Rollback on remote machine sequence number\n");
2857 * Add a rejected message entry to the watch set to keep track of
2858 * which clients have seen that rejected message entry and which have
2861 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2862 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2863 if (entries == NULL) {
2864 // There is no set for this machine ID yet so create one
2865 entries = new Hashset<RejectedMessage *>();
2866 rejectedMessageWatchVectorTable->put(machineId, entries);
2868 entries->add(entry);
2872 * Check if the HMAC chain is not violated
2874 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
// For every slot in newSlots, look up the slot with the preceding sequence
// number through the indexer and, when one is found, require its HMAC to
// equal the previous-HMAC recorded in the new slot. A mismatch is reported
// through myerror(). Slots whose predecessor is not found are not checked.
2875 for (uint i = 0; i < newSlots->length(); i++) {
2876 Slot *currSlot = newSlots->get(i);
2877 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2878 if (prevSlot != NULL &&
2879 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2880 // throw new Error("Server Error: Invalid HMAC Chain");
2881 myerror("Server Error: Invalid HMAC Chain\n");