2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
11 * IoTTable data structure. Provides client interface.
12 * @author Brian Demsky
16 final public class Table {
/* Key -> most recent KeyValue entry stored for that key. */
18 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
/* Machine id -> (sequence number, Liveness object) of the last message recorded for that machine. */
19 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
/* Machine id -> rejected-message entries registered for that machine by addWatchList. */
20 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
/* Sequence numbers appended by tryput; it is cleared there as well. */
21 private Vector<Long> rejectedmessagelist=new Vector<Long>();
/* Local slot store; validateandupdate commits accepted slots into it. */
22 private SlotBuffer buffer;
/* Communication layer used for getSlots/putSlot/setSalt. */
23 private CloudComm cloud;
/* Sequence number of the last slot accepted by validateandupdate. */
24 private long sequencenumber;
25 private long localmachineid;
/* Most recently processed TableStatus entry; the previous one is marked dead when replaced. */
26 private TableStatus lastTableStatus;
/* Added to the would-be-oldest sequence number in tryput to bound its first scan. */
27 static final int FREE_SLOTS = 10;
/* Compared against skipcount in tryput. */
28 static final int SKIP_THRESHOLD = 10;
/* Compared against resizethreshold in tryput to decide on a resize. */
29 private long liveslotcount=0;
/* Growth factor applied to numslots when tryput computes a new size. */
31 static final double RESIZE_MULTIPLE = 1.2;
/* Fraction of numslots used as the base of the randomized resize threshold. */
32 static final double RESIZE_THRESHOLD = 0.75;
/* Backlog size above which tryput emits a single RejectedMessage spanning the whole list. */
33 static final int REJECTED_THRESHOLD = 5;
/* Live-slot count above which tryput resizes; chosen by setResizeThreshold. */
34 private int resizethreshold;
/* Sequence number where tryput starts scanning for live slots. */
35 private long lastliveslotseqn;
/* Randomness source for setResizeThreshold. */
36 private Random random=new Random();
/**
 * Creates a table that builds its own CloudComm from a URL and password.
 *
 * @param baseurl         forwarded to the CloudComm constructor
 * @param password        forwarded to the CloudComm constructor
 * @param _localmachineid stored as this table's local machine id
 */
38 public Table(String baseurl, String password, long _localmachineid) {
39 localmachineid=_localmachineid;
40 buffer = new SlotBuffer();
/* The initial slot count is whatever a fresh SlotBuffer reports. */
41 numslots = buffer.capacity();
44 cloud=new CloudComm(this, baseurl, password);
/**
 * Creates a table around a caller-supplied cloud communication object.
 *
 * @param _cloud          cloud communication layer provided by the caller
 * @param _localmachineid stored as this table's local machine id
 */
48 public Table(CloudComm _cloud, long _localmachineid) {
49 localmachineid=_localmachineid;
50 buffer = new SlotBuffer();
/* The initial slot count is whatever a fresh SlotBuffer reports. */
51 numslots = buffer.capacity();
/**
 * Requests slots from the cloud with sequencenumber+1 and applies them
 * through validateandupdate with acceptupdatestolocal set to true.
 */
57 public void rebuild() {
58 Slot[] newslots=cloud.getSlots(sequencenumber+1);
59 validateandupdate(newslots, true);
/**
 * Requests slots from the cloud with sequencenumber+1 and applies them
 * through validateandupdate with acceptupdatestolocal set to false, so a
 * mismatch on the local machine's sequence number is reported as an error.
 */
62 public void update() {
63 Slot[] newslots=cloud.getSlots(sequencenumber+1);
65 validateandupdate(newslots, false);
/**
 * Looks up a key in the local table; no cloud call is made on the lookup line.
 *
 * @param key key to look up
 * @return NOTE(review): presumably the value of the matching KeyValue;
 *         confirm what is returned when the key is absent
 */
68 public IoTString get(IoTString key) {
69 KeyValue kv=table.get(key);
/**
 * Initializes the table in the cloud: sets the salt, builds a slot with
 * sequence number 1 for the local machine, creates a TableStatus for it
 * with the current numslots, and sends the slot with cloud.putSlot.
 *
 * NOTE(review): the conditions choosing between the local update and the
 * "Error on initialization" failure should be confirmed against putSlot's
 * return contract.
 *
 * @throws Error with message "Error on initialization"
 */
76 public void initTable() {
77 cloud.setSalt();//Set the salt
78 Slot s=new Slot(this, 1, localmachineid);
79 TableStatus status=new TableStatus(s, numslots);
81 Slot[] array=cloud.putSlot(s, numslots);
83 array = new Slot[] {s};
84 /* update data structure */
85 validateandupdate(array, true);
87 throw new Error("Error on initialization");
/**
 * @return the string form of the underlying key/value map
 */
91 public String toString() {
92 return table.toString();
/**
 * Stores a key/value pair through tryput (called with resize=false) and
 * reports the value that was in the local table before the call.
 *
 * @param key   key to store
 * @param value value to associate with the key
 * @return the previous value for the key, read before tryput runs.
 *         NOTE(review): confirm the result when there was no previous entry
 *         and what happens when tryput returns false.
 */
95 public IoTString put(IoTString key, IoTString value) {
/* Snapshot the old entry first; a successful tryput replaces it in the table. */
97 KeyValue oldvalue=table.get(key);
98 if (tryput(key, value, false)) {
102 return oldvalue.getValue();
/**
 * Package-private, no arguments, no return value.
 * NOTE(review): presumably lowers liveslotcount by one (inferred from the
 * name only) — confirm.
 */
107 void decrementLiveCount() {
/**
 * Picks a randomized resize threshold for the current numslots.
 *
 * The threshold is (RESIZE_THRESHOLD * numslots, truncated) - 1 plus a
 * random offset in [0, numslots - resize_lower). Random.nextInt requires a
 * positive bound, so numslots must exceed resize_lower (true for any
 * numslots >= 1).
 */
111 private void setResizeThreshold() {
112 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
113 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
/**
 * Builds one new slot carrying the key/value pair plus housekeeping
 * entries, sends it with cloud.putSlot, and applies the slots that come
 * back through validateandupdate.
 *
 * The new slot may also receive a TableStatus with the enlarged size,
 * RejectedMessage entries derived from rejectedmessagelist, and live
 * entries copied forward from older slots in the buffer.
 *
 * @param key    key to store
 * @param value  value to store
 * @param resize passed to Slot.getLiveEntries; the method calls itself
 *               with true when a live entry of the oldest slot does not fit
 * @return put() treats true as a completed insert
 * @throws Error "Server Error: Did not send any slots" when the returned
 *               array is empty on the path that checks it
 */
116 private boolean tryput(IoTString key, IoTString value, boolean resize) {
/* The new slot takes the next sequence number and is given the HMAC of the slot at sequencenumber. */
117 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
/* More live slots than the randomized threshold: grow by RESIZE_MULTIPLE. */
119 if (liveslotcount > resizethreshold) {
121 newsize = (int) (numslots * RESIZE_MULTIPLE);
125 newsize = (int) (numslots * RESIZE_MULTIPLE);
126 TableStatus status=new TableStatus(s, newsize);
130 if (!rejectedmessagelist.isEmpty()) {
131 long old_seqn=rejectedmessagelist.firstElement();
/* Long backlog: one RejectedMessage spanning first..last instead of per-slot entries. */
132 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
133 long new_seqn=rejectedmessagelist.lastElement();
134 RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
139 /* Go through list of missing messages */
140 for(;i<rejectedmessagelist.size();i++) {
141 long curr_seqn=rejectedmessagelist.get(i);
142 Slot s_msg=buffer.getSlot(curr_seqn);
147 /* Generate rejected message entry for missing messages */
148 if (prev_seqn != -1) {
149 RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
152 /* Generate rejected message entries for present messages */
153 for(;i<rejectedmessagelist.size();i++) {
154 long curr_seqn=rejectedmessagelist.get(i);
155 Slot s_msg=buffer.getSlot(curr_seqn);
156 long machineid=s_msg.getMachineID();
157 RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
163 long newestseqnum = buffer.getNewestSeqNum();
164 long oldestseqnum = buffer.getOldestSeqNum();
/* Never start scanning below the oldest slot the buffer reports. */
165 if (lastliveslotseqn < oldestseqnum)
166 lastliveslotseqn = oldestseqnum;
168 long seqn = lastliveslotseqn;
169 boolean seenliveslot = false;
/* Oldest sequence number that numslots slots ending at newestseqnum would include. */
170 long firstiffull = newestseqnum + 1 - numslots;
171 long threshold = firstiffull + FREE_SLOTS;
/* First scan: walk the slots below threshold and copy their live entries into the new slot. */
173 for(; seqn < threshold; seqn++) {
174 Slot prevslot=buffer.getSlot(seqn);
175 //Push slot number forward
177 lastliveslotseqn = seqn;
179 if (!prevslot.isLive())
182 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
183 for(Entry liveentry:liveentries) {
184 if (s.hasSpace(liveentry)) {
185 s.addEntry(liveentry);
/* A live entry of the would-be-oldest slot does not fit: retry with resize requested. */
186 } else if (seqn==firstiffull) {
188 System.out.print("B");
189 return tryput(key, value, true);
195 KeyValue kv=new KeyValue(s, key, value);
196 boolean insertedkv=false;
197 if (s.hasSpace(kv)) {
/* Second scan: continue up to the newest slot, copying live entries while they fit. */
204 for(; seqn <= newestseqnum; seqn++) {
205 Slot prevslot=buffer.getSlot(seqn);
206 //Push slot number forward
208 lastliveslotseqn = seqn;
210 if (!prevslot.isLive())
213 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
214 for(Entry liveentry:liveentries) {
215 if (s.hasSpace(liveentry))
216 s.addEntry(liveentry);
219 if (skipcount > SKIP_THRESHOLD)
228 Slot[] array=cloud.putSlot(s, max);
230 array = new Slot[] {s};
231 rejectedmessagelist.clear();
233 if (array.length == 0)
234 throw new Error("Server Error: Did not send any slots");
235 rejectedmessagelist.add(s.getSequenceNumber());
239 /* update data structure */
240 validateandupdate(array, true);
/**
 * Validates a batch of slots received from the cloud and commits them.
 *
 * Steps visible here: reject batches that start at or before
 * sequencenumber, check the HMAC chain, process every slot while tracking
 * the expected batch size, run gap checks when the batch does not start
 * at sequencenumber+1, store the slots in the buffer, and advance
 * sequencenumber to the last slot's sequence number.
 *
 * @param newslots             slots to apply; element 0 is read as the first of the batch
 * @param acceptupdatestolocal forwarded to processSlot for each slot
 * @throws Error "Server Error: Sent older slots!" when the first slot is not newer than sequencenumber
 * @throws Error "Missing record for machines: ..." when, after a gap, some
 *               previously known machine was not seen in the batch
 */
245 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
246 /* The cloud communication layer has checked slot HMACs already
248 if (newslots.length==0)
251 long firstseqnum=newslots[0].getSequenceNumber();
252 if (firstseqnum <= sequencenumber)
253 throw new Error("Server Error: Sent older slots!");
255 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
256 checkHMACChain(indexer, newslots);
258 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
260 initExpectedSize(firstseqnum);
261 for(Slot slot: newslots) {
262 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
263 updateExpectedSize();
266 /* If there is a gap, check to see if the server sent us everything. */
267 if (firstseqnum != (sequencenumber+1)) {
268 checkNumSlots(newslots.length);
269 if (!machineSet.isEmpty())
270 throw new Error("Missing record for machines: "+machineSet);
275 /* Commit the new slots to the buffer. */
276 for(Slot slot:newslots) {
277 buffer.putSlot(slot);
/* The last slot of the batch becomes the newest known sequence number. */
280 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
/* Scratch state used while validating one batch:
 * expectedsize - slot count checkNumSlots compares the batch length against;
 * currmaxsize  - table size in effect so far, updated by TableStatus entries. */
283 private int expectedsize, currmaxsize;
/**
 * Verifies that the number of slots received equals expectedsize.
 *
 * @param numslots number of slots received (shadows the field of the same name)
 * @throws Error when the count differs from expectedsize
 */
285 private void checkNumSlots(int numslots) {
286 if (numslots != expectedsize)
287 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
/**
 * Resets the batch-size bookkeeping before a batch is processed.
 *
 * expectedsize becomes the smaller of firstsequencenumber and numslots
 * (numslots when they are equal), and currmaxsize starts at numslots.
 *
 * @param firstsequencenumber sequence number of the first slot in the batch
 */
290 private void initExpectedSize(long firstsequencenumber) {
291 long prevslots = firstsequencenumber;
/* The comparison is done in long; the int cast is only taken when prevslots < numslots. */
292 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
293 currmaxsize = numslots;
/**
 * Called once per processed slot by validateandupdate; keeps expectedsize
 * from exceeding currmaxsize.
 */
296 private void updateExpectedSize() {
298 if (expectedsize > currmaxsize)
299 expectedsize = currmaxsize;
/**
 * Records a new maximum table size for the batch being validated.
 *
 * @param newmaxsize value to store in currmaxsize
 */
302 private void updateCurrMaxSize(int newmaxsize) {
303 currmaxsize=newmaxsize;
/**
 * Makes currmaxsize the table's slot count: resizes the buffer when the
 * size actually changed, updates numslots, and picks a fresh resize
 * threshold for the new size.
 */
306 private void commitNewMaxSize() {
307 if (numslots != currmaxsize)
308 buffer.resize(currmaxsize);
310 numslots=currmaxsize;
311 setResizeThreshold();
/**
 * Applies a key/value entry: looks up any existing entry for the key and
 * stores the new entry in the table under that key.
 *
 * @param entry   key/value entry taken from a slot
 * @param indexer not used on the lines shown here
 */
314 private void processEntry(KeyValue entry, SlotIndexer indexer) {
315 IoTString key=entry.getKey();
316 KeyValue oldvalue=table.get(key);
317 if (oldvalue != null) {
320 table.put(key, entry);
/**
 * Applies a last-message entry by forwarding its machine id and sequence
 * number to updateLastMessage, with the entry itself as the Liveness object
 * and acceptupdatestolocal fixed to false.
 *
 * @param entry      last-message entry taken from a slot
 * @param indexer    not used here
 * @param machineSet machines still awaiting a record in this batch
 */
323 private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
324 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
/**
 * Applies a rejected-message entry.
 *
 * First, for each sequence number in [oldseqnum, newseqnum], the slot
 * found through the indexer is checked: whether its machine id equals the
 * entry's machine id must match the entry's equal flag.
 *
 * Then every machine other than the local one whose last recorded
 * sequence number is below newseqnum is put on the watch list for this
 * entry, and the resulting set is attached to the entry.
 *
 * @param entry   rejected-message entry taken from a slot
 * @param indexer used to look up the slots in the rejected range
 * @throws Error "Server Error: Trying to insert rejected message for slot ..."
 *               when a slot contradicts the equal flag
 */
327 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
328 long oldseqnum=entry.getOldSeqNum();
329 long newseqnum=entry.getNewSeqNum();
330 boolean isequal=entry.getEqual();
331 long machineid=entry.getMachineID();
332 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
333 Slot slot=indexer.getSlot(seqnum);
335 long slotmachineid=slot.getMachineID();
336 if (isequal!=(slotmachineid==machineid)) {
337 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
/* Collect the machines that have not yet been recorded at or past newseqnum. */
342 HashSet<Long> watchset=new HashSet<Long>();
343 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
344 long entry_mid=lastmsg_entry.getKey();
345 /* We've seen it, don't need to continue to watch. Our next
346 * message will implicitly acknowledge it. */
347 if (entry_mid == localmachineid)
349 Pair<Long, Liveness> v=lastmsg_entry.getValue();
350 long entry_seqn=v.getFirst();
351 if (entry_seqn < newseqnum) {
352 addWatchList(entry_mid, entry);
353 watchset.add(entry_mid);
356 if (watchset.isEmpty())
359 entry.setWatchSet(watchset);
/**
 * Looks up the watch set for a machine and can store a fresh, empty set
 * for it in watchlist.
 * NOTE(review): presumably the new set is created only when none exists
 * and the entry is then added to the set — confirm.
 *
 * @param machineid machine whose watch set is being updated
 * @param entry     rejected-message entry to watch on behalf of that machine
 */
362 private void addWatchList(long machineid, RejectedMessage entry) {
363 HashSet<RejectedMessage> entries=watchlist.get(machineid);
365 watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
/**
 * Applies a table-status entry: records its maximum slot count as the
 * current maximum size, marks the previously remembered status entry dead,
 * and remembers this one as the latest.
 *
 * @param entry   table-status entry taken from a slot
 * @param indexer not used here
 */
369 private void processEntry(TableStatus entry, SlotIndexer indexer) {
370 int newnumslots=entry.getMaxSlots();
371 updateCurrMaxSize(newnumslots);
/* Only the newest status stays live. */
372 if (lastTableStatus != null)
373 lastTableStatus.setDead();
374 lastTableStatus = entry;
/**
 * Records that a machine has been seen at a given sequence number.
 *
 * Visible effects: the machine is removed from machineSet; watched
 * rejected messages whose new sequence number is at or below seqnum get
 * this machine removed as a watcher; a Liveness object belonging to the
 * local machine is marked dead at once; the (seqnum, liveness) pair
 * replaces the machine's previous entry in lastmessagetable; and the
 * previous entry of a non-local machine is marked dead.
 *
 * @param machineid            machine the message belongs to
 * @param seqnum               sequence number being recorded
 * @param liveness             a LastMessage or a Slot; any other type is an error
 * @param acceptupdatestolocal when false, a changed sequence number for the local machine is an error
 * @param machineSet           machines still awaiting a record in this batch
 * @throws Error "Unrecognized type" when a Liveness object is neither LastMessage nor Slot
 * @throws Error "Server Error: Mismatch on local machine sequence number"
 * @throws Error "Server Error: Rollback on remote machine sequence number"
 */
377 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
378 machineSet.remove(machineid);
380 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
381 if (watchset != null) {
382 for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
383 RejectedMessage rm=rmit.next();
384 if (rm.getNewSeqNum() <= seqnum) {
385 /* Remove it from our watchlist */
387 /* Decrement machines that need to see this notification */
388 rm.removeWatcher(machineid);
393 if (machineid == localmachineid) {
394 /* Our own messages are immediately dead. */
395 if (liveness instanceof LastMessage) {
396 ((LastMessage)liveness).setDead();
397 } else if (liveness instanceof Slot) {
398 ((Slot)liveness).setDead();
400 throw new Error("Unrecognized type");
/* Store the new pair; HashMap.put hands back the machine's previous pair, or null. */
405 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
406 if (lastmsgentry == null)
409 long lastmsgseqnum = lastmsgentry.getFirst();
410 Liveness lastentry = lastmsgentry.getSecond();
/* The superseded record of another machine is no longer live. */
411 if (machineid != localmachineid) {
412 if (lastentry instanceof LastMessage) {
413 ((LastMessage)lastentry).setDead();
414 } else if (lastentry instanceof Slot) {
415 ((Slot)lastentry).setDead();
417 throw new Error("Unrecognized type");
421 if (machineid == localmachineid) {
422 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
423 throw new Error("Server Error: Mismatch on local machine sequence number");
425 if (lastmsgseqnum > seqnum)
426 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Processes one slot: records the slot itself as the latest message of
 * its machine, then dispatches each contained entry to the processEntry
 * overload matching its type.
 *
 * @param indexer              lookup over the new slots and the buffer
 * @param slot                 slot to process
 * @param acceptupdatestolocal forwarded to updateLastMessage for the slot itself
 * @param machineSet           machines still awaiting a record in this batch
 * @throws Error "Unrecognized type: ..." for an entry type with no handler
 */
430 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
431 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
432 for(Entry entry : slot.getEntries()) {
433 switch(entry.getType()) {
434 case Entry.TypeKeyValue:
435 processEntry((KeyValue)entry, indexer);
438 case Entry.TypeLastMessage:
439 processEntry((LastMessage)entry, indexer, machineSet);
442 case Entry.TypeRejectedMessage:
443 processEntry((RejectedMessage)entry, indexer);
446 case Entry.TypeTableStatus:
447 processEntry((TableStatus)entry, indexer);
451 throw new Error("Unrecognized type: "+entry.getType());
/**
 * Verifies that each new slot links to its predecessor: when the indexer
 * can supply the slot with the preceding sequence number, that slot's HMAC
 * must equal the new slot's previous-HMAC field byte for byte.
 *
 * @param indexer  lookup used to find each slot's predecessor
 * @param newslots slots whose chain links are checked
 * @throws Error "Server Error: Invalid HMAC Chain..." on the first broken link
 */
456 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
457 for(int i=0; i < newslots.length; i++) {
458 Slot currslot=newslots[i];
459 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
/* A missing predecessor is not treated as an error; only a mismatch is. */
460 if (prevslot != null &&
461 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
462 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);