From: adash Date: Mon, 28 Apr 2008 22:52:12 +0000 (+0000) Subject: bug fixes and changes to the matrixmultiply benchmark X-Git-Tag: preEdgeChange~128 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=5d1ad7810fffe8db0309e2c31ca4383e87de3776;p=IRC.git bug fixes and changes to the matrixmultiply benchmark --- diff --git a/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiply.java b/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiply.java index 4fcf3ad9..c288be2e 100644 --- a/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiply.java +++ b/Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiply.java @@ -11,19 +11,19 @@ public class MatrixMultiply extends Thread{ } public void run() { - int localresults[][]; + double localresults[][]; atomic { //compute the results - localresults=new int[1+x1-x0][1+y1-y0]; + localresults=new double[1+x1-x0][1+y1-y0]; //Use b transpose for cache performance for(int i = x0; i<= x1; i++){ - int a[]=mmul.a[i]; + double a[]=mmul.a[i]; int M=mmul.M; for (int j = y0; j <= y1; j++) { - int innerProduct=0; - int b[] = mmul.btranspose[j]; + double innerProduct=0; + double b[] = mmul.btranspose[j]; for(int k = 0; k < M; k++) { innerProduct += a[k] *b[k]; } @@ -35,7 +35,7 @@ public class MatrixMultiply extends Thread{ atomic { //write the results for(int i=x0;i<=x1;i++) { - int c[]=mmul.c[i]; + double c[]=mmul.c[i]; for(int j=y0;j<=y1;j++) { c[j]=localresults[i-x0][j-y0]; } @@ -44,10 +44,10 @@ public class MatrixMultiply extends Thread{ } public static void main(String[] args) { - int mid1 = (128<<24)|(195<<16)|(175<<8)|73; + int mid1 = (128<<24)|(195<<16)|(175<<8)|69; int mid2 = (128<<24)|(195<<16)|(175<<8)|69; int mid3 = (128<<24)|(195<<16)|(175<<8)|71; - int NUM_THREADS = 1; + int NUM_THREADS = 2; int p, q, r; MatrixMultiply[] mm; MatrixMultiply tmp; @@ -64,7 +64,8 @@ public class MatrixMultiply extends Thread{ } atomic { - mm[0] = global new MatrixMultiply(matrix,0,0,399,399); + mm[0] = global new 
MatrixMultiply(matrix,0,0,399,200); + mm[1] = global new MatrixMultiply(matrix,0,201,399,399); } atomic { @@ -104,10 +105,10 @@ public class MatrixMultiply extends Thread{ // print out the result of the matrix multiply System.printString("Starting\n"); System.printString("Matrix Product c =\n"); - int val; + double val; atomic { for (int i = 0; i < p; i++) { - int c[]=matrix.c[i]; + double c[]=matrix.c[i]; for (int j = 0; j < r; j++) { val = c[j]; } @@ -120,44 +121,44 @@ public class MatrixMultiply extends Thread{ public class MMul{ public int L, M, N; - public int[][] a; - public int[][] b; - public int[][] c; - public int[][] btranspose; + public double[][] a; + public double[][] b; + public double[][] c; + public double[][] btranspose; public MMul(int L, int M, int N) { this.L = L; this.M = M; this.N = N; - a = global new int[L][M]; - b = global new int[M][N]; - c = global new int[L][N]; - btranspose = global new int[N][M]; + a = global new double[L][M]; + b = global new double[M][N]; + c = global new double[L][N]; + btranspose = global new double[N][M]; } public void setValues() { for(int i = 0; i < L; i++) { - int ai[] = a[i]; + double ai[] = a[i]; for(int j = 0; j < M; j++) { ai[j] = j+1; } } for(int i = 0; i < M; i++) { - int bi[] = b[i]; + double bi[] = b[i]; for(int j = 0; j < N; j++) { bi[j] = j+1; } } for(int i = 0; i < L; i++) { - int ci[] = c[i]; + double ci[] = c[i]; for(int j = 0; j < N; j++) { ci[j] = 0; } } for(int i = 0; i < N; i++) { - int btransposei[] = btranspose[i]; + double btransposei[] = btranspose[i]; for(int j = 0; j < M; j++) { btransposei[j] = 0; } @@ -166,7 +167,7 @@ public class MMul{ public void transpose() { for(int row = 0; row < M; row++) { - int brow[] = b[row]; + double brow[] = b[row]; for(int col = 0; col < N; col++) { btranspose[col][row] = brow[col]; } diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 18c1d0b8..505fd348 100644 --- 
a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -14,6 +14,7 @@ extern int classsize[]; extern int numHostsInSystem; +extern pthread_mutex_t notifymutex; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; @@ -121,19 +122,13 @@ void *dstmAccept(void *acceptfd) { unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; - /* - transinfo.objlocked = NULL; - transinfo.objnotfound = NULL; - transinfo.modptr = NULL; - transinfo.numlocked = 0; - transinfo.numnotfound = 0; - */ - /* Receive control messages from other machines */ while(1) { int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); - if (ret==-1) + if (ret==-1) { + printf("DEBUG -> RECV Error!.. retrying\n"); break; + } switch(control) { case READ_REQUEST: /* Read oid requested and search if available */ @@ -574,9 +569,11 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); header->version += 1; /* If threads are waiting on this object to be updated, notify them */ + pthread_mutex_lock(&notifymutex); if(header->notifylist != NULL) { notifyAll(&header->notifylist, OID(header), header->version); } + pthread_mutex_unlock(&notifymutex); pthread_mutex_unlock(&mainobjstore_mutex); offset += sizeof(objheader_t) + tmpsize; } @@ -728,8 +725,8 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short struct sockaddr_in remoteAddr; int bytesSent; int size; - int i = 0; + while(i < numoid) { oid = *(oidarry + i); if((header = (objheader_t *) mhashSearch(oid)) == NULL) { @@ -739,18 +736,21 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short /* Check to see if versions are same */ checkversion: if ((STATUS(header) & LOCK) != LOCK) { - //FIXME make locking atomic + pthread_mutex_lock(&notifymutex); STATUS(header) |= LOCK; newversion = 
header->version; if(newversion == *(versionarry + i)) { //Add to the notify list if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) { printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&notifymutex); return; } STATUS(header) &= ~(LOCK); + pthread_mutex_unlock(&notifymutex); } else { STATUS(header) &= ~(LOCK); + pthread_mutex_unlock(&notifymutex); if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ perror("processReqNotify():socket()"); return; diff --git a/Robust/src/Runtime/DSTM/interface/objstr.c b/Robust/src/Runtime/DSTM/interface/objstr.c index a8910138..b8279487 100644 --- a/Robust/src/Runtime/DSTM/interface/objstr.c +++ b/Robust/src/Runtime/DSTM/interface/objstr.c @@ -2,7 +2,11 @@ extern objstr_t *prefetchcache; objstr_t *objstrCreate(unsigned int size) { - objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size)); + objstr_t *tmp; + if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) { + printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); + return NULL; + } tmp->size = size; tmp->next = NULL; tmp->top = tmp + 1; //points to end of objstr_t structure! 
@@ -38,7 +42,10 @@ void *objstrAlloc(objstr_t *store, unsigned int size) { //end of list, all full if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects { - store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size)); + if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) { + printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); + return NULL; + } if (store->next == NULL) return NULL; store = store->next; @@ -46,7 +53,10 @@ void *objstrAlloc(objstr_t *store, unsigned int size) } else { - store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE)); + if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) { + printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); + return NULL; + } if (store->next == NULL) return NULL; store = store->next; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 3644d968..bbd87742 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -38,6 +38,7 @@ unsigned int oidMax; sockPoolHashTable_t *transReadSockPool; sockPoolHashTable_t *transPrefetchSockPool; +pthread_mutex_t notifymutex; void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); @@ -131,13 +132,6 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short int top=endoffsets[ntuples-1]; *((int *)(node+len))=ntuples; len += sizeof(int); - /* int i; - for(i=0;icache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); #ifdef COMPILER @@ -800,8 +799,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { chashInsert(record->lookupTable, oid, objcopy); } - // freeSock(transReadSockPool, mnum, sd); - return objcopy; } @@ -977,17 +974,20 @@ int transComProcess(local_thread_data_array_t *localtdata) { } GETSIZE(tmpsize, header); pthread_mutex_lock(&mainobjstore_mutex); - 
memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize); + char *tmptcptr = (char *) tcptr; + memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize); header->version += 1; + pthread_mutex_lock(&notifymutex); if(header->notifylist != NULL) { notifyAll(&header->notifylist, OID(header), header->version); } + pthread_mutex_unlock(&notifymutex); pthread_mutex_unlock(&mainobjstore_mutex); } /* If object is newly created inside transaction then commit it */ for (i = 0; i < numcreated; i++) { if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) { - printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); + printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__); return 1; } GETSIZE(tmpsize, header); @@ -1438,7 +1438,6 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER; notifydata_t *ndata; - //FIXME currently all oids belong to one machine oid = oidarry[0]; if((mid = lhashSearch(oid)) == 0) { printf("Error: %s() No such machine found for oid =%x\n",__func__, oid); @@ -1585,6 +1584,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { printf("notifyAll():error %d connecting to %s:%d\n", errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); status = -1; + fflush(stdout); } else { bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); msg[0] = THREAD_NOTIFY_RESPONSE; @@ -1607,7 +1607,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { } void transAbort(transrecord_t *trans) { - objstrDelete(trans->cache); - chashDelete(trans->lookupTable); - free(trans); + objstrDelete(trans->cache); + chashDelete(trans->lookupTable); + free(trans); }