fdeaeb41c2c9b466742f7177f3063643261bf51f
[iotcloud.git] / version2 / src / Control-2bulbs / app / src / main / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15 import android.content.*;
16
17 import com.example.ali.control.MainActivity;
18
19 /**
20  * IoTTable data structure.  Provides client interface.
21  * @author Brian Demsky
22  * @version 1.0
23  */
24
25 final public class Table {
26
27         /* Constants */
28         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
29         static final int SKIP_THRESHOLD = 10;
30         static final double RESIZE_MULTIPLE = 1.2;
31         static final double RESIZE_THRESHOLD = 0.75;
32         static final int REJECTED_THRESHOLD = 5;
33
34         /* Helper Objects */
35         private SlotBuffer buffer = null;
36         private CloudComm cloud = null;
37         private Random random = null;
38         private TableStatus liveTableStatus = null;
39         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
40         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
41         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
42
43         /* Variables */
44         private int numberOfSlots = 0;  // Number of slots stored in buffer
45         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
46         private long liveSlotCount = 0; // Number of currently live slots
47         private long oldestLiveSlotSequenceNumver = 1;  // Smallest sequence number of the slot with a live entry
48         private long localMachineId = 0; // Machine ID of this client device
49         private long sequenceNumber = 0; // Largest sequence number a client has received
50         private long localSequenceNumber = 0;
51
52         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
53         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
54         private long localTransactionSequenceNumber = 1; // Local sequence number counter for transactions
55         private long lastTransactionSequenceNumberSpeculatedOn = 0; // the last transaction that was speculated on
56         private long oldestTransactionSequenceNumberSpeculatedOn = 0; // the oldest transaction that was speculated on
57         private long localArbitrationSequenceNumber = 1;
58         private boolean hadPartialSendToServer = false;
59         private boolean attemptedToSendToServer = false;
60         private long expectedsize;
61         private boolean didFindTableStatus = false;
62         private long currMaxSize = 0;
63
64         private Slot lastSlotAttemptedToSend = null;
65         private boolean lastIsNewKey = false;
66         private int lastNewSize = 0;
67         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
68         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
69         private NewKey lastNewKey = null;
70
71
72         /* Data Structures  */
73         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
74         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
75         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
76         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
77         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
78         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
79         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
80         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
81         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
82         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
83         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
84         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
85         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
86         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
87         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
88         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
89         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
90         private List<Transaction> pendingTransactionQueue = null;
91         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
92         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
93         private Map<Transaction, List<Integer>> transactionPartsSent = null;
94         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
95         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
96         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
97         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
98         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
99         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
100
101
102         public Table(String baseurl, String password, long _localMachineId, int listeningPort, Context context) {
103                 localMachineId = _localMachineId;
104                 cloud = new CloudComm(this, baseurl, password, listeningPort, context);
105
106                 init();
107         }
108
109         public Table(CloudComm _cloud, long _localMachineId) {
110                 localMachineId = _localMachineId;
111                 cloud = _cloud;
112
113                 init();
114         }
115
116     /**
117          * Init all the stuff needed for for table usage
118          */
119         private void init() {
120
121                 // Init helper objects
122                 random = new Random();
123                 buffer = new SlotBuffer();
124
125                 // Set Variables
126                 oldestLiveSlotSequenceNumver = 1;
127
128                 // init data structs
129                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
130                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
131                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
132                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
133                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
134                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
135                 arbitratorTable = new HashMap<IoTString, Long>();
136                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
137                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
138                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
139                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
140                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
141                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
142                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
143                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
144                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
145                 rejectedSlotList = new Vector<Long>();
146                 pendingTransactionQueue = new ArrayList<Transaction>();
147                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
148                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
149                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
150                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
151                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
152                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
153                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
154                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
155                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
156
157
158                 // Other init stuff
159                 numberOfSlots = buffer.capacity();
160                 setResizeThreshold();
161         }
162
163         // TODO: delete method
164         public synchronized void printSlots() {
165                 long o = buffer.getOldestSeqNum();
166                 long n = buffer.getNewestSeqNum();
167
168                 int[] types = new int[10];
169
170                 int num = 0;
171
172                 int livec = 0;
173                 int deadc = 0;
174
175                 int casdasd = 0;
176
177                 int liveslo = 0;
178
179                 for (long i = o; i < (n + 1); i++) {
180                         Slot s = buffer.getSlot(i);
181
182
183                         if (s.isLive()) {
184                                 liveslo++;
185                         }
186
187                         Vector<Entry> entries = s.getEntries();
188
189                         for (Entry e : entries) {
190                                 if (e.isLive()) {
191                                         int type = e.getType();
192
193
194                                         if (type == 6) {
195                                                 RejectedMessage rej = (RejectedMessage)e;
196                                                 casdasd++;
197
198                                                 System.out.println(rej.getMachineID());
199                                         }
200
201
202                                         types[type] = types[type] + 1;
203                                         num++;
204                                         livec++;
205                                 } else {
206                                         deadc++;
207                                 }
208                         }
209                 }
210
211                 for (int i = 0; i < 10; i++) {
212                         System.out.println(i + "    " + types[i]);
213                 }
214                 System.out.println("Live count:   " + livec);
215                 System.out.println("Live Slot count:   " + liveslo);
216
217                 System.out.println("Dead count:   " + deadc);
218                 System.out.println("Old:   " + o);
219                 System.out.println("New:   " + n);
220                 System.out.println("Size:   " + buffer.size());
221                 // System.out.println("Commits:   " + liveCommitsTable.size());
222                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
223                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
224
225                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
226                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
227                 }
228
229
230                 for (Long a : liveCommitsTable.keySet()) {
231                         for (Long b : liveCommitsTable.get(a).keySet()) {
232                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
233                                         System.out.print(kv + " ");
234                                 }
235                                 System.out.print("|| ");
236                         }
237                         System.out.println();
238                 }
239
240         }
241
242         /**
243          * Initialize the table by inserting a table status as the first entry into the table status
244          * also initialize the crypto stuff.
245          */
246         public synchronized void initTable() throws ServerException {
247                 cloud.initSecurity();
248
249                 // Create the first insertion into the block chain which is the table status
250                 Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
251                 localSequenceNumber++;
252                 TableStatus status = new TableStatus(s, numberOfSlots);
253                 s.addEntry(status);
254                 Slot[] array = cloud.putSlot(s, numberOfSlots);
255
256                 if (array == null) {
257                         array = new Slot[] {s};
258                         // update local block chain
259                         validateAndUpdate(array, true);
260                 } else if (array.length == 1) {
261                         // in case we did push the slot BUT we failed to init it
262                         validateAndUpdate(array, true);
263                 } else {
264                         throw new Error("Error on initialization");
265                 }
266         }
267
268         /**
269          * Rebuild the table from scratch by pulling the latest block chain from the server.
270          */
271         public synchronized void rebuild() throws ServerException {
272                 // Just pull the latest slots from the server
273                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
274                 validateAndUpdate(newslots, true);
275                 sendToServer(null);
276                 updateLiveTransactionsAndStatus();
277
278         }
279
280         // public String toString() {
281         //      String retString = " Committed Table: \n";
282         //      retString += "---------------------------\n";
283         //      retString += commitedTable.toString();
284
285         //      retString += "\n\n";
286
287         //      retString += " Speculative Table: \n";
288         //      retString += "---------------------------\n";
289         //      retString += speculativeTable.toString();
290
291         //      return retString;
292         // }
293
294         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
295                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
296         }
297
298         public synchronized Long getArbitrator(IoTString key) {
299                 return arbitratorTable.get(key);
300         }
301
302         public synchronized void close() {
303                 cloud.close();
304         }
305
306         public synchronized IoTString getCommitted(IoTString key)  {
307                 KeyValue kv = committedKeyValueTable.get(key);
308
309                 if (kv != null) {
310                         return kv.getValue();
311                 } else {
312                         return null;
313                 }
314         }
315
316         public synchronized IoTString getSpeculative(IoTString key) {
317                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
318
319                 if (kv == null) {
320                         kv = speculatedKeyValueTable.get(key);
321                 }
322
323                 if (kv == null) {
324                         kv = committedKeyValueTable.get(key);
325                 }
326
327                 if (kv != null) {
328                         return kv.getValue();
329                 } else {
330                         return null;
331                 }
332         }
333
334         public synchronized IoTString getCommittedAtomic(IoTString key) {
335                 KeyValue kv = committedKeyValueTable.get(key);
336
337                 if (arbitratorTable.get(key) == null) {
338                         throw new Error("Key not Found.");
339                 }
340
341                 // Make sure new key value pair matches the current arbitrator
342                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
343                         // TODO: Maybe not throw en error
344                         throw new Error("Not all Key Values Match Arbitrator.");
345                 }
346
347                 if (kv != null) {
348                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
349                         return kv.getValue();
350                 } else {
351                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
352                         return null;
353                 }
354         }
355
356         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
357                 if (arbitratorTable.get(key) == null) {
358                         throw new Error("Key not Found.");
359                 }
360
361                 // Make sure new key value pair matches the current arbitrator
362                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
363                         // TODO: Maybe not throw en error
364                         throw new Error("Not all Key Values Match Arbitrator.");
365                 }
366
367                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
368
369                 if (kv == null) {
370                         kv = speculatedKeyValueTable.get(key);
371                 }
372
373                 if (kv == null) {
374                         kv = committedKeyValueTable.get(key);
375                 }
376
377                 if (kv != null) {
378                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
379                         return kv.getValue();
380                 } else {
381                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
382                         return null;
383                 }
384         }
385
386         public synchronized boolean update()  {
387                 try {
388                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
389                         validateAndUpdate(newSlots, false);
390                         sendToServer(null);
391
392
393                         updateLiveTransactionsAndStatus();
394
395                         return true;
396                 } catch (Exception e) {
397                         e.printStackTrace();
398
399                         for (Long m : localCommunicationTable.keySet()) {
400                                 updateFromLocal(m);
401                         }
402                 }
403
404                 return false;
405         }
406
407         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
408                 while (true) {
409                         if (arbitratorTable.get(keyName) != null) {
410                                 // There is already an arbitrator
411                                 return false;
412                         }
413
414                         NewKey newKey = new NewKey(null, keyName, machineId);
415
416                         if (sendToServer(newKey)) {
417                                 // If successfully inserted
418                                 return true;
419                         }
420                 }
421         }
422
423         public synchronized void startTransaction() {
424                 // Create a new transaction, invalidates any old pending transactions.
425                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
426         }
427
428         public synchronized void addKV(IoTString key, IoTString value) {
429
430                 // Make sure it is a valid key
431                 if (arbitratorTable.get(key) == null) {
432                         throw new Error("Key not Found.");
433                 }
434
435                 // Make sure new key value pair matches the current arbitrator
436                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
437                         // TODO: Maybe not throw en error
438                         throw new Error("Not all Key Values Match Arbitrator.");
439                 }
440
441                 // Add the key value to this transaction
442                 KeyValue kv = new KeyValue(key, value);
443                 pendingTransactionBuilder.addKV(kv);
444         }
445
446         public synchronized TransactionStatus commitTransaction() {
447
448                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
449                         // transaction with no updates will have no effect on the system
450                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
451                 }
452
453                 // Set the local transaction sequence number and increment
454                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
455                 localTransactionSequenceNumber++;
456
457                 // Create the transaction status
458                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
459
460                 // Create the new transaction
461                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
462                 newTransaction.setTransactionStatus(transactionStatus);
463
464                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
465                         // Add it to the queue and invalidate the builder for safety
466                         pendingTransactionQueue.add(newTransaction);
467                 } else {
468                         arbitrateOnLocalTransaction(newTransaction);
469                         updateLiveStateFromLocal();
470                 }
471
472                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
473
474                 try {
475                         sendToServer(null);
476                 } catch (ServerException e) {
477
478                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
479                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
480                                 Transaction transaction = iter.next();
481
482                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
483                                         // Already contacted this client so ignore all attempts to contact this client
484                                         // to preserve ordering for arbitrator
485                                         continue;
486                                 }
487
488                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
489
490                                 if (sendReturn.getFirst()) {
491                                         // Failed to contact over local
492                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
493                                 } else {
494                                         // Successful contact or should not contact
495
496                                         if (sendReturn.getSecond()) {
497                                                 // did arbitrate
498                                                 iter.remove();
499                                         }
500                                 }
501                         }
502                 }
503
504                 updateLiveStateFromLocal();
505
506                 return transactionStatus;
507         }
508
509         /**
510          * Get the machine ID for this client
511          */
512         public long getMachineId() {
513                 return localMachineId;
514         }
515
516         /**
517          * Decrement the number of live slots that we currently have
518          */
519         public void decrementLiveCount() {
520                 liveSlotCount--;
521         }
522
523         /**
524          * Recalculate the new resize threshold
525          */
526         private void setResizeThreshold() {
527                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
528                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
529         }
530
531         public long getLocalSequenceNumber() {
532                 return localSequenceNumber;
533         }
534
535
536         boolean lastInsertedNewKey = false;
537
538         private boolean sendToServer(NewKey newKey) throws ServerException {
539
540                 boolean fromRetry = false;
541
542                 try {
543                         if (hadPartialSendToServer) {
544                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
545                                 if (newSlots.length == 0) {
546                                         fromRetry = true;
547                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
548
549                                         if (sendSlotsReturn.getFirst()) {
550                                                 if (newKey != null) {
551                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
552                                                                 newKey = null;
553                                                         }
554                                                 }
555
556                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
557                                                         transaction.resetServerFailure();
558
559                                                         // Update which transactions parts still need to be sent
560                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
561
562                                                         // Add the transaction status to the outstanding list
563                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
564
565                                                         // Update the transaction status
566                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
567
568                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
569                                                         if (transaction.didSendAllParts()) {
570                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
571                                                                 pendingTransactionQueue.remove(transaction);
572                                                         }
573                                                 }
574                                         } else {
575
576                                                 newSlots = sendSlotsReturn.getThird();
577
578                                                 boolean isInserted = false;
579                                                 for (Slot s : newSlots) {
580                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
581                                                                 isInserted = true;
582                                                                 break;
583                                                         }
584                                                 }
585
586                                                 for (Slot s : newSlots) {
587                                                         if (isInserted) {
588                                                                 break;
589                                                         }
590
591                                                         // Process each entry in the slot
592                                                         for (Entry entry : s.getEntries()) {
593
594                                                                 if (entry.getType() == Entry.TypeLastMessage) {
595                                                                         LastMessage lastMessage = (LastMessage)entry;
596                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
597                                                                                 isInserted = true;
598                                                                                 break;
599                                                                         }
600                                                                 }
601                                                         }
602                                                 }
603
604                                                 if (isInserted) {
605                                                         if (newKey != null) {
606                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
607                                                                         newKey = null;
608                                                                 }
609                                                         }
610
611                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
612                                                                 transaction.resetServerFailure();
613
614                                                                 // Update which transactions parts still need to be sent
615                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
616
617                                                                 // Add the transaction status to the outstanding list
618                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
619
620                                                                 // Update the transaction status
621                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
622
623                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
624                                                                 if (transaction.didSendAllParts()) {
625                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
626                                                                         pendingTransactionQueue.remove(transaction);
627                                                                 } else {
628                                                                         transaction.resetServerFailure();
629                                                                         // Set the transaction sequence number back to nothing
630                                                                         if (!transaction.didSendAPartToServer()) {
631                                                                                 transaction.setSequenceNumber(-1);
632                                                                         }
633                                                                 }
634                                                         }
635                                                 }
636                                         }
637
638                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
639                                                 transaction.resetServerFailure();
640                                                 // Set the transaction sequence number back to nothing
641                                                 if (!transaction.didSendAPartToServer()) {
642                                                         transaction.setSequenceNumber(-1);
643                                                 }
644                                         }
645
646                                         if (sendSlotsReturn.getThird().length != 0) {
647                                                 // insert into the local block chain
648                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
649                                         }
650                                         // continue;
651                                 } else {
652                                         boolean isInserted = false;
653                                         for (Slot s : newSlots) {
654                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
655                                                         isInserted = true;
656                                                         break;
657                                                 }
658                                         }
659
660                                         for (Slot s : newSlots) {
661                                                 if (isInserted) {
662                                                         break;
663                                                 }
664
665                                                 // Process each entry in the slot
666                                                 for (Entry entry : s.getEntries()) {
667
668                                                         if (entry.getType() == Entry.TypeLastMessage) {
669                                                                 LastMessage lastMessage = (LastMessage)entry;
670                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
671                                                                         isInserted = true;
672                                                                         break;
673                                                                 }
674                                                         }
675                                                 }
676                                         }
677
678                                         if (isInserted) {
679                                                 if (newKey != null) {
680                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
681                                                                 newKey = null;
682                                                         }
683                                                 }
684
685                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
686                                                         transaction.resetServerFailure();
687
688                                                         // Update which transactions parts still need to be sent
689                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
690
691                                                         // Add the transaction status to the outstanding list
692                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
693
694                                                         // Update the transaction status
695                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
696
697                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
698                                                         if (transaction.didSendAllParts()) {
699                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
700                                                                 pendingTransactionQueue.remove(transaction);
701                                                         } else {
702                                                                 transaction.resetServerFailure();
703                                                                 // Set the transaction sequence number back to nothing
704                                                                 if (!transaction.didSendAPartToServer()) {
705                                                                         transaction.setSequenceNumber(-1);
706                                                                 }
707                                                         }
708                                                 }
709                                         } else {
710                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
711                                                         transaction.resetServerFailure();
712                                                         // Set the transaction sequence number back to nothing
713                                                         if (!transaction.didSendAPartToServer()) {
714                                                                 transaction.setSequenceNumber(-1);
715                                                         }
716                                                 }
717                                         }
718
719                                         // insert into the local block chain
720                                         validateAndUpdate(newSlots, true);
721                                 }
722                         }
723                 } catch (ServerException e) {
724                         throw e;
725                 }
726
727
728
729                 try {
730                         // While we have stuff that needs inserting into the block chain
731                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
732
733                                 fromRetry = false;
734
735                                 if (hadPartialSendToServer) {
736                                         throw new Error("Should Be error free");
737                                 }
738
739
740
741                                 // If there is a new key with same name then end
742                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
743                                         return false;
744                                 }
745
746                                 // Create the slot
747                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
748                                 localSequenceNumber++;
749
750                                 // Try to fill the slot with data
751                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
752                                 boolean needsResize = fillSlotsReturn.getFirst();
753                                 int newSize = fillSlotsReturn.getSecond();
754                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
755
756                                 if (needsResize) {
757                                         // Reset which transaction to send
758                                         for (Transaction transaction : transactionPartsSent.keySet()) {
759                                                 transaction.resetNextPartToSend();
760
761                                                 // Set the transaction sequence number back to nothing
762                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
763                                                         transaction.setSequenceNumber(-1);
764                                                 }
765                                         }
766
767                                         // Clear the sent data since we are trying again
768                                         pendingSendArbitrationEntriesToDelete.clear();
769                                         transactionPartsSent.clear();
770
771                                         // We needed a resize so try again
772                                         fillSlot(slot, true, newKey);
773                                 }
774
775                                 lastSlotAttemptedToSend = slot;
776                                 lastIsNewKey = (newKey != null);
777                                 lastInsertedNewKey = insertedNewKey;
778                                 lastNewSize = newSize;
779                                 lastNewKey = newKey;
780                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
781                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
782
783
784                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
785
786                                 if (sendSlotsReturn.getFirst()) {
787
788                                         // Did insert into the block chain
789
790                                         if (insertedNewKey) {
791                                                 // This slot was what was inserted not a previous slot
792
793                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
794                                                 newKey = null;
795                                         }
796
797                                         // Remove the aborts and commit parts that were sent from the pending to send queue
798                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
799                                                 ArbitrationRound round = iter.next();
800                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
801
802                                                 if (round.isDoneSending()) {
803                                                         // Sent all the parts
804                                                         iter.remove();
805                                                 }
806                                         }
807
808                                         for (Transaction transaction : transactionPartsSent.keySet()) {
809                                                 transaction.resetServerFailure();
810
811                                                 // Update which transactions parts still need to be sent
812                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
813
814                                                 // Add the transaction status to the outstanding list
815                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
816
817                                                 // Update the transaction status
818                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
819
820                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
821                                                 if (transaction.didSendAllParts()) {
822                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
823                                                         pendingTransactionQueue.remove(transaction);
824                                                 }
825                                         }
826                                 } else {
827
828                                         // if (!sendSlotsReturn.getSecond()) {
829                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
830                                         //              transaction.resetServerFailure();
831                                         //      }
832                                         // } else {
833                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
834                                         //              transaction.resetServerFailure();
835
836                                         //              // Update which transactions parts still need to be sent
837                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
838
839                                         //              // Add the transaction status to the outstanding list
840                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
841
842                                         //              // Update the transaction status
843                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
844
845                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
846                                         //              if (transaction.didSendAllParts()) {
847                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
848                                         //                      pendingTransactionQueue.remove(transaction);
849
850                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
851                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
852                                         //                      }
853                                         //              }
854                                         //      }
855                                         // }
856
857                                         // Reset which transaction to send
858                                         for (Transaction transaction : transactionPartsSent.keySet()) {
859                                                 transaction.resetNextPartToSend();
860                                                 // transaction.resetNextPartToSend();
861
862                                                 // Set the transaction sequence number back to nothing
863                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
864                                                         transaction.setSequenceNumber(-1);
865                                                 }
866                                         }
867                                 }
868
869                                 // Clear the sent data in preparation for next send
870                                 pendingSendArbitrationEntriesToDelete.clear();
871                                 transactionPartsSent.clear();
872
873                                 if (sendSlotsReturn.getThird().length != 0) {
874                                         // insert into the local block chain
875                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
876                                 }
877                         }
878
879                 } catch (ServerException e) {
880
881                         if (e.getType() != ServerException.TypeInputTimeout) {
882                                 // e.printStackTrace();
883
884                                 // Nothing was able to be sent to the server so just clear these data structures
885                                 for (Transaction transaction : transactionPartsSent.keySet()) {
886                                         transaction.resetNextPartToSend();
887
888                                         // Set the transaction sequence number back to nothing
889                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
890                                                 transaction.setSequenceNumber(-1);
891                                         }
892                                 }
893                         } else {
894                                 // There was a partial send to the server
895                                 hadPartialSendToServer = true;
896
897
898                                 // if (!fromRetry) {
899                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
900                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
901                                 // }
902
903                                 // Nothing was able to be sent to the server so just clear these data structures
904                                 for (Transaction transaction : transactionPartsSent.keySet()) {
905                                         transaction.resetNextPartToSend();
906                                         transaction.setServerFailure();
907                                 }
908                         }
909
910                         pendingSendArbitrationEntriesToDelete.clear();
911                         transactionPartsSent.clear();
912
913                         throw e;
914                 }
915
916                 return newKey == null;
917         }
918
919         private synchronized boolean updateFromLocal(long machineId) {
920                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
921                 if (localCommunicationInformation == null) {
922                         // Cant talk to that device locally so do nothing
923                         return false;
924                 }
925
926                 // Get the size of the send data
927                 //int sendDataSize = Integer.BYTES + Long.BYTES;
928                 int sendDataSize = Integer.SIZE/8 + Long.SIZE/8;
929
930                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
931                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
932                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
933                 }
934
935                 byte[] sendData = new byte[sendDataSize];
936                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
937
938                 // Encode the data
939                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
940                 bbEncode.putInt(0);
941
942                 // Send by local
943                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
944                 localSequenceNumber++;
945
946                 if (returnData == null) {
947                         // Could not contact server
948                         return false;
949                 }
950
951                 // Decode the data
952                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
953                 int numberOfEntries = bbDecode.getInt();
954
955                 for (int i = 0; i < numberOfEntries; i++) {
956                         byte type = bbDecode.get();
957                         if (type == Entry.TypeAbort) {
958                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
959                                 processEntry(abort);
960                         } else if (type == Entry.TypeCommitPart) {
961                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
962                                 processEntry(commitPart);
963                         }
964                 }
965
966                 updateLiveStateFromLocal();
967
968                 return true;
969         }
970
971         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
972
973                 // Get the devices local communications
974                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
975
976                 if (localCommunicationInformation == null) {
977                         // Cant talk to that device locally so do nothing
978                         return new Pair<Boolean, Boolean>(true, false);
979                 }
980
981                 // Get the size of the send data
982                 //int sendDataSize = Integer.BYTES + Long.BYTES;
983                 int sendDataSize = Integer.SIZE/8 + Long.SIZE/8;
984                 for (TransactionPart part : transaction.getParts().values()) {
985                         sendDataSize += part.getSize();
986                 }
987
988                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
989                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
990                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
991                 }
992
993                 // Make the send data size
994                 byte[] sendData = new byte[sendDataSize];
995                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
996
997                 // Encode the data
998                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
999                 bbEncode.putInt(transaction.getParts().size());
1000                 for (TransactionPart part : transaction.getParts().values()) {
1001                         part.encode(bbEncode);
1002                 }
1003
1004
1005                 // Send by local
1006                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
1007                 localSequenceNumber++;
1008
1009                 if (returnData == null) {
1010                         // Could not contact server
1011                         return new Pair<Boolean, Boolean>(true, false);
1012                 }
1013
1014                 // Decode the data
1015                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
1016                 boolean didCommit = bbDecode.get() == 1;
1017                 boolean couldArbitrate = bbDecode.get() == 1;
1018                 int numberOfEntries = bbDecode.getInt();
1019                 boolean foundAbort = false;
1020
1021                 for (int i = 0; i < numberOfEntries; i++) {
1022                         byte type = bbDecode.get();
1023                         if (type == Entry.TypeAbort) {
1024                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
1025
1026                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1027                                         foundAbort = true;
1028                                 }
1029
1030                                 processEntry(abort);
1031                         } else if (type == Entry.TypeCommitPart) {
1032                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
1033                                 processEntry(commitPart);
1034                         }
1035                 }
1036
1037                 updateLiveStateFromLocal();
1038
1039                 if (couldArbitrate) {
1040                         TransactionStatus status =  transaction.getTransactionStatus();
1041                         if (didCommit) {
1042                                 status.setStatus(TransactionStatus.StatusCommitted);
1043                         } else {
1044                                 status.setStatus(TransactionStatus.StatusAborted);
1045                         }
1046                 } else {
1047                         TransactionStatus status =  transaction.getTransactionStatus();
1048                         if (foundAbort) {
1049                                 status.setStatus(TransactionStatus.StatusAborted);
1050                         } else {
1051                                 status.setStatus(TransactionStatus.StatusCommitted);
1052                         }
1053                 }
1054
1055                 return new Pair<Boolean, Boolean>(false, true);
1056         }
1057
1058         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1059
1060                 // Decode the data
1061                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1062                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1063                 int numberOfParts = bbDecode.getInt();
1064
1065                 // If we did commit a transaction or not
1066                 boolean didCommit = false;
1067                 boolean couldArbitrate = false;
1068
1069                 if (numberOfParts != 0) {
1070
1071                         // decode the transaction
1072                         Transaction transaction = new Transaction();
1073                         for (int i = 0; i < numberOfParts; i++) {
1074                                 bbDecode.get();
1075                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1076                                 transaction.addPartDecode(newPart);
1077                         }
1078
1079                         // Arbitrate on transaction and pull relevant return data
1080                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1081                         couldArbitrate = localArbitrateReturn.getFirst();
1082                         didCommit = localArbitrateReturn.getSecond();
1083
1084                         updateLiveStateFromLocal();
1085
1086                         // Transaction was sent to the server so keep track of it to prevent double commit
1087                         if (transaction.getSequenceNumber() != -1) {
1088                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1089                         }
1090                 }
1091
1092                 // The data to send back
1093                 int returnDataSize = 0;
1094                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1095
1096                 // Get the aborts to send back
1097                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1098                 Collections.sort(abortLocalSequenceNumbers);
1099                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1100                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1101                                 continue;
1102                         }
1103
1104                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1105                         unseenArbitrations.add(abort);
1106                         returnDataSize += abort.getSize();
1107                 }
1108
1109                 // Get the commits to send back
1110                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1111                 if (commitForClientTable != null) {
1112                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1113                         Collections.sort(commitLocalSequenceNumbers);
1114
1115                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1116                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1117
1118                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1119                                         continue;
1120                                 }
1121
1122                                 unseenArbitrations.addAll(commit.getParts().values());
1123
1124                                 for (CommitPart commitPart : commit.getParts().values()) {
1125                                         returnDataSize += commitPart.getSize();
1126                                 }
1127                         }
1128                 }
1129
1130                 // Number of arbitration entries to decode
1131                 //returnDataSize += 2 * Integer.BYTES;
1132                 returnDataSize += 2 * Integer.SIZE/8;
1133
1134                 // Boolean of did commit or not
1135                 if (numberOfParts != 0) {
1136                         //returnDataSize += Byte.BYTES;
1137                         returnDataSize += Byte.SIZE/8;
1138                 }
1139
1140                 // Data to send Back
1141                 byte[] returnData = new byte[returnDataSize];
1142                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1143
1144                 if (numberOfParts != 0) {
1145                         if (didCommit) {
1146                                 bbEncode.put((byte)1);
1147                         } else {
1148                                 bbEncode.put((byte)0);
1149                         }
1150                         if (couldArbitrate) {
1151                                 bbEncode.put((byte)1);
1152                         } else {
1153                                 bbEncode.put((byte)0);
1154                         }
1155                 }
1156
1157                 bbEncode.putInt(unseenArbitrations.size());
1158                 for (Entry entry : unseenArbitrations) {
1159                         entry.encode(bbEncode);
1160                 }
1161
1162
1163                 localSequenceNumber++;
1164                 return returnData;
1165         }
1166
1167         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1168
1169                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1170                 attemptedToSendToServer = true;
1171
1172                 boolean inserted = false;
1173                 boolean lastTryInserted = false;
1174
1175                 Slot[] array = cloud.putSlot(slot, newSize);
1176                 if (array == null) {
1177                         array = new Slot[] {slot};
1178                         rejectedSlotList.clear();
1179                         inserted = true;
1180                 }       else {
1181                         if (array.length == 0) {
1182                                 throw new Error("Server Error: Did not send any slots");
1183                         }
1184
1185                         // if (attemptedToSendToServerTmp) {
1186                         if (hadPartialSendToServer) {
1187
1188                                 boolean isInserted = false;
1189                                 for (Slot s : array) {
1190                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1191                                                 isInserted = true;
1192                                                 break;
1193                                         }
1194                                 }
1195
1196                                 for (Slot s : array) {
1197                                         if (isInserted) {
1198                                                 break;
1199                                         }
1200
1201                                         // Process each entry in the slot
1202                                         for (Entry entry : s.getEntries()) {
1203
1204                                                 if (entry.getType() == Entry.TypeLastMessage) {
1205                                                         LastMessage lastMessage = (LastMessage)entry;
1206
1207                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1208                                                                 isInserted = true;
1209                                                                 break;
1210                                                         }
1211                                                 }
1212                                         }
1213                                 }
1214
1215                                 if (!isInserted) {
1216                                         rejectedSlotList.add(slot.getSequenceNumber());
1217                                         lastTryInserted = false;
1218                                 } else {
1219                                         lastTryInserted = true;
1220                                 }
1221                         } else {
1222                                 rejectedSlotList.add(slot.getSequenceNumber());
1223                                 lastTryInserted = false;
1224                         }
1225                 }
1226
1227                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1228         }
1229
1230         /**
1231          * Returns false if a resize was needed
1232          */
1233         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1234
1235
1236                 int newSize = 0;
1237                 if (liveSlotCount > bufferResizeThreshold) {
1238                         resize = true; //Resize is forced
1239
1240                 }
1241
1242                 if (resize) {
1243                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1244                         TableStatus status = new TableStatus(slot, newSize);
1245                         slot.addEntry(status);
1246                 }
1247
1248                 // Fill with rejected slots first before doing anything else
1249                 doRejectedMessages(slot);
1250
1251                 // Do mandatory rescue of entries
1252                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1253
1254                 // Extract working variables
1255                 boolean needsResize = mandatoryRescueReturn.getFirst();
1256                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1257                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1258
1259                 if (needsResize && !resize) {
1260                         // We need to resize but we are not resizing so return false
1261                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1262                 }
1263
1264                 boolean inserted = false;
1265                 if (newKeyEntry != null) {
1266                         newKeyEntry.setSlot(slot);
1267                         if (slot.hasSpace(newKeyEntry)) {
1268
1269                                 slot.addEntry(newKeyEntry);
1270                                 inserted = true;
1271                         }
1272                 }
1273
1274                 // Clear the transactions, aborts and commits that were sent previously
1275                 transactionPartsSent.clear();
1276                 pendingSendArbitrationEntriesToDelete.clear();
1277
1278                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1279                         boolean isFull = false;
1280                         round.generateParts();
1281                         List<Entry> parts = round.getParts();
1282
1283                         // Insert pending arbitration data
1284                         for (Entry arbitrationData : parts) {
1285
1286                                 // If it is an abort then we need to set some information
1287                                 if (arbitrationData instanceof Abort) {
1288                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1289                                 }
1290
1291                                 if (!slot.hasSpace(arbitrationData)) {
1292                                         // No space so cant do anything else with these data entries
1293                                         isFull = true;
1294                                         break;
1295                                 }
1296
1297                                 // Add to this current slot and add it to entries to delete
1298                                 slot.addEntry(arbitrationData);
1299                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1300                         }
1301
1302                         if (isFull) {
1303                                 break;
1304                         }
1305                 }
1306
1307                 if (pendingTransactionQueue.size() > 0) {
1308
1309                         Transaction transaction = pendingTransactionQueue.get(0);
1310
1311                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1312                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1313                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1314                         // }
1315
1316                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1317                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1318                         }
1319
1320
1321                         while (true) {
1322                                 TransactionPart part = transaction.getNextPartToSend();
1323
1324                                 if (part == null) {
1325                                         // Ran out of parts to send for this transaction so move on
1326                                         break;
1327                                 }
1328
1329                                 if (slot.hasSpace(part)) {
1330                                         slot.addEntry(part);
1331                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1332                                         if (partsSent == null) {
1333                                                 partsSent = new ArrayList<Integer>();
1334                                                 transactionPartsSent.put(transaction, partsSent);
1335                                         }
1336                                         partsSent.add(part.getPartNumber());
1337                                         transactionPartsSent.put(transaction, partsSent);
1338                                 } else {
1339                                         break;
1340                                 }
1341                         }
1342                 }
1343
1344                 // Fill the remainder of the slot with rescue data
1345                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1346
1347                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1348         }
1349
1350         private void doRejectedMessages(Slot s) {
1351                 if (! rejectedSlotList.isEmpty()) {
1352                         /* TODO: We should avoid generating a rejected message entry if
1353                          * there is already a sufficient entry in the queue (e.g.,
1354                          * equalsto value of true and same sequence number).  */
1355
1356                         long old_seqn = rejectedSlotList.firstElement();
1357                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1358                                 long new_seqn = rejectedSlotList.lastElement();
1359                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1360                                 s.addEntry(rm);
1361                         } else {
1362                                 long prev_seqn = -1;
1363                                 int i = 0;
1364                                 /* Go through list of missing messages */
1365                                 for (; i < rejectedSlotList.size(); i++) {
1366                                         long curr_seqn = rejectedSlotList.get(i);
1367                                         Slot s_msg = buffer.getSlot(curr_seqn);
1368                                         if (s_msg != null)
1369                                                 break;
1370                                         prev_seqn = curr_seqn;
1371                                 }
1372                                 /* Generate rejected message entry for missing messages */
1373                                 if (prev_seqn != -1) {
1374                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1375                                         s.addEntry(rm);
1376                                 }
1377                                 /* Generate rejected message entries for present messages */
1378                                 for (; i < rejectedSlotList.size(); i++) {
1379                                         long curr_seqn = rejectedSlotList.get(i);
1380                                         Slot s_msg = buffer.getSlot(curr_seqn);
1381                                         long machineid = s_msg.getMachineID();
1382                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1383                                         s.addEntry(rm);
1384                                 }
1385                         }
1386                 }
1387         }
1388
1389         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1390                 long newestSequenceNumber = buffer.getNewestSeqNum();
1391                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1392                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1393                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1394                 }
1395
1396                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1397                 boolean seenLiveSlot = false;
1398                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1399                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1400
1401
1402                 // Mandatory Rescue
1403                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1404                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1405                         // Push slot number forward
1406                         if (! seenLiveSlot) {
1407                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1408                         }
1409                         
1410                         if (previousSlot == null || !previousSlot.isLive()) {
1411                                 continue;
1412                         }
1413
1414                         // We have seen a live slot
1415                         seenLiveSlot = true;
1416
1417                         // Get all the live entries for a slot
1418                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1419
1420                         // Iterate over all the live entries and try to rescue them
1421                         for (Entry liveEntry : liveEntries) {
1422                                 if (slot.hasSpace(liveEntry)) {
1423
1424                                         // Enough space to rescue the entry
1425                                         slot.addEntry(liveEntry);
1426                                 } else if (currentSequenceNumber == firstIfFull) {
1427                                         //if there's no space but the entry is about to fall off the queue
1428                                         System.out.println("B"); //?
1429                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1430
1431                                 }
1432                         }
1433                 }
1434
1435                 // Did not resize
1436                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1437         }
1438
1439         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1440                 /* now go through live entries from least to greatest sequence number until
1441                  * either all live slots added, or the slot doesn't have enough room
1442                  * for SKIP_THRESHOLD consecutive entries*/
1443                 int skipcount = 0;
1444                 long newestseqnum = buffer.getNewestSeqNum();
1445                 search:
1446                 for (; seqn <= newestseqnum; seqn++) {
1447                         Slot prevslot = buffer.getSlot(seqn);
1448                         //Push slot number forward
1449                         if (!seenliveslot)
1450                                 oldestLiveSlotSequenceNumver = seqn;
1451
1452                         if (!prevslot.isLive())
1453                                 continue;
1454                         seenliveslot = true;
1455                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1456                         for (Entry liveentry : liveentries) {
1457                                 if (s.hasSpace(liveentry))
1458                                         s.addEntry(liveentry);
1459                                 else {
1460                                         skipcount++;
1461                                         if (skipcount > SKIP_THRESHOLD)
1462                                                 break search;
1463                                 }
1464                         }
1465                 }
1466         }
1467
1468         /**
1469          * Checks for malicious activity and updates the local copy of the block chain.
1470          */
1471         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1472
1473                 // The cloud communication layer has checked slot HMACs already before decoding
1474                 if (newSlots.length == 0) {
1475                         return;
1476                 }
1477
1478                 // Make sure all slots are newer than the last largest slot this client has seen
1479                 long firstSeqNum = newSlots[0].getSequenceNumber();
1480                 if (firstSeqNum <= sequenceNumber) {
1481                         throw new Error("Server Error: Sent older slots!");
1482                 }
1483
1484                 // Create an object that can access both new slots and slots in our local chain
1485                 // without committing slots to our local chain
1486                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1487
1488                 // Check that the HMAC chain is not broken
1489                 checkHMACChain(indexer, newSlots);
1490
1491                 // Set to keep track of messages from clients
1492                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1493
1494                 // Process each slots data
1495                 for (Slot slot : newSlots) {
1496                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1497
1498                         updateExpectedSize();
1499                 }
1500
1501                 // If there is a gap, check to see if the server sent us everything.
1502                 if (firstSeqNum != (sequenceNumber + 1)) {
1503
1504                         // Check the size of the slots that were sent down by the server.
1505                         // Can only check the size if there was a gap
1506                         checkNumSlots(newSlots.length);
1507
1508                         // Since there was a gap every machine must have pushed a slot or must have
1509                         // a last message message.  If not then the server is hiding slots
1510                         if (!machineSet.isEmpty()) {
1511                                 throw new Error("Missing record for machines: " + machineSet);
1512                         }
1513                 }
1514
1515                 // Update the size of our local block chain.
1516                 commitNewMaxSize();
1517
1518                 // Commit new to slots to the local block chain.
1519                 for (Slot slot : newSlots) {
1520
1521                         // Insert this slot into our local block chain copy.
1522                         buffer.putSlot(slot);
1523
1524                         // Keep track of how many slots are currently live (have live data in them).
1525                         liveSlotCount++;
1526                 }
1527
1528                 // Get the sequence number of the latest slot in the system
1529                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1530
1531                 updateLiveStateFromServer();
1532
1533                 // No Need to remember after we pulled from the server
1534                 offlineTransactionsCommittedAndAtServer.clear();
1535
1536                 // This is invalidated now
1537                 hadPartialSendToServer = false;
1538         }
1539
1540         private void updateLiveStateFromServer() {
1541                 // Process the new transaction parts
1542                 processNewTransactionParts();
1543
1544                 // Do arbitration on new transactions that were received
1545                 arbitrateFromServer();
1546
1547                 // Update all the committed keys
1548                 boolean didCommitOrSpeculate = updateCommittedTable();
1549
1550                 // Delete the transactions that are now dead
1551                 updateLiveTransactionsAndStatus();
1552
1553                 // Do speculations
1554                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1555                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1556         }
1557
1558         private void updateLiveStateFromLocal() {
1559                 // Update all the committed keys
1560                 boolean didCommitOrSpeculate = updateCommittedTable();
1561
1562                 // Delete the transactions that are now dead
1563                 updateLiveTransactionsAndStatus();
1564
1565                 // Do speculations
1566                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1567                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1568         }
1569
1570         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1571                 // if (didFindTableStatus) {
1572                 // return;
1573                 // }
1574                 long prevslots = firstSequenceNumber;
1575
1576
1577                 if (didFindTableStatus) {
1578                         // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
1579                         // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1580
1581                 } else {
1582                         expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1583                         // System.out.println("Here: " + expectedsize);
1584                 }
1585
1586                 // System.out.println(numberOfSlots);
1587
1588                 didFindTableStatus = true;
1589                 currMaxSize = numberOfSlots;
1590         }
1591
1592         private void updateExpectedSize() {
1593                 expectedsize++;
1594
1595                 if (expectedsize > currMaxSize) {
1596                         expectedsize = currMaxSize;
1597                 }
1598         }
1599
1600
1601         /**
1602          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1603          * This is only called when we have a gap between the slots that we have locally and the slots
1604          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1605          * status message
1606          */
1607         private void checkNumSlots(int numberOfSlots) {
1608                 if (numberOfSlots != expectedsize) {
1609                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1610                 }
1611         }
1612
1613         private void updateCurrMaxSize(int newmaxsize) {
1614                 currMaxSize = newmaxsize;
1615         }
1616
1617
1618         /**
1619          * Update the size of of the local buffer if it is needed.
1620          */
1621         private void commitNewMaxSize() {
1622                 didFindTableStatus = false;
1623
1624                 // Resize the local slot buffer
1625                 if (numberOfSlots != currMaxSize) {
1626                         buffer.resize((int)currMaxSize);
1627                 }
1628
1629                 // Change the number of local slots to the new size
1630                 numberOfSlots = (int)currMaxSize;
1631
1632
1633                 // Recalculate the resize threshold since the size of the local buffer has changed
1634                 setResizeThreshold();
1635         }
1636
1637         /**
1638          * Process the new transaction parts from this latest round of slots received from the server
1639          */
1640         private void processNewTransactionParts() {
1641
1642                 if (newTransactionParts.size() == 0) {
1643                         // Nothing new to process
1644                         return;
1645                 }
1646
1647                 // Iterate through all the machine Ids that we received new parts for
1648                 for (Long machineId : newTransactionParts.keySet()) {
1649                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1650
1651                         // Iterate through all the parts for that machine Id
1652                         for (Pair<Long, Integer> partId : parts.keySet()) {
1653                                 TransactionPart part = parts.get(partId);
1654
1655                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1656                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1657                                         // Set dead the transaction part
1658                                         part.setDead();
1659                                         continue;
1660                                 }
1661
1662                                 // Get the transaction object for that sequence number
1663                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1664
1665                                 if (transaction == null) {
1666                                         // This is a new transaction that we dont have so make a new one
1667                                         transaction = new Transaction();
1668
1669                                         // Insert this new transaction into the live tables
1670                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1671                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1672                                 }
1673
1674                                 // Add that part to the transaction
1675                                 transaction.addPartDecode(part);
1676                         }
1677                 }
1678
1679                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1680                 newTransactionParts.clear();
1681         }
1682
1683
1684         private long lastSeqNumArbOn = 0;
1685
1686         private void arbitrateFromServer() {
1687
1688                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1689                         // Nothing to arbitrate on so move on
1690                         return;
1691                 }
1692
1693                 // Get the transaction sequence numbers and sort from oldest to newest
1694                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1695                 Collections.sort(transactionSequenceNumbers);
1696
1697                 // Collection of key value pairs that are
1698                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1699
1700                 // The last transaction arbitrated on
1701                 long lastTransactionCommitted = -1;
1702                 Set<Abort> generatedAborts = new HashSet<Abort>();
1703
1704                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1705                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1706
1707
1708
1709                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1710                         if (transaction.getArbitrator() != localMachineId) {
1711                                 continue;
1712                         }
1713
1714                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1715                                 continue;
1716                         }
1717
1718                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1719                                 // We have seen this already locally so dont commit again
1720                                 continue;
1721                         }
1722
1723
1724                         if (!transaction.isComplete()) {
1725                                 // Will arbitrate in incorrect order if we continue so just break
1726                                 // Most likely this
1727                                 break;
1728                         }
1729
1730
1731                         // update the largest transaction seen by arbitrator from server
1732                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1733                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1734                         } else {
1735                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1736                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1737                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1738                                 }
1739                         }
1740
1741                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1742                                 // Guard evaluated as true
1743
1744                                 // Update the local changes so we can make the commit
1745                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1746                                         speculativeTableTmp.put(kv.getKey(), kv);
1747                                 }
1748
1749                                 // Update what the last transaction committed was for use in batch commit
1750                                 lastTransactionCommitted = transactionSequenceNumber;
1751                         } else {
1752                                 // Guard evaluated was false so create abort
1753
1754                                 // Create the abort
1755                                 Abort newAbort = new Abort(null,
1756                                                 transaction.getClientLocalSequenceNumber(),
1757                                                 transaction.getSequenceNumber(),
1758                                                 transaction.getMachineId(),
1759                                                 transaction.getArbitrator(),
1760                                                 localArbitrationSequenceNumber);
1761                                 localArbitrationSequenceNumber++;
1762
1763                                 generatedAborts.add(newAbort);
1764
1765                                 // Insert the abort so we can process
1766                                 processEntry(newAbort);
1767                         }
1768
1769                         lastSeqNumArbOn = transactionSequenceNumber;
1770
1771                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1772                 }
1773
1774                 Commit newCommit = null;
1775
1776                 // If there is something to commit
1777                 if (speculativeTableTmp.size() != 0) {
1778
1779                         // Create the commit and increment the commit sequence number
1780                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1781                         localArbitrationSequenceNumber++;
1782
1783                         // Add all the new keys to the commit
1784                         for (KeyValue kv : speculativeTableTmp.values()) {
1785                                 newCommit.addKV(kv);
1786                         }
1787
1788                         // create the commit parts
1789                         newCommit.createCommitParts();
1790
1791                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1792
1793                         // Insert the commit so we can process it
1794                         for (CommitPart commitPart : newCommit.getParts().values()) {
1795                                 processEntry(commitPart);
1796                         }
1797                 }
1798
1799                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1800                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1801                         pendingSendArbitrationRounds.add(arbitrationRound);
1802
1803                         if (compactArbitrationData()) {
1804                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1805                                 if (newArbitrationRound.getCommit() != null) {
1806                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1807                                                 processEntry(commitPart);
1808                                         }
1809                                 }
1810                         }
1811                 }
1812         }
1813
1814         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1815
1816                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1817                 if (transaction.getArbitrator() != localMachineId) {
1818                         return new Pair<Boolean, Boolean>(false, false);
1819                 }
1820
1821                 if (!transaction.isComplete()) {
1822                         // Will arbitrate in incorrect order if we continue so just break
1823                         // Most likely this
1824                         return new Pair<Boolean, Boolean>(false, false);
1825                 }
1826
1827                 if (transaction.getMachineId() != localMachineId) {
1828                         // dont do this check for local transactions
1829                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1830                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1831                                         // We've have already seen this from the server
1832                                         return new Pair<Boolean, Boolean>(false, false);
1833                                 }
1834                         }
1835                 }
1836
1837                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1838                         // Guard evaluated as true
1839
1840                         // Create the commit and increment the commit sequence number
1841                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1842                         localArbitrationSequenceNumber++;
1843
1844                         // Update the local changes so we can make the commit
1845                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1846                                 newCommit.addKV(kv);
1847                         }
1848
1849                         // create the commit parts
1850                         newCommit.createCommitParts();
1851
1852                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1853                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1854                         pendingSendArbitrationRounds.add(arbitrationRound);
1855
1856                         if (compactArbitrationData()) {
1857                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1858                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1859                                         processEntry(commitPart);
1860                                 }
1861                         } else {
1862                                 // Insert the commit so we can process it
1863                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1864                                         processEntry(commitPart);
1865                                 }
1866                         }
1867
1868                         if (transaction.getMachineId() == localMachineId) {
1869                                 TransactionStatus status = transaction.getTransactionStatus();
1870                                 if (status != null) {
1871                                         status.setStatus(TransactionStatus.StatusCommitted);
1872                                 }
1873                         }
1874
1875                         updateLiveStateFromLocal();
1876                         return new Pair<Boolean, Boolean>(true, true);
1877                 } else {
1878
1879                         if (transaction.getMachineId() == localMachineId) {
1880                                 // For locally created messages update the status
1881
1882                                 // Guard evaluated was false so create abort
1883                                 TransactionStatus status = transaction.getTransactionStatus();
1884                                 if (status != null) {
1885                                         status.setStatus(TransactionStatus.StatusAborted);
1886                                 }
1887                         } else {
1888                                 Set addAbortSet = new HashSet<Abort>();
1889
1890
1891                                 // Create the abort
1892                                 Abort newAbort = new Abort(null,
1893                                                 transaction.getClientLocalSequenceNumber(),
1894                                                 -1,
1895                                                 transaction.getMachineId(),
1896                                                 transaction.getArbitrator(),
1897                                                 localArbitrationSequenceNumber);
1898                                 localArbitrationSequenceNumber++;
1899
1900                                 addAbortSet.add(newAbort);
1901
1902
1903                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1904                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1905                                 pendingSendArbitrationRounds.add(arbitrationRound);
1906
1907                                 if (compactArbitrationData()) {
1908                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1909                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1910                                                 processEntry(commitPart);
1911                                         }
1912                                 }
1913                         }
1914
1915                         updateLiveStateFromLocal();
1916                         return new Pair<Boolean, Boolean>(true, false);
1917                 }
1918         }
1919
1920         /**
1921          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1922          */
1923         private boolean compactArbitrationData() {
1924
1925                 if (pendingSendArbitrationRounds.size() < 2) {
1926                         // Nothing to compact so do nothing
1927                         return false;
1928                 }
1929
1930                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1931                 if (lastRound.didSendPart()) {
1932                         return false;
1933                 }
1934
1935                 boolean hadCommit = (lastRound.getCommit() == null);
1936                 boolean gotNewCommit = false;
1937
1938                 int numberToDelete = 1;
1939                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1940                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1941
1942                         if (round.isFull() || round.didSendPart()) {
1943                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1944                                 break;
1945                         }
1946
1947                         if (round.getCommit() == null) {
1948
1949                                 // Try compacting aborts only
1950                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1951                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1952                                         // Cant compact since it would be too large
1953                                         break;
1954                                 }
1955                                 lastRound.addAborts(round.getAborts());
1956                         } else {
1957
1958                                 // Create a new larger commit
1959                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1960                                 localArbitrationSequenceNumber++;
1961
1962                                 // Create the commit parts so that we can count them
1963                                 newCommit.createCommitParts();
1964
1965                                 // Calculate the new size of the parts
1966                                 int newSize = newCommit.getNumberOfParts();
1967                                 newSize += lastRound.getAbortsCount();
1968                                 newSize += round.getAbortsCount();
1969
1970                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1971                                         // Cant compact since it would be too large
1972                                         break;
1973                                 }
1974
1975                                 // Set the new compacted part
1976                                 lastRound.setCommit(newCommit);
1977                                 lastRound.addAborts(round.getAborts());
1978                                 gotNewCommit = true;
1979                         }
1980
1981                         numberToDelete++;
1982                 }
1983
1984                 if (numberToDelete != 1) {
1985                         // If there is a compaction
1986
1987                         // Delete the previous pieces that are now in the new compacted piece
1988                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1989                                 pendingSendArbitrationRounds.clear();
1990                         } else {
1991                                 for (int i = 0; i < numberToDelete; i++) {
1992                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1993                                 }
1994                         }
1995
1996                         // Add the new compacted into the pending to send list
1997                         pendingSendArbitrationRounds.add(lastRound);
1998
1999                         // Should reinsert into the commit processor
2000                         if (hadCommit && gotNewCommit) {
2001                                 return true;
2002                         }
2003                 }
2004
2005                 return false;
2006         }
2007         // private boolean compactArbitrationData() {
2008         //      return false;
2009         // }
2010
2011         /**
2012          * Update all the commits and the committed tables, sets dead the dead transactions
2013          */
2014         private boolean updateCommittedTable() {
2015
2016                 if (newCommitParts.size() == 0) {
2017                         // Nothing new to process
2018                         return false;
2019                 }
2020
2021                 // Iterate through all the machine Ids that we received new parts for
2022                 for (Long machineId : newCommitParts.keySet()) {
2023                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
2024
2025                         // Iterate through all the parts for that machine Id
2026                         for (Pair<Long, Integer> partId : parts.keySet()) {
2027                                 CommitPart part = parts.get(partId);
2028
2029                                 // Get the transaction object for that sequence number
2030                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2031
2032                                 if (commitForClientTable == null) {
2033                                         // This is the first commit from this device
2034                                         commitForClientTable = new HashMap<Long, Commit>();
2035                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2036                                 }
2037
2038                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2039
2040                                 if (commit == null) {
2041                                         // This is a new commit that we dont have so make a new one
2042                                         commit = new Commit();
2043
2044                                         // Insert this new commit into the live tables
2045                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2046                                 }
2047
2048                                 // Add that part to the commit
2049                                 commit.addPartDecode(part);
2050                         }
2051                 }
2052
2053                 // Clear all the new commits parts in preparation for the next time the server sends slots
2054                 newCommitParts.clear();
2055
2056                 // If we process a new commit keep track of it for future use
2057                 boolean didProcessANewCommit = false;
2058
2059                 // Process the commits one by one
2060                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2061
2062                         // Get all the commits for a specific arbitrator
2063                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2064
2065                         // Sort the commits in order
2066                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2067                         Collections.sort(commitSequenceNumbers);
2068
2069                         // Get the last commit seen from this arbitrator
2070                         long lastCommitSeenSequenceNumber = -1;
2071                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2072                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2073                         }
2074
2075                         // Go through each new commit one by one
2076                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2077                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2078                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2079
2080                                 // Special processing if a commit is not complete
2081                                 if (!commit.isComplete()) {
2082                                         if (i == (commitSequenceNumbers.size() - 1)) {
2083                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2084                                                 break;
2085                                         } else {
2086                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2087                                                 // Delete it and move on
2088                                                 commit.setDead();
2089                                                 commitForClientTable.remove(commit.getSequenceNumber());
2090                                                 continue;
2091                                         }
2092                                 }
2093
2094                                 // Update the last transaction that was updated if we can
2095                                 if (commit.getTransactionSequenceNumber() != -1) {
2096                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2097
2098                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2099                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2100                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2101                                         }
2102                                 }
2103
2104                                 // Update the last arbitration data that we have seen so far
2105                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2106
2107                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2108                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2109                                                 // Is larger
2110                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2111                                         }
2112                                 } else {
2113                                         // Never seen any data from this arbitrator so record the first one
2114                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2115                                 }
2116
2117                                 // We have already seen this commit before so need to do the full processing on this commit
2118                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2119
2120                                         // Update the last transaction that was updated if we can
2121                                         if (commit.getTransactionSequenceNumber() != -1) {
2122                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2123
2124                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2125                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2126                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2127                                                 }
2128                                         }
2129
2130                                         continue;
2131                                 }
2132
2133                                 // If we got here then this is a brand new commit and needs full processing
2134
2135                                 // Get what commits should be edited, these are the commits that have live values for their keys
2136                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2137                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2138                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2139                                 }
2140                                 commitsToEdit.remove(null); // remove null since it could be in this set
2141
2142                                 // Update each previous commit that needs to be updated
2143                                 for (Commit previousCommit : commitsToEdit) {
2144
2145                                         // Only bother with live commits (TODO: Maybe remove this check)
2146                                         if (previousCommit.isLive()) {
2147
2148                                                 // Update which keys in the old commits are still live
2149                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2150                                                         previousCommit.invalidateKey(kv.getKey());
2151                                                 }
2152
2153                                                 // if the commit is now dead then remove it
2154                                                 if (!previousCommit.isLive()) {
2155                                                         commitForClientTable.remove(previousCommit);
2156                                                 }
2157                                         }
2158                                 }
2159
2160                                 // Update the last seen sequence number from this arbitrator
2161                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2162                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2163                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2164                                         }
2165                                 } else {
2166                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2167                                 }
2168
2169                                 // We processed a new commit that we havent seen before
2170                                 didProcessANewCommit = true;
2171
2172                                 // Update the committed table of keys and which commit is using which key
2173                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2174                                         committedKeyValueTable.put(kv.getKey(), kv);
2175                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2176                                 }
2177                         }
2178                 }
2179
2180                 return didProcessANewCommit;
2181         }
2182
2183         /**
2184          * Create the speculative table from transactions that are still live and have come from the cloud
2185          */
2186         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2187                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2188                         // There is nothing to speculate on
2189                         return false;
2190                 }
2191
2192                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2193                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2194                 Collections.sort(transactionSequenceNumbersSorted);
2195
2196                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2197
2198
2199                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2200                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2201                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2202
2203                         // Start from scratch
2204                         speculatedKeyValueTable.clear();
2205                         lastTransactionSequenceNumberSpeculatedOn = -1;
2206                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2207
2208                 }
2209
2210                 // Remember the front of the transaction list
2211                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2212
2213                 // Find where to start arbitration from
2214                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2215
2216                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2217                         // Make sure we are not out of bounds
2218                         return false; // did not speculate
2219                 }
2220
2221                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2222                 boolean didSkip = true;
2223
2224                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2225                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2226                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2227
2228                         if (!transaction.isComplete()) {
2229                                 // If there is an incomplete transaction then there is nothing we can do
2230                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2231                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2232                                 didSkip = true;
2233                                 continue;
2234                         }
2235
2236                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2237                                 continue;
2238                         }
2239
2240                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2241
2242                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2243                                 // Guard evaluated to true so update the speculative table
2244                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2245                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2246                                 }
2247                         }
2248                 }
2249
2250                 if (didSkip) {
2251                         // Since there was a skip we need to redo the speculation next time around
2252                         lastTransactionSequenceNumberSpeculatedOn = -1;
2253                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2254                 }
2255
2256                 // We did some speculation
2257                 return true;
2258         }
2259
2260         /**
2261          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2262          */
2263         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2264                 if (pendingTransactionQueue.size() == 0) {
2265                         // There is nothing to speculate on
2266                         return;
2267                 }
2268
2269
2270                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2271                         // need to reset on the pending speculation
2272                         lastPendingTransactionSpeculatedOn = null;
2273                         firstPendingTransaction = pendingTransactionQueue.get(0);
2274                         pendingTransactionSpeculatedKeyValueTable.clear();
2275                 }
2276
2277                 // Find where to start arbitration from
2278                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2279
2280                 if (startIndex >= pendingTransactionQueue.size()) {
2281                         // Make sure we are not out of bounds
2282                         return;
2283                 }
2284
2285                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2286                         Transaction transaction = pendingTransactionQueue.get(i);
2287
2288                         lastPendingTransactionSpeculatedOn = transaction;
2289
2290                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2291                                 // Guard evaluated to true so update the speculative table
2292                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2293                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2294                                 }
2295                         }
2296                 }
2297         }
2298
2299         /**
2300          * Set dead and remove from the live transaction tables the transactions that are dead
2301          */
2302         private void updateLiveTransactionsAndStatus() {
2303
2304                 // Go through each of the transactions
2305                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2306                         Transaction transaction = iter.next().getValue();
2307
2308                         // Check if the transaction is dead
2309                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2310                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2311
2312                                 // Set dead the transaction
2313                                 transaction.setDead();
2314
2315                                 // Remove the transaction from the live table
2316                                 iter.remove();
2317                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2318                         }
2319                 }
2320
2321                 // Go through each of the transactions
2322                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2323                         TransactionStatus status = iter.next().getValue();
2324
2325                         // Check if the transaction is dead
2326                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2327                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2328
2329                                 // Set committed
2330                                 status.setStatus(TransactionStatus.StatusCommitted);
2331
2332                                 // Remove
2333                                 iter.remove();
2334                         }
2335                 }
2336         }
2337
2338         /**
2339          * Process this slot, entry by entry.  Also update the latest message sent by slot
2340          */
2341         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2342
2343                 // Update the last message seen
2344                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2345
2346                 // Process each entry in the slot
2347                 for (Entry entry : slot.getEntries()) {
2348                         switch (entry.getType()) {
2349
2350                                 case Entry.TypeCommitPart:
2351                                         processEntry((CommitPart)entry);
2352                                         break;
2353
2354                                 case Entry.TypeAbort:
2355                                         processEntry((Abort)entry);
2356                                         break;
2357
2358                                 case Entry.TypeTransactionPart:
2359                                         processEntry((TransactionPart)entry);
2360                                         break;
2361
2362                                 case Entry.TypeNewKey:
2363                                         processEntry((NewKey)entry);
2364                                         break;
2365
2366                                 case Entry.TypeLastMessage:
2367                                         processEntry((LastMessage)entry, machineSet);
2368                                         break;
2369
2370                                 case Entry.TypeRejectedMessage:
2371                                         processEntry((RejectedMessage)entry, indexer);
2372                                         break;
2373
2374                                 case Entry.TypeTableStatus:
2375                                         processEntry((TableStatus)entry, slot.getSequenceNumber());
2376                                         break;
2377
2378                                 default:
2379                                         throw new Error("Unrecognized type: " + entry.getType());
2380                         }
2381                 }
2382         }
2383
2384         /**
2385          * Update the last message that was sent for a machine Id
2386          */
2387         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2388                 // Update what the last message received by a machine was
2389                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2390         }
2391
2392         /**
2393          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2394          */
2395         private void processEntry(NewKey entry) {
2396
2397                 // Update the arbitrator table with the new key information
2398                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2399
2400                 // Update what the latest live new key is
2401                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2402                 if (oldNewKey != null) {
2403                         // Delete the old new key messages
2404                         oldNewKey.setDead();
2405                 }
2406         }
2407
2408         /**
2409          * Process new table status entries and set dead the old ones as new ones come in.
2410          * keeps track of the largest and smallest table status seen in this current round
2411          * of updating the local copy of the block chain
2412          */
2413         private void processEntry(TableStatus entry, long seq) {
2414                 int newNumSlots = entry.getMaxSlots();
2415                 updateCurrMaxSize(newNumSlots);
2416
2417                 initExpectedSize(seq, newNumSlots);
2418
2419                 if (liveTableStatus != null) {
2420                         // We have a larger table status so the old table status is no longer alive
2421                         liveTableStatus.setDead();
2422                 }
2423
2424                 // Make this new table status the latest alive table status
2425                 liveTableStatus = entry;
2426         }
2427
2428         /**
2429          * Check old messages to see if there is a block chain violation. Also
2430          */
2431         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2432                 long oldSeqNum = entry.getOldSeqNum();
2433                 long newSeqNum = entry.getNewSeqNum();
2434                 boolean isequal = entry.getEqual();
2435                 long machineId = entry.getMachineID();
2436                 long seq = entry.getSequenceNumber();
2437
2438
2439                 // Check if we have messages that were supposed to be rejected in our local block chain
2440                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2441
2442                         // Get the slot
2443                         Slot slot = indexer.getSlot(seqNum);
2444
2445                         if (slot != null) {
2446                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2447
2448                                 long slotMachineId = slot.getMachineID();
2449                                 if (isequal != (slotMachineId == machineId)) {
2450                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2451                                 }
2452                         }
2453                 }
2454
2455
2456                 // Create a list of clients to watch until they see this rejected message entry.
2457                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2458                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2459
2460                         // Machine ID for the last message entry
2461                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2462
2463                         // We've seen it, don't need to continue to watch.  Our next
2464                         // message will implicitly acknowledge it.
2465                         if (lastMessageEntryMachineId == localMachineId) {
2466                                 continue;
2467                         }
2468
2469                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2470                         long entrySequenceNumber = lastMessageValue.getFirst();
2471
2472                         if (entrySequenceNumber < seq) {
2473
2474                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2475                                 addWatchList(lastMessageEntryMachineId, entry);
2476
2477                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2478                                 deviceWatchSet.add(lastMessageEntryMachineId);
2479                         }
2480                 }
2481
2482                 if (deviceWatchSet.isEmpty()) {
2483                         // This rejected message has been seen by all the clients so
2484                         entry.setDead();
2485                 } else {
2486                         // We need to watch this rejected message
2487                         entry.setWatchSet(deviceWatchSet);
2488                 }
2489         }
2490
2491         /**
2492          * Check if this abort is live, if not then save it so we can kill it later.
2493          * update the last transaction number that was arbitrated on.
2494          */
2495         private void processEntry(Abort entry) {
2496
2497
2498                 if (entry.getTransactionSequenceNumber() != -1) {
2499                         // update the transaction status if it was sent to the server
2500                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2501                         if (status != null) {
2502                                 status.setStatus(TransactionStatus.StatusAborted);
2503                         }
2504                 }
2505
2506                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2507                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2508                 if (previouslySeenAbort != null) {
2509                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2510                 }
2511
2512                 if (entry.getTransactionArbitrator() == localMachineId) {
2513                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2514                 }
2515
2516                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2517
2518                         // The machine already saw this so it is dead
2519                         entry.setDead();
2520                         liveAbortTable.remove(entry.getAbortId());
2521
2522                         if (entry.getTransactionArbitrator() == localMachineId) {
2523                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2524                         }
2525
2526                         return;
2527                 }
2528
2529
2530
2531
2532                 // Update the last arbitration data that we have seen so far
2533                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2534
2535                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2536                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2537                                 // Is larger
2538                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2539                         }
2540                 } else {
2541                         // Never seen any data from this arbitrator so record the first one
2542                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2543                 }
2544
2545
2546                 // Set dead a transaction if we can
2547                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2548                 if (transactionToSetDead != null) {
2549                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2550                 }
2551
2552                 // Update the last transaction sequence number that the arbitrator arbitrated on
2553                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2554                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2555
2556                         // Is a valid one
2557                         if (entry.getTransactionSequenceNumber() != -1) {
2558                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2559                         }
2560                 }
2561         }
2562
2563         /**
2564          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2565          */
2566         private void processEntry(TransactionPart entry) {
2567                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2568                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2569                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2570                         // This transaction is dead, it was already committed or aborted
2571                         entry.setDead();
2572                         return;
2573                 }
2574
2575                 // This part is still alive
2576                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2577
2578                 if (transactionPart == null) {
2579                         // Dont have a table for this machine Id yet so make one
2580                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2581                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2582                 }
2583
2584                 // Update the part and set dead ones we have already seen (got a rescued version)
2585                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2586                 if (previouslySeenPart != null) {
2587                         previouslySeenPart.setDead();
2588                 }
2589         }
2590
2591         /**
2592          * Process new commit entries and save them for future use.  Delete duplicates
2593          */
2594         private void processEntry(CommitPart entry) {
2595
2596
2597                 // Update the last transaction that was updated if we can
2598                 if (entry.getTransactionSequenceNumber() != -1) {
2599                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2600
2601                         // Update the last transaction sequence number that the arbitrator arbitrated on
2602                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2603                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2604                         }
2605                 }
2606
2607
2608
2609
2610                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2611
2612                 if (commitPart == null) {
2613                         // Don't have a table for this machine Id yet so make one
2614                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2615                         newCommitParts.put(entry.getMachineId(), commitPart);
2616                 }
2617
2618                 // Update the part and set dead ones we have already seen (got a rescued version)
2619                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2620                 if (previouslySeenPart != null) {
2621                         previouslySeenPart.setDead();
2622                 }
2623         }
2624
2625         /**
2626          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2627          * Updates the live aborts, removes those that are dead and sets them dead.
2628          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2629          * other clients have not had a rollback on the last message.
2630          */
2631         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2632
2633                 // We have seen this machine ID
2634                 machineSet.remove(machineId);
2635
2636                 // Get the set of rejected messages that this machine Id is has not seen yet
2637                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2638
2639                 // If there is a rejected message that this machine Id has not seen yet
2640                 if (watchset != null) {
2641
2642                         // Go through each rejected message that this machine Id has not seen yet
2643                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2644
2645                                 RejectedMessage rm = rmit.next();
2646
2647                                 // If this machine Id has seen this rejected message...
2648                                 if (rm.getSequenceNumber() <= seqNum) {
2649
2650                                         // Remove it from our watchlist
2651                                         rmit.remove();
2652
2653                                         // Decrement machines that need to see this notification
2654                                         rm.removeWatcher(machineId);
2655                                 }
2656                         }
2657                 }
2658
2659                 // Set dead the abort
2660                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2661                         Abort abort = i.next().getValue();
2662
2663                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2664                                 abort.setDead();
2665                                 i.remove();
2666
2667                                 if (abort.getTransactionArbitrator() == localMachineId) {
2668                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2669                                 }
2670                         }
2671                 }
2672
2673
2674
2675                 if (machineId == localMachineId) {
2676                         // Our own messages are immediately dead.
2677                         if (liveness instanceof LastMessage) {
2678                                 ((LastMessage)liveness).setDead();
2679                         } else if (liveness instanceof Slot) {
2680                                 ((Slot)liveness).setDead();
2681                         } else {
2682                                 throw new Error("Unrecognized type");
2683                         }
2684                 }
2685
2686                 // Get the old last message for this device
2687                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2688                 if (lastMessageEntry == null) {
2689                         // If no last message then there is nothing else to process
2690                         return;
2691                 }
2692
2693                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2694                 Liveness lastEntry = lastMessageEntry.getSecond();
2695
2696                 // If it is not our machine Id since we already set ours to dead
2697                 if (machineId != localMachineId) {
2698                         if (lastEntry instanceof LastMessage) {
2699                                 ((LastMessage)lastEntry).setDead();
2700                         } else if (lastEntry instanceof Slot) {
2701                                 ((Slot)lastEntry).setDead();
2702                         } else {
2703                                 throw new Error("Unrecognized type");
2704                         }
2705                 }
2706
2707                 // Make sure the server is not playing any games
2708                 if (machineId == localMachineId) {
2709
2710                         if (hadPartialSendToServer) {
2711                                 // We were not making any updates and we had a machine mismatch
2712                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2713                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2714                                 }
2715
2716                         } else {
2717                                 // We were not making any updates and we had a machine mismatch
2718                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2719                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2720                                 }
2721                         }
2722                 } else {
2723                         if (lastMessageSeqNum > seqNum) {
2724                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2725                         }
2726                 }
2727         }
2728
2729         /**
2730          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2731          * rejected message entry and which have not.
2732          */
2733         private void addWatchList(long machineId, RejectedMessage entry) {
2734                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2735                 if (entries == null) {
2736                         // There is no set for this machine ID yet so create one
2737                         entries = new HashSet<RejectedMessage>();
2738                         rejectedMessageWatchListTable.put(machineId, entries);
2739                 }
2740                 entries.add(entry);
2741         }
2742
2743         /**
2744          * Check if the HMAC chain is not violated
2745          */
2746         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2747                 for (int i = 0; i < newSlots.length; i++) {
2748                         Slot currSlot = newSlots[i];
2749                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2750                         if (prevSlot != null &&
2751                                         !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2752                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2753                 }
2754         }
2755 }