Added new PTHREAD_MUTEX_RECURSIVE mutex attribute
authoradash <adash>
Tue, 18 Sep 2007 17:31:31 +0000 (17:31 +0000)
committeradash <adash>
Tue, 18 Sep 2007 17:31:31 +0000 (17:31 +0000)
Checked locks for correcting waits
Fixed double frees
TODO: Correct segmentation violation due to corrupt pointers to run the third remotethread
test case

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/prelookup.c
Robust/src/Runtime/DSTM/interface/prelookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index efbc4a6b8c949d57f22f67a89d01de13b1655185..d4ea42b7da4ec67116a7b7d34e12aeb5d7e8a815 100644 (file)
@@ -162,7 +162,6 @@ typedef struct trans_commit_data{
 typedef struct thread_data_array {
   int thread_id;       
   int mid;    
-  int pilecount;               /* No of remote machines involved */
   trans_req_data_t *buffer;    /* Holds trans request information sent to participants */  
   thread_response_t *recvmsg;  /* Shared datastructure to keep track of the participants response to a trans request */
   pthread_cond_t *threshold;    /* Condition var to waking up a thread */
index 4ddf42a345fdea1d9bace03fb12ded97274ec829..2d2935779bd7617dfab20172796f6e8248b7db94 100644 (file)
@@ -127,7 +127,9 @@ void *dstmAccept(void *acceptfd)
                                perror("Error receiving object from cooridnator\n");
                                return NULL;
                        }
-                       srcObj = mhashSearch(oid);
+                       if((srcObj = mhashSearch(oid)) == NULL) {
+                               printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
+                       }
                        h = (objheader_t *) srcObj;
                        GETSIZE(size, h);
                        size += sizeof(objheader_t);
@@ -140,16 +142,16 @@ void *dstmAccept(void *acceptfd)
                                }
                        } else {
                                /* Type */
-                         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
-                         *((int *)&msg[1])=size;
-                         if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
-                                 perror("Error sending size of object to coordinator\n");
-                                 return NULL;
-                         }
-                         if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
-                                 perror("Error in sending object\n");
-                                 return NULL;
-                         }
+                               char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+                               *((int *)&msg[1])=size;
+                               if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
+                                       perror("Error sending size of object to coordinator\n");
+                                       return NULL;
+                               }
+                               if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
+                                       perror("Error in sending object\n");
+                                       return NULL;
+                               }
                        }
                        break;
                
@@ -285,12 +287,19 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
        /*Process the information read */
        if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
                printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__);
+               /* Free resources */
+               if(oidmod != NULL) {
+                       free(oidmod);
+                       oidmod = NULL;
+               }
                return 1;
        }
 
-
        /* Free resources */
-       free(oidmod);
+       if(oidmod != NULL) {
+               free(oidmod);
+               oidmod = NULL;
+       }
 
        return 0;
 }
@@ -334,12 +343,21 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                                header = mhashSearch(transinfo->objlocked[i]);// find the header address
                                STATUS(((objheader_t *)header)) &= ~(LOCK);             
                        }
-               
+
                        /* Send ack to Coordinator */
                        printf("DEBUG -> Recv TRANS_ABORT\n");
                        sendctrl = TRANS_SUCESSFUL;
                        if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                perror("Error sending ACK to coordinator\n");
+                               if (transinfo->objlocked != NULL) {
+                                       free(transinfo->objlocked);
+                                       transinfo->objlocked = NULL;
+                               }
+                               if (transinfo->objnotfound != NULL) {
+                                       free(transinfo->objnotfound);
+                                       transinfo->objnotfound = NULL;
+                               }
+
                                return 1;
                        }
                        ptr = NULL;
@@ -350,6 +368,17 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                        printf("DEBUG -> Recv TRANS_COMMIT \n");
                        if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
                                printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
+                               /* Free memory */
+                               printf("DEBUG -> Freeing...\n");
+                               fflush(stdout);
+                               if (transinfo->objlocked != NULL) {
+                                       free(transinfo->objlocked);
+                                       transinfo->objlocked = NULL;
+                               }
+                               if (transinfo->objnotfound != NULL) {
+                                       free(transinfo->objnotfound);
+                                       transinfo->objnotfound = NULL;
+                               }
                                return 1;
                        }
                        break;
@@ -363,10 +392,11 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                        //TODO Use fixed.trans_id  TID since Client may have died
                        break;
        }
+
        /* Free memory */
        printf("DEBUG -> Freeing...\n");
        fflush(stdout);
-       
+
        if (transinfo->objlocked != NULL) {
                free(transinfo->objlocked);
                transinfo->objlocked = NULL;
@@ -375,6 +405,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                free(transinfo->objnotfound);
                transinfo->objnotfound = NULL;
        }
+
        return 0;
 }
 
@@ -557,6 +588,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                header = (objheader_t *) mhashSearch(oidmod[i]);
                header->version += 1; 
        }
+
        /* Unlock locked objects */
        for(i = 0; i < numlocked; i++) {
                if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
@@ -565,7 +597,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                }
                STATUS(header) &= ~(LOCK);
        }
-
        //TODO Update location lookup table
 
        /* Send ack to coordinator */
index e92d5957b120cda38d36b0f562bf13ef3492b65a..64f756ba298760bcb1a4aec234321d11f538b66f 100644 (file)
@@ -18,8 +18,15 @@ unsigned int prehashCreate(unsigned int size, float loadfactor) {
         pflookup.numelements = 0; // Initial number of elements in the hash
         pflookup.loadfactor = loadfactor;
 
+       //Intiliaze and set prefetch table mutex attribute
+       pthread_mutexattr_init(&pflookup.prefetchmutexattr);
+       //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file
+       //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead
+       pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
+
        //Initialize mutex var
-       pthread_mutex_init(&pflookup.lock, NULL);
+       pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
+       //pthread_mutex_init(&pflookup.lock, NULL);
         
        return 0;
 }
index e8bb204dbc23dd674e7662018132aab8bd609363..acb540d94051fd29abaae3443b7e0ee2cae62265 100644 (file)
@@ -20,6 +20,7 @@ typedef struct prehashtable {
        unsigned int numelements;
        float loadfactor;
        pthread_mutex_t lock;
+       pthread_mutexattr_t prefetchmutexattr;
        pthread_cond_t cond;
 } prehashtable_t;
 
index f85e8fbcd365e4598ccff8806b38114b6c7694bd..2b0b5fbcaabe91aab98091984076db6bb75a42fb 100644 (file)
@@ -30,11 +30,11 @@ extern int classsize[];
 extern primarypfq_t pqueue; // shared prefetch queue
 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
 objstr_t *prefetchcache; //Global Prefetch cache
-pthread_mutex_t prefetchcache_mutex;
-extern pthread_mutex_t mainobjstore_mutex;
+pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
+extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
-pthread_t tPrefetch;
+pthread_t tPrefetch;           /* Primary Prefetch thread that processes the prefetch queue */
 extern objstr_t *mainobjstore;
 unsigned int myIpAddr;
 unsigned int *hostIpAddrs;
@@ -196,7 +196,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
        rc = gettimeofday(&tp, NULL);
 
        /* Convert from timeval to timespec */
-       ts.tv_nsec = tp.tv_usec * 1000;
+       ts.tv_nsec = tp.tv_usec * 10;
 
        /* Search local transaction cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
@@ -210,6 +210,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                tmp = mhashSearch(oid);
                GETSIZE(size, tmp);
                size += sizeof(objheader_t);
+               //TODO:Lock the local trans cache while copying the object here
                objcopy = objstrAlloc(record->cache, size);
                memcpy(objcopy, (void *)objheader, size);
                /* Insert into cache's lookup table */
@@ -223,6 +224,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                found = 1;
                GETSIZE(size, tmp);
                size+=sizeof(objheader_t);
+               //TODO:Lock the local  trans cache while copying the object here
                objcopy = objstrAlloc(record->cache, size);
                memcpy(objcopy, (void *)tmp, size);
                /* Insert into cache's lookup table */
@@ -232,41 +234,39 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 #else
                return objcopy;
 #endif
-       } else { /* If not found anywhere, then block until object appears in prefetch cache */
-#if 0  
-               printf("Inside remote machine\n");
-               pthread_mutex_lock(&pflookup.lock);
+       } else {
+               /*If object not found in prefetch cache then block until object appears in the prefetch cache */
+               pthread_mutex_lock(&prefetchcache_mutex);
                while(!found) {
-                       rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
+                       rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
                        if(rc == ETIMEDOUT) {
                                printf("Wait timed out\n");
                                /* Check Prefetch cache again */
-                               if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+                               if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
                                        found = 1;
-                                       GETSIZE(size, tmp);
+                                       GETSIZE(size,tmp);
                                        size+=sizeof(objheader_t);
                                        objcopy = objstrAlloc(record->cache, size);
                                        memcpy(objcopy, (void *)tmp, size);
-                                       /* Insert into cache's lookup table */
                                        chashInsert(record->lookupTable, OID(tmp), objcopy); 
+                                       pthread_mutex_unlock(&prefetchcache_mutex);
 #ifdef COMPILER
                                        return &objcopy[1];
 #else
                                        return objcopy;
 #endif
                                } else {
-                                       pthread_mutex_unlock(&pflookup.lock);
+                                       pthread_mutex_unlock(&prefetchcache_mutex);
                                        break;
                                }
-                               pthread_mutex_unlock(&pflookup.lock);
                        }
                }
-#endif
+
                /* Get the object from the remote location */
                machinenumber = lhashSearch(oid);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
-                       //If object is not found in Remote location
+                       printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
                        return NULL;
                } else {
 #ifdef COMPILER
@@ -275,7 +275,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                  return objcopy;
 #endif
                }
-       } 
+       }
 }
 
 /* This function creates objects in the transaction record */
@@ -444,7 +444,6 @@ int transCommit(transrecord_t *record) {
                        tosend->oidcreated = pile->oidcreated;
                        thread_data_array[threadnum].thread_id = threadnum;
                        thread_data_array[threadnum].mid = pile->mid;
-                       thread_data_array[threadnum].pilecount = pilecount;
                        thread_data_array[threadnum].buffer = tosend;
                        thread_data_array[threadnum].recvmsg = rcvd_control_msg;
                        thread_data_array[threadnum].threshold = &tcond;
@@ -483,13 +482,15 @@ int transCommit(transrecord_t *record) {
                                        return 1;
                                }
                        }
+
                        threadnum++;            
                        pile = pile->next;
                }
 
                /* Free attribute and wait for the other threads */
                pthread_attr_destroy(&attr);
-               for (i = 0 ;i < pilecount ; i++) {
+               
+               for (i = 0; i < pilecount; i++) {
                        rc = pthread_join(thread[i], NULL);
                        if(rc)
                        {
@@ -505,14 +506,18 @@ int transCommit(transrecord_t *record) {
                        }
                        free(thread_data_array[i].buffer);
                }
+       
 
                /* Free resources */    
                pthread_cond_destroy(&tcond);
                pthread_mutex_destroy(&tlock);
-               free(listmid);
+               if(listmid != NULL)
+                       free(listmid);
                pDelete(pile_ptr);
-               free(thread_data_array);
-               free(ltdata);
+               if(thread_data_array != NULL)
+                       free(thread_data_array);
+               if(ltdata != NULL)
+                       free(ltdata);
 
                /* wait a random amount of time */
                if (treplyretry == 1)
@@ -567,7 +572,7 @@ void *transRequest(void *threadarg) {
        }
        /* Send list of machines involved in the transaction */
        {
-               int size=sizeof(unsigned int)*tdata->pilecount;
+               int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
                if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
                        perror("Error sending list of machines for thread\n");
                        pthread_exit(NULL);
@@ -614,7 +619,7 @@ void *transRequest(void *threadarg) {
        (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
 
        /* Wake up the threads and invoke decideResponse (once) */
-       if(*(tdata->count) == tdata->pilecount) {
+       if(*(tdata->count) == tdata->buffer->f.mcount) {
                if (decideResponse(tdata) != 0) { 
                        printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
                        pthread_mutex_unlock(tdata->lock);
@@ -648,7 +653,7 @@ int decideResponse(thread_data_array_t *tdata) {
        int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                         message to send */
 
-       for (i = 0 ; i < tdata->pilecount ; i++) {
+       for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
                control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
                                                           written onto the shared array */
                switch(control) {
@@ -676,7 +681,7 @@ int decideResponse(thread_data_array_t *tdata) {
                objstrDelete(tdata->rec->cache);
                chashDelete(tdata->rec->lookupTable);
                free(tdata->rec);
-       } else if(transagree == tdata->pilecount){
+       } else if(transagree == tdata->buffer->f.mcount){
                /* Send Commit */
                *(tdata->replyctrl) = TRANS_COMMIT;
                /* Free resources */
@@ -840,7 +845,7 @@ void *handleLocalReq(void *threadarg) {
                headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
                GETSIZE(size,headeraddr);
                size+=sizeof(objheader_t);
-               memcpy(modptr+offset, headeraddr, size);  
+               memcpy((char *)modptr+offset, headeraddr, size);  
                offset += size;
        }
        /* Write new objects into the mainobject store */
@@ -848,7 +853,7 @@ void *handleLocalReq(void *threadarg) {
                headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
                GETSIZE(size, headeraddr);
                size+=sizeof(objheader_t);
-               memcpy(modptr+offset, headeraddr, size);  
+               memcpy((char *)modptr+offset, headeraddr, size);  
                offset += size;
        }
 
@@ -940,7 +945,7 @@ void *handleLocalReq(void *threadarg) {
        (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
 
        /* Wake up the threads and invoke decideResponse (once) */
-       if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) {
+       if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
                if (decideResponse(localtdata->tdata) != 0) { 
                        printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
                        pthread_mutex_unlock(localtdata->tdata->lock);
@@ -966,7 +971,6 @@ void *handleLocalReq(void *threadarg) {
        }
 
        /* Free memory */
-
        if (localtdata->transinfo->objlocked != NULL) {
                free(localtdata->transinfo->objlocked);
                localtdata->transinfo->objlocked = NULL;
@@ -987,9 +991,9 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        void *header;
 
        /* Set all ref counts as 1 and do garbage collection */
-       ptr = (char *)modptr;
+       ptr = modptr;
        for(i = 0; i< nummod; i++) {
-         int tmpsize;
+               int tmpsize;
                tmp_header = (objheader_t *)ptr;
                tmp_header->rcount = 0;
                GETSIZE(tmpsize, tmp_header);
@@ -1005,6 +1009,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        }
 
        /* Send ack to Coordinator */
+       printf("TRANS_SUCCESSFUL\n");
 
        /*Free the pointer */
        ptr = NULL;
@@ -1041,7 +1046,7 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated
 
        for (i = 0; i < numcreated; i++)
        {
-         int tmpsize;
+               int tmpsize;
                header = (objheader_t *)(((char *)modptr) + offset);
                mhashInsert(oidcreated[i], (((char *)modptr) + offset));
                GETSIZE(tmpsize, header);
@@ -1062,6 +1067,7 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated
        //TODO Update location lookup table
 
        /* Send ack to Coordinator */
+       printf("TRANS_SUCCESSFUL\n");
        return 0;
 }
 
@@ -1552,12 +1558,12 @@ unsigned short getObjType(unsigned int oid)
                if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
                {
                        prefetch(1, &oid, &numoffsets, NULL);
+                       pthread_mutex_lock(&pflookup.lock);
                        while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
                        {
-                               pthread_mutex_lock(&pflookup.lock);
                                pthread_cond_wait(&pflookup.cond, &pflookup.lock);
-                               pthread_mutex_unlock(&pflookup.lock);
                        }
+                       pthread_mutex_unlock(&pflookup.lock);
                }
        }