Fixes to tex doc; Code Updates
[iotcloud.git] / src2 / java / iotcloud / Table.java
index 732867fc4cd7bc0f49286f202cd5b43f9b31f659..1cdb3d5b11b789d3cb29afac838cb4e0d2ca11d4 100644 (file)
@@ -9,6 +9,8 @@ import java.util.Random;
 import java.util.Queue;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.Collection;
 
 /**
  * IoTTable data structure.  Provides client inferface.
@@ -20,7 +22,7 @@ final public class Table {
        private int numslots;   //number of slots stored in buffer
 
        //table of key-value pairs
-       private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
+       //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
 
        // machine id -> (sequence number, Slot or LastMessage); records last message by each client
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
@@ -46,10 +48,12 @@ final public class Table {
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
        private List<Commit> commitList = null; // List of all the most recent live commits
+       private Set<Abort> abortSet = null; // Set of the live aborts
        private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+       private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
        private List<Transaction> uncommittedTransactionsList = null; //
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
-       private Set<Abort> arbitratorTable = null; // Table of arbitrators
+       // private Set<Abort> arbitratorTable = null; // Table of arbitrators
 
 
        public Table(String baseurl, String password, long _localmachineid) {
@@ -63,7 +67,9 @@ final public class Table {
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
                commitList = new LinkedList<Commit>();
+               abortSet = new HashSet<Abort>();
                commitedTable = new HashMap<IoTString, KeyValue>();
+               speculativeTable = new HashMap<IoTString, KeyValue>();
                uncommittedTransactionsList = new LinkedList<Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
        }
@@ -78,7 +84,9 @@ final public class Table {
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
                commitList = new LinkedList<Commit>();
+               abortSet = new HashSet<Abort>();
                commitedTable = new HashMap<IoTString, KeyValue>();
+               speculativeTable = new HashMap<IoTString, KeyValue>();
                uncommittedTransactionsList = new LinkedList<Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
        }
@@ -88,20 +96,27 @@ final public class Table {
                validateandupdate(newslots, true);
        }
 
-       public void update() {
-               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
 
-               validateandupdate(newslots, false);
+
+       public IoTString getCommitted(IoTString key) {
+               KeyValue kv = commitedTable.get(key);
+               if (kv != null) {
+                       return kv.getValue();
+               } else {
+                       return null;
+               }
        }
 
-       public IoTString get(IoTString key) {
-               KeyValue kv = table.get(key);
-               if (kv != null)
+       public IoTString getSpeculative(IoTString key) {
+               KeyValue kv = speculativeTable.get(key);
+               if (kv != null) {
                        return kv.getValue();
-               else
+               } else {
                        return null;
+               }
        }
 
+
        public void initTable() {
                cloud.setSalt();//Set the salt
                Slot s = new Slot(this, 1, localmachineid);
@@ -118,9 +133,26 @@ final public class Table {
        }
 
        public String toString() {
-               return table.toString();
+
+
+               String retString = " Committed Table: \n";
+               retString += "---------------------------\n";
+               retString += commitedTable.toString();
+
+               retString += "\n\n";
+
+               retString += " Speculative Table: \n";
+               retString += "---------------------------\n";
+               retString += speculativeTable.toString();
+
+               return retString;
        }
 
+
+
+
+
+
        public void startTransaction() {
                // Create a new transaction, invalidates any old pending transactions.
                pendingTransBuild = new PendingTransaction();
@@ -128,6 +160,11 @@ final public class Table {
 
        public void commitTransaction() {
 
+               if (pendingTransBuild.getKVUpdates().size() == 0) {
+                       // If no updates are made then there is no point inserting into the chain
+                       return;
+               }
+
                // Add the pending transaction to the queue
                pendingTransQueue.add(pendingTransBuild);
 
@@ -139,15 +176,48 @@ final public class Table {
        }
 
        public void addKV(IoTString key, IoTString value) {
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw and error
+                       throw new Error("Not all Key Values match");
+               }
+
+
+
                KeyValue kv = new KeyValue(key, value);
                pendingTransBuild.addKV(kv);
        }
 
+
+       // TODo: FIx Guard
        public void addGuard(IoTString key, IoTString value) {
                KeyValue kv = new KeyValue(key, value);
                pendingTransBuild.addKV(kv);
        }
 
+       public void update() {
+               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+
+               validateandupdate(newslots, false);
+       }
+
+       public boolean createNewKey(IoTString keyName, long machineId) {
+
+               while (true) {
+                       if (arbitratorTable.get(keyName) != null) {
+                               // There is already an arbitrator
+                               return false;
+                       }
+
+                       if (tryput(keyName, machineId, false)) {
+
+                               // If successfully inserted
+                               return true;
+                       }
+               }
+       }
+
        void decrementLiveCount() {
                liveslotcount--;
        }
@@ -218,6 +288,8 @@ final public class Table {
                long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
                long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
 
+
+               // Mandatory Rescue
                for (; seqn < threshold; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
                        //Push slot number forward
@@ -241,6 +313,46 @@ final public class Table {
                }
 
 
+               // Arbitrate
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+               for (Transaction ut : uncommittedTransactionsList) {
+
+                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+                       // Check if this machine arbitrates for this transaction
+                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                               continue;
+                       }
+
+                       Entry newEntry = null;
+
+                       try {
+                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       // Guard evaluated as true
+
+                                       // update the local tmp current key set
+                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+
+                                       // create the commit
+                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                               } else {
+                                       // Guard was false
+
+                                       // create the abort
+                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+
+                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                               s.addEntry(newEntry);
+                       } else {
+                               break;
+                       }
+               }
+
                Transaction trans = new Transaction(s,
                                                    s.getSequenceNumber(),
                                                    localmachineid,
@@ -298,14 +410,195 @@ final public class Table {
                return insertedTrans;
        }
 
+       private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
+               Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+               int newsize = 0;
+               if (liveslotcount > resizethreshold) {
+                       resize = true; //Resize is forced
+               }
+
+               if (resize) {
+                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+                       TableStatus status = new TableStatus(s, newsize);
+                       s.addEntry(status);
+               }
+
+               if (! rejectedmessagelist.isEmpty()) {
+                       /* TODO: We should avoid generating a rejected message entry if
+                        * there is already a sufficient entry in the queue (e.g.,
+                        * equalsto value of true and same sequence number).  */
+
+                       long old_seqn = rejectedmessagelist.firstElement();
+                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                               long new_seqn = rejectedmessagelist.lastElement();
+                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                               s.addEntry(rm);
+                       } else {
+                               long prev_seqn = -1;
+                               int i = 0;
+                               /* Go through list of missing messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       if (s_msg != null)
+                                               break;
+                                       prev_seqn = curr_seqn;
+                               }
+                               /* Generate rejected message entry for missing messages */
+                               if (prev_seqn != -1) {
+                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                       s.addEntry(rm);
+                               }
+                               /* Generate rejected message entries for present messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       long machineid = s_msg.getMachineID();
+                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       s.addEntry(rm);
+                               }
+                       }
+               }
+
+               long newestseqnum = buffer.getNewestSeqNum();
+               long oldestseqnum = buffer.getOldestSeqNum();
+               if (lastliveslotseqn < oldestseqnum)
+                       lastliveslotseqn = oldestseqnum;
+
+               long seqn = lastliveslotseqn;
+               boolean seenliveslot = false;
+               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
+
+
+               // Mandatory Rescue
+               for (; seqn < threshold; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (! seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (! prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry)) {
+                                       s.addEntry(liveentry);
+                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+                                       if (!resize) {
+                                               System.out.print("B"); //?
+                                               return tryput(keyName, arbMachineid, true);
+                                       }
+                               }
+                       }
+               }
+
+
+               // Arbitrate
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+               for (Transaction ut : uncommittedTransactionsList) {
+
+                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+                       // Check if this machine arbitrates for this transaction
+                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                               continue;
+                       }
+
+                       Entry newEntry = null;
+
+                       try {
+                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       // Guard evaluated as true
+
+                                       // update the local tmp current key set
+                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+
+                                       // create the commit
+                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                               } else {
+                                       // Guard was false
+
+                                       // create the abort
+                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+
+                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                               s.addEntry(newEntry);
+                       } else {
+                               break;
+                       }
+               }
+
+
+               NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+               boolean insertedNewKey = false;
+               if (s.hasSpace(newKey)) {
+                       s.addEntry(newKey);
+                       insertedNewKey = true;
+               }
+
+               /* now go through live entries from least to greatest sequence number until
+                * either all live slots added, or the slot doesn't have enough room
+                * for SKIP_THRESHOLD consecutive entries*/
+               int skipcount = 0;
+               search:
+               for (; seqn <= newestseqnum; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (!seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (!prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry))
+                                       s.addEntry(liveentry);
+                               else {
+                                       skipcount++;
+                                       if (skipcount > SKIP_THRESHOLD)
+                                               break search;
+                               }
+                       }
+               }
+
+               int max = 0;
+               if (resize)
+                       max = newsize;
+               Slot[] array = cloud.putSlot(s, max);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       rejectedmessagelist.clear();
+               }       else {
+                       if (array.length == 0)
+                               throw new Error("Server Error: Did not send any slots");
+                       rejectedmessagelist.add(s.getSequenceNumber());
+                       insertedNewKey = false;
+               }
+
+               /* update data structure */
+               validateandupdate(array, true);
+
+               return insertedNewKey;
+       }
+
        private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
                /* The cloud communication layer has checked slot HMACs already
                         before decoding */
                if (newslots.length == 0) return;
 
                long firstseqnum = newslots[0].getSequenceNumber();
-               if (firstseqnum <= sequencenumber)
+               if (firstseqnum <= sequencenumber) {
                        throw new Error("Server Error: Sent older slots!");
+               }
 
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
@@ -320,9 +613,12 @@ final public class Table {
 
                /* If there is a gap, check to see if the server sent us everything. */
                if (firstseqnum != (sequencenumber + 1)) {
+
+                       // TODO: Check size
                        checkNumSlots(newslots.length);
-                       if (!machineSet.isEmpty())
+                       if (!machineSet.isEmpty()) {
                                throw new Error("Missing record for machines: " + machineSet);
+                       }
                }
 
                commitNewMaxSize();
@@ -333,13 +629,37 @@ final public class Table {
                        liveslotcount++;
                }
                sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+
+               // Speculate on key value pairs
+               createSpeculativeTable();
+       }
+
+       private void createSpeculativeTable() {
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+
+               for (Transaction trans : uncommittedTransactionsList) {
+
+                       try {
+                               if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+                               }
+
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               speculativeTable = speculativeTableTmp;
        }
 
        private int expectedsize, currmaxsize;
 
        private void checkNumSlots(int numslots) {
-               if (numslots != expectedsize)
+               if (numslots != expectedsize) {
                        throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
+               }
        }
 
        private void initExpectedSize(long firstsequencenumber) {
@@ -350,8 +670,9 @@ final public class Table {
 
        private void updateExpectedSize() {
                expectedsize++;
-               if (expectedsize > currmaxsize)
+               if (expectedsize > currmaxsize) {
                        expectedsize = currmaxsize;
+               }
        }
 
        private void updateCurrMaxSize(int newmaxsize) {
@@ -366,6 +687,10 @@ final public class Table {
                setResizeThreshold();
        }
 
+
+
+
+
        private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
                updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
@@ -414,6 +739,16 @@ final public class Table {
        }
 
        private void processEntry(Abort entry) {
+
+
+               if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
+                       // Abort has not been seen yet so we need to keep track of it
+                       abortSet.add(entry);
+               } else {
+                       // The machine already saw this so it is dead
+                       entry.setDead();
+               }
+
                for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
                        Transaction prevtrans = i.next();
                        if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
@@ -456,13 +791,6 @@ final public class Table {
                }
        }
 
-       private void addWatchList(long machineid, RejectedMessage entry) {
-               HashSet<RejectedMessage> entries = watchlist.get(machineid);
-               if (entries == null)
-                       watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
-               entries.add(entry);
-       }
-
        private void processEntry(TableStatus entry) {
                int newnumslots = entry.getMaxSlots();
                updateCurrMaxSize(newnumslots);
@@ -471,6 +799,14 @@ final public class Table {
                lastTableStatus = entry;
        }
 
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries = watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
+               entries.add(entry);
+       }
+
        private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
                machineSet.remove(machineid);
 
@@ -498,6 +834,16 @@ final public class Table {
                        }
                }
 
+               // Set dead the abort
+               for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
+                       Abort abort = ait.next();
+
+                       if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
+                               abort.setDead();
+                               ait.remove();
+                       }
+               }
+
 
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)