From 1d9e7bc23a1a5c9183e123de76fe3ea7bcdea02c Mon Sep 17 00:00:00 2001 From: bdemsky Date: Sat, 23 Jul 2016 22:01:17 -0700 Subject: [PATCH] --amend --- src/java/iotcloud/Makefile | 2 +- src/java/iotcloud/Table.java | 59 +++++++++++++++++++--------- src/java/iotcloud/Test.java | 1 + src/java/iotcloud/TestCloudComm.java | 3 +- 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/src/java/iotcloud/Makefile b/src/java/iotcloud/Makefile index df6ceeb..2d45b63 100644 --- a/src/java/iotcloud/Makefile +++ b/src/java/iotcloud/Makefile @@ -9,7 +9,7 @@ server: $(JAVAC) -d $(BIN_DIR) *.java doc: server - $(JAVADOC) -d $(DOCS_DIR) *.java + $(JAVADOC) -private -d $(DOCS_DIR) *.java clean: rm -r bin/* diff --git a/src/java/iotcloud/Table.java b/src/java/iotcloud/Table.java index c31aed8..d3bac3d 100644 --- a/src/java/iotcloud/Table.java +++ b/src/java/iotcloud/Table.java @@ -22,14 +22,15 @@ final public class Table { localmachineid=_localmachineid; buffer = new SlotBuffer(); numslots = buffer.capacity(); - sequencenumber = 1; + sequencenumber = 0; initCloud(baseurl, password); } public Table(CloudComm _cloud, long _localmachineid) { localmachineid=_localmachineid; buffer = new SlotBuffer(); - sequencenumber = 1; + numslots = buffer.capacity(); + sequencenumber = 0; cloud=_cloud; } @@ -60,8 +61,8 @@ final public class Table { } public void update() { - Slot[] newslots=cloud.getSlots(sequencenumber); - validateandupdate(newslots); + Slot[] newslots=cloud.getSlots(sequencenumber+1); + validateandupdate(newslots, false); } public IoTString get(IoTString key) { @@ -79,7 +80,7 @@ final public class Table { Slot[] array=cloud.putSlot(s, numslots); if (array == null) { array = new Slot[] {s}; - validateandupdate(array); // update data structure + validateandupdate(array, true); // update data structure } else { throw new Error("Error on initialization"); } @@ -89,7 +90,10 @@ final public class Table { while(true) { KeyValue oldvalue=table.get(key); if (tryput(key, value, false)) { - return oldvalue.getValue(); + if (oldvalue==null) + return null; + else + return oldvalue.getValue(); } } } @@ -112,6 +116,8 @@ final public class Table { continue; Vector liveentries = prevslot.getLiveEntries(); for(Entry liveentry:liveentries) { + if (redundant(liveentry)) + continue; if (s.hasSpace(liveentry)) s.addEntry(liveentry); else if (i==0) { @@ -139,6 +145,8 @@ final public class Table { continue; Vector liveentries = prevslot.getLiveEntries(); for(Entry liveentry:liveentries) { + if (redundant(liveentry)) + continue; if (s.hasSpace(liveentry)) s.addEntry(liveentry); else @@ -155,19 +163,28 @@ final public class Table { else insertedkv=false; - validateandupdate(array); // update data structure + validateandupdate(array, true); // update data structure return insertedkv; } - private void validateandupdate(Slot[] newslots) { + boolean redundant(Entry liveentry) { + if (liveentry.getType()==Entry.TypeLastMessage) { + LastMessage lastmsg=(LastMessage) liveentry; + return lastmsg.getMachineID() == localmachineid; + } + return false; + } + + + private void validateandupdate(Slot[] newslots, boolean isput) { //The cloud communication layer has checked slot HMACs already //before decoding if (newslots.length==0) return; long firstseqnum=newslots[0].getSequenceNumber(); - if (firstseqnum < sequencenumber) + if (firstseqnum <= sequencenumber) throw new Error("Server Error: Sent older slots!"); SlotIndexer indexer = new SlotIndexer(newslots, buffer); @@ -176,26 +193,32 @@ final public class Table { initExpectedSize(); for(Slot slot: newslots) { updateExpectedSize(); - processSlot(indexer, slot); + processSlot(indexer, slot, isput); } - checkNumSlots(newslots.length); + + //If there is a gap, check to see if the server sent us everything + if (firstseqnum != (sequencenumber+1)) + checkNumSlots(newslots.length); + commitNewMaxSize(); //commit new to slots for(Slot slot:newslots) { buffer.putSlot(slot); } + sequencenumber = newslots[newslots.length - 1].getSequenceNumber(); } private int expectedsize, currmaxsize; private void checkNumSlots(int numslots) { if (numslots != expectedsize) - throw new Error("Server Error: Server did not send all slots"); + throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots); } private void initExpectedSize() { - expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots; + long prevslots = sequencenumber; + expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots; currmaxsize = numslots; } @@ -226,7 +249,7 @@ final public class Table { } private void processEntry(LastMessage entry, SlotIndexer indexer) { - updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry); + updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false); } private void processEntry(RejectedMessage entry, SlotIndexer indexer) { @@ -253,7 +276,7 @@ final public class Table { lastTableStatus = entry; } - private void updateLastMessage(long machineid, long seqnum, Liveness liveness) { + private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) { Pair lastmsgentry = lastmessagetable.put(machineid, new Pair(seqnum, liveness)); if (lastmsgentry == null) return; @@ -269,7 +292,7 @@ final public class Table { } if (machineid == localmachineid) { - if (lastmsgseqnum != seqnum) + if (lastmsgseqnum != seqnum && !isput) throw new Error("Server Error: Mismatch on local machine sequence number"); } else { if (lastmsgseqnum > seqnum) @@ -277,8 +300,8 @@ final public class Table { } } - private void processSlot(SlotIndexer indexer, Slot slot) { - updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot); + private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) { + updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput); for(Entry entry : slot.getEntries()) { switch(entry.getType()) { diff --git a/src/java/iotcloud/Test.java b/src/java/iotcloud/Test.java index 5ecd9d2..97057dd 100644 --- a/src/java/iotcloud/Test.java +++ b/src/java/iotcloud/Test.java @@ -14,6 +14,7 @@ public class Test { IoTString ib=new IoTString(b); t1.put(ia, ia); t2.put(ib, ib); + t1.update(); System.out.println(ib+"->"+t1.get(ib)); System.out.println(ia+"->"+t2.get(ia)); } diff --git a/src/java/iotcloud/TestCloudComm.java b/src/java/iotcloud/TestCloudComm.java index ce5b9ba..6c18314 100644 --- a/src/java/iotcloud/TestCloudComm.java +++ b/src/java/iotcloud/TestCloudComm.java @@ -12,7 +12,8 @@ class TestCloudComm extends CloudComm { } public synchronized Slot[] putSlot(Slot slot, int max) { - if (buffer.getNewestSeqNum()+1 == slot.getSequenceNumber()) { + if ((buffer.size()==0 && 1 == slot.getSequenceNumber()) || + buffer.getNewestSeqNum()+1 == slot.getSequenceNumber()) { if (max!=0) buffer.resize(max); buffer.putSlot(slot); -- 2.34.1