localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
- sequencenumber = 1;
+ sequencenumber = 0;
initCloud(baseurl, password);
}
public Table(CloudComm _cloud, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
- sequencenumber = 1;
+ numslots = buffer.capacity();
+ sequencenumber = 0;
cloud=_cloud;
}
}
public void update() {
- Slot[] newslots=cloud.getSlots(sequencenumber);
- validateandupdate(newslots);
+ Slot[] newslots=cloud.getSlots(sequencenumber+1);
+ validateandupdate(newslots, false);
}
public IoTString get(IoTString key) {
Slot[] array=cloud.putSlot(s, numslots);
if (array == null) {
array = new Slot[] {s};
- validateandupdate(array); // update data structure
+ validateandupdate(array, true); // update data structure
} else {
throw new Error("Error on initialization");
}
while(true) {
KeyValue oldvalue=table.get(key);
if (tryput(key, value, false)) {
- return oldvalue.getValue();
+ if (oldvalue==null)
+ return null;
+ else
+ return oldvalue.getValue();
}
}
}
continue;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
+ if (redundant(liveentry))
+ continue;
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else if (i==0) {
continue;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
+ if (redundant(liveentry))
+ continue;
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else
else
insertedkv=false;
- validateandupdate(array); // update data structure
+ validateandupdate(array, true); // update data structure
return insertedkv;
}
- private void validateandupdate(Slot[] newslots) {
+ boolean redundant(Entry liveentry) {
+ if (liveentry.getType()==Entry.TypeLastMessage) {
+ LastMessage lastmsg=(LastMessage) liveentry;
+ return lastmsg.getMachineID() == localmachineid;
+ }
+ return false;
+ }
+
+
+ private void validateandupdate(Slot[] newslots, boolean isput) {
//The cloud communication layer has checked slot HMACs already
//before decoding
if (newslots.length==0)
return;
long firstseqnum=newslots[0].getSequenceNumber();
- if (firstseqnum < sequencenumber)
+ if (firstseqnum <= sequencenumber)
throw new Error("Server Error: Sent older slots!");
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
initExpectedSize();
for(Slot slot: newslots) {
updateExpectedSize();
- processSlot(indexer, slot);
+ processSlot(indexer, slot, isput);
}
- checkNumSlots(newslots.length);
+
+ //If there is a gap, check to see if the server sent us everything
+ if (firstseqnum != (sequencenumber+1))
+ checkNumSlots(newslots.length);
+
commitNewMaxSize();
//commit new to slots
for(Slot slot:newslots) {
buffer.putSlot(slot);
}
+ sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
}
private int expectedsize, currmaxsize;
private void checkNumSlots(int numslots) {
if (numslots != expectedsize)
- throw new Error("Server Error: Server did not send all slots");
+ throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
}
private void initExpectedSize() {
- expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
+ long prevslots = sequencenumber;
+ expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
currmaxsize = numslots;
}
}
private void processEntry(LastMessage entry, SlotIndexer indexer) {
- updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
}
private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
lastTableStatus = entry;
}
- private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
}
if (machineid == localmachineid) {
- if (lastmsgseqnum != seqnum)
+ if (lastmsgseqnum != seqnum && !isput)
throw new Error("Server Error: Mismatch on local machine sequence number");
} else {
if (lastmsgseqnum > seqnum)
}
}
- private void processSlot(SlotIndexer indexer, Slot slot) {
- updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
+ private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {