a347c01aa507f9c0ae8dea782ab17234b5bad3a9
[IRC.git] / Robust / src / Runtime / deque.c
1 ////////////////////////////////////////////////////////////////
2 //
3 //  This is an implementation of the structure described in
4 //  A Dynamic-Sized Nonblocking Work Stealing Deque
5 //  Hendler, Lev, Moir, and Shavit
6 //   
7 //  The bottom and top values for the deque must be CAS-able
8 //  and fit into 64 bits.  Our strategy for this is:
9 //  
10 //    19-bit Tag    36-bit Node Pointer     9-bit Index
11 //   +-----------+-------------------------+------------+
12 //   | 63 ... 45 | 44 ...                9 | 8 ...    0 |
13 //   +-----------+-------------------------+------------+
14 //
15 //  Let's call the encoded info E.  To retrieve the values:  
16 //    tag = (0xffffe00000000000 & E) >> 45;
17 //    ptr = (0x00001ffffffffe00 & E) <<  3;
18 //    idx = (0x00000000000001ff & E);
19 //
20 //  Increment the tag without decrypting:
21 //    E = (0x00001fffffffffff | E) + 1;
22 //
23 //  Increment (decrement) the index when it is not equal to
24 //  MAXINDEX (0) with E++ (E--).
25 //
26 //  x86 64-bit processors currently only use the lowest 48 bits for
27 //  virtual addresses, source:
28 //  http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 //  And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 //  of a 64-bit pointer are always zero.  This means if we are only
31 //  alloted 36 bits to store a pointer to a Node we have 
32 //  48 - 3 - 36 = 9 bits that could be lost.  Instead of aligning Node
33 //  pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 //  sure the lower 12 bits of the address are zero.  THEREFORE:
35 //  Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 //  we can ecnode the rest in 36 bits without a loss of information.  
37 //
38 ////////////////////////////////////////////////////////////////
39
40 #ifdef DEBUG_DEQUE
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #endif
45
46 #include "deque.h"
47
48
49 void* DQ_POP_EMPTY = (void*)0x1;
50 void* DQ_POP_ABORT = (void*)0x3;
51
52
53 // define a 19-bit dummy tag for the bottom
54 // value with a pattern that will expose errors
55 #define BOTTOM_NULL_TAG 0x40001
56
57
58
59 // the dequeNode struct must be 4096-byte aligned, 
60 // see above, so use the following magic to ask
61 // the allocator for a space that wastes 4095 bytes
62 // but gaurantees the address of the struct within
63 // that space is 4096-aligned
64 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
65
66 static inline dequeNode* dqGet4096aligned( void* fromAllocator ) { 
67   INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
68
69 #ifdef DEBUG_DEQUE
70   //printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
71   //printf( "aligned:        0x%08x to 0x%08x\n", aligned,               aligned               + sizeof( dequeNode )  );
72   //memset( (void*) aligned, 0, sizeof( dequeNode ) );
73 #endif
74
75   return (dequeNode*) aligned;
76 }
77
78
79 static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
80   INTPTR ptrE = (0x00001ffffffffe00 &  // second, mask off the addr's high-order 1's
81                  (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
82
83   INTPTR E =
84     (((INTPTR)tag) << 45) |
85     (ptrE)                |
86     ((INTPTR)idx);
87 #ifdef DEBUG_DEQUE
88   int tagOut = dqDecodeTag( E ); 
89   if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
90
91   dequeNode* ptrOut = dqDecodePtr( E );
92   if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
93
94   int idxOut = dqDecodeIdx( E );
95   if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
96 #endif
97   return E;
98 }
99
100
101 static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
102   dequeNode* botNode = dqDecodePtr( bottom );
103   int        botIndx = dqDecodeIdx( bottom );
104   dequeNode* topNode = dqDecodePtr( top );
105   int        topIndx = dqDecodeIdx( top );  
106
107   if( (botNode == topNode) &&
108       (botIndx == topIndx || botIndx == (topIndx+1))
109       ) {
110     return 1;
111   }
112
113   if( (botNode == topNode->next) &&
114       (botIndx == 0)             &&
115       (topIndx == DQNODE_ARRAYSIZE - 1)
116       ) {
117     return 1;
118   }
119
120   return 0;
121 }
122
123
124
125 void dqInit( deque* dq ) {
126
127   dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
128
129   dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
130   dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
131   
132   a->next = b;
133   b->prev = a;
134
135   dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
136   dq->top    = dqEncode( 0,               a, DQNODE_ARRAYSIZE - 1 );
137 }
138
139
140 void dqPushBottom( deque* dq, void* item ) {
141
142 #ifdef DEBUG_DEQUE
143   if( item == 0x0 ) {
144     printf( "Pushing invalid work into the deque.\n" );
145   }
146 #endif
147
148   dequeNode* currNode = dqDecodePtr( dq->bottom );
149   int        currIndx = dqDecodeIdx( dq->bottom );
150
151   currNode->itsDataArr[currIndx] = item;
152
153   dequeNode* newNode;
154   int        newIndx;
155
156   if( currIndx != 0 ) {
157     newNode = currNode;
158     newIndx = currIndx - 1;
159
160   } else {
161     newNode        = dqGet4096aligned( poolalloc( dq->memPool ) );
162     newNode->next  = currNode;
163     currNode->prev = newNode;
164     newIndx        = DQNODE_ARRAYSIZE - 1;
165   }
166
167   dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
168 }
169
170
171 void* dqPopTop( deque* dq ) {
172
173   INTPTR currTop = dq->top;
174
175   int        currTopTag  = dqDecodeTag( currTop );
176   dequeNode* currTopNode = dqDecodePtr( currTop );
177   int        currTopIndx = dqDecodeIdx( currTop );
178
179   // read of top followed by read of bottom, algorithm
180   // says specifically must be in this order
181   BARRIER();
182   
183   INTPTR currBottom = dq->bottom;
184
185   if( dqIndicateEmpty( currBottom, currTop ) ) {
186     if( currTop == dq->top ) {
187       return DQ_POP_EMPTY;
188     } else {
189       return DQ_POP_ABORT;
190     }
191   }
192
193   dequeNode* nodeToFree;
194   int        newTopTag;
195   dequeNode* newTopNode;
196   int        newTopIndx;
197
198   if( currTopIndx != 0 ) {
199     nodeToFree = NULL;
200     newTopTag  = currTopTag;
201     newTopNode = currTopNode;
202     newTopIndx = currTopIndx - 1;
203
204   } else {
205     nodeToFree = currTopNode->next;
206     newTopTag  = currTopTag + 1;
207     newTopNode = currTopNode->prev;
208     newTopIndx = DQNODE_ARRAYSIZE - 1;
209   }
210
211   void* retVal = currTopNode->itsDataArr[currTopIndx];
212
213   INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
214
215   // algorithm states above should happen
216   // before attempting the CAS
217   BARRIER();
218
219   INTPTR actualTop = (INTPTR)
220     CAS( &(dq->top), // location
221          currTop,    // expected value
222          newTop );   // desired value
223
224   if( actualTop == currTop ) {
225     // CAS succeeded
226     if( nodeToFree != NULL ) {
227       poolfreeinto( dq->memPool, nodeToFree );
228     }
229     return retVal;
230
231   } else {
232     return DQ_POP_ABORT;
233   }
234 }
235
236
237 void* dqPopBottom ( deque* dq ) {
238
239   INTPTR oldBot = dq->bottom;
240
241   dequeNode* oldBotNode = dqDecodePtr( oldBot );
242   int        oldBotIndx = dqDecodeIdx( oldBot );
243   
244   dequeNode* newBotNode;
245   int        newBotIndx;
246
247   if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
248     newBotNode = oldBotNode;
249     newBotIndx = oldBotIndx + 1;
250
251   } else {
252     newBotNode = oldBotNode->next;
253     newBotIndx = 0;
254   }
255
256   void* retVal = newBotNode->itsDataArr[newBotIndx];
257
258   dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
259
260   // algorithm states above should happen
261   // before attempting the CAS
262   BARRIER();
263
264   INTPTR currTop = dq->top;
265
266   int        currTopTag  = dqDecodeTag( currTop );
267   dequeNode* currTopNode = dqDecodePtr( currTop );
268   int        currTopIndx = dqDecodeIdx( currTop );
269
270   if( oldBotNode == currTopNode &&
271       oldBotIndx == currTopIndx ) {
272     dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
273     return DQ_POP_EMPTY;
274
275   } else if( newBotNode == currTopNode &&
276              newBotIndx == currTopIndx ) {
277     INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
278
279     INTPTR actualTop = (INTPTR)
280       CAS( &(dq->top), // location
281            currTop,    // expected value
282            newTop );   // desired value
283
284     if( actualTop == currTop ) {
285       // CAS succeeded
286       if( oldBotNode != newBotNode ) {
287         poolfreeinto( dq->memPool, oldBotNode );
288       }
289       return retVal;
290       
291     } else {
292       dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );      
293       return DQ_POP_EMPTY;
294     }
295     
296   } else {
297     if( oldBotNode != newBotNode ) {
298       poolfreeinto( dq->memPool, oldBotNode );
299     }
300     return retVal;    
301   }
302 }