bug fixes
[IRC.git] / Robust / src / Runtime / deque.c
1 ////////////////////////////////////////////////////////////////
2 //
3 //  This is an implementation of the structure described in
4 //  A Dynamic-Sized Nonblocking Work Stealing Deque
5 //  Hendler, Lev, Moir, and Shavit
6 //   
7 //  The bottom and top values for the deque must be CAS-able
8 //  and fit into 64 bits.  Our strategy for this is:
9 //  
10 //    19-bit Tag    36-bit Node Pointer     9-bit Index
11 //   +-----------+-------------------------+------------+
12 //   | 63 ... 45 | 44 ...                9 | 8 ...    0 |
13 //   +-----------+-------------------------+------------+
14 //
15 //  Let's call the encoded info E.  To retrieve the values:  
16 //    tag = (0xffffe00000000000 & E) >> 45;
17 //    ptr = (0x00001ffffffffe00 & E) <<  3;
18 //    idx = (0x00000000000001ff & E);
19 //
20 //  Increment the tag without decrypting:
21 //    E = (0x00001fffffffffff | E) + 1;
22 //
23 //  Increment (decrement) the index when it is not equal to
24 //  MAXINDEX (0) with E++ (E--).
25 //
26 //  x86 64-bit processors currently only use the lowest 48 bits for
27 //  virtual addresses, source:
28 //  http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 //  And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 //  of a 64-bit pointer are always zero.  This means if we are only
31 //  alloted 36 bits to store a pointer to a Node we have 
32 //  48 - 3 - 36 = 9 bits that could be lost.  Instead of aligning Node
33 //  pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 //  sure the lower 12 bits of the address are zero.  THEREFORE:
35 //  Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 //  we can ecnode the rest in 36 bits without a loss of information.  
37 //
38 ////////////////////////////////////////////////////////////////
39
40 #ifdef DEBUG_DEQUE
41 #include <stdlib.h>
42 #include <stdio.h>
43 #endif
44
45 #include "deque.h"
46
47
48 void* DQ_POP_EMPTY = (void*)0x1;
49 void* DQ_POP_ABORT = (void*)0x3;
50
51
52 // define a 19-bit dummy tag for the bottom
53 // value with a pattern that will expose errors
54 #define BOTTOM_NULL_TAG 0x40001
55
56
57 // there are 9 bits for the index into a Node's array,
58 // so 2^9 = 512 elements per node of the deque
59 #define DQNODE_ARRAYSIZE 512
60
61
62 typedef struct dequeNode_t {
63   void* itsDataArr[DQNODE_ARRAYSIZE];
64   struct dequeNode_t* next;
65   struct dequeNode_t* prev;
66 } dequeNode;
67
68
69 // the dequeNode struct must be 4096-byte aligned, 
70 // see above, so use the following magic to ask
71 // the allocator for a space that wastes 4095 bytes
72 // but gaurantees the address of the struct within
73 // that space is 4096-aligned
74 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
75
76 static inline dequeNode* dqGet4096aligned( void* fromAllocator ) { 
77   return (dequeNode*) ( ((INTPTR)fromAllocator) & (~4095) );
78 }
79
80
81
82 static inline int        dqDecodeTag( INTPTR E ) { return (int)        ((0xffffe00000000000 & E) >> 45); }
83 static inline dequeNode* dqDecodePtr( INTPTR E ) { return (dequeNode*) ((0x00001ffffffffe00 & E) <<  3); }
84 static inline int        dqDecodeIdx( INTPTR E ) { return (int)        ((0x00000000000001ff & E)      ); }
85
86
87
88 static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
89   INTPTR ptrE = (0x00001ffffffffe00 &  // second, mask off the addr's high-order 1's
90                  (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
91
92   INTPTR E =
93     (((INTPTR)tag) << 45) |
94     (ptrE)                |
95     ((INTPTR)idx);
96 #ifdef DEBUG_DEQUE
97   int tagOut = dqDecodeTag( E ); 
98   if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
99
100   dequeNode* ptrOut = dqDecodePtr( E );
101   if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
102
103   int idxOut = dqDecodeIdx( E );
104   if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
105 #endif
106   return E;
107 }
108
109
110 static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
111   dequeNode* botNode = dqDecodePtr( bottom );
112   int        botIndx = dqDecodeIdx( bottom );
113   dequeNode* topNode = dqDecodePtr( top );
114   int        topIndx = dqDecodeIdx( top );  
115
116   if( (botNode == topNode) &&
117       (botIndx == topIndx || botIndx == (topIndx+1))
118       ) {
119     return 1;
120   }
121
122   if( (botNode == topNode->next) &&
123       (botIndx == 0)             &&
124       (topIndx == DQNODE_ARRAYSIZE - 1)
125       ) {
126     return 1;
127   }
128
129   return 0;
130 }
131
132
133
134 void dqInit( deque* dq ) {
135
136   dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
137
138   dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
139   dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
140   
141   a->next = b;
142   b->prev = a;
143
144   dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
145   dq->top    = dqEncode( 0,               a, DQNODE_ARRAYSIZE - 1 );
146 }
147
148
149 void dqPushBottom( deque* dq, void* item ) {
150
151   dequeNode* currNode = dqDecodePtr( dq->bottom );
152   int        currIndx = dqDecodeIdx( dq->bottom );
153
154   currNode->itsDataArr[currIndx] = item;
155
156   dequeNode* newNode;
157   int        newIndx;
158
159   if( currIndx != 0 ) {
160     newNode = currNode;
161     newIndx = currIndx - 1;
162
163   } else {
164     newNode        = dqGet4096aligned( poolalloc( dq->memPool ) );
165     newNode->next  = currNode;
166     currNode->prev = newNode;
167     newIndx        = DQNODE_ARRAYSIZE - 1;
168   }
169
170   dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
171 }
172
173
174 void* dqPopTop( deque* dq ) {
175
176   INTPTR currTop = dq->top;
177
178   int        currTopTag  = dqDecodeTag( currTop );
179   dequeNode* currTopNode = dqDecodePtr( currTop );
180   int        currTopIndx = dqDecodeIdx( currTop );
181   
182   INTPTR currBottom = dq->bottom;
183
184   if( dqIndicateEmpty( currBottom, currTop ) ) {
185     if( currTop == dq->top ) {
186       return DQ_POP_EMPTY;
187     } else {
188       return DQ_POP_ABORT;
189     }
190   }
191
192   dequeNode* nodeToFree;
193   int        newTopTag;
194   dequeNode* newTopNode;
195   int        newTopIndx;
196
197   if( currTopIndx != 0 ) {
198     nodeToFree = NULL;
199     newTopTag  = currTopTag;
200     newTopNode = currTopNode;
201     newTopIndx = currTopIndx - 1;
202
203   } else {
204     nodeToFree = currTopNode->next;
205     newTopTag  = currTopTag + 1;
206     newTopNode = currTopNode->prev;
207     newTopIndx = DQNODE_ARRAYSIZE - 1;
208   }
209
210   void* retVal = currTopNode->itsDataArr[currTopIndx];
211
212   INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
213
214   INTPTR actualTop = (INTPTR)
215     CAS( &(dq->top), // location
216          currTop,    // expected value
217          newTop );   // desired value
218
219   if( actualTop == currTop ) {
220     // CAS succeeded
221     if( nodeToFree != NULL ) {
222       poolfreeinto( dq->memPool, nodeToFree );
223     }
224     return retVal;
225
226   } else {
227     return DQ_POP_ABORT;
228   }
229 }
230
231
232 void* dqPopBottom ( deque* dq ) {
233
234   INTPTR oldBot = dq->bottom;
235
236   dequeNode* oldBotNode = dqDecodePtr( oldBot );
237   int        oldBotIndx = dqDecodeIdx( oldBot );
238   
239   dequeNode* newBotNode;
240   int        newBotIndx;
241
242   if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
243     newBotNode = oldBotNode;
244     newBotIndx = oldBotIndx + 1;
245
246   } else {
247     newBotNode = oldBotNode->next;
248     newBotIndx = 0;
249   }
250
251   void* retVal = newBotNode->itsDataArr[newBotIndx];
252
253   dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
254
255   INTPTR currTop = dq->top;
256
257   int        currTopTag  = dqDecodeTag( currTop );
258   dequeNode* currTopNode = dqDecodePtr( currTop );
259   int        currTopIndx = dqDecodeIdx( currTop );
260
261   if( oldBotNode == currTopNode &&
262       oldBotIndx == currTopIndx ) {
263     dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
264     return DQ_POP_EMPTY;
265
266   } else if( newBotNode == currTopNode &&
267              newBotIndx == currTopIndx ) {
268     INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
269
270     INTPTR actualTop = (INTPTR)
271       CAS( &(dq->top), // location
272            currTop,    // expected value
273            newTop );   // desired value
274
275     if( actualTop == currTop ) {
276       // CAS succeeded
277       if( oldBotNode != newBotNode ) {
278         poolfreeinto( dq->memPool, oldBotNode );
279       }
280       return retVal;
281       
282     } else {
283       dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );      
284       return DQ_POP_EMPTY;
285     }
286     
287   } else {
288     if( oldBotNode != newBotNode ) {
289       poolfreeinto( dq->memPool, oldBotNode );
290     }
291     return retVal;    
292   }
293 }