From fea2fabcbe6f13a8790a603c093f09c79b1d4ccd Mon Sep 17 00:00:00 2001 From: adash Date: Sat, 9 Aug 2008 22:48:47 +0000 Subject: [PATCH] bug fixes changes to trans.c to get object type without doing a prefetch but a transRead instead --- .../Benchmarks/Prefetch/Em3d/dsm/Node2.java | 2 +- .../Prefetch/SOR/dsm/SORRunner.java | 19 +++--- Robust/src/ClassLibrary/Barrier.java | 10 ++-- .../DSTM/interface/addPrefetchEnhance.c | 7 ++- Robust/src/Runtime/DSTM/interface/gCollect.h | 2 +- Robust/src/Runtime/DSTM/interface/trans.c | 58 +++++++++++++------ Robust/src/Runtime/socket.c | 12 ++-- Robust/src/Runtime/thread.c | 8 ++- 8 files changed, 77 insertions(+), 41 deletions(-) diff --git a/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Node2.java b/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Node2.java index 528ffb67..b8616b84 100644 --- a/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Node2.java +++ b/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Node2.java @@ -60,7 +60,7 @@ public class Node { index = rand.nextInt(); if (index < 0) index = -index; //local vs remote from em3d benchmark - if ((rand.nextInt()%4)==0) + if ((rand.nextInt()%2)==0) index=index%nodeTable.length; else index=begin+(index%(end-begin)); diff --git a/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java b/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java index 05ff5f9e..d1f4a4b8 100644 --- a/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java +++ b/Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java @@ -42,7 +42,6 @@ class SORRunner extends Thread { int ilow, iupper, slice, tslice, ttslice, Mm1, Nm1; atomic { - System.printString("Inside atomic 1\n"); N = M = G.length; omega_over_four = omega * 0.25; @@ -59,19 +58,23 @@ class SORRunner extends Thread { iupper = ((tmpid+1)*slice)+1; if (iupper > Mm1) iupper = Mm1+1; if (tmpid == (numthreads-1)) iupper = Mm1+1; + G[0]=global new double[N]; for(int i=ilow;irec->lookupTable, oid); + objheader_t * header; + if((header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid)) == NULL) { + printf("%s() obj %x is no longer in transaction cache at %s , %d\n", __func__, oid,__FILE__, __LINE__); + fflush(stdout); + return -1; + } //copy into prefetch cache int size; GETSIZE(size, header); diff --git a/Robust/src/Runtime/DSTM/interface/gCollect.h b/Robust/src/Runtime/DSTM/interface/gCollect.h index f45075a1..3bf6a71c 100644 --- a/Robust/src/Runtime/DSTM/interface/gCollect.h +++ b/Robust/src/Runtime/DSTM/interface/gCollect.h @@ -6,7 +6,7 @@ /*********************************** ****** Global constants ********** **********************************/ -#define PREFETCH_FLUSH_COUNT_THRESHOLD 20 +#define PREFETCH_FLUSH_COUNT_THRESHOLD 30 /********************************* ********* Global variables ****** diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 04dc63ce..83417510 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -68,7 +68,7 @@ void send_data(int fd , void *buf, int buflen) { numbytes = send(fd, buffer, size, MSG_NOSIGNAL); if (numbytes == -1) { perror("send"); - exit(-1); + return; } buffer += numbytes; size -= numbytes; @@ -83,7 +83,7 @@ void recv_data(int fd , void *buf, int buflen) { numbytes = recv(fd, buffer, size, 0); if (numbytes == -1) { perror("recv"); - exit(-1); + return; } buffer += numbytes; size -= numbytes; @@ -693,7 +693,10 @@ void *transRequest(void *threadarg) { /* Send objects that are modified */ for(i = 0; i < tdata->buffer->f.nummod ; i++) { int size; - headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); + if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) { + printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); + } GETSIZE(size,headeraddr); size+=sizeof(objheader_t); send_data(sd, headeraddr, size); @@ -946,8 +949,8 @@ void *handleLocalReq(void *threadarg) { int tmpsize; headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); if (headptr == NULL) { - printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__); - return NULL; + printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__); + return NULL; } oid = OID(headptr); version = headptr->version; @@ -1378,18 +1381,39 @@ unsigned short getObjType(unsigned int oid) { objheader_t *objheader; unsigned short numoffset[] ={0}; short fieldoffset[] ={}; - + if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) { - if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { - prefetch(0, 1, &oid, numoffset, fieldoffset); - pthread_mutex_lock(&pflookup.lock); - while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { - pthread_cond_wait(&pflookup.cond, &pflookup.lock); - } - pthread_mutex_unlock(&pflookup.lock); + if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { + unsigned int mid = lhashSearch(oid); + int sd = getSock2(transReadSockPool, mid); + char remotereadrequest[sizeof(char)+sizeof(unsigned int)]; + remotereadrequest[0] = READ_REQUEST; + *((unsigned int *)(&remotereadrequest[1])) = oid; + send_data(sd, remotereadrequest, sizeof(remotereadrequest)); + + /* Read response from the Participant */ + char control; + recv_data(sd, &control, sizeof(char)); + + if (control==OBJECT_NOT_FOUND) { + printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); + fflush(stdout); + exit(-1); + } else { + /* Read object if found into local cache */ + int size; + recv_data(sd, &size, sizeof(int)); + pthread_mutex_lock(&prefetchcache_mutex); + if ((objheader = prefetchobjstrAlloc(size)) == NULL) { + printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); + } + pthread_mutex_unlock(&prefetchcache_mutex); + recv_data(sd, objheader, size); + prehashInsert(oid, objheader); } + } } - return TYPE(objheader); } @@ -1495,10 +1519,6 @@ int processConfigFile() #else myIpAddr = getMyIpAddr("eth0"); #endif - -#ifdef CHECKTA - printf("My ip address = %x", myIpAddr); -#endif myIndexInHostArray = findHost(myIpAddr); if (myIndexInHostArray == -1) { @@ -1669,7 +1689,7 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { return; } else { if(version <= ndata->versionarry[index]){ - printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__); + printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__); return; } else { /* Clear from prefetch cache and free thread related data structure */ diff --git a/Robust/src/Runtime/socket.c b/Robust/src/Runtime/socket.c index 43fb285b..bb84411d 100644 --- a/Robust/src/Runtime/socket.c +++ b/Robust/src/Runtime/socket.c @@ -25,7 +25,7 @@ int CALL24(___Socket______nativeConnect____I__AR_B_I, int ___fd___, int ___port_ sin.sin_family= AF_INET; sin.sin_port=htons(___port___); sin.sin_addr.s_addr=htonl(*(int *)(((char *)&VAR(___address___)->___length___)+sizeof(int))); -#if defined(THREADS)||define(DSTM) +#if defined(THREADS)||defined(DSTM) #ifdef PRECISE_GC struct listitem *tmp=stopforgc((struct garbagelist *)___params___); #endif @@ -33,7 +33,7 @@ int CALL24(___Socket______nativeConnect____I__AR_B_I, int ___fd___, int ___port_ do { rc = connect(___fd___, (struct sockaddr *) &sin, sizeof(sin)); } while (rc<0 && errno==EINTR); /* repeat if interrupted */ -#if defined(THREADS)||define(DSTM) +#if defined(THREADS)||defined(DSTM) #ifdef PRECISE_GC restartaftergc(tmp); #endif @@ -289,13 +289,13 @@ int CALL02(___ServerSocket______nativeaccept____L___Socket___,struct ___ServerSo unsigned int sinlen=sizeof(sin); int fd=VAR(___this___)->___fd___; int newfd; -#if defined(THREADS)||define(DSTM) +#if defined(THREADS)||defined(DSTM) #ifdef PRECISE_GC struct listitem *tmp=stopforgc((struct garbagelist *)___params___); #endif #endif newfd=accept(fd, (struct sockaddr *)&sin, &sinlen); -#if defined(THREADS)||define(DSTM) +#if defined(THREADS)||defined(DSTM) #ifdef PRECISE_GC restartaftergc(tmp); #endif @@ -363,7 +363,7 @@ int CALL02(___Socket______nativeRead_____AR_B, struct ___Socket___ * ___this___, char * charstr=malloc(length); -#if defined(THREADS)||define(DSTM) +#if defined(THREADS)||defined(DSTM) #ifdef PRECISE_GC struct listitem *tmp=stopforgc((struct garbagelist *)___params___); #endif @@ -374,7 +374,7 @@ int CALL02(___Socket______nativeRead_____AR_B, struct ___Socket___ * ___this___, do { byteread=read(fd, charstr, length); } while(byteread==-1&&errno==EINTR); -#if defined(THREADS)||define(DSTM) +#if defined(THREADS)||defined(DSTM) #ifdef PRECISE_GC restartaftergc(tmp); #endif diff --git a/Robust/src/Runtime/thread.c b/Robust/src/Runtime/thread.c index a737e8df..f7855d71 100644 --- a/Robust/src/Runtime/thread.c +++ b/Robust/src/Runtime/thread.c @@ -138,7 +138,7 @@ void CALL11(___Thread______sleep____J, long long ___millis___, long long ___mill #endif } -#if defined(DSTM)||defined(THREADS) +#if defined(DSTM)|| defined(THREADS) void CALL00(___Thread______yield____) { pthread_yield(); } @@ -157,7 +157,13 @@ transstart: trans = transStart(); ptr = transRead(trans, (unsigned int) VAR(___this___)); struct ___Thread___ *p = (struct ___Thread___ *) ptr; +#ifdef THREADJOINDEBUG + printf("Start join process for Oid = %x\n", (unsigned int) VAR(___this___)); +#endif if(p->___threadDone___ == 1) { +#ifdef THREADJOINDEBUG + printf("Thread oid = %x is done\n", (unsigned int) VAR(___this___)); +#endif transAbort(trans); return; } else { -- 2.34.1