1 ////////////////////////////////////////////////////////////////
3 // This is an implementation of the structure described in
4 // A Dynamic-Sized Nonblocking Work Stealing Deque
5 // Hendler, Lev, Moir, and Shavit
7 // The bottom and top values for the deque must be CAS-able
8 // and fit into 64 bits. Our strategy for this is:
10 // 19-bit Tag 36-bit Node Pointer 9-bit Index
11 // +-----------+-------------------------+------------+
12 // | 63 ... 45 | 44 ... 9 | 8 ... 0 |
13 // +-----------+-------------------------+------------+
15 // Let's call the encoded info E. To retrieve the values:
16 // tag = (0xffffe00000000000 & E) >> 45;
17 // ptr = (0x00001ffffffffe00 & E) << 3;
18 // idx = (0x00000000000001ff & E);
20 // Increment the tag without decoding (note: this also clears the ptr and idx fields):
21 // E = (0x00001fffffffffff | E) + 1;
23 // Increment (decrement) the index when it is not equal to
24 // MAXINDEX (0) with E++ (E--).
26 // x86 64-bit processors currently only use the lowest 48 bits for
27 // virtual addresses, source:
28 // http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 // And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 // of a 64-bit pointer are always zero. This means if we are only
31 // allotted 36 bits to store a pointer to a Node we have
32 // 48 - 3 - 36 = 9 bits that could be lost. Instead of aligning Node
33 // pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 // sure the lower 12 bits of the address are zero. THEREFORE:
35 // Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 // we can encode the rest in 36 bits without a loss of information.
38 ////////////////////////////////////////////////////////////////
// Distinguished pointer values 0x17 and 0x33. Going by their names they mark
// the "deque was empty" and "pop aborted" outcomes -- presumably what the pop
// routines hand back instead of a work item. The return statements of
// dqPopTop/dqPopBottom are not visible in this view, so confirm there.
49 void* DQ_POP_EMPTY = (void*)0x17;
50 void* DQ_POP_ABORT = (void*)0x33;
53 // define a 19-bit dummy tag for the bottom
54 // value with a pattern that will expose errors
// 0x40001 sets bit 18 and bit 0, i.e. both ends of a 19-bit field.
// Every visible dqEncode call that writes dq->bottom passes this tag.
55 #define BOTTOM_NULL_TAG 0x40001
59 // the dequeNode struct must be 4096-byte aligned,
60 // see above, so use the following magic to ask
61 // the allocator for a space that wastes 4095 bytes
62 // but guarantees the address of the struct within
63 // that space is 4096-aligned
// Rounding any start address up to the next multiple of 4096 moves it by at
// most 4095 bytes, so sizeof( dequeNode ) bytes still fit inside a region of
// this size. dqInit passes this value to poolcreate().
64 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
// Round an allocator-returned address up to the next multiple of 4096 and
// return it as a dequeNode*.
//   fromAllocator: start of a region obtained from the allocator; the callers
//                  in this file pass poolalloc() results.
//   returns:       the first 4096-aligned address at or after fromAllocator.
// NOTE(review): the embedded numbering jumps (67 -> 70, 72 -> 75) and the
// closing brace is absent, so lines are missing from this view. The memset
// may sit inside a conditional-compilation guard -- confirm in the full file.
66 static inline dequeNode* dqGet4096aligned( void* fromAllocator ) {
// Adding 4095 and then clearing the low 12 bits rounds up; an address that
// is already aligned comes back unchanged.
67 INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
70 //printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
71 //printf( "aligned: 0x%08x to 0x%08x\n", aligned, aligned + sizeof( dequeNode ) );
// Overwrites the whole node with the byte value 0x09.
72 memset( (void*) aligned, 0x9, sizeof( dequeNode ) );
75 return (dequeNode*) aligned;
// Pack a tag, a node pointer and an array index into one INTPTR word.
//   tag: shifted up to bit 45
//   ptr: node address, shifted right by 3 and masked to bits 44..9
//   idx: slot index; how it is merged is not visible in this view
// The visible tail decodes the result again and terminates the process with
// exit( -1 ) if the tag, pointer or index does not survive the round trip.
// NOTE(review): the embedded numbering jumps (81 -> 84, 84 -> 88, 95 -> 101),
// so the declaration of E, the rest of the OR expression, the return and any
// conditional-compilation guard around the checks are missing here -- confirm
// against the full file.
79 static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
// For a 4096-aligned address the low 12 bits are zero, so after the shift by
// 3 the low 9 bits of ptrE are zero and free to hold an index.
80 INTPTR ptrE = (0x00001ffffffffe00 & // second, mask off the addr's high-order 1's
81 (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
// Fragment of the expression that builds E: the tag occupies bits 45 and up.
84 (((INTPTR)tag) << 45) |
// Round-trip self-checks: each field is decoded and compared with its input.
88 int tagOut = dqDecodeTag( E );
89 if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
91 dequeNode* ptrOut = dqDecodePtr( E );
92 if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
94 int idxOut = dqDecodeIdx( E );
95 if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
// Compare an encoded bottom word with an encoded top word by node and index;
// by its name, the result tells the caller whether the deque looks empty.
//   bottom, top: encoded words as produced by dqEncode
// NOTE(review): the bodies of both tests, one condition line (numbering gap
// 113 -> 115) and every return statement are missing from this view, so the
// exact return values cannot be stated here.
101 static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
102 dequeNode* botNode = dqDecodePtr( bottom );
103 int botIndx = dqDecodeIdx( bottom );
104 dequeNode* topNode = dqDecodePtr( top );
105 int topIndx = dqDecodeIdx( top );
// Case 1: both words name the same node, and the bottom index equals the top
// index or sits one above it.
107 if( (botNode == topNode) &&
108 (botIndx == topIndx || botIndx == (topIndx+1))
// Case 2: bottom is in the node that follows top's node while top is at the
// last slot of its own node. This dereferences topNode->next.
113 if( (botNode == topNode->next) &&
115 (topIndx == DQNODE_ARRAYSIZE - 1)
// Set up a deque: create its memory pool, take two aligned nodes from it and
// point both bottom and top at the last slot of the first node.
//   dq: deque to initialise; memPool, bottom and top are overwritten.
// NOTE(review): the poolalloc() results are not checked before use. How node
// b is used is not visible (numbering gap 130 -> 135); presumably a and b are
// linked to each other there -- confirm in the full file.
125 void dqInit( deque* dq ) {
// Pool elements are oversized so an aligned node fits inside each one.
127 dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
129 dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
130 dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
// Same node and index for both ends; only the tags differ (dummy tag for
// bottom, 0 for top).
135 dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
136 dq->top = dqEncode( 0, a, DQNODE_ARRAYSIZE - 1 );
// Store an item in the slot that bottom currently names, then move bottom to
// the next free slot, adding a fresh node when the current one is used up.
//   dq:   deque to push onto
//   item: value stored in the slot
// NOTE(review): several lines are missing from this view (numbering gaps such
// as 140 -> 144, 151 -> 156, 158 -> 161): the test guarding the printf, the
// declarations of newNode/newIndx, the newNode assignment on the first path
// and the else keyword. The poolalloc() result is not checked.
140 void dqPushBottom( deque* dq, void* item ) {
144 printf( "Pushing invalid work into the deque.\n" );
// dq->bottom is read twice here, once for the node and once for the index.
148 dequeNode* currNode = dqDecodePtr( dq->bottom );
149 int currIndx = dqDecodeIdx( dq->bottom );
// The item is written before bottom is republished at the end.
151 currNode->itsDataArr[currIndx] = item;
// Room left in this node: bottom moves down one index.
156 if( currIndx != 0 ) {
158 newIndx = currIndx - 1;
// Index was 0 (presumably the else arm): take a new node from the pool, link
// it in front of the current node in both directions and start at its last
// slot.
161 newNode = dqGet4096aligned( poolalloc( dq->memPool ) );
162 newNode->next = currNode;
163 currNode->prev = newNode;
164 newIndx = DQNODE_ARRAYSIZE - 1;
// Publish the new bottom with a plain store.
167 dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
// Try to take the item at the top end of the deque by advancing dq->top with
// a single CAS.
//   dq: deque to pop from
// NOTE(review): every return statement is missing from this view, as are the
// else keywords and some declarations/assignments (numbering gaps such as
// 187 -> 194, 199 -> 201, 203 -> 206, 228 -> 238). Presumably the function
// returns the item on CAS success and DQ_POP_EMPTY / DQ_POP_ABORT otherwise
// -- confirm in the full file.
171 void* dqPopTop( deque* dq ) {
// Snapshot top once; all decoding below works on this snapshot.
173 INTPTR currTop = dq->top;
175 int currTopTag = dqDecodeTag( currTop );
176 dequeNode* currTopNode = dqDecodePtr( currTop );
177 int currTopIndx = dqDecodeIdx( currTop );
180 // read of top followed by read of bottom, algorithm
181 // says specifically must be in this order
// NOTE(review): no fence is visible between the two reads in this view
// (numbering gap 181 -> 184) -- confirm one exists in the full file.
184 INTPTR currBottom = dq->bottom;
// If the snapshots look empty, top is re-read to see whether it moved in the
// meantime; the two outcomes of that re-check are not visible here.
186 if( dqIndicateEmpty( currBottom, currTop ) ) {
187 if( currTop == dq->top ) {
194 dequeNode* nodeToFree;
196 dequeNode* newTopNode;
// Not at slot 0: stay in the same node with the same tag, index minus one.
// NOTE(review): nodeToFree is not visibly assigned on this path (line 200 is
// absent from this view) although it is compared with NULL below -- confirm.
199 if( currTopIndx != 0 ) {
201 newTopTag = currTopTag;
202 newTopNode = currTopNode;
203 newTopIndx = currTopIndx - 1;
// At slot 0 (presumably the else arm): top moves to the last slot of the
// prev node, the tag is bumped, and the node after the current one becomes
// the candidate for release.
206 nodeToFree = currTopNode->next;
207 newTopTag = currTopTag + 1;
208 newTopNode = currTopNode->prev;
209 newTopIndx = DQNODE_ARRAYSIZE - 1;
// The item is read before the CAS that claims it.
212 void* retVal = currTopNode->itsDataArr[currTopIndx];
214 INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
216 // algorithm states above should happen
217 // before attempting the CAS
// The CAS result is compared with the expected value to decide success.
220 INTPTR actualTop = (INTPTR)
221 CAS( &(dq->top), // location
222 currTop, // expected value
223 newTop ); // desired value
225 if( actualTop == currTop ) {
// NOTE(review): the pointer released here is a 4096-aligned node address, not
// the raw address poolalloc() returned -- confirm the pool accepts that.
227 if( nodeToFree != NULL ) {
228 poolfreeinto( dq->memPool, nodeToFree );
// Try to take the most recently pushed item from the bottom end: move bottom
// back by one slot, then check against top whether that slot was still there
// to take.
//   dq: deque to pop from
// NOTE(review): every return statement, the else keywords, the declaration of
// newBotIndx, its assignment on the node-crossing path (numbering gap
// 253 -> 257) and the end of the function are missing from this view.
// Presumably the function returns retVal on success and DQ_POP_EMPTY
// otherwise -- confirm in the full file.
238 void* dqPopBottom ( deque* dq ) {
240 INTPTR oldBot = dq->bottom;
242 dequeNode* oldBotNode = dqDecodePtr( oldBot );
243 int oldBotIndx = dqDecodeIdx( oldBot );
245 dequeNode* newBotNode;
// Not at the last slot: the new bottom is the next higher index in the same
// node.
248 if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
249 newBotNode = oldBotNode;
250 newBotIndx = oldBotIndx + 1;
// At the last slot (presumably the else arm): the new bottom is in the next
// node.
253 newBotNode = oldBotNode->next;
// Read the candidate item, then publish the moved bottom with a plain store.
257 void* retVal = newBotNode->itsDataArr[newBotIndx];
259 dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
261 // algorithm states above should happen
262 // before attempting the CAS
// NOTE(review): no fence is visible between the store above and this read of
// top (numbering gap 262 -> 265) -- confirm one exists in the full file.
265 INTPTR currTop = dq->top;
267 int currTopTag = dqDecodeTag( currTop );
268 dequeNode* currTopNode = dqDecodePtr( currTop );
269 int currTopIndx = dqDecodeIdx( currTop );
// Old bottom coincides with top: bottom is put back to its old value.
271 if( oldBotNode == currTopNode &&
272 oldBotIndx == currTopIndx ) {
273 dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
// New bottom coincides with top: a CAS on top that changes only the tag
// (same node and index, tag + 1) decides who gets the slot.
276 } else if( newBotNode == currTopNode &&
277 newBotIndx == currTopIndx ) {
278 INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
280 INTPTR actualTop = (INTPTR)
281 CAS( &(dq->top), // location
282 currTop, // expected value
283 newTop ); // desired value
// CAS matched the expected top: if bottom crossed into another node, the old
// node goes back to the pool.
// NOTE(review): the pointer released is a 4096-aligned node address, not the
// raw address poolalloc() returned -- confirm the pool accepts that.
285 if( actualTop == currTop ) {
287 if( oldBotNode != newBotNode ) {
288 poolfreeinto( dq->memPool, oldBotNode );
// CAS did not match (presumably the else arm): bottom is put back.
293 dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
// Neither comparison with top matched (presumably the final else arm): the
// old node is released if bottom crossed into another node.
298 if( oldBotNode != newBotNode ) {
299 poolfreeinto( dq->memPool, oldBotNode );