From 1e23ae339caa2709f28af2d22e6b34c2c0b01abb Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 10 Jul 2008 00:52:54 +0000 Subject: [PATCH] bug fixes for udp broadcast flags and code added to collect transaction commit+abort statistics --- Robust/src/ClassLibrary/Signal.java | 8 ++ Robust/src/IR/Flat/BuildCode.java | 6 +- Robust/src/Main/Main.java | 1 + .../Runtime/DSTM/interface/addUdpEnhance.c | 118 +++++++++++------- .../Runtime/DSTM/interface/addUdpEnhance.h | 3 +- Robust/src/Runtime/DSTM/interface/queue.c | 7 +- Robust/src/Runtime/DSTM/interface/trans.c | 37 ++++-- Robust/src/Runtime/runtime.c | 1 - Robust/src/Runtime/signal.c | 27 ++++ Robust/src/buildscript | 10 ++ 10 files changed, 155 insertions(+), 63 deletions(-) create mode 100644 Robust/src/ClassLibrary/Signal.java create mode 100644 Robust/src/Runtime/signal.c diff --git a/Robust/src/ClassLibrary/Signal.java b/Robust/src/ClassLibrary/Signal.java new file mode 100644 index 00000000..85e88325 --- /dev/null +++ b/Robust/src/ClassLibrary/Signal.java @@ -0,0 +1,8 @@ +public class Signal { + public Signal() { + } + public native void nativeSigAction(); + public void sigAction() { + nativeSigAction(); + } +} diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index f5080288..fe3dc9a9 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -262,8 +262,10 @@ public class BuildCode { outmethod.println("pthread_exit(NULL);"); } - + outmethod.println("printf(\"numTransAbort= %d\\n\", numTransAbort);"); + outmethod.println("printf(\"numTransCommit= %d\\n\", numTransCommit);"); outmethod.println("}"); + } /* This method outputs code for each task. */ @@ -717,6 +719,8 @@ public class BuildCode { * information. */ private void generateSizeArray(PrintWriter outclassdefs) { + outclassdefs.print("int numTransAbort = 0;"); + outclassdefs.print("int numTransCommit = 0;"); outclassdefs.print("int classsize[]={"); Iterator it=state.getClassSymbolTable().getDescriptorsIterator(); cdarray=new ClassDescriptor[state.numClasses()]; diff --git a/Robust/src/Main/Main.java b/Robust/src/Main/Main.java index a46cca82..2fa3e23e 100644 --- a/Robust/src/Main/Main.java +++ b/Robust/src/Main/Main.java @@ -164,6 +164,7 @@ public class Main { readSourceFile(state, ClassLibraryPrefix+"gnu/Random.java"); readSourceFile(state, ClassLibraryPrefix+"Vector.java"); readSourceFile(state, ClassLibraryPrefix+"Enumeration.java"); + readSourceFile(state, ClassLibraryPrefix+"Signal.java"); if (state.TASK) { diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c index 8126962c..79d08be5 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c @@ -2,6 +2,8 @@ #include #include #include +#include +#include #include "addUdpEnhance.h" /************************ @@ -66,28 +68,23 @@ int udpInit() { return sockfd; } +/* Function that listens for udp broadcast messages */ void *udpListenBroadcast(void *sockfd) { pthread_t thread_udpBroadcast; struct sockaddr_in servaddr; - char readBuffer[MAX_SIZE]; socklen_t socklen = sizeof(struct sockaddr); + char readBuffer[MAX_SIZE]; int retval; - memset(readBuffer, 0, MAX_SIZE); printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd); + memset(readBuffer, 0, MAX_SIZE); while(1) { - //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL); - int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen); - if(bytesRcvd == 0) { - break; - } - + int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen); if(bytesRcvd == -1) { printf("DEBUG-> Recv Error! \n"); break; } - short status = *((short *) &readBuffer[0]); switch (status) { case INVALIDATE_OBJS: @@ -101,68 +98,95 @@ void *udpListenBroadcast(void *sockfd) { } } -closeconnection: - /* Close connection */ - if(close((int)sockfd) == -1) - perror("close"); - pthread_exit(NULL); + /* Close connection */ + if(close((int)sockfd) == -1) + perror("close"); + pthread_exit(NULL); } -/* Function that sends a broadcast to Invalidate objects that - * have been currently modified */ +/* Function that invalidate objects that + * have been currently modified + * returns -1 on error and 0 on success */ int invalidateObj(thread_data_array_t *tdata) { struct sockaddr_in clientaddr; - //TODO Instead of sending "hello" send modified objects - char writeBuffer[MAX_SIZE]; - //char writeBuffer[] = "hello"; - const int on = 1; + int retval; bzero(&clientaddr, sizeof(clientaddr)); clientaddr.sin_family = AF_INET; clientaddr.sin_port = htons(UDP_PORT); clientaddr.sin_addr.s_addr = INADDR_BROADCAST; - /* Create Udp Message */ - int offset = 0; - *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; - offset += sizeof(short); - *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); - offset += sizeof(short); - int i; - for(i = 0; i < tdata->buffer->f.nummod; i++) { - if(offset == MAX_SIZE) { - if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) { - perror("sendto error- "); - printf("DEBUG-> sendto error: errorno %d\n", errno); + int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int); + if(tdata->buffer->f.nummod < maxObjsPerMsg) { + /* send single udp msg */ + int iteration = 0; + if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) { + printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + } else { + /* Split into several udp msgs */ + int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg; + if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++; + int i; + for(i = 1; i <= maxUdpMsg; i++) { + if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) { + printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); return -1; } - offset = 0; - } - /* - if(offset >= MAX_SIZE) { - printf("DEBUG-> Large number of objects for one udp message\n"); - return -1; } - */ + } + return 0; +} - *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i]; - offset += sizeof(unsigned int); +/* Function sends a udp broadcast, also distinguishes + * msg size to be sent based on the iteration flag + * returns -1 on error and 0 on success */ +int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) { + char writeBuffer[MAX_SIZE]; + int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int); + int offset = 0; + *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg + offset += sizeof(short); + if(iteration == 0) { // iteration flag == zero, send single udp msg + *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); + offset += sizeof(short); + int i; + for(i = 0; i < tdata->buffer->f.nummod; i++) { + *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i]; + offset += sizeof(unsigned int); + } + } else { // iteration flag > zero, send multiple udp msg + int numObj; + if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0) + numObj = maxObjsPerMsg; + else + numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg); + *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj); + offset += sizeof(short); + int index = (iteration - 1) * maxObjsPerMsg; + int i; + for(i = 0; i < numObj; i++) { + *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i]; + offset += sizeof(unsigned int); + } } int n; - if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) { + if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) { perror("sendto error- "); printf("DEBUG-> sendto error: errorno %d\n", errno); return -1; } - //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer); return 0; -} +} +/* Function searches given oid in prefetch cache and invalidates obj from cache + * returns -1 on error and 0 on success */ int invalidateFromPrefetchCache(char *buffer) { - int offset = sizeof(int); + int offset = sizeof(short); /* Read objects sent */ - int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int); + int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int); int i; - for(i = 0; i < numObjs; i++) { + for(i = 0; i < numObjsRecv; i++) { unsigned int oid; oid = *((unsigned int *)(buffer+offset)); objheader_t *header; diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h index 7d3d98c1..3a0f9174 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h @@ -12,7 +12,7 @@ /************************* * Global constants ************************/ -#define MAX_SIZE 4000 +#define MAX_SIZE 2000 /******************************** * Function Prototypes @@ -22,4 +22,5 @@ int udpInit(); void *udpListenBroadcast(void *); int invalidateObj(thread_data_array_t *); int invalidateFromPrefetchCache(char *); +int sendUdpMsg(thread_data_array_t *, struct sockaddr_in *, int); #endif diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 51d586c5..38434dc6 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -13,7 +13,7 @@ void queueInit(void) { /* Intitialize primary queue */ headoffset=0; tailoffset=0; - memory=malloc(QSIZE); + memory=malloc(QSIZE+sizeof(int));//leave space for -1 pthread_mutexattr_init(&qlockattr); pthread_mutexattr_settype(&qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&qlock, &qlockattr); @@ -30,13 +30,13 @@ void * getmemory(int size) { //Wait for tail to go past new start while(tailoffsetcache); chashDelete(record->lookupTable); @@ -584,6 +598,9 @@ int transCommit(transrecord_t *record) { free(ltdata); return TRANS_ABORT; } else if(treplyctrl == TRANS_COMMIT) { +#ifdef TRANSSTATS + ++numTransCommit; +#endif /* Free Resources */ objstrDelete(record->cache); chashDelete(record->lookupTable); @@ -681,10 +698,10 @@ void *transRequest(void *threadarg) { //make an entry in prefetch hash table void *oldptr; if((oldptr = prehashSearch(oidToPrefetch)) != NULL) { - prehashRemove(oidToPrefetch); - prehashInsert(oidToPrefetch, header); + prehashRemove(oidToPrefetch); + prehashInsert(oidToPrefetch, header); } else { - prehashInsert(oidToPrefetch, header); + prehashInsert(oidToPrefetch, header); } length = length - size; offset += size; @@ -790,9 +807,11 @@ void decideResponse(thread_data_array_t *tdata) { return; } /* Invalidate objects in other machine cache */ - if((retval = invalidateObj(tdata)) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - return; + if(tdata->buffer->f.nummod > 0) { + if((retval = invalidateObj(tdata)) != 0) { + printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); + return; + } } } else { /* Send Abort in soft abort case followed by retry commiting transaction again*/ @@ -818,10 +837,10 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) { } else { oid = tdata->buffer->oidmod[i]; } + pthread_mutex_lock(&prefetchcache_mutex); header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid); //copy object into prefetch cache GETSIZE(size, header); - pthread_mutex_lock(&prefetchcache_mutex); if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) { printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); return -1; @@ -1275,7 +1294,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { buf+=sizeof(unsigned int); *((unsigned int *)buf) = myIpAddr; buf += sizeof(unsigned int); - memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short)); + memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short)); send_data(sd, oidnoffset, len); tmp = tmp->next; } diff --git a/Robust/src/Runtime/runtime.c b/Robust/src/Runtime/runtime.c index a5c78425..03ee0bcf 100644 --- a/Robust/src/Runtime/runtime.c +++ b/Robust/src/Runtime/runtime.c @@ -4,7 +4,6 @@ #include "mem.h" #include #include -#include #include #include "option.h" #ifdef DSTM diff --git a/Robust/src/Runtime/signal.c b/Robust/src/Runtime/signal.c new file mode 100644 index 00000000..76ead1a4 --- /dev/null +++ b/Robust/src/Runtime/signal.c @@ -0,0 +1,27 @@ +#include "runtime.h" +#include +#include +#include + +extern int numTransAbort; +extern int numTransCommit; + + +void transStatsHandler(int sig, siginfo_t* info, void *context) { +#ifdef TRANSSTATS + printf("numTransCommit = %d\n", numTransCommit); + printf("numTransAbort = %d\n", numTransAbort); + exit(0); +#endif +} + +#ifdef TRANSSTATS +void CALL00(___Signal______nativeSigAction____) { + struct sigaction siga; + siga.sa_handler = NULL; + siga.sa_flags = SA_SIGINFO; + siga.sa_sigaction = &transStatsHandler; + sigemptyset(&siga.sa_mask); + sigaction(SIGUSR1, &siga, 0); +} +#endif diff --git a/Robust/src/buildscript b/Robust/src/buildscript index beb59c1d..6042f0bb 100755 --- a/Robust/src/buildscript +++ b/Robust/src/buildscript @@ -22,6 +22,7 @@ echo -threadsimulate generate multi-thread simulate version binary echo -optional enable optional echo -debug generate debug symbols echo -prefetch do prefetch analysis +echo -transstats generates transaction stats on commits and aborts echo -webinterface enable web interface echo -runtimedebug printout runtime debug messages echo "-thread use support for multiple threads" @@ -46,6 +47,7 @@ NOJAVA=false CHECKFLAG=false RECOVERFLAG=false MULTICOREFLAG=false +TRANSSTATSFLAG=false RAWFLAG=false THREADSIMULATEFLAG=false; USEDMALLOC=false @@ -100,6 +102,9 @@ DSMFLAG=true elif [[ $1 = '-prefetch' ]] then JAVAOPTS="$JAVAOPTS -prefetch" +elif [[ $1 = '-transstats' ]] +then +TRANSSTATSFLAG=true elif [[ $1 = '-printflat' ]] then JAVAOPTS="$JAVAOPTS -printflat" @@ -284,11 +289,16 @@ $ROBUSTROOT/Runtime/SimpleHash.c $ROBUSTROOT/Runtime/option.c \ $ROBUSTROOT/Runtime/ObjectHash.c \ $ROBUSTROOT/Runtime/garbage.c $ROBUSTROOT/Runtime/socket.c \ $ROBUSTROOT/Runtime/math.c \ +$ROBUSTROOT/Runtime/signal.c \ $ROBUSTROOT/Runtime/GenericHashtable.c $ROBUSTROOT/Runtime/object.c" if $DSMFLAG then EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME" +if $TRANSSTATSFLAG +then +EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DTRANSSTATS -DCOMPILER -DDSTM -I$DSMRUNTIME" +fi FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c" fi -- 2.34.1