simplified work schedule, will implement work-stealing later
authorjjenista <jjenista>
Mon, 27 Jul 2009 17:32:59 +0000 (17:32 +0000)
committerjjenista <jjenista>
Mon, 27 Jul 2009 17:32:59 +0000 (17:32 +0000)
Robust/src/Runtime/workschedule.c

index 0f9e49bbc32f9fda4066920ed534870012b62d78..01aeab3784750207982a2559df7bcff5619f357c 100644 (file)
@@ -7,10 +7,19 @@
 #include "workschedule.h"
 
 
+
+// NOTE: Converting this from a work-stealing strategy
+// to a single-queue thread pool protected by a single
+// lock.  This will not scale, but it will support
+// development of the system for now
+
+
+
 // for convenience
 typedef struct Queue deq;
 
 
+/*
 // each worker needs the following
 typedef struct workerData_t {
   pthread_t       workerThread;
@@ -18,11 +27,17 @@ typedef struct workerData_t {
   deq*            dequeWorkUnits;
   int             nextWorkerToLoad;
 } workerData;
+*/
+// just one queue for everyone
+static pthread_mutex_t dequeLock;
+static deq*            dequeWorkUnits;
+
 
 
 // implementation internal data
 static int             numWorkers;
-static workerData*     workerDataArray;
+//static workerData*     workerDataArray;
+static pthread_t*      workerArray;
 static pthread_mutex_t systemBeginLock  = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
 static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
@@ -30,6 +45,7 @@ static pthread_cond_t  systemReturnCond = PTHREAD_COND_INITIALIZER;
 static void(*workFunc)(void*);
 
 
+/*
 // helper func
 int threadID2workerIndex( pthread_t id ) {
   int i;
@@ -42,9 +58,10 @@ int threadID2workerIndex( pthread_t id ) {
   // thread and should pick arbitrary worker
   return 0;
 }
+*/
 
 
-
+/*
 // the worker thread main func, which takes a func
 // from user for processing any one work unit, then
 // workers use it to process work units and steal
@@ -120,8 +137,49 @@ void* workerMain( void* arg ) {
 
   return NULL;
 }
+*/
+
+
+void* workerMain( void* arg ) {
+  
+  void* workUnit;
+
+  int tries = 3;
+
+  // all workers wait until system is ready
+  pthread_mutex_lock  ( &systemBeginLock );
+  pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
+  pthread_mutex_unlock( &systemBeginLock );
+
+  while( tries > 0 ) {
+
+    pthread_mutex_lock( &dequeLock );
+
+    // look in the queue for work
+    if( !isEmpty( dequeWorkUnits ) ) {
+      workUnit = getItem( dequeWorkUnits );
+    } else {
+      workUnit = NULL;
+    }
+
+    pthread_mutex_unlock( &dequeLock );
+
+    // yield processor before moving on
+    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+
+    if( workUnit != NULL ) {   
+      workFunc( workUnit );
+      tries = 3;
+    } else {
+      --tries;
+    }
+  }
+
+  return NULL;
+}
 
 
+/*
 void workScheduleInit( int numProcessors,
                        void(*func)(void*) ) {
   int i, status;
@@ -131,6 +189,7 @@ void workScheduleInit( int numProcessors,
 
   // allocate space for worker data
   workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
+
   for( i = 0; i < numWorkers; ++i ) {    
 
     // the deque
@@ -165,8 +224,36 @@ void workScheduleInit( int numProcessors,
     exit( -1 );
   }
 }
+*/
+
+
+void workScheduleInit( int numProcessors,
+                       void(*func)(void*) ) {
+  int i, status;
+
+  numWorkers = numProcessors;
+  workFunc   = func;
+
+  dequeWorkUnits = createQueue();
+
+  status = pthread_mutex_init( &dequeLock, NULL );
+  if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+
+  workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
+
+  for( i = 0; i < numWorkers; ++i ) {    
+    status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
+    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+
+    // yield and let all workers get to the beginx3
+    // condition variable, waiting--we have to hold them
+    // so they don't all see empty work queues right away
+    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+  }
+}
 
 
+/*
 void workScheduleSubmit( void* workUnit ) {
 
   // query who is submitting and find out who they are scheduled to load
@@ -185,6 +272,13 @@ void workScheduleSubmit( void* workUnit ) {
   addNewItemBack      (   workerDataArray[workerIndex].dequeWorkUnits, workUnit );
   pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
 }
+*/
+
+void workScheduleSubmit( void* workUnit ) {
+  pthread_mutex_lock  ( &dequeLock );
+  addNewItemBack      ( dequeWorkUnits, workUnit );
+  pthread_mutex_unlock( &dequeLock );
+}
 
 
 // really should be named "wait until work is finished"
@@ -197,14 +291,7 @@ void workScheduleBegin() {
   pthread_cond_broadcast( &systemBeginCond );
   pthread_mutex_unlock  ( &systemBeginLock );  
 
-  // wait until workers inform that all work is complete
-  /*
-  pthread_mutex_lock  ( &systemReturnLock );
-  pthread_cond_wait   ( &systemReturnCond, &systemReturnLock );
-  pthread_mutex_unlock( &systemReturnLock );
-  */
-
   for( i = 0; i < numWorkers; ++i ) {
-    pthread_join( workerDataArray[i].workerThread, NULL );
+    pthread_join( workerArray[i], NULL );
   }
 }