#ifdef TASK
#include "runtime.h"
+#ifndef RAW
#include "structdefs.h"
#include "mem.h"
#include "checkpoint.h"
#include <signal.h>
#include <assert.h>
#include <errno.h>
+#endif
#ifdef RAW
+#include <raw.h>
+#include <raw_compiler_defs.h>
+//#include <libints.h>
#elif defined THREADSIMULATE
-#if 0
-#include <sys/mman.h> // for mmap
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-int offset_transObj = 0;
-#endif
-
// use POSIX message queue
// for each core, its message queue named as
// /msgqueue_corenum
extern int debugtask;
extern int instaccum;
+#ifdef RAW
+#define TOTALCORE raw_get_num_tiles()
+#endif
+
#ifdef CONSCHECK
#include "instrument.h"
#endif
struct genhashtable * activetasks;
struct genhashtable * failedtasks;
struct taskparamdescriptor * currtpd;
+#ifndef RAW
struct RuntimeHash * forward;
struct RuntimeHash * reverse;
+#endif
int corestatus[NUMCORES]; // records status of each core
// 1: running tasks
// 0: stall
int numsendobjs[NUMCORES]; // records how many objects a core has sent out
int numreceiveobjs[NUMCORES]; // records how many objects a core has received
-#ifdef THREADSIMULATE
+#ifdef RAW
+struct RuntimeHash locktable;
+static struct RuntimeHash* locktbl = &locktable;
+void * curr_heapbase=0;
+void * curr_heaptop=0;
+int self_numsendobjs;
+int self_numreceiveobjs;
+int lockobj;
+int lockresult;
+bool lockflag;
+#ifndef INTERRUPT
+bool reside;
+#endif
+struct Queue objqueue;
+int msgdata[30];
+int msgtype;
+int msgdataindex;
+int msglength;
+void calCoords(int core_num, int* coordY, int* coordX);
+#elif defined THREADSIMULATE
+static struct RuntimeHash* locktbl;
struct thread_data {
int corenum;
int argc;
struct thread_data thread_data_array[NUMCORES];
mqd_t mqd[NUMCORES];
static pthread_key_t key;
-static struct RuntimeHash* locktbl;
static pthread_rwlock_t rwlock_tbl;
static pthread_rwlock_t rwlock_init;
+
+void run(void * arg);
#endif
+
bool transStallMsg(int targetcore);
void transTerminateMsg(int targetcore);
-void run(void * arg);
+int receiveObject();
bool getreadlock(void* ptr);
void releasereadlock(void* ptr);
+#ifdef RAW
+bool getreadlock_I(void* ptr);
+void releasereadlock_I(void* ptr);
+#endif
bool getwritelock(void* ptr);
void releasewritelock(void* ptr);
+#ifdef RAW
+void begin() {
+#else
int main(int argc, char **argv) {
-#ifdef THREADSIMULATE
+#endif
+#ifdef RAW
+ int i = 0;
+ int argc = 1;
+ char ** argv = NULL;
+ bool sendStall = false;
+ bool isfirst = true;
+ bool tocontinue = false;
+ struct QueueItem * objitem = NULL;
+ struct transObjInfo * objInfo = NULL;
+ int grount = 0;
+ bool allStall = true;
+ int sumsendobj = 0;
+
+#ifdef RAWDEBUG
+ raw_test_pass(0xee01);
+#endif
+ corenum = raw_get_abs_pos_x() + 4 * raw_get_abs_pos_y();
+
+ // initialize the arrays
+ if(STARTUPCORE == corenum) {
+ // startup core to initialize corestatus[]
+ for(i = 0; i < NUMCORES; ++i) {
+ corestatus[i] = 1;
+ numsendobjs[i] = 0; // assume all variables in RAW are local variables! MAY BE WRONG!!!
+ numreceiveobjs[i] = 0;
+ }
+ }
+ self_numsendobjs = 0;
+ self_numreceiveobjs = 0;
+ for(i = 0; i < 30; ++i) {
+ msgdata[i] = -1;
+ }
+ //msgdata = NULL;
+ msgtype = -1;
+ msgdataindex = 0;
+ msglength = 30;
+#ifdef RAWDEBUG
+ raw_test_pass(0xee02);
+#endif
+
+ // create the lock table, lockresult table and obj queue
+ locktable.size = 20;
+ locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20);
+ /* Set allocation blocks*/
+ locktable.listhead=NULL;
+ locktable.listtail=NULL;
+ /*Set data counts*/
+ locktable.numelements = 0;
+ lockobj = 0;
+ lockresult = 0;
+ lockflag = false;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+ objqueue.head = NULL;
+ objqueue.tail = NULL;
+#ifdef RAWDEBUG
+ raw_test_pass(0xee03);
+#endif
+
+#ifdef INTERRUPT
+ if (corenum < NUMCORES) {
+ // set up interrupts
+ setup_ints();
+ //setup_interrupts();
+ //start_gdn_avail_ints(recvMsg);
+ raw_user_interrupts_on();
+#ifdef RAWDEBUG
+ raw_test_pass(0xee04);
+#endif
+ }
+#endif
+
+#elif defined THREADSIMULATE
errno = 0;
int tids[NUMCORES];
int rc[NUMCORES];
int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
printf("[Main] initialize the rwlock for lock table: %d error: \n", rc_locktbl, strerror(rc_locktbl));
-/* if(argc < 2) {
- printf("Usage: <bin> <corenum>\n");
- fflush(stdout);
- exit(-1);
- }
-
- int cnum = 0;
- char * number = argv[1];
- int len = strlen(number);
- for(i = 0; i < len; ++i) {
- cnum = (number[i] - '0') + cnum * 10;
- }
-*/
for(i = 0; i < NUMCORES; ++i) {
- /* if(STARTUPCORE == i) {
- continue;
- }*/
thread_data_array[i].corenum = i;
thread_data_array[i].argc = argc;
thread_data_array[i].argv = argv;
fflush(stdout);
exit(-1);
}
- }//*/
+ }
- /*// do stuff of startup core
- thread_data_array[STARTUPCORE].corenum = STARTUPCORE;
- thread_data_array[STARTUPCORE].argc = argc;// - 1;
- thread_data_array[STARTUPCORE].argv = argv;//&argv[1];
- thread_data_array[STARTUPCORE].numsendobjs = 0;
- thread_data_array[STARTUPCORE].numreceiveobjs = 0;
- run(&thread_data_array[STARTUPCORE]);*/
- pthread_exit(NULL);
+ //pthread_exit(NULL);
+ while(true) {}
}
void run(void* arg) {
struct thread_data * my_tdata = (struct thread_data *)arg;
- //corenum = my_tdata->corenum;
- //void * ptr = malloc(sizeof(int));
- //*((int*)ptr) = my_tdata->corenum;
pthread_setspecific(key, (void *)my_tdata->corenum);
int argc = my_tdata->argc;
char** argv = my_tdata->argv;
#ifdef CONSCHECK
initializemmap();
#endif
+#ifndef RAW
processOptions();
+#endif
initializeexithandler();
+#ifdef RAWDEBUG
+ raw_test_pass(0xee05);
+#endif
/* Create table for failed tasks */
- failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
- (int (*)(void *,void *)) &comparetpd);
+#ifdef RAW
+ if(corenum > NUMCORES - 1) {
+ failedtasks = NULL;
+ activetasks = NULL;
+ while(true) {
+ receiveObject();
+ }
+ } else {
+#ifdef RAWDEBUG
+ raw_test_pass(0xee06);
+#endif
+#endif
+ /*failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
+ (int (*)(void *,void *)) &comparetpd);*/
+ failedtasks = NULL;
+#ifdef RAWDEBUG
+ raw_test_pass(0xee07);
+#endif
/* Create queue of active tasks */
activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
(int (*)(void *,void *)) &comparetpd);
+#ifdef RAWDEBUG
+ raw_test_pass(0xee08);
+#endif
/* Process task information */
processtasks();
+#ifdef RAWDEBUG
+ raw_test_pass(0xee09);
+#endif
+ if(STARTUPCORE == corenum) {
/* Create startup object */
createstartupobject(argc, argv);
+ }
+#ifdef RAWDEBUG
+ raw_test_pass(0xee0a);
+#endif
+#ifdef RAW
+#ifdef RAWDEBUG
+ raw_test_pass(0xee0b);
+#endif
+
+ while(true) {
+/*#ifndef INTERRUPT
+ while(receiveObject() != -1) {
+ }
+#endif*/
+
+ // check if there are new active tasks can be executed
+ executetasks();
+
+#ifndef INTERRUPT
+ while(receiveObject() != -1) {
+ }
+#endif
+
+#ifdef RAWDEBUG
+ raw_test_pass(0xee0c);
+#endif
+
+ // check if there are some pending objects, if yes, enqueue them and executetasks again
+ tocontinue = false;
+#ifdef RAWDEBUG
+ raw_test_pass(0xee0d);
+#endif
+ while(!isEmpty(&objqueue)) {
+ void * obj = NULL;
+#ifdef INTERRUPT
+ raw_user_interrupts_off();
+#endif
+#ifdef RAWDEBUG
+ raw_test_pass(0xeee1);
+#endif
+ sendStall = false;
+ tocontinue = true;
+ objitem = getTail(&objqueue);
+ //obj = objitem->objectptr;
+ objInfo = (struct transObjInfo *)objitem->objectptr;
+ obj = objInfo->objptr;
+#ifdef RAWDEBUG
+ raw_test_pass_reg((int)obj);
+#endif
+ // grab lock and flush the obj
+ getreadlock_I(obj);
+ while(!lockflag) {
+ receiveObject();
+ }
+ grount = lockresult;
+#ifdef RAWDEBUG
+ raw_test_pass_reg(grount);
+#endif
+
+ lockresult = 0;
+ lockobj = 0;
+ lockflag = false;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+
+ if(grount == 1) {
+ int k = 0;
+ raw_invalidate_cache_range(obj, classsize[((struct ___Object___ *)obj)->type]);
+ // flush the obj
+ /*for(k = 0; k < classsize[((struct ___Object___ *)obj)->type]; ++k) {
+ invalidateAddr(obj + k);
+ }*/
+ // enqueue the object
+ for(k = 0; k < objInfo->length; ++k) {
+ int taskindex = objInfo->queues[2 * k];
+ int paramindex = objInfo->queues[2 * k + 1];
+ struct parameterwrapper ** queues = &(paramqueues[corenum][taskindex][paramindex]);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(taskindex);
+ raw_test_pass_reg(paramindex);
+#endif
+ enqueueObject_I(obj, queues, 1);
+ }
+ removeItem(&objqueue, objitem);
+ releasereadlock_I(obj);
+ RUNFREE(objInfo->queues);
+ RUNFREE(objInfo);
+ /*enqueueObject_I(obj, NULL, 0);
+ removeItem(&objqueue, objitem);
+ releasereadlock_I(obj);*/
+ } else {
+ // can not get lock
+ // put it at the end of the queue
+ // and try to execute active tasks already enqueued first
+ removeItem(&objqueue, objitem);
+ addNewItem_I(&objqueue, objInfo);
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+ break;
+ }
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+#ifdef RAWDEBUG
+ raw_test_pass(0xee0e);
+#endif
+ }
+#ifdef RAWDEBUG
+ raw_test_pass(0xee0f);
+#endif
+
+ if(!tocontinue) {
+ // check if stop
+ if(STARTUPCORE == corenum) {
+ if(isfirst) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xee10);
+#endif
+ isfirst = false;
+ }
+#ifdef INTERRUPT
+ raw_user_interrupts_off();
+#endif
+ corestatus[corenum] = 0;
+ numsendobjs[corenum] = self_numsendobjs;
+ numreceiveobjs[corenum] = self_numreceiveobjs;
+ // check the status of all cores
+ allStall = true;
+#ifdef RAWDEBUG
+ raw_test_pass_reg(NUMCORES);
+#endif
+ for(i = 0; i < NUMCORES; ++i) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xe000 + corestatus[i]);
+#endif
+ if(corestatus[i] != 0) {
+ allStall = false;
+ break;
+ }
+ }
+ if(allStall) {
+ // check if the sum of send objs and receive obj are the same
+ // yes->terminate
+ // no->go on executing
+ sumsendobj = 0;
+ for(i = 0; i < NUMCORES; ++i) {
+ sumsendobj += numsendobjs[i];
+#ifdef RAWDEBUG
+ raw_test_pass(0xf000 + numsendobjs[i]);
+#endif
+ }
+ for(i = 0; i < NUMCORES; ++i) {
+ sumsendobj -= numreceiveobjs[i];
+#ifdef RAWDEBUG
+ raw_test_pass(0xf000 + numreceiveobjs[i]);
+#endif
+ }
+ if(0 == sumsendobj) {
+ // terminate
+#ifdef RAWDEBUG
+ raw_test_pass(0xee11);
+#endif
+ raw_test_pass(raw_get_cycle());
+ raw_test_done(1); // All done.
+ }
+ }
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+ } else {
+ if(!sendStall) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xee12);
+#endif
+ if(isfirst) {
+ // wait for some time
+ int halt = 10000;
+#ifdef RAWDEBUG
+ raw_test_pass(0xee13);
+#endif
+ while(halt--){}
+ isfirst = false;
+#ifdef RAWDEBUG
+ raw_test_pass(0xee14);
+#endif
+ } else {
+ // send StallMsg to startup core
+#ifdef RAWDEBUG
+ raw_test_pass(0xee15);
+#endif
+ sendStall = transStallMsg(STARTUPCORE);
+ isfirst = true;
+ }
+ } else {
+ isfirst = true;
+#ifdef RAWDEBUG
+ raw_test_pass(0xee16);
+#endif
+ }
+ }
+ }
+ }
+ }
+#elif defined THREADSIMULATE
/* Start executing the tasks */
executetasks();
-#ifdef THREADSIMULATE
-
int i = 0;
// check if there are new objects coming
bool sendStall = false;
}
if(0 == sumsendobj) {
// terminate
- // TODO
- /* for(i = 0; i < NUMCORES; ++i) {
- if(i != corenum) {
- transTerminateMsg(i);
- }
- }
- mq_close(mqd[corenum]);*/
// release all locks
+ int rc_tbl = pthread_rwlock_wrlock(&rwlock_tbl);
+ printf("[run, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc_tbl, strerror(rc_tbl));
struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl);
while(0 != RunhasNext(it_lock)) {
int key = Runkey(it_lock);
pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
int rc_des = pthread_rwlock_destroy(rwlock_obj);
printf("[run, %d] destroy the rwlock for object: %d error: \n", numofcore, key, strerror(rc_des));
+ RUNFREE(rwlock_obj);
}
freeRuntimeHash(locktbl);
locktbl = NULL;
/* Set initialized flag for startup object */
flagorandinit(startupobject,1,0xFFFFFFFF);
enqueueObject(startupobject, NULL, 0);
- //enqueueObject(startupobject, objq4startupobj[corenum], numqueues4startupobj[corenum]);
+#ifdef RAW
+ //flushAll();
+ raw_flush_entire_cache();
+#endif
}
int hashCodetpd(struct taskparamdescriptor *ftd) {
#else
void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
#endif
+ struct ArrayObject * ao=NULL;
struct ___Object___ * tagptr=obj->___tags___;
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb0);
+#endif
if (tagptr==NULL) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb1);
+#endif
obj->___tags___=(struct ___Object___ *)tagd;
} else {
/* Have to check if it is already set */
if (tagptr->type==TAGTYPE) {
struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
- if (td==tagd)
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb2);
+#endif
+ if (td==tagd) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb3);
+#endif
return;
+ }
#ifdef PRECISE_GC
int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
tagd=(struct ___TagDescriptor___ *)ptrarray[3];
td=(struct ___TagDescriptor___ *) obj->___tags___;
#else
- struct ArrayObject * ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb4);
+#endif
+ ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
+#endif
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb5);
#endif
ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
obj->___tags___=(struct ___Object___ *) ao;
ao->___cachedCode___=2;
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb6);
+#endif
} else {
/* Array Case */
int i;
struct ArrayObject *ao=(struct ArrayObject *) tagptr;
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb7);
+#endif
for(i=0;i<ao->___cachedCode___;i++) {
struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
- if (td==tagd)
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb8);
+#endif
+ if (td==tagd) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xebb9);
+#endif
return;
+ }
}
if (ao->___cachedCode___<ao->___length___) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xebba);
+#endif
ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
ao->___cachedCode___++;
+#ifdef RAWDEBUG
+ raw_test_pass(0xebbb);
+#endif
} else {
#ifdef PRECISE_GC
int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
ao=(struct ArrayObject *)obj->___tags___;
#else
struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
+#endif
+#ifdef RAWDEBUG
+ raw_test_pass(0xebbc);
#endif
aonew->___cachedCode___=ao->___length___+1;
for(i=0;i<ao->___length___;i++) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xebbd);
+#endif
ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
}
+#ifdef RAWDEBUG
+ raw_test_pass(0xebbe);
+#endif
ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
+#ifdef RAWDEBUG
+ raw_test_pass(0xebbf);
+#endif
}
}
}
{
struct ___Object___ * tagset=tagd->flagptr;
+#ifdef RAWDEBUG
+ raw_test_pass(0xb008);
+#endif
if(tagset==NULL) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xb009);
+#endif
tagd->flagptr=obj;
} else if (tagset->type!=OBJECTARRAYTYPE) {
#ifdef PRECISE_GC
ARRAYSET(ao, struct ___Object___ *, 1, obj);
ao->___cachedCode___=2;
tagd->flagptr=(struct ___Object___ *)ao;
+#ifdef RAWDEBUG
+ raw_test_pass(0xb00a);
+#endif
} else {
struct ArrayObject *ao=(struct ArrayObject *) tagset;
if (ao->___cachedCode___<ao->___length___) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xb00b);
+#endif
ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
} else {
int i;
}
ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
tagd->flagptr=(struct ___Object___ *) aonew;
+#ifdef RAWDEBUG
+ raw_test_pass(0xb00c);
+#endif
}
}
}
if ((struct ___TagDescriptor___ *)tagptr==tagd)
obj->___tags___=NULL;
else
+#ifndef RAW
printf("ERROR 1 in tagclear\n");
+#endif
+ ;
} else {
struct ArrayObject *ao=(struct ArrayObject *) tagptr;
int i;
goto PROCESSCLEAR;
}
}
+#ifndef RAW
printf("ERROR 2 in tagclear\n");
+#endif
+ ;
}
PROCESSCLEAR:
{
if (tagset==obj)
tagd->flagptr=NULL;
else
+#ifndef RAW
printf("ERROR 3 in tagclear\n");
+#endif
+ ;
} else {
struct ArrayObject *ao=(struct ArrayObject *) tagset;
int i;
goto ENDCLEAR;
}
}
+#ifndef RAW
printf("ERROR 4 in tagclear\n");
+#endif
}
}
ENDCLEAR:
int oldflag=((int *)ptr)[1];
int flag=ormask|oldflag;
flag&=andmask;
+#ifdef RAWDEBUG
+ raw_test_pass_reg((int)ptr);
+ raw_test_pass(0xaa000000 + oldflag);
+ raw_test_pass(0xaa000000 + flag);
+#endif
flagbody(ptr, flag, queues, length, false);
}
}
int oldflag=((int *)ptr)[1];
int flag=ormask|oldflag;
flag&=andmask;
+#ifdef RAWDEBUG
+ raw_test_pass(0xaa100000 + oldflag);
+ raw_test_pass(0xaa100000 + flag);
+#endif
flagbody(ptr,flag,NULL,0,true);
}
int i = 0;
struct parameterwrapper ** queues = vqueues;
int length = vlength;
+ int next;
+ int UNUSED, UNUSED2;
+ int * enterflags = NULL;
if((!isnew) && (queues == NULL)) {
#ifdef THREADSIMULATE
int numofcore = pthread_getspecific(key);
queues = objectqueues[numofcore][ptr->type];
length = numqueues[numofcore][ptr->type];
#else
+#ifdef RAW
+ if(corenum < NUMCORES) {
+#endif
queues = objectqueues[corenum][ptr->type];
length = numqueues[corenum][ptr->type];
+#ifdef RAW
+ } else {
+ return;
+ }
+#endif
#endif
}
ptr->flag=flag;
+#ifdef RAWDEBUG
+ raw_test_pass(0xbb000000 + ptr->flag);
+#endif
/*Remove object from all queues */
for(i = 0; i < length; ++i) {
flagptr = queues[i];
- int next;
- int UNUSED, UNUSED2;
- int * enterflags;
ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
ObjectHashremove(flagptr->objectset, (int)ptr);
if (enterflags!=NULL)
- free(enterflags);
+ RUNFREE(enterflags);
}
}
struct QueueItem *tmpptr;
struct parameterwrapper * parameter=NULL;
int j;
+ int i;
+ struct parameterwrapper * prevptr=NULL;
+ struct ___Object___ *tagptr=NULL;
struct parameterwrapper ** queues = vqueues;
int length = vlength;
+#ifdef RAW
+ if(corenum > NUMCORES - 1) {
+ return;
+ }
+#endif
if(queues == NULL) {
#ifdef THREADSIMULATE
int numofcore = pthread_getspecific(key);
length = numqueues[corenum][ptr->type];
#endif
}
- int i;
- struct parameterwrapper * prevptr=NULL;
- struct ___Object___ *tagptr=ptr->___tags___;
+ tagptr=ptr->___tags___;
/* Outer loop iterates through all parameter queues an object of
this type could be in. */
-
- for(j = 0; j < length; ++j) {
+ for(j = 0; j < length; ++j) {
parameter = queues[j];
/* Check tags */
if (parameter->numbertags>0) {
int andmask=parameter->intarray[i*2];
int checkmask=parameter->intarray[i*2+1];
if ((ptr->flag&andmask)==checkmask) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xcc000000 + andmask);
+ raw_test_pass_reg((int)ptr);
+ raw_test_pass(0xcc000000 + ptr->flag);
+ raw_test_pass(0xcc000000 + checkmask);
+#endif
enqueuetasks(parameter, prevptr, ptr, NULL, 0);
prevptr=parameter;
break;
}
}
-// transfer an object to targetcore
-// format: object
-void transferObject(void * obj, int targetcore) {
- int type=((int *)obj)[0];
- assert(type < NUMCLASSES); // can only transfer normal object
- int size=classsize[type];
-
#ifdef RAW
-
-#elif defined THREADSIMULATE
-#if 0
- // use shared memory to transfer objects between cores
- int fd = 0; // mapped file
- void * p_map = NULL;
- char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
- int offset;
- // open the file
- fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
- offset = lseek(fd, 0, SEEK_CUR);
- if(offset == -1) {
- printf("fail to open file " + filepath + " in transferObject.\n");
- fflush(stdout);
- exit(-1);
+void enqueueObject_I(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
+ struct ___Object___ *ptr = (struct ___Object___ *)vptr;
+
+ {
+ struct QueueItem *tmpptr;
+ struct parameterwrapper * parameter=NULL;
+ int j;
+ int i;
+ struct parameterwrapper * prevptr=NULL;
+ struct ___Object___ *tagptr=NULL;
+ struct parameterwrapper ** queues = vqueues;
+ int length = vlength;
+#ifdef RAW
+ if(corenum > NUMCORES - 1) {
+ return;
}
- lseek(fd, size + sizeof(int)*2, SEEK_CUR);
- write(fd, "", 1);
- p_map = (void *)mmap(NULL,size+sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
- close(fd);
- memcpy(p_map, type, sizeof(int));
- memcpy(p_map+sizeof(int), corenum, sizeof(int));
- memcpy((p_map+sizeof(int)*2), obj, size);
- munmap(p_map, size+sizeof(int)*2);
- //printf( "umap ok \n" );
#endif
-
- int numofcore = pthread_getspecific(key);
-
- // use POSIX message queue to transfer objects between cores
- mqd_t mqdnum;
- char corenumstr[3];
- int sourcelen = 0;
- if(targetcore < 10) {
- corenumstr[0] = targetcore + '0';
- corenumstr[1] = '\0';
- sourcelen = 1;
- } else if(targetcore < 100) {
- corenumstr[1] = targetcore % 10 + '0';
- corenumstr[0] = (targetcore / 10) + '0';
- corenumstr[2] = '\0';
- sourcelen = 2;
- } else {
- printf("Error: targetcore >= 100\n");
- fflush(stdout);
- exit(-1);
- }
- char * pathhead = "/msgqueue_";
- int targetlen = strlen(pathhead);
- char path[targetlen + sourcelen + 1];
- strcpy(path, pathhead);
- strncat(path, corenumstr, sourcelen);
- int oflags = O_WRONLY|O_NONBLOCK;
- int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
- mqdnum = mq_open(path, oflags, omodes, NULL);
- if(mqdnum==-1) {
- printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
- fflush(stdout);
- exit(-1);
- }
- struct ___Object___ * newobj = (struct ___Object___ *)obj;
- if(0 == newobj->isolate) {
- newobj = RUNMALLOC(size);
- memcpy(newobj, obj, size);
- newobj->original=obj;
+ if(queues == NULL) {
+#ifdef THREADSIMULATE
+ int numofcore = pthread_getspecific(key);
+ queues = objectqueues[numofcore][ptr->type];
+ length = numqueues[numofcore][ptr->type];
+#else
+ queues = objectqueues[corenum][ptr->type];
+ length = numqueues[corenum][ptr->type];
+#endif
}
- int ret;
- do {
- ret=mq_send(mqdnum, (void *)newobj, size, 0); // send the object into the queue
- if(ret != 0) {
- printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
- }
- }while(ret!=0);
- if(numofcore == STARTUPCORE) {
- ++numsendobjs[numofcore];
- } else {
- ++(thread_data_array[numofcore].numsendobjs);
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa1);
+ raw_test_pass_reg(queues);
+ raw_test_pass_reg(length);
+#endif
+ tagptr=ptr->___tags___;
+
+ /* Outer loop iterates through all parameter queues an object of
+ this type could be in. */
+ for(j = 0; j < length; ++j) {
+ parameter = queues[j];
+ /* Check tags */
+ if (parameter->numbertags>0) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa2);
+ raw_test_pass_reg(tagptr);
+#endif
+ if (tagptr==NULL)
+ goto nextloop;//that means the object has no tag but that param needs tag
+ else if(tagptr->type==TAGTYPE) {//one tag
+ struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa3);
+#endif
+ for(i=0;i<parameter->numbertags;i++) {
+ //slotid is parameter->tagarray[2*i];
+ int tagid=parameter->tagarray[2*i+1];
+ if (tagid!=tagptr->flag) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa4);
+#endif
+ goto nextloop; /*We don't have this tag */
+ }
+ }
+ } else {//multiple tags
+ struct ArrayObject * ao=(struct ArrayObject *) tagptr;
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa5);
+#endif
+ for(i=0;i<parameter->numbertags;i++) {
+ //slotid is parameter->tagarray[2*i];
+ int tagid=parameter->tagarray[2*i+1];
+ int j;
+ for(j=0;j<ao->___cachedCode___;j++) {
+ if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag) {
+ goto foundtag;
+ }
+ }
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa6);
+#endif
+ goto nextloop;
+ foundtag:
+ ;
+ }
}
- printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
+ }
+
+ /* Check flags */
+ for(i=0;i<parameter->numberofterms;i++) {
+ int andmask=parameter->intarray[i*2];
+ int checkmask=parameter->intarray[i*2+1];
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa7);
+ raw_test_pass(0xcc000000 + andmask);
+ raw_test_pass_reg(ptr);
+ raw_test_pass(0xcc000000 + ptr->flag);
+ raw_test_pass(0xcc000000 + checkmask);
+#endif
+ if ((ptr->flag&andmask)==checkmask) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xeaa8);
#endif
+ enqueuetasks_I(parameter, prevptr, ptr, NULL, 0);
+ prevptr=parameter;
+ break;
+ }
+ }
+ nextloop:
+ ;
+ }
+ }
}
-// send terminate message to targetcore
-// format: -1
-bool transStallMsg(int targetcore) {
- struct ___Object___ newobj;
- // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
- newobj.type = -1;
+// helper function to compute the coordinates of a core from the core number
+void calCoords(int core_num, int* coordY, int* coordX) {
+ *coordX = core_num % 4;
+ *coordY = core_num / 4;
+}
+#endif
+
+/* Message format for RAW version:
+ * type + Msgbody
+ * type: 0 -- transfer object
+ * 1 -- transfer stall msg
+ * 2 -- lock request
+ * 3 -- lock grount
+ * 4 -- lock deny
+ * 5 -- lock release
+ *
+ * ObjMsg: 0 + size of msg + obj's address + (task index + param index)+
+ * StallMsg: 1 + corenum + sendobjs + receiveobjs (size is always 4 * sizeof(int))
+ * LockMsg: 2 + lock type + obj pointer + request core (size is always 4 * sizeof(int))
+ * 3/4/5 + lock type + obj pointer (size is always 3 * sizeof(int))
+ * lock type: 0 -- read; 1 -- write
+ */
+
+// transfer an object to targetcore
+// format: object
+void transferObject(struct transObjInfo * transObj) {
+ void * obj = transObj->objptr;
+ int type=((int *)obj)[0];
+ int size=classsize[type];
+ int targetcore = transObj->targetcore;
+ //assert(type < NUMCLASSES); // can only transfer normal object
#ifdef RAW
- newobj.flag = corenum;
- newobj.___cachedHash___ = thread_data_array[corenum].numsendobjs;
- newobj.___cachedCode___ = thread_data_array[corenum].numreceiveobjs;
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ //int isshared = 0;
+ // for 32 bit machine, the size of fixed part is always 3 words
+ //int msgsize = sizeof(int) * 2 + sizeof(void *);
+ int msgsize = 3 + transObj->length * 2;
+ int i = 0;
-#elif defined THREADSIMULATE
- int numofcore = pthread_getspecific(key);
- newobj.flag = numofcore;
- newobj.___cachedHash___ = thread_data_array[numofcore].numsendobjs;
- newobj.___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
-#if 0
- // use shared memory to transfer objects between cores
- int fd = 0; // mapped file
- void * p_map = NULL;
- char * filepath = "/scratch/transObj/file_" + targetcore + ".txt";
- int offset;
- // open the file
- fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file
- offset = lseek(fd, 0, SEEK_CUR);
- if(offset == -1) {
- printf("fail to open file " + filepath + " in transferObject.\n");
- fflush(stdout);
- exit(-1);
+ struct ___Object___ * newobj = (struct ___Object___ *)obj;
+ /*if(0 == newobj->isolate) {
+ isshared = 1;
+ }*/
+
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ gdn_send(0);
+#ifdef RAWDEBUG
+ raw_test_pass(0);
+#endif
+ gdn_send(msgsize);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(msgsize);
+#endif
+ gdn_send(obj);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(obj);
+#endif
+ for(i = 0; i < transObj->length; ++i) {
+ int taskindex = transObj->queues[2*i];
+ int paramindex = transObj->queues[2*i+1];
+ gdn_send(taskindex);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(taskindex);
+#endif
+ gdn_send(paramindex);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(paramindex);
+#endif
}
- lseek(fd, sizeof(int)*2, SEEK_CUR);
- write(fd, "", 1);
- p_map = (void *)mmap(NULL,sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset);
- close(fd);
- memcpy(p_map, type, sizeof(int));
- memcpy(p_map+sizeof(int), corenum, sizeof(int));
- munmap(p_map, sizeof(int)*2);
- //printf( "umap ok \n" );
+#ifdef RAWDEBUG
+ raw_test_pass(0xffff);
#endif
+ ++(self_numsendobjs);
+#elif defined THREADSIMULATE
+ int numofcore = pthread_getspecific(key);
- // use POSIX message queue to send stall msg to startup core
- assert(targetcore == STARTUPCORE);
+ // use POSIX message queue to transfer objects between cores
mqd_t mqdnum;
char corenumstr[3];
int sourcelen = 0;
int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
mqdnum = mq_open(path, oflags, omodes, NULL);
if(mqdnum==-1) {
- printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
+ printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
fflush(stdout);
exit(-1);
}
+ /*struct ___Object___ * newobj = (struct ___Object___ *)obj;
+ if(0 == newobj->isolate) {
+ newobj = RUNMALLOC(size);
+ memcpy(newobj, obj, size);
+ newobj->original=obj;
+ }*/
+ struct transObjInfo * tmptransObj = RUNMALLOC(sizeof(struct transObjInfo));
+ memcpy(tmptransObj, transObj, sizeof(struct transObjInfo));
+ int * tmpqueue = RUNMALLOC(sizeof(int)*2*tmptransObj->length);
+ memcpy(tmpqueue, tmptransObj->queues, sizeof(int)*2*tmptransObj->length);
+ tmptransObj->queues = tmpqueue;
+ struct ___Object___ * newobj = RUNMALLOC(sizeof(struct ___Object___));
+ newobj->type = ((struct ___Object___ *)obj)->type;
+ newobj->original = (struct ___Object___ *)tmptransObj;
int ret;
- ret=mq_send(mqdnum, (void *)&newobj, sizeof(struct ___Object___), 0); // send the object into the queue
- if(ret != 0) {
- printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
- return false;
+ do {
+ ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
+ if(ret != 0) {
+ printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
+ }
+ }while(ret!=0);
+ RUNFREE(newobj);
+ if(numofcore == STARTUPCORE) {
+ ++numsendobjs[numofcore];
} else {
- printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
- printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj.flag, newobj.___cachedHash___, newobj.___cachedCode___);
- return true;
+ ++(thread_data_array[numofcore].numsendobjs);
}
+ printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
#endif
}
-#if 0
+
// send terminate message to targetcore
// format: -1
-void transTerminateMsg(int targetcore) {
- // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
- int type = -2;
-
+bool transStallMsg(int targetcore) {
#ifdef RAW
-
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ // for 32 bit machine, the size is always 4 words
+ //int msgsize = sizeof(int) * 4;
+ int msgsize = 4;
+
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ gdn_send(1);
+#ifdef RAWDEBUG
+ raw_test_pass(1);
+#endif
+ gdn_send(corenum);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(corenum);
+#endif
+ gdn_send(self_numsendobjs);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(self_numsendobjs);
+#endif
+ gdn_send(self_numreceiveobjs);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(self_numreceiveobjs);
+ raw_test_pass(0xffff);
+#endif
+ return true;
#elif defined THREADSIMULATE
+ struct ___Object___ *newobj = RUNMALLOC(sizeof(struct ___Object___));
+ // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
+ newobj->type = -1;
+ int numofcore = pthread_getspecific(key);
+ newobj->flag = numofcore;
+ newobj->___cachedHash___ = thread_data_array[numofcore].numsendobjs;
+ newobj->___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
// use POSIX message queue to send stall msg to startup core
- assert(targetcore != STARTUPCORE);
+ assert(targetcore == STARTUPCORE);
mqd_t mqdnum;
char corenumstr[3];
int sourcelen = 0;
corenumstr[0] = targetcore + '0';
corenumstr[1] = '\0';
sourcelen = 1;
- } else if(corenum < 100) {
+ } else if(targetcore < 100) {
corenumstr[1] = targetcore % 10 + '0';
corenumstr[0] = (targetcore / 10) + '0';
corenumstr[2] = '\0';
int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
mqdnum = mq_open(path, oflags, omodes, NULL);
if(mqdnum==-1) {
- printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno));
+ printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
fflush(stdout);
exit(-1);
}
int ret;
- do {
- ret=mq_send(mqdnum, (void *)&type, sizeof(int), 0); // send the object into the queue
- if(ret != 0) {
- printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno));
- }
- }while(ret != 0);
+ ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
+ if(ret != 0) {
+ printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
+ RUNFREE(newobj);
+ return false;
+ } else {
+ printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
+ printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj->flag, newobj->___cachedHash___, newobj->___cachedCode___);
+ RUNFREE(newobj);
+ return true;
+ }
#endif
}
-#endif
+
// receive object transferred from other cores
// or the terminate message from other cores
-// format: type [+ object]
+// NOTICE: following format is for threadsimulate version only
+// RAW version please see previous description
+// format: type + object
// type: -1--stall msg
// !-1--object
// return value: 0--received an object
// 1--received nothing
// 2--received a Stall Msg
+// 3--received a lock Msg
+// RAW version: -1 -- received nothing
+// otherwise -- received msg type
int receiveObject() {
#ifdef RAW
-
-#elif defined THREADSIMULATE
-#if 0
- char * filepath = "/scratch/transObj/file_" + corenum + ".txt";
- int fd = 0;
- void * p_map = NULL;
- int type = 0;
- int sourcecorenum = 0;
- int size = 0;
- fd = open(filepath, O_CREAT|O_RDONLY, 00777);
- lseek(fd, offset_transObj, SEEK_SET);
- p_map = (void*)mmap(NULL,sizeof(int)*2,PROT_READ,MAP_SHARED,fd,offset_transObj);
- type = *(int*)p_map;
- sourcecorenum = *(int*)(p_map+sinzeof(int));
- offset_transObj += sizeof(int)*2;
- munmap(p_map,sizeof(int)*2);
- if(type == -1) {
- // sourecorenum has terminated
- ++offset_transObj;
- return;
+ bool deny = false;
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ int targetcore = 0;
+ if(gdn_input_avail() == 0) {
+#ifdef RAWDEBUG
+ if(corenum < NUMCORES) {
+ raw_test_pass(0xd001);
+ }
+#endif
+ return -1;
}
- size = classsize[type];
- p_map = (void*)mmap(NULL,size,PROT_READ,MAP_SHARED,fd,offset_transObj);
- struct ___Object___ * newobj=RUNMALLOC(size);
- memcpy(newobj, p_map, size);
- ++offset_transObj;
- enqueueObject(newobj,NULL,0);
+msg:
+#ifdef RAWDEBUG
+ raw_test_pass(0xcccc);
+#endif
+ while((gdn_input_avail() != 0) && (msgdataindex < msglength)) {
+ msgdata[msgdataindex] = gdn_receive();
+ if(msgdataindex == 0) {
+ if(msgdata[0] > 2) {
+ msglength = 3;
+ } else if(msgdata[0] > 0) {
+ msglength = 4;
+ }
+ } else if((msgdataindex == 1) && (msgdata[0] == 0)) {
+ msglength = msgdata[msgdataindex];
+ }
+#ifdef RAWDEBUG
+ raw_test_pass_reg(msgdata[msgdataindex]);
+#endif
+ msgdataindex++;
+
+ /*if(msgdataindex == 0) {
+ // type
+ msgtype = gdn_receive();
+ if(msgtype > 2) {
+ msglength = 3;
+ } else {
+ msglength = 4;
+ }
+ if(msgtype != 0) {
+ msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
+ msgdata[msgdataindex] = msgtype;
+ }
+#ifdef RAWDEBUG
+ raw_test_pass_reg(msgtype);
+#endif
+ } else if((msgdataindex == 1) && (msgtype == 0)) {
+ // object transfer msg
+ msglength = gdn_receive();
+ msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
+ msgdata[0] = msgtype;
+ msgdata[msgdataindex] = msglength;
+#ifdef RAWDEBUG
+ raw_test_pass_reg(msgdata[msgdataindex]);
+#endif
+ } else {
+ msgdata[msgdataindex] = gdn_receive();
+#ifdef RAWDEBUG
+ raw_test_pass_reg(msgdata[msgdataindex]);
+#endif
+ }
+ msgdataindex++;*/
+ }
+#ifdef RAWDEBUG
+ raw_test_pass(0xffff);
+#endif
+ if(msgdataindex == msglength) {
+ // received a whole msg
+ int type, data1, data2; // will receive at least 3 words including type
+ type = msgdata[0];
+ data1 = msgdata[1];
+ data2 = msgdata[2];
+ switch(type) {
+ case 0: {
+ // receive a object transfer msg
+ struct transObjInfo * transObj = RUNMALLOC_I(sizeof(struct transObjInfo));
+ int k = 0;
+ if(corenum > NUMCORES - 1) {
+ raw_test_done(0xa00a);
+ }
+ // store the object and its corresponding queue info, enqueue it later
+ transObj->objptr = (void *)data2; // data1 is now size of the msg
+ transObj->length = (msglength - 3) / 2;
+ transObj->queues = RUNMALLOC_I(sizeof(int)*(msglength - 3));
+ for(k = 0; k < transObj->length; ++k) {
+ transObj->queues[2*k] = msgdata[3+2*k];
+#ifdef RAWDEBUG
+ raw_test_pass_reg(transObj->queues[2*k]);
#endif
+ transObj->queues[2*k+1] = msgdata[3+2*k+1];
+#ifdef RAWDEBUG
+ raw_test_pass_reg(transObj->queues[2*k+1]);
+#endif
+ }
+ //memcpy(transObj->queues, msgdata[3], sizeof(int)*(msglength - 3));
+ addNewItem_I(&objqueue, (void *)transObj);
+ ++(self_numreceiveobjs);
+#ifdef RAWDEBUG
+ raw_test_pass(0xe881);
+#endif
+ /*
+ addNewItem_I(&objqueue, (void *)data2);
+ ++(self_numreceiveobjs);
+#ifdef RAWDEBUG
+ raw_test_pass(0xe881);
+#endif
+ */
+ break;
+ }
+ case 1: {
+ // receive a stall msg
+ if(corenum != STARTUPCORE) {
+ // non startup core can not receive stall msg
+ // return -1
+ raw_test_done(0xa001);
+ }
+ if(data1 < NUMCORES) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xe882);
+#endif
+ corestatus[data1] = 0;
+ numsendobjs[data1] = data2;
+ numreceiveobjs[data1] = msgdata[3];
+ }
+ break;
+ }
+ case 2: {
+ // receive lock request msg
+ // for 32 bit machine, the size is always 3 words
+ //int msgsize = sizeof(int) * 3;
+ int msgsize = 3;
+ // lock request msg, handle it right now
+ // check to see if there is a lock exist in locktbl for the required obj
+ int data3 = msgdata[3];
+ deny = false;
+ if(!RuntimeHashcontainskey(locktbl, data2)) {
+ // no locks for this object
+ // first time to operate on this shared object
+ // create a lock for it
+ // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
+#ifdef RAWDEBUG
+ raw_test_pass(0xe883);
+#endif
+ if(data1 == 0) {
+ RuntimeHashadd_I(locktbl, data2, 1);
+ } else {
+ RuntimeHashadd_I(locktbl, data2, -1);
+ }
+ } else {
+ int rwlock_obj = 0;
+#ifdef RAWDEBUG
+ raw_test_pass(0xe884);
+#endif
+ RuntimeHashget(locktbl, data2, &rwlock_obj);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(rwlock_obj);
+#endif
+ if(0 == rwlock_obj) {
+ if(data1 == 0) {
+ rwlock_obj = 1;
+ } else {
+ rwlock_obj = -1;
+ }
+ RuntimeHashremovekey(locktbl, data2);
+ RuntimeHashadd_I(locktbl, data2, rwlock_obj);
+ } else if((rwlock_obj > 0) && (data1 == 0)) {
+ // read lock request and there are only read locks
+ rwlock_obj++;
+ RuntimeHashremovekey(locktbl, data2);
+ RuntimeHashadd_I(locktbl, data2, rwlock_obj);
+ } else {
+ deny = true;
+ }
+#ifdef RAWDEBUG
+ raw_test_pass_reg(rwlock_obj);
+#endif
+ }
+ targetcore = data3;
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ if(deny == true) {
+ // deny the lock request
+ gdn_send(4); // lock request
+#ifdef RAWDEBUG
+ raw_test_pass(4);
+#endif
+ } else {
+ // grount the lock request
+ gdn_send(3); // lock request
+#ifdef RAWDEBUG
+ raw_test_pass(3);
+#endif
+ }
+ gdn_send(data1); // lock type
+#ifdef RAWDEBUG
+ raw_test_pass_reg(data1);
+#endif
+ gdn_send(data2); // lock target
+#ifdef RAWDEBUG
+ raw_test_pass_reg(data2);
+ raw_test_pass(0xffff);
+#endif
+ break;
+ }
+ case 3: {
+ // receive lock grount msg
+ if(corenum > NUMCORES - 1) {
+ raw_test_done(0xa00b);
+ }
+ if(lockobj == data2) {
+ lockresult = 1;
+ lockflag = true;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+ } else {
+ // conflicts on lockresults
+ raw_test_done(0xa002);
+ }
+ break;
+ }
+ case 4: {
+ // receive lock grount/deny msg
+ if(corenum > NUMCORES - 1) {
+ raw_test_done(0xa00c);
+ }
+ if(lockobj == data2) {
+ lockresult = 0;
+ lockflag = true;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+ } else {
+ // conflicts on lockresults
+ raw_test_done(0xa003);
+ }
+ break;
+ }
+ case 5: {
+ // receive lock release msg
+ if(!RuntimeHashcontainskey(locktbl, data2)) {
+ // no locks for this object, something is wrong
+ raw_test_done(0xa004);
+ } else {
+ int rwlock_obj = 0;
+ RuntimeHashget(locktbl, data2, &rwlock_obj);
+#ifdef RAWDEBUG
+ raw_test_pass(0xe885);
+ raw_test_pass_reg(rwlock_obj);
+#endif
+ if(data1 == 0) {
+ rwlock_obj--;
+ } else {
+ rwlock_obj++;
+ }
+ RuntimeHashremovekey(locktbl, data2);
+ RuntimeHashadd_I(locktbl, data2, rwlock_obj);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(rwlock_obj);
+#endif
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ //RUNFREE(msgdata);
+ //msgdata = NULL;
+ for(msgdataindex--;msgdataindex > 0; --msgdataindex) {
+ msgdata[msgdataindex] = -1;
+ }
+ msgtype = -1;
+ //msgdataindex = 0;
+ msglength = 30;
+#ifdef RAWDEBUG
+ raw_test_pass(0xe886);
+#endif
+ if(gdn_input_avail() != 0) {
+ goto msg;
+ }
+ return type;
+ } else {
+ // not a whole msg
+#ifdef RAWDEBUG
+ raw_test_pass(0xe887);
+#endif
+ return -2;
+ }
+#elif defined THREADSIMULATE
int numofcore = pthread_getspecific(key);
// use POSIX message queue to transfer object
int msglen = 0;
} else {
++(thread_data_array[numofcore].numreceiveobjs);
}
- struct ___Object___ * newobj=RUNMALLOC(msglen);
- memcpy(newobj, msgptr, msglen);
- free(msgptr);
- enqueueObject(newobj, NULL, 0);
+ struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
+ struct transObjInfo * transObj = (struct transObjInfo *)tmpptr->original;
+ tmpptr = (struct ___Object___ *)(transObj->objptr);
+ int type = tmpptr->type;
+ int size=classsize[type];
+ struct ___Object___ * newobj=RUNMALLOC(size);
+ memcpy(newobj, tmpptr, size);
+ if(0 == newobj->isolate) {
+ newobj->original=tmpptr;
+ }
+ RUNFREE(msgptr);
+ tmpptr = NULL;
+ int k = 0;
+ for(k = 0; k < transObj->length; ++k) {
+ int taskindex = transObj->queues[2 * k];
+ int paramindex = transObj->queues[2 * k + 1];
+ struct parameterwrapper ** queues = &(paramqueues[numofcore][taskindex][paramindex]);
+ enqueueObject(newobj, queues, 1);
+ }
+ RUNFREE(transObj->queues);
+ RUNFREE(transObj);
return 0;
}
#endif
bool getreadlock(void * ptr) {
#ifdef RAW
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ int targetcore = ((int)ptr >> 5) % TOTALCORE;
+ // for 32 bit machine, the size is always 4 words
+ //int msgsize = sizeof(int) * 4;
+ int msgsize = 4;
+
+ lockobj = (int)ptr;
+ lockflag = false;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+ lockresult = 0;
+ if(targetcore == corenum) {
+ // reside on this core
+ bool deny = false;
+#ifdef INTERRUPT
+ raw_user_interrupts_off();
+#endif
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // no locks for this object
+ // first time to operate on this shared object
+ // create a lock for it
+ // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
+ RuntimeHashadd_I(locktbl, (int)ptr, 1);
+ } else {
+ int rwlock_obj = 0;
+ RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
+ if(-1 != rwlock_obj) {
+ rwlock_obj++;
+ RuntimeHashremovekey(locktbl, (int)ptr);
+ RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
+ } else {
+ deny = true;
+ }
+ }
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+ if(lockobj == (int)ptr) {
+ if(deny) {
+ lockresult = 0;
+ } else {
+ lockresult = 1;
+ }
+ lockflag = true;
+#ifndef INTERRUPT
+ reside = true;
+#endif
+ } else {
+ // conflicts on lockresults
+ raw_test_done(0xa005);
+ }
+ return true;
+ }
+
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ gdn_send(2); // lock request
+ #ifdef RAWDEBUG
+ raw_test_pass(2);
+#endif
+ gdn_send(0); // read lock
+#ifdef RAWDEBUG
+ raw_test_pass(0);
+#endif
+ gdn_send(ptr);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(ptr);
+#endif
+ gdn_send(corenum);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(corenum);
+ raw_test_pass(0xffff);
+#endif
+ return true;
#elif defined THREADSIMULATE
int numofcore = pthread_getspecific(key);
int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
printf("[getreadlock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
- if(EBUSY == rc) {
+ if(0 != rc) {
return false;
}
if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
rc = pthread_rwlock_trywrlock(&rwlock_tbl);
printf("[getreadlock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
- if(EBUSY == rc) {
+ if(0 != rc) {
+ RUNFREE(rwlock);
return false;
} else {
- RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // check again
+ RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
+ } else {
+ RUNFREE(rwlock);
+ RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
+ }
rc = pthread_rwlock_unlock(&rwlock_tbl);
- printf("[getreadlock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
+ printf("[getreadlock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
}
- //rc = pthread_rwlock_rdlock(&rwlock);
rc = pthread_rwlock_tryrdlock(rwlock);
printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
- if(EBUSY == rc) {
+ if(0 != rc) {
return false;
} else {
return true;
RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
rc = pthread_rwlock_unlock(&rwlock_tbl);
printf("[getreadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
- //int rc_obj = pthread_rwlock_rdlock(&rwlock_obj);
int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
- if(EBUSY == rc_obj) {
+ if(0 != rc_obj) {
return false;
} else {
return true;
void releasereadlock(void * ptr) {
#ifdef RAW
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ int targetcore = ((int)ptr >> 5) % TOTALCORE;
+ // for 32 bit machine, the size is always 3 words
+ //int msgsize = sizeof(int) * 3;
+ int msgsize = 3;
+
+ if(targetcore == corenum) {
+#ifdef INTERRUPT
+ raw_user_interrupts_off();
+#endif
+ // reside on this core
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // no locks for this object, something is wrong
+ raw_test_done(0xa006);
+ } else {
+ int rwlock_obj = 0;
+ RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
+ rwlock_obj--;
+ RuntimeHashremovekey(locktbl, (int)ptr);
+ RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
+ }
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+ return;
+ }
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ gdn_send(5); // lock release
+#ifdef RAWDEBUG
+ raw_test_pass(5);
+#endif
+ gdn_send(0); // read lock
+#ifdef RAWDEBUG
+ raw_test_pass(0);
+#endif
+ gdn_send(ptr);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(ptr);
+ raw_test_pass(0xffff);
+#endif
#elif defined THREADSIMULATE
int numofcore = pthread_getspecific(key);
int rc = pthread_rwlock_rdlock(&rwlock_tbl);
#endif
}
+#ifdef RAW
+bool getreadlock_I(void * ptr) {
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ int targetcore = ((int)ptr >> 5) % TOTALCORE;
+ // for 32 bit machine, the size is always 4 words
+ //int msgsize = sizeof(int) * 4;
+ int msgsize = 4;
+
+ lockobj = (int)ptr;
+ lockflag = false;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+ lockresult = 0;
+
+ if(targetcore == corenum) {
+ // reside on this core
+ bool deny = false;
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // no locks for this object
+ // first time to operate on this shared object
+ // create a lock for it
+ // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
+ RuntimeHashadd_I(locktbl, (int)ptr, 1);
+ } else {
+ int rwlock_obj = 0;
+ RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
+ if(-1 != rwlock_obj) {
+ rwlock_obj++;
+ RuntimeHashremovekey(locktbl, (int)ptr);
+ RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
+ } else {
+ deny = true;
+ }
+ }
+ if(lockobj == (int)ptr) {
+ if(deny) {
+ lockresult = 0;
+ } else {
+ lockresult = 1;
+ }
+ lockflag = true;
+#ifndef INTERRUPT
+ reside = true;
+#endif
+ } else {
+ // conflicts on lockresults
+ raw_test_done(0xa005);
+ }
+ return true;
+ }
+
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ gdn_send(2); // lock request
+#ifdef RAWDEBUG
+ raw_test_pass(2);
+#endif
+ gdn_send(0); // read lock
+#ifdef RAWDEBUG
+ raw_test_pass(0);
+#endif
+ gdn_send(ptr);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(ptr);
+#endif
+ gdn_send(corenum);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(corenum);
+ raw_test_pass(0xffff);
+#endif
+ return true;
+}
+
+void releasereadlock_I(void * ptr) {
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ int targetcore = ((int)ptr >> 5) % TOTALCORE;
+ // for 32 bit machine, the size is always 3 words
+ //int msgsize = sizeof(int) * 3;
+ int msgsize = 3;
+
+ if(targetcore == corenum) {
+ // reside on this core
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // no locks for this object, something is wrong
+ raw_test_done(0xa006);
+ } else {
+ int rwlock_obj = 0;
+ RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
+ rwlock_obj--;
+ RuntimeHashremovekey(locktbl, (int)ptr);
+ RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
+ }
+ return;
+ }
+
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ gdn_send(5); // lock release
+#ifdef RAWDEBUG
+ raw_test_pass(5);
+#endif
+ gdn_send(0); // read lock
+#ifdef RAWDEBUG
+ raw_test_pass(0);
+#endif
+ gdn_send(ptr);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(ptr);
+ raw_test_pass(0xffff);
+#endif
+}
+#endif
+
+// not reentrant
bool getwritelock(void * ptr) {
#ifdef RAW
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ int targetcore = ((int)ptr >> 5) % TOTALCORE;
+ // for 32 bit machine, the size is always 4 words
+ //int msgsize = sizeof(int) * 4;
+ int msgsize= 4;
+ int tc = TOTALCORE;
+#ifdef INTERRUPT
+ //raw_user_interrupts_off();
+#endif
+ //targetcore = ((int)ptr) % tc;
+#ifdef INTERRUPT
+ //raw_user_interrupts_on();
+#endif
+
+#ifdef RAWDEBUG
+ raw_test_pass(0xe551);
+ raw_test_pass_reg(ptr);
+ raw_test_pass_reg(targetcore);
+ raw_test_pass_reg(tc);
+#endif
+
+ lockobj = (int)ptr;
+ lockflag = false;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+ lockresult = 0;
+
+ if(targetcore == corenum) {
+ // reside on this core
+ bool deny = false;
+#ifdef INTERRUPT
+ raw_user_interrupts_off();
+#endif
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // no locks for this object
+ // first time to operate on this shared object
+ // create a lock for it
+ // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
+#ifdef RAWDEBUG
+ raw_test_pass(0xe552);
+#endif
+ RuntimeHashadd_I(locktbl, (int)ptr, -1);
+ } else {
+ int rwlock_obj = 0;
+ RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
+#ifdef RAWDEBUG
+ raw_test_pass(0xe553);
+ raw_test_pass_reg(rwlock_obj);
+#endif
+ if(0 == rwlock_obj) {
+ rwlock_obj = -1;
+ RuntimeHashremovekey(locktbl, (int)ptr);
+ RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
+ } else {
+ deny = true;
+ }
+ }
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+#ifdef RAWDEBUG
+ raw_test_pass(0xe554);
+ raw_test_pass_reg(lockresult);
+#endif
+ if(lockobj == (int)ptr) {
+ if(deny) {
+ lockresult = 0;
+#ifdef RAWDEBUG
+ raw_test_pass(0);
+#endif
+ } else {
+ lockresult = 1;
+#ifdef RAWDEBUG
+ raw_test_pass(1);
+#endif
+ }
+ lockflag = true;
+#ifndef INTERRUPT
+ reside = true;
+#endif
+ } else {
+ // conflicts on lockresults
+ raw_test_done(0xa007);
+ }
+ return true;
+ }
+#ifdef RAWDEBUG
+ raw_test_pass(0xe555);
+#endif
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore); // targetcore
+#endif
+ gdn_send(2); // lock request
+#ifdef RAWDEBUG
+ raw_test_pass(2);
+#endif
+ gdn_send(1); // write lock
+#ifdef RAWDEBUG
+ raw_test_pass(1);
+#endif
+ gdn_send(ptr);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(ptr);
+#endif
+ gdn_send(corenum);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(corenum);
+ raw_test_pass(0xffff);
+#endif
+ return true;
#elif defined THREADSIMULATE
int numofcore = pthread_getspecific(key);
int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
printf("[getwritelock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
- if(EBUSY == rc) {
+ if(0 != rc) {
return false;
}
if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
rc = pthread_rwlock_trywrlock(&rwlock_tbl);
printf("[getwritelock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
- if(EBUSY == rc) {
+ if(0 != rc) {
+ pthread_rwlock_destroy(rwlock);
+ RUNFREE(rwlock);
return false;
} else {
- RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // check again
+ RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
+ } else {
+ pthread_rwlock_destroy(rwlock);
+ RUNFREE(rwlock);
+ RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
+ }
rc = pthread_rwlock_unlock(&rwlock_tbl);
printf("[getwritelock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
}
- //rc = pthread_rwlock_wrlock(rwlock);
rc = pthread_rwlock_trywrlock(rwlock);
printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
- if(EBUSY == rc) {
+ if(0 != rc) {
return false;
} else {
return true;
RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
rc = pthread_rwlock_unlock(&rwlock_tbl);
printf("[getwritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
- //int rc_obj = pthread_rwlock_wrlock(rwlock_obj);
int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
- if(EBUSY == rc_obj) {
+ if(0 != rc_obj) {
return false;
} else {
return true;
void releasewritelock(void * ptr) {
#ifdef RAW
+ unsigned msgHdr;
+ int self_y, self_x, target_y, target_x;
+ int targetcore = ((int)ptr >> 5) % TOTALCORE;
+ // for 32 bit machine, the size is always 3 words
+ //int msgsize = sizeof(int) * 3;
+ int msgsize = 3;
+
+ if(targetcore == corenum) {
+#ifdef INTERRUPT
+ raw_user_interrupts_off();
+#endif
+ // reside on this core
+ if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
+ // no locks for this object, something is wrong
+ raw_test_done(0xa008);
+ } else {
+ int rwlock_obj = 0;
+#ifdef RAWDEBUG
+ raw_test_pass(0xe662);
+#endif
+ RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(rwlock_obj);
+#endif
+ rwlock_obj++;
+ RuntimeHashremovekey(locktbl, (int)ptr);
+ RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(rwlock_obj);
+#endif
+ }
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+ return;
+ }
+#ifdef RAWDEBUG
+ raw_test_pass(0xe663);
+#endif
+ calCoords(corenum, &self_y, &self_x);
+ calCoords(targetcore, &target_y, &target_x);
+ // Build the message header
+ msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
+ self_y, self_x,
+ target_y, target_x);
+ gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
+#ifdef RAWDEBUG
+ raw_test_pass(0xbbbb);
+ raw_test_pass(0xb000 + targetcore);
+#endif
+ gdn_send(5); // lock release
+ #ifdef RAWDEBUG
+ raw_test_pass(5);
+#endif
+ gdn_send(1); // write lock
+#ifdef RAWDEBUG
+ raw_test_pass(1);
+#endif
+ gdn_send(ptr);
+#ifdef RAWDEBUG
+ raw_test_pass_reg(ptr);
+ raw_test_pass(0xffff);
+#endif
#elif defined THREADSIMULATE
int numofcore = pthread_getspecific(key);
int rc = pthread_rwlock_rdlock(&rwlock_tbl);
tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
}
/* Enqueue task */
- if ((!gencontains(failedtasks, tpd)&&!gencontains(activetasks,tpd))) {
+ if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
genputtable(activetasks, tpd, tpd);
} else {
RUNFREE(tpd->parameterArray);
}
return retval;
}
+
+#ifdef RAW
+int enqueuetasks_I(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
+ void * taskpointerarray[MAXTASKPARAMS];
+ int j;
+ int numparams=parameter->task->numParameters;
+ int numiterators=parameter->task->numTotal-1;
+ int retval=1;
+ int addnormal=1;
+ int adderror=1;
+
+ struct taskdescriptor * task=parameter->task;
+
+ ObjectHashadd_I(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
+ /* Add enqueued object to parameter vector */
+ taskpointerarray[parameter->slot]=ptr;
+
+ /* Reset iterators */
+ for(j=0;j<numiterators;j++) {
+ toiReset(¶meter->iterators[j]);
+ }
+
+ /* Find initial state */
+ for(j=0;j<numiterators;j++) {
+ backtrackinit:
+ if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
+ toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
+ else if (j>0) {
+ /* Need to backtrack */
+ toiReset(¶meter->iterators[j]);
+ j--;
+ goto backtrackinit;
+ } else {
+ /* Nothing to enqueue */
+ return retval;
+ }
+ }
+
+ while(1) {
+ /* Enqueue current state */
+ int launch = 0;
+ struct taskparamdescriptor *tpd=RUNMALLOC_I(sizeof(struct taskparamdescriptor));
+ tpd->task=task;
+ tpd->numParameters=numiterators+1;
+ tpd->parameterArray=RUNMALLOC_I(sizeof(void *)*(numiterators+1));
+ for(j=0;j<=numiterators;j++){
+ tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
+ }
+ /* Enqueue task */
+ if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
+ genputtable_I(activetasks, tpd, tpd);
+ } else {
+ RUNFREE(tpd->parameterArray);
+ RUNFREE(tpd);
+ }
+
+ /* This loop iterates to the next parameter combination */
+ if (numiterators==0)
+ return retval;
+
+ for(j=numiterators-1; j<numiterators;j++) {
+ backtrackinc:
+ if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
+ toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
+ else if (j>0) {
+ /* Need to backtrack */
+ toiReset(¶meter->iterators[j]);
+ j--;
+ goto backtrackinc;
+ } else {
+ /* Nothing more to enqueue */
+ return retval;
+ }
+ }
+ }
+ return retval;
+}
+#endif
+
/* Handler for signals. The signals catch null pointer errors and
arithmatic errors. */
-
+#ifndef RAW
void myhandler(int sig, siginfo_t *info, void *uap) {
sigset_t toclear;
#ifdef DEBUG
sigprocmask(SIG_UNBLOCK, &toclear,NULL);
longjmp(error_handler,1);
}
+#endif
fd_set readfds;
int maxreadfd;
void executetasks() {
void * taskpointerarray[MAXTASKPARAMS+OFFSET];
+ int numparams=0;
+ int numtotal=0;
+ struct ___Object___ * tmpparam = NULL;
+ struct parameterdescriptor * pd=NULL;
+ struct parameterwrapper *pw=NULL;
+ int j = 0;
+ int x = 0;
+ bool lock = true;
+
+#ifdef RAW
+ int grount = 0;
+ int andmask=0;
+ int checkmask=0;
+#ifdef RAWDEBUG
+ raw_test_pass(0xe991);
+#endif
+#endif
+#ifndef RAW
/* Set up signal handlers */
struct sigaction sig;
sig.sa_sigaction=&myhandler;
sigaction(SIGSEGV,&sig,0);
sigaction(SIGFPE,&sig,0);
sigaction(SIGPIPE,&sig,0);
+#endif
+#ifndef RAW
/* Zero fd set */
FD_ZERO(&readfds);
+#endif
maxreadfd=0;
+#ifndef RAW
fdtoobject=allocateRuntimeHash(100);
+#endif
+#ifndef RAW
/* Map first block of memory to protected, anonymous page */
mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
+#endif
newtask:
while((hashsize(activetasks)>0)||(maxreadfd>0)) {
+#ifdef RAW
+#ifdef RAWDEBUG
+ raw_test_pass(0xe992);
+#endif
+#else
/* Check if any filedescriptors have IO pending */
if (maxreadfd>0) {
int i;
}
}
}
+#endif
/* See if there are any active tasks */
if (hashsize(activetasks)>0) {
genfreekey(activetasks, currtpd);
/* Check if this task has failed, allow a task that contains optional objects to fire */
- if (gencontains(failedtasks, currtpd)) {
+ /*if (gencontains(failedtasks, currtpd)) {
// Free up task parameter descriptor
RUNFREE(currtpd->parameterArray);
RUNFREE(currtpd);
goto newtask;
- }
- int numparams=currtpd->task->numParameters;
- int numtotal=currtpd->task->numTotal;
+ }*/
+ numparams=currtpd->task->numParameters;
+ numtotal=currtpd->task->numTotal;
+#ifdef THREADSIMULATE
int isolateflags[numparams];
+#endif
/* Make sure that the parameters are still in the queues */
for(i=0;i<numparams;i++) {
void * parameter=currtpd->parameterArray[i];
- struct ___Object___ * tmpparam = (struct ___Object___ *)parameter;
+#ifdef RAW
+#ifdef RAWDEBUG
+ raw_test_pass(0xe993);
+#endif
+
+ if(((struct ___Object___ *)parameter)->type == STARTUPTYPE) {
+ lock = false;
+ taskpointerarray[i+OFFSET]=parameter;
+ goto execute;
+ }
+ lock = true;
+ // require locks for this parameter if it is not a startup object
+ getwritelock(parameter);
+ grount = 0;
+
+#ifdef INTERRUPT
+ raw_user_interrupts_off();
+#endif
+ while(!lockflag) {
+ receiveObject();
+ }
+#ifndef INTERRUPT
+ if(reside) {
+ while(receiveObject() != -1) {
+ }
+ }
+#endif
+ grount = lockresult;
+
+ lockresult = 0;
+ lockobj = 0;
+ lockflag = false;
+#ifndef INTERRUPT
+ reside = false;
+#endif
+#ifdef INTERRUPT
+ raw_user_interrupts_on();
+#endif
+
+ if(grount == 0) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xe994);
+#endif
+ // can not get the lock, try later
+ for(j = 0; j < i; ++j) {
+ releasewritelock(taskpointerarray[j+OFFSET]);
+ }
+ genputtable(activetasks, currtpd, currtpd);
+ if(hashsize(activetasks) == 1) {
+ // only one task right now, wait a little while before next try
+ int halt = 10000;
+ while(halt--){}
+ }
+ goto newtask;
+ }
+ // flush the object
+ {
+ raw_invalidate_cache_range((int)parameter, classsize[((struct ___Object___ *)parameter)->type]);
+ /*int tmp = 0;
+ for(tmp = 0; tmp < classsize[((struct ___Object___ *)parameter)->type]; ++tmp) {
+ invalidateAddr(parameter + tmp);
+ }*/
+ }
+#endif
+ tmpparam = (struct ___Object___ *)parameter;
+#ifdef THREADSIMULATE
+ if(((struct ___Object___ *)parameter)->type == STARTUPTYPE) {
+ lock = false;
+ taskpointerarray[i+OFFSET]=parameter;
+ goto execute;
+ }
+ lock = true;
if(0 == tmpparam->isolate) {
isolateflags[i] = 0;
// shared object, need to flush with current value
int j = 0;
for(j = 0; j < i; ++j) {
if(0 == isolateflags[j]) {
- releasewritelock(taskpointerarray[j]);
+ releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
}
}
genputtable(activetasks, currtpd, currtpd);
goto newtask;
}
if(tmpparam->version != tmpparam->original->version) {
+ // some task on another core has changed this object
// flush this object
- memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
- //releasereadlock(tmpparam->original);
- // fail to get write lock, release all obtained locks and try this task later
+ //memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
+ // release all obtained locks
int j = 0;
for(j = 0; j < i; ++j) {
if(0 == isolateflags[j]) {
}
releasewritelock(tmpparam->original);
- // some task on another core has changed this object
- // Free up task parameter descriptor
- RUNFREE(currtpd->parameterArray);
- RUNFREE(currtpd);
// dequeue this object
-#ifdef THREADSIMULATE
int numofcore = pthread_getspecific(key);
struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
int length = numqueues[numofcore][tmpparam->type];
-#else
- struct parameterwrapper ** queues = objectqueues[corenum][tmpparam->type];
- int length = numqueues[corenum][tmpparam->type];
-#endif
for(j = 0; j < length; ++j) {
struct parameterwrapper * pw = queues[j];
if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
}
}
// try to enqueue it again to check if it feeds other tasks;
- enqueueObject(tmpparam, NULL, 0);
+ //enqueueObject(tmpparam, NULL, 0);
+ // Free up task parameter descriptor
+ RUNFREE(currtpd->parameterArray);
+ RUNFREE(currtpd);
goto newtask;
}
- //releasereadlock(tmpparam->original);
} else {
isolateflags[i] = 1;
}
- struct parameterdescriptor * pd=currtpd->task->descriptorarray[i];
- struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
- int j;
+#endif
+ pd=currtpd->task->descriptorarray[i];
+ pw=(struct parameterwrapper *) pd->queue;
/* Check that object is still in queue */
{
if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xe995);
+#endif
+ // release grabbed locks
+ for(j = 0; j < i; ++j) {
+ releasewritelock(taskpointerarray[j+OFFSET]);
+ }
+ releasewritelock(parameter);
RUNFREE(currtpd->parameterArray);
RUNFREE(currtpd);
goto newtask;
}
}
+#ifdef RAW
+ /* Check if the object's flags still meets requirements */
+ {
+ int tmpi = 0;
+ bool ismet = false;
+ for(tmpi = 0; tmpi < pw->numberofterms; ++tmpi) {
+ andmask=pw->intarray[tmpi*2];
+ checkmask=pw->intarray[tmpi*2+1];
+#ifdef RAWDEBUG
+ raw_test_pass(0xdd000000 + andmask);
+ raw_test_pass_reg((int)parameter);
+ raw_test_pass(0xdd000000 + ((struct ___Object___ *)parameter)->flag);
+ raw_test_pass(0xdd000000 + checkmask);
+#endif
+ if((((struct ___Object___ *)parameter)->flag&andmask)==checkmask) {
+ ismet = true;
+ break;
+ }
+ }
+ if (!ismet) {
+ // flags are never suitable
+ // remove this obj from the queue
+ int next;
+ int UNUSED, UNUSED2;
+ int * enterflags;
+#ifdef RAWDEBUG
+ raw_test_pass(0xe996);
+#endif
+ ObjectHashget(pw->objectset, (int) parameter, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
+ ObjectHashremove(pw->objectset, (int)parameter);
+ if (enterflags!=NULL)
+ free(enterflags);
+ // release grabbed locks
+ for(j = 0; j < i; ++j) {
+ releasewritelock(taskpointerarray[j+OFFSET]);
+ }
+ releasewritelock(parameter);
+ RUNFREE(currtpd->parameterArray);
+ RUNFREE(currtpd);
+ goto newtask;
+ }
+ }
+#endif
parameterpresent:
;
/* Check that object still has necessary tags */
int slotid=pd->tagarray[2*j]+numparams;
struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
if (!containstag(parameter, tagd)) {
+#ifdef RAWDEBUG
+ raw_test_pass(0xe997);
+#endif
RUNFREE(currtpd->parameterArray);
RUNFREE(currtpd);
goto newtask;
taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
}
+#ifdef THREADSIMULATE
for(i = 0; i < numparams; ++i) {
if(0 == isolateflags[i]) {
struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
- // shared object, need to replace this copy with original one
- /*if(!getwritelock(tmpparam->original)) {
- // fail to get write lock, release all obtained locks and try this task later
- int j = 0;
- for(j = 0; j < i; ++j) {
- if(0 == isolateflags[j]) {
- releasewritelock(taskpointerarray[j]);
- }
- }
- genputtable(activetasks, tpd, tpd);
- goto newtask;
- }*/
if(tmpparam != tmpparam->original) {
taskpointerarray[i+OFFSET] = tmpparam->original;
}
}
}
+#endif
{
+#if 0
+#ifndef RAW
/* Checkpoint the state */
forward=allocateRuntimeHash(100);
reverse=allocateRuntimeHash(100);
//void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
- int x;
+#endif
+#endif
if (x=setjmp(error_handler)) {
int counter;
/* Recover */
-
+#ifndef RAW
#ifdef DEBUG
printf("Fatal Error=%d, Recovering!\n",x);
+#endif
#endif
/*
genputtable(failedtasks,currtpd,currtpd);
forward=NULL;
reverse=NULL;
*/
- fflush(stdout);
+ //fflush(stdout);
+#ifdef RAW
+#ifdef RAWDEBUG
+ raw_test_pass_reg(x);
+#endif
+ raw_test_done(0xa009);
+#else
exit(-1);
+#endif
} else {
/*if (injectfailures) {
if ((((double)random())/RAND_MAX)<failurechance) {
((int *)taskpointerarray)[0]=currtpd->numParameters;
taskpointerarray[1]=NULL;
#endif
+execute:
if(debugtask){
+#ifndef RAW
printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
+#endif
((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
+#ifndef RAW
printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
- } else
+#endif
+ } else {
((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
+ }
+#ifdef RAWDEBUG
+ raw_test_pass(0xe998);
+ raw_test_pass_reg(lock);
+ #endif
+ if(lock) {
+#ifdef RAW
+ for(i = 0; i < numparams; ++i) {
+ int j = 0;
+ struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
+#ifdef RAWDEBUG
+ raw_test_pass(0xe999);
+ raw_test_pass(0xdd100000 + tmpparam->flag);
+#endif
+ releasewritelock(tmpparam);
+ }
+#elif defined THREADSIMULATE
for(i = 0; i < numparams; ++i) {
if(0 == isolateflags[i]) {
struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
releasewritelock(tmpparam);
}
}
+#endif
+ }
+#if 0
+#ifndef RAW
freeRuntimeHash(forward);
freeRuntimeHash(reverse);
+#endif
+#endif
freemalloc();
// Free up task parameter descriptor
RUNFREE(currtpd->parameterArray);
RUNFREE(currtpd);
+#if 0
+#ifndef RAW
forward=NULL;
reverse=NULL;
+#endif
+#endif
+#ifdef RAWDEBUG
+ raw_test_pass(0xe99a);
+ raw_test_pass_reg(lock);
+ #endif
+
}
}
}
}
+#ifdef RAWDEBUG
+ raw_test_pass(0xe999);
+#endif
}
/* This function processes an objects tags */
for(i=0;i<numtasks[numofcore];i++) {
struct taskdescriptor * task=taskarray[numofcore][i];
#else
+#ifdef RAW
+ if(corenum > NUMCORES - 1) {
+ return;
+ }
+#endif
for(i=0;i<numtasks[corenum];i++) {
struct taskdescriptor * task=taskarray[corenum][i];
#endif
+#ifndef RAW
printf("%s\n", task->name);
+#endif
for(j=0;j<task->numParameters;j++) {
struct parameterdescriptor *param=task->descriptorarray[j];
struct parameterwrapper *parameter=param->queue;
struct ObjectHash * set=parameter->objectset;
struct ObjectIterator objit;
+#ifndef RAW
printf(" Parameter %d\n", j);
+#endif
ObjectHashiterator(set, &objit);
while(ObjhasNext(&objit)) {
struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
int numflags=Objdata3(&objit);
int flags=Objdata2(&objit);
Objnext(&objit);
+#ifndef RAW
printf(" Contains %lx\n", obj);
printf(" flag=%d\n", obj->flag);
+#endif
if (tagptr==NULL) {
} else if (tagptr->type==TAGTYPE) {
+#ifndef RAW
printf(" tag=%lx\n",tagptr);
+#endif
+ ;
} else {
int tagindex=0;
struct ArrayObject *ao=(struct ArrayObject *)tagptr;
for(;tagindex<ao->___cachedCode___;tagindex++) {
+#ifndef RAW
printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
+#endif
}
}
}
void processtasks() {
int i;
+#ifdef RAW
+ if(corenum > NUMCORES - 1) {
+ return;
+ }
+#endif
#ifdef THREADSIMULATE
int numofcore = pthread_getspecific(key);
for(i=0;i<numtasks[numofcore];i++) {
struct parameterwrapper *parameter=param->queue;
builditerators(task, j, parameter);
}
- }
+ }
}
void toiReset(struct tagobjectiterator * it) {