From: adash Date: Mon, 5 May 2008 06:54:57 +0000 (+0000) Subject: fix for too many socket errors and race condition in locking main X-Git-Tag: preEdgeChange~104 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=b1294d9c5619d71018bb2a826b0ea27fa59638e6;p=IRC.git fix for too many socket errors and race condition in locking main object stores look up table --- diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 2674a911..a5e2016b 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -16,6 +16,7 @@ extern int classsize[]; extern int numHostsInSystem; extern pthread_mutex_t notifymutex; +extern unsigned int myIpAddr; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; @@ -29,89 +30,92 @@ sockPoolHashTable_t *transPResponseSocketPool; int dstmInit(void) { - mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); - /* Initialize attribute for mutex */ - pthread_mutexattr_init(&mainobjstore_mutex_attr); - pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); - pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); - pthread_mutex_init(&lockObjHeader,NULL); - if (mhashCreate(HASH_SIZE, LOADFACTOR)) - return 1; //failure - - if (lhashCreate(HASH_SIZE, LOADFACTOR)) - return 1; //failure - - if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) - return 1; //failure - - //Initialize socket pool - if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) { - printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); - return 0; - } - - return 0; + mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); + /* Initialize attribute for mutex */ + pthread_mutexattr_init(&mainobjstore_mutex_attr); + pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); + pthread_mutex_init(&lockObjHeader,NULL); + if (mhashCreate(HASH_SIZE, LOADFACTOR)) + return 1; //failure + + if (lhashCreate(HASH_SIZE, LOADFACTOR)) + return 1; //failure + + if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) + return 1; //failure + + //Initialize socket pool + if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) { + printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); + return 0; + } + return 0; } /* This function starts the thread to listen on a socket * for tranaction calls */ void *dstmListen() { - int listenfd, acceptfd; - struct sockaddr_in my_addr; - struct sockaddr_in client_addr; - socklen_t addrlength = sizeof(struct sockaddr); - pthread_t thread_dstm_accept; - int i; - int setsockflag=1; - - listenfd = socket(AF_INET, SOCK_STREAM, 0); - if (listenfd == -1) - { - perror("socket"); - exit(1); - } + int listenfd, acceptfd; + struct sockaddr_in my_addr; + struct sockaddr_in client_addr; + socklen_t addrlength = sizeof(struct sockaddr); + pthread_t thread_dstm_accept; + int i; + int setsockflag=1; + + listenfd = socket(AF_INET, SOCK_STREAM, 0); + if (listenfd == -1) + { + perror("socket"); + exit(1); + } - if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } + if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #ifdef MAC - if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } + if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #endif - my_addr.sin_family = AF_INET; - my_addr.sin_port = htons(LISTEN_PORT); - my_addr.sin_addr.s_addr = INADDR_ANY; - memset(&(my_addr.sin_zero), '\0', 8); + my_addr.sin_family = AF_INET; + my_addr.sin_port = htons(LISTEN_PORT); + my_addr.sin_addr.s_addr = INADDR_ANY; + memset(&(my_addr.sin_zero), '\0', 8); - if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) - { - perror("bind"); - exit(1); - } - - if (listen(listenfd, BACKLOG) == -1) - { - perror("listen"); - exit(1); - } + if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) + { + perror("bind"); + exit(1); + } - printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); - while(1) - { - int retval; - int flag=1; - acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); - setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); - do { - retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); - } while(retval!=0); - pthread_detach(thread_dstm_accept); - } + if (listen(listenfd, BACKLOG) == -1) + { + perror("listen"); + exit(1); + } + + printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); + while(1) + { + int retval; + int flag=1; + acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); + if (acceptfd < 0) { + perror("Error in accept: "); + printf("error %x", acceptfd); fflush(stdout); + } + setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); + do { + retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); + } while(retval!=0); + pthread_detach(thread_dstm_accept); + } } /* This function accepts a new connection request, decodes the control message in the connection * and accordingly calls other functions to process new requests */ @@ -126,50 +130,52 @@ void *dstmAccept(void *acceptfd) { trans_commit_data_t transinfo; unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; - + /* Receive control messages from other machines */ - while(1) { - int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); - if (ret==0) - return; - if (ret==-1) { - printf("DEBUG -> RECV Error!.. retrying\n"); - break; - } - switch(control) { + if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { + printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__); + pthread_exit(NULL); + } + + switch(control) { case READ_REQUEST: - /* Read oid requested and search if available */ - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); - if((srcObj = mhashSearch(oid)) == NULL) { - printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); - break; - } - h = (objheader_t *) srcObj; - GETSIZE(size, h); - size += sizeof(objheader_t); - sockid = (int) acceptfd; - - if (h == NULL) { - ctrl = OBJECT_NOT_FOUND; - send_data(sockid, &ctrl, sizeof(char)); - } else { - /* Type */ - char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; - *((int *)&msg[1])=size; - send_data(sockid, &msg, sizeof(msg)); - send_data(sockid, h, size); - } + do { + /* Read oid requested and search if available */ + recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + if((srcObj = mhashSearch(oid)) == NULL) { + printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); + break; + } + h = (objheader_t *) srcObj; + GETSIZE(size, h); + size += sizeof(objheader_t); + sockid = (int) acceptfd; + + if (h == NULL) { + ctrl = OBJECT_NOT_FOUND; + send_data(sockid, &ctrl, sizeof(char)); + } else { + /* Type */ + char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; + *((int *)&msg[1])=size; + send_data(sockid, &msg, sizeof(msg)); + send_data(sockid, h, size); + } + if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { + pthread_exit(NULL); + } + } while(control == READ_REQUEST); break; - + case READ_MULT_REQUEST: break; - + case MOVE_REQUEST: break; - + case MOVE_MULT_REQUEST: break; - + case TRANS_REQUEST: /* Read transaction request */ transinfo.objlocked = NULL; @@ -178,38 +184,48 @@ void *dstmAccept(void *acceptfd) { transinfo.numlocked = 0; transinfo.numnotfound = 0; if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { - printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); + printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); + break; } break; case TRANS_PREFETCH: - if((val = prefetchReq((int)acceptfd)) != 0) { - printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); - break; - } + do { + if((val = prefetchReq((int)acceptfd)) != 0) { + printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); + break; + } + if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { + pthread_exit(NULL); + } + } while (control == TRANS_PREFETCH); break; case TRANS_PREFETCH_RESPONSE: - if((val = getPrefetchResponse((int) acceptfd)) != 0) { - printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); - break; - } + do { + if((val = getPrefetchResponse((int) acceptfd)) != 0) { + printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); + break; + } + if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { + pthread_exit(NULL); + } + } while (control == TRANS_PREFETCH_RESPONSE); break; case START_REMOTE_THREAD: recv_data((int)acceptfd, &oid, sizeof(unsigned int)); objType = getObjType(oid); startDSMthread(oid, objType); break; - + case THREAD_NOTIFY_REQUEST: recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } - + recv_data((int)acceptfd, buffer, size); - + oidarry = calloc(numoid, sizeof(unsigned int)); memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); size = sizeof(unsigned int) * numoid; @@ -221,18 +237,18 @@ void *dstmAccept(void *acceptfd) { threadid = *((unsigned int *)(buffer+size)); processReqNotify(numoid, oidarry, versionarry, mid, threadid); free(buffer); - + break; case THREAD_NOTIFY_RESPONSE: size = sizeof(unsigned short) + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } - + recv_data((int)acceptfd, buffer, size); - + oid = *((unsigned int *)buffer); size = sizeof(unsigned int); version = *((unsigned short *)(buffer+size)); @@ -247,10 +263,9 @@ void *dstmAccept(void *acceptfd) { default: printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__); - } } - closeconnection: +closeconnection: /* Close connection */ if (close((int)acceptfd) == -1) perror("close"); diff --git a/Robust/src/Runtime/DSTM/interface/mlookup.c b/Robust/src/Runtime/DSTM/interface/mlookup.c index 0629adbe..9000b80e 100644 --- a/Robust/src/Runtime/DSTM/interface/mlookup.c +++ b/Robust/src/Runtime/DSTM/interface/mlookup.c @@ -41,11 +41,11 @@ unsigned int mhashInsert(unsigned int key, void *val) { ptr = mlookup.table; mlookup.numelements++; - index = mhashFunction(key); #ifdef DEBUG printf("DEBUG -> index = %d, key = %d, val = %x\n", index, key, val); #endif pthread_mutex_lock(&mlookup.locktable); + index = mhashFunction(key); if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable ptr[index].key = key; ptr[index].val = val; @@ -69,10 +69,10 @@ void *mhashSearch(unsigned int key) { int index; mhashlistnode_t *ptr, *node; + pthread_mutex_lock(&mlookup.locktable); ptr = mlookup.table; // Address of the beginning of hash table index = mhashFunction(key); node = &ptr[index]; - pthread_mutex_lock(&mlookup.locktable); while(node != NULL) { if(node->key == key) { pthread_mutex_unlock(&mlookup.locktable); @@ -90,11 +90,10 @@ unsigned int mhashRemove(unsigned int key) { mhashlistnode_t *curr, *prev; mhashlistnode_t *ptr, *node; + pthread_mutex_lock(&mlookup.locktable); ptr = mlookup.table; index = mhashFunction(key); curr = &ptr[index]; - - pthread_mutex_lock(&mlookup.locktable); for (; curr != NULL; curr = curr->next) { if (curr->key == key) { // Find a match in the hash table mlookup.numelements--; // Decrement the number of elements in the global hashtable