add load balance module for multicore gc, fix message handling and memory allocation...
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "Queue.h"
7 #include "workschedule.h"
8
9
10
11 // NOTE: Converting this from a work-stealing strategy
12 // to a single-queue thread pool protected by a single
13 // lock.  This will not scale, but it will support
14 // development of the system for now
15
16
17
18 // for convenience
19 typedef struct Queue deq;
20
21
22 /*
23 // each worker needs the following
24 typedef struct workerData_t {
25   pthread_t       workerThread;
26   pthread_mutex_t dequeLock;
27   deq*            dequeWorkUnits;
28   int             nextWorkerToLoad;
29 } workerData;
30 */
31 // just one queue for everyone
32 static pthread_mutex_t dequeLock;
33 static deq*            dequeWorkUnits;
34
35
36
37 // implementation internal data
38 static int             numWorkers;
39 //static workerData*     workerDataArray;
40 static pthread_t*      workerArray;
41 static pthread_mutex_t systemBeginLock  = PTHREAD_MUTEX_INITIALIZER;
42 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
43 static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
44 static pthread_cond_t  systemReturnCond = PTHREAD_COND_INITIALIZER;
45 static void(*workFunc)(void*);
46
47
48 /*
49 // helper func
50 int threadID2workerIndex( pthread_t id ) {
51   int i;
52   for( i = 0; i < numWorkers; ++i ) {
53     if( workerDataArray[i].workerThread == id ) {
54       return i;
55     }
56   }
57   // if we didn't find it, we are an outside
58   // thread and should pick arbitrary worker
59   return 0;
60 }
61 */
62
63
64 /*
65 // the worker thread main func, which takes a func
66 // from user for processing any one work unit, then
67 // workers use it to process work units and steal
68 // them from one another
69 void* workerMain( void* arg ) {
70
71   workerData* myData = (workerData*) arg;
72   
73   void* workUnit;
74
75   int i;
76   int j;
77
78   // all workers wait until system is ready
79   pthread_mutex_lock  ( &systemBeginLock );
80   pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
81   pthread_mutex_unlock( &systemBeginLock );
82
83   while( 1 ) {
84
85     // lock my deque
86     pthread_mutex_lock( &(myData->dequeLock) );
87
88     if( isEmpty( myData->dequeWorkUnits ) ) {
89
90       // my deque is empty, try to steal
91       pthread_mutex_unlock( &(myData->dequeLock) );
92       
93       workUnit = NULL;
94       j = myData->nextWorkerToLoad;
95
96       // look at everyone's queue at least twice
97       for( i = 0; i < numWorkers; ++i ) {
98         if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
99         
100         ++j; if( j == numWorkers ) { j = 0; }
101
102         pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
103
104         if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
105           pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
106           // no work here, yield and then keep looking
107           if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
108           continue;
109         }
110
111         // found some work in another deque, steal it
112         workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
113         pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
114         break;
115       }
116
117       // didn't find any work, even in my own deque,
118       // after checking everyone twice?  Exit thread
119       if( workUnit == NULL ) {
120         break;
121       }
122
123     } else {
124       // have work in own deque, take out from front
125       workUnit = getItem( myData->dequeWorkUnits );
126       pthread_mutex_unlock( &(myData->dequeLock) );
127     }
128
129     // wherever the work came from, process it
130     workFunc( workUnit );
131
132     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
133   }
134
135   printf( "Worker %d exiting.\n", myData->workerThread );
136   fflush( stdout );
137
138   return NULL;
139 }
140 */
141
142
143 void* workerMain( void* arg ) {
144   
145   void* workUnit;
146
147   int tries = 3;
148
149   // all workers wait until system is ready
150   pthread_mutex_lock  ( &systemBeginLock );
151   pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
152   pthread_mutex_unlock( &systemBeginLock );
153
154   while( tries > 0 ) {
155
156     pthread_mutex_lock( &dequeLock );
157
158     // look in the queue for work
159     if( !isEmpty( dequeWorkUnits ) ) {
160       workUnit = getItem( dequeWorkUnits );
161     } else {
162       workUnit = NULL;
163     }
164
165     pthread_mutex_unlock( &dequeLock );
166
167     // yield processor before moving on
168     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
169
170     if( workUnit != NULL ) {   
171       workFunc( workUnit );
172       tries = 3;
173     } else {
174       --tries;
175     }
176   }
177
178   return NULL;
179 }
180
181
182 /*
183 void workScheduleInit( int numProcessors,
184                        void(*func)(void*) ) {
185   int i, status;
186
187   numWorkers = numProcessors;
188   workFunc   = func;
189
190   // allocate space for worker data
191   workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
192
193   for( i = 0; i < numWorkers; ++i ) {    
194
195     // the deque
196     workerDataArray[i].dequeWorkUnits = createQueue();
197
198     // set the next worker to add work to as itself
199     workerDataArray[i].nextWorkerToLoad = i;
200
201     // it's lock
202     status = pthread_mutex_init( &(workerDataArray[i].dequeLock), 
203                                  NULL
204                                  );
205     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
206   }
207
208   // only create the actual pthreads after all workers
209   // have data that is protected with initialized locks
210   for( i = 0; i < numWorkers; ++i ) {    
211     status = pthread_create( &(workerDataArray[i].workerThread), 
212                              NULL,
213                              workerMain,
214                              (void*) &(workerDataArray[i])
215                            );
216     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
217   }
218
219   // yield and let all workers get to the begin
220   // condition variable, waiting--we have to hold them
221   // so they don't all see empty work queues right away
222   if( sched_yield() == -1 ) {
223     printf( "Error thread trying to yield.\n" );
224     exit( -1 );
225   }
226 }
227 */
228
229
230 void workScheduleInit( int numProcessors,
231                        void(*func)(void*) ) {
232   int i, status;
233
234   numWorkers = numProcessors*5;
235   workFunc   = func;
236
237   dequeWorkUnits = createQueue();
238
239   status = pthread_mutex_init( &dequeLock, NULL );
240   if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
241
242   workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
243
244   for( i = 0; i < numWorkers; ++i ) {    
245     status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
246     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
247
248     // yield and let all workers get to the beginx3
249     // condition variable, waiting--we have to hold them
250     // so they don't all see empty work queues right away
251     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
252   }
253 }
254
255
256 /*
257 void workScheduleSubmit( void* workUnit ) {
258
259   // query who is submitting and find out who they are scheduled to load
260   int submitterIndex = threadID2workerIndex( pthread_self() );
261   int workerIndex    = workerDataArray[submitterIndex].nextWorkerToLoad;
262   
263   // choose a new index and save it
264   ++workerIndex;
265   if( workerIndex == numWorkers ) {
266     workerIndex = 0;
267   }
268   workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
269
270   // load the chosen worker
271   pthread_mutex_lock  ( &(workerDataArray[workerIndex].dequeLock) );
272   addNewItemBack      (   workerDataArray[workerIndex].dequeWorkUnits, workUnit );
273   pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
274 }
275 */
276
277 void workScheduleSubmit( void* workUnit ) {
278   pthread_mutex_lock  ( &dequeLock );
279   addNewItemBack      ( dequeWorkUnits, workUnit );
280   pthread_mutex_unlock( &dequeLock );
281 }
282
283
284 // really should be named "wait until work is finished"
285 void workScheduleBegin() {
286   
287   int i;
288
289   // tell all workers to begin
290   pthread_mutex_lock    ( &systemBeginLock );
291   pthread_cond_broadcast( &systemBeginCond );
292   pthread_mutex_unlock  ( &systemBeginLock );  
293
294   for( i = 0; i < numWorkers; ++i ) {
295     pthread_join( workerArray[i], NULL );
296   }
297 }