From: bdemsky Date: Thu, 14 Jun 2007 18:09:46 +0000 (+0000) Subject: my changes: X-Git-Tag: preEdgeChange~549 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=14f0a25500a9c66f23a1713a9e99ac4389ad4632;p=IRC.git my changes: 1) Handle sigpipe 2) allow quick restart of the server...deal with the port issue 3) combine some requests 4) remove commented out code to make it readable --- diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index 6ddba666..2f841f28 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -12,5 +12,11 @@ all: gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + +mac: + gcc -DMAC -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -DMAC -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + gcc -DMAC -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + clean: rm -rf d-2 d-1 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index eb0f655c..de0af029 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -1,6 +1,10 @@ #ifndef _DSTM_H_ #define _DSTM_H_ +#ifdef MAC +#define MSG_NOSIGNAL 0 +#endif + //Coordinator Messages #define READ_REQUEST 1 #define READ_MULT_REQUEST 2 diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 5e9aa599..34487ff0 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -26,9 +26,6 @@ int dstmInit(void) if (lhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure - //pthread_t threadListen; - //pthread_create(&threadListen, NULL, dstmListen, NULL); - return 0; } @@ -40,6 +37,7 @@ void *dstmListen() socklen_t addrlength = sizeof(struct sockaddr); pthread_t thread_dstm_accept; int i; + int setsockflag=1; listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd == -1) @@ -48,6 +46,17 @@ void *dstmListen() exit(1); } + if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } +#ifdef MAC + if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } +#endif + my_addr.sin_family = AF_INET; my_addr.sin_port = htons(LISTEN_PORT); my_addr.sin_addr.s_addr = INADDR_ANY; @@ -103,31 +112,19 @@ void *dstmAccept(void *acceptfd) size = sizeof(objheader_t) + sizeof(classsize[h->type]); if (h == NULL) { ctrl = OBJECT_NOT_FOUND; - if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) { + if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending control msg to coordinator\n"); } } else { - //char responsemessage[sizeof(char)+sizeof(int)]; /* Type */ - ctrl = OBJECT_FOUND; - if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) { - perror("Error sending control msg to coordinator\n"); - } - - //responsemessage[0]=OBJECT_FOUND; - /* Size of object */ - //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type]; - //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) { - // perror("Error sending control msg to coordinator\n"); - //} - - /* Size of object */ - if(send((int)acceptfd, &size, sizeof(int), 0) < 0) { - perror("Error sending size of object to coordinator\n"); - } - if(send((int)acceptfd, h, size, 0) < 0) { - perror("Error in sending object\n"); - } + char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; + *((int *)&msg[1])=size; + if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) { + perror("Error sending size of object to coordinator\n"); + } + if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) { + perror("Error in sending object\n"); + } } break; @@ -144,7 +141,7 @@ void *dstmAccept(void *acceptfd) break; case TRANS_REQUEST: - printf("DEBUG -> Recv TRANS_REQUEST from Coordinator accept_fd = %d\n", acceptfd); + printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n"); if((val = readClientReq((int)acceptfd, &transinfo)) != 0) { printf("Error in readClientReq\n"); } @@ -176,11 +173,9 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { fixed.control = TRANS_REQUEST; do { n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0); - // printf("DEBUG -> 1. Reading %d bytes \n", n); sum += n; } while(sum < N && n != 0); - //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes); // Read list of mids int mcount = fixed.mcount; N = mcount * sizeof(unsigned int); @@ -189,7 +184,6 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { sum = 0; do { n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0); - // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N); sum += n; } while(sum < N && n != 0); @@ -198,15 +192,11 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { N = numread * (sizeof(unsigned int) + sizeof(short)); char objread[N]; if(numread != 0) { // If pile contains objects to be read - // N = numread * (sizeof(unsigned int) + sizeof(short)); - // char objread[N]; sum = 0; do { n = recv((int)acceptfd, (void *) objread, N, 0); - // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N); sum += n; } while(sum < N && n != 0); -// printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18)); } // Read modified objects @@ -218,31 +208,27 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { sum = 0; do { // Recv the objs that are modified at Coordinator n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0); - // printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr)); sum += n; } while (sum < fixed.sum_bytes && n != 0); } //Send control message as per all votes from all oids in the machine - if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0) { - printf("Handle Trans Request Error %s, %d\n", __FILE__, __LINE__); + if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { + printf("Handle req error\n"); } //Read for new control message from Coordiator if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) { - printf("DEBUG -> Error receiving control, received %d\n", control); + perror("Error in receiving control message"); return 1; } - printf("DEBUG-> Control message after first call to handleTransReq is %d\n", control); - fflush(stdout); - switch(control) { case TRANS_ABORT: - printf("DEBUG -> Recv TRANS_ABORT from Coordinator accept_fd %d\n", acceptfd) ; + printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n"); //send ack to coordinator sendctrl = TRANS_SUCESSFUL; - if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) { + if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); return 1; } @@ -267,26 +253,25 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { } break; case TRANS_ABORT_BUT_RETRY_COMMIT: - printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator acceptfd = %d\n", acceptfd); + printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n"); //Process again after waiting for sometime and on prev control message sent sleep(2); switch(prevctrl) { case TRANS_AGREE: sendctrl = TRANS_AGREE; - if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) { + if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); } - //sleep(5); break; case TRANS_SOFT_ABORT: if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { - printf("Handle Trans Request Error for second call%s, %d\n", __FILE__, __LINE__); + printf("Handle req error\n"); } //If no change in previous control message that was sent then ABORT transaction if(newctrl == TRANS_SOFT_ABORT){ //Send ABORT newctrl = TRANS_DISAGREE; - if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) { + if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); } //Set the reference count of the object to 1 in mainstore for garbage collection @@ -301,11 +286,10 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { ptr = mhashSearch(transinfo->objlocked[i]);// find the header address ((objheader_t *)ptr)->status &= ~(LOCK); } - // return 0; } else if(newctrl == TRANS_AGREE) { newctrl = TRANS_AGREE; //Send new control message - if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) { + if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); } } @@ -316,14 +300,13 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { break; case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: //TODO expect another transrequest from client - printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator accept_fd%d\n", acceptfd); + printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n"); break; default: - printf("No response to TRANS_AGREE OR DISAGREE control\n"); + printf("No response to TRANS_AGREE OR DISAGREE protocol\n"); //TODO Use fixed.trans_id TID since Client may have died break; } - //Free memory printf("DEBUG -> Freeing..."); fflush(stdout); @@ -363,7 +346,8 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran //Process each object present in the pile ptr = modptr; - + //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread); + fflush(stdout); //Process each oid in the machine pile/ group for (i = 0; i < fixed->numread + fixed->nummod; i++) { if (i < fixed->numread) {//Object is read @@ -394,18 +378,20 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran v_nomatch++; //send TRANS_DISAGREE to Coordinator control = TRANS_DISAGREE; - if((val = write(acceptfd, &control, sizeof(char))) <= 0) { + if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE acceptfd = %d\n", acceptfd); + printf("DEBUG -> Sending TRANS_DISAGREE\n"); return control; } } else {//Obj is not locked , so lock object ((objheader_t *)mobj)->status |= LOCK; + //FOR TESTING + sleep(1); //Save all object oids that are locked on this machine during this transaction request call oidlocked[objlocked] = ((objheader_t *)mobj)->oid; - printf("DEBUG -> Obj locked are %d\n",((objheader_t *)mobj)->oid); + printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid); objlocked++; if (version == ((objheader_t *)mobj)->version) { //If versions match v_matchnolock++; @@ -413,11 +399,11 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran v_nomatch++; //send TRANS_DISAGREE to Coordinator control = TRANS_DISAGREE; - if((val = write(acceptfd, &control, sizeof(char))) <= 0) { + if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd); + printf("DEBUG -> Sending TRANS_DISAGREE\n"); return control; } } @@ -436,28 +422,28 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran if(v_matchnolock == fixed->numread + fixed->nummod) { //send TRANS_AGREE to Coordinator control = TRANS_AGREE; - if((val = write(acceptfd, &control, sizeof(char))) <= 0) { + if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_AGREE accept_fd = %d\n", acceptfd); + printf("DEBUG -> Sending TRANS_AGREE\n"); } //Condition to send TRANS_SOFT_ABORT if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { //send TRANS_SOFT_ABORT to Coordinator control = TRANS_SOFT_ABORT; - if((val = write(acceptfd, &control, sizeof(char))) <=0 ) { - perror("Error in sending control back to coordinator\n"); - return 0; - } - printf("DEBUG -> Sending TRANS_SOFT_ABORT accept_fd = %d\n", acceptfd); + char msg[]={TRANS_SOFT_ABORT, 0,0,0,0}; + *((int*)&msg[1])=objnotfound; + + printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); //send number of oids not found and the missing oids - if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) { + if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) { perror("Error in sending no of objects that are not found\n"); return 0; } if(objnotfound != 0) { - if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) { + int size=sizeof(unsigned int)*objnotfound; + if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) { perror("Error in sending objects that are not found\n"); return 0; } @@ -500,12 +486,12 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { //Process each modified object saved in the mainobject store for(i=0; inummod; i++) { if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { - printf("mhashsearch returns NULL %s, %d\n", __FILE__, __LINE__); + printf("mhashserach returns NULL\n"); } //change reference count of older address and free space in objstr ?? header->rcount = 1; //Not sure what would be th val //change ptr address in mhash table - printf("DEBUG -> Removing object oid = %d\n", transinfo->objmod[i]); + printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]); mhashRemove(transinfo->objmod[i]); mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); offset += sizeof(objheader_t) + classsize[header->type]; @@ -524,8 +510,8 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { //send ack to coordinator control = TRANS_SUCESSFUL; //FOR TESTING - printf("DEBUG-> Sending TRANS_SUCCESSFUL from accept_fd = %d\n", acceptfd); - if(send((int)acceptfd, &control, sizeof(char), 0) < 0) { + printf("DEBUG-> Transaction is SUCCESSFUL \n"); + if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 2c47e2ac..0af91b9d 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -26,7 +26,7 @@ transrecord_t *transStart() objheader_t *transRead(transrecord_t *record, unsigned int oid) { -// printf("Enter TRANS_READ\n"); + printf("Enter TRANS_READ\n"); unsigned int machinenumber; objheader_t *tmp, *objheader; void *objcopy; @@ -34,7 +34,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) void *buf; //check cache if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ - printf("transRead oid %d found local\n %s, %d", oid, __FILE__, __LINE__); + printf("DEBUG -> transRead oid %d found local\n", oid); return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { //Look up in Machine lookup table and found @@ -55,7 +55,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { //If object is not found in Remote location - printf("Object oid = %d not found in Machine %d at %s, %d\n", oid, machinenumber, __FILE__, __LINE__); + printf("Object oid = %d not found in Machine %d\n", oid, machinenumber); return NULL; } else { @@ -85,14 +85,14 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { unsigned int *oidnotfound; objheader_t *header; -// printf("DEBUG -> pilecount is %d\n", tdata->pilecount); + printf("DEBUG -> pilecount is %d\n", tdata->pilecount); //Check common data structure for (i = 0 ; i < tdata->pilecount ; i++) { //Switch case control = tdata->recvmsg[i].rcv_status; switch(control) { case TRANS_DISAGREE: -// printf("DEBUG-> Inside TRANS_DISAGREE\n"); + printf("DEBUG-> Inside TRANS_DISAGREE\n"); transdisagree++; //Free transaction records objstrDelete(tdata->rec->cache); @@ -101,7 +101,7 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { //send Abort ctrl = TRANS_ABORT; for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved - if (write(sd, &ctrl, sizeof(char)) < 0) { + if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ctrl message for participant\n"); return 1; } @@ -109,13 +109,13 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { return 0; case TRANS_AGREE: - printf("Inside TRANS_AGREE\n"); + printf("DEBUG-> Inside TRANS_AGREE\n"); PRINT_TID(tdata); transagree++; break; case TRANS_SOFT_ABORT: - printf("Inside TRANS_SOFT_ABORT\n"); + printf("DEBUG-> Inside TRANS_SOFT_ABORT\n"); transsoftabort++; /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */ if ((i == tdata->thread_id) && (val == 0)) { @@ -137,6 +137,7 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { } while(sum < N && n !=0); } } + break; default: printf("Participant sent unknown message\n"); @@ -147,8 +148,8 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { if(transagree == tdata->pilecount){ //Send Commit ctrl = TRANS_COMMIT; - printf("Sending TRANS_COMMIT accept_fd = %d\n", sd); - if((retval = write(sd, &ctrl, sizeof(char))) < 0) { + printf("Sending TRANS_COMMIT\n"); + if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { perror("Error sending ctrl message for participant\n"); return 1; } @@ -158,11 +159,12 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) { //Send abort but retry commit ctrl = TRANS_ABORT_BUT_RETRY_COMMIT; - printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT acceptfd = %d\n", sd); - if((retval = write(sd, &ctrl, sizeof(char))) <= 0) { + printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n"); + if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { perror("Error sending ctrl message for participant\n"); return 1; } + /* //Sleep and the resend the request sleep(2); //Read new control message from Participant @@ -176,13 +178,14 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) { tdata->recvmsg[tdata->thread_id].rcv_status = control; val = 1; decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1 + */ } if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) { //Send abort but retry commit after relooking up objects ctrl = TRANS_ABORT; printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n"); - if((retval = write(sd, &ctrl, sizeof(char))) < 0) { + if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error sending ctrl message for participant\n"); return 1; } @@ -231,30 +234,38 @@ void *transRequest(void *threadarg) { printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control); // printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t)); // printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes); - if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) { + if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { perror("Error sending fixed bytes for thread"); return NULL; } //Send list of machines involved in the transaction // printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount); - if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) { - perror("Error sending list of machines for thread"); - return NULL; + { + int size=sizeof(unsigned int)*tdata->pilecount; + if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { + perror("Error sending list of machines for thread"); + return NULL; + } } //Send oids and version number tuples for objects that are read // printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread); // printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); - if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) { - perror("Error sending tuples for thread"); - return NULL; + { + int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; + if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { + perror("Error sending tuples for thread"); + return NULL; + } } //Send objects that are modified for(i = 0; i < tdata->buffer->f.nummod ; i++) { - headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); - if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) { - perror("Error sending obj modified for thread"); - return NULL; - } + int size; + headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); + size=sizeof(objheader_t)+classsize[headeraddr->type]; + if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { + perror("Error sending obj modified for thread"); + return NULL; + } } //Read message control message from participant side @@ -457,7 +468,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; - if (write(sd, &readrequest, sizeof(readrequest)) < 0) { + if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) { perror("Error sending message\n"); return NULL; }