From 9b8e42591d84c31568f33d456df7a888baac36d9 Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 11 Sep 2007 22:43:58 +0000 Subject: [PATCH] Fix memory leak problems in transCommit() Start to add support for remote thread invokation --- Robust/src/ClassLibrary/ThreadDSM.java | 21 ++++---- Robust/src/Runtime/DSTM/interface/trans.c | 61 ++++++++++++++++++++--- 2 files changed, 64 insertions(+), 18 deletions(-) diff --git a/Robust/src/ClassLibrary/ThreadDSM.java b/Robust/src/ClassLibrary/ThreadDSM.java index 3008a067..d8890d1e 100644 --- a/Robust/src/ClassLibrary/ThreadDSM.java +++ b/Robust/src/ClassLibrary/ThreadDSM.java @@ -1,15 +1,14 @@ -public class Thread { - public void start() { - nativeCreate(); - } +public class ThreadDSM { - private static void staticStart(Thread t) { - t.run(); - } + public void start(int mid) { + run(mid); + } - public native static void sleep(long millis); - - public void run() {} + public native static void sleep(long millis); - private native void nativeCreate(); + public void run(int mid) { + + } + + public int startRemoteThread(int mid); } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 881baf1f..c5f3500a 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -87,7 +87,7 @@ int dstmStartup(const char * option) { pthread_attr_t attr; int master=strcmp(option, "master")==0; - myIpAddr = getMyIpAddr("eth0"); + myIpAddr = getMyIpAddr("eth0"); dstmInit(); transInit(); @@ -351,10 +351,23 @@ int transCommit(transrecord_t *record) { pthread_mutex_t tlshrd; thread_data_array_t *thread_data_array; - thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); + if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) { + printf("Malloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + return 1; + } + local_thread_data_array_t *ltdata; if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); return 1; } @@ -372,6 +385,12 @@ int transCommit(transrecord_t *record) { newtid++; if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); + free(ltdata); return 1; } tosend->f.control = TRANS_REQUEST; @@ -399,16 +418,30 @@ int transCommit(transrecord_t *record) { /* If local do not create any extra connection */ if(pile->mid != myIpAddr) { /* Not local */ rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); - if (rc) { + if(rc) { perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); + free(ltdata); + free(tosend); return 1; } } else { /*Local*/ ltdata->tdata = &thread_data_array[threadnum]; ltdata->transinfo = &transinfo; val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata); - if (val) { + if(val) { perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); + free(ltdata); + free(tosend); return 1; } } @@ -420,9 +453,16 @@ int transCommit(transrecord_t *record) { pthread_attr_destroy(&attr); for (i = 0 ;i < pilecount ; i++) { rc = pthread_join(thread[i], NULL); - if (rc) + if(rc) { printf("ERROR return code from pthread_join() is %d\n", rc); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); + free(ltdata); + free(tosend); return 1; } free(thread_data_array[i].buffer); @@ -464,6 +504,7 @@ void *transRequest(void *threadarg) { /* Send Trans Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for TRANS_REQUEST\n"); + pthread_exit(NULL); return NULL; } bzero((char*) &serv_addr, sizeof(serv_addr)); @@ -475,6 +516,7 @@ void *transRequest(void *threadarg) { /* Open Connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect for TRANS_REQUEST\n"); + pthread_exit(NULL); return NULL; } @@ -482,6 +524,7 @@ void *transRequest(void *threadarg) { /* Send bytes of data with TRANS_REQUEST control message */ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { perror("Error sending fixed bytes for thread\n"); + pthread_exit(NULL); return NULL; } /* Send list of machines involved in the transaction */ @@ -489,6 +532,7 @@ void *transRequest(void *threadarg) { int size=sizeof(unsigned int)*tdata->pilecount; if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { perror("Error sending list of machines for thread\n"); + pthread_exit(NULL); return NULL; } } @@ -497,6 +541,7 @@ void *transRequest(void *threadarg) { int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { perror("Error sending tuples for thread\n"); + pthread_exit(NULL); return NULL; } } @@ -507,6 +552,7 @@ void *transRequest(void *threadarg) { size=sizeof(objheader_t)+classsize[TYPE(headeraddr)]; if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { perror("Error sending obj modified for thread\n"); + pthread_exit(NULL); return NULL; } } @@ -514,6 +560,7 @@ void *transRequest(void *threadarg) { /* Read control message from Participant */ if((n = read(sd, &control, sizeof(char))) <= 0) { perror("Error in reading control message from Participant\n"); + pthread_exit(NULL); return NULL; } recvcontrol = control; @@ -532,7 +579,7 @@ void *transRequest(void *threadarg) { if (decideResponse(tdata) != 0) { printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); pthread_mutex_unlock(tdata->lock); - close(sd); + pthread_exit(NULL); return NULL; } pthread_cond_broadcast(tdata->threshold); @@ -546,7 +593,7 @@ void *transRequest(void *threadarg) { if (sendResponse(tdata, sd) == 0) { printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); pthread_mutex_unlock(tdata->lock); - close(sd); + pthread_exit(NULL); return NULL; } -- 2.34.1