From 6fa4bc657823e25ede66cbd443b81745838a91f2 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Sat, 23 Jul 2016 19:45:49 -0700 Subject: [PATCH] more changes --- src/java/iotcloud/Entry.java | 2 +- src/java/iotcloud/IoTString.java | 2 +- src/java/iotcloud/Pair.java | 10 +-- src/java/iotcloud/Slot.java | 13 +++- src/java/iotcloud/SlotBuffer.java | 2 +- src/java/iotcloud/Table.java | 110 ++++++++++++++++++++++------- src/java/iotcloud/TableStatus.java | 6 +- 7 files changed, 107 insertions(+), 38 deletions(-) diff --git a/src/java/iotcloud/Entry.java b/src/java/iotcloud/Entry.java index ea4e118..4ff9b06 100644 --- a/src/java/iotcloud/Entry.java +++ b/src/java/iotcloud/Entry.java @@ -37,7 +37,7 @@ abstract class Entry implements Liveness { return islive; } - void decrementLiveCount() { + void setDead() { islive = false; parentslot.decrementLiveCount(); } diff --git a/src/java/iotcloud/IoTString.java b/src/java/iotcloud/IoTString.java index 8051806..c0e4197 100644 --- a/src/java/iotcloud/IoTString.java +++ b/src/java/iotcloud/IoTString.java @@ -2,7 +2,7 @@ package iotcloud; import java.util.Arrays; -public class IoTString { +final public class IoTString { byte[] array; int hashcode; diff --git a/src/java/iotcloud/Pair.java b/src/java/iotcloud/Pair.java index 6f57a57..5cbe1db 100644 --- a/src/java/iotcloud/Pair.java +++ b/src/java/iotcloud/Pair.java @@ -1,19 +1,19 @@ package iotcloud; -public class Pair { +class Pair { private A a; private B b; - - public Pair(A a, B b) { + + Pair(A a, B b) { this.a=a; this.b=b; } - public A getFirst() { + A getFirst() { return a; } - public B getSecond() { + B getSecond() { return b; } diff --git a/src/java/iotcloud/Slot.java b/src/java/iotcloud/Slot.java index 707b528..ff0e901 100644 --- a/src/java/iotcloud/Slot.java +++ b/src/java/iotcloud/Slot.java @@ -14,7 +14,8 @@ class Slot implements Liveness { private long machineid; private Vector entries; private int livecount; - + private boolean seqnumlive; + Slot(long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) { seqnum=_seqnum; machineid=_machineid; @@ -22,10 +23,11 @@ class Slot implements Liveness { hmac=_hmac; entries=new Vector(); livecount=1; + seqnumlive=true; } - Slot(long _seqnum, byte[] _bytes) { - seqnum=_seqnum; + Slot(long _seqnum, long _machineid, byte[] _prevhmac) { + this(_seqnum, _machineid, _prevhmac, new byte[HMAC_SIZE]); } byte[] getHMAC() { @@ -100,6 +102,11 @@ class Slot implements Liveness { return null; } + void setDead() { + decrementLiveCount(); + seqnumlive=false; + } + void decrementLiveCount() { livecount--; } diff --git a/src/java/iotcloud/SlotBuffer.java b/src/java/iotcloud/SlotBuffer.java index b975850..8a8662b 100644 --- a/src/java/iotcloud/SlotBuffer.java +++ b/src/java/iotcloud/SlotBuffer.java @@ -67,6 +67,6 @@ class SlotBuffer { } long getNewestSeqNum() { - return oldestseqn + size(); + return oldestseqn + size() - 1; } } diff --git a/src/java/iotcloud/Table.java b/src/java/iotcloud/Table.java index 00680ab..d4da5db 100644 --- a/src/java/iotcloud/Table.java +++ b/src/java/iotcloud/Table.java @@ -4,16 +4,18 @@ import java.util.Arrays; import javax.crypto.spec.*; import javax.crypto.*; -public class Table { - int numslots; - HashMap table=new HashMap(); - HashMap > lastmessagetable=new HashMap >(); - SlotBuffer buffer; - CloudComm cloud; +final public class Table { + private int numslots; + private HashMap table=new HashMap(); + private HashMap > lastmessagetable=new HashMap >(); + private SlotBuffer buffer; + private CloudComm cloud; private Mac hmac; - long sequencenumber; - long localmachineid; - + private long sequencenumber; + private long localmachineid; + private TableStatus lastTableStatus; + static final int FREE_SLOTS = 10; + public Table(String baseurl, String password, long _localmachineid) { localmachineid=_localmachineid; buffer = new SlotBuffer(); @@ -61,10 +63,24 @@ public class Table { } public IoTString put(IoTString key, IoTString value) { - return null; + Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); + + if ((numslots - buffer.size()) < FREE_SLOTS) { + //have to check whether we have enough free slots + long seqn = buffer.getNewestSeqNum() + 1 - numslots; + for(int i=0; i < FREE_SLOTS; i++, seqn--) { + Slot prevslot=buffer.getSlot(seqn); + if (!prevslot.isLive()) + continue; + + } + } + + + return null; } - void validateandupdate(Slot[] newslots) { + private void validateandupdate(Slot[] newslots) { //The cloud communication layer has checked slot HMACs already //before decoding if (newslots.length==0) @@ -76,26 +92,64 @@ public class Table { SlotIndexer indexer = new SlotIndexer(newslots, buffer); checkHMACChain(indexer, newslots); - for(Slot slot: newslots) { + + initExpectedSize(); + for(Slot slot: newslots) { + updateExpectedSize(); processSlot(indexer, slot); } + checkNumSlots(newslots.length); + commitNewMaxSize(); + //commit new to slots + for(Slot slot:newslots) { + buffer.putSlot(slot); + } } - void processEntry(KeyValue entry, SlotIndexer indexer) { + private int expectedsize, currmaxsize; + + private void checkNumSlots(int numslots) { + if (numslots != expectedsize) + throw new Error("Server Error: Server did not send all slots"); + } + + private void initExpectedSize() { + expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots; + currmaxsize = numslots; + } + + private void updateExpectedSize() { + expectedsize++; + if (expectedsize > currmaxsize) + expectedsize = currmaxsize; + } + + private void updateCurrMaxSize(int newmaxsize) { + currmaxsize=newmaxsize; + } + + private void commitNewMaxSize() { + if (numslots != currmaxsize) + buffer.resize(currmaxsize); + + numslots=currmaxsize; + } + + private void processEntry(KeyValue entry, SlotIndexer indexer) { IoTString key=entry.getKey(); KeyValue oldvalue=table.get(key); if (oldvalue != null) { - oldvalue.decrementLiveCount(); + oldvalue.setDead(); } table.put(key, entry); } - void processEntry(LastMessage entry, SlotIndexer indexer) { + private void processEntry(LastMessage entry, SlotIndexer indexer) { updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry); } - void processEntry(RejectedMessage entry, SlotIndexer indexer) { + private void processEntry(RejectedMessage entry, SlotIndexer indexer) { long oldseqnum=entry.getOldSeqNum(); long newseqnum=entry.getNewSeqNum(); boolean isequal=entry.getEqual(); @@ -111,11 +165,15 @@ public class Table { } } - void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) { - + private void processEntry(TableStatus entry, SlotIndexer indexer) { + int newnumslots=entry.getMaxSlots(); + updateCurrMaxSize(newnumslots); + if (lastTableStatus != null) + lastTableStatus.setDead(); + lastTableStatus = entry; } - void updateLastMessage(long machineid, long seqnum, Liveness liveness) { + private void updateLastMessage(long machineid, long seqnum, Liveness liveness) { Pair lastmsgentry = lastmessagetable.put(machineid, new Pair(seqnum, liveness)); if (lastmsgentry == null) return; @@ -123,9 +181,9 @@ public class Table { long lastmsgseqnum = lastmsgentry.getFirst(); Liveness lastentry = lastmsgentry.getSecond(); if (lastentry instanceof LastMessage) { - ((LastMessage)lastentry).decrementLiveCount(); + ((LastMessage)lastentry).setDead(); } else if (lastentry instanceof Slot) { - ((Slot)lastentry).decrementLiveCount(); + ((Slot)lastentry).setDead(); } else { throw new Error("Unrecognized type"); } @@ -135,13 +193,13 @@ public class Table { throw new Error("Server Error: Mismatch on local machine sequence number"); } else { if (lastmsgseqnum > seqnum) - throw new Error("Server Error: Rolback on remote machine sequence number"); + throw new Error("Server Error: Rollback on remote machine sequence number"); } } - void processSlot(SlotIndexer indexer, Slot slot) { + private void processSlot(SlotIndexer indexer, Slot slot) { updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot); - + for(Entry entry : slot.getEntries()) { switch(entry.getType()) { case Entry.TypeKeyValue: @@ -157,7 +215,7 @@ public class Table { break; case Entry.TypeTableStatus: - processEntry((TableStatus)entry, indexer, slot); + processEntry((TableStatus)entry, indexer); break; default: @@ -166,7 +224,7 @@ public class Table { } } - void checkHMACChain(SlotIndexer indexer, Slot[] newslots) { + private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) { for(int i=0; i < newslots.length; i++) { Slot currslot=newslots[i]; Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1); diff --git a/src/java/iotcloud/TableStatus.java b/src/java/iotcloud/TableStatus.java index 83aa9cc..07f282c 100644 --- a/src/java/iotcloud/TableStatus.java +++ b/src/java/iotcloud/TableStatus.java @@ -2,13 +2,17 @@ package iotcloud; import java.nio.ByteBuffer; class TableStatus extends Entry { - int maxslots; + private int maxslots; TableStatus(Slot slot, int _maxslots) { super(slot); maxslots=_maxslots; } + int getMaxSlots() { + return maxslots; + } + static Entry decode(Slot slot, ByteBuffer bb) { int maxslots=bb.getInt(); return new TableStatus(slot, maxslots); -- 2.34.1