From 2ebf9c28a58021bcf2f98af145273c368c0d9ce0 Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Thu, 17 Nov 2016 16:26:49 -0800 Subject: [PATCH] Initial commit of code for new version of block chain, does not compile (had to go to class mid java class creation) --- .gitignore | 3 + src2/java/.dir-locals.el | 2 + src2/java/iotcloud/Abort.java | 53 +++ src2/java/iotcloud/CloudComm.java | 232 +++++++++ src2/java/iotcloud/Commit.java | 56 +++ src2/java/iotcloud/Entry.java | 99 ++++ src2/java/iotcloud/Guard.java | 101 ++++ src2/java/iotcloud/IoTString.java | 105 +++++ src2/java/iotcloud/KeyValue.java | 51 ++ src2/java/iotcloud/LastMessage.java | 55 +++ src2/java/iotcloud/Liveness.java | 11 + src2/java/iotcloud/Makefile | 17 + src2/java/iotcloud/NewKey.java | 57 +++ src2/java/iotcloud/Pair.java | 23 + src2/java/iotcloud/PendingTransaction.java | 92 ++++ src2/java/iotcloud/RejectedMessage.java | 88 ++++ src2/java/iotcloud/Slot.java | 214 +++++++++ src2/java/iotcloud/SlotBuffer.java | 99 ++++ src2/java/iotcloud/SlotIndexer.java | 31 ++ src2/java/iotcloud/Table.java | 510 ++++++++++++++++++++ src2/java/iotcloud/TableStatus.java | 45 ++ src2/java/iotcloud/Test.java | 95 ++++ src2/java/iotcloud/Transaction.java | 84 ++++ src2/java/iotcloud/issues.txt | 2 + src2/script/C.cfg | 37 ++ src2/script/java.cfg | 37 ++ src2/script/makefile | 4 + src2/server/.dir-locals.el | 2 + src2/server/Makefile | 15 + src2/server/README.txt | 32 ++ src2/server/iotcloud.cpp | 40 ++ src2/server/iotquery.cpp | 517 +++++++++++++++++++++ src2/server/iotquery.h | 68 +++ 33 files changed, 2877 insertions(+) create mode 100644 src2/java/.dir-locals.el create mode 100644 src2/java/iotcloud/Abort.java create mode 100644 src2/java/iotcloud/CloudComm.java create mode 100644 src2/java/iotcloud/Commit.java create mode 100644 src2/java/iotcloud/Entry.java create mode 100644 src2/java/iotcloud/Guard.java create mode 100644 src2/java/iotcloud/IoTString.java create mode 100644 src2/java/iotcloud/KeyValue.java create mode 100644 src2/java/iotcloud/LastMessage.java create mode 100644 src2/java/iotcloud/Liveness.java create mode 100644 src2/java/iotcloud/Makefile create mode 100644 src2/java/iotcloud/NewKey.java create mode 100644 src2/java/iotcloud/Pair.java create mode 100644 src2/java/iotcloud/PendingTransaction.java create mode 100644 src2/java/iotcloud/RejectedMessage.java create mode 100644 src2/java/iotcloud/Slot.java create mode 100644 src2/java/iotcloud/SlotBuffer.java create mode 100644 src2/java/iotcloud/SlotIndexer.java create mode 100644 src2/java/iotcloud/Table.java create mode 100644 src2/java/iotcloud/TableStatus.java create mode 100644 src2/java/iotcloud/Test.java create mode 100644 src2/java/iotcloud/Transaction.java create mode 100644 src2/java/iotcloud/issues.txt create mode 100644 src2/script/C.cfg create mode 100644 src2/script/java.cfg create mode 100644 src2/script/makefile create mode 100644 src2/server/.dir-locals.el create mode 100644 src2/server/Makefile create mode 100644 src2/server/README.txt create mode 100644 src2/server/iotcloud.cpp create mode 100644 src2/server/iotquery.cpp create mode 100644 src2/server/iotquery.h diff --git a/.gitignore b/.gitignore index deab353..8e8a718 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,9 @@ dist/ bower_components test/bower_components +/src/bin + + # Ignoring those pesky .DS_Store files on mac .DS_Store diff --git a/src2/java/.dir-locals.el b/src2/java/.dir-locals.el new file mode 100644 index 0000000..e166a2e --- /dev/null +++ b/src2/java/.dir-locals.el @@ -0,0 +1,2 @@ +((nil . ((indent-tabs-mode . t)))) + diff --git a/src2/java/iotcloud/Abort.java b/src2/java/iotcloud/Abort.java new file mode 100644 index 0000000..8ec1871 --- /dev/null +++ b/src2/java/iotcloud/Abort.java @@ -0,0 +1,53 @@ +package iotcloud; + +import java.nio.ByteBuffer; + +/** + * This Entry records the abort sent by a given machine. + * @author Ali Younis + * @version 1.0 + */ + + +class Abort extends Entry { + private long seqnum; + private long machineid; + + Abort(Slot slot, long _seqnum, long _machineid) { + super(slot); + seqnum=_seqnum; + machineid=_machineid; + } + + long getMachineID() { + return machineid; + } + + long getSequenceNumber() { + return seqnum; + } + + static Entry decode(Slot slot, ByteBuffer bb) { + long seqnum=bb.getLong(); + long machineid=bb.getLong(); + return new Abort(slot, seqnum, machineid); + } + + void encode(ByteBuffer bb) { + bb.put(Entry.TypeAbort); + bb.putLong(seqnum); + bb.putLong(machineid); + } + + int getSize() { + return 2*Long.BYTES+Byte.BYTES; + } + + byte getType() { + return Entry.TypeAbort; + } + + Entry getCopy(Slot s) { + return new Abort(s, seqnum, machineid); + } +} \ No newline at end of file diff --git a/src2/java/iotcloud/CloudComm.java b/src2/java/iotcloud/CloudComm.java new file mode 100644 index 0000000..ac906b1 --- /dev/null +++ b/src2/java/iotcloud/CloudComm.java @@ -0,0 +1,232 @@ +package iotcloud; +import java.io.*; +import java.net.*; +import java.util.Arrays; +import javax.crypto.*; +import javax.crypto.spec.*; +import java.security.SecureRandom; + +/** + * This class provides a communication API to the webserver. It also + * validates the HMACs on the slots and handles encryption. + * @author Brian Demsky + * @version 1.0 + */ + + +class CloudComm { + String baseurl; + Cipher encryptCipher; + Cipher decryptCipher; + Mac mac; + String password; + SecureRandom random; + static final int SALT_SIZE = 8; + byte salt[]; + Table table; + + /** + * Empty Constructor needed for child class. + */ + + CloudComm() { + } + + /** + * Constructor for actual use. Takes in the url and password. + */ + + CloudComm(Table _table, String _baseurl, String _password) { + this.table=_table; + this.baseurl=_baseurl; + this.password = _password; + this.random = new SecureRandom(); + } + + /** + * Generates Key from password. + */ + + private SecretKeySpec initKey() { + try { + PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), salt, 65536, 128); + SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec); + return new SecretKeySpec(tmpkey.getEncoded(), "AES"); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Failed generating key."); + } + } + + /** + * Inits the HMAC generator. + */ + + private void initCrypt() { + try { + SecretKeySpec key=initKey(); + password = null; // drop password + mac = Mac.getInstance("HmacSHA256"); + mac.init(key); + encryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding"); + encryptCipher.init(Cipher.ENCRYPT_MODE, key); + decryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding"); + decryptCipher.init(Cipher.DECRYPT_MODE, key); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Failed To Initialize Ciphers"); + } + } + + /* + * Builds the URL for the given request. + */ + + private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException { + String reqstring=isput?"req=putslot":"req=getslot"; + String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber; + if (maxentries != 0) + urlstr += "&max="+maxentries; + return new URL(urlstr); + } + + public void setSalt() { + try { + salt = new byte[SALT_SIZE]; + random.nextBytes(salt); + URL url=new URL(baseurl+"?req=setsalt"); + URLConnection con=url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + http.setRequestMethod("POST"); + http.setFixedLengthStreamingMode(salt.length); + http.setDoOutput(true); + http.connect(); + OutputStream os=http.getOutputStream(); + os.write(salt); + int responsecode=http.getResponseCode(); + if (responsecode != HttpURLConnection.HTTP_OK) + throw new Error("Invalid response"); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Failed setting salt"); + } + initCrypt(); + } + + private void getSalt() throws Exception { + URL url=new URL(baseurl+"?req=getsalt"); + URLConnection con=url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + http.setRequestMethod("POST"); + http.connect(); + + InputStream is=http.getInputStream(); + DataInputStream dis=new DataInputStream(is); + int salt_length=dis.readInt(); + byte [] tmp=new byte[salt_length]; + dis.readFully(tmp); + salt=tmp; + } + + /* + * API for putting a slot into the queue. Returns null on success. + * On failure, the server will send slots with newer sequence + * numbers. + */ + + Slot[] putSlot(Slot slot, int max) { + try { + if (salt == null) { + getSalt(); + initCrypt(); + } + + long sequencenumber=slot.getSequenceNumber(); + byte[] bytes=slot.encode(mac); + bytes = encryptCipher.doFinal(bytes); + + URL url=buildRequest(true, sequencenumber, max); + URLConnection con=url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + + http.setRequestMethod("POST"); + http.setFixedLengthStreamingMode(bytes.length); + http.setDoOutput(true); + http.connect(); + + OutputStream os=http.getOutputStream(); + os.write(bytes); + + InputStream is=http.getInputStream(); + DataInputStream dis=new DataInputStream(is); + byte[] resptype=new byte[7]; + dis.readFully(resptype); + if (Arrays.equals(resptype, "getslot".getBytes())) + return processSlots(dis); + else if (Arrays.equals(resptype, "putslot".getBytes())) + return null; + else + throw new Error("Bad response to putslot"); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("putSlot failed"); + } + } + + /** + * Request the server to send all slots with the given + * sequencenumber or newer. + */ + + Slot[] getSlots(long sequencenumber) { + try { + if (salt == null) { + getSalt(); + initCrypt(); + } + + URL url=buildRequest(false, sequencenumber, 0); + URLConnection con=url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + http.setRequestMethod("POST"); + http.connect(); + InputStream is=http.getInputStream(); + + DataInputStream dis=new DataInputStream(is); + + byte[] resptype=new byte[7]; + dis.readFully(resptype); + if (!Arrays.equals(resptype, "getslot".getBytes())) + throw new Error("Bad Response: "+new String(resptype)); + else + return processSlots(dis); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("getSlots failed"); + } + } + + /** + * Method that actually handles building Slot objects from the + * server response. Shared by both putSlot and getSlots. + */ + + private Slot[] processSlots(DataInputStream dis) throws Exception { + int numberofslots=dis.readInt(); + int[] sizesofslots=new int[numberofslots]; + Slot[] slots=new Slot[numberofslots]; + for(int i=0; i + * @version 1.0 + */ + + +class Commit extends Entry { + private long seqnum; + private Set keyValueUpdateSet; + + + public Commit(Slot slot, long _seqnum, long _machineid) { + super(slot); + seqnum=_seqnum; + machineid=_machineid; + } + + public long getSequenceNumber() { + return seqnum; + } + + + + + + static Entry decode(Slot slot, ByteBuffer bb) { + long seqnum=bb.getLong(); + long machineid=bb.getLong(); + return new Abort(slot, seqnum, machineid); + } + + public void encode(ByteBuffer bb) { + bb.put(Entry.TypeAbort); + bb.putLong(seqnum); + bb.putLong(machineid); + } + + public int getSize() { + return 2*Long.BYTES+Byte.BYTES; + } + + public byte getType() { + return Entry.TypeAbort; + } + + public Entry getCopy(Slot s) { + return new Abort(s, seqnum, machineid); + } +} \ No newline at end of file diff --git a/src2/java/iotcloud/Entry.java b/src2/java/iotcloud/Entry.java new file mode 100644 index 0000000..af99192 --- /dev/null +++ b/src2/java/iotcloud/Entry.java @@ -0,0 +1,99 @@ +package iotcloud; +import java.nio.ByteBuffer; + +/** + * Generic class that wraps all the different types of information + * that can be stored in a Slot. + * @author Brian Demsky + * @version 1.0 + */ + +abstract class Entry implements Liveness { + + + static final byte TypeCommit = 1; + static final byte TypeAbort = 2; + static final byte TypeTransaction = 3; + static final byte TypeNewKey = 4; + static final byte TypeLastMessage = 5; + static final byte TypeRejectedMessage = 6; + static final byte TypeTableStatus = 7; + + + + /* Records whether the information is still live or has been + superceded by a newer update. */ + + private boolean islive = true; + private Slot parentslot; + + public Entry(Slot _parentslot) { + parentslot = _parentslot; + } + + /** + * Static method for decoding byte array into Entry objects. First + * byte tells the type of entry. + */ + + static Entry decode(Slot slot, ByteBuffer bb) { + byte type = bb.get(); + switch (type) { + + case TypeLastMessage: + return LastMessage.decode(slot, bb); + + case TypeRejectedMessage: + return RejectedMessage.decode(slot, bb); + + case TypeTableStatus: + return TableStatus.decode(slot, bb); + + default: + throw new Error("Unrecognized Entry Type: " + type); + } + } + + /** + * Returns true if the Entry object is still live. + */ + + public boolean isLive() { + return islive; + } + + /** + * Flags the entry object as dead. Also decrements the live count + * of the parent slot. + */ + + public void setDead() { + islive = false; + parentslot.decrementLiveCount(); + } + + /** + * Serializes the Entry object into the byte buffer. + */ + + abstract void encode(ByteBuffer bb); + + /** + * Returns the size in bytes the entry object will take in the byte + * array. + */ + + abstract int getSize(); + + /** + * Returns a byte encoding the type of the entry object. + */ + + abstract byte getType(); + + /** + * Returns a copy of the Entry that can be added to a different slot. + */ + abstract Entry getCopy(Slot s); + +} diff --git a/src2/java/iotcloud/Guard.java b/src2/java/iotcloud/Guard.java new file mode 100644 index 0000000..aaf312b --- /dev/null +++ b/src2/java/iotcloud/Guard.java @@ -0,0 +1,101 @@ +package iotcloud; + +import java.util.Set; +import java.util.HashSet; +import java.nio.ByteBuffer; + +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import java.lang.NullPointerException; + + +class Guard { + + static final byte Equal = 1; + static final byte NotEqual = 2; + + private IoTString booleanExpression; + + public Guard() { + booleanExpression = null; + } + + public Guard(IoTString _booleanExpression) { + booleanExpression = _booleanExpression; + } + + /** + * Create an equality expression for a key value. + * + */ + public static String createExpression(IoTString keyName, IoTString keyValue, byte op) { + if (op == Equal) { + return keyName.toString() + "=='" + keyValue.toString() + "'"; + } else if (op == NotEqual) { + return keyName.toString() + "!='" + keyValue.toString() + "'"; + } + + // Unrecognized op + return null; + } + + /** + * Add a boolean expression to the guard. + * + */ + public void setGuardExpression(String expr) { + booleanExpression = new IoTString(expr); + } + + /** + * Evaluate the guard expression for a given set of key value pairs. + * + */ + public boolean evaluate(Set kvSet) throws ScriptException, NullPointerException { + + // There are no conditions to evaluate + if (booleanExpression == null) { + return true; + } + + // All the current key value pairs that we need to evaluate the condition + String[] variables = new String[kvSet.size()]; + + // Fill the variables array + int i = 0; + for (KeyValue kv : kvSet) { + variables[i] = kv.getKey() + " ='" + kv.getValue() + "'"; + i++; + } + + // Prep the evaluation engine (script engine) + ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript"); + for (String s : variables) { + engine.eval(s); + } + + // Evaluate the guard condition + return 1 == (Integer)engine.eval(booleanExpression.toString()); + } + + /** + * Get the size of the guard condition + * + */ + public int getSize() { + return Integer.BYTES + booleanExpression.length(); + } + + public void encode(ByteBuffer bb) { + bb.putInt(booleanExpression.length()); + bb.put(booleanExpression.internalBytes()); + } + + static Guard decode(ByteBuffer bb) { + int exprLength = bb.getInt(); + byte[] expr = new byte[exprLength]; + bb.get(expr); + return new Guard(IoTString.shallow(expr)); + } +} \ No newline at end of file diff --git a/src2/java/iotcloud/IoTString.java b/src2/java/iotcloud/IoTString.java new file mode 100644 index 0000000..83a3fa1 --- /dev/null +++ b/src2/java/iotcloud/IoTString.java @@ -0,0 +1,105 @@ +package iotcloud; + +import java.util.Arrays; + +/** + * IoTString is wraps the underlying byte string. We don't use the + * standard String class as we have bytes and not chars. + * @author Brian Demsky + * @version 1.0 + */ + + +final public class IoTString { + byte[] array; + int hashcode; + + private IoTString() { + } + + /** + * Builds an IoTString object around the byte array. This + * constructor makes a copy, so the caller is free to modify the byte array. + */ + + public IoTString(byte[] _array) { + array=(byte[]) _array.clone(); + hashcode=Arrays.hashCode(array); + } + + /** + * Converts the String object to a byte representation and stores it + * into the IoTString object. + */ + + public IoTString(String str) { + array=str.getBytes(); + hashcode=Arrays.hashCode(array); + } + + /** + * Internal methods to build an IoTString using the byte[] passed + * in. Caller is responsible for ensuring the byte[] is never + * modified. + */ + + static IoTString shallow(byte[] _array) { + IoTString i=new IoTString(); + i.array = _array; + i.hashcode = Arrays.hashCode(_array); + return i; + } + + /** + * Internal method to grab a reference to our byte array. Caller + * must not modify it. + */ + + byte[] internalBytes() { + return array; + } + + /** + * Returns the hashCode as computed by Arrays.hashcode(byte[]). + */ + + public int hashCode() { + return hashcode; + } + + /** + * Returns a String representation of the IoTString. + */ + + public String toString() { + return new String(array); + } + + /** + * Returns a copy of the underlying byte string. + */ + + public byte[] getBytes() { + return (byte[]) array.clone(); + } + + /** + * Returns true if two byte strings have the same content. + */ + + public boolean equals(Object o) { + if (o instanceof IoTString) { + IoTString i=(IoTString)o; + return Arrays.equals(array, i.array); + } + return false; + } + + /** + * Returns the length in bytes of the IoTString. + */ + + public int length() { + return array.length; + } +} diff --git a/src2/java/iotcloud/KeyValue.java b/src2/java/iotcloud/KeyValue.java new file mode 100644 index 0000000..9fbaf90 --- /dev/null +++ b/src2/java/iotcloud/KeyValue.java @@ -0,0 +1,51 @@ +package iotcloud; +import java.nio.ByteBuffer; + +/** + * KeyValue entry for Slot. + * @author Brian Demsky + * @version 1.0 + */ + +class KeyValue /*extends Entry */ { + private IoTString key; + private IoTString value; + + public KeyValue(IoTString _key, IoTString _value) { + key=_key; + value=_value; + } + + public IoTString getKey() { + return key; + } + + public IoTString getValue() { + return value; + } + + static KeyValue decode(ByteBuffer bb) { + int keylength=bb.getInt(); + int valuelength=bb.getInt(); + byte[] key=new byte[keylength]; + byte[] value=new byte[valuelength]; + bb.get(key); + bb.get(value); + return new KeyValue(IoTString.shallow(key), IoTString.shallow(value)); + } + + public void encode(ByteBuffer bb) { + bb.putInt(key.length()); + bb.putInt(value.length()); + bb.put(key.internalBytes()); + bb.put(value.internalBytes()); + } + + public int getSize() { + return 2*Integer.BYTES+key.length()+value.length(); + } + + public String toString() { + return value.toString(); + } +} diff --git a/src2/java/iotcloud/LastMessage.java b/src2/java/iotcloud/LastMessage.java new file mode 100644 index 0000000..738dff6 --- /dev/null +++ b/src2/java/iotcloud/LastMessage.java @@ -0,0 +1,55 @@ +package iotcloud; + +import java.nio.ByteBuffer; + +/** + * This Entry records the last message sent by a given machine. + * @author Brian Demsky + * @version 1.0 + */ + + +class LastMessage extends Entry { + private long machineid; + private long seqnum; + + public LastMessage(Slot slot, long _machineid, long _seqnum) { + super(slot); + machineid=_machineid; + seqnum=_seqnum; + } + + public long getMachineID() { + return machineid; + } + + public long getSequenceNumber() { + return seqnum; + } + + static Entry decode(Slot slot, ByteBuffer bb) { + long machineid=bb.getLong(); + long seqnum=bb.getLong(); + return new LastMessage(slot, machineid, seqnum); + } + + public void encode(ByteBuffer bb) { + bb.put(Entry.TypeLastMessage); + bb.putLong(machineid); + bb.putLong(seqnum); + } + + public int getSize() { + return 2*Long.BYTES+Byte.BYTES; + } + + public byte getType() { + return Entry.TypeLastMessage; + } + + public Entry getCopy(Slot s) { + return new LastMessage(s, machineid, seqnum); + } +} + + diff --git a/src2/java/iotcloud/Liveness.java b/src2/java/iotcloud/Liveness.java new file mode 100644 index 0000000..2c840e4 --- /dev/null +++ b/src2/java/iotcloud/Liveness.java @@ -0,0 +1,11 @@ +package iotcloud; + +/** + * Interface common to both classes that record information about the + * last message sent by a machine. (Either a Slot or a LastMessage. + * @author Brian Demsky + * @version 1.0 + */ + +interface Liveness { +} diff --git a/src2/java/iotcloud/Makefile b/src2/java/iotcloud/Makefile new file mode 100644 index 0000000..2d45b63 --- /dev/null +++ b/src2/java/iotcloud/Makefile @@ -0,0 +1,17 @@ +all: server + +JAVAC = javac +JAVADOC = javadoc +BIN_DIR = bin +DOCS_DIR = docs + +server: + $(JAVAC) -d $(BIN_DIR) *.java + +doc: server + $(JAVADOC) -private -d $(DOCS_DIR) *.java + +clean: + rm -r bin/* + rm -r docs/* + rm *~ diff --git a/src2/java/iotcloud/NewKey.java b/src2/java/iotcloud/NewKey.java new file mode 100644 index 0000000..c101c0b --- /dev/null +++ b/src2/java/iotcloud/NewKey.java @@ -0,0 +1,57 @@ +package iotcloud; + +import java.nio.ByteBuffer; + +/** + * This Entry records the abort sent by a given machine. + * @author Ali Younis + * @version 1.0 + */ + + +class NewKey extends Entry { + private IoTString key; + private long machineid; + + public NewKey(Slot slot, IoTString _key, long _machineid) { + super(slot); + key = _key; + machineid = _machineid; + } + + public long getMachineID() { + return machineid; + } + + public IoTString getKey() { + return key; + } + + static Entry decode(Slot slot, ByteBuffer bb) { + int keylength = bb.getInt(); + byte[] key = new byte[keylength]; + bb.get(key); + long machineid = bb.getLong(); + + return new NewKey(slot, IoTString.shallow(key), machineid); + } + + public void encode(ByteBuffer bb) { + bb.put(Entry.TypeAbort); + bb.putInt(key.length()); + bb.put(key.internalBytes()); + bb.putLong(machineid); + } + + public int getSize() { + return Long.BYTES + Byte.BYTES + key.length(); + } + + public byte getType() { + return Entry.TypeNewKey; + } + + public Entry getCopy(Slot s) { + return new NewKey(s, key, machineid); + } +} \ No newline at end of file diff --git a/src2/java/iotcloud/Pair.java b/src2/java/iotcloud/Pair.java new file mode 100644 index 0000000..73ed6bd --- /dev/null +++ b/src2/java/iotcloud/Pair.java @@ -0,0 +1,23 @@ +package iotcloud; + +class Pair { + private A a; + private B b; + + Pair(A a, B b) { + this.a=a; + this.b=b; + } + + A getFirst() { + return a; + } + + B getSecond() { + return b; + } + + public String toString() { + return "<"+a+","+b+">"; + } +} diff --git a/src2/java/iotcloud/PendingTransaction.java b/src2/java/iotcloud/PendingTransaction.java new file mode 100644 index 0000000..5253a94 --- /dev/null +++ b/src2/java/iotcloud/PendingTransaction.java @@ -0,0 +1,92 @@ +package iotcloud; + +import java.util.Set; +import java.util.HashSet; + +import javax.script.ScriptException; +import java.lang.NullPointerException; + + +class PendingTransaction { + + static final byte Equal = Guard.Equal; + static final byte NotEqual = Guard.NotEqual; + + private Set keyValueUpdateSet; + private Guard guard; + + public PendingTransaction() { + keyValueUpdateSet = new HashSet(); + guard = new Guard(); + } + + /** + * Add a new key value to the updates + * + */ + public void addKV(KeyValue newKV) { + + // Make sure there are no duplicates + for (KeyValue kv : keyValueUpdateSet) { + if (kv.getKey().equals(newKV.getKey())) { + + // Remove key if we are adding a newer version of the same key + keyValueUpdateSet.remove(kv); + break; + } + } + + // Add the key to the hash set + keyValueUpdateSet.add(newKV); + } + + /** + * Get the key value update set + * + */ + public Set getKVUpdates() { + return keyValueUpdateSet; + } + + /** + * Get the guard + * + */ + public Guard getGuard() { + return guard; + } + + /** + * Add a guard to this transaction + * + */ + public void addGuard(Guard _guard) { + guard = _guard; + } + + /** + * Evaluate the guard expression for a given transaction using a set of key value pairs. + * + */ + public boolean evaluate(Set kvSet) throws ScriptException, NullPointerException { + + // Evaluate the guard using the current KV Set + return guard.evaluate(kvSet); + } + + /** + * Add a boolean expression to the guard. + * + */ + public void setGuardExpression(String expr) { + guard.setGuardExpression(expr); + } + + /** + * Trampoline static method. + * + */ + public static String createExpression(IoTString keyName, IoTString keyValue, byte op) { + return Guard.createExpression(keyName, keyValue, op); + } +} \ No newline at end of file diff --git a/src2/java/iotcloud/RejectedMessage.java b/src2/java/iotcloud/RejectedMessage.java new file mode 100644 index 0000000..9c84f18 --- /dev/null +++ b/src2/java/iotcloud/RejectedMessage.java @@ -0,0 +1,88 @@ +package iotcloud; +import java.nio.ByteBuffer; +import java.util.HashSet; + +/** + * Entry for tracking messages that the server rejected. We have to + * make sure that all clients know that this message was rejected to + * prevent the server from reusing these messages in an attack. + * @author Brian Demsky + * @version 1.0 + */ + + +class RejectedMessage extends Entry { + /* Machine identifier */ + private long machineid; + /* Oldest sequence number in range */ + private long oldseqnum; + /* Newest sequence number in range */ + private long newseqnum; + /* Is the machine identifier of the relevant slots equal to (or not + * equal to) the specified machine identifier. */ + private boolean equalto; + /* Set of machines that have not received notification. */ + private HashSet watchset; + + RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) { + super(slot); + machineid=_machineid; + oldseqnum=_oldseqnum; + newseqnum=_newseqnum; + equalto=_equalto; + } + + long getOldSeqNum() { + return oldseqnum; + } + + long getNewSeqNum() { + return newseqnum; + } + + boolean getEqual() { + return equalto; + } + + long getMachineID() { + return machineid; + } + + static Entry decode(Slot slot, ByteBuffer bb) { + long machineid=bb.getLong(); + long oldseqnum=bb.getLong(); + long newseqnum=bb.getLong(); + byte equalto=bb.get(); + return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1); + } + + void setWatchSet(HashSet _watchset) { + watchset=_watchset; + } + + void removeWatcher(long machineid) { + if (watchset.remove(machineid)) + if (watchset.isEmpty()) + setDead(); + } + + void encode(ByteBuffer bb) { + bb.put(Entry.TypeRejectedMessage); + bb.putLong(machineid); + bb.putLong(oldseqnum); + bb.putLong(newseqnum); + bb.put(equalto?(byte)1:(byte)0); + } + + int getSize() { + return 3*Long.BYTES + 2*Byte.BYTES; + } + + byte getType() { + return Entry.TypeRejectedMessage; + } + + Entry getCopy(Slot s) { + return new RejectedMessage(s, machineid, oldseqnum, newseqnum, equalto); + } +} diff --git a/src2/java/iotcloud/Slot.java b/src2/java/iotcloud/Slot.java new file mode 100644 index 0000000..5828fd3 --- /dev/null +++ b/src2/java/iotcloud/Slot.java @@ -0,0 +1,214 @@ +package iotcloud; +import java.util.Vector; +import java.nio.ByteBuffer; +import javax.crypto.Mac; +import java.util.Arrays; + +/** + * Data structuring for holding Slot information. + * @author Brian Demsky + * @version 1.0 + */ + +class Slot implements Liveness { + /** Sets the slot size. */ + static final int SLOT_SIZE=2048; + /** Sets the size for the HMAC. */ + static final int HMAC_SIZE=32; + + /** Sequence number of the slot. */ + private long seqnum; + /** HMAC of previous slot. */ + private byte[] prevhmac; + /** HMAC of this slot. */ + private byte[] hmac; + /** Machine that sent this slot. */ + private long machineid; + /** Vector of entries in this slot. */ + private Vector entries; + /** Pieces of information that are live. */ + private int livecount; + /** Flag that indicates whether this slot is still live for + * recording the machine that sent it. */ + private boolean seqnumlive; + /** Number of bytes of free space. */ + private int freespace; + /** Reference to Table */ + private Table table; + + Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) { + seqnum=_seqnum; + machineid=_machineid; + prevhmac=_prevhmac; + hmac=_hmac; + entries=new Vector(); + livecount=1; + seqnumlive=true; + freespace = SLOT_SIZE - getBaseSize(); + table=_table; + } + + Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac) { + this(_table, _seqnum, _machineid, _prevhmac, null); + } + + Slot(Table _table, long _seqnum, long _machineid) { + this(_table, _seqnum, _machineid, new byte[HMAC_SIZE], null); + } + + byte[] getHMAC() { + return hmac; + } + + byte[] getPrevHMAC() { + return prevhmac; + } + + void addEntry(Entry e) { + e=e.getCopy(this); + entries.add(e); + livecount++; + freespace -= e.getSize(); + } + + private void addShallowEntry(Entry e) { + entries.add(e); + livecount++; + freespace -= e.getSize(); + } + + /** + * Returns true if the slot has free space to hold the entry without + * using its reserved space. */ + + boolean hasSpace(Entry e) { + int newfreespace = freespace - e.getSize(); + return newfreespace >= 0; + } + + Vector getEntries() { + return entries; + } + + static Slot decode(Table table, byte[] array, Mac mac) { + mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE); + byte[] realmac=mac.doFinal(); + + ByteBuffer bb=ByteBuffer.wrap(array); + byte[] hmac=new byte[HMAC_SIZE]; + byte[] prevhmac=new byte[HMAC_SIZE]; + bb.get(hmac); + bb.get(prevhmac); + if (!Arrays.equals(realmac, hmac)) + throw new Error("Server Error: Invalid HMAC! Potential Attack!"); + + long seqnum=bb.getLong(); + long machineid=bb.getLong(); + int numentries=bb.getInt(); + Slot slot=new Slot(table, seqnum, machineid, prevhmac, hmac); + + for(int i=0; i getLiveEntries(boolean resize) { + Vector liveEntries=new Vector(); + for(Entry entry: entries) { + if (entry.isLive()) { + if (!resize || entry.getType() != Entry.TypeTableStatus) + liveEntries.add(entry); + } + } + + if (seqnumlive && !resize) + liveEntries.add(new LastMessage(this, machineid, seqnum)); + + return liveEntries; + } + + /** + * Returns the sequence number of the slot. + */ + + long getSequenceNumber() { + return seqnum; + } + + /** + * Returns the machine that sent this slot. + */ + + long getMachineID() { + return machineid; + } + + /** + * Records that a newer slot records the fact that this slot was + * sent by the relevant machine. + */ + + void setDead() { + seqnumlive=false; + decrementLiveCount(); + } + + /** + * Update the count of live entries. + */ + + void decrementLiveCount() { + livecount--; + if (livecount==0) + table.decrementLiveCount(); + } + + /** + * Returns whether the slot stores any live information. + */ + + boolean isLive() { + return livecount > 0; + } + + public String toString() { + return "<"+getSequenceNumber()+">"; + } +} diff --git a/src2/java/iotcloud/SlotBuffer.java b/src2/java/iotcloud/SlotBuffer.java new file mode 100644 index 0000000..14bc926 --- /dev/null +++ b/src2/java/iotcloud/SlotBuffer.java @@ -0,0 +1,99 @@ +package iotcloud; + +/** + * Circular buffer that holds the live set of slots. + * @author Brian Demsky + * @version 1.0 + */ + +class SlotBuffer { + static final int DEFAULT_SIZE = 128; + + private Slot[] array; + private int head; + private int tail; + private long oldestseqn; + + SlotBuffer() { + array=new Slot[DEFAULT_SIZE+1]; + head=tail=0; + oldestseqn=0; + } + + int size() { + if (head >= tail) + return head - tail; + return (array.length + head) - tail; + } + + int capacity() { + return array.length - 1; + } + + void resize(int newsize) { + if (newsize == (array.length-1)) + return; + Slot[] newarray = new Slot[newsize+1]; + int currsize = size(); + int index = tail; + for(int i=0; i < currsize; i++) { + newarray[i] = array[index]; + if ((++index) == array.length) + index = 0; + } + array = newarray; + tail = 0; + head = currsize; + } + + private void incrementHead() { + head++; + if (head >= array.length) + head=0; + } + + private void incrementTail() { + tail++; + if (tail >= array.length) + tail=0; + } + + void putSlot(Slot s) { + array[head]=s; + incrementHead(); + + if (oldestseqn==0) + oldestseqn = s.getSequenceNumber(); + + if (head==tail) { + incrementTail(); + oldestseqn++; + } + } + + Slot getSlot(long seqnum) { + int diff=(int) (seqnum-oldestseqn); + int index=diff + tail; + if (index >= array.length) { + if (head >= tail) + return null; + index-= array.length; + } + + if (index >= array.length) + return null; + + if (head >= tail && index >= head) + return null; + + return array[index]; + } + + long getOldestSeqNum() { + return oldestseqn; + } + + long getNewestSeqNum() { + return oldestseqn + size() - 1; + } +} diff --git a/src2/java/iotcloud/SlotIndexer.java b/src2/java/iotcloud/SlotIndexer.java new file mode 100644 index 0000000..cecdf2d --- /dev/null +++ b/src2/java/iotcloud/SlotIndexer.java @@ -0,0 +1,31 @@ +package iotcloud; + +/** + * Slot indexer allows slots in both the slot buffer and the new + * server response to looked up in a consistent fashion. + * @author Brian Demsky + * @version 1.0 + */ + +class SlotIndexer { + private Slot[] updates; + private SlotBuffer buffer; + private long firstslotseqnum; + + SlotIndexer(Slot[] _updates, SlotBuffer _buffer) { + buffer = _buffer; + updates = _updates; + firstslotseqnum = updates[0].getSequenceNumber(); + } + + Slot getSlot(long seqnum) { + if (seqnum >= firstslotseqnum) { + int offset = (int) (seqnum - firstslotseqnum); + if (offset >= updates.length) + throw new Error("Invalid Slot Sequence Number Reference"); + else + return updates[offset]; + } else + return buffer.getSlot(seqnum); + } +} diff --git a/src2/java/iotcloud/Table.java b/src2/java/iotcloud/Table.java new file mode 100644 index 0000000..7009ed9 --- /dev/null +++ b/src2/java/iotcloud/Table.java @@ -0,0 +1,510 @@ +package iotcloud; +import java.util.HashMap; +import java.util.Map; +import java.util.Iterator; +import java.util.HashSet; +import java.util.Arrays; +import java.util.Vector; +import java.util.Random; +import java.util.Queue; +import java.util.LinkedList; +/** + * IoTTable data structure. Provides client inferface. + * @author Brian Demsky + * @version 1.0 + */ + +final public class Table { + private int numslots; //number of slots stored in buffer + + //table of key-value pairs + private HashMap table = new HashMap(); + + // machine id -> (sequence number, Slot or LastMessage); records last message by each client + private HashMap > lastmessagetable = new HashMap >(); + // machine id -> ... + private HashMap > watchlist = new HashMap >(); + private Vector rejectedmessagelist = new Vector(); + private SlotBuffer buffer; + private CloudComm cloud; + private long sequencenumber; //Largest sequence number a client has received + private long localmachineid; + private TableStatus lastTableStatus; + static final int FREE_SLOTS = 10; //number of slots that should be kept free + static final int SKIP_THRESHOLD = 10; + private long liveslotcount = 0; + private int chance; + static final double RESIZE_MULTIPLE = 1.2; + static final double RESIZE_THRESHOLD = 0.75; + static final int REJECTED_THRESHOLD = 5; + private int resizethreshold; + private long lastliveslotseqn; //smallest sequence number with a live entry + private Random random = new Random(); + + private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building + private Queue pendingTransQueue = null; // Queue of pending transactions + + + public Table(String baseurl, String password, long _localmachineid) { + localmachineid = _localmachineid; + buffer = new SlotBuffer(); + numslots = buffer.capacity(); + setResizeThreshold(); + sequencenumber = 0; + cloud = new CloudComm(this, baseurl, password); + lastliveslotseqn = 1; + + pendingTransQueue = new LinkedList(); + } + + public Table(CloudComm _cloud, long _localmachineid) { + localmachineid = _localmachineid; + buffer = new SlotBuffer(); + numslots = buffer.capacity(); + setResizeThreshold(); + sequencenumber = 0; + cloud = _cloud; + + pendingTransQueue = new LinkedList(); + } + + public void rebuild() { + Slot[] newslots = cloud.getSlots(sequencenumber + 1); + validateandupdate(newslots, true); + } + + public void update() { + Slot[] newslots = cloud.getSlots(sequencenumber + 1); + + validateandupdate(newslots, false); + } + + public IoTString get(IoTString key) { + KeyValue kv = table.get(key); + if (kv != null) + return kv.getValue(); + else + return null; + } + + public void initTable() { + cloud.setSalt();//Set the salt + Slot s = new Slot(this, 1, localmachineid); + TableStatus status = new TableStatus(s, numslots); + s.addEntry(status); + Slot[] array = cloud.putSlot(s, numslots); + if (array == null) { + array = new Slot[] {s}; + /* update data structure */ + validateandupdate(array, true); + } else { + throw new Error("Error on initialization"); + } + } + + public String toString() { + return table.toString(); + } + + public void startTransaction() { + // Create a new transaction, invalidates any old pending transactions. + pendingTransBuild = new PendingTransaction(); + } + + public void commitTransaction() { + + // Add the pending transaction to the queue + pendingTransQueue.add(pendingTransBuild); + + while (!pendingTransQueue.isEmpty()) { + if (tryput( pendingTransQueue.peek(), false)) { + pendingTransQueue.poll(); + } + } + } + + public void addKV(IoTString key, IoTString value) { + KeyValue kv = new KeyValue(key, value); + pendingTransBuild.addKV(kv); + } + + public void addGuard(IoTString key, IoTString value) { + KeyValue kv = new KeyValue(key, value); + pendingTransBuild.addKV(kv); + } + + + + + + + void decrementLiveCount() { + liveslotcount--; + } + + + private void setResizeThreshold() { + int resize_lower = (int) (RESIZE_THRESHOLD * numslots); + resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower); + } + + private boolean tryput(PendingTransaction pendingTrans, boolean resize) { + Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); + int newsize = 0; + if (liveslotcount > resizethreshold) { + resize = true; //Resize is forced + } + + if (resize) { + newsize = (int) (numslots * RESIZE_MULTIPLE); + TableStatus status = new TableStatus(s, newsize); + s.addEntry(status); + } + + if (! rejectedmessagelist.isEmpty()) { + /* TODO: We should avoid generating a rejected message entry if + * there is already a sufficient entry in the queue (e.g., + * equalsto value of true and same sequence number). */ + + long old_seqn = rejectedmessagelist.firstElement(); + if (rejectedmessagelist.size() > REJECTED_THRESHOLD) { + long new_seqn = rejectedmessagelist.lastElement(); + RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false); + s.addEntry(rm); + } else { + long prev_seqn = -1; + int i = 0; + /* Go through list of missing messages */ + for (; i < rejectedmessagelist.size(); i++) { + long curr_seqn = rejectedmessagelist.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + if (s_msg != null) + break; + prev_seqn = curr_seqn; + } + /* Generate rejected message entry for missing messages */ + if (prev_seqn != -1) { + RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false); + s.addEntry(rm); + } + /* Generate rejected message entries for present messages */ + for (; i < rejectedmessagelist.size(); i++) { + long curr_seqn = rejectedmessagelist.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + long machineid = s_msg.getMachineID(); + RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true); + s.addEntry(rm); + } + } + } + + long newestseqnum = buffer.getNewestSeqNum(); + long oldestseqnum = buffer.getOldestSeqNum(); + if (lastliveslotseqn < oldestseqnum) + lastliveslotseqn = oldestseqnum; + + long seqn = lastliveslotseqn; + boolean seenliveslot = false; + long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full + long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point + + for (; seqn < threshold; seqn++) { + Slot prevslot = buffer.getSlot(seqn); + //Push slot number forward + if (! seenliveslot) + lastliveslotseqn = seqn; + + if (! prevslot.isLive()) + continue; + seenliveslot = true; + Vector liveentries = prevslot.getLiveEntries(resize); + for (Entry liveentry : liveentries) { + if (s.hasSpace(liveentry)) { + s.addEntry(liveentry); + } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue + if (!resize) { + System.out.print("B"); //? + return tryput(pendingTrans, true); + } + } + } + } + + + Transaction trans = new Transaction(s, + s.getSequenceNumber(), + localmachineid, + pendingTrans.getKVUpdates(), + pendingTrans.getGuard()); + boolean insertedTrans = false; + if (s.hasSpace(trans)) { + s.addEntry(trans); + insertedTrans=true; + } + + /* now go through live entries from least to greatest sequence number until + * either all live slots added, or the slot doesn't have enough room + * for SKIP_THRESHOLD consecutive entries*/ + int skipcount = 0; + search: + for (; seqn <= newestseqnum; seqn++) { + Slot prevslot = buffer.getSlot(seqn); + //Push slot number forward + if (!seenliveslot) + lastliveslotseqn = seqn; + + if (!prevslot.isLive()) + continue; + seenliveslot = true; + Vector liveentries = prevslot.getLiveEntries(resize); + for (Entry liveentry : liveentries) { + if (s.hasSpace(liveentry)) + s.addEntry(liveentry); + else { + skipcount++; + if (skipcount > SKIP_THRESHOLD) + break search; + } + } + } + + int max = 0; + if (resize) + max = newsize; + Slot[] array = cloud.putSlot(s, max); + if (array == null) { + array = new Slot[] {s}; + rejectedmessagelist.clear(); + } else { + if (array.length == 0) + throw new Error("Server Error: Did not send any slots"); + rejectedmessagelist.add(s.getSequenceNumber()); + insertedTrans = false; + } + + /* update data structure */ + validateandupdate(array, true); + + return insertedTrans; + } + + private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) { + /* The cloud communication layer has checked slot HMACs already + before decoding */ + if (newslots.length == 0) return; + + long firstseqnum = newslots[0].getSequenceNumber(); + if (firstseqnum <= sequencenumber) + throw new Error("Server Error: Sent older slots!"); + + SlotIndexer indexer = new SlotIndexer(newslots, buffer); + checkHMACChain(indexer, newslots); + + HashSet machineSet = new HashSet(lastmessagetable.keySet()); // + + initExpectedSize(firstseqnum); + for (Slot slot : newslots) { + processSlot(indexer, slot, acceptupdatestolocal, machineSet); + updateExpectedSize(); + } + + /* If there is a gap, check to see if the server sent us everything. */ + if (firstseqnum != (sequencenumber + 1)) { + checkNumSlots(newslots.length); + if (!machineSet.isEmpty()) + throw new Error("Missing record for machines: " + machineSet); + } + + commitNewMaxSize(); + + /* Commit new to slots. */ + for (Slot slot : newslots) { + buffer.putSlot(slot); + liveslotcount++; + } + sequencenumber = newslots[newslots.length - 1].getSequenceNumber(); + } + + private int expectedsize, currmaxsize; + + private void checkNumSlots(int numslots) { + if (numslots != expectedsize) + throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots); + } + + private void initExpectedSize(long firstsequencenumber) { + long prevslots = firstsequencenumber; + expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots; + currmaxsize = numslots; + } + + private void updateExpectedSize() { + expectedsize++; + if (expectedsize > currmaxsize) + expectedsize = currmaxsize; + } + + private void updateCurrMaxSize(int newmaxsize) { + currmaxsize = newmaxsize; + } + + private void commitNewMaxSize() { + if (numslots != currmaxsize) + buffer.resize(currmaxsize); + + numslots = currmaxsize; + setResizeThreshold(); + } + + // private void processEntry(KeyValue entry, SlotIndexer indexer) { + // IoTString key=entry.getKey(); + // KeyValue oldvalue=table.get(key); + // if (oldvalue != null) { + // oldvalue.setDead(); + // } + // table.put(key, entry); + // } + + private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet machineSet) { + updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); + } + + private void processEntry(RejectedMessage entry, SlotIndexer indexer) { + long oldseqnum = entry.getOldSeqNum(); + long newseqnum = entry.getNewSeqNum(); + boolean isequal = entry.getEqual(); + long machineid = entry.getMachineID(); + for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) { + Slot slot = indexer.getSlot(seqnum); + if (slot != null) { + long slotmachineid = slot.getMachineID(); + if (isequal != (slotmachineid == machineid)) { + throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum); + } + } + } + + HashSet watchset = new HashSet(); + for (Map.Entry > lastmsg_entry : lastmessagetable.entrySet()) { + long entry_mid = lastmsg_entry.getKey(); + /* We've seen it, don't need to continue to watch. Our next + * message will implicitly acknowledge it. */ + if (entry_mid == localmachineid) + continue; + Pair v = lastmsg_entry.getValue(); + long entry_seqn = v.getFirst(); + if (entry_seqn < newseqnum) { + addWatchList(entry_mid, entry); + watchset.add(entry_mid); + } + } + if (watchset.isEmpty()) + entry.setDead(); + else + entry.setWatchSet(watchset); + } + + private void addWatchList(long machineid, RejectedMessage entry) { + HashSet entries = watchlist.get(machineid); + if (entries == null) + watchlist.put(machineid, entries = new HashSet()); + entries.add(entry); + } + + private void processEntry(TableStatus entry, SlotIndexer indexer) { + int newnumslots = entry.getMaxSlots(); + updateCurrMaxSize(newnumslots); + if (lastTableStatus != null) + lastTableStatus.setDead(); + lastTableStatus = entry; + } + + private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet machineSet) { + machineSet.remove(machineid); + + HashSet watchset = watchlist.get(machineid); + if (watchset != null) { + for (Iterator rmit = watchset.iterator(); rmit.hasNext(); ) { + RejectedMessage rm = rmit.next(); + if (rm.getNewSeqNum() <= seqnum) { + /* Remove it from our watchlist */ + rmit.remove(); + /* Decrement machines that need to see this notification */ + rm.removeWatcher(machineid); + } + } + } + + if (machineid == localmachineid) { + /* Our own messages are immediately dead. */ + if (liveness instanceof LastMessage) { + ((LastMessage)liveness).setDead(); + } else if (liveness instanceof Slot) { + ((Slot)liveness).setDead(); + } else { + throw new Error("Unrecognized type"); + } + } + + + Pair lastmsgentry = lastmessagetable.put(machineid, new Pair(seqnum, liveness)); + if (lastmsgentry == null) + return; + + long lastmsgseqnum = lastmsgentry.getFirst(); + Liveness lastentry = lastmsgentry.getSecond(); + if (machineid != localmachineid) { + if (lastentry instanceof LastMessage) { + ((LastMessage)lastentry).setDead(); + } else if (lastentry instanceof Slot) { + ((Slot)lastentry).setDead(); + } else { + throw new Error("Unrecognized type"); + } + } + + if (machineid == localmachineid) { + if (lastmsgseqnum != seqnum && !acceptupdatestolocal) + throw new Error("Server Error: Mismatch on local machine sequence number"); + } else { + if (lastmsgseqnum > seqnum) + throw new Error("Server Error: Rollback on remote machine sequence number"); + } + } + + private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet machineSet) { + updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet); + for (Entry entry : slot.getEntries()) { + switch (entry.getType()) { + // case Entry.TypeKeyValue: + // processEntry((KeyValue)entry, indexer); + // break; + + case Entry.TypeLastMessage: + processEntry((LastMessage)entry, indexer, machineSet); + break; + + case Entry.TypeRejectedMessage: + processEntry((RejectedMessage)entry, indexer); + break; + + case Entry.TypeTableStatus: + processEntry((TableStatus)entry, indexer); + break; + + default: + throw new Error("Unrecognized type: " + entry.getType()); + } + } + } + + private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) { + for (int i = 0; i < newslots.length; i++) { + Slot currslot = newslots[i]; + Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1); + if (prevslot != null && + !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC())) + throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot); + } + } +} diff --git a/src2/java/iotcloud/TableStatus.java b/src2/java/iotcloud/TableStatus.java new file mode 100644 index 0000000..62f3a6d --- /dev/null +++ b/src2/java/iotcloud/TableStatus.java @@ -0,0 +1,45 @@ +package iotcloud; +import java.nio.ByteBuffer; + +/** + * TableStatus entries record the current size of the data structure + * in slots. Used to remember the size and to perform resizes. + * @author Brian Demsky + * @version 1.0 + */ + + +class TableStatus extends Entry { + private int maxslots; + + TableStatus(Slot slot, int _maxslots) { + super(slot); + maxslots=_maxslots; + } + + int getMaxSlots() { + return maxslots; + } + + static Entry decode(Slot slot, ByteBuffer bb) { + int maxslots=bb.getInt(); + return new TableStatus(slot, maxslots); + } + + void encode(ByteBuffer bb) { + bb.put(Entry.TypeTableStatus); + bb.putInt(maxslots); + } + + int getSize() { + return Integer.BYTES+Byte.BYTES; + } + + byte getType() { + return Entry.TypeTableStatus; + } + + Entry getCopy(Slot s) { + return new TableStatus(s, maxslots); + } +} diff --git a/src2/java/iotcloud/Test.java b/src2/java/iotcloud/Test.java new file mode 100644 index 0000000..e70d88e --- /dev/null +++ b/src2/java/iotcloud/Test.java @@ -0,0 +1,95 @@ +package iotcloud; + +/** + * Test cases. + * @author Brian Demsky + * @version 1.0 + */ + +public class Test { + public static void main(String[] args) { + // if(args[0].equals("2")) + // test2(); + // else if(args[0].equals("3")) + // test3(); + // else if(args[0].equals("4")) + // test4(); + // else if(args[0].equals("5")) + // test5(); + + } + + + + // static Thread buildThread(String prefix, Table t) { + // return new Thread() { + // public void run() { + // for(int i=0; i<10000; i++) { + // String a=prefix+i; + // IoTString ia=new IoTString(a); + // t.put(ia, ia); + // System.out.println(ia+"->"+t.get(ia)); + // } + // } + // }; + // } + + // static void test5() { + // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + // t1.rebuild(); + // System.out.println(t1); + // } + + // static void test4() { + // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + // t1.rebuild(); + // t2.rebuild(); + // Thread thr1=buildThread("p1", t1); + // Thread thr2=buildThread("p2", t2); + // thr1.start(); + // thr2.start(); + // try { + // thr1.join(); + // thr2.join(); + // } catch (Exception e) { + // e.printStackTrace(); + // } + // } + + // static void test3() { + // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + // t1.rebuild(); + // t2.rebuild(); + // for(int i=0; i<600; i++) { + // String a="STR"+i; + // String b="ABR"+i; + // IoTString ia=new IoTString(a); + // IoTString ib=new IoTString(b); + // t1.put(ia, ia); + // t2.put(ib, ib); + // t1.update(); + // System.out.println(ib+"->"+t1.get(ib)); + // System.out.println(ia+"->"+t2.get(ia)); + // } + // } + + // static void test2() { + // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + // t1.initTable(); + // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + // t2.update(); + // for(int i=0; i<600; i++) { + // String a="STR"+i; + // String b="ABR"+i; + // IoTString ia=new IoTString(a); + // IoTString ib=new IoTString(b); + // t1.put(ia, ia); + // t2.put(ib, ib); + // t1.update(); + // System.out.println(ib+"->"+t1.get(ib)); + // System.out.println(ia+"->"+t2.get(ia)); + // } + // } +} diff --git a/src2/java/iotcloud/Transaction.java b/src2/java/iotcloud/Transaction.java new file mode 100644 index 0000000..d1a9933 --- /dev/null +++ b/src2/java/iotcloud/Transaction.java @@ -0,0 +1,84 @@ +package iotcloud; + +import java.nio.ByteBuffer; +import java.util.Set; +import java.util.HashSet; + +class Transaction extends Entry { + + private long seqnum; + private long machineid; + private Set keyValueUpdateSet; + private Guard guard; + + public Transaction(Slot slot, long _seqnum, long _machineid, Set _keyValueUpdateSet, Guard _guard) { + super(slot); + seqnum = _seqnum; + machineid = _machineid; + keyValueUpdateSet = _keyValueUpdateSet; + guard = _guard; + } + + public long getMachineID() { + return machineid; + } + + public long getSequenceNumber() { + return seqnum; + } + + public byte getType() { + return Entry.TypeLastMessage; + } + + public int getSize() { + int size = 2 * Long.BYTES + Byte.BYTES; // seq, machine id, entry type + size += Integer.BYTES; // number of KV's + + // Size of each KV + for (KeyValue kv : keyValueUpdateSet) { + size += kv.getSize(); + } + + // Size of the guard + size += guard.getSize(); + + return size; + } + + + public void encode(ByteBuffer bb) { + bb.put(Entry.TypeTransaction); + bb.putLong(seqnum); + bb.putLong(machineid); + + for (KeyValue kv : keyValueUpdateSet) { + kv.encode(bb); + } + + guard.encode(bb); + } + + static Entry decode(Slot slot, ByteBuffer bb) { + long seqnum = bb.getLong(); + long machineid = bb.getLong(); + int numberOfKeys = bb.getInt(); + + Set kvSet = new HashSet(); + + for (int i = 0; i < numberOfKeys; i++) { + KeyValue kv = KeyValue.decode(bb); + kvSet.add(kv); + } + + Guard guard = Guard.decode(bb); + + return new Transaction(slot, seqnum, machineid, kvSet, guard); + } + + + + public Entry getCopy(Slot s) { + return new Transaction(s, seqnum, machineid, keyValueUpdateSet, guard); + } +} \ No newline at end of file diff --git a/src2/java/iotcloud/issues.txt b/src2/java/iotcloud/issues.txt new file mode 100644 index 0000000..4c246b6 --- /dev/null +++ b/src2/java/iotcloud/issues.txt @@ -0,0 +1,2 @@ +1) add better resizing support...gets stuck when it is full now... +2) Transaction does not check arbitrator is the same for all keys and guards \ No newline at end of file diff --git a/src2/script/C.cfg b/src2/script/C.cfg new file mode 100644 index 0000000..6165d83 --- /dev/null +++ b/src2/script/C.cfg @@ -0,0 +1,37 @@ +indent_with_tabs = 2 +indent_cmt_with_tabs = True +indent_columns = 2 +indent_class = True +output_tab_size = 2 +nl_if_brace = Remove +nl_brace_else = Remove +nl_elseif_brace = Remove +nl_struct_brace = Remove +nl_union_brace = Remove +nl_fcall_brace = Remove +nl_for_brace = Remove +nl_fdef_brace = Remove +nl_while_brace = Remove +nl_do_brace = Remove +nl_brace_while = Remove +nl_switch_brace = Remove +nl_before_case = True +nl_try_brace = Remove +nl_catch_brace = Remove +nl_brace_catch = Remove +sp_func_proto_paren = Remove +sp_func_def_paren = Remove +sp_inside_fparens = remove +sp_inside_fparen = remove +sp_func_call_paren = Remove +sp_fparen_brace = Add +sp_sparen_brace = Add +sp_paren_brace = Add +sp_else_brace = Add +sp_brace_else = Add +sp_catch_brace = Add +sp_brace_catch = Add +sp_try_brace = Add +sp_after_sparen = Add +sp_cond_colon = remove +sp_cond_question = remove diff --git a/src2/script/java.cfg b/src2/script/java.cfg new file mode 100644 index 0000000..6165d83 --- /dev/null +++ b/src2/script/java.cfg @@ -0,0 +1,37 @@ +indent_with_tabs = 2 +indent_cmt_with_tabs = True +indent_columns = 2 +indent_class = True +output_tab_size = 2 +nl_if_brace = Remove +nl_brace_else = Remove +nl_elseif_brace = Remove +nl_struct_brace = Remove +nl_union_brace = Remove +nl_fcall_brace = Remove +nl_for_brace = Remove +nl_fdef_brace = Remove +nl_while_brace = Remove +nl_do_brace = Remove +nl_brace_while = Remove +nl_switch_brace = Remove +nl_before_case = True +nl_try_brace = Remove +nl_catch_brace = Remove +nl_brace_catch = Remove +sp_func_proto_paren = Remove +sp_func_def_paren = Remove +sp_inside_fparens = remove +sp_inside_fparen = remove +sp_func_call_paren = Remove +sp_fparen_brace = Add +sp_sparen_brace = Add +sp_paren_brace = Add +sp_else_brace = Add +sp_brace_else = Add +sp_catch_brace = Add +sp_brace_catch = Add +sp_try_brace = Add +sp_after_sparen = Add +sp_cond_colon = remove +sp_cond_question = remove diff --git a/src2/script/makefile b/src2/script/makefile new file mode 100644 index 0000000..ac2da6e --- /dev/null +++ b/src2/script/makefile @@ -0,0 +1,4 @@ +tabbing: + uncrustify -c java.cfg --no-backup $$(find .. -name "*.java") + uncrustify -c C.cfg --no-backup $$(find .. -name "*.cpp") + uncrustify -c C.cfg --no-backup $$(find .. -name "*.h") diff --git a/src2/server/.dir-locals.el b/src2/server/.dir-locals.el new file mode 100644 index 0000000..e166a2e --- /dev/null +++ b/src2/server/.dir-locals.el @@ -0,0 +1,2 @@ +((nil . ((indent-tabs-mode . t)))) + diff --git a/src2/server/Makefile b/src2/server/Makefile new file mode 100644 index 0000000..8eee1fa --- /dev/null +++ b/src2/server/Makefile @@ -0,0 +1,15 @@ +CPPFLAGS=-O0 -g -Wall + +all: iotcloud.fcgi + +iotcloud.fcgi: iotcloud.o iotquery.o + g++ $(CPPFLAGS) -o iotcloud.fcgi iotcloud.o iotquery.o -lfcgi -lfcgi++ + +iotcloud.o: iotcloud.cpp iotquery.h + g++ $(CPPFLAGS) -c -o iotcloud.o iotcloud.cpp + +iotquery.o: iotquery.cpp iotquery.h + g++ $(CPPFLAGS) -c -o iotquery.o iotquery.cpp + +clean: + rm *.o iotcloud.fcgi diff --git a/src2/server/README.txt b/src2/server/README.txt new file mode 100644 index 0000000..6eb138f --- /dev/null +++ b/src2/server/README.txt @@ -0,0 +1,32 @@ +1) Requires apache2 +2) Requires fastcgi (libapache2-mod-fastcgi and libfcgi-dev) + +Setup on ubuntu +1) Install modules + +2) Add .htaccess file in /var/www/html +RewriteEngine on +RewriteBase / +SetHandler cgi-script +RewriteRule ^([a-zA-Z0-9._]*\.iotcloud/([a-zA-Z0-9._]*))$ /cgi-bin/iotcloud.fcgi/$1 + +3) Create account directory. For example, create the directory test.iotcloud in /var/www/html + -- To password protect, create the following .htaccess file in the account directory: +AuthType Basic +AuthName "Private" +AuthUserFile /var/www/html/foo.iotcloud/.htpasswd +Require valid-user + +4) In apache2.conf, add to the /var/www directory section: +AllowOverride FileInfo AuthConfig + +5) In the sites-enabled/000-default.conf file, add the line: +SetEnv IOTCLOUD_ROOT /iotcloud/ + +6) Create the /iotcloud directory. + +7) Create the account directory in the /iotcloud directory. For example, test.iotcloud and give it permissions that the apache daemon can write to. + +8) Compile cloud server by typing make + +9) Copy it to the cgi-bin directory. diff --git a/src2/server/iotcloud.cpp b/src2/server/iotcloud.cpp new file mode 100644 index 0000000..bb9eff8 --- /dev/null +++ b/src2/server/iotcloud.cpp @@ -0,0 +1,40 @@ +#include +#include "iotquery.h" + +using namespace std; + + +int main(void) { + // Backup the stdio streambufs + streambuf * cin_streambuf = cin.rdbuf(); + streambuf * cout_streambuf = cout.rdbuf(); + streambuf * cerr_streambuf = cerr.rdbuf(); + + FCGX_Request request; + + FCGX_Init(); + FCGX_InitRequest(&request, 0, 0); + + while (FCGX_Accept_r(&request) == 0) { + fcgi_streambuf cin_fcgi_streambuf(request.in); + fcgi_streambuf cout_fcgi_streambuf(request.out); + fcgi_streambuf cerr_fcgi_streambuf(request.err); + + cin.rdbuf(&cin_fcgi_streambuf); + cout.rdbuf(&cout_fcgi_streambuf); + cerr.rdbuf(&cerr_fcgi_streambuf); + + IoTQuery * iotquery=new IoTQuery(&request); + iotquery->processQuery(); + + delete iotquery; + } + + // restore stdio streambufs + cin.rdbuf(cin_streambuf); + cout.rdbuf(cout_streambuf); + cerr.rdbuf(cerr_streambuf); + + return 0; +} + diff --git a/src2/server/iotquery.cpp b/src2/server/iotquery.cpp new file mode 100644 index 0000000..0b1c4b3 --- /dev/null +++ b/src2/server/iotquery.cpp @@ -0,0 +1,517 @@ +#include "iotquery.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +const char * query_str="QUERY_STRING"; +const char * uri_str="REQUEST_URI"; +const char * method_str="REQUEST_METHOD"; +const char * iotcloudroot_str="IOTCLOUD_ROOT"; +const char * length_str="CONTENT_LENGTH"; + +IoTQuery::IoTQuery(FCGX_Request *request) : + request(request), + data(NULL), + directory(NULL), + uri(NULL), + query(NULL), + method(NULL), + iotcloudroot(NULL), + length(0), + oldestentry(0), + newestentry(0), + requestsequencenumber(0), + numqueueentries(DEFAULT_SIZE), + fd(-1), + reqGetSlot(false), + reqPutSlot(false), + reqSetSalt(false), + reqGetSalt(false) { +} + +IoTQuery::~IoTQuery() { + if (fd >= 0) + close(fd); + if (directory) + delete directory; + if (data) + delete data; +} + +/** + * Returns true if the account directory exists. + */ + +bool IoTQuery::checkDirectory() { + struct stat s; + int err=stat(directory, &s); + if (-1 == err) + return false; + return S_ISDIR(s.st_mode); +} + +/** + * Decodes query string from client. Extracts type of request, + * sequence number, and whether the request changes the number of + * slots. + */ + +void IoTQuery::decodeQuery() { + int len=strlen(query); + char * str=new char[len+1]; + memcpy(str, query, len+1); + char *tok_ptr=str; + + /* Parse commands */ + char *command=strsep(&tok_ptr, "&"); + if (strncmp(command, "req=putslot", 11) == 0) + reqPutSlot = true; + else if (strncmp(command, "req=getslot", 11) == 0) + reqGetSlot = true; + else if (strncmp(command, "req=setsalt", 11) == 0) + reqSetSalt = true; + else if (strncmp(command, "req=getsalt", 11) == 0) + reqGetSalt = true; + + /* Load Sequence Number for request */ + char *sequencenumber_str = strsep(&tok_ptr, "&"); + if (sequencenumber_str != NULL && + strncmp(sequencenumber_str, "seq=", 4) == 0) { + sequencenumber_str = strchr(sequencenumber_str, '='); + if (sequencenumber_str != NULL) { + requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10); + } + } + + /* don't allow a really old sequence number */ + if (requestsequencenumber < oldestentry) + requestsequencenumber = oldestentry; + + /* Update size if we get request */ + char * numqueueentries_str = tok_ptr; + if (numqueueentries_str != NULL && + strncmp(numqueueentries_str, "max=", 4) == 0) { + numqueueentries_str = strchr(numqueueentries_str, '=') + 1; + numqueueentries = strtoll(numqueueentries_str, NULL, 10); + } + + delete str; +} + +/** + * Helper function to write data to file. + */ + +void doWrite(int fd, char *data, long long length) { + long long offset=0; + do { + long long byteswritten=write(fd, &data[offset], length); + if (byteswritten > 0) { + length -= byteswritten; + offset += byteswritten; + } else { + cerr << "Bytes not written" << endl; + if (byteswritten < 0) { + cerr << strerror(errno) << " error writing slot file" << endl; + } + return; + } + } while(length != 0); +} + +/** Helper function to read data from file. */ +bool doRead(int fd, void *buf, int numbytes) { + int offset=0; + char *ptr=(char *)buf; + do { + int bytesread=read(fd, ptr+offset, numbytes); + if (bytesread > 0) { + offset += bytesread; + numbytes -= bytesread; + } else + return false; + } while (numbytes!=0); + return true; +} + +/** + * Function that handles a getSlot request. + */ + +void IoTQuery::getSlot() { + int numrequeststosend = (int)((newestentry-requestsequencenumber)+1); + if (numrequeststosend < 0) + numrequeststosend = 0; + long long numbytes = 0; + int filesizes[numrequeststosend]; + int fdarray[numrequeststosend]; + int index=0; + for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) { + struct stat st; + char *filename=getSlotFileName(seqn); + if (stat(filename, &st) == 0) { + fdarray[index]=open(filename, O_RDONLY); + filesizes[index]=st.st_size; + numbytes+=filesizes[index]; + } else { + fdarray[index]=-1; + filesizes[index]=0; + } + delete filename; + } + const char header[]="getslot"; + + /* Size is the header + the payload + space for number of requests + plus sizes of each slot */ + + long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; + char * response = new char[size]; + long long offset=0; + memcpy(response, header, sizeof(header)-1); + offset+=sizeof(header)-1; + int numreq=htonl(numrequeststosend); + memcpy(response + offset, &numreq, sizeof(numreq)); + offset+=sizeof(numrequeststosend); + for(int i=0; i=0) { + doRead(fdarray[i], response+offset, filesizes[i]); + offset+=filesizes[i]; + } + } + + /* Send the response out to the webserver. */ + sendResponse(response, size); + + /* Delete the response buffer and close the files. */ + delete response; + for(int i=0; i= 0) + close(fdarray[i]); + } +} + +/** + * The method setSalt handles a setSalt request from the client. + */ + +void IoTQuery::setSalt() { + /* Write the slot data we received to a SLOT file */ + char *filename = getSaltFileName(); + int saltfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR); + doWrite(saltfd, data, length); + char response[0]; + sendResponse(response, 0); + close(saltfd); + delete filename; +} + +/** + * The method getSalt handles a setSalt request from the client. + */ + +void IoTQuery::getSalt() { + /* Write the slot data we received to a SLOT file */ + char *filename = getSaltFileName(); + int filesize = 0; + struct stat st; + if (stat(filename, &st) == 0) { + filesize=st.st_size; + } else { + delete filename; + return; + } + int saltfd = open(filename, O_RDONLY); + int responsesize = filesize + sizeof(int); + char * response = new char[responsesize]; + doRead(saltfd, response+ sizeof(int), filesize); + int n_filesize=htonl(filesize); + *((int*) response) = n_filesize; + sendResponse(response, responsesize); + close(saltfd); + delete filename; + delete response; +} + +/** + * The method putSlot handles a putSlot request from the client + */ + +void IoTQuery::putSlot() { + /* Check if the request is stale and send update in that case. This + servers as an implicit failure of the request. */ + if (requestsequencenumber!=(newestentry+1)) { + getSlot(); + return; + } + + /* See if we have too many slots and if so, delete the old one */ + int numberofliveslots=(int) ((newestentry-oldestentry)+1); + if (numberofliveslots >= numqueueentries) { + removeOldestSlot(); + } + + /* Write the slot data we received to a SLOT file */ + char *filename = getSlotFileName(requestsequencenumber); + int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR); + doWrite(slotfd, data, length); + close(slotfd); + delete filename; + newestentry = requestsequencenumber; + + /* Update the seuqence numbers and other status file information. */ + updateStatusFile(); + + /* Send response acknowledging success */ + char command[]="putslot"; + sendResponse(command, sizeof(command)-1); +} + +/** + * Method sends response. It wraps in appropriate headers for web + * server. + */ + +void IoTQuery::sendResponse(char * bytes, int len) { + cout << "Accept-Ranges: bytes\r\n" + << "Content-Length: " << len << "\r\n" + << "\r\n"; + cout.write(bytes, len); +} + +/** + * Computes the name for a slot file for the given sequence number. + */ + +char * IoTQuery::getSlotFileName(long long seqnum) { + int directorylen=strlen(directory); + + /* Size is 19 digits for ASCII representation of a long + 4 + characters for SLOT string + 1 character for null termination + + directory size*/ + + char * filename=new char[25+directorylen]; + snprintf(filename, 25+directorylen, "%s/SLOT%lld", directory, seqnum); + return filename; +} + +/** + * Computes the name for a salt file + */ + +char * IoTQuery::getSaltFileName() { + int directorylen=strlen(directory); + + /* Size is 4 characters for SALT string + 1 character for null + termination + directory size*/ + + char * filename=new char[6+directorylen]; + snprintf(filename, 6+directorylen, "%s/SALT", directory); + return filename; +} + +/** + * Removes the oldest slot file + */ + +void IoTQuery::removeOldestSlot() { + if (oldestentry!=0) { + char * filename=getSlotFileName(oldestentry); + unlink(filename); + delete filename; + } + oldestentry++; +} + +/** + * Processes the query sent to the fastcgi handler. + */ + +void IoTQuery::processQuery() { + getQuery(); + getDirectory(); + readData(); + + /* Verify that we receive a post request. */ + if (strncmp(method, "POST", 4) != 0) { + cerr << "Not POST Request" << endl; + return; + } + + /* Make sure the directory is okay. */ + if (directory == NULL || + !checkDirectory()) { + cerr << "Directory " << directory << " does not exist" << endl; + return; + } + + /* Get queue state from the status file. If it doesn't exist, + create it. */ + if (!openStatusFile()) { + cerr << "Failed to open status file" << endl; + return; + } + + /* Lock status file to keep other requests out. */ + flock(fd, LOCK_EX); + + /* Decode query. */ + decodeQuery(); + + /* Handle request. */ + if (reqGetSlot) + getSlot(); + else if (reqPutSlot) + putSlot(); + else if (reqSetSalt) + setSalt(); + else if (reqGetSalt) + getSalt(); + else { + cerr << "No recognized request" << endl; + return; + } +} + +/** + * Reads in data for request. This is used for the slot to be + * inserted. + */ + +void IoTQuery::readData() { + if (length) { + data = new char[length+1]; + memset(data, 0, length+1); + cin.read(data, length); + } + do { + char dummy; + cin >> dummy; + } while (!cin.eof()); +} + + +/** + * Reads relevant environmental variables to find out the request. + */ + +void IoTQuery::getQuery() { + uri = FCGX_GetParam(uri_str, request->envp); + query = FCGX_GetParam(query_str, request->envp); + method = FCGX_GetParam(method_str, request->envp); + iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp); + + /** We require the content-length header to be sent. */ + char * reqlength = FCGX_GetParam(length_str, request->envp); + if (reqlength) { + length=strtoll(reqlength, NULL, 10); + } else { + length=0; + } +} + +/** + * Initializes directory field from environmental variables. + */ + +void IoTQuery::getDirectory() { + char * split = strchr((char *)uri, '?'); + if (split == NULL) + return; + int split_len = (int) (split-uri); + int rootdir_len = strlen(iotcloudroot); + int directory_len = split_len + rootdir_len + 1; + directory = new char[directory_len]; + memcpy(directory, iotcloudroot, rootdir_len); + memcpy(directory + rootdir_len, uri, split_len); + directory[directory_len-1]=0; +} + +/** + * Helper function that is used to read the status file. + */ + +int doread(int fd, void *ptr, size_t count, off_t offset) { + do { + size_t bytesread=pread(fd, ptr, count, offset); + if (bytesread==count) { + return 1; + } else if (bytesread==0) { + return 0; + } + } while(1); +} + + +/** + * Writes the current state to the status file. + */ + +void IoTQuery::updateStatusFile() { + pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX); + pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD); + pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW); +} + +/** + * Reads in queue state from the status file. Returns true if + * successful. + */ + +bool IoTQuery::openStatusFile() { + char statusfile[]="queuestatus"; + int len=strlen(directory); + + char * filename=new char[len+sizeof(statusfile)+2]; + memcpy(filename, directory, len); + filename[len]='/'; + memcpy(filename+len+1, statusfile, sizeof(statusfile)); + filename[len+sizeof(statusfile)+1]=0; + fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR); + delete filename; + + if (fd < 0) { + cerr << strerror(errno) << " error opening statusfile" << endl; + return false; + } + + /* Read in queue size, oldest sequence number, and newest sequence number. */ + int size; + int needwrite=0; + if (doread(fd, &size, sizeof(size), OFFSET_MAX)) + numqueueentries=size; + else + needwrite=1; + + long long entry; + if (doread(fd, &entry, sizeof(entry), OFFSET_OLD)) + oldestentry=entry; + else + needwrite=1; + + if (doread(fd, &entry, sizeof(entry), OFFSET_NEW)) + newestentry=entry; + else + needwrite=1; + + if (needwrite) + updateStatusFile(); + + return true; +} + + diff --git a/src2/server/iotquery.h b/src2/server/iotquery.h new file mode 100644 index 0000000..ae39366 --- /dev/null +++ b/src2/server/iotquery.h @@ -0,0 +1,68 @@ +#ifndef IOTQUERY_H +#define IOTQUERY_H +#include +#include "fcgio.h" +#include "fcgi_stdio.h" + +#define DEFAULT_SIZE 128 +#define OFFSET_MAX 0 +#define OFFSET_OLD 4 +#define OFFSET_NEW 12 + +class IoTQuery { +public: + IoTQuery(FCGX_Request * request); + ~IoTQuery(); + void processQuery(); + +private: + void sendResponse(char *data, int length); + void getQuery(); + void getDirectory(); + void readData(); + bool checkDirectory(); + bool openStatusFile(); + void updateStatusFile(); + void decodeQuery(); + void getSlot(); + void putSlot(); + void setSalt(); + void getSalt(); + void removeOldestSlot(); + char * getSlotFileName(long long); + char * getSaltFileName(); + + FCGX_Request * request; + char *data; + /* Directory slot files are placed in. */ + char *directory; + /* Full URI from Apache */ + const char * uri; + /* Query portion of URI */ + const char * query; + /* Type of request: GET or PUT */ + const char * method; + /* Root directory for all accounts */ + const char * iotcloudroot; + /* Expected length of data from client */ + long long length; + /* Sequence number for oldest slot */ + long long oldestentry; + /* Sequence number for newest slot */ + long long newestentry; + /* Sequence number from request */ + long long requestsequencenumber; + /* Size of queue */ + int numqueueentries; + /* fd for queuestatus file */ + int fd; + /* Is the request to get a slot? */ + bool reqGetSlot; + /* Is the request to put a slot? */ + bool reqPutSlot; + /* Is the request to set the salt? */ + bool reqSetSalt; + /* Is the request to get the salt? */ + bool reqGetSalt; +}; +#endif -- 2.34.1