Fixed Android version
[iotcloud.git] / version2 / src / Control / app / src / main / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15 import android.content.*;
16
17 /**
18  * IoTTable data structure.  Provides client interface.
19  * @author Brian Demsky
20  * @version 1.0
21  */
22
23 final public class Table {
24
25         /* Constants */
26         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
27         static final int SKIP_THRESHOLD = 10;
28         static final double RESIZE_MULTIPLE = 1.2;
29         static final double RESIZE_THRESHOLD = 0.75;
30         static final int REJECTED_THRESHOLD = 5;
31
32         /* Helper Objects */
33         private SlotBuffer buffer = null;
34         private CloudComm cloud = null;
35         private Random random = null;
36         private TableStatus liveTableStatus = null;
37         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
38         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
39         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
40
41         /* Variables */
42         private int numberOfSlots = 0;  // Number of slots stored in buffer
43         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
44         private long liveSlotCount = 0; // Number of currently live slots
45         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
46         private long localMachineId = 0; // Machine ID of this client device
47         private long sequenceNumber = 0; // Largest sequence number a client has received
48         private long localSequenceNumber = 0;
49
50         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
51         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
52         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
53         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
54         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
55         private long localArbitrationSequenceNumber = 0;
56         private boolean hadPartialSendToServer = false;
57         private boolean attemptedToSendToServer = false;
58         private long expectedsize;
59         private boolean didFindTableStatus = false;
60         private long currMaxSize = 0;
61
62         private Slot lastSlotAttemptedToSend = null;
63         private boolean lastIsNewKey = false;
64         private int lastNewSize = 0;
65         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
66         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
67         private NewKey lastNewKey = null;
68
69
70         /* Data Structures  */
71         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
72         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
73         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
74         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
75         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
76         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
77         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
78         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
79         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
80         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
81         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
82         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
83         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
84         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
85         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
86         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
87         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
88         private List<Transaction> pendingTransactionQueue = null;
89         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
90         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
91         private Map<Transaction, List<Integer>> transactionPartsSent = null;
92         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
93         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
94         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
95         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
96         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
97         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
98
99
/**
 * Creates a table that constructs its own {@link CloudComm} instance.
 *
 * @param baseurl         base URL forwarded to the CloudComm constructor
 * @param password        password forwarded to the CloudComm constructor
 * @param _localMachineId machine ID of this client device
 * @param listeningPort   port number forwarded to the CloudComm constructor
 * @param context         Context forwarded to the CloudComm constructor
 */
public Table(String baseurl, String password, long _localMachineId, int listeningPort, Context context) {
        this.localMachineId = _localMachineId;
        this.cloud = new CloudComm(this, baseurl, password, listeningPort, context);

        // Allocate the helper objects and data structures.
        this.init();
}
106
/**
 * Creates a table that uses an already constructed {@link CloudComm} instance.
 *
 * @param _cloud          communication object this table will use
 * @param _localMachineId machine ID of this client device
 */
public Table(CloudComm _cloud, long _localMachineId) {
        this.localMachineId = _localMachineId;
        this.cloud = _cloud;

        // Allocate the helper objects and data structures.
        this.init();
}
113
114         /**
115          * Init all the stuff needed for for table usage
116          */
117         private void init() {
118
119                 // Init helper objects
120                 random = new Random();
121                 buffer = new SlotBuffer();
122
123                 // Set Variables
124                 oldestLiveSlotSequenceNumver = 1;
125
126                 // init data structs
127                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
128                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
129                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
130                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
131                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
132                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
133                 arbitratorTable = new HashMap<IoTString, Long>();
134                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
135                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
136                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
137                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
138                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
139                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
140                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
141                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
142                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
143                 rejectedSlotList = new Vector<Long>();
144                 pendingTransactionQueue = new ArrayList<Transaction>();
145                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
146                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
147                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
148                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
149                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
150                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
151                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
152                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
153                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
154
155
156                 // Other init stuff
157                 numberOfSlots = buffer.capacity();
158                 setResizeThreshold();
159         }
160
161         // TODO: delete method
162         public synchronized void printSlots() {
163                 long o = buffer.getOldestSeqNum();
164                 long n = buffer.getNewestSeqNum();
165
166                 int[] types = new int[10];
167
168                 int num = 0;
169
170                 int livec = 0;
171                 int deadc = 0;
172
173                 int casdasd = 0;
174
175                 int liveslo = 0;
176
177                 for (long i = o; i < (n + 1); i++) {
178                         Slot s = buffer.getSlot(i);
179
180
181                         if (s.isLive()) {
182                                 liveslo++;
183                         }
184
185                         Vector<Entry> entries = s.getEntries();
186
187                         for (Entry e : entries) {
188                                 if (e.isLive()) {
189                                         int type = e.getType();
190
191
192                                         if (type == 6) {
193                                                 RejectedMessage rej = (RejectedMessage)e;
194                                                 casdasd++;
195
196                                                 System.out.println(rej.getMachineID());
197                                         }
198
199
200                                         types[type] = types[type] + 1;
201                                         num++;
202                                         livec++;
203                                 } else {
204                                         deadc++;
205                                 }
206                         }
207                 }
208
209                 for (int i = 0; i < 10; i++) {
210                         System.out.println(i + "    " + types[i]);
211                 }
212                 System.out.println("Live count:   " + livec);
213                 System.out.println("Live Slot count:   " + liveslo);
214
215                 System.out.println("Dead count:   " + deadc);
216                 System.out.println("Old:   " + o);
217                 System.out.println("New:   " + n);
218                 System.out.println("Size:   " + buffer.size());
219                 // System.out.println("Commits:   " + liveCommitsTable.size());
220                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
221                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
222
223                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
224                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
225                 }
226
227
228                 for (Long a : liveCommitsTable.keySet()) {
229                         for (Long b : liveCommitsTable.get(a).keySet()) {
230                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
231                                         System.out.print(kv + " ");
232                                 }
233                                 System.out.print("|| ");
234                         }
235                         System.out.println();
236                 }
237
238         }
239
240         /**
241          * Initialize the table by inserting a table status as the first entry into the table status
242          * also initialize the crypto stuff.
243          */
244         public synchronized void initTable() throws ServerException {
245                 cloud.initSecurity();
246
247                 // Create the first insertion into the block chain which is the table status
248                 Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
249                 localSequenceNumber++;
250                 TableStatus status = new TableStatus(s, numberOfSlots);
251                 s.addEntry(status);
252                 Slot[] array = cloud.putSlot(s, numberOfSlots);
253
254                 if (array == null) {
255                         array = new Slot[] {s};
256                         // update local block chain
257                         validateAndUpdate(array, true);
258                 } else if (array.length == 1) {
259                         // in case we did push the slot BUT we failed to init it
260                         validateAndUpdate(array, true);
261                 } else {
262                         throw new Error("Error on initialization");
263                 }
264         }
265
266         /**
267          * Rebuild the table from scratch by pulling the latest block chain from the server.
268          */
269         public synchronized void rebuild() throws ServerException {
270                 // Just pull the latest slots from the server
271                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
272                 validateAndUpdate(newslots, true);
273                 sendToServer(null);
274                 updateLiveTransactionsAndStatus();
275
276         }
277
278         // public String toString() {
279         //      String retString = " Committed Table: \n";
280         //      retString += "---------------------------\n";
281         //      retString += commitedTable.toString();
282
283         //      retString += "\n\n";
284
285         //      retString += " Speculative Table: \n";
286         //      retString += "---------------------------\n";
287         //      retString += speculativeTable.toString();
288
289         //      return retString;
290         // }
291
292         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
293                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
294         }
295
296         public synchronized Long getArbitrator(IoTString key) {
297                 return arbitratorTable.get(key);
298         }
299
300         public synchronized void close() {
301                 cloud.close();
302         }
303
304         public synchronized IoTString getCommitted(IoTString key)  {
305                 KeyValue kv = committedKeyValueTable.get(key);
306
307                 if (kv != null) {
308                         return kv.getValue();
309                 } else {
310                         return null;
311                 }
312         }
313
314         public synchronized IoTString getSpeculative(IoTString key) {
315                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
316
317                 if (kv == null) {
318                         kv = speculatedKeyValueTable.get(key);
319                 }
320
321                 if (kv == null) {
322                         kv = committedKeyValueTable.get(key);
323                 }
324
325                 if (kv != null) {
326                         return kv.getValue();
327                 } else {
328                         return null;
329                 }
330         }
331
332         public synchronized IoTString getCommittedAtomic(IoTString key) {
333                 KeyValue kv = committedKeyValueTable.get(key);
334
335                 if (arbitratorTable.get(key) == null) {
336                         throw new Error("Key not Found.");
337                 }
338
339                 // Make sure new key value pair matches the current arbitrator
340                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
341                         // TODO: Maybe not throw en error
342                         throw new Error("Not all Key Values Match Arbitrator.");
343                 }
344
345                 if (kv != null) {
346                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
347                         return kv.getValue();
348                 } else {
349                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
350                         return null;
351                 }
352         }
353
354         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
355                 if (arbitratorTable.get(key) == null) {
356                         throw new Error("Key not Found.");
357                 }
358
359                 // Make sure new key value pair matches the current arbitrator
360                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
361                         // TODO: Maybe not throw en error
362                         throw new Error("Not all Key Values Match Arbitrator.");
363                 }
364
365                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
366
367                 if (kv == null) {
368                         kv = speculatedKeyValueTable.get(key);
369                 }
370
371                 if (kv == null) {
372                         kv = committedKeyValueTable.get(key);
373                 }
374
375                 if (kv != null) {
376                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
377                         return kv.getValue();
378                 } else {
379                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
380                         return null;
381                 }
382         }
383
384         public synchronized boolean update()  {
385                 try {
386                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
387                         validateAndUpdate(newSlots, false);
388                         sendToServer(null);
389
390
391                         updateLiveTransactionsAndStatus();
392
393                         return true;
394                 } catch (Exception e) {
395                         // e.printStackTrace();
396
397                         for (Long m : localCommunicationTable.keySet()) {
398                                 updateFromLocal(m);
399                         }
400                 }
401
402                 return false;
403         }
404
405         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
406                 while (true) {
407                         if (arbitratorTable.get(keyName) != null) {
408                                 // There is already an arbitrator
409                                 return false;
410                         }
411
412                         NewKey newKey = new NewKey(null, keyName, machineId);
413
414                         if (sendToServer(newKey)) {
415                                 // If successfully inserted
416                                 return true;
417                         }
418                 }
419         }
420
421         public synchronized void startTransaction() {
422                 // Create a new transaction, invalidates any old pending transactions.
423                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
424         }
425
426         public synchronized void addKV(IoTString key, IoTString value) {
427
428                 // Make sure it is a valid key
429                 if (arbitratorTable.get(key) == null) {
430                         throw new Error("Key not Found.");
431                 }
432
433                 // Make sure new key value pair matches the current arbitrator
434                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
435                         // TODO: Maybe not throw en error
436                         throw new Error("Not all Key Values Match Arbitrator.");
437                 }
438
439                 // Add the key value to this transaction
440                 KeyValue kv = new KeyValue(key, value);
441                 pendingTransactionBuilder.addKV(kv);
442         }
443
444         public synchronized TransactionStatus commitTransaction() {
445
446                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
447                         // transaction with no updates will have no effect on the system
448                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
449                 }
450
451                 // Set the local transaction sequence number and increment
452                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
453                 localTransactionSequenceNumber++;
454
455                 // Create the transaction status
456                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
457
458                 // Create the new transaction
459                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
460                 newTransaction.setTransactionStatus(transactionStatus);
461
462                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
463                         // Add it to the queue and invalidate the builder for safety
464                         pendingTransactionQueue.add(newTransaction);
465                 } else {
466                         arbitrateOnLocalTransaction(newTransaction);
467                         updateLiveStateFromLocal();
468                 }
469
470                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
471
472                 try {
473                         sendToServer(null);
474                 } catch (ServerException e) {
475
476                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
477                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
478                                 Transaction transaction = iter.next();
479
480                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
481                                         // Already contacted this client so ignore all attempts to contact this client
482                                         // to preserve ordering for arbitrator
483                                         continue;
484                                 }
485
486                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
487
488                                 if (sendReturn.getFirst()) {
489                                         // Failed to contact over local
490                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
491                                 } else {
492                                         // Successful contact or should not contact
493
494                                         if (sendReturn.getSecond()) {
495                                                 // did arbitrate
496                                                 iter.remove();
497                                         }
498                                 }
499                         }
500                 }
501
502                 updateLiveStateFromLocal();
503
504                 return transactionStatus;
505         }
506
507         /**
508          * Get the machine ID for this client
509          */
510         public long getMachineId() {
511                 return localMachineId;
512         }
513
514         /**
515          * Decrement the number of live slots that we currently have
516          */
517         public void decrementLiveCount() {
518                 liveSlotCount--;
519         }
520
521         /**
522          * Recalculate the new resize threshold
523          */
524         private void setResizeThreshold() {
525                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
526                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
527         }
528
529         public long getLocalSequenceNumber() {
530                 return localSequenceNumber;
531         }
532
533
534         boolean lastInsertedNewKey = false;
535
536         private boolean sendToServer(NewKey newKey) throws ServerException {
537
538                 boolean fromRetry = false;
539
540                 try {
541                         if (hadPartialSendToServer) {
542                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
543                                 if (newSlots.length == 0) {
544                                         fromRetry = true;
545                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
546
547                                         if (sendSlotsReturn.getFirst()) {
548                                                 if (newKey != null) {
549                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
550                                                                 newKey = null;
551                                                         }
552                                                 }
553
554                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
555                                                         transaction.resetServerFailure();
556
557                                                         // Update which transactions parts still need to be sent
558                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
559
560                                                         // Add the transaction status to the outstanding list
561                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
562
563                                                         // Update the transaction status
564                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
565
566                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
567                                                         if (transaction.didSendAllParts()) {
568                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
569                                                                 pendingTransactionQueue.remove(transaction);
570                                                         }
571                                                 }
572                                         } else {
573
574                                                 newSlots = sendSlotsReturn.getThird();
575
576                                                 boolean isInserted = false;
577                                                 for (Slot s : newSlots) {
578                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
579                                                                 isInserted = true;
580                                                                 break;
581                                                         }
582                                                 }
583
584                                                 for (Slot s : newSlots) {
585                                                         if (isInserted) {
586                                                                 break;
587                                                         }
588
589                                                         // Process each entry in the slot
590                                                         for (Entry entry : s.getEntries()) {
591
592                                                                 if (entry.getType() == Entry.TypeLastMessage) {
593                                                                         LastMessage lastMessage = (LastMessage)entry;
594                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
595                                                                                 isInserted = true;
596                                                                                 break;
597                                                                         }
598                                                                 }
599                                                         }
600                                                 }
601
602                                                 if (isInserted) {
603                                                         if (newKey != null) {
604                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
605                                                                         newKey = null;
606                                                                 }
607                                                         }
608
609                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
610                                                                 transaction.resetServerFailure();
611
612                                                                 // Update which transactions parts still need to be sent
613                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
614
615                                                                 // Add the transaction status to the outstanding list
616                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
617
618                                                                 // Update the transaction status
619                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
620
621                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
622                                                                 if (transaction.didSendAllParts()) {
623                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
624                                                                         pendingTransactionQueue.remove(transaction);
625                                                                 } else {
626                                                                         transaction.resetServerFailure();
627                                                                         // Set the transaction sequence number back to nothing
628                                                                         if (!transaction.didSendAPartToServer()) {
629                                                                                 transaction.setSequenceNumber(-1);
630                                                                         }
631                                                                 }
632                                                         }
633                                                 }
634                                         }
635
636                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
637                                                 transaction.resetServerFailure();
638                                                 // Set the transaction sequence number back to nothing
639                                                 if (!transaction.didSendAPartToServer()) {
640                                                         transaction.setSequenceNumber(-1);
641                                                 }
642                                         }
643
644                                         if (sendSlotsReturn.getThird().length != 0) {
645                                                 // insert into the local block chain
646                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
647                                         }
648                                         // continue;
649                                 } else {
650                                         boolean isInserted = false;
651                                         for (Slot s : newSlots) {
652                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
653                                                         isInserted = true;
654                                                         break;
655                                                 }
656                                         }
657
658                                         for (Slot s : newSlots) {
659                                                 if (isInserted) {
660                                                         break;
661                                                 }
662
663                                                 // Process each entry in the slot
664                                                 for (Entry entry : s.getEntries()) {
665
666                                                         if (entry.getType() == Entry.TypeLastMessage) {
667                                                                 LastMessage lastMessage = (LastMessage)entry;
668                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
669                                                                         isInserted = true;
670                                                                         break;
671                                                                 }
672                                                         }
673                                                 }
674                                         }
675
676                                         if (isInserted) {
677                                                 if (newKey != null) {
678                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
679                                                                 newKey = null;
680                                                         }
681                                                 }
682
683                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
684                                                         transaction.resetServerFailure();
685
686                                                         // Update which transactions parts still need to be sent
687                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
688
689                                                         // Add the transaction status to the outstanding list
690                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
691
692                                                         // Update the transaction status
693                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
694
695                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
696                                                         if (transaction.didSendAllParts()) {
697                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
698                                                                 pendingTransactionQueue.remove(transaction);
699                                                         } else {
700                                                                 transaction.resetServerFailure();
701                                                                 // Set the transaction sequence number back to nothing
702                                                                 if (!transaction.didSendAPartToServer()) {
703                                                                         transaction.setSequenceNumber(-1);
704                                                                 }
705                                                         }
706                                                 }
707                                         } else {
708                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
709                                                         transaction.resetServerFailure();
710                                                         // Set the transaction sequence number back to nothing
711                                                         if (!transaction.didSendAPartToServer()) {
712                                                                 transaction.setSequenceNumber(-1);
713                                                         }
714                                                 }
715                                         }
716
717                                         // insert into the local block chain
718                                         validateAndUpdate(newSlots, true);
719                                 }
720                         }
721                 } catch (ServerException e) {
722                         throw e;
723                 }
724
725
726
727                 try {
728                         // While we have stuff that needs inserting into the block chain
729                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
730
731                                 fromRetry = false;
732
733                                 if (hadPartialSendToServer) {
734                                         throw new Error("Should Be error free");
735                                 }
736
737
738
739                                 // If there is a new key with same name then end
740                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
741                                         return false;
742                                 }
743
744                                 // Create the slot
745                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
746                                 localSequenceNumber++;
747
748                                 // Try to fill the slot with data
749                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
750                                 boolean needsResize = fillSlotsReturn.getFirst();
751                                 int newSize = fillSlotsReturn.getSecond();
752                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
753
754                                 if (needsResize) {
755                                         // Reset which transaction to send
756                                         for (Transaction transaction : transactionPartsSent.keySet()) {
757                                                 transaction.resetNextPartToSend();
758
759                                                 // Set the transaction sequence number back to nothing
760                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
761                                                         transaction.setSequenceNumber(-1);
762                                                 }
763                                         }
764
765                                         // Clear the sent data since we are trying again
766                                         pendingSendArbitrationEntriesToDelete.clear();
767                                         transactionPartsSent.clear();
768
769                                         // We needed a resize so try again
770                                         fillSlot(slot, true, newKey);
771                                 }
772
773                                 lastSlotAttemptedToSend = slot;
774                                 lastIsNewKey = (newKey != null);
775                                 lastInsertedNewKey = insertedNewKey;
776                                 lastNewSize = newSize;
777                                 lastNewKey = newKey;
778                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
779                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
780
781
782                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
783
784                                 if (sendSlotsReturn.getFirst()) {
785
786                                         // Did insert into the block chain
787
788                                         if (insertedNewKey) {
789                                                 // This slot was what was inserted not a previous slot
790
791                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
792                                                 newKey = null;
793                                         }
794
795                                         // Remove the aborts and commit parts that were sent from the pending to send queue
796                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
797                                                 ArbitrationRound round = iter.next();
798                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
799
800                                                 if (round.isDoneSending()) {
801                                                         // Sent all the parts
802                                                         iter.remove();
803                                                 }
804                                         }
805
806                                         for (Transaction transaction : transactionPartsSent.keySet()) {
807                                                 transaction.resetServerFailure();
808
809                                                 // Update which transactions parts still need to be sent
810                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
811
812                                                 // Add the transaction status to the outstanding list
813                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
814
815                                                 // Update the transaction status
816                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
817
818                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
819                                                 if (transaction.didSendAllParts()) {
820                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
821                                                         pendingTransactionQueue.remove(transaction);
822                                                 }
823                                         }
824                                 } else {
825
826                                         // if (!sendSlotsReturn.getSecond()) {
827                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
828                                         //              transaction.resetServerFailure();
829                                         //      }
830                                         // } else {
831                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
832                                         //              transaction.resetServerFailure();
833
834                                         //              // Update which transactions parts still need to be sent
835                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
836
837                                         //              // Add the transaction status to the outstanding list
838                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
839
840                                         //              // Update the transaction status
841                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
842
843                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
844                                         //              if (transaction.didSendAllParts()) {
845                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
846                                         //                      pendingTransactionQueue.remove(transaction);
847
848                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
849                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
850                                         //                      }
851                                         //              }
852                                         //      }
853                                         // }
854
855                                         // Reset which transaction to send
856                                         for (Transaction transaction : transactionPartsSent.keySet()) {
857                                                 transaction.resetNextPartToSend();
858                                                 // transaction.resetNextPartToSend();
859
860                                                 // Set the transaction sequence number back to nothing
861                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
862                                                         transaction.setSequenceNumber(-1);
863                                                 }
864                                         }
865                                 }
866
867                                 // Clear the sent data in preparation for next send
868                                 pendingSendArbitrationEntriesToDelete.clear();
869                                 transactionPartsSent.clear();
870
871                                 if (sendSlotsReturn.getThird().length != 0) {
872                                         // insert into the local block chain
873                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
874                                 }
875                         }
876
877                 } catch (ServerException e) {
878
879                         if (e.getType() != ServerException.TypeInputTimeout) {
880                                 // e.printStackTrace();
881
882                                 // Nothing was able to be sent to the server so just clear these data structures
883                                 for (Transaction transaction : transactionPartsSent.keySet()) {
884                                         transaction.resetNextPartToSend();
885
886                                         // Set the transaction sequence number back to nothing
887                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
888                                                 transaction.setSequenceNumber(-1);
889                                         }
890                                 }
891                         } else {
892                                 // There was a partial send to the server
893                                 hadPartialSendToServer = true;
894
895
896                                 // if (!fromRetry) {
897                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
898                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
899                                 // }
900
901                                 // Nothing was able to be sent to the server so just clear these data structures
902                                 for (Transaction transaction : transactionPartsSent.keySet()) {
903                                         transaction.resetNextPartToSend();
904                                         transaction.setServerFailure();
905                                 }
906                         }
907
908                         pendingSendArbitrationEntriesToDelete.clear();
909                         transactionPartsSent.clear();
910
911                         throw e;
912                 }
913
914                 return newKey == null;
915         }
916
917         private synchronized boolean updateFromLocal(long machineId) {
918                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
919                 if (localCommunicationInformation == null) {
920                         // Cant talk to that device locally so do nothing
921                         return false;
922                 }
923
924                 // Get the size of the send data
925                 int sendDataSize = Integer.BYTES + Long.BYTES;
926
927                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
928                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
929                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
930                 }
931
932                 byte[] sendData = new byte[sendDataSize];
933                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
934
935                 // Encode the data
936                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
937                 bbEncode.putInt(0);
938
939                 // Send by local
940                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
941                 localSequenceNumber++;
942
943                 if (returnData == null) {
944                         // Could not contact server
945                         return false;
946                 }
947
948                 // Decode the data
949                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
950                 int numberOfEntries = bbDecode.getInt();
951
952                 for (int i = 0; i < numberOfEntries; i++) {
953                         byte type = bbDecode.get();
954                         if (type == Entry.TypeAbort) {
955                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
956                                 processEntry(abort);
957                         } else if (type == Entry.TypeCommitPart) {
958                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
959                                 processEntry(commitPart);
960                         }
961                 }
962
963                 updateLiveStateFromLocal();
964
965                 return true;
966         }
967
968         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
969
970                 // Get the devices local communications
971                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
972
973                 if (localCommunicationInformation == null) {
974                         // Cant talk to that device locally so do nothing
975                         return new Pair<Boolean, Boolean>(true, false);
976                 }
977
978                 // Get the size of the send data
979                 int sendDataSize = Integer.BYTES + Long.BYTES;
980                 for (TransactionPart part : transaction.getParts().values()) {
981                         sendDataSize += part.getSize();
982                 }
983
984                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
985                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
986                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
987                 }
988
989                 // Make the send data size
990                 byte[] sendData = new byte[sendDataSize];
991                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
992
993                 // Encode the data
994                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
995                 bbEncode.putInt(transaction.getParts().size());
996                 for (TransactionPart part : transaction.getParts().values()) {
997                         part.encode(bbEncode);
998                 }
999
1000
1001                 // Send by local
1002                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
1003                 localSequenceNumber++;
1004
1005                 if (returnData == null) {
1006                         // Could not contact server
1007                         return new Pair<Boolean, Boolean>(true, false);
1008                 }
1009
1010                 // Decode the data
1011                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
1012                 boolean didCommit = bbDecode.get() == 1;
1013                 boolean couldArbitrate = bbDecode.get() == 1;
1014                 int numberOfEntries = bbDecode.getInt();
1015                 boolean foundAbort = false;
1016
1017                 for (int i = 0; i < numberOfEntries; i++) {
1018                         byte type = bbDecode.get();
1019                         if (type == Entry.TypeAbort) {
1020                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
1021
1022                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1023                                         foundAbort = true;
1024                                 }
1025
1026                                 processEntry(abort);
1027                         } else if (type == Entry.TypeCommitPart) {
1028                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
1029                                 processEntry(commitPart);
1030                         }
1031                 }
1032
1033                 updateLiveStateFromLocal();
1034
1035                 if (couldArbitrate) {
1036                         TransactionStatus status =  transaction.getTransactionStatus();
1037                         if (didCommit) {
1038                                 status.setStatus(TransactionStatus.StatusCommitted);
1039                         } else {
1040                                 status.setStatus(TransactionStatus.StatusAborted);
1041                         }
1042                 } else {
1043                         TransactionStatus status =  transaction.getTransactionStatus();
1044                         if (foundAbort) {
1045                                 status.setStatus(TransactionStatus.StatusAborted);
1046                         } else {
1047                                 status.setStatus(TransactionStatus.StatusCommitted);
1048                         }
1049                 }
1050
1051                 return new Pair<Boolean, Boolean>(false, true);
1052         }
1053
1054         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1055
1056                 // Decode the data
1057                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1058                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1059                 int numberOfParts = bbDecode.getInt();
1060
1061                 // If we did commit a transaction or not
1062                 boolean didCommit = false;
1063                 boolean couldArbitrate = false;
1064
1065                 if (numberOfParts != 0) {
1066
1067                         // decode the transaction
1068                         Transaction transaction = new Transaction();
1069                         for (int i = 0; i < numberOfParts; i++) {
1070                                 bbDecode.get();
1071                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1072                                 transaction.addPartDecode(newPart);
1073                         }
1074
1075                         // Arbitrate on transaction and pull relevant return data
1076                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1077                         couldArbitrate = localArbitrateReturn.getFirst();
1078                         didCommit = localArbitrateReturn.getSecond();
1079
1080                         updateLiveStateFromLocal();
1081
1082                         // Transaction was sent to the server so keep track of it to prevent double commit
1083                         if (transaction.getSequenceNumber() != -1) {
1084                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1085                         }
1086                 }
1087
1088                 // The data to send back
1089                 int returnDataSize = 0;
1090                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1091
1092                 // Get the aborts to send back
1093                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1094                 Collections.sort(abortLocalSequenceNumbers);
1095                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1096                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1097                                 continue;
1098                         }
1099
1100                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1101                         unseenArbitrations.add(abort);
1102                         returnDataSize += abort.getSize();
1103                 }
1104
1105                 // Get the commits to send back
1106                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1107                 if (commitForClientTable != null) {
1108                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1109                         Collections.sort(commitLocalSequenceNumbers);
1110
1111                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1112                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1113
1114                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1115                                         continue;
1116                                 }
1117
1118                                 unseenArbitrations.addAll(commit.getParts().values());
1119
1120                                 for (CommitPart commitPart : commit.getParts().values()) {
1121                                         returnDataSize += commitPart.getSize();
1122                                 }
1123                         }
1124                 }
1125
1126                 // Number of arbitration entries to decode
1127                 returnDataSize += 2 * Integer.BYTES;
1128
1129                 // Boolean of did commit or not
1130                 if (numberOfParts != 0) {
1131                         returnDataSize += Byte.BYTES;
1132                 }
1133
1134                 // Data to send Back
1135                 byte[] returnData = new byte[returnDataSize];
1136                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1137
1138                 if (numberOfParts != 0) {
1139                         if (didCommit) {
1140                                 bbEncode.put((byte)1);
1141                         } else {
1142                                 bbEncode.put((byte)0);
1143                         }
1144                         if (couldArbitrate) {
1145                                 bbEncode.put((byte)1);
1146                         } else {
1147                                 bbEncode.put((byte)0);
1148                         }
1149                 }
1150
1151                 bbEncode.putInt(unseenArbitrations.size());
1152                 for (Entry entry : unseenArbitrations) {
1153                         entry.encode(bbEncode);
1154                 }
1155
1156
1157                 localSequenceNumber++;
1158                 return returnData;
1159         }
1160
1161         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1162
1163                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1164                 attemptedToSendToServer = true;
1165
1166                 boolean inserted = false;
1167                 boolean lastTryInserted = false;
1168
1169                 Slot[] array = cloud.putSlot(slot, newSize);
1170                 if (array == null) {
1171                         array = new Slot[] {slot};
1172                         rejectedSlotList.clear();
1173                         inserted = true;
1174                 }       else {
1175                         if (array.length == 0) {
1176                                 throw new Error("Server Error: Did not send any slots");
1177                         }
1178
1179                         // if (attemptedToSendToServerTmp) {
1180                         if (hadPartialSendToServer) {
1181
1182                                 boolean isInserted = false;
1183                                 for (Slot s : array) {
1184                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1185                                                 isInserted = true;
1186                                                 break;
1187                                         }
1188                                 }
1189
1190                                 for (Slot s : array) {
1191                                         if (isInserted) {
1192                                                 break;
1193                                         }
1194
1195                                         // Process each entry in the slot
1196                                         for (Entry entry : s.getEntries()) {
1197
1198                                                 if (entry.getType() == Entry.TypeLastMessage) {
1199                                                         LastMessage lastMessage = (LastMessage)entry;
1200
1201                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1202                                                                 isInserted = true;
1203                                                                 break;
1204                                                         }
1205                                                 }
1206                                         }
1207                                 }
1208
1209                                 if (!isInserted) {
1210                                         rejectedSlotList.add(slot.getSequenceNumber());
1211                                         lastTryInserted = false;
1212                                 } else {
1213                                         lastTryInserted = true;
1214                                 }
1215                         } else {
1216                                 rejectedSlotList.add(slot.getSequenceNumber());
1217                                 lastTryInserted = false;
1218                         }
1219                 }
1220
1221                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1222         }
1223
1224         /**
1225          * Returns false if a resize was needed
1226          */
1227         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1228
1229
1230                 int newSize = 0;
1231                 if (liveSlotCount > bufferResizeThreshold) {
1232                         resize = true; //Resize is forced
1233
1234                 }
1235
1236                 if (resize) {
1237                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1238                         TableStatus status = new TableStatus(slot, newSize);
1239                         slot.addEntry(status);
1240                 }
1241
1242                 // Fill with rejected slots first before doing anything else
1243                 doRejectedMessages(slot);
1244
1245                 // Do mandatory rescue of entries
1246                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1247
1248                 // Extract working variables
1249                 boolean needsResize = mandatoryRescueReturn.getFirst();
1250                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1251                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1252
1253                 if (needsResize && !resize) {
1254                         // We need to resize but we are not resizing so return false
1255                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1256                 }
1257
1258                 boolean inserted = false;
1259                 if (newKeyEntry != null) {
1260                         newKeyEntry.setSlot(slot);
1261                         if (slot.hasSpace(newKeyEntry)) {
1262
1263                                 slot.addEntry(newKeyEntry);
1264                                 inserted = true;
1265                         }
1266                 }
1267
1268                 // Clear the transactions, aborts and commits that were sent previously
1269                 transactionPartsSent.clear();
1270                 pendingSendArbitrationEntriesToDelete.clear();
1271
1272                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1273                         boolean isFull = false;
1274                         round.generateParts();
1275                         List<Entry> parts = round.getParts();
1276
1277                         // Insert pending arbitration data
1278                         for (Entry arbitrationData : parts) {
1279
1280                                 // If it is an abort then we need to set some information
1281                                 if (arbitrationData instanceof Abort) {
1282                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1283                                 }
1284
1285                                 if (!slot.hasSpace(arbitrationData)) {
1286                                         // No space so cant do anything else with these data entries
1287                                         isFull = true;
1288                                         break;
1289                                 }
1290
1291                                 // Add to this current slot and add it to entries to delete
1292                                 slot.addEntry(arbitrationData);
1293                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1294                         }
1295
1296                         if (isFull) {
1297                                 break;
1298                         }
1299                 }
1300
1301                 if (pendingTransactionQueue.size() > 0) {
1302
1303                         Transaction transaction = pendingTransactionQueue.get(0);
1304
1305                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1306                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1307                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1308                         // }
1309
1310                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1311                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1312                         }
1313
1314
1315                         while (true) {
1316                                 TransactionPart part = transaction.getNextPartToSend();
1317
1318                                 if (part == null) {
1319                                         // Ran out of parts to send for this transaction so move on
1320                                         break;
1321                                 }
1322
1323                                 if (slot.hasSpace(part)) {
1324                                         slot.addEntry(part);
1325                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1326                                         if (partsSent == null) {
1327                                                 partsSent = new ArrayList<Integer>();
1328                                                 transactionPartsSent.put(transaction, partsSent);
1329                                         }
1330                                         partsSent.add(part.getPartNumber());
1331                                         transactionPartsSent.put(transaction, partsSent);
1332                                 } else {
1333                                         break;
1334                                 }
1335                         }
1336                 }
1337
1338                 // Fill the remainder of the slot with rescue data
1339                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1340
1341                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1342         }
1343
1344         private void doRejectedMessages(Slot s) {
1345                 if (! rejectedSlotList.isEmpty()) {
1346                         /* TODO: We should avoid generating a rejected message entry if
1347                          * there is already a sufficient entry in the queue (e.g.,
1348                          * equalsto value of true and same sequence number).  */
1349
1350                         long old_seqn = rejectedSlotList.firstElement();
1351                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1352                                 long new_seqn = rejectedSlotList.lastElement();
1353                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1354                                 s.addEntry(rm);
1355                         } else {
1356                                 long prev_seqn = -1;
1357                                 int i = 0;
1358                                 /* Go through list of missing messages */
1359                                 for (; i < rejectedSlotList.size(); i++) {
1360                                         long curr_seqn = rejectedSlotList.get(i);
1361                                         Slot s_msg = buffer.getSlot(curr_seqn);
1362                                         if (s_msg != null)
1363                                                 break;
1364                                         prev_seqn = curr_seqn;
1365                                 }
1366                                 /* Generate rejected message entry for missing messages */
1367                                 if (prev_seqn != -1) {
1368                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1369                                         s.addEntry(rm);
1370                                 }
1371                                 /* Generate rejected message entries for present messages */
1372                                 for (; i < rejectedSlotList.size(); i++) {
1373                                         long curr_seqn = rejectedSlotList.get(i);
1374                                         Slot s_msg = buffer.getSlot(curr_seqn);
1375                                         long machineid = s_msg.getMachineID();
1376                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1377                                         s.addEntry(rm);
1378                                 }
1379                         }
1380                 }
1381         }
1382
1383         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1384                 long newestSequenceNumber = buffer.getNewestSeqNum();
1385                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1386                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1387                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1388                 }
1389
1390                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1391                 boolean seenLiveSlot = false;
1392                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1393                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1394
1395
1396                 // Mandatory Rescue
1397                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1398                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1399                         // Push slot number forward
1400                         if (! seenLiveSlot) {
1401                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1402                         }
1403
1404                         if (!previousSlot.isLive()) {
1405                                 continue;
1406                         }
1407
1408                         // We have seen a live slot
1409                         seenLiveSlot = true;
1410
1411                         // Get all the live entries for a slot
1412                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1413
1414                         // Iterate over all the live entries and try to rescue them
1415                         for (Entry liveEntry : liveEntries) {
1416                                 if (slot.hasSpace(liveEntry)) {
1417
1418                                         // Enough space to rescue the entry
1419                                         slot.addEntry(liveEntry);
1420                                 } else if (currentSequenceNumber == firstIfFull) {
1421                                         //if there's no space but the entry is about to fall off the queue
1422                                         System.out.println("B"); //?
1423                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1424
1425                                 }
1426                         }
1427                 }
1428
1429                 // Did not resize
1430                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1431         }
1432
1433         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1434                 /* now go through live entries from least to greatest sequence number until
1435                  * either all live slots added, or the slot doesn't have enough room
1436                  * for SKIP_THRESHOLD consecutive entries*/
1437                 int skipcount = 0;
1438                 long newestseqnum = buffer.getNewestSeqNum();
1439                 search:
1440                 for (; seqn <= newestseqnum; seqn++) {
1441                         Slot prevslot = buffer.getSlot(seqn);
1442                         //Push slot number forward
1443                         if (!seenliveslot)
1444                                 oldestLiveSlotSequenceNumver = seqn;
1445
1446                         if (!prevslot.isLive())
1447                                 continue;
1448                         seenliveslot = true;
1449                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1450                         for (Entry liveentry : liveentries) {
1451                                 if (s.hasSpace(liveentry))
1452                                         s.addEntry(liveentry);
1453                                 else {
1454                                         skipcount++;
1455                                         if (skipcount > SKIP_THRESHOLD)
1456                                                 break search;
1457                                 }
1458                         }
1459                 }
1460         }
1461
1462         /**
1463          * Checks for malicious activity and updates the local copy of the block chain.
1464          */
1465         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1466
1467                 // The cloud communication layer has checked slot HMACs already before decoding
1468                 if (newSlots.length == 0) {
1469                         return;
1470                 }
1471
1472                 // Make sure all slots are newer than the last largest slot this client has seen
1473                 long firstSeqNum = newSlots[0].getSequenceNumber();
1474                 if (firstSeqNum <= sequenceNumber) {
1475                         throw new Error("Server Error: Sent older slots!");
1476                 }
1477
1478                 // Create an object that can access both new slots and slots in our local chain
1479                 // without committing slots to our local chain
1480                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1481
1482                 // Check that the HMAC chain is not broken
1483                 checkHMACChain(indexer, newSlots);
1484
1485                 // Set to keep track of messages from clients
1486                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1487
1488                 // Process each slots data
1489                 for (Slot slot : newSlots) {
1490                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1491
1492                         updateExpectedSize();
1493                 }
1494
1495                 // If there is a gap, check to see if the server sent us everything.
1496                 if (firstSeqNum != (sequenceNumber + 1)) {
1497
1498                         // Check the size of the slots that were sent down by the server.
1499                         // Can only check the size if there was a gap
1500                         checkNumSlots(newSlots.length);
1501
1502                         // Since there was a gap every machine must have pushed a slot or must have
1503                         // a last message message.  If not then the server is hiding slots
1504                         if (!machineSet.isEmpty()) {
1505                                 throw new Error("Missing record for machines: " + machineSet);
1506                         }
1507                 }
1508
1509                 // Update the size of our local block chain.
1510                 commitNewMaxSize();
1511
1512                 // Commit new to slots to the local block chain.
1513                 for (Slot slot : newSlots) {
1514
1515                         // Insert this slot into our local block chain copy.
1516                         buffer.putSlot(slot);
1517
1518                         // Keep track of how many slots are currently live (have live data in them).
1519                         liveSlotCount++;
1520                 }
1521
1522                 // Get the sequence number of the latest slot in the system
1523                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1524
1525                 updateLiveStateFromServer();
1526
1527                 // No Need to remember after we pulled from the server
1528                 offlineTransactionsCommittedAndAtServer.clear();
1529
1530                 // This is invalidated now
1531                 hadPartialSendToServer = false;
1532         }
1533
1534         private void updateLiveStateFromServer() {
1535                 // Process the new transaction parts
1536                 processNewTransactionParts();
1537
1538                 // Do arbitration on new transactions that were received
1539                 arbitrateFromServer();
1540
1541                 // Update all the committed keys
1542                 boolean didCommitOrSpeculate = updateCommittedTable();
1543
1544                 // Delete the transactions that are now dead
1545                 updateLiveTransactionsAndStatus();
1546
1547                 // Do speculations
1548                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1549                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1550         }
1551
1552         private void updateLiveStateFromLocal() {
1553                 // Update all the committed keys
1554                 boolean didCommitOrSpeculate = updateCommittedTable();
1555
1556                 // Delete the transactions that are now dead
1557                 updateLiveTransactionsAndStatus();
1558
1559                 // Do speculations
1560                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1561                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1562         }
1563
1564         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1565                 // if (didFindTableStatus) {
1566                 // return;
1567                 // }
1568                 long prevslots = firstSequenceNumber;
1569
1570
1571                 if (didFindTableStatus) {
1572                         // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
1573                         // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1574
1575                 } else {
1576                         expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1577                         // System.out.println("Here: " + expectedsize);
1578                 }
1579
1580                 // System.out.println(numberOfSlots);
1581
1582                 didFindTableStatus = true;
1583                 currMaxSize = numberOfSlots;
1584         }
1585
1586         private void updateExpectedSize() {
1587                 expectedsize++;
1588
1589                 if (expectedsize > currMaxSize) {
1590                         expectedsize = currMaxSize;
1591                 }
1592         }
1593
1594
1595         /**
1596          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1597          * This is only called when we have a gap between the slots that we have locally and the slots
1598          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1599          * status message
1600          */
1601         private void checkNumSlots(int numberOfSlots) {
1602                 if (numberOfSlots != expectedsize) {
1603                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1604                 }
1605         }
1606
1607         private void updateCurrMaxSize(int newmaxsize) {
1608                 currMaxSize = newmaxsize;
1609         }
1610
1611
1612         /**
1613          * Update the size of of the local buffer if it is needed.
1614          */
1615         private void commitNewMaxSize() {
1616                 didFindTableStatus = false;
1617
1618                 // Resize the local slot buffer
1619                 if (numberOfSlots != currMaxSize) {
1620                         buffer.resize((int)currMaxSize);
1621                 }
1622
1623                 // Change the number of local slots to the new size
1624                 numberOfSlots = (int)currMaxSize;
1625
1626
1627                 // Recalculate the resize threshold since the size of the local buffer has changed
1628                 setResizeThreshold();
1629         }
1630
1631         /**
1632          * Process the new transaction parts from this latest round of slots received from the server
1633          */
1634         private void processNewTransactionParts() {
1635
1636                 if (newTransactionParts.size() == 0) {
1637                         // Nothing new to process
1638                         return;
1639                 }
1640
1641                 // Iterate through all the machine Ids that we received new parts for
1642                 for (Long machineId : newTransactionParts.keySet()) {
1643                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1644
1645                         // Iterate through all the parts for that machine Id
1646                         for (Pair<Long, Integer> partId : parts.keySet()) {
1647                                 TransactionPart part = parts.get(partId);
1648
1649                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1650                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1651                                         // Set dead the transaction part
1652                                         part.setDead();
1653                                         continue;
1654                                 }
1655
1656                                 // Get the transaction object for that sequence number
1657                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1658
1659                                 if (transaction == null) {
1660                                         // This is a new transaction that we dont have so make a new one
1661                                         transaction = new Transaction();
1662
1663                                         // Insert this new transaction into the live tables
1664                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1665                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1666                                 }
1667
1668                                 // Add that part to the transaction
1669                                 transaction.addPartDecode(part);
1670                         }
1671                 }
1672
1673                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1674                 newTransactionParts.clear();
1675         }
1676
1677
1678         private long lastSeqNumArbOn = 0;
1679
1680         private void arbitrateFromServer() {
1681
1682                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1683                         // Nothing to arbitrate on so move on
1684                         return;
1685                 }
1686
1687                 // Get the transaction sequence numbers and sort from oldest to newest
1688                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1689                 Collections.sort(transactionSequenceNumbers);
1690
1691                 // Collection of key value pairs that are
1692                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1693
1694                 // The last transaction arbitrated on
1695                 long lastTransactionCommitted = -1;
1696                 Set<Abort> generatedAborts = new HashSet<Abort>();
1697
1698                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1699                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1700
1701
1702
1703                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1704                         if (transaction.getArbitrator() != localMachineId) {
1705                                 continue;
1706                         }
1707
1708                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1709                                 continue;
1710                         }
1711
1712                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1713                                 // We have seen this already locally so dont commit again
1714                                 continue;
1715                         }
1716
1717
1718                         if (!transaction.isComplete()) {
1719                                 // Will arbitrate in incorrect order if we continue so just break
1720                                 // Most likely this
1721                                 break;
1722                         }
1723
1724
1725                         // update the largest transaction seen by arbitrator from server
1726                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1727                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1728                         } else {
1729                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1730                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1731                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1732                                 }
1733                         }
1734
1735                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1736                                 // Guard evaluated as true
1737
1738                                 // Update the local changes so we can make the commit
1739                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1740                                         speculativeTableTmp.put(kv.getKey(), kv);
1741                                 }
1742
1743                                 // Update what the last transaction committed was for use in batch commit
1744                                 lastTransactionCommitted = transactionSequenceNumber;
1745                         } else {
1746                                 // Guard evaluated was false so create abort
1747
1748                                 // Create the abort
1749                                 Abort newAbort = new Abort(null,
1750                                                 transaction.getClientLocalSequenceNumber(),
1751                                                 transaction.getSequenceNumber(),
1752                                                 transaction.getMachineId(),
1753                                                 transaction.getArbitrator(),
1754                                                 localArbitrationSequenceNumber);
1755                                 localArbitrationSequenceNumber++;
1756
1757                                 generatedAborts.add(newAbort);
1758
1759                                 // Insert the abort so we can process
1760                                 processEntry(newAbort);
1761                         }
1762
1763                         lastSeqNumArbOn = transactionSequenceNumber;
1764
1765                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1766                 }
1767
1768                 Commit newCommit = null;
1769
1770                 // If there is something to commit
1771                 if (speculativeTableTmp.size() != 0) {
1772
1773                         // Create the commit and increment the commit sequence number
1774                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1775                         localArbitrationSequenceNumber++;
1776
1777                         // Add all the new keys to the commit
1778                         for (KeyValue kv : speculativeTableTmp.values()) {
1779                                 newCommit.addKV(kv);
1780                         }
1781
1782                         // create the commit parts
1783                         newCommit.createCommitParts();
1784
1785                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1786
1787                         // Insert the commit so we can process it
1788                         for (CommitPart commitPart : newCommit.getParts().values()) {
1789                                 processEntry(commitPart);
1790                         }
1791                 }
1792
1793                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1794                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1795                         pendingSendArbitrationRounds.add(arbitrationRound);
1796
1797                         if (compactArbitrationData()) {
1798                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1799                                 if (newArbitrationRound.getCommit() != null) {
1800                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1801                                                 processEntry(commitPart);
1802                                         }
1803                                 }
1804                         }
1805                 }
1806         }
1807
             /**
              * Arbitrates a single transaction directly on this machine.
              *
              * The returned pair is built as follows:
              *   (false, false) - no arbitration was done: this machine is not the transaction's arbitrator,
              *                    the transaction is not complete, or (for a transaction created on another
              *                    machine) a larger client local sequence number from that machine is already
              *                    recorded in lastTransactionSeenFromMachineFromServer.
              *   (true, true)   - the guard evaluated to true: a commit was created, queued in
              *                    pendingSendArbitrationRounds and its parts were passed to processEntry().
              *   (true, false)  - the guard evaluated to false: a locally created transaction has its status
              *                    set to aborted, a transaction from another machine gets an Abort queued in
              *                    pendingSendArbitrationRounds.
              *
              * In both arbitrated cases updateLiveStateFromLocal() is called before returning.
              *
              * @param transaction the transaction to arbitrate on
              * @return the pair described above
              */
1808         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1809
1810                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1811                 if (transaction.getArbitrator() != localMachineId) {
1812                         return new Pair<Boolean, Boolean>(false, false);
1813                 }
1814
1815                 if (!transaction.isComplete()) {
1816                         // The transaction is not complete yet so it cannot be arbitrated on
1817                         // (arbitrating now could happen in the incorrect order)
1818                         return new Pair<Boolean, Boolean>(false, false);
1819                 }
1820
1821                 if (transaction.getMachineId() != localMachineId) {
1822                         // dont do this check for local transactions
1823                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1824                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1825                                         // We have already seen this from the server
1826                                         return new Pair<Boolean, Boolean>(false, false);
1827                                 }
1828                         }
1829                 }
1830
1831                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1832                         // Guard evaluated as true
1833
1834                         // Create the commit and increment the commit sequence number
1835                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1836                         localArbitrationSequenceNumber++;
1837
1838                         // Copy the key value updates of the transaction into the commit
1839                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1840                                 newCommit.addKV(kv);
1841                         }
1842
1843                         // create the commit parts
1844                         newCommit.createCommitParts();
1845
1846                         // Append the commit (with no aborts) to the end of the pending queue waiting for sending to the server
1847                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1848                         pendingSendArbitrationRounds.add(arbitrationRound);
1849
1850                         if (compactArbitrationData()) {
                                     // Compaction replaced the commit of the newest round, process the parts of that commit instead
1851                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1852                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1853                                         processEntry(commitPart);
1854                                 }
1855                         } else {
1856                                 // Insert the commit so we can process it
1857                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1858                                         processEntry(commitPart);
1859                                 }
1860                         }
1861
1862                         if (transaction.getMachineId() == localMachineId) {
                                     // For locally created transactions update the status
1863                                 TransactionStatus status = transaction.getTransactionStatus();
1864                                 if (status != null) {
1865                                         status.setStatus(TransactionStatus.StatusCommitted);
1866                                 }
1867                         }
1868
1869                         updateLiveStateFromLocal();
1870                         return new Pair<Boolean, Boolean>(true, true);
1871                 } else {
1872
1873                         if (transaction.getMachineId() == localMachineId) {
1874                                 // For locally created transactions update the status
1875
1876                                 // Guard evaluated was false so mark the transaction as aborted (no Abort entry is created here)
1877                                 TransactionStatus status = transaction.getTransactionStatus();
1878                                 if (status != null) {
1879                                         status.setStatus(TransactionStatus.StatusAborted);
1880                                 }
1881                         } else {
1882                                 Set<Abort> addAbortSet = new HashSet<Abort>();
1883
1884
1885                                 // Create the abort
1886                                 Abort newAbort = new Abort(null,
1887                                                 transaction.getClientLocalSequenceNumber(),
1888                                                 -1,
1889                                                 transaction.getMachineId(),
1890                                                 transaction.getArbitrator(),
1891                                                 localArbitrationSequenceNumber);
1892                                 localArbitrationSequenceNumber++;
1893
1894                                 addAbortSet.add(newAbort);
1895
1896
1897                                 // Append the abort (with no commit) to the end of the pending queue waiting for sending to the server
1898                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1899                                 pendingSendArbitrationRounds.add(arbitrationRound);
1900
1901                                 if (compactArbitrationData()) {
                                             // compactArbitrationData() only returns true after it set a merged commit on the
                                             // newest round, so getCommit() is expected to be non-null here
1902                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1903                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1904                                                 processEntry(commitPart);
1905                                         }
1906                                 }
1907                         }
1908
1909                         updateLiveStateFromLocal();
1910                         return new Pair<Boolean, Boolean>(true, false);
1911                 }
1912         }
1913
1914         /**
1915          * Compacts the arbitration data by merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates.
              *
              * The newest round in pendingSendArbitrationRounds is used as the accumulator. Older rounds are
              * folded into it one at a time, newest to oldest, until a round is full, a round already had a part
              * sent, or the combined size would exceed ArbitrationRound.MAX_PARTS. The folded rounds are then
              * removed from the list and the accumulated round is put back at the end.
              *
              * @return true only when the newest round had no commit before compaction and a merged commit was
              *         set on it during compaction; false in every other case (including when nothing was compacted)
1916          */
1917         private boolean compactArbitrationData() {
1918
1919                 if (pendingSendArbitrationRounds.size() < 2) {
1920                         // Nothing to compact so do nothing
1921                         return false;
1922                 }
1923
1924                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1925                 if (lastRound.didSendPart()) {
                             // Part of the newest round was already sent so it can no longer be changed
1926                         return false;
1927                 }
1928
                     // NOTE(review): despite its name, hadCommit is true when the newest round had NO commit before
                     // compaction. Confirm the intended meaning before renaming or inverting it.
1929                 boolean hadCommit = (lastRound.getCommit() == null);
1930                 boolean gotNewCommit = false;
1931
                     // Counts the rounds that end up merged, the newest round itself is the first one
1932                 int numberToDelete = 1;
1933                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
                             // Next older round that has not been folded in yet
1934                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1935
1936                         if (round.isFull() || round.didSendPart()) {
1937                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1938                                 break;
1939                         }
1940
1941                         if (round.getCommit() == null) {
1942
1943                                 // Try compacting aborts only
1944                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1945                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1946                                         // Cant compact since it would be too large
1947                                         break;
1948                                 }
1949                                 lastRound.addAborts(round.getAborts());
1950                         } else {
1951
1952                                 // Create a new larger commit
                                     // NOTE(review): the sequence number is consumed here even if the merged commit is
                                     // discarded below for being too large
1953                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1954                                 localArbitrationSequenceNumber++;
1955
1956                                 // Create the commit parts so that we can count them
1957                                 newCommit.createCommitParts();
1958
1959                                 // Calculate the new size of the parts
1960                                 int newSize = newCommit.getNumberOfParts();
1961                                 newSize += lastRound.getAbortsCount();
1962                                 newSize += round.getAbortsCount();
1963
1964                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1965                                         // Cant compact since it would be too large
1966                                         break;
1967                                 }
1968
1969                                 // Set the new compacted part
1970                                 lastRound.setCommit(newCommit);
1971                                 lastRound.addAborts(round.getAborts());
1972                                 gotNewCommit = true;
1973                         }
1974
1975                         numberToDelete++;
1976                 }
1977
1978                 if (numberToDelete != 1) {
1979                         // If there is a compaction
1980
1981                         // Delete the previous pieces that are now in the new compacted piece
1982                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1983                                 pendingSendArbitrationRounds.clear();
1984                         } else {
                                     // Removes the newest round as well, it is added back below
1985                                 for (int i = 0; i < numberToDelete; i++) {
1986                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1987                                 }
1988                         }
1989
1990                         // Add the new compacted into the pending to send list
1991                         pendingSendArbitrationRounds.add(lastRound);
1992
1993                         // Should reinsert into the commit processor
1994                         if (hadCommit && gotNewCommit) {
1995                                 return true;
1996                         }
1997                 }
1998
1999                 return false;
2000         }
2001         // private boolean compactArbitrationData() {
2002         //      return false;
2003         // }
2004
2005         /**
2006          * Update all the commits and the committed tables, sets dead the dead transactions.
              *
              * First every commit part collected in newCommitParts is added to its commit in liveCommitsTable
              * (the commit is created when it is not known yet) and newCommitParts is cleared. Then, for every
              * arbitrator, the commits are walked in ascending sequence number order: bookkeeping tables are
              * updated, keys overwritten by a new commit are invalidated in the older commits that held them,
              * and committedKeyValueTable / liveCommitsByKeyTable are updated with the new values.
              *
              * @return true if at least one commit that was not seen before was processed, false otherwise
              *         (including when there were no new commit parts at all)
2007          */
2008         private boolean updateCommittedTable() {
2009
2010                 if (newCommitParts.size() == 0) {
2011                         // Nothing new to process
2012                         return false;
2013                 }
2014
2015                 // Iterate through all the machine Ids that we received new parts for
2016                 for (Long machineId : newCommitParts.keySet()) {
2017                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
2018
2019                         // Iterate through all the parts for that machine Id
2020                         for (Pair<Long, Integer> partId : parts.keySet()) {
2021                                 CommitPart part = parts.get(partId);
2022
2023                                 // Get the table of commits for the machine that created this part
2024                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2025
2026                                 if (commitForClientTable == null) {
2027                                         // This is the first commit from this device
2028                                         commitForClientTable = new HashMap<Long, Commit>();
2029                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2030                                 }
2031
2032                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2033
2034                                 if (commit == null) {
2035                                         // This is a new commit that we dont have so make a new one
2036                                         commit = new Commit();
2037
2038                                         // Insert this new commit into the live tables
2039                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2040                                 }
2041
2042                                 // Add that part to the commit
2043                                 commit.addPartDecode(part);
2044                         }
2045                 }
2046
2047                 // Clear all the new commits parts in preparation for the next time the server sends slots
2048                 newCommitParts.clear();
2049
2050                 // If we process a new commit keep track of it for future use
2051                 boolean didProcessANewCommit = false;
2052
2053                 // Process the commits one by one
2054                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2055
2056                         // Get all the commits for a specific arbitrator
2057                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2058
2059                         // Sort the commits in order
                             // (this is a copy of the key set, so removing entries from the table below is safe)
2060                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2061                         Collections.sort(commitSequenceNumbers);
2062
2063                         // Get the last commit seen from this arbitrator
2064                         long lastCommitSeenSequenceNumber = -1;
2065                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2066                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2067                         }
2068
2069                         // Go through each new commit one by one
2070                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2071                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2072                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2073
2074                                 // Special processing if a commit is not complete
2075                                 if (!commit.isComplete()) {
2076                                         if (i == (commitSequenceNumbers.size() - 1)) {
2077                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2078                                                 break;
2079                                         } else {
2080                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2081                                                 // Delete it and move on
2082                                                 commit.setDead();
2083                                                 commitForClientTable.remove(commit.getSequenceNumber());
2084                                                 continue;
2085                                         }
2086                                 }
2087
2088                                 // Update the last transaction that was updated if we can
2089                                 if (commit.getTransactionSequenceNumber() != -1) {
2090                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2091
2092                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2093                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2094                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2095                                         }
2096                                 }
2097
2098                                 // Update the last arbitration data that we have seen so far
2099                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2100
2101                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2102                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2103                                                 // Is larger
2104                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2105                                         }
2106                                 } else {
2107                                         // Never seen any data from this arbitrator so record the first one
2108                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2109                                 }
2110
2111                                 // We have already seen this commit before so there is no need to do the full processing on this commit
2112                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2113
2114                                         // Update the last transaction that was updated if we can
                                             // (this repeats the update already performed above for every complete commit)
2115                                         if (commit.getTransactionSequenceNumber() != -1) {
2116                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2117
2118                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2119                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2120                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2121                                                 }
2122                                         }
2123
2124                                         continue;
2125                                 }
2126
2127                                 // If we got here then this is a brand new commit and needs full processing
2128
2129                                 // Get what commits should be edited, these are the commits that have live values for their keys
2130                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2131                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2132                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2133                                 }
2134                                 commitsToEdit.remove(null); // remove null since it could be in this set
2135
2136                                 // Update each previous commit that needs to be updated
2137                                 for (Commit previousCommit : commitsToEdit) {
2138
2139                                         // Only bother with live commits (TODO: Maybe remove this check)
2140                                         if (previousCommit.isLive()) {
2141
2142                                                 // Update which keys in the old commits are still live
2143                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2144                                                         previousCommit.invalidateKey(kv.getKey());
2145                                                 }
2146
2147                                                 // if the commit is now dead then remove it
2148                                                 if (!previousCommit.isLive()) {
                                                             // The tables are keyed by sequence number, so remove by key (passing the
                                                             // Commit object itself to Map.remove() never matches an entry). Look the
                                                             // table up by the commit's own machine id instead of assuming it is the
                                                             // table of the arbitrator currently being processed.
                                                             Map<Long, Commit> previousCommitTable = liveCommitsTable.get(previousCommit.getMachineId());
                                                             if (previousCommitTable != null) {
2149                                                                 previousCommitTable.remove(previousCommit.getSequenceNumber());
                                                             }
2150                                                 }
2151                                         }
2152                                 }
2153
2154                                 // Update the last seen sequence number from this arbitrator
2155                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2156                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2157                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2158                                         }
2159                                 } else {
2160                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2161                                 }
2162
2163                                 // We processed a new commit that we havent seen before
2164                                 didProcessANewCommit = true;
2165
2166                                 // Update the committed table of keys and which commit is using which key
2167                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2168                                         committedKeyValueTable.put(kv.getKey(), kv);
2169                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2170                                 }
2171                         }
2172                 }
2173
2174                 return didProcessANewCommit;
2175         }
2176
2177         /**
2178          * Create the speculative table from transactions that are still live and have come from the cloud.
              *
              * Live transactions are applied in ascending sequence number order on top of committedKeyValueTable:
              * every complete transaction whose guard evaluates to true has its key value updates written into
              * speculatedKeyValueTable. An incomplete transaction is skipped, and so is every later transaction
              * that has the same arbitrator.
              *
              * @param didProcessNewCommits true if new commits were processed since the last speculation, which
              *        forces the speculation to be redone from scratch
              * @return false if there are no live transactions or nothing new to speculate on, true otherwise
2179          */
2180         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2181                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2182                         // There is nothing to speculate on
2183                         return false;
2184                 }
2185
2186                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2187                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2188                 Collections.sort(transactionSequenceNumbersSorted);
2189
                     // The oldest live transaction differs from the one recorded by the previous speculation
2190                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2191
2192
2193                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2194                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2195                         // OR there was a new commit (Could be from offline commit) so redo the speculation from scratch
2196
2197                         // Start from scratch
2198                         speculatedKeyValueTable.clear();
2199                         lastTransactionSequenceNumberSpeculatedOn = -1;
2200                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2201
2202                 }
2203
2204                 // Remember the front of the transaction list
2205                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2206
2207                 // Find where to start speculating from (index 0 when the last speculated transaction is not in the list)
2208                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2209
2210                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2211                         // Make sure we are not out of bounds
2212                         return false; // did not speculate
2213                 }
2214
                     // Arbitrators that have an incomplete transaction, their later transactions are not speculated on
2215                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
                     // NOTE(review): didSkip starts out true, so the reset at the end of this method always runs
                     // whether or not a transaction was skipped. Confirm whether false was intended.
2216                 boolean didSkip = true;
2217
2218                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2219                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2220                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2221
2222                         if (!transaction.isComplete()) {
2223                                 // If there is an incomplete transaction then there is nothing we can do
2224                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2225                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2226                                 didSkip = true;
2227                                 continue;
2228                         }
2229
2230                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
                                     // An earlier transaction of this arbitrator is incomplete so skip this one too
2231                                 continue;
2232                         }
2233
2234                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2235
2236                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2237                                 // Guard evaluated to true so update the speculative table
2238                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2239                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2240                                 }
2241                         }
2242                 }
2243
2244                 if (didSkip) {
2245                         // Since there was a skip we need to redo the speculation next time around
2246                         lastTransactionSequenceNumberSpeculatedOn = -1;
2247                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2248                 }
2249
2250                 // We did some speculation
2251                 return true;
2252         }
2253
2254         /**
2255          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer.
              *
              * The pending transactions are applied in queue order on top of committedKeyValueTable and
              * speculatedKeyValueTable: every transaction whose guard evaluates to true has its key value updates
              * written into pendingTransactionSpeculatedKeyValueTable. The work is incremental, only the
              * transactions queued after lastPendingTransactionSpeculatedOn are applied, unless a reset is needed.
              *
              * @param didProcessNewCommitsOrSpeculate true if the committed or speculative table changed since
              *        the last call, which forces the pending speculation to be rebuilt from the front of the queue
2256          */
2257         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2258                 if (pendingTransactionQueue.size() == 0) {
2259                         // There is nothing to speculate on
2260                         return;
2261                 }
2262
2263
2264                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2265                         // need to reset on the pending speculation
2266                         lastPendingTransactionSpeculatedOn = null;
2267                         firstPendingTransaction = pendingTransactionQueue.get(0);
2268                         pendingTransactionSpeculatedKeyValueTable.clear();
2269                 }
2270
2271                 // Find where to start speculating from: right after the last transaction that was already applied.
                     // (Searching for firstPendingTransaction here always gave index 1 and so never applied the
                     // transaction at the front of the queue.)
                     int lastSpeculatedIndex = pendingTransactionQueue.indexOf(lastPendingTransactionSpeculatedOn);
                     if (lastSpeculatedIndex < 0) {
                             // Nothing was applied yet or the last applied transaction is no longer queued,
                             // so rebuild the table starting at the front of the queue
                             lastPendingTransactionSpeculatedOn = null;
                             pendingTransactionSpeculatedKeyValueTable.clear();
                     }
2272                 int startIndex = lastSpeculatedIndex + 1;
2273
2274                 if (startIndex >= pendingTransactionQueue.size()) {
2275                         // Make sure we are not out of bounds
2276                         return;
2277                 }
2278
2279                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2280                         Transaction transaction = pendingTransactionQueue.get(i);
2281
                             // Remember how far the speculation got so the next call can continue from here
2282                         lastPendingTransactionSpeculatedOn = transaction;
2283
2284                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2285                                 // Guard evaluated to true so update the speculative table
2286                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2287                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2288                                 }
2289                         }
2290                 }
2291         }
2292
2293         /**
2294          * Set dead and remove from the live transaction tables the transactions that are dead
2295          */
2296         private void updateLiveTransactionsAndStatus() {
2297
2298                 // Go through each of the transactions
2299                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2300                         Transaction transaction = iter.next().getValue();
2301
2302                         // Check if the transaction is dead
2303                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2304                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2305
2306                                 // Set dead the transaction
2307                                 transaction.setDead();
2308
2309                                 // Remove the transaction from the live table
2310                                 iter.remove();
2311                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2312                         }
2313                 }
2314
2315                 // Go through each of the transactions
2316                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2317                         TransactionStatus status = iter.next().getValue();
2318
2319                         // Check if the transaction is dead
2320                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2321                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2322
2323                                 // Set committed
2324                                 status.setStatus(TransactionStatus.StatusCommitted);
2325
2326                                 // Remove
2327                                 iter.remove();
2328                         }
2329                 }
2330         }
2331
2332         /**
2333          * Process this slot, entry by entry.  Also update the latest message sent by slot
2334          */
2335         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2336
2337                 // Update the last message seen
2338                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2339
2340                 // Process each entry in the slot
2341                 for (Entry entry : slot.getEntries()) {
2342                         switch (entry.getType()) {
2343
2344                                 case Entry.TypeCommitPart:
2345                                         processEntry((CommitPart)entry);
2346                                         break;
2347
2348                                 case Entry.TypeAbort:
2349                                         processEntry((Abort)entry);
2350                                         break;
2351
2352                                 case Entry.TypeTransactionPart:
2353                                         processEntry((TransactionPart)entry);
2354                                         break;
2355
2356                                 case Entry.TypeNewKey:
2357                                         processEntry((NewKey)entry);
2358                                         break;
2359
2360                                 case Entry.TypeLastMessage:
2361                                         processEntry((LastMessage)entry, machineSet);
2362                                         break;
2363
2364                                 case Entry.TypeRejectedMessage:
2365                                         processEntry((RejectedMessage)entry, indexer);
2366                                         break;
2367
2368                                 case Entry.TypeTableStatus:
2369                                         processEntry((TableStatus)entry, slot.getSequenceNumber());
2370                                         break;
2371
2372                                 default:
2373                                         throw new Error("Unrecognized type: " + entry.getType());
2374                         }
2375                 }
2376         }
2377
2378         /**
2379          * Update the last message that was sent for a machine Id
2380          */
2381         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2382                 // Update what the last message received by a machine was
2383                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2384         }
2385
2386         /**
2387          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2388          */
2389         private void processEntry(NewKey entry) {
2390
2391                 // Update the arbitrator table with the new key information
2392                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2393
2394                 // Update what the latest live new key is
2395                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2396                 if (oldNewKey != null) {
2397                         // Delete the old new key messages
2398                         oldNewKey.setDead();
2399                 }
2400         }
2401
2402         /**
2403          * Process new table status entries and set dead the old ones as new ones come in.
2404          * keeps track of the largest and smallest table status seen in this current round
2405          * of updating the local copy of the block chain
2406          */
2407         private void processEntry(TableStatus entry, long seq) {
2408                 int newNumSlots = entry.getMaxSlots();
2409                 updateCurrMaxSize(newNumSlots);
2410
2411                 initExpectedSize(seq, newNumSlots);
2412
2413                 if (liveTableStatus != null) {
2414                         // We have a larger table status so the old table status is no longer alive
2415                         liveTableStatus.setDead();
2416                 }
2417
2418                 // Make this new table status the latest alive table status
2419                 liveTableStatus = entry;
2420         }
2421
2422         /**
2423          * Check old messages to see if there is a block chain violation. Also record which clients have not yet seen this rejected message.
2424          */
2425         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2426                 long oldSeqNum = entry.getOldSeqNum();
2427                 long newSeqNum = entry.getNewSeqNum();
2428                 boolean isequal = entry.getEqual();
2429                 long machineId = entry.getMachineID();
2430                 long seq = entry.getSequenceNumber();
2431
2432
2433                 // Check if we have messages that were supposed to be rejected in our local block chain
2434                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2435
2436                         // Get the slot
2437                         Slot slot = indexer.getSlot(seqNum);
2438
2439                         if (slot != null) {
2440                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2441
2442                                 long slotMachineId = slot.getMachineID();
2443                                 if (isequal != (slotMachineId == machineId)) {
2444                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2445                                 }
2446                         }
2447                 }
2448
2449
2450                 // Create a list of clients to watch until they see this rejected message entry.
2451                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2452                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2453
2454                         // Machine ID for the last message entry
2455                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2456
2457                         // We've seen it, don't need to continue to watch.  Our next
2458                         // message will implicitly acknowledge it.
2459                         if (lastMessageEntryMachineId == localMachineId) {
2460                                 continue;
2461                         }
2462
2463                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2464                         long entrySequenceNumber = lastMessageValue.getFirst();
2465
2466                         if (entrySequenceNumber < seq) {
2467
2468                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2469                                 addWatchList(lastMessageEntryMachineId, entry);
2470
2471                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2472                                 deviceWatchSet.add(lastMessageEntryMachineId);
2473                         }
2474                 }
2475
2476                 if (deviceWatchSet.isEmpty()) {
2477                         // This rejected message has been seen by all the clients so
2478                         entry.setDead();
2479                 } else {
2480                         // We need to watch this rejected message
2481                         entry.setWatchSet(deviceWatchSet);
2482                 }
2483         }
2484
2485         /**
2486          * Track this abort as live unless the machine it is for has already seen it, in which case set it dead right away.
2487          * Also update the last transaction number that was arbitrated on.
2488          */
2489         private void processEntry(Abort entry) {
2490
2491
2492                 if (entry.getTransactionSequenceNumber() != -1) {
2493                         // update the transaction status if it was sent to the server
2494                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2495                         if (status != null) {
2496                                 status.setStatus(TransactionStatus.StatusAborted);
2497                         }
2498                 }
2499
2500                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2501                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2502                 if (previouslySeenAbort != null) {
2503                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2504                 }
2505
2506                 if (entry.getTransactionArbitrator() == localMachineId) {
2507                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2508                 }
2509
2510                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2511
2512                         // The machine already saw this so it is dead
2513                         entry.setDead();
2514                         liveAbortTable.remove(entry.getAbortId());
2515
2516                         if (entry.getTransactionArbitrator() == localMachineId) {
2517                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2518                         }
2519
2520                         return;
2521                 }
2522
2523
2524
2525
2526                 // Update the last arbitration data that we have seen so far
2527                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2528
2529                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2530                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2531                                 // Is larger
2532                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2533                         }
2534                 } else {
2535                         // Never seen any data from this arbitrator so record the first one
2536                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2537                 }
2538
2539
2540                 // Set dead a transaction if we can
2541                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2542                 if (transactionToSetDead != null) {
2543                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2544                 }
2545
2546                 // Update the last transaction sequence number that the arbitrator arbitrated on
2547                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2548                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2549
2550                         // Is a valid one
2551                         if (entry.getTransactionSequenceNumber() != -1) {
2552                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2553                         }
2554                 }
2555         }
2556
2557         /**
2558          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2559          */
2560         private void processEntry(TransactionPart entry) {
2561                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2562                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2563                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2564                         // This transaction is dead, it was already committed or aborted
2565                         entry.setDead();
2566                         return;
2567                 }
2568
2569                 // This part is still alive
2570                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2571
2572                 if (transactionPart == null) {
2573                         // Dont have a table for this machine Id yet so make one
2574                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2575                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2576                 }
2577
2578                 // Update the part and set dead ones we have already seen (got a rescued version)
2579                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2580                 if (previouslySeenPart != null) {
2581                         previouslySeenPart.setDead();
2582                 }
2583         }
2584
2585         /**
2586          * Process new commit entries and save them for future use.  Delete duplicates
2587          */
2588         private void processEntry(CommitPart entry) {
2589
2590
2591                 // Update the last transaction that was updated if we can
2592                 if (entry.getTransactionSequenceNumber() != -1) {
2593                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2594
2595                         // Update the last transaction sequence number that the arbitrator arbitrated on
2596                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2597                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2598                         }
2599                 }
2600
2601
2602
2603
2604                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2605
2606                 if (commitPart == null) {
2607                         // Don't have a table for this machine Id yet so make one
2608                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2609                         newCommitParts.put(entry.getMachineId(), commitPart);
2610                 }
2611
2612                 // Update the part and set dead ones we have already seen (got a rescued version)
2613                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2614                 if (previouslySeenPart != null) {
2615                         previouslySeenPart.setDead();
2616                 }
2617         }
2618
2619         /**
2620          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2621          * Updates the live aborts, removes those that are dead and sets them dead.
2622          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2623          * other clients have not had a rollback on the last message.
2624          */
2625         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2626
2627                 // We have seen this machine ID
2628                 machineSet.remove(machineId);
2629
2630                 // Get the set of rejected messages that this machine Id is has not seen yet
2631                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2632
2633                 // If there is a rejected message that this machine Id has not seen yet
2634                 if (watchset != null) {
2635
2636                         // Go through each rejected message that this machine Id has not seen yet
2637                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2638
2639                                 RejectedMessage rm = rmit.next();
2640
2641                                 // If this machine Id has seen this rejected message...
2642                                 if (rm.getSequenceNumber() <= seqNum) {
2643
2644                                         // Remove it from our watchlist
2645                                         rmit.remove();
2646
2647                                         // Decrement machines that need to see this notification
2648                                         rm.removeWatcher(machineId);
2649                                 }
2650                         }
2651                 }
2652
2653                 // Set dead the abort
2654                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2655                         Abort abort = i.next().getValue();
2656
2657                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2658                                 abort.setDead();
2659                                 i.remove();
2660
2661                                 if (abort.getTransactionArbitrator() == localMachineId) {
2662                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2663                                 }
2664                         }
2665                 }
2666
2667
2668
2669                 if (machineId == localMachineId) {
2670                         // Our own messages are immediately dead.
2671                         if (liveness instanceof LastMessage) {
2672                                 ((LastMessage)liveness).setDead();
2673                         } else if (liveness instanceof Slot) {
2674                                 ((Slot)liveness).setDead();
2675                         } else {
2676                                 throw new Error("Unrecognized type");
2677                         }
2678                 }
2679
2680                 // Get the old last message for this device
2681                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2682                 if (lastMessageEntry == null) {
2683                         // If no last message then there is nothing else to process
2684                         return;
2685                 }
2686
2687                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2688                 Liveness lastEntry = lastMessageEntry.getSecond();
2689
2690                 // If it is not our machine Id since we already set ours to dead
2691                 if (machineId != localMachineId) {
2692                         if (lastEntry instanceof LastMessage) {
2693                                 ((LastMessage)lastEntry).setDead();
2694                         } else if (lastEntry instanceof Slot) {
2695                                 ((Slot)lastEntry).setDead();
2696                         } else {
2697                                 throw new Error("Unrecognized type");
2698                         }
2699                 }
2700
2701                 // Make sure the server is not playing any games
2702                 if (machineId == localMachineId) {
2703
2704                         if (hadPartialSendToServer) {
2705                                 // We were not making any updates and we had a machine mismatch
2706                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2707                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2708                                 }
2709
2710                         } else {
2711                                 // We were not making any updates and we had a machine mismatch
2712                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2713                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2714                                 }
2715                         }
2716                 } else {
2717                         if (lastMessageSeqNum > seqNum) {
2718                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2719                         }
2720                 }
2721         }
2722
2723         /**
2724          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2725          * rejected message entry and which have not.
2726          */
2727         private void addWatchList(long machineId, RejectedMessage entry) {
2728                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2729                 if (entries == null) {
2730                         // There is no set for this machine ID yet so create one
2731                         entries = new HashSet<RejectedMessage>();
2732                         rejectedMessageWatchListTable.put(machineId, entries);
2733                 }
2734                 entries.add(entry);
2735         }
2736
2737         /**
2738          * Check if the HMAC chain is not violated
2739          */
2740         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2741                 for (int i = 0; i < newSlots.length; i++) {
2742                         Slot currSlot = newSlots[i];
2743                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2744                         if (prevSlot != null &&
2745                                         !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2746                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2747                 }
2748         }
2749 }