From a57acbb4d587f41aea4d344632e77f27081df7de Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 18 May 2010 22:38:40 +0000 Subject: [PATCH] optimized picking of read-set during pile creation --- .../Runtime/DSTM/interface_recovery/trans.c | 152 ++++++++++++++---- 1 file changed, 118 insertions(+), 34 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index d5d433ef..731fab8b 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -865,7 +865,10 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { /* Look up in machine lookup table and copy into cache*/ GETSIZE(size, objheader); size += sizeof(objheader_t); - objcopy = (objheader_t *) objstrAlloc(&t_cache, size); + if((objcopy = (objheader_t *) objstrAlloc(&t_cache, size)) == NULL) { + printf("DEBUG: %s() mlookup objcopy= %x\n", __func__, objcopy); + exit(-1); + } memcpy(objcopy, objheader, size); /* Insert into cache's lookup table */ STATUS(objcopy)=0; @@ -895,7 +898,10 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { /* Look up in prefetch cache */ GETSIZE(size, tmp); size+=sizeof(objheader_t); - objcopy = (objheader_t *) objstrAlloc(&t_cache, size); + if((objcopy = (objheader_t *) objstrAlloc(&t_cache, size)) == NULL) { + printf("DEBUG: %s() prefetch cache objcopy= %x\n", __func__, objcopy); + exit(-1); + } memcpy(objcopy, tmp, size); /* Insert into cache's lookup table */ t_chashInsert(OID(tmp), objcopy); @@ -961,6 +967,7 @@ remoteread: } pthread_mutex_unlock(&prefetchcache_mutex); memcpy(headerObj, objcopy, size+sizeof(objheader_t)); + //printf("%s() DEBUG: type=%d\n",__func__, TYPE(headerObj)); //make an entry in prefetch lookup hashtable prehashInsert(oid, headerObj); LOGEVENT('B'); @@ -1018,6 +1025,13 @@ plistnode_t *createPiles() { chashlistnode_t * ptr = c_table; /* Represents number of bins in the chash table */ unsigned int size = c_size; +#ifdef RECOVERY + int phostindex[numHostsInSystem]; + int k; + for(k=0; kval; #ifdef RECOVERY + oid = OID(headeraddr); int makedirty = 0; @@ -1044,28 +1059,70 @@ plistnode_t *createPiles() { mid = myIpAddr; } - //if(mid == myIpAddr) { - pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); - //} else { - // if(bit) - // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); - // else - // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); - //} + int selectMid=0; + if(mid == myIpAddr) { + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + if(!checkIndex(findHost(myIpAddr), phostindex)) { + phostindex[hostIndex++] = findHost(myIpAddr); + } + } else { + int pindex = findHost(mid);//primary copy's index + int bindex = (findHost(mid) + 1) % numHostsInSystem;//backup copy's index + if(checkIndex(pindex, phostindex)) { + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + selectMid = 1; + } else if(checkIndex(bindex, phostindex)) { + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + } else { + //check if any indexes present in the phostindex arry is odd or even + int chktype; + if((chktype = typeIndex(phostindex)) != -1) { + if(chktype == 1) { //odd indexed machines + //pick up either backup or primary copy based on the type of previous indexes + if((pindex%2) == 0) { + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + phostindex[hostIndex++] = bindex; + } else { + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + selectMid = 1; + phostindex[hostIndex++] = pindex; + } + } else { //even indexed machines + if((pindex%2) == 0) { + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + selectMid = 1; + phostindex[hostIndex++] = pindex; + } else { + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + phostindex[hostIndex++] = bindex; + } + } + } else { + if(((myIpAddr%2 == 0) && ((mid%2) == 0)) || ((myIpAddr%2 == 0) && ((mid%2) == 0))) { + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + selectMid = 1; + phostindex[hostIndex++] = pindex; + } else { + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + phostindex[hostIndex++] = bindex; + } + } + } + } if(numLiveHostsInSystem > 1) { if(makedirty) { STATUS(headeraddr) = DIRTY; - //if(mid == myIpAddr) { - // pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); - //} else { - // if(bit) - pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); - // else - // pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); - // } + if(mid == myIpAddr) { + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + } else { + if(selectMid) { + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + } else { + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + } + } } - //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); } #else // Get machine location for object id (and whether local or not) @@ -1120,6 +1177,34 @@ plistnode_t *createPiles() { } #endif +/** + * This function return 0 if indexes present are even + * returns 1 if indexes present are odd + * return -1 if indexes present are -1 +**/ +int typeIndex(int *phostindex) { + if(phostindex[0] == -1) + return -1; + if((phostindex[0]%2) == 0) + return 0; + else + return 1; +} + +/** + * This function returns 1 is pindex is found + * in the phostindex array else + * returns 0 + **/ +int checkIndex(int pindex, int *phostindex) { + int i; + for(i=0; i New pile:[%s],", __func__, midtoIPString(pile->mid)); @@ -1229,7 +1315,7 @@ int transCommit() { tosend[sockindex].oidcreated = pile->oidcreated; - int sd = 0; + int sd = 0; if(pile->mid != myIpAddr) { if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) { printf("\ntransRequest(): socket create error\n"); @@ -1285,7 +1371,7 @@ int transCommit() { offset+=size; } send_data(sd, modptr, tosend[sockindex].f.sum_bytes); - //send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); + //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); free(modptr); } else { //handle request locally @@ -1294,7 +1380,7 @@ int transCommit() { sockindex++; pile = pile->next; } //end of pile processing - + /* Recv Ctrl msgs from all machines */ #ifdef DEBUG printf("%s-> Finished sending transaction read/mod objects\n",__func__); @@ -1356,8 +1442,7 @@ int transCommit() { #endif } } - - /* Decide the final response */ + /* Decide the final response */ if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); @@ -1404,7 +1489,7 @@ int transCommit() { free(listmid); return 1; } - + #ifdef ABORTREADERS removetransaction(tosend[i].oidmod,tosend[i].f.nummod); removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); @@ -1436,8 +1521,7 @@ int transCommit() { } #endif } - - + for(i = 0; i< pilecount; i++) { if(socklist[i] > 0) { freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]); @@ -1451,7 +1535,11 @@ int transCommit() { pDelete(pile_ptr); /* wait a random amount of time before retrying to commit transaction*/ if(treplyretry) { - randomdelay(); + //treplyretryCount++; + //if(treplyretryCount >= NUM_TRY_TO_COMMIT) + // exponentialdelay(); + //else + randomdelay(); #ifdef TRANSSTATS nSoftAbort++; #endif @@ -1459,7 +1547,6 @@ int transCommit() { } while (treplyretry && deadmid != -1); #ifdef RECOVERY - //=========== after transaction point tlist_node_t* tNode = tlistSearch(transList,transID); inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER); @@ -2166,7 +2253,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne //Keep track of what is locked oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj)); - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } else { //A lock is acquired some place else @@ -2205,7 +2291,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign *getReplyCtrl = TRANS_DISAGREE; //Keep track of what is locked oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj)); - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } else { //Has reached max number of readers or some other transaction @@ -2216,7 +2301,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ *getReplyCtrl = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } @@ -2293,9 +2377,9 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { header->version += 1; if(header->notifylist != NULL) { #ifdef RECOVERY - printf("%s -> to notifyAll\n",__func__); + //printf("%s -> to notifyAll\n",__func__); if(header->isBackup == 0) { // if it is primary obj, notify - printf("%s -> Called notifyAll\n",__func__); + //printf("%s -> Called notifyAll\n",__func__); notifyAll(&header->notifylist, OID(header), header->version); } else // if not, just clear the notification list -- 2.34.1