From b9391edbff0c56c0f484e7da23a9ce3c22d7e144 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Thu, 27 Mar 2008 07:51:54 +0000 Subject: [PATCH] small changes --- Robust/src/Runtime/DSTM/interface/dstm.h | 2 + .../src/Runtime/DSTM/interface/dstmserver.c | 277 +++++++++--------- Robust/src/Runtime/DSTM/interface/trans.c | 15 + 3 files changed, 154 insertions(+), 140 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 6c893cfc..f7478881 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -42,6 +42,7 @@ #define THREAD_NOTIFY_REQUEST 24 #define THREAD_NOTIFY_RESPONSE 25 #define TRANS_UNSUCESSFUL 26 +#define CLOSE_CONNECTION 27 //Max number of objects #define MAX_OBJECTS 20 @@ -200,6 +201,7 @@ typedef struct midSocketInfo { int dstmInit(void); void send_data(int fd, void *buf, int buflen); void recv_data(int fd, void *buf, int buflen); +int recv_data_errorcode(int fd, void *buf, int buflen); /* Prototypes for object header */ unsigned int getNewOID(void); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 08baee50..64b1c8bc 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -126,148 +126,145 @@ void *dstmListen() } /* This function accepts a new connection request, decodes the control message in the connection * and accordingly calls other functions to process new requests */ -void *dstmAccept(void *acceptfd) -{ - int val, retval, size, sum, sockid; - unsigned int oid; - char *buffer; - char control,ctrl; - char *ptr; - void *srcObj; - objheader_t *h; - trans_commit_data_t transinfo; - unsigned short objType, *versionarry, version; - unsigned int *oidarry, numoid, mid, threadid; - - transinfo.objlocked = NULL; - transinfo.objnotfound = NULL; - transinfo.modptr = NULL; - transinfo.numlocked = 0; - transinfo.numnotfound = 0; - - /* Receive control messages from other machines */ - recv_data((int)acceptfd, &control, sizeof(char)); - - switch(control) { - case READ_REQUEST: - do { - /* Read oid requested and search if available */ - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); - if((srcObj = mhashSearch(oid)) == NULL) { - printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); - break; - } - h = (objheader_t *) srcObj; - GETSIZE(size, h); - size += sizeof(objheader_t); - sockid = (int) acceptfd; - - if (h == NULL) { - ctrl = OBJECT_NOT_FOUND; - send_data(sockid, &ctrl, sizeof(char)); - } else { - /* Type */ - char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; - *((int *)&msg[1])=size; - send_data(sockid, &msg, sizeof(msg)); - send_data(sockid, h, size); - } - recv_data((int)acceptfd, &control, sizeof(char)); - } while(control == READ_REQUEST); - break; - - case READ_MULT_REQUEST: - break; - - case MOVE_REQUEST: - break; - - case MOVE_MULT_REQUEST: - break; - - case TRANS_REQUEST: - /* Read transaction request */ - if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { - printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); - } - break; - case TRANS_PREFETCH: - do { - if((val = prefetchReq((int)acceptfd)) != 0) { - printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); - break; - } - recv_data((int)acceptfd, &control, sizeof(char)); - } while (control == TRANS_PREFETCH); - break; - case TRANS_PREFETCH_RESPONSE: - do { - if((val = getPrefetchResponse((int) acceptfd)) != 0) { - printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); - break; - } - recv_data((int)acceptfd, &control, sizeof(char)); - } while (control == TRANS_PREFETCH_RESPONSE); - break; - case START_REMOTE_THREAD: - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); - objType = getObjType(oid); - startDSMthread(oid, objType); - break; - - case THREAD_NOTIFY_REQUEST: - recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); - size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); - if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); - } - - recv_data((int)acceptfd, buffer, size); - - oidarry = calloc(numoid, sizeof(unsigned int)); - memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); - size = sizeof(unsigned int) * numoid; - versionarry = calloc(numoid, sizeof(unsigned short)); - memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid); - size += sizeof(unsigned short) * numoid; - mid = *((unsigned int *)(buffer+size)); - size += sizeof(unsigned int); - threadid = *((unsigned int *)(buffer+size)); - processReqNotify(numoid, oidarry, versionarry, mid, threadid); - free(buffer); - - break; - - case THREAD_NOTIFY_RESPONSE: - size = sizeof(unsigned short) + 2 * sizeof(unsigned int); - if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); - } - - recv_data((int)acceptfd, buffer, size); - - oid = *((unsigned int *)buffer); - size = sizeof(unsigned int); - version = *((unsigned short *)(buffer+size)); - size += sizeof(unsigned short); - threadid = *((unsigned int *)(buffer+size)); - threadNotify(oid,version,threadid); - free(buffer); - - break; - default: - printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__); - } - - /* Close connection */ - if (close((int)acceptfd) == -1) - perror("close"); +void *dstmAccept(void *acceptfd) { + int val, retval, size, sum, sockid; + unsigned int oid; + char *buffer; + char control,ctrl; + char *ptr; + void *srcObj; + objheader_t *h; + trans_commit_data_t transinfo; + unsigned short objType, *versionarry, version; + unsigned int *oidarry, numoid, mid, threadid; + + transinfo.objlocked = NULL; + transinfo.objnotfound = NULL; + transinfo.modptr = NULL; + transinfo.numlocked = 0; + transinfo.numnotfound = 0; + + /* Receive control messages from other machines */ + while(true) { + int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); + if (ret==-1) + break; + switch(control) { + case READ_REQUEST: + /* Read oid requested and search if available */ + recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + if((srcObj = mhashSearch(oid)) == NULL) { + printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); + break; + } + h = (objheader_t *) srcObj; + GETSIZE(size, h); + size += sizeof(objheader_t); + sockid = (int) acceptfd; + + if (h == NULL) { + ctrl = OBJECT_NOT_FOUND; + send_data(sockid, &ctrl, sizeof(char)); + } else { + /* Type */ + char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; + *((int *)&msg[1])=size; + send_data(sockid, &msg, sizeof(msg)); + send_data(sockid, h, size); + } + break; + + case READ_MULT_REQUEST: + break; + + case MOVE_REQUEST: + break; + + case MOVE_MULT_REQUEST: + break; + + case TRANS_REQUEST: + /* Read transaction request */ + if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { + printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + break; + case TRANS_PREFETCH: + if((val = prefetchReq((int)acceptfd)) != 0) { + printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); + break; + } + break; + case TRANS_PREFETCH_RESPONSE: + if((val = getPrefetchResponse((int) acceptfd)) != 0) { + printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); + break; + } + break; + case START_REMOTE_THREAD: + recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + objType = getObjType(oid); + startDSMthread(oid, objType); + break; + + case THREAD_NOTIFY_REQUEST: + recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); + size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); + if((buffer = calloc(1,size)) == NULL) { + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); pthread_exit(NULL); + } + + recv_data((int)acceptfd, buffer, size); + + oidarry = calloc(numoid, sizeof(unsigned int)); + memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); + size = sizeof(unsigned int) * numoid; + versionarry = calloc(numoid, sizeof(unsigned short)); + memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid); + size += sizeof(unsigned short) * numoid; + mid = *((unsigned int *)(buffer+size)); + size += sizeof(unsigned int); + threadid = *((unsigned int *)(buffer+size)); + processReqNotify(numoid, oidarry, versionarry, mid, threadid); + free(buffer); + + break; + + case THREAD_NOTIFY_RESPONSE: + size = sizeof(unsigned short) + 2 * sizeof(unsigned int); + if((buffer = calloc(1,size)) == NULL) { + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); + } + + recv_data((int)acceptfd, buffer, size); + + oid = *((unsigned int *)buffer); + size = sizeof(unsigned int); + version = *((unsigned short *)(buffer+size)); + size += sizeof(unsigned short); + threadid = *((unsigned int *)(buffer+size)); + threadNotify(oid,version,threadid); + free(buffer); + break; + + case CLOSE_CONNECTION: + goto closeconnection; + + default: + printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__); + } + } + + closeconnection: + /* Close connection */ + if (close((int)acceptfd) == -1) + perror("close"); + pthread_exit(NULL); } - + /* This function reads the information available in a transaction request * and makes a function call to process the request */ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 8524cae9..66e524ba 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -95,6 +95,21 @@ void recv_data(int fd , void *buf, int buflen) { } } +int recv_data_errorcode(int fd , void *buf, int buflen) { + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = recv(fd, buffer, size, 0); + if (numbytes == -1) { + return -1; + } + buffer += numbytes; + size -= numbytes; + } + return 0; +} + void printhex(unsigned char *ptr, int numBytes) { int i; -- 2.34.1