#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <sched.h>

#include "mem.h"          // assumed source of RUNMALLOC/RUNFREE
#include "workschedule.h"
#include "mlp_runtime.h"
// NOTE: Converting this from a work-stealing strategy
// to a single-queue thread pool protected by a single
// lock. This will not scale, but it will support
// development of the system for now.
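
// For orientation, a minimal sketch of how this API is meant to be
// driven (processOneWorkUnit and someWorkUnit are hypothetical
// stand-ins, not part of this runtime):
//
//   static void processOneWorkUnit( void* workUnit ) {
//     /* user-supplied handler for a single work unit */
//   }
//
//   workScheduleInit  ( 4, processOneWorkUnit ); // 4 processors
//   workScheduleSubmit( someWorkUnit );          // enqueue one unit
//   workScheduleBegin ();                        // main thread becomes a
//                                                // worker, then joins all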
typedef struct Queue deq;
#if 0 // work-stealing version, kept for reference during the conversion

// each worker needs the following
typedef struct workerData_t {
  pthread_t       workerThread;
  pthread_mutex_t dequeLock;
  deq*            dequeWorkUnits;  // restored from later uses in this file
  int             nextWorkerToLoad;
} workerData;

#endif

// single-queue version: per-worker data
typedef struct workerData_t {
  pthread_t workerThread;
  int       id;                    // restored from later uses in this file
} WorkerData;
static pthread_mutex_t systemLockIn;
static pthread_mutex_t systemLockOut;

// just one queue for everyone
//static pthread_mutex_t dequeLock;

// the global work queue: a singly linked list, dequeued at
// headqi and enqueued at tailqi (declaration assumed local
// to this file; adjust if struct QI lives in a header)
struct QI {
  struct QI* next;
  void*      value;
};
static struct QI* headqi;
static struct QI* tailqi;

// implementation internal data
static WorkerData* workerDataArray;
static pthread_t*  workerArray;
static int         numWorkers;
static int         systemStarted = 0;
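
// The queue above follows the classic two-lock scheme: headqi always
// points at a dummy node, so "empty" is exactly headqi->next == NULL,
// and submitters (tailqi, systemLockIn) never contend with dequeuers
// (headqi, systemLockOut). A sketch of the states (values hypothetical):
//
//   after init:        headqi -> [dummy] <- tailqi
//   after submit(A):   headqi -> [dummy] -> [A] <- tailqi
//   after one dequeue: old dummy freed; headqi -> [A] <- tailqi,
//                      A's value was handed to a worker and the
//                      A node now serves as the dummy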
//static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
//static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
//static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
static void (*workFunc)(void*);

static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t gclock;
pthread_mutex_t gclistlock;
pthread_cond_t  gccond;

extern struct listitem* list;
extern __thread struct listitem litem;
extern __thread SESEcommon* seseCommon;
#if 0 // work-stealing versions of the entry points, kept for reference

int threadID2workerIndex( pthread_t id ) {
  int i;
  for( i = 0; i < numWorkers; ++i ) {
    // pthread_equal is the portable way to compare thread IDs
    if( pthread_equal( workerDataArray[i].workerThread, id ) ) {
      return i;
    }
  }
  // if we didn't find it, we are an outside
  // thread and should pick an arbitrary worker
  return 0;
}

// the worker thread main func, which takes a func
// from the user for processing any one work unit;
// workers use it to process work units and steal
// them from one another
void* workerMain( void* arg ) {
  workerData* myData = (workerData*) arg;
  void* workUnit;
  int i, j;

  // all workers wait until system is ready
  pthread_mutex_lock  ( &systemBeginLock );
  pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
  pthread_mutex_unlock( &systemBeginLock );

  while( 1 ) {
    workUnit = NULL;

    pthread_mutex_lock( &(myData->dequeLock) );

    if( isEmpty( myData->dequeWorkUnits ) ) {

      // my deque is empty, try to steal
      pthread_mutex_unlock( &(myData->dequeLock) );

      j = myData->nextWorkerToLoad;

      // look at everyone's queue at least twice
      for( i = 0; i < 2*numWorkers; ++i ) {
        if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }

        ++j; if( j == numWorkers ) { j = 0; }

        pthread_mutex_lock( &(workerDataArray[j].dequeLock) );

        if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
          pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
          // no work here, yield and then keep looking
          if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
          continue;
        }

        // found some work in another deque, steal it
        workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
        pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
        break;
      }

      // didn't find any work, even in my own deque,
      // after checking everyone twice? Exit thread
      if( workUnit == NULL ) {
        break;
      }

    } else {
      // have work in my own deque, take it out from the front
      workUnit = getItem( myData->dequeWorkUnits );
      pthread_mutex_unlock( &(myData->dequeLock) );
    }

    // wherever the work came from, process it
    workFunc( workUnit );

    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
  }

  // cast for the format string: pthread_t is not portably an int
  printf( "Worker %lu exiting.\n", (unsigned long) myData->workerThread );
  return NULL;
}

#endif
void* workerMain( void* arg ) {
  void*       workUnit;
  WorkerData* myData = (WorkerData*) arg;

  // make sure init mlp once-per-thread stuff
  //pthread_once( &mlpOnceObj, mlpInitOncePerThread );

  // all workers wait until system is ready,
  // then continue to process work
  while( 1 ) {

    // superseded single-lock variant of the dequeue:
    //if( pthread_mutex_trylock( &systemLock ) == 0 ) {
    //  if( isEmpty( dequeWorkUnits ) ) {
    //    pthread_mutex_unlock( &systemLock );
    //    continue;
    //  }
    //  workUnit = getItem( dequeWorkUnits );
    //  pthread_mutex_unlock( &systemLock );
    //}

    // take the next work unit from the head of the global
    // queue, guarded by the output-side lock
    pthread_mutex_lock( &systemLockOut );
    if( headqi->next == NULL ) {
      // queue is empty; release the lock and retry
      pthread_mutex_unlock( &systemLockOut );
      continue;
    }
    struct QI* tmp = headqi;
    headqi   = headqi->next;
    workUnit = headqi->value;
    pthread_mutex_unlock( &systemLockOut );

    // free the consumed node; RUNFREE is assumed to pair
    // with RUNMALLOC, otherwise these nodes leak
    RUNFREE( tmp );

    // yield processor before moving on, just to exercise
    // the system's out-of-order correctness
    //if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }

    // register the current work unit on the gc list
    pthread_mutex_lock( &gclistlock );
    litem.seseCommon = (void*) workUnit;
    litem.prev = NULL;
    litem.next = list;
    if( list != NULL ) { list->prev = &litem; }
    list = &litem;
    seseCommon = (SESEcommon*) workUnit;
    pthread_mutex_unlock( &gclistlock );

    workFunc( workUnit );

    // unlink this thread's entry from the gc list
    pthread_mutex_lock( &gclistlock );
    if( litem.prev == NULL ) {
      // we are the head of the list
      list = litem.next;
    } else {
      litem.prev->next = litem.next;
    }
    if( litem.next != NULL ) {
      litem.next->prev = litem.prev;
    }
    pthread_mutex_unlock( &gclistlock );
  }
}
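
// workerMain above busy-waits when the queue is empty (lock, test,
// unlock, retry). The unused workAvailCond declared near the top
// suggests a blocking design; a hedged sketch of that variant
// (not wired in anywhere):
//
//   // worker side, replacing the empty-queue retry:
//   pthread_mutex_lock( &systemLockOut );
//   while( headqi->next == NULL ) {
//     pthread_cond_wait( &workAvailCond, &systemLockOut );
//   }
//   /* ...dequeue as above... */
//
//   // submitter side, after publishing the new tail:
//   pthread_cond_signal( &workAvailCond );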
#if 0 // work-stealing version of workScheduleInit, kept for reference

void workScheduleInit( int numProcessors,
                       void(*func)(void*) ) {
  int i, status;

  numWorkers = numProcessors;
  workFunc   = func;

  // allocate space for worker data
  workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );

  for( i = 0; i < numWorkers; ++i ) {

    workerDataArray[i].dequeWorkUnits = createQueue();

    // set the next worker to add work to as itself
    workerDataArray[i].nextWorkerToLoad = i;

    status = pthread_mutex_init( &(workerDataArray[i].dequeLock),
                                 NULL );
    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
  }

  // only create the actual pthreads after all workers
  // have data that is protected with initialized locks
  for( i = 0; i < numWorkers; ++i ) {
    status = pthread_create( &(workerDataArray[i].workerThread),
                             NULL,
                             workerMain,
                             (void*) &(workerDataArray[i]) );
    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
  }

  // yield and let all workers get to the begin
  // condition variable, waiting--we have to hold them
  // so they don't all see empty work queues right away
  if( sched_yield() == -1 ) {
    printf( "Error thread trying to yield.\n" );
    exit( -1 );
  }
}

#endif
void workScheduleInit( int numProcessors,
                       void(*func)(void*) ) {
  int i, status;

  pthread_mutex_init( &gclock,     NULL );
  pthread_mutex_init( &gclistlock, NULL );
  pthread_cond_init ( &gccond,     NULL );

  //numWorkers = numProcessors*5;
  numWorkers = numProcessors + 1;
  workFunc   = func;

  // start the global queue with a dummy node, so that
  // headqi->next == NULL means "empty"
  headqi = tailqi = RUNMALLOC( sizeof( struct QI ) );
  headqi->next = NULL;

  status = pthread_mutex_init( &systemLockIn,  NULL );
  status = pthread_mutex_init( &systemLockOut, NULL );

  //workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
  workerDataArray = RUNMALLOC( sizeof( WorkerData ) * numWorkers );

  for( i = 0; i < numWorkers; ++i ) {
    workerDataArray[i].id = i + 2;
    status = pthread_create( &(workerDataArray[i].workerThread),
                             NULL,
                             workerMain,
                             (void*) &(workerDataArray[i]) );
    //status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
  }

  // yield so the new workers get a chance to start
  // before any work is submitted
  if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
}
#if 0 // work-stealing version of workScheduleSubmit, kept for reference

void workScheduleSubmit( void* workUnit ) {

  // query who is submitting and find out who they are scheduled to load
  int submitterIndex = threadID2workerIndex( pthread_self() );
  int workerIndex    = workerDataArray[submitterIndex].nextWorkerToLoad;

  // choose a new index round-robin and save it
  ++workerIndex;
  if( workerIndex == numWorkers ) {
    workerIndex = 0;
  }
  workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;

  // load the chosen worker
  pthread_mutex_lock  ( &(workerDataArray[workerIndex].dequeLock) );
  addNewItemBack      ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
  pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
}

#endif
void workScheduleSubmit( void* workUnit ) {

  // superseded single-lock variant of the enqueue:
  //if( pthread_mutex_trylock( &systemLock ) == 0 ) {
  //  addNewItemBack( dequeWorkUnits, workUnit );
  //  pthread_mutex_unlock( &systemLock );
  //  return;
  //}

  struct QI* item = RUNMALLOC( sizeof( struct QI ) );
  item->value = workUnit;
  item->next  = NULL;

  // publish the new node at the tail, under the input-side lock
  pthread_mutex_lock  ( &systemLockIn );
  tailqi->next = item;
  tailqi       = item;
  pthread_mutex_unlock( &systemLockIn );
}
// really should be named "wait until work is finished"
void workScheduleBegin() {
  int i;

  // have the main thread become a worker, too; id 1 is
  // inferred from the worker ids starting at 2 above
  WorkerData* workerData = RUNMALLOC( sizeof( WorkerData ) );
  workerData->id = 1;
  workerMain( workerData );

  // wait for all the workers to finish
  for( i = 0; i < numWorkers; ++i ) {
    //pthread_join( workerArray[i], NULL );
    pthread_join( workerDataArray[i].workerThread, NULL );
  }
}
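
// Note: with the infinite loop in the single-queue workerMain, these
// joins never return; the old work-stealing version exited a worker
// once it found no work anywhere. A hedged sketch of a shutdown hook
// (schedulerDone is hypothetical, not part of this runtime):
//
//   static volatile int schedulerDone = 0;  // set when all work is issued
//
//   // in workerMain's empty-queue branch:
//   if( schedulerDone ) {
//     pthread_mutex_unlock( &systemLockOut );
//     return NULL;
//   }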