From: adash Date: Thu, 18 Mar 2010 01:22:52 +0000 (+0000) Subject: added the new looup table for prefetch cache X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=79904f3c1219936bd52c6a937cbc65fd866a68e3;p=IRC.git added the new looup table for prefetch cache updated gCollect as per the new prefetch cache send trans disagree quickly and unlock the objects quickly some bugs fixes that resulted from the dsm caching --- diff --git a/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.c b/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.c index f0436e0a..ec000e3e 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.c @@ -1,5 +1,5 @@ #include "addPrefetchEnhance.h" -#include "prelookup.h" +#include "altprelookup.h" extern int numprefetchsites; // Number of prefetch sites extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site @@ -44,10 +44,11 @@ char getOperationMode(int siteid) { * we take action accordingly */ void handleDynPrefetching(int numLocal, int ntuples, int siteid) { if(numLocal < ntuples) { - /* prefetch not found locally(miss in cache) */ + /* prefetch not found locally(miss in cache); turn on prefetching*/ evalPrefetch[siteid].operMode = 1; evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL; } else { + //Turn off prefetch site if(getOperationMode(siteid) != 0) { evalPrefetch[siteid].uselesscount--; if(evalPrefetch[siteid].uselesscount <= 0) { @@ -107,15 +108,15 @@ void cleanPCache() { int updatePrefetchCache(trans_req_data_t *tdata) { int retval; char oidType; - /* TODO commit it for now because objects read - * are already copied to cache during remote reading */ - //oidType = 'R'; - //if(tdata->f.numread > 0) { - // if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) { - // printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); - // return -1; - // } - //} + /*//TODO comment it for now because remote objects read are already in the prefetch cache + oidType = 'R'; + if(tdata->f.numread > 0) { + if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) { + printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + } + */ if(tdata->f.nummod > 0) { oidType = 'M'; if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, oidType)) != 0) { @@ -130,13 +131,13 @@ int copyToCache(int numoid, unsigned int *oidarray, char oidType) { int i; for (i = 0; i < numoid; i++) { unsigned int oid; - if(oidType == 'R') { - char * objread = (char *) oidarray; - oid = *((unsigned int *)(objread+(sizeof(unsigned int)+ - sizeof(unsigned short))*i)); - } else { + //if(oidType == 'R') { + // char * objread = (char *) oidarray; + // oid = *((unsigned int *)(objread+(sizeof(unsigned int)+ + // sizeof(unsigned short))*i)); + //} else { oid = oidarray[i]; - } + //} pthread_mutex_lock(&prefetchcache_mutex); objheader_t * header; if((header = (objheader_t *) t_chashSearch(oid)) == NULL) { @@ -161,14 +162,10 @@ int copyToCache(int numoid, unsigned int *oidarray, char oidType) { newAddr->version += 1; newAddr->notifylist = NULL; } + STATUS(newAddr)=0; + //make an entry in prefetch lookup hashtable - void *oldptr; - if((oldptr = prehashSearch(oid)) != NULL) { - prehashRemove(oid); - prehashInsert(oid, newAddr); - } else { - prehashInsert(oid, newAddr); - } + prehashInsert(oid, newAddr); } //end of for return 0; } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.h b/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.h index 44c87049..7eb3c519 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.h @@ -2,7 +2,7 @@ #define _ADDPREFETCHENHANCE_H_ #include "dstm.h" -#include "mlookup.h" +#include "altmlookup.h" #include "gCollect.h" typedef struct prefetchCountStats { diff --git a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c index c7e3f9f4..ae49de7f 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c @@ -5,7 +5,7 @@ #include #include #include "addUdpEnhance.h" -#include "prelookup.h" +#include "altprelookup.h" #ifdef ABORTREADERS #include "abortreaders.h" #endif @@ -135,26 +135,6 @@ int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, in return 0; } -#if 0 -int invalidateObj(trans_req_data_t *tdata) { - struct sockaddr_in clientaddr; - int retval; - - bzero(&clientaddr, sizeof(clientaddr)); - clientaddr.sin_family = AF_INET; - clientaddr.sin_port = htons(UDP_PORT); - clientaddr.sin_addr.s_addr = INADDR_BROADCAST; - int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int); - /* send single udp msg */ - if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) { - printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; - } - return 0; -} - -#endif - /* Function sends a udp broadcast, also distinguishes * msg size to be sent based on the total number of objects modified * returns -1 on error and 0 on success */ @@ -198,52 +178,6 @@ send: return 0; } -#if 0 - -/* Function sends a udp broadcast, also distinguishes - * msg size to be sent based on the iteration flag - * returns -1 on error and 0 on success */ -int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iteration) { - char writeBuffer[MAX_SIZE]; - int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int); - int offset = 0; - *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg - offset += sizeof(short); - *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation - offset += sizeof(unsigned int); - if(iteration == 0) { // iteration flag == zero, send single udp msg - *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->f.nummod)); //sizeof msg - offset += sizeof(short); - int i; - for(i = 0; i < tdata->f.nummod; i++) { - *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i]; //copy objects - offset += sizeof(unsigned int); - } - } else { // iteration flag > zero, send multiple udp msg - int numObj; - if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0) - numObj = maxObjsPerMsg; - else - numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg); - *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj); - offset += sizeof(short); - int index = (iteration - 1) * maxObjsPerMsg; - int i; - for(i = 0; i < numObj; i++) { - *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i]; - offset += sizeof(unsigned int); - } - } - int n; - if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) { - perror("sendto error- "); - printf("DEBUG-> sendto error: errorno %d\n", errno); - return -1; - } - return 0; -} -#endif - /* Function searches given oid in prefetch cache and invalidates obj from cache * returns -1 on error and 0 on success */ int invalidateFromPrefetchCache(char *buffer) { diff --git a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h index 295f8af4..38cca125 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h @@ -2,7 +2,7 @@ #define _ADDUDPENHANCE_H #include "dstm.h" -#include "mlookup.h" +#include "altmlookup.h" /******************************* @@ -21,9 +21,7 @@ int createUdpSocket(); int udpInit(); void *udpListenBroadcast(void *); -//int invalidateObj(trans_req_data_t *); int invalidateObj(trans_req_data_t *, int, char, int*); int invalidateFromPrefetchCache(char *); -//int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int); int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*); #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.c b/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.c new file mode 100755 index 00000000..b44e8e99 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.c @@ -0,0 +1,304 @@ +#include "altprelookup.h" +#include "dsmlock.h" +#include "gCollect.h" +extern objstr_t *prefetchcache; +extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache +extern prefetchNodeInfo_t pNodeInfo; + +prehashtable_t pflookup; //Global prefetch cache table + +unsigned int prehashCreate(unsigned int size, float loadfactor) { + prehashlistnode_t *nodes; + int i; + + // Allocate space for the hash table + if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + pflookup.table = nodes; + pflookup.size = size; + pflookup.mask = size -1; + pflookup.numelements = 0; // Initial number of elements in the hash + pflookup.loadfactor = loadfactor; + pflookup.threshold=loadfactor*size; + + //Initilize + for(i=0;i> 1; +} + +//Store oids and their pointers into hash +void prehashInsert(unsigned int key, void *val) { + + int isFound=0; + prehashlistnode_t *ptr, *tmp, *node; + + if(pflookup.numelements > (pflookup.threshold)) { + //Resize + unsigned int newsize = pflookup.size << 1; + prehashResize(newsize); + } + + unsigned int keyindex=key>>1; + volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock; + while(!write_trylock(lockptr)) { + sched_yield(); + } + + ptr = &pflookup.table[keyindex&pflookup.mask]; + + if(ptr->key==0) { //Insert at the first bin of the table + ptr->key = key; + ptr->val = val; + atomic_inc(&pflookup.numelements); + } else { + tmp = ptr; + while(tmp != NULL) { + if(tmp->key == key) { + isFound=1; + tmp->val = val;//Replace value for an exsisting key + write_unlock(lockptr); + return; + } + tmp=tmp->next; + } + if(!isFound) { //Insert new key and value into the chain of linked list for the given bin + node = calloc(1, sizeof(prehashlistnode_t)); + node->key = key; + node->val = val ; + node->next = ptr->next; + ptr->next=node; + atomic_inc(&pflookup.numelements); + } + } + write_unlock(lockptr); + return; +} + +// Search for an address for a given oid +void *prehashSearch(unsigned int key) { + int index; + + unsigned int keyindex=key>>1; + volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock; + while(!read_trylock(lockptr)) { + sched_yield(); + } + prehashlistnode_t *node = &pflookup.table[keyindex&pflookup.mask]; + + do { + if(node->key == key) { + void * tmp=node->val; + read_unlock(lockptr); + return tmp; + } + node = node->next; + } while (node!=NULL); + read_unlock(lockptr); + return NULL; +} + +unsigned int prehashRemove(unsigned int key) { + unsigned int keyindex = key >> 1; + volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock; + prehashlistnode_t *node, *prev; + + while(!write_trylock(lockptr)) { + sched_yield(); + } + prehashlistnode_t *curr = &pflookup.table[keyindex&pflookup.mask]; + // If there are no elements + //delete from first bin of table + if (curr->next == NULL && curr->key == key) { + curr->key = 0; + //TODO free(val) ? + curr->val = NULL; + atomic_dec(&(pflookup.numelements)); + write_unlock(lockptr); + return 0; + } + //delete from first bin of table but elements follow in linked list + if (curr->next != NULL && curr->key == key) { + curr->key = curr->next->key; + curr->val = curr->next->val; + node = curr->next; + curr->next = node->next; + free(node); + atomic_dec(&(pflookup.numelements)); + write_unlock(lockptr); + return 0; + } + prev = curr; + curr = curr->next; + //delete from elements in the linked list + for(; curr != NULL; curr = curr->next) { + if (curr->key == key) { + prev->next = curr->next; + free(curr); + atomic_dec(&(pflookup.numelements)); + write_unlock(lockptr); + return 0; + } + prev = curr; + } + write_unlock(lockptr); + return 1; +} + +unsigned int prehashResize(unsigned int newsize) { + prehashlistnode_t *node, *ptr; // curr and next keep track of the current and the next chashlistnodes in a linked list + unsigned int oldsize; + int i,index; + unsigned int mask; + + for(i=0;ikey) == 0) { //Exit inner loop if there the first element for a given bin/index is NULL + break; //key = val =0 for element if not present within the hash table + } + next = curr->next; + //index = (key & mask)>>1; + index = (key >> 1) & mask; + tmp=&pflookup.table[index]; + // Insert into the new table + if(tmp->key==0) { + tmp->key=curr->key; + tmp->val=curr->val; + if (!isfirst) + free(curr); + } /* + NOTE: Add this case if you change this... + This case currently never happens because of the way things rehash.... +else if (isfirst) { + prehashlistnode_t * newnode = calloc(1, sizeof(prehashlistnode_t)); + newnode->key = curr->key; + newnode->val = curr->val; + newnode->next = tmp->next; + tmp->next=newnode; + } */ + else { + curr->next=tmp->next; + tmp->next=curr; + } + + isfirst = 0; + curr = next; + } while(curr!=NULL); + } + + free(ptr); //Free the memory of the old hash table + for(i=0;inext != NULL) { + isFirstBin = 0; + curr = prev->next; + prev->next = curr->next; + free(curr); + } + if(isFirstBin == 1) { + prev->key = 0; + prev->next = NULL; + } + } + { + int stale; + pthread_mutex_unlock(&pflookup.lock); + pthread_mutex_lock(&prefetchcache_mutex); + if (pNodeInfo.newstale==NULL) { + //transfer the list wholesale; + pNodeInfo.oldstale=pNodeInfo.oldptr; + pNodeInfo.newstale=pNodeInfo.newptr; + } else { + //merge the two lists + pNodeInfo.newstale->prev=pNodeInfo.oldptr; + pNodeInfo.newstale=pNodeInfo.newptr; + } + stale=STALL_THRESHOLD-pNodeInfo.stale_count; + + if (stale>0&&stale>pNodeInfo.stall) + pNodeInfo.stall=stale; + + pNodeInfo.stale_count+=pNodeInfo.os_count; + pNodeInfo.oldptr=getObjStr(DEFAULT_OBJ_STORE_SIZE); + pNodeInfo.newptr=pNodeInfo.oldptr; + pNodeInfo.os_count=1; + pthread_mutex_unlock(&prefetchcache_mutex); + } +#endif + */ +} + diff --git a/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.h b/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.h new file mode 100755 index 00000000..fc08e291 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.h @@ -0,0 +1,50 @@ +#ifndef _PRELOOKUP_H_ +#define _PRELOOKUP_H_ + +#include +#include +#include + +#define PLOADFACTOR 0.25 +#define PHASH_SIZE 1024 + +typedef struct prehashlistnode { + unsigned int key; + void *val; //this can be cast to another type or used to point to a larger structure + struct prehashlistnode *next; +} prehashlistnode_t; + + +struct prelockarray { + volatile unsigned int lock; + int buf[15]; +}; + +#define PRENUMLOCKS 16 +#define PRELOCKMASK (PRENUMLOCKS-1) + + +struct objstr; + +typedef struct prehashtable { + prehashlistnode_t *table; // points to beginning of hash table + unsigned int size; + unsigned int mask; + unsigned int numelements; + unsigned int threshold; + double loadfactor; + struct prelockarray larray[PRENUMLOCKS]; +} prehashtable_t; + +/* Prototypes for hash*/ +unsigned int prehashCreate(unsigned int size, float loadfactor); +unsigned int prehashFunction(unsigned int key); +void prehashInsert(unsigned int key, void *val); +void *prehashSearch(unsigned int key); //returns val, NULL if not found +unsigned int prehashRemove(unsigned int key); //returns -1 if not found +unsigned int prehashResize(unsigned int newsize); +void prehashClear(); +/* end hash */ + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 31212ea4..2ed4d70a 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -87,6 +87,7 @@ #define RETRYINTERVAL 75 //N (For MatrixMultiply, 2DFFT benchmarks) #define SHUTDOWNINTERVAL 1 //M #define NUM_TRY_TO_COMMIT 2 +#define MEM_ALLOC_THRESHOLD 20485760//20MB #include #include @@ -319,10 +320,12 @@ char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid); -void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *, +char getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short); -void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, +char getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short); +void procRestObjs(char *, char *, int , int, int, unsigned int *, unsigned int *, int *, int *, int *, int *); +void processVerNoMatch(unsigned int *, unsigned int *, int *, int *, int *, int *, unsigned int, unsigned short); /* end server portion */ /* Prototypes for transactions */ diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index cf5a0611..44b8b895 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -281,7 +281,6 @@ void *dstmAccept(void *acceptfd) { char control,ctrl, response; char *ptr; void *srcObj; - #ifdef RECOVERY void *dupeptr; unsigned int transIDreceived; @@ -368,7 +367,6 @@ void *dstmAccept(void *acceptfd) { transinfo.modptr = NULL; transinfo.numlocked = 0; transinfo.numnotfound = 0; - //if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); @@ -479,6 +477,7 @@ void *dstmAccept(void *acceptfd) { threadNotify(oid,version,threadid); free(buffer); break; + #ifdef RECOVERY case CLEAR_NOTIFY_LIST: #ifdef DEBUG @@ -923,7 +922,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { return 0; /* Read modified objects */ - //printf("fixed.sum_bytes= %d\n", fixed.sum_bytes); if(fixed.nummod != 0) { if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) { printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__); @@ -947,15 +945,12 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { return 1; } ptr = (char *) modptr; - //printf("fixed.nummod= %d\n", fixed.nummod); - //fflush(stdout); - for(i = 0 ; i < fixed.nummod; i++) { + for(i = 0 ; i < fixed.nummod; i++){ int tmpsize=0; headaddr = (objheader_t *) ptr; oid = OID(headaddr); oidmod[i] = oid; GETSIZE(tmpsize, headaddr); - //printf("i= %d, tmpsize= %d, oid= %u\n", i, tmpsize, oid); ptr += sizeof(objheader_t) + tmpsize; } #ifdef DEBUG @@ -1131,6 +1126,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne * Object store holds the modified objects involved in the transaction request */ ptr = (char *) modptr; + char retval; + /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < fixed->numread + fixed->nummod; i++) { if (i < fixed->numread) { //Objs only read and not modified @@ -1142,7 +1139,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne #ifdef DEBUG printf("%s -> oid : %u version : %d\n",__func__,oid,version); #endif - getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch, + retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version); } else { //Objs modified if(i == fixed->numread) { @@ -1155,13 +1152,45 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne GETSIZE(tmpsize, headptr); ptr += sizeof(objheader_t) + tmpsize; - getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, + retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version); } + if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) { + //unlock objects as soon versions mismatch or else + //locks cannot be acquired elsewhere + if (objlocked > 0) { + int useWriteUnlock = 0; + for(j = 0; j < objlocked; j++) { + if(oidlocked[j] == -1) { + useWriteUnlock = 1; + continue; + } + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + if(useWriteUnlock) { + write_unlock(STATUSPTR(headptr)); + } else { + read_unlock(STATUSPTR(headptr)); + } + } + if(v_nomatch > 0) + free(oidlocked); + } + objlocked=0; + break; + } + } + + //go through rest of the objects for version mismatches + if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) { + i++; + procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes); } - /* send TRANS_DISAGREE and objs*/ + /* send TRANS_DISAGREE and objs that caused the ABORTS*/ if(v_nomatch > 0) { #ifdef CACHE char *objs = calloc(1, numBytes); @@ -1175,6 +1204,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne offset += size; } #endif + /* if (objlocked > 0) { int useWriteUnlock = 0; for(j = 0; j < objlocked; j++) { @@ -1194,20 +1224,20 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } free(oidlocked); } + */ - #ifdef DEBUG printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__); #endif if(control < 0) printf("control = %d\n",control); + control=TRANS_DISAGREE; send_data(acceptfd, &control, sizeof(char)); #ifdef CACHE send_data(acceptfd, &numBytes, sizeof(int)); send_data(acceptfd, objs, numBytes); - transinfo->objvernotmatch = oidvernotmatch; transinfo->numvernotmatch = objvernotmatch; free(objs); @@ -1226,7 +1256,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } /* Update Commit info for objects that are modified */ -void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, +char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch, int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) { @@ -1251,6 +1281,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, /* Save the oids not found and number of oids not found for later use */ oidnotfound[*objnotfound] = oid; (*objnotfound)++; + *control = TRANS_DISAGREE; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock @@ -1260,6 +1291,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, #endif if (version == ((objheader_t *)mobj)->version) { /* match versions */ (*v_matchnolock)++; + *control = TRANS_AGREE; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[*objvernotmatch] = oid; @@ -1277,6 +1309,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, } else { //we are locked if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ (*v_matchlock)++; + *control = TRANS_SOFT_ABORT; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[*objvernotmatch] = oid; @@ -1293,10 +1326,11 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, #ifdef DEBUG printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__,oid, *v_matchnolock, *v_matchlock, *v_nomatch); #endif + return *control; } /* Update Commit info for objects that are read */ -void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch, +char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) { void *mobj; @@ -1322,6 +1356,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked /* Save the oids not found and number of oids not found for later use */ oidnotfound[*objnotfound] = oid; (*objnotfound)++; + *control = TRANS_DISAGREE; } else { /* If Obj found in machine (i.e. has not moved) */ #ifdef DEBUG printf("%s -> Obj found!!\n",__func__); @@ -1333,6 +1368,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks if (version == ((objheader_t *)mobj)->version) { /* match versions */ (*v_matchnolock)++; + *control = TRANS_AGREE; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[(*objvernotmatch)++] = oid; @@ -1350,6 +1386,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked } else { /* Some other transaction has aquired a write lock on this object */ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ (*v_matchlock)++; + *control = TRANS_SOFT_ABORT; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[*objvernotmatch] = oid; @@ -1365,6 +1402,80 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked #ifdef DEBUG printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__, oid, *v_matchnolock, *v_matchlock, *v_nomatch); #endif + return *control; +} + +void procRestObjs(char *objread, + char *objmod, + int index, + int numread, + int nummod, + unsigned int *oidnotfound, + unsigned int *oidvernotmatch, + int *objnotfound, + int *objvernotmatch, + int *v_nomatch, + int *numBytes) { + int i; + unsigned int oid; + unsigned short version; + + /* Process each oid in the machine pile/ group per thread */ + for (i = index; i < numread+nummod; i++) { + if (i < numread) { //Objs only read and not modified + int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array + incr *= i; + oid = *((unsigned int *)(objread + incr)); + incr += sizeof(unsigned int); + version = *((unsigned short *)(objread + incr)); + } else { //Objs modified + objheader_t *headptr; + headptr = (objheader_t *) objmod; + oid = OID(headptr); + version = headptr->version; + int tmpsize; + GETSIZE(tmpsize, headptr); + objmod += sizeof(objheader_t) + tmpsize; + } + processVerNoMatch(oidnotfound, + oidvernotmatch, + objnotfound, + objvernotmatch, + v_nomatch, + numBytes, + oid, + version); + } + return; +} + +void processVerNoMatch(unsigned int *oidnotfound, + unsigned int *oidvernotmatch, + int *objnotfound, + int *objvernotmatch, + int *v_nomatch, + int *numBytes, + unsigned int oid, + unsigned short version) { + void *mobj; + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[*objnotfound] = oid; + (*objnotfound)++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if (version != ((objheader_t *)mobj)->version) { /* match versions */ + (*v_nomatch)++; + oidvernotmatch[*objvernotmatch] = oid; + (*objvernotmatch)++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + *numBytes += size; + } + } } /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT diff --git a/Robust/src/Runtime/DSTM/interface_recovery/gCollect.c b/Robust/src/Runtime/DSTM/interface_recovery/gCollect.c index 6e9d339e..185a02f6 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/gCollect.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/gCollect.c @@ -1,5 +1,5 @@ #include "gCollect.h" -#include "prelookup.h" +#include "altprelookup.h" extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache @@ -62,7 +62,7 @@ void *prefetchobjstrAlloc(unsigned int size) { pNodeInfo.newptr->prev=tmp; pNodeInfo.newptr=tmp; pNodeInfo.os_count++; - + if (pNodeInfo.os_count>PREFETCH_FLUSH_THRESHOLD) { //remove oldest from linked list objstr_t *tofree=pNodeInfo.oldptr; @@ -96,14 +96,21 @@ void *prefetchobjstrAlloc(unsigned int size) { } void clearBlock(objstr_t *block) { + unsigned long int tmpbegin=(unsigned int)block; unsigned long int tmpend=(unsigned int)block->top; int i, j; prehashlistnode_t *ptr; - pthread_mutex_lock(&pflookup.lock); + int lockindex=0; ptr = pflookup.table; + volatile unsigned int * lockptr_current=&pflookup.larray[lockindex].lock; + while(!write_trylock(lockptr_current)) { + sched_yield(); + } + for(i = 0; inext; @@ -112,10 +119,10 @@ void clearBlock(objstr_t *block) { if ((val>=tmpbegin)&(valnext=next->next; free(next); - next=tmp; + next=curr; //loop condition is broken now...need to check before incrementing - if (next==NULL) - break; + // if (next==NULL) + // break; } } { @@ -133,8 +140,21 @@ void clearBlock(objstr_t *block) { } } } - } - pthread_mutex_unlock(&pflookup.lock); + + if(((i+1)&(pflookup.mask>>4))==0 && (i+1)head-readbuffer->tail; + if (numbytes>buflen) + numbytes=buflen; + if (numbytes>0) { + memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes); + readbuffer->tail+=numbytes; + buflen-=numbytes; + buf+=numbytes; + } + if (buflen==0) { + return; + } + if (buflen>=MAXBUF) { + recv_data(fd, buf, buflen); + return; + } + + int maxbuf=MAXBUF; + int obufflen=buflen; + readbuffer->head=0; + + while (buflen > 0) { + int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0); + if (numbytes == -1) { + perror("recv"); + exit(0); + } + bytesRecv+=numbytes; + buflen-=numbytes; + readbuffer->head+=numbytes; + maxbuf-=numbytes; + } + memcpy(buf,readbuffer->buf,obufflen); + readbuffer->tail=obufflen; +} + int recv_data_errorcode(int fd, void *buf, int buflen) { #ifdef DEBUG printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen); @@ -896,26 +937,12 @@ remoteread: #endif objcopy = getRemoteObj(machinenumber, oid); - -#ifdef RECOVERY - if(transRetryFlag) { - restoreDuplicationState(machinenumber); -#ifdef DEBUG - printf("%s -> Recall transRead2\n",__func__); -#endif - return transRead2(oid); - } -#endif - - if(objcopy == NULL) { - printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); - return NULL; - } else { #ifdef TRANSSTATS LOGEVENT('R'); nRemoteSend++; #endif -#ifdef COMPILER + + if(objcopy!=NULL) { #ifdef CACHE //Copy object to prefetch cache pthread_mutex_lock(&prefetchcache_mutex); @@ -932,8 +959,25 @@ remoteread: memcpy(headerObj, objcopy, size+sizeof(objheader_t)); //make an entry in prefetch lookup hashtable prehashInsert(oid, headerObj); + LOGEVENT('B'); #endif + } +#ifdef RECOVERY + if(transRetryFlag) { + restoreDuplicationState(machinenumber); +#ifdef DEBUG + printf("%s -> Recall transRead2\n",__func__); +#endif + return transRead2(oid); + } +#endif + + if(objcopy == NULL) { + printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); + return NULL; + } else { +#ifdef COMPILER return &objcopy[1]; #else return objcopy; @@ -951,11 +995,9 @@ objheader_t *transCreateObj(unsigned int size) { OID(tmp) = getNewOID(); tmp->notifylist = NULL; tmp->version = 1; - //tmp->rcount = 1; tmp->isBackup = 0; STATUS(tmp) = NEW; t_chashInsert(OID(tmp), tmp); - #ifdef COMPILER return &tmp[1]; //want space after object header #else @@ -981,7 +1023,6 @@ plistnode_t *createPiles() { chashlistnode_t * ptr = c_table; /* Represents number of bins in the chash table */ unsigned int size = c_size; - for(i = 0; i < size ; i++) { chashlistnode_t * curr = &ptr[i]; /* Inner loop to traverse the linked list of the cache lookupTable */ @@ -1008,14 +1049,28 @@ plistnode_t *createPiles() { mid = myIpAddr; } + //if(mid == myIpAddr) { pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + //} else { + // if(bit) + // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + // else + // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + //} if(numLiveHostsInSystem > 1) { - if(makedirty) { - STATUS(headeraddr) = DIRTY; - pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); - } - //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + if(makedirty) { + STATUS(headeraddr) = DIRTY; + //if(mid == myIpAddr) { + // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + //} else { + // if(bit) + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + // else + // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + // } + } + //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); } #else // Get machine location for object id (and whether local or not) @@ -1113,7 +1168,7 @@ int transCommit() { int treplyretryCount = 0; /* Initialize timeout for exponential delay */ exponential_backoff.tv_sec = 0; - exponential_backoff.tv_nsec = (long)(10000);//10 microsec + exponential_backoff.tv_nsec = (long)(12000);//12 microsec count_exponential_backoff = 0; do { treplyretry = 0; @@ -1208,7 +1263,6 @@ int transCommit() { } int offset = 0; int i; - //printf("tosend[sockindex].f.nummod = %d\n", tosend[sockindex].f.nummod); for(i = 0; i < tosend[sockindex].f.nummod ; i++) { int size; objheader_t *headeraddr; @@ -1220,15 +1274,12 @@ int transCommit() { return 1; } GETSIZE(size,headeraddr); - //printf("i= %d, tmpsize= %d, oid= %u\n", i, size, OID(headeraddr)); size+=sizeof(objheader_t); memcpy(modptr+offset, headeraddr, size); offset+=size; } - //printf("tosend[sockindex].f.sum_bytes= %d\n", tosend[sockindex].f.sum_bytes); - //fflush(stdout); send_data(sd, modptr, tosend[sockindex].f.sum_bytes); - //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); + //send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); #ifdef RECOVERY /* send transaction id, number of machine involved, machine ids */ @@ -1288,13 +1339,7 @@ int transCommit() { GETSIZE(size, header); size += sizeof(objheader_t); //make an entry in prefetch hash table - void *oldptr; - if((oldptr = prehashSearch(oidToPrefetch)) != NULL) { - prehashRemove(oidToPrefetch); - prehashInsert(oidToPrefetch, header); - } else { - prehashInsert(oidToPrefetch, header); - } + prehashInsert(oidToPrefetch, header); length = length - size; offset += size; } @@ -1415,7 +1460,7 @@ int transCommit() { pDelete(pile_ptr); /* wait a random amount of time before retrying to commit transaction*/ if(treplyretry) { - treplyretryCount++; + //treplyretryCount++; //if(treplyretryCount >= NUM_TRY_TO_COMMIT) // exponentialdelay(); //else @@ -1592,7 +1637,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) { return TRANS_ABORT; #ifdef CACHE /* clear objects from prefetch cache */ - cleanPCache(); + //cleanPCache(); #endif } else if(transagree == pilecount) { /* Send Commit */ @@ -1810,7 +1855,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ *getReplyCtrl = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } @@ -2360,19 +2404,20 @@ int getPrefetchResponse(int sd) { /* Do a version comparison if the oid exists */ if((oldptr = prehashSearch(oid)) != NULL) { /* If older version then update with new object ptr */ - if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { - prehashRemove(oid); + if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) { prehashInsert(oid, modptr); } } else { /* Else add the object ptr to hash table*/ prehashInsert(oid, modptr); } +#if 0 /* Lock the Prefetch Cache look up table*/ pthread_mutex_lock(&pflookup.lock); /* Broadcast signal on prefetch cache condition variable */ pthread_cond_broadcast(&pflookup.cond); /* Unlock the Prefetch Cache look up table*/ pthread_mutex_unlock(&pflookup.lock); +#endif } else if(control == OBJECT_NOT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); /* TODO: For each object not found query DHT for new location and retrieve the object */