changes to trans.c for performance improvement
authoradash <adash>
Sat, 6 Mar 2010 22:55:04 +0000 (22:55 +0000)
committeradash <adash>
Sat, 6 Mar 2010 22:55:04 +0000 (22:55 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index 56a26b57ac91bee578674eff8928b5e6f28ee3fa..fa17c3da5c1e1e561a2997cfdddff93e1c9c4f35 100644 (file)
@@ -944,7 +944,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   }
   ptr = (char *) modptr;
   for(i = 0 ; i < fixed.nummod; i++) {
-    int tmpsize;
+    int tmpsize=0;
     headaddr = (objheader_t *) ptr;
     oid = OID(headaddr);
     oidmod[i] = oid;
index 2f4186d22d5ad9825e57baf267b57622c0f23750..a9030df04cedf09c1b188ad414a702425d26bcb7 100644 (file)
@@ -26,8 +26,6 @@
 #include <sys/select.h>
 #include "tlookup.h"
 
-//#define CPU_FREQ 2992440
-
 #define CPU_FREQ 3056842
 #endif
 
@@ -77,6 +75,7 @@ int numTransAbort = 0;
 int nchashSearch = 0;
 int nmhashSearch = 0;
 int nprehashSearch = 0;
+int ndirtyCacheObj = 0;
 int nRemoteSend = 0;
 int nSoftAbort = 0;
 int bytesSent = 0;
@@ -98,7 +97,7 @@ unsigned int *locateObjHosts;
 int waitThreadMid;            
 unsigned int waitThreadID; 
 
-int transRetryFlag;
+__thread int transRetryFlag;
 unsigned int transIDMin;
 unsigned int transIDMax;
 
@@ -208,6 +207,34 @@ GDBSEND1:
   return 0; // completed sending data
 }
 
+void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
+  if (buflen+sendbuffer->offset>WMAXBUF) {
+    send_data(fd, sendbuffer->buf, sendbuffer->offset);
+    sendbuffer->offset=0;
+    send_data(fd, buffer, buflen);
+    return;
+  }
+  memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
+  sendbuffer->offset+=buflen;
+  if (sendbuffer->offset>WTOP) {
+    send_data(fd, sendbuffer->buf, sendbuffer->offset);
+    sendbuffer->offset=0;
+  }
+}
+
+void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
+  if (buflen+sendbuffer->offset>WMAXBUF) {
+    send_data(fd, sendbuffer->buf, sendbuffer->offset);
+    sendbuffer->offset=0;
+    send_data(fd, buffer, buflen);
+    return;
+  }
+  memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
+  sendbuffer->offset+=buflen;
+  send_data(fd, sendbuffer->buf, sendbuffer->offset);
+  sendbuffer->offset=0;
+}
+
 //Returns negative value if receive cannot be completed because of
 //timeout or machine failure
 
@@ -222,6 +249,7 @@ int recv_data(int fd, void *buf, int buflen) {
 GDBRECV1:
 #endif
     numbytes = recv(fd, buffer, size, 0);
+    bytesRecv += numbytes;
     
     if (numbytes>0) {
       buffer += numbytes;
@@ -307,6 +335,7 @@ int recv_data_errorcode(int fd, void *buf, int buflen) {
       perror("recv_data_errorcode");
       return -1;
     }
+    bytesRecv += numbytes;
     buffer += numbytes;
     size -= numbytes;
   }
@@ -687,6 +716,12 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
   } else {
 #ifdef CACHE
     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+      if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+        ndirtyCacheObj++;
+#endif
+        goto remoteread;
+      }
 #ifdef TRANSSTATS
       nprehashSearch++;
 #endif
@@ -703,6 +738,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
       return objcopy;
 #endif
     }
+remoteread:
 #endif
     /* Get the object from the remote location */
     if((machinenumber = lhashSearch(oid)) == 0) {
@@ -719,6 +755,24 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
       nRemoteSend++;
 #endif
 #ifdef COMPILER
+#ifdef CACHE
+      //Copy object to prefetch cache
+      pthread_mutex_lock(&prefetchcache_mutex);
+      objheader_t *headerObj;
+      int size;
+      GETSIZE(size, objcopy);
+      if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
+        printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+            __FILE__, __LINE__);
+        pthread_mutex_unlock(&prefetchcache_mutex);
+        return NULL;
+      }
+      pthread_mutex_unlock(&prefetchcache_mutex);
+      memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+      //make an entry in prefetch lookup hashtable
+      prehashInsert(oid, headerObj);
+      LOGEVENT('B');
+#endif
       return &objcopy[1];
 #else
       return objcopy;
@@ -779,6 +833,12 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
     } else {
 #ifdef CACHE
       if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+        if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+          ndirtyCacheObj++;
+#endif
+          goto remoteread;
+        }
 #ifdef TRANSSTATS
       LOGEVENT('P')
       nprehashSearch++;
@@ -796,8 +856,9 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
                        return objcopy;
 #endif
        } 
+remoteread:
 #endif
-               /* Get the object from the remote location */
+      /* Get the object from the remote location */
     if((machinenumber = lhashSearch(oid)) == 0) {
       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
            return NULL;
@@ -818,7 +879,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
 #endif
 #endif
 
-         objcopy = getRemoteObj(machinenumber, oid);
+    objcopy = getRemoteObj(machinenumber, oid);
 
 #ifdef RECOVERY
     if(transRetryFlag) {
@@ -836,9 +897,27 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
        } else {
 #ifdef TRANSSTATS
     LOGEVENT('R');
-         nRemoteSend++;
+    nRemoteSend++;
 #endif
 #ifdef COMPILER
+#ifdef CACHE
+      //Copy object to prefetch cache
+      pthread_mutex_lock(&prefetchcache_mutex);
+      objheader_t *headerObj;
+      int size;
+      GETSIZE(size, objcopy);
+      if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
+        printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+            __FILE__, __LINE__);
+        pthread_mutex_unlock(&prefetchcache_mutex);
+        return NULL;
+      }
+      pthread_mutex_unlock(&prefetchcache_mutex);
+      memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+      //make an entry in prefetch lookup hashtable
+      prehashInsert(oid, headerObj);
+#endif
+
                return &objcopy[1];
 #else
                return objcopy;
@@ -848,7 +927,6 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
 #ifdef DEBUG
   printf("%s -> Finished!!\n",__func__);
 #endif
-    
 }
 
 /* This function creates objects in the transaction record */
@@ -857,8 +935,8 @@ objheader_t *transCreateObj(unsigned int size) {
   OID(tmp) = getNewOID();
   tmp->notifylist = NULL;
   tmp->version = 1;
-  tmp->rcount = 1;
-       tmp->isBackup = 0;
+  //tmp->rcount = 1;
+  tmp->isBackup = 0;
   STATUS(tmp) = NEW;
   t_chashInsert(OID(tmp), tmp);
 
@@ -903,8 +981,6 @@ plistnode_t *createPiles() {
                          int makedirty = 0;
         unsigned int mid;
 
-        mid = lhashSearch(oid);
-
         // if the obj is dirty or new
                        if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) {
           // set flag for backup machine
@@ -912,7 +988,7 @@ plistnode_t *createPiles() {
                    }
 
         // if the obj is new or local, destination will be my Ip
-        if((mid = lhashSearch(oid)) == 0) {
+        if((mid=lhashSearch(oid)) == 0) {
             mid = myIpAddr;
         }
  
@@ -989,7 +1065,8 @@ int transCommit() {
   int firsttime=1;
   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
   char finalResponse;
-
+  struct writestruct writebuffer;
+  writebuffer.offset=0;
 #ifdef RECOVERY
   int deadsd = -1;
   int deadmid = -1;
@@ -1040,16 +1117,16 @@ int transCommit() {
 
     /* Create a socket and getReplyCtrl array, initialize */
     int socklist[pilecount];
+    char getReplyCtrl[pilecount];
     int loopcount;
-    for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+    for(loopcount = 0 ; loopcount < pilecount; loopcount++) {
       socklist[loopcount] = 0;
-    char getReplyCtrl[pilecount];
-    for(loopcount = 0 ; loopcount < pilecount; loopcount++)
       getReplyCtrl[loopcount] = 0;
+    }
 
     /* Process each machine pile */
     int sockindex = 0;
-               int localReqsock = -1;
+    int localReqsock = -1;
     trans_req_data_t *tosend;
     tosend = calloc(pilecount, sizeof(trans_req_data_t));
     while(pile != NULL) {
@@ -1082,18 +1159,21 @@ int transCommit() {
                                }
                                socklist[sockindex] = sd;
                                /* Send bytes of data with TRANS_REQUEST control message */
-                               send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+                               //send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+                send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
 
                                /* Send list of machines involved in the transaction */
                                {
                                        int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
-                                       send_data(sd, tosend[sockindex].listmid, size);
+                                       //send_data(sd, tosend[sockindex].listmid, size);
+                    send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
                                }
 
                                /* Send oids and version number tuples for objects that are read */
                                {
                                        int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
-                                       send_data(sd, tosend[sockindex].objread, size);
+                                       //send_data(sd, tosend[sockindex].objread, size);
+                    send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
                                }
 
                                /* Send objects that are modified */
@@ -1121,11 +1201,13 @@ int transCommit() {
                                        memcpy(modptr+offset, headeraddr, size);
                                        offset+=size;
                                }
-                               send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+                               //send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+                forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
 
 #ifdef RECOVERY
         /* send transaction id, number of machine involved, machine ids */
-        send_data(sd, &transID, sizeof(unsigned int));
+        //send_data(sd, &transID, sizeof(unsigned int));
+        forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int));
 #endif
                                free(modptr);
                        } else { //handle request locally
@@ -1254,17 +1336,6 @@ int transCommit() {
                                                return 1;
                                        }
 
-#if 0
-                                       /* Invalidate objects in other machine cache */
-                                       if(tosend[i].f.nummod > 0) {
-                                               if((retval = invalidateObj(&(tosend[i]))) != 0) {
-                                                       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-                                                       free(tosend);
-                                                       free(listmid);
-                                                       return 1;
-                                               }
-                                       }
-#endif                    
 #ifdef ABORTREADERS
                                        removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
                                        removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
@@ -1863,7 +1934,6 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
     memcpy(ptrcreate, header, tmpsize);
     mhashInsert(oidcreated[i], ptrcreate);
     lhashInsert(oidcreated[i], myIpAddr);
-//    printf("oid created : %u\n",oidcreated[i]);
   }
   /* Unlock locked objects */
   int useWriteUnlock = 0;
@@ -2205,6 +2275,9 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
     *((int*)buf) = tmp->numoffset;
     buf+=sizeof(int);
     *((unsigned int *)buf) = tmp->oid;
+#ifdef TRANSSTATS
+    sendRemoteReq++;
+#endif
     buf+=sizeof(unsigned int);
     *((unsigned int *)buf) = myIpAddr;
     buf += sizeof(unsigned int);
@@ -2216,7 +2289,6 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
   endpair = -1;
   send_data(sd, &endpair, sizeof(int));
-
   return;
 }
 
@@ -2229,7 +2301,9 @@ int getPrefetchResponse(int sd) {
   recv_data((int)sd, &length, sizeof(int));
   size = length - sizeof(int);
   char recvbuffer[size];
-
+#ifdef TRANSSTATS
+  getResponse++;
+#endif
   recv_data((int)sd, recvbuffer, size);
   control = *((char *) recvbuffer);
   if(control == OBJECT_FOUND) {
@@ -2739,8 +2813,7 @@ void duplicateLostObjects(unsigned int mid){
   numRecovery++;
   long long st;
   long long fi;
-  unsigned int dupeSize;  // to calculate the size of backed up data
-  unsigned int recvDataSize = 0;  // to calculate the size of recv data
+  unsigned int dupeSize = 0;  // to calculate the size of backed up data
 
   st = myrdtsc(); // to get clock
   recoverStat[numRecovery-1].deadMachine = mid;
@@ -2778,12 +2851,15 @@ void duplicateLostObjects(unsigned int mid){
    * Backup     26      21,24
    */
 
+#ifdef RECOVERYSTATS
   dupeSize = 0;
+#endif
 
   if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
     printf("%s -> Socket create error\n",__func__);
     exit(0);
   }
+
 /* request for original */
        char duperequest;
        duperequest = DUPLICATE_ORIGINAL;
@@ -2801,14 +2877,16 @@ void duplicateLostObjects(unsigned int mid){
   recv_data(psd, &p_response, sizeof(char));
   recv_data(psd, &p_receivedSize, sizeof(unsigned int));
 
+#ifdef RECOVERYSTATS
   dupeSize += p_receivedSize; // size of primary data
-  recvDataSize += p_receivedSize; // size of primary data
+#endif
 
   recv_data(bsd, &b_response, sizeof(char));
   recv_data(bsd, &b_receivedSize, sizeof(unsigned int));
 
+#ifdef RECOVERYSTATS
   dupeSize += b_receivedSize; // size of backup data
-  recvDataSize += b_receivedSize; // size of backup data
+#endif
 
   if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE)
   {
@@ -2823,8 +2901,6 @@ void duplicateLostObjects(unsigned int mid){
   fi = myrdtsc();
   recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ;
   recoverStat[numRecovery-1].recoveredData = dupeSize;
-  recoverStat[numRecovery-1].recvData = recvDataSize;
-  
   printRecoveryStat();
 #endif
 
@@ -3678,9 +3754,10 @@ int checkiftheMachineDead(unsigned int mid) {
   int mIndex = findHost(mid);
   return getStatus(mIndex);
 }
+#endif
 
-#ifdef RECOVERYSTATS
 void printRecoveryStat() {
+#ifdef RECOVERYSTATS
   printf("\n***** Recovery Stats *****\n");
   printf("numRecovery = %d\n",numRecovery);
   int i;
@@ -3688,18 +3765,10 @@ void printRecoveryStat() {
     printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
     printf("Recoveryed data(byte) = %u\n",recoverStat[i].recoveredData);
     printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime);
-    printf("Data recv(bytes) = %ld\n",recoverStat[i].recvData);
   }
   printf("**************************\n\n");
   fflush(stdout);
-}
 #else
-void printRecoveryStat() {
   printf("No stat\n");
-}
-#endif
-
-
-
-
 #endif
+}