From dec769b268a0063ce1e1360d698704aa62e77aef Mon Sep 17 00:00:00 2001 From: adash Date: Sat, 31 May 2008 01:28:18 +0000 Subject: [PATCH] changes for prefetch objects on a transaction abort and updating the prefetch cache when transactions commit --- Robust/src/Runtime/DSTM/interface/dht.c | 1 - Robust/src/Runtime/DSTM/interface/dstm.h | 11 +- .../src/Runtime/DSTM/interface/dstmserver.c | 224 ++++++++++-------- Robust/src/Runtime/DSTM/interface/trans.c | 102 ++++++-- 4 files changed, 218 insertions(+), 120 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c index e0ef4445..cccbcfcd 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.c +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -39,7 +39,6 @@ #include #include #include -#include #include #include "dht.h" #include "clookup.h" //this works for now, do we need anything better? diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 98f60f86..2bb46769 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -63,8 +63,11 @@ #include "queue.h" #include "mcpileq.h" #include "threadnotify.h" +#include #include #include +#include +#include #include #include #include @@ -182,9 +185,11 @@ typedef struct trans_req_data { typedef struct trans_commit_data{ unsigned int *objlocked; /* Pointer to array holding oids of objects locked inside a transaction */ unsigned int *objnotfound; /* Pointer to array holding oids of objects not found on the participant machine */ + unsigned int *objvernotmatch; /* Pointer to array holding oids whose version doesn't match on the participant machine */ void *modptr; /* Pointer to the address in the mainobject store of the participant that holds all modified objects */ int numlocked; /* no of objects locked */ int numnotfound; /* no of objects not found */ + int numvernotmatch; /* no of objects whose version doesn't match */ } trans_commit_data_t; @@ -213,7 +218,7 @@ typedef struct local_thread_data_array { //Structure to store mid and socketid information typedef struct midSocketInfo { - unsigned int mid; /* To communicate with mid use sockid in this data structure*/ + unsigned int mid; /* To communicate with mid use sockid in this data structure */ int sockid; } midSocketInfo_t; @@ -280,6 +285,10 @@ void sendPrefetchReq(prefetchpile_t*, int); int getPrefetchResponse(int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); +int updatePrefetchCache(thread_data_array_t *, int, char); + + + /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid); void threadNotify(unsigned int oid, unsigned short version, unsigned int tid); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index e1ec8c92..4a35c19f 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -143,35 +143,34 @@ void *dstmAccept(void *acceptfd) { /* Read oid requested and search if available */ recv_data((int)acceptfd, &oid, sizeof(unsigned int)); if((srcObj = mhashSearch(oid)) == NULL) { - printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); - break; + printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); + break; } h = (objheader_t *) srcObj; GETSIZE(size, h); size += sizeof(objheader_t); sockid = (int) acceptfd; - if (h == NULL) { - ctrl = OBJECT_NOT_FOUND; - send_data(sockid, &ctrl, sizeof(char)); + ctrl = OBJECT_NOT_FOUND; + send_data(sockid, &ctrl, sizeof(char)); } else { - /* Type */ - char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; - *((int *)&msg[1])=size; - send_data(sockid, &msg, sizeof(msg)); - send_data(sockid, h, size); + // Type + char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; + *((int *)&msg[1])=size; + send_data(sockid, &msg, sizeof(msg)); + send_data(sockid, h, size); } break; - + case READ_MULT_REQUEST: break; - + case MOVE_REQUEST: break; - + case MOVE_MULT_REQUEST: break; - + case TRANS_REQUEST: /* Read transaction request */ transinfo.objlocked = NULL; @@ -180,20 +179,20 @@ void *dstmAccept(void *acceptfd) { transinfo.numlocked = 0; transinfo.numnotfound = 0; if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { - printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); + printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); + pthread_exit(NULL); } break; case TRANS_PREFETCH: if((val = prefetchReq((int)acceptfd)) != 0) { - printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); - break; + printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); + break; } break; case TRANS_PREFETCH_RESPONSE: if((val = getPrefetchResponse((int) acceptfd)) != 0) { - printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); - break; + printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); + break; } break; case START_REMOTE_THREAD: @@ -201,17 +200,17 @@ void *dstmAccept(void *acceptfd) { objType = getObjType(oid); startDSMthread(oid, objType); break; - + case THREAD_NOTIFY_REQUEST: recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } - + recv_data((int)acceptfd, buffer, size); - + oidarry = calloc(numoid, sizeof(unsigned int)); memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); size = sizeof(unsigned int) * numoid; @@ -223,18 +222,18 @@ void *dstmAccept(void *acceptfd) { threadid = *((unsigned int *)(buffer+size)); processReqNotify(numoid, oidarry, versionarry, mid, threadid); free(buffer); - + break; case THREAD_NOTIFY_RESPONSE: size = sizeof(unsigned short) + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } - + recv_data((int)acceptfd, buffer, size); - + oid = *((unsigned int *)buffer); size = sizeof(unsigned int); version = *((unsigned short *)(buffer+size)); @@ -252,56 +251,56 @@ void *dstmAccept(void *acceptfd) { } } - closeconnection: +closeconnection: /* Close connection */ if (close((int)acceptfd) == -1) perror("close"); pthread_exit(NULL); } - + /* This function reads the information available in a transaction request * and makes a function call to process the request */ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { - char *ptr; - void *modptr; - unsigned int *oidmod, oid; - fixed_data_t fixed; - objheader_t *headaddr; - int sum, i, size, n, val; - - oidmod = NULL; - - /* Read fixed_data_t data structure */ - size = sizeof(fixed) - 1; - ptr = (char *)&fixed;; - fixed.control = TRANS_REQUEST; - recv_data((int)acceptfd, ptr+1, size); - - /* Read list of mids */ - int mcount = fixed.mcount; - size = mcount * sizeof(unsigned int); - unsigned int listmid[mcount]; - ptr = (char *) listmid; - recv_data((int)acceptfd, ptr, size); - - /* Read oid and version tuples for those objects that are not modified in the transaction */ - int numread = fixed.numread; - size = numread * (sizeof(unsigned int) + sizeof(unsigned short)); - char objread[size]; - if(numread != 0) { //If pile contains more than one object to be read, - // keep reading all objects - recv_data((int)acceptfd, objread, size); - } - - /* Read modified objects */ - if(fixed.nummod != 0) { - if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) { - printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__); - return 1; - } - size = fixed.sum_bytes; - recv_data((int)acceptfd, modptr, size); - } + char *ptr; + void *modptr; + unsigned int *oidmod, oid; + fixed_data_t fixed; + objheader_t *headaddr; + int sum, i, size, n, val; + + oidmod = NULL; + + /* Read fixed_data_t data structure */ + size = sizeof(fixed) - 1; + ptr = (char *)&fixed;; + fixed.control = TRANS_REQUEST; + recv_data((int)acceptfd, ptr+1, size); + + /* Read list of mids */ + int mcount = fixed.mcount; + size = mcount * sizeof(unsigned int); + unsigned int listmid[mcount]; + ptr = (char *) listmid; + recv_data((int)acceptfd, ptr, size); + + /* Read oid and version tuples for those objects that are not modified in the transaction */ + int numread = fixed.numread; + size = numread * (sizeof(unsigned int) + sizeof(unsigned short)); + char objread[size]; + if(numread != 0) { //If pile contains more than one object to be read, + // keep reading all objects + recv_data((int)acceptfd, objread, size); + } + + /* Read modified objects */ + if(fixed.nummod != 0) { + if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) { + printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__); + return 1; + } + size = fixed.sum_bytes; + recv_data((int)acceptfd, modptr, size); + } /* Create an array of oids for modified objects */ oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int)); @@ -405,7 +404,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, if (transinfo->objnotfound != NULL) { free(transinfo->objnotfound); } - return 0; } @@ -416,16 +414,17 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne unsigned short version; char control = 0, *ptr; unsigned int oid; - unsigned int *oidnotfound, *oidlocked; + unsigned int *oidnotfound, *oidlocked, *oidvernotmatch; void *mobj; objheader_t *headptr; /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); - int objnotfound = 0, objlocked = 0; + oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); + int objnotfound = 0, objlocked = 0, objvernotmatch = 0; int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - + int numBytes = 0; /* modptr points to the beginning of the object store * created at the Pariticipant. * Object store holds the modified objects involved in the transaction request */ @@ -462,20 +461,14 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne v_matchlock++; } else {/* If versions don't match ...HARD ABORT */ v_nomatch++; + oidvernotmatch[objvernotmatch] = oid; + objvernotmatch++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + numBytes += size; /* Send TRANS_DISAGREE to Coordinator */ control = TRANS_DISAGREE; - if (objlocked > 0) { - for(j = 0; j < objlocked; j++) { - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - UnLock(STATUSPTR(headptr)); - } - free(oidlocked); - } - send_data(acceptfd, &control, sizeof(char)); - return control; } } else {/* If Obj is not locked then lock object */ /* Save all object oids that are locked on this machine during this transaction request call */ @@ -485,26 +478,50 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne v_matchnolock++; } else { /* If versions don't match ...HARD ABORT */ v_nomatch++; + oidvernotmatch[objvernotmatch] = oid; + objvernotmatch++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + numBytes += size; control = TRANS_DISAGREE; - if (objlocked > 0) { - for(j = 0; j < objlocked; j++) { - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - UnLock(STATUSPTR(headptr)); - } - free(oidlocked); - } - - /* Send TRANS_DISAGREE to Coordinator */ - send_data(acceptfd, &control, sizeof(char)); - return control; } } } } + /* send TRANS_DISAGREE and objs*/ + if(v_nomatch > 0) { + char *objs = calloc(1, numBytes); + int j, offset = 0; + for(j = 0; j 0) { + for(j = 0; j < objlocked; j++) { + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + UnLock(STATUSPTR(headptr)); + } + free(oidlocked); + } + send_data(acceptfd, &control, sizeof(char)); + send_data(acceptfd, &numBytes, sizeof(int)); + send_data(acceptfd, objs, numBytes); + transinfo->objvernotmatch = oidvernotmatch; + transinfo->numvernotmatch = objvernotmatch; + free(objs); + free(transinfo->objvernotmatch); + return control; + } + /* Decide what control message to send to Coordinator */ if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked, modptr, oidnotfound, oidlocked, acceptfd)) == 0) { @@ -513,7 +530,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } return control; - } /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT * to send to Coordinator based on the votes of oids involved in the transaction */ @@ -536,7 +552,7 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int /* Send control message */ send_data(acceptfd, &control, sizeof(char)); - /* Send number of oids not found and the missing oids if objects are missing in the machine */ + /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */ if(*(objnotfound) != 0) { int msg[1]; msg[0] = *(objnotfound); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d60bea5f..d078e0c4 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -653,6 +653,34 @@ void *transRequest(void *threadarg) { /* Read control message from Participant */ recv_data(sd, &control, sizeof(char)); + /* Recv Objects if participant sends TRANS_DISAGREE */ + if(control == TRANS_DISAGREE) { + int length; + recv_data(sd, &length, sizeof(int)); + void *newAddr; + pthread_mutex_lock(&prefetchcache_mutex); + if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) { + printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + close(sd); + pthread_exit(NULL); + } + pthread_mutex_unlock(&prefetchcache_mutex); + recv_data(sd, newAddr, length); + int offset = 0; + while(length != 0) { + unsigned int oidToPrefetch; + objheader_t * header; + header = (objheader_t *) (((char *)newAddr) + offset); + oidToPrefetch = OID(header); + int size = 0; + GETSIZE(size, header); + size += sizeof(objheader_t); + //make an entry in prefetch hash table + prehashInsert(oidToPrefetch, header); + length = length - size; + offset += size; + } + } recvcontrol = control; /* Update common data structure and increment count */ tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; @@ -672,7 +700,7 @@ void *transRequest(void *threadarg) { } pthread_mutex_unlock(tdata->lock); - /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t + /* Send the final response such as TRANS_COMMIT or TRANS_ABORT * to all participants in their respective socket */ if (sendResponse(tdata, sd) == 0) { printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); @@ -728,23 +756,70 @@ void decideResponse(thread_data_array_t *tdata) { *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 0; /* clear objects from prefetch cache */ - for (i = 0; i < tdata->buffer->f.numread; i++) + for (i = 0; i < tdata->buffer->f.numread; i++) { prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i))); - for (i = 0; i < tdata->buffer->f.nummod; i++) + } + for (i = 0; i < tdata->buffer->f.nummod; i++) { prehashRemove(tdata->buffer->oidmod[i]); + } } else if(transagree == tdata->buffer->f.mcount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; *(tdata->replyretry) = 0; + /* update prefetch cache */ + /* For objects read */ + char oidType; + int retval; + oidType = 'R'; + if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + return; + } + oidType = 'M'; + if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + return; + } } else { /* Send Abort in soft abort case followed by retry commiting transaction again*/ *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 1; } - return; } +/* This function updates the prefetch cache when commiting objects + * based on the type of oid i.e. if oid is read or oid is modified + * Return -1 on error else returns 0 + */ +int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) { + int i; + for (i = 0; i < numoid; i++) { + //find address object + objheader_t *header, *newAddr; + int size; + unsigned int oid; + if(oidType == 'R') { + oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)); + } else { + oid = tdata->buffer->oidmod[i]; + } + header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid); + //copy object into prefetch cache + GETSIZE(size, header); + pthread_mutex_lock(&prefetchcache_mutex); + if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) { + printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(newAddr, header, (size + sizeof(objheader_t))); + //make an entry in prefetch hash table + prehashInsert(oid, newAddr); + } + return 0; +} + /* This function sends the final response to remote machines per * thread in their respective socket id It returns a char that is only * needed to check the correctness of execution of this function @@ -802,7 +877,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { recv_data(sd, &size, sizeof(int)); objcopy = objstrAlloc(record->cache, size); recv_data(sd, objcopy, size); - /* Insert into cache's lookup table */ chashInsert(record->lookupTable, oid, objcopy); } @@ -821,7 +895,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { void *handleLocalReq(void *threadarg) { unsigned int *oidnotfound = NULL, *oidlocked = NULL; local_thread_data_array_t *localtdata; - int objnotfound = 0, objlocked = 0; + int numoidnotfound = 0, numoidlocked = 0; int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; int numread, i; unsigned int oid; @@ -858,8 +932,8 @@ void *handleLocalReq(void *threadarg) { /* Save the oids not found and number of oids not found for later use */ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ /* Save the oids not found and number of oids not found for later use */ - oidnotfound[objnotfound] = oid; - objnotfound++; + oidnotfound[numoidnotfound] = oid; + numoidnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ if (test_and_set(STATUSPTR(mobj))) { @@ -873,8 +947,8 @@ void *handleLocalReq(void *threadarg) { } else { //we're locked /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[objlocked] = OID(((objheader_t *)mobj)); - objlocked++; + oidlocked[numoidlocked] = OID(((objheader_t *)mobj)); + numoidlocked++; if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ v_matchnolock++; } else { /* If versions don't match ...HARD ABORT */ @@ -890,7 +964,7 @@ void *handleLocalReq(void *threadarg) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; } /* Condition to send TRANS_SOFT_ABORT */ - if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { + if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; } @@ -899,8 +973,8 @@ void *handleLocalReq(void *threadarg) { localtdata->transinfo->objlocked = oidlocked; localtdata->transinfo->objnotfound = oidnotfound; localtdata->transinfo->modptr = NULL; - localtdata->transinfo->numlocked = objlocked; - localtdata->transinfo->numnotfound = objnotfound; + localtdata->transinfo->numlocked = numoidlocked; + localtdata->transinfo->numnotfound = numoidnotfound; /* Lock and update count */ //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(localtdata->tdata->lock); @@ -952,7 +1026,7 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { } UnLock(STATUSPTR(header)); } - + return 0; } -- 2.34.1