#include "workschedule.h"
+
+// NOTE: Converting this from a work-stealing strategy
+// to a single-queue thread pool protected by a single
+// lock. This will not scale, but it will support
+// development of the system for now
+
+
+
// for convenience
typedef struct Queue deq;
+/*
// each worker needs the following
typedef struct workerData_t {
pthread_t workerThread;
deq* dequeWorkUnits;
int nextWorkerToLoad;
} workerData;
+*/
+// just one queue for everyone
+static pthread_mutex_t dequeLock;
+static deq* dequeWorkUnits;
+
// implementation internal data
static int numWorkers;
-static workerData* workerDataArray;
+//static workerData* workerDataArray;
+static pthread_t* workerArray;
static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
static void(*workFunc)(void*);
+/*
// helper func
int threadID2workerIndex( pthread_t id ) {
int i;
// thread and should pick arbitrary worker
return 0;
}
+*/
-
+/*
// the worker thread main func, which takes a func
// from user for processing any one work unit, then
// workers use it to process work units and steal
return NULL;
}
+*/
+
+
+void* workerMain( void* arg ) {
+
+ void* workUnit;
+
+ int tries = 3;
+
+ // all workers wait until system is ready
+ pthread_mutex_lock ( &systemBeginLock );
+ pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
+ pthread_mutex_unlock( &systemBeginLock );
+
+ while( tries > 0 ) {
+
+ pthread_mutex_lock( &dequeLock );
+
+ // look in the queue for work
+ if( !isEmpty( dequeWorkUnits ) ) {
+ workUnit = getItem( dequeWorkUnits );
+ } else {
+ workUnit = NULL;
+ }
+
+ pthread_mutex_unlock( &dequeLock );
+
+ // yield processor before moving on
+ if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+
+ if( workUnit != NULL ) {
+ workFunc( workUnit );
+ tries = 3;
+ } else {
+ --tries;
+ }
+ }
+
+ return NULL;
+}
+/*
void workScheduleInit( int numProcessors,
void(*func)(void*) ) {
int i, status;
// allocate space for worker data
workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
+
for( i = 0; i < numWorkers; ++i ) {
// the deque
exit( -1 );
}
}
+*/
+
+
+void workScheduleInit( int numProcessors,
+ void(*func)(void*) ) {
+ int i, status;
+
+ numWorkers = numProcessors;
+ workFunc = func;
+
+ dequeWorkUnits = createQueue();
+
+ status = pthread_mutex_init( &dequeLock, NULL );
+ if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+
+ workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
+
+ for( i = 0; i < numWorkers; ++i ) {
+ status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
+ if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+
+ // yield and let all workers get to the beginx3
+ // condition variable, waiting--we have to hold them
+ // so they don't all see empty work queues right away
+ if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+ }
+}
+/*
void workScheduleSubmit( void* workUnit ) {
// query who is submitting and find out who they are scheduled to load
addNewItemBack ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
}
+*/
+
+void workScheduleSubmit( void* workUnit ) {
+ pthread_mutex_lock ( &dequeLock );
+ addNewItemBack ( dequeWorkUnits, workUnit );
+ pthread_mutex_unlock( &dequeLock );
+}
// really should be named "wait until work is finished"
pthread_cond_broadcast( &systemBeginCond );
pthread_mutex_unlock ( &systemBeginLock );
- // wait until workers inform that all work is complete
- /*
- pthread_mutex_lock ( &systemReturnLock );
- pthread_cond_wait ( &systemReturnCond, &systemReturnLock );
- pthread_mutex_unlock( &systemReturnLock );
- */
-
for( i = 0; i < numWorkers; ++i ) {
- pthread_join( workerDataArray[i].workerThread, NULL );
+ pthread_join( workerArray[i], NULL );
}
}