4 #include "structdefs.h"
6 #include "checkpoint.h"
8 #include "SimpleHash.h"
9 #include "GenericHashtable.h"
10 #include <sys/select.h>
11 #include <sys/types.h>
20 #include <raw_compiler_defs.h>
21 //#include <libints.h>
22 #elif defined THREADSIMULATE
23 // use POSIX message queue
24 // for each core, its message queue named as
30 extern int injectfailures;
31 extern float failurechance;
37 #define TOTALCORE raw_get_num_tiles()
41 #include "instrument.h"
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
48 struct RuntimeHash * forward;
49 struct RuntimeHash * reverse;
52 int corestatus[NUMCORES]; // records status of each core
55 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
56 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
58 struct RuntimeHash locktable;
59 static struct RuntimeHash* locktbl = &locktable;
60 void * curr_heapbase=0;
61 void * curr_heaptop=0;
63 int self_numreceiveobjs;
70 struct Queue objqueue;
75 void calCoords(int core_num, int* coordY, int* coordX);
77 #elif defined THREADSIMULATE
78 static struct RuntimeHash* locktbl;
86 struct thread_data thread_data_array[NUMCORES];
88 static pthread_key_t key;
89 static pthread_rwlock_t rwlock_tbl;
90 static pthread_rwlock_t rwlock_init;
95 bool transStallMsg(int targetcore);
96 void transTerminateMsg(int targetcore);
98 bool getreadlock(void* ptr);
99 void releasereadlock(void* ptr);
101 bool getreadlock_I(void* ptr);
102 void releasereadlock_I(void* ptr);
104 bool getwritelock(void* ptr);
105 void releasewritelock(void* ptr);
109 void flushAll(void) {
113 raw_user_interrupts_off();
115 raw_test_pass(0xec00);
116 for(i = 0; i < 512; ++i) {
119 flushCacheline(base);
120 flushCacheline(base|off1);
123 raw_user_interrupts_on();
125 raw_test_pass(0xec02);
131 //raw_test_pass(0xefee);
132 //raw_user_interrupts_off();
133 //raw_test_pass(0xef00);
135 //raw_test_pass(0xefff);
136 //raw_user_interrupts_on();
137 //raw_test_pass(0xefef);
142 int main(int argc, char **argv) {
148 bool sendStall = false;
150 bool tocontinue = false;
151 struct QueueItem * objitem = NULL;
152 struct transObjInfo * objInfo = NULL;
154 bool allStall = true;
157 raw_test_pass_reg(&locktable);
158 raw_test_pass_reg(&msglength);
159 raw_test_pass_reg(&i);
160 raw_test_pass(0xee01);
161 corenum = raw_get_abs_pos_x() + 4 * raw_get_abs_pos_y();
163 // initialize the arrays
164 if(STARTUPCORE == corenum) {
165 // startup core to initialize corestatus[]
166 for(i = 0; i < NUMCORES; ++i) {
168 numsendobjs[i] = 0; // assume all variables in RAW are local variables! MAY BE WRONG!!!
169 numreceiveobjs[i] = 0;
172 self_numsendobjs = 0;
173 self_numreceiveobjs = 0;
174 for(i = 0; i < 30; ++i) {
181 raw_test_pass(0xee02);
183 // create the lock table, lockresult table and obj queue
185 locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20);
186 /* Set allocation blocks*/
187 locktable.listhead=NULL;
188 locktable.listtail=NULL;
190 locktable.numelements = 0;
197 objqueue.head = NULL;
198 objqueue.tail = NULL;
199 raw_test_pass(0xee03);
202 if (corenum < NUMCORES) {
205 //setup_interrupts();
206 //start_gdn_avail_ints(recvMsg);
207 raw_user_interrupts_on();
208 raw_test_pass(0xee04);
212 #elif defined THREADSIMULATE
216 pthread_t threads[NUMCORES];
219 // initialize three arrays and msg queue array
220 char * pathhead = "/msgqueue_";
221 int targetlen = strlen(pathhead);
222 for(i = 0; i < NUMCORES; ++i) {
225 numreceiveobjs[i] = 0;
230 corenumstr[0] = i + '0';
231 corenumstr[1] = '\0';
234 corenumstr[1] = i %10 + '0';
235 corenumstr[0] = (i / 10) + '0';
236 corenumstr[2] = '\0';
239 printf("Error: i >= 100\n");
243 char path[targetlen + sourcelen + 1];
244 strcpy(path, pathhead);
245 strncat(path, corenumstr, sourcelen);
246 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
247 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
249 mqd[i]= mq_open(path, oflags, omodes, NULL);
251 printf("[Main] mq_open %s fails: %d, error: %s\n", path, mqd[i], strerror(errno));
254 printf("[Main] mq_open %s returns: %d\n", path, mqd[i]);
259 pthread_key_create(&key, NULL);
261 // create the lock table and initialize its mutex
262 locktbl = allocateRuntimeHash(20);
263 int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
264 printf("[Main] initialize the rwlock for lock table: %d error: \n", rc_locktbl, strerror(rc_locktbl));
266 for(i = 0; i < NUMCORES; ++i) {
267 thread_data_array[i].corenum = i;
268 thread_data_array[i].argc = argc;
269 thread_data_array[i].argv = argv;
270 thread_data_array[i].numsendobjs = 0;
271 thread_data_array[i].numreceiveobjs = 0;
272 printf("[main] creating thread %d\n", i);
273 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
275 printf("[main] ERROR; return code from pthread_create() is %d\n", rc[i]);
281 //pthread_exit(NULL);
285 void run(void* arg) {
286 struct thread_data * my_tdata = (struct thread_data *)arg;
287 pthread_setspecific(key, (void *)my_tdata->corenum);
288 int argc = my_tdata->argc;
289 char** argv = my_tdata->argv;
290 printf("[run, %d] Thread %d runs: %x\n", my_tdata->corenum, my_tdata->corenum, (int)pthread_self());
296 GC_init(); // Initialize the garbage collector
304 initializeexithandler();
306 raw_test_pass(0xee05);
308 /* Create table for failed tasks */
310 if(corenum > NUMCORES - 1) {
317 raw_test_pass(0xee06);
319 /*failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
320 (int (*)(void *,void *)) &comparetpd);*/
323 raw_test_pass(0xee07);
325 /* Create queue of active tasks */
326 activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
327 (int (*)(void *,void *)) &comparetpd);
329 raw_test_pass(0xee08);
332 /* Process task information */
335 raw_test_pass(0xee09);
338 /* Create startup object */
339 createstartupobject(argc, argv);
341 raw_test_pass(0xee0a);
345 raw_test_pass(0xee0b);
349 while(receiveObject() != -1) {
353 // check if there are new active tasks can be executed
357 while(receiveObject() != -1) {
361 raw_test_pass(0xee0c);
363 // check if there are some pending objects, if yes, enqueue them and executetasks again
365 raw_test_pass(0xee0d);
366 while(!isEmpty(&objqueue)) {
369 raw_user_interrupts_off();
371 raw_test_pass(0xeee1);
374 objitem = getTail(&objqueue);
375 //obj = objitem->objectptr;
376 objInfo = (struct transObjInfo *)objitem->objectptr;
377 obj = objInfo->objptr;
378 raw_test_pass_reg((int)obj);
379 // grab lock and flush the obj
385 raw_test_pass_reg(grount);
397 for(k = 0; k < classsize[((struct ___Object___ *)obj)->type]; ++k) {
398 invalidateAddr(obj + k);
400 // enqueue the object
401 for(k = 0; k < objInfo->length; ++k) {
402 int taskindex = objInfo->queues[2 * k];
403 int paramindex = objInfo->queues[2 * k + 1];
404 struct parameterwrapper ** queues = &(paramqueues[corenum][taskindex][paramindex]);
405 raw_test_pass_reg(taskindex);
406 raw_test_pass_reg(paramindex);
407 enqueueObject_I(obj, queues, 1);
409 removeItem(&objqueue, objitem);
410 releasereadlock_I(obj);
411 RUNFREE(objInfo->queues);
413 /*enqueueObject_I(obj, NULL, 0);
414 removeItem(&objqueue, objitem);
415 releasereadlock_I(obj);*/
418 // put it at the end of the queue
419 // and try to execute active tasks already enqueued first
420 removeItem(&objqueue, objitem);
421 addNewItem_I(&objqueue, objInfo);
425 raw_user_interrupts_on();
427 raw_test_pass(0xee0e);
429 raw_test_pass(0xee0f);
433 if(STARTUPCORE == corenum) {
435 raw_test_pass(0xee10);
439 raw_user_interrupts_off();
441 corestatus[corenum] = 0;
442 numsendobjs[corenum] = self_numsendobjs;
443 numreceiveobjs[corenum] = self_numreceiveobjs;
444 // check the status of all cores
446 raw_test_pass_reg(NUMCORES);
447 for(i = 0; i < NUMCORES; ++i) {
448 raw_test_pass(0xe000 + corestatus[i]);
449 if(corestatus[i] != 0) {
455 // check if the sum of send objs and receive obj are the same
457 // no->go on executing
459 for(i = 0; i < NUMCORES; ++i) {
460 sumsendobj += numsendobjs[i];
461 raw_test_pass(0xf000 + numsendobjs[i]);
463 for(i = 0; i < NUMCORES; ++i) {
464 sumsendobj -= numreceiveobjs[i];
465 raw_test_pass(0xf000 + numreceiveobjs[i]);
467 if(0 == sumsendobj) {
469 raw_test_pass(0xee11);
470 raw_test_done(1); // All done.
474 raw_user_interrupts_on();
478 raw_test_pass(0xee12);
480 // wait for some time
482 raw_test_pass(0xee13);
485 raw_test_pass(0xee14);
487 // send StallMsg to startup core
488 raw_test_pass(0xee15);
489 sendStall = transStallMsg(STARTUPCORE);
494 raw_test_pass(0xee16);
500 #elif defined THREADSIMULATE
501 /* Start executing the tasks */
505 // check if there are new objects coming
506 bool sendStall = false;
508 int numofcore = pthread_getspecific(key);
510 switch(receiveObject()) {
512 printf("[run, %d] receive an object\n", numofcore);
514 // received an object
515 // check if there are new active tasks can be executed
520 //printf("[run, %d] no msg\n", numofcore);
522 if(STARTUPCORE == numofcore) {
523 corestatus[numofcore] = 0;
524 // check the status of all cores
525 bool allStall = true;
526 for(i = 0; i < NUMCORES; ++i) {
527 if(corestatus[i] != 0) {
533 // check if the sum of send objs and receive obj are the same
535 // no->go on executing
537 for(i = 0; i < NUMCORES; ++i) {
538 sumsendobj += numsendobjs[i];
540 for(i = 0; i < NUMCORES; ++i) {
541 sumsendobj -= numreceiveobjs[i];
543 if(0 == sumsendobj) {
547 int rc_tbl = pthread_rwlock_wrlock(&rwlock_tbl);
548 printf("[run, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc_tbl, strerror(rc_tbl));
549 struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl);
550 while(0 != RunhasNext(it_lock)) {
551 int key = Runkey(it_lock);
552 pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
553 int rc_des = pthread_rwlock_destroy(rwlock_obj);
554 printf("[run, %d] destroy the rwlock for object: %d error: \n", numofcore, key, strerror(rc_des));
557 freeRuntimeHash(locktbl);
561 // destroy all message queues
562 char * pathhead = "/msgqueue_";
563 int targetlen = strlen(pathhead);
564 for(i = 0; i < NUMCORES; ++i) {
568 corenumstr[0] = i + '0';
569 corenumstr[1] = '\0';
572 corenumstr[1] = i %10 + '0';
573 corenumstr[0] = (i / 10) + '0';
574 corenumstr[2] = '\0';
577 printf("Error: i >= 100\n");
581 char path[targetlen + sourcelen + 1];
582 strcpy(path, pathhead);
583 strncat(path, corenumstr, sourcelen);
587 printf("[run, %d] terminate!\n", numofcore);
594 // send StallMsg to startup core
595 sendStall = transStallMsg(STARTUPCORE);
601 printf("[run, %d] receive a stall msg\n", numofcore);
602 // receive a Stall Msg, do nothing
603 assert(STARTUPCORE == numofcore); // only startup core can receive such msg
608 printf("[run, %d] receive a terminate msg\n", numofcore);
609 // receive a terminate Msg
610 assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
611 mq_close(mqd[corenum]);
617 printf("[run, %d] Error: invalid message type.\n", numofcore);
627 void createstartupobject(int argc, char ** argv) {
630 /* Allocate startup object */
632 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
633 struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1);
635 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
636 struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1);
638 /* Build array of strings */
639 startupobject->___parameters___=stringarray;
640 for(i=1;i<argc;i++) {
641 int length=strlen(argv[i]);
643 struct ___String___ *newstring=NewString(NULL, argv[i],length);
645 struct ___String___ *newstring=NewString(argv[i],length);
647 ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
650 startupobject->isolate = 1;
651 startupobject->version = 0;
653 /* Set initialized flag for startup object */
654 flagorandinit(startupobject,1,0xFFFFFFFF);
655 enqueueObject(startupobject, NULL, 0);
661 int hashCodetpd(struct taskparamdescriptor *ftd) {
662 int hash=(int)ftd->task;
664 for(i=0;i<ftd->numParameters;i++){
665 hash^=(int)ftd->parameterArray[i];
670 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
672 if (ftd1->task!=ftd2->task)
674 for(i=0;i<ftd1->numParameters;i++)
675 if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
680 /* This function sets a tag. */
682 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
684 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
686 struct ArrayObject * ao=NULL;
687 struct ___Object___ * tagptr=obj->___tags___;
689 raw_test_pass(0xebb0);
693 raw_test_pass(0xebb1);
695 obj->___tags___=(struct ___Object___ *)tagd;
697 /* Have to check if it is already set */
698 if (tagptr->type==TAGTYPE) {
699 struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
701 raw_test_pass(0xebb2);
705 raw_test_pass(0xebb3);
710 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
711 struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
712 obj=(struct ___Object___ *)ptrarray[2];
713 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
714 td=(struct ___TagDescriptor___ *) obj->___tags___;
717 raw_test_pass(0xebb4);
719 ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
722 raw_test_pass(0xebb5);
724 ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
725 ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
726 obj->___tags___=(struct ___Object___ *) ao;
727 ao->___cachedCode___=2;
729 raw_test_pass(0xebb6);
734 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
736 raw_test_pass(0xebb7);
738 for(i=0;i<ao->___cachedCode___;i++) {
739 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
741 raw_test_pass(0xebb8);
745 raw_test_pass(0xebb9);
750 if (ao->___cachedCode___<ao->___length___) {
752 raw_test_pass(0xebba);
754 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
755 ao->___cachedCode___++;
757 raw_test_pass(0xebbb);
761 int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
762 struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
763 obj=(struct ___Object___ *)ptrarray[2];
764 tagd=(struct ___TagDescriptor___ *) ptrarray[3];
765 ao=(struct ArrayObject *)obj->___tags___;
767 struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
770 raw_test_pass(0xebbc);
772 aonew->___cachedCode___=ao->___length___+1;
773 for(i=0;i<ao->___length___;i++) {
775 raw_test_pass(0xebbd);
777 ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
780 raw_test_pass(0xebbe);
782 ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
784 raw_test_pass(0xebbf);
791 struct ___Object___ * tagset=tagd->flagptr;
793 raw_test_pass(0xb008);
797 raw_test_pass(0xb009);
800 } else if (tagset->type!=OBJECTARRAYTYPE) {
802 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
803 struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
804 obj=(struct ___Object___ *)ptrarray[2];
805 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
807 struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
809 ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
810 ARRAYSET(ao, struct ___Object___ *, 1, obj);
811 ao->___cachedCode___=2;
812 tagd->flagptr=(struct ___Object___ *)ao;
814 raw_test_pass(0xb00a);
817 struct ArrayObject *ao=(struct ArrayObject *) tagset;
818 if (ao->___cachedCode___<ao->___length___) {
820 raw_test_pass(0xb00b);
822 ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
826 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
827 struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
828 obj=(struct ___Object___ *)ptrarray[2];
829 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
830 ao=(struct ArrayObject *)tagd->flagptr;
832 struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
834 aonew->___cachedCode___=ao->___cachedCode___+1;
835 for(i=0;i<ao->___length___;i++) {
836 ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
838 ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
839 tagd->flagptr=(struct ___Object___ *) aonew;
841 raw_test_pass(0xb00c);
848 /* This function clears a tag. */
850 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
852 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
854 /* We'll assume that tag is alway there.
855 Need to statically check for this of course. */
856 struct ___Object___ * tagptr=obj->___tags___;
858 if (tagptr->type==TAGTYPE) {
859 if ((struct ___TagDescriptor___ *)tagptr==tagd)
860 obj->___tags___=NULL;
863 printf("ERROR 1 in tagclear\n");
867 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
869 for(i=0;i<ao->___cachedCode___;i++) {
870 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
872 ao->___cachedCode___--;
873 if (i<ao->___cachedCode___)
874 ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
875 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
876 if (ao->___cachedCode___==0)
877 obj->___tags___=NULL;
882 printf("ERROR 2 in tagclear\n");
888 struct ___Object___ *tagset=tagd->flagptr;
889 if (tagset->type!=OBJECTARRAYTYPE) {
894 printf("ERROR 3 in tagclear\n");
898 struct ArrayObject *ao=(struct ArrayObject *) tagset;
900 for(i=0;i<ao->___cachedCode___;i++) {
901 struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
903 ao->___cachedCode___--;
904 if (i<ao->___cachedCode___)
905 ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
906 ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
907 if (ao->___cachedCode___==0)
913 printf("ERROR 4 in tagclear\n");
921 /* This function allocates a new tag. */
923 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
924 struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
926 struct ___TagDescriptor___ * allocate_tag(int index) {
927 struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
936 /* This function updates the flag for object ptr. It or's the flag
937 with the or mask and and's it with the andmask. */
939 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
941 int flagcomp(const int *val1, const int *val2) {
942 return (*val1)-(*val2);
945 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
947 int oldflag=((int *)ptr)[1];
948 int flag=ormask|oldflag;
951 raw_test_pass(0xaa000000 + oldflag);
952 raw_test_pass(0xaa000000 + flag);
954 flagbody(ptr, flag, queues, length, false);
958 bool intflagorand(void * ptr, int ormask, int andmask) {
960 int oldflag=((int *)ptr)[1];
961 int flag=ormask|oldflag;
963 if (flag==oldflag) /* Don't do anything */
966 flagbody(ptr, flag, NULL, 0, false);
972 void flagorandinit(void * ptr, int ormask, int andmask) {
973 int oldflag=((int *)ptr)[1];
974 int flag=ormask|oldflag;
977 raw_test_pass(0xaa100000 + oldflag);
978 raw_test_pass(0xaa100000 + flag);
980 flagbody(ptr,flag,NULL,0,true);
983 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
984 struct parameterwrapper * flagptr = NULL;
986 struct parameterwrapper ** queues = vqueues;
987 int length = vlength;
990 int * enterflags = NULL;
991 if((!isnew) && (queues == NULL)) {
992 #ifdef THREADSIMULATE
993 int numofcore = pthread_getspecific(key);
994 queues = objectqueues[numofcore][ptr->type];
995 length = numqueues[numofcore][ptr->type];
998 if(corenum < NUMCORES) {
1000 queues = objectqueues[corenum][ptr->type];
1001 length = numqueues[corenum][ptr->type];
1011 raw_test_pass(0xbb000000 + ptr->flag);
1014 /*Remove object from all queues */
1015 for(i = 0; i < length; ++i) {
1016 flagptr = queues[i];
1017 ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
1018 ObjectHashremove(flagptr->objectset, (int)ptr);
1019 if (enterflags!=NULL)
1020 RUNFREE(enterflags);
1024 void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1025 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1028 struct QueueItem *tmpptr;
1029 struct parameterwrapper * parameter=NULL;
1032 struct parameterwrapper * prevptr=NULL;
1033 struct ___Object___ *tagptr=NULL;
1034 struct parameterwrapper ** queues = vqueues;
1035 int length = vlength;
1037 if(corenum > NUMCORES - 1) {
1041 if(queues == NULL) {
1042 #ifdef THREADSIMULATE
1043 int numofcore = pthread_getspecific(key);
1044 queues = objectqueues[numofcore][ptr->type];
1045 length = numqueues[numofcore][ptr->type];
1047 queues = objectqueues[corenum][ptr->type];
1048 length = numqueues[corenum][ptr->type];
1051 tagptr=ptr->___tags___;
1053 /* Outer loop iterates through all parameter queues an object of
1054 this type could be in. */
1055 for(j = 0; j < length; ++j) {
1056 parameter = queues[j];
1058 if (parameter->numbertags>0) {
1060 goto nextloop;//that means the object has no tag but that param needs tag
1061 else if(tagptr->type==TAGTYPE) {//one tag
1062 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1063 for(i=0;i<parameter->numbertags;i++) {
1064 //slotid is parameter->tagarray[2*i];
1065 int tagid=parameter->tagarray[2*i+1];
1066 if (tagid!=tagptr->flag)
1067 goto nextloop; /*We don't have this tag */
1069 } else {//multiple tags
1070 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1071 for(i=0;i<parameter->numbertags;i++) {
1072 //slotid is parameter->tagarray[2*i];
1073 int tagid=parameter->tagarray[2*i+1];
1075 for(j=0;j<ao->___cachedCode___;j++) {
1076 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
1087 for(i=0;i<parameter->numberofterms;i++) {
1088 int andmask=parameter->intarray[i*2];
1089 int checkmask=parameter->intarray[i*2+1];
1090 if ((ptr->flag&andmask)==checkmask) {
1092 raw_test_pass(0xcc000000 + andmask);
1093 raw_test_pass_reg((int)ptr);
1094 raw_test_pass(0xcc000000 + ptr->flag);
1095 raw_test_pass(0xcc000000 + checkmask);
1097 enqueuetasks(parameter, prevptr, ptr, NULL, 0);
1109 void enqueueObject_I(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1110 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1113 struct QueueItem *tmpptr;
1114 struct parameterwrapper * parameter=NULL;
1117 struct parameterwrapper * prevptr=NULL;
1118 struct ___Object___ *tagptr=NULL;
1119 struct parameterwrapper ** queues = vqueues;
1120 int length = vlength;
1122 if(corenum > NUMCORES - 1) {
1126 if(queues == NULL) {
1127 #ifdef THREADSIMULATE
1128 int numofcore = pthread_getspecific(key);
1129 queues = objectqueues[numofcore][ptr->type];
1130 length = numqueues[numofcore][ptr->type];
1132 queues = objectqueues[corenum][ptr->type];
1133 length = numqueues[corenum][ptr->type];
1137 raw_test_pass(0xeaa1);
1138 raw_test_pass_reg(queues);
1139 raw_test_pass_reg(length);
1141 tagptr=ptr->___tags___;
1143 /* Outer loop iterates through all parameter queues an object of
1144 this type could be in. */
1145 for(j = 0; j < length; ++j) {
1146 parameter = queues[j];
1148 if (parameter->numbertags>0) {
1150 raw_test_pass(0xeaa2);
1151 raw_test_pass_reg(tagptr);
1154 goto nextloop;//that means the object has no tag but that param needs tag
1155 else if(tagptr->type==TAGTYPE) {//one tag
1156 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1158 raw_test_pass(0xeaa3);
1160 for(i=0;i<parameter->numbertags;i++) {
1161 //slotid is parameter->tagarray[2*i];
1162 int tagid=parameter->tagarray[2*i+1];
1163 if (tagid!=tagptr->flag) {
1165 raw_test_pass(0xeaa4);
1167 goto nextloop; /*We don't have this tag */
1170 } else {//multiple tags
1171 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1173 raw_test_pass(0xeaa5);
1175 for(i=0;i<parameter->numbertags;i++) {
1176 //slotid is parameter->tagarray[2*i];
1177 int tagid=parameter->tagarray[2*i+1];
1179 for(j=0;j<ao->___cachedCode___;j++) {
1180 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag) {
1185 raw_test_pass(0xeaa6);
1195 for(i=0;i<parameter->numberofterms;i++) {
1196 int andmask=parameter->intarray[i*2];
1197 int checkmask=parameter->intarray[i*2+1];
1199 raw_test_pass(0xeaa7);
1200 raw_test_pass(0xcc000000 + andmask);
1201 raw_test_pass_reg(ptr);
1202 raw_test_pass(0xcc000000 + ptr->flag);
1203 raw_test_pass(0xcc000000 + checkmask);
1205 if ((ptr->flag&andmask)==checkmask) {
1207 raw_test_pass(0xeaa8);
1209 enqueuetasks_I(parameter, prevptr, ptr, NULL, 0);
1220 // helper function to compute the coordinates of a core from the core number
1221 void calCoords(int core_num, int* coordY, int* coordX) {
1222 *coordX = core_num % 4;
1223 *coordY = core_num / 4;
1227 /* Message format for RAW version:
1229 * type: 0 -- transfer object
1230 * 1 -- transfer stall msg
1236 * ObjMsg: 0 + size of msg + obj's address + (task index + param index)+
1237 * StallMsg: 1 + corenum + sendobjs + receiveobjs (size is always 4 * sizeof(int))
1238 * LockMsg: 2 + lock type + obj pointer + request core (size is always 4 * sizeof(int))
1239 * 3/4/5 + lock type + obj pointer (size is always 3 * sizeof(int))
1240 * lock type: 0 -- read; 1 -- write
1243 // transfer an object to targetcore
1245 void transferObject(struct transObjInfo * transObj) {
1246 void * obj = transObj->objptr;
1247 int type=((int *)obj)[0];
1248 int size=classsize[type];
1249 int targetcore = transObj->targetcore;
1250 //assert(type < NUMCLASSES); // can only transfer normal object
1254 int self_y, self_x, target_y, target_x;
1256 // for 32 bit machine, the size of fixed part is always 3 words
1257 //int msgsize = sizeof(int) * 2 + sizeof(void *);
1258 int msgsize = 3 + transObj->length * 2;
1261 struct ___Object___ * newobj = (struct ___Object___ *)obj;
1262 /*if(0 == newobj->isolate) {
1266 calCoords(corenum, &self_y, &self_x);
1267 calCoords(targetcore, &target_y, &target_x);
1268 // Build the message header
1269 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1271 target_y, target_x);
1272 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1273 raw_test_pass(0xbbbb);
1274 raw_test_pass(0xb000 + targetcore); // targetcore
1278 raw_test_pass_reg(msgsize);
1280 raw_test_pass_reg(obj);
1281 //gdn_send(isshared);
1282 //raw_test_pass_reg(isshared);
1283 for(i = 0; i < transObj->length; ++i) {
1284 int taskindex = transObj->queues[2*i];
1285 int paramindex = transObj->queues[2*i+1];
1286 gdn_send(taskindex);
1287 raw_test_pass_reg(taskindex);
1288 gdn_send(paramindex);
1289 raw_test_pass_reg(paramindex);
1291 raw_test_pass(0xffff);
1292 ++(self_numsendobjs);
1293 #elif defined THREADSIMULATE
1294 int numofcore = pthread_getspecific(key);
1296 // use POSIX message queue to transfer objects between cores
1300 if(targetcore < 10) {
1301 corenumstr[0] = targetcore + '0';
1302 corenumstr[1] = '\0';
1304 } else if(targetcore < 100) {
1305 corenumstr[1] = targetcore % 10 + '0';
1306 corenumstr[0] = (targetcore / 10) + '0';
1307 corenumstr[2] = '\0';
1310 printf("Error: targetcore >= 100\n");
1314 char * pathhead = "/msgqueue_";
1315 int targetlen = strlen(pathhead);
1316 char path[targetlen + sourcelen + 1];
1317 strcpy(path, pathhead);
1318 strncat(path, corenumstr, sourcelen);
1319 int oflags = O_WRONLY|O_NONBLOCK;
1320 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1321 mqdnum = mq_open(path, oflags, omodes, NULL);
1323 printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1327 /*struct ___Object___ * newobj = (struct ___Object___ *)obj;
1328 if(0 == newobj->isolate) {
1329 newobj = RUNMALLOC(size);
1330 memcpy(newobj, obj, size);
1331 newobj->original=obj;
1333 struct transObjInfo * tmptransObj = RUNMALLOC(sizeof(struct transObjInfo));
1334 memcpy(tmptransObj, transObj, sizeof(struct transObjInfo));
1335 int * tmpqueue = RUNMALLOC(sizeof(int)*2*tmptransObj->length);
1336 memcpy(tmpqueue, tmptransObj->queues, sizeof(int)*2*tmptransObj->length);
1337 tmptransObj->queues = tmpqueue;
1338 struct ___Object___ * newobj = RUNMALLOC(sizeof(struct ___Object___));
1339 newobj->type = ((struct ___Object___ *)obj)->type;
1340 newobj->original = (struct ___Object___ *)tmptransObj;
1343 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1345 printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
1349 if(numofcore == STARTUPCORE) {
1350 ++numsendobjs[numofcore];
1352 ++(thread_data_array[numofcore].numsendobjs);
1354 printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1358 // send terminate message to targetcore
1360 bool transStallMsg(int targetcore) {
1363 int self_y, self_x, target_y, target_x;
1364 // for 32 bit machine, the size is always 4 words
1365 //int msgsize = sizeof(int) * 4;
1368 calCoords(corenum, &self_y, &self_x);
1369 calCoords(targetcore, &target_y, &target_x);
1370 // Build the message header
1371 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1373 target_y, target_x);
1374 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1375 raw_test_pass(0xbbbb);
1376 raw_test_pass(0xb000 + targetcore); // targetcore
1380 raw_test_pass_reg(corenum);
1381 gdn_send(self_numsendobjs);
1382 raw_test_pass_reg(self_numsendobjs);
1383 gdn_send(self_numreceiveobjs);
1384 raw_test_pass_reg(self_numreceiveobjs);
1385 raw_test_pass(0xffff);
1387 #elif defined THREADSIMULATE
1388 struct ___Object___ *newobj = RUNMALLOC(sizeof(struct ___Object___));
1389 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
1391 int numofcore = pthread_getspecific(key);
1392 newobj->flag = numofcore;
1393 newobj->___cachedHash___ = thread_data_array[numofcore].numsendobjs;
1394 newobj->___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
1396 // use POSIX message queue to send stall msg to startup core
1397 assert(targetcore == STARTUPCORE);
1401 if(targetcore < 10) {
1402 corenumstr[0] = targetcore + '0';
1403 corenumstr[1] = '\0';
1405 } else if(targetcore < 100) {
1406 corenumstr[1] = targetcore % 10 + '0';
1407 corenumstr[0] = (targetcore / 10) + '0';
1408 corenumstr[2] = '\0';
1411 printf("Error: targetcore >= 100\n");
1415 char * pathhead = "/msgqueue_";
1416 int targetlen = strlen(pathhead);
1417 char path[targetlen + sourcelen + 1];
1418 strcpy(path, pathhead);
1419 strncat(path, corenumstr, sourcelen);
1420 int oflags = O_WRONLY|O_NONBLOCK;
1421 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1422 mqdnum = mq_open(path, oflags, omodes, NULL);
1424 printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1429 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1431 printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
1435 printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1436 printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj->flag, newobj->___cachedHash___, newobj->___cachedCode___);
1443 // receive object transferred from other cores
1444 // or the terminate message from other cores
1445 // NOTICE: following format is for threadsimulate version only
1446 // RAW version please see previous description
1447 // format: type + object
1448 // type: -1--stall msg
1450 // return value: 0--received an object
1451 // 1--received nothing
1452 // 2--received a Stall Msg
1453 // 3--received a lock Msg
1454 // RAW version: -1 -- received nothing
1455 // otherwise -- received msg type
1456 int receiveObject() {
1460 int self_y, self_x, target_y, target_x;
1462 if(gdn_input_avail() == 0) {
1463 if(corenum < NUMCORES) {
1464 raw_test_pass(0xd001);
1469 raw_test_pass(0xcccc);
1470 while((gdn_input_avail() != 0) && (msgdataindex < msglength)) {
1471 msgdata[msgdataindex] = gdn_receive();
1472 if(msgdataindex == 0) {
1473 if(msgdata[0] > 2) {
1475 } else if(msgdata[0] > 0) {
1478 } else if((msgdataindex == 1) && (msgdata[0] == 0)) {
1479 msglength = msgdata[msgdataindex];
1481 raw_test_pass_reg(msgdata[msgdataindex]);
1484 /*if(msgdataindex == 0) {
1486 msgtype = gdn_receive();
1493 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1494 msgdata[msgdataindex] = msgtype;
1496 raw_test_pass_reg(msgtype);
1497 } else if((msgdataindex == 1) && (msgtype == 0)) {
1498 // object transfer msg
1499 msglength = gdn_receive();
1500 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1501 msgdata[0] = msgtype;
1502 msgdata[msgdataindex] = msglength;
1503 raw_test_pass_reg(msgdata[msgdataindex]);
1505 msgdata[msgdataindex] = gdn_receive();
1506 raw_test_pass_reg(msgdata[msgdataindex]);
1510 raw_test_pass(0xffff);
1511 if(msgdataindex == msglength) {
1512 // received a whole msg
1513 int type, data1, data2; // will receive at least 3 words including type
1519 // receive a object transfer msg
1520 struct transObjInfo * transObj = RUNMALLOC_I(sizeof(struct transObjInfo));
1522 if(corenum > NUMCORES - 1) {
1523 raw_test_done(0xa00a);
1525 // store the object and its corresponding queue info, enqueue it later
1526 transObj->objptr = (void *)data2; // data1 is now size of the msg
1527 transObj->length = (msglength - 3) / 2;
1528 transObj->queues = RUNMALLOC_I(sizeof(int)*(msglength - 3));
1529 for(k = 0; k < transObj->length; ++k) {
1530 transObj->queues[2*k] = msgdata[3+2*k];
1531 raw_test_pass_reg(transObj->queues[2*k]);
1532 transObj->queues[2*k+1] = msgdata[3+2*k+1];
1533 raw_test_pass_reg(transObj->queues[2*k+1]);
1535 //memcpy(transObj->queues, msgdata[3], sizeof(int)*(msglength - 3));
1536 addNewItem_I(&objqueue, (void *)transObj);
1537 ++(self_numreceiveobjs);
1538 raw_test_pass(0xe881);
1540 addNewItem_I(&objqueue, (void *)data2);
1541 ++(self_numreceiveobjs);
1542 raw_test_pass(0xe881);*/
1546 // receive a stall msg
1547 if(corenum != STARTUPCORE) {
1548 // non startup core can not receive stall msg
1550 raw_test_done(0xa001);
1552 if(data1 < NUMCORES) {
1553 raw_test_pass(0xe882);
1554 corestatus[data1] = 0;
1555 numsendobjs[data1] = data2;
1556 numreceiveobjs[data1] = msgdata[3];
1561 // receive lock request msg
1562 // for 32 bit machine, the size is always 3 words
1563 //int msgsize = sizeof(int) * 3;
1565 // lock request msg, handle it right now
1566 // check to see if there is a lock exist in locktbl for the required obj
1567 int data3 = msgdata[3];
1569 if(!RuntimeHashcontainskey(locktbl, data2)) {
1570 // no locks for this object
1571 // first time to operate on this shared object
1572 // create a lock for it
1573 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1574 raw_test_pass(0xe883);
1576 RuntimeHashadd_I(locktbl, data2, 1);
1578 RuntimeHashadd_I(locktbl, data2, -1);
1582 raw_test_pass(0xe884);
1583 RuntimeHashget(locktbl, data2, &rwlock_obj);
1584 raw_test_pass_reg(rwlock_obj);
1585 if(0 == rwlock_obj) {
1591 RuntimeHashremovekey(locktbl, data2);
1592 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1593 } else if((rwlock_obj > 0) && (data1 == 0)) {
1594 // read lock request and there are only read locks
1596 RuntimeHashremovekey(locktbl, data2);
1597 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1601 raw_test_pass_reg(rwlock_obj);
1604 calCoords(corenum, &self_y, &self_x);
1605 calCoords(targetcore, &target_y, &target_x);
1606 // Build the message header
1607 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1609 target_y, target_x);
1610 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1611 raw_test_pass(0xbbbb);
1612 raw_test_pass(0xb000 + targetcore); // targetcore
1614 // deny the lock request
1615 gdn_send(4); // lock request
1618 // grount the lock request
1619 gdn_send(3); // lock request
1622 gdn_send(data1); // lock type
1623 raw_test_pass_reg(data1);
1624 gdn_send(data2); // lock target
1625 raw_test_pass_reg(data2);
1626 raw_test_pass(0xffff);
1630 // receive lock grount msg
1631 if(corenum > NUMCORES - 1) {
1632 raw_test_done(0xa00b);
1634 if(lockobj == data2) {
1641 // conflicts on lockresults
1642 raw_test_done(0xa002);
1647 // receive lock grount/deny msg
1648 if(corenum > NUMCORES - 1) {
1649 raw_test_done(0xa00c);
1651 if(lockobj == data2) {
1658 // conflicts on lockresults
1659 raw_test_done(0xa003);
1664 // receive lock release msg
1665 if(!RuntimeHashcontainskey(locktbl, data2)) {
1666 // no locks for this object, something is wrong
1667 raw_test_done(0xa004);
1670 RuntimeHashget(locktbl, data2, &rwlock_obj);
1671 raw_test_pass(0xe885);
1672 raw_test_pass_reg(rwlock_obj);
1678 RuntimeHashremovekey(locktbl, data2);
1679 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1680 raw_test_pass_reg(rwlock_obj);
1689 for(msgdataindex--;msgdataindex > 0; --msgdataindex) {
1690 msgdata[msgdataindex] = -1;
1695 raw_test_pass(0xe886);
1696 if(gdn_input_avail() != 0) {
1702 raw_test_pass(0xe887);
1705 #elif defined THREADSIMULATE
1706 int numofcore = pthread_getspecific(key);
1707 // use POSIX message queue to transfer object
1709 struct mq_attr mqattr;
1710 mq_getattr(mqd[numofcore], &mqattr);
1711 void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
1712 msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
1718 //printf("msg: %s\n",msgptr);
1719 if(((int*)msgptr)[0] == -1) {
1721 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1722 int index = tmpptr->flag;
1723 corestatus[index] = 0;
1724 numsendobjs[index] = tmpptr->___cachedHash___;
1725 numreceiveobjs[index] = tmpptr->___cachedCode___;
1726 printf("<receiveObject> index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
1729 } /*else if(((int*)msgptr)[0] == -2) {
1734 if(numofcore == STARTUPCORE) {
1735 ++(numreceiveobjs[numofcore]);
1737 ++(thread_data_array[numofcore].numreceiveobjs);
1739 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1740 struct transObjInfo * transObj = (struct transObjInfo *)tmpptr->original;
1741 tmpptr = (struct ___Object___ *)(transObj->objptr);
1742 int type = tmpptr->type;
1743 int size=classsize[type];
1744 struct ___Object___ * newobj=RUNMALLOC(size);
1745 memcpy(newobj, tmpptr, size);
1746 if(0 == newobj->isolate) {
1747 newobj->original=tmpptr;
1752 for(k = 0; k < transObj->length; ++k) {
1753 int taskindex = transObj->queues[2 * k];
1754 int paramindex = transObj->queues[2 * k + 1];
1755 struct parameterwrapper ** queues = &(paramqueues[numofcore][taskindex][paramindex]);
1756 enqueueObject(newobj, queues, 1);
1758 RUNFREE(transObj->queues);
1765 bool getreadlock(void * ptr) {
1768 int self_y, self_x, target_y, target_x;
1769 int targetcore = ((int)ptr) % TOTALCORE;
1770 // for 32 bit machine, the size is always 4 words
1771 //int msgsize = sizeof(int) * 4;
1781 if(targetcore == corenum) {
1782 // reside on this core
1785 raw_user_interrupts_off();
1787 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1788 // no locks for this object
1789 // first time to operate on this shared object
1790 // create a lock for it
1791 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1792 RuntimeHashadd_I(locktbl, (int)ptr, 1);
1795 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1796 if(-1 != rwlock_obj) {
1798 RuntimeHashremovekey(locktbl, (int)ptr);
1799 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1805 raw_user_interrupts_on();
1807 if(lockobj == (int)ptr) {
1818 // conflicts on lockresults
1819 raw_test_done(0xa005);
1824 calCoords(corenum, &self_y, &self_x);
1825 calCoords(targetcore, &target_y, &target_x);
1826 // Build the message header
1827 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1829 target_y, target_x);
1830 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1831 raw_test_pass(0xbbbb);
1832 raw_test_pass(0xb000 + targetcore); // targetcore
1833 gdn_send(2); // lock request
1835 gdn_send(0); // read lock
1838 raw_test_pass_reg(ptr);
1840 raw_test_pass_reg(corenum);
1841 raw_test_pass(0xffff);
1843 #elif defined THREADSIMULATE
1844 int numofcore = pthread_getspecific(key);
1846 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1847 printf("[getreadlock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1851 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1852 // no locks for this object
1853 // first time to operate on this shared object
1854 // create a lock for it
1855 rc = pthread_rwlock_unlock(&rwlock_tbl);
1856 printf("[getreadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1857 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1858 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1859 rc = pthread_rwlock_init(rwlock, NULL);
1860 printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
1861 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1862 printf("[getreadlock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1867 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1869 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1872 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
1874 rc = pthread_rwlock_unlock(&rwlock_tbl);
1875 printf("[getreadlock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1877 rc = pthread_rwlock_tryrdlock(rwlock);
1878 printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
1885 pthread_rwlock_t* rwlock_obj = NULL;
1886 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1887 rc = pthread_rwlock_unlock(&rwlock_tbl);
1888 printf("[getreadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1889 int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
1890 printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1900 void releasereadlock(void * ptr) {
1903 int self_y, self_x, target_y, target_x;
1904 int targetcore = ((int)ptr) % TOTALCORE;
1905 // for 32 bit machine, the size is always 3 words
1906 //int msgsize = sizeof(int) * 3;
1909 if(targetcore == corenum) {
1911 raw_user_interrupts_off();
1913 // reside on this core
1914 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1915 // no locks for this object, something is wrong
1916 raw_test_done(0xa006);
1919 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1921 RuntimeHashremovekey(locktbl, (int)ptr);
1922 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1925 raw_user_interrupts_on();
1930 calCoords(corenum, &self_y, &self_x);
1931 calCoords(targetcore, &target_y, &target_x);
1932 // Build the message header
1933 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1935 target_y, target_x);
1936 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1937 raw_test_pass(0xbbbb);
1938 raw_test_pass(0xb000 + targetcore); // targetcore
1939 gdn_send(5); // lock release
1941 gdn_send(0); // read lock
1944 raw_test_pass_reg(ptr);
1945 raw_test_pass(0xffff);
1946 #elif defined THREADSIMULATE
1947 int numofcore = pthread_getspecific(key);
1948 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1949 printf("[releasereadlock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1950 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1951 printf("[releasereadlock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1954 pthread_rwlock_t* rwlock_obj = NULL;
1955 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1956 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1957 printf("[releasereadlock, %d] unlocked object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1958 rc = pthread_rwlock_unlock(&rwlock_tbl);
1959 printf("[releasereadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1964 bool getreadlock_I(void * ptr) {
1966 int self_y, self_x, target_y, target_x;
1967 int targetcore = ((int)ptr) % TOTALCORE;
1968 // for 32 bit machine, the size is always 4 words
1969 //int msgsize = sizeof(int) * 4;
1979 if(targetcore == corenum) {
1980 // reside on this core
1982 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1983 // no locks for this object
1984 // first time to operate on this shared object
1985 // create a lock for it
1986 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1987 RuntimeHashadd_I(locktbl, (int)ptr, 1);
1990 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1991 if(-1 != rwlock_obj) {
1993 RuntimeHashremovekey(locktbl, (int)ptr);
1994 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1999 if(lockobj == (int)ptr) {
2010 // conflicts on lockresults
2011 raw_test_done(0xa005);
2016 calCoords(corenum, &self_y, &self_x);
2017 calCoords(targetcore, &target_y, &target_x);
2018 // Build the message header
2019 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2021 target_y, target_x);
2022 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2023 raw_test_pass(0xbbbb);
2024 raw_test_pass(0xb000 + targetcore); // targetcore
2025 gdn_send(2); // lock request
2027 gdn_send(0); // read lock
2030 raw_test_pass_reg(ptr);
2032 raw_test_pass_reg(corenum);
2033 raw_test_pass(0xffff);
2037 void releasereadlock_I(void * ptr) {
2039 int self_y, self_x, target_y, target_x;
2040 int targetcore = ((int)ptr) % TOTALCORE;
2041 // for 32 bit machine, the size is always 3 words
2042 //int msgsize = sizeof(int) * 3;
2045 if(targetcore == corenum) {
2046 // reside on this core
2047 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2048 // no locks for this object, something is wrong
2049 raw_test_done(0xa006);
2052 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2054 RuntimeHashremovekey(locktbl, (int)ptr);
2055 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2060 calCoords(corenum, &self_y, &self_x);
2061 calCoords(targetcore, &target_y, &target_x);
2062 // Build the message header
2063 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2065 target_y, target_x);
2066 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2067 raw_test_pass(0xbbbb);
2068 raw_test_pass(0xb000 + targetcore); // targetcore
2069 gdn_send(5); // lock release
2071 gdn_send(0); // read lock
2074 raw_test_pass_reg(ptr);
2075 raw_test_pass(0xffff);
2079 bool getwritelock(void * ptr) {
2082 int self_y, self_x, target_y, target_x;
2083 int targetcore = ((int)ptr) % TOTALCORE;
2084 // for 32 bit machine, the size is always 4 words
2085 //int msgsize = sizeof(int) * 4;
2095 if(targetcore == corenum) {
2096 // reside on this core
2099 raw_user_interrupts_off();
2101 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2102 // no locks for this object
2103 // first time to operate on this shared object
2104 // create a lock for it
2105 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
2106 raw_test_pass(0xe552);
2107 RuntimeHashadd_I(locktbl, (int)ptr, -1);
2110 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2111 raw_test_pass(0xe553);
2112 raw_test_pass_reg(rwlock_obj);
2113 if(0 == rwlock_obj) {
2115 RuntimeHashremovekey(locktbl, (int)ptr);
2116 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2122 raw_user_interrupts_on();
2124 raw_test_pass(0xe554);
2125 raw_test_pass_reg(lockresult);
2126 if(lockobj == (int)ptr) {
2139 // conflicts on lockresults
2140 raw_test_done(0xa007);
2145 raw_test_pass(0xe555);
2146 calCoords(corenum, &self_y, &self_x);
2147 calCoords(targetcore, &target_y, &target_x);
2148 // Build the message header
2149 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2151 target_y, target_x);
2152 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2153 raw_test_pass(0xbbbb);
2154 raw_test_pass(0xb000 + targetcore); // targetcore
2155 gdn_send(2); // lock request
2157 gdn_send(1); // write lock
2160 raw_test_pass_reg(ptr);
2162 raw_test_pass_reg(corenum);
2163 raw_test_pass(0xffff);
2165 #elif defined THREADSIMULATE
2166 int numofcore = pthread_getspecific(key);
2168 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
2169 printf("[getwritelock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2173 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2174 // no locks for this object
2175 // first time to operate on this shared object
2176 // create a lock for it
2177 rc = pthread_rwlock_unlock(&rwlock_tbl);
2178 printf("[getwritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2179 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
2180 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
2181 rc = pthread_rwlock_init(rwlock, NULL);
2182 printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
2183 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
2184 printf("[getwritelock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2186 pthread_rwlock_destroy(rwlock);
2190 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2192 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
2194 pthread_rwlock_destroy(rwlock);
2196 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
2198 rc = pthread_rwlock_unlock(&rwlock_tbl);
2199 printf("[getwritelock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2201 rc = pthread_rwlock_trywrlock(rwlock);
2202 printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
2209 pthread_rwlock_t* rwlock_obj = NULL;
2210 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2211 rc = pthread_rwlock_unlock(&rwlock_tbl);
2212 printf("[getwritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2213 int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
2214 printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
2225 void releasewritelock(void * ptr) {
2228 int self_y, self_x, target_y, target_x;
2229 int targetcore = ((int)ptr) % TOTALCORE;
2230 // for 32 bit machine, the size is always 3 words
2231 //int msgsize = sizeof(int) * 3;
2234 if(targetcore == corenum) {
2236 raw_user_interrupts_off();
2238 // reside on this core
2239 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2240 // no locks for this object, something is wrong
2241 raw_test_done(0xa008);
2244 raw_test_pass(0xe662);
2245 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2246 raw_test_pass_reg(rwlock_obj);
2248 RuntimeHashremovekey(locktbl, (int)ptr);
2249 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2250 raw_test_pass_reg(rwlock_obj);
2253 raw_user_interrupts_on();
2258 raw_test_pass(0xe663);
2259 calCoords(corenum, &self_y, &self_x);
2260 calCoords(targetcore, &target_y, &target_x);
2261 // Build the message header
2262 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2264 target_y, target_x);
2265 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2266 raw_test_pass(0xbbbb);
2267 raw_test_pass(0xb000 + targetcore);
2268 gdn_send(5); // lock release
2270 gdn_send(1); // write lock
2273 raw_test_pass_reg(ptr);
2274 raw_test_pass(0xffff);
2275 #elif defined THREADSIMULATE
2276 int numofcore = pthread_getspecific(key);
2277 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
2278 printf("[releasewritelock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2279 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2280 printf("[releasewritelock, %d] Error: try to release a lock without previously grab it\n", numofcore);
2283 pthread_rwlock_t* rwlock_obj = NULL;
2284 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2285 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
2286 printf("[releasewritelock, %d] unlocked object %d: %d error:\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
2287 rc = pthread_rwlock_unlock(&rwlock_tbl);
2288 printf("[releasewritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2292 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2293 void * taskpointerarray[MAXTASKPARAMS];
2295 int numparams=parameter->task->numParameters;
2296 int numiterators=parameter->task->numTotal-1;
2301 struct taskdescriptor * task=parameter->task;
2303 ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2305 /* Add enqueued object to parameter vector */
2306 taskpointerarray[parameter->slot]=ptr;
2308 /* Reset iterators */
2309 for(j=0;j<numiterators;j++) {
2310 toiReset(¶meter->iterators[j]);
2313 /* Find initial state */
2314 for(j=0;j<numiterators;j++) {
2316 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2317 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2319 /* Need to backtrack */
2320 toiReset(¶meter->iterators[j]);
2324 /* Nothing to enqueue */
2331 /* Enqueue current state */
2333 struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
2335 tpd->numParameters=numiterators+1;
2336 tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
2337 for(j=0;j<=numiterators;j++){
2338 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
2341 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2342 genputtable(activetasks, tpd, tpd);
2344 RUNFREE(tpd->parameterArray);
2348 /* This loop iterates to the next parameter combination */
2349 if (numiterators==0)
2352 for(j=numiterators-1; j<numiterators;j++) {
2354 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2355 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2357 /* Need to backtrack */
2358 toiReset(¶meter->iterators[j]);
2362 /* Nothing more to enqueue */
2371 int enqueuetasks_I(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2372 void * taskpointerarray[MAXTASKPARAMS];
2374 int numparams=parameter->task->numParameters;
2375 int numiterators=parameter->task->numTotal-1;
2380 struct taskdescriptor * task=parameter->task;
2382 ObjectHashadd_I(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2384 /* Add enqueued object to parameter vector */
2385 taskpointerarray[parameter->slot]=ptr;
2387 /* Reset iterators */
2388 for(j=0;j<numiterators;j++) {
2389 toiReset(¶meter->iterators[j]);
2392 /* Find initial state */
2393 for(j=0;j<numiterators;j++) {
2395 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2396 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2398 /* Need to backtrack */
2399 toiReset(¶meter->iterators[j]);
2403 /* Nothing to enqueue */
2409 /* Enqueue current state */
2411 struct taskparamdescriptor *tpd=RUNMALLOC_I(sizeof(struct taskparamdescriptor));
2413 tpd->numParameters=numiterators+1;
2414 tpd->parameterArray=RUNMALLOC_I(sizeof(void *)*(numiterators+1));
2415 for(j=0;j<=numiterators;j++){
2416 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
2419 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2420 genputtable_I(activetasks, tpd, tpd);
2422 RUNFREE(tpd->parameterArray);
2426 /* This loop iterates to the next parameter combination */
2427 if (numiterators==0)
2430 for(j=numiterators-1; j<numiterators;j++) {
2432 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2433 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2435 /* Need to backtrack */
2436 toiReset(¶meter->iterators[j]);
2440 /* Nothing more to enqueue */
2449 /* Handler for signals. The signals catch null pointer errors and
2450 arithmatic errors. */
2452 void myhandler(int sig, siginfo_t *info, void *uap) {
2455 printf("sig=%d\n",sig);
2458 sigemptyset(&toclear);
2459 sigaddset(&toclear, sig);
2460 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
2461 longjmp(error_handler,1);
2467 struct RuntimeHash *fdtoobject;
2469 void addreadfd(int fd) {
2472 FD_SET(fd, &readfds);
2475 void removereadfd(int fd) {
2476 FD_CLR(fd, &readfds);
2477 if (maxreadfd==(fd+1)) {
2479 while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
2490 void executetasks() {
2491 void * taskpointerarray[MAXTASKPARAMS+OFFSET];
2494 struct ___Object___ * tmpparam = NULL;
2495 struct parameterdescriptor * pd=NULL;
2496 struct parameterwrapper *pw=NULL;
2505 raw_test_pass(0xe991);
2509 /* Set up signal handlers */
2510 struct sigaction sig;
2511 sig.sa_sigaction=&myhandler;
2512 sig.sa_flags=SA_SIGINFO;
2513 sigemptyset(&sig.sa_mask);
2515 /* Catch bus errors, segmentation faults, and floating point exceptions*/
2516 sigaction(SIGBUS,&sig,0);
2517 sigaction(SIGSEGV,&sig,0);
2518 sigaction(SIGFPE,&sig,0);
2519 sigaction(SIGPIPE,&sig,0);
2528 fdtoobject=allocateRuntimeHash(100);
2532 /* Map first block of memory to protected, anonymous page */
2533 mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
2537 while((hashsize(activetasks)>0)||(maxreadfd>0)) {
2540 raw_test_pass(0xe992);
2542 /* Check if any filedescriptors have IO pending */
2545 struct timeval timeout={0,0};
2549 numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
2551 /* Process ready fd's */
2553 for(fd=0;fd<maxreadfd;fd++) {
2554 if (FD_ISSET(fd, &tmpreadfds)) {
2555 /* Set ready flag on object */
2557 // printf("Setting fd %d\n",fd);
2558 if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
2559 if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
2560 enqueueObject(objptr, NULL, 0);
2569 /* See if there are any active tasks */
2570 if (hashsize(activetasks)>0) {
2572 currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
2573 genfreekey(activetasks, currtpd);
2575 /* Check if this task has failed, allow a task that contains optional objects to fire */
2576 /*if (gencontains(failedtasks, currtpd)) {
2577 // Free up task parameter descriptor
2578 RUNFREE(currtpd->parameterArray);
2582 numparams=currtpd->task->numParameters;
2583 numtotal=currtpd->task->numTotal;
2585 #ifdef THREADSIMULATE
2586 int isolateflags[numparams];
2588 /* Make sure that the parameters are still in the queues */
2589 for(i=0;i<numparams;i++) {
2590 void * parameter=currtpd->parameterArray[i];
2592 raw_test_pass(0xe993);
2593 // require locks for this parameter
2594 getwritelock(parameter);
2596 raw_user_interrupts_off();
2604 while(receiveObject() != -1) {
2608 grount = lockresult;
2616 raw_user_interrupts_on();
2619 raw_test_pass(0xe994);
2620 // can not get the lock, try later
2621 for(j = 0; j < i; ++j) {
2622 releasewritelock(taskpointerarray[j+OFFSET]);
2624 genputtable(activetasks, currtpd, currtpd);
2630 for(tmp = 0; tmp < classsize[((struct ___Object___ *)parameter)->type]; ++tmp) {
2631 invalidateAddr(parameter + tmp);
2635 tmpparam = (struct ___Object___ *)parameter;
2636 #ifdef THREADSIMULATE
2637 if(0 == tmpparam->isolate) {
2638 isolateflags[i] = 0;
2639 // shared object, need to flush with current value
2640 //if(!getreadlock(tmpparam->original)) {
2641 // // fail to get read lock of the original object, try this task later
2642 if(!getwritelock(tmpparam->original)) {
2643 // fail to get write lock, release all obtained locks and try this task later
2645 for(j = 0; j < i; ++j) {
2646 if(0 == isolateflags[j]) {
2647 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2650 genputtable(activetasks, currtpd, currtpd);
2653 if(tmpparam->version != tmpparam->original->version) {
2654 // some task on another core has changed this object
2655 // flush this object
2656 //memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
2657 // release all obtained locks
2659 for(j = 0; j < i; ++j) {
2660 if(0 == isolateflags[j]) {
2661 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2664 releasewritelock(tmpparam->original);
2666 // dequeue this object
2667 int numofcore = pthread_getspecific(key);
2668 struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
2669 int length = numqueues[numofcore][tmpparam->type];
2670 for(j = 0; j < length; ++j) {
2671 struct parameterwrapper * pw = queues[j];
2672 if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
2674 int UNUSED, UNUSED2;
2676 ObjectHashget(pw->objectset, (int) tmpparam, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2677 ObjectHashremove(pw->objectset, (int)tmpparam);
2678 if (enterflags!=NULL)
2682 // try to enqueue it again to check if it feeds other tasks;
2683 //enqueueObject(tmpparam, NULL, 0);
2684 // Free up task parameter descriptor
2685 RUNFREE(currtpd->parameterArray);
2690 isolateflags[i] = 1;
2693 pd=currtpd->task->descriptorarray[i];
2694 pw=(struct parameterwrapper *) pd->queue;
2695 /* Check that object is still in queue */
2697 if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
2699 raw_test_pass(0xe995);
2701 // release grabbed locks
2702 for(j = 0; j < i; ++j) {
2703 releasewritelock(taskpointerarray[j+OFFSET]);
2705 releasewritelock(parameter);
2706 RUNFREE(currtpd->parameterArray);
2712 /* Check if the object's flags still meets requirements */
2716 for(tmpi = 0; tmpi < pw->numberofterms; ++tmpi) {
2717 andmask=pw->intarray[tmpi*2];
2718 checkmask=pw->intarray[tmpi*2+1];
2719 raw_test_pass(0xdd000000 + andmask);
2720 raw_test_pass_reg((int)parameter);
2721 raw_test_pass(0xdd000000 + ((struct ___Object___ *)parameter)->flag);
2722 raw_test_pass(0xdd000000 + checkmask);
2723 if((((struct ___Object___ *)parameter)->flag&andmask)==checkmask) {
2729 // flags are never suitable
2730 // remove this obj from the queue
2732 int UNUSED, UNUSED2;
2734 raw_test_pass(0xe996);
2735 ObjectHashget(pw->objectset, (int) parameter, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2736 ObjectHashremove(pw->objectset, (int)parameter);
2737 if (enterflags!=NULL)
2739 // release grabbed locks
2740 for(j = 0; j < i; ++j) {
2741 releasewritelock(taskpointerarray[j+OFFSET]);
2743 releasewritelock(parameter);
2744 RUNFREE(currtpd->parameterArray);
2752 /* Check that object still has necessary tags */
2753 for(j=0;j<pd->numbertags;j++) {
2754 int slotid=pd->tagarray[2*j]+numparams;
2755 struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
2756 if (!containstag(parameter, tagd)) {
2758 raw_test_pass(0xe997);
2760 RUNFREE(currtpd->parameterArray);
2766 taskpointerarray[i+OFFSET]=parameter;
2769 for(;i<numtotal;i++) {
2770 taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
2773 #ifdef THREADSIMULATE
2774 for(i = 0; i < numparams; ++i) {
2775 if(0 == isolateflags[i]) {
2776 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2777 if(tmpparam != tmpparam->original) {
2778 taskpointerarray[i+OFFSET] = tmpparam->original;
2787 /* Checkpoint the state */
2788 forward=allocateRuntimeHash(100);
2789 reverse=allocateRuntimeHash(100);
2790 //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
2793 if (x=setjmp(error_handler)) {
2798 printf("Fatal Error=%d, Recovering!\n",x);
2802 genputtable(failedtasks,currtpd,currtpd);
2803 //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
2805 freeRuntimeHash(forward);
2806 freeRuntimeHash(reverse);
2813 raw_test_pass_reg(x);
2814 raw_test_done(0xa009);
2819 /*if (injectfailures) {
2820 if ((((double)random())/RAND_MAX)<failurechance) {
2821 printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
2822 longjmp(error_handler,10);
2825 /* Actually call task */
2827 ((int *)taskpointerarray)[0]=currtpd->numParameters;
2828 taskpointerarray[1]=NULL;
2832 printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2834 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
2836 printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2839 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
2842 for(i = 0; i < numparams; ++i) {
2844 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2845 raw_test_pass(0xe998);
2846 raw_test_pass(0xdd100000 + tmpparam->flag);
2847 releasewritelock(tmpparam);
2849 #elif defined THREADSIMULATE
2850 for(i = 0; i < numparams; ++i) {
2851 if(0 == isolateflags[i]) {
2852 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2853 releasewritelock(tmpparam);
2860 freeRuntimeHash(forward);
2861 freeRuntimeHash(reverse);
2865 // Free up task parameter descriptor
2866 RUNFREE(currtpd->parameterArray);
2879 raw_test_pass(0xe999);
2883 /* This function processes an objects tags */
2884 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
2887 for(i=0;i<pd->numbertags;i++) {
2888 int slotid=pd->tagarray[2*i];
2889 int tagid=pd->tagarray[2*i+1];
2891 if (statusarray[slotid+numparams]==0) {
2892 parameter->iterators[*iteratorcount].istag=1;
2893 parameter->iterators[*iteratorcount].tagid=tagid;
2894 parameter->iterators[*iteratorcount].slot=slotid+numparams;
2895 parameter->iterators[*iteratorcount].tagobjectslot=index;
2896 statusarray[slotid+numparams]=1;
2903 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
2906 struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
2908 parameter->iterators[*iteratorcount].istag=0;
2909 parameter->iterators[*iteratorcount].slot=index;
2910 parameter->iterators[*iteratorcount].objectset=objectset;
2911 statusarray[index]=1;
2913 for(i=0;i<pd->numbertags;i++) {
2914 int slotid=pd->tagarray[2*i];
2915 int tagid=pd->tagarray[2*i+1];
2916 if (statusarray[slotid+numparams]!=0) {
2917 /* This tag has already been enqueued, use it to narrow search */
2918 parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
2922 parameter->iterators[*iteratorcount].numtags=tagcount;
2927 /* This function builds the iterators for a task & parameter */
2929 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
2930 int statusarray[MAXTASKPARAMS];
2932 int numparams=task->numParameters;
2933 int iteratorcount=0;
2934 for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
2936 statusarray[index]=1; /* Initial parameter */
2937 /* Process tags for initial iterator */
2939 processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
2943 /* Check for objects with existing tags */
2944 for(i=0;i<numparams;i++) {
2945 if (statusarray[i]==0) {
2946 struct parameterdescriptor *pd=task->descriptorarray[i];
2948 for(j=0;j<pd->numbertags;j++) {
2949 int slotid=pd->tagarray[2*j];
2950 if(statusarray[slotid+numparams]!=0) {
2951 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2952 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2959 /* Next do objects w/ unbound tags*/
2961 for(i=0;i<numparams;i++) {
2962 if (statusarray[i]==0) {
2963 struct parameterdescriptor *pd=task->descriptorarray[i];
2964 if (pd->numbertags>0) {
2965 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2966 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2972 /* Nothing with a tag enqueued */
2974 for(i=0;i<numparams;i++) {
2975 if (statusarray[i]==0) {
2976 struct parameterdescriptor *pd=task->descriptorarray[i];
2977 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2978 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2991 #ifdef THREADSIMULATE
2992 int numofcore = pthread_getspecific(key);
2993 for(i=0;i<numtasks[numofcore];i++) {
2994 struct taskdescriptor * task=taskarray[numofcore][i];
2997 if(corenum > NUMCORES - 1) {
3001 for(i=0;i<numtasks[corenum];i++) {
3002 struct taskdescriptor * task=taskarray[corenum][i];
3005 printf("%s\n", task->name);
3007 for(j=0;j<task->numParameters;j++) {
3008 struct parameterdescriptor *param=task->descriptorarray[j];
3009 struct parameterwrapper *parameter=param->queue;
3010 struct ObjectHash * set=parameter->objectset;
3011 struct ObjectIterator objit;
3013 printf(" Parameter %d\n", j);
3015 ObjectHashiterator(set, &objit);
3016 while(ObjhasNext(&objit)) {
3017 struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
3018 struct ___Object___ * tagptr=obj->___tags___;
3019 int nonfailed=Objdata4(&objit);
3020 int numflags=Objdata3(&objit);
3021 int flags=Objdata2(&objit);
3024 printf(" Contains %lx\n", obj);
3025 printf(" flag=%d\n", obj->flag);
3028 } else if (tagptr->type==TAGTYPE) {
3030 printf(" tag=%lx\n",tagptr);
3035 struct ArrayObject *ao=(struct ArrayObject *)tagptr;
3036 for(;tagindex<ao->___cachedCode___;tagindex++) {
3038 printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
3048 /* This function processes the task information to create queues for
3049 each parameter type. */
3051 void processtasks() {
3054 if(corenum > NUMCORES - 1) {
3058 #ifdef THREADSIMULATE
3059 int numofcore = pthread_getspecific(key);
3060 for(i=0;i<numtasks[numofcore];i++) {
3061 struct taskdescriptor *task=taskarray[numofcore][i];
3063 for(i=0;i<numtasks[corenum];i++) {
3064 struct taskdescriptor * task=taskarray[corenum][i];
3068 /* Build objectsets */
3069 for(j=0;j<task->numParameters;j++) {
3070 struct parameterdescriptor *param=task->descriptorarray[j];
3071 struct parameterwrapper *parameter=param->queue;
3072 parameter->objectset=allocateObjectHash(10);
3073 parameter->task=task;
3076 /* Build iterators for parameters */
3077 for(j=0;j<task->numParameters;j++) {
3078 struct parameterdescriptor *param=task->descriptorarray[j];
3079 struct parameterwrapper *parameter=param->queue;
3080 builditerators(task, j, parameter);
3085 void toiReset(struct tagobjectiterator * it) {
3088 } else if (it->numtags>0) {
3091 ObjectHashiterator(it->objectset, &it->it);
3095 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
3098 /* Get object with tags */
3099 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3100 struct ___Object___ *tagptr=obj->___tags___;
3101 if (tagptr->type==TAGTYPE) {
3102 if ((it->tagobjindex==0)&& /* First object */
3103 (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
3108 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3109 int tagindex=it->tagobjindex;
3110 for(;tagindex<ao->___cachedCode___;tagindex++) {
3111 struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
3112 if (td->flag==it->tagid) {
3113 it->tagobjindex=tagindex; /* Found right type of tag */
3119 } else if (it->numtags>0) {
3120 /* Use tags to locate appropriate objects */
3121 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3122 struct ___Object___ *objptr=tag->flagptr;
3124 if (objptr->type!=OBJECTARRAYTYPE) {
3125 if (it->tagobjindex>0)
3127 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
3129 for(i=1;i<it->numtags;i++) {
3130 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3131 if (!containstag(objptr,tag2))
3136 struct ArrayObject *ao=(struct ArrayObject *) objptr;
3139 for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
3140 struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
3141 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
3143 for(i=1;i<it->numtags;i++) {
3144 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3145 if (!containstag(objptr,tag2))
3148 it->tagobjindex=tagindex;
3153 it->tagobjindex=tagindex;
3157 return ObjhasNext(&it->it);
3161 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
3163 struct ___Object___ * objptr=tag->flagptr;
3164 if (objptr->type==OBJECTARRAYTYPE) {
3165 struct ArrayObject *ao=(struct ArrayObject *)objptr;
3166 for(j=0;j<ao->___cachedCode___;j++) {
3167 if (ptr==ARRAYGET(ao, struct ___Object___*, j))
3175 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
3176 /* hasNext has all of the intelligence */
3179 /* Get object with tags */
3180 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3181 struct ___Object___ *tagptr=obj->___tags___;
3182 if (tagptr->type==TAGTYPE) {
3184 objectarray[it->slot]=tagptr;
3186 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3187 objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
3189 } else if (it->numtags>0) {
3190 /* Use tags to locate appropriate objects */
3191 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3192 struct ___Object___ *objptr=tag->flagptr;
3193 if (objptr->type!=OBJECTARRAYTYPE) {
3195 objectarray[it->slot]=objptr;
3197 struct ArrayObject *ao=(struct ArrayObject *) objptr;
3198 objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
3201 /* Iterate object */
3202 objectarray[it->slot]=(void *)Objkey(&it->it);