3 #include "structdefs.h"
5 #include "checkpoint.h"
7 #include "SimpleHash.h"
8 #include "GenericHashtable.h"
9 #include <sys/select.h>
10 #include <sys/types.h>
17 #elif defined THREADSIMULATE
19 #include <sys/mman.h> // for mmap
20 #include <sys/types.h>
24 int offset_transObj = 0;
27 // use POSIX message queue
28 // for each core, its message queue named as
34 extern int injectfailures;
35 extern float failurechance;
41 #include "instrument.h"
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
47 struct RuntimeHash * forward;
48 struct RuntimeHash * reverse;
50 int corestatus[NUMCORES]; // records status of each core
53 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
54 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
63 struct thread_data thread_data_array[NUMCORES];
65 static pthread_key_t key;
66 bool transStallMsg(int targetcore);
67 void transTerminateMsg(int targetcore);
71 int main(int argc, char **argv) {
75 pthread_t threads[NUMCORES];
78 // initialize three arrays and msg queue array
79 char * pathhead = "/msgqueue_";
80 int targetlen = strlen(pathhead);
81 for(i = 0; i < NUMCORES; ++i) {
84 numreceiveobjs[i] = 0;
89 corenumstr[0] = i + '0';
93 corenumstr[1] = i %10 + '0';
94 corenumstr[0] = (i / 10) + '0';
98 printf("Error: i >= 100\n");
102 char path[targetlen + sourcelen + 1];
103 strcpy(path, pathhead);
104 strncat(path, corenumstr, sourcelen);
105 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
106 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
108 mqd[i]= mq_open(path, oflags, omodes, NULL);
112 pthread_key_create(&key, NULL);
115 printf("Usage: <bin> <corenum>\n");
121 char * number = argv[1];
122 int len = strlen(number);
123 for(i = 0; i < len; ++i) {
124 cnum = (number[i] - '0') + cnum * 10;
127 for(i = 0; i < NUMCORES; ++i) {
128 /* if(STARTUPCORE == i) {
131 thread_data_array[i].corenum = i;
132 thread_data_array[i].argc = argc;
133 thread_data_array[i].argv = argv;
134 thread_data_array[i].numsendobjs = 0;
135 thread_data_array[i].numreceiveobjs = 0;
136 printf("In main: creating thread %d\n", i);
137 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
139 printf("ERROR; return code from pthread_create() is %d\n", rc[i]);
145 /*// do stuff of startup core
146 thread_data_array[STARTUPCORE].corenum = STARTUPCORE;
147 thread_data_array[STARTUPCORE].argc = argc;// - 1;
148 thread_data_array[STARTUPCORE].argv = argv;//&argv[1];
149 thread_data_array[STARTUPCORE].numsendobjs = 0;
150 thread_data_array[STARTUPCORE].numreceiveobjs = 0;
151 run(&thread_data_array[STARTUPCORE]);*/
155 void run(void* arg) {
156 struct thread_data * my_tdata = (struct thread_data *)arg;
157 //corenum = my_tdata->corenum;
158 //void * ptr = malloc(sizeof(int));
159 //*((int*)ptr) = my_tdata->corenum;
160 pthread_setspecific(key, (void *)my_tdata->corenum);
161 int argc = my_tdata->argc;
162 char** argv = my_tdata->argv;
163 printf("Thread %d runs: %x\n", my_tdata->corenum, (int)pthread_self());
169 GC_init(); // Initialize the garbage collector
175 initializeexithandler();
176 /* Create table for failed tasks */
177 failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
178 (int (*)(void *,void *)) &comparetpd);
179 /* Create queue of active tasks */
180 activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
181 (int (*)(void *,void *)) &comparetpd);
183 /* Process task information */
186 /* Create startup object */
187 createstartupobject(argc, argv);
189 /* Start executing the tasks */
192 #ifdef THREADSIMULATE
195 // check if there are new objects coming
196 bool sendStall = false;
198 int numofcore = pthread_getspecific(key);
200 switch(receiveObject()) {
202 printf("[run] receive an object\n");
204 // received an object
205 // check if there are new active tasks can be executed
210 //printf("[run] no msg\n");
212 if(STARTUPCORE == numofcore) {
213 corestatus[numofcore] = 0;
214 // check the status of all cores
215 bool allStall = true;
216 for(i = 0; i < NUMCORES; ++i) {
217 if(corestatus[i] != 0) {
223 // check if the sum of send objs and receive obj are the same
225 // no->go on executing
227 for(i = 0; i < NUMCORES; ++i) {
228 sumsendobj += numsendobjs[i];
230 for(i = 0; i < NUMCORES; ++i) {
231 sumsendobj -= numreceiveobjs[i];
233 if(0 == sumsendobj) {
236 /* for(i = 0; i < NUMCORES; ++i) {
238 transTerminateMsg(i);
241 mq_close(mqd[corenum]);*/
242 printf("[run] terminate!\n");
249 // send StallMsg to startup core
250 sendStall = transStallMsg(STARTUPCORE);
256 printf("[run] receive a stall msg\n");
257 // receive a Stall Msg, do nothing
258 assert(STARTUPCORE == numofcore); // only startup core can receive such msg
263 printf("[run] receive a terminate msg\n");
264 // receive a terminate Msg
265 assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
266 mq_close(mqd[corenum]);
272 printf("Error: invalid message type.\n");
282 void createstartupobject(int argc, char ** argv) {
285 /* Allocate startup object */
287 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
288 struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1);
290 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
291 struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1);
293 /* Build array of strings */
294 startupobject->___parameters___=stringarray;
295 for(i=1;i<argc;i++) {
296 int length=strlen(argv[i]);
298 struct ___String___ *newstring=NewString(NULL, argv[i],length);
300 struct ___String___ *newstring=NewString(argv[i],length);
302 ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
305 /* Set initialized flag for startup object */
306 flagorandinit(startupobject,1,0xFFFFFFFF);
307 enqueueObject(startupobject, NULL, 0);
308 //enqueueObject(startupobject, objq4startupobj[corenum], numqueues4startupobj[corenum]);
311 int hashCodetpd(struct taskparamdescriptor *ftd) {
312 int hash=(int)ftd->task;
314 for(i=0;i<ftd->numParameters;i++){
315 hash^=(int)ftd->parameterArray[i];
320 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
322 if (ftd1->task!=ftd2->task)
324 for(i=0;i<ftd1->numParameters;i++)
325 if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
330 /* This function sets a tag. */
332 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
334 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
336 struct ___Object___ * tagptr=obj->___tags___;
338 obj->___tags___=(struct ___Object___ *)tagd;
340 /* Have to check if it is already set */
341 if (tagptr->type==TAGTYPE) {
342 struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
346 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
347 struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
348 obj=(struct ___Object___ *)ptrarray[2];
349 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
350 td=(struct ___TagDescriptor___ *) obj->___tags___;
352 struct ArrayObject * ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
354 ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
355 ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
356 obj->___tags___=(struct ___Object___ *) ao;
357 ao->___cachedCode___=2;
361 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
362 for(i=0;i<ao->___cachedCode___;i++) {
363 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
367 if (ao->___cachedCode___<ao->___length___) {
368 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
369 ao->___cachedCode___++;
372 int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
373 struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
374 obj=(struct ___Object___ *)ptrarray[2];
375 tagd=(struct ___TagDescriptor___ *) ptrarray[3];
376 ao=(struct ArrayObject *)obj->___tags___;
378 struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
380 aonew->___cachedCode___=ao->___length___+1;
381 for(i=0;i<ao->___length___;i++) {
382 ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
384 ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
390 struct ___Object___ * tagset=tagd->flagptr;
393 } else if (tagset->type!=OBJECTARRAYTYPE) {
395 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
396 struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
397 obj=(struct ___Object___ *)ptrarray[2];
398 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
400 struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
402 ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
403 ARRAYSET(ao, struct ___Object___ *, 1, obj);
404 ao->___cachedCode___=2;
405 tagd->flagptr=(struct ___Object___ *)ao;
407 struct ArrayObject *ao=(struct ArrayObject *) tagset;
408 if (ao->___cachedCode___<ao->___length___) {
409 ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
413 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
414 struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
415 obj=(struct ___Object___ *)ptrarray[2];
416 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
417 ao=(struct ArrayObject *)tagd->flagptr;
419 struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
421 aonew->___cachedCode___=ao->___cachedCode___+1;
422 for(i=0;i<ao->___length___;i++) {
423 ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
425 ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
426 tagd->flagptr=(struct ___Object___ *) aonew;
432 /* This function clears a tag. */
434 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
436 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
438 /* We'll assume that tag is alway there.
439 Need to statically check for this of course. */
440 struct ___Object___ * tagptr=obj->___tags___;
442 if (tagptr->type==TAGTYPE) {
443 if ((struct ___TagDescriptor___ *)tagptr==tagd)
444 obj->___tags___=NULL;
446 printf("ERROR 1 in tagclear\n");
448 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
450 for(i=0;i<ao->___cachedCode___;i++) {
451 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
453 ao->___cachedCode___--;
454 if (i<ao->___cachedCode___)
455 ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
456 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
457 if (ao->___cachedCode___==0)
458 obj->___tags___=NULL;
462 printf("ERROR 2 in tagclear\n");
466 struct ___Object___ *tagset=tagd->flagptr;
467 if (tagset->type!=OBJECTARRAYTYPE) {
471 printf("ERROR 3 in tagclear\n");
473 struct ArrayObject *ao=(struct ArrayObject *) tagset;
475 for(i=0;i<ao->___cachedCode___;i++) {
476 struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
478 ao->___cachedCode___--;
479 if (i<ao->___cachedCode___)
480 ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
481 ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
482 if (ao->___cachedCode___==0)
487 printf("ERROR 4 in tagclear\n");
494 /* This function allocates a new tag. */
496 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
497 struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
499 struct ___TagDescriptor___ * allocate_tag(int index) {
500 struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
509 /* This function updates the flag for object ptr. It or's the flag
510 with the or mask and and's it with the andmask. */
512 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
514 int flagcomp(const int *val1, const int *val2) {
515 return (*val1)-(*val2);
518 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
520 int oldflag=((int *)ptr)[1];
521 int flag=ormask|oldflag;
523 flagbody(ptr, flag, queues, length, false);
527 bool intflagorand(void * ptr, int ormask, int andmask) {
529 int oldflag=((int *)ptr)[1];
530 int flag=ormask|oldflag;
532 if (flag==oldflag) /* Don't do anything */
535 flagbody(ptr, flag, NULL, 0, false);
541 void flagorandinit(void * ptr, int ormask, int andmask) {
542 int oldflag=((int *)ptr)[1];
543 int flag=ormask|oldflag;
545 flagbody(ptr,flag,NULL,0,true);
548 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
549 struct parameterwrapper * flagptr = NULL;
551 struct parameterwrapper ** queues = vqueues;
552 int length = vlength;
553 if((!isnew) && (queues == NULL)) {
554 #ifdef THREADSIMULATE
555 int numofcore = pthread_getspecific(key);
556 queues = objectqueues[numofcore][ptr->type];
557 length = numqueues[numofcore][ptr->type];
559 queues = objectqueues[corenum][ptr->type];
560 length = numqueues[corenum][ptr->type];
565 /*Remove object from all queues */
566 for(i = 0; i < length; ++i) {
571 ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
572 ObjectHashremove(flagptr->objectset, (int)ptr);
573 if (enterflags!=NULL)
578 void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
579 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
582 struct QueueItem *tmpptr;
583 struct parameterwrapper * parameter=NULL;
585 struct parameterwrapper ** queues = vqueues;
586 int length = vlength;
588 #ifdef THREADSIMULATE
589 int numofcore = pthread_getspecific(key);
590 queues = objectqueues[numofcore][ptr->type];
591 length = numqueues[numofcore][ptr->type];
593 queues = objectqueues[corenum][ptr->type];
594 length = numqueues[corenum][ptr->type];
598 struct parameterwrapper * prevptr=NULL;
599 struct ___Object___ *tagptr=ptr->___tags___;
601 /* Outer loop iterates through all parameter queues an object of
602 this type could be in. */
604 for(j = 0; j < length; ++j) {
605 parameter = queues[j];
607 if (parameter->numbertags>0) {
609 goto nextloop;//that means the object has no tag but that param needs tag
610 else if(tagptr->type==TAGTYPE) {//one tag
611 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
612 for(i=0;i<parameter->numbertags;i++) {
613 //slotid is parameter->tagarray[2*i];
614 int tagid=parameter->tagarray[2*i+1];
615 if (tagid!=tagptr->flag)
616 goto nextloop; /*We don't have this tag */
618 } else {//multiple tags
619 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
620 for(i=0;i<parameter->numbertags;i++) {
621 //slotid is parameter->tagarray[2*i];
622 int tagid=parameter->tagarray[2*i+1];
624 for(j=0;j<ao->___cachedCode___;j++) {
625 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
636 for(i=0;i<parameter->numberofterms;i++) {
637 int andmask=parameter->intarray[i*2];
638 int checkmask=parameter->intarray[i*2+1];
639 if ((ptr->flag&andmask)==checkmask) {
640 enqueuetasks(parameter, prevptr, ptr, NULL, 0);
651 // transfer an object to targetcore
653 void transferObject(void * obj, int targetcore) {
654 int type=((int *)obj)[0];
655 assert(type < NUMCLASSES); // can only transfer normal object
656 int size=classsize[type];
660 #elif defined THREADSIMULATE
662 // use shared memory to transfer objects between cores
663 int fd = 0; // mapped file
665 char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
668 fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
669 offset = lseek(fd, 0, SEEK_CUR);
671 printf("fail to open file " + filepath + " in transferObject.\n");
675 lseek(fd, size + sizeof(int)*2, SEEK_CUR);
677 p_map = (void *)mmap(NULL,size+sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
679 memcpy(p_map, type, sizeof(int));
680 memcpy(p_map+sizeof(int), corenum, sizeof(int));
681 memcpy((p_map+sizeof(int)*2), obj, size);
682 munmap(p_map, size+sizeof(int)*2);
683 //printf( "umap ok \n" );
686 // use POSIX message queue to transfer objects between cores
690 if(targetcore < 10) {
691 corenumstr[0] = targetcore + '0';
692 corenumstr[1] = '\0';
694 } else if(targetcore < 100) {
695 corenumstr[1] = targetcore % 10 + '0';
696 corenumstr[0] = (targetcore / 10) + '0';
697 corenumstr[2] = '\0';
700 printf("Error: targetcore >= 100\n");
704 char * pathhead = "/msgqueue_";
705 int targetlen = strlen(pathhead);
706 char path[targetlen + sourcelen + 1];
707 strcpy(path, pathhead);
708 strncat(path, corenumstr, sourcelen);
709 int oflags = O_WRONLY|O_CREAT|O_NONBLOCK;
710 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
711 mqdnum = mq_open(path, oflags, omodes, NULL);
713 printf("[transferObject] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
719 ret=mq_send(mqdnum, obj, size, 0); // send the object into the queue
721 printf("[transferObject] mq_send returned: %d, error: %s\n", ret, strerror(errno));
724 int numofcore = pthread_getspecific(key);
725 if(numofcore == STARTUPCORE) {
726 ++numsendobjs[numofcore];
728 ++(thread_data_array[numofcore].numsendobjs);
730 printf("[transferObject] mq_send returned: $%x\n",ret);
734 // send terminate message to targetcore
736 bool transStallMsg(int targetcore) {
737 struct ___Object___ newobj;
738 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
742 newobj.flag = corenum;
743 newobj.___cachedHash___ = thread_data_array[corenum].numsendobjs;
744 newobj.___cachedCode___ = thread_data_array[corenum].numreceiveobjs;
746 #elif defined THREADSIMULATE
747 int numofcore = pthread_getspecific(key);
748 newobj.flag = numofcore;
749 newobj.___cachedHash___ = thread_data_array[numofcore].numsendobjs;
750 newobj.___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
752 // use shared memory to transfer objects between cores
753 int fd = 0; // mapped file
755 char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
758 fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
759 offset = lseek(fd, 0, SEEK_CUR);
761 printf("fail to open file " + filepath + " in transferObject.\n");
765 lseek(fd, sizeof(int)*2, SEEK_CUR);
767 p_map = (void *)mmap(NULL,sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
769 memcpy(p_map, type, sizeof(int));
770 memcpy(p_map+sizeof(int), corenum, sizeof(int));
771 munmap(p_map, sizeof(int)*2);
772 //printf( "umap ok \n" );
775 // use POSIX message queue to send stall msg to startup core
776 assert(targetcore == STARTUPCORE);
780 if(targetcore < 10) {
781 corenumstr[0] = targetcore + '0';
782 corenumstr[1] = '\0';
784 } else if(targetcore < 100) {
785 corenumstr[1] = targetcore % 10 + '0';
786 corenumstr[0] = (targetcore / 10) + '0';
787 corenumstr[2] = '\0';
790 printf("Error: targetcore >= 100\n");
794 char * pathhead = "/msgqueue_";
795 int targetlen = strlen(pathhead);
796 char path[targetlen + sourcelen + 1];
797 strcpy(path, pathhead);
798 strncat(path, corenumstr, sourcelen);
799 int oflags = O_WRONLY|O_CREAT|O_NONBLOCK;
800 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
801 mqdnum = mq_open(path, oflags, omodes, NULL);
803 printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
808 ret=mq_send(mqdnum, (void *)&newobj, sizeof(struct ___Object___), 0); // send the object into the queue
810 printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno));
813 printf("[transStallMsg] mq_send returned: $%x\n", ret);
814 printf("index: %d, sendobjs: %d, receiveobjs: %d\n", newobj.flag, newobj.___cachedHash___, newobj.___cachedCode___);
820 // send terminate message to targetcore
822 void transTerminateMsg(int targetcore) {
823 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
828 #elif defined THREADSIMULATE
830 // use POSIX message queue to send stall msg to startup core
831 assert(targetcore != STARTUPCORE);
835 if(targetcore < 10) {
836 corenumstr[0] = targetcore + '0';
837 corenumstr[1] = '\0';
839 } else if(corenum < 100) {
840 corenumstr[1] = targetcore % 10 + '0';
841 corenumstr[0] = (targetcore / 10) + '0';
842 corenumstr[2] = '\0';
845 printf("Error: targetcore >= 100\n");
849 char * pathhead = "/msgqueue_";
850 int targetlen = strlen(pathhead);
851 char path[targetlen + sourcelen + 1];
852 strcpy(path, pathhead);
853 strncat(path, corenumstr, sourcelen);
854 int oflags = O_WRONLY|O_CREAT|O_NONBLOCK;
855 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
856 mqdnum = mq_open(path, oflags, omodes, NULL);
858 printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
864 ret=mq_send(mqdnum, (void *)&type, sizeof(int), 0); // send the object into the queue
866 printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno));
872 // receive object transferred from other cores
873 // or the terminate message from other cores
874 // format: type [+ object]
875 // type: -1--stall msg
877 // return value: 0--received an object
878 // 1--received nothing
879 // 2--received a Stall Msg
880 int receiveObject() {
883 #elif defined THREADSIMULATE
885 char * filepath = "/scratch/transObj/file_" + corenum + ".txt";
889 int sourcecorenum = 0;
891 fd = open(filepath, O_CREAT|O_RDONLY, 00777);
892 lseek(fd, offset_transObj, SEEK_SET);
893 p_map = (void*)mmap(NULL,sizeof(int)*2,PROT_READ,MAP_SHARED,fd,offset_transObj);
895 sourcecorenum = *(int*)(p_map+sinzeof(int));
896 offset_transObj += sizeof(int)*2;
897 munmap(p_map,sizeof(int)*2);
899 // sourecorenum has terminated
903 size = classsize[type];
904 p_map = (void*)mmap(NULL,size,PROT_READ,MAP_SHARED,fd,offset_transObj);
905 struct ___Object___ * newobj=RUNMALLOC(size);
906 memcpy(newobj, p_map, size);
908 enqueueObject(newobj,NULL,0);
910 int numofcore = pthread_getspecific(key);
911 // use POSIX message queue to transfer object
913 struct mq_attr mqattr;
914 mq_getattr(mqd[numofcore], &mqattr);
915 void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
916 msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
922 //printf("msg: %s\n",msgptr);
923 if(((int*)msgptr)[0] == -1) {
925 int* tmpptr = (int*)msgptr;
926 int index = tmpptr[1];
927 corestatus[index] = 0;
928 numsendobjs[index] = tmpptr[2];
929 numreceiveobjs[index] = ((int *)(msgptr + sizeof(int) * 3 + sizeof(void *)))[0];
930 printf("index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
933 } /*else if(((int*)msgptr)[0] == -2) {
938 if(numofcore == STARTUPCORE) {
939 ++(numreceiveobjs[numofcore]);
941 ++(thread_data_array[numofcore].numreceiveobjs);
943 struct ___Object___ * newobj=RUNMALLOC(msglen);
944 memcpy(newobj, msgptr, msglen);
946 enqueueObject(newobj, NULL, 0);
952 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
953 void * taskpointerarray[MAXTASKPARAMS];
955 int numparams=parameter->task->numParameters;
956 int numiterators=parameter->task->numTotal-1;
961 struct taskdescriptor * task=parameter->task;
963 ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
965 /* Add enqueued object to parameter vector */
966 taskpointerarray[parameter->slot]=ptr;
968 /* Reset iterators */
969 for(j=0;j<numiterators;j++) {
970 toiReset(¶meter->iterators[j]);
973 /* Find initial state */
974 for(j=0;j<numiterators;j++) {
976 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
977 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
979 /* Need to backtrack */
980 toiReset(¶meter->iterators[j]);
984 /* Nothing to enqueue */
991 /* Enqueue current state */
993 struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
995 tpd->numParameters=numiterators+1;
996 tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
997 for(j=0;j<=numiterators;j++){
998 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
1001 if ((!gencontains(failedtasks, tpd)&&!gencontains(activetasks,tpd))) {
1002 genputtable(activetasks, tpd, tpd);
1004 RUNFREE(tpd->parameterArray);
1008 /* This loop iterates to the next parameter combination */
1009 if (numiterators==0)
1012 for(j=numiterators-1; j<numiterators;j++) {
1014 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
1015 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
1017 /* Need to backtrack */
1018 toiReset(¶meter->iterators[j]);
1022 /* Nothing more to enqueue */
1030 /* Handler for signals. The signals catch null pointer errors and
1031 arithmatic errors. */
1033 void myhandler(int sig, siginfo_t *info, void *uap) {
1036 printf("sig=%d\n",sig);
1039 sigemptyset(&toclear);
1040 sigaddset(&toclear, sig);
1041 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
1042 longjmp(error_handler,1);
1047 struct RuntimeHash *fdtoobject;
1049 void addreadfd(int fd) {
1052 FD_SET(fd, &readfds);
1055 void removereadfd(int fd) {
1056 FD_CLR(fd, &readfds);
1057 if (maxreadfd==(fd+1)) {
1059 while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
1070 void executetasks() {
1071 void * taskpointerarray[MAXTASKPARAMS+OFFSET];
1073 /* Set up signal handlers */
1074 struct sigaction sig;
1075 sig.sa_sigaction=&myhandler;
1076 sig.sa_flags=SA_SIGINFO;
1077 sigemptyset(&sig.sa_mask);
1079 /* Catch bus errors, segmentation faults, and floating point exceptions*/
1080 sigaction(SIGBUS,&sig,0);
1081 sigaction(SIGSEGV,&sig,0);
1082 sigaction(SIGFPE,&sig,0);
1083 sigaction(SIGPIPE,&sig,0);
1088 fdtoobject=allocateRuntimeHash(100);
1090 /* Map first block of memory to protected, anonymous page */
1091 mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
1094 while((hashsize(activetasks)>0)||(maxreadfd>0)) {
1096 /* Check if any filedescriptors have IO pending */
1099 struct timeval timeout={0,0};
1103 numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
1105 /* Process ready fd's */
1107 for(fd=0;fd<maxreadfd;fd++) {
1108 if (FD_ISSET(fd, &tmpreadfds)) {
1109 /* Set ready flag on object */
1111 // printf("Setting fd %d\n",fd);
1112 if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
1113 if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
1114 enqueueObject(objptr, NULL, 0);
1122 /* See if there are any active tasks */
1123 if (hashsize(activetasks)>0) {
1125 currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
1126 genfreekey(activetasks, currtpd);
1128 /* Check if this task has failed, allow a task that contains optional objects to fire */
1129 if (gencontains(failedtasks, currtpd)) {
1130 // Free up task parameter descriptor
1131 RUNFREE(currtpd->parameterArray);
1135 int numparams=currtpd->task->numParameters;
1136 int numtotal=currtpd->task->numTotal;
1138 /* Make sure that the parameters are still in the queues */
1139 for(i=0;i<numparams;i++) {
1140 void * parameter=currtpd->parameterArray[i];
1141 struct parameterdescriptor * pd=currtpd->task->descriptorarray[i];
1142 struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
1144 /* Check that object is still in queue */
1146 if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
1147 RUNFREE(currtpd->parameterArray);
1154 /* Check that object still has necessary tags */
1155 for(j=0;j<pd->numbertags;j++) {
1156 int slotid=pd->tagarray[2*j]+numparams;
1157 struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
1158 if (!containstag(parameter, tagd)) {
1159 RUNFREE(currtpd->parameterArray);
1165 taskpointerarray[i+OFFSET]=parameter;
1168 for(;i<numtotal;i++) {
1169 taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
1173 /* Checkpoint the state */
1174 forward=allocateRuntimeHash(100);
1175 reverse=allocateRuntimeHash(100);
1176 //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
1178 if (x=setjmp(error_handler)) {
1183 printf("Fatal Error=%d, Recovering!\n",x);
1186 genputtable(failedtasks,currtpd,currtpd);
1187 //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
1189 freeRuntimeHash(forward);
1190 freeRuntimeHash(reverse);
1198 /*if (injectfailures) {
1199 if ((((double)random())/RAND_MAX)<failurechance) {
1200 printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
1201 longjmp(error_handler,10);
1204 /* Actually call task */
1206 ((int *)taskpointerarray)[0]=currtpd->numParameters;
1207 taskpointerarray[1]=NULL;
1210 printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1211 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
1212 printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1214 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
1216 freeRuntimeHash(forward);
1217 freeRuntimeHash(reverse);
1219 // Free up task parameter descriptor
1220 RUNFREE(currtpd->parameterArray);
1230 /* This function processes an objects tags */
1231 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
1234 for(i=0;i<pd->numbertags;i++) {
1235 int slotid=pd->tagarray[2*i];
1236 int tagid=pd->tagarray[2*i+1];
1238 if (statusarray[slotid+numparams]==0) {
1239 parameter->iterators[*iteratorcount].istag=1;
1240 parameter->iterators[*iteratorcount].tagid=tagid;
1241 parameter->iterators[*iteratorcount].slot=slotid+numparams;
1242 parameter->iterators[*iteratorcount].tagobjectslot=index;
1243 statusarray[slotid+numparams]=1;
1250 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
1253 struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
1255 parameter->iterators[*iteratorcount].istag=0;
1256 parameter->iterators[*iteratorcount].slot=index;
1257 parameter->iterators[*iteratorcount].objectset=objectset;
1258 statusarray[index]=1;
1260 for(i=0;i<pd->numbertags;i++) {
1261 int slotid=pd->tagarray[2*i];
1262 int tagid=pd->tagarray[2*i+1];
1263 if (statusarray[slotid+numparams]!=0) {
1264 /* This tag has already been enqueued, use it to narrow search */
1265 parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
1269 parameter->iterators[*iteratorcount].numtags=tagcount;
1274 /* This function builds the iterators for a task & parameter */
1276 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
1277 int statusarray[MAXTASKPARAMS];
1279 int numparams=task->numParameters;
1280 int iteratorcount=0;
1281 for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
1283 statusarray[index]=1; /* Initial parameter */
1284 /* Process tags for initial iterator */
1286 processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
1290 /* Check for objects with existing tags */
1291 for(i=0;i<numparams;i++) {
1292 if (statusarray[i]==0) {
1293 struct parameterdescriptor *pd=task->descriptorarray[i];
1295 for(j=0;j<pd->numbertags;j++) {
1296 int slotid=pd->tagarray[2*j];
1297 if(statusarray[slotid+numparams]!=0) {
1298 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1299 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1306 /* Next do objects w/ unbound tags*/
1308 for(i=0;i<numparams;i++) {
1309 if (statusarray[i]==0) {
1310 struct parameterdescriptor *pd=task->descriptorarray[i];
1311 if (pd->numbertags>0) {
1312 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1313 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1319 /* Nothing with a tag enqueued */
1321 for(i=0;i<numparams;i++) {
1322 if (statusarray[i]==0) {
1323 struct parameterdescriptor *pd=task->descriptorarray[i];
1324 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1325 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1338 #ifdef THREADSIMULATE
1339 int numofcore = pthread_getspecific(key);
1340 for(i=0;i<numtasks[numofcore];i++) {
1341 struct taskdescriptor * task=taskarray[numofcore][i];
1343 for(i=0;i<numtasks[corenum];i++) {
1344 struct taskdescriptor * task=taskarray[corenum][i];
1346 printf("%s\n", task->name);
1347 for(j=0;j<task->numParameters;j++) {
1348 struct parameterdescriptor *param=task->descriptorarray[j];
1349 struct parameterwrapper *parameter=param->queue;
1350 struct ObjectHash * set=parameter->objectset;
1351 struct ObjectIterator objit;
1352 printf(" Parameter %d\n", j);
1353 ObjectHashiterator(set, &objit);
1354 while(ObjhasNext(&objit)) {
1355 struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
1356 struct ___Object___ * tagptr=obj->___tags___;
1357 int nonfailed=Objdata4(&objit);
1358 int numflags=Objdata3(&objit);
1359 int flags=Objdata2(&objit);
1361 printf(" Contains %lx\n", obj);
1362 printf(" flag=%d\n", obj->flag);
1364 } else if (tagptr->type==TAGTYPE) {
1365 printf(" tag=%lx\n",tagptr);
1368 struct ArrayObject *ao=(struct ArrayObject *)tagptr;
1369 for(;tagindex<ao->___cachedCode___;tagindex++) {
1370 printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
1379 /* This function processes the task information to create queues for
1380 each parameter type. */
1382 void processtasks() {
1384 #ifdef THREADSIMULATE
1385 int numofcore = pthread_getspecific(key);
1386 for(i=0;i<numtasks[numofcore];i++) {
1387 struct taskdescriptor *task=taskarray[numofcore][i];
1389 for(i=0;i<numtasks[corenum];i++) {
1390 struct taskdescriptor * task=taskarray[corenum][i];
1394 /* Build iterators for parameters */
1395 for(j=0;j<task->numParameters;j++) {
1396 struct parameterdescriptor *param=task->descriptorarray[j];
1397 struct parameterwrapper *parameter=param->queue;
1398 parameter->objectset=allocateObjectHash(10);
1399 parameter->task=task;
1400 builditerators(task, j, parameter);
1405 void toiReset(struct tagobjectiterator * it) {
1408 } else if (it->numtags>0) {
1411 ObjectHashiterator(it->objectset, &it->it);
1415 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
1418 /* Get object with tags */
1419 struct ___Object___ *obj=objectarray[it->tagobjectslot];
1420 struct ___Object___ *tagptr=obj->___tags___;
1421 if (tagptr->type==TAGTYPE) {
1422 if ((it->tagobjindex==0)&& /* First object */
1423 (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
1428 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1429 int tagindex=it->tagobjindex;
1430 for(;tagindex<ao->___cachedCode___;tagindex++) {
1431 struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
1432 if (td->flag==it->tagid) {
1433 it->tagobjindex=tagindex; /* Found right type of tag */
1439 } else if (it->numtags>0) {
1440 /* Use tags to locate appropriate objects */
1441 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1442 struct ___Object___ *objptr=tag->flagptr;
1444 if (objptr->type!=OBJECTARRAYTYPE) {
1445 if (it->tagobjindex>0)
1447 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
1449 for(i=1;i<it->numtags;i++) {
1450 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1451 if (!containstag(objptr,tag2))
1456 struct ArrayObject *ao=(struct ArrayObject *) objptr;
1459 for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
1460 struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
1461 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
1463 for(i=1;i<it->numtags;i++) {
1464 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1465 if (!containstag(objptr,tag2))
1468 it->tagobjindex=tagindex;
1473 it->tagobjindex=tagindex;
1477 return ObjhasNext(&it->it);
1481 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
1483 struct ___Object___ * objptr=tag->flagptr;
1484 if (objptr->type==OBJECTARRAYTYPE) {
1485 struct ArrayObject *ao=(struct ArrayObject *)objptr;
1486 for(j=0;j<ao->___cachedCode___;j++) {
1487 if (ptr==ARRAYGET(ao, struct ___Object___*, j))
1495 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
1496 /* hasNext has all of the intelligence */
1499 /* Get object with tags */
1500 struct ___Object___ *obj=objectarray[it->tagobjectslot];
1501 struct ___Object___ *tagptr=obj->___tags___;
1502 if (tagptr->type==TAGTYPE) {
1504 objectarray[it->slot]=tagptr;
1506 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1507 objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
1509 } else if (it->numtags>0) {
1510 /* Use tags to locate appropriate objects */
1511 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1512 struct ___Object___ *objptr=tag->flagptr;
1513 if (objptr->type!=OBJECTARRAYTYPE) {
1515 objectarray[it->slot]=objptr;
1517 struct ArrayObject *ao=(struct ArrayObject *) objptr;
1518 objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
1521 /* Iterate object */
1522 objectarray[it->slot]=(void *)Objkey(&it->it);