package iotcloud;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Arrays;
import java.util.Vector;
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
+ HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+
initExpectedSize();
for(Slot slot: newslots) {
updateExpectedSize();
- processSlot(indexer, slot, acceptupdatestolocal);
+ processSlot(indexer, slot, acceptupdatestolocal, machineSet);
}
/* If there is a gap, check to see if the server sent us everything. */
- if (firstseqnum != (sequencenumber+1))
+ if (firstseqnum != (sequencenumber+1)) {
checkNumSlots(newslots.length);
+ if (!machineSet.isEmpty())
+ throw new Error("Missing record for machines: "+machineSet);
+ }
commitNewMaxSize();
table.put(key, entry);
}
- private void processEntry(LastMessage entry, SlotIndexer indexer) {
- updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
+ private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
lastTableStatus = entry;
}
- private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) {
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ machineSet.remove(machineid);
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
}
}
- private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) {
- updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal);
+ private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
break;
case Entry.TypeLastMessage:
- processEntry((LastMessage)entry, indexer);
+ processEntry((LastMessage)entry, indexer, machineSet);
break;
case Entry.TypeRejectedMessage: