bug fixes and changes to the matrixmultiply benchmark
authoradash <adash>
Mon, 28 Apr 2008 22:52:12 +0000 (22:52 +0000)
committeradash <adash>
Mon, 28 Apr 2008 22:52:12 +0000 (22:52 +0000)
Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiply.java
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/objstr.c
Robust/src/Runtime/DSTM/interface/trans.c

index 4fcf3ad9ed87a80c4898432ce50b667171b6f588..c288be2ec6a3a9201064585b99db7ce30d6c8f6a 100644 (file)
@@ -11,19 +11,19 @@ public class MatrixMultiply extends Thread{
        }
 
        public void run() {
-               int localresults[][];
+               double localresults[][];
 
                atomic {
                    //compute the results
-                   localresults=new int[1+x1-x0][1+y1-y0];
+                   localresults=new double[1+x1-x0][1+y1-y0];
                
                    //Use b transpose for cache performance
                    for(int i = x0; i<= x1; i++){
-                       int a[]=mmul.a[i];
+                       double a[]=mmul.a[i];
                        int M=mmul.M;
                        for (int j = y0; j <= y1; j++) {
-                           int innerProduct=0;
-                           int b[] = mmul.btranspose[j];
+                           double innerProduct=0;
+                           double b[] = mmul.btranspose[j];
                            for(int k = 0; k < M; k++) {
                                innerProduct += a[k] *b[k];
                            }
@@ -35,7 +35,7 @@ public class MatrixMultiply extends Thread{
                atomic {
                    //write the results
                    for(int i=x0;i<=x1;i++) {
-                       int c[]=mmul.c[i];
+                       double c[]=mmul.c[i];
                        for(int j=y0;j<=y1;j++) {
                            c[j]=localresults[i-x0][j-y0];
                        }
@@ -44,10 +44,10 @@ public class MatrixMultiply extends Thread{
        }
 
        public static void main(String[] args) {
-               int mid1 = (128<<24)|(195<<16)|(175<<8)|73;
+               int mid1 = (128<<24)|(195<<16)|(175<<8)|69;
                int mid2 = (128<<24)|(195<<16)|(175<<8)|69;
                int mid3 = (128<<24)|(195<<16)|(175<<8)|71;
-               int NUM_THREADS = 1;
+               int NUM_THREADS = 2;
                int p, q, r;
                MatrixMultiply[] mm;
                MatrixMultiply tmp;
@@ -64,7 +64,8 @@ public class MatrixMultiply extends Thread{
                }
 
                atomic {
-                       mm[0] = global new MatrixMultiply(matrix,0,0,399,399);
+                       mm[0] = global new MatrixMultiply(matrix,0,0,399,200);
+                       mm[1] = global new MatrixMultiply(matrix,0,201,399,399);
                }
 
                atomic {
@@ -104,10 +105,10 @@ public class MatrixMultiply extends Thread{
                // print out the result of the matrix multiply
                System.printString("Starting\n");
                System.printString("Matrix Product c =\n");
-               int val;
+               double val;
                atomic {
                        for (int i = 0; i < p; i++) {
-                               int c[]=matrix.c[i];
+                               double c[]=matrix.c[i];
                                for (int j = 0; j < r; j++) {
                                        val = c[j];
                                }
@@ -120,44 +121,44 @@ public class MatrixMultiply extends Thread{
 public class MMul{
 
        public int L, M, N;
-       public int[][] a;
-       public int[][] b;
-       public int[][] c;
-       public int[][] btranspose;
+       public double[][] a;
+       public double[][] b;
+       public double[][] c;
+       public double[][] btranspose;
 
        public MMul(int L, int M, int N) {
                this.L = L;
                this.M = M;
                this.N = N;
-               a = global new int[L][M];  
-               b = global new int[M][N]; 
-               c = global new int[L][N]; 
-               btranspose = global new int[N][M];
+               a = global new double[L][M];  
+               b = global new double[M][N]; 
+               c = global new double[L][N]; 
+               btranspose = global new double[N][M];
        }
 
        public void setValues() {
                for(int i = 0; i < L; i++) {
-            int ai[] = a[i];
+            double ai[] = a[i];
                        for(int j = 0; j < M; j++) {
                                ai[j] = j+1;
                        }
                }
 
                for(int i = 0; i < M; i++) {
-            int bi[] = b[i];
+            double bi[] = b[i];
                        for(int j = 0; j < N; j++) {
                                bi[j] = j+1;
                        }
                }
 
                for(int i = 0; i < L; i++) {
-            int ci[] = c[i];
+            double ci[] = c[i];
                        for(int j = 0; j < N; j++) {
                                ci[j] = 0;
                        }
                }
                for(int i = 0; i < N; i++) {
-            int btransposei[] = btranspose[i];
+            double btransposei[] = btranspose[i];
                        for(int j = 0; j < M; j++) {
                                btransposei[j] = 0;
                        }
@@ -166,7 +167,7 @@ public class MMul{
 
        public void transpose() {
                for(int row = 0; row < M; row++) {
-            int brow[] = b[row];
+            double brow[] = b[row];
                        for(int col = 0; col < N; col++) {
                                btranspose[col][row] = brow[col];
                        }
index 18c1d0b800b1b994ca072cd5a01412a37df0123e..505fd3489c141320ed00a3ce60554dc59276c741 100644 (file)
@@ -14,6 +14,7 @@
 
 extern int classsize[];
 extern int numHostsInSystem;
+extern pthread_mutex_t notifymutex;
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
@@ -121,19 +122,13 @@ void *dstmAccept(void *acceptfd) {
   unsigned short objType, *versionarry, version;
   unsigned int *oidarry, numoid, mid, threadid;
   
-  /*
-  transinfo.objlocked = NULL;
-  transinfo.objnotfound = NULL;
-  transinfo.modptr = NULL;
-  transinfo.numlocked = 0;
-  transinfo.numnotfound = 0;
-  */
-  
   /* Receive control messages from other machines */
   while(1) {
     int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
-    if (ret==-1)
+    if (ret==-1) {
+      printf("DEBUG -> RECV Error!.. retrying\n");
       break;
+    }
     switch(control) {
     case READ_REQUEST:
       /* Read oid requested and search if available */
@@ -574,9 +569,11 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
                header->version += 1; 
                /* If threads are waiting on this object to be updated, notify them */
+        pthread_mutex_lock(&notifymutex);
                if(header->notifylist != NULL) {
                        notifyAll(&header->notifylist, OID(header), header->version);
                }
+        pthread_mutex_unlock(&notifymutex);
                pthread_mutex_unlock(&mainobjstore_mutex);
                offset += sizeof(objheader_t) + tmpsize;
        }
@@ -728,8 +725,8 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
        struct sockaddr_in remoteAddr;
        int bytesSent;
        int size;
-
        int i = 0;
+
        while(i < numoid) {
                oid = *(oidarry + i);
                if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
@@ -739,18 +736,21 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
                        /* Check to see if versions are same */
 checkversion:
                        if ((STATUS(header) & LOCK) != LOCK) {          
-                               //FIXME make locking atomic
+                pthread_mutex_lock(&notifymutex);
                                STATUS(header) |= LOCK;
                                newversion = header->version;
                                if(newversion == *(versionarry + i)) {
                                        //Add to the notify list 
                                        if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
                                                printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); 
+                        pthread_mutex_unlock(&notifymutex);
                                                return;
                                        }
                                        STATUS(header) &= ~(LOCK);              
+                    pthread_mutex_unlock(&notifymutex);
                                } else {
                                        STATUS(header) &= ~(LOCK);              
+                    pthread_mutex_unlock(&notifymutex);
                                        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
                                                perror("processReqNotify():socket()");
                                                return;
index a8910138a1e198d97052470b962b51f5e4b67540..b8279487fd79de813cea049559a7e5049a5ae816 100644 (file)
@@ -2,7 +2,11 @@
 extern objstr_t *prefetchcache;
 
 objstr_t *objstrCreate(unsigned int size) {
-  objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size));
+  objstr_t *tmp;
+  if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
+    printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+    return NULL;
+  }
   tmp->size = size;
   tmp->next = NULL;
   tmp->top = tmp + 1; //points to end of objstr_t structure!
@@ -38,7 +42,10 @@ void *objstrAlloc(objstr_t *store, unsigned int size)
                {  //end of list, all full
                        if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects
                        {
-                               store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size));
+                               if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) {
+                  printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+                  return NULL;
+                }
                                if (store->next == NULL)
                                        return NULL;
                                store = store->next;
@@ -46,7 +53,10 @@ void *objstrAlloc(objstr_t *store, unsigned int size)
                        }
                        else
                        {
-                               store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE));
+                               if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) {
+                  printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+                  return NULL;
+                }
                                if (store->next == NULL)
                                        return NULL;
                                store = store->next;
index 3644d968ee5905c3bf77e5ef8136b4816ebf63ee..bbd87742e025ab01e3bb2422ca597d33c03e1b2a 100644 (file)
@@ -38,6 +38,7 @@ unsigned int oidMax;
 
 sockPoolHashTable_t *transReadSockPool;
 sockPoolHashTable_t *transPrefetchSockPool;
+pthread_mutex_t notifymutex;
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -131,13 +132,6 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
   int top=endoffsets[ntuples-1];
   *((int *)(node+len))=ntuples;
   len += sizeof(int);
-  /*  int i;
-  for(i=0;i<ntuples;i++) {
-    if (oids[i]%2==0&&(oids[i]!=0)) {
-      printf("Bad oid %ld\n",oids[i]);
-    }
-    }*/
-
   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
@@ -222,6 +216,7 @@ void transInit() {
   
   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
   
+  pthread_mutex_init(&notifymutex, NULL);
   //Create prefetch cache lookup table
   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
     printf("ERROR\n");
@@ -265,7 +260,11 @@ void randomdelay() {
 
 /* This function initializes things required in the transaction start*/
 transrecord_t *transStart() {
-  transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
+  transrecord_t *tmp;
+  if((tmp = calloc(1, sizeof(transrecord_t))) == NULL){
+    printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+    return NULL;
+  }
   tmp->cache = objstrCreate(1048576);
   tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
 #ifdef COMPILER
@@ -800,8 +799,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
     chashInsert(record->lookupTable, oid, objcopy); 
   }
   
-  //    freeSock(transReadSockPool, mnum, sd);
-  
   return objcopy;
 }
 
@@ -977,17 +974,20 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
                }
                GETSIZE(tmpsize, header);
                pthread_mutex_lock(&mainobjstore_mutex);
-               memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize);
+        char *tmptcptr = (char *) tcptr;
+               memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
                header->version += 1;
+        pthread_mutex_lock(&notifymutex);
                if(header->notifylist != NULL) {
                        notifyAll(&header->notifylist, OID(header), header->version);
                }
+        pthread_mutex_unlock(&notifymutex);
                pthread_mutex_unlock(&mainobjstore_mutex);
        }
        /* If object is newly created inside transaction then commit it */
        for (i = 0; i < numcreated; i++) {
                if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
-                       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+                       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
                        return 1;
                }
                GETSIZE(tmpsize, header);
@@ -1438,7 +1438,6 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
        pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
        notifydata_t *ndata;
 
-       //FIXME currently all oids belong to one machine
        oid = oidarry[0];
        if((mid = lhashSearch(oid)) == 0) {
                printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
@@ -1585,6 +1584,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
                        printf("notifyAll():error %d connecting to %s:%d\n", errno,
                                        inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
                        status = -1;
+            fflush(stdout);
                } else {
                        bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
                        msg[0] = THREAD_NOTIFY_RESPONSE;
@@ -1607,7 +1607,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
 }
 
 void transAbort(transrecord_t *trans) {
-       objstrDelete(trans->cache);
-       chashDelete(trans->lookupTable);
-       free(trans);
+  objstrDelete(trans->cache);
+  chashDelete(trans->lookupTable);
+  free(trans);
 }