Fixing Bugs
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index b0b4c1a7da1788e92154aab4d09071ded3178b38..99dc9bac945d02aea2dac5387eb8886542b73015 100644 (file)
@@ -47,14 +47,13 @@ final public class Table {
        public int resizethreshold;     // TODO:  MAKE PRIVATE
        private long lastliveslotseqn;  //smallest sequence number with a live entry
        private Random random = new Random();
-       private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen
+       private long lastUncommittedTransaction = 0;
 
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
-       private List<Commit> commitList = null; // List of all the most recent live commits
-       private List<Long> commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers
-
-       private Set<Abort> abortSet = null; // Set of the live aborts
+       private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
+       private Map<Long, Abort> abortMap = null; // Set of the live aborts
+       private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV       TODO: Make Private
        public  Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV       TODO: Make Private
        private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
        public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
@@ -62,6 +61,8 @@ final public class Table {
        private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
        // private Set<Abort> arbitratorTable = null; // Table of arbitrators
        private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
+       private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
+       private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
 
 
 
@@ -90,14 +91,17 @@ final public class Table {
 
        private void setupDataStructs() {
                pendingTransQueue = new LinkedList<PendingTransaction>();
-               commitList = new LinkedList<Commit>();
-               abortSet = new HashSet<Abort>();
+               commitMap = new HashMap<Long, Commit>();
+               abortMap = new HashMap<Long, Abort>();
+               committedMapByKey = new HashMap<IoTString, Commit>();
                commitedTable = new HashMap<IoTString, KeyValue>();
                speculativeTable = new HashMap<IoTString, KeyValue>();
                uncommittedTransactionsMap = new HashMap<Long, Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
                newKeyTable = new HashMap<IoTString, NewKey>();
-               newCommitMap = new HashMap<Long, Commit> ();
+               newCommitMap = new HashMap<Long, Commit>();
+               lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
+               lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
        }
 
        public void rebuild() {
@@ -142,7 +146,7 @@ final public class Table {
                System.out.println("New:   " + n);
                System.out.println("Size:   " + buffer.size());
                System.out.println("Commits Map:   " + commitedTable.size());
-               System.out.println("Commits List:   " + commitList.size());
+               System.out.println("Commits List:   " + commitMap.size());
        }
 
        public IoTString getCommitted(IoTString key) {
@@ -333,23 +337,16 @@ final public class Table {
                int newsize = 0;
                if (liveslotcount > resizethreshold) {
                        resize = true; //Resize is forced
-                       System.out.println("Live count resize: " + liveslotcount + "   " + resizethreshold);
-
                }
 
                if (resize) {
                        newsize = (int) (numslots * RESIZE_MULTIPLE);
-
-                       System.out.println("New Size:  " + newsize + "  old: " + buffer.oldestseqn); // TODO remove
-
                        TableStatus status = new TableStatus(s, newsize);
                        s.addEntry(status);
                }
 
                doRejectedMessages(s);
 
-
-
                ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
 
                // Resize was needed so redo call
@@ -361,11 +358,13 @@ final public class Table {
                boolean seenliveslot = retTup.getSecond();
                long seqn = retTup.getThird();
 
+
                doArbitration(s);
 
                Transaction trans = new Transaction(s,
                                                    s.getSequenceNumber(),
                                                    localmachineid,
+                                                   pendingTrans.getArbitrator(),
                                                    pendingTrans.getKVUpdates(),
                                                    pendingTrans.getGuard());
                boolean insertedTrans = false;
@@ -375,13 +374,7 @@ final public class Table {
                }
 
                doOptionalRescue(s, seenliveslot, seqn, resize);
-               insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
-
-               if (insertedTrans) {
-                       // System.out.println("Inserted: " + trans.getSequenceNumber());
-               }
-
-               return insertedTrans;
+               return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
        }
 
        private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
@@ -493,6 +486,7 @@ final public class Table {
                                        if (!resize) {
                                                System.out.println("B"); //?
 
+                                               // TODO delete
                                                System.out.println("==============================NEEEEDDDD RESIZING");
                                                return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
                                        }
@@ -521,8 +515,19 @@ final public class Table {
                        KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
                        // Check if this machine arbitrates for this transaction
                        if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+
+                               // TODO delete
+                               // if (localmachineid == 351) {
+                               //      System.out.println("Mis match Machine: " + localmachineid + "   Key: " + keyVal.getKey().toString());
+                               // }
                                continue;
                        }
+                       // else {
+                       //      // TODO delete
+                       //      if (localmachineid == 351) {
+                       //              System.out.println("Full Match Machine: " + localmachineid + "   Key: " + keyVal.getKey().toString());
+                       //      }
+                       // }
 
                        // we did have something to arbitrate on
                        didNeedArbitration = true;
@@ -530,7 +535,7 @@ final public class Table {
                        Entry newEntry = null;
 
                        try {
-                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                               if ( ut.getGuard().evaluate(speculativeTableTmp.values())) {
                                        // Guard evaluated as true
 
                                        // update the local tmp current key set
@@ -539,12 +544,12 @@ final public class Table {
                                        }
 
                                        // create the commit
-                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
                                } else {
                                        // Guard was false
 
                                        // create the abort
-                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
                                }
                        } catch (Exception e) {
                                e.printStackTrace();
@@ -604,8 +609,17 @@ final public class Table {
                        inserted = false;
                }
 
+
+               // TODO remove Timers
+               // long startTime = System.currentTimeMillis();
                /* update data structure */
                validateandupdate(array, true);
+               // long endTime = System.currentTimeMillis();
+
+               // long diff = endTime - startTime;
+               // if (diff >= 1) {
+               //      System.out.println("Time Taken: " + diff);
+               // }
 
                return inserted;
        }
@@ -631,8 +645,8 @@ final public class Table {
                        updateExpectedSize();
                }
 
-               proccessAllNewCommits();
 
+               boolean hasGap = false;
                /* If there is a gap, check to see if the server sent us everything. */
                if (firstseqnum != (sequencenumber + 1)) {
 
@@ -652,6 +666,12 @@ final public class Table {
                }
                sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
 
+               // Process all on key value pairs
+               proccessAllNewCommits();
+
+               // Go through all uncommitted transactions and kill the ones that are dead
+               deleteDeadUncommittedTransactions();
+
                // Speculate on key value pairs
                createSpeculativeTable();
        }
@@ -672,81 +692,156 @@ final public class Table {
                for (Long entrySeqNum : commitSeqNums) {
                        Commit entry = newCommitMap.get(entrySeqNum);
 
+                       long lastCommitSeenSeqNum = 0;
+
+                       if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
+                               lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
+                       }
+
                        if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
 
-                               // Remove any old commits
-                               for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
-                                       Commit prevcommit = i.next();
+                               Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
 
-                                       if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) {
-                                               prevcommit.setDead();
-                                               i.remove();
+                               if (prevCommit != null) {
+                                       prevCommit.setDead();
+
+                                       for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
+                                               committedMapByKey.put(kv.getKey(), entry);
                                        }
                                }
-                               commitList.add(entry);
+
                                continue;
                        }
 
-                       // Remove any old commits
-                       for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
-                               Commit prevcommit = i.next();
-                               prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+                       Set<Commit> commitsToEditSet = new HashSet<Commit>();
+
+                       for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                               commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
+                       }
+
+                       commitsToEditSet.remove(null);
+
+                       for (Commit prevCommit : commitsToEditSet) {
 
-                               if (!prevcommit.isLive()) {
-                                       i.remove();
+                               Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                               if (!prevCommit.isLive()) {
+                                       commitMap.remove(prevCommit.getTransSequenceNumber());
                                }
                        }
 
-                       // Add the new commit
-                       commitList.add(entry);
-                       lastCommitSeenSeqNum = entry.getTransSequenceNumber();
-                       // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum);
+                       // // Remove any old commits
+                       // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
+                       //      Commit prevCommit = i.next().getValue();
+                       //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                       //      if (!prevCommit.isLive()) {
+                       //              i.remove();
+                       //      }
+                       // }
+
+                       // Remove any old commits
+                       // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
+                       //      Commit prevCommit = i.next().getValue();
 
+                       //      if (prevCommit.getTransArbitrator() != entry.getTransArbitrator()) {
+                       //              continue;
+                       //      }
+
+                       //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                       //      if (!prevCommit.isLive()) {
+                       //              i.remove();
+                       //      }
+                       // }
+
+
+                       // Add the new commit
+                       commitMap.put(entry.getTransSequenceNumber(), entry);
+                       lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
 
                        // Update the committed table list
                        for (KeyValue kv : entry.getkeyValueUpdateSet()) {
                                IoTString key = kv.getKey();
                                commitedTable.put(key, kv);
-                       }
 
-                       long committedTransSeq = entry.getTransSequenceNumber();
-
-                       // Make dead the transactions
-                       for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
-                               Transaction prevtrans = i.next().getValue();
-
-                               if (prevtrans.getSequenceNumber() <= committedTransSeq) {
-                                       i.remove();
-                                       prevtrans.setDead();
-                               }
+                               committedMapByKey.put(key, entry);
                        }
                }
 
-
                // Clear the new commits storage so we can use it later
                newCommitMap.clear();
        }
 
+       private void deleteDeadUncommittedTransactions() {
+               // Make dead the transactions
+               for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next().getValue();
+                       long transArb = prevtrans.getArbitrator();
+
+                       if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
+                               (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
+                               i.remove();
+                               prevtrans.setDead();
+                       }
+               }
+       }
+
        private void createSpeculativeTable() {
-               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+
+               if (uncommittedTransactionsMap.keySet().size() == 0) {
+                       speculativeTable = commitedTable; // Ok that they are the same object
+                       return;
+               }
+
+               Map speculativeTableTmp = null;
                List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
 
                // Sort from oldest to newest commit
                Collections.sort(utSeqNums);
 
+               if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
+                       speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
 
-               for (Long key : utSeqNums) {
-                       Transaction trans = uncommittedTransactionsMap.get(key);
+                       for (Long key : utSeqNums) {
+                               Transaction trans = uncommittedTransactionsMap.get(key);
 
-                       try {
-                               if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-                                       for (KeyValue kv : trans.getkeyValueUpdateSet()) {
-                                               speculativeTableTmp.put(kv.getKey(), kv);
+                               lastUncommittedTransaction = key;
+
+                               try {
+                                       if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
+                                               for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                                       speculativeTableTmp.put(kv.getKey(), kv);
+                                               }
                                        }
+
+                               } catch (Exception e) {
+                                       e.printStackTrace();
                                }
+                       }
+               } else {
+                       speculativeTableTmp = new HashMap<IoTString, KeyValue>(speculativeTable);
 
-                       } catch (Exception e) {
-                               e.printStackTrace();
+                       for (Long key : utSeqNums) {
+
+                               if (key <= lastUncommittedTransaction) {
+                                       continue;
+                               }
+
+                               lastUncommittedTransaction = key;
+
+                               Transaction trans = uncommittedTransactionsMap.get(key);
+
+                               try {
+                                       if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
+                                               for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                                       speculativeTableTmp.put(kv.getKey(), kv);
+                                               }
+                                       }
+
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
                        }
                }
 
@@ -780,7 +875,6 @@ final public class Table {
 
        private void commitNewMaxSize() {
                if (numslots != currmaxsize) {
-                       System.out.println("Resizing the buffer"); // TODO: Remove
                        buffer.resize(currmaxsize);
                }
 
@@ -850,21 +944,17 @@ final public class Table {
 
                if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
                        // Abort has not been seen yet so we need to keep track of it
-                       abortSet.add(entry);
+
+                       Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
+                       if (prevAbort != null) {
+                               prevAbort.setDead(); // delete old version of the duplicate
+                       }
                } else {
                        // The machine already saw this so it is dead
                        entry.setDead();
                }
 
-               // Make dead the transactions
-               for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
-                       Transaction prevtrans = i.next().getValue();
-
-                       if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) {
-                               i.remove();
-                               prevtrans.setDead();
-                       }
-               }
+               lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
        }
 
        private void processEntry(Commit entry, Slot s) {
@@ -882,7 +972,6 @@ final public class Table {
                lastTableStatus = entry;
        }
 
-
        private void addWatchList(long machineid, RejectedMessage entry) {
                HashSet<RejectedMessage> entries = watchlist.get(machineid);
                if (entries == null)
@@ -918,16 +1007,15 @@ final public class Table {
                }
 
                // Set dead the abort
-               for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
-                       Abort abort = ait.next();
+               for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
+                       Abort abort = i.next().getValue();
 
                        if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
                                abort.setDead();
-                               ait.remove();
+                               i.remove();
                        }
                }
 
-
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;