6a00f82222e374244d1a27dfc0af91f6b924f25d
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15
16 /**
17  * IoTTable data structure.  Provides client interface.
18  * @author Brian Demsky
19  * @version 1.0
20  */
21
22 final public class Table {
23
24         /* Constants */
25         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
26         static final int SKIP_THRESHOLD = 10;
27         static final double RESIZE_MULTIPLE = 1.2;
28         static final double RESIZE_THRESHOLD = 0.75;
29         static final int REJECTED_THRESHOLD = 5;
30
31         /* Helper Objects */
32         private SlotBuffer buffer = null;
33         private CloudComm cloud = null;
34         private Random random = null;
35         private TableStatus liveTableStatus = null;
36         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
37         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
38         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
39
40         /* Variables */
41         private int numberOfSlots = 0;  // Number of slots stored in buffer
42         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
43         private long liveSlotCount = 0; // Number of currently live slots
44         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
45         private long localMachineId = 0; // Machine ID of this client device
46         private long sequenceNumber = 0; // Largest sequence number a client has received
47         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
48         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
49         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
50         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
51         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
52         private long localArbitrationSequenceNumber = 0;
53         private boolean hadPartialSendToServer = false;
54         private boolean attemptedToSendToServer = false;
55         private long expectedsize;
56         private boolean didFindTableStatus = false;
57         private long currMaxSize = 0;
58
59         private Slot lastSlotAttemptedToSend = null;
60         private boolean lastIsNewKey = false;
61         private int lastNewSize = 0;
62         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
63         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
64         private NewKey lastNewKey = null;
65
66
67         /* Data Structures  */
68         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
69         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
70         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
71         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
72         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
73         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
74         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
75         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
76         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
77         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
78         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
79         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
80         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
81         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
82         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
83         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
84         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
85         private List<Transaction> pendingTransactionQueue = null;
86         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
87         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
88         private Map<Transaction, List<Integer>> transactionPartsSent = null;
89         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
90         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
91         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
92         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
93         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
94         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
95
96
97         public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
98                 localMachineId = _localMachineId;
99                 cloud = new CloudComm(this, baseurl, password, listeningPort);
100
101                 init();
102         }
103
104         public Table(CloudComm _cloud, long _localMachineId) {
105                 localMachineId = _localMachineId;
106                 cloud = _cloud;
107
108                 init();
109         }
110
111         /**
112          * Init all the stuff needed for for table usage
113          */
114         private void init() {
115
116                 // Init helper objects
117                 random = new Random();
118                 buffer = new SlotBuffer();
119
120                 // Set Variables
121                 oldestLiveSlotSequenceNumver = 1;
122
123                 // init data structs
124                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
125                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
126                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
127                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
128                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
129                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
130                 arbitratorTable = new HashMap<IoTString, Long>();
131                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
132                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
133                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
134                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
135                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
136                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
137                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
138                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
139                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
140                 rejectedSlotList = new Vector<Long>();
141                 pendingTransactionQueue = new ArrayList<Transaction>();
142                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
143                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
144                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
145                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
146                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
147                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
148                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
149                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
150                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
151
152
153                 // Other init stuff
154                 numberOfSlots = buffer.capacity();
155                 setResizeThreshold();
156         }
157
158         // TODO: delete method
159         public synchronized void printSlots() {
160                 long o = buffer.getOldestSeqNum();
161                 long n = buffer.getNewestSeqNum();
162
163                 int[] types = new int[10];
164
165                 int num = 0;
166
167                 int livec = 0;
168                 int deadc = 0;
169
170                 int casdasd = 0;
171
172                 for (long i = o; i < (n + 1); i++) {
173                         Slot s = buffer.getSlot(i);
174
175                         Vector<Entry> entries = s.getEntries();
176
177                         for (Entry e : entries) {
178                                 if (e.isLive()) {
179                                         int type = e.getType();
180
181
182                                         if(type == 6)
183                                         {
184                                                 RejectedMessage rej = (RejectedMessage)e;
185                                                 casdasd++;
186
187                                                 System.out.println(rej.getMachineID());
188                                         }
189
190
191                                         types[type] = types[type] + 1;
192                                         num++;
193                                         livec++;
194                                 } else {
195                                         deadc++;
196                                 }
197                         }
198                 }
199
200                 for (int i = 0; i < 10; i++) {
201                         System.out.println(i + "    " + types[i]);
202                 }
203                 System.out.println("Live count:   " + livec);
204                 System.out.println("Dead count:   " + deadc);
205                 System.out.println("Old:   " + o);
206                 System.out.println("New:   " + n);
207                 System.out.println("Size:   " + buffer.size());
208                 // System.out.println("Commits:   " + liveCommitsTable.size());
209                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
210                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
211
212                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
213                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
214                 }
215
216
217                 for (Long a : liveCommitsTable.keySet()) {
218                         for (Long b : liveCommitsTable.get(a).keySet()) {
219                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
220                                         System.out.print(kv + " ");
221                                 }
222                                 System.out.print("|| ");
223                         }
224                         System.out.println();
225                 }
226
227         }
228
229         /**
230          * Initialize the table by inserting a table status as the first entry into the table status
231          * also initialize the crypto stuff.
232          */
233         public synchronized void initTable() throws ServerException {
234                 cloud.initSecurity();
235
236                 // Create the first insertion into the block chain which is the table status
237                 Slot s = new Slot(this, 1, localMachineId);
238                 TableStatus status = new TableStatus(s, numberOfSlots);
239                 s.addEntry(status);
240                 Slot[] array = cloud.putSlot(s, numberOfSlots);
241
242                 if (array == null) {
243                         array = new Slot[] {s};
244                         // update local block chain
245                         validateAndUpdate(array, true);
246                 } else if (array.length == 1) {
247                         // in case we did push the slot BUT we failed to init it
248                         validateAndUpdate(array, true);
249                 } else {
250                         throw new Error("Error on initialization");
251                 }
252         }
253
254         /**
255          * Rebuild the table from scratch by pulling the latest block chain from the server.
256          */
257         public synchronized void rebuild() throws ServerException {
258                 // Just pull the latest slots from the server
259                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
260                 validateAndUpdate(newslots, true);
261                 sendToServer(null);
262                                         updateLiveTransactionsAndStatus();
263
264         }
265
266         // public String toString() {
267         //      String retString = " Committed Table: \n";
268         //      retString += "---------------------------\n";
269         //      retString += commitedTable.toString();
270
271         //      retString += "\n\n";
272
273         //      retString += " Speculative Table: \n";
274         //      retString += "---------------------------\n";
275         //      retString += speculativeTable.toString();
276
277         //      return retString;
278         // }
279
280         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
281                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
282         }
283
284         public synchronized Long getArbitrator(IoTString key) {
285                 return arbitratorTable.get(key);
286         }
287
288         public synchronized void close() {
289                 cloud.close();
290         }
291
292         public synchronized IoTString getCommitted(IoTString key)  {
293                 KeyValue kv = committedKeyValueTable.get(key);
294
295                 if (kv != null) {
296                         return kv.getValue();
297                 } else {
298                         return null;
299                 }
300         }
301
302         public synchronized IoTString getSpeculative(IoTString key) {
303                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
304
305                 if (kv == null) {
306                         kv = speculatedKeyValueTable.get(key);
307                 }
308
309                 if (kv == null) {
310                         kv = committedKeyValueTable.get(key);
311                 }
312
313                 if (kv != null) {
314                         return kv.getValue();
315                 } else {
316                         return null;
317                 }
318         }
319
320         public synchronized IoTString getCommittedAtomic(IoTString key) {
321                 KeyValue kv = committedKeyValueTable.get(key);
322
323                 if (arbitratorTable.get(key) == null) {
324                         throw new Error("Key not Found.");
325                 }
326
327                 // Make sure new key value pair matches the current arbitrator
328                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
329                         // TODO: Maybe not throw en error
330                         throw new Error("Not all Key Values Match Arbitrator.");
331                 }
332
333                 if (kv != null) {
334                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
335                         return kv.getValue();
336                 } else {
337                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
338                         return null;
339                 }
340         }
341
342         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
343                 if (arbitratorTable.get(key) == null) {
344                         throw new Error("Key not Found.");
345                 }
346
347                 // Make sure new key value pair matches the current arbitrator
348                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
349                         // TODO: Maybe not throw en error
350                         throw new Error("Not all Key Values Match Arbitrator.");
351                 }
352
353                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
354
355                 if (kv == null) {
356                         kv = speculatedKeyValueTable.get(key);
357                 }
358
359                 if (kv == null) {
360                         kv = committedKeyValueTable.get(key);
361                 }
362
363                 if (kv != null) {
364                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
365                         return kv.getValue();
366                 } else {
367                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
368                         return null;
369                 }
370         }
371
372         public synchronized boolean update()  {
373                 try {
374                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
375                         validateAndUpdate(newSlots, false);
376                         sendToServer(null);
377
378
379                         updateLiveTransactionsAndStatus();
380
381                         return true;
382                 } catch (Exception e) {
383                         // e.printStackTrace();
384
385                         for (Long m : localCommunicationTable.keySet()) {
386                                 updateFromLocal(m);
387                         }
388                 }
389
390                 return false;
391         }
392
393         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
394                 while (true) {
395                         if (arbitratorTable.get(keyName) != null) {
396                                 // There is already an arbitrator
397                                 return false;
398                         }
399
400                         NewKey newKey = new NewKey(null, keyName, machineId);
401                         if (sendToServer(newKey)) {
402                                 // If successfully inserted
403                                 return true;
404                         }
405                 }
406         }
407
408         public synchronized void startTransaction() {
409                 // Create a new transaction, invalidates any old pending transactions.
410                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
411         }
412
413         public synchronized void addKV(IoTString key, IoTString value) {
414
415                 // Make sure it is a valid key
416                 if (arbitratorTable.get(key) == null) {
417                         throw new Error("Key not Found.");
418                 }
419
420                 // Make sure new key value pair matches the current arbitrator
421                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
422                         // TODO: Maybe not throw en error
423                         throw new Error("Not all Key Values Match Arbitrator.");
424                 }
425
426                 // Add the key value to this transaction
427                 KeyValue kv = new KeyValue(key, value);
428                 pendingTransactionBuilder.addKV(kv);
429         }
430
431         public synchronized TransactionStatus commitTransaction() {
432
433                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
434                         // transaction with no updates will have no effect on the system
435                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
436                 }
437
438                 // Set the local transaction sequence number and increment
439                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
440                 localTransactionSequenceNumber++;
441
442                 // Create the transaction status
443                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
444
445                 // Create the new transaction
446                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
447                 newTransaction.setTransactionStatus(transactionStatus);
448
449                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
450                         // Add it to the queue and invalidate the builder for safety
451                         pendingTransactionQueue.add(newTransaction);
452                 } else {
453                         arbitrateOnLocalTransaction(newTransaction);
454                         updateLiveStateFromLocal();
455                 }
456
457                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
458
459                 try {
460                         sendToServer(null);
461                 } catch (ServerException e) {
462
463                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
464                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
465                                 Transaction transaction = iter.next();
466
467                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
468                                         // Already contacted this client so ignore all attempts to contact this client
469                                         // to preserve ordering for arbitrator
470                                         continue;
471                                 }
472
473                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
474
475                                 if (sendReturn.getFirst()) {
476                                         // Failed to contact over local
477                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
478                                 } else {
479                                         // Successful contact or should not contact
480
481                                         if (sendReturn.getSecond()) {
482                                                 // did arbitrate
483                                                 iter.remove();
484                                         }
485                                 }
486                         }
487                 }
488
489                 updateLiveStateFromLocal();
490
491                 return transactionStatus;
492         }
493
494         /**
495          * Get the machine ID for this client
496          */
497         public long getMachineId() {
498                 return localMachineId;
499         }
500
501         /**
502          * Decrement the number of live slots that we currently have
503          */
504         public void decrementLiveCount() {
505                 liveSlotCount--;
506         }
507
508         /**
509          * Recalculate the new resize threshold
510          */
511         private void setResizeThreshold() {
512                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
513                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
514         }
515
516
517         boolean lastInsertedNewKey = false;
518
519         private boolean sendToServer(NewKey newKey) throws ServerException {
520
521                 boolean fromRetry = false;
522
523                 try {
524                         if (hadPartialSendToServer) {
525                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
526                                 if (newSlots.length == 0) {
527                                         fromRetry = true;
528                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
529
530                                         if (sendSlotsReturn.getFirst()) {
531                                                 if (newKey != null) {
532                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
533                                                                 newKey = null;
534                                                         }
535                                                 }
536
537                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
538                                                         transaction.resetServerFailure();
539
540                                                         // Update which transactions parts still need to be sent
541                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
542
543                                                         // Add the transaction status to the outstanding list
544                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
545
546                                                         // Update the transaction status
547                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
548
549                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
550                                                         if (transaction.didSendAllParts()) {
551                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
552                                                                 pendingTransactionQueue.remove(transaction);
553                                                         }
554                                                 }
555                                         } else {
556
557                                                 newSlots = sendSlotsReturn.getThird();
558
559                                                 boolean isInserted = false;
560                                                 for (Slot s : newSlots) {
561                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
562                                                                 isInserted = true;
563                                                                 break;
564                                                         }
565                                                 }
566
567                                                 for (Slot s : newSlots) {
568                                                         if (isInserted) {
569                                                                 break;
570                                                         }
571
572                                                         // Process each entry in the slot
573                                                         for (Entry entry : s.getEntries()) {
574
575                                                                 if (entry.getType() == Entry.TypeLastMessage) {
576                                                                         LastMessage lastMessage = (LastMessage)entry;
577                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
578                                                                                 isInserted = true;
579                                                                                 break;
580                                                                         }
581                                                                 }
582                                                         }
583                                                 }
584
585                                                 if (isInserted) {
586                                                         if (newKey != null) {
587                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
588                                                                         newKey = null;
589                                                                 }
590                                                         }
591
592                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
593                                                                 transaction.resetServerFailure();
594
595                                                                 // Update which transactions parts still need to be sent
596                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
597
598                                                                 // Add the transaction status to the outstanding list
599                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
600
601                                                                 // Update the transaction status
602                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
603
604                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
605                                                                 if (transaction.didSendAllParts()) {
606                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
607                                                                         pendingTransactionQueue.remove(transaction);
608                                                                 } else {
609                                                                         transaction.resetServerFailure();
610                                                                         // Set the transaction sequence number back to nothing
611                                                                         if (!transaction.didSendAPartToServer()) {
612                                                                                 transaction.setSequenceNumber(-1);
613                                                                         }
614                                                                 }
615                                                         }
616                                                 }
617                                         }
618
619                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
620                                                 transaction.resetServerFailure();
621                                                 // Set the transaction sequence number back to nothing
622                                                 if (!transaction.didSendAPartToServer()) {
623                                                         transaction.setSequenceNumber(-1);
624                                                 }
625                                         }
626
627                                         if (sendSlotsReturn.getThird().length != 0) {
628                                                 // insert into the local block chain
629                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
630                                         }
631                                         // continue;
632                                 } else {
633                                         boolean isInserted = false;
634                                         for (Slot s : newSlots) {
635                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
636                                                         isInserted = true;
637                                                         break;
638                                                 }
639                                         }
640
641                                         for (Slot s : newSlots) {
642                                                 if (isInserted) {
643                                                         break;
644                                                 }
645
646                                                 // Process each entry in the slot
647                                                 for (Entry entry : s.getEntries()) {
648
649                                                         if (entry.getType() == Entry.TypeLastMessage) {
650                                                                 LastMessage lastMessage = (LastMessage)entry;
651                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
652                                                                         isInserted = true;
653                                                                         break;
654                                                                 }
655                                                         }
656                                                 }
657                                         }
658
659                                         if (isInserted) {
660                                                 if (newKey != null) {
661                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
662                                                                 newKey = null;
663                                                         }
664                                                 }
665
666                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
667                                                         transaction.resetServerFailure();
668
669                                                         // Update which transactions parts still need to be sent
670                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
671
672                                                         // Add the transaction status to the outstanding list
673                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
674
675                                                         // Update the transaction status
676                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
677
678                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
679                                                         if (transaction.didSendAllParts()) {
680                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
681                                                                 pendingTransactionQueue.remove(transaction);
682                                                         } else {
683                                                                 transaction.resetServerFailure();
684                                                                 // Set the transaction sequence number back to nothing
685                                                                 if (!transaction.didSendAPartToServer()) {
686                                                                         transaction.setSequenceNumber(-1);
687                                                                 }
688                                                         }
689                                                 }
690                                         } else {
691                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
692                                                         transaction.resetServerFailure();
693                                                         // Set the transaction sequence number back to nothing
694                                                         if (!transaction.didSendAPartToServer()) {
695                                                                 transaction.setSequenceNumber(-1);
696                                                         }
697                                                 }
698                                         }
699
700                                         // insert into the local block chain
701                                         validateAndUpdate(newSlots, true);
702                                 }
703                         }
704                 } catch (ServerException e) {
705                         throw e;
706                 }
707
708
709                 try {
710                         // While we have stuff that needs inserting into the block chain
711                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
712                                 fromRetry = false;
713
714                                 if (hadPartialSendToServer) {
715                                         throw new Error("Should Be error free");
716                                 }
717
718
719
720                                 // If there is a new key with same name then end
721                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
722                                         return false;
723                                 }
724
725                                 // Create the slot
726                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
727
728                                 // Try to fill the slot with data
729                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
730                                 boolean needsResize = fillSlotsReturn.getFirst();
731                                 int newSize = fillSlotsReturn.getSecond();
732                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
733
734                                 if (needsResize) {
735                                         // Reset which transaction to send
736                                         for (Transaction transaction : transactionPartsSent.keySet()) {
737                                                 transaction.resetNextPartToSend();
738
739                                                 // Set the transaction sequence number back to nothing
740                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
741                                                         transaction.setSequenceNumber(-1);
742                                                 }
743                                         }
744
745                                         // Clear the sent data since we are trying again
746                                         pendingSendArbitrationEntriesToDelete.clear();
747                                         transactionPartsSent.clear();
748
749                                         // We needed a resize so try again
750                                         fillSlot(slot, true, newKey);
751                                 }
752
753                                 lastSlotAttemptedToSend = slot;
754                                 lastIsNewKey = (newKey != null);
755                                 lastInsertedNewKey = insertedNewKey;
756                                 lastNewSize = newSize;
757                                 lastNewKey = newKey;
758                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
759                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
760
761
762                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
763
764                                 if (sendSlotsReturn.getFirst()) {
765
766                                         // Did insert into the block chain
767
768                                         if (insertedNewKey) {
769                                                 // This slot was what was inserted not a previous slot
770
771                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
772                                                 newKey = null;
773                                         }
774
775                                         // Remove the aborts and commit parts that were sent from the pending to send queue
776                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
777                                                 ArbitrationRound round = iter.next();
778                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
779
780                                                 if (round.isDoneSending()) {
781                                                         // Sent all the parts
782                                                         iter.remove();
783                                                 }
784                                         }
785
786                                         for (Transaction transaction : transactionPartsSent.keySet()) {
787                                                 transaction.resetServerFailure();
788
789                                                 // Update which transactions parts still need to be sent
790                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
791
792                                                 // Add the transaction status to the outstanding list
793                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
794
795                                                 // Update the transaction status
796                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
797
798                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
799                                                 if (transaction.didSendAllParts()) {
800                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
801                                                         pendingTransactionQueue.remove(transaction);
802                                                 }
803                                         }
804                                 } else {
805
806                                         // if (!sendSlotsReturn.getSecond()) {
807                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
808                                         //              transaction.resetServerFailure();
809                                         //      }
810                                         // } else {
811                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
812                                         //              transaction.resetServerFailure();
813
814                                         //              // Update which transactions parts still need to be sent
815                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
816
817                                         //              // Add the transaction status to the outstanding list
818                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
819
820                                         //              // Update the transaction status
821                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
822
823                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
824                                         //              if (transaction.didSendAllParts()) {
825                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
826                                         //                      pendingTransactionQueue.remove(transaction);
827
828                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
829                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
830                                         //                      }
831                                         //              }
832                                         //      }
833                                         // }
834
835                                         // Reset which transaction to send
836                                         for (Transaction transaction : transactionPartsSent.keySet()) {
837                                                 transaction.resetNextPartToSend();
838                                                 // transaction.resetNextPartToSend();
839
840                                                 // Set the transaction sequence number back to nothing
841                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
842                                                         transaction.setSequenceNumber(-1);
843                                                 }
844                                         }
845                                 }
846
847                                 // Clear the sent data in preparation for next send
848                                 pendingSendArbitrationEntriesToDelete.clear();
849                                 transactionPartsSent.clear();
850
851                                 if (sendSlotsReturn.getThird().length != 0) {
852                                         // insert into the local block chain
853                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
854                                 }
855                         }
856
857                 } catch (ServerException e) {
858
859                         if (e.getType() != ServerException.TypeInputTimeout) {
860                                 // e.printStackTrace();
861
862                                 // Nothing was able to be sent to the server so just clear these data structures
863                                 for (Transaction transaction : transactionPartsSent.keySet()) {
864                                         transaction.resetNextPartToSend();
865
866                                         // Set the transaction sequence number back to nothing
867                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
868                                                 transaction.setSequenceNumber(-1);
869                                         }
870                                 }
871                         } else {
872                                 // There was a partial send to the server
873                                 hadPartialSendToServer = true;
874
875
876                                 // if (!fromRetry) {
877                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
878                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
879                                 // }
880
881                                 // Nothing was able to be sent to the server so just clear these data structures
882                                 for (Transaction transaction : transactionPartsSent.keySet()) {
883                                         transaction.resetNextPartToSend();
884                                         transaction.setServerFailure();
885                                 }
886                         }
887
888                         pendingSendArbitrationEntriesToDelete.clear();
889                         transactionPartsSent.clear();
890
891                         throw e;
892                 }
893
894                 return newKey == null;
895         }
896
897         private synchronized boolean updateFromLocal(long machineId) {
898                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
899                 if (localCommunicationInformation == null) {
900                         // Cant talk to that device locally so do nothing
901                         return false;
902                 }
903
904                 // Get the size of the send data
905                 int sendDataSize = Integer.BYTES + Long.BYTES;
906
907                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
908                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
909                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
910                 }
911
912                 byte[] sendData = new byte[sendDataSize];
913                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
914
915                 // Encode the data
916                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
917                 bbEncode.putInt(0);
918
919                 // Send by local
920                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
921
922                 if (returnData == null) {
923                         // Could not contact server
924                         return false;
925                 }
926
927                 // Decode the data
928                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
929                 int numberOfEntries = bbDecode.getInt();
930
931                 for (int i = 0; i < numberOfEntries; i++) {
932                         byte type = bbDecode.get();
933                         if (type == Entry.TypeAbort) {
934                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
935                                 processEntry(abort);
936                         } else if (type == Entry.TypeCommitPart) {
937                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
938                                 processEntry(commitPart);
939                         }
940                 }
941
942                 updateLiveStateFromLocal();
943
944                 return true;
945         }
946
947         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
948
949                 // Get the devices local communications
950                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
951
952                 if (localCommunicationInformation == null) {
953                         // Cant talk to that device locally so do nothing
954                         return new Pair<Boolean, Boolean>(true, false);
955                 }
956
957                 // Get the size of the send data
958                 int sendDataSize = Integer.BYTES + Long.BYTES;
959                 for (TransactionPart part : transaction.getParts().values()) {
960                         sendDataSize += part.getSize();
961                 }
962
963                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
964                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
965                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
966                 }
967
968                 // Make the send data size
969                 byte[] sendData = new byte[sendDataSize];
970                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
971
972                 // Encode the data
973                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
974                 bbEncode.putInt(transaction.getParts().size());
975                 for (TransactionPart part : transaction.getParts().values()) {
976                         part.encode(bbEncode);
977                 }
978
979
980                 // Send by local
981                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
982
983                 if (returnData == null) {
984                         // Could not contact server
985                         return new Pair<Boolean, Boolean>(true, false);
986                 }
987
988                 // Decode the data
989                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
990                 boolean didCommit = bbDecode.get() == 1;
991                 boolean couldArbitrate = bbDecode.get() == 1;
992                 int numberOfEntries = bbDecode.getInt();
993                 boolean foundAbort = false;
994
995                 for (int i = 0; i < numberOfEntries; i++) {
996                         byte type = bbDecode.get();
997                         if (type == Entry.TypeAbort) {
998                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
999
1000                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1001                                         foundAbort = true;
1002                                 }
1003
1004                                 processEntry(abort);
1005                         } else if (type == Entry.TypeCommitPart) {
1006                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
1007                                 processEntry(commitPart);
1008                         }
1009                 }
1010
1011                 updateLiveStateFromLocal();
1012
1013                 if (couldArbitrate) {
1014                         TransactionStatus status =  transaction.getTransactionStatus();
1015                         if (didCommit) {
1016                                 status.setStatus(TransactionStatus.StatusCommitted);
1017                         } else {
1018                                 status.setStatus(TransactionStatus.StatusAborted);
1019                         }
1020                 } else {
1021                         TransactionStatus status =  transaction.getTransactionStatus();
1022                         if (foundAbort) {
1023                                 status.setStatus(TransactionStatus.StatusAborted);
1024                         } else {
1025                                 status.setStatus(TransactionStatus.StatusCommitted);
1026                         }
1027                 }
1028
1029                 return new Pair<Boolean, Boolean>(false, true);
1030         }
1031
1032         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1033
1034                 // Decode the data
1035                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1036                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1037                 int numberOfParts = bbDecode.getInt();
1038
1039                 // If we did commit a transaction or not
1040                 boolean didCommit = false;
1041                 boolean couldArbitrate = false;
1042
1043                 if (numberOfParts != 0) {
1044
1045                         // decode the transaction
1046                         Transaction transaction = new Transaction();
1047                         for (int i = 0; i < numberOfParts; i++) {
1048                                 bbDecode.get();
1049                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1050                                 transaction.addPartDecode(newPart);
1051                         }
1052
1053                         // Arbitrate on transaction and pull relevant return data
1054                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1055                         couldArbitrate = localArbitrateReturn.getFirst();
1056                         didCommit = localArbitrateReturn.getSecond();
1057
1058                         updateLiveStateFromLocal();
1059
1060                         // Transaction was sent to the server so keep track of it to prevent double commit
1061                         if (transaction.getSequenceNumber() != -1) {
1062                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1063                         }
1064                 }
1065
1066                 // The data to send back
1067                 int returnDataSize = 0;
1068                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1069
1070                 // Get the aborts to send back
1071                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1072                 Collections.sort(abortLocalSequenceNumbers);
1073                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1074                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1075                                 continue;
1076                         }
1077
1078                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1079                         unseenArbitrations.add(abort);
1080                         returnDataSize += abort.getSize();
1081                 }
1082
1083                 // Get the commits to send back
1084                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1085                 if (commitForClientTable != null) {
1086                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1087                         Collections.sort(commitLocalSequenceNumbers);
1088
1089                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1090                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1091
1092                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1093                                         continue;
1094                                 }
1095
1096                                 unseenArbitrations.addAll(commit.getParts().values());
1097
1098                                 for (CommitPart commitPart : commit.getParts().values()) {
1099                                         returnDataSize += commitPart.getSize();
1100                                 }
1101                         }
1102                 }
1103
1104                 // Number of arbitration entries to decode
1105                 returnDataSize += 2 * Integer.BYTES;
1106
1107                 // Boolean of did commit or not
1108                 if (numberOfParts != 0) {
1109                         returnDataSize += Byte.BYTES;
1110                 }
1111
1112                 // Data to send Back
1113                 byte[] returnData = new byte[returnDataSize];
1114                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1115
1116                 if (numberOfParts != 0) {
1117                         if (didCommit) {
1118                                 bbEncode.put((byte)1);
1119                         } else {
1120                                 bbEncode.put((byte)0);
1121                         }
1122                         if (couldArbitrate) {
1123                                 bbEncode.put((byte)1);
1124                         } else {
1125                                 bbEncode.put((byte)0);
1126                         }
1127                 }
1128
1129                 bbEncode.putInt(unseenArbitrations.size());
1130                 for (Entry entry : unseenArbitrations) {
1131                         entry.encode(bbEncode);
1132                 }
1133
1134                 return returnData;
1135         }
1136
1137         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1138
1139                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1140                 attemptedToSendToServer = true;
1141
1142                 boolean inserted = false;
1143                 boolean lastTryInserted = false;
1144
1145                 Slot[] array = cloud.putSlot(slot, newSize);
1146                 if (array == null) {
1147                         array = new Slot[] {slot};
1148                         rejectedSlotList.clear();
1149                         inserted = true;
1150                 }       else {
1151                         if (array.length == 0) {
1152                                 throw new Error("Server Error: Did not send any slots");
1153                         }
1154
1155                         // if (attemptedToSendToServerTmp) {
1156                         if (hadPartialSendToServer) {
1157
1158                                 boolean isInserted = false;
1159                                 for (Slot s : array) {
1160                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1161                                                 isInserted = true;
1162                                                 break;
1163                                         }
1164                                 }
1165
1166                                 for (Slot s : array) {
1167                                         if (isInserted) {
1168                                                 break;
1169                                         }
1170
1171                                         // Process each entry in the slot
1172                                         for (Entry entry : s.getEntries()) {
1173
1174                                                 if (entry.getType() == Entry.TypeLastMessage) {
1175                                                         LastMessage lastMessage = (LastMessage)entry;
1176
1177                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1178                                                                 isInserted = true;
1179                                                                 break;
1180                                                         }
1181                                                 }
1182                                         }
1183                                 }
1184
1185                                 if (!isInserted) {
1186                                         rejectedSlotList.add(slot.getSequenceNumber());
1187                                         lastTryInserted = false;
1188                                 } else {
1189                                         lastTryInserted = true;
1190                                 }
1191                         } else {
1192                                 rejectedSlotList.add(slot.getSequenceNumber());
1193                                 lastTryInserted = false;
1194                         }
1195                 }
1196
1197                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1198         }
1199
1200         /**
1201          * Returns false if a resize was needed
1202          */
1203         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1204                 int newSize = 0;
1205                 if (liveSlotCount > bufferResizeThreshold) {
1206                         resize = true; //Resize is forced
1207                 }
1208
1209                 if (resize) {
1210                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1211                         TableStatus status = new TableStatus(slot, newSize);
1212                         slot.addEntry(status);
1213                 }
1214
1215                 // Fill with rejected slots first before doing anything else
1216                 doRejectedMessages(slot);
1217
1218                 // Do mandatory rescue of entries
1219                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1220
1221                 // Extract working variables
1222                 boolean needsResize = mandatoryRescueReturn.getFirst();
1223                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1224                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1225
1226                 if (needsResize && !resize) {
1227                         // We need to resize but we are not resizing so return false
1228                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1229                 }
1230
1231                 boolean inserted = false;
1232                 if (newKeyEntry != null) {
1233                         newKeyEntry.setSlot(slot);
1234                         if (slot.hasSpace(newKeyEntry)) {
1235                                 slot.addEntry(newKeyEntry);
1236                                 inserted = true;
1237                         }
1238                 }
1239
1240                 // Clear the transactions, aborts and commits that were sent previously
1241                 transactionPartsSent.clear();
1242                 pendingSendArbitrationEntriesToDelete.clear();
1243
1244                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1245                         boolean isFull = false;
1246                         round.generateParts();
1247                         List<Entry> parts = round.getParts();
1248
1249                         // Insert pending arbitration data
1250                         for (Entry arbitrationData : parts) {
1251
1252                                 // If it is an abort then we need to set some information
1253                                 if (arbitrationData instanceof Abort) {
1254                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1255                                 }
1256
1257                                 if (!slot.hasSpace(arbitrationData)) {
1258                                         // No space so cant do anything else with these data entries
1259                                         isFull = true;
1260                                         break;
1261                                 }
1262
1263                                 // Add to this current slot and add it to entries to delete
1264                                 slot.addEntry(arbitrationData);
1265                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1266                         }
1267
1268                         if (isFull) {
1269                                 break;
1270                         }
1271                 }
1272
1273                 if (pendingTransactionQueue.size() > 0) {
1274
1275                         Transaction transaction = pendingTransactionQueue.get(0);
1276
1277                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1278                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1279                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1280                         // }
1281
1282                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1283                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1284                         }
1285
1286
1287                         while (true) {
1288                                 TransactionPart part = transaction.getNextPartToSend();
1289
1290                                 if (part == null) {
1291                                         // Ran out of parts to send for this transaction so move on
1292                                         break;
1293                                 }
1294
1295                                 if (slot.hasSpace(part)) {
1296                                         slot.addEntry(part);
1297                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1298                                         if (partsSent == null) {
1299                                                 partsSent = new ArrayList<Integer>();
1300                                                 transactionPartsSent.put(transaction, partsSent);
1301                                         }
1302                                         partsSent.add(part.getPartNumber());
1303                                         transactionPartsSent.put(transaction, partsSent);
1304                                 } else {
1305                                         break;
1306                                 }
1307                         }
1308                 }
1309
1310                 // Fill the remainder of the slot with rescue data
1311                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1312
1313                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1314         }
1315
1316         private void doRejectedMessages(Slot s) {
1317                 if (! rejectedSlotList.isEmpty()) {
1318                         /* TODO: We should avoid generating a rejected message entry if
1319                          * there is already a sufficient entry in the queue (e.g.,
1320                          * equalsto value of true and same sequence number).  */
1321
1322                         long old_seqn = rejectedSlotList.firstElement();
1323                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1324                                 long new_seqn = rejectedSlotList.lastElement();
1325                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1326                                 s.addEntry(rm);
1327                         } else {
1328                                 long prev_seqn = -1;
1329                                 int i = 0;
1330                                 /* Go through list of missing messages */
1331                                 for (; i < rejectedSlotList.size(); i++) {
1332                                         long curr_seqn = rejectedSlotList.get(i);
1333                                         Slot s_msg = buffer.getSlot(curr_seqn);
1334                                         if (s_msg != null)
1335                                                 break;
1336                                         prev_seqn = curr_seqn;
1337                                 }
1338                                 /* Generate rejected message entry for missing messages */
1339                                 if (prev_seqn != -1) {
1340                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1341                                         s.addEntry(rm);
1342                                 }
1343                                 /* Generate rejected message entries for present messages */
1344                                 for (; i < rejectedSlotList.size(); i++) {
1345                                         long curr_seqn = rejectedSlotList.get(i);
1346                                         Slot s_msg = buffer.getSlot(curr_seqn);
1347                                         long machineid = s_msg.getMachineID();
1348                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1349                                         s.addEntry(rm);
1350                                 }
1351                         }
1352                 }
1353         }
1354
1355         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1356                 long newestSequenceNumber = buffer.getNewestSeqNum();
1357                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1358                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1359                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1360                 }
1361
1362                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1363                 boolean seenLiveSlot = false;
1364                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1365                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1366
1367
1368                 // Mandatory Rescue
1369                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1370                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1371                         // Push slot number forward
1372                         if (! seenLiveSlot) {
1373                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1374                         }
1375
1376                         if (!previousSlot.isLive()) {
1377                                 continue;
1378                         }
1379
1380                         // We have seen a live slot
1381                         seenLiveSlot = true;
1382
1383                         // Get all the live entries for a slot
1384                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1385
1386                         // Iterate over all the live entries and try to rescue them
1387                         for (Entry liveEntry : liveEntries) {
1388                                 if (slot.hasSpace(liveEntry)) {
1389
1390                                         // Enough space to rescue the entry
1391                                         slot.addEntry(liveEntry);
1392                                 } else if (currentSequenceNumber == firstIfFull) {
1393                                         //if there's no space but the entry is about to fall off the queue
1394                                         System.out.println("B"); //?
1395                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1396
1397                                 }
1398                         }
1399                 }
1400
1401                 // Did not resize
1402                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1403         }
1404
1405         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1406                 /* now go through live entries from least to greatest sequence number until
1407                  * either all live slots added, or the slot doesn't have enough room
1408                  * for SKIP_THRESHOLD consecutive entries*/
1409                 int skipcount = 0;
1410                 long newestseqnum = buffer.getNewestSeqNum();
1411                 search:
1412                 for (; seqn <= newestseqnum; seqn++) {
1413                         Slot prevslot = buffer.getSlot(seqn);
1414                         //Push slot number forward
1415                         if (!seenliveslot)
1416                                 oldestLiveSlotSequenceNumver = seqn;
1417
1418                         if (!prevslot.isLive())
1419                                 continue;
1420                         seenliveslot = true;
1421                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1422                         for (Entry liveentry : liveentries) {
1423                                 if (s.hasSpace(liveentry))
1424                                         s.addEntry(liveentry);
1425                                 else {
1426                                         skipcount++;
1427                                         if (skipcount > SKIP_THRESHOLD)
1428                                                 break search;
1429                                 }
1430                         }
1431                 }
1432         }
1433
1434         /**
1435          * Checks for malicious activity and updates the local copy of the block chain.
1436          */
1437         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1438
1439                 // The cloud communication layer has checked slot HMACs already before decoding
1440                 if (newSlots.length == 0) {
1441                         return;
1442                 }
1443
1444                 // Make sure all slots are newer than the last largest slot this client has seen
1445                 long firstSeqNum = newSlots[0].getSequenceNumber();
1446                 if (firstSeqNum <= sequenceNumber) {
1447                         throw new Error("Server Error: Sent older slots!");
1448                 }
1449
1450                 // Create an object that can access both new slots and slots in our local chain
1451                 // without committing slots to our local chain
1452                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1453
1454                 // Check that the HMAC chain is not broken
1455                 checkHMACChain(indexer, newSlots);
1456
1457                 // Set to keep track of messages from clients
1458                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1459
1460                 // Process each slots data
1461                 for (Slot slot : newSlots) {
1462                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1463
1464                         updateExpectedSize();
1465                 }
1466
1467                 // If there is a gap, check to see if the server sent us everything.
1468                 if (firstSeqNum != (sequenceNumber + 1)) {
1469
1470                         // Check the size of the slots that were sent down by the server.
1471                         // Can only check the size if there was a gap
1472                         checkNumSlots(newSlots.length);
1473
1474                         // Since there was a gap every machine must have pushed a slot or must have
1475                         // a last message message.  If not then the server is hiding slots
1476                         if (!machineSet.isEmpty()) {
1477                                 throw new Error("Missing record for machines: " + machineSet);
1478                         }
1479                 }
1480
1481                 // Update the size of our local block chain.
1482                 commitNewMaxSize();
1483
1484                 // Commit new to slots to the local block chain.
1485                 for (Slot slot : newSlots) {
1486
1487                         // Insert this slot into our local block chain copy.
1488                         buffer.putSlot(slot);
1489
1490                         // Keep track of how many slots are currently live (have live data in them).
1491                         liveSlotCount++;
1492                 }
1493
1494                 // Get the sequence number of the latest slot in the system
1495                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1496
1497                 updateLiveStateFromServer();
1498
1499                 // No Need to remember after we pulled from the server
1500                 offlineTransactionsCommittedAndAtServer.clear();
1501
1502                 // This is invalidated now
1503                 hadPartialSendToServer = false;
1504         }
1505
1506         private void updateLiveStateFromServer() {
1507                 // Process the new transaction parts
1508                 processNewTransactionParts();
1509
1510                 // Do arbitration on new transactions that were received
1511                 arbitrateFromServer();
1512
1513                 // Update all the committed keys
1514                 boolean didCommitOrSpeculate = updateCommittedTable();
1515
1516                 // Delete the transactions that are now dead
1517                 updateLiveTransactionsAndStatus();
1518
1519                 // Do speculations
1520                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1521                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1522         }
1523
1524         private void updateLiveStateFromLocal() {
1525                 // Update all the committed keys
1526                 boolean didCommitOrSpeculate = updateCommittedTable();
1527
1528                 // Delete the transactions that are now dead
1529                 updateLiveTransactionsAndStatus();
1530
1531                 // Do speculations
1532                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1533                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1534         }
1535
1536         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1537                 // if (didFindTableStatus) {
1538                 // return;
1539                 // }
1540                 long prevslots = firstSequenceNumber;
1541
1542
1543                 if (didFindTableStatus) {
1544                         // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
1545                         // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1546
1547                 } else {
1548                         expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1549                         // System.out.println("Here: " + expectedsize);
1550                 }
1551
1552                 // System.out.println(numberOfSlots);
1553
1554                 didFindTableStatus = true;
1555                 currMaxSize = numberOfSlots;
1556         }
1557
1558         private void updateExpectedSize() {
1559                 expectedsize++;
1560
1561                 if (expectedsize > currMaxSize) {
1562                         expectedsize = currMaxSize;
1563                 }
1564
1565                                         // System.out.println("" + expectedsize);
1566
1567         }
1568
1569
1570         /**
1571          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1572          * This is only called when we have a gap between the slots that we have locally and the slots
1573          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1574          * status message
1575          */
1576         private void checkNumSlots(int numberOfSlots) {
1577                 if (numberOfSlots != expectedsize) {
1578                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1579                 }
1580         }
1581
1582         private void updateCurrMaxSize(int newmaxsize) {
1583                 currMaxSize = newmaxsize;
1584         }
1585
1586
1587         /**
1588          * Update the size of of the local buffer if it is needed.
1589          */
1590         private void commitNewMaxSize() {
1591                 didFindTableStatus = false;
1592
1593                 // Resize the local slot buffer
1594                 if (numberOfSlots != currMaxSize) {
1595                         buffer.resize((int)currMaxSize);
1596                 }
1597
1598                 // Change the number of local slots to the new size
1599                 numberOfSlots = (int)currMaxSize;
1600
1601                 // Recalculate the resize threshold since the size of the local buffer has changed
1602                 setResizeThreshold();
1603         }
1604
1605         /**
1606          * Process the new transaction parts from this latest round of slots received from the server
1607          */
1608         private void processNewTransactionParts() {
1609
1610                 if (newTransactionParts.size() == 0) {
1611                         // Nothing new to process
1612                         return;
1613                 }
1614
1615                 // Iterate through all the machine Ids that we received new parts for
1616                 for (Long machineId : newTransactionParts.keySet()) {
1617                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1618
1619                         // Iterate through all the parts for that machine Id
1620                         for (Pair<Long, Integer> partId : parts.keySet()) {
1621                                 TransactionPart part = parts.get(partId);
1622
1623                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1624                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1625                                         // Set dead the transaction part
1626                                         part.setDead();
1627                                         continue;
1628                                 }
1629
1630                                 // Get the transaction object for that sequence number
1631                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1632
1633                                 if (transaction == null) {
1634                                         // This is a new transaction that we dont have so make a new one
1635                                         transaction = new Transaction();
1636
1637                                         // Insert this new transaction into the live tables
1638                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1639                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1640                                 }
1641
1642                                 // Add that part to the transaction
1643                                 transaction.addPartDecode(part);
1644                         }
1645                 }
1646
1647                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1648                 newTransactionParts.clear();
1649         }
1650
1651
1652         private long lastSeqNumArbOn = 0;
1653
1654         private void arbitrateFromServer() {
1655
1656                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1657                         // Nothing to arbitrate on so move on
1658                         return;
1659                 }
1660
1661                 // Get the transaction sequence numbers and sort from oldest to newest
1662                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1663                 Collections.sort(transactionSequenceNumbers);
1664
1665                 // Collection of key value pairs that are
1666                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1667
1668                 // The last transaction arbitrated on
1669                 long lastTransactionCommitted = -1;
1670                 Set<Abort> generatedAborts = new HashSet<Abort>();
1671
1672                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1673                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1674
1675
1676
1677                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1678                         if (transaction.getArbitrator() != localMachineId) {
1679                                 continue;
1680                         }
1681
1682                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1683                                 continue;
1684                         }
1685
1686                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1687                                 // We have seen this already locally so dont commit again
1688                                 continue;
1689                         }
1690
1691
1692                         if (!transaction.isComplete()) {
1693                                 // Will arbitrate in incorrect order if we continue so just break
1694                                 // Most likely this
1695                                 break;
1696                         }
1697
1698
1699                         // update the largest transaction seen by arbitrator from server
1700                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1701                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1702                         } else {
1703                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1704                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1705                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1706                                 }
1707                         }
1708
1709                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1710                                 // Guard evaluated as true
1711
1712                                 // Update the local changes so we can make the commit
1713                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1714                                         speculativeTableTmp.put(kv.getKey(), kv);
1715                                 }
1716
1717                                 // Update what the last transaction committed was for use in batch commit
1718                                 lastTransactionCommitted = transactionSequenceNumber;
1719                         } else {
1720                                 // Guard evaluated was false so create abort
1721
1722                                 // Create the abort
1723                                 Abort newAbort = new Abort(null,
1724                                                            transaction.getClientLocalSequenceNumber(),
1725                                                            transaction.getSequenceNumber(),
1726                                                            transaction.getMachineId(),
1727                                                            transaction.getArbitrator(),
1728                                                            localArbitrationSequenceNumber);
1729                                 localArbitrationSequenceNumber++;
1730
1731                                 generatedAborts.add(newAbort);
1732
1733                                 // Insert the abort so we can process
1734                                 processEntry(newAbort);
1735                         }
1736
1737                         lastSeqNumArbOn = transactionSequenceNumber;
1738
1739                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1740                 }
1741
1742                 Commit newCommit = null;
1743
1744                 // If there is something to commit
1745                 if (speculativeTableTmp.size() != 0) {
1746
1747                         // Create the commit and increment the commit sequence number
1748                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1749                         localArbitrationSequenceNumber++;
1750
1751                         // Add all the new keys to the commit
1752                         for (KeyValue kv : speculativeTableTmp.values()) {
1753                                 newCommit.addKV(kv);
1754                         }
1755
1756                         // create the commit parts
1757                         newCommit.createCommitParts();
1758
1759                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1760
1761                         // Insert the commit so we can process it
1762                         for (CommitPart commitPart : newCommit.getParts().values()) {
1763                                 processEntry(commitPart);
1764                         }
1765                 }
1766
1767                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1768                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1769                         pendingSendArbitrationRounds.add(arbitrationRound);
1770
1771                         if (compactArbitrationData()) {
1772                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1773                                 if (newArbitrationRound.getCommit() != null) {
1774                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1775                                                 processEntry(commitPart);
1776                                         }
1777                                 }
1778                         }
1779                 }
1780         }
1781
1782         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1783
1784                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1785                 if (transaction.getArbitrator() != localMachineId) {
1786                         return new Pair<Boolean, Boolean>(false, false);
1787                 }
1788
1789                 if (!transaction.isComplete()) {
1790                         // Will arbitrate in incorrect order if we continue so just break
1791                         // Most likely this
1792                         return new Pair<Boolean, Boolean>(false, false);
1793                 }
1794
1795                 if (transaction.getMachineId() != localMachineId) {
1796                         // dont do this check for local transactions
1797                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1798                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1799                                         // We've have already seen this from the server
1800                                         return new Pair<Boolean, Boolean>(false, false);
1801                                 }
1802                         }
1803                 }
1804
1805                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1806                         // Guard evaluated as true
1807
1808                         // Create the commit and increment the commit sequence number
1809                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1810                         localArbitrationSequenceNumber++;
1811
1812                         // Update the local changes so we can make the commit
1813                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1814                                 newCommit.addKV(kv);
1815                         }
1816
1817                         // create the commit parts
1818                         newCommit.createCommitParts();
1819
1820                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1821                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1822                         pendingSendArbitrationRounds.add(arbitrationRound);
1823
1824                         if (compactArbitrationData()) {
1825                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1826                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1827                                         processEntry(commitPart);
1828                                 }
1829                         } else {
1830                                 // Insert the commit so we can process it
1831                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1832                                         processEntry(commitPart);
1833                                 }
1834                         }
1835
1836                         if (transaction.getMachineId() == localMachineId) {
1837                                 TransactionStatus status = transaction.getTransactionStatus();
1838                                 if (status != null) {
1839                                         status.setStatus(TransactionStatus.StatusCommitted);
1840                                 }
1841                         }
1842
1843                         updateLiveStateFromLocal();
1844                         return new Pair<Boolean, Boolean>(true, true);
1845                 } else {
1846
1847                         if (transaction.getMachineId() == localMachineId) {
1848                                 // For locally created messages update the status
1849
1850                                 // Guard evaluated was false so create abort
1851                                 TransactionStatus status = transaction.getTransactionStatus();
1852                                 if (status != null) {
1853                                         status.setStatus(TransactionStatus.StatusAborted);
1854                                 }
1855                         } else {
1856                                 Set addAbortSet = new HashSet<Abort>();
1857
1858
1859                                 // Create the abort
1860                                 Abort newAbort = new Abort(null,
1861                                                            transaction.getClientLocalSequenceNumber(),
1862                                                            -1,
1863                                                            transaction.getMachineId(),
1864                                                            transaction.getArbitrator(),
1865                                                            localArbitrationSequenceNumber);
1866                                 localArbitrationSequenceNumber++;
1867
1868                                 addAbortSet.add(newAbort);
1869
1870
1871                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1872                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1873                                 pendingSendArbitrationRounds.add(arbitrationRound);
1874
1875                                 if (compactArbitrationData()) {
1876                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1877                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1878                                                 processEntry(commitPart);
1879                                         }
1880                                 }
1881                         }
1882
1883                         updateLiveStateFromLocal();
1884                         return new Pair<Boolean, Boolean>(true, false);
1885                 }
1886         }
1887
1888         /**
1889          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1890          */
1891         private boolean compactArbitrationData() {
1892
1893                 if (pendingSendArbitrationRounds.size() < 2) {
1894                         // Nothing to compact so do nothing
1895                         return false;
1896                 }
1897
1898                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1899                 if (lastRound.didSendPart()) {
1900                         return false;
1901                 }
1902
1903                 boolean hadCommit = (lastRound.getCommit() == null);
1904                 boolean gotNewCommit = false;
1905
1906                 int numberToDelete = 1;
1907                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1908                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1909
1910                         if (round.isFull() || round.didSendPart()) {
1911                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1912                                 break;
1913                         }
1914
1915                         if (round.getCommit() == null) {
1916
1917                                 // Try compacting aborts only
1918                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1919                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1920                                         // Cant compact since it would be too large
1921                                         break;
1922                                 }
1923                                 lastRound.addAborts(round.getAborts());
1924                         } else {
1925
1926                                 // Create a new larger commit
1927                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1928                                 localArbitrationSequenceNumber++;
1929
1930                                 // Create the commit parts so that we can count them
1931                                 newCommit.createCommitParts();
1932
1933                                 // Calculate the new size of the parts
1934                                 int newSize = newCommit.getNumberOfParts();
1935                                 newSize += lastRound.getAbortsCount();
1936                                 newSize += round.getAbortsCount();
1937
1938                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1939                                         // Cant compact since it would be too large
1940                                         break;
1941                                 }
1942
1943                                 // Set the new compacted part
1944                                 lastRound.setCommit(newCommit);
1945                                 lastRound.addAborts(round.getAborts());
1946                                 gotNewCommit = true;
1947                         }
1948
1949                         numberToDelete++;
1950                 }
1951
1952                 if (numberToDelete != 1) {
1953                         // If there is a compaction
1954
1955                         // Delete the previous pieces that are now in the new compacted piece
1956                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1957                                 pendingSendArbitrationRounds.clear();
1958                         } else {
1959                                 for (int i = 0; i < numberToDelete; i++) {
1960                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1961                                 }
1962                         }
1963
1964                         // Add the new compacted into the pending to send list
1965                         pendingSendArbitrationRounds.add(lastRound);
1966
1967                         // Should reinsert into the commit processor
1968                         if (hadCommit && gotNewCommit) {
1969                                 return true;
1970                         }
1971                 }
1972
1973                 return false;
1974         }
1975         // private boolean compactArbitrationData() {
1976         //      return false;
1977         // }
1978
1979         /**
1980          * Update all the commits and the committed tables, sets dead the dead transactions
1981          */
1982         private boolean updateCommittedTable() {
1983
1984                 if (newCommitParts.size() == 0) {
1985                         // Nothing new to process
1986                         return false;
1987                 }
1988
1989                 // Iterate through all the machine Ids that we received new parts for
1990                 for (Long machineId : newCommitParts.keySet()) {
1991                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
1992
1993                         // Iterate through all the parts for that machine Id
1994                         for (Pair<Long, Integer> partId : parts.keySet()) {
1995                                 CommitPart part = parts.get(partId);
1996
1997                                 // Get the transaction object for that sequence number
1998                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1999
2000                                 if (commitForClientTable == null) {
2001                                         // This is the first commit from this device
2002                                         commitForClientTable = new HashMap<Long, Commit>();
2003                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2004                                 }
2005
2006                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2007
2008                                 if (commit == null) {
2009                                         // This is a new commit that we dont have so make a new one
2010                                         commit = new Commit();
2011
2012                                         // Insert this new commit into the live tables
2013                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2014                                 }
2015
2016                                 // Add that part to the commit
2017                                 commit.addPartDecode(part);
2018                         }
2019                 }
2020
2021                 // Clear all the new commits parts in preparation for the next time the server sends slots
2022                 newCommitParts.clear();
2023
2024                 // If we process a new commit keep track of it for future use
2025                 boolean didProcessANewCommit = false;
2026
2027                 // Process the commits one by one
2028                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2029
2030                         // Get all the commits for a specific arbitrator
2031                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2032
2033                         // Sort the commits in order
2034                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2035                         Collections.sort(commitSequenceNumbers);
2036
2037                         // Get the last commit seen from this arbitrator
2038                         long lastCommitSeenSequenceNumber = -1;
2039                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2040                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2041                         }
2042
2043                         // Go through each new commit one by one
2044                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2045                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2046                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2047
2048                                 // Special processing if a commit is not complete
2049                                 if (!commit.isComplete()) {
2050                                         if (i == (commitSequenceNumbers.size() - 1)) {
2051                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2052                                                 break;
2053                                         } else {
2054                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2055                                                 // Delete it and move on
2056                                                 commit.setDead();
2057                                                 commitForClientTable.remove(commit.getSequenceNumber());
2058                                                 continue;
2059                                         }
2060                                 }
2061
2062                                 // Update the last transaction that was updated if we can
2063                                 if (commit.getTransactionSequenceNumber() != -1) {
2064                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2065
2066                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2067                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2068                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2069                                         }
2070                                 }
2071
2072                                 // Update the last arbitration data that we have seen so far
2073                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2074
2075                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2076                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2077                                                 // Is larger
2078                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2079                                         }
2080                                 } else {
2081                                         // Never seen any data from this arbitrator so record the first one
2082                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2083                                 }
2084
2085                                 // We have already seen this commit before so need to do the full processing on this commit
2086                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2087
2088                                         // Update the last transaction that was updated if we can
2089                                         if (commit.getTransactionSequenceNumber() != -1) {
2090                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2091
2092                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2093                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2094                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2095                                                 }
2096                                         }
2097
2098                                         continue;
2099                                 }
2100
2101                                 // If we got here then this is a brand new commit and needs full processing
2102
2103                                 // Get what commits should be edited, these are the commits that have live values for their keys
2104                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2105                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2106                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2107                                 }
2108                                 commitsToEdit.remove(null); // remove null since it could be in this set
2109
2110                                 // Update each previous commit that needs to be updated
2111                                 for (Commit previousCommit : commitsToEdit) {
2112
2113                                         // Only bother with live commits (TODO: Maybe remove this check)
2114                                         if (previousCommit.isLive()) {
2115
2116                                                 // Update which keys in the old commits are still live
2117                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2118                                                         previousCommit.invalidateKey(kv.getKey());
2119                                                 }
2120
2121                                                 // if the commit is now dead then remove it
2122                                                 if (!previousCommit.isLive()) {
2123                                                         commitForClientTable.remove(previousCommit);
2124                                                 }
2125                                         }
2126                                 }
2127
2128                                 // Update the last seen sequence number from this arbitrator
2129                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2130                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2131                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2132                                         }
2133                                 } else {
2134                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2135                                 }
2136
2137                                 // We processed a new commit that we havent seen before
2138                                 didProcessANewCommit = true;
2139
2140                                 // Update the committed table of keys and which commit is using which key
2141                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2142                                         committedKeyValueTable.put(kv.getKey(), kv);
2143                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2144                                 }
2145                         }
2146                 }
2147
2148                 return didProcessANewCommit;
2149         }
2150
2151         /**
2152          * Create the speculative table from transactions that are still live and have come from the cloud
2153          */
2154         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2155                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2156                         // There is nothing to speculate on
2157                         return false;
2158                 }
2159
2160                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2161                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2162                 Collections.sort(transactionSequenceNumbersSorted);
2163
2164                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2165
2166
2167                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2168                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2169                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2170
2171                         // Start from scratch
2172                         speculatedKeyValueTable.clear();
2173                         lastTransactionSequenceNumberSpeculatedOn = -1;
2174                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2175
2176                 }
2177
2178                 // Remember the front of the transaction list
2179                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2180
2181                 // Find where to start arbitration from
2182                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2183
2184                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2185                         // Make sure we are not out of bounds
2186                         return false; // did not speculate
2187                 }
2188
2189                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2190                 boolean didSkip = true;
2191
2192                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2193                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2194                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2195
2196                         if (!transaction.isComplete()) {
2197                                 // If there is an incomplete transaction then there is nothing we can do
2198                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2199                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2200                                 didSkip = true;
2201                                 continue;
2202                         }
2203
2204                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2205                                 continue;
2206                         }
2207
2208                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2209
2210                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2211                                 // Guard evaluated to true so update the speculative table
2212                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2213                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2214                                 }
2215                         }
2216                 }
2217
2218                 if (didSkip) {
2219                         // Since there was a skip we need to redo the speculation next time around
2220                         lastTransactionSequenceNumberSpeculatedOn = -1;
2221                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2222                 }
2223
2224                 // We did some speculation
2225                 return true;
2226         }
2227
2228         /**
2229          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2230          */
2231         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2232                 if (pendingTransactionQueue.size() == 0) {
2233                         // There is nothing to speculate on
2234                         return;
2235                 }
2236
2237
2238                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2239                         // need to reset on the pending speculation
2240                         lastPendingTransactionSpeculatedOn = null;
2241                         firstPendingTransaction = pendingTransactionQueue.get(0);
2242                         pendingTransactionSpeculatedKeyValueTable.clear();
2243                 }
2244
2245                 // Find where to start arbitration from
2246                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2247
2248                 if (startIndex >= pendingTransactionQueue.size()) {
2249                         // Make sure we are not out of bounds
2250                         return;
2251                 }
2252
2253                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2254                         Transaction transaction = pendingTransactionQueue.get(i);
2255
2256                         lastPendingTransactionSpeculatedOn = transaction;
2257
2258                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2259                                 // Guard evaluated to true so update the speculative table
2260                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2261                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2262                                 }
2263                         }
2264                 }
2265         }
2266
2267         /**
2268          * Set dead and remove from the live transaction tables the transactions that are dead
2269          */
2270         private void updateLiveTransactionsAndStatus() {
2271
2272                 // Go through each of the transactions
2273                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2274                         Transaction transaction = iter.next().getValue();
2275
2276                         // Check if the transaction is dead
2277                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2278                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2279
2280                                 // Set dead the transaction
2281                                 transaction.setDead();
2282
2283                                 // Remove the transaction from the live table
2284                                 iter.remove();
2285                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2286                         }
2287                 }
2288
2289                 // Go through each of the transactions
2290                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2291                         TransactionStatus status = iter.next().getValue();
2292
2293                         // Check if the transaction is dead
2294                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2295                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2296
2297                                 // Set committed
2298                                 status.setStatus(TransactionStatus.StatusCommitted);
2299
2300                                 // Remove
2301                                 iter.remove();
2302                         }
2303                 }
2304         }
2305
2306         /**
2307          * Process this slot, entry by entry.  Also update the latest message sent by slot
2308          */
2309         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2310
2311                 // Update the last message seen
2312                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2313
2314                 // Process each entry in the slot
2315                 for (Entry entry : slot.getEntries()) {
2316                         switch (entry.getType()) {
2317
2318                         case Entry.TypeCommitPart:
2319                                 processEntry((CommitPart)entry);
2320                                 break;
2321
2322                         case Entry.TypeAbort:
2323                                 processEntry((Abort)entry);
2324                                 break;
2325
2326                         case Entry.TypeTransactionPart:
2327                                 processEntry((TransactionPart)entry);
2328                                 break;
2329
2330                         case Entry.TypeNewKey:
2331                                 processEntry((NewKey)entry);
2332                                 break;
2333
2334                         case Entry.TypeLastMessage:
2335                                 processEntry((LastMessage)entry, machineSet);
2336                                 break;
2337
2338                         case Entry.TypeRejectedMessage:
2339                                 processEntry((RejectedMessage)entry, indexer);
2340                                 break;
2341
2342                         case Entry.TypeTableStatus:
2343                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2344                                 break;
2345
2346                         default:
2347                                 throw new Error("Unrecognized type: " + entry.getType());
2348                         }
2349                 }
2350         }
2351
2352         /**
2353          * Update the last message that was sent for a machine Id
2354          */
2355         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2356                 // Update what the last message received by a machine was
2357                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2358         }
2359
2360         /**
2361          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2362          */
2363         private void processEntry(NewKey entry) {
2364
2365                 // Update the arbitrator table with the new key information
2366                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2367
2368                 // Update what the latest live new key is
2369                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2370                 if (oldNewKey != null) {
2371                         // Delete the old new key messages
2372                         oldNewKey.setDead();
2373                 }
2374         }
2375
2376         /**
2377          * Process new table status entries and set dead the old ones as new ones come in.
2378          * keeps track of the largest and smallest table status seen in this current round
2379          * of updating the local copy of the block chain
2380          */
2381         private void processEntry(TableStatus entry, long seq) {
2382                 int newNumSlots = entry.getMaxSlots();
2383                 updateCurrMaxSize(newNumSlots);
2384
2385                 initExpectedSize(seq, newNumSlots);
2386
2387                 if (liveTableStatus != null) {
2388                         // We have a larger table status so the old table status is no longer alive
2389                         liveTableStatus.setDead();
2390                 }
2391
2392                 // Make this new table status the latest alive table status
2393                 liveTableStatus = entry;
2394         }
2395
2396         /**
2397          * Check old messages to see if there is a block chain violation. Also
2398          */
2399         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2400                 long oldSeqNum = entry.getOldSeqNum();
2401                 long newSeqNum = entry.getNewSeqNum();
2402                 boolean isequal = entry.getEqual();
2403                 long machineId = entry.getMachineID();
2404                 long seq = entry.getSequenceNumber();
2405
2406
2407                 // Check if we have messages that were supposed to be rejected in our local block chain
2408                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2409
2410                         // Get the slot
2411                         Slot slot = indexer.getSlot(seqNum);
2412
2413                         if (slot != null) {
2414                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2415
2416                                 long slotMachineId = slot.getMachineID();
2417                                 if (isequal != (slotMachineId == machineId)) {
2418                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2419                                 }
2420                         }
2421                 }
2422
2423
2424                 // Create a list of clients to watch until they see this rejected message entry.
2425                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2426                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2427
2428                         // Machine ID for the last message entry
2429                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2430
2431                         // We've seen it, don't need to continue to watch.  Our next
2432                         // message will implicitly acknowledge it.
2433                         if (lastMessageEntryMachineId == localMachineId) {
2434                                 continue;
2435                         }
2436
2437                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2438                         long entrySequenceNumber = lastMessageValue.getFirst();
2439
2440                         if (entrySequenceNumber < seq) {
2441
2442                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2443                                 addWatchList(lastMessageEntryMachineId, entry);
2444
2445                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2446                                 deviceWatchSet.add(lastMessageEntryMachineId);
2447                         }
2448                 }
2449
2450                 if (deviceWatchSet.isEmpty()) {
2451                         // This rejected message has been seen by all the clients so
2452                         entry.setDead();
2453                 } else {
2454                         // We need to watch this rejected message
2455                         entry.setWatchSet(deviceWatchSet);
2456                 }
2457         }
2458
2459         /**
2460          * Check if this abort is live, if not then save it so we can kill it later.
2461          * update the last transaction number that was arbitrated on.
2462          */
2463         private void processEntry(Abort entry) {
2464
2465
2466                 if (entry.getTransactionSequenceNumber() != -1) {
2467                         // update the transaction status if it was sent to the server
2468                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2469                         if (status != null) {
2470                                 status.setStatus(TransactionStatus.StatusAborted);
2471                         }
2472                 }
2473
2474                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2475                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2476                 if (previouslySeenAbort != null) {
2477                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2478                 }
2479
2480                 if (entry.getTransactionArbitrator() == localMachineId) {
2481                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2482                 }
2483
2484                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2485
2486                         // The machine already saw this so it is dead
2487                         entry.setDead();
2488                         liveAbortTable.remove(entry.getAbortId());
2489
2490                         if (entry.getTransactionArbitrator() == localMachineId) {
2491                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2492                         }
2493
2494                         return;
2495                 }
2496
2497
2498
2499
2500                 // Update the last arbitration data that we have seen so far
2501                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2502
2503                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2504                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2505                                 // Is larger
2506                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2507                         }
2508                 } else {
2509                         // Never seen any data from this arbitrator so record the first one
2510                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2511                 }
2512
2513
2514                 // Set dead a transaction if we can
2515                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2516                 if (transactionToSetDead != null) {
2517                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2518                 }
2519
2520                 // Update the last transaction sequence number that the arbitrator arbitrated on
2521                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2522                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2523
2524                         // Is a valid one
2525                         if (entry.getTransactionSequenceNumber() != -1) {
2526                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2527                         }
2528                 }
2529         }
2530
2531         /**
2532          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2533          */
2534         private void processEntry(TransactionPart entry) {
2535                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2536                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2537                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2538                         // This transaction is dead, it was already committed or aborted
2539                         entry.setDead();
2540                         return;
2541                 }
2542
2543                 // This part is still alive
2544                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2545
2546                 if (transactionPart == null) {
2547                         // Dont have a table for this machine Id yet so make one
2548                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2549                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2550                 }
2551
2552                 // Update the part and set dead ones we have already seen (got a rescued version)
2553                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2554                 if (previouslySeenPart != null) {
2555                         previouslySeenPart.setDead();
2556                 }
2557         }
2558
2559         /**
2560          * Process new commit entries and save them for future use.  Delete duplicates
2561          */
2562         private void processEntry(CommitPart entry) {
2563
2564
2565                 // Update the last transaction that was updated if we can
2566                 if (entry.getTransactionSequenceNumber() != -1) {
2567                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2568
2569                         // Update the last transaction sequence number that the arbitrator arbitrated on
2570                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2571                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2572                         }
2573                 }
2574
2575
2576
2577
2578                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2579
2580                 if (commitPart == null) {
2581                         // Don't have a table for this machine Id yet so make one
2582                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2583                         newCommitParts.put(entry.getMachineId(), commitPart);
2584                 }
2585
2586                 // Update the part and set dead ones we have already seen (got a rescued version)
2587                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2588                 if (previouslySeenPart != null) {
2589                         previouslySeenPart.setDead();
2590                 }
2591         }
2592
2593         /**
2594          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2595          * Updates the live aborts, removes those that are dead and sets them dead.
2596          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2597          * other clients have not had a rollback on the last message.
2598          */
2599         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2600
2601                 // We have seen this machine ID
2602                 machineSet.remove(machineId);
2603
2604                 // Get the set of rejected messages that this machine Id is has not seen yet
2605                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2606
2607                 // If there is a rejected message that this machine Id has not seen yet
2608                 if (watchset != null) {
2609
2610                         // Go through each rejected message that this machine Id has not seen yet
2611                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2612
2613                                 RejectedMessage rm = rmit.next();
2614
2615                                 // If this machine Id has seen this rejected message...
2616                                 if (rm.getSequenceNumber() <= seqNum) {
2617
2618                                         // Remove it from our watchlist
2619                                         rmit.remove();
2620
2621                                         // Decrement machines that need to see this notification
2622                                         rm.removeWatcher(machineId);
2623                                 }
2624                         }
2625                 }
2626
2627                 // Set dead the abort
2628                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2629                         Abort abort = i.next().getValue();
2630
2631                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2632                                 abort.setDead();
2633                                 i.remove();
2634
2635                                 if (abort.getTransactionArbitrator() == localMachineId) {
2636                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2637                                 }
2638                         }
2639                 }
2640
2641
2642
2643                 if (machineId == localMachineId) {
2644                         // Our own messages are immediately dead.
2645                         if (liveness instanceof LastMessage) {
2646                                 ((LastMessage)liveness).setDead();
2647                         } else if (liveness instanceof Slot) {
2648                                 ((Slot)liveness).setDead();
2649                         } else {
2650                                 throw new Error("Unrecognized type");
2651                         }
2652                 }
2653
2654                 // Get the old last message for this device
2655                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2656                 if (lastMessageEntry == null) {
2657                         // If no last message then there is nothing else to process
2658                         return;
2659                 }
2660
2661                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2662                 Liveness lastEntry = lastMessageEntry.getSecond();
2663
2664                 // If it is not our machine Id since we already set ours to dead
2665                 if (machineId != localMachineId) {
2666                         if (lastEntry instanceof LastMessage) {
2667                                 ((LastMessage)lastEntry).setDead();
2668                         } else if (lastEntry instanceof Slot) {
2669                                 ((Slot)lastEntry).setDead();
2670                         } else {
2671                                 throw new Error("Unrecognized type");
2672                         }
2673                 }
2674
2675                 // Make sure the server is not playing any games
2676                 if (machineId == localMachineId) {
2677
2678                         if (hadPartialSendToServer) {
2679                                 // We were not making any updates and we had a machine mismatch
2680                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2681                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2682                                 }
2683
2684                         } else {
2685                                 // We were not making any updates and we had a machine mismatch
2686                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2687                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2688                                 }
2689                         }
2690                 } else {
2691                         if (lastMessageSeqNum > seqNum) {
2692                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2693                         }
2694                 }
2695         }
2696
2697         /**
2698          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2699          * rejected message entry and which have not.
2700          */
2701         private void addWatchList(long machineId, RejectedMessage entry) {
2702                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2703                 if (entries == null) {
2704                         // There is no set for this machine ID yet so create one
2705                         entries = new HashSet<RejectedMessage>();
2706                         rejectedMessageWatchListTable.put(machineId, entries);
2707                 }
2708                 entries.add(entry);
2709         }
2710
2711         /**
2712          * Check if the HMAC chain is not violated
2713          */
2714         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2715                 for (int i = 0; i < newSlots.length; i++) {
2716                         Slot currSlot = newSlots[i];
2717                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2718                         if (prevSlot != null &&
2719                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2720                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2721                 }
2722         }
2723 }