From fa4b892a37ce3e41bf4395f10bbe39495aaf0889 Mon Sep 17 00:00:00 2001 From: adash Date: Wed, 26 Mar 2008 18:18:02 +0000 Subject: [PATCH] completed and tested socket reuse code for trans read and trans prefetch() messages --- .../src/Runtime/DSTM/interface/dstmserver.c | 61 ++++++++-------- Robust/src/Runtime/DSTM/interface/trans.c | 69 ++++++++++++------- 2 files changed, 73 insertions(+), 57 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 672a955c..08baee50 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -150,27 +150,30 @@ void *dstmAccept(void *acceptfd) switch(control) { case READ_REQUEST: - /* Read oid requested and search if available */ - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); - if((srcObj = mhashSearch(oid)) == NULL) { - printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); - pthread_exit(NULL); - } - h = (objheader_t *) srcObj; - GETSIZE(size, h); - size += sizeof(objheader_t); - sockid = (int) acceptfd; - - if (h == NULL) { - ctrl = OBJECT_NOT_FOUND; - send_data(sockid, &ctrl, sizeof(char)); - } else { - /* Type */ - char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; - *((int *)&msg[1])=size; - send_data(sockid, &msg, sizeof(msg)); - send_data(sockid, h, size); - } + do { + /* Read oid requested and search if available */ + recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + if((srcObj = mhashSearch(oid)) == NULL) { + printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); + break; + } + h = (objheader_t *) srcObj; + GETSIZE(size, h); + size += sizeof(objheader_t); + sockid = (int) acceptfd; + + if (h == NULL) { + ctrl = OBJECT_NOT_FOUND; + send_data(sockid, &ctrl, sizeof(char)); + } else { + /* Type */ + char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; + *((int *)&msg[1])=size; + send_data(sockid, &msg, sizeof(msg)); + send_data(sockid, h, size); + } + recv_data((int)acceptfd, &control, sizeof(char)); + } while(control == READ_REQUEST); break; case READ_MULT_REQUEST: @@ -199,12 +202,13 @@ void *dstmAccept(void *acceptfd) } while (control == TRANS_PREFETCH); break; case TRANS_PREFETCH_RESPONSE: - //do { + do { if((val = getPrefetchResponse((int) acceptfd)) != 0) { printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); + break; } - //} while (control == TRANS_PREFETCH_RESPONSE); + recv_data((int)acceptfd, &control, sizeof(char)); + } while (control == TRANS_PREFETCH_RESPONSE); break; case START_REMOTE_THREAD: recv_data((int)acceptfd, &oid, sizeof(unsigned int)); @@ -643,10 +647,10 @@ int prefetchReq(int acceptfd) { } memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size); free(recvbuffer); -#if 0 pthread_mutex_lock(&sockLock); sockIdFound = 0; pthread_mutex_unlock(&sockLock); + /* If socket is already established then send data reusing socket */ for(i = 0; i < NUM_MACHINES; i++) { if(sockArray[i].mid == mid) { sd = sockArray[i].sockid; @@ -659,8 +663,6 @@ int prefetchReq(int acceptfd) { if(sockIdFound == 0) { if(sockCount < NUM_MACHINES) { - -#endif /* Create socket to send information */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ perror("prefetchReq():socket()"); @@ -672,13 +674,12 @@ int prefetchReq(int acceptfd) { remoteAddr.sin_addr.s_addr = htonl(mid); if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + perror("connect"); printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); close(sd); return -1; } - -#if 0 sockArray[sockCount].mid = mid; sockArray[sockCount].sockid = sd; pthread_mutex_lock(&sockLock); @@ -690,7 +691,6 @@ int prefetchReq(int acceptfd) { return -1; } } -#endif /*Process each oid */ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ @@ -812,7 +812,6 @@ int prefetchReq(int acceptfd) { } free(offsetarry); } - close(sd); } } while (length != -1); return 0; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index da01610c..8524cae9 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -48,13 +48,16 @@ unsigned int oidsPerBlock; unsigned int oidMin; unsigned int oidMax; -/************************************************************ +/************************************************************************ * Global variables to map socketid and remote mid to - * reuse sockets - ***********************************************************/ + * reuse sockets for sending prefetches and making remote read requests + ************************************************************************/ midSocketInfo_t midSocketArray[NUM_MACHINES]; int sockCount; //number of connections with all remote machines(one socket per mc) int sockIdFound; //track if socket file descriptor is already established +midSocketInfo_t sockArrayRemoteRead[NUM_MACHINES]; +int sockCountRemoteRead; //number of connections with all remote machines(one socket per mc) +int sockIdFoundRemoteRead; //track if socket file descriptor is already established void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); @@ -70,7 +73,6 @@ void send_data(int fd , void *buf, int buflen) { numbytes = send(fd, buffer, size, MSG_NOSIGNAL); if (numbytes == -1) { perror("send"); - printf("error: at %s, %d\n", __FILE__, __LINE__); exit(-1); } buffer += numbytes; @@ -86,7 +88,6 @@ void recv_data(int fd , void *buf, int buflen) { numbytes = recv(fd, buffer, size, 0); if (numbytes == -1) { perror("recv"); - printf("error: at %s, %d\n", __FILE__, __LINE__); exit(-1); } buffer += numbytes; @@ -653,7 +654,6 @@ int transCommit(transrecord_t *record) { printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); exit(-1); } - return 0; } @@ -861,24 +861,43 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { objheader_t *h; void *objcopy; - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket\n"); - return NULL; - } - - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - midtoIP(mnum,machineip); - machineip[15] = '\0'; - serv_addr.sin_addr.s_addr = inet_addr(machineip); - - // Open connection - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("getRemoteObj() Error in connect\n"); - return NULL; - } - + int i; + for(i = 0; i < NUM_MACHINES; i++) { + if(sockArrayRemoteRead[i].mid == mnum) { + sd = sockArrayRemoteRead[i].sockid; + sockIdFoundRemoteRead = 1; + break; + } + } + + if(sockIdFoundRemoteRead == 0) { + if(sockCountRemoteRead < NUM_MACHINES) { + /* Create socket */ + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket\n"); + return NULL; + } + + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + serv_addr.sin_addr.s_addr = htonl(mnum); + // Open connection + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + perror("getRemoteObj() Error in connect\n"); + close(sd); + return NULL; + } + sockArrayRemoteRead[sockCountRemoteRead].mid = mnum; + sockArrayRemoteRead[sockCountRemoteRead].sockid = sd; + sockCountRemoteRead++; + } else { + //TODO Fix for connecting to more than 2 machines && close socket + printf("%s(): Error: Currently works for two remote machines\n", __func__); + return NULL; + } + } + char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; @@ -904,8 +923,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return NULL; } - //Close connection - close(sd); return objcopy; } -- 2.34.1