1 #include "hashStructure.h"
2 //#include "WaitingQueue.h"
6 #include "rcr_runtime.h"
8 //NOTE: this is only temporary (for testing) and will be removed in favor of thread local variables
9 //It's basically an array of hashStructures so we can simulate what would happen in a many-threaded version
10 HashStructure ** allHashStructures;
// Bin-type predicates on a bin item's type value:
// ISWRITEBIN is nonzero when any BINMASK bit is set in x, ISREADBIN is 1 when none is.
// NOTE(review): x is not parenthesized inside the expansion, so only pass simple
// expressions (e.g. a field access); an argument using a lower-precedence operator
// than & would be grouped incorrectly.
11 #define ISWRITEBIN(x) (x&BINMASK)
12 #define ISREADBIN(x) (!(x&BINMASK))
13 //#define POPCOUNT(x) __builtin_popcountll(x)
14 //__builtin_popcountll
17 //NOTE: only temporary
// Testing-only setup routine for the master hash table array.
// NOTE(review): presumably allocates allHashStructures with room for maxSize
// entries -- the body is not visible here; confirm before relying on this.
18 void rcr_createMasterHashTableArray(int maxSize){
// Allocates a HashStructure with RUNMALLOC and resets every one of its RNUMBINS
// bins so that both the head and the tail pointer are NULL (empty, unlocked bin).
// NOTE(review): sizeofWaitingQueue is not referenced in the lines shown here --
// confirm whether the parameter is still needed.
// NOTE(review): presumably returns newTable; the return statement is not visible here.
21 HashStructure* rcr_createHashtable(int sizeofWaitingQueue){
23 HashStructure* newTable=(HashStructure*)RUNMALLOC(sizeof(HashStructure));
// Clear the head/tail of each bin.
24 for(i=0;i<RNUMBINS;i++){
25 newTable->array[i].head=NULL;
26 newTable->array[i].tail=NULL;
// Allocates a WriteBinItem_rcr with RUNMALLOC and tags its embedded item as WRITEBIN.
// NOTE(review): initialisation of the remaining fields and the return statement
// are not visible here; presumably the new item is returned -- confirm.
32 WriteBinItem_rcr* rcr_createWriteBinItem(){
33 WriteBinItem_rcr* binitem=(WriteBinItem_rcr*)RUNMALLOC(sizeof(WriteBinItem_rcr));
34 binitem->item.type=WRITEBIN;
// Allocates a ReadBinItem_rcr with RUNMALLOC and tags its embedded item as READBIN.
// NOTE(review): callers in this file use array[index++] on a freshly created item,
// so index must start at 0; that initialisation is not visible here -- confirm.
38 ReadBinItem_rcr* rcr_createReadBinItem(){
39 ReadBinItem_rcr* binitem=(ReadBinItem_rcr*)RUNMALLOC(sizeof(ReadBinItem_rcr));
41 binitem->item.type=READBIN;
// Computes the bin index for an object: ptr is treated as a struct ___Object___ *
// and its oid field is masked with RH_MASK.
// ptr must be non-NULL; it is dereferenced unconditionally.
45 inline int rcr_generateKey(void * ptr){
46 return (((struct ___Object___ *) ptr)->oid)&RH_MASK;
// Adds a write bin item for `task` to the bin of hash structure T selected by
// rcr_generateKey(ptr). `index` selects the bit (1<<index) stored in the item's
// read/write bit masks.
// Among the visible return paths: SPECREADY or SPECNOTREADY when the tail write bin
// already belongs to `task`, depending on whether that tail is READY.
50 int rcr_WRITEBINCASE(HashStructure *T, void *ptr, SESEcommon *task, int index) {
51 //chain of bins exists => tail is valid
52 //if there is something in front of us, then we are not ready
54 int key=rcr_generateKey(ptr);
55 BinElement_rcr* be= &(T->array[key]); //do not grab head from here since it's locked (i.e. = 0x1)
57 //LOCK is still needed as different threads will remove items...
// Spin until this thread owns the bin: exchange the 0x1 lock marker into be->head
// and retry while the value swapped out was itself 0x1. On exit val holds the
// head pointer that was stored before the bin was locked.
59 val=(BinItem_rcr *)0x1;
60 val=(BinItem_rcr *)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
61 } while(val==(BinItem_rcr*)0x1);
// Build a fresh write bin item.
// NOTE(review): the condition guarding this path is not visible here; presumably
// it is the empty-bin case (val == NULL) -- confirm.
64 BinItem_rcr * b=(BinItem_rcr*)rcr_createWriteBinItem();
65 WriteBinItem_rcr * td = (WriteBinItem_rcr*)b;
69 //common to both types
// NOTE(review): 1<<index is evaluated as an int shift; if bitvt is wider than int
// and index can reach 31 or more this overflows -- consider ((bitvt)1)<<index.
71 td->bitindexrd=td->bitindexwr=1<<index;
// From here on the current tail of the bin is examined.
79 BinItem_rcr *bintail=be->tail;
80 bitvt rdmask=0,wrmask=0;
// Tail is a write bin: check whether it already belongs to this task.
83 if (ISWRITEBIN(bintail->type)) {
84 WriteBinItem_rcr * td = (WriteBinItem_rcr *)bintail;
85 //last one is to check for SESE blocks in a while loop.
86 if(unlikely(td->task == task)) {
// Test whether this bit is already present in the tail's write mask.
89 if (!(bit & td->bitindexwr)) {
92 return (bintail->status==READY)?SPECREADY:SPECNOTREADY;
// Tail is a read bin: look at its most recently added entry (slot index - 1).
// NOTE(review): assumes the tail read bin has index >= 1; verify that an emptied
// read bin can never be the tail here.
97 TraverserData * td = &((ReadBinItem_rcr *)bintail)->array[((ReadBinItem_rcr *)bintail)->index - 1];
98 if(unlikely(td->task == task)) {
99 //if it matches, then we remove it and the code below will upgrade it to a write.
100 ((ReadBinItem_rcr *)bintail)->index--;
101 atomic_dec(&bintail->total);
103 if (bintail->status!=READY)
// Append a new write bin item behind the current tail; its masks combine this
// access's bit with wrmask/rdmask.
109 WriteBinItem_rcr *b=rcr_createWriteBinItem();
115 //count already includes this
118 b->bitindexwr=bit|wrmask;
119 b->bitindexrd=bit|rdmask;
120 b->item.status=status;
121 bintail->next=(BinItem_rcr*)b;
122 be->tail=(BinItem_rcr*)b;
// If the old tail is READY with no outstanding entries, loop over val while its
// total is 0; when val is the newly appended item b, mark b READY.
124 if (bintail->status==READY&&bintail->total==0) {
125 //we may have to set write as ready
126 while(val->total==0) {
127 if (val==((BinItem_rcr *)b)) {
128 b->item.status=READY;
// Adds a read entry for `task` to the bin of hash structure T selected by
// rcr_generateKey(ptr). `index` selects the bit (1<<index) stored with the entry.
// When the tail is a read bin the result of rcr_TAILREADCASE is returned; otherwise
// rcr_TAILWRITECASE is called.
144 int rcr_READBINCASE(HashStructure *T, void *ptr, SESEcommon * task, int index) {
146 int key=rcr_generateKey(ptr);
147 BinElement_rcr * be = &(T->array[key]);
149 //LOCK is still needed as different threads will remove items...
// Spin until this thread owns the bin: exchange the 0x1 lock marker into be->head
// and retry while the value swapped out was itself 0x1. On exit val holds the
// head pointer that was stored before the bin was locked.
151 val=(BinItem_rcr *)0x1;
152 val=(BinItem_rcr *)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
153 } while(val==(BinItem_rcr*)0x1);
// Build a fresh read bin item and claim its next free slot.
// NOTE(review): the condition guarding this path is not visible here; presumably
// it is the empty-bin case (val == NULL) -- confirm.
156 BinItem_rcr * b=(BinItem_rcr*)rcr_createReadBinItem();
157 ReadBinItem_rcr* readbin=(ReadBinItem_rcr*)b;
158 TraverserData * td = &(readbin->array[readbin->index++]);
162 //common to both types
// NOTE(review): 1<<index is evaluated as an int shift; if bitvt is wider than int
// and index can reach 31 or more this overflows -- consider ((bitvt)1)<<index.
164 td->bitindex=1<<index;
// From here on the current tail of the bin is examined.
174 BinItem_rcr * bintail=be->tail;
176 //check if already added item or not.
// Tail is a write bin owned by this task: check the bit against its read mask.
177 if (ISWRITEBIN(bintail->type)) {
178 WriteBinItem_rcr * td = (WriteBinItem_rcr *)bintail;
179 if(unlikely(td->task==task)) {
182 int status=bintail->status;
183 if (!(td->bitindexrd & bit)) {
// Tail is a read bin: compare against its most recently added entry (slot index - 1).
// NOTE(review): assumes the tail read bin has index >= 1 -- confirm.
193 TraverserData * td = &((ReadBinItem_rcr *)bintail)->array[((ReadBinItem_rcr *)bintail)->index - 1];
194 if (unlikely(td->task==task)) {
197 int status=bintail->status;
198 if (!(td->bitindex & bit)) {
// Task not already present at the tail: dispatch on the tail's type.
208 if (ISREADBIN(bintail->type)) {
209 return rcr_TAILREADCASE(T, ptr, val, bintail, key, task, index);
// NOTE(review): rcr_TAILWRITECASE returns void; the value this function returns on
// this path is not visible here -- confirm.
211 rcr_TAILWRITECASE(T, ptr, val, bintail, key, task, index);
// Adds a read entry for `task` when the bin's tail is a read bin. The entry goes
// into the existing tail unless that tail already holds RNUMREAD entries, in which
// case a new read bin item is created and appended. Stores val back into the bin's
// head on the way out, which releases the bin lock taken by the caller.
// NOTE(review): ptr and bintail are not referenced in the lines shown here; the tail
// is re-read from T->array[key].tail instead.
217 int rcr_TAILREADCASE(HashStructure *T, void * ptr, BinItem_rcr *val, BinItem_rcr *bintail, int key, SESEcommon * task, int index) {
218 ReadBinItem_rcr * readbintail=(ReadBinItem_rcr*)T->array[key].tail;
221 if (readbintail->item.status==READY) {
229 if (readbintail->index==RNUMREAD) { // create new read group
// Tail is full: start a new read bin item, take its first slot, and link it in
// as the new tail.
230 ReadBinItem_rcr* rb=rcr_createReadBinItem();
231 td = &rb->array[rb->index++];
234 rb->item.status=status;
235 T->array[key].tail->next=(BinItem_rcr*)rb;
236 T->array[key].tail=(BinItem_rcr*)rb;
237 } else { // group into old tail
// Room left: take the next slot of the existing tail and count the new entry.
238 td = &readbintail->array[readbintail->index++];
239 atomic_inc(&readbintail->item.total);
// NOTE(review): 1<<index is an int shift; see whether bitvt is wider than int.
243 td->bitindex=1<<index;
245 T->array[key].head=val;//released lock
// Adds a read entry for `task` when the bin's tail is a write bin: creates a new
// read bin item marked NOTREADY, fills its first slot, appends it as the new tail,
// and stores val back into the bin's head, which releases the bin lock taken by
// the caller.
// NOTE(review): ptr and bintail are not referenced in the lines shown here.
249 void rcr_TAILWRITECASE(HashStructure *T, void *ptr, BinItem_rcr *val, BinItem_rcr *bintail, int key, SESEcommon * task, int index) {
250 ReadBinItem_rcr* rb=rcr_createReadBinItem();
251 TraverserData * td = &(rb->array[rb->index++]);
253 rb->item.status=NOTREADY;
// NOTE(review): 1<<index is an int shift; see whether bitvt is wider than int.
256 td->bitindex=1<<index;
// Link the new read bin behind the current tail, then publish it as the tail.
258 T->array[key].tail->next=(BinItem_rcr*)rb;
259 T->array[key].tail=(BinItem_rcr*)rb;
260 T->array[key].head=val;//released lock
// Retires an access in bin `key` of hash structure T: decrements the head item's
// outstanding count and, once the bin is locked, resolves entries of NOTREADY
// items via RESOLVE before the lock is released by restoring be->head.
263 void rcr_RETIREHASHTABLE(HashStructure *T, SESEcommon *task, int key) {
264 BinElement_rcr * be = &(T->array[key]);
// NOTE(review): be->head is read here without taking the bin lock; another thread
// holding the lock leaves the 0x1 marker in this field -- confirm this cannot be
// observed at this point.
265 BinItem_rcr *b=be->head;
// NOTE(review): ISREADBIN(READBIN) tests the READBIN constant, not this item's
// type, so the outcome never depends on b. This looks like it was meant to be
// ISREADBIN(b->type) -- confirm and fix.
267 if(ISREADBIN(READBIN)) {
268 atomic_dec(&b->total);
269 if (b->next==NULL || b->total>0) {
273 //We either have a write bin or we are at the end of a read bin
276 // CHECK FIRST IF next is nonnull to guarantee that b.total cannot change
// Spin until this thread owns the bin: exchange the 0x1 lock marker into be->head
// and retry while the value swapped out was itself 0x1. On exit val holds the
// head pointer that was stored before the bin was locked.
277 BinItem_rcr * val=(BinItem_rcr *)0x1;
279 val=(BinItem_rcr*)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
280 } while(val==(BinItem_rcr*)0x1);
282 // at this point have locked bin
283 BinItem_rcr *ptr=val;
// Read bin that is still NOTREADY: resolve every entry it holds.
287 if (ISREADBIN(ptr->type)) {
288 if (ptr->status==NOTREADY) {
289 ReadBinItem_rcr* rptr=(ReadBinItem_rcr*)ptr;
290 for (i=0;i<rptr->index;i++) {
291 TraverserData * td=&rptr->array[i];
292 RESOLVE(td->task, td->bitindex);
// Entries whose task pointer carries the PARENTBIN tag bit are counted off at once.
293 if (((INTPTR)rptr->array[i].task)&PARENTBIN) {
294 //parents go immediately
295 atomic_dec(&rptr->item.total);
300 if (ptr->next==NULL) {
305 } else if (ptr==val) {
// Write bin that is still NOTREADY: resolve its task using the write mask.
312 if(ptr->status==NOTREADY) {
313 WriteBinItem_rcr* wptr=(WriteBinItem_rcr*)ptr;
314 RESOLVE(wptr->task, wptr->bitindexwr);
316 if(((INTPTR)wptr->task)&PARENTBIN) {
320 } else { // write bin is already resolved
326 be->head=val; // release lock
// Resolves dependences of `record` for the bits set in `mask`. The per-parameter
// rcrRecord array is located at byte offset record->offsetToParamRecords from the
// start of the record. When a parameter's count reaches the point where
// atomic_sub_and_test succeeds, its flag is exchanged to 0 and the record's
// unresolvedDependencies counter is decremented; when that decrement succeeds in
// turn, the record is handed to workScheduleSubmit.
// NOTE(review): atomic_sub_and_test presumably returns true when the counter hits
// zero -- confirm against its definition.
330 void RESOLVE(SESEcommon *record, bitvt mask) {
332 struct rcrRecord * array=(struct rcrRecord *)(((char *)record)+record->offsetToParamRecords);
// Position just past the lowest set bit of mask.
// NOTE(review): __builtin_ctzll is undefined for a zero argument, so mask must be
// nonzero whenever this line is reached.
334 int shift=__builtin_ctzll(mask)+1;
336 if (atomic_sub_and_test(1,&array[index].count)) {
337 int flag=LOCKXCHG32(&array[index].flag,0);
339 if(atomic_sub_and_test(1, &(record->unresolvedDependencies)))
340 workScheduleSubmit((void *)record);