X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;ds=sidebyside;f=Robust%2Fsrc%2FRuntime%2FDSTM%2Finterface%2Fdstmserver.c;h=e1ec8c926cd2620ab51a761e2c2574061bd6907c;hb=6e2ca7663bc8fa22199e01c92b46d892187fccaa;hp=64b1c8bc0d45f06226d2c0a6dd758662da561850;hpb=b9391edbff0c56c0f484e7da23a9ce3c22d7e144;p=IRC.git diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 64b1c8bc..e1ec8c92 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -1,14 +1,7 @@ /* Coordinator => Machine that initiates the transaction request call for commiting a transaction * Participant => Machines that host the objects involved in a transaction commit */ -#include -#include -#include -#include -#include -#include -#include -#include +#include #include "dstm.h" #include "mlookup.h" #include "llookup.h" @@ -17,24 +10,19 @@ #include "thread.h" #endif - -#define LISTEN_PORT 2156 #define BACKLOG 10 //max pending connections #define RECEIVE_BUFFER_SIZE 2048 extern int classsize[]; +extern int numHostsInSystem; +extern pthread_mutex_t notifymutex; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; +pthread_mutex_t lockObjHeader; pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */ -/********************************************************** - * Global variables to map socketid and remote mid - * to resuse sockets - **************************************************/ -midSocketInfo_t sockArray[NUM_MACHINES]; -int sockCount; //number of connections with all remote machines(one socket per mc) -int sockIdFound; //track if socket file descriptor is already established -pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent + +sockPoolHashTable_t *transPResponseSocketPool; /* This function initializes the main objects store and creates the * global machine and location lookup table */ @@ -46,6 +34,7 @@ int dstmInit(void) pthread_mutexattr_init(&mainobjstore_mutex_attr); pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); + pthread_mutex_init(&lockObjHeader,NULL); if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -54,75 +43,77 @@ int dstmInit(void) if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) return 1; //failure - - //Initialize mid to socketid mapping array - int t; - sockCount = 0; - for(t = 0; t < NUM_MACHINES; t++) { - sockArray[t].mid = 0; - sockArray[t].sockid = 0; - } + + //Initialize socket pool + if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) { + printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); + return 0; + } return 0; } -/* This function starts the thread to listen on a socket - * for tranaction calls */ -void *dstmListen() -{ - int listenfd, acceptfd; - struct sockaddr_in my_addr; - struct sockaddr_in client_addr; - socklen_t addrlength = sizeof(struct sockaddr); - pthread_t thread_dstm_accept; - int i; - int setsockflag=1; - - listenfd = socket(AF_INET, SOCK_STREAM, 0); - if (listenfd == -1) - { - perror("socket"); - exit(1); - } - if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } +int startlistening() { + int listenfd; + struct sockaddr_in my_addr; + socklen_t addrlength = sizeof(struct sockaddr); + int setsockflag=1; + + listenfd = socket(AF_INET, SOCK_STREAM, 0); + if (listenfd == -1) { + perror("socket"); + exit(1); + } + + if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #ifdef MAC - if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } + if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #endif + + my_addr.sin_family = AF_INET; + my_addr.sin_port = htons(LISTEN_PORT); + my_addr.sin_addr.s_addr = INADDR_ANY; + memset(&(my_addr.sin_zero), '\0', 8); + + if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) { + perror("bind"); + exit(1); + } + + if (listen(listenfd, BACKLOG) == -1) { + perror("listen"); + exit(1); + } + return listenfd; +} - my_addr.sin_family = AF_INET; - my_addr.sin_port = htons(LISTEN_PORT); - my_addr.sin_addr.s_addr = INADDR_ANY; - memset(&(my_addr.sin_zero), '\0', 8); - - if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) - { - perror("bind"); - exit(1); - } - - if (listen(listenfd, BACKLOG) == -1) - { - perror("listen"); - exit(1); - } - - printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); - while(1) - { - int retval; - acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); - do { - retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); - } while(retval!=0); - pthread_detach(thread_dstm_accept); - } +/* This function starts the thread to listen on a socket + * for tranaction calls */ +void *dstmListen(void *lfd) { + int listenfd=(int)lfd; + int acceptfd; + struct sockaddr_in client_addr; + socklen_t addrlength = sizeof(struct sockaddr); + pthread_t thread_dstm_accept; + + printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); + while(1) { + int retval; + int flag=1; + acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); + setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); + do { + retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); + } while(retval!=0); + pthread_detach(thread_dstm_accept); + } } /* This function accepts a new connection request, decodes the control message in the connection * and accordingly calls other functions to process new requests */ @@ -138,17 +129,15 @@ void *dstmAccept(void *acceptfd) { unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; - transinfo.objlocked = NULL; - transinfo.objnotfound = NULL; - transinfo.modptr = NULL; - transinfo.numlocked = 0; - transinfo.numnotfound = 0; - /* Receive control messages from other machines */ - while(true) { + while(1) { int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); - if (ret==-1) + if (ret==0) break; + if (ret==-1) { + printf("DEBUG -> RECV Error!.. retrying\n"); + break; + } switch(control) { case READ_REQUEST: /* Read oid requested and search if available */ @@ -185,6 +174,11 @@ void *dstmAccept(void *acceptfd) { case TRANS_REQUEST: /* Read transaction request */ + transinfo.objlocked = NULL; + transinfo.objnotfound = NULL; + transinfo.modptr = NULL; + transinfo.numlocked = 0; + transinfo.numnotfound = 0; if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); @@ -373,7 +367,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address return 1; } - STATUS(((objheader_t *)header)) &= ~(LOCK); + UnLock(STATUSPTR(header)); } /* Send ack to Coordinator */ @@ -449,66 +443,66 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne int tmpsize; headptr = (objheader_t *) ptr; oid = OID(headptr); - version = headptr->version; - GETSIZE(tmpsize, headptr); - ptr += sizeof(objheader_t) + tmpsize; - } - - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ - - if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ - /* Save the oids not found and number of oids not found for later use */ - oidnotfound[objnotfound] = oid; - objnotfound++; - } else { /* If Obj found in machine (i.e. has not moved) */ - /* Check if Obj is locked by any previous transaction */ - if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) { - if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ - v_matchlock++; - } else {/* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - control = TRANS_DISAGREE; - if (objlocked > 0) { - for(j = 0; j < objlocked; j++) { - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - STATUS(headptr) &= ~(LOCK); - } - free(oidlocked); - } - send_data(acceptfd, &control, sizeof(char)); - return control; - } - } else {/* If Obj is not locked then lock object */ - STATUS(((objheader_t *)mobj)) |= LOCK; - /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[objlocked] = OID(((objheader_t *)mobj)); - objlocked++; - if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ - v_matchnolock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - control = TRANS_DISAGREE; - if (objlocked > 0) { - for(j = 0; j < objlocked; j++) { - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - STATUS(headptr) &= ~(LOCK); - } - free(oidlocked); - } - - /* Send TRANS_DISAGREE to Coordinator */ - send_data(acceptfd, &control, sizeof(char)); - return control; - } - } - } + version = headptr->version; + GETSIZE(tmpsize, headptr); + ptr += sizeof(objheader_t) + tmpsize; + } + + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[objnotfound] = oid; + objnotfound++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if (test_and_set(STATUSPTR(mobj))) { + //don't have lock + if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ + v_matchlock++; + } else {/* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + control = TRANS_DISAGREE; + if (objlocked > 0) { + for(j = 0; j < objlocked; j++) { + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + UnLock(STATUSPTR(headptr)); + } + free(oidlocked); + } + send_data(acceptfd, &control, sizeof(char)); + return control; + } + } else {/* If Obj is not locked then lock object */ + /* Save all object oids that are locked on this machine during this transaction request call */ + oidlocked[objlocked] = OID(((objheader_t *)mobj)); + objlocked++; + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + v_matchnolock++; + } else { /* If versions don't match ...HARD ABORT */ + v_nomatch++; + control = TRANS_DISAGREE; + if (objlocked > 0) { + for(j = 0; j < objlocked; j++) { + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + UnLock(STATUSPTR(headptr)); + } + free(oidlocked); + } + + /* Send TRANS_DISAGREE to Coordinator */ + send_data(acceptfd, &control, sizeof(char)); + return control; + } + } + } } /* Decide what control message to send to Coordinator */ @@ -559,7 +553,7 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int transinfo->modptr = modptr; transinfo->numlocked = *(objlocked); transinfo->numnotfound = *(objnotfound); - + return control; } @@ -567,47 +561,45 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * addresses in lookup table and also changes version number * Sends an ACK back to Coordinator */ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) { - objheader_t *header; - objheader_t *newheader; - int i = 0, offset = 0; - char control; - int tmpsize; - - /* Process each modified object saved in the mainobject store */ - for(i = 0; i < nummod; i++) { - if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { - printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - GETSIZE(tmpsize,header); - pthread_mutex_lock(&mainobjstore_mutex); - memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); - header->version += 1; - /* If threads are waiting on this object to be updated, notify them */ - if(header->notifylist != NULL) { - notifyAll(&header->notifylist, OID(header), header->version); - } - pthread_mutex_unlock(&mainobjstore_mutex); - offset += sizeof(objheader_t) + tmpsize; - } - - if (nummod > 0) - free(modptr); - - /* Unlock locked objects */ - for(i = 0; i < numlocked; i++) { - if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { - printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - STATUS(header) &= ~(LOCK); - } - //TODO Update location lookup table - - /* Send ack to coordinator */ - control = TRANS_SUCESSFUL; - send_data((int)acceptfd, &control, sizeof(char)); - return 0; + objheader_t *header; + objheader_t *newheader; + int i = 0, offset = 0; + char control; + int tmpsize; + + /* Process each modified object saved in the mainobject store */ + for(i = 0; i < nummod; i++) { + if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize,header); + memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); + header->version += 1; + /* If threads are waiting on this object to be updated, notify them */ + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } + offset += sizeof(objheader_t) + tmpsize; + } + + if (nummod > 0) + free(modptr); + + /* Unlock locked objects */ + for(i = 0; i < numlocked; i++) { + if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + UnLock(STATUSPTR(header)); + } + //TODO Update location lookup table + + /* Send ack to coordinator */ + control = TRANS_SUCESSFUL; + send_data((int)acceptfd, &control, sizeof(char)); + return 0; } /* This function recevies the oid and offset tuples from the Coordinator's prefetch call. @@ -616,281 +608,184 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock * then use offset values to prefetch references to other objects */ int prefetchReq(int acceptfd) { - int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0; - int length, sd = -1; - char *recvbuffer, *sendbuffer, control; - unsigned int oid, mid; - short *offsetarry; - objheader_t *header; - struct sockaddr_in remoteAddr; - - do { - recv_data((int)acceptfd, &length, sizeof(int)); - if(length != -1) { - size = length - sizeof(int); - if((recvbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - return -1; - } - recv_data((int)acceptfd, recvbuffer, size); - oid = *((unsigned int *) recvbuffer); - mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int))); - size = size - (2 * sizeof(unsigned int)); - numoffset = size / sizeof(short); - if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(recvbuffer); - return -1; - } - memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size); - free(recvbuffer); - pthread_mutex_lock(&sockLock); - sockIdFound = 0; - pthread_mutex_unlock(&sockLock); - /* If socket is already established then send data reusing socket */ - for(i = 0; i < NUM_MACHINES; i++) { - if(sockArray[i].mid == mid) { - sd = sockArray[i].sockid; - pthread_mutex_lock(&sockLock); - sockIdFound = 1; - pthread_mutex_unlock(&sockLock); - break; - } - } - - if(sockIdFound == 0) { - if(sockCount < NUM_MACHINES) { - /* Create socket to send information */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ - perror("prefetchReq():socket()"); - return -1; - } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - perror("connect"); - printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - close(sd); - return -1; - } - sockArray[sockCount].mid = mid; - sockArray[sockCount].sockid = sd; - pthread_mutex_lock(&sockLock); - sockCount++; - pthread_mutex_unlock(&sockLock); - } else { - //TODO Fix for connecting to more than 2 machines && close socket - printf("%s(): Error: Currently works for only 2 machines\n", __func__); - return -1; - } - } - - /*Process each oid */ - if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ - /* Save the oids not found in buffer for later use */ - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(offsetarry); - close(sd); - return -1; - } - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } - } else { /* Object Found */ - int incr = 0; - GETSIZE(objsize, header); - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(offsetarry); - close(sd); - return -1; - } - *((int *) (sendbuffer + incr)) = size; - incr += sizeof(int); - *((char *)(sendbuffer + incr)) = OBJECT_FOUND; - incr += sizeof(char); - *((unsigned int *)(sendbuffer+incr)) = oid; - incr += sizeof(unsigned int); - memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } - - /* Calculate the oid corresponding to the offset value */ - for(i = 0 ; i< numoffset ; i++) { - /* Check for arrays */ - if(TYPE(header) > NUMCLASSES) { - isArray = 1; - } - if(isArray == 1) { - int elementsize = classsize[TYPE(header)]; - struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); - unsigned short length = ao->___length___; - /* Check if array out of bounds */ - if(offsetarry[i]< 0 || offsetarry[i] >= length) { - break; - } - oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i]))); - } else { - oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i])); - } - - if((header = mhashSearch(oid)) == NULL) { - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(offsetarry); - close(sd); - return -1; - } - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); - printf("Error: %s() in sending prefetch response at %s, %d\n", - __FILE__, __LINE__); - close(sd); - return -1; - } - break; - } else {/* Obj Found */ - int incr = 0; - GETSIZE(objsize, header); - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); - free(offsetarry); - close(sd); - return -1; - } - *((int *) (sendbuffer + incr)) = size; - incr += sizeof(int); - *((char *)(sendbuffer + incr)) = OBJECT_FOUND; - incr += sizeof(char); - *((unsigned int *)(sendbuffer+incr)) = oid; - incr += sizeof(unsigned int); - memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } - } - isArray = 0; - } - free(offsetarry); - } - } - } while (length != -1); - return 0; + int i, size, objsize, numoffset = 0; + int length; + char *recvbuffer, control; + unsigned int oid, mid=-1; + objheader_t *header; + oidmidpair_t oidmid; + int sd = -1; + + while(1) { + recv_data((int)acceptfd, &numoffset, sizeof(int)); + if(numoffset == -1) + break; + recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int)); + oid = oidmid.oid; + if (mid != oidmid.mid) { + if (mid!=-1) { + freeSockWithLock(transPResponseSocketPool, mid, sd); + } + mid=oidmid.mid; + sd = getSockWithLock(transPResponseSocketPool, mid); + } + short offsetarry[numoffset]; + recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short)); + + /*Process each oid */ + if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found in buffer for later use */ + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + char sendbuffer[size]; + *((int *) sendbuffer) = size; + *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + control = TRANS_PREFETCH_RESPONSE; + sendPrefetchResponse(sd, &control, sendbuffer, &size); + } else { /* Object Found */ + int incr = 0; + GETSIZE(objsize, header); + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + char sendbuffer[size]; + *((int *) (sendbuffer + incr)) = size; + incr += sizeof(int); + *((char *)(sendbuffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(sendbuffer+incr)) = oid; + incr += sizeof(unsigned int); + memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + + control = TRANS_PREFETCH_RESPONSE; + sendPrefetchResponse(sd, &control, sendbuffer, &size); + + /* Calculate the oid corresponding to the offset value */ + for(i = 0 ; i< numoffset ; i++) { + /* Check for arrays */ + if(TYPE(header) > NUMCLASSES) { + int elementsize = classsize[TYPE(header)]; + struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); + unsigned short length = ao->___length___; + /* Check if array out of bounds */ + if(offsetarry[i]< 0 || offsetarry[i] >= length) { + break; + } + oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i]))); + } else { + oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i])); + } + + /* Don't continue if we hit a NULL pointer */ + if (oid==0) + break; + + if((header = mhashSearch(oid)) == NULL) { + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + char sendbuffer[size]; + *((int *) sendbuffer) = size; + *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + + control = TRANS_PREFETCH_RESPONSE; + sendPrefetchResponse(sd, &control, sendbuffer, &size); + break; + } else {/* Obj Found */ + int incr = 0; + GETSIZE(objsize, header); + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + char sendbuffer[size]; + *((int *) (sendbuffer + incr)) = size; + incr += sizeof(int); + *((char *)(sendbuffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(sendbuffer+incr)) = oid; + incr += sizeof(unsigned int); + memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + + control = TRANS_PREFETCH_RESPONSE; + sendPrefetchResponse(sd, &control, sendbuffer, &size); + } + } + } + } + //Release socket + if (mid!=-1) + freeSockWithLock(transPResponseSocketPool, mid, sd); + + return 0; } -int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { - int numbytes = 0; - +void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { send_data(sd, control, sizeof(char)); /* Send the buffer with its size */ int length = *(size); send_data(sd, sendbuffer, length); - free(sendbuffer); - return 0; } void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) { - objheader_t *header; - unsigned int oid; - unsigned short newversion; - char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)]; - int sd; - struct sockaddr_in remoteAddr; - int bytesSent; - int size; - - int i = 0; - while(i < numoid) { - oid = *(oidarry + i); - if((header = (objheader_t *) mhashSearch(oid)) == NULL) { - printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return; - } else { - /* Check to see if versions are same */ -checkversion: - if ((STATUS(header) & LOCK) != LOCK) { - //FIXME make locking atomic - STATUS(header) |= LOCK; - newversion = header->version; - if(newversion == *(versionarry + i)) { - //Add to the notify list - if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) { - printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); - return; - } - STATUS(header) &= ~(LOCK); - } else { - STATUS(header) &= ~(LOCK); - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ - perror("processReqNotify():socket()"); - return; - } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - close(sd); - return; - } else { - //Send Update notification - msg[0] = THREAD_NOTIFY_RESPONSE; - *((unsigned int *)&msg[1]) = oid; - size = sizeof(unsigned int); - *((unsigned short *)(&msg[1]+size)) = newversion; - size += sizeof(unsigned short); - *((unsigned int *)(&msg[1]+size)) = threadid; - size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short); - send_data(sd, msg, size); - } - close(sd); - } - } else { - randomdelay(); - goto checkversion; - } - } - i++; + objheader_t *header; + unsigned int oid; + unsigned short newversion; + char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)]; + int sd; + struct sockaddr_in remoteAddr; + int bytesSent; + int size; + int i = 0; + + while(i < numoid) { + oid = *(oidarry + i); + if((header = (objheader_t *) mhashSearch(oid)) == NULL) { + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return; + } else { + /* Check to see if versions are same */ + checkversion: + if (test_and_set(STATUSPTR(header))==0) { + //have lock + newversion = header->version; + if(newversion == *(versionarry + i)) { + //Add to the notify list + if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) { + printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); + return; + } + UnLock(STATUSPTR(header)); + } else { + UnLock(STATUSPTR(header)); + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("processReqNotify():socket()"); + return; + } + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + close(sd); + return; + } else { + //Send Update notification + msg[0] = THREAD_NOTIFY_RESPONSE; + *((unsigned int *)&msg[1]) = oid; + size = sizeof(unsigned int); + *((unsigned short *)(&msg[1]+size)) = newversion; + size += sizeof(unsigned short); + *((unsigned int *)(&msg[1]+size)) = threadid; + size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short); + send_data(sd, msg, size); + } + close(sd); } - free(oidarry); - free(versionarry); + } else { + randomdelay(); + goto checkversion; + } + } + i++; + } + free(oidarry); + free(versionarry); }