+void *transRequest(void *threadarg) {
+ int sd, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+ struct sockaddr_in serv_addr;
+ struct hostent *server;
+ thread_data_array_t *tdata;
+ char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
+
+ tdata = (thread_data_array_t *) threadarg;
+ //Send Trans Request
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("Error in socket for TRANS_REQUEST");
+ return NULL;
+ }
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+ //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
+
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ perror("Error in connect for TRANS_REQUEST");
+ return NULL;
+ }
+
+ if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+ perror("Error sending message for thread");
+ return NULL;
+ }
+ //Read message from participant side
+ read(sd, buffer, sizeof(buffer));
+ //process the participant's request
+ recvcontrol = buffer[0];
+ //Update common data structure and increment count
+ tdata->recvmsg[tdata->thread_id] = recvcontrol;
+ //Lock and update count
+ //Thread sleeps until all messages from pariticipants are received by coordinator
+ pthread_mutex_lock(tdata->lock);
+ (*(tdata->count))++;
+
+ if(*(tdata->count) == tdata->pilecount) {
+ pthread_cond_broadcast(tdata->threshold);
+ //Check common data structure
+ for (i = 0 ; i < tdata->pilecount ; i++) {
+ //Check in any DISAGREE has come
+ if(tdata->recvmsg[i] == TRANS_DISAGREE) {
+ //Send abort
+ transabort++;
+ buffer[0] = TRANS_ABORT;
+ if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+ perror("Error sending message for thread");
+ return NULL;
+ }
+ } else if(tdata->recvmsg[i] == AGREE) {
+ transagree++;
+ } else if(tdata->recvmsg[i] == AGREE_BUT_MISSING_OBJECTS) {
+ transmiss++;
+ } else
+ transsoftabort++;
+ }
+ if(transagree == tdata->pilecount){
+ //Send Commit
+ buffer[0] = TRANS_COMMIT;
+ if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+ perror("Error sending message for thread");
+ return NULL;
+ }
+ }
+ if(transsoftabort > 0 && transabort == 0) {
+ //Send abort but retry commit
+ //i.e. wait at the participant end and then resend either agree or disagree
+ //
+
+ }
+ if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
+ //Relookup all missing objects
+ //send missing mising object/ objects
+ }
+
+ } else {
+ pthread_cond_wait(tdata->threshold, tdata->lock);
+ }
+ pthread_mutex_unlock(tdata->lock);
+ close(sd);
+ //Reset numread and nummod for the next machine
+ pthread_exit(NULL);
+}
+