From 32cfb6d58316cfb4fca972f04541061b60f64c40 Mon Sep 17 00:00:00 2001 From: yeom Date: Wed, 27 Jan 2010 18:19:52 +0000 Subject: [PATCH] changes, use atomic operation for managing dependency counter. --- Robust/src/IR/Flat/BuildCode.java | 77 ++++++++++++++++++------------- Robust/src/Runtime/mlp_lock.h | 26 +++++++++++ Robust/src/Runtime/mlp_runtime.h | 4 +- Robust/src/Runtime/workschedule.c | 3 +- 4 files changed, 75 insertions(+), 35 deletions(-) create mode 100644 Robust/src/Runtime/mlp_lock.h diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index afca5165..803c623e 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -3349,14 +3349,12 @@ public class BuildCode { //output.println(" addNewItem( seseCallStack, (void*) seseToIssue);"); // fill in common data + output.println(" int localCount=0;"); output.println(" seseToIssue->common.classID = "+fsen.getIdentifier()+";"); output.println(" psem_init( &(seseToIssue->common.stallSem) );"); output.println(" seseToIssue->common.forwardList = createQueue();"); - //eom - output.println(" seseToIssue->common.connectedList = createQueue();"); - // - output.println(" seseToIssue->common.unresolvedDependencies = 0;"); + output.println(" seseToIssue->common.unresolvedDependencies = 10000;"); output.println(" pthread_cond_init( &(seseToIssue->common.doneCond), NULL );"); output.println(" seseToIssue->common.doneExecuting = FALSE;"); output.println(" pthread_cond_init( &(seseToIssue->common.runningChildrenCond), NULL );"); @@ -3389,6 +3387,11 @@ public class BuildCode { } } + // before potentially adding this SESE to other forwarding lists, + // create it's lock and take it immediately + output.println(" pthread_mutex_init( &(seseToIssue->common.lock), NULL );"); + output.println(" pthread_mutex_lock( &(seseToIssue->common.lock) );"); + // count up memory conflict dependencies, // eom ConflictGraph graph=null; @@ -3406,6 +3409,7 @@ public class BuildCode { output.println(); output.println(" /*add waiting queue element*/"); + output.println(" pthread_mutex_lock( &(parentCommon->lock) );"); output.println(" struct Queue* newWaitingItemQueue=createQueue();"); Set waitingQueueSet=graph.getWaitingElementSetBySESEID(fsen @@ -3417,7 +3421,7 @@ public class BuildCode { output.println(" WaitingElement* newElement=NULL;"); output.println(" struct Queue* list=NULL;"); output.println(" struct QueueItem* newQItem=NULL;"); - output.println(" pthread_mutex_lock( &(parentCommon->lock) );"); +// output.println(" pthread_mutex_lock( &(parentCommon->lock) );"); for (Iterator iterator = waitingQueueSet.iterator(); iterator .hasNext();) { WaitingElement waitingElement = (WaitingElement) iterator.next(); @@ -3437,20 +3441,19 @@ public class BuildCode { + waitingElement.getWaitingID() + ",newElement);"); output .println(" addNewItem(newWaitingItemQueue,newQItem);"); - output - .println(" ++(seseToIssue->common.unresolvedDependencies);"); +// output.println(" ++(seseToIssue->common.unresolvedDependencies);"); + output.println(" ++(localCount);"); output .println(); } - output - .println(" pthread_mutex_unlock( &(parentCommon->lock) );"); +// output.println(" pthread_mutex_unlock( &(parentCommon->lock) );"); output.println(" }"); } output.println(" /*decide whether it is runnable or not in regarding to memory conflicts*/"); output.println(" {"); output.println(" if( !isEmpty(newWaitingItemQueue) ){"); - output.println(" pthread_mutex_lock( &(parentCommon->lock) );"); +// output.println(" pthread_mutex_lock( &(parentCommon->lock) );"); output.println(" int idx;"); output.println(" for(idx = 0 ; idx < numRelatedWaitingQueue ; idx++){"); output.println(" struct Queue *allocQueue=parentCommon->allocSiteArray[idx].waitingQueue;"); @@ -3458,24 +3461,22 @@ public class BuildCode { output.println(" struct QueueItem* nextQItem=getHead(allocQueue);"); output.println(" while( nextQItem != NULL ){"); output.println(" if(contains(newWaitingItemQueue,nextQItem) && isRunnable(allocQueue,nextQItem)){"); - output.println(" if(seseToIssue->common.unresolvedDependencies>0)"); - output.println(" --(seseToIssue->common.unresolvedDependencies);"); +// output.println(" if(seseToIssue->common.unresolvedDependencies>0)"); +// output.println(" --(seseToIssue->common.unresolvedDependencies);"); + output.println(" --(localCount);"); output.println(" }"); output.println(" nextQItem=getNextQueueItem(nextQItem);"); output.println(" }"); output.println(" }"); output.println(" }"); - output.println(" pthread_mutex_unlock( &(parentCommon->lock) );"); + output.println(" }"); output.println(" }"); + output.println(" pthread_mutex_unlock( &(parentCommon->lock) );"); output.println(); } - // before potentially adding this SESE to other forwarding lists, - // create it's lock and take it immediately - output.println(" pthread_mutex_init( &(seseToIssue->common.lock), NULL );"); - output.println(" pthread_mutex_lock( &(seseToIssue->common.lock) );"); // eom // output.println(" pthread_mutex_init( &(seseToIssue->common.waitingQueueLock), NULL );"); // @@ -3495,7 +3496,8 @@ public class BuildCode { output.println(" }"); output.println(" if( !src->doneExecuting ) {"); output.println(" addNewItem( src->forwardList, seseToIssue );"); - output.println(" ++(seseToIssue->common.unresolvedDependencies);"); +// output.println(" ++(seseToIssue->common.unresolvedDependencies);"); + output.println(" ++(localCount);"); output.println(" }"); output.println(" pthread_mutex_unlock( &(src->lock) );"); output.println(" }"); @@ -3522,7 +3524,8 @@ public class BuildCode { output.println(" seseToIssue != peekItem( src->forwardList ) ) {"); output.println(" if( !src->doneExecuting ) {"); output.println(" addNewItem( src->forwardList, seseToIssue );"); - output.println(" ++(seseToIssue->common.unresolvedDependencies);"); +// output.println(" ++(seseToIssue->common.unresolvedDependencies);"); + output.println(" ++(localCount);"); output.println(" }"); output.println(" }"); output.println(" pthread_mutex_unlock( &(src->lock) );"); @@ -3569,15 +3572,23 @@ public class BuildCode { } } - + + // release this SESE for siblings to update its dependencies or, + // eventually, for it to mark itself finished + output.println(" pthread_mutex_unlock( &(seseToIssue->common.lock) );"); + // if there were no outstanding dependencies, issue here + output.println(" if( atomic_sub_and_test(10000-localCount,&(seseToIssue->common.unresolvedDependencies) ) ) {"); + output.println(" workScheduleSubmit( (void*)seseToIssue );"); + output.println(" }"); + /* output.println(" if( seseToIssue->common.unresolvedDependencies == 0 ) {"); output.println(" workScheduleSubmit( (void*)seseToIssue );"); output.println(" }"); - + */ // release this SESE for siblings to update its dependencies or, // eventually, for it to mark itself finished - output.println(" pthread_mutex_unlock( &(seseToIssue->common.lock) );"); +// output.println(" pthread_mutex_unlock( &(seseToIssue->common.lock) );"); output.println(" }"); } @@ -3662,12 +3673,13 @@ public class BuildCode { // decrement dependency count for all SESE's on your forwarding list output.println(" while( !isEmpty( "+com+".forwardList ) ) {"); output.println(" SESEcommon* consumer = (SESEcommon*) getItem( "+com+".forwardList );"); - output.println(" pthread_mutex_lock( &(consumer->lock) );"); - output.println(" --(consumer->unresolvedDependencies);"); - output.println(" if( consumer->unresolvedDependencies == 0 ) {"); +// output.println(" pthread_mutex_lock( &(consumer->lock) );"); +// output.println(" --(consumer->unresolvedDependencies);"); +// output.println(" if( consumer->unresolvedDependencies == 0 ) {"); + output.println(" if( atomic_sub_and_test(1, &(consumer->unresolvedDependencies)) ){"); output.println(" workScheduleSubmit( (void*)consumer );"); output.println(" }"); - output.println(" pthread_mutex_unlock( &(consumer->lock) );"); +// output.println(" pthread_mutex_unlock( &(consumer->lock) );"); output.println(" }"); @@ -3724,15 +3736,16 @@ public class BuildCode { output.println(" }"); output.println(" }else{"); output.println(" if(isResolved){"); - output.println(" pthread_mutex_lock( &(seseNextItem->lock) );"); - output.println(" if(seseNextItem->unresolvedDependencies > 0){"); - output.println(" --(seseNextItem->unresolvedDependencies);"); - output.println(" if( seseNextItem->unresolvedDependencies == 0){"); +// output.println(" pthread_mutex_lock( &(seseNextItem->lock) );"); +// output.println(" if(seseNextItem->unresolvedDependencies > 0){"); +// output.println(" --(seseNextItem->unresolvedDependencies);"); +// output.println(" if( seseNextItem->unresolvedDependencies == 0){"); //output.println(" workScheduleSubmit( (void*)nextItem);"); + output.println(" if( atomic_sub_and_test(1, &(seseNextItem->unresolvedDependencies)) ){"); output.println(" addNewItem(launchQueue,(void*)seseNextItem);"); output.println(" }"); - output.println(" }"); - output.println(" pthread_mutex_unlock( &(seseNextItem->lock) );"); +// output.println(" }"); +// output.println(" pthread_mutex_unlock( &(seseNextItem->lock) );"); output.println(" }"); output.println(" nextQItem=getNextQueueItem(nextQItem);"); output.println(" }"); diff --git a/Robust/src/Runtime/mlp_lock.h b/Robust/src/Runtime/mlp_lock.h new file mode 100644 index 00000000..18bcf4d5 --- /dev/null +++ b/Robust/src/Runtime/mlp_lock.h @@ -0,0 +1,26 @@ +#define LOCK_PREFIX \ + ".section .smp_locks,\"a\"\n" \ + " .align 4\n" \ + " .long 661f\n" /* address */\ + ".previous\n" \ + "661:\n\tlock; " + + +static inline void atomic_dec(volatile int *v) { + __asm__ __volatile__ (LOCK_PREFIX "decl %0" + : "+m" (*v)); +} + +static inline void atomic_inc(volatile int *v) { + __asm__ __volatile__ (LOCK_PREFIX "incl %0" + : "+m" (*v)); +} + +static inline int atomic_sub_and_test(int i, volatile int *v) { + unsigned char c; + + __asm__ __volatile__ (LOCK_PREFIX "subl %2,%0; sete %1" + : "+m" (*v), "=qm" (c) + : "ir" (i) : "memory"); + return c; +} diff --git a/Robust/src/Runtime/mlp_runtime.h b/Robust/src/Runtime/mlp_runtime.h index 61ebdd34..4ea520e3 100644 --- a/Robust/src/Runtime/mlp_runtime.h +++ b/Robust/src/Runtime/mlp_runtime.h @@ -5,7 +5,7 @@ #include #include "Queue.h" #include "psemaphore.h" - +#include "mlp_lock.h" #ifndef FALSE #define FALSE 0 @@ -49,7 +49,7 @@ typedef struct SESEcommon_t { pthread_mutex_t lock; struct Queue* forwardList; - int unresolvedDependencies; + volatile int unresolvedDependencies; pthread_cond_t doneCond; int doneExecuting; diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c index a83435c3..8b5b3c71 100644 --- a/Robust/src/Runtime/workschedule.c +++ b/Robust/src/Runtime/workschedule.c @@ -248,7 +248,8 @@ void workScheduleInit( int numProcessors, pthread_mutex_init(&gclistlock, NULL); pthread_cond_init(&gccond, NULL); - numWorkers = numProcessors*5; + //numWorkers = numProcessors*5; + numWorkers = numProcessors + 1; workFunc = func; dequeWorkUnits = createQueue(); -- 2.34.1