class CloudComm {
private static final int SALT_SIZE = 8;
- private static final int TIMEOUT_MILLIS = 25; // 100
+ private static final int TIMEOUT_MILLIS = 2000; // 100
/** Sets the size for the HMAC. */
static final int HMAC_SIZE = 32;
private Thread localServerThread = null;
private boolean doEnd = false;
+ private TimingSingleton timer = null;
+
/**
* Empty Constructor needed for child class.
*/
CloudComm() {
+ timer = TimingSingleton.getInstance();
}
/**
* Constructor for actual use. Takes in the url and password.
*/
CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) {
+ timer = TimingSingleton.getInstance();
this.table = _table;
this.baseurl = _baseurl;
this.password = _password;
random.nextBytes(saltTmp);
URL url = new URL(baseurl + "?req=setsalt");
+
+ timer.startTime();
URLConnection con = url.openConnection();
HttpURLConnection http = (HttpURLConnection) con;
http.setFixedLengthStreamingMode(saltTmp.length);
http.setDoOutput(true);
http.setConnectTimeout(TIMEOUT_MILLIS);
+
+
http.connect();
OutputStream os = http.getOutputStream();
os.write(saltTmp);
os.flush();
-
+
int responsecode = http.getResponseCode();
if (responsecode != HttpURLConnection.HTTP_OK) {
// TODO: Remove this print
throw new Error("Invalid response");
}
+ timer.endTime();
+
salt = saltTmp;
} catch (Exception e) {
// e.printStackTrace();
+ timer.endTime();
throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout);
}
}
}
try {
+ timer.startTime();
con = url.openConnection();
http = (HttpURLConnection) con;
http.setRequestMethod("POST");
http.setConnectTimeout(TIMEOUT_MILLIS);
http.setReadTimeout(TIMEOUT_MILLIS);
+
+
http.connect();
+ timer.endTime();
} catch (SocketTimeoutException e) {
+ timer.endTime();
throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout);
} catch (Exception e) {
// e.printStackTrace();
try {
+ timer.startTime();
+
int responsecode = http.getResponseCode();
if (responsecode != HttpURLConnection.HTTP_OK) {
// TODO: Remove this print
byte [] tmp = new byte[salt_length];
dis.readFully(tmp);
salt = tmp;
+ timer.endTime();
+
return true;
} else {
+ timer.endTime();
+
return false;
}
} catch (SocketTimeoutException e) {
+ timer.endTime();
+
throw new ServerException("getSalt failed", ServerException.TypeInputTimeout);
} catch (Exception e) {
// e.printStackTrace();
byte[] bytes = slot.encode(mac);
bytes = encryptCipher.doFinal(bytes);
+
+
+
url = buildRequest(true, sequencenumber, max);
+
+ timer.startTime();
con = url.openConnection();
http = (HttpURLConnection) con;
os.write(bytes);
os.flush();
+ timer.endTime();
+
+
// System.out.println("Bytes Sent: " + bytes.length);
} catch (ServerException e) {
+ timer.endTime();
+
throw e;
} catch (SocketTimeoutException e) {
+ timer.endTime();
+
throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout);
} catch (Exception e) {
// e.printStackTrace();
try {
+ timer.startTime();
InputStream is = http.getInputStream();
DataInputStream dis = new DataInputStream(is);
byte[] resptype = new byte[7];
dis.readFully(resptype);
+ timer.endTime();
if (Arrays.equals(resptype, "getslot".getBytes()))
+ {
return processSlots(dis);
+ }
else if (Arrays.equals(resptype, "putslot".getBytes()))
+ {
return null;
+ }
else
throw new Error("Bad response to putslot");
} catch (SocketTimeoutException e) {
+ timer.endTime();
throw new ServerException("putSlot failed", ServerException.TypeInputTimeout);
} catch (Exception e) {
// e.printStackTrace();
}
url = buildRequest(false, sequencenumber, 0);
+ timer.startTime();
con = url.openConnection();
http = (HttpURLConnection) con;
http.setRequestMethod("POST");
http.setConnectTimeout(TIMEOUT_MILLIS);
http.setReadTimeout(TIMEOUT_MILLIS);
+
+
+
http.connect();
+ timer.endTime();
+
} catch (SocketTimeoutException e) {
+ timer.endTime();
+
throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout);
} catch (ServerException e) {
+ timer.endTime();
+
throw e;
} catch (Exception e) {
// e.printStackTrace();
}
try {
- InputStream is = http.getInputStream();
+
+ timer.startTime();
+ InputStream is = http.getInputStream();
DataInputStream dis = new DataInputStream(is);
byte[] resptype = new byte[7];
+
dis.readFully(resptype);
+ timer.endTime();
+
if (!Arrays.equals(resptype, "getslot".getBytes()))
throw new Error("Bad Response: " + new String(resptype));
- else
- return processSlots(dis);
+
+ return processSlots(dis);
} catch (SocketTimeoutException e) {
+ timer.endTime();
+
throw new ServerException("getSlots failed", ServerException.TypeInputTimeout);
} catch (Exception e) {
// e.printStackTrace();
DataOutputStream output = new DataOutputStream(socket.getOutputStream());
DataInputStream input = new DataInputStream(socket.getInputStream());
+
+ timer.startTime();
// Send data to output (length of data, the data)
output.writeInt(encryptedData.length);
output.write(encryptedData, 0, encryptedData.length);
int lengthOfReturnData = input.readInt();
byte[] returnData = new byte[lengthOfReturnData];
input.readFully(returnData);
+
+ timer.endTime();
+
returnData = decryptCipher.doFinal(returnData);
// We are done with this socket
byte[] readData = new byte[dataSize];
input.readFully(readData);
+ timer.endTime();
+
// Decrypt the data
readData = decryptCipher.doFinal(readData);
// Encrypt the data for sending
byte[] encryptedData = encryptCipher.doFinal(totalData);
+
+ timer.startTime();
// Send data to output (length of data, the data)
output.writeInt(encryptedData.length);
output.write(encryptedData, 0, encryptedData.length);
--- /dev/null
+package iotcloud;
+
+import java.io.*;
+import java.net.*;
+import java.util.Arrays;
+import javax.crypto.*;
+import javax.crypto.spec.*;
+import java.security.SecureRandom;
+
+/**
+ * This class provides a communication API to the webserver. It also
+ * validates the HMACs on the slots and handles encryption.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
+class CloudComm {
+ private static final int SALT_SIZE = 8;
+ private static final int TIMEOUT_MILLIS = 25; // 100
+
+ /** Sets the size for the HMAC. */
+ static final int HMAC_SIZE = 32;
+
+ private String baseurl;
+ private Cipher encryptCipher;
+ private Cipher decryptCipher;
+ private Mac mac;
+ private String password;
+ private SecureRandom random;
+ private byte salt[];
+ private Table table;
+ private int listeningPort = -1;
+ private Thread localServerThread = null;
+ private boolean doEnd = false;
+
+ /**
+ * Empty Constructor needed for child class.
+ */
+ CloudComm() {
+ }
+
+ /**
+ * Constructor for actual use. Takes in the url and password.
+ */
+ CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) {
+ this.table = _table;
+ this.baseurl = _baseurl;
+ this.password = _password;
+ this.random = new SecureRandom();
+ this.listeningPort = _listeningPort;
+
+ if (this.listeningPort > 0) {
+ localServerThread = new Thread(new Runnable() {
+ public void run() {
+ localServerWorkerFunction();
+ }
+ });
+ localServerThread.start();
+ }
+ }
+
+ /**
+ * Generates Key from password.
+ */
+ private SecretKeySpec initKey() {
+ try {
+ PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(),
+ salt,
+ 65536,
+ 128);
+ SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
+ return new SecretKeySpec(tmpkey.getEncoded(), "AES");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed generating key.");
+ }
+ }
+
+ /**
+ * Inits all the security stuff
+ */
+ public void initSecurity() throws ServerException {
+ // try to get the salt and if one does not exist set one
+ if (!getSalt()) {
+ //Set the salt
+ setSalt();
+ }
+
+ initCrypt();
+ }
+
+ /**
+ * Inits the HMAC generator.
+ */
+ private void initCrypt() {
+
+ if (password == null) {
+ return;
+ }
+
+ try {
+ SecretKeySpec key = initKey();
+ password = null; // drop password
+ mac = Mac.getInstance("HmacSHA256");
+ mac.init(key);
+ encryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+ encryptCipher.init(Cipher.ENCRYPT_MODE, key);
+ decryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+ decryptCipher.init(Cipher.DECRYPT_MODE, key);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed To Initialize Ciphers");
+ }
+ }
+
+ /*
+ * Builds the URL for the given request.
+ */
+ private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
+ String reqstring = isput ? "req=putslot" : "req=getslot";
+ String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber;
+ if (maxentries != 0)
+ urlstr += "&max=" + maxentries;
+ return new URL(urlstr);
+ }
+
+ private void setSalt() throws ServerException {
+
+ if (salt != null) {
+ // Salt already sent to server so dont set it again
+ return;
+ }
+
+ try {
+ byte[] saltTmp = new byte[SALT_SIZE];
+ random.nextBytes(saltTmp);
+
+ URL url = new URL(baseurl + "?req=setsalt");
+ URLConnection con = url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+
+ http.setRequestMethod("POST");
+ http.setFixedLengthStreamingMode(saltTmp.length);
+ http.setDoOutput(true);
+ http.setConnectTimeout(TIMEOUT_MILLIS);
+ http.connect();
+
+ OutputStream os = http.getOutputStream();
+ os.write(saltTmp);
+ os.flush();
+
+ int responsecode = http.getResponseCode();
+ if (responsecode != HttpURLConnection.HTTP_OK) {
+ // TODO: Remove this print
+ System.out.println(responsecode);
+ throw new Error("Invalid response");
+ }
+
+ salt = saltTmp;
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout);
+ }
+ }
+
+ private boolean getSalt() throws ServerException {
+ URL url = null;
+ URLConnection con = null;
+ HttpURLConnection http = null;
+
+ try {
+ url = new URL(baseurl + "?req=getsalt");
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlot failed");
+ }
+ try {
+
+ con = url.openConnection();
+ http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.setConnectTimeout(TIMEOUT_MILLIS);
+ http.setReadTimeout(TIMEOUT_MILLIS);
+ http.connect();
+ } catch (SocketTimeoutException e) {
+ throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlot failed");
+ }
+
+ try {
+
+ int responsecode = http.getResponseCode();
+ if (responsecode != HttpURLConnection.HTTP_OK) {
+ // TODO: Remove this print
+ // System.out.println(responsecode);
+ throw new Error("Invalid response");
+ }
+
+ InputStream is = http.getInputStream();
+ if (is.available() > 0) {
+ DataInputStream dis = new DataInputStream(is);
+ int salt_length = dis.readInt();
+ byte [] tmp = new byte[salt_length];
+ dis.readFully(tmp);
+ salt = tmp;
+ return true;
+ } else {
+ return false;
+ }
+ } catch (SocketTimeoutException e) {
+ throw new ServerException("getSalt failed", ServerException.TypeInputTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlot failed");
+ }
+ }
+
+ /*
+ * API for putting a slot into the queue. Returns null on success.
+ * On failure, the server will send slots with newer sequence
+ * numbers.
+ */
+ public Slot[] putSlot(Slot slot, int max) throws ServerException {
+ URL url = null;
+ URLConnection con = null;
+ HttpURLConnection http = null;
+
+ try {
+ if (salt == null) {
+ if (!getSalt()) {
+ throw new ServerException("putSlot failed", ServerException.TypeSalt);
+ }
+ initCrypt();
+ }
+
+ long sequencenumber = slot.getSequenceNumber();
+ byte[] bytes = slot.encode(mac);
+ bytes = encryptCipher.doFinal(bytes);
+
+ url = buildRequest(true, sequencenumber, max);
+ con = url.openConnection();
+ http = (HttpURLConnection) con;
+
+ http.setRequestMethod("POST");
+ http.setFixedLengthStreamingMode(bytes.length);
+ http.setDoOutput(true);
+ http.setConnectTimeout(TIMEOUT_MILLIS);
+ http.setReadTimeout(TIMEOUT_MILLIS);
+ http.connect();
+
+ OutputStream os = http.getOutputStream();
+ os.write(bytes);
+ os.flush();
+
+ // System.out.println("Bytes Sent: " + bytes.length);
+ } catch (ServerException e) {
+ throw e;
+ } catch (SocketTimeoutException e) {
+ throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("putSlot failed");
+ }
+
+
+
+ try {
+ InputStream is = http.getInputStream();
+ DataInputStream dis = new DataInputStream(is);
+ byte[] resptype = new byte[7];
+ dis.readFully(resptype);
+
+ if (Arrays.equals(resptype, "getslot".getBytes()))
+ return processSlots(dis);
+ else if (Arrays.equals(resptype, "putslot".getBytes()))
+ return null;
+ else
+ throw new Error("Bad response to putslot");
+
+ } catch (SocketTimeoutException e) {
+ throw new ServerException("putSlot failed", ServerException.TypeInputTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("putSlot failed");
+ }
+ }
+
+ /**
+ * Request the server to send all slots with the given
+ * sequencenumber or newer.
+ */
+ public Slot[] getSlots(long sequencenumber) throws ServerException {
+ URL url = null;
+ URLConnection con = null;
+ HttpURLConnection http = null;
+
+ try {
+ if (salt == null) {
+ if (!getSalt()) {
+ throw new ServerException("getSlots failed", ServerException.TypeSalt);
+ }
+ initCrypt();
+ }
+
+ url = buildRequest(false, sequencenumber, 0);
+ con = url.openConnection();
+ http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.setConnectTimeout(TIMEOUT_MILLIS);
+ http.setReadTimeout(TIMEOUT_MILLIS);
+ http.connect();
+ } catch (SocketTimeoutException e) {
+ throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout);
+ } catch (ServerException e) {
+ throw e;
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlots failed");
+ }
+
+ try {
+ InputStream is = http.getInputStream();
+ DataInputStream dis = new DataInputStream(is);
+ byte[] resptype = new byte[7];
+ dis.readFully(resptype);
+ if (!Arrays.equals(resptype, "getslot".getBytes()))
+ throw new Error("Bad Response: " + new String(resptype));
+ else
+ return processSlots(dis);
+ } catch (SocketTimeoutException e) {
+ throw new ServerException("getSlots failed", ServerException.TypeInputTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlots failed");
+ }
+ }
+
+ /**
+ * Method that actually handles building Slot objects from the
+ * server response. Shared by both putSlot and getSlots.
+ */
+ private Slot[] processSlots(DataInputStream dis) throws Exception {
+ int numberofslots = dis.readInt();
+ int[] sizesofslots = new int[numberofslots];
+
+ Slot[] slots = new Slot[numberofslots];
+ for (int i = 0; i < numberofslots; i++)
+ sizesofslots[i] = dis.readInt();
+
+ for (int i = 0; i < numberofslots; i++) {
+
+ byte[] data = new byte[sizesofslots[i]];
+ dis.readFully(data);
+
+ data = decryptCipher.doFinal(data);
+
+ slots[i] = Slot.decode(table, data, mac);
+ }
+ dis.close();
+ return slots;
+ }
+
+ public byte[] sendLocalData(byte[] sendData, String host, int port) {
+
+ if (salt == null) {
+ return null;
+ }
+ try {
+
+ System.out.println("Passing Locally");
+
+ mac.update(sendData);
+ byte[] genmac = mac.doFinal();
+ byte[] totalData = new byte[sendData.length + genmac.length];
+ System.arraycopy(sendData, 0, totalData, 0, sendData.length);
+ System.arraycopy(genmac, 0, totalData, sendData.length, genmac.length);
+
+ // Encrypt the data for sending
+ // byte[] encryptedData = encryptCipher.doFinal(totalData);
+ byte[] encryptedData = encryptCipher.doFinal(totalData);
+
+ // Open a TCP socket connection to a local device
+ Socket socket = new Socket(host, port);
+ socket.setReuseAddress(true);
+ DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+ DataInputStream input = new DataInputStream(socket.getInputStream());
+
+ // Send data to output (length of data, the data)
+ output.writeInt(encryptedData.length);
+ output.write(encryptedData, 0, encryptedData.length);
+ output.flush();
+
+ int lengthOfReturnData = input.readInt();
+ byte[] returnData = new byte[lengthOfReturnData];
+ input.readFully(returnData);
+ returnData = decryptCipher.doFinal(returnData);
+
+ // We are done with this socket
+ socket.close();
+
+ mac.update(returnData, 0, returnData.length - HMAC_SIZE);
+ byte[] realmac = mac.doFinal();
+ byte[] recmac = new byte[HMAC_SIZE];
+ System.arraycopy(returnData, returnData.length - realmac.length, recmac, 0, realmac.length);
+
+ if (!Arrays.equals(recmac, realmac))
+ throw new Error("Local Error: Invalid HMAC! Potential Attack!");
+
+ byte[] returnData2 = new byte[lengthOfReturnData - recmac.length];
+ System.arraycopy(returnData, 0, returnData2, 0, returnData2.length);
+
+ return returnData2;
+ } catch (SocketTimeoutException e) {
+
+ } catch (BadPaddingException e) {
+
+ } catch (IllegalBlockSizeException e) {
+
+ } catch (UnknownHostException e) {
+
+ } catch (IOException e) {
+
+ }
+
+ return null;
+ }
+
+ private void localServerWorkerFunction() {
+
+ ServerSocket inputSocket = null;
+
+ try {
+ // Local server socket
+ inputSocket = new ServerSocket(listeningPort);
+ inputSocket.setReuseAddress(true);
+ inputSocket.setSoTimeout(TIMEOUT_MILLIS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local server setup failure...");
+ }
+
+ while (!doEnd) {
+
+ try {
+ // Accept incoming socket
+ Socket socket = inputSocket.accept();
+
+ DataInputStream input = new DataInputStream(socket.getInputStream());
+ DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+
+ // Get the encrypted data from the server
+ int dataSize = input.readInt();
+ byte[] readData = new byte[dataSize];
+ input.readFully(readData);
+
+ // Decrypt the data
+ readData = decryptCipher.doFinal(readData);
+
+ mac.update(readData, 0, readData.length - HMAC_SIZE);
+ byte[] genmac = mac.doFinal();
+ byte[] recmac = new byte[HMAC_SIZE];
+ System.arraycopy(readData, readData.length - recmac.length, recmac, 0, recmac.length);
+
+ if (!Arrays.equals(recmac, genmac))
+ throw new Error("Local Error: Invalid HMAC! Potential Attack!");
+
+ byte[] returnData = new byte[readData.length - recmac.length];
+ System.arraycopy(readData, 0, returnData, 0, returnData.length);
+
+ // Process the data
+ // byte[] sendData = table.acceptDataFromLocal(readData);
+ byte[] sendData = table.acceptDataFromLocal(returnData);
+
+ mac.update(sendData);
+ byte[] realmac = mac.doFinal();
+ byte[] totalData = new byte[sendData.length + realmac.length];
+ System.arraycopy(sendData, 0, totalData, 0, sendData.length);
+ System.arraycopy(realmac, 0, totalData, sendData.length, realmac.length);
+
+ // Encrypt the data for sending
+ byte[] encryptedData = encryptCipher.doFinal(totalData);
+
+ // Send data to output (length of data, the data)
+ output.writeInt(encryptedData.length);
+ output.write(encryptedData, 0, encryptedData.length);
+ output.flush();
+
+ // close the socket
+ socket.close();
+ } catch (SocketTimeoutException e) {
+
+ } catch (BadPaddingException e) {
+
+ } catch (IllegalBlockSizeException e) {
+
+ } catch (UnknownHostException e) {
+
+ } catch (IOException e) {
+
+ }
+ }
+
+ if (inputSocket != null) {
+ try {
+ inputSocket.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local server close failure...");
+ }
+ }
+ }
+
+ public void close() {
+ doEnd = true;
+
+ if (localServerThread != null) {
+ try {
+ localServerThread.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local Server thread join issue...");
+ }
+ }
+
+ // System.out.println("Done Closing Cloud Comm");
+ }
+
+ protected void finalize() throws Throwable {
+ try {
+ close(); // close open files
+ } finally {
+ super.finalize();
+ }
+ }
+
+}
return true;
} catch (Exception e) {
// e.printStackTrace();
+
+ for (Long m : localCommunicationTable.keySet()) {
+ updateFromLocal(m);
+ }
}
return false;
// If there is a new key with same name then end
if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
- System.out.println("New Key Fail");
return false;
}
if (transaction.didSendAllParts()) {
transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
pendingTransactionQueue.remove(transaction);
-
- for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + slot.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
- }
}
}
} else {
} catch (ServerException e) {
- System.out.println("Server Failure: " + e.getType());
- for (Transaction transaction : transactionPartsSent.keySet()) {
- for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- System.out.println("Sent Error: " + kv + " " + e.getType());
- }
- }
-
if (e.getType() != ServerException.TypeInputTimeout) {
// e.printStackTrace();
return newKey == null;
}
- public synchronized boolean updateFromLocal(long machineId) {
+ private synchronized boolean updateFromLocal(long machineId) {
Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
if (localCommunicationInformation == null) {
// Cant talk to that device locally so do nothing
for (Long transactionSequenceNumber : transactionSequenceNumbers) {
Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
- for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- System.out.println("Arb Seen: " + kv + " " + lastSeqNumArbOn + " " + transactionSequenceNumber + " " + localMachineId + " " + transaction.getArbitrator());
- }
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
continue;
}
- for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- System.out.println("Arb Seen: " + kv + " " + lastSeqNumArbOn + " " + transactionSequenceNumber + " " + localMachineId);
- }
-
-
if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
// We have seen this already locally so dont commit again
continue;
break;
}
- for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- System.out.println("Arb on: " + kv + " " + lastSeqNumArbOn + " " + transactionSequenceNumber + " " + localMachineId);
- }
-
// update the largest transaction seen by arbitrator from server
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
// Update what the last transaction committed was for use in batch commit
lastTransactionCommitted = transactionSequenceNumber;
-
- System.out.println("Commit Generated: " + lastTransactionCommitted + " " + localMachineId);
} else {
// Guard evaluated was false so create abort
}
}
-
- for (KeyValue kv : commit.getKeyValueUpdateSet()) {
- System.out.println("Commit Seen: " + kv + " " + commit.getTransactionSequenceNumber() + " " + localMachineId);
- }
-
-
-
-
-
-
-
-
-
-
// Update the last arbitration data that we have seen so far
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
public class Test {
- public static final int NUMBER_OF_TESTS = 15;
+ public static final int NUMBER_OF_TESTS = 25;
public static void main(String[] args) throws ServerException {
if (args[0].equals("2")) {
test10();
} else if (args[0].equals("11")) {
test11();
+ } else if (args[0].equals("12")) {
+ test12();
+ } else if (args[0].equals("13")) {
+ test13();
+ }
+
+ else if (args[0].equals("14")) {
+ test14();
}
}
+ static void test14() throws ServerException {
+ TimingSingleton timer = TimingSingleton.getInstance();
+
+ boolean foundError = false;
+ long startTime = 0;
+ long endTime = 0;
+ List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
+ // Setup the 2 clients
+ Table t1 = new Table("http://dc-6.calit2.uci.edu/test.iotcloud/", "reallysecret", 321, -1);
+ t1.initTable();
+ System.out.println("T1 Ready");
+
+ // Make the Keys
+ System.out.println("Setting up keys");
+ startTime = System.nanoTime();
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+ String a = "a" + i;
+ IoTString ia = new IoTString(a);
+ t1.createNewKey(ia, 321);
+ }
+ endTime = System.nanoTime();
+ long keysDt = endTime - startTime;
+ long keysNet = timer.getTime();
+
+ System.out.println("Total Key Create Time: " + keysDt / 1000000);
+ System.out.println("Total Key Create Time Network: " + keysNet / 1000000);
+ System.out.println("Total Key Create Time no Network: " + (keysDt - keysNet) / 1000000);
+ System.out.println();
+ }
+
+
+ static void test13() throws ServerException {
+ TimingSingleton timer = TimingSingleton.getInstance();
+
+ boolean foundError = false;
+ long startTime = 0;
+ long endTime = 0;
+ List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
+ // Setup the 2 clients
+ Table t1 = new Table("http://dc-6.calit2.uci.edu/test.iotcloud/", "reallysecret", 321, -1);
+ t1.initTable();
+ System.out.println("T1 Ready");
+
+ Table t2 = new Table("http://dc-6.calit2.uci.edu/test.iotcloud/", "reallysecret", 351, -1);
+ t2.update();
+ System.out.println("T2 Ready");
+
+ // Make the Keys
+ System.out.println("Setting up keys");
+ startTime = System.nanoTime();
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+ String a = "a" + i;
+ String b = "b" + i;
+ String c = "c" + i;
+ String d = "d" + i;
+ IoTString ia = new IoTString(a);
+ IoTString ib = new IoTString(b);
+ IoTString ic = new IoTString(c);
+ IoTString id = new IoTString(d);
+ t1.createNewKey(ia, 351);
+ t1.createNewKey(ib, 351);
+ t2.createNewKey(ic, 321);
+ t2.createNewKey(id, 321);
+ }
+ endTime = System.nanoTime();
+ long keysDt = endTime - startTime;
+ long keysNet = timer.getTime();
+
+ // Do Updates for the keys
+ System.out.println("Setting Key-Values...");
+ startTime = System.nanoTime();
+ for (int t = 0; t < 3; t++) {
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+ String keyA = "a" + i;
+ String keyB = "b" + i;
+ String keyC = "c" + i;
+ String keyD = "d" + i;
+ String valueA = "a" + i;
+ String valueB = "b" + i;
+ String valueC = "c" + i;
+ String valueD = "d" + i;
+
+ IoTString iKeyA = new IoTString(keyA);
+ IoTString iKeyB = new IoTString(keyB);
+ IoTString iKeyC = new IoTString(keyC);
+ IoTString iKeyD = new IoTString(keyD);
+ IoTString iValueA = new IoTString(valueA);
+ IoTString iValueB = new IoTString(valueB);
+ IoTString iValueC = new IoTString(valueC);
+ IoTString iValueD = new IoTString(valueD);
+
+
+ t1.startTransaction();
+ t1.addKV(iKeyA, iValueA);
+ transStatusList.add(t1.commitTransaction());
+
+ t1.startTransaction();
+ t1.addKV(iKeyB, iValueB);
+ transStatusList.add(t1.commitTransaction());
+
+ t2.startTransaction();
+ t2.addKV(iKeyC, iValueC);
+ transStatusList.add(t2.commitTransaction());
+
+ t2.startTransaction();
+ t2.addKV(iKeyD, iValueD);
+ transStatusList.add(t2.commitTransaction());
+
+ }
+ }
+ endTime = System.nanoTime();
+ long writesDt = endTime - startTime;
+ long writesNet = timer.getTime() - keysNet;
+
+ System.out.println("Updating Clients...");
+ startTime = System.nanoTime();
+ t1.update();
+ t2.update();
+ endTime = System.nanoTime();
+ long updatesDt = endTime - startTime;
+ long updatesNet = timer.getTime() - keysNet - writesNet;
+
+
+ System.out.println("Total Key Create Time: " + keysDt / 1000000);
+ System.out.println("Total Key Create Time Network: " + keysNet / 1000000);
+ System.out.println("Total Key Create Time no Network: " + (keysDt - keysNet) / 1000000);
+ System.out.println();
+ System.out.println("Total write Time: " + writesDt / 1000000);
+ System.out.println("Total write Time Network: " + writesNet / 1000000);
+ System.out.println("Total write Time no Network: " + (writesDt - writesNet) / 1000000);
+ System.out.println();
+ System.out.println("Total updates Time: " + updatesDt / 1000000);
+ System.out.println("Total updates Time Network: " + updatesNet / 1000000);
+ System.out.println("Total updates Time no Network: " + (updatesDt - updatesNet) / 1000000);
+ }
+
+
+ static void test12() throws ServerException {
+ TimingSingleton timer = TimingSingleton.getInstance();
+
+ boolean foundError = false;
+ long startTime = 0;
+ long endTime = 0;
+ List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
+ // Setup the 2 clients
+ Table t1 = new Table("http://dc-6.calit2.uci.edu/test.iotcloud/", "reallysecret", 321, -1);
+ t1.initTable();
+ System.out.println("T1 Ready");
+
+ Table t2 = new Table("http://dc-6.calit2.uci.edu/test.iotcloud/", "reallysecret", 351, -1);
+ t2.update();
+ System.out.println("T2 Ready");
+
+ // Make the Keys
+ System.out.println("Setting up keys");
+ startTime = System.nanoTime();
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+ String a = "a" + i;
+ String b = "b" + i;
+ String c = "c" + i;
+ String d = "d" + i;
+ IoTString ia = new IoTString(a);
+ IoTString ib = new IoTString(b);
+ IoTString ic = new IoTString(c);
+ IoTString id = new IoTString(d);
+ t1.createNewKey(ia, 321);
+ t1.createNewKey(ib, 321);
+ t2.createNewKey(ic, 351);
+ t2.createNewKey(id, 351);
+ }
+ endTime = System.nanoTime();
+ long keysDt = endTime - startTime;
+ long keysNet = timer.getTime();
+
+ // Do Updates for the keys
+ System.out.println("Setting Key-Values...");
+ startTime = System.nanoTime();
+ for (int t = 0; t < 3; t++) {
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+ String keyA = "a" + i;
+ String keyB = "b" + i;
+ String keyC = "c" + i;
+ String keyD = "d" + i;
+ String valueA = "a" + i;
+ String valueB = "b" + i;
+ String valueC = "c" + i;
+ String valueD = "d" + i;
+
+ IoTString iKeyA = new IoTString(keyA);
+ IoTString iKeyB = new IoTString(keyB);
+ IoTString iKeyC = new IoTString(keyC);
+ IoTString iKeyD = new IoTString(keyD);
+ IoTString iValueA = new IoTString(valueA);
+ IoTString iValueB = new IoTString(valueB);
+ IoTString iValueC = new IoTString(valueC);
+ IoTString iValueD = new IoTString(valueD);
+
+
+ t1.startTransaction();
+ t1.addKV(iKeyA, iValueA);
+ transStatusList.add(t1.commitTransaction());
+
+ t1.startTransaction();
+ t1.addKV(iKeyB, iValueB);
+ transStatusList.add(t1.commitTransaction());
+
+ t2.startTransaction();
+ t2.addKV(iKeyC, iValueC);
+ transStatusList.add(t2.commitTransaction());
+
+ t2.startTransaction();
+ t2.addKV(iKeyD, iValueD);
+ transStatusList.add(t2.commitTransaction());
+
+ }
+ }
+ endTime = System.nanoTime();
+ long writesDt = endTime - startTime;
+ long writesNet = timer.getTime() - keysNet;
+
+ System.out.println("Updating Clients...");
+ startTime = System.nanoTime();
+ t1.update();
+ t2.update();
+ endTime = System.nanoTime();
+ long updatesDt = endTime - startTime;
+ long updatesNet = timer.getTime() - keysNet - writesNet;
+
+
+ System.out.println("Total Key Create Time: " + keysDt / 1000000);
+ System.out.println("Total Key Create Time Network: " + keysNet / 1000000);
+ System.out.println("Total Key Create Time no Network: " + (keysDt - keysNet) / 1000000);
+ System.out.println();
+ System.out.println("Total write Time: " + writesDt / 1000000);
+ System.out.println("Total write Time Network: " + writesNet / 1000000);
+ System.out.println("Total write Time no Network: " + (writesDt - writesNet) / 1000000);
+ System.out.println();
+ System.out.println("Total updates Time: " + updatesDt / 1000000);
+ System.out.println("Total updates Time Network: " + updatesNet / 1000000);
+ System.out.println("Total updates Time no Network: " + (updatesDt - updatesNet) / 1000000);
+ }
+
+
static void test11() {
boolean foundError = false;
transStatusList.add(t2.commitTransaction());
}
- while (t1.updateFromLocal(351) == false) {}
- while (t2.updateFromLocal(321) == false) {}
+ // while (t1.updateFromLocal(351) == false) {}
+ // while (t2.updateFromLocal(321) == false) {}
System.out.println("Updating...");
}
static void test8() {
+ TimingSingleton timer = TimingSingleton.getInstance();
+ long startTime = 0;
+ long endTime = 0;
boolean foundError = false;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Make the Keys
System.out.println("Setting up keys");
+ startTime = System.nanoTime();
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
System.out.println(i);
} catch (Exception e) { }
}
}
+ endTime = System.nanoTime();
+ long keysDt = endTime - startTime;
+ long keysNet = timer.getTime();
+
// Do Updates for the keys
+ startTime = System.nanoTime();
System.out.println("Setting Key-Values...");
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
System.out.println(i);
t2.addKV(iKeyD, iValueD);
transStatusList.add(t2.commitTransaction());
}
+ endTime = System.nanoTime();
+ long writesDt = endTime - startTime;
+ long writesNet = timer.getTime() - keysNet;
+
+
System.out.println("Updating...");
+ startTime = System.nanoTime();
while (t1.update() == false) {}
while (t2.update() == false) {}
while (t1.update() == false) {}
while (t2.update() == false) {}
+ endTime = System.nanoTime();
+ long updatesDt = endTime - startTime;
+ long updatesNet = timer.getTime() - keysNet - writesNet;
- System.out.println("Checking Key-Values...");
- for (int i = 0; i < NUMBER_OF_TESTS; i++) {
-
- String keyA = "a" + i;
- String keyB = "b" + i;
- String keyC = "c" + i;
- String keyD = "d" + i;
- String valueA = "a" + i;
- String valueB = "b" + i;
- String valueC = "c" + i;
- String valueD = "d" + i;
-
- IoTString iKeyA = new IoTString(keyA);
- IoTString iKeyB = new IoTString(keyB);
- IoTString iKeyC = new IoTString(keyC);
- IoTString iKeyD = new IoTString(keyD);
- IoTString iValueA = new IoTString(valueA);
- IoTString iValueB = new IoTString(valueB);
- IoTString iValueC = new IoTString(valueC);
- IoTString iValueD = new IoTString(valueD);
-
-
- IoTString testValA1 = t1.getCommitted(iKeyA);
- IoTString testValB1 = t1.getCommitted(iKeyB);
- IoTString testValC1 = t1.getCommitted(iKeyC);
- IoTString testValD1 = t1.getCommitted(iKeyD);
-
- IoTString testValA2 = t2.getCommitted(iKeyA);
- IoTString testValB2 = t2.getCommitted(iKeyB);
- IoTString testValC2 = t2.getCommitted(iKeyC);
- IoTString testValD2 = t2.getCommitted(iKeyD);
-
- if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
- System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1);
- foundError = true;
- }
-
- if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
- System.out.println("Key-Value t1 incorrect: " + keyB + " " + testValB1);
- foundError = true;
- }
-
- if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
- System.out.println("Key-Value t1 incorrect: " + keyC + " " + testValC1);
- foundError = true;
- }
-
- if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
- System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1);
- foundError = true;
- }
-
-
- if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
- System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2);
- foundError = true;
- }
-
- if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
- System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2);
- foundError = true;
- }
-
- if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
- System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2);
- foundError = true;
- }
-
- if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
- System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2);
- foundError = true;
- }
- }
-
- int count = 0;
- for (TransactionStatus status : transStatusList) {
- if (status.getStatus() != TransactionStatus.StatusCommitted) {
- foundError = true;
- System.out.println("Status: " + status.getStatus() + " " + status.getTransactionSequenceNumber());
- }
-
- count++;
- }
- if (foundError) {
- System.out.println("Found Errors...");
- } else {
- System.out.println("No Errors Found...");
- }
+ System.out.println("Total Key Create Time: " + keysDt / 1000000);
+ System.out.println("Total Key Create Time Network: " + keysNet / 1000000);
+ System.out.println("Total Key Create Time no Network: " + (keysDt - keysNet) / 1000000);
+ System.out.println();
+ System.out.println("Total write Time: " + writesDt / 1000000);
+ System.out.println("Total write Time Network: " + writesNet / 1000000);
+ System.out.println("Total write Time no Network: " + (writesDt - writesNet) / 1000000);
+ System.out.println();
+ System.out.println("Total updates Time: " + updatesDt / 1000000);
+ System.out.println("Total updates Time Network: " + updatesNet / 1000000);
+ System.out.println("Total updates Time no Network: " + (updatesDt - updatesNet) / 1000000);
+
+
+
+
+ // System.out.println("Checking Key-Values...");
+ // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+ // String keyA = "a" + i;
+ // String keyB = "b" + i;
+ // String keyC = "c" + i;
+ // String keyD = "d" + i;
+ // String valueA = "a" + i;
+ // String valueB = "b" + i;
+ // String valueC = "c" + i;
+ // String valueD = "d" + i;
+
+ // IoTString iKeyA = new IoTString(keyA);
+ // IoTString iKeyB = new IoTString(keyB);
+ // IoTString iKeyC = new IoTString(keyC);
+ // IoTString iKeyD = new IoTString(keyD);
+ // IoTString iValueA = new IoTString(valueA);
+ // IoTString iValueB = new IoTString(valueB);
+ // IoTString iValueC = new IoTString(valueC);
+ // IoTString iValueD = new IoTString(valueD);
+
+
+ // IoTString testValA1 = t1.getCommitted(iKeyA);
+ // IoTString testValB1 = t1.getCommitted(iKeyB);
+ // IoTString testValC1 = t1.getCommitted(iKeyC);
+ // IoTString testValD1 = t1.getCommitted(iKeyD);
+
+ // IoTString testValA2 = t2.getCommitted(iKeyA);
+ // IoTString testValB2 = t2.getCommitted(iKeyB);
+ // IoTString testValC2 = t2.getCommitted(iKeyC);
+ // IoTString testValD2 = t2.getCommitted(iKeyD);
+
+ // if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
+ // System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1);
+ // foundError = true;
+ // }
+
+ // if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
+ // System.out.println("Key-Value t1 incorrect: " + keyB + " " + testValB1);
+ // foundError = true;
+ // }
+
+ // if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
+ // System.out.println("Key-Value t1 incorrect: " + keyC + " " + testValC1);
+ // foundError = true;
+ // }
+
+ // if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
+ // System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1);
+ // foundError = true;
+ // }
+
+
+ // if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
+ // System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2);
+ // foundError = true;
+ // }
+
+ // if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
+ // System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2);
+ // foundError = true;
+ // }
+
+ // if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
+ // System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2);
+ // foundError = true;
+ // }
+
+ // if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
+ // System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2);
+ // foundError = true;
+ // }
+ // }
+
+ // for (TransactionStatus status : transStatusList) {
+ // if (status.getStatus() != TransactionStatus.StatusCommitted) {
+ // foundError = true;
+ // }
+ // }
+
+ // if (foundError) {
+ // System.out.println("Found Errors...");
+ // } else {
+ // System.out.println("No Errors Found...");
+ // }
t1.close();
t2.close();
- System.out.println();
- System.out.println();
- t1.printSlots();
+ // System.out.println();
+ // System.out.println();
+ // t1.printSlots();
- System.out.println();
- System.out.println();
- t2.printSlots();
+ // System.out.println();
+ // System.out.println();
+ // t2.printSlots();
}
static void test7() throws ServerException {
}
static void test2() throws ServerException {
+ TimingSingleton timer = TimingSingleton.getInstance();
boolean foundError = false;
- long startTime = 0;
- long endTime = 0;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Setup the 2 clients
- Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
+ Table t1 = new Table("http://dc-6.calit2.uci.edu/test.iotcloud/", "reallysecret", 321, -1);
t1.initTable();
System.out.println("T1 Ready");
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
+ Table t2 = new Table("http://dc-6.calit2.uci.edu/test.iotcloud/", "reallysecret", 351, -1);
t2.update();
System.out.println("T2 Ready");
// Make the Keys
System.out.println("Setting up keys");
- startTime = System.currentTimeMillis();
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
System.out.println(i);
String a = "a" + i;
t2.createNewKey(ic, 321);
t2.createNewKey(id, 351);
}
- endTime = System.currentTimeMillis();
- System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) );
- System.out.println("Time Taken Per Key: " + (double) (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4)) );
- System.out.println();
// Do Updates for the keys
System.out.println("Setting Key-Values...");
- startTime = System.currentTimeMillis();
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
System.out.println(i);
String keyA = "a" + i;
t1.startTransaction();
t1.addKV(iKeyA, iValueA);
transStatusList.add(t1.commitTransaction());
-
-
t1.startTransaction();
t1.addKV(iKeyB, iValueB);
transStatusList.add(t1.commitTransaction());
transStatusList.add(t2.commitTransaction());
}
- endTime = System.currentTimeMillis();
- System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) );
- System.out.println("Time Taken Per Update: " + (double) (((endTime - startTime) / 1000.0) / (NUMBER_OF_TESTS * 4)) );
- System.out.println();
-
System.out.println("Updating Clients...");
t1.update();
t2.update();
+
+
System.out.println("Checking Key-Values...");
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
--- /dev/null
+package iotcloud;
+
+
+class TimingSingleton {
+ private static TimingSingleton singleton = new TimingSingleton( );
+ private static long startTime = 0;
+
+ private static long totalTime = 0;
+
+ private TimingSingleton() {
+
+ }
+
+ public static TimingSingleton getInstance( ) {
+ return singleton;
+ }
+
+
+ public static void startTime( ) {
+ startTime = System.nanoTime();
+ }
+
+ public static void endTime( ) {
+ totalTime += System.nanoTime() - startTime;
+
+ }
+
+ public static long getTime( ) {
+ return totalTime;
+ }
+
+
+}
\ No newline at end of file
--- /dev/null
+((nil . ((indent-tabs-mode . t))))
+
--- /dev/null
+CPPFLAGS=-O0 -g -Wall
+
+all: iotcloud.fcgi
+
+iotcloud.fcgi: iotcloud.o iotquery.o
+ g++ $(CPPFLAGS) -o iotcloud.fcgi iotcloud.o iotquery.o -lfcgi -lfcgi++
+
+iotcloud.o: iotcloud.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotcloud.o iotcloud.cpp
+
+iotquery.o: iotquery.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotquery.o iotquery.cpp
+
+clean:
+ rm *.o iotcloud.fcgi
--- /dev/null
+1) Requires apache2
+2) Requires fastcgi (libapache2-mod-fastcgi and libfcgi-dev)
+
+Setup on ubuntu
+1) Install modules
+
+2) Add .htaccess file in /var/www/html
+RewriteEngine on
+RewriteBase /
+SetHandler cgi-script
+RewriteRule ^([a-zA-Z0-9._]*\.iotcloud/([a-zA-Z0-9._]*))$ /cgi-bin/iotcloud.fcgi/$1
+
+3) Create account directory. For example, create the directory test.iotcloud in /var/www/html
+ -- To password protect, create the following .htaccess file in the account directory:
+AuthType Basic
+AuthName "Private"
+AuthUserFile /var/www/html/foo.iotcloud/.htpasswd
+Require valid-user
+
+4) In apache2.conf, add to the /var/www directory section:
+AllowOverride FileInfo AuthConfig
+
+5) In the sites-enabled/000-default.conf file, add the line:
+SetEnv IOTCLOUD_ROOT /iotcloud/
+
+6) Create the /iotcloud directory.
+
+7) Create the account directory in the /iotcloud directory. For example, test.iotcloud and give it permissions that the apache daemon can write to.
+
+8) Compile cloud server by typing make
+
+9) Copy it to the cgi-bin directory.
--- /dev/null
+#include <iostream>
+#include "iotquery.h"
+
+using namespace std;
+
+
+int main(void) {
+ // Backup the stdio streambufs
+ streambuf * cin_streambuf = cin.rdbuf();
+ streambuf * cout_streambuf = cout.rdbuf();
+ streambuf * cerr_streambuf = cerr.rdbuf();
+
+ FCGX_Request request;
+
+ FCGX_Init();
+ FCGX_InitRequest(&request, 0, 0);
+
+ while (FCGX_Accept_r(&request) == 0) {
+ fcgi_streambuf cin_fcgi_streambuf(request.in);
+ fcgi_streambuf cout_fcgi_streambuf(request.out);
+ fcgi_streambuf cerr_fcgi_streambuf(request.err);
+
+ cin.rdbuf(&cin_fcgi_streambuf);
+ cout.rdbuf(&cout_fcgi_streambuf);
+ cerr.rdbuf(&cerr_fcgi_streambuf);
+
+ IoTQuery * iotquery=new IoTQuery(&request);
+ iotquery->processQuery();
+
+ delete iotquery;
+ }
+
+ // restore stdio streambufs
+ cin.rdbuf(cin_streambuf);
+ cout.rdbuf(cout_streambuf);
+ cerr.rdbuf(cerr_streambuf);
+
+ return 0;
+}
+
--- /dev/null
+#include "iotquery.h"
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
+
+using namespace std;
+
+const char * query_str = "QUERY_STRING";
+const char * uri_str = "REQUEST_URI";
+const char * method_str = "REQUEST_METHOD";
+const char * iotcloudroot_str = "IOTCLOUD_ROOT";
+const char * length_str = "CONTENT_LENGTH";
+
+IoTQuery::IoTQuery(FCGX_Request *request) :
+ request(request),
+ data(NULL),
+ directory(NULL),
+ uri(NULL),
+ query(NULL),
+ method(NULL),
+ iotcloudroot(NULL),
+ length(0),
+ oldestentry(0),
+ newestentry(0),
+ requestsequencenumber(0),
+ numqueueentries(DEFAULT_SIZE),
+ fd(-1),
+ reqGetSlot(false),
+ reqPutSlot(false),
+ reqSetSalt(false),
+ reqGetSalt(false) {
+}
+
+IoTQuery::~IoTQuery() {
+ if (fd >= 0)
+ close(fd);
+ if (directory)
+ delete directory;
+ if (data)
+ delete data;
+}
+
+/**
+ * Returns true if the account directory exists.
+ */
+
+bool IoTQuery::checkDirectory() {
+ struct stat s;
+ int err = stat(directory, &s);
+ if (-1 == err)
+ return false;
+ return S_ISDIR(s.st_mode);
+}
+
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
+void IoTQuery::decodeQuery() {
+ int len = strlen(query);
+ char * str = new char[len + 1];
+ memcpy(str, query, len + 1);
+ char *tok_ptr = str;
+
+ /* Parse commands */
+ char *command = strsep(&tok_ptr, "&");
+ if (strncmp(command, "req=putslot", 11) == 0)
+ reqPutSlot = true;
+ else if (strncmp(command, "req=getslot", 11) == 0)
+ reqGetSlot = true;
+ else if (strncmp(command, "req=setsalt", 11) == 0)
+ reqSetSalt = true;
+ else if (strncmp(command, "req=getsalt", 11) == 0)
+ reqGetSalt = true;
+
+ /* Load Sequence Number for request */
+ char *sequencenumber_str = strsep(&tok_ptr, "&");
+ if (sequencenumber_str != NULL &&
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ sequencenumber_str = strchr(sequencenumber_str, '=');
+ if (sequencenumber_str != NULL) {
+ requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+ }
+ }
+
+ /* don't allow a really old sequence number */
+ if (requestsequencenumber < oldestentry)
+ requestsequencenumber = oldestentry;
+
+ /* Update size if we get request */
+ char * numqueueentries_str = tok_ptr;
+ if (numqueueentries_str != NULL &&
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
+ numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
+ numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+ }
+
+ delete str;
+}
+
+/**
+ * Helper function to write data to file.
+ */
+
+void doWrite(int fd, char *data, long long length) {
+ long long offset = 0;
+ do {
+ long long byteswritten = write(fd, &data[offset], length);
+ if (byteswritten > 0) {
+ length -= byteswritten;
+ offset += byteswritten;
+ } else {
+ cerr << "Bytes not written" << endl;
+ if (byteswritten < 0) {
+ cerr << strerror(errno) << " error writing slot file" << endl;
+ }
+ return;
+ }
+ } while (length != 0);
+}
+
+/** Helper function to read data from file. */
+bool doRead(int fd, void *buf, int numbytes) {
+ int offset = 0;
+ char *ptr = (char *)buf;
+ do {
+ int bytesread = read(fd, ptr + offset, numbytes);
+ if (bytesread > 0) {
+ offset += bytesread;
+ numbytes -= bytesread;
+ } else
+ return false;
+ } while (numbytes != 0);
+ return true;
+}
+
+/**
+ * Function that handles a getSlot request.
+ */
+
+void IoTQuery::getSlot() {
+ int numrequeststosend = (int)((newestentry - requestsequencenumber) + 1);
+ if (numrequeststosend < 0)
+ numrequeststosend = 0;
+ long long numbytes = 0;
+ int filesizes[numrequeststosend];
+ int fdarray[numrequeststosend];
+ int index = 0;
+ for (long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ struct stat st;
+ char *filename = getSlotFileName(seqn);
+ if (stat(filename, &st) == 0) {
+ fdarray[index] = open(filename, O_RDONLY);
+ filesizes[index] = st.st_size;
+ numbytes += filesizes[index];
+ } else {
+ fdarray[index] = -1;
+ filesizes[index] = 0;
+ }
+ delete filename;
+ }
+ const char header[] = "getslot";
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size = sizeof(header) - 1 + sizeof(numrequeststosend) + 4 * numrequeststosend + numbytes;
+ char * response = new char[size];
+ long long offset = 0;
+ memcpy(response, header, sizeof(header) - 1);
+ offset += sizeof(header) - 1;
+ int numreq = htonl(numrequeststosend);
+ memcpy(response + offset, &numreq, sizeof(numreq));
+ offset += sizeof(numrequeststosend);
+ for (int i = 0; i < numrequeststosend; i++) {
+ int filesize = htonl(filesizes[i]);
+ memcpy(response + offset, &filesize, sizeof(filesize));
+ offset += sizeof(int);
+ }
+
+ /* Read the file data into the buffer */
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0) {
+ doRead(fdarray[i], response + offset, filesizes[i]);
+ offset += filesizes[i];
+ }
+ }
+
+ /* Send the response out to the webserver. */
+ sendResponse(response, size);
+
+ /* Delete the response buffer and close the files. */
+ delete response;
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0)
+ close(fdarray[i]);
+ }
+}
+
+/**
+ * The method setSalt handles a setSalt request from the client.
+ */
+void IoTQuery::setSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(saltfd, data, length);
+ char response[0];
+ sendResponse(response, 0);
+ close(saltfd);
+ delete filename;
+}
+
+/**
+ * The method getSalt handles a getSalt request from the client.
+ */
+
+void IoTQuery::getSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int filesize = 0;
+ struct stat st;
+ if (stat(filename, &st) == 0) {
+ filesize = st.st_size;
+ } else {
+ char response[0];
+ sendResponse(response, 0);
+ delete filename;
+ return;
+ }
+ int saltfd = open(filename, O_RDONLY);
+ int responsesize = filesize + sizeof(int);
+ char * response = new char[responsesize];
+ doRead(saltfd, response + sizeof(int), filesize);
+ int n_filesize = htonl(filesize);
+ *((int*) response) = n_filesize;
+ sendResponse(response, responsesize);
+ close(saltfd);
+ delete filename;
+ delete response;
+}
+
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
+void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
+ if (requestsequencenumber != (newestentry + 1)) {
+ getSlot();
+ return;
+ }
+
+ if (requestsequencenumber == 150)
+ {
+ /* Send response acknowledging success */
+ char command[] = "putslot";
+ sendResponse(command, sizeof(command) - 1);
+ return;
+ }
+
+ /* See if we have too many slots and if so, delete the old one */
+ int numberofliveslots = (int) ((newestentry - oldestentry) + 1);
+ if (numberofliveslots >= numqueueentries) {
+ removeOldestSlot();
+ }
+
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSlotFileName(requestsequencenumber);
+ int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
+ char command[] = "putslot";
+ sendResponse(command, sizeof(command) - 1);
+}
+
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+ cout << "Accept-Ranges: bytes\r\n"
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
+ cout.write(bytes, len);
+ cout << flush;
+}
+
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
+ int directorylen = strlen(directory);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename = new char[25 + directorylen];
+ snprintf(filename, 25 + directorylen, "%s/SLOT%lld", directory, seqnum);
+ return filename;
+}
+
+/**
+ * Computes the name for a salt file
+ */
+
+char * IoTQuery::getSaltFileName() {
+ int directorylen = strlen(directory);
+
+ /* Size is 4 characters for SALT string + 1 character for null
+ termination + directory size*/
+
+ char * filename = new char[6 + directorylen];
+ snprintf(filename, 6 + directorylen, "%s/SALT", directory);
+ return filename;
+}
+
+/**
+ * Removes the oldest slot file
+ */
+
+void IoTQuery::removeOldestSlot() {
+ if (oldestentry != 0) {
+ char * filename = getSlotFileName(oldestentry);
+ unlink(filename);
+ delete filename;
+ }
+ oldestentry++;
+}
+
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
+void IoTQuery::processQuery() {
+ getQuery();
+ getDirectory();
+ // readData();
+ if (!readData())
+ {
+ cerr << "No Data Available" << endl;
+ return;
+ }
+
+
+ /* Verify that we receive a post request. */
+ if (strncmp(method, "POST", 4) != 0) {
+ cerr << "Not POST Request" << endl;
+ return;
+ }
+
+ /* Make sure the directory is okay. */
+ if (directory == NULL ||
+ !checkDirectory()) {
+ cerr << "Directory " << directory << " does not exist" << endl;
+ return;
+ }
+
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
+ if (!openStatusFile()) {
+ cerr << "Failed to open status file" << endl;
+ return;
+ }
+
+ /* Lock status file to keep other requests out. */
+ flock(fd, LOCK_EX);
+
+ /* Decode query. */
+ decodeQuery();
+
+ /* Handle request. */
+ if (reqGetSlot)
+ getSlot();
+ else if (reqPutSlot)
+ putSlot();
+ else if (reqSetSalt)
+ setSalt();
+ else if (reqGetSalt)
+ getSalt();
+ else {
+ cerr << "No recognized request" << endl;
+ return;
+ }
+}
+
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
+bool IoTQuery::readData() {
+ if (length != 0) {
+ data = new char[length + 1];
+ memset(data, 0, length + 1);
+ cin.read(data, length);
+ }
+
+ do {
+ char dummy;
+ cin >> dummy;
+ } while (!cin.eof());
+
+ if (length != 0)
+ {
+ if (cin.gcount() != length)
+ {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
+void IoTQuery::getQuery() {
+ uri = FCGX_GetParam(uri_str, request->envp);
+ query = FCGX_GetParam(query_str, request->envp);
+ method = FCGX_GetParam(method_str, request->envp);
+ iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+
+ /** We require the content-length header to be sent. */
+ char * reqlength = FCGX_GetParam(length_str, request->envp);
+ if (reqlength) {
+ length = strtoll(reqlength, NULL, 10);
+ } else {
+ length = 0;
+ }
+}
+
+/**
+ * Initializes directory field from environmental variables.
+ */
+
+void IoTQuery::getDirectory() {
+ char * split = strchr((char *)uri, '?');
+ if (split == NULL)
+ return;
+ int split_len = (int) (split - uri);
+ int rootdir_len = strlen(iotcloudroot);
+ int directory_len = split_len + rootdir_len + 1;
+ directory = new char[directory_len];
+ memcpy(directory, iotcloudroot, rootdir_len);
+ memcpy(directory + rootdir_len, uri, split_len);
+ directory[directory_len - 1] = 0;
+}
+
+/**
+ * Helper function that is used to read the status file.
+ */
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+ do {
+ size_t bytesread = pread(fd, ptr, count, offset);
+ if (bytesread == count) {
+ return 1;
+ } else if (bytesread == 0) {
+ return 0;
+ }
+ } while (1);
+}
+
+
+/**
+ * Writes the current state to the status file.
+ */
+
+void IoTQuery::updateStatusFile() {
+ pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+ pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+ pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
+bool IoTQuery::openStatusFile() {
+ char statusfile[] = "queuestatus";
+ int len = strlen(directory);
+
+ char * filename = new char[len + sizeof(statusfile) + 2];
+ memcpy(filename, directory, len);
+ filename[len] = '/';
+ memcpy(filename + len + 1, statusfile, sizeof(statusfile));
+ filename[len + sizeof(statusfile) + 1] = 0;
+ fd = open(filename, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
+ delete filename;
+
+ if (fd < 0) {
+ cerr << strerror(errno) << " error opening statusfile" << endl;
+ return false;
+ }
+
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
+ int size;
+ int needwrite = 0;
+ if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+ numqueueentries = size;
+ else
+ needwrite = 1;
+
+ long long entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+ oldestentry = entry;
+ else
+ needwrite = 1;
+
+ if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+ newestentry = entry;
+ else
+ needwrite = 1;
+
+ if (needwrite)
+ updateStatusFile();
+
+ return true;
+}
+
+
--- /dev/null
+#ifndef IOTQUERY_H
+#define IOTQUERY_H
+#include <iostream>
+#include "fcgio.h"
+#include "fcgi_stdio.h"
+
+#define DEFAULT_SIZE 128
+#define OFFSET_MAX 0
+#define OFFSET_OLD 4
+#define OFFSET_NEW 12
+
+class IoTQuery {
+public:
+ IoTQuery(FCGX_Request * request);
+ ~IoTQuery();
+ void processQuery();
+
+private:
+ void sendResponse(char *data, int length);
+ void getQuery();
+ void getDirectory();
+ bool readData();
+ bool checkDirectory();
+ bool openStatusFile();
+ void updateStatusFile();
+ void decodeQuery();
+ void getSlot();
+ void putSlot();
+ void setSalt();
+ void getSalt();
+ void removeOldestSlot();
+ char * getSlotFileName(long long);
+ char * getSaltFileName();
+
+ FCGX_Request * request;
+ char *data;
+ /* Directory slot files are placed in. */
+ char *directory;
+ /* Full URI from Apache */
+ const char * uri;
+ /* Query portion of URI */
+ const char * query;
+ /* Type of request: GET or PUT */
+ const char * method;
+ /* Root directory for all accounts */
+ const char * iotcloudroot;
+ /* Expected length of data from client */
+ long long length;
+ /* Sequence number for oldest slot */
+ long long oldestentry;
+ /* Sequence number for newest slot */
+ long long newestentry;
+ /* Sequence number from request */
+ long long requestsequencenumber;
+ /* Size of queue */
+ int numqueueentries;
+ /* fd for queuestatus file */
+ int fd;
+ /* Is the request to get a slot? */
+ bool reqGetSlot;
+ /* Is the request to put a slot? */
+ bool reqPutSlot;
+ /* Is the request to set the salt? */
+ bool reqSetSalt;
+ /* Is the request to get the salt? */
+ bool reqGetSalt;
+};
+#endif
--- /dev/null
+((nil . ((indent-tabs-mode . t))))
+
--- /dev/null
+CPPFLAGS=-O0 -g -Wall
+
+all: iotcloud.fcgi
+
+iotcloud.fcgi: iotcloud.o iotquery.o
+ g++ $(CPPFLAGS) -o iotcloud.fcgi iotcloud.o iotquery.o -lfcgi -lfcgi++
+
+iotcloud.o: iotcloud.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotcloud.o iotcloud.cpp
+
+iotquery.o: iotquery.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotquery.o iotquery.cpp
+
+clean:
+ rm *.o iotcloud.fcgi
--- /dev/null
+1) Requires apache2
+2) Requires fastcgi (libapache2-mod-fastcgi and libfcgi-dev)
+
+Setup on ubuntu
+1) Install modules
+
+2) Add .htaccess file in /var/www/html
+RewriteEngine on
+RewriteBase /
+SetHandler cgi-script
+RewriteRule ^([a-zA-Z0-9._]*\.iotcloud/([a-zA-Z0-9._]*))$ /cgi-bin/iotcloud.fcgi/$1
+
+3) Create account directory. For example, create the directory test.iotcloud in /var/www/html
+ -- To password protect, create the following .htaccess file in the account directory:
+AuthType Basic
+AuthName "Private"
+AuthUserFile /var/www/html/foo.iotcloud/.htpasswd
+Require valid-user
+
+4) In apache2.conf, add to the /var/www directory section:
+AllowOverride FileInfo AuthConfig
+
+5) In the sites-enabled/000-default.conf file, add the line:
+SetEnv IOTCLOUD_ROOT /iotcloud/
+
+6) Create the /iotcloud directory.
+
+7) Create the account directory in the /iotcloud directory. For example, test.iotcloud and give it permissions that the apache daemon can write to.
+
+8) Compile cloud server by typing make
+
+9) Copy it to the cgi-bin directory.
--- /dev/null
+#include <iostream>
+#include "iotquery.h"
+
+using namespace std;
+
+
+int main(void) {
+ // Backup the stdio streambufs
+ streambuf * cin_streambuf = cin.rdbuf();
+ streambuf * cout_streambuf = cout.rdbuf();
+ streambuf * cerr_streambuf = cerr.rdbuf();
+
+ FCGX_Request request;
+
+ FCGX_Init();
+ FCGX_InitRequest(&request, 0, 0);
+
+ while (FCGX_Accept_r(&request) == 0) {
+ fcgi_streambuf cin_fcgi_streambuf(request.in);
+ fcgi_streambuf cout_fcgi_streambuf(request.out);
+ fcgi_streambuf cerr_fcgi_streambuf(request.err);
+
+ cin.rdbuf(&cin_fcgi_streambuf);
+ cout.rdbuf(&cout_fcgi_streambuf);
+ cerr.rdbuf(&cerr_fcgi_streambuf);
+
+ IoTQuery * iotquery=new IoTQuery(&request);
+ iotquery->processQuery();
+
+ delete iotquery;
+ }
+
+ // restore stdio streambufs
+ cin.rdbuf(cin_streambuf);
+ cout.rdbuf(cout_streambuf);
+ cerr.rdbuf(cerr_streambuf);
+
+ return 0;
+}
+
--- /dev/null
+#include "iotquery.h"
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
+
+using namespace std;
+
+const char * query_str = "QUERY_STRING";
+const char * uri_str = "REQUEST_URI";
+const char * method_str = "REQUEST_METHOD";
+const char * iotcloudroot_str = "IOTCLOUD_ROOT";
+const char * length_str = "CONTENT_LENGTH";
+
+IoTQuery::IoTQuery(FCGX_Request *request) :
+ request(request),
+ data(NULL),
+ directory(NULL),
+ uri(NULL),
+ query(NULL),
+ method(NULL),
+ iotcloudroot(NULL),
+ length(0),
+ oldestentry(0),
+ newestentry(0),
+ requestsequencenumber(0),
+ numqueueentries(DEFAULT_SIZE),
+ fd(-1),
+ reqGetSlot(false),
+ reqPutSlot(false),
+ reqSetSalt(false),
+ reqGetSalt(false) {
+}
+
+IoTQuery::~IoTQuery() {
+ if (fd >= 0)
+ close(fd);
+ if (directory)
+ delete directory;
+ if (data)
+ delete data;
+}
+
+/**
+ * Returns true if the account directory exists.
+ */
+
+bool IoTQuery::checkDirectory() {
+ struct stat s;
+ int err = stat(directory, &s);
+ if (-1 == err)
+ return false;
+ return S_ISDIR(s.st_mode);
+}
+
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
+void IoTQuery::decodeQuery() {
+ int len = strlen(query);
+ char * str = new char[len + 1];
+ memcpy(str, query, len + 1);
+ char *tok_ptr = str;
+
+ /* Parse commands */
+ char *command = strsep(&tok_ptr, "&");
+ if (strncmp(command, "req=putslot", 11) == 0)
+ reqPutSlot = true;
+ else if (strncmp(command, "req=getslot", 11) == 0)
+ reqGetSlot = true;
+ else if (strncmp(command, "req=setsalt", 11) == 0)
+ reqSetSalt = true;
+ else if (strncmp(command, "req=getsalt", 11) == 0)
+ reqGetSalt = true;
+
+ /* Load Sequence Number for request */
+ char *sequencenumber_str = strsep(&tok_ptr, "&");
+ if (sequencenumber_str != NULL &&
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ sequencenumber_str = strchr(sequencenumber_str, '=');
+ if (sequencenumber_str != NULL) {
+ requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+ }
+ }
+
+ /* don't allow a really old sequence number */
+ if (requestsequencenumber < oldestentry)
+ requestsequencenumber = oldestentry;
+
+ /* Update size if we get request */
+ char * numqueueentries_str = tok_ptr;
+ if (numqueueentries_str != NULL &&
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
+ numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
+ numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+ }
+
+ delete str;
+}
+
+/**
+ * Helper function to write data to file.
+ */
+
+void doWrite(int fd, char *data, long long length) {
+ long long offset = 0;
+ do {
+ long long byteswritten = write(fd, &data[offset], length);
+ if (byteswritten > 0) {
+ length -= byteswritten;
+ offset += byteswritten;
+ } else {
+ cerr << "Bytes not written" << endl;
+ if (byteswritten < 0) {
+ cerr << strerror(errno) << " error writing slot file" << endl;
+ }
+ return;
+ }
+ } while (length != 0);
+}
+
+/** Helper function to read data from file. */
+bool doRead(int fd, void *buf, int numbytes) {
+ int offset = 0;
+ char *ptr = (char *)buf;
+ do {
+ int bytesread = read(fd, ptr + offset, numbytes);
+ if (bytesread > 0) {
+ offset += bytesread;
+ numbytes -= bytesread;
+ } else
+ return false;
+ } while (numbytes != 0);
+ return true;
+}
+
+/**
+ * Function that handles a getSlot request.
+ */
+
+void IoTQuery::getSlot() {
+ int numrequeststosend = (int)((newestentry - requestsequencenumber) + 1);
+ if (numrequeststosend < 0)
+ numrequeststosend = 0;
+ long long numbytes = 0;
+ int filesizes[numrequeststosend];
+ int fdarray[numrequeststosend];
+ int index = 0;
+ for (long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ struct stat st;
+ char *filename = getSlotFileName(seqn);
+ if (stat(filename, &st) == 0) {
+ fdarray[index] = open(filename, O_RDONLY);
+ filesizes[index] = st.st_size;
+ numbytes += filesizes[index];
+ } else {
+ fdarray[index] = -1;
+ filesizes[index] = 0;
+ }
+ delete filename;
+ }
+ const char header[] = "getslot";
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size = sizeof(header) - 1 + sizeof(numrequeststosend) + 4 * numrequeststosend + numbytes;
+ char * response = new char[size];
+ long long offset = 0;
+ memcpy(response, header, sizeof(header) - 1);
+ offset += sizeof(header) - 1;
+ int numreq = htonl(numrequeststosend);
+ memcpy(response + offset, &numreq, sizeof(numreq));
+ offset += sizeof(numrequeststosend);
+ for (int i = 0; i < numrequeststosend; i++) {
+ int filesize = htonl(filesizes[i]);
+ memcpy(response + offset, &filesize, sizeof(filesize));
+ offset += sizeof(int);
+ }
+
+ /* Read the file data into the buffer */
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0) {
+ doRead(fdarray[i], response + offset, filesizes[i]);
+ offset += filesizes[i];
+ }
+ }
+
+ /* Send the response out to the webserver. */
+ sendResponse(response, size);
+
+ /* Delete the response buffer and close the files. */
+ delete response;
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0)
+ close(fdarray[i]);
+ }
+}
+
+/**
+ * The method setSalt handles a setSalt request from the client.
+ */
+void IoTQuery::setSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(saltfd, data, length);
+ char response[0];
+ sendResponse(response, 0);
+ close(saltfd);
+ delete filename;
+}
+
+/**
+ * The method getSalt handles a getSalt request from the client.
+ */
+
+void IoTQuery::getSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int filesize = 0;
+ struct stat st;
+ if (stat(filename, &st) == 0) {
+ filesize = st.st_size;
+ } else {
+ char response[0];
+ sendResponse(response, 0);
+ delete filename;
+ return;
+ }
+ int saltfd = open(filename, O_RDONLY);
+ int responsesize = filesize + sizeof(int);
+ char * response = new char[responsesize];
+ doRead(saltfd, response + sizeof(int), filesize);
+ int n_filesize = htonl(filesize);
+ *((int*) response) = n_filesize;
+ sendResponse(response, responsesize);
+ close(saltfd);
+ delete filename;
+ delete response;
+}
+
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
+void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
+ if (requestsequencenumber == 150)
+ {
+ if (requestsequencenumber != (newestentry + 1)) {
+
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSlotFileName(requestsequencenumber);
+ unlink(filename);
+ int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+
+ getSlot();
+ return;
+ }
+ }
+
+
+
+ if (requestsequencenumber != (newestentry + 1)) {
+ getSlot();
+ return;
+ }
+
+ /* See if we have too many slots and if so, delete the old one */
+ int numberofliveslots = (int) ((newestentry - oldestentry) + 1);
+ if (numberofliveslots >= numqueueentries) {
+ removeOldestSlot();
+ }
+
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSlotFileName(requestsequencenumber);
+ int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
+ char command[] = "putslot";
+ sendResponse(command, sizeof(command) - 1);
+}
+
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+ cout << "Accept-Ranges: bytes\r\n"
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
+ cout.write(bytes, len);
+ cout << flush;
+}
+
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
+ int directorylen = strlen(directory);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename = new char[25 + directorylen];
+ snprintf(filename, 25 + directorylen, "%s/SLOT%lld", directory, seqnum);
+ return filename;
+}
+
+/**
+ * Computes the name for a salt file
+ */
+
+char * IoTQuery::getSaltFileName() {
+ int directorylen = strlen(directory);
+
+ /* Size is 4 characters for SALT string + 1 character for null
+ termination + directory size*/
+
+ char * filename = new char[6 + directorylen];
+ snprintf(filename, 6 + directorylen, "%s/SALT", directory);
+ return filename;
+}
+
+/**
+ * Removes the oldest slot file
+ */
+
+void IoTQuery::removeOldestSlot() {
+ if (oldestentry != 0) {
+ char * filename = getSlotFileName(oldestentry);
+ unlink(filename);
+ delete filename;
+ }
+ oldestentry++;
+}
+
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
+void IoTQuery::processQuery() {
+ getQuery();
+ getDirectory();
+ // readData();
+ if (!readData())
+ {
+ cerr << "No Data Available" << endl;
+ return;
+ }
+
+
+ /* Verify that we receive a post request. */
+ if (strncmp(method, "POST", 4) != 0) {
+ cerr << "Not POST Request" << endl;
+ return;
+ }
+
+ /* Make sure the directory is okay. */
+ if (directory == NULL ||
+ !checkDirectory()) {
+ cerr << "Directory " << directory << " does not exist" << endl;
+ return;
+ }
+
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
+ if (!openStatusFile()) {
+ cerr << "Failed to open status file" << endl;
+ return;
+ }
+
+ /* Lock status file to keep other requests out. */
+ flock(fd, LOCK_EX);
+
+ /* Decode query. */
+ decodeQuery();
+
+ /* Handle request. */
+ if (reqGetSlot)
+ getSlot();
+ else if (reqPutSlot)
+ putSlot();
+ else if (reqSetSalt)
+ setSalt();
+ else if (reqGetSalt)
+ getSalt();
+ else {
+ cerr << "No recognized request" << endl;
+ return;
+ }
+}
+
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
+bool IoTQuery::readData() {
+ if (length != 0) {
+ data = new char[length + 1];
+ memset(data, 0, length + 1);
+ cin.read(data, length);
+ }
+
+ do {
+ char dummy;
+ cin >> dummy;
+ } while (!cin.eof());
+
+ if (length != 0)
+ {
+ if (cin.gcount() != length)
+ {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
+void IoTQuery::getQuery() {
+ uri = FCGX_GetParam(uri_str, request->envp);
+ query = FCGX_GetParam(query_str, request->envp);
+ method = FCGX_GetParam(method_str, request->envp);
+ iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+
+ /** We require the content-length header to be sent. */
+ char * reqlength = FCGX_GetParam(length_str, request->envp);
+ if (reqlength) {
+ length = strtoll(reqlength, NULL, 10);
+ } else {
+ length = 0;
+ }
+}
+
+/**
+ * Initializes directory field from environmental variables.
+ */
+
+void IoTQuery::getDirectory() {
+ char * split = strchr((char *)uri, '?');
+ if (split == NULL)
+ return;
+ int split_len = (int) (split - uri);
+ int rootdir_len = strlen(iotcloudroot);
+ int directory_len = split_len + rootdir_len + 1;
+ directory = new char[directory_len];
+ memcpy(directory, iotcloudroot, rootdir_len);
+ memcpy(directory + rootdir_len, uri, split_len);
+ directory[directory_len - 1] = 0;
+}
+
+/**
+ * Helper function that is used to read the status file.
+ */
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+ do {
+ size_t bytesread = pread(fd, ptr, count, offset);
+ if (bytesread == count) {
+ return 1;
+ } else if (bytesread == 0) {
+ return 0;
+ }
+ } while (1);
+}
+
+
+/**
+ * Writes the current state to the status file.
+ */
+
+void IoTQuery::updateStatusFile() {
+ pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+ pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+ pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
+bool IoTQuery::openStatusFile() {
+ char statusfile[] = "queuestatus";
+ int len = strlen(directory);
+
+ char * filename = new char[len + sizeof(statusfile) + 2];
+ memcpy(filename, directory, len);
+ filename[len] = '/';
+ memcpy(filename + len + 1, statusfile, sizeof(statusfile));
+ filename[len + sizeof(statusfile) + 1] = 0;
+ fd = open(filename, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
+ delete filename;
+
+ if (fd < 0) {
+ cerr << strerror(errno) << " error opening statusfile" << endl;
+ return false;
+ }
+
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
+ int size;
+ int needwrite = 0;
+ if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+ numqueueentries = size;
+ else
+ needwrite = 1;
+
+ long long entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+ oldestentry = entry;
+ else
+ needwrite = 1;
+
+ if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+ newestentry = entry;
+ else
+ needwrite = 1;
+
+ if (needwrite)
+ updateStatusFile();
+
+ return true;
+}
+
+
--- /dev/null
+#ifndef IOTQUERY_H
+#define IOTQUERY_H
+#include <iostream>
+#include "fcgio.h"
+#include "fcgi_stdio.h"
+
+#define DEFAULT_SIZE 128
+#define OFFSET_MAX 0
+#define OFFSET_OLD 4
+#define OFFSET_NEW 12
+
+class IoTQuery {
+public:
+ IoTQuery(FCGX_Request * request);
+ ~IoTQuery();
+ void processQuery();
+
+private:
+ void sendResponse(char *data, int length);
+ void getQuery();
+ void getDirectory();
+ bool readData();
+ bool checkDirectory();
+ bool openStatusFile();
+ void updateStatusFile();
+ void decodeQuery();
+ void getSlot();
+ void putSlot();
+ void setSalt();
+ void getSalt();
+ void removeOldestSlot();
+ char * getSlotFileName(long long);
+ char * getSaltFileName();
+
+ FCGX_Request * request;
+ char *data;
+ /* Directory slot files are placed in. */
+ char *directory;
+ /* Full URI from Apache */
+ const char * uri;
+ /* Query portion of URI */
+ const char * query;
+ /* Type of request: GET or PUT */
+ const char * method;
+ /* Root directory for all accounts */
+ const char * iotcloudroot;
+ /* Expected length of data from client */
+ long long length;
+ /* Sequence number for oldest slot */
+ long long oldestentry;
+ /* Sequence number for newest slot */
+ long long newestentry;
+ /* Sequence number from request */
+ long long requestsequencenumber;
+ /* Size of queue */
+ int numqueueentries;
+ /* fd for queuestatus file */
+ int fd;
+ /* Is the request to get a slot? */
+ bool reqGetSlot;
+ /* Is the request to put a slot? */
+ bool reqPutSlot;
+ /* Is the request to set the salt? */
+ bool reqSetSalt;
+ /* Is the request to get the salt? */
+ bool reqGetSalt;
+};
+#endif
--- /dev/null
+((nil . ((indent-tabs-mode . t))))
+
--- /dev/null
+CPPFLAGS=-O0 -g -Wall
+
+all: iotcloud.fcgi
+
+iotcloud.fcgi: iotcloud.o iotquery.o
+ g++ $(CPPFLAGS) -o iotcloud.fcgi iotcloud.o iotquery.o -lfcgi -lfcgi++
+
+iotcloud.o: iotcloud.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotcloud.o iotcloud.cpp
+
+iotquery.o: iotquery.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotquery.o iotquery.cpp
+
+clean:
+ rm *.o iotcloud.fcgi
--- /dev/null
+1) Requires apache2
+2) Requires fastcgi (libapache2-mod-fastcgi and libfcgi-dev)
+
+Setup on ubuntu
+1) Install modules
+
+2) Add .htaccess file in /var/www/html
+RewriteEngine on
+RewriteBase /
+SetHandler cgi-script
+RewriteRule ^([a-zA-Z0-9._]*\.iotcloud/([a-zA-Z0-9._]*))$ /cgi-bin/iotcloud.fcgi/$1
+
+3) Create account directory. For example, create the directory test.iotcloud in /var/www/html
+ -- To password protect, create the following .htaccess file in the account directory:
+AuthType Basic
+AuthName "Private"
+AuthUserFile /var/www/html/foo.iotcloud/.htpasswd
+Require valid-user
+
+4) In apache2.conf, add to the /var/www directory section:
+AllowOverride FileInfo AuthConfig
+
+5) In the sites-enabled/000-default.conf file, add the line:
+SetEnv IOTCLOUD_ROOT /iotcloud/
+
+6) Create the /iotcloud directory.
+
+7) Create the account directory in the /iotcloud directory. For example, test.iotcloud and give it permissions that the apache daemon can write to.
+
+8) Compile cloud server by typing make
+
+9) Copy it to the cgi-bin directory.
--- /dev/null
+#include <iostream>
+#include "iotquery.h"
+
+using namespace std;
+
+
+int main(void) {
+ // Backup the stdio streambufs
+ streambuf * cin_streambuf = cin.rdbuf();
+ streambuf * cout_streambuf = cout.rdbuf();
+ streambuf * cerr_streambuf = cerr.rdbuf();
+
+ FCGX_Request request;
+
+ FCGX_Init();
+ FCGX_InitRequest(&request, 0, 0);
+
+ while (FCGX_Accept_r(&request) == 0) {
+ fcgi_streambuf cin_fcgi_streambuf(request.in);
+ fcgi_streambuf cout_fcgi_streambuf(request.out);
+ fcgi_streambuf cerr_fcgi_streambuf(request.err);
+
+ cin.rdbuf(&cin_fcgi_streambuf);
+ cout.rdbuf(&cout_fcgi_streambuf);
+ cerr.rdbuf(&cerr_fcgi_streambuf);
+
+ IoTQuery * iotquery=new IoTQuery(&request);
+ iotquery->processQuery();
+
+ delete iotquery;
+ }
+
+ // restore stdio streambufs
+ cin.rdbuf(cin_streambuf);
+ cout.rdbuf(cout_streambuf);
+ cerr.rdbuf(cerr_streambuf);
+
+ return 0;
+}
+
--- /dev/null
+#include "iotquery.h"
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
+
+using namespace std;
+
+const char * query_str = "QUERY_STRING";
+const char * uri_str = "REQUEST_URI";
+const char * method_str = "REQUEST_METHOD";
+const char * iotcloudroot_str = "IOTCLOUD_ROOT";
+const char * length_str = "CONTENT_LENGTH";
+
+IoTQuery::IoTQuery(FCGX_Request *request) :
+ request(request),
+ data(NULL),
+ directory(NULL),
+ uri(NULL),
+ query(NULL),
+ method(NULL),
+ iotcloudroot(NULL),
+ length(0),
+ oldestentry(0),
+ newestentry(0),
+ requestsequencenumber(0),
+ numqueueentries(DEFAULT_SIZE),
+ fd(-1),
+ reqGetSlot(false),
+ reqPutSlot(false),
+ reqSetSalt(false),
+ reqGetSalt(false) {
+}
+
+IoTQuery::~IoTQuery() {
+ if (fd >= 0)
+ close(fd);
+ if (directory)
+ delete directory;
+ if (data)
+ delete data;
+}
+
+/**
+ * Returns true if the account directory exists.
+ */
+
+bool IoTQuery::checkDirectory() {
+ struct stat s;
+ int err = stat(directory, &s);
+ if (-1 == err)
+ return false;
+ return S_ISDIR(s.st_mode);
+}
+
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
+void IoTQuery::decodeQuery() {
+ int len = strlen(query);
+ char * str = new char[len + 1];
+ memcpy(str, query, len + 1);
+ char *tok_ptr = str;
+
+ /* Parse commands */
+ char *command = strsep(&tok_ptr, "&");
+ if (strncmp(command, "req=putslot", 11) == 0)
+ reqPutSlot = true;
+ else if (strncmp(command, "req=getslot", 11) == 0)
+ reqGetSlot = true;
+ else if (strncmp(command, "req=setsalt", 11) == 0)
+ reqSetSalt = true;
+ else if (strncmp(command, "req=getsalt", 11) == 0)
+ reqGetSalt = true;
+
+ /* Load Sequence Number for request */
+ char *sequencenumber_str = strsep(&tok_ptr, "&");
+ if (sequencenumber_str != NULL &&
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ sequencenumber_str = strchr(sequencenumber_str, '=');
+ if (sequencenumber_str != NULL) {
+ requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+ }
+ }
+
+ /* don't allow a really old sequence number */
+ if (requestsequencenumber < oldestentry)
+ requestsequencenumber = oldestentry;
+
+ /* Update size if we get request */
+ char * numqueueentries_str = tok_ptr;
+ if (numqueueentries_str != NULL &&
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
+ numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
+ numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+ }
+
+ delete str;
+}
+
+/**
+ * Helper function to write data to file.
+ */
+
+void doWrite(int fd, char *data, long long length) {
+ long long offset = 0;
+ do {
+ long long byteswritten = write(fd, &data[offset], length);
+ if (byteswritten > 0) {
+ length -= byteswritten;
+ offset += byteswritten;
+ } else {
+ cerr << "Bytes not written" << endl;
+ if (byteswritten < 0) {
+ cerr << strerror(errno) << " error writing slot file" << endl;
+ }
+ return;
+ }
+ } while (length != 0);
+}
+
+/** Helper function to read data from file. */
+bool doRead(int fd, void *buf, int numbytes) {
+ int offset = 0;
+ char *ptr = (char *)buf;
+ do {
+ int bytesread = read(fd, ptr + offset, numbytes);
+ if (bytesread > 0) {
+ offset += bytesread;
+ numbytes -= bytesread;
+ } else
+ return false;
+ } while (numbytes != 0);
+ return true;
+}
+
+/**
+ * Function that handles a getSlot request.
+ */
+
+void IoTQuery::getSlot() {
+ int numrequeststosend = (int)((newestentry - requestsequencenumber) + 1);
+ if (numrequeststosend < 0)
+ numrequeststosend = 0;
+ long long numbytes = 0;
+ int filesizes[numrequeststosend];
+ int fdarray[numrequeststosend];
+ int index = 0;
+ for (long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ struct stat st;
+
+ int seqnt = seqn;
+ if (seqn > 150)
+ {
+ seqnt--;
+ }
+
+ char *filename = getSlotFileName(seqnt);
+ if (stat(filename, &st) == 0) {
+ fdarray[index] = open(filename, O_RDONLY);
+ filesizes[index] = st.st_size;
+ numbytes += filesizes[index];
+ } else {
+ fdarray[index] = -1;
+ filesizes[index] = 0;
+ }
+ delete filename;
+ }
+ const char header[] = "getslot";
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size = sizeof(header) - 1 + sizeof(numrequeststosend) + 4 * numrequeststosend + numbytes;
+ char * response = new char[size];
+ long long offset = 0;
+ memcpy(response, header, sizeof(header) - 1);
+ offset += sizeof(header) - 1;
+ int numreq = htonl(numrequeststosend);
+ memcpy(response + offset, &numreq, sizeof(numreq));
+ offset += sizeof(numrequeststosend);
+ for (int i = 0; i < numrequeststosend; i++) {
+ int filesize = htonl(filesizes[i]);
+ memcpy(response + offset, &filesize, sizeof(filesize));
+ offset += sizeof(int);
+ }
+
+ /* Read the file data into the buffer */
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0) {
+ doRead(fdarray[i], response + offset, filesizes[i]);
+ offset += filesizes[i];
+ }
+ }
+
+ /* Send the response out to the webserver. */
+ sendResponse(response, size);
+
+ /* Delete the response buffer and close the files. */
+ delete response;
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0)
+ close(fdarray[i]);
+ }
+}
+
+/**
+ * The method setSalt handles a setSalt request from the client.
+ */
+void IoTQuery::setSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(saltfd, data, length);
+ char response[0];
+ sendResponse(response, 0);
+ close(saltfd);
+ delete filename;
+}
+
+/**
+ * The method getSalt handles a getSalt request from the client.
+ */
+
+void IoTQuery::getSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int filesize = 0;
+ struct stat st;
+ if (stat(filename, &st) == 0) {
+ filesize = st.st_size;
+ } else {
+ char response[0];
+ sendResponse(response, 0);
+ delete filename;
+ return;
+ }
+ int saltfd = open(filename, O_RDONLY);
+ int responsesize = filesize + sizeof(int);
+ char * response = new char[responsesize];
+ doRead(saltfd, response + sizeof(int), filesize);
+ int n_filesize = htonl(filesize);
+ *((int*) response) = n_filesize;
+ sendResponse(response, responsesize);
+ close(saltfd);
+ delete filename;
+ delete response;
+}
+
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
+void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
+ if (requestsequencenumber != (newestentry + 1)) {
+ getSlot();
+ return;
+ }
+
+ /* See if we have too many slots and if so, delete the old one */
+ int numberofliveslots = (int) ((newestentry - oldestentry) + 1);
+ if (numberofliveslots >= numqueueentries) {
+ removeOldestSlot();
+ }
+
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSlotFileName(requestsequencenumber);
+ int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
+ char command[] = "putslot";
+ sendResponse(command, sizeof(command) - 1);
+}
+
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+ cout << "Accept-Ranges: bytes\r\n"
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
+ cout.write(bytes, len);
+ cout << flush;
+}
+
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
+ int directorylen = strlen(directory);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename = new char[25 + directorylen];
+ snprintf(filename, 25 + directorylen, "%s/SLOT%lld", directory, seqnum);
+ return filename;
+}
+
+/**
+ * Computes the name for a salt file
+ */
+
+char * IoTQuery::getSaltFileName() {
+ int directorylen = strlen(directory);
+
+ /* Size is 4 characters for SALT string + 1 character for null
+ termination + directory size*/
+
+ char * filename = new char[6 + directorylen];
+ snprintf(filename, 6 + directorylen, "%s/SALT", directory);
+ return filename;
+}
+
+/**
+ * Removes the oldest slot file
+ */
+
+void IoTQuery::removeOldestSlot() {
+ if (oldestentry != 0) {
+ char * filename = getSlotFileName(oldestentry);
+ unlink(filename);
+ delete filename;
+ }
+ oldestentry++;
+}
+
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
+void IoTQuery::processQuery() {
+ getQuery();
+ getDirectory();
+ // readData();
+ if (!readData())
+ {
+ cerr << "No Data Available" << endl;
+ return;
+ }
+
+
+ /* Verify that we receive a post request. */
+ if (strncmp(method, "POST", 4) != 0) {
+ cerr << "Not POST Request" << endl;
+ return;
+ }
+
+ /* Make sure the directory is okay. */
+ if (directory == NULL ||
+ !checkDirectory()) {
+ cerr << "Directory " << directory << " does not exist" << endl;
+ return;
+ }
+
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
+ if (!openStatusFile()) {
+ cerr << "Failed to open status file" << endl;
+ return;
+ }
+
+ /* Lock status file to keep other requests out. */
+ flock(fd, LOCK_EX);
+
+ /* Decode query. */
+ decodeQuery();
+
+ /* Handle request. */
+ if (reqGetSlot)
+ getSlot();
+ else if (reqPutSlot)
+ putSlot();
+ else if (reqSetSalt)
+ setSalt();
+ else if (reqGetSalt)
+ getSalt();
+ else {
+ cerr << "No recognized request" << endl;
+ return;
+ }
+}
+
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
+bool IoTQuery::readData() {
+ if (length != 0) {
+ data = new char[length + 1];
+ memset(data, 0, length + 1);
+ cin.read(data, length);
+ }
+
+ do {
+ char dummy;
+ cin >> dummy;
+ } while (!cin.eof());
+
+ if (length != 0)
+ {
+ if (cin.gcount() != length)
+ {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
+void IoTQuery::getQuery() {
+ uri = FCGX_GetParam(uri_str, request->envp);
+ query = FCGX_GetParam(query_str, request->envp);
+ method = FCGX_GetParam(method_str, request->envp);
+ iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+
+ /** We require the content-length header to be sent. */
+ char * reqlength = FCGX_GetParam(length_str, request->envp);
+ if (reqlength) {
+ length = strtoll(reqlength, NULL, 10);
+ } else {
+ length = 0;
+ }
+}
+
+/**
+ * Initializes directory field from environmental variables.
+ */
+
+void IoTQuery::getDirectory() {
+ char * split = strchr((char *)uri, '?');
+ if (split == NULL)
+ return;
+ int split_len = (int) (split - uri);
+ int rootdir_len = strlen(iotcloudroot);
+ int directory_len = split_len + rootdir_len + 1;
+ directory = new char[directory_len];
+ memcpy(directory, iotcloudroot, rootdir_len);
+ memcpy(directory + rootdir_len, uri, split_len);
+ directory[directory_len - 1] = 0;
+}
+
+/**
+ * Helper function that is used to read the status file.
+ */
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+ do {
+ size_t bytesread = pread(fd, ptr, count, offset);
+ if (bytesread == count) {
+ return 1;
+ } else if (bytesread == 0) {
+ return 0;
+ }
+ } while (1);
+}
+
+
+/**
+ * Writes the current state to the status file.
+ */
+
+void IoTQuery::updateStatusFile() {
+ pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+ pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+ pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
+bool IoTQuery::openStatusFile() {
+ char statusfile[] = "queuestatus";
+ int len = strlen(directory);
+
+ char * filename = new char[len + sizeof(statusfile) + 2];
+ memcpy(filename, directory, len);
+ filename[len] = '/';
+ memcpy(filename + len + 1, statusfile, sizeof(statusfile));
+ filename[len + sizeof(statusfile) + 1] = 0;
+ fd = open(filename, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
+ delete filename;
+
+ if (fd < 0) {
+ cerr << strerror(errno) << " error opening statusfile" << endl;
+ return false;
+ }
+
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
+ int size;
+ int needwrite = 0;
+ if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+ numqueueentries = size;
+ else
+ needwrite = 1;
+
+ long long entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+ oldestentry = entry;
+ else
+ needwrite = 1;
+
+ if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+ newestentry = entry;
+ else
+ needwrite = 1;
+
+ if (needwrite)
+ updateStatusFile();
+
+ return true;
+}
+
+
--- /dev/null
+#ifndef IOTQUERY_H
+#define IOTQUERY_H
+#include <iostream>
+#include "fcgio.h"
+#include "fcgi_stdio.h"
+
+#define DEFAULT_SIZE 128
+#define OFFSET_MAX 0
+#define OFFSET_OLD 4
+#define OFFSET_NEW 12
+
+class IoTQuery {
+public:
+ IoTQuery(FCGX_Request * request);
+ ~IoTQuery();
+ void processQuery();
+
+private:
+ void sendResponse(char *data, int length);
+ void getQuery();
+ void getDirectory();
+ bool readData();
+ bool checkDirectory();
+ bool openStatusFile();
+ void updateStatusFile();
+ void decodeQuery();
+ void getSlot();
+ void putSlot();
+ void setSalt();
+ void getSalt();
+ void removeOldestSlot();
+ char * getSlotFileName(long long);
+ char * getSaltFileName();
+
+ FCGX_Request * request;
+ char *data;
+ /* Directory slot files are placed in. */
+ char *directory;
+ /* Full URI from Apache */
+ const char * uri;
+ /* Query portion of URI */
+ const char * query;
+ /* Type of request: GET or PUT */
+ const char * method;
+ /* Root directory for all accounts */
+ const char * iotcloudroot;
+ /* Expected length of data from client */
+ long long length;
+ /* Sequence number for oldest slot */
+ long long oldestentry;
+ /* Sequence number for newest slot */
+ long long newestentry;
+ /* Sequence number from request */
+ long long requestsequencenumber;
+ /* Size of queue */
+ int numqueueentries;
+ /* fd for queuestatus file */
+ int fd;
+ /* Is the request to get a slot? */
+ bool reqGetSlot;
+ /* Is the request to put a slot? */
+ bool reqPutSlot;
+ /* Is the request to set the salt? */
+ bool reqSetSalt;
+ /* Is the request to get the salt? */
+ bool reqGetSalt;
+};
+#endif