2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.nio.ByteBuffer;
20 * IoTTable data structure. Provides client inferface.
21 * @author Brian Demsky
25 final public class Table {
26 private int numslots; //number of slots stored in buffer
28 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
29 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
31 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
32 private Vector<Long> rejectedmessagelist = new Vector<Long>();
33 private SlotBuffer buffer;
34 private CloudComm cloud;
35 private long sequencenumber; //Largest sequence number a client has received
36 private long localmachineid;
37 private TableStatus lastTableStatus;
38 static final int FREE_SLOTS = 10; //number of slots that should be kept free
39 static final int SKIP_THRESHOLD = 10;
40 private long liveslotcount = 0;
42 static final double RESIZE_MULTIPLE = 1.2;
43 static final double RESIZE_THRESHOLD = 0.75;
44 static final int REJECTED_THRESHOLD = 5;
45 private int resizethreshold;
46 private long lastliveslotseqn; //smallest sequence number with a live entry
47 private Random random = new Random();
48 private long lastUncommittedTransaction = 0;
50 private int smallestTableStatusSeen = -1;
51 private int largestTableStatusSeen = -1;
52 private int lastSeenPendingTransactionSpeculateIndex = 0;
53 private int commitSequenceNumber = 0;
54 private long localTransactionSequenceNumber = 0;
56 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
57 private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
58 private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
59 private Map<Long, Abort> abortMap = null; // Set of the live aborts
60 private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
61 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
62 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
63 private Map<Long, Transaction> uncommittedTransactionsMap = null;
64 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
65 private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
66 private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
67 private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
68 private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
69 private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
70 private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
71 private List<Commit> pendingCommitsList = null;
72 private List<Commit> pendingCommitsToDelete = null;
73 private Map<Long, LocalComm> localCommunicationChannels;
74 private Map<Long, TransactionStatus> transactionStatusMap = null;
75 private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
78 public Table(String hostname, String baseurl, String password, long _localmachineid) {
79 localmachineid = _localmachineid;
80 buffer = new SlotBuffer();
81 numslots = buffer.capacity();
84 cloud = new CloudComm(this, hostname, baseurl, password);
90 public Table(CloudComm _cloud, long _localmachineid) {
91 localmachineid = _localmachineid;
92 buffer = new SlotBuffer();
93 numslots = buffer.capacity();
101 private void setupDataStructs() {
102 pendingTransQueue = new LinkedList<PendingTransaction>();
103 commitMap = new HashMap<Long, Map<Long, Commit>>();
104 abortMap = new HashMap<Long, Abort>();
105 committedMapByKey = new HashMap<IoTString, Commit>();
106 commitedTable = new HashMap<IoTString, KeyValue>();
107 speculativeTable = new HashMap<IoTString, KeyValue>();
108 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
109 arbitratorTable = new HashMap<IoTString, Long>();
110 newKeyTable = new HashMap<IoTString, NewKey>();
111 newCommitMap = new HashMap<Long, Map<Long, Commit>>();
112 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
113 lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
114 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
115 pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
116 pendingCommitsList = new LinkedList<Commit>();
117 pendingCommitsToDelete = new LinkedList<Commit>();
118 localCommunicationChannels = new HashMap<Long, LocalComm>();
119 transactionStatusMap = new HashMap<Long, TransactionStatus>();
120 transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
123 public void initTable() throws ServerException {
124 cloud.setSalt();//Set the salt
125 Slot s = new Slot(this, 1, localmachineid);
126 TableStatus status = new TableStatus(s, numslots);
128 Slot[] array = cloud.putSlot(s, numslots);
130 array = new Slot[] {s};
131 /* update data structure */
132 validateandupdate(array, true);
134 throw new Error("Error on initialization");
138 public void rebuild() throws ServerException {
139 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
140 validateandupdate(newslots, true);
143 // TODO: delete method
144 public void printSlots() {
145 long o = buffer.getOldestSeqNum();
146 long n = buffer.getNewestSeqNum();
148 int[] types = new int[10];
154 for (long i = o; i < (n + 1); i++) {
155 Slot s = buffer.getSlot(i);
157 Vector<Entry> entries = s.getEntries();
159 for (Entry e : entries) {
161 int type = e.getType();
162 types[type] = types[type] + 1;
171 for (int i = 0; i < 10; i++) {
172 System.out.println(i + " " + types[i]);
174 System.out.println("Live count: " + livec);
175 System.out.println("Dead count: " + deadc);
176 System.out.println("Old: " + o);
177 System.out.println("New: " + n);
178 System.out.println("Size: " + buffer.size());
179 System.out.println("Commits Key Map: " + commitedTable.size());
180 // System.out.println("Commits Live Map: " + commitMap.size());
181 System.out.println("Pending: " + pendingTransQueue.size());
183 // List<IoTString> strList = new ArrayList<IoTString>();
184 // for (int i = 0; i < 100; i++) {
185 // String keyA = "a" + i;
186 // String keyB = "b" + i;
187 // String keyC = "c" + i;
188 // String keyD = "d" + i;
190 // IoTString iKeyA = new IoTString(keyA);
191 // IoTString iKeyB = new IoTString(keyB);
192 // IoTString iKeyC = new IoTString(keyC);
193 // IoTString iKeyD = new IoTString(keyD);
195 // strList.add(iKeyA);
196 // strList.add(iKeyB);
197 // strList.add(iKeyC);
198 // strList.add(iKeyD);
202 // for (Long l : commitMap.keySet()) {
203 // for (Long l2 : commitMap.get(l).keySet()) {
204 // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
205 // strList.remove(kv.getKey());
206 // System.out.print(kv.getKey() + " ");
211 // System.out.println();
212 // System.out.println();
214 // for (IoTString s : strList) {
215 // System.out.print(s + " ");
217 // System.out.println();
218 // System.out.println(strList.size());
221 public long getId() {
222 return localmachineid;
225 public boolean hasConnection() {
226 return cloud.hasConnection();
229 public String toString() {
230 String retString = " Committed Table: \n";
231 retString += "---------------------------\n";
232 retString += commitedTable.toString();
236 retString += " Speculative Table: \n";
237 retString += "---------------------------\n";
238 retString += speculativeTable.toString();
243 public void addLocalComm(long machineId, LocalComm lc) {
244 localCommunicationChannels.put(machineId, lc);
246 public Long getArbitrator(IoTString key) {
247 return arbitratorTable.get(key);
250 public IoTString getCommitted(IoTString key) {
251 KeyValue kv = commitedTable.get(key);
253 return kv.getValue();
259 public IoTString getSpeculative(IoTString key) {
260 KeyValue kv = pendingTransSpeculativeTable.get(key);
263 kv = speculativeTable.get(key);
267 kv = commitedTable.get(key);
271 return kv.getValue();
277 public IoTString getCommittedAtomic(IoTString key) {
278 KeyValue kv = commitedTable.get(key);
280 if (arbitratorTable.get(key) == null) {
281 throw new Error("Key not Found.");
284 // Make sure new key value pair matches the current arbitrator
285 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
286 // TODO: Maybe not throw en error
287 throw new Error("Not all Key Values Match Arbitrator.");
291 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
292 return kv.getValue();
294 pendingTransBuild.addKVGuard(new KeyValue(key, null));
299 public IoTString getSpeculativeAtomic(IoTString key) {
301 if (arbitratorTable.get(key) == null) {
302 throw new Error("Key not Found.");
305 // Make sure new key value pair matches the current arbitrator
306 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
307 // TODO: Maybe not throw en error
308 throw new Error("Not all Key Values Match Arbitrator.");
311 KeyValue kv = pendingTransSpeculativeTable.get(key);
314 kv = speculativeTable.get(key);
318 kv = commitedTable.get(key);
322 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
323 return kv.getValue();
325 pendingTransBuild.addKVGuard(new KeyValue(key, null));
330 public Pair<Boolean, Boolean> update() {
332 boolean gotLatestFromServer = false;
333 boolean didSendLocal = false;
336 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
337 validateandupdate(newslots, false);
338 gotLatestFromServer = true;
340 if (!pendingTransQueue.isEmpty()) {
342 // We have a pending transaction so do full insertion
343 processPendingTrans();
346 // We dont have a pending transaction so do minimal effort
347 updateWithNotPendingTrans();
352 } catch (Exception e) {
353 // could not update so do nothing
356 return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
359 public Boolean updateFromLocal(long arb) {
360 LocalComm lc = localCommunicationChannels.get(arb);
362 // Cant talk directly to arbitrator so cant do anything
366 byte[] array = new byte[Long.BYTES ];
367 ByteBuffer bbEncode = ByteBuffer.wrap(array);
368 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
369 if (lastSeenCommit != null) {
370 bbEncode.putLong(lastSeenCommit);
375 byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
378 ByteBuffer bbDecode = ByteBuffer.wrap(data);
379 boolean didCommit = bbDecode.get() == 1;
380 int numberOfCommites = bbDecode.getInt();
382 List<Commit> newCommits = new LinkedList<Commit>();
383 for (int i = 0; i < numberOfCommites; i++ ) {
385 Commit com = (Commit)Commit.decode(null, bbDecode);
390 for (Commit commit : newCommits) {
391 // Prepare to process the commit
392 processEntry(commit);
395 boolean didCommitOrSpeculate = proccessAllNewCommits();
397 // Go through all uncommitted transactions and kill the ones that are dead
398 deleteDeadUncommittedTransactions();
400 // Speculate on key value pairs
401 didCommitOrSpeculate |= createSpeculativeTable();
402 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
407 public void startTransaction() {
408 // Create a new transaction, invalidates any old pending transactions.
409 pendingTransBuild = new PendingTransaction();
412 public void addKV(IoTString key, IoTString value) {
414 if (arbitratorTable.get(key) == null) {
415 throw new Error("Key not Found.");
418 // Make sure new key value pair matches the current arbitrator
419 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
420 // TODO: Maybe not throw en error
421 throw new Error("Not all Key Values Match Arbitrator.");
424 KeyValue kv = new KeyValue(key, value);
425 pendingTransBuild.addKV(kv);
428 public TransactionStatus commitTransaction() {
430 if (pendingTransBuild.getKVUpdates().size() == 0) {
432 // transaction with no updates will have no effect on the system
433 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
436 TransactionStatus transStatus = null;
438 if (pendingTransBuild.getArbitrator() != localmachineid) {
440 // set the local sequence number so we can recognize this transaction later
441 pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
442 localTransactionSequenceNumber++;
444 transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
445 transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
447 // Add the pending transaction to the queue
448 pendingTransQueue.add(pendingTransBuild);
451 for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
452 PendingTransaction pt = pendingTransQueue.get(i);
454 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
456 lastSeenPendingTransactionSpeculateIndex = i;
458 for (KeyValue kv : pt.getKVUpdates()) {
459 pendingTransSpeculativeTable.put(kv.getKey(), kv);
465 Transaction ut = new Transaction(null,
468 pendingTransBuild.getArbitrator(),
469 pendingTransBuild.getKVUpdates(),
470 pendingTransBuild.getKVGuard());
472 Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
474 if (retData.getFirst()) {
475 transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
477 transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
481 // Try to insert transactions if possible
482 if (!pendingTransQueue.isEmpty()) {
483 // We have a pending transaction so do full insertion
484 processPendingTrans();
487 // We dont have a pending transaction so do minimal effort
488 updateWithNotPendingTrans();
489 } catch (Exception e) {
494 // reset it so next time is fresh
495 pendingTransBuild = new PendingTransaction();
500 public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
503 if (arbitratorTable.get(keyName) != null) {
504 // There is already an arbitrator
508 if (tryput(keyName, machineId, false)) {
509 // If successfully inserted
515 private void processPendingTrans() {
517 boolean sentAllPending = false;
519 while (!pendingTransQueue.isEmpty()) {
520 if (tryput( pendingTransQueue.peek(), false)) {
521 pendingTransQueue.poll();
525 // if got here then all pending transactions were sent
526 sentAllPending = true;
527 } catch (Exception e) {
528 // There was a connection error
529 sentAllPending = false;
532 if (!sentAllPending) {
534 for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
535 PendingTransaction pt = i.next();
536 LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
538 // Cant talk directly to arbitrator so cant do anything
543 Transaction ut = new Transaction(null,
546 pendingTransBuild.getArbitrator(),
547 pendingTransBuild.getKVUpdates(),
548 pendingTransBuild.getKVGuard());
551 Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
553 for (Commit commit : retData.getSecond()) {
554 // Prepare to process the commit
555 processEntry(commit);
558 boolean didCommitOrSpeculate = proccessAllNewCommits();
560 // Go through all uncommitted transactions and kill the ones that are dead
561 deleteDeadUncommittedTransactions();
563 // Speculate on key value pairs
564 didCommitOrSpeculate |= createSpeculativeTable();
565 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
567 if (retData.getFirst()) {
568 TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
569 if (transStatus != null) {
570 transStatus.setStatus(TransactionStatus.StatusCommitted);
574 TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
575 if (transStatus != null) {
576 transStatus.setStatus(TransactionStatus.StatusAborted);
584 private void updateWithNotPendingTrans() throws ServerException {
586 boolean doEnd = false;
587 boolean needResize = false;
588 while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) {
589 boolean resize = needResize;
592 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
594 if (liveslotcount > resizethreshold) {
595 resize = true; //Resize is forced
599 newsize = (int) (numslots * RESIZE_MULTIPLE);
600 TableStatus status = new TableStatus(s, newsize);
604 doRejectedMessages(s);
606 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
608 // Resize was needed so redo call
609 if (retTup.getFirst()) {
614 // Extract working variables
615 boolean seenliveslot = retTup.getSecond();
616 long seqn = retTup.getThird();
618 // Did need to arbitrate
619 doEnd = !doArbitration(s);
621 doOptionalRescue(s, seenliveslot, seqn, resize);
628 Slot[] array = cloud.putSlot(s, max);
630 array = new Slot[] {s};
631 rejectedmessagelist.clear();
633 // Delete pending commits that were sent to the cloud
634 deletePendingCommits();
637 if (array.length == 0)
638 throw new Error("Server Error: Did not send any slots");
639 rejectedmessagelist.add(s.getSequenceNumber());
643 /* update data structure */
644 validateandupdate(array, true);
648 private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
650 // encode the request
651 byte[] array = new byte[Long.BYTES + ut.getSize()];
652 ByteBuffer bbEncode = ByteBuffer.wrap(array);
653 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
654 if (lastSeenCommit != null) {
655 bbEncode.putLong(lastSeenCommit);
661 byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
664 ByteBuffer bbDecode = ByteBuffer.wrap(data);
665 boolean didCommit = bbDecode.get() == 1;
666 int numberOfCommites = bbDecode.getInt();
668 List<Commit> newCommits = new LinkedList<Commit>();
669 for (int i = 0; i < numberOfCommites; i++ ) {
671 Commit com = (Commit)Commit.decode(null, bbDecode);
675 return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
678 public byte[] localCommInput(byte[] data) {
681 ByteBuffer bbDecode = ByteBuffer.wrap(data);
682 long lastSeenCommit = bbDecode.getLong();
684 Transaction ut = null;
685 if (data.length != Long.BYTES) {
687 ut = (Transaction)Transaction.decode(null, bbDecode);
689 // Do the local update and arbitrate
690 Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
692 // Calculate the size of the response
693 int size = Byte.BYTES + Integer.BYTES;
694 for (Commit com : returnData.getSecond()) {
695 size += com.getSize();
698 // encode the response
699 byte[] array = new byte[size];
700 ByteBuffer bbEncode = ByteBuffer.wrap(array);
701 if (returnData.getFirst()) {
702 bbEncode.put((byte)1);
704 bbEncode.put((byte)0);
706 bbEncode.putInt(returnData.getSecond().size());
708 for (Commit com : returnData.getSecond()) {
709 com.encode(bbEncode);
712 return bbEncode.array();
715 private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
717 List<Commit> returnCommits = new ArrayList<Commit>();
719 if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
720 // There is a commit that the other client has not seen yet
722 Map<Long, Commit> cm = commitMap.get(localmachineid);
725 List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
726 Collections.sort(commitKeys);
729 for (int i = (commitKeys.size() - 1); i >= 0; i--) {
730 Commit com = cm.get(commitKeys.get(i));
732 if (com.getSequenceNumber() <= lastCommitSeen) {
735 returnCommits.add((Commit)com.getCopy(null));
741 if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
742 // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
743 // or there is no transaction to process
744 return new Pair<Boolean, List<Commit>>(false, returnCommits);
747 if (!ut.evaluateGuard(commitedTable, null)) {
748 // Guard evaluated as false so return only the commits that the other device has not seen yet
749 return new Pair<Boolean, List<Commit>>(false, returnCommits);
753 Commit commit = new Commit(null,
755 commitSequenceNumber,
757 ut.getkeyValueUpdateSet());
758 commitSequenceNumber = commitSequenceNumber + 1;
760 // Add to the pending commits list
761 pendingCommitsList.add(commit);
763 // Add this commit so we can send it back
764 returnCommits.add(commit);
766 // Prepare to process the commit
767 processEntry(commit);
769 boolean didCommitOrSpeculate = proccessAllNewCommits();
771 // Go through all uncommitted transactions and kill the ones that are dead
772 deleteDeadUncommittedTransactions();
774 // Speculate on key value pairs
775 didCommitOrSpeculate |= createSpeculativeTable();
776 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
778 return new Pair<Boolean, List<Commit>>(true, returnCommits);
781 public void decrementLiveCount() {
785 private void setResizeThreshold() {
786 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
787 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
790 private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
791 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
794 if (liveslotcount > resizethreshold) {
795 resize = true; //Resize is forced
799 newsize = (int) (numslots * RESIZE_MULTIPLE);
800 TableStatus status = new TableStatus(s, newsize);
804 doRejectedMessages(s);
806 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
808 // Resize was needed so redo call
809 if (retTup.getFirst()) {
810 return tryput(pendingTrans, true);
813 // Extract working variables
814 boolean seenliveslot = retTup.getSecond();
815 long seqn = retTup.getThird();
819 Transaction trans = new Transaction(s,
820 s.getSequenceNumber(),
822 pendingTrans.getArbitrator(),
823 pendingTrans.getKVUpdates(),
824 pendingTrans.getKVGuard());
825 boolean insertedTrans = false;
826 if (s.hasSpace(trans)) {
828 insertedTrans = true;
831 doOptionalRescue(s, seenliveslot, seqn, resize);
832 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
834 if (sendRetData.getFirst()) {
835 // update the status and change what the sequence number is for the
836 TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
837 transStatus.setStatus(TransactionStatus.StatusSent);
838 transStatus.setSentTransaction();
839 transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
843 if (sendRetData.getSecond().length != 0) {
844 // insert into the local block chain
845 validateandupdate(sendRetData.getSecond(), true);
848 return sendRetData.getFirst();
851 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
852 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
854 if (liveslotcount > resizethreshold) {
855 resize = true; //Resize is forced
859 newsize = (int) (numslots * RESIZE_MULTIPLE);
860 TableStatus status = new TableStatus(s, newsize);
864 doRejectedMessages(s);
865 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
867 // Resize was needed so redo call
868 if (retTup.getFirst()) {
869 return tryput(keyName, arbMachineid, true);
872 // Extract working variables
873 boolean seenliveslot = retTup.getSecond();
874 long seqn = retTup.getThird();
878 NewKey newKey = new NewKey(s, keyName, arbMachineid);
880 boolean insertedNewKey = false;
881 if (s.hasSpace(newKey)) {
883 insertedNewKey = true;
886 doOptionalRescue(s, seenliveslot, seqn, resize);
887 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
889 if (sendRetData.getSecond().length != 0) {
890 // insert into the local block chain
891 validateandupdate(sendRetData.getSecond(), true);
894 return sendRetData.getFirst();
897 private void doRejectedMessages(Slot s) {
898 if (! rejectedmessagelist.isEmpty()) {
899 /* TODO: We should avoid generating a rejected message entry if
900 * there is already a sufficient entry in the queue (e.g.,
901 * equalsto value of true and same sequence number). */
903 long old_seqn = rejectedmessagelist.firstElement();
904 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
905 long new_seqn = rejectedmessagelist.lastElement();
906 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
911 /* Go through list of missing messages */
912 for (; i < rejectedmessagelist.size(); i++) {
913 long curr_seqn = rejectedmessagelist.get(i);
914 Slot s_msg = buffer.getSlot(curr_seqn);
917 prev_seqn = curr_seqn;
919 /* Generate rejected message entry for missing messages */
920 if (prev_seqn != -1) {
921 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
924 /* Generate rejected message entries for present messages */
925 for (; i < rejectedmessagelist.size(); i++) {
926 long curr_seqn = rejectedmessagelist.get(i);
927 Slot s_msg = buffer.getSlot(curr_seqn);
928 long machineid = s_msg.getMachineID();
929 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
936 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
937 long newestseqnum = buffer.getNewestSeqNum();
938 long oldestseqnum = buffer.getOldestSeqNum();
939 if (lastliveslotseqn < oldestseqnum)
940 lastliveslotseqn = oldestseqnum;
942 long seqn = lastliveslotseqn;
943 boolean seenliveslot = false;
944 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
945 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
949 for (; seqn < threshold; seqn++) {
950 Slot prevslot = buffer.getSlot(seqn);
951 // Push slot number forward
953 lastliveslotseqn = seqn;
955 if (! prevslot.isLive())
958 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
959 for (Entry liveentry : liveentries) {
960 if (s.hasSpace(liveentry)) {
961 s.addEntry(liveentry);
962 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
964 System.out.println("B"); //?
965 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
972 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
975 private boolean doArbitration(Slot s) {
977 // flag whether we have finished all arbitration
978 boolean stillHasArbitration = false;
980 pendingCommitsToDelete.clear();
982 // First add queue commits
983 for (Commit commit : pendingCommitsList) {
984 if (s.hasSpace(commit)) {
986 pendingCommitsToDelete.add(commit);
988 // Ran out of space so move on but still not done
989 stillHasArbitration = true;
990 return stillHasArbitration;
995 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
996 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
998 // Sort from oldest to newest
999 Collections.sort(transSeqNums);
1001 for (Long transNum : transSeqNums) {
1002 Transaction ut = uncommittedTransactionsMap.get(transNum);
1004 // Check if this machine arbitrates for this transaction
1005 if (ut.getArbitrator() != localmachineid ) {
1009 // we did have something to arbitrate on
1010 stillHasArbitration = true;
1012 Entry newEntry = null;
1014 if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
1015 // Guard evaluated as true
1017 // update the local tmp current key set
1018 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
1019 speculativeTableTmp.put(kv.getKey(), kv);
1022 // create the commit
1023 newEntry = new Commit(s,
1024 ut.getSequenceNumber(),
1025 commitSequenceNumber,
1027 ut.getkeyValueUpdateSet());
1028 commitSequenceNumber = commitSequenceNumber + 1;
1033 newEntry = new Abort(s,
1034 ut.getSequenceNumber(),
1036 ut.getArbitrator());
1039 if ((newEntry != null) && s.hasSpace(newEntry)) {
1040 s.addEntry(newEntry);
1046 return stillHasArbitration;
1049 private void deletePendingCommits() {
1050 for (Commit com : pendingCommitsToDelete) {
1051 pendingCommitsList.remove(com);
1053 pendingCommitsToDelete.clear();
1056 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1057 /* now go through live entries from least to greatest sequence number until
1058 * either all live slots added, or the slot doesn't have enough room
1059 * for SKIP_THRESHOLD consecutive entries*/
1061 long newestseqnum = buffer.getNewestSeqNum();
1063 for (; seqn <= newestseqnum; seqn++) {
1064 Slot prevslot = buffer.getSlot(seqn);
1065 //Push slot number forward
1067 lastliveslotseqn = seqn;
1069 if (!prevslot.isLive())
1071 seenliveslot = true;
1072 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1073 for (Entry liveentry : liveentries) {
1074 if (s.hasSpace(liveentry))
1075 s.addEntry(liveentry);
1078 if (skipcount > SKIP_THRESHOLD)
1085 private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
1090 Slot[] array = cloud.putSlot(s, max);
1091 if (array == null) {
1092 array = new Slot[] {s};
1093 rejectedmessagelist.clear();
1095 // Delete pending commits that were sent to the cloud
1096 deletePendingCommits();
1098 // if (array.length == 0)
1099 // throw new Error("Server Error: Did not send any slots");
1100 rejectedmessagelist.add(s.getSequenceNumber());
1104 return new Pair<Boolean, Slot[]>(inserted, array);
1107 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
1108 /* The cloud communication layer has checked slot HMACs already
1110 if (newslots.length == 0) return;
1112 // Reset the table status declared sizes
1113 smallestTableStatusSeen = -1;
1114 largestTableStatusSeen = -1;
1116 long firstseqnum = newslots[0].getSequenceNumber();
1117 if (firstseqnum <= sequencenumber) {
1118 throw new Error("Server Error: Sent older slots!");
1121 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
1122 checkHMACChain(indexer, newslots);
1124 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
1126 // initExpectedSize(firstseqnum);
1127 for (Slot slot : newslots) {
1128 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
1129 // updateExpectedSize();
1132 /* If there is a gap, check to see if the server sent us everything. */
1133 if (firstseqnum != (sequencenumber + 1)) {
1136 checkNumSlots(newslots.length);
1137 if (!machineSet.isEmpty()) {
1138 throw new Error("Missing record for machines: " + machineSet);
1145 /* Commit new to slots. */
1146 for (Slot slot : newslots) {
1147 buffer.putSlot(slot);
1150 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
1152 // Process all on key value pairs
1153 boolean didCommitOrSpeculate = proccessAllNewCommits();
1155 // Go through all uncommitted transactions and kill the ones that are dead
1156 deleteDeadUncommittedTransactions();
1158 // Speculate on key value pairs
1159 didCommitOrSpeculate |= createSpeculativeTable();
1161 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
1164 private boolean proccessAllNewCommits() {
1165 // Process only if there are commit
1166 if (newCommitMap.keySet().size() == 0) {
1169 boolean didProcessNewCommit = false;
1171 for (Long arb : newCommitMap.keySet()) {
1173 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
1175 // Sort from oldest to newest commit
1176 Collections.sort(commitSeqNums);
1178 // Go through each new commit one by one
1179 for (Long entrySeqNum : commitSeqNums) {
1180 Commit entry = newCommitMap.get(arb).get(entrySeqNum);
1182 long lastCommitSeenSeqNum = -1;
1183 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
1184 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
1187 if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
1188 Map<Long, Commit> cm = commitMap.get(arb);
1190 cm = new HashMap<Long, Commit>();
1193 Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
1194 commitMap.put(arb, cm);
1196 if (prevCommit != null) {
1197 prevCommit.setDead();
1199 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
1200 committedMapByKey.put(kv.getKey(), entry);
1207 Set<Commit> commitsToEditSet = new HashSet<Commit>();
1209 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1210 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
1213 commitsToEditSet.remove(null);
1215 for (Commit prevCommit : commitsToEditSet) {
1217 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
1219 if (!prevCommit.isLive()) {
1220 Map<Long, Commit> cm = commitMap.get(arb);
1222 // remove it from the map so that it can be set as dead
1224 cm.remove(prevCommit.getSequenceNumber());
1225 commitMap.put(arb, cm);
1230 // Add the new commit
1231 Map<Long, Commit> cm = commitMap.get(arb);
1233 cm = new HashMap<Long, Commit>();
1235 cm.put(entry.getSequenceNumber(), entry);
1236 commitMap.put(arb, cm);
1238 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
1240 // set the trans sequence number if we are able to
1241 if (entry.getTransSequenceNumber() != -1) {
1242 lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1245 didProcessNewCommit = true;
1247 // Update the committed table list
1248 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1249 IoTString key = kv.getKey();
1250 commitedTable.put(key, kv);
1251 committedMapByKey.put(key, entry);
1255 // Clear the new commits storage so we can use it later
1256 newCommitMap.clear();
1259 // go through all saved transactions and update the status of those that can be updated
1260 for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
1261 Map.Entry<Long, TransactionStatus> entry = i.next();
1262 long seqnum = entry.getKey();
1263 TransactionStatus status = entry.getValue();
1265 if (status.getSentTransaction()) {
1267 Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
1268 Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
1270 if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
1271 ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
1272 status.setStatus(TransactionStatus.StatusCommitted);
1278 return didProcessNewCommit;
1281 private void deleteDeadUncommittedTransactions() {
1282 // Make dead the transactions
1283 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
1284 Transaction prevtrans = i.next().getValue();
1285 long transArb = prevtrans.getArbitrator();
1287 Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
1288 Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
1290 if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
1291 ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
1293 prevtrans.setDead();
1298 private boolean createSpeculativeTable() {
1299 if (uncommittedTransactionsMap.keySet().size() == 0) {
1303 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1304 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1306 // Sort from oldest to newest commit
1307 Collections.sort(utSeqNums);
1309 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
1311 speculativeTable.clear();
1312 lastUncommittedTransaction = -1;
1314 for (Long key : utSeqNums) {
1315 Transaction trans = uncommittedTransactionsMap.get(key);
1317 lastUncommittedTransaction = key;
1319 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
1320 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1321 speculativeTableTmp.put(kv.getKey(), kv);
1327 for (Long key : utSeqNums) {
1329 if (key <= lastUncommittedTransaction) {
1333 lastUncommittedTransaction = key;
1335 Transaction trans = uncommittedTransactionsMap.get(key);
1337 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
1338 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1339 speculativeTableTmp.put(kv.getKey(), kv);
1345 for (IoTString key : speculativeTableTmp.keySet()) {
1346 speculativeTable.put(key, speculativeTableTmp.get(key));
1352 private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
1354 if (didCommitOrSpeculate) {
1355 pendingTransSpeculativeTable.clear();
1356 lastSeenPendingTransactionSpeculateIndex = 0;
1359 for (PendingTransaction pt : pendingTransQueue) {
1360 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
1362 lastSeenPendingTransactionSpeculateIndex = index;
1365 for (KeyValue kv : pt.getKVUpdates()) {
1366 pendingTransSpeculativeTable.put(kv.getKey(), kv);
1374 private int expectedsize, currmaxsize;
1376 private void checkNumSlots(int numslots) {
1379 // We only have 1 size so we must have this many slots
1380 if (largestTableStatusSeen == smallestTableStatusSeen) {
1381 if (numslots != smallestTableStatusSeen) {
1382 throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
1385 // We have more than 1
1386 if (numslots < smallestTableStatusSeen) {
1387 throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
1391 // if (numslots != expectedsize) {
1392 // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
1396 private void initExpectedSize(long firstsequencenumber) {
1397 long prevslots = firstsequencenumber;
1398 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
1399 currmaxsize = numslots;
1402 private void updateExpectedSize() {
1404 if (expectedsize > currmaxsize) {
1405 expectedsize = currmaxsize;
1409 private void updateCurrMaxSize(int newmaxsize) {
1410 currmaxsize = newmaxsize;
1413 private void commitNewMaxSize() {
1415 if (largestTableStatusSeen == -1) {
1416 currmaxsize = numslots;
1418 currmaxsize = largestTableStatusSeen;
1421 if (numslots != currmaxsize) {
1422 buffer.resize(currmaxsize);
1425 numslots = currmaxsize;
1426 setResizeThreshold();
1429 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1430 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1433 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1434 long oldseqnum = entry.getOldSeqNum();
1435 long newseqnum = entry.getNewSeqNum();
1436 boolean isequal = entry.getEqual();
1437 long machineid = entry.getMachineID();
1438 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
1439 Slot slot = indexer.getSlot(seqnum);
1441 long slotmachineid = slot.getMachineID();
1442 if (isequal != (slotmachineid == machineid)) {
1443 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
1448 HashSet<Long> watchset = new HashSet<Long>();
1449 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
1450 long entry_mid = lastmsg_entry.getKey();
1451 /* We've seen it, don't need to continue to watch. Our next
1452 * message will implicitly acknowledge it. */
1453 if (entry_mid == localmachineid)
1455 Pair<Long, Liveness> v = lastmsg_entry.getValue();
1456 long entry_seqn = v.getFirst();
1457 if (entry_seqn < newseqnum) {
1458 addWatchList(entry_mid, entry);
1459 watchset.add(entry_mid);
1462 if (watchset.isEmpty())
1465 entry.setWatchSet(watchset);
1468 private void processEntry(NewKey entry) {
1469 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1471 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
1473 if (oldNewKey != null) {
1474 oldNewKey.setDead();
1478 private void processEntry(Transaction entry) {
1480 long arb = entry.getArbitrator();
1481 Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
1482 Long abLast = lastAbortSeenSeqNumMap.get(arb);
1484 Transaction prevTrans = null;
1486 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
1487 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1488 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
1489 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1491 prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
1494 // Duplicate so delete old copy
1495 if (prevTrans != null) {
1496 prevTrans.setDead();
1500 private void processEntry(Abort entry) {
1501 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1502 // Abort has not been seen yet so we need to keep track of it
1504 Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1505 if (prevAbort != null) {
1506 prevAbort.setDead(); // delete old version of the duplicate
1509 if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1510 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1513 // The machine already saw this so it is dead
1517 // Update the status of the transaction and remove it since we are done with this transaction
1518 TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
1519 if (status != null) {
1520 status.setStatus(TransactionStatus.StatusAborted);
1524 private void processEntry(Commit entry) {
1525 Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
1527 if (arbMap == null) {
1528 arbMap = new HashMap<Long, Commit>();
1531 Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
1532 newCommitMap.put(entry.getTransArbitrator(), arbMap);
1534 if (prevCommit != null) {
1535 prevCommit.setDead();
1539 private void processEntry(TableStatus entry) {
1540 int newnumslots = entry.getMaxSlots();
1541 // updateCurrMaxSize(newnumslots);
1542 if (lastTableStatus != null)
1543 lastTableStatus.setDead();
1544 lastTableStatus = entry;
1546 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1547 smallestTableStatusSeen = newnumslots;
1550 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1551 largestTableStatusSeen = newnumslots;
1555 private void addWatchList(long machineid, RejectedMessage entry) {
1556 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1557 if (entries == null)
1558 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1562 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1563 machineSet.remove(machineid);
1565 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1566 if (watchset != null) {
1567 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1568 RejectedMessage rm = rmit.next();
1569 if (rm.getNewSeqNum() <= seqnum) {
1570 /* Remove it from our watchlist */
1572 /* Decrement machines that need to see this notification */
1573 rm.removeWatcher(machineid);
1578 if (machineid == localmachineid) {
1579 /* Our own messages are immediately dead. */
1580 if (liveness instanceof LastMessage) {
1581 ((LastMessage)liveness).setDead();
1582 } else if (liveness instanceof Slot) {
1583 ((Slot)liveness).setDead();
1585 throw new Error("Unrecognized type");
1589 // Set dead the abort
1590 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1591 Abort abort = i.next().getValue();
1593 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1599 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1600 if (lastmsgentry == null)
1603 long lastmsgseqnum = lastmsgentry.getFirst();
1604 Liveness lastentry = lastmsgentry.getSecond();
1605 if (machineid != localmachineid) {
1606 if (lastentry instanceof LastMessage) {
1607 ((LastMessage)lastentry).setDead();
1608 } else if (lastentry instanceof Slot) {
1609 ((Slot)lastentry).setDead();
1611 throw new Error("Unrecognized type");
1615 if (machineid == localmachineid) {
1616 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1617 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqnum + " got: " + lastmsgseqnum);
1619 if (lastmsgseqnum > seqnum)
1620 throw new Error("Server Error: Rollback on remote machine sequence number");
1624 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1625 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1626 for (Entry entry : slot.getEntries()) {
1627 switch (entry.getType()) {
1629 case Entry.TypeNewKey:
1630 processEntry((NewKey)entry);
1633 case Entry.TypeCommit:
1634 processEntry((Commit)entry);
1637 case Entry.TypeAbort:
1638 processEntry((Abort)entry);
1641 case Entry.TypeTransaction:
1642 processEntry((Transaction)entry);
1645 case Entry.TypeLastMessage:
1646 processEntry((LastMessage)entry, machineSet);
1649 case Entry.TypeRejectedMessage:
1650 processEntry((RejectedMessage)entry, indexer);
1653 case Entry.TypeTableStatus:
1654 processEntry((TableStatus)entry);
1658 throw new Error("Unrecognized type: " + entry.getType());
1663 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1664 for (int i = 0; i < newslots.length; i++) {
1665 Slot currslot = newslots[i];
1666 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1667 if (prevslot != null &&
1668 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1669 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);