From: adash Date: Thu, 19 Apr 2007 07:39:38 +0000 (+0000) Subject: Various bug fixes X-Git-Tag: preEdgeChange~617 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=cdb7b631b0eb7d70099a81b140cbb37483d4d20d;p=IRC.git Various bug fixes Able to run multithreaded versions on 2 diff. machines --- diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index 1a714de0..7b693293 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,8 +1,11 @@ client: - gcc -lpthread -g -O0 -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c server: - gcc -lpthread -g -O0 -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c +all: + gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c clean: rm client server diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 5b791c35..5d292c9d 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -76,7 +76,7 @@ void *dstmListen() void *dstmAccept(void *acceptfd) { - int numbytes,i, val; + int numbytes,i, val, retval; unsigned int oid; char buffer[RECEIVE_BUFFER_SIZE], control,ctrl; char *ptr; @@ -87,11 +87,17 @@ void *dstmAccept(void *acceptfd) int fd_flags = fcntl((int)acceptfd, F_GETFD), size; printf("Recieved connection: fd = %d\n", (int)acceptfd); - recv((int)acceptfd, &control, sizeof(char), 0); + if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { + perror("Error in receiving control from coordinator\n"); + return; + } switch(control) { case READ_REQUEST: printf("DEBUG -> Recv READ_REQUEST from Coordinator\n"); - recv((int)acceptfd, &oid, sizeof(unsigned int), 0); + if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) { + perror("Error receiving object from cooridnator\n"); + return; + } srcObj = mhashSearch(oid); h = (objheader_t *) srcObj; size = sizeof(objheader_t) + sizeof(classsize[h->type]); @@ -145,7 +151,7 @@ void *dstmAccept(void *acceptfd) break; default: - printf("Error receiving\n"); + printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control); } if (close((int)acceptfd) == -1) { @@ -166,7 +172,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { void *modptr, *header; objheader_t *tmp_header; fixed_data_t fixed; - int sum = 0, i, N, n, val; + int sum = 0, i, N, n, val, retval; //Reads to process the TRANS_REQUEST protocol further // Read fixed_data @@ -196,33 +202,43 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { int numread = fixed.numread; N = numread * (sizeof(unsigned int) + sizeof(short)); char objread[N]; - sum = 0; - do { - n = recv((int)acceptfd, (void *) objread, N, 0); - // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N); - sum += n; - } while(sum < N && n != 0); - //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18)); - - // Read modified objects - if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { - printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__); - return 1; + if(numread != 0) { // If pile contains objects to be read + // N = numread * (sizeof(unsigned int) + sizeof(short)); + // char objread[N]; + sum = 0; + do { + n = recv((int)acceptfd, (void *) objread, N, 0); + // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N); + sum += n; + } while(sum < N && n != 0); +// printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18)); } - sum = 0; - do { - n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0); - //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr)); - sum += n; - } while (sum < fixed.sum_bytes && n != 0); + // Read modified objects + if(fixed.nummod != 0) { // If pile contains modified objects + if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { + printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__); + return 1; + } + sum = 0; + do { // Recv the objs that are modified at Coordinator + n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0); + // printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr)); + sum += n; + } while (sum < fixed.sum_bytes && n != 0); + } + //Send control message as per all votes from all oids in the machine if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { printf("Handle req error\n"); } - + //Read for new control message from Coordiator - recv((int)acceptfd, &control, sizeof(char), 0); + if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) { + perror("Error in receiving control message"); + return 1; + } + switch(control) { case TRANS_ABORT: printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n"); @@ -230,6 +246,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { sendctrl = TRANS_SUCESSFUL; if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) { perror("Error sending ACK to coordinator\n"); + return 1; } //Mark all ref counts as 1 and do garbage collection ptr = modptr; @@ -292,10 +309,10 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { perror("Error sending ACK to coordinator\n"); } } - + break; } - + break; case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: //TODO expect another transrequest from client @@ -313,6 +330,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { //This function runs a decision after all objects are weighed under one of the 4 possibilities //and returns the appropriate control message to the Ccordinator char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) { + int val; short version; char control = 0, ctrlmissoid, *ptr; int i, j = 0; @@ -331,7 +349,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran //Process each object present in the pile ptr = modptr; - printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread); + //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread); fflush(stdout); //Process each oid in the machine pile/ group for (i = 0; i < fixed->numread + fixed->nummod; i++) { @@ -363,7 +381,10 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran v_nomatch++; //send TRANS_DISAGREE to Coordinator control = TRANS_DISAGREE; - write(acceptfd, &control, sizeof(char)); + if((val = write(acceptfd, &control, sizeof(char))) <= 0) { + perror("Error in sending control to the Coordinator\n"); + return 0; + } printf("DEBUG -> Sending TRANS_DISAGREE\n"); return control; } @@ -371,7 +392,6 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran ((objheader_t *)mobj)->status |= LOCK; //Save all object oids that are locked on this machine during this transaction request call oidlocked[objlocked] = ((objheader_t *)mobj)->oid; - printf("DEBUG-> Object to be locked is %d\n", ((objheader_t *)mobj)->oid); objlocked++; if (version == ((objheader_t *)mobj)->version) { //If versions match v_matchnolock++; @@ -379,7 +399,10 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran v_nomatch++; //send TRANS_DISAGREE to Coordinator control = TRANS_DISAGREE; - write(acceptfd, &control, sizeof(char)); + if((val = write(acceptfd, &control, sizeof(char))) <= 0) { + perror("Error in sending control to the Coordinator\n"); + return 0; + } printf("DEBUG -> Sending TRANS_DISAGREE\n"); return control; } @@ -387,30 +410,43 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran } } - printf("No of objs locked = %d\n", objlocked); - printf("No of v_nomatch = %d\n", v_nomatch); - printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock); - printf("No of objs v_match but had locks before = %d\n", v_matchlock); - printf("No of objs not found = %d\n", objnotfound); - printf("No of objs modified but not found = %d\n", objmodnotfound); + //printf("No of objs locked = %d\n", objlocked); + //printf("No of v_nomatch = %d\n", v_nomatch); + //printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock); + //printf("No of objs v_match but had locks before = %d\n", v_matchlock); + //printf("No of objs not found = %d\n", objnotfound); + //printf("No of objs modified but not found = %d\n", objmodnotfound); //Decide what control message(s) to send if(v_matchnolock == fixed->numread + fixed->nummod) { //send TRANS_AGREE to Coordinator control = TRANS_AGREE; - write(acceptfd, &control, sizeof(char)); + if((val = write(acceptfd, &control, sizeof(char))) <= 0) { + perror("Error in sending control to Coordinator\n"); + return 0; + } printf("DEBUG -> Sending TRANS_AGREE\n"); } if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { //send TRANS_SOFT_ABORT to Coordinator control = TRANS_SOFT_ABORT; - write(acceptfd, &control, sizeof(char)); + if((val = write(acceptfd, &control, sizeof(char))) <=0 ) { + perror("Error in sending control back to coordinator\n"); + return 0; + } printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); //send number of oids not found and the missing oids - write(acceptfd, &objnotfound, sizeof(int)); - if(objnotfound != 0) - write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound)); + if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) { + perror("Error in sending no of objects that are not found\n"); + return 0; + } + if(objnotfound != 0) { + if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) { + perror("Error in sending objects that are not found\n"); + return 0; + } + } } //Do the following when TRANS_DISAGREE is sent diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 97190542..7bf85693 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -48,6 +48,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi offset += sizeof(unsigned int); memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short)); tmp->numread = tmp->numread + 1; + // printf("DEBUG->pInsert() No of obj read = %d\n", tmp->numread); } found = 1; break; @@ -67,13 +68,13 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi } else { ptr->oidread[ptr->numread] = headeraddr->oid; memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int)); - //printf("DEBUG -> objread oid is %d\n", *(ptr->objread)); memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short)); ptr->numread = ptr->numread + 1; } ptr->next = pile; pile = ptr; } + return pile; } diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c index 14c573b5..688d44a3 100644 --- a/Robust/src/Runtime/DSTM/interface/testclient.c +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -1,10 +1,11 @@ #include +#include #include "dstm.h" #include "llookup.h" #include "ip.h" -#include -#include -#include +//#include +//#include +//#include extern objstr_t *mainobjstore; //extern lhashtable_t llookup; //Global Hash table @@ -17,7 +18,6 @@ int test2(void); unsigned int createObjects(transrecord_t *record) { objheader_t *header, *tmp; - struct sockaddr_in antelope; unsigned int size, mid; int i = 0; for(i = 20 ; i< 23; i++) { @@ -33,11 +33,9 @@ unsigned int createObjects(transrecord_t *record) { header = (objheader_t *) objstrAlloc(mainobjstore, size); memcpy(header, tmp, size); mhashInsert(header->oid, header); - //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope - //mid = iptoMid(inet_ntoa(antelope.sin_addr)); - mid = iptoMid("127.0.0.1"); + mid = iptoMid("128.200.9.27");//machine d-2 + printf("DEBUG -> createObjects mid is %x\n", mid); lhashInsert(header->oid, mid); - // lhashInsert(header->oid, 1); } // printf("Insert oid = %d at address %x\n",tmp->oid, tmp); size = sizeof(objheader_t) + classsize[0] ; @@ -49,9 +47,7 @@ unsigned int createObjects(transrecord_t *record) { header->status = 0; header->status |= NEW; mhashInsert(header->oid, header); - //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope - //mid = iptoMid(inet_ntoa(antelope.sin_addr)); - mid = iptoMid("127.0.0.1"); + mid = iptoMid("128.200.9.27"); lhashInsert(header->oid, mid); size = sizeof(objheader_t) + classsize[1] ; header = (objheader_t *) objstrAlloc(mainobjstore, size); @@ -62,9 +58,7 @@ unsigned int createObjects(transrecord_t *record) { header->status = 0; header->status |= LOCK; mhashInsert(header->oid, header); - //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope - //mid = iptoMid(inet_ntoa(antelope.sin_addr)); - mid = iptoMid("127.0.0.1"); + mid = iptoMid("128.200.9.27"); lhashInsert(header->oid, mid); size = sizeof(objheader_t) + classsize[2] ; header = (objheader_t *) objstrAlloc(mainobjstore, size); @@ -75,9 +69,7 @@ unsigned int createObjects(transrecord_t *record) { header->status = 0; header->status |= LOCK; mhashInsert(header->oid, header); - //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope - //mid = iptoMid(inet_ntoa(antelope.sin_addr)); - mid = iptoMid("127.0.0.1"); + mid = iptoMid("128.200.9.27"); lhashInsert(header->oid, mid); return 0; } @@ -207,20 +199,53 @@ int test4(void) { //trans commit int test5(void) { transrecord_t *record; - unsigned int mid; + objheader_t *header; + unsigned int size, mid; + pthread_t thread_Listen; + pthread_attr_t attr; objheader_t *h1,*h2, *h3, *h4; dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + record = transStart(); printf("DEBUG -> Init done\n"); - mid = iptoMid("127.0.0.1"); + mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu lhashInsert(1,mid); lhashInsert(2,mid); lhashInsert(3,mid); lhashInsert(4,mid); lhashInsert(5,mid); lhashInsert(6,mid); - createObjects(record); + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + + //Create and Insert Oid 20 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 20; + header->type = 2; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 21 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 21; + header->type = 1; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); //read object 1 if((h1 = transRead(record, 1)) == NULL){ printf("Object not found\n"); @@ -239,4 +264,5 @@ int test5(void) { } transCommit(record); + pthread_join(thread_Listen, NULL); } diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index bf1d57f5..46f17bb1 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -19,7 +19,7 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) { tmp = (objheader_t *) objstrAlloc(mainobjstore, size); memcpy(tmp, header, size); mhashInsert(tmp->oid, tmp); - mid = iptoMid("127.0.0.1"); + mid = iptoMid("128.200.9.10"); lhashInsert(tmp->oid, mid); //Lock oid 3 object // if(tmp->oid == 3) @@ -79,7 +79,8 @@ int test2() { pthread_t thread_Listen; dstmInit(); - mid = iptoMid("127.0.0.1"); + mid = iptoMid("128.200.9.27"); + //Inserting into lhashtable lhashInsert(20, mid); lhashInsert(21, mid); lhashInsert(22, mid); @@ -117,6 +118,4 @@ int test2() { printf("Error transCreateObj6"); } pthread_join(thread_Listen, NULL); - - } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d4d9bb17..369b3453 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -38,7 +38,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { //Look up in Machine lookup table and found - printf("oid not found in local cache\n"); + printf("oid is found in Local mlookup\n"); tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; //Copy into cache @@ -49,11 +49,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) return(objcopy); } else { //Get the object from the remote location - //printf("oid not found in local machine lookup\n"); - printf("machinenumber = %d\n",machinenumber); - printf("oid = %d\n",oid); + printf("oid is found in remote machine\n"); machinenumber = lhashSearch(oid); - printf("machinenumber = %d\n",machinenumber); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { //If object is not found in Remote location @@ -79,11 +76,12 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) } //int decideResponse(thread_data_array_t *tdata, char *control, int sd) { int decideResponse(thread_data_array_t *tdata, int sd, int val) { - int i, n, N, sum, oidcount = 0; + int i, n, N, sum, retval, oidcount = 0; int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0; char ctrl, control, *ptr; unsigned int *oidnotfound; objheader_t *header; + //Check common data structure for (i = 0 ; i < tdata->pilecount ; i++) { @@ -99,9 +97,11 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { free(tdata->rec); //send Abort ctrl = TRANS_ABORT; - if (write(sd, &ctrl, sizeof(char)) < 0) { - perror("Error sending ctrl message for participant\n"); - return 1; + for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved + if (write(sd, &ctrl, sizeof(char)) < 0) { + perror("Error sending ctrl message for participant\n"); + return 1; + } } return 0; @@ -145,24 +145,29 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { //Send Commit ctrl = TRANS_COMMIT; printf("Sending TRANS_COMMIT\n"); - if (write(sd, &ctrl, sizeof(char)) < 0) { + if((retval = write(sd, &ctrl, sizeof(char))) < 0) { perror("Error sending ctrl message for participant\n"); return 1; } + //printf("Sending control %d ,sd = %d\n", ctrl, i, sd); } if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) { //Send abort but retry commit ctrl = TRANS_ABORT_BUT_RETRY_COMMIT; printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n"); - if (write(sd, &ctrl, sizeof(char)) < 0) { + if((retval = write(sd, &ctrl, sizeof(char))) <= 0) { perror("Error sending ctrl message for participant\n"); return 1; } - //Sleep + //Sleep and the resend the request sleep(5); //Read new control message from Participant - n = read(sd, &control, sizeof(char)); + + if((n = read(sd, &control, sizeof(char))) <= 0) { + perror("No bytes are read for participant\n"); + return 1; + } //Update common data structure and increment count tdata->recvmsg[tdata->thread_id].rcv_status = control; @@ -171,17 +176,17 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { } if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) { - //Send abort but retry commit after relloking up objects - //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING; + //Send abort but retry commit after relooking up objects ctrl = TRANS_ABORT; printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n"); - if (write(sd, &ctrl, sizeof(char)) < 0) { + if((retval = write(sd, &ctrl, sizeof(char))) < 0) { perror("Error sending ctrl message for participant\n"); return 1; } //TODO //Relook up objects //update location table + //Free pointers free(oidnotfound); } @@ -200,7 +205,6 @@ void *transRequest(void *threadarg) { char machineip[16]; tdata = (thread_data_array_t *) threadarg; - printf("DEBUG -> New thread id %d\n", tdata->thread_id); //Send Trans Request if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for TRANS_REQUEST"); @@ -213,7 +217,6 @@ void *transRequest(void *threadarg) { midtoIP(tdata->mid,machineip); machineip[15] = '\0'; serv_addr.sin_addr.s_addr = inet_addr(machineip); - //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid); if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect for TRANS_REQUEST"); @@ -237,7 +240,7 @@ void *transRequest(void *threadarg) { } //Send oids and version number tuples for objects that are read // printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread); -// printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); +// printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) { perror("Error sending tuples for thread"); return NULL; @@ -245,7 +248,6 @@ void *transRequest(void *threadarg) { //Send objects that are modified for(i = 0; i < tdata->buffer->f.nummod ; i++) { headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); -// printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]); if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) { perror("Error sending obj modified for thread"); return NULL; @@ -253,9 +255,11 @@ void *transRequest(void *threadarg) { } //Read message control message from participant side - n = read(sd, &control, sizeof(char)); + if((n = read(sd, &control, sizeof(char))) <= 0) { + perror("Error in reading control message from Participant\n"); + return NULL; + } recvcontrol = control; - printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol); //Update common data structure and increment count tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; @@ -266,15 +270,16 @@ void *transRequest(void *threadarg) { if(*(tdata->count) == tdata->pilecount) { pthread_cond_broadcast(tdata->threshold); - //process the participant's request - if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0 - printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - return NULL; - } } else { pthread_cond_wait(tdata->threshold, tdata->lock); } + + //process the participant's request + if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0 + printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(tdata->lock); + return NULL; + } pthread_mutex_unlock(tdata->lock); close(sd); @@ -292,6 +297,7 @@ int transCommit(transrecord_t *record) { char buffer[RECEIVE_BUFFER_SIZE],control; char transid[TID_LEN]; static int newtid = 0; + trans_req_data_t *tosend; ptr = record->lookupTable->table; size = record->lookupTable->size; @@ -337,7 +343,10 @@ int transCommit(transrecord_t *record) { pthread_cond_t tcond; pthread_mutex_t tlock; pthread_mutex_t tlshrd; - thread_data_array_t thread_data_array[pilecount]; + //thread_data_array_t thread_data_array[pilecount]; + thread_data_array_t *thread_data_array; + + thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants @@ -356,10 +365,9 @@ int transCommit(transrecord_t *record) { pListMid(pile, listmid); //Process each machine group while(tmp != NULL) { - printf("DEBUG -> Created thread %d... \n", numthreads); //Create transaction id newtid++; - trans_req_data_t *tosend; + //trans_req_data_t *tosend; if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); return 1; @@ -388,9 +396,10 @@ int transCommit(transrecord_t *record) { perror("Error in pthread create"); return 1; } + numthreads++; //TODO frees - free(tosend); + //free(tosend); tmp = tmp->next; } @@ -408,6 +417,9 @@ int transCommit(transrecord_t *record) { //Free resources pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); +// for(i = 0 ;i< pilecount ;i++) { + free(tosend); +// } free(listmid); pDelete(pile); return 0; @@ -415,7 +427,7 @@ int transCommit(transrecord_t *record) { //mnun will be used to represent the machine IP address later void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { - int sd, size; + int sd, size, val; struct sockaddr_in serv_addr; struct hostent *server; char control; @@ -424,7 +436,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { void *objcopy; if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket"); + perror("Error in socket\n"); return NULL; } bzero((char*) &serv_addr, sizeof(serv_addr)); @@ -436,14 +448,14 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { serv_addr.sin_addr.s_addr = inet_addr(machineip); if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect"); + perror("Error in connect\n"); return NULL; } char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; if (write(sd, &readrequest, sizeof(readrequest)) < 0) { - perror("Error sending message"); + perror("Error sending message\n"); return NULL; } @@ -451,15 +463,24 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { printf("DEBUG -> ready to rcv ...\n"); #endif //Read response from the Participant - read(sd, &control, sizeof(char)); + if((val = read(sd, &control, sizeof(char))) <= 0) { + perror("No control response for getRemoteObj sent\n"); + return NULL; + } switch(control) { case OBJECT_NOT_FOUND: return NULL; break; case OBJECT_FOUND: - read(sd, &size, sizeof(int)); + if((val = read(sd, &size, sizeof(int))) <= 0) { + perror("No size is read from the participant\n"); + return NULL; + } objcopy = objstrAlloc(record->cache, size); - read(sd, objcopy, size); + if((val = read(sd, objcopy, size)) <= 0) { + perror("No objects are read from the remote participant\n"); + return NULL; + } //Insert into cache's lookup table chashInsert(record->lookupTable, oid, objcopy); break; @@ -467,5 +488,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { printf("Error in recv request from participant on a READ_REQUEST\n"); return NULL; } + close(sd); return objcopy; }