Add support for multi-parameter tasks as well as tag in multi-core version
[IRC.git] / Robust / src / Runtime / multicoretask.c
1 #ifdef TASK
2 #include "runtime.h"
3 #include "structdefs.h"
4 #include "mem.h"
5 #include "checkpoint.h"
6 #include "Queue.h"
7 #include "SimpleHash.h"
8 #include "GenericHashtable.h"
9 #include <sys/select.h>
10 #include <sys/types.h>
11 #include <sys/mman.h>
12 #include <string.h>
13 #include <signal.h>
14 #include <assert.h>
15 #include <errno.h>
16 #ifdef RAW
17 #elif defined THREADSIMULATE
18 #if 0
19 #include <sys/mman.h> // for mmap
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23
24 int offset_transObj = 0;
25 #endif
26
27 // use POSIX message queue
28 // for each core, its message queue named as
29 // /msgqueue_corenum
30 #include <mqueue.h>
31 #include <sys/stat.h>
32 #endif
33 /*
34 extern int injectfailures;
35 extern float failurechance;
36 */
37 extern int debugtask;
38 extern int instaccum;
39
40 #ifdef CONSCHECK
41 #include "instrument.h"
42 #endif
43
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
47 struct RuntimeHash * forward;
48 struct RuntimeHash * reverse;
49
50 int corestatus[NUMCORES]; // records status of each core
51                           // 1: running tasks
52                                                   // 0: stall
53 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
54 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
55 #ifdef THREADSIMULATE
56 struct thread_data {
57         int corenum;
58         int argc;
59         char** argv;
60         int numsendobjs;
61         int numreceiveobjs;
62 };
63 struct thread_data thread_data_array[NUMCORES];
64 mqd_t mqd[NUMCORES];
65 static pthread_key_t key;
66 static struct RuntimeHash* locktbl;
67 static pthread_rwlock_t rwlock_tbl;
68 static pthread_rwlock_t rwlock_init;
69 #endif
70 bool transStallMsg(int targetcore);
71 void transTerminateMsg(int targetcore);
72 void run(void * arg);
73 bool getreadlock(void* ptr);
74 void releasereadlock(void* ptr);
75 bool getwritelock(void* ptr);
76 void releasewritelock(void* ptr);
77
78 int main(int argc, char **argv) {
79 #ifdef THREADSIMULATE
80         errno = 0;
81         int tids[NUMCORES];
82         int rc[NUMCORES];
83         pthread_t threads[NUMCORES];
84         int i = 0;
85
86         // initialize three arrays and msg queue array
87         char * pathhead = "/msgqueue_";
88         int targetlen = strlen(pathhead);
89         for(i = 0; i < NUMCORES; ++i) {
90                 corestatus[i] = 1;
91                 numsendobjs[i] = 0;
92                 numreceiveobjs[i] = 0;
93
94                 char corenumstr[3];
95                 int sourcelen = 0;
96                 if(i < 10) {
97                         corenumstr[0] = i + '0';
98                         corenumstr[1] = '\0';
99                         sourcelen = 1;
100                 } else if(i < 100) {
101                         corenumstr[1] = i %10 + '0';
102                         corenumstr[0] = (i / 10) + '0';
103                         corenumstr[2] = '\0';
104                         sourcelen = 2;
105                 } else {
106                         printf("Error: i >= 100\n");
107                         fflush(stdout);
108                         exit(-1);
109                 }
110                 char path[targetlen + sourcelen + 1];
111                 strcpy(path, pathhead);
112                 strncat(path, corenumstr, sourcelen);
113                 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
114                 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
115                 mq_unlink(path);
116                 mqd[i]= mq_open(path, oflags, omodes, NULL);
117                 if(mqd[i] == -1) {
118                         printf("[Main] mq_open %s fails: %d, error: %s\n", path, mqd[i], strerror(errno));
119                         exit(-1);
120                 } else {
121                         printf("[Main] mq_open %s returns: %d\n", path, mqd[i]);
122                 }
123         }
124
125         // create the key
126         pthread_key_create(&key, NULL);
127
128         // create the lock table and initialize its mutex
129         locktbl = allocateRuntimeHash(20);
130         int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
131         printf("[Main] initialize the rwlock for lock table: %d error: \n", rc_locktbl, strerror(rc_locktbl));
132
133 /*      if(argc < 2) {
134                 printf("Usage: <bin> <corenum>\n");
135                 fflush(stdout);
136                 exit(-1);
137         }
138
139         int cnum = 0;
140         char * number = argv[1];
141         int len = strlen(number);
142         for(i = 0; i < len; ++i) {
143                 cnum = (number[i] - '0') + cnum * 10;   
144         }
145 */
146         for(i = 0; i < NUMCORES; ++i) {
147         /*      if(STARTUPCORE == i) {
148                         continue;
149                 }*/
150                 thread_data_array[i].corenum = i;
151                 thread_data_array[i].argc = argc;
152                 thread_data_array[i].argv = argv;
153                 thread_data_array[i].numsendobjs = 0;
154                 thread_data_array[i].numreceiveobjs = 0;
155                 printf("[main] creating thread %d\n", i);
156                 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
157         if (rc[i]){
158                         printf("[main] ERROR; return code from pthread_create() is %d\n", rc[i]);
159                         fflush(stdout);
160                         exit(-1);
161                 }
162         }//*/
163         
164         /*// do stuff of startup core
165         thread_data_array[STARTUPCORE].corenum = STARTUPCORE;
166         thread_data_array[STARTUPCORE].argc = argc;// - 1;
167         thread_data_array[STARTUPCORE].argv = argv;//&argv[1];
168         thread_data_array[STARTUPCORE].numsendobjs = 0;
169         thread_data_array[STARTUPCORE].numreceiveobjs = 0;
170         run(&thread_data_array[STARTUPCORE]);*/
171         pthread_exit(NULL);
172 }
173
174 void run(void* arg) {
175         struct thread_data * my_tdata = (struct thread_data *)arg;
176         //corenum = my_tdata->corenum;
177         //void * ptr = malloc(sizeof(int));
178         //*((int*)ptr) = my_tdata->corenum;
179         pthread_setspecific(key, (void *)my_tdata->corenum);
180         int argc = my_tdata->argc;
181         char** argv = my_tdata->argv;
182         printf("[run, %d] Thread %d runs: %x\n", my_tdata->corenum, my_tdata->corenum, (int)pthread_self());
183         fflush(stdout);
184
185 #endif
186
187 #ifdef BOEHM_GC
188   GC_init(); // Initialize the garbage collector
189 #endif
190 #ifdef CONSCHECK
191   initializemmap();
192 #endif
193   processOptions();
194   initializeexithandler();
195   /* Create table for failed tasks */
196   failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd, 
197                                    (int (*)(void *,void *)) &comparetpd);
198   /* Create queue of active tasks */
199   activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd, 
200                                    (int (*)(void *,void *)) &comparetpd);
201   
202   /* Process task information */
203   processtasks();
204
205   /* Create startup object */
206   createstartupobject(argc, argv);
207
208   /* Start executing the tasks */
209   executetasks();
210
211 #ifdef THREADSIMULATE
212
213   int i = 0;
214   // check if there are new objects coming
215   bool sendStall = false;
216
217   int numofcore = pthread_getspecific(key);
218   while(true) {
219           switch(receiveObject()) {
220                   case 0: {
221                                           printf("[run, %d] receive an object\n", numofcore);
222                                           sendStall = false;
223                                           // received an object
224                                           // check if there are new active tasks can be executed
225                                           executetasks();
226                                           break;
227                                   }
228                   case 1: {
229                                           //printf("[run, %d] no msg\n", numofcore);
230                                           // no msg received
231                                           if(STARTUPCORE == numofcore) {
232                                                   corestatus[numofcore] = 0;
233                                                   // check the status of all cores
234                                                   bool allStall = true;
235                                                   for(i = 0; i < NUMCORES; ++i) {
236                                                           if(corestatus[i] != 0) {
237                                                                   allStall = false;
238                                                                   break;
239                                                           }
240                                                   }
241                                                   if(allStall) {
242                                                           // check if the sum of send objs and receive obj are the same
243                                                           // yes->terminate
244                                                           // no->go on executing
245                                                           int sumsendobj = 0;
246                                                           for(i = 0; i < NUMCORES; ++i) {
247                                                                   sumsendobj += numsendobjs[i];
248                                                           }
249                                                           for(i = 0; i < NUMCORES; ++i) {
250                                                                   sumsendobj -= numreceiveobjs[i];
251                                                           }
252                                                           if(0 == sumsendobj) {
253                                                                   // terminate
254                                                                   // TODO
255                                                                  /* for(i = 0; i < NUMCORES; ++i) {
256                                                                           if(i != corenum) {
257                                                                                   transTerminateMsg(i);
258                                                                           }
259                                                                   }
260                                                                   mq_close(mqd[corenum]);*/
261                                                                   
262                                                                   // release all locks
263                                                                   struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl); 
264                                                                   while(0 != RunhasNext(it_lock)) {
265                                                                           int key = Runkey(it_lock);
266                                                                           pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
267                                                                           int rc_des = pthread_rwlock_destroy(rwlock_obj);
268                                                                           printf("[run, %d] destroy the rwlock for object: %d error: \n", numofcore, key, strerror(rc_des));
269                                                                   }
270                                                                   freeRuntimeHash(locktbl);
271                                                                   locktbl = NULL;
272                                                                   RUNFREE(it_lock);
273
274                                                                   // destroy all message queues
275                                                                   char * pathhead = "/msgqueue_";
276                                                                   int targetlen = strlen(pathhead);
277                                                                   for(i = 0; i < NUMCORES; ++i) {
278                                                                           char corenumstr[3];
279                                                                           int sourcelen = 0;
280                                                                           if(i < 10) {
281                                                                                   corenumstr[0] = i + '0';
282                                                                                   corenumstr[1] = '\0';
283                                                                                   sourcelen = 1;
284                                                                           } else if(i < 100) {
285                                                                                   corenumstr[1] = i %10 + '0';
286                                                                                   corenumstr[0] = (i / 10) + '0';
287                                                                                   corenumstr[2] = '\0';
288                                                                                   sourcelen = 2;
289                                                                           } else {
290                                                                                   printf("Error: i >= 100\n");  
291                                                                                   fflush(stdout);
292                                                                                   exit(-1);
293                                                                           }
294                                                                           char path[targetlen + sourcelen + 1];
295                                                                           strcpy(path, pathhead);
296                                                                           strncat(path, corenumstr, sourcelen);
297                                                                           mq_unlink(path);
298                                                                   }
299
300                                                                   printf("[run, %d] terminate!\n", numofcore);
301                                                                   fflush(stdout);
302                                                                   exit(0);
303                                                           }
304                                                   }
305                                           } else {
306                                                   if(!sendStall) {
307                                                           // send StallMsg to startup core
308                                                           sendStall = transStallMsg(STARTUPCORE);
309                                                   }
310                                           }
311                                           break;
312                                   }
313                   case 2: {
314                                           printf("[run, %d] receive a stall msg\n", numofcore);
315                                           // receive a Stall Msg, do nothing
316                                           assert(STARTUPCORE == numofcore); // only startup core can receive such msg
317                                           sendStall = false;
318                                           break;
319                                   }
320                  /* case 3: {
321                                           printf("[run, %d] receive a terminate msg\n", numofcore);
322                                           // receive a terminate Msg
323                                           assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
324                                           mq_close(mqd[corenum]);
325                                           fflush(stdout);
326                                           exit(0);
327                                           break;
328                                   }*/
329                   default: {
330                                            printf("[run, %d] Error: invalid message type.\n", numofcore);
331                                            fflush(stdout);
332                                            exit(-1);
333                                            break;
334                                    }
335           }
336   }
337 #endif
338 }
339
340 void createstartupobject(int argc, char ** argv) {
341   int i;
342   
343   /* Allocate startup object     */
344 #ifdef PRECISE_GC
345   struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
346   struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1); 
347 #else
348   struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
349   struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1); 
350 #endif
351   /* Build array of strings */
352   startupobject->___parameters___=stringarray;
353   for(i=1;i<argc;i++) {
354     int length=strlen(argv[i]);
355 #ifdef PRECISE_GC
356     struct ___String___ *newstring=NewString(NULL, argv[i],length);
357 #else
358     struct ___String___ *newstring=NewString(argv[i],length);
359 #endif
360     ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
361   }
362   
363   startupobject->isolate = 1;
364   startupobject->version = 0;
365
366   /* Set initialized flag for startup object */ 
367   flagorandinit(startupobject,1,0xFFFFFFFF);
368   enqueueObject(startupobject, NULL, 0);
369   //enqueueObject(startupobject, objq4startupobj[corenum], numqueues4startupobj[corenum]);
370 }
371
372 int hashCodetpd(struct taskparamdescriptor *ftd) {
373   int hash=(int)ftd->task;
374   int i;
375   for(i=0;i<ftd->numParameters;i++){ 
376     hash^=(int)ftd->parameterArray[i];
377   }
378   return hash;
379 }
380
381 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
382   int i;
383   if (ftd1->task!=ftd2->task)
384     return 0;
385   for(i=0;i<ftd1->numParameters;i++)
386     if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
387       return 0;
388   return 1;
389 }
390
391 /* This function sets a tag. */
392 #ifdef PRECISE_GC
393 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
394 #else
395 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
396 #endif
397   struct ___Object___ * tagptr=obj->___tags___;
398   if (tagptr==NULL) {
399     obj->___tags___=(struct ___Object___ *)tagd;
400   } else {
401     /* Have to check if it is already set */
402     if (tagptr->type==TAGTYPE) {
403       struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
404       if (td==tagd)
405         return;
406 #ifdef PRECISE_GC
407       int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
408       struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
409       obj=(struct ___Object___ *)ptrarray[2];
410       tagd=(struct ___TagDescriptor___ *)ptrarray[3];
411       td=(struct ___TagDescriptor___ *) obj->___tags___;
412 #else
413       struct ArrayObject * ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
414 #endif
415       ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
416       ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
417       obj->___tags___=(struct ___Object___ *) ao;
418       ao->___cachedCode___=2;
419     } else {
420       /* Array Case */
421       int i;
422       struct ArrayObject *ao=(struct ArrayObject *) tagptr;
423       for(i=0;i<ao->___cachedCode___;i++) {
424         struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
425         if (td==tagd)
426           return;
427       }
428       if (ao->___cachedCode___<ao->___length___) {
429         ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
430         ao->___cachedCode___++;
431       } else {
432 #ifdef PRECISE_GC
433         int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
434         struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
435         obj=(struct ___Object___ *)ptrarray[2];
436         tagd=(struct ___TagDescriptor___ *) ptrarray[3];
437         ao=(struct ArrayObject *)obj->___tags___;
438 #else
439         struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
440 #endif
441         aonew->___cachedCode___=ao->___length___+1;
442         for(i=0;i<ao->___length___;i++) {
443           ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
444         }
445         ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
446       }
447     }
448   }
449
450   {
451     struct ___Object___ * tagset=tagd->flagptr;
452     if(tagset==NULL) {
453       tagd->flagptr=obj;
454     } else if (tagset->type!=OBJECTARRAYTYPE) {
455 #ifdef PRECISE_GC
456       int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
457       struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
458       obj=(struct ___Object___ *)ptrarray[2];
459       tagd=(struct ___TagDescriptor___ *)ptrarray[3];
460 #else
461       struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
462 #endif
463       ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
464       ARRAYSET(ao, struct ___Object___ *, 1, obj);
465       ao->___cachedCode___=2;
466       tagd->flagptr=(struct ___Object___ *)ao;
467     } else {
468       struct ArrayObject *ao=(struct ArrayObject *) tagset;
469       if (ao->___cachedCode___<ao->___length___) {
470         ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
471       } else {
472         int i;
473 #ifdef PRECISE_GC
474         int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
475         struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
476         obj=(struct ___Object___ *)ptrarray[2];
477         tagd=(struct ___TagDescriptor___ *)ptrarray[3];
478         ao=(struct ArrayObject *)tagd->flagptr;
479 #else
480         struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
481 #endif
482         aonew->___cachedCode___=ao->___cachedCode___+1;
483         for(i=0;i<ao->___length___;i++) {
484           ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
485         }
486         ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
487         tagd->flagptr=(struct ___Object___ *) aonew;
488       }
489     }
490   }
491 }
492
493 /* This function clears a tag. */
494 #ifdef PRECISE_GC
495 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
496 #else
497 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
498 #endif
499   /* We'll assume that tag is alway there.
500      Need to statically check for this of course. */
501   struct ___Object___ * tagptr=obj->___tags___;
502
503   if (tagptr->type==TAGTYPE) {
504     if ((struct ___TagDescriptor___ *)tagptr==tagd)
505       obj->___tags___=NULL;
506     else
507       printf("ERROR 1 in tagclear\n");
508   } else {
509     struct ArrayObject *ao=(struct ArrayObject *) tagptr;
510     int i;
511     for(i=0;i<ao->___cachedCode___;i++) {
512       struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
513       if (td==tagd) {
514         ao->___cachedCode___--;
515         if (i<ao->___cachedCode___)
516           ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
517         ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
518         if (ao->___cachedCode___==0)
519           obj->___tags___=NULL;
520         goto PROCESSCLEAR;
521       }
522     }
523     printf("ERROR 2 in tagclear\n");
524   }
525  PROCESSCLEAR:
526   {
527     struct ___Object___ *tagset=tagd->flagptr;
528     if (tagset->type!=OBJECTARRAYTYPE) {
529       if (tagset==obj)
530         tagd->flagptr=NULL;
531       else
532         printf("ERROR 3 in tagclear\n");
533     } else {
534       struct ArrayObject *ao=(struct ArrayObject *) tagset;
535       int i;
536       for(i=0;i<ao->___cachedCode___;i++) {
537         struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
538         if (tobj==obj) {
539           ao->___cachedCode___--;
540           if (i<ao->___cachedCode___)
541             ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
542           ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
543           if (ao->___cachedCode___==0)
544             tagd->flagptr=NULL;
545           goto ENDCLEAR;
546         }
547       }
548       printf("ERROR 4 in tagclear\n");
549     }
550   }
551  ENDCLEAR:
552   return;
553 }
554  
555 /* This function allocates a new tag. */
556 #ifdef PRECISE_GC
557 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
558   struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
559 #else
560 struct ___TagDescriptor___ * allocate_tag(int index) {
561   struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
562 #endif
563   v->type=TAGTYPE;
564   v->flag=index;
565   return v;
566
567
568
569
570 /* This function updates the flag for object ptr.  It or's the flag
571    with the or mask and and's it with the andmask. */
572
573 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
574  
575  int flagcomp(const int *val1, const int *val2) {
576    return (*val1)-(*val2);
577  } 
578
579 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
580     {
581       int oldflag=((int *)ptr)[1];
582       int flag=ormask|oldflag;
583       flag&=andmask;
584           flagbody(ptr, flag, queues, length, false);
585     }
586 }
587  
588 bool intflagorand(void * ptr, int ormask, int andmask) {
589     {
590       int oldflag=((int *)ptr)[1];
591       int flag=ormask|oldflag;
592       flag&=andmask;
593       if (flag==oldflag) /* Don't do anything */
594         return false;
595       else {
596                  flagbody(ptr, flag, NULL, 0, false);
597                  return true;
598           }
599     }
600 }
601
602 void flagorandinit(void * ptr, int ormask, int andmask) {
603   int oldflag=((int *)ptr)[1];
604   int flag=ormask|oldflag;
605   flag&=andmask;
606   flagbody(ptr,flag,NULL,0,true);
607 }
608
609 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
610   struct parameterwrapper * flagptr = NULL;
611   int i = 0;
612   struct parameterwrapper ** queues = vqueues;
613   int length = vlength;
614   if((!isnew) && (queues == NULL)) {
615 #ifdef THREADSIMULATE
616           int numofcore = pthread_getspecific(key);
617           queues = objectqueues[numofcore][ptr->type];
618           length = numqueues[numofcore][ptr->type];
619 #else
620           queues = objectqueues[corenum][ptr->type];
621           length = numqueues[corenum][ptr->type];
622 #endif
623   }
624   ptr->flag=flag;
625   
626   /*Remove object from all queues */
627   for(i = 0; i < length; ++i) {
628           flagptr = queues[i];
629         int next;
630     int UNUSED, UNUSED2;
631     int * enterflags;
632     ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
633     ObjectHashremove(flagptr->objectset, (int)ptr);
634     if (enterflags!=NULL)
635       free(enterflags);
636   }
637  }
638
639  void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
640    struct ___Object___ *ptr = (struct ___Object___ *)vptr;
641   
642   {
643     struct QueueItem *tmpptr;
644         struct parameterwrapper * parameter=NULL;
645         int j;
646         struct parameterwrapper ** queues = vqueues;
647         int length = vlength;
648         if(queues == NULL) {
649 #ifdef THREADSIMULATE
650                 int numofcore = pthread_getspecific(key);
651                 queues = objectqueues[numofcore][ptr->type];
652                 length = numqueues[numofcore][ptr->type];
653 #else
654                 queues = objectqueues[corenum][ptr->type];
655                 length = numqueues[corenum][ptr->type];
656 #endif
657         }
658     int i;
659     struct parameterwrapper * prevptr=NULL;
660     struct ___Object___ *tagptr=ptr->___tags___;
661     
662     /* Outer loop iterates through all parameter queues an object of
663        this type could be in.  */
664     
665         for(j = 0; j < length; ++j) {
666                 parameter = queues[j];
667       /* Check tags */
668       if (parameter->numbertags>0) {
669         if (tagptr==NULL)
670           goto nextloop;//that means the object has no tag but that param needs tag
671         else if(tagptr->type==TAGTYPE) {//one tag
672           struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
673           for(i=0;i<parameter->numbertags;i++) {
674             //slotid is parameter->tagarray[2*i];
675             int tagid=parameter->tagarray[2*i+1];
676             if (tagid!=tagptr->flag)
677               goto nextloop; /*We don't have this tag */          
678            }
679         } else {//multiple tags
680           struct ArrayObject * ao=(struct ArrayObject *) tagptr;
681           for(i=0;i<parameter->numbertags;i++) {
682             //slotid is parameter->tagarray[2*i];
683             int tagid=parameter->tagarray[2*i+1];
684             int j;
685             for(j=0;j<ao->___cachedCode___;j++) {
686               if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
687                 goto foundtag;
688             }
689             goto nextloop;
690           foundtag:
691             ;
692           }
693         }
694       }
695       
696       /* Check flags */
697       for(i=0;i<parameter->numberofterms;i++) {
698         int andmask=parameter->intarray[i*2];
699         int checkmask=parameter->intarray[i*2+1];
700         if ((ptr->flag&andmask)==checkmask) {
701           enqueuetasks(parameter, prevptr, ptr, NULL, 0);
702           prevptr=parameter;
703           break;
704         }
705       }
706     nextloop:
707           ;
708     }
709   }
710 }
711
712 // transfer an object to targetcore
713 // format: object
714 void transferObject(void * obj, int targetcore) {
715         int type=((int *)obj)[0];
716         assert(type < NUMCLASSES); // can only transfer normal object
717     int size=classsize[type];
718
719 #ifdef RAW
720
721 #elif defined THREADSIMULATE
722 #if 0
723     // use shared memory to transfer objects between cores
724         int fd = 0; // mapped file
725         void * p_map = NULL;
726         char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
727         int offset;
728         // open the file 
729         fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
730         offset = lseek(fd, 0, SEEK_CUR);
731         if(offset == -1) {
732                 printf("fail to open file " + filepath + " in transferObject.\n");
733                 fflush(stdout);
734                 exit(-1);
735         }
736         lseek(fd, size + sizeof(int)*2, SEEK_CUR);
737         write(fd, "", 1); 
738         p_map = (void *)mmap(NULL,size+sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
739         close(fd);
740         memcpy(p_map, type, sizeof(int));
741         memcpy(p_map+sizeof(int), corenum, sizeof(int));
742         memcpy((p_map+sizeof(int)*2), obj, size);
743         munmap(p_map, size+sizeof(int)*2); 
744         //printf( "umap ok \n" );
745 #endif
746
747         int numofcore = pthread_getspecific(key);
748
749         // use POSIX message queue to transfer objects between cores
750         mqd_t mqdnum;
751         char corenumstr[3];
752         int sourcelen = 0;
753         if(targetcore < 10) {
754                 corenumstr[0] = targetcore + '0';
755                 corenumstr[1] = '\0';
756                 sourcelen = 1;
757         } else if(targetcore < 100) {
758                 corenumstr[1] = targetcore % 10 + '0';
759                 corenumstr[0] = (targetcore / 10) + '0';
760                 corenumstr[2] = '\0';
761                 sourcelen = 2;
762         } else {
763                 printf("Error: targetcore >= 100\n");
764                 fflush(stdout);
765                 exit(-1);
766         }
767         char * pathhead = "/msgqueue_";
768         int targetlen = strlen(pathhead);
769         char path[targetlen + sourcelen + 1];
770         strcpy(path, pathhead);
771         strncat(path, corenumstr, sourcelen);
772         int oflags = O_WRONLY|O_NONBLOCK;
773         int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
774         mqdnum = mq_open(path, oflags, omodes, NULL);
775         if(mqdnum==-1) {
776                 printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
777                 fflush(stdout);
778                 exit(-1);
779         }
780         struct ___Object___ * newobj = (struct ___Object___ *)obj;
781         if(0 == newobj->isolate) {
782                 newobj = RUNMALLOC(size);
783                 memcpy(newobj, obj, size);
784                 newobj->original=obj;
785         }
786         int ret;
787         do {
788                 ret=mq_send(mqdnum, (void *)newobj, size, 0); // send the object into the queue
789                 if(ret != 0) {
790                         printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
791                 }
792         }while(ret!=0); 
793         if(numofcore == STARTUPCORE) {
794                 ++numsendobjs[numofcore];
795         } else {
796                 ++(thread_data_array[numofcore].numsendobjs);
797         }
798         printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
799 #endif
800 }
801
802 // send terminate message to targetcore
803 // format: -1
804 bool transStallMsg(int targetcore) {
805         struct ___Object___ newobj;
806         // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
807         newobj.type = -1;
808
809 #ifdef RAW
810         newobj.flag = corenum;
811         newobj.___cachedHash___ = thread_data_array[corenum].numsendobjs;
812         newobj.___cachedCode___ = thread_data_array[corenum].numreceiveobjs;
813
814 #elif defined THREADSIMULATE
815         int numofcore = pthread_getspecific(key);
816         newobj.flag = numofcore;
817         newobj.___cachedHash___ = thread_data_array[numofcore].numsendobjs;
818         newobj.___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
819 #if 0
820     // use shared memory to transfer objects between cores
821         int fd = 0; // mapped file
822         void * p_map = NULL;
823         char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
824         int offset;
825         // open the file 
826         fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
827         offset = lseek(fd, 0, SEEK_CUR);
828         if(offset == -1) {
829                 printf("fail to open file " + filepath + " in transferObject.\n");
830                 fflush(stdout);
831                 exit(-1);
832         }
833         lseek(fd, sizeof(int)*2, SEEK_CUR);
834         write(fd, "", 1); 
835         p_map = (void *)mmap(NULL,sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
836         close(fd);
837         memcpy(p_map, type, sizeof(int));
838         memcpy(p_map+sizeof(int), corenum, sizeof(int));
839         munmap(p_map, sizeof(int)*2); 
840         //printf( "umap ok \n" );
841 #endif
842
843         // use POSIX message queue to send stall msg to startup core
844         assert(targetcore == STARTUPCORE);
845         mqd_t mqdnum;
846         char corenumstr[3];
847         int sourcelen = 0;
848         if(targetcore < 10) {
849                 corenumstr[0] = targetcore + '0';
850                 corenumstr[1] = '\0';
851                 sourcelen = 1;
852         } else if(targetcore < 100) {
853                 corenumstr[1] = targetcore % 10 + '0';
854                 corenumstr[0] = (targetcore / 10) + '0';
855                 corenumstr[2] = '\0';
856                 sourcelen = 2;
857         } else {
858                 printf("Error: targetcore >= 100\n");
859                 fflush(stdout);
860                 exit(-1);
861         }
862         char * pathhead = "/msgqueue_";
863         int targetlen = strlen(pathhead);
864         char path[targetlen + sourcelen + 1];
865         strcpy(path, pathhead);
866         strncat(path, corenumstr, sourcelen);
867         int oflags = O_WRONLY|O_NONBLOCK;
868         int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
869         mqdnum = mq_open(path, oflags, omodes, NULL);
870         if(mqdnum==-1) {
871                 printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
872                 fflush(stdout);
873                 exit(-1);
874         }
875         int ret;
876         ret=mq_send(mqdnum, (void *)&newobj, sizeof(struct ___Object___), 0); // send the object into the queue
877         if(ret != 0) {
878                 printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
879                 return false;
880         } else {
881                 printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
882                 printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj.flag, newobj.___cachedHash___, newobj.___cachedCode___);
883                 return true;
884         }
885 #endif
886 }
887 #if 0
888 // send terminate message to targetcore
889 // format: -1
890 void transTerminateMsg(int targetcore) {
891         // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
892         int type = -2;
893
894 #ifdef RAW
895
896 #elif defined THREADSIMULATE
897
898         // use POSIX message queue to send stall msg to startup core
899         assert(targetcore != STARTUPCORE);
900         mqd_t mqdnum;
901         char corenumstr[3];
902         int sourcelen = 0;
903         if(targetcore < 10) {
904                 corenumstr[0] = targetcore + '0';
905                 corenumstr[1] = '\0';
906                 sourcelen = 1;
907         } else if(corenum < 100) {
908                 corenumstr[1] = targetcore % 10 + '0';
909                 corenumstr[0] = (targetcore / 10) + '0';
910                 corenumstr[2] = '\0';
911                 sourcelen = 2;
912         } else {
913                 printf("Error: targetcore >= 100\n");
914                 fflush(stdout);
915                 exit(-1);
916         }
917         char * pathhead = "/msgqueue_";
918         int targetlen = strlen(pathhead);
919         char path[targetlen + sourcelen + 1];
920         strcpy(path, pathhead);
921         strncat(path, corenumstr, sourcelen);
922         int oflags = O_WRONLY|O_NONBLOCK;
923         int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
924         mqdnum = mq_open(path, oflags, omodes, NULL);
925         if(mqdnum==-1) {
926                 printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
927                 fflush(stdout);
928                 exit(-1);
929         }
930         int ret;
931         do {
932                 ret=mq_send(mqdnum, (void *)&type, sizeof(int), 0); // send the object into the queue
933                 if(ret != 0) {
934                         printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno));
935                 }
936         }while(ret != 0);
937 #endif
938 }
939 #endif
940 // receive object transferred from other cores
941 // or the terminate message from other cores
942 // format: type [+ object]
943 // type: -1--stall msg
944 //      !-1--object
945 // return value: 0--received an object
946 //               1--received nothing
947 //               2--received a Stall Msg
948 int receiveObject() {
949 #ifdef RAW
950
951 #elif defined THREADSIMULATE
952 #if 0
953     char * filepath = "/scratch/transObj/file_" + corenum + ".txt";
954         int fd = 0;
955         void * p_map = NULL;
956         int type = 0;
957         int sourcecorenum = 0;
958         int size = 0;
959         fd = open(filepath, O_CREAT|O_RDONLY, 00777);
960         lseek(fd, offset_transObj, SEEK_SET);
961         p_map = (void*)mmap(NULL,sizeof(int)*2,PROT_READ,MAP_SHARED,fd,offset_transObj);
962         type = *(int*)p_map;
963         sourcecorenum = *(int*)(p_map+sinzeof(int));
964         offset_transObj += sizeof(int)*2;
965         munmap(p_map,sizeof(int)*2);
966         if(type == -1) {
967                 // sourecorenum has terminated
968                 ++offset_transObj;
969                 return;
970         }
971         size = classsize[type];
972         p_map = (void*)mmap(NULL,size,PROT_READ,MAP_SHARED,fd,offset_transObj);
973         struct ___Object___ * newobj=RUNMALLOC(size);
974     memcpy(newobj, p_map, size);
975         ++offset_transObj;
976         enqueueObject(newobj,NULL,0);
977 #endif
978         int numofcore = pthread_getspecific(key);
979         // use POSIX message queue to transfer object
980         int msglen = 0;
981         struct mq_attr mqattr;
982         mq_getattr(mqd[numofcore], &mqattr);
983         void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
984         msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
985         if(-1 == msglen) {
986                 // no msg
987                 free(msgptr);
988                 return 1;
989         }
990         //printf("msg: %s\n",msgptr);
991         if(((int*)msgptr)[0] == -1) {
992                 // StallMsg
993                 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
994                 int index = tmpptr->flag;
995                 corestatus[index] = 0;
996                 numsendobjs[index] = tmpptr->___cachedHash___;
997                 numreceiveobjs[index] = tmpptr->___cachedCode___;
998                 printf("<receiveObject> index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
999                 free(msgptr);
1000                 return 2;
1001         } /*else if(((int*)msgptr)[0] == -2) {
1002                 // terminate msg
1003                 return 3;
1004         } */else {
1005                 // an object
1006                 if(numofcore == STARTUPCORE) {
1007                         ++(numreceiveobjs[numofcore]);
1008                 } else {
1009                         ++(thread_data_array[numofcore].numreceiveobjs);
1010                 }
1011                 struct ___Object___ * newobj=RUNMALLOC(msglen);
1012                 memcpy(newobj, msgptr, msglen);
1013                 free(msgptr);
1014                 enqueueObject(newobj, NULL, 0);
1015                 return 0;
1016         }
1017 #endif
1018 }
1019
1020 bool getreadlock(void * ptr) {
1021 #ifdef RAW
1022
1023 #elif defined THREADSIMULATE
1024         int numofcore = pthread_getspecific(key);
1025
1026         int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1027         printf("[getreadlock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1028         if(EBUSY == rc) {
1029                 return false;
1030         }
1031         if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1032                 // no locks for this object
1033                 // first time to operate on this shared object
1034                 // create a lock for it
1035                 rc = pthread_rwlock_unlock(&rwlock_tbl);
1036                 printf("[getreadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1037                 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1038                 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1039                 rc = pthread_rwlock_init(rwlock, NULL);
1040                 printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
1041                 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1042                 printf("[getreadlock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1043                 if(EBUSY == rc) {
1044                         return false;
1045                 } else {
1046                         RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1047                         rc = pthread_rwlock_unlock(&rwlock_tbl);
1048                         printf("[getreadlock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1049                 }
1050                 //rc = pthread_rwlock_rdlock(&rwlock);
1051                 rc = pthread_rwlock_tryrdlock(rwlock);
1052                 printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));       
1053                 if(EBUSY == rc) {
1054                         return false;
1055                 } else {
1056                         return true;
1057                 }
1058         } else {
1059                 pthread_rwlock_t* rwlock_obj = NULL;
1060                 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1061                 rc = pthread_rwlock_unlock(&rwlock_tbl);
1062                 printf("[getreadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1063                 //int rc_obj = pthread_rwlock_rdlock(&rwlock_obj);
1064                 int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
1065                 printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1066                 if(EBUSY == rc_obj) {
1067                         return false;
1068                 } else {
1069                         return true;
1070                 }
1071         }
1072 #endif
1073 }
1074
1075 void releasereadlock(void * ptr) {
1076 #ifdef RAW
1077
1078 #elif defined THREADSIMULATE
1079         int numofcore = pthread_getspecific(key);
1080         int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1081         printf("[releasereadlock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1082         if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1083                 printf("[releasereadlock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1084                 exit(-1);
1085         }
1086         pthread_rwlock_t* rwlock_obj = NULL;
1087         RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1088         int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1089         printf("[releasereadlock, %d] unlocked object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1090         rc = pthread_rwlock_unlock(&rwlock_tbl);
1091         printf("[releasereadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1092 #endif
1093 }
1094
1095 bool getwritelock(void * ptr) {
1096 #ifdef RAW
1097
1098 #elif defined THREADSIMULATE
1099         int numofcore = pthread_getspecific(key);
1100
1101         int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1102         printf("[getwritelock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1103         if(EBUSY == rc) {
1104                 return false;
1105         }
1106         if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1107                 // no locks for this object
1108                 // first time to operate on this shared object
1109                 // create a lock for it
1110                 rc = pthread_rwlock_unlock(&rwlock_tbl);
1111                 printf("[getwritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1112                 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1113                 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1114                 rc = pthread_rwlock_init(rwlock, NULL);
1115                 printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
1116                 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1117                 printf("[getwritelock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1118                 if(EBUSY == rc) {
1119                         return false;
1120                 } else {
1121                         RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1122                         rc = pthread_rwlock_unlock(&rwlock_tbl);
1123                         printf("[getwritelock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1124                 }
1125                 //rc = pthread_rwlock_wrlock(rwlock);
1126                 rc = pthread_rwlock_trywrlock(rwlock);
1127                 printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));     
1128                 if(EBUSY == rc) {
1129                         return false;
1130                 } else {
1131                         return true;
1132                 }
1133         } else {
1134                 pthread_rwlock_t* rwlock_obj = NULL;
1135                 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1136                 rc = pthread_rwlock_unlock(&rwlock_tbl);
1137                 printf("[getwritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1138                 //int rc_obj = pthread_rwlock_wrlock(rwlock_obj);
1139                 int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
1140                 printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1141                 if(EBUSY == rc_obj) {
1142                         return false;
1143                 } else {
1144                         return true;
1145                 }
1146         }
1147
1148 #endif
1149 }
1150
1151 void releasewritelock(void * ptr) {
1152 #ifdef RAW
1153
1154 #elif defined THREADSIMULATE
1155         int numofcore = pthread_getspecific(key);
1156         int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1157         printf("[releasewritelock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1158         if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1159                 printf("[releasewritelock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1160                 exit(-1);
1161         }
1162         pthread_rwlock_t* rwlock_obj = NULL;
1163         RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1164         int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1165         printf("[releasewritelock, %d] unlocked object %d: %d error:\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1166         rc = pthread_rwlock_unlock(&rwlock_tbl);
1167         printf("[releasewritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1168 #endif
1169 }
1170
1171 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
1172   void * taskpointerarray[MAXTASKPARAMS];
1173   int j;
1174   int numparams=parameter->task->numParameters;
1175   int numiterators=parameter->task->numTotal-1;
1176   int retval=1;
1177   int addnormal=1;
1178   int adderror=1;
1179
1180   struct taskdescriptor * task=parameter->task;
1181
1182         ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
1183  
1184   /* Add enqueued object to parameter vector */
1185   taskpointerarray[parameter->slot]=ptr;
1186
1187   /* Reset iterators */
1188   for(j=0;j<numiterators;j++) {
1189     toiReset(&parameter->iterators[j]);
1190   }
1191
1192   /* Find initial state */
1193   for(j=0;j<numiterators;j++) {
1194   backtrackinit:
1195     if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
1196       toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
1197     else if (j>0) {
1198       /* Need to backtrack */
1199       toiReset(&parameter->iterators[j]);
1200       j--;
1201       goto backtrackinit;
1202     } else {
1203       /* Nothing to enqueue */
1204       return retval;
1205     }
1206   }
1207
1208   
1209   while(1) {
1210     /* Enqueue current state */
1211     int launch = 0;
1212     struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
1213     tpd->task=task;
1214     tpd->numParameters=numiterators+1;
1215     tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
1216     for(j=0;j<=numiterators;j++){
1217       tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
1218     }
1219     /* Enqueue task */
1220     if ((!gencontains(failedtasks, tpd)&&!gencontains(activetasks,tpd))) {
1221       genputtable(activetasks, tpd, tpd);
1222     } else {
1223       RUNFREE(tpd->parameterArray);
1224       RUNFREE(tpd);
1225     }
1226     
1227     /* This loop iterates to the next parameter combination */
1228     if (numiterators==0)
1229       return retval;
1230
1231     for(j=numiterators-1; j<numiterators;j++) {
1232     backtrackinc:
1233       if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
1234         toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
1235       else if (j>0) {
1236         /* Need to backtrack */
1237         toiReset(&parameter->iterators[j]);
1238         j--;
1239         goto backtrackinc;
1240       } else {
1241         /* Nothing more to enqueue */
1242         return retval;
1243       }
1244     }
1245   }
1246   return retval;
1247 }
1248  
1249 /* Handler for signals. The signals catch null pointer errors and
1250    arithmatic errors. */
1251
1252 void myhandler(int sig, siginfo_t *info, void *uap) {
1253   sigset_t toclear;
1254 #ifdef DEBUG
1255   printf("sig=%d\n",sig);
1256   printf("signal\n");
1257 #endif
1258   sigemptyset(&toclear);
1259   sigaddset(&toclear, sig);
1260   sigprocmask(SIG_UNBLOCK, &toclear,NULL); 
1261   longjmp(error_handler,1);
1262 }
1263
1264 fd_set readfds;
1265 int maxreadfd;
1266 struct RuntimeHash *fdtoobject;
1267
1268 void addreadfd(int fd) {
1269   if (fd>=maxreadfd)
1270     maxreadfd=fd+1;
1271   FD_SET(fd, &readfds);
1272 }
1273
1274 void removereadfd(int fd) {
1275   FD_CLR(fd, &readfds);
1276   if (maxreadfd==(fd+1)) {
1277     maxreadfd--;
1278     while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
1279       maxreadfd--;
1280   }
1281 }
1282
1283 #ifdef PRECISE_GC
1284 #define OFFSET 2
1285 #else
1286 #define OFFSET 0
1287 #endif
1288
1289 void executetasks() {
1290   void * taskpointerarray[MAXTASKPARAMS+OFFSET];
1291
1292   /* Set up signal handlers */
1293   struct sigaction sig;
1294   sig.sa_sigaction=&myhandler;
1295   sig.sa_flags=SA_SIGINFO;
1296   sigemptyset(&sig.sa_mask);
1297
1298   /* Catch bus errors, segmentation faults, and floating point exceptions*/
1299   sigaction(SIGBUS,&sig,0);
1300   sigaction(SIGSEGV,&sig,0);
1301   sigaction(SIGFPE,&sig,0);
1302   sigaction(SIGPIPE,&sig,0);
1303
1304   /* Zero fd set */
1305   FD_ZERO(&readfds);
1306   maxreadfd=0;
1307   fdtoobject=allocateRuntimeHash(100);
1308
1309   /* Map first block of memory to protected, anonymous page */
1310   mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
1311
1312   newtask:
1313   while((hashsize(activetasks)>0)||(maxreadfd>0)) {
1314
1315     /* Check if any filedescriptors have IO pending */
1316     if (maxreadfd>0) {
1317       int i;
1318       struct timeval timeout={0,0};
1319       fd_set tmpreadfds;
1320       int numselect;
1321       tmpreadfds=readfds;
1322       numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
1323       if (numselect>0) {
1324         /* Process ready fd's */
1325         int fd;
1326         for(fd=0;fd<maxreadfd;fd++) {
1327           if (FD_ISSET(fd, &tmpreadfds)) {
1328             /* Set ready flag on object */
1329             void * objptr;
1330             //      printf("Setting fd %d\n",fd);
1331             if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
1332               if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
1333                          enqueueObject(objptr, NULL, 0);
1334                   }
1335             }
1336           }
1337         }
1338       }
1339     }
1340
1341     /* See if there are any active tasks */
1342     if (hashsize(activetasks)>0) {
1343       int i;
1344       currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
1345       genfreekey(activetasks, currtpd);
1346       
1347       /* Check if this task has failed, allow a task that contains optional objects to fire */
1348       if (gencontains(failedtasks, currtpd)) {
1349         // Free up task parameter descriptor
1350         RUNFREE(currtpd->parameterArray);
1351         RUNFREE(currtpd);
1352         goto newtask;
1353       }
1354       int numparams=currtpd->task->numParameters;
1355       int numtotal=currtpd->task->numTotal;
1356       
1357           int isolateflags[numparams];
1358       /* Make sure that the parameters are still in the queues */
1359       for(i=0;i<numparams;i++) {
1360         void * parameter=currtpd->parameterArray[i];
1361         struct ___Object___ * tmpparam = (struct ___Object___ *)parameter;
1362         if(0 == tmpparam->isolate) {
1363                 isolateflags[i] = 0;
1364                 // shared object, need to flush with current value
1365                 //if(!getreadlock(tmpparam->original)) {
1366                 //      // fail to get read lock of the original object, try this task later
1367                 if(!getwritelock(tmpparam->original)) {
1368                         // fail to get write lock, release all obtained locks and try this task later
1369                         int j = 0;
1370                         for(j = 0; j < i; ++j) {
1371                                 if(0 == isolateflags[j]) {
1372                                         releasewritelock(taskpointerarray[j]);
1373                                 }
1374                         }
1375                         genputtable(activetasks, currtpd, currtpd);
1376                         goto newtask;
1377                 }
1378                 if(tmpparam->version != tmpparam->original->version) {
1379                         // flush this object
1380                         memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
1381                         //releasereadlock(tmpparam->original);
1382                         // fail to get write lock, release all obtained locks and try this task later
1383                         int j = 0;
1384                         for(j = 0; j < i; ++j) {
1385                                 if(0 == isolateflags[j]) {
1386                                         releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
1387                                 }
1388                         }
1389                         releasewritelock(tmpparam->original);
1390
1391                         // some task on another core has changed this object
1392                         // Free up task parameter descriptor
1393                         RUNFREE(currtpd->parameterArray);
1394                         RUNFREE(currtpd);
1395                         // dequeue this object
1396 #ifdef THREADSIMULATE
1397                         int numofcore = pthread_getspecific(key);
1398                         struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
1399                         int length = numqueues[numofcore][tmpparam->type];
1400 #else
1401                         struct parameterwrapper ** queues = objectqueues[corenum][tmpparam->type];
1402                         int length = numqueues[corenum][tmpparam->type];
1403 #endif
1404                         for(j = 0; j < length; ++j) {
1405                                 struct parameterwrapper * pw = queues[j];
1406                                 if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
1407                                         int next;
1408                                         int UNUSED, UNUSED2;
1409                                         int * enterflags;
1410                                         ObjectHashget(pw->objectset, (int) tmpparam, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
1411                                         ObjectHashremove(pw->objectset, (int)tmpparam);
1412                                         if (enterflags!=NULL)
1413                                                 free(enterflags);
1414                                 }
1415                         }
1416                         // try to enqueue it again to check if it feeds other tasks;
1417                         enqueueObject(tmpparam, NULL, 0);
1418                         goto newtask;
1419                 }
1420                 //releasereadlock(tmpparam->original);
1421         } else {
1422                 isolateflags[i] = 1;
1423         }
1424         struct parameterdescriptor * pd=currtpd->task->descriptorarray[i];
1425         struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
1426         int j;
1427         /* Check that object is still in queue */
1428         {
1429           if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
1430             RUNFREE(currtpd->parameterArray);
1431             RUNFREE(currtpd);
1432             goto newtask;
1433           }
1434         }
1435       parameterpresent:
1436         ;
1437         /* Check that object still has necessary tags */
1438         for(j=0;j<pd->numbertags;j++) {
1439           int slotid=pd->tagarray[2*j]+numparams;
1440           struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
1441           if (!containstag(parameter, tagd)) {
1442             RUNFREE(currtpd->parameterArray);
1443             RUNFREE(currtpd);
1444             goto newtask;
1445           }
1446         }
1447         
1448         taskpointerarray[i+OFFSET]=parameter;
1449       }
1450       /* Copy the tags */
1451       for(;i<numtotal;i++) {
1452         taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
1453       }
1454
1455          for(i = 0; i < numparams; ++i) {
1456                   if(0 == isolateflags[i]) {
1457                           struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
1458                           // shared object, need to replace this copy with original one
1459                           /*if(!getwritelock(tmpparam->original)) {
1460                                   // fail to get write lock, release all obtained locks and try this task later
1461                                   int j = 0;
1462                                   for(j = 0; j < i; ++j) {
1463                                           if(0 == isolateflags[j]) {
1464                                                   releasewritelock(taskpointerarray[j]);
1465                                           }
1466                                   }
1467                                   genputtable(activetasks, tpd, tpd);
1468                                   goto newtask;
1469                           }*/
1470                           if(tmpparam != tmpparam->original) {
1471                                   taskpointerarray[i+OFFSET] = tmpparam->original;
1472                           }
1473                   }
1474           }
1475
1476       {
1477         /* Checkpoint the state */
1478         forward=allocateRuntimeHash(100);
1479         reverse=allocateRuntimeHash(100);
1480         //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
1481         int x;
1482         if (x=setjmp(error_handler)) {
1483           int counter;
1484           /* Recover */
1485           
1486 #ifdef DEBUG
1487           printf("Fatal Error=%d, Recovering!\n",x);
1488 #endif
1489          /*
1490           genputtable(failedtasks,currtpd,currtpd);
1491           //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
1492
1493           freeRuntimeHash(forward);
1494           freeRuntimeHash(reverse);
1495           freemalloc();
1496           forward=NULL;
1497           reverse=NULL;
1498           */
1499           fflush(stdout);
1500           exit(-1);
1501         } else {
1502           /*if (injectfailures) {
1503             if ((((double)random())/RAND_MAX)<failurechance) {
1504               printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
1505               longjmp(error_handler,10);
1506             }
1507           }*/
1508           /* Actually call task */
1509 #ifdef PRECISE_GC
1510           ((int *)taskpointerarray)[0]=currtpd->numParameters;
1511           taskpointerarray[1]=NULL;
1512 #endif
1513           if(debugtask){
1514             printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1515             ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
1516             printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
1517           } else
1518             ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
1519
1520           for(i = 0; i < numparams; ++i) {
1521                   if(0 == isolateflags[i]) {
1522                           struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
1523                           releasewritelock(tmpparam);
1524                   }
1525           }
1526
1527           freeRuntimeHash(forward);
1528           freeRuntimeHash(reverse);
1529           freemalloc();
1530           // Free up task parameter descriptor
1531           RUNFREE(currtpd->parameterArray);
1532           RUNFREE(currtpd);
1533           forward=NULL;
1534           reverse=NULL;
1535         }
1536       }
1537     }
1538   }
1539 }
1540  
1541 /* This function processes an objects tags */
1542 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
1543   int i;
1544   
1545   for(i=0;i<pd->numbertags;i++) {
1546     int slotid=pd->tagarray[2*i];
1547     int tagid=pd->tagarray[2*i+1];
1548     
1549     if (statusarray[slotid+numparams]==0) {
1550       parameter->iterators[*iteratorcount].istag=1;
1551       parameter->iterators[*iteratorcount].tagid=tagid;
1552       parameter->iterators[*iteratorcount].slot=slotid+numparams;
1553       parameter->iterators[*iteratorcount].tagobjectslot=index;
1554       statusarray[slotid+numparams]=1;
1555       (*iteratorcount)++;
1556     }
1557   }
1558 }
1559
1560
1561 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
1562   int i;
1563   int tagcount=0;
1564   struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
1565
1566   parameter->iterators[*iteratorcount].istag=0;
1567   parameter->iterators[*iteratorcount].slot=index;
1568   parameter->iterators[*iteratorcount].objectset=objectset;
1569   statusarray[index]=1;
1570
1571   for(i=0;i<pd->numbertags;i++) {
1572     int slotid=pd->tagarray[2*i];
1573     int tagid=pd->tagarray[2*i+1];
1574     if (statusarray[slotid+numparams]!=0) {
1575       /* This tag has already been enqueued, use it to narrow search */
1576       parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
1577       tagcount++;
1578     }
1579   }
1580   parameter->iterators[*iteratorcount].numtags=tagcount;
1581
1582   (*iteratorcount)++;
1583 }
1584
1585 /* This function builds the iterators for a task & parameter */
1586
1587 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
1588   int statusarray[MAXTASKPARAMS];
1589   int i;
1590   int numparams=task->numParameters;
1591   int iteratorcount=0;
1592   for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
1593
1594   statusarray[index]=1; /* Initial parameter */
1595   /* Process tags for initial iterator */
1596   
1597   processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
1598   
1599   while(1) {
1600   loopstart:
1601     /* Check for objects with existing tags */
1602     for(i=0;i<numparams;i++) {
1603       if (statusarray[i]==0) {
1604         struct parameterdescriptor *pd=task->descriptorarray[i];
1605         int j;
1606         for(j=0;j<pd->numbertags;j++) {
1607           int slotid=pd->tagarray[2*j];
1608           if(statusarray[slotid+numparams]!=0) {
1609             processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1610             processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1611             goto loopstart;
1612           }
1613         }
1614       }
1615     }
1616
1617     /* Next do objects w/ unbound tags*/
1618
1619     for(i=0;i<numparams;i++) {
1620       if (statusarray[i]==0) {
1621         struct parameterdescriptor *pd=task->descriptorarray[i];
1622         if (pd->numbertags>0) {
1623           processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1624           processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1625           goto loopstart;
1626         }
1627       }
1628     }
1629
1630     /* Nothing with a tag enqueued */
1631
1632     for(i=0;i<numparams;i++) {
1633       if (statusarray[i]==0) {
1634         struct parameterdescriptor *pd=task->descriptorarray[i];
1635         processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
1636         processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
1637         goto loopstart;
1638       }
1639     }
1640
1641     /* Nothing left */
1642     return;
1643   }
1644 }
1645
1646  void printdebug() {
1647    int i;
1648    int j;
1649 #ifdef THREADSIMULATE
1650    int numofcore = pthread_getspecific(key);
1651    for(i=0;i<numtasks[numofcore];i++) {
1652            struct taskdescriptor * task=taskarray[numofcore][i];
1653 #else
1654    for(i=0;i<numtasks[corenum];i++) {
1655      struct taskdescriptor * task=taskarray[corenum][i];
1656 #endif
1657      printf("%s\n", task->name);
1658      for(j=0;j<task->numParameters;j++) {
1659        struct parameterdescriptor *param=task->descriptorarray[j];
1660        struct parameterwrapper *parameter=param->queue;
1661        struct ObjectHash * set=parameter->objectset;
1662        struct ObjectIterator objit;
1663        printf("  Parameter %d\n", j);
1664        ObjectHashiterator(set, &objit);
1665        while(ObjhasNext(&objit)) {
1666          struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
1667          struct ___Object___ * tagptr=obj->___tags___;
1668          int nonfailed=Objdata4(&objit);
1669          int numflags=Objdata3(&objit);
1670          int flags=Objdata2(&objit);
1671          Objnext(&objit);
1672          printf("    Contains %lx\n", obj);
1673          printf("      flag=%d\n", obj->flag); 
1674          if (tagptr==NULL) {
1675          } else if (tagptr->type==TAGTYPE) {
1676            printf("      tag=%lx\n",tagptr);
1677          } else {
1678            int tagindex=0;
1679            struct ArrayObject *ao=(struct ArrayObject *)tagptr;
1680            for(;tagindex<ao->___cachedCode___;tagindex++) {
1681              printf("      tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
1682            }
1683          }
1684        }
1685      }
1686    }
1687  }
1688  
1689
1690 /* This function processes the task information to create queues for
1691    each parameter type. */
1692
1693 void processtasks() {
1694   int i;
1695 #ifdef THREADSIMULATE
1696   int numofcore = pthread_getspecific(key);
1697   for(i=0;i<numtasks[numofcore];i++) {
1698           struct taskdescriptor *task=taskarray[numofcore][i];
1699 #else
1700   for(i=0;i<numtasks[corenum];i++) {
1701     struct taskdescriptor * task=taskarray[corenum][i];
1702 #endif
1703     int j;
1704
1705         /* Build objectsets */
1706         for(j=0;j<task->numParameters;j++) {
1707       struct parameterdescriptor *param=task->descriptorarray[j];
1708       struct parameterwrapper *parameter=param->queue;
1709           parameter->objectset=allocateObjectHash(10);
1710           parameter->task=task;
1711     }
1712
1713     /* Build iterators for parameters */
1714     for(j=0;j<task->numParameters;j++) {
1715       struct parameterdescriptor *param=task->descriptorarray[j];
1716       struct parameterwrapper *parameter=param->queue;
1717       builditerators(task, j, parameter);
1718     }
1719   }
1720 }
1721
1722 void toiReset(struct tagobjectiterator * it) {
1723   if (it->istag) {
1724     it->tagobjindex=0;
1725   } else if (it->numtags>0) {
1726     it->tagobjindex=0;
1727   } else {
1728     ObjectHashiterator(it->objectset, &it->it);
1729   }
1730 }
1731
1732 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
1733   if (it->istag) {
1734     /* Iterate tag */
1735     /* Get object with tags */
1736     struct ___Object___ *obj=objectarray[it->tagobjectslot];
1737     struct ___Object___ *tagptr=obj->___tags___;
1738     if (tagptr->type==TAGTYPE) {
1739       if ((it->tagobjindex==0)&& /* First object */
1740           (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
1741         return 1;
1742       else
1743         return 0;
1744     } else {
1745       struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1746       int tagindex=it->tagobjindex;
1747       for(;tagindex<ao->___cachedCode___;tagindex++) {
1748         struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
1749         if (td->flag==it->tagid) {
1750           it->tagobjindex=tagindex; /* Found right type of tag */
1751           return 1;
1752         }
1753       }
1754       return 0;
1755     }
1756   } else if (it->numtags>0) {
1757     /* Use tags to locate appropriate objects */
1758     struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1759     struct ___Object___ *objptr=tag->flagptr;
1760     int i;
1761     if (objptr->type!=OBJECTARRAYTYPE) {
1762       if (it->tagobjindex>0)
1763         return 0;
1764       if (!ObjectHashcontainskey(it->objectset, (int) objptr))
1765         return 0;
1766       for(i=1;i<it->numtags;i++) {
1767         struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1768         if (!containstag(objptr,tag2))
1769           return 0;
1770       }
1771       return 1;
1772     } else {
1773       struct ArrayObject *ao=(struct ArrayObject *) objptr;
1774       int tagindex;
1775       int i;
1776       for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
1777         struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
1778         if (!ObjectHashcontainskey(it->objectset, (int) objptr))
1779           continue;
1780         for(i=1;i<it->numtags;i++) {
1781           struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
1782           if (!containstag(objptr,tag2))
1783             goto nexttag;
1784         }
1785         it->tagobjindex=tagindex;
1786         return 1;
1787       nexttag:
1788         ;
1789       }
1790       it->tagobjindex=tagindex;
1791       return 0;
1792     }
1793   } else {
1794     return ObjhasNext(&it->it);
1795   }
1796 }
1797
1798 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
1799   int j;
1800   struct ___Object___ * objptr=tag->flagptr;
1801   if (objptr->type==OBJECTARRAYTYPE) {
1802     struct ArrayObject *ao=(struct ArrayObject *)objptr;
1803     for(j=0;j<ao->___cachedCode___;j++) {
1804       if (ptr==ARRAYGET(ao, struct ___Object___*, j))
1805         return 1;
1806     }
1807     return 0;
1808   } else
1809     return objptr==ptr;
1810 }
1811
1812 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
1813   /* hasNext has all of the intelligence */
1814   if(it->istag) {
1815     /* Iterate tag */
1816     /* Get object with tags */
1817     struct ___Object___ *obj=objectarray[it->tagobjectslot];
1818     struct ___Object___ *tagptr=obj->___tags___;
1819     if (tagptr->type==TAGTYPE) {
1820       it->tagobjindex++;
1821       objectarray[it->slot]=tagptr;
1822     } else {
1823       struct ArrayObject *ao=(struct ArrayObject *) tagptr;
1824       objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
1825     }
1826   } else if (it->numtags>0) {
1827     /* Use tags to locate appropriate objects */
1828     struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
1829     struct ___Object___ *objptr=tag->flagptr;
1830     if (objptr->type!=OBJECTARRAYTYPE) {
1831       it->tagobjindex++;
1832       objectarray[it->slot]=objptr;
1833     } else {
1834       struct ArrayObject *ao=(struct ArrayObject *) objptr;
1835       objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
1836     }
1837   } else {
1838     /* Iterate object */
1839     objectarray[it->slot]=(void *)Objkey(&it->it);
1840     Objnext(&it->it);
1841   }
1842 }
1843 #endif