changes on work scheduler.
authoryeom <yeom>
Fri, 29 Jan 2010 17:26:07 +0000 (17:26 +0000)
committeryeom <yeom>
Fri, 29 Jan 2010 17:26:07 +0000 (17:26 +0000)
Robust/src/Runtime/workschedule.c

index 8b5b3c71bd46617c3aa8d8e0d5f2a80dbfa3a864..785704eca7abd1dfb7df4edad7dd151aec6823a3 100644 (file)
@@ -3,7 +3,6 @@
 #include <pthread.h>
 
 #include "mem.h"
-#include "Queue.h"
 #include "workschedule.h"
 #include "mlp_runtime.h"
 
@@ -30,11 +29,11 @@ typedef struct workerData_t {
 */
 
 
-static pthread_mutex_t systemLock;
+static pthread_mutex_t systemLockIn;
+static pthread_mutex_t systemLockOut;
 
 // just one queue for everyone
 //static pthread_mutex_t dequeLock;
-static deq*            dequeWorkUnits;
 
 
 
@@ -59,6 +58,14 @@ pthread_mutex_t gclock;
 pthread_mutex_t gclistlock;
 pthread_cond_t gccond;
 
+struct QI {
+  struct QI * next;
+  void * value;
+};
+
+struct QI * head;
+struct QI * tail;
+
 /*
 // helper func
 int threadID2workerIndex( pthread_t id ) {
@@ -162,27 +169,40 @@ void* workerMain( void* arg ) {
   //pthread_once( &mlpOnceObj, mlpInitOncePerThread );
 
   // all workers wait until system is ready
-  pthread_mutex_lock  ( &systemLock );
-  while( !systemStarted ) {
-    pthread_cond_wait( &systemBeginCond, &systemLock );
-  }
-  pthread_mutex_unlock( &systemLock );
 
   // then continue to process work
   while( 1 ) {
 
-    pthread_mutex_lock( &systemLock );
-    // wait for work
-    while( isEmpty( dequeWorkUnits ) ) {
-      pthread_cond_wait( &workAvailCond, &systemLock );
-    }     
+    /*
+    while(1){
+      if(pthread_mutex_trylock(&systemLock)==0){
+       if(isEmpty(dequeWorkUnits)){
+         pthread_mutex_unlock(&systemLock);
+       }else{
+         break;
+       }
+      }
+    }
     workUnit = getItem( dequeWorkUnits );
     pthread_mutex_unlock( &systemLock );
-
+    */
+    
+    pthread_mutex_lock( &systemLockOut );
+    // wait for work
+    if (head->next==NULL) {
+      pthread_mutex_unlock( &systemLockOut );
+      sched_yield();
+      continue;
+    }
+    struct QI * tmp=head;
+    head = head->next;
+    workUnit = head->value;
+    pthread_mutex_unlock( &systemLockOut );
+    free(tmp);
     // yield processor before moving on, just to exercise
     // system's out-of-order correctness
-    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
-    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+    //if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+    //if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
     
     workFunc( workUnit );
   }
@@ -250,12 +270,14 @@ void workScheduleInit( int numProcessors,
 
   //numWorkers = numProcessors*5;
   numWorkers = numProcessors + 1;
-  workFunc   = func;
 
-  dequeWorkUnits = createQueue();
+  workFunc   = func;
 
-  status = pthread_mutex_init( &systemLock, NULL );
-  if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+  head=tail=RUNMALLOC(sizeof(struct QI));
+  head->next=NULL;
+  
+  status = pthread_mutex_init( &systemLockIn, NULL );
+  status = pthread_mutex_init( &systemLockOut, NULL );
 
   workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
 
@@ -293,10 +315,24 @@ void workScheduleSubmit( void* workUnit ) {
 */
 
 void workScheduleSubmit( void* workUnit ) {
-  pthread_mutex_lock  ( &systemLock );
-  addNewItemBack      ( dequeWorkUnits, workUnit );
-  pthread_cond_signal( &workAvailCond );
-  pthread_mutex_unlock( &systemLock );
+  /*
+   while(1){
+      if(pthread_mutex_trylock(&systemLock)==0){
+        addNewItemBack( dequeWorkUnits, workUnit );
+        break;
+      }
+    }
+    pthread_mutex_unlock( &systemLock );
+  */
+  
+  struct QI* item=RUNMALLOC(sizeof(struct QI));
+  item->value=workUnit;
+  item->next=NULL;
+  
+  pthread_mutex_lock  ( &systemLockIn );
+  tail->next=item;
+  tail=item;
+  pthread_mutex_unlock( &systemLockIn );
 }
 
 
@@ -306,11 +342,6 @@ void workScheduleBegin() {
   int i;
 
   // tell all workers to begin
-  pthread_mutex_lock    ( &systemLock );
-  systemStarted = 1;
-  pthread_cond_broadcast( &systemBeginCond );
-  pthread_mutex_unlock  ( &systemLock );  
-
   for( i = 0; i < numWorkers; ++i ) {
     pthread_join( workerArray[i], NULL );
   }