changes for prefetch/caching and performance improvements
authoradash <adash>
Wed, 3 Mar 2010 23:53:11 +0000 (23:53 +0000)
committeradash <adash>
Wed, 3 Mar 2010 23:53:11 +0000 (23:53 +0000)
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c
Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h
Robust/src/Runtime/DSTM/interface_recovery/altmlookup.h
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/queue.c
Robust/src/Runtime/DSTM/interface_recovery/queue.h
Robust/src/Runtime/DSTM/interface_recovery/trans.c
Robust/src/Runtime/runtime.c

index e7b6291b0f9b95930cbe789f847c71a514344e1e..182a9aa24e91b9984e57c5f4724fae24e7afe61e 100644 (file)
@@ -313,10 +313,15 @@ public class BuildCode {
     }
 
     if (state.DSM) {
-      outmethod.println("#ifdef TRANSSTATS \n");
-      outmethod.println("handle();\n");
-      outmethod.println("#endif\n");
+      if (state.DSMRECOVERYSTATS)
+        outmethod.println("handle();\n");
+      else {
+        outmethod.println("#ifdef TRANSSTATS \n");
+        outmethod.println("handle();\n");
+        outmethod.println("#endif\n");
+      }
     }
+    
     if (state.THREAD||state.DSM||state.SINGLETM) {
       outmethod.println("initializethreads();");
     }
index 5c9363a0f0e82080eac4fd5ae5a75b1133d11be8..c7e3f9f4bc333b409d13c9e3029276c600aad770 100644 (file)
@@ -113,6 +113,29 @@ void *udpListenBroadcast(void *sockfd) {
 /* Function that invalidate objects that
  * have been currently modified
  * returns -1 on error and 0 on success */
+int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) {
+  struct timeval start, end;
+  struct sockaddr_in clientaddr;
+  int retval;
+  int i;
+  int nummod=0;
+  for(i=0;i<pilecount;i++) {
+    nummod+=tdata[i].f.nummod;
+  }
+  bzero(&clientaddr, sizeof(clientaddr));
+  clientaddr.sin_family = AF_INET;
+  clientaddr.sin_port = htons(UDP_PORT);
+  clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
+  int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
+  /* send single udp msg */
+  if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
+    printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+#if 0
 int invalidateObj(trans_req_data_t *tdata) {
   struct sockaddr_in clientaddr;
   int retval;
@@ -122,28 +145,61 @@ int invalidateObj(trans_req_data_t *tdata) {
   clientaddr.sin_port = htons(UDP_PORT);
   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
-  if(tdata->f.nummod < maxObjsPerMsg) {
-    /* send single udp msg */
-    int iteration = 0;
-    if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
-      printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
-      return -1;
-    }
-  } else {
-    /* Split into several udp msgs */
-    int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg;
-    if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++;
-    int i;
-    for(i = 1; i <= maxUdpMsg; i++) {
-      if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
-       printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
-       return -1;
+  /* send single udp msg */
+  if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
+    printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+#endif
+
+/* Function sends a udp broadcast, also distinguishes
+ * msg size to be sent based on the total number of objects modified
+ * returns -1 on error and 0 on success */
+int sendUdpMsg(trans_req_data_t *tdata, int pilecount, int nummod, struct sockaddr_in *clientaddr, char finalresponse, int *socklist) {
+  char writeBuffer[MAX_SIZE];
+  int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
+  int offset = 0;
+  int i=0,j=0;
+
+  *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
+  offset += sizeof(short);
+  *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
+  offset += sizeof(unsigned int);
+
+  while(nummod>0) {
+    int numtosend=nummod>maxObjsPerMsg?maxObjsPerMsg:nummod;
+    int localoffset=offset;
+    int sentmsgs=0;
+    *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend);
+    localoffset += sizeof(short);
+
+    for(; j < pilecount; j++) {
+      for(; i < tdata[j].f.nummod; i++) {
+        *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i];  //copy objects
+        localoffset += sizeof(unsigned int);
+        if ((++sentmsgs)==numtosend) {
+          i++;
+          goto send;
+        }
       }
+      i=0;
     }
+send:
+    if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) {
+      perror("sendto error- ");
+      printf("DEBUG-> sendto error: errorno %d\n", errno);
+      return -1;
+    }
+    nummod= nummod - numtosend;
   }
   return 0;
 }
 
+#if 0
+
 /* Function sends a udp broadcast, also distinguishes
  * msg size to be sent based on the iteration flag
  * returns -1 on error and 0 on success */
@@ -186,6 +242,7 @@ int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iter
   }
   return 0;
 }
+#endif
 
 /* Function searches given oid in prefetch cache and invalidates obj from cache
  * returns -1 on error and 0 on success */
@@ -209,7 +266,9 @@ int invalidateFromPrefetchCache(char *buffer) {
       objheader_t *header;
       /* Lookup Objects in prefetch cache and remove them */
       if(((header = prehashSearch(oid)) != NULL)) {
-       prehashRemove(oid);
+        //Keep invalid objects
+        STATUS(header)=DIRTY;
+        //prehashRemove(oid);
       }
       offset += sizeof(unsigned int);
     }
index b3964bce4ea67ccb68c6c6d47b088bdf6e8dc394..295f8af454ef3f6a5f1561784f4675737c7281bf 100644 (file)
@@ -21,7 +21,9 @@
 int createUdpSocket();
 int udpInit();
 void *udpListenBroadcast(void *);
-int invalidateObj(trans_req_data_t *);
+//int invalidateObj(trans_req_data_t *);
+int invalidateObj(trans_req_data_t *, int, char, int*);
 int invalidateFromPrefetchCache(char *);
-int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int);
+//int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int);
+int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*);
 #endif
index 34984bdc2b6d20d6c6b65720cf70a833be2a36c9..877dd3a473c71c88fbcc007d38082f10271624a7 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef _ALTMLOOKUP_H_
-#define _ALTMLOOKUP_H_
+#ifndef _MLOOKUP_H_
+#define _MLOOKUP_H_
 
 #include <stdlib.h>
 #include <stdio.h>
index 5f4eb9d28b294cc2dbd07b2ba779f12f6bd0477a..1583232d7851369a4ac02f9ea988caabe22af8e7 100644 (file)
@@ -242,6 +242,7 @@ typedef struct recoverystat {
   unsigned int deadMachine;
   long long elapsedTime;
   unsigned int recoveredData;
+  unsigned int recvData;
 } recovery_stat_t;
 #endif
 
@@ -363,9 +364,9 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size);
 void prefetch(int, int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
-prefetchpile_t *foundLocal(char *); // returns node with prefetch elements(oids, offsets)
-int lookupObject(unsigned int * oid, short offset);
-int checkoid(unsigned int oid);
+prefetchpile_t *foundLocal(char *,int); // returns node with prefetch elements(oids, offsets)
+int lookupObject(unsigned int * oid, short offset, int *);
+int checkoid(unsigned int oid, int);
 int transPrefetchProcess(int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
 void sendPrefetchReqnew(prefetchpile_t*, int);
index 811888cdcf04832b796dea0a9a353c46f9d79ec2..56a26b57ac91bee578674eff8928b5e6f28ee3fa 100644 (file)
@@ -4,7 +4,7 @@
 #include <netinet/tcp.h>
 #include <ip.h>
 #include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 #include "llookup.h"
 #include "threadnotify.h"
 #include "prefetch.h"
index fcb58191864383ea2b65ceacde1c56ab08ae47f8..06641b62735542577f7623f0de62782d75199818 100644 (file)
@@ -58,10 +58,10 @@ void movehead(int size) {
 void * gettail() {
   while(tailoffset==headoffset) {
     //Sleep
-    //    pthread_mutex_lock(&qlock);
-    //    if (tailoffset==headoffset)
-    //      pthread_cond_wait(&qcond, &qlock);
-    //    pthread_mutex_unlock(&qlock);
+        pthread_mutex_lock(&qlock);
+        if (tailoffset==headoffset)
+          pthread_cond_wait(&qcond, &qlock);
+        pthread_mutex_unlock(&qlock);
   }
   if (*((int *)(memory+tailoffset))==-1) {
     tailoffset=0; //do loop
@@ -70,6 +70,38 @@ void * gettail() {
   return memory+tailoffset+sizeof(int);
 }
 
+int numavailable() {
+  int tmp=tailoffset;
+  int available=0;
+  if (*((int *)(memory+tmp))==-1) {
+    tmp=0;
+  }
+  while(tmp!=headoffset) {
+    available++;
+    tmp=tmp+*((int *)(memory+tmp));
+    if (tmp>QSIZE|| (*((int *)(memory+tmp))==-1)) {
+      break;
+    }
+  }
+  return available;
+}
+
+void incmulttail(int num) {
+  int i;
+  for(i=0;i<num;i++) {
+    int tmpoffset=tailoffset+*((int *)(memory+tailoffset));
+    if (tmpoffset>QSIZE)
+      tailoffset=0;
+    else
+      tailoffset=tmpoffset;
+  }
+}
+
+void resetqueue() {
+  headoffset=0;
+  tailoffset=0;
+}
+
 void inctail() {
   int tmpoffset=tailoffset+*((int *)(memory+tailoffset));
   if (tmpoffset>QSIZE)
index 2e1aa9ec309a01aa4775731e1fa2d5e61d0b6cf9..e284615ded7b81f18d3fb94fa881c08c5b23c7b4 100644 (file)
@@ -13,4 +13,7 @@ void movehead(int size);
 void * gettail();
 void inctail();
 void predealloc();
+int numavailable();
+void resetqueue();
+void incmulttail(int);
 #endif
index b751584b66a65ccbf1564a01ee07b27864017df6..31aa60ab87c95671b3ecf29fa13356af58d3dfc0 100644 (file)
@@ -1,7 +1,7 @@
 #include "dstm.h"
 #include "ip.h"
 #include "machinepile.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 #include "llookup.h"
 #include "plookup.h"
 #include "prelookup.h"
@@ -352,17 +352,33 @@ char* midtoIPString(unsigned int mid){
                return ip;
 }
 #endif
+
+#define INLINEPREFETCH
+#define PREFTHRESHOLD 0
+
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
   /* Allocate for the queue node*/
   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
   int len;
-  char * node= getmemory(qnodesize);
+#ifdef INLINEPREFETCH
+  int attempted=0;
+  char *node;
+  do {
+  node=getmemory(qnodesize);
+  if (node==NULL&&attempted)
+    break;
+  if (node!=NULL) {
+#else
+  char *node=getmemory(qnodesize);
+#endif
   int top=endoffsets[ntuples-1];
 
-  if (node==NULL)
+  if (node==NULL) {
+    LOGEVENT('D');
     return;
+  }
   /* Set queue node values */
 
   /* TODO: Remove this after testing */
@@ -375,8 +391,35 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
 
+#ifdef INLINEPREFETCH
+  movehead(qnodesize);
+  }
+  int numpref=numavailable();
+  attempted=1;
+
+  if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
+    node=gettail();
+    prefetchpile_t *pilehead = foundLocal(node,numpref);
+    if (pilehead!=NULL) {
+      // Get sock from shared pool
+      
+      /* Send  Prefetch Request */
+      prefetchpile_t *ptr = pilehead;
+      while(ptr != NULL) {
+        int sd = getSock2(transPrefetchSockPool, ptr->mid);
+        sendPrefetchReq(ptr, sd);
+        ptr = ptr->next;
+      }
+      
+      mcdealloc(pilehead);
+    }
+    resetqueue();
+  }//end do prefetch if condition
+  } while(node==NULL);
+#else
   /* Lock and insert into primary prefetch queue */
   movehead(qnodesize);
+#endif
 }
 
 /* This function starts up the transaction runtime. */
@@ -503,12 +546,16 @@ void transInit() {
     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
   } while(retval!=0);
 #else
+#ifndef INLINEPREFETCH
   do {
     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
   } while(retval!=0);
 #endif
+#endif
+#ifndef INLINEPREFETCH
   pthread_detach(tPrefetch);
 #endif
+#endif
 }
 
 /* This function stops the threads spawned */
@@ -1175,7 +1222,20 @@ int transCommit() {
 #ifdef DEBUG
     printf("%s-> Final Response: %d\n", __func__, (int)finalResponse);
 #endif
-    
+
+#ifdef CACHE
+    if (finalResponse == TRANS_COMMIT) {
+      /* Invalidate objects in other machine cache */
+      int retval;
+      if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) {
+       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+       free(tosend);
+       free(listmid);
+       return 1;
+      }
+    }
+#endif
+
                /* Send responses to all machines */
                for(i = 0; i < pilecount; i++) {
                        int sd = socklist[i];
@@ -1194,6 +1254,7 @@ int transCommit() {
                                                return 1;
                                        }
 
+#if 0
                                        /* Invalidate objects in other machine cache */
                                        if(tosend[i].f.nummod > 0) {
                                                if((retval = invalidateObj(&(tosend[i]))) != 0) {
@@ -1203,6 +1264,7 @@ int transCommit() {
                                                        return 1;
                                                }
                                        }
+#endif                    
 #ifdef ABORTREADERS
                                        removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
                                        removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
@@ -1369,16 +1431,6 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
       return;
     }
   } else if(finalResponse == TRANS_COMMIT) {
-#ifdef CACHE
-    /* Invalidate objects in other machine cache */
-    if(tdata->f.nummod > 0) {
-      int retval;
-      if((retval = invalidateObj(tdata)) != 0) {
-       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-       return;
-      }
-    }
-#endif
     if(transComProcess(tdata, transinfo) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
       fflush(stdout);
@@ -1833,6 +1885,70 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
   return 0;
 }
 
+prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
+  int i;
+  int j;
+  prefetchpile_t * head=NULL;
+
+  for(j=0;j<numprefetches;j++) {
+    int siteid = *(GET_SITEID(ptr));
+    int ntuples = *(GET_NTUPLES(ptr));
+    unsigned int * oidarray = GET_PTR_OID(ptr);
+    unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
+    short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+    int numLocal = 0;
+    
+    for(i=0; i<ntuples; i++) {
+      unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
+      unsigned short endindex=endoffsets[i];
+      unsigned int oid=oidarray[i];
+      int newbase;
+      int machinenum;
+      int countInvalidObj=0;
+
+      if (oid==0) {
+       numLocal++;
+       continue;
+      }
+      //Look up fields locally
+      int isLastOffset=0;
+      if(endindex==0)
+          isLastOffset=1;
+      for(newbase=baseindex; newbase<endindex; newbase++) {
+        if(newbase==(endindex-1))
+          isLastOffset=1;
+       if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
+         break;
+       }
+       //Ended in a null pointer...
+       if (oid==0) {
+         numLocal++;
+         goto tuple;
+       }
+      }
+
+      //Entire prefetch is local
+      if (newbase==endindex&&checkoid(oid,isLastOffset)) {
+       numLocal++;
+       goto tuple;
+      }
+
+      //Add to remote requests
+      machinenum=lhashSearch(oid);
+      insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+    tuple:
+      ;
+    }
+    
+    /* handle dynamic prefetching */
+    handleDynPrefetching(numLocal, ntuples, siteid);
+    ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
+  }
+
+  return head;
+}
+
+/*
 prefetchpile_t *foundLocal(char *ptr) {
        int siteid = *(GET_SITEID(ptr));
        int ntuples = *(GET_NTUPLES(ptr));
@@ -1871,11 +1987,30 @@ tuple:
                ;
        }
 
-       /* handle dynamic prefetching */
+       // handle dynamic prefetching
        handleDynPrefetching(numLocal, ntuples, siteid);
        return head;
 }
+*/
+
+int checkoid(unsigned int oid, int isLastOffset) {
+  objheader_t *header;
+  if ((header=mhashSearch(oid))!=NULL) {
+    //Found on machine
+    return 1;
+  } else if ((header=prehashSearch(oid))!=NULL) {
+    //if the last offset then prefetch object
+    if((STATUS(header) & DIRTY) && isLastOffset) {
+      return 0;
+    }
+    //Found in cache
+    return 1;
+  } else {
+    return 0;
+  }
+}
 
+#if 0
 int checkoid(unsigned int oid) {
        objheader_t *header;
        if ((header=mhashSearch(oid))!=NULL) {
@@ -1888,7 +2023,45 @@ int checkoid(unsigned int oid) {
                return 0;
        }
 }
+#endif
+
+int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
+  objheader_t *header;
+  if ((header=mhashSearch(*oid))!=NULL) {
+    //Found on machine
+    ;
+  } else if ((header=prehashSearch(*oid))!=NULL) {
+    //Found in cache
+    if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
+      //only once because later old entries may still cause unnecessary roundtrips during prefetching
+      (*countInvalidObj)+=1;
+      if(*countInvalidObj > 1) {
+        return 0;
+      }
+    }
+  } else {
+    return 0;
+  }
 
+  if(TYPE(header) >= NUMCLASSES) {
+    int elementsize = classsize[TYPE(header)];
+    struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+    int length = ao->___length___;
+    /* Check if array out of bounds */
+    if(offset < 0 || offset >= length) {
+      //if yes treat the object as found
+      (*oid)=0;
+      return 1;
+    }
+    (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
+    return 1;
+  } else {
+    (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
+    return 1;
+  }
+}
+
+#if 0
 int lookupObject(unsigned int * oid, short offset) {
   objheader_t *header;
   if ((header=mhashSearch(*oid))!=NULL) {
@@ -1918,9 +2091,42 @@ int lookupObject(unsigned int * oid, short offset) {
     return 1;
   }
 }
+#endif
+
+/* This function is called by the thread calling transPrefetch */
+void *transPrefetch(void *t) {
+  while(1) {
+    /* read from prefetch queue */
+    void *node=gettail();
+    /* Check if the tuples are found locally, if yes then reduce them further*/
+    /* and group requests by remote machine ids by calling the makePreGroups() */
+    int count=numavailable();
+    prefetchpile_t *pilehead = foundLocal(node, count);
 
+    if (pilehead!=NULL) {
+      // Get sock from shared pool
+
+      /* Send  Prefetch Request */
+      prefetchpile_t *ptr = pilehead;
+      while(ptr != NULL) {
+        int sd = getSock2(transPrefetchSockPool, ptr->mid);
+        sendPrefetchReq(ptr, sd);
+        ptr = ptr->next;
+      }
+
+      /* Release socket */
+      //       freeSock(transPrefetchSockPool, pilehead->mid, sd);
+
+      /* Deallocated pilehead */
+      mcdealloc(pilehead);
+    }
+    // Deallocate the prefetch queue pile node
+    incmulttail(count);
+  }
+}
 
 /* This function is called by the thread calling transPrefetch */
+#if 0
 void *transPrefetch(void *t) {
   while(1) {
     /* read from prefetch queue */
@@ -1950,6 +2156,7 @@ void *transPrefetch(void *t) {
     inctail();
   }
 }
+#endif
 
 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
   objpile_t *tmp;
@@ -2532,7 +2739,8 @@ void duplicateLostObjects(unsigned int mid){
   numRecovery++;
   long long st;
   long long fi;
-  unsigned int dupeSize = 0;  // to calculate the size of backed up data
+  unsigned int dupeSize;  // to calculate the size of backed up data
+  unsigned int recvDataSize = 0;  // to calculate the size of recv data
 
   st = myrdtsc(); // to get clock
   recoverStat[numRecovery-1].deadMachine = mid;
@@ -2577,7 +2785,6 @@ void duplicateLostObjects(unsigned int mid){
   if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
     printf("%s -> Socket create error\n",__func__);
     exit(0);
-  }
 
 /* request for original */
        char duperequest;
@@ -2591,17 +2798,19 @@ void duplicateLostObjects(unsigned int mid){
        send_data(bsd, &originalMid, sizeof(unsigned int));
 
        char p_response,b_response;
-  unsigned int p_receivedSize,b_receivedSize;
+    unsigned int p_receivedSize,b_receivedSize;
 
   recv_data(psd, &p_response, sizeof(char));
   recv_data(psd, &p_receivedSize, sizeof(unsigned int));
 
   dupeSize += p_receivedSize; // size of primary data
+  recvDataSize += p_receivedSize; // size of primary data
 
   recv_data(bsd, &b_response, sizeof(char));
   recv_data(bsd, &b_receivedSize, sizeof(unsigned int));
 
   dupeSize += b_receivedSize; // size of backup data
+  recvDataSize += b_receivedSize; // size of backup data
 
   if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE)
   {
@@ -2616,6 +2825,7 @@ void duplicateLostObjects(unsigned int mid){
   fi = myrdtsc();
   recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ;
   recoverStat[numRecovery-1].recoveredData = dupeSize;
+  recoverStat[numRecovery-1].recvData = recvDataSize;
   
   printRecoveryStat();
 #endif
@@ -3479,7 +3689,8 @@ void printRecoveryStat() {
   for(i=0; i < numRecovery;i++) {
     printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
     printf("Recoveryed data(byte) = %u\n",recoverStat[i].recoveredData);
-    printf("Recovery Time(us) = %ld\n",recoverStat[i].elapsedTime);
+    printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime);
+    printf("Data recv(bytes) = %ld\n",recoverStat[i].recvData);
   }
   printf("**************************\n\n");
   fflush(stdout);
index 56c003dcbef741030f78e543ef075a3a361fc210..9e9392277293396b0bca6ddfb89b29bfdbb0e449 100644 (file)
@@ -232,9 +232,11 @@ void CALL02(___System______deepArrayCopy____L___Object____L___Object___, struct
 
 void CALL11(___System______exit____I,int ___status___, int ___status___) {
 #ifdef TRANSSTATS
+#ifndef RECOVERY
   printf("numTransCommit = %d\n", numTransCommit);
   printf("numTransAbort = %d\n", numTransAbort);
   printf("nSoftAbort = %d\n", nSoftAbort);
+#endif
 #ifdef STM
   printf("nSoftAbortCommit = %d\n", nSoftAbortCommit);
   printf("nSoftAbortAbort = %d\n", nSoftAbortAbort);