various changes
authorbdemsky <bdemsky>
Mon, 14 Apr 2008 21:41:07 +0000 (21:41 +0000)
committerbdemsky <bdemsky>
Mon, 14 Apr 2008 21:41:07 +0000 (21:41 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/thread.c

index 56cb9627849da9613dadc4fd94f8f88059cc17ba..82e415dfa0ba9f1f5436bb32e6ee4dab1cb5a1ac 100644 (file)
@@ -264,7 +264,7 @@ void *handleLocalReq(void *);//handles Local requests
 int transComProcess(local_thread_data_array_t *);
 int transAbortProcess(local_thread_data_array_t *);
 void transAbort(transrecord_t *trans);
-
+void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size);
 void prefetch(int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
index b17319b9d48c7394afe28866400a07bd3f929eed..07dc2c8392a921bb747f6b61d1a3b67c450a8135 100644 (file)
@@ -608,15 +608,15 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
 int prefetchReq(int acceptfd) {
   int i, size, objsize, numoffset = 0;
   int length;
-  char *recvbuffer, *sendbuffer, control;
+  char *recvbuffer, control;
   unsigned int oid, mid=-1;
   objheader_t *header;
   oidmidpair_t oidmid;
   int sd = -1;
       
   while(1) {
-    recv_data((int)acceptfd, &length, sizeof(int));
-    if(length == -1) 
+    recv_data((int)acceptfd, &numoffset, sizeof(int));
+    if(numoffset == -1) 
       break;
     recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
     oid = oidmid.oid;
@@ -625,37 +625,26 @@ int prefetchReq(int acceptfd) {
        freeSockWithLock(transPResponseSocketPool, mid, sd);
       }
       mid=oidmid.mid;
-      if((sd = getSockWithLock(transPResponseSocketPool, mid)) == -1) {
-       printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
-       exit(-1);
-      }
+      sd = getSockWithLock(transPResponseSocketPool, mid);
     }
-    size = length - sizeof(int) - (2 * sizeof(unsigned int));
-    numoffset = size/sizeof(short);
     short offsetarry[numoffset];
-    recv_data((int) acceptfd, offsetarry, size);
+    recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
     
     /*Process each oid */
     if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
       /* Save the oids not found in buffer for later use */
       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-      sendbuffer = calloc(1, size);
+      char sendbuffer[size];
       *((int *) sendbuffer) = size;
       *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
       *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
       control = TRANS_PREFETCH_RESPONSE;
-      
-      if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-       printf("Error: %s() in sending prefetch response at %s, %d\n",
-              __func__, __FILE__, __LINE__);
-       close(sd);
-       return -1;
-      }
+      sendPrefetchResponse(sd, &control, sendbuffer, &size);
     } else { /* Object Found */
       int incr = 0;
       GETSIZE(objsize, header);
       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-      sendbuffer = calloc(1, size);
+      char sendbuffer[size];
       *((int *) (sendbuffer + incr)) = size;
       incr += sizeof(int);
       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
@@ -665,12 +654,7 @@ int prefetchReq(int acceptfd) {
       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
       
       control = TRANS_PREFETCH_RESPONSE;
-      if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-       printf("Error: %s() in sending prefetch response at %s, %d\n",
-              __func__, __FILE__, __LINE__);
-       close(sd);
-       return -1;
-      }
+      sendPrefetchResponse(sd, &control, sendbuffer, &size);
       
       /* Calculate the oid corresponding to the offset value */
       for(i = 0 ; i< numoffset ; i++) {
@@ -690,32 +674,19 @@ int prefetchReq(int acceptfd) {
        
        if((header = mhashSearch(oid)) == NULL) {
          size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-         if((sendbuffer = calloc(1, size)) == NULL) {
-           printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-           close(sd);
-           return -1;
-         }
+         char sendbuffer[size];
          *((int *) sendbuffer) = size;
          *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
          *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
          
          control = TRANS_PREFETCH_RESPONSE;
-         if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-           printf("Error: %s() in sending prefetch response at %s, %d\n",
-                  __FILE__, __LINE__);
-           close(sd);
-           return -1;
-         }
+         sendPrefetchResponse(sd, &control, sendbuffer, &size);
          break;
        } else {/* Obj Found */
          int incr = 0;
          GETSIZE(objsize, header);
          size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-         if((sendbuffer = calloc(1, size)) == NULL) {
-           printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
-           close(sd);
-           return -1;
-         }
+         char sendbuffer[size];
          *((int *) (sendbuffer + incr)) = size;
          incr += sizeof(int);
          *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
@@ -725,12 +696,7 @@ int prefetchReq(int acceptfd) {
          memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
          
          control = TRANS_PREFETCH_RESPONSE;
-         if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-           printf("Error: %s() in sending prefetch response at %s, %d\n",
-                  __func__, __FILE__, __LINE__);
-           close(sd);
-           return -1;
-         }
+         sendPrefetchResponse(sd, &control, sendbuffer, &size);
        }
       }
     }
@@ -742,13 +708,11 @@ int prefetchReq(int acceptfd) {
   return 0;
 }
 
-int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
+void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
        send_data(sd, control, sizeof(char));
        /* Send the buffer with its size */
        int length = *(size);
        send_data(sd, sendbuffer, length);
-       free(sendbuffer);
-       return 0;
 }
 
 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
index 046c83f9ef56431d9c5367403b5a31320c2c525e..4ffc9d254bdeee149f284af8db0febb71484c7d1 100644 (file)
@@ -1174,7 +1174,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
     char oidnoffset[len];
     char *buf=oidnoffset;
-    *((int*)buf) = len;
+    *((int*)buf) = tmp->numoffset;
     buf+=sizeof(int);
     *((unsigned int *)buf) = tmp->oid;
     buf+=sizeof(unsigned int);
@@ -1193,27 +1193,24 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
 }
 
 int getPrefetchResponse(int sd) {
-  int numbytes = 0, length = 0, size = 0;
-  char *recvbuffer, control;
+  int length = 0, size = 0;
+  char control;
   unsigned int oid;
   void *modptr, *oldptr;
   
   recv_data((int)sd, &length, sizeof(int)); 
   size = length - sizeof(int);
-  recvbuffer = calloc(1, size);
+  char recvbuffer[size];
 
   recv_data((int)sd, recvbuffer, size);
-
   control = *((char *) recvbuffer);
   if(control == OBJECT_FOUND) {
-    numbytes = 0;
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
     size = size - (sizeof(char) + sizeof(unsigned int));
     pthread_mutex_lock(&prefetchcache_mutex);
     if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
       pthread_mutex_unlock(&prefetchcache_mutex);
-      free(recvbuffer);
       return -1;
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
@@ -1226,8 +1223,6 @@ int getPrefetchResponse(int sd) {
       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
        prehashRemove(oid);
        prehashInsert(oid, modptr);
-      } else {
-       /* TODO modptr should be reference counted */
       }
     } else {/* Else add the object ptr to hash table*/
       prehashInsert(oid, modptr);
@@ -1243,14 +1238,11 @@ int getPrefetchResponse(int sd) {
     /* TODO: For each object not found query DHT for new location and retrieve the object */
     /* Throw an error */
     printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
-    free(recvbuffer);
     exit(-1);
   } else {
     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
   }
   
-  free(recvbuffer);
-  
   return 0;
 }
 
index 4dc88de3f10ac4a9fa1ff9dd6d6d4f6a35e073eb..2b388245ce0f62ca56d60029b41179801b1e9e4e 100644 (file)
@@ -142,6 +142,7 @@ transstart:
          transAbort(trans);
          return;
   } else {
+
          version = (ptr-1)->version;
          if((oidarray = calloc(1, sizeof(unsigned int))) == NULL) {
                  printf("Calloc error %s, %d\n", __FILE__, __LINE__);
@@ -174,6 +175,11 @@ transstart:
 #endif
 
 #ifdef THREADS
+void CALL01(___Thread______nativeJoin____, struct ___Thread___ * ___this___) {
+  /* This is an evil, non portable hack*/
+  pthread_join((thread_t)___this___->___threadid___, NULL);
+}
+
 void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) {
   pthread_t thread;
   int retval;
@@ -190,6 +196,8 @@ void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) {
     if (retval!=0)
       usleep(1);
   } while(retval!=0);
+  /* This next statement will likely not work on many machines */
+  ___this___->___threadid___=thread;
 
   pthread_attr_destroy(&nattr);
 }