public class Table {
int numslots;
HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
- HashMap<Long, Long> lastmessage=new HashMap<Long, Long>();
+ HashMap<Long, Pair<Long, Liveness>> lastmessagetable=new HashMap<Long, Pair<Long, Liveness>>();
SlotBuffer buffer;
CloudComm cloud;
private Mac hmac;
long sequencenumber;
- long machineid;
+ long localmachineid;
- public Table(String baseurl, String password, long _machineid) {
- machineid=_machineid;
+ public Table(String baseurl, String password, long _localmachineid) {
+ localmachineid=_localmachineid;
buffer = new SlotBuffer();
sequencenumber = 1;
initCloud(baseurl, password);
validateandupdate(newslots);
}
+ public IoTString get(IoTString key) {
+ KeyValue kv=table.get(key);
+ if (kv != null)
+ return kv.getValue();
+ else
+ return null;
+ }
+
+ public IoTString put(IoTString key, IoTString value) {
+ return null;
+ }
+
void validateandupdate(Slot[] newslots) {
//The cloud communication layer has checked slot HMACs already
//before decoding
}
- void processEntry(KeyValue entry, SlotIndexer indexer, Slot slot) {
+ void processEntry(KeyValue entry, SlotIndexer indexer) {
IoTString key=entry.getKey();
KeyValue oldvalue=table.get(key);
if (oldvalue != null) {
- oldvalue.setDead();
+ oldvalue.decrementLiveCount();
}
table.put(key, entry);
}
- void processEntry(LastMessage entry, SlotIndexer indexer, Slot slot) {
- updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), null, entry);
+ void processEntry(LastMessage entry, SlotIndexer indexer) {
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
}
- void processEntry(RejectedMessage entry, SlotIndexer indexer, Slot slot) {
-
+ void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+ long oldseqnum=entry.getOldSeqNum();
+ long newseqnum=entry.getNewSeqNum();
+ boolean isequal=entry.getEqual();
+ long machineid=entry.getMachineID();
+ for(long seqnum=oldseqnum;seqnum<=newseqnum;seqnum++) {
+ Slot slot=indexer.getSlot(seqnum);
+ if (slot != null) {
+ long slotmachineid=slot.getMachineID();
+ if (isequal!=(slotmachineid==machineid)) {
+ throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
+ }
+ }
+ }
}
-
+
void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) {
-
+
}
- void updateLastMessage(long machineid, long seqnum, Slot slot, LastMessage entry) {
+ void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+ Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
+ if (lastmsgentry == null)
+ return;
+
+ long lastmsgseqnum = lastmsgentry.getFirst();
+ Liveness lastentry = lastmsgentry.getSecond();
+ if (lastentry instanceof LastMessage) {
+ ((LastMessage)lastentry).decrementLiveCount();
+ } else if (lastentry instanceof Slot) {
+ ((Slot)lastentry).decrementLiveCount();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ //Check that nothing funny happened
+ if (machineid == localmachineid) {
+ if (lastmsgseqnum != seqnum)
+ throw new Error("Server Error: Mismatch on local machine sequence number");
+ } else {
+ if (lastmsgseqnum > seqnum)
+ throw new Error("Server Error: Rolback on remote machine sequence number");
+ }
}
void processSlot(SlotIndexer indexer, Slot slot) {
- updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, null);
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
case Entry.TypeKeyValue:
- processEntry((KeyValue)entry, indexer, slot);
+ processEntry((KeyValue)entry, indexer);
break;
case Entry.TypeLastMessage:
- processEntry((LastMessage)entry, indexer, slot);
+ processEntry((LastMessage)entry, indexer);
break;
case Entry.TypeRejectedMessage:
- processEntry((RejectedMessage)entry, indexer, slot);
+ processEntry((RejectedMessage)entry, indexer);
break;
case Entry.TypeTableStatus:
processEntry((TableStatus)entry, indexer, slot);