--- /dev/null
+package iotcloud;
+import java.io.*;
+import java.net.*;
+import java.util.Arrays;
+
+class CloudComm {
+ String baseurl;
+ CloudComm(String _baseurl) {
+ this.baseurl=_baseurl;
+ }
+
+ private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
+ String reqstring=isput?"req=putslot":"req=getslot";
+ String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber;
+ if (maxentries != 0)
+ urlstr += "&max="+maxentries;
+ return new URL(urlstr);
+ }
+
+ public Slot[] putSlot(Slot slot, int max) throws IOException{
+ long sequencenumber=slot.getSequenceNumber();
+ URL url=buildRequest(true, sequencenumber, max);
+ URLConnection con=url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.setFixedLengthStreamingMode(slot.getBytes().length);
+ http.setDoOutput(true);
+ http.connect();
+ OutputStream os=http.getOutputStream();
+ os.write(slot.getBytes());
+ System.out.println(http.getResponseMessage());
+
+ InputStream is=http.getInputStream();
+ DataInputStream dis=new DataInputStream(is);
+ byte[] resptype=new byte[7];
+ dis.readFully(resptype);
+ if (Arrays.equals(resptype, "getslot".getBytes()))
+ return processSlots(dis, sequencenumber);
+ else if (Arrays.equals(resptype, "putslot".getBytes()))
+ return null;
+ else
+ throw new Error("Bad response to putslot");
+ }
+
+ public Slot[] getSlots(long sequencenumber) throws IOException {
+ URL url=buildRequest(false, sequencenumber, 0);
+ URLConnection con=url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.connect();
+ System.out.println(http.getResponseMessage());
+ InputStream is=http.getInputStream();
+
+ DataInputStream dis=new DataInputStream(is);
+ byte[] resptype=new byte[7];
+ dis.readFully(resptype);
+ if (!Arrays.equals(resptype, "getslot".getBytes()))
+ throw new Error("Bad Response: "+new String(resptype));
+ else
+ return processSlots(dis, sequencenumber);
+ }
+
+ Slot[] processSlots(DataInputStream dis, long sequencenumber) throws IOException {
+ int numberofslots=dis.readInt();
+ int[] sizesofslots=new int[numberofslots];
+ Slot[] slots=new Slot[numberofslots];
+ System.out.println(numberofslots);
+ for(int i=0;i<numberofslots;i++)
+ sizesofslots[i]=dis.readInt();
+
+ for(int i=0;i<numberofslots;i++) {
+ byte[] data=new byte[sizesofslots[i]];
+ dis.readFully(data);
+ slots[i]=new Slot(sequencenumber+i, data);
+ }
+ dis.close();
+ return slots;
+ }
+}
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
using namespace std;
newestentry(0),
requestsequencenumber(0),
numqueueentries(DEFAULT_SIZE),
- fd(-1) {
+ fd(-1),
+ reqGetSlot(false),
+ reqPutSlot(false) {
}
IoTQuery::~IoTQuery() {
char * str=new char[len+1];
memcpy(str, query, len+1);
char *tok_ptr=str;
-
+
/* Parse commands */
char *command=strsep(&tok_ptr, "&");
- if (strncmp(command, "putslot", 7) == 0)
+ if (strncmp(command, "req=putslot", 11) == 0)
reqPutSlot = true;
- if (strncmp(command, "getslot", 7) == 0)
+ if (strncmp(command, "req=getslot", 11) == 0)
reqGetSlot = true;
/* Load Sequence Number for request */
char *sequencenumber_str = strsep(&tok_ptr, "&");
-
- if (sequencenumber_str != NULL)
- requestsequencenumber = strtoll(sequencenumber_str, NULL, 10);
-
+ if (sequencenumber_str != NULL &&
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ sequencenumber_str = strchr(sequencenumber_str, '=');
+ if (sequencenumber_str != NULL) {
+ requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+ }
+ }
+
/* Update size if we get request */
char * numqueueentries_str = tok_ptr;
- if (numqueueentries_str != NULL)
+ if (numqueueentries_str != NULL &&
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
+ numqueueentries_str = strchr(numqueueentries_str + 1, '=');
numqueueentries = strtoll(numqueueentries_str, NULL, 10);
-
+ }
+
delete str;
}
length -= byteswritten;
offset += byteswritten;
} else {
- cerr << "Bytes not written";
+ cerr << "Bytes not written" << endl;
+ if (byteswritten < 0) {
+ cerr << strerror(errno) << " error writing slot file" << endl;
+ }
return;
}
} while(length != 0);
void IoTQuery::getSlot() {
int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+ if (numrequeststosend < 0)
+ numrequeststosend = 0;
long long numbytes = 0;
int filesizes[numrequeststosend];
int fdarray[numrequeststosend];
}
delete filename;
}
- const char header[]="getdata";
- long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+ const char header[]="getslot";
+ long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
char * response = new char[size];
long long offset=0;
- memcpy(response, header, sizeof(header));
- offset+=sizeof(header);
- memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend));
+ memcpy(response, header, sizeof(header)-1);
+ offset+=sizeof(header)-1;
+ int numreq=htonl(numrequeststosend);
+ cerr << numrequeststosend << " " << numreq << endl;
+ memcpy(response + offset, &numreq, sizeof(numreq));
offset+=sizeof(numrequeststosend);
for(int i=0;i<numrequeststosend;i++) {
- memcpy(response + offset, &filesizes[i], sizeof(int));
+ int filesize=htonl(filesizes[i]);
+ memcpy(response + offset, &filesize, sizeof(filesize));
offset+=sizeof(int);
}
-
+
//copy file data
for(int i=0;i<numrequeststosend;i++) {
if (fdarray[i]>=0) {
getSlot();
return;
}
-
+
int numberofliveslots=(int) ((newestentry-oldestentry)+1);
if (numberofliveslots >= numqueueentries) {
//need to drop slot
removeOldestSlot();
}
+
//write slot data out to file
char *filename = getSlotFileName(requestsequencenumber);
int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
delete filename;
newestentry = requestsequencenumber; // update sequence number
updateStatusFile(); // update counts
- char command[]="putdata";
- sendResponse(command, sizeof(command));
+ char command[]="putslot";
+ sendResponse(command, sizeof(command)-1);
}
void IoTQuery::sendResponse(char * bytes, int len) {
getDirectory();
readData();
- if (strncmp(method, "POST", 4) != 0)
+ if (strncmp(method, "POST", 4) != 0) {
+ cerr << "Not POST Request" << endl;
return;
+ }
if (directory == NULL ||
- !checkDirectory())
+ !checkDirectory()) {
+ cerr << "Directory " << directory << " does not exist" << endl;
return;
+ }
- if (!openStatusFile())
+ if (!openStatusFile()) {
+ cerr << "Failed to open status file" << endl;
return;
+ }
flock(fd, LOCK_EX);
decodeQuery();
-
+
if (reqGetSlot)
getSlot();
else if (reqPutSlot)
putSlot();
- else return;
+ else {
+ cerr << "No recognized request" << endl;
+ return;
+ }
}
void IoTQuery::readData() {
iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
char * reqlength = FCGX_GetParam(length_str, request->envp);
- if (length) {
+ if (reqlength) {
length=strtoll(reqlength, NULL, 10);
} else {
length=0;
directory = new char[directory_len];
memcpy(directory, iotcloudroot, rootdir_len);
memcpy(directory + rootdir_len, uri, split_len);
- directory[directory_len]=0;
+ directory[directory_len-1]=0;
}
int doread(int fd, void *ptr, size_t count, off_t offset) {
fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
delete filename;
- if (fd < 0)
+ if (fd < 0) {
+ cerr << strerror(errno) << " error opening statusfile" << endl;
return false;
-
+ }
+
int size;
int needwrite=0;
if (doread(fd, &size, sizeof(size), OFFSET_MAX))