3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Comparator with the (const void *, const void *) signature that qsort
// expects; it views both arguments as pointers to int64_t.
// NOTE(review): only the two pointer casts are visible in this listing; the
// comparison itself and the return statement are on lines not shown here.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructs a Table from connection parameters.
//   baseurl, password, listeningPort: forwarded to the CloudComm constructor.
//   _localMachineId: stored in localMachineId.
// The member-initializer list only sets starting values: pointer members are
// set to NULL, flags to false, and the sequence counters to 0 or 1 as listed.
// NOTE(review): liveAbortTable and newCommitParts do not appear in the visible
// part of this list although they do in the CloudComm overload; some lines of
// this listing are not shown, so confirm they are initialized here as well.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
// The table creates its own CloudComm and passes itself as the first argument.
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
// Counters: the local sequence number starts at 0, while the transaction and
// arbitration sequence numbers start at 1.
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(1),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(1),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
// Container members start out as NULL in this list.
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchMyVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotMyVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
// Constructs a Table around an existing CloudComm object.
//   _cloud: the communication object supplied by the caller.
//   _localMachineId: stored in localMachineId.
// NOTE(review): the initializer that consumes _cloud is not visible in this
// listing; presumably it is stored in the cloud member -- confirm.
// The remaining initializers set pointer members to NULL, flags to false, and
// the sequence counters to 0 or 1 as listed.
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
// Counters: the local sequence number starts at 0, while the transaction and
// arbitration sequence numbers start at 1.
111 localSequenceNumber(0),
112 localTransactionSequenceNumber(1),
113 lastTransactionSequenceNumberSpeculatedOn(0),
114 oldestTransactionSequenceNumberSpeculatedOn(0),
115 localArbitrationSequenceNumber(1),
116 hadPartialSendToServer(false),
117 attemptedToSendToServer(false),
119 didFindTableStatus(false),
121 lastSlotAttemptedToSend(NULL),
124 lastTransactionPartsSent(NULL),
// Container members start out as NULL in this list.
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchMyVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotMyVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
// Teardown sequence that deletes the containers held by the table and, for
// several of them, first walks the contents to release what they hold.
// NOTE(review): the function header is not visible in this listing; this looks
// like the body of the Table destructor -- confirm. Several loop bodies and
// closing braces are also not shown, so only the visible steps are described.
164 delete committedKeyValueTable;
165 delete speculatedKeyValueTable;
166 delete pendingTransactionSpeculatedKeyValueTable;
167 delete liveNewKeyTable;
// Visit every entry of lastMessageTable before deleting the table itself; what
// is done with each fetched pair is on lines not shown here.
169 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170 while (lmit->hasNext()) {
171 Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
175 delete lastMessageTable;
177 if (pendingTransactionBuilder != NULL)
178 delete pendingTransactionBuilder;
// Walk each per-machine set of rejected messages; the handling of each
// RejectedMessage and of the set is on lines not shown here.
180 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchMyVectorTable);
181 while(rmit->hasNext()) {
182 int64_t machineid = rmit->next();
183 Hashset<RejectedMessage *> * rmset = rejectedMessageWatchMyVectorTable->get(machineid);
184 SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185 while (mit->hasNext()) {
186 RejectedMessage * rm = mit->next();
193 delete rejectedMessageWatchMyVectorTable;
195 delete arbitratorTable;
196 delete liveAbortTable;
// For every machine, call releaseRef() on each TransactionPart stored in its
// inner table, then delete the outer table.
198 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199 while (partsit->hasNext()) {
200 int64_t machineId = partsit->next();
201 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
202 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
203 while(pit->hasNext()) {
204 Pair<int64_t, int32_t> * pair=pit->next();
205 pit->currVal()->releaseRef();
212 delete newTransactionParts;
// Same walk for the commit parts: releaseRef() on each CommitPart, then delete
// the outer table.
215 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
216 while (partsit->hasNext()) {
217 int64_t machineId = partsit->next();
218 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
219 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
220 while(pit->hasNext()) {
221 Pair<int64_t, int32_t> * pair=pit->next();
222 pit->currVal()->releaseRef();
228 delete newCommitParts;
230 delete lastArbitratedTransactionNumberByArbitratorTable;
231 delete liveTransactionBySequenceNumberTable;
232 delete liveTransactionByTransactionIdTable;
// Delete every Commit held in the per-arbitrator tables, then each inner table,
// then the outer table.
234 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
235 while (liveit->hasNext()) {
236 int64_t arbitratorId = liveit->next();
238 // Get all the commits for a specific arbitrator
239 Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
241 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
242 while (clientit->hasNext()) {
243 int64_t id = clientit->next();
244 delete commitForClientTable->get(id);
249 delete commitForClientTable;
252 delete liveCommitsTable;
254 delete liveCommitsByKeyTable;
255 delete lastCommitSeenSequenceNumberByArbitratorTable;
256 delete rejectedSlotMyVector;
// Delete each Transaction still queued, then the queue itself.
258 uint size = pendingTransactionQueue->size();
259 for (uint iter = 0; iter < size; iter++) {
260 delete pendingTransactionQueue->get(iter);
262 delete pendingTransactionQueue;
264 delete pendingSendArbitrationEntriesToDelete;
// Delete the vector stored as the value of each transactionPartsSent entry;
// the Transaction keys themselves are not deleted in the visible lines.
266 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
267 while (trit->hasNext()) {
268 Transaction *transaction = trit->next();
269 delete trit->currVal();
272 delete transactionPartsSent;
274 delete outstandingTransactionStatus;
275 delete liveAbortsGeneratedByLocal;
276 delete offlineTransactionsCommittedAndAtServer;
277 delete localCommunicationTable;
278 delete lastTransactionSeenFromMachineFromServer;
// Delete each ArbitrationRound still pending, then the vector holding them.
280 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
281 delete pendingSendArbitrationRounds->get(i);
283 delete pendingSendArbitrationRounds;
285 if (lastTransactionPartsSent != NULL)
286 delete lastTransactionPartsSent;
287 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
293 * Init all the stuff needed for table usage
// Allocates the helper objects and every container used by the table, then
// derives the slot count and resize threshold from the new buffer.
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the body of Table::init() -- confirm.
296 // Init helper objects
297 random = new SecureRandom();
298 buffer = new SlotBuffer();
// Key/value state keyed by IoTString, using hashString and StringEquals.
301 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
302 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
303 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
304 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
305 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
306 rejectedMessageWatchMyVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
307 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
// Tables keyed by a Pair use pairHashFunction and pairEquals.
308 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
309 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
310 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
311 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
312 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
313 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
314 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
315 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
316 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
317 rejectedSlotMyVector = new MyVector<int64_t>();
318 pendingTransactionQueue = new MyVector<Transaction *>();
319 pendingSendArbitrationEntriesToDelete = new MyVector<Entry *>();
320 transactionPartsSent = new Hashtable<Transaction *, MyVector<int32_t> *>();
321 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
322 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
323 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
324 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
325 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
326 pendingSendArbitrationRounds = new MyVector<ArbitrationRound *>();
327 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count comes from the capacity of the freshly created buffer, and the
// resize threshold is derived from it.
330 numberOfSlots = buffer->capacity();
331 setResizeThreshold();
335 * Initialize the table by inserting a table status as the first entry
336 * into the table; this also initializes the crypto stuff.
// Creates the first slot (sequence number 1) carrying a TableStatus entry and
// pushes it to the cloud, after initializing security on the CloudComm object.
// NOTE(review): the conditions that select between the branches below are
// partly on lines not shown in this listing; only the visible steps are
// described.
338 void Table::initTable() {
339 cloud->initSecurity();
340 // Create the first insertion into the block chain which is the table status
341 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
// The local sequence number is consumed by the slot just created.
342 localSequenceNumber++;
343 TableStatus *status = new TableStatus(s, numberOfSlots);
344 s->addShallowEntry(status);
345 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// One visible branch builds a one-element array and feeds it to
// validateAndUpdate; presumably it holds the slot just sent -- confirm.
347 array = new Array<Slot *>(1);
349 // update local block chain
350 validateAndUpdate(array, true);
352 } else if (array->length() == 1) {
353 // in case we did push the slot BUT we failed to init it
354 validateAndUpdate(array, true);
// Remaining case: report an initialization error through myerror.
360 //throw new Error("Error on initialization");
361 myerror("Error on initialization\n");
366 * Rebuild the table from scratch by pulling the latest block chain
// Fetches the slots following the current sequenceNumber from the cloud, runs
// them through validateAndUpdate with the second argument set to true, and
// then refreshes the live transactions and status.
369 void Table::rebuild() {
370 // Just pull the latest slots from the server
371 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
372 validateAndUpdate(newslots, true);
375 updateLiveTransactionsAndStatus();
// Records the host name and port to use for the given arbitrator by storing a
// newly allocated (hostName, portNumber) pair in localCommunicationTable under
// the arbitrator id.
//   arbitrator: key under which the pair is stored.
//   hostName:   stored as given; no reference is acquired in the visible code.
//   portNumber: stored as the int32_t second element of the pair.
378 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
379 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the value stored for key in arbitratorTable.
// NOTE(review): no contains() check is made first, so the result for an
// unknown key is whatever Hashtable::get yields for a missing entry -- confirm.
382 int64_t Table::getArbitrator(IoTString *key) {
383 return arbitratorTable->get(key);
// close(): takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this listing.
386 void Table::close() {
// Looks key up in committedKeyValueTable and returns the result of calling
// acquireRef() on the stored value.
// NOTE(review): the handling of a key that is not present is on lines not
// shown in this listing. Presumably the caller owns the acquired reference and
// must release it -- confirm.
390 IoTString *Table::getCommitted(IoTString *key) {
391 KeyValue *kv = committedKeyValueTable->get(key);
394 return kv->getValue()->acquireRef();
// Returns the value for key, consulting three tables in this order:
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable, and
// committedKeyValueTable. The result is returned through acquireRef().
// NOTE(review): the checks between the lookups are not visible in this
// listing; presumably each later table is consulted only when the previous
// lookup found nothing -- confirm.
400 IoTString *Table::getSpeculative(IoTString *key) {
401 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
404 kv = speculatedKeyValueTable->get(key);
408 kv = committedKeyValueTable->get(key);
412 return kv->getValue()->acquireRef();
// Reads the committed value for key as part of the transaction being built:
// besides returning the value, it registers a guard KeyValue on
// pendingTransactionBuilder holding the value that was read (or NULL).
// Errors are reported through myerror when the key has no arbitrator entry or
// when the arbitrator of the key is rejected by checkArbitrator.
// NOTE(review): the branch conditions around the two addKVGuard calls are not
// visible in this listing; presumably the NULL guard is used when no committed
// value exists -- confirm.
418 IoTString *Table::getCommittedAtomic(IoTString *key) {
419 KeyValue *kv = committedKeyValueTable->get(key);
421 if (!arbitratorTable->contains(key)) {
422 // throw new Error("Key not Found.");
423 myerror("Key not found!\n");
426 // Make sure new key value pair matches the current arbitrator
427 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
428 // TODO: Maybe not throw an error
429 //throw new Error("Not all Key Values Match Arbitrator.");
430 myerror("Not all key values match arbitrator\n");
// Guard on the value that was read, and hand that value back to the caller.
434 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
435 return kv->getValue()->acquireRef();
// Guard recording a NULL value for the key.
437 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Reads the speculative value for key as part of the transaction being built.
// The key must have an arbitrator entry and that arbitrator must be accepted
// by checkArbitrator; otherwise myerror is called. The value is looked up in
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable and
// committedKeyValueTable, in that order, and a guard KeyValue holding the value
// read (or NULL) is registered on pendingTransactionBuilder.
// NOTE(review): the checks between the lookups and around the two addKVGuard
// calls are not visible in this listing; presumably later tables are fallbacks
// and the NULL guard covers the case where no value was found -- confirm.
442 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
443 if (!arbitratorTable->contains(key)) {
444 //throw new Error("Key not Found.");
445 myerror("Key not found\n");
448 // Make sure new key value pair matches the current arbitrator
449 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
450 // TODO: Maybe not throw an error
451 //throw new Error("Not all Key Values Match Arbitrator.");
452 myerror("Not all Key Values Match Arbitrator.\n");
455 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
458 kv = speculatedKeyValueTable->get(key);
462 kv = committedKeyValueTable->get(key);
// Guard on the value that was read, and hand that value back to the caller.
466 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
467 return kv->getValue()->acquireRef();
// Guard recording a NULL value for the key.
469 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots following the current sequenceNumber from the cloud, runs
// them through validateAndUpdate with the second argument set to false, and
// refreshes the live transactions and status.
// NOTE(review): the return statements are not visible in this listing, so the
// meaning of the bool result cannot be stated from here. The trailing lines
// are commented-out exception handling left in the source.
474 bool Table::update() {
476 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
477 validateAndUpdate(newSlots, false);
480 updateLiveTransactionsAndStatus();
482 /* } catch (Exception *e) {
483 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
484 while (kit->hasNext()) {
485 int64_t m = kit->next();
// Attempts to register keyName with machineId as its arbitrator by building a
// NewKey entry and passing it to sendToServer.
//   keyName:   the key to create; checked against arbitratorTable first.
//   machineId: passed to the NewKey constructor.
// NOTE(review): the return statements are not visible in this listing;
// presumably false is returned when the key already has an arbitrator and
// true after a successful insertion -- confirm.
494 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
496 if (arbitratorTable->contains(keyName)) {
497 // There is already an arbitrator
500 NewKey *newKey = new NewKey(NULL, keyName, machineId);
502 if (sendToServer(newKey)) {
503 // If successfully inserted
// Starts a fresh transaction: any existing pendingTransactionBuilder is
// deleted and replaced by a new PendingTransaction for localMachineId, so
// updates collected in the previous builder are discarded.
509 void Table::startTransaction() {
510 // Create a new transaction, invalidates any old pending transactions.
511 if (pendingTransactionBuilder != NULL)
512 delete pendingTransactionBuilder;
513 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the transaction being built.
//   key:   must already have an entry in arbitratorTable, and its arbitrator
//          must be accepted by pendingTransactionBuilder->checkArbitrator;
//          otherwise myerror is called.
//   value: the new value for the key.
// The KeyValue handed to the builder is constructed from key->acquireRef() and
// value->acquireRef(), so the builder works with acquired references rather
// than the raw pointers of the caller.
516 void Table::put(IoTString *key, IoTString *value) {
517 // Make sure it is a valid key
518 if (!arbitratorTable->contains(key)) {
519 //throw new Error("Key not Found.");
520 myerror("Key not Found.\n");
523 // Make sure new key value pair matches the current arbitrator
524 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
525 // TODO: Maybe not throw an error
526 //throw new Error("Not all Key Values Match Arbitrator.");
527 myerror("Not all Key Values Match Arbitrator.\n");
530 // Add the key value to this transaction
531 KeyValue *kv = new KeyValue(key->acquireRef(), value->acquireRef());
532 pendingTransactionBuilder->addKV(kv);
// Turns the updates collected in pendingTransactionBuilder into a Transaction
// and submits it.
// Returns a newly allocated TransactionStatus: StatusNoEffect (with -1) when
// the builder holds no updates, otherwise a status created as StatusPending
// for the arbitrator of the builder.
// Visible flow: assign the next local transaction sequence number, create the
// transaction, then either queue it (arbitrator is another machine) or
// arbitrate on it locally and delete it. The builder is then replaced by a
// fresh one, and the pending queue is walked to try sending each transaction
// over the local channel via sendTransactionToLocal.
// NOTE(review): several branch lines and closing braces are not shown in this
// listing, so which statements belong to which branch is partly inferred.
535 TransactionStatus *Table::commitTransaction() {
536 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
537 // transaction with no updates will have no effect on the system
538 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
541 // Set the local transaction sequence number and increment
542 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
543 localTransactionSequenceNumber++;
545 // Create the transaction status
546 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
548 // Create the new transaction
549 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
550 newTransaction->setTransactionStatus(transactionStatus);
552 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
553 // Add it to the queue and invalidate the builder for safety
554 pendingTransactionQueue->add(newTransaction);
// Otherwise this machine is the arbitrator: decide locally, drop the
// transaction object and refresh the live state.
556 arbitrateOnLocalTransaction(newTransaction);
557 delete newTransaction;
558 updateLiveStateFromLocal();
// Replace the builder so later puts start from an empty transaction.
560 if (pendingTransactionBuilder != NULL)
561 delete pendingTransactionBuilder;
563 pendingTransactionBuilder = new PendingTransaction(localMachineId);
567 /* } catch (ServerException *e) {
569 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
570 uint size = pendingTransactionQueue->size();
572 for (uint iter = 0; iter < size; iter++) {
573 Transaction *transaction = pendingTransactionQueue->get(iter);
574 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
576 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
577 // Already contacted this client so ignore all attempts to contact this client
578 // to preserve ordering for arbitrator
582 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
584 if (sendReturn.getFirst()) {
585 // Failed to contact over local
586 arbitratorTriedAndFailed->add(transaction->getArbitrator());
588 // Successful contact or should not contact
590 if (sendReturn.getSecond()) {
597 pendingTransactionQueue->setSize(oldindex);
600 updateLiveStateFromLocal();
602 return transactionStatus;
606 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots: a lower bound of
// Table_RESIZE_THRESHOLD * numberOfSlots (truncated to int), minus one, plus a
// random offset obtained from random->nextInt(numberOfSlots - resizeLower).
// NOTE(review): the range returned by nextInt is not shown here; presumably it
// is [0, bound) -- confirm.
608 void Table::setResizeThreshold() {
609 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
610 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
613 int64_t Table::getLocalSequenceNumber() {
614 return localSequenceNumber;
// Updates every transaction recorded in lastTransactionPartsSent after a send.
// For each transaction: clear its server-failure flag, remove the parts listed
// as sent, register its status in outstandingTransactionStatus under its
// sequence number, and mark it StatusSentPartial. A transaction whose parts
// were all sent is marked StatusSentFully and removed from
// pendingTransactionQueue.
//   handlePartial: when true, a transaction that is not fully sent has its
//                  failure flag cleared again and, if no part reached the
//                  server, its sequence number reset to -1.
617 void Table::processTransactionList(bool handlePartial) {
618 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *)getKeyIterator(lastTransactionPartsSent);
619 while (trit->hasNext()) {
620 Transaction *transaction = trit->next();
621 transaction->resetServerFailure();
622 // Update which transactions parts still need to be sent
623 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
624 // Add the transaction status to the outstanding list
625 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
627 // Update the transaction status
628 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
630 // Check if all the transaction parts were successfully
631 // sent and if so then remove it from pending
632 if (transaction->didSendAllParts()) {
633 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
634 pendingTransactionQueue->remove(transaction);
636 } else if (handlePartial) {
637 transaction->resetServerFailure();
638 // Set the transaction sequence number back to nothing
639 if (!transaction->didSendAPartToServer()) {
640 transaction->setSequenceNumber(-1);
// Recovers from a previous send whose outcome is unknown. It asks the cloud
// for slots newer than sequenceNumber; if there are none it retries sending
// lastSlotAttemptedToSend, otherwise it uses checkSend to see whether that
// slot is among the slots received. Depending on the outcome the sent
// transactions are processed via processTransactionList or have their failure
// flag cleared and, if nothing reached the server, their sequence number reset
// to -1. New slots are fed to validateAndUpdate.
//   newKey: the key creation the caller still wants to send, or NULL.
// NOTE(review): the statements executed when the last inserted key matches
// newKey, and the return statements, are not visible in this listing;
// presumably the returned pointer is NULL once the key is known to be
// inserted -- confirm.
// NOTE(review): lastNewKey->getKey() == newKey->getKey() compares the values
// returned by getKey() with ==; if those are IoTString pointers this is an
// identity comparison rather than a content comparison -- confirm that this
// is intended.
647 NewKey * Table::handlePartialSend(NewKey * newKey) {
648 //Didn't receive acknowledgement for last send
649 //See if the server has received a newer slot
651 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
652 if (newSlots->length() == 0) {
653 //Retry sending old slot
654 bool wasInserted = false;
655 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
657 if (sendSlotsReturn) {
658 lastSlotAttemptedToSend = NULL;
659 if (newKey != NULL) {
660 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
665 processTransactionList(false);
// The retry was not accepted: check whether the slot shows up among the slots
// returned through newSlots.
667 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
668 if (newKey != NULL) {
669 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
674 processTransactionList(true);
678 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *)getKeyIterator(lastTransactionPartsSent);
679 while (trit->hasNext()) {
680 Transaction *transaction = trit->next();
681 transaction->resetServerFailure();
682 // Set the transaction sequence number back to nothing
683 if (!transaction->didSendAPartToServer()) {
684 transaction->setSequenceNumber(-1);
689 if (newSlots->length() != 0) {
690 // insert into the local block chain
691 validateAndUpdate(newSlots, true);
// The server already had newer slots: same check against the slots fetched at
// the top of the function.
694 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
695 if (newKey != NULL) {
696 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
702 processTransactionList(true);
704 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(lastTransactionPartsSent);
705 while (trit->hasNext()) {
706 Transaction *transaction = trit->next();
707 transaction->resetServerFailure();
708 // Set the transaction sequence number back to nothing
709 if (!transaction->didSendAPartToServer()) {
710 transaction->setSequenceNumber(-1);
716 // insert into the local block chain
717 validateAndUpdate(newSlots, true);
// Forgets what was recorded for the previous send attempt: empties
// pendingSendArbitrationEntriesToDelete, deletes the vector stored as the value
// of every transactionPartsSent entry, and then clears that table. The
// Transaction keys are not deleted in the visible code.
723 void Table::clearSentParts() {
724 // Clear the sent data since we are trying again
725 pendingSendArbitrationEntriesToDelete->clear();
726 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
727 while (trit->hasNext()) {
728 Transaction *transaction = trit->next();
729 delete trit->currVal();
732 transactionPartsSent->clear();
// Sends pending data to the server, one slot per loop iteration, for as long
// as there are pending transactions, pending arbitration rounds, or a new key
// to insert.
//   newKey: the key creation to send, or NULL when only queued data is sent.
// Returns true when newKey is NULL at the end (see the final return).
// Per iteration, as far as visible here: build a slot chained to the HMAC of
// the slot at sequenceNumber, fill it via fillSlot (a second fillSlot call
// with true as its second argument follows when a resize was needed), remember
// the attempt in the last* members, send it with sendSlotsToServer, and on
// success drop finished arbitration rounds and process the sent transactions.
// Slots returned by the server are fed to validateAndUpdate.
// NOTE(review): many branch lines and closing braces are not shown in this
// listing, so the nesting of the statements below is partly inferred. The
// trailing catch section is commented-out code left in the source.
735 bool Table::sendToServer(NewKey *newKey) {
// A previous send with unknown outcome is resolved first; it may replace
// newKey with the value returned by handlePartialSend.
736 if (hadPartialSendToServer) {
737 newKey = handlePartialSend(newKey);
741 // While we have stuff that needs inserting into the block chain
742 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
743 if (hadPartialSendToServer) {
744 // throw new Error("Should Be error free");
745 myerror("Should Be error free\n");
748 // If there is a new key with same name then end
749 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot takes the next sequence number and is constructed with a copy
// of the HMAC of the current slot.
755 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
756 localSequenceNumber++;
758 // Try to fill the slot with data
760 bool insertedNewKey = false;
761 bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
764 // Reset which transaction to send
765 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
766 while (trit->hasNext()) {
767 Transaction *transaction = trit->next();
768 transaction->resetNextPartToSend();
770 // Set the transaction sequence number back to nothing
771 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772 transaction->setSequenceNumber(-1);
777 // Clear the sent data since we are trying again
780 // We needed a resize so try again
781 fillSlot(slot, true, newKey, newSize, insertedNewKey);
// Remember this attempt so a later handlePartialSend can retry or verify it.
783 if (lastSlotAttemptedToSend != NULL)
784 delete lastSlotAttemptedToSend;
786 lastSlotAttemptedToSend = slot;
787 lastIsNewKey = (newKey != NULL);
788 lastInsertedNewKey = insertedNewKey;
789 lastNewSize = newSize;
790 if (( newKey != lastNewKey) && (lastNewKey != NULL))
793 if (lastTransactionPartsSent != NULL)
794 delete lastTransactionPartsSent;
795 lastTransactionPartsSent = transactionPartsSent->clone();
797 Array<Slot *> * newSlots = NULL;
798 bool wasInserted = false;
799 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
801 if (sendSlotsReturn) {
802 lastSlotAttemptedToSend = NULL;
803 // Did insert into the block chain
804 if (insertedNewKey) {
805 // This slot was what was inserted not a previous slot
806 // New Key was successfully inserted into the block chain so do not want to insert it again
810 // Remove the aborts and commit parts that were sent from the pending to send queue
811 uint size = pendingSendArbitrationRounds->size();
// Compact the vector in place: rounds that still have data to send are kept,
// finished rounds are deleted.
813 for (uint i = 0; i < size; i++) {
814 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
815 round->removeParts(pendingSendArbitrationEntriesToDelete);
817 if (!round->isDoneSending()) {
819 pendingSendArbitrationRounds->set(oldcount++,
820 pendingSendArbitrationRounds->get(i));
822 delete pendingSendArbitrationRounds->get(i);
824 pendingSendArbitrationRounds->setSize(oldcount);
825 processTransactionList(false);
827 // Reset which transaction to send
828 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
829 while (trit->hasNext()) {
830 Transaction *transaction = trit->next();
831 transaction->resetNextPartToSend();
833 // Set the transaction sequence number back to nothing
834 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
835 transaction->setSequenceNumber(-1);
841 // Clear the sent data in preparation for next send
844 if (newSlots->length() != 0) {
845 // insert into the local block chain
846 validateAndUpdate(newSlots, true);
850 /* } catch (ServerException *e) {
851 if (e->getType() != ServerException_TypeInputTimeout) {
852 // Nothing was able to be sent to the server so just clear these data structures
853 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
854 while (trit->hasNext()) {
855 Transaction *transaction = trit->next();
856 transaction->resetNextPartToSend();
858 // Set the transaction sequence number back to nothing
859 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
860 transaction->setSequenceNumber(-1);
865 // There was a partial send to the server
866 hadPartialSendToServer = true;
868 // Nothing was able to be sent to the server so just clear these data structures
869 SetIterator<Transaction *, MyVector<int> *> *trit = (SetIterator<Transaction *, MyVector<int> *> *) getKeyIterator(transactionPartsSent);
870 while (trit->hasNext()) {
871 Transaction *transaction = trit->next();
872 transaction->resetNextPartToSend();
873 transaction->setServerFailure();
883 return newKey == NULL;
// Asks the machine identified by machineId, over the local channel, for
// arbitration data this table has not seen yet.
//   machineId: looked up in localCommunicationTable to obtain the first and
//              second elements passed on to cloud->sendLocalData.
// The request buffer is sized for one int64_t plus one int32_t and carries the
// last arbitration sequence number seen from that machine (-1 when none is
// recorded). The reply starts with an entry count followed by typed entries;
// Abort and CommitPart entries are decoded, and the live state is refreshed
// afterwards.
// NOTE(review): the return statements, the remaining writes into the request
// buffer and the handling of decoded aborts are not visible in this listing.
886 bool Table::updateFromLocal(int64_t machineId) {
887 if (!localCommunicationTable->contains(machineId))
890 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
892 // Get the size of the send data
893 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
895 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
896 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
897 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
900 Array<char> *sendData = new Array<char>(sendDataSize);
901 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
904 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// Every local request consumes one local sequence number.
908 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
909 localSequenceNumber++;
911 if (returnData == NULL) {
912 // Could not contact server
917 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
918 int numberOfEntries = bbDecode->getInt();
// Each entry starts with a one-byte type tag that selects the decoder.
920 for (int i = 0; i < numberOfEntries; i++) {
921 char type = bbDecode->get();
922 if (type == TypeAbort) {
923 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
925 } else if (type == TypeCommitPart) {
926 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
927 processEntry(commitPart);
931 updateLiveStateFromLocal();
// Sends a transaction directly to its arbitrator over the local channel and
// applies the reply.
//   transaction: its arbitrator id selects the entry of
//                localCommunicationTable, and its parts are encoded into the
//                request.
// Returns Pair(true, false) when there is no local communication entry for the
// arbitrator or when sendLocalData returns NULL, and Pair(false, true) after a
// reply has been processed.
// Request layout as visible here: the last arbitration sequence number seen
// from the arbitrator (-1 when none), the number of parts, then each encoded
// part. The reply carries two flag bytes (didCommit, couldArbitrate), an entry
// count and typed entries; Abort and CommitPart entries are passed to
// processEntry, and the status of the transaction is then set to
// StatusCommitted or StatusAborted.
// NOTE(review): some writes into the request buffer and the conditions that
// pick Committed versus Aborted are not visible in this listing; presumably
// they test didCommit when couldArbitrate is set and foundAbort otherwise --
// confirm.
936 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
938 // Get the devices local communications
939 if (!localCommunicationTable->contains(transaction->getArbitrator()))
940 return Pair<bool, bool>(true, false);
942 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
944 // Get the size of the send data
945 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// Add the encoded size of every part of the transaction.
947 MyVector<TransactionPart *> *tParts = transaction->getParts();
948 uint tPartsSize = tParts->size();
949 for (uint i = 0; i < tPartsSize; i++) {
950 TransactionPart *part = tParts->get(i);
951 sendDataSize += part->getSize();
955 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
956 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
957 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
960 // Make the send data size
961 Array<char> *sendData = new Array<char>(sendDataSize);
962 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
965 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
966 bbEncode->putInt(transaction->getParts()->size());
968 MyVector<TransactionPart *> *tParts = transaction->getParts();
969 uint tPartsSize = tParts->size();
970 for (uint i = 0; i < tPartsSize; i++) {
971 TransactionPart *part = tParts->get(i);
972 part->encode(bbEncode);
// Every local request consumes one local sequence number.
977 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
978 localSequenceNumber++;
980 if (returnData == NULL) {
981 // Could not contact server
982 return Pair<bool, bool>(true, false);
986 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
987 bool didCommit = bbDecode->get() == 1;
988 bool couldArbitrate = bbDecode->get() == 1;
989 int numberOfEntries = bbDecode->getInt();
990 bool foundAbort = false;
// Each entry starts with a one-byte type tag that selects the decoder.
992 for (int i = 0; i < numberOfEntries; i++) {
993 char type = bbDecode->get();
994 if (type == TypeAbort) {
995 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// An abort matches this transaction when both the machine id and the client
// local sequence number agree.
997 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
1001 processEntry(abort);
1002 } else if (type == TypeCommitPart) {
1003 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
1004 processEntry(commitPart);
1008 updateLiveStateFromLocal();
1010 if (couldArbitrate) {
1011 TransactionStatus *status = transaction->getTransactionStatus();
1013 status->setStatus(TransactionStatus_StatusCommitted);
1015 status->setStatus(TransactionStatus_StatusAborted);
1018 TransactionStatus *status = transaction->getTransactionStatus();
1020 status->setStatus(TransactionStatus_StatusAborted);
1022 status->setStatus(TransactionStatus_StatusCommitted);
1026 return Pair<bool, bool>(false, true);
// Decodes a request buffer, optionally arbitrates on the transaction it
// carries, and assembles a reply buffer of arbitration entries.
// (Presumably the receiving side of cloud->sendLocalData() - confirm
// against the caller.)
//
// data: request bytes. As read below: one 64-bit value (last arbitrated
//       sequence number the requester has seen), one 32-bit part count,
//       then that many encoded TransactionParts.
//
// NOTE(review): several lines of this function (closing braces, some branch
// bodies and the final return) are not visible in this extract. The comments
// below describe only the statements that are shown.
1029 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1031 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1032 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1033 int numberOfParts = bbDecode->getInt();
1035 // If we did commit a transaction or not
1036 bool didCommit = false;
1037 bool couldArbitrate = false;
// A part count of zero means the request carries no transaction.
1039 if (numberOfParts != 0) {
1041 // decode the transaction
1042 Transaction *transaction = new Transaction();
1043 for (int i = 0; i < numberOfParts; i++) {
1045 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1046 transaction->addPartDecode(newPart);
1049 // Arbitrate on transaction and pull relevant return data
// The pair is (couldArbitrate, didCommit), in that order.
1050 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1051 couldArbitrate = localArbitrateReturn.getFirst();
1052 didCommit = localArbitrateReturn.getSecond();
1054 updateLiveStateFromLocal();
1056 // Transaction was sent to the server so keep track of it to prevent double commit
// A sequence number of -1 is treated here as "not yet given a sequence number".
1057 if (transaction->getSequenceNumber() != -1) {
1058 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1062 // The data to send back
// returnDataSize accumulates the byte size of the reply while the entries
// to send are collected in unseenArbitrations.
1063 int returnDataSize = 0;
1064 MyVector<Entry *> *unseenArbitrations = new MyVector<Entry *>();
1066 // Get the aborts to send back
// Copy the keys out of the table and order them with compareInt64 so the
// aborts are visited in key order rather than hash order.
1067 MyVector<int64_t> *abortLocalSequenceNumbers = new MyVector<int64_t>();
1069 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1070 while (abortit->hasNext())
1071 abortLocalSequenceNumbers->add(abortit->next());
1075 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1077 uint asize = abortLocalSequenceNumbers->size();
1078 for (uint i = 0; i < asize; i++) {
// This loop-local name shadows the member localSequenceNumber.
1079 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// NOTE(review): the body of this branch is not visible here; presumably it
// skips entries the requester has already seen - confirm.
1080 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1084 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1085 unseenArbitrations->add(abort);
1086 returnDataSize += abort->getSize();
1089 // Get the commits to send back
// Only commits filed under localMachineId in liveCommitsTable are considered.
1090 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1091 if (commitForClientTable != NULL) {
1092 MyVector<int64_t> *commitLocalSequenceNumbers = new MyVector<int64_t>();
1094 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1095 while (commitit->hasNext())
1096 commitLocalSequenceNumbers->add(commitit->next());
1099 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1101 uint clsSize = commitLocalSequenceNumbers->size();
1102 for (uint clsi = 0; clsi < clsSize; clsi++) {
1103 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1104 Commit *commit = commitForClientTable->get(localSequenceNumber);
// NOTE(review): branch body not visible here; same caveat as for the aborts.
1106 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// A commit is sent as its individual CommitParts.
1111 MyVector<CommitPart *> *parts = commit->getParts();
1112 uint nParts = parts->size();
1113 for (uint i = 0; i < nParts; i++) {
1114 CommitPart *commitPart = parts->get(i);
1115 unseenArbitrations->add(commitPart);
1116 returnDataSize += commitPart->getSize();
1122 // Number of arbitration entries to decode
// NOTE(review): room for two int32 values is reserved, while the visible
// encoding below writes a single putInt - confirm the intended size.
1123 returnDataSize += 2 * sizeof(int32_t);
1125 // bool of did commit or not
1126 if (numberOfParts != 0) {
1127 returnDataSize += sizeof(char);
1130 // Data to send Back
1131 Array<char> *returnData = new Array<char>(returnDataSize);
1132 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Flag bytes are written as (char)1 / (char)0. The conditions selecting
// between the first pair of puts are not visible in this extract.
1134 if (numberOfParts != 0) {
1136 bbEncode->put((char)1);
1138 bbEncode->put((char)0);
1140 if (couldArbitrate) {
1141 bbEncode->put((char)1);
1143 bbEncode->put((char)0);
// Entry count followed by each entry's own encoding.
1147 bbEncode->putInt(unseenArbitrations->size());
1148 uint size = unseenArbitrations->size();
1149 for (uint i = 0; i < size; i++) {
1150 Entry *entry = unseenArbitrations->get(i);
1151 entry->encode(bbEncode);
// No loop-local declaration is in scope at this point in the visible code,
// so this presumably increments the member localSequenceNumber - confirm.
1154 localSequenceNumber++;
1158 /** Checks whether a given slot was sent using new slots in
1159 array. Returns true if sent and false otherwise. */
// Looks for evidence in `array` that checkSlot reached the server.
//
// array:     slots to search
// checkSlot: the slot whose sequence number is being looked for
//
// Two passes are made over `array`:
//   1. a slot with checkSlot's sequence number whose machine id equals
//      localMachineId;
//   2. a LastMessage entry inside any slot that names localMachineId and
//      checkSlot's sequence number.
//
// NOTE(review): the return statements are not visible in this extract.
// Presumably a match in either pass returns true and falling through
// returns false - confirm against the full file.
1161 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1162 uint size = array->length();
// Pass 1: direct match on sequence number and machine id.
1163 for (uint i = 0; i < size; i++) {
1164 Slot *s = array->get(i);
1165 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1170 //Also need to see if other machines acknowledged our message
// Pass 2: scan the entries of every slot for a matching LastMessage.
1171 for (uint i = 0; i < size; i++) {
1172 Slot *s = array->get(i);
1174 // Process each entry in the slot
1175 MyVector<Entry *> *entries = s->getEntries();
1176 uint eSize = entries->size();
1177 for (uint ei = 0; ei < eSize; ei++) {
1178 Entry *entry = entries->get(ei);
// The cast below is guarded by the type tag check.
1180 if (entry->getType() == TypeLastMessage) {
1181 LastMessage *lastMessage = (LastMessage *)entry;
1183 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1193 /** Method tries to send slot to server. Status is reported through the
1194 out-parameters: isInserted reports whether last un-acked send (if any) was
1195 successful. Returns whether send was confirmed.
// Hands `slot` to cloud->putSlot() and records the outcome.
//
// slot:       slot to send
// newSize:    forwarded unchanged to cloud->putSlot()
// isNewKey:   not referenced by any statement visible in this extract
// isInserted: out-parameter written below
// array:      out-parameter; receives the array returned by putSlot(), or a
//             newly allocated one-element array holding `slot` when
//             putSlot() returned NULL
//
// NOTE(review): the return statements and the else / closing-brace lines are
// not visible in this extract, so the exact nesting of the branches below
// should be confirmed against the full file.
1198 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1199 attemptedToSendToServer = true;
1201 *array = cloud->putSlot(slot, newSize);
// NULL result: hand back the sent slot itself and clear the rejected list.
1202 if (*array == NULL) {
1203 *array = new Array<Slot *>(1);
1204 (*array)->set(0, slot);
1205 rejectedSlotMyVector->clear();
1206 *isInserted = false;
// An empty (but non-NULL) result is reported through myerror().
1209 if ((*array)->length() == 0) {
1210 // throw new Error("Server Error: Did not send any slots");
1211 myerror("Server Error: Did not send any slots\n");
// After an earlier partial send, check whether the slot shows up in the
// returned slots.
1214 if (hadPartialSendToServer) {
1215 *isInserted = checkSend(*array, slot);
// Not found: remember this sequence number as rejected.
1217 if (!(*isInserted)) {
1218 rejectedSlotMyVector->add(slot->getSequenceNumber());
// NOTE(review): presumably the branch taken when hadPartialSendToServer is
// false - confirm.
1223 rejectedSlotMyVector->add(slot->getSequenceNumber());
1224 *isInserted = false;
1231 * Returns true if a resize was needed but not done.
// Populates `slot` with entries in a fixed order: optional TableStatus,
// rejected-message entries, mandatory rescue, the new-key entry, pending
// arbitration data, parts of the first pending transaction, and finally
// optional rescue.
//
// slot:        slot being filled
// resize:      caller's resize request; forced to true below when
//              liveSlotCount exceeds bufferResizeThreshold
// newKeyEntry: optional NewKey entry to place in the slot (may be NULL)
// newSize:     out; 0 means "no resize", otherwise the new size computed below
// insertedKey: out; reset to false before the new-key entry is attempted
//
// NOTE(review): many closing braces, break/return lines and some conditions
// are not visible in this extract. The comments describe the visible
// statements only.
1233 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1234 newSize = 0;//special value to indicate no resize
1235 if (liveSlotCount > bufferResizeThreshold) {
1236 resize = true;//Resize is forced
// NOTE(review): the condition guarding the next three statements is not
// visible; presumably they run only when resizing - confirm.
1240 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1241 TableStatus *status = new TableStatus(slot, newSize);
1242 slot->addShallowEntry(status);
1245 // Fill with rejected slots first before doing anything else
1246 doRejectedMessages(slot);
1248 // Do mandatory rescue of entries
1249 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1251 // Extract working variables
1252 bool needsResize = mandatoryRescueReturn.getFirst();
1253 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1254 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1256 if (needsResize && !resize) {
1257 // We need to resize but we are not resizing so return true to force on retry
1261 insertedKey = false;
// The new-key entry is bound to this slot before the space check.
1262 if (newKeyEntry != NULL) {
1263 newKeyEntry->setSlot(slot);
1264 if (slot->hasSpace(newKeyEntry)) {
1265 slot->addEntry(newKeyEntry);
1270 // Clear the transactions, aborts and commits that were sent previously
// Walk the pending arbitration rounds in queue order and add their parts.
1272 uint size = pendingSendArbitrationRounds->size();
1273 for (uint i = 0; i < size; i++) {
1274 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1275 bool isFull = false;
1276 round->generateParts();
1277 MyVector<Entry *> *parts = round->getParts();
1279 // Insert pending arbitration data
1280 uint vsize = parts->size();
1281 for (uint vi = 0; vi < vsize; vi++) {
1282 Entry *arbitrationData = parts->get(vi);
1284 // If it is an abort then we need to set some information
// Aborts are stamped with the sequence number of the slot carrying them.
1285 if (arbitrationData->getType() == TypeAbort) {
1286 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1289 if (!slot->hasSpace(arbitrationData)) {
1290 // No space so cant do anything else with these data entries
1295 // Add to this current slot and add it to entries to delete
1296 slot->addEntry(arbitrationData);
1297 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered.
1305 if (pendingTransactionQueue->size() > 0) {
1306 Transaction *transaction = pendingTransactionQueue->get(0);
1307 // Set the transaction sequence number if it has yet to be inserted into the block chain
1308 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1309 transaction->setSequenceNumber(slot->getSequenceNumber());
1313 TransactionPart *part = transaction->getNextPartToSend();
1315 // Ran out of parts to send for this transaction so move on
1319 if (slot->hasSpace(part)) {
1320 slot->addEntry(part);
// Record the part number under this transaction in transactionPartsSent,
// creating the per-transaction vector on first use.
1321 MyVector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1322 if (partsSent == NULL) {
1323 partsSent = new MyVector<int32_t>();
1324 transactionPartsSent->put(transaction, partsSent);
1326 partsSent->add(part->getPartNumber());
1327 transactionPartsSent->put(transaction, partsSent);
1334 // Fill the remainder of the slot with rescue data
1335 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
// Adds RejectedMessage entries to slot `s` for the sequence numbers held in
// rejectedSlotMyVector. Does nothing when that vector is empty.
//
// Above Table_REJECTED_THRESHOLD entries, one entry spanning the first to the
// last rejected sequence number is added. The remaining code builds a range
// entry for "missing" messages and then one entry per message still present
// in the buffer.
//
// NOTE(review): the declaration of `i`, the else line and the conditions
// inside the first loop are not visible in this extract.
1340 void Table::doRejectedMessages(Slot *s) {
1341 if (!rejectedSlotMyVector->isEmpty()) {
1342 /* TODO: We should avoid generating a rejected message entry if
1343 * there is already a sufficient entry in the queue (e.g.,
1344 * equalsto value of true and same sequence number). */
1346 int64_t old_seqn = rejectedSlotMyVector->get(0);
1347 if (rejectedSlotMyVector->size() > Table_REJECTED_THRESHOLD) {
// Collapse everything into a single range [old_seqn, new_seqn].
1348 int64_t new_seqn = rejectedSlotMyVector->lastElement();
1349 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1350 s->addShallowEntry(rm);
// -1 marks "no missing message seen yet".
1352 int64_t prev_seqn = -1;
1354 /* Go through list of missing messages */
1355 for (; i < rejectedSlotMyVector->size(); i++) {
1356 int64_t curr_seqn = rejectedSlotMyVector->get(i);
1357 Slot *s_msg = buffer->getSlot(curr_seqn);
1360 prev_seqn = curr_seqn;
1362 /* Generate rejected message entry for missing messages */
1363 if (prev_seqn != -1) {
1364 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1365 s->addShallowEntry(rm);
1367 /* Generate rejected message entries for present messages */
// This loop continues from wherever the previous loop left `i`.
1368 for (; i < rejectedSlotMyVector->size(); i++) {
1369 int64_t curr_seqn = rejectedSlotMyVector->get(i);
1370 Slot *s_msg = buffer->getSlot(curr_seqn);
// For present messages the entry carries the machine id stored in the slot
// and a single-element range, with the last argument set to true.
1371 int64_t machineid = s_msg->getMachineID();
1372 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1373 s->addShallowEntry(rm);
// Copies live entries from the oldest slots in the buffer into `slot`.
//
// slot:   slot receiving rescued entries
// resize: forwarded to Slot::getLiveEntries()
//
// Returns a (bool, bool, int64_t) tuple. The first element is true only on
// the early return taken when an entry of the slot at firstIfFull does not
// fit; the second is seenLiveSlot; the third is the sequence number reached.
// fillSlot() reads these as needsResize, seenLiveSlot and the sequence number
// at which optional rescue continues.
//
// NOTE(review): the body of the isLive branch and several closing braces are
// not visible in this extract.
1379 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1380 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1381 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start the scan before the oldest slot the buffer still holds.
1382 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1383 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1386 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1387 bool seenLiveSlot = false;
1388 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1389 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1393 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1394 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1395 // Push slot number forward
// The oldest-live marker only advances until the first live slot is seen.
1396 if (!seenLiveSlot) {
1397 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1400 if (!previousSlot->isLive()) {
1404 // We have seen a live slot
1405 seenLiveSlot = true;
1407 // Get all the live entries for a slot
1408 MyVector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1410 // Iterate over all the live entries and try to rescue them
1411 uint lESize = liveEntries->size();
1412 for (uint i = 0; i < lESize; i++) {
1413 Entry *liveEntry = liveEntries->get(i);
1414 if (slot->hasSpace(liveEntry)) {
1415 // Enough space to rescue the entry
1416 slot->addEntry(liveEntry);
1417 } else if (currentSequenceNumber == firstIfFull) {
1418 //if there's no space but the entry is about to fall off the queue
1419 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
// Scan finished without hitting the early return above.
1425 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Continues rescuing live entries into slot `s`, starting at sequence number
// `seqn` and running up to the newest slot in the buffer.
//
// s:            slot receiving rescued entries
// seenliveslot: whether a live slot has already been encountered; set to
//               true below once a live slot is reached
// seqn:         first sequence number to examine
// resize:       forwarded to Slot::getLiveEntries()
//
// NOTE(review): the declaration and updates of `skipcount`, the condition
// guarding the oldestLiveSlotSequenceNumver update, and the statement under
// the isLive check are not visible in this extract.
1428 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1429 /* now go through live entries from least to greatest sequence number until
1430 * either all live slots added, or the slot doesn't have enough room
1431 * for SKIP_THRESHOLD consecutive entries*/
1433 int64_t newestseqnum = buffer->getNewestSeqNum();
1434 for (; seqn <= newestseqnum; seqn++) {
1435 Slot *prevslot = buffer->getSlot(seqn);
1436 //Push slot number forward
1438 oldestLiveSlotSequenceNumver = seqn;
1440 if (!prevslot->isLive())
1442 seenliveslot = true;
1443 MyVector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1444 uint lESize = liveentries->size();
1445 for (uint i = 0; i < lESize; i++) {
1446 Entry *liveentry = liveentries->get(i);
// Entries that fit are copied into s.
1447 if (s->hasSpace(liveentry))
1448 s->addEntry(liveentry);
// Cut-off once skipcount passes Table_SKIP_THRESHOLD.
1451 if (skipcount > Table_SKIP_THRESHOLD) {
1464 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then applies it to
// the local buffer and derived state.
//
// newSlots:             slots returned by the server
// acceptUpdatesToLocal: forwarded unchanged to processSlot()
//
// Visible order of work: reject batches that start at or before the current
// sequenceNumber, check the HMAC chain, process each slot, apply extra checks
// when the batch does not start at sequenceNumber + 1, store the slots in the
// buffer, advance sequenceNumber and refresh the live state.
//
// NOTE(review): early returns, scoping braces and a few statements are not
// visible in this extract; some original comments are cut off mid-sentence
// for the same reason.
1466 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1467 // The cloud communication layer has checked slot HMACs already
1469 if (newSlots->length() == 0) {
1473 // Make sure all slots are newer than the last largest slot this
1475 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1476 if (firstSeqNum <= sequenceNumber) {
1477 // throw new Error("Server Error: Sent older slots!");
1478 myerror("Server Error: Sent older slots!\n");
1481 // Create an object that can access both new slots and slots in our
1482 // local chain without committing slots to our local chain
1483 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1485 // Check that the HMAC chain is not broken
1486 checkHMACChain(indexer, newSlots);
1488 // Set to keep track of messages from clients
// Seeded with every machine id currently keyed in lastMessageTable.
1489 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1491 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1492 while (lmit->hasNext())
1493 machineSet->add(lmit->next());
1497 // Process each slots data
1499 uint numSlots = newSlots->length();
1500 for (uint i = 0; i < numSlots; i++) {
1501 Slot *slot = newSlots->get(i);
1502 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1503 updateExpectedSize();
1508 // If there is a gap, check to see if the server sent us
// A gap means the batch does not start right after the last known slot.
1510 if (firstSeqNum != (sequenceNumber + 1)) {
1512 // Check the size of the slots that were sent down by the server.
1513 // Can only check the size if there was a gap
1514 checkNumSlots(newSlots->length());
1516 // Since there was a gap every machine must have pushed a slot or
1517 // must have a last message message. If not then the server is
// Anything still in machineSet at this point is reported as an error.
1519 if (!machineSet->isEmpty()) {
1521 //throw new Error("Missing record for machines: ");
1522 myerror("Missing record for machines: \n");
1526 // Update the size of our local block chain.
1529 // Commit new to slots to the local block chain.
1531 uint numSlots = newSlots->length();
1532 for (uint i = 0; i < numSlots; i++) {
1533 Slot *slot = newSlots->get(i);
1535 // Insert this slot into our local block chain copy.
1536 buffer->putSlot(slot);
1538 // Keep track of how many slots are currently live (have live data
1543 // Get the sequence number of the latest slot in the system
1544 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1545 updateLiveStateFromServer();
1547 // No Need to remember after we pulled from the server
1548 offlineTransactionsCommittedAndAtServer->clear();
1550 // This is invalidated now
1551 hadPartialSendToServer = false;
1554 void Table::updateLiveStateFromServer() {
1555 // Process the new transaction parts
1556 processNewTransactionParts();
1558 // Do arbitration on new transactions that were received
1559 arbitrateFromServer();
1561 // Update all the committed keys
1562 bool didCommitOrSpeculate = updateCommittedTable();
1564 // Delete the transactions that are now dead
1565 updateLiveTransactionsAndStatus();
1568 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1569 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1572 void Table::updateLiveStateFromLocal() {
1573 // Update all the committed keys
1574 bool didCommitOrSpeculate = updateCommittedTable();
1576 // Delete the transactions that are now dead
1577 updateLiveTransactionsAndStatus();
1580 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1581 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Initialises expectedsize and currMaxSize and marks that a table status has
// been found.
//
// firstSequenceNumber: copied into prevslots and compared with numberOfSlots
// numberOfSlots:       stored into currMaxSize; this parameter has the same
//                      name as the numberOfSlots used by other methods of
//                      this class
//
// NOTE(review): the line between the `if` and the assignment is not visible
// in this extract, so it is unclear whether expectedsize is assigned when
// didFindTableStatus is true or when it is false - confirm against the full
// file.
1584 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1585 int64_t prevslots = firstSequenceNumber;
1587 if (didFindTableStatus) {
// expectedsize becomes the smaller of prevslots and numberOfSlots.
1589 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1592 didFindTableStatus = true;
1593 currMaxSize = numberOfSlots;
// Keeps expectedsize from exceeding currMaxSize. Called once per processed
// slot by validateAndUpdate().
//
// NOTE(review): the lines between the signature and the check are not visible
// in this extract; any adjustment of expectedsize made there is not described
// here.
1596 void Table::updateExpectedSize() {
1599 if (expectedsize > currMaxSize) {
1600 expectedsize = currMaxSize;
1606 * Check the size of the block chain to make sure there are enough
1607 * slots sent back by the server. This is only called when we have a
1608 * gap between the slots that we have locally and the slots sent by
1609 * the server therefore in the slots sent by the server there will be
1610 * at least 1 Table status message
1612 void Table::checkNumSlots(int numberOfSlots) {
1613 if (numberOfSlots != expectedsize) {
1614 //throw new Error("Server Error: Server did not send all slots-> Expected: ");
1615 myerror("Server Error: Server did not send all slots-> Expected: \n");
1620 * Update the size of the local buffer if it is needed.
1622 void Table::commitNewMaxSize() {
1623 didFindTableStatus = false;
1625 // Resize the local slot buffer
1626 if (numberOfSlots != currMaxSize) {
1627 buffer->resize((int32_t)currMaxSize);
1630 // Change the number of local slots to the new size
1631 numberOfSlots = (int32_t)currMaxSize;
1633 // Recalculate the resize threshold since the size of the local
1634 // buffer has changed
1635 setResizeThreshold();
1639 * Process the new transaction parts from this latest round of slots
1640 * received from the server
// Moves the parts collected in newTransactionParts into the live transaction
// tables, then empties newTransactionParts.
//
// newTransactionParts maps a machine id to a table of parts keyed by a
// (int64_t, int32_t) pair. Each part is attached to the Transaction stored
// under its sequence number in liveTransactionBySequenceNumberTable; a new
// Transaction is created when none exists.
//
// NOTE(review): the early return, the body of the "set dead" branch, the body
// of the final clean-up loop and several closing braces are not visible in
// this extract, so the nesting of the put() calls should be confirmed.
1642 void Table::processNewTransactionParts() {
1644 if (newTransactionParts->size() == 0) {
1645 // Nothing new to process
1649 // Iterate through all the machine Ids that we received new parts
1651 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1652 while (tpit->hasNext()) {
1653 int64_t machineId = tpit->next();
1654 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
1656 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1657 // Iterate through all the parts for that machine Id
1658 while (ptit->hasNext()) {
1659 Pair<int64_t, int32_t> *partId = ptit->next();
1660 TransactionPart *part = parts->get(partId);
// Parts at or below the last transaction number recorded for their
// arbitrator take the "set dead" branch below.
1662 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1663 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1664 if (lastTransactionNumber >= part->getSequenceNumber()) {
1665 // Set dead the transaction part
1672 // Get the transaction object for that sequence number
1673 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1675 if (transaction == NULL) {
1676 // This is a new transaction that we dont have so make a new one
1677 transaction = new Transaction();
1679 // Add that part to the transaction
1680 transaction->addPartDecode(part);
1682 // Insert this new transaction into the live tables
// The transaction is indexed both by sequence number and by transaction id.
1683 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1684 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1691 // Clear all the new transaction parts in preparation for the next
1692 // time the server sends slots
1694 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1695 while (partsit->hasNext()) {
1696 int64_t machineId = partsit->next();
1697 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1701 newTransactionParts->clear();
// Arbitrates on the live transactions for which this machine is the
// arbitrator, in sequence-number order, and queues the outcome for sending.
//
// Transactions whose guard evaluates true have their key/value updates
// gathered in a temporary table, from which at most one Commit is built.
// Transactions whose guard evaluates false each produce an Abort. The
// resulting commit and aborts are wrapped in an ArbitrationRound and appended
// to pendingSendArbitrationRounds.
//
// NOTE(review): the continue/break/else lines and several closing braces are
// not visible in this extract. The comments describe the visible statements.
1705 void Table::arbitrateFromServer() {
1706 if (liveTransactionBySequenceNumberTable->size() == 0) {
1707 // Nothing to arbitrate on so move on
1711 // Get the transaction sequence numbers and sort from oldest to newest
1712 MyVector<int64_t> *transactionSequenceNumbers = new MyVector<int64_t>();
1714 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1715 while (trit->hasNext())
1716 transactionSequenceNumbers->add(trit->next());
1719 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1721 // Collection of key value pairs that are
// Holds the updates of transactions accepted in this pass. It is passed to
// evaluateGuard() for later transactions and used to build the commit.
1722 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1724 // The last transaction arbitrated on
1725 int64_t lastTransactionCommitted = -1;
1726 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1727 uint tsnSize = transactionSequenceNumbers->size();
1728 for (uint i = 0; i < tsnSize; i++) {
1729 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1730 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1732 // Check if this machine arbitrates for this transaction if not
1733 // then we cant arbitrate this transaction
1734 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below the last one arbitrated on are filtered here.
1738 if (transactionSequenceNumber < lastSeqNumArbOn) {
1742 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1743 // We have seen this already locally so dont commit again
1747 if (!transaction->isComplete()) {
1748 // Will arbitrate in incorrect order if we continue so just break
1753 // update the largest transaction seen by arbitrator from server
// Insert an initial value when the machine has no record yet; otherwise
// raise the stored value if this transaction's client-local sequence number
// is larger.
1754 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1755 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1757 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1758 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1759 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1763 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1764 // Guard evaluated as true
1765 // Update the local changes so we can make the commit
1766 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1767 while (kvit->hasNext()) {
1768 KeyValue *kv = kvit->next();
1769 speculativeTableTmp->put(kv->getKey(), kv);
1773 // Update what the last transaction committed was for use in batch commit
1774 lastTransactionCommitted = transactionSequenceNumber;
1776 // Guard evaluated was false so create abort
// Each abort takes the current arbitration sequence number, which is then
// incremented.
1778 Abort *newAbort = new Abort(NULL,
1779 transaction->getClientLocalSequenceNumber(),
1780 transaction->getSequenceNumber(),
1781 transaction->getMachineId(),
1782 transaction->getArbitrator(),
1783 localArbitrationSequenceNumber);
1784 localArbitrationSequenceNumber++;
1785 generatedAborts->add(newAbort);
1787 // Insert the abort so we can process
1788 processEntry(newAbort);
// Remember how far arbitration has progressed.
1791 lastSeqNumArbOn = transactionSequenceNumber;
1794 delete transactionSequenceNumbers;
1796 Commit *newCommit = NULL;
1798 // If there is something to commit
1799 if (speculativeTableTmp->size() != 0) {
1800 // Create the commit and increment the commit sequence number
1801 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1802 localArbitrationSequenceNumber++;
1804 // Add all the new keys to the commit
1805 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1806 while (spit->hasNext()) {
1807 IoTString *string = spit->next();
1808 KeyValue *kv = speculativeTableTmp->get(string);
1809 newCommit->addKV(kv);
1813 // create the commit parts
1814 newCommit->createCommitParts();
1816 // Append all the commit parts to the end of the pending queue
1817 // waiting for sending to the server
1818 // Insert the commit so we can process it
1819 MyVector<CommitPart *> *parts = newCommit->getParts();
1820 uint partsSize = parts->size();
1821 for (uint i = 0; i < partsSize; i++) {
1822 CommitPart *commitPart = parts->get(i);
1823 processEntry(commitPart);
1826 delete speculativeTableTmp;
// Queue a round only when there is something to send.
1828 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1829 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1830 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, the commit of the last queued round is
// processed.
1832 if (compactArbitrationData()) {
1833 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1834 if (newArbitrationRound->getCommit() != NULL) {
1835 MyVector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1836 uint partsSize = parts->size();
1837 for (uint i = 0; i < partsSize; i++) {
1838 CommitPart *commitPart = parts->get(i);
1839 processEntry(commitPart);
// NOTE(review): generatedAborts is also handed to the ArbitrationRound
// above. Presumably this delete sits on the branch where no round was
// created - confirm against the full file.
1844 delete generatedAborts;
// Arbitrates on a single transaction outside the server-driven path.
//
// transaction: the transaction to arbitrate on
//
// Returns a pair that acceptDataFromLocal() reads as
// (couldArbitrate, didCommit):
//   (false, false) - this machine is not the arbitrator, the transaction is
//                    incomplete, or the already-seen check below matched
//   (true,  true)  - the guard held and a commit was created
//   (true,  false) - the guard failed and an abort was created
//
// NOTE(review): the else lines, several closing braces and one argument of
// the Abort constructor call are not visible in this extract.
1848 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1850 // Check if this machine arbitrates for this transaction if not then
1851 // we cant arbitrate this transaction
1852 if (transaction->getArbitrator() != localMachineId) {
1853 return Pair<bool, bool>(false, false);
1856 if (!transaction->isComplete()) {
1857 // Will arbitrate in incorrect order if we continue so just break
1859 return Pair<bool, bool>(false, false);
1862 if (transaction->getMachineId() != localMachineId) {
1863 // dont do this check for local transactions
// Refuse when the stored value for the originating machine is greater than
// this transaction's client-local sequence number.
1864 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1865 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1866 // We've have already seen this from the server
1867 return Pair<bool, bool>(false, false);
// The guard is evaluated against the committed table only: the second and
// third arguments are NULL here.
1872 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1873 // Guard evaluated as true Create the commit and increment the
1874 // commit sequence number
1875 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1876 localArbitrationSequenceNumber++;
1878 // Update the local changes so we can make the commit
1879 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1880 while (kvit->hasNext()) {
1881 KeyValue *kv = kvit->next();
1882 newCommit->addKV(kv);
1886 // create the commit parts
1887 newCommit->createCommitParts();
1889 // Append all the commit parts to the end of the pending queue
1890 // waiting for sending to the server
1891 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1892 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, process the parts of the last queued
// round's commit.
1894 if (compactArbitrationData()) {
1895 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1896 MyVector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1897 uint partsSize = parts->size();
1898 for (uint i = 0; i < partsSize; i++) {
1899 CommitPart *commitPart = parts->get(i);
1900 processEntry(commitPart);
// NOTE(review): presumably the else branch of the compaction check, which
// processes the parts of newCommit itself - confirm.
1903 // Insert the commit so we can process it
1904 MyVector<CommitPart *> *parts = newCommit->getParts();
1905 uint partsSize = parts->size();
1906 for (uint i = 0; i < partsSize; i++) {
1907 CommitPart *commitPart = parts->get(i);
1908 processEntry(commitPart);
// For transactions created on this machine, update the status object when
// one is attached.
1912 if (transaction->getMachineId() == localMachineId) {
1913 TransactionStatus *status = transaction->getTransactionStatus();
1914 if (status != NULL) {
1915 status->setStatus(TransactionStatus_StatusCommitted);
1919 updateLiveStateFromLocal();
1920 return Pair<bool, bool>(true, true);
1922 if (transaction->getMachineId() == localMachineId) {
1923 // For locally created messages update the status
1924 // Guard evaluated was false so create abort
1925 TransactionStatus *status = transaction->getTransactionStatus();
1926 if (status != NULL) {
1927 status->setStatus(TransactionStatus_StatusAborted);
1930 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
// NOTE(review): one constructor argument between the client-local sequence
// number and the machine id is not visible in this extract.
1933 Abort *newAbort = new Abort(NULL,
1934 transaction->getClientLocalSequenceNumber(),
1936 transaction->getMachineId(),
1937 transaction->getArbitrator(),
1938 localArbitrationSequenceNumber);
1939 localArbitrationSequenceNumber++;
1940 addAbortSet->add(newAbort);
1942 // Append all the commit parts to the end of the pending queue
1943 // waiting for sending to the server
// This round carries no commit (NULL), only the abort set.
1944 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1945 pendingSendArbitrationRounds->add(arbitrationRound);
1947 if (compactArbitrationData()) {
1948 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): getCommit() is dereferenced without a visible NULL check
// here, unlike the corresponding code in arbitrateFromServer().
1950 MyVector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1951 uint partsSize = parts->size();
1952 for (uint i = 0; i < partsSize; i++) {
1953 CommitPart *commitPart = parts->get(i);
1954 processEntry(commitPart);
1959 updateLiveStateFromLocal();
1960 return Pair<bool, bool>(true, false);
1965 * Compacts the arbitration data by merging commits and aggregating
1966 * aborts so that a single large push of commits can be done instead
1967 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recently
// queued one, walking backwards from the end of pendingSendArbitrationRounds.
//
// Callers treat a true return as "process the commit parts of the last
// queued round". The final check below is `hadCommit && gotNewCommit`.
//
// NOTE(review): the return statements, break statements, the else line
// between the abort-only and commit-merging paths, and the statements that
// dispose of unused commits are not visible in this extract.
1969 bool Table::compactArbitrationData() {
1970 if (pendingSendArbitrationRounds->size() < 2) {
1971 // Nothing to compact so do nothing
1975 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1976 if (lastRound->getDidSendPart()) {
// NOTE(review): despite its name, hadCommit is true when lastRound has NO
// commit - confirm this is intended.
1980 bool hadCommit = (lastRound->getCommit() == NULL);
1981 bool gotNewCommit = false;
// Starts at 1 to account for lastRound itself.
1983 uint numberToDelete = 1;
1985 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Candidate round: the one numberToDelete positions before the last.
1986 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1988 if (round->isFull() || round->getDidSendPart()) {
1989 // Stop since there is a part that cannot be compacted and we
1990 // need to compact in order
1994 if (round->getCommit() == NULL) {
1995 // Try compacting aborts only
1996 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1997 if (newSize > ArbitrationRound_MAX_PARTS) {
1998 // Cant compact since it would be too large
2001 lastRound->addAborts(round->getAborts());
2003 // Create a new larger commit
// The merged commit takes the current arbitration sequence number, which is
// then incremented.
2004 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
2005 localArbitrationSequenceNumber++;
2007 // Create the commit parts so that we can count them
2008 newCommit->createCommitParts();
2010 // Calculate the new size of the parts
2011 int newSize = newCommit->getNumberOfParts();
2012 newSize += lastRound->getAbortsCount();
2013 newSize += round->getAbortsCount();
2015 if (newSize > ArbitrationRound_MAX_PARTS) {
2016 // Can't compact since it would be too large
// The identity comparisons below suggest Commit_merge may return one of its
// two inputs rather than a new object - confirm against Commit_merge.
2017 if (lastRound->getCommit() != newCommit &&
2018 round->getCommit() != newCommit)
2022 // Set the new compacted part
// Detach newCommit from whichever round already holds it before the old
// commit of lastRound is handled.
2023 if (lastRound->getCommit() == newCommit)
2024 lastRound->setCommit(NULL);
2025 if (round->getCommit() == newCommit)
2026 round->setCommit(NULL);
2028 if (lastRound->getCommit() != NULL) {
2029 Commit * oldcommit = lastRound->getCommit();
2030 lastRound->setCommit(NULL);
2033 lastRound->setCommit(newCommit);
2034 lastRound->addAborts(round->getAborts());
2035 gotNewCommit = true;
2041 if (numberToDelete != 1) {
2042 // If there is a compaction
2043 // Delete the previous pieces that are now in the new compacted piece
// Index 1 from the end is lastRound, so deletion starts at 2.
2044 for (uint i = 2; i <= numberToDelete; i++) {
2045 delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
// Drop the absorbed rounds together with lastRound, then re-append lastRound.
2047 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2049 pendingSendArbitrationRounds->add(lastRound);
2051 // Should reinsert into the commit processor
2052 if (hadCommit && gotNewCommit) {
2061 * Update all the commits and the committed tables, sets dead the dead
// Folds the commit parts collected in newCommitParts into Commit objects held
// in liveCommitsTable, then walks every arbitrator's commits in sorted
// sequence-number order and applies the ones not seen before to
// committedKeyValueTable and liveCommitsByKeyTable.
// Returns didProcessANewCommit, which is set when at least one previously
// unseen commit was applied.
// NOTE(review): this listing has gaps (early returns, continue/break, closing
// braces and short delete statements are not visible), so the comments below
// only describe the lines that are shown.
2064 bool Table::updateCommittedTable() {
2065 if (newCommitParts->size() == 0) {
2066 // Nothing new to process
2070 // Iterate through all the machine Ids that we received new parts for
2071 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2072 while (partsit->hasNext()) {
2073 int64_t machineId = partsit->next();
2074 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2076 // Iterate through all the parts for that machine Id
2077 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2078 while (pairit->hasNext()) {
2079 Pair<int64_t, int32_t> *partId = pairit->next();
2080 CommitPart *part = pairit->currVal();
2082 // Get the table of commits for the machine that produced this part
2083 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2085 if (commitForClientTable == NULL) {
2086 // This is the first commit from this device
2087 commitForClientTable = new Hashtable<int64_t, Commit *>();
2088 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2091 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2093 if (commit == NULL) {
2094 // This is a new commit that we don't have so make a new one
2095 commit = new Commit();
2097 // Insert this new commit into the live tables
2098 commitForClientTable->put(part->getSequenceNumber(), commit);
2101 // Add that part to the commit
2102 commit->addPartDecode(part);
2110 // Clear all the new commits parts in preparation for the next time
2111 // the server sends slots
2112 newCommitParts->clear();
2114 // If we process a new commit keep track of it for future use
2115 bool didProcessANewCommit = false;
2117 // Process the commits one by one
2118 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2119 while (liveit->hasNext()) {
2120 int64_t arbitratorId = liveit->next();
2121 // Get all the commits for a specific arbitrator
2122 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2124 // Sort the commits in order
2125 MyVector<int64_t> *commitSequenceNumbers = new MyVector<int64_t>();
2127 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2128 while (clientit->hasNext())
2129 commitSequenceNumbers->add(clientit->next());
// Sorts the vector's backing array in place using compareInt64.
2133 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2135 // Get the last commit seen from this arbitrator
// -1 is the starting value when nothing is recorded for this arbitrator.
2136 int64_t lastCommitSeenSequenceNumber = -1;
2137 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2138 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2141 // Go through each new commit one by one
2142 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2143 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2144 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2145 // Special processing if a commit is not complete
2146 if (!commit->isComplete()) {
2147 if (i == (commitSequenceNumbers->size() - 1)) {
2148 // If there is an incomplete commit and this commit is the
2149 // latest one seen then this commit cannot be processed and
2150 // there are no other commits
2153 // This is a commit that was already dead but parts of it
2154 // are still in the block chain (not flushed out yet).
2155 // Delete it and move on
2157 commitForClientTable->remove(commit->getSequenceNumber());
2163 // Update the last transaction that was updated if we can
// A transaction sequence number of -1 is skipped by the check below.
2164 if (commit->getTransactionSequenceNumber() != -1) {
2165 // Update the last transaction sequence number that the arbitrator arbitrated on
2166 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2167 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2171 // Update the last arbitration data that we have seen so far
2172 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2173 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2174 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2176 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2179 // Never seen any data from this arbitrator so record the first one
2180 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2183 // This branch handles a commit whose sequence number is not newer than
2184 // the last one seen. NOTE(review): how the branch exits is not visible here.
2185 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2186 // Update the last transaction that was updated if we can
2187 if (commit->getTransactionSequenceNumber() != -1) {
// NOTE(review): lastTransactionNumber is never read in the visible lines, and
// it is fetched before the contains() check that follows.
2188 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2189 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2190 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2191 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2197 // If we got here then this is a brand new commit and needs full processing
2199 // Get what commits should be edited, these are the commits that
2200 // have live values for their keys
2201 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2203 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2204 while (kvit->hasNext()) {
2205 KeyValue *kv = kvit->next();
// NOTE(review): this inner `commit` shadows the commit being processed; it is
// the commit currently recorded in liveCommitsByKeyTable for this key.
2206 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2208 commitsToEdit->add(commit);
2213 // Update each previous commit that needs to be updated
2214 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2215 while (commitit->hasNext()) {
2216 Commit *previousCommit = commitit->next();
2218 // Only bother with live commits (TODO: Maybe remove this check)
2219 if (previousCommit->isLive()) {
2221 // Update which keys in the old commits are still live
2223 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2224 while (kvit->hasNext()) {
2225 KeyValue *kv = kvit->next();
2226 previousCommit->invalidateKey(kv->getKey());
2231 // if the commit is now dead then remove it
2232 if (!previousCommit->isLive()) {
2233 commitForClientTable->remove(previousCommit->getSequenceNumber());
2234 delete previousCommit;
2239 delete commitsToEdit;
2241 // Update the last seen sequence number from this arbitrator
2242 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2243 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2244 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2247 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2250 // We processed a new commit that we haven't seen before
2251 didProcessANewCommit = true;
2253 // Update the committed table of keys and which commit is using which key
2255 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2256 while (kvit->hasNext()) {
2257 KeyValue *kv = kvit->next();
2258 committedKeyValueTable->put(kv->getKey(), kv);
2259 liveCommitsByKeyTable->put(kv->getKey(), commit);
2264 delete commitSequenceNumbers;
2268 return didProcessANewCommit;
2272 * Create the speculative table from transactions that are still live
2273 * and have come from the cloud
// Rebuilds or extends speculatedKeyValueTable from the transactions in
// liveTransactionBySequenceNumberTable, visiting them in sorted
// sequence-number order and applying the key-value updates of every
// transaction whose guard evaluates to true.
// didProcessNewCommits: when true the speculation is redone from scratch.
// Returns bool; the one visible return yields false when there is nothing to
// speculate from.
// NOTE(review): this listing has gaps (continue statements, the if around the
// final reset, the other returns and closing braces are not visible), so the
// comments below only describe the lines that are shown.
2275 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2276 if (liveTransactionBySequenceNumberTable->size() == 0) {
2277 // There is nothing to speculate on
2281 // Create a list of the transaction sequence numbers and sort them
2282 // from oldest to newest
2283 MyVector<int64_t> *transactionSequenceNumbersSorted = new MyVector<int64_t>();
2285 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2286 while (trit->hasNext())
2287 transactionSequenceNumbersSorted->add(trit->next());
2291 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
// The "gap" test only checks whether the first sorted transaction differs from
// the one remembered from the previous speculation.
2293 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2296 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2297 // If there is a gap in the transaction sequence numbers then
2298 // there was a commit or an abort of a transaction OR there was a
2299 // new commit (Could be from offline commit) so redo the
2300 // speculation from scratch
2302 // Start from scratch
2303 speculatedKeyValueTable->clear();
2304 lastTransactionSequenceNumberSpeculatedOn = -1;
2305 oldestTransactionSequenceNumberSpeculatedOn = -1;
2308 // Remember the front of the transaction list
2309 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2311 // Find where to start speculation from
2312 uint startIndex = 0;
// Scans for the transaction last speculated on; the statement executed on a
// match is not visible in this listing.
2314 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2315 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2319 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2320 // Make sure we are not out of bounds
2321 delete transactionSequenceNumbersSorted;
2322 return false; // did not speculate
// NOTE(review): no delete of incompleteTransactionArbitrator is visible in
// this function - confirm it is released somewhere.
// NOTE(review): didSkip starts out true and no assignment to false is visible.
2325 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2326 bool didSkip = true;
2328 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2329 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2330 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2332 if (!transaction->isComplete()) {
2333 // If there is an incomplete transaction then there is nothing
2334 // we can do; add this transaction's arbitrator to the list of
2335 // arbitrators we should ignore
2336 incompleteTransactionArbitrator->add(transaction->getArbitrator());
// Transactions from an arbitrator that already has an incomplete transaction
// are tested here; the body of this branch is not visible.
2341 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2345 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The guard is evaluated against the committed and speculated tables only
// (the third argument is NULL).
2347 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2348 // Guard evaluated to true so update the speculative table
2350 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2351 while (kvit->hasNext()) {
2352 KeyValue *kv = kvit->next();
2353 speculatedKeyValueTable->put(kv->getKey(), kv);
2360 delete transactionSequenceNumbersSorted;
2363 // Since there was a skip we need to redo the speculation next time around
2364 lastTransactionSequenceNumberSpeculatedOn = -1;
2365 oldestTransactionSequenceNumberSpeculatedOn = -1;
2368 // We did some speculation
2373 * Create the pending transaction speculative table from transactions
2374 * that are still in the pending transaction buffer
// Rebuilds pendingTransactionSpeculatedKeyValueTable from the transactions in
// pendingTransactionQueue, applying the key-value updates of every transaction
// whose guard evaluates to true.
// didProcessNewCommitsOrSpeculate: when true the pending speculation state is
// reset before the queue is walked.
// NOTE(review): this listing has gaps (returns, the loop exit of the scan and
// closing braces are not visible), so the comments below only describe the
// lines that are shown.
2376 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2377 if (pendingTransactionQueue->size() == 0) {
2378 // There is nothing to speculate on
// A change of the queue's first element also forces a reset.
2382 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2383 // need to reset on the pending speculation
2384 lastPendingTransactionSpeculatedOn = NULL;
2385 firstPendingTransaction = pendingTransactionQueue->get(0);
2386 pendingTransactionSpeculatedKeyValueTable->clear();
2389 // Find where to start speculation from
2390 uint startIndex = 0;
// NOTE(review): the scan looks for firstPendingTransaction rather than
// lastPendingTransactionSpeculatedOn - confirm that this is intended.
2392 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2393 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2396 if (startIndex >= pendingTransactionQueue->size()) {
2397 // Make sure we are not out of bounds
2401 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2402 Transaction *transaction = pendingTransactionQueue->get(i);
2404 lastPendingTransactionSpeculatedOn = transaction;
// Unlike updateSpeculativeTable, the guard here also receives the pending
// speculative table being built.
2406 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2407 // Guard evaluated to true so update the speculative table
2408 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2409 while (kvit->hasNext()) {
2410 KeyValue *kv = kvit->next();
2411 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2419 * Set dead and remove from the live transaction tables the
2420 * transactions that are dead
// Two passes driven by lastArbitratedTransactionNumberByArbitratorTable:
// first over the live transactions, setting dead those whose arbitrator has
// already arbitrated on their sequence number, then over the outstanding
// transaction statuses, marking the arbitrated ones as committed.
// NOTE(review): this listing has gaps (iterator removal, cleanup and the
// braces that presumably scope the two `iter` declarations are not visible).
2422 void Table::updateLiveTransactionsAndStatus() {
2423 // Go through each of the transactions
2425 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2426 while (iter->hasNext()) {
2427 int64_t key = iter->next();
2428 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2430 // Check if the transaction is dead
2431 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2432 && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2433 // Set dead the transaction
2434 transaction->setDead();
2436 // Remove the transaction from the live table
// Only the removal from the by-transaction-id table is visible here.
2438 liveTransactionByTransactionIdTable->remove(transaction->getId());
2445 // Go through each of the outstanding transaction statuses
2447 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2448 while (iter->hasNext()) {
2449 int64_t key = iter->next();
2450 TransactionStatus *status = outstandingTransactionStatus->get(key);
2452 // Check if the transaction is dead
2453 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2454 && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
// An arbitrated transaction that still has an outstanding status is reported
// as committed.
2456 status->setStatus(TransactionStatus_StatusCommitted);
2467 * Process this slot, entry by entry. Also update the latest message sent by slot
// Processes one slot: records it as the last message of the machine that
// produced it, then dispatches every entry of the slot to the processEntry
// overload selected by the entry's type.
// indexer:              passed on to the RejectedMessage handler
// slot:                 the slot whose entries are processed
// acceptUpdatesToLocal: passed on to updateLastMessage
// machineSet:           passed on to updateLastMessage and the LastMessage handler
// NOTE(review): this listing has gaps (several case labels, the break
// statements, the default label and closing braces are not visible).
2469 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2471 // Update the last message seen
2472 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2474 // Process each entry in the slot
2475 MyVector<Entry *> *entries = slot->getEntries();
2476 uint eSize = entries->size();
2477 for (uint ei = 0; ei < eSize; ei++) {
2478 Entry *entry = entries->get(ei);
// Each branch casts the entry to the concrete type before dispatching.
2479 switch (entry->getType()) {
2480 case TypeCommitPart:
2481 processEntry((CommitPart *)entry);
2484 processEntry((Abort *)entry);
2486 case TypeTransactionPart:
2487 processEntry((TransactionPart *)entry);
2490 processEntry((NewKey *)entry);
2492 case TypeLastMessage:
2493 processEntry((LastMessage *)entry, machineSet);
2495 case TypeRejectedMessage:
2496 processEntry((RejectedMessage *)entry, indexer);
2498 case TypeTableStatus:
// The table status handler also needs the sequence number of the slot.
2499 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2502 //throw new Error("Unrecognized type: ");
2503 myerror("Unrecognized type: \n");
2509 * Update the last message that was sent for a machine Id
2511 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2512 // Update what the last message received by a machine was
2513 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2517 * Add the new key to the arbitrators table and update the set of live
2518 * new keys (in case of a rescued new key message)
2520 void Table::processEntry(NewKey *entry) {
2521 // Update the arbitrator table with the new key information
2522 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2524 // Update what the latest live new key is
2525 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2526 if (oldNewKey != NULL) {
2527 // Delete the old new key messages
2528 oldNewKey->setDead();
2533 * Process new table status entries and set dead the old ones as new
2534 * ones come in-> keeps track of the largest and smallest table status
2535 * seen in this current round of updating the local copy of the block
2538 void Table::processEntry(TableStatus *entry, int64_t seq) {
2539 int newNumSlots = entry->getMaxSlots();
2540 updateCurrMaxSize(newNumSlots);
2541 initExpectedSize(seq, newNumSlots);
2543 if (liveTableStatus != NULL) {
2544 // We have a larger table status so the old table status is no
2546 liveTableStatus->setDead();
2549 // Make this new table status the latest alive table status
2550 liveTableStatus = entry;
2554 * Check old messages to see if there is a block chain violation.
// Processes a rejected-message entry in two steps:
//  1. checks the locally indexed slots in [oldSeqNum, newSeqNum] against the
//     entry and reports a server error on a mismatch;
//  2. collects the machines whose last known message is older than this entry
//     and registers the entry in their watch sets.
// entry:   the rejected message being processed
// indexer: used to look up local slots by sequence number
// NOTE(review): this listing has gaps (a null check on the looked-up slot, the
// skip of the local machine, an else branch and closing braces are not
// visible), so the comments below only describe the lines that are shown.
2557 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2558 int64_t oldSeqNum = entry->getOldSeqNum();
2559 int64_t newSeqNum = entry->getNewSeqNum();
2560 bool isequal = entry->getEqual();
2561 int64_t machineId = entry->getMachineID();
2562 int64_t seq = entry->getSequenceNumber();
2564 // Check if we have messages that were supposed to be rejected in
2565 // our local block chain
2566 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2568 Slot *slot = indexer->getSlot(seqNum);
2571 // If we have this slot make sure that it was not supposed to be rejected
// NOTE(review): slot is dereferenced below; the guard for a missing slot is
// presumably on a line that is not visible here.
2573 int64_t slotMachineId = slot->getMachineID();
// Error when the entry's equal flag disagrees with whether the slot came from
// the entry's machine.
2574 if (isequal != (slotMachineId == machineId)) {
2575 //throw new Error("Server Error: Trying to insert rejected message for slot ");
2576 myerror("Server Error: Trying to insert rejected message for slot\n");
2581 // Create a list of clients to watch until they see this rejected message
2583 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2584 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2585 while (iter->hasNext()) {
2586 // Machine ID for the last message entry
2587 int64_t lastMessageEntryMachineId = iter->next();
2589 // We've seen it, don't need to continue to watch. Our next
2590 // message will implicitly acknowledge it.
2591 if (lastMessageEntryMachineId == localMachineId) {
2595 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2596 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2598 if (entrySequenceNumber < seq) {
2599 // Add this rejected message to the set of messages that this
2600 // machine ID did not see yet
2601 addWatchMyVector(lastMessageEntryMachineId, entry);
2602 // This client did not see this rejected message yet so add it
2603 // to the watch set to monitor
2604 deviceWatchSet->add(lastMessageEntryMachineId);
2609 if (deviceWatchSet->isEmpty()) {
2610 // This rejected message has been seen by all the clients so
// the watch set is not needed and is freed.
2612 delete deviceWatchSet;
2614 // We need to watch this rejected message
2615 entry->setWatchSet(deviceWatchSet);
2620 * Check if this abort is live, if not then save it so we can kill it
2621 * later. Update the last transaction number that was arbitrated on.
// Processes an abort entry: marks the matching outstanding transaction status
// as aborted, records the abort in the live abort tables (dropping it again
// when the target machine has already seen it), updates the per-arbitrator
// bookkeeping and removes the aborted transaction from the live transaction
// tables.
// entry: the Abort entry being processed
// NOTE(review): this listing has gaps (setDead/return statements, else
// branches and closing braces are not visible), so the comments below only
// describe the lines that are shown.
2623 void Table::processEntry(Abort *entry) {
2624 if (entry->getTransactionSequenceNumber() != -1) {
2625 // update the transaction status if it was sent to the server
2626 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2627 if (status != NULL) {
2628 status->setStatus(TransactionStatus_StatusAborted);
2632 // Abort has not been seen by the client it is for yet so we need to
// keep it in the live abort table.
// NOTE(review): the key Pair is heap-allocated on every call; who owns it when
// put() replaces an existing entry is not visible here.
2635 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2636 if (previouslySeenAbort != NULL) {
2637 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
// Aborts arbitrated by this machine are also indexed by the arbitrator's local
// sequence number.
2640 if (entry->getTransactionArbitrator() == localMachineId) {
2641 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get(...) is dereferenced
// without a visible null check - confirm that an entry always exists for the
// transaction's machine.
2644 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2645 // The machine already saw this so it is dead
2647 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2648 liveAbortTable->remove(&abortid);
2650 if (entry->getTransactionArbitrator() == localMachineId) {
2651 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2656 // Update the last arbitration data that we have seen so far
2657 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2658 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2659 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2661 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2664 // Never seen any data from this arbitrator so record the first one
2665 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2668 // Set dead a transaction if we can
// The transaction is looked up by (machine id, client-local sequence number).
2669 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2671 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2672 if (transactionToSetDead != NULL) {
2673 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2676 // Update the last transaction sequence number that the arbitrator
// arbitrated on, but only when the abort carries a real transaction sequence
// number (not -1).
2678 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2679 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2681 if (entry->getTransactionSequenceNumber() != -1) {
2682 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2688 * Set dead the transaction part if that transaction is dead and keep
2689 * track of all new parts
2691 void Table::processEntry(TransactionPart *entry) {
2692 // Check if we have already seen this transaction and set it dead OR
2693 // if it is not alive
2694 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2695 // This transaction is dead, it was already committed or aborted
2700 // This part is still alive
2701 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2703 if (transactionPart == NULL) {
2704 // Dont have a table for this machine Id yet so make one
2705 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2706 newTransactionParts->put(entry->getMachineId(), transactionPart);
2709 // Update the part and set dead ones we have already seen (got a
2711 entry->acquireRef();
2712 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2713 if (previouslySeenPart != NULL) {
2714 previouslySeenPart->releaseRef();
2715 previouslySeenPart->setDead();
2720 * Process new commit entries and save them for future use-> Delete duplicates
2722 void Table::processEntry(CommitPart *entry) {
2723 // Update the last transaction that was updated if we can
2724 if (entry->getTransactionSequenceNumber() != -1) {
2725 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2726 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2727 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2731 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2732 if (commitPart == NULL) {
2733 // Don't have a table for this machine Id yet so make one
2734 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2735 newCommitParts->put(entry->getMachineId(), commitPart);
2737 // Update the part and set dead ones we have already seen (got a
2739 entry->acquireRef();
2740 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2741 if (previouslySeenPart != NULL) {
2742 previouslySeenPart->setDead();
2743 previouslySeenPart->releaseRef();
2748 * Update the last message seen table. Update and set dead the
2749 * appropriate RejectedMessages as clients see them. Updates the live
2750 * aborts, removes those that are dead and sets them dead. Check that
2751 * the last message seen is correct and that there is no mismatch of
2752 * our own last message or that other clients have not had a rollback
2753 * on the last message.
// Records (seqNum, liveness) as the last message of machineId and performs the
// follow-up work: acknowledges watched rejected messages, retires aborts the
// machine has now seen, sets dead superseded last-message objects and checks
// the sequence numbers for mismatches or rollbacks.
// machineId:            machine the message belongs to
// seqNum:               sequence number of the message
// liveness:             the Slot or LastMessage object carrying the message
// acceptUpdatesToLocal: when true, sequence mismatches for the local machine
//                       are not reported as errors
// machineSet:           machineId is removed from this set
// NOTE(review): this listing has gaps (iterator removals, setDead calls, an
// early return, else branches and closing braces are not visible), so the
// comments below only describe the lines that are shown.
2755 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2756 // We have seen this machine ID
2757 machineSet->remove(machineId);
2759 // Get the set of rejected messages that this machine Id has not seen yet
2760 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchMyVectorTable->get(machineId);
2761 // If there is a rejected message that this machine Id has not seen yet
2762 if (watchset != NULL) {
2763 // Go through each rejected message that this machine Id has not seen yet
2766 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2767 while (rmit->hasNext()) {
2768 RejectedMessage *rm = rmit->next();
2769 // If this machine Id has seen this rejected message...
2770 if (rm->getSequenceNumber() <= seqNum) {
2771 // Remove it from our watchlist
2773 // Decrement machines that need to see this notification
2774 rm->removeWatcher(machineId);
2780 // Set dead the abort
2781 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2783 while (abortit->hasNext()) {
2784 Pair<int64_t, int64_t> *key = abortit->next();
2785 Abort *abort = liveAbortTable->get(key);
// An abort is matched once the machine it targets has a message at or beyond
// the abort's sequence number.
2786 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2789 if (abort->getTransactionArbitrator() == localMachineId) {
2790 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2795 if (machineId == localMachineId) {
2796 // Our own messages are immediately dead.
// Liveness is cast to the concrete type reported by getType() before setDead.
2797 char livenessType = liveness->getType();
2798 if (livenessType == TypeLastMessage) {
2799 ((LastMessage *)liveness)->setDead();
2800 } else if (livenessType == TypeSlot) {
2801 ((Slot *)liveness)->setDead();
2803 //throw new Error("Unrecognized type");
2804 myerror("Unrecognized type\n");
2807 // Get the old last message for this device
// put() stores the new pair and hands back the pair it replaced, if any.
2808 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2809 if (lastMessageEntry == NULL) {
2810 // If no last message then there is nothing else to process
// Copy out the old values first, because the replaced pair is freed right away.
2814 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2815 Liveness *lastEntry = lastMessageEntry->getSecond();
2816 delete lastMessageEntry;
2818 // If it is not our machine Id since we already set ours to dead
2819 if (machineId != localMachineId) {
2820 char lastEntryType = lastEntry->getType();
2822 if (lastEntryType == TypeLastMessage) {
2823 ((LastMessage *)lastEntry)->setDead();
2824 } else if (lastEntryType == TypeSlot) {
2825 ((Slot *)lastEntry)->setDead();
2827 //throw new Error("Unrecognized type");
2828 myerror("Unrecognized type\n");
2831 // Make sure the server is not playing any games
2832 if (machineId == localMachineId) {
2833 if (hadPartialSendToServer) {
2834 // We were not making any updates and we had a machine mismatch
// After a partial send only a rollback (older seqNum) is treated as an error.
2835 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2836 //throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2837 myerror("Server Error: Mismatch on local machine sequence number, needed at least: \n");
2840 // We were not making any updates and we had a machine mismatch
// Here any difference in the sequence number is treated as an error.
2841 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2842 //throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2843 myerror("Server Error: Mismatch on local machine sequence number, needed: \n");
2847 if (lastMessageSeqNum > seqNum) {
2848 //throw new Error("Server Error: Rollback on remote machine sequence number");
2849 myerror("Server Error: Rollback on remote machine sequence number\n");
2855 * Add a rejected message entry to the watch set to keep track of
2856 * which clients have seen that rejected message entry and which have
2859 void Table::addWatchMyVector(int64_t machineId, RejectedMessage *entry) {
2860 Hashset<RejectedMessage *> *entries = rejectedMessageWatchMyVectorTable->get(machineId);
2861 if (entries == NULL) {
2862 // There is no set for this machine ID yet so create one
2863 entries = new Hashset<RejectedMessage *>();
2864 rejectedMessageWatchMyVectorTable->put(machineId, entries);
2866 entries->add(entry);
2870 * Check if the HMAC chain is not violated
// Verifies that each new slot links to its predecessor: for every slot in
// newSlots, the slot with the preceding sequence number is looked up through
// the indexer and, when it exists, its HMAC must equal the new slot's
// previous-HMAC field; otherwise an error is reported through myerror.
// indexer:  used to look up the predecessor slot by sequence number
// newSlots: the slots to verify
// NOTE(review): the end of this function lies beyond the visible listing.
2872 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2873 for (uint i = 0; i < newSlots->length(); i++) {
2874 Slot *currSlot = newSlots->get(i);
2875 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor (NULL) is not treated as a violation.
2876 if (prevSlot != NULL &&
2877 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2878 // throw new Error("Server Error: Invalid HMAC Chain");
2879 myerror("Server Error: Invalid HMAC Chain\n");