int readClientReq(trans_commit_data_t *, int);
int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
-int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
+char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
/* end server portion */
* Following this it also receives a new control message from the co-ordinator and processes this message*/
int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
- char control, sendctrl;
+ char control, sendctrl, retval;
objheader_t *tmp_header;
void *header;
- int i = 0, val, retval;
+ int i = 0, val;
/* Send reply to the Coordinator */
if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
free(modptr);
/* Unlock objects that was locked due to this transaction */
for(i = 0; i< transinfo->numlocked; i++) {
- header = mhashSearch(transinfo->objlocked[i]);// find the header address
+ if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
+ return 1;
+ }
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ STATUS(headptr) &= ~(LOCK);
+ }
+ free(oidlocked);
+ }
if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error in sending control to the Coordinator\n");
return 0;
} else { /* If versions don't match ...HARD ABORT */
v_nomatch++;
control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ STATUS(headptr) &= ~(LOCK);
+ }
+ free(oidlocked);
+ }
+
/* Send TRANS_DISAGREE to Coordinator */
if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error in sending control to the Coordinator\n");
return 0;
}
- if (objlocked > 0) {
- STATUS(((objheader_t *)mobj)) &= ~(LOCK);
- free(oidlocked);
- }
+
return control;
}
}
}
/* Decide what control message to send to Coordinator */
- if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
+ if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
return 0;
}
- return val;
+ return control;
}
/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
* to send to Coordinator based on the votes of oids involved in the transaction */
-int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
+char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
int val;
char control = 0;
+
/* Condition to send TRANS_AGREE */
if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
control = TRANS_AGREE;
/* Condition to send TRANS_SOFT_ABORT */
if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
control = TRANS_SOFT_ABORT;
- char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
- *((int*)&msg[1])= *(objnotfound);
/* Send control message */
- if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
- perror("Error in sending no of objects that are not found\n");
+ if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
+ perror("Error in sending TRANS_SOFT_ABORT control\n");
return 0;
}
+
/* Send number of oids not found and the missing oids if objects are missing in the machine */
if(*(objnotfound) != 0) {
+ int msg[1];
+ msg[0] = *(objnotfound);
+ if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
+ perror("Error in sending objects that are not found\n");
+ return 0;
+ }
int size = sizeof(unsigned int)* *(objnotfound);
if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
perror("Error in sending objects that are not found\n");
/* Check to see if versions are same */
checkversion:
if ((STATUS(header) & LOCK) != LOCK) {
+ //FIXME make locking atomic
STATUS(header) |= LOCK;
newversion = header->version;
if(newversion == *(versionarry + i)) {
char localstat = 0;
+
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
pile_ptr = pile = createPiles(record);
void *transRequest(void *threadarg) {
int sd, i, n;
struct sockaddr_in serv_addr;
- struct hostent *server;
thread_data_array_t *tdata;
objheader_t *headeraddr;
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
pthread_exit(NULL);
}
- if ((retval = recv((int)sd, &control, sizeof(char), 0))<= 0) {
- printf("Error: In receiving control %s,%d\n", __FILE__, __LINE__);
- close(sd);
- pthread_exit(NULL);
- }
+ do {
+ retval = recv((int)sd, &control, sizeof(char), 0);
+ } while (retval < sizeof(char));
if(control == TRANS_UNSUCESSFUL) {
//printf("DEBUG-> TRANS_ABORTED\n");
char *ptr, retval = 0;
unsigned int *oidnotfound;
+ control = *(tdata->replyctrl);
+ if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
+ perror("Error sending ctrl message for participant\n");
+ return 0;
+ }
+
+ //FIXME read missing objects
/* If the decided response is due to a soft abort and missing objects at the Participant's side */
+ /*
if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
- /* Read list of objects missing */
+ // Read list of objects missing
if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
N = oidcount * sizeof(unsigned int);
if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
}
retval = TRANS_SOFT_ABORT;
}
+ */
+
/* If the decided response is TRANS_ABORT */
if(*(tdata->replyctrl) == TRANS_ABORT) {
retval = TRANS_ABORT;
- } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
+ } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
retval = TRANS_COMMIT;
}
-
- if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 0;
- }
-
+
return retval;
}
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
int sd, size, val;
struct sockaddr_in serv_addr;
- struct hostent *server;
char control;
char machineip[16];
objheader_t *h;
void sendPrefetchReq(prefetchpile_t *mcpilenode) {
int sd, i, off, len, endpair, count = 0;
- struct sockaddr_in serv_addr;
- struct hostent *server;
+ struct sockaddr_in remoteAddr;
char machineip[16], control;
objpile_t *tmp;
+ unsigned int mid;
/* Send Trans Prefetch Request */
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
return;
}
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(LISTEN_PORT);
- midtoIP(mcpilenode->mid ,machineip);
- machineip[15] = '\0';
- serv_addr.sin_addr.s_addr = inet_addr(machineip);
+ mid = mcpilenode->mid;
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
/* Open Connection */
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("Error in connect for SEND_PREFETCH_REQUEST\n");
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+ printf("%s():error %d connecting to %s:%d\n", __func__, errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
close(sd);
return;
}
char oidnoffset[len];
bzero(oidnoffset, len);
*((unsigned int*)oidnoffset) = len;
- //memcpy(oidnoffset, &len, sizeof(int));
off = sizeof(int);
*((unsigned int *)((char *)oidnoffset + off)) = tmp->oid;
- //memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
off += sizeof(unsigned int);
for(i = 0; i < tmp->numoffset; i++) {
*((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i];
- //memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short));
off+=sizeof(unsigned short);
}