package iotcloud;
import java.nio.ByteBuffer;
+import java.util.HashSet;
/**
* Entry for tracking messages that the server rejected. We have to
/* Is the machine identifier of the relevant slots equal to (or not
* equal to) the specified machine identifier. */
private boolean equalto;
+ /* Set of machines that have not received notification. */
+ private HashSet<Long> watchset;
RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
super(slot);
return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1);
}
+ void setWatchSet(HashSet<Long> _watchset) {
+ watchset=_watchset;
+ }
+
+ void removeWatcher(long machineid) {
+ if (watchset.remove(machineid))
+ if (watchset.isEmpty())
+ setDead();
+ }
+
void encode(ByteBuffer bb) {
bb.put(Entry.TypeRejectedMessage);
bb.putLong(machineid);
package iotcloud;
import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Vector;
private int numslots;
private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+ private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
private SlotBuffer buffer;
private CloudComm cloud;
private long sequencenumber;
checkHMACChain(indexer, newslots);
HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
-
+
initExpectedSize();
for(Slot slot: newslots) {
updateExpectedSize();
}
}
}
+
+ HashSet<Long> watchset=new HashSet<Long>();
+ for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+ long entry_mid=lastmsg_entry.getKey();
+ /* We've seen it, don't need to continue to watch. Our next
+ * message will implicitly acknowledge it. */
+ if (entry_mid == localmachineid)
+ continue;
+ Pair<Long, Liveness> v=lastmsg_entry.getValue();
+ long entry_seqn=v.getFirst();
+ if (entry_seqn < newseqnum) {
+ addWatchList(entry_mid, entry);
+ watchset.add(entry_mid);
+ }
+ }
+ if (watchset.isEmpty())
+ entry.setDead();
+ else
+ entry.setWatchSet(watchset);
+ }
+
+ private void addWatchList(long machineid, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries=watchlist.get(machineid);
+ if (entries == null)
+ watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
+ entries.add(entry);
}
private void processEntry(TableStatus entry, SlotIndexer indexer) {
private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
machineSet.remove(machineid);
+
+ HashSet<RejectedMessage> watchset=watchlist.get(machineid);
+ if (watchset != null) {
+ for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
+ RejectedMessage rm=rmit.next();
+ if (rm.getNewSeqNum() <= seqnum) {
+ /* Remove it from our watchlist */
+ rmit.remove();
+ /* Decrement machines that need to see this notification */
+ rm.removeWatcher(machineid);
+ }
+ }
+ }
+
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;