From 79e07b7ff2db144f637fbbae6c75da437e3cc1ec Mon Sep 17 00:00:00 2001 From: bdemsky Date: Tue, 22 Sep 2009 10:34:04 +0000 Subject: [PATCH] changes --- .../DSTM/interface/addPrefetchEnhance.c | 1 + Robust/src/Runtime/DSTM/interface/dstm.h | 7 +- .../src/Runtime/DSTM/interface/dstmserver.c | 93 +++++----- Robust/src/Runtime/DSTM/interface/prefetch.c | 14 +- Robust/src/Runtime/DSTM/interface/prefetch.h | 5 +- Robust/src/Runtime/DSTM/interface/queue.c | 17 +- Robust/src/Runtime/DSTM/interface/trans.c | 162 ++++++++++++++++-- 7 files changed, 224 insertions(+), 75 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c index ca7c845a..c3cc8bd1 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c @@ -51,6 +51,7 @@ void handleDynPrefetching(int numLocal, int ntuples, int siteid) { if(getOperationMode(siteid) != 0) { evalPrefetch[siteid].uselesscount--; if(evalPrefetch[siteid].uselesscount <= 0) { + printf("O"); evalPrefetch[siteid].operMode = 0; } } diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 9a993d88..983405db 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -82,6 +82,7 @@ #include #include "plookup.h" #include "dsmdebug.h" +#include "readstruct.h" #ifdef ABORTREADERS #include #endif @@ -234,8 +235,8 @@ void clearObjStore(); // TODO:currently only clears the prefetch cache object st void *dstmListen(void *); int startlistening(); void *dstmAccept(void *); -int readClientReq(trans_commit_data_t *, int); -int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int); +int readClientReq(trans_commit_data_t *, int, struct readstruct * readbuffer); +int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int, struct readstruct *); char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int); char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); @@ -292,7 +293,7 @@ int checkoid(unsigned int oid); int transPrefetchProcess(int **, short); void sendPrefetchReq(prefetchpile_t*, int); void sendPrefetchReqnew(prefetchpile_t*, int); -int getPrefetchResponse(int); +int getPrefetchResponse(int, struct readstruct *); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index ca40fac2..edfbb4e8 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -12,6 +12,7 @@ #include "thread.h" #endif #include "gCollect.h" +#include "readstruct.h" #define BACKLOG 10 //max pending connections #define RECEIVE_BUFFER_SIZE 2048 @@ -130,10 +131,13 @@ void *dstmAccept(void *acceptfd) { trans_commit_data_t transinfo; unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; + struct readstruct readbuffer; + readbuffer.head=0; + readbuffer.tail=0; /* Receive control messages from other machines */ while(1) { - int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); + int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char)); if (ret==0) break; if (ret==-1) { @@ -143,7 +147,7 @@ void *dstmAccept(void *acceptfd) { switch(control) { case READ_REQUEST: /* Read oid requested and search if available */ - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int)); while((srcObj = mhashSearch(oid)) == NULL) { int ret; if((ret = sched_yield()) != 0) { @@ -182,7 +186,7 @@ void *dstmAccept(void *acceptfd) { transinfo.modptr = NULL; transinfo.numlocked = 0; transinfo.numnotfound = 0; - if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { + if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) { printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); } @@ -190,12 +194,12 @@ void *dstmAccept(void *acceptfd) { case TRANS_PREFETCH: #ifdef RANGEPREFETCH - if((val = rangePrefetchReq((int)acceptfd)) != 0) { + if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) { printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__); break; } #else - if((val = prefetchReq((int)acceptfd)) != 0) { + if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) { printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); break; } @@ -204,12 +208,12 @@ void *dstmAccept(void *acceptfd) { case TRANS_PREFETCH_RESPONSE: #ifdef RANGEPREFETCH - if((val = getRangePrefetchResponse((int)acceptfd)) != 0) { + if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) { printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__); break; } #else - if((val = getPrefetchResponse((int) acceptfd)) != 0) { + if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) { printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); break; } @@ -217,20 +221,20 @@ void *dstmAccept(void *acceptfd) { break; case START_REMOTE_THREAD: - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int)); objType = getObjType(oid); startDSMthread(oid, objType); break; case THREAD_NOTIFY_REQUEST: - recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); + recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int)); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); pthread_exit(NULL); } - recv_data((int)acceptfd, buffer, size); + recv_data_buf((int)acceptfd, &readbuffer, buffer, size); oidarry = calloc(numoid, sizeof(unsigned int)); memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); @@ -243,7 +247,6 @@ void *dstmAccept(void *acceptfd) { threadid = *((unsigned int *)(buffer+size)); processReqNotify(numoid, oidarry, versionarry, mid, threadid); free(buffer); - break; case THREAD_NOTIFY_RESPONSE: @@ -253,7 +256,7 @@ void *dstmAccept(void *acceptfd) { pthread_exit(NULL); } - recv_data((int)acceptfd, buffer, size); + recv_data_buf((int)acceptfd, &readbuffer, buffer, size); oid = *((unsigned int *)buffer); size = sizeof(unsigned int); @@ -281,7 +284,7 @@ closeconnection: /* This function reads the information available in a transaction request * and makes a function call to process the request */ -int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { +int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) { char *ptr; void *modptr; unsigned int *oidmod, oid; @@ -295,14 +298,14 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { size = sizeof(fixed) - 1; ptr = (char *)&fixed;; fixed.control = TRANS_REQUEST; - recv_data((int)acceptfd, ptr+1, size); + recv_data_buf((int)acceptfd, readbuffer, ptr+1, size); /* Read list of mids */ int mcount = fixed.mcount; size = mcount * sizeof(unsigned int); unsigned int listmid[mcount]; ptr = (char *) listmid; - recv_data((int)acceptfd, ptr, size); + recv_data_buf((int)acceptfd, readbuffer, ptr, size); /* Read oid and version tuples for those objects that are not modified in the transaction */ int numread = fixed.numread; @@ -310,7 +313,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { char objread[size]; if(numread != 0) { //If pile contains more than one object to be read, // keep reading all objects - recv_data((int)acceptfd, objread, size); + recv_data_buf((int)acceptfd, readbuffer, objread, size); } /* Read modified objects */ @@ -320,7 +323,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { return 1; } size = fixed.sum_bytes; - recv_data((int)acceptfd, modptr, size); + recv_data_buf((int)acceptfd, readbuffer, modptr, size); } /* Create an array of oids for modified objects */ @@ -340,7 +343,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { } /*Process the information read */ - if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) { + if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) { printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__); /* Free resources */ if(oidmod != NULL) { @@ -361,7 +364,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { * function and sends a reply to the co-ordinator. * Following this it also receives a new control message from the co-ordinator and processes this message*/ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, - unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) { + unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) { char control, sendctrl, retval; objheader_t *tmp_header; @@ -374,7 +377,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, return 1; } - recv_data((int)acceptfd, &control, sizeof(char)); + recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char)); /* Process the new control message */ switch(control) { case TRANS_ABORT: @@ -736,7 +739,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock * If objects are not found then record those and if objects are found * then use offset values to prefetch references to other objects */ -int prefetchReq(int acceptfd) { +int prefetchReq(int acceptfd, struct readstruct * readbuffer) { int i, size, objsize, numoffset = 0; int length; char *recvbuffer, control; @@ -746,10 +749,10 @@ int prefetchReq(int acceptfd) { int sd = -1; while(1) { - recv_data((int)acceptfd, &numoffset, sizeof(int)); + recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int)); if(numoffset == -1) break; - recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int)); + recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int)); oid = oidmid.oid; if (mid != oidmid.mid) { if (mid!=-1) { @@ -759,23 +762,24 @@ int prefetchReq(int acceptfd) { sd = getSockWithLock(transPResponseSocketPool, mid); } short offsetarry[numoffset]; - recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short)); + recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short)); /*Process each oid */ if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */ /* Save the oids not found in buffer for later use */ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - char sendbuffer[size]; - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; + *((int *) (sendbuffer+sizeof(char))) = size; + *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid; + send_data(sd, sendbuffer, size+1); } else { /* Object Found */ - int incr = 0; + int incr = 1; GETSIZE(objsize, header); size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - char sendbuffer[size]; + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *)(sendbuffer + incr)) = size; incr += sizeof(int); *((char *)(sendbuffer + incr)) = OBJECT_FOUND; @@ -783,9 +787,7 @@ int prefetchReq(int acceptfd) { *((unsigned int *)(sendbuffer+incr)) = oid; incr += sizeof(unsigned int); memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + send_data(sd, sendbuffer, size+1); /* Calculate the oid corresponding to the offset value */ for(i = 0 ; i< numoffset ; i++) { @@ -809,19 +811,20 @@ int prefetchReq(int acceptfd) { if((header = mhashSearch(oid)) == NULL) { size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - char sendbuffer[size]; - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; + *((int *) (sendbuffer+1)) = size; + *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid; - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + send_data(sd, sendbuffer, size+1); break; } else { /* Obj Found */ - int incr = 0; + int incr = 1; GETSIZE(objsize, header); size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - char sendbuffer[size]; + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *)(sendbuffer + incr)) = size; incr += sizeof(int); *((char *)(sendbuffer + incr)) = OBJECT_FOUND; @@ -829,9 +832,7 @@ int prefetchReq(int acceptfd) { *((unsigned int *)(sendbuffer+incr)) = oid; incr += sizeof(unsigned int); memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + send_data(sd, sendbuffer, size+1); } } //end of for } diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.c b/Robust/src/Runtime/DSTM/interface/prefetch.c index de522845..956dd000 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.c +++ b/Robust/src/Runtime/DSTM/interface/prefetch.c @@ -331,12 +331,12 @@ void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd, unsigned int return; } -int getRangePrefetchResponse(int sd) { +int getRangePrefetchResponse(int sd, struct readstruct * readbuffer) { int length = 0; - recv_data(sd, &length, sizeof(int)); + recv_data_buf(sd, readbuffer, &length, sizeof(int)); int size = length - sizeof(int); char recvbuffer[size]; - recv_data(sd, recvbuffer, size); + recv_data_buf(sd, readbuffer, recvbuffer, size); char control = *((char *) recvbuffer); unsigned int oid; if(control == OBJECT_FOUND) { @@ -390,16 +390,16 @@ int getRangePrefetchResponse(int sd) { return 0; } -int rangePrefetchReq(int acceptfd) { +int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer) { int numoffset, sd = -1; unsigned int baseoid, mid = -1; oidmidpair_t oidmid; while (1) { - recv_data(acceptfd, &numoffset, sizeof(int)); + recv_data_buf(acceptfd, readbuffer, &numoffset, sizeof(int)); if(numoffset == -1) break; - recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int)); + recv_data_buf(acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int)); baseoid = oidmid.oid; if(mid != oidmid.mid) { if(mid!= -1) @@ -408,7 +408,7 @@ int rangePrefetchReq(int acceptfd) { sd = getSockWithLock(transPResponseSocketPool, mid); } short offsetsarry[numoffset]; - recv_data(acceptfd, offsetsarry, numoffset*sizeof(short)); + recv_data_buf(acceptfd, readbuffer, offsetsarry, numoffset*sizeof(short)); perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset); diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.h b/Robust/src/Runtime/DSTM/interface/prefetch.h index 08697d57..3e1c2dd9 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.h +++ b/Robust/src/Runtime/DSTM/interface/prefetch.h @@ -2,6 +2,7 @@ #define _PREFETCH_H_ #include "queue.h" #include "dstm.h" +#include "readstruct.h" #define GET_STRIDE(x) ((x & 0x7000) >> 12) #define GET_RANGE(x) (x & 0x0fff) @@ -54,9 +55,9 @@ void insertPrefetch(int, unsigned int, short, short*, perMcPrefetchList_t **); /******** Sending and Receiving Prefetches *******/ void sendRangePrefetchReq(perMcPrefetchList_t *, int sd, unsigned int mid); -int rangePrefetchReq(int acceptfd); +int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer); int processOidFound(objheader_t *, short *, int, int, int); -int getRangePrefetchResponse(int sd); +int getRangePrefetchResponse(int sd, struct readstruct *); INLINE objheader_t *searchObj(unsigned int); diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index fcb58191..8ebff808 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -8,6 +8,13 @@ pthread_cond_t qcond; #define QSIZE 2048 //2 KB +extern char bigarray[16*1024*1024]; +extern int bigindex; +#define LOGEVENT(x) { \ + int tmp=bigindex++; \ + bigarray[tmp]=x; \ + } + void queueInit(void) { /* Intitialize primary queue */ headoffset=0; @@ -58,10 +65,12 @@ void movehead(int size) { void * gettail() { while(tailoffset==headoffset) { //Sleep - // pthread_mutex_lock(&qlock); - // if (tailoffset==headoffset) - // pthread_cond_wait(&qcond, &qlock); - // pthread_mutex_unlock(&qlock); + LOGEVENT('T'); + pthread_mutex_lock(&qlock); + if (tailoffset==headoffset) + pthread_cond_wait(&qcond, &qlock); + pthread_mutex_unlock(&qlock); + LOGEVENT('W'); } if (*((int *)(memory+tailoffset))==-1) { tailoffset=0; //do loop diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d59cb23a..36465abb 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -79,6 +79,14 @@ void printhex(unsigned char *, int); plistnode_t *createPiles(); plistnode_t *sortPiles(plistnode_t *pileptr); +char bigarray[16*1024*1024]; +int bigindex=0; +#define LOGEVENT(x) { \ + int tmp=bigindex++; \ + bigarray[tmp]=x; \ + } + + /******************************* * Send and Recv function calls *******************************/ @@ -98,6 +106,87 @@ void send_data(int fd, void *buf, int buflen) { } } +void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) { + char *buf=(char *)buffer; + + int numbytes=readbuffer->head-readbuffer->tail; + if (numbytes>buflen) + numbytes=buflen; + if (numbytes>0) { + memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes); + readbuffer->tail+=numbytes; + buflen-=numbytes; + buf+=numbytes; + if (buflen==0) { + return; + } + } + + if (buflen>=MAXBUF) { + recv_data(fd, buf, buflen); + return; + } + + int maxbuf=MAXBUF; + int obufflen=buflen; + readbuffer->head=0; + + while (buflen > 0) { + int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0); + if (numbytes == -1) { + perror("recv"); + exit(0); + } + buflen-=numbytes; + readbuffer->head+=numbytes; + maxbuf-=numbytes; + } + memcpy(buf,readbuffer->buf,obufflen); + readbuffer->tail=obufflen; +} + +int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) { + char *buf=(char *)buffer; + //now tail<=head + int numbytes=readbuffer->head-readbuffer->tail; + if (numbytes>buflen) + numbytes=buflen; + if (numbytes>0) { + memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes); + readbuffer->tail+=numbytes; + buflen-=numbytes; + buf+=numbytes; + if (buflen==0) + return 1; + } + + if (buflen>=MAXBUF) { + return recv_data_errorcode(fd, buf, buflen); + } + + int maxbuf=MAXBUF; + int obufflen=buflen; + readbuffer->head=0; + + while (buflen > 0) { + int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0); + if (numbytes ==0) { + return 0; + } + if (numbytes==-1) { + perror("recvbuf"); + return -1; + } + buflen-=numbytes; + readbuffer->head+=numbytes; + maxbuf-=numbytes; + } + memcpy(buf,readbuffer->buf,obufflen); + readbuffer->tail=obufflen; + return 1; +} + + void recv_data(int fd, void *buf, int buflen) { char *buffer = (char *)(buf); int size = buflen; @@ -168,11 +257,17 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof /* Allocate for the queue node*/ int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); int len; - char * node= getmemory(qnodesize); +#ifdef INLINEPREFETCH + char node[qnodesize]; +#else + char *node=getmemory(qnodesize); +#endif int top=endoffsets[ntuples-1]; - if (node==NULL) + if (node==NULL) { + LOGEVENT('D'); return; + } /* Set queue node values */ /* TODO: Remove this after testing */ @@ -185,8 +280,30 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short)); memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short)); +#ifdef INLINEPREFETCH + prefetchpile_t *pilehead = foundLocal(node); + + if (pilehead!=NULL) { + // Get sock from shared pool + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } + + /* Release socket */ + // freeSock(transPrefetchSockPool, pilehead->mid, sd); + + /* Deallocated pilehead */ + mcdealloc(pilehead); + } +#else /* Lock and insert into primary prefetch queue */ movehead(qnodesize); +#endif } /* This function starts up the transaction runtime. */ @@ -302,9 +419,11 @@ void transInit() { retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL); } while(retval!=0); #else +#ifndef INLINEPREFETCH do { retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); } while(retval!=0); +#endif #endif pthread_detach(tPrefetch); #endif @@ -523,6 +642,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { #ifdef CACHE if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { #ifdef TRANSSTATS + LOGEVENT('P') nprehashSearch++; #endif /* Look up in prefetch cache */ @@ -551,6 +671,8 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { return NULL; } else { #ifdef TRANSSTATS + + LOGEVENT('R'); nRemoteSend++; #endif #ifdef COMPILER @@ -662,6 +784,13 @@ int transCommit() { trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */ char finalResponse; +#ifdef TRANSSTATS + int iii; + for(iii=0;iiiobjpiles; while(tmp != NULL) { len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); - char oidnoffset[len]; + char oidnoffset[len+5]; char *buf=oidnoffset; + if (first) { + *buf=TRANS_PREFETCH; + buf++;len++; + first=0; + } *((int*)buf) = tmp->numoffset; buf+=sizeof(int); *((unsigned int *)buf) = tmp->oid; @@ -1476,30 +1610,32 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { *((unsigned int *)buf) = myIpAddr; buf += sizeof(unsigned int); memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short)); - send_data(sd, oidnoffset, len); tmp = tmp->next; + if (tmp==NULL) { + *((int *)(&oidnoffset[len]))=-1; + len+=sizeof(int); + } + send_data(sd, oidnoffset, len); } - /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ - endpair = -1; - send_data(sd, &endpair, sizeof(int)); - + LOGEVENT('S'); return; } -int getPrefetchResponse(int sd) { +int getPrefetchResponse(int sd, struct readstruct *readbuffer) { int length = 0, size = 0; char control; unsigned int oid; void *modptr, *oldptr; - recv_data((int)sd, &length, sizeof(int)); + recv_data_buf(sd, readbuffer, &length, sizeof(int)); size = length - sizeof(int); char recvbuffer[size]; #ifdef TRANSSTATS getResponse++; + LOGEVENT('Z'); #endif - recv_data((int)sd, recvbuffer, size); + recv_data_buf(sd, readbuffer, recvbuffer, size); control = *((char *) recvbuffer); if(control == OBJECT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); -- 2.34.1