private List<Commit> pendingCommitsToDelete = null;
private Map<Long, LocalComm> localCommunicationChannels;
private Map<Long, TransactionStatus> transactionStatusMap = null;
+ private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
public Table(String hostname, String baseurl, String password, long _localmachineid) {
pendingCommitsToDelete = new LinkedList<Commit>();
localCommunicationChannels = new HashMap<Long, LocalComm>();
transactionStatusMap = new HashMap<Long, TransactionStatus>();
+ transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
}
public void initTable() throws ServerException {
}
}
- public void update() {
+ public Pair<Boolean, Boolean> update() {
+
+ boolean gotLatestFromServer = false;
+ boolean didSendLocal = false;
+
try {
Slot[] newslots = cloud.getSlots(sequencenumber + 1);
validateandupdate(newslots, false);
+ gotLatestFromServer = true;
if (!pendingTransQueue.isEmpty()) {
updateWithNotPendingTrans();
}
+ didSendLocal = true;
+
} catch (Exception e) {
// could not update so do nothing
}
+
+ return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
+ }
+
+ public Boolean updateFromLocal(long arb) {
+ LocalComm lc = localCommunicationChannels.get(arb);
+ if (lc == null) {
+ // Cant talk directly to arbitrator so cant do anything
+ return false;
+ }
+
+ byte[] array = new byte[Long.BYTES ];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
+ if (lastSeenCommit != null) {
+ bbEncode.putLong(lastSeenCommit);
+ } else {
+ bbEncode.putLong(0);
+ }
+
+ byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ boolean didCommit = bbDecode.get() == 1;
+ int numberOfCommites = bbDecode.getInt();
+
+ List<Commit> newCommits = new LinkedList<Commit>();
+ for (int i = 0; i < numberOfCommites; i++ ) {
+ bbDecode.get();
+ Commit com = (Commit)Commit.decode(null, bbDecode);
+ newCommits.add(com);
+ }
+
+
+ for (Commit commit : newCommits) {
+ // Prepare to process the commit
+ processEntry(commit);
+ }
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ return true;
}
public void startTransaction() {
localTransactionSequenceNumber++;
transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
- transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
+ transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
// Add the pending transaction to the queue
pendingTransQueue.add(pendingTransBuild);
sentAllPending = true;
} catch (Exception e) {
// There was a connection error
- e.printStackTrace();
sentAllPending = false;
}
-
if (!sentAllPending) {
for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
didCommitOrSpeculate |= createSpeculativeTable();
createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
-
if (retData.getFirst()) {
- TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
if (transStatus != null) {
transStatus.setStatus(TransactionStatus.StatusCommitted);
}
} else {
- TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
if (transStatus != null) {
transStatus.setStatus(TransactionStatus.StatusAborted);
}
}
+ i.remove();
}
}
}
// Decode the data
ByteBuffer bbDecode = ByteBuffer.wrap(data);
long lastSeenCommit = bbDecode.getLong();
- bbDecode.get();
- Transaction ut = (Transaction)Transaction.decode(null, bbDecode);
+ Transaction ut = null;
+ if (data.length != Long.BYTES) {
+ bbDecode.get();
+ ut = (Transaction)Transaction.decode(null, bbDecode);
+ }
// Do the local update and arbitrate
Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
- if (ut.getArbitrator() != localmachineid) {
- // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
- return null;
- }
-
List<Commit> returnCommits = new ArrayList<Commit>();
if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
}
}
+
+ if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
+ // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+ // or there is no transaction to process
+ return new Pair<Boolean, List<Commit>>(false, returnCommits);
+ }
+
if (!ut.evaluateGuard(commitedTable, null)) {
// Guard evaluated as false so return only the commits that the other device has not seen yet
return new Pair<Boolean, List<Commit>>(false, returnCommits);
if (sendRetData.getFirst()) {
// update the status and change what the sequence number is for the
- TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum());
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
transStatus.setStatus(TransactionStatus.StatusSent);
transStatus.setSentTransaction();
transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
// Clear the new commits storage so we can use it later
newCommitMap.clear();
+
// go through all saved transactions and update the status of those that can be updated
for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<Long, TransactionStatus> entry = i.next();
long seqnum = entry.getKey();
TransactionStatus status = entry.getValue();
- if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) {
- status.setStatus(TransactionStatus.StatusCommitted);
- i.remove();
+ if (status.getSentTransaction()) {
+
+ Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
+ Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
+
+ if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
+ ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ i.remove();
+ }
}
}