From 4bf41f2451d15f476920e05d00f41e922d98667c Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Thu, 1 Dec 2016 17:37:07 -0800 Subject: [PATCH] Initial Working version of IoTCloudv2, needs more testing --- src2/java/iotcloud/Abort.java | 2 +- src2/java/iotcloud/CloudComm.java | 103 +++++++++++---------- src2/java/iotcloud/Entry.java | 12 +++ src2/java/iotcloud/Guard.java | 28 +++++- src2/java/iotcloud/NewKey.java | 4 +- src2/java/iotcloud/PendingTransaction.java | 9 +- src2/java/iotcloud/Table.java | 87 +++++++++-------- src2/java/iotcloud/Test.java | 44 ++++++--- src2/java/iotcloud/Transaction.java | 2 + 9 files changed, 179 insertions(+), 112 deletions(-) diff --git a/src2/java/iotcloud/Abort.java b/src2/java/iotcloud/Abort.java index c2b57b7..4d0c59b 100644 --- a/src2/java/iotcloud/Abort.java +++ b/src2/java/iotcloud/Abort.java @@ -40,7 +40,7 @@ class Abort extends Entry { } int getSize() { - return 2*Long.BYTES+Byte.BYTES; + return (2 * Long.BYTES) + Byte.BYTES; } byte getType() { diff --git a/src2/java/iotcloud/CloudComm.java b/src2/java/iotcloud/CloudComm.java index ac906b1..f38756d 100644 --- a/src2/java/iotcloud/CloudComm.java +++ b/src2/java/iotcloud/CloudComm.java @@ -24,7 +24,7 @@ class CloudComm { static final int SALT_SIZE = 8; byte salt[]; Table table; - + /** * Empty Constructor needed for child class. */ @@ -37,8 +37,8 @@ class CloudComm { */ CloudComm(Table _table, String _baseurl, String _password) { - this.table=_table; - this.baseurl=_baseurl; + this.table = _table; + this.baseurl = _baseurl; this.password = _password; this.random = new SecureRandom(); } @@ -64,13 +64,13 @@ class CloudComm { private void initCrypt() { try { - SecretKeySpec key=initKey(); + SecretKeySpec key = initKey(); password = null; // drop password mac = Mac.getInstance("HmacSHA256"); mac.init(key); - encryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding"); + encryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); encryptCipher.init(Cipher.ENCRYPT_MODE, key); - decryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding"); + decryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); decryptCipher.init(Cipher.DECRYPT_MODE, key); } catch (Exception e) { e.printStackTrace(); @@ -83,29 +83,32 @@ class CloudComm { */ private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException { - String reqstring=isput?"req=putslot":"req=getslot"; - String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber; + String reqstring = isput ? "req=putslot" : "req=getslot"; + String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber; if (maxentries != 0) - urlstr += "&max="+maxentries; + urlstr += "&max=" + maxentries; return new URL(urlstr); } - + public void setSalt() { try { salt = new byte[SALT_SIZE]; random.nextBytes(salt); - URL url=new URL(baseurl+"?req=setsalt"); - URLConnection con=url.openConnection(); + URL url = new URL(baseurl + "?req=setsalt"); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); http.setFixedLengthStreamingMode(salt.length); http.setDoOutput(true); http.connect(); - OutputStream os=http.getOutputStream(); + OutputStream os = http.getOutputStream(); os.write(salt); - int responsecode=http.getResponseCode(); - if (responsecode != HttpURLConnection.HTTP_OK) + int responsecode = http.getResponseCode(); + if (responsecode != HttpURLConnection.HTTP_OK) { + // TODO: Remove this print + System.out.println(responsecode); throw new Error("Invalid response"); + } } catch (Exception e) { e.printStackTrace(); throw new Error("Failed setting salt"); @@ -114,20 +117,20 @@ class CloudComm { } private void getSalt() throws Exception { - URL url=new URL(baseurl+"?req=getsalt"); - URLConnection con=url.openConnection(); + URL url = new URL(baseurl + "?req=getsalt"); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); http.connect(); - - InputStream is=http.getInputStream(); - DataInputStream dis=new DataInputStream(is); - int salt_length=dis.readInt(); - byte [] tmp=new byte[salt_length]; + + InputStream is = http.getInputStream(); + DataInputStream dis = new DataInputStream(is); + int salt_length = dis.readInt(); + byte [] tmp = new byte[salt_length]; dis.readFully(tmp); - salt=tmp; + salt = tmp; } - + /* * API for putting a slot into the queue. Returns null on success. * On failure, the server will send slots with newer sequence @@ -140,13 +143,13 @@ class CloudComm { getSalt(); initCrypt(); } - - long sequencenumber=slot.getSequenceNumber(); - byte[] bytes=slot.encode(mac); + + long sequencenumber = slot.getSequenceNumber(); + byte[] bytes = slot.encode(mac); bytes = encryptCipher.doFinal(bytes); - URL url=buildRequest(true, sequencenumber, max); - URLConnection con=url.openConnection(); + URL url = buildRequest(true, sequencenumber, max); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); @@ -154,12 +157,12 @@ class CloudComm { http.setDoOutput(true); http.connect(); - OutputStream os=http.getOutputStream(); + OutputStream os = http.getOutputStream(); os.write(bytes); - InputStream is=http.getInputStream(); - DataInputStream dis=new DataInputStream(is); - byte[] resptype=new byte[7]; + InputStream is = http.getInputStream(); + DataInputStream dis = new DataInputStream(is); + byte[] resptype = new byte[7]; dis.readFully(resptype); if (Arrays.equals(resptype, "getslot".getBytes())) return processSlots(dis); @@ -184,20 +187,20 @@ class CloudComm { getSalt(); initCrypt(); } - - URL url=buildRequest(false, sequencenumber, 0); - URLConnection con=url.openConnection(); + + URL url = buildRequest(false, sequencenumber, 0); + URLConnection con = url.openConnection(); HttpURLConnection http = (HttpURLConnection) con; http.setRequestMethod("POST"); http.connect(); - InputStream is=http.getInputStream(); + InputStream is = http.getInputStream(); + + DataInputStream dis = new DataInputStream(is); - DataInputStream dis=new DataInputStream(is); - - byte[] resptype=new byte[7]; + byte[] resptype = new byte[7]; dis.readFully(resptype); if (!Arrays.equals(resptype, "getslot".getBytes())) - throw new Error("Bad Response: "+new String(resptype)); + throw new Error("Bad Response: " + new String(resptype)); else return processSlots(dis); } catch (Exception e) { @@ -212,19 +215,19 @@ class CloudComm { */ private Slot[] processSlots(DataInputStream dis) throws Exception { - int numberofslots=dis.readInt(); - int[] sizesofslots=new int[numberofslots]; - Slot[] slots=new Slot[numberofslots]; - for(int i=0; i(commitedTable); - for (Transaction ut : uncommittedTransactionsList) { - - KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; - // Check if this machine arbitrates for this transaction - if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { - continue; - } - - Entry newEntry = null; - - try { - if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { - // Guard evaluated as true - - // update the local tmp current key set - for (KeyValue kv : ut.getkeyValueUpdateSet()) { - speculativeTableTmp.put(kv.getKey(), kv); - } - - // create the commit - newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); - } else { - // Guard was false - - // create the abort - newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); - } - } catch (Exception e) { - e.printStackTrace(); - } - - if ((newEntry != null) && s.hasSpace(newEntry)) { - s.addEntry(newEntry); - } else { - break; - } - } + // // Arbitrate + // Map speculativeTableTmp = new HashMap(commitedTable); + // for (Transaction ut : uncommittedTransactionsList) { + + // KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; + // // Check if this machine arbitrates for this transaction + // if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { + // continue; + // } + + // Entry newEntry = null; + + // try { + // if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { + // // Guard evaluated as true + + // // update the local tmp current key set + // for (KeyValue kv : ut.getkeyValueUpdateSet()) { + // speculativeTableTmp.put(kv.getKey(), kv); + // } + + // // create the commit + // newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); + // } else { + // // Guard was false + + // // create the abort + // newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); + // } + // } catch (Exception e) { + // e.printStackTrace(); + // } + + // if ((newEntry != null) && s.hasSpace(newEntry)) { + + // // TODO: Remove print + // System.out.println("Arbitrating..."); + // s.addEntry(newEntry); + // } else { + // break; + // } + // } NewKey newKey = new NewKey(s, keyName, arbMachineid); @@ -960,7 +963,8 @@ final public class Table { prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet()); if (!prevcommit.isLive()) { - commitList.remove(prevcommit); + //commitList.remove(prevcommit); + i.remove(); } } @@ -979,7 +983,8 @@ final public class Table { Transaction prevtrans = i.next(); if (prevtrans.getSequenceNumber() <= committedTransSeq) { - uncommittedTransactionsList.remove(prevtrans); + // uncommittedTransactionsList.remove(prevtrans); + i.remove(); prevtrans.setDead(); } } diff --git a/src2/java/iotcloud/Test.java b/src2/java/iotcloud/Test.java index 6ac5b06..00d4c7c 100644 --- a/src2/java/iotcloud/Test.java +++ b/src2/java/iotcloud/Test.java @@ -19,37 +19,57 @@ public class Test { Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); - for (int i = 0; i < 600; i++) { - String a = "STR" + i; - String b = "ABR" + i; + + + final int NUMBER_OF_TESTS = 200; + + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + + System.out.println("Doing: " + i); + + String a = "a" + i; + String b = "b" + i; IoTString ia = new IoTString(a); IoTString ib = new IoTString(b); - t1.createNewKey(ia, 321); - t2.createNewKey(ib, 351); + t1.createNewKey(ia, 351); + t2.createNewKey(ib, 321); t1.startTransaction(); t1.addKV(ia, ia); t1.commitTransaction(); + } + + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + + System.out.println("Doing: " + i); + + String a = "a" + i; + String b = "b" + i; + IoTString ia = new IoTString(a); + IoTString ib = new IoTString(b); t2.startTransaction(); t2.addKV(ib, ib); t2.commitTransaction(); } - t1.update(); - t2.update(); - for (int i = 0; i < 600; i++) { - String a = "STR" + i; - String b = "ABR" + i; + t1.update(); + // t2.update(); + // t1.update(); + + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + String a = "a" + i; + String b = "b" + i; IoTString ia = new IoTString(a); IoTString ib = new IoTString(b); - System.out.println(ib + "->" + t1.getCommitted(ib)); - System.out.println(ia + "->" + t2.getCommitted(ia)); + System.out.println(ib + " -> " + t1.getCommitted(ib)); + System.out.println(ia + " -> " + t2.getCommitted(ia)); + System.out.println(); } } } diff --git a/src2/java/iotcloud/Transaction.java b/src2/java/iotcloud/Transaction.java index 18b8721..5acd5c2 100644 --- a/src2/java/iotcloud/Transaction.java +++ b/src2/java/iotcloud/Transaction.java @@ -16,6 +16,8 @@ class Transaction extends Entry { seqnum = _seqnum; machineid = _machineid; + keyValueUpdateSet = new HashSet(); + for (KeyValue kv : _keyValueUpdateSet) { KeyValue kvCopy = kv.getCopy(); keyValueUpdateSet.add(kvCopy); -- 2.34.1