6 #include <sys/syscall.h>
9 #include "workschedule.h"
10 #include "mlp_runtime.h"
11 #include "psemaphore.h"
12 #include "coreprof/coreprof.h"
19 #include "rcr_runtime.h"
26 //////////////////////////////////////////////////
28 // for coordination with the garbage collector
30 //////////////////////////////////////////////////
// Locks/condition shared with the collector.  From the names and
// their use below: gclistlock guards the global thread list that
// the collector walks; gclock and gccond are initialized here but
// used by the GC code itself.
// NOTE(review): the exact locking protocol lives in the GC sources --
// confirm there before changing initialization order.
32 pthread_mutex_t gclock;
33 pthread_mutex_t gclistlock;
34 pthread_cond_t gccond;
// defined elsewhere in the runtime; only initialized in
// workScheduleInit() below
36 extern pthread_mutex_t queuelock;
38 // in garbage.h, listitem is a struct with a pointer
39 // to a stack, objects, etc. such that the garbage
40 // collector can find pointers for garbage collection
42 // this is a global list of listitem structs that the
43 // garbage collector uses to know about each thread
44 extern struct listitem* list;
46 // this is the local thread's item on the above list,
47 // it should be added to the global list before a thread
48 // starts doing work, and should be removed only when
49 // the thread is completely finished--in OoOJava/MLP the
50 // only thing hanging from this litem should be a single
51 // task record that the worker thread is executing, if any!
52 extern __thread struct listitem litem;
53 //////////////////////////////////////////////////
55 // end coordination with the garbage collector
57 //////////////////////////////////////////////////
// per-worker record handed to each spawned pthread; workerMain()
// reads at least the ->id field through this (struct body is
// larger than shown in this excerpt)
62 typedef struct workerData_t {
63 pthread_t workerThread;
// thread-local worker ID; doubles as the index into the global
// deque array and the seed for disjoint object-ID sequences
67 // a thread should know its worker id in any
69 __thread int myWorkerID;
71 // the original thread starts up the work scheduler
72 // and sleeps while it is running, it has no worker
73 // ID so use this to realize that
74 const int workerID_NOTAWORKER = 0xffffff0;
// numWorkSchedWorkers may be read concurrently (see the snapshot
// taken in workerMain's steal loop); realnumWorkSchedWorkers keeps
// the original count for the joins in workScheduleBegin()
78 volatile int numWorkSchedWorkers;
79 int realnumWorkSchedWorkers;
80 static WorkerData* workerDataArray;
81 static pthread_t* workerArray;
// the user-supplied task-execution callback, set in workScheduleInit()
83 static void(*workFunc)(void*);
85 // each thread can create objects but should assign
86 // globally-unique object ID's (oid) so have threads
87 // give out this as next id, then increment by number
88 // of threads to ensure disjoint oid sets
91 // global array of work-stealing deques, where
92 // each thread uses its ID as the index to its deque
// per-thread task-record queue for the RCR runtime
// (presumably -- declared in rcr_runtime.h; TODO confirm)
99 __thread struct trQueue * TRqueue=NULL;
104 // this is a read-by-all and write-by-one variable
105 // IT IS UNPROTECTED, BUT SAFE for all threads to
106 // read it (periodically, only when they can find no work)
107 // and only the worker that retires the main thread will
108 // write it to 1, at which time other workers will see
109 // that they should exit gracefully
// NOTE(review): volatile is not a synchronization primitive; the
// single-writer/flag-only usage described above is what makes this
// tolerable, but a C11 atomic_int would be the modern replacement
110 static volatile int mainTaskRetired = FALSE;
// Worker thread entry point.
//
// arg: a WorkerData* for this worker (provides the worker id).
//
// Registers the thread with the garbage collector, then loops:
// pop work from this worker's own deque bottom; if empty, try to
// steal from the top of other workers' deques (round-robin starting
// at the last successful victim, skipping self); if no work exists
// anywhere, cooperate with GC and check the global shutdown flag.
// Each obtained work unit is published to the GC via litem.seseCommon
// and executed through workFunc.  On shutdown the thread unlinks its
// litem from the global GC list and returns.
// NOTE: several declarations (workUnit, i, lastVictim) and closing
// braces are in lines elided from this excerpt.
115 void* workerMain( void* arg ) {
117 WorkerData* myData = (WorkerData*) arg;
118 deque* myDeque = &(deques[myData->id]);
119 int keepRunning = TRUE;
// publish our id thread-locally for workScheduleSubmit() et al.
124 myWorkerID = myData->id;
126 // ensure that object ID's start at 1 so that using
127 // oid with value 0 indicates an invalid object
128 oid = myData->id + 1;
130 // each thread has a single semaphore that a running
131 // task should hand off to children threads it is
133 psem_init( &runningSESEstallSem );
135 // the worker threads really have no context relevant to the
136 // user program, so build an empty garbage list struct to
137 // pass to the collector if collection occurs
138 struct garbagelist emptygarbagelist = { 0, NULL };
140 // Add this worker to the gc list
141 pthread_mutex_lock( &gclistlock );
148 pthread_mutex_unlock( &gclistlock );
151 // start timing events in this thread
155 // then continue to process work
156 while( keepRunning ) {
159 #ifdef CP_EVENTID_WORKSCHEDGRAB
160 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
// fast path: take from the bottom (LIFO end) of our own deque
166 workUnit = dqPopBottom( myDeque );
169 if( workUnit != DQ_POP_EMPTY ) {
173 // try to steal from another queue, starting
174 // with the last successful victim, don't check
// snapshot the worker count once so the loop bound and the
// victim wrap-around stay consistent even if the volatile
// numWorkSchedWorkers changes mid-scan
176 int mynumWorkSchedWorkers=numWorkSchedWorkers;
177 for( i = 0; i < mynumWorkSchedWorkers - 1; ++i ) {
// steal from the top (FIFO end) of the victim's deque
179 workUnit = dqPopTop( &(deques[lastVictim]) );
182 if( workUnit != DQ_POP_EMPTY ) {
// a steal can also be aborted by a concurrent pop;
// only a real (non-ABORT, non-EMPTY) unit counts
184 if( workUnit != DQ_POP_ABORT &&
185 workUnit != DQ_POP_EMPTY ) {
192 // choose next victim
193 lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
// never try to steal from ourselves
195 if( lastVictim == myWorkerID ) {
196 lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
199 // end steal attempts
202 // if we successfully stole work, break out of the
203 // while-not-have-work loop, otherwise we looked
204 // everywhere, so drop down to "I'm idle" code below
210 // if we drop down this far, we didn't find any work,
211 // so do a garbage collection, yield the processor,
212 // then check if the entire system is out of work
213 if( unlikely( needtocollect ) ) {
214 checkcollect( &emptygarbagelist );
// set by the worker that retires the main task; our signal
// to exit the outer keepRunning loop gracefully
219 if( mainTaskRetired ) {
224 } // end the while-not-have-work loop
228 #ifdef CP_EVENTID_WORKSCHEDGRAB
229 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
232 // when there is no work left we will pop out
233 // here, so only do work if any left
235 // let GC see current work
236 litem.seseCommon = (void*)workUnit;
// defensive check: a NULL unit here indicates a scheduler bug
239 if( workUnit == NULL ) {
240 printf( "About to execute a null work item\n" );
244 workFunc( workUnit );
252 // remove from GC list
// standard doubly-linked-list unlink of our litem, under the
// same lock the collector takes to walk the list
253 pthread_mutex_lock( &gclistlock );
255 if( litem.prev == NULL ) {
258 litem.prev->next = litem.next;
260 if( litem.next != NULL ) {
261 litem.next->prev = litem.prev;
263 pthread_mutex_unlock( &gclistlock );
// One-time scheduler setup, called by the original (main) thread.
//
// numProcessors: number of workers to run (the caller becomes worker 0).
// func:          callback invoked by workers for each work unit.
//
// Initializes the shared locks, sizes the oid-increment, allocates
// the deque and WorkerData arrays, then spawns workers 1..N-1 as
// joinable pthreads.  Two spawn paths exist (plain vs. CPU-affinity
// pinned); the selecting #if/#else directives are in lines elided
// from this excerpt.
270 void workScheduleInit( int numProcessors,
271 void(*func)(void*) ) {
275 // the original thread must call this now to
276 // protect memory allocation events coming
279 // the original thread is a worker
284 pthread_mutex_init( &queuelock, NULL );
286 pthread_mutex_init( &gclock, NULL );
287 pthread_mutex_init( &gclistlock, NULL );
288 pthread_cond_init ( &gccond, NULL );
291 numWorkSchedWorkers = numProcessors;
292 realnumWorkSchedWorkers=numProcessors;
293 oidIncrement=numProcessors;
// scan upward from some x (declared in an elided line); presumably
// adjusting oidIncrement so per-thread oid sequences stay disjoint
// -- TODO confirm against the elided loop body
297 for(;x<oidIncrement;x++) {
299 if (oidIncrement%x==0) {
// two alternate allocations: 2x deques (elided #ifdef branch,
// presumably for the RCR build) vs. one deque per worker
312 deques = RUNMALLOC( sizeof( deque )*numWorkSchedWorkers*2);
314 deques = RUNMALLOC( sizeof( deque )*numWorkSchedWorkers );
316 workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
// matching alternate loop bounds for the two allocations above
319 for( i = 0; i < numWorkSchedWorkers*2; ++i ) {
321 for( i = 0; i < numWorkSchedWorkers; ++i ) {
323 dqInit( &(deques[i]) );
// workers must be joinable: workScheduleBegin() pthread_join()s them
328 pthread_attr_init( &attr );
329 pthread_attr_setdetachstate( &attr,
330 PTHREAD_CREATE_JOINABLE );
// worker 0 is the calling thread itself -- no pthread_create for it
332 workerDataArray[0].id = 0;
334 for( i = 1; i < numWorkSchedWorkers; ++i ) {
336 workerDataArray[i].id = i;
338 status = pthread_create( &(workerDataArray[i].workerThread),
341 (void*) &(workerDataArray[i])
344 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
// --- affinity-pinned spawn path (alternate build) ---
349 pthread_attr_t thread_attr[numWorkSchedWorkers];
352 workerDataArray[0].id = 0;
// pin the calling thread (worker 0) to the cpuset built in elided lines
355 sched_setaffinity(syscall(SYS_gettid), sizeof(cpuset), &cpuset);
357 for(idx=1;idx<numWorkSchedWorkers;idx++){
// round-robin workers across the numCore physical cores
358 int coreidx=idx%numCore;
359 pthread_attr_t* attr = &thread_attr[idx];
360 pthread_attr_init(attr);
361 pthread_attr_setdetachstate(attr, PTHREAD_CREATE_JOINABLE);
363 CPU_SET(coreidx, &cpuset);
364 pthread_attr_setaffinity_np(attr, sizeof(cpuset), &cpuset);
366 workerDataArray[idx].id = idx;
368 status = pthread_create( &(workerDataArray[idx].workerThread),
371 (void*) &(workerDataArray[idx])
// NOTE(review): debug print lacks a trailing newline
373 printf("assign %d on %d",idx,coreidx);
// Submit one unit of work: push onto the calling worker's own deque
// bottom, so submissions stay thread-local and contention happens
// only when other workers steal from the top end.  Wrapped in
// optional coreprof begin/end events (the #endif lines are elided
// from this excerpt).
380 void workScheduleSubmit( void* workUnit ) {
381 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
382 CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_BEGIN );
384 dqPushBottom( &(deques[myWorkerID]), workUnit );
385 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
386 CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_END );
391 // really should be named "wait for work in system to complete"
// Called by the original thread after workScheduleInit(): it runs
// workerMain() in-place as worker 0 (returning only when the main
// task has retired), then joins the remaining worker threads.
392 void workScheduleBegin() {
395 // original thread becomes a worker
396 workerMain( (void*) &(workerDataArray[0]) );
398 // then wait for all other workers to exit gracefully
// use realnumWorkSchedWorkers (the count fixed at init) rather
// than the volatile numWorkSchedWorkers for the join loop bound
399 for( i = 1; i < realnumWorkSchedWorkers; ++i ) {
400 pthread_join( workerDataArray[i].workerThread, NULL );
403 // write all thread's events to disk
408 // only the worker that executes and then retires
409 // the main task should invoke this, which indicates to
410 // all other workers they should exit gracefully
411 void workScheduleExit() {