From 95c8a925e792895f5e4a5da7c08e2c592ee03ee9 Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Tue, 3 Jan 2017 14:41:14 -0800 Subject: [PATCH] API Changes --- version2/src/java/iotcloud/Abort.java | 6 +- version2/src/java/iotcloud/CloudComm.java | 17 +- version2/src/java/iotcloud/Commit.java | 15 +- version2/src/java/iotcloud/Entry.java | 5 +- version2/src/java/iotcloud/LocalComm.java | 22 + .../src/java/iotcloud/PendingTransaction.java | 9 + version2/src/java/iotcloud/Table.java | 872 +++++++++++++----- version2/src/java/iotcloud/Test.java | 751 ++++----------- version2/src/java/iotcloud/Transaction.java | 11 +- .../src/java/iotcloud/TransactionStatus.java | 53 ++ 10 files changed, 964 insertions(+), 797 deletions(-) create mode 100644 version2/src/java/iotcloud/LocalComm.java create mode 100644 version2/src/java/iotcloud/TransactionStatus.java diff --git a/version2/src/java/iotcloud/Abort.java b/version2/src/java/iotcloud/Abort.java index f27c787..327ce33 100644 --- a/version2/src/java/iotcloud/Abort.java +++ b/version2/src/java/iotcloud/Abort.java @@ -15,7 +15,7 @@ class Abort extends Entry { private long transarbitrator; - public Abort(Slot slot, long _seqnumtrans, long _machineid, long _transarbitrator) { + public Abort(Slot slot, long _seqnumtrans, long _machineid, long _transarbitrator) { super(slot); seqnumtrans = _seqnumtrans; machineid = _machineid; @@ -30,16 +30,16 @@ class Abort extends Entry { return seqnumtrans; } + public long getTransArbitrator() { return transarbitrator; } - static Entry decode(Slot slot, ByteBuffer bb) { long seqnumtrans = bb.getLong(); long machineid = bb.getLong(); long transarbitrator = bb.getLong(); - return new Abort(slot, seqnumtrans, machineid, transarbitrator); + return new Abort(slot, seqnumtrans, machineid, transarbitrator); } public void encode(ByteBuffer bb) { diff --git a/version2/src/java/iotcloud/CloudComm.java b/version2/src/java/iotcloud/CloudComm.java index 5ee249d..d7a01d6 100644 --- a/version2/src/java/iotcloud/CloudComm.java +++ b/version2/src/java/iotcloud/CloudComm.java @@ -15,6 +15,7 @@ import java.security.SecureRandom; class CloudComm { + String hostname; String baseurl; Cipher encryptCipher; Cipher decryptCipher; @@ -35,8 +36,9 @@ class CloudComm { /** * Constructor for actual use. Takes in the url and password. */ - CloudComm(Table _table, String _baseurl, String _password) { + CloudComm(Table _table, String _hostname, String _baseurl, String _password) { this.table = _table; + this.hostname = _hostname; this.baseurl = _baseurl; this.password = _password; this.random = new SecureRandom(); @@ -222,6 +224,15 @@ class CloudComm { } } + public boolean hasConnection() { + try { + InetAddress address = InetAddress.getByName(hostname); + return address.isReachable(TIMEOUT_MILLIS); + } catch (Exception e) { + return false; + } + } + /** * Method that actually handles building Slot objects from the * server response. Shared by both putSlot and getSlots. @@ -244,4 +255,8 @@ class CloudComm { dis.close(); return slots; } + + + + } diff --git a/version2/src/java/iotcloud/Commit.java b/version2/src/java/iotcloud/Commit.java index deeb501..fb52e67 100644 --- a/version2/src/java/iotcloud/Commit.java +++ b/version2/src/java/iotcloud/Commit.java @@ -14,14 +14,16 @@ import java.util.Iterator; class Commit extends Entry { private long seqnumtrans; + private long seqnumcommit; private long transarbitrator; private Set keyValueUpdateSet = null; - public Commit(Slot slot, long _seqnumtrans, long _transarbitrator, Set _keyValueUpdateSet) { + public Commit(Slot slot, long _seqnumtrans, long _seqnumcommit, long _transarbitrator, Set _keyValueUpdateSet) { super(slot); seqnumtrans = _seqnumtrans; + seqnumcommit = _seqnumcommit; transarbitrator = _transarbitrator; keyValueUpdateSet = new HashSet(); @@ -35,6 +37,9 @@ class Commit extends Entry { public long getTransSequenceNumber() { return seqnumtrans; } + public long getSequenceNumber() { + return seqnumcommit; + } public long getTransArbitrator() { return transarbitrator; @@ -49,7 +54,7 @@ class Commit extends Entry { } public int getSize() { - int size = 2 * Long.BYTES + Byte.BYTES; // seq id, entry type + int size = 3 * Long.BYTES + Byte.BYTES; // seq id, entry type size += Integer.BYTES; // number of KV's // Size of each KV @@ -62,6 +67,7 @@ class Commit extends Entry { static Entry decode(Slot slot, ByteBuffer bb) { long seqnumtrans = bb.getLong(); + long seqnumcommit = bb.getLong(); long transarbitrator = bb.getLong(); int numberOfKeys = bb.getInt(); @@ -71,12 +77,13 @@ class Commit extends Entry { kvSet.add(kv); } - return new Commit(slot, seqnumtrans, transarbitrator, kvSet); + return new Commit(slot, seqnumtrans, seqnumcommit, transarbitrator, kvSet); } public void encode(ByteBuffer bb) { bb.put(Entry.TypeCommit); bb.putLong(seqnumtrans); + bb.putLong(seqnumcommit); bb.putLong(transarbitrator); bb.putInt(keyValueUpdateSet.size()); @@ -87,7 +94,7 @@ class Commit extends Entry { } public Entry getCopy(Slot s) { - return new Commit(s, seqnumtrans, transarbitrator, keyValueUpdateSet); + return new Commit(s, seqnumtrans, seqnumcommit, transarbitrator, keyValueUpdateSet); } public Set updateLiveKeys(Set kvSet) { diff --git a/version2/src/java/iotcloud/Entry.java b/version2/src/java/iotcloud/Entry.java index c5a6807..8395dec 100644 --- a/version2/src/java/iotcloud/Entry.java +++ b/version2/src/java/iotcloud/Entry.java @@ -86,7 +86,10 @@ abstract class Entry implements Liveness { } islive = false; - parentslot.decrementLiveCount(); + + if (parentslot != null) { + parentslot.decrementLiveCount(); + } } /** diff --git a/version2/src/java/iotcloud/LocalComm.java b/version2/src/java/iotcloud/LocalComm.java new file mode 100644 index 0000000..d6d3491 --- /dev/null +++ b/version2/src/java/iotcloud/LocalComm.java @@ -0,0 +1,22 @@ +package iotcloud; + +class LocalComm { + private Table t1; + private Table t2; + + public LocalComm(Table _t1, Table _t2) { + t1 = _t1; + t2 = _t2; + } + + public byte[] sendDataToLocalDevice(Long deviceId, byte[] data) { + if (deviceId == t1.getId()) { + return t1.localCommInput(data); + } else if (deviceId == t2.getId()) { + return t2.localCommInput(data); + } + else { + throw new Error("Cannot send to " + deviceId + " using this local comm"); + } + } +} \ No newline at end of file diff --git a/version2/src/java/iotcloud/PendingTransaction.java b/version2/src/java/iotcloud/PendingTransaction.java index 578f1eb..1a14674 100644 --- a/version2/src/java/iotcloud/PendingTransaction.java +++ b/version2/src/java/iotcloud/PendingTransaction.java @@ -13,6 +13,7 @@ class PendingTransaction { private Set keyValueUpdateSet = null; private Set keyValueGuardSet = null; private long arbitrator = -1; + private long machineLocalTransSeqNum = -1; public PendingTransaction() { keyValueUpdateSet = new HashSet(); @@ -94,6 +95,14 @@ class PendingTransaction { return keyValueGuardSet; } + public void setMachineLocalTransSeqNum(long _machineLocalTransSeqNum) { + machineLocalTransSeqNum = _machineLocalTransSeqNum; + } + + public long getMachineLocalTransSeqNum() { + return machineLocalTransSeqNum; + } + public boolean evaluateGuard(Map keyValTableCommitted, Map keyValTableSpeculative, Map keyValTablePendingTransSpeculative) { for (KeyValue kvGuard : keyValueGuardSet) { diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index 6aeb28e..be9defb 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Set; import java.util.Collection; import java.util.Collections; +import java.nio.ByteBuffer; /** @@ -24,9 +25,6 @@ import java.util.Collections; final public class Table { private int numslots; //number of slots stored in buffer - //table of key-value pairs - //private HashMap table = new HashMap(); - // machine id -> (sequence number, Slot or LastMessage); records last message by each client private HashMap > lastmessagetable = new HashMap >(); // machine id -> ... @@ -52,10 +50,12 @@ final public class Table { private int smallestTableStatusSeen = -1; private int largestTableStatusSeen = -1; private int lastSeenPendingTransactionSpeculateIndex = 0; + private int commitSequenceNumber = 0; + private long localTransactionSequenceNumber = 0; private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building private LinkedList pendingTransQueue = null; // Queue of pending transactions - private Map commitMap = null; // List of all the most recent live commits + private Map> commitMap = null; // List of all the most recent live commits private Map abortMap = null; // Set of the live aborts private Map committedMapByKey = null; // Table of committed KV private Map commitedTable = null; // Table of committed KV @@ -63,20 +63,24 @@ final public class Table { private Map uncommittedTransactionsMap = null; private Map arbitratorTable = null; // Table of arbitrators private Map newKeyTable = null; // Table of speculative KV - private Map newCommitMap = null; // Map of all the new commits + private Map> newCommitMap = null; // Map of all the new commits private Map lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator - private Map lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator + private Map lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator + private Map lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator private Map pendingTransSpeculativeTable = null; + private List pendingCommitsList = null; + private List pendingCommitsToDelete = null; + private Map localCommunicationChannels; + private Map transactionStatusMap = null; - - public Table(String baseurl, String password, long _localmachineid) { + public Table(String hostname, String baseurl, String password, long _localmachineid) { localmachineid = _localmachineid; buffer = new SlotBuffer(); numslots = buffer.capacity(); setResizeThreshold(); sequencenumber = 0; - cloud = new CloudComm(this, baseurl, password); + cloud = new CloudComm(this, hostname, baseurl, password); lastliveslotseqn = 1; setupDataStructs(); @@ -95,7 +99,7 @@ final public class Table { private void setupDataStructs() { pendingTransQueue = new LinkedList(); - commitMap = new HashMap(); + commitMap = new HashMap>(); abortMap = new HashMap(); committedMapByKey = new HashMap(); commitedTable = new HashMap(); @@ -103,10 +107,30 @@ final public class Table { uncommittedTransactionsMap = new HashMap(); arbitratorTable = new HashMap(); newKeyTable = new HashMap(); - newCommitMap = new HashMap(); + newCommitMap = new HashMap>(); lastCommitSeenSeqNumMap = new HashMap(); + lastCommitSeenTransSeqNumMap = new HashMap(); lastAbortSeenSeqNumMap = new HashMap(); pendingTransSpeculativeTable = new HashMap(); + pendingCommitsList = new LinkedList(); + pendingCommitsToDelete = new LinkedList(); + localCommunicationChannels = new HashMap(); + transactionStatusMap = new HashMap(); + } + + public void initTable() throws ServerException { + cloud.setSalt();//Set the salt + Slot s = new Slot(this, 1, localmachineid); + TableStatus status = new TableStatus(s, numslots); + s.addEntry(status); + Slot[] array = cloud.putSlot(s, numslots); + if (array == null) { + array = new Slot[] {s}; + /* update data structure */ + validateandupdate(array, true); + } else { + throw new Error("Error on initialization"); + } } public void rebuild() throws ServerException { @@ -114,45 +138,112 @@ final public class Table { validateandupdate(newslots, true); } - // // TODO: delete method - // public void printSlots() { - // long o = buffer.getOldestSeqNum(); - // long n = buffer.getNewestSeqNum(); - - // int[] types = new int[10]; - - // int num = 0; - - // int livec = 0; - // int deadc = 0; - // for (long i = o; i < (n + 1); i++) { - // Slot s = buffer.getSlot(i); - - // Vector entries = s.getEntries(); - - // for (Entry e : entries) { - // if (e.isLive()) { - // int type = e.getType(); - // types[type] = types[type] + 1; - // num++; - // livec++; - // } else { - // deadc++; - // } - // } - // } - - // for (int i = 0; i < 10; i++) { - // System.out.println(i + " " + types[i]); - // } - // System.out.println("Live count: " + livec); - // System.out.println("Dead count: " + deadc); - // System.out.println("Old: " + o); - // System.out.println("New: " + n); - // System.out.println("Size: " + buffer.size()); - // System.out.println("Commits Map: " + commitedTable.size()); - // System.out.println("Commits List: " + commitMap.size()); - // } + // TODO: delete method + public void printSlots() { + long o = buffer.getOldestSeqNum(); + long n = buffer.getNewestSeqNum(); + + int[] types = new int[10]; + + int num = 0; + + int livec = 0; + int deadc = 0; + for (long i = o; i < (n + 1); i++) { + Slot s = buffer.getSlot(i); + + Vector entries = s.getEntries(); + + for (Entry e : entries) { + if (e.isLive()) { + int type = e.getType(); + types[type] = types[type] + 1; + num++; + livec++; + } else { + deadc++; + } + } + } + + for (int i = 0; i < 10; i++) { + System.out.println(i + " " + types[i]); + } + System.out.println("Live count: " + livec); + System.out.println("Dead count: " + deadc); + System.out.println("Old: " + o); + System.out.println("New: " + n); + System.out.println("Size: " + buffer.size()); + System.out.println("Commits Key Map: " + commitedTable.size()); + // System.out.println("Commits Live Map: " + commitMap.size()); + System.out.println("Pending: " + pendingTransQueue.size()); + + // List strList = new ArrayList(); + // for (int i = 0; i < 100; i++) { + // String keyA = "a" + i; + // String keyB = "b" + i; + // String keyC = "c" + i; + // String keyD = "d" + i; + + // IoTString iKeyA = new IoTString(keyA); + // IoTString iKeyB = new IoTString(keyB); + // IoTString iKeyC = new IoTString(keyC); + // IoTString iKeyD = new IoTString(keyD); + + // strList.add(iKeyA); + // strList.add(iKeyB); + // strList.add(iKeyC); + // strList.add(iKeyD); + // } + + + // for (Long l : commitMap.keySet()) { + // for (Long l2 : commitMap.get(l).keySet()) { + // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) { + // strList.remove(kv.getKey()); + // System.out.print(kv.getKey() + " "); + // } + // } + // } + + // System.out.println(); + // System.out.println(); + + // for (IoTString s : strList) { + // System.out.print(s + " "); + // } + // System.out.println(); + // System.out.println(strList.size()); + } + + public long getId() { + return localmachineid; + } + + public boolean hasConnection() { + return cloud.hasConnection(); + } + + public String toString() { + String retString = " Committed Table: \n"; + retString += "---------------------------\n"; + retString += commitedTable.toString(); + + retString += "\n\n"; + + retString += " Speculative Table: \n"; + retString += "---------------------------\n"; + retString += speculativeTable.toString(); + + return retString; + } + + public void addLocalComm(long machineId, LocalComm lc) { + localCommunicationChannels.put(machineId, lc); + } + public Long getArbitrator(IoTString key) { + return arbitratorTable.get(key); + } public IoTString getCommitted(IoTString key) { KeyValue kv = commitedTable.get(key); @@ -234,37 +325,24 @@ final public class Table { } } - public Long getArbitrator(IoTString key) { - return arbitratorTable.get(key); - } - - public void initTable() throws ServerException { - cloud.setSalt();//Set the salt - Slot s = new Slot(this, 1, localmachineid); - TableStatus status = new TableStatus(s, numslots); - s.addEntry(status); - Slot[] array = cloud.putSlot(s, numslots); - if (array == null) { - array = new Slot[] {s}; - /* update data structure */ - validateandupdate(array, true); - } else { - throw new Error("Error on initialization"); - } - } + public void update() { + try { + Slot[] newslots = cloud.getSlots(sequencenumber + 1); + validateandupdate(newslots, false); - public String toString() { - String retString = " Committed Table: \n"; - retString += "---------------------------\n"; - retString += commitedTable.toString(); + if (!pendingTransQueue.isEmpty()) { - retString += "\n\n"; + // We have a pending transaction so do full insertion + processPendingTrans(); + } else { - retString += " Speculative Table: \n"; - retString += "---------------------------\n"; - retString += speculativeTable.toString(); + // We dont have a pending transaction so do minimal effort + updateWithNotPendingTrans(); + } - return retString; + } catch (Exception e) { + // could not update so do nothing + } } public void startTransaction() { @@ -272,150 +350,372 @@ final public class Table { pendingTransBuild = new PendingTransaction(); } - public void commitTransaction() throws ServerException { + public void addKV(IoTString key, IoTString value) { + + if (arbitratorTable.get(key) == null) { + throw new Error("Key not Found."); + } + + // Make sure new key value pair matches the current arbitrator + if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); + } + + KeyValue kv = new KeyValue(key, value); + pendingTransBuild.addKV(kv); + } + + public TransactionStatus commitTransaction() { if (pendingTransBuild.getKVUpdates().size() == 0) { - // If no updates are made then there is no point inserting into the chain - return; + + // transaction with no updates will have no effect on the system + return new TransactionStatus(TransactionStatus.StatusNoEffect, -1); } - // Add the pending transaction to the queue - pendingTransQueue.add(pendingTransBuild); + TransactionStatus transStatus = null; - for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) { - PendingTransaction pt = pendingTransQueue.get(i); + if (pendingTransBuild.getArbitrator() != localmachineid) { - if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) { + // set the local sequence number so we can recognize this transaction later + pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber); + localTransactionSequenceNumber++; - lastSeenPendingTransactionSpeculateIndex = i; + transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator()); + transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus); - for (KeyValue kv : pt.getKVUpdates()) { - pendingTransSpeculativeTable.put(kv.getKey(), kv); - } + // Add the pending transaction to the queue + pendingTransQueue.add(pendingTransBuild); - } - } + for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) { + PendingTransaction pt = pendingTransQueue.get(i); - // Delete since already inserted - pendingTransBuild = new PendingTransaction(); + if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) { - while (!pendingTransQueue.isEmpty()) { - if (tryput( pendingTransQueue.peek(), false)) { - pendingTransQueue.poll(); + lastSeenPendingTransactionSpeculateIndex = i; + + for (KeyValue kv : pt.getKVUpdates()) { + pendingTransSpeculativeTable.put(kv.getKey(), kv); + } + + } } - } - } + } else { + Transaction ut = new Transaction(null, + -1, + localmachineid, + pendingTransBuild.getArbitrator(), + pendingTransBuild.getKVUpdates(), + pendingTransBuild.getKVGuard()); - public void addKV(IoTString key, IoTString value) { + Pair> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid)); - if (arbitratorTable.get(key) == null) { - throw new Error("Key not Found."); + if (retData.getFirst()) { + transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator()); + } else { + transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator()); + } } - // Make sure new key value pair matches the current arbitrator - if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); + // Try to insert transactions if possible + if (!pendingTransQueue.isEmpty()) { + // We have a pending transaction so do full insertion + processPendingTrans(); + } else { + try { + // We dont have a pending transaction so do minimal effort + updateWithNotPendingTrans(); + } catch (Exception e) { + // Do nothing + } } - KeyValue kv = new KeyValue(key, value); - pendingTransBuild.addKV(kv); + // reset it so next time is fresh + pendingTransBuild = new PendingTransaction(); + + return transStatus; } - public void update() throws ServerException { - Slot[] newslots = cloud.getSlots(sequencenumber + 1); - validateandupdate(newslots, false); + public boolean createNewKey(IoTString keyName, long machineId) throws ServerException { - if (!pendingTransQueue.isEmpty()) { - System.out.println("Full Update"); + while (true) { + if (arbitratorTable.get(keyName) != null) { + // There is already an arbitrator + return false; + } - // We have a pending transaction so do full insertion + if (tryput(keyName, machineId, false)) { + // If successfully inserted + return true; + } + } + } + + private void processPendingTrans() { + boolean sentAllPending = false; + try { while (!pendingTransQueue.isEmpty()) { if (tryput( pendingTransQueue.peek(), false)) { pendingTransQueue.poll(); } } - } else { - // We dont have a pending transaction so do minimal effort - if (uncommittedTransactionsMap.keySet().size() > 0) { - - boolean doEnd = false; - boolean needResize = false; - while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) { - boolean resize = needResize; - needResize = false; - - Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); - int newsize = 0; - if (liveslotcount > resizethreshold) { - resize = true; //Resize is forced - } - if (resize) { - newsize = (int) (numslots * RESIZE_MULTIPLE); - TableStatus status = new TableStatus(s, newsize); - s.addEntry(status); - } + // if got here then all pending transactions were sent + sentAllPending = true; + } catch (Exception e) { + // There was a connection error + e.printStackTrace(); + sentAllPending = false; + } - doRejectedMessages(s); - ThreeTuple retTup = doMandatoryResuce(s, resize); + if (!sentAllPending) { - // Resize was needed so redo call - if (retTup.getFirst()) { - needResize = true; - continue; - } + for (Iterator i = pendingTransQueue.iterator(); i.hasNext(); ) { + PendingTransaction pt = i.next(); + LocalComm lc = localCommunicationChannels.get(pt.getArbitrator()); + if (lc == null) { + // Cant talk directly to arbitrator so cant do anything + continue; + } - // Extract working variables - boolean seenliveslot = retTup.getSecond(); - long seqn = retTup.getThird(); - // Did need to arbitrate - doEnd = !doArbitration(s); + Transaction ut = new Transaction(null, + -1, + localmachineid, + pendingTransBuild.getArbitrator(), + pendingTransBuild.getKVUpdates(), + pendingTransBuild.getKVGuard()); - doOptionalRescue(s, seenliveslot, seqn, resize); - int max = 0; - if (resize) { - max = newsize; - } + Pair> retData = sendTransactionToLocal(ut, lc); + + for (Commit commit : retData.getSecond()) { + // Prepare to process the commit + processEntry(commit); + } + + boolean didCommitOrSpeculate = proccessAllNewCommits(); + + // Go through all uncommitted transactions and kill the ones that are dead + deleteDeadUncommittedTransactions(); - Slot[] array = cloud.putSlot(s, max); - if (array == null) { - array = new Slot[] {s}; - rejectedmessagelist.clear(); - } else { - if (array.length == 0) - throw new Error("Server Error: Did not send any slots"); - rejectedmessagelist.add(s.getSequenceNumber()); - doEnd = false; + // Speculate on key value pairs + didCommitOrSpeculate |= createSpeculativeTable(); + createPendingTransactionSpeculativeTable(didCommitOrSpeculate); + + + if (retData.getFirst()) { + TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum()); + if (transStatus != null) { + transStatus.setStatus(TransactionStatus.StatusCommitted); } - /* update data structure */ - validateandupdate(array, true); + } else { + TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum()); + if (transStatus != null) { + transStatus.setStatus(TransactionStatus.StatusAborted); + } } } } } - public boolean createNewKey(IoTString keyName, long machineId) throws ServerException { + private void updateWithNotPendingTrans() throws ServerException { - while (true) { - if (arbitratorTable.get(keyName) != null) { - // There is already an arbitrator - return false; + boolean doEnd = false; + boolean needResize = false; + while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) { + boolean resize = needResize; + needResize = false; + + Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); + int newsize = 0; + if (liveslotcount > resizethreshold) { + resize = true; //Resize is forced } - if (tryput(keyName, machineId, false)) { - // If successfully inserted - return true; + if (resize) { + newsize = (int) (numslots * RESIZE_MULTIPLE); + TableStatus status = new TableStatus(s, newsize); + s.addEntry(status); + } + + doRejectedMessages(s); + + ThreeTuple retTup = doMandatoryResuce(s, resize); + + // Resize was needed so redo call + if (retTup.getFirst()) { + needResize = true; + continue; } + + // Extract working variables + boolean seenliveslot = retTup.getSecond(); + long seqn = retTup.getThird(); + + // Did need to arbitrate + doEnd = !doArbitration(s); + + doOptionalRescue(s, seenliveslot, seqn, resize); + + int max = 0; + if (resize) { + max = newsize; + } + + Slot[] array = cloud.putSlot(s, max); + if (array == null) { + array = new Slot[] {s}; + rejectedmessagelist.clear(); + + // Delete pending commits that were sent to the cloud + deletePendingCommits(); + + } else { + if (array.length == 0) + throw new Error("Server Error: Did not send any slots"); + rejectedmessagelist.add(s.getSequenceNumber()); + doEnd = false; + } + + /* update data structure */ + validateandupdate(array, true); } } + private Pair> sendTransactionToLocal(Transaction ut, LocalComm lc) { + + // encode the request + byte[] array = new byte[Long.BYTES + ut.getSize()]; + ByteBuffer bbEncode = ByteBuffer.wrap(array); + Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator()); + if (lastSeenCommit != null) { + bbEncode.putLong(lastSeenCommit); + } else { + bbEncode.putLong(0); + } + ut.encode(bbEncode); + + byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array()); + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(data); + boolean didCommit = bbDecode.get() == 1; + int numberOfCommites = bbDecode.getInt(); + + List newCommits = new LinkedList(); + for (int i = 0; i < numberOfCommites; i++ ) { + bbDecode.get(); + Commit com = (Commit)Commit.decode(null, bbDecode); + newCommits.add(com); + } + + return new Pair>(didCommit, newCommits); + } + + public byte[] localCommInput(byte[] data) { + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(data); + long lastSeenCommit = bbDecode.getLong(); + bbDecode.get(); + Transaction ut = (Transaction)Transaction.decode(null, bbDecode); + + // Do the local update and arbitrate + Pair> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit); + + // Calculate the size of the response + int size = Byte.BYTES + Integer.BYTES; + for (Commit com : returnData.getSecond()) { + size += com.getSize(); + } + + // encode the response + byte[] array = new byte[size]; + ByteBuffer bbEncode = ByteBuffer.wrap(array); + if (returnData.getFirst()) { + bbEncode.put((byte)1); + } else { + bbEncode.put((byte)0); + } + bbEncode.putInt(returnData.getSecond().size()); + + for (Commit com : returnData.getSecond()) { + com.encode(bbEncode); + } + + return bbEncode.array(); + } + + private Pair> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) { + + if (ut.getArbitrator() != localmachineid) { + // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator + return null; + } + + List returnCommits = new ArrayList(); + + if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) { + // There is a commit that the other client has not seen yet + + Map cm = commitMap.get(localmachineid); + if (cm != null) { + + List commitKeys = new ArrayList(cm.keySet()); + Collections.sort(commitKeys); + + + for (int i = (commitKeys.size() - 1); i >= 0; i--) { + Commit com = cm.get(commitKeys.get(i)); + + if (com.getSequenceNumber() <= lastCommitSeen) { + break; + } + returnCommits.add((Commit)com.getCopy(null)); + } + } + } + + if (!ut.evaluateGuard(commitedTable, null)) { + // Guard evaluated as false so return only the commits that the other device has not seen yet + return new Pair>(false, returnCommits); + } + + // create the commit + Commit commit = new Commit(null, + -1, + commitSequenceNumber, + ut.getArbitrator(), + ut.getkeyValueUpdateSet()); + commitSequenceNumber = commitSequenceNumber + 1; + + // Add to the pending commits list + pendingCommitsList.add(commit); + + // Add this commit so we can send it back + returnCommits.add(commit); + + // Prepare to process the commit + processEntry(commit); + + boolean didCommitOrSpeculate = proccessAllNewCommits(); + + // Go through all uncommitted transactions and kill the ones that are dead + deleteDeadUncommittedTransactions(); + + // Speculate on key value pairs + didCommitOrSpeculate |= createSpeculativeTable(); + createPendingTransactionSpeculativeTable(didCommitOrSpeculate); + + return new Pair>(true, returnCommits); + } + public void decrementLiveCount() { liveslotcount--; } @@ -467,7 +767,23 @@ final public class Table { } doOptionalRescue(s, seenliveslot, seqn, resize); - return doSendSlotsAndInsert(s, insertedTrans, resize, newsize); + Pair sendRetData = doSendSlots(s, insertedTrans, resize, newsize); + + if (sendRetData.getFirst()) { + // update the status and change what the sequence number is for the + TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum()); + transStatus.setStatus(TransactionStatus.StatusSent); + transStatus.setSentTransaction(); + transactionStatusMap.put(trans.getSequenceNumber(), transStatus); + } + + + if (sendRetData.getSecond().length != 0) { + // insert into the local block chain + validateandupdate(sendRetData.getSecond(), true); + } + + return sendRetData.getFirst(); } private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException { @@ -506,7 +822,14 @@ final public class Table { } doOptionalRescue(s, seenliveslot, seqn, resize); - return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize); + Pair sendRetData = doSendSlots(s, insertedNewKey, resize, newsize); + + if (sendRetData.getSecond().length != 0) { + // insert into the local block chain + validateandupdate(sendRetData.getSecond(), true); + } + + return sendRetData.getFirst(); } private void doRejectedMessages(Slot s) { @@ -588,6 +911,24 @@ final public class Table { } private boolean doArbitration(Slot s) { + + // flag whether we have finished all arbitration + boolean stillHasArbitration = false; + + pendingCommitsToDelete.clear(); + + // First add queue commits + for (Commit commit : pendingCommitsList) { + if (s.hasSpace(commit)) { + s.addEntry(commit); + pendingCommitsToDelete.add(commit); + } else { + // Ran out of space so move on but still not done + stillHasArbitration = true; + return stillHasArbitration; + } + } + // Arbitrate Map speculativeTableTmp = new HashMap(); List transSeqNums = new ArrayList(uncommittedTransactionsMap.keySet()); @@ -595,7 +936,6 @@ final public class Table { // Sort from oldest to newest Collections.sort(transSeqNums); - boolean didNeedArbitration = false; for (Long transNum : transSeqNums) { Transaction ut = uncommittedTransactionsMap.get(transNum); @@ -605,7 +945,7 @@ final public class Table { } // we did have something to arbitrate on - didNeedArbitration = true; + stillHasArbitration = true; Entry newEntry = null; @@ -618,12 +958,20 @@ final public class Table { } // create the commit - newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet()); + newEntry = new Commit(s, + ut.getSequenceNumber(), + commitSequenceNumber, + ut.getArbitrator(), + ut.getkeyValueUpdateSet()); + commitSequenceNumber = commitSequenceNumber + 1; } else { // Guard was false // create the abort - newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator()); + newEntry = new Abort(s, + ut.getSequenceNumber(), + ut.getMachineID(), + ut.getArbitrator()); } if ((newEntry != null) && s.hasSpace(newEntry)) { @@ -633,7 +981,14 @@ final public class Table { } } - return didNeedArbitration; + return stillHasArbitration; + } + + private void deletePendingCommits() { + for (Commit com : pendingCommitsToDelete) { + pendingCommitsList.remove(com); + } + pendingCommitsToDelete.clear(); } private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) { @@ -665,7 +1020,7 @@ final public class Table { } } - private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException { + private Pair doSendSlots(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException { int max = 0; if (resize) max = newsize; @@ -674,6 +1029,9 @@ final public class Table { if (array == null) { array = new Slot[] {s}; rejectedmessagelist.clear(); + + // Delete pending commits that were sent to the cloud + deletePendingCommits(); } else { // if (array.length == 0) // throw new Error("Server Error: Did not send any slots"); @@ -681,11 +1039,7 @@ final public class Table { inserted = false; } - if (array.length != 0) { - validateandupdate(array, true); - } - - return inserted; + return new Pair(inserted, array); } private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) { @@ -745,79 +1099,112 @@ final public class Table { createPendingTransactionSpeculativeTable(didCommitOrSpeculate); } - public boolean proccessAllNewCommits() { - + private boolean proccessAllNewCommits() { // Process only if there are commit if (newCommitMap.keySet().size() == 0) { return false; } + boolean didProcessNewCommit = false; - List commitSeqNums = new ArrayList(newCommitMap.keySet()); - - // Sort from oldest to newest commit - Collections.sort(commitSeqNums); + for (Long arb : newCommitMap.keySet()) { - boolean didProcessNewCommit = false; + List commitSeqNums = new ArrayList(newCommitMap.get(arb).keySet()); - // Go through each new commit one by one - for (Long entrySeqNum : commitSeqNums) { - Commit entry = newCommitMap.get(entrySeqNum); + // Sort from oldest to newest commit + Collections.sort(commitSeqNums); - long lastCommitSeenSeqNum = -1; + // Go through each new commit one by one + for (Long entrySeqNum : commitSeqNums) { + Commit entry = newCommitMap.get(arb).get(entrySeqNum); - if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) { - lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()); - } + long lastCommitSeenSeqNum = -1; + if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) { + lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()); + } - if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) { + if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) { + Map cm = commitMap.get(arb); + if (cm == null) { + cm = new HashMap(); + } - Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry); + Commit prevCommit = cm.put(entry.getSequenceNumber(), entry); + commitMap.put(arb, cm); - if (prevCommit != null) { - prevCommit.setDead(); + if (prevCommit != null) { + prevCommit.setDead(); - for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) { - committedMapByKey.put(kv.getKey(), entry); + for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) { + committedMapByKey.put(kv.getKey(), entry); + } } + + continue; } - continue; - } + Set commitsToEditSet = new HashSet(); - Set commitsToEditSet = new HashSet(); + for (KeyValue kv : entry.getkeyValueUpdateSet()) { + commitsToEditSet.add(committedMapByKey.get(kv.getKey())); + } - for (KeyValue kv : entry.getkeyValueUpdateSet()) { - commitsToEditSet.add(committedMapByKey.get(kv.getKey())); - } + commitsToEditSet.remove(null); - commitsToEditSet.remove(null); + for (Commit prevCommit : commitsToEditSet) { - for (Commit prevCommit : commitsToEditSet) { + Set deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet()); - Set deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet()); + if (!prevCommit.isLive()) { + Map cm = commitMap.get(arb); - if (!prevCommit.isLive()) { - commitMap.remove(prevCommit.getTransSequenceNumber()); + // remove it from the map so that it can be set as dead + if (cm != null) { + cm.remove(prevCommit.getSequenceNumber()); + commitMap.put(arb, cm); + } + } } - } - // Add the new commit - commitMap.put(entry.getTransSequenceNumber(), entry); - lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); - didProcessNewCommit = true; + // Add the new commit + Map cm = commitMap.get(arb); + if (cm == null) { + cm = new HashMap(); + } + cm.put(entry.getSequenceNumber(), entry); + commitMap.put(arb, cm); + + lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber()); + + // set the trans sequence number if we are able to + if (entry.getTransSequenceNumber() != -1) { + lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); + } - // Update the committed table list - for (KeyValue kv : entry.getkeyValueUpdateSet()) { - IoTString key = kv.getKey(); - commitedTable.put(key, kv); + didProcessNewCommit = true; - committedMapByKey.put(key, entry); + // Update the committed table list + for (KeyValue kv : entry.getkeyValueUpdateSet()) { + IoTString key = kv.getKey(); + commitedTable.put(key, kv); + committedMapByKey.put(key, entry); + } } } - // Clear the new commits storage so we can use it later newCommitMap.clear(); + // go through all saved transactions and update the status of those that can be updated + for (Iterator> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) { + Map.Entry entry = i.next(); + long seqnum = entry.getKey(); + TransactionStatus status = entry.getValue(); + + if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) { + status.setStatus(TransactionStatus.StatusCommitted); + i.remove(); + } + } + return didProcessNewCommit; } @@ -827,8 +1214,11 @@ final public class Table { Transaction prevtrans = i.next().getValue(); long transArb = prevtrans.getArbitrator(); - if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) || - (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) { + Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb); + Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb); + + if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) || + ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) { i.remove(); prevtrans.setDead(); } @@ -942,7 +1332,6 @@ final public class Table { private void updateExpectedSize() { expectedsize++; if (expectedsize > currmaxsize) { - System.out.println("Maxing Out: " + expectedsize + " " + currmaxsize); expectedsize = currmaxsize; } } @@ -1019,7 +1408,7 @@ final public class Table { private void processEntry(Transaction entry) { long arb = entry.getArbitrator(); - Long comLast = lastCommitSeenSeqNumMap.get(arb); + Long comLast = lastCommitSeenTransSeqNumMap.get(arb); Long abLast = lastAbortSeenSeqNumMap.get(arb); Transaction prevTrans = null; @@ -1039,7 +1428,6 @@ final public class Table { } private void processEntry(Abort entry) { - if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) { // Abort has not been seen yet so we need to keep track of it @@ -1047,18 +1435,32 @@ final public class Table { if (prevAbort != null) { prevAbort.setDead(); // delete old version of the duplicate } + + if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) { + lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); + } } else { // The machine already saw this so it is dead entry.setDead(); } - if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) { - lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); + // Update the status of the transaction and remove it since we are done with this transaction + TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber()); + if (status != null) { + status.setStatus(TransactionStatus.StatusAborted); } } - private void processEntry(Commit entry, Slot s) { - Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry); + private void processEntry(Commit entry) { + Map arbMap = newCommitMap.get(entry.getTransArbitrator()); + + if (arbMap == null) { + arbMap = new HashMap(); + } + + Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry); + newCommitMap.put(entry.getTransArbitrator(), arbMap); + if (prevCommit != null) { prevCommit.setDead(); } @@ -1078,8 +1480,6 @@ final public class Table { if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) { largestTableStatusSeen = newnumslots; } - - // System.out.println("Table Stat: " + newnumslots + " large: " + largestTableStatusSeen + " small: " + smallestTableStatusSeen); } private void addWatchList(long machineid, RejectedMessage entry) { @@ -1161,7 +1561,7 @@ final public class Table { break; case Entry.TypeCommit: - processEntry((Commit)entry, slot); + processEntry((Commit)entry); break; case Entry.TypeAbort: diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index cae1c39..d2af002 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -1,5 +1,8 @@ package iotcloud; +import java.util.List; +import java.util.ArrayList; + /** * Test cases. * @author Brian Demsky @@ -8,7 +11,7 @@ package iotcloud; public class Test { - public static final int NUMBER_OF_TESTS = 15; + public static final int NUMBER_OF_TESTS = 100; public static void main(String[] args) throws ServerException { if (args[0].equals("2")) { @@ -28,143 +31,35 @@ public class Test { } else if (args[0].equals("9")) { test9(); } + // else if (args[0].equals("10")) { + // test10(); + // } } - // static void test9() throws ServerException { - // // Setup the 2 clients - // Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); - // t1.initTable(); - // Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); - // t2.update(); - - - // // Make the Keys - // System.out.println("Setting up keys"); - // for (int i = 0; i < NUMBER_OF_TESTS; i++) { - // String a = "a" + i; - // String b = "b" + i; - // String c = "c" + i; - // String d = "d" + i; - // IoTString ia = new IoTString(a); - // IoTString ib = new IoTString(b); - // IoTString ic = new IoTString(c); - // IoTString id = new IoTString(d); - // t1.createNewKey(ia, 321); - // t1.createNewKey(ib, 351); - // t2.createNewKey(ic, 321); - // t2.createNewKey(id, 351); - // } - - // for (int i = 0; i < NUMBER_OF_TESTS; i++) { - // String a = "a" + i; - // String b = "b" + i; - // String c = "c" + i; - // String d = "d" + i; - // IoTString ia = new IoTString(a); - // IoTString ib = new IoTString(b); - // IoTString ic = new IoTString(c); - // IoTString id = new IoTString(d); - // t1.createNewKey(ia, 1000); - // t1.createNewKey(ib, 1000); - // t2.createNewKey(ic, 1000); - // t2.createNewKey(id, 1000); - // } - - // System.out.println("Updating Clients..."); - // t1.update(); - // t2.update(); - // t1.update(); - // t2.update(); - // boolean foundError = false; - - // System.out.println("Checking Key-Values..."); - // for (int i = 0; i < NUMBER_OF_TESTS; i++) { - - // String keyA = "a" + i; - // String keyB = "b" + i; - // String keyC = "c" + i; - // String keyD = "d" + i; - - // IoTString iKeyA = new IoTString(keyA); - // IoTString iKeyB = new IoTString(keyB); - // IoTString iKeyC = new IoTString(keyC); - // IoTString iKeyD = new IoTString(keyD); - - - // Long testValA1 = t1.getArbitrator(iKeyA); - // Long testValB1 = t1.getArbitrator(iKeyB); - // Long testValC1 = t1.getArbitrator(iKeyC); - // Long testValD1 = t1.getArbitrator(iKeyD); - - // Long testValA2 = t2.getArbitrator(iKeyA); - // Long testValB2 = t2.getArbitrator(iKeyB); - // Long testValC2 = t2.getArbitrator(iKeyC); - // Long testValD2 = t2.getArbitrator(iKeyD); - - // if ((testValA1 == null) || (testValA1 != 321)) { - // System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1); - // foundError = true; - // } - - // if ((testValB1 == null) || (testValB1 != 351)) { - // System.out.println("Key-Value t1 incorrect: " + keyB + " " + testValB1); - // foundError = true; - // } - - // if ((testValC1 == null) || (testValC1 != 321)) { - // System.out.println("Key-Value t1 incorrect: " + keyC + " " + testValC1); - // foundError = true; - // } - - // if ((testValD1 == null) || (testValD1 != 351)) { - // System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1); - // foundError = true; - // } - - // if ((testValA2 == null) || (testValA2 != 321)) { - // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); - // foundError = true; - // } - - // if ((testValB2 == null) || (testValB2 != 351)) { - // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); - // foundError = true; - // } - - // if ((testValC2 == null) || (testValC2 != 321)) { - // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); - // foundError = true; - // } - - // if ((testValD2 == null) || (testValD2 != 351)) { - // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); - // foundError = true; - // } - // } - - // if (foundError) { - // System.out.println("Found Errors..."); - // } else { - // System.out.println("No Errors Found..."); - // } - // } - - // static void test8() throws ServerException { + // static void test10() throws ServerException { + // long startTime = 0; + // long endTime = 0; // boolean foundError = false; // // Setup the 2 clients - // Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + // Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); // t1.initTable(); - // Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + // Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); // t2.update(); - // // t1.rebuild(); - // // t2.rebuild(); + // if (t1.hasConnection()) { + // System.out.println("Can see server"); + // } + + // LocalComm lc = new LocalComm(t1, t2); + // t1.addLocalComm(t2.getId(), lc); + // t2.addLocalComm(t1.getId(), lc); // // Make the Keys // System.out.println("Setting up keys"); + // startTime = System.currentTimeMillis(); // for (int i = 0; i < NUMBER_OF_TESTS; i++) { // String a = "a" + i; // String b = "b" + i; @@ -179,92 +74,16 @@ public class Test { // t2.createNewKey(ic, 321); // t2.createNewKey(id, 351); // } + // endTime = System.currentTimeMillis(); + // System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) ); + // System.out.println("Time Taken Per Key: " + (double) (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4)) ); + // System.out.println(); // // Do Updates for the keys // System.out.println("Setting Key-Values..."); - - - // String keyA0 = "a0"; - // String keyB0 = "b0"; - // String keyC0 = "c0"; - // String keyD0 = "d0"; - // String valueA0 = "a0"; - // String valueB0 = "b0"; - // String valueC0 = "c0"; - // String valueD0 = "d0"; - - // IoTString iKeyA0 = new IoTString(keyA0); - // IoTString iKeyB0 = new IoTString(keyB0); - // IoTString iKeyC0 = new IoTString(keyC0); - // IoTString iKeyD0 = new IoTString(keyD0); - // IoTString iValueA0 = new IoTString(valueA0); - // IoTString iValueB0 = new IoTString(valueB0); - // IoTString iValueC0 = new IoTString(valueC0); - // IoTString iValueD0 = new IoTString(valueD0); - - // t1.startTransaction(); - // t1.addKV( iKeyA0, iValueA0); - // t1.commitTransaction(); - - // t1.startTransaction(); - // t1.addKV(iKeyB0, iValueB0); - // t1.commitTransaction(); - - // t2.startTransaction(); - // t2.addKV(iKeyC0, iValueC0); - // t2.commitTransaction(); - - // t2.startTransaction(); - // t2.addKV(iKeyD0, iValueD0); - // t2.commitTransaction(); - - // for (int i = 1; i < NUMBER_OF_TESTS; i++) { - // String keyB = "b" + i; - // String valueB = "b" + i; - // IoTString iKeyB = new IoTString(keyB); - // IoTString iValueB = new IoTString(valueB); - - // String keyBOld = "b" + (i - 1); - // String valueBOld = "b" + (i - 1); - // IoTString iKeyBOld = new IoTString(keyBOld); - // IoTString iValueBOld = new IoTString(valueBOld); - - - // t1.startTransaction(); - // t1.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyBOld, iValueBOld, Guard.Equal)))); - // t1.addKV(iKeyB, iValueB); - // t1.commitTransaction(); - // } - - // System.out.println("Checking Key-Values..."); - // for (int i = 0; i < NUMBER_OF_TESTS; i++) { - - // String keyB = "b" + i; - // String valueB = "b" + i; - - // IoTString iKeyB = new IoTString(keyB); - // IoTString iValueB = new IoTString(valueB); - - // IoTString testValB1 = t1.getSpeculative(iKeyB); - - // if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyB); - // foundError = true; - - // } - // } - - - // System.out.println("Updating Clients..."); - // t1.update(); - // t2.update(); - // t1.update(); - // t2.update(); - - // System.out.println("Checking Key-Values..."); + // startTime = System.currentTimeMillis(); // for (int i = 0; i < NUMBER_OF_TESTS; i++) { - // String keyA = "a" + i; // String keyB = "b" + i; // String keyC = "c" + i; @@ -283,271 +102,44 @@ public class Test { // IoTString iValueC = new IoTString(valueC); // IoTString iValueD = new IoTString(valueD); - - // IoTString testValA1 = t1.getCommitted(iKeyA); - // IoTString testValB1 = t1.getCommitted(iKeyB); - // IoTString testValC1 = t1.getCommitted(iKeyC); - // IoTString testValD1 = t1.getCommitted(iKeyD); - - // IoTString testValA2 = t2.getCommitted(iKeyA); - // IoTString testValB2 = t2.getCommitted(iKeyB); - // IoTString testValC2 = t2.getCommitted(iKeyC); - // IoTString testValD2 = t2.getCommitted(iKeyD); - - - // if (i == 0) { - // if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyA); - // foundError = true; - // } - - // if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyB); - // foundError = true; - // } - - // if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyC); - // foundError = true; - // } - - // if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyD); - // foundError = true; - // } - - - // if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); - // foundError = true; - // } - - // if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); - // foundError = true; - // } - - // if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); - // foundError = true; - // } - - // if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); - // foundError = true; - // } - // } else { - // if (testValA1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyA); - // foundError = true; - // } - - // if (testValB1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyB); - // foundError = true; - // } - - // if (testValC1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyC); - // foundError = true; - // } - - // if (testValD1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyD); - // foundError = true; - // } - - // if (testValA2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); - // foundError = true; + // while (true) { + // t1.startTransaction(); + // t1.addKV(iKeyA, iValueA); + // if (t1.commitTransactionLocal()) { + // break; // } + // } - // if (testValB2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); - // foundError = true; + // while (true) { + // t1.startTransaction(); + // t1.addKV(iKeyB, iValueB); + // if (t1.commitTransactionLocal()) { + // break; // } + // } - // if (testValC2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); - // foundError = true; + // while (true) { + // t2.startTransaction(); + // t2.addKV(iKeyC, iValueC); + // if (t2.commitTransactionLocal()) { + // break; // } + // } - // if (testValD2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); - // foundError = true; + // while (true) { + // t2.startTransaction(); + // t2.addKV(iKeyD, iValueD); + // if (t2.commitTransactionLocal()) { + // break; // } // } // } - - // if (foundError) { - // System.out.println("Found Errors..."); - // } else { - // System.out.println("No Errors Found..."); - // } - // } - - // static void test7() throws ServerException { - - // long startTime = 0; - // long endTime = 0; - - // // Setup the 2 clients - // Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); - // t1.initTable(); - // Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); - // t2.update(); - - // // t1.rebuild(); - // // t2.rebuild(); - - // // Make the Keys - // System.out.println("Setting up keys"); - // startTime = System.currentTimeMillis(); - // for (int i = 0; i < NUMBER_OF_TESTS; i++) { - // String a = "a" + i; - // String b = "b" + i; - // String c = "c" + i; - // String d = "d" + i; - // IoTString ia = new IoTString(a); - // IoTString ib = new IoTString(b); - // IoTString ic = new IoTString(c); - // IoTString id = new IoTString(d); - // t1.createNewKey(ia, 321); - // t1.createNewKey(ib, 351); - // t2.createNewKey(ic, 321); - // t2.createNewKey(id, 351); - // } // endTime = System.currentTimeMillis(); - - // System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) ); - // System.out.println("Time Taken Per Key: " + (double) (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4)) ); - // System.out.println(); - - - // // Do Updates for the keys - // System.out.println("Setting Key-Values..."); - // startTime = System.currentTimeMillis(); - - - - // String keyA0 = "a0"; - // String keyB0 = "b0"; - // String keyC0 = "c0"; - // String keyD0 = "d0"; - // String valueA0 = "a0"; - // String valueB0 = "b0"; - // String valueC0 = "c0"; - // String valueD0 = "d0"; - - // IoTString iKeyA0 = new IoTString(keyA0); - // IoTString iKeyB0 = new IoTString(keyB0); - // IoTString iKeyC0 = new IoTString(keyC0); - // IoTString iKeyD0 = new IoTString(keyD0); - // IoTString iValueA0 = new IoTString(valueA0); - // IoTString iValueB0 = new IoTString(valueB0); - // IoTString iValueC0 = new IoTString(valueC0); - // IoTString iValueD0 = new IoTString(valueD0); - - // t1.startTransaction(); - // t1.addKV( iKeyA0, iValueA0); - // t1.commitTransaction(); - - // t1.startTransaction(); - // t1.addKV(iKeyB0, iValueB0); - // t1.commitTransaction(); - - // t2.startTransaction(); - // t2.addKV(iKeyC0, iValueC0); - // t2.commitTransaction(); - - // t2.startTransaction(); - // t2.addKV(iKeyD0, iValueD0); - // t2.commitTransaction(); - - // for (int i = 1; i < NUMBER_OF_TESTS; i++) { - // String keyB = "b" + i; - // String valueB = "b" + i; - // IoTString iKeyB = new IoTString(keyB); - // IoTString iValueB = new IoTString(valueB); - - // String keyBOld = "b" + (i - 1); - // String valueBOld = "b" + (i - 2); - // IoTString iKeyBOld = new IoTString(keyBOld); - // IoTString iValueBOld = new IoTString(valueBOld); - - - // t1.startTransaction(); - // t1.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyBOld, iValueBOld, Guard.Equal)))); - // t1.addKV(iKeyB, iValueB); - // t1.commitTransaction(); - // } - - // for (int i = 1; i < NUMBER_OF_TESTS; i++) { - // String keyC = "c" + i; - // String valueC = "c" + i; - // IoTString iKeyC = new IoTString(keyC); - // IoTString iValueC = new IoTString(valueC); - - // String keyCOld = "c" + (i - 1); - // String valueCOld = "c" + (i - 2); - // IoTString iKeyCOld = new IoTString(keyCOld); - // IoTString iValueCOld = new IoTString(valueCOld); - - - // t2.startTransaction(); - // t2.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyCOld, iValueCOld, Guard.Equal)))); - // t2.addKV(iKeyC, iValueC); - // t2.commitTransaction(); - // } - - // for (int i = 1; i < NUMBER_OF_TESTS; i++) { - // String keyA = "a" + i; - // String keyD = "d" + i; - // String valueA = "a" + i; - // String valueD = "d" + i; - - // IoTString iKeyA = new IoTString(keyA); - // IoTString iKeyD = new IoTString(keyD); - // IoTString iValueA = new IoTString(valueA); - // IoTString iValueD = new IoTString(valueD); - - - // String keyAOld = "a" + (i - 1); - // String keyDOld = "d" + (i - 1); - // String valueAOld = "a" + (i - 2); - // String valueDOld = "d" + (i - 2); - // IoTString iKeyAOld = new IoTString(keyAOld); - // IoTString iKeyDOld = new IoTString(keyDOld); - // IoTString iValueAOld = new IoTString(valueAOld); - // IoTString iValueDOld = new IoTString(valueDOld); - - - // t1.startTransaction(); - // t1.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyAOld, iValueAOld, Guard.Equal)))); - // t1.addKV(iKeyA, iValueA); - // t1.commitTransaction(); - - // t2.startTransaction(); - // t2.addGuard(new Guard(new IoTString(Guard.createExpression(iKeyDOld, iValueDOld, Guard.Equal)))); - // t2.addKV(iKeyD, iValueD); - // t2.commitTransaction(); - // } - - // endTime = System.currentTimeMillis(); - // System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) ); // System.out.println("Time Taken Per Update: " + (double) (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4)) ); // System.out.println(); - // System.out.println("Updating Clients..."); - // t1.update(); - // t2.update(); - // t1.update(); - // t2.update(); - - // boolean foundError = false; - // System.out.println("Checking Key-Values..."); // for (int i = 0; i < NUMBER_OF_TESTS; i++) { @@ -580,96 +172,146 @@ public class Test { // IoTString testValC2 = t2.getCommitted(iKeyC); // IoTString testValD2 = t2.getCommitted(iKeyD); + // if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { + // System.out.println("Key-Value t1 incorrect: " + keyA); + // foundError = true; + // } - // if (i == 0) { - // if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyA); - // foundError = true; - // } + // if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { + // System.out.println("Key-Value t1 incorrect: " + keyB); + // foundError = true; + // } - // if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyB); - // foundError = true; - // } + // if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) { + // System.out.println("Key-Value t1 incorrect: " + keyC); + // foundError = true; + // } - // if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyC); - // foundError = true; - // } + // if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { + // System.out.println("Key-Value t1 incorrect: " + keyD); + // foundError = true; + // } - // if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { - // System.out.println("Key-Value t1 incorrect: " + keyD); - // foundError = true; - // } + // if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { + // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); + // foundError = true; + // } - // if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); - // foundError = true; - // } + // if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) { + // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); + // foundError = true; + // } - // if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); - // foundError = true; - // } + // if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) { + // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); + // foundError = true; + // } - // if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); - // foundError = true; - // } + // if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { + // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); + // foundError = true; + // } + // } - // if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { - // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); - // foundError = true; - // } - // } else { - // if (testValA1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyA); - // foundError = true; - // } - // if (testValB1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyB); - // foundError = true; - // } + // // System.out.println("Updating Clients..."); + // // t1.update(); + // // t2.update(); + // // t1.update(); + // // t2.update(); - // if (testValC1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyC); - // foundError = true; - // } - // if (testValD1 != null) { - // System.out.println("Key-Value t1 incorrect: " + keyD); - // foundError = true; - // } - // if (testValA2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); - // foundError = true; - // } + // // System.out.println("Checking Key-Values..."); + // // for (int i = 0; i < NUMBER_OF_TESTS; i++) { - // if (testValB2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); - // foundError = true; - // } + // // String keyA = "a" + i; + // // String keyB = "b" + i; + // // String keyC = "c" + i; + // // String keyD = "d" + i; + // // String valueA = "a" + i; + // // String valueB = "b" + i; + // // String valueC = "c" + i; + // // String valueD = "d" + i; - // if (testValC2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); - // foundError = true; - // } + // // IoTString iKeyA = new IoTString(keyA); + // // IoTString iKeyB = new IoTString(keyB); + // // IoTString iKeyC = new IoTString(keyC); + // // IoTString iKeyD = new IoTString(keyD); + // // IoTString iValueA = new IoTString(valueA); + // // IoTString iValueB = new IoTString(valueB); + // // IoTString iValueC = new IoTString(valueC); + // // IoTString iValueD = new IoTString(valueD); + + + // // IoTString testValA1 = t1.getCommitted(iKeyA); + // // IoTString testValB1 = t1.getCommitted(iKeyB); + // // IoTString testValC1 = t1.getCommitted(iKeyC); + // // IoTString testValD1 = t1.getCommitted(iKeyD); + + // // IoTString testValA2 = t2.getCommitted(iKeyA); + // // IoTString testValB2 = t2.getCommitted(iKeyB); + // // IoTString testValC2 = t2.getCommitted(iKeyC); + // // IoTString testValD2 = t2.getCommitted(iKeyD); + + // // if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { + // // System.out.println("Key-Value t1 incorrect: " + keyA); + // // foundError = true; + // // } + + // // if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { + // // System.out.println("Key-Value t1 incorrect: " + keyB); + // // foundError = true; + // // } + + // // if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) { + // // System.out.println("Key-Value t1 incorrect: " + keyC); + // // foundError = true; + // // } + + // // if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { + // // System.out.println("Key-Value t1 incorrect: " + keyD); + // // foundError = true; + // // } + + + // // if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { + // // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); + // // foundError = true; + // // } + + // // if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) { + // // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); + // // foundError = true; + // // } + + // // if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) { + // // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); + // // foundError = true; + // // } + + // // if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { + // // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); + // // foundError = true; + // // } + // // } - // if (testValD2 != null) { - // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); - // foundError = true; - // } - // } - // } // if (foundError) { // System.out.println("Found Errors..."); // } else { // System.out.println("No Errors Found..."); // } + + + // System.out.println(); + // System.out.println(); + // System.out.println(); + // t1.printSlots(); + // System.out.println(); + // System.out.println(); + // t2.printSlots(); // } static void test9() { @@ -677,7 +319,7 @@ public class Test { boolean foundError = false; // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); while (true) { try { @@ -686,7 +328,7 @@ public class Test { } catch (Exception e) {} } - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); while (true) { try { @@ -1019,7 +661,7 @@ public class Test { // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); while (true) { try { t1.initTable(); @@ -1027,7 +669,7 @@ public class Test { } catch (Exception e) {} } - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); while (true) { try { t2.update(); @@ -1305,9 +947,9 @@ public class Test { boolean foundError = false; // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); startTime = System.currentTimeMillis(); @@ -1537,9 +1179,9 @@ public class Test { long endTime = 0; // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); @@ -1723,9 +1365,9 @@ public class Test { boolean foundError = false; // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); @@ -1942,9 +1584,9 @@ public class Test { long endTime = 0; // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); @@ -2117,9 +1759,9 @@ public class Test { boolean foundError = false; // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); @@ -2296,11 +1938,13 @@ public class Test { long endTime = 0; // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); + List transStatusList = new ArrayList(); + // Make the Keys System.out.println("Setting up keys"); startTime = System.currentTimeMillis(); @@ -2319,9 +1963,6 @@ public class Test { t2.createNewKey(id, 351); } endTime = System.currentTimeMillis(); - - - System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) ); System.out.println("Time Taken Per Key: " + (double) (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4)) ); System.out.println(); @@ -2351,22 +1992,21 @@ public class Test { t1.startTransaction(); t1.addKV(iKeyA, iValueA); - t1.commitTransaction(); + transStatusList.add(t1.commitTransaction()); t1.startTransaction(); t1.addKV(iKeyB, iValueB); - t1.commitTransaction(); + transStatusList.add(t1.commitTransaction()); t2.startTransaction(); t2.addKV(iKeyC, iValueC); - t2.commitTransaction(); + transStatusList.add(t2.commitTransaction()); t2.startTransaction(); t2.addKV(iKeyD, iValueD); - t2.commitTransaction(); + transStatusList.add(t2.commitTransaction()); } endTime = System.currentTimeMillis(); - System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) ); System.out.println("Time Taken Per Update: " + (double) (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4)) ); System.out.println(); @@ -2454,10 +2094,25 @@ public class Test { } } + for (TransactionStatus status : transStatusList) { + if (status.getStatus() != TransactionStatus.StatusCommitted) { + foundError = true; + } + } + if (foundError) { System.out.println("Found Errors..."); } else { System.out.println("No Errors Found..."); } + + + System.out.println(); + System.out.println(); + System.out.println(); + t1.printSlots(); + System.out.println(); + System.out.println(); + t2.printSlots(); } } diff --git a/version2/src/java/iotcloud/Transaction.java b/version2/src/java/iotcloud/Transaction.java index 72aed03..90ef4d4 100644 --- a/version2/src/java/iotcloud/Transaction.java +++ b/version2/src/java/iotcloud/Transaction.java @@ -47,6 +47,7 @@ class Transaction extends Entry { return seqnum; } + public Set getkeyValueUpdateSet() { return keyValueUpdateSet; } @@ -59,24 +60,26 @@ class Transaction extends Entry { for (KeyValue kvGuard : keyValueGuardSet) { // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = keyValTableSpeculative.get(kvGuard.getKey()); + KeyValue kv = null; + + // If we have a speculation table then use it first + if (keyValTableSpeculative != null) { + kv = keyValTableSpeculative.get(kvGuard.getKey()); + } if (kv == null) { // if it is not in the speculative table then check the committed table and use that // value as our latest assumption kv = keyValTableCommitted.get(kvGuard.getKey()); - // System.out.println("Replaced With Commit Table"); } if (kvGuard.getValue() != null) { if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) { - // System.out.println("Fail 1 " + (kv == null) + " " + kvGuard.getValue() + " " + kv.getValue()); return false; } } else { if (kv != null) { - // System.out.println("Fail 2 " + kv.getValue()); return false; } } diff --git a/version2/src/java/iotcloud/TransactionStatus.java b/version2/src/java/iotcloud/TransactionStatus.java new file mode 100644 index 0000000..a42f570 --- /dev/null +++ b/version2/src/java/iotcloud/TransactionStatus.java @@ -0,0 +1,53 @@ +package iotcloud; + +class TransactionStatus { + static final byte StatusAborted = 1; + static final byte StatusPending = 2; + static final byte StatusCommitted = 3; + // static final byte StatusRetrying = 4; + static final byte StatusSent = 5; + static final byte StatusNoEffect = 6; + + private byte status = 0; + private boolean applicationReleased = false; + private long arbitrator = 0; + private boolean wasSentInChain = false; + + public TransactionStatus(byte _status, long _arbitrator) { + status = _status; + arbitrator = _arbitrator; + } + + public byte getStatus() { + return status; + } + + public void setStatus(byte _status) { + status = _status; + } + + public void setSentTransaction() { + wasSentInChain = true; + } + + public boolean getSentTransaction() { + return wasSentInChain; + } + + + // public void setArbitrator(long _arbitrator) { + // arbitrator = _arbitrator; + // } + + public long getArbitrator() { + return arbitrator; + } + + public void release() { + applicationReleased = true; + } + + public boolean getReleased() { + return applicationReleased; + } +} \ No newline at end of file -- 2.34.1