Fix memory leak problems in transCommit()
authoradash <adash>
Tue, 11 Sep 2007 22:43:58 +0000 (22:43 +0000)
committeradash <adash>
Tue, 11 Sep 2007 22:43:58 +0000 (22:43 +0000)
Start to add support for remote thread invokation

Robust/src/ClassLibrary/ThreadDSM.java
Robust/src/Runtime/DSTM/interface/trans.c

index 3008a067012d806701ed2c5e0103914e145d539e..d8890d1e943e6c13080d112b8b7948eb3d81fa9a 100644 (file)
@@ -1,15 +1,14 @@
-public class Thread {
-    public void start() {
-       nativeCreate();
-    }
+public class ThreadDSM {
 
-    private static void staticStart(Thread t) {
-       t.run();
-    }
+       public void start(int mid) {
+               run(mid);
+       }
 
-    public native static void sleep(long millis);
-    
-    public void run() {}
+       public native static void sleep(long millis);
 
-    private native void nativeCreate();
+       public void run(int mid) {
+
+       }
+       
+       public int startRemoteThread(int mid);
 }
index 881baf1fba251acfcc921d89ac8fbc4a5dd4db8e..c5f3500ade25fcfc69c4aeafc1d6682e364bf525 100644 (file)
@@ -87,7 +87,7 @@ int dstmStartup(const char * option) {
   pthread_attr_t attr;
   int master=strcmp(option, "master")==0;
 
-       myIpAddr = getMyIpAddr("eth0");
+  myIpAddr = getMyIpAddr("eth0");
 
   dstmInit();
   transInit();
@@ -351,10 +351,23 @@ int transCommit(transrecord_t *record) {
                pthread_mutex_t tlshrd;
 
                thread_data_array_t *thread_data_array;
-               thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
+               if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) {
+                       printf("Malloc error %s, %d\n", __FILE__, __LINE__);
+                       pthread_cond_destroy(&tcond);
+                       pthread_mutex_destroy(&tlock);
+                       pDelete(pile_ptr);
+                       free(listmid);
+                       return 1;
+               }
+
                local_thread_data_array_t *ltdata;
                if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       pthread_cond_destroy(&tcond);
+                       pthread_mutex_destroy(&tlock);
+                       pDelete(pile_ptr);
+                       free(listmid);
+                       free(thread_data_array);
                        return 1;
                }
 
@@ -372,6 +385,12 @@ int transCommit(transrecord_t *record) {
                        newtid++;
                        if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
                                printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                               pthread_cond_destroy(&tcond);
+                               pthread_mutex_destroy(&tlock);
+                               pDelete(pile_ptr);
+                               free(listmid);
+                               free(thread_data_array);
+                               free(ltdata);
                                return 1;
                        }
                        tosend->f.control = TRANS_REQUEST;
@@ -399,16 +418,30 @@ int transCommit(transrecord_t *record) {
                        /* If local do not create any extra connection */
                        if(pile->mid != myIpAddr) { /* Not local */
                                rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);  
-                               if (rc) {
+                               if(rc) {
                                        perror("Error in pthread create\n");
+                                       pthread_cond_destroy(&tcond);
+                                       pthread_mutex_destroy(&tlock);
+                                       pDelete(pile_ptr);
+                                       free(listmid);
+                                       free(thread_data_array);
+                                       free(ltdata);
+                                       free(tosend);
                                        return 1;
                                }
                        } else { /*Local*/
                                ltdata->tdata = &thread_data_array[threadnum];
                                ltdata->transinfo = &transinfo;
                                val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
-                               if (val) {
+                               if(val) {
                                        perror("Error in pthread create\n");
+                                       pthread_cond_destroy(&tcond);
+                                       pthread_mutex_destroy(&tlock);
+                                       pDelete(pile_ptr);
+                                       free(listmid);
+                                       free(thread_data_array);
+                                       free(ltdata);
+                                       free(tosend);
                                        return 1;
                                }
                        }
@@ -420,9 +453,16 @@ int transCommit(transrecord_t *record) {
                pthread_attr_destroy(&attr);
                for (i = 0 ;i < pilecount ; i++) {
                        rc = pthread_join(thread[i], NULL);
-                       if (rc)
+                       if(rc)
                        {
                                printf("ERROR return code from pthread_join() is %d\n", rc);
+                               pthread_cond_destroy(&tcond);
+                               pthread_mutex_destroy(&tlock);
+                               pDelete(pile_ptr);
+                               free(listmid);
+                               free(thread_data_array);
+                               free(ltdata);
+                               free(tosend);
                                return 1;
                        }
                        free(thread_data_array[i].buffer);
@@ -464,6 +504,7 @@ void *transRequest(void *threadarg) {
        /* Send Trans Request */
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for TRANS_REQUEST\n");
+               pthread_exit(NULL);
                return NULL;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
@@ -475,6 +516,7 @@ void *transRequest(void *threadarg) {
        /* Open Connection */
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect for TRANS_REQUEST\n");
+               pthread_exit(NULL);
                return NULL;
        }
 
@@ -482,6 +524,7 @@ void *transRequest(void *threadarg) {
        /* Send bytes of data with TRANS_REQUEST control message */
        if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
                perror("Error sending fixed bytes for thread\n");
+               pthread_exit(NULL);
                return NULL;
        }
        /* Send list of machines involved in the transaction */
@@ -489,6 +532,7 @@ void *transRequest(void *threadarg) {
                int size=sizeof(unsigned int)*tdata->pilecount;
                if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
                        perror("Error sending list of machines for thread\n");
+                       pthread_exit(NULL);
                        return NULL;
                }
        }
@@ -497,6 +541,7 @@ void *transRequest(void *threadarg) {
                int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
                if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
                        perror("Error sending tuples for thread\n");
+                       pthread_exit(NULL);
                        return NULL;
                }
        }
@@ -507,6 +552,7 @@ void *transRequest(void *threadarg) {
                size=sizeof(objheader_t)+classsize[TYPE(headeraddr)];
                if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
                        perror("Error sending obj modified for thread\n");
+                       pthread_exit(NULL);
                        return NULL;
                }
        }
@@ -514,6 +560,7 @@ void *transRequest(void *threadarg) {
        /* Read control message from Participant */
        if((n = read(sd, &control, sizeof(char))) <= 0) {
                perror("Error in reading control message from Participant\n");
+               pthread_exit(NULL);
                return NULL;
        }
        recvcontrol = control;
@@ -532,7 +579,7 @@ void *transRequest(void *threadarg) {
                if (decideResponse(tdata) != 0) { 
                        printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
                        pthread_mutex_unlock(tdata->lock);
-                       close(sd);
+                       pthread_exit(NULL);
                        return NULL;
                }
                pthread_cond_broadcast(tdata->threshold);
@@ -546,7 +593,7 @@ void *transRequest(void *threadarg) {
        if (sendResponse(tdata, sd) == 0) { 
                printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
                pthread_mutex_unlock(tdata->lock);
-               close(sd);
+               pthread_exit(NULL);
                return NULL;
        }