From 5a64f4126992d4010bad055397980ac3a736c420 Mon Sep 17 00:00:00 2001
From: bdemsky
Date: Tue, 3 Aug 2010 08:05:54 +0000
Subject: [PATCH] more changes

---
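Notes:

The coreprof changes replace the old one-argument CPLOGEVENT/CPLOGTIME pair
with CP_LOGEVENT(event, tag) and CP_LOGTIME: each record is now a single
header word packing an event code together with a CP_BEGIN/CP_END tag,
followed by a 64-bit rdtsc timestamp spanning the next two int slots of the
per-thread event buffer. mem.h then routes RUNMALLOC/RUNFREE through the new
cp_calloc/cp_free wrappers so allocator time shows up in the profile, and
workschedule.c brackets each worker thread with CREATEPROFILER(),
EXITPROFILER(), and DUMPPROFILER().

The fragment below is a minimal, self-contained sketch of that record
layout, not the runtime's code; the values chosen for CP_BASE_SHIFT,
CP_START, CP_BEGIN, and CP_END are hypothetical, since their definitions are
not visible in this patch.

#include <stdio.h>
#include <stdlib.h>

#define CP_BASE_SHIFT 8   /* hypothetical value; not shown in this patch */
#define CP_START      1   /* hypothetical event code */
#define CP_BEGIN      0   /* hypothetical tag */
#define CP_END        1   /* hypothetical tag */

struct coreprofmonitor {
  int index;
  int value[1024];
};

static struct coreprofmonitor * cp_events;

/* x86 timestamp counter, as read by the runtime's rdtsc() */
static inline unsigned long long rdtsc(void) {
  unsigned int lo, hi;
  __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
  return ((unsigned long long) hi << 32) | lo;
}

/* the timestamp occupies the next two int slots; the 8-byte store may be
   unaligned, which is fine on x86, the only target rdtsc works on anyway */
#define CP_LOGTIME *((long long *)&cp_events->value[cp_events->index])=rdtsc(); \
  cp_events->index+=2;

/* one record = header word ((event<<CP_BASE_SHIFT)|tag) + timestamp */
#define CP_LOGEVENT(x,y) { cp_events->value[cp_events->index++]=((x<<CP_BASE_SHIFT)|y); \
    CP_LOGTIME \
  }

int main(void) {
  cp_events = calloc(1, sizeof(struct coreprofmonitor));
  CP_LOGEVENT(CP_START, CP_BEGIN);
  CP_LOGEVENT(CP_START, CP_END);
  printf("%d slots used\n", cp_events->index);
  return 0;
}

Compiled alone on x86 this prints "6 slots used": every CP_LOGEVENT consumes
one int slot for the header word and two for the timestamp.
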
 Robust/src/Runtime/coreprof/coreprof.c |   7 +-
 Robust/src/Runtime/coreprof/coreprof.h |  40 +++-
 Robust/src/Runtime/mem.h               |   6 +
 Robust/src/Runtime/workschedule.c      | 242 +------------------------
 Robust/src/buildscript                 |   2 +-
 5 files changed, 53 insertions(+), 244 deletions(-)

diff --git a/Robust/src/Runtime/coreprof/coreprof.c b/Robust/src/Runtime/coreprof/coreprof.c
index 6014572b..7b61c8a6 100644
--- a/Robust/src/Runtime/coreprof/coreprof.c
+++ b/Robust/src/Runtime/coreprof/coreprof.c
@@ -28,7 +28,12 @@ void createprofiler() {
   //point thread lock variable to eventmonitor
   cp_events=event;
-  CPLOGEVENT((CP_START<<CP_BASE_SHIFT)|CP_BEGIN);
+  CP_LOGEVENT(CP_START, CP_BEGIN);
 }
diff --git a/Robust/src/Runtime/coreprof/coreprof.h b/Robust/src/Runtime/coreprof/coreprof.h
--- a/Robust/src/Runtime/coreprof/coreprof.h
+++ b/Robust/src/Runtime/coreprof/coreprof.h
@@ ... @@
+#define CP_LOGEVENT(x,y) { cp_events->value[cp_events->index++]=((x<<CP_BASE_SHIFT)|y); \
+    CP_LOGTIME \
+  }
+
+#define CP_LOGTIME *((long long *)&cp_events->value[cp_events->index])=rdtsc(); \
+  cp_events->index+=2;
 
 struct coreprofmonitor {
   int index;
@@ -23,12 +43,20 @@
 extern __thread int cp_threadnum;
 extern __thread struct coreprofmonitor * cp_events;
 extern struct coreprofmonitor * cp_eventlist;
 void createprofiler();
+void exitprofiler();
 void dumpprofiler();
 
-#define CPLOGTIME *((long long *)&cp_events->value[cp_events->index])=rdtsc(); \
-  cp_events->index+=2;
+static inline void *cp_calloc(int size) {
+  CP_LOGEVENT(CP_RUNMALLOC, CP_BEGIN);
+  void *mem=calloc(1,size);
+  CP_LOGEVENT(CP_RUNMALLOC, CP_END);
+  return mem;
+}
 
-#define CPLOGEVENT(x) { CP_events->value[cp_events->index++]=x; \
-    CPLOGTIME \
-  }
+static inline void cp_free(void *ptr) {
+  CP_LOGEVENT(CP_RUNFREE, CP_BEGIN);
+  free(ptr);
+  CP_LOGEVENT(CP_RUNFREE, CP_END);
+}
 
+#endif
 #endif
diff --git a/Robust/src/Runtime/mem.h b/Robust/src/Runtime/mem.h
index fb0cc0a9..5aca44b5 100644
--- a/Robust/src/Runtime/mem.h
+++ b/Robust/src/Runtime/mem.h
@@ -13,8 +13,14 @@
 #else
 #ifdef PRECISE_GC
 #include "garbage.h"
+#ifdef COREPROF
+#include "coreprof.h"
+#define RUNMALLOC(x) cp_calloc(x)
+#define RUNFREE(x) cp_free(x)
+#else
 #define RUNMALLOC(x) calloc(1,x)
 #define RUNFREE(x) free(x)
+#endif
 #else
 #ifdef MULTICORE
 void * mycalloc(int m, int size);
diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c
index d0c4efc3..093dacba 100644
--- a/Robust/src/Runtime/workschedule.c
+++ b/Robust/src/Runtime/workschedule.c
@@ -5,7 +5,7 @@
 #include "mem.h"
 #include "workschedule.h"
 #include "mlp_runtime.h"
-
+#include "coreprof.h"
 
 // NOTE: Converting this from a work-stealing strategy
 //       to a single-queue thread pool protected by a single
@@ -17,17 +17,6 @@
 // for convenience
 typedef struct Queue deq;
 
-
-/*
-// each worker needs the following
-typedef struct workerData_t {
-  pthread_t       workerThread;
-  pthread_mutex_t dequeLock;
-  deq*            dequeWorkUnits;
-  int             nextWorkerToLoad;
-} workerData;
-*/
-
 typedef struct workerData_t{
   pthread_t workerThread;
   int id;
@@ -37,21 +26,13 @@
 static pthread_mutex_t systemLockIn;
 static pthread_mutex_t systemLockOut;
 
-// just one queue for everyone
-//static pthread_mutex_t dequeLock;
-
-
-
 // implementation internal data
 static WorkerData*     workerDataArray;
 static pthread_t*      workerArray;
 
 static int systemStarted = 0;
 
-//static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
-//static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
-//static pthread_cond_t  systemReturnCond  = PTHREAD_COND_INITIALIZER;
 
 static void(*workFunc)(void*);
 static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
@@ -69,139 +50,19 @@
 extern __thread SESEcommon* seseCommon;
 
 __thread int oid;
 
-/*
-struct QI {
-  struct QI * next;
-  void * value;
-};
-
-struct QI * headqi;
-struct QI * tailqi;
-*/
-
-/*
-// helper func
-int threadID2workerIndex( pthread_t id ) {
-  int i;
-  for( i = 0; i < numWorkers; ++i ) {
-    if( workerDataArray[i].workerThread == id ) {
-      return i;
-    }
-  }
-  // if we didn't find it, we are an outside
-  // thread and should pick arbitrary worker
-  return 0;
-}
-*/
-
-/*
-// the worker thread main func, which takes a func
-// from user for processing any one work unit, then
-// workers use it to process work units and steal
-// them from one another
-void* workerMain( void* arg ) {
-
-  workerData* myData = (workerData*) arg;
-
-  void* workUnit;
-
-  int i;
-  int j;
-
-  // all workers wait until system is ready
-  pthread_mutex_lock  ( &systemBeginLock );
-  pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
-  pthread_mutex_unlock( &systemBeginLock );
-
-  while( 1 ) {
-
-    // lock my deque
-    pthread_mutex_lock( &(myData->dequeLock) );
-
-    if( isEmpty( myData->dequeWorkUnits ) ) {
-
-      // my deque is empty, try to steal
-      pthread_mutex_unlock( &(myData->dequeLock) );
-
-      workUnit = NULL;
-      j = myData->nextWorkerToLoad;
-
-      // look at everyone's queue at least twice
-      for( i = 0; i < numWorkers; ++i ) {
-        if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
-
-        ++j; if( j == numWorkers ) { j = 0; }
-
-        pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
-
-        if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
-          pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
-          // no work here, yield and then keep looking
-          if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
-          continue;
-        }
-
-        // found some work in another deque, steal it
-        workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
-        pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
-        break;
-      }
-
-      // didn't find any work, even in my own deque,
-      // after checking everyone twice?  Exit thread
-      if( workUnit == NULL ) {
-        break;
-      }
-
-    } else {
-      // have work in own deque, take out from front
-      workUnit = getItem( myData->dequeWorkUnits );
-      pthread_mutex_unlock( &(myData->dequeLock) );
-    }
-
-    // wherever the work came from, process it
-    workFunc( workUnit );
-
-    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
-  }
-
-  printf( "Worker %d exiting.\n", myData->workerThread );
-  fflush( stdout );
-
-  return NULL;
-}
-*/
-
 void* workerMain( void* arg ) {
-
   void*       workUnit;
   WorkerData* myData = (WorkerData*) arg;
 
+  //Start profiler
+  CREATEPROFILER();
+
   oid=myData->id;
 
-  // make sure init mlp once-per-thread stuff
-  //pthread_once( &mlpOnceObj, mlpInitOncePerThread );
-
   // all workers wait until system is ready
   // then continue to process work
   while( 1 ) {
-
-    /*
-    while(1){
-      if(pthread_mutex_trylock(&systemLock)==0){
-        if(isEmpty(dequeWorkUnits)){
-          pthread_mutex_unlock(&systemLock);
-        }else{
-          break;
-        }
-      }
-    }
-    workUnit = getItem( dequeWorkUnits );
-    pthread_mutex_unlock( &systemLock );
-    */
-
     pthread_mutex_lock( &systemLockOut );
     // wait for work
     if (headqi->next==NULL) {
@@ -214,10 +75,6 @@
     workUnit = headqi->value;
     pthread_mutex_unlock( &systemLockOut );
     free(tmp);
-    // yield processor before moving on, just to exercise
-    // system's out-of-order correctness
-    //if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
-    //if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
 
     pthread_mutex_lock(&gclistlock);
@@ -244,71 +101,19 @@
       litem.next->prev=litem.prev;
     }
     pthread_mutex_unlock(&gclistlock);
-  }
-
+  EXITPROFILER();
   return NULL;
 }
 
-
-/*
-void workScheduleInit( int numProcessors,
-                       void(*func)(void*) ) {
-  int i, status;
-
-  numWorkers = numProcessors;
-  workFunc   = func;
-
-  // allocate space for worker data
-  workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
-
-  for( i = 0; i < numWorkers; ++i ) {
-
-    // the deque
-    workerDataArray[i].dequeWorkUnits = createQueue();
-
-    // set the next worker to add work to as itself
-    workerDataArray[i].nextWorkerToLoad = i;
-
-    // it's lock
-    status = pthread_mutex_init( &(workerDataArray[i].dequeLock),
-                                 NULL
-                                 );
-    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
-  }
-
-  // only create the actual pthreads after all workers
-  // have data that is protected with initialized locks
-  for( i = 0; i < numWorkers; ++i ) {
-    status = pthread_create( &(workerDataArray[i].workerThread),
-                             NULL,
-                             workerMain,
-                             (void*) &(workerDataArray[i])
-                             );
-    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
-  }
-
-  // yield and let all workers get to the begin
-  // condition variable, waiting--we have to hold them
-  // so they don't all see empty work queues right away
-  if( sched_yield() == -1 ) {
-    printf( "Error thread trying to yield.\n" );
-    exit( -1 );
-  }
-}
-*/
-
 void workScheduleInit( int numProcessors,
                        void(*func)(void*) ) {
   int i, status;
-
   pthread_mutex_init(&gclock, NULL);
   pthread_mutex_init(&gclistlock, NULL);
   pthread_cond_init(&gccond, NULL);
 
-  //numWorkers = numProcessors*5;
   numWorkers = numProcessors + 1;
   workFunc   = func;
@@ -319,7 +124,6 @@
   status = pthread_mutex_init( &systemLockIn,  NULL );
   status = pthread_mutex_init( &systemLockOut, NULL );
 
-  //workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
   workerDataArray = RUNMALLOC( sizeof( WorkerData ) * numWorkers );
 
   for( i = 0; i < numWorkers; ++i ) {
@@ -329,7 +133,6 @@
                              workerMain,
                              (void*) &(workerDataArray[i])
                              );
-    //status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
 
   // yield and let all workers get to the begin
@@ -339,38 +142,7 @@
   }
 }
 
-
-/*
-void workScheduleSubmit( void* workUnit ) {
-
-  // query who is submitting and find out who they are scheduled to load
-  int submitterIndex = threadID2workerIndex( pthread_self() );
-  int workerIndex = workerDataArray[submitterIndex].nextWorkerToLoad;
-
-  // choose a new index and save it
-  ++workerIndex;
-  if( workerIndex == numWorkers ) {
-    workerIndex = 0;
-  }
-  workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
-
-  // load the chosen worker
-  pthread_mutex_lock  ( &(workerDataArray[workerIndex].dequeLock) );
-  addNewItemBack      ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
-  pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
-}
-*/
-
 void workScheduleSubmit( void* workUnit ) {
-  /*
-  while(1){
-    if(pthread_mutex_trylock(&systemLock)==0){
-      addNewItemBack( dequeWorkUnits, workUnit );
-      break;
-    }
-  }
-  pthread_mutex_unlock( &systemLock );
-  */
   struct QI* item=RUNMALLOC(sizeof(struct QI));
   item->value=workUnit;
   item->next=NULL;
@@ -384,16 +156,14 @@
 
 // really should be named "wait until work is finished"
 void workScheduleBegin() {
-
   int i;
   WorkerData *workerData = RUNMALLOC( sizeof( WorkerData ) );
   workerData->id=1;
-  //  workerMain(NULL);
   workerMain(workerData);
 
   // tell all workers to begin
   for( i = 0; i < numWorkers; ++i ) {
-    //pthread_join( workerArray[i], NULL );
     pthread_join( workerDataArray[i].workerThread, NULL );
   }
+  DUMPPROFILER();
 }
diff --git a/Robust/src/buildscript b/Robust/src/buildscript
index b445cc68..aac0f29b 100755
--- a/Robust/src/buildscript
+++ b/Robust/src/buildscript
@@ -491,7 +491,7 @@ elif [[ $1 = '-coreprof' ]]
 then
 COREPROF=true
 JAVAOPTS="$JAVAOPTS -coreprof"
-EXTRAOPTIONS="$EXTRAOPTIONS -DCOREPROF"
+EXTRAOPTIONS="$EXTRAOPTIONS -DCOREPROF -I$ROBUSTROOT/Runtime/coreprof"
 shift
 shift
-- 
2.34.1
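
P.S. The scheduler that survives in workschedule.c is the single-queue
design the NOTE at the top of the file describes: one singly linked list of
struct QI nodes with a dummy head, producers serialized by systemLockIn at
the tail, consumers serialized by systemLockOut at the head. Below is a
minimal standalone sketch of that two-lock queue; queueInit and dequeueWork
are hypothetical names (the patch inlines the dequeue logic in workerMain),
and the yield-and-retry loop on an empty queue is an assumption, since the
hunk that waits for work is cut off in this diff.

#include <pthread.h>
#include <sched.h>
#include <stdlib.h>

struct QI {
  struct QI * next;
  void * value;
};

static struct QI * headqi;   /* dummy node: headqi->next is the real head */
static struct QI * tailqi;
static pthread_mutex_t systemLockIn  = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t systemLockOut = PTHREAD_MUTEX_INITIALIZER;

void queueInit(void) {
  headqi = calloc(1, sizeof(struct QI));  /* start with an empty dummy */
  tailqi = headqi;
}

/* producer side: serialized by systemLockIn, appends at the tail */
void workScheduleSubmit(void * workUnit) {
  struct QI * item = calloc(1, sizeof(struct QI));
  item->value = workUnit;
  item->next  = NULL;
  pthread_mutex_lock(&systemLockIn);
  tailqi->next = item;
  tailqi       = item;
  pthread_mutex_unlock(&systemLockIn);
}

/* consumer side: serialized by systemLockOut, pops at the head;
   the old dummy is freed and the popped node becomes the new dummy */
void * dequeueWork(void) {
  struct QI * tmp;
  void * workUnit;
  for (;;) {
    pthread_mutex_lock(&systemLockOut);
    if (headqi->next != NULL)
      break;                            /* work available */
    pthread_mutex_unlock(&systemLockOut);
    sched_yield();                      /* assumed retry policy */
  }
  tmp      = headqi;
  headqi   = headqi->next;
  workUnit = headqi->value;
  pthread_mutex_unlock(&systemLockOut);
  free(tmp);
  return workUnit;
}

The dummy node is what keeps the two locks independent: producers touch only
tailqi and tailqi->next, consumers touch only headqi, so the two sides
contend only when the queue is close to empty.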