printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
- memset(readBuffer, 0, MAX_SIZE);
while(1) {
int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
if(bytesRcvd == -1) {
/* Function that invalidate objects that
* have been currently modified
* returns -1 on error and 0 on success */
-int invalidateObj(trans_req_data_t *tdata) {
+int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) {
+ struct timeval start, end;
struct sockaddr_in clientaddr;
int retval;
-
+ int i;
+ int nummod=0;
+ for(i=0;i<pilecount;i++) {
+ nummod+=tdata[i].f.nummod;
+ }
bzero(&clientaddr, sizeof(clientaddr));
clientaddr.sin_family = AF_INET;
clientaddr.sin_port = htons(UDP_PORT);
clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
- if(tdata->f.nummod < maxObjsPerMsg) {
- /* send single udp msg */
- int iteration = 0;
- if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
- printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
- }
- } else {
- /* Split into several udp msgs */
- int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg;
- if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++;
- int i;
- for(i = 1; i <= maxUdpMsg; i++) {
- if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
- printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
- }
- }
+ /* send single udp msg */
+ if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
+ printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ /* Send trans commit or abort message when prefetching or caching */
+ for(i=0;i<pilecount;i++){
+ if(socklist[i] !=0)
+ send_data(socklist[i], &finalresponse, sizeof(char));
}
return 0;
}
/* Function sends a udp broadcast, also distinguishes
- * msg size to be sent based on the iteration flag
+ * msg size to be sent based on the total number of objects modified
* returns -1 on error and 0 on success */
-int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
+int sendUdpMsg(trans_req_data_t *tdata, int pilecount, int nummod, struct sockaddr_in *clientaddr, char finalresponse, int *socklist) {
char writeBuffer[MAX_SIZE];
int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
int offset = 0;
+ int i=0,j=0;
+
*((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
offset += sizeof(short);
*((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
offset += sizeof(unsigned int);
- if(iteration == 0) { // iteration flag == zero, send single udp msg
- *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->f.nummod)); //sizeof msg
- offset += sizeof(short);
- int i;
- for(i = 0; i < tdata->f.nummod; i++) {
- *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i]; //copy objects
- offset += sizeof(unsigned int);
+
+ while(nummod>0) {
+ int numtosend=nummod>maxObjsPerMsg?maxObjsPerMsg:nummod;
+ int localoffset=offset;
+ int sentmsgs=0;
+ *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend);
+ localoffset += sizeof(short);
+
+ for(; j < pilecount; j++) {
+ for(; i < tdata[j].f.nummod; i++) {
+ *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i]; //copy objects
+ localoffset += sizeof(unsigned int);
+ if ((++sentmsgs)==numtosend) {
+ i++;
+ goto send;
+ }
+ }
+ i=0;
}
- } else { // iteration flag > zero, send multiple udp msg
- int numObj;
- if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0)
- numObj = maxObjsPerMsg;
- else
- numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg);
- *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
- offset += sizeof(short);
- int index = (iteration - 1) * maxObjsPerMsg;
- int i;
- for(i = 0; i < numObj; i++) {
- *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i];
- offset += sizeof(unsigned int);
+send:
+ if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) {
+ perror("sendto error- ");
+ printf("DEBUG-> sendto error: errorno %d\n", errno);
+ return -1;
}
- }
- int n;
- if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
- perror("sendto error- ");
- printf("DEBUG-> sendto error: errorno %d\n", errno);
- return -1;
+ nummod= nummod - numtosend;
}
return 0;
}
}
return 0;
}
+
int createUdpSocket();
int udpInit();
void *udpListenBroadcast(void *);
-int invalidateObj(trans_req_data_t *);
+int invalidateObj(trans_req_data_t *, int, char, int*);
int invalidateFromPrefetchCache(char *);
-int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int);
+int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*);
#endif
break;
default:
- printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
+ printf("Error: No response to TRANS_AGREE OR DISAGREE protocol control = %d %s, %d\n", control, __FILE__, __LINE__);
//TODO Use fixed.trans_id TID since Client may have died
break;
}
}
/* Condition to send TRANS_SOFT_ABORT */
if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
- //if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
control = TRANS_SOFT_ABORT;
/* Send control message */
break;
objheader_t *headeraddr=(objheader_t*) curr->val;
unsigned int machinenum;
- if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
- machinenum = myIpAddr;
- } else if ((machinenum = lhashSearch(curr->key)) == 0) {
- printf("Error: No such machine %s, %d\n", __func__, __LINE__);
- return 1;
- }
- if(machinenum != myIpAddr)
+ objheader_t *tmp;
+
+ if (STATUS(headeraddr) & NEW) {
+ //new objects cannot be stale
+ } else if ((tmp=mhashSearch(curr->key)) != NULL) {
+ if (tmp->version!=headeraddr->version) {
+ //version mismatch
+ deletehead(head);
+ return 1; //return 1 when objects are inconsistent
+ }
+ } else {
+ machinenum = lhashSearch(curr->key);
head = createList(head, headeraddr, machinenum, c_numelements);
+ }
+
curr = curr->next;
}
}
/* Send oid and versions for checking */
- int retval=-1;
- if(head != NULL) {
- retval = verify(head);
- }
-
- if(retval == 1) { //consistent objects
- /* free head */
- deletehead(head);
+ if(head == NULL)
return 0;
- }
-
- if(retval == 0) {
- /* free head */
- deletehead(head);
- return 1; //return 1 when objects are inconsistent
- }
-
- return 0;
+
+ int retval = verify(head);
+ deletehead(head);
+ return retval==0;
}
nodeElem_t * createList(nodeElem_t *head, objheader_t *headeraddr, unsigned int mid,
plistnode_t *createPiles();
plistnode_t *sortPiles(plistnode_t *pileptr);
-
-
/*******************************
* Send and Recv function calls
*******************************/
* Sends a transrequest() to each remote machines for objects found remotely
* and calls handleLocalReq() to process objects found locally */
int transCommit() {
+ char buffer[30];
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
char treplyretry; /* keeps track of the common response that needs to be sent */
#ifdef SANDBOX
abortenabled=0;
#endif
+ struct writestruct writebuffer;
+ writebuffer.offset=0;
#ifdef LOGEVENTS
int iii;
}
socklist[sockindex] = sd;
/* Send bytes of data with TRANS_REQUEST control message */
- send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+ send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
- send_data(sd, tosend[sockindex].listmid, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
}
/* Send oids and version number tuples for objects that are read */
{
int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
- send_data(sd, tosend[sockindex].objread, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
}
/* Send objects that are modified */
memcpy(modptr+offset, headeraddr, size);
offset+=size;
}
- send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
free(modptr);
} else { //handle request locally
handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
sockindex++;
pile = pile->next;
} //end of pile processing
+
/* Recv Ctrl msgs from all machines */
int i;
for(i = 0; i < pilecount; i++) {
#endif
}
}
+
/* Decide the final response */
if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
free(listmid);
return 1;
}
-
-
- /* Invalidate objects in other machine cache */
- if(tosend[i].f.nummod > 0) {
- if((retval = invalidateObj(&(tosend[i]))) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- free(tosend);
- free(listmid);
- return 1;
- }
- }
#ifdef ABORTREADERS
removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
}
#endif
#endif
- send_data(sd, &finalResponse, sizeof(char));
+#ifndef CACHE
+ send_data(sd, &finalResponse, sizeof(char));
+#endif
} else {
/* Complete local processing */
doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
}
}
+#ifdef CACHE
+ {
+ /* Invalidate objects in other machine cache */
+ int retval;
+ if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+ }
+#endif
/* Free resources */
free(tosend);
free(listmid);
if(treplyretryCount >= NUM_TRY_TO_COMMIT)
exponentialdelay();
else
- randomdelay();
+ randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
#endif
/* Retry trans commit procedure during soft_abort case */
} while (treplyretry);
- exponential_backoff.tv_sec = 0;
- exponential_backoff.tv_nsec = (long)(10000);//10 microsec_
-
if(finalResponse == TRANS_ABORT) {
#ifdef TRANSSTATS
LOGEVENT('A');
transinfo->modptr = NULL;
transinfo->numlocked = numoidlocked;
transinfo->numnotfound = numoidnotfound;
-
+
/* Condition to send TRANS_AGREE */
if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
*getReplyCtrl = TRANS_AGREE;
return;
}
} else if(finalResponse == TRANS_COMMIT) {
-#ifdef CACHE
- /* Invalidate objects in other machine cache */
- if(tdata->f.nummod > 0) {
- int retval;
- if((retval = invalidateObj(tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
-#endif
if(transComProcess(tdata, transinfo) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
fflush(stdout);