From: bdemsky Date: Mon, 14 Apr 2008 21:41:07 +0000 (+0000) Subject: various changes X-Git-Tag: preEdgeChange~167 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=5872031b0d0e4178ce008d82e2bfd968c7ddcbf9;p=IRC.git various changes --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 56cb9627..82e415df 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -264,7 +264,7 @@ void *handleLocalReq(void *);//handles Local requests int transComProcess(local_thread_data_array_t *); int transAbortProcess(local_thread_data_array_t *); void transAbort(transrecord_t *trans); - +void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); void prefetch(int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index b17319b9..07dc2c83 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -608,15 +608,15 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock int prefetchReq(int acceptfd) { int i, size, objsize, numoffset = 0; int length; - char *recvbuffer, *sendbuffer, control; + char *recvbuffer, control; unsigned int oid, mid=-1; objheader_t *header; oidmidpair_t oidmid; int sd = -1; while(1) { - recv_data((int)acceptfd, &length, sizeof(int)); - if(length == -1) + recv_data((int)acceptfd, &numoffset, sizeof(int)); + if(numoffset == -1) break; recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int)); oid = oidmid.oid; @@ -625,37 +625,26 @@ int prefetchReq(int acceptfd) { freeSockWithLock(transPResponseSocketPool, mid, sd); } mid=oidmid.mid; - if((sd = getSockWithLock(transPResponseSocketPool, mid)) == -1) { - printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__); - exit(-1); - } + sd = getSockWithLock(transPResponseSocketPool, mid); } - size = length - sizeof(int) - (2 * sizeof(unsigned int)); - numoffset = size/sizeof(short); short offsetarry[numoffset]; - recv_data((int) acceptfd, offsetarry, size); + recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short)); /*Process each oid */ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found in buffer for later use */ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - sendbuffer = calloc(1, size); + char sendbuffer[size]; *((int *) sendbuffer) = size; *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; control = TRANS_PREFETCH_RESPONSE; - - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } + sendPrefetchResponse(sd, &control, sendbuffer, &size); } else { /* Object Found */ int incr = 0; GETSIZE(objsize, header); size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - sendbuffer = calloc(1, size); + char sendbuffer[size]; *((int *) (sendbuffer + incr)) = size; incr += sizeof(int); *((char *)(sendbuffer + incr)) = OBJECT_FOUND; @@ -665,12 +654,7 @@ int prefetchReq(int acceptfd) { memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } + sendPrefetchResponse(sd, &control, sendbuffer, &size); /* Calculate the oid corresponding to the offset value */ for(i = 0 ; i< numoffset ; i++) { @@ -690,32 +674,19 @@ int prefetchReq(int acceptfd) { if((header = mhashSearch(oid)) == NULL) { size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - close(sd); - return -1; - } + char sendbuffer[size]; *((int *) sendbuffer) = size; *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __FILE__, __LINE__); - close(sd); - return -1; - } + sendPrefetchResponse(sd, &control, sendbuffer, &size); break; } else {/* Obj Found */ int incr = 0; GETSIZE(objsize, header); size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); - close(sd); - return -1; - } + char sendbuffer[size]; *((int *) (sendbuffer + incr)) = size; incr += sizeof(int); *((char *)(sendbuffer + incr)) = OBJECT_FOUND; @@ -725,12 +696,7 @@ int prefetchReq(int acceptfd) { memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } + sendPrefetchResponse(sd, &control, sendbuffer, &size); } } } @@ -742,13 +708,11 @@ int prefetchReq(int acceptfd) { return 0; } -int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { +void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { send_data(sd, control, sizeof(char)); /* Send the buffer with its size */ int length = *(size); send_data(sd, sendbuffer, length); - free(sendbuffer); - return 0; } void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) { diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 046c83f9..4ffc9d25 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -1174,7 +1174,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); char oidnoffset[len]; char *buf=oidnoffset; - *((int*)buf) = len; + *((int*)buf) = tmp->numoffset; buf+=sizeof(int); *((unsigned int *)buf) = tmp->oid; buf+=sizeof(unsigned int); @@ -1193,27 +1193,24 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { } int getPrefetchResponse(int sd) { - int numbytes = 0, length = 0, size = 0; - char *recvbuffer, control; + int length = 0, size = 0; + char control; unsigned int oid; void *modptr, *oldptr; recv_data((int)sd, &length, sizeof(int)); size = length - sizeof(int); - recvbuffer = calloc(1, size); + char recvbuffer[size]; recv_data((int)sd, recvbuffer, size); - control = *((char *) recvbuffer); if(control == OBJECT_FOUND) { - numbytes = 0; oid = *((unsigned int *)(recvbuffer + sizeof(char))); size = size - (sizeof(char) + sizeof(unsigned int)); pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&prefetchcache_mutex); - free(recvbuffer); return -1; } pthread_mutex_unlock(&prefetchcache_mutex); @@ -1226,8 +1223,6 @@ int getPrefetchResponse(int sd) { if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { prehashRemove(oid); prehashInsert(oid, modptr); - } else { - /* TODO modptr should be reference counted */ } } else {/* Else add the object ptr to hash table*/ prehashInsert(oid, modptr); @@ -1243,14 +1238,11 @@ int getPrefetchResponse(int sd) { /* TODO: For each object not found query DHT for new location and retrieve the object */ /* Throw an error */ printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); - free(recvbuffer); exit(-1); } else { printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); } - free(recvbuffer); - return 0; } diff --git a/Robust/src/Runtime/thread.c b/Robust/src/Runtime/thread.c index 4dc88de3..2b388245 100644 --- a/Robust/src/Runtime/thread.c +++ b/Robust/src/Runtime/thread.c @@ -142,6 +142,7 @@ transstart: transAbort(trans); return; } else { + version = (ptr-1)->version; if((oidarray = calloc(1, sizeof(unsigned int))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); @@ -174,6 +175,11 @@ transstart: #endif #ifdef THREADS +void CALL01(___Thread______nativeJoin____, struct ___Thread___ * ___this___) { + /* This is an evil, non portable hack*/ + pthread_join((thread_t)___this___->___threadid___, NULL); +} + void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) { pthread_t thread; int retval; @@ -190,6 +196,8 @@ void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) { if (retval!=0) usleep(1); } while(retval!=0); + /* This next statement will likely not work on many machines */ + ___this___->___threadid___=thread; pthread_attr_destroy(&nattr); }