2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.nio.ByteBuffer;
17 import java.util.concurrent.Semaphore;
21 * IoTTable data structure. Provides client inferface.
22 * @author Brian Demsky
26 final public class Table {
27 private int numslots; //number of slots stored in buffer
29 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
30 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
32 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
33 private Vector<Long> rejectedmessagelist = new Vector<Long>();
34 private SlotBuffer buffer;
35 private CloudComm cloud;
36 private long sequencenumber; //Largest sequence number a client has received
37 private long localmachineid;
38 private TableStatus lastTableStatus;
39 static final int FREE_SLOTS = 10; //number of slots that should be kept free
40 static final int SKIP_THRESHOLD = 10;
41 private long liveslotcount = 0;
43 static final double RESIZE_MULTIPLE = 1.2;
44 static final double RESIZE_THRESHOLD = 0.75;
45 static final int REJECTED_THRESHOLD = 5;
46 private int resizethreshold;
47 private long lastliveslotseqn; //smallest sequence number with a live entry
48 private Random random = new Random();
49 private long lastUncommittedTransaction = 0;
51 private int smallestTableStatusSeen = -1;
52 private int largestTableStatusSeen = -1;
53 private int lastSeenPendingTransactionSpeculateIndex = 0;
54 private int commitSequenceNumber = 0;
55 private long localTransactionSequenceNumber = 0;
57 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
58 private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
59 private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
60 private Map<Long, Abort> abortMap = null; // Set of the live aborts
61 private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
62 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
63 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
64 private Map<Long, Transaction> uncommittedTransactionsMap = null;
65 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
66 private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
67 private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
68 private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
69 private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
70 private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
71 private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
72 private List<Commit> pendingCommitsList = null;
73 private List<Commit> pendingCommitsToDelete = null;
74 private Map<Long, LocalComm> localCommunicationChannels;
75 private Map<Long, TransactionStatus> transactionStatusMap = null;
76 private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
78 private Semaphore mutex = null;
82 public Table(String hostname, String baseurl, String password, long _localmachineid) {
83 localmachineid = _localmachineid;
84 buffer = new SlotBuffer();
85 numslots = buffer.capacity();
88 cloud = new CloudComm(this, hostname, baseurl, password);
94 public Table(CloudComm _cloud, long _localmachineid) {
95 localmachineid = _localmachineid;
96 buffer = new SlotBuffer();
97 numslots = buffer.capacity();
105 private void setupDataStructs() {
106 pendingTransQueue = new LinkedList<PendingTransaction>();
107 commitMap = new HashMap<Long, Map<Long, Commit>>();
108 abortMap = new HashMap<Long, Abort>();
109 committedMapByKey = new HashMap<IoTString, Commit>();
110 commitedTable = new HashMap<IoTString, KeyValue>();
111 speculativeTable = new HashMap<IoTString, KeyValue>();
112 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
113 arbitratorTable = new HashMap<IoTString, Long>();
114 newKeyTable = new HashMap<IoTString, NewKey>();
115 newCommitMap = new HashMap<Long, Map<Long, Commit>>();
116 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
117 lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
118 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
119 pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
120 pendingCommitsList = new LinkedList<Commit>();
121 pendingCommitsToDelete = new LinkedList<Commit>();
122 localCommunicationChannels = new HashMap<Long, LocalComm>();
123 transactionStatusMap = new HashMap<Long, TransactionStatus>();
124 transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
125 mutex = new Semaphore(1);
128 public void initTable() throws ServerException {
129 cloud.setSalt();//Set the salt
130 Slot s = new Slot(this, 1, localmachineid);
131 TableStatus status = new TableStatus(s, numslots);
133 Slot[] array = cloud.putSlot(s, numslots);
135 array = new Slot[] {s};
136 /* update data structure */
137 validateandupdate(array, true);
139 throw new Error("Error on initialization");
143 public void rebuild() throws ServerException, InterruptedException {
145 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
146 validateandupdate(newslots, true);
150 // TODO: delete method
151 public void printSlots() {
152 long o = buffer.getOldestSeqNum();
153 long n = buffer.getNewestSeqNum();
155 int[] types = new int[10];
161 for (long i = o; i < (n + 1); i++) {
162 Slot s = buffer.getSlot(i);
164 Vector<Entry> entries = s.getEntries();
166 for (Entry e : entries) {
168 int type = e.getType();
169 types[type] = types[type] + 1;
178 for (int i = 0; i < 10; i++) {
179 System.out.println(i + " " + types[i]);
181 System.out.println("Live count: " + livec);
182 System.out.println("Dead count: " + deadc);
183 System.out.println("Old: " + o);
184 System.out.println("New: " + n);
185 System.out.println("Size: " + buffer.size());
186 System.out.println("Commits Key Map: " + commitedTable.size());
187 // System.out.println("Commits Live Map: " + commitMap.size());
188 System.out.println("Pending: " + pendingTransQueue.size());
190 // List<IoTString> strList = new ArrayList<IoTString>();
191 // for (int i = 0; i < 100; i++) {
192 // String keyA = "a" + i;
193 // String keyB = "b" + i;
194 // String keyC = "c" + i;
195 // String keyD = "d" + i;
197 // IoTString iKeyA = new IoTString(keyA);
198 // IoTString iKeyB = new IoTString(keyB);
199 // IoTString iKeyC = new IoTString(keyC);
200 // IoTString iKeyD = new IoTString(keyD);
202 // strList.add(iKeyA);
203 // strList.add(iKeyB);
204 // strList.add(iKeyC);
205 // strList.add(iKeyD);
209 // for (Long l : commitMap.keySet()) {
210 // for (Long l2 : commitMap.get(l).keySet()) {
211 // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
212 // strList.remove(kv.getKey());
213 // System.out.print(kv.getKey() + " ");
218 // System.out.println();
219 // System.out.println();
221 // for (IoTString s : strList) {
222 // System.out.print(s + " ");
224 // System.out.println();
225 // System.out.println(strList.size());
228 public long getId() {
229 return localmachineid;
232 public boolean hasConnection() {
233 return cloud.hasConnection();
236 public String toString() {
237 String retString = " Committed Table: \n";
238 retString += "---------------------------\n";
239 retString += commitedTable.toString();
243 retString += " Speculative Table: \n";
244 retString += "---------------------------\n";
245 retString += speculativeTable.toString();
250 public void addLocalComm(long machineId, LocalComm lc) {
251 localCommunicationChannels.put(machineId, lc);
253 public Long getArbitrator(IoTString key) throws InterruptedException {
256 Long arb = arbitratorTable.get(key);
262 public IoTString getCommitted(IoTString key) throws InterruptedException {
265 KeyValue kv = commitedTable.get(key);
270 return kv.getValue();
276 public IoTString getSpeculative(IoTString key) throws InterruptedException {
280 KeyValue kv = pendingTransSpeculativeTable.get(key);
283 kv = speculativeTable.get(key);
287 kv = commitedTable.get(key);
293 return kv.getValue();
299 public IoTString getCommittedAtomic(IoTString key) throws InterruptedException {
303 KeyValue kv = commitedTable.get(key);
305 if (arbitratorTable.get(key) == null) {
306 throw new Error("Key not Found.");
309 // Make sure new key value pair matches the current arbitrator
310 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
311 // TODO: Maybe not throw en error
312 throw new Error("Not all Key Values Match Arbitrator.");
318 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
319 return kv.getValue();
321 pendingTransBuild.addKVGuard(new KeyValue(key, null));
326 public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException {
330 if (arbitratorTable.get(key) == null) {
331 throw new Error("Key not Found.");
334 // Make sure new key value pair matches the current arbitrator
335 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
336 // TODO: Maybe not throw en error
337 throw new Error("Not all Key Values Match Arbitrator.");
340 KeyValue kv = pendingTransSpeculativeTable.get(key);
343 kv = speculativeTable.get(key);
347 kv = commitedTable.get(key);
353 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
354 return kv.getValue();
356 pendingTransBuild.addKVGuard(new KeyValue(key, null));
361 public Pair<Boolean, Boolean> update() throws InterruptedException {
365 boolean gotLatestFromServer = false;
366 boolean didSendLocal = false;
369 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
370 validateandupdate(newslots, false);
371 gotLatestFromServer = true;
373 if (!pendingTransQueue.isEmpty()) {
375 // We have a pending transaction so do full insertion
376 processPendingTrans();
379 // We dont have a pending transaction so do minimal effort
380 updateWithNotPendingTrans();
386 } catch (Exception e) {
387 // could not update so do nothing
392 return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
395 public Boolean updateFromLocal(long arb) throws InterruptedException {
396 LocalComm lc = localCommunicationChannels.get(arb);
398 // Cant talk directly to arbitrator so cant do anything
402 byte[] array = new byte[Long.BYTES ];
403 ByteBuffer bbEncode = ByteBuffer.wrap(array);
404 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
405 if (lastSeenCommit != null) {
406 bbEncode.putLong(lastSeenCommit);
412 byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
415 ByteBuffer bbDecode = ByteBuffer.wrap(data);
416 boolean didCommit = bbDecode.get() == 1;
417 int numberOfCommites = bbDecode.getInt();
419 List<Commit> newCommits = new LinkedList<Commit>();
420 for (int i = 0; i < numberOfCommites; i++ ) {
422 Commit com = (Commit)Commit.decode(null, bbDecode);
426 for (Commit commit : newCommits) {
427 // Prepare to process the commit
428 processEntry(commit);
431 boolean didCommitOrSpeculate = proccessAllNewCommits();
433 // Go through all uncommitted transactions and kill the ones that are dead
434 deleteDeadUncommittedTransactions();
436 // Speculate on key value pairs
437 didCommitOrSpeculate |= createSpeculativeTable();
438 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
446 public void startTransaction() throws InterruptedException {
447 // Create a new transaction, invalidates any old pending transactions.
448 pendingTransBuild = new PendingTransaction();
451 public void addKV(IoTString key, IoTString value) {
453 if (arbitratorTable.get(key) == null) {
454 throw new Error("Key not Found.");
457 // Make sure new key value pair matches the current arbitrator
458 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
459 // TODO: Maybe not throw en error
460 throw new Error("Not all Key Values Match Arbitrator.");
463 KeyValue kv = new KeyValue(key, value);
464 pendingTransBuild.addKV(kv);
467 public TransactionStatus commitTransaction() throws InterruptedException {
469 if (pendingTransBuild.getKVUpdates().size() == 0) {
471 // transaction with no updates will have no effect on the system
472 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
477 TransactionStatus transStatus = null;
479 if (pendingTransBuild.getArbitrator() != localmachineid) {
481 // set the local sequence number so we can recognize this transaction later
482 pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
483 localTransactionSequenceNumber++;
485 transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
486 transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
488 // Add the pending transaction to the queue
489 pendingTransQueue.add(pendingTransBuild);
492 for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
493 PendingTransaction pt = pendingTransQueue.get(i);
495 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
497 lastSeenPendingTransactionSpeculateIndex = i;
499 for (KeyValue kv : pt.getKVUpdates()) {
500 pendingTransSpeculativeTable.put(kv.getKey(), kv);
506 Transaction ut = new Transaction(null,
509 pendingTransBuild.getArbitrator(),
510 pendingTransBuild.getKVUpdates(),
511 pendingTransBuild.getKVGuard());
513 Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
515 if (retData.getFirst()) {
516 transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
518 transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
522 // Try to insert transactions if possible
523 if (!pendingTransQueue.isEmpty()) {
524 // We have a pending transaction so do full insertion
525 processPendingTrans();
528 // We dont have a pending transaction so do minimal effort
529 updateWithNotPendingTrans();
530 } catch (Exception e) {
535 // reset it so next time is fresh
536 pendingTransBuild = new PendingTransaction();
543 public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException {
548 if (arbitratorTable.get(keyName) != null) {
549 // There is already an arbitrator
554 if (tryput(keyName, machineId, false)) {
555 // If successfully inserted
560 } catch (ServerException e) {
566 private void processPendingTrans() throws InterruptedException {
568 boolean sentAllPending = false;
570 while (!pendingTransQueue.isEmpty()) {
571 if (tryput( pendingTransQueue.peek(), false)) {
572 pendingTransQueue.poll();
576 // if got here then all pending transactions were sent
577 sentAllPending = true;
578 } catch (Exception e) {
579 // There was a connection error
580 sentAllPending = false;
583 if (!sentAllPending) {
585 for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
586 PendingTransaction pt = i.next();
587 LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
589 // Cant talk directly to arbitrator so cant do anything
594 Transaction ut = new Transaction(null,
597 pendingTransBuild.getArbitrator(),
598 pendingTransBuild.getKVUpdates(),
599 pendingTransBuild.getKVGuard());
602 Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
604 for (Commit commit : retData.getSecond()) {
605 // Prepare to process the commit
606 processEntry(commit);
609 boolean didCommitOrSpeculate = proccessAllNewCommits();
611 // Go through all uncommitted transactions and kill the ones that are dead
612 deleteDeadUncommittedTransactions();
614 // Speculate on key value pairs
615 didCommitOrSpeculate |= createSpeculativeTable();
616 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
618 if (retData.getFirst()) {
619 TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
620 if (transStatus != null) {
621 transStatus.setStatus(TransactionStatus.StatusCommitted);
625 TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
626 if (transStatus != null) {
627 transStatus.setStatus(TransactionStatus.StatusAborted);
635 private void updateWithNotPendingTrans() throws ServerException, InterruptedException {
636 boolean doEnd = false;
637 boolean needResize = false;
638 while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) {
639 boolean resize = needResize;
642 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
644 if (liveslotcount > resizethreshold) {
645 resize = true; //Resize is forced
649 newsize = (int) (numslots * RESIZE_MULTIPLE);
650 TableStatus status = new TableStatus(s, newsize);
654 doRejectedMessages(s);
656 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
658 // Resize was needed so redo call
659 if (retTup.getFirst()) {
664 // Extract working variables
665 boolean seenliveslot = retTup.getSecond();
666 long seqn = retTup.getThird();
668 // Did need to arbitrate
669 doEnd = !doArbitration(s);
671 doOptionalRescue(s, seenliveslot, seqn, resize);
678 Slot[] array = cloud.putSlot(s, max);
680 array = new Slot[] {s};
681 rejectedmessagelist.clear();
683 // Delete pending commits that were sent to the cloud
684 deletePendingCommits();
687 if (array.length == 0)
688 throw new Error("Server Error: Did not send any slots");
689 rejectedmessagelist.add(s.getSequenceNumber());
693 /* update data structure */
694 validateandupdate(array, true);
698 private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException {
700 // encode the request
701 byte[] array = new byte[Long.BYTES + ut.getSize()];
702 ByteBuffer bbEncode = ByteBuffer.wrap(array);
703 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
704 if (lastSeenCommit != null) {
705 bbEncode.putLong(lastSeenCommit);
711 byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
714 ByteBuffer bbDecode = ByteBuffer.wrap(data);
715 boolean didCommit = bbDecode.get() == 1;
716 int numberOfCommites = bbDecode.getInt();
718 List<Commit> newCommits = new LinkedList<Commit>();
719 for (int i = 0; i < numberOfCommites; i++ ) {
721 Commit com = (Commit)Commit.decode(null, bbDecode);
725 return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
728 public byte[] localCommInput(byte[] data) throws InterruptedException {
733 ByteBuffer bbDecode = ByteBuffer.wrap(data);
734 long lastSeenCommit = bbDecode.getLong();
736 Transaction ut = null;
737 if (data.length != Long.BYTES) {
739 ut = (Transaction)Transaction.decode(null, bbDecode);
743 // Do the local update and arbitrate
744 Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
748 // Calculate the size of the response
749 int size = Byte.BYTES + Integer.BYTES;
750 for (Commit com : returnData.getSecond()) {
751 size += com.getSize();
754 // encode the response
755 byte[] array = new byte[size];
756 ByteBuffer bbEncode = ByteBuffer.wrap(array);
757 if (returnData.getFirst()) {
758 bbEncode.put((byte)1);
760 bbEncode.put((byte)0);
762 bbEncode.putInt(returnData.getSecond().size());
764 for (Commit com : returnData.getSecond()) {
765 com.encode(bbEncode);
768 return bbEncode.array();
771 private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
773 List<Commit> returnCommits = new ArrayList<Commit>();
775 if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
776 // There is a commit that the other client has not seen yet
778 Map<Long, Commit> cm = commitMap.get(localmachineid);
781 List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
782 Collections.sort(commitKeys);
785 for (int i = (commitKeys.size() - 1); i >= 0; i--) {
786 Commit com = cm.get(commitKeys.get(i));
788 if (com.getSequenceNumber() <= lastCommitSeen) {
791 returnCommits.add((Commit)com.getCopy(null));
797 if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
798 // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
799 // or there is no transaction to process
800 return new Pair<Boolean, List<Commit>>(false, returnCommits);
803 if (!ut.evaluateGuard(commitedTable, null)) {
804 // Guard evaluated as false so return only the commits that the other device has not seen yet
805 return new Pair<Boolean, List<Commit>>(false, returnCommits);
809 Commit commit = new Commit(null,
811 commitSequenceNumber,
813 ut.getkeyValueUpdateSet());
814 commitSequenceNumber = commitSequenceNumber + 1;
816 // Add to the pending commits list
817 pendingCommitsList.add(commit);
819 // Add this commit so we can send it back
820 returnCommits.add(commit);
822 // Prepare to process the commit
823 processEntry(commit);
825 boolean didCommitOrSpeculate = proccessAllNewCommits();
827 // Go through all uncommitted transactions and kill the ones that are dead
828 deleteDeadUncommittedTransactions();
830 // Speculate on key value pairs
831 didCommitOrSpeculate |= createSpeculativeTable();
832 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
834 return new Pair<Boolean, List<Commit>>(true, returnCommits);
837 public void decrementLiveCount() {
841 private void setResizeThreshold() {
842 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
843 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
846 private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
847 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
850 if (liveslotcount > resizethreshold) {
851 resize = true; //Resize is forced
855 newsize = (int) (numslots * RESIZE_MULTIPLE);
856 TableStatus status = new TableStatus(s, newsize);
860 doRejectedMessages(s);
862 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
864 // Resize was needed so redo call
865 if (retTup.getFirst()) {
866 return tryput(pendingTrans, true);
869 // Extract working variables
870 boolean seenliveslot = retTup.getSecond();
871 long seqn = retTup.getThird();
875 Transaction trans = new Transaction(s,
876 s.getSequenceNumber(),
878 pendingTrans.getArbitrator(),
879 pendingTrans.getKVUpdates(),
880 pendingTrans.getKVGuard());
881 boolean insertedTrans = false;
882 if (s.hasSpace(trans)) {
884 insertedTrans = true;
887 doOptionalRescue(s, seenliveslot, seqn, resize);
888 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
890 if (sendRetData.getFirst()) {
891 // update the status and change what the sequence number is for the
892 TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
893 transStatus.setStatus(TransactionStatus.StatusSent);
894 transStatus.setSentTransaction();
895 transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
899 if (sendRetData.getSecond().length != 0) {
900 // insert into the local block chain
901 validateandupdate(sendRetData.getSecond(), true);
904 return sendRetData.getFirst();
907 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
908 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
910 if (liveslotcount > resizethreshold) {
911 resize = true; //Resize is forced
915 newsize = (int) (numslots * RESIZE_MULTIPLE);
916 TableStatus status = new TableStatus(s, newsize);
920 doRejectedMessages(s);
921 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
923 // Resize was needed so redo call
924 if (retTup.getFirst()) {
925 return tryput(keyName, arbMachineid, true);
928 // Extract working variables
929 boolean seenliveslot = retTup.getSecond();
930 long seqn = retTup.getThird();
934 NewKey newKey = new NewKey(s, keyName, arbMachineid);
936 boolean insertedNewKey = false;
937 if (s.hasSpace(newKey)) {
939 insertedNewKey = true;
942 doOptionalRescue(s, seenliveslot, seqn, resize);
943 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
945 if (sendRetData.getSecond().length != 0) {
946 // insert into the local block chain
947 validateandupdate(sendRetData.getSecond(), true);
950 return sendRetData.getFirst();
953 private void doRejectedMessages(Slot s) {
954 if (! rejectedmessagelist.isEmpty()) {
955 /* TODO: We should avoid generating a rejected message entry if
956 * there is already a sufficient entry in the queue (e.g.,
957 * equalsto value of true and same sequence number). */
959 long old_seqn = rejectedmessagelist.firstElement();
960 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
961 long new_seqn = rejectedmessagelist.lastElement();
962 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
967 /* Go through list of missing messages */
968 for (; i < rejectedmessagelist.size(); i++) {
969 long curr_seqn = rejectedmessagelist.get(i);
970 Slot s_msg = buffer.getSlot(curr_seqn);
973 prev_seqn = curr_seqn;
975 /* Generate rejected message entry for missing messages */
976 if (prev_seqn != -1) {
977 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
980 /* Generate rejected message entries for present messages */
981 for (; i < rejectedmessagelist.size(); i++) {
982 long curr_seqn = rejectedmessagelist.get(i);
983 Slot s_msg = buffer.getSlot(curr_seqn);
984 long machineid = s_msg.getMachineID();
985 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
992 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
993 long newestseqnum = buffer.getNewestSeqNum();
994 long oldestseqnum = buffer.getOldestSeqNum();
995 if (lastliveslotseqn < oldestseqnum)
996 lastliveslotseqn = oldestseqnum;
998 long seqn = lastliveslotseqn;
999 boolean seenliveslot = false;
1000 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
1001 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1005 for (; seqn < threshold; seqn++) {
1006 Slot prevslot = buffer.getSlot(seqn);
1007 // Push slot number forward
1009 lastliveslotseqn = seqn;
1011 if (! prevslot.isLive())
1013 seenliveslot = true;
1014 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1015 for (Entry liveentry : liveentries) {
1016 if (s.hasSpace(liveentry)) {
1017 s.addEntry(liveentry);
1018 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
1020 System.out.println("B"); //?
1021 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
1028 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
1031 private boolean doArbitration(Slot s) {
1033 // flag whether we have finished all arbitration
1034 boolean stillHasArbitration = false;
1036 pendingCommitsToDelete.clear();
1038 // First add queue commits
1039 for (Commit commit : pendingCommitsList) {
1040 if (s.hasSpace(commit)) {
1042 pendingCommitsToDelete.add(commit);
1044 // Ran out of space so move on but still not done
1045 stillHasArbitration = true;
1046 return stillHasArbitration;
1051 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1052 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1054 // Sort from oldest to newest
1055 Collections.sort(transSeqNums);
1057 for (Long transNum : transSeqNums) {
1058 Transaction ut = uncommittedTransactionsMap.get(transNum);
1060 // Check if this machine arbitrates for this transaction
1061 if (ut.getArbitrator() != localmachineid ) {
1065 // we did have something to arbitrate on
1066 stillHasArbitration = true;
1068 Entry newEntry = null;
1070 if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
1071 // Guard evaluated as true
1073 // update the local tmp current key set
1074 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
1075 speculativeTableTmp.put(kv.getKey(), kv);
1078 // create the commit
1079 newEntry = new Commit(s,
1080 ut.getSequenceNumber(),
1081 commitSequenceNumber,
1083 ut.getkeyValueUpdateSet());
1084 commitSequenceNumber = commitSequenceNumber + 1;
1089 newEntry = new Abort(s,
1090 ut.getSequenceNumber(),
1092 ut.getArbitrator());
1095 if ((newEntry != null) && s.hasSpace(newEntry)) {
1096 s.addEntry(newEntry);
1102 return stillHasArbitration;
1105 private void deletePendingCommits() {
1106 for (Commit com : pendingCommitsToDelete) {
1107 pendingCommitsList.remove(com);
1109 pendingCommitsToDelete.clear();
1112 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1113 /* now go through live entries from least to greatest sequence number until
1114 * either all live slots added, or the slot doesn't have enough room
1115 * for SKIP_THRESHOLD consecutive entries*/
1117 long newestseqnum = buffer.getNewestSeqNum();
1119 for (; seqn <= newestseqnum; seqn++) {
1120 Slot prevslot = buffer.getSlot(seqn);
1121 //Push slot number forward
1123 lastliveslotseqn = seqn;
1125 if (!prevslot.isLive())
1127 seenliveslot = true;
1128 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1129 for (Entry liveentry : liveentries) {
1130 if (s.hasSpace(liveentry))
1131 s.addEntry(liveentry);
1134 if (skipcount > SKIP_THRESHOLD)
1141 private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
1146 Slot[] array = cloud.putSlot(s, max);
1147 if (array == null) {
1148 array = new Slot[] {s};
1149 rejectedmessagelist.clear();
1151 // Delete pending commits that were sent to the cloud
1152 deletePendingCommits();
1154 // if (array.length == 0)
1155 // throw new Error("Server Error: Did not send any slots");
1156 rejectedmessagelist.add(s.getSequenceNumber());
1160 return new Pair<Boolean, Slot[]>(inserted, array);
1163 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
1164 /* The cloud communication layer has checked slot HMACs already
1166 if (newslots.length == 0) return;
1168 // Reset the table status declared sizes
1169 smallestTableStatusSeen = -1;
1170 largestTableStatusSeen = -1;
1172 long firstseqnum = newslots[0].getSequenceNumber();
1173 if (firstseqnum <= sequencenumber) {
1174 throw new Error("Server Error: Sent older slots!");
1177 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
1178 checkHMACChain(indexer, newslots);
1180 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
1182 // initExpectedSize(firstseqnum);
1183 for (Slot slot : newslots) {
1184 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
1185 // updateExpectedSize();
1188 /* If there is a gap, check to see if the server sent us everything. */
1189 if (firstseqnum != (sequencenumber + 1)) {
1192 checkNumSlots(newslots.length);
1193 if (!machineSet.isEmpty()) {
1194 throw new Error("Missing record for machines: " + machineSet);
1201 /* Commit new to slots. */
1202 for (Slot slot : newslots) {
1203 buffer.putSlot(slot);
1206 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
1208 // Process all on key value pairs
1209 boolean didCommitOrSpeculate = proccessAllNewCommits();
1211 // Go through all uncommitted transactions and kill the ones that are dead
1212 deleteDeadUncommittedTransactions();
1214 // Speculate on key value pairs
1215 didCommitOrSpeculate |= createSpeculativeTable();
1217 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
1220 private boolean proccessAllNewCommits() {
1221 // Process only if there are commit
1222 if (newCommitMap.keySet().size() == 0) {
1225 boolean didProcessNewCommit = false;
1227 for (Long arb : newCommitMap.keySet()) {
1229 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
1231 // Sort from oldest to newest commit
1232 Collections.sort(commitSeqNums);
1234 // Go through each new commit one by one
1235 for (Long entrySeqNum : commitSeqNums) {
1236 Commit entry = newCommitMap.get(arb).get(entrySeqNum);
1238 long lastCommitSeenSeqNum = -1;
1239 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
1240 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
1243 if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
1244 Map<Long, Commit> cm = commitMap.get(arb);
1246 cm = new HashMap<Long, Commit>();
1249 Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
1250 commitMap.put(arb, cm);
1252 if (prevCommit != null) {
1253 prevCommit.setDead();
1255 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
1256 committedMapByKey.put(kv.getKey(), entry);
1263 Set<Commit> commitsToEditSet = new HashSet<Commit>();
1265 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1266 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
1269 commitsToEditSet.remove(null);
1271 for (Commit prevCommit : commitsToEditSet) {
1273 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
1275 if (!prevCommit.isLive()) {
1276 Map<Long, Commit> cm = commitMap.get(arb);
1278 // remove it from the map so that it can be set as dead
1280 cm.remove(prevCommit.getSequenceNumber());
1281 commitMap.put(arb, cm);
1286 // Add the new commit
1287 Map<Long, Commit> cm = commitMap.get(arb);
1289 cm = new HashMap<Long, Commit>();
1291 cm.put(entry.getSequenceNumber(), entry);
1292 commitMap.put(arb, cm);
1294 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
1296 // set the trans sequence number if we are able to
1297 if (entry.getTransSequenceNumber() != -1) {
1298 lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1301 didProcessNewCommit = true;
1303 // Update the committed table list
1304 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1305 IoTString key = kv.getKey();
1306 commitedTable.put(key, kv);
1307 committedMapByKey.put(key, entry);
1311 // Clear the new commits storage so we can use it later
1312 newCommitMap.clear();
1315 // go through all saved transactions and update the status of those that can be updated
1316 for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
1317 Map.Entry<Long, TransactionStatus> entry = i.next();
1318 long seqnum = entry.getKey();
1319 TransactionStatus status = entry.getValue();
1321 if (status.getSentTransaction()) {
1323 Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
1324 Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
1326 if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
1327 ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
1328 status.setStatus(TransactionStatus.StatusCommitted);
1334 return didProcessNewCommit;
1337 private void deleteDeadUncommittedTransactions() {
1338 // Make dead the transactions
1339 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
1340 Transaction prevtrans = i.next().getValue();
1341 long transArb = prevtrans.getArbitrator();
1343 Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
1344 Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
1346 if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
1347 ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
1349 prevtrans.setDead();
1354 private boolean createSpeculativeTable() {
1355 if (uncommittedTransactionsMap.keySet().size() == 0) {
1359 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1360 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1362 // Sort from oldest to newest commit
1363 Collections.sort(utSeqNums);
1365 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
1367 speculativeTable.clear();
1368 lastUncommittedTransaction = -1;
1370 for (Long key : utSeqNums) {
1371 Transaction trans = uncommittedTransactionsMap.get(key);
1373 lastUncommittedTransaction = key;
1375 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
1376 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1377 speculativeTableTmp.put(kv.getKey(), kv);
1383 for (Long key : utSeqNums) {
1385 if (key <= lastUncommittedTransaction) {
1389 lastUncommittedTransaction = key;
1391 Transaction trans = uncommittedTransactionsMap.get(key);
1393 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
1394 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1395 speculativeTableTmp.put(kv.getKey(), kv);
1401 for (IoTString key : speculativeTableTmp.keySet()) {
1402 speculativeTable.put(key, speculativeTableTmp.get(key));
1408 private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
1410 if (didCommitOrSpeculate) {
1411 pendingTransSpeculativeTable.clear();
1412 lastSeenPendingTransactionSpeculateIndex = 0;
1415 for (PendingTransaction pt : pendingTransQueue) {
1416 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
1418 lastSeenPendingTransactionSpeculateIndex = index;
1421 for (KeyValue kv : pt.getKVUpdates()) {
1422 pendingTransSpeculativeTable.put(kv.getKey(), kv);
1430 private int expectedsize, currmaxsize;
1432 private void checkNumSlots(int numslots) {
1435 // We only have 1 size so we must have this many slots
1436 if (largestTableStatusSeen == smallestTableStatusSeen) {
1437 if (numslots != smallestTableStatusSeen) {
1438 throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
1441 // We have more than 1
1442 if (numslots < smallestTableStatusSeen) {
1443 throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
1447 // if (numslots != expectedsize) {
1448 // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
1452 private void initExpectedSize(long firstsequencenumber) {
1453 long prevslots = firstsequencenumber;
1454 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
1455 currmaxsize = numslots;
1458 private void updateExpectedSize() {
1460 if (expectedsize > currmaxsize) {
1461 expectedsize = currmaxsize;
1465 private void updateCurrMaxSize(int newmaxsize) {
1466 currmaxsize = newmaxsize;
1469 private void commitNewMaxSize() {
1471 if (largestTableStatusSeen == -1) {
1472 currmaxsize = numslots;
1474 currmaxsize = largestTableStatusSeen;
1477 if (numslots != currmaxsize) {
1478 buffer.resize(currmaxsize);
1481 numslots = currmaxsize;
1482 setResizeThreshold();
1485 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1486 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1489 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1490 long oldseqnum = entry.getOldSeqNum();
1491 long newseqnum = entry.getNewSeqNum();
1492 boolean isequal = entry.getEqual();
1493 long machineid = entry.getMachineID();
1494 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
1495 Slot slot = indexer.getSlot(seqnum);
1497 long slotmachineid = slot.getMachineID();
1498 if (isequal != (slotmachineid == machineid)) {
1499 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
1504 HashSet<Long> watchset = new HashSet<Long>();
1505 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
1506 long entry_mid = lastmsg_entry.getKey();
1507 /* We've seen it, don't need to continue to watch. Our next
1508 * message will implicitly acknowledge it. */
1509 if (entry_mid == localmachineid)
1511 Pair<Long, Liveness> v = lastmsg_entry.getValue();
1512 long entry_seqn = v.getFirst();
1513 if (entry_seqn < newseqnum) {
1514 addWatchList(entry_mid, entry);
1515 watchset.add(entry_mid);
1518 if (watchset.isEmpty())
1521 entry.setWatchSet(watchset);
1524 private void processEntry(NewKey entry) {
1525 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1527 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
1529 if (oldNewKey != null) {
1530 oldNewKey.setDead();
1534 private void processEntry(Transaction entry) {
1536 long arb = entry.getArbitrator();
1537 Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
1538 Long abLast = lastAbortSeenSeqNumMap.get(arb);
1540 Transaction prevTrans = null;
1542 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
1543 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1544 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
1545 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1547 prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
1550 // Duplicate so delete old copy
1551 if (prevTrans != null) {
1552 prevTrans.setDead();
1556 private void processEntry(Abort entry) {
1557 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1558 // Abort has not been seen yet so we need to keep track of it
1560 Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1561 if (prevAbort != null) {
1562 prevAbort.setDead(); // delete old version of the duplicate
1565 if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1566 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1569 // The machine already saw this so it is dead
1573 // Update the status of the transaction and remove it since we are done with this transaction
1574 TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
1575 if (status != null) {
1576 status.setStatus(TransactionStatus.StatusAborted);
1580 private void processEntry(Commit entry) {
1581 Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
1583 if (arbMap == null) {
1584 arbMap = new HashMap<Long, Commit>();
1587 Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
1588 newCommitMap.put(entry.getTransArbitrator(), arbMap);
1590 if (prevCommit != null) {
1591 prevCommit.setDead();
1595 private void processEntry(TableStatus entry) {
1596 int newnumslots = entry.getMaxSlots();
1597 // updateCurrMaxSize(newnumslots);
1598 if (lastTableStatus != null)
1599 lastTableStatus.setDead();
1600 lastTableStatus = entry;
1602 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1603 smallestTableStatusSeen = newnumslots;
1606 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1607 largestTableStatusSeen = newnumslots;
1611 private void addWatchList(long machineid, RejectedMessage entry) {
1612 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1613 if (entries == null)
1614 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1618 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1619 machineSet.remove(machineid);
1621 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1622 if (watchset != null) {
1623 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1624 RejectedMessage rm = rmit.next();
1625 if (rm.getNewSeqNum() <= seqnum) {
1626 /* Remove it from our watchlist */
1628 /* Decrement machines that need to see this notification */
1629 rm.removeWatcher(machineid);
1634 if (machineid == localmachineid) {
1635 /* Our own messages are immediately dead. */
1636 if (liveness instanceof LastMessage) {
1637 ((LastMessage)liveness).setDead();
1638 } else if (liveness instanceof Slot) {
1639 ((Slot)liveness).setDead();
1641 throw new Error("Unrecognized type");
1645 // Set dead the abort
1646 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1647 Abort abort = i.next().getValue();
1649 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1655 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1656 if (lastmsgentry == null)
1659 long lastmsgseqnum = lastmsgentry.getFirst();
1660 Liveness lastentry = lastmsgentry.getSecond();
1661 if (machineid != localmachineid) {
1662 if (lastentry instanceof LastMessage) {
1663 ((LastMessage)lastentry).setDead();
1664 } else if (lastentry instanceof Slot) {
1665 ((Slot)lastentry).setDead();
1667 throw new Error("Unrecognized type");
1671 if (machineid == localmachineid) {
1672 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1673 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqnum + " got: " + lastmsgseqnum);
1675 if (lastmsgseqnum > seqnum)
1676 throw new Error("Server Error: Rollback on remote machine sequence number");
1680 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1681 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1682 for (Entry entry : slot.getEntries()) {
1683 switch (entry.getType()) {
1685 case Entry.TypeNewKey:
1686 processEntry((NewKey)entry);
1689 case Entry.TypeCommit:
1690 processEntry((Commit)entry);
1693 case Entry.TypeAbort:
1694 processEntry((Abort)entry);
1697 case Entry.TypeTransaction:
1698 processEntry((Transaction)entry);
1701 case Entry.TypeLastMessage:
1702 processEntry((LastMessage)entry, machineSet);
1705 case Entry.TypeRejectedMessage:
1706 processEntry((RejectedMessage)entry, indexer);
1709 case Entry.TypeTableStatus:
1710 processEntry((TableStatus)entry);
1714 throw new Error("Unrecognized type: " + entry.getType());
1719 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1720 for (int i = 0; i < newslots.length; i++) {
1721 Slot currslot = newslots[i];
1722 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1723 if (prevslot != null &&
1724 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1725 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);