1 ////////////////////////////////////////////////////////////////
3 // This is an implementation of the structure described in
4 // A Dynamic-Sized Nonblocking Work Stealing Deque
5 // Hendler, Lev, Moir, and Shavit
7 // The bottom and top values for the deque must be CAS-able
8 // and fit into 64 bits. Our strategy for this is:
10 // 19-bit Tag 36-bit Node Pointer 9-bit Index
11 // +-----------+-------------------------+------------+
12 // | 63 ... 45 | 44 ... 9 | 8 ... 0 |
13 // +-----------+-------------------------+------------+
15 // Let's call the encoded info E. To retrieve the values:
16 // tag = (0xffffe00000000000 & E) >> 45;
17 // ptr = (0x00001ffffffffe00 & E) << 3;
18 // idx = (0x00000000000001ff & E);
20 // Increment the tag without decrypting:
21 // E = (0x00001fffffffffff | E) + 1;
23 // Increment (decrement) the index when it is not equal to
24 // MAXINDEX (0) with E++ (E--).
26 // x86 64-bit processors currently only use the lowest 48 bits for
27 // virtual addresses, source:
28 // http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 // And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 // of a 64-bit pointer are always zero. This means if we are only
31 // alloted 36 bits to store a pointer to a Node we have
32 // 48 - 3 - 36 = 9 bits that could be lost. Instead of aligning Node
33 // pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 // sure the lower 12 bits of the address are zero. THEREFORE:
35 // Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 // we can ecnode the rest in 36 bits without a loss of information.
38 ////////////////////////////////////////////////////////////////
49 void* DQ_POP_EMPTY = (void*)0x1;
50 void* DQ_POP_ABORT = (void*)0x3;
53 // define a 19-bit dummy tag for the bottom
54 // value with a pattern that will expose errors
55 #define BOTTOM_NULL_TAG 0x40001
59 // the dequeNode struct must be 4096-byte aligned,
60 // see above, so use the following magic to ask
61 // the allocator for a space that wastes 4095 bytes
62 // but gaurantees the address of the struct within
63 // that space is 4096-aligned
64 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
66 static inline dequeNode* dqGet4096aligned( void* fromAllocator ) {
67 INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
69 printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
70 printf( "aligned: 0x%08x to 0x%08x\n", aligned, aligned + sizeof( dequeNode ) );
71 memset( (void*) aligned, 0, sizeof( dequeNode ) );
73 return (dequeNode*) aligned;
77 static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
78 INTPTR ptrE = (0x00001ffffffffe00 & // second, mask off the addr's high-order 1's
79 (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
82 (((INTPTR)tag) << 45) |
86 int tagOut = dqDecodeTag( E );
87 if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
89 dequeNode* ptrOut = dqDecodePtr( E );
90 if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
92 int idxOut = dqDecodeIdx( E );
93 if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
99 static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
100 dequeNode* botNode = dqDecodePtr( bottom );
101 int botIndx = dqDecodeIdx( bottom );
102 dequeNode* topNode = dqDecodePtr( top );
103 int topIndx = dqDecodeIdx( top );
105 if( (botNode == topNode) &&
106 (botIndx == topIndx || botIndx == (topIndx+1))
111 if( (botNode == topNode->next) &&
113 (topIndx == DQNODE_ARRAYSIZE - 1)
123 void dqInit( deque* dq ) {
125 dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
127 dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
128 dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
133 dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
134 dq->top = dqEncode( 0, a, DQNODE_ARRAYSIZE - 1 );
138 void dqPushBottom( deque* dq, void* item ) {
140 dequeNode* currNode = dqDecodePtr( dq->bottom );
141 int currIndx = dqDecodeIdx( dq->bottom );
143 currNode->itsDataArr[currIndx] = item;
148 if( currIndx != 0 ) {
150 newIndx = currIndx - 1;
153 newNode = dqGet4096aligned( poolalloc( dq->memPool ) );
154 newNode->next = currNode;
155 currNode->prev = newNode;
156 newIndx = DQNODE_ARRAYSIZE - 1;
159 dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
163 void* dqPopTop( deque* dq ) {
165 INTPTR currTop = dq->top;
167 int currTopTag = dqDecodeTag( currTop );
168 dequeNode* currTopNode = dqDecodePtr( currTop );
169 int currTopIndx = dqDecodeIdx( currTop );
171 // read of top followed by read of bottom, algorithm
172 // says specifically must be in this order
175 INTPTR currBottom = dq->bottom;
177 if( dqIndicateEmpty( currBottom, currTop ) ) {
178 if( currTop == dq->top ) {
185 dequeNode* nodeToFree;
187 dequeNode* newTopNode;
190 if( currTopIndx != 0 ) {
192 newTopTag = currTopTag;
193 newTopNode = currTopNode;
194 newTopIndx = currTopIndx - 1;
197 nodeToFree = currTopNode->next;
198 newTopTag = currTopTag + 1;
199 newTopNode = currTopNode->prev;
200 newTopIndx = DQNODE_ARRAYSIZE - 1;
203 void* retVal = currTopNode->itsDataArr[currTopIndx];
205 INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
207 // algorithm states above should happen
208 // before attempting the CAS
211 INTPTR actualTop = (INTPTR)
212 CAS( &(dq->top), // location
213 currTop, // expected value
214 newTop ); // desired value
216 if( actualTop == currTop ) {
218 if( nodeToFree != NULL ) {
219 poolfreeinto( dq->memPool, nodeToFree );
229 void* dqPopBottom ( deque* dq ) {
231 INTPTR oldBot = dq->bottom;
233 dequeNode* oldBotNode = dqDecodePtr( oldBot );
234 int oldBotIndx = dqDecodeIdx( oldBot );
236 dequeNode* newBotNode;
239 if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
240 newBotNode = oldBotNode;
241 newBotIndx = oldBotIndx + 1;
244 newBotNode = oldBotNode->next;
248 void* retVal = newBotNode->itsDataArr[newBotIndx];
250 dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
252 // algorithm states above should happen
253 // before attempting the CAS
256 INTPTR currTop = dq->top;
258 int currTopTag = dqDecodeTag( currTop );
259 dequeNode* currTopNode = dqDecodePtr( currTop );
260 int currTopIndx = dqDecodeIdx( currTop );
262 if( oldBotNode == currTopNode &&
263 oldBotIndx == currTopIndx ) {
264 dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
267 } else if( newBotNode == currTopNode &&
268 newBotIndx == currTopIndx ) {
269 INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
271 INTPTR actualTop = (INTPTR)
272 CAS( &(dq->top), // location
273 currTop, // expected value
274 newTop ); // desired value
276 if( actualTop == currTop ) {
278 if( oldBotNode != newBotNode ) {
279 poolfreeinto( dq->memPool, oldBotNode );
284 dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
289 if( oldBotNode != newBotNode ) {
290 poolfreeinto( dq->memPool, oldBotNode );