From: adash Date: Thu, 14 Feb 2008 18:41:53 +0000 (+0000) Subject: trans.c is still buggy for large number of threads X-Git-Tag: preEdgeChange~279 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=43832cd930b0fbf6d8dca660b69d4d3f281e82dc;p=IRC.git trans.c is still buggy for large number of threads several bug fixes --- diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index 347dd4f2..306c0c8c 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -1433,6 +1433,7 @@ public class BuildCode { if (state.PREFETCH) { Iterator it = fpn.hspp.iterator(); output.println("/* prefetch */"); + output.println("; /* empty statement to avoid compiler error */"); /* TODO Add support for arrays, Currently handles only field pointers*/ /* The while loop below removes all prefetch tuples with arrays from the set of prefetches */ while(it.hasNext()) { @@ -1529,7 +1530,7 @@ public class BuildCode { tstlbl += generateTemp(fm, id.getTempDescAt(i), lb) + "+"; } tstlbl += id.offset.toString(); - output.println("if ("+tstlbl+"< 0 || "+tstlbl+" > "+ + output.println("if ("+tstlbl+"< 0 || "+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___) {"); output.println(" failedboundschk();"); output.println("}"); diff --git a/Robust/src/Main/Main.java b/Robust/src/Main/Main.java index e1b220fe..d0500ff0 100644 --- a/Robust/src/Main/Main.java +++ b/Robust/src/Main/Main.java @@ -139,6 +139,7 @@ public class Main { readSourceFile(state, ClassLibraryPrefix+"InetAddress.java"); readSourceFile(state, ClassLibraryPrefix+"SocketInputStream.java"); readSourceFile(state, ClassLibraryPrefix+"SocketOutputStream.java"); + readSourceFile(state, ClassLibraryPrefix+"gnu/Random.java"); if (state.TASK) { diff --git a/Robust/src/Runtime/DSTM/interface/clookup.c b/Robust/src/Runtime/DSTM/interface/clookup.c index 45ac76c6..a38d22e1 100644 --- a/Robust/src/Runtime/DSTM/interface/clookup.c +++ b/Robust/src/Runtime/DSTM/interface/clookup.c @@ -177,12 +177,12 @@ unsigned int chashResize(chashtable_t *table, unsigned int newsize) { } //Delete the entire hash table -void chashDelete(chashtable_t *table) { +void chashDelete(chashtable_t *ctable) { int i, isFirst; chashlistnode_t *ptr, *curr, *next; - ptr = table->table; + ptr = ctable->table; - for(i=0 ; isize ; i++) { + for(i=0 ; isize ; i++) { curr = &ptr[i]; isFirst = 1 ; while(curr != NULL) { @@ -196,6 +196,7 @@ void chashDelete(chashtable_t *table) { } free(ptr); - free(table); - table = NULL; + ptr = NULL; + free(ctable); + ctable = NULL; } diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 0d8b03d9..a6f3213c 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -36,6 +36,7 @@ #define START_REMOTE_THREAD 23 #define THREAD_NOTIFY_REQUEST 24 #define THREAD_NOTIFY_RESPONSE 25 +#define TRANS_UNSUCESSFUL 26 //Control bits for status of objects in Machine pile #define OBJ_LOCKED_BUT_VERSION_MATCH 14 @@ -242,7 +243,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *); prefetchpile_t *makePreGroups(prefetchqelem_t *, int *); void checkPreCache(prefetchqelem_t *, int *, unsigned int, int); int transPrefetchProcess(transrecord_t *, int **, short); -void sendPrefetchReq(prefetchpile_t*, int); +void sendPrefetchReq(prefetchpile_t*); void getPrefetchResponse(int, int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index fd328e45..92103bb3 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -28,6 +28,7 @@ extern int classsize[]; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */ +pthread_mutex_t threadnotify_mutex = PTHREAD_MUTEX_INITIALIZER; /* This function initializes the main objects store and creates the * global machine and location lookup table */ @@ -122,6 +123,8 @@ void *dstmAccept(void *acceptfd) trans_commit_data_t transinfo; unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; + + int i; transinfo.objlocked = NULL; transinfo.objnotfound = NULL; @@ -131,23 +134,19 @@ void *dstmAccept(void *acceptfd) /* Receive control messages from other machines */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { - if (retval == 0) { - pthread_exit(NULL); // Testing connection - } - perror("Error in receiving control from coordinator\n"); + perror("Error: in receiving control from coordinator\n"); pthread_exit(NULL); } switch(control) { case READ_REQUEST: - printf("DEBUG -> Recv READ_REQUEST\n"); /* Read oid requested and search if available */ if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) { - perror("Error receiving object from cooridnator\n"); + perror("Error: receiving 0x0 object from cooridnator\n"); pthread_exit(NULL); } if((srcObj = mhashSearch(oid)) == NULL) { - printf("Object 0x%x is not found in Main Object Store %s %d\n", oid, __FILE__, __LINE__); + printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); pthread_exit(NULL); } h = (objheader_t *) srcObj; @@ -176,29 +175,24 @@ void *dstmAccept(void *acceptfd) break; case READ_MULT_REQUEST: - printf("DEBUG-> READ_MULT_REQUEST\n"); break; case MOVE_REQUEST: - printf("DEBUG -> MOVE_REQUEST\n"); break; case MOVE_MULT_REQUEST: - printf("DEBUG -> MOVE_MULT_REQUEST\n"); break; case TRANS_REQUEST: /* Read transaction request */ - printf("DEBUG -> Recv TRANS_REQUEST\n"); if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { - printf("Error in readClientReq\n"); + printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); } break; case TRANS_PREFETCH: - printf("DEBUG -> Recv TRANS_PREFETCH\n"); if((val = prefetchReq((int)acceptfd)) != 0) { - printf("Error in transPrefetch\n"); + printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); } break; @@ -207,8 +201,8 @@ void *dstmAccept(void *acceptfd) if (retval <= 0) perror("dstmAccept(): error receiving START_REMOTE_THREAD msg"); else if (retval != sizeof(unsigned int)) - printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n", - retval); + printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n", + retval, __FILE__, __LINE__); else { objType = getObjType(oid); @@ -218,20 +212,29 @@ void *dstmAccept(void *acceptfd) case THREAD_NOTIFY_REQUEST: size = sizeof(unsigned int); - retval = recv((int)acceptfd, &numoid, size, 0); + bzero(&buffer, RECEIVE_BUFFER_SIZE); + retval = recv((int)acceptfd, &buffer, size, 0); + numoid = *((unsigned int *) &buffer); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); bzero(&buffer, RECEIVE_BUFFER_SIZE); retval = recv((int)acceptfd, &buffer, size, 0); - oidarry = calloc(numoid, sizeof(unsigned int)); - memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); - size = sizeof(unsigned int) * numoid; - versionarry = calloc(numoid, sizeof(unsigned short)); - memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid); - size += sizeof(unsigned short) * numoid; - mid = *((unsigned int *)(buffer+size)); - size += sizeof(unsigned int); - threadid = *((unsigned int *)(buffer+size)); - processReqNotify(numoid, oidarry, versionarry, mid, threadid); + if(retval <=0) + perror("dstmAccept(): error receiving THREAD_NOTIFY_REQUEST"); + else if( retval != (2* sizeof(unsigned int) + (sizeof(unsigned int) + sizeof(unsigned short)) * numoid)) + printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_REQUEST %s, %d\n", retval, + __FILE__, __LINE__); + else { + oidarry = calloc(numoid, sizeof(unsigned int)); + memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); + size = sizeof(unsigned int) * numoid; + versionarry = calloc(numoid, sizeof(unsigned short)); + memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid); + size += sizeof(unsigned short) * numoid; + mid = *((unsigned int *)(buffer+size)); + size += sizeof(unsigned int); + threadid = *((unsigned int *)(buffer+size)); + processReqNotify(numoid, oidarry, versionarry, mid, threadid); + } break; @@ -240,9 +243,10 @@ void *dstmAccept(void *acceptfd) bzero(&buffer, RECEIVE_BUFFER_SIZE); retval = recv((int)acceptfd, &buffer, size, 0); if(retval <= 0) - perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg"); + perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE"); else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short)) - printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval); + printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_RESPONSE msg %s, %d\n", + retval, __FILE__, __LINE__); else { oid = *((unsigned int *)buffer); size = sizeof(unsigned int); @@ -255,7 +259,7 @@ void *dstmAccept(void *acceptfd) break; default: - printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control); + printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__); } /* Close connection */ @@ -342,7 +346,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { /*Process the information read */ if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) { - printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__); + printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__); /* Free resources */ if(oidmod != NULL) { free(oidmod); @@ -363,22 +367,20 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { * Following this it also receives a new control message from the co-ordinator and processes this message*/ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) { - char *ptr, control, sendctrl; + char control, sendctrl; objheader_t *tmp_header; void *header; int i = 0, val, retval; /* Send reply to the Coordinator */ if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) { - printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__); + printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__); return 1; } - /* Read new control message from Coordiator */ - if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) { - perror("Error in receiving control message\n"); - return 1; - } + do { + retval = recv((int)acceptfd, &control, sizeof(char), 0); + } while(retval < sizeof(char)); /* Process the new control message */ switch(control) { @@ -392,9 +394,9 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, } /* Send ack to Coordinator */ - sendctrl = TRANS_SUCESSFUL; + sendctrl = TRANS_UNSUCESSFUL; if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ACK to coordinator\n"); + perror("Error: In sending ACK to coordinator\n"); if (transinfo->objlocked != NULL) { free(transinfo->objlocked); } @@ -404,13 +406,12 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, return 1; } - ptr = NULL; break; case TRANS_COMMIT: /* Invoke the transCommit process() */ if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { - printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); + printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__); /* Free memory */ if (transinfo->objlocked != NULL) { free(transinfo->objlocked); @@ -423,11 +424,9 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, break; case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: - //TODO expect another transrequest from client - printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n"); break; default: - printf("No response to TRANS_AGREE OR DISAGREE protocol\n"); + printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__); //TODO Use fixed.trans_id TID since Client may have died break; } @@ -531,7 +530,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne /* Decide what control message to send to Coordinator */ if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked, modptr, oidnotfound, oidlocked, acceptfd)) == 0) { - printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__); + printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__); return 0; } @@ -599,12 +598,12 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock /* Process each modified object saved in the mainobject store */ for(i = 0; i < nummod; i++) { if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } GETSIZE(tmpsize,header); pthread_mutex_lock(&mainobjstore_mutex); - memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t)); + memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); header->version += 1; /* If threads are waiting on this object to be updated, notify them */ if(header->notifylist != NULL) { @@ -620,7 +619,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock /* Unlock locked objects */ for(i = 0; i < numlocked; i++) { if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } STATUS(header) &= ~(LOCK); @@ -631,6 +630,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock control = TRANS_SUCESSFUL; if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); + return 1; } return 0; @@ -642,20 +642,24 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock * then use offset values to prefetch references to other objects */ int prefetchReq(int acceptfd) { - int i, length, sum, n, numbytes, numoffset, N, size, count = 0; - int isArray = 0, bytesRecvd; + int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0; + int isArray = 0; unsigned int oid, index = 0; - unsigned int objoid, myIpAddr; - char *ptr, control, buffer[PRE_BUF_SIZE]; + char *ptr, buffer[PRE_BUF_SIZE]; void *mobj; + unsigned int objoid; + char control; objheader_t * header; + int bytesRecvd; +/* + unsigned int myIpAddr; #ifdef MAC myIpAddr = getMyIpAddr("en1"); #else myIpAddr = getMyIpAddr("eth0"); #endif - +*/ /* Repeatedly recv the oid and offset pairs sent for prefetch */ while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) { count++; @@ -738,23 +742,24 @@ int prefetchReq(int acceptfd) { } /* Check for overflow in the buffer */ if (index >= PRE_BUF_SIZE) { - printf("Char buffer is overflowing\n"); + printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__); return 1; } /* Send Prefetch response control message only once*/ if(count == 1){ control = TRANS_PREFETCH_RESPONSE; if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error in sending PREFETCH RESPONSE to Coordinator\n"); + perror("Error: in sending PREFETCH RESPONSE to Coordinator\n"); return 1; } } /* Add the buffer size into buffer as a parameter */ *((unsigned int *)buffer)=index; + /* Send the entire buffer with its size and oids found and not found */ if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) { - perror("Error sending oids found\n"); + perror("Error: sending oids found\n"); return 1; } } @@ -765,27 +770,33 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short objheader_t *header; unsigned int oid; unsigned short newversion; - char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)]; + char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)]; int sd; struct sockaddr_in remoteAddr; int bytesSent; int status, size, retry = 0; - + int i = 0; while(i < numoid) { oid = *(oidarry + i); if((header = (objheader_t *) mhashSearch(oid)) == NULL) { - printf("processReqNotify(): Object is not found in mlookup %s, %d\n", __FILE__, __LINE__); + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return; } else { /* Check to see if versions are same */ checkversion: if ((STATUS(header) & LOCK) != LOCK) { STATUS(header) |= LOCK; - if(header->version == *(versionarry + i)) { + newversion = header->version; + if(newversion == *(versionarry + i)) { //Add to the notify list - insNode(header->notifylist, threadid, mid); + if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) { + printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); + return; + } + STATUS(header) &= ~(LOCK); } else { + STATUS(header) &= ~(LOCK); if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ perror("processReqNotify():socket()"); return; @@ -796,13 +807,13 @@ checkversion: remoteAddr.sin_addr.s_addr = htonl(mid); if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - printf("processReqNotify():error %d connecting to %s:%d\n", errno, + printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); status = -1; } else { //Send Update notification msg[0] = THREAD_NOTIFY_RESPONSE; - msg[1] = oid; + *((unsigned int *)&msg[1]) = oid; size = sizeof(unsigned int); *((unsigned short *)(&msg[1]+size)) = newversion; size += sizeof(unsigned short); @@ -812,7 +823,8 @@ checkversion: perror("processReqNotify():send()"); status = -1; } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){ - printf("processReqNotify(): error, sent %d bytes\n", bytesSent); + printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", + bytesSent, __FILE__, __LINE__); status = -1; } else { status = 0; @@ -821,13 +833,12 @@ checkversion: } close(sd); } - STATUS(header) &= ~(LOCK); } else { randomdelay(); - printf("processReqNotify() Object is still locked\n"); goto checkversion; } } + i++; } free(oidarry); free(versionarry); diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 1752c36c..7672ee0e 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -35,7 +35,7 @@ plistnode_t *pCreate(int objects) { return NULL; } - pile->nummod = pile->numread = pile->numcreated = pile->sum_bytes = 0; + pile->nummod = pile->numread = pile->numcreated = pile->sum_bytes = pile->mid = 0; pile->next = NULL; return pile; } @@ -44,7 +44,7 @@ plistnode_t *pCreate(int objects) { * a machine pile data structure */ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) { plistnode_t *ptr, *tmp; - int found = 0, offset; + int found = 0, offset = 0; tmp = pile; //Add oid into a machine that is already present in the pile linked list structure @@ -64,9 +64,9 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi tmp->sum_bytes += sizeof(objheader_t) + tmpsize; } else { offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; - *((unsigned int *)(tmp->objread + offset))=OID(headeraddr); + *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr); offset += sizeof(unsigned int); - memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short)); + *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version; tmp->numread = tmp->numread + 1; } found = 1; @@ -93,17 +93,17 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi ptr->sum_bytes += sizeof(objheader_t) + tmpsize; } else { *((unsigned int *)ptr->objread)=OID(headeraddr); - memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short)); + offset = sizeof(unsigned int); + *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version; ptr->numread = ptr->numread + 1; } ptr->next = pile; pile = ptr; } - - /* Clear Flags */ - STATUS(headeraddr) &= ~(NEW); - STATUS(headeraddr) &= ~(DIRTY); + /* Clear Flags */ + STATUS(headeraddr) &= ~NEW; + STATUS(headeraddr) &= ~DIRTY; return pile; } diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 6837726b..c4b9e45c 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -8,7 +8,6 @@ void queueInit(void) { pthread_mutexattr_init(&pqueue.qlockattr); pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr); - //pthread_mutex_init(&pqueue.qlock, NULL); pthread_cond_init(&pqueue.qcond, NULL); } diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.c b/Robust/src/Runtime/DSTM/interface/threadnotify.c index eb2f5f39..3d7f2928 100644 --- a/Robust/src/Runtime/DSTM/interface/threadnotify.c +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.c @@ -5,7 +5,7 @@ notifyhashtable_t nlookup; //Global hash table /* This function creates a new node in the linked list of threads waiting * for an update notification from a particular object. * This takes in the head of the linked list and inserts the new node to it */ -void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) { +threadlist_t *insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) { threadlist_t *ptr; if(head == NULL) { if((head = calloc(1, sizeof(threadlist_t))) == NULL) { @@ -25,6 +25,8 @@ void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) { ptr->next = head; head = ptr; } + + return head; } /* This function displays the linked list of threads waiting on update notification @@ -73,7 +75,8 @@ unsigned int notifyhashFunction(unsigned int tid) { unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) { unsigned int newsize; int index; - notifylistnode_t *ptr, *node; + notifylistnode_t *ptr, *node, *tmp; + int isFound = 0; if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) { //Resize Table @@ -82,6 +85,7 @@ unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) { notifyhashResize(newsize); pthread_mutex_unlock(&nlookup.locktable); } + /* ptr = nlookup.table; nlookup.numelements++; @@ -105,6 +109,36 @@ unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) { ptr[index].next = node; } pthread_mutex_unlock(&nlookup.locktable); + */ + ptr = nlookup.table; + index = notifyhashFunction(tid); + pthread_mutex_lock(&nlookup.locktable); + if(ptr[index].next == NULL && ptr[index].threadid == 0) { // Insert at the first position in the hashtable + ptr[index].threadid = tid; + ptr[index].ndata = ndata; + } else { + tmp = &ptr[index]; + while(tmp != NULL) { + if(tmp->threadid == tid) { + isFound = 1; + tmp->ndata = ndata; + } + tmp = tmp->next; + } + if(!isFound) { + if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&nlookup.locktable); + return 1; + } + node->threadid = tid; + node->ndata = ndata; + node->next = ptr[index].next; + ptr[index].next = node; + } + } + pthread_mutex_unlock(&nlookup.locktable); + return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.h b/Robust/src/Runtime/DSTM/interface/threadnotify.h index 6749af42..05bddb48 100644 --- a/Robust/src/Runtime/DSTM/interface/threadnotify.h +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.h @@ -22,6 +22,7 @@ typedef struct notifydata { unsigned int *oidarry; /* Pointer to array of oids that this threadid is waiting on*/ unsigned short *versionarry;/* Pointer to array of versions of the oids that we are waiting on */ pthread_cond_t threadcond; /* Cond variable associated with each threadid that needs to be signaled*/ + pthread_mutex_t threadnotify; }notifydata_t; typedef struct notifylistnode { @@ -38,7 +39,7 @@ typedef struct notifyhashtable { pthread_mutex_t locktable; //Lock for the hashtable } notifyhashtable_t; -void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); //Inserts nodes for one object that +threadlist_t *insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); //Inserts nodes for one object that //needs to send notification to threads waiting on it void display(threadlist_t *head);// Displays linked list of nodes for one object unsigned int notifyhashCreate(unsigned int size, float loadfactor); //returns 1 if hashtable creation is not successful diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 419ed321..2aa4551e 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -45,6 +45,8 @@ int myIndexInHostArray; unsigned int oidsPerBlock; unsigned int oidMin; unsigned int oidMax; +void *mlist[10000]; +pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER; void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); @@ -84,12 +86,16 @@ inline int findmax(int *array, int arraylength) { void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { int qnodesize; int len = 0; - int i; - + int i, rc; + + //do { + // rc=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + //} while(rc!=0); + /* Allocate for the queue node*/ char *node; if(ntuples > 0) { - qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); + qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(unsigned short); if((node = calloc(1, qnodesize)) == NULL) { printf("Calloc Error %s, %d\n", __FILE__, __LINE__); return; @@ -100,10 +106,9 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short len += sizeof(int); memcpy(node + len, oids, ntuples*sizeof(unsigned int)); len += ntuples * sizeof(unsigned int); - memcpy(node + len, endoffsets, ntuples*sizeof(short)); - len += ntuples * sizeof(short); - memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short)); - + memcpy(node + len, endoffsets, ntuples*sizeof(unsigned short)); + len += ntuples * sizeof(unsigned short); + memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(unsigned short)); /* Lock and insert into primary prefetch queue */ pthread_mutex_lock(&pqueue.qlock); pre_enqueue((prefetchqelem_t *)node); @@ -121,6 +126,12 @@ int dstmStartup(const char * option) { if (processConfigFile() != 0) return 0; //TODO: return error value, cause main program to exit + //TODO Remove after testing + //Initializing the global array + int i; + for (i = 0; i < 10000; i++) + mlist[i] = NULL; + //////// dstmInit(); transInit(); @@ -186,12 +197,11 @@ void transInit() { queueInit(); //Initialize machine pile w/prefetch oids and offsets shared queue mcpileqInit(); + //Create the primary prefetch thread - do { retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); } while(retval!=0); - pthread_detach(tPrefetch); //Create and Initialize a pool of threads @@ -234,6 +244,20 @@ transrecord_t *transStart() transrecord_t *tmp = calloc(1, sizeof(transrecord_t)); tmp->cache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); + //TODO Remove after testing + //Filling the global array when transansaction's record lookupTable + //is calloced + pthread_mutex_lock(&mlock); + int ii; + for (ii = 0; ii < 10000; ii++) { + if (mlist[ii] == NULL) { + mlist[ii] = (void *)tmp->lookupTable; + break; + } + } + if (ii == 10000) { fprintf(stderr, "Error"); } + pthread_mutex_unlock(&mlock); + //////////// #ifdef COMPILER tmp->revertlist=NULL; #endif @@ -279,8 +303,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* Look up in machine lookup table and copy into cache*/ GETSIZE(size, objheader); size += sizeof(objheader_t); - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)objheader, size); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, objheader, size); /* Insert into cache's lookup table */ chashInsert(record->lookupTable, OID(objheader), objcopy); #ifdef COMPILER @@ -291,8 +315,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ GETSIZE(size, tmp); size+=sizeof(objheader_t); - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, tmp, size); /* Insert into cache's lookup table */ chashInsert(record->lookupTable, OID(tmp), objcopy); #ifdef COMPILER @@ -304,14 +328,14 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /*If object not found in prefetch cache then block until object appears in the prefetch cache */ pthread_mutex_lock(&pflookup.lock); while(!found) { - rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); + rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); /* Check Prefetch cache again */ if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) { found = 1; GETSIZE(size,tmp); size+=sizeof(objheader_t); - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, tmp, size); chashInsert(record->lookupTable, OID(tmp), objcopy); pthread_mutex_unlock(&pflookup.lock); #ifdef COMPILER @@ -320,9 +344,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { return objcopy; #endif } else if (rc == ETIMEDOUT) { - printf("Wait timed out\n"); - pthread_mutex_unlock(&pflookup.lock); - break; + pthread_mutex_unlock(&pflookup.lock); + break; } } @@ -330,13 +353,13 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { - printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__); + printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); return NULL; } else { #ifdef COMPILER - return &objcopy[1]; + return &objcopy[1]; #else - return objcopy; + return objcopy; #endif } } @@ -345,18 +368,18 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* This function creates objects in the transaction record */ objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { - objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); - tmp->notifylist = NULL; - OID(tmp) = getNewOID(); - tmp->version = 1; - tmp->rcount = 1; - STATUS(tmp) = NEW; - chashInsert(record->lookupTable, OID(tmp), tmp); + objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); + tmp->notifylist = NULL; + OID(tmp) = getNewOID(); + tmp->version = 1; + tmp->rcount = 1; + STATUS(tmp) = NEW; + chashInsert(record->lookupTable, OID(tmp), tmp); #ifdef COMPILER - return &tmp[1]; //want space after object header + return &tmp[1]; //want space after object header #else - return tmp; + return tmp; #endif } @@ -370,7 +393,7 @@ plistnode_t *createPiles(transrecord_t *record) { unsigned int machinenum; void *localmachinenum; objheader_t *headeraddr; - + ptr = record->lookupTable->table; size = record->lookupTable->size; @@ -390,7 +413,7 @@ plistnode_t *createPiles(transrecord_t *record) { } //Get machine location for object id (and whether local or not) - if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) { + if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) { machinenum = myIpAddr; } else if ((machinenum = lhashSearch(curr->key)) == 0) { printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); @@ -418,7 +441,7 @@ int transCommit(transrecord_t *record) { unsigned int tot_bytes_mod, *listmid; plistnode_t *pile, *pile_ptr; int i, j, rc, val; - int pilecount, offset, threadnum, trecvcount; + int pilecount, offset, threadnum = 0, trecvcount = 0; char buffer[RECEIVE_BUFFER_SIZE],control; char transid[TID_LEN]; trans_req_data_t *tosend; @@ -427,189 +450,213 @@ int transCommit(transrecord_t *record) { char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ char localstat = 0; - do { - trecvcount = 0; - threadnum = 0; - treplyretry = 0; + /* Look through all the objects in the transaction record and make piles + * for each machine involved in the transaction*/ + pile_ptr = pile = createPiles(record); - /* Look through all the objects in the transaction record and make piles - * for each machine involved in the transaction*/ - pile_ptr = pile = createPiles(record); + /* Create the packet to be sent in TRANS_REQUEST */ - /* Create the packet to be sent in TRANS_REQUEST */ + /* Count the number of participants */ + pilecount = pCount(pile); - /* Count the number of participants */ - pilecount = pCount(pile); + /* Create a list of machine ids(Participants) involved in transaction */ + if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + pListMid(pile, listmid); - /* Create a list of machine ids(Participants) involved in transaction */ - if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - //free(record); - return 1; - } - pListMid(pile, listmid); + /* Initialize thread variables, + * Spawn a thread for each Participant involved in a transaction */ + pthread_t thread[pilecount]; + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + pthread_mutex_t tlshrd; - /* Initialize thread variables, - * Spawn a thread for each Participant involved in a transaction */ - pthread_t thread[pilecount]; - pthread_attr_t attr; - pthread_cond_t tcond; - pthread_mutex_t tlock; - pthread_mutex_t tlshrd; + thread_data_array_t *thread_data_array; + if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + return 1; + } - thread_data_array_t *thread_data_array; - if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) { - printf("Malloc error %s, %d\n", __FILE__, __LINE__); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - //free(record); - return 1; - } + local_thread_data_array_t *ltdata; + if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); + return 1; + } + + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ - local_thread_data_array_t *ltdata; - if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { + /* Initialize and set thread detach attribute */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + /* Process each machine pile */ + while(pile != NULL) { + //Create transaction id + newtid++; + if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); free(thread_data_array); - //free(record); + free(ltdata); return 1; } - - thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ - - /* Initialize and set thread detach attribute */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&tlock, NULL); - pthread_cond_init(&tcond, NULL); - - /* Process each machine pile */ - while(pile != NULL) { - //Create transaction id - newtid++; - if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = pile->numread; + tosend->f.nummod = pile->nummod; + tosend->f.numcreated = pile->numcreated; + tosend->f.sum_bytes = pile->sum_bytes; + tosend->listmid = listmid; + tosend->objread = pile->objread; + tosend->oidmod = pile->oidmod; + tosend->oidcreated = pile->oidcreated; + thread_data_array[threadnum].thread_id = threadnum; + thread_data_array[threadnum].mid = pile->mid; + thread_data_array[threadnum].buffer = tosend; + thread_data_array[threadnum].recvmsg = rcvd_control_msg; + thread_data_array[threadnum].threshold = &tcond; + thread_data_array[threadnum].lock = &tlock; + thread_data_array[threadnum].count = &trecvcount; + thread_data_array[threadnum].replyctrl = &treplyctrl; + thread_data_array[threadnum].replyretry = &treplyretry; + thread_data_array[threadnum].rec = record; + /* If local do not create any extra connection */ + if(pile->mid != myIpAddr) { /* Not local */ + do { + rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); + } while(rc!=0); + if(rc) { + perror("Error in pthread create\n"); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); free(thread_data_array); free(ltdata); - //free(record); return 1; } - tosend->f.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = pile->numread; - tosend->f.nummod = pile->nummod; - tosend->f.numcreated = pile->numcreated; - tosend->f.sum_bytes = pile->sum_bytes; - tosend->listmid = listmid; - tosend->objread = pile->objread; - tosend->oidmod = pile->oidmod; - tosend->oidcreated = pile->oidcreated; - thread_data_array[threadnum].thread_id = threadnum; - thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].buffer = tosend; - thread_data_array[threadnum].recvmsg = rcvd_control_msg; - thread_data_array[threadnum].threshold = &tcond; - thread_data_array[threadnum].lock = &tlock; - thread_data_array[threadnum].count = &trecvcount; - thread_data_array[threadnum].replyctrl = &treplyctrl; - thread_data_array[threadnum].replyretry = &treplyretry; - thread_data_array[threadnum].rec = record; - /* If local do not create any extra connection */ - if(pile->mid != myIpAddr) { /* Not local */ - do { - rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); - } while(rc!=0); - if(rc) { - perror("Error in pthread create\n"); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); - free(thread_data_array); - free(ltdata); - //free(record); - return 1; - } - } else { /*Local*/ - ltdata->tdata = &thread_data_array[threadnum]; - ltdata->transinfo = &transinfo; - do { + } else { /*Local*/ + ltdata->tdata = &thread_data_array[threadnum]; + ltdata->transinfo = &transinfo; + do { val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); - } while(val!=0); - if(val) { - perror("Error in pthread create\n"); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); - free(thread_data_array); - free(ltdata); - //free(record); - return 1; - } - } - - threadnum++; - pile = pile->next; - } - - /* Free attribute and wait for the other threads */ - pthread_attr_destroy(&attr); - - for (i = 0; i < pilecount; i++) { - rc = pthread_join(thread[i], NULL); - if(rc) - { - printf("ERROR return code from pthread_join() is %d\n", rc); + } while(val!=0); + if(val) { + perror("Error in pthread create\n"); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); - for (j = i; j < pilecount; j++) - free(thread_data_array[j].buffer); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); free(thread_data_array); free(ltdata); - //free(record); return 1; } - free(thread_data_array[i].buffer); } - /* Free resources */ - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - free(listmid); - pDelete(pile_ptr); - free(thread_data_array); - free(ltdata); + threadnum++; + pile = pile->next; + } - /* wait a random amount of time before retrying to commit transaction*/ - if(treplyretry == 1) { - randomdelay(); + /* Free attribute and wait for the other threads */ + pthread_attr_destroy(&attr); + + for (i = 0; i < threadnum; i++) { + rc = pthread_join(thread[i], NULL); + if(rc) + { + printf("Error: return code from pthread_join() is %d\n", rc); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (j = i; j < threadnum; j++) { + free(thread_data_array[j].buffer); + } + return 1; } - - /* Retry trans commit procedure if not sucessful in the first try */ - } while (treplyretry == 1); + free(thread_data_array[i].buffer); + } + + /* Free resources */ + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + free(listmid); + pDelete(pile_ptr); - /* Free Resources */ - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); + + if(treplyctrl == TRANS_ABORT) { + /* Free Resources */ + objstrDelete(record->cache); + //TODO Remove after testing + pthread_mutex_lock(&mlock); + int count = 0, jj = 0, ii = 0; + for (ii = 0; ii < 10000; ii++) { + if (mlist[ii] == (void *) record->lookupTable) { + count++; + jj = ii; + } + } + if (count==2 || count == 0) { + fprintf(stderr, "TRANS_ABORT CASE: Count for same addr:%d\n", count); + } + if (count == 1) + mlist[jj] = 0; + pthread_mutex_unlock(&mlock); + //////////// + chashDelete(record->lookupTable); + free(record); + free(thread_data_array); + free(ltdata); + return TRANS_ABORT; + } else if(treplyctrl == TRANS_COMMIT) { + /* Free Resources */ + objstrDelete(record->cache); + //TODO Remove after Testing + pthread_mutex_lock(&mlock); + int count = 0, jj = 0, ii = 0; + for (ii = 0; ii < 10000; ii++) { + if (mlist[ii] == (void *) record->lookupTable) {count++; jj = ii;} + } + if (count==2 || count == 0) { + fprintf(stderr, "TRANS_COMMIT CASE: Count for same addr:%d\n", count); + } + if (count == 1) mlist[jj] = 0; + pthread_mutex_unlock(&mlock); + /////////////////////// + chashDelete(record->lookupTable); + free(record); + free(thread_data_array); + free(ltdata); + return 0; + } else { + //TODO Add other cases + printf("DEBUG-> THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n"); + exit(-1); + } + return 0; } @@ -626,6 +673,7 @@ void *transRequest(void *threadarg) { char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; char machineip[16], retval; + tdata = (thread_data_array_t *) threadarg; /* Send Trans Request */ @@ -689,6 +737,7 @@ void *transRequest(void *threadarg) { close(sd); pthread_exit(NULL); } + recvcontrol = control; /* Update common data structure and increment count */ @@ -717,6 +766,20 @@ void *transRequest(void *threadarg) { pthread_exit(NULL); } + if ((retval = recv((int)sd, &control, sizeof(char), 0))<= 0) { + printf("Error: In receiving control %s,%d\n", __FILE__, __LINE__); + close(sd); + pthread_exit(NULL); + } + + if(control == TRANS_UNSUCESSFUL) { + //printf("DEBUG-> TRANS_ABORTED\n"); + } else if(control == TRANS_SUCESSFUL) { + //printf("DEBUG-> TRANS_SUCCESSFUL\n"); + } else { + //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); + } + /* Close connection */ close(sd); pthread_exit(NULL); @@ -756,7 +819,7 @@ void decideResponse(thread_data_array_t *tdata) { *(tdata->replyretry) = 0; /* clear objects from prefetch cache */ for (i = 0; i < tdata->buffer->f.numread; i++) - prehashRemove(*(unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); + prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i))); for (i = 0; i < tdata->buffer->f.nummod; i++) prehashRemove(tdata->buffer->oidmod[i]); } else if(transagree == tdata->buffer->f.mcount){ @@ -775,7 +838,7 @@ void decideResponse(thread_data_array_t *tdata) { * It returns a char that is only needed to check the correctness of execution of this function inside * transRequest()*/ char sendResponse(thread_data_array_t *tdata, int sd) { - int n, N, sum, oidcount = 0; + int n, N, sum, oidcount = 0, control; char *ptr, retval = 0; unsigned int *oidnotfound; @@ -805,6 +868,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) { if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ctrl message for participant\n"); + return 0; } return retval; @@ -824,6 +888,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { objheader_t *h; void *objcopy; + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket\n"); return NULL; @@ -844,7 +909,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; - if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) { + if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) { perror("Error sending message\n"); return NULL; } @@ -864,7 +929,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return NULL; } objcopy = objstrAlloc(record->cache, size); - if((val = read(sd, objcopy, size)) <= 0) { + if((val = read(sd, (char *)objcopy, size)) <= 0) { perror("No objects are read from the remote participant\n"); return NULL; } @@ -909,13 +974,13 @@ void *handleLocalReq(void *threadarg) { if (i < localtdata->tdata->buffer->f.numread) { int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array incr *= i; - oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr)); - version = *((short *)(localtdata->tdata->buffer->objread + incr + sizeof(unsigned int))); + oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr)); + version = *((short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int))); } else { // Objects Modified int tmpsize; headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); if (headptr == NULL) { - printf("Error: handleLocalReq() returning NULL\n"); + printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__); return NULL; } oid = OID(headptr); @@ -930,7 +995,7 @@ void *handleLocalReq(void *threadarg) { objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ - if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) { + if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) { if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ v_matchlock++; } else {/* If versions don't match ...HARD ABORT */ @@ -1021,8 +1086,6 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { STATUS(((objheader_t *)header)) &= ~(LOCK); } - printf("TRANS_ABORTED\n"); - return 0; } @@ -1039,6 +1102,7 @@ int transComProcess(local_thread_data_array_t *localtdata) { oidcreated = localtdata->tdata->buffer->oidcreated; numlocked = localtdata->transinfo->numlocked; oidlocked = localtdata->transinfo->objlocked; + for (i = 0; i < nummod; i++) { if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); @@ -1050,15 +1114,12 @@ int transComProcess(local_thread_data_array_t *localtdata) { return 1; } GETSIZE(tmpsize, header); - pthread_mutex_lock(&mainobjstore_mutex); - memcpy(header, tcptr, tmpsize + sizeof(objheader_t)); + memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize); header->version += 1; - /* If threads are waiting on this object to be updated, notify them */ if(header->notifylist != NULL) { notifyAll(&header->notifylist, OID(header), header->version); } - pthread_mutex_unlock(&mainobjstore_mutex); } /* If object is newly created inside transaction then commit it */ @@ -1071,12 +1132,12 @@ int transComProcess(local_thread_data_array_t *localtdata) { tmpsize += sizeof(objheader_t); pthread_mutex_lock(&mainobjstore_mutex); if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) { - printf("Error: transComProcess() failed objstrAlloc\n"); + printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&mainobjstore_mutex); return 1; } pthread_mutex_unlock(&mainobjstore_mutex); - memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t)); + memcpy(ptrcreate, header, tmpsize); mhashInsert(oidcreated[i], ptrcreate); lhashInsert(oidcreated[i], myIpAddr); } @@ -1166,7 +1227,7 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { int ntuples, i, machinenum, count=0; unsigned int *oid; short *endoffsets, *arryfields, *offset; - prefetchpile_t *head = NULL; + prefetchpile_t *head = NULL, *tmp = NULL; /* Check for the case x.y.z and a.b.c are same oids */ ptr = (char *) node; @@ -1182,8 +1243,20 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { /* Check for redundant tuples by comparing oids of each tuple */ for(i = 0; i < ntuples; i++) { - if(oid[i] == 0) + if(oid[i] == 0){ + if(head->next != NULL) { + if((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + tmp->mid = myIpAddr; + tmp->next = head; + head = tmp; + } else { + head->mid = myIpAddr; + } continue; + } /* For each tuple make piles */ if ((machinenum = lhashSearch(oid[i])) == 0) { printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__); @@ -1220,7 +1293,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); arryfields = GET_PTR_ARRYFLD(ptr, ntuples); - + /* Find offset length for each tuple */ int numoffset[ntuples];//Number of offsets for each tuple numoffset[0] = endoffsets[0]; @@ -1425,7 +1498,6 @@ void *transPrefetch(void *t) { pthread_mutex_unlock(&mcqueue.qlock); /* Deallocate the prefetch queue pile node */ predealloc(qnode); - pthread_exit(NULL); } } @@ -1457,15 +1529,15 @@ void *mcqProcess(void *threadid) { /*Initiate connection to remote host and send request */ /* Process Request */ - sendPrefetchReq(mcpilenode, tid); + if(mcpilenode->mid != myIpAddr) + sendPrefetchReq(mcpilenode); /* Deallocate the machine queue pile node */ mcdealloc(mcpilenode); - pthread_exit(NULL); } } -void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { +void sendPrefetchReq(prefetchpile_t *mcpilenode) { int sd, i, off, len, endpair, count = 0; struct sockaddr_in serv_addr; struct hostent *server; @@ -1505,23 +1577,27 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { while(tmp != NULL) { off = 0; count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ - len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short)); char oidnoffset[len]; bzero(oidnoffset, len); - memcpy(oidnoffset, &len, sizeof(int)); + *((unsigned int*)oidnoffset) = len; + //memcpy(oidnoffset, &len, sizeof(int)); off = sizeof(int); - memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int)); + *((unsigned int *)((char *)oidnoffset + off)) = tmp->oid; + //memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int)); off += sizeof(unsigned int); for(i = 0; i < tmp->numoffset; i++) { - memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short)); - off+=sizeof(short); + *((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i]; + //memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short)); + off+=sizeof(unsigned short); } - if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) { + + if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) { perror("Error sending fixed bytes for thread\n"); close(sd); return; } - + tmp = tmp->next; } @@ -1546,7 +1622,6 @@ void getPrefetchResponse(int count, int sd) { char *ptr; void *modptr, *oldptr; - /* Read prefetch response from the Remote machine */ if((val = read(sd, &control, sizeof(char))) <= 0) { perror("No control response for Prefetch request sent\n"); @@ -1584,7 +1659,7 @@ void getPrefetchResponse(int count, int sd) { index+=sizeof(int); pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) { - printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&prefetchcache_mutex); return; } @@ -1835,9 +1910,10 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n int bytesSent; int status, size; unsigned short version; - unsigned int oid, threadid, mid; - pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification - pthread_cond_t threadcond; + unsigned int oid,mid; + static unsigned int threadid = 0; + pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification + pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER; notifydata_t *ndata; //FIXME currently all oids belong to one machine @@ -1855,7 +1931,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n remoteAddr.sin_addr.s_addr = htonl(mid); /* Generate unique threadid */ - threadid = (unsigned int) pthread_self(); + threadid++; /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */ if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) { @@ -1867,6 +1943,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n ndata->oidarry = oidarry; ndata->versionarry = versionarry; ndata->threadcond = threadcond; + ndata->threadnotify = threadnotify; if((status = notifyhashInsert(threadid, ndata)) != 0) { printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__); free(ndata); @@ -1909,24 +1986,29 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n size += sizeof(unsigned int); *((unsigned int *)(&msg[1] + size)) = threadid; - pthread_mutex_lock(&threadnotify); - bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int) , 0); + pthread_mutex_lock(&(ndata->threadnotify)); + bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0); if (bytesSent < 0){ perror("reqNotify():send()"); status = -1; - } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int)){ - printf("reNotify(): error, sent %d bytes\n", bytesSent); + } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){ + printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__); status = -1; } else { status = 0; } - pthread_cond_wait(&threadcond, &threadnotify); - pthread_mutex_unlock(&threadnotify); + + pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify)); + pthread_mutex_unlock(&(ndata->threadnotify)); } + pthread_cond_destroy(&threadcond); + pthread_mutex_destroy(&threadnotify); + free(ndata); close(sock); return status; } + void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { notifydata_t *ndata; int i, objIsFound = 0, index; @@ -1944,23 +2026,18 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { } } if(objIsFound == 0){ + printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__); return; } else { if(version <= ndata->versionarry[index]){ + printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__); return; } else { /* Clear from prefetch cache and free thread related data structure */ - if((ptr = prehashSearch(oid)) == NULL) { - //TODO Ask about freeing - printf("threadnotify(): No such oid %s, %d\n", __FILE__, __LINE__); - pthread_cond_signal(&ndata->threadcond); - free(ndata); - return; - } else { + if((ptr = prehashSearch(oid)) != NULL) { prehashRemove(oid); - pthread_cond_signal(&ndata->threadcond); - free(ndata); } + pthread_cond_signal(&(ndata->threadcond)); } } } @@ -1973,6 +2050,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { struct sockaddr_in remoteAddr; char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)]; int sock, status, size, bytesSent; + while(*head != NULL) { ptr = *head; mid = ptr->mid; @@ -1992,19 +2070,21 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); status = -1; } else { + bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); msg[0] = THREAD_NOTIFY_RESPONSE; - msg[1] = oid; + *((unsigned int *)&msg[1]) = oid; size = sizeof(unsigned int); *((unsigned short *)(&msg[1]+ size)) = version; size+= sizeof(unsigned short); *((unsigned int *)(&msg[1]+ size)) = ptr->threadid; - bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0); + bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0); if (bytesSent < 0){ perror("notifyAll():send()"); status = -1; } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){ - printf("notifyAll(): error, sent %d bytes\n", bytesSent); + printf("notifyAll(): error, sent %d bytes %s, %d\n", + bytesSent, __FILE__, __LINE__); status = -1; } else { status = 0; diff --git a/Robust/src/Runtime/thread.c b/Robust/src/Runtime/thread.c index 5523426a..d5f195e8 100644 --- a/Robust/src/Runtime/thread.c +++ b/Robust/src/Runtime/thread.c @@ -20,7 +20,7 @@ pthread_key_t threadlocks; pthread_mutex_t threadnotifylock; pthread_cond_t threadnotifycond; transrecord_t * trans; -pthread_key_t oid; +pthread_key_t oidval; void threadexit() { objheader_t* ptr; @@ -47,7 +47,7 @@ void threadexit() { pthread_mutex_unlock(&gclistlock); #ifdef DSTM /* Add transaction to check if thread finished for join operation */ - value = pthread_getspecific(oid); + value = pthread_getspecific(oidval); oidvalue = *((unsigned int *)value); goto transstart; transstart: @@ -55,8 +55,8 @@ transstart: ptr = transRead(trans, oidvalue); struct ___Thread___ *p = (struct ___Thread___ *) ptr; p->___threadDone___ = 1; + *((unsigned int *)&((struct ___Object___ *) p)->___localcopy___) |=DIRTY; if(transCommit(trans) != 0) { - transAbort(trans); goto transstart; } #endif @@ -123,10 +123,9 @@ void CALL11(___Thread______sleep____J, long long ___millis___, long long ___mill #endif } +#ifdef DSTM /* Add thread join capability */ void CALL01(___Thread______join____, struct ___Thread___ * ___this___) { - printf("DEBUG -> Inside thread join\n"); -#ifdef DSTM pthread_t thread; unsigned int *oidarray; unsigned short *versionarray, version; @@ -163,8 +162,8 @@ transstart: goto transstart; } return; -#endif } +#endif #ifdef THREADS void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) { @@ -197,7 +196,7 @@ void CALL12(___Thread______start____I, int ___mid___, struct ___Thread___ * ___t #ifdef DSTM void globalDestructor(void *value) { free(value); - pthread_setspecific(oid, NULL); + pthread_setspecific(oidval, NULL); } void initDSMthread(int *ptr) { @@ -214,7 +213,7 @@ void initDSMthread(int *ptr) { #endif threadData = calloc(1, sizeof(unsigned int)); *((unsigned int *) threadData) = oid; - pthread_setspecific(oid, threadData); + pthread_setspecific(oidval, threadData); pthread_mutex_lock(&gclistlock); threadcount--; pthread_cond_signal(&gccond); @@ -224,29 +223,28 @@ void initDSMthread(int *ptr) { transstart: trans = transStart(); tmp = transRead(trans, (unsigned int) oid); - struct ___Thread___ *t = (struct ___Thread___ *) tmp; - t->___threadDone___ = 1; + ((struct ___Thread___ *)tmp)->___threadDone___ = 1; + *((unsigned int *)&((struct ___Object___ *) tmp)->___localcopy___) |=DIRTY; if(transCommit(trans)!= 0) { - transAbort(trans); goto transstart; } pthread_exit(NULL); } void startDSMthread(int oid, int objType) { - pthread_t thread; - int retval; - pthread_attr_t nattr; + pthread_t thread; + int retval; + pthread_attr_t nattr; - pthread_mutex_lock(&gclistlock); - threadcount++; - pthread_mutex_unlock(&gclistlock); + pthread_mutex_lock(&gclistlock); + threadcount++; + pthread_mutex_unlock(&gclistlock); pthread_attr_init(&nattr); pthread_attr_setdetachstate(&nattr, PTHREAD_CREATE_DETACHED); int * ptr=malloc(sizeof(int)*2); ptr[0]=oid; ptr[1]=objType; - pthread_key_create(&oid, globalDestructor); + pthread_key_create(&oidval, globalDestructor); do { retval=pthread_create(&thread, &nattr, (void * (*)(void *)) &initDSMthread, ptr); if (retval!=0) diff --git a/Robust/src/Tests/Atomic.java b/Robust/src/Tests/Atomic.java index 36cb6ce1..62b67227 100644 --- a/Robust/src/Tests/Atomic.java +++ b/Robust/src/Tests/Atomic.java @@ -7,6 +7,7 @@ public class Atomic { } int q=test(z); System.printInt(q); + System.printString("\n"); } public static atomic int test(Integer y) { return y.intValue(); diff --git a/Robust/src/Tests/Atomic2.java b/Robust/src/Tests/Atomic2.java index cea24dc1..c96ef202 100644 --- a/Robust/src/Tests/Atomic2.java +++ b/Robust/src/Tests/Atomic2.java @@ -13,8 +13,6 @@ public class Atomic2 extends Thread { System.printString("Starting\n"); t.start(mid); System.printString("Finished\n"); - /* Test thread join */ - t.join(); //this is ugly... while(true) { atomic { diff --git a/Robust/src/Tests/Atomic3.java b/Robust/src/Tests/Atomic3.java index e38acce1..f17fb650 100644 --- a/Robust/src/Tests/Atomic3.java +++ b/Robust/src/Tests/Atomic3.java @@ -17,6 +17,7 @@ public class Atomic3 extends Thread { } System.printString("b is "); System.printInt(b); + System.printString("\n"); atomic{ at3.root.item = 2445; y = global new Integer(400); diff --git a/Robust/src/Tests/Atomic5.java b/Robust/src/Tests/Atomic5.java index f9a74a6d..c4659e07 100644 --- a/Robust/src/Tests/Atomic5.java +++ b/Robust/src/Tests/Atomic5.java @@ -1,3 +1,4 @@ +/* This test case tests the thread joining for a threadDSM library */ public class Atomic5 extends Thread { public People[] team; public Atomic5() { @@ -9,6 +10,7 @@ public class Atomic5 extends Thread { Integer age; Atomic5 tmp; Atomic5[] at5; + atomic { at5 = global new Atomic5[4]; } @@ -20,6 +22,7 @@ public class Atomic5 extends Thread { at5[i].team[1] = global new People(); age = global new Integer(35); at5[i].team[0].age = age; + at5[i].team[1].age = age; } b = at5[1].team[0].getAge(); } @@ -29,6 +32,7 @@ public class Atomic5 extends Thread { age = global new Integer(70); at5[1].team[1].age = age; c = at5[1].team[1].getAge(); + //at5[20].team[1].age = age; } System.printInt(c); System.printString("\n"); @@ -39,22 +43,21 @@ public class Atomic5 extends Thread { } tmp.start(mid); } - /* for(int i = 0; i< 4; i++) { atomic { tmp = at5[i]; } tmp.join(); } - */ System.printString("Finished\n"); + /* while(true) { ; } + */ } public void run() { - /* int ag; boolean old = false; atomic { @@ -63,17 +66,12 @@ public class Atomic5 extends Thread { old = true; } if(old){ - System.printString(" gets Pension"); + System.printString("Gets Pension"); + System.printString("\n"); + } else { + System.printString("Gets No Pension"); System.printString("\n"); } - */ - System.printString("Atomic5() Inside the run program"); - System.printString("\n"); - /* - for(int i=0; i<4 ; i++) { - at5[i].join(); - } - */ } } diff --git a/Robust/src/buildscript b/Robust/src/buildscript index 6c8f1f10..67d09e44 100755 --- a/Robust/src/buildscript +++ b/Robust/src/buildscript @@ -13,7 +13,7 @@ echo -tagstate do tag state analysis echo -scheduling do task scheduling echo -optional enable optional echo -debug generate debug symbols -echo -prefetch prefetch analysis +echo -prefetch do prefetch analysis echo -webinterface enable web interface echo -runtimedebug printout runtime debug messages echo "-thread use support for multiple threads" @@ -200,7 +200,7 @@ cd $BUILDDIR/specdir echo > $BUILDDIR/checkers.h for i in `cat $BUILDDIR/specs` do -gcc -O0 -g -c $i\_aux.c +gcc -O0 -g -fbounds-check -c $i\_aux.c echo \#include \"specdir\/$i\_aux.h\" >> $BUILDDIR/checkers.h done fi # CHECKFLAG