bug fixes
authoradash <adash>
Sat, 9 Aug 2008 22:48:47 +0000 (22:48 +0000)
committeradash <adash>
Sat, 9 Aug 2008 22:48:47 +0000 (22:48 +0000)
changes to trans.c to get object type without doing a prefetch but a transRead instead

Robust/src/Benchmarks/Prefetch/Em3d/dsm/Node2.java
Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java
Robust/src/ClassLibrary/Barrier.java
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c
Robust/src/Runtime/DSTM/interface/gCollect.h
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/socket.c
Robust/src/Runtime/thread.c

index 528ffb6760c515bc9af7eff9bb79e4ac5ce024a9..b8616b84c50736be63aea43d9c46ea05700ee99b 100644 (file)
@@ -60,7 +60,7 @@ public class Node {
                index = rand.nextInt();
                if (index < 0) index = -index;
                //local vs remote from em3d benchmark
-               if ((rand.nextInt()%4)==0)
+               if ((rand.nextInt()%2)==0)
                    index=index%nodeTable.length;
                else
                    index=begin+(index%(end-begin));
index 05ff5f9e2926d23900d3719d126f7a9327e8fdfe..d1f4a4b8aef62ab681d2d003a31d7eda924657c3 100644 (file)
@@ -42,7 +42,6 @@ class SORRunner extends Thread {
     int ilow, iupper, slice, tslice, ttslice, Mm1, Nm1;
 
     atomic {
-      System.printString("Inside atomic 1\n");
       N = M = G.length;
       
       omega_over_four = omega * 0.25;
@@ -59,19 +58,23 @@ class SORRunner extends Thread {
       iupper = ((tmpid+1)*slice)+1;
       if (iupper > Mm1) iupper =  Mm1+1;
       if (tmpid == (numthreads-1)) iupper = Mm1+1;
+      G[0]=global new double[N];
       for(int i=ilow;i<iupper;i++) {
-         G[i]=global new double[N];
+        G[i]=global new double[N];
       }
     }
 
     Barrier.enterBarrier(barr);
     atomic {
-       Random rand=new Random();
-       for(int i=ilow;i<iupper;i++) {
-           double[] R=G[i];
-           for(int j=0;j<M;j++)
-               R[j]=rand.nextDouble() * 1e-6;
-       }
+      Random rand=new Random();
+      double[] R = G[0];
+      for(int j=0;j<M;j++)
+        R[j]=rand.nextDouble() * 1e-6;
+      for(int i=ilow;i<iupper;i++) {
+        R=G[i];
+        for(int j=0;j<M;j++)
+          R[j]=rand.nextDouble() * 1e-6;
+      }
     }
     Barrier.enterBarrier(barr);
 
index c1892564872de2376784e6cd706fb087f1051249..4414cd25867d17298642fd8bacb47026a0e6368e 100644 (file)
@@ -23,13 +23,14 @@ public class BarrierServer extends Thread {
            for(int j=0;j<n;j++) {
                Socket s=ar[j];
                byte b[]=new byte[1];
-               while(s.read(b)!=1)
+               while(s.read(b)!=1) {
                    ;
+        }
            }
            byte b[]=new byte[1];
            b[0]= (byte) 'A';
            for(int j=0;j<n;j++)
-               ar[j].write(b);
+          ar[j].write(b);
        }
     }
 }
@@ -44,7 +45,8 @@ public class Barrier {
        byte b[]=new byte[1];
        b[0]=(byte)'A';
        barr.s.write(b);
-       while(barr.s.read(b)!=1)
-           ;
+       while(barr.s.read(b)!=1) {
+      ;
+    }
     }
 }
index 317f30fcc10ed739fe925f3770d796f00c676886..30da7aa2309b77b6ea12e42f4ed2c72ef0ff07a7 100644 (file)
@@ -111,7 +111,12 @@ int copyToCache(int numoid, unsigned int *oidarray, thread_data_array_t *tdata,
       oid = oidarray[i];
     }
     pthread_mutex_lock(&prefetchcache_mutex);
-    objheader_t *header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid); 
+    objheader_t * header;
+    if((header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid)) == NULL) {
+      printf("%s() obj %x is no longer in transaction cache at %s , %d\n", __func__, oid,__FILE__, __LINE__);
+      fflush(stdout);
+      return -1;
+    }
     //copy into prefetch cache
     int size;
     GETSIZE(size, header);
index f45075a12dca5154e4afb1a689d381f120ebb0e2..3bf6a71c7982630098c8e2d0bb525da6b1f19d65 100644 (file)
@@ -6,7 +6,7 @@
 /***********************************
  ****** Global constants **********
  **********************************/
-#define PREFETCH_FLUSH_COUNT_THRESHOLD 20
+#define PREFETCH_FLUSH_COUNT_THRESHOLD 30
 
 /*********************************
  ********* Global variables ******
index 04dc63cef4a2e44d4056acacf1880460a6909391..834175108e3be0240e926fd7840e0d0beb969e41 100644 (file)
@@ -68,7 +68,7 @@ void send_data(int fd , void *buf, int buflen) {
     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
     if (numbytes == -1) {
       perror("send");
-      exit(-1);
+      return;
     }
     buffer += numbytes;
     size -= numbytes;
@@ -83,7 +83,7 @@ void recv_data(int fd , void *buf, int buflen) {
     numbytes = recv(fd, buffer, size, 0);
     if (numbytes == -1) {
       perror("recv");
-      exit(-1);
+      return;
     }
     buffer += numbytes;
     size -= numbytes;
@@ -693,7 +693,10 @@ void *transRequest(void *threadarg) {
   /* Send objects that are modified */
   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
     int size;
-    headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+    if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
+      printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
+      pthread_exit(NULL);
+    }
     GETSIZE(size,headeraddr);
     size+=sizeof(objheader_t);
     send_data(sd, headeraddr, size);
@@ -946,8 +949,8 @@ void *handleLocalReq(void *threadarg) {
       int tmpsize;
       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
       if (headptr == NULL) {
-       printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
-       return NULL;
+        printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
+        return NULL;
       }
       oid = OID(headptr);
       version = headptr->version;
@@ -1378,18 +1381,39 @@ unsigned short getObjType(unsigned int oid) {
   objheader_t *objheader;
   unsigned short numoffset[] ={0};
   short fieldoffset[] ={};
-  
+
   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
-      if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
-       prefetch(0, 1, &oid, numoffset, fieldoffset);
-       pthread_mutex_lock(&pflookup.lock);
-       while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
-         pthread_cond_wait(&pflookup.cond, &pflookup.lock);
-       }
-       pthread_mutex_unlock(&pflookup.lock);
+    if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+      unsigned int mid = lhashSearch(oid);
+      int sd = getSock2(transReadSockPool, mid);
+      char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
+      remotereadrequest[0] = READ_REQUEST;
+      *((unsigned int *)(&remotereadrequest[1])) = oid;
+      send_data(sd, remotereadrequest, sizeof(remotereadrequest));
+
+      /* Read response from the Participant */
+      char control;
+      recv_data(sd, &control, sizeof(char));
+
+      if (control==OBJECT_NOT_FOUND) {
+        printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
+        fflush(stdout);
+        exit(-1);
+      } else {
+        /* Read object if found into local cache */
+        int size;
+        recv_data(sd, &size, sizeof(int));
+        pthread_mutex_lock(&prefetchcache_mutex);
+        if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
+          printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+          pthread_exit(NULL);
+        }
+        pthread_mutex_unlock(&prefetchcache_mutex);
+        recv_data(sd, objheader, size);
+        prehashInsert(oid, objheader);
       }
+    }
   }
-  
   return TYPE(objheader);
 }
 
@@ -1495,10 +1519,6 @@ int processConfigFile()
 #else
        myIpAddr = getMyIpAddr("eth0");
 #endif
-
-#ifdef CHECKTA
-    printf("My ip address = %x", myIpAddr);
-#endif
        myIndexInHostArray = findHost(myIpAddr);
        if (myIndexInHostArray == -1)
        {
@@ -1669,7 +1689,7 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
                        return;
                } else {
                        if(version <= ndata->versionarry[index]){
-                               printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
+                               printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
                                return;
                        } else {
                                /* Clear from prefetch cache and free thread related data structure */
index 43fb285b5c6ef84bdbf271d3dbf8051d6c5381a1..bb84411ddf125d2f02b3105597f6f86542994e0a 100644 (file)
@@ -25,7 +25,7 @@ int CALL24(___Socket______nativeConnect____I__AR_B_I, int ___fd___, int ___port_
   sin.sin_family= AF_INET;
   sin.sin_port=htons(___port___);
   sin.sin_addr.s_addr=htonl(*(int *)(((char *)&VAR(___address___)->___length___)+sizeof(int)));
-#if defined(THREADS)||define(DSTM)
+#if defined(THREADS)||defined(DSTM)
 #ifdef PRECISE_GC
   struct listitem *tmp=stopforgc((struct garbagelist *)___params___);
 #endif
@@ -33,7 +33,7 @@ int CALL24(___Socket______nativeConnect____I__AR_B_I, int ___fd___, int ___port_
   do {
     rc = connect(___fd___, (struct sockaddr *) &sin, sizeof(sin));
   } while (rc<0 && errno==EINTR); /* repeat if interrupted */
-#if defined(THREADS)||define(DSTM)
+#if defined(THREADS)||defined(DSTM)
 #ifdef PRECISE_GC
   restartaftergc(tmp);
 #endif
@@ -289,13 +289,13 @@ int CALL02(___ServerSocket______nativeaccept____L___Socket___,struct ___ServerSo
   unsigned int sinlen=sizeof(sin);
   int fd=VAR(___this___)->___fd___;
   int newfd;
-#if defined(THREADS)||define(DSTM)
+#if defined(THREADS)||defined(DSTM)
 #ifdef PRECISE_GC
   struct listitem *tmp=stopforgc((struct garbagelist *)___params___);
 #endif
 #endif
   newfd=accept(fd, (struct sockaddr *)&sin, &sinlen);
-#if defined(THREADS)||define(DSTM)
+#if defined(THREADS)||defined(DSTM)
 #ifdef PRECISE_GC
   restartaftergc(tmp);
 #endif
@@ -363,7 +363,7 @@ int CALL02(___Socket______nativeRead_____AR_B, struct ___Socket___ * ___this___,
 
   char * charstr=malloc(length);
   
-#if defined(THREADS)||define(DSTM)
+#if defined(THREADS)||defined(DSTM)
 #ifdef PRECISE_GC
   struct listitem *tmp=stopforgc((struct garbagelist *)___params___);
 #endif
@@ -374,7 +374,7 @@ int CALL02(___Socket______nativeRead_____AR_B, struct ___Socket___ * ___this___,
   do {
     byteread=read(fd, charstr, length);
   } while(byteread==-1&&errno==EINTR);
-#if defined(THREADS)||define(DSTM)
+#if defined(THREADS)||defined(DSTM)
 #ifdef PRECISE_GC
   restartaftergc(tmp);
 #endif
index a737e8df5a31853983fe45d738ad0a3e4f9c4978..f7855d717b71876ead15408f78dfd5e0e02acf66 100644 (file)
@@ -138,7 +138,7 @@ void CALL11(___Thread______sleep____J, long long ___millis___, long long ___mill
 #endif
 }
 
-#if defined(DSTM)||defined(THREADS)
+#if defined(DSTM)|| defined(THREADS)
 void CALL00(___Thread______yield____) {
   pthread_yield();
 }
@@ -157,7 +157,13 @@ transstart:
   trans = transStart();
   ptr = transRead(trans, (unsigned int) VAR(___this___));
   struct ___Thread___ *p = (struct ___Thread___ *) ptr;
+#ifdef THREADJOINDEBUG
+  printf("Start join process for Oid = %x\n", (unsigned int) VAR(___this___));
+#endif
   if(p->___threadDone___ == 1) {
+#ifdef THREADJOINDEBUG
+    printf("Thread oid = %x is done\n", (unsigned int) VAR(___this___));
+#endif
          transAbort(trans);
          return;
   } else {