if (lhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
- //pthread_t threadListen;
- //pthread_create(&threadListen, NULL, dstmListen, NULL);
-
return 0;
}
socklen_t addrlength = sizeof(struct sockaddr);
pthread_t thread_dstm_accept;
int i;
+ int setsockflag=1;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1)
exit(1);
}
+ if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
+#ifdef MAC
+ if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
+#endif
+
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(LISTEN_PORT);
my_addr.sin_addr.s_addr = INADDR_ANY;
size = sizeof(objheader_t) + sizeof(classsize[h->type]);
if (h == NULL) {
ctrl = OBJECT_NOT_FOUND;
- if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
+ if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending control msg to coordinator\n");
}
} else {
- //char responsemessage[sizeof(char)+sizeof(int)];
/* Type */
- ctrl = OBJECT_FOUND;
- if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
- perror("Error sending control msg to coordinator\n");
- }
-
- //responsemessage[0]=OBJECT_FOUND;
- /* Size of object */
- //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
- //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
- // perror("Error sending control msg to coordinator\n");
- //}
-
- /* Size of object */
- if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
- perror("Error sending size of object to coordinator\n");
- }
- if(send((int)acceptfd, h, size, 0) < 0) {
- perror("Error in sending object\n");
- }
+ char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+ *((int *)&msg[1])=size;
+ if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
+ perror("Error sending size of object to coordinator\n");
+ }
+ if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
+ perror("Error in sending object\n");
+ }
}
break;
break;
case TRANS_REQUEST:
- printf("DEBUG -> Recv TRANS_REQUEST from Coordinator accept_fd = %d\n", acceptfd);
+ printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
printf("Error in readClientReq\n");
}
fixed.control = TRANS_REQUEST;
do {
n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
- // printf("DEBUG -> 1. Reading %d bytes \n", n);
sum += n;
} while(sum < N && n != 0);
- //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
// Read list of mids
int mcount = fixed.mcount;
N = mcount * sizeof(unsigned int);
sum = 0;
do {
n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
- // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
sum += n;
} while(sum < N && n != 0);
N = numread * (sizeof(unsigned int) + sizeof(short));
char objread[N];
if(numread != 0) { // If pile contains objects to be read
- // N = numread * (sizeof(unsigned int) + sizeof(short));
- // char objread[N];
sum = 0;
do {
n = recv((int)acceptfd, (void *) objread, N, 0);
- // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
sum += n;
} while(sum < N && n != 0);
-// printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
}
// Read modified objects
sum = 0;
do { // Recv the objs that are modified at Coordinator
n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
- // printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
sum += n;
} while (sum < fixed.sum_bytes && n != 0);
}
//Send control message as per all votes from all oids in the machine
- if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0) {
- printf("Handle Trans Request Error %s, %d\n", __FILE__, __LINE__);
+ if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
+ printf("Handle req error\n");
}
//Read for new control message from Coordiator
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
- printf("DEBUG -> Error receiving control, received %d\n", control);
+ perror("Error in receiving control message");
return 1;
}
- printf("DEBUG-> Control message after first call to handleTransReq is %d\n", control);
- fflush(stdout);
-
switch(control) {
case TRANS_ABORT:
- printf("DEBUG -> Recv TRANS_ABORT from Coordinator accept_fd %d\n", acceptfd) ;
+ printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
//send ack to coordinator
sendctrl = TRANS_SUCESSFUL;
- if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
+ if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
return 1;
}
}
break;
case TRANS_ABORT_BUT_RETRY_COMMIT:
- printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator acceptfd = %d\n", acceptfd);
+ printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
//Process again after waiting for sometime and on prev control message sent
sleep(2);
switch(prevctrl) {
case TRANS_AGREE:
sendctrl = TRANS_AGREE;
- if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
+ if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
}
- //sleep(5);
break;
case TRANS_SOFT_ABORT:
if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
- printf("Handle Trans Request Error for second call%s, %d\n", __FILE__, __LINE__);
+ printf("Handle req error\n");
}
//If no change in previous control message that was sent then ABORT transaction
if(newctrl == TRANS_SOFT_ABORT){
//Send ABORT
newctrl = TRANS_DISAGREE;
- if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
+ if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
}
//Set the reference count of the object to 1 in mainstore for garbage collection
ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
((objheader_t *)ptr)->status &= ~(LOCK);
}
- // return 0;
} else if(newctrl == TRANS_AGREE) {
newctrl = TRANS_AGREE;
//Send new control message
- if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
+ if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
}
}
break;
case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
//TODO expect another transrequest from client
- printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator accept_fd%d\n", acceptfd);
+ printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
break;
default:
- printf("No response to TRANS_AGREE OR DISAGREE control\n");
+ printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
//TODO Use fixed.trans_id TID since Client may have died
break;
}
-
//Free memory
printf("DEBUG -> Freeing...");
fflush(stdout);
//Process each object present in the pile
ptr = modptr;
-
+ //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
+ fflush(stdout);
//Process each oid in the machine pile/ group
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
if (i < fixed->numread) {//Object is read
v_nomatch++;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
- if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+ if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error in sending control to the Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_DISAGREE acceptfd = %d\n", acceptfd);
+ printf("DEBUG -> Sending TRANS_DISAGREE\n");
return control;
}
} else {//Obj is not locked , so lock object
((objheader_t *)mobj)->status |= LOCK;
+ //FOR TESTING
+ sleep(1);
//Save all object oids that are locked on this machine during this transaction request call
oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
- printf("DEBUG -> Obj locked are %d\n",((objheader_t *)mobj)->oid);
+ printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
objlocked++;
if (version == ((objheader_t *)mobj)->version) { //If versions match
v_matchnolock++;
v_nomatch++;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
- if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+ if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error in sending control to the Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
+ printf("DEBUG -> Sending TRANS_DISAGREE\n");
return control;
}
}
if(v_matchnolock == fixed->numread + fixed->nummod) {
//send TRANS_AGREE to Coordinator
control = TRANS_AGREE;
- if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+ if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error in sending control to Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_AGREE accept_fd = %d\n", acceptfd);
+ printf("DEBUG -> Sending TRANS_AGREE\n");
}
//Condition to send TRANS_SOFT_ABORT
if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
//send TRANS_SOFT_ABORT to Coordinator
control = TRANS_SOFT_ABORT;
- if((val = write(acceptfd, &control, sizeof(char))) <=0 ) {
- perror("Error in sending control back to coordinator\n");
- return 0;
- }
- printf("DEBUG -> Sending TRANS_SOFT_ABORT accept_fd = %d\n", acceptfd);
+ char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
+ *((int*)&msg[1])=objnotfound;
+
+ printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
//send number of oids not found and the missing oids
- if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
+ if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
perror("Error in sending no of objects that are not found\n");
return 0;
}
if(objnotfound != 0) {
- if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) {
+ int size=sizeof(unsigned int)*objnotfound;
+ if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
perror("Error in sending objects that are not found\n");
return 0;
}
//Process each modified object saved in the mainobject store
for(i=0; i<transinfo->nummod; i++) {
if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
- printf("mhashsearch returns NULL %s, %d\n", __FILE__, __LINE__);
+ printf("mhashserach returns NULL\n");
}
//change reference count of older address and free space in objstr ??
header->rcount = 1; //Not sure what would be th val
//change ptr address in mhash table
- printf("DEBUG -> Removing object oid = %d\n", transinfo->objmod[i]);
+ printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
mhashRemove(transinfo->objmod[i]);
mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
offset += sizeof(objheader_t) + classsize[header->type];
//send ack to coordinator
control = TRANS_SUCESSFUL;
//FOR TESTING
- printf("DEBUG-> Sending TRANS_SUCCESSFUL from accept_fd = %d\n", acceptfd);
- if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
+ printf("DEBUG-> Transaction is SUCCESSFUL \n");
+ if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
}
objheader_t *transRead(transrecord_t *record, unsigned int oid)
{
-// printf("Enter TRANS_READ\n");
+ printf("Enter TRANS_READ\n");
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
void *buf;
//check cache
if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
- printf("transRead oid %d found local\n %s, %d", oid, __FILE__, __LINE__);
+ printf("DEBUG -> transRead oid %d found local\n", oid);
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
- printf("Object oid = %d not found in Machine %d at %s, %d\n", oid, machinenumber, __FILE__, __LINE__);
+ printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
return NULL;
}
else {
unsigned int *oidnotfound;
objheader_t *header;
-// printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
+ printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
//Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
//Switch case
control = tdata->recvmsg[i].rcv_status;
switch(control) {
case TRANS_DISAGREE:
-// printf("DEBUG-> Inside TRANS_DISAGREE\n");
+ printf("DEBUG-> Inside TRANS_DISAGREE\n");
transdisagree++;
//Free transaction records
objstrDelete(tdata->rec->cache);
//send Abort
ctrl = TRANS_ABORT;
for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
- if (write(sd, &ctrl, sizeof(char)) < 0) {
+ if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ctrl message for participant\n");
return 1;
}
return 0;
case TRANS_AGREE:
- printf("Inside TRANS_AGREE\n");
+ printf("DEBUG-> Inside TRANS_AGREE\n");
PRINT_TID(tdata);
transagree++;
break;
case TRANS_SOFT_ABORT:
- printf("Inside TRANS_SOFT_ABORT\n");
+ printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
transsoftabort++;
/* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
if ((i == tdata->thread_id) && (val == 0)) {
} while(sum < N && n !=0);
}
}
+
break;
default:
printf("Participant sent unknown message\n");
if(transagree == tdata->pilecount){
//Send Commit
ctrl = TRANS_COMMIT;
- printf("Sending TRANS_COMMIT accept_fd = %d\n", sd);
- if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
+ printf("Sending TRANS_COMMIT\n");
+ if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error sending ctrl message for participant\n");
return 1;
}
if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
//Send abort but retry commit
ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
- printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT acceptfd = %d\n", sd);
- if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
+ printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
+ if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error sending ctrl message for participant\n");
return 1;
}
+ /*
//Sleep and the resend the request
sleep(2);
//Read new control message from Participant
tdata->recvmsg[tdata->thread_id].rcv_status = control;
val = 1;
decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1
+ */
}
if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
//Send abort but retry commit after relooking up objects
ctrl = TRANS_ABORT;
printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
- if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
+ if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error sending ctrl message for participant\n");
return 1;
}
printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
// printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
// printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
- if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
+ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
perror("Error sending fixed bytes for thread");
return NULL;
}
//Send list of machines involved in the transaction
// printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
- if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
- perror("Error sending list of machines for thread");
- return NULL;
+ {
+ int size=sizeof(unsigned int)*tdata->pilecount;
+ if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
+ perror("Error sending list of machines for thread");
+ return NULL;
+ }
}
//Send oids and version number tuples for objects that are read
// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
// printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
- if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
- perror("Error sending tuples for thread");
- return NULL;
+ {
+ int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
+ if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
+ perror("Error sending tuples for thread");
+ return NULL;
+ }
}
//Send objects that are modified
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
- headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
- if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
- perror("Error sending obj modified for thread");
- return NULL;
- }
+ int size;
+ headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+ size=sizeof(objheader_t)+classsize[headeraddr->type];
+ if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
+ perror("Error sending obj modified for thread");
+ return NULL;
+ }
}
//Read message control message from participant side
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
- if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
+ if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
perror("Error sending message\n");
return NULL;
}