numqueueentries(DEFAULT_SIZE),
fd(-1),
reqGetSlot(false),
- reqPutSlot(false) {
+ reqPutSlot(false),
+ reqSetSalt(false),
+ reqGetSalt(false) {
}
IoTQuery::~IoTQuery() {
delete data;
}
+/**
+ * Returns true if the account directory exists.
+ */
+
bool IoTQuery::checkDirectory() {
struct stat s;
int err=stat(directory, &s);
return S_ISDIR(s.st_mode);
}
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
void IoTQuery::decodeQuery() {
int len=strlen(query);
char * str=new char[len+1];
memcpy(str, query, len+1);
char *tok_ptr=str;
-
+
/* Parse commands */
char *command=strsep(&tok_ptr, "&");
if (strncmp(command, "req=putslot", 11) == 0)
reqPutSlot = true;
-
- if (strncmp(command, "req=getslot", 11) == 0)
+ else if (strncmp(command, "req=getslot", 11) == 0)
reqGetSlot = true;
+ else if (strncmp(command, "req=setsalt", 11) == 0)
+ reqSetSalt = true;
+ else if (strncmp(command, "req=getsalt", 11) == 0)
+ reqGetSalt = true;
/* Load Sequence Number for request */
char *sequencenumber_str = strsep(&tok_ptr, "&");
}
}
- //don't allow a really old sequence number
+ /* don't allow a really old sequence number */
if (requestsequencenumber < oldestentry)
requestsequencenumber = oldestentry;
char * numqueueentries_str = tok_ptr;
if (numqueueentries_str != NULL &&
strncmp(numqueueentries_str, "max=", 4) == 0) {
- numqueueentries_str = strchr(numqueueentries_str + 1, '=');
+ numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
numqueueentries = strtoll(numqueueentries_str, NULL, 10);
}
delete str;
}
+/**
+ * Helper function to write data to file.
+ */
+
void doWrite(int fd, char *data, long long length) {
long long offset=0;
do {
} while(length != 0);
}
+/** Helper function to read data from file. */
bool doRead(int fd, void *buf, int numbytes) {
int offset=0;
char *ptr=(char *)buf;
return true;
}
+/**
+ * Function that handles a getSlot request.
+ */
+
void IoTQuery::getSlot() {
int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
if (numrequeststosend < 0)
delete filename;
}
const char header[]="getslot";
- long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
char * response = new char[size];
long long offset=0;
memcpy(response, header, sizeof(header)-1);
offset+=sizeof(header)-1;
int numreq=htonl(numrequeststosend);
- cerr << numrequeststosend << " " << numreq << endl;
memcpy(response + offset, &numreq, sizeof(numreq));
offset+=sizeof(numrequeststosend);
for(int i=0; i<numrequeststosend; i++) {
offset+=sizeof(int);
}
- //copy file data
+ /* Read the file data into the buffer */
for(int i=0; i<numrequeststosend; i++) {
if (fdarray[i]>=0) {
doRead(fdarray[i], response+offset, filesizes[i]);
}
}
- //write response out
+ /* Send the response out to the webserver. */
sendResponse(response, size);
- //cleanup
+ /* Delete the response buffer and close the files. */
delete response;
for(int i=0; i<numrequeststosend; i++) {
if (fdarray[i] >= 0)
}
}
+/**
+ * The method setSalt handles a setSalt request from the client.
+ */
+
+void IoTQuery::setSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int saltfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+ doWrite(saltfd, data, length);
+ char response[0];
+ sendResponse(response, 0);
+ close(saltfd);
+ delete filename;
+}
+
+/**
+ * The method getSalt handles a setSalt request from the client.
+ */
+
+void IoTQuery::getSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int filesize = 0;
+ struct stat st;
+ if (stat(filename, &st) == 0) {
+ filesize=st.st_size;
+ } else {
+ delete filename;
+ return;
+ }
+ int saltfd = open(filename, O_RDONLY);
+ int responsesize = filesize + sizeof(int);
+ char * response = new char[responsesize];
+ doRead(saltfd, response+ sizeof(int), filesize);
+ int n_filesize=htonl(filesize);
+ *((int*) response) = n_filesize;
+ sendResponse(response, responsesize);
+ close(saltfd);
+ delete filename;
+ delete response;
+}
+
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
if (requestsequencenumber!=(newestentry+1)) {
getSlot();
return;
}
+ /* See if we have too many slots and if so, delete the old one */
int numberofliveslots=(int) ((newestentry-oldestentry)+1);
if (numberofliveslots >= numqueueentries) {
- //need to drop slot
removeOldestSlot();
}
- //write slot data out to file
+ /* Write the slot data we received to a SLOT file */
char *filename = getSlotFileName(requestsequencenumber);
int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
doWrite(slotfd, data, length);
close(slotfd);
delete filename;
- newestentry = requestsequencenumber; // update sequence number
- updateStatusFile(); // update counts
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
char command[]="putslot";
sendResponse(command, sizeof(command)-1);
}
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
void IoTQuery::sendResponse(char * bytes, int len) {
cout << "Accept-Ranges: bytes\r\n"
<< "Content-Length: " << len << "\r\n"
cout.write(bytes, len);
}
-char * IoTQuery::getSlotFileName(long long slot) {
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
int directorylen=strlen(directory);
- char * filename=new char[25+directorylen]; //19 digits for long number + 4 characters for SLOT + 1 character for null termination
- snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename=new char[25+directorylen];
+ snprintf(filename, 25+directorylen, "%s/SLOT%lld", directory, seqnum);
+ return filename;
+}
+
+/**
+ * Computes the name for a salt file
+ */
+
+char * IoTQuery::getSaltFileName() {
+ int directorylen=strlen(directory);
+
+ /* Size is 4 characters for SALT string + 1 character for null
+ termination + directory size*/
+
+ char * filename=new char[6+directorylen];
+ snprintf(filename, 6+directorylen, "%s/SALT", directory);
return filename;
}
+/**
+ * Removes the oldest slot file
+ */
+
void IoTQuery::removeOldestSlot() {
if (oldestentry!=0) {
char * filename=getSlotFileName(oldestentry);
oldestentry++;
}
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
void IoTQuery::processQuery() {
getQuery();
getDirectory();
readData();
+ /* Verify that we receive a post request. */
if (strncmp(method, "POST", 4) != 0) {
cerr << "Not POST Request" << endl;
return;
}
+ /* Make sure the directory is okay. */
if (directory == NULL ||
!checkDirectory()) {
cerr << "Directory " << directory << " does not exist" << endl;
return;
}
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
if (!openStatusFile()) {
cerr << "Failed to open status file" << endl;
return;
}
+ /* Lock status file to keep other requests out. */
flock(fd, LOCK_EX);
+ /* Decode query. */
decodeQuery();
-
+
+ /* Handle request. */
if (reqGetSlot)
getSlot();
else if (reqPutSlot)
putSlot();
+ else if (reqSetSalt)
+ setSalt();
+ else if (reqGetSalt)
+ getSalt();
else {
cerr << "No recognized request" << endl;
return;
}
}
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
void IoTQuery::readData() {
if (length) {
data = new char[length+1];
} while (!cin.eof());
}
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
void IoTQuery::getQuery() {
uri = FCGX_GetParam(uri_str, request->envp);
query = FCGX_GetParam(query_str, request->envp);
method = FCGX_GetParam(method_str, request->envp);
iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+ /** We require the content-length header to be sent. */
char * reqlength = FCGX_GetParam(length_str, request->envp);
if (reqlength) {
length=strtoll(reqlength, NULL, 10);
}
}
+/**
+ * Initializes directory field from environmental variables.
+ */
+
void IoTQuery::getDirectory() {
char * split = strchr((char *)uri, '?');
if (split == NULL)
directory[directory_len-1]=0;
}
+/**
+ * Helper function that is used to read the status file.
+ */
+
int doread(int fd, void *ptr, size_t count, off_t offset) {
do {
size_t bytesread=pread(fd, ptr, count, offset);
} while(1);
}
+
+/**
+ * Writes the current state to the status file.
+ */
+
void IoTQuery::updateStatusFile() {
pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
}
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
bool IoTQuery::openStatusFile() {
char statusfile[]="queuestatus";
int len=strlen(directory);
return false;
}
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
int size;
int needwrite=0;
if (doread(fd, &size, sizeof(size), OFFSET_MAX))