From 426761a08f4ac91ea85da50a60718ebfb47881eb Mon Sep 17 00:00:00 2001
From: jjenista
Date: Fri, 12 Jun 2009 22:39:04 +0000
Subject: [PATCH] work stealing schedule system cleanly runs highly parallel workload at appropriate load factor

---
 Robust/src/Runtime/workschedule.c        | 207 +++++++++++++++++++++++
 Robust/src/Runtime/workschedule.h        |  27 +++
 Robust/src/Tests/workSchedule/makefile   |  21 +++
 Robust/src/Tests/workSchedule/testMain.c | 140 +++++++++++++++
 4 files changed, 395 insertions(+)
 create mode 100644 Robust/src/Runtime/workschedule.c
 create mode 100644 Robust/src/Runtime/workschedule.h
 create mode 100644 Robust/src/Tests/workSchedule/makefile
 create mode 100644 Robust/src/Tests/workSchedule/testMain.c

diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c
new file mode 100644
index 00000000..85c019b5
--- /dev/null
+++ b/Robust/src/Runtime/workschedule.c
@@ -0,0 +1,207 @@
+// NOTE(review): the three system #include targets below were lost
+// (angle-bracket text stripped, likely by HTML extraction); presumably
+// <stdio.h>, <stdlib.h> and <pthread.h> (sched_yield would also need
+// <sched.h>) -- confirm against the repository before applying.
+#include
+#include
+#include
+#include "Queue.h"
+#include "workschedule.h"
+
+
+// for convenience
+typedef struct Queue deq;
+
+
+// each worker needs the following:
+//   workerThread     - this worker's pthread handle
+//   dequeLock        - guards dequeWorkUnits (owner and thieves both take it)
+//   dequeWorkUnits   - this worker's deque of pending work units
+//   nextWorkerToLoad - round-robin cursor used by workScheduleSubmit,
+//                      and the starting point for stealing sweeps
+typedef struct workerData_t {
+  pthread_t workerThread;
+  pthread_mutex_t dequeLock;
+  deq* dequeWorkUnits;
+  int nextWorkerToLoad;
+} workerData;
+
+
+// implementation internal data
+static int numWorkers;
+static workerData* workerDataArray;
+static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
+static void(*workFunc)(void*);
+
+
+// helper func: map a pthread id to its index in workerDataArray.
+// NOTE(review): pthread_t is opaque; POSIX requires pthread_equal()
+// for comparison -- '==' happens to work on Linux but is nonportable.
+int threadID2workerIndex( pthread_t id ) {
+  int i;
+  for( i = 0; i < numWorkers; ++i ) {
+    if( workerDataArray[i].workerThread == id ) {
+      return i;
+    }
+  }
+  // if we didn't find it, we are an outside
+  // thread and should pick arbitrary worker
+  return 0;
+}
+
+
+
+// the worker thread main func, which takes a func
+// from user for processing any one work unit, then
+// workers use it to process work units and steal
+// them from one another
+void* workerMain( void* arg ) {
+
+  workerData* myData = (workerData*) arg;
+
+  void* workUnit;
+
+  int i;
+  int j;
+
+  // all workers wait until system is ready
+  // NOTE(review): classic lost-wakeup hazard -- pthread_cond_wait has no
+  // predicate, so a worker that reaches this wait AFTER the broadcast in
+  // workScheduleBegin blocks forever; the sched_yield in workScheduleInit
+  // only makes this unlikely, it does not make it safe.  A guarded
+  // "systemReady" flag checked in a while-loop would fix it.
+  pthread_mutex_lock ( &systemBeginLock );
+  pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
+  pthread_mutex_unlock( &systemBeginLock );
+
+  while( 1 ) {
+
+    // lock my deque
+    pthread_mutex_lock( &(myData->dequeLock) );
+
+    if( isEmpty( myData->dequeWorkUnits ) ) {
+
+      // my deque is empty, try to steal
+      pthread_mutex_unlock( &(myData->dequeLock) );
+
+      workUnit = NULL;
+      j = myData->nextWorkerToLoad;
+
+      // look at everyone's queue at least twice
+      // NOTE(review): comment/code mismatch -- this loop makes exactly ONE
+      // pass over the numWorkers deques (which includes re-checking our
+      // own), not two.
+      for( i = 0; i < numWorkers; ++i ) {
+        if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+
+        // advance the cursor with wrap-around
+        ++j; if( j == numWorkers ) { j = 0; }
+
+        pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
+
+        if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
+          pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
+          // no work here, keep looking
+          continue;
+        }
+
+        // found some work in another deque, steal it (from the back,
+        // whereas an owner takes from the front -- see getItem below)
+        workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
+        pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
+        break;
+      }
+
+      // didn't find any work, even in my own deque,
+      // after checking everyone twice?  Exit thread
+      // NOTE(review): a worker can exit permanently while a peer is still
+      // executing workFunc and about to submit more units; harmless for a
+      // self-expanding chain workload only if queues never all drain early.
+      if( workUnit == NULL ) {
+        break;
+      }
+
+    } else {
+      // have work in own deque, take out from front
+      workUnit = getItem( myData->dequeWorkUnits );
+      pthread_mutex_unlock( &(myData->dequeLock) );
+    }
+
+    // wherever the work came from, process it
+    workFunc( workUnit );
+
+    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+  }
+
+  // NOTE(review): "%d" with a pthread_t argument is a printf format
+  // mismatch (undefined behavior); pthread_t is opaque and need not be an
+  // int.  Printing the worker's array index would be well-defined.
+  printf( "Worker %d exiting.\n", myData->workerThread );
+  fflush( stdout );
+
+  return NULL;
+}
+
+
+// set up numProcessors workers (deque + lock each), install the user's
+// work-unit processing function, then spawn the worker pthreads, which
+// park on systemBeginCond until workScheduleBegin is called
+void workScheduleInit( int numProcessors,
+                       void(*func)(void*) ) {
+  int i, status;
+
+  numWorkers = numProcessors;
+  workFunc = func;
+
+  // allocate space for worker data
+  // NOTE(review): malloc result is not checked before use
+  workerDataArray = malloc( sizeof( workerData ) * numWorkers );
+  for( i = 0; i < numWorkers; ++i ) {
+
+    // the deque
+    workerDataArray[i].dequeWorkUnits = createQueue();
+
+    // set the next worker to add work to as itself
+    workerDataArray[i].nextWorkerToLoad = i;
+
+    // its lock
+    status = pthread_mutex_init( &(workerDataArray[i].dequeLock),
+                                 NULL
+                                 );
+    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+  }
+
+  // only create the actual pthreads after all workers
+  // have data that is protected with initialized locks
+  for( i = 0; i < numWorkers; ++i ) {
+    status = pthread_create( &(workerDataArray[i].workerThread),
+                             NULL,
+                             workerMain,
+                             (void*) &(workerDataArray[i])
+                             );
+    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+  }
+
+  // yield and let all workers get to the begin
+  // condition variable, waiting--we have to hold them
+  // so they don't all see empty work queues right away
+  // (NOTE(review): see lost-wakeup note in workerMain -- this yield is a
+  // heuristic, not a guarantee that every worker is parked on the cond var)
+  if( sched_yield() == -1 ) {
+    printf( "Error thread trying to yield.\n" );
+    exit( -1 );
+  }
+}
+
+
+// enqueue one work unit, distributing submissions round-robin across the
+// workers via the submitter's nextWorkerToLoad cursor; callable from the
+// main thread (maps to worker 0's cursor) or from inside workFunc
+void workScheduleSubmit( void* workUnit ) {
+
+  // query who is submitting and find out who they are scheduled to load
+  int submitterIndex = threadID2workerIndex( pthread_self() );
+  int workerIndex = workerDataArray[submitterIndex].nextWorkerToLoad;
+
+  // choose a new index and save it
+  ++workerIndex;
+  if( 
workerIndex == numWorkers ) {
+    workerIndex = 0;
+  }
+  workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
+
+  // load the chosen worker
+  pthread_mutex_lock ( &(workerDataArray[workerIndex].dequeLock) );
+  addNewItemBack ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
+  pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
+}
+
+
+// really should be named "wait until work is finished"
+// releases the workers parked in workerMain, then blocks until every
+// worker thread has exited (i.e. until all deques drained)
+// NOTE(review): despite the header comment saying the caller "should not
+// expect to return", this implementation joins all workers and returns.
+void workScheduleBegin() {
+
+  int i;
+
+  // tell all workers to begin
+  pthread_mutex_lock ( &systemBeginLock );
+  pthread_cond_broadcast( &systemBeginCond );
+  pthread_mutex_unlock ( &systemBeginLock );
+
+  // wait until workers inform that all work is complete
+  // (earlier completion scheme, superseded by the join loop below;
+  // systemReturnLock/systemReturnCond are now otherwise unused)
+  /*
+  pthread_mutex_lock ( &systemReturnLock );
+  pthread_cond_wait ( &systemReturnCond, &systemReturnLock );
+  pthread_mutex_unlock( &systemReturnLock );
+  */
+
+  for( i = 0; i < numWorkers; ++i ) {
+    pthread_join( workerDataArray[i].workerThread, NULL );
+  }
+}
diff --git a/Robust/src/Runtime/workschedule.h b/Robust/src/Runtime/workschedule.h
new file mode 100644
index 00000000..78a976ac
--- /dev/null
+++ b/Robust/src/Runtime/workschedule.h
@@ -0,0 +1,27 @@
+#ifndef __WORK_SCHEDULE__
+#define __WORK_SCHEDULE__
+
+
+// initialize the work schedule system, after
+// which some preliminary work units can be
+// scheduled.  Note the supplied work func
+// should operate on a work unit of the type
+// submitted in the function below
+void workScheduleInit( int numProcessors,
+                       void(*workFunc)(void*) );
+
+// your main program, before beginning this
+// system, or the execution of worker threads
+// themselves use this submit function to
+// distribute work units among the worker pool
+// threads. 
The workers will dynamically
+// steal from one another to load balance
+void workScheduleSubmit( void* workUnit );
+
+// once you call this begin function your main
+// thread becomes a work thread, so programs
+// should not expect to return from this
+void workScheduleBegin();
+
+
+#endif /* __WORK_SCHEDULE__ */
diff --git a/Robust/src/Tests/workSchedule/makefile b/Robust/src/Tests/workSchedule/makefile
new file mode 100644
index 00000000..f46df399
--- /dev/null
+++ b/Robust/src/Tests/workSchedule/makefile
@@ -0,0 +1,21 @@
+QDIR=../../Runtime
+DEFS= -D "RUNMALLOC=malloc" -D "RUNFREE=free" -D "DEBUG_WORKSCH="
+
+all: a.out
+
+a.out: testMain.o workschedule.o queue.o
+	gcc testMain.o workschedule.o queue.o -lpthread
+
+workschedule.o: $(QDIR)/workschedule.h $(QDIR)/workschedule.c $(QDIR)/Queue.h
+	gcc -c -I$(QDIR) $(DEFS) $(QDIR)/workschedule.c -o workschedule.o
+
+queue.o: $(QDIR)/Queue.h $(QDIR)/Queue.c
+	gcc -c -I$(QDIR) $(DEFS) $(QDIR)/Queue.c -o queue.o
+
+testMain.o: testMain.c $(QDIR)/workschedule.h
+	gcc -c -I$(QDIR) $(DEFS) testMain.c -o testMain.o
+
+clean:
+	rm -f a.out
+	rm -f *.o
+	rm *~
diff --git a/Robust/src/Tests/workSchedule/testMain.c b/Robust/src/Tests/workSchedule/testMain.c
new file mode 100644
index 00000000..cbf32a0d
--- /dev/null
+++ b/Robust/src/Tests/workSchedule/testMain.c
@@ -0,0 +1,140 @@
+// NOTE(review): the three system #include targets below were lost
+// (angle-bracket text stripped); based on the calls in this file
+// (printf, malloc/atoi, strcmp, gettimeofday) they are presumably
+// <stdio.h>, <stdlib.h>, <string.h> and/or <sys/time.h> -- confirm
+// against the repository.
+#include
+#include
+#include
+
+#include "workschedule.h"
+
+
+// number of work units / answer slots, read from argv[2]
+static int workLoad = 0;
+
+
+// one work unit: compute findSqrt(n) and store it at *answer
+typedef struct wUnit_ {
+  int n;
+  double* answer;
+} wUnit;
+
+
+// Newton's-method square root of x to tolerance e, deliberately
+// recomputed 100000 times to make each work unit expensive
+double findSqrt( int x ) {
+  double a, b, p;
+  double e = 0.00001;
+  double k = (double) x;
+  int i;
+
+  // just do it a bunch of times
+  // to generate more work
+  for( i = 0; i < 100000; ++i ) {
+
+    a = k;
+    p = a*a;
+    while( p - k >= e ) {
+      b = (a + (k/a)) / 2.0;
+      a = b;
+      p = a*a;
+    }
+  }
+
+  return a;
+}
+
+
+// the workFunc handed to workScheduleInit: each unit with n > 0 first
+// spawns the next unit in the chain (n-1, previous answer slot), so the
+// initial single submission unfolds into workLoad units at runtime
+void processWorkUnit( void* workUnit ) {
+
+  wUnit* w = (wUnit*) workUnit;
+  wUnit* x = NULL;
+
+  // submit more work
+  if( w->n > 0 ) {
+    // NOTE(review): malloc result is not checked before use
+    x = malloc( sizeof( wUnit ) );
+    x->n = 
w->n - 1;
+    // walk the answer pointer down in lockstep with n, so unit n always
+    // writes answers[n] (the chain starts at answers[workLoad-1])
+    x->answer = w->answer - 1;
+    workScheduleSubmit( (void*)x );
+  }
+
+  // then start computing
+  // and store answer
+  *(w->answer) = findSqrt( w->n );
+
+  // workunit no longer needed
+  free( w );
+}
+
+
+
+void usage() {
+  // NOTE(review): the argument list was lost from this string (angle-
+  // bracket text stripped); from main below the expected form is
+  // a.out <numProcessors> <workLoad> <singlethread|workschedule>
+  printf( "usage:\na.out \n" );
+}
+
+
+// runs the workload either serially ("singlethread") or via the work
+// stealing scheduler ("workschedule"), timing only the computation and
+// printing the summed answers so both modes can be cross-checked
+// argv[1] = number of worker threads, argv[2] = workLoad, argv[3] = mode
+int main( int argc, char** argv ) {
+
+  struct timeval tstart;
+  struct timeval tfinish;
+  double timediff;
+  double* answers;
+  double answerSummary;
+  int i;
+
+  if( argc != 4 ) {
+    usage();
+    exit( 0 );
+  }
+
+  // initialize solution array outside of timing
+  // NOTE(review): atoi gives 0 on bad input and malloc is unchecked
+  workLoad = atoi( argv[2] );
+  answers = malloc( sizeof( double ) * workLoad );
+  for( i = 0; i < workLoad; ++i ) {
+    answers[i] = 0.0;
+  }
+
+
+  gettimeofday( &tstart, NULL );
+
+
+  if( strcmp( argv[3], "singlethread" ) == 0 ) {
+
+    // serial reference: answers[i] = findSqrt(i)
+    for( i = 0; i < workLoad; ++i ) {
+      answers[i] = findSqrt( i );
+    }
+
+  } else if( strcmp( argv[3], "workschedule" ) == 0 ) {
+    // initialize the system
+    workScheduleInit( atoi( argv[1] ),   // num processors
+                      processWorkUnit ); // func for processing
+
+    // add a preliminary work unit; processWorkUnit then chains
+    // n = workLoad-1 down to 0, filling answers[] back to front
+    wUnit* w = malloc( sizeof( wUnit ) );
+    w->n = workLoad-1;
+    w->answer = &(answers[workLoad-1]);
+    workScheduleSubmit( (void*)w );
+
+    // start work schedule, some work will
+    // generate more work, when its all done
+    // the system will return to this point
+    workScheduleBegin();
+
+  } else {
+    usage();
+    exit( 0 );
+  }
+
+
+  gettimeofday( &tfinish, NULL );
+
+
+  // summarize answers outside of timing
+  answerSummary = 0.0;
+  for( i = 0; i < workLoad; ++i ) {
+    answerSummary += answers[i];
+  }
+
+
+  // elapsed wall-clock seconds from the two gettimeofday samples
+  timediff = (double)(((tfinish.tv_sec - tstart.tv_sec )*1000000)+
+                      ((tfinish.tv_usec - tstart.tv_usec)*1 ))
+             / 1000000.0;
+
+  printf( "\n\nComputed summary %f in %f seconds.\n",
+          answerSummary,
+          timediff );
+
+  return 0;
+}
-- 
2.34.1