2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.List;
13 import java.util.Collection;
16 * IoTTable data structure. Provides client inferface.
17 * @author Brian Demsky
21 final public class Table {
22 private int numslots; //number of slots stored in buffer
24 //table of key-value pairs
25 //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
27 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
28 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
30 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
31 private Vector<Long> rejectedmessagelist = new Vector<Long>();
32 private SlotBuffer buffer;
33 private CloudComm cloud;
34 private long sequencenumber; //Largest sequence number a client has received
35 private long localmachineid;
36 private TableStatus lastTableStatus;
37 static final int FREE_SLOTS = 10; //number of slots that should be kept free
38 static final int SKIP_THRESHOLD = 10;
39 private long liveslotcount = 0;
41 static final double RESIZE_MULTIPLE = 1.2;
42 static final double RESIZE_THRESHOLD = 0.75;
43 static final int REJECTED_THRESHOLD = 5;
44 private int resizethreshold;
45 private long lastliveslotseqn; //smallest sequence number with a live entry
46 private Random random = new Random();
48 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
49 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
50 private List<Commit> commitList = null; // List of all the most recent live commits
51 private Set<Abort> abortSet = null; // Set of the live aborts
52 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
53 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
54 private List<Transaction> uncommittedTransactionsList = null; //
55 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
56 // private Set<Abort> arbitratorTable = null; // Table of arbitrators
59 public Table(String baseurl, String password, long _localmachineid) {
60 localmachineid = _localmachineid;
61 buffer = new SlotBuffer();
62 numslots = buffer.capacity();
65 cloud = new CloudComm(this, baseurl, password);
68 pendingTransQueue = new LinkedList<PendingTransaction>();
69 commitList = new LinkedList<Commit>();
70 abortSet = new HashSet<Abort>();
71 commitedTable = new HashMap<IoTString, KeyValue>();
72 speculativeTable = new HashMap<IoTString, KeyValue>();
73 uncommittedTransactionsList = new LinkedList<Transaction>();
74 arbitratorTable = new HashMap<IoTString, Long>();
77 public Table(CloudComm _cloud, long _localmachineid) {
78 localmachineid = _localmachineid;
79 buffer = new SlotBuffer();
80 numslots = buffer.capacity();
85 pendingTransQueue = new LinkedList<PendingTransaction>();
86 commitList = new LinkedList<Commit>();
87 abortSet = new HashSet<Abort>();
88 commitedTable = new HashMap<IoTString, KeyValue>();
89 speculativeTable = new HashMap<IoTString, KeyValue>();
90 uncommittedTransactionsList = new LinkedList<Transaction>();
91 arbitratorTable = new HashMap<IoTString, Long>();
94 public void rebuild() {
95 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
96 validateandupdate(newslots, true);
101 public IoTString getCommitted(IoTString key) {
102 KeyValue kv = commitedTable.get(key);
104 return kv.getValue();
110 public IoTString getSpeculative(IoTString key) {
111 KeyValue kv = speculativeTable.get(key);
113 return kv.getValue();
120 public void initTable() {
121 cloud.setSalt();//Set the salt
122 Slot s = new Slot(this, 1, localmachineid);
123 TableStatus status = new TableStatus(s, numslots);
125 Slot[] array = cloud.putSlot(s, numslots);
127 array = new Slot[] {s};
128 /* update data structure */
129 validateandupdate(array, true);
131 throw new Error("Error on initialization");
135 public String toString() {
138 String retString = " Committed Table: \n";
139 retString += "---------------------------\n";
140 retString += commitedTable.toString();
144 retString += " Speculative Table: \n";
145 retString += "---------------------------\n";
146 retString += speculativeTable.toString();
156 public void startTransaction() {
157 // Create a new transaction, invalidates any old pending transactions.
158 pendingTransBuild = new PendingTransaction();
161 public void commitTransaction() {
163 if (pendingTransBuild.getKVUpdates().size() == 0) {
164 // If no updates are made then there is no point inserting into the chain
168 // Add the pending transaction to the queue
169 pendingTransQueue.add(pendingTransBuild);
171 while (!pendingTransQueue.isEmpty()) {
172 if (tryput( pendingTransQueue.peek(), false)) {
173 pendingTransQueue.poll();
178 public void addKV(IoTString key, IoTString value) {
180 // Make sure new key value pair matches the current arbitrator
181 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
182 // TODO: Maybe not throw and error
183 throw new Error("Not all Key Values match");
188 KeyValue kv = new KeyValue(key, value);
189 pendingTransBuild.addKV(kv);
192 public void addGuard(Guard guard) {
193 pendingTransBuild.addGuard(guard);
196 public void update() {
197 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
199 validateandupdate(newslots, false);
201 if (uncommittedTransactionsList.size() > 0) {
202 List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
204 for (Transaction ut : uncommittedTransactionsList) {
205 KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
206 long arb = arbitratorTable.get(kv.getKey());
208 if (arb == localmachineid) {
209 uncommittedTransArb.add(ut);
214 boolean needResize = false;
215 while (uncommittedTransArb.size() > 0) {
216 boolean resize = needResize;
219 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
221 if (liveslotcount > resizethreshold) {
222 resize = true; //Resize is forced
226 newsize = (int) (numslots * RESIZE_MULTIPLE);
227 TableStatus status = new TableStatus(s, newsize);
231 if (! rejectedmessagelist.isEmpty()) {
232 /* TODO: We should avoid generating a rejected message entry if
233 * there is already a sufficient entry in the queue (e.g.,
234 * equalsto value of true and same sequence number). */
236 long old_seqn = rejectedmessagelist.firstElement();
237 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
238 long new_seqn = rejectedmessagelist.lastElement();
239 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
244 /* Go through list of missing messages */
245 for (; i < rejectedmessagelist.size(); i++) {
246 long curr_seqn = rejectedmessagelist.get(i);
247 Slot s_msg = buffer.getSlot(curr_seqn);
250 prev_seqn = curr_seqn;
252 /* Generate rejected message entry for missing messages */
253 if (prev_seqn != -1) {
254 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
257 /* Generate rejected message entries for present messages */
258 for (; i < rejectedmessagelist.size(); i++) {
259 long curr_seqn = rejectedmessagelist.get(i);
260 Slot s_msg = buffer.getSlot(curr_seqn);
261 long machineid = s_msg.getMachineID();
262 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
268 long newestseqnum = buffer.getNewestSeqNum();
269 long oldestseqnum = buffer.getOldestSeqNum();
270 if (lastliveslotseqn < oldestseqnum) {
271 lastliveslotseqn = oldestseqnum;
274 long seqn = lastliveslotseqn;
275 boolean seenliveslot = false;
276 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
277 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
279 boolean tryAgain = false;
282 for (; seqn < threshold; seqn++) {
283 Slot prevslot = buffer.getSlot(seqn);
284 //Push slot number forward
286 lastliveslotseqn = seqn;
288 if (! prevslot.isLive())
291 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
292 for (Entry liveentry : liveentries) {
293 if (s.hasSpace(liveentry)) {
294 s.addEntry(liveentry);
295 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
297 System.out.print("B"); //?
310 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
311 for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
312 Transaction ut = i.next();
314 KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
315 // Check if this machine arbitrates for this transaction
316 if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
320 Entry newEntry = null;
323 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
324 // Guard evaluated as true
326 // update the local tmp current key set
327 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
328 speculativeTableTmp.put(kv.getKey(), kv);
332 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
337 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
339 } catch (Exception e) {
343 if ((newEntry != null) && s.hasSpace(newEntry)) {
344 s.addEntry(newEntry);
352 /* now go through live entries from least to greatest sequence number until
353 * either all live slots added, or the slot doesn't have enough room
354 * for SKIP_THRESHOLD consecutive entries*/
357 for (; seqn <= newestseqnum; seqn++) {
358 Slot prevslot = buffer.getSlot(seqn);
359 //Push slot number forward
361 lastliveslotseqn = seqn;
363 if (!prevslot.isLive())
366 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
367 for (Entry liveentry : liveentries) {
368 if (s.hasSpace(liveentry))
369 s.addEntry(liveentry);
372 if (skipcount > SKIP_THRESHOLD)
381 Slot[] array = cloud.putSlot(s, max);
383 array = new Slot[] {s};
384 rejectedmessagelist.clear();
386 if (array.length == 0)
387 throw new Error("Server Error: Did not send any slots");
388 rejectedmessagelist.add(s.getSequenceNumber());
391 /* update data structure */
392 validateandupdate(array, true);
399 public boolean createNewKey(IoTString keyName, long machineId) {
402 if (arbitratorTable.get(keyName) != null) {
403 // There is already an arbitrator
407 if (tryput(keyName, machineId, false)) {
409 // If successfully inserted
415 void decrementLiveCount() {
420 private void setResizeThreshold() {
421 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
422 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
425 private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
426 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
428 if (liveslotcount > resizethreshold) {
429 resize = true; //Resize is forced
433 newsize = (int) (numslots * RESIZE_MULTIPLE);
434 TableStatus status = new TableStatus(s, newsize);
438 if (! rejectedmessagelist.isEmpty()) {
439 /* TODO: We should avoid generating a rejected message entry if
440 * there is already a sufficient entry in the queue (e.g.,
441 * equalsto value of true and same sequence number). */
443 long old_seqn = rejectedmessagelist.firstElement();
444 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
445 long new_seqn = rejectedmessagelist.lastElement();
446 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
451 /* Go through list of missing messages */
452 for (; i < rejectedmessagelist.size(); i++) {
453 long curr_seqn = rejectedmessagelist.get(i);
454 Slot s_msg = buffer.getSlot(curr_seqn);
457 prev_seqn = curr_seqn;
459 /* Generate rejected message entry for missing messages */
460 if (prev_seqn != -1) {
461 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
464 /* Generate rejected message entries for present messages */
465 for (; i < rejectedmessagelist.size(); i++) {
466 long curr_seqn = rejectedmessagelist.get(i);
467 Slot s_msg = buffer.getSlot(curr_seqn);
468 long machineid = s_msg.getMachineID();
469 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
475 long newestseqnum = buffer.getNewestSeqNum();
476 long oldestseqnum = buffer.getOldestSeqNum();
477 if (lastliveslotseqn < oldestseqnum)
478 lastliveslotseqn = oldestseqnum;
480 long seqn = lastliveslotseqn;
481 boolean seenliveslot = false;
482 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
483 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
487 for (; seqn < threshold; seqn++) {
488 Slot prevslot = buffer.getSlot(seqn);
489 //Push slot number forward
491 lastliveslotseqn = seqn;
493 if (! prevslot.isLive())
496 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
497 for (Entry liveentry : liveentries) {
498 if (s.hasSpace(liveentry)) {
499 s.addEntry(liveentry);
500 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
502 System.out.print("B"); //?
503 return tryput(pendingTrans, true);
511 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
512 for (Transaction ut : uncommittedTransactionsList) {
514 KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
515 // Check if this machine arbitrates for this transaction
516 if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
520 Entry newEntry = null;
523 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
524 // Guard evaluated as true
526 // update the local tmp current key set
527 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
528 speculativeTableTmp.put(kv.getKey(), kv);
532 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
537 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
539 } catch (Exception e) {
543 if ((newEntry != null) && s.hasSpace(newEntry)) {
544 s.addEntry(newEntry);
550 Transaction trans = new Transaction(s,
551 s.getSequenceNumber(),
553 pendingTrans.getKVUpdates(),
554 pendingTrans.getGuard());
555 boolean insertedTrans = false;
556 if (s.hasSpace(trans)) {
558 insertedTrans = true;
561 /* now go through live entries from least to greatest sequence number until
562 * either all live slots added, or the slot doesn't have enough room
563 * for SKIP_THRESHOLD consecutive entries*/
566 for (; seqn <= newestseqnum; seqn++) {
567 Slot prevslot = buffer.getSlot(seqn);
568 //Push slot number forward
570 lastliveslotseqn = seqn;
572 if (!prevslot.isLive())
575 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
576 for (Entry liveentry : liveentries) {
577 if (s.hasSpace(liveentry))
578 s.addEntry(liveentry);
581 if (skipcount > SKIP_THRESHOLD)
590 Slot[] array = cloud.putSlot(s, max);
592 array = new Slot[] {s};
593 rejectedmessagelist.clear();
595 if (array.length == 0)
596 throw new Error("Server Error: Did not send any slots");
597 rejectedmessagelist.add(s.getSequenceNumber());
598 insertedTrans = false;
601 /* update data structure */
602 validateandupdate(array, true);
604 return insertedTrans;
607 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
608 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
610 if (liveslotcount > resizethreshold) {
611 resize = true; //Resize is forced
615 newsize = (int) (numslots * RESIZE_MULTIPLE);
616 TableStatus status = new TableStatus(s, newsize);
620 if (! rejectedmessagelist.isEmpty()) {
621 /* TODO: We should avoid generating a rejected message entry if
622 * there is already a sufficient entry in the queue (e.g.,
623 * equalsto value of true and same sequence number). */
625 long old_seqn = rejectedmessagelist.firstElement();
626 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
627 long new_seqn = rejectedmessagelist.lastElement();
628 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
633 /* Go through list of missing messages */
634 for (; i < rejectedmessagelist.size(); i++) {
635 long curr_seqn = rejectedmessagelist.get(i);
636 Slot s_msg = buffer.getSlot(curr_seqn);
639 prev_seqn = curr_seqn;
641 /* Generate rejected message entry for missing messages */
642 if (prev_seqn != -1) {
643 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
646 /* Generate rejected message entries for present messages */
647 for (; i < rejectedmessagelist.size(); i++) {
648 long curr_seqn = rejectedmessagelist.get(i);
649 Slot s_msg = buffer.getSlot(curr_seqn);
650 long machineid = s_msg.getMachineID();
651 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
657 long newestseqnum = buffer.getNewestSeqNum();
658 long oldestseqnum = buffer.getOldestSeqNum();
659 if (lastliveslotseqn < oldestseqnum)
660 lastliveslotseqn = oldestseqnum;
662 long seqn = lastliveslotseqn;
663 boolean seenliveslot = false;
664 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
665 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
669 for (; seqn < threshold; seqn++) {
670 Slot prevslot = buffer.getSlot(seqn);
671 //Push slot number forward
673 lastliveslotseqn = seqn;
675 if (! prevslot.isLive())
678 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
679 for (Entry liveentry : liveentries) {
680 if (s.hasSpace(liveentry)) {
681 s.addEntry(liveentry);
682 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
684 System.out.print("B"); //?
685 return tryput(keyName, arbMachineid, true);
693 // Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
694 // for (Transaction ut : uncommittedTransactionsList) {
696 // KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
697 // // Check if this machine arbitrates for this transaction
698 // if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
702 // Entry newEntry = null;
705 // if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
706 // // Guard evaluated as true
708 // // update the local tmp current key set
709 // for (KeyValue kv : ut.getkeyValueUpdateSet()) {
710 // speculativeTableTmp.put(kv.getKey(), kv);
713 // // create the commit
714 // newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
716 // // Guard was false
718 // // create the abort
719 // newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
721 // } catch (Exception e) {
722 // e.printStackTrace();
725 // if ((newEntry != null) && s.hasSpace(newEntry)) {
727 // // TODO: Remove print
728 // System.out.println("Arbitrating...");
729 // s.addEntry(newEntry);
736 NewKey newKey = new NewKey(s, keyName, arbMachineid);
738 boolean insertedNewKey = false;
739 if (s.hasSpace(newKey)) {
741 insertedNewKey = true;
744 /* now go through live entries from least to greatest sequence number until
745 * either all live slots added, or the slot doesn't have enough room
746 * for SKIP_THRESHOLD consecutive entries*/
749 for (; seqn <= newestseqnum; seqn++) {
750 Slot prevslot = buffer.getSlot(seqn);
751 //Push slot number forward
753 lastliveslotseqn = seqn;
755 if (!prevslot.isLive())
758 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
759 for (Entry liveentry : liveentries) {
760 if (s.hasSpace(liveentry))
761 s.addEntry(liveentry);
764 if (skipcount > SKIP_THRESHOLD)
773 Slot[] array = cloud.putSlot(s, max);
775 array = new Slot[] {s};
776 rejectedmessagelist.clear();
778 if (array.length == 0)
779 throw new Error("Server Error: Did not send any slots");
780 rejectedmessagelist.add(s.getSequenceNumber());
781 insertedNewKey = false;
784 /* update data structure */
785 validateandupdate(array, true);
787 return insertedNewKey;
790 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
791 /* The cloud communication layer has checked slot HMACs already
793 if (newslots.length == 0) return;
795 long firstseqnum = newslots[0].getSequenceNumber();
796 if (firstseqnum <= sequencenumber) {
797 throw new Error("Server Error: Sent older slots!");
800 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
801 checkHMACChain(indexer, newslots);
803 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
805 initExpectedSize(firstseqnum);
806 for (Slot slot : newslots) {
807 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
808 updateExpectedSize();
811 /* If there is a gap, check to see if the server sent us everything. */
812 if (firstseqnum != (sequencenumber + 1)) {
815 checkNumSlots(newslots.length);
816 if (!machineSet.isEmpty()) {
817 throw new Error("Missing record for machines: " + machineSet);
823 /* Commit new to slots. */
824 for (Slot slot : newslots) {
825 buffer.putSlot(slot);
828 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
830 // Speculate on key value pairs
831 createSpeculativeTable();
834 private void createSpeculativeTable() {
835 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
837 for (Transaction trans : uncommittedTransactionsList) {
840 if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
841 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
842 speculativeTableTmp.put(kv.getKey(), kv);
846 } catch (Exception e) {
851 speculativeTable = speculativeTableTmp;
854 private int expectedsize, currmaxsize;
856 private void checkNumSlots(int numslots) {
857 if (numslots != expectedsize) {
858 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
862 private void initExpectedSize(long firstsequencenumber) {
863 long prevslots = firstsequencenumber;
864 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
865 currmaxsize = numslots;
868 private void updateExpectedSize() {
870 if (expectedsize > currmaxsize) {
871 expectedsize = currmaxsize;
875 private void updateCurrMaxSize(int newmaxsize) {
876 currmaxsize = newmaxsize;
879 private void commitNewMaxSize() {
880 if (numslots != currmaxsize)
881 buffer.resize(currmaxsize);
883 numslots = currmaxsize;
884 setResizeThreshold();
891 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
892 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
895 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
896 long oldseqnum = entry.getOldSeqNum();
897 long newseqnum = entry.getNewSeqNum();
898 boolean isequal = entry.getEqual();
899 long machineid = entry.getMachineID();
900 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
901 Slot slot = indexer.getSlot(seqnum);
903 long slotmachineid = slot.getMachineID();
904 if (isequal != (slotmachineid == machineid)) {
905 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
910 HashSet<Long> watchset = new HashSet<Long>();
911 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
912 long entry_mid = lastmsg_entry.getKey();
913 /* We've seen it, don't need to continue to watch. Our next
914 * message will implicitly acknowledge it. */
915 if (entry_mid == localmachineid)
917 Pair<Long, Liveness> v = lastmsg_entry.getValue();
918 long entry_seqn = v.getFirst();
919 if (entry_seqn < newseqnum) {
920 addWatchList(entry_mid, entry);
921 watchset.add(entry_mid);
924 if (watchset.isEmpty())
927 entry.setWatchSet(watchset);
930 private void processEntry(NewKey entry) {
931 arbitratorTable.put(entry.getKey(), entry.getMachineID());
934 private void processEntry(Transaction entry) {
935 uncommittedTransactionsList.add(entry);
938 private void processEntry(Abort entry) {
941 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
942 // Abort has not been seen yet so we need to keep track of it
945 // The machine already saw this so it is dead
949 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
950 Transaction prevtrans = i.next();
951 if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
952 uncommittedTransactionsList.remove(prevtrans);
959 private void processEntry(Commit entry) {
961 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
962 Commit prevcommit = i.next();
963 prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
965 if (!prevcommit.isLive()) {
966 //commitList.remove(prevcommit);
971 commitList.add(entry);
973 // Update the committed table list
974 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
975 IoTString key = kv.getKey();
976 commitedTable.put(key, kv);
979 long committedTransSeq = entry.getTransSequenceNumber();
981 // Make dead the transactions
982 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
983 Transaction prevtrans = i.next();
985 if (prevtrans.getSequenceNumber() <= committedTransSeq) {
986 // uncommittedTransactionsList.remove(prevtrans);
993 private void processEntry(TableStatus entry) {
994 int newnumslots = entry.getMaxSlots();
995 updateCurrMaxSize(newnumslots);
996 if (lastTableStatus != null)
997 lastTableStatus.setDead();
998 lastTableStatus = entry;
1002 private void addWatchList(long machineid, RejectedMessage entry) {
1003 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1004 if (entries == null)
1005 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1009 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1010 machineSet.remove(machineid);
1012 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1013 if (watchset != null) {
1014 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1015 RejectedMessage rm = rmit.next();
1016 if (rm.getNewSeqNum() <= seqnum) {
1017 /* Remove it from our watchlist */
1019 /* Decrement machines that need to see this notification */
1020 rm.removeWatcher(machineid);
1025 if (machineid == localmachineid) {
1026 /* Our own messages are immediately dead. */
1027 if (liveness instanceof LastMessage) {
1028 ((LastMessage)liveness).setDead();
1029 } else if (liveness instanceof Slot) {
1030 ((Slot)liveness).setDead();
1032 throw new Error("Unrecognized type");
1036 // Set dead the abort
1037 for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
1038 Abort abort = ait.next();
1040 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1047 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1048 if (lastmsgentry == null)
1051 long lastmsgseqnum = lastmsgentry.getFirst();
1052 Liveness lastentry = lastmsgentry.getSecond();
1053 if (machineid != localmachineid) {
1054 if (lastentry instanceof LastMessage) {
1055 ((LastMessage)lastentry).setDead();
1056 } else if (lastentry instanceof Slot) {
1057 ((Slot)lastentry).setDead();
1059 throw new Error("Unrecognized type");
1063 if (machineid == localmachineid) {
1064 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1065 throw new Error("Server Error: Mismatch on local machine sequence number");
1067 if (lastmsgseqnum > seqnum)
1068 throw new Error("Server Error: Rollback on remote machine sequence number");
1072 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1073 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1074 for (Entry entry : slot.getEntries()) {
1075 switch (entry.getType()) {
1077 case Entry.TypeNewKey:
1078 processEntry((NewKey)entry);
1081 case Entry.TypeCommit:
1082 processEntry((Commit)entry);
1085 case Entry.TypeAbort:
1086 processEntry((Abort)entry);
1089 case Entry.TypeTransaction:
1090 processEntry((Transaction)entry);
1093 case Entry.TypeLastMessage:
1094 processEntry((LastMessage)entry, machineSet);
1097 case Entry.TypeRejectedMessage:
1098 processEntry((RejectedMessage)entry, indexer);
1101 case Entry.TypeTableStatus:
1102 processEntry((TableStatus)entry);
1106 throw new Error("Unrecognized type: " + entry.getType());
1111 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1112 for (int i = 0; i < newslots.length; i++) {
1113 Slot currslot = newslots[i];
1114 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1115 if (prevslot != null &&
1116 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1117 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);