Changed way Guard works, Sped up code
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index d42679d8add869edf9273074f6dd7b4e2f3965ae..4f0fe9551e1f817ed87f18dfe9dbb1ef6949e8f4 100644 (file)
@@ -39,7 +39,7 @@ final public class Table {
        private TableStatus lastTableStatus;
        static final int FREE_SLOTS = 10; //number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
-       private long liveslotcount = 0; 
+       private long liveslotcount = 0;
        private int chance;
        static final double RESIZE_MULTIPLE = 1.2;
        static final double RESIZE_THRESHOLD = 0.75;
@@ -49,12 +49,15 @@ final public class Table {
        private Random random = new Random();
        private long lastUncommittedTransaction = 0;
 
+       private int smallestTableStatusSeen = -1;
+       private int largestTableStatusSeen = -1;
+
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
        private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
        private Map<Long, Abort> abortMap = null; // Set of the live aborts
-       private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV   
-       private  Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV    
+       private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
+       private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
        private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
        private Map<Long, Transaction> uncommittedTransactionsMap = null;
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
@@ -103,12 +106,12 @@ final public class Table {
                lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
        }
 
-       public void rebuild() {
+       public void rebuild() throws ServerException {
                Slot[] newslots = cloud.getSlots(sequencenumber + 1);
                validateandupdate(newslots, true);
        }
 
-       // TODO: delete method
+       // // TODO: delete method
        // public void printSlots() {
        //      long o = buffer.getOldestSeqNum();
        //      long n = buffer.getNewestSeqNum();
@@ -166,11 +169,59 @@ final public class Table {
                }
        }
 
+       public IoTString getCommittedAtomic(IoTString key) {
+               KeyValue kv = commitedTable.get(key);
+
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match Arbitrator.");
+               }
+
+               if (kv != null) {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+                       return kv.getValue();
+               } else {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, null));
+                       return null;
+               }
+       }
+
+       public IoTString getSpeculativeAtomic(IoTString key) {
+
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match Arbitrator.");
+               }
+
+               KeyValue kv = speculativeTable.get(key);
+               if (kv == null) {
+                       kv = commitedTable.get(key);
+               }
+
+               if (kv != null) {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+                       return kv.getValue();
+               } else {
+                       pendingTransBuild.addKVGuard(new KeyValue(key, null));
+                       return null;
+               }
+       }
+
        public Long getArbitrator(IoTString key) {
                return arbitratorTable.get(key);
        }
 
-       public void initTable() {
+       public void initTable() throws ServerException {
                cloud.setSalt();//Set the salt
                Slot s = new Slot(this, 1, localmachineid);
                TableStatus status = new TableStatus(s, numslots);
@@ -204,7 +255,7 @@ final public class Table {
                pendingTransBuild = new PendingTransaction();
        }
 
-       public void commitTransaction() {
+       public void commitTransaction() throws ServerException {
 
                if (pendingTransBuild.getKVUpdates().size() == 0) {
                        // If no updates are made then there is no point inserting into the chain
@@ -214,6 +265,9 @@ final public class Table {
                // Add the pending transaction to the queue
                pendingTransQueue.add(pendingTransBuild);
 
+               // Delete since already inserted
+               pendingTransBuild = new PendingTransaction();
+
                while (!pendingTransQueue.isEmpty()) {
                        if (tryput( pendingTransQueue.peek(), false)) {
                                pendingTransQueue.poll();
@@ -230,85 +284,93 @@ final public class Table {
                // Make sure new key value pair matches the current arbitrator
                if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
                        // TODO: Maybe not throw en error
-                       throw new Error("Not all Key Values Match.");
+                       throw new Error("Not all Key Values Match Arbitrator.");
                }
 
                KeyValue kv = new KeyValue(key, value);
                pendingTransBuild.addKV(kv);
        }
 
-       public void addGuard(Guard guard) {
-               pendingTransBuild.addGuard(guard);
-       }
-
-       public void update() {
+       public void update()  throws ServerException {
 
                Slot[] newslots = cloud.getSlots(sequencenumber + 1);
 
                validateandupdate(newslots, false);
 
-               if (uncommittedTransactionsMap.keySet().size() > 0) {
 
-                       boolean doEnd = false;
-                       boolean needResize = false;
-                       while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
-                               boolean resize = needResize;
-                               needResize = false;
+               if (!pendingTransQueue.isEmpty()) {
+                       // We have a pending transaction so do full insertion
 
-                               Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
-                               int newsize = 0;
-                               if (liveslotcount > resizethreshold) {
-                                       resize = true; //Resize is forced
+                       while (!pendingTransQueue.isEmpty()) {
+                               if (tryput( pendingTransQueue.peek(), false)) {
+                                       pendingTransQueue.poll();
                                }
+                       }
+               } else {
+                       // We dont have a pending transaction so do minimal effort
+                       if (uncommittedTransactionsMap.keySet().size() > 0) {
+
+                               boolean doEnd = false;
+                               boolean needResize = false;
+                               while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
+                                       boolean resize = needResize;
+                                       needResize = false;
+
+                                       Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+                                       int newsize = 0;
+                                       if (liveslotcount > resizethreshold) {
+                                               resize = true; //Resize is forced
+                                       }
 
-                               if (resize) {
-                                       newsize = (int) (numslots * RESIZE_MULTIPLE);
-                                       TableStatus status = new TableStatus(s, newsize);
-                                       s.addEntry(status);
-                               }
+                                       if (resize) {
+                                               newsize = (int) (numslots * RESIZE_MULTIPLE);
+                                               TableStatus status = new TableStatus(s, newsize);
+                                               s.addEntry(status);
+                                       }
 
-                               doRejectedMessages(s);
+                                       doRejectedMessages(s);
 
-                               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+                                       ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
 
-                               // Resize was needed so redo call
-                               if (retTup.getFirst()) {
-                                       needResize = true;
-                                       continue;
-                               }
+                                       // Resize was needed so redo call
+                                       if (retTup.getFirst()) {
+                                               needResize = true;
+                                               continue;
+                                       }
 
-                               // Extract working variables
-                               boolean seenliveslot = retTup.getSecond();
-                               long seqn = retTup.getThird();
+                                       // Extract working variables
+                                       boolean seenliveslot = retTup.getSecond();
+                                       long seqn = retTup.getThird();
 
-                               // Did need to arbitrate
-                               doEnd = !doArbitration(s);
+                                       // Did need to arbitrate
+                                       doEnd = !doArbitration(s);
 
-                               doOptionalRescue(s, seenliveslot, seqn, resize);
+                                       doOptionalRescue(s, seenliveslot, seqn, resize);
 
-                               int max = 0;
-                               if (resize) {
-                                       max = newsize;
-                               }
+                                       int max = 0;
+                                       if (resize) {
+                                               max = newsize;
+                                       }
 
-                               Slot[] array = cloud.putSlot(s, max);
-                               if (array == null) {
-                                       array = new Slot[] {s};
-                                       rejectedmessagelist.clear();
-                               }       else {
-                                       if (array.length == 0)
-                                               throw new Error("Server Error: Did not send any slots");
-                                       rejectedmessagelist.add(s.getSequenceNumber());
-                                       doEnd = false;
-                               }
+                                       Slot[] array = cloud.putSlot(s, max);
+                                       if (array == null) {
+                                               array = new Slot[] {s};
+                                               rejectedmessagelist.clear();
+                                       }       else {
+                                               if (array.length == 0)
+                                                       throw new Error("Server Error: Did not send any slots");
+                                               rejectedmessagelist.add(s.getSequenceNumber());
+                                               doEnd = false;
+                                       }
 
-                               /* update data structure */
-                               validateandupdate(array, true);
+                                       /* update data structure */
+                                       validateandupdate(array, true);
+                               }
                        }
                }
        }
 
-       public boolean createNewKey(IoTString keyName, long machineId) {
+       public boolean createNewKey(IoTString keyName, long machineId)  throws ServerException {
 
                while (true) {
                        if (arbitratorTable.get(keyName) != null) {
@@ -332,7 +394,7 @@ final public class Table {
                resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
        }
 
-       private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
+       private boolean tryput(PendingTransaction pendingTrans, boolean resize)  throws ServerException {
                Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
 
                int newsize = 0;
@@ -359,7 +421,6 @@ final public class Table {
                boolean seenliveslot = retTup.getSecond();
                long seqn = retTup.getThird();
 
-
                doArbitration(s);
 
                Transaction trans = new Transaction(s,
@@ -367,7 +428,7 @@ final public class Table {
                                                    localmachineid,
                                                    pendingTrans.getArbitrator(),
                                                    pendingTrans.getKVUpdates(),
-                                                   pendingTrans.getGuard());
+                                                   pendingTrans.getKVGuard());
                boolean insertedTrans = false;
                if (s.hasSpace(trans)) {
                        s.addEntry(trans);
@@ -378,7 +439,7 @@ final public class Table {
                return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
        }
 
-       private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
+       private boolean tryput(IoTString keyName, long arbMachineid, boolean resize)  throws ServerException {
                Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
                int newsize = 0;
                if (liveslotcount > resizethreshold) {
@@ -403,7 +464,6 @@ final public class Table {
                boolean seenliveslot = retTup.getSecond();
                long seqn = retTup.getThird();
 
-
                doArbitration(s);
 
                NewKey newKey = new NewKey(s, keyName, arbMachineid);
@@ -498,21 +558,18 @@ final public class Table {
 
        private boolean doArbitration(Slot s) {
                // Arbitrate
-               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-
+               Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
                List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
 
                // Sort from oldest to newest
                Collections.sort(transSeqNums);
 
-
                boolean didNeedArbitration = false;
                for (Long transNum : transSeqNums) {
                        Transaction ut = uncommittedTransactionsMap.get(transNum);
 
-                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
                        // Check if this machine arbitrates for this transaction
-                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                       if (ut.getArbitrator() != localmachineid ) {
                                continue;
                        }
 
@@ -521,25 +578,21 @@ final public class Table {
 
                        Entry newEntry = null;
 
-                       try {
-                               if ( ut.getGuard().evaluate(speculativeTableTmp.values())) {
-                                       // Guard evaluated as true
+                       if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
+                               // Guard evaluated as true
 
-                                       // update the local tmp current key set
-                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-                                               speculativeTableTmp.put(kv.getKey(), kv);
-                                       }
+                               // update the local tmp current key set
+                               for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                       speculativeTableTmp.put(kv.getKey(), kv);
+                               }
 
-                                       // create the commit
-                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
-                               } else {
-                                       // Guard was false
+                               // create the commit
+                               newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
+                       } else {
+                               // Guard was false
 
-                                       // create the abort
-                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
-                               }
-                       } catch (Exception e) {
-                               e.printStackTrace();
+                               // create the abort
+                               newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
                        }
 
                        if ((newEntry != null) && s.hasSpace(newEntry)) {
@@ -581,7 +634,7 @@ final public class Table {
                }
        }
 
-       private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
+       private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
                int max = 0;
                if (resize)
                        max = newsize;
@@ -605,6 +658,10 @@ final public class Table {
                         before decoding */
                if (newslots.length == 0) return;
 
+               // Reset the table status declared sizes
+               smallestTableStatusSeen = -1;
+               largestTableStatusSeen = -1;
+
                long firstseqnum = newslots[0].getSequenceNumber();
                if (firstseqnum <= sequencenumber) {
                        throw new Error("Server Error: Sent older slots!");
@@ -615,14 +672,12 @@ final public class Table {
 
                HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
 
-               initExpectedSize(firstseqnum);
+               // initExpectedSize(firstseqnum);
                for (Slot slot : newslots) {
                        processSlot(indexer, slot, acceptupdatestolocal, machineSet);
-                       updateExpectedSize();
+                       // updateExpectedSize();
                }
 
-
-               boolean hasGap = false;
                /* If there is a gap, check to see if the server sent us everything. */
                if (firstseqnum != (sequencenumber + 1)) {
 
@@ -633,6 +688,7 @@ final public class Table {
                        }
                }
 
+
                commitNewMaxSize();
 
                /* Commit new to slots. */
@@ -766,38 +822,34 @@ final public class Table {
        private void createSpeculativeTable() {
 
                if (uncommittedTransactionsMap.keySet().size() == 0) {
-                       speculativeTable = commitedTable; // Ok that they are the same object
+                       //      speculativeTable = commitedTable; // Ok that they are the same object
                        return;
                }
 
-               Map speculativeTableTmp = null;
+               Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
                List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
 
                // Sort from oldest to newest commit
                Collections.sort(utSeqNums);
 
                if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
-                       speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+
+                       speculativeTable.clear();
+                       lastUncommittedTransaction = -1;
 
                        for (Long key : utSeqNums) {
                                Transaction trans = uncommittedTransactionsMap.get(key);
 
                                lastUncommittedTransaction = key;
 
-                               try {
-                                       if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
-                                               for (KeyValue kv : trans.getkeyValueUpdateSet()) {
-                                                       speculativeTableTmp.put(kv.getKey(), kv);
-                                               }
+                               if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
+                                       for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
                                        }
-
-                               } catch (Exception e) {
-                                       e.printStackTrace();
                                }
+
                        }
                } else {
-                       speculativeTableTmp = new HashMap<IoTString, KeyValue>(speculativeTable);
-
                        for (Long key : utSeqNums) {
 
                                if (key <= lastUncommittedTransaction) {
@@ -808,28 +860,41 @@ final public class Table {
 
                                Transaction trans = uncommittedTransactionsMap.get(key);
 
-                               try {
-                                       if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
-                                               for (KeyValue kv : trans.getkeyValueUpdateSet()) {
-                                                       speculativeTableTmp.put(kv.getKey(), kv);
-                                               }
+                               if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
+                                       for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
                                        }
-
-                               } catch (Exception e) {
-                                       e.printStackTrace();
                                }
                        }
                }
 
-               speculativeTable = speculativeTableTmp;
+               for (IoTString key : speculativeTableTmp.keySet()) {
+                       speculativeTable.put(key, speculativeTableTmp.get(key));
+               }
+
+               // speculativeTable = speculativeTableTmp;
        }
 
        private int expectedsize, currmaxsize;
 
        private void checkNumSlots(int numslots) {
-               if (numslots != expectedsize) {
-                       throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
+
+
+               // We only have 1 size so we must have this many slots
+               if (largestTableStatusSeen == smallestTableStatusSeen) {
+                       if (numslots != smallestTableStatusSeen) {
+                               throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numslots);
+                       }
+               } else {
+                       // We have more than 1
+                       if (numslots < smallestTableStatusSeen) {
+                               throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
+                       }
                }
+
+               // if (numslots != expectedsize) {
+               // throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
+               // }
        }
 
        private void initExpectedSize(long firstsequencenumber) {
@@ -841,6 +906,7 @@ final public class Table {
        private void updateExpectedSize() {
                expectedsize++;
                if (expectedsize > currmaxsize) {
+                       System.out.println("Maxing Out: " + expectedsize + "   " + currmaxsize);
                        expectedsize = currmaxsize;
                }
        }
@@ -850,6 +916,13 @@ final public class Table {
        }
 
        private void commitNewMaxSize() {
+
+               if (largestTableStatusSeen == -1) {
+                       currmaxsize = numslots;
+               } else {
+                       currmaxsize = largestTableStatusSeen;
+               }
+
                if (numslots != currmaxsize) {
                        buffer.resize(currmaxsize);
                }
@@ -908,7 +981,20 @@ final public class Table {
        }
 
        private void processEntry(Transaction entry) {
-               Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+
+               long arb = entry.getArbitrator();
+               Long comLast = lastCommitSeenSeqNumMap.get(arb);
+               Long abLast = lastAbortSeenSeqNumMap.get(arb);
+
+               Transaction prevTrans = null;
+
+               if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
+                       prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+               } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
+                       prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+               } else {
+                       prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+               }
 
                // Duplicate so delete old copy
                if (prevTrans != null) {
@@ -930,7 +1016,9 @@ final public class Table {
                        entry.setDead();
                }
 
-               lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+               if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
+                       lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+               }
        }
 
        private void processEntry(Commit entry, Slot s) {
@@ -942,10 +1030,20 @@ final public class Table {
 
        private void processEntry(TableStatus entry) {
                int newnumslots = entry.getMaxSlots();
-               updateCurrMaxSize(newnumslots);
+               // updateCurrMaxSize(newnumslots);
                if (lastTableStatus != null)
                        lastTableStatus.setDead();
                lastTableStatus = entry;
+
+               if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
+                       smallestTableStatusSeen = newnumslots;
+               }
+
+               if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
+                       largestTableStatusSeen = newnumslots;
+               }
+
+               // System.out.println("Table Stat: " + newnumslots + "   large: " + largestTableStatusSeen + "   small: " + smallestTableStatusSeen);
        }
 
        private void addWatchList(long machineid, RejectedMessage entry) {