10 #include <netinet/in.h>
14 const char * query_str = "QUERY_STRING";
15 const char * uri_str = "REQUEST_URI";
16 const char * method_str = "REQUEST_METHOD";
17 const char * iotcloudroot_str = "IOTCLOUD_ROOT";
18 const char * length_str = "CONTENT_LENGTH";
20 IoTQuery::IoTQuery(FCGX_Request *request) :
31 requestsequencenumber(0),
32 numqueueentries(DEFAULT_SIZE),
40 IoTQuery::~IoTQuery() {
50 * Returns true if the account directory exists.
53 bool IoTQuery::checkDirectory() {
55 int err = stat(directory, &s);
58 return S_ISDIR(s.st_mode);
62 * Decodes query string from client. Extracts type of request,
63 * sequence number, and whether the request changes the number of
67 void IoTQuery::decodeQuery() {
68 int len = strlen(query);
69 char * str = new char[len + 1];
70 memcpy(str, query, len + 1);
74 char *command = strsep(&tok_ptr, "&");
75 if (strncmp(command, "req=putslot", 11) == 0)
77 else if (strncmp(command, "req=getslot", 11) == 0)
79 else if (strncmp(command, "req=setsalt", 11) == 0)
81 else if (strncmp(command, "req=getsalt", 11) == 0)
84 /* Load Sequence Number for request */
85 char *sequencenumber_str = strsep(&tok_ptr, "&");
86 if (sequencenumber_str != NULL &&
87 strncmp(sequencenumber_str, "seq=", 4) == 0) {
88 sequencenumber_str = strchr(sequencenumber_str, '=');
89 if (sequencenumber_str != NULL) {
90 requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
94 /* don't allow a really old sequence number */
95 if (requestsequencenumber < oldestentry)
96 requestsequencenumber = oldestentry;
98 /* Update size if we get request */
99 char * numqueueentries_str = tok_ptr;
100 if (numqueueentries_str != NULL &&
101 strncmp(numqueueentries_str, "max=", 4) == 0) {
102 numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
103 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
110 * Helper function to write data to file.
113 void doWrite(int fd, char *data, long long length) {
114 long long offset = 0;
116 long long byteswritten = write(fd, &data[offset], length);
117 if (byteswritten > 0) {
118 length -= byteswritten;
119 offset += byteswritten;
121 cerr << "Bytes not written" << endl;
122 if (byteswritten < 0) {
123 cerr << strerror(errno) << " error writing slot file" << endl;
127 } while (length != 0);
130 /** Helper function to read data from file. */
131 bool doRead(int fd, void *buf, int numbytes) {
133 char *ptr = (char *)buf;
135 int bytesread = read(fd, ptr + offset, numbytes);
138 numbytes -= bytesread;
141 } while (numbytes != 0);
146 * Function that handles a getSlot request.
149 void IoTQuery::getSlot() {
150 int numrequeststosend = (int)((newestentry - requestsequencenumber) + 1);
151 if (numrequeststosend < 0)
152 numrequeststosend = 0;
153 long long numbytes = 0;
154 int filesizes[numrequeststosend];
155 int fdarray[numrequeststosend];
157 for (long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
159 char *filename = getSlotFileName(seqn);
160 if (stat(filename, &st) == 0) {
161 fdarray[index] = open(filename, O_RDONLY);
162 filesizes[index] = st.st_size;
163 numbytes += filesizes[index];
166 filesizes[index] = 0;
170 const char header[] = "getslot";
172 /* Size is the header + the payload + space for number of requests
173 plus sizes of each slot */
175 long long size = sizeof(header) - 1 + sizeof(numrequeststosend) + 4 * numrequeststosend + numbytes;
176 char * response = new char[size];
177 long long offset = 0;
178 memcpy(response, header, sizeof(header) - 1);
179 offset += sizeof(header) - 1;
180 int numreq = htonl(numrequeststosend);
181 memcpy(response + offset, &numreq, sizeof(numreq));
182 offset += sizeof(numrequeststosend);
183 for (int i = 0; i < numrequeststosend; i++) {
184 int filesize = htonl(filesizes[i]);
185 memcpy(response + offset, &filesize, sizeof(filesize));
186 offset += sizeof(int);
189 /* Read the file data into the buffer */
190 for (int i = 0; i < numrequeststosend; i++) {
191 if (fdarray[i] >= 0) {
192 doRead(fdarray[i], response + offset, filesizes[i]);
193 offset += filesizes[i];
197 /* Send the response out to the webserver. */
198 sendResponse(response, size);
200 /* Delete the response buffer and close the files. */
202 for (int i = 0; i < numrequeststosend; i++) {
209 * The method setSalt handles a setSalt request from the client.
211 void IoTQuery::setSalt() {
212 /* Write the slot data we received to a SLOT file */
213 char *filename = getSaltFileName();
214 int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
215 doWrite(saltfd, data, length);
217 sendResponse(response, 0);
223 * The method getSalt handles a getSalt request from the client.
226 void IoTQuery::getSalt() {
227 /* Write the slot data we received to a SLOT file */
228 char *filename = getSaltFileName();
231 if (stat(filename, &st) == 0) {
232 filesize = st.st_size;
235 sendResponse(response, 0);
239 int saltfd = open(filename, O_RDONLY);
240 int responsesize = filesize + sizeof(int);
241 char * response = new char[responsesize];
242 doRead(saltfd, response + sizeof(int), filesize);
243 int n_filesize = htonl(filesize);
244 *((int*) response) = n_filesize;
245 sendResponse(response, responsesize);
252 * The method putSlot handles a putSlot request from the client
255 void IoTQuery::putSlot() {
256 /* Check if the request is stale and send update in that case. This
257 servers as an implicit failure of the request. */
258 if (requestsequencenumber != (newestentry + 1)) {
263 if (requestsequencenumber == 150)
265 /* Send response acknowledging success */
266 char command[] = "putslot";
267 sendResponse(command, sizeof(command) - 1);
271 /* See if we have too many slots and if so, delete the old one */
272 int numberofliveslots = (int) ((newestentry - oldestentry) + 1);
273 if (numberofliveslots >= numqueueentries) {
277 /* Write the slot data we received to a SLOT file */
278 char *filename = getSlotFileName(requestsequencenumber);
279 int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
280 doWrite(slotfd, data, length);
283 newestentry = requestsequencenumber;
285 /* Update the seuqence numbers and other status file information. */
288 /* Send response acknowledging success */
289 char command[] = "putslot";
290 sendResponse(command, sizeof(command) - 1);
294 * Method sends response. It wraps in appropriate headers for web
298 void IoTQuery::sendResponse(char * bytes, int len) {
299 cout << "Accept-Ranges: bytes\r\n"
300 << "Content-Length: " << len << "\r\n"
302 cout.write(bytes, len);
307 * Computes the name for a slot file for the given sequence number.
310 char * IoTQuery::getSlotFileName(long long seqnum) {
311 int directorylen = strlen(directory);
313 /* Size is 19 digits for ASCII representation of a long + 4
314 characters for SLOT string + 1 character for null termination +
317 char * filename = new char[25 + directorylen];
318 snprintf(filename, 25 + directorylen, "%s/SLOT%lld", directory, seqnum);
323 * Computes the name for a salt file
326 char * IoTQuery::getSaltFileName() {
327 int directorylen = strlen(directory);
329 /* Size is 4 characters for SALT string + 1 character for null
330 termination + directory size*/
332 char * filename = new char[6 + directorylen];
333 snprintf(filename, 6 + directorylen, "%s/SALT", directory);
338 * Removes the oldest slot file
341 void IoTQuery::removeOldestSlot() {
342 if (oldestentry != 0) {
343 char * filename = getSlotFileName(oldestentry);
351 * Processes the query sent to the fastcgi handler.
354 void IoTQuery::processQuery() {
360 cerr << "No Data Available" << endl;
365 /* Verify that we receive a post request. */
366 if (strncmp(method, "POST", 4) != 0) {
367 cerr << "Not POST Request" << endl;
371 /* Make sure the directory is okay. */
372 if (directory == NULL ||
374 cerr << "Directory " << directory << " does not exist" << endl;
378 /* Get queue state from the status file. If it doesn't exist,
380 if (!openStatusFile()) {
381 cerr << "Failed to open status file" << endl;
385 /* Lock status file to keep other requests out. */
391 /* Handle request. */
401 cerr << "No recognized request" << endl;
407 * Reads in data for request. This is used for the slot to be
411 bool IoTQuery::readData() {
413 data = new char[length + 1];
414 memset(data, 0, length + 1);
415 cin.read(data, length);
421 } while (!cin.eof());
425 if (cin.gcount() != length)
436 * Reads relevant environmental variables to find out the request.
439 void IoTQuery::getQuery() {
440 uri = FCGX_GetParam(uri_str, request->envp);
441 query = FCGX_GetParam(query_str, request->envp);
442 method = FCGX_GetParam(method_str, request->envp);
443 iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
445 /** We require the content-length header to be sent. */
446 char * reqlength = FCGX_GetParam(length_str, request->envp);
448 length = strtoll(reqlength, NULL, 10);
455 * Initializes directory field from environmental variables.
458 void IoTQuery::getDirectory() {
459 char * split = strchr((char *)uri, '?');
462 int split_len = (int) (split - uri);
463 int rootdir_len = strlen(iotcloudroot);
464 int directory_len = split_len + rootdir_len + 1;
465 directory = new char[directory_len];
466 memcpy(directory, iotcloudroot, rootdir_len);
467 memcpy(directory + rootdir_len, uri, split_len);
468 directory[directory_len - 1] = 0;
472 * Helper function that is used to read the status file.
475 int doread(int fd, void *ptr, size_t count, off_t offset) {
477 size_t bytesread = pread(fd, ptr, count, offset);
478 if (bytesread == count) {
480 } else if (bytesread == 0) {
488 * Writes the current state to the status file.
491 void IoTQuery::updateStatusFile() {
492 pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
493 pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
494 pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
498 * Reads in queue state from the status file. Returns true if
502 bool IoTQuery::openStatusFile() {
503 char statusfile[] = "queuestatus";
504 int len = strlen(directory);
506 char * filename = new char[len + sizeof(statusfile) + 2];
507 memcpy(filename, directory, len);
509 memcpy(filename + len + 1, statusfile, sizeof(statusfile));
510 filename[len + sizeof(statusfile) + 1] = 0;
511 fd = open(filename, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
515 cerr << strerror(errno) << " error opening statusfile" << endl;
519 /* Read in queue size, oldest sequence number, and newest sequence number. */
522 if (doread(fd, &size, sizeof(size), OFFSET_MAX))
523 numqueueentries = size;
528 if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
533 if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))