changed the prefetch request send and prefetch response receive design
authoradash <adash>
Thu, 6 Mar 2008 23:45:33 +0000 (23:45 +0000)
committeradash <adash>
Thu, 6 Mar 2008 23:45:33 +0000 (23:45 +0000)
delete the second retry in the prefetch cache
added do while loop for trans_soft_abort case in transCommit() function
minor bug fixes

Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index adfaaf17c9756878354a81ae0e387a9ccc8ecd9b..0f61a9ddf47dfeae0fac9c2ccf6dadfea615b966 100644 (file)
@@ -1530,10 +1530,6 @@ public class BuildCode {
                            tstlbl += generateTemp(fm, id.getTempDescAt(i), lb) + "+";
                    }
                    tstlbl += id.offset.toString();
-                   output.println("if ("+tstlbl+"< 0 || "+tstlbl+" >= "+
-                                   generateTemp(fm, pp.base, lb) + "->___length___) {");
-                   output.println("   failedboundschk();");
-                   output.println("}");
 
                    TypeDescriptor elementtype = pp.base.getType().dereference();
                    String type="";
@@ -1542,13 +1538,9 @@ public class BuildCode {
                    else 
                            type=elementtype.getSafeSymbol()+" ";
 
-                   String oid = new String("(unsigned int) (" + generateTemp(fm, pp.base, lb) + " != NULL ? " + "((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"] : 0)");
-
-                   /*
-                   test = "(("+tstlbl+"< 0) || ("+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___))";
-                   String oid = new String("(unsigned int) (" +genarateTemp(fm, pp.base, lb) + " != NULL ? (" +test+ " ? 0 : ((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"]) : 0);"); 
-                   */
-                   
+                   String oid = new String("(unsigned int) (" + generateTemp(fm, pp.base, lb) + " != NULL ? " + 
+                                   "((" + tstlbl+"< 0 || "+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___) ? 0 :"+
+                                   "((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"]) : 0)");
                    oids.add(oid);
            }
 
index c250439ff9eb49cba3b1bff9c2ce421610e2810e..31deba122aff47712c0cefd0b62631eced94194c 100644 (file)
@@ -244,7 +244,7 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
 void checkPreCache(prefetchqelem_t *, int *, unsigned int, int);
 int transPrefetchProcess(transrecord_t *, int **, short);
 void sendPrefetchReq(prefetchpile_t*);
-void getPrefetchResponse(int, int);
+int getPrefetchResponse(int);
 unsigned short getObjType(unsigned int oid);
 int startRemoteThread(unsigned int oid, unsigned int mid);
 /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
index b232cbef165b37769ae550b98c81158f742cabd8..1988c502f098afd07c974e4441471e093c6db9f3 100644 (file)
@@ -21,7 +21,6 @@
 #define LISTEN_PORT 2156
 #define BACKLOG 10 //max pending connections
 #define RECEIVE_BUFFER_SIZE 2048
-#define PRE_BUF_SIZE 2048
 
 extern int classsize[];
 
@@ -114,9 +113,10 @@ void *dstmListen()
  * and accordingly calls other functions to process new requests */
 void *dstmAccept(void *acceptfd)
 {
-       int val, retval, size;
+       int val, retval, size, sum;
        unsigned int oid;
-       char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
+       char *buffer;
+       char control,ctrl;
        char *ptr;
        void *srcObj;
        objheader_t *h;
@@ -124,8 +124,6 @@ void *dstmAccept(void *acceptfd)
        unsigned short objType, *versionarry, version;
        unsigned int *oidarry, numoid, mid, threadid;
        
-       int i;
-
        transinfo.objlocked = NULL;
        transinfo.objnotfound = NULL;
        transinfo.modptr = NULL;
@@ -134,7 +132,7 @@ void *dstmAccept(void *acceptfd)
 
        /* Receive control messages from other machines */
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
-               perror("Error: in receiving control from coordinator\n");
+               printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
                pthread_exit(NULL);
        }
        
@@ -196,6 +194,12 @@ void *dstmAccept(void *acceptfd)
                                pthread_exit(NULL);
                        }
                        break;
+               case TRANS_PREFETCH_RESPONSE:
+                       if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+                               printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+                               pthread_exit(NULL);
+                       }
+                       break;
                case START_REMOTE_THREAD:
                        retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
                        if (retval <= 0)
@@ -211,53 +215,52 @@ void *dstmAccept(void *acceptfd)
                        break;
 
                case THREAD_NOTIFY_REQUEST:
-                       size = sizeof(unsigned int);
-                       bzero(&buffer, RECEIVE_BUFFER_SIZE);
-                       retval = recv((int)acceptfd, &buffer, size, 0);
-                       numoid = *((unsigned int *) &buffer);
+                       retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0);
                        size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
-                       bzero(&buffer, RECEIVE_BUFFER_SIZE);
-                       retval = recv((int)acceptfd, &buffer, size, 0);
-                       if(retval <=0)
-                               perror("dstmAccept(): error receiving THREAD_NOTIFY_REQUEST");
-                       else if( retval != (2* sizeof(unsigned int) + (sizeof(unsigned int) + sizeof(unsigned short)) * numoid))
-                               printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_REQUEST %s, %d\n", retval, 
-                                               __FILE__, __LINE__);
-                       else {
-                               oidarry = calloc(numoid, sizeof(unsigned int)); 
-                               memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
-                               size = sizeof(unsigned int) * numoid;
-                               versionarry = calloc(numoid, sizeof(unsigned short));
-                               memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
-                               size += sizeof(unsigned short) * numoid;
-                               mid = *((unsigned int *)(buffer+size));
-                               size += sizeof(unsigned int);
-                               threadid = *((unsigned int *)(buffer+size));
-                               processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+                       if((buffer = calloc(1,size)) == NULL) {
+                               printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+                               pthread_exit(NULL);
                        }
+                       sum = 0;
+                       do {
+                               sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
+                       } while(sum < size);
+
+                       oidarry = calloc(numoid, sizeof(unsigned int)); 
+                       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
+                       size = sizeof(unsigned int) * numoid;
+                       versionarry = calloc(numoid, sizeof(unsigned short));
+                       memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
+                       size += sizeof(unsigned short) * numoid;
+                       mid = *((unsigned int *)(buffer+size));
+                       size += sizeof(unsigned int);
+                       threadid = *((unsigned int *)(buffer+size));
+                       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+                       free(buffer);
 
                        break;
 
                case THREAD_NOTIFY_RESPONSE:
                        size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
-                       bzero(&buffer, RECEIVE_BUFFER_SIZE);
-                       retval = recv((int)acceptfd, &buffer, size, 0);
-                       if(retval <= 0) 
-                               perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE");
-                       else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
-                               printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_RESPONSE msg %s, %d\n", 
-                                               retval, __FILE__, __LINE__);
-                       else {
-                               oid = *((unsigned int *)buffer);
-                               size = sizeof(unsigned int);
-                               version = *((unsigned short *)(buffer+size));
-                               size += sizeof(unsigned short);
-                               threadid = *((unsigned int *)(buffer+size));
-                               threadNotify(oid,version,threadid);
+                       if((buffer = calloc(1,size)) == NULL) {
+                               printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+                               pthread_exit(NULL);
                        }
 
-                       break;
+                       sum = 0;
+                       do {
+                               sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
+                       } while(sum < size);
+
+                       oid = *((unsigned int *)buffer);
+                       size = sizeof(unsigned int);
+                       version = *((unsigned short *)(buffer+size));
+                       size += sizeof(unsigned short);
+                       threadid = *((unsigned int *)(buffer+size));
+                       threadNotify(oid,version,threadid);
+                       free(buffer);
 
+                       break;
                default:
                        printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
        }
@@ -669,122 +672,194 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
  * then use offset values to prefetch references to other objects */
 
 int prefetchReq(int acceptfd) {
-       int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
-       int isArray = 0;
-       unsigned int oid, index = 0;
-       char *ptr, buffer[PRE_BUF_SIZE];
-       void *mobj;
-       unsigned int objoid;
-       char control;
-       objheader_t * header;
-       int bytesRecvd;
+       int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
+       int length, sd;
+       char *recvbuffer, *sendbuffer, control;
+       unsigned int oid, mid;
+       unsigned short *offsetarry;
+       objheader_t *header;
+       struct sockaddr_in remoteAddr;
 
-       /* Repeatedly recv one oid and offset pair sent for prefetch */
-       while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
-               count++;
-               if(length == -1)
+       while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) {
+               if(length == -1) { //-1 is special character to represent end of sending oids and offsets
                        break;
-               index = 0;  
-               bytesRecvd = 0;
-               do {
-                       bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
-                                       sizeof(unsigned int) - bytesRecvd, 0);
-               } while (bytesRecvd < sizeof(unsigned int));
-               numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
-               N = numoffset * sizeof(short);
-               short offset[numoffset];
-               ptr = (char *)&offset;
-               sum = 0;
-               /* Recv the offset values per oid */ 
-               do {
-                       n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
-                       sum += n; 
-               } while(sum < N && n != 0);     
+               } else {
+                       numbytes = 0;
+                       size = length - sizeof(int);
+                       if((recvbuffer = calloc(1, size)) == NULL) {
+                               printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+                               return -1;
+                       }
+                       while(numbytes < size) {
+                               numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0);
+                       }
 
-               bzero(&buffer, PRE_BUF_SIZE);
-               /* Process each oid */
-               if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
-                       /* Save the oids not found in buffer for later use */
-                       *(buffer + index) = OBJECT_NOT_FOUND;
-                       index += sizeof(char);
-                       *((unsigned int *)(buffer+index)) = oid;
-                       index += sizeof(unsigned int);
-               } else { /* If Obj found in machine (i.e. has not moved) */
-                       /* send the oid, it's size, it's header and data */
-                       header = (objheader_t *)mobj;
-                       GETSIZE(size, header);
-                       size += sizeof(objheader_t);
-                       *(buffer + index) = OBJECT_FOUND;
-                       index += sizeof(char);
-                       *((unsigned int *)(buffer+index)) = oid;
-                       index += sizeof(unsigned int);
-                       *((int *)(buffer+index)) = size;
-                       index += sizeof(int);
-                       memcpy(buffer + index, header, size);
-                       index += size;
-                       /* Calculate the oid corresponding to the offset value */
-                       for(i = 0 ; i< numoffset ; i++) {
-                               /* Check for arrays  */
-                               if(TYPE(header) > NUMCLASSES) {
-                                       isArray = 1;
+                       oid = *((unsigned int *) recvbuffer);
+                       mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
+                       size = size - (2 * sizeof(unsigned int));
+                       numoffset = size / sizeof(short);
+                       if((offsetarry = calloc(numoffset, sizeof(unsigned short))) == NULL) {
+                               printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+                               free(recvbuffer);
+                               return -1;
+                       }
+                       memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
+                       free(recvbuffer);
+
+                       /* Create socket to send information */
+                       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+                               perror("prefetchReq():socket()");
+                               return;
+                       }
+                       bzero(&remoteAddr, sizeof(remoteAddr));
+                       remoteAddr.sin_family = AF_INET;
+                       remoteAddr.sin_port = htons(LISTEN_PORT);
+                       remoteAddr.sin_addr.s_addr = htonl(mid);
+
+                       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+                               printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
+                                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+                               close(sd);
+                               return -1;
+                       }
+
+                       /*Process each oid */
+                       if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
+                               /* Save the oids not found in buffer for later use */
+                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+                               if((sendbuffer = calloc(1, size)) == NULL) {
+                                       printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+                                       free(offsetarry);
+                                       close(sd);
+                                       return -1;
                                }
-                               if(isArray == 1) {
-                                       int elementsize = classsize[TYPE(header)];
-                                       objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
-                               } else {
-                                       objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
+                               *((int *) sendbuffer) = size;
+                               *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+                               *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+                               control = TRANS_PREFETCH_RESPONSE;
+                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+                                       free(offsetarry);
+                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
+                                                       __func__, __FILE__, __LINE__);
+                                       close(sd);
+                                       return -1;
+                               }
+                       } else { /* Object Found */
+                               int incr = 0;
+                               GETSIZE(objsize, header);
+                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+                               if((sendbuffer = calloc(1, size)) == NULL) {
+                                       printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+                                       free(offsetarry);
+                                       close(sd);
+                                       return -1;
                                }
-                               if((header = mhashSearch(objoid)) == NULL) {
-                                       /* Obj not found, send oid */
-                                       *(buffer + index) = OBJECT_NOT_FOUND;
-                                       index += sizeof(char);
-                                       *((unsigned int *)(buffer+index)) = objoid;
-                                       index += sizeof(unsigned int);
-                                       break;
-                               } else {/* Obj Found */
-                                       /* send the oid, it's size, it's header and data */
-                                       GETSIZE(size, header);
-                                       size+=sizeof(objheader_t);
-                                       *(buffer+index) = OBJECT_FOUND;
-                                       index += sizeof(char);
-                                       *((unsigned int *)(buffer+index)) = objoid;
-                                       index += sizeof(unsigned int);
-                                       *((int *)(buffer+index)) = size;
-                                       index += sizeof(int);
-                                       memcpy(buffer+index, header, size);
-                                       index += size;
+                               *((int *) (sendbuffer + incr)) = size;
+                               incr += sizeof(int);
+                               *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+                               incr += sizeof(char);
+                               *((unsigned int *)(sendbuffer+incr)) = oid;
+                               incr += sizeof(unsigned int);
+                               memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+                               control = TRANS_PREFETCH_RESPONSE;
+                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+                                       free(offsetarry);
+                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
+                                                       __func__, __FILE__, __LINE__);
+                                       close(sd);
+                                       return -1;
+                               }
+                               /* Calculate the oid corresponding to the offset value */
+                               for(i = 0 ; i< numoffset ; i++) {
+                                       /* Check for arrays  */
+                                       if(TYPE(header) > NUMCLASSES) {
+                                               isArray = 1;
+                                       }
+                                       if(isArray == 1) {
+                                               int elementsize = classsize[TYPE(header)];
+                                               oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+                                       } else {
+                                               oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+                                       }
+
+                                       if((header = mhashSearch(oid)) == NULL) {
+                                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+                                               if((sendbuffer = calloc(1, size)) == NULL) {
+                                                       printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+                                                       free(offsetarry);
+                                                       close(sd);
+                                                       return -1;
+                                               }
+                                               *((int *) sendbuffer) = size;
+                                               *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+                                               *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+                                               control = TRANS_PREFETCH_RESPONSE;
+                                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+                                                       free(offsetarry);
+                                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
+                                                                       __FILE__, __LINE__);
+                                                       close(sd);
+                                                       return -1;
+                                               }
+                                               break;
+                                       } else {/* Obj Found */
+                                               int incr = 0;
+                                               GETSIZE(objsize, header);
+                                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+                                               if((sendbuffer = calloc(1, size)) == NULL) {
+                                                       printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+                                                       free(offsetarry);
+                                                       close(sd);
+                                                       return -1;
+                                               }
+                                               *((int *) (sendbuffer + incr)) = size;
+                                               incr += sizeof(int);
+                                               *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+                                               incr += sizeof(char);
+                                               *((unsigned int *)(sendbuffer+incr)) = oid;
+                                               incr += sizeof(unsigned int);
+                                               memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+                                               control = TRANS_PREFETCH_RESPONSE;
+                                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+                                                       free(offsetarry);
+                                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
+                                                                       __func__, __FILE__, __LINE__);
+                                                       close(sd);
+                                                       return -1;
+                                               }
+                                       }
                                        isArray = 0;
-                                       continue;
                                }
+                               free(offsetarry);
                        }
                }
+       }
+       close(sd);
+       return 0;
+}
 
-               /* Check for overflow in the buffer */
-               if (index >= PRE_BUF_SIZE) {
-                       printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               /* Send Prefetch response control message only once*/
-               if(count == 1){
-                       control = TRANS_PREFETCH_RESPONSE;
-                       if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-                               perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
-                               return 1;
-                       }
-               }
+int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
+       int numbytes = 0;
 
-               //Send buffer size 
-               if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) {
-                       perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
-                       return 1;
-               }
+       if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+               printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__);
+               free(sendbuffer);
+               return -1;
+       }
 
-               /* Send the entire buffer with its size and oids found and not found */
-               if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) {
-                       perror("Error: sending oids found\n");
-                       return 1;
-               }
+       /* Send the buffer with its size */
+       if((numbytes = send(sd, sendbuffer, *(size), MSG_NOSIGNAL)) < *(size)) {
+               printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n",
+                               __func__, __FILE__, __LINE__, numbytes, *(size));
+               free(sendbuffer);
+               return -1;
        }
+
+       free(sendbuffer);
        return 0;
 }
 
@@ -796,7 +871,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
        int sd;
        struct sockaddr_in remoteAddr;
        int bytesSent;
-       int status, size;
+       int size;
 
        int i = 0;
        while(i < numoid) {
@@ -832,7 +907,8 @@ checkversion:
                                        if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
                                                printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
                                                                inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-                                               status = -1;
+                                               close(sd);
+                                               return;
                                        } else {
                                                //Send Update notification
                                                msg[0] = THREAD_NOTIFY_RESPONSE;
@@ -844,13 +920,16 @@ checkversion:
                                                bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
                                                if (bytesSent < 0){
                                                        perror("processReqNotify():send()");
-                                                       status = -1;
+                                                       close(sd);
+                                                       return;
                                                } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
                                                        printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", 
                                                                        bytesSent, __FILE__, __LINE__);
-                                                       status = -1;
+                                                       close(sd);
+                                                       return;
                                                } else {
-                                                       status = 0;
+                                                       close(sd);
+                                                       return;
                                                }
 
                                        }
index eb3b76471f38452b0833d26a3222d5c36d6defe4..eec4a90bedb3cad37d56cb2670f8569688d65a41 100644 (file)
@@ -23,7 +23,6 @@
 #endif
 
 #define LISTEN_PORT 2156
-#define RECEIVE_BUFFER_SIZE 2048
 #define NUM_THREADS 10
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
 #define CONFIG_FILENAME "dstm.conf"
@@ -90,7 +89,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
        int qnodesize;
        int len = 0;
        int i, rc;
-       
+
        /* Allocate for the queue node*/
        char *node;
        if(ntuples > 0) {
@@ -259,6 +258,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
        struct timeval tp;
 
        if(oid == 0) {
+               printf("Error: %s, %d oid is NULL \n", __FILE__, __LINE__);
                return NULL;
        }
         
@@ -277,6 +277,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 
        /* Search local transaction cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+
 #ifdef COMPILER
          return &objheader[1];
 #else
@@ -309,10 +310,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 #endif
        } else {
                /*If object not found in prefetch cache then block until object appears in the prefetch cache */
+               /*
                pthread_mutex_lock(&pflookup.lock);
                while(!found) {
                        rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
-                       /* Check Prefetch cache again */
+                       // Check Prefetch cache again 
                        if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
                                found = 1;
                                GETSIZE(size,tmp);
@@ -331,6 +333,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                                break;
                        }
                }
+               */
 
                /* Get the object from the remote location */
                if((machinenumber = lhashSearch(oid)) == 0) {
@@ -343,6 +346,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                        printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
                        return NULL;
                } else {
+
 #ifdef COMPILER
                        return &objcopy[1];
 #else
@@ -429,172 +433,186 @@ int transCommit(transrecord_t *record) {
        plistnode_t *pile, *pile_ptr;
        int i, j, rc, val;
        int pilecount, offset, threadnum = 0, trecvcount = 0;
-       char buffer[RECEIVE_BUFFER_SIZE],control;
+       char control;
        char transid[TID_LEN];
        trans_req_data_t *tosend;
        trans_commit_data_t transinfo;
        static int newtid = 0;
        char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
        char localstat = 0;
+       thread_data_array_t *thread_data_array;
+       local_thread_data_array_t *ltdata;
 
+       do { 
+               trecvcount = 0; 
+               threadnum = 0; 
+               treplyretry = 0;
+               thread_data_array = NULL;
+               ltdata = NULL;
 
+               /* Look through all the objects in the transaction record and make piles 
+                * for each machine involved in the transaction*/
+               pile_ptr = pile = createPiles(record);
 
-       /* Look through all the objects in the transaction record and make piles 
-        * for each machine involved in the transaction*/
-       pile_ptr = pile = createPiles(record);
-
-       /* Create the packet to be sent in TRANS_REQUEST */
-
-       /* Count the number of participants */
-       pilecount = pCount(pile);
-
-       /* Create a list of machine ids(Participants) involved in transaction   */
-       if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
-               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-               return 1;
-       }               
-       pListMid(pile, listmid);
-
+               /* Create the packet to be sent in TRANS_REQUEST */
 
-       /* Initialize thread variables,
-        * Spawn a thread for each Participant involved in a transaction */
-       pthread_t thread[pilecount];
-       pthread_attr_t attr;                    
-       pthread_cond_t tcond;
-       pthread_mutex_t tlock;
-       pthread_mutex_t tlshrd;
+               /* Count the number of participants */
+               pilecount = pCount(pile);
 
-       thread_data_array_t *thread_data_array;
-       if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
-               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-               pthread_cond_destroy(&tcond);
-               pthread_mutex_destroy(&tlock);
-               pDelete(pile_ptr);
-               free(listmid);
-               return 1;
-       }
+               /* Create a list of machine ids(Participants) involved in transaction   */
+               if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
+                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }               
+               pListMid(pile, listmid);
 
-       local_thread_data_array_t *ltdata;
-       if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
-               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-               pthread_cond_destroy(&tcond);
-               pthread_mutex_destroy(&tlock);
-               pDelete(pile_ptr);
-               free(listmid);
-               free(thread_data_array);
-               return 1;
-       }
 
-       thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
+               /* Initialize thread variables,
+                * Spawn a thread for each Participant involved in a transaction */
+               pthread_t thread[pilecount];
+               pthread_attr_t attr;                    
+               pthread_cond_t tcond;
+               pthread_mutex_t tlock;
+               pthread_mutex_t tlshrd;
 
-       /* Initialize and set thread detach attribute */
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-       pthread_mutex_init(&tlock, NULL);
-       pthread_cond_init(&tcond, NULL);
+               if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
+                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       pthread_cond_destroy(&tcond);
+                       pthread_mutex_destroy(&tlock);
+                       pDelete(pile_ptr);
+                       free(listmid);
+                       return 1;
+               }
 
-       /* Process each machine pile */
-       while(pile != NULL) {
-               //Create transaction id
-               newtid++;
-               if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+               if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                        pthread_cond_destroy(&tcond);
                        pthread_mutex_destroy(&tlock);
                        pDelete(pile_ptr);
                        free(listmid);
                        free(thread_data_array);
-                       free(ltdata);
                        return 1;
                }
-               tosend->f.control = TRANS_REQUEST;
-               sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
-               tosend->f.mcount = pilecount;
-               tosend->f.numread = pile->numread;
-               tosend->f.nummod = pile->nummod;
-               tosend->f.numcreated = pile->numcreated;
-               tosend->f.sum_bytes = pile->sum_bytes;
-               tosend->listmid = listmid;
-               tosend->objread = pile->objread;
-               tosend->oidmod = pile->oidmod;
-               tosend->oidcreated = pile->oidcreated;
-               thread_data_array[threadnum].thread_id = threadnum;
-               thread_data_array[threadnum].mid = pile->mid;
-               thread_data_array[threadnum].buffer = tosend;
-               thread_data_array[threadnum].recvmsg = rcvd_control_msg;
-               thread_data_array[threadnum].threshold = &tcond;
-               thread_data_array[threadnum].lock = &tlock;
-               thread_data_array[threadnum].count = &trecvcount;
-               thread_data_array[threadnum].replyctrl = &treplyctrl;
-               thread_data_array[threadnum].replyretry = &treplyretry;
-               thread_data_array[threadnum].rec = record;
-               /* If local do not create any extra connection */
-               if(pile->mid != myIpAddr) { /* Not local */
-                       do {
-                               rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
-                       } while(rc!=0);
-                       if(rc) {
-                               perror("Error in pthread create\n");
+
+               thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
+
+               /* Initialize and set thread detach attribute */
+               pthread_attr_init(&attr);
+               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+               pthread_mutex_init(&tlock, NULL);
+               pthread_cond_init(&tcond, NULL);
+
+               /* Process each machine pile */
+               while(pile != NULL) {
+                       //Create transaction id
+                       newtid++;
+                       if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                                pthread_cond_destroy(&tcond);
                                pthread_mutex_destroy(&tlock);
                                pDelete(pile_ptr);
                                free(listmid);
-                               for (i = 0; i < threadnum; i++)
-                                       free(thread_data_array[i].buffer);
                                free(thread_data_array);
                                free(ltdata);
                                return 1;
                        }
-               } else { /*Local*/
-                       ltdata->tdata = &thread_data_array[threadnum];
-                       ltdata->transinfo = &transinfo;
-                       do {
-                               val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
-                       } while(val!=0);
-                       if(val) {
-                               perror("Error in pthread create\n");
+                       tosend->f.control = TRANS_REQUEST;
+                       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
+                       tosend->f.mcount = pilecount;
+                       tosend->f.numread = pile->numread;
+                       tosend->f.nummod = pile->nummod;
+                       tosend->f.numcreated = pile->numcreated;
+                       tosend->f.sum_bytes = pile->sum_bytes;
+                       tosend->listmid = listmid;
+                       tosend->objread = pile->objread;
+                       tosend->oidmod = pile->oidmod;
+                       tosend->oidcreated = pile->oidcreated;
+                       thread_data_array[threadnum].thread_id = threadnum;
+                       thread_data_array[threadnum].mid = pile->mid;
+                       thread_data_array[threadnum].buffer = tosend;
+                       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+                       thread_data_array[threadnum].threshold = &tcond;
+                       thread_data_array[threadnum].lock = &tlock;
+                       thread_data_array[threadnum].count = &trecvcount;
+                       thread_data_array[threadnum].replyctrl = &treplyctrl;
+                       thread_data_array[threadnum].replyretry = &treplyretry;
+                       thread_data_array[threadnum].rec = record;
+                       /* If local do not create any extra connection */
+                       if(pile->mid != myIpAddr) { /* Not local */
+                               do {
+                                       rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
+                               } while(rc!=0);
+                               if(rc) {
+                                       perror("Error in pthread create\n");
+                                       pthread_cond_destroy(&tcond);
+                                       pthread_mutex_destroy(&tlock);
+                                       pDelete(pile_ptr);
+                                       free(listmid);
+                                       for (i = 0; i < threadnum; i++)
+                                               free(thread_data_array[i].buffer);
+                                       free(thread_data_array);
+                                       free(ltdata);
+                                       return 1;
+                               }
+                       } else { /*Local*/
+                               ltdata->tdata = &thread_data_array[threadnum];
+                               ltdata->transinfo = &transinfo;
+                               do {
+                                       val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
+                               } while(val!=0);
+                               if(val) {
+                                       perror("Error in pthread create\n");
+                                       pthread_cond_destroy(&tcond);
+                                       pthread_mutex_destroy(&tlock);
+                                       pDelete(pile_ptr);
+                                       free(listmid);
+                                       for (i = 0; i < threadnum; i++)
+                                               free(thread_data_array[i].buffer);
+                                       free(thread_data_array);
+                                       free(ltdata);
+                                       return 1;
+                               }
+                       }
+
+                       threadnum++;            
+                       pile = pile->next;
+               }
+               /* Free attribute and wait for the other threads */
+               pthread_attr_destroy(&attr);
+
+               for (i = 0; i < threadnum; i++) {
+                       rc = pthread_join(thread[i], NULL);
+                       if(rc)
+                       {
+                               printf("Error: return code from pthread_join() is %d\n", rc);
                                pthread_cond_destroy(&tcond);
                                pthread_mutex_destroy(&tlock);
                                pDelete(pile_ptr);
                                free(listmid);
-                               for (i = 0; i < threadnum; i++)
-                                       free(thread_data_array[i].buffer);
-                               free(thread_data_array);
-                               free(ltdata);
+                               for (j = i; j < threadnum; j++) {
+                                       free(thread_data_array[j].buffer);
+                               }
                                return 1;
                        }
+                       free(thread_data_array[i].buffer);
                }
 
-               threadnum++;            
-               pile = pile->next;
-       }
-
-       /* Free attribute and wait for the other threads */
-       pthread_attr_destroy(&attr);
+               /* Free resources */    
+               pthread_cond_destroy(&tcond);
+               pthread_mutex_destroy(&tlock);
+               free(listmid);
+               pDelete(pile_ptr);
 
-       for (i = 0; i < threadnum; i++) {
-               rc = pthread_join(thread[i], NULL);
-               if(rc)
-               {
-                       printf("Error: return code from pthread_join() is %d\n", rc);
-                       pthread_cond_destroy(&tcond);
-                       pthread_mutex_destroy(&tlock);
-                       pDelete(pile_ptr);
-                       free(listmid);
-                       for (j = i; j < threadnum; j++) {
-                               free(thread_data_array[j].buffer);
-                       }
-                       return 1;
+               /* wait a random amount of time before retrying to commit transaction*/
+               if(treplyretry == 1) {
+                       free(thread_data_array);
+                       free(ltdata);
+                       randomdelay();
                }
-               free(thread_data_array[i].buffer);
-       }
 
-       /* Free resources */    
-       pthread_cond_destroy(&tcond);
-       pthread_mutex_destroy(&tlock);
-       free(listmid);
-       pDelete(pile_ptr);
-       
+       /* Retry trans commit procedure during soft_abort case */
+       } while (treplyretry == 1);
+
 
        if(treplyctrl == TRANS_ABORT) {
                /* Free Resources */
@@ -614,7 +632,7 @@ int transCommit(transrecord_t *record) {
                return 0;
        } else {
                //TODO Add other cases
-               printf("DEBUG-> THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n");
+               printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
                exit(-1);
        }
 
@@ -630,7 +648,7 @@ void *transRequest(void *threadarg) {
        struct sockaddr_in serv_addr;
        thread_data_array_t *tdata;
        objheader_t *headeraddr;
-       char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
+       char control, recvcontrol;
        char machineip[16], retval;
 
        tdata = (thread_data_array_t *) threadarg;
@@ -894,10 +912,16 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                                return NULL;
                        }
                        objcopy = objstrAlloc(record->cache, size);
+                       int sum = 0;
+                       while (sum < size) {
+                               sum += read(sd, (char *)objcopy+sum, size-sum);
+                       }
+                       /*
                        if((val = read(sd, (char *)objcopy, size)) <= 0) {
                                perror("No objects are read from the remote participant\n");
                                return NULL;
                        }
+                       */
                        /* Insert into cache's lookup table */
                        chashInsert(record->lookupTable, oid, objcopy); 
                        break;
@@ -1246,8 +1270,7 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
 prefetchpile_t *foundLocal(prefetchqelem_t *node) {
        int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val;
        unsigned int *oid;
-       unsigned int  objoid;
-       int isArray = 0;
+       int isArray;
        char *ptr, *tmp;
        objheader_t *objheader;
        short *endoffsets, *arryfields; 
@@ -1265,6 +1288,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
        for(i = 1; i<ntuples; i++) {
                numoffset[i] = endoffsets[i] - endoffsets[i-1];
        }
+
        for(i = 0; i < ntuples; i++) { 
                if(oid[i] == 0){
                        if(i == 0) {
@@ -1286,8 +1310,10 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
                        }
                        continue;
                }
+
                /* If object found locally */
                if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
+                       isArray = 0;
                        tmp = (char *) objheader;
                        int orgnumoffset = numoffset[i];
                        if(i == 0) {
@@ -1297,6 +1323,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
                        }
 
                        for(j = 0; j<orgnumoffset; j++) {
+                               unsigned int objoid = 0;
                                /* Check for arrays  */
                                if(TYPE(objheader) > NUMCLASSES) {
                                        isArray = 1;
@@ -1325,7 +1352,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
                                        flag = 1;
                                        checkPreCache(node, numoffset, oid[i], i); 
                                        break;
-                               }  
+                               }
                                tmp = (char *) objheader;
                                isArray = 0;
                        }
@@ -1545,13 +1572,15 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) {
        while(tmp != NULL) {
                off = 0;
                count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
-               len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short));
+               len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short));
                char oidnoffset[len];
                bzero(oidnoffset, len);
                *((unsigned int*)oidnoffset) = len;
                off = sizeof(int);
                *((unsigned int *)((char *)oidnoffset + off)) = tmp->oid;
                off += sizeof(unsigned int);
+               *((unsigned int *)((char *)oidnoffset + off)) = myIpAddr; //Recently added as of 03/03/2008 at 6:00pm
+               off += sizeof(unsigned int);
                for(i = 0; i < tmp->numoffset; i++) {
                        *((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i];
                        off+=sizeof(unsigned short);
@@ -1574,105 +1603,78 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) {
                return;
        }
 
-       /* Get Response from the remote machine */
-       getPrefetchResponse(count,sd);
        close(sd);
        return;
 }
 
-void getPrefetchResponse(int count, int sd) {
-       int i = 0, val, n, N, sum, index, objsize;
-       unsigned int bufsize,oid;
-       char *buffer;
-       char control;
-       char *ptr;
+int getPrefetchResponse(int sd) {
+       int numbytes = 0, length = 0, size = 0;
+       char *recvbuffer, control;
+       unsigned int oid;
        void *modptr, *oldptr;
 
-       /* Read  prefetch response from the Remote machine */
-       if((val = read(sd, &control, sizeof(char))) <= 0) {
-               perror("No control response for Prefetch request sent\n");
-               return;
-       }
+       if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) {
+               printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__);
+               return -1;
+       } else {
+               numbytes = 0;
+               size = length - sizeof(int);
+               if((recvbuffer = calloc(1, size)) == NULL) {
+                       printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+                       return -1;
+               }
+               while(numbytes < size) {
+                       numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0);
+               }
 
-       if(control == TRANS_PREFETCH_RESPONSE) {
-               /*For each oid and offset tuple sent as prefetch request to remote machine*/
-               while(N = recv((int)sd, &bufsize, sizeof(unsigned int), 0) != 0) {
-                       if((buffer = calloc(1, bufsize)) == NULL) {
-                               printf("Calloc Error in %s() at %s, %d\n", __func__, __FILE__, __LINE__);
-                               return;
+               control = *((char *) recvbuffer);
+               if(control == OBJECT_FOUND) {
+                       numbytes = 0;
+                       oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+                       size = size - (sizeof(char) + sizeof(unsigned int));
+                       pthread_mutex_lock(&prefetchcache_mutex);
+                       if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+                               printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+                               pthread_mutex_unlock(&prefetchcache_mutex);
+                               free(recvbuffer);
+                               return -1;
                        }
-                       sum = 0;
-                       index = 0;
-                       ptr = buffer;
-                       /* Keep receiving the buffer containing oid info */ 
-                       do {
-                               n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
-                               sum +=n;
-                       } while(sum < bufsize && n != 0);
-
-                       /* Decode the contents of the buffer */
-                       while(index < bufsize ) {
-                               if(buffer[index] == OBJECT_FOUND) {
-                                       /* Increment it to get the object */
-                                       index += sizeof(char);
-                                       oid = *((unsigned int *)(buffer+index));
-                                       index += sizeof(unsigned int);
-                                       /* For each object found add to Prefetch Cache */
-                                       objsize = *((int *)(buffer+index));
-                                       index += sizeof(int);
-                                       pthread_mutex_lock(&prefetchcache_mutex);
-                                       if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
-                                               printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
-                                               pthread_mutex_unlock(&prefetchcache_mutex);
-                                               free(buffer);
-                                               return;
-                                       }
-                                       pthread_mutex_unlock(&prefetchcache_mutex);
-                                       memcpy(modptr, buffer+index, objsize);
-                                       index += objsize;
-                                       /* Insert the oid and its address into the prefetch hash lookup table */
-                                       /* Do a version comparison if the oid exists */
-                                       if((oldptr = prehashSearch(oid)) != NULL) {
-                                               /* If older version then update with new object ptr */
-                                               if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
-                                                       prehashRemove(oid);
-                                                       prehashInsert(oid, modptr);
-                                               } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { 
-                                                       /* Add the new object ptr to hash table */
-                                                       prehashRemove(oid);
-                                                       prehashInsert(oid, modptr);
-                                               } else { /* Do nothing: TODO modptr should be reference counted */
-                                                       ;
-                                               }
-                                       } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
-                                               prehashInsert(oid, modptr);
-                                       }
-                                       /* Lock the Prefetch Cache look up table*/
-                                       pthread_mutex_lock(&pflookup.lock);
-                                       /* Broadcast signal on prefetch cache condition variable */ 
-                                       pthread_cond_broadcast(&pflookup.cond);
-                                       /* Unlock the Prefetch Cache look up table*/
-                                       pthread_mutex_unlock(&pflookup.lock);
-                               } else if(buffer[index] == OBJECT_NOT_FOUND) {
-                                       /* Increment it to get the object */
-                                       /* TODO: For each object not found query DHT for new location and retrieve the object */
-                                       index += sizeof(char);
-                                       oid = *((unsigned int *)(buffer + index));
-                                       index += sizeof(unsigned int);
-                                       /* Throw an error */
-                                       printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
-                                       exit(-1);
+                       pthread_mutex_unlock(&prefetchcache_mutex);
+                       memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+
+                       /* Insert the oid and its address into the prefetch hash lookup table */
+                       /* Do a version comparison if the oid exists */
+                       if((oldptr = prehashSearch(oid)) != NULL) {
+                               /* If older version then update with new object ptr */
+                               if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
+                                       prehashRemove(oid);
+                                       prehashInsert(oid, modptr);
                                } else {
-                                       printf("Error in decoding the index value %d, %s, %d\n",index, __FILE__, __LINE__);
-                                       free(buffer);
-                                       return;
+                                       /* TODO modptr should be reference counted */
                                }
+                       } else {/* Else add the object ptr to hash table*/
+                               prehashInsert(oid, modptr);
                        }
-                       free(buffer);
+                       /* Lock the Prefetch Cache look up table*/
+                       pthread_mutex_lock(&pflookup.lock);
+                       /* Broadcast signal on prefetch cache condition variable */ 
+                       pthread_cond_broadcast(&pflookup.cond);
+                       /* Unlock the Prefetch Cache look up table*/
+                       pthread_mutex_unlock(&pflookup.lock);
+               } else if(control == OBJECT_NOT_FOUND) {
+                       oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+                       /* TODO: For each object not found query DHT for new location and retrieve the object */
+                       /* Throw an error */
+                       printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
+                       free(recvbuffer);
+                       exit(-1);
+               } else {
+                       printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
                }
-       } else
-               printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
-       return;
+               free(recvbuffer);
+       }
+
+       return 0;
 }
 
 unsigned short getObjType(unsigned int oid)