single ended queue work stealing scheduler
[IRC.git] / Robust / src / Runtime / squeue.h
1 #ifndef ___MYQUE_H__
2 #define ___MYQUE_H__
3
4 //////////////////////////////////////////////////////////
5 //
6 //  A memory pool implements POOLCREATE, POOLALLOC and 
7 //  POOLFREE to improve memory allocation by reusing records.
8 //
9 //  This implementation uses a lock-free singly-linked list
10 //  to store reusable records.  The list is initialized with
11 //  one valid record, and the list is considered empty when
12 //  it has only one record; this allows the enqueue operation's
13 //  CAS to assume tail can always be dereferenced.
14 //
15 //  poolfree adds newly freed records to the list BACK
16 //
17 //  poolalloc either takes records from FRONT or mallocs
18 //
19 //////////////////////////////////////////////////////////
20
21 #include <stdlib.h>
22 #include "runtime.h"
23 #include "mem.h"
24 #include "mlp_lock.h"
25 #include "memPool.h"
26
27 #define CACHELINESIZE 64
28 #define DQ_POP_EMPTY NULL
29 #define DQ_POP_ABORT NULL
30
31
32 typedef struct dequeItem_t {
33   void *otherqueue;
34   struct dequeItem_t * next;
35   volatile void *work;
36 } dequeItem;
37
38 typedef struct deque_t {
39   dequeItem* head;
40
41   // avoid cache line contention between producer/consumer...
42   char buffer[CACHELINESIZE - sizeof(void*)];
43
44   dequeItem* tail;
45   
46   MemPool objret;
47 } deque;
48
49 #define EXTRACTPTR(x) (x&0x0000ffffffffffff)
50 #define INCREMENTTAG     0x0001000000000000
51
52 // the memory pool must always have at least one
53 // item in it
54 static void dqInit(deque *q) {
55   q->head       = calloc( 1, sizeof(dequeItem) );
56   q->head->next = NULL;
57   q->tail       = q->head;
58   q->objret.itemSize=sizeof(dequeItem);
59   q->objret.head=calloc(1, sizeof(dequeItem));
60   q->objret.head->next=NULL;
61   q->objret.tail=q->objret.head;
62 }
63
64 static inline void tagpoolfreeinto( MemPool* p, void* ptr, void *realptr ) {
65   MemPoolItem* tailCurrent;
66   MemPoolItem* tailActual;
67   
68   // set up the now unneeded record to as the tail of the
69   // free list by treating its first bytes as next pointer,
70   MemPoolItem* tailNew = (MemPoolItem*) realptr;
71   tailNew->next = NULL;
72
73   while( 1 ) {
74     // make sure the null happens before the insertion,
75     // also makes sure that we reload tailCurrent, etc..
76     BARRIER();
77
78     tailCurrent = p->tail;
79     tailActual = (MemPoolItem*)
80       CAS( &(p->tail),         // ptr to set
81            (INTPTR) tailCurrent, // current tail's next should be NULL
82            (INTPTR) realptr);  // try set to our new tail
83     
84     if( tailActual == tailCurrent ) {
85       // success, update tail
86       tailCurrent->next = (MemPoolItem *) ptr;
87       return;
88     }
89
90     // if CAS failed, retry entire operation
91   }
92 }
93
94 static inline void* tagpoolalloc( MemPool* p ) {
95
96   // to protect CAS in poolfree from dereferencing
97   // null, treat the queue as empty when there is
98   // only one item.  The dequeue operation is only
99   // executed by the thread that owns the pool, so
100   // it doesn't require an atomic op
101   MemPoolItem* headCurrent = p->head;
102   MemPoolItem* realHead=(MemPoolItem *) EXTRACTPTR((INTPTR)headCurrent);
103   MemPoolItem* next=realHead->next;
104   int i;
105   if(next == NULL) {
106     // only one item, so don't take from pool
107     return (void*) RUNMALLOC( p->itemSize );
108   }
109  
110   p->head = next;
111
112   //////////////////////////////////////////////////////////
113   //
114   //
115   //  static inline void prefetch(void *x) 
116   //  { 
117   //    asm volatile("prefetcht0 %0" :: "m" (*(unsigned long *)x));
118   //  } 
119   //
120   //
121   //  but this built-in gcc one seems the most portable:
122   //////////////////////////////////////////////////////////
123   //__builtin_prefetch( &(p->head->next) );
124   MemPoolItem* realNext=(MemPoolItem *) EXTRACTPTR((INTPTR)next);
125   asm volatile( "prefetcht0 (%0)" :: "r" (realNext));
126   realNext=(MemPoolItem*)(((char *)realNext)+CACHELINESIZE);
127   asm volatile( "prefetcht0 (%0)" :: "r" (realNext));
128
129   return (void*)headCurrent;
130 }
131
132
133
134 // CAS
135 // in: a ptr, expected old, desired new
136 // return: actual old
137 //
138 // Pass in a ptr, what you expect the old value is and
139 // what you want the new value to be.
140 // The CAS returns what the value is actually: if it matches
141 // your proposed old value then you assume the update was successful,
142 // otherwise someone did CAS before you, so try again (the return
143 // value is the old value you will pass next time.)
144
145 static inline void dqPushBottom( deque* p, void* work ) {
146   dequeItem *ptr=(dequeItem *) tagpoolalloc(&p->objret);
147   //  dequeItem *ptr=(dequeItem *) calloc(1,sizeof(dequeItem));
148   dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr);
149   ptr=(dequeItem *) (((INTPTR)ptr)+INCREMENTTAG);
150   realptr->work=work;
151   BARRIER();
152   p->tail->next=ptr;
153   p->tail=realptr;
154 }
155
156 static inline void* dqPopTop(deque *p) {
157   dequeItem *ptr=p->head;
158   dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr);
159   dequeItem *next=realptr->next;
160   //remove if we can..steal work no matter what
161   if (likely(next!=NULL)) {
162     if (((dequeItem *)CAS(&(p->head),(INTPTR)ptr, (INTPTR)next))!=ptr)
163       return DQ_POP_EMPTY;
164     void * item=NULL;
165     item=(void *)LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
166     realptr->next=NULL;
167     BARRIER();
168     tagpoolfreeinto(&p->objret,ptr, realptr);
169     return item;
170   } else {
171     void * item=NULL;
172     item=(void *) LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
173     return item;
174   }
175 }
176
177 #define dqPopBottom dqPopTop
178
179
180 #endif // ___MEMPOOL_H__
181
182
183
184
185
186
187
188
189
190