Added checks and minor bug fix
authoradash <adash>
Sun, 2 Mar 2008 18:35:18 +0000 (18:35 +0000)
committeradash <adash>
Sun, 2 Mar 2008 18:35:18 +0000 (18:35 +0000)
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/trans.c

index 6dd69a0ab058d5587171ff1b27aeaf39fd3c8707..adfaaf17c9756878354a81ae0e387a9ccc8ecd9b 100644 (file)
@@ -1513,6 +1513,7 @@ public class BuildCode {
     public void generateInsideTransCode(FlatMethod fm, LocalityBinding lb,PrintWriter output,PrefetchPair pp,Vector oids, Vector fieldoffset,Vector endoffset, int tuplecount){
            int i,j;
            short offsetcount = 0;
+           String test = new String();
 
            Object newdesc = pp.desc.get(0);
            if(newdesc instanceof FieldDescriptor) {
@@ -1529,13 +1530,9 @@ public class BuildCode {
                            tstlbl += generateTemp(fm, id.getTempDescAt(i), lb) + "+";
                    }
                    tstlbl += id.offset.toString();
-                   output.println("   int flag_" + flagcount + "=  0;");
                    output.println("if ("+tstlbl+"< 0 || "+tstlbl+" >= "+
                                    generateTemp(fm, pp.base, lb) + "->___length___) {");
-                   output.println("    flag_" + flagcount+"  = 1;");
-                   output.println("}");
-
-                   output.println("if (flag_"+flagcount+") {");
+                   output.println("   failedboundschk();");
                    output.println("}");
 
                    TypeDescriptor elementtype = pp.base.getType().dereference();
@@ -1546,6 +1543,12 @@ public class BuildCode {
                            type=elementtype.getSafeSymbol()+" ";
 
                    String oid = new String("(unsigned int) (" + generateTemp(fm, pp.base, lb) + " != NULL ? " + "((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"] : 0)");
+
+                   /*
+                   test = "(("+tstlbl+"< 0) || ("+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___))";
+                   String oid = new String("(unsigned int) (" +genarateTemp(fm, pp.base, lb) + " != NULL ? (" +test+ " ? 0 : ((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"]) : 0);"); 
+                   */
+                   
                    oids.add(oid);
            }
 
index 379ea28731e9f60484b597aa7348a606aae6225f..b232cbef165b37769ae550b98c81158f742cabd8 100644 (file)
@@ -309,7 +309,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
                          // keep reading all objects
                sum = 0;
                do {
-                       n = recv((int)acceptfd, (void *) objread, N, 0);
+                       n = recv((int)acceptfd, (void *) objread+sum, N-sum, 0);
                        sum += n;
                } while(sum < N && n != 0);
        }
index c4b9e45c74c0c4c868ec7204e0e1d97b6b59d31d..26e1bfc460291c1c1d50f73b1fbeb851aa86ac41 100644 (file)
@@ -50,6 +50,9 @@ prefetchqelem_t *pre_dequeue(void) {
        prefetchqelem_t *retnode;
        if (pqueue.front == NULL) {
                printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
+               if(pqueue.rear != NULL) {
+                       printf("pqueue.front points to invalid location %s, %d\n", __FILE__, __LINE__);
+               }
                return NULL;
        }
        retnode = pqueue.front;
index d678293433e7fc2701dcadb7d6aeeeb10c5d13f0..eb3b76471f38452b0833d26a3222d5c36d6defe4 100644 (file)
@@ -249,7 +249,7 @@ transrecord_t *transStart()
 
 /* This function finds the location of the objects involved in a transaction
  * and returns the pointer to the object if found in a remote location */
-objheader_t *transRead(transrecord_t *record, unsigned int oid) {      
+objheader_t *transRead(transrecord_t *record, unsigned int oid) {
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        objheader_t *objcopy;
@@ -333,7 +333,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                }
 
                /* Get the object from the remote location */
-               machinenumber = lhashSearch(oid);
+               if((machinenumber = lhashSearch(oid)) == 0) {
+                       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
+                       return NULL;
+               }
+
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
@@ -629,7 +633,6 @@ void *transRequest(void *threadarg) {
        char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
        char machineip[16], retval;
 
-
        tdata = (thread_data_array_t *) threadarg;
 
        /* Send Trans Request */
@@ -656,6 +659,7 @@ void *transRequest(void *threadarg) {
                close(sd);
                pthread_exit(NULL);
        }
+
        /* Send list of machines involved in the transaction */
        {
                int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
@@ -665,15 +669,18 @@ void *transRequest(void *threadarg) {
                        pthread_exit(NULL);
                }
        }
+
        /* Send oids and version number tuples for objects that are read */
        {
                int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
+               
                if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
                        perror("Error sending tuples for thread\n");
                        close(sd);
                        pthread_exit(NULL);
                }
        }
+
        /* Send objects that are modified */
        for(i = 0; i < tdata->buffer->f.nummod ; i++) {
                int size;
@@ -1425,7 +1432,7 @@ void *transPrefetch(void *t) {
                if((qnode = pre_dequeue()) == NULL) {
                        printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
                        pthread_mutex_unlock(&pqueue.qlock);
-                       pthread_exit(NULL);
+                       continue;
                }
                pthread_mutex_unlock(&pqueue.qlock);
                                
@@ -1435,7 +1442,8 @@ void *transPrefetch(void *t) {
                /* and group requests by remote machine ids by calling the makePreGroups() */
                if((pilehead = foundLocal(qnode)) == NULL) {
                        printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
-                       pthread_exit(NULL);
+                       pre_enqueue(qnode);
+                       continue;
                }
 
                ptr = pilehead;
@@ -1480,15 +1488,16 @@ void *mcqProcess(void *threadid) {
                if((mcpilenode = mcpiledequeue()) == NULL) {
                        printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
                        pthread_mutex_unlock(&mcqueue.qlock);
-                       pthread_exit(NULL);
+                       continue;
                }
                /* Unlock mutex */
                pthread_mutex_unlock(&mcqueue.qlock);
 
                /*Initiate connection to remote host and send request */ 
                /* Process Request */
-               if(mcpilenode->mid != myIpAddr)
+               if(mcpilenode->mid != myIpAddr) {
                        sendPrefetchReq(mcpilenode);
+               }
 
                /* Deallocate the machine queue pile node */
                mcdealloc(mcpilenode);
@@ -1610,7 +1619,7 @@ void getPrefetchResponse(int count, int sd) {
                                        index += sizeof(unsigned int);
                                        /* For each object found add to Prefetch Cache */
                                        objsize = *((int *)(buffer+index));
-                                       index+=sizeof(int);
+                                       index += sizeof(int);
                                        pthread_mutex_lock(&prefetchcache_mutex);
                                        if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
                                                printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
@@ -1873,7 +1882,10 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
 
        //FIXME currently all oids belong to one machine
        oid = oidarry[0];
-       mid = lhashSearch(oid);
+       if((mid = lhashSearch(oid)) == 0) {
+               printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
+               return;
+       }
 
        if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
                perror("reqNotify():socket()");