--- /dev/null
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+#include "Queue.h"
+#include "workschedule.h"
+
+
+// for convenience
+typedef struct Queue deq;
+
+
+// each worker needs the following
+typedef struct workerData_t {
+ pthread_t workerThread;
+ pthread_mutex_t dequeLock;
+ deq* dequeWorkUnits;
+ int nextWorkerToLoad;
+} workerData;
+
+
+// implementation internal data
+static int numWorkers;
+static workerData* workerDataArray;
+static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
+static void(*workFunc)(void*);
+
+
+// helper func
+int threadID2workerIndex( pthread_t id ) {
+ int i;
+ for( i = 0; i < numWorkers; ++i ) {
+ if( workerDataArray[i].workerThread == id ) {
+ return i;
+ }
+ }
+ // if we didn't find it, we are an outside
+ // thread and should pick arbitrary worker
+ return 0;
+}
+
+
+
+// the worker thread main func, which takes a func
+// from user for processing any one work unit, then
+// workers use it to process work units and steal
+// them from one another
+void* workerMain( void* arg ) {
+
+ workerData* myData = (workerData*) arg;
+
+ void* workUnit;
+
+ int i;
+ int j;
+
+ // all workers wait until system is ready
+ pthread_mutex_lock ( &systemBeginLock );
+ pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
+ pthread_mutex_unlock( &systemBeginLock );
+
+ while( 1 ) {
+
+ // lock my deque
+ pthread_mutex_lock( &(myData->dequeLock) );
+
+ if( isEmpty( myData->dequeWorkUnits ) ) {
+
+ // my deque is empty, try to steal
+ pthread_mutex_unlock( &(myData->dequeLock) );
+
+ workUnit = NULL;
+ j = myData->nextWorkerToLoad;
+
+ // look at everyone's queue at least twice
+ for( i = 0; i < numWorkers; ++i ) {
+ if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+
+ ++j; if( j == numWorkers ) { j = 0; }
+
+ pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
+
+ if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
+ pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
+ // no work here, keep looking
+ continue;
+ }
+
+ // found some work in another deque, steal it
+ workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
+ pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
+ break;
+ }
+
+ // didn't find any work, even in my own deque,
+ // after checking everyone twice? Exit thread
+ if( workUnit == NULL ) {
+ break;
+ }
+
+ } else {
+ // have work in own deque, take out from front
+ workUnit = getItem( myData->dequeWorkUnits );
+ pthread_mutex_unlock( &(myData->dequeLock) );
+ }
+
+ // wherever the work came from, process it
+ workFunc( workUnit );
+
+ if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+ }
+
+ printf( "Worker %d exiting.\n", myData->workerThread );
+ fflush( stdout );
+
+ return NULL;
+}
+
+
+void workScheduleInit( int numProcessors,
+ void(*func)(void*) ) {
+ int i, status;
+
+ numWorkers = numProcessors;
+ workFunc = func;
+
+ // allocate space for worker data
+ workerDataArray = malloc( sizeof( workerData ) * numWorkers );
+ for( i = 0; i < numWorkers; ++i ) {
+
+ // the deque
+ workerDataArray[i].dequeWorkUnits = createQueue();
+
+ // set the next worker to add work to as itself
+ workerDataArray[i].nextWorkerToLoad = i;
+
+ // it's lock
+ status = pthread_mutex_init( &(workerDataArray[i].dequeLock),
+ NULL
+ );
+ if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+ }
+
+ // only create the actual pthreads after all workers
+ // have data that is protected with initialized locks
+ for( i = 0; i < numWorkers; ++i ) {
+ status = pthread_create( &(workerDataArray[i].workerThread),
+ NULL,
+ workerMain,
+ (void*) &(workerDataArray[i])
+ );
+ if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+ }
+
+ // yield and let all workers get to the begin
+ // condition variable, waiting--we have to hold them
+ // so they don't all see empty work queues right away
+ if( sched_yield() == -1 ) {
+ printf( "Error thread trying to yield.\n" );
+ exit( -1 );
+ }
+}
+
+
+void workScheduleSubmit( void* workUnit ) {
+
+ // query who is submitting and find out who they are scheduled to load
+ int submitterIndex = threadID2workerIndex( pthread_self() );
+ int workerIndex = workerDataArray[submitterIndex].nextWorkerToLoad;
+
+ // choose a new index and save it
+ ++workerIndex;
+ if( workerIndex == numWorkers ) {
+ workerIndex = 0;
+ }
+ workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
+
+ // load the chosen worker
+ pthread_mutex_lock ( &(workerDataArray[workerIndex].dequeLock) );
+ addNewItemBack ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
+ pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
+}
+
+
+// really should be named "wait until work is finished"
+void workScheduleBegin() {
+
+ int i;
+
+ // tell all workers to begin
+ pthread_mutex_lock ( &systemBeginLock );
+ pthread_cond_broadcast( &systemBeginCond );
+ pthread_mutex_unlock ( &systemBeginLock );
+
+ // wait until workers inform that all work is complete
+ /*
+ pthread_mutex_lock ( &systemReturnLock );
+ pthread_cond_wait ( &systemReturnCond, &systemReturnLock );
+ pthread_mutex_unlock( &systemReturnLock );
+ */
+
+ for( i = 0; i < numWorkers; ++i ) {
+ pthread_join( workerDataArray[i].workerThread, NULL );
+ }
+}
--- /dev/null
+#ifndef __WORK_SCHEDULE__
+#define __WORK_SCHEDULE__
+
+
+// initialize the work schedule system, after
+// which some preliminary work units can be
+// scheduled. Note the supplied work func
+// should operate on a work unit of the type
+// submitted in the function below
+void workScheduleInit( int numProcessors,
+ void(*workFunc)(void*) );
+
+// your main program, before beginning this
+// system, or the execution of worker threads
+// themselves use this submit function to
+// distribute work units among the worker pool
+// threads. The workers will dynamically
+// steal from one another to load balance
+void workScheduleSubmit( void* workUnit );
+
+// once you call this begin function your main
+// thread becomes a work thread, so programs
+// should not expect to return from this
+void workScheduleBegin();
+
+
+#endif /* __WORK_SCHEDULE__ */
--- /dev/null
+#include <stdlib.h>
+#include <stdio.h>
+#include <sys/time.h>
+
+#include "workschedule.h"
+
+
+static int workLoad = 0;
+
+
+typedef struct wUnit_ {
+ int n;
+ double* answer;
+} wUnit;
+
+
+double findSqrt( int x ) {
+ double a, b, p;
+ double e = 0.00001;
+ double k = (double) x;
+ int i;
+
+ // just do it a bunch of times
+ // to generate more work
+ for( i = 0; i < 100000; ++i ) {
+
+ a = k;
+ p = a*a;
+ while( p - k >= e ) {
+ b = (a + (k/a)) / 2.0;
+ a = b;
+ p = a*a;
+ }
+ }
+
+ return a;
+}
+
+
+void processWorkUnit( void* workUnit ) {
+
+ wUnit* w = (wUnit*) workUnit;
+ wUnit* x = NULL;
+
+ // submit more work
+ if( w->n > 0 ) {
+ x = malloc( sizeof( wUnit ) );
+ x->n = w->n - 1;
+ x->answer = w->answer - 1;
+ workScheduleSubmit( (void*)x );
+ }
+
+ // then start computing
+ // and store answer
+ *(w->answer) = findSqrt( w->n );
+
+ // workunit no longer needed
+ free( w );
+}
+
+
+
+void usage() {
+ printf( "usage:\na.out <int workers> <int workLoad> <str singlethread/workschedule>\n" );
+}
+
+
+int main( int argc, char** argv ) {
+
+ struct timeval tstart;
+ struct timeval tfinish;
+ double timediff;
+ double* answers;
+ double answerSummary;
+ int i;
+
+ if( argc != 4 ) {
+ usage();
+ exit( 0 );
+ }
+
+ // initialize solution array outside of timing
+ workLoad = atoi( argv[2] );
+ answers = malloc( sizeof( double ) * workLoad );
+ for( i = 0; i < workLoad; ++i ) {
+ answers[i] = 0.0;
+ }
+
+
+ gettimeofday( &tstart, NULL );
+
+
+ if( strcmp( argv[3], "singlethread" ) == 0 ) {
+
+ for( i = 0; i < workLoad; ++i ) {
+ answers[i] = findSqrt( i );
+ }
+
+ } else if( strcmp( argv[3], "workschedule" ) == 0 ) {
+ // initialize the system
+ workScheduleInit( atoi( argv[1] ), // num processors
+ processWorkUnit ); // func for processing
+
+ // add a preliminary work unit
+ wUnit* w = malloc( sizeof( wUnit ) );
+ w->n = workLoad-1;
+ w->answer = &(answers[workLoad-1]);
+ workScheduleSubmit( (void*)w );
+
+ // start work schedule, some work will
+ // generate more work, when its all done
+ // the system will return to this point
+ workScheduleBegin();
+
+ } else {
+ usage();
+ exit( 0 );
+ }
+
+
+ gettimeofday( &tfinish, NULL );
+
+
+ // summarize answers outside of timing
+ answerSummary = 0.0;
+ for( i = 0; i < workLoad; ++i ) {
+ answerSummary += answers[i];
+ }
+
+
+ timediff = (double)(((tfinish.tv_sec - tstart.tv_sec )*1000000)+
+ ((tfinish.tv_usec - tstart.tv_usec)*1 ))
+ / 1000000.0;
+
+ printf( "\n\nComputed summary %f in %f seconds.\n",
+ answerSummary,
+ timediff );
+
+ return 0;
+}