6 #include <sys/syscall.h>
9 #include "workschedule.h"
10 #include "mlp_runtime.h"
11 #include "psemaphore.h"
12 #include "coreprof/coreprof.h"
19 #include "rcr_runtime.h"
26 //////////////////////////////////////////////////
28 // for coordination with the garbage collector
30 //////////////////////////////////////////////////
// NOTE(review): exact roles inferred from usage below — confirm against garbage.h:
//   gclock     — presumably serializes collections themselves
//   gclistlock — protects the global 'list' of per-thread root items
//                (taken in workerMain around list insert/remove)
//   gccond     — presumably used to park/wake threads around a collection
pthread_mutex_t gclock;
33 pthread_mutex_t gclistlock;
34 pthread_cond_t gccond;
// defined elsewhere; initialized in workScheduleInit below
extern pthread_mutex_t queuelock;
38 // in garbage.h, listitem is a struct with a pointer
39 // to a stack, objects, etc. such that the garbage
40 // collector can find pointers for garbage collection
42 // this is a global list of listitem structs that the
43 // garbage collector uses to know about each thread
44 extern struct listitem* list;
46 // this is the local thread's item on the above list,
47 // it should be added to the global list before a thread
48 // starts doing work, and should be removed only when
49 // the thread is completely finished--in OoOJava/MLP the
50 // only thing hanging from this litem should be a single
51 // task record that the worker thread is executing, if any!
52 extern __thread struct listitem litem;
53 //////////////////////////////////////////////////
55 // end coordination with the garbage collector
57 //////////////////////////////////////////////////
// per-worker bookkeeping handed to each thread via pthread_create
typedef struct workerData_t {
63 pthread_t workerThread;
// NOTE(review): rest of workerData_t (at least the 'id' field used
// throughout this file) is not visible in this chunk — see full source.
67 // a thread should know its worker id in any
69 __thread int myWorkerID;
71 // the original thread starts up the work scheduler
72 // and sleeps while it is running, it has no worker
73 // ID so use this to realize that
74 const int workerID_NOTAWORKER = 0xffffff0;
// worker count visible to the steal loop (volatile: read without a lock);
// realnumWorkSchedWorkers keeps the original count for pthread_join later
volatile int numWorkSchedWorkers;
79 int realnumWorkSchedWorkers;
80 static WorkerData* workerDataArray;
81 static pthread_t* workerArray;
// the user task-dispatch function registered via workScheduleInit
static void (*workFunc)(void*);
85 // each thread can create objects but should assign
86 // globally-unique object ID's (oid) so have threads
87 // give out this as next id, then increment by number
88 // of threads to ensure disjoint oid sets
91 // global array of work-stealing deques, where
92 // each thread uses its ID as the index to its deque
99 __thread struct trQueue * TRqueue=NULL;
104 // this is a read-by-all and write-by-one variable
105 // IT IS UNPROTECTED, BUT SAFE for all threads to
106 // read it (periodically, only when they can find no work)
107 // and only the worker that retires the main thread will
108 // write it to 1, at which time other workers will see
109 // that they should exit gracefully
110 static volatile int mainTaskRetired = FALSE;
// Worker thread entry point (also run directly by the original thread as
// worker 0 from workScheduleBegin).  Loop: pop work from own deque, else
// try to steal from other workers' deques; when idle, cooperate with the
// collector and watch mainTaskRetired for graceful shutdown.  Registers
// itself on the GC root list on entry and removes itself on exit.
// arg is a WorkerData* identifying this worker.
void* workerMain(void* arg) {
117 WorkerData* myData = (WorkerData*) arg;
118 deque* myDeque = &(deques[myData->id]);
119 int keepRunning = TRUE;
124 myWorkerID = myData->id;
126 // ensure that object ID's start at 1 so that using
127 // oid with value 0 indicates an invalid object
128 oid = myData->id + 1;
130 // each thread has a single semaphore that a running
131 // task should hand off to children threads it is
133 psem_init(&runningSESEstallSem);
135 // the worker threads really have no context relevant to the
136 // user program, so build an empty garbage list struct to
137 // pass to the collector if collection occurs
138 struct garbagelist emptygarbagelist = { 0, NULL };
140 // Add this worker to the gc list
141 pthread_mutex_lock(&gclistlock);
148 pthread_mutex_unlock(&gclistlock);
151 // start timing events in this thread
155 // then continue to process work
156 while( keepRunning ) {
159 #ifdef CP_EVENTID_WORKSCHEDGRAB
160 CP_LOGEVENT(CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN);
// first try the cheap case: pop from the bottom of our own deque
workUnit = dqPopBottom(myDeque);
169 if( workUnit != DQ_POP_EMPTY ) {
173 // try to steal from another queue, starting
174 // with the last successful victim, don't check
// snapshot the (volatile) worker count once for this steal round
int mynumWorkSchedWorkers=numWorkSchedWorkers;
177 for( i = 0; i < mynumWorkSchedWorkers - 1; ++i ) {
179 workUnit = dqPopTop(&(deques[lastVictim]) );
182 if( workUnit != DQ_POP_EMPTY ) {
184 if( workUnit != DQ_POP_ABORT &&
185 workUnit != DQ_POP_EMPTY ) {
192 // choose next victim
// advance round-robin, wrapping at the worker count and
// skipping our own deque (we already popped it above)
lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) {
197 if( lastVictim == myWorkerID ) {
198 lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) {
203 // end steal attempts
206 // if we successfully stole work, break out of the
207 // while-not-have-work loop, otherwise we looked
208 // everywhere, so drop down to "I'm idle" code below
214 // if we drop down this far, we didn't find any work,
215 // so do a garbage collection, yield the processor,
216 // then check if the entire system is out of work
217 if( unlikely(needtocollect) ) {
218 checkcollect(&emptygarbagelist);
// the retiring worker set this flag; exit gracefully
if( mainTaskRetired ) {
228 } // end the while-not-have-work loop
232 #ifdef CP_EVENTID_WORKSCHEDGRAB
233 CP_LOGEVENT(CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END);
236 // when there is no work left we will pop out
237 // here, so only do work if any left
239 // let GC see current work
240 litem.seseCommon = (void*)workUnit;
243 if( workUnit == NULL ) {
244 printf("About to execute a null work item\n");
// clear the GC-visible task pointer now that the task is done
litem.seseCommon = NULL;
257 // remove from GC list
258 pthread_mutex_lock(&gclistlock);
// standard doubly-linked-list unlink of this thread's litem
if( litem.prev == NULL ) {
263 litem.prev->next = litem.next;
265 if( litem.next != NULL ) {
266 litem.next->prev = litem.prev;
268 pthread_mutex_unlock(&gclistlock);
// One-time scheduler setup, called by the original thread before any work
// is submitted.  Initializes the scheduler/GC locks, records the worker
// count, allocates the per-worker deque and WorkerData arrays, and spawns
// numProcessors-1 joinable worker threads (the original thread itself acts
// as worker 0 — see workScheduleBegin).
//   numProcessors: total workers, including the calling thread
//   func:          user task-dispatch function, saved for the workers
void workScheduleInit(int numProcessors,
276 void (*func)(void*) ) {
280 // the original thread must call this now to
281 // protect memory allocation events coming
284 // the original thread is a worker
289 pthread_mutex_init(&queuelock, NULL);
291 pthread_mutex_init(&gclock, NULL);
292 pthread_mutex_init(&gclistlock, NULL);
293 pthread_cond_init(&gccond, NULL);
296 numWorkSchedWorkers = numProcessors;
297 realnumWorkSchedWorkers=numProcessors;
298 oidIncrement=numProcessors;
// NOTE(review): this loop presumably adjusts oidIncrement so per-thread
// oid sequences stay disjoint — body is elided here, confirm in full file
for(; x<oidIncrement; x++) {
304 if (oidIncrement%x==0) {
// NOTE(review): the two allocations (and the two loops below) are
// alternative #ifdef branches in the full file — only one runs
deques = RUNMALLOC(sizeof( deque )*numWorkSchedWorkers*2);
319 deques = RUNMALLOC(sizeof( deque )*numWorkSchedWorkers);
321 workerDataArray = RUNMALLOC(sizeof( WorkerData )*numWorkSchedWorkers);
324 for( i = 0; i < numWorkSchedWorkers*2; ++i ) {
326 for( i = 0; i < numWorkSchedWorkers; ++i ) {
328 dqInit(&(deques[i]) );
// workers are created joinable so workScheduleBegin can pthread_join them
pthread_attr_init(&attr);
334 pthread_attr_setdetachstate(&attr,
335 PTHREAD_CREATE_JOINABLE);
// slot 0 belongs to the calling thread; threads start at index 1
workerDataArray[0].id = 0;
339 for( i = 1; i < numWorkSchedWorkers; ++i ) {
341 workerDataArray[i].id = i;
343 status = pthread_create(&(workerDataArray[i].workerThread),
346 (void*) &(workerDataArray[i])
350 printf("Error\n"); exit(-1);
// alternative (#ifdef, elided) path: pin each worker to a core with a
// per-thread attribute and pthread_attr_setaffinity_np
pthread_attr_t thread_attr[numWorkSchedWorkers];
359 workerDataArray[0].id = 0;
362 sched_setaffinity(syscall(SYS_gettid), sizeof(cpuset), &cpuset);
364 for(idx=1; idx<numWorkSchedWorkers; idx++) {
365 int coreidx=idx%numCore;
366 pthread_attr_t* attr = &thread_attr[idx];
367 pthread_attr_init(attr);
368 pthread_attr_setdetachstate(attr, PTHREAD_CREATE_JOINABLE);
370 CPU_SET(coreidx, &cpuset);
371 pthread_attr_setaffinity_np(attr, sizeof(cpuset), &cpuset);
373 workerDataArray[idx].id = idx;
375 status = pthread_create(&(workerDataArray[idx].workerThread),
378 (void*) &(workerDataArray[idx])
// Submit a task record for execution: push it onto the CALLING thread's
// own deque (indexed by myWorkerID), so other workers obtain it only by
// stealing.  Optionally bracketed by coreprof submit events.
void workScheduleSubmit(void* workUnit) {
387 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
388 CP_LOGEVENT(CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_BEGIN);
390 dqPushBottom(&(deques[myWorkerID]), workUnit);
391 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
392 CP_LOGEVENT(CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_END);
// really should be named "wait for work in system to complete":
// the original thread runs workerMain as worker 0 (so this call blocks
// until shutdown), then joins the remaining workers before returning.
void workScheduleBegin() {
401 // original thread becomes a worker
402 workerMain( (void*) &(workerDataArray[0]) );
404 // then wait for all other workers to exit gracefully
// use realnumWorkSchedWorkers: the original spawn count, in case
// numWorkSchedWorkers was changed while running
for( i = 1; i < realnumWorkSchedWorkers; ++i ) {
406 pthread_join(workerDataArray[i].workerThread, NULL);
409 // write all thread's events to disk
414 // only the worker that executes and then retires
415 // the main task should invoke this, which indicates to
416 // all other workers they should exit gracefully
417 void workScheduleExit() {