From: bdemsky Date: Fri, 27 Feb 2009 04:34:51 +0000 (+0000) Subject: Thread local hack to make transRead faster... X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=97ffd10a70e49bd94cdf8fe41b07ba3d2778e00f;p=IRC.git Thread local hack to make transRead faster... 1) make transrecords threadlocal so we don't pass them around anymore 2) inline the clookup into the code...call transread only if we miss on the transaction cache This doesn't help much for the current benchmark set, but should make a significant difference if benchmarks read fields a lot. --- diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index 0d224a06..0e69ab91 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -1258,13 +1258,6 @@ public class BuildCode { printcomma=true; } - if (state.DSM&&lb.isAtomic()&&!md.getModifiers().isNative()) { - if (printcomma) - headersout.print(", "); - headersout.print("transrecord_t * trans"); - printcomma=true; - } - /* Output parameter list*/ for(int i=0; i0) { - output.println("if (needtocollect) checkcollect2(&"+localsprefix+",trans);"); + output.println("if (needtocollect) checkcollect2(&"+localsprefix+");"); } else output.println("if (needtocollect) checkcollect(&"+localsprefix+");"); } else @@ -1794,7 +1784,7 @@ public class BuildCode { return; /* Have to generate flat globalconv */ if (fgcn.getMakePtr()) { - output.println(generateTemp(fm, fgcn.getSrc(),lb)+"=(void *)transRead(trans, (unsigned int) "+generateTemp(fm, fgcn.getSrc(),lb)+");"); + output.println("TRANSREAD("+generateTemp(fm, fgcn.getSrc(),lb)+", (unsigned int) "+generateTemp(fm, fgcn.getSrc(),lb)+");"); } else { /* Need to convert to OID */ if (fgcn.doConvert()) { @@ -1853,11 +1843,10 @@ public class BuildCode { /******* Tell the runtime to start the transaction *******/ output.println("transstart"+faen.getIdentifier()+":"); - output.println("trans=transStart();"); + output.println("transStart();"); if (state.ABORTREADERS) { - output.println("if (_setjmp(trans->aborttrans)) {"); - output.println(" free(trans);"); + output.println("if (_setjmp(aborttrans)) {"); output.println(" goto transretry"+faen.getIdentifier()+"; }"); } } @@ -1868,8 +1857,8 @@ public class BuildCode { return; //store the revert list before we lose the transaction object String revertptr=generateTemp(fm, reverttable.get(lb),lb); - output.println(revertptr+"=trans->revertlist;"); - output.println("if (transCommit(trans)) {"); + output.println(revertptr+"=revertlist;"); + output.println("if (transCommit()) {"); /* Transaction aborts if it returns true */ output.println("goto transretry"+faen.getAtomicEnter().getIdentifier()+";"); output.println("} else {"); @@ -1999,13 +1988,6 @@ public class BuildCode { output.print(temp.getType().getSafeSymbol()); } - if (state.DSM&&locality.getBinding(lb,fc).isAtomic()&&!fc.getMethod().getModifiers().isNative()) { - LocalityBinding fclb=locality.getBinding(lb, fc); - if (printcomma) - output.print(", "); - output.print("transrecord_t *"); - printcomma=true; - } if (state.DSM) { LocalityBinding fclb=locality.getBinding(lb, fc); @@ -2021,13 +2003,6 @@ public class BuildCode { needcomma=true; } - if (state.DSM&&locality.getBinding(lb,fc).isAtomic()&&!fc.getMethod().getModifiers().isNative()) { - if (needcomma) - output.print(","); - output.print("trans"); - needcomma=true; - } - if (!GENERATEPRECISEGC) { if (fc.getThis()!=null) { TypeDescriptor ptd=md.getThis().getType(); @@ -2098,7 +2073,7 @@ public class BuildCode { //} else { output.println(dst+"="+ src +"->"+field+ ";"); //output.println("if ("+dst+"&0x1) {"); - output.println(dst+"=(void *) transRead(trans, (unsigned int) "+dst+");"); + output.println("TRANSREAD("+dst+", (unsigned int) "+dst+");"); //output.println(src+"->"+field+"="+src+"->"+field+";"); //output.println("}"); //} @@ -2113,7 +2088,7 @@ public class BuildCode { String dst=generateTemp(fm, ffn.getDst(),lb); output.println(dst+"="+ src +"->"+field+ ";"); if (locality.getAtomic(lb).get(ffn).intValue()>0) - output.println(dst+"=(void *) transRead(trans, (unsigned int) "+dst+");"); + output.println("TRANSREAD("+dst+", (unsigned int) "+dst+");"); } else output.println(generateTemp(fm, ffn.getDst(),lb)+"="+ generateTemp(fm,ffn.getSrc(),lb)+"->"+ ffn.getField().getSafeSymbol()+";"); } else if (status==LocalityAnalysis.EITHER) { @@ -2158,13 +2133,13 @@ public class BuildCode { output.println("if(!"+dst+"->"+localcopystr+") {"); /* Link object into list */ String revertptr=generateTemp(fm, reverttable.get(lb),lb); - output.println(revertptr+"=trans->revertlist;"); + output.println(revertptr+"=revertlist;"); if (GENERATEPRECISEGC) output.println("COPY_OBJ((struct garbagelist *)&"+localsprefix+",(struct ___Object___ *)"+dst+");"); else output.println("COPY_OBJ("+dst+");"); output.println(dst+"->"+nextobjstr+"="+revertptr+";"); - output.println("trans->revertlist=(struct ___Object___ *)"+dst+";"); + output.println("revertlist=(struct ___Object___ *)"+dst+";"); output.println("}"); if (srcglobal) output.println(dst+"->"+ fsfn.getField().getSafeSymbol()+"=srcoid;"); @@ -2221,7 +2196,7 @@ public class BuildCode { if (elementtype.isPtr()) { output.println(dst +"=(("+ type+"*)(((char *) &("+ generateTemp(fm,fen.getSrc(),lb)+"->___length___))+sizeof(int)))["+generateTemp(fm, fen.getIndex(),lb)+"];"); - output.println(dst+"=(void *) transRead(trans, (unsigned int) "+dst+");"); + output.println("TRANSREAD("+dst+", "+dst+");"); } else { output.println(dst +"=(("+ type+"*)(((char *) &("+ generateTemp(fm,fen.getSrc(),lb)+"->___length___))+sizeof(int)))["+generateTemp(fm, fen.getIndex(),lb)+"];"); } @@ -2274,13 +2249,13 @@ public class BuildCode { output.println("if(!"+dst+"->"+localcopystr+") {"); /* Link object into list */ String revertptr=generateTemp(fm, reverttable.get(lb),lb); - output.println(revertptr+"=trans->revertlist;"); + output.println(revertptr+"=revertlist;"); if (GENERATEPRECISEGC) output.println("COPY_OBJ((struct garbagelist *)&"+localsprefix+",(struct ___Object___ *)"+dst+");"); else output.println("COPY_OBJ("+dst+");"); output.println(dst+"->"+nextobjstr+"="+revertptr+";"); - output.println("trans->revertlist=(struct ___Object___ *)"+dst+";"); + output.println("revertlist=(struct ___Object___ *)"+dst+";"); output.println("}"); } else throw new Error("Unknown array type"); if (srcglobal) { @@ -2313,12 +2288,12 @@ public class BuildCode { if (state.DSM && locality.getAtomic(lb).get(fn).intValue()>0&&!fn.isGlobal()) { //Stash pointer in case of GC String revertptr=generateTemp(fm, reverttable.get(lb),lb); - output.println(revertptr+"=trans->revertlist;"); + output.println(revertptr+"=revertlist;"); } if (fn.getType().isArray()) { int arrayid=state.getArrayNumber(fn.getType())+state.numClasses(); if (fn.isGlobal()) { - output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarrayglobal(trans, "+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+");"); + output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarrayglobal("+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+");"); } else if (GENERATEPRECISEGC) { output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarray(&"+localsprefix+", "+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+");"); } else { @@ -2326,7 +2301,7 @@ public class BuildCode { } } else { if (fn.isGlobal()) { - output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newglobal(trans, "+fn.getType().getClassDesc().getId()+");"); + output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newglobal("+fn.getType().getClassDesc().getId()+");"); } else if (GENERATEPRECISEGC) { output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_new(&"+localsprefix+", "+fn.getType().getClassDesc().getId()+");"); } else { @@ -2338,7 +2313,7 @@ public class BuildCode { String dst=generateTemp(fm,fn.getDst(),lb); output.println(dst+"->___localcopy___=(struct ___Object___*)1;"); output.println(dst+"->"+nextobjstr+"="+revertptr+";"); - output.println("trans->revertlist=(struct ___Object___ *)"+dst+";"); + output.println("revertlist=(struct ___Object___ *)"+dst+";"); } if (state.FASTCHECK) { String dst=generateTemp(fm,fn.getDst(),lb); @@ -2410,13 +2385,13 @@ public class BuildCode { if (state.DSM && locality.getAtomic(lb).get(fln).intValue()>0) { //Stash pointer in case of GC String revertptr=generateTemp(fm, reverttable.get(lb),lb); - output.println(revertptr+"=trans->revertlist;"); + output.println(revertptr+"=revertlist;"); } output.println(generateTemp(fm, fln.getDst(),lb)+"=NewString(&"+localsprefix+", \""+FlatLiteralNode.escapeString((String)fln.getValue())+"\","+((String)fln.getValue()).length()+");"); if (state.DSM && locality.getAtomic(lb).get(fln).intValue()>0) { //Stash pointer in case of GC String revertptr=generateTemp(fm, reverttable.get(lb),lb); - output.println("trans->revertlist="+revertptr+";"); + output.println("revertlist="+revertptr+";"); } } else { output.println(generateTemp(fm, fln.getDst(),lb)+"=NewString(\""+FlatLiteralNode.escapeString((String)fln.getValue())+"\","+((String)fln.getValue()).length()+");"); @@ -2493,13 +2468,6 @@ public class BuildCode { printcomma=true; } - if (state.DSM&&lb.isAtomic()) { - if (printcomma) - output.print(", "); - output.print("transrecord_t * trans"); - printcomma=true; - } - if (md!=null) { /* Method */ for(int i=0; inumreaders++; for(i=0;iarray[i]==NULL) { - rl->array[i]=trans; + rl->array[i]=&t_abort; pthread_mutex_unlock(&aborttablelock); return; } @@ -60,9 +60,9 @@ void removetransaction(unsigned int oidarray[], unsigned int numoids) { int count=rl->numreaders; int j; for(j=0;count;j++) { - struct transrecord *trans=rl->array[j]; - if (trans!=NULL) { - trans->abort=1;//It's okay to set our own abort flag...it is + int *t_abort=rl->array[j]; + if (t_abort!=NULL) { + *t_abort=1;//It's okay to set our own abort flag...it is //too late to abort us count--; } @@ -76,7 +76,7 @@ void removetransaction(unsigned int oidarray[], unsigned int numoids) { pthread_mutex_unlock(&aborttablelock); } -void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids, struct transrecord * trans) { +void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids) { int i,j; pthread_mutex_lock(&aborttablelock); for(i=0;iarray[j]==trans) { + if (rl->array[j]==&t_abort) { rl->array[j]=NULL; if ((--rl->numreaders)==0) { if (first==rl) { @@ -113,12 +113,11 @@ void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids, pthread_mutex_unlock(&aborttablelock); } -void removetransactionhash(chashtable_t *table, struct transrecord *trans) { - chashlistnode_t *ptr=table->table; - unsigned int size=table->size; +void removetransactionhash() { + chashlistnode_t *ptr=c_table; int i,j; pthread_mutex_lock(&aborttablelock); - for(i=0;ikey; @@ -128,7 +127,7 @@ void removetransactionhash(chashtable_t *table, struct transrecord *trans) { struct readerlist *first=rl; while(rl!=NULL) { for(j=0;jarray[j]==trans) { + if (rl->array[j]==&t_abort) { rl->array[j]=NULL; if ((--rl->numreaders)==0) { if (first==rl) { @@ -157,7 +156,7 @@ void removetransactionhash(chashtable_t *table, struct transrecord *trans) { } -void removethistransaction(unsigned int oidarray[], unsigned int numoids, struct transrecord * trans) { +void removethistransaction(unsigned int oidarray[], unsigned int numoids) { int i,j; pthread_mutex_lock(&aborttablelock); for(i=0;iarray[j]==trans) { + if (rl->array[j]==&t_abort) { rl->array[j]=NULL; if ((--rl->numreaders)==0) { if (first==rl) { diff --git a/Robust/src/Runtime/DSTM/interface/abortreaders.h b/Robust/src/Runtime/DSTM/interface/abortreaders.h index a3801cc2..90fe2279 100644 --- a/Robust/src/Runtime/DSTM/interface/abortreaders.h +++ b/Robust/src/Runtime/DSTM/interface/abortreaders.h @@ -5,15 +5,15 @@ #define READERSIZE 8 struct readerlist { - struct transrecord *array[READERSIZE]; + int *array[READERSIZE]; int numreaders; struct readerlist * next; }; void initreaderlist(); -void addtransaction(unsigned int oid, struct transrecord * trans); +void addtransaction(unsigned int oid); void removetransaction(unsigned int oidarray[], unsigned int numoids); -void removethistransaction(unsigned int oidarray[], unsigned int numoids, struct transrecord * trans); -void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids, struct transrecord * trans); -void removetransactionhash(chashtable_t *table, struct transrecord *trans); +void removethistransaction(unsigned int oidarray[], unsigned int numoids); +void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids); +void removetransactionhash(); #endif diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c index fedcb919..ca7c845a 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c @@ -60,10 +60,9 @@ void handleDynPrefetching(int numLocal, int ntuples, int siteid) { #if 1 /* This function clears from prefetch cache those * entries that caused a transaction abort */ -void cleanPCache(transrecord_t *record) { - transrecord_t *rec = record; - unsigned int size = rec->lookupTable->size; - chashlistnode_t *ptr = rec->lookupTable->table; +void cleanPCache() { + unsigned int size = c_size; + chashlistnode_t *ptr = c_table; int i; for(i = 0; i < size; i++) { chashlistnode_t *curr = &ptr[i]; //for each entry in the cache lookupTable @@ -83,10 +82,9 @@ void cleanPCache(transrecord_t *record) { #else /* This function clears from prefetch cache those * entries that caused a transaction abort */ -void cleanPCache(transrecord_t *record) { - transrecord_t *rec = record; - unsigned int size = rec->lookupTable->size; - struct chashentry *ptr = rec->lookupTable->table; +void cleanPCache() { + unsigned int size = c_size; + struct chashentry *ptr = c_table; int i; for(i = 0; i < size; i++) { struct chashentry *curr = &ptr[i]; //for each entry in the cache lookupTable @@ -106,19 +104,19 @@ void cleanPCache(transrecord_t *record) { * entries from the transaction cache when a * transaction commits * Return -1 on error else returns 0 */ -int updatePrefetchCache(trans_req_data_t *tdata, transrecord_t *rec) { +int updatePrefetchCache(trans_req_data_t *tdata) { int retval; char oidType; oidType = 'R'; if(tdata->f.numread > 0) { - if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), rec, oidType)) != 0) { + if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) { printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); return -1; } } if(tdata->f.nummod > 0) { oidType = 'M'; - if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, rec, oidType)) != 0) { + if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, oidType)) != 0) { printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); return -1; } @@ -126,7 +124,7 @@ int updatePrefetchCache(trans_req_data_t *tdata, transrecord_t *rec) { return 0; } -int copyToCache(int numoid, unsigned int *oidarray, transrecord_t *rec, char oidType) { +int copyToCache(int numoid, unsigned int *oidarray, char oidType) { int i; for (i = 0; i < numoid; i++) { unsigned int oid; @@ -139,7 +137,7 @@ int copyToCache(int numoid, unsigned int *oidarray, transrecord_t *rec, char oid } pthread_mutex_lock(&prefetchcache_mutex); objheader_t * header; - if((header = (objheader_t *) chashSearch(rec->lookupTable, oid)) == NULL) { + if((header = (objheader_t *) t_chashSearch(oid)) == NULL) { printf("%s() obj %x is no longer in transaction cache at %s , %d\n", __func__, oid,__FILE__, __LINE__); fflush(stdout); return -1; diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h index 93dd853e..44c87049 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h @@ -17,8 +17,8 @@ int getRetryCount(int siteid); int getUselessCount(int siteid); char getOperationMode(int); void handleDynPrefetching(int, int, int); -void cleanPCache(transrecord_t *); -int updatePrefetchCache(trans_req_data_t *, transrecord_t *); -int copyToCache(int, unsigned int *, transrecord_t *rec, char); +void cleanPCache(); +int updatePrefetchCache(trans_req_data_t *); +int copyToCache(int, unsigned int *, char); #endif diff --git a/Robust/src/Runtime/DSTM/interface/clookup.c b/Robust/src/Runtime/DSTM/interface/clookup.c index 05d9401e..28dda702 100644 --- a/Robust/src/Runtime/DSTM/interface/clookup.c +++ b/Robust/src/Runtime/DSTM/interface/clookup.c @@ -1,5 +1,27 @@ #include "clookup.h" -#define INLINE inline __attribute__((always_inline)) + +__thread chashlistnode_t *c_table; +__thread unsigned int c_size; +__thread unsigned int c_mask; +__thread unsigned int c_numelements; +__thread unsigned int c_threshold; +__thread double c_loadfactor; + +void t_chashCreate(unsigned int size, double loadfactor) { + chashtable_t *ctable; + chashlistnode_t *nodes; + int i; + + // Allocate space for the hash table + + + c_table = calloc(size, sizeof(chashlistnode_t)); + c_loadfactor = loadfactor; + c_size = size; + c_threshold=size*loadfactor; + c_mask = (size << 1)-1; + c_numelements = 0; // Initial number of elements in the hash +} chashtable_t *chashCreate(unsigned int size, double loadfactor) { chashtable_t *ctable; @@ -38,6 +60,7 @@ static INLINE unsigned int chashFunction(chashtable_t *table, unsigned int key) void chashInsert(chashtable_t *table, unsigned int key, void *val) { chashlistnode_t *ptr; + if(table->numelements > (table->threshold)) { //Resize unsigned int newsize = table->size << 1; @@ -74,6 +97,47 @@ INLINE void * chashSearch(chashtable_t *table, unsigned int key) { return NULL; } +//Store objects and their pointers into hash +void t_chashInsert(unsigned int key, void *val) { + chashlistnode_t *ptr; + + + if(c_numelements > (c_threshold)) { + //Resize + unsigned int newsize = c_size << 1; + t_chashResize(newsize); + } + + ptr = &c_table[(key&c_mask)>>1]; + c_numelements++; + + if(ptr->key==0) { + ptr->key=key; + ptr->val=val; + } else { // Insert in the beginning of linked list + chashlistnode_t * node = calloc(1, sizeof(chashlistnode_t)); + node->key = key; + node->val = val; + node->next = ptr->next; + ptr->next=node; + } +} + +// Search for an address for a given oid +INLINE void * t_chashSearch(unsigned int key) { + //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE + chashlistnode_t *node = &c_table[(key & c_mask)>>1]; + + do { + if(node->key == key) { + return node->val; + } + node = node->next; + } while(node != NULL); + + return NULL; +} + unsigned int chashRemove(chashtable_t *table, unsigned int key) { return chashRemove2(table, key)==NULL; @@ -179,6 +243,70 @@ unsigned int chashResize(chashtable_t *table, unsigned int newsize) { return 0; } +unsigned int t_chashResize(unsigned int newsize) { + chashlistnode_t *node, *ptr, *curr; // curr and next keep track of the current and the next chashlistnodes in a linked list + unsigned int oldsize; + int isfirst; // Keeps track of the first element in the chashlistnode_t for each bin in hashtable + unsigned int i,index; + unsigned int mask; + + ptr = c_table; + oldsize = c_size; + + if((node = calloc(newsize, sizeof(chashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + c_table = node; //Update the global hashtable upon resize() + c_size = newsize; + c_threshold = newsize * c_loadfactor; + mask=c_mask = (newsize << 1)-1; + + for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table + curr = &ptr[i]; + isfirst = 1; + do { //Inner loop to go through linked lists + unsigned int key; + chashlistnode_t *tmp,*next; + + if ((key=curr->key) == 0) { //Exit inner loop if there the first element is 0 + break; //key = val =0 for element if not present within the hash table + } + next = curr->next; + index = (key & mask) >>1; + tmp=&node[index]; + // Insert into the new table + if(tmp->key == 0) { + tmp->key = curr->key; + tmp->val = curr->val; + if (!isfirst) { + free(curr); + } + }/* + NOTE: Add this case if you change this... + This case currently never happens because of the way things rehash.... + else if (isfirst) { + chashlistnode_t *newnode= calloc(1, sizeof(chashlistnode_t)); + newnode->key = curr->key; + newnode->val = curr->val; + newnode->next = tmp->next; + tmp->next=newnode; + } */ + else { + curr->next=tmp->next; + tmp->next=curr; + } + + isfirst = 0; + curr = next; + } while(curr!=NULL); + } + + free(ptr); //Free the memory of the old hash table + return 0; +} + //Delete the entire hash table void chashDelete(chashtable_t *ctable) { int i; @@ -195,3 +323,19 @@ void chashDelete(chashtable_t *ctable) { free(ptr); free(ctable); } + +//Delete the entire hash table +void t_chashDelete() { + int i; + chashlistnode_t *ptr = c_table; + + for(i=0 ; inext; + free(curr); + curr=next; + } + } + free(ptr); +} diff --git a/Robust/src/Runtime/DSTM/interface/clookup.h b/Robust/src/Runtime/DSTM/interface/clookup.h index c3348d21..2382208e 100644 --- a/Robust/src/Runtime/DSTM/interface/clookup.h +++ b/Robust/src/Runtime/DSTM/interface/clookup.h @@ -7,6 +7,9 @@ #define CLOADFACTOR 0.25 #define CHASH_SIZE 1024 +#define INLINE inline __attribute__((always_inline)) + + typedef struct chashlistnode { unsigned int key; void *val; //this can be cast to another type or used to point to a larger structure @@ -22,6 +25,13 @@ typedef struct chashtable { double loadfactor; } chashtable_t; + +void t_chashCreate(unsigned int size, double loadfactor); +void t_chashInsert(unsigned int key, void *val); +void * t_chashSearch(unsigned int key); +unsigned int t_chashResize(unsigned int newsize); +void t_chashDelete(); + /* Prototypes for hash*/ chashtable_t *chashCreate(unsigned int size, double loadfactor); static unsigned int chashFunction(chashtable_t *table, unsigned int key); @@ -33,5 +43,11 @@ unsigned int chashResize(chashtable_t *table, unsigned int newsize); void chashDelete(chashtable_t *table); /* end hash */ -#endif +extern __thread chashlistnode_t *c_table; +extern __thread unsigned int c_size; +extern __thread unsigned int c_mask; +extern __thread unsigned int c_numelements; +extern __thread unsigned int c_threshold; +extern __thread double c_loadfactor; +#endif diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 28dc122f..a7439836 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -163,18 +163,6 @@ typedef struct oidmidpair { unsigned int mid; } oidmidpair_t; -typedef struct transrecord { - objstr_t *cache; - chashtable_t *lookupTable; -#ifdef COMPILER - struct ___Object___ * revertlist; -#endif -#ifdef ABORTREADERS - int abort; - jmp_buf aborttrans; -#endif -} transrecord_t; - // Structure is a shared structure that keeps track of responses from the participants typedef struct thread_response { char rcv_status; @@ -214,33 +202,6 @@ typedef struct trans_commit_data { #define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id); -/* Structure for passing multiple arguments to a thread - * spawned to process each transaction on a machine */ -typedef struct thread_data_array { - int thread_id; - int mid; - trans_req_data_t *buffer; /* Holds trans request information sent to a participant, based on threadid */ - thread_response_t *recvmsg; /* Shared datastructure to keep track of the participants response to a trans request */ - pthread_cond_t *threshold; /* Condition var to wake up a thread */ - pthread_mutex_t *lock; /* Lock for counting participants response */ - int *count; /* Shared variable to count responses from all participants to the TRANS_REQUEST protocol */ - char *replyctrl; /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */ - char *replyretry; /* Shared variable that keep track if coordinator needs retry */ - transrecord_t *rec; /* Shared variable transaction record send to all thread data */ -} thread_data_array_t; - - -//Structure for passing arguments to the local m/c thread -typedef struct local_thread_data_array { - thread_data_array_t *tdata; /* Holds all the arguments send to a thread that is spawned when transaction commits */ - trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */ -} local_thread_data_array_t; - -//Structure to store mid and socketid information -typedef struct midSocketInfo { - unsigned int mid; /* To communicate with mid use sockid in this data structure */ - int sockid; -} midSocketInfo_t; /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -286,18 +247,31 @@ void addHost(unsigned int); void mapObjMethod(unsigned short); void randomdelay(); -__attribute__((malloc)) transrecord_t *transStart(); -__attribute__((pure)) objheader_t *transRead(transrecord_t *, unsigned int); -objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header -int transCommit(transrecord_t *record); //return 0 if successful +void transStart(); +#define TRANSREAD(x,y) { \ + unsigned int inputvalue;\ +if ((inputvalue=(unsigned int)y)==0) x=NULL;\ +else { \ +chashlistnode_t * cnodetmp=&c_table[(inputvalue&c_mask)>>1]; \ +do { \ + if (cnodetmp->key==inputvalue) {x=(void *)&((objheader_t*)cnodetmp->val)[1];break;} \ +cnodetmp=cnodetmp->next;\ +if (cnodetmp==NULL) {x=(void *)transRead2(inputvalue);break;} \ +} while(1);\ +}} + +__attribute__((pure)) objheader_t *transRead(unsigned int); +__attribute__((pure)) objheader_t *transRead2(unsigned int); +objheader_t *transCreateObj(unsigned int); //returns oid header +int transCommit(); //return 0 if successful void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins -char decideResponse(char *, char *, transrecord_t *, int); // Coordinator decides what response to send to the participant -void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine -void handleLocalReq(trans_req_data_t *, trans_commit_data_t *, transrecord_t *, char *); -int transComProcess(trans_req_data_t *, trans_commit_data_t *, transrecord_t *); -void doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *, transrecord_t *); +char decideResponse(char *, char *, int); // Coordinator decides what response to send to the participant +void *getRemoteObj(unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine +void handleLocalReq(trans_req_data_t *, trans_commit_data_t *, char *); +int transComProcess(trans_req_data_t *, trans_commit_data_t *); +void doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *); int transAbortProcess(trans_commit_data_t *); -void transAbort(transrecord_t *trans); +void transAbort(); void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); void prefetch(int, int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); @@ -305,7 +279,7 @@ void *mcqProcess(void *); prefetchpile_t *foundLocal(char *); // returns node with prefetch elements(oids, offsets) int lookupObject(unsigned int * oid, short offset); int checkoid(unsigned int oid); -int transPrefetchProcess(transrecord_t *, int **, short); +int transPrefetchProcess(int **, short); void sendPrefetchReq(prefetchpile_t*, int); void sendPrefetchReqnew(prefetchpile_t*, int); int getPrefetchResponse(int); @@ -335,4 +309,6 @@ void swap(double *e1, double *e2); double avgofthreads(int siteid, int threadid); /* end transactions */ + +#include "trans.h" #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index e0a16ae7..42aef318 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -18,10 +18,20 @@ #ifdef ABORTREADERS #include "abortreaders.h" #endif +#include "trans.h" #define NUM_THREADS 1 #define CONFIG_FILENAME "dstm.conf" +/* Thread transaction variables */ + +__thread objstr_t *t_cache; +__thread struct ___Object___ *revertlist; +#ifdef ABORTREADERS +__thread int t_abort; +__thread jmp_buf aborttrans; +#endif + /* Global Variables */ extern int classsize[]; @@ -63,7 +73,7 @@ int bytesSent = 0; int bytesRecv = 0; void printhex(unsigned char *, int); -plistnode_t *createPiles(transrecord_t *); +plistnode_t *createPiles(); plistnode_t *sortPiles(plistnode_t *pileptr); /******************************* @@ -323,18 +333,13 @@ void randomdelay() { } /* This function initializes things required in the transaction start*/ -__attribute__((malloc)) transrecord_t *transStart() { - transrecord_t *tmp; - if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) { - printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); - return NULL; - } - tmp->cache = objstrCreate(1048576); - tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR); - //#ifdef COMPILER - // tmp->revertlist=NULL; //Not necessary...already null - //#endif - return tmp; +void transStart() { + t_cache = objstrCreate(1048576); + t_chashCreate(CHASH_SIZE, CLOADFACTOR); + revertlist=NULL; +#ifdef ABORTREADERS + t_abort=0; +#endif } // Search for an address for a given oid @@ -355,22 +360,24 @@ INLINE void * chashSearchI(chashtable_t *table, unsigned int key) { }*/ + + /* This function finds the location of the objects involved in a transaction * and returns the pointer to the object if found in a remote location */ -__attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) { +__attribute__((pure)) objheader_t *transRead(unsigned int oid) { unsigned int machinenumber; objheader_t *tmp, *objheader; objheader_t *objcopy; int size; void *buf; chashlistnode_t *node; - chashtable_t *table=record->lookupTable; if(oid == 0) { return NULL; } - node= &table->table[(oid & table->mask)>>1]; + + node= &c_table[(oid & c_mask)>>1]; do { if(node->key == oid) { #ifdef TRANSSTATS @@ -400,15 +407,96 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int */ #ifdef ABORTREADERS - if (record->abort) { + if (t_abort) { + //abort this transaction + //printf("ABORTING\n"); + removetransactionhash(); + objstrDelete(t_cache); + t_chashDelete(); + _longjmp(aborttrans,1); + } else + addtransaction(oid); +#endif + + if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { +#ifdef TRANSSTATS + nmhashSearch++; +#endif + /* Look up in machine lookup table and copy into cache*/ + GETSIZE(size, objheader); + size += sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); + memcpy(objcopy, objheader, size); + /* Insert into cache's lookup table */ + STATUS(objcopy)=0; + t_chashInsert(OID(objheader), objcopy); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } else { +#ifdef CACHE + if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { +#ifdef TRANSSTATS + nprehashSearch++; +#endif + /* Look up in prefetch cache */ + GETSIZE(size, tmp); + size+=sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); + memcpy(objcopy, tmp, size); + /* Insert into cache's lookup table */ + t_chashInsert(OID(tmp), objcopy); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } +#endif + /* Get the object from the remote location */ + if((machinenumber = lhashSearch(oid)) == 0) { + printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); + return NULL; + } + objcopy = getRemoteObj(machinenumber, oid); + + if(objcopy == NULL) { + printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); + return NULL; + } else { +#ifdef TRANSSTATS + nRemoteSend++; +#endif +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } + } +} + + +/* This function finds the location of the objects involved in a transaction + * and returns the pointer to the object if found in a remote location */ +__attribute__((pure)) objheader_t *transRead2(unsigned int oid) { + unsigned int machinenumber; + objheader_t *tmp, *objheader; + objheader_t *objcopy; + int size; + +#ifdef ABORTREADERS + if (t_abort) { //abort this transaction //printf("ABORTING\n"); - removetransactionhash(record->lookupTable, record); - objstrDelete(record->cache); - chashDelete(record->lookupTable); - _longjmp(record->aborttrans,1); + removetransactionhash(); + objstrDelete(t_cache); + t_chashDelete(); + _longjmp(aborttrans,1); } else - addtransaction(oid,record); + addtransaction(oid); #endif if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { @@ -418,11 +506,11 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int /* Look up in machine lookup table and copy into cache*/ GETSIZE(size, objheader); size += sizeof(objheader_t); - objcopy = (objheader_t *) objstrAlloc(&record->cache, size); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); memcpy(objcopy, objheader, size); /* Insert into cache's lookup table */ STATUS(objcopy)=0; - chashInsert(record->lookupTable, OID(objheader), objcopy); + t_chashInsert(OID(objheader), objcopy); #ifdef COMPILER return &objcopy[1]; #else @@ -437,10 +525,10 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int /* Look up in prefetch cache */ GETSIZE(size, tmp); size+=sizeof(objheader_t); - objcopy = (objheader_t *) objstrAlloc(&record->cache, size); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); memcpy(objcopy, tmp, size); /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, OID(tmp), objcopy); + t_chashInsert(OID(tmp), objcopy); #ifdef COMPILER return &objcopy[1]; #else @@ -453,7 +541,7 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); return NULL; } - objcopy = getRemoteObj(record, machinenumber, oid); + objcopy = getRemoteObj(machinenumber, oid); if(objcopy == NULL) { printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); @@ -472,13 +560,13 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int } /* This function creates objects in the transaction record */ -objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { - objheader_t *tmp = (objheader_t *) objstrAlloc(&record->cache, (sizeof(objheader_t) + size)); +objheader_t *transCreateObj(unsigned int size) { + objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size)); OID(tmp) = getNewOID(); tmp->version = 1; tmp->rcount = 1; STATUS(tmp) = NEW; - chashInsert(record->lookupTable, OID(tmp), tmp); + t_chashInsert(OID(tmp), tmp); #ifdef COMPILER return &tmp[1]; //want space after object header @@ -490,14 +578,14 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { #if 1 /* This function creates machine piles based on all machines involved in a * transaction commit request */ -plistnode_t *createPiles(transrecord_t *record) { +plistnode_t *createPiles() { int i; plistnode_t *pile = NULL; unsigned int machinenum; objheader_t *headeraddr; - chashlistnode_t * ptr = record->lookupTable->table; + chashlistnode_t * ptr = c_table; /* Represents number of bins in the chash table */ - unsigned int size = record->lookupTable->size; + unsigned int size = c_size; for(i = 0; i < size ; i++) { chashlistnode_t * curr = &ptr[i]; @@ -517,7 +605,7 @@ plistnode_t *createPiles(transrecord_t *record) { } //Make machine groups - pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements); + pile = pInsert(pile, headeraddr, machinenum, c_numelements); curr = curr->next; } } @@ -526,14 +614,14 @@ plistnode_t *createPiles(transrecord_t *record) { #else /* This function creates machine piles based on all machines involved in a * transaction commit request */ -plistnode_t *createPiles(transrecord_t *record) { +plistnode_t *createPiles() { int i; plistnode_t *pile = NULL; unsigned int machinenum; objheader_t *headeraddr; - struct chashentry * ptr = record->lookupTable->table; + struct chashentry * ptr = c_table; /* Represents number of bins in the chash table */ - unsigned int size = record->lookupTable->size; + unsigned int size = c_size; for(i = 0; i < size ; i++) { struct chashentry * curr = & ptr[i]; @@ -552,7 +640,7 @@ plistnode_t *createPiles(transrecord_t *record) { } //Make machine groups - pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements); + pile = pInsert(pile, headeraddr, machinenum, c_numelements); } return pile; } @@ -563,7 +651,7 @@ plistnode_t *createPiles(transrecord_t *record) { * and creates new piles by calling the createPiles(), * Sends a transrequest() to each remote machines for objects found remotely * and calls handleLocalReq() to process objects found locally */ -int transCommit(transrecord_t *record) { +int transCommit() { unsigned int tot_bytes_mod, *listmid; plistnode_t *pile, *pile_ptr; int trecvcount; @@ -573,13 +661,12 @@ int transCommit(transrecord_t *record) { char finalResponse; #ifdef ABORTREADERS - if (record->abort) { + if (t_abort) { //abort this transaction printf("ABORTING TRANSACTION AT COMMIT\n"); - removetransactionhash(record->lookupTable, record); - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); + removetransactionhash(); + objstrDelete(t_cache); + t_chashDelete(); return 1; } #endif @@ -592,7 +679,7 @@ int transCommit(transrecord_t *record) { /* Look through all the objects in the transaction record and make piles * for each machine involved in the transaction*/ if (firsttime) { - pile_ptr = pile = createPiles(record); + pile_ptr = pile = createPiles(); pile_ptr = pile = sortPiles(pile); } else { pile = pile_ptr; @@ -669,7 +756,7 @@ int transCommit(transrecord_t *record) { for(i = 0; i < tosend[sockindex].f.nummod ; i++) { int size; objheader_t *headeraddr; - if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) { + if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) { printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__); free(modptr); free(listmid); @@ -684,7 +771,7 @@ int transCommit(transrecord_t *record) { send_data(sd, modptr, tosend[sockindex].f.sum_bytes); free(modptr); } else { //handle request locally - handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]); + handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); } sockindex++; pile = pile->next; @@ -740,7 +827,7 @@ int transCommit(transrecord_t *record) { } } /* Decide the final response */ - if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) { + if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); free(listmid); @@ -755,7 +842,7 @@ int transCommit(transrecord_t *record) { if(finalResponse == TRANS_COMMIT) { int retval; /* Update prefetch cache */ - if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) { + if((retval = updatePrefetchCache(&(tosend[i]))) != 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); free(listmid); @@ -774,27 +861,27 @@ int transCommit(transrecord_t *record) { } #ifdef ABORTREADERS removetransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread, tosend[i].f.numread, record); + removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); #endif } #ifdef ABORTREADERS else if (!treplyretry) { - removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record); + removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); } #endif #endif send_data(sd, &finalResponse, sizeof(char)); } else { /* Complete local processing */ - doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record); + doLocalProcess(finalResponse, &(tosend[i]), &transinfo); #ifdef ABORTREADERS if(finalResponse == TRANS_COMMIT) { removetransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread, record); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); } else if (!treplyretry) { - removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record); + removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); } #endif } @@ -821,18 +908,16 @@ int transCommit(transrecord_t *record) { numTransAbort++; #endif /* Free Resources */ - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); + objstrDelete(t_cache); + t_chashDelete(); return TRANS_ABORT; } else if(finalResponse == TRANS_COMMIT) { #ifdef TRANSSTATS numTransCommit++; #endif /* Free Resources */ - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); + objstrDelete(t_cache); + t_chashDelete(); return 0; } else { //TODO Add other cases @@ -845,7 +930,7 @@ int transCommit(transrecord_t *record) { /* This function handles the local objects involved in a transaction * commiting process. It also makes a decision if this local machine * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */ -void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) { +void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) { unsigned int *oidnotfound = NULL, *oidlocked = NULL; int numoidnotfound = 0, numoidlocked = 0; int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; @@ -872,7 +957,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra } int tmpsize; objheader_t *headptr; - headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]); + headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]); if (headptr == NULL) { printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__); return; @@ -901,7 +986,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra } } -void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) { +void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) { if(finalResponse == TRANS_ABORT) { if(transAbortProcess(transinfo) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); @@ -919,7 +1004,7 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da } } #endif - if(transComProcess(tdata, transinfo, record) != 0) { + if(transComProcess(tdata, transinfo) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); fflush(stdout); return; @@ -939,7 +1024,7 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da /* This function decides the reponse that needs to be sent to * all Participant machines after the TRANS_REQUEST protocol */ -char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) { +char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) { int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ for (i = 0 ; i < pilecount; i++) { @@ -970,7 +1055,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record return TRANS_ABORT; #ifdef CACHE /* clear objects from prefetch cache */ - cleanPCache(record); + cleanPCache(); #endif } else if(transagree == pilecount) { /* Send Commit */ @@ -989,7 +1074,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record * available and copies the object and its header to the local * cache. */ -void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { +void *getRemoteObj(unsigned int mnum, unsigned int oid) { int size, val; struct sockaddr_in serv_addr; char machineip[16]; @@ -1011,11 +1096,11 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { } else { /* Read object if found into local cache */ recv_data(sd, &size, sizeof(int)); - objcopy = objstrAlloc(&record->cache, size); + objcopy = objstrAlloc(&t_cache, size); recv_data(sd, objcopy, size); STATUS(objcopy)=0; /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, oid, objcopy); + t_chashInsert(oid, objcopy); } return objcopy; @@ -1133,7 +1218,7 @@ int transAbortProcess(trans_commit_data_t *transinfo) { } /*This function completes the COMMIT process if the transaction is commiting*/ -int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) { +int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { objheader_t *header, *tcptr; int i, nummod, tmpsize, numcreated, numlocked; unsigned int *oidmod, *oidcreated, *oidlocked; @@ -1152,7 +1237,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra return 1; } /* Copy from transaction cache -> main object store */ - if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) { + if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) { printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); return 1; } @@ -1174,7 +1259,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra } /* If object is newly created inside transaction then commit it */ for (i = 0; i < numcreated; i++) { - if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) { + if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) { printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__); return 1; } @@ -1820,13 +1905,12 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { return status; } -void transAbort(transrecord_t *trans) { +void transAbort() { #ifdef ABORTREADERS - removetransactionhash(trans->lookupTable, trans); + removetransactionhash(); #endif - objstrDelete(trans->cache); - chashDelete(trans->lookupTable); - free(trans); + objstrDelete(t_cache); + t_chashDelete(); } /* This function inserts necessary information into diff --git a/Robust/src/Runtime/DSTM/interface/trans.h b/Robust/src/Runtime/DSTM/interface/trans.h new file mode 100644 index 00000000..2eee1d33 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/trans.h @@ -0,0 +1,11 @@ +#ifndef TRANS_H +#define TRANS_H + +extern __thread objstr_t *t_cache; +extern __thread struct ___Object___ *revertlist; +#ifdef ABORTREADERS +extern __thread int t_abort; +extern __thread jmp_buf aborttrans; +#endif + +#endif diff --git a/Robust/src/Runtime/garbage.c b/Robust/src/Runtime/garbage.c index 74a8bdd8..6fdbc27f 100644 --- a/Robust/src/Runtime/garbage.c +++ b/Robust/src/Runtime/garbage.c @@ -426,13 +426,13 @@ void checkcollect(void * ptr) { } #ifdef DSTM -void checkcollect2(void * ptr, transrecord_t *trans) { - int ptrarray[]={1, (int)ptr, (int) trans->revertlist}; +void checkcollect2(void * ptr) { + int ptrarray[]={1, (int)ptr, (int) revertlist}; struct listitem * tmp=stopforgc((struct garbagelist *)ptrarray); pthread_mutex_lock(&gclock); // Wait for GC restartaftergc(tmp); pthread_mutex_unlock(&gclock); - trans->revertlist=(struct ___Object___*)ptrarray[2]; + revertlist=(struct ___Object___*)ptrarray[2]; } #endif diff --git a/Robust/src/Runtime/runtime.c b/Robust/src/Runtime/runtime.c index 95f4d54c..665eaf2d 100644 --- a/Robust/src/Runtime/runtime.c +++ b/Robust/src/Runtime/runtime.c @@ -161,8 +161,8 @@ void CALL02(___System______rangePrefetch____L___Object_____AR_S, struct ___Objec /* Object allocation function */ #ifdef DSTM -__attribute__((malloc)) void * allocate_newglobal(transrecord_t *trans, int type) { - struct ___Object___ * v=(struct ___Object___ *) transCreateObj(trans, classsize[type]); +__attribute__((malloc)) void * allocate_newglobal(int type) { + struct ___Object___ * v=(struct ___Object___ *) transCreateObj(classsize[type]); v->type=type; #ifdef THREADS v->tid=0; @@ -174,8 +174,8 @@ __attribute__((malloc)) void * allocate_newglobal(transrecord_t *trans, int type /* Array allocation function */ -__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(transrecord_t *trans, int type, int length) { - struct ArrayObject * v=(struct ArrayObject *)transCreateObj(trans, sizeof(struct ArrayObject)+length*classsize[type]); +__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(int type, int length) { + struct ArrayObject * v=(struct ArrayObject *)transCreateObj(sizeof(struct ArrayObject)+length*classsize[type]); if (length<0) { printf("ERROR: negative array\n"); return NULL; diff --git a/Robust/src/Runtime/runtime.h b/Robust/src/Runtime/runtime.h index 20a1d5fb..fa1afc2d 100644 --- a/Robust/src/Runtime/runtime.h +++ b/Robust/src/Runtime/runtime.h @@ -27,8 +27,8 @@ extern void * curr_heaptop; #endif #ifdef DSTM -__attribute__((malloc)) void * allocate_newglobal(transrecord_t *, int type); -__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(transrecord_t *, int type, int length); +__attribute__((malloc)) void * allocate_newglobal(int type); +__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(int type, int length); #endif #ifdef PRECISE_GC diff --git a/Robust/src/Runtime/thread.c b/Robust/src/Runtime/thread.c index aa5fbadc..25362e6e 100644 --- a/Robust/src/Runtime/thread.c +++ b/Robust/src/Runtime/thread.c @@ -29,7 +29,6 @@ pthread_key_t oidval; void threadexit() { objheader_t* ptr; void *value; - transrecord_t * trans; unsigned int oidvalue; #ifdef THREADS @@ -57,12 +56,12 @@ void threadexit() { goto transstart; transstart: { - transrecord_t * trans = transStart(); - ptr = transRead(trans, oidvalue); + transStart(); + ptr = transRead(oidvalue); struct ___Thread___ *p = (struct ___Thread___ *) ptr; p->___threadDone___ = 1; *((unsigned int *)&((struct ___Object___ *) p)->___localcopy___) |=DIRTY; - if(transCommit(trans) != 0) { + if(transCommit() != 0) { goto transstart; } } @@ -149,12 +148,11 @@ void CALL00(___Thread______yield____) { void CALL01(___Thread______join____, struct ___Thread___ * ___this___) { unsigned int *oidarray; unsigned short *versionarray, version; - transrecord_t *trans; objheader_t *ptr; /* Add transaction to check if thread finished for join operation */ transstart: - trans = transStart(); - ptr = transRead(trans, (unsigned int) VAR(___this___)); + transStart(); + ptr = transRead((unsigned int) VAR(___this___)); struct ___Thread___ *p = (struct ___Thread___ *) ptr; #ifdef THREADJOINDEBUG printf("Start join process for Oid = %x\n", (unsigned int) VAR(___this___)); @@ -163,7 +161,7 @@ transstart: #ifdef THREADJOINDEBUG printf("Thread oid = %x is done\n", (unsigned int) VAR(___this___)); #endif - transAbort(trans); + transAbort(); return; } else { @@ -191,7 +189,7 @@ transstart: #endif free(oidarray); free(versionarray); - transAbort(trans); + transAbort(); goto transstart; } return; @@ -242,7 +240,6 @@ void globalDestructor(void *value) { void initDSMthread(int *ptr) { objheader_t *tmp; - transrecord_t * trans; void *threadData; int oid=ptr[0]; int type=ptr[1]; @@ -264,11 +261,11 @@ void initDSMthread(int *ptr) { goto transstart; transstart: { - transrecord_t * trans = transStart(); - tmp = transRead(trans, (unsigned int) oid); + transStart(); + tmp = transRead((unsigned int) oid); ((struct ___Thread___ *)tmp)->___threadDone___ = 1; *((unsigned int *)&((struct ___Object___ *) tmp)->___localcopy___) |=DIRTY; - if(transCommit(trans)!= 0) { + if(transCommit()!= 0) { goto transstart; } }