4 #include "structdefs.h"
6 #include "checkpoint.h"
8 #include "SimpleHash.h"
9 #include "GenericHashtable.h"
10 #include <sys/select.h>
11 #include <sys/types.h>
20 #include <raw_compiler_defs.h>
22 #elif defined THREADSIMULATE
23 // use POSIX message queue
24 // for each core, its message queue named as
30 extern int injectfailures;
31 extern float failurechance;
37 #define TOTALCORE raw_get_num_tiles()
41 #include "instrument.h"
/* Task tables. activetasks is created with genallocatehashtable() using
 * hashCodetpd()/comparetpd() as its hash and equality callbacks. */
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
48 struct RuntimeHash * forward;
49 struct RuntimeHash * reverse;
/* Termination bookkeeping read by the startup core. A corestatus[] entry of 0
 * is presumably "stalled": the idle check looks for non-zero entries. */
52 int corestatus[NUMCORES]; // records status of each core
55 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
56 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
/* Statically allocated lock table; locktbl is a pointer alias to it. */
58 struct RuntimeHash locktable;
59 static struct RuntimeHash* locktbl = &locktable;
60 void * curr_heapbase=0;
61 void * curr_heaptop=0;
63 int self_numreceiveobjs;
/* Received transObjInfo records wait here until they are enqueued into the
 * parameter queues. */
70 struct Queue objqueue;
75 void calCoords(int core_num, int* coordY, int* coordX);
77 #elif defined THREADSIMULATE
/* Thread-simulation build: the lock table is allocated in main() with
 * allocateRuntimeHash() and rwlock_tbl is taken before it is torn down. */
78 static struct RuntimeHash* locktbl;
86 struct thread_data thread_data_array[NUMCORES];
/* Thread-specific slot that run() fills with the thread's core number. */
88 static pthread_key_t key;
89 static pthread_rwlock_t rwlock_tbl;
90 static pthread_rwlock_t rwlock_init;
/* Forward declarations for the messaging and per-object lock helpers.
 * NOTE(review): the _I variants are presumably meant to be called with
 * interrupts already disabled -- confirm against their definitions. */
95 bool transStallMsg(int targetcore);
96 void transTerminateMsg(int targetcore);
98 bool getreadlock(void* ptr);
99 void releasereadlock(void* ptr);
101 bool getreadlock_I(void* ptr);
102 void releasereadlock_I(void* ptr);
104 bool getwritelock(void* ptr);
105 void releasewritelock(void* ptr);
/* Flush cache lines with user interrupts disabled.
 * Each of the 512 loop iterations flushes the line at base and the line at
 * base|off1; interrupts are re-enabled once the loop is done.
 * The raw_test_pass() calls appear to be debug trace markers. */
109 void flushAll(void) {
// keep the flush sequence from being interrupted
113 raw_user_interrupts_off();
115 raw_test_pass(0xec00);
116 for(i = 0; i < 512; ++i) {
119 flushCacheline(base);
120 flushCacheline(base|off1);
123 raw_user_interrupts_on();
125 raw_test_pass(0xec02);
131 raw_test_pass(0xefee);
132 raw_user_interrupts_off();
133 raw_test_pass(0xef00);
135 raw_test_pass(0xefff);
136 raw_user_interrupts_on();
137 raw_test_pass(0xefef);
/* Program entry point.
 *
 * RAW build: derives corenum from the tile's absolute x/y position, lets the
 * startup core zero numsendobjs[]/numreceiveobjs[], initialises the lock
 * table and the object queue, and (for cores below NUMCORES) registers
 * recvMsg through start_gdn_avail_ints() before enabling user interrupts.
 *
 * THREADSIMULATE build: opens one POSIX message queue per core, named
 * "/msgqueue_<core number>", creates the lock table together with its rwlock,
 * and starts one thread per core running run() on thread_data_array[i].
 *
 * @param argc  argument count, forwarded to every core thread
 * @param argv  argument vector, forwarded to every core thread
 */
142 int main(int argc, char **argv) {
148 bool sendStall = false;
150 bool tocontinue = false;
151 struct QueueItem * objitem = NULL;
152 struct transObjInfo * objInfo = NULL;
154 bool allStall = true;
157 raw_test_pass_reg(&locktable);
158 raw_test_pass_reg(&msglength);
159 raw_test_pass_reg(&i);
160 raw_test_pass(0xee01);
// core id = x + 4*y; the row width of 4 matches calCoords()
161 corenum = raw_get_abs_pos_x() + 4 * raw_get_abs_pos_y();
163 // initialize the arrays
164 if(STARTUPCORE == corenum) {
165 // startup core to initialize corestatus[]
166 for(i = 0; i < NUMCORES; ++i) {
168 numsendobjs[i] = 0; // assume all variables in RAW are local variables! MAY BE WRONG!!!
169 numreceiveobjs[i] = 0;
172 self_numsendobjs = 0;
173 self_numreceiveobjs = 0;
174 for(i = 0; i < 30; ++i) {
181 raw_test_pass(0xee02);
183 // create the lock table, lockresult table and obj queue
// the lock table starts with 20 buckets and no elements
185 locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20);
186 /* Set allocation blocks*/
187 locktable.listhead=NULL;
188 locktable.listtail=NULL;
190 locktable.numelements = 0;
197 objqueue.head = NULL;
198 objqueue.tail = NULL;
199 raw_test_pass(0xee03);
202 if (corenum < NUMCORES) {
// register recvMsg with the GDN interrupt setup, then allow user interrupts
205 start_gdn_avail_ints(recvMsg);
206 raw_user_interrupts_on();
207 raw_test_pass(0xee04);
211 #elif defined THREADSIMULATE
215 pthread_t threads[NUMCORES];
218 // initialize three arrays and msg queue array
219 char * pathhead = "/msgqueue_";
220 int targetlen = strlen(pathhead);
221 for(i = 0; i < NUMCORES; ++i) {
224 numreceiveobjs[i] = 0;
// render the core number as a one- or two-digit decimal suffix
229 corenumstr[0] = i + '0';
230 corenumstr[1] = '\0';
233 corenumstr[1] = i %10 + '0';
234 corenumstr[0] = (i / 10) + '0';
235 corenumstr[2] = '\0';
238 printf("Error: i >= 100\n");
// path = pathhead + suffix, sized for both parts plus the terminator
242 char path[targetlen + sourcelen + 1];
243 strcpy(path, pathhead);
244 strncat(path, corenumstr, sourcelen);
// each queue is opened read-only and non-blocking, and created if missing
245 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
246 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
248 mqd[i]= mq_open(path, oflags, omodes, NULL);
250 printf("[Main] mq_open %s fails: %d, error: %s\n", path, mqd[i], strerror(errno));
253 printf("[Main] mq_open %s returns: %d\n", path, mqd[i]);
// thread-specific slot later filled by run() with the thread's core number
258 pthread_key_create(&key, NULL);
260 // create the lock table and initialize its mutex
261 locktbl = allocateRuntimeHash(20);
262 int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
// NOTE(review): the format string has no %s for the strerror() argument, so
// the error text is never printed.
263 printf("[Main] initialize the rwlock for lock table: %d error: \n", rc_locktbl, strerror(rc_locktbl));
// start one thread per simulated core with zeroed per-thread counters
265 for(i = 0; i < NUMCORES; ++i) {
266 thread_data_array[i].corenum = i;
267 thread_data_array[i].argc = argc;
268 thread_data_array[i].argv = argv;
269 thread_data_array[i].numsendobjs = 0;
270 thread_data_array[i].numreceiveobjs = 0;
271 printf("[main] creating thread %d\n", i);
272 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
274 printf("[main] ERROR; return code from pthread_create() is %d\n", rc[i]);
280 //pthread_exit(NULL);
/* Per-core main loop; this is the thread entry point in the THREADSIMULATE
 * build.
 *
 * Sets up the runtime (garbage collector, exit handler, active-task table,
 * startup object) and then services incoming messages via receiveObject().
 * When a core has nothing left to do it takes part in termination detection:
 * the startup core inspects corestatus[] and compares the totals of
 * numsendobjs[] and numreceiveobjs[], while other cores report to it through
 * transStallMsg(STARTUPCORE).
 *
 * @param arg  pointer to this core's struct thread_data
 *
 * NOTE(review): pthread_create() expects a start routine returning void *;
 * this function is declared to return void.
 */
284 void run(void* arg) {
285 struct thread_data * my_tdata = (struct thread_data *)arg;
// the core number itself (not a pointer to it) is stored as the thread-specific value
286 pthread_setspecific(key, (void *)my_tdata->corenum);
287 int argc = my_tdata->argc;
288 char** argv = my_tdata->argv;
289 printf("[run, %d] Thread %d runs: %x\n", my_tdata->corenum, my_tdata->corenum, (int)pthread_self());
295 GC_init(); // Initialize the garbage collector
303 initializeexithandler();
305 raw_test_pass(0xee05);
307 /* Create table for failed tasks */
309 if(corenum > NUMCORES - 1) {
316 raw_test_pass(0xee06);
318 /*failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
319 (int (*)(void *,void *)) &comparetpd);*/
322 raw_test_pass(0xee07);
324 /* Create queue of active tasks */
325 activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
326 (int (*)(void *,void *)) &comparetpd);
328 raw_test_pass(0xee08);
331 /* Process task information */
334 raw_test_pass(0xee09);
337 /* Create startup object */
338 createstartupobject(argc, argv);
340 raw_test_pass(0xee0a);
344 raw_test_pass(0xee0b);
// receiveObject() returns -1 in the RAW build when nothing was received
348 while(receiveObject() != -1) {
352 // check if there are new active tasks can be executed
356 while(receiveObject() != -1) {
360 raw_test_pass(0xee0c);
362 // check if there are some pending objects, if yes, enqueue them and executetasks again
364 raw_test_pass(0xee0d);
365 while(!isEmpty(&objqueue)) {
// interrupts stay off while the object queue is being manipulated
368 raw_user_interrupts_off();
370 raw_test_pass(0xeee1);
373 objitem = getTail(&objqueue);
374 //obj = objitem->objectptr;
375 objInfo = (struct transObjInfo *)objitem->objectptr;
376 obj = objInfo->objptr;
377 raw_test_pass_reg((int)obj);
378 // grab lock and flush the obj
384 raw_test_pass_reg(grount);
// invalidate obj + k for every k below the class size of the object's type
396 for(k = 0; k < classsize[((struct ___Object___ *)obj)->type]; ++k) {
397 invalidateAddr(obj + k);
399 // enqueue the object
// objInfo->queues holds (task index, parameter index) pairs
400 for(k = 0; k < objInfo->length; ++k) {
401 int taskindex = objInfo->queues[2 * k];
402 int paramindex = objInfo->queues[2 * k + 1];
403 struct parameterwrapper ** queues = &(paramqueues[corenum][taskindex][paramindex]);
404 raw_test_pass_reg(taskindex);
405 raw_test_pass_reg(paramindex);
406 enqueueObject_I(obj, queues, 1);
408 removeItem(&objqueue, objitem);
409 releasereadlock_I(obj);
410 RUNFREE(objInfo->queues);
412 /*enqueueObject_I(obj, NULL, 0);
413 removeItem(&objqueue, objitem);
414 releasereadlock_I(obj);*/
417 // put it at the end of the queue
418 // and try to execute active tasks already enqueued first
419 removeItem(&objqueue, objitem);
420 addNewItem_I(&objqueue, objInfo);
424 raw_user_interrupts_on();
426 raw_test_pass(0xee0e);
431 if(STARTUPCORE == corenum) {
433 raw_test_pass(0xee0f);
437 raw_user_interrupts_off();
// publish this core's own status and counters before checking all cores
439 corestatus[corenum] = 0;
440 numsendobjs[corenum] = self_numsendobjs;
441 numreceiveobjs[corenum] = self_numreceiveobjs;
442 // check the status of all cores
444 raw_test_pass_reg(NUMCORES);
445 for(i = 0; i < NUMCORES; ++i) {
446 raw_test_pass(0xe000 + corestatus[i]);
447 if(corestatus[i] != 0) {
453 // check if the sum of send objs and receive obj are the same
455 // no->go on executing
// sumsendobj ends up as (total sent) - (total received); 0 means no object is in flight
457 for(i = 0; i < NUMCORES; ++i) {
458 sumsendobj += numsendobjs[i];
459 raw_test_pass(0xf000 + numsendobjs[i]);
461 for(i = 0; i < NUMCORES; ++i) {
462 sumsendobj -= numreceiveobjs[i];
463 raw_test_pass(0xf000 + numreceiveobjs[i]);
465 if(0 == sumsendobj) {
467 raw_test_pass(0xee10);
468 raw_test_done(1); // All done.
472 raw_user_interrupts_on();
476 raw_test_pass(0xee11);
478 // wait for some time
480 raw_test_pass(0xee12);
483 raw_test_pass(0xee13);
485 // send StallMsg to startup core
486 raw_test_pass(0xee14);
487 sendStall = transStallMsg(STARTUPCORE);
492 raw_test_pass(0xee15);
498 #elif defined THREADSIMULATE
499 /* Start executing the tasks */
503 // check if there are new objects coming
504 bool sendStall = false;
// NOTE(review): pthread_getspecific() returns void *; the stored value is the
// core number set in run(), converted back to int implicitly here.
506 int numofcore = pthread_getspecific(key);
508 switch(receiveObject()) {
510 printf("[run, %d] receive an object\n", numofcore);
512 // received an object
513 // check if there are new active tasks can be executed
518 //printf("[run, %d] no msg\n", numofcore);
520 if(STARTUPCORE == numofcore) {
521 corestatus[numofcore] = 0;
522 // check the status of all cores
523 bool allStall = true;
524 for(i = 0; i < NUMCORES; ++i) {
525 if(corestatus[i] != 0) {
531 // check if the sum of send objs and receive obj are the same
533 // no->go on executing
535 for(i = 0; i < NUMCORES; ++i) {
536 sumsendobj += numsendobjs[i];
538 for(i = 0; i < NUMCORES; ++i) {
539 sumsendobj -= numreceiveobjs[i];
541 if(0 == sumsendobj) {
// shutdown path: destroy every per-object rwlock, then free the lock table
545 int rc_tbl = pthread_rwlock_wrlock(&rwlock_tbl);
// NOTE(review): no %s in the format string for the strerror() argument.
546 printf("[run, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc_tbl, strerror(rc_tbl));
547 struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl);
548 while(0 != RunhasNext(it_lock)) {
// NOTE(review): this local 'key' shadows the file-scope pthread key.
549 int key = Runkey(it_lock);
550 pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
551 int rc_des = pthread_rwlock_destroy(rwlock_obj);
// NOTE(review): no %s in the format string for the strerror() argument.
552 printf("[run, %d] destroy the rwlock for object: %d error: \n", numofcore, key, strerror(rc_des));
555 freeRuntimeHash(locktbl);
559 // destroy all message queues
560 char * pathhead = "/msgqueue_";
561 int targetlen = strlen(pathhead);
562 for(i = 0; i < NUMCORES; ++i) {
// same "/msgqueue_<core number>" naming as used when the queues were opened
566 corenumstr[0] = i + '0';
567 corenumstr[1] = '\0';
570 corenumstr[1] = i %10 + '0';
571 corenumstr[0] = (i / 10) + '0';
572 corenumstr[2] = '\0';
575 printf("Error: i >= 100\n");
579 char path[targetlen + sourcelen + 1];
580 strcpy(path, pathhead);
581 strncat(path, corenumstr, sourcelen);
585 printf("[run, %d] terminate!\n", numofcore);
592 // send StallMsg to startup core
593 sendStall = transStallMsg(STARTUPCORE);
599 printf("[run, %d] receive a stall msg\n", numofcore);
600 // receive a Stall Msg, do nothing
601 assert(STARTUPCORE == numofcore); // only startup core can receive such msg
606 printf("[run, %d] receive a terminate msg\n", numofcore);
607 // receive a terminate Msg
// NOTE(review): the next two lines use corenum while the rest of this branch
// uses the thread-specific numofcore -- confirm which is intended.
608 assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
609 mq_close(mqd[corenum]);
615 printf("[run, %d] Error: invalid message type.\n", numofcore);
/* Build the startup object and enqueue it.
 *
 * Allocates a ___StartupObject___ plus a string array with argc-1 slots,
 * wraps argv[1] .. argv[argc-1] as runtime strings, stores the array in
 * ___parameters___, sets flag bit 1 through flagorandinit() and finally calls
 * enqueueObject() with no explicit queues.
 *
 * The allocation calls appear twice, with and without a leading NULL
 * argument; these are presumably alternative build configurations chosen by a
 * preprocessor conditional -- TODO confirm.
 *
 * @param argc  argument count as passed to main()
 * @param argv  argument vector; argv[0] is not copied
 */
625 void createstartupobject(int argc, char ** argv) {
628 /* Allocate startup object */
630 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
631 struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1);
633 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
634 struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1);
636 /* Build array of strings */
637 startupobject->___parameters___=stringarray;
638 for(i=1;i<argc;i++) {
639 int length=strlen(argv[i]);
641 struct ___String___ *newstring=NewString(NULL, argv[i],length);
643 struct ___String___ *newstring=NewString(argv[i],length);
// element storage begins sizeof(int) bytes after the ___length___ field
645 ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
648 startupobject->isolate = 1;
649 startupobject->version = 0;
651 /* Set initialized flag for startup object */
652 flagorandinit(startupobject,1,0xFFFFFFFF);
653 enqueueObject(startupobject, NULL, 0);
/* Hash callback for task/parameter descriptors.
 * Starts from the task pointer and XORs in every entry of parameterArray.
 * NOTE(review): the (int) casts keep only the low bits of each pointer on
 * targets where pointers are wider than int. */
659 int hashCodetpd(struct taskparamdescriptor *ftd) {
660 int hash=(int)ftd->task;
662 for(i=0;i<ftd->numParameters;i++){
663 hash^=(int)ftd->parameterArray[i];
/* Equality callback for task/parameter descriptors.
 * Compares the task pointers first, then the parameterArray entries pairwise
 * for the first ftd1->numParameters positions.
 * NOTE(review): ftd2->numParameters is not consulted in the visible code;
 * presumably descriptors of the same task have the same parameter count. */
668 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
670 if (ftd1->task!=ftd2->task)
672 for(i=0;i<ftd1->numParameters;i++)
673 if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
678 /* This function sets a tag. */
/* Links obj and tagd in both directions.
 *
 * obj->___tags___ holds either a single tag descriptor (type == TAGTYPE) or
 * an ArrayObject of descriptors whose used-slot count is kept in
 * ___cachedCode___. tagd->flagptr likewise holds either a single object or an
 * OBJECTARRAYTYPE array of objects. A full array is replaced by a larger copy
 * (TAGARRAYINTERVAL / OBJECTARRAYINTERVAL extra slots).
 *
 * Two signatures are present: one takes a leading ptr argument that is passed
 * on through ptrarray to allocate_newarray(), the other does not. In the ptr
 * variant obj and tagd are re-read from ptrarray after each allocation,
 * presumably because the collector may move them -- TODO confirm.
 */
680 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
682 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
684 struct ArrayObject * ao=NULL;
685 struct ___Object___ * tagptr=obj->___tags___;
687 raw_test_pass(0xebb0);
691 raw_test_pass(0xebb1);
// store the descriptor directly in ___tags___
693 obj->___tags___=(struct ___Object___ *)tagd;
695 /* Have to check if it is already set */
696 if (tagptr->type==TAGTYPE) {
697 struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
699 raw_test_pass(0xebb2);
703 raw_test_pass(0xebb3);
708 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
709 struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
710 obj=(struct ___Object___ *)ptrarray[2];
711 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
712 td=(struct ___TagDescriptor___ *) obj->___tags___;
715 raw_test_pass(0xebb4);
717 ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
720 raw_test_pass(0xebb5);
// promote the single tag to an array holding the old and the new descriptor
722 ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
723 ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
724 obj->___tags___=(struct ___Object___ *) ao;
725 ao->___cachedCode___=2;
727 raw_test_pass(0xebb6);
732 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
734 raw_test_pass(0xebb7);
// scan the descriptors already stored in the array
736 for(i=0;i<ao->___cachedCode___;i++) {
737 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
739 raw_test_pass(0xebb8);
743 raw_test_pass(0xebb9);
// room left: append in place
748 if (ao->___cachedCode___<ao->___length___) {
750 raw_test_pass(0xebba);
752 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
753 ao->___cachedCode___++;
755 raw_test_pass(0xebbb);
// array full: allocate a larger one, copy the old entries and append tagd
759 int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
760 struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
761 obj=(struct ___Object___ *)ptrarray[2];
762 tagd=(struct ___TagDescriptor___ *) ptrarray[3];
763 ao=(struct ArrayObject *)obj->___tags___;
765 struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
768 raw_test_pass(0xebbc);
770 aonew->___cachedCode___=ao->___length___+1;
771 for(i=0;i<ao->___length___;i++) {
773 raw_test_pass(0xebbd);
775 ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
778 raw_test_pass(0xebbe);
780 ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
782 raw_test_pass(0xebbf);
// second half: record obj in the tag's object set (tagd->flagptr)
789 struct ___Object___ * tagset=tagd->flagptr;
791 raw_test_pass(0xb008);
795 raw_test_pass(0xb009);
798 } else if (tagset->type!=OBJECTARRAYTYPE) {
800 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
801 struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
802 obj=(struct ___Object___ *)ptrarray[2];
803 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
805 struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
// promote the single object to an array holding the old and the new object
807 ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
808 ARRAYSET(ao, struct ___Object___ *, 1, obj);
809 ao->___cachedCode___=2;
810 tagd->flagptr=(struct ___Object___ *)ao;
812 raw_test_pass(0xb00a);
815 struct ArrayObject *ao=(struct ArrayObject *) tagset;
816 if (ao->___cachedCode___<ao->___length___) {
818 raw_test_pass(0xb00b);
820 ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
824 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
825 struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
826 obj=(struct ___Object___ *)ptrarray[2];
827 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
828 ao=(struct ArrayObject *)tagd->flagptr;
// NOTE(review): unlike the ptr variant above, this allocation does not add
// ao->___length___, so the new array may be too small for the copy below.
830 struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
832 aonew->___cachedCode___=ao->___cachedCode___+1;
833 for(i=0;i<ao->___length___;i++) {
834 ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
836 ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
837 tagd->flagptr=(struct ___Object___ *) aonew;
839 raw_test_pass(0xb00c);
846 /* This function clears a tag. */
/* Unlinks obj and tagd in both directions.
 *
 * First removes tagd from obj->___tags___ (a single descriptor or an array of
 * descriptors), then removes obj from tagd->flagptr (a single object or an
 * OBJECTARRAYTYPE array). Array removal moves the last used entry into the
 * freed slot, clears the last slot and decrements ___cachedCode___.
 * "ERROR n in tagclear" diagnostics are printed from four places in the body.
 *
 * Two signatures are present, with and without a leading ptr argument.
 */
848 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
850 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
852 /* We'll assume that tag is always there.
853 Need to statically check for this of course. */
854 struct ___Object___ * tagptr=obj->___tags___;
856 if (tagptr->type==TAGTYPE) {
857 if ((struct ___TagDescriptor___ *)tagptr==tagd)
858 obj->___tags___=NULL;
861 printf("ERROR 1 in tagclear\n");
865 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
867 for(i=0;i<ao->___cachedCode___;i++) {
868 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
// swap-with-last removal; the array is dropped entirely once it is empty
870 ao->___cachedCode___--;
871 if (i<ao->___cachedCode___)
872 ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
873 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
874 if (ao->___cachedCode___==0)
875 obj->___tags___=NULL;
880 printf("ERROR 2 in tagclear\n");
// second half: remove obj from the tag's object set
886 struct ___Object___ *tagset=tagd->flagptr;
887 if (tagset->type!=OBJECTARRAYTYPE) {
892 printf("ERROR 3 in tagclear\n");
896 struct ArrayObject *ao=(struct ArrayObject *) tagset;
898 for(i=0;i<ao->___cachedCode___;i++) {
899 struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
901 ao->___cachedCode___--;
902 if (i<ao->___cachedCode___)
903 ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
904 ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
905 if (ao->___cachedCode___==0)
911 printf("ERROR 4 in tagclear\n");
919 /* This function allocates a new tag. */
/* Allocates classsize[TAGTYPE] bytes for a tag descriptor.
 * The variant taking ptr allocates through mygcmalloc() with ptr cast to a
 * struct garbagelist *; the other variant uses FREEMALLOC().
 * NOTE(review): how index is used is not shown by the lines below. */
921 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
922 struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
924 struct ___TagDescriptor___ * allocate_tag(int index) {
925 struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
934 /* This function updates the flag for object ptr. It or's the flag
935 with the or mask and and's it with the andmask. */
937 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
/* Three-way comparator over two ints: negative, zero or positive according
 * to *val1 - *val2.
 * NOTE(review): the subtraction overflows (undefined behaviour) when the
 * operands have opposite signs and large magnitude. */
939 int flagcomp(const int *val1, const int *val2) {
940 return (*val1)-(*val2);
/* Update the flag word of the object at ptr and hand it to flagbody().
 * The current flag is read from int slot 1 of the object, ormask is OR-ed in,
 * and flagbody() is called with the caller's queues and isnew == false.
 *
 * @param ptr      object whose flag is updated
 * @param ormask   bits to set
 * @param andmask  mask for clearing bits (its application is not among the
 *                 lines below -- verify)
 * @param queues   parameter queues forwarded to flagbody()
 * @param length   number of entries in queues
 */
943 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
945 int oldflag=((int *)ptr)[1];
946 int flag=ormask|oldflag;
949 raw_test_pass(0xaa000000 + oldflag);
950 raw_test_pass(0xaa000000 + flag);
952 flagbody(ptr, flag, queues, length, false);
/* Like flagorand(), but compares the computed flag with the old one first and
 * calls flagbody() with no explicit queues (NULL, 0) and isnew == false.
 * The bool result presumably reports whether the flag changed -- the return
 * statements are not among the lines below, so confirm.
 *
 * @param ptr      object whose flag is updated
 * @param ormask   bits to set
 * @param andmask  mask for clearing bits
 */
956 bool intflagorand(void * ptr, int ormask, int andmask) {
958 int oldflag=((int *)ptr)[1];
959 int flag=ormask|oldflag;
961 if (flag==oldflag) /* Don't do anything */
964 flagbody(ptr, flag, NULL, 0, false);
/* Flag update for a freshly created object: same flag computation as
 * flagorand(), but flagbody() is called with no queues and isnew == true, so
 * its default-queue lookup for existing objects is skipped.
 *
 * @param ptr      object whose flag is initialised
 * @param ormask   bits to set
 * @param andmask  mask for clearing bits
 */
970 void flagorandinit(void * ptr, int ormask, int andmask) {
971 int oldflag=((int *)ptr)[1];
972 int flag=ormask|oldflag;
975 raw_test_pass(0xaa100000 + oldflag);
976 raw_test_pass(0xaa100000 + flag);
978 flagbody(ptr,flag,NULL,0,true);
/* Common tail of the flagorand* helpers.
 *
 * For an existing object (isnew == false) with no queues supplied, the
 * default queues for the object's type on the current core are looked up in
 * objectqueues[]/numqueues[]. The object is then removed from the objectset
 * of every selected parameter queue, and any enterflags array stored with it
 * is freed.
 *
 * @param ptr      object being updated
 * @param flag     new flag value computed by the caller
 * @param vqueues  parameter queues to clean, or NULL to use the defaults
 * @param vlength  number of entries in vqueues
 * @param isnew    true when the object has just been created
 */
981 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
982 struct parameterwrapper * flagptr = NULL;
984 struct parameterwrapper ** queues = vqueues;
985 int length = vlength;
988 int * enterflags = NULL;
989 if((!isnew) && (queues == NULL)) {
990 #ifdef THREADSIMULATE
// the core number comes from the thread-specific value set in run()
991 int numofcore = pthread_getspecific(key);
992 queues = objectqueues[numofcore][ptr->type];
993 length = numqueues[numofcore][ptr->type];
996 if(corenum < NUMCORES) {
998 queues = objectqueues[corenum][ptr->type];
999 length = numqueues[corenum][ptr->type];
1009 raw_test_pass(0xbb000000 + ptr->flag);
1012 /*Remove object from all queues */
1013 for(i = 0; i < length; ++i) {
1014 flagptr = queues[i];
// fetch the stored enterflags first so they can be freed after removal
1015 ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
1016 ObjectHashremove(flagptr->objectset, (int)ptr);
1017 if (enterflags!=NULL)
1018 RUNFREE(enterflags);
/* Offer an object to the parameter queues it may belong to.
 *
 * With vqueues == NULL the default queues for the object's type on the
 * current core are used. For each queue the object's tags are checked against
 * the queue's required tags; then every (andmask, checkmask) term of the
 * queue is tested and enqueuetasks() is called when
 * (ptr->flag & andmask) == checkmask.
 *
 * @param vptr     the object (a struct ___Object___ *)
 * @param vqueues  candidate parameter queues, or NULL for the defaults
 * @param vlength  number of entries in vqueues
 */
1022 void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1023 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1026 struct QueueItem *tmpptr;
1027 struct parameterwrapper * parameter=NULL;
1030 struct parameterwrapper * prevptr=NULL;
1031 struct ___Object___ *tagptr=NULL;
1032 struct parameterwrapper ** queues = vqueues;
1033 int length = vlength;
1035 if(corenum > NUMCORES - 1) {
1039 if(queues == NULL) {
1040 #ifdef THREADSIMULATE
1041 int numofcore = pthread_getspecific(key);
1042 queues = objectqueues[numofcore][ptr->type];
1043 length = numqueues[numofcore][ptr->type];
1045 queues = objectqueues[corenum][ptr->type];
1046 length = numqueues[corenum][ptr->type];
1049 tagptr=ptr->___tags___;
1051 /* Outer loop iterates through all parameter queues an object of
1052 this type could be in. */
1053 for(j = 0; j < length; ++j) {
1054 parameter = queues[j];
1056 if (parameter->numbertags>0) {
1058 goto nextloop;//that means the object has no tag but that param needs tag
1059 else if(tagptr->type==TAGTYPE) {//one tag
1060 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1061 for(i=0;i<parameter->numbertags;i++) {
1062 //slotid is parameter->tagarray[2*i];
1063 int tagid=parameter->tagarray[2*i+1];
1064 if (tagid!=tagptr->flag)
1065 goto nextloop; /*We don't have this tag */
1067 } else {//multiple tags
1068 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1069 for(i=0;i<parameter->numbertags;i++) {
1070 //slotid is parameter->tagarray[2*i];
1071 int tagid=parameter->tagarray[2*i+1];
// NOTE(review): this inner loop reuses j, the index of the outer queue loop,
// so the outer iteration position is overwritten here.
1073 for(j=0;j<ao->___cachedCode___;j++) {
1074 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
// flag test: intarray holds (andmask, checkmask) pairs, one per term
1085 for(i=0;i<parameter->numberofterms;i++) {
1086 int andmask=parameter->intarray[i*2];
1087 int checkmask=parameter->intarray[i*2+1];
1088 if ((ptr->flag&andmask)==checkmask) {
1090 raw_test_pass(0xcc000000 + andmask);
1091 raw_test_pass_reg((int)ptr);
1092 raw_test_pass(0xcc000000 + ptr->flag);
1093 raw_test_pass(0xcc000000 + checkmask);
1095 enqueuetasks(parameter, prevptr, ptr, NULL, 0);
/* Variant of enqueueObject() that hands matches to enqueuetasks_I().
 *
 * The matching logic is the same: pick the default queues when vqueues is
 * NULL, check the queue's required tags against the object's tags, then test
 * each (andmask, checkmask) term against ptr->flag.
 * NOTE(review): the _I suffix presumably marks a version intended to run with
 * interrupts disabled; the visible caller invokes it between
 * raw_user_interrupts_off() and raw_user_interrupts_on() -- confirm.
 *
 * @param vptr     the object (a struct ___Object___ *)
 * @param vqueues  candidate parameter queues, or NULL for the defaults
 * @param vlength  number of entries in vqueues
 */
1107 void enqueueObject_I(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1108 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1111 struct QueueItem *tmpptr;
1112 struct parameterwrapper * parameter=NULL;
1115 struct parameterwrapper * prevptr=NULL;
1116 struct ___Object___ *tagptr=NULL;
1117 struct parameterwrapper ** queues = vqueues;
1118 int length = vlength;
1120 if(corenum > NUMCORES - 1) {
1124 if(queues == NULL) {
1125 #ifdef THREADSIMULATE
1126 int numofcore = pthread_getspecific(key);
1127 queues = objectqueues[numofcore][ptr->type];
1128 length = numqueues[numofcore][ptr->type];
1130 queues = objectqueues[corenum][ptr->type];
1131 length = numqueues[corenum][ptr->type];
1135 raw_test_pass(0xeaa1);
1136 raw_test_pass_reg(queues);
1137 raw_test_pass_reg(length);
1139 tagptr=ptr->___tags___;
1141 /* Outer loop iterates through all parameter queues an object of
1142 this type could be in. */
1143 for(j = 0; j < length; ++j) {
1144 parameter = queues[j];
1146 if (parameter->numbertags>0) {
1148 raw_test_pass(0xeaa2);
1149 raw_test_pass_reg(tagptr);
1152 goto nextloop;//that means the object has no tag but that param needs tag
1153 else if(tagptr->type==TAGTYPE) {//one tag
1154 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1156 raw_test_pass(0xeaa3);
1158 for(i=0;i<parameter->numbertags;i++) {
1159 //slotid is parameter->tagarray[2*i];
1160 int tagid=parameter->tagarray[2*i+1];
1161 if (tagid!=tagptr->flag) {
1163 raw_test_pass(0xeaa4);
1165 goto nextloop; /*We don't have this tag */
1168 } else {//multiple tags
1169 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1171 raw_test_pass(0xeaa5);
1173 for(i=0;i<parameter->numbertags;i++) {
1174 //slotid is parameter->tagarray[2*i];
1175 int tagid=parameter->tagarray[2*i+1];
// NOTE(review): this inner loop reuses j, the index of the outer queue loop,
// so the outer iteration position is overwritten here.
1177 for(j=0;j<ao->___cachedCode___;j++) {
1178 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag) {
1183 raw_test_pass(0xeaa6);
// flag test: intarray holds (andmask, checkmask) pairs, one per term
1193 for(i=0;i<parameter->numberofterms;i++) {
1194 int andmask=parameter->intarray[i*2];
1195 int checkmask=parameter->intarray[i*2+1];
1197 raw_test_pass(0xeaa7);
1198 raw_test_pass(0xcc000000 + andmask);
1199 raw_test_pass_reg(ptr);
1200 raw_test_pass(0xcc000000 + ptr->flag);
1201 raw_test_pass(0xcc000000 + checkmask);
1203 if ((ptr->flag&andmask)==checkmask) {
1205 raw_test_pass(0xeaa8);
1207 enqueuetasks_I(parameter, prevptr, ptr, NULL, 0);
1218 // helper function to compute the coordinates of a core from the core number
// Cores are numbered row by row on a grid that is 4 cores wide:
//   *coordX = core_num % 4 (column), *coordY = core_num / 4 (row).
// Both output pointers must be valid; they are written unconditionally.
1219 void calCoords(int core_num, int* coordY, int* coordX) {
1220 *coordX = core_num % 4;
1221 *coordY = core_num / 4;
1225 /* Message format for RAW version:
1227 * type: 0 -- transfer object
1228 * 1 -- transfer stall msg
1234 * ObjMsg: 0 + size of msg + obj's address + (task index + param index)+
1235 * StallMsg: 1 + corenum + sendobjs + receiveobjs (size is always 4 * sizeof(int))
1236 * LockMsg: 2 + lock type + obj pointer + request core (size is always 4 * sizeof(int))
1237 * 3/4/5 + lock type + obj pointer (size is always 3 * sizeof(int))
1238 * lock type: 0 -- read; 1 -- write
1241 // transfer an object to targetcore
//
// transObj names the object (objptr), the destination (targetcore) and a list
// of `length` (task index, parameter index) pairs stored flat in queues[].
//
// RAW build: sends a dynamic-network message of 3 + 2*length words to the
// target tile, including one word per task index and parameter index, and
// then increments self_numsendobjs.
//
// THREADSIMULATE build: opens the target's POSIX queue
// "/msgqueue_<targetcore>" write-only and non-blocking, duplicates transObj
// and its queues array, wraps the copy in a fresh ___Object___ (type copied
// from the object, original pointing at the copy) and mq_send()s that
// wrapper. The sender's send counter is then incremented (numsendobjs[] for
// the startup core, thread_data_array[].numsendobjs otherwise).
1243 void transferObject(struct transObjInfo * transObj) {
1244 void * obj = transObj->objptr;
1245 int type=((int *)obj)[0];
1246 int size=classsize[type];
1247 int targetcore = transObj->targetcore;
1248 //assert(type < NUMCLASSES); // can only transfer normal object
1252 int self_y, self_x, target_y, target_x;
1254 // for 32 bit machine, the size of fixed part is always 3 words
1255 //int msgsize = sizeof(int) * 2 + sizeof(void *);
// message size in words: 3 fixed words plus two per queue entry
1256 int msgsize = 3 + transObj->length * 2;
1259 struct ___Object___ * newobj = (struct ___Object___ *)obj;
1260 /*if(0 == newobj->isolate) {
1264 calCoords(corenum, &self_y, &self_x);
1265 calCoords(targetcore, &target_y, &target_x);
1266 // Build the message header
1267 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1269 target_y, target_x);
1270 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1271 raw_test_pass(0xbbbb);
1272 raw_test_pass(0xb000 + targetcore); // targetcore
1276 raw_test_pass_reg(msgsize);
1278 raw_test_pass_reg(obj);
1279 //gdn_send(isshared);
1280 //raw_test_pass_reg(isshared);
// one (task index, parameter index) pair per queue entry
1281 for(i = 0; i < transObj->length; ++i) {
1282 int taskindex = transObj->queues[2*i];
1283 int paramindex = transObj->queues[2*i+1];
1284 gdn_send(taskindex);
1285 raw_test_pass_reg(taskindex);
1286 gdn_send(paramindex);
1287 raw_test_pass_reg(paramindex);
1289 raw_test_pass(0xffff);
1290 ++(self_numsendobjs);
1291 #elif defined THREADSIMULATE
1292 int numofcore = pthread_getspecific(key);
1294 // use POSIX message queue to transfer objects between cores
1298 if(targetcore < 10) {
1299 corenumstr[0] = targetcore + '0';
1300 corenumstr[1] = '\0';
1302 } else if(targetcore < 100) {
1303 corenumstr[1] = targetcore % 10 + '0';
1304 corenumstr[0] = (targetcore / 10) + '0';
1305 corenumstr[2] = '\0';
1308 printf("Error: targetcore >= 100\n");
1312 char * pathhead = "/msgqueue_";
1313 int targetlen = strlen(pathhead);
1314 char path[targetlen + sourcelen + 1];
1315 strcpy(path, pathhead);
1316 strncat(path, corenumstr, sourcelen);
1317 int oflags = O_WRONLY|O_NONBLOCK;
1318 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1319 mqdnum = mq_open(path, oflags, omodes, NULL);
1321 printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1325 /*struct ___Object___ * newobj = (struct ___Object___ *)obj;
1326 if(0 == newobj->isolate) {
1327 newobj = RUNMALLOC(size);
1328 memcpy(newobj, obj, size);
1329 newobj->original=obj;
1331 struct transObjInfo * tmptransObj = RUNMALLOC(sizeof(struct transObjInfo));
1332 memcpy(tmptransObj, transObj, sizeof(struct transObjInfo));
1333 int * tmpqueue = RUNMALLOC(sizeof(int)*2*tmptransObj->length);
1334 memcpy(tmpqueue, tmptransObj->queues, sizeof(int)*2*tmptransObj->length);
1335 tmptransObj->queues = tmpqueue;
1336 struct ___Object___ * newobj = RUNMALLOC(sizeof(struct ___Object___));
1337 newobj->type = ((struct ___Object___ *)obj)->type;
1338 newobj->original = (struct ___Object___ *)tmptransObj;
1341 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1343 printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
1347 if(numofcore == STARTUPCORE) {
1348 ++numsendobjs[numofcore];
1350 ++(thread_data_array[numofcore].numsendobjs);
1352 printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1356 // send a stall message to targetcore
//
// Reports this core's send/receive counters to targetcore.
//
// RAW build: sends a dynamic-network message to the target tile that includes
// self_numsendobjs and self_numreceiveobjs.
//
// THREADSIMULATE build: targetcore must be STARTUPCORE (asserted). The
// message is a fresh ___Object___ whose flag, ___cachedHash___ and
// ___cachedCode___ fields carry the sender's core number, send count and
// receive count; it is mq_send()t to "/msgqueue_<targetcore>", which is
// opened write-only and non-blocking.
//
// Returns a bool; the return statements are not among the lines below, so the
// exact success/failure mapping should be checked against the full source.
1358 bool transStallMsg(int targetcore) {
1361 int self_y, self_x, target_y, target_x;
1362 // for 32 bit machine, the size is always 4 words
1363 //int msgsize = sizeof(int) * 4;
1366 calCoords(corenum, &self_y, &self_x);
1367 calCoords(targetcore, &target_y, &target_x);
1368 // Build the message header
1369 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1371 target_y, target_x);
1372 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1373 raw_test_pass(0xbbbb);
1374 raw_test_pass(0xb000 + targetcore); // targetcore
1378 raw_test_pass_reg(corenum);
1379 gdn_send(self_numsendobjs);
1380 raw_test_pass_reg(self_numsendobjs);
1381 gdn_send(self_numreceiveobjs);
1382 raw_test_pass_reg(self_numreceiveobjs);
1383 raw_test_pass(0xffff);
1385 #elif defined THREADSIMULATE
1386 struct ___Object___ *newobj = RUNMALLOC(sizeof(struct ___Object___));
1387 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
1389 int numofcore = pthread_getspecific(key);
1390 newobj->flag = numofcore;
1391 newobj->___cachedHash___ = thread_data_array[numofcore].numsendobjs;
1392 newobj->___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
1394 // use POSIX message queue to send stall msg to startup core
1395 assert(targetcore == STARTUPCORE);
// render targetcore as a one- or two-digit decimal suffix
1399 if(targetcore < 10) {
1400 corenumstr[0] = targetcore + '0';
1401 corenumstr[1] = '\0';
1403 } else if(targetcore < 100) {
1404 corenumstr[1] = targetcore % 10 + '0';
1405 corenumstr[0] = (targetcore / 10) + '0';
1406 corenumstr[2] = '\0';
1409 printf("Error: targetcore >= 100\n");
1413 char * pathhead = "/msgqueue_";
1414 int targetlen = strlen(pathhead);
1415 char path[targetlen + sourcelen + 1];
1416 strcpy(path, pathhead);
1417 strncat(path, corenumstr, sourcelen);
1418 int oflags = O_WRONLY|O_NONBLOCK;
1419 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1420 mqdnum = mq_open(path, oflags, omodes, NULL);
1422 printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1427 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1429 printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
1433 printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1434 printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj->flag, newobj->___cachedHash___, newobj->___cachedCode___);
1441 // receive object transferred from other cores
1442 // or the terminate message from other cores
1443 // NOTICE: following format is for threadsimulate version only
1444 // RAW version please see previous description
1445 // format: type + object
1446 // type: -1--stall msg
1448 // return value: 0--received an object
1449 // 1--received nothing
1450 // 2--received a Stall Msg
1451 // 3--received a lock Msg
1452 // RAW version: -1 -- received nothing
1453 // otherwise -- received msg type
// Poll for one incoming message and act on it.
// RAW build: checks gdn_input_avail(); when input is pending, words are read with
// gdn_receive() into msgdata[] until msglength words have arrived, and the complete
// message is then handled (object transfer, stall, lock request, lock grant/deny,
// lock release -- see the per-case comments in the body).
// THREADSIMULATE build: receives one message from this core's POSIX queue
// mqd[numofcore]; a first word of -1 marks a stall message.
// The meaning of the return value is given in the comment preceding this function.
// NOTE(review): this listing omits some original lines (declarations, returns,
// closing braces); confirm details against the full file.
1454 int receiveObject() {
1458 int self_y, self_x, target_y, target_x;
1460 if(gdn_input_avail() == 0) {
1461 if(corenum < NUMCORES) {
1462 raw_test_pass(0xd001);
1466 raw_test_pass(0xcccc);
// pull words off the network while input is available and the message is incomplete
1467 while((gdn_input_avail() != 0) && (msgdataindex < msglength)) {
1468 msgdata[msgdataindex] = gdn_receive();
1469 if(msgdataindex == 0) {
1470 if(msgdata[0] > 2) {
1472 } else if(msgdata[0] > 0) {
// for message type 0 the second word is taken as the total message length
1475 } else if((msgdataindex == 1) && (msgdata[0] == 0)) {
1476 msglength = msgdata[msgdataindex];
1478 raw_test_pass_reg(msgdata[msgdataindex]);
1481 /*if(msgdataindex == 0) {
1483 msgtype = gdn_receive();
1490 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1491 msgdata[msgdataindex] = msgtype;
1493 raw_test_pass_reg(msgtype);
1494 } else if((msgdataindex == 1) && (msgtype == 0)) {
1495 // object transfer msg
1496 msglength = gdn_receive();
1497 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1498 msgdata[0] = msgtype;
1499 msgdata[msgdataindex] = msglength;
1500 raw_test_pass_reg(msgdata[msgdataindex]);
1502 msgdata[msgdataindex] = gdn_receive();
1503 raw_test_pass_reg(msgdata[msgdataindex]);
1507 raw_test_pass(0xffff);
1508 if(msgdataindex == msglength) {
1509 // received a whole msg
1510 int type, data1, data2; // will receive at least 3 words including type
1516 // receive a object transfer msg
1517 struct transObjInfo * transObj = RUNMALLOC_I(sizeof(struct transObjInfo));
1519 if(corenum > NUMCORES - 1) {
1520 raw_test_done(0xa00a);
1522 // store the object and its corresponding queue info, enqueue it later
1523 transObj->objptr = (void *)data2; // data1 is now size of the msg
1524 transObj->length = (msglength - 3) / 2;
1525 transObj->queues = RUNMALLOC_I(sizeof(int)*(msglength - 3));
1526 for(k = 0; k < transObj->length; ++k) {
1527 transObj->queues[2*k] = msgdata[3+2*k];
1528 raw_test_pass_reg(transObj->queues[2*k]);
1529 transObj->queues[2*k+1] = msgdata[3+2*k+1];
1530 raw_test_pass_reg(transObj->queues[2*k+1]);
1532 //memcpy(transObj->queues, msgdata[3], sizeof(int)*(msglength - 3));
1533 addNewItem_I(&objqueue, (void *)transObj);
1534 ++(self_numreceiveobjs);
1535 raw_test_pass(0xe881);
1537 addNewItem_I(&objqueue, (void *)data2);
1538 ++(self_numreceiveobjs);
1539 raw_test_pass(0xe881);*/
1543 // receive a stall msg
1544 if(corenum != STARTUPCORE) {
1545 // non startup core can not receive stall msg
1547 raw_test_done(0xa001);
1549 if(data1 < NUMCORES) {
1550 raw_test_pass(0xe882);
// data1 is the reporting core: store status 0 for it and record its send/receive counts
1551 corestatus[data1] = 0;
1552 numsendobjs[data1] = data2;
1553 numreceiveobjs[data1] = msgdata[3];
1558 // receive lock request msg
1559 // for 32 bit machine, the size is always 3 words
1560 //int msgsize = sizeof(int) * 3;
1562 // lock request msg, handle it right now
1563 // check to see if there is a lock exist in locktbl for the required obj
1564 int data3 = msgdata[3];
1566 if(!RuntimeHashcontainskey(locktbl, data2)) {
1567 // no locks for this object
1568 // first time to operate on this shared object
1569 // create a lock for it
1570 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1571 raw_test_pass(0xe883);
1573 RuntimeHashadd_I(locktbl, data2, 1);
1575 RuntimeHashadd_I(locktbl, data2, -1);
1579 raw_test_pass(0xe884);
1580 RuntimeHashget(locktbl, data2, &rwlock_obj);
1581 raw_test_pass_reg(rwlock_obj);
1582 if(0 == rwlock_obj) {
// the table entry is updated by removing the key and re-adding it with the new value
1588 RuntimeHashremovekey(locktbl, data2);
1589 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1590 } else if((rwlock_obj > 0) && (data1 == 0)) {
1591 // read lock request and there are only read locks
1593 RuntimeHashremovekey(locktbl, data2);
1594 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1598 raw_test_pass_reg(rwlock_obj);
1601 calCoords(corenum, &self_y, &self_x);
1602 calCoords(targetcore, &target_y, &target_x);
1603 // Build the message header
1604 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1606 target_y, target_x);
1607 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1608 raw_test_pass(0xbbbb);
1609 raw_test_pass(0xb000 + targetcore); // targetcore
1611 // deny the lock request
1612 gdn_send(4); // lock request
1615 // grant the lock request
1616 gdn_send(3); // lock request
1619 gdn_send(data1); // lock type
1620 raw_test_pass_reg(data1);
1621 gdn_send(data2); // lock target
1622 raw_test_pass_reg(data2);
1623 raw_test_pass(0xffff);
1627 // receive lock grant msg
1628 if(corenum > NUMCORES - 1) {
1629 raw_test_done(0xa00b);
1631 if(lockobj == data2) {
1638 // conflicts on lockresults
1639 raw_test_done(0xa002);
1644 // receive lock grant/deny msg
1645 if(corenum > NUMCORES - 1) {
1646 raw_test_done(0xa00c);
1648 if(lockobj == data2) {
1655 // conflicts on lockresults
1656 raw_test_done(0xa003);
1661 // receive lock release msg
1662 if(!RuntimeHashcontainskey(locktbl, data2)) {
1663 // no locks for this object, something is wrong
1664 raw_test_done(0xa004);
1667 RuntimeHashget(locktbl, data2, &rwlock_obj);
1668 raw_test_pass(0xe885);
1669 raw_test_pass_reg(rwlock_obj);
1675 RuntimeHashremovekey(locktbl, data2);
1676 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1677 raw_test_pass_reg(rwlock_obj);
// overwrite the consumed message words (from the last index down to 1) with -1
1686 for(msgdataindex--;msgdataindex > 0; --msgdataindex) {
1687 msgdata[msgdataindex] = -1;
1692 raw_test_pass(0xe886);
1696 raw_test_pass(0xe887);
1699 #elif defined THREADSIMULATE
1700 int numofcore = pthread_getspecific(key);
1701 // use POSIX message queue to transfer object
1703 struct mq_attr mqattr;
// size the receive buffer from the queue's mq_msgsize attribute
1704 mq_getattr(mqd[numofcore], &mqattr);
1705 void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
1706 msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
1712 //printf("msg: %s\n",msgptr);
// first word -1: stall message; copy the sender's counters into the per-core tables
1713 if(((int*)msgptr)[0] == -1) {
1715 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1716 int index = tmpptr->flag;
1717 corestatus[index] = 0;
1718 numsendobjs[index] = tmpptr->___cachedHash___;
1719 numreceiveobjs[index] = tmpptr->___cachedCode___;
1720 printf("<receiveObject> index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
1723 } /*else if(((int*)msgptr)[0] == -2) {
1728 if(numofcore == STARTUPCORE) {
1729 ++(numreceiveobjs[numofcore]);
1731 ++(thread_data_array[numofcore].numreceiveobjs);
1733 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1734 struct transObjInfo * transObj = (struct transObjInfo *)tmpptr->original;
1735 tmpptr = (struct ___Object___ *)(transObj->objptr);
1736 int type = tmpptr->type;
1737 int size=classsize[type];
1738 struct ___Object___ * newobj=RUNMALLOC(size);
1739 memcpy(newobj, tmpptr, size);
1740 if(0 == newobj->isolate) {
1741 newobj->original=tmpptr;
1746 for(k = 0; k < transObj->length; ++k) {
1747 int taskindex = transObj->queues[2 * k];
1748 int paramindex = transObj->queues[2 * k + 1];
1749 struct parameterwrapper ** queues = &(paramqueues[numofcore][taskindex][paramindex]);
1750 enqueueObject(newobj, queues, 1);
1752 RUNFREE(transObj->queues);
// Request a read lock on the object at ptr.
// RAW build: the core holding the lock entry is ((int)ptr) % TOTALCORE. When that is
// this core, the integer entry in locktbl is updated locally with user interrupts
// disabled; otherwise a lock-request message (type 2, lock type 0 = read) is sent to
// the owning core over the dynamic network.
// THREADSIMULATE build: every object has a pthread_rwlock_t stored in locktbl (created
// on first use under the table's write lock); the read lock is attempted with
// pthread_rwlock_tryrdlock, i.e. without blocking.
// NOTE(review): the return statements are not visible in this listing; presumably the
// result reports whether the lock was obtained -- confirm against the full file.
// The diagnostic printf calls now include %s so the strerror() text that was already
// being passed is actually printed.
1759 bool getreadlock(void * ptr) {
1762 int self_y, self_x, target_y, target_x;
1763 int targetcore = ((int)ptr) % TOTALCORE;
1764 // for 32 bit machine, the size is always 4 words
1765 //int msgsize = sizeof(int) * 4;
1775 if(targetcore == corenum) {
1776 // reside on this core
1779 raw_user_interrupts_off();
1781 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1782 // no locks for this object
1783 // first time to operate on this shared object
1784 // create a lock for it
1785 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1786 RuntimeHashadd_I(locktbl, (int)ptr, 1);
1789 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1790 if(-1 != rwlock_obj) {
// not write-locked: store the updated value by removing and re-adding the key
1792 RuntimeHashremovekey(locktbl, (int)ptr);
1793 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1799 raw_user_interrupts_on();
1801 if(lockobj == (int)ptr) {
1812 // conflicts on lockresults
1813 raw_test_done(0xa005);
1818 calCoords(corenum, &self_y, &self_x);
1819 calCoords(targetcore, &target_y, &target_x);
1820 // Build the message header
1821 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1823 target_y, target_x);
1824 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1825 raw_test_pass(0xbbbb);
1826 raw_test_pass(0xb000 + targetcore); // targetcore
1827 gdn_send(2); // lock request
1829 gdn_send(0); // read lock
1832 raw_test_pass_reg(ptr);
1834 raw_test_pass_reg(corenum);
1835 raw_test_pass(0xffff);
1837 #elif defined THREADSIMULATE
1838 int numofcore = pthread_getspecific(key);
1840 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1841 printf("[getreadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1845 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1846 // no locks for this object
1847 // first time to operate on this shared object
1848 // create a lock for it
1849 rc = pthread_rwlock_unlock(&rwlock_tbl);
1850 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1851 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1852 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1853 rc = pthread_rwlock_init(rwlock, NULL);
1854 printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1855 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1856 printf("[getreadlock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// re-check under the table write lock: another thread may have added the entry meanwhile
1861 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1863 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1866 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
1868 rc = pthread_rwlock_unlock(&rwlock_tbl);
1869 printf("[getreadlock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1871 rc = pthread_rwlock_tryrdlock(rwlock);
1872 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1879 pthread_rwlock_t* rwlock_obj = NULL;
1880 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1881 rc = pthread_rwlock_unlock(&rwlock_tbl);
1882 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1883 int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
1884 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// Release a read lock previously requested for the object at ptr.
// RAW build: if the owning core (((int)ptr) % TOTALCORE) is this core, the locktbl
// entry is updated locally with user interrupts disabled (a missing entry is treated
// as fatal via raw_test_done(0xa006)); otherwise a lock-release message (type 5,
// lock type 0 = read) is sent to the owning core.
// THREADSIMULATE build: looks up the object's pthread_rwlock_t in locktbl while
// holding the table read lock and unlocks it.
// The diagnostic printf calls now include %s so the strerror() text that was already
// being passed is actually printed.
1894 void releasereadlock(void * ptr) {
1897 int self_y, self_x, target_y, target_x;
1898 int targetcore = ((int)ptr) % TOTALCORE;
1899 // for 32 bit machine, the size is always 3 words
1900 //int msgsize = sizeof(int) * 3;
1903 if(targetcore == corenum) {
1905 raw_user_interrupts_off();
1907 // reside on this core
1908 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1909 // no locks for this object, something is wrong
1910 raw_test_done(0xa006);
1913 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// store the updated value by removing and re-adding the key
1915 RuntimeHashremovekey(locktbl, (int)ptr);
1916 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1919 raw_user_interrupts_on();
1924 calCoords(corenum, &self_y, &self_x);
1925 calCoords(targetcore, &target_y, &target_x);
1926 // Build the message header
1927 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1929 target_y, target_x);
1930 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1931 raw_test_pass(0xbbbb);
1932 raw_test_pass(0xb000 + targetcore); // targetcore
1933 gdn_send(5); // lock release
1935 gdn_send(0); // read lock
1938 raw_test_pass_reg(ptr);
1939 raw_test_pass(0xffff);
1940 #elif defined THREADSIMULATE
1941 int numofcore = pthread_getspecific(key);
1942 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1943 printf("[releasereadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1944 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1945 printf("[releasereadlock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1948 pthread_rwlock_t* rwlock_obj = NULL;
1949 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1950 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1951 printf("[releasereadlock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1952 rc = pthread_rwlock_unlock(&rwlock_tbl);
1953 printf("[releasereadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// Read-lock request, "_I" variant of getreadlock.
// The visible steps match getreadlock's RAW path: the owning core is
// ((int)ptr) % TOTALCORE; a local owner updates the integer entry in locktbl, a remote
// owner is sent a lock-request message (type 2, lock type 0 = read).
// NOTE(review): no raw_user_interrupts_off()/on() calls are visible here; presumably
// this variant is meant for callers that already run with interrupts disabled -- confirm.
// NOTE(review): return statements are not visible in this listing.
1958 bool getreadlock_I(void * ptr) {
1960 int self_y, self_x, target_y, target_x;
1961 int targetcore = ((int)ptr) % TOTALCORE;
1962 // for 32 bit machine, the size is always 4 words
1963 //int msgsize = sizeof(int) * 4;
1973 if(targetcore == corenum) {
1974 // reside on this core
1976 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1977 // no locks for this object
1978 // first time to operate on this shared object
1979 // create a lock for it
1980 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1981 RuntimeHashadd_I(locktbl, (int)ptr, 1);
1984 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1985 if(-1 != rwlock_obj) {
// not write-locked: store the updated value by removing and re-adding the key
1987 RuntimeHashremovekey(locktbl, (int)ptr);
1988 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1993 if(lockobj == (int)ptr) {
2004 // conflicts on lockresults
2005 raw_test_done(0xa005);
2010 calCoords(corenum, &self_y, &self_x);
2011 calCoords(targetcore, &target_y, &target_x);
2012 // Build the message header
2013 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2015 target_y, target_x);
2016 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2017 raw_test_pass(0xbbbb);
2018 raw_test_pass(0xb000 + targetcore); // targetcore
2019 gdn_send(2); // lock request
2021 gdn_send(0); // read lock
2024 raw_test_pass_reg(ptr);
2026 raw_test_pass_reg(corenum);
2027 raw_test_pass(0xffff);
// Read-lock release, "_I" variant of releasereadlock.
// If the owning core (((int)ptr) % TOTALCORE) is this core, the locktbl entry is
// updated locally (a missing entry ends the run via raw_test_done(0xa006)); otherwise
// a lock-release message (type 5, lock type 0 = read) is sent to the owning core.
// NOTE(review): no raw_user_interrupts_off()/on() calls are visible here; presumably
// this variant is meant for callers that already run with interrupts disabled -- confirm.
2031 void releasereadlock_I(void * ptr) {
2033 int self_y, self_x, target_y, target_x;
2034 int targetcore = ((int)ptr) % TOTALCORE;
2035 // for 32 bit machine, the size is always 3 words
2036 //int msgsize = sizeof(int) * 3;
2039 if(targetcore == corenum) {
2040 // reside on this core
2041 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2042 // no locks for this object, something is wrong
2043 raw_test_done(0xa006);
2046 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// store the updated value by removing and re-adding the key
2048 RuntimeHashremovekey(locktbl, (int)ptr);
2049 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2054 calCoords(corenum, &self_y, &self_x);
2055 calCoords(targetcore, &target_y, &target_x);
2056 // Build the message header
2057 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2059 target_y, target_x);
2060 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2061 raw_test_pass(0xbbbb);
2062 raw_test_pass(0xb000 + targetcore); // targetcore
2063 gdn_send(5); // lock release
2065 gdn_send(0); // read lock
2068 raw_test_pass_reg(ptr);
2069 raw_test_pass(0xffff);
// Request a write lock on the object at ptr.
// RAW build: the core holding the lock entry is ((int)ptr) % TOTALCORE. When that is
// this core, the integer entry in locktbl is updated locally with user interrupts
// disabled (a new entry is created with value -1 = write lock); otherwise a
// lock-request message (type 2, lock type 1 = write) is sent to the owning core.
// THREADSIMULATE build: every object has a pthread_rwlock_t stored in locktbl (created
// on first use under the table's write lock; a spare lock that loses the race is
// destroyed); the write lock is attempted with pthread_rwlock_trywrlock, i.e. without
// blocking.
// NOTE(review): the return statements are not visible in this listing; presumably the
// result reports whether the lock was obtained -- confirm against the full file.
// The diagnostic printf calls now include %s so the strerror() text that was already
// being passed is actually printed.
2073 bool getwritelock(void * ptr) {
2076 int self_y, self_x, target_y, target_x;
2077 int targetcore = ((int)ptr) % TOTALCORE;
2078 // for 32 bit machine, the size is always 4 words
2079 //int msgsize = sizeof(int) * 4;
2089 if(targetcore == corenum) {
2090 // reside on this core
2093 raw_user_interrupts_off();
2095 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2096 // no locks for this object
2097 // first time to operate on this shared object
2098 // create a lock for it
2099 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
2100 raw_test_pass(0xe552);
2101 RuntimeHashadd_I(locktbl, (int)ptr, -1);
2104 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2105 raw_test_pass(0xe553);
2106 raw_test_pass_reg(rwlock_obj);
2107 if(0 == rwlock_obj) {
// entry value 0: store the updated value by removing and re-adding the key
2109 RuntimeHashremovekey(locktbl, (int)ptr);
2110 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2116 raw_user_interrupts_on();
2118 raw_test_pass(0xe554);
2119 raw_test_pass_reg(lockresult);
2120 if(lockobj == (int)ptr) {
2133 // conflicts on lockresults
2134 raw_test_done(0xa007);
2139 raw_test_pass(0xe555);
2140 calCoords(corenum, &self_y, &self_x);
2141 calCoords(targetcore, &target_y, &target_x);
2142 // Build the message header
2143 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2145 target_y, target_x);
2146 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2147 raw_test_pass(0xbbbb);
2148 raw_test_pass(0xb000 + targetcore); // targetcore
2149 gdn_send(2); // lock request
2151 gdn_send(1); // write lock
2154 raw_test_pass_reg(ptr);
2156 raw_test_pass_reg(corenum);
2157 raw_test_pass(0xffff);
2159 #elif defined THREADSIMULATE
2160 int numofcore = pthread_getspecific(key);
2162 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
2163 printf("[getwritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2167 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2168 // no locks for this object
2169 // first time to operate on this shared object
2170 // create a lock for it
2171 rc = pthread_rwlock_unlock(&rwlock_tbl);
2172 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2173 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
2174 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
2175 rc = pthread_rwlock_init(rwlock, NULL);
2176 printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
2177 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
2178 printf("[getwritelock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2180 pthread_rwlock_destroy(rwlock);
// re-check under the table write lock: another thread may have added the entry meanwhile
2184 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2186 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
2188 pthread_rwlock_destroy(rwlock);
2190 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
2192 rc = pthread_rwlock_unlock(&rwlock_tbl);
2193 printf("[getwritelock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2195 rc = pthread_rwlock_trywrlock(rwlock);
2196 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
2203 pthread_rwlock_t* rwlock_obj = NULL;
2204 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2205 rc = pthread_rwlock_unlock(&rwlock_tbl);
2206 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2207 int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
2208 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// Release a write lock previously requested for the object at ptr.
// RAW build: if the owning core (((int)ptr) % TOTALCORE) is this core, the locktbl
// entry is updated locally with user interrupts disabled (a missing entry is treated
// as fatal via raw_test_done(0xa008)); otherwise a lock-release message (type 5,
// lock type 1 = write) is sent to the owning core.
// THREADSIMULATE build: looks up the object's pthread_rwlock_t in locktbl while
// holding the table read lock and unlocks it.
// The diagnostic printf calls now include %s so the strerror() text that was already
// being passed is actually printed.
2219 void releasewritelock(void * ptr) {
2222 int self_y, self_x, target_y, target_x;
2223 int targetcore = ((int)ptr) % TOTALCORE;
2224 // for 32 bit machine, the size is always 3 words
2225 //int msgsize = sizeof(int) * 3;
2228 if(targetcore == corenum) {
2230 raw_user_interrupts_off();
2232 // reside on this core
2233 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2234 // no locks for this object, something is wrong
2235 raw_test_done(0xa008);
2238 raw_test_pass(0xe662);
2239 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2240 raw_test_pass_reg(rwlock_obj);
// store the updated value by removing and re-adding the key
2242 RuntimeHashremovekey(locktbl, (int)ptr);
2243 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2244 raw_test_pass_reg(rwlock_obj);
2247 raw_user_interrupts_on();
2252 raw_test_pass(0xe663);
2253 calCoords(corenum, &self_y, &self_x);
2254 calCoords(targetcore, &target_y, &target_x);
2255 // Build the message header
2256 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2258 target_y, target_x);
2259 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2260 raw_test_pass(0xbbbb);
2261 raw_test_pass(0xb000 + targetcore);
2262 gdn_send(5); // lock release
2264 gdn_send(1); // write lock
2267 raw_test_pass_reg(ptr);
2268 raw_test_pass(0xffff);
2269 #elif defined THREADSIMULATE
2270 int numofcore = pthread_getspecific(key);
2271 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
2272 printf("[releasewritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2273 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2274 printf("[releasewritelock, %d] Error: try to release a lock without previously grab it\n", numofcore);
2277 pthread_rwlock_t* rwlock_obj = NULL;
2278 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2279 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
2280 printf("[releasewritelock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
2281 rc = pthread_rwlock_unlock(&rwlock_tbl);
2282 printf("[releasewritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// Record ptr in parameter->objectset and enqueue every task instance it enables.
// ptr is placed in its parameter slot, the remaining iterators are reset and advanced
// to an initial combination, and for each combination a taskparamdescriptor holding
// the numiterators+1 parameters is built and added to activetasks unless an equal
// descriptor is already there (in which case the freshly built one is released).
// The iterator addresses are written as &parameter->iterators[j]; the listing had the
// "&para" prefix mis-decoded into a pilcrow character, which is not valid C.
// NOTE(review): several original lines (local declarations, backtracking labels,
// returns) are not visible in this listing.
2286 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2287 void * taskpointerarray[MAXTASKPARAMS];
2289 int numparams=parameter->task->numParameters;
2290 int numiterators=parameter->task->numTotal-1;
2295 struct taskdescriptor * task=parameter->task;
2297 ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2299 /* Add enqueued object to parameter vector */
2300 taskpointerarray[parameter->slot]=ptr;
2302 /* Reset iterators */
2303 for(j=0;j<numiterators;j++) {
2304 toiReset(&parameter->iterators[j]);
2307 /* Find initial state */
2308 for(j=0;j<numiterators;j++) {
2310 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2311 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2313 /* Need to backtrack */
2314 toiReset(&parameter->iterators[j]);
2318 /* Nothing to enqueue */
2325 /* Enqueue current state */
2327 struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
2329 tpd->numParameters=numiterators+1;
2330 tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
2331 for(j=0;j<=numiterators;j++){
2332 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
2335 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2336 genputtable(activetasks, tpd, tpd);
2338 RUNFREE(tpd->parameterArray);
2342 /* This loop iterates to the next parameter combination */
2343 if (numiterators==0)
2346 for(j=numiterators-1; j<numiterators;j++) {
2348 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2349 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2351 /* Need to backtrack */
2352 toiReset(&parameter->iterators[j]);
2356 /* Nothing more to enqueue */
// "_I" variant of enqueuetasks: same visible steps, but using ObjectHashadd_I,
// RUNMALLOC_I and genputtable_I.
// ptr is recorded in parameter->objectset and placed in its parameter slot, the
// remaining iterators are reset and advanced to an initial combination, and for each
// combination a taskparamdescriptor is built and added to activetasks unless an equal
// descriptor is already there.
// The iterator addresses are written as &parameter->iterators[j]; the listing had the
// "&para" prefix mis-decoded into a pilcrow character, which is not valid C.
// NOTE(review): several original lines (local declarations, backtracking labels,
// returns) are not visible in this listing.
2365 int enqueuetasks_I(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2366 void * taskpointerarray[MAXTASKPARAMS];
2368 int numparams=parameter->task->numParameters;
2369 int numiterators=parameter->task->numTotal-1;
2374 struct taskdescriptor * task=parameter->task;
2376 ObjectHashadd_I(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2378 /* Add enqueued object to parameter vector */
2379 taskpointerarray[parameter->slot]=ptr;
2381 /* Reset iterators */
2382 for(j=0;j<numiterators;j++) {
2383 toiReset(&parameter->iterators[j]);
2386 /* Find initial state */
2387 for(j=0;j<numiterators;j++) {
2389 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2390 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2392 /* Need to backtrack */
2393 toiReset(&parameter->iterators[j]);
2397 /* Nothing to enqueue */
2403 /* Enqueue current state */
2405 struct taskparamdescriptor *tpd=RUNMALLOC_I(sizeof(struct taskparamdescriptor));
2407 tpd->numParameters=numiterators+1;
2408 tpd->parameterArray=RUNMALLOC_I(sizeof(void *)*(numiterators+1));
2409 for(j=0;j<=numiterators;j++){
2410 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
2413 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2414 genputtable_I(activetasks, tpd, tpd);
2416 RUNFREE(tpd->parameterArray);
2420 /* This loop iterates to the next parameter combination */
2421 if (numiterators==0)
2424 for(j=numiterators-1; j<numiterators;j++) {
2426 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2427 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2429 /* Need to backtrack */
2430 toiReset(&parameter->iterators[j]);
2434 /* Nothing more to enqueue */
2443 /* Handler for signals. The signals catch null pointer errors and
2444 arithmetic errors. */
// sig is the delivered signal number; info and uap are not used in the visible lines.
// The handler does not return: it prints the signal number, unblocks that signal and
// jumps back to the setjmp(error_handler) site with value 1.
2446 void myhandler(int sig, siginfo_t *info, void *uap) {
2449 printf("sig=%d\n",sig);
// build a one-signal set and unblock it before leaving the handler via longjmp
2452 sigemptyset(&toclear);
2453 sigaddset(&toclear, sig);
2454 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
2455 longjmp(error_handler,1);
// Table keyed by file descriptor; executetasks allocates it and looks up the object
// whose flag is set when that descriptor is reported readable.
2461 struct RuntimeHash *fdtoobject;
// Add fd to the readfds descriptor set.
// NOTE(review): lines between the signature and FD_SET are not visible in this
// listing; presumably maxreadfd is raised there when needed -- confirm.
2463 void addreadfd(int fd) {
2466 FD_SET(fd, &readfds);
// Remove fd from the readfds descriptor set; when fd was the highest tracked
// descriptor (maxreadfd == fd+1), scan downward while the descriptor just below
// maxreadfd is not in the set.
// NOTE(review): the loop body is not visible in this listing; presumably it
// decrements maxreadfd -- confirm.
2469 void removereadfd(int fd) {
2470 FD_CLR(fd, &readfds);
2471 if (maxreadfd==(fd+1)) {
2473 while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
// Main task-execution loop.
// Installs myhandler for SIGBUS/SIGSEGV/SIGFPE/SIGPIPE, allocates fdtoobject, then
// loops while activetasks is non-empty or a read descriptor is registered. Each pass:
//   1. polls the registered descriptors with a zero-timeout select() and, for each
//      ready descriptor found in fdtoobject, sets the object's first flag and
//      enqueues the object;
//   2. takes the first descriptor from activetasks and, for every parameter, acquires
//      a write lock and checks that the object is still in its queue, that its flags
//      match one of the wrapper's (andmask, checkmask) terms, and that it still
//      carries the required tags;
//   3. calls the task through currtpd->task->taskptr with taskpointerarray, then
//      releases the write locks and frees the descriptor's parameter array.
// NOTE(review): many original lines (declarations, braces, gotos/continues, #if
// guards) are not visible in this listing; confirm control flow against the full file.
2484 void executetasks() {
2485 void * taskpointerarray[MAXTASKPARAMS+OFFSET];
2488 struct ___Object___ * tmpparam = NULL;
2489 struct parameterdescriptor * pd=NULL;
2490 struct parameterwrapper *pw=NULL;
2499 raw_test_pass(0xe991);
2503 /* Set up signal handlers */
2504 struct sigaction sig;
2505 sig.sa_sigaction=&myhandler;
2506 sig.sa_flags=SA_SIGINFO;
2507 sigemptyset(&sig.sa_mask);
2509 /* Catch bus errors, segmentation faults, and floating point exceptions*/
2510 sigaction(SIGBUS,&sig,0);
2511 sigaction(SIGSEGV,&sig,0);
2512 sigaction(SIGFPE,&sig,0);
2513 sigaction(SIGPIPE,&sig,0);
2522 fdtoobject=allocateRuntimeHash(100);
2526 /* Map first block of memory to protected, anonymous page */
2527 mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
2531 while((hashsize(activetasks)>0)||(maxreadfd>0)) {
2534 raw_test_pass(0xe992);
2536 /* Check if any filedescriptors have IO pending */
// zero timeout: select() only polls and never blocks
2539 struct timeval timeout={0,0};
2543 numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
2545 /* Process ready fd's */
2547 for(fd=0;fd<maxreadfd;fd++) {
2548 if (FD_ISSET(fd, &tmpreadfds)) {
2549 /* Set ready flag on object */
2551 // printf("Setting fd %d\n",fd);
2552 if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
2553 if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
2554 enqueueObject(objptr, NULL, 0);
2563 /* See if there are any active tasks */
2564 if (hashsize(activetasks)>0) {
// take the first queued descriptor and remove it from activetasks
2566 currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
2567 genfreekey(activetasks, currtpd);
2569 /* Check if this task has failed, allow a task that contains optional objects to fire */
2570 /*if (gencontains(failedtasks, currtpd)) {
2571 // Free up task parameter descriptor
2572 RUNFREE(currtpd->parameterArray);
2576 numparams=currtpd->task->numParameters;
2577 numtotal=currtpd->task->numTotal;
2579 #ifdef THREADSIMULATE
2580 int isolateflags[numparams];
2582 /* Make sure that the parameters are still in the queues */
2583 for(i=0;i<numparams;i++) {
2584 void * parameter=currtpd->parameterArray[i];
2586 raw_test_pass(0xe993);
2587 // require locks for this parameter
2588 getwritelock(parameter);
// keep processing incoming messages until receiveObject() reports -1
2597 while(receiveObject() != -1) {
2601 grount = lockresult;
2611 raw_test_pass(0xe994);
2612 // can not get the lock, try later
2613 for(j = 0; j < i; ++j) {
2614 releasewritelock(taskpointerarray[j+OFFSET]);
2616 genputtable(activetasks, currtpd, currtpd);
2622 for(tmp = 0; tmp < classsize[((struct ___Object___ *)parameter)->type]; ++tmp) {
2623 invalidateAddr(parameter + tmp);
2627 tmpparam = (struct ___Object___ *)parameter;
2628 #ifdef THREADSIMULATE
2629 if(0 == tmpparam->isolate) {
2630 isolateflags[i] = 0;
2631 // shared object, need to flush with current value
2632 //if(!getreadlock(tmpparam->original)) {
2633 // // fail to get read lock of the original object, try this task later
2634 if(!getwritelock(tmpparam->original)) {
2635 // fail to get write lock, release all obtained locks and try this task later
2637 for(j = 0; j < i; ++j) {
2638 if(0 == isolateflags[j]) {
2639 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2642 genputtable(activetasks, currtpd, currtpd);
2645 if(tmpparam->version != tmpparam->original->version) {
2646 // some task on another core has changed this object
2647 // flush this object
2648 //memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
2649 // release all obtained locks
2651 for(j = 0; j < i; ++j) {
2652 if(0 == isolateflags[j]) {
2653 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2656 releasewritelock(tmpparam->original);
2658 // dequeue this object
2659 int numofcore = pthread_getspecific(key);
2660 struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
2661 int length = numqueues[numofcore][tmpparam->type];
2662 for(j = 0; j < length; ++j) {
2663 struct parameterwrapper * pw = queues[j];
2664 if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
2666 int UNUSED, UNUSED2;
2668 ObjectHashget(pw->objectset, (int) tmpparam, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2669 ObjectHashremove(pw->objectset, (int)tmpparam);
2670 if (enterflags!=NULL)
2674 // try to enqueue it again to check if it feeds other tasks;
2675 //enqueueObject(tmpparam, NULL, 0);
2676 // Free up task parameter descriptor
2677 RUNFREE(currtpd->parameterArray);
2682 isolateflags[i] = 1;
2685 pd=currtpd->task->descriptorarray[i];
2686 pw=(struct parameterwrapper *) pd->queue;
2687 /* Check that object is still in queue */
2689 if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
2691 raw_test_pass(0xe995);
2693 RUNFREE(currtpd->parameterArray);
2699 /* Check if the object's flags still meets requirements */
// pw->intarray holds (andmask, checkmask) pairs, one pair per term
2703 for(tmpi = 0; tmpi < pw->numberofterms; ++tmpi) {
2704 andmask=pw->intarray[tmpi*2];
2705 checkmask=pw->intarray[tmpi*2+1];
2706 raw_test_pass(0xdd000000 + andmask);
2707 raw_test_pass_reg((int)parameter);
2708 raw_test_pass(0xdd000000 + ((struct ___Object___ *)parameter)->flag);
2709 raw_test_pass(0xdd000000 + checkmask);
2710 if((((struct ___Object___ *)parameter)->flag&andmask)==checkmask) {
2716 // flags are never suitable
2717 // remove this obj from the queue
2719 int UNUSED, UNUSED2;
2721 raw_test_pass(0xe996);
2722 ObjectHashget(pw->objectset, (int) parameter, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2723 ObjectHashremove(pw->objectset, (int)parameter);
2724 if (enterflags!=NULL)
2726 // release grabbed locks
2727 for(j = 0; j < i; ++j) {
2728 releasewritelock(taskpointerarray[j+OFFSET]);
2730 releasewritelock(parameter);
2731 RUNFREE(currtpd->parameterArray);
2739 /* Check that object still has necessary tags */
2740 for(j=0;j<pd->numbertags;j++) {
2741 int slotid=pd->tagarray[2*j]+numparams;
2742 struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
2743 if (!containstag(parameter, tagd)) {
2745 raw_test_pass(0xe997);
2747 RUNFREE(currtpd->parameterArray);
2753 taskpointerarray[i+OFFSET]=parameter;
// copy the remaining (non-object) entries up to numtotal into the argument vector
2756 for(;i<numtotal;i++) {
2757 taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
2760 #ifdef THREADSIMULATE
2761 for(i = 0; i < numparams; ++i) {
2762 if(0 == isolateflags[i]) {
2763 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2764 if(tmpparam != tmpparam->original) {
2765 taskpointerarray[i+OFFSET] = tmpparam->original;
2774 /* Checkpoint the state */
2775 forward=allocateRuntimeHash(100);
2776 reverse=allocateRuntimeHash(100);
2777 //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
// a non-zero setjmp result means myhandler (or another longjmp) returned control here
2780 if (x=setjmp(error_handler)) {
2785 printf("Fatal Error=%d, Recovering!\n",x);
2789 genputtable(failedtasks,currtpd,currtpd);
2790 //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
2792 freeRuntimeHash(forward);
2793 freeRuntimeHash(reverse);
2800 raw_test_pass_reg(x);
2801 raw_test_done(0xa009);
2806 /*if (injectfailures) {
2807 if ((((double)random())/RAND_MAX)<failurechance) {
2808 printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
2809 longjmp(error_handler,10);
2812 /* Actually call task */
// slot 0 of the argument vector carries the parameter count, slot 1 is cleared
2814 ((int *)taskpointerarray)[0]=currtpd->numParameters;
2815 taskpointerarray[1]=NULL;
2819 printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2821 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
2823 printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2826 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
2829 for(i = 0; i < numparams; ++i) {
2831 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2832 raw_test_pass(0xe998);
2833 raw_test_pass(0xdd100000 + tmpparam->flag);
2834 releasewritelock(tmpparam);
2836 #elif defined THREADSIMULATE
2837 for(i = 0; i < numparams; ++i) {
2838 if(0 == isolateflags[i]) {
2839 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2840 releasewritelock(tmpparam);
2847 freeRuntimeHash(forward);
2848 freeRuntimeHash(reverse);
2852 // Free up task parameter descriptor
2853 RUNFREE(currtpd->parameterArray);
2867 /* This function processes an object's tags */
/*
 * For every (slot id, tag id) pair in pd->tagarray whose status entry
 * statusarray[slotid+numparams] is still 0, the iterator entry selected by
 * *iteratorcount is filled in as a tag iterator and the status entry is
 * set to 1.
 *
 *   pd            - descriptor supplying numbertags and tagarray
 *   index         - stored in the tagobjectslot field of each entry written
 *   parameter     - wrapper whose iterators[] array is written
 *   iteratorcount - pointer to the index of the iterators[] entry to fill
 *   statusarray   - per-slot flags; tag slots are addressed at slotid+numparams
 *   numparams     - offset added to every tag slot id
 *
 * NOTE(review): the declaration of i and the statement that advances
 * *iteratorcount are not visible in this excerpt - confirm in the full file.
 */
2868 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
2871 for(i=0;i<pd->numbertags;i++) {
/* tagarray is read in pairs: even entry is the slot id, odd entry the tag id */
2872 int slotid=pd->tagarray[2*i];
2873 int tagid=pd->tagarray[2*i+1];
/* only tag slots whose status is still 0 get an iterator entry here */
2875 if (statusarray[slotid+numparams]==0) {
2876 parameter->iterators[*iteratorcount].istag=1;
2877 parameter->iterators[*iteratorcount].tagid=tagid;
2878 parameter->iterators[*iteratorcount].slot=slotid+numparams;
2879 parameter->iterators[*iteratorcount].tagobjectslot=index;
/* mark the tag slot so later passes see it as already handled */
2880 statusarray[slotid+numparams]=1;
/*
 * processobject - fill the iterator entry selected by *iteratorcount as an
 * object iterator (istag=0) for parameter slot `index`.
 *
 *   parameter     - wrapper whose iterators[] array is written
 *   index         - slot stored in the entry; statusarray[index] is set to 1
 *   pd            - descriptor of the parameter; its queue supplies the
 *                   objectset and its tagarray the (slot id, tag id) pairs
 *   iteratorcount - pointer to the index of the iterators[] entry to fill
 *   statusarray   - per-slot flags; tag slots are addressed at slotid+numparams
 *   numparams     - offset added to every tag slot id
 *
 * NOTE(review): the declaration and update of tagcount, and whatever advances
 * *iteratorcount, are not visible in this excerpt - confirm in the full file.
 */
2887 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
/* the object set comes from the wrapper stored in pd->queue, not from `parameter` */
2890 struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
2892 parameter->iterators[*iteratorcount].istag=0;
2893 parameter->iterators[*iteratorcount].slot=index;
2894 parameter->iterators[*iteratorcount].objectset=objectset;
2895 statusarray[index]=1;
2897 for(i=0;i<pd->numbertags;i++) {
2898 int slotid=pd->tagarray[2*i];
/* NOTE(review): tagid is not used by any line visible in this excerpt */
2899 int tagid=pd->tagarray[2*i+1];
2900 if (statusarray[slotid+numparams]!=0) {
2901 /* This tag has already been enqueued, use it to narrow search */
2902 parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
/* record how many tag bindings were collected for this entry */
2906 parameter->iterators[*iteratorcount].numtags=tagcount;
2911 /* This function builds the iterators for a task & parameter */
/*
 *   task      - task descriptor; numParameters and descriptorarray are read
 *   index     - slot of the initial parameter (its status is preset to 1)
 *   parameter - wrapper handed to processtags/processobject to be filled in
 *
 * The visible code makes three passes over the parameter slots whose status
 * is still 0: first those with a tag whose slot is already marked, then those
 * with at least one tag, then all the rest.
 *
 * NOTE(review): the control flow that links the passes (any enclosing loop,
 * jump or break) is not visible in this excerpt - confirm in the full file.
 * NOTE(review): statusarray has MAXTASKPARAMS entries but is also indexed at
 * slotid+numparams by the helpers and by this function; confirm that
 * MAXTASKPARAMS covers parameter slots plus tag slots.
 */
2913 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
2914 int statusarray[MAXTASKPARAMS];
2916 int numparams=task->numParameters;
2917 int iteratorcount=0;
/* start with every slot unmarked */
2918 for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
2920 statusarray[index]=1; /* Initial parameter */
2921 /* Process tags for initial iterator */
2923 processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
2927 /* Check for objects with existing tags */
2928 for(i=0;i<numparams;i++) {
2929 if (statusarray[i]==0) {
2930 struct parameterdescriptor *pd=task->descriptorarray[i];
2932 for(j=0;j<pd->numbertags;j++) {
2933 int slotid=pd->tagarray[2*j];
/* a marked tag slot means this parameter can be reached through that tag */
2934 if(statusarray[slotid+numparams]!=0) {
2935 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2936 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2943 /* Next do objects w/ unbound tags*/
2945 for(i=0;i<numparams;i++) {
2946 if (statusarray[i]==0) {
2947 struct parameterdescriptor *pd=task->descriptorarray[i];
2948 if (pd->numbertags>0) {
2949 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2950 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2956 /* Nothing with a tag enqueued */
2958 for(i=0;i<numparams;i++) {
2959 if (statusarray[i]==0) {
2960 struct parameterdescriptor *pd=task->descriptorarray[i];
2961 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2962 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
/*
 * Debug dump: for each task of the current core, prints the task name and,
 * for every parameter, the objects held in that parameter's object set
 * together with their flag and tags.
 *
 * NOTE(review): the header of the enclosing function and the declarations of
 * i, j and tagindex are not visible in this excerpt - confirm in the full file.
 */
2975 #ifdef THREADSIMULATE
/*
 * NOTE(review): pthread_getspecific() returns void *, which is assigned to an
 * int here without a cast - confirm how the value was stored under `key`.
 */
2976 int numofcore = pthread_getspecific(key);
2977 for(i=0;i<numtasks[numofcore];i++) {
2978 struct taskdescriptor * task=taskarray[numofcore][i];
/* body of this core-number check is not visible in this excerpt */
2981 if(corenum > NUMCORES - 1) {
2985 for(i=0;i<numtasks[corenum];i++) {
2986 struct taskdescriptor * task=taskarray[corenum][i];
2989 printf("%s\n", task->name);
2991 for(j=0;j<task->numParameters;j++) {
2992 struct parameterdescriptor *param=task->descriptorarray[j];
2993 struct parameterwrapper *parameter=param->queue;
2994 struct ObjectHash * set=parameter->objectset;
2995 struct ObjectIterator objit;
2997 printf(" Parameter %d\n", j);
/* walk every entry of this parameter's object set */
2999 ObjectHashiterator(set, &objit);
3000 while(ObjhasNext(&objit)) {
3001 struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
3002 struct ___Object___ * tagptr=obj->___tags___;
/* extra data words stored with the entry; not used by any visible line */
3003 int nonfailed=Objdata4(&objit);
3004 int numflags=Objdata3(&objit);
3005 int flags=Objdata2(&objit);
/* NOTE(review): %lx is given a pointer argument here and below; %p with a void * cast would match the argument type */
3008 printf(" Contains %lx\n", obj);
3009 printf(" flag=%d\n", obj->flag);
/* a tags field of type TAGTYPE is printed as one tag */
3012 } else if (tagptr->type==TAGTYPE) {
3014 printf(" tag=%lx\n",tagptr);
/* otherwise the tags field is treated as an array; ___cachedCode___ bounds the loop */
3019 struct ArrayObject *ao=(struct ArrayObject *)tagptr;
3020 for(;tagindex<ao->___cachedCode___;tagindex++) {
3022 printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
3032 /* This function processes the task information to create queues for
3033 each parameter type. */
/*
 * For every task of the current core, each parameter's wrapper (param->queue)
 * gets a fresh object set and a back pointer to its task; afterwards
 * builditerators() is called once per parameter.
 *
 * NOTE(review): the declarations of i and j and the preprocessor lines that
 * choose between the two task loops are not visible in this excerpt.
 */
3035 void processtasks() {
/* body of this core-number check is not visible in this excerpt */
3038 if(corenum > NUMCORES - 1) {
3042 #ifdef THREADSIMULATE
/*
 * NOTE(review): pthread_getspecific() returns void *, which is assigned to an
 * int here without a cast - confirm how the value was stored under `key`.
 */
3043 int numofcore = pthread_getspecific(key);
3044 for(i=0;i<numtasks[numofcore];i++) {
3045 struct taskdescriptor *task=taskarray[numofcore][i];
3047 for(i=0;i<numtasks[corenum];i++) {
3048 struct taskdescriptor * task=taskarray[corenum][i];
3052 /* Build objectsets */
3053 for(j=0;j<task->numParameters;j++) {
3054 struct parameterdescriptor *param=task->descriptorarray[j];
3055 struct parameterwrapper *parameter=param->queue;
/* 10 is the argument passed to allocateObjectHash; presumably an initial size - confirm */
3056 parameter->objectset=allocateObjectHash(10);
3057 parameter->task=task;
3060 /* Build iterators for parameters */
/* runs as a second loop, after the object-set loop above has covered every parameter */
3061 for(j=0;j<task->numParameters;j++) {
3062 struct parameterdescriptor *param=task->descriptorarray[j];
3063 struct parameterwrapper *parameter=param->queue;
3064 builditerators(task, j, parameter);
/*
 * toiReset - reset a tag/object iterator.
 *
 * Of the visible code, one branch is taken when it->numtags>0 and another
 * statement re-initialises the embedded object-hash iterator it->it over
 * it->objectset.
 *
 * NOTE(review): the first branch condition and the bodies of the first two
 * branches are not visible in this excerpt - confirm in the full file.
 */
3069 void toiReset(struct tagobjectiterator * it) {
3072 } else if (it->numtags>0) {
/* restart iteration over the iterator's object set */
3075 ObjectHashiterator(it->objectset, &it->it);
/*
 * toiHasNext - test whether the iterator `it` can produce another element.
 *
 *   it          - iterator; tagobjindex is updated while scanning arrays
 *   objectarray - slot array; entries are read at it->tagobjectslot and at
 *                 the it->tagbindings[] indices
 *   failed      - optional argument supplied through OPTARG; not used by any
 *                 line visible in this excerpt
 *
 * Three cases are visible: a tag lookup on the object in slot
 * it->tagobjectslot, an object lookup through the tag bound in
 * it->tagbindings[0] (taken when it->numtags>0), and a plain
 * ObjhasNext() query on it->it.
 *
 * NOTE(review): the first branch condition and the return statements of the
 * first two cases are not visible in this excerpt - confirm in the full file.
 * NOTE(review): pointers are cast to int for the ObjectHashcontainskey() key;
 * this loses bits where pointers are wider than int.
 */
3079 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
3082 /* Get object with tags */
3083 struct ___Object___ *obj=objectarray[it->tagobjectslot];
/* NOTE(review): tagptr is dereferenced below with no NULL check visible here */
3084 struct ___Object___ *tagptr=obj->___tags___;
3085 if (tagptr->type==TAGTYPE) {
3086 if ((it->tagobjindex==0)&& /* First object */
3087 (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
/* otherwise the tags field is an array: scan on from the saved index */
3092 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3093 int tagindex=it->tagobjindex;
3094 for(;tagindex<ao->___cachedCode___;tagindex++) {
3095 struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
3096 if (td->flag==it->tagid) {
3097 it->tagobjindex=tagindex; /* Found right type of tag */
3103 } else if (it->numtags>0) {
3104 /* Use tags to locate appropriate objects */
3105 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3106 struct ___Object___ *objptr=tag->flagptr;
/* flagptr is either one object or an array of objects */
3108 if (objptr->type!=OBJECTARRAYTYPE) {
3109 if (it->tagobjindex>0)
3111 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
/* the candidate is also checked against the other bound tags */
3113 for(i=1;i<it->numtags;i++) {
3114 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3115 if (!containstag(objptr,tag2))
3120 struct ArrayObject *ao=(struct ArrayObject *) objptr;
/* scan the array from the saved index, applying the same two checks per element */
3123 for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
3124 struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
3125 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
3127 for(i=1;i<it->numtags;i++) {
3128 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3129 if (!containstag(objptr,tag2))
3132 it->tagobjindex=tagindex;
3137 it->tagobjindex=tagindex;
/* plain object iteration: defer to the embedded hash iterator */
3141 return ObjhasNext(&it->it);
/*
 * containstag - check whether object `ptr` is among the objects referenced
 * by `tag` through tag->flagptr.
 *
 * When flagptr has type OBJECTARRAYTYPE it is scanned as an ArrayObject,
 * comparing each of the first ___cachedCode___ elements with ptr.
 *
 * NOTE(review): the return statements and the branch for a non-array flagptr
 * are not visible in this excerpt - confirm the return values in the full file.
 */
3145 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
3147 struct ___Object___ * objptr=tag->flagptr;
3148 if (objptr->type==OBJECTARRAYTYPE) {
3149 struct ArrayObject *ao=(struct ArrayObject *)objptr;
/* compare by pointer identity against each array element */
3150 for(j=0;j<ao->___cachedCode___;j++) {
3151 if (ptr==ARRAYGET(ao, struct ___Object___*, j))
3159 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
3160 /* hasNext has all of the intelligence */
3163 /* Get object with tags */
3164 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3165 struct ___Object___ *tagptr=obj->___tags___;
3166 if (tagptr->type==TAGTYPE) {
3168 objectarray[it->slot]=tagptr;
3170 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3171 objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
3173 } else if (it->numtags>0) {
3174 /* Use tags to locate appropriate objects */
3175 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3176 struct ___Object___ *objptr=tag->flagptr;
3177 if (objptr->type!=OBJECTARRAYTYPE) {
3179 objectarray[it->slot]=objptr;
3181 struct ArrayObject *ao=(struct ArrayObject *) objptr;
3182 objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
3185 /* Iterate object */
3186 objectarray[it->slot]=(void *)Objkey(&it->it);