3 #include "structdefs.h"
5 #include "checkpoint.h"
7 #include "SimpleHash.h"
8 #include "GenericHashtable.h"
9 #include <sys/select.h>
10 #include <sys/types.h>
17 #elif defined THREADSIMULATE
19 #include <sys/mman.h> // for mmap
20 #include <sys/types.h>
// Byte offset of the next unread record in the per-core transfer file;
// receiveObject() seeks and maps at this offset and advances it past each header.
24 int offset_transObj = 0;
27 // use POSIX message queue
28 // for each core, its message queue named as
34 extern int injectfailures;
35 extern float failurechance;
41 #include "instrument.h"
// Task-descriptor tables: run() allocates both, and enqueuetasks() inserts a
// descriptor into activetasks only when neither table already contains it.
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
47 struct RuntimeHash * forward;
48 struct RuntimeHash * reverse;
// run() and receiveObject() store 0 in corestatus[] for a core that has gone
// idle or has reported a stall; the startup core scans it to detect termination.
50 int corestatus[NUMCORES]; // records status of each core
53 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
54 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
// Per-thread arguments and send/receive counters; main() fills one entry per
// core and passes its address to run().
63 struct thread_data thread_data_array[NUMCORES];
// Thread-specific slot in which run() stores the calling thread's core number.
65 static pthread_key_t key;
// Maps an object address (cast to int) to its pthread_rwlock_t; accesses are
// bracketed by rwlock_tbl in the lock functions below.
66 static struct RuntimeHash* locktbl;
67 static pthread_rwlock_t rwlock_tbl;
// Copied into every freshly allocated per-object lock before
// pthread_rwlock_init() is called on it. NOTE(review): no explicit
// initialisation of rwlock_init is visible here - confirm it is intended
// to be a zero-filled template.
68 static pthread_rwlock_t rwlock_init;
70 bool transStallMsg(int targetcore);
71 void transTerminateMsg(int targetcore);
73 bool getreadlock(void* ptr);
74 void releasereadlock(void* ptr);
75 bool getwritelock(void* ptr);
76 void releasewritelock(void* ptr);
// Entry point of the thread-simulated runtime.
// For every core it resets the receive counter and opens a read-only,
// non-blocking POSIX message queue named "/msgqueue_" followed by the core
// number written as one or two decimal digits. It then creates the
// thread-specific key, the lock table and its rwlock, parses argv[1] as a
// decimal number, and starts one run() thread per core with its own
// thread_data_array entry.
78 int main(int argc, char **argv) {
83 pthread_t threads[NUMCORES];
86 // initialize three arrays and msg queue array
87 char * pathhead = "/msgqueue_";
88 int targetlen = strlen(pathhead);
89 for(i = 0; i < NUMCORES; ++i) {
92 numreceiveobjs[i] = 0;
97 corenumstr[0] = i + '0';
101 corenumstr[1] = i %10 + '0';
102 corenumstr[0] = (i / 10) + '0';
103 corenumstr[2] = '\0';
106 printf("Error: i >= 100\n");
110 char path[targetlen + sourcelen + 1];
111 strcpy(path, pathhead);
112 strncat(path, corenumstr, sourcelen);
113 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
114 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
116 mqd[i]= mq_open(path, oflags, omodes, NULL);
118 printf("[Main] mq_open %s fails: %d, error: %s\n", path, mqd[i], strerror(errno));
121 printf("[Main] mq_open %s returns: %d\n", path, mqd[i]);
126 pthread_key_create(&key, NULL);
128 // create the lock table and initialize its mutex
129 locktbl = allocateRuntimeHash(20);
130 int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
// The format previously had no conversion for the strerror() argument, so the
// error text was silently dropped; %s prints it.
131 printf("[Main] initialize the rwlock for lock table: %d error: %s\n", rc_locktbl, strerror(rc_locktbl));
134 printf("Usage: <bin> <corenum>\n");
// NOTE(review): the digits of argv[1] are accumulated without checking that
// each character is in '0'..'9' - confirm whether validation is wanted.
140 char * number = argv[1];
141 int len = strlen(number);
142 for(i = 0; i < len; ++i) {
143 cnum = (number[i] - '0') + cnum * 10;
146 for(i = 0; i < NUMCORES; ++i) {
147 /* if(STARTUPCORE == i) {
150 thread_data_array[i].corenum = i;
151 thread_data_array[i].argc = argc;
152 thread_data_array[i].argv = argv;
153 thread_data_array[i].numsendobjs = 0;
154 thread_data_array[i].numreceiveobjs = 0;
155 printf("[main] creating thread %d\n", i);
// NOTE(review): run is defined as void run(void*), while pthread_create
// expects a start routine returning void pointer - confirm and align.
156 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
158 printf("[main] ERROR; return code from pthread_create() is %d\n", rc[i]);
164 /*// do stuff of startup core
165 thread_data_array[STARTUPCORE].corenum = STARTUPCORE;
166 thread_data_array[STARTUPCORE].argc = argc;// - 1;
167 thread_data_array[STARTUPCORE].argv = argv;//&argv[1];
168 thread_data_array[STARTUPCORE].numsendobjs = 0;
169 thread_data_array[STARTUPCORE].numreceiveobjs = 0;
170 run(&thread_data_array[STARTUPCORE]);*/
// Per-core thread body. arg points at this core's struct thread_data.
// The core number is stored in the thread-specific key, the task tables are
// allocated, the startup object is created, and the core then polls
// receiveObject() and dispatches on its result:
//  - object received: look for newly runnable tasks;
//  - nothing received: the startup core marks itself idle, checks that every
//    core has status 0 and that the total of numsendobjs equals the total of
//    numreceiveobjs, and if so destroys the per-object locks, frees the lock
//    table, rebuilds every queue path for teardown and reports termination;
//    any other core sends a stall message to the startup core;
//  - stall or terminate message: handled as asserted below.
174 void run(void* arg) {
175 struct thread_data * my_tdata = (struct thread_data *)arg;
176 //corenum = my_tdata->corenum;
177 //void * ptr = malloc(sizeof(int));
178 //*((int*)ptr) = my_tdata->corenum;
// The core number itself (not a pointer to it) is stored as the key value.
179 pthread_setspecific(key, (void *)my_tdata->corenum);
180 int argc = my_tdata->argc;
181 char** argv = my_tdata->argv;
182 printf("[run, %d] Thread %d runs: %x\n", my_tdata->corenum, my_tdata->corenum, (int)pthread_self());
188 GC_init(); // Initialize the garbage collector
194 initializeexithandler();
195 /* Create table for failed tasks */
196 failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
197 (int (*)(void *,void *)) &comparetpd);
198 /* Create queue of active tasks */
199 activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
200 (int (*)(void *,void *)) &comparetpd);
202 /* Process task information */
205 /* Create startup object */
206 createstartupobject(argc, argv);
208 /* Start executing the tasks */
211 #ifdef THREADSIMULATE
214 // check if there are new objects coming
215 bool sendStall = false;
// The key value is converted back from pointer to int implicitly here.
217 int numofcore = pthread_getspecific(key);
219 switch(receiveObject()) {
221 printf("[run, %d] receive an object\n", numofcore);
223 // received an object
224 // check if there are new active tasks can be executed
229 //printf("[run, %d] no msg\n", numofcore);
231 if(STARTUPCORE == numofcore) {
232 corestatus[numofcore] = 0;
233 // check the status of all cores
234 bool allStall = true;
235 for(i = 0; i < NUMCORES; ++i) {
236 if(corestatus[i] != 0) {
242 // check if the sum of send objs and receive obj are the same
244 // no->go on executing
246 for(i = 0; i < NUMCORES; ++i) {
247 sumsendobj += numsendobjs[i];
249 for(i = 0; i < NUMCORES; ++i) {
250 sumsendobj -= numreceiveobjs[i];
252 if(0 == sumsendobj) {
255 /* for(i = 0; i < NUMCORES; ++i) {
257 transTerminateMsg(i);
260 mq_close(mqd[corenum]);*/
263 struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl);
264 while(0 != RunhasNext(it_lock)) {
// This local named key shadows the file-scope pthread key inside the loop.
265 int key = Runkey(it_lock);
266 pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
267 int rc_des = pthread_rwlock_destroy(rwlock_obj);
// The format previously had no conversion for the strerror() argument, so the
// error text was silently dropped; %s prints it.
268 printf("[run, %d] destroy the rwlock for object: %d error: %s\n", numofcore, key, strerror(rc_des));
270 freeRuntimeHash(locktbl);
274 // destroy all message queues
275 char * pathhead = "/msgqueue_";
276 int targetlen = strlen(pathhead);
277 for(i = 0; i < NUMCORES; ++i) {
281 corenumstr[0] = i + '0';
282 corenumstr[1] = '\0';
285 corenumstr[1] = i %10 + '0';
286 corenumstr[0] = (i / 10) + '0';
287 corenumstr[2] = '\0';
290 printf("Error: i >= 100\n");
294 char path[targetlen + sourcelen + 1];
295 strcpy(path, pathhead);
296 strncat(path, corenumstr, sourcelen);
300 printf("[run, %d] terminate!\n", numofcore);
307 // send StallMsg to startup core
308 sendStall = transStallMsg(STARTUPCORE);
314 printf("[run, %d] receive a stall msg\n", numofcore);
315 // receive a Stall Msg, do nothing
316 assert(STARTUPCORE == numofcore); // only startup core can receive such msg
321 printf("[run, %d] receive a terminate msg\n", numofcore);
322 // receive a terminate Msg
// NOTE(review): the next two lines use corenum while the rest of this
// THREADSIMULATE section uses numofcore; the assignment to corenum near the
// top of this function is commented out - confirm which index is intended.
323 assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
324 mq_close(mqd[corenum]);
330 printf("[run, %d] Error: invalid message type.\n", numofcore);
// Builds the startup object that seeds task execution.
// Allocates the startup object and a string array with argc-1 slots, wraps
// argv[1..argc-1] in runtime strings stored into that array, attaches the
// array as ___parameters___, marks the object isolate=1 / version=0, sets
// its flag to 1 through flagorandinit and finally enqueues it.
// Two forms of each allocation call are visible (with and without a leading
// NULL argument); presumably a preprocessor conditional picks one - confirm.
340 void createstartupobject(int argc, char ** argv) {
343 /* Allocate startup object */
345 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
346 struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1);
348 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
349 struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1);
351 /* Build array of strings */
352 startupobject->___parameters___=stringarray;
353 for(i=1;i<argc;i++) {
354 int length=strlen(argv[i]);
356 struct ___String___ *newstring=NewString(NULL, argv[i],length);
358 struct ___String___ *newstring=NewString(argv[i],length);
// Element storage is addressed as the memory immediately following the
// ___length___ field of the array object.
360 ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
363 startupobject->isolate = 1;
364 startupobject->version = 0;
366 /* Set initialized flag for startup object */
367 flagorandinit(startupobject,1,0xFFFFFFFF);
// NULL/0 queue arguments: enqueueObject looks the queues up itself.
368 enqueueObject(startupobject, NULL, 0);
369 //enqueueObject(startupobject, objq4startupobj[corenum], numqueues4startupobj[corenum]);
// Hash function for task/parameter descriptors (registered with the task
// tables in run()). Starts from the task pointer and XORs in each of the
// numParameters parameter pointers; every pointer is cast to int first.
// NOTE(review): the pointer-to-int casts discard high bits where pointers are
// wider than int; harmless for hashing but worth confirming for the target.
372 int hashCodetpd(struct taskparamdescriptor *ftd) {
373 int hash=(int)ftd->task;
375 for(i=0;i<ftd->numParameters;i++){
376 hash^=(int)ftd->parameterArray[i];
// Comparison function for task/parameter descriptors (registered with the
// task tables in run() alongside hashCodetpd). Compares the task pointers,
// then the parameter pointers pairwise, iterating over ftd1's numParameters.
// NOTE(review): the return statements are not visible here, so the result
// convention (which value means equal) should be confirmed before relying on it.
381 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
383 if (ftd1->task!=ftd2->task)
385 for(i=0;i<ftd1->numParameters;i++)
386 if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
391 /* This function sets a tag. */
// Attaches tag descriptor tagd to object obj and records obj in the tag's
// object set. Two signatures are visible; the one with the extra ptr argument
// passes a small int array {2, ptr, obj, tagd} to the allocator and re-reads
// obj and tagd from it after each allocation.
//
// Object side (obj->___tags___):
//  - a single tag descriptor is promoted to a tag array holding both tags;
//  - an existing tag array is scanned, then appended to, or replaced by a
//    larger copy when ___cachedCode___ (entries in use) has reached ___length___.
// Tag side (tagd->flagptr): the same promotion / append / grow scheme with an
// object array.
393 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
395 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
397 struct ___Object___ * tagptr=obj->___tags___;
399 obj->___tags___=(struct ___Object___ *)tagd;
401 /* Have to check if it is already set */
402 if (tagptr->type==TAGTYPE) {
403 struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
407 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
408 struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
409 obj=(struct ___Object___ *)ptrarray[2];
410 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
411 td=(struct ___TagDescriptor___ *) obj->___tags___;
413 struct ArrayObject * ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
415 ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
416 ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
417 obj->___tags___=(struct ___Object___ *) ao;
418 ao->___cachedCode___=2;
422 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
423 for(i=0;i<ao->___cachedCode___;i++) {
424 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
428 if (ao->___cachedCode___<ao->___length___) {
429 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
430 ao->___cachedCode___++;
433 int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
434 struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
435 obj=(struct ___Object___ *)ptrarray[2];
436 tagd=(struct ___TagDescriptor___ *) ptrarray[3];
437 ao=(struct ArrayObject *)obj->___tags___;
439 struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
441 aonew->___cachedCode___=ao->___length___+1;
442 for(i=0;i<ao->___length___;i++) {
443 ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
445 ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
451 struct ___Object___ * tagset=tagd->flagptr;
454 } else if (tagset->type!=OBJECTARRAYTYPE) {
456 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
457 struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
458 obj=(struct ___Object___ *)ptrarray[2];
459 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
461 struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
463 ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
464 ARRAYSET(ao, struct ___Object___ *, 1, obj);
465 ao->___cachedCode___=2;
466 tagd->flagptr=(struct ___Object___ *)ao;
468 struct ArrayObject *ao=(struct ArrayObject *) tagset;
469 if (ao->___cachedCode___<ao->___length___) {
470 ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
474 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
475 struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
476 obj=(struct ___Object___ *)ptrarray[2];
477 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
478 ao=(struct ArrayObject *)tagd->flagptr;
// The grown array must hold all ___length___ existing entries plus the new
// one, so its size is the old length plus one interval - matching the other
// allocation of aonew above and both tag-array growth calls. It was previously
// allocated with OBJECTARRAYINTERVAL slots only, which the copy loop below
// overruns once the old array is at least that long.
480 struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
482 aonew->___cachedCode___=ao->___cachedCode___+1;
483 for(i=0;i<ao->___length___;i++) {
484 ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
486 ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
487 tagd->flagptr=(struct ___Object___ *) aonew;
493 /* This function clears a tag. */
// Removes tag descriptor tagd from object obj and removes obj from the tag's
// object set. Two signatures are visible (with and without a leading ptr).
// In both array cases removal is done by moving the last used entry into the
// freed slot, clearing the old last slot and decrementing ___cachedCode___.
// Inconsistencies are reported with the ERROR 1..4 messages.
495 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
497 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
499 /* We'll assume that tag is always there.
500 Need to statically check for this of course. */
501 struct ___Object___ * tagptr=obj->___tags___;
// Object side: a single tag descriptor, or an array of them.
503 if (tagptr->type==TAGTYPE) {
504 if ((struct ___TagDescriptor___ *)tagptr==tagd)
505 obj->___tags___=NULL;
507 printf("ERROR 1 in tagclear\n");
509 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
511 for(i=0;i<ao->___cachedCode___;i++) {
512 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
514 ao->___cachedCode___--;
515 if (i<ao->___cachedCode___)
516 ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
517 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
// Last tag removed: the object no longer references a tag array.
518 if (ao->___cachedCode___==0)
519 obj->___tags___=NULL;
523 printf("ERROR 2 in tagclear\n");
// Tag side: tagd->flagptr is a single object or an object array.
527 struct ___Object___ *tagset=tagd->flagptr;
528 if (tagset->type!=OBJECTARRAYTYPE) {
532 printf("ERROR 3 in tagclear\n");
534 struct ArrayObject *ao=(struct ArrayObject *) tagset;
536 for(i=0;i<ao->___cachedCode___;i++) {
537 struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
539 ao->___cachedCode___--;
540 if (i<ao->___cachedCode___)
541 ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
542 ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
543 if (ao->___cachedCode___==0)
548 printf("ERROR 4 in tagclear\n");
555 /* This function allocates a new tag. */
// Allocates a tag descriptor of classsize[TAGTYPE] bytes. Two variants are
// visible: one takes a garbage-list pointer and allocates through mygcmalloc,
// the other allocates through FREEMALLOC. How index is stored in the new
// descriptor is not visible here.
557 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
558 struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
560 struct ___TagDescriptor___ * allocate_tag(int index) {
561 struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
570 /* This function updates the flag for object ptr. It or's the flag
571 with the or mask and and's it with the andmask. */
573 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
// Three-way comparison of two ints by subtraction: negative, zero or positive
// when *val1 is below, equal to or above *val2.
// NOTE(review): the subtraction overflows (undefined behaviour) when the
// operands have opposite signs and large magnitude; flag words with the top
// bit set would be negative ints. Consider (a > b) - (a < b) if that can occur.
575 int flagcomp(const int *val1, const int *val2) {
576 return (*val1)-(*val2);
// Updates the flag word of the object at ptr and re-evaluates its queues.
// The current flag is read from the second int of the object and OR-ed with
// ormask; the result is handed to flagbody together with the caller's queue
// list, with isnew=false. The step that applies andmask is not visible here.
579 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
581 int oldflag=((int *)ptr)[1];
582 int flag=ormask|oldflag;
584 flagbody(ptr, flag, queues, length, false);
// Variant of flagorand that takes no queue list and reports a result.
// Reads the flag from the second int of the object, ORs in ormask, skips the
// update when the flag did not change, and otherwise calls flagbody with a
// NULL queue list (so flagbody looks the queues up itself) and isnew=false.
// The andmask step and the returned values are not visible here.
588 bool intflagorand(void * ptr, int ormask, int andmask) {
590 int oldflag=((int *)ptr)[1];
591 int flag=ormask|oldflag;
593 if (flag==oldflag) /* Don't do anything */
596 flagbody(ptr, flag, NULL, 0, false);
// Flag update for a newly created object: same OR step as flagorand, but
// flagbody is called with isnew=true and no queue list, which makes flagbody
// skip its per-core queue lookup. The andmask step is not visible here.
602 void flagorandinit(void * ptr, int ormask, int andmask) {
603 int oldflag=((int *)ptr)[1];
604 int flag=ormask|oldflag;
606 flagbody(ptr,flag,NULL,0,true);
// Common tail of the flagorand family.
// ptr     - object whose flag is being changed
// flag    - new flag value computed by the caller
// vqueues - parameter queues to process, or NULL
// vlength - number of entries in vqueues
// isnew   - true for an object that has not been enqueued yet
// When the object is not new and no queue list was supplied, the list for the
// object's type is taken from objectqueues / numqueues of the current core.
// The object is then removed from the object set of the queues.
609 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
610 struct parameterwrapper * flagptr = NULL;
612 struct parameterwrapper ** queues = vqueues;
613 int length = vlength;
614 if((!isnew) && (queues == NULL)) {
615 #ifdef THREADSIMULATE
// Thread-simulated build: the core number comes from thread-specific data.
616 int numofcore = pthread_getspecific(key);
617 queues = objectqueues[numofcore][ptr->type];
618 length = numqueues[numofcore][ptr->type];
620 queues = objectqueues[corenum][ptr->type];
621 length = numqueues[corenum][ptr->type];
626 /*Remove object from all queues */
627 for(i = 0; i < length; ++i) {
// Fetch the entry's stored fields before removing it; enterflags is checked
// for NULL afterwards (what is done with it is not visible here).
632 ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
633 ObjectHashremove(flagptr->objectset, (int)ptr);
634 if (enterflags!=NULL)
// Offers object vptr to every parameter queue that may accept it and creates
// tasks for the queues whose tag and flag requirements it satisfies.
// vptr    - the object
// vqueues - candidate parameter queues, or NULL
// vlength - number of entries in vqueues
// The queue list for the object's type is read from objectqueues / numqueues
// of the current core; the condition guarding that lookup is not visible here.
// For each queue: if the parameter requires tags, every required tag id
// (odd entries of tagarray) must match a tag on the object; then each
// (andmask, checkmask) pair in intarray is tested against the object's flag
// and enqueuetasks is called on a match.
639 void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
640 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
643 struct QueueItem *tmpptr;
644 struct parameterwrapper * parameter=NULL;
646 struct parameterwrapper ** queues = vqueues;
647 int length = vlength;
649 #ifdef THREADSIMULATE
650 int numofcore = pthread_getspecific(key);
651 queues = objectqueues[numofcore][ptr->type];
652 length = numqueues[numofcore][ptr->type];
654 queues = objectqueues[corenum][ptr->type];
655 length = numqueues[corenum][ptr->type];
659 struct parameterwrapper * prevptr=NULL;
660 struct ___Object___ *tagptr=ptr->___tags___;
662 /* Outer loop iterates through all parameter queues an object of
663 this type could be in. */
665 for(j = 0; j < length; ++j) {
666 parameter = queues[j];
668 if (parameter->numbertags>0) {
670 goto nextloop;//that means the object has no tag but that param needs tag
671 else if(tagptr->type==TAGTYPE) {//one tag
672 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
673 for(i=0;i<parameter->numbertags;i++) {
674 //slotid is parameter->tagarray[2*i];
675 int tagid=parameter->tagarray[2*i+1];
676 if (tagid!=tagptr->flag)
677 goto nextloop; /*We don't have this tag */
679 } else {//multiple tags
680 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
681 for(i=0;i<parameter->numbertags;i++) {
682 //slotid is parameter->tagarray[2*i];
683 int tagid=parameter->tagarray[2*i+1];
// NOTE(review): this scan also indexes with j. Unless an inner declaration
// of j shadows the outer queue index, the outer loop position is clobbered
// here - confirm against the full file.
685 for(j=0;j<ao->___cachedCode___;j++) {
686 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
// Flag check: intarray holds (andmask, checkmask) pairs, one per term.
697 for(i=0;i<parameter->numberofterms;i++) {
698 int andmask=parameter->intarray[i*2];
699 int checkmask=parameter->intarray[i*2+1];
700 if ((ptr->flag&andmask)==checkmask) {
701 enqueuetasks(parameter, prevptr, ptr, NULL, 0);
712 // transfer an object to targetcore
// Sends object obj to core targetcore.
// The class type is read from the first int of the object and must be below
// NUMCLASSES; the byte size comes from classsize[type].
// Two transports are visible:
//  1. a file/mmap variant that appends a header of two ints (type, sending
//     core) followed by the object bytes to a per-core file;
//  2. a POSIX message-queue variant that opens the target core's queue
//     write-only and non-blocking, sends the object bytes with mq_send, and
//     bumps a send counter (numsendobjs[] for the startup core;
//     thread_data_array[].numsendobjs presumably for every other core).
//     An object whose isolate field is 0 is first copied and the copy's
//     original field is pointed back at obj.
714 void transferObject(void * obj, int targetcore) {
715 int type=((int *)obj)[0];
716 assert(type < NUMCLASSES); // can only transfer normal object
717 int size=classsize[type];
721 #elif defined THREADSIMULATE
723 // use shared memory to transfer objects between cores
724 int fd = 0; // mapped file
// NOTE(review): adding an int and a string literal to a string literal with +
// is not valid C string concatenation, and the memcpy calls below pass the
// ints type and corenum where a source pointer is expected. This variant
// looks like disabled draft code - confirm it is compiled out.
726 char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
729 fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
730 offset = lseek(fd, 0, SEEK_CUR);
732 printf("fail to open file " + filepath + " in transferObject.\n");
736 lseek(fd, size + sizeof(int)*2, SEEK_CUR);
738 p_map = (void *)mmap(NULL,size+sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
740 memcpy(p_map, type, sizeof(int));
741 memcpy(p_map+sizeof(int), corenum, sizeof(int));
742 memcpy((p_map+sizeof(int)*2), obj, size);
743 munmap(p_map, size+sizeof(int)*2);
744 //printf( "umap ok \n" );
747 int numofcore = pthread_getspecific(key);
749 // use POSIX message queue to transfer objects between cores
// Queue name suffix: the target core number as one or two decimal digits.
753 if(targetcore < 10) {
754 corenumstr[0] = targetcore + '0';
755 corenumstr[1] = '\0';
757 } else if(targetcore < 100) {
758 corenumstr[1] = targetcore % 10 + '0';
759 corenumstr[0] = (targetcore / 10) + '0';
760 corenumstr[2] = '\0';
763 printf("Error: targetcore >= 100\n");
767 char * pathhead = "/msgqueue_";
768 int targetlen = strlen(pathhead);
769 char path[targetlen + sourcelen + 1];
770 strcpy(path, pathhead);
771 strncat(path, corenumstr, sourcelen);
772 int oflags = O_WRONLY|O_NONBLOCK;
773 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
774 mqdnum = mq_open(path, oflags, omodes, NULL);
776 printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
780 struct ___Object___ * newobj = (struct ___Object___ *)obj;
// Non-isolated objects are sent as a copy that remembers its source.
781 if(0 == newobj->isolate) {
782 newobj = RUNMALLOC(size);
783 memcpy(newobj, obj, size);
784 newobj->original=obj;
788 ret=mq_send(mqdnum, (void *)newobj, size, 0); // send the object into the queue
790 printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
793 if(numofcore == STARTUPCORE) {
794 ++numsendobjs[numofcore];
796 ++(thread_data_array[numofcore].numsendobjs);
798 printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
802 // send stall message to targetcore
// Reports to targetcore (asserted to be the startup core in the message-queue
// variant) that the calling core has stalled.
// The message is a struct ___Object___ whose leading int fields are reused:
// flag carries the sending core number, ___cachedHash___ its send count and
// ___cachedCode___ its receive count (both read from thread_data_array).
// receiveObject() recognises the message by a first int of -1; the line that
// stores that marker is not visible here.
// A file/mmap variant and a POSIX message-queue variant are both visible.
804 bool transStallMsg(int targetcore) {
805 struct ___Object___ newobj;
806 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
810 newobj.flag = corenum;
811 newobj.___cachedHash___ = thread_data_array[corenum].numsendobjs;
812 newobj.___cachedCode___ = thread_data_array[corenum].numreceiveobjs;
814 #elif defined THREADSIMULATE
815 int numofcore = pthread_getspecific(key);
816 newobj.flag = numofcore;
817 newobj.___cachedHash___ = thread_data_array[numofcore].numsendobjs;
818 newobj.___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
820 // use shared memory to transfer objects between cores
821 int fd = 0; // mapped file
// NOTE(review): as in transferObject, the + on string literals and the ints
// passed to memcpy as source pointers are not valid C; this variant looks
// like disabled draft code - confirm it is compiled out.
823 char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
826 fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
827 offset = lseek(fd, 0, SEEK_CUR);
829 printf("fail to open file " + filepath + " in transferObject.\n");
833 lseek(fd, sizeof(int)*2, SEEK_CUR);
835 p_map = (void *)mmap(NULL,sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
837 memcpy(p_map, type, sizeof(int));
838 memcpy(p_map+sizeof(int), corenum, sizeof(int));
839 munmap(p_map, sizeof(int)*2);
840 //printf( "umap ok \n" );
843 // use POSIX message queue to send stall msg to startup core
844 assert(targetcore == STARTUPCORE);
// Queue name suffix: the target core number as one or two decimal digits.
848 if(targetcore < 10) {
849 corenumstr[0] = targetcore + '0';
850 corenumstr[1] = '\0';
852 } else if(targetcore < 100) {
853 corenumstr[1] = targetcore % 10 + '0';
854 corenumstr[0] = (targetcore / 10) + '0';
855 corenumstr[2] = '\0';
858 printf("Error: targetcore >= 100\n");
862 char * pathhead = "/msgqueue_";
863 int targetlen = strlen(pathhead);
864 char path[targetlen + sourcelen + 1];
865 strcpy(path, pathhead);
866 strncat(path, corenumstr, sourcelen);
867 int oflags = O_WRONLY|O_NONBLOCK;
868 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
869 mqdnum = mq_open(path, oflags, omodes, NULL);
871 printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
// The whole struct is sent, not only the fields that carry the message.
876 ret=mq_send(mqdnum, (void *)&newobj, sizeof(struct ___Object___), 0); // send the object into the queue
878 printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
881 printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
882 printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj.flag, newobj.___cachedHash___, newobj.___cachedCode___);
888 // send terminate message to targetcore
// Sends a terminate message to targetcore, which must not be the startup
// core. The message is a single int (type) sent on the target core's POSIX
// message queue, opened write-only and non-blocking.
890 void transTerminateMsg(int targetcore) {
891 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
896 #elif defined THREADSIMULATE
898 // use POSIX message queue to send terminate msg to a non-startup core
899 assert(targetcore != STARTUPCORE);
// Queue name suffix: the target core number as one or two decimal digits.
903 if(targetcore < 10) {
904 corenumstr[0] = targetcore + '0';
905 corenumstr[1] = '\0';
// The two-digit branch must test the value being formatted (targetcore);
// it previously tested corenum, so a three-digit targetcore could be written
// into the two-digit form whenever the caller's own core number was small.
907 } else if(targetcore < 100) {
908 corenumstr[1] = targetcore % 10 + '0';
909 corenumstr[0] = (targetcore / 10) + '0';
910 corenumstr[2] = '\0';
913 printf("Error: targetcore >= 100\n");
917 char * pathhead = "/msgqueue_";
918 int targetlen = strlen(pathhead);
919 char path[targetlen + sourcelen + 1];
920 strcpy(path, pathhead);
921 strncat(path, corenumstr, sourcelen);
922 int oflags = O_WRONLY|O_NONBLOCK;
923 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
924 mqdnum = mq_open(path, oflags, omodes, NULL);
// Log tags name this function; they previously said transStallMsg.
926 printf("[transTerminateMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
932 ret=mq_send(mqdnum, (void *)&type, sizeof(int), 0); // send the object into the queue
934 printf("[transTerminateMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno));
940 // receive object transferred from other cores
941 // or the terminate message from other cores
942 // format: type [+ object]
943 // type: -1--stall msg
945 // return value: 0--received an object
946 // 1--received nothing
947 // 2--received a Stall Msg
// Polls for one incoming message for the calling core.
// File/mmap variant: maps a two-int header (type, source core) at
// offset_transObj in the per-core file, advances the offset, then maps the
// object body of classsize[type] bytes, copies it and enqueues the copy.
// Message-queue variant: receives one message from this core's queue into a
// buffer of mq_msgsize bytes. A first int of -1 marks a stall message, whose
// flag / ___cachedHash___ / ___cachedCode___ fields carry the sender's core
// number, send count and receive count; these are recorded in corestatus,
// numsendobjs and numreceiveobjs. Any other message is treated as an object:
// the receive counter is bumped and a copy of msglen bytes is enqueued.
948 int receiveObject() {
951 #elif defined THREADSIMULATE
// NOTE(review): + on string literals is not valid C string concatenation;
// this variant looks like disabled draft code - confirm it is compiled out.
953 char * filepath = "/scratch/transObj/file_" + corenum + ".txt";
957 int sourcecorenum = 0;
959 fd = open(filepath, O_CREAT|O_RDONLY, 00777);
960 lseek(fd, offset_transObj, SEEK_SET);
961 p_map = (void*)mmap(NULL,sizeof(int)*2,PROT_READ,MAP_SHARED,fd,offset_transObj);
// The source core is the second int of the header (was misspelled sinzeof).
963 sourcecorenum = *(int*)(p_map+sizeof(int));
964 offset_transObj += sizeof(int)*2;
965 munmap(p_map,sizeof(int)*2);
967 // sourcecorenum has terminated
971 size = classsize[type];
972 p_map = (void*)mmap(NULL,size,PROT_READ,MAP_SHARED,fd,offset_transObj);
973 struct ___Object___ * newobj=RUNMALLOC(size);
974 memcpy(newobj, p_map, size);
976 enqueueObject(newobj,NULL,0);
978 int numofcore = pthread_getspecific(key);
979 // use POSIX message queue to transfer object
981 struct mq_attr mqattr;
982 mq_getattr(mqd[numofcore], &mqattr);
// The receive buffer is sized to the queue's maximum message size.
983 void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
984 msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
990 //printf("msg: %s\n",msgptr);
991 if(((int*)msgptr)[0] == -1) {
993 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
994 int index = tmpptr->flag;
995 corestatus[index] = 0;
996 numsendobjs[index] = tmpptr->___cachedHash___;
997 numreceiveobjs[index] = tmpptr->___cachedCode___;
998 printf("<receiveObject> index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
1001 } /*else if(((int*)msgptr)[0] == -2) {
1006 if(numofcore == STARTUPCORE) {
1007 ++(numreceiveobjs[numofcore]);
1009 ++(thread_data_array[numofcore].numreceiveobjs);
1011 struct ___Object___ * newobj=RUNMALLOC(msglen);
1012 memcpy(newobj, msgptr, msglen);
1014 enqueueObject(newobj, NULL, 0);
// Tries to take a shared (read) lock on the object at ptr without blocking.
// The lock table maps the object address to a pthread_rwlock_t. If the table
// has no entry yet, a lock is allocated, initialised, inserted under the
// table's write lock and then try-locked for reading; otherwise the existing
// lock is looked up under the table's read lock and try-locked for reading.
// All table and object locks are taken with the try variants.
// Every log line now prints the strerror() text; the formats previously had
// no %s conversion for that argument, so the text was dropped.
1020 bool getreadlock(void * ptr) {
1023 #elif defined THREADSIMULATE
1024 int numofcore = pthread_getspecific(key);
1026 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1027 printf("[getreadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1031 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1032 // no locks for this object
1033 // first time to operate on this shared object
1034 // create a lock for it
// NOTE(review): the table lock is dropped here and re-taken for writing
// below; another thread could insert a lock for the same object in between.
// Confirm whether a re-check after taking the write lock is needed.
1035 rc = pthread_rwlock_unlock(&rwlock_tbl);
1036 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1037 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1038 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1039 rc = pthread_rwlock_init(rwlock, NULL);
1040 printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1041 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1042 printf("[getreadlock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1046 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1047 rc = pthread_rwlock_unlock(&rwlock_tbl);
1048 printf("[getreadlock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1050 //rc = pthread_rwlock_rdlock(&rwlock);
1051 rc = pthread_rwlock_tryrdlock(rwlock);
1052 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1059 pthread_rwlock_t* rwlock_obj = NULL;
1060 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1061 rc = pthread_rwlock_unlock(&rwlock_tbl);
1062 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1063 //int rc_obj = pthread_rwlock_rdlock(&rwlock_obj);
1064 int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
1065 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1066 if(EBUSY == rc_obj) {
// Releases the lock previously taken on the object at ptr by getreadlock.
// The lock table is held for reading (blocking) while the object's lock is
// looked up and unlocked; a missing table entry is reported as an error.
// Every log line now prints the strerror() text; the formats previously had
// no %s conversion for that argument, so the text was dropped.
1075 void releasereadlock(void * ptr) {
1078 #elif defined THREADSIMULATE
1079 int numofcore = pthread_getspecific(key);
1080 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1081 printf("[releasereadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1082 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1083 printf("[releasereadlock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1086 pthread_rwlock_t* rwlock_obj = NULL;
1087 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1088 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1089 printf("[releasereadlock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1090 rc = pthread_rwlock_unlock(&rwlock_tbl);
1091 printf("[releasereadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// Tries to take an exclusive (write) lock on the object at ptr without
// blocking. Mirrors getreadlock: a missing table entry causes a new lock to
// be allocated, initialised and inserted under the table's write lock;
// otherwise the existing lock is looked up under the table's read lock.
// The object lock itself is taken with pthread_rwlock_trywrlock.
// Every log line now prints the strerror() text; the formats previously had
// no %s conversion for that argument, so the text was dropped.
1095 bool getwritelock(void * ptr) {
1098 #elif defined THREADSIMULATE
1099 int numofcore = pthread_getspecific(key);
1101 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1102 printf("[getwritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1106 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1107 // no locks for this object
1108 // first time to operate on this shared object
1109 // create a lock for it
// NOTE(review): the table lock is dropped here and re-taken for writing
// below; another thread could insert a lock for the same object in between.
// Confirm whether a re-check after taking the write lock is needed.
1110 rc = pthread_rwlock_unlock(&rwlock_tbl);
1111 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1112 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1113 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1114 rc = pthread_rwlock_init(rwlock, NULL);
1115 printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1116 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1117 printf("[getwritelock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1121 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1122 rc = pthread_rwlock_unlock(&rwlock_tbl);
1123 printf("[getwritelock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1125 //rc = pthread_rwlock_wrlock(rwlock);
1126 rc = pthread_rwlock_trywrlock(rwlock);
1127 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1134 pthread_rwlock_t* rwlock_obj = NULL;
1135 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1136 rc = pthread_rwlock_unlock(&rwlock_tbl);
1137 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1138 //int rc_obj = pthread_rwlock_wrlock(rwlock_obj);
1139 int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
1140 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1141 if(EBUSY == rc_obj) {
// Releases the lock previously taken on the object at ptr by getwritelock.
// The lock table is held for reading (blocking) while the object's lock is
// looked up and unlocked; a missing table entry is reported as an error.
// Every log line now prints the strerror() text; the formats previously had
// no %s conversion for that argument, so the text was dropped.
1151 void releasewritelock(void * ptr) {
1154 #elif defined THREADSIMULATE
1155 int numofcore = pthread_getspecific(key);
1156 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1157 printf("[releasewritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1158 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1159 printf("[releasewritelock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1162 pthread_rwlock_t* rwlock_obj = NULL;
1163 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1164 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1165 printf("[releasewritelock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1166 rc = pthread_rwlock_unlock(&rwlock_tbl);
1167 printf("[releasewritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// Registers object ptr with parameter queue `parameter` and enqueues every
// task invocation that can be formed with it.
// parameter     - queue (task parameter slot) the object was matched against
// prevptr       - passed through by the caller (unused in the visible lines)
// ptr           - the object being enqueued
// enterflags    - optional flag array stored with the object in the queue
// numenterflags - number of entries in enterflags
// The object is added to the parameter's object set and placed in its slot of
// the candidate parameter vector. The task's iterators then enumerate the
// remaining parameters; each complete combination becomes a
// taskparamdescriptor that is added to activetasks unless an equal descriptor
// is already in failedtasks or activetasks.
// The iterator arguments are &parameter->iterators[j]; the ampersand-plus-
// name sequence had been corrupted into a pilcrow character and is restored.
1171 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
1172 void * taskpointerarray[MAXTASKPARAMS];
1174 int numparams=parameter->task->numParameters;
1175 int numiterators=parameter->task->numTotal-1;
1180 struct taskdescriptor * task=parameter->task;
1182 ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
1184 /* Add enqueued object to parameter vector */
1185 taskpointerarray[parameter->slot]=ptr;
1187 /* Reset iterators */
1188 for(j=0;j<numiterators;j++) {
1189 toiReset(&parameter->iterators[j]);
1192 /* Find initial state */
1193 for(j=0;j<numiterators;j++) {
1195 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
1196 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
1198 /* Need to backtrack */
1199 toiReset(&parameter->iterators[j]);
1203 /* Nothing to enqueue */
1210 /* Enqueue current state */
1212 struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
1214 tpd->numParameters=numiterators+1;
1215 tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
1216 for(j=0;j<=numiterators;j++){
1217 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
// Only descriptors not already known as failed or active are queued; the
// duplicate's parameter array is released instead.
1220 if ((!gencontains(failedtasks, tpd)&&!gencontains(activetasks,tpd))) {
1221 genputtable(activetasks, tpd, tpd);
1223 RUNFREE(tpd->parameterArray);
1227 /* This loop iterates to the next parameter combination */
1228 if (numiterators==0)
// Starts at the last iterator; stepping back to earlier iterators happens on
// lines that follow the reset below.
1231 for(j=numiterators-1; j<numiterators;j++) {
1233 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
1234 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
1236 /* Need to backtrack */
1237 toiReset(&parameter->iterators[j]);
1241 /* Nothing more to enqueue */
1249 /* Handler for signals. The signals catch null pointer errors and
1250 arithmetic errors. */
/*
 * sig:       number of the signal being handled; printed and then unblocked.
 * info, uap: not referenced in the lines shown here.
 *
 * Does not return normally: after unblocking `sig` it longjmp()s to
 * error_handler with value 1.
 */
1252 void myhandler(int sig, siginfo_t *info, void *uap) {
1255 printf("sig=%d\n",sig);
/* Build a one-signal set and unblock it before jumping out of the handler
   (presumably so the same signal can be delivered again later). */
1258 sigemptyset(&toclear);
1259 sigaddset(&toclear, sig);
1260 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
/* Transfer control to the jmp_buf error_handler; the value 1 becomes the setjmp result there. */
1261 longjmp(error_handler,1);
/* Hash table keyed by file descriptor; executetasks() allocates it and looks
   up the object pointer stored for each descriptor that select() reports as ready. */
1266 struct RuntimeHash *fdtoobject;
/*
 * addreadfd - add fd to the readfds descriptor set.
 * NOTE(review): the statements between the signature and FD_SET are not
 * shown in this listing (presumably they maintain maxreadfd - confirm).
 */
1268 void addreadfd(int fd) {
1271 FD_SET(fd, &readfds);
/*
 * removereadfd - remove fd from the readfds descriptor set.
 * When fd was the highest tracked descriptor (maxreadfd == fd+1), the loop
 * below scans downward while the descriptor just under maxreadfd is not set.
 * NOTE(review): the loop body is not shown in this listing (presumably it
 * decrements maxreadfd - confirm).
 */
1274 void removereadfd(int fd) {
1275 FD_CLR(fd, &readfds);
1276 if (maxreadfd==(fd+1)) {
1278 while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
/*
 * executetasks - main task-execution loop.
 *
 * Setup: installs myhandler for SIGBUS, SIGSEGV, SIGFPE and SIGPIPE,
 * allocates fdtoobject, and maps a no-access page at address 0.
 *
 * Loop (while activetasks is non-empty or maxreadfd > 0):
 *   1. Polls the tracked descriptors with a zero-timeout select(); for each
 *      ready fd, looks up its object in fdtoobject, sets the object's first
 *      flag and enqueues the object.
 *   2. Removes the first descriptor from activetasks; if it is in
 *      failedtasks, its parameter array is freed.
 *   3. For every parameter: a non-isolated (shared) object has the write
 *      lock on its ->original taken.  If the lock cannot be taken, the locks
 *      obtained so far are released and the descriptor is put back in
 *      activetasks.  If the copy's version differs from the original's, the
 *      copy is refreshed, all locks are released, the descriptor is freed
 *      and the object is removed from its queues and enqueued again.
 *      The object must also still be in its queue and carry its tags.
 *   4. Shared copies are replaced by their originals in taskpointerarray,
 *      the task function is called, the write locks are released and the
 *      descriptor is freed.  A fault during the task returns through
 *      setjmp(error_handler) and records the descriptor in failedtasks.
 *
 * NOTE(review): many short lines (closing braces, gotos/returns, #else and
 * #endif, small declarations) are not shown in this listing; the summary
 * above covers only the visible statements.
 */
1289 void executetasks() {
/* The first OFFSET slots are reserved; parameters start at index OFFSET. */
1290 void * taskpointerarray[MAXTASKPARAMS+OFFSET];
1292 /* Set up signal handlers */
1293 struct sigaction sig;
1294 sig.sa_sigaction=&myhandler;
1295 sig.sa_flags=SA_SIGINFO;
1296 sigemptyset(&sig.sa_mask);
1298 /* Catch bus errors, segmentation faults, and floating point exceptions*/
1299 sigaction(SIGBUS,&sig,0);
1300 sigaction(SIGSEGV,&sig,0);
1301 sigaction(SIGFPE,&sig,0);
1302 sigaction(SIGPIPE,&sig,0);
1307 fdtoobject=allocateRuntimeHash(100);
1309 /* Map first block of memory to protected, anonymous page */
/* NOTE(review): the mmap() result is not checked. */
1310 mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
1313 while((hashsize(activetasks)>0)||(maxreadfd>0)) {
1315 /* Check if any filedescriptors have IO pending */
/* A {0,0} timeout makes select() poll without blocking. */
1318 struct timeval timeout={0,0};
1322 numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
1324 /* Process ready fd's */
1326 for(fd=0;fd<maxreadfd;fd++) {
1327 if (FD_ISSET(fd, &tmpreadfds)) {
1328 /* Set ready flag on object */
1330 // printf("Setting fd %d\n",fd);
1331 if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
1332 if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
1333 enqueueObject(objptr, NULL, 0);
1341 /* See if there are any active tasks */
1342 if (hashsize(activetasks)>0) {
1344 currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
1345 genfreekey(activetasks, currtpd);
1347 /* Check if this task has failed, allow a task that contains optional objects to fire */
1348 if (gencontains(failedtasks, currtpd)) {
1349 // Free up task parameter descriptor
1350 RUNFREE(currtpd->parameterArray);
1354 int numparams=currtpd->task->numParameters;
1355 int numtotal=currtpd->task->numTotal;
/* isolateflags[i] == 0 marks a shared parameter whose original is write-locked by this loop. */
1357 int isolateflags[numparams];
1358 /* Make sure that the parameters are still in the queues */
1359 for(i=0;i<numparams;i++) {
1360 void * parameter=currtpd->parameterArray[i];
1361 struct ___Object___ * tmpparam = (struct ___Object___ *)parameter;
1362 if(0 == tmpparam->isolate) {
1363 isolateflags[i] = 0;
1364 // shared object, need to flush with current value
1365 //if(!getreadlock(tmpparam->original)) {
1366 // // fail to get read lock of the original object, try this task later
1367 if(!getwritelock(tmpparam->original)) {
1368 // fail to get write lock, release all obtained locks and try this task later
1370 for(j = 0; j < i; ++j) {
1371 if(0 == isolateflags[j]) {
// locks were taken on ->original of the parameter stored at index j+OFFSET, so release exactly that
1372 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
1375 genputtable(activetasks, currtpd, currtpd);
1378 if(tmpparam->version != tmpparam->original->version) {
1379 // flush this object
1380 memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
1381 //releasereadlock(tmpparam->original);
1382 // version changed: release all obtained locks, including the one just taken
1384 for(j = 0; j < i; ++j) {
1385 if(0 == isolateflags[j]) {
1386 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
1389 releasewritelock(tmpparam->original);
1391 // some task on another core has changed this object
1392 // Free up task parameter descriptor
1393 RUNFREE(currtpd->parameterArray);
1395 // dequeue this object
1396 #ifdef THREADSIMULATE
1397 int numofcore = pthread_getspecific(key);
1398 struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
1399 int length = numqueues[numofcore][tmpparam->type];
1401 struct parameterwrapper ** queues = objectqueues[corenum][tmpparam->type];
1402 int length = numqueues[corenum][tmpparam->type];
1404 for(j = 0; j < length; ++j) {
1405 struct parameterwrapper * pw = queues[j];
1406 if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
1408 int UNUSED, UNUSED2;
1410 ObjectHashget(pw->objectset, (int) tmpparam, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
1411 ObjectHashremove(pw->objectset, (int)tmpparam);
1412 if (enterflags!=NULL)
1416 // try to enqueue it again to check if it feeds other tasks;
1417 enqueueObject(tmpparam, NULL, 0);
1420 //releasereadlock(tmpparam->original);
1422 isolateflags[i] = 1;
1424 struct parameterdescriptor * pd=currtpd->task->descriptorarray[i];
1425 struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
1427 /* Check that object is still in queue */
1429 if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
1430 RUNFREE(currtpd->parameterArray);
1437 /* Check that object still has necessary tags */
1438 for(j=0;j<pd->numbertags;j++) {
1439 int slotid=pd->tagarray[2*j]+numparams;
1440 struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
1441 if (!containstag(parameter, tagd)) {
1442 RUNFREE(currtpd->parameterArray);
1448 taskpointerarray[i+OFFSET]=parameter;
1451 for(;i<numtotal;i++) {
1452 taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
1455 for(i = 0; i < numparams; ++i) {
1456 if(0 == isolateflags[i]) {
1457 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
1458 // shared object, need to replace this copy with original one
1459 /*if(!getwritelock(tmpparam->original)) {
1460 // fail to get write lock, release all obtained locks and try this task later
1462 for(j = 0; j < i; ++j) {
1463 if(0 == isolateflags[j]) {
1464 releasewritelock(taskpointerarray[j]);
1467 genputtable(activetasks, tpd, tpd);
1470 if(tmpparam != tmpparam->original) {
1471 taskpointerarray[i+OFFSET] = tmpparam->original;
1477 /* Checkpoint the state */
1478 forward=allocateRuntimeHash(100);
1479 reverse=allocateRuntimeHash(100);
1480 //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
// non-zero x means control came back here via longjmp() from myhandler
1482 if (x=setjmp(error_handler)) {
1487 printf("Fatal Error=%d, Recovering!\n",x);
1490 genputtable(failedtasks,currtpd,currtpd);
1491 //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
1493 freeRuntimeHash(forward);
1494 freeRuntimeHash(reverse);
1502 /*if (injectfailures) {
1503 if ((((double)random())/RAND_MAX)<failurechance) {
1504 printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
1505 longjmp(error_handler,10);
1508 /* Actually call task */
1510 ((int *)taskpointerarray)[0]=currtpd->numParameters;
1511 taskpointerarray[1]=NULL;
1514 printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1515 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
1516 printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1518 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
// shared slots were replaced by their originals above, so each slot is the locked object itself
1520 for(i = 0; i < numparams; ++i) {
1521 if(0 == isolateflags[i]) {
1522 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
1523 releasewritelock(tmpparam);
1527 freeRuntimeHash(forward);
1528 freeRuntimeHash(reverse);
1530 // Free up task parameter descriptor
1531 RUNFREE(currtpd->parameterArray);
1541 /* This function processes an object's tags */
/*
 * For each (slot, tag id) pair in pd->tagarray whose tag slot is not yet
 * marked in statusarray, fills parameter->iterators[*iteratorcount] as a tag
 * iterator and marks the slot as handled.
 *
 * pd:            descriptor whose tagarray holds pairs: [2*i] = slot id,
 *                [2*i+1] = tag id.
 * index:         parameter slot of the object that carries the tags.
 * parameter:     wrapper whose iterators array is being filled.
 * iteratorcount: in/out index of the iterator being written.
 *                NOTE(review): the increment is not shown in this listing.
 * statusarray:   per-slot "already bound" flags; tag slots are stored after
 *                the numparams object slots (index slotid+numparams).
 */
1542 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
1545 for(i=0;i<pd->numbertags;i++) {
1546 int slotid=pd->tagarray[2*i];
1547 int tagid=pd->tagarray[2*i+1];
/* Only create an iterator for a tag slot that nothing has bound yet. */
1549 if (statusarray[slotid+numparams]==0) {
1550 parameter->iterators[*iteratorcount].istag=1;
1551 parameter->iterators[*iteratorcount].tagid=tagid;
1552 parameter->iterators[*iteratorcount].slot=slotid+numparams;
1553 parameter->iterators[*iteratorcount].tagobjectslot=index;
1554 statusarray[slotid+numparams]=1;
/*
 * processobject - fill parameter->iterators[*iteratorcount] as an object
 * iterator for parameter slot `index`.
 *
 * The iterator draws from the object set of pd's own queue.  Every tag of pd
 * whose slot is already marked in statusarray is recorded in tagbindings,
 * and the number recorded is stored in numtags.
 *
 * NOTE(review): the declaration and increment of tagcount, and any update of
 * *iteratorcount, are not shown in this listing.
 */
1561 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
1564 struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
1566 parameter->iterators[*iteratorcount].istag=0;
1567 parameter->iterators[*iteratorcount].slot=index;
1568 parameter->iterators[*iteratorcount].objectset=objectset;
/* Mark this object slot as handled. */
1569 statusarray[index]=1;
1571 for(i=0;i<pd->numbertags;i++) {
1572 int slotid=pd->tagarray[2*i];
1573 int tagid=pd->tagarray[2*i+1];
1574 if (statusarray[slotid+numparams]!=0) {
1575 /* This tag has already been enqueued, use it to narrow search */
1576 parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
1580 parameter->iterators[*iteratorcount].numtags=tagcount;
1585 /* This function builds the iterators for a task & parameter */
/*
 * task:      task whose parameters need iterators.
 * index:     slot of the parameter that is already supplied (the "initial"
 *            parameter); it is marked as handled up front.
 * parameter: wrapper whose iterators array is filled via processobject()
 *            and processtags().
 *
 * The remaining parameters are handled in three passes: those with a tag
 * that is already bound, then those with tags that are still unbound, then
 * everything left.
 *
 * NOTE(review): statusarray has MAXTASKPARAMS entries but is also indexed
 * with slotid+numparams - confirm MAXTASKPARAMS covers tag slots too.
 */
1587 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
1588 int statusarray[MAXTASKPARAMS];
1590 int numparams=task->numParameters;
1591 int iteratorcount=0;
1592 for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
1594 statusarray[index]=1; /* Initial parameter */
1595 /* Process tags for initial iterator */
1597 processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
1601 /* Check for objects with existing tags */
1602 for(i=0;i<numparams;i++) {
1603 if (statusarray[i]==0) {
1604 struct parameterdescriptor *pd=task->descriptorarray[i];
1606 for(j=0;j<pd->numbertags;j++) {
1607 int slotid=pd->tagarray[2*j];
/* A tag of this parameter is already bound, so the parameter can be found through it. */
1608 if(statusarray[slotid+numparams]!=0) {
1609 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1610 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1617 /* Next do objects w/ unbound tags*/
1619 for(i=0;i<numparams;i++) {
1620 if (statusarray[i]==0) {
1621 struct parameterdescriptor *pd=task->descriptorarray[i];
1622 if (pd->numbertags>0) {
1623 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1624 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1630 /* Nothing with a tag enqueued */
1632 for(i=0;i<numparams;i++) {
1633 if (statusarray[i]==0) {
1634 struct parameterdescriptor *pd=task->descriptorarray[i];
1635 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1636 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
/*
 * Debug dump: for every task of the current core, prints the task name and,
 * for each parameter, the objects in that parameter's object set together
 * with their flag and tag(s).
 * NOTE(review): the function header and several short lines are not shown in this listing.
 */
1649 #ifdef THREADSIMULATE
1650 int numofcore = pthread_getspecific(key);
1651 for(i=0;i<numtasks[numofcore];i++) {
1652 struct taskdescriptor * task=taskarray[numofcore][i];
1654 for(i=0;i<numtasks[corenum];i++) {
1655 struct taskdescriptor * task=taskarray[corenum][i];
1657 printf("%s\n", task->name);
1658 for(j=0;j<task->numParameters;j++) {
1659 struct parameterdescriptor *param=task->descriptorarray[j];
1660 struct parameterwrapper *parameter=param->queue;
1661 struct ObjectHash * set=parameter->objectset;
1662 struct ObjectIterator objit;
1663 printf(" Parameter %d\n", j);
1664 ObjectHashiterator(set, &objit);
1665 while(ObjhasNext(&objit)) {
1666 struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
1667 struct ___Object___ * tagptr=obj->___tags___;
1668 int nonfailed=Objdata4(&objit);
1669 int numflags=Objdata3(&objit);
1670 int flags=Objdata2(&objit);
/* %lx expects unsigned long, so pointers are cast explicitly before printing. */
1672 printf(" Contains %lx\n", (unsigned long)obj);
1673 printf(" flag=%d\n", obj->flag);
1675 } else if (tagptr->type==TAGTYPE) {
1676 printf(" tag=%lx\n",(unsigned long)tagptr);
/* Otherwise the tags field is treated as an array of tag descriptors. */
1679 struct ArrayObject *ao=(struct ArrayObject *)tagptr;
1680 for(;tagindex<ao->___cachedCode___;tagindex++) {
1681 printf(" tag=%lx\n",(unsigned long)(ARRAYGET(ao, struct ___TagDescriptor___*, tagindex)));
1690 /* This function processes the task information to create queues for
1691 each parameter type. */
/*
 * For every task of the current core (numofcore under THREADSIMULATE,
 * corenum otherwise) this runs two passes over the task's parameters:
 * first allocate each parameter queue's object set and point it back at its
 * task, then build that queue's iterators with builditerators().
 */
1693 void processtasks() {
1695 #ifdef THREADSIMULATE
/* NOTE(review): pthread_getspecific() returns void *; it is narrowed to int here. */
1696 int numofcore = pthread_getspecific(key);
1697 for(i=0;i<numtasks[numofcore];i++) {
1698 struct taskdescriptor *task=taskarray[numofcore][i];
1700 for(i=0;i<numtasks[corenum];i++) {
1701 struct taskdescriptor * task=taskarray[corenum][i];
1705 /* Build objectsets */
1706 for(j=0;j<task->numParameters;j++) {
1707 struct parameterdescriptor *param=task->descriptorarray[j];
1708 struct parameterwrapper *parameter=param->queue;
1709 parameter->objectset=allocateObjectHash(10);
1710 parameter->task=task;
1713 /* Build iterators for parameters */
1714 for(j=0;j<task->numParameters;j++) {
1715 struct parameterdescriptor *param=task->descriptorarray[j];
1716 struct parameterwrapper *parameter=param->queue;
1717 builditerators(task, j, parameter);
/*
 * toiReset - rewind a tag/object iterator to its starting position.
 * NOTE(review): only the last branch is fully shown in this listing; there
 * the iterator is re-initialised over it->objectset.  The earlier branches
 * (including the it->numtags>0 case) have bodies that are not shown.
 */
1722 void toiReset(struct tagobjectiterator * it) {
1725 } else if (it->numtags>0) {
1728 ObjectHashiterator(it->objectset, &it->it);
/*
 * toiHasNext - report whether iterator `it` can produce another binding.
 *
 * it:          iterator to test; it->tagobjindex may be advanced to the
 *              position of the next match.
 * objectarray: slots bound so far; read at it->tagobjectslot or at the
 *              slots listed in it->tagbindings.
 *
 * Three cases are visible:
 *   - tag iterator (first branch; its condition is not shown, presumably
 *     it->istag): scans the tags of the object in slot it->tagobjectslot
 *     for one whose flag equals it->tagid;
 *   - object iterator with bound tags (it->numtags>0): scans the object(s)
 *     reachable from the first bound tag's flagptr for one that is in
 *     it->objectset and carries every other bound tag;
 *   - plain object iterator: defers to ObjhasNext().
 *
 * NOTE(review): the return statements of the first two cases are not shown
 * in this listing.
 */
1732 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
1735 /* Get object with tags */
1736 struct ___Object___ *obj=objectarray[it->tagobjectslot];
1737 struct ___Object___ *tagptr=obj->___tags___;
/* ___tags___ is either a single tag descriptor (type TAGTYPE) or is treated as an array of them. */
1738 if (tagptr->type==TAGTYPE) {
1739 if ((it->tagobjindex==0)&& /* First object */
1740 (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
1745 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1746 int tagindex=it->tagobjindex;
1747 for(;tagindex<ao->___cachedCode___;tagindex++) {
1748 struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
1749 if (td->flag==it->tagid) {
1750 it->tagobjindex=tagindex; /* Found right type of tag */
1756 } else if (it->numtags>0) {
1757 /* Use tags to locate appropriate objects */
1758 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1759 struct ___Object___ *objptr=tag->flagptr;
/* flagptr is either a single object or an OBJECTARRAYTYPE array of objects. */
1761 if (objptr->type!=OBJECTARRAYTYPE) {
1762 if (it->tagobjindex>0)
1764 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
/* The candidate must also carry every remaining bound tag. */
1766 for(i=1;i<it->numtags;i++) {
1767 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1768 if (!containstag(objptr,tag2))
1773 struct ArrayObject *ao=(struct ArrayObject *) objptr;
1776 for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
1777 struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
1778 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
1780 for(i=1;i<it->numtags;i++) {
1781 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1782 if (!containstag(objptr,tag2))
1785 it->tagobjindex=tagindex;
1790 it->tagobjindex=tagindex;
1794 return ObjhasNext(&it->it);
/*
 * containstag - test whether object ptr is attached to tag.
 *
 * tag->flagptr is examined: when it is an OBJECTARRAYTYPE array, its first
 * ___cachedCode___ entries are compared against ptr.
 * NOTE(review): the single-object branch and all return statements are not
 * shown in this listing.
 */
1798 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
1800 struct ___Object___ * objptr=tag->flagptr;
1801 if (objptr->type==OBJECTARRAYTYPE) {
1802 struct ArrayObject *ao=(struct ArrayObject *)objptr;
1803 for(j=0;j<ao->___cachedCode___;j++) {
1804 if (ptr==ARRAYGET(ao, struct ___Object___*, j))
/*
 * toiNext - store the iterator's current element in objectarray[it->slot].
 *
 * it:          iterator to read from; it->tagobjindex is post-incremented
 *              in the array cases.
 * objectarray: slot array receiving the bound tag or object.
 *
 * The branches mirror those visible in toiHasNext: a tag taken from the
 * object in slot it->tagobjectslot, an object reached through the first
 * bound tag's flagptr, or the current key of the plain object iterator.
 * NOTE(review): the first branch condition and the statements that follow
 * the last visible line are not shown in this listing.
 */
1812 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
1813 /* hasNext has all of the intelligence */
1816 /* Get object with tags */
1817 struct ___Object___ *obj=objectarray[it->tagobjectslot];
1818 struct ___Object___ *tagptr=obj->___tags___;
1819 if (tagptr->type==TAGTYPE) {
1821 objectarray[it->slot]=tagptr;
1823 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1824 objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
1826 } else if (it->numtags>0) {
1827 /* Use tags to locate appropriate objects */
1828 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1829 struct ___Object___ *objptr=tag->flagptr;
1830 if (objptr->type!=OBJECTARRAYTYPE) {
1832 objectarray[it->slot]=objptr;
1834 struct ArrayObject *ao=(struct ArrayObject *) objptr;
1835 objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
1838 /* Iterate object */
1839 objectarray[it->slot]=(void *)Objkey(&it->it);