From 51d6764b2d76be7e87107c5bc10ae36c4aa7be3d Mon Sep 17 00:00:00 2001 From: bdemsky Date: Tue, 26 Oct 2010 22:18:58 +0000 Subject: [PATCH] single ended queue work stealing scheduler --- Robust/src/Runtime/garbage.c | 97 +++++++++++---- Robust/src/Runtime/memPool.h | 4 +- Robust/src/Runtime/squeue.h | 190 ++++++++++++++++++++++++++++++ Robust/src/Runtime/workschedule.c | 16 ++- Robust/src/buildscript | 9 ++ 5 files changed, 289 insertions(+), 27 deletions(-) create mode 100644 Robust/src/Runtime/squeue.h diff --git a/Robust/src/Runtime/garbage.c b/Robust/src/Runtime/garbage.c index 75d85032..262b82bd 100644 --- a/Robust/src/Runtime/garbage.c +++ b/Robust/src/Runtime/garbage.c @@ -10,7 +10,11 @@ #include "thread.h" #endif #ifdef MLP +#ifdef SQUEUE +#include "squeue.h" +#else #include "deque.h" +#endif #include "workschedule.h" extern int numWorkSchedWorkers; extern deque* deques; @@ -541,38 +545,25 @@ void collect(struct garbagelist * stackptr) { #endif #ifdef MLP +#ifdef SQUEUE { int i; deque* dq; - dequeNode* botNode; - int botIndx; - dequeNode* topNode; - int topIndx; - dequeNode* n; + dequeItem *di; int j; - int jLo; - int jHi; // goes over ready-to-run SESEs for( i = 0; i < numWorkSchedWorkers; ++i ) { dq = &(deques[i]); - botNode = dqDecodePtr( dq->bottom ); - botIndx = dqDecodeIdx( dq->bottom ); - - topNode = dqDecodePtr( dq->top ); - topIndx = dqDecodeIdx( dq->top ); + di=dq->head; - n = botNode; do { // check all the relevant indices of this // node in the deque, noting if we are in // the top/bottom node which can be partially // full - if( n == botNode ) { jLo = botIndx; } else { jLo = 0; } - if( n == topNode ) { jHi = topIndx; } else { jHi = DQNODE_ARRAYSIZE; } - - for( j = jLo; j < jHi; ++j ) { + // WHAT? 
//SESEcommon* common = (SESEcommon*) n->itsDataArr[j]; //if(common==seseCommon){ @@ -580,7 +571,7 @@ void collect(struct garbagelist * stackptr) { // continue; //} - SESEcommon* seseRec = (SESEcommon*) n->itsDataArr[j]; + SESEcommon* seseRec = (SESEcommon*) di->work; struct garbagelist* gl = (struct garbagelist*) &(seseRec[1]); struct garbagelist* glroot = gl; @@ -594,18 +585,82 @@ void collect(struct garbagelist * stackptr) { } gl = gl->next; } - } // we only have to move across the nodes // of the deque if the top and bottom are // not the same already + di=di->next; + } while( di !=NULL) ; + } + } +#else + { + int i; + deque* dq; + dequeNode* botNode; + int botIndx; + dequeNode* topNode; + int topIndx; + dequeNode* n; + int j; + int jLo; + int jHi; + + // goes over ready-to-run SESEs + for( i = 0; i < numWorkSchedWorkers; ++i ) { + dq = &(deques[i]); + + botNode = dqDecodePtr( dq->bottom ); + botIndx = dqDecodeIdx( dq->bottom ); + + topNode = dqDecodePtr( dq->top ); + topIndx = dqDecodeIdx( dq->top ); + + + n = botNode; + do { + // check all the relevant indices of this + // node in the deque, noting if we are in + // the top/bottom node which can be partially + // full + if( n == botNode ) { jLo = botIndx; } else { jLo = 0; } + if( n == topNode ) { jHi = topIndx; } else { jHi = DQNODE_ARRAYSIZE; } + + for( j = jLo; j < jHi; ++j ) { + + // WHAT? 
+ //SESEcommon* common = (SESEcommon*) n->itsDataArr[j]; + //if(common==seseCommon){ + // continue; + //} + + SESEcommon* seseRec = (SESEcommon*) n->itsDataArr[j]; + + struct garbagelist* gl = (struct garbagelist*) &(seseRec[1]); + struct garbagelist* glroot = gl; + + updateAscendantSESE( seseRec ); + + while( gl != NULL ) { + int k; + for( k = 0; k < gl->size; k++ ) { + void* orig = gl->array[k]; + ENQUEUE( orig, gl->array[k] ); + } + gl = gl->next; + } + } + + // we only have to move across the nodes + // of the deque if the top and bottom are + // not the same already if( botNode != topNode ) { n = n->next; } } while( n != topNode ); } - - } + } +#endif #endif diff --git a/Robust/src/Runtime/memPool.h b/Robust/src/Runtime/memPool.h index 582c7e9f..900e8fd5 100644 --- a/Robust/src/Runtime/memPool.h +++ b/Robust/src/Runtime/memPool.h @@ -83,8 +83,8 @@ static inline void poolfreeinto( MemPool* p, void* ptr ) { tailCurrent = p->tail; tailActual = (MemPoolItem*) CAS( &(p->tail), // ptr to set - (long) tailCurrent, // current tail's next should be NULL - (long) tailNew // try set to our new tail + (INTPTR) tailCurrent, // current tail's next should be NULL + (INTPTR) tailNew // try set to our new tail ); if( tailActual == tailCurrent ) { // success, update tail diff --git a/Robust/src/Runtime/squeue.h b/Robust/src/Runtime/squeue.h new file mode 100644 index 00000000..38325e02 --- /dev/null +++ b/Robust/src/Runtime/squeue.h @@ -0,0 +1,190 @@ +#ifndef ___MYQUE_H__ +#define ___MYQUE_H__ + +////////////////////////////////////////////////////////// +// +// A memory pool implements POOLCREATE, POOLALLOC and +// POOLFREE to improve memory allocation by reusing records. +// +// This implementation uses a lock-free singly-linked list +// to store reusable records. The list is initialized with +// one valid record, and the list is considered empty when +// it has only one record; this allows the enqueue operation's +// CAS to assume tail can always be dereferenced. 
+// +// poolfree adds newly freed records to the list BACK +// +// poolalloc either takes records from FRONT or mallocs +// +////////////////////////////////////////////////////////// + +#include <stdlib.h> +#include "runtime.h" +#include "mem.h" +#include "mlp_lock.h" +#include "memPool.h" + +#define CACHELINESIZE 64 +#define DQ_POP_EMPTY NULL +#define DQ_POP_ABORT NULL + + +typedef struct dequeItem_t { + void *otherqueue; + struct dequeItem_t * next; + volatile void *work; +} dequeItem; + +typedef struct deque_t { + dequeItem* head; + + // avoid cache line contention between producer/consumer... + char buffer[CACHELINESIZE - sizeof(void*)]; + + dequeItem* tail; + + MemPool objret; +} deque; + +#define EXTRACTPTR(x) (x&0x0000ffffffffffff) +#define INCREMENTTAG 0x0001000000000000 + +// the memory pool must always have at least one +// item in it +static void dqInit(deque *q) { + q->head = calloc( 1, sizeof(dequeItem) ); + q->head->next = NULL; + q->tail = q->head; + q->objret.itemSize=sizeof(dequeItem); + q->objret.head=calloc(1, sizeof(dequeItem)); + q->objret.head->next=NULL; + q->objret.tail=q->objret.head; +} + +static inline void tagpoolfreeinto( MemPool* p, void* ptr, void *realptr ) { + MemPoolItem* tailCurrent; + MemPoolItem* tailActual; + + // set up the now unneeded record as the tail of the + // free list by treating its first bytes as next pointer, + MemPoolItem* tailNew = (MemPoolItem*) realptr; + tailNew->next = NULL; + + while( 1 ) { + // make sure the null happens before the insertion, + // also makes sure that we reload tailCurrent, etc.. 
+ BARRIER(); + + tailCurrent = p->tail; + tailActual = (MemPoolItem*) + CAS( &(p->tail), // ptr to set + (INTPTR) tailCurrent, // current tail's next should be NULL + (INTPTR) realptr); // try set to our new tail + + if( tailActual == tailCurrent ) { + // success, update tail + tailCurrent->next = (MemPoolItem *) ptr; + return; + } + + // if CAS failed, retry entire operation + } +} + +static inline void* tagpoolalloc( MemPool* p ) { + + // to protect CAS in poolfree from dereferencing + // null, treat the queue as empty when there is + // only one item. The dequeue operation is only + // executed by the thread that owns the pool, so + // it doesn't require an atomic op + MemPoolItem* headCurrent = p->head; + MemPoolItem* realHead=(MemPoolItem *) EXTRACTPTR((INTPTR)headCurrent); + MemPoolItem* next=realHead->next; + int i; + if(next == NULL) { + // only one item, so don't take from pool + return (void*) RUNMALLOC( p->itemSize ); + } + + p->head = next; + + ////////////////////////////////////////////////////////// + // + // + // static inline void prefetch(void *x) + // { + // asm volatile("prefetcht0 %0" :: "m" (*(unsigned long *)x)); + // } + // + // + // but this built-in gcc one seems the most portable: + ////////////////////////////////////////////////////////// + //__builtin_prefetch( &(p->head->next) ); + MemPoolItem* realNext=(MemPoolItem *) EXTRACTPTR((INTPTR)next); + asm volatile( "prefetcht0 (%0)" :: "r" (realNext)); + realNext=(MemPoolItem*)(((char *)realNext)+CACHELINESIZE); + asm volatile( "prefetcht0 (%0)" :: "r" (realNext)); + + return (void*)headCurrent; +} + + + +// CAS +// in: a ptr, expected old, desired new +// return: actual old +// +// Pass in a ptr, what you expect the old value is and +// what you want the new value to be. 
+// The CAS returns what the value is actually: if it matches +// your proposed old value then you assume the update was successful, +// otherwise someone did CAS before you, so try again (the return +// value is the old value you will pass next time.) + +static inline void dqPushBottom( deque* p, void* work ) { + dequeItem *ptr=(dequeItem *) tagpoolalloc(&p->objret); + // dequeItem *ptr=(dequeItem *) calloc(1,sizeof(dequeItem)); + dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr); + ptr=(dequeItem *) (((INTPTR)ptr)+INCREMENTTAG); + realptr->work=work; + BARRIER(); + p->tail->next=ptr; + p->tail=realptr; +} + +static inline void* dqPopTop(deque *p) { + dequeItem *ptr=p->head; + dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr); + dequeItem *next=realptr->next; + //remove if we can..steal work no matter what + if (likely(next!=NULL)) { + if (((dequeItem *)CAS(&(p->head),(INTPTR)ptr, (INTPTR)next))!=ptr) + return DQ_POP_EMPTY; + void * item=NULL; + item=(void *)LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item); + realptr->next=NULL; + BARRIER(); + tagpoolfreeinto(&p->objret,ptr, realptr); + return item; + } else { + void * item=NULL; + item=(void *) LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item); + return item; + } +} + +#define dqPopBottom dqPopTop + + +#endif // ___MYQUE_H__ + + + + + + + + + + diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c index 24201db6..e3c9db2a 100644 --- a/Robust/src/Runtime/workschedule.c +++ b/Robust/src/Runtime/workschedule.c @@ -7,7 +7,11 @@ #include "mlp_runtime.h" #include "psemaphore.h" #include "coreprof/coreprof.h" +#ifdef SQUEUE +#include "squeue.h" +#else #include "deque.h" +#endif #ifdef RCR #include "rcr_runtime.h" #include "trqueue.h" @@ -185,7 +189,7 @@ void* workerMain( void* arg ) { workUnit = dqPopBottom( myDeque ); -#ifdef DEBUG_DEQUE +#if defined(DEBUG_DEQUE)&&!defined(SQUEUE) if( workUnit == 0x0 ) { printf( "Got invalid work from 
the deque bottom.\n" ); } @@ -202,14 +206,18 @@ void* workerMain( void* arg ) { for( i = 0; i < numWorkSchedWorkers - 1; ++i ) { workUnit = dqPopTop( &(deques[lastVictim]) ); -#ifdef DEBUG_DEQUE +#if defined(DEBUG_DEQUE)&&!defined(SQUEUE) if( workUnit == 0x0 ) { printf( "Got invalid work from the deque top.\n" ); } #endif - if( workUnit != DQ_POP_ABORT && - workUnit != DQ_POP_EMPTY ) { +#ifdef SQUEUE + if( workUnit != DQ_POP_EMPTY ) { +#else + if( workUnit != DQ_POP_ABORT && + workUnit != DQ_POP_EMPTY ) { +#endif // successful steal! haveWork = TRUE; break; diff --git a/Robust/src/buildscript b/Robust/src/buildscript index e36d9762..8578175d 100755 --- a/Robust/src/buildscript +++ b/Robust/src/buildscript @@ -26,6 +26,7 @@ echo "-ooojava " echo -ooodebug general OOOJava debugging messages echo -ooodebug-disable-task-mem-pool this is a tricky module, disable for simpler runtime echo -rcr turn on runtime conflict resolver +echo -squeue use single queue echo echo Disjoint Reachability Analysis options echo -disjoint enable analysis @@ -152,6 +153,7 @@ STMRUNTIME=$ROBUSTROOT/Runtime/STM/ DSMRECOVERYRUNTIME=$ROBUSTROOT/Runtime/DSTM/interface_recovery/ REPAIRROOT=~/research/Repair/RepairCompiler/ CURDIR=`pwd` +SQUEUE=false DSMFLAG=false DSMRECOVERY=false FASTMEMCPY=false @@ -363,6 +365,10 @@ shift elif [[ $1 = '-mac' ]] then EXTRAOPTIONS="$EXTRAOPTIONS -DMAC" +elif [[ $1 = '-squeue' ]] +then +EXTRAOPTIONS="$EXTRAOPTIONS -DSQUEUE" +SQUEUE=true elif [[ $1 = '-profile' ]] then PROFILEFLAG=true @@ -1225,8 +1231,11 @@ then FILES="$FILES $ROBUSTROOT/Runtime/mlp_runtime.c" FILES="$FILES $ROBUSTROOT/Runtime/psemaphore.c" FILES="$FILES $ROBUSTROOT/Runtime/workschedule.c" +if ! $SQUEUE +then FILES="$FILES $ROBUSTROOT/Runtime/deque.c" fi +fi if $RECOVERFLAG then -- 2.34.1