From: bdemsky Date: Sun, 24 Jul 2016 06:58:15 +0000 (-0700) Subject: edits X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=6374679089d86834a93d2ce13c33a42544606b5e;p=iotcloud.git edits --- diff --git a/src/java/iotcloud/CloudComm.java b/src/java/iotcloud/CloudComm.java index 6d421b5..84d7ae4 100644 --- a/src/java/iotcloud/CloudComm.java +++ b/src/java/iotcloud/CloudComm.java @@ -12,7 +12,7 @@ class CloudComm { CloudComm() { } - + CloudComm(String _baseurl, Cipher _encrypt, Cipher _decrypt, Mac _mac) { this.baseurl=_baseurl; this.encryptcipher = _encrypt; diff --git a/src/java/iotcloud/Pair.java b/src/java/iotcloud/Pair.java index 5cbe1db..73ed6bd 100644 --- a/src/java/iotcloud/Pair.java +++ b/src/java/iotcloud/Pair.java @@ -3,7 +3,7 @@ package iotcloud; class Pair { private A a; private B b; - + Pair(A a, B b) { this.a=a; this.b=b; diff --git a/src/java/iotcloud/RejectedMessage.java b/src/java/iotcloud/RejectedMessage.java index bd52ed0..167521f 100644 --- a/src/java/iotcloud/RejectedMessage.java +++ b/src/java/iotcloud/RejectedMessage.java @@ -3,9 +3,9 @@ import java.nio.ByteBuffer; class RejectedMessage extends Entry { private long machineid; - private long oldseqnum; //Oldest seqnum in range - private long newseqnum; //Newest seqnum in range (inclusive) - private boolean equalto; //Is message sent or not sent by machineid + private long oldseqnum; //Oldest seqnum in range + private long newseqnum; //Newest seqnum in range (inclusive) + private boolean equalto; //Is message sent or not sent by machineid RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) { super(slot); diff --git a/src/java/iotcloud/Slot.java b/src/java/iotcloud/Slot.java index 115c761..ea30ddb 100644 --- a/src/java/iotcloud/Slot.java +++ b/src/java/iotcloud/Slot.java @@ -17,7 +17,7 @@ class Slot implements Liveness { private int livecount; private boolean seqnumlive; private int freespace; - + Slot(long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) { seqnum=_seqnum; machineid=_machineid; @@ -60,7 +60,7 @@ class Slot implements Liveness { int newfreespace = freespace - e.getSize(); return newfreespace >= 0; } - + Vector getEntries() { return entries; } @@ -92,7 +92,7 @@ class Slot implements Liveness { byte[] encode(Mac mac) { byte[] array=new byte[SLOT_SIZE]; ByteBuffer bb=ByteBuffer.wrap(array); - bb.position(HMAC_SIZE); //Leave space for the HMACs + bb.position(HMAC_SIZE); //Leave space for the HMACs bb.put(prevhmac); bb.putLong(seqnum); bb.putLong(machineid); @@ -111,7 +111,7 @@ class Slot implements Liveness { int getBaseSize() { return 2*HMAC_SIZE+2*Long.BYTES+Integer.BYTES; } - + Vector getLiveEntries() { Vector liveEntries=new Vector(); for(Entry entry: entries) @@ -120,10 +120,10 @@ class Slot implements Liveness { if (seqnumlive) liveEntries.add(new LastMessage(this, machineid, seqnum)); - + return liveEntries; } - + long getSequenceNumber() { return seqnum; } @@ -140,7 +140,7 @@ class Slot implements Liveness { decrementLiveCount(); seqnumlive=false; } - + void decrementLiveCount() { livecount--; } diff --git a/src/java/iotcloud/SlotBuffer.java b/src/java/iotcloud/SlotBuffer.java index 054d4fc..3d5a465 100644 --- a/src/java/iotcloud/SlotBuffer.java +++ b/src/java/iotcloud/SlotBuffer.java @@ -23,7 +23,7 @@ class SlotBuffer { int capacity() { return array.length - 1; } - + void resize(int newsize) { if (newsize == (array.length-1)) return; @@ -40,20 +40,27 @@ class SlotBuffer { head = 0; } - void putSlot(Slot s) { - array[head]=s; + private void incrementHead() { head++; if (head >= array.length) head=0; - + } + + private void incrementTail() { + tail++; + if (tail >= array.length) + tail=0; + } + + void putSlot(Slot s) { + array[head]=s; + incrementHead(); + if (oldestseqn==0) oldestseqn = s.getSequenceNumber(); if (head==tail) { - tail++; - if (tail >= array.length) - tail=0; - + incrementTail(); oldestseqn++; } } @@ -61,24 +68,18 @@ class SlotBuffer { Slot getSlot(long seqnum) { int diff=(int) (seqnum-oldestseqn); int index=diff + tail; - if (index > array.length) { + if (index >= array.length) { if (head >= tail) return null; index-= array.length; } - if (index >= array.length || - index >= head) + if (index >= array.length) return null; - if (index < 0) { - System.out.println("seqnum="+seqnum); - System.out.println("olestseqn="+oldestseqn); - System.out.println("diff="+diff); - System.out.println("tail="+tail); + if (head >= tail && index >= head) + return null; - } - return array[index]; } diff --git a/src/java/iotcloud/Table.java b/src/java/iotcloud/Table.java index 72b4927..c92aef9 100644 --- a/src/java/iotcloud/Table.java +++ b/src/java/iotcloud/Table.java @@ -14,10 +14,10 @@ final public class Table { private Mac hmac; private long sequencenumber; private long localmachineid; - private TableStatus lastTableStatus; - static final int FREE_SLOTS = 10; - static final int FORCED_RESIZE_INCREMENT = 20; - + private TableStatus lastTableStatus; + static final int FREE_SLOTS = 10; + static final int FORCED_RESIZE_INCREMENT = 20; + public Table(String baseurl, String password, long _localmachineid) { localmachineid=_localmachineid; buffer = new SlotBuffer(); @@ -33,7 +33,7 @@ final public class Table { sequencenumber = 0; cloud=_cloud; } - + private void initCloud(String baseurl, String password) { try { SecretKeySpec secret=getKey(password); @@ -62,6 +62,7 @@ final public class Table { public void update() { Slot[] newslots=cloud.getSlots(sequencenumber+1); + validateandupdate(newslots, false); } @@ -77,96 +78,96 @@ final public class Table { Slot s=new Slot(1, localmachineid); TableStatus status=new TableStatus(s, numslots); s.addEntry(status); - Slot[] array=cloud.putSlot(s, numslots); - if (array == null) { - array = new Slot[] {s}; - validateandupdate(array, true); // update data structure + Slot[] array=cloud.putSlot(s, numslots); + if (array == null) { + array = new Slot[] {s}; + validateandupdate(array, true); // update data structure } else { throw new Error("Error on initialization"); } } - + public IoTString put(IoTString key, IoTString value) { - while(true) { - KeyValue oldvalue=table.get(key); - if (tryput(key, value, false)) { + while(true) { + KeyValue oldvalue=table.get(key); + if (tryput(key, value, false)) { if (oldvalue==null) return null; else return oldvalue.getValue(); - } - } - } + } + } + } - private boolean tryput(IoTString key, IoTString value, boolean forcedresize) { + private boolean tryput(IoTString key, IoTString value, boolean forcedresize) { Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); - long seqn = buffer.getOldestSeqNum(); + long seqn = buffer.getOldestSeqNum(); if (forcedresize) { TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots); s.addEntry(status); } - - if ((numslots - buffer.size()) < FREE_SLOTS) { - //have to check whether we have enough free slots - long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots; - seqn = fullfirstseqn < 1 ? 1: fullfirstseqn; - for(int i=0; i < FREE_SLOTS; i++, seqn++) { - Slot prevslot=buffer.getSlot(seqn); - if (!prevslot.isLive()) - continue; - Vector liveentries = prevslot.getLiveEntries(); - for(Entry liveentry:liveentries) { + + if ((numslots - buffer.size()) < FREE_SLOTS) { + //have to check whether we have enough free slots + long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots; + seqn = fullfirstseqn < 1?1:fullfirstseqn; + for(int i=0; i < FREE_SLOTS; i++, seqn++) { + Slot prevslot=buffer.getSlot(seqn); + if (!prevslot.isLive()) + continue; + Vector liveentries = prevslot.getLiveEntries(); + for(Entry liveentry:liveentries) { if (redundant(liveentry)) continue; - if (s.hasSpace(liveentry)) - s.addEntry(liveentry); - else if (i==0) { - if (s.canFit(liveentry)) - s.addEntry(liveentry); - else if (!forcedresize) { - return tryput(key, value, true); + if (s.hasSpace(liveentry)) + s.addEntry(liveentry); + else if (i==0) { + if (s.canFit(liveentry)) + s.addEntry(liveentry); + else if (!forcedresize) { + return tryput(key, value, true); } } - } - } - } - KeyValue kv=new KeyValue(s, key, value); - boolean insertedkv=false; - if (s.hasSpace(kv)) { - s.addEntry(kv); - insertedkv=true; - } - - long newestseqnum=buffer.getNewestSeqNum(); - search: - for(;seqn<=newestseqnum;seqn++) { - Slot prevslot=buffer.getSlot(seqn); - if (!prevslot.isLive()) - continue; - Vector liveentries = prevslot.getLiveEntries(); - for(Entry liveentry:liveentries) { + } + } + } + KeyValue kv=new KeyValue(s, key, value); + boolean insertedkv=false; + if (s.hasSpace(kv)) { + s.addEntry(kv); + insertedkv=true; + } + + long newestseqnum=buffer.getNewestSeqNum(); +search: + for(; seqn<=newestseqnum; seqn++) { + Slot prevslot=buffer.getSlot(seqn); + if (!prevslot.isLive()) + continue; + Vector liveentries = prevslot.getLiveEntries(); + for(Entry liveentry:liveentries) { if (redundant(liveentry)) continue; - if (s.hasSpace(liveentry)) - s.addEntry(liveentry); - else - break search; - } - } - - int max=0; - if (forcedresize) - max = numslots + FORCED_RESIZE_INCREMENT; - Slot[] array=cloud.putSlot(s, max); - if (array == null) - array = new Slot[] {s}; - else - insertedkv=false; - - validateandupdate(array, true); // update data structure - - return insertedkv; + if (s.hasSpace(liveentry)) + s.addEntry(liveentry); + else + break search; + } + } + + int max=0; + if (forcedresize) + max = numslots + FORCED_RESIZE_INCREMENT; + Slot[] array=cloud.putSlot(s, max); + if (array == null) + array = new Slot[] {s}; + else + insertedkv=false; + + validateandupdate(array, true); // update data structure + + return insertedkv; } boolean redundant(Entry liveentry) { @@ -177,7 +178,7 @@ final public class Table { return false; } - + private void validateandupdate(Slot[] newslots, boolean isput) { //The cloud communication layer has checked slot HMACs already //before decoding @@ -191,55 +192,55 @@ final public class Table { SlotIndexer indexer = new SlotIndexer(newslots, buffer); checkHMACChain(indexer, newslots); - initExpectedSize(); - for(Slot slot: newslots) { - updateExpectedSize(); + initExpectedSize(); + for(Slot slot: newslots) { + updateExpectedSize(); processSlot(indexer, slot, isput); } //If there is a gap, check to see if the server sent us everything if (firstseqnum != (sequencenumber+1)) checkNumSlots(newslots.length); - - commitNewMaxSize(); - //commit new to slots - for(Slot slot:newslots) { - buffer.putSlot(slot); - } + commitNewMaxSize(); + + //commit new to slots + for(Slot slot:newslots) { + buffer.putSlot(slot); + } sequencenumber = newslots[newslots.length - 1].getSequenceNumber(); } - private int expectedsize, currmaxsize; + private int expectedsize, currmaxsize; + + private void checkNumSlots(int numslots) { + if (numslots != expectedsize) + throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots); + } - private void checkNumSlots(int numslots) { - if (numslots != expectedsize) - throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots); - } - - private void initExpectedSize() { + private void initExpectedSize() { long prevslots = sequencenumber; - expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots; - currmaxsize = numslots; - } - - private void updateExpectedSize() { - expectedsize++; - if (expectedsize > currmaxsize) - expectedsize = currmaxsize; - } - - private void updateCurrMaxSize(int newmaxsize) { - currmaxsize=newmaxsize; - } - - private void commitNewMaxSize() { - if (numslots != currmaxsize) - buffer.resize(currmaxsize); - - numslots=currmaxsize; - } - + expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots; + currmaxsize = numslots; + } + + private void updateExpectedSize() { + expectedsize++; + if (expectedsize > currmaxsize) + expectedsize = currmaxsize; + } + + private void updateCurrMaxSize(int newmaxsize) { + currmaxsize=newmaxsize; + } + + private void commitNewMaxSize() { + if (numslots != currmaxsize) + buffer.resize(currmaxsize); + + numslots=currmaxsize; + } + private void processEntry(KeyValue entry, SlotIndexer indexer) { IoTString key=entry.getKey(); KeyValue oldvalue=table.get(key); @@ -270,11 +271,11 @@ final public class Table { } private void processEntry(TableStatus entry, SlotIndexer indexer) { - int newnumslots=entry.getMaxSlots(); - updateCurrMaxSize(newnumslots); - if (lastTableStatus != null) - lastTableStatus.setDead(); - lastTableStatus = entry; + int newnumslots=entry.getMaxSlots(); + updateCurrMaxSize(newnumslots); + if (lastTableStatus != null) + lastTableStatus.setDead(); + lastTableStatus = entry; } private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) { @@ -303,7 +304,7 @@ final public class Table { private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) { updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput); - + for(Entry entry : slot.getEntries()) { switch(entry.getType()) { case Entry.TypeKeyValue: diff --git a/src/java/iotcloud/TableStatus.java b/src/java/iotcloud/TableStatus.java index 07f282c..fb50a7d 100644 --- a/src/java/iotcloud/TableStatus.java +++ b/src/java/iotcloud/TableStatus.java @@ -12,7 +12,7 @@ class TableStatus extends Entry { int getMaxSlots() { return maxslots; } - + static Entry decode(Slot slot, ByteBuffer bb) { int maxslots=bb.getInt(); return new TableStatus(slot, maxslots); diff --git a/src/java/iotcloud/Test.java b/src/java/iotcloud/Test.java index 97057dd..101ca77 100644 --- a/src/java/iotcloud/Test.java +++ b/src/java/iotcloud/Test.java @@ -2,12 +2,38 @@ package iotcloud; public class Test { public static void main(String[] args) { + if (args[0].equals("1")) + test1(); + else if(args[0].equals("2")) + test2(); + + } + + static void test2() { + Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + t1.initTable(); + Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + t2.update(); + for(int i=0; i<600; i++) { + String a="STR"+i; + String b="ABR"+i; + IoTString ia=new IoTString(a); + IoTString ib=new IoTString(b); + t1.put(ia, ia); + t2.put(ib, ib); + t1.update(); + System.out.println(ib+"->"+t1.get(ib)); + System.out.println(ia+"->"+t2.get(ia)); + } + } + + static void test1() { TestCloudComm cc=new TestCloudComm(); Table t1=new Table(cc, 6513); t1.initTable(); Table t2=new Table(cc, 6512); t2.update(); - for(int i=0;i<100;i++) { + for(int i=0; i<600; i++) { String a="STR"+i; String b="ABR"+i; IoTString ia=new IoTString(a); @@ -18,5 +44,16 @@ public class Test { System.out.println(ib+"->"+t1.get(ib)); System.out.println(ia+"->"+t2.get(ia)); } + for(int i=0; i<600; i++) { + String a="STR"+i; + String b="ABR"+i; + IoTString ia=new IoTString(a); + IoTString ib=new IoTString(b); + System.out.println(ib+"->"+t1.get(ib)); + System.out.println(ia+"->"+t2.get(ia)); + System.out.println(ib+"->"+t2.get(ib)); + System.out.println(ia+"->"+t1.get(ia)); + } + } } diff --git a/src/java/iotcloud/TestCloudComm.java b/src/java/iotcloud/TestCloudComm.java index 6c18314..b668237 100644 --- a/src/java/iotcloud/TestCloudComm.java +++ b/src/java/iotcloud/TestCloudComm.java @@ -29,7 +29,7 @@ class TestCloudComm extends CloudComm { sequencenumber=oldestseqnum; int numslots=(int)((newestseqnum - sequencenumber)+1); Slot[] slots=new Slot[numslots]; - for(int i=0;i