2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
11 * IoTTable data structure. Provides client interface.
12 * @author Brian Demsky
16 final public class Table {
17 private int numslots; //number of slots stored in buffer
19 //table of key-value pairs
20 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
22 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
23 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
// machine id -> rejected-message entries registered for that machine by
// addWatchList; updateLastMessage walks this set when the machine reports in
25 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
// sequence numbers recorded by tryput for slots it attempted to send;
// tryput turns them into RejectedMessage entries on a later attempt
26 private Vector<Long> rejectedmessagelist=new Vector<Long>();
// local store of slots, looked up by sequence number
27 private SlotBuffer buffer;
// communication layer used to fetch (getSlots) and submit (putSlot) slots
28 private CloudComm cloud;
29 private long sequencenumber; //Largest sequence number a client has received
// id of this client; compared against slot/entry machine ids
30 private long localmachineid;
// most recent TableStatus entry processed; the previous one is marked dead
31 private TableStatus lastTableStatus;
32 static final int FREE_SLOTS = 10; //number of slots that should be kept free
// limit compared against skipcount in tryput's second copy loop
33 static final int SKIP_THRESHOLD = 10;
// compared against resizethreshold in tryput to force a resize
34 private long liveslotcount=0;
// growth factor applied to numslots when tryput computes a new size
36 static final double RESIZE_MULTIPLE = 1.2;
// fraction of numslots used as the base of the randomized resize threshold
37 static final double RESIZE_THRESHOLD = 0.75;
// when rejectedmessagelist is longer than this, tryput emits one
// RejectedMessage spanning the first to the last recorded sequence number
38 static final int REJECTED_THRESHOLD = 5;
// live-slot count above which tryput forces a resize; see setResizeThreshold
39 private int resizethreshold;
40 private long lastliveslotseqn; //smallest sequence number with a live entry
// source of randomness for setResizeThreshold
41 private Random random=new Random();
/**
 * Creates a table for the given client that talks to the server through a
 * freshly constructed CloudComm.
 *
 * @param baseurl         passed through to the CloudComm constructor
 * @param password        passed through to the CloudComm constructor
 * @param _localmachineid id of this client
 */
43 public Table(String baseurl, String password, long _localmachineid) {
44 localmachineid=_localmachineid;
45 buffer = new SlotBuffer();
// the initial slot count is whatever capacity a new SlotBuffer starts with
46 numslots = buffer.capacity();
49 cloud=new CloudComm(this, baseurl, password);
/**
 * Creates a table for the given client on top of an existing CloudComm.
 *
 * @param _cloud          communication layer supplied by the caller
 *                        NOTE(review): the statement storing it in the cloud
 *                        field is not among the lines shown here -- confirm
 * @param _localmachineid id of this client
 */
53 public Table(CloudComm _cloud, long _localmachineid) {
54 localmachineid=_localmachineid;
55 buffer = new SlotBuffer();
// the initial slot count is whatever capacity a new SlotBuffer starts with
56 numslots = buffer.capacity();
/**
 * Fetches every slot after the last sequence number seen and applies it,
 * accepting updates to the local machine's own record
 * (acceptupdatestolocal = true).
 */
62 public void rebuild() {
63 Slot[] newslots=cloud.getSlots(sequencenumber+1);
64 validateandupdate(newslots, true);
/**
 * Fetches every slot after the last sequence number seen and applies it.
 * Unlike rebuild(), updates to the local machine's own record are not
 * accepted (acceptupdatestolocal = false), so a mismatch on the local
 * sequence number is reported as a server error by updateLastMessage.
 */
67 public void update() {
68 Slot[] newslots=cloud.getSlots(sequencenumber+1);
70 validateandupdate(newslots, false);
/**
 * Looks up the entry currently stored for a key in the local table.
 *
 * @param key key to look up
 * @return NOTE(review): the return statements are not among the lines shown
 *         here; presumably the stored value, or null when the key is
 *         absent -- confirm
 */
73 public IoTString get(IoTString key) {
74 KeyValue kv=table.get(key);
/**
 * Initializes the table on the server: sets the salt, then submits a first
 * slot (sequence number 1) together with a TableStatus built for the current
 * number of slots.
 *
 * NOTE(review): the conditions that choose between the success path
 * (validateandupdate on the submitted slot) and the
 * "Error on initialization" failure are not among the lines shown here.
 *
 * @throws Error when initialization fails
 */
81 public void initTable() {
82 cloud.setSalt();//Set the salt
// first slot of the table, owned by this client
83 Slot s=new Slot(this, 1, localmachineid);
84 TableStatus status=new TableStatus(s, numslots);
86 Slot[] array=cloud.putSlot(s, numslots);
// replace the server's reply with the slot we just submitted
88 array = new Slot[] {s};
89 /* update data structure */
90 validateandupdate(array, true);
92 throw new Error("Error on initialization");
/** Returns the string form of the underlying key-value map. */
96 public String toString() {
97 return table.toString();
/**
 * Stores a key-value pair through tryput (no resize requested) and returns
 * the value that was stored for the key before the call.
 *
 * NOTE(review): the surrounding control flow (retry on failure, handling of
 * a key with no previous value) is not among the lines shown here -- confirm
 * that oldvalue cannot be null when getValue() is called.
 *
 * @param key   key to store
 * @param value value to associate with key
 * @return the previously stored value
 */
100 public IoTString put(IoTString key, IoTString value) {
// remember the previous mapping before the table is modified
102 KeyValue oldvalue=table.get(key);
103 if (tryput(key, value, false)) {
107 return oldvalue.getValue();
// NOTE(review): body is not among the lines shown here; judging by the name
// it presumably decrements liveslotcount -- confirm.
112 void decrementLiveCount() {
/**
 * Picks a new randomized resize threshold.
 *
 * With resize_lower = (int)(RESIZE_THRESHOLD * numslots), the threshold is
 * drawn uniformly from [resize_lower - 1, numslots - 2], since
 * Random.nextInt(bound) returns a value in [0, bound).
 *
 * NOTE(review): the randomization presumably keeps clients from all
 * resizing at the same fill level -- confirm intent.
 */
117 private void setResizeThreshold() {
118 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
119 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
/**
 * Builds the next slot (sequence number sequencenumber+1), fills it and
 * submits it through cloud.putSlot; the resulting slots are then handed to
 * validateandupdate.
 *
 * Steps visible here, in order: a TableStatus for an enlarged size when a
 * resize applies, RejectedMessage entries derived from rejectedmessagelist,
 * copying of live entries out of the oldest slots, the new key-value pair,
 * and finally copying of further live entries while there is room.
 *
 * @param key    key to store
 * @param value  value to store under key
 * @param resize whether a resize is requested; set to true when
 *               liveslotcount exceeds resizethreshold, and passed as true on
 *               the recursive retry
 * @return NOTE(review): apart from the recursive retry, the return
 *         statements are not among the lines shown here; put() uses the
 *         result as a condition -- confirm its exact meaning
 * @throws Error when the server reply contains no slots
 */
122 private boolean tryput(IoTString key, IoTString value, boolean resize) {
// the new slot is constructed with the HMAC of the latest slot in the buffer
123 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
125 if (liveslotcount > resizethreshold) {
126 resize=true; //Resize is forced
130 newsize = (int) (numslots * RESIZE_MULTIPLE);
131 TableStatus status=new TableStatus(s, newsize);
135 if (! rejectedmessagelist.isEmpty()) {
136 /* TODO: We should avoid generating a rejected message entry if
137 * there is already a sufficient entry in the queue (e.g.,
138 * equalsto value of true and same sequence number). */
140 long old_seqn=rejectedmessagelist.firstElement();
// too many individual rejections: summarize them as one range from the first
// to the last recorded sequence number
141 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
142 long new_seqn=rejectedmessagelist.lastElement();
143 RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
148 /* Go through list of missing messages */
149 for(; i<rejectedmessagelist.size(); i++) {
150 long curr_seqn = rejectedmessagelist.get(i);
151 Slot s_msg = buffer.getSlot(curr_seqn);
156 /* Generate rejected message entry for missing messages */
157 if (prev_seqn != -1) {
158 RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
161 /* Generate rejected message entries for present messages */
162 for(; i<rejectedmessagelist.size(); i++) {
163 long curr_seqn=rejectedmessagelist.get(i);
164 Slot s_msg=buffer.getSlot(curr_seqn);
// these entries name the machine recorded in the buffered slot itself
165 long machineid=s_msg.getMachineID();
166 RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
172 long newestseqnum = buffer.getNewestSeqNum();
173 long oldestseqnum = buffer.getOldestSeqNum();
// never start scanning before the oldest slot still held in the buffer
174 if (lastliveslotseqn < oldestseqnum)
175 lastliveslotseqn = oldestseqnum;
177 long seqn = lastliveslotseqn;
178 boolean seenliveslot = false;
179 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
180 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
// first pass: slots below threshold; their live entries are copied into s
182 for(; seqn < threshold; seqn++) {
183 Slot prevslot=buffer.getSlot(seqn);
184 //Push slot number forward
186 lastliveslotseqn = seqn;
188 if (! prevslot.isLive())
191 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
192 for(Entry liveentry:liveentries) {
193 if (s.hasSpace(liveentry)) {
194 s.addEntry(liveentry);
195 } else if (seqn==firstiffull) { //if there's no space but the entry is about to fall off the queue
197 System.out.print("B"); //?
// retry the whole put with resize requested
198 return tryput(key, value, true);
204 KeyValue kv=new KeyValue(s, key, value);
205 boolean insertedkv=false;
206 if (s.hasSpace(kv)) {
211 /* now go through live entries from least to greatest sequence number until
212 * either all live slots added, or the slot doesn't have enough room
213 * for SKIP_THRESHOLD consecutive entries*/
216 for(; seqn <= newestseqnum; seqn++) {
217 Slot prevslot=buffer.getSlot(seqn);
218 //Push slot number forward
220 lastliveslotseqn = seqn;
222 if (!prevslot.isLive())
225 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
226 for(Entry liveentry:liveentries) {
227 if (s.hasSpace(liveentry))
228 s.addEntry(liveentry);
231 if (skipcount > SKIP_THRESHOLD)
240 Slot[] array=cloud.putSlot(s, max);
// NOTE(review): the conditions separating the two outcomes below are not among
// the lines shown here; one path treats s as the result and clears the
// rejected list, the other records s's sequence number as rejected
242 array = new Slot[] {s};
243 rejectedmessagelist.clear();
245 if (array.length == 0)
246 throw new Error("Server Error: Did not send any slots");
247 rejectedmessagelist.add(s.getSequenceNumber());
251 /* update data structure */
252 validateandupdate(array, true);
/**
 * Validates a batch of slots received from the server and, if they pass,
 * applies them to the local state and stores them in the buffer.
 *
 * Checks visible here: the batch must start after the last sequence number
 * seen, the HMAC chain must be consistent (checkHMACChain), and when the
 * batch does not directly follow the last known slot, the number of slots
 * must match the expected size and every previously known machine must
 * appear in the batch.
 *
 * @param newslots             slots received, in sequence order; an empty
 *                             array is a no-op
 * @param acceptupdatestolocal forwarded to processSlot / updateLastMessage;
 *                             when false a changed sequence number for the
 *                             local machine is a server error
 * @throws Error on any validation failure
 */
257 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
258 /* The cloud communication layer has checked slot HMACs already
260 if (newslots.length==0) return;
262 long firstseqnum=newslots[0].getSequenceNumber();
263 if (firstseqnum <= sequencenumber)
264 throw new Error("Server Error: Sent older slots!");
// the indexer resolves sequence numbers against the new slots and the buffer
266 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
267 checkHMACChain(indexer, newslots);
// machines we already know; updateLastMessage removes each one it sees
269 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet()); //
271 initExpectedSize(firstseqnum);
272 for(Slot slot: newslots) {
273 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
274 updateExpectedSize();
277 /* If there is a gap, check to see if the server sent us everything. */
278 if (firstseqnum != (sequencenumber+1)) {
279 checkNumSlots(newslots.length);
// any machine left in the set had no record in this batch
280 if (!machineSet.isEmpty())
281 throw new Error("Missing record for machines: "+machineSet);
286 /* Commit the new slots to the buffer. */
287 for(Slot slot:newslots) {
288 buffer.putSlot(slot);
// the last slot of the batch becomes the newest sequence number seen
291 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
// Scratch state used while validating one batch of slots:
//   expectedsize - number of slots the server is expected to have sent
//                  (checked by checkNumSlots)
//   currmaxsize  - table size in effect so far in the batch; starts at
//                  numslots and is updated by TableStatus entries
294 private int expectedsize, currmaxsize;
/**
 * Verifies that the server sent exactly the expected number of slots.
 *
 * @param numslots number of slots actually received (this parameter shadows
 *                 the numslots field inside this method)
 * @throws Error when the count differs from expectedsize
 */
296 private void checkNumSlots(int numslots) {
297 if (numslots != expectedsize)
298 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
/**
 * Resets the batch-validation counters before a batch is processed.
 * expectedsize becomes the smaller of firstsequencenumber and numslots, and
 * currmaxsize starts at the current numslots.
 *
 * @param firstsequencenumber sequence number of the first slot in the batch
 */
301 private void initExpectedSize(long firstsequencenumber) {
302 long prevslots = firstsequencenumber;
// compare as long first so the narrowing cast cannot lose information
303 expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
304 currmaxsize = numslots;
/**
 * Called once per processed slot; keeps expectedsize from exceeding the
 * maximum table size currently in effect.
 *
 * NOTE(review): a statement before the clamp is not among the lines shown
 * here (presumably it advances expectedsize) -- confirm.
 */
307 private void updateExpectedSize() {
309 if (expectedsize > currmaxsize)
310 expectedsize = currmaxsize;
/**
 * Records a new maximum table size for the batch being validated.
 *
 * @param newmaxsize size announced by a TableStatus entry
 */
313 private void updateCurrMaxSize(int newmaxsize) {
314 currmaxsize=newmaxsize;
/**
 * Makes the size tracked in currmaxsize the table's actual size: resizes
 * the buffer when the size changed, stores the new slot count and draws a
 * fresh resize threshold for it.
 */
317 private void commitNewMaxSize() {
318 if (numslots != currmaxsize)
319 buffer.resize(currmaxsize);
321 numslots=currmaxsize;
// the threshold depends on numslots, so it must be recomputed
322 setResizeThreshold();
/**
 * Applies a key-value entry: the entry becomes the current mapping for its
 * key in the local table.
 *
 * NOTE(review): what happens to a previous mapping inside the
 * (oldvalue != null) branch is not among the lines shown here -- confirm.
 *
 * @param entry   key-value entry read from a slot
 * @param indexer not used by the lines shown here
 */
325 private void processEntry(KeyValue entry, SlotIndexer indexer) {
326 IoTString key=entry.getKey();
327 KeyValue oldvalue=table.get(key);
328 if (oldvalue != null) {
331 table.put(key, entry);
/**
 * Applies a last-message entry by recording the machine id and sequence
 * number it carries. Updates to the local machine's record are never
 * accepted through this path (acceptupdatestolocal = false).
 *
 * @param entry      last-message entry read from a slot
 * @param indexer    not used here
 * @param machineSet machines still awaiting a record in this batch
 */
334 private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
335 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
/**
 * Applies a rejected-message entry covering sequence numbers
 * [getOldSeqNum(), getNewSeqNum()].
 *
 * First the claim is checked against the slots we can see: for each
 * sequence number in the range, "slot was written by the entry's machine"
 * must agree with the entry's equal flag. Then every other machine whose
 * last known sequence number is below the end of the range is put on the
 * watch list for this entry.
 *
 * @param entry   rejected-message entry read from a slot
 * @param indexer used to look up slots by sequence number
 * @throws Error when a visible slot contradicts the entry
 */
338 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
339 long oldseqnum=entry.getOldSeqNum();
340 long newseqnum=entry.getNewSeqNum();
341 boolean isequal=entry.getEqual();
342 long machineid=entry.getMachineID();
343 for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
344 Slot slot=indexer.getSlot(seqnum);
346 long slotmachineid=slot.getMachineID();
// the equal flag must match whether the slot's author is the entry's machine
347 if (isequal != (slotmachineid==machineid)) {
348 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
// machines that have not yet been seen past the rejected range
353 HashSet<Long> watchset=new HashSet<Long>();
354 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
355 long entry_mid=lastmsg_entry.getKey();
356 /* We've seen it, don't need to continue to watch. Our next
357 * message will implicitly acknowledge it. */
358 if (entry_mid == localmachineid)
360 Pair<Long, Liveness> v=lastmsg_entry.getValue();
361 long entry_seqn=v.getFirst();
362 if (entry_seqn < newseqnum) {
363 addWatchList(entry_mid, entry);
364 watchset.add(entry_mid);
// NOTE(review): the action taken when nobody needs to watch is not among the
// lines shown here; otherwise the entry is given its set of watchers
367 if (watchset.isEmpty())
370 entry.setWatchSet(watchset);
/**
 * Registers a rejected-message entry on the watch list of a machine,
 * creating that machine's set on first use.
 *
 * NOTE(review): the statement adding the entry to the set is not among the
 * lines shown here -- confirm.
 *
 * @param machineid machine that still has to see the entry
 * @param entry     rejected-message entry to watch
 */
373 private void addWatchList(long machineid, RejectedMessage entry) {
374 HashSet<RejectedMessage> entries=watchlist.get(machineid);
376 watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
/**
 * Applies a table-status entry: its maximum slot count becomes the size in
 * effect for the rest of the batch, the previously recorded status entry
 * (if any) is marked dead, and this entry becomes the latest one.
 *
 * @param entry   table-status entry read from a slot
 * @param indexer not used here
 */
380 private void processEntry(TableStatus entry, SlotIndexer indexer) {
381 int newnumslots=entry.getMaxSlots();
382 updateCurrMaxSize(newnumslots);
// only the most recent TableStatus stays live
383 if (lastTableStatus != null)
384 lastTableStatus.setDead();
385 lastTableStatus = entry;
/**
 * Records that the given machine has been seen at the given sequence
 * number, and performs the bookkeeping that follows from it.
 *
 * Visible effects: the machine is removed from machineSet; rejected-message
 * entries watched for this machine whose range ends at or before seqnum
 * have this machine removed as a watcher; records for the local machine are
 * marked dead immediately; the entry in lastmessagetable is replaced and,
 * for remote machines, the previous record is marked dead.
 *
 * @param machineid            machine the record belongs to
 * @param seqnum               sequence number reported for that machine
 * @param liveness             the Slot or LastMessage carrying the record
 * @param acceptupdatestolocal when false, a changed sequence number for the
 *                             local machine is a server error
 * @param machineSet           machines still awaiting a record in this batch
 * @throws Error on an unrecognized liveness type, a local sequence-number
 *               mismatch, or a rollback of a remote machine's sequence number
 */
388 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
389 machineSet.remove(machineid);
391 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
392 if (watchset != null) {
393 for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
394 RejectedMessage rm=rmit.next();
// the machine has now been seen at or past the end of the rejected range
395 if (rm.getNewSeqNum() <= seqnum) {
396 /* Remove it from our watchlist */
398 /* Decrement machines that need to see this notification */
399 rm.removeWatcher(machineid);
404 if (machineid == localmachineid) {
405 /* Our own messages are immediately dead. */
406 if (liveness instanceof LastMessage) {
407 ((LastMessage)liveness).setDead();
408 } else if (liveness instanceof Slot) {
409 ((Slot)liveness).setDead();
411 throw new Error("Unrecognized type");
// HashMap.put returns the previous mapping, or null if there was none
416 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
417 if (lastmsgentry == null)
420 long lastmsgseqnum = lastmsgentry.getFirst();
421 Liveness lastentry = lastmsgentry.getSecond();
// the superseded record of a remote machine is no longer live
422 if (machineid != localmachineid) {
423 if (lastentry instanceof LastMessage) {
424 ((LastMessage)lastentry).setDead();
425 } else if (lastentry instanceof Slot) {
426 ((Slot)lastentry).setDead();
428 throw new Error("Unrecognized type");
432 if (machineid == localmachineid) {
433 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
434 throw new Error("Server Error: Mismatch on local machine sequence number");
436 if (lastmsgseqnum > seqnum)
437 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Applies one slot: records the slot itself as the latest message of its
 * author, then dispatches each entry in the slot to the processEntry
 * overload matching its type.
 *
 * @param indexer              lookup of slots by sequence number
 * @param slot                 slot to apply
 * @param acceptupdatestolocal forwarded to updateLastMessage for the slot
 * @param machineSet           machines still awaiting a record in this batch
 * @throws Error when an entry has an unrecognized type
 */
441 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
// the slot counts as a message from the machine that wrote it
442 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
443 for(Entry entry : slot.getEntries()) {
444 switch(entry.getType()) {
445 case Entry.TypeKeyValue:
446 processEntry((KeyValue)entry, indexer);
449 case Entry.TypeLastMessage:
450 processEntry((LastMessage)entry, indexer, machineSet);
453 case Entry.TypeRejectedMessage:
454 processEntry((RejectedMessage)entry, indexer);
457 case Entry.TypeTableStatus:
458 processEntry((TableStatus)entry, indexer);
462 throw new Error("Unrecognized type: "+entry.getType());
/**
 * Verifies that each new slot links to its predecessor: whenever the slot
 * with the preceding sequence number is available through the indexer, its
 * HMAC must equal the new slot's stored previous-HMAC.
 *
 * @param indexer  lookup of slots by sequence number
 * @param newslots slots to verify
 * @throws Error when a link in the chain does not match
 */
467 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
468 for(int i=0; i < newslots.length; i++) {
469 Slot currslot=newslots[i];
470 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
// a missing predecessor is not treated as an error here
471 if (prevslot != null &&
472 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
473 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);