Updates
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index be9defb91cef43d5921db27bd9f0ed3e9e3bb02e..1d097c77d07204f9783cf77a9748d3cd843f1fdd 100644 (file)
@@ -72,6 +72,7 @@ final public class Table {
        private List<Commit> pendingCommitsToDelete = null;
        private Map<Long, LocalComm> localCommunicationChannels;
        private Map<Long, TransactionStatus> transactionStatusMap = null;
+       private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
 
 
        public Table(String hostname, String baseurl, String password, long _localmachineid) {
@@ -116,6 +117,7 @@ final public class Table {
                pendingCommitsToDelete = new LinkedList<Commit>();
                localCommunicationChannels = new HashMap<Long, LocalComm>();
                transactionStatusMap = new HashMap<Long, TransactionStatus>();
+               transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
        }
 
        public void initTable() throws ServerException {
@@ -325,10 +327,15 @@ final public class Table {
                }
        }
 
-       public void update() {
+       public Pair<Boolean, Boolean> update() {
+
+               boolean gotLatestFromServer = false;
+               boolean didSendLocal = false;
+
                try {
                        Slot[] newslots = cloud.getSlots(sequencenumber + 1);
                        validateandupdate(newslots, false);
+                       gotLatestFromServer = true;
 
                        if (!pendingTransQueue.isEmpty()) {
 
@@ -340,9 +347,61 @@ final public class Table {
                                updateWithNotPendingTrans();
                        }
 
+                       didSendLocal = true;
+
                } catch (Exception e) {
                        // could not update so do nothing
                }
+
+               return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
+       }
+
+       public Boolean updateFromLocal(long arb) {
+               LocalComm lc = localCommunicationChannels.get(arb);
+               if (lc == null) {
+                       // Cant talk directly to arbitrator so cant do anything
+                       return false;
+               }
+
+               byte[] array = new byte[Long.BYTES ];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
+               if (lastSeenCommit != null) {
+                       bbEncode.putLong(lastSeenCommit);
+               } else {
+                       bbEncode.putLong(0);
+               }
+
+               byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               boolean didCommit = bbDecode.get() == 1;
+               int numberOfCommites = bbDecode.getInt();
+
+               List<Commit> newCommits = new LinkedList<Commit>();
+               for (int i = 0; i < numberOfCommites; i++ ) {
+                       bbDecode.get();
+                       Commit com = (Commit)Commit.decode(null, bbDecode);
+                       newCommits.add(com);
+               }
+
+
+               for (Commit commit : newCommits) {
+                       // Prepare to process the commit
+                       processEntry(commit);
+               }
+
+               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+               // Go through all uncommitted transactions and kill the ones that are dead
+               deleteDeadUncommittedTransactions();
+
+               // Speculate on key value pairs
+               didCommitOrSpeculate |= createSpeculativeTable();
+               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+               return true;
        }
 
        public void startTransaction() {
@@ -383,7 +442,7 @@ final public class Table {
                        localTransactionSequenceNumber++;
 
                        transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
-                       transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
+                       transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
 
                        // Add the pending transaction to the queue
                        pendingTransQueue.add(pendingTransBuild);
@@ -467,11 +526,9 @@ final public class Table {
                        sentAllPending = true;
                } catch (Exception e) {
                        // There was a connection error
-                       e.printStackTrace();
                        sentAllPending = false;
                }
 
-
                if (!sentAllPending) {
 
                        for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
@@ -507,19 +564,19 @@ final public class Table {
                                didCommitOrSpeculate |= createSpeculativeTable();
                                createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
 
-
                                if (retData.getFirst()) {
-                                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
                                        if (transStatus != null) {
                                                transStatus.setStatus(TransactionStatus.StatusCommitted);
                                        }
 
                                } else {
-                                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
                                        if (transStatus != null) {
                                                transStatus.setStatus(TransactionStatus.StatusAborted);
                                        }
                                }
+                               i.remove();
                        }
                }
        }
@@ -623,9 +680,12 @@ final public class Table {
                // Decode the data
                ByteBuffer bbDecode = ByteBuffer.wrap(data);
                long lastSeenCommit = bbDecode.getLong();
-               bbDecode.get();
-               Transaction ut = (Transaction)Transaction.decode(null, bbDecode);
 
+               Transaction ut = null;
+               if (data.length != Long.BYTES) {
+                       bbDecode.get();
+                       ut = (Transaction)Transaction.decode(null, bbDecode);
+               }
                // Do the local update and arbitrate
                Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
 
@@ -654,11 +714,6 @@ final public class Table {
 
        private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
 
-               if (ut.getArbitrator() != localmachineid) {
-                       // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
-                       return null;
-               }
-
                List<Commit> returnCommits = new ArrayList<Commit>();
 
                if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
@@ -682,6 +737,13 @@ final public class Table {
                        }
                }
 
+
+               if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
+                       // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+                       // or there is no transaction to process
+                       return new Pair<Boolean, List<Commit>>(false, returnCommits);
+               }
+
                if (!ut.evaluateGuard(commitedTable, null)) {
                        // Guard evaluated as false so return only the commits that the other device has not seen yet
                        return new Pair<Boolean, List<Commit>>(false, returnCommits);
@@ -771,7 +833,7 @@ final public class Table {
 
                if (sendRetData.getFirst()) {
                        // update the status and change what the sequence number is for the
-                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum());
+                       TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
                        transStatus.setStatus(TransactionStatus.StatusSent);
                        transStatus.setSentTransaction();
                        transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
@@ -1193,15 +1255,23 @@ final public class Table {
                // Clear the new commits storage so we can use it later
                newCommitMap.clear();
 
+
                // go through all saved transactions and update the status of those that can be updated
                for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
                        Map.Entry<Long, TransactionStatus> entry = i.next();
                        long seqnum = entry.getKey();
                        TransactionStatus status = entry.getValue();
 
-                       if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) {
-                               status.setStatus(TransactionStatus.StatusCommitted);
-                               i.remove();
+                       if (status.getSentTransaction()) {
+
+                               Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
+                               Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
+
+                               if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
+                                       ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
+                                       status.setStatus(TransactionStatus.StatusCommitted);
+                                       i.remove();
+                               }
                        }
                }