8 #include "mlp_runtime.h"
9 #include "workschedule.h"
// NOTE(review): this file is a sampled listing — each line begins with its
// original line number, and gaps in that numbering are elided source lines.
// Per-thread stack of SESE (task) records currently being issued.
13 __thread struct Queue* seseCallStack;
14 __thread pthread_once_t mlpOnceObj = PTHREAD_ONCE_INIT;
// pthread_once init hook: give each worker thread its own SESE call stack.
15 void mlpInitOncePerThread() {
16 seseCallStack = createQueue();
// Record of the SESE that invoked the code running on this thread.
19 __thread SESEcommon_p seseCaller;
// Allocate a SESE record of `size` bytes via RUNMALLOC.
// The failure check (elided line) prints the message below on NULL.
22 void* mlpAllocSESErecord( int size ) {
23 void* newrec = RUNMALLOC( size );
25 printf( "mlpAllocSESErecord did not obtain memory!\n" );
// Release a SESE record previously obtained from mlpAllocSESErecord.
32 void mlpFreeSESErecord( void* seseRecord ) {
33 RUNFREE( seseRecord );
// Allocate an array of `numMemoryQueue` MemoryQueue pointers, each
// initialized with a fresh queue from createMemoryQueue().
// (loop variable `i` is declared on an elided line)
36 MemoryQueue** mlpCreateMemoryQueueArray(int numMemoryQueue){
38 MemoryQueue** newMemoryQueue=(MemoryQueue**)RUNMALLOC( sizeof( MemoryQueue* ) * numMemoryQueue );
39 for(i=0; i<numMemoryQueue; i++){
40 newMemoryQueue[i]=createMemoryQueue();
42 return newMemoryQueue;
// Allocate a flat array of NUMRENTRY REntry structs (uninitialized).
45 REntry* mlpCreateREntryArray(){
46 REntry* newREntryArray=(REntry*)RUNMALLOC(sizeof(REntry)*NUMRENTRY);
47 return newREntryArray;
// Build a fine-grained dependence record: remembers the SESE to wake
// (seseRec) and the dynamic-ID slot used for pointer resolution (dynID).
// The `type` assignment sits on an elided line.
50 REntry* mlpCreateFineREntry(int type, void* seseToIssue, void* dynID){
51 REntry* newREntry=(REntry*)RUNMALLOC(sizeof(REntry));
53 newREntry->seseRec=seseToIssue;
54 newREntry->dynID=dynID;
// Build a coarse/SCC dependence record (no dynID); `type` is stored on an
// elided line, mirroring mlpCreateFineREntry above.
58 REntry* mlpCreateREntry(int type, void* seseToIssue){
59 REntry* newREntry=(REntry*)RUNMALLOC(sizeof(REntry));
61 newREntry->seseRec=seseToIssue;
// True iff r was issued on behalf of the parent (stall) — PARENTREAD/PARENTWRITE.
65 int isParent(REntry *r) {
66 if (r->type==PARENTREAD || r->type==PARENTWRITE) {
// True iff r is a parent-issued coarse entry.
73 int isParentCoarse(REntry *r){
74 if (r->type==PARENTCOARSE){
// True iff r is a fine-grained read (child or parent variety).
81 int isFineRead(REntry *r) {
82 if (r->type==READ || r->type==PARENTREAD) {
// True iff r is a fine-grained write (child or parent variety).
89 int isFineWrite(REntry *r) {
90 if (r->type==WRITE || r->type==PARENTWRITE) {
// True iff r is a coarse-grained entry (child or parent variety).
97 int isCoarse(REntry *r){
98 if(r->type==COARSE || r->type==PARENTCOARSE){
// True iff r is an SCC (strongly-connected-component) item.
105 int isSCC(REntry *r){
106 if(r->type==SCCITEM){
// Queue-item discriminator: SINGLEITEM (used for SCC queue items).
113 int isSingleItem(MemoryQueueItem *qItem){
114 if(qItem->type==SINGLEITEM){
// Queue-item discriminator: HASHTABLE (fine-grained read/write tracking).
121 int isHashtable(MemoryQueueItem *qItem){
122 if(qItem->type==HASHTABLE){
// Queue-item discriminator: VECTOR (coarse-grained tracking).
129 int isVector(MemoryQueueItem *qItem){
130 if(qItem->type==VECTOR){
// Bin-item discriminator: read bin.
137 int isReadBinItem(BinItem* b){
138 if(b->type==READBIN){
// Bin-item discriminator: write bin.
145 int isWriteBinItem(BinItem* b){
146 if(b->type==WRITEBIN){
// Hash an object address into a bin index: mask with H_MASK, then drop the
// low 4 bits (alignment bits carry no information).
153 int generateKey(unsigned int data){
154 return (data&H_MASK)>> 4;
// Allocate a Hashtable queue item with NUMBINS empty bins and no
// unresolved-pointer queue (NULL = normal mode, see resolvePointer).
157 Hashtable* createHashtable(){
159 Hashtable* newTable=(Hashtable*)RUNMALLOC(sizeof(Hashtable));
160 newTable->item.type=HASHTABLE;
161 for(i=0;i<NUMBINS;i++){
162 newTable->array[i]=(BinElement*)RUNMALLOC(sizeof(BinElement));
163 newTable->array[i]->head=NULL;
164 newTable->array[i]->tail=NULL;
166 newTable->unresolvedQueue=NULL;
// Allocate a write bin item (holds exactly one writer REntry).
170 WriteBinItem* createWriteBinItem(){
171 WriteBinItem* binitem=(WriteBinItem*)RUNMALLOC(sizeof(WriteBinItem));
172 binitem->item.type=WRITEBIN;
// Allocate a read bin item (groups up to NUMREAD concurrent readers).
176 ReadBinItem* createReadBinItem(){
177 ReadBinItem* binitem=(ReadBinItem*)RUNMALLOC(sizeof(ReadBinItem));
179 binitem->item.type=READBIN;
// Allocate a Vector queue item for coarse-grained entries.
183 Vector* createVector(){
184 Vector* vector=(Vector*)RUNMALLOC(sizeof(Vector));
186 vector->item.type=VECTOR;
// Body of the SCC constructor (its signature line is elided above).
// An SCC is tracked as a SINGLEITEM queue entry.
191 SCC* scc=(SCC*)RUNMALLOC(sizeof(SCC));
192 scc->item.type=SINGLEITEM;
// Allocate a per-variable MemoryQueue, seeded with a dummy item so head
// and tail are never NULL. type==3 marks the dummy (no real item uses 3);
// ADDTABLE/ADDVECTOR/ADDSCC test for it when replacing the head.
196 MemoryQueue* createMemoryQueue(){
197 MemoryQueue* queue = (MemoryQueue*)RUNMALLOC(sizeof(MemoryQueue));
198 MemoryQueueItem* dummy=(MemoryQueueItem*)RUNMALLOC(sizeof(MemoryQueueItem));
199 dummy->type=3; // dummy type
// Dispatch a new dependence entry to the matching tracker:
// fine-grained read/write -> hashtable, coarse -> vector, SCC -> single item.
// Returns the entry's readiness status (READY/NOTREADY) from the callee.
207 int ADDRENTRY(MemoryQueue * q, REntry * r) {
208 if (isFineRead(r) || isFineWrite(r)) {
209 return ADDTABLE(q, r);
210 } else if (isCoarse(r)) {
211 return ADDVECTOR(q, r);
212 } else if (isSCC(r)) {
// Insert a fine-grained entry into the queue's tail hashtable, appending a
// fresh hashtable first if the tail is some other item kind. Handles the
// unresolved-pointer queue: entries whose pointer is not yet resolved are
// parked on table->unresolvedQueue instead of being hashed into a bin.
// Lock protocol (here and below): the sentinel value 0x1 swapped in via
// LOCKXCHG acts as a spinlock on unresolvedQueue / bin->head.
217 int ADDTABLE(MemoryQueue *q, REntry *r) {
218 if(!isHashtable(q->tail)) {
220 MemoryQueueItem* tail=q->tail;
// Parent entries on a quiet single-item queue may proceed (elided fast path).
221 if (isParent(r) && tail->total==0 && q->tail==q->head) {
226 Hashtable* h=createHashtable();
227 tail->next=(MemoryQueueItem*)h;
228 //************NEED memory barrier here to ensure compiler does not cache Q.tail.status********
229 if (BARRIER() && tail->status==READY && tail->total==0 && q->tail==q->head) {
230 //previous Q item is finished
231 h->item.status=READY;
233 q->tail=(MemoryQueueItem*)h;
234 // handle the the queue item case
235 if(q->head->type==3){
236 q->head=(MemoryQueueItem*)h;
240 //at this point, have table
241 Hashtable* table=(Hashtable*)q->tail;
// Unresolved-pointer path: pointer target unknown, or table already in
// queue mode (unresolvedQueue!=NULL) so FIFO order must be preserved.
243 if(r->pointer!=0 && (*(r->pointer)==0 || (*(r->pointer)!=0 && table->unresolvedQueue!=NULL))){
245 // grab lock on the queue
247 val=(struct Queue*)0x1;
248 val=(struct Queue*)LOCKXCHG((unsigned INTPTR*)&(table->unresolvedQueue), (unsigned INTPTR)val);
249 } while(val==(struct Queue*)0x1);
// Re-check under the lock: the pointer may have resolved meanwhile.
253 if(*(r->pointer)!=0){
254 // pointer is already resolved.
255 table->unresolvedQueue=val; //released lock;
256 return ADDTABLE(q,r);
258 table->unresolvedQueue=(struct Queue*)0x1;
259 struct Queue* queue=createQueue();
261 atomic_inc(&table->item.total);
262 table->unresolvedQueue=queue; // expose new queue
265 // someone else has already cleared all queue stuff
266 table->unresolvedQueue=val; // released lock
267 return ADDTABLE(q,r);
269 addNewItemBack(val,r);
270 atomic_inc(&table->item.total);
271 table->unresolvedQueue=val; // released lock
// Resolved path: snapshot the pointer into dynID, hash to a bin, and
// spin-lock that bin's head before inserting.
276 r->dynID=(void*)*(r->pointer); // interim fix.
279 int key=generateKey((unsigned int)(unsigned INTPTR)r->dynID);
282 BinElement* bin=table->array[key];
283 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(bin->head), (unsigned INTPTR)val);//note...talk to me about optimizations here.
284 } while(val==(BinItem*)0x1);
285 //at this point have locked bin
287 return EMPTYBINCASE(table, table->array[key], r, TRUE);
289 if (isFineWrite(r)) {
290 return WRITEBINCASE(table, r, val, key, TRUE);
291 } else if (isFineRead(r)) {
292 return READBINCASE(table, r, val, key, TRUE);
// Like the tail of ADDTABLE, but for re-inserting an entry whose pointer
// has just been resolved (called from resolvePointer): hash, lock the bin,
// and dispatch to the empty/write/read bin cases with inc=FALSE (the
// table's total was already bumped when the entry was parked).
297 int ADDTABLEITEM(Hashtable* table, REntry* r){
299 int key=generateKey((unsigned int)(unsigned INTPTR)r->dynID);
302 BinElement* bin=table->array[key];
303 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(bin->head), (unsigned INTPTR)val);//note...talk to me about optimizations here.
304 } while(val==(BinItem*)0x1);
305 //at this point have locked bin
307 return EMPTYBINCASE(table, table->array[key], r, FALSE);
309 if (isFineWrite(r)) {
310 return WRITEBINCASE(table, r, val, key, FALSE);
311 } else if (isFineRead(r)) {
312 return READBINCASE(table, r, val, key, FALSE);
// Insert r into an empty bin `be` (bin lock held by caller; storing to
// be->head below releases it). `inc` controls whether T's total is bumped.
317 int EMPTYBINCASE(Hashtable *T, BinElement* be, REntry *r, int inc) {
320 if (isFineWrite(r)) {
321 b=(BinItem*)createWriteBinItem();
322 ((WriteBinItem*)b)->val=r;//<-only different statement
323 } else if (isFineRead(r)) {
324 b=(BinItem*)createReadBinItem();
325 ReadBinItem* readbin=(ReadBinItem*)b;
326 readbin->array[readbin->index++]=r;
// Empty bin + table READY means no one is ahead of us.
330 if (T->item.status==READY) {
331 //current entry is ready
335 be->head=NULL; // released lock
344 atomic_inc(&T->item.total);
349 be->head=b;//released lock
// Append a writer to a non-empty bin: a writer waits behind everything in
// its bin, so it is READY only when the table has no outstanding entries.
// Bin lock is held by the caller (val is the saved head).
353 int WRITEBINCASE(Hashtable *T, REntry *r, BinItem *val, int key, int inc) {
354 //chain of bins exists => tail is valid
355 //if there is something in front of us, then we are not ready
358 BinElement* be=T->array[key];
360 BinItem *bintail=be->tail;
362 WriteBinItem *b=createWriteBinItem();
366 // note: If current table clears all dependencies, then write bin is ready
367 if (T->item.total==0){
372 b->item.status=retval;
373 // b->item.status=NOTREADY;
376 atomic_inc(&T->item.total);
380 r->binitem=(BinItem*)b;
382 be->tail->next=(BinItem*)b;
383 be->tail=(BinItem*)b;
// Append a reader to a non-empty bin: join the tail read group if the tail
// is a read bin, otherwise start a new read group behind the write tail.
388 READBINCASE(Hashtable *T, REntry *r, BinItem *val, int key, int inc) {
389 BinItem * bintail=T->array[key]->tail;
390 if (isReadBinItem(bintail)) {
391 return TAILREADCASE(T, r, val, bintail, key);
392 } else if (!isReadBinItem(bintail)) {
393 TAILWRITECASE(T, r, val, bintail, key);
// Reader arrives while the bin's tail is a read group: either join that
// group (if it has room) or start a new one behind it. The new entry's
// status follows the tail group's status. Bin lock is held by the caller;
// storing `val` back into head releases it.
//
// FIX(review): line 401 used `=` (assignment) instead of `==`, which both
// overwrote the tail group's status with READY and made the condition
// unconditionally true.
398 int TAILREADCASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail, int key, int inc) {
399 ReadBinItem * readbintail=(ReadBinItem*)T->array[key]->tail;
401 if (readbintail->item.status==READY) {
405 T->array[key]->head=val;//released lock
413 if (readbintail->index==NUMREAD) { // create new read group
414 ReadBinItem* rb=createReadBinItem();
415 rb->array[rb->index++]=r;
416 rb->item.total=1;//safe only because item could not have started
417 rb->item.status=status;
418 T->array[key]->tail->next=(BinItem*)rb;
419 T->array[key]->tail=(BinItem*)rb;
420 r->binitem=(BinItem*)rb;
421 } else { // group into old tail
422 readbintail->array[readbintail->index++]=r;
423 atomic_inc(&readbintail->item.total);
424 r->binitem=(BinItem*)readbintail;
425 //printf("grouping with %d\n",readbintail->index);
428 atomic_inc(&T->item.total);
431 T->array[key]->head=val;//released lock
// Reader arrives while the bin's tail is a write bin: start a fresh
// NOTREADY read group behind the writer. Bin lock is held by the caller;
// restoring head on the last line releases it.
435 TAILWRITECASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail, int key, int inc) {
436 // WriteBinItem* wb=createWriteBinItem();
438 //wb->item.total=1;//safe because item could not have started
439 //wb->item.status=NOTREADY;
440 ReadBinItem* rb=createReadBinItem();
441 rb->array[rb->index++]=r;
442 rb->item.total=1;//safe because item could not have started
443 rb->item.status=NOTREADY;
445 atomic_inc(&T->item.total);
448 r->binitem=(BinItem*)rb;
449 T->array[key]->tail->next=(BinItem*)rb;
450 T->array[key]->tail=(BinItem*)rb;
451 T->array[key]->head=val;//released lock
// Insert a coarse-grained entry into the queue's tail vector, appending a
// fresh vector first if the tail is full or of another kind. The LOCKXCHG
// on the array slot races against a dispatcher that may already be
// retiring the vector (see the NOTREADY comment below).
454 ADDVECTOR(MemoryQueue *Q, REntry *r) {
455 if(!isVector(Q->tail)) {
// Parent-coarse on a quiet single-item queue may proceed (elided fast path).
457 if (isParentCoarse(r) && Q->tail->total==0 && Q->tail==Q->head) {
462 Vector* V=createVector();
463 Q->tail->next=(MemoryQueueItem*)V;
464 //************NEED memory barrier here to ensure compiler does not cache Q.tail.status******
465 if (BARRIER() && Q->tail->status==READY&&Q->tail->total==0) {
466 //previous Q item is finished
467 V->item.status=READY;
469 Q->tail=(MemoryQueueItem*)V;
470 // handle the the queue item case
471 if(Q->head->type==3){
472 Q->head=(MemoryQueueItem*)V;
475 //at this point, have vector
476 Vector* V=(Vector*)Q->tail;
// Tail vector full: chain a new one behind it.
477 if (V->index==NUMITEMS) {
481 V->item.status=NOTREADY;
482 Q->tail->next=(MemoryQueueItem*)V;
483 //***NEED memory barrier here to ensure compiler does not cache Q.tail.status******
484 if (BARRIER() && Q->tail->status==READY) {
485 V->item.status=READY;
487 Q->tail=(MemoryQueueItem*)V;
490 atomic_inc(&V->item.total);
494 //*****NEED memory barrier here to ensure compiler does not reorder writes to V.array and V.index
497 //*****NEED memory barrier here to ensure compiler does not cache V.status*********
499 if (BARRIER() && V->item.status==READY) {
501 flag=(void*)LOCKXCHG((unsigned INTPTR*)&(V->array[index]), (unsigned INTPTR)flag);
503 if (isParent(r)) { //parent's retire immediately
504 atomic_dec(&V->item.total);
508 return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
516 //SCC's don't come in parent variety
// Insert an SCC entry: always append a new single-item (SCC) queue node,
// mark it READY if the previous tail is fully drained, then race via
// LOCKXCHG on S->val against a dispatcher that may claim it first.
517 ADDSCC(MemoryQueue *Q, REntry *r) {
523 Q->tail->next=(MemoryQueueItem*)S;
524 //*** NEED BARRIER HERE
525 if (BARRIER() && Q->tail->status==READY && Q->tail->total==0 && Q->tail==Q->head) {
526 //previous Q item is finished
527 S->item.status=READY;
528 Q->tail=(MemoryQueueItem*)S;
529 // handle the the queue item case
530 if(Q->head->type==3){
531 Q->head=(MemoryQueueItem*)S;
534 flag=(void*)LOCKXCHG((unsigned INTPTR*)&(S->val), (unsigned INTPTR)flag);
538 return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
541 Q->tail=(MemoryQueueItem*)S;
// Retire a finished entry, dispatching to the tracker that admitted it
// (mirror of ADDRENTRY's dispatch).
547 void RETIRERENTRY(MemoryQueue* Q, REntry * r) {
548 if (isFineWrite(r)||isFineRead(r)) {
549 RETIREHASHTABLE(Q, r);
550 } else if (isCoarse(r)) {
552 } else if (isSCC(r)) {
// Retire an SCC item: it is the sole occupant, so total is set to 0
// directly rather than atomically decremented.
557 RETIRESCC(MemoryQueue *Q, REntry *r) {
559 s->item.total=0;//don't need atomicdec
// Retire a fine-grained entry: release its bin slot, drop the table's
// count, and if the table is drained and has a successor, advance the
// queue chain (elided call, see RESOLVECHAIN).
564 RETIREHASHTABLE(MemoryQueue *q, REntry *r) {
565 Hashtable *T=r->hashtable;
566 BinItem *b=r->binitem;
568 atomic_dec(&T->item.total);
569 if (T->item.next!=NULL && T->item.total==0) {
// Remove a finished entry from its bin and wake successors: a retiring
// writer (or the last reader of a drained group) pops the bin head and
// resolves dependencies of the next read group / write bin under the bin
// spinlock (0x1 sentinel). Parent entries retire immediately when woken.
574 RETIREBIN(Hashtable *T, REntry *r, BinItem *b) {
575 int key=generateKey((unsigned int)(unsigned INTPTR)r->dynID);
577 atomic_dec(&b->total);
579 if (isFineWrite(r) || (isFineRead(r) && b->next!=NULL && b->total==0)) {
580 // CHECK FIRST IF next is nonnull to guarantee that b.total cannot change
584 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(T->array[key]->head), (unsigned INTPTR)val);
585 } while(val==(BinItem*)0x1);
586 // at this point have locked bin
591 if (isReadBinItem(ptr)) {
592 ReadBinItem* rptr=(ReadBinItem*)ptr;
593 if (rptr->item.status==NOTREADY) {
594 for (i=0;i<rptr->index;i++) {
595 resolveDependencies(rptr->array[i]);
596 if (isParent(rptr->array[i])) {
597 //parents go immediately
598 atomic_dec(&rptr->item.total);
599 atomic_dec(&T->item.total);
603 rptr->item.status=READY;
604 if (rptr->item.next==NULL) {
607 if (rptr->item.total!=0) {
609 } else if ((BinItem*)rptr==val) {
612 } else if(isWriteBinItem(ptr)) {
615 if(ptr->status==NOTREADY){
616 resolveDependencies(((WriteBinItem*)ptr)->val);
618 if(isParent(((WriteBinItem*)ptr)->val)){
619 atomic_dec(&T->item.total);
623 }else{ // write bin is already resolved
627 if(ptr->status==NOTREADY) {
628 resolveDependencies(((WriteBinItem*)ptr)->val);
631 if (isParent(((WriteBinItem*)ptr)->val)) {
632 atomic_dec(&T->item.total);
642 T->array[key]->head=val; // release lock
// Retire a coarse entry: drop the vector's count; when drained and a
// successor exists, advance the chain. Ordering of the two checks is
// load-bearing (see original NOTE).
647 RETIREVECTOR(MemoryQueue *Q, REntry *r) {
649 atomic_dec(&V->item.total);
650 if (V->item.next!=NULL && V->item.total==0) { //NOTE: ORDERING CRUCIAL HERE
// Walk the memory queue from the head, resolving each drained item and
// advancing Q->head past finished items with CAS (CAS failure is benign:
// another thread advanced it, so we just loop).
655 RESOLVECHAIN(MemoryQueue *Q) {
657 MemoryQueueItem* head=Q->head;
658 if (head->next==NULL||head->total!=0) {
659 //item is not finished
660 if (head->status!=READY) {
661 //need to update status
663 if (isHashtable(head)) {
664 RESOLVEHASHTABLE(Q, head);
665 } else if (isVector(head)) {
666 RESOLVEVECTOR(Q, head);
667 } else if (isSingleItem(head)) {
670 if (head->next==NULL)
677 MemoryQueueItem* nextitem=head->next;
678 CAS((unsigned INTPTR*)&(Q->head), (unsigned INTPTR)head, (unsigned INTPTR)nextitem);
679 //oldvalue not needed... if we fail we just repeat
// A hashtable item reached the queue head: for every bin, lock the head
// (0x1 spinlock) and wake the first NOTREADY bin item — the single writer
// of a write bin, or every reader in a read group. Parent entries retire
// immediately as they are woken.
684 RESOLVEHASHTABLE(MemoryQueue *Q, Hashtable *T) {
686 for (binidx=0;binidx<NUMBINS;binidx++) {
687 BinElement* bin=T->array[binidx];
691 val=(BinItem*)LOCKXCHG((unsigned INTPTR*)&(bin->head), (unsigned INTPTR)val);
692 } while (val==(BinItem*)1);
693 //at this point have locked bin
696 if(ptr!=NULL&&ptr->status==NOTREADY) {
698 if (isWriteBinItem(ptr)) {
701 resolveDependencies(((WriteBinItem*)ptr)->val);
703 if (isParent(((WriteBinItem*)ptr)->val)) {
704 atomic_dec(&T->item.total);
708 } else if (isReadBinItem(ptr)) {
710 ReadBinItem* rptr=(ReadBinItem*)ptr;
711 for(i=0;i<rptr->index;i++) {
712 resolveDependencies(rptr->array[i]);
713 if (isParent(rptr->array[i])) {
714 atomic_dec(&rptr->item.total);
715 atomic_dec(&T->item.total);
718 if (rptr->item.next==NULL||rptr->item.total!=0) {
720 } else if((BinItem*)rptr==val) {
// NOTE(review): the trailing `{` after this statement looks suspicious —
// verify against the full source (lines 724+ are elided here).
723 rptr->item.status=READY; {
729 bin->head=val; // released lock;
// A vector item reached the queue head: claim each populated slot with
// LOCKXCHG (racing against ADDVECTOR), resolve the claimed entries, and
// continue into chained successor vectors.
733 RESOLVEVECTOR(MemoryQueue *q, Vector *V) {
739 for (i=0;i<NUMITEMS;i++) {
741 val=(REntry*)LOCKXCHG((unsigned INTPTR*)&(tmp->array[i]), (unsigned INTPTR)val);
743 resolveDependencies(val);
745 atomic_dec(&tmp->item.total);
749 if (tmp->item.next!=NULL&&isVector(tmp->item.next)) {
750 tmp=(Vector*)tmp->item.next;
758 //precondition: SCC's state is READY
// Fragment of the SCC resolver (signature elided): claim S->val via
// LOCKXCHG and resolve the claimed entry's dependencies.
760 flag=(void*)LOCKXCHG((unsigned INTPTR*)&(S->val), (unsigned INTPTR)flag);
762 resolveDependencies(flag);
// An entry's dependence is satisfied: for child entries, decrement the
// SESE's unresolvedDependencies and submit it to the work scheduler when
// it hits zero; for parent (stall) entries, release the parent's semaphore.
767 resolveDependencies(REntry* rentry){
768 SESEcommon* seseCommon=(SESEcommon*)rentry->seseRec;
769 if(rentry->type==READ || rentry->type==WRITE || rentry->type==COARSE || rentry->type==SCCITEM){
770 if( atomic_sub_and_test(1, &(seseCommon->unresolvedDependencies)) ){
771 workScheduleSubmit(seseCommon);
773 }else if(rentry->type==PARENTREAD || rentry->type==PARENTWRITE ||rentry->type==PARENTCOARSE){
774 psem_give(&(rentry->parentStallSem));
// A dynamic pointer just resolved: if this entry is at the head of the
// table's unresolvedQueue, drain the queue in FIFO order, re-inserting
// each entry via ADDTABLEITEM, until an unresolved pointer or the queue
// end is reached (then the table returns to normal, NULL-queue mode).
// The 0x1 sentinel via LOCKXCHG spin-locks unresolvedQueue throughout.
778 resolvePointer(REntry* rentry){
779 rentry->dynID=(void*)*(rentry->pointer); // interim.
780 Hashtable* table=rentry->hashtable;
783 val=(struct Queue*)0x1;
784 val=(struct Queue*)LOCKXCHG((unsigned INTPTR*)&(table->unresolvedQueue), (unsigned INTPTR)val);
785 } while(val==(struct Queue*)0x1);
786 if(val!=NULL && getHead(val)->objectptr==rentry){
787 // handling pointer is the first item of the queue
788 // start to resolve until it reaches unresolved pointer or end of queue
790 struct QueueItem* head=getHead(val);
// NOTE: this local `rentry` shadows the parameter for the rest of the loop.
792 REntry* rentry=(REntry*)head->objectptr;
793 if(*(rentry->pointer)==0){
794 // encounters following unresolved pointer
795 table->unresolvedQueue=val;//released lock
798 removeItem(val,head);
799 if(ADDTABLEITEM(table, rentry)==READY){
800 SESEcommon* seseCommon=(SESEcommon*)rentry->seseRec;
801 atomic_sub_and_test(1, &(seseCommon->unresolvedDependencies));
804 table->unresolvedQueue=NULL; // set hashtable as normal-mode.
809 table->unresolvedQueue=val;//released lock;