1 ////////////////////////////////////////////////////////////////
3 // This is an implementation of the structure described in
4 // A Dynamic-Sized Nonblocking Work Stealing Deque
5 // Hendler, Lev, Moir, and Shavit
7 // The bottom and top values for the deque must be CAS-able
8 // and fit into 64 bits. Our strategy for this is:
10 // 19-bit Tag 36-bit Node Pointer 9-bit Index
11 // +-----------+-------------------------+------------+
12 // | 63 ... 45 | 44 ... 9 | 8 ... 0 |
13 // +-----------+-------------------------+------------+
15 // Let's call the encoded info E. To retrieve the values:
16 // tag = (0xffffe00000000000 & E) >> 45;
17 // ptr = (0x00001ffffffffe00 & E) << 3;
18 // idx = (0x00000000000001ff & E);
20 // Increment the tag without decrypting:
21 // E = (0x00001fffffffffff | E) + 1;
23 // Increment (decrement) the index when it is not equal to
24 // MAXINDEX (0) with E++ (E--).
26 // x86 64-bit processors currently only use the lowest 48 bits for
27 // virtual addresses, source:
28 // http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 // And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 // of a 64-bit pointer are always zero. This means if we are only
31 // alloted 36 bits to store a pointer to a Node we have
32 // 48 - 3 - 36 = 9 bits that could be lost. Instead of aligning Node
33 // pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 // sure the lower 12 bits of the address are zero. THEREFORE:
35 // Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 // we can ecnode the rest in 36 bits without a loss of information.
38 ////////////////////////////////////////////////////////////////
49 void* DQ_POP_EMPTY = (void*)0x17;
50 void* DQ_POP_ABORT = (void*)0x33;
53 // define a 19-bit dummy tag for the bottom
54 // value with a pattern that will expose errors
55 #define BOTTOM_NULL_TAG 0x40001
59 // the dequeNode struct must be 4096-byte aligned,
60 // see above, so use the following magic to ask
61 // the allocator for a space that wastes 4095 bytes
62 // but gaurantees the address of the struct within
63 // that space is 4096-aligned
64 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
66 static inline dequeNode* dqGet4096aligned(void* fromAllocator) {
67 INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
70 //printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
71 //printf( "aligned: 0x%08x to 0x%08x\n", aligned, aligned + sizeof( dequeNode ) );
72 memset( (void*) aligned, 0x9, sizeof( dequeNode ) );
75 return (dequeNode*) aligned;
79 static inline INTPTR dqEncode(int tag, dequeNode* ptr, int idx) {
80 INTPTR ptrE = (0x00001ffffffffe00 & // second, mask off the addr's high-order 1's
81 (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
84 (((INTPTR)tag) << 45) |
88 int tagOut = dqDecodeTag(E);
90 printf("Lost tag information.\n"); exit(-1);
93 dequeNode* ptrOut = dqDecodePtr(E);
95 printf("Lost ptr information.\n"); exit(-1);
98 int idxOut = dqDecodeIdx(E);
100 printf("Lost idx information.\n"); exit(-1);
107 static inline int dqIndicateEmpty(INTPTR bottom, INTPTR top) {
108 dequeNode* botNode = dqDecodePtr(bottom);
109 int botIndx = dqDecodeIdx(bottom);
110 dequeNode* topNode = dqDecodePtr(top);
111 int topIndx = dqDecodeIdx(top);
113 if( (botNode == topNode) &&
114 (botIndx == topIndx || botIndx == (topIndx+1))
119 if( (botNode == topNode->next) &&
121 (topIndx == DQNODE_ARRAYSIZE - 1)
131 void dqInit(deque* dq) {
133 dq->memPool = poolcreate(DQNODE_SIZETOREQUEST, NULL);
135 dequeNode* a = dqGet4096aligned(poolalloc(dq->memPool) );
136 dequeNode* b = dqGet4096aligned(poolalloc(dq->memPool) );
141 dq->bottom = dqEncode(BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1);
142 dq->top = dqEncode(0, a, DQNODE_ARRAYSIZE - 1);
146 void dqPushBottom(deque* dq, void* item) {
150 printf("Pushing invalid work into the deque.\n");
154 dequeNode* currNode = dqDecodePtr(dq->bottom);
155 int currIndx = dqDecodeIdx(dq->bottom);
157 currNode->itsDataArr[currIndx] = item;
162 if( currIndx != 0 ) {
164 newIndx = currIndx - 1;
167 newNode = dqGet4096aligned(poolalloc(dq->memPool) );
168 newNode->next = currNode;
169 currNode->prev = newNode;
170 newIndx = DQNODE_ARRAYSIZE - 1;
173 dq->bottom = dqEncode(BOTTOM_NULL_TAG, newNode, newIndx);
177 void* dqPopTop(deque* dq) {
179 INTPTR currTop = dq->top;
181 int currTopTag = dqDecodeTag(currTop);
182 dequeNode* currTopNode = dqDecodePtr(currTop);
183 int currTopIndx = dqDecodeIdx(currTop);
186 // read of top followed by read of bottom, algorithm
187 // says specifically must be in this order
190 INTPTR currBottom = dq->bottom;
192 if( dqIndicateEmpty(currBottom, currTop) ) {
193 if( currTop == dq->top ) {
200 dequeNode* nodeToFree;
202 dequeNode* newTopNode;
205 if( currTopIndx != 0 ) {
207 newTopTag = currTopTag;
208 newTopNode = currTopNode;
209 newTopIndx = currTopIndx - 1;
212 nodeToFree = currTopNode->next;
213 newTopTag = currTopTag + 1;
214 newTopNode = currTopNode->prev;
215 newTopIndx = DQNODE_ARRAYSIZE - 1;
218 void* retVal = currTopNode->itsDataArr[currTopIndx];
220 INTPTR newTop = dqEncode(newTopTag, newTopNode, newTopIndx);
222 // algorithm states above should happen
223 // before attempting the CAS
226 INTPTR actualTop = (INTPTR)
227 CAS(&(dq->top), // location
228 currTop, // expected value
229 newTop); // desired value
231 if( actualTop == currTop ) {
233 if( nodeToFree != NULL ) {
234 poolfreeinto(dq->memPool, nodeToFree);
244 void* dqPopBottom(deque* dq) {
246 INTPTR oldBot = dq->bottom;
248 dequeNode* oldBotNode = dqDecodePtr(oldBot);
249 int oldBotIndx = dqDecodeIdx(oldBot);
251 dequeNode* newBotNode;
254 if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
255 newBotNode = oldBotNode;
256 newBotIndx = oldBotIndx + 1;
259 newBotNode = oldBotNode->next;
263 void* retVal = newBotNode->itsDataArr[newBotIndx];
265 dq->bottom = dqEncode(BOTTOM_NULL_TAG, newBotNode, newBotIndx);
267 // algorithm states above should happen
268 // before attempting the CAS
271 INTPTR currTop = dq->top;
273 int currTopTag = dqDecodeTag(currTop);
274 dequeNode* currTopNode = dqDecodePtr(currTop);
275 int currTopIndx = dqDecodeIdx(currTop);
277 if( oldBotNode == currTopNode &&
278 oldBotIndx == currTopIndx ) {
279 dq->bottom = dqEncode(BOTTOM_NULL_TAG, oldBotNode, oldBotIndx);
282 } else if( newBotNode == currTopNode &&
283 newBotIndx == currTopIndx ) {
284 INTPTR newTop = dqEncode(currTopTag + 1, currTopNode, currTopIndx);
286 INTPTR actualTop = (INTPTR)
287 CAS(&(dq->top), // location
288 currTop, // expected value
289 newTop); // desired value
291 if( actualTop == currTop ) {
293 if( oldBotNode != newBotNode ) {
294 poolfreeinto(dq->memPool, oldBotNode);
299 dq->bottom = dqEncode(BOTTOM_NULL_TAG, oldBotNode, oldBotIndx);
304 if( oldBotNode != newBotNode ) {
305 poolfreeinto(dq->memPool, oldBotNode);