}
/* This function accepts a new connection request, decodes the control message in the connection
* and accordingly calls other functions to process new requests */
-void *dstmAccept(void *acceptfd)
-{
- int val, retval, size, sum, sockid;
- unsigned int oid;
- char *buffer;
- char control,ctrl;
- char *ptr;
- void *srcObj;
- objheader_t *h;
- trans_commit_data_t transinfo;
- unsigned short objType, *versionarry, version;
- unsigned int *oidarry, numoid, mid, threadid;
-
- transinfo.objlocked = NULL;
- transinfo.objnotfound = NULL;
- transinfo.modptr = NULL;
- transinfo.numlocked = 0;
- transinfo.numnotfound = 0;
-
- /* Receive control messages from other machines */
- recv_data((int)acceptfd, &control, sizeof(char));
-
- switch(control) {
- case READ_REQUEST:
- do {
- /* Read oid requested and search if available */
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
- if((srcObj = mhashSearch(oid)) == NULL) {
- printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
- break;
- }
- h = (objheader_t *) srcObj;
- GETSIZE(size, h);
- size += sizeof(objheader_t);
- sockid = (int) acceptfd;
-
- if (h == NULL) {
- ctrl = OBJECT_NOT_FOUND;
- send_data(sockid, &ctrl, sizeof(char));
- } else {
- /* Type */
- char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
- *((int *)&msg[1])=size;
- send_data(sockid, &msg, sizeof(msg));
- send_data(sockid, h, size);
- }
- recv_data((int)acceptfd, &control, sizeof(char));
- } while(control == READ_REQUEST);
- break;
-
- case READ_MULT_REQUEST:
- break;
-
- case MOVE_REQUEST:
- break;
-
- case MOVE_MULT_REQUEST:
- break;
-
- case TRANS_REQUEST:
- /* Read transaction request */
- if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
- printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
- }
- break;
- case TRANS_PREFETCH:
- do {
- if((val = prefetchReq((int)acceptfd)) != 0) {
- printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
- break;
- }
- recv_data((int)acceptfd, &control, sizeof(char));
- } while (control == TRANS_PREFETCH);
- break;
- case TRANS_PREFETCH_RESPONSE:
- do {
- if((val = getPrefetchResponse((int) acceptfd)) != 0) {
- printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
- break;
- }
- recv_data((int)acceptfd, &control, sizeof(char));
- } while (control == TRANS_PREFETCH_RESPONSE);
- break;
- case START_REMOTE_THREAD:
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
- objType = getObjType(oid);
- startDSMthread(oid, objType);
- break;
-
- case THREAD_NOTIFY_REQUEST:
- recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
- size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
- if((buffer = calloc(1,size)) == NULL) {
- printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_exit(NULL);
- }
-
- recv_data((int)acceptfd, buffer, size);
-
- oidarry = calloc(numoid, sizeof(unsigned int));
- memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
- size = sizeof(unsigned int) * numoid;
- versionarry = calloc(numoid, sizeof(unsigned short));
- memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
- size += sizeof(unsigned short) * numoid;
- mid = *((unsigned int *)(buffer+size));
- size += sizeof(unsigned int);
- threadid = *((unsigned int *)(buffer+size));
- processReqNotify(numoid, oidarry, versionarry, mid, threadid);
- free(buffer);
-
- break;
-
- case THREAD_NOTIFY_RESPONSE:
- size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
- if((buffer = calloc(1,size)) == NULL) {
- printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_exit(NULL);
- }
-
- recv_data((int)acceptfd, buffer, size);
-
- oid = *((unsigned int *)buffer);
- size = sizeof(unsigned int);
- version = *((unsigned short *)(buffer+size));
- size += sizeof(unsigned short);
- threadid = *((unsigned int *)(buffer+size));
- threadNotify(oid,version,threadid);
- free(buffer);
-
- break;
- default:
- printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
- }
-
- /* Close connection */
- if (close((int)acceptfd) == -1)
- perror("close");
+void *dstmAccept(void *acceptfd) {
+ int val, retval, size, sum, sockid;
+ unsigned int oid;
+ char *buffer;
+ char control,ctrl;
+ char *ptr;
+ void *srcObj;
+ objheader_t *h;
+ trans_commit_data_t transinfo;
+ unsigned short objType, *versionarry, version;
+ unsigned int *oidarry, numoid, mid, threadid;
+
+ transinfo.objlocked = NULL;
+ transinfo.objnotfound = NULL;
+ transinfo.modptr = NULL;
+ transinfo.numlocked = 0;
+ transinfo.numnotfound = 0;
+
+ /* Receive control messages from other machines */
+ while(true) {
+ int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+ if (ret==-1)
+ break;
+ switch(control) {
+ case READ_REQUEST:
+ /* Read oid requested and search if available */
+ recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ if((srcObj = mhashSearch(oid)) == NULL) {
+ printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+ break;
+ }
+ h = (objheader_t *) srcObj;
+ GETSIZE(size, h);
+ size += sizeof(objheader_t);
+ sockid = (int) acceptfd;
+
+ if (h == NULL) {
+ ctrl = OBJECT_NOT_FOUND;
+ send_data(sockid, &ctrl, sizeof(char));
+ } else {
+ /* Type */
+ char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+ *((int *)&msg[1])=size;
+ send_data(sockid, &msg, sizeof(msg));
+ send_data(sockid, h, size);
+ }
+ break;
+
+ case READ_MULT_REQUEST:
+ break;
+
+ case MOVE_REQUEST:
+ break;
+
+ case MOVE_MULT_REQUEST:
+ break;
+
+ case TRANS_REQUEST:
+ /* Read transaction request */
+ if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
+ printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ break;
+ case TRANS_PREFETCH:
+ if((val = prefetchReq((int)acceptfd)) != 0) {
+ printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+ break;
+ }
+ break;
+ case TRANS_PREFETCH_RESPONSE:
+ if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+ printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+ break;
+ }
+ break;
+ case START_REMOTE_THREAD:
+ recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ objType = getObjType(oid);
+ startDSMthread(oid, objType);
+ break;
+
+ case THREAD_NOTIFY_REQUEST:
+ recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+ size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
+ if((buffer = calloc(1,size)) == NULL) {
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
pthread_exit(NULL);
+ }
+
+ recv_data((int)acceptfd, buffer, size);
+
+ oidarry = calloc(numoid, sizeof(unsigned int));
+ memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
+ size = sizeof(unsigned int) * numoid;
+ versionarry = calloc(numoid, sizeof(unsigned short));
+ memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
+ size += sizeof(unsigned short) * numoid;
+ mid = *((unsigned int *)(buffer+size));
+ size += sizeof(unsigned int);
+ threadid = *((unsigned int *)(buffer+size));
+ processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+ free(buffer);
+
+ break;
+
+ case THREAD_NOTIFY_RESPONSE:
+ size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
+ if((buffer = calloc(1,size)) == NULL) {
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+
+ recv_data((int)acceptfd, buffer, size);
+
+ oid = *((unsigned int *)buffer);
+ size = sizeof(unsigned int);
+ version = *((unsigned short *)(buffer+size));
+ size += sizeof(unsigned short);
+ threadid = *((unsigned int *)(buffer+size));
+ threadNotify(oid,version,threadid);
+ free(buffer);
+ break;
+
+ case CLOSE_CONNECTION:
+ goto closeconnection;
+
+ default:
+ printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
+ }
+ }
+
+ closeconnection:
+ /* Close connection */
+ if (close((int)acceptfd) == -1)
+ perror("close");
+ pthread_exit(NULL);
}
-
+
/* This function reads the information available in a transaction request
* and makes a function call to process the request */
int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {