pthread_attr_t attr;
int master=strcmp(option, "master")==0;
- myIpAddr = getMyIpAddr("eth0");
+ myIpAddr = getMyIpAddr("eth0");
dstmInit();
transInit();
pthread_mutex_t tlshrd;
thread_data_array_t *thread_data_array;
- thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
+ if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) {
+ printf("Malloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ return 1;
+ }
+
local_thread_data_array_t *ltdata;
if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ free(thread_data_array);
return 1;
}
newtid++;
if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ free(thread_data_array);
+ free(ltdata);
return 1;
}
tosend->f.control = TRANS_REQUEST;
/* If local do not create any extra connection */
if(pile->mid != myIpAddr) { /* Not local */
rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);
- if (rc) {
+ if(rc) {
perror("Error in pthread create\n");
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ free(thread_data_array);
+ free(ltdata);
+ free(tosend);
return 1;
}
} else { /*Local*/
ltdata->tdata = &thread_data_array[threadnum];
ltdata->transinfo = &transinfo;
val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
- if (val) {
+ if(val) {
perror("Error in pthread create\n");
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ free(thread_data_array);
+ free(ltdata);
+ free(tosend);
return 1;
}
}
pthread_attr_destroy(&attr);
for (i = 0 ;i < pilecount ; i++) {
rc = pthread_join(thread[i], NULL);
- if (rc)
+ if(rc)
{
printf("ERROR return code from pthread_join() is %d\n", rc);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ free(thread_data_array);
+ free(ltdata);
+ free(tosend);
return 1;
}
free(thread_data_array[i].buffer);
/* Send Trans Request */
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket for TRANS_REQUEST\n");
+ pthread_exit(NULL);
return NULL;
}
bzero((char*) &serv_addr, sizeof(serv_addr));
/* Open Connection */
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect for TRANS_REQUEST\n");
+ pthread_exit(NULL);
return NULL;
}
/* Send bytes of data with TRANS_REQUEST control message */
if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
perror("Error sending fixed bytes for thread\n");
+ pthread_exit(NULL);
return NULL;
}
/* Send list of machines involved in the transaction */
int size=sizeof(unsigned int)*tdata->pilecount;
if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
perror("Error sending list of machines for thread\n");
+ pthread_exit(NULL);
return NULL;
}
}
int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
perror("Error sending tuples for thread\n");
+ pthread_exit(NULL);
return NULL;
}
}
size=sizeof(objheader_t)+classsize[TYPE(headeraddr)];
if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
perror("Error sending obj modified for thread\n");
+ pthread_exit(NULL);
return NULL;
}
}
/* Read control message from Participant */
if((n = read(sd, &control, sizeof(char))) <= 0) {
perror("Error in reading control message from Participant\n");
+ pthread_exit(NULL);
return NULL;
}
recvcontrol = control;
if (decideResponse(tdata) != 0) {
printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
pthread_mutex_unlock(tdata->lock);
- close(sd);
+ pthread_exit(NULL);
return NULL;
}
pthread_cond_broadcast(tdata->threshold);
if (sendResponse(tdata, sd) == 0) {
printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
pthread_mutex_unlock(tdata->lock);
- close(sd);
+ pthread_exit(NULL);
return NULL;
}