8 #include "mlp_runtime.h"
9 #include "workschedule.h"
// Per-thread stack of in-flight SESE (task) records; allocated lazily by
// mlpInitOncePerThread below.
13 __thread struct Queue* seseCallStack;
// One-shot init guard for this thread's MLP state (PTHREAD_ONCE_INIT implies
// use with pthread_once).
14 __thread pthread_once_t mlpOnceObj = PTHREAD_ONCE_INIT;
15 void mlpInitOncePerThread() {
16 seseCallStack = createQueue();
// Per-thread pointer to the SESE record of the currently executing task.
19 __thread SESEcommon_p seseCaller;
// Allocate a SESE (task) record of the given byte size via the runtime
// allocator.  On allocation failure the visible error message is printed;
// the restored exit() call follows the message's intent (abort — there is
// no sane way to continue without the record).
// NOTE(review): the failure branch's control lines were elided in extraction
// and are reconstructed here — confirm against the original file.
void* mlpAllocSESErecord( int size ) {
  void* newrec = RUNMALLOC( size );
  if( newrec == 0 ) {
    printf( "mlpAllocSESErecord did not obtain memory!\n" );
    exit( -1 );
  }
  return newrec;
}
// Return a SESE record's memory to the runtime allocator.
// Caller must not touch seseRecord afterwards.
void mlpFreeSESErecord( void* seseRecord ) {
  RUNFREE( seseRecord );
}
36 MemoryQueue** mlpCreateMemoryQueueArray(int numMemoryQueue){
38 MemoryQueue** newMemoryQueue=(MemoryQueue**)RUNMALLOC( sizeof( MemoryQueue* ) * numMemoryQueue );
39 for(i=0; i<numMemoryQueue; i++){
40 newMemoryQueue[i]=createMemoryQueue();
42 return newMemoryQueue;
45 REntry* mlpCreateREntryArray(){
46 REntry* newREntryArray=(REntry*)RUNMALLOC(sizeof(REntry)*NUMRENTRY);
47 return newREntryArray;
50 REntry* mlpCreateFineREntry(int type, void* seseToIssue, void* dynID){
51 REntry* newREntry=(REntry*)RUNMALLOC(sizeof(REntry));
53 newREntry->seseRec=seseToIssue;
54 newREntry->dynID=dynID;
58 REntry* mlpCreateREntry(int type, void* seseToIssue){
59 REntry* newREntry=(REntry*)RUNMALLOC(sizeof(REntry));
61 newREntry->seseRec=seseToIssue;
65 int isParent(REntry *r) {
66 if (r->type==PARENTREAD || r->type==PARENTWRITE) {
73 int isParentCoarse(REntry *r){
74 if (r->type==PARENTCOARSE){
81 int isFineRead(REntry *r) {
82 if (r->type==READ || r->type==PARENTREAD) {
89 int isFineWrite(REntry *r) {
90 if (r->type==WRITE || r->type==PARENTWRITE) {
97 int isCoarse(REntry *r){
98 if(r->type==COARSE || r->type==PARENTCOARSE){
105 int isSCC(REntry *r){
106 if(r->type==SCCITEM){
113 int isSingleItem(MemoryQueueItem *qItem){
114 if(qItem->type==SINGLEITEM){
121 int isHashtable(MemoryQueueItem *qItem){
122 if(qItem->type==HASHTABLE){
129 int isVector(MemoryQueueItem *qItem){
130 if(qItem->type==VECTOR){
137 int isReadBinItem(BinItem* b){
138 if(b->type==READBIN){
145 int isWriteBinItem(BinItem* b){
146 if(b->type==WRITEBIN){
153 int generateKey(unsigned int data){
154 return (data&H_MASK)>> 4;
157 Hashtable* createHashtable(){
159 Hashtable* newTable=(Hashtable*)RUNMALLOC(sizeof(Hashtable));
160 newTable->item.type=HASHTABLE;
161 for(i=0;i<NUMBINS;i++){
162 newTable->array[i]=(BinElement*)RUNMALLOC(sizeof(BinElement));
163 newTable->array[i]->head=NULL;
164 newTable->array[i]->tail=NULL;
166 newTable->unresolvedQueue=NULL;
170 WriteBinItem* createWriteBinItem(){
171 WriteBinItem* binitem=(WriteBinItem*)RUNMALLOC(sizeof(WriteBinItem));
172 binitem->item.type=WRITEBIN;
176 ReadBinItem* createReadBinItem(){
177 ReadBinItem* binitem=(ReadBinItem*)RUNMALLOC(sizeof(ReadBinItem));
179 binitem->item.type=READBIN;
183 Vector* createVector(){
184 Vector* vector=(Vector*)RUNMALLOC(sizeof(Vector));
186 vector->item.type=VECTOR;
// NOTE(review): the function header for this allocator (presumably
// `SCC* createSCC()`) was lost in extraction — this fragment only shows the
// body.  It allocates an SCC whose embedded queue-item type is SINGLEITEM,
// i.e. SCCs occupy the single-item stage of a memory queue.
191 SCC* scc=(SCC*)RUNMALLOC(sizeof(SCC));
192 scc->item.type=SINGLEITEM;
196 MemoryQueue* createMemoryQueue(){
197 MemoryQueue* queue = (MemoryQueue*)RUNMALLOC(sizeof(MemoryQueue));
198 MemoryQueueItem* dummy=(MemoryQueueItem*)RUNMALLOC(sizeof(MemoryQueueItem));
199 dummy->type=3; // dummy type
207 int ADDRENTRY(MemoryQueue * q, REntry * r) {
208 if (isFineRead(r) || isFineWrite(r)) {
209 return ADDTABLE(q, r);
210 } else if (isCoarse(r)) {
211 return ADDVECTOR(q, r);
212 } else if (isSCC(r)) {
// ADDTABLE: insert a fine-grained read/write rentry into q's hashtable
// stage.  If the tail is not currently a hashtable, append a fresh one and
// mark it READY when the prior stage is already drained.  Entries whose
// dynamic target (*r->pointer) is still unresolved (0) are parked on
// table->unresolvedQueue; resolved entries are hashed into a bin and handed
// to EMPTYBINCASE / WRITEBINCASE / READBINCASE.
// NOTE(review): this block is truncated by extraction — several original
// lines (declarations of `val`, do-loop headers, else arms, closing braces)
// are missing, and each line carries a stray leading line-number artifact.
// Comments describe only the visible logic; the text is not compilable as-is.
217 int ADDTABLE(MemoryQueue *q, REntry *r) {
218 if(!isHashtable(q->tail)) {
// Tail is some other stage; a parent entry on a fully drained single-stage
// queue takes a fast path (elided lines).
220 MemoryQueueItem* tail=q->tail;
221 if (isParent(r) && tail->total==0 && q->tail==q->head) {
226 Hashtable* h=createHashtable();
227 tail->next=(MemoryQueueItem*)h;
228 //************NEED memory barrier here to ensure compiler does not cache Q.tail.status********
229 if (BARRIER() && tail->status==READY && tail->total==0 && q->tail==q->head) {
230 //previous Q item is finished
231 h->item.status=READY;
233 q->tail=(MemoryQueueItem*)h;
// type 3 marks the dummy item installed by createMemoryQueue; step past it.
234 // handle the dummy queue item case
235 if(q->head->type==3){
236 q->head=(MemoryQueueItem*)h;
240 //at this point, have table
241 Hashtable* table=(Hashtable*)q->tail;
// Unresolved-pointer path: spin-lock unresolvedQueue by exchanging in the
// 0x1 sentinel with LOCKXCHG.
243 if(*(r->pointer)==0 || (*(r->pointer)!=0 && table->unresolvedQueue!=NULL)){
245 // grab lock on the queue
247 val=(struct Queue*)0x1;
248 val=(struct Queue*)LOCKXCHG((unsigned INTPTR*)&(table->unresolvedQueue), (unsigned INTPTR)val);
249 } while(val==(struct Queue*)0x1);
253 if(*(r->pointer)!=0){
254 // pointer is already resolved.
255 table->unresolvedQueue=val; //released lock;
256 return ADDTABLE(q,r);
258 table->unresolvedQueue=(struct Queue*)0x1;
259 struct Queue* queue=createQueue();
261 atomic_inc(&table->item.total);
262 table->unresolvedQueue=queue; // expose new queue
265 // someone else has already cleared all queue stuff
266 table->unresolvedQueue=val; // released lock
267 return ADDTABLE(q,r);
269 addNewItemBack(val,r);
270 atomic_inc(&table->item.total);
271 table->unresolvedQueue=val; // released lock
// Resolved path: derive the hash key from the now-known dynamic ID.
276 r->dynID=(void*)*(r->pointer); // interim fix.
278 int key=generateKey((unsigned int)(unsigned INTPTR)r->dynID);
// Lock the bin head with the same 0x1 sentinel before inspecting it.
281 BinElement* bin=table->array[key];
282 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(bin->head), (unsigned INTPTR)val);//note...talk to me about optimizations here.
283 } while(val==(BinItem*)0x1);
284 //at this point have locked bin
286 return EMPTYBINCASE(table, table->array[key], r, TRUE);
288 if (isFineWrite(r)) {
289 return WRITEBINCASE(table, r, val, key, TRUE);
290 } else if (isFineRead(r)) {
291 return READBINCASE(table, r, val, key, TRUE);
// ADDTABLEITEM: like the resolved-pointer tail of ADDTABLE, but for a known
// hashtable (used when draining the unresolved queue in resolvePointer);
// the FALSE flag passed to the *BINCASE helpers suppresses the table total
// increment already done when the entry was parked.
// NOTE(review): truncated by extraction — `val` declaration, do-loop header,
// and closing braces are missing from this view; stray line-number prefixes
// remain on each line.
296 int ADDTABLEITEM(Hashtable* table, REntry* r){
298 int key=generateKey((unsigned int)(unsigned INTPTR)r->dynID);
// Spin until the bin head is exchanged for the 0x1 lock sentinel.
301 BinElement* bin=table->array[key];
302 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(bin->head), (unsigned INTPTR)val);//note...talk to me about optimizations here.
303 } while(val==(BinItem*)0x1);
304 //at this point have locked bin
306 return EMPTYBINCASE(table, table->array[key], r, FALSE);
308 if (isFineWrite(r)) {
309 return WRITEBINCASE(table, r, val, key, FALSE);
310 } else if (isFineRead(r)) {
311 return READBINCASE(table, r, val, key, FALSE);
// EMPTYBINCASE: the locked bin is empty — create the first bin item (a
// write bin holding r, or a read bin with r as its first reader), then
// release the bin lock by storing through be->head.  When the table itself
// is READY the new item starts ready (elided lines presumably set that and
// the return value — TODO confirm).
// NOTE(review): truncated by extraction; stray line-number prefixes remain.
316 int EMPTYBINCASE(Hashtable *T, BinElement* be, REntry *r, int inc) {
319 if (isFineWrite(r)) {
320 b=(BinItem*)createWriteBinItem();
321 ((WriteBinItem*)b)->val=r;//<-only different statement
322 } else if (isFineRead(r)) {
323 b=(BinItem*)createReadBinItem();
324 ReadBinItem* readbin=(ReadBinItem*)b;
325 readbin->array[readbin->index++]=r;
329 if (T->item.status==READY) {
330 //current entry is ready
// Ready path: b is not chained in; the lock is released with head=NULL.
334 be->head=NULL; // released lock
343 atomic_inc(&T->item.total);
348 be->head=b;//released lock
// WRITEBINCASE: append a new write bin for r behind the existing bin chain.
// The new bin is READY only when nothing is in front of it (table total 0,
// per the visible check); otherwise NOTREADY.  Caller holds the bin lock
// (head == 0x1); the lock-release store is among the elided lines.
// NOTE(review): truncated by extraction — declarations of `retval`, the
// else arm of the total==0 test, the return, and closing braces are missing;
// stray line-number prefixes remain.
352 int WRITEBINCASE(Hashtable *T, REntry *r, BinItem *val, int key, int inc) {
353 //chain of bins exists => tail is valid
354 //if there is something in front of us, then we are not ready
357 BinElement* be=T->array[key];
359 BinItem *bintail=be->tail;
361 WriteBinItem *b=createWriteBinItem();
365 // note: If current table clears all dependencies, then write bin is ready
366 if (T->item.total==0){
371 b->item.status=retval;
372 // b->item.status=NOTREADY;
375 atomic_inc(&T->item.total);
379 r->binitem=(BinItem*)b;
// Link the new bin at the end of the chain.
381 be->tail->next=(BinItem*)b;
382 be->tail=(BinItem*)b;
// READBINCASE: route a fine read to the tail bin — group it into an existing
// read bin (TAILREADCASE) or start a new read group behind a write bin
// (TAILWRITECASE).  Return type is implicit int (pre-C99 style).
// NOTE(review): TAILREADCASE/TAILWRITECASE are defined below with six
// parameters (trailing `int inc`) but are called here with five — a real
// arity mismatch in the visible text; confirm against the original file.
// Truncated by extraction; stray line-number prefixes remain.
387 READBINCASE(Hashtable *T, REntry *r, BinItem *val, int key, int inc) {
388 BinItem * bintail=T->array[key]->tail;
389 if (isReadBinItem(bintail)) {
390 return TAILREADCASE(T, r, val, bintail, key);
391 } else if (!isReadBinItem(bintail)) {
392 TAILWRITECASE(T, r, val, bintail, key);
// TAILREADCASE: the tail bin is a read group — either fold r into it or,
// when the group is full (index==NUMREAD), start a fresh read group behind
// it.  The bin lock is released by storing the saved head back (lines 404 /
// 430).
// NOTE(review): line 400 uses assignment, not comparison:
//   if (readbintail->item.status=READY)
// This overwrites the tail's status with READY and the branch is always
// taken — almost certainly `==` was intended.  Left untouched here because
// the surrounding (elided) status/retval logic is not visible; fix after
// checking the original file.
// Truncated by extraction; stray line-number prefixes remain.
397 int TAILREADCASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail, int key, int inc) {
398 ReadBinItem * readbintail=(ReadBinItem*)T->array[key]->tail;
400 if (readbintail->item.status=READY) {
404 T->array[key]->head=val;//released lock
412 if (readbintail->index==NUMREAD) { // create new read group
413 ReadBinItem* rb=createReadBinItem();
414 rb->array[rb->index++]=r;
415 rb->item.total=1;//safe only because item could not have started
416 rb->item.status=status;
417 T->array[key]->tail->next=(BinItem*)rb;
418 T->array[key]->tail=(BinItem*)rb;
419 r->binitem=(BinItem*)rb;
420 } else { // group into old tail
421 readbintail->array[readbintail->index++]=r;
422 atomic_inc(&readbintail->item.total);
423 r->binitem=(BinItem*)readbintail;
424 //printf("grouping with %d\n",readbintail->index);
427 atomic_inc(&T->item.total);
430 T->array[key]->head=val;//released lock
// TAILWRITECASE: the tail bin is a write — start a new NOTREADY read group
// containing r behind it, then release the bin lock (line 450).  The
// commented-out write-bin lines suggest an earlier design that also chained
// a write bin here.  Return type is implicit int; no return is visible.
// NOTE(review): truncated by extraction; stray line-number prefixes remain.
434 TAILWRITECASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail, int key, int inc) {
435 // WriteBinItem* wb=createWriteBinItem();
437 //wb->item.total=1;//safe because item could not have started
438 //wb->item.status=NOTREADY;
439 ReadBinItem* rb=createReadBinItem();
440 rb->array[rb->index++]=r;
441 rb->item.total=1;//safe because item could not have started
442 rb->item.status=NOTREADY;
444 atomic_inc(&T->item.total);
447 r->binitem=(BinItem*)rb;
448 T->array[key]->tail->next=(BinItem*)rb;
449 T->array[key]->tail=(BinItem*)rb;
450 T->array[key]->head=val;//released lock
// ADDVECTOR: insert a coarse-grained rentry into q's vector stage.  Appends
// a new vector when the tail is not one (or the current one is full at
// NUMITEMS), marking it READY when the previous stage is drained.  The
// entry is claimed slot-by-slot via LOCKXCHG on V->array[index]; a parent
// coarse entry on a drained queue proceeds immediately.  Implicit int
// return type.
// NOTE(review): heavily truncated by extraction — declarations of `index`
// and `flag`, several else arms and closing braces are missing; stray
// line-number prefixes remain on each line.  Comments cover visible logic
// only.
453 ADDVECTOR(MemoryQueue *Q, REntry *r) {
454 if(!isVector(Q->tail)) {
// Fast path for a parent coarse entry on a fully drained queue.
456 if (isParentCoarse(r) && Q->tail->total==0 && Q->tail==Q->head) {
461 Vector* V=createVector();
462 Q->tail->next=(MemoryQueueItem*)V;
463 //************NEED memory barrier here to ensure compiler does not cache Q.tail.status******
464 if (BARRIER() && Q->tail->status==READY&&Q->tail->total==0) {
465 //previous Q item is finished
466 V->item.status=READY;
468 Q->tail=(MemoryQueueItem*)V;
// type 3 marks the dummy item installed by createMemoryQueue.
469 // handle the dummy queue item case
470 if(Q->head->type==3){
471 Q->head=(MemoryQueueItem*)V;
474 //at this point, have vector
475 Vector* V=(Vector*)Q->tail;
// Current vector full: chain a fresh NOTREADY vector after it.
476 if (V->index==NUMITEMS) {
480 V->item.status=NOTREADY;
481 Q->tail->next=(MemoryQueueItem*)V;
482 //***NEED memory barrier here to ensure compiler does not cache Q.tail.status******
483 if (BARRIER() && Q->tail->status==READY) {
484 V->item.status=READY;
486 Q->tail=(MemoryQueueItem*)V;
489 atomic_inc(&V->item.total);
493 //*****NEED memory barrier here to ensure compiler does not reorder writes to V.array and V.index
496 //*****NEED memory barrier here to ensure compiler does not cache V.status*********
498 if (BARRIER() && V->item.status==READY) {
// Race with a dispatcher: atomically claim our own slot back.
500 flag=(void*)LOCKXCHG((unsigned INTPTR*)&(V->array[index]), (unsigned INTPTR)flag);
502 if (isParent(r)) { //parent's retire immediately
503 atomic_dec(&V->item.total);
507 return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
515 //SCC's don't come in parent variety
// ADDSCC: append an SCC single-item stage S to the queue and try to claim
// its single slot (S->val) via LOCKXCHG, racing against dispatchers.
// Implicit int return type.
// NOTE(review): truncated by extraction — creation of S, declaration of
// `flag`, surrounding branches and closing braces are missing; stray
// line-number prefixes remain.
516 ADDSCC(MemoryQueue *Q, REntry *r) {
522 Q->tail->next=(MemoryQueueItem*)S;
523 //*** NEED BARRIER HERE
524 if (BARRIER() && Q->tail->status==READY && Q->tail->total==0 && Q->tail==Q->head) {
525 //previous Q item is finished
526 S->item.status=READY;
527 Q->tail=(MemoryQueueItem*)S;
// type 3 marks the dummy item installed by createMemoryQueue.
528 // handle the dummy queue item case
529 if(Q->head->type==3){
530 Q->head=(MemoryQueueItem*)S;
533 flag=(void*)LOCKXCHG((unsigned INTPTR*)&(S->val), (unsigned INTPTR)flag);
537 return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
540 Q->tail=(MemoryQueueItem*)S;
546 void RETIRERENTRY(MemoryQueue* Q, REntry * r) {
547 if (isFineWrite(r)||isFineRead(r)) {
548 RETIREHASHTABLE(Q, r);
549 } else if (isCoarse(r)) {
551 } else if (isSCC(r)) {
// RETIRESCC: retire the queue's single SCC item.  total is stored directly
// (the comment notes no atomic decrement is needed — presumably only one
// retirer can reach here; confirm).  Implicit int return type.
// NOTE(review): truncated by extraction — the declaration of `s` and the
// chain-resolution tail are missing; stray line-number prefixes remain.
556 RETIRESCC(MemoryQueue *Q, REntry *r) {
558 s->item.total=0;//don't need atomicdec
// RETIREHASHTABLE: retire a fine-grained entry — first its bin (elided
// RETIREBIN call, presumably), then the table's outstanding count; when the
// table is drained and a successor stage exists, the chain is advanced
// (elided lines).  Implicit int return type.
// NOTE(review): truncated by extraction; stray line-number prefixes remain.
563 RETIREHASHTABLE(MemoryQueue *q, REntry *r) {
564 Hashtable *T=r->hashtable;
565 BinItem *b=r->binitem;
567 atomic_dec(&T->item.total);
// Ordering: next!=NULL is checked before total==0 (see the note in
// RETIREVECTOR — the same ordering is called crucial there).
568 if (T->item.next!=NULL && T->item.total==0) {
// RETIREBIN: retire entry r from its bin b.  When the bin is finished (a
// write, or the last read with a successor), lock the bin (0x1 sentinel via
// LOCKXCHG on head), pop finished items, and make the next bin item READY —
// resolving its dependencies and retiring parent entries immediately.
// Implicit int return type.
// NOTE(review): heavily truncated by extraction — declarations of `val`,
// `ptr`, `i`, the do-loop header, many else arms and closing braces are
// missing; stray line-number prefixes remain.  Comments cover visible logic
// only.
573 RETIREBIN(Hashtable *T, REntry *r, BinItem *b) {
574 int key=generateKey((unsigned int)(unsigned INTPTR)r->dynID);
576 atomic_dec(&b->total);
578 if (isFineWrite(r) || (isFineRead(r) && b->next!=NULL && b->total==0)) {
579 // CHECK FIRST IF next is nonnull to guarantee that b.total cannot change
583 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(T->array[key]->head), (unsigned INTPTR)val);
584 } while(val==(BinItem*)0x1);
585 // at this point have locked bin
// Successor is a read group: wake every reader; parents retire in place.
590 if (isReadBinItem(ptr)) {
591 ReadBinItem* rptr=(ReadBinItem*)ptr;
592 if (rptr->item.status==NOTREADY) {
593 for (i=0;i<rptr->index;i++) {
594 resolveDependencies(rptr->array[i]);
595 if (isParent(rptr->array[i])) {
596 //parents go immediately
597 atomic_dec(&rptr->item.total);
598 atomic_dec(&T->item.total);
602 rptr->item.status=READY;
603 if (rptr->item.next==NULL) {
606 if (rptr->item.total!=0) {
608 } else if ((BinItem*)rptr==val) {
// Successor is a single write: wake its writer (parents retire in place).
611 } else if(isWriteBinItem(ptr)) {
614 if(ptr->status==NOTREADY){
615 resolveDependencies(((WriteBinItem*)ptr)->val);
617 if(isParent(((WriteBinItem*)ptr)->val)){
618 atomic_dec(&T->item.total);
622 }else{ // write bin is already resolved
626 if(ptr->status==NOTREADY) {
627 resolveDependencies(((WriteBinItem*)ptr)->val);
630 if (isParent(((WriteBinItem*)ptr)->val)) {
631 atomic_dec(&T->item.total);
641 T->array[key]->head=val; // release lock
// RETIREVECTOR: retire a coarse entry — decrement its vector's outstanding
// count and, when the vector is drained and has a successor, advance the
// queue chain (elided lines).  Implicit int return type.
// NOTE(review): truncated by extraction — the declaration/lookup of `V` and
// the chain-advance body are missing; stray line-number prefixes remain.
646 RETIREVECTOR(MemoryQueue *Q, REntry *r) {
648 atomic_dec(&V->item.total);
649 if (V->item.next!=NULL && V->item.total==0) { //NOTE: ORDERING CRUCIAL HERE
// RESOLVECHAIN: walk Q's stage chain from the head, waking each finished
// stage's waiters via the RESOLVE* helpers and advancing Q->head with CAS
// (losing the CAS is benign — the loop repeats).  Stops at the first stage
// that is unfinished (no successor or outstanding entries).  Implicit int
// return type.
// NOTE(review): truncated by extraction — the enclosing loop header, break
// statements, the single-item arm, and closing braces are missing; stray
// line-number prefixes remain.
654 RESOLVECHAIN(MemoryQueue *Q) {
656 MemoryQueueItem* head=Q->head;
657 if (head->next==NULL||head->total!=0) {
658 //item is not finished
659 if (head->status!=READY) {
660 //need to update status
662 if (isHashtable(head)) {
663 RESOLVEHASHTABLE(Q, head);
664 } else if (isVector(head)) {
665 RESOLVEVECTOR(Q, head);
666 } else if (isSingleItem(head)) {
669 if (head->next==NULL)
// Head stage is done: swing Q->head to its successor.
676 MemoryQueueItem* nextitem=head->next;
677 CAS((unsigned INTPTR*)&(Q->head), (unsigned INTPTR)head, (unsigned INTPTR)nextitem);
678 //oldvalue not needed... if we fail we just repeat
// RESOLVEHASHTABLE: a hashtable stage became the queue head — for every bin,
// lock it (0x1 sentinel), wake the first NOTREADY item (write's single
// entry, or all entries of a read group), retiring parent entries in place,
// then release the lock.  Implicit int return type.
// NOTE(review): truncated by extraction — declarations of `binidx`, `val`,
// `ptr`, `i`, do-loop headers and closing braces are missing; stray
// line-number prefixes remain.  Line 722 ends in a stray `{` after the
// status store — likely an extraction artifact or a misplaced brace; check
// the original file.
683 RESOLVEHASHTABLE(MemoryQueue *Q, Hashtable *T) {
685 for (binidx=0;binidx<NUMBINS;binidx++) {
686 BinElement* bin=T->array[binidx];
690 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(bin->head), (unsigned INTPTR)val);
691 } while (val==(BinItem*)1);
692 //at this point have locked bin
695 if(ptr!=NULL&&ptr->status==NOTREADY) {
697 if (isWriteBinItem(ptr)) {
700 resolveDependencies(((WriteBinItem*)ptr)->val);
702 if (isParent(((WriteBinItem*)ptr)->val)) {
703 atomic_dec(&T->item.total);
707 } else if (isReadBinItem(ptr)) {
709 ReadBinItem* rptr=(ReadBinItem*)ptr;
710 for(i=0;i<rptr->index;i++) {
711 resolveDependencies(rptr->array[i]);
712 if (isParent(rptr->array[i])) {
713 atomic_dec(&rptr->item.total);
714 atomic_dec(&T->item.total);
717 if (rptr->item.next==NULL||rptr->item.total!=0) {
719 } else if((BinItem*)rptr==val) {
722 rptr->item.status=READY; {
728 bin->head=val; // released lock;
// RESOLVEVECTOR: a vector stage became the queue head — claim each occupied
// slot via LOCKXCHG, resolve its entry's dependencies, and decrement the
// outstanding count; then continue into a chained successor vector, if any.
// Implicit int return type.
// NOTE(review): truncated by extraction — declarations of `i`, `val`,
// `tmp`, the enclosing loop, null checks and closing braces are missing;
// stray line-number prefixes remain.
732 RESOLVEVECTOR(MemoryQueue *q, Vector *V) {
738 for (i=0;i<NUMITEMS;i++) {
740 val=(REntry*)LOCKXCHG((unsigned INTPTR*)&(tmp->array[i]), (unsigned INTPTR)val);
742 resolveDependencies(val);
744 atomic_dec(&tmp->item.total);
748 if (tmp->item.next!=NULL&&isVector(tmp->item.next)) {
749 tmp=(Vector*)tmp->item.next;
// NOTE(review): the function header here (presumably RESOLVESCC, taking the
// single-item stage S) was lost in extraction — this fragment only shows
// the body: claim the single slot S->val via LOCKXCHG and resolve the
// claimed entry's dependencies.
757 //precondition: SCC's state is READY
759 flag=(void*)LOCKXCHG((unsigned INTPTR*)&(S->val), (unsigned INTPTR)flag);
761 resolveDependencies(flag);
766 resolveDependencies(REntry* rentry){
767 SESEcommon* seseCommon=(SESEcommon*)rentry->seseRec;
768 if(rentry->type==READ || rentry->type==WRITE || rentry->type==COARSE || rentry->type==SCCITEM){
769 if( atomic_sub_and_test(1, &(seseCommon->unresolvedDependencies)) ){
770 workScheduleSubmit(seseCommon);
772 }else if(rentry->type==PARENTREAD || rentry->type==PARENTWRITE ||rentry->type==PARENTCOARSE){
773 psem_give(&(rentry->parentStallSem));
// resolvePointer: called when a parked entry's dynamic pointer becomes
// known.  Locks the table's unresolvedQueue (0x1 sentinel); if this entry is
// at the queue front, drains the queue in order — re-issuing each resolved
// entry via ADDTABLEITEM (decrementing its SESE's dependence count when it
// comes back READY) — until an unresolved pointer stops the drain or the
// queue empties (table returns to normal mode).  Implicit int return type.
// NOTE(review): truncated by extraction — the drain loop header, `val`
// declaration, returns and closing braces are missing; stray line-number
// prefixes remain.  Also: the local at line 791 shadows the `rentry`
// parameter — legal but error-prone; consider renaming when fixing the
// original.
777 resolvePointer(REntry* rentry){
778 rentry->dynID=(void*)*(rentry->pointer); // interim.
779 Hashtable* table=rentry->hashtable;
// Spin-lock unresolvedQueue by exchanging in the 0x1 sentinel.
782 val=(struct Queue*)0x1;
783 val=(struct Queue*)LOCKXCHG((unsigned INTPTR*)&(table->unresolvedQueue), (unsigned INTPTR)val);
784 } while(val==(struct Queue*)0x1);
785 if(val!=NULL && getHead(val)->objectptr==rentry){
786 // handling pointer is the first item of the queue
787 // start to resolve until it reaches unresolved pointer or end of queue
789 struct QueueItem* head=getHead(val);
791 REntry* rentry=(REntry*)head->objectptr;
792 if(*(rentry->pointer)==0){
793 // encounters following unresolved pointer
794 table->unresolvedQueue=val;//released lock
797 removeItem(val,head);
798 if(ADDTABLEITEM(table, rentry)==READY){
799 SESEcommon* seseCommon=(SESEcommon*)rentry->seseRec;
800 atomic_sub_and_test(1, &(seseCommon->unresolvedDependencies));
803 table->unresolvedQueue=NULL; // set hashtable as normal-mode.
808 table->unresolvedQueue=val;//released lock;