Block Chain Transactions, Commits multiple parts version
[iotcloud.git] / version2 / backup / src / java / iotcloud / Table.java
diff --git a/version2/backup/src/java/iotcloud/Table.java b/version2/backup/src/java/iotcloud/Table.java
new file mode 100644 (file)
index 0000000..f6d699b
--- /dev/null
@@ -0,0 +1,1728 @@
+package iotcloud;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Vector;
+import java.util.Random;
+import java.util.Queue;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+
+
+/**
+ * IoTTable data structure.  Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+final public class Table {
+       private int numslots;   //number of slots stored in buffer
+
+       // machine id -> (sequence number, Slot or LastMessage); records last message by each client
+       private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
+       // machine id -> ...
+       private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+       private Vector<Long> rejectedmessagelist = new Vector<Long>();
+       private SlotBuffer buffer;
+       private CloudComm cloud;
+       private long sequencenumber; //Largest sequence number a client has received
+       private long localmachineid;
+       private TableStatus lastTableStatus;
+       static final int FREE_SLOTS = 10; //number of slots that should be kept free
+       static final int SKIP_THRESHOLD = 10;
+       private long liveslotcount = 0;
+       private int chance;
+       static final double RESIZE_MULTIPLE = 1.2;
+       static final double RESIZE_THRESHOLD = 0.75;
+       static final int REJECTED_THRESHOLD = 5;
+       private int resizethreshold;
+       private long lastliveslotseqn;  //smallest sequence number with a live entry
+       private Random random = new Random();
+       private long lastUncommittedTransaction = 0;
+
+       private int smallestTableStatusSeen = -1;
+       private int largestTableStatusSeen = -1;
+       private int lastSeenPendingTransactionSpeculateIndex = 0;
+       private int commitSequenceNumber = 0;
+       private long localTransactionSequenceNumber = 0;
+
+       private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
+       private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+       private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
+       private Map<Long, Abort> abortMap = null; // Set of the live aborts
+       private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
+       private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+       private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
+       private Map<Long, Transaction> uncommittedTransactionsMap = null;
+       private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+       private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
+       private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
+       private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
+       private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
+       private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
+       private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
+       private List<Commit> pendingCommitsList = null;
+       private List<Commit> pendingCommitsToDelete = null;
+       private Map<Long, LocalComm> localCommunicationChannels;
+       private Map<Long, TransactionStatus> transactionStatusMap = null;
+       private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
+
+       private Semaphore mutex = null;
+
+
+
+       public Table(String hostname, String baseurl, String password, long _localmachineid) {
+               localmachineid = _localmachineid;
+               buffer = new SlotBuffer();
+               numslots = buffer.capacity();
+               setResizeThreshold();
+               sequencenumber = 0;
+               cloud = new CloudComm(this, hostname, baseurl, password);
+               lastliveslotseqn = 1;
+
+               setupDataStructs();
+       }
+
+       public Table(CloudComm _cloud, long _localmachineid) {
+               localmachineid = _localmachineid;
+               buffer = new SlotBuffer();
+               numslots = buffer.capacity();
+               setResizeThreshold();
+               sequencenumber = 0;
+               cloud = _cloud;
+
+               setupDataStructs();
+       }
+
+       private void setupDataStructs() {
+               pendingTransQueue = new LinkedList<PendingTransaction>();
+               commitMap = new HashMap<Long, Map<Long, Commit>>();
+               abortMap = new HashMap<Long, Abort>();
+               committedMapByKey = new HashMap<IoTString, Commit>();
+               commitedTable = new HashMap<IoTString, KeyValue>();
+               speculativeTable = new HashMap<IoTString, KeyValue>();
+               uncommittedTransactionsMap = new HashMap<Long, Transaction>();
+               arbitratorTable = new HashMap<IoTString, Long>();
+               newKeyTable = new HashMap<IoTString, NewKey>();
+               newCommitMap = new HashMap<Long, Map<Long, Commit>>();
+               lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
+               lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
+               lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
+               pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
+               pendingCommitsList = new LinkedList<Commit>();
+               pendingCommitsToDelete = new LinkedList<Commit>();
+               localCommunicationChannels = new HashMap<Long, LocalComm>();
+               transactionStatusMap = new HashMap<Long, TransactionStatus>();
+               transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
+               mutex = new Semaphore(1);
+       }
+
+       public void initTable() throws ServerException {
+               cloud.setSalt();//Set the salt
+               Slot s = new Slot(this, 1, localmachineid);
+               TableStatus status = new TableStatus(s, numslots);
+               s.addEntry(status);
+               Slot[] array = cloud.putSlot(s, numslots);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       /* update data structure */
+                       validateandupdate(array, true);
+               } else {
+                       throw new Error("Error on initialization");
+               }
+       }
+
+       public void rebuild() throws ServerException, InterruptedException {
+               mutex.acquire();
+               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+               validateandupdate(newslots, true);
+               mutex.release();
+       }
+
+       // TODO: delete method
+       public void printSlots() {
+               long o = buffer.getOldestSeqNum();
+               long n = buffer.getNewestSeqNum();
+
+               int[] types = new int[10];
+
+               int num = 0;
+
+               int livec = 0;
+               int deadc = 0;
+               for (long i = o; i < (n + 1); i++) {
+                       Slot s = buffer.getSlot(i);
+
+                       Vector<Entry> entries = s.getEntries();
+
+                       for (Entry e : entries) {
+                               if (e.isLive()) {
+                                       int type = e.getType();
+                                       types[type] = types[type] + 1;
+                                       num++;
+                                       livec++;
+                               } else {
+                                       deadc++;
+                               }
+                       }
+               }
+
+               for (int i = 0; i < 10; i++) {
+                       System.out.println(i + "    " + types[i]);
+               }
+               System.out.println("Live count:   " + livec);
+               System.out.println("Dead count:   " + deadc);
+               System.out.println("Old:   " + o);
+               System.out.println("New:   " + n);
+               System.out.println("Size:   " + buffer.size());
+               System.out.println("Commits Key Map:   " + commitedTable.size());
+               // System.out.println("Commits Live Map:   " + commitMap.size());
+               System.out.println("Pending:   " + pendingTransQueue.size());
+
+               // List<IoTString> strList = new ArrayList<IoTString>();
+               // for (int i = 0; i < 100; i++) {
+               //      String keyA = "a" + i;
+               //      String keyB = "b" + i;
+               //      String keyC = "c" + i;
+               //      String keyD = "d" + i;
+
+               //      IoTString iKeyA = new IoTString(keyA);
+               //      IoTString iKeyB = new IoTString(keyB);
+               //      IoTString iKeyC = new IoTString(keyC);
+               //      IoTString iKeyD = new IoTString(keyD);
+
+               //      strList.add(iKeyA);
+               //      strList.add(iKeyB);
+               //      strList.add(iKeyC);
+               //      strList.add(iKeyD);
+               // }
+
+
+               // for (Long l : commitMap.keySet()) {
+               //      for (Long l2 : commitMap.get(l).keySet()) {
+               //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
+               //                      strList.remove(kv.getKey());
+               //                      System.out.print(kv.getKey() + "    ");
+               //              }
+               //      }
+               // }
+
+               // System.out.println();
+               // System.out.println();
+
+               // for (IoTString s : strList) {
+               //      System.out.print(s + "    ");
+               // }
+               // System.out.println();
+               // System.out.println(strList.size());
+       }
+
+       public long getId() {
+               return localmachineid;
+       }
+
+       public boolean hasConnection() {
+               return cloud.hasConnection();
+       }
+
+       public String toString() {
+               String retString = " Committed Table: \n";
+               retString += "---------------------------\n";
+               retString += commitedTable.toString();
+
+               retString += "\n\n";
+
+               retString += " Speculative Table: \n";
+               retString += "---------------------------\n";
+               retString += speculativeTable.toString();
+
+               return retString;
+       }
+
+       public void addLocalComm(long machineId, LocalComm lc) {
+               localCommunicationChannels.put(machineId, lc);
+       }
+       public Long getArbitrator(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+               Long arb = arbitratorTable.get(key);
+               mutex.release();
+
+               return arb;
+       }
+
+       public IoTString getCommitted(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+               KeyValue kv = commitedTable.get(key);
+               mutex.release();
+
+
+               if (kv != null) {
+                       return kv.getValue();
+               } else {
+                       return null;
+               }
+       }
+
+       public IoTString getSpeculative(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+
+               KeyValue kv = pendingTransSpeculativeTable.get(key);
+
+               if (kv == null) {
+                       kv = speculativeTable.get(key);
+               }
+
+               if (kv == null) {
+                       kv = commitedTable.get(key);
+               }
+               mutex.release();
+
+
+               if (kv != null) {
+                       return kv.getValue();
+               } else {
+                       return null;
+               }
+       }
+
+       public IoTString getCommittedAtomic(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+
+               KeyValue kv = commitedTable.get(key);
+
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match Arbitrator.");
+               }
+
+               mutex.release();
+
+               if (kv != null) {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+                       return kv.getValue();
+               } else {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, null));
+                       return null;
+               }
+       }
+
+       public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match Arbitrator.");
+               }
+
+               KeyValue kv = pendingTransSpeculativeTable.get(key);
+
+               if (kv == null) {
+                       kv = speculativeTable.get(key);
+               }
+
+               if (kv == null) {
+                       kv = commitedTable.get(key);
+               }
+
+               mutex.release();
+
+               if (kv != null) {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+                       return kv.getValue();
+               } else {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, null));
+                       return null;
+               }
+       }
+
+       public Pair<Boolean, Boolean> update() throws InterruptedException {
+
+               mutex.acquire();
+
+               boolean gotLatestFromServer = false;
+               boolean didSendLocal = false;
+
+               try {
+                       Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+                       validateandupdate(newslots, false);
+                       gotLatestFromServer = true;
+
+                       if (!pendingTransQueue.isEmpty()) {
+
+                               // We have a pending transaction so do full insertion
+                               processPendingTrans();
+                       } else {
+
+                               // We dont have a pending transaction so do minimal effort
+                               updateWithNotPendingTrans();
+                       }
+
+                       didSendLocal = true;
+
+
+               } catch (Exception e) {
+                       // could not update so do nothing
+               }
+
+
+               mutex.release();
+               return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
+       }
+
+       public Boolean updateFromLocal(long arb) throws InterruptedException {
+               LocalComm lc = localCommunicationChannels.get(arb);
+               if (lc == null) {
+                       // Cant talk directly to arbitrator so cant do anything
+                       return false;
+               }
+
+               byte[] array = new byte[Long.BYTES ];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
+               if (lastSeenCommit != null) {
+                       bbEncode.putLong(lastSeenCommit);
+               } else {
+                       bbEncode.putLong(0);
+               }
+
+               mutex.acquire();
+               byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               boolean didCommit = bbDecode.get() == 1;
+               int numberOfCommites = bbDecode.getInt();
+
+               List<Commit> newCommits = new LinkedList<Commit>();
+               for (int i = 0; i < numberOfCommites; i++ ) {
+                       bbDecode.get();
+                       Commit com = (Commit)Commit.decode(null, bbDecode);
+                       newCommits.add(com);
+               }
+
+               for (Commit commit : newCommits) {
+                       // Prepare to process the commit
+                       processEntry(commit);
+               }
+
+               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+               // Go through all uncommitted transactions and kill the ones that are dead
+               deleteDeadUncommittedTransactions();
+
+               // Speculate on key value pairs
+               didCommitOrSpeculate |= createSpeculativeTable();
+               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+
+               mutex.release();
+
+               return true;
+       }
+
+       public void startTransaction() throws InterruptedException {
+               // Create a new transaction, invalidates any old pending transactions.
+               pendingTransBuild = new PendingTransaction();
+       }
+
+       public void addKV(IoTString key, IoTString value) {
+
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match Arbitrator.");
+               }
+
+               KeyValue kv = new KeyValue(key, value);
+               pendingTransBuild.addKV(kv);
+       }
+
+       public TransactionStatus commitTransaction() throws InterruptedException {
+
+               if (pendingTransBuild.getKVUpdates().size() == 0) {
+
+                       // transaction with no updates will have no effect on the system
+                       return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
+               }
+
+               mutex.acquire();
+
+               TransactionStatus transStatus = null;
+
+               if (pendingTransBuild.getArbitrator() != localmachineid) {
+
+                       // set the local sequence number so we can recognize this transaction later
+                       pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
+                       localTransactionSequenceNumber++;
+
+                       transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
+                       transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
+
+                       // Add the pending transaction to the queue
+                       pendingTransQueue.add(pendingTransBuild);
+
+
+                       for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
+                               PendingTransaction pt = pendingTransQueue.get(i);
+
+                               if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
+
+                                       lastSeenPendingTransactionSpeculateIndex = i;
+
+                                       for (KeyValue kv : pt.getKVUpdates()) {
+                                               pendingTransSpeculativeTable.put(kv.getKey(), kv);
+                                       }
+
+                               }
+                       }
+               } else {
+                       Transaction ut = new Transaction(null,
+                                                        -1,
+                                                        localmachineid,
+                                                        pendingTransBuild.getArbitrator(),
+                                                        pendingTransBuild.getKVUpdates(),
+                                                        pendingTransBuild.getKVGuard());
+
+                       Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
+
+                       if (retData.getFirst()) {
+                               transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
+                       } else {
+                               transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
+                       }
+               }
+
+               // Try to insert transactions if possible
+               if (!pendingTransQueue.isEmpty()) {
+                       // We have a pending transaction so do full insertion
+                       processPendingTrans();
+               } else {
+                       try {
+                               // We dont have a pending transaction so do minimal effort
+                               updateWithNotPendingTrans();
+                       } catch (Exception e) {
+                               // Do nothing
+                       }
+               }
+
+               // reset it so next time is fresh
+               pendingTransBuild = new PendingTransaction();
+
+
+               mutex.release();
+               return transStatus;
+       }
+
+       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException {
+               try {
+                       mutex.acquire();
+
+                       while (true) {
+                               if (arbitratorTable.get(keyName) != null) {
+                                       // There is already an arbitrator
+                                       mutex.release();
+                                       return false;
+                               }
+
+                               if (tryput(keyName, machineId, false)) {
+                                       // If successfully inserted
+                                       mutex.release();
+                                       return true;
+                               }
+                       }
+               } catch (ServerException e) {
+                       mutex.release();
+                       throw e;
+               }
+       }
+
+       private void processPendingTrans() throws InterruptedException {
+
+               boolean sentAllPending = false;
+               try {
+                       while (!pendingTransQueue.isEmpty()) {
+                               if (tryput( pendingTransQueue.peek(), false)) {
+                                       pendingTransQueue.poll();
+                               }
+                       }
+
+                       // if got here then all pending transactions were sent
+                       sentAllPending = true;
+               } catch (Exception e) {
+                       // There was a connection error
+                       sentAllPending = false;
+               }
+
+               if (!sentAllPending) {
+
+                       for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
+                               PendingTransaction pt = i.next();
+                               LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
+                               if (lc == null) {
+                                       // Cant talk directly to arbitrator so cant do anything
+                                       continue;
+                               }
+
+
+                               Transaction ut = new Transaction(null,
+                                                                -1,
+                                                                localmachineid,
+                                                                pendingTransBuild.getArbitrator(),
+                                                                pendingTransBuild.getKVUpdates(),
+                                                                pendingTransBuild.getKVGuard());
+
+
+                               Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
+
+                               for (Commit commit : retData.getSecond()) {
+                                       // Prepare to process the commit
+                                       processEntry(commit);
+                               }
+
+                               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+                               // Go through all uncommitted transactions and kill the ones that are dead
+                               deleteDeadUncommittedTransactions();
+
+                               // Speculate on key value pairs
+                               didCommitOrSpeculate |= createSpeculativeTable();
+                               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+                               if (retData.getFirst()) {
+                                       TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       if (transStatus != null) {
+                                               transStatus.setStatus(TransactionStatus.StatusCommitted);
+                                       }
+
+                               } else {
+                                       TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       if (transStatus != null) {
+                                               transStatus.setStatus(TransactionStatus.StatusAborted);
+                                       }
+                               }
+                               i.remove();
+                       }
+               }
+       }
+
+       private void updateWithNotPendingTrans() throws ServerException, InterruptedException {
+               boolean doEnd = false;
+               boolean needResize = false;
+               while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
+                       boolean resize = needResize;
+                       needResize = false;
+
+                       Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+                       int newsize = 0;
+                       if (liveslotcount > resizethreshold) {
+                               resize = true; //Resize is forced
+                       }
+
+                       if (resize) {
+                               newsize = (int) (numslots * RESIZE_MULTIPLE);
+                               TableStatus status = new TableStatus(s, newsize);
+                               s.addEntry(status);
+                       }
+
+                       doRejectedMessages(s);
+
+                       ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+                       // Resize was needed so redo call
+                       if (retTup.getFirst()) {
+                               needResize = true;
+                               continue;
+                       }
+
+                       // Extract working variables
+                       boolean seenliveslot = retTup.getSecond();
+                       long seqn = retTup.getThird();
+
+                       // Did need to arbitrate
+                       doEnd = !doArbitration(s);
+
+                       doOptionalRescue(s, seenliveslot, seqn, resize);
+
+                       int max = 0;
+                       if (resize) {
+                               max = newsize;
+                       }
+
+                       Slot[] array = cloud.putSlot(s, max);
+                       if (array == null) {
+                               array = new Slot[] {s};
+                               rejectedmessagelist.clear();
+
+                               // Delete pending commits that were sent to the cloud
+                               deletePendingCommits();
+
+                       }       else {
+                               if (array.length == 0)
+                                       throw new Error("Server Error: Did not send any slots");
+                               rejectedmessagelist.add(s.getSequenceNumber());
+                               doEnd = false;
+                       }
+
+                       /* update data structure */
+                       validateandupdate(array, true);
+               }
+       }
+
+       private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException {
+
+               // encode the request
+               byte[] array = new byte[Long.BYTES + ut.getSize()];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
+               if (lastSeenCommit != null) {
+                       bbEncode.putLong(lastSeenCommit);
+               } else {
+                       bbEncode.putLong(0);
+               }
+               ut.encode(bbEncode);
+
+               byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               boolean didCommit = bbDecode.get() == 1;
+               int numberOfCommites = bbDecode.getInt();
+
+               List<Commit> newCommits = new LinkedList<Commit>();
+               for (int i = 0; i < numberOfCommites; i++ ) {
+                       bbDecode.get();
+                       Commit com = (Commit)Commit.decode(null, bbDecode);
+                       newCommits.add(com);
+               }
+
+               return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
+       }
+
+       public byte[] localCommInput(byte[] data) throws InterruptedException {
+
+
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               long lastSeenCommit = bbDecode.getLong();
+
+               Transaction ut = null;
+               if (data.length != Long.BYTES) {
+                       bbDecode.get();
+                       ut = (Transaction)Transaction.decode(null, bbDecode);
+               }
+
+               mutex.acquire();
+               // Do the local update and arbitrate
+               Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+               mutex.release();
+
+
+               // Calculate the size of the response
+               int size = Byte.BYTES + Integer.BYTES;
+               for (Commit com : returnData.getSecond()) {
+                       size += com.getSize();
+               }
+
+               // encode the response
+               byte[] array = new byte[size];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               if (returnData.getFirst()) {
+                       bbEncode.put((byte)1);
+               } else {
+                       bbEncode.put((byte)0);
+               }
+               bbEncode.putInt(returnData.getSecond().size());
+
+               for (Commit com : returnData.getSecond()) {
+                       com.encode(bbEncode);
+               }
+
+               return bbEncode.array();
+       }
+
+       private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
+
+               List<Commit> returnCommits = new ArrayList<Commit>();
+
+               if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
+                       // There is a commit that the other client has not seen yet
+
+                       Map<Long, Commit> cm = commitMap.get(localmachineid);
+                       if (cm != null) {
+
+                               List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
+                               Collections.sort(commitKeys);
+
+
+                               for (int i = (commitKeys.size() - 1); i >= 0; i--) {
+                                       Commit com = cm.get(commitKeys.get(i));
+
+                                       if (com.getSequenceNumber() <= lastCommitSeen) {
+                                               break;
+                                       }
+                                       returnCommits.add((Commit)com.getCopy(null));
+                               }
+                       }
+               }
+
+
+               if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
+                       // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+                       // or there is no transaction to process
+                       return new Pair<Boolean, List<Commit>>(false, returnCommits);
+               }
+
+               if (!ut.evaluateGuard(commitedTable, null)) {
+                       // Guard evaluated as false so return only the commits that the other device has not seen yet
+                       return new Pair<Boolean, List<Commit>>(false, returnCommits);
+               }
+
+               // create the commit
+               Commit commit = new Commit(null,
+                                          -1,
+                                          commitSequenceNumber,
+                                          ut.getArbitrator(),
+                                          ut.getkeyValueUpdateSet());
+               commitSequenceNumber = commitSequenceNumber + 1;
+
+               // Add to the pending commits list
+               pendingCommitsList.add(commit);
+
+               // Add this commit so we can send it back
+               returnCommits.add(commit);
+
+               // Prepare to process the commit
+               processEntry(commit);
+
+               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+               // Go through all uncommitted transactions and kill the ones that are dead
+               deleteDeadUncommittedTransactions();
+
+               // Speculate on key value pairs
+               didCommitOrSpeculate |= createSpeculativeTable();
+               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+               return new Pair<Boolean, List<Commit>>(true, returnCommits);
+       }
+
+       public void decrementLiveCount() {
+               liveslotcount--;
+       }
+
+       private void setResizeThreshold() {
+               int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
+               resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
+       }
+
+       private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
+               Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+
+               int newsize = 0;
+               if (liveslotcount > resizethreshold) {
+                       resize = true; //Resize is forced
+               }
+
+               if (resize) {
+                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+                       TableStatus status = new TableStatus(s, newsize);
+                       s.addEntry(status);
+               }
+
+               doRejectedMessages(s);
+
+               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+               // Resize was needed so redo call
+               if (retTup.getFirst()) {
+                       return tryput(pendingTrans, true);
+               }
+
+               // Extract working variables
+               boolean seenliveslot = retTup.getSecond();
+               long seqn = retTup.getThird();
+
+               doArbitration(s);
+
+               Transaction trans = new Transaction(s,
+                                                   s.getSequenceNumber(),
+                                                   localmachineid,
+                                                   pendingTrans.getArbitrator(),
+                                                   pendingTrans.getKVUpdates(),
+                                                   pendingTrans.getKVGuard());
+               boolean insertedTrans = false;
+               if (s.hasSpace(trans)) {
+                       s.addEntry(trans);
+                       insertedTrans = true;
+               }
+
+               doOptionalRescue(s, seenliveslot, seqn, resize);
+               Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
+
+               if (sendRetData.getFirst()) {
+                       // update the status and change what the sequence number is for the
+                       TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
+                       transStatus.setStatus(TransactionStatus.StatusSent);
+                       transStatus.setSentTransaction();
+                       transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
+               }
+
+
+               if (sendRetData.getSecond().length != 0) {
+                       // insert into the local block chain
+                       validateandupdate(sendRetData.getSecond(), true);
+               }
+
+               return sendRetData.getFirst();
+       }
+
+       private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
+               Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+               int newsize = 0;
+               if (liveslotcount > resizethreshold) {
+                       resize = true; //Resize is forced
+               }
+
+               if (resize) {
+                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+                       TableStatus status = new TableStatus(s, newsize);
+                       s.addEntry(status);
+               }
+
+               doRejectedMessages(s);
+               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+               // Resize was needed so redo call
+               if (retTup.getFirst()) {
+                       return tryput(keyName, arbMachineid, true);
+               }
+
+               // Extract working variables
+               boolean seenliveslot = retTup.getSecond();
+               long seqn = retTup.getThird();
+
+               doArbitration(s);
+
+               NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+               boolean insertedNewKey = false;
+               if (s.hasSpace(newKey)) {
+                       s.addEntry(newKey);
+                       insertedNewKey = true;
+               }
+
+               doOptionalRescue(s, seenliveslot, seqn, resize);
+               Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
+
+               if (sendRetData.getSecond().length != 0) {
+                       // insert into the local block chain
+                       validateandupdate(sendRetData.getSecond(), true);
+               }
+
+               return sendRetData.getFirst();
+       }
+
+       private void doRejectedMessages(Slot s) {
+               if (! rejectedmessagelist.isEmpty()) {
+                       /* TODO: We should avoid generating a rejected message entry if
+                        * there is already a sufficient entry in the queue (e.g.,
+                        * equalsto value of true and same sequence number).  */
+
+                       long old_seqn = rejectedmessagelist.firstElement();
+                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                               long new_seqn = rejectedmessagelist.lastElement();
+                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                               s.addEntry(rm);
+                       } else {
+                               long prev_seqn = -1;
+                               int i = 0;
+                               /* Go through list of missing messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       if (s_msg != null)
+                                               break;
+                                       prev_seqn = curr_seqn;
+                               }
+                               /* Generate rejected message entry for missing messages */
+                               if (prev_seqn != -1) {
+                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                       s.addEntry(rm);
+                               }
+                               /* Generate rejected message entries for present messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       long machineid = s_msg.getMachineID();
+                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       s.addEntry(rm);
+                               }
+                       }
+               }
+       }
+
+       private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
+               long newestseqnum = buffer.getNewestSeqNum();
+               long oldestseqnum = buffer.getOldestSeqNum();
+               if (lastliveslotseqn < oldestseqnum)
+                       lastliveslotseqn = oldestseqnum;
+
+               long seqn = lastliveslotseqn;
+               boolean seenliveslot = false;
+               long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
+               long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
+
+
+               // Mandatory Rescue
+               for (; seqn < threshold; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       // Push slot number forward
+                       if (! seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (! prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry)) {
+                                       s.addEntry(liveentry);
+                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+                                       if (!resize) {
+                                               System.out.println("B"); //?
+                                               return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
+                                       }
+                               }
+                       }
+               }
+
+               // Did not resize
+               return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
+       }
+
+       private boolean doArbitration(Slot s) {
+
+               // flag whether we have finished all arbitration
+               boolean stillHasArbitration = false;
+
+               pendingCommitsToDelete.clear();
+
+               // First add queue commits
+               for (Commit commit : pendingCommitsList) {
+                       if (s.hasSpace(commit)) {
+                               s.addEntry(commit);
+                               pendingCommitsToDelete.add(commit);
+                       } else {
+                               // Ran out of space so move on but still not done
+                               stillHasArbitration = true;
+                               return stillHasArbitration;
+                       }
+               }
+
+               // Arbitrate
+               Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
+               List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
+
+               // Sort from oldest to newest
+               Collections.sort(transSeqNums);
+
+               for (Long transNum : transSeqNums) {
+                       Transaction ut = uncommittedTransactionsMap.get(transNum);
+
+                       // Check if this machine arbitrates for this transaction
+                       if (ut.getArbitrator() != localmachineid ) {
+                               continue;
+                       }
+
+                       // we did have something to arbitrate on
+                       stillHasArbitration = true;
+
+                       Entry newEntry = null;
+
+                       if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
+                               // Guard evaluated as true
+
+                               // update the local tmp current key set
+                               for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                       speculativeTableTmp.put(kv.getKey(), kv);
+                               }
+
+                               // create the commit
+                               newEntry = new Commit(s,
+                                                     ut.getSequenceNumber(),
+                                                     commitSequenceNumber,
+                                                     ut.getArbitrator(),
+                                                     ut.getkeyValueUpdateSet());
+                               commitSequenceNumber = commitSequenceNumber + 1;
+                       } else {
+                               // Guard was false
+
+                               // create the abort
+                               newEntry = new Abort(s,
+                                                    ut.getSequenceNumber(),
+                                                    ut.getMachineID(),
+                                                    ut.getArbitrator());
+                       }
+
+                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                               s.addEntry(newEntry);
+                       } else {
+                               break;
+                       }
+               }
+
+               return stillHasArbitration;
+       }
+
+       private void deletePendingCommits() {
+               for (Commit com : pendingCommitsToDelete) {
+                       pendingCommitsList.remove(com);
+               }
+               pendingCommitsToDelete.clear();
+       }
+
+       private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
+               /* now go through live entries from least to greatest sequence number until
+                * either all live slots added, or the slot doesn't have enough room
+                * for SKIP_THRESHOLD consecutive entries*/
+               int skipcount = 0;
+               long newestseqnum = buffer.getNewestSeqNum();
+               search:
+               for (; seqn <= newestseqnum; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (!seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (!prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry))
+                                       s.addEntry(liveentry);
+                               else {
+                                       skipcount++;
+                                       if (skipcount > SKIP_THRESHOLD)
+                                               break search;
+                               }
+                       }
+               }
+       }
+
+       private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
+               int max = 0;
+               if (resize)
+                       max = newsize;
+
+               Slot[] array = cloud.putSlot(s, max);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       rejectedmessagelist.clear();
+
+                       // Delete pending commits that were sent to the cloud
+                       deletePendingCommits();
+               }       else {
+                       // if (array.length == 0)
+                       // throw new Error("Server Error: Did not send any slots");
+                       rejectedmessagelist.add(s.getSequenceNumber());
+                       inserted = false;
+               }
+
+               return new Pair<Boolean, Slot[]>(inserted, array);
+       }
+
+       private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+               /* The cloud communication layer has checked slot HMACs already
+                        before decoding */
+               if (newslots.length == 0) return;
+
+               // Reset the table status declared sizes
+               smallestTableStatusSeen = -1;
+               largestTableStatusSeen = -1;
+
+               long firstseqnum = newslots[0].getSequenceNumber();
+               if (firstseqnum <= sequencenumber) {
+                       throw new Error("Server Error: Sent older slots!");
+               }
+
+               SlotIndexer indexer = new SlotIndexer(newslots, buffer);
+               checkHMACChain(indexer, newslots);
+
+               HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
+
+               // initExpectedSize(firstseqnum);
+               for (Slot slot : newslots) {
+                       processSlot(indexer, slot, acceptupdatestolocal, machineSet);
+                       // updateExpectedSize();
+               }
+
+               /* If there is a gap, check to see if the server sent us everything. */
+               if (firstseqnum != (sequencenumber + 1)) {
+
+                       // TODO: Check size
+                       checkNumSlots(newslots.length);
+                       if (!machineSet.isEmpty()) {
+                               throw new Error("Missing record for machines: " + machineSet);
+                       }
+               }
+
+
+               commitNewMaxSize();
+
+               /* Commit new to slots. */
+               for (Slot slot : newslots) {
+                       buffer.putSlot(slot);
+                       liveslotcount++;
+               }
+               sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+
+               // Process all on key value pairs
+               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+               // Go through all uncommitted transactions and kill the ones that are dead
+               deleteDeadUncommittedTransactions();
+
+               // Speculate on key value pairs
+               didCommitOrSpeculate |= createSpeculativeTable();
+
+               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+       }
+
+       private boolean proccessAllNewCommits() {
+               // Process only if there are commit
+               if (newCommitMap.keySet().size() == 0) {
+                       return false;
+               }
+               boolean didProcessNewCommit = false;
+
+               for (Long arb : newCommitMap.keySet()) {
+
+                       List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
+
+                       // Sort from oldest to newest commit
+                       Collections.sort(commitSeqNums);
+
+                       // Go through each new commit one by one
+                       for (Long entrySeqNum : commitSeqNums) {
+                               Commit entry = newCommitMap.get(arb).get(entrySeqNum);
+
+                               long lastCommitSeenSeqNum = -1;
+                               if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
+                                       lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
+                               }
+
+                               if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
+                                       Map<Long, Commit> cm = commitMap.get(arb);
+                                       if (cm == null) {
+                                               cm = new HashMap<Long, Commit>();
+                                       }
+
+                                       Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
+                                       commitMap.put(arb, cm);
+
+                                       if (prevCommit != null) {
+                                               prevCommit.setDead();
+
+                                               for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
+                                                       committedMapByKey.put(kv.getKey(), entry);
+                                               }
+                                       }
+
+                                       continue;
+                               }
+
+                               Set<Commit> commitsToEditSet = new HashSet<Commit>();
+
+                               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                                       commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
+                               }
+
+                               commitsToEditSet.remove(null);
+
+                               for (Commit prevCommit : commitsToEditSet) {
+
+                                       Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                                       if (!prevCommit.isLive()) {
+                                               Map<Long, Commit> cm = commitMap.get(arb);
+
+                                               // remove it from the map so that it can be set as dead
+                                               if (cm != null) {
+                                                       cm.remove(prevCommit.getSequenceNumber());
+                                                       commitMap.put(arb, cm);
+                                               }
+                                       }
+                               }
+
+                               // Add the new commit
+                               Map<Long, Commit> cm = commitMap.get(arb);
+                               if (cm == null) {
+                                       cm = new HashMap<Long, Commit>();
+                               }
+                               cm.put(entry.getSequenceNumber(), entry);
+                               commitMap.put(arb, cm);
+
+                               lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
+
+                               // set the trans sequence number if we are able to
+                               if (entry.getTransSequenceNumber() != -1) {
+                                       lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+                               }
+
+                               didProcessNewCommit = true;
+
+                               // Update the committed table list
+                               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                                       IoTString key = kv.getKey();
+                                       commitedTable.put(key, kv);
+                                       committedMapByKey.put(key, entry);
+                               }
+                       }
+               }
+               // Clear the new commits storage so we can use it later
+               newCommitMap.clear();
+
+
+               // go through all saved transactions and update the status of those that can be updated
+               for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
+                       Map.Entry<Long, TransactionStatus> entry = i.next();
+                       long seqnum = entry.getKey();
+                       TransactionStatus status = entry.getValue();
+
+                       if (status.getSentTransaction()) {
+
+                               Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
+                               Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
+
+                               if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
+                                       ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
+                                       status.setStatus(TransactionStatus.StatusCommitted);
+                                       i.remove();
+                               }
+                       }
+               }
+
+               return didProcessNewCommit;
+       }
+
+       private void deleteDeadUncommittedTransactions() {
+               // Make dead the transactions
+               for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next().getValue();
+                       long transArb = prevtrans.getArbitrator();
+
+                       Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
+                       Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
+
+                       if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
+                               ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
+                               i.remove();
+                               prevtrans.setDead();
+                       }
+               }
+       }
+
+       private boolean createSpeculativeTable() {
+               if (uncommittedTransactionsMap.keySet().size() == 0) {
+                       return false;
+               }
+
+               Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
+               List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
+
+               // Sort from oldest to newest commit
+               Collections.sort(utSeqNums);
+
+               if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
+
+                       speculativeTable.clear();
+                       lastUncommittedTransaction = -1;
+
+                       for (Long key : utSeqNums) {
+                               Transaction trans = uncommittedTransactionsMap.get(key);
+
+                               lastUncommittedTransaction = key;
+
+                               if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
+                                       for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+                               }
+
+                       }
+               } else {
+                       for (Long key : utSeqNums) {
+
+                               if (key <= lastUncommittedTransaction) {
+                                       continue;
+                               }
+
+                               lastUncommittedTransaction = key;
+
+                               Transaction trans = uncommittedTransactionsMap.get(key);
+
+                               if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
+                                       for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+                               }
+                       }
+               }
+
+               for (IoTString key : speculativeTableTmp.keySet()) {
+                       speculativeTable.put(key, speculativeTableTmp.get(key));
+               }
+
+               return true;
+       }
+
+       private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
+
+               if (didCommitOrSpeculate) {
+                       pendingTransSpeculativeTable.clear();
+                       lastSeenPendingTransactionSpeculateIndex = 0;
+
+                       int index = 0;
+                       for (PendingTransaction pt : pendingTransQueue) {
+                               if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
+
+                                       lastSeenPendingTransactionSpeculateIndex = index;
+                                       index++;
+
+                                       for (KeyValue kv : pt.getKVUpdates()) {
+                                               pendingTransSpeculativeTable.put(kv.getKey(), kv);
+                                       }
+
+                               }
+                       }
+               }
+       }
+
+       private int expectedsize, currmaxsize;
+
+       private void checkNumSlots(int numslots) {
+
+
+               // We only have 1 size so we must have this many slots
+               if (largestTableStatusSeen == smallestTableStatusSeen) {
+                       if (numslots != smallestTableStatusSeen) {
+                               throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numslots);
+                       }
+               } else {
+                       // We have more than 1
+                       if (numslots < smallestTableStatusSeen) {
+                               throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
+                       }
+               }
+
+               // if (numslots != expectedsize) {
+               // throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
+               // }
+       }
+
+       private void initExpectedSize(long firstsequencenumber) {
+               long prevslots = firstsequencenumber;
+               expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
+               currmaxsize = numslots;
+       }
+
+       private void updateExpectedSize() {
+               expectedsize++;
+               if (expectedsize > currmaxsize) {
+                       expectedsize = currmaxsize;
+               }
+       }
+
+       private void updateCurrMaxSize(int newmaxsize) {
+               currmaxsize = newmaxsize;
+       }
+
+       private void commitNewMaxSize() {
+
+               if (largestTableStatusSeen == -1) {
+                       currmaxsize = numslots;
+               } else {
+                       currmaxsize = largestTableStatusSeen;
+               }
+
+               if (numslots != currmaxsize) {
+                       buffer.resize(currmaxsize);
+               }
+
+               numslots = currmaxsize;
+               setResizeThreshold();
+       }
+
+       private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
+       }
+
+       private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+               long oldseqnum = entry.getOldSeqNum();
+               long newseqnum = entry.getNewSeqNum();
+               boolean isequal = entry.getEqual();
+               long machineid = entry.getMachineID();
+               for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
+                       Slot slot = indexer.getSlot(seqnum);
+                       if (slot != null) {
+                               long slotmachineid = slot.getMachineID();
+                               if (isequal != (slotmachineid == machineid)) {
+                                       throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
+                               }
+                       }
+               }
+
+               HashSet<Long> watchset = new HashSet<Long>();
+               for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
+                       long entry_mid = lastmsg_entry.getKey();
+                       /* We've seen it, don't need to continue to watch.  Our next
+                        * message will implicitly acknowledge it. */
+                       if (entry_mid == localmachineid)
+                               continue;
+                       Pair<Long, Liveness> v = lastmsg_entry.getValue();
+                       long entry_seqn = v.getFirst();
+                       if (entry_seqn < newseqnum) {
+                               addWatchList(entry_mid, entry);
+                               watchset.add(entry_mid);
+                       }
+               }
+               if (watchset.isEmpty())
+                       entry.setDead();
+               else
+                       entry.setWatchSet(watchset);
+       }
+
+       private void processEntry(NewKey entry) {
+               arbitratorTable.put(entry.getKey(), entry.getMachineID());
+
+               NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
+
+               if (oldNewKey != null) {
+                       oldNewKey.setDead();
+               }
+       }
+
+       private void processEntry(Transaction entry) {
+
+               long arb = entry.getArbitrator();
+               Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
+               Long abLast = lastAbortSeenSeqNumMap.get(arb);
+
+               Transaction prevTrans = null;
+
+               if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
+                       prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+               } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
+                       prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+               } else {
+                       prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+               }
+
+               // Duplicate so delete old copy
+               if (prevTrans != null) {
+                       prevTrans.setDead();
+               }
+       }
+
+       private void processEntry(Abort entry) {
+               if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
+                       // Abort has not been seen yet so we need to keep track of it
+
+                       Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
+                       if (prevAbort != null) {
+                               prevAbort.setDead(); // delete old version of the duplicate
+                       }
+
+                       if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
+                               lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+                       }
+               } else {
+                       // The machine already saw this so it is dead
+                       entry.setDead();
+               }
+
+               // Update the status of the transaction and remove it since we are done with this transaction
+               TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
+               if (status != null) {
+                       status.setStatus(TransactionStatus.StatusAborted);
+               }
+       }
+
+       private void processEntry(Commit entry) {
+               Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
+
+               if (arbMap == null) {
+                       arbMap = new HashMap<Long, Commit>();
+               }
+
+               Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
+               newCommitMap.put(entry.getTransArbitrator(), arbMap);
+
+               if (prevCommit != null) {
+                       prevCommit.setDead();
+               }
+       }
+
+       private void processEntry(TableStatus entry) {
+               int newnumslots = entry.getMaxSlots();
+               // updateCurrMaxSize(newnumslots);
+               if (lastTableStatus != null)
+                       lastTableStatus.setDead();
+               lastTableStatus = entry;
+
+               if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
+                       smallestTableStatusSeen = newnumslots;
+               }
+
+               if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
+                       largestTableStatusSeen = newnumslots;
+               }
+       }
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries = watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
+               entries.add(entry);
+       }
+
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               machineSet.remove(machineid);
+
+               HashSet<RejectedMessage> watchset = watchlist.get(machineid);
+               if (watchset != null) {
+                       for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
+                               RejectedMessage rm = rmit.next();
+                               if (rm.getNewSeqNum() <= seqnum) {
+                                       /* Remove it from our watchlist */
+                                       rmit.remove();
+                                       /* Decrement machines that need to see this notification */
+                                       rm.removeWatcher(machineid);
+                               }
+                       }
+               }
+
+               if (machineid == localmachineid) {
+                       /* Our own messages are immediately dead. */
+                       if (liveness instanceof LastMessage) {
+                               ((LastMessage)liveness).setDead();
+                       } else if (liveness instanceof Slot) {
+                               ((Slot)liveness).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
+               }
+
+               // Set dead the abort
+               for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
+                       Abort abort = i.next().getValue();
+
+                       if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
+                               abort.setDead();
+                               i.remove();
+                       }
+               }
+
+               Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
+               if (lastmsgentry == null)
+                       return;
+
+               long lastmsgseqnum = lastmsgentry.getFirst();
+               Liveness lastentry = lastmsgentry.getSecond();
+               if (machineid != localmachineid) {
+                       if (lastentry instanceof LastMessage) {
+                               ((LastMessage)lastentry).setDead();
+                       } else if (lastentry instanceof Slot) {
+                               ((Slot)lastentry).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
+               }
+
+               if (machineid == localmachineid) {
+                       if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
+                               throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  seqnum + " got: " + lastmsgseqnum);
+               } else {
+                       if (lastmsgseqnum > seqnum)
+                               throw new Error("Server Error: Rollback on remote machine sequence number");
+               }
+       }
+
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
+               for (Entry entry : slot.getEntries()) {
+                       switch (entry.getType()) {
+
+                       case Entry.TypeNewKey:
+                               processEntry((NewKey)entry);
+                               break;
+
+                       case Entry.TypeCommit:
+                               processEntry((Commit)entry);
+                               break;
+
+                       case Entry.TypeAbort:
+                               processEntry((Abort)entry);
+                               break;
+
+                       case Entry.TypeTransaction:
+                               processEntry((Transaction)entry);
+                               break;
+
+                       case Entry.TypeLastMessage:
+                               processEntry((LastMessage)entry, machineSet);
+                               break;
+
+                       case Entry.TypeRejectedMessage:
+                               processEntry((RejectedMessage)entry, indexer);
+                               break;
+
+                       case Entry.TypeTableStatus:
+                               processEntry((TableStatus)entry);
+                               break;
+
+                       default:
+                               throw new Error("Unrecognized type: " + entry.getType());
+                       }
+               }
+       }
+
+       private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+               for (int i = 0; i < newslots.length; i++) {
+                       Slot currslot = newslots[i];
+                       Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
+                       if (prevslot != null &&
+                               !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
+                               throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
+               }
+       }
+}
\ No newline at end of file