API Changes
authorAli Younis <ayounis@uci.edu>
Tue, 3 Jan 2017 22:41:14 +0000 (14:41 -0800)
committerAli Younis <ayounis@uci.edu>
Tue, 3 Jan 2017 22:41:14 +0000 (14:41 -0800)
version2/src/java/iotcloud/Abort.java
version2/src/java/iotcloud/CloudComm.java
version2/src/java/iotcloud/Commit.java
version2/src/java/iotcloud/Entry.java
version2/src/java/iotcloud/LocalComm.java [new file with mode: 0644]
version2/src/java/iotcloud/PendingTransaction.java
version2/src/java/iotcloud/Table.java
version2/src/java/iotcloud/Test.java
version2/src/java/iotcloud/Transaction.java
version2/src/java/iotcloud/TransactionStatus.java [new file with mode: 0644]

index f27c787d1a8b60d9861b58f52f6c10371600ffdf..327ce336b9a68e358c0d8930616ef66c83c7d78c 100644 (file)
@@ -15,7 +15,7 @@ class Abort extends Entry {
        private long transarbitrator;
 
 
-       public Abort(Slot slot, long _seqnumtrans, long _machineid, long _transarbitrator) {
+       public Abort(Slot slot, long _seqnumtrans,  long _machineid, long _transarbitrator) {
                super(slot);
                seqnumtrans = _seqnumtrans;
                machineid = _machineid;
@@ -30,16 +30,16 @@ class Abort extends Entry {
                return seqnumtrans;
        }
 
+
        public long getTransArbitrator() {
                return transarbitrator;
        }
 
-
        static Entry decode(Slot slot, ByteBuffer bb) {
                long seqnumtrans = bb.getLong();
                long machineid = bb.getLong();
                long transarbitrator = bb.getLong();
-               return new Abort(slot, seqnumtrans, machineid, transarbitrator);
+               return new Abort(slot, seqnumtrans,  machineid, transarbitrator);
        }
 
        public void encode(ByteBuffer bb) {
index 5ee249d18776a144d5fbc3ee32df8d08b32e3565..d7a01d6384f58cb40c8b21544fb2a38b6b22fca0 100644 (file)
@@ -15,6 +15,7 @@ import java.security.SecureRandom;
 
 
 class CloudComm {
+       String hostname;
        String baseurl;
        Cipher encryptCipher;
        Cipher decryptCipher;
@@ -35,8 +36,9 @@ class CloudComm {
        /**
         * Constructor for actual use. Takes in the url and password.
         */
-       CloudComm(Table _table, String _baseurl, String _password) {
+       CloudComm(Table _table, String _hostname, String _baseurl, String _password) {
                this.table = _table;
+               this.hostname = _hostname;
                this.baseurl = _baseurl;
                this.password = _password;
                this.random = new SecureRandom();
@@ -222,6 +224,15 @@ class CloudComm {
                }
        }
 
+       public boolean hasConnection() {
+               try {
+                       InetAddress address = InetAddress.getByName(hostname);
+                       return address.isReachable(TIMEOUT_MILLIS);
+               } catch (Exception e) {
+                       return false;
+               }
+       }
+
        /**
         * Method that actually handles building Slot objects from the
         * server response.  Shared by both putSlot and getSlots.
@@ -244,4 +255,8 @@ class CloudComm {
                dis.close();
                return slots;
        }
+
+
+
+
 }
index deeb501c8ef47ed51192393914c66a270790e5db..fb52e6738e4854318bb9e578a6613e8f74855749 100644 (file)
@@ -14,14 +14,16 @@ import java.util.Iterator;
 
 class Commit extends Entry {
        private long seqnumtrans;
+       private long seqnumcommit;
        private long transarbitrator;
 
        private Set<KeyValue> keyValueUpdateSet = null;
 
 
-       public Commit(Slot slot, long _seqnumtrans, long _transarbitrator, Set<KeyValue> _keyValueUpdateSet) {
+       public Commit(Slot slot, long _seqnumtrans,  long _seqnumcommit, long _transarbitrator, Set<KeyValue> _keyValueUpdateSet) {
                super(slot);
                seqnumtrans = _seqnumtrans;
+               seqnumcommit = _seqnumcommit;
                transarbitrator = _transarbitrator;
 
                keyValueUpdateSet = new HashSet<KeyValue>();
@@ -35,6 +37,9 @@ class Commit extends Entry {
        public long getTransSequenceNumber() {
                return seqnumtrans;
        }
+       public long getSequenceNumber() {
+               return seqnumcommit;
+       }
 
        public long getTransArbitrator() {
                return transarbitrator;
@@ -49,7 +54,7 @@ class Commit extends Entry {
        }
 
        public int getSize() {
-               int size = 2 * Long.BYTES + Byte.BYTES; // seq id, entry type
+               int size = 3 * Long.BYTES + Byte.BYTES; // seq id, entry type
                size += Integer.BYTES; // number of KV's
 
                // Size of each KV
@@ -62,6 +67,7 @@ class Commit extends Entry {
 
        static Entry decode(Slot slot, ByteBuffer bb) {
                long seqnumtrans = bb.getLong();
+               long seqnumcommit = bb.getLong();
                long transarbitrator = bb.getLong();
                int numberOfKeys = bb.getInt();
 
@@ -71,12 +77,13 @@ class Commit extends Entry {
                        kvSet.add(kv);
                }
 
-               return new Commit(slot, seqnumtrans, transarbitrator, kvSet);
+               return new Commit(slot, seqnumtrans, seqnumcommit, transarbitrator, kvSet);
        }
 
        public void encode(ByteBuffer bb) {
                bb.put(Entry.TypeCommit);
                bb.putLong(seqnumtrans);
+               bb.putLong(seqnumcommit);
                bb.putLong(transarbitrator);
 
                bb.putInt(keyValueUpdateSet.size());
@@ -87,7 +94,7 @@ class Commit extends Entry {
        }
 
        public Entry getCopy(Slot s) {
-               return new Commit(s, seqnumtrans, transarbitrator, keyValueUpdateSet);
+               return new Commit(s, seqnumtrans, seqnumcommit, transarbitrator, keyValueUpdateSet);
        }
 
        public Set<KeyValue> updateLiveKeys(Set<KeyValue> kvSet) {
index c5a680783c0ba2408f6e918dfb6bc1175864721a..8395deca5ea297091af4a7e3afc088dcb5fba07f 100644 (file)
@@ -86,7 +86,10 @@ abstract class Entry implements Liveness {
                }
 
                islive = false;
-               parentslot.decrementLiveCount();
+
+               if (parentslot != null) {
+                       parentslot.decrementLiveCount();
+               }
        }
 
        /**
diff --git a/version2/src/java/iotcloud/LocalComm.java b/version2/src/java/iotcloud/LocalComm.java
new file mode 100644 (file)
index 0000000..d6d3491
--- /dev/null
@@ -0,0 +1,22 @@
+package iotcloud;
+
+class LocalComm {
+    private Table t1;
+    private Table t2;
+
+    public LocalComm(Table _t1, Table _t2) {
+        t1 = _t1;
+        t2 = _t2;
+    }
+
+    public byte[] sendDataToLocalDevice(Long deviceId, byte[] data) {
+        if (deviceId == t1.getId()) {
+            return t1.localCommInput(data);
+        } else if (deviceId == t2.getId()) {
+            return t2.localCommInput(data);
+        }
+        else {
+            throw new Error("Cannot send to " + deviceId + " using this local comm");
+        }
+    }
+}
\ No newline at end of file
index 578f1eb01212a5cdbce2a3d2c724ee384bcf4ec4..1a14674aa8442816cdbb0b16b45bfc3a112294ea 100644 (file)
@@ -13,6 +13,7 @@ class PendingTransaction {
     private Set<KeyValue> keyValueUpdateSet = null;
     private Set<KeyValue> keyValueGuardSet = null;
     private long arbitrator = -1;
+    private long machineLocalTransSeqNum = -1;
 
     public PendingTransaction() {
         keyValueUpdateSet = new HashSet<KeyValue>();
@@ -94,6 +95,14 @@ class PendingTransaction {
         return keyValueGuardSet;
     }
 
+    public void setMachineLocalTransSeqNum(long _machineLocalTransSeqNum) {
+        machineLocalTransSeqNum = _machineLocalTransSeqNum;
+    }
+
+    public long getMachineLocalTransSeqNum() {
+        return machineLocalTransSeqNum;
+    }
+
     public boolean evaluateGuard(Map<IoTString, KeyValue> keyValTableCommitted, Map<IoTString, KeyValue> keyValTableSpeculative, Map<IoTString, KeyValue> keyValTablePendingTransSpeculative) {
         for (KeyValue kvGuard : keyValueGuardSet) {
 
index 6aeb28e3628c57f611e56921717e83f924bad66a..be9defb91cef43d5921db27bd9f0ed3e9e3bb02e 100644 (file)
@@ -13,6 +13,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.Collection;
 import java.util.Collections;
+import java.nio.ByteBuffer;
 
 
 /**
@@ -24,9 +25,6 @@ import java.util.Collections;
 final public class Table {
        private int numslots;   //number of slots stored in buffer
 
-       //table of key-value pairs
-       //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
-
        // machine id -> (sequence number, Slot or LastMessage); records last message by each client
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
        // machine id -> ...
@@ -52,10 +50,12 @@ final public class Table {
        private int smallestTableStatusSeen = -1;
        private int largestTableStatusSeen = -1;
        private int lastSeenPendingTransactionSpeculateIndex = 0;
+       private int commitSequenceNumber = 0;
+       private long localTransactionSequenceNumber = 0;
 
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
-       private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
+       private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
        private Map<Long, Abort> abortMap = null; // Set of the live aborts
        private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
        private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
@@ -63,20 +63,24 @@ final public class Table {
        private Map<Long, Transaction> uncommittedTransactionsMap = null;
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
        private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
-       private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
+       private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
        private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
-       private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
+       private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
+       private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
        private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
+       private List<Commit> pendingCommitsList = null;
+       private List<Commit> pendingCommitsToDelete = null;
+       private Map<Long, LocalComm> localCommunicationChannels;
+       private Map<Long, TransactionStatus> transactionStatusMap = null;
 
 
-
-       public Table(String baseurl, String password, long _localmachineid) {
+       public Table(String hostname, String baseurl, String password, long _localmachineid) {
                localmachineid = _localmachineid;
                buffer = new SlotBuffer();
                numslots = buffer.capacity();
                setResizeThreshold();
                sequencenumber = 0;
-               cloud = new CloudComm(this, baseurl, password);
+               cloud = new CloudComm(this, hostname, baseurl, password);
                lastliveslotseqn = 1;
 
                setupDataStructs();
@@ -95,7 +99,7 @@ final public class Table {
 
        private void setupDataStructs() {
                pendingTransQueue = new LinkedList<PendingTransaction>();
-               commitMap = new HashMap<Long, Commit>();
+               commitMap = new HashMap<Long, Map<Long, Commit>>();
                abortMap = new HashMap<Long, Abort>();
                committedMapByKey = new HashMap<IoTString, Commit>();
                commitedTable = new HashMap<IoTString, KeyValue>();
@@ -103,10 +107,30 @@ final public class Table {
                uncommittedTransactionsMap = new HashMap<Long, Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
                newKeyTable = new HashMap<IoTString, NewKey>();
-               newCommitMap = new HashMap<Long, Commit>();
+               newCommitMap = new HashMap<Long, Map<Long, Commit>>();
                lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
+               lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
                lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
                pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
+               pendingCommitsList = new LinkedList<Commit>();
+               pendingCommitsToDelete = new LinkedList<Commit>();
+               localCommunicationChannels = new HashMap<Long, LocalComm>();
+               transactionStatusMap = new HashMap<Long, TransactionStatus>();
+       }
+
+       public void initTable() throws ServerException {
+               cloud.setSalt();//Set the salt
+               Slot s = new Slot(this, 1, localmachineid);
+               TableStatus status = new TableStatus(s, numslots);
+               s.addEntry(status);
+               Slot[] array = cloud.putSlot(s, numslots);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       /* update data structure */
+                       validateandupdate(array, true);
+               } else {
+                       throw new Error("Error on initialization");
+               }
        }
 
        public void rebuild() throws ServerException {
@@ -114,45 +138,112 @@ final public class Table {
                validateandupdate(newslots, true);
        }
 
-       // // TODO: delete method
-       // public void printSlots() {
-       //      long o = buffer.getOldestSeqNum();
-       //      long n = buffer.getNewestSeqNum();
-
-       //      int[] types = new int[10];
-
-       //      int num = 0;
-
-       //      int livec = 0;
-       //      int deadc = 0;
-       //      for (long i = o; i < (n + 1); i++) {
-       //              Slot s = buffer.getSlot(i);
-
-       //              Vector<Entry> entries = s.getEntries();
-
-       //              for (Entry e : entries) {
-       //                      if (e.isLive()) {
-       //                              int type = e.getType();
-       //                              types[type] = types[type] + 1;
-       //                              num++;
-       //                              livec++;
-       //                      } else {
-       //                              deadc++;
-       //                      }
-       //              }
-       //      }
-
-       //      for (int i = 0; i < 10; i++) {
-       //              System.out.println(i + "    " + types[i]);
-       //      }
-       //      System.out.println("Live count:   " + livec);
-       //      System.out.println("Dead count:   " + deadc);
-       //      System.out.println("Old:   " + o);
-       //      System.out.println("New:   " + n);
-       //      System.out.println("Size:   " + buffer.size());
-       //      System.out.println("Commits Map:   " + commitedTable.size());
-       //      System.out.println("Commits List:   " + commitMap.size());
-       // }
+       // TODO: delete method
+       public void printSlots() {
+               long o = buffer.getOldestSeqNum();
+               long n = buffer.getNewestSeqNum();
+
+               int[] types = new int[10];
+
+               int num = 0;
+
+               int livec = 0;
+               int deadc = 0;
+               for (long i = o; i < (n + 1); i++) {
+                       Slot s = buffer.getSlot(i);
+
+                       Vector<Entry> entries = s.getEntries();
+
+                       for (Entry e : entries) {
+                               if (e.isLive()) {
+                                       int type = e.getType();
+                                       types[type] = types[type] + 1;
+                                       num++;
+                                       livec++;
+                               } else {
+                                       deadc++;
+                               }
+                       }
+               }
+
+               for (int i = 0; i < 10; i++) {
+                       System.out.println(i + "    " + types[i]);
+               }
+               System.out.println("Live count:   " + livec);
+               System.out.println("Dead count:   " + deadc);
+               System.out.println("Old:   " + o);
+               System.out.println("New:   " + n);
+               System.out.println("Size:   " + buffer.size());
+               System.out.println("Commits Key Map:   " + commitedTable.size());
+               // System.out.println("Commits Live Map:   " + commitMap.size());
+               System.out.println("Pending:   " + pendingTransQueue.size());
+
+               // List<IoTString> strList = new ArrayList<IoTString>();
+               // for (int i = 0; i < 100; i++) {
+               //      String keyA = "a" + i;
+               //      String keyB = "b" + i;
+               //      String keyC = "c" + i;
+               //      String keyD = "d" + i;
+
+               //      IoTString iKeyA = new IoTString(keyA);
+               //      IoTString iKeyB = new IoTString(keyB);
+               //      IoTString iKeyC = new IoTString(keyC);
+               //      IoTString iKeyD = new IoTString(keyD);
+
+               //      strList.add(iKeyA);
+               //      strList.add(iKeyB);
+               //      strList.add(iKeyC);
+               //      strList.add(iKeyD);
+               // }
+
+
+               // for (Long l : commitMap.keySet()) {
+               //      for (Long l2 : commitMap.get(l).keySet()) {
+               //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
+               //                      strList.remove(kv.getKey());
+               //                      System.out.print(kv.getKey() + "    ");
+               //              }
+               //      }
+               // }
+
+               // System.out.println();
+               // System.out.println();
+
+               // for (IoTString s : strList) {
+               //      System.out.print(s + "    ");
+               // }
+               // System.out.println();
+               // System.out.println(strList.size());
+       }
+
+       public long getId() {
+               return localmachineid;
+       }
+
+       public boolean hasConnection() {
+               return cloud.hasConnection();
+       }
+
+       public String toString() {
+               String retString = " Committed Table: \n";
+               retString += "---------------------------\n";
+               retString += commitedTable.toString();
+
+               retString += "\n\n";
+
+               retString += " Speculative Table: \n";
+               retString += "---------------------------\n";
+               retString += speculativeTable.toString();
+
+               return retString;
+       }
+
+       public void addLocalComm(long machineId, LocalComm lc) {
+               localCommunicationChannels.put(machineId, lc);
+       }
+       public Long getArbitrator(IoTString key) {
+               return arbitratorTable.get(key);
+       }
 
        public IoTString getCommitted(IoTString key) {
                KeyValue kv = commitedTable.get(key);
@@ -234,37 +325,24 @@ final public class Table {
                }
        }
 
-       public Long getArbitrator(IoTString key) {
-               return arbitratorTable.get(key);
-       }
-
-       public void initTable() throws ServerException {
-               cloud.setSalt();//Set the salt
-               Slot s = new Slot(this, 1, localmachineid);
-               TableStatus status = new TableStatus(s, numslots);
-               s.addEntry(status);
-               Slot[] array = cloud.putSlot(s, numslots);
-               if (array == null) {
-                       array = new Slot[] {s};
-                       /* update data structure */
-                       validateandupdate(array, true);
-               } else {
-                       throw new Error("Error on initialization");
-               }
-       }
+       public void update() {
+               try {
+                       Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+                       validateandupdate(newslots, false);
 
-       public String toString() {
-               String retString = " Committed Table: \n";
-               retString += "---------------------------\n";
-               retString += commitedTable.toString();
+                       if (!pendingTransQueue.isEmpty()) {
 
-               retString += "\n\n";
+                               // We have a pending transaction so do full insertion
+                               processPendingTrans();
+                       } else {
 
-               retString += " Speculative Table: \n";
-               retString += "---------------------------\n";
-               retString += speculativeTable.toString();
+                               // We dont have a pending transaction so do minimal effort
+                               updateWithNotPendingTrans();
+                       }
 
-               return retString;
+               } catch (Exception e) {
+                       // could not update so do nothing
+               }
        }
 
        public void startTransaction() {
@@ -272,150 +350,372 @@ final public class Table {
                pendingTransBuild = new PendingTransaction();
        }
 
-       public void commitTransaction() throws ServerException {
+       public void addKV(IoTString key, IoTString value) {
+
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match Arbitrator.");
+               }
+
+               KeyValue kv = new KeyValue(key, value);
+               pendingTransBuild.addKV(kv);
+       }
+
+       public TransactionStatus commitTransaction() {
 
                if (pendingTransBuild.getKVUpdates().size() == 0) {
-                       // If no updates are made then there is no point inserting into the chain
-                       return;
+
+                       // transaction with no updates will have no effect on the system
+                       return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
                }
 
-               // Add the pending transaction to the queue
-               pendingTransQueue.add(pendingTransBuild);
+               TransactionStatus transStatus = null;
 
-               for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
-                       PendingTransaction pt = pendingTransQueue.get(i);
+               if (pendingTransBuild.getArbitrator() != localmachineid) {
 
-                       if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
+                       // set the local sequence number so we can recognize this transaction later
+                       pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
+                       localTransactionSequenceNumber++;
 
-                               lastSeenPendingTransactionSpeculateIndex = i;
+                       transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
+                       transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
 
-                               for (KeyValue kv : pt.getKVUpdates()) {
-                                       pendingTransSpeculativeTable.put(kv.getKey(), kv);
-                               }
+                       // Add the pending transaction to the queue
+                       pendingTransQueue.add(pendingTransBuild);
 
-                       }
-               }
 
+                       for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
+                               PendingTransaction pt = pendingTransQueue.get(i);
 
-               // Delete since already inserted
-               pendingTransBuild = new PendingTransaction();
+                               if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
 
-               while (!pendingTransQueue.isEmpty()) {
-                       if (tryput( pendingTransQueue.peek(), false)) {
-                               pendingTransQueue.poll();
+                                       lastSeenPendingTransactionSpeculateIndex = i;
+
+                                       for (KeyValue kv : pt.getKVUpdates()) {
+                                               pendingTransSpeculativeTable.put(kv.getKey(), kv);
+                                       }
+
+                               }
                        }
-               }
-       }
+               } else {
+                       Transaction ut = new Transaction(null,
+                                                        -1,
+                                                        localmachineid,
+                                                        pendingTransBuild.getArbitrator(),
+                                                        pendingTransBuild.getKVUpdates(),
+                                                        pendingTransBuild.getKVGuard());
 
-       public void addKV(IoTString key, IoTString value) {
+                       Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
 
-               if (arbitratorTable.get(key) == null) {
-                       throw new Error("Key not Found.");
+                       if (retData.getFirst()) {
+                               transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
+                       } else {
+                               transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
+                       }
                }
 
-               // Make sure new key value pair matches the current arbitrator
-               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
-                       // TODO: Maybe not throw en error
-                       throw new Error("Not all Key Values Match Arbitrator.");
+               // Try to insert transactions if possible
+               if (!pendingTransQueue.isEmpty()) {
+                       // We have a pending transaction so do full insertion
+                       processPendingTrans();
+               } else {
+                       try {
+                               // We dont have a pending transaction so do minimal effort
+                               updateWithNotPendingTrans();
+                       } catch (Exception e) {
+                               // Do nothing
+                       }
                }
 
-               KeyValue kv = new KeyValue(key, value);
-               pendingTransBuild.addKV(kv);
+               // reset it so next time is fresh
+               pendingTransBuild = new PendingTransaction();
+
+               return transStatus;
        }
 
-       public void update() throws ServerException {
-               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
-               validateandupdate(newslots, false);
+       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
 
-               if (!pendingTransQueue.isEmpty()) {
-                       System.out.println("Full Update");
+               while (true) {
+                       if (arbitratorTable.get(keyName) != null) {
+                               // There is already an arbitrator
+                               return false;
+                       }
 
-                       // We have a pending transaction so do full insertion
+                       if (tryput(keyName, machineId, false)) {
+                               // If successfully inserted
+                               return true;
+                       }
+               }
+       }
+
+       private void processPendingTrans() {
 
+               boolean sentAllPending = false;
+               try {
                        while (!pendingTransQueue.isEmpty()) {
                                if (tryput( pendingTransQueue.peek(), false)) {
                                        pendingTransQueue.poll();
                                }
                        }
-               } else {
-                       // We dont have a pending transaction so do minimal effort
-                       if (uncommittedTransactionsMap.keySet().size() > 0) {
-
-                               boolean doEnd = false;
-                               boolean needResize = false;
-                               while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
-                                       boolean resize = needResize;
-                                       needResize = false;
-
-                                       Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
-                                       int newsize = 0;
-                                       if (liveslotcount > resizethreshold) {
-                                               resize = true; //Resize is forced
-                                       }
 
-                                       if (resize) {
-                                               newsize = (int) (numslots * RESIZE_MULTIPLE);
-                                               TableStatus status = new TableStatus(s, newsize);
-                                               s.addEntry(status);
-                                       }
+                       // if got here then all pending transactions were sent
+                       sentAllPending = true;
+               } catch (Exception e) {
+                       // There was a connection error
+                       e.printStackTrace();
+                       sentAllPending = false;
+               }
 
-                                       doRejectedMessages(s);
 
-                                       ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+               if (!sentAllPending) {
 
-                                       // Resize was needed so redo call
-                                       if (retTup.getFirst()) {
-                                               needResize = true;
-                                               continue;
-                                       }
+                       for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
+                               PendingTransaction pt = i.next();
+                               LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
+                               if (lc == null) {
+                                       // Cant talk directly to arbitrator so cant do anything
+                                       continue;
+                               }
 
-                                       // Extract working variables
-                                       boolean seenliveslot = retTup.getSecond();
-                                       long seqn = retTup.getThird();
 
-                                       // Did need to arbitrate
-                                       doEnd = !doArbitration(s);
+                               Transaction ut = new Transaction(null,
+                                                                -1,
+                                                                localmachineid,
+                                                                pendingTransBuild.getArbitrator(),
+                                                                pendingTransBuild.getKVUpdates(),
+                                                                pendingTransBuild.getKVGuard());
 
-                                       doOptionalRescue(s, seenliveslot, seqn, resize);
 
-                                       int max = 0;
-                                       if (resize) {
-                                               max = newsize;
-                                       }
+                               Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
+
+                               for (Commit commit : retData.getSecond()) {
+                                       // Prepare to process the commit
+                                       processEntry(commit);
+                               }
+
+                               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+                               // Go through all uncommitted transactions and kill the ones that are dead
+                               deleteDeadUncommittedTransactions();
 
-                                       Slot[] array = cloud.putSlot(s, max);
-                                       if (array == null) {
-                                               array = new Slot[] {s};
-                                               rejectedmessagelist.clear();
-                                       }       else {
-                                               if (array.length == 0)
-                                                       throw new Error("Server Error: Did not send any slots");
-                                               rejectedmessagelist.add(s.getSequenceNumber());
-                                               doEnd = false;
+                               // Speculate on key value pairs
+                               didCommitOrSpeculate |= createSpeculativeTable();
+                               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+
+                               if (retData.getFirst()) {
+                                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       if (transStatus != null) {
+                                               transStatus.setStatus(TransactionStatus.StatusCommitted);
                                        }
 
-                                       /* update data structure */
-                                       validateandupdate(array, true);
+                               } else {
+                                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       if (transStatus != null) {
+                                               transStatus.setStatus(TransactionStatus.StatusAborted);
+                                       }
                                }
                        }
                }
        }
 
-       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
+       private void updateWithNotPendingTrans() throws ServerException {
 
-               while (true) {
-                       if (arbitratorTable.get(keyName) != null) {
-                               // There is already an arbitrator
-                               return false;
+               boolean doEnd = false;
+               boolean needResize = false;
+               while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
+                       boolean resize = needResize;
+                       needResize = false;
+
+                       Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+                       int newsize = 0;
+                       if (liveslotcount > resizethreshold) {
+                               resize = true; //Resize is forced
                        }
 
-                       if (tryput(keyName, machineId, false)) {
-                               // If successfully inserted
-                               return true;
+                       if (resize) {
+                               newsize = (int) (numslots * RESIZE_MULTIPLE);
+                               TableStatus status = new TableStatus(s, newsize);
+                               s.addEntry(status);
+                       }
+
+                       doRejectedMessages(s);
+
+                       ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+                       // Resize was needed so redo call
+                       if (retTup.getFirst()) {
+                               needResize = true;
+                               continue;
                        }
+
+                       // Extract working variables
+                       boolean seenliveslot = retTup.getSecond();
+                       long seqn = retTup.getThird();
+
+                       // Did need to arbitrate
+                       doEnd = !doArbitration(s);
+
+                       doOptionalRescue(s, seenliveslot, seqn, resize);
+
+                       int max = 0;
+                       if (resize) {
+                               max = newsize;
+                       }
+
+                       Slot[] array = cloud.putSlot(s, max);
+                       if (array == null) {
+                               array = new Slot[] {s};
+                               rejectedmessagelist.clear();
+
+                               // Delete pending commits that were sent to the cloud
+                               deletePendingCommits();
+
+                       }       else {
+                               if (array.length == 0)
+                                       throw new Error("Server Error: Did not send any slots");
+                               rejectedmessagelist.add(s.getSequenceNumber());
+                               doEnd = false;
+                       }
+
+                       /* update data structure */
+                       validateandupdate(array, true);
                }
        }
 
+       private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
+
+               // encode the request
+               byte[] array = new byte[Long.BYTES + ut.getSize()];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
+               if (lastSeenCommit != null) {
+                       bbEncode.putLong(lastSeenCommit);
+               } else {
+                       bbEncode.putLong(0);
+               }
+               ut.encode(bbEncode);
+
+               byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               boolean didCommit = bbDecode.get() == 1;
+               int numberOfCommites = bbDecode.getInt();
+
+               List<Commit> newCommits = new LinkedList<Commit>();
+               for (int i = 0; i < numberOfCommites; i++ ) {
+                       bbDecode.get();
+                       Commit com = (Commit)Commit.decode(null, bbDecode);
+                       newCommits.add(com);
+               }
+
+               return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
+       }
+
+       public byte[] localCommInput(byte[] data) {
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               long lastSeenCommit = bbDecode.getLong();
+               bbDecode.get();
+               Transaction ut = (Transaction)Transaction.decode(null, bbDecode);
+
+               // Do the local update and arbitrate
+               Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+
+               // Calculate the size of the response
+               int size = Byte.BYTES + Integer.BYTES;
+               for (Commit com : returnData.getSecond()) {
+                       size += com.getSize();
+               }
+
+               // encode the response
+               byte[] array = new byte[size];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               if (returnData.getFirst()) {
+                       bbEncode.put((byte)1);
+               } else {
+                       bbEncode.put((byte)0);
+               }
+               bbEncode.putInt(returnData.getSecond().size());
+
+               for (Commit com : returnData.getSecond()) {
+                       com.encode(bbEncode);
+               }
+
+               return bbEncode.array();
+       }
+
+       private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
+
+               if (ut.getArbitrator() != localmachineid) {
+                       // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+                       return null;
+               }
+
+               List<Commit> returnCommits = new ArrayList<Commit>();
+
+               if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
+                       // There is a commit that the other client has not seen yet
+
+                       Map<Long, Commit> cm = commitMap.get(localmachineid);
+                       if (cm != null) {
+
+                               List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
+                               Collections.sort(commitKeys);
+
+
+                               for (int i = (commitKeys.size() - 1); i >= 0; i--) {
+                                       Commit com = cm.get(commitKeys.get(i));
+
+                                       if (com.getSequenceNumber() <= lastCommitSeen) {
+                                               break;
+                                       }
+                                       returnCommits.add((Commit)com.getCopy(null));
+                               }
+                       }
+               }
+
+               if (!ut.evaluateGuard(commitedTable, null)) {
+                       // Guard evaluated as false so return only the commits that the other device has not seen yet
+                       return new Pair<Boolean, List<Commit>>(false, returnCommits);
+               }
+
+               // create the commit
+               Commit commit = new Commit(null,
+                                          -1,
+                                          commitSequenceNumber,
+                                          ut.getArbitrator(),
+                                          ut.getkeyValueUpdateSet());
+               commitSequenceNumber = commitSequenceNumber + 1;
+
+               // Add to the pending commits list
+               pendingCommitsList.add(commit);
+
+               // Add this commit so we can send it back
+               returnCommits.add(commit);
+
+               // Prepare to process the commit
+               processEntry(commit);
+
+               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+               // Go through all uncommitted transactions and kill the ones that are dead
+               deleteDeadUncommittedTransactions();
+
+               // Speculate on key value pairs
+               didCommitOrSpeculate |= createSpeculativeTable();
+               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+               return new Pair<Boolean, List<Commit>>(true, returnCommits);
+       }
+
        public void decrementLiveCount() {
                liveslotcount--;
        }
@@ -467,7 +767,23 @@ final public class Table {
                }
 
                doOptionalRescue(s, seenliveslot, seqn, resize);
-               return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
+               Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
+
+               if (sendRetData.getFirst()) {
+                       // update the status and change what the sequence number is for the
+                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum());
+                       transStatus.setStatus(TransactionStatus.StatusSent);
+                       transStatus.setSentTransaction();
+                       transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
+               }
+
+
+               if (sendRetData.getSecond().length != 0) {
+                       // insert into the local block chain
+                       validateandupdate(sendRetData.getSecond(), true);
+               }
+
+               return sendRetData.getFirst();
        }
 
        private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
@@ -506,7 +822,14 @@ final public class Table {
                }
 
                doOptionalRescue(s, seenliveslot, seqn, resize);
-               return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
+               Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
+
+               if (sendRetData.getSecond().length != 0) {
+                       // insert into the local block chain
+                       validateandupdate(sendRetData.getSecond(), true);
+               }
+
+               return sendRetData.getFirst();
        }
 
        private void doRejectedMessages(Slot s) {
@@ -588,6 +911,24 @@ final public class Table {
        }
 
        private boolean doArbitration(Slot s) {
+
+               // flag whether we have finished all arbitration
+               boolean stillHasArbitration = false;
+
+               pendingCommitsToDelete.clear();
+
+               // First add queue commits
+               for (Commit commit : pendingCommitsList) {
+                       if (s.hasSpace(commit)) {
+                               s.addEntry(commit);
+                               pendingCommitsToDelete.add(commit);
+                       } else {
+                               // Ran out of space so move on but still not done
+                               stillHasArbitration = true;
+                               return stillHasArbitration;
+                       }
+               }
+
                // Arbitrate
                Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
                List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
@@ -595,7 +936,6 @@ final public class Table {
                // Sort from oldest to newest
                Collections.sort(transSeqNums);
 
-               boolean didNeedArbitration = false;
                for (Long transNum : transSeqNums) {
                        Transaction ut = uncommittedTransactionsMap.get(transNum);
 
@@ -605,7 +945,7 @@ final public class Table {
                        }
 
                        // we did have something to arbitrate on
-                       didNeedArbitration = true;
+                       stillHasArbitration = true;
 
                        Entry newEntry = null;
 
@@ -618,12 +958,20 @@ final public class Table {
                                }
 
                                // create the commit
-                               newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
+                               newEntry = new Commit(s,
+                                                     ut.getSequenceNumber(),
+                                                     commitSequenceNumber,
+                                                     ut.getArbitrator(),
+                                                     ut.getkeyValueUpdateSet());
+                               commitSequenceNumber = commitSequenceNumber + 1;
                        } else {
                                // Guard was false
 
                                // create the abort
-                               newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
+                               newEntry = new Abort(s,
+                                                    ut.getSequenceNumber(),
+                                                    ut.getMachineID(),
+                                                    ut.getArbitrator());
                        }
 
                        if ((newEntry != null) && s.hasSpace(newEntry)) {
@@ -633,7 +981,14 @@ final public class Table {
                        }
                }
 
-               return didNeedArbitration;
+               return stillHasArbitration;
+       }
+
+       private void deletePendingCommits() {
+               for (Commit com : pendingCommitsToDelete) {
+                       pendingCommitsList.remove(com);
+               }
+               pendingCommitsToDelete.clear();
        }
 
        private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
@@ -665,7 +1020,7 @@ final public class Table {
                }
        }
 
-       private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
+       private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
                int max = 0;
                if (resize)
                        max = newsize;
@@ -674,6 +1029,9 @@ final public class Table {
                if (array == null) {
                        array = new Slot[] {s};
                        rejectedmessagelist.clear();
+
+                       // Delete pending commits that were sent to the cloud
+                       deletePendingCommits();
                }       else {
                        // if (array.length == 0)
                        // throw new Error("Server Error: Did not send any slots");
@@ -681,11 +1039,7 @@ final public class Table {
                        inserted = false;
                }
 
-               if (array.length != 0) {
-                       validateandupdate(array, true);
-               }
-
-               return inserted;
+               return new Pair<Boolean, Slot[]>(inserted, array);
        }
 
        private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
@@ -745,79 +1099,112 @@ final public class Table {
                createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
        }
 
-       public boolean proccessAllNewCommits() {
-
+       private boolean proccessAllNewCommits() {
                // Process only if there are commit
                if (newCommitMap.keySet().size() == 0) {
                        return false;
                }
+               boolean didProcessNewCommit = false;
 
-               List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
-
-               // Sort from oldest to newest commit
-               Collections.sort(commitSeqNums);
+               for (Long arb : newCommitMap.keySet()) {
 
-               boolean didProcessNewCommit = false;
+                       List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
 
-               // Go through each new commit one by one
-               for (Long entrySeqNum : commitSeqNums) {
-                       Commit entry = newCommitMap.get(entrySeqNum);
+                       // Sort from oldest to newest commit
+                       Collections.sort(commitSeqNums);
 
-                       long lastCommitSeenSeqNum = -1;
+                       // Go through each new commit one by one
+                       for (Long entrySeqNum : commitSeqNums) {
+                               Commit entry = newCommitMap.get(arb).get(entrySeqNum);
 
-                       if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
-                               lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
-                       }
+                               long lastCommitSeenSeqNum = -1;
+                               if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
+                                       lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
+                               }
 
-                       if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
+                               if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
+                                       Map<Long, Commit> cm = commitMap.get(arb);
+                                       if (cm == null) {
+                                               cm = new HashMap<Long, Commit>();
+                                       }
 
-                               Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
+                                       Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
+                                       commitMap.put(arb, cm);
 
-                               if (prevCommit != null) {
-                                       prevCommit.setDead();
+                                       if (prevCommit != null) {
+                                               prevCommit.setDead();
 
-                                       for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
-                                               committedMapByKey.put(kv.getKey(), entry);
+                                               for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
+                                                       committedMapByKey.put(kv.getKey(), entry);
+                                               }
                                        }
+
+                                       continue;
                                }
 
-                               continue;
-                       }
+                               Set<Commit> commitsToEditSet = new HashSet<Commit>();
 
-                       Set<Commit> commitsToEditSet = new HashSet<Commit>();
+                               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                                       commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
+                               }
 
-                       for (KeyValue kv : entry.getkeyValueUpdateSet()) {
-                               commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
-                       }
+                               commitsToEditSet.remove(null);
 
-                       commitsToEditSet.remove(null);
+                               for (Commit prevCommit : commitsToEditSet) {
 
-                       for (Commit prevCommit : commitsToEditSet) {
+                                       Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
 
-                               Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+                                       if (!prevCommit.isLive()) {
+                                               Map<Long, Commit> cm = commitMap.get(arb);
 
-                               if (!prevCommit.isLive()) {
-                                       commitMap.remove(prevCommit.getTransSequenceNumber());
+                                               // remove it from the map so that it can be set as dead
+                                               if (cm != null) {
+                                                       cm.remove(prevCommit.getSequenceNumber());
+                                                       commitMap.put(arb, cm);
+                                               }
+                                       }
                                }
-                       }
 
-                       // Add the new commit
-                       commitMap.put(entry.getTransSequenceNumber(), entry);
-                       lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
-                       didProcessNewCommit = true;
+                               // Add the new commit
+                               Map<Long, Commit> cm = commitMap.get(arb);
+                               if (cm == null) {
+                                       cm = new HashMap<Long, Commit>();
+                               }
+                               cm.put(entry.getSequenceNumber(), entry);
+                               commitMap.put(arb, cm);
+
+                               lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
+
+                               // set the trans sequence number if we are able to
+                               if (entry.getTransSequenceNumber() != -1) {
+                                       lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+                               }
 
-                       // Update the committed table list
-                       for (KeyValue kv : entry.getkeyValueUpdateSet()) {
-                               IoTString key = kv.getKey();
-                               commitedTable.put(key, kv);
+                               didProcessNewCommit = true;
 
-                               committedMapByKey.put(key, entry);
+                               // Update the committed table list
+                               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                                       IoTString key = kv.getKey();
+                                       commitedTable.put(key, kv);
+                                       committedMapByKey.put(key, entry);
+                               }
                        }
                }
-
                // Clear the new commits storage so we can use it later
                newCommitMap.clear();
 
+               // go through all saved transactions and update the status of those that can be updated
+               for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
+                       Map.Entry<Long, TransactionStatus> entry = i.next();
+                       long seqnum = entry.getKey();
+                       TransactionStatus status = entry.getValue();
+
+                       if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) {
+                               status.setStatus(TransactionStatus.StatusCommitted);
+                               i.remove();
+                       }
+               }
+
                return didProcessNewCommit;
        }
 
@@ -827,8 +1214,11 @@ final public class Table {
                        Transaction prevtrans = i.next().getValue();
                        long transArb = prevtrans.getArbitrator();
 
-                       if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
-                               (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
+                       Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
+                       Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
+
+                       if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
+                               ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
                                i.remove();
                                prevtrans.setDead();
                        }
@@ -942,7 +1332,6 @@ final public class Table {
        private void updateExpectedSize() {
                expectedsize++;
                if (expectedsize > currmaxsize) {
-                       System.out.println("Maxing Out: " + expectedsize + "   " + currmaxsize);
                        expectedsize = currmaxsize;
                }
        }
@@ -1019,7 +1408,7 @@ final public class Table {
        private void processEntry(Transaction entry) {
 
                long arb = entry.getArbitrator();
-               Long comLast = lastCommitSeenSeqNumMap.get(arb);
+               Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
                Long abLast = lastAbortSeenSeqNumMap.get(arb);
 
                Transaction prevTrans = null;
@@ -1039,7 +1428,6 @@ final public class Table {
        }
 
        private void processEntry(Abort entry) {
-
                if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
                        // Abort has not been seen yet so we need to keep track of it
 
@@ -1047,18 +1435,32 @@ final public class Table {
                        if (prevAbort != null) {
                                prevAbort.setDead(); // delete old version of the duplicate
                        }
+
+                       if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
+                               lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+                       }
                } else {
                        // The machine already saw this so it is dead
                        entry.setDead();
                }
 
-               if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
-                       lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+               // Update the status of the transaction and remove it since we are done with this transaction
+               TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
+               if (status != null) {
+                       status.setStatus(TransactionStatus.StatusAborted);
                }
        }
 
-       private void processEntry(Commit entry, Slot s) {
-               Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
+       private void processEntry(Commit entry) {
+               Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
+
+               if (arbMap == null) {
+                       arbMap = new HashMap<Long, Commit>();
+               }
+
+               Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
+               newCommitMap.put(entry.getTransArbitrator(), arbMap);
+
                if (prevCommit != null) {
                        prevCommit.setDead();
                }
@@ -1078,8 +1480,6 @@ final public class Table {
                if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
                        largestTableStatusSeen = newnumslots;
                }
-
-               // System.out.println("Table Stat: " + newnumslots + "   large: " + largestTableStatusSeen + "   small: " + smallestTableStatusSeen);
        }
 
        private void addWatchList(long machineid, RejectedMessage entry) {
@@ -1161,7 +1561,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeCommit:
-                               processEntry((Commit)entry, slot);
+                               processEntry((Commit)entry);
                                break;
 
                        case Entry.TypeAbort:
index cae1c398247bc7f78703070a45ddf205fd79a9f3..d2af002da0ec8bfb56c6eb418ba6057b421c596b 100644 (file)
@@ -1,5 +1,8 @@
 package iotcloud;
 
+import java.util.List;
+import java.util.ArrayList;
+
 /**
  * Test cases.
  * @author Brian Demsky
@@ -8,7 +11,7 @@ package iotcloud;
 
 public class Test {
 
-       public static final  int NUMBER_OF_TESTS = 15;
+       public static final  int NUMBER_OF_TESTS = 100;
 
        public static void main(String[] args)  throws ServerException {
                if (args[0].equals("2")) {
@@ -28,143 +31,35 @@ public class Test {
                } else if (args[0].equals("9")) {
                        test9();
                }
+               // else if (args[0].equals("10")) {
+               //      test10();
+               // }
        }
 
-       // static void test9()  throws ServerException {
-       //      // Setup the 2 clients
-       //      Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
-       //      t1.initTable();
-       //      Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
-       //      t2.update();
-
-
-       //      // Make the Keys
-       //      System.out.println("Setting up keys");
-       //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
-       //              String a = "a" + i;
-       //              String b = "b" + i;
-       //              String c = "c" + i;
-       //              String d = "d" + i;
-       //              IoTString ia = new IoTString(a);
-       //              IoTString ib = new IoTString(b);
-       //              IoTString ic = new IoTString(c);
-       //              IoTString id = new IoTString(d);
-       //              t1.createNewKey(ia, 321);
-       //              t1.createNewKey(ib, 351);
-       //              t2.createNewKey(ic, 321);
-       //              t2.createNewKey(id, 351);
-       //      }
-
-       //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
-       //              String a = "a" + i;
-       //              String b = "b" + i;
-       //              String c = "c" + i;
-       //              String d = "d" + i;
-       //              IoTString ia = new IoTString(a);
-       //              IoTString ib = new IoTString(b);
-       //              IoTString ic = new IoTString(c);
-       //              IoTString id = new IoTString(d);
-       //              t1.createNewKey(ia, 1000);
-       //              t1.createNewKey(ib, 1000);
-       //              t2.createNewKey(ic, 1000);
-       //              t2.createNewKey(id, 1000);
-       //      }
-
-       //      System.out.println("Updating Clients...");
-       //      t1.update();
-       //      t2.update();
-       //      t1.update();
-       //      t2.update();
 
-       //      boolean foundError = false;
-
-       //      System.out.println("Checking Key-Values...");
-       //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
-
-       //              String keyA = "a" + i;
-       //              String keyB = "b" + i;
-       //              String keyC = "c" + i;
-       //              String keyD = "d" + i;
-
-       //              IoTString iKeyA = new IoTString(keyA);
-       //              IoTString iKeyB = new IoTString(keyB);
-       //              IoTString iKeyC = new IoTString(keyC);
-       //              IoTString iKeyD = new IoTString(keyD);
-
-
-       //              Long testValA1 = t1.getArbitrator(iKeyA);
-       //              Long testValB1 = t1.getArbitrator(iKeyB);
-       //              Long testValC1 = t1.getArbitrator(iKeyC);
-       //              Long testValD1 = t1.getArbitrator(iKeyD);
-
-       //              Long testValA2 = t2.getArbitrator(iKeyA);
-       //              Long testValB2 = t2.getArbitrator(iKeyB);
-       //              Long testValC2 = t2.getArbitrator(iKeyC);
-       //              Long testValD2 = t2.getArbitrator(iKeyD);
-
-       //              if ((testValA1 == null) || (testValA1 != 321)) {
-       //                      System.out.println("Key-Value t1 incorrect: " + keyA + "    " + testValA1);
-       //                      foundError = true;
-       //              }
-
-       //              if ((testValB1 == null) || (testValB1 != 351)) {
-       //                      System.out.println("Key-Value t1 incorrect: " + keyB + "    " + testValB1);
-       //                      foundError = true;
-       //              }
-
-       //              if ((testValC1 == null) || (testValC1 != 321)) {
-       //                      System.out.println("Key-Value t1 incorrect: " + keyC + "    " + testValC1);
-       //                      foundError = true;
-       //              }
-
-       //              if ((testValD1 == null) || (testValD1 != 351)) {
-       //                      System.out.println("Key-Value t1 incorrect: " + keyD + "    " + testValD1);
-       //                      foundError = true;
-       //              }
-
-       //              if ((testValA2 == null) || (testValA2 != 321)) {
-       //                      System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
-       //                      foundError = true;
-       //              }
-
-       //              if ((testValB2 == null) || (testValB2 != 351)) {
-       //                      System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
-       //                      foundError = true;
-       //              }
-
-       //              if ((testValC2 == null) || (testValC2 != 321)) {
-       //                      System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
-       //                      foundError = true;
-       //              }
-
-       //              if ((testValD2 == null) || (testValD2 != 351)) {
-       //                      System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
-       //                      foundError = true;
-       //              }
-       //      }
-
-       //      if (foundError) {
-       //              System.out.println("Found Errors...");
-       //      } else {
-       //              System.out.println("No Errors Found...");
-       //      }
-       // }
-
-       // static void test8()  throws ServerException {
+       // static void test10() throws ServerException {
 
+       //      long startTime = 0;
+       //      long endTime = 0;
        //      boolean foundError = false;
 
        //      // Setup the 2 clients
-       //      Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+       //      Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
        //      t1.initTable();
-       //      Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+       //      Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
        //      t2.update();
 
-       //      // t1.rebuild();
-       //      // t2.rebuild();
+       //      if (t1.hasConnection()) {
+       //              System.out.println("Can see server");
+       //      }
+
+       //      LocalComm lc = new LocalComm(t1, t2);
+       //      t1.addLocalComm(t2.getId(), lc);
+       //      t2.addLocalComm(t1.getId(), lc);
 
        //      // Make the Keys
        //      System.out.println("Setting up keys");
+       //      startTime = System.currentTimeMillis();
        //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
        //              String a = "a" + i;
        //              String b = "b" + i;
@@ -179,92 +74,16 @@ public class Test {
        //              t2.createNewKey(ic, 321);
        //              t2.createNewKey(id, 351);
        //      }
+       //      endTime = System.currentTimeMillis();
+       //      System.out.println("Time Taken: " + (double)   ((endTime - startTime) / 1000.0)    );
+       //      System.out.println("Time Taken Per Key: " + (double)  (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4))   );
+       //      System.out.println();
 
 
        //      // Do Updates for the keys
        //      System.out.println("Setting Key-Values...");
-
-
-       //      String keyA0 = "a0";
-       //      String keyB0 = "b0";
-       //      String keyC0 = "c0";
-       //      String keyD0 = "d0";
-       //      String valueA0 = "a0";
-       //      String valueB0 = "b0";
-       //      String valueC0 = "c0";
-       //      String valueD0 = "d0";
-
-       //      IoTString iKeyA0 = new IoTString(keyA0);
-       //      IoTString iKeyB0 = new IoTString(keyB0);
-       //      IoTString iKeyC0 = new IoTString(keyC0);
-       //      IoTString iKeyD0 = new IoTString(keyD0);
-       //      IoTString iValueA0 = new IoTString(valueA0);
-       //      IoTString iValueB0 = new IoTString(valueB0);
-       //      IoTString iValueC0 = new IoTString(valueC0);
-       //      IoTString iValueD0 = new IoTString(valueD0);
-
-       //      t1.startTransaction();
-       //      t1.addKV( iKeyA0, iValueA0);
-       //      t1.commitTransaction();
-
-       //      t1.startTransaction();
-       //      t1.addKV(iKeyB0, iValueB0);
-       //      t1.commitTransaction();
-
-       //      t2.startTransaction();
-       //      t2.addKV(iKeyC0, iValueC0);
-       //      t2.commitTransaction();
-
-       //      t2.startTransaction();
-       //      t2.addKV(iKeyD0, iValueD0);
-       //      t2.commitTransaction();
-
-       //      for (int i = 1; i < NUMBER_OF_TESTS; i++) {
-       //              String keyB = "b" + i;
-       //              String valueB = "b" + i;
-       //              IoTString iKeyB = new IoTString(keyB);
-       //              IoTString iValueB = new IoTString(valueB);
-
-       //              String keyBOld = "b" + (i - 1);
-       //              String valueBOld = "b" + (i - 1);
-       //              IoTString iKeyBOld = new IoTString(keyBOld);
-       //              IoTString iValueBOld = new IoTString(valueBOld);
-
-
-       //              t1.startTransaction();
-       //              t1.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyBOld, iValueBOld, Guard.Equal))));
-       //              t1.addKV(iKeyB, iValueB);
-       //              t1.commitTransaction();
-       //      }
-
-       //      System.out.println("Checking Key-Values...");
-       //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
-
-       //              String keyB = "b" + i;
-       //              String valueB = "b" + i;
-
-       //              IoTString iKeyB = new IoTString(keyB);
-       //              IoTString iValueB = new IoTString(valueB);
-
-       //              IoTString testValB1 = t1.getSpeculative(iKeyB);
-
-       //              if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
-       //                      System.out.println("Key-Value t1 incorrect: " + keyB);
-       //                      foundError = true;
-
-       //              }
-       //      }
-
-
-       //      System.out.println("Updating Clients...");
-       //      t1.update();
-       //      t2.update();
-       //      t1.update();
-       //      t2.update();
-
-       //      System.out.println("Checking Key-Values...");
+       //      startTime = System.currentTimeMillis();
        //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
-
        //              String keyA = "a" + i;
        //              String keyB = "b" + i;
        //              String keyC = "c" + i;
@@ -283,271 +102,44 @@ public class Test {
        //              IoTString iValueC = new IoTString(valueC);
        //              IoTString iValueD = new IoTString(valueD);
 
-
-       //              IoTString testValA1 = t1.getCommitted(iKeyA);
-       //              IoTString testValB1 = t1.getCommitted(iKeyB);
-       //              IoTString testValC1 = t1.getCommitted(iKeyC);
-       //              IoTString testValD1 = t1.getCommitted(iKeyD);
-
-       //              IoTString testValA2 = t2.getCommitted(iKeyA);
-       //              IoTString testValB2 = t2.getCommitted(iKeyB);
-       //              IoTString testValC2 = t2.getCommitted(iKeyC);
-       //              IoTString testValD2 = t2.getCommitted(iKeyD);
-
-
-       //              if (i == 0) {
-       //                      if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyA);
-       //                              foundError = true;
-       //                      }
-
-       //                      if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyB);
-       //                              foundError = true;
-       //                      }
-
-       //                      if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyC);
-       //                              foundError = true;
-       //                      }
-
-       //                      if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyD);
-       //                              foundError = true;
-       //                      }
-
-
-       //                      if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
-       //                              foundError = true;
-       //                      }
-
-       //                      if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
-       //                              foundError = true;
-       //                      }
-
-       //                      if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
-       //                              foundError = true;
-       //                      }
-
-       //                      if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
-       //                              foundError = true;
-       //                      }
-       //              } else {
-       //                      if (testValA1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyA);
-       //                              foundError = true;
-       //                      }
-
-       //                      if (testValB1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyB);
-       //                              foundError = true;
-       //                      }
-
-       //                      if (testValC1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyC);
-       //                              foundError = true;
-       //                      }
-
-       //                      if (testValD1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyD);
-       //                              foundError = true;
-       //                      }
-
-       //                      if (testValA2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
-       //                              foundError = true;
+       //              while (true) {
+       //                      t1.startTransaction();
+       //                      t1.addKV(iKeyA, iValueA);
+       //                      if (t1.commitTransactionLocal()) {
+       //                              break;
        //                      }
+       //              }
 
-       //                      if (testValB2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
-       //                              foundError = true;
+       //              while (true) {
+       //                      t1.startTransaction();
+       //                      t1.addKV(iKeyB, iValueB);
+       //                      if (t1.commitTransactionLocal()) {
+       //                              break;
        //                      }
+       //              }
 
-       //                      if (testValC2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
-       //                              foundError = true;
+       //              while (true) {
+       //                      t2.startTransaction();
+       //                      t2.addKV(iKeyC, iValueC);
+       //                      if (t2.commitTransactionLocal()) {
+       //                              break;
        //                      }
+       //              }
 
-       //                      if (testValD2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
-       //                              foundError = true;
+       //              while (true) {
+       //                      t2.startTransaction();
+       //                      t2.addKV(iKeyD, iValueD);
+       //                      if (t2.commitTransactionLocal()) {
+       //                              break;
        //                      }
        //              }
        //      }
-
-       //      if (foundError) {
-       //              System.out.println("Found Errors...");
-       //      } else {
-       //              System.out.println("No Errors Found...");
-       //      }
-       // }
-
-       // static void test7()  throws ServerException {
-
-       //      long startTime = 0;
-       //      long endTime = 0;
-
-       //      // Setup the 2 clients
-       //      Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
-       //      t1.initTable();
-       //      Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
-       //      t2.update();
-
-       //      // t1.rebuild();
-       //      // t2.rebuild();
-
-       //      // Make the Keys
-       //      System.out.println("Setting up keys");
-       //      startTime = System.currentTimeMillis();
-       //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
-       //              String a = "a" + i;
-       //              String b = "b" + i;
-       //              String c = "c" + i;
-       //              String d = "d" + i;
-       //              IoTString ia = new IoTString(a);
-       //              IoTString ib = new IoTString(b);
-       //              IoTString ic = new IoTString(c);
-       //              IoTString id = new IoTString(d);
-       //              t1.createNewKey(ia, 321);
-       //              t1.createNewKey(ib, 351);
-       //              t2.createNewKey(ic, 321);
-       //              t2.createNewKey(id, 351);
-       //      }
        //      endTime = System.currentTimeMillis();
-
-       //      System.out.println("Time Taken: " + (double)   ((endTime - startTime) / 1000.0)    );
-       //      System.out.println("Time Taken Per Key: " + (double)  (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4))   );
-       //      System.out.println();
-
-
-       //      // Do Updates for the keys
-       //      System.out.println("Setting Key-Values...");
-       //      startTime = System.currentTimeMillis();
-
-
-
-       //      String keyA0 = "a0";
-       //      String keyB0 = "b0";
-       //      String keyC0 = "c0";
-       //      String keyD0 = "d0";
-       //      String valueA0 = "a0";
-       //      String valueB0 = "b0";
-       //      String valueC0 = "c0";
-       //      String valueD0 = "d0";
-
-       //      IoTString iKeyA0 = new IoTString(keyA0);
-       //      IoTString iKeyB0 = new IoTString(keyB0);
-       //      IoTString iKeyC0 = new IoTString(keyC0);
-       //      IoTString iKeyD0 = new IoTString(keyD0);
-       //      IoTString iValueA0 = new IoTString(valueA0);
-       //      IoTString iValueB0 = new IoTString(valueB0);
-       //      IoTString iValueC0 = new IoTString(valueC0);
-       //      IoTString iValueD0 = new IoTString(valueD0);
-
-       //      t1.startTransaction();
-       //      t1.addKV( iKeyA0, iValueA0);
-       //      t1.commitTransaction();
-
-       //      t1.startTransaction();
-       //      t1.addKV(iKeyB0, iValueB0);
-       //      t1.commitTransaction();
-
-       //      t2.startTransaction();
-       //      t2.addKV(iKeyC0, iValueC0);
-       //      t2.commitTransaction();
-
-       //      t2.startTransaction();
-       //      t2.addKV(iKeyD0, iValueD0);
-       //      t2.commitTransaction();
-
-       //      for (int i = 1; i < NUMBER_OF_TESTS; i++) {
-       //              String keyB = "b" + i;
-       //              String valueB = "b" + i;
-       //              IoTString iKeyB = new IoTString(keyB);
-       //              IoTString iValueB = new IoTString(valueB);
-
-       //              String keyBOld = "b" + (i - 1);
-       //              String valueBOld = "b" + (i - 2);
-       //              IoTString iKeyBOld = new IoTString(keyBOld);
-       //              IoTString iValueBOld = new IoTString(valueBOld);
-
-
-       //              t1.startTransaction();
-       //              t1.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyBOld, iValueBOld, Guard.Equal))));
-       //              t1.addKV(iKeyB, iValueB);
-       //              t1.commitTransaction();
-       //      }
-
-       //      for (int i = 1; i < NUMBER_OF_TESTS; i++) {
-       //              String keyC = "c" + i;
-       //              String valueC = "c" + i;
-       //              IoTString iKeyC = new IoTString(keyC);
-       //              IoTString iValueC = new IoTString(valueC);
-
-       //              String keyCOld = "c" + (i - 1);
-       //              String valueCOld = "c" + (i - 2);
-       //              IoTString iKeyCOld = new IoTString(keyCOld);
-       //              IoTString iValueCOld = new IoTString(valueCOld);
-
-
-       //              t2.startTransaction();
-       //              t2.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyCOld, iValueCOld, Guard.Equal))));
-       //              t2.addKV(iKeyC, iValueC);
-       //              t2.commitTransaction();
-       //      }
-
-       //      for (int i = 1; i < NUMBER_OF_TESTS; i++) {
-       //              String keyA = "a" + i;
-       //              String keyD = "d" + i;
-       //              String valueA = "a" + i;
-       //              String valueD = "d" + i;
-
-       //              IoTString iKeyA = new IoTString(keyA);
-       //              IoTString iKeyD = new IoTString(keyD);
-       //              IoTString iValueA = new IoTString(valueA);
-       //              IoTString iValueD = new IoTString(valueD);
-
-
-       //              String keyAOld = "a" + (i - 1);
-       //              String keyDOld = "d" + (i - 1);
-       //              String valueAOld = "a" + (i - 2);
-       //              String valueDOld = "d" + (i - 2);
-       //              IoTString iKeyAOld = new IoTString(keyAOld);
-       //              IoTString iKeyDOld = new IoTString(keyDOld);
-       //              IoTString iValueAOld = new IoTString(valueAOld);
-       //              IoTString iValueDOld = new IoTString(valueDOld);
-
-
-       //              t1.startTransaction();
-       //              t1.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyAOld, iValueAOld, Guard.Equal))));
-       //              t1.addKV(iKeyA, iValueA);
-       //              t1.commitTransaction();
-
-       //              t2.startTransaction();
-       //              t2.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyDOld, iValueDOld, Guard.Equal))));
-       //              t2.addKV(iKeyD, iValueD);
-       //              t2.commitTransaction();
-       //      }
-
-       //      endTime = System.currentTimeMillis();
-
        //      System.out.println("Time Taken: " + (double)   ((endTime - startTime) / 1000.0)    );
        //      System.out.println("Time Taken Per Update: " + (double)  (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4))   );
        //      System.out.println();
 
 
-       //      System.out.println("Updating Clients...");
-       //      t1.update();
-       //      t2.update();
-       //      t1.update();
-       //      t2.update();
-
-       //      boolean foundError = false;
-
        //      System.out.println("Checking Key-Values...");
        //      for (int i = 0; i < NUMBER_OF_TESTS; i++) {
 
@@ -580,96 +172,146 @@ public class Test {
        //              IoTString testValC2 = t2.getCommitted(iKeyC);
        //              IoTString testValD2 = t2.getCommitted(iKeyD);
 
+       //              if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
+       //                      System.out.println("Key-Value t1 incorrect: " + keyA);
+       //                      foundError = true;
+       //              }
 
-       //              if (i == 0) {
-       //                      if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyA);
-       //                              foundError = true;
-       //                      }
+       //              if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
+       //                      System.out.println("Key-Value t1 incorrect: " + keyB);
+       //                      foundError = true;
+       //              }
 
-       //                      if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyB);
-       //                              foundError = true;
-       //                      }
+       //              if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
+       //                      System.out.println("Key-Value t1 incorrect: " + keyC);
+       //                      foundError = true;
+       //              }
 
-       //                      if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyC);
-       //                              foundError = true;
-       //                      }
+       //              if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
+       //                      System.out.println("Key-Value t1 incorrect: " + keyD);
+       //                      foundError = true;
+       //              }
 
-       //                      if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyD);
-       //                              foundError = true;
-       //                      }
 
+       //              if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
+       //                      System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
+       //                      foundError = true;
+       //              }
 
-       //                      if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
-       //                              foundError = true;
-       //                      }
+       //              if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
+       //                      System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
+       //                      foundError = true;
+       //              }
 
-       //                      if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
-       //                              foundError = true;
-       //                      }
+       //              if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
+       //                      System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
+       //                      foundError = true;
+       //              }
 
-       //                      if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
-       //                              foundError = true;
-       //                      }
+       //              if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
+       //                      System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
+       //                      foundError = true;
+       //              }
+       //      }
 
-       //                      if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
-       //                              foundError = true;
-       //                      }
-       //              } else {
-       //                      if (testValA1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyA);
-       //                              foundError = true;
-       //                      }
 
-       //                      if (testValB1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyB);
-       //                              foundError = true;
-       //                      }
+       //      // System.out.println("Updating Clients...");
+       //      // t1.update();
+       //      // t2.update();
+       //      // t1.update();
+       //      // t2.update();
 
-       //                      if (testValC1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyC);
-       //                              foundError = true;
-       //                      }
 
-       //                      if (testValD1 != null) {
-       //                              System.out.println("Key-Value t1 incorrect: " + keyD);
-       //                              foundError = true;
-       //                      }
 
-       //                      if (testValA2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
-       //                              foundError = true;
-       //                      }
+       //      // System.out.println("Checking Key-Values...");
+       //      // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
 
-       //                      if (testValB2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
-       //                              foundError = true;
-       //                      }
+       //      //      String keyA = "a" + i;
+       //      //      String keyB = "b" + i;
+       //      //      String keyC = "c" + i;
+       //      //      String keyD = "d" + i;
+       //      //      String valueA = "a" + i;
+       //      //      String valueB = "b" + i;
+       //      //      String valueC = "c" + i;
+       //      //      String valueD = "d" + i;
 
-       //                      if (testValC2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
-       //                              foundError = true;
-       //                      }
+       //      //      IoTString iKeyA = new IoTString(keyA);
+       //      //      IoTString iKeyB = new IoTString(keyB);
+       //      //      IoTString iKeyC = new IoTString(keyC);
+       //      //      IoTString iKeyD = new IoTString(keyD);
+       //      //      IoTString iValueA = new IoTString(valueA);
+       //      //      IoTString iValueB = new IoTString(valueB);
+       //      //      IoTString iValueC = new IoTString(valueC);
+       //      //      IoTString iValueD = new IoTString(valueD);
+
+
+       //      //      IoTString testValA1 = t1.getCommitted(iKeyA);
+       //      //      IoTString testValB1 = t1.getCommitted(iKeyB);
+       //      //      IoTString testValC1 = t1.getCommitted(iKeyC);
+       //      //      IoTString testValD1 = t1.getCommitted(iKeyD);
+
+       //      //      IoTString testValA2 = t2.getCommitted(iKeyA);
+       //      //      IoTString testValB2 = t2.getCommitted(iKeyB);
+       //      //      IoTString testValC2 = t2.getCommitted(iKeyC);
+       //      //      IoTString testValD2 = t2.getCommitted(iKeyD);
+
+       //      //      if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
+       //      //              System.out.println("Key-Value t1 incorrect: " + keyA);
+       //      //              foundError = true;
+       //      //      }
+
+       //      //      if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
+       //      //              System.out.println("Key-Value t1 incorrect: " + keyB);
+       //      //              foundError = true;
+       //      //      }
+
+       //      //      if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
+       //      //              System.out.println("Key-Value t1 incorrect: " + keyC);
+       //      //              foundError = true;
+       //      //      }
+
+       //      //      if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
+       //      //              System.out.println("Key-Value t1 incorrect: " + keyD);
+       //      //              foundError = true;
+       //      //      }
+
+
+       //      //      if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
+       //      //              System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
+       //      //              foundError = true;
+       //      //      }
+
+       //      //      if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
+       //      //              System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
+       //      //              foundError = true;
+       //      //      }
+
+       //      //      if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
+       //      //              System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
+       //      //              foundError = true;
+       //      //      }
+
+       //      //      if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
+       //      //              System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
+       //      //              foundError = true;
+       //      //      }
+       //      // }
 
-       //                      if (testValD2 != null) {
-       //                              System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
-       //                              foundError = true;
-       //                      }
-       //              }
-       //      }
 
        //      if (foundError) {
        //              System.out.println("Found Errors...");
        //      } else {
        //              System.out.println("No Errors Found...");
        //      }
+
+
+       //      System.out.println();
+       //      System.out.println();
+       //      System.out.println();
+       //      t1.printSlots();
+       //      System.out.println();
+       //      System.out.println();
+       //      t2.printSlots();
        // }
 
        static void test9()  {
@@ -677,7 +319,7 @@ public class Test {
                boolean foundError = false;
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
 
                while (true) {
                        try {
@@ -686,7 +328,7 @@ public class Test {
                        } catch (Exception e) {}
                }
 
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
 
                while (true) {
                        try {
@@ -1019,7 +661,7 @@ public class Test {
 
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                while (true) {
                        try {
                                t1.initTable();
@@ -1027,7 +669,7 @@ public class Test {
                        } catch (Exception e) {}
                }
 
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                while (true) {
                        try {
                                t2.update();
@@ -1305,9 +947,9 @@ public class Test {
                boolean foundError = false;
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                t1.initTable();
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
 
                startTime = System.currentTimeMillis();
@@ -1537,9 +1179,9 @@ public class Test {
                long endTime = 0;
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                t1.initTable();
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
 
 
@@ -1723,9 +1365,9 @@ public class Test {
                boolean foundError = false;
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                t1.initTable();
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
 
 
@@ -1942,9 +1584,9 @@ public class Test {
                long endTime = 0;
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                t1.initTable();
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
 
 
@@ -2117,9 +1759,9 @@ public class Test {
                boolean foundError = false;
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                t1.initTable();
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
 
 
@@ -2296,11 +1938,13 @@ public class Test {
                long endTime = 0;
 
                // Setup the 2 clients
-               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                t1.initTable();
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
 
+               List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
                // Make the Keys
                System.out.println("Setting up keys");
                startTime = System.currentTimeMillis();
@@ -2319,9 +1963,6 @@ public class Test {
                        t2.createNewKey(id, 351);
                }
                endTime = System.currentTimeMillis();
-
-
-
                System.out.println("Time Taken: " + (double)   ((endTime - startTime) / 1000.0)    );
                System.out.println("Time Taken Per Key: " + (double)  (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4))   );
                System.out.println();
@@ -2351,22 +1992,21 @@ public class Test {
 
                        t1.startTransaction();
                        t1.addKV(iKeyA, iValueA);
-                       t1.commitTransaction();
+                       transStatusList.add(t1.commitTransaction());
 
                        t1.startTransaction();
                        t1.addKV(iKeyB, iValueB);
-                       t1.commitTransaction();
+                       transStatusList.add(t1.commitTransaction());
 
                        t2.startTransaction();
                        t2.addKV(iKeyC, iValueC);
-                       t2.commitTransaction();
+                       transStatusList.add(t2.commitTransaction());
 
                        t2.startTransaction();
                        t2.addKV(iKeyD, iValueD);
-                       t2.commitTransaction();
+                       transStatusList.add(t2.commitTransaction());
                }
                endTime = System.currentTimeMillis();
-
                System.out.println("Time Taken: " + (double)   ((endTime - startTime) / 1000.0)    );
                System.out.println("Time Taken Per Update: " + (double)  (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4))   );
                System.out.println();
@@ -2454,10 +2094,25 @@ public class Test {
                        }
                }
 
+               for (TransactionStatus status : transStatusList) {
+                       if (status.getStatus() != TransactionStatus.StatusCommitted) {
+                               foundError = true;
+                       }
+               }
+
                if (foundError) {
                        System.out.println("Found Errors...");
                } else {
                        System.out.println("No Errors Found...");
                }
+
+
+               System.out.println();
+               System.out.println();
+               System.out.println();
+               t1.printSlots();
+               System.out.println();
+               System.out.println();
+               t2.printSlots();
        }
 }
index 72aed0345b8ef0a47018b7635ab4c63121ae062c..90ef4d4fc385b269308cbc6d0716da9ece60ad96 100644 (file)
@@ -47,6 +47,7 @@ class Transaction extends Entry {
         return seqnum;
     }
 
+
     public Set<KeyValue> getkeyValueUpdateSet() {
         return keyValueUpdateSet;
     }
@@ -59,24 +60,26 @@ class Transaction extends Entry {
         for (KeyValue kvGuard : keyValueGuardSet) {
 
             // First check if the key is in the speculative table, this is the value of the latest assumption
-            KeyValue kv = keyValTableSpeculative.get(kvGuard.getKey());
+            KeyValue kv = null;
+
+            // If we have a speculation table then use it first
+            if (keyValTableSpeculative != null) {
+                kv = keyValTableSpeculative.get(kvGuard.getKey());
+            }
 
             if (kv == null) {
 
                 // if it is not in the speculative table then check the committed table and use that
                 // value as our latest assumption
                 kv = keyValTableCommitted.get(kvGuard.getKey());
-                // System.out.println("Replaced With Commit Table");
             }
 
             if (kvGuard.getValue() != null) {
                 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
-                    // System.out.println("Fail 1       " + (kv == null) + "   " + kvGuard.getValue() + "    " + kv.getValue());
                     return false;
                 }
             } else {
                 if (kv != null) {
-                    // System.out.println("Fail 2    " + kv.getValue());
                     return false;
                 }
             }
diff --git a/version2/src/java/iotcloud/TransactionStatus.java b/version2/src/java/iotcloud/TransactionStatus.java
new file mode 100644 (file)
index 0000000..a42f570
--- /dev/null
@@ -0,0 +1,53 @@
+package iotcloud;
+
+class TransactionStatus {
+    static final byte StatusAborted = 1;
+    static final byte StatusPending = 2;
+    static final byte StatusCommitted = 3;
+    // static final byte StatusRetrying = 4;
+    static final byte StatusSent = 5;
+    static final byte StatusNoEffect = 6;
+
+    private byte status = 0;
+    private boolean applicationReleased = false;
+    private long arbitrator = 0;
+    private boolean wasSentInChain = false;
+
+    public TransactionStatus(byte _status, long _arbitrator) {
+        status = _status;
+        arbitrator = _arbitrator;
+    }
+
+    public byte getStatus() {
+        return status;
+    }
+
+    public void setStatus(byte _status) {
+        status = _status;
+    }
+
+    public void setSentTransaction() {
+        wasSentInChain = true;
+    }
+
+    public boolean getSentTransaction() {
+        return wasSentInChain;
+    }
+
+
+    // public void setArbitrator(long _arbitrator) {
+    //     arbitrator = _arbitrator;
+    // }
+
+    public long getArbitrator() {
+        return arbitrator;
+    }
+
+    public void release() {
+        applicationReleased = true;
+    }
+
+    public boolean getReleased() {
+        return applicationReleased;
+    }
+}
\ No newline at end of file