Initial commit of code for new version of block chain, does not compile (had to go...
authorAli Younis <ayounis@uci.edu>
Fri, 18 Nov 2016 00:26:49 +0000 (16:26 -0800)
committerAli Younis <ayounis@uci.edu>
Fri, 18 Nov 2016 00:26:49 +0000 (16:26 -0800)
33 files changed:
.gitignore
src2/java/.dir-locals.el [new file with mode: 0644]
src2/java/iotcloud/Abort.java [new file with mode: 0644]
src2/java/iotcloud/CloudComm.java [new file with mode: 0644]
src2/java/iotcloud/Commit.java [new file with mode: 0644]
src2/java/iotcloud/Entry.java [new file with mode: 0644]
src2/java/iotcloud/Guard.java [new file with mode: 0644]
src2/java/iotcloud/IoTString.java [new file with mode: 0644]
src2/java/iotcloud/KeyValue.java [new file with mode: 0644]
src2/java/iotcloud/LastMessage.java [new file with mode: 0644]
src2/java/iotcloud/Liveness.java [new file with mode: 0644]
src2/java/iotcloud/Makefile [new file with mode: 0644]
src2/java/iotcloud/NewKey.java [new file with mode: 0644]
src2/java/iotcloud/Pair.java [new file with mode: 0644]
src2/java/iotcloud/PendingTransaction.java [new file with mode: 0644]
src2/java/iotcloud/RejectedMessage.java [new file with mode: 0644]
src2/java/iotcloud/Slot.java [new file with mode: 0644]
src2/java/iotcloud/SlotBuffer.java [new file with mode: 0644]
src2/java/iotcloud/SlotIndexer.java [new file with mode: 0644]
src2/java/iotcloud/Table.java [new file with mode: 0644]
src2/java/iotcloud/TableStatus.java [new file with mode: 0644]
src2/java/iotcloud/Test.java [new file with mode: 0644]
src2/java/iotcloud/Transaction.java [new file with mode: 0644]
src2/java/iotcloud/issues.txt [new file with mode: 0644]
src2/script/C.cfg [new file with mode: 0644]
src2/script/java.cfg [new file with mode: 0644]
src2/script/makefile [new file with mode: 0644]
src2/server/.dir-locals.el [new file with mode: 0644]
src2/server/Makefile [new file with mode: 0644]
src2/server/README.txt [new file with mode: 0644]
src2/server/iotcloud.cpp [new file with mode: 0644]
src2/server/iotquery.cpp [new file with mode: 0644]
src2/server/iotquery.h [new file with mode: 0644]

index deab3534836d049fbeba71942b6e97ac35cfe172..8e8a718fcb2ce3d158d60005433a32013c1693d6 100644 (file)
@@ -9,6 +9,9 @@ dist/
 bower_components
 test/bower_components
 
+/src/bin
+
+
 # Ignoring those pesky .DS_Store files on mac
 .DS_Store
 
diff --git a/src2/java/.dir-locals.el b/src2/java/.dir-locals.el
new file mode 100644 (file)
index 0000000..e166a2e
--- /dev/null
@@ -0,0 +1,2 @@
+((nil . ((indent-tabs-mode . t))))
+
diff --git a/src2/java/iotcloud/Abort.java b/src2/java/iotcloud/Abort.java
new file mode 100644 (file)
index 0000000..8ec1871
--- /dev/null
@@ -0,0 +1,53 @@
+package iotcloud;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This Entry records the abort sent by a given machine.
+ * @author Ali Younis <ayounis@uci.edu>
+ * @version 1.0
+ */
+
+
+class Abort extends Entry {
+       private long seqnum;
+       private long machineid;
+
+       Abort(Slot slot, long _seqnum, long _machineid) {
+               super(slot);
+               seqnum=_seqnum;
+               machineid=_machineid;
+       }
+
+       long getMachineID() {
+               return machineid;
+       }
+
+       long getSequenceNumber() {
+               return seqnum;
+       }
+
+       static Entry decode(Slot slot, ByteBuffer bb) {
+               long seqnum=bb.getLong();
+               long machineid=bb.getLong();
+               return new Abort(slot, seqnum, machineid);
+       }
+
+       void encode(ByteBuffer bb) {
+               bb.put(Entry.TypeAbort);
+               bb.putLong(seqnum);
+               bb.putLong(machineid);
+       }
+
+       int getSize() {
+               return 2*Long.BYTES+Byte.BYTES;
+       }
+
+       byte getType() {
+               return Entry.TypeAbort;
+       }
+
+       Entry getCopy(Slot s) {
+               return new Abort(s, seqnum, machineid);
+       }
+}
\ No newline at end of file
diff --git a/src2/java/iotcloud/CloudComm.java b/src2/java/iotcloud/CloudComm.java
new file mode 100644 (file)
index 0000000..ac906b1
--- /dev/null
@@ -0,0 +1,232 @@
+package iotcloud;
+import java.io.*;
+import java.net.*;
+import java.util.Arrays;
+import javax.crypto.*;
+import javax.crypto.spec.*;
+import java.security.SecureRandom;
+
+/**
+ * This class provides a communication API to the webserver.  It also
+ * validates the HMACs on the slots and handles encryption.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
+class CloudComm {
+       String baseurl;
+       Cipher encryptCipher;
+       Cipher decryptCipher;
+       Mac mac;
+       String password;
+       SecureRandom random;
+       static final int SALT_SIZE = 8;
+       byte salt[];
+       Table table;
+       
+       /**
+        * Empty Constructor needed for child class.
+        */
+
+       CloudComm() {
+       }
+
+       /**
+        * Constructor for actual use. Takes in the url and password.
+        */
+
+       CloudComm(Table _table, String _baseurl, String _password) {
+               this.table=_table;
+               this.baseurl=_baseurl;
+               this.password = _password;
+               this.random = new SecureRandom();
+       }
+
+       /**
+        * Generates Key from password.
+        */
+
+       private SecretKeySpec initKey() {
+               try {
+                       PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), salt, 65536, 128);
+                       SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
+                       return new SecretKeySpec(tmpkey.getEncoded(), "AES");
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       throw new Error("Failed generating key.");
+               }
+       }
+
+       /**
+        * Inits the HMAC generator.
+        */
+
+       private void initCrypt() {
+               try {
+                       SecretKeySpec key=initKey();
+                       password = null; // drop password
+                       mac = Mac.getInstance("HmacSHA256");
+                       mac.init(key);
+                       encryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+                       encryptCipher.init(Cipher.ENCRYPT_MODE, key);
+                       decryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+                       decryptCipher.init(Cipher.DECRYPT_MODE, key);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       throw new Error("Failed To Initialize Ciphers");
+               }
+       }
+
+       /*
+        * Builds the URL for the given request.
+        */
+
+       private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
+               String reqstring=isput?"req=putslot":"req=getslot";
+               String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber;
+               if (maxentries != 0)
+                       urlstr += "&max="+maxentries;
+               return new URL(urlstr);
+       }
+       
+       public void setSalt() {
+               try {
+                       salt = new byte[SALT_SIZE];
+                       random.nextBytes(salt);
+                       URL url=new URL(baseurl+"?req=setsalt");
+                       URLConnection con=url.openConnection();
+                       HttpURLConnection http = (HttpURLConnection) con;
+                       http.setRequestMethod("POST");
+                       http.setFixedLengthStreamingMode(salt.length);
+                       http.setDoOutput(true);
+                       http.connect();
+                       OutputStream os=http.getOutputStream();
+                       os.write(salt);
+                       int responsecode=http.getResponseCode();
+                       if (responsecode != HttpURLConnection.HTTP_OK)
+                               throw new Error("Invalid response");
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       throw new Error("Failed setting salt");
+               }
+               initCrypt();
+       }
+
+       private void getSalt() throws Exception {
+               URL url=new URL(baseurl+"?req=getsalt");
+               URLConnection con=url.openConnection();
+               HttpURLConnection http = (HttpURLConnection) con;
+               http.setRequestMethod("POST");
+               http.connect();
+               
+               InputStream is=http.getInputStream();
+               DataInputStream dis=new DataInputStream(is);
+               int salt_length=dis.readInt();
+               byte [] tmp=new byte[salt_length];
+               dis.readFully(tmp);
+               salt=tmp;
+       }
+       
+       /*
+        * API for putting a slot into the queue.  Returns null on success.
+        * On failure, the server will send slots with newer sequence
+        * numbers.
+        */
+
+       Slot[] putSlot(Slot slot, int max) {
+               try {
+                       if (salt == null) {
+                               getSalt();
+                               initCrypt();
+                       }
+                       
+                       long sequencenumber=slot.getSequenceNumber();
+                       byte[] bytes=slot.encode(mac);
+                       bytes = encryptCipher.doFinal(bytes);
+
+                       URL url=buildRequest(true, sequencenumber, max);
+                       URLConnection con=url.openConnection();
+                       HttpURLConnection http = (HttpURLConnection) con;
+
+                       http.setRequestMethod("POST");
+                       http.setFixedLengthStreamingMode(bytes.length);
+                       http.setDoOutput(true);
+                       http.connect();
+
+                       OutputStream os=http.getOutputStream();
+                       os.write(bytes);
+
+                       InputStream is=http.getInputStream();
+                       DataInputStream dis=new DataInputStream(is);
+                       byte[] resptype=new byte[7];
+                       dis.readFully(resptype);
+                       if (Arrays.equals(resptype, "getslot".getBytes()))
+                               return processSlots(dis);
+                       else if (Arrays.equals(resptype, "putslot".getBytes()))
+                               return null;
+                       else
+                               throw new Error("Bad response to putslot");
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       throw new Error("putSlot failed");
+               }
+       }
+
+       /**
+        * Request the server to send all slots with the given
+        * sequencenumber or newer.
+        */
+
+       Slot[] getSlots(long sequencenumber) {
+               try {
+                       if (salt == null) {
+                               getSalt();
+                               initCrypt();
+                       }
+                       
+                       URL url=buildRequest(false, sequencenumber, 0);
+                       URLConnection con=url.openConnection();
+                       HttpURLConnection http = (HttpURLConnection) con;
+                       http.setRequestMethod("POST");
+                       http.connect();
+                       InputStream is=http.getInputStream();
+
+                       DataInputStream dis=new DataInputStream(is);
+                       
+                       byte[] resptype=new byte[7];
+                       dis.readFully(resptype);
+                       if (!Arrays.equals(resptype, "getslot".getBytes()))
+                               throw new Error("Bad Response: "+new String(resptype));
+                       else
+                               return processSlots(dis);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       throw new Error("getSlots failed");
+               }
+       }
+
+       /**
+        * Method that actually handles building Slot objects from the
+        * server response.  Shared by both putSlot and getSlots.
+        */
+
+       private Slot[] processSlots(DataInputStream dis) throws Exception {
+               int numberofslots=dis.readInt();
+               int[] sizesofslots=new int[numberofslots];
+               Slot[] slots=new Slot[numberofslots];
+               for(int i=0; i<numberofslots; i++)
+                       sizesofslots[i]=dis.readInt();
+
+               for(int i=0; i<numberofslots; i++) {
+                       byte[] data=new byte[sizesofslots[i]];
+                       dis.readFully(data);
+
+                       data = decryptCipher.doFinal(data);
+
+                       slots[i]=Slot.decode(table, data, mac);
+               }
+               dis.close();
+               return slots;
+       }
+}
diff --git a/src2/java/iotcloud/Commit.java b/src2/java/iotcloud/Commit.java
new file mode 100644 (file)
index 0000000..05834f0
--- /dev/null
@@ -0,0 +1,56 @@
+package iotcloud;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * This Entry records the abort sent by a given machine.
+ * @author Ali Younis <ayounis@uci.edu>
+ * @version 1.0
+ */
+
+
+class Commit extends Entry {
+       private long seqnum;
+       private Set<KeyValue> keyValueUpdateSet;
+
+
+       public Commit(Slot slot, long _seqnum, long _machineid) {
+               super(slot);
+               seqnum=_seqnum;
+               machineid=_machineid;
+       }
+
+       public long getSequenceNumber() {
+               return seqnum;
+       }
+
+
+
+
+
+       static Entry decode(Slot slot, ByteBuffer bb) {
+               long seqnum=bb.getLong();
+               long machineid=bb.getLong();
+               return new Abort(slot, seqnum, machineid);
+       }
+
+       public void encode(ByteBuffer bb) {
+               bb.put(Entry.TypeAbort);
+               bb.putLong(seqnum);
+               bb.putLong(machineid);
+       }
+
+       public int getSize() {
+               return 2*Long.BYTES+Byte.BYTES;
+       }
+
+       public byte getType() {
+               return Entry.TypeAbort;
+       }
+
+       public Entry getCopy(Slot s) {
+               return new Abort(s, seqnum, machineid);
+       }
+}
\ No newline at end of file
diff --git a/src2/java/iotcloud/Entry.java b/src2/java/iotcloud/Entry.java
new file mode 100644 (file)
index 0000000..af99192
--- /dev/null
@@ -0,0 +1,99 @@
+package iotcloud;
+import java.nio.ByteBuffer;
+
+/**
+ * Generic class that wraps all the different types of information
+ * that can be stored in a Slot.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+abstract class Entry implements Liveness {
+
+
+       static final byte TypeCommit = 1;
+       static final byte TypeAbort = 2;
+       static final byte TypeTransaction = 3;
+       static final byte TypeNewKey = 4;
+       static final byte TypeLastMessage = 5;
+       static final byte TypeRejectedMessage = 6;
+       static final byte TypeTableStatus = 7;
+
+
+
+       /* Records whether the information is still live or has been
+                superceded by a newer update.  */
+
+       private boolean islive = true;
+       private Slot parentslot;
+
+       public Entry(Slot _parentslot) {
+               parentslot = _parentslot;
+       }
+
+       /**
+        * Static method for decoding byte array into Entry objects.  First
+        * byte tells the type of entry.
+        */
+
+       static Entry decode(Slot slot, ByteBuffer bb) {
+               byte type = bb.get();
+               switch (type) {
+
+               case TypeLastMessage:
+                       return LastMessage.decode(slot, bb);
+
+               case TypeRejectedMessage:
+                       return RejectedMessage.decode(slot, bb);
+
+               case TypeTableStatus:
+                       return TableStatus.decode(slot, bb);
+
+               default:
+                       throw new Error("Unrecognized Entry Type: " + type);
+               }
+       }
+
+       /**
+        * Returns true if the Entry object is still live.
+        */
+
+       public boolean isLive() {
+               return islive;
+       }
+
+       /**
+        * Flags the entry object as dead.  Also decrements the live count
+        * of the parent slot.
+        */
+
+       public void setDead() {
+               islive = false;
+               parentslot.decrementLiveCount();
+       }
+
+       /**
+        * Serializes the Entry object into the byte buffer.
+        */
+
+       abstract void encode(ByteBuffer bb);
+
+       /**
+        * Returns the size in bytes the entry object will take in the byte
+        * array.
+        */
+
+       abstract int getSize();
+
+       /**
+        * Returns a byte encoding the type of the entry object.
+        */
+
+       abstract byte getType();
+
+       /**
+        * Returns a copy of the Entry that can be added to a different slot.
+        */
+       abstract Entry getCopy(Slot s);
+
+}
diff --git a/src2/java/iotcloud/Guard.java b/src2/java/iotcloud/Guard.java
new file mode 100644 (file)
index 0000000..aaf312b
--- /dev/null
@@ -0,0 +1,101 @@
+package iotcloud;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.nio.ByteBuffer;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.lang.NullPointerException;
+
+
+class Guard {
+
+    static final byte Equal = 1;
+    static final byte NotEqual = 2;
+
+    private IoTString booleanExpression;
+
+    public Guard() {
+        booleanExpression = null;
+    }
+
+    public Guard(IoTString _booleanExpression) {
+        booleanExpression = _booleanExpression;
+    }
+
+    /**
+     * Create an equality expression for a key value.
+     *
+     */
+    public static String createExpression(IoTString keyName, IoTString keyValue, byte op) {
+        if (op == Equal) {
+            return keyName.toString() + "=='" + keyValue.toString() + "'";
+        } else if (op == NotEqual) {
+            return keyName.toString() + "!='" + keyValue.toString() + "'";
+        }
+
+        // Unrecognized op
+        return null;
+    }
+
+    /**
+     * Add a boolean expression to the guard.
+     *
+     */
+    public void setGuardExpression(String expr) {
+        booleanExpression = new IoTString(expr);
+    }
+
+    /**
+     * Evaluate the guard expression for a given set of key value pairs.
+     *
+     */
+    public boolean evaluate(Set<KeyValue> kvSet) throws ScriptException, NullPointerException {
+
+        // There are no conditions to evaluate
+        if (booleanExpression == null) {
+            return true;
+        }
+
+        // All the current key value pairs that we need to evaluate the condition
+        String[] variables = new String[kvSet.size()];
+
+        // Fill the variables array
+        int i = 0;
+        for (KeyValue kv : kvSet) {
+            variables[i] = kv.getKey() + " ='" + kv.getValue() + "'";
+            i++;
+        }
+
+        // Prep the evaluation engine (script engine)
+        ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
+        for (String s : variables) {
+            engine.eval(s);
+        }
+
+        // Evaluate the guard condition
+        return 1 == (Integer)engine.eval(booleanExpression.toString());
+    }
+
+    /**
+     * Get the size of the guard condition
+     *
+     */
+    public int getSize() {
+        return Integer.BYTES + booleanExpression.length();
+    }
+
+    public void encode(ByteBuffer bb) {
+        bb.putInt(booleanExpression.length());
+        bb.put(booleanExpression.internalBytes());
+    }
+
+    static Guard decode(ByteBuffer bb) {
+        int exprLength = bb.getInt();
+        byte[] expr = new byte[exprLength];
+        bb.get(expr);
+        return new Guard(IoTString.shallow(expr));
+    }
+}
\ No newline at end of file
diff --git a/src2/java/iotcloud/IoTString.java b/src2/java/iotcloud/IoTString.java
new file mode 100644 (file)
index 0000000..83a3fa1
--- /dev/null
@@ -0,0 +1,105 @@
+package iotcloud;
+
+import java.util.Arrays;
+
+/**
+ * IoTString is wraps the underlying byte string.  We don't use the
+ * standard String class as we have bytes and not chars.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
+final public class IoTString {
+       byte[] array;
+       int hashcode;
+
+       private IoTString() {
+       }
+
+       /**
+        * Builds an IoTString object around the byte array.  This
+        * constructor makes a copy, so the caller is free to modify the byte array.
+        */
+
+       public IoTString(byte[] _array) {
+               array=(byte[]) _array.clone();
+               hashcode=Arrays.hashCode(array);
+       }
+
+       /**
+        * Converts the String object to a byte representation and stores it
+        * into the IoTString object.
+        */
+
+       public IoTString(String str) {
+               array=str.getBytes();
+               hashcode=Arrays.hashCode(array);
+       }
+
+        /**
+        * Internal methods to build an IoTString using the byte[] passed
+        * in.  Caller is responsible for ensuring the byte[] is never
+        * modified.
+        */
+
+       static IoTString shallow(byte[] _array) {
+               IoTString i=new IoTString();
+               i.array = _array;
+               i.hashcode = Arrays.hashCode(_array);
+               return i;
+       }
+
+       /**
+        * Internal method to grab a reference to our byte array.  Caller
+        * must not modify it.
+        */
+
+       byte[] internalBytes() {
+               return array;
+       }
+
+       /**
+        * Returns the hashCode as computed by Arrays.hashcode(byte[]).
+        */
+
+       public int hashCode() {
+               return hashcode;
+       }
+
+       /**
+        * Returns a String representation of the IoTString.
+        */
+
+       public String toString() {
+               return new String(array);
+       }
+
+       /**
+        * Returns a copy of the underlying byte string.
+        */
+
+       public byte[] getBytes() {
+               return (byte[]) array.clone();
+       }
+
+       /**
+        * Returns true if two byte strings have the same content.
+        */
+
+       public boolean equals(Object o) {
+               if (o instanceof IoTString) {
+                       IoTString i=(IoTString)o;
+                       return Arrays.equals(array, i.array);
+               }
+               return false;
+       }
+
+       /**
+        * Returns the length in bytes of the IoTString.
+        */
+
+       public int length() {
+               return array.length;
+       }
+}
diff --git a/src2/java/iotcloud/KeyValue.java b/src2/java/iotcloud/KeyValue.java
new file mode 100644 (file)
index 0000000..9fbaf90
--- /dev/null
@@ -0,0 +1,51 @@
+package iotcloud;
+import java.nio.ByteBuffer;
+
+/**
+ * KeyValue entry for Slot.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+class KeyValue /*extends Entry */ {
+       private IoTString key;
+       private IoTString value;
+
+       public KeyValue(IoTString _key, IoTString _value) {
+               key=_key;
+               value=_value;
+       }
+
+       public IoTString getKey() {
+               return key;
+       }
+
+       public IoTString getValue() {
+               return value;
+       }
+
+       static KeyValue decode(ByteBuffer bb) {
+               int keylength=bb.getInt();
+               int valuelength=bb.getInt();
+               byte[] key=new byte[keylength];
+               byte[] value=new byte[valuelength];
+               bb.get(key);
+               bb.get(value);
+               return new KeyValue(IoTString.shallow(key), IoTString.shallow(value));
+       }
+
+       public void encode(ByteBuffer bb) {
+               bb.putInt(key.length());
+               bb.putInt(value.length());
+               bb.put(key.internalBytes());
+               bb.put(value.internalBytes());
+       }
+
+       public int getSize() {
+               return 2*Integer.BYTES+key.length()+value.length();
+       }
+
+       public String toString() {
+               return value.toString();
+       }
+}
diff --git a/src2/java/iotcloud/LastMessage.java b/src2/java/iotcloud/LastMessage.java
new file mode 100644 (file)
index 0000000..738dff6
--- /dev/null
@@ -0,0 +1,55 @@
+package iotcloud;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This Entry records the last message sent by a given machine.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
+class LastMessage extends Entry {
+       private long machineid;
+       private long seqnum;
+
+       public LastMessage(Slot slot, long _machineid, long _seqnum) {
+               super(slot);
+               machineid=_machineid;
+               seqnum=_seqnum;
+       }
+
+       public long getMachineID() {
+               return machineid;
+       }
+
+       public long getSequenceNumber() {
+               return seqnum;
+       }
+
+       static Entry decode(Slot slot, ByteBuffer bb) {
+               long machineid=bb.getLong();
+               long seqnum=bb.getLong();
+               return new LastMessage(slot, machineid, seqnum);
+       }
+
+       public void encode(ByteBuffer bb) {
+               bb.put(Entry.TypeLastMessage);
+               bb.putLong(machineid);
+               bb.putLong(seqnum);
+       }
+
+       public int getSize() {
+               return 2*Long.BYTES+Byte.BYTES;
+       }
+
+       public byte getType() {
+               return Entry.TypeLastMessage;
+       }
+
+       public Entry getCopy(Slot s) {
+               return new LastMessage(s, machineid, seqnum);
+       }
+}
+
+
diff --git a/src2/java/iotcloud/Liveness.java b/src2/java/iotcloud/Liveness.java
new file mode 100644 (file)
index 0000000..2c840e4
--- /dev/null
@@ -0,0 +1,11 @@
+package iotcloud;
+
+/**
+ * Interface common to both classes that record information about the
+ * last message sent by a machine.  (Either a Slot or a LastMessage.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+interface Liveness {
+}
diff --git a/src2/java/iotcloud/Makefile b/src2/java/iotcloud/Makefile
new file mode 100644 (file)
index 0000000..2d45b63
--- /dev/null
@@ -0,0 +1,17 @@
+all: server
+
+JAVAC = javac
+JAVADOC = javadoc
+BIN_DIR = bin
+DOCS_DIR = docs
+
+server:
+       $(JAVAC) -d $(BIN_DIR) *.java
+
+doc: server
+       $(JAVADOC) -private -d $(DOCS_DIR) *.java
+
+clean:
+       rm -r bin/*
+       rm -r docs/*
+       rm *~
diff --git a/src2/java/iotcloud/NewKey.java b/src2/java/iotcloud/NewKey.java
new file mode 100644 (file)
index 0000000..c101c0b
--- /dev/null
@@ -0,0 +1,57 @@
+package iotcloud;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This Entry records the abort sent by a given machine.
+ * @author Ali Younis <ayounis@uci.edu>
+ * @version 1.0
+ */
+
+
+class NewKey extends Entry {
+       private IoTString key;
+       private long machineid;
+
+       public NewKey(Slot slot, IoTString _key, long _machineid) {
+               super(slot);
+               key = _key;
+               machineid = _machineid;
+       }
+
+       public long getMachineID() {
+               return machineid;
+       }
+
+       public IoTString getKey() {
+               return key;
+       }
+
+       static Entry decode(Slot slot, ByteBuffer bb) {
+               int keylength = bb.getInt();
+               byte[] key = new byte[keylength];
+               bb.get(key);
+               long machineid = bb.getLong();
+
+               return new NewKey(slot, IoTString.shallow(key), machineid);
+       }
+
+       public void encode(ByteBuffer bb) {
+               bb.put(Entry.TypeAbort);
+               bb.putInt(key.length());
+               bb.put(key.internalBytes());
+               bb.putLong(machineid);
+       }
+
+       public int getSize() {
+               return Long.BYTES + Byte.BYTES + key.length();
+       }
+
+       public byte getType() {
+               return Entry.TypeNewKey;
+       }
+
+       public Entry getCopy(Slot s) {
+               return new NewKey(s, key, machineid);
+       }
+}
\ No newline at end of file
diff --git a/src2/java/iotcloud/Pair.java b/src2/java/iotcloud/Pair.java
new file mode 100644 (file)
index 0000000..73ed6bd
--- /dev/null
@@ -0,0 +1,23 @@
+package iotcloud;
+
+class Pair<A,B> {
+       private A a;
+       private B b;
+
+       Pair(A a, B b) {
+               this.a=a;
+               this.b=b;
+       }
+
+       A getFirst() {
+               return a;
+       }
+
+       B getSecond() {
+               return b;
+       }
+
+       public String toString() {
+               return "<"+a+","+b+">";
+       }
+}
diff --git a/src2/java/iotcloud/PendingTransaction.java b/src2/java/iotcloud/PendingTransaction.java
new file mode 100644 (file)
index 0000000..5253a94
--- /dev/null
@@ -0,0 +1,92 @@
+package iotcloud;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import javax.script.ScriptException;
+import java.lang.NullPointerException;
+
+
+class PendingTransaction {
+
+    static final byte Equal = Guard.Equal;
+    static final byte NotEqual = Guard.NotEqual;
+
+    private Set<KeyValue> keyValueUpdateSet;
+    private Guard guard;
+
+    public PendingTransaction() {
+        keyValueUpdateSet = new HashSet<KeyValue>();
+        guard = new Guard();
+    }
+
+    /**
+     * Add a new key value to the updates
+     *
+     */
+    public void addKV(KeyValue newKV) {
+
+        // Make sure there are no duplicates
+        for (KeyValue kv : keyValueUpdateSet) {
+            if (kv.getKey().equals(newKV.getKey())) {
+
+                // Remove key if we are adding a newer version of the same key
+                keyValueUpdateSet.remove(kv);
+                break;
+            }
+        }
+
+        // Add the key to the hash set
+        keyValueUpdateSet.add(newKV);
+    }
+
+    /**
+     * Get the key value update set
+     *
+     */
+    public Set<KeyValue> getKVUpdates() {
+        return keyValueUpdateSet;
+    }
+
+    /**
+    * Get the guard
+    *
+    */
+    public Guard getGuard() {
+        return guard;
+    }
+
+    /**
+     * Add a guard to this transaction
+     *
+     */
+    public void addGuard(Guard _guard) {
+        guard = _guard;
+    }
+
+    /**
+     * Evaluate the guard expression for a given transaction using a set of key value pairs.
+     *
+     */
+    public boolean evaluate(Set<KeyValue> kvSet) throws ScriptException, NullPointerException {
+
+        // Evaluate the guard using the current KV Set
+        return guard.evaluate(kvSet);
+    }
+
+    /**
+     * Add a boolean expression to the guard.
+     *
+     */
+    public void setGuardExpression(String expr) {
+        guard.setGuardExpression(expr);
+    }
+
+    /**
+     * Trampoline static method.
+     *
+     */
+    public static String createExpression(IoTString keyName, IoTString keyValue, byte op) {
+        return Guard.createExpression(keyName, keyValue, op);
+    }
+}
\ No newline at end of file
diff --git a/src2/java/iotcloud/RejectedMessage.java b/src2/java/iotcloud/RejectedMessage.java
new file mode 100644 (file)
index 0000000..9c84f18
--- /dev/null
@@ -0,0 +1,88 @@
+package iotcloud;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+
+/**
+ * Entry for tracking messages that the server rejected.  We have to
+ * make sure that all clients know that this message was rejected to
+ * prevent the server from reusing these messages in an attack.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
+class RejectedMessage extends Entry {
+       /* Machine identifier */
+       private long machineid;
+       /* Oldest sequence number in range */
+       private long oldseqnum;
+       /* Newest sequence number in range */
+       private long newseqnum;
+       /* Is the machine identifier of the relevant slots equal to (or not
+        * equal to) the specified machine identifier. */
+       private boolean equalto;
+       /* Set of machines that have not received notification. */
+       private HashSet<Long> watchset;
+
+       RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
+               super(slot);
+               machineid=_machineid;
+               oldseqnum=_oldseqnum;
+               newseqnum=_newseqnum;
+               equalto=_equalto;
+       }
+
+       long getOldSeqNum() {
+               return oldseqnum;
+       }
+
+       long getNewSeqNum() {
+               return newseqnum;
+       }
+
+       boolean getEqual() {
+               return equalto;
+       }
+
+       long getMachineID() {
+               return machineid;
+       }
+
+       static Entry decode(Slot slot, ByteBuffer bb) {
+               long machineid=bb.getLong();
+               long oldseqnum=bb.getLong();
+               long newseqnum=bb.getLong();
+               byte equalto=bb.get();
+               return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1);
+       }
+
+       void setWatchSet(HashSet<Long> _watchset) {
+               watchset=_watchset;
+       }
+
+       void removeWatcher(long machineid) {
+               if (watchset.remove(machineid))
+                       if (watchset.isEmpty())
+                               setDead();
+       }
+
+       void encode(ByteBuffer bb) {
+               bb.put(Entry.TypeRejectedMessage);
+               bb.putLong(machineid);
+               bb.putLong(oldseqnum);
+               bb.putLong(newseqnum);
+               bb.put(equalto?(byte)1:(byte)0);
+       }
+
+       int getSize() {
+               return 3*Long.BYTES + 2*Byte.BYTES;
+       }
+
+       byte getType() {
+               return Entry.TypeRejectedMessage;
+       }
+       
+       Entry getCopy(Slot s) {
+               return new RejectedMessage(s, machineid, oldseqnum, newseqnum, equalto);
+       }
+}
diff --git a/src2/java/iotcloud/Slot.java b/src2/java/iotcloud/Slot.java
new file mode 100644 (file)
index 0000000..5828fd3
--- /dev/null
@@ -0,0 +1,214 @@
+package iotcloud;
+import java.util.Vector;
+import java.nio.ByteBuffer;
+import javax.crypto.Mac;
+import java.util.Arrays;
+
+/**
+ * Data structuring for holding Slot information.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+class Slot implements Liveness {
+       /** Sets the slot size. */
+       static final int SLOT_SIZE=2048;
+       /** Sets the size for the HMAC. */
+       static final int HMAC_SIZE=32;
+
+       /** Sequence number of the slot. */
+       private long seqnum;
+       /** HMAC of previous slot. */
+       private byte[] prevhmac;
+       /** HMAC of this slot. */
+       private byte[] hmac;
+       /** Machine that sent this slot. */
+       private long machineid;
+       /** Vector of entries in this slot. */
+       private Vector<Entry> entries;
+       /** Pieces of information that are live. */
+       private int livecount;
+       /** Flag that indicates whether this slot is still live for
+        * recording the machine that sent it. */
+       private boolean seqnumlive;
+       /** Number of bytes of free space. */
+       private int freespace;
+       /** Reference to Table */
+       private Table table;
+
+       Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) {
+               seqnum=_seqnum;
+               machineid=_machineid;
+               prevhmac=_prevhmac;
+               hmac=_hmac;
+               entries=new Vector<Entry>();
+               livecount=1;
+               seqnumlive=true;
+               freespace = SLOT_SIZE - getBaseSize();
+               table=_table;
+       }
+
+       Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac) {
+               this(_table, _seqnum, _machineid, _prevhmac, null);
+       }
+
+       Slot(Table _table, long _seqnum, long _machineid) {
+               this(_table, _seqnum, _machineid, new byte[HMAC_SIZE], null);
+       }
+
+       byte[] getHMAC() {
+               return hmac;
+       }
+
+       byte[] getPrevHMAC() {
+               return prevhmac;
+       }
+
+       void addEntry(Entry e) {
+               e=e.getCopy(this);
+               entries.add(e);
+               livecount++;
+               freespace -= e.getSize();
+       }
+
+       private void addShallowEntry(Entry e) {
+               entries.add(e);
+               livecount++;
+               freespace -= e.getSize();
+       }
+
+       /**
+        * Returns true if the slot has free space to hold the entry without
+        * using its reserved space. */
+
+       boolean hasSpace(Entry e) {
+               int newfreespace = freespace - e.getSize();
+               return newfreespace >= 0;
+       }
+
+       Vector<Entry> getEntries() {
+               return entries;
+       }
+
+       static Slot decode(Table table, byte[] array, Mac mac) {
+               mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
+               byte[] realmac=mac.doFinal();
+
+               ByteBuffer bb=ByteBuffer.wrap(array);
+               byte[] hmac=new byte[HMAC_SIZE];
+               byte[] prevhmac=new byte[HMAC_SIZE];
+               bb.get(hmac);
+               bb.get(prevhmac);
+               if (!Arrays.equals(realmac, hmac))
+                       throw new Error("Server Error: Invalid HMAC!  Potential Attack!");
+
+               long seqnum=bb.getLong();
+               long machineid=bb.getLong();
+               int numentries=bb.getInt();
+               Slot slot=new Slot(table, seqnum, machineid, prevhmac, hmac);
+
+               for(int i=0; i<numentries; i++) {
+                       slot.addShallowEntry(Entry.decode(slot, bb));
+               }
+
+               return slot;
+       }
+
+       byte[] encode(Mac mac) {
+               byte[] array=new byte[SLOT_SIZE];
+               ByteBuffer bb=ByteBuffer.wrap(array);
+               /* Leave space for the slot HMAC.  */
+               bb.position(HMAC_SIZE);
+               bb.put(prevhmac);
+               bb.putLong(seqnum);
+               bb.putLong(machineid);
+               bb.putInt(entries.size());
+               for(Entry entry:entries) {
+                       entry.encode(bb);
+               }
+               /* Compute our HMAC */
+               mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
+               byte[] realmac=mac.doFinal();
+               hmac = realmac;
+               bb.position(0);
+               bb.put(realmac);
+               return array;
+       }
+
+       /**
+        * Returns the empty size of a Slot. Includes 2 HMACs, the machine
+        * identifier, the sequence number, and the number of entries.
+        */
+       int getBaseSize() {
+               return 2*HMAC_SIZE+2*Long.BYTES+Integer.BYTES;
+       }
+
+       /**
+        * Returns the live set of entries for this Slot.  Generates a fake
+        * LastMessage entry to represent the information stored by the slot
+        * itself.
+        */
+
+       Vector<Entry> getLiveEntries(boolean resize) {
+               Vector<Entry> liveEntries=new Vector<Entry>();
+               for(Entry entry: entries) {
+                       if (entry.isLive()) {
+                               if (!resize || entry.getType() != Entry.TypeTableStatus)
+                                       liveEntries.add(entry);
+                       }
+               }
+                       
+               if (seqnumlive && !resize)
+                       liveEntries.add(new LastMessage(this, machineid, seqnum));
+
+               return liveEntries;
+       }
+
+       /**
+        * Returns the sequence number of the slot.
+        */
+
+       long getSequenceNumber() {
+               return seqnum;
+       }
+
+       /**
+        * Returns the machine that sent this slot.
+        */
+
+       long getMachineID() {
+               return machineid;
+       }
+
+       /**
+        * Records that a newer slot records the fact that this slot was
+        * sent by the relevant machine.
+        */
+
+       void setDead() {
+               seqnumlive=false;
+               decrementLiveCount();
+       }
+
+       /**
+        * Update the count of live entries.
+        */
+
+       void decrementLiveCount() {
+               livecount--;
+               if (livecount==0)
+                       table.decrementLiveCount();
+       }
+
+       /**
+        * Returns whether the slot stores any live information.
+        */
+
+       boolean isLive() {
+               return livecount > 0;
+       }
+
+       public String toString() {
+               return "<"+getSequenceNumber()+">";
+       }
+}
diff --git a/src2/java/iotcloud/SlotBuffer.java b/src2/java/iotcloud/SlotBuffer.java
new file mode 100644 (file)
index 0000000..14bc926
--- /dev/null
@@ -0,0 +1,99 @@
+package iotcloud;
+
+/**
+ * Circular buffer that holds the live set of slots.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+class SlotBuffer {
+       static final int DEFAULT_SIZE = 128;
+
+       private Slot[] array;
+       private int head;
+       private int tail;
+       private long oldestseqn;
+
+       SlotBuffer() {
+               array=new Slot[DEFAULT_SIZE+1];
+               head=tail=0;
+               oldestseqn=0;
+       }
+
+       int size() {
+               if (head >= tail)
+                       return head - tail;
+               return (array.length + head) - tail;
+       }
+
+       int capacity() {
+               return array.length - 1;
+       }
+
+       void resize(int newsize) {
+               if (newsize == (array.length-1))
+                       return;
+               Slot[] newarray = new Slot[newsize+1];
+               int currsize = size();
+               int index = tail;
+               for(int i=0; i < currsize; i++) {
+                       newarray[i] = array[index];
+                       if ((++index) == array.length)
+                               index = 0;
+               }
+               array = newarray;
+               tail = 0;
+               head = currsize;
+       }
+
+       private void incrementHead() {
+               head++;
+               if (head >= array.length)
+                       head=0;
+       }
+
+       private void incrementTail() {
+               tail++;
+               if (tail >= array.length)
+                       tail=0;
+       }
+
+       void putSlot(Slot s) {
+               array[head]=s;
+               incrementHead();
+
+               if (oldestseqn==0)
+                       oldestseqn = s.getSequenceNumber();
+
+               if (head==tail) {
+                       incrementTail();
+                       oldestseqn++;
+               }
+       }
+
+       Slot getSlot(long seqnum) {
+               int diff=(int) (seqnum-oldestseqn);
+               int index=diff + tail;
+               if (index >= array.length) {
+                       if (head >= tail)
+                               return null;
+                       index-= array.length;
+               }
+
+               if (index >= array.length)
+                       return null;
+
+               if (head >= tail && index >= head)
+                       return null;
+
+               return array[index];
+       }
+
+       long getOldestSeqNum() {
+               return oldestseqn;
+       }
+
+       long getNewestSeqNum() {
+               return oldestseqn + size() - 1;
+       }
+}
diff --git a/src2/java/iotcloud/SlotIndexer.java b/src2/java/iotcloud/SlotIndexer.java
new file mode 100644 (file)
index 0000000..cecdf2d
--- /dev/null
@@ -0,0 +1,31 @@
+package iotcloud;
+
+/**
+ * Slot indexer allows slots in both the slot buffer and the new
+ * server response to looked up in a consistent fashion.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+class SlotIndexer {
+       private Slot[] updates;
+       private SlotBuffer buffer;
+       private long firstslotseqnum;
+
+       SlotIndexer(Slot[] _updates, SlotBuffer _buffer) {
+               buffer = _buffer;
+               updates = _updates;
+               firstslotseqnum = updates[0].getSequenceNumber();
+       }
+
+       Slot getSlot(long seqnum) {
+               if (seqnum >= firstslotseqnum) {
+                       int offset = (int) (seqnum - firstslotseqnum);
+                       if (offset >= updates.length)
+                               throw new Error("Invalid Slot Sequence Number Reference");
+                       else
+                               return updates[offset];
+               } else
+                       return buffer.getSlot(seqnum);
+       }
+}
diff --git a/src2/java/iotcloud/Table.java b/src2/java/iotcloud/Table.java
new file mode 100644 (file)
index 0000000..7009ed9
--- /dev/null
@@ -0,0 +1,510 @@
+package iotcloud;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Vector;
+import java.util.Random;
+import java.util.Queue;
+import java.util.LinkedList;
+/**
+ * IoTTable data structure.  Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+final public class Table {
+       private int numslots;   //number of slots stored in buffer
+
+       //table of key-value pairs
+       private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
+
+       // machine id -> (sequence number, Slot or LastMessage); records last message by each client
+       private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
+       // machine id -> ...
+       private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+       private Vector<Long> rejectedmessagelist = new Vector<Long>();
+       private SlotBuffer buffer;
+       private CloudComm cloud;
+       private long sequencenumber; //Largest sequence number a client has received
+       private long localmachineid;
+       private TableStatus lastTableStatus;
+       static final int FREE_SLOTS = 10; //number of slots that should be kept free
+       static final int SKIP_THRESHOLD = 10;
+       private long liveslotcount = 0;
+       private int chance;
+       static final double RESIZE_MULTIPLE = 1.2;
+       static final double RESIZE_THRESHOLD = 0.75;
+       static final int REJECTED_THRESHOLD = 5;
+       private int resizethreshold;
+       private long lastliveslotseqn;  //smallest sequence number with a live entry
+       private Random random = new Random();
+
+       private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
+       private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+
+
+       public Table(String baseurl, String password, long _localmachineid) {
+               localmachineid = _localmachineid;
+               buffer = new SlotBuffer();
+               numslots = buffer.capacity();
+               setResizeThreshold();
+               sequencenumber = 0;
+               cloud = new CloudComm(this, baseurl, password);
+               lastliveslotseqn = 1;
+
+               pendingTransQueue = new LinkedList<PendingTransaction>();
+       }
+
+       public Table(CloudComm _cloud, long _localmachineid) {
+               localmachineid = _localmachineid;
+               buffer = new SlotBuffer();
+               numslots = buffer.capacity();
+               setResizeThreshold();
+               sequencenumber = 0;
+               cloud = _cloud;
+
+               pendingTransQueue = new LinkedList<PendingTransaction>();
+       }
+
+       public void rebuild() {
+               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+               validateandupdate(newslots, true);
+       }
+
+       public void update() {
+               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+
+               validateandupdate(newslots, false);
+       }
+
+       public IoTString get(IoTString key) {
+               KeyValue kv = table.get(key);
+               if (kv != null)
+                       return kv.getValue();
+               else
+                       return null;
+       }
+
+       public void initTable() {
+               cloud.setSalt();//Set the salt
+               Slot s = new Slot(this, 1, localmachineid);
+               TableStatus status = new TableStatus(s, numslots);
+               s.addEntry(status);
+               Slot[] array = cloud.putSlot(s, numslots);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       /* update data structure */
+                       validateandupdate(array, true);
+               } else {
+                       throw new Error("Error on initialization");
+               }
+       }
+
+       public String toString() {
+               return table.toString();
+       }
+
+       public void startTransaction() {
+               // Create a new transaction, invalidates any old pending transactions.
+               pendingTransBuild = new PendingTransaction();
+       }
+
+       public void commitTransaction() {
+
+               // Add the pending transaction to the queue
+               pendingTransQueue.add(pendingTransBuild);
+
+               while (!pendingTransQueue.isEmpty()) {
+                       if (tryput( pendingTransQueue.peek(), false)) {
+                               pendingTransQueue.poll();
+                       }
+               }
+       }
+
+       public void addKV(IoTString key, IoTString value) {
+               KeyValue kv = new KeyValue(key, value);
+               pendingTransBuild.addKV(kv);
+       }
+
+       public void addGuard(IoTString key, IoTString value) {
+               KeyValue kv = new KeyValue(key, value);
+               pendingTransBuild.addKV(kv);
+       }
+
+
+
+
+
+
+       void decrementLiveCount() {
+               liveslotcount--;
+       }
+
+
+       private void setResizeThreshold() {
+               int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
+               resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
+       }
+
+       private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
+               Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+               int newsize = 0;
+               if (liveslotcount > resizethreshold) {
+                       resize = true; //Resize is forced
+               }
+
+               if (resize) {
+                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+                       TableStatus status = new TableStatus(s, newsize);
+                       s.addEntry(status);
+               }
+
+               if (! rejectedmessagelist.isEmpty()) {
+                       /* TODO: We should avoid generating a rejected message entry if
+                        * there is already a sufficient entry in the queue (e.g.,
+                        * equalsto value of true and same sequence number).  */
+
+                       long old_seqn = rejectedmessagelist.firstElement();
+                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                               long new_seqn = rejectedmessagelist.lastElement();
+                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                               s.addEntry(rm);
+                       } else {
+                               long prev_seqn = -1;
+                               int i = 0;
+                               /* Go through list of missing messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       if (s_msg != null)
+                                               break;
+                                       prev_seqn = curr_seqn;
+                               }
+                               /* Generate rejected message entry for missing messages */
+                               if (prev_seqn != -1) {
+                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                       s.addEntry(rm);
+                               }
+                               /* Generate rejected message entries for present messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       long machineid = s_msg.getMachineID();
+                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       s.addEntry(rm);
+                               }
+                       }
+               }
+
+               long newestseqnum = buffer.getNewestSeqNum();
+               long oldestseqnum = buffer.getOldestSeqNum();
+               if (lastliveslotseqn < oldestseqnum)
+                       lastliveslotseqn = oldestseqnum;
+
+               long seqn = lastliveslotseqn;
+               boolean seenliveslot = false;
+               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
+
+               for (; seqn < threshold; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (! seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (! prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry)) {
+                                       s.addEntry(liveentry);
+                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+                                       if (!resize) {
+                                               System.out.print("B"); //?
+                                               return tryput(pendingTrans, true);
+                                       }
+                               }
+                       }
+               }
+
+
+               Transaction trans = new Transaction(s,
+                                                   s.getSequenceNumber(),
+                                                   localmachineid,
+                                                   pendingTrans.getKVUpdates(),
+                                                   pendingTrans.getGuard());
+               boolean insertedTrans = false;
+               if (s.hasSpace(trans)) {
+                       s.addEntry(trans);
+                       insertedTrans=true;
+               }
+
+               /* now go through live entries from least to greatest sequence number until
+                * either all live slots added, or the slot doesn't have enough room
+                * for SKIP_THRESHOLD consecutive entries*/
+               int skipcount = 0;
+               search:
+               for (; seqn <= newestseqnum; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (!seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (!prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry))
+                                       s.addEntry(liveentry);
+                               else {
+                                       skipcount++;
+                                       if (skipcount > SKIP_THRESHOLD)
+                                               break search;
+                               }
+                       }
+               }
+
+               int max = 0;
+               if (resize)
+                       max = newsize;
+               Slot[] array = cloud.putSlot(s, max);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       rejectedmessagelist.clear();
+               }       else {
+                       if (array.length == 0)
+                               throw new Error("Server Error: Did not send any slots");
+                       rejectedmessagelist.add(s.getSequenceNumber());
+                       insertedTrans = false;
+               }
+
+               /* update data structure */
+               validateandupdate(array, true);
+
+               return insertedTrans;
+       }
+
+       private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+               /* The cloud communication layer has checked slot HMACs already
+                        before decoding */
+               if (newslots.length == 0) return;
+
+               long firstseqnum = newslots[0].getSequenceNumber();
+               if (firstseqnum <= sequencenumber)
+                       throw new Error("Server Error: Sent older slots!");
+
+               SlotIndexer indexer = new SlotIndexer(newslots, buffer);
+               checkHMACChain(indexer, newslots);
+
+               HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
+
+               initExpectedSize(firstseqnum);
+               for (Slot slot : newslots) {
+                       processSlot(indexer, slot, acceptupdatestolocal, machineSet);
+                       updateExpectedSize();
+               }
+
+               /* If there is a gap, check to see if the server sent us everything. */
+               if (firstseqnum != (sequencenumber + 1)) {
+                       checkNumSlots(newslots.length);
+                       if (!machineSet.isEmpty())
+                               throw new Error("Missing record for machines: " + machineSet);
+               }
+
+               commitNewMaxSize();
+
+               /* Commit new to slots. */
+               for (Slot slot : newslots) {
+                       buffer.putSlot(slot);
+                       liveslotcount++;
+               }
+               sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+       }
+
+       private int expectedsize, currmaxsize;
+
+       private void checkNumSlots(int numslots) {
+               if (numslots != expectedsize)
+                       throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
+       }
+
+       private void initExpectedSize(long firstsequencenumber) {
+               long prevslots = firstsequencenumber;
+               expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
+               currmaxsize = numslots;
+       }
+
+       private void updateExpectedSize() {
+               expectedsize++;
+               if (expectedsize > currmaxsize)
+                       expectedsize = currmaxsize;
+       }
+
+       private void updateCurrMaxSize(int newmaxsize) {
+               currmaxsize = newmaxsize;
+       }
+
+       private void commitNewMaxSize() {
+               if (numslots != currmaxsize)
+                       buffer.resize(currmaxsize);
+
+               numslots = currmaxsize;
+               setResizeThreshold();
+       }
+
+       // private void processEntry(KeyValue entry, SlotIndexer indexer) {
+       //      IoTString key=entry.getKey();
+       //      KeyValue oldvalue=table.get(key);
+       //      if (oldvalue != null) {
+       //              oldvalue.setDead();
+       //      }
+       //      table.put(key, entry);
+       // }
+
+       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
+       }
+
+       private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+               long oldseqnum = entry.getOldSeqNum();
+               long newseqnum = entry.getNewSeqNum();
+               boolean isequal = entry.getEqual();
+               long machineid = entry.getMachineID();
+               for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
+                       Slot slot = indexer.getSlot(seqnum);
+                       if (slot != null) {
+                               long slotmachineid = slot.getMachineID();
+                               if (isequal != (slotmachineid == machineid)) {
+                                       throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
+                               }
+                       }
+               }
+
+               HashSet<Long> watchset = new HashSet<Long>();
+               for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
+                       long entry_mid = lastmsg_entry.getKey();
+                       /* We've seen it, don't need to continue to watch.  Our next
+                        * message will implicitly acknowledge it. */
+                       if (entry_mid == localmachineid)
+                               continue;
+                       Pair<Long, Liveness> v = lastmsg_entry.getValue();
+                       long entry_seqn = v.getFirst();
+                       if (entry_seqn < newseqnum) {
+                               addWatchList(entry_mid, entry);
+                               watchset.add(entry_mid);
+                       }
+               }
+               if (watchset.isEmpty())
+                       entry.setDead();
+               else
+                       entry.setWatchSet(watchset);
+       }
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries = watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
+               entries.add(entry);
+       }
+
+       private void processEntry(TableStatus entry, SlotIndexer indexer) {
+               int newnumslots = entry.getMaxSlots();
+               updateCurrMaxSize(newnumslots);
+               if (lastTableStatus != null)
+                       lastTableStatus.setDead();
+               lastTableStatus = entry;
+       }
+
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               machineSet.remove(machineid);
+
+               HashSet<RejectedMessage> watchset = watchlist.get(machineid);
+               if (watchset != null) {
+                       for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
+                               RejectedMessage rm = rmit.next();
+                               if (rm.getNewSeqNum() <= seqnum) {
+                                       /* Remove it from our watchlist */
+                                       rmit.remove();
+                                       /* Decrement machines that need to see this notification */
+                                       rm.removeWatcher(machineid);
+                               }
+                       }
+               }
+
+               if (machineid == localmachineid) {
+                       /* Our own messages are immediately dead. */
+                       if (liveness instanceof LastMessage) {
+                               ((LastMessage)liveness).setDead();
+                       } else if (liveness instanceof Slot) {
+                               ((Slot)liveness).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
+               }
+
+
+               Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
+               if (lastmsgentry == null)
+                       return;
+
+               long lastmsgseqnum = lastmsgentry.getFirst();
+               Liveness lastentry = lastmsgentry.getSecond();
+               if (machineid != localmachineid) {
+                       if (lastentry instanceof LastMessage) {
+                               ((LastMessage)lastentry).setDead();
+                       } else if (lastentry instanceof Slot) {
+                               ((Slot)lastentry).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
+               }
+
+               if (machineid == localmachineid) {
+                       if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
+                               throw new Error("Server Error: Mismatch on local machine sequence number");
+               } else {
+                       if (lastmsgseqnum > seqnum)
+                               throw new Error("Server Error: Rollback on remote machine sequence number");
+               }
+       }
+
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
+               for (Entry entry : slot.getEntries()) {
+                       switch (entry.getType()) {
+                       // case Entry.TypeKeyValue:
+                       //      processEntry((KeyValue)entry, indexer);
+                       //      break;
+
+                       case Entry.TypeLastMessage:
+                               processEntry((LastMessage)entry, indexer, machineSet);
+                               break;
+
+                       case Entry.TypeRejectedMessage:
+                               processEntry((RejectedMessage)entry, indexer);
+                               break;
+
+                       case Entry.TypeTableStatus:
+                               processEntry((TableStatus)entry, indexer);
+                               break;
+
+                       default:
+                               throw new Error("Unrecognized type: " + entry.getType());
+                       }
+               }
+       }
+
+       private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+               for (int i = 0; i < newslots.length; i++) {
+                       Slot currslot = newslots[i];
+                       Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
+                       if (prevslot != null &&
+                               !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
+                               throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
+               }
+       }
+}
diff --git a/src2/java/iotcloud/TableStatus.java b/src2/java/iotcloud/TableStatus.java
new file mode 100644 (file)
index 0000000..62f3a6d
--- /dev/null
@@ -0,0 +1,45 @@
+package iotcloud;
+import java.nio.ByteBuffer;
+
+/**
+ * TableStatus entries record the current size of the data structure
+ * in slots.  Used to remember the size and to perform resizes.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
+class TableStatus extends Entry {
+       private int maxslots;
+
+       TableStatus(Slot slot, int _maxslots) {
+               super(slot);
+               maxslots=_maxslots;
+       }
+
+       int getMaxSlots() {
+               return maxslots;
+       }
+
+       static Entry decode(Slot slot, ByteBuffer bb) {
+               int maxslots=bb.getInt();
+               return new TableStatus(slot, maxslots);
+       }
+
+       void encode(ByteBuffer bb) {
+               bb.put(Entry.TypeTableStatus);
+               bb.putInt(maxslots);
+       }
+
+       int getSize() {
+               return Integer.BYTES+Byte.BYTES;
+       }
+
+       byte getType() {
+               return Entry.TypeTableStatus;
+       }
+
+       Entry getCopy(Slot s) {
+               return new TableStatus(s, maxslots);
+       }
+}
diff --git a/src2/java/iotcloud/Test.java b/src2/java/iotcloud/Test.java
new file mode 100644 (file)
index 0000000..e70d88e
--- /dev/null
@@ -0,0 +1,95 @@
+package iotcloud;
+
+/**
+ * Test cases.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+public class Test {
+       public static void main(String[] args) {
+               // if(args[0].equals("2"))
+               //      test2();
+               // else if(args[0].equals("3"))
+               //      test3();
+               // else if(args[0].equals("4"))
+               //      test4();
+               // else if(args[0].equals("5"))
+               //      test5();
+
+       }
+
+       
+       
+       // static Thread buildThread(String prefix, Table t) {
+       //      return new Thread() {
+       //              public void run() {
+       //                      for(int i=0; i<10000; i++) {
+       //                              String a=prefix+i;
+       //                              IoTString ia=new IoTString(a);
+       //                              t.put(ia, ia);
+       //                              System.out.println(ia+"->"+t.get(ia));
+       //                      }
+       //              }
+       //      };
+       // }
+       
+       // static void test5() {
+       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+       //      t1.rebuild();
+       //      System.out.println(t1);
+       // }
+       
+       // static void test4() {
+       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+       //      Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+       //      t1.rebuild();
+       //      t2.rebuild();
+       //      Thread thr1=buildThread("p1", t1);
+       //      Thread thr2=buildThread("p2", t2);
+       //      thr1.start();
+       //      thr2.start();
+       //      try {
+       //              thr1.join();
+       //              thr2.join();
+       //      } catch (Exception e) {
+       //              e.printStackTrace();
+       //      }
+       // }
+
+       // static void test3() {
+       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+       //      Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+       //      t1.rebuild();
+       //      t2.rebuild();
+       //      for(int i=0; i<600; i++) {
+       //              String a="STR"+i;
+       //              String b="ABR"+i;
+       //              IoTString ia=new IoTString(a);
+       //              IoTString ib=new IoTString(b);
+       //              t1.put(ia, ia);
+       //              t2.put(ib, ib);
+       //              t1.update();
+       //              System.out.println(ib+"->"+t1.get(ib));
+       //              System.out.println(ia+"->"+t2.get(ia));
+       //      }
+       // }
+
+       // static void test2() {
+       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+       //      t1.initTable();
+       //      Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+       //      t2.update();
+       //      for(int i=0; i<600; i++) {
+       //              String a="STR"+i;
+       //              String b="ABR"+i;
+       //              IoTString ia=new IoTString(a);
+       //              IoTString ib=new IoTString(b);
+       //              t1.put(ia, ia);
+       //              t2.put(ib, ib);
+       //              t1.update();
+       //              System.out.println(ib+"->"+t1.get(ib));
+       //              System.out.println(ia+"->"+t2.get(ia));
+       //      }
+       // }
+}
diff --git a/src2/java/iotcloud/Transaction.java b/src2/java/iotcloud/Transaction.java
new file mode 100644 (file)
index 0000000..d1a9933
--- /dev/null
@@ -0,0 +1,84 @@
+package iotcloud;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.HashSet;
+
+class Transaction extends Entry {
+
+    private long seqnum;
+    private long machineid;
+    private Set<KeyValue> keyValueUpdateSet;
+    private Guard guard;
+
+    public Transaction(Slot slot, long _seqnum, long _machineid, Set<KeyValue> _keyValueUpdateSet, Guard _guard) {
+        super(slot);
+        seqnum = _seqnum;
+        machineid = _machineid;
+        keyValueUpdateSet = _keyValueUpdateSet;
+        guard = _guard;
+    }
+
+    public long getMachineID() {
+        return machineid;
+    }
+
+    public long getSequenceNumber() {
+        return seqnum;
+    }
+
+    public byte getType() {
+        return Entry.TypeLastMessage;
+    }
+
+    public int getSize() {
+        int size = 2 * Long.BYTES + Byte.BYTES; // seq, machine id, entry type
+        size += Integer.BYTES; // number of KV's
+
+        // Size of each KV
+        for (KeyValue kv : keyValueUpdateSet) {
+            size += kv.getSize();
+        }
+
+        // Size of the guard
+        size += guard.getSize();
+
+        return size;
+    }
+
+
+    public void encode(ByteBuffer bb) {
+        bb.put(Entry.TypeTransaction);
+        bb.putLong(seqnum);
+        bb.putLong(machineid);
+
+        for (KeyValue kv : keyValueUpdateSet) {
+            kv.encode(bb);
+        }
+
+        guard.encode(bb);
+    }
+
+    static Entry decode(Slot slot, ByteBuffer bb) {
+        long seqnum = bb.getLong();
+        long machineid = bb.getLong();
+        int numberOfKeys = bb.getInt();
+
+        Set<KeyValue> kvSet = new HashSet<KeyValue>();
+
+        for (int i = 0; i < numberOfKeys; i++) {
+            KeyValue kv = KeyValue.decode(bb);
+            kvSet.add(kv);
+        }
+
+        Guard guard = Guard.decode(bb);
+
+        return new Transaction(slot, seqnum, machineid, kvSet, guard);
+    }
+
+
+
+    public Entry getCopy(Slot s) {
+        return new Transaction(s, seqnum, machineid, keyValueUpdateSet, guard);
+    }
+}
\ No newline at end of file
diff --git a/src2/java/iotcloud/issues.txt b/src2/java/iotcloud/issues.txt
new file mode 100644 (file)
index 0000000..4c246b6
--- /dev/null
@@ -0,0 +1,2 @@
+1) add better resizing support...gets stuck when it is full now...
+2) Transaction does not check arbitrator is the same for all keys and guards
\ No newline at end of file
diff --git a/src2/script/C.cfg b/src2/script/C.cfg
new file mode 100644 (file)
index 0000000..6165d83
--- /dev/null
@@ -0,0 +1,37 @@
+indent_with_tabs = 2
+indent_cmt_with_tabs = True
+indent_columns = 2
+indent_class = True
+output_tab_size = 2
+nl_if_brace = Remove
+nl_brace_else = Remove
+nl_elseif_brace = Remove
+nl_struct_brace = Remove
+nl_union_brace = Remove
+nl_fcall_brace = Remove
+nl_for_brace = Remove
+nl_fdef_brace = Remove
+nl_while_brace = Remove
+nl_do_brace = Remove
+nl_brace_while = Remove
+nl_switch_brace = Remove
+nl_before_case = True
+nl_try_brace = Remove
+nl_catch_brace = Remove
+nl_brace_catch = Remove
+sp_func_proto_paren = Remove
+sp_func_def_paren = Remove
+sp_inside_fparens = remove
+sp_inside_fparen = remove
+sp_func_call_paren = Remove
+sp_fparen_brace = Add
+sp_sparen_brace = Add
+sp_paren_brace = Add
+sp_else_brace = Add
+sp_brace_else = Add
+sp_catch_brace = Add
+sp_brace_catch = Add
+sp_try_brace = Add
+sp_after_sparen = Add
+sp_cond_colon = remove
+sp_cond_question = remove
diff --git a/src2/script/java.cfg b/src2/script/java.cfg
new file mode 100644 (file)
index 0000000..6165d83
--- /dev/null
@@ -0,0 +1,37 @@
+indent_with_tabs = 2
+indent_cmt_with_tabs = True
+indent_columns = 2
+indent_class = True
+output_tab_size = 2
+nl_if_brace = Remove
+nl_brace_else = Remove
+nl_elseif_brace = Remove
+nl_struct_brace = Remove
+nl_union_brace = Remove
+nl_fcall_brace = Remove
+nl_for_brace = Remove
+nl_fdef_brace = Remove
+nl_while_brace = Remove
+nl_do_brace = Remove
+nl_brace_while = Remove
+nl_switch_brace = Remove
+nl_before_case = True
+nl_try_brace = Remove
+nl_catch_brace = Remove
+nl_brace_catch = Remove
+sp_func_proto_paren = Remove
+sp_func_def_paren = Remove
+sp_inside_fparens = remove
+sp_inside_fparen = remove
+sp_func_call_paren = Remove
+sp_fparen_brace = Add
+sp_sparen_brace = Add
+sp_paren_brace = Add
+sp_else_brace = Add
+sp_brace_else = Add
+sp_catch_brace = Add
+sp_brace_catch = Add
+sp_try_brace = Add
+sp_after_sparen = Add
+sp_cond_colon = remove
+sp_cond_question = remove
diff --git a/src2/script/makefile b/src2/script/makefile
new file mode 100644 (file)
index 0000000..ac2da6e
--- /dev/null
@@ -0,0 +1,4 @@
+tabbing:
+       uncrustify -c java.cfg --no-backup $$(find .. -name "*.java")
+       uncrustify -c C.cfg --no-backup $$(find .. -name "*.cpp")
+       uncrustify -c C.cfg --no-backup $$(find .. -name "*.h")
diff --git a/src2/server/.dir-locals.el b/src2/server/.dir-locals.el
new file mode 100644 (file)
index 0000000..e166a2e
--- /dev/null
@@ -0,0 +1,2 @@
+((nil . ((indent-tabs-mode . t))))
+
diff --git a/src2/server/Makefile b/src2/server/Makefile
new file mode 100644 (file)
index 0000000..8eee1fa
--- /dev/null
@@ -0,0 +1,15 @@
+CPPFLAGS=-O0 -g -Wall
+
+all: iotcloud.fcgi
+
+iotcloud.fcgi: iotcloud.o iotquery.o
+       g++ $(CPPFLAGS) -o iotcloud.fcgi iotcloud.o iotquery.o -lfcgi -lfcgi++
+
+iotcloud.o: iotcloud.cpp iotquery.h
+       g++ $(CPPFLAGS) -c -o iotcloud.o iotcloud.cpp
+
+iotquery.o: iotquery.cpp iotquery.h
+       g++ $(CPPFLAGS) -c -o iotquery.o iotquery.cpp
+
+clean:
+       rm *.o iotcloud.fcgi
diff --git a/src2/server/README.txt b/src2/server/README.txt
new file mode 100644 (file)
index 0000000..6eb138f
--- /dev/null
@@ -0,0 +1,32 @@
+1) Requires apache2
+2) Requires fastcgi (libapache2-mod-fastcgi and libfcgi-dev)
+
+Setup on ubuntu
+1) Install modules
+
+2) Add .htaccess file in /var/www/html
+RewriteEngine on
+RewriteBase /
+SetHandler cgi-script
+RewriteRule ^([a-zA-Z0-9._]*\.iotcloud/([a-zA-Z0-9._]*))$ /cgi-bin/iotcloud.fcgi/$1
+
+3) Create account directory.  For example, create the directory test.iotcloud in /var/www/html
+   -- To password protect, create the following .htaccess file in the account directory:
+AuthType Basic
+AuthName "Private"
+AuthUserFile /var/www/html/foo.iotcloud/.htpasswd
+Require valid-user
+
+4) In apache2.conf, add to the /var/www directory section:
+AllowOverride FileInfo AuthConfig
+
+5) In the sites-enabled/000-default.conf file, add the line:
+SetEnv IOTCLOUD_ROOT /iotcloud/
+
+6) Create the /iotcloud directory.
+
+7) Create the account directory in the /iotcloud directory.  For example, test.iotcloud and give it permissions that the apache daemon can write to.
+
+8) Compile cloud server by typing make
+
+9) Copy it to the cgi-bin directory.
diff --git a/src2/server/iotcloud.cpp b/src2/server/iotcloud.cpp
new file mode 100644 (file)
index 0000000..bb9eff8
--- /dev/null
@@ -0,0 +1,40 @@
+#include <iostream>
+#include "iotquery.h"
+
+using namespace std;
+
+
+int main(void) {
+       // Backup the stdio streambufs
+       streambuf * cin_streambuf  = cin.rdbuf();
+       streambuf * cout_streambuf = cout.rdbuf();
+       streambuf * cerr_streambuf = cerr.rdbuf();
+
+       FCGX_Request request;
+
+       FCGX_Init();
+       FCGX_InitRequest(&request, 0, 0);
+
+       while (FCGX_Accept_r(&request) == 0) {
+               fcgi_streambuf cin_fcgi_streambuf(request.in);
+               fcgi_streambuf cout_fcgi_streambuf(request.out);
+               fcgi_streambuf cerr_fcgi_streambuf(request.err);
+
+               cin.rdbuf(&cin_fcgi_streambuf);
+               cout.rdbuf(&cout_fcgi_streambuf);
+               cerr.rdbuf(&cerr_fcgi_streambuf);
+
+               IoTQuery * iotquery=new IoTQuery(&request);
+               iotquery->processQuery();
+
+               delete iotquery;
+       }
+
+       // restore stdio streambufs
+       cin.rdbuf(cin_streambuf);
+       cout.rdbuf(cout_streambuf);
+       cerr.rdbuf(cerr_streambuf);
+
+       return 0;
+}
+
diff --git a/src2/server/iotquery.cpp b/src2/server/iotquery.cpp
new file mode 100644 (file)
index 0000000..0b1c4b3
--- /dev/null
@@ -0,0 +1,517 @@
+#include "iotquery.h"
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
+
+using namespace std;
+
+const char * query_str="QUERY_STRING";
+const char * uri_str="REQUEST_URI";
+const char * method_str="REQUEST_METHOD";
+const char * iotcloudroot_str="IOTCLOUD_ROOT";
+const char * length_str="CONTENT_LENGTH";
+
+IoTQuery::IoTQuery(FCGX_Request *request) :
+       request(request),
+       data(NULL),
+       directory(NULL),
+       uri(NULL),
+       query(NULL),
+       method(NULL),
+       iotcloudroot(NULL),
+       length(0),
+       oldestentry(0),
+       newestentry(0),
+       requestsequencenumber(0),
+       numqueueentries(DEFAULT_SIZE),
+       fd(-1),
+       reqGetSlot(false),
+       reqPutSlot(false),
+       reqSetSalt(false),
+       reqGetSalt(false) {
+}
+
+IoTQuery::~IoTQuery() {
+       if (fd >= 0)
+               close(fd);
+       if (directory)
+               delete directory;
+       if (data)
+               delete data;
+}
+
+/**
+ *  Returns true if the account directory exists.
+ */
+
+bool IoTQuery::checkDirectory() {
+       struct stat s;
+       int err=stat(directory, &s);
+       if (-1 == err)
+               return false;
+       return S_ISDIR(s.st_mode);
+}
+
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
+void IoTQuery::decodeQuery() {
+       int len=strlen(query);
+       char * str=new char[len+1];
+       memcpy(str, query, len+1);
+       char *tok_ptr=str;
+       
+       /* Parse commands */
+       char *command=strsep(&tok_ptr, "&");
+       if (strncmp(command, "req=putslot", 11) == 0)
+               reqPutSlot = true;
+       else if (strncmp(command, "req=getslot", 11) == 0)
+               reqGetSlot = true;
+       else if (strncmp(command, "req=setsalt", 11) == 0)
+               reqSetSalt = true;
+       else if (strncmp(command, "req=getsalt", 11) == 0)
+               reqGetSalt = true;
+
+       /* Load Sequence Number for request */
+       char *sequencenumber_str = strsep(&tok_ptr, "&");
+       if (sequencenumber_str != NULL &&
+                       strncmp(sequencenumber_str, "seq=", 4) == 0) {
+               sequencenumber_str = strchr(sequencenumber_str, '=');
+               if (sequencenumber_str != NULL) {
+                       requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+               }
+       }
+
+       /* don't allow a really old sequence number */
+       if (requestsequencenumber < oldestentry)
+               requestsequencenumber = oldestentry;
+
+       /* Update size if we get request */
+       char * numqueueentries_str = tok_ptr;
+       if (numqueueentries_str != NULL &&
+                       strncmp(numqueueentries_str, "max=", 4) == 0) {
+               numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
+               numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+       }
+
+       delete str;
+}
+
+/**
+ * Helper function to write data to file.
+ */
+
+void doWrite(int fd, char *data, long long length) {
+       long long offset=0;
+       do {
+               long long byteswritten=write(fd, &data[offset], length);
+               if (byteswritten > 0) {
+                       length -= byteswritten;
+                       offset += byteswritten;
+               } else {
+                       cerr << "Bytes not written" << endl;
+                       if (byteswritten < 0) {
+                               cerr << strerror(errno) << " error writing slot file" << endl;
+                       }
+                       return;
+               }
+       } while(length != 0);
+}
+
+/** Helper function to read data from file. */
+bool doRead(int fd, void *buf, int numbytes) {
+       int offset=0;
+       char *ptr=(char *)buf;
+       do {
+               int bytesread=read(fd, ptr+offset, numbytes);
+               if (bytesread > 0) {
+                       offset += bytesread;
+                       numbytes -= bytesread;
+               } else
+                       return false;
+       } while (numbytes!=0);
+       return true;
+}
+
+/**
+ * Function that handles a getSlot request.
+ */
+
+void IoTQuery::getSlot() {
+       int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+       if (numrequeststosend < 0)
+               numrequeststosend = 0;
+       long long numbytes = 0;
+       int filesizes[numrequeststosend];
+       int fdarray[numrequeststosend];
+       int index=0;
+       for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+               struct stat st;
+               char *filename=getSlotFileName(seqn);
+               if (stat(filename, &st) == 0) {
+                       fdarray[index]=open(filename, O_RDONLY);
+                       filesizes[index]=st.st_size;
+                       numbytes+=filesizes[index];
+               } else {
+                       fdarray[index]=-1;
+                       filesizes[index]=0;
+               }
+               delete filename;
+       }
+       const char header[]="getslot";
+
+       /* Size is the header + the payload + space for number of requests
+                plus sizes of each slot */
+
+       long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
+       char * response = new char[size];
+       long long offset=0;
+       memcpy(response, header, sizeof(header)-1);
+       offset+=sizeof(header)-1;
+       int numreq=htonl(numrequeststosend);
+       memcpy(response + offset, &numreq, sizeof(numreq));
+       offset+=sizeof(numrequeststosend);
+       for(int i=0; i<numrequeststosend; i++) {
+               int filesize=htonl(filesizes[i]);
+               memcpy(response + offset, &filesize, sizeof(filesize));
+               offset+=sizeof(int);
+       }
+
+       /* Read the file data into the buffer */
+       for(int i=0; i<numrequeststosend; i++) {
+               if (fdarray[i]>=0) {
+                       doRead(fdarray[i], response+offset, filesizes[i]);
+                       offset+=filesizes[i];
+               }
+       }
+
+       /* Send the response out to the webserver. */
+       sendResponse(response, size);
+
+       /* Delete the response buffer and close the files. */
+       delete response;
+       for(int i=0; i<numrequeststosend; i++) {
+               if (fdarray[i] >= 0)
+                       close(fdarray[i]);
+       }
+}
+
+/**
+ * The method setSalt handles a setSalt request from the client.
+ */
+
+void IoTQuery::setSalt() {
+       /* Write the slot data we received to a SLOT file */
+       char *filename = getSaltFileName();
+       int saltfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+       doWrite(saltfd, data, length);
+       char response[0];
+       sendResponse(response, 0);
+       close(saltfd);
+       delete filename;
+}
+
+/**
+ * The method getSalt handles a setSalt request from the client.
+ */
+
+void IoTQuery::getSalt() {
+       /* Write the slot data we received to a SLOT file */
+       char *filename = getSaltFileName();
+       int filesize = 0;
+       struct stat st;
+       if (stat(filename, &st) == 0) {
+               filesize=st.st_size;
+       } else {
+               delete filename;
+               return;
+       }
+       int saltfd = open(filename, O_RDONLY);
+       int responsesize = filesize + sizeof(int);
+       char * response = new char[responsesize];
+       doRead(saltfd, response+ sizeof(int), filesize);
+       int n_filesize=htonl(filesize);
+       *((int*) response) = n_filesize;
+       sendResponse(response, responsesize);
+       close(saltfd);
+       delete filename;
+       delete response;
+}
+
+/**
+ *     The method putSlot handles a putSlot request from the client
+ */
+
+void IoTQuery::putSlot() {
+       /* Check if the request is stale and send update in that case.  This
+                servers as an implicit failure of the request. */
+       if (requestsequencenumber!=(newestentry+1)) {
+               getSlot();
+               return;
+       }
+
+       /* See if we have too many slots and if so, delete the old one */
+       int numberofliveslots=(int) ((newestentry-oldestentry)+1);
+       if (numberofliveslots >=  numqueueentries) {
+               removeOldestSlot();
+       }
+
+       /* Write the slot data we received to a SLOT file */
+       char *filename = getSlotFileName(requestsequencenumber);
+       int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+       doWrite(slotfd, data, length);
+       close(slotfd);
+       delete filename;
+       newestentry = requestsequencenumber;
+
+       /* Update the seuqence numbers and other status file information. */
+       updateStatusFile();
+
+       /* Send response acknowledging success */
+       char command[]="putslot";
+       sendResponse(command, sizeof(command)-1);
+}
+
+/**
+ * Method sends response.  It wraps in appropriate headers for web
+ * server.
+ */
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+       cout << "Accept-Ranges: bytes\r\n"
+                        << "Content-Length: " << len << "\r\n"
+                        << "\r\n";
+       cout.write(bytes, len);
+}
+
+/**
+ *     Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
+       int directorylen=strlen(directory);
+
+       /* Size is 19 digits for ASCII representation of a long + 4
+                characters for SLOT string + 1 character for null termination +
+                directory size*/
+
+       char * filename=new char[25+directorylen];
+       snprintf(filename, 25+directorylen, "%s/SLOT%lld", directory, seqnum);
+       return filename;
+}
+
+/**
+ *     Computes the name for a salt file
+ */
+
+char * IoTQuery::getSaltFileName() {
+       int directorylen=strlen(directory);
+
+       /* Size is 4 characters for SALT string + 1 character for null
+                termination + directory size*/
+
+       char * filename=new char[6+directorylen];
+       snprintf(filename, 6+directorylen, "%s/SALT", directory);
+       return filename;
+}
+
+/**
+ *  Removes the oldest slot file
+ */
+
+void IoTQuery::removeOldestSlot() {
+       if (oldestentry!=0) {
+               char * filename=getSlotFileName(oldestentry);
+               unlink(filename);
+               delete filename;
+       }
+       oldestentry++;
+}
+
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
+void IoTQuery::processQuery() {
+       getQuery();
+       getDirectory();
+       readData();
+
+       /* Verify that we receive a post request. */
+       if (strncmp(method, "POST", 4) != 0) {
+               cerr << "Not POST Request" << endl;
+               return;
+       }
+
+       /* Make sure the directory is okay. */
+       if (directory == NULL ||
+                       !checkDirectory()) {
+               cerr << "Directory " << directory << " does not exist" << endl;
+               return;
+       }
+
+       /* Get queue state from the status file.  If it doesn't exist,
+                create it. */
+       if (!openStatusFile()) {
+               cerr << "Failed to open status file" << endl;
+               return;
+       }
+
+       /* Lock status file to keep other requests out. */
+       flock(fd, LOCK_EX);
+
+       /* Decode query. */
+       decodeQuery();
+       
+       /* Handle request. */
+       if (reqGetSlot)
+               getSlot();
+       else if (reqPutSlot)
+               putSlot();
+       else if (reqSetSalt)
+               setSalt();
+       else if (reqGetSalt)
+               getSalt();
+       else {
+               cerr << "No recognized request" << endl;
+               return;
+       }
+}
+
+/**
+ * Reads in data for request.  This is used for the slot to be
+ * inserted.
+ */
+
+void IoTQuery::readData() {
+       if (length) {
+               data = new char[length+1];
+               memset(data, 0, length+1);
+               cin.read(data, length);
+       }
+       do {
+               char dummy;
+               cin >> dummy;
+       } while (!cin.eof());
+}
+
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
+void IoTQuery::getQuery() {
+       uri = FCGX_GetParam(uri_str, request->envp);
+       query = FCGX_GetParam(query_str, request->envp);
+       method = FCGX_GetParam(method_str, request->envp);
+       iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+
+       /** We require the content-length header to be sent. */
+       char * reqlength = FCGX_GetParam(length_str, request->envp);
+       if (reqlength) {
+               length=strtoll(reqlength, NULL, 10);
+       } else {
+               length=0;
+       }
+}
+
+/**
+ *  Initializes directory field from environmental variables.
+ */
+
+void IoTQuery::getDirectory() {
+       char * split = strchr((char *)uri, '?');
+       if (split == NULL)
+               return;
+       int split_len = (int) (split-uri);
+       int rootdir_len = strlen(iotcloudroot);
+       int directory_len = split_len + rootdir_len + 1;
+       directory = new char[directory_len];
+       memcpy(directory, iotcloudroot, rootdir_len);
+       memcpy(directory + rootdir_len, uri, split_len);
+       directory[directory_len-1]=0;
+}
+
+/**
+ * Helper function that is used to read the status file.
+ */
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+       do {
+               size_t bytesread=pread(fd, ptr, count, offset);
+               if (bytesread==count) {
+                       return 1;
+               } else if (bytesread==0) {
+                       return 0;
+               }
+       } while(1);
+}
+
+
+/**
+ * Writes the current state to the status file.
+ */
+
+void IoTQuery::updateStatusFile() {
+       pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+       pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+       pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+/**
+ * Reads in queue state from the status file.  Returns true if
+ * successful.
+ */
+
+bool IoTQuery::openStatusFile() {
+       char statusfile[]="queuestatus";
+       int len=strlen(directory);
+
+       char * filename=new char[len+sizeof(statusfile)+2];
+       memcpy(filename, directory, len);
+       filename[len]='/';
+       memcpy(filename+len+1, statusfile, sizeof(statusfile));
+       filename[len+sizeof(statusfile)+1]=0;
+       fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
+       delete filename;
+
+       if (fd < 0) {
+               cerr << strerror(errno) << " error opening statusfile" << endl;
+               return false;
+       }
+
+       /* Read in queue size, oldest sequence number, and newest sequence number. */
+       int size;
+       int needwrite=0;
+       if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+               numqueueentries=size;
+       else
+               needwrite=1;
+
+       long long entry;
+       if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+               oldestentry=entry;
+       else
+               needwrite=1;
+
+       if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+               newestentry=entry;
+       else
+               needwrite=1;
+
+       if (needwrite)
+               updateStatusFile();
+
+       return true;
+}
+
+
diff --git a/src2/server/iotquery.h b/src2/server/iotquery.h
new file mode 100644 (file)
index 0000000..ae39366
--- /dev/null
@@ -0,0 +1,68 @@
+#ifndef IOTQUERY_H
+#define IOTQUERY_H
+#include <iostream>
+#include "fcgio.h"
+#include "fcgi_stdio.h"
+
+#define DEFAULT_SIZE 128
+#define OFFSET_MAX 0
+#define OFFSET_OLD 4
+#define OFFSET_NEW 12
+
+class IoTQuery {
+public:
+       IoTQuery(FCGX_Request * request);
+       ~IoTQuery();
+       void processQuery();
+
+private:
+       void sendResponse(char *data, int length);
+       void getQuery();
+       void getDirectory();
+       void readData();
+       bool checkDirectory();
+       bool openStatusFile();
+       void updateStatusFile();
+       void decodeQuery();
+       void getSlot();
+       void putSlot();
+       void setSalt();
+       void getSalt();
+       void removeOldestSlot();
+       char * getSlotFileName(long long);
+       char * getSaltFileName();
+
+       FCGX_Request * request;
+       char *data;
+       /* Directory slot files are placed in. */
+       char *directory;
+       /* Full URI from Apache */
+       const char * uri;
+       /* Query portion of URI */
+       const char * query;
+       /* Type of request: GET or PUT */
+       const char * method;
+       /* Root directory for all accounts */
+       const char * iotcloudroot;
+       /* Expected length of data from client */
+       long long length;
+       /* Sequence number for oldest slot */
+       long long oldestentry;
+       /* Sequence number for newest slot */
+       long long newestentry;
+       /* Sequence number from request */
+       long long requestsequencenumber;
+       /* Size of queue */
+       int numqueueentries;
+       /* fd for queuestatus file */
+       int fd;
+       /* Is the request to get a slot? */
+       bool reqGetSlot;
+       /* Is the request to put a slot? */
+       bool reqPutSlot;
+       /* Is the request to set the salt? */
+       bool reqSetSalt;
+       /* Is the request to get the salt? */
+       bool reqGetSalt;
+};
+#endif