From 4ea90e69b0cc2aae06a60a0e8dcb923062119769 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Thu, 20 Sep 2007 09:43:59 +0000 Subject: [PATCH] fixed some thread allocation bugs: 1) if thread creation fails, retry it...the os always the option just to not cooperate 2) need to either: a) join a thread or b) set it as a detached thread --- .../src/Runtime/DSTM/interface/dstmserver.c | 9 +++++--- Robust/src/Runtime/DSTM/interface/trans.c | 22 ++++++++++++++----- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 7b27ce8d..7828afa7 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -35,7 +35,6 @@ int dstmInit(void) /* Initialize attribute for mutex */ pthread_mutexattr_init(&mainobjstore_mutex_attr); pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); - //pthread_mutex_init(&mainobjstore_mutex, NULL); pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -96,8 +95,12 @@ void *dstmListen() printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); while(1) { - acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); - pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); + int retval; + acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); + do { + retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); + } while(retval!=0); + pthread_detach(thread_dstm_accept); } } /* This function accepts a new connection request, decodes the control message in the connection diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 6e6caba5..c7ff1f40 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -106,7 +106,7 @@ int dstmStartup(const char * option) { if (master) { pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_create(&thread_Listen, &attr, dstmListen, NULL); return 1; } else { @@ -124,6 +124,7 @@ int dstmStartup(const char * option) { * processes the prefetch requests */ void transInit() { int t, rc; + int retval; //Create and initialize prefetch cache structure prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); @@ -141,15 +142,19 @@ void transInit() { //Initialize machine pile w/prefetch oids and offsets shared queue mcpileqInit(); //Create the primary prefetch thread - pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + + do { + retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + } while(retval!=0); + pthread_detach(tPrefetch); + //Create and Initialize a pool of threads /* Threads are active for the entire period runtime is running */ for(t = 0; t< NUM_THREADS; t++) { + do { rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t); - if (rc) { - printf("Thread create error %s, %d\n", __FILE__, __LINE__); - return; - } + } while(rc!=0); + pthread_detach(wthreads[t]); } } @@ -181,6 +186,7 @@ void randomdelay(void) /* This function initializes things required in the transaction start*/ transrecord_t *transStart() { + printf("Starting transaction\n"); transrecord_t *tmp = malloc(sizeof(transrecord_t)); tmp->cache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); @@ -467,7 +473,9 @@ int transCommit(transrecord_t *record) { thread_data_array[threadnum].rec = record; /* If local do not create any extra connection */ if(pile->mid != myIpAddr) { /* Not local */ + do { rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); + } while(rc!=0); if(rc) { perror("Error in pthread create\n"); pthread_cond_destroy(&tcond); @@ -484,7 +492,9 @@ int transCommit(transrecord_t *record) { } else { /*Local*/ ltdata->tdata = &thread_data_array[threadnum]; ltdata->transinfo = &transinfo; + do { val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); + } while(val!=0); if(val) { perror("Error in pthread create\n"); pthread_cond_destroy(&tcond); -- 2.34.1