4 * IoTTable data structure. Provides client interface.
// Tuning constants. NOTE(review): only RESIZE_THRESHOLD is used in the visible part of this file;
// the roles suggested for the other constants are assumptions to confirm against their call sites.
12 static final int FREE_SLOTS = 2;// Number of slots that should be kept free // 10
13 static final int SKIP_THRESHOLD = 10;// NOTE(review): usage not visible in this excerpt
14 static final double RESIZE_MULTIPLE = 1.2;// presumably the buffer growth factor on resize - TODO confirm
15 static final double RESIZE_THRESHOLD = 0.75;// Fraction of numberOfSlots used as the lower bound in setResizeThreshold()
16 static final int REJECTED_THRESHOLD = 5;// NOTE(review): usage not visible in this excerpt
// Core collaborators and scalar state of the table. All object references start as NULL.
19 SlotBuffer buffer = NULL;// Slot storage; queried for capacity, size, slots and oldest/newest sequence numbers
20 CloudComm cloud = NULL;// Communication layer used for getSlots/putSlot/sendLocalData/initSecurity
22 TableStatus liveTableStatus = NULL;// NOTE(review): usage not visible in this excerpt
23 PendingTransaction pendingTransactionBuilder = NULL;// Pending Transaction used in building a Pending Transaction
24 Transaction lastPendingTransactionSpeculatedOn = NULL;// Last transaction that was speculated on from the pending transaction
25 Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
28 int numberOfSlots = 0; // Number of slots stored in buffer
29 int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
30 int64_t liveSlotCount = 0;// Number of currently live slots
// NOTE(review): identifier is misspelled ("Numver"); left unchanged because other code refers to it by this name.
31 int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
32 int64_t localMachineId = 0; // Machine ID of this client device
33 int64_t sequenceNumber = 0; // Largest sequence number a client has received
34 int64_t localSequenceNumber = 0;// Incremented after each slot creation and each local send in this file
36 // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
37 // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
38 int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
39 int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
40 int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
41 int64_t localArbitrationSequenceNumber = 0;// NOTE(review): usage not visible in this excerpt
42 bool hadPartialSendToServer = false;// Set to true in sendToServer when a send may have only partially reached the server
43 bool attemptedToSendToServer = false;// NOTE(review): usage not visible in this excerpt
45 bool didFindTableStatus = false;// NOTE(review): usage not visible in this excerpt
46 int64_t currMaxSize = 0;// NOTE(review): usage not visible in this excerpt
// Snapshot of the most recent send attempt; sendToServer records these before calling sendSlotsToServer
// and reads them back when reconciling a partial send.
48 Slot lastSlotAttemptedToSend = NULL;
49 bool lastIsNewKey = false;
51 Hashtable<Transaction, Vector<int32_t> > lastTransactionPartsSent = NULL;
52 Vector<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
53 NewKey lastNewKey = NULL;// Compared against the key being retried in sendToServer; its assignment is not visible in this excerpt
// Lookup tables and queues holding the live state of the table. All start as NULL and are
// allocated by the initialisation code further down in this file.
// Fix: nine declarations were missing the comma between type arguments
// (e.g. "Hashtable<int64_t Pair<int64_t Liveness> >"), which cannot parse; the commas are restored.
57 Hashtable<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
58 Hashtable<IoTString, KeyValue> speculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value
59 Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value from the pending transactions
60 Hashtable<IoTString, NewKey> liveNewKeyTable = NULL;// Table of live new keys
61 Hashtable<int64_t, Pair<int64_t, Liveness> > lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
62 Hashtable<int64_t, HashSet<RejectedMessage> > rejectedMessageWatchVectorTable = NULL;// Table of machine Ids and the set of rejected messages they have not seen yet
63 Hashtable<IoTString, Long> arbitratorTable = NULL;// Table of keys and their arbitrators
64 Hashtable<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
65 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart> > newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
66 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart> > newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
67 Hashtable<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL;// Last transaction sequence number that an arbitrator arbitrated on
68 Hashtable<int64_t, Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
69 Hashtable<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL;// live transaction grouped by the transaction ID
70 Hashtable<int64_t, Hashtable<int64_t, Commit> > liveCommitsTable = NULL;// Two-level map of commits; printSlots walks both key levels. NOTE(review): key meanings not visible here
71 Hashtable<IoTString, Commit> liveCommitsByKeyTable = NULL;// Commits indexed by key name
72 Hashtable<int64_t, int64_t> lastCommitSeenSequenceNumberByArbitratorTable = NULL;// presumably arbitrator id -> last commit sequence number seen - TODO confirm
73 Vector<Long> rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
74 Vector<Transaction> pendingTransactionQueue = NULL;// Transactions waiting to be sent; entries are removed once all their parts are sent
75 Vector<ArbitrationRound> pendingSendArbitrationRounds = NULL;// Arbitration rounds whose parts still have to be sent to the server
76 Vector<Entry> pendingSendArbitrationEntriesToDelete = NULL;// Entries handed to ArbitrationRound.removeParts after a successful send
77 Hashtable<Transaction, Vector<int32_t> > transactionPartsSent = NULL;// Per transaction, the parts included in the slot currently being sent
78 Hashtable<int64_t, TransactionStatus> outstandingTransactionStatus = NULL;// Transaction sequence number -> status of transactions already sent
79 Hashtable<int64_t, Abort> liveAbortsGeneratedByLocal = NULL;// Aborts generated by this machine; keys are sorted and compared with a sequence number when replying to local peers
80 Set<Pair<int64_t, int64_t> > offlineTransactionsCommittedAndAtServer = NULL;// Ids of locally arbitrated transactions that also have a server sequence number
81 Hashtable<int64_t, Pair<String, int32_t> > localCommunicationTable = NULL;// Arbitrator machine id -> (host name, port) for direct local communication
82 Hashtable<int64_t, int64_t> lastTransactionSeenFromMachineFromServer = NULL;// NOTE(review): usage not visible in this excerpt
83 Hashtable<int64_t, int64_t> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;// Arbitrator machine id -> last arbitration local sequence number seen (sent along with local requests)
// Builds a table for the given machine ID that creates its own CloudComm from the server
// base URL, password and listening port.
// NOTE(review): the remainder of the constructor body is not visible in this excerpt.
86 Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
87 localMachineId = _localMachineId;
88 cloud = new CloudComm(this, baseurl, password, listeningPort);
// Builds a table for the given machine ID from a caller-supplied CloudComm.
// NOTE(review): the statement that stores _cloud is not visible in this excerpt - presumably assigned to `cloud`; confirm.
93 Table(CloudComm _cloud, int64_t _localMachineId) {
94 localMachineId = _localMachineId;
101 * Init all the stuff needed for table usage
// Body of the table initialisation routine (its header line is not visible in this excerpt):
// creates the helper objects, allocates every lookup table and queue, then derives the
// resize threshold from the buffer capacity.
// Fix: nine allocations were missing the comma between type arguments
// (e.g. "new Hashtable<int64_t Transaction>()"), which cannot parse; the commas are restored.
105 // Init helper objects
106 random = new Random();
107 buffer = new SlotBuffer();
110 oldestLiveSlotSequenceNumver = 1;
113 committedKeyValueTable = new Hashtable<IoTString, KeyValue>();
114 speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
115 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
116 liveNewKeyTable = new Hashtable<IoTString, NewKey>();
117 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness> >();
118 rejectedMessageWatchVectorTable = new Hashtable<int64_t, HashSet<RejectedMessage> >();
119 arbitratorTable = new Hashtable<IoTString, Long>();
120 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
121 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart> >();
122 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart> >();
123 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
124 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction>();
125 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
126 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit> >();
127 liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
128 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
129 rejectedSlotVector = new Vector<Long>();
130 pendingTransactionQueue = new Vector<Transaction>();
131 pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
132 transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
133 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus>();
134 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort>();
135 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
136 localCommunicationTable = new Hashtable<int64_t, Pair<String, int32_t> >();
137 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
138 pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
139 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count mirrors the buffer's capacity; the resize threshold depends on it, so compute it last.
143 numberOfSlots = buffer.capacity();
144 setResizeThreshold();
147 // TODO: delete method
// Debug dump: walks every slot between the buffer's oldest and newest sequence numbers,
// tallies entries by type, and prints the tallies plus several table sizes to System.out.
// NOTE(review): livec, liveslo and deadc are declared and updated on lines not visible in this excerpt.
148 synchronized void printSlots() {
149 int64_t o = buffer.getOldestSeqNum();
150 int64_t n = buffer.getNewestSeqNum();
// One counter per entry type; assumes entry type codes are in the range 0..9 - TODO confirm
152 int[] types = new int[10];
// Inclusive range: oldest through newest sequence number
163 for (int64_t i = o; i < (n + 1); i++) {
164 Slot s = buffer.getSlot(i);
171 Vector<Entry> entries = s.getEntries();
173 for (Entry e : entries) {
175 int type = e.getType();
// The condition guarding this cast is not visible here; presumably a rejected-message type check
179 RejectedMessage rej = (RejectedMessage)e;
182 System.out.println(rej.getMachineID());
186 types[type] = types[type] + 1;
// Print the per-type tallies, then the summary counters
195 for (int i = 0; i < 10; i++) {
196 System.out.println(i + " " + types[i]);
198 System.out.println("Live count: " + livec);
199 System.out.println("Live Slot count: " + liveslo);
201 System.out.println("Dead count: " + deadc);
202 System.out.println("Old: " + o);
203 System.out.println("New: " + n);
204 System.out.println("Size: " + buffer.size());
205 // System.out.println("Commits: " + liveCommitsTable.size());
206 System.out.println("pendingTrans: " + pendingTransactionQueue.size());
207 System.out.println("Trans Status Out: " + outstandingTransactionStatus.size());
// Last arbitrated transaction number for each arbitrator
209 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
210 System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
// Key-value updates of every live commit, walking both key levels of liveCommitsTable
214 for (Long a : liveCommitsTable.keySet()) {
215 for (Long b : liveCommitsTable.get(a).keySet()) {
216 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
217 System.out.print(kv + " ");
219 System.out.print("|| ");
221 System.out.println();
227 * Initialize the table by inserting a table status as the first entry into the block chain;
228 * also initialize the security (crypto) state.
// Initialises security on the cloud connection, builds slot number 1 carrying a TableStatus,
// pushes it to the server and applies the result to the local state.
// Throws Error("Error on initialization") on the remaining path; declared to throw ServerException.
// NOTE(review): the first branch condition and the use of `status` are on lines not visible in this excerpt.
230 synchronized void initTable() throws ServerException {
231 cloud.initSecurity();
233 // Create the first insertion into the block chain which is the table status
234 Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
235 localSequenceNumber++;
236 TableStatus status = new TableStatus(s, numberOfSlots);
238 Slot[] array = cloud.putSlot(s, numberOfSlots);
// Use our own slot as the only element of the chain
241 array = new Slot[] {s};
242 // update local block chain
243 validateAndUpdate(array, true);
244 } else if (array.length == 1) {
245 // in case we did push the slot BUT we failed to init it
246 validateAndUpdate(array, true);
248 throw new Error("Error on initialization");
253 * Rebuild the table from scratch by pulling the latest block chain from the server.
// Fetches every slot after sequenceNumber from the server, validates and applies them,
// then refreshes the live transactions and status. Declared to throw ServerException.
255 synchronized void rebuild() throws ServerException {
256 // Just pull the latest slots from the server
257 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
258 validateAndUpdate(newslots, true);
260 updateLiveTransactionsAndStatus();
264 // String toString() {
265 // String retString = " Committed Table: \n";
266 // retString += "---------------------------\n";
267 // retString += commitedTable.toString();
269 // retString += "\n\n";
271 // retString += " Speculative Table: \n";
272 // retString += "---------------------------\n";
273 // retString += speculativeTable.toString();
// Registers the host name and port at which the given arbitrator can be reached directly.
// An existing entry for the same arbitrator is replaced by put().
278 synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
279 localCommunicationTable.put(arbitrator, new Pair<String, int32_t>(hostName, portNumber));
// Returns the arbitrator recorded for the key in arbitratorTable, i.e. whatever get() yields
// when the key is absent (other code in this file compares that result against NULL).
282 synchronized Long getArbitrator(IoTString key) {
283 return arbitratorTable.get(key);
// Closes the table. NOTE(review): the body is not visible in this excerpt.
286 synchronized void close() {
// Returns the committed value stored for the key.
// NOTE(review): the lines between the lookup and the return are not visible here - presumably a NULL check; confirm.
290 synchronized IoTString getCommitted(IoTString key) {
291 KeyValue kv = committedKeyValueTable.get(key);
294 return kv.getValue();
// Returns the value for the key, consulting pendingTransactionSpeculatedKeyValueTable first,
// then speculatedKeyValueTable, then committedKeyValueTable.
// NOTE(review): the conditions that trigger each fallback are on lines not visible in this excerpt.
300 synchronized IoTString getSpeculative(IoTString key) {
301 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
304 kv = speculatedKeyValueTable.get(key);
308 kv = committedKeyValueTable.get(key);
312 return kv.getValue();
// Reads the committed value for the key as part of the transaction being built: the value
// read (or NULL) is recorded on pendingTransactionBuilder as a guard via addKVGuard.
// Throws Error if the key has no arbitrator or if pendingTransactionBuilder.checkArbitrator
// rejects the key's arbitrator.
// NOTE(review): the branch choosing between the two addKVGuard calls is not visible in this excerpt.
318 synchronized IoTString getCommittedAtomic(IoTString key) {
319 KeyValue kv = committedKeyValueTable.get(key);
321 if (arbitratorTable.get(key) == NULL) {
322 throw new Error("Key not Found.");
325 // Make sure new key value pair matches the current arbitrator
326 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
327 // TODO: Maybe not throw an error
328 throw new Error("Not all Key Values Match Arbitrator.");
// Guard on the value that was read
332 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
333 return kv.getValue();
// Guard on a NULL value
335 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
// Reads the value for the key as part of the transaction being built, consulting
// pendingTransactionSpeculatedKeyValueTable, then speculatedKeyValueTable, then
// committedKeyValueTable; the value read (or NULL) is recorded as a guard via addKVGuard.
// Throws Error if the key has no arbitrator or if pendingTransactionBuilder.checkArbitrator
// rejects the key's arbitrator.
// NOTE(review): the fallback conditions and the guard branch are on lines not visible in this excerpt.
340 synchronized IoTString getSpeculativeAtomic(IoTString key) {
341 if (arbitratorTable.get(key) == NULL) {
342 throw new Error("Key not Found.");
345 // Make sure new key value pair matches the current arbitrator
346 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
347 // TODO: Maybe not throw an error
348 throw new Error("Not all Key Values Match Arbitrator.");
351 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
354 kv = speculatedKeyValueTable.get(key);
358 kv = committedKeyValueTable.get(key);
// Guard on the value that was read
362 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
363 return kv.getValue();
// Guard on a NULL value
365 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after sequenceNumber from the server, applies them and refreshes the live
// transactions and status. Any Exception is caught, after which the machine IDs in
// localCommunicationTable are iterated.
// NOTE(review): the return statements and the loop body are not visible in this excerpt -
// presumably the loop falls back to updateFromLocal for each machine; confirm.
370 synchronized bool update() {
372 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
373 validateAndUpdate(newSlots, false);
377 updateLiveTransactionsAndStatus();
// The exception is deliberately not reported (the stack-trace print below is disabled)
380 } catch (Exception e) {
381 // e.printStackTrace();
383 for (Long m : localCommunicationTable.keySet()) {
// Creates a key named keyName with machineId passed to the NewKey entry, by sending that
// entry to the server through sendToServer. Declared to throw ServerException.
// NOTE(review): the return statements and any retry loop are not visible in this excerpt.
391 synchronized bool createNewKey(IoTString keyName, int64_t machineId) throws ServerException {
393 if (arbitratorTable.get(keyName) != NULL) {
394 // There is already an arbitrator
398 NewKey newKey = new NewKey(NULL, keyName, machineId);
400 if (sendToServer(newKey)) {
401 // If successfully inserted
// Begins a new transaction by replacing pendingTransactionBuilder with a fresh
// PendingTransaction for this machine; anything added to the previous builder is dropped.
407 synchronized void startTransaction() {
408 // Create a new transaction, invalidates any old pending transactions.
409 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the transaction being built.
// Throws Error if the key has no arbitrator or if pendingTransactionBuilder.checkArbitrator
// rejects the key's arbitrator.
412 synchronized void addKV(IoTString key, IoTString value) {
414 // Make sure it is a valid key
415 if (arbitratorTable.get(key) == NULL) {
416 throw new Error("Key not Found.");
419 // Make sure new key value pair matches the current arbitrator
420 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
421 // TODO: Maybe not throw an error
422 throw new Error("Not all Key Values Match Arbitrator.");
425 // Add the key value to this transaction
426 KeyValue kv = new KeyValue(key, value);
427 pendingTransactionBuilder.addKV(kv);
// Turns the transaction being built into a Transaction and submits it.
// Returns a StatusNoEffect status when the builder holds no updates; otherwise returns the
// status object attached to the new transaction, which starts as StatusPending.
// A transaction whose arbitrator is another machine is queued in pendingTransactionQueue;
// otherwise it is arbitrated locally. On ServerException the queued transactions are offered
// to their arbitrators over local communication.
// NOTE(review): the try block, else branches and loop control statements are on lines not visible in this excerpt.
430 synchronized TransactionStatus commitTransaction() {
432 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
433 // transaction with no updates will have no effect on the system
434 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
437 // Set the local transaction sequence number and increment
438 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
439 localTransactionSequenceNumber++;
441 // Create the transaction status
442 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
444 // Create the new transaction
445 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
446 newTransaction.setTransactionStatus(transactionStatus);
448 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
449 // Add it to the queue and invalidate the builder for safety
450 pendingTransactionQueue.add(newTransaction);
// This machine is the arbitrator: decide locally and refresh the live state
452 arbitrateOnLocalTransaction(newTransaction);
453 updateLiveStateFromLocal();
// Fresh builder so the submitted transaction cannot be modified further
456 pendingTransactionBuilder = new PendingTransaction(localMachineId);
460 } catch (ServerException e) {
// Arbitrators that could not be reached during this pass
462 Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
463 for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
464 Transaction transaction = iter.next();
466 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
467 // Already contacted this client so ignore all attempts to contact this client
468 // to preserve ordering for arbitrator
// First element of the pair: true when the arbitrator could not be contacted locally
472 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
474 if (sendReturn.getFirst()) {
475 // Failed to contact over local
476 arbitratorTriedAndFailed.add(transaction.getArbitrator());
478 // Successful contact or should not contact
480 if (sendReturn.getSecond()) {
488 updateLiveStateFromLocal();
490 return transactionStatus;
494 * Get the machine ID for this client
496 int64_t getMachineId() {
// Plain accessor for the localMachineId field; not synchronized, unlike most methods in this file
497 return localMachineId;
501 * Decrement the number of live slots that we currently have
// NOTE(review): body not visible in this excerpt - presumably decrements liveSlotCount; confirm.
503 void decrementLiveCount() {
508 * Recalculate the new resize threshold
510 void setResizeThreshold() {
// Lower bound: the RESIZE_THRESHOLD fraction of the slot count, truncated to int
511 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
// Add a random offset so the threshold falls between resizeLower - 1 and numberOfSlots - 2.
// NOTE(review): if `random` is a java.util.Random, nextInt throws when
// numberOfSlots - resizeLower is not positive (e.g. numberOfSlots == 0) - confirm capacity is always > 0.
512 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
// Returns the current value of the localSequenceNumber counter (not synchronized).
515 int64_t getLocalSequenceNumber() {
516 return localSequenceNumber;
// Whether the last slot sendToServer attempted to send contained the new key; recorded before
// each send and consulted when reconciling a partial send.
520 bool lastInsertedNewKey = false;
// Sends pending transactions, pending arbitration rounds and, optionally, a new key to the server.
// If an earlier attempt may have only partially reached the server (hadPartialSendToServer),
// that attempt is reconciled first using the recorded last* fields. Then, while work remains,
// a slot is built with fillSlot, recorded as the last attempt and sent with sendSlotsToServer;
// the bookkeeping of each transaction is updated according to the outcome.
// Returns the result of (newKey == NULL) at the end. Declared to throw ServerException.
// NOTE(review): many lines of this method are not visible in this excerpt, including where
// newKey is cleared, where lastNewSize is declared, and most branch and loop-exit statements.
522 bool sendToServer(NewKey newKey) throws ServerException {
524 bool fromRetry = false;
// ---- Phase 1: reconcile a previous partial send ----
527 if (hadPartialSendToServer) {
528 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
// The server has nothing newer: send the recorded slot again
529 if (newSlots.length == 0) {
531 ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
533 if (sendSlotsReturn.getFirst()) {
534 if (newKey != NULL) {
// NOTE(review): getKey() results are compared with ==, i.e. by identity rather than content -
// confirm this is intended for IoTString.
535 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
540 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
541 transaction.resetServerFailure();
543 // Update which transactions parts still need to be sent
544 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
546 // Add the transaction status to the outstanding list
547 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
549 // Update the transaction status
550 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
552 // Check if all the transaction parts were successfully sent and if so then remove it from pending
553 if (transaction.didSendAllParts()) {
554 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
555 pendingTransactionQueue.remove(transaction);
// The resend was not accepted: inspect the slots the server returned instead
560 newSlots = sendSlotsReturn.getThird();
// Look for our recorded slot (same sequence number, our machine ID) among the returned slots
562 bool isInserted = false;
563 for (Slot s : newSlots) {
564 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
570 for (Slot s : newSlots) {
575 // Process each entry in the slot
576 for (Entry entry : s.getEntries()) {
// A LastMessage entry naming our machine and the recorded sequence number also identifies our slot
578 if (entry.getType() == Entry.TypeLastMessage) {
579 LastMessage lastMessage = (LastMessage)entry;
580 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
589 if (newKey != NULL) {
// NOTE(review): identity comparison of getKey() results, as above
590 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
595 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
596 transaction.resetServerFailure();
598 // Update which transactions parts still need to be sent
599 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
601 // Add the transaction status to the outstanding list
602 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
604 // Update the transaction status
605 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
607 // Check if all the transaction parts were successfully sent and if so then remove it from pending
608 if (transaction.didSendAllParts()) {
609 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
610 pendingTransactionQueue.remove(transaction);
612 transaction.resetServerFailure();
613 // Set the transaction sequence number back to nothing
614 if (!transaction.didSendAPartToServer()) {
615 transaction.setSequenceNumber(-1);
622 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
623 transaction.resetServerFailure();
624 // Set the transaction sequence number back to nothing
625 if (!transaction.didSendAPartToServer()) {
626 transaction.setSequenceNumber(-1);
630 if (sendSlotsReturn.getThird().length != 0) {
631 // insert into the local block chain
632 validateAndUpdate(sendSlotsReturn.getThird(), true);
// The server already has newer slots: check them for our recorded slot without resending
636 bool isInserted = false;
637 for (Slot s : newSlots) {
638 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
644 for (Slot s : newSlots) {
649 // Process each entry in the slot
650 for (Entry entry : s.getEntries()) {
652 if (entry.getType() == Entry.TypeLastMessage) {
653 LastMessage lastMessage = (LastMessage)entry;
654 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
663 if (newKey != NULL) {
// NOTE(review): identity comparison of getKey() results, as above
664 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
669 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
670 transaction.resetServerFailure();
672 // Update which transactions parts still need to be sent
673 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
675 // Add the transaction status to the outstanding list
676 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
678 // Update the transaction status
679 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
681 // Check if all the transaction parts were successfully sent and if so then remove it from pending
682 if (transaction.didSendAllParts()) {
683 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
684 pendingTransactionQueue.remove(transaction);
686 transaction.resetServerFailure();
687 // Set the transaction sequence number back to nothing
688 if (!transaction.didSendAPartToServer()) {
689 transaction.setSequenceNumber(-1);
694 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
695 transaction.resetServerFailure();
696 // Set the transaction sequence number back to nothing
697 if (!transaction.didSendAPartToServer()) {
698 transaction.setSequenceNumber(-1);
703 // insert into the local block chain
704 validateAndUpdate(newSlots, true);
707 } catch (ServerException e) {
// ---- Phase 2: build and send new slots ----
714 // While we have stuff that needs inserting into the block chain
715 while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) {
// A partial send must have been reconciled before new slots are built
719 if (hadPartialSendToServer) {
720 throw new Error("Should Be error free");
725 // If there is a new key with same name then end
726 if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) {
// The next slot takes sequenceNumber + 1 and is built with the HMAC of the current newest slot
731 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
732 localSequenceNumber++;
734 // Try to fill the slot with data
735 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
736 bool needsResize = fillSlotsReturn.getFirst();
737 int newSize = fillSlotsReturn.getSecond();
738 bool insertedNewKey = fillSlotsReturn.getThird();
741 // Reset which transaction to send
742 for (Transaction transaction : transactionPartsSent.keySet()) {
743 transaction.resetNextPartToSend();
745 // Set the transaction sequence number back to nothing
746 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
747 transaction.setSequenceNumber(-1);
751 // Clear the sent data since we are trying again
752 pendingSendArbitrationEntriesToDelete.clear();
753 transactionPartsSent.clear();
755 // We needed a resize so try again
756 fillSlot(slot, true, newKey);
// Record this attempt so that a partial send can be reconciled on the next call
759 lastSlotAttemptedToSend = slot;
760 lastIsNewKey = (newKey != NULL);
761 lastInsertedNewKey = insertedNewKey;
762 lastNewSize = newSize;
// Copies, because the originals are cleared after each send
764 lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >(transactionPartsSent);
765 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
768 ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
770 if (sendSlotsReturn.getFirst()) {
772 // Did insert into the block chain
774 if (insertedNewKey) {
775 // This slot was what was inserted not a previous slot
777 // New Key was successfully inserted into the block chain so dont want to insert it again
781 // Remove the aborts and commit parts that were sent from the pending to send queue
782 for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
783 ArbitrationRound round = iter.next();
784 round.removeParts(pendingSendArbitrationEntriesToDelete);
786 if (round.isDoneSending()) {
787 // Sent all the parts
792 for (Transaction transaction : transactionPartsSent.keySet()) {
793 transaction.resetServerFailure();
795 // Update which transactions parts still need to be sent
796 transaction.removeSentParts(transactionPartsSent.get(transaction));
798 // Add the transaction status to the outstanding list
799 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
801 // Update the transaction status
802 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
804 // Check if all the transaction parts were successfully sent and if so then remove it from pending
805 if (transaction.didSendAllParts()) {
806 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
807 pendingTransactionQueue.remove(transaction);
812 // if (!sendSlotsReturn.getSecond()) {
813 // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
814 // transaction.resetServerFailure();
817 // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
818 // transaction.resetServerFailure();
820 // // Update which transactions parts still need to be sent
821 // transaction.removeSentParts(transactionPartsSent.get(transaction));
823 // // Add the transaction status to the outstanding list
824 // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
826 // // Update the transaction status
827 // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
829 // // Check if all the transaction parts were successfully sent and if so then remove it from pending
830 // if (transaction.didSendAllParts()) {
831 // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
832 // pendingTransactionQueue.remove(transaction);
834 // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
835 // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
841 // Reset which transaction to send
842 for (Transaction transaction : transactionPartsSent.keySet()) {
843 transaction.resetNextPartToSend();
844 // transaction.resetNextPartToSend();
846 // Set the transaction sequence number back to nothing
847 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
848 transaction.setSequenceNumber(-1);
853 // Clear the sent data in preparation for next send
854 pendingSendArbitrationEntriesToDelete.clear();
855 transactionPartsSent.clear();
857 if (sendSlotsReturn.getThird().length != 0) {
858 // insert into the local block chain
859 validateAndUpdate(sendSlotsReturn.getThird(), true);
863 } catch (ServerException e) {
// Exceptions other than an input timeout are treated as "nothing was sent"
865 if (e.getType() != ServerException.TypeInputTimeout) {
866 // e.printStackTrace();
868 // Nothing was able to be sent to the server so just clear these data structures
869 for (Transaction transaction : transactionPartsSent.keySet()) {
870 transaction.resetNextPartToSend();
872 // Set the transaction sequence number back to nothing
873 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
874 transaction.setSequenceNumber(-1);
878 // There was a partial send to the server
879 hadPartialSendToServer = true;
883 // lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
884 // lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
887 // Nothing was able to be sent to the server so just clear these data structures
888 for (Transaction transaction : transactionPartsSent.keySet()) {
889 transaction.resetNextPartToSend();
890 transaction.setServerFailure();
894 pendingSendArbitrationEntriesToDelete.clear();
895 transactionPartsSent.clear();
// True when no new key remains to be inserted (also true when the caller passed NULL)
900 return newKey == NULL;
// Asks the given machine directly (over local communication) for arbitration data newer than
// the last local sequence number seen from it, then processes the returned entries and
// refreshes the live state.
// NOTE(review): the return statements, one encode call and the handling of decoded Abort
// entries are on lines not visible in this excerpt.
903 synchronized bool updateFromLocal(int64_t machineId) {
904 Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(machineId);
905 if (localCommunicationInformation == NULL) {
906 // Cant talk to that device locally so do nothing
910 // Get the size of the send data
911 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// -1 when nothing has been seen from this machine yet
913 Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
914 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
915 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
918 char[] sendData = new char[sendDataSize];
919 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
922 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
// The pair holds (host name, port) of the peer
926 char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
927 localSequenceNumber++;
929 if (returnData == NULL) {
930 // Could not contact server
// Reply layout as read here: entry count, then for each entry a type byte followed by the encoded entry
935 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
936 int numberOfEntries = bbDecode.getInt();
938 for (int i = 0; i < numberOfEntries; i++) {
939 char type = bbDecode.get();
940 if (type == Entry.TypeAbort) {
941 Abort abort = (Abort)Abort.decode(NULL, bbDecode);
943 } else if (type == Entry.TypeCommitPart) {
944 CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
945 processEntry(commitPart);
949 updateLiveStateFromLocal();
// Sends all parts of a transaction directly to its arbitrator over local communication and
// applies the reply.
// Returns Pair(true, false) when the arbitrator has no local address or did not answer, and
// Pair(false, true) after a reply has been processed. The caller in this file treats the
// first element as "failed to contact over local".
// NOTE(review): the conditions selecting the committed/aborted status below are on lines not
// visible in this excerpt - presumably didCommit and foundAbort; confirm.
954 Pair<bool, bool> sendTransactionToLocal(Transaction transaction) {
956 // Get the devices local communications
957 Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
959 if (localCommunicationInformation == NULL) {
960 // Cant talk to that device locally so do nothing
961 return new Pair<bool, bool>(true, false);
964 // Get the size of the send data
965 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
966 for (TransactionPart part : transaction.getParts().values()) {
967 sendDataSize += part.getSize();
// -1 when nothing has been seen from this arbitrator yet
970 Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
971 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
972 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
975 // Make the send data size
976 char[] sendData = new char[sendDataSize];
977 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
// Request layout: last sequence number seen, part count, then every encoded part
980 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
981 bbEncode.putInt(transaction.getParts().size());
982 for (TransactionPart part : transaction.getParts().values()) {
983 part.encode(bbEncode);
988 char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
989 localSequenceNumber++;
991 if (returnData == NULL) {
992 // Could not contact server
993 return new Pair<bool, bool>(true, false);
// Reply layout as read here: didCommit byte, couldArbitrate byte, entry count, then typed entries
997 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
998 bool didCommit = bbDecode.get() == 1;
999 bool couldArbitrate = bbDecode.get() == 1;
1000 int numberOfEntries = bbDecode.getInt();
1001 bool foundAbort = false;
1003 for (int i = 0; i < numberOfEntries; i++) {
1004 char type = bbDecode.get();
1005 if (type == Entry.TypeAbort) {
1006 Abort abort = (Abort)Abort.decode(NULL, bbDecode);
// An abort naming this machine and this transaction's client-local sequence number refers to the transaction just sent
1008 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1012 processEntry(abort);
1013 } else if (type == Entry.TypeCommitPart) {
1014 CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
1015 processEntry(commitPart);
1019 updateLiveStateFromLocal();
// Record the outcome on the transaction's status object
1021 if (couldArbitrate) {
1022 TransactionStatus status = transaction.getTransactionStatus();
1024 status.setStatus(TransactionStatus.StatusCommitted);
1026 status.setStatus(TransactionStatus.StatusAborted);
1029 TransactionStatus status = transaction.getTransactionStatus();
1031 status.setStatus(TransactionStatus.StatusAborted);
1033 status.setStatus(TransactionStatus.StatusCommitted);
1037 return new Pair<bool, bool>(false, true);
// Handles a request that another device sent over the local channel.
// `data` is decoded as: a long (the last arbitration sequence number the
// sender has seen), an int (number of transaction parts), then that many
// encoded TransactionPart entries. When parts are present the transaction is
// rebuilt and arbitrated through arbitrateOnLocalTransaction(). A reply buffer
// is then assembled from the aborts in liveAbortsGeneratedByLocal and the
// commits stored for this machine in liveCommitsTable.
// NOTE(review): several short lines (closing braces, else headers, the final
// return) are not visible in this listing; comments below hedge where the
// missing lines matter.
1040 synchronized char[] acceptDataFromLocal(char[] data) {
1043 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1044 int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1045 int numberOfParts = bbDecode.getInt();
1047 // Whether we committed the received transaction, and whether we could arbitrate on it at all
1048 bool didCommit = false;
1049 bool couldArbitrate = false;
1051 if (numberOfParts != 0) {
1053 // Decode the transaction part by part
1054 Transaction transaction = new Transaction();
1055 for (int i = 0; i < numberOfParts; i++) {
1057 TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
1058 transaction.addPartDecode(newPart);
1061 // Arbitrate on transaction and pull relevant return data
1062 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1063 couldArbitrate = localArbitrateReturn.getFirst();
1064 didCommit = localArbitrateReturn.getSecond();
1066 updateLiveStateFromLocal();
1068 // Transaction was sent to the server so keep track of it to prevent double commit
1069 if (transaction.getSequenceNumber() != -1) {
1070 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1074 // The data to send back: total encoded size and the entries themselves
1075 int returnDataSize = 0;
1076 Vector<Entry> unseenArbitrations = new Vector<Entry>();
1078 // Get the aborts to send back, in ascending local sequence number order
1079 Vector<Long> abortLocalSequenceNumbers = new Vector<Long >(liveAbortsGeneratedByLocal.keySet());
1080 Collections.sort(abortLocalSequenceNumbers);
1081 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
// The body of this check is not visible here; presumably entries the sender
// has already seen are skipped - TODO confirm
1082 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1086 Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1087 unseenArbitrations.add(abort);
1088 returnDataSize += abort.getSize();
1091 // Get the commits to send back (only those stored under this machine's id)
// NOTE(review): the type arguments below lack a separating comma
1092 Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1093 if (commitForClientTable != NULL) {
1094 Vector<Long> commitLocalSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
1095 Collections.sort(commitLocalSequenceNumbers);
1097 for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1098 Commit commit = commitForClientTable.get(localSequenceNumber);
// Same visibility caveat as for the aborts above - TODO confirm the skipped branch
1100 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1104 unseenArbitrations.addAll(commit.getParts().values());
1106 for (CommitPart commitPart : commit.getParts().values()) {
1107 returnDataSize += commitPart.getSize();
1112 // Space for the number of arbitration entries to decode
// NOTE(review): two int32 widths are reserved, but only one int (the entry
// count) is written by the putInt() call below - confirm whether the extra
// bytes are meant to cover the flag bytes
1113 returnDataSize += 2 * sizeof(int32_t);
1115 // Space for the "did commit" flag, only when a transaction was received
1116 if (numberOfParts != 0) {
1117 returnDataSize += sizeof(char);
1120 // Data to send Back
1121 char[] returnData = new char[returnDataSize];
1122 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1124 if (numberOfParts != 0) {
// First flag byte; its guarding condition is not visible in this listing
// (presumably didCommit - TODO confirm)
1126 bbEncode.put((char)1);
1128 bbEncode.put((char)0);
// Second flag byte: whether this device could arbitrate
1130 if (couldArbitrate) {
1131 bbEncode.put((char)1);
1133 bbEncode.put((char)0);
// Entry count followed by every collected entry in collection order
1137 bbEncode.putInt(unseenArbitrations.size());
1138 for (Entry entry : unseenArbitrations) {
1139 entry.encode(bbEncode);
// Increment localSequenceNumber; the loop variables of the same name above
// are declared in their for-headers, so this presumably refers to the field
1143 localSequenceNumber++;
// Pushes `slot` to the server through cloud.putSlot() and examines the slots
// the server returns. Returns (inserted, lastTryInserted, array), where
// `array` is the server's reply, or a one-element array holding `slot` when
// putSlot() returned NULL. Throws Error when the reply is an empty array.
// NOTE(review): `isNewKey` and `attemptedToSendToServerTmp` are not read in
// the lines visible here; the only visible mention of the latter is in a
// commented-out condition. Several short lines (else headers, assignments to
// `inserted`, closing braces) are missing from this listing.
1147 ThreeTuple<bool, bool, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
1149 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1150 attemptedToSendToServer = true;
1152 bool inserted = false;
1153 bool lastTryInserted = false;
1155 Slot[] array = cloud.putSlot(slot, newSize);
// NULL reply: use the slot itself as the result and clear the rejected list
1156 if (array == NULL) {
1157 array = new Slot[] {slot};
1158 rejectedSlotVector.clear();
1161 if (array.length == 0) {
1162 throw new Error("Server Error: Did not send any slots");
1165 // if (attemptedToSendToServerTmp) {
1166 if (hadPartialSendToServer) {
// Look through the returned slots for one carrying our sequence number and machine id
1168 bool isInserted = false;
1169 for (Slot s : array) {
1170 if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1176 for (Slot s : array) {
1181 // Process each entry in the slot
1182 for (Entry entry : s.getEntries()) {
1184 if (entry.getType() == Entry.TypeLastMessage) {
1185 LastMessage lastMessage = (LastMessage)entry;
// A LastMessage from this machine with our slot's sequence number
1187 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
// Record the slot's sequence number as rejected
1196 rejectedSlotVector.add(slot.getSequenceNumber());
1197 lastTryInserted = false;
1199 lastTryInserted = true;
// Record the slot's sequence number as rejected
1202 rejectedSlotVector.add(slot.getSequenceNumber());
1203 lastTryInserted = false;
1207 return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
1211 * Returns a tuple whose first element is true if a resize was needed but not requested
// Populates `slot` in this order: a TableStatus entry carrying the new size
// (its guarding condition is not visible in this listing), rejected-message
// entries, mandatory rescues, the optional NewKey entry, pending arbitration
// data, parts of the first pending transaction, and finally optional rescues.
// Returns (true, NULL, NULL) when the mandatory rescue reports that a resize
// is needed but `resize` is false; otherwise (false, newSize, inserted).
// NOTE(review): the declaration of `newSize` and several short lines
// (else/break/continue, closing braces) are missing from this listing.
1213 ThreeTuple<bool, int32_t, bool> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
1217 if (liveSlotCount > bufferResizeThreshold) {
1218 resize = true;//Resize is forced
// New size is the current slot count scaled by RESIZE_MULTIPLE, truncated to int
1223 newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1224 TableStatus status = new TableStatus(slot, newSize);
1225 slot.addEntry(status);
1228 // Fill with rejected slots first before doing anything else
1229 doRejectedMessages(slot);
1231 // Do mandatory rescue of entries
1232 ThreeTuple<bool, bool, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1234 // Extract working variables
1235 bool needsResize = mandatoryRescueReturn.getFirst();
1236 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1237 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1239 if (needsResize && !resize) {
1240 // We need to resize but we are not resizing, so report that through the first tuple element
1241 return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1244 bool inserted = false;
1245 if (newKeyEntry != NULL) {
1246 newKeyEntry.setSlot(slot);
1247 if (slot.hasSpace(newKeyEntry)) {
1249 slot.addEntry(newKeyEntry);
1254 // Clear the transactions, aborts and commits that were sent previously
1255 transactionPartsSent.clear();
1256 pendingSendArbitrationEntriesToDelete.clear();
1258 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1259 bool isFull = false;
1260 round.generateParts();
1261 Vector<Entry> parts = round.getParts();
1263 // Insert pending arbitration data
1264 for (Entry arbitrationData : parts) {
1266 // If it is an abort then stamp it with this slot's sequence number
1267 if (arbitrationData instanceof Abort) {
1268 ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1271 if (!slot.hasSpace(arbitrationData)) {
1272 // No space so can't do anything else with these data entries
1277 // Add to this current slot and add it to entries to delete
1278 slot.addEntry(arbitrationData);
1279 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1287 if (pendingTransactionQueue.size() > 0) {
1289 Transaction transaction = pendingTransactionQueue.get(0);
1291 // Set the transaction sequence number if it has yet to be inserted into the block chain
1292 // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1293 // transaction.setSequenceNumber(slot.getSequenceNumber());
1296 if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1297 transaction.setSequenceNumber(slot.getSequenceNumber());
1302 TransactionPart part = transaction.getNextPartToSend();
1305 // Ran out of parts to send for this transaction so move on
1309 if (slot.hasSpace(part)) {
1310 slot.addEntry(part);
// Remember which part numbers of this transaction went into the slot
1311 Vector<int32_t> partsSent = transactionPartsSent.get(transaction);
1312 if (partsSent == NULL) {
1313 partsSent = new Vector<int32_t>();
1314 transactionPartsSent.put(transaction, partsSent);
1316 partsSent.add(part.getPartNumber());
1317 transactionPartsSent.put(transaction, partsSent);
1324 // Fill the remainder of the slot with rescue data
1325 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1327 return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers held
// in rejectedSlotVector. Does nothing when that vector is empty.
// NOTE(review): the declaration of `i`, the statements that attach each `rm`
// to the slot, the else header, and the test/break inside the first loop are
// not visible in this listing; the comments below describe only what is shown.
1330 void doRejectedMessages(Slot s) {
1331 if (!rejectedSlotVector.isEmpty()) {
1332 /* TODO: We should avoid generating a rejected message entry if
1333 * there is already a sufficient entry in the queue (e.g.,
1334 * equalsto value of true and same sequence number). */
1336 int64_t old_seqn = rejectedSlotVector.firstElement();
// More than REJECTED_THRESHOLD rejections: build one message spanning the
// first through the last rejected sequence number
1337 if (rejectedSlotVector.size() > REJECTED_THRESHOLD) {
1338 int64_t new_seqn = rejectedSlotVector.lastElement();
1339 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1342 int64_t prev_seqn = -1;
1344 /* Go through list of missing messages */
1345 for (; i < rejectedSlotVector.size(); i++) {
1346 int64_t curr_seqn = rejectedSlotVector.get(i);
1347 Slot s_msg = buffer.getSlot(curr_seqn);
1350 prev_seqn = curr_seqn;
1352 /* Generate rejected message entry for missing messages */
1353 if (prev_seqn != -1) {
// Range message from the first rejected sequence number up to prev_seqn
1354 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1357 /* Generate rejected message entries for present messages */
// Continues from wherever the previous loop left `i`
1358 for (; i < rejectedSlotVector.size(); i++) {
1359 int64_t curr_seqn = rejectedSlotVector.get(i);
1360 Slot s_msg = buffer.getSlot(curr_seqn);
1361 int64_t machineid = s_msg.getMachineID();
// Single-slot message attributed to the machine id stored in the buffered slot
1362 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Walks the buffered slots from oldestLiveSlotSequenceNumver up to (but not
// including) `threshold` and copies their live entries into `slot`.
// Returns (true, seenLiveSlot, currentSequenceNumber) when an entry of the
// slot at `firstIfFull` does not fit; otherwise
// (false, seenLiveSlot, currentSequenceNumber) after the scan.
// NOTE(review): the body of the isLive() check and the closing braces are not
// visible in this listing.
1369 ThreeTuple<bool, bool, Long> doMandatoryResuce(Slot slot, bool resize) {
1370 int64_t newestSequenceNumber = buffer.getNewestSeqNum();
1371 int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
// Never start scanning below the oldest sequence number the buffer reports
1372 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1373 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1376 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1377 bool seenLiveSlot = false;
1378 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1379 int64_t threshold = firstIfFull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1383 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1384 Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1385 // Push slot number forward until the first live slot has been seen
1386 if (!seenLiveSlot) {
1387 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1390 if (!previousSlot.isLive()) {
1394 // We have seen a live slot
1395 seenLiveSlot = true;
1397 // Get all the live entries for a slot
1398 Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1400 // Iterate over all the live entries and try to rescue them
1401 for (Entry liveEntry : liveEntries) {
1402 if (slot.hasSpace(liveEntry)) {
1404 // Enough space to rescue the entry
1405 slot.addEntry(liveEntry);
1406 } else if (currentSequenceNumber == firstIfFull) {
1407 // No space, and this slot is the first one in the buffer when it is full: report that a resize is needed
// NOTE(review): the print below looks like leftover debug output - confirm before removing
1408 System.out.println("B");//?
1409 return new ThreeTuple<bool, bool, Long>(true, seenLiveSlot, currentSequenceNumber);
1416 return new ThreeTuple<bool, bool, Long>(false, seenLiveSlot, currentSequenceNumber);
// Continues the rescue scan from sequence number `seqn` through the newest
// buffered slot, copying live entries into `s` while they fit.
// NOTE(review): the declaration and updates of `skipcount`, the guard on the
// oldestLiveSlotSequenceNumver assignment, and the bodies of the brace-less
// `if` statements are not visible in this listing.
1419 void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
1420 /* now go through live entries from least to greatest sequence number until
1421 * either all live slots added, or the slot doesn't have enough room
1422 * for SKIP_THRESHOLD consecutive entries*/
1424 int64_t newestseqnum = buffer.getNewestSeqNum();
1426 for (; seqn <= newestseqnum; seqn++) {
1427 Slot prevslot = buffer.getSlot(seqn);
1428 //Push slot number forward
1430 oldestLiveSlotSequenceNumver = seqn;
1432 if (!prevslot.isLive())
1434 seenliveslot = true;
1435 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1436 for (Entry liveentry : liveentries) {
// Rescue the entry only when the slot still has room for it
1437 if (s.hasSpace(liveentry))
1438 s.addEntry(liveentry);
// Compared against SKIP_THRESHOLD; the action taken is not visible here
1441 if (skipcount > SKIP_THRESHOLD)
1449 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then stores them in
// the local buffer. Throws Error when the first slot is not newer than
// `sequenceNumber`, or when, after a gap, some machine in lastMessageTable is
// still unaccounted for in `machineSet`.
// NOTE(review): the early return for an empty batch, the call that follows
// the "Update the size" comment, and the statement that follows the "Keep
// track" comment are not visible in this listing.
1451 void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
1453 // The cloud communication layer has checked slot HMACs already before decoding
1454 if (newSlots.length == 0) {
1458 // Make sure all slots are newer than the last largest slot this client has seen
1459 int64_t firstSeqNum = newSlots[0].getSequenceNumber();
1460 if (firstSeqNum <= sequenceNumber) {
1461 throw new Error("Server Error: Sent older slots!");
1464 // Create an object that can access both new slots and slots in our local chain
1465 // without committing slots to our local chain
1466 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1468 // Check that the HMAC chain is not broken
1469 checkHMACChain(indexer, newSlots);
1471 // Set to keep track of messages from clients; starts with every machine id in lastMessageTable
1472 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1474 // Process each slot's data
1475 for (Slot slot : newSlots) {
1476 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1478 updateExpectedSize();
1481 // If there is a gap, check to see if the server sent us everything.
1482 if (firstSeqNum != (sequenceNumber + 1)) {
1484 // Check the size of the slots that were sent down by the server.
1485 // Can only check the size if there was a gap
1486 checkNumSlots(newSlots.length);
1488 // Since there was a gap every machine must have pushed a slot or must have
1489 // a last-message entry. If not then the server is hiding slots
1490 if (!machineSet.isEmpty()) {
1491 throw new Error("Missing record for machines: " + machineSet);
1495 // Update the size of our local block chain.
1498 // Commit the new slots to the local block chain.
1499 for (Slot slot : newSlots) {
1501 // Insert this slot into our local block chain copy.
1502 buffer.putSlot(slot);
1504 // Keep track of how many slots are currently live (have live data in them).
1508 // Get the sequence number of the latest slot in the system
1509 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1511 updateLiveStateFromServer();
1513 // No need to remember these after we pulled from the server
1514 offlineTransactionsCommittedAndAtServer.clear();
1516 // This is invalidated now
1517 hadPartialSendToServer = false;
// Refreshes the derived live state after new slots arrived from the server.
// The steps run in a fixed order: assemble new transaction parts, arbitrate,
// update the committed table, update live transactions and their status, then
// refresh the two speculative tables.
1520 void updateLiveStateFromServer() {
1521 // Process the new transaction parts
1522 processNewTransactionParts();
1524 // Do arbitration on new transactions that were received
1525 arbitrateFromServer();
1527 // Update all the committed keys
1528 bool didCommitOrSpeculate = updateCommittedTable();
1530 // Delete the transactions that are now dead
1531 updateLiveTransactionsAndStatus();
// Refresh the speculative tables; the flag accumulates the results of the
// committed-table and speculative-table updates
1534 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1535 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived live state after locally generated changes. This is
// the same sequence as updateLiveStateFromServer() without the
// processNewTransactionParts() and arbitrateFromServer() steps.
1538 void updateLiveStateFromLocal() {
1539 // Update all the committed keys
1540 bool didCommitOrSpeculate = updateCommittedTable();
1542 // Delete the transactions that are now dead
1543 updateLiveTransactionsAndStatus();
// Refresh the speculative tables; the flag accumulates the results of the
// committed-table and speculative-table updates
1546 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1547 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Records the table size announced for the current batch: sets
// didFindTableStatus to true and currMaxSize to `numberOfSlots`.
// `expectedsize` is set to the smaller of `firstSequenceNumber` and
// `numberOfSlots`.
// NOTE(review): the parameter `numberOfSlots` shadows the field of the same
// name. The else header between the didFindTableStatus check and the
// expectedsize assignment is not visible in this listing, so which branch the
// assignment belongs to cannot be confirmed from here.
1550 void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1551 // if (didFindTableStatus) {
1554 int64_t prevslots = firstSequenceNumber;
1557 if (didFindTableStatus) {
1558 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1559 // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots);
1562 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1563 // System.out.println("Here: " + expectedsize);
1566 // System.out.println(numberOfSlots);
1568 didFindTableStatus = true;
1569 currMaxSize = numberOfSlots;
// Caps `expectedsize` at currMaxSize.
// NOTE(review): the lines between the method header and the check below are
// not visible in this listing (presumably `expectedsize` is advanced there -
// TODO confirm).
1572 void updateExpectedSize() {
1575 if (expectedsize > currMaxSize) {
1576 expectedsize = currMaxSize;
1582 * Check the size of the block chain to make sure there are enough slots sent back by the server.
1583 * This is only called when we have a gap between the slots that we have locally and the slots
1584 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
// Throws Error when the number of slots received differs from `expectedsize`.
// NOTE(review): the parameter `numberOfSlots` shadows the field of the same name.
1587 void checkNumSlots(int numberOfSlots) {
1588 if (numberOfSlots != expectedsize) {
1589 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots);
// Stores `newmaxsize` in currMaxSize; the local buffer itself is only resized
// later, in commitNewMaxSize().
1593 void updateCurrMaxSize(int newmaxsize) {
1594 currMaxSize = newmaxsize;
1599 * Update the size of the local buffer if it is needed.
// Applies currMaxSize to the local buffer: clears didFindTableStatus, resizes
// the buffer when the stored slot count differs from currMaxSize, updates
// numberOfSlots, and recomputes the resize threshold.
// NOTE(review): closing braces are missing from this listing, so whether the
// numberOfSlots assignment sits inside the size check cannot be confirmed here.
1601 void commitNewMaxSize() {
1602 didFindTableStatus = false;
1604 // Resize the local slot buffer
1605 if (numberOfSlots != currMaxSize) {
1606 buffer.resize((int)currMaxSize);
1609 // Change the number of local slots to the new size
1610 numberOfSlots = (int)currMaxSize;
1613 // Recalculate the resize threshold since the size of the local buffer has changed
1614 setResizeThreshold();
1618 * Process the new transaction parts from this latest round of slots received from the server
// Folds the transaction parts collected in newTransactionParts into
// Transaction objects held in the live transaction tables, then clears
// newTransactionParts.
// NOTE(review): the early return, the statements under "Set dead the
// transaction part", and the closing braces are not visible in this listing.
// The Pair type arguments below lack a separating comma.
1620 void processNewTransactionParts() {
1622 if (newTransactionParts.size() == 0) {
1623 // Nothing new to process
1627 // Iterate through all the machine Ids that we received new parts for
1628 for (Long machineId : newTransactionParts.keySet()) {
1629 Hashtable<Pair<int64_t int32_t>, TransactionPart> parts = newTransactionParts.get(machineId);
1631 // Iterate through all the parts for that machine Id
1632 for (Pair<int64_t int32_t> partId : parts.keySet()) {
1633 TransactionPart part = parts.get(partId);
// The part's arbitrator has already arbitrated up to or past this part's sequence number
1635 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1636 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) {
1637 // Set dead the transaction part
1642 // Get the transaction object for that sequence number
1643 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1645 if (transaction == NULL) {
1646 // This is a new transaction that we don't have so make a new one
1647 transaction = new Transaction();
1649 // Insert this new transaction into the live tables
1650 liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1651 liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1654 // Add that part to the transaction
1655 transaction.addPartDecode(part);
1659 // Clear all the new transaction parts in preparation for the next time the server sends slots
1660 newTransactionParts.clear();
// Sequence number of the transaction most recently handled by
// arbitrateFromServer(), which also compares each transaction's sequence
// number against this value.
1664 int64_t lastSeqNumArbOn = 0;
// Arbitrates, in ascending sequence number order, on the live transactions
// for which this machine is the arbitrator. Transactions whose guard holds
// contribute their key-value updates to one batch commit; the others produce
// Abort entries. The resulting commit and aborts are queued as an
// ArbitrationRound in pendingSendArbitrationRounds.
// NOTE(review): the early return, the continue/break statements inside the
// guard checks, the else headers, and the closing braces are not visible in
// this listing.
1666 void arbitrateFromServer() {
1668 if (liveTransactionBySequenceNumberTable.size() == 0) {
1669 // Nothing to arbitrate on so move on
1673 // Get the transaction sequence numbers and sort from oldest to newest
1674 Vector<Long> transactionSequenceNumbers = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
1675 Collections.sort(transactionSequenceNumbers);
1677 // Key-value updates accumulated from the transactions whose guard held in this round
1678 Hashtable<IoTString, KeyValue> speculativeTableTmp = new Hashtable<IoTString, KeyValue>();
1680 // Sequence number of the last transaction whose guard held (-1 if none)
1681 int64_t lastTransactionCommitted = -1;
1682 Set<Abort> generatedAborts = new HashSet<Abort>();
1684 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1685 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1689 // Check if this machine arbitrates for this transaction; if not then we can't arbitrate this transaction
1690 if (transaction.getArbitrator() != localMachineId) {
// Older than the last transaction this method arbitrated on
1694 if (transactionSequenceNumber < lastSeqNumArbOn) {
1698 if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1699 // We have seen this already locally so don't commit again
1704 if (!transaction.isComplete()) {
1705 // Will arbitrate in incorrect order if we continue so just break
1711 // update the largest transaction seen by arbitrator from server
1712 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) {
1713 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1715 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1716 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1717 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
// The guard is evaluated against the committed table plus the updates accumulated so far
1721 if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1722 // Guard evaluated as true
1724 // Update the local changes so we can make the commit
1725 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1726 speculativeTableTmp.put(kv.getKey(), kv);
1729 // Update what the last transaction committed was for use in batch commit
1730 lastTransactionCommitted = transactionSequenceNumber;
1732 // Guard evaluated as false so create abort
1735 Abort newAbort = new Abort(NULL,
1736 transaction.getClientLocalSequenceNumber(),
1737 transaction.getSequenceNumber(),
1738 transaction.getMachineId(),
1739 transaction.getArbitrator(),
1740 localArbitrationSequenceNumber);
1741 localArbitrationSequenceNumber++;
1743 generatedAborts.add(newAbort);
1745 // Insert the abort so we can process
1746 processEntry(newAbort);
1749 lastSeqNumArbOn = transactionSequenceNumber;
1751 // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1754 Commit newCommit = NULL;
1756 // If there is something to commit
1757 if (speculativeTableTmp.size() != 0) {
1759 // Create the commit and increment the commit sequence number
1760 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1761 localArbitrationSequenceNumber++;
1763 // Add all the new keys to the commit
1764 for (KeyValue kv : speculativeTableTmp.values()) {
1765 newCommit.addKV(kv);
1768 // create the commit parts
1769 newCommit.createCommitParts();
1771 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1773 // Insert the commit so we can process it
1774 for (CommitPart commitPart : newCommit.getParts().values()) {
1775 processEntry(commitPart);
// Queue this round for sending when it produced a commit or at least one abort
1779 if ((newCommit != NULL) || (generatedAborts.size() > 0)) {
1780 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1781 pendingSendArbitrationRounds.add(arbitrationRound);
// When compaction reports true, process the commit parts of the round now at the end of the queue
1783 if (compactArbitrationData()) {
1784 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1785 if (newArbitrationRound.getCommit() != NULL) {
1786 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1787 processEntry(commitPart);
// Arbitrates on a single transaction received over the local channel.
// Returns a Pair that the caller in acceptDataFromLocal() reads as
// (couldArbitrate, didCommit):
//   (false, false) - this machine is not the arbitrator, the transaction is
//                    incomplete, or a newer transaction from the same machine
//                    was already seen from the server;
//   (true, true)   - the guard held and a commit round was queued;
//   (true, false)  - the guard failed and an abort round was queued.
// NOTE(review): else headers, closing braces, and one argument line of the
// Abort constructor call are not visible in this listing.
1794 Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction) {
1796 // Check if this machine arbitrates for this transaction; if not then we can't arbitrate this transaction
1797 if (transaction.getArbitrator() != localMachineId) {
1798 return new Pair<bool, bool>(false, false);
1801 if (!transaction.isComplete()) {
1802 // The transaction is incomplete, so it cannot be arbitrated on
1804 return new Pair<bool, bool>(false, false);
1807 if (transaction.getMachineId() != localMachineId) {
1808 // Only transactions from other machines get this check; local ones skip it
1809 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
1810 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1811 // We have already seen this from the server
1812 return new Pair<bool, bool>(false, false);
// Unlike arbitrateFromServer(), the guard is evaluated against the committed table only
1817 if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1818 // Guard evaluated as true
1820 // Create the commit and increment the commit sequence number
1821 Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1822 localArbitrationSequenceNumber++;
1824 // Update the local changes so we can make the commit
1825 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1826 newCommit.addKV(kv);
1829 // create the commit parts
1830 newCommit.createCommitParts();
1832 // Queue a commit-only arbitration round to be sent to the server
1833 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1834 pendingSendArbitrationRounds.add(arbitrationRound);
1836 if (compactArbitrationData()) {
1837 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1838 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1839 processEntry(commitPart);
1842 // Insert the commit so we can process it
1843 for (CommitPart commitPart : newCommit.getParts().values()) {
1844 processEntry(commitPart);
// For transactions created on this machine, update the status object when one is attached
1848 if (transaction.getMachineId() == localMachineId) {
1849 TransactionStatus status = transaction.getTransactionStatus();
1850 if (status != NULL) {
1851 status.setStatus(TransactionStatus.StatusCommitted);
1855 updateLiveStateFromLocal();
1856 return new Pair<bool, bool>(true, true);
1859 if (transaction.getMachineId() == localMachineId) {
1860 // For locally created messages update the status
1862 // Guard evaluated as false so mark the transaction aborted
1863 TransactionStatus status = transaction.getTransactionStatus();
1864 if (status != NULL) {
1865 status.setStatus(TransactionStatus.StatusAborted);
// NOTE(review): raw Set type; the matching declaration in arbitrateFromServer() uses Set<Abort>
1868 Set addAbortSet = new HashSet<Abort>();
1872 Abort newAbort = new Abort(NULL,
1873 transaction.getClientLocalSequenceNumber(),
1875 transaction.getMachineId(),
1876 transaction.getArbitrator(),
1877 localArbitrationSequenceNumber);
1878 localArbitrationSequenceNumber++;
1880 addAbortSet.add(newAbort);
1883 // Queue an abort-only arbitration round to be sent to the server
1884 ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1885 pendingSendArbitrationRounds.add(arbitrationRound);
1887 if (compactArbitrationData()) {
1888 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
// NOTE(review): getCommit() is dereferenced without the NULL check that
// arbitrateFromServer() performs at the same point - confirm it cannot be NULL here
1889 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1890 processEntry(commitPart);
1895 updateLiveStateFromLocal();
1896 return new Pair<bool, bool>(true, false);
1901 * Compacts the arbitration data by merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
// Tries to fold earlier rounds of pendingSendArbitrationRounds into the most
// recent one, walking backwards from the second-to-last round. Rounds without
// a commit contribute only their aborts; rounds with a commit are merged into
// a new Commit via Commit.merge(). Folded rounds are then removed and the
// enlarged last round is re-added.
// NOTE(review): every return statement, the break statements, the increment
// of numberToDelete, the else headers, and the closing braces are missing
// from this listing, so the exact return value per path cannot be stated from
// here. Callers process the last round's commit parts when the result is true.
1903 bool compactArbitrationData() {
1905 if (pendingSendArbitrationRounds.size() < 2) {
1906 // Nothing to compact so do nothing
1910 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1911 if (lastRound.didSendPart()) {
// NOTE(review): despite its name, hadCommit is true when the last round has NO commit - confirm intent
1915 bool hadCommit = (lastRound.getCommit() == NULL);
1916 bool gotNewCommit = false;
// Counts the rounds to drop from the end of the queue, starting with the last round itself
1918 int numberToDelete = 1;
1919 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1920 ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1922 if (round.isFull() || round.didSendPart()) {
1923 // Stop since there is a part that cannot be compacted and we need to compact in order
1927 if (round.getCommit() == NULL) {
1929 // Try compacting aborts only
1930 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1931 if (newSize > ArbitrationRound.MAX_PARTS) {
1932 // Can't compact since it would be too large
1935 lastRound.addAborts(round.getAborts());
1938 // Create a new larger commit
// The arbitration sequence number is incremented before the size check
// below, whether or not the merged commit ends up being used
1939 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1940 localArbitrationSequenceNumber++;
1942 // Create the commit parts so that we can count them
1943 newCommit.createCommitParts();
1945 // Calculate the new size of the parts
1946 int newSize = newCommit.getNumberOfParts();
1947 newSize += lastRound.getAbortsCount();
1948 newSize += round.getAbortsCount();
1950 if (newSize > ArbitrationRound.MAX_PARTS) {
1951 // Can't compact since it would be too large
1955 // Set the new compacted part
1956 lastRound.setCommit(newCommit);
1957 lastRound.addAborts(round.getAborts());
1958 gotNewCommit = true;
1964 if (numberToDelete != 1) {
1965 // If there is a compaction
1967 // Delete the previous pieces that are now in the new compacted piece
1968 if (numberToDelete == pendingSendArbitrationRounds.size()) {
1969 pendingSendArbitrationRounds.clear();
1971 for (int i = 0; i < numberToDelete; i++) {
1972 pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1976 // Add the new compacted round into the pending to send list
1977 pendingSendArbitrationRounds.add(lastRound);
1979 // Should reinsert into the commit processor
1980 if (hadCommit && gotNewCommit) {
1987 // bool compactArbitrationData() {
1992 * Update all the commits and the committed tables, sets dead the dead transactions
1994 bool updateCommittedTable() {
1996 if (newCommitParts.size() == 0) {
1997 // Nothing new to process
2001 // Iterate through all the machine Ids that we received new parts for
2002 for (Long machineId : newCommitParts.keySet()) {
2003 Hashtable<Pair<int64_t int32_t>, CommitPart> parts = newCommitParts.get(machineId);
2005 // Iterate through all the parts for that machine Id
2006 for (Pair<int64_t int32_t> partId : parts.keySet()) {
2007 CommitPart part = parts.get(partId);
2009 // Get the transaction object for that sequence number
2010 Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2012 if (commitForClientTable == NULL) {
2013 // This is the first commit from this device
2014 commitForClientTable = new Hashtable<int64_t Commit>();
2015 liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2018 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2020 if (commit == NULL) {
2021 // This is a new commit that we dont have so make a new one
2022 commit = new Commit();
2024 // Insert this new commit into the live tables
2025 commitForClientTable.put(part.getSequenceNumber(), commit);
2028 // Add that part to the commit
2029 commit.addPartDecode(part);
2033 // Clear all the new commits parts in preparation for the next time the server sends slots
2034 newCommitParts.clear();
2036 // If we process a new commit keep track of it for future use
2037 bool didProcessANewCommit = false;
2039 // Process the commits one by one
2040 for (Long arbitratorId : liveCommitsTable.keySet()) {
2042 // Get all the commits for a specific arbitrator
2043 Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2045 // Sort the commits in order
2046 Vector<Long> commitSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
2047 Collections.sort(commitSequenceNumbers);
2049 // Get the last commit seen from this arbitrator
2050 int64_t lastCommitSeenSequenceNumber = -1;
2051 if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) {
2052 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2055 // Go through each new commit one by one
2056 for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2057 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2058 Commit commit = commitForClientTable.get(commitSequenceNumber);
2060 // Special processing if a commit is not complete
2061 if (!commit.isComplete()) {
2062 if (i == (commitSequenceNumbers.size() - 1)) {
2063 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2066 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2067 // Delete it and move on
2069 commitForClientTable.remove(commit.getSequenceNumber());
2074 // Update the last transaction that was updated if we can
2075 if (commit.getTransactionSequenceNumber() != -1) {
2076 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2078 // Update the last transaction sequence number that the arbitrator arbitrated on
2079 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2080 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2084 // Update the last arbitration data that we have seen so far
2085 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) {
2087 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2088 if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2090 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2093 // Never seen any data from this arbitrator so record the first one
2094 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2097 // We have already seen this commit before so there is no need to do the full processing on this commit
2098 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2100 // Update the last transaction that was updated if we can
2101 if (commit.getTransactionSequenceNumber() != -1) {
2102 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2104 // Update the last transaction sequence number that the arbitrator arbitrated on
2105 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2106 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2113 // If we got here then this is a brand new commit and needs full processing
2115 // Get what commits should be edited, these are the commits that have live values for their keys
2116 Set<Commit> commitsToEdit = new HashSet<Commit>();
2117 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2118 commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2120 commitsToEdit.remove(NULL); // remove NULL since it could be in this set
2122 // Update each previous commit that needs to be updated
2123 for (Commit previousCommit : commitsToEdit) {
2125 // Only bother with live commits (TODO: Maybe remove this check)
2126 if (previousCommit.isLive()) {
2128 // Update which keys in the old commits are still live
2129 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2130 previousCommit.invalidateKey(kv.getKey());
2133 // if the commit is now dead then remove it
2134 if (!previousCommit.isLive()) {
2135 commitForClientTable.remove(previousCommit);
2140 // Update the last seen sequence number from this arbitrator
2141 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) {
2142 if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2143 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2146 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2149 // We processed a new commit that we havent seen before
2150 didProcessANewCommit = true;
2152 // Update the committed table of keys and which commit is using which key
2153 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2154 committedKeyValueTable.put(kv.getKey(), kv);
2155 liveCommitsByKeyTable.put(kv.getKey(), commit);
2160 return didProcessANewCommit;
2164 * Create the speculative table from transactions that are still live and have come from the cloud
// Rebuilds or extends speculatedKeyValueTable from the transactions held in
// liveTransactionBySequenceNumberTable, visiting them in ascending sequence-number order.
//
// didProcessNewCommits: when true, the previous speculation state is cleared and
// speculation restarts from the oldest live transaction.
// The visible early exit returns false ("did not speculate") when no live transaction
// comes after the last one that was already speculated on.
2166 bool updateSpeculativeTable(bool didProcessNewCommits) {
2167 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2168 // There is nothing to speculate on
2172 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2173 Vector<Long> transactionSequenceNumbersSorted = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
2174 Collections.sort(transactionSequenceNumbersSorted);
// The front of the sorted list no longer matches the front remembered from the previous run
2176 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2179 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2180 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2181 // OR there was a new commit (could be from an offline commit), so redo the speculation from scratch
2183 // Start from scratch
2184 speculatedKeyValueTable.clear();
2185 lastTransactionSequenceNumberSpeculatedOn = -1;
2186 oldestTransactionSequenceNumberSpeculatedOn = -1;
2190 // Remember the front of the transaction list
2191 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2193 // Find where to start speculation from: the element right after the last one speculated on.
// indexOf returns -1 when the value is not in the list, in which case startIndex is 0.
2194 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2196 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2197 // Make sure we are not out of bounds
2198 return false; // did not speculate
// Arbitrators for which an incomplete transaction was encountered during this pass
2201 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
// NOTE(review): didSkip starts out true; the statements that read or update it are not
// visible here. If it is never cleared, the reset near the end runs on every call — confirm intended.
2202 bool didSkip = true;
2204 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2205 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2206 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2208 if (!transaction.isComplete()) {
2209 // If there is an incomplete transaction then there is nothing we can do
2210 // add this transaction's arbitrator to the list of arbitrators we should ignore
2211 incompleteTransactionArbitrator.add(transaction.getArbitrator());
// Checks whether this transaction's arbitrator was recorded above as having an incomplete transaction
2216 if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2220 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The guard is evaluated against the committed and speculated tables; NULL is passed as the third argument
2222 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2223 // Guard evaluated to true so update the speculative table
2224 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2225 speculatedKeyValueTable.put(kv.getKey(), kv);
2231 // Since there was a skip we need to redo the speculation next time around
2232 lastTransactionSequenceNumberSpeculatedOn = -1;
2233 oldestTransactionSequenceNumberSpeculatedOn = -1;
2236 // We did some speculation
2241 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
// Updates pendingTransactionSpeculatedKeyValueTable from the transactions waiting in
// pendingTransactionQueue, applying the key-value updates of each transaction whose guard holds.
//
// didProcessNewCommitsOrSpeculate: when true (or when the head of the queue is no longer
// firstPendingTransaction) the pending speculation state is reset before continuing.
2243 void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2244 if (pendingTransactionQueue.size() == 0) {
2245 // There is nothing to speculate on
2250 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2251 // Need to reset the pending speculation
2252 lastPendingTransactionSpeculatedOn = NULL;
2253 firstPendingTransaction = pendingTransactionQueue.get(0);
2254 pendingTransactionSpeculatedKeyValueTable.clear();
2257 // Find where to start speculation from
// NOTE(review): the start index is derived from firstPendingTransaction rather than
// lastPendingTransactionSpeculatedOn (updateSpeculativeTable uses its "last speculated on" marker).
// When firstPendingTransaction is the head of the queue this yields 1, so the loop below
// would not evaluate the head transaction — confirm whether that is intended.
2258 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2260 if (startIndex >= pendingTransactionQueue.size()) {
2261 // Make sure we are not out of bounds
2265 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2266 Transaction transaction = pendingTransactionQueue.get(i);
// Remember the most recent pending transaction visited
2268 lastPendingTransactionSpeculatedOn = transaction;
// The guard sees the committed table, the speculated table, and the pending speculated table
2270 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2271 // Guard evaluated to true so update the speculative table
2272 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2273 pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2280 * Set dead and remove from the live transaction tables the transactions that are dead
// Sweeps the live transactions and the outstanding transaction statuses, comparing each
// against the last transaction number recorded for its arbitrator in
// lastArbitratedTransactionNumberByArbitratorTable. Transactions at or below that number
// are set dead and dropped from the by-ID table; statuses at or below it are marked committed.
2282 void updateLiveTransactionsAndStatus() {
2284 // Go through each of the live transactions
2285 for (Iterator<Map.Entry<int64_t Transaction> > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2286 Transaction transaction = iter.next().getValue();
2288 // Check if the transaction is dead: its arbitrator has already arbitrated on this or a later sequence number
2289 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2290 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2292 // Set dead the transaction
2293 transaction.setDead();
2295 // Remove the transaction from the live table
2297 liveTransactionByTransactionIdTable.remove(transaction.getId());
2301 // Go through each of the outstanding transaction statuses
2302 for (Iterator<Map.Entry<int64_t TransactionStatus> > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2303 TransactionStatus status = iter.next().getValue();
2305 // Check if the transaction this status tracks has already been arbitrated on
2306 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2307 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
// Mark the status as committed
2310 status.setStatus(TransactionStatus.StatusCommitted);
2319 * Process this slot, entry by entry. Also update the latest message sent by slot
// Processes one slot: first records the slot itself as the latest message from its
// machine, then dispatches every entry in the slot to the processEntry overload that
// matches the entry's type.
//
// indexer: forwarded to the RejectedMessage handler.
// slot: the slot being processed.
// acceptUpdatesToLocal: forwarded to updateLastMessage.
// machineSet: forwarded to updateLastMessage and to the LastMessage handler.
// Throws Error("Unrecognized type: ...") with the entry's type code appended.
2321 void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2323 // Update the last message seen
2324 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2326 // Process each entry in the slot
2327 for (Entry entry : slot.getEntries()) {
// Dispatch on the entry's type code; each case casts to the concrete entry class
2328 switch (entry.getType()) {
2330 case Entry.TypeCommitPart:
2331 processEntry((CommitPart)entry);
2334 case Entry.TypeAbort:
2335 processEntry((Abort)entry);
2338 case Entry.TypeTransactionPart:
2339 processEntry((TransactionPart)entry);
2342 case Entry.TypeNewKey:
2343 processEntry((NewKey)entry);
2346 case Entry.TypeLastMessage:
2347 processEntry((LastMessage)entry, machineSet);
2350 case Entry.TypeRejectedMessage:
2351 processEntry((RejectedMessage)entry, indexer);
// Table status entries also need the sequence number of the slot that carried them
2354 case Entry.TypeTableStatus:
2355 processEntry((TableStatus)entry, slot.getSequenceNumber());
2359 throw new Error("Unrecognized type: " + entry.getType());
2365 * Update the last message that was sent for a machine Id
// Handles a LastMessage entry by delegating to updateLastMessage with the machine ID and
// sequence number carried by the entry. acceptUpdatesToLocal is always passed as false here.
2367 void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2368 // Update what the last message received by a machine was
2369 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2373 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
// Handles a NewKey entry: maps the key to the entry's machine ID in arbitratorTable and
// makes this entry the live NewKey for that key, setting dead any entry it replaces.
2375 void processEntry(NewKey entry) {
2377 // Update the arbitrator table with the new key information
2378 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2380 // Update what the latest live new key is; put hands back the entry previously stored for this key
2381 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2382 if (oldNewKey != NULL) {
2383 // Delete the old new key messages
2384 oldNewKey.setDead();
2389 * Process new table status entries and set dead the old ones as new ones come in.
2390 * keeps track of the largest and smallest table status seen in this current round
2391 * of updating the local copy of the block chain
// Handles a TableStatus entry: feeds the entry's max slot count into updateCurrMaxSize and
// initExpectedSize, sets dead the previous live table status (if any), and makes this
// entry the live table status.
//
// entry: the table status entry.
// seq: sequence number passed through to initExpectedSize (processSlot passes the
// sequence number of the slot that carried the entry).
2393 void processEntry(TableStatus entry, int64_t seq) {
2394 int newNumSlots = entry.getMaxSlots();
2395 updateCurrMaxSize(newNumSlots);
2397 initExpectedSize(seq, newNumSlots);
2399 if (liveTableStatus != NULL) {
2400 // We have a newer table status so the old table status is no longer alive
2401 liveTableStatus.setDead();
2404 // Make this new table status the latest alive table status
2405 liveTableStatus = entry;
2409 * Check old messages to see if there is a block chain violation. Also
// Handles a RejectedMessage entry in two phases:
//  1. For every sequence number in [oldSeqNum, newSeqNum], looks up the local slot and
//     throws if its machine ID contradicts the entry's equality flag.
//  2. Builds the set of other machines whose last message is older than this entry and
//     registers the entry on each of their watch lists.
//
// entry: the rejected-message entry.
// indexer: used to look up locally held slots by sequence number.
// Throws Error("Server Error: Trying to insert rejected message for slot ...").
2411 void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2412 int64_t oldSeqNum = entry.getOldSeqNum();
2413 int64_t newSeqNum = entry.getNewSeqNum();
2414 bool isequal = entry.getEqual();
2415 int64_t machineId = entry.getMachineID();
2416 int64_t seq = entry.getSequenceNumber();
2419 // Check if we have messages that were supposed to be rejected in our local block chain
2420 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2423 Slot slot = indexer.getSlot(seqNum);
2426 // If we have this slot make sure that it was not supposed to be a rejected slot
2428 int64_t slotMachineId = slot.getMachineID();
// Violation when "slot came from the entry's machine" disagrees with the entry's equality flag
2429 if (isequal != (slotMachineId == machineId)) {
2430 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2436 // Create a list of clients to watch until they see this rejected message entry.
2437 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2438 for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
2440 // Machine ID for the last message entry
2441 int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
2443 // We've seen it, don't need to continue to watch. Our next
2444 // message will implicitly acknowledge it.
2445 if (lastMessageEntryMachineId == localMachineId) {
// First element of the pair is the sequence number of that machine's last message
2449 Pair<int64_t Liveness> lastMessageValue = lastMessageEntry.getValue();
2450 int64_t entrySequenceNumber = lastMessageValue.getFirst();
// The machine's last message predates this rejected message, so it has not seen it yet
2452 if (entrySequenceNumber < seq) {
2454 // Add this rejected message to the set of messages that this machine ID did not see yet
2455 addWatchVector(lastMessageEntryMachineId, entry);
2457 // This client did not see this rejected message yet so add it to the watch set to monitor
2458 deviceWatchSet.add(lastMessageEntryMachineId);
2462 if (deviceWatchSet.isEmpty()) {
2463 // This rejected message has been seen by all the clients so
2466 // We need to watch this rejected message
2467 entry.setWatchSet(deviceWatchSet);
2472 * Check if this abort is live, if not then save it so we can kill it later.
2473 * update the last transaction number that was arbitrated on.
// Handles an Abort entry. In the order of the visible statements it:
//  - marks the matching outstanding transaction status as aborted,
//  - records the abort in liveAbortTable (and liveAbortsGeneratedByLocal when this client
//    is the arbitrator), setting dead any older copy it replaces,
//  - removes it again when the target machine's last message is at or past the abort,
//  - advances the last arbitration sequence number seen from the arbitrator,
//  - removes the aborted transaction from both live transaction tables,
//  - advances the last transaction number arbitrated by the arbitrator.
2475 void processEntry(Abort entry) {
// -1 is treated as "no transaction sequence number" by the checks in this method
2478 if (entry.getTransactionSequenceNumber() != -1) {
2479 // update the transaction status if it was sent to the server
2480 TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2481 if (status != NULL) {
2482 status.setStatus(TransactionStatus.StatusAborted);
2486 // Abort has not been seen by the client it is for yet so we need to keep track of it
2487 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2488 if (previouslySeenAbort != NULL) {
2489 previouslySeenAbort.setDead();// Delete old version of the abort since we got a rescued newer version
// Aborts arbitrated by this client are additionally indexed by arbitrator-local sequence number
2492 if (entry.getTransactionArbitrator() == localMachineId) {
2493 liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable.get(...) is dereferenced without a visible
// NULL check — confirm an entry always exists for the transaction's machine ID at this point.
2496 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2498 // The machine already saw this so it is dead
2500 liveAbortTable.remove(entry.getAbortId());
2502 if (entry.getTransactionArbitrator() == localMachineId) {
2503 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2512 // Update the last arbitration data that we have seen so far
2513 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) {
2515 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
// Only move the recorded value forward
2516 if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2518 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2521 // Never seen any data from this arbitrator so record the first one
2522 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2526 // Set dead a transaction if we can; the live transaction is looked up by (machine ID, client-local sequence number)
2527 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<int64_t, int64_t>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2528 if (transactionToSetDead != NULL) {
2529 liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2532 // Update the last transaction sequence number that the arbitrator arbitrated on
2533 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2534 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
// Never record the -1 placeholder as an arbitrated transaction number
2537 if (entry.getTransactionSequenceNumber() != -1) {
2538 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2544 * Set dead the transaction part if that transaction is dead and keep track of all new parts
// Handles a TransactionPart entry. Checks the part against the last transaction number
// arbitrated by its arbitrator, then stores the part in newTransactionParts under its
// machine ID and part ID, setting dead any earlier copy of the same part.
2546 void processEntry(TransactionPart entry) {
2547 // Check if we have already seen this transaction and set it dead OR if it is not alive
2548 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2549 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2550 // This transaction is dead, it was already committed or aborted
2555 // This part is still alive
2556 Hashtable<Pair<int64_t int32_t>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2558 if (transactionPart == NULL) {
2559 // Don't have a table for this machine Id yet so make one
2560 transactionPart = new Hashtable<Pair<int64_t int32_t>, TransactionPart>();
2561 newTransactionParts.put(entry.getMachineId(), transactionPart);
2564 // Update the part and set dead ones we have already seen (got a rescued version)
2565 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2566 if (previouslySeenPart != NULL) {
2567 previouslySeenPart.setDead();
2572 * Process new commit entries and save them for future use. Delete duplicates
// Handles a CommitPart entry. Advances the last transaction number arbitrated by the
// entry's machine when the part carries a newer transaction sequence number, then stores
// the part in newCommitParts under its machine ID and part ID, setting dead any earlier
// copy of the same part.
2574 void processEntry(CommitPart entry) {
2577 // Update the last transaction that was updated if we can (-1 means no transaction sequence number)
2578 if (entry.getTransactionSequenceNumber() != -1) {
2579 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2581 // Update the last transaction sequence number that the arbitrator arbitrated on
2582 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2583 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
// Per-machine table of commit parts received in this round
2590 Hashtable<Pair<int64_t int32_t>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2592 if (commitPart == NULL) {
2593 // Don't have a table for this machine Id yet so make one
2594 commitPart = new Hashtable<Pair<int64_t int32_t>, CommitPart>();
2595 newCommitParts.put(entry.getMachineId(), commitPart);
2598 // Update the part and set dead ones we have already seen (got a rescued version)
2599 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2600 if (previouslySeenPart != NULL) {
2601 previouslySeenPart.setDead();
2606 * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
2607 * Updates the live aborts, removes those that are dead and sets them dead.
2608 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2609 * other clients have not had a rollback on the last message.
// Records that machineId has sent a message with sequence number seqNum and performs the
// bookkeeping that depends on it.
//
// machineId: machine the message came from.
// seqNum: sequence number of that message.
// liveness: the object that carried the message; the visible code handles LastMessage and Slot.
// acceptUpdatesToLocal: when true, the local-machine mismatch checks below do not throw.
// machineSet: machineId is removed from this set.
// Throws Error("Unrecognized type") from the instanceof chains, and
// Error("Server Error: ...") when the sequence-number checks fail.
2611 void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2613 // We have seen this machine ID
2614 machineSet.remove(machineId);
2616 // Get the set of rejected messages that this machine Id has not seen yet
2617 HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
2619 // If there is a rejected message that this machine Id has not seen yet
2620 if (watchset != NULL) {
2622 // Go through each rejected message that this machine Id has not seen yet
2623 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2625 RejectedMessage rm = rmit.next();
2627 // If this machine Id has seen this rejected message...
2628 if (rm.getSequenceNumber() <= seqNum) {
2630 // Remove it from our watchlist
2633 // Decrement machines that need to see this notification
2634 rm.removeWatcher(machineId);
2639 // Set dead the abort
2640 for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2641 Abort abort = i.next().getValue();
// The abort targets this machine and the machine's message is at or past the abort's sequence number
2643 if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
// Aborts arbitrated by this client are also dropped from the local index
2647 if (abort.getTransactionArbitrator() == localMachineId) {
2648 liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2655 if (machineId == localMachineId) {
2656 // Our own messages are immediately dead.
2657 if (liveness instanceof LastMessage) {
2658 ((LastMessage)liveness).setDead();
2659 } else if (liveness instanceof Slot) {
2660 ((Slot)liveness).setDead();
2662 throw new Error("Unrecognized type");
2666 // Store the new last message and get the old last message for this device
2667 Pair<int64_t Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<int64_t Liveness>(seqNum, liveness));
2668 if (lastMessageEntry == NULL) {
2669 // If no last message then there is nothing else to process
// Previous record for this machine: (sequence number, carrier object)
2673 int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2674 Liveness lastEntry = lastMessageEntry.getSecond();
2676 // If it is not our machine Id since we already set ours to dead
2677 if (machineId != localMachineId) {
2678 if (lastEntry instanceof LastMessage) {
2679 ((LastMessage)lastEntry).setDead();
2680 } else if (lastEntry instanceof Slot) {
2681 ((Slot)lastEntry).setDead();
2683 throw new Error("Unrecognized type");
2687 // Make sure the server is not playing any games
2688 if (machineId == localMachineId) {
// After a partial send only a sequence number lower than the previous one is rejected
2690 if (hadPartialSendToServer) {
2691 // We were not making any updates and we had a machine mismatch
2692 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2693 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
2697 // We were not making any updates and we had a machine mismatch
// This check requires the sequence number to match the previous one exactly
2698 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2699 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
// A sequence number lower than the one previously recorded is treated as a rollback
2703 if (lastMessageSeqNum > seqNum) {
2704 throw new Error("Server Error: Rollback on remote machine sequence number");
2710 * Add a rejected message entry to the watch set to keep track of which clients have seen that
2711 * rejected message entry and which have not.
// Looks up the set of rejected messages being watched for machineId in
// rejectedMessageWatchVectorTable, creating and registering an empty set when none exists.
//
// machineId: machine whose watch set is looked up.
// entry: the rejected message to watch; the statement that uses it is not visible here
// (presumably it is added to the set — verify).
2713 void addWatchVector(int64_t machineId, RejectedMessage entry) {
2714 HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
2715 if (entries == NULL) {
2716 // There is no set for this machine ID yet so create one
2717 entries = new HashSet<RejectedMessage>();
2718 rejectedMessageWatchVectorTable.put(machineId, entries);
2724 * Check if the HMAC chain is not violated
// Verifies that each new slot links to its predecessor: for every slot in newSlots, the
// slot with the preceding sequence number is looked up through the indexer and, when it
// exists, its HMAC must equal the new slot's "previous HMAC" field.
//
// indexer: used to look up the predecessor slot by sequence number.
// newSlots: slots to verify.
// Throws Error("Server Error: Invalid HMAC Chain...") on the first mismatch.
2726 void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2727 for (int i = 0; i < newSlots.length; i++) {
2728 Slot currSlot = newSlots[i];
// Predecessor is the slot whose sequence number is one less than the current slot's
2729 Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
// A missing predecessor is not treated as an error; only a present one with a different HMAC is
2730 if (prevSlot != NULL &&
2731 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2732 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);