From: bdemsky Date: Mon, 25 Jul 2016 01:11:20 +0000 (-0700) Subject: add check for missing messages X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=51da303cebaf643d2093be578239788207737634;p=iotcloud.git add check for missing messages --- diff --git a/src/java/iotcloud/Table.java b/src/java/iotcloud/Table.java index 28434c5..05a85f8 100644 --- a/src/java/iotcloud/Table.java +++ b/src/java/iotcloud/Table.java @@ -1,5 +1,6 @@ package iotcloud; import java.util.HashMap; +import java.util.HashSet; import java.util.Arrays; import java.util.Vector; @@ -177,15 +178,20 @@ search: SlotIndexer indexer = new SlotIndexer(newslots, buffer); checkHMACChain(indexer, newslots); + HashSet machineSet=new HashSet(lastmessagetable.keySet()); + initExpectedSize(); for(Slot slot: newslots) { updateExpectedSize(); - processSlot(indexer, slot, acceptupdatestolocal); + processSlot(indexer, slot, acceptupdatestolocal, machineSet); } /* If there is a gap, check to see if the server sent us everything. */ - if (firstseqnum != (sequencenumber+1)) + if (firstseqnum != (sequencenumber+1)) { checkNumSlots(newslots.length); + if (!machineSet.isEmpty()) + throw new Error("Missing record for machines: "+machineSet); + } commitNewMaxSize(); @@ -235,8 +241,8 @@ search: table.put(key, entry); } - private void processEntry(LastMessage entry, SlotIndexer indexer) { - updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false); + private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet machineSet) { + updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); } private void processEntry(RejectedMessage entry, SlotIndexer indexer) { @@ -263,7 +269,8 @@ search: lastTableStatus = entry; } - private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) { + private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet machineSet) { + machineSet.remove(machineid); Pair lastmsgentry = lastmessagetable.put(machineid, new Pair(seqnum, liveness)); if (lastmsgentry == null) return; @@ -287,8 +294,8 @@ search: } } - private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) { - updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal); + private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet machineSet) { + updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet); for(Entry entry : slot.getEntries()) { switch(entry.getType()) { @@ -297,7 +304,7 @@ search: break; case Entry.TypeLastMessage: - processEntry((LastMessage)entry, indexer); + processEntry((LastMessage)entry, indexer, machineSet); break; case Entry.TypeRejectedMessage: diff --git a/src/java/iotcloud/issues.txt b/src/java/iotcloud/issues.txt index da58ceb..dfd7101 100644 --- a/src/java/iotcloud/issues.txt +++ b/src/java/iotcloud/issues.txt @@ -1,4 +1,3 @@ 1) check expiration of rejectedmessage entries -2) check missing machine messages -3) add crypto -4) handle Salt +2) add crypto +3) handle Salt