2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.List;
13 import java.util.Collection;
16 * IoTTable data structure. Provides client inferface.
17 * @author Brian Demsky
21 final public class Table {
22 private int numslots; //number of slots stored in buffer
24 //table of key-value pairs
25 //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
27 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
28 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
30 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
31 private Vector<Long> rejectedmessagelist = new Vector<Long>();
32 private SlotBuffer buffer;
33 private CloudComm cloud;
34 private long sequencenumber; //Largest sequence number a client has received
35 private long localmachineid;
36 private TableStatus lastTableStatus;
37 static final int FREE_SLOTS = 10; //number of slots that should be kept free
38 static final int SKIP_THRESHOLD = 10;
39 private long liveslotcount = 0;
41 static final double RESIZE_MULTIPLE = 1.2;
42 static final double RESIZE_THRESHOLD = 0.75;
43 static final int REJECTED_THRESHOLD = 5;
44 private int resizethreshold;
45 private long lastliveslotseqn; //smallest sequence number with a live entry
46 private Random random = new Random();
48 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
49 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
50 private List<Commit> commitList = null; // List of all the most recent live commits
51 private Set<Abort> abortSet = null; // Set of the live aborts
52 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
53 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
54 private List<Transaction> uncommittedTransactionsList = null; //
55 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
56 // private Set<Abort> arbitratorTable = null; // Table of arbitrators
59 public Table(String baseurl, String password, long _localmachineid) {
60 localmachineid = _localmachineid;
61 buffer = new SlotBuffer();
62 numslots = buffer.capacity();
65 cloud = new CloudComm(this, baseurl, password);
68 pendingTransQueue = new LinkedList<PendingTransaction>();
69 commitList = new LinkedList<Commit>();
70 abortSet = new HashSet<Abort>();
71 commitedTable = new HashMap<IoTString, KeyValue>();
72 speculativeTable = new HashMap<IoTString, KeyValue>();
73 uncommittedTransactionsList = new LinkedList<Transaction>();
74 arbitratorTable = new HashMap<IoTString, Long>();
77 public Table(CloudComm _cloud, long _localmachineid) {
78 localmachineid = _localmachineid;
79 buffer = new SlotBuffer();
80 numslots = buffer.capacity();
85 pendingTransQueue = new LinkedList<PendingTransaction>();
86 commitList = new LinkedList<Commit>();
87 abortSet = new HashSet<Abort>();
88 commitedTable = new HashMap<IoTString, KeyValue>();
89 speculativeTable = new HashMap<IoTString, KeyValue>();
90 uncommittedTransactionsList = new LinkedList<Transaction>();
91 arbitratorTable = new HashMap<IoTString, Long>();
94 public void rebuild() {
95 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
96 validateandupdate(newslots, true);
101 public IoTString getCommitted(IoTString key) {
102 KeyValue kv = commitedTable.get(key);
104 return kv.getValue();
110 public IoTString getSpeculative(IoTString key) {
111 KeyValue kv = speculativeTable.get(key);
113 return kv.getValue();
120 public void initTable() {
121 cloud.setSalt();//Set the salt
122 Slot s = new Slot(this, 1, localmachineid);
123 TableStatus status = new TableStatus(s, numslots);
125 Slot[] array = cloud.putSlot(s, numslots);
127 array = new Slot[] {s};
128 /* update data structure */
129 validateandupdate(array, true);
131 throw new Error("Error on initialization");
135 public String toString() {
138 String retString = " Committed Table: \n";
139 retString += "---------------------------\n";
140 retString += commitedTable.toString();
144 retString += " Speculative Table: \n";
145 retString += "---------------------------\n";
146 retString += speculativeTable.toString();
156 public void startTransaction() {
157 // Create a new transaction, invalidates any old pending transactions.
158 pendingTransBuild = new PendingTransaction();
161 public void commitTransaction() {
163 if (pendingTransBuild.getKVUpdates().size() == 0) {
164 // If no updates are made then there is no point inserting into the chain
168 // Add the pending transaction to the queue
169 pendingTransQueue.add(pendingTransBuild);
171 while (!pendingTransQueue.isEmpty()) {
172 if (tryput( pendingTransQueue.peek(), false)) {
173 pendingTransQueue.poll();
178 public void addKV(IoTString key, IoTString value) {
180 // Make sure new key value pair matches the current arbitrator
181 if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) {
182 // TODO: Maybe not throw and error
183 throw new Error("Not all Key Values match");
188 KeyValue kv = new KeyValue(key, value);
189 pendingTransBuild.addKV(kv);
194 public void addGuard(IoTString key, IoTString value) {
195 KeyValue kv = new KeyValue(key, value);
196 pendingTransBuild.addKV(kv);
199 public void update() {
200 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
202 validateandupdate(newslots, false);
205 public boolean createNewKey(IoTString keyName, long machineId) {
208 if (arbitratorTable.get(keyName) != null) {
209 // There is already an arbitrator
213 if (tryput(keyName, machineId, false)) {
215 // If successfully inserted
221 void decrementLiveCount() {
226 private void setResizeThreshold() {
227 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
228 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
231 private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
232 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
234 if (liveslotcount > resizethreshold) {
235 resize = true; //Resize is forced
239 newsize = (int) (numslots * RESIZE_MULTIPLE);
240 TableStatus status = new TableStatus(s, newsize);
244 if (! rejectedmessagelist.isEmpty()) {
245 /* TODO: We should avoid generating a rejected message entry if
246 * there is already a sufficient entry in the queue (e.g.,
247 * equalsto value of true and same sequence number). */
249 long old_seqn = rejectedmessagelist.firstElement();
250 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
251 long new_seqn = rejectedmessagelist.lastElement();
252 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
257 /* Go through list of missing messages */
258 for (; i < rejectedmessagelist.size(); i++) {
259 long curr_seqn = rejectedmessagelist.get(i);
260 Slot s_msg = buffer.getSlot(curr_seqn);
263 prev_seqn = curr_seqn;
265 /* Generate rejected message entry for missing messages */
266 if (prev_seqn != -1) {
267 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
270 /* Generate rejected message entries for present messages */
271 for (; i < rejectedmessagelist.size(); i++) {
272 long curr_seqn = rejectedmessagelist.get(i);
273 Slot s_msg = buffer.getSlot(curr_seqn);
274 long machineid = s_msg.getMachineID();
275 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
281 long newestseqnum = buffer.getNewestSeqNum();
282 long oldestseqnum = buffer.getOldestSeqNum();
283 if (lastliveslotseqn < oldestseqnum)
284 lastliveslotseqn = oldestseqnum;
286 long seqn = lastliveslotseqn;
287 boolean seenliveslot = false;
288 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
289 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
293 for (; seqn < threshold; seqn++) {
294 Slot prevslot = buffer.getSlot(seqn);
295 //Push slot number forward
297 lastliveslotseqn = seqn;
299 if (! prevslot.isLive())
302 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
303 for (Entry liveentry : liveentries) {
304 if (s.hasSpace(liveentry)) {
305 s.addEntry(liveentry);
306 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
308 System.out.print("B"); //?
309 return tryput(pendingTrans, true);
317 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
318 for (Transaction ut : uncommittedTransactionsList) {
320 KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
321 // Check if this machine arbitrates for this transaction
322 if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
326 Entry newEntry = null;
329 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
330 // Guard evaluated as true
332 // update the local tmp current key set
333 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
334 speculativeTableTmp.put(kv.getKey(), kv);
338 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
343 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
345 } catch (Exception e) {
349 if ((newEntry != null) && s.hasSpace(newEntry)) {
350 s.addEntry(newEntry);
356 Transaction trans = new Transaction(s,
357 s.getSequenceNumber(),
359 pendingTrans.getKVUpdates(),
360 pendingTrans.getGuard());
361 boolean insertedTrans = false;
362 if (s.hasSpace(trans)) {
364 insertedTrans = true;
367 /* now go through live entries from least to greatest sequence number until
368 * either all live slots added, or the slot doesn't have enough room
369 * for SKIP_THRESHOLD consecutive entries*/
372 for (; seqn <= newestseqnum; seqn++) {
373 Slot prevslot = buffer.getSlot(seqn);
374 //Push slot number forward
376 lastliveslotseqn = seqn;
378 if (!prevslot.isLive())
381 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
382 for (Entry liveentry : liveentries) {
383 if (s.hasSpace(liveentry))
384 s.addEntry(liveentry);
387 if (skipcount > SKIP_THRESHOLD)
396 Slot[] array = cloud.putSlot(s, max);
398 array = new Slot[] {s};
399 rejectedmessagelist.clear();
401 if (array.length == 0)
402 throw new Error("Server Error: Did not send any slots");
403 rejectedmessagelist.add(s.getSequenceNumber());
404 insertedTrans = false;
407 /* update data structure */
408 validateandupdate(array, true);
410 return insertedTrans;
413 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
414 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
416 if (liveslotcount > resizethreshold) {
417 resize = true; //Resize is forced
421 newsize = (int) (numslots * RESIZE_MULTIPLE);
422 TableStatus status = new TableStatus(s, newsize);
426 if (! rejectedmessagelist.isEmpty()) {
427 /* TODO: We should avoid generating a rejected message entry if
428 * there is already a sufficient entry in the queue (e.g.,
429 * equalsto value of true and same sequence number). */
431 long old_seqn = rejectedmessagelist.firstElement();
432 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
433 long new_seqn = rejectedmessagelist.lastElement();
434 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
439 /* Go through list of missing messages */
440 for (; i < rejectedmessagelist.size(); i++) {
441 long curr_seqn = rejectedmessagelist.get(i);
442 Slot s_msg = buffer.getSlot(curr_seqn);
445 prev_seqn = curr_seqn;
447 /* Generate rejected message entry for missing messages */
448 if (prev_seqn != -1) {
449 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
452 /* Generate rejected message entries for present messages */
453 for (; i < rejectedmessagelist.size(); i++) {
454 long curr_seqn = rejectedmessagelist.get(i);
455 Slot s_msg = buffer.getSlot(curr_seqn);
456 long machineid = s_msg.getMachineID();
457 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
463 long newestseqnum = buffer.getNewestSeqNum();
464 long oldestseqnum = buffer.getOldestSeqNum();
465 if (lastliveslotseqn < oldestseqnum)
466 lastliveslotseqn = oldestseqnum;
468 long seqn = lastliveslotseqn;
469 boolean seenliveslot = false;
470 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
471 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
475 for (; seqn < threshold; seqn++) {
476 Slot prevslot = buffer.getSlot(seqn);
477 //Push slot number forward
479 lastliveslotseqn = seqn;
481 if (! prevslot.isLive())
484 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
485 for (Entry liveentry : liveentries) {
486 if (s.hasSpace(liveentry)) {
487 s.addEntry(liveentry);
488 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
490 System.out.print("B"); //?
491 return tryput(keyName, arbMachineid, true);
499 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
500 for (Transaction ut : uncommittedTransactionsList) {
502 KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
503 // Check if this machine arbitrates for this transaction
504 if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
508 Entry newEntry = null;
511 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
512 // Guard evaluated as true
514 // update the local tmp current key set
515 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
516 speculativeTableTmp.put(kv.getKey(), kv);
520 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
525 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
527 } catch (Exception e) {
531 if ((newEntry != null) && s.hasSpace(newEntry)) {
532 s.addEntry(newEntry);
539 NewKey newKey = new NewKey(s, keyName, arbMachineid);
541 boolean insertedNewKey = false;
542 if (s.hasSpace(newKey)) {
544 insertedNewKey = true;
547 /* now go through live entries from least to greatest sequence number until
548 * either all live slots added, or the slot doesn't have enough room
549 * for SKIP_THRESHOLD consecutive entries*/
552 for (; seqn <= newestseqnum; seqn++) {
553 Slot prevslot = buffer.getSlot(seqn);
554 //Push slot number forward
556 lastliveslotseqn = seqn;
558 if (!prevslot.isLive())
561 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
562 for (Entry liveentry : liveentries) {
563 if (s.hasSpace(liveentry))
564 s.addEntry(liveentry);
567 if (skipcount > SKIP_THRESHOLD)
576 Slot[] array = cloud.putSlot(s, max);
578 array = new Slot[] {s};
579 rejectedmessagelist.clear();
581 if (array.length == 0)
582 throw new Error("Server Error: Did not send any slots");
583 rejectedmessagelist.add(s.getSequenceNumber());
584 insertedNewKey = false;
587 /* update data structure */
588 validateandupdate(array, true);
590 return insertedNewKey;
593 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
594 /* The cloud communication layer has checked slot HMACs already
596 if (newslots.length == 0) return;
598 long firstseqnum = newslots[0].getSequenceNumber();
599 if (firstseqnum <= sequencenumber) {
600 throw new Error("Server Error: Sent older slots!");
603 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
604 checkHMACChain(indexer, newslots);
606 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
608 initExpectedSize(firstseqnum);
609 for (Slot slot : newslots) {
610 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
611 updateExpectedSize();
614 /* If there is a gap, check to see if the server sent us everything. */
615 if (firstseqnum != (sequencenumber + 1)) {
618 checkNumSlots(newslots.length);
619 if (!machineSet.isEmpty()) {
620 throw new Error("Missing record for machines: " + machineSet);
626 /* Commit new to slots. */
627 for (Slot slot : newslots) {
628 buffer.putSlot(slot);
631 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
633 // Speculate on key value pairs
634 createSpeculativeTable();
637 private void createSpeculativeTable() {
638 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
640 for (Transaction trans : uncommittedTransactionsList) {
643 if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
644 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
645 speculativeTableTmp.put(kv.getKey(), kv);
649 } catch (Exception e) {
654 speculativeTable = speculativeTableTmp;
657 private int expectedsize, currmaxsize;
659 private void checkNumSlots(int numslots) {
660 if (numslots != expectedsize) {
661 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
665 private void initExpectedSize(long firstsequencenumber) {
666 long prevslots = firstsequencenumber;
667 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
668 currmaxsize = numslots;
671 private void updateExpectedSize() {
673 if (expectedsize > currmaxsize) {
674 expectedsize = currmaxsize;
678 private void updateCurrMaxSize(int newmaxsize) {
679 currmaxsize = newmaxsize;
682 private void commitNewMaxSize() {
683 if (numslots != currmaxsize)
684 buffer.resize(currmaxsize);
686 numslots = currmaxsize;
687 setResizeThreshold();
694 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
695 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
698 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
699 long oldseqnum = entry.getOldSeqNum();
700 long newseqnum = entry.getNewSeqNum();
701 boolean isequal = entry.getEqual();
702 long machineid = entry.getMachineID();
703 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
704 Slot slot = indexer.getSlot(seqnum);
706 long slotmachineid = slot.getMachineID();
707 if (isequal != (slotmachineid == machineid)) {
708 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
713 HashSet<Long> watchset = new HashSet<Long>();
714 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
715 long entry_mid = lastmsg_entry.getKey();
716 /* We've seen it, don't need to continue to watch. Our next
717 * message will implicitly acknowledge it. */
718 if (entry_mid == localmachineid)
720 Pair<Long, Liveness> v = lastmsg_entry.getValue();
721 long entry_seqn = v.getFirst();
722 if (entry_seqn < newseqnum) {
723 addWatchList(entry_mid, entry);
724 watchset.add(entry_mid);
727 if (watchset.isEmpty())
730 entry.setWatchSet(watchset);
733 private void processEntry(NewKey entry) {
734 arbitratorTable.put(entry.getKey(), entry.getMachineID());
737 private void processEntry(Transaction entry) {
738 uncommittedTransactionsList.add(entry);
741 private void processEntry(Abort entry) {
744 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
745 // Abort has not been seen yet so we need to keep track of it
748 // The machine already saw this so it is dead
752 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
753 Transaction prevtrans = i.next();
754 if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
755 uncommittedTransactionsList.remove(prevtrans);
762 private void processEntry(Commit entry) {
764 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
765 Commit prevcommit = i.next();
766 prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
768 if (!prevcommit.isLive()) {
769 commitList.remove(prevcommit);
773 commitList.add(entry);
775 // Update the committed table list
776 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
777 IoTString key = kv.getKey();
778 commitedTable.put(key, kv);
781 long committedTransSeq = entry.getTransSequenceNumber();
783 // Make dead the transactions
784 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
785 Transaction prevtrans = i.next();
787 if (prevtrans.getSequenceNumber() <= committedTransSeq) {
788 uncommittedTransactionsList.remove(prevtrans);
794 private void processEntry(TableStatus entry) {
795 int newnumslots = entry.getMaxSlots();
796 updateCurrMaxSize(newnumslots);
797 if (lastTableStatus != null)
798 lastTableStatus.setDead();
799 lastTableStatus = entry;
803 private void addWatchList(long machineid, RejectedMessage entry) {
804 HashSet<RejectedMessage> entries = watchlist.get(machineid);
806 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
810 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
811 machineSet.remove(machineid);
813 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
814 if (watchset != null) {
815 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
816 RejectedMessage rm = rmit.next();
817 if (rm.getNewSeqNum() <= seqnum) {
818 /* Remove it from our watchlist */
820 /* Decrement machines that need to see this notification */
821 rm.removeWatcher(machineid);
826 if (machineid == localmachineid) {
827 /* Our own messages are immediately dead. */
828 if (liveness instanceof LastMessage) {
829 ((LastMessage)liveness).setDead();
830 } else if (liveness instanceof Slot) {
831 ((Slot)liveness).setDead();
833 throw new Error("Unrecognized type");
837 // Set dead the abort
838 for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
839 Abort abort = ait.next();
841 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
848 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
849 if (lastmsgentry == null)
852 long lastmsgseqnum = lastmsgentry.getFirst();
853 Liveness lastentry = lastmsgentry.getSecond();
854 if (machineid != localmachineid) {
855 if (lastentry instanceof LastMessage) {
856 ((LastMessage)lastentry).setDead();
857 } else if (lastentry instanceof Slot) {
858 ((Slot)lastentry).setDead();
860 throw new Error("Unrecognized type");
864 if (machineid == localmachineid) {
865 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
866 throw new Error("Server Error: Mismatch on local machine sequence number");
868 if (lastmsgseqnum > seqnum)
869 throw new Error("Server Error: Rollback on remote machine sequence number");
873 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
874 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
875 for (Entry entry : slot.getEntries()) {
876 switch (entry.getType()) {
878 case Entry.TypeNewKey:
879 processEntry((NewKey)entry);
882 case Entry.TypeCommit:
883 processEntry((Commit)entry);
886 case Entry.TypeAbort:
887 processEntry((Abort)entry);
890 case Entry.TypeTransaction:
891 processEntry((Transaction)entry);
894 case Entry.TypeLastMessage:
895 processEntry((LastMessage)entry, machineSet);
898 case Entry.TypeRejectedMessage:
899 processEntry((RejectedMessage)entry, indexer);
902 case Entry.TypeTableStatus:
903 processEntry((TableStatus)entry);
907 throw new Error("Unrecognized type: " + entry.getType());
912 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
913 for (int i = 0; i < newslots.length; i++) {
914 Slot currslot = newslots[i];
915 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
916 if (prevslot != null &&
917 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
918 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);