* Following this it also receives a new control message from the co-ordinator and processes this message*/
int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
- char control, sendctrl;
+ char control, sendctrl, retval;
objheader_t *tmp_header;
void *header;
- int i = 0, val, retval;
+ int i = 0, val;
/* Send reply to the Coordinator */
if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
free(modptr);
/* Unlock objects that was locked due to this transaction */
for(i = 0; i< transinfo->numlocked; i++) {
- header = mhashSearch(transinfo->objlocked[i]);// find the header address
+ if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
+ return 1;
+ }
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
/* This function increments counters while running a voting decision on all objects involved
* in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
- int val, i = 0;
+ int val, i = 0, j;
unsigned short version;
char control = 0, *ptr;
unsigned int oid;
v_nomatch++;
/* Send TRANS_DISAGREE to Coordinator */
control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ STATUS(headptr) &= ~(LOCK);
+ }
+ free(oidlocked);
+ }
if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error in sending control to the Coordinator\n");
return 0;
} else { /* If versions don't match ...HARD ABORT */
v_nomatch++;
control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ STATUS(headptr) &= ~(LOCK);
+ }
+ free(oidlocked);
+ }
+
/* Send TRANS_DISAGREE to Coordinator */
if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
perror("Error in sending control to the Coordinator\n");
return 0;
}
- if (objlocked > 0) {
- STATUS(((objheader_t *)mobj)) &= ~(LOCK);
- free(oidlocked);
- }
+
return control;
}
}
}
/* Decide what control message to send to Coordinator */
- if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
+ if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
return 0;
}
- return val;
+ return control;
}
/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
* to send to Coordinator based on the votes of oids involved in the transaction */
-int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
+char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
int val;
char control = 0;
+
/* Condition to send TRANS_AGREE */
if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
control = TRANS_AGREE;
/* Condition to send TRANS_SOFT_ABORT */
if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
control = TRANS_SOFT_ABORT;
- char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
- *((int*)&msg[1])= *(objnotfound);
/* Send control message */
- if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
- perror("Error in sending no of objects that are not found\n");
+ if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
+ perror("Error in sending TRANS_SOFT_ABORT control\n");
return 0;
}
+
/* Send number of oids not found and the missing oids if objects are missing in the machine */
if(*(objnotfound) != 0) {
+ int msg[1];
+ msg[0] = *(objnotfound);
+ if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
+ perror("Error in sending objects that are not found\n");
+ return 0;
+ }
int size = sizeof(unsigned int)* *(objnotfound);
if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
perror("Error in sending objects that are not found\n");
char control;
objheader_t * header;
int bytesRecvd;
-/*
- unsigned int myIpAddr;
-#ifdef MAC
- myIpAddr = getMyIpAddr("en1");
-#else
- myIpAddr = getMyIpAddr("eth0");
-#endif
-*/
- /* Repeatedly recv the oid and offset pairs sent for prefetch */
+ /* Repeatedly recv one oid and offset pair sent for prefetch */
while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
count++;
if(length == -1)
break;
- index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
- // first 4 bytes are saved to send the
- // size of the buffer (that is computed at the end of the loop)
+ index = 0;
bytesRecvd = 0;
do {
bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
sum += n;
} while(sum < N && n != 0);
+ bzero(&buffer, PRE_BUF_SIZE);
/* Process each oid */
if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
/* Save the oids not found in buffer for later use */
}
}
}
+
/* Check for overflow in the buffer */
if (index >= PRE_BUF_SIZE) {
printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
}
}
- /* Add the buffer size into buffer as a parameter */
- *((unsigned int *)buffer)=index;
+ //Send buffer size
+ if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) {
+ perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
+ return 1;
+ }
/* Send the entire buffer with its size and oids found and not found */
- if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
+ if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) {
perror("Error: sending oids found\n");
return 1;
}
int sd;
struct sockaddr_in remoteAddr;
int bytesSent;
- int status, size, retry = 0;
-
+ int status, size;
+
int i = 0;
while(i < numoid) {
oid = *(oidarry + i);
/* Check to see if versions are same */
checkversion:
if ((STATUS(header) & LOCK) != LOCK) {
+ //FIXME make locking atomic
STATUS(header) |= LOCK;
newversion = header->version;
if(newversion == *(versionarry + i)) {