From: adash Date: Fri, 1 Aug 2008 17:48:33 +0000 (+0000) Subject: remove the system.clearPrefetchCache call X-Git-Tag: preEdgeChange~46 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=f2332431429eb9750e12cf241c61f5c1a9b6438c;p=IRC.git remove the system.clearPrefetchCache call bug fix for openning large number of sockets (use socket pool and recycle sockets) bug fix for updating prefetch cache using version increment for modified objects fix compile errors by moving #include for plookup.h --- diff --git a/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3dNold.java b/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3dNold.java index daf334ef..d2769e7e 100644 --- a/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3dNold.java +++ b/Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3dNold.java @@ -37,7 +37,7 @@ public class Em3d extends Thread **/ private boolean printMsgs; - int numThreads; + int numThreads; BiGraph bg; int upperlimit; @@ -82,7 +82,6 @@ public class Em3d extends Thread } Barrier.enterBarrier(barr); - System.clearPrefetchCache(); /* for hNodes */ atomic { @@ -94,7 +93,6 @@ public class Em3d extends Thread } } Barrier.enterBarrier(barr); - System.clearPrefetchCache(); } } @@ -112,11 +110,11 @@ public class Em3d extends Thread long start0 = System.currentTimeMillis(); int numThreads = em.numThreads; int[] mid = new int[4]; - mid[0] = (128<<24)|(195<<16)|(175<<8)|69; - mid[1] = (128<<24)|(195<<16)|(175<<8)|80; - mid[2] = (128<<24)|(195<<16)|(175<<8)|73; - mid[3] = (128<<24)|(195<<16)|(175<<8)|78; - System.printString("DEBUG -> numThreads = " + numThreads+"\n"); + mid[0] = (128<<24)|(195<<16)|(175<<8)|79; + mid[1] = (128<<24)|(195<<16)|(175<<8)|73; + mid[2] = (128<<24)|(195<<16)|(175<<8)|78; + mid[3] = (128<<24)|(195<<16)|(175<<8)|69; + //System.printString("DEBUG -> numThreads = " + numThreads+"\n"); Barrier mybarr; BiGraph graph; Random rand = new Random(783); diff --git a/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java b/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java index 273155a7..11741f1d 100644 --- a/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java +++ b/Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java @@ -93,9 +93,9 @@ public class JGFSORBench { SORRunner tmp; int[] mid = new int[4]; mid[0] = (128<<24)|(195<<16)|(175<<8)|79; - mid[1] = (128<<24)|(195<<16)|(175<<8)|80; - mid[2] = (128<<24)|(195<<16)|(175<<8)|73; - mid[3] = (128<<24)|(195<<16)|(175<<8)|78; + mid[1] = (128<<24)|(195<<16)|(175<<8)|73; + mid[2] = (128<<24)|(195<<16)|(175<<8)|78; + mid[3] = (128<<24)|(195<<16)|(175<<8)|69; for(int i=1;irec; + unsigned int size = rec->lookupTable->size; + chashlistnode_t *ptr = rec->lookupTable->table; + int i; + for(i = 0; i < size; i++) { + chashlistnode_t *curr = &ptr[i]; //for each entry in the cache lookupTable + while(curr != NULL) { + if(curr->key == 0) + break; + objheader_t *header1, *header2; + if((header1 = mhashSearch(curr->key)) == NULL && ((header2 = prehashSearch(curr->key)) != NULL)) { + /* Not found in local machine's object store and found in prefetch cache */ + /* Remove from prefetch cache */ + prehashRemove(curr->key); + } + curr = curr->next; + } + } +} + +/* This function updates the prefetch cache with + * entires from the transaction cache when a + * transaction commits + * Return -1 on error else returns 0 */ +int updatePrefetchCache(thread_data_array_t* tdata) { + plistnode_t *pile = tdata->pilehead; + while(pile != NULL) { + if(pile->mid != myIpAddr) { //Not local machine + int retval; + char oidType; + oidType = 'R'; + if((retval = copyToCache(pile->numread, (unsigned int *)(pile->objread), tdata, oidType)) != 0) { + printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + oidType = 'M'; + if((retval = copyToCache(pile->nummod, pile->oidmod, tdata, oidType)) != 0) { + printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + } + pile = pile->next; + } + return 0; +} + +int copyToCache(int numoid, unsigned int *oidarray, thread_data_array_t *tdata, char oidType) { + int i; + for (i = 0; i < numoid; i++) { + unsigned int oid; + if(oidType == 'R') { + char * objread = (char *) oidarray; + oid = *((unsigned int *)(objread+(sizeof(unsigned int)+ + sizeof(unsigned short))*i)); + } else { + oid = oidarray[i]; + } + pthread_mutex_lock(&prefetchcache_mutex); + objheader_t *header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid); + //copy into prefetch cache + int size; + GETSIZE(size, header); + objheader_t * newAddr; + if((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) { + printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__, + __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + return -1; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(newAddr, header, size+sizeof(objheader_t)); + //Increment version for every modified object + if(oidType == 'M') { + newAddr->version += 1; + } + //make an entry in prefetch lookup hashtable + void *oldptr; + if((oldptr = prehashSearch(oid)) != NULL) { + prehashRemove(oid); + prehashInsert(oid, newAddr); + } else { + prehashInsert(oid, newAddr); + } + } //end of for + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h index e33e3766..a335c0ac 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h @@ -1,6 +1,10 @@ #ifndef _ADDPREFETCHENHANCE_H_ #define _ADDPREFETCHENHANCE_H_ +#include "dstm.h" +#include "mlookup.h" +#include "gCollect.h" + typedef struct prefetchCountStats { int retrycount; /* keeps track of when to retry and check if we can turn on this prefetch site */ int uselesscount; /* keeps track of how long was the prefetching at site useles */ @@ -13,5 +17,8 @@ int getRetryCount(int siteid); int getUselessCount(int siteid); char getOperationMode(int); void handleDynPrefetching(int, int, int); +void cleanPCache(thread_data_array_t *tdata); +int updatePrefetchCache(thread_data_array_t *); +int copyToCache(int , unsigned int *, thread_data_array_t *, char ); #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index e2ad79cf..11927ab1 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -58,8 +58,8 @@ #define LISTEN_PORT 2156 #define UDP_PORT 2158 //Prefetch tuning paramters -#define RETRYINTERVAL 100 //N -#define SHUTDOWNINTERVAL 1 //M +#define RETRYINTERVAL 7 //N +#define SHUTDOWNINTERVAL 4 //M #include #include @@ -81,9 +81,8 @@ #include #include #include "sockpool.h" -#include "prelookup.h" #include -#include "addPrefetchEnhance.h" +#include "plookup.h" //bit designations for status field of objheader #define DIRTY 0x01 @@ -209,7 +208,7 @@ typedef struct trans_commit_data{ typedef struct thread_data_array { int thread_id; int mid; - trans_req_data_t *buffer; /* Holds trans request information sent to participants */ + trans_req_data_t *buffer; /* Holds trans request information sent to a participant, based on threadid */ thread_response_t *recvmsg; /* Shared datastructure to keep track of the participants response to a trans request */ pthread_cond_t *threshold; /* Condition var to waking up a thread */ pthread_mutex_t *lock; /* Lock for counting participants response */ @@ -217,6 +216,7 @@ typedef struct thread_data_array { char *replyctrl; /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */ char *replyretry; /* Shared variable that keep track if coordinator needs retry */ transrecord_t *rec; /* To send modified objects */ + plistnode_t *pilehead; /* Shared variable, ptr to the head of the machine piles for the transaction rec */ } thread_data_array_t; @@ -296,9 +296,7 @@ void sendPrefetchReqnew(prefetchpile_t*, int); int getPrefetchResponse(int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); -int updatePrefetchCache(thread_data_array_t *, int, char); - - +plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs); /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid); diff --git a/Robust/src/Runtime/DSTM/interface/gCollect.c b/Robust/src/Runtime/DSTM/interface/gCollect.c index 3d1371b3..fed70e2d 100644 --- a/Robust/src/Runtime/DSTM/interface/gCollect.c +++ b/Robust/src/Runtime/DSTM/interface/gCollect.c @@ -1,4 +1,5 @@ #include "gCollect.h" +#include "prelookup.h" extern objstr_t *prefetchcache; //Global Prefetch cache extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index aafebf0a..683f11d4 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -40,73 +40,6 @@ plistnode_t *pCreate(int objects) { return pile; } -/* This function inserts necessary information into - * a machine pile data structure */ -plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) { - plistnode_t *ptr, *tmp; - int found = 0, offset = 0; - - tmp = pile; - //Add oid into a machine that is already present in the pile linked list structure - while(tmp != NULL) { - if (tmp->mid == mid) { - int tmpsize; - - if (STATUS(headeraddr) & NEW) { - tmp->oidcreated[tmp->numcreated] = OID(headeraddr); - tmp->numcreated++; - GETSIZE(tmpsize, headeraddr); - tmp->sum_bytes += sizeof(objheader_t) + tmpsize; - }else if (STATUS(headeraddr) & DIRTY) { - tmp->oidmod[tmp->nummod] = OID(headeraddr); - tmp->nummod++; - GETSIZE(tmpsize, headeraddr); - tmp->sum_bytes += sizeof(objheader_t) + tmpsize; - } else { - offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; - *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr); - offset += sizeof(unsigned int); - *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version; - tmp->numread ++; - } - found = 1; - break; - } - tmp = tmp->next; - } - //Add oid for any new machine - if (!found) { - int tmpsize; - if((ptr = pCreate(num_objs)) == NULL) { - return NULL; - } - ptr->mid = mid; - if (STATUS(headeraddr) & NEW) { - ptr->oidcreated[ptr->numcreated] = OID(headeraddr); - ptr->numcreated ++; - GETSIZE(tmpsize, headeraddr); - ptr->sum_bytes += sizeof(objheader_t) + tmpsize; - } else if (STATUS(headeraddr) & DIRTY) { - ptr->oidmod[ptr->nummod] = OID(headeraddr); - ptr->nummod ++; - GETSIZE(tmpsize, headeraddr); - ptr->sum_bytes += sizeof(objheader_t) + tmpsize; - } else { - *((unsigned int *)ptr->objread)=OID(headeraddr); - offset = sizeof(unsigned int); - *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version; - ptr->numread ++; - } - ptr->next = pile; - pile = ptr; - } - - /* Clear Flags */ - STATUS(headeraddr) =0; - - return pile; -} - //Count the number of machine piles int pCount(plistnode_t *pile) { plistnode_t *tmp; diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h index fdcfc535..4d15b4a7 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.h +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -3,7 +3,6 @@ #include #include -#include "dstm.h" /* This structure is created using a transaction record. * It is filled out with pile information necessary for @@ -21,7 +20,6 @@ typedef struct plistnode { } plistnode_t; plistnode_t *pCreate(int); -plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs); int pCount(plistnode_t *pile); int pListMid(plistnode_t *pile, unsigned int *list); void pDelete(plistnode_t *pile); diff --git a/Robust/src/Runtime/DSTM/interface/signal.c b/Robust/src/Runtime/DSTM/interface/signal.c index 062e8a0f..dae52b8d 100644 --- a/Robust/src/Runtime/DSTM/interface/signal.c +++ b/Robust/src/Runtime/DSTM/interface/signal.c @@ -1,4 +1,5 @@ #include "dstm.h" +#include "addPrefetchEnhance.h" #include extern int numTransAbort; diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.c b/Robust/src/Runtime/DSTM/interface/sockpool.c index 2cb2a241..27d2e953 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface/sockpool.c @@ -101,6 +101,8 @@ int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) { UnLock(&sockhash->mylock); if((sd = createNewSocket(mid)) != -1) { socknode_t *inusenode = calloc(1, sizeof(socknode_t)); + inusenode->sd = sd; + inusenode->mid = mid; insToListWithLock(sockhash, inusenode); return sd; } else { @@ -159,6 +161,33 @@ int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) { } } +/*socket pool with multiple TR threads asking to connect to same machine */ +int getSock2WithLock(sockPoolHashTable_t *sockhash, unsigned int mid) { + socknode_t **ptr; + int key = mid%(sockhash->size); + int sd; + + Lock(&sockhash->mylock); + ptr=&(sockhash->table[key]); + while(*ptr!=NULL) { + if (mid == (*ptr)->mid) { + UnLock(&sockhash->mylock); + return (*ptr)->sd; + } + ptr=&((*ptr)->next); + } + UnLock(&sockhash->mylock); + if((sd = createNewSocket(mid)) != -1) { + *ptr=calloc(1, sizeof(socknode_t)); + (*ptr)->mid=mid; + (*ptr)->sd=sd; + //insToListWithLock(sockhash, *ptr); + return sd; + } else { + return -1; + } +} + void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) { Lock(&sockhash->mylock); inusenode->next = sockhash->inuse; diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.h b/Robust/src/Runtime/DSTM/interface/sockpool.h index be392e47..212c426a 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.h +++ b/Robust/src/Runtime/DSTM/interface/sockpool.h @@ -23,6 +23,7 @@ typedef struct sockPoolHashTable { sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int); int getSock(sockPoolHashTable_t *, unsigned int); int getSock2(sockPoolHashTable_t *, unsigned int); +int getSock2WithLock(sockPoolHashTable_t *h, unsigned int); int getSockWithLock(sockPoolHashTable_t *, unsigned int); void freeSock(sockPoolHashTable_t *, unsigned int, int); void freeSockWithLock(sockPoolHashTable_t *, unsigned int, int); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d706678b..087c208a 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -9,6 +9,7 @@ #include "threadnotify.h" #include "queue.h" #include "addUdpEnhance.h" +#include "addPrefetchEnhance.h" #include "gCollect.h" #ifdef COMPILER #include "thread.h" @@ -40,6 +41,7 @@ unsigned int oidMax; sockPoolHashTable_t *transReadSockPool; sockPoolHashTable_t *transPrefetchSockPool; +sockPoolHashTable_t *transRequestSockPool; pthread_mutex_t notifymutex; pthread_mutex_t atomicObjLock; @@ -180,6 +182,7 @@ int dstmStartup(const char * option) { //Initialize socket pool transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1); transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1); + transRequestSockPool = createSockPool(transRequestSockPool, 2*numHostsInSystem+1); dstmInit(); transInit(); @@ -483,7 +486,6 @@ int transCommit(transrecord_t *record) { pthread_mutex_t tlshrd; thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t)); - ltdata = calloc(1, sizeof(local_thread_data_array_t)); thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ @@ -520,6 +522,7 @@ int transCommit(transrecord_t *record) { thread_data_array[threadnum].replyctrl = &treplyctrl; thread_data_array[threadnum].replyretry = &treplyretry; thread_data_array[threadnum].rec = record; + thread_data_array[threadnum].pilehead = pile_ptr; /* If local do not create any extra connection */ if(pile->mid != myIpAddr) { /* Not local */ do { @@ -639,25 +642,11 @@ void *transRequest(void *threadarg) { objheader_t *headeraddr; char control, recvcontrol; char machineip[16], retval; - + tdata = (thread_data_array_t *) threadarg; - - /* Send Trans Request */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) <= 0) { - printf("transRequest():error %d\n", errno); - perror("transRequest() socket error"); - pthread_exit(NULL); - } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - serv_addr.sin_addr.s_addr = htonl(tdata->mid); - - /* Open Connection */ - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - printf("transRequest():error %d, sd= %d\n", errno, sd); - perror("transRequest() connect"); - close(sd); + + if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) { + printf("transRequest(): socket create error\n"); pthread_exit(NULL); } @@ -695,7 +684,6 @@ void *transRequest(void *threadarg) { pthread_mutex_lock(&prefetchcache_mutex); if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) { printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); - close(sd); pthread_exit(NULL); } pthread_mutex_unlock(&prefetchcache_mutex); @@ -744,7 +732,6 @@ void *transRequest(void *threadarg) { * to all participants in their respective socket */ if (sendResponse(tdata, sd) == 0) { printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); - close(sd); pthread_exit(NULL); } @@ -757,9 +744,6 @@ void *transRequest(void *threadarg) { } else { //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); } - - /* Close connection */ - close(sd); pthread_exit(NULL); } @@ -796,30 +780,17 @@ void decideResponse(thread_data_array_t *tdata) { *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 0; /* clear objects from prefetch cache */ - for (i = 0; i < tdata->buffer->f.numread; i++) { - prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i))); - } - for (i = 0; i < tdata->buffer->f.nummod; i++) { - prehashRemove(tdata->buffer->oidmod[i]); - } + cleanPCache(tdata); } else if(transagree == tdata->buffer->f.mcount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; *(tdata->replyretry) = 0; - /* update prefetch cache */ - /* For objects read */ - char oidType; int retval; - oidType = 'R'; - if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) { - printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); - return; - } - oidType = 'M'; - if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) { + if((retval = updatePrefetchCache(tdata)) != 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); return; } + /* Invalidate objects in other machine cache */ if(tdata->buffer->f.nummod > 0) { if((retval = invalidateObj(tdata)) != 0) { @@ -835,47 +806,6 @@ void decideResponse(thread_data_array_t *tdata) { return; } -/* This function updates the prefetch cache when commiting objects - * based on the type of oid i.e. if oid is read or oid is modified - * Return -1 on error else returns 0 - */ -int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) { - int i; - for (i = 0; i < numoid; i++) { - //find address object - objheader_t *header, *newAddr; - int size; - unsigned int oid; - if(oidType == 'R') { - oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); - } else { - oid = tdata->buffer->oidmod[i]; - } - pthread_mutex_lock(&prefetchcache_mutex); - header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid); - header->version += 1; - //copy object into prefetch cache - GETSIZE(size, header); - if ((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) { - printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); - pthread_mutex_unlock(&prefetchcache_mutex); - return -1; - } - pthread_mutex_unlock(&prefetchcache_mutex); - memcpy(newAddr, header, (size + sizeof(objheader_t))); - //make an entry in prefetch hash table - void *oldptr; - if((oldptr = prehashSearch(oid)) != NULL) { - prehashRemove(oid); - prehashInsert(oid, newAddr); - } else { - prehashInsert(oid, newAddr); - } - } - return 0; -} - - /* This function sends the final response to remote machines per * thread in their respective socket id It returns a char that is only * needed to check the correctness of execution of this function @@ -1741,3 +1671,70 @@ void transAbort(transrecord_t *trans) { chashDelete(trans->lookupTable); free(trans); } + +/* This function inserts necessary information into + * a machine pile data structure */ +plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) { + plistnode_t *ptr, *tmp; + int found = 0, offset = 0; + + tmp = pile; + //Add oid into a machine that is already present in the pile linked list structure + while(tmp != NULL) { + if (tmp->mid == mid) { + int tmpsize; + + if (STATUS(headeraddr) & NEW) { + tmp->oidcreated[tmp->numcreated] = OID(headeraddr); + tmp->numcreated++; + GETSIZE(tmpsize, headeraddr); + tmp->sum_bytes += sizeof(objheader_t) + tmpsize; + }else if (STATUS(headeraddr) & DIRTY) { + tmp->oidmod[tmp->nummod] = OID(headeraddr); + tmp->nummod++; + GETSIZE(tmpsize, headeraddr); + tmp->sum_bytes += sizeof(objheader_t) + tmpsize; + } else { + offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; + *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr); + offset += sizeof(unsigned int); + *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version; + tmp->numread ++; + } + found = 1; + break; + } + tmp = tmp->next; + } + //Add oid for any new machine + if (!found) { + int tmpsize; + if((ptr = pCreate(num_objs)) == NULL) { + return NULL; + } + ptr->mid = mid; + if (STATUS(headeraddr) & NEW) { + ptr->oidcreated[ptr->numcreated] = OID(headeraddr); + ptr->numcreated ++; + GETSIZE(tmpsize, headeraddr); + ptr->sum_bytes += sizeof(objheader_t) + tmpsize; + } else if (STATUS(headeraddr) & DIRTY) { + ptr->oidmod[ptr->nummod] = OID(headeraddr); + ptr->nummod ++; + GETSIZE(tmpsize, headeraddr); + ptr->sum_bytes += sizeof(objheader_t) + tmpsize; + } else { + *((unsigned int *)ptr->objread)=OID(headeraddr); + offset = sizeof(unsigned int); + *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version; + ptr->numread ++; + } + ptr->next = pile; + pile = ptr; + } + + /* Clear Flags */ + STATUS(headeraddr) =0; + + return pile; +}