forget rejected message list
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9
10 /**
11  * IoTTable data structure.  Provides client inferface.
12  * @author Brian Demsky
13  * @version 1.0
14  */
15
16 final public class Table {
17         private int numslots;
18         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21         private Vector<Long> rejectedmessagelist=new Vector<Long>();
22         private SlotBuffer buffer;
23         private CloudComm cloud;
24         private long sequencenumber;
25         private long localmachineid;
26         private TableStatus lastTableStatus;
27         static final int FREE_SLOTS = 10;
28         static final int SKIP_THRESHOLD = 10;
29         private long liveslotcount=0;
30         private int chance;
31         static final double RESIZE_MULTIPLE = 1.2;
32         static final double RESIZE_THRESHOLD = 0.75;
33         static final int REJECTED_THRESHOLD = 5;
34         private int resizethreshold;
35         private long lastliveslotseqn;
36         private Random random=new Random();
37         
38         public Table(String baseurl, String password, long _localmachineid) {
39                 localmachineid=_localmachineid;
40                 buffer = new SlotBuffer();
41                 numslots = buffer.capacity();
42                 setResizeThreshold();
43                 sequencenumber = 0;
44                 cloud=new CloudComm(this, baseurl, password);
45                 lastliveslotseqn = 1;
46         }
47
48         public Table(CloudComm _cloud, long _localmachineid) {
49                 localmachineid=_localmachineid;
50                 buffer = new SlotBuffer();
51                 numslots = buffer.capacity();
52                 setResizeThreshold();
53                 sequencenumber = 0;
54                 cloud=_cloud;
55         }
56
57         public void rebuild() {
58                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
59                 validateandupdate(newslots, true);
60         }
61
62         public void update() {
63                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
64
65                 validateandupdate(newslots, false);
66         }
67
68         public IoTString get(IoTString key) {
69                 KeyValue kv=table.get(key);
70                 if (kv != null)
71                         return kv.getValue();
72                 else
73                         return null;
74         }
75
76         public void initTable() {
77                 cloud.setSalt();//Set the salt
78                 Slot s=new Slot(this, 1, localmachineid);
79                 TableStatus status=new TableStatus(s, numslots);
80                 s.addEntry(status);
81                 Slot[] array=cloud.putSlot(s, numslots);
82                 if (array == null) {
83                         array = new Slot[] {s};
84                         /* update data structure */
85                         validateandupdate(array, true);
86                 } else {
87                         throw new Error("Error on initialization");
88                 }
89         }
90
91         public String toString() {
92                 return table.toString();
93         }
94         
95         public IoTString put(IoTString key, IoTString value) {
96                 while(true) {
97                         KeyValue oldvalue=table.get(key);
98                         if (tryput(key, value, false)) {
99                                 if (oldvalue==null)
100                                         return null;
101                                 else
102                                         return oldvalue.getValue();
103                         }
104                 }
105         }
106
107         void decrementLiveCount() {
108                 liveslotcount--;
109         }
110         
111         private void setResizeThreshold() {
112                 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
113                 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
114         }
115         
116         private boolean tryput(IoTString key, IoTString value, boolean resize) {
117                 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
118                 int newsize = 0;
119                 if (liveslotcount > resizethreshold) {
120                         resize=true;
121                         newsize = (int) (numslots * RESIZE_MULTIPLE);
122                 }
123                 
124                 if (resize) {
125                         newsize = (int) (numslots * RESIZE_MULTIPLE);
126                         TableStatus status=new TableStatus(s, newsize);
127                         s.addEntry(status);
128                 }
129
130                 if (!rejectedmessagelist.isEmpty()) {
131                         long old_seqn=rejectedmessagelist.firstElement();
132                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
133                                 long new_seqn=rejectedmessagelist.lastElement();
134                                 RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
135                                 s.addEntry(rm);
136                         } else {
137                                 long prev_seqn=old_seqn;
138                                 for(int i=0; i<rejectedmessagelist.size();i++) {
139                                         long curr_seqn=rejectedmessagelist.get(i);
140                                         Slot s_msg=buffer.getSlot(curr_seqn);
141                                         if (s_msg!=null) {
142                                                 long machineid=s_msg.getMachineID();
143                                                 RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
144                                                 s.addEntry(rm);
145                                                 if (old_seqn != -1 && old_seqn != curr_seqn) {
146                                                         RejectedMessage rmprev=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
147                                                         s.addEntry(rmprev);
148                                                 }
149                                                 old_seqn = -1;
150                                         } else {
151                                                 prev_seqn=curr_seqn;
152                                         }
153                                 }
154                                 if (old_seqn != -1) {
155                                         RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
156                                         s.addEntry(rm);
157                                 }
158                         }
159                 }
160                 
161                 long newestseqnum = buffer.getNewestSeqNum();
162                 long oldestseqnum = buffer.getOldestSeqNum();
163                 if (lastliveslotseqn < oldestseqnum)
164                         lastliveslotseqn = oldestseqnum;
165
166                 long seqn = lastliveslotseqn;
167                 boolean seenliveslot = false;
168                 long firstiffull = newestseqnum + 1 - numslots;
169                 long threshold = firstiffull + FREE_SLOTS;
170                 
171                 for(; seqn < threshold; seqn++) {
172                         Slot prevslot=buffer.getSlot(seqn);
173                         //Push slot number forward
174                         if (!seenliveslot)
175                                 lastliveslotseqn = seqn;
176
177                         if (!prevslot.isLive())
178                                 continue;
179                         seenliveslot = true;
180                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
181                         for(Entry liveentry:liveentries) {
182                                 if (s.hasSpace(liveentry)) {
183                                         s.addEntry(liveentry);
184                                 } else if (seqn==firstiffull) {
185                                         if (!resize) {
186                                                 System.out.print("B");
187                                                 return tryput(key, value, true);
188                                         }
189                                 }
190                         }
191                 }
192
193                 KeyValue kv=new KeyValue(s, key, value);
194                 boolean insertedkv=false;
195                 if (s.hasSpace(kv)) {
196                         s.addEntry(kv);
197                         insertedkv=true;
198                 }
199
200                 int skipcount=0;
201                 search:
202                 for(; seqn <= newestseqnum; seqn++) {
203                         Slot prevslot=buffer.getSlot(seqn);
204                         //Push slot number forward
205                         if (!seenliveslot)
206                                 lastliveslotseqn = seqn;
207                         
208                         if (!prevslot.isLive())
209                                 continue;
210                         seenliveslot = true;
211                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
212                         for(Entry liveentry:liveentries) {
213                                 if (s.hasSpace(liveentry))
214                                         s.addEntry(liveentry);
215                                 else {
216                                         skipcount++;
217                                         if (skipcount > SKIP_THRESHOLD)
218                                                 break search;
219                                 }
220                         }
221                 }
222
223                 int max=0;
224                 if (resize)
225                         max = newsize;
226                 Slot[] array=cloud.putSlot(s, max);
227                 if (array == null) {
228                         array = new Slot[] {s};
229                         rejectedmessagelist.clear();
230                 }       else {
231                         if (array.length == 0)
232                                 throw new Error("Server Error: Did not send any slots");
233                         rejectedmessagelist.add(s.getSequenceNumber());
234                         insertedkv=false;
235                 }
236                 
237                 /* update data structure */
238                 validateandupdate(array, true);
239
240                 return insertedkv;
241         }
242
243         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
244                 /* The cloud communication layer has checked slot HMACs already
245                          before decoding */
246                 if (newslots.length==0)
247                         return;
248
249                 long firstseqnum=newslots[0].getSequenceNumber();
250                 if (firstseqnum <= sequencenumber)
251                         throw new Error("Server Error: Sent older slots!");
252
253                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
254                 checkHMACChain(indexer, newslots);
255
256                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
257
258                 initExpectedSize(firstseqnum);
259                 for(Slot slot: newslots) {
260                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
261                         updateExpectedSize();
262                 }
263
264                 /* If there is a gap, check to see if the server sent us everything. */
265                 if (firstseqnum != (sequencenumber+1)) {
266                         checkNumSlots(newslots.length);
267                         if (!machineSet.isEmpty())
268                                 throw new Error("Missing record for machines: "+machineSet);
269                 }
270
271                 commitNewMaxSize();
272
273                 /* Commit new to slots. */
274                 for(Slot slot:newslots) {
275                         buffer.putSlot(slot);
276                         liveslotcount++;
277                 }
278                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
279         }
280
281         private int expectedsize, currmaxsize;
282
283         private void checkNumSlots(int numslots) {
284                 if (numslots != expectedsize)
285                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
286         }
287
288         private void initExpectedSize(long firstsequencenumber) {
289                 long prevslots = firstsequencenumber;
290                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
291                 currmaxsize = numslots;
292         }
293
294         private void updateExpectedSize() {
295                 expectedsize++;
296                 if (expectedsize > currmaxsize)
297                         expectedsize = currmaxsize;
298         }
299
300         private void updateCurrMaxSize(int newmaxsize) {
301                 currmaxsize=newmaxsize;
302         }
303
304         private void commitNewMaxSize() {
305                 if (numslots != currmaxsize)
306                         buffer.resize(currmaxsize);
307
308                 numslots=currmaxsize;
309                 setResizeThreshold();
310         }
311
312         private void processEntry(KeyValue entry, SlotIndexer indexer) {
313                 IoTString key=entry.getKey();
314                 KeyValue oldvalue=table.get(key);
315                 if (oldvalue != null) {
316                         oldvalue.setDead();
317                 }
318                 table.put(key, entry);
319         }
320
321         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
322                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
323         }
324
325         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
326                 long oldseqnum=entry.getOldSeqNum();
327                 long newseqnum=entry.getNewSeqNum();
328                 boolean isequal=entry.getEqual();
329                 long machineid=entry.getMachineID();
330                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
331                         Slot slot=indexer.getSlot(seqnum);
332                         if (slot != null) {
333                                 long slotmachineid=slot.getMachineID();
334                                 if (isequal!=(slotmachineid==machineid)) {
335                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
336                                 }
337                         }
338                 }
339
340                 HashSet<Long> watchset=new HashSet<Long>();
341                 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
342                         long entry_mid=lastmsg_entry.getKey();
343                         /* We've seen it, don't need to continue to watch.  Our next
344                          * message will implicitly acknowledge it. */
345                         if (entry_mid == localmachineid)
346                                 continue;
347                         Pair<Long, Liveness> v=lastmsg_entry.getValue();
348                         long entry_seqn=v.getFirst();
349                         if (entry_seqn < newseqnum) {
350                                 addWatchList(entry_mid, entry);
351                                 watchset.add(entry_mid);
352                         }
353                 }
354                 if (watchset.isEmpty())
355                         entry.setDead();
356                 else
357                         entry.setWatchSet(watchset);
358         }
359
360         private void addWatchList(long machineid, RejectedMessage entry) {
361                 HashSet<RejectedMessage> entries=watchlist.get(machineid);
362                 if (entries == null)
363                         watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
364                 entries.add(entry);
365         }
366
367         private void processEntry(TableStatus entry, SlotIndexer indexer) {
368                 int newnumslots=entry.getMaxSlots();
369                 updateCurrMaxSize(newnumslots);
370                 if (lastTableStatus != null)
371                         lastTableStatus.setDead();
372                 lastTableStatus = entry;
373         }
374
375         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
376                 machineSet.remove(machineid);
377
378                 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
379                 if (watchset != null) {
380                         for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
381                                 RejectedMessage rm=rmit.next();
382                                 if (rm.getNewSeqNum() <= seqnum) {
383                                         /* Remove it from our watchlist */
384                                         rmit.remove();
385                                         /* Decrement machines that need to see this notification */
386                                         rm.removeWatcher(machineid);
387                                 }
388                         }
389                 }
390                 
391                 if (machineid == localmachineid) {
392                         /* Our own messages are immediately dead. */
393                         if (liveness instanceof LastMessage) {
394                                 ((LastMessage)liveness).setDead();
395                         } else if (liveness instanceof Slot) {
396                                 ((Slot)liveness).setDead();
397                         } else {
398                                 throw new Error("Unrecognized type");
399                         }
400                 }
401                 
402                 
403                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
404                 if (lastmsgentry == null)
405                         return;
406
407                 long lastmsgseqnum = lastmsgentry.getFirst();
408                 Liveness lastentry = lastmsgentry.getSecond();
409                 if (machineid != localmachineid) {
410                         if (lastentry instanceof LastMessage) {
411                                 ((LastMessage)lastentry).setDead();
412                         } else if (lastentry instanceof Slot) {
413                                 ((Slot)lastentry).setDead();
414                         } else {
415                                 throw new Error("Unrecognized type");
416                         }
417                 }
418                 
419                 if (machineid == localmachineid) {
420                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
421                                 throw new Error("Server Error: Mismatch on local machine sequence number");
422                 } else {
423                         if (lastmsgseqnum > seqnum)
424                                 throw new Error("Server Error: Rollback on remote machine sequence number");
425                 }
426         }
427
428         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
429                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
430                 for(Entry entry : slot.getEntries()) {
431                         switch(entry.getType()) {
432                         case Entry.TypeKeyValue:
433                                 processEntry((KeyValue)entry, indexer);
434                                 break;
435
436                         case Entry.TypeLastMessage:
437                                 processEntry((LastMessage)entry, indexer, machineSet);
438                                 break;
439
440                         case Entry.TypeRejectedMessage:
441                                 processEntry((RejectedMessage)entry, indexer);
442                                 break;
443
444                         case Entry.TypeTableStatus:
445                                 processEntry((TableStatus)entry, indexer);
446                                 break;
447
448                         default:
449                                 throw new Error("Unrecognized type: "+entry.getType());
450                         }
451                 }
452         }
453
454         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
455                 for(int i=0; i < newslots.length; i++) {
456                         Slot currslot=newslots[i];
457                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
458                         if (prevslot != null &&
459                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
460                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
461                 }
462         }
463 }