import javax.crypto.spec.*;
import java.security.SecureRandom;
+/**
+ * This class provides a communication API to the webserver. It also
+ * validates the HMACs on the slots and handles encryption.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
class CloudComm {
String baseurl;
Cipher encryptcipher;
SecretKeySpec key;
static final int SALT_SIZE = 8;
-
+ /**
+ * Empty Constructor needed for child class.
+ */
+
CloudComm() {
}
+ /**
+ * Constructor for actual use. Takes in the url and password.
+ */
+
CloudComm(String _baseurl, String password) {
this.baseurl=_baseurl;
initCloud(password);
}
+ /**
+ * Generates Key from password.
+ */
+
private void initKey(String password) {
try {
salt=new byte[SALT_SIZE];
}
}
+ /**
+ * Inits the HMAC generator.
+ */
+
private void initCloud(String password) {
try {
initKey(password);
throw new Error("Failed To Initialize Ciphers");
}
}
-
+
+ /*
+ * Builds the URL for the given request.
+ */
+
private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
String reqstring=isput?"req=putslot":"req=getslot";
String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber;
return new URL(urlstr);
}
+ /*
+ * API for putting a slot into the queue. Returns null on success.
+ * On failure, the server will send slots with newer sequence
+ * numbers.
+ */
+
public Slot[] putSlot(Slot slot, int max) {
try {
long sequencenumber=slot.getSequenceNumber();
}
/*
- Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
- encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
- Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
- decryptCipher.init(Cipher.DECRYPT_MODE, secret);
- */
-
+ Cipher encryptCipher =
+ Cipher.getInstance("AES/CBC/PKCS5Padding");
+ encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
+ Cipher decryptCipher =
+ Cipher.getInstance("AES/CBC/PKCS5Padding");
+ decryptCipher.init(Cipher.DECRYPT_MODE, secret);
+ */
+
+ /**
+ * Request the server to send all slots with the given
+ * sequencenumber or newer.
+ */
+
public Slot[] getSlots(long sequencenumber) {
try {
URL url=buildRequest(false, sequencenumber, 0);
}
}
- Slot[] processSlots(DataInputStream dis) throws IOException {
+ /**
+ * Method that actually handles building Slot objects from the
+ * server response. Shared by both putSlot and getSlots.
+ */
+
+ private Slot[] processSlots(DataInputStream dis) throws IOException {
int numberofslots=dis.readInt();
int[] sizesofslots=new int[numberofslots];
Slot[] slots=new Slot[numberofslots];
package iotcloud;
import java.nio.ByteBuffer;
+/**
+ * Generic class that wraps all the different types of information
+ * that can be stored in a Slot.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
abstract class Entry implements Liveness {
static final byte TypeKeyValue = 1;
static final byte TypeLastMessage = 2;
static final byte TypeRejectedMessage = 3;
static final byte TypeTableStatus = 4;
+
+ /* Records whether the information is still live or has been
+ superceded by a newer update. */
+
private boolean islive = true;
private Slot parentslot;
parentslot = _parentslot;
}
+ /**
+ * Static method for decoding byte array into Entry objects. First
+ * byte tells the type of entry.
+ */
+
static Entry decode(Slot slot, ByteBuffer bb) {
byte type=bb.get();
switch(type) {
}
}
+ /**
+ * Returns true if the Entry object is still live.
+ */
+
boolean isLive() {
return islive;
}
+ /**
+ * Flags the entry object as dead. Also decrements the live count
+ * of the parent slot.
+ */
+
void setDead() {
islive = false;
parentslot.decrementLiveCount();
}
+ /**
+ * Serializes the Entry object into the byte buffer.
+ */
+
abstract void encode(ByteBuffer bb);
+ /**
+ * Returns the size in bytes the entry object will take in the byte
+ * array.
+ */
+
abstract int getSize();
+ /**
+ * Returns a byte encoding the type of the entry object.
+ */
+
abstract byte getType();
}
import java.util.Arrays;
+/**
+ * IoTString is wraps the underlying byte string. We don't use the
+ * standard String class as we have bytes and not chars.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
final public class IoTString {
byte[] array;
int hashcode;
private IoTString() {
}
+ /**
+ * Builds an IoTString object around the byte array. This
+ * constructor makes a copy, so the caller is free to modify the byte array.
+ */
+
public IoTString(byte[] _array) {
array=(byte[]) _array.clone();
hashcode=Arrays.hashCode(array);
}
+ /**
+ * Converts the String object to a byte representation and stores it
+ * into the IoTString object.
+ */
+
public IoTString(String str) {
array=str.getBytes();
hashcode=Arrays.hashCode(array);
}
+ /**
+ * Internal methods to build an IoTString using the byte[] passed
+ * in. Caller is responsible for ensuring the byte[] is never
+ * modified.
+ */
+
static IoTString shallow(byte[] _array) {
IoTString i=new IoTString();
i.array = _array;
return i;
}
+ /**
+ * Internal method to grab a reference to our byte array. Caller
+ * must not modify it.
+ */
+
byte[] internalBytes() {
return array;
}
+ /**
+ * Returns the hashCode as computed by Arrays.hashcode(byte[]).
+ */
+
public int hashCode() {
return hashcode;
}
+ /**
+ * Returns a String representation of the IoTString.
+ */
+
public String toString() {
return new String(array);
}
+ /**
+ * Returns a copy of the underlying byte string.
+ */
+
public byte[] getBytes() {
return (byte[]) array.clone();
}
+ /**
+ * Returns true if two byte strings have the same content.
+ */
+
public boolean equals(Object o) {
if (o instanceof IoTString) {
IoTString i=(IoTString)o;
return false;
}
+ /**
+ * Returns the length in bytes of the IoTString.
+ */
+
public int length() {
return array.length;
}
package iotcloud;
import java.nio.ByteBuffer;
+/**
+ * KeyValue entry for Slot.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
class KeyValue extends Entry {
private IoTString key;
private IoTString value;
import java.nio.ByteBuffer;
+/**
+ * This Entry records the last message sent by a given machine.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
class LastMessage extends Entry {
private long machineid;
private long seqnum;
package iotcloud;
+/**
+ * Interface common to both classes that record information about the
+ * last message sent by a machine. (Either a Slot or a LastMessage.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
interface Liveness {
}
package iotcloud;
import java.nio.ByteBuffer;
+/**
+ * Entry for tracking messages that the server rejected. We have to
+ * make sure that all clients know that this message was rejected to
+ * prevent the server from reusing these messages in an attack.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
class RejectedMessage extends Entry {
+ /* Machine identifier */
private long machineid;
- private long oldseqnum; //Oldest seqnum in range
- private long newseqnum; //Newest seqnum in range (inclusive)
- private boolean equalto; //Is message sent or not sent by machineid
+ /* Oldest sequence number in range */
+ private long oldseqnum;
+ /* Newest sequence number in range */
+ private long newseqnum;
+ /* Is the machine identifier of the relevant slots equal to (or not
+ * equal to) the specified machine identifier. */
+ private boolean equalto;
RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
super(slot);
import javax.crypto.Mac;
import java.util.Arrays;
+/**
+ * Data structuring for holding Slot information.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
class Slot implements Liveness {
+ /** Sets the slot size. */
static final int SLOT_SIZE=2048;
+ /** Sets how many bytes we reserve. */
static final int RESERVED_SPACE=64;
+ /** Sets the size for the HMAC. */
static final int HMAC_SIZE=32;
+ /** Sequence number of the slot. */
private long seqnum;
+ /** HMAC of previous slot. */
private byte[] prevhmac;
+ /** HMAC of this slot. */
private byte[] hmac;
+ /** Machine that sent this slot. */
private long machineid;
+ /** Vector of entries in this slot. */
private Vector<Entry> entries;
+ /** Pieces of information that are live. */
private int livecount;
+ /** Flag that indicates whether this slot is still live for
+ * recording the machine that sent it. */
private boolean seqnumlive;
+ /** Number of bytes of free space. */
private int freespace;
Slot(long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) {
freespace -= e.getSize();
}
+ /**
+ * Returns true if the slot has free space to hold the entry without
+ * using its reserved space. */
+
boolean hasSpace(Entry e) {
int newfreespace = freespace - e.getSize();
return newfreespace > RESERVED_SPACE;
}
+ /**
+ * Returns true if the slot can fit the entry potentially using the
+ * reserved space. */
+
boolean canFit(Entry e) {
int newfreespace = freespace - e.getSize();
return newfreespace >= 0;
byte[] encode(Mac mac) {
byte[] array=new byte[SLOT_SIZE];
ByteBuffer bb=ByteBuffer.wrap(array);
- bb.position(HMAC_SIZE); //Leave space for the HMACs
+ /* Leave space for the slot HMAC. */
+ bb.position(HMAC_SIZE);
bb.put(prevhmac);
bb.putLong(seqnum);
bb.putLong(machineid);
for(Entry entry:entries) {
entry.encode(bb);
}
- //Compute our HMAC
+ /* Compute our HMAC */
mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
byte[] realmac=mac.doFinal();
hmac = realmac;
return array;
}
+ /**
+ * Returns the empty size of a Slot. Includes 2 HMACs, the machine
+ * identifier, the sequence number, and the number of entries.
+ */
int getBaseSize() {
return 2*HMAC_SIZE+2*Long.BYTES+Integer.BYTES;
}
+ /**
+ * Returns the live set of entries for this Slot. Generates a fake
+ * LastMessage entry to represent the information stored by the slot
+ * itself.
+ */
+
Vector<Entry> getLiveEntries() {
Vector<Entry> liveEntries=new Vector<Entry>();
for(Entry entry: entries)
return liveEntries;
}
+ /**
+ * Returns the sequence number of the slot.
+ */
+
long getSequenceNumber() {
return seqnum;
}
+ /**
+ * Returns the machine that sent this slot.
+ */
+
long getMachineID() {
return machineid;
}
+ /**
+ * Records that a newer slot records the fact that this slot was
+ * sent by the relevant machine.
+ */
+
void setDead() {
decrementLiveCount();
seqnumlive=false;
}
+ /**
+ * Update the count of live entries.
+ */
+
void decrementLiveCount() {
livecount--;
}
+ /**
+ * Returns whether the slot stores any live information.
+ */
+
boolean isLive() {
return livecount > 0;
}
package iotcloud;
+/**
+ * Buffer that holds the live set of slots.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
class SlotBuffer {
static final int DEFAULT_SIZE = 128;
package iotcloud;
+/**
+ * Slot indexer allows slots in both the slot buffer and the new
+ * server response to looked up in a consistent fashion.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
class SlotIndexer {
private Slot[] updates;
private SlotBuffer buffer;
import java.util.Arrays;
import java.util.Vector;
+/**
+ * IoTTable data structure. Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
final public class Table {
private int numslots;
private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
Slot[] newslots=cloud.getSlots(sequencenumber+1);
validateandupdate(newslots, true);
}
-
+
public void update() {
Slot[] newslots=cloud.getSlots(sequencenumber+1);
Slot[] array=cloud.putSlot(s, numslots);
if (array == null) {
array = new Slot[] {s};
- validateandupdate(array, true); // update data structure
+ /* update data structure */
+ validateandupdate(array, true);
} else {
throw new Error("Error on initialization");
}
}
if ((numslots - buffer.size()) < FREE_SLOTS) {
- //have to check whether we have enough free slots
+ /* have to check whether we have enough free slots */
long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
seqn = fullfirstseqn < 1?1:fullfirstseqn;
for(int i=0; i < FREE_SLOTS; i++, seqn++) {
else
insertedkv=false;
- validateandupdate(array, true); // update data structure
+ /* update data structure */
+ validateandupdate(array, true);
return insertedkv;
}
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
- //The cloud communication layer has checked slot HMACs already
- //before decoding
+ /* The cloud communication layer has checked slot HMACs already
+ before decoding */
if (newslots.length==0)
return;
processSlot(indexer, slot, acceptupdatestolocal);
}
- //If there is a gap, check to see if the server sent us everything
+ /* If there is a gap, check to see if the server sent us everything. */
if (firstseqnum != (sequencenumber+1))
checkNumSlots(newslots.length);
commitNewMaxSize();
- //commit new to slots
+ /* Commit new to slots. */
for(Slot slot:newslots) {
buffer.putSlot(slot);
}
package iotcloud;
import java.nio.ByteBuffer;
+/**
+ * TableStatus entries record the current size of the data structure
+ * in slots. Used to remember the size and to perform resizes.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
class TableStatus extends Entry {
private int maxslots;
package iotcloud;
+/**
+ * Test cases.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
public class Test {
public static void main(String[] args) {
if (args[0].equals("1"))
static Thread buildThread(String prefix, Table t) {
return new Thread() {
- public void run() {
- for(int i=0; i<600; i++) {
- String a=prefix+i;
- IoTString ia=new IoTString(a);
- t.put(ia, ia);
- System.out.println(ia+"->"+t.get(ia));
- }
- }
+ public void run() {
+ for(int i=0; i<600; i++) {
+ String a=prefix+i;
+ IoTString ia=new IoTString(a);
+ t.put(ia, ia);
+ System.out.println(ia+"->"+t.get(ia));
+ }
+ }
};
}
-
+
static void test4() {
Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
e.printStackTrace();
}
}
-
+
static void test3() {
Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
package iotcloud;
import java.io.*;
import java.net.*;
-import java.util.Arrays;
-import javax.crypto.*;
+
+/**
+ * This class is a test driver to test the code w/o going through an
+ * actual web server.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
class TestCloudComm extends CloudComm {
SlotBuffer buffer;
1) check expiration of rejectedmessage entries
2) check missing machine messages
3) add crypto
-4) fix bugs with missing slots on server
+4) handle Salt
cin.rdbuf(&cin_fcgi_streambuf);
cout.rdbuf(&cout_fcgi_streambuf);
cerr.rdbuf(&cerr_fcgi_streambuf);
+
IoTQuery * iotquery=new IoTQuery(&request);
iotquery->processQuery();
+
delete iotquery;
}
delete data;
}
+/**
+ * Returns true if the account directory exists.
+ */
+
bool IoTQuery::checkDirectory() {
struct stat s;
int err=stat(directory, &s);
return S_ISDIR(s.st_mode);
}
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
void IoTQuery::decodeQuery() {
int len=strlen(query);
char * str=new char[len+1];
}
}
- //don't allow a really old sequence number
+ /* don't allow a really old sequence number */
if (requestsequencenumber < oldestentry)
requestsequencenumber = oldestentry;
delete str;
}
+/**
+ * Helper function to write data to file.
+ */
+
void doWrite(int fd, char *data, long long length) {
long long offset=0;
do {
} while(length != 0);
}
+/** Helper function to read data from file. */
bool doRead(int fd, void *buf, int numbytes) {
int offset=0;
char *ptr=(char *)buf;
return true;
}
+/**
+ * Function that handles a getSlot request.
+ */
+
void IoTQuery::getSlot() {
int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
if (numrequeststosend < 0)
delete filename;
}
const char header[]="getslot";
- long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
char * response = new char[size];
long long offset=0;
memcpy(response, header, sizeof(header)-1);
offset+=sizeof(int);
}
- //copy file data
+ /* Read the file data into the buffer */
for(int i=0; i<numrequeststosend; i++) {
if (fdarray[i]>=0) {
doRead(fdarray[i], response+offset, filesizes[i]);
}
}
- //write response out
+ /* Send the response out to the webserver. */
sendResponse(response, size);
- //cleanup
+ /* Delete the response buffer and close the files. */
delete response;
for(int i=0; i<numrequeststosend; i++) {
if (fdarray[i] >= 0)
}
}
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
if (requestsequencenumber!=(newestentry+1)) {
getSlot();
return;
}
+ /* See if we have too many slots and if so, delete the old one */
int numberofliveslots=(int) ((newestentry-oldestentry)+1);
if (numberofliveslots >= numqueueentries) {
- //need to drop slot
removeOldestSlot();
}
- //write slot data out to file
+ /* Write the slot data we received to a SLOT file */
char *filename = getSlotFileName(requestsequencenumber);
int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
doWrite(slotfd, data, length);
close(slotfd);
delete filename;
- newestentry = requestsequencenumber; // update sequence number
- updateStatusFile(); // update counts
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
char command[]="putslot";
sendResponse(command, sizeof(command)-1);
}
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
void IoTQuery::sendResponse(char * bytes, int len) {
cout << "Accept-Ranges: bytes\r\n"
<< "Content-Length: " << len << "\r\n"
cout.write(bytes, len);
}
-char * IoTQuery::getSlotFileName(long long slot) {
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
int directorylen=strlen(directory);
- char * filename=new char[25+directorylen]; //19 digits for long number + 4 characters for SLOT + 1 character for null termination
- snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename=new char[25+directorylen];
+ snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, seqnum);
return filename;
}
+/**
+ * Removes the oldest slot file
+ */
+
void IoTQuery::removeOldestSlot() {
if (oldestentry!=0) {
char * filename=getSlotFileName(oldestentry);
oldestentry++;
}
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
void IoTQuery::processQuery() {
getQuery();
getDirectory();
readData();
+ /* Verify that we receive a post request. */
if (strncmp(method, "POST", 4) != 0) {
cerr << "Not POST Request" << endl;
return;
}
+ /* Make sure the directory is okay. */
if (directory == NULL ||
!checkDirectory()) {
cerr << "Directory " << directory << " does not exist" << endl;
return;
}
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
if (!openStatusFile()) {
cerr << "Failed to open status file" << endl;
return;
}
+ /* Lock status file to keep other requests out. */
flock(fd, LOCK_EX);
+ /* Decode query. */
decodeQuery();
+ /* Handle request. */
if (reqGetSlot)
getSlot();
else if (reqPutSlot)
}
}
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
void IoTQuery::readData() {
if (length) {
data = new char[length+1];
} while (!cin.eof());
}
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
void IoTQuery::getQuery() {
uri = FCGX_GetParam(uri_str, request->envp);
query = FCGX_GetParam(query_str, request->envp);
method = FCGX_GetParam(method_str, request->envp);
iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+ /** We require the content-length header to be sent. */
char * reqlength = FCGX_GetParam(length_str, request->envp);
if (reqlength) {
length=strtoll(reqlength, NULL, 10);
}
}
+/**
+ * Initializes directory field from environmental variables.
+ */
+
void IoTQuery::getDirectory() {
char * split = strchr((char *)uri, '?');
if (split == NULL)
directory[directory_len-1]=0;
}
+/**
+ * Helper function that is used to read the status file.
+ */
+
int doread(int fd, void *ptr, size_t count, off_t offset) {
do {
size_t bytesread=pread(fd, ptr, count, offset);
} while(1);
}
+
+/**
+ * Writes the current state to the status file.
+ */
+
void IoTQuery::updateStatusFile() {
pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
}
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
bool IoTQuery::openStatusFile() {
char statusfile[]="queuestatus";
int len=strlen(directory);
return false;
}
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
int size;
int needwrite=0;
if (doread(fd, &size, sizeof(size), OFFSET_MAX))
FCGX_Request * request;
char *data;
+ /* Directory slot files are placed in. */
char *directory;
+ /* Full URI from Apache */
const char * uri;
+ /* Query portion of URI */
const char * query;
+ /* Type of request: GET or PUT */
const char * method;
+ /* Root directory for all accounts */
const char * iotcloudroot;
+ /* Expected length of data from client */
long long length;
+ /* Sequence number for oldest slot */
long long oldestentry;
+ /* Sequence number for newest slot */
long long newestentry;
+ /* Sequence number from request */
long long requestsequencenumber;
+ /* Size of queue */
int numqueueentries;
+ /* fd for queuestatus file */
int fd;
+ /* Is the request to get a slot? */
bool reqGetSlot;
+ /* Is the request to put a slot? */
bool reqPutSlot;
};
#endif