1cdb3d5b11b789d3cb29afac838cb4e0d2ca11d4
[iotcloud.git] / src2 / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.List;
12 import java.util.Set;
13 import java.util.Collection;
14
15 /**
16  * IoTTable data structure.  Provides client interface.
17  * @author Brian Demsky
18  * @version 1.0
19  */
20
21 final public class Table {
22         private int numslots;   //number of slots stored in buffer
23
24         //table of key-value pairs
25         //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
26
27         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
28         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
29         // machine id -> ...
30         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
31         private Vector<Long> rejectedmessagelist = new Vector<Long>();
32         private SlotBuffer buffer;
33         private CloudComm cloud;
34         private long sequencenumber; //Largest sequence number a client has received
35         private long localmachineid;
36         private TableStatus lastTableStatus;
37         static final int FREE_SLOTS = 10; //number of slots that should be kept free
38         static final int SKIP_THRESHOLD = 10;
39         private long liveslotcount = 0;
40         private int chance;
41         static final double RESIZE_MULTIPLE = 1.2;
42         static final double RESIZE_THRESHOLD = 0.75;
43         static final int REJECTED_THRESHOLD = 5;
44         private int resizethreshold;
45         private long lastliveslotseqn;  //smallest sequence number with a live entry
46         private Random random = new Random();
47
48         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
49         private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
50         private List<Commit> commitList = null; // List of all the most recent live commits
51         private Set<Abort> abortSet = null; // Set of the live aborts
52         private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
53         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
54         private List<Transaction> uncommittedTransactionsList = null; //
55         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
56         // private Set<Abort> arbitratorTable = null; // Table of arbitrators
57
58
59         public Table(String baseurl, String password, long _localmachineid) {
60                 localmachineid = _localmachineid;
61                 buffer = new SlotBuffer();
62                 numslots = buffer.capacity();
63                 setResizeThreshold();
64                 sequencenumber = 0;
65                 cloud = new CloudComm(this, baseurl, password);
66                 lastliveslotseqn = 1;
67
68                 pendingTransQueue = new LinkedList<PendingTransaction>();
69                 commitList = new LinkedList<Commit>();
70                 abortSet = new HashSet<Abort>();
71                 commitedTable = new HashMap<IoTString, KeyValue>();
72                 speculativeTable = new HashMap<IoTString, KeyValue>();
73                 uncommittedTransactionsList = new LinkedList<Transaction>();
74                 arbitratorTable = new HashMap<IoTString, Long>();
75         }
76
77         public Table(CloudComm _cloud, long _localmachineid) {
78                 localmachineid = _localmachineid;
79                 buffer = new SlotBuffer();
80                 numslots = buffer.capacity();
81                 setResizeThreshold();
82                 sequencenumber = 0;
83                 cloud = _cloud;
84
85                 pendingTransQueue = new LinkedList<PendingTransaction>();
86                 commitList = new LinkedList<Commit>();
87                 abortSet = new HashSet<Abort>();
88                 commitedTable = new HashMap<IoTString, KeyValue>();
89                 speculativeTable = new HashMap<IoTString, KeyValue>();
90                 uncommittedTransactionsList = new LinkedList<Transaction>();
91                 arbitratorTable = new HashMap<IoTString, Long>();
92         }
93
94         public void rebuild() {
95                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
96                 validateandupdate(newslots, true);
97         }
98
99
100
101         public IoTString getCommitted(IoTString key) {
102                 KeyValue kv = commitedTable.get(key);
103                 if (kv != null) {
104                         return kv.getValue();
105                 } else {
106                         return null;
107                 }
108         }
109
110         public IoTString getSpeculative(IoTString key) {
111                 KeyValue kv = speculativeTable.get(key);
112                 if (kv != null) {
113                         return kv.getValue();
114                 } else {
115                         return null;
116                 }
117         }
118
119
120         public void initTable() {
121                 cloud.setSalt();//Set the salt
122                 Slot s = new Slot(this, 1, localmachineid);
123                 TableStatus status = new TableStatus(s, numslots);
124                 s.addEntry(status);
125                 Slot[] array = cloud.putSlot(s, numslots);
126                 if (array == null) {
127                         array = new Slot[] {s};
128                         /* update data structure */
129                         validateandupdate(array, true);
130                 } else {
131                         throw new Error("Error on initialization");
132                 }
133         }
134
135         public String toString() {
136
137
138                 String retString = " Committed Table: \n";
139                 retString += "---------------------------\n";
140                 retString += commitedTable.toString();
141
142                 retString += "\n\n";
143
144                 retString += " Speculative Table: \n";
145                 retString += "---------------------------\n";
146                 retString += speculativeTable.toString();
147
148                 return retString;
149         }
150
151
152
153
154
155
156         public void startTransaction() {
157                 // Create a new transaction, invalidates any old pending transactions.
158                 pendingTransBuild = new PendingTransaction();
159         }
160
161         public void commitTransaction() {
162
163                 if (pendingTransBuild.getKVUpdates().size() == 0) {
164                         // If no updates are made then there is no point inserting into the chain
165                         return;
166                 }
167
168                 // Add the pending transaction to the queue
169                 pendingTransQueue.add(pendingTransBuild);
170
171                 while (!pendingTransQueue.isEmpty()) {
172                         if (tryput( pendingTransQueue.peek(), false)) {
173                                 pendingTransQueue.poll();
174                         }
175                 }
176         }
177
178         public void addKV(IoTString key, IoTString value) {
179
180                 // Make sure new key value pair matches the current arbitrator
181                 if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) {
182                         // TODO: Maybe not throw and error
183                         throw new Error("Not all Key Values match");
184                 }
185
186
187
188                 KeyValue kv = new KeyValue(key, value);
189                 pendingTransBuild.addKV(kv);
190         }
191
192
193         // TODO: Fix guard
194         public void addGuard(IoTString key, IoTString value) {
195                 KeyValue kv = new KeyValue(key, value);
196                 pendingTransBuild.addKV(kv);
197         }
198
199         public void update() {
200                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
201
202                 validateandupdate(newslots, false);
203         }
204
205         public boolean createNewKey(IoTString keyName, long machineId) {
206
207                 while (true) {
208                         if (arbitratorTable.get(keyName) != null) {
209                                 // There is already an arbitrator
210                                 return false;
211                         }
212
213                         if (tryput(keyName, machineId, false)) {
214
215                                 // If successfully inserted
216                                 return true;
217                         }
218                 }
219         }
220
221         void decrementLiveCount() {
222                 liveslotcount--;
223         }
224
225
226         private void setResizeThreshold() {
227                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
228                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
229         }
230
231         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
232                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
233                 int newsize = 0;
234                 if (liveslotcount > resizethreshold) {
235                         resize = true; //Resize is forced
236                 }
237
238                 if (resize) {
239                         newsize = (int) (numslots * RESIZE_MULTIPLE);
240                         TableStatus status = new TableStatus(s, newsize);
241                         s.addEntry(status);
242                 }
243
244                 if (! rejectedmessagelist.isEmpty()) {
245                         /* TODO: We should avoid generating a rejected message entry if
246                          * there is already a sufficient entry in the queue (e.g.,
247                          * equalsto value of true and same sequence number).  */
248
249                         long old_seqn = rejectedmessagelist.firstElement();
250                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
251                                 long new_seqn = rejectedmessagelist.lastElement();
252                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
253                                 s.addEntry(rm);
254                         } else {
255                                 long prev_seqn = -1;
256                                 int i = 0;
257                                 /* Go through list of missing messages */
258                                 for (; i < rejectedmessagelist.size(); i++) {
259                                         long curr_seqn = rejectedmessagelist.get(i);
260                                         Slot s_msg = buffer.getSlot(curr_seqn);
261                                         if (s_msg != null)
262                                                 break;
263                                         prev_seqn = curr_seqn;
264                                 }
265                                 /* Generate rejected message entry for missing messages */
266                                 if (prev_seqn != -1) {
267                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
268                                         s.addEntry(rm);
269                                 }
270                                 /* Generate rejected message entries for present messages */
271                                 for (; i < rejectedmessagelist.size(); i++) {
272                                         long curr_seqn = rejectedmessagelist.get(i);
273                                         Slot s_msg = buffer.getSlot(curr_seqn);
274                                         long machineid = s_msg.getMachineID();
275                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
276                                         s.addEntry(rm);
277                                 }
278                         }
279                 }
280
281                 long newestseqnum = buffer.getNewestSeqNum();
282                 long oldestseqnum = buffer.getOldestSeqNum();
283                 if (lastliveslotseqn < oldestseqnum)
284                         lastliveslotseqn = oldestseqnum;
285
286                 long seqn = lastliveslotseqn;
287                 boolean seenliveslot = false;
288                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
289                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
290
291
292                 // Mandatory Rescue
293                 for (; seqn < threshold; seqn++) {
294                         Slot prevslot = buffer.getSlot(seqn);
295                         //Push slot number forward
296                         if (! seenliveslot)
297                                 lastliveslotseqn = seqn;
298
299                         if (! prevslot.isLive())
300                                 continue;
301                         seenliveslot = true;
302                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
303                         for (Entry liveentry : liveentries) {
304                                 if (s.hasSpace(liveentry)) {
305                                         s.addEntry(liveentry);
306                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
307                                         if (!resize) {
308                                                 System.out.print("B"); //?
309                                                 return tryput(pendingTrans, true);
310                                         }
311                                 }
312                         }
313                 }
314
315
316                 // Arbitrate
317                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
318                 for (Transaction ut : uncommittedTransactionsList) {
319
320                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
321                         // Check if this machine arbitrates for this transaction
322                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
323                                 continue;
324                         }
325
326                         Entry newEntry = null;
327
328                         try {
329                                 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
330                                         // Guard evaluated as true
331
332                                         // update the local tmp current key set
333                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
334                                                 speculativeTableTmp.put(kv.getKey(), kv);
335                                         }
336
337                                         // create the commit
338                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
339                                 } else {
340                                         // Guard was false
341
342                                         // create the abort
343                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
344                                 }
345                         } catch (Exception e) {
346                                 e.printStackTrace();
347                         }
348
349                         if ((newEntry != null) && s.hasSpace(newEntry)) {
350                                 s.addEntry(newEntry);
351                         } else {
352                                 break;
353                         }
354                 }
355
356                 Transaction trans = new Transaction(s,
357                                                     s.getSequenceNumber(),
358                                                     localmachineid,
359                                                     pendingTrans.getKVUpdates(),
360                                                     pendingTrans.getGuard());
361                 boolean insertedTrans = false;
362                 if (s.hasSpace(trans)) {
363                         s.addEntry(trans);
364                         insertedTrans = true;
365                 }
366
367                 /* now go through live entries from least to greatest sequence number until
368                  * either all live slots added, or the slot doesn't have enough room
369                  * for SKIP_THRESHOLD consecutive entries*/
370                 int skipcount = 0;
371                 search:
372                 for (; seqn <= newestseqnum; seqn++) {
373                         Slot prevslot = buffer.getSlot(seqn);
374                         //Push slot number forward
375                         if (!seenliveslot)
376                                 lastliveslotseqn = seqn;
377
378                         if (!prevslot.isLive())
379                                 continue;
380                         seenliveslot = true;
381                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
382                         for (Entry liveentry : liveentries) {
383                                 if (s.hasSpace(liveentry))
384                                         s.addEntry(liveentry);
385                                 else {
386                                         skipcount++;
387                                         if (skipcount > SKIP_THRESHOLD)
388                                                 break search;
389                                 }
390                         }
391                 }
392
393                 int max = 0;
394                 if (resize)
395                         max = newsize;
396                 Slot[] array = cloud.putSlot(s, max);
397                 if (array == null) {
398                         array = new Slot[] {s};
399                         rejectedmessagelist.clear();
400                 }       else {
401                         if (array.length == 0)
402                                 throw new Error("Server Error: Did not send any slots");
403                         rejectedmessagelist.add(s.getSequenceNumber());
404                         insertedTrans = false;
405                 }
406
407                 /* update data structure */
408                 validateandupdate(array, true);
409
410                 return insertedTrans;
411         }
412
413         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
414                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
415                 int newsize = 0;
416                 if (liveslotcount > resizethreshold) {
417                         resize = true; //Resize is forced
418                 }
419
420                 if (resize) {
421                         newsize = (int) (numslots * RESIZE_MULTIPLE);
422                         TableStatus status = new TableStatus(s, newsize);
423                         s.addEntry(status);
424                 }
425
426                 if (! rejectedmessagelist.isEmpty()) {
427                         /* TODO: We should avoid generating a rejected message entry if
428                          * there is already a sufficient entry in the queue (e.g.,
429                          * equalsto value of true and same sequence number).  */
430
431                         long old_seqn = rejectedmessagelist.firstElement();
432                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
433                                 long new_seqn = rejectedmessagelist.lastElement();
434                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
435                                 s.addEntry(rm);
436                         } else {
437                                 long prev_seqn = -1;
438                                 int i = 0;
439                                 /* Go through list of missing messages */
440                                 for (; i < rejectedmessagelist.size(); i++) {
441                                         long curr_seqn = rejectedmessagelist.get(i);
442                                         Slot s_msg = buffer.getSlot(curr_seqn);
443                                         if (s_msg != null)
444                                                 break;
445                                         prev_seqn = curr_seqn;
446                                 }
447                                 /* Generate rejected message entry for missing messages */
448                                 if (prev_seqn != -1) {
449                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
450                                         s.addEntry(rm);
451                                 }
452                                 /* Generate rejected message entries for present messages */
453                                 for (; i < rejectedmessagelist.size(); i++) {
454                                         long curr_seqn = rejectedmessagelist.get(i);
455                                         Slot s_msg = buffer.getSlot(curr_seqn);
456                                         long machineid = s_msg.getMachineID();
457                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
458                                         s.addEntry(rm);
459                                 }
460                         }
461                 }
462
463                 long newestseqnum = buffer.getNewestSeqNum();
464                 long oldestseqnum = buffer.getOldestSeqNum();
465                 if (lastliveslotseqn < oldestseqnum)
466                         lastliveslotseqn = oldestseqnum;
467
468                 long seqn = lastliveslotseqn;
469                 boolean seenliveslot = false;
470                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
471                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
472
473
474                 // Mandatory Rescue
475                 for (; seqn < threshold; seqn++) {
476                         Slot prevslot = buffer.getSlot(seqn);
477                         //Push slot number forward
478                         if (! seenliveslot)
479                                 lastliveslotseqn = seqn;
480
481                         if (! prevslot.isLive())
482                                 continue;
483                         seenliveslot = true;
484                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
485                         for (Entry liveentry : liveentries) {
486                                 if (s.hasSpace(liveentry)) {
487                                         s.addEntry(liveentry);
488                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
489                                         if (!resize) {
490                                                 System.out.print("B"); //?
491                                                 return tryput(keyName, arbMachineid, true);
492                                         }
493                                 }
494                         }
495                 }
496
497
498                 // Arbitrate
499                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
500                 for (Transaction ut : uncommittedTransactionsList) {
501
502                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
503                         // Check if this machine arbitrates for this transaction
504                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
505                                 continue;
506                         }
507
508                         Entry newEntry = null;
509
510                         try {
511                                 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
512                                         // Guard evaluated as true
513
514                                         // update the local tmp current key set
515                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
516                                                 speculativeTableTmp.put(kv.getKey(), kv);
517                                         }
518
519                                         // create the commit
520                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
521                                 } else {
522                                         // Guard was false
523
524                                         // create the abort
525                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
526                                 }
527                         } catch (Exception e) {
528                                 e.printStackTrace();
529                         }
530
531                         if ((newEntry != null) && s.hasSpace(newEntry)) {
532                                 s.addEntry(newEntry);
533                         } else {
534                                 break;
535                         }
536                 }
537
538
539                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
540
541                 boolean insertedNewKey = false;
542                 if (s.hasSpace(newKey)) {
543                         s.addEntry(newKey);
544                         insertedNewKey = true;
545                 }
546
547                 /* now go through live entries from least to greatest sequence number until
548                  * either all live slots added, or the slot doesn't have enough room
549                  * for SKIP_THRESHOLD consecutive entries*/
550                 int skipcount = 0;
551                 search:
552                 for (; seqn <= newestseqnum; seqn++) {
553                         Slot prevslot = buffer.getSlot(seqn);
554                         //Push slot number forward
555                         if (!seenliveslot)
556                                 lastliveslotseqn = seqn;
557
558                         if (!prevslot.isLive())
559                                 continue;
560                         seenliveslot = true;
561                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
562                         for (Entry liveentry : liveentries) {
563                                 if (s.hasSpace(liveentry))
564                                         s.addEntry(liveentry);
565                                 else {
566                                         skipcount++;
567                                         if (skipcount > SKIP_THRESHOLD)
568                                                 break search;
569                                 }
570                         }
571                 }
572
573                 int max = 0;
574                 if (resize)
575                         max = newsize;
576                 Slot[] array = cloud.putSlot(s, max);
577                 if (array == null) {
578                         array = new Slot[] {s};
579                         rejectedmessagelist.clear();
580                 }       else {
581                         if (array.length == 0)
582                                 throw new Error("Server Error: Did not send any slots");
583                         rejectedmessagelist.add(s.getSequenceNumber());
584                         insertedNewKey = false;
585                 }
586
587                 /* update data structure */
588                 validateandupdate(array, true);
589
590                 return insertedNewKey;
591         }
592
593         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
594                 /* The cloud communication layer has checked slot HMACs already
595                          before decoding */
596                 if (newslots.length == 0) return;
597
598                 long firstseqnum = newslots[0].getSequenceNumber();
599                 if (firstseqnum <= sequencenumber) {
600                         throw new Error("Server Error: Sent older slots!");
601                 }
602
603                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
604                 checkHMACChain(indexer, newslots);
605
606                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
607
608                 initExpectedSize(firstseqnum);
609                 for (Slot slot : newslots) {
610                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
611                         updateExpectedSize();
612                 }
613
614                 /* If there is a gap, check to see if the server sent us everything. */
615                 if (firstseqnum != (sequencenumber + 1)) {
616
617                         // TODO: Check size
618                         checkNumSlots(newslots.length);
619                         if (!machineSet.isEmpty()) {
620                                 throw new Error("Missing record for machines: " + machineSet);
621                         }
622                 }
623
624                 commitNewMaxSize();
625
626                 /* Commit new to slots. */
627                 for (Slot slot : newslots) {
628                         buffer.putSlot(slot);
629                         liveslotcount++;
630                 }
631                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
632
633                 // Speculate on key value pairs
634                 createSpeculativeTable();
635         }
636
637         private void createSpeculativeTable() {
638                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
639
640                 for (Transaction trans : uncommittedTransactionsList) {
641
642                         try {
643                                 if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
644                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
645                                                 speculativeTableTmp.put(kv.getKey(), kv);
646                                         }
647                                 }
648
649                         } catch (Exception e) {
650                                 e.printStackTrace();
651                         }
652                 }
653
654                 speculativeTable = speculativeTableTmp;
655         }
656
657         private int expectedsize, currmaxsize;
658
659         private void checkNumSlots(int numslots) {
660                 if (numslots != expectedsize) {
661                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
662                 }
663         }
664
665         private void initExpectedSize(long firstsequencenumber) {
666                 long prevslots = firstsequencenumber;
667                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
668                 currmaxsize = numslots;
669         }
670
671         private void updateExpectedSize() {
672                 expectedsize++;
673                 if (expectedsize > currmaxsize) {
674                         expectedsize = currmaxsize;
675                 }
676         }
677
678         private void updateCurrMaxSize(int newmaxsize) {
679                 currmaxsize = newmaxsize;
680         }
681
682         private void commitNewMaxSize() {
683                 if (numslots != currmaxsize)
684                         buffer.resize(currmaxsize);
685
686                 numslots = currmaxsize;
687                 setResizeThreshold();
688         }
689
690
691
692
693
694         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
695                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
696         }
697
698         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
699                 long oldseqnum = entry.getOldSeqNum();
700                 long newseqnum = entry.getNewSeqNum();
701                 boolean isequal = entry.getEqual();
702                 long machineid = entry.getMachineID();
703                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
704                         Slot slot = indexer.getSlot(seqnum);
705                         if (slot != null) {
706                                 long slotmachineid = slot.getMachineID();
707                                 if (isequal != (slotmachineid == machineid)) {
708                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
709                                 }
710                         }
711                 }
712
713                 HashSet<Long> watchset = new HashSet<Long>();
714                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
715                         long entry_mid = lastmsg_entry.getKey();
716                         /* We've seen it, don't need to continue to watch.  Our next
717                          * message will implicitly acknowledge it. */
718                         if (entry_mid == localmachineid)
719                                 continue;
720                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
721                         long entry_seqn = v.getFirst();
722                         if (entry_seqn < newseqnum) {
723                                 addWatchList(entry_mid, entry);
724                                 watchset.add(entry_mid);
725                         }
726                 }
727                 if (watchset.isEmpty())
728                         entry.setDead();
729                 else
730                         entry.setWatchSet(watchset);
731         }
732
733         private void processEntry(NewKey entry) {
734                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
735         }
736
737         private void processEntry(Transaction entry) {
738                 uncommittedTransactionsList.add(entry);
739         }
740
741         private void processEntry(Abort entry) {
742
743
744                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
745                         // Abort has not been seen yet so we need to keep track of it
746                         abortSet.add(entry);
747                 } else {
748                         // The machine already saw this so it is dead
749                         entry.setDead();
750                 }
751
752                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
753                         Transaction prevtrans = i.next();
754                         if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
755                                 uncommittedTransactionsList.remove(prevtrans);
756                                 prevtrans.setDead();
757                                 return;
758                         }
759                 }
760         }
761
762         private void processEntry(Commit entry) {
763
764                 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
765                         Commit prevcommit = i.next();
766                         prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
767
768                         if (!prevcommit.isLive()) {
769                                 commitList.remove(prevcommit);
770                         }
771                 }
772
773                 commitList.add(entry);
774
775                 // Update the committed table list
776                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
777                         IoTString key = kv.getKey();
778                         commitedTable.put(key, kv);
779                 }
780
781                 long committedTransSeq = entry.getTransSequenceNumber();
782
783                 // Make dead the transactions
784                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
785                         Transaction prevtrans = i.next();
786
787                         if (prevtrans.getSequenceNumber() <= committedTransSeq) {
788                                 uncommittedTransactionsList.remove(prevtrans);
789                                 prevtrans.setDead();
790                         }
791                 }
792         }
793
794         private void processEntry(TableStatus entry) {
795                 int newnumslots = entry.getMaxSlots();
796                 updateCurrMaxSize(newnumslots);
797                 if (lastTableStatus != null)
798                         lastTableStatus.setDead();
799                 lastTableStatus = entry;
800         }
801
802
803         private void addWatchList(long machineid, RejectedMessage entry) {
804                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
805                 if (entries == null)
806                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
807                 entries.add(entry);
808         }
809
810         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
811                 machineSet.remove(machineid);
812
813                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
814                 if (watchset != null) {
815                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
816                                 RejectedMessage rm = rmit.next();
817                                 if (rm.getNewSeqNum() <= seqnum) {
818                                         /* Remove it from our watchlist */
819                                         rmit.remove();
820                                         /* Decrement machines that need to see this notification */
821                                         rm.removeWatcher(machineid);
822                                 }
823                         }
824                 }
825
826                 if (machineid == localmachineid) {
827                         /* Our own messages are immediately dead. */
828                         if (liveness instanceof LastMessage) {
829                                 ((LastMessage)liveness).setDead();
830                         } else if (liveness instanceof Slot) {
831                                 ((Slot)liveness).setDead();
832                         } else {
833                                 throw new Error("Unrecognized type");
834                         }
835                 }
836
837                 // Set dead the abort
838                 for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
839                         Abort abort = ait.next();
840
841                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
842                                 abort.setDead();
843                                 ait.remove();
844                         }
845                 }
846
847
848                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
849                 if (lastmsgentry == null)
850                         return;
851
852                 long lastmsgseqnum = lastmsgentry.getFirst();
853                 Liveness lastentry = lastmsgentry.getSecond();
854                 if (machineid != localmachineid) {
855                         if (lastentry instanceof LastMessage) {
856                                 ((LastMessage)lastentry).setDead();
857                         } else if (lastentry instanceof Slot) {
858                                 ((Slot)lastentry).setDead();
859                         } else {
860                                 throw new Error("Unrecognized type");
861                         }
862                 }
863
864                 if (machineid == localmachineid) {
865                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
866                                 throw new Error("Server Error: Mismatch on local machine sequence number");
867                 } else {
868                         if (lastmsgseqnum > seqnum)
869                                 throw new Error("Server Error: Rollback on remote machine sequence number");
870                 }
871         }
872
873         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
874                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
875                 for (Entry entry : slot.getEntries()) {
876                         switch (entry.getType()) {
877
878                         case Entry.TypeNewKey:
879                                 processEntry((NewKey)entry);
880                                 break;
881
882                         case Entry.TypeCommit:
883                                 processEntry((Commit)entry);
884                                 break;
885
886                         case Entry.TypeAbort:
887                                 processEntry((Abort)entry);
888                                 break;
889
890                         case Entry.TypeTransaction:
891                                 processEntry((Transaction)entry);
892                                 break;
893
894                         case Entry.TypeLastMessage:
895                                 processEntry((LastMessage)entry, machineSet);
896                                 break;
897
898                         case Entry.TypeRejectedMessage:
899                                 processEntry((RejectedMessage)entry, indexer);
900                                 break;
901
902                         case Entry.TypeTableStatus:
903                                 processEntry((TableStatus)entry);
904                                 break;
905
906                         default:
907                                 throw new Error("Unrecognized type: " + entry.getType());
908                         }
909                 }
910         }
911
912         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
913                 for (int i = 0; i < newslots.length; i++) {
914                         Slot currslot = newslots[i];
915                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
916                         if (prevslot != null &&
917                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
918                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
919                 }
920         }
921 }