changes to improve commit intensive benchmarks and combine invalid messages
authoradash <adash>
Mon, 15 Feb 2010 07:44:04 +0000 (07:44 +0000)
committeradash <adash>
Mon, 15 Feb 2010 07:44:04 +0000 (07:44 +0000)
and commit messages for speedup

Robust/src/Runtime/DSTM/interface/addUdpEnhance.c
Robust/src/Runtime/DSTM/interface/addUdpEnhance.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/sandbox.c
Robust/src/Runtime/DSTM/interface/trans.c

index adaf6671242d68675c7ec18c428122d3c0640447..71bc6baf19c8632590f9ca43849ca866b7a600ab 100644 (file)
@@ -83,7 +83,6 @@ void *udpListenBroadcast(void *sockfd) {
 
   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
 
-  memset(readBuffer, 0, MAX_SIZE);
   while(1) {
     int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
     if(bytesRcvd == -1) {
@@ -113,76 +112,72 @@ void *udpListenBroadcast(void *sockfd) {
 /* Function that invalidate objects that
  * have been currently modified
  * returns -1 on error and 0 on success */
-int invalidateObj(trans_req_data_t *tdata) {
+int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) {
+  struct timeval start, end;
   struct sockaddr_in clientaddr;
   int retval;
-
+  int i;
+  int nummod=0;
+  for(i=0;i<pilecount;i++) {
+    nummod+=tdata[i].f.nummod;
+  }
   bzero(&clientaddr, sizeof(clientaddr));
   clientaddr.sin_family = AF_INET;
   clientaddr.sin_port = htons(UDP_PORT);
   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
-  if(tdata->f.nummod < maxObjsPerMsg) {
-    /* send single udp msg */
-    int iteration = 0;
-    if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
-      printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
-      return -1;
-    }
-  } else {
-    /* Split into several udp msgs */
-    int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg;
-    if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++;
-    int i;
-    for(i = 1; i <= maxUdpMsg; i++) {
-      if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
-       printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
-       return -1;
-      }
-    }
+  /* send single udp msg */
+  if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
+    printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+    return -1;
+  }
+  /* Send trans commit or abort message when prefetching or caching */
+  for(i=0;i<pilecount;i++){
+    if(socklist[i] !=0)
+      send_data(socklist[i], &finalresponse, sizeof(char));
   }
   return 0;
 }
 
 /* Function sends a udp broadcast, also distinguishes
- * msg size to be sent based on the iteration flag
+ * msg size to be sent based on the total number of objects modified
  * returns -1 on error and 0 on success */
-int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
+int sendUdpMsg(trans_req_data_t *tdata, int pilecount, int nummod, struct sockaddr_in *clientaddr, char finalresponse, int *socklist) {
   char writeBuffer[MAX_SIZE];
   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
   int offset = 0;
+  int i=0,j=0;
+
   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
   offset += sizeof(short);
   *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
   offset += sizeof(unsigned int);
-  if(iteration == 0) { // iteration flag == zero, send single udp msg
-    *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->f.nummod));  //sizeof msg
-    offset += sizeof(short);
-    int i;
-    for(i = 0; i < tdata->f.nummod; i++) {
-      *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i];  //copy objects
-      offset += sizeof(unsigned int);
+
+  while(nummod>0) {
+    int numtosend=nummod>maxObjsPerMsg?maxObjsPerMsg:nummod;
+    int localoffset=offset;
+    int sentmsgs=0;
+    *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend);
+    localoffset += sizeof(short);
+
+    for(; j < pilecount; j++) {
+      for(; i < tdata[j].f.nummod; i++) {
+        *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i];  //copy objects
+        localoffset += sizeof(unsigned int);
+        if ((++sentmsgs)==numtosend) {
+          i++;
+          goto send;
+        }
+      }
+      i=0;
     }
-  } else { // iteration flag > zero, send multiple udp msg
-    int numObj;
-    if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0)
-      numObj = maxObjsPerMsg;
-    else
-      numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg);
-    *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
-    offset += sizeof(short);
-    int index = (iteration - 1) * maxObjsPerMsg;
-    int i;
-    for(i = 0; i < numObj; i++) {
-      *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i];
-      offset += sizeof(unsigned int);
+send:
+    if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) {
+      perror("sendto error- ");
+      printf("DEBUG-> sendto error: errorno %d\n", errno);
+      return -1;
     }
-  }
-  int n;
-  if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
-    perror("sendto error- ");
-    printf("DEBUG-> sendto error: errorno %d\n", errno);
-    return -1;
+    nummod= nummod - numtosend;
   }
   return 0;
 }
@@ -218,3 +213,4 @@ int invalidateFromPrefetchCache(char *buffer) {
   }
   return 0;
 }
+
index 5011df313efd126537009b3e1c9b637d347188dd..38cca125e64a0d21a0ddccece5c23dc246f46f87 100644 (file)
@@ -21,7 +21,7 @@
 int createUdpSocket();
 int udpInit();
 void *udpListenBroadcast(void *);
-int invalidateObj(trans_req_data_t *);
+int invalidateObj(trans_req_data_t *, int, char, int*);
 int invalidateFromPrefetchCache(char *);
-int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int);
+int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*);
 #endif
index 40554000b6307f6eff36d58850e0fca41822efc8..62de32aadc15468465e929fd376bebb43013869d 100644 (file)
@@ -472,7 +472,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     break;
 
   default:
-    printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
+    printf("Error: No response to TRANS_AGREE OR DISAGREE protocol  control = %d %s, %d\n", control, __FILE__, __LINE__);
     //TODO Use fixed.trans_id  TID since Client may have died
     break;
   }
@@ -814,7 +814,6 @@ void processVerNoMatch(unsigned int *oidnotfound,
     }
     /* Condition to send TRANS_SOFT_ABORT */
     if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
-    //if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
       control = TRANS_SOFT_ABORT;
 
       /* Send control message */
index 29e1bbd657d4ea344ed0410e92924d32e0befcb2..301910f76984a1a0745f33257419dacb317da366 100644 (file)
@@ -48,36 +48,31 @@ int checktrans() {
         break;
       objheader_t *headeraddr=(objheader_t*) curr->val;
       unsigned int machinenum;
-      if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
-        machinenum = myIpAddr;
-      } else if ((machinenum = lhashSearch(curr->key)) == 0) {
-        printf("Error: No such machine %s, %d\n", __func__, __LINE__);
-        return 1;
-      }
-      if(machinenum != myIpAddr)
+      objheader_t *tmp;
+      
+      if (STATUS(headeraddr) & NEW) {
+       //new objects cannot be stale
+      } else if ((tmp=mhashSearch(curr->key)) != NULL) {
+       if (tmp->version!=headeraddr->version) {
+         //version mismatch
+         deletehead(head);
+         return 1; //return 1 when objects are inconsistent
+       }
+      } else {
+       machinenum = lhashSearch(curr->key);
         head = createList(head, headeraddr, machinenum, c_numelements);
+      }
+
       curr = curr->next;
     }
   }
   /* Send oid and versions for checking */
-  int retval=-1;
-  if(head != NULL) {
-    retval = verify(head);
-  }
-
-  if(retval == 1) { //consistent objects
-    /* free head */
-    deletehead(head);
+  if(head == NULL) 
     return 0;
-  }
-
-  if(retval == 0) {
-    /* free head */
-    deletehead(head);
-    return 1; //return 1 when objects are inconsistent
-  }
-
-  return 0; 
+  
+  int retval = verify(head);
+  deletehead(head);
+  return retval==0;
 }
 
 nodeElem_t * createList(nodeElem_t *head, objheader_t *headeraddr, unsigned int mid,
index 99daaeb9ab782e807547db2b1c6eb38d9bbf10c5..eaf70309f6788047b22ec76e2e08a4aeb4affa53 100644 (file)
@@ -121,8 +121,6 @@ void printhex(unsigned char *, int);
 plistnode_t *createPiles();
 plistnode_t *sortPiles(plistnode_t *pileptr);
 
-
-
 /*******************************
 * Send and Recv function calls
 *******************************/
@@ -935,6 +933,7 @@ plistnode_t *createPiles() {
  * Sends a transrequest() to each remote machines for objects found remotely
  * and calls handleLocalReq() to process objects found locally */
 int transCommit() {
+  char buffer[30];
   unsigned int tot_bytes_mod, *listmid;
   plistnode_t *pile, *pile_ptr;
   char treplyretry; /* keeps track of the common response that needs to be sent */
@@ -944,6 +943,8 @@ int transCommit() {
 #ifdef SANDBOX
   abortenabled=0;
 #endif
+  struct writestruct writebuffer;
+  writebuffer.offset=0;
 
 #ifdef LOGEVENTS
   int iii;
@@ -1031,18 +1032,18 @@ int transCommit() {
        }
        socklist[sockindex] = sd;
        /* Send bytes of data with TRANS_REQUEST control message */
-       send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+       send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
 
        /* Send list of machines involved in the transaction */
        {
          int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
-         send_data(sd, tosend[sockindex].listmid, size);
+         send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
        }
 
        /* Send oids and version number tuples for objects that are read */
        {
          int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
-         send_data(sd, tosend[sockindex].objread, size);
+         send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
        }
 
        /* Send objects that are modified */
@@ -1070,7 +1071,7 @@ int transCommit() {
          memcpy(modptr+offset, headeraddr, size);
          offset+=size;
        }
-       send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+       forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
        free(modptr);
       } else { //handle request locally
        handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
@@ -1078,6 +1079,7 @@ int transCommit() {
       sockindex++;
       pile = pile->next;
     } //end of pile processing
+
       /* Recv Ctrl msgs from all machines */
     int i;
     for(i = 0; i < pilecount; i++) {
@@ -1123,6 +1125,7 @@ int transCommit() {
 #endif
       }
     }
+
     /* Decide the final response */
     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
@@ -1145,17 +1148,6 @@ int transCommit() {
            free(listmid);
            return 1;
          }
-
-
-         /* Invalidate objects in other machine cache */
-         if(tosend[i].f.nummod > 0) {
-           if((retval = invalidateObj(&(tosend[i]))) != 0) {
-             printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-             free(tosend);
-             free(listmid);
-             return 1;
-           }
-         }
 #ifdef ABORTREADERS
          removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
          removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
@@ -1168,7 +1160,9 @@ int transCommit() {
        }
 #endif
 #endif
-       send_data(sd, &finalResponse, sizeof(char));
+#ifndef CACHE
+    send_data(sd, &finalResponse, sizeof(char));
+#endif
       } else {
        /* Complete local processing */
        doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
@@ -1184,6 +1178,18 @@ int transCommit() {
       }
     }
 
+#ifdef CACHE
+    {
+      /* Invalidate objects in other machine cache */
+      int retval;
+      if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) {
+       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+       free(tosend);
+       free(listmid);
+       return 1;
+      }
+    }
+#endif
     /* Free resources */
     free(tosend);
     free(listmid);
@@ -1195,7 +1201,7 @@ int transCommit() {
       if(treplyretryCount >= NUM_TRY_TO_COMMIT)
         exponentialdelay();
       else 
-        randomdelay();
+      randomdelay();
 #ifdef TRANSSTATS
       nSoftAbort++;
 #endif
@@ -1203,9 +1209,6 @@ int transCommit() {
     /* Retry trans commit procedure during soft_abort case */
   } while (treplyretry);
 
-  exponential_backoff.tv_sec = 0;
-  exponential_backoff.tv_nsec = (long)(10000);//10 microsec_
-
   if(finalResponse == TRANS_ABORT) {
 #ifdef TRANSSTATS
     LOGEVENT('A');
@@ -1283,7 +1286,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha
   transinfo->modptr = NULL;
   transinfo->numlocked = numoidlocked;
   transinfo->numnotfound = numoidnotfound;
-
+  
   /* Condition to send TRANS_AGREE */
   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
     *getReplyCtrl = TRANS_AGREE;
@@ -1302,16 +1305,6 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
       return;
     }
   } else if(finalResponse == TRANS_COMMIT) {
-#ifdef CACHE
-    /* Invalidate objects in other machine cache */
-    if(tdata->f.nummod > 0) {
-      int retval;
-      if((retval = invalidateObj(tdata)) != 0) {
-       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-       return;
-      }
-    }
-#endif
     if(transComProcess(tdata, transinfo) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
       fflush(stdout);