simplify code
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9
10 /**
11  * IoTTable data structure.  Provides client interface.
12  * @author Brian Demsky
13  * @version 1.0
14  */
15
16 final public class Table {
17         private int numslots;
18         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21         private Vector<Long> rejectedmessagelist=new Vector<Long>();
22         private SlotBuffer buffer;
23         private CloudComm cloud;
24         private long sequencenumber;
25         private long localmachineid;
26         private TableStatus lastTableStatus;
27         static final int FREE_SLOTS = 10;
28         static final int SKIP_THRESHOLD = 10;
29         private long liveslotcount=0;
30         private int chance;
31         static final double RESIZE_MULTIPLE = 1.2;
32         static final double RESIZE_THRESHOLD = 0.75;
33         static final int REJECTED_THRESHOLD = 5;
34         private int resizethreshold;
35         private long lastliveslotseqn;
36         private Random random=new Random();
37         
38         public Table(String baseurl, String password, long _localmachineid) {
39                 localmachineid=_localmachineid;
40                 buffer = new SlotBuffer();
41                 numslots = buffer.capacity();
42                 setResizeThreshold();
43                 sequencenumber = 0;
44                 cloud=new CloudComm(this, baseurl, password);
45                 lastliveslotseqn = 1;
46         }
47
48         public Table(CloudComm _cloud, long _localmachineid) {
49                 localmachineid=_localmachineid;
50                 buffer = new SlotBuffer();
51                 numslots = buffer.capacity();
52                 setResizeThreshold();
53                 sequencenumber = 0;
54                 cloud=_cloud;
55         }
56
57         public void rebuild() {
58                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
59                 validateandupdate(newslots, true);
60         }
61
62         public void update() {
63                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
64
65                 validateandupdate(newslots, false);
66         }
67
68         public IoTString get(IoTString key) {
69                 KeyValue kv=table.get(key);
70                 if (kv != null)
71                         return kv.getValue();
72                 else
73                         return null;
74         }
75
76         public void initTable() {
77                 cloud.setSalt();//Set the salt
78                 Slot s=new Slot(this, 1, localmachineid);
79                 TableStatus status=new TableStatus(s, numslots);
80                 s.addEntry(status);
81                 Slot[] array=cloud.putSlot(s, numslots);
82                 if (array == null) {
83                         array = new Slot[] {s};
84                         /* update data structure */
85                         validateandupdate(array, true);
86                 } else {
87                         throw new Error("Error on initialization");
88                 }
89         }
90
91         public String toString() {
92                 return table.toString();
93         }
94         
95         public IoTString put(IoTString key, IoTString value) {
96                 while(true) {
97                         KeyValue oldvalue=table.get(key);
98                         if (tryput(key, value, false)) {
99                                 if (oldvalue==null)
100                                         return null;
101                                 else
102                                         return oldvalue.getValue();
103                         }
104                 }
105         }
106
107         void decrementLiveCount() {
108                 liveslotcount--;
109         }
110         
111         private void setResizeThreshold() {
112                 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
113                 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
114         }
115         
116         private boolean tryput(IoTString key, IoTString value, boolean resize) {
117                 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
118                 int newsize = 0;
119                 if (liveslotcount > resizethreshold) {
120                         resize=true;
121                         newsize = (int) (numslots * RESIZE_MULTIPLE);
122                 }
123                 
124                 if (resize) {
125                         newsize = (int) (numslots * RESIZE_MULTIPLE);
126                         TableStatus status=new TableStatus(s, newsize);
127                         s.addEntry(status);
128                 }
129
130                 if (!rejectedmessagelist.isEmpty()) {
131                         long old_seqn=rejectedmessagelist.firstElement();
132                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
133                                 long new_seqn=rejectedmessagelist.lastElement();
134                                 RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
135                                 s.addEntry(rm);
136                         } else {
137                                 long prev_seqn=-1;
138                                 int i=0;
139                                 /* Go through list of missing messages */
140                                 for(;i<rejectedmessagelist.size();i++) {
141                                         long curr_seqn=rejectedmessagelist.get(i);
142                                         Slot s_msg=buffer.getSlot(curr_seqn);
143                                         if (s_msg!=null)
144                                                 break;
145                                         prev_seqn=curr_seqn;
146                                 }
147                                 /* Generate rejected message entry for missing messages */
148                                 if (prev_seqn != -1) {
149                                         RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
150                                         s.addEntry(rm);
151                                 }
152                                 /* Generate rejected message entries for present messages */
153                                 for(;i<rejectedmessagelist.size();i++) {
154                                         long curr_seqn=rejectedmessagelist.get(i);
155                                         Slot s_msg=buffer.getSlot(curr_seqn);
156                                         long machineid=s_msg.getMachineID();
157                                         RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
158                                         s.addEntry(rm);
159                                 }
160                         }
161                 }
162                 
163                 long newestseqnum = buffer.getNewestSeqNum();
164                 long oldestseqnum = buffer.getOldestSeqNum();
165                 if (lastliveslotseqn < oldestseqnum)
166                         lastliveslotseqn = oldestseqnum;
167
168                 long seqn = lastliveslotseqn;
169                 boolean seenliveslot = false;
170                 long firstiffull = newestseqnum + 1 - numslots;
171                 long threshold = firstiffull + FREE_SLOTS;
172                 
173                 for(; seqn < threshold; seqn++) {
174                         Slot prevslot=buffer.getSlot(seqn);
175                         //Push slot number forward
176                         if (!seenliveslot)
177                                 lastliveslotseqn = seqn;
178
179                         if (!prevslot.isLive())
180                                 continue;
181                         seenliveslot = true;
182                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
183                         for(Entry liveentry:liveentries) {
184                                 if (s.hasSpace(liveentry)) {
185                                         s.addEntry(liveentry);
186                                 } else if (seqn==firstiffull) {
187                                         if (!resize) {
188                                                 System.out.print("B");
189                                                 return tryput(key, value, true);
190                                         }
191                                 }
192                         }
193                 }
194
195                 KeyValue kv=new KeyValue(s, key, value);
196                 boolean insertedkv=false;
197                 if (s.hasSpace(kv)) {
198                         s.addEntry(kv);
199                         insertedkv=true;
200                 }
201
202                 int skipcount=0;
203                 search:
204                 for(; seqn <= newestseqnum; seqn++) {
205                         Slot prevslot=buffer.getSlot(seqn);
206                         //Push slot number forward
207                         if (!seenliveslot)
208                                 lastliveslotseqn = seqn;
209                         
210                         if (!prevslot.isLive())
211                                 continue;
212                         seenliveslot = true;
213                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
214                         for(Entry liveentry:liveentries) {
215                                 if (s.hasSpace(liveentry))
216                                         s.addEntry(liveentry);
217                                 else {
218                                         skipcount++;
219                                         if (skipcount > SKIP_THRESHOLD)
220                                                 break search;
221                                 }
222                         }
223                 }
224
225                 int max=0;
226                 if (resize)
227                         max = newsize;
228                 Slot[] array=cloud.putSlot(s, max);
229                 if (array == null) {
230                         array = new Slot[] {s};
231                         rejectedmessagelist.clear();
232                 }       else {
233                         if (array.length == 0)
234                                 throw new Error("Server Error: Did not send any slots");
235                         rejectedmessagelist.add(s.getSequenceNumber());
236                         insertedkv=false;
237                 }
238                 
239                 /* update data structure */
240                 validateandupdate(array, true);
241
242                 return insertedkv;
243         }
244
245         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
246                 /* The cloud communication layer has checked slot HMACs already
247                          before decoding */
248                 if (newslots.length==0)
249                         return;
250
251                 long firstseqnum=newslots[0].getSequenceNumber();
252                 if (firstseqnum <= sequencenumber)
253                         throw new Error("Server Error: Sent older slots!");
254
255                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
256                 checkHMACChain(indexer, newslots);
257
258                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
259
260                 initExpectedSize(firstseqnum);
261                 for(Slot slot: newslots) {
262                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
263                         updateExpectedSize();
264                 }
265
266                 /* If there is a gap, check to see if the server sent us everything. */
267                 if (firstseqnum != (sequencenumber+1)) {
268                         checkNumSlots(newslots.length);
269                         if (!machineSet.isEmpty())
270                                 throw new Error("Missing record for machines: "+machineSet);
271                 }
272
273                 commitNewMaxSize();
274
275                 /* Commit new to slots. */
276                 for(Slot slot:newslots) {
277                         buffer.putSlot(slot);
278                         liveslotcount++;
279                 }
280                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
281         }
282
283         private int expectedsize, currmaxsize;
284
285         private void checkNumSlots(int numslots) {
286                 if (numslots != expectedsize)
287                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
288         }
289
290         private void initExpectedSize(long firstsequencenumber) {
291                 long prevslots = firstsequencenumber;
292                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
293                 currmaxsize = numslots;
294         }
295
296         private void updateExpectedSize() {
297                 expectedsize++;
298                 if (expectedsize > currmaxsize)
299                         expectedsize = currmaxsize;
300         }
301
302         private void updateCurrMaxSize(int newmaxsize) {
303                 currmaxsize=newmaxsize;
304         }
305
306         private void commitNewMaxSize() {
307                 if (numslots != currmaxsize)
308                         buffer.resize(currmaxsize);
309
310                 numslots=currmaxsize;
311                 setResizeThreshold();
312         }
313
314         private void processEntry(KeyValue entry, SlotIndexer indexer) {
315                 IoTString key=entry.getKey();
316                 KeyValue oldvalue=table.get(key);
317                 if (oldvalue != null) {
318                         oldvalue.setDead();
319                 }
320                 table.put(key, entry);
321         }
322
323         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
324                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
325         }
326
327         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
328                 long oldseqnum=entry.getOldSeqNum();
329                 long newseqnum=entry.getNewSeqNum();
330                 boolean isequal=entry.getEqual();
331                 long machineid=entry.getMachineID();
332                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
333                         Slot slot=indexer.getSlot(seqnum);
334                         if (slot != null) {
335                                 long slotmachineid=slot.getMachineID();
336                                 if (isequal!=(slotmachineid==machineid)) {
337                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
338                                 }
339                         }
340                 }
341
342                 HashSet<Long> watchset=new HashSet<Long>();
343                 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
344                         long entry_mid=lastmsg_entry.getKey();
345                         /* We've seen it, don't need to continue to watch.  Our next
346                          * message will implicitly acknowledge it. */
347                         if (entry_mid == localmachineid)
348                                 continue;
349                         Pair<Long, Liveness> v=lastmsg_entry.getValue();
350                         long entry_seqn=v.getFirst();
351                         if (entry_seqn < newseqnum) {
352                                 addWatchList(entry_mid, entry);
353                                 watchset.add(entry_mid);
354                         }
355                 }
356                 if (watchset.isEmpty())
357                         entry.setDead();
358                 else
359                         entry.setWatchSet(watchset);
360         }
361
362         private void addWatchList(long machineid, RejectedMessage entry) {
363                 HashSet<RejectedMessage> entries=watchlist.get(machineid);
364                 if (entries == null)
365                         watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
366                 entries.add(entry);
367         }
368
369         private void processEntry(TableStatus entry, SlotIndexer indexer) {
370                 int newnumslots=entry.getMaxSlots();
371                 updateCurrMaxSize(newnumslots);
372                 if (lastTableStatus != null)
373                         lastTableStatus.setDead();
374                 lastTableStatus = entry;
375         }
376
377         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
378                 machineSet.remove(machineid);
379
380                 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
381                 if (watchset != null) {
382                         for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
383                                 RejectedMessage rm=rmit.next();
384                                 if (rm.getNewSeqNum() <= seqnum) {
385                                         /* Remove it from our watchlist */
386                                         rmit.remove();
387                                         /* Decrement machines that need to see this notification */
388                                         rm.removeWatcher(machineid);
389                                 }
390                         }
391                 }
392                 
393                 if (machineid == localmachineid) {
394                         /* Our own messages are immediately dead. */
395                         if (liveness instanceof LastMessage) {
396                                 ((LastMessage)liveness).setDead();
397                         } else if (liveness instanceof Slot) {
398                                 ((Slot)liveness).setDead();
399                         } else {
400                                 throw new Error("Unrecognized type");
401                         }
402                 }
403                 
404                 
405                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
406                 if (lastmsgentry == null)
407                         return;
408
409                 long lastmsgseqnum = lastmsgentry.getFirst();
410                 Liveness lastentry = lastmsgentry.getSecond();
411                 if (machineid != localmachineid) {
412                         if (lastentry instanceof LastMessage) {
413                                 ((LastMessage)lastentry).setDead();
414                         } else if (lastentry instanceof Slot) {
415                                 ((Slot)lastentry).setDead();
416                         } else {
417                                 throw new Error("Unrecognized type");
418                         }
419                 }
420                 
421                 if (machineid == localmachineid) {
422                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
423                                 throw new Error("Server Error: Mismatch on local machine sequence number");
424                 } else {
425                         if (lastmsgseqnum > seqnum)
426                                 throw new Error("Server Error: Rollback on remote machine sequence number");
427                 }
428         }
429
430         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
431                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
432                 for(Entry entry : slot.getEntries()) {
433                         switch(entry.getType()) {
434                         case Entry.TypeKeyValue:
435                                 processEntry((KeyValue)entry, indexer);
436                                 break;
437
438                         case Entry.TypeLastMessage:
439                                 processEntry((LastMessage)entry, indexer, machineSet);
440                                 break;
441
442                         case Entry.TypeRejectedMessage:
443                                 processEntry((RejectedMessage)entry, indexer);
444                                 break;
445
446                         case Entry.TypeTableStatus:
447                                 processEntry((TableStatus)entry, indexer);
448                                 break;
449
450                         default:
451                                 throw new Error("Unrecognized type: "+entry.getType());
452                         }
453                 }
454         }
455
456         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
457                 for(int i=0; i < newslots.length; i++) {
458                         Slot currslot=newslots[i];
459                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
460                         if (prevslot != null &&
461                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
462                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
463                 }
464         }
465 }