* transaction commits
* Return -1 on error else returns 0 */
int updatePrefetchCache(thread_data_array_t* tdata) {
- plistnode_t *pile = tdata->pilehead;
- while(pile != NULL) {
- if(pile->mid != myIpAddr) { //Not local machine
- int retval;
- char oidType;
- oidType = 'R';
- if((retval = copyToCache(pile->numread, (unsigned int *)(pile->objread), tdata, oidType)) != 0) {
- printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
- }
- oidType = 'M';
- if((retval = copyToCache(pile->nummod, pile->oidmod, tdata, oidType)) != 0) {
- printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
- }
- }
- pile = pile->next;
+ int retval;
+ char oidType;
+ oidType = 'R';
+ if((retval = copyToCache(tdata->buffer->f.numread, (unsigned int *)(tdata->buffer->objread), tdata, oidType)) != 0) {
+ printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ oidType = 'M';
+ if((retval = copyToCache(tdata->buffer->f.nummod, tdata->buffer->oidmod, tdata, oidType)) != 0) {
+ printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
}
return 0;
}
//Increment version for every modified object
if(oidType == 'M') {
newAddr->version += 1;
+ newAddr->notifylist = NULL;
}
//make an entry in prefetch lookup hashtable
void *oldptr;
* Global Variables *
***********************/
int udpSockFd;
+extern unsigned int myIpAddr;
int createUdpSocket() {
int sockfd;
clientaddr.sin_family = AF_INET;
clientaddr.sin_port = htons(UDP_PORT);
clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
- int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+ int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
if(tdata->buffer->f.nummod < maxObjsPerMsg) {
/* send single udp msg */
int iteration = 0;
* returns -1 on error and 0 on success */
int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
char writeBuffer[MAX_SIZE];
- int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+ int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
int offset = 0;
*((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
offset += sizeof(short);
+ *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
+ offset += sizeof(unsigned int);
if(iteration == 0) { // iteration flag == zero, send single udp msg
*((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg
offset += sizeof(short);
* returns -1 on error and 0 on success */
int invalidateFromPrefetchCache(char *buffer) {
int offset = sizeof(short);
- /* Read objects sent */
- int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
- int i;
- for(i = 0; i < numObjsRecv; i++) {
- unsigned int oid;
- oid = *((unsigned int *)(buffer+offset));
- objheader_t *header;
- /* Lookup Objects in prefetch cache and remove them */
- if((header = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
+ /* Read mid from msg */
+ unsigned int mid = *((unsigned int *)(buffer+offset));
+ offset += sizeof(unsigned int);
+ //Invalidate only if broadcast if from different machine
+ if(mid != myIpAddr) {
+ /* Read objects sent */
+ int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
+ int i;
+ for(i = 0; i < numObjsRecv; i++) {
+ unsigned int oid;
+ oid = *((unsigned int *)(buffer+offset));
+ objheader_t *header;
+ /* Lookup Objects in prefetch cache and remove them */
+ if((header = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ }
+ offset += sizeof(unsigned int);
}
- offset += sizeof(unsigned int);
}
return 0;
}
char *replyctrl; /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */
char *replyretry; /* Shared variable that keep track if coordinator needs retry */
transrecord_t *rec; /* To send modified objects */
- plistnode_t *pilehead; /* Shared variable, ptr to the head of the machine piles for the transaction rec */
} thread_data_array_t;
prefetchNodeInfo_t *pNodeInfo; //Global prefetch holding metadata
void initializePCache() {
- pNodeInfo = calloc(1, sizeof(prefetchNodeInfo_t));
+ pNodeInfo = calloc(1, sizeof(prefetchNodeInfo_t)); //Not freed yet
pNodeInfo->oldptr = prefetchcache;
pNodeInfo->newptr = NULL;
pNodeInfo->num_old_objstr = 1; //for prefetch cache allocated by objstralloc in trans.c file
fd=startlistening();
udpfd = udpInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
if (master) {
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
return 1;
} else {
thread_data_array[threadnum].replyctrl = &treplyctrl;
thread_data_array[threadnum].replyretry = &treplyretry;
thread_data_array[threadnum].rec = record;
- thread_data_array[threadnum].pilehead = pile_ptr;
/* If local do not create any extra connection */
if(pile->mid != myIpAddr) { /* Not local */
do {
}
pthread_mutex_unlock(tdata->lock);
- /* Invalidate objects in other machine cache */
if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(tdata)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+
+ /* Invalidate objects in other machine cache */
if(tdata->buffer->f.nummod > 0) {
if((retval = invalidateObj(tdata)) != 0) {
printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
}
}
-
/* Send the final response such as TRANS_COMMIT or TRANS_ABORT
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
} else {
//printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
}
-
pthread_exit(NULL);
}
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
- int retval;
- /* Update prefetch cache */
- if((retval = updatePrefetchCache(tdata)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
unsigned short version;
void *mobj;
objheader_t *headptr;
-
+
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
} else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
if(transComProcess(localtdata) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
pthread_exit(NULL);
}
}
if (localtdata->transinfo->objnotfound != NULL) {
free(localtdata->transinfo->objnotfound);
}
-
pthread_exit(NULL);
}
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
/* TODO: For each object not found query DHT for new location and retrieve the object */
/* Throw an error */
- printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+ //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
// exit(-1);
} else {
printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
printf("notifyAll():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
fflush(stdout);
+ status = -1;
} else {
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;