1 ////////////////////////////////////////////////////////////////
3 // This is an implementation of the structure described in
4 // A Dynamic-Sized Nonblocking Work Stealing Deque
5 // Hendler, Lev, Moir, and Shavit
7 // The bottom and top values for the deque must be CAS-able
8 // and fit into 64 bits. Our strategy for this is:
10 // 19-bit Tag 36-bit Node Pointer 9-bit Index
11 // +-----------+-------------------------+------------+
12 // | 63 ... 45 | 44 ... 9 | 8 ... 0 |
13 // +-----------+-------------------------+------------+
15 // Let's call the encoded info E. To retrieve the values:
16 // tag = (0xffffe00000000000 & E) >> 45;
17 // ptr = (0x00001ffffffffe00 & E) << 3;
18 // idx = (0x00000000000001ff & E);
20 // Increment the tag without decrypting:
21 // E = (0x00001fffffffffff | E) + 1;
23 // Increment (decrement) the index when it is not equal to
24 // MAXINDEX (0) with E++ (E--).
26 // x86 64-bit processors currently only use the lowest 48 bits for
27 // virtual addresses, source:
28 // http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 // And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 // of a 64-bit pointer are always zero. This means if we are only
31 // alloted 36 bits to store a pointer to a Node we have
32 // 48 - 3 - 36 = 9 bits that could be lost. Instead of aligning Node
33 // pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 // sure the lower 12 bits of the address are zero. THEREFORE:
35 // Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 // we can ecnode the rest in 36 bits without a loss of information.
38 ////////////////////////////////////////////////////////////////
48 void* DQ_POP_EMPTY = (void*)0x1;
49 void* DQ_POP_ABORT = (void*)0x3;
52 // define a 19-bit dummy tag for the bottom
53 // value with a pattern that will expose errors
54 #define BOTTOM_NULL_TAG 0x40001
57 // there are 9 bits for the index into a Node's array,
58 // so 2^9 = 512 elements per node of the deque
59 #define DQNODE_ARRAYSIZE 512
62 typedef struct dequeNode_t {
63 void* itsDataArr[DQNODE_ARRAYSIZE];
64 struct dequeNode_t* next;
65 struct dequeNode_t* prev;
69 // the dequeNode struct must be 4096-byte aligned,
70 // see above, so use the following magic to ask
71 // the allocator for a space that wastes 4095 bytes
72 // but gaurantees the address of the struct within
73 // that space is 4096-aligned
74 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
76 static inline dequeNode* dqGet4096aligned( void* fromAllocator ) {
77 return (dequeNode*) ( ((INTPTR)fromAllocator) & (~4095) );
82 static inline int dqDecodeTag( INTPTR E ) { return (int) ((0xffffe00000000000 & E) >> 45); }
83 static inline dequeNode* dqDecodePtr( INTPTR E ) { return (dequeNode*) ((0x00001ffffffffe00 & E) << 3); }
84 static inline int dqDecodeIdx( INTPTR E ) { return (int) ((0x00000000000001ff & E) ); }
88 static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
89 INTPTR ptrE = (0x00001ffffffffe00 & // second, mask off the addr's high-order 1's
90 (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
93 (((INTPTR)tag) << 45) |
97 int tagOut = dqDecodeTag( E );
98 if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
100 dequeNode* ptrOut = dqDecodePtr( E );
101 if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
103 int idxOut = dqDecodeIdx( E );
104 if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
110 static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
111 dequeNode* botNode = dqDecodePtr( bottom );
112 int botIndx = dqDecodeIdx( bottom );
113 dequeNode* topNode = dqDecodePtr( top );
114 int topIndx = dqDecodeIdx( top );
116 if( (botNode == topNode) &&
117 (botIndx == topIndx || botIndx == (topIndx+1))
122 if( (botNode == topNode->next) &&
124 (topIndx == DQNODE_ARRAYSIZE - 1)
134 void dqInit( deque* dq ) {
136 dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
138 dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
139 dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
144 dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
145 dq->top = dqEncode( 0, a, DQNODE_ARRAYSIZE - 1 );
149 void dqPushBottom( deque* dq, void* item ) {
151 dequeNode* currNode = dqDecodePtr( dq->bottom );
152 int currIndx = dqDecodeIdx( dq->bottom );
154 currNode->itsDataArr[currIndx] = item;
159 if( currIndx != 0 ) {
161 newIndx = currIndx - 1;
164 newNode = dqGet4096aligned( poolalloc( dq->memPool ) );
165 newNode->next = currNode;
166 currNode->prev = newNode;
167 newIndx = DQNODE_ARRAYSIZE - 1;
170 dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
174 void* dqPopTop( deque* dq ) {
176 INTPTR currTop = dq->top;
178 int currTopTag = dqDecodeTag( currTop );
179 dequeNode* currTopNode = dqDecodePtr( currTop );
180 int currTopIndx = dqDecodeIdx( currTop );
182 INTPTR currBottom = dq->bottom;
184 if( dqIndicateEmpty( currBottom, currTop ) ) {
185 if( currTop == dq->top ) {
192 dequeNode* nodeToFree;
194 dequeNode* newTopNode;
197 if( currTopIndx != 0 ) {
199 newTopTag = currTopTag;
200 newTopNode = currTopNode;
201 newTopIndx = currTopIndx - 1;
204 nodeToFree = currTopNode->next;
205 newTopTag = currTopTag + 1;
206 newTopNode = currTopNode->prev;
207 newTopIndx = DQNODE_ARRAYSIZE - 1;
210 void* retVal = currTopNode->itsDataArr[currTopIndx];
212 INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
214 INTPTR actualTop = (INTPTR)
215 CAS( &(dq->top), // location
216 currTop, // expected value
217 newTop ); // desired value
219 if( actualTop == currTop ) {
221 if( nodeToFree != NULL ) {
222 poolfreeinto( dq->memPool, nodeToFree );
232 void* dqPopBottom ( deque* dq ) {
234 INTPTR oldBot = dq->bottom;
236 dequeNode* oldBotNode = dqDecodePtr( oldBot );
237 int oldBotIndx = dqDecodeIdx( oldBot );
239 dequeNode* newBotNode;
242 if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
243 newBotNode = oldBotNode;
244 newBotIndx = oldBotIndx + 1;
247 newBotNode = oldBotNode->next;
251 void* retVal = newBotNode->itsDataArr[newBotIndx];
253 dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
255 INTPTR currTop = dq->top;
257 int currTopTag = dqDecodeTag( currTop );
258 dequeNode* currTopNode = dqDecodePtr( currTop );
259 int currTopIndx = dqDecodeIdx( currTop );
261 if( oldBotNode == currTopNode &&
262 oldBotIndx == currTopIndx ) {
263 dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
266 } else if( newBotNode == currTopNode &&
267 newBotIndx == currTopIndx ) {
268 INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
270 INTPTR actualTop = (INTPTR)
271 CAS( &(dq->top), // location
272 currTop, // expected value
273 newTop ); // desired value
275 if( actualTop == currTop ) {
277 if( oldBotNode != newBotNode ) {
278 poolfreeinto( dq->memPool, oldBotNode );
283 dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
288 if( oldBotNode != newBotNode ) {
289 poolfreeinto( dq->memPool, oldBotNode );