private Mac hmac;
private long sequencenumber;
private long localmachineid;
- private TableStatus lastTableStatus;
- static final int FREE_SLOTS = 10;
- static final int FORCED_RESIZE_INCREMENT = 20;
-
+ private TableStatus lastTableStatus;
+ static final int FREE_SLOTS = 10;
+ static final int FORCED_RESIZE_INCREMENT = 20;
+
public Table(String baseurl, String password, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
sequencenumber = 0;
cloud=_cloud;
}
-
+
private void initCloud(String baseurl, String password) {
try {
SecretKeySpec secret=getKey(password);
public void update() {
Slot[] newslots=cloud.getSlots(sequencenumber+1);
+
validateandupdate(newslots, false);
}
Slot s=new Slot(1, localmachineid);
TableStatus status=new TableStatus(s, numslots);
s.addEntry(status);
- Slot[] array=cloud.putSlot(s, numslots);
- if (array == null) {
- array = new Slot[] {s};
- validateandupdate(array, true); // update data structure
+ Slot[] array=cloud.putSlot(s, numslots);
+ if (array == null) {
+ array = new Slot[] {s};
+ validateandupdate(array, true); // update data structure
} else {
throw new Error("Error on initialization");
}
}
-
+
public IoTString put(IoTString key, IoTString value) {
- while(true) {
- KeyValue oldvalue=table.get(key);
- if (tryput(key, value, false)) {
+ while(true) {
+ KeyValue oldvalue=table.get(key);
+ if (tryput(key, value, false)) {
if (oldvalue==null)
return null;
else
return oldvalue.getValue();
- }
- }
- }
+ }
+ }
+ }
- private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
+ private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
- long seqn = buffer.getOldestSeqNum();
+ long seqn = buffer.getOldestSeqNum();
if (forcedresize) {
TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
s.addEntry(status);
}
-
- if ((numslots - buffer.size()) < FREE_SLOTS) {
- //have to check whether we have enough free slots
- long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
- seqn = fullfirstseqn < 1 ? 1: fullfirstseqn;
- for(int i=0; i < FREE_SLOTS; i++, seqn++) {
- Slot prevslot=buffer.getSlot(seqn);
- if (!prevslot.isLive())
- continue;
- Vector<Entry> liveentries = prevslot.getLiveEntries();
- for(Entry liveentry:liveentries) {
+
+ if ((numslots - buffer.size()) < FREE_SLOTS) {
+ //have to check whether we have enough free slots
+ long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
+ seqn = fullfirstseqn < 1?1:fullfirstseqn;
+ for(int i=0; i < FREE_SLOTS; i++, seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ if (!prevslot.isLive())
+ continue;
+ Vector<Entry> liveentries = prevslot.getLiveEntries();
+ for(Entry liveentry:liveentries) {
if (redundant(liveentry))
continue;
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else if (i==0) {
- if (s.canFit(liveentry))
- s.addEntry(liveentry);
- else if (!forcedresize) {
- return tryput(key, value, true);
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else if (i==0) {
+ if (s.canFit(liveentry))
+ s.addEntry(liveentry);
+ else if (!forcedresize) {
+ return tryput(key, value, true);
}
}
- }
- }
- }
- KeyValue kv=new KeyValue(s, key, value);
- boolean insertedkv=false;
- if (s.hasSpace(kv)) {
- s.addEntry(kv);
- insertedkv=true;
- }
-
- long newestseqnum=buffer.getNewestSeqNum();
- search:
- for(;seqn<=newestseqnum;seqn++) {
- Slot prevslot=buffer.getSlot(seqn);
- if (!prevslot.isLive())
- continue;
- Vector<Entry> liveentries = prevslot.getLiveEntries();
- for(Entry liveentry:liveentries) {
+ }
+ }
+ }
+ KeyValue kv=new KeyValue(s, key, value);
+ boolean insertedkv=false;
+ if (s.hasSpace(kv)) {
+ s.addEntry(kv);
+ insertedkv=true;
+ }
+
+ long newestseqnum=buffer.getNewestSeqNum();
+search:
+ for(; seqn<=newestseqnum; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ if (!prevslot.isLive())
+ continue;
+ Vector<Entry> liveentries = prevslot.getLiveEntries();
+ for(Entry liveentry:liveentries) {
if (redundant(liveentry))
continue;
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else
- break search;
- }
- }
-
- int max=0;
- if (forcedresize)
- max = numslots + FORCED_RESIZE_INCREMENT;
- Slot[] array=cloud.putSlot(s, max);
- if (array == null)
- array = new Slot[] {s};
- else
- insertedkv=false;
-
- validateandupdate(array, true); // update data structure
-
- return insertedkv;
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else
+ break search;
+ }
+ }
+
+ int max=0;
+ if (forcedresize)
+ max = numslots + FORCED_RESIZE_INCREMENT;
+ Slot[] array=cloud.putSlot(s, max);
+ if (array == null)
+ array = new Slot[] {s};
+ else
+ insertedkv=false;
+
+ validateandupdate(array, true); // update data structure
+
+ return insertedkv;
}
boolean redundant(Entry liveentry) {
return false;
}
-
+
private void validateandupdate(Slot[] newslots, boolean isput) {
//The cloud communication layer has checked slot HMACs already
//before decoding
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
- initExpectedSize();
- for(Slot slot: newslots) {
- updateExpectedSize();
+ initExpectedSize();
+ for(Slot slot: newslots) {
+ updateExpectedSize();
processSlot(indexer, slot, isput);
}
//If there is a gap, check to see if the server sent us everything
if (firstseqnum != (sequencenumber+1))
checkNumSlots(newslots.length);
-
- commitNewMaxSize();
- //commit new to slots
- for(Slot slot:newslots) {
- buffer.putSlot(slot);
- }
+ commitNewMaxSize();
+
+ //commit new to slots
+ for(Slot slot:newslots) {
+ buffer.putSlot(slot);
+ }
sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
}
- private int expectedsize, currmaxsize;
+ private int expectedsize, currmaxsize;
+
+ private void checkNumSlots(int numslots) {
+ if (numslots != expectedsize)
+ throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
+ }
- private void checkNumSlots(int numslots) {
- if (numslots != expectedsize)
- throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
- }
-
- private void initExpectedSize() {
+ private void initExpectedSize() {
long prevslots = sequencenumber;
- expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
- currmaxsize = numslots;
- }
-
- private void updateExpectedSize() {
- expectedsize++;
- if (expectedsize > currmaxsize)
- expectedsize = currmaxsize;
- }
-
- private void updateCurrMaxSize(int newmaxsize) {
- currmaxsize=newmaxsize;
- }
-
- private void commitNewMaxSize() {
- if (numslots != currmaxsize)
- buffer.resize(currmaxsize);
-
- numslots=currmaxsize;
- }
-
+ expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
+ currmaxsize = numslots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+ if (expectedsize > currmaxsize)
+ expectedsize = currmaxsize;
+ }
+
+ private void updateCurrMaxSize(int newmaxsize) {
+ currmaxsize=newmaxsize;
+ }
+
+ private void commitNewMaxSize() {
+ if (numslots != currmaxsize)
+ buffer.resize(currmaxsize);
+
+ numslots=currmaxsize;
+ }
+
private void processEntry(KeyValue entry, SlotIndexer indexer) {
IoTString key=entry.getKey();
KeyValue oldvalue=table.get(key);
}
private void processEntry(TableStatus entry, SlotIndexer indexer) {
- int newnumslots=entry.getMaxSlots();
- updateCurrMaxSize(newnumslots);
- if (lastTableStatus != null)
- lastTableStatus.setDead();
- lastTableStatus = entry;
+ int newnumslots=entry.getMaxSlots();
+ updateCurrMaxSize(newnumslots);
+ if (lastTableStatus != null)
+ lastTableStatus.setDead();
+ lastTableStatus = entry;
}
private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
-
+
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
case Entry.TypeKeyValue: