fix memory corruption errors and replace mallocs with calloc.
authoradash <adash>
Sat, 26 Jan 2008 01:48:15 +0000 (01:48 +0000)
committeradash <adash>
Sat, 26 Jan 2008 01:48:15 +0000 (01:48 +0000)
Current fix works fine for testcase Atomic2.java  and Atomic3.java
other minor fixes
Remove prefetchpile.c file (was not used)

Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/machinepile.c
Robust/src/Runtime/DSTM/interface/machinepile.h
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/mcpileq.h
Robust/src/Runtime/DSTM/interface/objstr.c
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/trans.c

index 5928799c780694b28b17ce4ab84ff96090424741..fd328e459213b877dbb6875751ae8bfbbffce156 100644 (file)
@@ -140,13 +140,14 @@ void *dstmAccept(void *acceptfd)
        
        switch(control) {
                case READ_REQUEST:
+                       printf("DEBUG -> Recv READ_REQUEST\n");
                        /* Read oid requested and search if available */
                        if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
                                perror("Error receiving object from cooridnator\n");
                                pthread_exit(NULL);
                        }
                        if((srcObj = mhashSearch(oid)) == NULL) {
-                               printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
+                               printf("Object 0x%x is not found in Main Object Store %s %d\n", oid, __FILE__, __LINE__);
                                pthread_exit(NULL);
                        }
                        h = (objheader_t *) srcObj;
@@ -217,36 +218,37 @@ void *dstmAccept(void *acceptfd)
 
                case THREAD_NOTIFY_REQUEST:
                        size = sizeof(unsigned int);
-                       retval = recv((int)acceptfd, ptr, size, 0);
-                       numoid = *((unsigned int *) ptr);
+                       retval = recv((int)acceptfd, &numoid, size, 0);
                        size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
-                       retval = recv((int)acceptfd, ptr, size, 0);
+                       bzero(&buffer, RECEIVE_BUFFER_SIZE);
+                       retval = recv((int)acceptfd, &buffer, size, 0);
                        oidarry = calloc(numoid, sizeof(unsigned int)); 
-                       memcpy(oidarry, ptr, sizeof(unsigned int) * numoid);
+                       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
                        size = sizeof(unsigned int) * numoid;
                        versionarry = calloc(numoid, sizeof(unsigned short));
-                       memcpy(versionarry, ptr+size, sizeof(unsigned short) * numoid);
+                       memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
                        size += sizeof(unsigned short) * numoid;
-                       mid = *((unsigned int *)(ptr+size));
+                       mid = *((unsigned int *)(buffer+size));
                        size += sizeof(unsigned int);
-                       threadid = *((unsigned int *)(ptr+size));
+                       threadid = *((unsigned int *)(buffer+size));
                        processReqNotify(numoid, oidarry, versionarry, mid, threadid);
 
                        break;
 
                case THREAD_NOTIFY_RESPONSE:
                        size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
-                       retval = recv((int)acceptfd, ptr, size, 0);
+                       bzero(&buffer, RECEIVE_BUFFER_SIZE);
+                       retval = recv((int)acceptfd, &buffer, size, 0);
                        if(retval <= 0) 
                                perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg");
                        else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
                                printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval);
                        else {
-                               oid = *((unsigned int *)ptr);
+                               oid = *((unsigned int *)buffer);
                                size = sizeof(unsigned int);
-                               version = *((unsigned short *)(ptr+size));
+                               version = *((unsigned short *)(buffer+size));
                                size += sizeof(unsigned short);
-                               threadid = *((unsigned int *)(ptr+size));
+                               threadid = *((unsigned int *)(buffer+size));
                                threadNotify(oid,version,threadid);
                        }
 
@@ -640,119 +642,123 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
  * then use offset values to prefetch references to other objects */
 
 int prefetchReq(int acceptfd) {
-  int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
-  int isArray = 0;
-  unsigned int oid, index = 0;
-  char *ptr, buffer[PRE_BUF_SIZE];
-  void *mobj;
-  unsigned int objoid;
-  char control;
-  objheader_t * header;
-  int bytesRecvd;
-  
-  /* Repeatedly recv the oid and offset pairs sent for prefetch */
-  while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
-    count++;
-    if(length == -1)
-      break;
-    sum = 0;
-    index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
-    // first 4 bytes are saved to send the
-    // size of the buffer (that is computed at the end of the loop)
-    bytesRecvd = 0;
-    do {
-      bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
-                        sizeof(unsigned int) - bytesRecvd, 0);
-    } while (bytesRecvd < sizeof(unsigned int));
-    numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
-    N = numoffset * sizeof(short);
-    short offset[numoffset];
-    ptr = (char *)&offset;
-    /* Recv the offset values per oid */ 
-    do {
-      n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
-      sum += n; 
-    } while(sum < N && n != 0);        
-    
-    /* Process each oid */
-    if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
-      /* Save the oids not found in buffer for later use */
-      *(buffer + index) = OBJECT_NOT_FOUND;
-      index += sizeof(char);
-      memcpy(buffer+index, &oid, sizeof(unsigned int));
-      index += sizeof(unsigned int);
-    } else { /* If Obj found in machine (i.e. has not moved) */
-      /* send the oid, it's size, it's header and data */
-      header = mobj;
-      GETSIZE(size, header);
-      size += sizeof(objheader_t);
-      *(buffer + index) = OBJECT_FOUND;
-      index += sizeof(char);
-      memcpy(buffer+index, &oid, sizeof(unsigned int));
-      index += sizeof(unsigned int);
-      memcpy(buffer+index, &size, sizeof(int));
-      index += sizeof(int);
-      memcpy(buffer + index, header, size);
-      index += size;
-      /* Calculate the oid corresponding to the offset value */
-      for(i = 0 ; i< numoffset ; i++) {
-             /* Check for arrays  */
-             if(TYPE(header) > NUMCLASSES) {
-                     isArray = 1;
-             }
-             if(isArray == 1) {
-                     int elementsize = classsize[TYPE(header)];
-                     objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
-             } else {
-                     objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
-             }
-             if((header = mhashSearch(objoid)) == NULL) {
-                     /* Obj not found, send oid */
-                     *(buffer + index) = OBJECT_NOT_FOUND;
-                     index += sizeof(char);
-                     memcpy(buffer+index, &oid, sizeof(unsigned int));
-                     index += sizeof(unsigned int);
-                     break;
-             } else {/* Obj Found */
-                     /* send the oid, it's size, it's header and data */
-                     GETSIZE(size, header);
-                     size+=sizeof(objheader_t);
-                     *(buffer + index) = OBJECT_FOUND;
-                     index += sizeof(char);
-                     memcpy(buffer+index, &oid, sizeof(unsigned int));
-                     index += sizeof(unsigned int);
-                     memcpy(buffer+index, &size, sizeof(int));
-                     index += sizeof(int);
-                     memcpy(buffer+index, header, size);
-                     index += size;
-                     isArray = 0;
-                     continue;
-             }
-      }
-    }
-    /* Check for overflow in the buffer */
-    if (index >= PRE_BUF_SIZE) {
-      printf("Char buffer is overflowing\n");
-      return 1;
-    }
-    /* Send Prefetch response control message only once*/
-    if(count == 1) {
-      control = TRANS_PREFETCH_RESPONSE;
-      if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-       perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
-       return 1;
-      }
-    }
-    
-    /* Add the buffer size into buffer as a parameter */
-    *((unsigned int *)buffer)=index;
-    /* Send the entire buffer with its size and oids found and not found */
-    if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
-      perror("Error sending oids found\n");
-      return 1;
-    }
-  }
-  return 0;
+       int i, length, sum, n, numbytes, numoffset, N, size, count = 0;
+       int isArray = 0, bytesRecvd;
+       unsigned int oid, index = 0;
+       unsigned int objoid, myIpAddr;
+       char *ptr, control, buffer[PRE_BUF_SIZE];
+       void *mobj;
+       objheader_t * header;
+
+#ifdef MAC
+       myIpAddr = getMyIpAddr("en1");
+#else
+       myIpAddr = getMyIpAddr("eth0");
+#endif
+
+       /* Repeatedly recv the oid and offset pairs sent for prefetch */
+       while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
+               count++;
+               if(length == -1)
+                       break;
+               index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
+               // first 4 bytes are saved to send the
+               // size of the buffer (that is computed at the end of the loop)
+               bytesRecvd = 0;
+               do {
+                       bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
+                                       sizeof(unsigned int) - bytesRecvd, 0);
+               } while (bytesRecvd < sizeof(unsigned int));
+               numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
+               N = numoffset * sizeof(short);
+               short offset[numoffset];
+               ptr = (char *)&offset;
+               sum = 0;
+               /* Recv the offset values per oid */ 
+               do {
+                       n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
+                       sum += n; 
+               } while(sum < N && n != 0);     
+
+               /* Process each oid */
+               if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+                       /* Save the oids not found in buffer for later use */
+                       *(buffer + index) = OBJECT_NOT_FOUND;
+                       index += sizeof(char);
+                       *((unsigned int *)(buffer+index)) = oid;
+                       index += sizeof(unsigned int);
+               } else { /* If Obj found in machine (i.e. has not moved) */
+                       /* send the oid, it's size, it's header and data */
+                       header = (objheader_t *)mobj;
+                       GETSIZE(size, header);
+                       size += sizeof(objheader_t);
+                       *(buffer + index) = OBJECT_FOUND;
+                       index += sizeof(char);
+                       *((unsigned int *)(buffer+index)) = oid;
+                       index += sizeof(unsigned int);
+                       *((int *)(buffer+index)) = size;
+                       index += sizeof(int);
+                       memcpy(buffer + index, header, size);
+                       index += size;
+                       /* Calculate the oid corresponding to the offset value */
+                       for(i = 0 ; i< numoffset ; i++) {
+                               /* Check for arrays  */
+                               if(TYPE(header) > NUMCLASSES) {
+                                       isArray = 1;
+                               }
+                               if(isArray == 1) {
+                                       int elementsize = classsize[TYPE(header)];
+                                       objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
+                               } else {
+                                       objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
+                               }
+                               if((header = mhashSearch(objoid)) == NULL) {
+                                       /* Obj not found, send oid */
+                                       *(buffer + index) = OBJECT_NOT_FOUND;
+                                       index += sizeof(char);
+                                       *((unsigned int *)(buffer+index)) = objoid;
+                                       index += sizeof(unsigned int);
+                                       break;
+                               } else {/* Obj Found */
+                                       /* send the oid, it's size, it's header and data */
+                                       GETSIZE(size, header);
+                                       size+=sizeof(objheader_t);
+                                       *(buffer+index) = OBJECT_FOUND;
+                                       index += sizeof(char);
+                                       *((unsigned int *)(buffer+index)) = objoid;
+                                       index += sizeof(unsigned int);
+                                       *((int *)(buffer+index)) = size;
+                                       index += sizeof(int);
+                                       memcpy(buffer+index, header, size);
+                                       index += size;
+                                       isArray = 0;
+                                       continue;
+                               }
+                       }
+               }
+               /* Check for overflow in the buffer */
+               if (index >= PRE_BUF_SIZE) {
+                       printf("Char buffer is overflowing\n");
+                       return 1;
+               }
+               /* Send Prefetch response control message only once*/
+               if(count == 1){
+                       control = TRANS_PREFETCH_RESPONSE;
+                       if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+                               perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
+                               return 1;
+                       }
+               }
+
+               /* Add the buffer size into buffer as a parameter */
+               *((unsigned int *)buffer)=index;
+               /* Send the entire buffer with its size and oids found and not found */
+               if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
+                       perror("Error sending oids found\n");
+                       return 1;
+               }
+       }
+       return 0;
 }
 
 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
@@ -818,7 +824,7 @@ checkversion:
                                STATUS(header) &= ~(LOCK);              
                        } else {
                                randomdelay();
-                               printf("DEBUG-> processReqNotify() Object is still locked\n");
+                               printf("processReqNotify() Object is still locked\n");
                                goto checkversion;
                        }
                }
index ec5e9ba9cbff19c1bebd2d0ad6b92b8a808ab274..c27ea01be31d6d977fdd44da5fb8f0b091d9af14 100644 (file)
@@ -1,22 +1,30 @@
 #include "machinepile.h"
 
-int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) {
-       prefetchpile_t *tmp = *head;
+prefetchpile_t *insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) {
+       prefetchpile_t *tmp = head;
+       prefetchpile_t *ptr;
        objpile_t *objnode;
        unsigned int *oidarray;
-       int ntuples;
+       short *offvalues;
+       int i;
        char found = 0;
 
        while (tmp != NULL) {
                if (tmp->mid == mid) { // Found a match with exsisting machine id
                        if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
                                printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return -1;
+                               return NULL;
+                       }
+                       if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
+                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                               return NULL;
                        }
                        /* Fill objpiles DS */
                        objnode->oid = oid;
                        objnode->numoffset = numoffset;
-                       objnode->offset = offset;
+                       for(i = 0; i<numoffset; i++)
+                               offvalues[i] = offset[i];
+                       objnode->offset = offvalues;
                        objnode->next = tmp->objpiles;
                        tmp->objpiles = objnode;
                        found = 1;
@@ -24,26 +32,54 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet
                }
                tmp = tmp->next;
        }
-       if (!found) {// Not found => insert new mid DS
-               if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
-                       printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                       return -1;
-               }
-               tmp->mid = mid;
-               if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
-                       printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                       return -1;
+
+       tmp = head;
+       if(found != 1) {
+                if(tmp->mid == 0) {//First time
+                       tmp->mid = mid;
+                       if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                               return NULL;
+                       }
+                       if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
+                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                               return NULL;
+                       }
+                       // Fill objpiles DS
+                       objnode->oid = oid;
+                       objnode->numoffset = numoffset;
+                       for(i = 0; i<numoffset; i++)
+                               offvalues[i] = *((short *)offset + i); 
+                       objnode->offset = offvalues;
+                       objnode->next = NULL;
+                       tmp->objpiles = objnode;
+                       tmp->next = NULL;
+               } else {
+                       if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
+                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                               return NULL;
+                       }
+                       tmp->mid = mid;
+                       if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                               return NULL;
+                       }
+                       if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
+                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                               return NULL;
+                       }
+                       // Fill objpiles DS
+                       objnode->oid = oid;
+                       objnode->numoffset = numoffset;
+                       for(i = 0; i<numoffset; i++)
+                               offvalues[i] = *((short *)offset + i); 
+                       objnode->offset = offvalues;
+                       objnode->next = NULL;
+                       tmp->objpiles = objnode;
+                       tmp->next = head;
+                       head = tmp;
                }
-               /* Fill objpiles DS */
-               objnode->oid = oid;
-               objnode->numoffset = numoffset;
-               objnode->offset = offset;
-               objnode->next = tmp->objpiles; // i.e., objnode->next = NULL;
-               tmp->objpiles = objnode;
-               tmp->next = *head;
-               *head = tmp;
        }
-       return 0;
+       
+       return head;
 }
-
-
index 7d98b2144233d640faf10dd2e11bec3145f0ff32..8add41b7c77644c59db4c21fd4d479361c90cb29 100644 (file)
@@ -5,6 +5,6 @@
 #include <stdio.h>
 #include <stdlib.h>
 
-int insertPile(int, unsigned int, short, short *, prefetchpile_t **);
+prefetchpile_t *insertPile(int, unsigned int, short, short *, prefetchpile_t *);
 
 #endif
index d6da34f318683f5487512b62bdc243b9d73fdd6a..fea54094aa4710e16595ed93459c3f804d9b81bf 100644 (file)
@@ -1,6 +1,6 @@
 #include "mcpileq.h"
 
-mcpileq_t mcqueue;
+mcpileq_t mcqueue; //Global queue
 
 void mcpileqInit(void) {
        /* Initialize machine queue that containing prefetch oids and offset values  sorted by remote machineid */  
@@ -14,17 +14,13 @@ void mcpileqInit(void) {
 }
 
 /* Insert to the rear of machine pile queue */
-void mcpileenqueue(prefetchpile_t *node) {
-       prefetchpile_t *tmp, *prev;
+void mcpileenqueue(prefetchpile_t *node, prefetchpile_t *tail) {
        if(mcqueue.front == NULL && mcqueue.rear == NULL) {
-               mcqueue.front = mcqueue.rear = node;
+               mcqueue.front = node;
+               mcqueue.rear = tail;
        } else {
-               tmp = mcqueue.rear->next = node;
-               while(tmp != NULL) {
-                       prev = tmp;
-                       tmp = tmp->next;
-               }
-               mcqueue.rear = prev;
+               mcqueue.rear->next = node;
+               mcqueue.rear = tail;
        }
 }
 
@@ -32,7 +28,7 @@ void mcpileenqueue(prefetchpile_t *node) {
 prefetchpile_t *mcpiledequeue(void) {
        prefetchpile_t *retnode;
        if(mcqueue.front == NULL) {
-               printf("Machune pile queue empty: Underfloe %s %d\n", __FILE__, __LINE__);
+               printf("Machine pile queue empty: Underflow %s %d\n", __FILE__, __LINE__);
                return NULL;
        }
        retnode = mcqueue.front;
@@ -73,19 +69,16 @@ void mcdealloc(prefetchpile_t *node) {
 
        while (prefetchpile_ptr != NULL)
        {
-               objpile_ptr = prefetchpile_ptr->objpiles;
-               while (objpile_ptr != NULL)
-               {
-                       if (objpile_ptr->numoffset > 0)
-                               free(objpile_ptr->offset);
-                       objpile_next_ptr = objpile_ptr->next;
+               prefetchpile_next_ptr = prefetchpile_ptr;
+               while(prefetchpile_ptr->objpiles != NULL) {
+                       if(prefetchpile_ptr->objpiles->numoffset > 0) {
+                               free(prefetchpile_ptr->objpiles->offset);
+                       }
+                       objpile_ptr = prefetchpile_ptr->objpiles;
+                       prefetchpile_ptr->objpiles = objpile_ptr->next;
                        free(objpile_ptr);
-                       objpile_ptr = objpile_next_ptr;
                }
-               prefetchpile_next_ptr = prefetchpile_ptr->next;
-               free(prefetchpile_ptr);
-               prefetchpile_ptr = prefetchpile_next_ptr;
+               prefetchpile_ptr = prefetchpile_next_ptr->next;
+               free(prefetchpile_next_ptr);
        }
 }
-
-
index 8c570d7f0428c26d34f5d8af82a2128a2b94f63d..26a3de2ca2e96a0272f0266274eda4e0dd122c76 100644 (file)
@@ -29,7 +29,7 @@ typedef struct mcpileq {
 }mcpileq_t;
 
 void mcpileqInit(void);
-void mcpileenqueue(prefetchpile_t *);
+void mcpileenqueue(prefetchpile_t *, prefetchpile_t *);
 prefetchpile_t *mcpiledequeue(void);
 void mcpiledelete();
 void mcpiledisplay();
index 906573385c6cd78ff0062e47211f8c8ed30d81fd..20004396c34c7814cb04e835fce9e5b9d5e469e7 100644 (file)
@@ -2,7 +2,7 @@
 
 objstr_t *objstrCreate(unsigned int size)
 {
-       objstr_t *tmp = malloc(sizeof(objstr_t) + size);
+       objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size));
        tmp->size = size;
        tmp->next = NULL;
        tmp->top = tmp + 1; //points to end of objstr_t structure!
@@ -38,7 +38,7 @@ void *objstrAlloc(objstr_t *store, unsigned int size)
                {  //end of list, all full
                        if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects
                        {
-                               store->next = (objstr_t *)malloc(sizeof(objstr_t) + size);
+                               store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size));
                                if (store->next == NULL)
                                        return NULL;
                                store = store->next;
@@ -46,7 +46,7 @@ void *objstrAlloc(objstr_t *store, unsigned int size)
                        }
                        else
                        {
-                               store->next = malloc(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE);
+                               store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE));
                                if (store->next == NULL)
                                        return NULL;
                                store = store->next;
index 954a52b5443b063e4f3b82c2f2fc9f5d2aa71002..6837726bd34ed7b5a6fe808cc385bdbd529e343b 100644 (file)
@@ -57,6 +57,7 @@ prefetchqelem_t *pre_dequeue(void) {
        pqueue.front = pqueue.front->next;
        if (pqueue.front == NULL)
                pqueue.rear = NULL;
+       retnode->next = NULL;
 
        return retnode;
 }
index 48dcd42476f20882288374dfd91ea6e2ba626144..419ed321924828f28897955ef6cbf4d6ca41f19b 100644 (file)
@@ -84,6 +84,7 @@ inline int findmax(int *array, int arraylength) {
 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
        int qnodesize;
        int len = 0;
+       int i;
 
        /* Allocate for the queue node*/
        char *node;
@@ -102,6 +103,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
                memcpy(node + len, endoffsets, ntuples*sizeof(short));
                len += ntuples * sizeof(short);
                memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
+
                /* Lock and insert into primary prefetch queue */
                pthread_mutex_lock(&pqueue.qlock);
                pre_enqueue((prefetchqelem_t *)node);
@@ -189,6 +191,7 @@ void transInit() {
        do {
          retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
        } while(retval!=0);
+
        pthread_detach(tPrefetch);
 
        //Create and Initialize a pool of threads 
@@ -228,7 +231,7 @@ void randomdelay()
 /* This function initializes things required in the transaction start*/
 transrecord_t *transStart()
 {
-       transrecord_t *tmp = malloc(sizeof(transrecord_t));
+       transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
        tmp->cache = objstrCreate(1048576);
        tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
 #ifdef COMPILER
@@ -325,8 +328,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 
                /* Get the object from the remote location */
                machinenumber = lhashSearch(oid);
-               char* ipaddr;
-               midtoIP(machinenumber, ipaddr);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
@@ -458,7 +459,7 @@ int transCommit(transrecord_t *record) {
                pthread_mutex_t tlshrd;
 
                thread_data_array_t *thread_data_array;
-               if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) {
+               if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
                        printf("Malloc error %s, %d\n", __FILE__, __LINE__);
                        pthread_cond_destroy(&tcond);
                        pthread_mutex_destroy(&tlock);
@@ -718,7 +719,7 @@ void *transRequest(void *threadarg) {
 
        /* Close connection */
        close(sd);
-       //pthread_exit(NULL);
+       pthread_exit(NULL);
 }
 
 /* This function decides the reponse that needs to be sent to 
@@ -812,7 +813,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
 /* This function opens a connection, places an object read request to the 
  * remote machine, reads the control message and object if available  and 
  * copies the object and its header to the local cache.
- * TODO replace mnum and midtoIP() with MACHINE_IP address later */ 
+ * */ 
 
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        int sd, size, val;
@@ -827,13 +828,14 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                perror("Error in socket\n");
                return NULL;
        }
+
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(LISTEN_PORT);
-       //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
        midtoIP(mnum,machineip);
        machineip[15] = '\0';
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
+
        /* Open connection */
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect\n");
@@ -999,7 +1001,7 @@ void *handleLocalReq(void *threadarg) {
                free(localtdata->transinfo->objnotfound);
        }
 
-       //pthread_exit(NULL);
+       pthread_exit(NULL);
 }
 
 /* This function completes the ABORT process if the transaction is aborting */
@@ -1020,12 +1022,12 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
        }
 
        printf("TRANS_ABORTED\n");
+
        return 0;
 }
 
 /*This function completes the COMMIT process is the transaction is commiting*/
 int transComProcess(local_thread_data_array_t  *localtdata) {
-       static int prevsize = 0, *prevptr;
        objheader_t *header, *tcptr;
        int i, nummod, tmpsize, numcreated, numlocked;
        unsigned int *oidmod, *oidcreated, *oidlocked;
@@ -1160,9 +1162,8 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
 }
 /* This function makes machine piles to be added into the machine pile queue for each prefetch call */
 prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
-       char *ptr, *tmp;
-       int ntuples, slength, i, machinenum;
-       int maxoffset;
+       char *ptr;
+       int ntuples, i, machinenum, count=0;
        unsigned int *oid;
        short *endoffsets, *arryfields, *offset; 
        prefetchpile_t *head = NULL;
@@ -1174,6 +1175,11 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
        endoffsets = GET_PTR_EOFF(ptr, ntuples); 
        arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
 
+       if((head = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
+               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+
        /* Check for redundant tuples by comparing oids of each tuple */
        for(i = 0; i < ntuples; i++) {
                if(oid[i] == 0)
@@ -1184,9 +1190,18 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
                        return NULL;
                }
                /* Insert into machine pile */
-               offset = &arryfields[endoffsets[i-1]];
-               insertPile(machinenum, oid[i], numoffset[i], offset, &head);
+               if(i == 0){
+                       offset = &arryfields[0];
+               } else {
+                       offset = &arryfields[endoffsets[i-1]];
+               }
+
+               if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){
+                       printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__);
+                       return NULL;
+               }
        }
+
        return head;
 }
 
@@ -1205,6 +1220,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
        oid = GET_PTR_OID(ptr);
        endoffsets = GET_PTR_EOFF(ptr, ntuples); 
        arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
        /* Find offset length for each tuple */
        int numoffset[ntuples];//Number of offsets for each tuple
        numoffset[0] = endoffsets[0];
@@ -1283,10 +1299,14 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
                        /* Look in Prefetch cache */
                        checkPreCache(node, numoffset, oid[i],i); 
                }
-
        }
+       
        /* Make machine groups */
-       head = makePreGroups(node, numoffset);
+       if((head = makePreGroups(node, numoffset)) == NULL) {
+               printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+
        return head;
 }
 
@@ -1360,6 +1380,7 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i
 void *transPrefetch(void *t) {
        prefetchqelem_t *qnode;
        prefetchpile_t *pilehead = NULL;
+       prefetchpile_t *ptr = NULL, *piletail = NULL;
 
        while(1) {
                /* lock mutex of primary prefetch queue */
@@ -1376,23 +1397,35 @@ void *transPrefetch(void *t) {
                        pthread_exit(NULL);
                }
                pthread_mutex_unlock(&pqueue.qlock);
+                               
                /* Reduce redundant prefetch requests */
                checkPrefetchTuples(qnode);
                /* Check if the tuples are found locally, if yes then reduce them further*/ 
                /* and group requests by remote machine ids by calling the makePreGroups() */
-               pilehead = foundLocal(qnode);
+               if((pilehead = foundLocal(qnode)) == NULL) {
+                       printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
+                       pthread_exit(NULL);
+               }
+
+               ptr = pilehead;
+               while(ptr != NULL) {
+                       if(ptr->next == NULL) {
+                               piletail = ptr;
+                       } 
+                       ptr = ptr->next;
+               }
 
                /* Lock mutex of pool queue */
                pthread_mutex_lock(&mcqueue.qlock);
                /* Update the pool queue with the new remote machine piles generated per prefetch call */
-               mcpileenqueue(pilehead);
+               mcpileenqueue(pilehead, piletail);
                /* Broadcast signal on machine pile queue */
                pthread_cond_broadcast(&mcqueue.qcond);
                /* Unlock mutex of  machine pile queue */
                pthread_mutex_unlock(&mcqueue.qlock);
                /* Deallocate the prefetch queue pile node */
                predealloc(qnode);
-
+               pthread_exit(NULL);
        }
 }
 
@@ -1428,17 +1461,17 @@ void *mcqProcess(void *threadid) {
 
                /* Deallocate the machine queue pile node */
                mcdealloc(mcpilenode);
+               pthread_exit(NULL);
        }
 }
 
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
-       int sd, i, offset, off, len, endpair, count = 0;
+       int sd, i, off, len, endpair, count = 0;
        struct sockaddr_in serv_addr;
        struct hostent *server;
        char machineip[16], control;
        objpile_t *tmp;
 
-
        /* Send Trans Prefetch Request */
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for SEND_PREFETCH_REQUEST\n");
@@ -1470,16 +1503,17 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        /* Send Oids and offsets in pairs */
        tmp = mcpilenode->objpiles;
        while(tmp != NULL) {
-               off = offset = 0;
+               off = 0;
                count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
                len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
                char oidnoffset[len];
+               bzero(oidnoffset, len);
                memcpy(oidnoffset, &len, sizeof(int));
                off = sizeof(int);
                memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
                off += sizeof(unsigned int);
                for(i = 0; i < tmp->numoffset; i++) {
-                       memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short));
+                       memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short));
                        off+=sizeof(short);
                }
                if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
@@ -1487,6 +1521,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
                        close(sd);
                        return;
                }
+
                tmp = tmp->next;
        }
 
@@ -1511,6 +1546,7 @@ void getPrefetchResponse(int count, int sd) {
        char *ptr;
        void *modptr, *oldptr;
 
+
        /* Read  prefetch response from the Remote machine */
        if((val = read(sd, &control, sizeof(char))) <= 0) {
                perror("No control response for Prefetch request sent\n");
@@ -1527,23 +1563,24 @@ void getPrefetchResponse(int count, int sd) {
                                perror("Size of buffer not recv\n");
                                return;
                        }
-                       memcpy(&bufsize, buffer, sizeof(unsigned int));
+                       bufsize = *((unsigned int *) buffer);
                        ptr = buffer + sizeof(unsigned int);
                        /* Keep receiving the buffer containing oid info */ 
                        do {
                                n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
                                sum +=n;
                        } while(sum < bufsize && n != 0);
+
                        /* Decode the contents of the buffer */
                        index = sizeof(unsigned int);
                        while(index < (bufsize - sizeof(unsigned int))) {
                                if(buffer[index] == OBJECT_FOUND) {
                                        /* Increment it to get the object */
                                        index += sizeof(char);
-                                       memcpy(&oid, buffer + index, sizeof(unsigned int));
+                                       oid = *((unsigned int *)(buffer+index));
                                        index += sizeof(unsigned int);
                                        /* For each object found add to Prefetch Cache */
-                                       memcpy(&objsize, buffer + index, sizeof(int));
+                                       objsize = *((int *)(buffer+index));
                                        index+=sizeof(int);
                                        pthread_mutex_lock(&prefetchcache_mutex);
                                        if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
@@ -1581,7 +1618,6 @@ void getPrefetchResponse(int count, int sd) {
                                        /* Increment it to get the object */
                                        /* TODO: For each object not found query DHT for new location and retrieve the object */
                                        index += sizeof(char);
-                                       //memcpy(&oid, buffer + index, sizeof(unsigned int));
                                        oid = *((unsigned int *)(buffer + index));
                                        index += sizeof(unsigned int);
                                        /* Throw an error */
@@ -1845,7 +1881,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
                return -1;
        } else {
                msg[0] = THREAD_NOTIFY_REQUEST;
-               msg[1] = numoid;
+               *((unsigned int *)(&msg[1])) = numoid;
                /* Send array of oids  */
                size = sizeof(unsigned int);
                {
@@ -1863,7 +1899,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
                        i = 0;
                        while(i < numoid) {
                                version = versionarry[i];
-                               *((unsigned short *)(&msg[1] + size)) = oid;
+                               *((unsigned short *)(&msg[1] + size)) = version;
                                size += sizeof(unsigned short);
                                i++;
                        }
@@ -1878,7 +1914,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
                if (bytesSent < 0){
                        perror("reqNotify():send()");
                        status = -1;
-               } else if (bytesSent != 1 + 5*sizeof(unsigned int)){
+               } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int)){
                        printf("reNotify(): error, sent %d bytes\n", bytesSent);
                        status = -1;
                } else {
@@ -1891,7 +1927,6 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
        close(sock);
        return status;
 }
-
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
        notifydata_t *ndata;
        int i, objIsFound = 0, index;