6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "psemaphore.h"
9 #include "coreprof/coreprof.h"
11 #include "rcr_runtime.h"
15 // NOTE: Converting this from a work-stealing strategy
16 // to a single-queue thread pool protected by a single
17 // lock. This will not scale, but it will support
18 // development of the system for now
// NOTE(review): extraction artifact — every line in this file begins with a
// stray copy of its original line number, and gaps in that numbering mean
// interior source lines are missing. Code below is preserved byte-for-byte.
26 typedef struct Queue deq;
// Per-worker bookkeeping handed to each worker thread.
// NOTE(review): the closing "} WorkerData;" of this typedef (and any other
// fields, e.g. the worker id used below) fall in a numbering gap and are
// not visible in this chunk.
28 typedef struct workerData_t{
29 pthread_t workerThread;
// Single-queue thread pool locks: one for producers (submit side), one for
// consumers (grab side) — see the NOTE at the top of the file about this
// being a deliberately non-scalable interim design.
34 static pthread_mutex_t systemLockIn;
35 static pthread_mutex_t systemLockOut;
37 // implementation internal data
38 static WorkerData* workerDataArray;
39 static pthread_t* workerArray;
41 static int systemStarted = 0;
43 static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
// The task body every dequeued work unit is passed to (set in workScheduleInit).
44 static void(*workFunc)(void*);
46 static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER;
// Garbage-collector coordination state, shared with the collector
// (non-static: referenced from other translation units).
51 pthread_mutex_t gclock;
52 pthread_mutex_t gclistlock;
53 pthread_cond_t gccond;
// Doubly-linked list of threads visible to the GC; each worker links its
// thread-local litem into it (see workerMain).
55 extern struct listitem * list;
56 extern __thread struct listitem litem;
57 extern __thread SESEcommon* seseCommon;
// Per-thread task-record queue, lazily created in workerMain.
63 __thread struct trQueue * TRqueue=NULL;
// Worker-thread teardown hook, intended for use with the (currently
// commented-out) pthread_cleanup_push in workerMain.
// NOTE(review): truncated in this extraction — the closing brace and any
// real cleanup logic fall in a numbering gap and are not visible.
67 void workerExit( void* arg ) {
68 //printf( "Thread %d canceled.\n", pthread_self() );
// Worker-thread entry point: performs once-per-thread setup (semaphore,
// stall record, task-record queue, GC registration), then loops pulling
// work units off the shared queue and running them through workFunc.
// NOTE(review): many interior lines of this function fall in numbering
// gaps (the empty-queue wait path, the TRqueue creation body, loop and
// if/else closers, the final return) — comments below describe only what
// is visible.
72 void* workerMain( void* arg ) {
74 WorkerData* myData = (WorkerData*) arg;
78 // the worker threads really have no context relevant to the
79 // user program, so build an empty garbage list struct to
80 // pass to the collector if collection occurs
81 struct garbagelist emptygarbagelist = { 0, NULL };
83 // once-per-thread stuff
86 //pthread_cleanup_push( workerExit, NULL );
88 // ensure that object ID's start at 1 so that using
89 // oid with value 0 indicates an invalid object
92 // each thread has a single semaphore that a running
93 // task should hand off to children threads it is
95 psem_init( &runningSESEstallSem );
99 //allocate task record queue
// Detached attribute for the helper thread created below: nobody joins it.
101 pthread_attr_t nattr;
102 pthread_attr_init( &nattr );
103 pthread_attr_setdetachstate( &nattr, PTHREAD_CREATE_DETACHED );
105 //set up the stall site SESErecord
// offsetof-style computation via a null-pointer cast; assumes INTPTR is an
// integer wide enough for a pointer — TODO confirm against mlp_runtime.h.
106 stallrecord.common.offsetToParamRecords=(INTPTR) &((SESEstall *)0)->rcrRecords;
107 stallrecord.common.classID=-1;
// Lazy creation of the per-thread task-record queue; the allocation body
// and the thread's start routine/argument are in a numbering gap.
109 if( TRqueue == NULL ) {
113 int status = pthread_create( &thread,
118 pthread_attr_destroy( &nattr );
120 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
124 //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
125 //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, &oldState );
128 // Add this worker to the gc list
// The litem link-in statements (presumably splicing litem into 'list')
// fall in the gap between lock and unlock — verify against full source.
129 pthread_mutex_lock( &gclistlock );
136 pthread_mutex_unlock( &gclistlock );
139 // then continue to process work
143 #ifdef CP_EVENTID_WORKSCHEDGRAB
144 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
150 //NOTE...Fix these things...
// Consumer side of the single shared queue, guarded by systemLockOut.
151 pthread_mutex_lock( &systemLockOut );
// Empty-queue path: release the lock; the wait/retry logic between the
// unlock and the GC check is in a numbering gap.
152 if( headqi->next == NULL ) {
153 pthread_mutex_unlock( &systemLockOut );
155 //NOTE: Do a check to see if we need to collect..
156 if( unlikely( needtocollect ) ) {
157 checkcollect( &emptygarbagelist );
// Dummy-head pop: advance headqi and read the value from the NEW head.
// NOTE(review): 'tmp' holds the retired dummy node; no RUNFREE(tmp) is
// visible here — confirm the freed-node handling in the full source.
167 struct QI* tmp = headqi;
168 headqi = headqi->next;
169 workUnit = headqi->value;
170 pthread_mutex_unlock( &systemLockOut );
173 #ifdef CP_EVENTID_WORKSCHEDGRAB
174 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
177 // let GC see current work
178 litem.seseCommon = (void*)workUnit;
180 // unclear how useful this is
181 if( unlikely( needtocollect ) ) {
182 checkcollect( &emptygarbagelist );
// Execute the dequeued task via the callback installed by workScheduleInit.
185 workFunc( workUnit );
189 // remove from GC list
// Standard doubly-linked-list unlink of litem; the head-update branch body
// (when litem.prev == NULL) is in a numbering gap.
190 pthread_mutex_lock( &gclistlock );
192 if( litem.prev == NULL ) {
195 litem.prev->next = litem.next;
197 if( litem.next != NULL ) {
198 litem.next->prev = litem.prev;
200 pthread_mutex_unlock( &gclistlock );
203 //pthread_cleanup_pop( 0 );
// One-time scheduler setup: initializes GC and queue locks, allocates the
// dummy-head work queue and the worker-data array, installs the task
// callback, and spawns the worker threads.
// NOTE(review): several interior lines are missing (the queue-node init,
// the trailing pthread_create arguments, the loop closer) — comments
// describe only what is visible.
208 void workScheduleInit( int numProcessors,
209 void(*func)(void*) ) {
212 // the original thread must call this now to
213 // protect memory allocation events coming, but it
214 // will also add itself to the worker pool and therefore
215 // try to call it again, CP_CREATE should just ignore
219 pthread_mutex_init(&gclock, NULL);
220 pthread_mutex_init(&gclistlock, NULL);
221 pthread_cond_init(&gccond, NULL);
// One worker per processor plus one extra slot (see comment below).
223 numWorkers = numProcessors + 1;
// Dummy-head queue: head and tail start on the same sentinel node.
227 headqi=tailqi=RUNMALLOC(sizeof(struct QI));
// NOTE(review): 'status' is overwritten by the second init and never
// checked here — presumably intentional best-effort; confirm.
230 status = pthread_mutex_init( &systemLockIn, NULL );
231 status = pthread_mutex_init( &systemLockOut, NULL );
233 // allocate space for one more--the original thread (running
234 // this code) will become a worker thread after setup
235 workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );
238 //make sure the queue is initialized
243 for( i = 0; i < numWorkers; ++i ) {
245 // the original thread is ID 1, start counting from there
246 workerDataArray[i].id = 2 + i;
// Spawn the worker running workerMain; attr and start-routine arguments
// fall in a numbering gap.
248 status = pthread_create( &(workerDataArray[i].workerThread),
251 (void*) &(workerDataArray[i])
254 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
256 // yield and let all workers get to the begin
257 // condition variable, waiting--we have to hold them
258 // so they don't all see empty work queues right away
259 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// Producer side: wraps workUnit in a fresh queue node and appends it to
// the shared queue under systemLockIn.
// NOTE(review): the actual enqueue statements (presumably tailqi->next =
// item; tailqi = item;) fall in the numbering gap between the lock and
// unlock — confirm against the full source.
264 void workScheduleSubmit( void* workUnit ) {
265 struct QI* item=RUNMALLOC(sizeof(struct QI));
266 item->value=workUnit;
269 pthread_mutex_lock ( &systemLockIn );
272 pthread_mutex_unlock( &systemLockIn );
276 // really should be named "add original thread as a worker"
// Converts the calling (original) thread into a worker by filling the
// extra slot reserved in workScheduleInit and entering workerMain, which
// does not return until the scheduler shuts down.
// NOTE(review): apparent off-by-one — slot [numWorkers] is initialized
// below, but workerMain is invoked with &workerDataArray[numWorkers-1],
// i.e. the LAST pthread-created worker's slot, leaving the just-filled
// entry unused. Likely should pass index numWorkers; confirm against the
// full source before changing (interior lines are missing here).
277 void workScheduleBegin() {
280 // space was saved for the original thread to become a
281 // worker after setup is complete
282 workerDataArray[numWorkers].id = 1;
283 workerDataArray[numWorkers].workerThread = pthread_self();
286 workerMain( &(workerDataArray[numWorkers-1]) );
290 // the above function does NOT naturally join all the worker
291 // threads at exit, once the main SESE/Rblock/Task completes
292 // we know all worker threads are finished executing other
293 // tasks so we can explicitly kill the workers, and therefore
294 // trigger any worker-specific cleanup (like coreprof!)
// Intended post-run teardown: explicitly cancel the remaining worker
// threads so per-worker cleanup (e.g. coreprof) can run. The entire
// cancellation loop is currently commented out because, per the author's
// note, canceled threads were not running their exit routines.
// NOTE(review): the function's closing brace falls in a numbering gap and
// is not visible in this extraction.
295 void workScheduleExit() {
298 // This is not working well--canceled threads don't run their
299 // thread-level exit routines? Anyway, its not critical for
300 // coreprof but if we ever need a per-worker exit routine to
301 // run we'll have to look back into this.
303 //printf( "Thread %d performing schedule exit.\n", pthread_self() );
305 //for( i = 0; i < numWorkers; ++i ) {
306 // if( pthread_self() != workerDataArray[i].workerThread ) {
307 // pthread_cancel( workerDataArray[i].workerThread );
311 //// how to let all the threads actually get canceled?