From 1f20c498092089f1cbce287ed799398ce8dad97d Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 5 Aug 2008 08:32:53 +0000 Subject: [PATCH] minor changes for update cache call(where is it called from) --- .../DSTM/interface/addPrefetchEnhance.c | 29 +++++++-------- .../Runtime/DSTM/interface/addUdpEnhance.c | 35 ++++++++++++------- Robust/src/Runtime/DSTM/interface/dstm.h | 1 - Robust/src/Runtime/DSTM/interface/gCollect.c | 2 +- Robust/src/Runtime/DSTM/interface/trans.c | 31 ++++++++-------- 5 files changed, 50 insertions(+), 48 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c index a39bc897..317f30fc 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c @@ -84,23 +84,17 @@ void cleanPCache(thread_data_array_t *tdata) { * transaction commits * Return -1 on error else returns 0 */ int updatePrefetchCache(thread_data_array_t* tdata) { - plistnode_t *pile = tdata->pilehead; - while(pile != NULL) { - if(pile->mid != myIpAddr) { //Not local machine - int retval; - char oidType; - oidType = 'R'; - if((retval = copyToCache(pile->numread, (unsigned int *)(pile->objread), tdata, oidType)) != 0) { - printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; - } - oidType = 'M'; - if((retval = copyToCache(pile->nummod, pile->oidmod, tdata, oidType)) != 0) { - printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; - } - } - pile = pile->next; + int retval; + char oidType; + oidType = 'R'; + if((retval = copyToCache(tdata->buffer->f.numread, (unsigned int *)(tdata->buffer->objread), tdata, oidType)) != 0) { + printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + oidType = 'M'; + if((retval = copyToCache(tdata->buffer->f.nummod, tdata->buffer->oidmod, tdata, oidType)) != 0) { + printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; } return 0; } @@ -133,6 +127,7 @@ int copyToCache(int numoid, unsigned int *oidarray, thread_data_array_t *tdata, //Increment version for every modified object if(oidType == 'M') { newAddr->version += 1; + newAddr->notifylist = NULL; } //make an entry in prefetch lookup hashtable void *oldptr; diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c index d5b88621..aa8eeb82 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c @@ -11,6 +11,7 @@ * Global Variables * ***********************/ int udpSockFd; +extern unsigned int myIpAddr; int createUdpSocket() { int sockfd; @@ -116,7 +117,7 @@ int invalidateObj(thread_data_array_t *tdata) { clientaddr.sin_family = AF_INET; clientaddr.sin_port = htons(UDP_PORT); clientaddr.sin_addr.s_addr = INADDR_BROADCAST; - int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int); + int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int); if(tdata->buffer->f.nummod < maxObjsPerMsg) { /* send single udp msg */ int iteration = 0; @@ -144,10 +145,12 @@ int invalidateObj(thread_data_array_t *tdata) { * returns -1 on error and 0 on success */ int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) { char writeBuffer[MAX_SIZE]; - int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int); + int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int); int offset = 0; *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg offset += sizeof(short); + *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation + offset += sizeof(unsigned int); if(iteration == 0) { // iteration flag == zero, send single udp msg *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg offset += sizeof(short); @@ -184,18 +187,24 @@ int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int i * returns -1 on error and 0 on success */ int invalidateFromPrefetchCache(char *buffer) { int offset = sizeof(short); - /* Read objects sent */ - int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int); - int i; - for(i = 0; i < numObjsRecv; i++) { - unsigned int oid; - oid = *((unsigned int *)(buffer+offset)); - objheader_t *header; - /* Lookup Objects in prefetch cache and remove them */ - if((header = prehashSearch(oid)) != NULL) { - prehashRemove(oid); + /* Read mid from msg */ + unsigned int mid = *((unsigned int *)(buffer+offset)); + offset += sizeof(unsigned int); + //Invalidate only if broadcast if from different machine + if(mid != myIpAddr) { + /* Read objects sent */ + int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int); + int i; + for(i = 0; i < numObjsRecv; i++) { + unsigned int oid; + oid = *((unsigned int *)(buffer+offset)); + objheader_t *header; + /* Lookup Objects in prefetch cache and remove them */ + if((header = prehashSearch(oid)) != NULL) { + prehashRemove(oid); + } + offset += sizeof(unsigned int); } - offset += sizeof(unsigned int); } return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 2fbf05f6..df979d74 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -214,7 +214,6 @@ typedef struct thread_data_array { char *replyctrl; /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */ char *replyretry; /* Shared variable that keep track if coordinator needs retry */ transrecord_t *rec; /* To send modified objects */ - plistnode_t *pilehead; /* Shared variable, ptr to the head of the machine piles for the transaction rec */ } thread_data_array_t; diff --git a/Robust/src/Runtime/DSTM/interface/gCollect.c b/Robust/src/Runtime/DSTM/interface/gCollect.c index fed70e2d..91fddb56 100644 --- a/Robust/src/Runtime/DSTM/interface/gCollect.c +++ b/Robust/src/Runtime/DSTM/interface/gCollect.c @@ -7,7 +7,7 @@ extern prehashtable_t pflookup; //Global prefetch cache lookup table prefetchNodeInfo_t *pNodeInfo; //Global prefetch holding metadata void initializePCache() { - pNodeInfo = calloc(1, sizeof(prefetchNodeInfo_t)); + pNodeInfo = calloc(1, sizeof(prefetchNodeInfo_t)); //Not freed yet pNodeInfo->oldptr = prefetchcache; pNodeInfo->newptr = NULL; pNodeInfo->num_old_objstr = 1; //for prefetch cache allocated by objstralloc in trans.c file diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index b5527c2b..242e6bce 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -192,10 +192,10 @@ int dstmStartup(const char * option) { fd=startlistening(); udpfd = udpInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd); if (master) { - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd); return 1; } else { @@ -537,7 +537,6 @@ int transCommit(transrecord_t *record) { thread_data_array[threadnum].replyctrl = &treplyctrl; thread_data_array[threadnum].replyretry = &treplyretry; thread_data_array[threadnum].rec = record; - thread_data_array[threadnum].pilehead = pile_ptr; /* If local do not create any extra connection */ if(pile->mid != myIpAddr) { /* Not local */ do { @@ -761,8 +760,15 @@ void *transRequest(void *threadarg) { } pthread_mutex_unlock(tdata->lock); - /* Invalidate objects in other machine cache */ if(*(tdata->replyctrl) == TRANS_COMMIT) { + int retval; + /* Update prefetch cache */ + if((retval = updatePrefetchCache(tdata)) != 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + return; + } + + /* Invalidate objects in other machine cache */ if(tdata->buffer->f.nummod > 0) { if((retval = invalidateObj(tdata)) != 0) { printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); @@ -771,7 +777,6 @@ void *transRequest(void *threadarg) { } } - /* Send the final response such as TRANS_COMMIT or TRANS_ABORT * to all participants in their respective socket */ if (sendResponse(tdata, sd) == 0) { @@ -788,7 +793,6 @@ void *transRequest(void *threadarg) { } else { //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); } - pthread_exit(NULL); } @@ -830,12 +834,6 @@ void decideResponse(thread_data_array_t *tdata) { /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; *(tdata->replyretry) = 0; - int retval; - /* Update prefetch cache */ - if((retval = updatePrefetchCache(tdata)) != 0) { - printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); - return; - } } else { /* Send Abort in soft abort case followed by retry commiting transaction again*/ *(tdata->replyctrl) = TRANS_ABORT; @@ -926,7 +924,7 @@ void *handleLocalReq(void *threadarg) { unsigned short version; void *mobj; objheader_t *headptr; - + localtdata = (local_thread_data_array_t *) threadarg; /* Counters and arrays to formulate decision on control message to be sent */ @@ -1035,11 +1033,13 @@ void *handleLocalReq(void *threadarg) { if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ if(transAbortProcess(localtdata) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); + fflush(stdout); pthread_exit(NULL); } } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) { if(transComProcess(localtdata) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); + fflush(stdout); pthread_exit(NULL); } } @@ -1050,7 +1050,6 @@ void *handleLocalReq(void *threadarg) { if (localtdata->transinfo->objnotfound != NULL) { free(localtdata->transinfo->objnotfound); } - pthread_exit(NULL); } @@ -1363,7 +1362,7 @@ int getPrefetchResponse(int sd) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); /* TODO: For each object not found query DHT for new location and retrieve the object */ /* Throw an error */ - printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); + //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); // exit(-1); } else { printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); @@ -1705,8 +1704,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ printf("notifyAll():error %d connecting to %s:%d\n", errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - status = -1; fflush(stdout); + status = -1; } else { bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); msg[0] = THREAD_NOTIFY_RESPONSE; -- 2.34.1