add code for Prefetch request processing at the remote machine
authoradash <adash>
Sun, 5 Aug 2007 03:21:33 +0000 (03:21 +0000)
committeradash <adash>
Sun, 5 Aug 2007 03:21:33 +0000 (03:21 +0000)
end

Robust/src/Runtime/DSTM/interface/dstmserver.c

index aaf1676285233160aa00279d2fe955445c00d735..87f21dec75eda9cbcafb296e78a61ea6e2f4473d 100644 (file)
@@ -14,6 +14,7 @@
 #define LISTEN_PORT 2156
 #define BACKLOG 10 //max pending connections
 #define RECEIVE_BUFFER_SIZE 2048
+#define PRE_BUF_SIZE 2048
 
 extern int classsize[];
 
@@ -528,22 +529,21 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        return 0;
 }
 
-
 int prefetchReq(int acceptfd) {
-       int length, sum, n, numbytes, N, oidnfound = 0;
-       unsigned int oid;
-       char *ptr;
+       int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size;
+       unsigned int oid, index = 0;
+       char *ptr, buffer[PRE_BUF_SIZE];
        void *mobj;
-       unsigned int *oidnotfound;
+       unsigned int *oidnotfound, objoid;
+       char *header;
+       objheader_t * head;
        
-       ptr = (char *)&fixed;;
-
-       /* Counters and arrays to formulate decision on control message to be sent */
-       oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
-       
-       /* Repeated recv the oid and offset pairs sent for prefetch */
-       while((numbytes = recv(int)acceptfd, &length, sizeof(int), 0) != -1) {
+       /* Repeatedly recv the oid and offset pairs sent for prefetch */
+       while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
+               if(length == -1)
+                       break;
                sum = 0;
+               index = 0;
                oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
                numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
                N = numoffset * sizeof(short);
@@ -553,19 +553,60 @@ int prefetchReq(int acceptfd) {
                do {
                        n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
                        sum += n; 
-
                } while(sum < N && n != 0);     
 
                /* Process each oid */
                /* Check if object is still present in the machine since the beginning of TRANS_PREFETCH */
                if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
-                       /* Save the oids not found and number of oids not found for later use */
-                       oidnotfound[objnotfound] = oid;
-                       objnotfound++;
+                       /* Save the oids not found in buffer for later use */
+                       *(buffer + index) = OBJECT_NOT_FOUND;
+                       index += sizeof(char);
+                       memcpy(buffer+index, &oid, sizeof(unsigned int));
+                       index += sizeof(unsigned int);
                } else { /* If Obj found in machine (i.e. has not moved) */
                        /* Return the oid ..its header and data */
-
-
+                       header = (char *) mobj;
+                       head = (objheader_t *) header; 
+                       size = sizeof(objheader_t) + sizeof(classsize[head->type]);
+                       *(buffer + index) = OBJECT_FOUND;
+                       index += sizeof(char);
+                       memcpy(buffer+index, &oid, sizeof(unsigned int));
+                       index += sizeof(unsigned int);
+                       memcpy(buffer + index, header, size);
+                       index += size;
+                       /* Calculate the oid corresponding to the offset value */
+                       for(i = 0 ; i< numoffset ; i++) {
+                               objoid = *((int *)(header + sizeof(objheader_t) + offset[i]));
+                               if((header = (char *) mhashSearch(objoid)) == NULL) {
+                                       /* Obj not found, send oid and its offsets */
+                                       *(buffer + index) = OBJECT_NOT_FOUND;
+                                       index += sizeof(char);
+                                       memcpy(buffer+index, &oid, sizeof(unsigned int));
+                                       index += sizeof(unsigned int);
+                                       break;
+                               } else {/* Obj Found */
+                                       head = (objheader_t *) header; 
+                                       size = sizeof(objheader_t) + sizeof(classsize[head->type]);
+                                       *(buffer + index) = OBJECT_FOUND;
+                                       index += sizeof(char);
+                                       memcpy(buffer+index, &oid, sizeof(unsigned int));
+                                       index += sizeof(unsigned int);
+                                       memcpy(buffer + index, header, size);
+                                       index += size;
+                                       continue;
+                               }
+                       }
+               }
+               /* Check for overflow in the buffer */
+               if (index >= PRE_BUF_SIZE) {
+                       printf("Char buffer is overflowing\n");
+                       return 1;
+               }
+               /* Send the buffer with all oids found and not found */
+               if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
+                       perror("Error sending size of object\n");
+                       return 1;
+               }
        }
 
        return 0;