From: Ali Younis Date: Wed, 8 Feb 2017 00:28:54 +0000 (-0800) Subject: Fixed bug X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=45b3af9d75b19075a2312f5f4b29f5d85763a6d2;hp=b4be52a8821c74d7d9dae03a0e5d655c1bba9bda;p=iotcloud.git Fixed bug --- diff --git a/version1/src/java/iotcloud/CloudComm.java b/version1/src/java/iotcloud/CloudComm.java index ac906b1..5a55f58 100644 --- a/version1/src/java/iotcloud/CloudComm.java +++ b/version1/src/java/iotcloud/CloudComm.java @@ -24,7 +24,7 @@ class CloudComm { static final int SALT_SIZE = 8; byte salt[]; Table table; - + /** * Empty Constructor needed for child class. */ @@ -37,8 +37,8 @@ class CloudComm { */ CloudComm(Table _table, String _baseurl, String _password) { - this.table=_table; - this.baseurl=_baseurl; + this.table = _table; + this.baseurl = _baseurl; this.password = _password; this.random = new SecureRandom(); } @@ -64,13 +64,13 @@ class CloudComm { private void initCrypt() { try { - SecretKeySpec key=initKey(); + SecretKeySpec key = initKey(); password = null; // drop password mac = Mac.getInstance("HmacSHA256"); mac.init(key); - encryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding"); + encryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); encryptCipher.init(Cipher.ENCRYPT_MODE, key); - decryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding"); + decryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); decryptCipher.init(Cipher.DECRYPT_MODE, key); } catch (Exception e) { e.printStackTrace(); @@ -83,27 +83,27 @@ class CloudComm { */ private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException { - String reqstring=isput?"req=putslot":"req=getslot"; - String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber; + String reqstring = isput ? "req=putslot" : "req=getslot"; + String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber; if (maxentries != 0) - urlstr += "&max="+maxentries; + urlstr += "&max=" + maxentries; return new URL(urlstr); } - + public void setSalt() { try { salt = new byte[SALT_SIZE]; random.nextBytes(salt); - URL url=new URL(baseurl+"?req=setsalt"); - URLConnection con=url.openConnection(); + URL url = new URL(baseurl + "?req=setsalt"); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); http.setFixedLengthStreamingMode(salt.length); http.setDoOutput(true); http.connect(); - OutputStream os=http.getOutputStream(); + OutputStream os = http.getOutputStream(); os.write(salt); - int responsecode=http.getResponseCode(); + int responsecode = http.getResponseCode(); if (responsecode != HttpURLConnection.HTTP_OK) throw new Error("Invalid response"); } catch (Exception e) { @@ -114,20 +114,20 @@ class CloudComm { } private void getSalt() throws Exception { - URL url=new URL(baseurl+"?req=getsalt"); - URLConnection con=url.openConnection(); + URL url = new URL(baseurl + "?req=getsalt"); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); http.connect(); - - InputStream is=http.getInputStream(); - DataInputStream dis=new DataInputStream(is); - int salt_length=dis.readInt(); - byte [] tmp=new byte[salt_length]; + + InputStream is = http.getInputStream(); + DataInputStream dis = new DataInputStream(is); + int salt_length = dis.readInt(); + byte [] tmp = new byte[salt_length]; dis.readFully(tmp); - salt=tmp; + salt = tmp; } - + /* * API for putting a slot into the queue. Returns null on success. * On failure, the server will send slots with newer sequence @@ -140,13 +140,13 @@ class CloudComm { getSalt(); initCrypt(); } - - long sequencenumber=slot.getSequenceNumber(); - byte[] bytes=slot.encode(mac); + + long sequencenumber = slot.getSequenceNumber(); + byte[] bytes = slot.encode(mac); bytes = encryptCipher.doFinal(bytes); - URL url=buildRequest(true, sequencenumber, max); - URLConnection con=url.openConnection(); + URL url = buildRequest(true, sequencenumber, max); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); @@ -154,12 +154,12 @@ class CloudComm { http.setDoOutput(true); http.connect(); - OutputStream os=http.getOutputStream(); + OutputStream os = http.getOutputStream(); os.write(bytes); - InputStream is=http.getInputStream(); - DataInputStream dis=new DataInputStream(is); - byte[] resptype=new byte[7]; + InputStream is = http.getInputStream(); + DataInputStream dis = new DataInputStream(is); + byte[] resptype = new byte[7]; dis.readFully(resptype); if (Arrays.equals(resptype, "getslot".getBytes())) return processSlots(dis); @@ -184,20 +184,20 @@ class CloudComm { getSalt(); initCrypt(); } - - URL url=buildRequest(false, sequencenumber, 0); - URLConnection con=url.openConnection(); + + URL url = buildRequest(false, sequencenumber, 0); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); http.connect(); - InputStream is=http.getInputStream(); + InputStream is = http.getInputStream(); + + DataInputStream dis = new DataInputStream(is); - DataInputStream dis=new DataInputStream(is); - - byte[] resptype=new byte[7]; + byte[] resptype = new byte[7]; dis.readFully(resptype); if (!Arrays.equals(resptype, "getslot".getBytes())) - throw new Error("Bad Response: "+new String(resptype)); + throw new Error("Bad Response: " + new String(resptype)); else return processSlots(dis); } catch (Exception e) { @@ -212,19 +212,19 @@ class CloudComm { */ private Slot[] processSlots(DataInputStream dis) throws Exception { - int numberofslots=dis.readInt(); - int[] sizesofslots=new int[numberofslots]; - Slot[] slots=new Slot[numberofslots]; - for(int i=0; i> lastTransactionPartsSent = null; + private List lastPendingSendArbitrationEntriesToDelete = null; + private NewKey lastNewKey = null; + + /* Data Structures */ private Map committedKeyValueTable = null; // Table of committed key value pairs private Map speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value @@ -183,7 +191,14 @@ final public class Table { System.out.println("Old: " + o); System.out.println("New: " + n); System.out.println("Size: " + buffer.size()); - System.out.println("Commits: " + liveCommitsTable.size()); + // System.out.println("Commits: " + liveCommitsTable.size()); + System.out.println("pendingTrans: " + pendingTransactionQueue.size()); + System.out.println("Trans Status Out: " + outstandingTransactionStatus.size()); + + for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) { + System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k)); + } + for (Long a : liveCommitsTable.keySet()) { for (Long b : liveCommitsTable.get(a).keySet()) { @@ -231,19 +246,19 @@ final public class Table { validateAndUpdate(newslots, true); } -// public String toString() { -// String retString = " Committed Table: \n"; -// retString += "---------------------------\n"; -// retString += commitedTable.toString(); + // public String toString() { + // String retString = " Committed Table: \n"; + // retString += "---------------------------\n"; + // retString += commitedTable.toString(); -// retString += "\n\n"; + // retString += "\n\n"; -// retString += " Speculative Table: \n"; -// retString += "---------------------------\n"; -// retString += speculativeTable.toString(); + // retString += " Speculative Table: \n"; + // retString += "---------------------------\n"; + // retString += speculativeTable.toString(); -// return retString; -// } + // return retString; + // } public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) { localCommunicationTable.put(arbitrator, new Pair(hostName, portNumber)); @@ -343,6 +358,9 @@ final public class Table { validateAndUpdate(newSlots, false); sendToServer(null); + + updateLiveTransactionsAndStatus(); + return true; } catch (Exception e) { // e.printStackTrace(); @@ -474,11 +492,215 @@ final public class Table { bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); } + + boolean lastInsertedNewKey = false; + private boolean sendToServer(NewKey newKey) throws ServerException { + boolean fromRetry = false; + + try { + if (hadPartialSendToServer) { + Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + if (newSlots.length == 0) { + fromRetry = true; + ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); + + if (sendSlotsReturn.getFirst()) { + if (newKey != null) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = null; + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); + } + } + } else { + + newSlots = sendSlotsReturn.getThird(); + + boolean isInserted = false; + for (Slot s : newSlots) { + if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (Slot s : newSlots) { + if (isInserted) { + break; + } + + // Process each entry in the slot + for (Entry entry : s.getEntries()) { + + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { + isInserted = true; + break; + } + } + } + } + + if (isInserted) { + if (newKey != null) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = null; + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); + } else { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + } + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + + if (sendSlotsReturn.getThird().length != 0) { + // insert into the local block chain + validateAndUpdate(sendSlotsReturn.getThird(), true); + } + // continue; + } else { + boolean isInserted = false; + for (Slot s : newSlots) { + if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (Slot s : newSlots) { + if (isInserted) { + break; + } + + // Process each entry in the slot + for (Entry entry : s.getEntries()) { + + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { + isInserted = true; + break; + } + } + } + } + + if (isInserted) { + if (newKey != null) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = null; + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); + } else { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + } + } else { + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + } + + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + } + } catch (ServerException e) { + throw e; + } + + try { // While we have stuff that needs inserting into the block chain while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) { + fromRetry = false; + + if (hadPartialSendToServer) { + throw new Error("Should Be error free"); + } + + + + // If there is a new key with same name then end + if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) { + System.out.println("New Key Fail"); + return false; + } // Create the slot Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC()); @@ -495,7 +717,7 @@ final public class Table { transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } @@ -508,13 +730,22 @@ final public class Table { fillSlot(slot, true, newKey); } - // Try to send to the server + lastSlotAttemptedToSend = slot; + lastIsNewKey = (newKey != null); + lastInsertedNewKey = insertedNewKey; + lastNewSize = newSize; + lastNewKey = newKey; + lastTransactionPartsSent = new HashMap>(transactionPartsSent); + lastPendingSendArbitrationEntriesToDelete = new ArrayList(pendingSendArbitrationEntriesToDelete); + + ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null); - if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) { + if (sendSlotsReturn.getFirst()) { + // Did insert into the block chain - if (sendSlotsReturn.getFirst()) { + if (insertedNewKey) { // This slot was what was inserted not a previous slot // New Key was successfully inserted into the block chain so dont want to insert it again @@ -548,16 +779,50 @@ final public class Table { if (transaction.didSendAllParts()) { transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); pendingTransactionQueue.remove(transaction); + + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + slot.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); + } } } } else { + + // if (!sendSlotsReturn.getSecond()) { + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); + // } + // } else { + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); + + // // Update which transactions parts still need to be sent + // transaction.removeSentParts(transactionPartsSent.get(transaction)); + + // // Add the transaction status to the outstanding list + // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // // Update the transaction status + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // // Check if all the transaction parts were successfully sent and if so then remove it from pending + // if (transaction.didSendAllParts()) { + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + // pendingTransactionQueue.remove(transaction); + + // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); + // } + // } + // } + // } + // Reset which transaction to send for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); - transaction.resetNextPartToSend(); + // transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } @@ -572,9 +837,15 @@ final public class Table { validateAndUpdate(sendSlotsReturn.getThird(), true); } } + } catch (ServerException e) { - // System.out.println("Server Failure: " + e.getType()); + System.out.println("Server Failure: " + e.getType()); + for (Transaction transaction : transactionPartsSent.keySet()) { + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Sent Error: " + kv + " " + e.getType()); + } + } if (e.getType() != ServerException.TypeInputTimeout) { // e.printStackTrace(); @@ -584,7 +855,7 @@ final public class Table { transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } @@ -592,6 +863,12 @@ final public class Table { // There was a partial send to the server hadPartialSendToServer = true; + + // if (!fromRetry) { + // lastTransactionPartsSent = new HashMap>(transactionPartsSent); + // lastPendingSendArbitrationEntriesToDelete = new ArrayList(pendingSendArbitrationEntriesToDelete); + // } + // Nothing was able to be sent to the server so just clear these data structures for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); @@ -665,7 +942,7 @@ final public class Table { if (localCommunicationInformation == null) { // Cant talk to that device locally so do nothing - return new Pair(false, false); + return new Pair(true, false); } // Get the size of the send data @@ -989,10 +1266,15 @@ final public class Table { Transaction transaction = pendingTransactionQueue.get(0); // Set the transaction sequence number if it has yet to be inserted into the block chain - if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { + // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { + // transaction.setSequenceNumber(slot.getSequenceNumber()); + // } + + if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) { transaction.setSequenceNumber(slot.getSequenceNumber()); } + while (true) { TransactionPart part = transaction.getNextPartToSend(); @@ -1241,8 +1523,6 @@ final public class Table { updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); } - - private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) { if (didFindTableStatus) { return; @@ -1254,8 +1534,7 @@ final public class Table { private void updateExpectedSize() { expectedsize++; - if (expectedsize > currMaxSize) - { + if (expectedsize > currMaxSize) { expectedsize = currMaxSize; } } @@ -1274,7 +1553,7 @@ final public class Table { } private void updateCurrMaxSize(int newmaxsize) { - currMaxSize=newmaxsize; + currMaxSize = newmaxsize; } @@ -1282,7 +1561,7 @@ final public class Table { * Update the size of of the local buffer if it is needed. */ private void commitNewMaxSize() { - didFindTableStatus = false; + didFindTableStatus = false; // Resize the local slot buffer if (numberOfSlots != currMaxSize) { @@ -1342,6 +1621,9 @@ final public class Table { newTransactionParts.clear(); } + + private long lastSeqNumArbOn = 0; + private void arbitrateFromServer() { if (liveTransactionBySequenceNumberTable.size() == 0) { @@ -1363,11 +1645,25 @@ final public class Table { for (Long transactionSequenceNumber : transactionSequenceNumbers) { Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Arb Seen: " + kv + " " + lastSeqNumArbOn + " " + transactionSequenceNumber + " " + localMachineId + " " + transaction.getArbitrator()); + } + + // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction if (transaction.getArbitrator() != localMachineId) { continue; } + if (transactionSequenceNumber < lastSeqNumArbOn) { + continue; + } + + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Arb Seen: " + kv + " " + lastSeqNumArbOn + " " + transactionSequenceNumber + " " + localMachineId); + } + + if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) { // We have seen this already locally so dont commit again continue; @@ -1380,6 +1676,11 @@ final public class Table { break; } + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Arb on: " + kv + " " + lastSeqNumArbOn + " " + transactionSequenceNumber + " " + localMachineId); + } + + // update the largest transaction seen by arbitrator from server if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) { lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); @@ -1399,7 +1700,9 @@ final public class Table { } // Update what the last transaction committed was for use in batch commit - lastTransactionCommitted = transaction.getSequenceNumber(); + lastTransactionCommitted = transactionSequenceNumber; + + System.out.println("Commit Generated: " + lastTransactionCommitted + " " + localMachineId); } else { // Guard evaluated was false so create abort @@ -1417,6 +1720,10 @@ final public class Table { // Insert the abort so we can process processEntry(newAbort); } + + lastSeqNumArbOn = transactionSequenceNumber; + + // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber); } Commit newCommit = null; @@ -1739,6 +2046,30 @@ final public class Table { } } + // Update the last transaction that was updated if we can + if (commit.getTransactionSequenceNumber() != -1) { + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); + + // Update the last transaction sequence number that the arbitrator arbitrated on + if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); + } + } + + + for (KeyValue kv : commit.getKeyValueUpdateSet()) { + System.out.println("Commit Seen: " + kv + " " + commit.getTransactionSequenceNumber() + " " + localMachineId); + } + + + + + + + + + + // Update the last arbitration data that we have seen so far if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) { @@ -1804,16 +2135,6 @@ final public class Table { lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); } - // Update the last transaction that was updated if we can - if (commit.getTransactionSequenceNumber() != -1) { - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { - lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); - } - } - // We processed a new commit that we havent seen before didProcessANewCommit = true; @@ -2240,6 +2561,21 @@ final public class Table { * Process new commit entries and save them for future use. Delete duplicates */ private void processEntry(CommitPart entry) { + + + // Update the last transaction that was updated if we can + if (entry.getTransactionSequenceNumber() != -1) { + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId()); + + // Update the last transaction sequence number that the arbitrator arbitrated on + if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber()); + } + } + + + + Map, CommitPart> commitPart = newCommitParts.get(entry.getMachineId()); if (commitPart == null) { diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index bfe0f87..99babcb 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -11,7 +11,7 @@ import java.util.ArrayList; public class Test { - public static final int NUMBER_OF_TESTS = 1000; + public static final int NUMBER_OF_TESTS = 15; public static void main(String[] args) throws ServerException { if (args[0].equals("2")) { @@ -144,18 +144,23 @@ public class Test { IoTString iValueCPrev = new IoTString(valueCPrev); IoTString iValueDPrev = new IoTString(valueDPrev); + + System.out.println("t1 A"); t1.startTransaction(); t1.addKV(iKeyA, iValueA); transStatusList.add(t1.commitTransaction()); + System.out.println("t1 B"); t1.startTransaction(); t1.addKV(iKeyB, iValueB); transStatusList.add(t1.commitTransaction()); + System.out.println("t2 C"); t2.startTransaction(); t2.addKV(iKeyC, iValueC); transStatusList.add(t2.commitTransaction()); + System.out.println("t2 D"); t2.startTransaction(); t2.addKV(iKeyD, iValueD); transStatusList.add(t2.commitTransaction()); @@ -849,10 +854,14 @@ public class Test { } } + int count = 0; for (TransactionStatus status : transStatusList) { if (status.getStatus() != TransactionStatus.StatusCommitted) { foundError = true; + System.out.println("Status: " + status.getStatus() + " " + status.getTransactionSequenceNumber()); } + + count++; } if (foundError) { @@ -863,6 +872,14 @@ public class Test { t1.close(); t2.close(); + + System.out.println(); + System.out.println(); + t1.printSlots(); + + System.out.println(); + System.out.println(); + t2.printSlots(); } static void test7() throws ServerException { @@ -1880,40 +1897,22 @@ public class Test { IoTString iValueD = new IoTString(valueD); - System.out.println("==============================================================================="); - System.out.println("AAAAAAAA"); - System.out.println("==============================================================================="); t1.startTransaction(); t1.addKV(iKeyA, iValueA); transStatusList.add(t1.commitTransaction()); - System.out.println(); - System.out.println("==============================================================================="); - System.out.println("BBBBBBB"); - System.out.println("==============================================================================="); t1.startTransaction(); t1.addKV(iKeyB, iValueB); transStatusList.add(t1.commitTransaction()); - System.out.println(); - - System.out.println("==============================================================================="); - System.out.println("CCCCCCC"); - System.out.println("==============================================================================="); t2.startTransaction(); t2.addKV(iKeyC, iValueC); transStatusList.add(t2.commitTransaction()); - System.out.println(); - - System.out.println("==============================================================================="); - System.out.println("DDDDDDDDDD"); - System.out.println("==============================================================================="); t2.startTransaction(); t2.addKV(iKeyD, iValueD); transStatusList.add(t2.commitTransaction()); - System.out.println(); } endTime = System.currentTimeMillis(); @@ -2014,12 +2013,12 @@ public class Test { System.out.println("No Errors Found..."); } - System.out.println(); - System.out.println(); - t1.printSlots(); + // System.out.println(); + // System.out.println(); + // t1.printSlots(); - System.out.println(); - System.out.println(); - t2.printSlots(); + // System.out.println(); + // System.out.println(); + // t2.printSlots(); } } diff --git a/version2/src/java/iotcloud/Transaction.java b/version2/src/java/iotcloud/Transaction.java index 8494625..e25d068 100644 --- a/version2/src/java/iotcloud/Transaction.java +++ b/version2/src/java/iotcloud/Transaction.java @@ -171,9 +171,11 @@ class Transaction { public void removeSentParts(List sentParts) { nextPartToSend = 0; - partsPendingSend.removeAll(sentParts); - didSendAPartToServer = true; - transactionStatus.setTransactionSequenceNumber(sequenceNumber); + if(partsPendingSend.removeAll(sentParts)) + { + didSendAPartToServer = true; + transactionStatus.setTransactionSequenceNumber(sequenceNumber); + } } public boolean didSendAllParts() {