extern int classsize[];
extern int numHostsInSystem;
extern pthread_mutex_t notifymutex;
-extern unsigned int myIpAddr;
objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
int dstmInit(void)
{
- mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
- /* Initialize attribute for mutex */
- pthread_mutexattr_init(&mainobjstore_mutex_attr);
- pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
- pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
- pthread_mutex_init(&lockObjHeader,NULL);
- if (mhashCreate(HASH_SIZE, LOADFACTOR))
- return 1; //failure
-
- if (lhashCreate(HASH_SIZE, LOADFACTOR))
- return 1; //failure
-
- if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
- return 1; //failure
-
- //Initialize socket pool
- if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
- printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
- return 0;
- }
- return 0;
+ mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
+ /* Initialize attribute for mutex */
+ pthread_mutexattr_init(&mainobjstore_mutex_attr);
+ pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+ pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
+ pthread_mutex_init(&lockObjHeader,NULL);
+ if (mhashCreate(HASH_SIZE, LOADFACTOR))
+ return 1; //failure
+
+ if (lhashCreate(HASH_SIZE, LOADFACTOR))
+ return 1; //failure
+
+ if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
+ return 1; //failure
+
+ //Initialize socket pool
+ if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
+ printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+
+ return 0;
}
/* This function starts the thread to listen on a socket
* for tranaction calls */
void *dstmListen()
{
- int listenfd, acceptfd;
- struct sockaddr_in my_addr;
- struct sockaddr_in client_addr;
- socklen_t addrlength = sizeof(struct sockaddr);
- pthread_t thread_dstm_accept;
- int i;
- int setsockflag=1;
-
- listenfd = socket(AF_INET, SOCK_STREAM, 0);
- if (listenfd == -1)
- {
- perror("socket");
- exit(1);
- }
+ int listenfd, acceptfd;
+ struct sockaddr_in my_addr;
+ struct sockaddr_in client_addr;
+ socklen_t addrlength = sizeof(struct sockaddr);
+ pthread_t thread_dstm_accept;
+ int i;
+ int setsockflag=1;
+
+ listenfd = socket(AF_INET, SOCK_STREAM, 0);
+ if (listenfd == -1)
+ {
+ perror("socket");
+ exit(1);
+ }
- if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
- perror("socket");
- exit(1);
- }
+ if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
#ifdef MAC
- if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
- perror("socket");
- exit(1);
- }
+ if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
#endif
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(LISTEN_PORT);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- memset(&(my_addr.sin_zero), '\0', 8);
-
- if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
- {
- perror("bind");
- exit(1);
- }
+ my_addr.sin_family = AF_INET;
+ my_addr.sin_port = htons(LISTEN_PORT);
+ my_addr.sin_addr.s_addr = INADDR_ANY;
+ memset(&(my_addr.sin_zero), '\0', 8);
- if (listen(listenfd, BACKLOG) == -1)
- {
- perror("listen");
- exit(1);
- }
+ if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
+ {
+ perror("bind");
+ exit(1);
+ }
+
+ if (listen(listenfd, BACKLOG) == -1)
+ {
+ perror("listen");
+ exit(1);
+ }
- printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
- while(1)
- {
- int retval;
- int flag=1;
- acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
- if (acceptfd < 0) {
- perror("Error in accept: ");
- printf("error %x", acceptfd); fflush(stdout);
- }
- setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
- do {
- retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
- } while(retval!=0);
- pthread_detach(thread_dstm_accept);
- }
+ printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
+ while(1)
+ {
+ int retval;
+ int flag=1;
+ acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+ setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
+ do {
+ retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+ } while(retval!=0);
+ pthread_detach(thread_dstm_accept);
+ }
}
/* This function accepts a new connection request, decodes the control message in the connection
* and accordingly calls other functions to process new requests */
trans_commit_data_t transinfo;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
-
+
/* Receive control messages from other machines */
- if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) {
- printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
- pthread_exit(NULL);
- }
-
- switch(control) {
+ while(1) {
+ int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+ if (ret==0)
+ break;
+ if (ret==-1) {
+ printf("DEBUG -> RECV Error!.. retrying\n");
+ break;
+ }
+ switch(control) {
case READ_REQUEST:
- do {
- /* Read oid requested and search if available */
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
- if((srcObj = mhashSearch(oid)) == NULL) {
- printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
- break;
- }
- h = (objheader_t *) srcObj;
- GETSIZE(size, h);
- size += sizeof(objheader_t);
- sockid = (int) acceptfd;
-
- if (h == NULL) {
- ctrl = OBJECT_NOT_FOUND;
- send_data(sockid, &ctrl, sizeof(char));
- } else {
- /* Type */
- char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
- *((int *)&msg[1])=size;
- send_data(sockid, &msg, sizeof(msg));
- send_data(sockid, h, size);
- }
- if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) {
- pthread_exit(NULL);
- }
- } while(control == READ_REQUEST);
+ /* Read oid requested and search if available */
+ recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ if((srcObj = mhashSearch(oid)) == NULL) {
+ printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+ break;
+ }
+ h = (objheader_t *) srcObj;
+ GETSIZE(size, h);
+ size += sizeof(objheader_t);
+ sockid = (int) acceptfd;
+
+ if (h == NULL) {
+ ctrl = OBJECT_NOT_FOUND;
+ send_data(sockid, &ctrl, sizeof(char));
+ } else {
+ /* Type */
+ char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+ *((int *)&msg[1])=size;
+ send_data(sockid, &msg, sizeof(msg));
+ send_data(sockid, h, size);
+ }
break;
-
+
case READ_MULT_REQUEST:
break;
-
+
case MOVE_REQUEST:
break;
-
+
case MOVE_MULT_REQUEST:
break;
-
+
case TRANS_REQUEST:
/* Read transaction request */
transinfo.objlocked = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
- printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
- break;
+ printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
}
break;
case TRANS_PREFETCH:
- do {
- if((val = prefetchReq((int)acceptfd)) != 0) {
- printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
- break;
- }
- if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) {
- pthread_exit(NULL);
- }
- } while (control == TRANS_PREFETCH);
+ if((val = prefetchReq((int)acceptfd)) != 0) {
+ printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+ break;
+ }
break;
case TRANS_PREFETCH_RESPONSE:
- do {
- if((val = getPrefetchResponse((int) acceptfd)) != 0) {
- printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
- break;
- }
- if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) {
- pthread_exit(NULL);
- }
- } while (control == TRANS_PREFETCH_RESPONSE);
+ if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+ printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+ break;
+ }
break;
case START_REMOTE_THREAD:
recv_data((int)acceptfd, &oid, sizeof(unsigned int));
objType = getObjType(oid);
startDSMthread(oid, objType);
break;
-
+
case THREAD_NOTIFY_REQUEST:
recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
if((buffer = calloc(1,size)) == NULL) {
- printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_exit(NULL);
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
}
-
+
recv_data((int)acceptfd, buffer, size);
-
+
oidarry = calloc(numoid, sizeof(unsigned int));
memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
size = sizeof(unsigned int) * numoid;
threadid = *((unsigned int *)(buffer+size));
processReqNotify(numoid, oidarry, versionarry, mid, threadid);
free(buffer);
-
+
break;
case THREAD_NOTIFY_RESPONSE:
size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
if((buffer = calloc(1,size)) == NULL) {
- printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_exit(NULL);
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
}
-
+
recv_data((int)acceptfd, buffer, size);
-
+
oid = *((unsigned int *)buffer);
size = sizeof(unsigned int);
version = *((unsigned short *)(buffer+size));
default:
printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
+ }
}
-closeconnection:
+ closeconnection:
/* Close connection */
if (close((int)acceptfd) == -1)
perror("close");