From: adash Date: Fri, 5 Sep 2008 01:35:16 +0000 (+0000) Subject: Changes to runtime for read/write lock implementation X-Git-Tag: buildscript^6~41 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=cbd97e1f4b7c9af5e2a3a401d4c430a3556da964;p=IRC.git Changes to runtime for read/write lock implementation and modifying benchmark for dw-10 with dw-14 --- diff --git a/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3d2.java b/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3d2.java index 20edbdf3..e35e4731 100644 --- a/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3d2.java +++ b/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3d2.java @@ -61,7 +61,7 @@ public class Em3d extends Thread { Random random; String hname; - barr = new Barrier("128.195.175.79"); + barr = new Barrier("128.195.175.84"); atomic { iteration = numIter; degree = numDegree; @@ -139,10 +139,10 @@ public class Em3d extends Thread { long start0 = System.currentTimeMillis(); int numThreads = em.numThreads; int[] mid = new int[4]; - mid[0] = (128<<24)|(195<<16)|(175<<8)|79;//dw-8 - mid[1] = (128<<24)|(195<<16)|(175<<8)|80;//dw-9 - mid[2] = (128<<24)|(195<<16)|(175<<8)|78;//dw-7 - mid[3] = (128<<24)|(195<<16)|(175<<8)|73;//dw-5 + mid[0] = (128<<24)|(195<<16)|(175<<8)|84;//dw-8 + mid[1] = (128<<24)|(195<<16)|(175<<8)|85;//dw-9 + mid[2] = (128<<24)|(195<<16)|(175<<8)|86;//dw-7 + mid[3] = (128<<24)|(195<<16)|(175<<8)|87;//dw-5 System.printString("DEBUG -> numThreads = " + numThreads+"\n"); BarrierServer mybarr; diff --git a/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiplyN.java b/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiplyN.java index 236592c6..59c837a4 100644 --- a/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiplyN.java +++ b/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiplyN.java @@ -42,10 +42,10 @@ public class MatrixMultiply extends Thread{ } int[] mid = new int[4]; - mid[0] = (128<<24)|(195<<16)|(175<<8)|79; //dw-8 - mid[1] = (128<<24)|(195<<16)|(175<<8)|80; //dw-9 - mid[2] = (128<<24)|(195<<16)|(175<<8)|78; //dw-7 - mid[3] = (128<<24)|(195<<16)|(175<<8)|73; //dw-5 + mid[0] = (128<<24)|(195<<16)|(175<<8)|84; //dw-10 + mid[1] = (128<<24)|(195<<16)|(175<<8)|85; //dw-11 + mid[2] = (128<<24)|(195<<16)|(175<<8)|86; //dw-12 + mid[3] = (128<<24)|(195<<16)|(175<<8)|87; //dw-13 int p, q, r; MatrixMultiply[] mm; MatrixMultiply tmp; diff --git a/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCReadcommit.java b/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCReadcommit.java index 5954126d..ded2de86 100644 --- a/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCReadcommit.java +++ b/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCReadcommit.java @@ -25,7 +25,6 @@ public class ReadArrayObj extends Thread { Barrier.enterBarrier(barr); - /* //All machines reading data from array int val; for(int i=0; i<10000; i++) { @@ -35,7 +34,6 @@ public class ReadArrayObj extends Thread { } } } - */ } public static void main(String[] args) { diff --git a/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCWritecommit.java b/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCWritecommit.java index ed7eb6e2..f2fd6003 100644 --- a/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCWritecommit.java +++ b/Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCWritecommit.java @@ -24,7 +24,6 @@ public class WriteArrayObj extends Thread { } Barrier.enterBarrier(barr); -/* //Write into array elements Integer val; for(int j=0; j<10000; j++) { @@ -35,7 +34,6 @@ public class WriteArrayObj extends Thread { } } } - */ } public static void main(String[] args) { diff --git a/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java b/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java index 35e7fdd4..c629c046 100644 --- a/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java +++ b/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java @@ -47,10 +47,10 @@ public class JGFSORBench { BarrierServer mybarr; int[] mid = new int[4]; - mid[0] = (128<<24)|(195<<16)|(175<<8)|79;//dw-8 - mid[1] = (128<<24)|(195<<16)|(175<<8)|80;//dw-9 - mid[2] = (128<<24)|(195<<16)|(175<<8)|78;//dw-7 - mid[3] = (128<<24)|(195<<16)|(175<<8)|73;//dw-5 + mid[0] = (128<<24)|(195<<16)|(175<<8)|84;//dw-10 + mid[1] = (128<<24)|(195<<16)|(175<<8)|85;//dw-11 + mid[2] = (128<<24)|(195<<16)|(175<<8)|86;//dw-12 + mid[3] = (128<<24)|(195<<16)|(175<<8)|87;//dw-13 double[][] G; int num_iterations; @@ -109,7 +109,7 @@ public class JGFSORBench { public int JGFvalidate(){ double refval[]; - refval = new double[3]; + refval = new double[4]; refval[0] = 0.498574406322512; refval[1] = 1.1234778980135105; refval[2] = 1.9954895063582696; diff --git a/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java b/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java index 8c661af8..032cc6ed 100644 --- a/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java +++ b/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java @@ -21,7 +21,7 @@ class SORRunner extends Thread { - int id,num_iterations; + int id, num_iterations; double G[][],omega; int nthreads; @@ -38,7 +38,7 @@ class SORRunner extends Thread { double omega_over_four, one_minus_omega; int numiterations; Barrier barr; - barr = new Barrier("128.195.175.79"); + barr = new Barrier("128.195.175.84"); int ilow, iupper, slice, tslice, ttslice, Mm1, Nm1; atomic { diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index 89e549f1..318f3c0d 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -274,6 +274,7 @@ public class BuildCode { outmethod.println("printf(\"nmhashSearch= %d\\n\", nmhashSearch);"); outmethod.println("printf(\"nprehashSearch= %d\\n\", nprehashSearch);"); outmethod.println("printf(\"nRemoteReadSend= %d\\n\", nRemoteSend);"); + outmethod.println("printf(\"nSoftAbort= %d\\n\", nSoftAbort);"); outmethod.println("#endif\n"); outmethod.println("}"); @@ -735,6 +736,7 @@ public class BuildCode { outclassdefs.print("extern int nmhashSearch;\n"); outclassdefs.print("extern int nprehashSearch;\n"); outclassdefs.print("extern int nRemoteSend;\n"); + outclassdefs.print("extern int nSoftAbort;\n"); outclassdefs.print("extern void handle();\n"); outclassdefs.print("#endif\n"); outclassdefs.print("int numprefetchsites = " + pa.prefetchsiteid + ";\n"); diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 993d730e..6eb801b8 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -256,6 +256,10 @@ char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid); +void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *, + int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short); +void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, + int *, int *, char *, unsigned int, unsigned short); /* end server portion */ /* Prototypes for transactions */ @@ -294,6 +298,8 @@ int getPrefetchResponse(int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs); +void commitCountForObjRead(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); +void commitCountForObjMod(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index c1e36c8b..b5314303 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -362,12 +362,21 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, if (fixed->nummod > 0) free(modptr); /* Unlock objects that was locked due to this transaction */ + int useWriteUnlock = 0; for(i = 0; i< transinfo->numlocked; i++) { + if(transinfo->objlocked[i] == -1) { + useWriteUnlock = 1; + continue; + } if((header = mhashSearch(transinfo->objlocked[i])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address return 1; } - UnLock(STATUSPTR(header)); + if(useWriteUnlock) { + write_unlock(STATUSPTR(header)); + } else { + read_unlock(STATUSPTR(header)); + } } /* Send ack to Coordinator */ @@ -417,12 +426,11 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne char control = 0, *ptr; unsigned int oid; unsigned int *oidnotfound, *oidlocked, *oidvernotmatch; - void *mobj; objheader_t *headptr; /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); - oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); + oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int)); oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); int objnotfound = 0, objlocked = 0, objvernotmatch = 0; int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; @@ -434,61 +442,28 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < fixed->numread + fixed->nummod; i++) { - if (i < fixed->numread) { //Objs only read and not modified - int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array + if (i < fixed->numread) { //Objs only read and not modified + int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array incr *= i; oid = *((unsigned int *)(objread + incr)); incr += sizeof(unsigned int); version = *((unsigned short *)(objread + incr)); - } else { //Objs modified + getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch, + &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version); + } else { //Objs modified + if(i == fixed->numread) { + oidlocked[objlocked] = -1; + objlocked++; + } int tmpsize; headptr = (objheader_t *) ptr; oid = OID(headptr); version = headptr->version; GETSIZE(tmpsize, headptr); ptr += sizeof(objheader_t) + tmpsize; - } - - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ - - if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ - /* Save the oids not found and number of oids not found for later use */ - oidnotfound[objnotfound] = oid; - objnotfound++; - } else { /* If Obj found in machine (i.e. has not moved) */ - /* Check if Obj is locked by any previous transaction */ - if (test_and_set(STATUSPTR(mobj))) { - //don't have lock - if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ - v_matchlock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - oidvernotmatch[objvernotmatch] = oid; - objvernotmatch++; - int size; - GETSIZE(size, mobj); - size += sizeof(objheader_t); - numBytes += size; - /* Send TRANS_DISAGREE to Coordinator */ - control = TRANS_DISAGREE; - } - } else { /* If Obj is not locked then lock object */ - /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[objlocked] = OID(((objheader_t *)mobj)); - objlocked++; - if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ - v_matchnolock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - oidvernotmatch[objvernotmatch] = oid; - objvernotmatch++; - int size; - GETSIZE(size, mobj); - size += sizeof(objheader_t); - numBytes += size; - control = TRANS_DISAGREE; - } - } + getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, + &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch, + &numBytes, &control, oid, version); } } @@ -507,12 +482,21 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } #endif if (objlocked > 0) { + int useWriteUnlock = 0; for(j = 0; j < objlocked; j++) { + if(oidlocked[j] == -1) { + useWriteUnlock = 1; + continue; + } if((headptr = mhashSearch(oidlocked[j])) == NULL) { printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 0; } - UnLock(STATUSPTR(headptr)); + if(useWriteUnlock) { + write_unlock(STATUSPTR(headptr)); + } else { + read_unlock(STATUSPTR(headptr)); + } } free(oidlocked); } @@ -537,6 +521,101 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne return control; } + +/* Update Commit info for objects that are read */ +void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, + unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch, + int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes, + char *control, unsigned int oid, unsigned short version) { + void *mobj; + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[*objnotfound] = oid; + (*objnotfound)++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock + if (version == ((objheader_t *)mobj)->version) { /* match versions */ + (*v_matchnolock)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + oidvernotmatch[*objvernotmatch] = oid; + (*objvernotmatch)++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + *numBytes += size; + /* Send TRANS_DISAGREE to Coordinator */ + *control = TRANS_DISAGREE; + } + //Keep track of oid locked + oidlocked[*objlocked] = OID(((objheader_t *)mobj)); + (*objlocked)++; + } else { //we are locked + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + (*v_matchlock)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + oidvernotmatch[*objvernotmatch] = oid; + (*objvernotmatch)++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + *numBytes += size; + *control = TRANS_DISAGREE; + } + } + } +} + +/* Update Commit info for objects that are read */ +void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch, + int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock, + int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) { + void *mobj; + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[*objnotfound] = oid; + (*objnotfound)++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks + if (version == ((objheader_t *)mobj)->version) { /* match versions */ + (*v_matchnolock)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + oidvernotmatch[*objvernotmatch] = oid; + (*objvernotmatch)++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + *numBytes += size; + /* Send TRANS_DISAGREE to Coordinator */ + *control = TRANS_DISAGREE; + } + //Keep track of oid locked + oidlocked[*objlocked] = OID(((objheader_t *)mobj)); + (*objlocked)++; + } else { /* Some other transaction has aquired a write lock on this object */ + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + (*v_matchlock)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + oidvernotmatch[*objvernotmatch] = oid; + (*objvernotmatch)++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + *numBytes += size; + *control = TRANS_DISAGREE; + } + } + } +} + /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT * to send to Coordinator based on the votes of oids involved in the transaction */ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, @@ -609,12 +688,22 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock free(modptr); /* Unlock locked objects */ + int useWriteUnlock = 0; for(i = 0; i < numlocked; i++) { + if(oidlocked[i] == -1) { + useWriteUnlock = 1; + continue; + } if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } - UnLock(STATUSPTR(header)); + + if(useWriteUnlock) { + write_unlock(STATUSPTR(header)); + } else { + read_unlock(STATUSPTR(header)); + } } //TODO Update location lookup table @@ -762,8 +851,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short } else { /* Check to see if versions are same */ checkversion: - if (test_and_set(STATUSPTR(header))==0) { - //have lock + if (write_trylock(STATUSPTR(header))) { // Can acquire write lock newversion = header->version; if(newversion == *(versionarry + i)) { //Add to the notify list @@ -771,9 +859,9 @@ checkversion: printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); return; } - UnLock(STATUSPTR(header)); + write_unlock(STATUSPTR(header)); } else { - UnLock(STATUSPTR(header)); + write_unlock(STATUSPTR(header)); if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("processReqNotify():socket()"); return; diff --git a/Robust/src/Runtime/DSTM/interface/signal.c b/Robust/src/Runtime/DSTM/interface/signal.c index f88b06d8..9433e4fb 100644 --- a/Robust/src/Runtime/DSTM/interface/signal.c +++ b/Robust/src/Runtime/DSTM/interface/signal.c @@ -8,6 +8,7 @@ extern int nchashSearch; extern int nmhashSearch; extern int nprehashSearch; extern int nRemoteSend; +extern int nSoftAbort; extern int numprefetchsites; void handle(); extern pfcstats_t *evalPrefetch; @@ -21,6 +22,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) { printf("nmhashSearch = %d\n", nmhashSearch); printf("nprehashSearch = %d\n", nprehashSearch); printf("nRemoteReadSend = %d\n", nRemoteSend); + printf("nSoftAbort = %d\n", nSoftAbort); //TODO Remove later /* int i; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 3eaa411d..36fdcf1e 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -10,6 +10,7 @@ #include "addUdpEnhance.h" #include "addPrefetchEnhance.h" #include "gCollect.h" +#include "dsmlock.h" #ifdef COMPILER #include "thread.h" #endif @@ -54,6 +55,7 @@ int nchashSearch = 0; int nmhashSearch = 0; int nprehashSearch = 0; int nRemoteSend = 0; +int nSoftAbort = 0; void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); @@ -299,7 +301,7 @@ void randomdelay() { t = time(NULL); req.tv_sec = 0; - req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec + req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec nanosleep(&req, NULL); return; } @@ -622,6 +624,9 @@ int transCommit(transrecord_t *record) { free(thread_data_array); free(ltdata); randomdelay(); +#ifdef TRANSSTATS + nSoftAbort++; +#endif } /* Retry trans commit procedure during soft_abort case */ @@ -714,6 +719,7 @@ void *transRequest(void *threadarg) { pthread_mutex_lock(&prefetchcache_mutex); if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) { printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); pthread_exit(NULL); } pthread_mutex_unlock(&prefetchcache_mutex); @@ -958,14 +964,14 @@ void *handleLocalReq(void *threadarg) { int numread, i; unsigned int oid; unsigned short version; - void *mobj; - objheader_t *headptr; localtdata = (local_thread_data_array_t *) threadarg; /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); + oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for + //setting a divider of read locks + //and write locks numread = localtdata->tdata->buffer->f.numread; /* Process each oid in the machine pile/ group per thread */ @@ -975,8 +981,14 @@ void *handleLocalReq(void *threadarg) { incr *= i; oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr)); version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int))); + commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version); } else { // Objects Modified + if(i == localtdata->tdata->buffer->f.numread) { + oidlocked[numoidlocked] = -1; + numoidlocked++; + } int tmpsize; + objheader_t *headptr; headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); if (headptr == NULL) { printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__); @@ -984,42 +996,11 @@ void *handleLocalReq(void *threadarg) { } oid = OID(headptr); version = headptr->version; + commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version); } - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + } - /* Save the oids not found and number of oids not found for later use */ - if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ - /* Save the oids not found and number of oids not found for later use */ - oidnotfound[numoidnotfound] = oid; - numoidnotfound++; - } else { /* If Obj found in machine (i.e. has not moved) */ - /* Check if Obj is locked by any previous transaction */ - if (test_and_set(STATUSPTR(mobj))) { - if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ - v_matchlock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - break; - } - } else { - //we're locked - /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[numoidlocked] = OID(((objheader_t *)mobj)); - numoidlocked++; - if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ - v_matchnolock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - break; - } - } - } - } // End for - /* Condition to send TRANS_AGREE */ + /* Condition to send TRANS_AGREE */ if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; } @@ -1035,6 +1016,7 @@ void *handleLocalReq(void *threadarg) { localtdata->transinfo->modptr = NULL; localtdata->transinfo->numlocked = numoidlocked; localtdata->transinfo->numnotfound = numoidnotfound; + /* Lock and update count */ //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(localtdata->tdata->lock); @@ -1048,6 +1030,7 @@ void *handleLocalReq(void *threadarg) { pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); } pthread_mutex_unlock(localtdata->tdata->lock); + if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) { if(transAbortProcess(localtdata) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); @@ -1079,6 +1062,88 @@ void *handleLocalReq(void *threadarg) { free(localtdata->transinfo->objnotfound); } pthread_exit(NULL); + +} + +/* Commit info for objects modified */ +void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound, + int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) { + void *mobj; + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + /* Save the oids not found and number of oids not found for later use */ + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[*numoidnotfound] = oid; + (*numoidnotfound)++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock + if (version == ((objheader_t *)mobj)->version) { /* match versions */ + (*v_matchnolock)++; + //Keep track of what is locked + oidlocked[*numoidlocked] = OID(((objheader_t *)mobj)); + (*numoidlocked)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + //Keep track of what is locked + oidlocked[*numoidlocked] = OID(((objheader_t *)mobj)); + (*numoidlocked)++; + return; + } + } else { //A lock is acquired some place else + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + (*v_matchlock)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + return; + } + } + } +} + +/* Commit info for objects modified */ +void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound, + int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) { + void *mobj; + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + /* Save the oids not found and number of oids not found for later use */ + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[*numoidnotfound] = oid; + (*numoidnotfound)++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks + if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ + (*v_matchnolock)++; + //Keep track of what is locked + oidlocked[*numoidlocked] = OID(((objheader_t *)mobj)); + (*numoidlocked)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + //Keep track of what is locked + oidlocked[*numoidlocked] = OID(((objheader_t *)mobj)); + (*numoidlocked)++; + return; + } + } else { //Has reached max number of readers or some other transaction + //has acquired a lock on this object + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + (*v_matchlock)++; + } else { /* If versions don't match ...HARD ABORT */ + (*v_nomatch)++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + return; + } + } + } } /* This function completes the ABORT process if the transaction is aborting */ @@ -1090,12 +1155,21 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { numlocked = localtdata->transinfo->numlocked; objlocked = localtdata->transinfo->objlocked; + int useWriteUnlock = 0; for (i = 0; i < numlocked; i++) { + if(objlocked[i] == -1) { + useWriteUnlock = 1; + continue; + } if((header = mhashSearch(objlocked[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } - UnLock(STATUSPTR(header)); + if(!useWriteUnlock) { + read_unlock(STATUSPTR(header)); + } else { + write_unlock(STATUSPTR(header)); + } } return 0; @@ -1148,19 +1222,29 @@ int transComProcess(local_thread_data_array_t *localtdata) { return 1; } pthread_mutex_unlock(&mainobjstore_mutex); + /* Initialize read and write locks */ + initdsmlocks(STATUSPTR(header)); memcpy(ptrcreate, header, tmpsize); mhashInsert(oidcreated[i], ptrcreate); lhashInsert(oidcreated[i], myIpAddr); } /* Unlock locked objects */ + int useWriteUnlock = 0; for(i = 0; i < numlocked; i++) { + if(oidlocked[i] == -1) { + useWriteUnlock = 1; + continue; + } if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } - UnLock(STATUSPTR(header)); + if(!useWriteUnlock) { + read_unlock(STATUSPTR(header)); + } else { + write_unlock(STATUSPTR(header)); + } } - return 0; } @@ -1847,5 +1931,6 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi /* Clear Flags */ STATUS(headeraddr) =0; + return pile; } diff --git a/Robust/src/buildscript b/Robust/src/buildscript index 30c516d8..f24420c1 100755 --- a/Robust/src/buildscript +++ b/Robust/src/buildscript @@ -360,7 +360,7 @@ if $TRANSSTATSFLAG then EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DTRANSSTATS -DCOMPILER -DDSTM -I$DSMRUNTIME" fi -FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c $DSMRUNTIME/signal.c $DSMRUNTIME/gCollect.c $DSMRUNTIME/addPrefetchEnhance.c" +FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c $DSMRUNTIME/signal.c $DSMRUNTIME/gCollect.c $DSMRUNTIME/addPrefetchEnhance.c $DSMRUNTIME/dsmlock.c" fi if $RECOVERFLAG