7 #include "workschedule.h"
11 // NOTE: Converting this from a work-stealing strategy
12 // to a single-queue thread pool protected by a single
13 // lock. This will not scale, but it will support
14 // development of the system for now
19 typedef struct Queue deq; // shorthand alias for the project's Queue type
23 // each worker needs the following
// NOTE(review): per-worker state left over from the retired work-stealing
// design; the remainder of this struct (work deque, next-worker index,
// closing brace) is not visible in this chunk -- presumably compiled out
// along with the old workerMain below. Confirm against the full file.
24 typedef struct workerData_t {
25 pthread_t workerThread;
26 pthread_mutex_t dequeLock;
31 // just one queue for everyone
// Single shared work queue and the one lock that serializes all access to
// it (see the single-queue conversion note at the top of the file).
32 static pthread_mutex_t dequeLock;
33 static deq* dequeWorkUnits;
37 // implementation internal data
38 static int numWorkers; // number of worker threads created by workScheduleInit
39 //static workerData* workerDataArray;
40 static pthread_t* workerArray; // thread handles, one per worker
// Lock/condvar pair used by workScheduleBegin() to release all workers at once.
41 static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
42 static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
// NOTE(review): this "return" lock/condvar pair is never referenced in the
// code visible here -- possibly dead state; verify before removing.
43 static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
44 static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
// User-supplied callback that processes a single work unit.
45 static void(*workFunc)(void*);
// Map a pthread handle to its slot in workerDataArray by linear scan.
// Part of the retired work-stealing path (workerDataArray is commented out
// above) -- presumably disabled in the full file; confirm.
50 int threadID2workerIndex( pthread_t id ) {
52 for( i = 0; i < numWorkers; ++i ) {
// NOTE(review): POSIX does not guarantee pthread_t is an arithmetic type,
// so comparing with == is non-portable; pthread_equal() is the portable
// comparison -- verify against the target platforms.
53 if( workerDataArray[i].workerThread == id ) {
57 // if we didn't find it, we are an outside
58 // thread and should pick arbitrary worker
65 // the worker thread main func, which takes a func
66 // from user for processing any one work unit, then
67 // workers use it to process work units and steal
68 // them from one another
// NOTE(review): this is the OLD work-stealing worker loop; a second,
// single-queue workerMain is defined further down, so this copy is
// presumably compiled out (guard lines are not visible in this chunk).
69 void* workerMain( void* arg ) {
71 workerData* myData = (workerData*) arg;
78 // all workers wait until system is ready
// NOTE(review): pthread_cond_wait is called without a predicate loop.
// A spurious wakeup, or a broadcast that fires before this thread blocks,
// breaks the rendezvous -- POSIX requires waiting in a while(!predicate)
// loop. Confirm whether the full file guards this.
79 pthread_mutex_lock ( &systemBeginLock );
80 pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
81 pthread_mutex_unlock( &systemBeginLock );
// First look in our own deque, under our own lock.
86 pthread_mutex_lock( &(myData->dequeLock) );
88 if( isEmpty( myData->dequeWorkUnits ) ) {
90 // my deque is empty, try to steal
91 pthread_mutex_unlock( &(myData->dequeLock) );
// Start probing at the worker we would have loaded next.
94 j = myData->nextWorkerToLoad;
96 // look at everyone's queue at least twice
97 for( i = 0; i < numWorkers; ++i ) {
98 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// Advance j round-robin over all workers.
100 ++j; if( j == numWorkers ) { j = 0; }
102 pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
104 if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
105 pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
106 // no work here, yield and then keep looking
107 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
111 // found some work in another deque, steal it
// Steal from the BACK of the victim's deque (owner takes from the front).
112 workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
113 pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
117 // didn't find any work, even in my own deque,
118 // after checking everyone twice? Exit thread
119 if( workUnit == NULL ) {
124 // have work in own deque, take out from front
125 workUnit = getItem( myData->dequeWorkUnits );
126 pthread_mutex_unlock( &(myData->dequeLock) );
129 // wherever the work came from, process it
130 workFunc( workUnit );
132 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// NOTE(review): format mismatch -- %d with a pthread_t argument is
// undefined behavior (pthread_t is not guaranteed to be an int).
135 printf( "Worker %d exiting.\n", myData->workerThread );
// Single-queue worker loop (the live version): block until the system is
// released, then repeatedly pop one work unit from the shared queue under
// dequeLock and run the user's workFunc on it.
143 void* workerMain( void* arg ) {
149 // all workers wait until system is ready
// NOTE(review): pthread_cond_wait without a predicate loop -- a spurious
// wakeup, or a broadcast issued before this thread blocks, defeats the
// rendezvous; POSIX requires a while(!predicate) guard. Confirm whether
// the full file mitigates this beyond the sched_yield in workScheduleInit.
150 pthread_mutex_lock ( &systemBeginLock );
151 pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
152 pthread_mutex_unlock( &systemBeginLock );
// Pop one unit (if any) from the single shared queue, under the one lock.
156 pthread_mutex_lock( &dequeLock );
158 // look in the queue for work
159 if( !isEmpty( dequeWorkUnits ) ) {
160 workUnit = getItem( dequeWorkUnits );
165 pthread_mutex_unlock( &dequeLock );
167 // yield processor before moving on
168 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// Only invoke the user callback when we actually got a unit.
170 if( workUnit != NULL ) {
171 workFunc( workUnit );
// NOTE(review): OLD work-stealing initializer; a second, single-queue
// workScheduleInit appears below, so this copy is presumably compiled out
// (guard lines are not visible in this chunk).
183 void workScheduleInit( int numProcessors,
184 void(*func)(void*) ) {
// One worker per processor in the work-stealing design.
187 numWorkers = numProcessors;
190 // allocate space for worker data
191 workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
193 for( i = 0; i < numWorkers; ++i ) {
// Give each worker its own private deque.
196 workerDataArray[i].dequeWorkUnits = createQueue();
198 // set the next worker to add work to as itself
199 workerDataArray[i].nextWorkerToLoad = i;
202 status = pthread_mutex_init( &(workerDataArray[i].dequeLock),
205 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
208 // only create the actual pthreads after all workers
209 // have data that is protected with initialized locks
210 for( i = 0; i < numWorkers; ++i ) {
211 status = pthread_create( &(workerDataArray[i].workerThread),
214 (void*) &(workerDataArray[i])
216 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
219 // yield and let all workers get to the begin
220 // condition variable, waiting--we have to hold them
221 // so they don't all see empty work queues right away
// NOTE(review): sched_yield does not guarantee every worker reaches
// pthread_cond_wait before workScheduleBegin broadcasts -- see the
// missing-predicate note on workerMain.
222 if( sched_yield() == -1 ) {
223 printf( "Error thread trying to yield.\n" );
// Single-queue initializer (the live version): create the shared queue,
// its lock, and the worker threads, which immediately park on the begin
// condition variable until workScheduleBegin() is called.
230 void workScheduleInit( int numProcessors,
231 void(*func)(void*) ) {
// Deliberate 5x oversubscription of workers relative to processors.
// NOTE(review): the assignment of func into workFunc is not visible in
// this chunk -- presumably on a nearby missing line; confirm.
234 numWorkers = numProcessors*5;
237 dequeWorkUnits = createQueue();
239 status = pthread_mutex_init( &dequeLock, NULL );
240 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
242 workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
244 for( i = 0; i < numWorkers; ++i ) {
245 status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
246 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
248 // yield and let all workers get to the begin
249 // condition variable, waiting--we have to hold them
250 // so they don't all see empty work queues right away
// NOTE(review): a single sched_yield is not a guarantee that every worker
// has reached pthread_cond_wait before the begin broadcast fires.
251 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// NOTE(review): OLD work-stealing submit; the live single-queue version
// appears below, so this copy is presumably compiled out.
// Round-robin: each submitter remembers which worker it loaded last and
// pushes the next unit onto the following worker's deque.
257 void workScheduleSubmit( void* workUnit ) {
259 // query who is submitting and find out who they are scheduled to load
260 int submitterIndex = threadID2workerIndex( pthread_self() );
261 int workerIndex = workerDataArray[submitterIndex].nextWorkerToLoad;
263 // choose a new index and save it
// Wrap back to worker 0 after the last one (the increment itself is on a
// line not visible in this chunk).
265 if( workerIndex == numWorkers ) {
// NOTE(review): nextWorkerToLoad is read and written here without holding
// any lock protecting the submitter's slot -- racy if multiple threads
// share a submitter index; confirm intended usage.
268 workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
270 // load the chosen worker
271 pthread_mutex_lock ( &(workerDataArray[workerIndex].dequeLock) );
272 addNewItemBack ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
273 pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
// Single-queue submit (the live version): append one work unit to the
// back of the shared queue under the global lock. Workers pop from the
// front in workerMain, giving FIFO processing order.
277 void workScheduleSubmit( void* workUnit ) {
278 pthread_mutex_lock ( &dequeLock );
279 addNewItemBack ( dequeWorkUnits, workUnit );
280 pthread_mutex_unlock( &dequeLock );
284 // really should be named "wait until work is finished"
// Release all parked workers, then block until every worker thread exits.
285 void workScheduleBegin() {
289 // tell all workers to begin
// NOTE(review): a broadcast delivered before a worker reaches
// pthread_cond_wait is lost -- workerMain waits with no predicate flag,
// so any late-arriving worker would block forever. The code relies on the
// sched_yield in workScheduleInit for timing; see notes there.
290 pthread_mutex_lock ( &systemBeginLock );
291 pthread_cond_broadcast( &systemBeginCond );
292 pthread_mutex_unlock ( &systemBeginLock );
// Join every worker; returns only when all work has been processed and
// the workers have exited their loops.
294 for( i = 0; i < numWorkers; ++i ) {
295 pthread_join( workerArray[i], NULL );