changes for optimizations
authoradash <adash>
Sat, 13 Mar 2010 01:59:42 +0000 (01:59 +0000)
committeradash <adash>
Sat, 13 Mar 2010 01:59:42 +0000 (01:59 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/readstruct.h [new file with mode: 0755]
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index dc0965a1249a8a7667561826c5976a29d90b91e7..31212ea4db39ae7a5787d9e9166c2141c26a29ee 100644 (file)
@@ -84,8 +84,9 @@
 //Prefetch tuning paramters
 //#define RETRYINTERVAL  20 //N (For Em3d, SOR, Moldyn benchmarks)
 //#define SHUTDOWNINTERVAL  3  //M
-#define RETRYINTERVAL  100 //N  (For MatrixMultiply, 2DFFT benchmarks)
+#define RETRYINTERVAL  75 //N  (For MatrixMultiply, 2DFFT benchmarks)
 #define SHUTDOWNINTERVAL  1  //M
+#define NUM_TRY_TO_COMMIT 2
 
 #include <stdlib.h>
 #include <stdio.h>
index fa17c3da5c1e1e561a2997cfdddff93e1c9c4f35..cf5a0611ea6df8eb44b2054227f4f5bc5031998c 100644 (file)
@@ -13,6 +13,7 @@
 #include "thread.h"
 #endif
 #include "gCollect.h"
+#include "readstruct.h"
 
 #ifdef RECOVERY
 #include <unistd.h>
@@ -294,7 +295,7 @@ void *dstmAccept(void *acceptfd) {
        trans_commit_data_t transinfo;
   unsigned short objType, *versionarry, version;
        unsigned int *oidarry, numoid, mid, threadid;
-  int n, v;
+    int n, v;
 
 #ifdef DEBUG
        printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
@@ -302,6 +303,7 @@ void *dstmAccept(void *acceptfd) {
        /* Receive control messages from other machines */
        while(1) {
                int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+               //int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
     dupeptr = NULL;
 
                if (ret==0)
@@ -366,6 +368,7 @@ void *dstmAccept(void *acceptfd) {
                                transinfo.modptr = NULL;
                                transinfo.numlocked = 0;
                                transinfo.numnotfound = 0;
+                               //if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
                                if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
                                        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
                                        pthread_exit(NULL);
@@ -920,6 +923,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
     return 0;
 
   /* Read modified objects */
+  //printf("fixed.sum_bytes= %d\n", fixed.sum_bytes);
   if(fixed.nummod != 0) {
     if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
@@ -943,12 +947,15 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
     return 1;
   }
   ptr = (char *) modptr;
+  //printf("fixed.nummod= %d\n", fixed.nummod);
+  //fflush(stdout);
   for(i = 0 ; i < fixed.nummod; i++) {
     int tmpsize=0;
     headaddr = (objheader_t *) ptr;
     oid = OID(headaddr);
     oidmod[i] = oid;
     GETSIZE(tmpsize, headaddr);
+    //printf("i= %d, tmpsize= %d, oid= %u\n", i, tmpsize, oid);
     ptr += sizeof(objheader_t) + tmpsize;
   }
 #ifdef DEBUG
diff --git a/Robust/src/Runtime/DSTM/interface_recovery/readstruct.h b/Robust/src/Runtime/DSTM/interface_recovery/readstruct.h
new file mode 100755 (executable)
index 0000000..2cd8311
--- /dev/null
@@ -0,0 +1,22 @@
+#ifndef READSTRUCT_H
+#define READSTRUCT_H
+#define MAXBUF 1024
+struct readstruct {
+  char buf[MAXBUF];
+  int head;
+  int tail;
+};
+
+#define WMAXBUF 2048
+#define WTOP 512
+struct writestruct {
+  char buf[WMAXBUF];
+  int offset;
+};
+
+void recv_data_buf(int fd, struct readstruct *, void *buffer, int buflen);
+int recv_data_errorcode_buf(int fd, struct readstruct *, void *buffer, int buflen);
+void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen);
+void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen);
+
+#endif
index a9030df04cedf09c1b188ad414a702425d26bcb7..88dd8f6eef39781d6a47b1d51079c587a1b35a18 100644 (file)
@@ -36,6 +36,9 @@
 
 __thread objstr_t *t_cache;
 __thread struct ___Object___ *revertlist;
+__thread struct timespec exponential_backoff;
+__thread int count_exponential_backoff;
+__thread const int max_exponential_backoff = 1000; // safety limit
 #ifdef ABORTREADERS
 __thread int t_abort;
 __thread jmp_buf aborttrans;
@@ -612,6 +615,19 @@ void randomdelay() {
   return;
 }
 
+/* This functions inserts exponential backoff delays in the order of msec
+ * Mostly used when transaction commits retry*/
+void exponentialdelay() {
+  exponential_backoff.tv_nsec = exponential_backoff.tv_nsec * 2;
+  nanosleep(&exponential_backoff, NULL);
+  ++count_exponential_backoff;
+  if (count_exponential_backoff >= max_exponential_backoff) {
+    printf(" reached max_exponential_backoff at %s, %s(), %d\n", __FILE__, __func__, __LINE__);
+    exit(-1);
+  }
+  return;
+}
+
 /* This function initializes things required in the transaction start*/
 void transStart() {
   t_cache = objstrCreate(1048576);
@@ -991,14 +1007,15 @@ plistnode_t *createPiles() {
         if((mid=lhashSearch(oid)) == 0) {
             mid = myIpAddr;
         }
-                   pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+
+        pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
 
         if(numLiveHostsInSystem > 1) {
                            if(makedirty) { 
                                    STATUS(headeraddr) = DIRTY;
+                    pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
                        }
-                         pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+                         //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
         }
 #else
                // Get machine location for object id (and whether local or not)
@@ -1093,6 +1110,11 @@ int transCommit() {
   }
 #endif
 
+  int treplyretryCount = 0;
+  /* Initialize timeout for exponential delay */
+  exponential_backoff.tv_sec = 0;
+  exponential_backoff.tv_nsec = (long)(10000);//10 microsec
+  count_exponential_backoff = 0;
   do {
     treplyretry = 0;
 
@@ -1159,21 +1181,21 @@ int transCommit() {
                                }
                                socklist[sockindex] = sd;
                                /* Send bytes of data with TRANS_REQUEST control message */
-                               //send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
-                send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
+                               send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+                //send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
 
                                /* Send list of machines involved in the transaction */
                                {
                                        int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
-                                       //send_data(sd, tosend[sockindex].listmid, size);
-                    send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
+                                       send_data(sd, tosend[sockindex].listmid, size);
+                    //send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
                                }
 
                                /* Send oids and version number tuples for objects that are read */
                                {
                                        int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
-                                       //send_data(sd, tosend[sockindex].objread, size);
-                    send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
+                                       send_data(sd, tosend[sockindex].objread, size);
+                    //send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
                                }
 
                                /* Send objects that are modified */
@@ -1186,6 +1208,7 @@ int transCommit() {
                                }
                                int offset = 0;
                                int i;
+                //printf("tosend[sockindex].f.nummod = %d\n", tosend[sockindex].f.nummod);
                                for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
                                        int size;
                                        objheader_t *headeraddr;
@@ -1197,17 +1220,20 @@ int transCommit() {
                                                return 1;
                                        }
                                        GETSIZE(size,headeraddr);
+                    //printf("i= %d, tmpsize= %d, oid= %u\n", i, size, OID(headeraddr));
                                        size+=sizeof(objheader_t);
                                        memcpy(modptr+offset, headeraddr, size);
                                        offset+=size;
                                }
-                               //send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
-                forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
+                //printf("tosend[sockindex].f.sum_bytes= %d\n", tosend[sockindex].f.sum_bytes);
+                //fflush(stdout);
+                               send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+                //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
 
 #ifdef RECOVERY
         /* send transaction id, number of machine involved, machine ids */
-        //send_data(sd, &transID, sizeof(unsigned int));
-        forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int));
+        send_data(sd, &transID, sizeof(unsigned int));
+        //forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int));
 #endif
                                free(modptr);
                        } else { //handle request locally
@@ -1389,13 +1415,15 @@ int transCommit() {
       pDelete(pile_ptr);
     /* wait a random amount of time before retrying to commit transaction*/
     if(treplyretry) {
-      randomdelay();
+      treplyretryCount++;
+      //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+      //  exponentialdelay();
+      //else 
+        randomdelay();
 #ifdef TRANSSTATS
                        nSoftAbort++;
 #endif
                }
-    
-
        } while (treplyretry && deadmid != -1);
 
        if(finalResponse == TRANS_ABORT) {
@@ -2005,6 +2033,15 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
 
       //Add to remote requests
       machinenum=lhashSearch(oid);
+#ifdef RECOVERY
+    static int flipBit = 0;// Used to distribute requests between primary and backup evenly
+                           // either primary or backup machine
+    machinenum = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid));
+    flipBit ^= 1;
+#ifdef DEBUG
+    printf("mindex:%d, oid:%d, machinenumber:%s\n", machinenumber, oid, midtoIPString(machinenumber));
+#endif
+#endif
       insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
     tuple:
       ;
@@ -2362,7 +2399,7 @@ unsigned short getObjType(unsigned int oid) {
 #ifdef RECOVERY
     unsigned int mid = lhashSearch(oid);
     unsigned int machineID;
-    static flipBit = 0;
+    static int flipBit = 0;
     machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid));
     int sd = getSock2(transReadSockPool, machineID);
 #else