change
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #define _GNU_SOURCE
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <pthread.h>
5 #include <sched.h>
6 #include <sys/syscall.h>
7
8 #include "mem.h"
9 #include "workschedule.h"
10 #include "mlp_runtime.h"
11 #include "psemaphore.h"
12 #include "coreprof/coreprof.h"
13 #ifdef SQUEUE
14 #include "squeue.h"
15 #else
16 #include "deque.h"
17 #endif
18 #ifdef RCR
19 #include "rcr_runtime.h"
20 #include "trqueue.h"
21 #endif
22
23
24
25
26 //////////////////////////////////////////////////
27 //
28 //  for coordination with the garbage collector
29 //
30 //////////////////////////////////////////////////
31 int threadcount;
32 pthread_mutex_t gclock;
33 pthread_mutex_t gclistlock;
34 pthread_cond_t gccond;
35 #ifdef RCR
36 extern pthread_mutex_t queuelock;
37 #endif
38 // in garbage.h, listitem is a struct with a pointer
39 // to a stack, objects, etc. such that the garbage
40 // collector can find pointers for garbage collection
41
42 // this is a global list of listitem structs that the
43 // garbage collector uses to know about each thread
44 extern struct listitem* list;
45
46 // this is the local thread's item on the above list,
47 // it should be added to the global list before a thread
48 // starts doing work, and should be removed only when
49 // the thread is completely finished--in OoOJava/MLP the
50 // only thing hanging from this litem should be a single
51 // task record that the worker thread is executing, if any!
52 extern __thread struct listitem litem;
53 //////////////////////////////////////////////////
54 //
55 //  end coordination with the garbage collector
56 //
57 //////////////////////////////////////////////////
58
59
60
61
62 typedef struct workerData_t {
63   pthread_t workerThread;
64   int       id;
65 } WorkerData;
66
67 // a thread should know its worker id in any
68 // functions below
69 __thread int myWorkerID;
70
71 // the original thread starts up the work scheduler
72 // and sleeps while it is running, it has no worker
73 // ID so use this to realize that
74 const int workerID_NOTAWORKER = 0xffffff0;
75
76
77 int oidIncrement;
78 volatile int numWorkSchedWorkers;
79 int realnumWorkSchedWorkers;
80 static WorkerData*  workerDataArray;
81 static pthread_t*   workerArray;
82
83 static void(*workFunc)(void*);
84
85 // each thread can create objects but should assign
86 // globally-unique object ID's (oid) so have threads
87 // give out this as next id, then increment by number
88 // of threads to ensure disjoint oid sets
89 __thread int oid;
90
91 // global array of work-stealing deques, where
92 // each thread uses its ID as the index to its deque
93 deque* deques;
94
95
96
97 #ifdef RCR
98 #include "trqueue.h"
99 __thread struct trQueue * TRqueue=NULL;
100 #endif
101
102
103
104 // this is a read-by-all and write-by-one variable
105 // IT IS UNPROTECTED, BUT SAFE for all threads to
106 // read it (periodically, only when they can find no work)
107 // and only the worker that retires the main thread will
108 // write it to 1, at which time other workers will see
109 // that they should exit gracefully
110 static volatile int mainTaskRetired = FALSE;
111
112
113
114
115 void* workerMain( void* arg ) {
116   void*       workUnit;
117   WorkerData* myData  = (WorkerData*) arg;
118   deque*      myDeque = &(deques[myData->id]);
119   int         keepRunning = TRUE;
120   int         haveWork;
121   int         lastVictim = 0;
122   int         i;
123
124   myWorkerID = myData->id;
125
126   // ensure that object ID's start at 1 so that using
127   // oid with value 0 indicates an invalid object
128   oid = myData->id + 1;
129
130   // each thread has a single semaphore that a running
131   // task should hand off to children threads it is
132   // going to stall on
133   psem_init( &runningSESEstallSem );
134
135   // the worker threads really have no context relevant to the
136   // user program, so build an empty garbage list struct to
137   // pass to the collector if collection occurs
138   struct garbagelist emptygarbagelist = { 0, NULL };
139
140   // Add this worker to the gc list
141   pthread_mutex_lock( &gclistlock );
142   threadcount++;
143   litem.prev = NULL;
144   litem.next = list;
145   if( list != NULL ) 
146     list->prev = &litem;
147   list = &litem;
148   pthread_mutex_unlock( &gclistlock );
149
150
151   // start timing events in this thread
152   CP_CREATE();
153
154
155   // then continue to process work
156   while( keepRunning ) {
157
158     // wait for work
159 #ifdef CP_EVENTID_WORKSCHEDGRAB
160     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
161 #endif
162
163     haveWork = FALSE;
164     while( !haveWork ) {
165
166       workUnit = dqPopBottom( myDeque );
167
168
169       if( workUnit != DQ_POP_EMPTY ) {
170         haveWork = TRUE;
171         goto dowork;
172       } else {
173         // try to steal from another queue, starting
174         // with the last successful victim, don't check
175         // your own deque
176         int mynumWorkSchedWorkers=numWorkSchedWorkers;
177         for( i = 0; i < mynumWorkSchedWorkers - 1; ++i ) {
178
179           workUnit = dqPopTop( &(deques[lastVictim]) );
180           
181 #ifdef SQUEUE
182           if( workUnit != DQ_POP_EMPTY ) {
183 #else
184           if( workUnit != DQ_POP_ABORT &&
185               workUnit != DQ_POP_EMPTY ) {
186 #endif
187             // successful steal!
188             haveWork = TRUE;
189             goto dowork;
190           }
191        
192           // choose next victim
193           lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
194           
195           if( lastVictim == myWorkerID ) {
196             lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
197           }
198         }
199         // end steal attempts
200
201
202         // if we successfully stole work, break out of the
203         // while-not-have-work loop, otherwise we looked
204         // everywhere, so drop down to "I'm idle" code below
205         if( haveWork ) {
206           goto dowork;
207         }
208       }
209
210       // if we drop down this far, we didn't find any work,
211       // so do a garbage collection, yield the processor,
212       // then check if the entire system is out of work
213       if( unlikely( needtocollect ) ) {
214         checkcollect( &emptygarbagelist );
215       }
216
217       sched_yield();
218
219       if( mainTaskRetired ) {
220         keepRunning = FALSE;
221         break;
222       }
223
224     } // end the while-not-have-work loop
225
226     dowork:
227
228 #ifdef CP_EVENTID_WORKSCHEDGRAB
229     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
230 #endif
231
232     // when is no work left we will pop out
233     // here, so only do work if any left
234     if( haveWork ) {
235       // let GC see current work
236       litem.seseCommon = (void*)workUnit;
237
238 #ifdef DEBUG_DEQUE
239       if( workUnit == NULL ) {
240         printf( "About to execute a null work item\n" );
241       }
242 #endif
243
244       workFunc( workUnit );
245     }
246   } 
247
248
249   CP_EXIT();
250
251
252   // remove from GC list
253   pthread_mutex_lock( &gclistlock );
254   threadcount--;
255   if( litem.prev == NULL ) {
256     list = litem.next;
257   } else {
258     litem.prev->next = litem.next;
259   }
260   if( litem.next != NULL ) {
261     litem.next->prev = litem.prev;
262   }
263   pthread_mutex_unlock( &gclistlock );
264
265
266   return NULL;
267 }
268
269
270 void workScheduleInit( int numProcessors,
271                        void(*func)(void*) ) {
272   int i, status;
273   pthread_attr_t attr;
274
275   // the original thread must call this now to
276   // protect memory allocation events coming
277   CP_CREATE();
278
279   // the original thread is a worker
280   myWorkerID = 0;
281   oid = 1;
282
283 #ifdef RCR
284   pthread_mutex_init( &queuelock,     NULL );
285 #endif
286   pthread_mutex_init( &gclock,     NULL );
287   pthread_mutex_init( &gclistlock, NULL );
288   pthread_cond_init ( &gccond,     NULL );
289
290
291   numWorkSchedWorkers = numProcessors;
292   realnumWorkSchedWorkers=numProcessors;
293   oidIncrement=numProcessors;
294   while(1) {
295     int x=2;
296     //check primality
297     for(;x<oidIncrement;x++) {
298       //not prime
299       if (oidIncrement%x==0) {
300         oidIncrement++;
301         break;
302       }
303     }
304     //have prime
305     if (x==oidIncrement)
306       break;
307   }
308
309   workFunc = func;
310
311 #ifdef RCR
312   deques          = RUNMALLOC( sizeof( deque      )*numWorkSchedWorkers*2);
313 #else
314   deques          = RUNMALLOC( sizeof( deque      )*numWorkSchedWorkers );
315 #endif
316   workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
317
318 #ifdef RCR
319   for( i = 0; i < numWorkSchedWorkers*2; ++i ) {
320 #else
321   for( i = 0; i < numWorkSchedWorkers; ++i ) {
322 #endif
323     dqInit( &(deques[i]) );
324   }
325
326 #ifndef COREPIN
327   
328   pthread_attr_init( &attr );
329   pthread_attr_setdetachstate( &attr, 
330                                PTHREAD_CREATE_JOINABLE );
331
332   workerDataArray[0].id = 0;
333
334   for( i = 1; i < numWorkSchedWorkers; ++i ) {
335
336     workerDataArray[i].id = i;
337
338     status = pthread_create( &(workerDataArray[i].workerThread), 
339                              &attr,
340                              workerMain,
341                              (void*) &(workerDataArray[i])
342                              );
343
344     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
345   }
346 #else
347   int numCore=24;
348   cpu_set_t cpuset;
349   pthread_attr_t thread_attr[numWorkSchedWorkers];
350   int idx;
351
352   workerDataArray[0].id = 0;
353   CPU_ZERO(&cpuset);
354   CPU_SET(0, &cpuset);
355   sched_setaffinity(syscall(SYS_gettid), sizeof(cpuset), &cpuset);  
356   
357   for(idx=1;idx<numWorkSchedWorkers;idx++){
358     int coreidx=idx%numCore;    
359     pthread_attr_t* attr = &thread_attr[idx];
360     pthread_attr_init(attr);
361     pthread_attr_setdetachstate(attr, PTHREAD_CREATE_JOINABLE);
362     CPU_ZERO(&cpuset);
363     CPU_SET(coreidx, &cpuset);
364     pthread_attr_setaffinity_np(attr, sizeof(cpuset), &cpuset);
365     
366     workerDataArray[idx].id = idx;
367     
368     status = pthread_create( &(workerDataArray[idx].workerThread), 
369                              attr,
370                              workerMain,
371                              (void*) &(workerDataArray[idx])
372                              );
373     printf("assign %d on %d",idx,coreidx);
374
375   }
376 #endif
377 }
378
379
380 void workScheduleSubmit( void* workUnit ) {
381 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
382   CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_BEGIN );
383 #endif
384   dqPushBottom( &(deques[myWorkerID]), workUnit );
385 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
386   CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_END );
387 #endif
388 }
389
390
391 // really should be named "wait for work in system to complete"
392 void workScheduleBegin() {
393   int i;
394
395   // original thread becomes a worker
396   workerMain( (void*) &(workerDataArray[0]) );
397
398   // then wait for all other workers to exit gracefully
399   for( i = 1; i < realnumWorkSchedWorkers; ++i ) {
400     pthread_join( workerDataArray[i].workerThread, NULL );
401   }
402
403   // write all thread's events to disk
404   CP_DUMP();
405 }
406
407
408 // only the worker that executes and then retires
409 // the main task should invoke this, which indicates to
410 // all other workers they should exit gracefully
411 void workScheduleExit() {
412   mainTaskRetired = 1;
413 }