#include "checkpoint.h"
#include "Queue.h"
#include "SimpleHash.h"
-#include "task.h"
#include "GenericHashtable.h"
-struct SimpleHash * activetasks;
+struct Queue * activetasks;
struct parameterwrapper * objectqueues[NUMCLASSES];
+struct genhashtable * failedtasks;
int main(int argc, char **argv) {
int i;
/* Allocate startup object */
struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc);
-
- activetasks=allocateSimpleHash(50);
+ failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
+ (int (*)(void *,void *)) &comparetpd);
+
+ activetasks=createQueue();
/* Set flags */
processtasks();
executetasks();
}
-int hashCodeftd(struct failedtaskdescriptor *ftd) {
- int hash=ftd->task;
+int hashCodetpd(struct taskparamdescriptor *ftd) {
+ int hash=(int)ftd->task;
int i;
for(i=0;i<ftd->numParameters;i++) {
- hash^=(int)parameterArray[i];
+ hash^=(int)ftd->parameterArray[i];
}
return hash;
}
-int compareftd(struct failedtaskdescriptor *ftd1, failedtaskdescriptor *ftd2) {
+int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
int i;
if (ftd1->task!=ftd2->task)
return 0;
void flagorand(void * ptr, int ormask, int andmask) {
int flag=((int *)ptr)[1];
- struct QueueItem *flagptr=(struct QueueItem *)(((int*)ptr)[2]);
+ struct SimpleHash *flagptr=(struct SimpleHash *)(((int*)ptr)[2]);
flag|=ormask;
flag&=andmask;
((int*)ptr)[1]=flag;
/*Remove from all queues */
while(flagptr!=NULL) {
- struct QueueItem * next=flagptr->nextqueue;
- removeItem(flagptr->queue, flagptr);
+ struct SimpleHash *next;
+ SimpleHashget(flagptr, (int) ptr, (int *) &next);
+ SimpleHashremove(flagptr, (int)ptr, (int) next);
flagptr=next;
}
struct QueueItem *tmpptr;
struct parameterwrapper * parameter=objectqueues[((int *)ptr)[0]];
int i;
- flagptr=NULL;
+ struct SimpleHash * prevptr=NULL;
while(parameter!=NULL) {
for(i=0;i<parameter->numberofterms;i++) {
int andmask=parameter->intarray[i*2];
int checkmask=parameter->intarray[i*2+1];
if ((flag&andmask)==checkmask) {
- struct QueueItem * qitem=addNewItem(parameter->queue, ptr);
- if (flagptr==NULL) {
- flagptr=qitem;
- tmpptr=flagptr;
- } else {
- tmpptr->nextqueue=qitem;
- tmpptr=qitem;
+ SimpleHashadd(parameter->objectset, (int) ptr, (int) prevptr);
+ prevptr=parameter->objectset;
+ {
+ struct SimpleIterator iteratorarray[MAXTASKPARAMS];
+ void * taskpointerarray[MAXTASKPARAMS];
+ int j;
+ int numparams=parameter->task->numParameters;
+ int done=1;
+ struct taskdescriptor * task=parameter->task;
+ int newindex=-1;
+ for(j=0;j<numparams;j++) {
+ struct parameterwrapper *pw=(struct parameterwrapper *)task->descriptorarray[j]->queue;
+ if (parameter==pw) {
+ taskpointerarray[j]=ptr;
+ newindex=j;
+ } else {
+ SimpleHashiterator(pw->objectset, &iteratorarray[j]);
+ if (hasNext(&iteratorarray[j]))
+ taskpointerarray[j]=(void *) next(&iteratorarray[j]);
+ else
+ break; /* No tasks to dispatch */
+ }
+ }
+ /* Queue task items... */
+
+ while(done) {
+ struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
+ tpd->task=task;
+ tpd->numParameters=numparams;
+ tpd->parameterArray=RUNMALLOC(sizeof(void *)*numparams);
+ for(j=0;j<numparams;j++)
+ tpd->parameterArray[j]=taskpointerarray[j];
+ /* Queue task */
+ if (!gencontains(failedtasks, tpd))
+ addNewItem(activetasks, tpd);
+
+ /* This loop iterates to the next paramter combination */
+ for(j=0;j<numparams;j++) {
+ if (j==newindex) {
+ if ((j+1)==numparams)
+ done=0;
+ continue;
+ }
+ if (hasNext(&iteratorarray[j])) {
+ taskpointerarray[j]=(void *) next(&iteratorarray[j]);
+ break;
+ } else if ((j+1)!=numparams) {
+ SimpleHashiterator(task->descriptorarray[j]->queue, &iteratorarray[j]);
+ } else {
+ done=0;
+ break;
+ }
+ }
+ }
}
- SimpleHashadd(activetasks, (int)parameter->task, (int)parameter->task);
break;
}
}
parameter=parameter->next;
}
- ((struct QueueItem **)ptr)[2]=flagptr;
+ ((struct SimpleHash **)ptr)[2]=prevptr;
}
}
}
void executetasks() {
- void * pointerarray[MAXTASKPARAMS];
- struct genhashtable * failedtasks=genallocatehashtable(&hashCodeftd, &compareftd);
+ void * taskpointerarray[MAXTASKPARAMS];
/* Set up signal handlers */
struct sigaction sig;
mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
newtask:
- while(SimpleHashcountset(activetasks)!=0) {
- struct taskdescriptor * task=(struct taskdescriptor *) SimpleHashfirstkey(activetasks);
+ while(!isEmpty(activetasks)) {
+ struct QueueItem * qi=(struct QueueItem *) getTail(activetasks);
+ struct taskparamdescriptor *tpd=(struct taskparamdescriptor *) qi->objectptr;
int i;
- for(i=0;i<task->numParameters;i++) {
- struct parameterwrapper * parameter=(struct parameterwrapper *) task->descriptorarray[i]->queue;
- struct Queue * queue=parameter->queue;
- if (isEmpty(queue)) {
- SimpleHashremove(activetasks, (int)task, (int)task);
+ removeItem(activetasks, qi);
+
+ for(i=0;i<tpd->task->numParameters;i++) {
+ void * parameter=tpd->parameterArray[i];
+ struct parameterdescriptor * pd=tpd->task->descriptorarray[i];
+ struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
+ if (!SimpleHashcontainskey(pw->objectset, (int) parameter))
goto newtask;
- }
- pointerarray[i]=getTail(queue)->objectptr;
+ taskpointerarray[i]=parameter;
}
{
struct SimpleHash * forward=allocateSimpleHash(100);
struct SimpleHash * reverse=allocateSimpleHash(100);
- void ** checkpoint=makecheckpoint(task->numParameters, pointerarray, forward, reverse);
+ void ** checkpoint=makecheckpoint(tpd->task->numParameters, taskpointerarray, forward, reverse);
if (setjmp(error_handler)) {
/* Recover */
- struct failedtaskdescriptor *ftd=RUNMALLOC(sizeof(struct failedtaskdescriptor));
int h;
- ftd->task=task;
- ftd->numParameters=task->numParameters;
- ftd->parameterArray=RUNMALLOC(task->numParameters*sizeof(void *));
- for(j=0;j<task->numParameters;j++) {
- ftd->parameterArray[j]=pointerarray[j];
- }
- genputtable(failedtasks,ftd,ftd);
- restorecheckpoint(task->numParameters, pointerarray, checkpoint, forward, reverse);
- /* TODO: REMOVE TASK FROM QUEUE */
+ printf("Recovering\n");
+ genputtable(failedtasks,tpd,tpd);
+ restorecheckpoint(tpd->task->numParameters, taskpointerarray, checkpoint, forward, reverse);
} else {
/* Actually call task */
- ((void (*) (void **)) task->taskptr)(pointerarray);
+ ((void (*) (void **)) tpd->task->taskptr)(taskpointerarray);
}
}
}
struct parameterwrapper ** ptr=&objectqueues[param->type];
param->queue=parameter;
- parameter->queue=createQueue();
+ parameter->objectset=allocateSimpleHash(10);
parameter->numberofterms=param->numberterms;
parameter->intarray=param->intarray;
parameter->task=task;