work stealing schedule system cleanly runs highly parallel workload at appropriate...
author     jjenista <jjenista>
           Fri, 12 Jun 2009 22:39:04 +0000 (22:39 +0000)
committer  jjenista <jjenista>
           Fri, 12 Jun 2009 22:39:04 +0000 (22:39 +0000)
Robust/src/Runtime/workschedule.c [new file with mode: 0644]
Robust/src/Runtime/workschedule.h [new file with mode: 0644]
Robust/src/Tests/workSchedule/makefile [new file with mode: 0644]
Robust/src/Tests/workSchedule/testMain.c [new file with mode: 0644]

diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c
new file mode 100644 (file)
index 0000000..85c019b
--- /dev/null
@@ -0,0 +1,207 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <sched.h>
+#include "Queue.h"
+#include "workschedule.h"
+
+
+// for convenience
+typedef struct Queue deq;
+
+
+// each worker needs the following
+typedef struct workerData_t {
+  pthread_t       workerThread;
+  pthread_mutex_t dequeLock;
+  deq*            dequeWorkUnits;
+  int             nextWorkerToLoad;
+} workerData;
+
+
+// implementation internal data
+static int             numWorkers;
+static workerData*     workerDataArray;
+static pthread_mutex_t systemBeginLock  = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
+static int             systemBegun      = 0;  // predicate for systemBeginCond
+static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  systemReturnCond = PTHREAD_COND_INITIALIZER;
+static void(*workFunc)(void*);
+
+
+// helper func
+int threadID2workerIndex( pthread_t id ) {
+  int i;
+  for( i = 0; i < numWorkers; ++i ) {
+    if( pthread_equal( workerDataArray[i].workerThread, id ) ) {
+      return i;
+    }
+  }
+  // if we didn't find it, we are an outside
+  // thread and should pick an arbitrary worker
+  return 0;
+}
+
+
+
+// the worker thread main func: each worker repeatedly
+// takes work units from the front of its own deque and
+// processes them with the user-supplied work function,
+// stealing from the back of other workers' deques when
+// its own deque is empty
+void* workerMain( void* arg ) {
+
+  workerData* myData = (workerData*) arg;
+  
+  void* workUnit;
+
+  int i;
+  int j;
+
+  // all workers wait here until the system signals begin;
+  // the systemBegun flag guards against missing the
+  // broadcast or waking spuriously
+  pthread_mutex_lock  ( &systemBeginLock );
+  while( !systemBegun ) {
+    pthread_cond_wait( &systemBeginCond, &systemBeginLock );
+  }
+  pthread_mutex_unlock( &systemBeginLock );
+
+  while( 1 ) {
+
+    // lock my deque
+    pthread_mutex_lock( &(myData->dequeLock) );
+
+    if( isEmpty( myData->dequeWorkUnits ) ) {
+
+      // my deque is empty, try to steal
+      pthread_mutex_unlock( &(myData->dequeLock) );
+      
+      workUnit = NULL;
+      j = myData->nextWorkerToLoad;
+
+      // check each other worker's deque once, round robin
+      for( i = 0; i < numWorkers; ++i ) {
+       if( sched_yield() == -1 ) { printf( "Error: thread failed to yield.\n" ); exit( -1 ); }
+       
+       ++j; if( j == numWorkers ) { j = 0; }
+
+       pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
+
+       if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
+         pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
+         // no work here, keep looking
+         continue;
+       }
+
+       // found some work in another deque, steal it
+       workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
+       pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
+       break;
+      }
+
+      // didn't find any work in anyone's deque,
+      // including my own, so exit this thread
+      if( workUnit == NULL ) {
+       break;
+      }
+
+    } else {
+      // have work in own deque, take out from front
+      workUnit = getItem( myData->dequeWorkUnits );
+      pthread_mutex_unlock( &(myData->dequeLock) );
+    }
+
+    // wherever the work came from, process it
+    workFunc( workUnit );
+
+    if( sched_yield() == -1 ) { printf( "Error: thread failed to yield.\n" ); exit( -1 ); }
+  }
+
+  printf( "Worker %d exiting.\n", myData->workerThread );
+  fflush( stdout );
+
+  return NULL;
+}
+
+
+void workScheduleInit( int numProcessors,
+                       void(*func)(void*) ) {
+  int i, status;
+
+  numWorkers = numProcessors;
+  workFunc   = func;
+
+  // allocate space for worker data
+  workerDataArray = malloc( sizeof( workerData ) * numWorkers );
+  for( i = 0; i < numWorkers; ++i ) {    
+
+    // the deque
+    workerDataArray[i].dequeWorkUnits = createQueue();
+
+    // set the next worker to add work to as itself
+    workerDataArray[i].nextWorkerToLoad = i;
+
+    // its deque lock
+    status = pthread_mutex_init( &(workerDataArray[i].dequeLock), 
+                                NULL
+                                );
+    if( status != 0 ) { printf( "Error initializing deque lock.\n" ); exit( -1 ); }
+  }
+
+  // only create the actual pthreads after all workers
+  // have data that is protected with initialized locks
+  for( i = 0; i < numWorkers; ++i ) {    
+    status = pthread_create( &(workerDataArray[i].workerThread), 
+                             NULL,
+                             workerMain,
+                             (void*) &(workerDataArray[i])
+                           );
+    if( status != 0 ) { printf( "Error creating worker thread.\n" ); exit( -1 ); }
+  }
+
+  // yield so workers can reach the begin condition
+  // variable; the systemBegun flag holds them there
+  // until work has been submitted, so they don't all
+  // see empty deques and exit right away
+  if( sched_yield() == -1 ) {
+    printf( "Error: thread failed to yield.\n" );
+    exit( -1 );
+  }
+}
+
+
+void workScheduleSubmit( void* workUnit ) {
+
+  // query who is submitting and find out who they are scheduled to load
+  int submitterIndex = threadID2workerIndex( pthread_self() );
+  int workerIndex    = workerDataArray[submitterIndex].nextWorkerToLoad;
+  
+  // choose a new index and save it
+  ++workerIndex;
+  if( workerIndex == numWorkers ) {
+    workerIndex = 0;
+  }
+  workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
+
+  // load the chosen worker
+  pthread_mutex_lock  ( &(workerDataArray[workerIndex].dequeLock) );
+  addNewItemBack      (   workerDataArray[workerIndex].dequeWorkUnits, workUnit );
+  pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
+}
+
+
+// really should be named "wait until work is finished"
+void workScheduleBegin() {
+  
+  int i;
+
+  // tell all workers to begin
+  pthread_mutex_lock    ( &systemBeginLock );
+  systemBegun = 1;
+  pthread_cond_broadcast( &systemBeginCond );
+  pthread_mutex_unlock  ( &systemBeginLock );  
+
+  // wait until workers inform that all work is complete
+  /*
+  pthread_mutex_lock  ( &systemReturnLock );
+  pthread_cond_wait   ( &systemReturnCond, &systemReturnLock );
+  pthread_mutex_unlock( &systemReturnLock );
+  */
+
+  for( i = 0; i < numWorkers; ++i ) {
+    pthread_join( workerDataArray[i].workerThread, NULL );
+  }
+}
diff --git a/Robust/src/Runtime/workschedule.h b/Robust/src/Runtime/workschedule.h
new file mode 100644 (file)
index 0000000..78a976a
--- /dev/null
@@ -0,0 +1,27 @@
+#ifndef __WORK_SCHEDULE__
+#define __WORK_SCHEDULE__
+
+
+// initialize the work schedule system; after this
+// call, preliminary work units can be submitted.
+// The supplied work function must accept a work unit
+// of the type passed to workScheduleSubmit below
+void workScheduleInit( int numProcessors,
+                       void(*workFunc)(void*) );
+
+// the main program (before beginning the system) or
+// the worker threads themselves use this submit
+// function to distribute work units among the worker
+// pool threads; the workers dynamically steal from
+// one another to balance the load (see the usage
+// sketch at the end of this header)
+void workScheduleSubmit( void* workUnit );
+
+// once you call this begin function the workers start
+// processing; the call blocks until all submitted work
+// (including work generated along the way) is complete
+// and the worker threads have exited
+void workScheduleBegin();
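+
+// A minimal usage sketch (mirroring Tests/workSchedule/testMain.c).
+// The names myWorkUnit and processOneUnit are hypothetical, standing
+// in for whatever type and handler the client program defines:
+//
+//   void processOneUnit( void* wu ) {
+//     myWorkUnit* w = (myWorkUnit*) wu;
+//     // ... do the work; optionally call workScheduleSubmit()
+//     //     here to generate follow-on work units ...
+//     free( w );
+//   }
+//
+//   int main() {
+//     workScheduleInit( 4, processOneUnit );   // e.g. 4 worker threads
+//     myWorkUnit* w = malloc( sizeof( myWorkUnit ) );
+//     // ... fill in the work unit ...
+//     workScheduleSubmit( (void*)w );
+//     workScheduleBegin();                     // blocks until all work is done
+//     return 0;
+//   }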
+
+
+#endif /* __WORK_SCHEDULE__ */
diff --git a/Robust/src/Tests/workSchedule/makefile b/Robust/src/Tests/workSchedule/makefile
new file mode 100644 (file)
index 0000000..f46df39
--- /dev/null
@@ -0,0 +1,21 @@
+QDIR=../../Runtime
+DEFS= -D "RUNMALLOC=malloc" -D "RUNFREE=free" -D "DEBUG_WORKSCH="
+
+all: a.out
+
+a.out: testMain.o workschedule.o queue.o
+       gcc testMain.o workschedule.o queue.o -lpthread
+
+workschedule.o: $(QDIR)/workschedule.h $(QDIR)/workschedule.c $(QDIR)/Queue.h
+       gcc -c -I$(QDIR) $(DEFS) $(QDIR)/workschedule.c -o workschedule.o
+
+queue.o: $(QDIR)/Queue.h $(QDIR)/Queue.c
+       gcc -c -I$(QDIR) $(DEFS) $(QDIR)/Queue.c -o queue.o
+
+testMain.o: testMain.c $(QDIR)/workschedule.h
+       gcc -c -I$(QDIR) $(DEFS) testMain.c -o testMain.o
+
+clean:
+       rm -f a.out
+       rm -f *.o
+       rm -f *~
diff --git a/Robust/src/Tests/workSchedule/testMain.c b/Robust/src/Tests/workSchedule/testMain.c
new file mode 100644 (file)
index 0000000..cbf32a0
--- /dev/null
@@ -0,0 +1,140 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+
+#include "workschedule.h"
+
+
+static int workLoad = 0;
+
+
+typedef struct wUnit_ {
+  int     n;
+  double* answer;
+} wUnit;
+
+
+// approximate sqrt(x) with the Babylonian (Newton) iteration
+double findSqrt( int x ) {
+  double a, b, p;
+  double e = 0.00001;
+  double k = (double) x;
+  int i;
+
+  // just do it a bunch of times
+  // to generate more work
+  for( i = 0; i < 100000; ++i ) {
+
+    a = k;
+    p = a*a;
+    while( p - k >= e ) {
+      b = (a + (k/a)) / 2.0;
+      a = b;
+      p = a*a;
+    }
+  }
+
+  return a;
+}
+
+
+void processWorkUnit( void* workUnit ) {
+  
+  wUnit* w = (wUnit*) workUnit;
+  wUnit* x = NULL;
+
+  // submit more work
+  if( w->n > 0 ) {
+    x         = malloc( sizeof( wUnit ) );
+    x->n      = w->n      - 1;
+    x->answer = w->answer - 1;
+    workScheduleSubmit( (void*)x );
+  }
+
+  // then start computing 
+  // and store answer
+  *(w->answer) = findSqrt( w->n );
+
+  // workunit no longer needed
+  free( w );
+}
+
+
+
+void usage() {
+  printf( "usage:\na.out <int workers> <int workLoad> <str singlethread/workschedule>\n" );
+}
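+
+// example invocations (the worker count 4 and workload 100
+// below are arbitrary illustrative values):
+//   ./a.out 1 100 singlethread
+//   ./a.out 4 100 workschedule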
+
+
+int main( int argc, char** argv ) {
+
+  struct timeval tstart;
+  struct timeval tfinish;
+  double         timediff;
+  double*        answers;
+  double         answerSummary;
+  int            i;
+
+  if( argc != 4 ) {
+    usage();
+    exit( 0 );
+  }
+
+  // initialize solution array outside of timing
+  workLoad = atoi( argv[2] );
+  answers = malloc( sizeof( double ) * workLoad );
+  for( i = 0; i < workLoad; ++i ) {
+    answers[i] = 0.0;
+  }
+
+
+  gettimeofday( &tstart, NULL );
+
+
+  if( strcmp( argv[3], "singlethread" ) == 0 ) {
+
+    for( i = 0; i < workLoad; ++i ) {
+      answers[i] = findSqrt( i );
+    }
+
+  } else if( strcmp( argv[3], "workschedule" ) == 0 ) {
+    // initialize the system
+    workScheduleInit( atoi( argv[1] ),   // num processors
+                     processWorkUnit ); // func for processing
+
+    // add a preliminary work unit
+    wUnit* w  = malloc( sizeof( wUnit ) );
+    w->n      = workLoad-1;
+    w->answer = &(answers[workLoad-1]);
+    workScheduleSubmit( (void*)w );
+
+    // start the work schedule; some work units will
+    // generate more work, and when it's all done
+    // the system returns to this point
+    workScheduleBegin();
+
+  } else {
+    usage();
+    exit( 0 );
+  }
+
+
+  gettimeofday( &tfinish, NULL );
+
+
+  // summarize answers outside of timing
+  answerSummary = 0.0;
+  for( i = 0; i < workLoad; ++i ) {
+    answerSummary += answers[i];
+  }
+
+
+  timediff = (double)(((tfinish.tv_sec  - tstart.tv_sec )*1000000)+
+                     ((tfinish.tv_usec - tstart.tv_usec)*1      ))
+    / 1000000.0;   
+
+  printf( "\n\nComputed summary %f in %f seconds.\n",
+         answerSummary,
+         timediff );
+
+  return 0;
+}