acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
}
- pthread_exit(NULL);
}
/* This function accepts a new connection request, decodes the control message in the connection
* and accordingly calls other functions to process new requests */
trans_commit_data_t transinfo;
unsigned short objType;
+ transinfo.objlocked = NULL;
+ transinfo.objnotfound = NULL;
+ transinfo.modptr = NULL;
+ transinfo.numlocked = 0;
+ transinfo.numnotfound = 0;
+
int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
/* Receive control messages from other machines */
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
if (retval == 0) {
- return; // Testing connection
+ pthread_exit(NULL); // Testing connection
}
perror("Error in receiving control from coordinator\n");
- return;
+ pthread_exit(NULL);
}
switch(control) {
/* Read oid requested and search if available */
if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
perror("Error receiving object from cooridnator\n");
- return NULL;
+ pthread_exit(NULL);
}
if((srcObj = mhashSearch(oid)) == NULL) {
printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
ctrl = OBJECT_NOT_FOUND;
if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending control msg to coordinator\n");
- return NULL;
+ pthread_exit(NULL);
}
} else {
/* Type */
*((int *)&msg[1])=size;
if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
perror("Error sending size of object to coordinator\n");
- return NULL;
+ pthread_exit(NULL);
}
if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
perror("Error in sending object\n");
- return NULL;
+ pthread_exit(NULL);
}
}
break;
printf("DEBUG -> Recv TRANS_REQUEST\n");
if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
printf("Error in readClientReq\n");
- return;
+ pthread_exit(NULL);
}
break;
case TRANS_PREFETCH:
printf("DEBUG -> Recv TRANS_PREFETCH\n");
if((val = prefetchReq((int)acceptfd)) != 0) {
printf("Error in readClientReq\n");
- return;
+ pthread_exit(NULL);
}
break;
case START_REMOTE_THREAD:
objheader_t *headaddr;
int sum = 0, i, N, n, val;
+ oidmod = NULL;
+
/* Read fixed_data_t data structure */
N = sizeof(fixed) - 1;
ptr = (char *)&fixed;;
/* Create an array of oids for modified objects */
oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
+ if (oidmod == NULL)
+ {
+ printf("calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
ptr = (char *) modptr;
for(i = 0 ; i < fixed.nummod; i++) {
int tmpsize;
short version;
char control = 0, *ptr;
unsigned int oid;
- unsigned int *oidnotfound, *oidlocked, *oidmod;
+ unsigned int *oidnotfound, *oidlocked;
void *mobj;
objheader_t *headptr;
if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
/* Save the oids not found and number of oids not found for later use */
- //oidnotfound[objnotfound] = OID(((objheader_t *)mobj));
oidnotfound[objnotfound] = oid;
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
int transCommit(transrecord_t *record) {
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
- int i, rc, val;
- int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0;
+ int i, j, rc, val;
+ int pilecount, offset, threadnum, trecvcount;
char buffer[RECEIVE_BUFFER_SIZE],control;
char transid[TID_LEN];
trans_req_data_t *tosend;
char localstat = 0;
do {
+ trecvcount = 0;
+ threadnum = 0;
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
/* Create a list of machine ids(Participants) involved in transaction */
if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ free(record);
return 1;
}
pListMid(pile, listmid);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
+ free(record);
return 1;
}
pDelete(pile_ptr);
free(listmid);
free(thread_data_array);
+ free(record);
return 1;
}
free(listmid);
free(thread_data_array);
free(ltdata);
+ free(record);
return 1;
}
tosend->f.control = TRANS_REQUEST;
thread_data_array[threadnum].rec = record;
/* If local do not create any extra connection */
if(pile->mid != myIpAddr) { /* Not local */
- rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);
+ rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
if(rc) {
perror("Error in pthread create\n");
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
+ for (i = 0; i < threadnum; i++)
+ free(thread_data_array[i].buffer);
free(thread_data_array);
free(ltdata);
- free(tosend);
+ free(record);
return 1;
}
} else { /*Local*/
ltdata->tdata = &thread_data_array[threadnum];
ltdata->transinfo = &transinfo;
- val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
+ val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
if(val) {
perror("Error in pthread create\n");
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
+ for (i = 0; i < threadnum; i++)
+ free(thread_data_array[i].buffer);
free(thread_data_array);
free(ltdata);
- free(tosend);
+ free(record);
return 1;
}
}
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
+ for (j = i; j < pilecount; j++)
+ free(thread_data_array[j].buffer);
free(thread_data_array);
free(ltdata);
- free(tosend);
+ free(record);
return 1;
}
free(thread_data_array[i].buffer);
/* Free resources */
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
- if(listmid != NULL)
- free(listmid);
+ free(listmid);
pDelete(pile_ptr);
- if(thread_data_array != NULL)
- free(thread_data_array);
- if(ltdata != NULL)
- free(ltdata);
+ free(thread_data_array);
+ free(ltdata);
/* wait a random amount of time */
if (treplyretry == 1)
/* Retry trans commit procedure if not sucessful in the first try */
} while (treplyretry == 1);
-
+
+ free(record);
return 0;
}
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket for TRANS_REQUEST\n");
pthread_exit(NULL);
- return NULL;
}
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
/* Open Connection */
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect for TRANS_REQUEST\n");
+ close(sd);
pthread_exit(NULL);
- return NULL;
}
printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
/* Send bytes of data with TRANS_REQUEST control message */
if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
perror("Error sending fixed bytes for thread\n");
+ close(sd);
pthread_exit(NULL);
- return NULL;
}
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
perror("Error sending list of machines for thread\n");
+ close(sd);
pthread_exit(NULL);
- return NULL;
}
}
/* Send oids and version number tuples for objects that are read */
int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
perror("Error sending tuples for thread\n");
+ close(sd);
pthread_exit(NULL);
- return NULL;
}
}
/* Send objects that are modified */
size+=sizeof(objheader_t);
if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
perror("Error sending obj modified for thread\n");
+ close(sd);
pthread_exit(NULL);
- return NULL;
}
}
/* Read control message from Participant */
if((n = read(sd, &control, sizeof(char))) <= 0) {
perror("Error in reading control message from Participant\n");
+ close(sd);
pthread_exit(NULL);
- return NULL;
}
recvcontrol = control;
/* Wake up the threads and invoke decideResponse (once) */
if(*(tdata->count) == tdata->buffer->f.mcount) {
- if (decideResponse(tdata) != 0) {
- printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(tdata->lock);
- pthread_exit(NULL);
- return NULL;
- }
+ decideResponse(tdata);
pthread_cond_broadcast(tdata->threshold);
} else {
pthread_cond_wait(tdata->threshold, tdata->lock);
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(tdata->lock);
+ close(sd);
pthread_exit(NULL);
- return NULL;
}
/* Close connection */
/* This function decides the reponse that needs to be sent to
* all Participant machines after the TRANS_REQUEST protocol */
-int decideResponse(thread_data_array_t *tdata) {
+void decideResponse(thread_data_array_t *tdata) {
char control;
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
written onto the shared array */
switch(control) {
+ default:
+ printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+ /* treat as disagree, pass thru */
case TRANS_DISAGREE:
transdisagree++;
break;
case TRANS_SOFT_ABORT:
transsoftabort++;
break;
- default:
- printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
- return -1;
}
}
/* Free resources */
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
- free(tdata->rec);
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
/* Free resources */
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
- free(tdata->rec);
- } else if(transsoftabort > 0 && transdisagree == 0) {
+ } else { /* (transsoftabort > 0 && transdisagree == 0) */
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 1;
- } else {
- return -1;
}
- return 0;
+ return;
}
/* This function sends the final response to remote machines per thread in their respective socket id */
char sendResponse(thread_data_array_t *tdata, int sd) {
if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&mainobjstore_mutex);
- return NULL;
+ pthread_exit(NULL);
}
pthread_mutex_unlock(&mainobjstore_mutex);
/* Write modified objects into the mainobject store */
/* Wake up the threads and invoke decideResponse (once) */
if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
- if (decideResponse(localtdata->tdata) != 0) {
- printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(localtdata->tdata->lock);
- return NULL;
- }
+ decideResponse(localtdata->tdata);
pthread_cond_broadcast(localtdata->tdata->threshold);
} else {
pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
- return NULL;
+ pthread_exit(NULL);
}
}else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
- return NULL;
+ pthread_exit(NULL);
}
}
/* dequeue node to create a machine piles and finally unlock mutex */
if((qnode = pre_dequeue()) == NULL) {
printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
- return NULL;
+ pthread_exit(NULL);
}
pthread_mutex_unlock(&pqueue.qlock);
/* Reduce redundant prefetch requests */
/* Dequeue node to send remote machine connections*/
if((mcpilenode = mcpiledequeue()) == NULL) {
printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
- return NULL;
+ pthread_exit(NULL);
}
/* Unlock mutex */
pthread_mutex_unlock(&mcqueue.qlock);
/* Open Connection */
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect for TRANS_REQUEST\n");
+ close(sd);
return;
}
control = TRANS_PREFETCH;
if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error in sending prefetch control\n");
+ close(sd);
return;
}
}
if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
perror("Error sending fixed bytes for thread\n");
+ close(sd);
return;
}
tmp = tmp->next;
endpair = -1;
if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
perror("Error sending fixed bytes for thread\n");
+ close(sd);
return;
}