From 3935733041685945de3c448af2c5b82fdcf0438a Mon Sep 17 00:00:00 2001 From: adash Date: Sun, 2 Mar 2008 18:35:18 +0000 Subject: [PATCH] Added checks and minor bug fix --- Robust/src/IR/Flat/BuildCode.java | 13 ++++---- .../src/Runtime/DSTM/interface/dstmserver.c | 2 +- Robust/src/Runtime/DSTM/interface/queue.c | 3 ++ Robust/src/Runtime/DSTM/interface/trans.c | 30 +++++++++++++------ 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index 6dd69a0a..adfaaf17 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -1513,6 +1513,7 @@ public class BuildCode { public void generateInsideTransCode(FlatMethod fm, LocalityBinding lb,PrintWriter output,PrefetchPair pp,Vector oids, Vector fieldoffset,Vector endoffset, int tuplecount){ int i,j; short offsetcount = 0; + String test = new String(); Object newdesc = pp.desc.get(0); if(newdesc instanceof FieldDescriptor) { @@ -1529,13 +1530,9 @@ public class BuildCode { tstlbl += generateTemp(fm, id.getTempDescAt(i), lb) + "+"; } tstlbl += id.offset.toString(); - output.println(" int flag_" + flagcount + "= 0;"); output.println("if ("+tstlbl+"< 0 || "+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___) {"); - output.println(" flag_" + flagcount+" = 1;"); - output.println("}"); - - output.println("if (flag_"+flagcount+") {"); + output.println(" failedboundschk();"); output.println("}"); TypeDescriptor elementtype = pp.base.getType().dereference(); @@ -1546,6 +1543,12 @@ public class BuildCode { type=elementtype.getSafeSymbol()+" "; String oid = new String("(unsigned int) (" + generateTemp(fm, pp.base, lb) + " != NULL ? " + "((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"] : 0)"); + + /* + test = "(("+tstlbl+"< 0) || ("+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___))"; + String oid = new String("(unsigned int) (" +genarateTemp(fm, pp.base, lb) + " != NULL ? (" +test+ " ? 0 : ((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"]) : 0);"); + */ + oids.add(oid); } diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 379ea287..b232cbef 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -309,7 +309,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { // keep reading all objects sum = 0; do { - n = recv((int)acceptfd, (void *) objread, N, 0); + n = recv((int)acceptfd, (void *) objread+sum, N-sum, 0); sum += n; } while(sum < N && n != 0); } diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index c4b9e45c..26e1bfc4 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -50,6 +50,9 @@ prefetchqelem_t *pre_dequeue(void) { prefetchqelem_t *retnode; if (pqueue.front == NULL) { printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); + if(pqueue.rear != NULL) { + printf("pqueue.front points to invalid location %s, %d\n", __FILE__, __LINE__); + } return NULL; } retnode = pqueue.front; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d6782934..eb3b7647 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -249,7 +249,7 @@ transrecord_t *transStart() /* This function finds the location of the objects involved in a transaction * and returns the pointer to the object if found in a remote location */ -objheader_t *transRead(transrecord_t *record, unsigned int oid) { +objheader_t *transRead(transrecord_t *record, unsigned int oid) { unsigned int machinenumber; objheader_t *tmp, *objheader; objheader_t *objcopy; @@ -333,7 +333,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { } /* Get the object from the remote location */ - machinenumber = lhashSearch(oid); + if((machinenumber = lhashSearch(oid)) == 0) { + printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); + return NULL; + } + objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); @@ -629,7 +633,6 @@ void *transRequest(void *threadarg) { char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; char machineip[16], retval; - tdata = (thread_data_array_t *) threadarg; /* Send Trans Request */ @@ -656,6 +659,7 @@ void *transRequest(void *threadarg) { close(sd); pthread_exit(NULL); } + /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*tdata->buffer->f.mcount; @@ -665,15 +669,18 @@ void *transRequest(void *threadarg) { pthread_exit(NULL); } } + /* Send oids and version number tuples for objects that are read */ { int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; + if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { perror("Error sending tuples for thread\n"); close(sd); pthread_exit(NULL); } } + /* Send objects that are modified */ for(i = 0; i < tdata->buffer->f.nummod ; i++) { int size; @@ -1425,7 +1432,7 @@ void *transPrefetch(void *t) { if((qnode = pre_dequeue()) == NULL) { printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&pqueue.qlock); - pthread_exit(NULL); + continue; } pthread_mutex_unlock(&pqueue.qlock); @@ -1435,7 +1442,8 @@ void *transPrefetch(void *t) { /* and group requests by remote machine ids by calling the makePreGroups() */ if((pilehead = foundLocal(qnode)) == NULL) { printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__); - pthread_exit(NULL); + pre_enqueue(qnode); + continue; } ptr = pilehead; @@ -1480,15 +1488,16 @@ void *mcqProcess(void *threadid) { if((mcpilenode = mcpiledequeue()) == NULL) { printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&mcqueue.qlock); - pthread_exit(NULL); + continue; } /* Unlock mutex */ pthread_mutex_unlock(&mcqueue.qlock); /*Initiate connection to remote host and send request */ /* Process Request */ - if(mcpilenode->mid != myIpAddr) + if(mcpilenode->mid != myIpAddr) { sendPrefetchReq(mcpilenode); + } /* Deallocate the machine queue pile node */ mcdealloc(mcpilenode); @@ -1610,7 +1619,7 @@ void getPrefetchResponse(int count, int sd) { index += sizeof(unsigned int); /* For each object found add to Prefetch Cache */ objsize = *((int *)(buffer+index)); - index+=sizeof(int); + index += sizeof(int); pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) { printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); @@ -1873,7 +1882,10 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n //FIXME currently all oids belong to one machine oid = oidarry[0]; - mid = lhashSearch(oid); + if((mid = lhashSearch(oid)) == 0) { + printf("Error: %s() No such machine found for oid =%x\n",__func__, oid); + return; + } if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ perror("reqNotify():socket()"); -- 2.34.1