From fd2adb87c7f9aa7238511956057b8bc87f90819e Mon Sep 17 00:00:00 2001
From: jjenista
Date: Mon, 27 Jul 2009 17:32:59 +0000
Subject: [PATCH] simplified work schedule, will implement work-stealing later

---
 Robust/src/Runtime/workschedule.c | 107 +++++++++++++++++++++++++++---
 1 file changed, 97 insertions(+), 10 deletions(-)

diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c
index 0f9e49bb..01aeab37 100644
--- a/Robust/src/Runtime/workschedule.c
+++ b/Robust/src/Runtime/workschedule.c
@@ -7,10 +7,19 @@
 #include "workschedule.h"
 
+
+// NOTE: Converting this from a work-stealing strategy
+// to a single-queue thread pool protected by a single
+// lock. This will not scale, but it will support
+// development of the system for now
+
+
+
 
 // for convenience
 typedef struct Queue deq;
 
 
+/*
 // each worker needs the following
 typedef struct workerData_t {
   pthread_t workerThread;
@@ -18,11 +27,17 @@ typedef struct workerData_t {
   deq* dequeWorkUnits;
   int nextWorkerToLoad;
 } workerData;
+*/
 
+// just one queue for everyone
+static pthread_mutex_t dequeLock;
+static deq* dequeWorkUnits;
+
 
 // implementation internal data
 static int numWorkers;
-static workerData* workerDataArray;
+//static workerData* workerDataArray;
+static pthread_t* workerArray;
 static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
 static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
@@ -30,6 +45,7 @@ static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
 
 static void(*workFunc)(void*);
 
+/*
 // helper func
 int threadID2workerIndex( pthread_t id ) {
   int i;
@@ -42,9 +58,10 @@ int threadID2workerIndex( pthread_t id ) {
   // thread and should pick arbitrary worker
   return 0;
 }
+*/
 
 
-
+/*
 // the worker thread main func, which takes a func
 // from user for processing any one work unit, then
 // workers use it to process work units and steal
@@ -120,8 +137,49 @@ void* workerMain( void* arg ) {
   return NULL;
 }
+*/
+
+
+void* workerMain( void* arg ) {
+
+  void* workUnit;
+
+  int tries = 3;
+
+  // all workers wait until system is ready
+  pthread_mutex_lock  ( &systemBeginLock );
+  pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
+  pthread_mutex_unlock( &systemBeginLock );
+
+  while( tries > 0 ) {
+
+    pthread_mutex_lock( &dequeLock );
+
+    // look in the queue for work
+    if( !isEmpty( dequeWorkUnits ) ) {
+      workUnit = getItem( dequeWorkUnits );
+    } else {
+      workUnit = NULL;
+    }
+
+    pthread_mutex_unlock( &dequeLock );
+
+    // yield processor before moving on
+    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+
+    if( workUnit != NULL ) {
+      workFunc( workUnit );
+      tries = 3;
+    } else {
+      --tries;
+    }
+  }
+
+  return NULL;
+}
 
 
 
+/*
 void workScheduleInit( int numProcessors,
                        void(*func)(void*) ) {
   int i, status;
@@ -131,6 +189,7 @@ void workScheduleInit( int numProcessors,
 
   // allocate space for worker data
   workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
+
   for( i = 0; i < numWorkers; ++i ) {
 
     // the deque
@@ -165,8 +224,36 @@ void workScheduleInit( int numProcessors,
      exit( -1 );
   }
 }
+*/
+
+
+void workScheduleInit( int numProcessors,
+                       void(*func)(void*) ) {
+  int i, status;
+
+  numWorkers = numProcessors;
+  workFunc   = func;
+
+  dequeWorkUnits = createQueue();
+
+  status = pthread_mutex_init( &dequeLock, NULL );
+  if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+
+  workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
+
+  for( i = 0; i < numWorkers; ++i ) {
+    status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
+    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+
+    // yield and let all workers get to the begin
+    // condition variable, waiting--we have to hold them
+    // so they don't all see empty work queues right away
+    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+  }
+}
 
 
+/*
 void workScheduleSubmit( void* workUnit ) {
 
   // query who is submitting and find out who they are scheduled to load
@@ -185,6 +272,13 @@ void workScheduleSubmit( void* workUnit ) {
   addNewItemBack ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
   pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
 }
+*/
+
+void workScheduleSubmit( void* workUnit ) {
+  pthread_mutex_lock  ( &dequeLock );
+  addNewItemBack      ( dequeWorkUnits, workUnit );
+  pthread_mutex_unlock( &dequeLock );
+}
 
 
 // really should be named "wait until work is finished"
@@ -197,14 +291,7 @@ void workScheduleBegin() {
   pthread_cond_broadcast( &systemBeginCond );
   pthread_mutex_unlock  ( &systemBeginLock );
 
-  // wait until workers inform that all work is complete
-  /*
-  pthread_mutex_lock ( &systemReturnLock );
-  pthread_cond_wait  ( &systemReturnCond, &systemReturnLock );
-  pthread_mutex_unlock( &systemReturnLock );
-  */
-
   for( i = 0; i < numWorkers; ++i ) {
-    pthread_join( workerDataArray[i].workerThread, NULL );
+    pthread_join( workerArray[i], NULL );
  }
 }
-- 
2.34.1
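
For reference, below is a minimal, self-contained sketch of the pattern this patch adopts: a single mutex-protected queue shared by every worker, with workers that give up after a few consecutive empty polls. It is only an illustration, not runtime code; job_t, do_work, NUM_WORKERS, and NUM_JOBS are hypothetical stand-ins for the runtime's Queue helpers (createQueue, isEmpty, getItem, addNewItemBack) and the user-supplied workFunc.

/* Illustrative sketch only, not part of the patch.
 * Same scheme as the commit: one queue guarded by one lock,
 * workers that retry a few times before exiting.
 * Build with: gcc -pthread sketch.c
 */
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_WORKERS 4
#define NUM_JOBS    16

typedef struct job_t { int payload; struct job_t* next; } job_t;

// just one queue (here a simple linked list) for everyone
static pthread_mutex_t queueLock = PTHREAD_MUTEX_INITIALIZER;
static job_t*          queueHead = NULL;

static void do_work( job_t* j ) {
  printf( "processed job %d\n", j->payload );
  free( j );
}

static void* workerMain( void* arg ) {
  int tries = 3;   // same give-up policy as the patch's workerMain
  (void) arg;

  while( tries > 0 ) {
    job_t* j;

    // grab at most one work unit under the single lock
    pthread_mutex_lock( &queueLock );
    j = queueHead;
    if( j != NULL ) { queueHead = j->next; }
    pthread_mutex_unlock( &queueLock );

    // yield the processor before moving on
    sched_yield();

    if( j != NULL ) { do_work( j ); tries = 3; }
    else            { --tries;                 }
  }
  return NULL;
}

int main( void ) {
  pthread_t workers[NUM_WORKERS];
  int i;

  // submit all work before starting workers, mirroring the way
  // workScheduleSubmit runs before workScheduleBegin
  for( i = 0; i < NUM_JOBS; ++i ) {
    job_t* j   = malloc( sizeof( job_t ) );
    j->payload = i;
    pthread_mutex_lock( &queueLock );
    j->next    = queueHead;
    queueHead  = j;
    pthread_mutex_unlock( &queueLock );
  }

  // start the workers and wait for them, mirroring workScheduleBegin
  for( i = 0; i < NUM_WORKERS; ++i ) {
    pthread_create( &workers[i], NULL, workerMain, NULL );
  }
  for( i = 0; i < NUM_WORKERS; ++i ) {
    pthread_join( workers[i], NULL );
  }
  return 0;
}

Compared with the per-worker deques the patch comments out, every submit and grab now serializes on the one lock, which is the scaling cost the NOTE at the top of the file accepts for now.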