add multi-thread simulator for multi-core version codes. Also add two new files in...
[IRC.git] / Robust / src / Runtime / multicoretask.c
1 #ifdef TASK
2 #include "runtime.h"
3 #include "structdefs.h"
4 #include "mem.h"
5 #include "checkpoint.h"
6 #include "Queue.h"
7 #include "SimpleHash.h"
8 #include "GenericHashtable.h"
9 #include <sys/select.h>
10 #include <sys/types.h>
11 #include <sys/mman.h>
12 #include <string.h>
13 #include <signal.h>
14 #include <assert.h>
15 #include <errno.h>
16 #ifdef RAW
17 #elif defined THREADSIMULATE
18 #if 0
19 #include <sys/mman.h> // for mmap
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23
24 int offset_transObj = 0;
25 #endif
26
27 // use POSIX message queue
28 // for each core, its message queue named as
29 // /msgqueue_corenum
30 #include <mqueue.h>
31 #include <sys/stat.h>
32 #endif
33 /*
34 extern int injectfailures;
35 extern float failurechance;
36 */
37 extern int debugtask;
38 extern int instaccum;
39
40 #ifdef CONSCHECK
41 #include "instrument.h"
42 #endif
43
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
47 struct RuntimeHash * forward;
48 struct RuntimeHash * reverse;
49
50 int corestatus[NUMCORES]; // records status of each core
51                           // 1: running tasks
52                                                   // 0: stall
53 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
54 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
55 #ifdef THREADSIMULATE
56 struct thread_data {
57         int corenum;
58         int argc;
59         char** argv;
60         int numsendobjs;
61         int numreceiveobjs;
62 };
63 struct thread_data thread_data_array[NUMCORES];
64 mqd_t mqd[NUMCORES];
65 static pthread_key_t key;
66 bool transStallMsg(int targetcore);
67 void transTerminateMsg(int targetcore);
68 void run(void * arg);
69 #endif
70
71 int main(int argc, char **argv) {
72 #ifdef THREADSIMULATE
73         int tids[NUMCORES];
74         int rc[NUMCORES];
75         pthread_t threads[NUMCORES];
76         int i = 0;
77
78         // initialize three arrays and msg queue array
79         char * pathhead = "/msgqueue_";
80         int targetlen = strlen(pathhead);
81         for(i = 0; i < NUMCORES; ++i) {
82                 corestatus[i] = 1;
83                 numsendobjs[i] = 0;
84                 numreceiveobjs[i] = 0;
85
86                 char corenumstr[3];
87                 int sourcelen = 0;
88                 if(i < 10) {
89                         corenumstr[0] = i + '0';
90                         corenumstr[1] = '\0';
91                         sourcelen = 1;
92                 } else if(i < 100) {
93                         corenumstr[1] = i %10 + '0';
94                         corenumstr[0] = (i / 10) + '0';
95                         corenumstr[2] = '\0';
96                         sourcelen = 2;
97                 } else {
98                         printf("Error: i >= 100\n");
99                         fflush(stdout);
100                         exit(-1);
101                 }
102                 char path[targetlen + sourcelen + 1];
103                 strcpy(path, pathhead);
104                 strncat(path, corenumstr, sourcelen);
105                 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
106                 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
107                 mq_unlink(path);
108                 mqd[i]= mq_open(path, oflags, omodes, NULL);
109         }
110
111         // create the key
112         pthread_key_create(&key, NULL);
113
114 /*      if(argc < 2) {
115                 printf("Usage: <bin> <corenum>\n");
116                 fflush(stdout);
117                 exit(-1);
118         }
119
120         int cnum = 0;
121         char * number = argv[1];
122         int len = strlen(number);
123         for(i = 0; i < len; ++i) {
124                 cnum = (number[i] - '0') + cnum * 10;   
125         }
126 */
127         for(i = 0; i < NUMCORES; ++i) {
128         /*      if(STARTUPCORE == i) {
129                         continue;
130                 }*/
131                 thread_data_array[i].corenum = i;
132                 thread_data_array[i].argc = argc;
133                 thread_data_array[i].argv = argv;
134                 thread_data_array[i].numsendobjs = 0;
135                 thread_data_array[i].numreceiveobjs = 0;
136                 printf("In main: creating thread %d\n", i);
137                 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
138         if (rc[i]){
139                         printf("ERROR; return code from pthread_create() is %d\n", rc[i]);
140                         fflush(stdout);
141                         exit(-1);
142                 }
143         }//*/
144         
145         /*// do stuff of startup core
146         thread_data_array[STARTUPCORE].corenum = STARTUPCORE;
147         thread_data_array[STARTUPCORE].argc = argc;// - 1;
148         thread_data_array[STARTUPCORE].argv = argv;//&argv[1];
149         thread_data_array[STARTUPCORE].numsendobjs = 0;
150         thread_data_array[STARTUPCORE].numreceiveobjs = 0;
151         run(&thread_data_array[STARTUPCORE]);*/
152         pthread_exit(NULL);
153 }
154
155 void run(void* arg) {
156         struct thread_data * my_tdata = (struct thread_data *)arg;
157         //corenum = my_tdata->corenum;
158         //void * ptr = malloc(sizeof(int));
159         //*((int*)ptr) = my_tdata->corenum;
160         pthread_setspecific(key, (void *)my_tdata->corenum);
161         int argc = my_tdata->argc;
162         char** argv = my_tdata->argv;
163         printf("Thread %d runs: %x\n", my_tdata->corenum, (int)pthread_self());
164         fflush(stdout);
165
166 #endif
167
168 #ifdef BOEHM_GC
169   GC_init(); // Initialize the garbage collector
170 #endif
171 #ifdef CONSCHECK
172   initializemmap();
173 #endif
174   processOptions();
175   initializeexithandler();
176   /* Create table for failed tasks */
177   failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd, 
178                                    (int (*)(void *,void *)) &comparetpd);
179   /* Create queue of active tasks */
180   activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd, 
181                                    (int (*)(void *,void *)) &comparetpd);
182   
183   /* Process task information */
184   processtasks();
185
186   /* Create startup object */
187   createstartupobject(argc, argv);
188
189   /* Start executing the tasks */
190   executetasks();
191
192 #ifdef THREADSIMULATE
193
194   int i = 0;
195   // check if there are new objects coming
196   bool sendStall = false;
197
198   int numofcore = pthread_getspecific(key);
199   while(true) {
200           switch(receiveObject()) {
201                   case 0: {
202                                           printf("[run] receive an object\n");
203                                           sendStall = false;
204                                           // received an object
205                                           // check if there are new active tasks can be executed
206                                           executetasks();
207                                           break;
208                                   }
209                   case 1: {
210                                           //printf("[run] no msg\n");
211                                           // no msg received
212                                           if(STARTUPCORE == numofcore) {
213                                                   corestatus[numofcore] = 0;
214                                                   // check the status of all cores
215                                                   bool allStall = true;
216                                                   for(i = 0; i < NUMCORES; ++i) {
217                                                           if(corestatus[i] != 0) {
218                                                                   allStall = false;
219                                                                   break;
220                                                           }
221                                                   }
222                                                   if(allStall) {
223                                                           // check if the sum of send objs and receive obj are the same
224                                                           // yes->terminate
225                                                           // no->go on executing
226                                                           int sumsendobj = 0;
227                                                           for(i = 0; i < NUMCORES; ++i) {
228                                                                   sumsendobj += numsendobjs[i];
229                                                           }
230                                                           for(i = 0; i < NUMCORES; ++i) {
231                                                                   sumsendobj -= numreceiveobjs[i];
232                                                           }
233                                                           if(0 == sumsendobj) {
234                                                                   // terminate
235                                                                   // TODO
236                                                                  /* for(i = 0; i < NUMCORES; ++i) {
237                                                                           if(i != corenum) {
238                                                                                   transTerminateMsg(i);
239                                                                           }
240                                                                   }
241                                                                   mq_close(mqd[corenum]);*/
242                                                                   printf("[run] terminate!\n");
243                                                                   fflush(stdout);
244                                                                   exit(0);
245                                                           }
246                                                   }
247                                           } else {
248                                                   if(!sendStall) {
249                                                           // send StallMsg to startup core
250                                                           sendStall = transStallMsg(STARTUPCORE);
251                                                   }
252                                           }
253                                           break;
254                                   }
255                   case 2: {
256                                           printf("[run] receive a stall msg\n");
257                                           // receive a Stall Msg, do nothing
258                                           assert(STARTUPCORE == numofcore); // only startup core can receive such msg
259                                           sendStall = false;
260                                           break;
261                                   }
262                  /* case 3: {
263                                           printf("[run] receive a terminate msg\n");
264                                           // receive a terminate Msg
265                                           assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
266                                           mq_close(mqd[corenum]);
267                                           fflush(stdout);
268                                           exit(0);
269                                           break;
270                                   }*/
271                   default: {
272                                            printf("Error: invalid message type.\n");
273                                            fflush(stdout);
274                                            exit(-1);
275                                            break;
276                                    }
277           }
278   }
279 #endif
280 }
281
282 void createstartupobject(int argc, char ** argv) {
283   int i;
284   
285   /* Allocate startup object     */
286 #ifdef PRECISE_GC
287   struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
288   struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1); 
289 #else
290   struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
291   struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1); 
292 #endif
293   /* Build array of strings */
294   startupobject->___parameters___=stringarray;
295   for(i=1;i<argc;i++) {
296     int length=strlen(argv[i]);
297 #ifdef PRECISE_GC
298     struct ___String___ *newstring=NewString(NULL, argv[i],length);
299 #else
300     struct ___String___ *newstring=NewString(argv[i],length);
301 #endif
302     ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
303   }
304   
305   /* Set initialized flag for startup object */ 
306   flagorandinit(startupobject,1,0xFFFFFFFF);
307   enqueueObject(startupobject, NULL, 0);
308   //enqueueObject(startupobject, objq4startupobj[corenum], numqueues4startupobj[corenum]);
309 }
310
311 int hashCodetpd(struct taskparamdescriptor *ftd) {
312   int hash=(int)ftd->task;
313   int i;
314   for(i=0;i<ftd->numParameters;i++){ 
315     hash^=(int)ftd->parameterArray[i];
316   }
317   return hash;
318 }
319
320 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
321   int i;
322   if (ftd1->task!=ftd2->task)
323     return 0;
324   for(i=0;i<ftd1->numParameters;i++)
325     if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
326       return 0;
327   return 1;
328 }
329
330 /* This function sets a tag. */
331 #ifdef PRECISE_GC
332 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
333 #else
334 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
335 #endif
336   struct ___Object___ * tagptr=obj->___tags___;
337   if (tagptr==NULL) {
338     obj->___tags___=(struct ___Object___ *)tagd;
339   } else {
340     /* Have to check if it is already set */
341     if (tagptr->type==TAGTYPE) {
342       struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
343       if (td==tagd)
344         return;
345 #ifdef PRECISE_GC
346       int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
347       struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
348       obj=(struct ___Object___ *)ptrarray[2];
349       tagd=(struct ___TagDescriptor___ *)ptrarray[3];
350       td=(struct ___TagDescriptor___ *) obj->___tags___;
351 #else
352       struct ArrayObject * ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
353 #endif
354       ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
355       ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
356       obj->___tags___=(struct ___Object___ *) ao;
357       ao->___cachedCode___=2;
358     } else {
359       /* Array Case */
360       int i;
361       struct ArrayObject *ao=(struct ArrayObject *) tagptr;
362       for(i=0;i<ao->___cachedCode___;i++) {
363         struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
364         if (td==tagd)
365           return;
366       }
367       if (ao->___cachedCode___<ao->___length___) {
368         ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
369         ao->___cachedCode___++;
370       } else {
371 #ifdef PRECISE_GC
372         int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
373         struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
374         obj=(struct ___Object___ *)ptrarray[2];
375         tagd=(struct ___TagDescriptor___ *) ptrarray[3];
376         ao=(struct ArrayObject *)obj->___tags___;
377 #else
378         struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
379 #endif
380         aonew->___cachedCode___=ao->___length___+1;
381         for(i=0;i<ao->___length___;i++) {
382           ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
383         }
384         ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
385       }
386     }
387   }
388
389   {
390     struct ___Object___ * tagset=tagd->flagptr;
391     if(tagset==NULL) {
392       tagd->flagptr=obj;
393     } else if (tagset->type!=OBJECTARRAYTYPE) {
394 #ifdef PRECISE_GC
395       int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
396       struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
397       obj=(struct ___Object___ *)ptrarray[2];
398       tagd=(struct ___TagDescriptor___ *)ptrarray[3];
399 #else
400       struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
401 #endif
402       ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
403       ARRAYSET(ao, struct ___Object___ *, 1, obj);
404       ao->___cachedCode___=2;
405       tagd->flagptr=(struct ___Object___ *)ao;
406     } else {
407       struct ArrayObject *ao=(struct ArrayObject *) tagset;
408       if (ao->___cachedCode___<ao->___length___) {
409         ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
410       } else {
411         int i;
412 #ifdef PRECISE_GC
413         int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
414         struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
415         obj=(struct ___Object___ *)ptrarray[2];
416         tagd=(struct ___TagDescriptor___ *)ptrarray[3];
417         ao=(struct ArrayObject *)tagd->flagptr;
418 #else
419         struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
420 #endif
421         aonew->___cachedCode___=ao->___cachedCode___+1;
422         for(i=0;i<ao->___length___;i++) {
423           ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
424         }
425         ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
426         tagd->flagptr=(struct ___Object___ *) aonew;
427       }
428     }
429   }
430 }
431
432 /* This function clears a tag. */
433 #ifdef PRECISE_GC
434 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
435 #else
436 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
437 #endif
438   /* We'll assume that tag is alway there.
439      Need to statically check for this of course. */
440   struct ___Object___ * tagptr=obj->___tags___;
441
442   if (tagptr->type==TAGTYPE) {
443     if ((struct ___TagDescriptor___ *)tagptr==tagd)
444       obj->___tags___=NULL;
445     else
446       printf("ERROR 1 in tagclear\n");
447   } else {
448     struct ArrayObject *ao=(struct ArrayObject *) tagptr;
449     int i;
450     for(i=0;i<ao->___cachedCode___;i++) {
451       struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
452       if (td==tagd) {
453         ao->___cachedCode___--;
454         if (i<ao->___cachedCode___)
455           ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
456         ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
457         if (ao->___cachedCode___==0)
458           obj->___tags___=NULL;
459         goto PROCESSCLEAR;
460       }
461     }
462     printf("ERROR 2 in tagclear\n");
463   }
464  PROCESSCLEAR:
465   {
466     struct ___Object___ *tagset=tagd->flagptr;
467     if (tagset->type!=OBJECTARRAYTYPE) {
468       if (tagset==obj)
469         tagd->flagptr=NULL;
470       else
471         printf("ERROR 3 in tagclear\n");
472     } else {
473       struct ArrayObject *ao=(struct ArrayObject *) tagset;
474       int i;
475       for(i=0;i<ao->___cachedCode___;i++) {
476         struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
477         if (tobj==obj) {
478           ao->___cachedCode___--;
479           if (i<ao->___cachedCode___)
480             ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
481           ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
482           if (ao->___cachedCode___==0)
483             tagd->flagptr=NULL;
484           goto ENDCLEAR;
485         }
486       }
487       printf("ERROR 4 in tagclear\n");
488     }
489   }
490  ENDCLEAR:
491   return;
492 }
493  
494 /* This function allocates a new tag. */
495 #ifdef PRECISE_GC
496 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
497   struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
498 #else
499 struct ___TagDescriptor___ * allocate_tag(int index) {
500   struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
501 #endif
502   v->type=TAGTYPE;
503   v->flag=index;
504   return v;
505
506
507
508
509 /* This function updates the flag for object ptr.  It or's the flag
510    with the or mask and and's it with the andmask. */
511
512 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
513  
514  int flagcomp(const int *val1, const int *val2) {
515    return (*val1)-(*val2);
516  } 
517
518 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
519     {
520       int oldflag=((int *)ptr)[1];
521       int flag=ormask|oldflag;
522       flag&=andmask;
523           flagbody(ptr, flag, queues, length, false);
524     }
525 }
526  
527 bool intflagorand(void * ptr, int ormask, int andmask) {
528     {
529       int oldflag=((int *)ptr)[1];
530       int flag=ormask|oldflag;
531       flag&=andmask;
532       if (flag==oldflag) /* Don't do anything */
533         return false;
534       else {
535                  flagbody(ptr, flag, NULL, 0, false);
536                  return true;
537           }
538     }
539 }
540
541 void flagorandinit(void * ptr, int ormask, int andmask) {
542   int oldflag=((int *)ptr)[1];
543   int flag=ormask|oldflag;
544   flag&=andmask;
545   flagbody(ptr,flag,NULL,0,true);
546 }
547
548 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
549   struct parameterwrapper * flagptr = NULL;
550   int i = 0;
551   struct parameterwrapper ** queues = vqueues;
552   int length = vlength;
553   if((!isnew) && (queues == NULL)) {
554 #ifdef THREADSIMULATE
555           int numofcore = pthread_getspecific(key);
556           queues = objectqueues[numofcore][ptr->type];
557           length = numqueues[numofcore][ptr->type];
558 #else
559           queues = objectqueues[corenum][ptr->type];
560           length = numqueues[corenum][ptr->type];
561 #endif
562   }
563   ptr->flag=flag;
564   
565   /*Remove object from all queues */
566   for(i = 0; i < length; ++i) {
567           flagptr = queues[i];
568         int next;
569     int UNUSED, UNUSED2;
570     int * enterflags;
571     ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
572     ObjectHashremove(flagptr->objectset, (int)ptr);
573     if (enterflags!=NULL)
574       free(enterflags);
575   }
576  }
577
578  void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
579    struct ___Object___ *ptr = (struct ___Object___ *)vptr;
580   
581   {
582     struct QueueItem *tmpptr;
583         struct parameterwrapper * parameter=NULL;
584         int j;
585         struct parameterwrapper ** queues = vqueues;
586         int length = vlength;
587         if(queues == NULL) {
588 #ifdef THREADSIMULATE
589                 int numofcore = pthread_getspecific(key);
590                 queues = objectqueues[numofcore][ptr->type];
591                 length = numqueues[numofcore][ptr->type];
592 #else
593                 queues = objectqueues[corenum][ptr->type];
594                 length = numqueues[corenum][ptr->type];
595 #endif
596         }
597     int i;
598     struct parameterwrapper * prevptr=NULL;
599     struct ___Object___ *tagptr=ptr->___tags___;
600     
601     /* Outer loop iterates through all parameter queues an object of
602        this type could be in.  */
603     
604         for(j = 0; j < length; ++j) {
605                 parameter = queues[j];
606       /* Check tags */
607       if (parameter->numbertags>0) {
608         if (tagptr==NULL)
609           goto nextloop;//that means the object has no tag but that param needs tag
610         else if(tagptr->type==TAGTYPE) {//one tag
611           struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
612           for(i=0;i<parameter->numbertags;i++) {
613             //slotid is parameter->tagarray[2*i];
614             int tagid=parameter->tagarray[2*i+1];
615             if (tagid!=tagptr->flag)
616               goto nextloop; /*We don't have this tag */          
617            }
618         } else {//multiple tags
619           struct ArrayObject * ao=(struct ArrayObject *) tagptr;
620           for(i=0;i<parameter->numbertags;i++) {
621             //slotid is parameter->tagarray[2*i];
622             int tagid=parameter->tagarray[2*i+1];
623             int j;
624             for(j=0;j<ao->___cachedCode___;j++) {
625               if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
626                 goto foundtag;
627             }
628             goto nextloop;
629           foundtag:
630             ;
631           }
632         }
633       }
634       
635       /* Check flags */
636       for(i=0;i<parameter->numberofterms;i++) {
637         int andmask=parameter->intarray[i*2];
638         int checkmask=parameter->intarray[i*2+1];
639         if ((ptr->flag&andmask)==checkmask) {
640           enqueuetasks(parameter, prevptr, ptr, NULL, 0);
641           prevptr=parameter;
642           break;
643         }
644       }
645     nextloop:
646           ;
647     }
648   }
649 }
650
651 // transfer an object to targetcore
652 // format: object
653 void transferObject(void * obj, int targetcore) {
654         int type=((int *)obj)[0];
655         assert(type < NUMCLASSES); // can only transfer normal object
656     int size=classsize[type];
657
658 #ifdef RAW
659
660 #elif defined THREADSIMULATE
661 #if 0
662     // use shared memory to transfer objects between cores
663         int fd = 0; // mapped file
664         void * p_map = NULL;
665         char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
666         int offset;
667         // open the file 
668         fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
669         offset = lseek(fd, 0, SEEK_CUR);
670         if(offset == -1) {
671                 printf("fail to open file " + filepath + " in transferObject.\n");
672                 fflush(stdout);
673                 exit(-1);
674         }
675         lseek(fd, size + sizeof(int)*2, SEEK_CUR);
676         write(fd, "", 1); 
677         p_map = (void *)mmap(NULL,size+sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
678         close(fd);
679         memcpy(p_map, type, sizeof(int));
680         memcpy(p_map+sizeof(int), corenum, sizeof(int));
681         memcpy((p_map+sizeof(int)*2), obj, size);
682         munmap(p_map, size+sizeof(int)*2); 
683         //printf( "umap ok \n" );
684 #endif
685
686         // use POSIX message queue to transfer objects between cores
687         mqd_t mqdnum;
688         char corenumstr[3];
689         int sourcelen = 0;
690         if(targetcore < 10) {
691                 corenumstr[0] = targetcore + '0';
692                 corenumstr[1] = '\0';
693                 sourcelen = 1;
694         } else if(targetcore < 100) {
695                 corenumstr[1] = targetcore % 10 + '0';
696                 corenumstr[0] = (targetcore / 10) + '0';
697                 corenumstr[2] = '\0';
698                 sourcelen = 2;
699         } else {
700                 printf("Error: targetcore >= 100\n");
701                 fflush(stdout);
702                 exit(-1);
703         }
704         char * pathhead = "/msgqueue_";
705         int targetlen = strlen(pathhead);
706         char path[targetlen + sourcelen + 1];
707         strcpy(path, pathhead);
708         strncat(path, corenumstr, sourcelen);
709         int oflags = O_WRONLY|O_CREAT|O_NONBLOCK;
710         int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
711         mqdnum = mq_open(path, oflags, omodes, NULL);
712         if(mqdnum==-1) {
713                 printf("[transferObject] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
714                 fflush(stdout);
715                 exit(-1);
716         }
717         int ret;
718         do {
719                 ret=mq_send(mqdnum, obj, size, 0); // send the object into the queue
720                 if(ret != 0) {
721                         printf("[transferObject] mq_send returned: %d, error: %s\n", ret, strerror(errno));
722                 }
723         }while(ret!=0);
724         int numofcore = pthread_getspecific(key);
725         if(numofcore == STARTUPCORE) {
726                 ++numsendobjs[numofcore];
727         } else {
728                 ++(thread_data_array[numofcore].numsendobjs);
729         }
730         printf("[transferObject] mq_send returned: $%x\n",ret);
731 #endif
732 }
733
734 // send terminate message to targetcore
735 // format: -1
736 bool transStallMsg(int targetcore) {
737         struct ___Object___ newobj;
738         // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
739         newobj.type = -1;
740
741 #ifdef RAW
742         newobj.flag = corenum;
743         newobj.___cachedHash___ = thread_data_array[corenum].numsendobjs;
744         newobj.___cachedCode___ = thread_data_array[corenum].numreceiveobjs;
745
746 #elif defined THREADSIMULATE
747         int numofcore = pthread_getspecific(key);
748         newobj.flag = numofcore;
749         newobj.___cachedHash___ = thread_data_array[numofcore].numsendobjs;
750         newobj.___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
751 #if 0
752     // use shared memory to transfer objects between cores
753         int fd = 0; // mapped file
754         void * p_map = NULL;
755         char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
756         int offset;
757         // open the file 
758         fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
759         offset = lseek(fd, 0, SEEK_CUR);
760         if(offset == -1) {
761                 printf("fail to open file " + filepath + " in transferObject.\n");
762                 fflush(stdout);
763                 exit(-1);
764         }
765         lseek(fd, sizeof(int)*2, SEEK_CUR);
766         write(fd, "", 1); 
767         p_map = (void *)mmap(NULL,sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
768         close(fd);
769         memcpy(p_map, type, sizeof(int));
770         memcpy(p_map+sizeof(int), corenum, sizeof(int));
771         munmap(p_map, sizeof(int)*2); 
772         //printf( "umap ok \n" );
773 #endif
774
775         // use POSIX message queue to send stall msg to startup core
776         assert(targetcore == STARTUPCORE);
777         mqd_t mqdnum;
778         char corenumstr[3];
779         int sourcelen = 0;
780         if(targetcore < 10) {
781                 corenumstr[0] = targetcore + '0';
782                 corenumstr[1] = '\0';
783                 sourcelen = 1;
784         } else if(targetcore < 100) {
785                 corenumstr[1] = targetcore % 10 + '0';
786                 corenumstr[0] = (targetcore / 10) + '0';
787                 corenumstr[2] = '\0';
788                 sourcelen = 2;
789         } else {
790                 printf("Error: targetcore >= 100\n");
791                 fflush(stdout);
792                 exit(-1);
793         }
794         char * pathhead = "/msgqueue_";
795         int targetlen = strlen(pathhead);
796         char path[targetlen + sourcelen + 1];
797         strcpy(path, pathhead);
798         strncat(path, corenumstr, sourcelen);
799         int oflags = O_WRONLY|O_CREAT|O_NONBLOCK;
800         int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
801         mqdnum = mq_open(path, oflags, omodes, NULL);
802         if(mqdnum==-1) {
803                 printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
804                 fflush(stdout);
805                 exit(-1);
806         }
807         int ret;
808         ret=mq_send(mqdnum, (void *)&newobj, sizeof(struct ___Object___), 0); // send the object into the queue
809         if(ret != 0) {
810                 printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno));
811                 return false;
812         } else {
813                 printf("[transStallMsg] mq_send returned: $%x\n", ret);
814                 printf("index: %d, sendobjs: %d, receiveobjs: %d\n", newobj.flag, newobj.___cachedHash___, newobj.___cachedCode___);
815                 return true;
816         }
817 #endif
818 }
819 #if 0
820 // send terminate message to targetcore
821 // format: -1
822 void transTerminateMsg(int targetcore) {
823         // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
824         int type = -2;
825
826 #ifdef RAW
827
828 #elif defined THREADSIMULATE
829
830         // use POSIX message queue to send stall msg to startup core
831         assert(targetcore != STARTUPCORE);
832         mqd_t mqdnum;
833         char corenumstr[3];
834         int sourcelen = 0;
835         if(targetcore < 10) {
836                 corenumstr[0] = targetcore + '0';
837                 corenumstr[1] = '\0';
838                 sourcelen = 1;
839         } else if(corenum < 100) {
840                 corenumstr[1] = targetcore % 10 + '0';
841                 corenumstr[0] = (targetcore / 10) + '0';
842                 corenumstr[2] = '\0';
843                 sourcelen = 2;
844         } else {
845                 printf("Error: targetcore >= 100\n");
846                 fflush(stdout);
847                 exit(-1);
848         }
849         char * pathhead = "/msgqueue_";
850         int targetlen = strlen(pathhead);
851         char path[targetlen + sourcelen + 1];
852         strcpy(path, pathhead);
853         strncat(path, corenumstr, sourcelen);
854         int oflags = O_WRONLY|O_CREAT|O_NONBLOCK;
855         int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
856         mqdnum = mq_open(path, oflags, omodes, NULL);
857         if(mqdnum==-1) {
858                 printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
859                 fflush(stdout);
860                 exit(-1);
861         }
862         int ret;
863         do {
864                 ret=mq_send(mqdnum, (void *)&type, sizeof(int), 0); // send the object into the queue
865                 if(ret != 0) {
866                         printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno));
867                 }
868         }while(ret != 0);
869 #endif
870 }
871 #endif
872 // receive object transferred from other cores
873 // or the terminate message from other cores
874 // format: type [+ object]
875 // type: -1--stall msg
876 //      !-1--object
877 // return value: 0--received an object
878 //               1--received nothing
879 //               2--received a Stall Msg
880 int receiveObject() {
881 #ifdef RAW
882
883 #elif defined THREADSIMULATE
884 #if 0
885     char * filepath = "/scratch/transObj/file_" + corenum + ".txt";
886         int fd = 0;
887         void * p_map = NULL;
888         int type = 0;
889         int sourcecorenum = 0;
890         int size = 0;
891         fd = open(filepath, O_CREAT|O_RDONLY, 00777);
892         lseek(fd, offset_transObj, SEEK_SET);
893         p_map = (void*)mmap(NULL,sizeof(int)*2,PROT_READ,MAP_SHARED,fd,offset_transObj);
894         type = *(int*)p_map;
895         sourcecorenum = *(int*)(p_map+sinzeof(int));
896         offset_transObj += sizeof(int)*2;
897         munmap(p_map,sizeof(int)*2);
898         if(type == -1) {
899                 // sourecorenum has terminated
900                 ++offset_transObj;
901                 return;
902         }
903         size = classsize[type];
904         p_map = (void*)mmap(NULL,size,PROT_READ,MAP_SHARED,fd,offset_transObj);
905         struct ___Object___ * newobj=RUNMALLOC(size);
906     memcpy(newobj, p_map, size);
907         ++offset_transObj;
908         enqueueObject(newobj,NULL,0);
909 #endif
910         int numofcore = pthread_getspecific(key);
911         // use POSIX message queue to transfer object
912         int msglen = 0;
913         struct mq_attr mqattr;
914         mq_getattr(mqd[numofcore], &mqattr);
915         void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
916         msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
917         if(-1 == msglen) {
918                 // no msg
919                 free(msgptr);
920                 return 1;
921         }
922         //printf("msg: %s\n",msgptr);
923         if(((int*)msgptr)[0] == -1) {
924                 // StallMsg
925                 int* tmpptr = (int*)msgptr;
926                 int index = tmpptr[1];
927                 corestatus[index] = 0;
928                 numsendobjs[index] = tmpptr[2];
929                 numreceiveobjs[index] = ((int *)(msgptr + sizeof(int) * 3 + sizeof(void *)))[0];
930                 printf("index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
931                 free(msgptr);
932                 return 2;
933         } /*else if(((int*)msgptr)[0] == -2) {
934                 // terminate msg
935                 return 3;
936         } */else {
937                 // an object
938                 if(numofcore == STARTUPCORE) {
939                         ++(numreceiveobjs[numofcore]);
940                 } else {
941                         ++(thread_data_array[numofcore].numreceiveobjs);
942                 }
943                 struct ___Object___ * newobj=RUNMALLOC(msglen);
944                 memcpy(newobj, msgptr, msglen);
945                 free(msgptr);
946                 enqueueObject(newobj, NULL, 0);
947                 return 0;
948         }
949 #endif
950 }
951
952 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
953   void * taskpointerarray[MAXTASKPARAMS];
954   int j;
955   int numparams=parameter->task->numParameters;
956   int numiterators=parameter->task->numTotal-1;
957   int retval=1;
958   int addnormal=1;
959   int adderror=1;
960
961   struct taskdescriptor * task=parameter->task;
962
963         ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
964  
965   /* Add enqueued object to parameter vector */
966   taskpointerarray[parameter->slot]=ptr;
967
968   /* Reset iterators */
969   for(j=0;j<numiterators;j++) {
970     toiReset(&parameter->iterators[j]);
971   }
972
973   /* Find initial state */
974   for(j=0;j<numiterators;j++) {
975   backtrackinit:
976     if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
977       toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
978     else if (j>0) {
979       /* Need to backtrack */
980       toiReset(&parameter->iterators[j]);
981       j--;
982       goto backtrackinit;
983     } else {
984       /* Nothing to enqueue */
985       return retval;
986     }
987   }
988
989   
990   while(1) {
991     /* Enqueue current state */
992     int launch = 0;
993     struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
994     tpd->task=task;
995     tpd->numParameters=numiterators+1;
996     tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
997     for(j=0;j<=numiterators;j++){
998       tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
999     }
1000     /* Enqueue task */
1001     if ((!gencontains(failedtasks, tpd)&&!gencontains(activetasks,tpd))) {
1002       genputtable(activetasks, tpd, tpd);
1003     } else {
1004       RUNFREE(tpd->parameterArray);
1005       RUNFREE(tpd);
1006     }
1007     
1008     /* This loop iterates to the next parameter combination */
1009     if (numiterators==0)
1010       return retval;
1011
1012     for(j=numiterators-1; j<numiterators;j++) {
1013     backtrackinc:
1014       if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
1015         toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
1016       else if (j>0) {
1017         /* Need to backtrack */
1018         toiReset(&parameter->iterators[j]);
1019         j--;
1020         goto backtrackinc;
1021       } else {
1022         /* Nothing more to enqueue */
1023         return retval;
1024       }
1025     }
1026   }
1027   return retval;
1028 }
1029  
1030 /* Handler for signals. The signals catch null pointer errors and
1031    arithmatic errors. */
1032
1033 void myhandler(int sig, siginfo_t *info, void *uap) {
1034   sigset_t toclear;
1035 #ifdef DEBUG
1036   printf("sig=%d\n",sig);
1037   printf("signal\n");
1038 #endif
1039   sigemptyset(&toclear);
1040   sigaddset(&toclear, sig);
1041   sigprocmask(SIG_UNBLOCK, &toclear,NULL); 
1042   longjmp(error_handler,1);
1043 }
1044
1045 fd_set readfds;
1046 int maxreadfd;
1047 struct RuntimeHash *fdtoobject;
1048
1049 void addreadfd(int fd) {
1050   if (fd>=maxreadfd)
1051     maxreadfd=fd+1;
1052   FD_SET(fd, &readfds);
1053 }
1054
1055 void removereadfd(int fd) {
1056   FD_CLR(fd, &readfds);
1057   if (maxreadfd==(fd+1)) {
1058     maxreadfd--;
1059     while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
1060       maxreadfd--;
1061   }
1062 }
1063
1064 #ifdef PRECISE_GC
1065 #define OFFSET 2
1066 #else
1067 #define OFFSET 0
1068 #endif
1069
1070 void executetasks() {
1071   void * taskpointerarray[MAXTASKPARAMS+OFFSET];
1072
1073   /* Set up signal handlers */
1074   struct sigaction sig;
1075   sig.sa_sigaction=&myhandler;
1076   sig.sa_flags=SA_SIGINFO;
1077   sigemptyset(&sig.sa_mask);
1078
1079   /* Catch bus errors, segmentation faults, and floating point exceptions*/
1080   sigaction(SIGBUS,&sig,0);
1081   sigaction(SIGSEGV,&sig,0);
1082   sigaction(SIGFPE,&sig,0);
1083   sigaction(SIGPIPE,&sig,0);
1084
1085   /* Zero fd set */
1086   FD_ZERO(&readfds);
1087   maxreadfd=0;
1088   fdtoobject=allocateRuntimeHash(100);
1089
1090   /* Map first block of memory to protected, anonymous page */
1091   mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
1092
1093   newtask:
1094   while((hashsize(activetasks)>0)||(maxreadfd>0)) {
1095
1096     /* Check if any filedescriptors have IO pending */
1097     if (maxreadfd>0) {
1098       int i;
1099       struct timeval timeout={0,0};
1100       fd_set tmpreadfds;
1101       int numselect;
1102       tmpreadfds=readfds;
1103       numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
1104       if (numselect>0) {
1105         /* Process ready fd's */
1106         int fd;
1107         for(fd=0;fd<maxreadfd;fd++) {
1108           if (FD_ISSET(fd, &tmpreadfds)) {
1109             /* Set ready flag on object */
1110             void * objptr;
1111             //      printf("Setting fd %d\n",fd);
1112             if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
1113               if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
1114                          enqueueObject(objptr, NULL, 0);
1115                   }
1116             }
1117           }
1118         }
1119       }
1120     }
1121
1122     /* See if there are any active tasks */
1123     if (hashsize(activetasks)>0) {
1124       int i;
1125       currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
1126       genfreekey(activetasks, currtpd);
1127       
1128       /* Check if this task has failed, allow a task that contains optional objects to fire */
1129       if (gencontains(failedtasks, currtpd)) {
1130         // Free up task parameter descriptor
1131         RUNFREE(currtpd->parameterArray);
1132         RUNFREE(currtpd);
1133         goto newtask;
1134       }
1135       int numparams=currtpd->task->numParameters;
1136       int numtotal=currtpd->task->numTotal;
1137       
1138       /* Make sure that the parameters are still in the queues */
1139       for(i=0;i<numparams;i++) {
1140         void * parameter=currtpd->parameterArray[i];
1141         struct parameterdescriptor * pd=currtpd->task->descriptorarray[i];
1142         struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
1143         int j;
1144         /* Check that object is still in queue */
1145         {
1146           if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
1147             RUNFREE(currtpd->parameterArray);
1148             RUNFREE(currtpd);
1149             goto newtask;
1150           }
1151         }
1152       parameterpresent:
1153         ;
1154         /* Check that object still has necessary tags */
1155         for(j=0;j<pd->numbertags;j++) {
1156           int slotid=pd->tagarray[2*j]+numparams;
1157           struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
1158           if (!containstag(parameter, tagd)) {
1159             RUNFREE(currtpd->parameterArray);
1160             RUNFREE(currtpd);
1161             goto newtask;
1162           }
1163         }
1164         
1165         taskpointerarray[i+OFFSET]=parameter;
1166       }
1167       /* Copy the tags */
1168       for(;i<numtotal;i++) {
1169         taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
1170       }
1171
1172       {
1173         /* Checkpoint the state */
1174         forward=allocateRuntimeHash(100);
1175         reverse=allocateRuntimeHash(100);
1176         //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
1177         int x;
1178         if (x=setjmp(error_handler)) {
1179           int counter;
1180           /* Recover */
1181           
1182 #ifdef DEBUG
1183           printf("Fatal Error=%d, Recovering!\n",x);
1184 #endif
1185          /*
1186           genputtable(failedtasks,currtpd,currtpd);
1187           //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
1188
1189           freeRuntimeHash(forward);
1190           freeRuntimeHash(reverse);
1191           freemalloc();
1192           forward=NULL;
1193           reverse=NULL;
1194           */
1195           fflush(stdout);
1196           exit(-1);
1197         } else {
1198           /*if (injectfailures) {
1199             if ((((double)random())/RAND_MAX)<failurechance) {
1200               printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
1201               longjmp(error_handler,10);
1202             }
1203           }*/
1204           /* Actually call task */
1205 #ifdef PRECISE_GC
1206           ((int *)taskpointerarray)[0]=currtpd->numParameters;
1207           taskpointerarray[1]=NULL;
1208 #endif
1209           if(debugtask){
1210             printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1211             ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
1212             printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1213           } else
1214             ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
1215
1216           freeRuntimeHash(forward);
1217           freeRuntimeHash(reverse);
1218           freemalloc();
1219           // Free up task parameter descriptor
1220           RUNFREE(currtpd->parameterArray);
1221           RUNFREE(currtpd);
1222           forward=NULL;
1223           reverse=NULL;
1224         }
1225       }
1226     }
1227   }
1228 }
1229  
1230 /* This function processes an objects tags */
1231 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
1232   int i;
1233   
1234   for(i=0;i<pd->numbertags;i++) {
1235     int slotid=pd->tagarray[2*i];
1236     int tagid=pd->tagarray[2*i+1];
1237     
1238     if (statusarray[slotid+numparams]==0) {
1239       parameter->iterators[*iteratorcount].istag=1;
1240       parameter->iterators[*iteratorcount].tagid=tagid;
1241       parameter->iterators[*iteratorcount].slot=slotid+numparams;
1242       parameter->iterators[*iteratorcount].tagobjectslot=index;
1243       statusarray[slotid+numparams]=1;
1244       (*iteratorcount)++;
1245     }
1246   }
1247 }
1248
1249
1250 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
1251   int i;
1252   int tagcount=0;
1253   struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
1254
1255   parameter->iterators[*iteratorcount].istag=0;
1256   parameter->iterators[*iteratorcount].slot=index;
1257   parameter->iterators[*iteratorcount].objectset=objectset;
1258   statusarray[index]=1;
1259
1260   for(i=0;i<pd->numbertags;i++) {
1261     int slotid=pd->tagarray[2*i];
1262     int tagid=pd->tagarray[2*i+1];
1263     if (statusarray[slotid+numparams]!=0) {
1264       /* This tag has already been enqueued, use it to narrow search */
1265       parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
1266       tagcount++;
1267     }
1268   }
1269   parameter->iterators[*iteratorcount].numtags=tagcount;
1270
1271   (*iteratorcount)++;
1272 }
1273
1274 /* This function builds the iterators for a task & parameter */
1275
1276 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
1277   int statusarray[MAXTASKPARAMS];
1278   int i;
1279   int numparams=task->numParameters;
1280   int iteratorcount=0;
1281   for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
1282
1283   statusarray[index]=1; /* Initial parameter */
1284   /* Process tags for initial iterator */
1285   
1286   processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
1287   
1288   while(1) {
1289   loopstart:
1290     /* Check for objects with existing tags */
1291     for(i=0;i<numparams;i++) {
1292       if (statusarray[i]==0) {
1293         struct parameterdescriptor *pd=task->descriptorarray[i];
1294         int j;
1295         for(j=0;j<pd->numbertags;j++) {
1296           int slotid=pd->tagarray[2*j];
1297           if(statusarray[slotid+numparams]!=0) {
1298             processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1299             processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1300             goto loopstart;
1301           }
1302         }
1303       }
1304     }
1305
1306     /* Next do objects w/ unbound tags*/
1307
1308     for(i=0;i<numparams;i++) {
1309       if (statusarray[i]==0) {
1310         struct parameterdescriptor *pd=task->descriptorarray[i];
1311         if (pd->numbertags>0) {
1312           processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1313           processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1314           goto loopstart;
1315         }
1316       }
1317     }
1318
1319     /* Nothing with a tag enqueued */
1320
1321     for(i=0;i<numparams;i++) {
1322       if (statusarray[i]==0) {
1323         struct parameterdescriptor *pd=task->descriptorarray[i];
1324         processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1325         processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1326         goto loopstart;
1327       }
1328     }
1329
1330     /* Nothing left */
1331     return;
1332   }
1333 }
1334
1335  void printdebug() {
1336    int i;
1337    int j;
1338 #ifdef THREADSIMULATE
1339    int numofcore = pthread_getspecific(key);
1340    for(i=0;i<numtasks[numofcore];i++) {
1341            struct taskdescriptor * task=taskarray[numofcore][i];
1342 #else
1343    for(i=0;i<numtasks[corenum];i++) {
1344      struct taskdescriptor * task=taskarray[corenum][i];
1345 #endif
1346      printf("%s\n", task->name);
1347      for(j=0;j<task->numParameters;j++) {
1348        struct parameterdescriptor *param=task->descriptorarray[j];
1349        struct parameterwrapper *parameter=param->queue;
1350        struct ObjectHash * set=parameter->objectset;
1351        struct ObjectIterator objit;
1352        printf("  Parameter %d\n", j);
1353        ObjectHashiterator(set, &objit);
1354        while(ObjhasNext(&objit)) {
1355          struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
1356          struct ___Object___ * tagptr=obj->___tags___;
1357          int nonfailed=Objdata4(&objit);
1358          int numflags=Objdata3(&objit);
1359          int flags=Objdata2(&objit);
1360          Objnext(&objit);
1361          printf("    Contains %lx\n", obj);
1362          printf("      flag=%d\n", obj->flag); 
1363          if (tagptr==NULL) {
1364          } else if (tagptr->type==TAGTYPE) {
1365            printf("      tag=%lx\n",tagptr);
1366          } else {
1367            int tagindex=0;
1368            struct ArrayObject *ao=(struct ArrayObject *)tagptr;
1369            for(;tagindex<ao->___cachedCode___;tagindex++) {
1370              printf("      tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
1371            }
1372          }
1373        }
1374      }
1375    }
1376  }
1377  
1378
1379 /* This function processes the task information to create queues for
1380    each parameter type. */
1381
1382 void processtasks() {
1383   int i;
1384 #ifdef THREADSIMULATE
1385   int numofcore = pthread_getspecific(key);
1386   for(i=0;i<numtasks[numofcore];i++) {
1387           struct taskdescriptor *task=taskarray[numofcore][i];
1388 #else
1389   for(i=0;i<numtasks[corenum];i++) {
1390     struct taskdescriptor * task=taskarray[corenum][i];
1391 #endif
1392     int j;
1393
1394     /* Build iterators for parameters */
1395     for(j=0;j<task->numParameters;j++) {
1396       struct parameterdescriptor *param=task->descriptorarray[j];
1397       struct parameterwrapper *parameter=param->queue;
1398           parameter->objectset=allocateObjectHash(10);
1399           parameter->task=task;
1400       builditerators(task, j, parameter);
1401     }
1402   }
1403 }
1404
1405 void toiReset(struct tagobjectiterator * it) {
1406   if (it->istag) {
1407     it->tagobjindex=0;
1408   } else if (it->numtags>0) {
1409     it->tagobjindex=0;
1410   } else {
1411     ObjectHashiterator(it->objectset, &it->it);
1412   }
1413 }
1414
1415 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
1416   if (it->istag) {
1417     /* Iterate tag */
1418     /* Get object with tags */
1419     struct ___Object___ *obj=objectarray[it->tagobjectslot];
1420     struct ___Object___ *tagptr=obj->___tags___;
1421     if (tagptr->type==TAGTYPE) {
1422       if ((it->tagobjindex==0)&& /* First object */
1423           (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
1424         return 1;
1425       else
1426         return 0;
1427     } else {
1428       struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1429       int tagindex=it->tagobjindex;
1430       for(;tagindex<ao->___cachedCode___;tagindex++) {
1431         struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
1432         if (td->flag==it->tagid) {
1433           it->tagobjindex=tagindex; /* Found right type of tag */
1434           return 1;
1435         }
1436       }
1437       return 0;
1438     }
1439   } else if (it->numtags>0) {
1440     /* Use tags to locate appropriate objects */
1441     struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1442     struct ___Object___ *objptr=tag->flagptr;
1443     int i;
1444     if (objptr->type!=OBJECTARRAYTYPE) {
1445       if (it->tagobjindex>0)
1446         return 0;
1447       if (!ObjectHashcontainskey(it->objectset, (int) objptr))
1448         return 0;
1449       for(i=1;i<it->numtags;i++) {
1450         struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1451         if (!containstag(objptr,tag2))
1452           return 0;
1453       }
1454       return 1;
1455     } else {
1456       struct ArrayObject *ao=(struct ArrayObject *) objptr;
1457       int tagindex;
1458       int i;
1459       for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
1460         struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
1461         if (!ObjectHashcontainskey(it->objectset, (int) objptr))
1462           continue;
1463         for(i=1;i<it->numtags;i++) {
1464           struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1465           if (!containstag(objptr,tag2))
1466             goto nexttag;
1467         }
1468         it->tagobjindex=tagindex;
1469         return 1;
1470       nexttag:
1471         ;
1472       }
1473       it->tagobjindex=tagindex;
1474       return 0;
1475     }
1476   } else {
1477     return ObjhasNext(&it->it);
1478   }
1479 }
1480
1481 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
1482   int j;
1483   struct ___Object___ * objptr=tag->flagptr;
1484   if (objptr->type==OBJECTARRAYTYPE) {
1485     struct ArrayObject *ao=(struct ArrayObject *)objptr;
1486     for(j=0;j<ao->___cachedCode___;j++) {
1487       if (ptr==ARRAYGET(ao, struct ___Object___*, j))
1488         return 1;
1489     }
1490     return 0;
1491   } else
1492     return objptr==ptr;
1493 }
1494
1495 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
1496   /* hasNext has all of the intelligence */
1497   if(it->istag) {
1498     /* Iterate tag */
1499     /* Get object with tags */
1500     struct ___Object___ *obj=objectarray[it->tagobjectslot];
1501     struct ___Object___ *tagptr=obj->___tags___;
1502     if (tagptr->type==TAGTYPE) {
1503       it->tagobjindex++;
1504       objectarray[it->slot]=tagptr;
1505     } else {
1506       struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1507       objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
1508     }
1509   } else if (it->numtags>0) {
1510     /* Use tags to locate appropriate objects */
1511     struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1512     struct ___Object___ *objptr=tag->flagptr;
1513     if (objptr->type!=OBJECTARRAYTYPE) {
1514       it->tagobjindex++;
1515       objectarray[it->slot]=objptr;
1516     } else {
1517       struct ArrayObject *ao=(struct ArrayObject *) objptr;
1518       objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
1519     }
1520   } else {
1521     /* Iterate object */
1522     objectarray[it->slot]=(void *)Objkey(&it->it);
1523     Objnext(&it->it);
1524   }
1525 }
1526 #endif