2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
12 * IoTTable data structure. Provides client inferface.
13 * @author Brian Demsky
17 final public class Table {
18 private int numslots; //number of slots stored in buffer
20 //table of key-value pairs
21 private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
23 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
24 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
26 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
27 private Vector<Long> rejectedmessagelist = new Vector<Long>();
28 private SlotBuffer buffer;
29 private CloudComm cloud;
30 private long sequencenumber; //Largest sequence number a client has received
31 private long localmachineid;
32 private TableStatus lastTableStatus;
33 static final int FREE_SLOTS = 10; //number of slots that should be kept free
34 static final int SKIP_THRESHOLD = 10;
35 private long liveslotcount = 0;
37 static final double RESIZE_MULTIPLE = 1.2;
38 static final double RESIZE_THRESHOLD = 0.75;
39 static final int REJECTED_THRESHOLD = 5;
40 private int resizethreshold;
41 private long lastliveslotseqn; //smallest sequence number with a live entry
42 private Random random = new Random();
44 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
45 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
48 public Table(String baseurl, String password, long _localmachineid) {
49 localmachineid = _localmachineid;
50 buffer = new SlotBuffer();
51 numslots = buffer.capacity();
54 cloud = new CloudComm(this, baseurl, password);
57 pendingTransQueue = new LinkedList<PendingTransaction>();
60 public Table(CloudComm _cloud, long _localmachineid) {
61 localmachineid = _localmachineid;
62 buffer = new SlotBuffer();
63 numslots = buffer.capacity();
68 pendingTransQueue = new LinkedList<PendingTransaction>();
71 public void rebuild() {
72 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
73 validateandupdate(newslots, true);
76 public void update() {
77 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
79 validateandupdate(newslots, false);
82 public IoTString get(IoTString key) {
83 KeyValue kv = table.get(key);
90 public void initTable() {
91 cloud.setSalt();//Set the salt
92 Slot s = new Slot(this, 1, localmachineid);
93 TableStatus status = new TableStatus(s, numslots);
95 Slot[] array = cloud.putSlot(s, numslots);
97 array = new Slot[] {s};
98 /* update data structure */
99 validateandupdate(array, true);
101 throw new Error("Error on initialization");
105 public String toString() {
106 return table.toString();
109 public void startTransaction() {
110 // Create a new transaction, invalidates any old pending transactions.
111 pendingTransBuild = new PendingTransaction();
114 public void commitTransaction() {
116 // Add the pending transaction to the queue
117 pendingTransQueue.add(pendingTransBuild);
119 while (!pendingTransQueue.isEmpty()) {
120 if (tryput( pendingTransQueue.peek(), false)) {
121 pendingTransQueue.poll();
126 public void addKV(IoTString key, IoTString value) {
127 KeyValue kv = new KeyValue(key, value);
128 pendingTransBuild.addKV(kv);
131 public void addGuard(IoTString key, IoTString value) {
132 KeyValue kv = new KeyValue(key, value);
133 pendingTransBuild.addKV(kv);
141 void decrementLiveCount() {
146 private void setResizeThreshold() {
147 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
148 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
151 private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
152 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
154 if (liveslotcount > resizethreshold) {
155 resize = true; //Resize is forced
159 newsize = (int) (numslots * RESIZE_MULTIPLE);
160 TableStatus status = new TableStatus(s, newsize);
164 if (! rejectedmessagelist.isEmpty()) {
165 /* TODO: We should avoid generating a rejected message entry if
166 * there is already a sufficient entry in the queue (e.g.,
167 * equalsto value of true and same sequence number). */
169 long old_seqn = rejectedmessagelist.firstElement();
170 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
171 long new_seqn = rejectedmessagelist.lastElement();
172 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
177 /* Go through list of missing messages */
178 for (; i < rejectedmessagelist.size(); i++) {
179 long curr_seqn = rejectedmessagelist.get(i);
180 Slot s_msg = buffer.getSlot(curr_seqn);
183 prev_seqn = curr_seqn;
185 /* Generate rejected message entry for missing messages */
186 if (prev_seqn != -1) {
187 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
190 /* Generate rejected message entries for present messages */
191 for (; i < rejectedmessagelist.size(); i++) {
192 long curr_seqn = rejectedmessagelist.get(i);
193 Slot s_msg = buffer.getSlot(curr_seqn);
194 long machineid = s_msg.getMachineID();
195 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
201 long newestseqnum = buffer.getNewestSeqNum();
202 long oldestseqnum = buffer.getOldestSeqNum();
203 if (lastliveslotseqn < oldestseqnum)
204 lastliveslotseqn = oldestseqnum;
206 long seqn = lastliveslotseqn;
207 boolean seenliveslot = false;
208 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
209 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
211 for (; seqn < threshold; seqn++) {
212 Slot prevslot = buffer.getSlot(seqn);
213 //Push slot number forward
215 lastliveslotseqn = seqn;
217 if (! prevslot.isLive())
220 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
221 for (Entry liveentry : liveentries) {
222 if (s.hasSpace(liveentry)) {
223 s.addEntry(liveentry);
224 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
226 System.out.print("B"); //?
227 return tryput(pendingTrans, true);
234 Transaction trans = new Transaction(s,
235 s.getSequenceNumber(),
237 pendingTrans.getKVUpdates(),
238 pendingTrans.getGuard());
239 boolean insertedTrans = false;
240 if (s.hasSpace(trans)) {
245 /* now go through live entries from least to greatest sequence number until
246 * either all live slots added, or the slot doesn't have enough room
247 * for SKIP_THRESHOLD consecutive entries*/
250 for (; seqn <= newestseqnum; seqn++) {
251 Slot prevslot = buffer.getSlot(seqn);
252 //Push slot number forward
254 lastliveslotseqn = seqn;
256 if (!prevslot.isLive())
259 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
260 for (Entry liveentry : liveentries) {
261 if (s.hasSpace(liveentry))
262 s.addEntry(liveentry);
265 if (skipcount > SKIP_THRESHOLD)
274 Slot[] array = cloud.putSlot(s, max);
276 array = new Slot[] {s};
277 rejectedmessagelist.clear();
279 if (array.length == 0)
280 throw new Error("Server Error: Did not send any slots");
281 rejectedmessagelist.add(s.getSequenceNumber());
282 insertedTrans = false;
285 /* update data structure */
286 validateandupdate(array, true);
288 return insertedTrans;
291 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
292 /* The cloud communication layer has checked slot HMACs already
294 if (newslots.length == 0) return;
296 long firstseqnum = newslots[0].getSequenceNumber();
297 if (firstseqnum <= sequencenumber)
298 throw new Error("Server Error: Sent older slots!");
300 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
301 checkHMACChain(indexer, newslots);
303 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
305 initExpectedSize(firstseqnum);
306 for (Slot slot : newslots) {
307 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
308 updateExpectedSize();
311 /* If there is a gap, check to see if the server sent us everything. */
312 if (firstseqnum != (sequencenumber + 1)) {
313 checkNumSlots(newslots.length);
314 if (!machineSet.isEmpty())
315 throw new Error("Missing record for machines: " + machineSet);
320 /* Commit new to slots. */
321 for (Slot slot : newslots) {
322 buffer.putSlot(slot);
325 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
328 private int expectedsize, currmaxsize;
330 private void checkNumSlots(int numslots) {
331 if (numslots != expectedsize)
332 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
335 private void initExpectedSize(long firstsequencenumber) {
336 long prevslots = firstsequencenumber;
337 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
338 currmaxsize = numslots;
341 private void updateExpectedSize() {
343 if (expectedsize > currmaxsize)
344 expectedsize = currmaxsize;
347 private void updateCurrMaxSize(int newmaxsize) {
348 currmaxsize = newmaxsize;
351 private void commitNewMaxSize() {
352 if (numslots != currmaxsize)
353 buffer.resize(currmaxsize);
355 numslots = currmaxsize;
356 setResizeThreshold();
359 // private void processEntry(KeyValue entry, SlotIndexer indexer) {
360 // IoTString key=entry.getKey();
361 // KeyValue oldvalue=table.get(key);
362 // if (oldvalue != null) {
363 // oldvalue.setDead();
365 // table.put(key, entry);
368 private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
369 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
372 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
373 long oldseqnum = entry.getOldSeqNum();
374 long newseqnum = entry.getNewSeqNum();
375 boolean isequal = entry.getEqual();
376 long machineid = entry.getMachineID();
377 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
378 Slot slot = indexer.getSlot(seqnum);
380 long slotmachineid = slot.getMachineID();
381 if (isequal != (slotmachineid == machineid)) {
382 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
387 HashSet<Long> watchset = new HashSet<Long>();
388 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
389 long entry_mid = lastmsg_entry.getKey();
390 /* We've seen it, don't need to continue to watch. Our next
391 * message will implicitly acknowledge it. */
392 if (entry_mid == localmachineid)
394 Pair<Long, Liveness> v = lastmsg_entry.getValue();
395 long entry_seqn = v.getFirst();
396 if (entry_seqn < newseqnum) {
397 addWatchList(entry_mid, entry);
398 watchset.add(entry_mid);
401 if (watchset.isEmpty())
404 entry.setWatchSet(watchset);
407 private void addWatchList(long machineid, RejectedMessage entry) {
408 HashSet<RejectedMessage> entries = watchlist.get(machineid);
410 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
414 private void processEntry(TableStatus entry, SlotIndexer indexer) {
415 int newnumslots = entry.getMaxSlots();
416 updateCurrMaxSize(newnumslots);
417 if (lastTableStatus != null)
418 lastTableStatus.setDead();
419 lastTableStatus = entry;
422 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
423 machineSet.remove(machineid);
425 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
426 if (watchset != null) {
427 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
428 RejectedMessage rm = rmit.next();
429 if (rm.getNewSeqNum() <= seqnum) {
430 /* Remove it from our watchlist */
432 /* Decrement machines that need to see this notification */
433 rm.removeWatcher(machineid);
438 if (machineid == localmachineid) {
439 /* Our own messages are immediately dead. */
440 if (liveness instanceof LastMessage) {
441 ((LastMessage)liveness).setDead();
442 } else if (liveness instanceof Slot) {
443 ((Slot)liveness).setDead();
445 throw new Error("Unrecognized type");
450 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
451 if (lastmsgentry == null)
454 long lastmsgseqnum = lastmsgentry.getFirst();
455 Liveness lastentry = lastmsgentry.getSecond();
456 if (machineid != localmachineid) {
457 if (lastentry instanceof LastMessage) {
458 ((LastMessage)lastentry).setDead();
459 } else if (lastentry instanceof Slot) {
460 ((Slot)lastentry).setDead();
462 throw new Error("Unrecognized type");
466 if (machineid == localmachineid) {
467 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
468 throw new Error("Server Error: Mismatch on local machine sequence number");
470 if (lastmsgseqnum > seqnum)
471 throw new Error("Server Error: Rollback on remote machine sequence number");
475 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
476 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
477 for (Entry entry : slot.getEntries()) {
478 switch (entry.getType()) {
479 // case Entry.TypeKeyValue:
480 // processEntry((KeyValue)entry, indexer);
483 case Entry.TypeLastMessage:
484 processEntry((LastMessage)entry, indexer, machineSet);
487 case Entry.TypeRejectedMessage:
488 processEntry((RejectedMessage)entry, indexer);
491 case Entry.TypeTableStatus:
492 processEntry((TableStatus)entry, indexer);
496 throw new Error("Unrecognized type: " + entry.getType());
501 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
502 for (int i = 0; i < newslots.length; i++) {
503 Slot currslot = newslots[i];
504 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
505 if (prevslot != null &&
506 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
507 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);