BUILDSCRIPT=../../../buildscript
-COREPROFOVERFLOW= #-coreprof-checkoverflow
+COREPROFOVERFLOW= -coreprof-checkoverflow
USECOREPROF= -coreprof $(COREPROFOVERFLOW) \
-coreprof-eventwords 1024*1024*512 \
-coreprof-enable cpe_main \
-coreprof-enable cpe_runmalloc \
-coreprof-enable cpe_taskexecute \
-coreprof-enable cpe_taskdispatch \
- -coreprof-enable cpe_poolalloc
+ -coreprof-enable cpe_poolalloc \
+ -coreprof-enable cpe_taskretire \
+ -coreprof-enable cpe_workschedgrab
# -coreprof-enable cpe_preparememq
# -coreprof-enable cpe_runfree \
# -coreprof-enable cpe_count_poolalloc \
# -coreprof-enable cpe_count_poolreuse \
-# -coreprof-enable cpe_workschedgrab \
-# -coreprof-enable cpe_taskretire \
# -coreprof-enable cpe_taskstallvar \
# -coreprof-enable cpe_taskstallmem
--- /dev/null
+////////////////////////////////////////////////////////////////
+//
+// This is an implementation of the structure described in
+// A Dynamic-Sized Nonblocking Work Stealing Deque
+// Hendler, Lev, Moir, and Shavit
+//
+// The bottom and top values for the deque must be CAS-able
+// and fit into 64 bits. Our strategy for this is:
+//
+//   19-bit Tag    36-bit Node Pointer     9-bit Index
+// +-----------+-------------------------+------------+
+// | 63 ... 45 |        44 ... 9         |  8 ... 0   |
+// +-----------+-------------------------+------------+
+//
+// Let's call the encoded info E. To retrieve the values:
+// tag = (0xffffe00000000000 & E) >> 45;
+// ptr = (0x00001ffffffffe00 & E) << 3;
+// idx = (0x00000000000001ff & E);
+//
+// Increment the tag without decoding (note that this also resets
+// the pointer and index fields to zero):
+// E = (0x00001fffffffffff | E) + 1;
+//
+// Increment (decrement) the index when it is not equal to
+// MAXINDEX (0) with E++ (E--).
+//
+// x86 64-bit processors currently only use the lowest 48 bits for
+// virtual addresses, source:
+// http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
+// And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
+// of a 64-bit pointer are always zero. This means if we are only
+// allotted 36 bits to store a pointer to a Node we have
+// 48 - 3 - 36 = 9 bits that could be lost. Instead of aligning Node
+// pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
+// sure the lower 12 bits of the address are zero. THEREFORE:
+// Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
+// we can encode the rest in 36 bits without a loss of information.
+//
+////////////////////////////////////////////////////////////////
+
+#ifdef DEBUG_DEQUE
+#include <stdlib.h>
+#include <stdio.h>
+#endif
+
+#include "deque.h"
+
+
+// Sentinel values the pop operations return in place of an item.
+// They are only compared against, never dereferenced, in this file.
+// DQ_POP_EMPTY - no item could be taken from the deque
+// DQ_POP_ABORT - dqPopTop saw dq->top change underneath it
+void* DQ_POP_EMPTY = (void*)0x1;
+void* DQ_POP_ABORT = (void*)0x3;
+
+
+// define a 19-bit dummy tag for the bottom
+// value with a pattern that will expose errors
+#define BOTTOM_NULL_TAG 0x40001
+
+
+// there are 9 bits for the index into a Node's array,
+// so 2^9 = 512 elements per node of the deque
+#define DQNODE_ARRAYSIZE 512
+
+
+// One node of the deque: a fixed array of item slots plus links to
+// the neighboring nodes.
+typedef struct dequeNode_t {
+ // item slots, addressed by the 9-bit index of an encoded value
+ void* itsDataArr[DQNODE_ARRAYSIZE];
+ // the node dqPopBottom and dqIndicateEmpty move to past the last slot;
+ // dqPushBottom sets it on a new node to the node that was just filled
+ struct dequeNode_t* next;
+ // the node dqPopTop moves to below slot 0; dqPushBottom sets it on
+ // the filled node to the newly allocated node
+ struct dequeNode_t* prev;
+} dequeNode;
+
+
+// the dequeNode struct must be 4096-byte aligned,
+// see above, so use the following magic to ask
+// the allocator for a space that wastes 4095 bytes
+// but guarantees the address of the struct within
+// that space is 4096-aligned
+const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
+
+// Return the first 4096-byte-aligned address inside a chunk obtained
+// from the allocator.
+//
+// The chunk is expected to be DQNODE_SIZETOREQUEST bytes, i.e. a
+// dequeNode plus 4095 bytes of slack, so rounding the start address UP
+// to the next multiple of 4096 always leaves room for a whole
+// dequeNode.  (Rounding down, as a plain mask would do, can yield an
+// address up to 4095 bytes in front of the chunk.)  An address that is
+// already aligned is returned unchanged.
+static inline dequeNode* dqGet4096aligned( void* fromAllocator ) {
+  return (dequeNode*) ( (((INTPTR)fromAllocator) + 4095) & ~((INTPTR)4095) );
+}
+
+
+
+// Extract the tag field (bits 63..45) of an encoded value.
+static inline int dqDecodeTag( INTPTR E ) {
+  return (int) ((E & 0xffffe00000000000) >> 45);
+}
+
+// Extract the node pointer: bits 44..9 of the encoded value, shifted
+// back up by 3 so they land on address bits 47..12.
+static inline dequeNode* dqDecodePtr( INTPTR E ) {
+  return (dequeNode*) ((E & 0x00001ffffffffe00) << 3);
+}
+
+// Extract the slot index (bits 8..0) of an encoded value.
+static inline int dqDecodeIdx( INTPTR E ) {
+  return (int) (E & 0x00000000000001ff);
+}
+
+
+
+// Pack a tag, a node pointer and a slot index into one CAS-able word:
+//   tag -> bits 63..45,  ptr -> bits 44..9,  idx -> bits 8..0
+//
+// The tag is reduced to its 19-bit field before packing, so a tag that
+// has been incremented past 0x7ffff wraps around to 0 instead of being
+// shifted out of the word.  ptr is expected to be 4096-byte aligned and
+// idx to be in 0..DQNODE_ARRAYSIZE-1.  With DEBUG_DEQUE defined the
+// packed word is decoded again and the process exits if any field did
+// not survive the round trip.
+static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
+  // keep only the 19 bits the tag field can hold
+  int tagField = tag & 0x7ffff;
+
+  // shift in an unsigned type so that reaching bit 63 is well defined
+  INTPTR tagE = (INTPTR) (((unsigned long long)tagField) << 45);
+
+  INTPTR ptrE = (0x00001ffffffffe00 &    // second, mask off the addr's high-order 1's
+                 (((INTPTR)ptr) >> 3));  // first, shift down 8-byte alignment bits
+
+  INTPTR E = tagE | ptrE | ((INTPTR)idx);
+
+#ifdef DEBUG_DEQUE
+  int tagOut = dqDecodeTag( E );
+  if( tagField != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
+
+  dequeNode* ptrOut = dqDecodePtr( E );
+  if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
+
+  int idxOut = dqDecodeIdx( E );
+  if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
+#endif
+  return E;
+}
+
+
+// Report (1 or 0) whether the given encoded bottom and top values
+// describe a deque with nothing to pop.  That is the case when
+//   - both refer to the same node and bottom's index equals top's
+//     index or is one above it, or
+//   - bottom sits at index 0 of the node after top's node while top
+//     sits at the last index of its own node.
+static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
+  dequeNode* bNode = dqDecodePtr( bottom );
+  dequeNode* tNode = dqDecodePtr( top );
+  int        bIdx  = dqDecodeIdx( bottom );
+  int        tIdx  = dqDecodeIdx( top );
+
+  // same node: equal indices, or bottom exactly one slot above top
+  if( bNode == tNode ) {
+    if( bIdx == tIdx || bIdx == tIdx + 1 ) {
+      return 1;
+    }
+  }
+
+  // neighboring nodes: the two positions straddle the node boundary
+  return (bNode == tNode->next) &&
+         (bIdx == 0) &&
+         (tIdx == DQNODE_ARRAYSIZE - 1);
+}
+
+
+
+// Prepare an empty deque: create its memory pool, take two aligned
+// nodes from it, link them as neighbors, and point both bottom and top
+// at the last slot of the first node.
+void dqInit( deque* dq ) {
+  dequeNode* first;
+  dequeNode* second;
+  const int  lastSlot = DQNODE_ARRAYSIZE - 1;
+
+  dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
+
+  first  = dqGet4096aligned( poolalloc( dq->memPool ) );
+  second = dqGet4096aligned( poolalloc( dq->memPool ) );
+
+  // second is the node reached through first->next
+  second->prev = first;
+  first->next  = second;
+
+  // bottom always carries the dummy tag, top starts with tag 0
+  dq->bottom = dqEncode( BOTTOM_NULL_TAG, first, lastSlot );
+  dq->top    = dqEncode( 0,               first, lastSlot );
+}
+
+
+// Push an item onto the bottom end of the deque.
+//
+// The item is written into the slot dq->bottom currently designates,
+// then bottom is moved one index down. When the current index is
+// already 0, a new node is taken from the deque's memory pool, linked
+// in as the prev of the current node, and bottom moves to the last
+// slot of that new node. dq->bottom is written with a plain store
+// (no CAS) and always carries BOTTOM_NULL_TAG.
+void dqPushBottom( deque* dq, void* item ) {
+
+ dequeNode* currNode = dqDecodePtr( dq->bottom );
+ int currIndx = dqDecodeIdx( dq->bottom );
+
+ // store the item before the new bottom value is published below
+ currNode->itsDataArr[currIndx] = item;
+
+ dequeNode* newNode;
+ int newIndx;
+
+ if( currIndx != 0 ) {
+ // room left in this node, just step down one slot
+ newNode = currNode;
+ newIndx = currIndx - 1;
+
+ } else {
+ // this node is full, grow the deque with a fresh aligned node
+ newNode = dqGet4096aligned( poolalloc( dq->memPool ) );
+ newNode->next = currNode;
+ currNode->prev = newNode;
+ newIndx = DQNODE_ARRAYSIZE - 1;
+ }
+
+ dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
+}
+
+
+// Try to take the item at the top end of the deque.
+//
+// Returns the item on success. Returns DQ_POP_EMPTY when the bottom/top
+// values read indicate an empty deque and dq->top still holds the value
+// read at entry. Returns DQ_POP_ABORT when dq->top changed in the
+// meantime, either during the emptiness check or at the CAS.
+void* dqPopTop( deque* dq ) {
+
+ // snapshot top first, then bottom
+ INTPTR currTop = dq->top;
+
+ int currTopTag = dqDecodeTag( currTop );
+ dequeNode* currTopNode = dqDecodePtr( currTop );
+ int currTopIndx = dqDecodeIdx( currTop );
+
+ INTPTR currBottom = dq->bottom;
+
+ if( dqIndicateEmpty( currBottom, currTop ) ) {
+ // only report empty if top did not move since the snapshot
+ if( currTop == dq->top ) {
+ return DQ_POP_EMPTY;
+ } else {
+ return DQ_POP_ABORT;
+ }
+ }
+
+ dequeNode* nodeToFree;
+ int newTopTag;
+ dequeNode* newTopNode;
+ int newTopIndx;
+
+ if( currTopIndx != 0 ) {
+ // stay within the node: same tag, one slot down
+ nodeToFree = NULL;
+ newTopTag = currTopTag;
+ newTopNode = currTopNode;
+ newTopIndx = currTopIndx - 1;
+
+ } else {
+ // leaving this node: top moves to the last slot of the prev node,
+ // the tag is bumped, and the node reached through next is
+ // released if the CAS below succeeds
+ nodeToFree = currTopNode->next;
+ newTopTag = currTopTag + 1;
+ newTopNode = currTopNode->prev;
+ newTopIndx = DQNODE_ARRAYSIZE - 1;
+ }
+
+ // read the item before attempting to claim it
+ void* retVal = currTopNode->itsDataArr[currTopIndx];
+
+ INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
+
+ // the value CAS hands back is compared with the expected value
+ // to decide whether the swap took place
+ INTPTR actualTop = (INTPTR)
+ CAS( &(dq->top), // location
+ currTop, // expected value
+ newTop ); // desired value
+
+ if( actualTop == currTop ) {
+ // CAS succeeded
+ if( nodeToFree != NULL ) {
+ poolfreeinto( dq->memPool, nodeToFree );
+ }
+ return retVal;
+
+ } else {
+ // top was changed by someone else, the item was not claimed
+ return DQ_POP_ABORT;
+ }
+}
+
+
+// Try to take the item at the bottom end of the deque.
+//
+// Bottom is first moved one slot up (into the next node when it was at
+// the last index) with a plain store, and only then is top read.
+// Returns the item on success. Returns DQ_POP_EMPTY, after restoring
+// the old bottom value, when the old bottom position equals top's
+// position, or when the new bottom position equals top's position and
+// the CAS on top fails. This function never returns DQ_POP_ABORT.
+void* dqPopBottom ( deque* dq ) {
+
+ INTPTR oldBot = dq->bottom;
+
+ dequeNode* oldBotNode = dqDecodePtr( oldBot );
+ int oldBotIndx = dqDecodeIdx( oldBot );
+
+ dequeNode* newBotNode;
+ int newBotIndx;
+
+ if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
+ // stay within the node, one slot up
+ newBotNode = oldBotNode;
+ newBotIndx = oldBotIndx + 1;
+
+ } else {
+ // past the last slot: continue at slot 0 of the next node
+ newBotNode = oldBotNode->next;
+ newBotIndx = 0;
+ }
+
+ // read the candidate item, then publish the new bottom
+ void* retVal = newBotNode->itsDataArr[newBotIndx];
+
+ dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
+
+ // top is read only after the new bottom has been stored
+ INTPTR currTop = dq->top;
+
+ int currTopTag = dqDecodeTag( currTop );
+ dequeNode* currTopNode = dqDecodePtr( currTop );
+ int currTopIndx = dqDecodeIdx( currTop );
+
+ if( oldBotNode == currTopNode &&
+ oldBotIndx == currTopIndx ) {
+ // old bottom coincides with top: nothing to take, undo the move
+ dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
+ return DQ_POP_EMPTY;
+
+ } else if( newBotNode == currTopNode &&
+ newBotIndx == currTopIndx ) {
+ // new bottom coincides with top: claim the item by bumping
+ // top's tag (node and index stay the same) with a CAS
+ INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
+
+ INTPTR actualTop = (INTPTR)
+ CAS( &(dq->top), // location
+ currTop, // expected value
+ newTop ); // desired value
+
+ if( actualTop == currTop ) {
+ // CAS succeeded
+ // release the old node if bottom crossed into another node
+ if( oldBotNode != newBotNode ) {
+ poolfreeinto( dq->memPool, oldBotNode );
+ }
+ return retVal;
+
+ } else {
+ // top changed in the meantime: undo the move, report empty
+ dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
+ return DQ_POP_EMPTY;
+ }
+
+ } else {
+ // neither position coincides with top: take the item without
+ // touching top, releasing the old node if bottom left it
+ if( oldBotNode != newBotNode ) {
+ poolfreeinto( dq->memPool, oldBotNode );
+ }
+ return retVal;
+ }
+}
--- /dev/null
+#ifndef ___DEQUE_H__
+#define ___DEQUE_H__
+
+#include "runtime.h"
+#include "memPool.h"
+
+
+// A work-stealing deque.
+// the bottom and top 64-bit values encode
+// several sub-values (tag, node pointer, index), see deque.c
+typedef struct deque_t {
+ // pool the deque's nodes are allocated from and released into
+ MemPool* memPool;
+ // encoded bottom position, written with plain stores in deque.c
+ INTPTR bottom;
+
+ // force bottom and top to different cache lines
+ char buffer[CACHELINESIZE];
+
+ // encoded top position, updated with CAS in deque.c
+ INTPTR top;
+} deque;
+
+
+// set up an empty deque (creates its memory pool and first nodes)
+void dqInit ( deque* dq );
+// store item at the bottom end of the deque
+void dqPushBottom( deque* dq, void* item );
+// take from the top end: returns an item, DQ_POP_EMPTY or DQ_POP_ABORT
+void* dqPopTop ( deque* dq );
+// take from the bottom end: returns an item or DQ_POP_EMPTY
+void* dqPopBottom ( deque* dq );
+
+
+// pop operations may return these values
+// instead of an item
+extern void* DQ_POP_EMPTY;
+extern void* DQ_POP_ABORT;
+
+
+//void dq_take ( deque* sem, struct garbagelist* gl );
+
+
+#endif // ___DEQUE_H__
//////////////////////////////////////////////////////////
#include <stdlib.h>
+#include "runtime.h"
#include "mem.h"
#include "mlp_lock.h"
-// The cache line size is set for the AMD Opteron 6168 (dc-10)
-// that has L1 and L2 cache line sizes of 64 bytes. Source:
-// http://www.cs.virginia.edu/~skadron/cs451/opteron/opteron.ppt
-#define CACHELINESIZE 64
-
typedef struct MemPoolItem_t {
void* next;
if(next == NULL) {
// only one item, so don't take from pool
- return RUNMALLOC( p->itemSize );
+ return (void*) RUNMALLOC( p->itemSize );
}
p->head = next;
//__builtin_prefetch( &(p->head->next) );
asm volatile( "prefetcht0 (%0)" :: "r" (next));
- return headCurrent;
+ return (void*)headCurrent;
}
#endif
#endif
+#ifndef CACHELINESIZE
+// The L1 and L2 cache line size for the
+// AMD Opteron 6168 (dc-10) is 64 bytes. Source:
+// http://www.cs.virginia.edu/~skadron/cs451/opteron/opteron.ppt
+#define CACHELINESIZE 64
+#endif
+
extern void * curr_heapbase;
extern void * curr_heaptop;
+
+
+
// for convenience
typedef struct Queue deq;
WorkerData* myData = (WorkerData*) arg;
int oldState;
int haveWork;
- struct garbagelist emptygarbagelist={0,NULL};
+
+ // the worker threads really have no context relevant to the
+ // user program, so build an empty garbage list struct to
+ // pass to the collector if collection occurs
+ struct garbagelist emptygarbagelist = { 0, NULL };
// once-per-thread stuff
CP_CREATE();
//allocate task record queue
pthread_t thread;
pthread_attr_t nattr;
- pthread_attr_init(&nattr);
- pthread_attr_setdetachstate(&nattr, PTHREAD_CREATE_DETACHED);
- if (TRqueue==NULL)
- TRqueue=allocTR();
+ pthread_attr_init( &nattr );
+ pthread_attr_setdetachstate( &nattr, PTHREAD_CREATE_DETACHED );
+
+ if( TRqueue == NULL ) {
+ TRqueue = allocTR();
+ }
+
int status = pthread_create( &thread,
NULL,
workerTR,
- (void*) TRqueue);
- pthread_attr_destroy(&nattr);
+ (void*) TRqueue );
+
+ pthread_attr_destroy( &nattr );
+
if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
#endif
+
//pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
//pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, &oldState );
- // then continue to process work
- //NOTE: ADD US TO THE GC LIST
-
- pthread_mutex_lock(&gclistlock);
- threadcount++;
- litem.prev=NULL;
- litem.next=list;
- if(list!=NULL)
- list->prev=&litem;
- list=&litem;
- pthread_mutex_unlock(&gclistlock);
+ // Add this worker to the gc list
+ pthread_mutex_lock( &gclistlock );
+ threadcount++;
+ litem.prev = NULL;
+ litem.next = list;
+ if( list != NULL )
+ list->prev = &litem;
+ list = &litem;
+ pthread_mutex_unlock( &gclistlock );
- //ALSO CREATE EMPTY GARBAGELIST TO PASS TO COLLECTOR
+ // then continue to process work
while( 1 ) {
// wait for work
#ifdef CP_EVENTID_WORKSCHEDGRAB
CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
#endif
+
haveWork = FALSE;
while( !haveWork ) {
+
//NOTE...Fix these things...
pthread_mutex_lock( &systemLockOut );
if( headqi->next == NULL ) {
pthread_mutex_unlock( &systemLockOut );
+
//NOTE: Do a check to see if we need to collect..
- if (unlikely(needtocollect)) checkcollect(&emptygarbagelist);
+ if( unlikely( needtocollect ) ) {
+ checkcollect( &emptygarbagelist );
+ }
+
sched_yield();
continue;
} else {
haveWork = TRUE;
}
}
- struct QI * tmp=headqi;
- headqi = headqi->next;
- workUnit = headqi->value;
+
+ struct QI* tmp = headqi;
+ headqi = headqi->next;
+ workUnit = headqi->value;
pthread_mutex_unlock( &systemLockOut );
free( tmp );
+
#ifdef CP_EVENTID_WORKSCHEDGRAB
CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
#endif
- //let GC see current work
- litem.seseCommon=(void*)workUnit;
+ // let GC see current work
+ litem.seseCommon = (void*)workUnit;
- //unclear how useful this is
- if (unlikely(needtocollect)) checkcollect(&emptygarbagelist);
+ // unclear how useful this is
+ if( unlikely( needtocollect ) ) {
+ checkcollect( &emptygarbagelist );
+ }
workFunc( workUnit );
- }
+ }
+
- //NOTE: Remove from GC LIST DOWN HERE....
- pthread_mutex_lock(&gclistlock);
+ // remove from GC list
+ pthread_mutex_lock( &gclistlock );
threadcount--;
- if (litem.prev==NULL) {
- list=litem.next;
+ if( litem.prev == NULL ) {
+ list = litem.next;
} else {
- litem.prev->next=litem.next;
+ litem.prev->next = litem.next;
}
- if (litem.next!=NULL) {
- litem.next->prev=litem.prev;
+ if( litem.next != NULL ) {
+ litem.next->prev = litem.prev;
}
- pthread_mutex_unlock(&gclistlock);
+ pthread_mutex_unlock( &gclistlock );
//pthread_cleanup_pop( 0 );
}
}
+
void workScheduleSubmit( void* workUnit ) {
struct QI* item=RUNMALLOC(sizeof(struct QI));
item->value=workUnit;
// should not expect to return from this
void workScheduleBegin();
+
+
+
extern int threadcount;
extern pthread_mutex_t gclock;
extern pthread_mutex_t gclistlock;
-extern pthread_cond_t gccond;
+extern pthread_cond_t gccond;
struct QI {
struct QI * next;
struct QI * tailqi;
+
+
#endif /* __WORK_SCHEDULE__ */
--- /dev/null
+# Builds the stand-alone stress test for the work-stealing deque.
+PROGRAM=test-deque
+
+DQDIR=../../Runtime
+DEFS= -D "RUNMALLOC=malloc" -D "RUNFREE=free" -DBIT64 -DDEBUG_DEQUE
+FLAGS= -m64 -g #-O3
+
+# all and clean name actions, not files
+.PHONY: all clean
+
+all: $(PROGRAM)
+
+# link with the same FLAGS the objects were compiled with (-m64)
+$(PROGRAM): $(PROGRAM).o deque.o
+	gcc $(FLAGS) $(PROGRAM).o deque.o -lpthread -o $(PROGRAM)
+
+deque.o: $(DQDIR)/deque.h $(DQDIR)/deque.c
+	gcc -c $(FLAGS) -I$(DQDIR) $(DEFS) $(DQDIR)/deque.c -o deque.o
+
+$(PROGRAM).o: $(PROGRAM).c $(DQDIR)/deque.h
+	gcc -c $(FLAGS) -I$(DQDIR) $(DEFS) $(PROGRAM).c -o $(PROGRAM).o
+
+clean:
+	rm -f $(PROGRAM)
+	rm -f *.o
+	rm -f *~
--- /dev/null
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <pthread.h>
+#include "deque.h"
+
+
+#define numThreads 24
+#define numCycles 1000
+#define retries 100000
+
+
+// global array of work-stealing deques
+// each thread knows its own index
+deque* deques;
+
+
+
+// for artificially keeping threads busy
+// Placeholder for artificial busy work.  The loop that would consume
+// time proportional to n is currently commented out, so the function
+// returns 0 for every n.
+int spin( int n ) {
+  long acc = 0;
+  int outer, inner;
+  //for( outer = 0; outer < n; ++outer ) {
+  //  for( inner = 0; inner < numCycles; ++inner ) {
+  //    acc = (acc * acc) + (acc + acc) - (acc * 1 * acc * 2) + outer;
+  //  }
+  //}
+  return acc;
+}
+
+
+
+// Thread body of the stress test.
+//
+// arg carries the worker's index i.  The worker pushes i heap-allocated
+// ints (each holding 1) onto its own deque, then keeps popping from the
+// bottom of its own deque and, when that reports empty, tries to steal
+// from the top of a randomly chosen deque.  Every item obtained adds
+// its value to the worker's count w.  A steal attempt that finds the
+// victim empty costs one of the `retries`; the worker finishes when all
+// retries are used up and hands w back through pthread_exit.
+void* workerMain( void* arg ) {
+  INTPTR i = (INTPTR)arg;
+
+  INTPTR w = 0;
+  int    r = retries;
+
+  deque* dq = &(deques[i]);
+
+  srand( i * 1777 );
+
+  int j;
+  for( j = 0; j < i; ++j ) {
+    int* one = malloc( sizeof( int ) );
+    if( one == NULL ) { printf( "Error\n" ); exit( -1 ); }
+    *one = 1;
+    dqPushBottom( dq, one );
+    spin( i );
+  }
+
+  while( r > 0 ) {
+    void* num = dqPopBottom( dq );
+
+    if( num == DQ_POP_ABORT ) {
+      // another op is in progress, try again
+      continue;
+
+    } else if( num == DQ_POP_EMPTY ) {
+
+      // IF YOU INSERT THIS (NEVER STEAL) THE AMOUNT
+      // OF WORK COMES OUT RIGHT?!?!?!
+      //pthread_exit( (void*)w );
+
+      // no work here, steal!
+      int v = rand() % numThreads;
+      num = dqPopTop( &(deques[v]) );
+
+      if( num == DQ_POP_ABORT ) {
+        // another op in progress, try again later
+        continue;
+
+      } else if( num == DQ_POP_EMPTY ) {
+        // lose a retry
+        r--;
+        continue;
+
+      } else {
+        // STOLE WORK!
+        w += *((int*)num);
+        spin( w / (i+1) );
+        continue;
+      }
+
+    } else {
+      // grabbed work
+      w += *((int*)num);
+      spin( w / (i+1) );
+      continue;
+    }
+  }
+
+  // i and w are pointer-sized integers, so print them as long
+  // rather than through %d
+  printf( "I'm %ld and I did %ld many.\n", (long)i, (long)w );
+  pthread_exit( (void*)w );
+}
+
+
+// Stress-test driver: creates numThreads deques and numThreads joinable
+// worker threads, joins them, and prints the sum of the per-worker
+// counts plus 24 next to the expected value of 300.  Worker i pushes i
+// items, so the counts themselves add up to 0+1+...+23 = 276 when no
+// item is lost or duplicated.
+int main() {
+  INTPTR i;
+
+  pthread_t      threads[numThreads];
+  pthread_attr_t attr;
+
+  long total = 0;
+
+  pthread_attr_init( &attr );
+  pthread_attr_setdetachstate( &attr,
+                               PTHREAD_CREATE_JOINABLE );
+
+  deques = malloc( sizeof( deque )*numThreads );
+  if( deques == NULL ) { printf( "Error\n" ); exit( -1 ); }
+
+  for( i = 0; i < numThreads; ++i ) {
+    dqInit( &(deques[i]) );
+  }
+
+  for( i = 0; i < numThreads; ++i ) {
+    int status = pthread_create( &(threads[i]),
+                                 &attr,
+                                 workerMain,
+                                 (void*)i );
+    if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
+    printf( "." );
+  }
+
+  // the attribute object is not needed once the threads exist
+  pthread_attr_destroy( &attr );
+
+  printf( "\n" );
+
+  for( i = 0; i < numThreads; ++i ) {
+    // pthread_join stores a void*, so receive it as one and convert
+    // back to the integer count the worker passed to pthread_exit
+    void* result = NULL;
+    pthread_join( threads[i], &result );
+    total += (long)(INTPTR)result;
+    printf( "+" );
+  }
+
+  // total is a long, print it as one
+  printf( "\nTotal (expect 300)=%ld\n", total+24 );
+  return 0;
+}