--- /dev/null
+package iotcloud;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Vector;
+import java.util.Random;
+
+/**
+ * IoTTable data structure. Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+final public class Table {
+ private int numslots; //number of slots stored in buffer
+
+ //table of key-value pairs
+ private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+
+ // machine id -> (sequence number, Slot or LastMessage); records last message by each client
+ private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+ // machine id -> ...
+ private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+ private Vector<Long> rejectedmessagelist=new Vector<Long>();
+ private SlotBuffer buffer;
+ private CloudComm cloud;
+ private long sequencenumber; //Largest sequence number a client has received
+ private long localmachineid;
+ private TableStatus lastTableStatus;
+ static final int FREE_SLOTS = 10; //number of slots that should be kept free
+ static final int SKIP_THRESHOLD = 10;
+ private long liveslotcount=0;
+ private int chance;
+ static final double RESIZE_MULTIPLE = 1.2;
+ static final double RESIZE_THRESHOLD = 0.75;
+ static final int REJECTED_THRESHOLD = 5;
+ private int resizethreshold;
+ private long lastliveslotseqn; //smallest sequence number with a live entry
+ private Random random=new Random();
+
+ public Table(String baseurl, String password, long _localmachineid) {
+ localmachineid=_localmachineid;
+ buffer = new SlotBuffer();
+ numslots = buffer.capacity();
+ setResizeThreshold();
+ sequencenumber = 0;
+ cloud=new CloudComm(this, baseurl, password);
+ lastliveslotseqn = 1;
+ }
+
+ public Table(CloudComm _cloud, long _localmachineid) {
+ localmachineid=_localmachineid;
+ buffer = new SlotBuffer();
+ numslots = buffer.capacity();
+ setResizeThreshold();
+ sequencenumber = 0;
+ cloud=_cloud;
+ }
+
+ public void rebuild() {
+ Slot[] newslots=cloud.getSlots(sequencenumber+1);
+ validateandupdate(newslots, true);
+ }
+
+ public void update() {
+ Slot[] newslots=cloud.getSlots(sequencenumber+1);
+
+ validateandupdate(newslots, false);
+ }
+
+ public IoTString get(IoTString key) {
+ KeyValue kv=table.get(key);
+ if (kv != null)
+ return kv.getValue();
+ else
+ return null;
+ }
+
+ public void initTable() {
+ cloud.setSalt();//Set the salt
+ Slot s=new Slot(this, 1, localmachineid);
+ TableStatus status=new TableStatus(s, numslots);
+ s.addEntry(status);
+ Slot[] array=cloud.putSlot(s, numslots);
+ if (array == null) {
+ array = new Slot[] {s};
+ /* update data structure */
+ validateandupdate(array, true);
+ } else {
+ throw new Error("Error on initialization");
+ }
+ }
+
+ public String toString() {
+ return table.toString();
+ }
+
+ public IoTString put(IoTString key, IoTString value) {
+ while(true) {
+ KeyValue oldvalue=table.get(key);
+ if (tryput(key, value, false)) {
+ if (oldvalue==null)
+ return null;
+ else
+ return oldvalue.getValue();
+ }
+ }
+ }
+
+ void decrementLiveCount() {
+ liveslotcount--;
+ }
+
+
+ private void setResizeThreshold() {
+ int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
+ resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
+ }
+
+ private boolean tryput(IoTString key, IoTString value, boolean resize) {
+ Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize=true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status=new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ if (! rejectedmessagelist.isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e.g.,
+ * equalsto value of true and same sequence number). */
+
+ long old_seqn=rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn=rejectedmessagelist.lastElement();
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn = -1;
+ int i=0;
+ /* Go through list of missing messages */
+ for(; i<rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
+ break;
+ prev_seqn=curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for(; i<rejectedmessagelist.size(); i++) {
+ long curr_seqn=rejectedmessagelist.get(i);
+ Slot s_msg=buffer.getSlot(curr_seqn);
+ long machineid=s_msg.getMachineID();
+ RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ }
+ }
+ }
+
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
+
+ for(; seqn < threshold; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (! seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (! prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for(Entry liveentry:liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn==firstiffull) { //if there's no space but the entry is about to fall off the queue
+ if (!resize) {
+ System.out.print("B"); //?
+ return tryput(key, value, true);
+ }
+ }
+ }
+ }
+
+ KeyValue kv=new KeyValue(s, key, value);
+ boolean insertedkv=false;
+ if (s.hasSpace(kv)) {
+ s.addEntry(kv);
+ insertedkv=true;
+ }
+
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
+ int skipcount=0;
+ search:
+ for(; seqn <= newestseqnum; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for(Entry liveentry:liveentries) {
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
+ }
+ }
+
+ int max=0;
+ if (resize)
+ max = newsize;
+ Slot[] array=cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ insertedkv=false;
+ }
+
+ /* update data structure */
+ validateandupdate(array, true);
+
+ return insertedkv;
+ }
+
+ private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+ /* The cloud communication layer has checked slot HMACs already
+ before decoding */
+ if (newslots.length==0) return;
+
+ long firstseqnum=newslots[0].getSequenceNumber();
+ if (firstseqnum <= sequencenumber)
+ throw new Error("Server Error: Sent older slots!");
+
+ SlotIndexer indexer = new SlotIndexer(newslots, buffer);
+ checkHMACChain(indexer, newslots);
+
+ HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet()); //
+
+ initExpectedSize(firstseqnum);
+ for(Slot slot: newslots) {
+ processSlot(indexer, slot, acceptupdatestolocal, machineSet);
+ updateExpectedSize();
+ }
+
+ /* If there is a gap, check to see if the server sent us everything. */
+ if (firstseqnum != (sequencenumber+1)) {
+ checkNumSlots(newslots.length);
+ if (!machineSet.isEmpty())
+ throw new Error("Missing record for machines: "+machineSet);
+ }
+
+ commitNewMaxSize();
+
+ /* Commit new to slots. */
+ for(Slot slot:newslots) {
+ buffer.putSlot(slot);
+ liveslotcount++;
+ }
+ sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+ }
+
+ private int expectedsize, currmaxsize;
+
+ private void checkNumSlots(int numslots) {
+ if (numslots != expectedsize)
+ throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
+ }
+
+ private void initExpectedSize(long firstsequencenumber) {
+ long prevslots = firstsequencenumber;
+ expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
+ currmaxsize = numslots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+ if (expectedsize > currmaxsize)
+ expectedsize = currmaxsize;
+ }
+
+ private void updateCurrMaxSize(int newmaxsize) {
+ currmaxsize=newmaxsize;
+ }
+
+ private void commitNewMaxSize() {
+ if (numslots != currmaxsize)
+ buffer.resize(currmaxsize);
+
+ numslots=currmaxsize;
+ setResizeThreshold();
+ }
+
+ private void processEntry(KeyValue entry, SlotIndexer indexer) {
+ IoTString key=entry.getKey();
+ KeyValue oldvalue=table.get(key);
+ if (oldvalue != null) {
+ oldvalue.setDead();
+ }
+ table.put(key, entry);
+ }
+
+ private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
+ }
+
+ private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+ long oldseqnum=entry.getOldSeqNum();
+ long newseqnum=entry.getNewSeqNum();
+ boolean isequal=entry.getEqual();
+ long machineid=entry.getMachineID();
+ for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
+ Slot slot=indexer.getSlot(seqnum);
+ if (slot != null) {
+ long slotmachineid=slot.getMachineID();
+ if (isequal != (slotmachineid==machineid)) {
+ throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
+ }
+ }
+ }
+
+ HashSet<Long> watchset=new HashSet<Long>();
+ for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
+ long entry_mid=lastmsg_entry.getKey();
+ /* We've seen it, don't need to continue to watch. Our next
+ * message will implicitly acknowledge it. */
+ if (entry_mid == localmachineid)
+ continue;
+ Pair<Long, Liveness> v=lastmsg_entry.getValue();
+ long entry_seqn=v.getFirst();
+ if (entry_seqn < newseqnum) {
+ addWatchList(entry_mid, entry);
+ watchset.add(entry_mid);
+ }
+ }
+ if (watchset.isEmpty())
+ entry.setDead();
+ else
+ entry.setWatchSet(watchset);
+ }
+
+ private void addWatchList(long machineid, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries=watchlist.get(machineid);
+ if (entries == null)
+ watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
+ entries.add(entry);
+ }
+
+ private void processEntry(TableStatus entry, SlotIndexer indexer) {
+ int newnumslots=entry.getMaxSlots();
+ updateCurrMaxSize(newnumslots);
+ if (lastTableStatus != null)
+ lastTableStatus.setDead();
+ lastTableStatus = entry;
+ }
+
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ machineSet.remove(machineid);
+
+ HashSet<RejectedMessage> watchset=watchlist.get(machineid);
+ if (watchset != null) {
+ for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
+ RejectedMessage rm=rmit.next();
+ if (rm.getNewSeqNum() <= seqnum) {
+ /* Remove it from our watchlist */
+ rmit.remove();
+ /* Decrement machines that need to see this notification */
+ rm.removeWatcher(machineid);
+ }
+ }
+ }
+
+ if (machineid == localmachineid) {
+ /* Our own messages are immediately dead. */
+ if (liveness instanceof LastMessage) {
+ ((LastMessage)liveness).setDead();
+ } else if (liveness instanceof Slot) {
+ ((Slot)liveness).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+
+ Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
+ if (lastmsgentry == null)
+ return;
+
+ long lastmsgseqnum = lastmsgentry.getFirst();
+ Liveness lastentry = lastmsgentry.getSecond();
+ if (machineid != localmachineid) {
+ if (lastentry instanceof LastMessage) {
+ ((LastMessage)lastentry).setDead();
+ } else if (lastentry instanceof Slot) {
+ ((Slot)lastentry).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+ if (machineid == localmachineid) {
+ if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
+ throw new Error("Server Error: Mismatch on local machine sequence number");
+ } else {
+ if (lastmsgseqnum > seqnum)
+ throw new Error("Server Error: Rollback on remote machine sequence number");
+ }
+ }
+
+ private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
+ for(Entry entry : slot.getEntries()) {
+ switch(entry.getType()) {
+ case Entry.TypeKeyValue:
+ processEntry((KeyValue)entry, indexer);
+ break;
+
+ case Entry.TypeLastMessage:
+ processEntry((LastMessage)entry, indexer, machineSet);
+ break;
+
+ case Entry.TypeRejectedMessage:
+ processEntry((RejectedMessage)entry, indexer);
+ break;
+
+ case Entry.TypeTableStatus:
+ processEntry((TableStatus)entry, indexer);
+ break;
+
+ default:
+ throw new Error("Unrecognized type: "+entry.getType());
+ }
+ }
+ }
+
+ private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+ for(int i=0; i < newslots.length; i++) {
+ Slot currslot=newslots[i];
+ Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
+ if (prevslot != null &&
+ !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
+ throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
+ }
+ }
+}