From: jzhou Date: Fri, 7 Aug 2009 00:13:05 +0000 (+0000) Subject: finish most parts, need to fix large objs handling and memory allocation(add free... X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=ed2279d8b9680c1b7fb8df71fa03e22a79c48755;p=IRC.git finish most parts, need to fix large objs handling and memory allocation(add free memory space list) --- diff --git a/Robust/src/Runtime/RAW/task_arch.c b/Robust/src/Runtime/RAW/task_arch.c index cb0c1e18..7751ece9 100644 --- a/Robust/src/Runtime/RAW/task_arch.c +++ b/Robust/src/Runtime/RAW/task_arch.c @@ -3,14 +3,6 @@ #include "multicoreruntime.h" #include "runtime_arch.h" -// data structures for locking -struct RuntimeHash locktable; -static struct RuntimeHash* locktbl = &locktable; -struct LockValue { - int redirectlock; - int value; -}; - __attribute__((always_inline)) inline void initialization() { } // initialization() diff --git a/Robust/src/Runtime/multicoregarbage.c b/Robust/src/Runtime/multicoregarbage.c index 5c4362f0..e83199c4 100644 --- a/Robust/src/Runtime/multicoregarbage.c +++ b/Robust/src/Runtime/multicoregarbage.c @@ -196,51 +196,7 @@ void transferCompactStart(int core) { // end of sending this msg, set sand msg flag false isMsgSending = false; - ++(self_numsendobjs); - // check if there are pending msgs - while(isMsgHanging) { - // get the msg from outmsgdata[] - // length + target + msg - outmsgleft = outmsgdata[outmsgindex]; - outmsgindex = (outmsgindex + 1) % BAMBOO_OUT_BUF_LENGTH; - int target = outmsgdata[outmsgindex]; - outmsgindex = (outmsgindex + 1) % BAMBOO_OUT_BUF_LENGTH; - // mark to start sending the msg - isMsgSending = true; - // Build the message header - msgHdr = tmc_udn_header_from_cpu(target); - __tmc_udn_send_header_with_size_and_tag(msgHdr, outmsgleft, UDN0_DEMUX_TAG); // send header -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xbbbb); - BAMBOO_DEBUGPRINT(0xb000 + target); // targetcore -#endif - while(outmsgleft-- > 0) { - udn_send(outmsgdata[outmsgindex]); -#ifdef DEBUG - BAMBOO_DEBUGPRINT_REG(outmsgdata[outmsgindex]); -#endif - outmsgindex = (outmsgindex + 1) % BAMBOO_OUT_BUF_LENGTH; - } -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xffff); -#endif - // mark to end sending the msg - isMsgSending = false; - BAMBOO_START_CRITICAL_SECTION_MSG(); -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf001); -#endif - // check if there are still msg hanging - if(outmsgindex == outmsglast) { - // no more msgs - outmsgindex = outmsglast = 0; - isMsgHanging = false; - } - BAMBOO_CLOSE_CRITICAL_SECTION_MSG(); -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000); -#endif - } + send_hanging_msg(); } void checkMarkStatue() { @@ -336,7 +292,7 @@ bool preGC() { } // compute load balance for all cores -int loadbalance() { +void loadbalance() { // compute load balance // initialize the deltas int i; @@ -349,6 +305,8 @@ int loadbalance() { gcdeltal[i] = gcdeltar[i] = 0; gcreloads[i] = gcloads[i]; } + + // iteratively balance the loads do { stop = true; delta = deltanew; @@ -385,10 +343,11 @@ int loadbalance() { } } } while(!stop); + + // decide how to do load balance for(i = 0; i < NUMCORES; i++) { gcdeltal[i] = gcdeltar[i] = 0; } - // decide how to do load balance for(i = 0; i < NUMCORES; i++) { int tomove = (gcloads[i] - gcreloads[i]); if(tomove > 0) { @@ -424,23 +383,6 @@ int loadbalance() { } } } - - // compute heap top after load balancing - int heaptop = 0; - int localheaptop = 0; - int numblocks = 0; - INTPTR baseptr = 0; - int offset = 0; - for(i = 0; i < NUMCORES; ++i) { - NUMBLOCKS(gcreloads[i], &numblocks); - BASEPTR(i, numblocks, &baseptr); - OFFSET(gcreloads[i], &offset); - localheaptop = baseptr + offset; - if(localheaptop > heaptop) { - heaptop = localheaptop; - } - } - return heaptop; } void gc(struct garbagelist * stackptr) { @@ -457,6 +399,7 @@ void gc(struct garbagelist * stackptr) { return; } + gcprocessing = true; int i = 0; gcwaitconfirm = false; gcwaitconfirm = 0; @@ -484,7 +427,7 @@ void gc(struct garbagelist * stackptr) { send_msg_1(i, GCLOBJREQUEST); } while(numconfirm != 0) {} // wait for responses - int heaptop = loadbalance(); + loadbalance(); // TODO need to decide where to put large objects // TODO cache all large objects @@ -541,11 +484,22 @@ void gc(struct garbagelist * stackptr) { // send gc finish messages to all cores send_msg_1(i, GCFINISH); } + + // need to create free memory list and invalidate all shared mem pointers TODO + + gcflag = false; + gcprocessing = false; return; } else { + gcprocessing = true; gc_collect(stackptr); } + // invalidate all shared mem pointers + bamboo_cur_msp = NULL; + bamboo_smem_size = 0; gcflag = false; + gcprocessing = false; + } // enqueue root objs @@ -705,186 +659,249 @@ void mark(bool isfirst, struct garbagelist * stackptr) { } // while(MARKPHASE == gcphase) } // mark() -void compact() { - if(COMPACTPHASE != gcphase) { - BAMBOO_EXIT(0xb003); +struct moveHelper { + int numblocks; // block num for heap + INTPTR base; // real base virtual address of current heap block + INTPTR ptr; // real virtual address of current heap top + int offset; // offset in current heap block + int blockbase; // real virtual address of current small block to check with + int blockbound; // real bound virtual address of current small blcok to check + int top; // real size of current heap block to check + int bound; // real bound virtual address of current heap block to check +}; + +void nextSBlock(struct moveHelper * orig) { + orig->blockbase = orig->blockbound; + if(orig->blockbase == orig->bound) { + // end of current heap block, jump to next one + orig->numblocks++; + BASEPTR(BAMBOO_NUM_OF_CORE, orig->numblocks, &(orig->base)); + orig->bound = orig->base + BAMBOO_SMEM_SIZE; + orig->blockbase = orig->base; } + orig->blockbound = orig->blockbase + *((int*)(orig->blockbase)); + orig->offset = BAMBOO_CACHE_LINE_SIZE; + orig->ptr = orig->blockbase + orig->offset; +} - int numblocks = 0; // block num for dst heap for move - INTPTR curr_heapbase = 0; // real base virtual address of current heap block - INTPTR curr_heapptr = 0; // real virtual address of current heap top - int curr_offset = 0; // offset in current heap block - INTPTR orig_ptr; // real virtual address of obj to move - int curr_blockbase = 0; // real virtual address of current small block to check with - int curr_blockbound = 0; // real bound virtual address of current small blcok to check - int curr_base = 0; // real base virtual address of current heap block to check - int curr_bound = 0; // real bound virtual address of current heap block to check - int numblocks1 = 0; // block num for orig heap for move - curr_heaptop = curr_offset = BAMBOO_CACHE_LINE_SIZE; // logic heap top - curr_heapbound = BAMBOO_SMEM_SIZE_L; // logic heap bound - BASEPTR(BAMBOO_NUM_OF_CORE, numblocks, &curr_heapbase); - curr_heapptr = orig_ptr = curr_heapbase + curr_offset; - curr_base = curr_heapbase; - curr_bound = curr_heapbound; - curr_blockbase = curr_heapbase; - curr_blockbound = curr_blockbase + *((int*)curr_blockbase); +void nextBlock(struct moveHelper * to) { + to->top = to->bound + BAMBOO_CACHE_LINE_SIZE; // header! + to->bound += BAMBOO_SMEM_SIZE; + to->numblocks++; + BASEPTR(BAMBOO_NUM_OF_CORE, to->numblocks, &(to->base)); + to->offset = BAMBOO_CACHE_LINE_SIZE; + to->ptr = to->base + to->offset; +} - // scan over all objs in this block, compact those scheduled to - // reside on this core +// endaddr does not contain spaces for headers +bool moveobj(struct moveHelper * orig, struct moveHelper * to, INTPTR * endaddr) { int type = 0; int size = 0; int mark = 0; int isize = 0; - // loop stop when finishing either scanning all active objs or moving - // all objs to reside on this core - do { -innercompact: - // TODO all objs are aligned, how to filter out the paddings? - while((*((int*)orig_ptr)) == -2) { - orig_ptr++; - if(orig_ptr == curr_blockbound) { - curr_blockbase = curr_blockbound; - if(curr_blockbase == curr_bound) { - // end of current heap block, jump to next one - numblocks1++; - BASEPTR(BAMBOO_NUM_OF_CORE, numblocks1, &curr_base); - curr_bound = curr_base + BAMBOO_SMEM_SIZE; - curr_blockbase = curr_base; - } - curr_blockbound = curr_blockbase + *((int*)curr_blockbase); - orig_ptr = curr_blockbase + BAMBOO_CACHE_LINE_SIZE; - goto innercompact; - } +innermoveobj: + while((*((int*)(orig->ptr))) == -2) { + orig->ptr++; + if(orig->ptr == orig->blockbound) { + nextSBlock(orig); + goto innermoveobj; } - // check the obj's type, size and mark flag - type = ((int *)orig_ptr)[0]; - size = 0; - if(type == -1) { - // end of this block, go to next one - curr_blockbase = curr_blockbound; - if(curr_blockbase == curr_bound) { - // end of current heap block, jump to next one - numblocks1++; - BASEPTR(BAMBOO_NUM_OF_CORE, numblocks1, &curr_base); - curr_bound = curr_base + BAMBOO_SMEM_SIZE; - curr_blockbase = curr_base; - } - curr_blockbound = curr_blockbase + *((int*)curr_blockbase); - orig_ptr = curr_blockbase + BAMBOO_CACHE_LINE_SIZE; - continue; - } else if(type < NUMCLASSES) { - // a normal object - size = classsize[type]; - } else { - // an array - struct ArrayObject *ao=(struct ArrayObject *)ptr; - int elementsize=classsize[type]; - int length=ao->___length___; - size=sizeof(struct ArrayObject)+length*elementsize; - } - mark = ((int *)orig_ptr)[6]; - if(mark == 1) { - // marked obj, copy it to current heap top - // check to see if remaining space is enough - ALIGNSIZE(size, &isize); - if((curr_heaptop + isize > cinstruction->loads) - && (cinstruction->movenum != 0)) { - // all objs to reside on this core have been moved - // the remainging objs should be moved to other cores - // STOP the loop - break; - } - if(curr_heaptop + isize > curr_heapbound) { - // fill the header of this block and then go to next block - curr_offset += curr_heapbound - curr_heaptop; - (*((int*)curr_heapbase)) = curr_offset; - curr_heaptop = curr_heapbound + BAMBOO_CACHE_LINE_SIZE; // header! - curr_heapbound += BAMBOO_SMEM_SIZE; - numblocks++; - BASEPTR(BAMBOO_NUM_OF_CORE, numblocks, &curr_heapbase); - curr_offset = BAMBOO_CACHE_LINE_SIZE; - curr_heapptr = curr_heapbase + curr_offset; - } - memcpy(curr_heapptr, orig_ptr, size); - genputtable(pointertbl, orig_ptr, curr_heapptr); // store the mapping infor - curr_heapptr += isize; - curr_offset += iseize; - } - // move to next obj - orig_ptr += size; - if(orig_ptr == curr_blockbound) { - curr_blockbase = curr_blockbound; - if(curr_blockbase == curr_bound) { - // end of current heap block, jump to next one - numblocks1++; - BASEPTR(BAMBOO_NUM_OF_CORE, numblocks1, &curr_base); - curr_bound = curr_base + BAMBOO_SMEM_SIZE; - curr_blockbase = curr_base; - } - curr_blockbound = curr_blockbase + *((int*)curr_blockbase); - orig_ptr = curr_blockbase + BAMBOO_CACHE_LINE_SIZE; + } + // check the obj's type, size and mark flag + type = ((int *)(orig->ptr))[0]; + size = 0; + if(type == -1) { + // end of this block, go to next one + nextSBlock(orig); + goto innermoveobj; + } else if(type < NUMCLASSES) { + // a normal object + size = classsize[type]; + } else { + // an array + struct ArrayObject *ao=(struct ArrayObject *)(orig->ptr); + int elementsize=classsize[type]; + int length=ao->___length___; + size=sizeof(struct ArrayObject)+length*elementsize; + } + mark = ((int *)(orig->ptr))[6]; + if(mark == 1) { + // marked obj, copy it to current heap top + // check to see if remaining space is enough + ALIGNSIZE(size, &isize); + if((endaddr != NULL) && (to->top + isize > *endaddr)) { + // reached the endaddr + // fill offset to the endaddr for later configuration of header + to->offset += *endaddr - to->top; + to->top += *endaddr - to->top; + return true; } - } while(orig_ptr < markedptrbound + 1); - // TODO move objs - - struct markedObjItem * moi = mObjList.head; - bool iscopy = true; - if(moi == NULL) { - if(cinstruction->incomingobjs != NULL) { - for(int j = 0; j < cinstruction->incomingobjs->length; j++) { - // send messages to corresponding cores to start moving - send_msg_2(cinstruction->incomingobjs->dsts[j], GCMOVESTART, - BAMBOO_NUM_OF_CORE); + if(to->top + isize > to->bound) { + // fill the header of this block and then go to next block + to->offset += to->bound - to->top; + (*((int*)(to->base))) = to->offset; + if(endaddr != NULL) { + *endaddr = *endaddr + BAMBOO_CACHE_LINE_SIZE; } + nextBlock(to); } - } else { - int num_dsts = cinstruction->tomoveobjs->length; - while(num_dsts > 0) { - while(!gctomove) {} - // start moving objects to other cores - gctomove = 0; - while(!isEmpty(gcdsts)) { - int dst = (int)(getItem(gcdsts)); + memcpy(to->ptr, orig->ptr, size); + RuntimeHashadd(pointertbl, orig->ptr, to->ptr); // store the mapping info + to->ptr += isize; + to->offset += isize; + to->top += isize; + } + // move to next obj + orig->ptr += size; + if(orig->ptr == orig->blockbound) { + nextSBlock(orig); + } + return false; +} + +void migrateobjs(struct moveHelper * orig) { + int num_dsts = cinstruction->movenum; + while(num_dsts > 0) { + while(!gctomove) {} + // start moving objects to other cores + gctomove = false; + struct moveHelper * into = (struct moveHelper *)RUNMALLOC(sizeof(struct moveHelper)); + for(int j = 0; j < cinstruction->movenum; j++) { + if(cinstruction->moveflag[j] == 1) { + // can start moving to corresponding core + int dst = cinstruction->dsts[j]; num_dsts--; - int j = 0; - for(j = 0; j < cinstruction->tomoveobjs->length; j++) { - if(dst == cinstruction->tomoveobjs->dsts[j]) { + into->ptr = cinstruction->startaddrs[j]; + BLOCKINDEX(into->ptr, &(into->numblocks)); + into->bound = + (into->numblocks==0)?BAMBOO_SMEM_SIZE_L:BAMBOO_SMEM_SIZE_L+BAMBOO_SMEM_SIZE*into->numblocks; + BASEPTR(BAMBOO_NUM_OF_CORE, into->numblocks, &(into->base)); + into->offset = into->ptr - into->base; + into->top = (into->numblocks==0)?(into->offset):(into->bound-BAMBOO_SMEM_SIZE+into->offset); + into->base = into->ptr; + into->offset = BAMBOO_CACHE_LINE_SIZE; + into->ptr += into->offset; // for header + into->top += into->offset; + int endaddr = into->top + cinstruction->endaddrs[j]; + do { + bool stop = moveobj(orig, into, &endaddr); + if(stop) { + // all objs before endaddr have been moved + // STOP the loop break; - } + } + } while(orig->ptr < markedptrbound + 1); + cinstruction->moveflag[j] = 2; // set the flag indicating move finished + // fill the header of this blockk + (*((int*)(into->base))) = into->offset; + } // if(cinstruction->moveflag[j] == 1) + } // for(int j = 0; j < cinstruction->movenum; j++) + RUNFREE(into); + } // while(num_dsts > 0) +} + +void compact() { + if(COMPACTPHASE != gcphase) { + BAMBOO_EXIT(0xb003); + } + + INTPTR heaptopptr = 0; + + // initialize pointers for comapcting + struct moveHelper * orig = (struct moveHelper *)RUNMALLOC(sizeof(struct moveHelper)); + struct moveHelper * to = (struct moveHelper *)RUNMALLOC(sizeof(struct moveHelper)); + to->numblocks = 0; + to->top = to->offset = BAMBOO_CACHE_LINE_SIZE; + to->bound = BAMBOO_SMEM_SIZE_L; + BASEPTR(BAMBOO_NUM_OF_CORE, to->numblocks, &(to->base)); + to->ptr = to->base + to->offset; + orig->numblocks = 0; + orig->ptr = to->ptr; + orig->base = to->base; + orig->bound = to->bound; + orig->blockbase = to->base; + orig->blockbound = orig->blockbase + *((int*)(orig->blockbase)); + + // scan over all objs in this block, compact those scheduled to + // reside on this core + // loop stop when finishing either scanning all active objs or moving + // all objs to reside on this core + int endaddr = cinstruction->loads; + do { + bool stop = moveobj(orig, to, &endaddr); + curr_heaptop = to->top; + curr_heapbound = to->bound; + if(stop && (cinstruction->movenum != 0)) { + // all objs to reside on this core have been moved + // the remainging objs should be moved to other cores + // STOP the loop + break; + } + } while(orig->ptr < markedptrbound + 1); + // fill the header of this block + (*((int*)(to->base))) = to->offset; + heaptopptr = to->ptr; + + // move objs + if(cinstruction->movenum != 0) { + if(cinstruction->ismove) { + // have objs to move to other cores + migrateobjs(orig); + + // might still have objs left, compact them to this core + // leave space for header + if(orig->ptr < markedptrbound + 1) { + if(to->top + BAMBOO_CACHE_LINE_SIZE > to->bound) { + // fill the left part of current block + memset(to->top, -2, to->bound - to->top); + // go to next block + nextBlock(to); + } else { + to->top += BAMBOO_CACHE_LINE_SIZE; // for header + to->offset = BAMBOO_CACHE_LINE_SIZE; + to->base = to->ptr; + to->ptr += BAMBOO_CACHE_LINE_SIZE; } - INTPTR top = cinstruction->tomoveobjs->dststarts[j]; - INTPTR start = cinstruction->tomoveobjs->starts[j]; - INTPTR end = cinstruction->tomoveobjs->ends[j]; - struct markedObjItem * tomove = getStartItem(moi, start); - do { - int type = ((int *)(tomove->orig))[0]; - int size = 0; - if(type == -1) { - // do nothing - } - if(type < NUMCLASSES) { - // a normal object - size = classsize[type]; - moi->dst = top; - top += size; - memcpy(moi->dst, moi->orig, size); - genputtable(pointertbl, moi->orig, moi->dst); - } else { - // an array - struct ArrayObject *ao=(struct ArrayObject *)ptr; - int elementsize=classsize[type]; - int length=ao->___length___; - size=sizeof(struct ArrayObject)+length*elementsize; - moi->dst = top; - top += size; - memcpy(moi->dst, moi->orig, size); - genputtable(pointertbl, moi->orig, moi->dst); - } - tomove = tomove->next; - } while(tomove->orig < end); - } // while(!isEmpty(gcdsts)) - } // while(num_dsts > 0) - } // if(moi == NULL) else() + while(orig->ptr < markedptrbound + 1) { + moveobj(orig, to, NULL); + curr_heaptop = to->top; + curr_heapbound = to->bound; + } + // fill the header of this blockk + (*((int*)(to->base))) = to->offset; + } + heaptopptr = to->ptr; + } else { + // have incoming objs, send messages to corresponding cores to start moving + INTPTR startaddr = 0; + INTPTR endaddr = 0; + int heapptr = curr_heapptr; + int top = curr_heaptop; + int bound = curr_heapbound; + for(int j = 0; j < cinstruction->movenum; j++) { + startaddr = heapptr; + top = top + cinstruction->size2move[j] + BAMBOO_CACHE_LINE_SIZE; + if(top > bound) { + // will cross block boundary + int numb = (top - bound) / BAMBOO_SMEM_SIZE + 1; + top += numb * BAMBOO_CACHE_LINE_SIZE; + BASEPTR(BAMBOO_NUM_OF_CORE, numblocks + numb, &endaddr); + endaddr += (top - bound) % BAMBOO_SMEM_SIZE + BAMBOO_CACHE_LINE_SIZE; + heapptr = endaddr; + bound += BAMBOO_SMEM_SIZE * numb; + } else { + endaddr = heapptr + cinstruction->size2move[j] + BAMBOO_CACHE_LINE_SIZE; + heapptr = endaddr; + } + send_msg_4(cinstruction->dsts[j], GCMOVESTART, + BAMBOO_NUM_OF_CORE, startaddr, cinstruction->size2move[j]); + } + heaptopptr = heapptr; + } // if(cinstruction->ismove) + } // if(cinstruction->movenum != 0) + + // TODO large obj + /* if((cinstruction != NULL) && (cinstruction->largeobjs != NULL)) { // move all large objects do { @@ -893,19 +910,20 @@ innercompact: cinstruction->largeobjs = loi->next; // move this large obj memcpy(loi->dst, loi->orig, loi->length); - genputtable(pointertbl, loi->orig, loi->dst); + RuntimeHashadd(pointertbl, loi->orig, loi->dst); RUNFREE(loi); }while(cinstruction->largeobjs != NULL); - } + }*/ // send compact finish message to core coordinator - send_msg_2(STARTUPCORE, GCFINISHCOMPACT, BAMBOO_NUM_OF_CORE); - + send_msg_3(STARTUPCORE, GCFINISHCOMPACT, BAMBOO_NUM_OF_CORE, to->ptr); + + RUNFREE(orig); + RUNFREE(to); } // compact() void flush() { - struct markedObjItem * moi = mObjList.head; - while(moi != NULL) { - void * ptr = moi->dst; + while(gc_moreItems()) { + voit * ptr = gc_dequeue(); int type = ((int *)(ptr))[0]; // scan all pointers in ptr unsigned INTPTR * pointer; @@ -921,7 +939,8 @@ void flush() { for(j=0; j___length___)+sizeof(int)))[j]; // change to new address - void *dstptr = gengettable(pointertbl, objptr); + void *dstptr = NULL; + RuntimeHashget(pointertbl, objptr, &dstptr); if(NULL == dstptr) { // send msg to host core for the mapping info obj2map = (int)objptr; @@ -930,7 +949,7 @@ void flush() { send_msg_3(hostcore(objptr), GCMAPREQUEST, (int)objptr, BAMBOO_NUM_OF_CORE); while(!ismapped) {} - dstptr = mappedobj; + RuntimeHashget(pointertbl, objptr, &dstptr); } ((void **)(((char *)&ao->___length___)+sizeof(int)))[j] = dstptr; } @@ -941,7 +960,8 @@ void flush() { unsigned int offset=pointer[i]; void * objptr=*((void **)(((char *)ptr)+offset)); // change to new address - void *dstptr = gengettable(pointertbl, objptr); + void *dstptr = NULL; + RuntimeHashget(pointertbl, objptr, &dstptr); if(NULL == dstptr) { // send msg to host core for the mapping info obj2map = (int)objptr; @@ -950,16 +970,14 @@ void flush() { send_msg_3(hostcore(objptr), GCMAPREQUEST, (int)objptr, BAMBOO_NUM_OF_CORE); while(!ismapped) {} - dstptr = mappedobj; + RuntimeHashget(pointertbl, objptr, &dstptr); } *((void **)(((char *)ptr)+offset)) = dstptr; } } - moi = moi->next; } // while(moi != NULL) // send flush finish message to core coordinator send_msg_2(STARTUPCORE, GCFINISHFLUSH, BAMBOO_NUM_OF_CORE); - } // flush() void gc_collect(struct garbagelist * stackptr) { @@ -968,12 +986,8 @@ void gc_collect(struct garbagelist * stackptr) { compact(); while(FLUSHPHASE != gcphase) {} flush(); - - while(true) { - if(FINISHPHASE == gcphase) { - return; - } - } + + while(FINISHPHASE != gcphase) {} } #endif diff --git a/Robust/src/Runtime/multicoregarbage.h b/Robust/src/Runtime/multicoregarbage.h index 81000a62..d43e5f44 100644 --- a/Robust/src/Runtime/multicoregarbage.h +++ b/Robust/src/Runtime/multicoregarbage.h @@ -42,21 +42,16 @@ struct largeObjItem { int length; struct largeObjItem * next; }; -/* -struct moveObj { - INTPTR * starts; - INTPTR * ends; - INTPTR * dststarts; - int * dsts; - int length; -}; -*/ + struct compactInstr { int loads; int ismove; int movenum; - int * size2move; - int * dsts; + int size2move[2]; + int dsts[2]; + int moveflag[2]; + INTPTR startaddrs[2]; + INTPTR endaddrs[2]; struct largeObjItem * largeobjs; }; @@ -70,8 +65,7 @@ enum GCPHASETYPE { volatile bool gcflag; volatile bool gcprocessing; GCPHASETYPE gcphase; // indicating GC phase -bool gctomove; // flag indicating if can start moving objects to other cores -struct Queue * gcdsts; + // for mark phase termination int gccorestatus[NUMCORES]; // records status of each core // 1: running gc @@ -80,6 +74,7 @@ int gcnumsendobjs[NUMCORES]; // records how many objects a core has sent out int gcnumreceiveobjs[NUMCORES]; // records how many objects a core has received int gcself_numsendobjs; int gcself_numreceiveobjs; + // for load balancing int gcloads[NUMCORES]; int gcreloads[NUMCORES]; @@ -89,8 +84,10 @@ int gcdeltar[NUMCORES]; // compact instruction INTPTR markedptrbound; struct compactInstr * cinstruction; +bool gctomove; // flag indicating if can start moving objects to other cores + // mapping of old address to new address -struct genhashtable * pointertbl; +struct RuntimeHash * pointertbl; int obj2map; int mappedobj; bool ismapped; diff --git a/Robust/src/Runtime/multicoreruntime.h b/Robust/src/Runtime/multicoreruntime.h index 1588ad5d..c30dfb57 100644 --- a/Robust/src/Runtime/multicoreruntime.h +++ b/Robust/src/Runtime/multicoreruntime.h @@ -7,7 +7,8 @@ // data structures for msgs #define BAMBOO_OUT_BUF_LENGTH 300 -int msgdata[30]; +#define BAMBOO_MSG_BUF_LENGTH 30 +int msgdata[BAMBOO_MSG_BUF_LENGTH]; int msgtype; int msgdataindex; int msglength; @@ -139,6 +140,12 @@ int self_numreceiveobjs; // get rid of lock msgs for GC version #ifndef MULTICORE_GC // data structures for locking +struct RuntimeHash locktable; +static struct RuntimeHash* locktbl = &locktable; +struct LockValue { + int redirectlock; + int value; +}; struct RuntimeHash * objRedirectLockTbl; int lockobj; int lock2require; @@ -234,6 +241,7 @@ inline int processlockrequest(int locktype, int lock, int obj, int requestcore, inline void processlockrelease(int locktype, int lock, int redirectlock, bool isredirect) __attribute_((always_inline)); // msg related functions +inline void send_hanging_msg() __attribute__((always_inline)); inline void send_msg_1(int targetcore, unsigned long n0) __attribute__((always_inline)); inline void send_msg_2(int targetcore, unsigned long n0, unsigned long n1) __attribute__((always_inline)); inline void send_msg_3(int targetcore, unsigned long n0, unsigned long n1, unsigned long n2) __attribute__((always_inline)); diff --git a/Robust/src/Runtime/multicoretask.c b/Robust/src/Runtime/multicoretask.c index 85c5c31b..28fb9094 100644 --- a/Robust/src/Runtime/multicoretask.c +++ b/Robust/src/Runtime/multicoretask.c @@ -3,85 +3,55 @@ #include "multicoreruntime.h" #include "runtime_arch.h" #include "GenericHashtable.h" -#if 0 -/* - extern int injectfailures; - extern float failurechance; - */ -extern int debugtask; -extern int instaccum; - -void * curr_heapbase=0; -void * curr_heaptop=0; - -#ifdef CONSCHECK -#include "instrument.h" -#endif -#endif // if 0: for recovery // data structures for task invocation struct genhashtable * activetasks; -//struct genhashtable * failedtasks; // for recovery struct taskparamdescriptor * currtpd; -#if 0 -struct RuntimeHash * forward; -struct RuntimeHash * reverse; -#endif // if 0: for recovery // specific functions used inside critical sections void enqueueObject_I(void * ptr, struct parameterwrapper ** queues, int length); int enqueuetasks_I(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags); -// main function for each core -inline void run(void * arg) { - int i = 0; - int argc = 1; - char ** argv = NULL; - bool sendStall = false; - bool isfirst = true; - bool tocontinue = false; - struct transObjInfo * objInfo = NULL; - int grount = 0; - bool allStall = true; - int sumsendobj = 0; - - corenum = BAMBOO_GET_NUM_OF_CORE(); -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xeeee); - BAMBOO_DEBUGPRINT_REG(corenum); - BAMBOO_DEBUGPRINT(STARTUPCORE); -#endif - - // initialize the arrays +inline void initruntimedata() { + // initialize the arrays if(STARTUPCORE == BAMBOO_NUM_OF_CORE) { // startup core to initialize corestatus[] for(i = 0; i < NUMCORES; ++i) { corestatus[i] = 1; numsendobjs[i] = 0; // assume all variables are local variables! MAY BE WRONG!!! numreceiveobjs[i] = 0; - } - numconfirm = 0; - waitconfirm = false; #ifdef PROFILE - // initialize the profile data arrays - for(i = 0; i < NUMCORES; ++i) { - profilestatus[i] = 1; - } + // initialize the profile data arrays + profilestatus[i] = 1; #endif - // TODO for test - total_num_t6 = 0; +#ifdef MULTICORE_GC + gccorestatus[i] = 1; + gcnumsendobjs[i] = 0; + gcnumreceiveobjs[i] = 0; + gcloads[i] = 0; + gcreloads[i] = 0; + gcdeltal[i] = 0; + gcdeltar[i] = 0; +#endif + } // for(i = 0; i < NUMCORES; ++i) + numconfirm = 0; + waitconfirm = false; + + // TODO for test + total_num_t6 = 0; } + busystatus = true; self_numsendobjs = 0; self_numreceiveobjs = 0; - for(i = 0; i < 30; ++i) { + for(i = 0; i < BAMBOO_MSG_BUF_LENGTH; ++i) { msgdata[i] = -1; } msgtype = -1; msgdataindex = 0; - msglength = 30; - for(i = 0; i < 30; ++i) { + msglength = BAMBOO_MSG_BUF_LENGTH; + for(i = 0; i < BAMBOO_OUT_BUF_LENGTH; ++i) { outmsgdata[i] = -1; } outmsgindex = 0; @@ -94,7 +64,21 @@ inline void run(void * arg) { bamboo_cur_msp = NULL; bamboo_smem_size = 0; - // create the lock table, lockresult table and obj queue +#ifdef MULTICORE_GC + gcflag = false; + gcprocessing = false; + gcphase = FINISHPHASE; + gcself_numsendobjs = 0; + gcself_numreceiveobjs = 0; + markedptrbound = 0; + cinstruction = NULL; + gctomove = false; + pointertbl = allocateRuntimeHash(20); + obj2map = 0; + mappedobj = 0; + ismapped = false; +#else + // create the lock table, lockresult table and obj queue locktable.size = 20; locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20); /* Set allocation blocks*/ @@ -106,13 +90,14 @@ inline void run(void * arg) { lock2require = 0; lockresult = 0; lockflag = false; + lockRedirectTbl = allocateRuntimeHash(20); + objRedirectLockTbl = allocateRuntimeHash(20); +#endif #ifndef INTERRUPT reside = false; #endif objqueue.head = NULL; objqueue.tail = NULL; - lockRedirectTbl = allocateRuntimeHash(20); - objRedirectLockTbl = allocateRuntimeHash(20); #ifdef PROFILE stall = false; @@ -123,355 +108,386 @@ inline void run(void * arg) { /*interruptInfoIndex = 0; interruptInfoOverflow = false;*/ #endif +} - // other architecture related initialization - initialization(); - - initCommunication(); - -#if 0 -#ifdef BOEHM_GC - GC_init(); // Initialize the garbage collector -#endif -#ifdef CONSCHECK - initializemmap(); -#endif - processOptions(); -#endif // #if 0: for recovery and garbage collection - initializeexithandler(); - - // main process of the execution module - if(BAMBOO_NUM_OF_CORE > NUMCORES - 1) { - // non-executing cores, only processing communications - //failedtasks = NULL; - activetasks = NULL; -/*#ifdef PROFILE - BAMBOO_DEBUGPRINT(0xee01); - BAMBOO_DEBUGPRINT_REG(taskInfoIndex); - BAMBOO_DEBUGPRINT_REG(taskInfoOverflow); - profileTaskStart("msg handling"); - } - #endif*/ -#ifdef PROFILE - //isInterrupt = false; -#endif - fakeExecution(); - } else { - /* Create table for failed tasks */ -#if 0 - failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd, - (int (*)(void *,void *)) &comparetpd); -#endif // #if 0: for recovery - /* Create queue of active tasks */ - activetasks=genallocatehashtable((unsigned int(*) (void *)) &hashCodetpd, - (int(*) (void *,void *)) &comparetpd); - - /* Process task information */ - processtasks(); - - if(STARTUPCORE == BAMBOO_NUM_OF_CORE) { - /* Create startup object */ - createstartupobject(argc, argv); - } - -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee00); -#endif - - while(true) { +inline void disruntimedata() { #ifdef MULTICORE_GC - // check if need to do GC - gc(NULL); + freeRuntimeHash(pointertbl); +#else + freeRuntimeHash(lockRedirectTbl); + freeRuntimeHash(objRedirectLockTbl); + RUNFREE(locktable.bucket); #endif + genfreehashtable(activetasks); + RUNFREE(currtpd); +} - // check if there are new active tasks can be executed - executetasks(); - -#ifndef INTERRUPT - while(receiveObject() != -1) { - } -#endif - -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee01); -#endif - - // check if there are some pending objects, if yes, enqueue them and executetasks again - tocontinue = false; +bool checkObjQueue(void * sendStall) { + int tocontinue = false; + struct transObjInfo * objInfo = NULL; + int grount = 0; #ifdef PROFILE #ifdef ACCURATEPROFILE - { - bool isChecking = false; - if(!isEmpty(&objqueue)) { - profileTaskStart("objqueue checking"); - isChecking = true; - } +{ + bool isChecking = false; + if(!isEmpty(&objqueue)) { + profileTaskStart("objqueue checking"); + isChecking = true; + } #endif #endif - while(!isEmpty(&objqueue)) { - void * obj = NULL; - BAMBOO_START_CRITICAL_SECTION_OBJ_QUEUE(); + while(!isEmpty(&objqueue)) { + void * obj = NULL; + BAMBOO_START_CRITICAL_SECTION_OBJ_QUEUE(); #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf001); + BAMBOO_DEBUGPRINT(0xf001); #endif #ifdef PROFILE - //isInterrupt = false; + //isInterrupt = false; #endif #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xeee1); + BAMBOO_DEBUGPRINT(0xeee1); #endif - sendStall = false; - tocontinue = true; - objInfo = (struct transObjInfo *)getItem(&objqueue); - obj = objInfo->objptr; + (*((bool *)sendStall)) = false; + tocontinue = true; + objInfo = (struct transObjInfo *)getItem(&objqueue); + obj = objInfo->objptr; #ifdef DEBUG - BAMBOO_DEBUGPRINT_REG((int)obj); + BAMBOO_DEBUGPRINT_REG((int)obj); #endif - // grab lock and flush the obj - grount = 0; - getwritelock_I(obj); - while(!lockflag) { - BAMBOO_WAITING_FOR_LOCK(); - } - grount = lockresult; + // grab lock and flush the obj + grount = 0; + getwritelock_I(obj); + while(!lockflag) { + BAMBOO_WAITING_FOR_LOCK(); + } + grount = lockresult; #ifdef DEBUG - BAMBOO_DEBUGPRINT_REG(grount); + BAMBOO_DEBUGPRINT_REG(grount); #endif - lockresult = 0; - lockobj = 0; - lock2require = 0; - lockflag = false; + lockresult = 0; + lockobj = 0; + lock2require = 0; + lockflag = false; #ifndef INTERRUPT - reside = false; + reside = false; #endif - if(grount == 1) { - int k = 0; - // flush the object + if(grount == 1) { + int k = 0; + // flush the object #ifdef CACHEFLUSH - BAMBOO_CACHE_FLUSH_RANGE((int)obj,sizeof(int)); - BAMBOO_CACHE_FLUSH_RANGE((int)obj, classsize[((struct ___Object___ *)obj)->type]); + BAMBOO_CACHE_FLUSH_RANGE((int)obj,sizeof(int)); + BAMBOO_CACHE_FLUSH_RANGE((int)obj, classsize[((struct ___Object___ *)obj)->type]); #endif - // enqueue the object - for(k = 0; k < objInfo->length; ++k) { - int taskindex = objInfo->queues[2 * k]; - int paramindex = objInfo->queues[2 * k + 1]; - struct parameterwrapper ** queues = &(paramqueues[BAMBOO_NUM_OF_CORE][taskindex][paramindex]); + // enqueue the object + for(k = 0; k < objInfo->length; ++k) { + int taskindex = objInfo->queues[2 * k]; + int paramindex = objInfo->queues[2 * k + 1]; + struct parameterwrapper ** queues = &(paramqueues[BAMBOO_NUM_OF_CORE][taskindex][paramindex]); #ifdef DEBUG - BAMBOO_DEBUGPRINT_REG(taskindex); - BAMBOO_DEBUGPRINT_REG(paramindex); - struct ___Object___ * tmpptr = (struct ___Object___ *)obj; - tprintf("Process %x(%d): receive obj %x(%lld), ptrflag %x\n", BAMBOO_NUM_OF_CORE, BAMBOO_NUM_OF_CORE, (int)obj, (long)obj, tmpptr->flag); + BAMBOO_DEBUGPRINT_REG(taskindex); + BAMBOO_DEBUGPRINT_REG(paramindex); + struct ___Object___ * tmpptr = (struct ___Object___ *)obj; + tprintf("Process %x(%d): receive obj %x(%lld), ptrflag %x\n", BAMBOO_NUM_OF_CORE, BAMBOO_NUM_OF_CORE, (int)obj, (long)obj, tmpptr->flag); #endif - enqueueObject_I(obj, queues, 1); + enqueueObject_I(obj, queues, 1); #ifdef DEBUG - BAMBOO_DEBUGPRINT_REG(hashsize(activetasks)); + BAMBOO_DEBUGPRINT_REG(hashsize(activetasks)); #endif - } - releasewritelock_I(obj); - RUNFREE(objInfo->queues); - RUNFREE(objInfo); - } else { - // can not get lock - // put it at the end of the queue if no update version in the queue - struct QueueItem * qitem = getHead(&objqueue); - struct QueueItem * prev = NULL; - while(qitem != NULL) { - struct transObjInfo * tmpinfo = (struct transObjInfo *)(qitem->objectptr); - if(tmpinfo->objptr == obj) { - // the same object in the queue, which should be enqueued - // recently. Current one is outdate, do not re-enqueue it - RUNFREE(objInfo->queues); - RUNFREE(objInfo); - goto objqueuebreak; - } else { - prev = qitem; - } - qitem = getNextQueueItem(prev); - } - // try to execute active tasks already enqueued first - addNewItem_I(&objqueue, objInfo); + } + releasewritelock_I(obj); + RUNFREE(objInfo->queues); + RUNFREE(objInfo); + } else { + // can not get lock + // put it at the end of the queue if no update version in the queue + struct QueueItem * qitem = getHead(&objqueue); + struct QueueItem * prev = NULL; + while(qitem != NULL) { + struct transObjInfo * tmpinfo = (struct transObjInfo *)(qitem->objectptr); + if(tmpinfo->objptr == obj) { + // the same object in the queue, which should be enqueued + // recently. Current one is outdate, do not re-enqueue it + RUNFREE(objInfo->queues); + RUNFREE(objInfo); + goto objqueuebreak; + } else { + prev = qitem; + } // if(tmpinfo->objptr == obj) + qitem = getNextQueueItem(prev); + } // while(qitem != NULL) + // try to execute active tasks already enqueued first + addNewItem_I(&objqueue, objInfo); #ifdef PROFILE - //isInterrupt = true; + //isInterrupt = true; #endif objqueuebreak: - BAMBOO_CLOSE_CRITICAL_SECTION_OBJ_QUEUE(); + BAMBOO_CLOSE_CRITICAL_SECTION_OBJ_QUEUE(); #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000); + BAMBOO_DEBUGPRINT(0xf000); #endif - break; - } - BAMBOO_CLOSE_CRITICAL_SECTION_OBJ_QUEUE(); + break; + } // if(grount == 1) + BAMBOO_CLOSE_CRITICAL_SECTION_OBJ_QUEUE(); #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000); + BAMBOO_DEBUGPRINT(0xf000); #endif - } + } // while(!isEmpty(&objqueue)) #ifdef PROFILE #ifdef ACCURATEPROFILE - if(isChecking) { - profileTaskEnd(); - } - } + if(isChecking) { + profileTaskEnd(); + } +} #endif #endif #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee02); + BAMBOO_DEBUGPRINT(0xee02); #endif + return tocontinue; +} - if(!tocontinue) { - // check if stop - if(STARTUPCORE == BAMBOO_NUM_OF_CORE) { - if(isfirst) { -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee03); -#endif - isfirst = false; - } - if((!waitconfirm) || - (waitconfirm && (numconfirm == 0))) { -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee04); - BAMBOO_DEBUGPRINT_REG(waitconfirm); -#endif - BAMBOO_START_CRITICAL_SECTION_STATUS(); +void checkCoreStatue() { + bool allStall = false; + int i = 0; + int sumsendobj = 0; + if((!waitconfirm) || + (waitconfirm && (numconfirm == 0))) { #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf001); + BAMBOO_DEBUGPRINT(0xee04); + BAMBOO_DEBUGPRINT_REG(waitconfirm); #endif - corestatus[BAMBOO_NUM_OF_CORE] = 0; - numsendobjs[BAMBOO_NUM_OF_CORE] = self_numsendobjs; - numreceiveobjs[BAMBOO_NUM_OF_CORE] = self_numreceiveobjs; - // check the status of all cores - allStall = true; + BAMBOO_START_CRITICAL_SECTION_STATUS(); #ifdef DEBUG - BAMBOO_DEBUGPRINT_REG(NUMCORES); + BAMBOO_DEBUGPRINT(0xf001); #endif - for(i = 0; i < NUMCORES; ++i) { + corestatus[BAMBOO_NUM_OF_CORE] = 0; + numsendobjs[BAMBOO_NUM_OF_CORE] = self_numsendobjs; + numreceiveobjs[BAMBOO_NUM_OF_CORE] = self_numreceiveobjs; + // check the status of all cores + allStall = true; #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xe000 + corestatus[i]); + BAMBOO_DEBUGPRINT_REG(NUMCORES); #endif - if(corestatus[i] != 0) { - allStall = false; - break; - } - } - if(allStall) { - // check if the sum of send objs and receive obj are the same - // yes->check if the info is the latest; no->go on executing - sumsendobj = 0; - for(i = 0; i < NUMCORES; ++i) { - sumsendobj += numsendobjs[i]; + for(i = 0; i < NUMCORES; ++i) { #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000 + numsendobjs[i]); + BAMBOO_DEBUGPRINT(0xe000 + corestatus[i]); #endif - } - for(i = 0; i < NUMCORES; ++i) { - sumsendobj -= numreceiveobjs[i]; + if(corestatus[i] != 0) { + allStall = false; + break; + } + } // for(i = 0; i < NUMCORES; ++i) + if(allStall) { + // check if the sum of send objs and receive obj are the same + // yes->check if the info is the latest; no->go on executing + sumsendobj = 0; + for(i = 0; i < NUMCORES; ++i) { + sumsendobj += numsendobjs[i]; +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xf000 + numsendobjs[i]); +#endif + } // for(i = 0; i < NUMCORES; ++i) + for(i = 0; i < NUMCORES; ++i) { + sumsendobj -= numreceiveobjs[i]; +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xf000 + numreceiveobjs[i]); +#endif + } // for(i = 0; i < NUMCORES; ++i) + if(0 == sumsendobj) { + if(!waitconfirm) { + // the first time found all cores stall + // send out status confirm msg to all other cores + // reset the corestatus array too +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xee05); +#endif + corestatus[BAMBOO_NUM_OF_CORE] = 1; + for(i = 1; i < NUMCORES; ++i) { + corestatus[i] = 1; + // send status confirm msg to core i + send_msg_1(i, STATUSCONFIRM); + } // for(i = 1; i < NUMCORES; ++i) + waitconfirm = true; + numconfirm = NUMCORES - 1; + } else { + // all the core status info are the latest + // terminate; for profiling mode, send request to all + // other cores to pour out profiling data #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000 + numreceiveobjs[i]); -#endif - } - if(0 == sumsendobj) { - if(!waitconfirm) { - // the first time found all cores stall - // send out status confirm msg to all other cores - // reset the corestatus array too -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee05); -#endif - corestatus[BAMBOO_NUM_OF_CORE] = 1; - for(i = 1; i < NUMCORES; ++i) { - corestatus[i] = 1; - // send status confirm msg to core i - send_msg_1(i, STATUSCONFIRM); - } - waitconfirm = true; - numconfirm = NUMCORES - 1; - } else { - // all the core status info are the latest - // terminate; for profiling mode, send request to all - // other cores to pour out profiling data -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee06); + BAMBOO_DEBUGPRINT(0xee06); #endif - + #ifdef USEIO - totalexetime = BAMBOO_GET_EXE_TIME(); + totalexetime = BAMBOO_GET_EXE_TIME(); #else - BAMBOO_DEBUGPRINT(BAMBOO_GET_EXE_TIME()); - BAMBOO_DEBUGPRINT_REG(total_num_t6); // TODO for test - BAMBOO_DEBUGPRINT(0xbbbbbbbb); + BAMBOO_DEBUGPRINT(BAMBOO_GET_EXE_TIME()); + BAMBOO_DEBUGPRINT_REG(total_num_t6); // TODO for test + BAMBOO_DEBUGPRINT(0xbbbbbbbb); #endif - // profile mode, send msgs to other cores to request pouring - // out progiling data + // profile mode, send msgs to other cores to request pouring + // out progiling data #ifdef PROFILE - BAMBOO_CLOSE_CRITICAL_SECTION_STATUS(); + BAMBOO_CLOSE_CRITICAL_SECTION_STATUS(); #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000); + BAMBOO_DEBUGPRINT(0xf000); #endif - for(i = 1; i < NUMCORES; ++i) { - // send profile request msg to core i - send_msg_2(i, PROFILEOUTPUT, totalexetime); - } - // pour profiling data on startup core - outputProfileData(); - while(true) { - BAMBOO_START_CRITICAL_SECTION_STATUS(); + for(i = 1; i < NUMCORES; ++i) { + // send profile request msg to core i + send_msg_2(i, PROFILEOUTPUT, totalexetime); + } // for(i = 1; i < NUMCORES; ++i) + // pour profiling data on startup core + outputProfileData(); + while(true) { + BAMBOO_START_CRITICAL_SECTION_STATUS(); #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf001); + BAMBOO_DEBUGPRINT(0xf001); #endif - profilestatus[BAMBOO_NUM_OF_CORE] = 0; - // check the status of all cores - allStall = true; + profilestatus[BAMBOO_NUM_OF_CORE] = 0; + // check the status of all cores + allStall = true; #ifdef DEBUG - BAMBOO_DEBUGPRINT_REG(NUMCORES); + BAMBOO_DEBUGPRINT_REG(NUMCORES); #endif - for(i = 0; i < NUMCORES; ++i) { -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xe000 + profilestatus[i]); -#endif - if(profilestatus[i] != 0) { - allStall = false; - break; - } - } - if(!allStall) { - int halt = 100; - BAMBOO_CLOSE_CRITICAL_SECTION_STATUS(); -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000); -#endif - while(halt--) { - } - } else { - break; - } - } -#endif - terminate(); // All done. - } // if-else of line 364: if(!waitconfirm) - } else { - // still some objects on the fly on the network - // reset the waitconfirm and numconfirm + for(i = 0; i < NUMCORES; ++i) { #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee07); + BAMBOO_DEBUGPRINT(0xe000 + profilestatus[i]); #endif - waitconfirm = false; - numconfirm = 0; - } // if-else of line 363: if(0 == sumsendobj) - } else { - // not all cores are stall, keep on waiting + if(profilestatus[i] != 0) { + allStall = false; + break; + } + } // for(i = 0; i < NUMCORES; ++i) + if(!allStall) { + int halt = 100; + BAMBOO_CLOSE_CRITICAL_SECTION_STATUS(); #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xee08); + BAMBOO_DEBUGPRINT(0xf000); #endif - waitconfirm = false; - numconfirm = 0; - } // if-else of line 347: if(allStall) - BAMBOO_CLOSE_CRITICAL_SECTION_STATUS(); + while(halt--) { + } + } else { + break; + } // if(!allStall) + } // while(true) +#endif + disruntimedata(); + terminate(); // All done. + } // if(!waitconfirm) + } else { + // still some objects on the fly on the network + // reset the waitconfirm and numconfirm +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xee07); +#endif + waitconfirm = false; + numconfirm = 0; + } // if(0 == sumsendobj) + } else { + // not all cores are stall, keep on waiting +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xee08); +#endif + waitconfirm = false; + numconfirm = 0; + } // if(allStall) + BAMBOO_CLOSE_CRITICAL_SECTION_STATUS(); +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xf000); +#endif + } // if((!waitconfirm) || +} + +// main function for each core +inline void run(void * arg) { + int i = 0; + int argc = 1; + char ** argv = NULL; + bool sendStall = false; + bool isfirst = true; + bool tocontinue = false; + + corenum = BAMBOO_GET_NUM_OF_CORE(); #ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000); + BAMBOO_DEBUGPRINT(0xeeee); + BAMBOO_DEBUGPRINT_REG(corenum); + BAMBOO_DEBUGPRINT(STARTUPCORE); #endif - } // if-else of line 320: if((!waitconfirm) || + + // initialize runtime data structures + initruntimedata(); + + // other architecture related initialization + initialization(); + initCommunication(); + + initializeexithandler(); + + // main process of the execution module + if(BAMBOO_NUM_OF_CORE > NUMCORES - 1) { + // non-executing cores, only processing communications + activetasks = NULL; +/*#ifdef PROFILE + BAMBOO_DEBUGPRINT(0xee01); + BAMBOO_DEBUGPRINT_REG(taskInfoIndex); + BAMBOO_DEBUGPRINT_REG(taskInfoOverflow); + profileTaskStart("msg handling"); + } + #endif*/ +#ifdef PROFILE + //isInterrupt = false; +#endif + fakeExecution(); + } else { + /* Create queue of active tasks */ + activetasks=genallocatehashtable((unsigned int(*) (void *)) &hashCodetpd, + (int(*) (void *,void *)) &comparetpd); + + /* Process task information */ + processtasks(); + + if(STARTUPCORE == BAMBOO_NUM_OF_CORE) { + /* Create startup object */ + createstartupobject(argc, argv); + } + +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xee00); +#endif + + while(true) { +#ifdef MULTICORE_GC + // check if need to do GC + gc(NULL); +#endif + + // check if there are new active tasks can be executed + executetasks(); + +#ifndef INTERRUPT + while(receiveObject() != -1) { + } +#endif + +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xee01); +#endif + + // check if there are some pending objects, if yes, enqueue them and executetasks again + tocontinue = checkObjQueue(); + + if(!tocontinue) { + // check if stop + if(STARTUPCORE == BAMBOO_NUM_OF_CORE) { + if(isfirst) { +#ifdef DEBUG + BAMBOO_DEBUGPRINT(0xee03); +#endif + isfirst = false; + } + checkCoreStatus(); } else { if(!sendStall) { #ifdef DEBUG @@ -509,11 +525,11 @@ objqueuebreak: #ifdef DEBUG BAMBOO_DEBUGPRINT(0xee0c); #endif - } // if-else of line 464: if(!sendStall) - } // if-else of line 313: if(STARTUPCORE == BAMBOO_NUM_OF_CORE) - } // if-else of line 311: if(!tocontinue) - } // line 193: while(true) - } // right-bracket for if-else of line 153: if(BAMBOO_NUM_OF_CORE > NUMCORES - 1) + } // if(!sendStall) + } // if(STARTUPCORE == BAMBOO_NUM_OF_CORE) + } // if(!tocontinue) + } // while(true) + } // if(BAMBOO_NUM_OF_CORE > NUMCORES - 1) } // run() @@ -1452,6 +1468,7 @@ msg: BAMBOO_DEBUGPRINT(0xe889); #endif #endif + disruntimedata(); BAMBOO_EXIT(0); break; } @@ -1471,7 +1488,7 @@ msg: #endif #endif #ifdef MULTICORE_GC - if(gcprocess) { + if(gcprocessing) { // is currently doing gc, dump this msg break; } @@ -1505,7 +1522,7 @@ msg: #endif #endif #ifdef MULTICORE_GC - if(gcprocess) { + if(gcprocessing) { // is currently doing gc, dump this msg break; } @@ -1552,8 +1569,6 @@ msg: if(cinstruction == NULL) { cinstruction = (struct compactInstr *)RUNMALLOC(sizeof(struct compactInstr)); - cinstruction->size2move = (int *)RUNMALLOC(sizeof(int) * 2); - cinstruction->dsts = (int*)RUNMALLOC(sizeof(int) * 2); } else { // clean up out of date info cinstruction->movenum = 0; @@ -1568,7 +1583,11 @@ msg: for(i = 0; i < cinstruction->movenum; i++) { cinstruction->size2move[i] = msgdata[startindex++]; cinstruction->dsts[i] = msgdata[startindex++]; + cinstruction->moveflag[i] = 0; + cinstruction->startaddrs[i] = 0; + cinstruction->endaddrs[i] = 0; } + // TODO /*// process large objs num = msgdata[startindex++]; for(i = 0; i < num; i++) { @@ -1623,6 +1642,7 @@ msg: } if(data1 < NUMCORES) { gccorestatus[data1] = 0; + gcloads[data1] = msgdata[2]; } break; } @@ -1697,23 +1717,35 @@ msg: case GCMOVESTART: { // received a start moving objs msg - addNewItem_I(gcdsts, data1); + if(cinstruction == NULL) { + // something is wrong + BAMBOO_EXIT(0xa023); + } + for(i = 0; i < cinstruction->movenum; i++) { + if(cinstruction->dsts[i] == data1) { + // set the flag to indicate the core is ready to accept objs + cinstruction->moveflag[i] = 1; + cinstruction->startaddrs[i] = msgdata[2]; + cinstruction->endaddrs[i] = msgdata[3]; + } + } tomove = true; break; } case GCMAPREQUEST: { // received a mapping info request msg - void * dstptr = gengettable(pointertbl, data1); + void * dstptr = NULL; + RuntimeHashget(pointertbl, data1, &dstptr); if(NULL == dstptr) { // no such pointer in this core, something is wrong BAMBOO_EXIT(0xb008); } else { // send back the mapping info if(isMsgSending) { - cache_msg_3(msgdata[2], GCMAPINFO, data1, dstptr); + cache_msg_3(msgdata[2], GCMAPINFO, data1, (int)dstptr); } else { - send_msg_3(msgdata[2], GCMAPINFO,data1, dstptr); + send_msg_3(msgdata[2], GCMAPINFO,data1, (int)dstptr); } } break; @@ -1726,7 +1758,7 @@ msg: BAMBOO_EXIT(0xb009); } else { mappedobj = msgdata[2]; - genputtable(pointertbl, obj2map, mappedobj); + RuntimeHashadd(pointertbl, obj2map, mappedobj); } ismapped = true; break; @@ -1792,8 +1824,6 @@ ent enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *pr //int numparams=parameter->task->numParameters; int numiterators=parameter->task->numTotal-1; int retval=1; - //int addnormal=1; - //int adderror=1; struct taskdescriptor * task=parameter->task; @@ -2006,75 +2036,15 @@ void executetasks() { int andmask=0; int checkmask=0; -#if 0 - /* Set up signal handlers */ - struct sigaction sig; - sig.sa_sigaction=&myhandler; - sig.sa_flags=SA_SIGINFO; - sigemptyset(&sig.sa_mask); - - /* Catch bus errors, segmentation faults, and floating point exceptions*/ - sigaction(SIGBUS,&sig,0); - sigaction(SIGSEGV,&sig,0); - sigaction(SIGFPE,&sig,0); - sigaction(SIGPIPE,&sig,0); -#endif // #if 0: non-multicore - -#if 0 - /* Zero fd set */ - FD_ZERO(&readfds); -#endif -#ifndef MULTICORE - maxreadfd=0; -#endif -#if 0 - fdtoobject=allocateRuntimeHash(100); -#endif - -#if 0 - /* Map first block of memory to protected, anonymous page */ - mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0); -#endif newtask: -#ifdef MULTICORE while(hashsize(activetasks)>0) { #ifdef MULTICORE_GC gc(NULL); #endif -#else - while((hashsize(activetasks)>0)||(maxreadfd>0)) { -#endif #ifdef DEBUG BAMBOO_DEBUGPRINT(0xe990); #endif -#if 0 - /* Check if any filedescriptors have IO pending */ - if (maxreadfd>0) { - int i; - struct timeval timeout={0,0}; - fd_set tmpreadfds; - int numselect; - tmpreadfds=readfds; - numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout); - if (numselect>0) { - /* Process ready fd's */ - int fd; - for(fd=0; fd0) { @@ -2219,22 +2189,6 @@ newtask: // flush the object #ifdef CACHEFLUSH BAMBOO_CACHE_FLUSH_RANGE((int)parameter, classsize[((struct ___Object___ *)parameter)->type]); - /* - BAMBOO_START_CRITICAL_SECTION_LOCK(); -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf001); -#endif - if(RuntimeHashcontainskey(objRedirectLockTbl, (int)parameter)) { - int redirectlock_r = 0; - RuntimeHashget(objRedirectLockTbl, (int)parameter, &redirectlock_r); - ((struct ___Object___ *)parameter)->lock = redirectlock_r; - RuntimeHashremovekey(objRedirectLockTbl, (int)parameter); - } - BAMBOO_CLOSE_CRITICAL_SECTION_LOCK(); -#ifdef DEBUG - BAMBOO_DEBUGPRINT(0xf000); -#endif -*/ #endif tmpparam = (struct ___Object___ *)parameter; pd=currtpd->task->descriptorarray[i]; @@ -2328,45 +2282,6 @@ parameterpresent: } { -#if 0 -#ifndef RAW - /* Checkpoint the state */ - forward=allocateRuntimeHash(100); - reverse=allocateRuntimeHash(100); - //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse); -#endif -#endif // #if 0: for recovery -#ifndef MULTICORE - if (x=setjmp(error_handler)) { - //int counter; - /* Recover */ -#ifdef DEBUG -#ifndef MULTICORE - printf("Fatal Error=%d, Recovering!\n",x); -#endif -#endif -#if 0 - genputtable(failedtasks,currtpd,currtpd); - //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse); - - freeRuntimeHash(forward); - freeRuntimeHash(reverse); - freemalloc(); - forward=NULL; - reverse=NULL; -#endif // #if 0: for recovery - BAMBOO_DEBUGPRINT_REG(x); - BAMBOO_EXIT(0xa022); - } else { -#endif // #ifndef MULTICORE -#if 0 - if (injectfailures) { - if ((((double)random())/RAND_MAX)task->name); - longjmp(error_handler,10); - } - } -#endif // #if 0: for recovery /* Actually call task */ #ifdef MULTICORE_GC ((int *)taskpointerarray)[0]=currtpd->numParameters; @@ -2435,28 +2350,15 @@ execute: profileTaskEnd(); #endif -#if 0 - freeRuntimeHash(forward); - freeRuntimeHash(reverse); - freemalloc(); -#endif // Free up task parameter descriptor RUNFREE(currtpd->parameterArray); RUNFREE(currtpd); -#if 0 - forward=NULL; - reverse=NULL; -#endif #ifdef DEBUG BAMBOO_DEBUGPRINT(0xe99a); - //BAMBOO_DEBUGPRINT_REG(hashsize(activetasks)); -#endif -#ifndef MULTICORE - } // line 2946: if (x=setjmp(error_handler)) #endif - } // line2936: - } // line 2697: if (hashsize(activetasks)>0) - } // line 2659: while(hashsize(activetasks)>0) + } // + } // if (hashsize(activetasks)>0) + } // while(hashsize(activetasks)>0) #ifdef DEBUG BAMBOO_DEBUGPRINT(0xe99b); #endif