private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+ private Vector<Long> rejectedmessagelist=new Vector<Long>();
private SlotBuffer buffer;
private CloudComm cloud;
private long sequencenumber;
private int chance;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
+ static final int REJECTED_THRESHOLD = 5;
private int resizethreshold;
private long lastliveslotseqn;
private Random random=new Random();
s.addEntry(status);
}
+ if (!rejectedmessagelist.isEmpty()) {
+ long old_seqn=rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn=rejectedmessagelist.lastElement();
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn=old_seqn;
+ for(int i=0; i<rejectedmessagelist.size();i++) {
+ long curr_seqn=rejectedmessagelist.get(i);
+ Slot s_msg=buffer.getSlot(curr_seqn);
+ if (s_msg!=null) {
+ long machineid=s_msg.getMachineID();
+ RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ if (old_seqn != -1 && old_seqn != curr_seqn) {
+ RejectedMessage rmprev=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rmprev);
+ }
+ old_seqn = -1;
+ } else {
+ prev_seqn=curr_seqn;
+ }
+ }
+ if (old_seqn != -1) {
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ }
+ }
+
long newestseqnum = buffer.getNewestSeqNum();
long oldestseqnum = buffer.getOldestSeqNum();
if (lastliveslotseqn < oldestseqnum)
if (resize)
max = newsize;
Slot[] array=cloud.putSlot(s, max);
- if (array == null)
+ if (array == null) {
array = new Slot[] {s};
- else
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
insertedkv=false;
-
+ }
+
/* update data structure */
validateandupdate(array, true);