switch(control) {
case READ_REQUEST:
- /* Read oid requested and search if available */
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
- if((srcObj = mhashSearch(oid)) == NULL) {
- printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
- pthread_exit(NULL);
- }
- h = (objheader_t *) srcObj;
- GETSIZE(size, h);
- size += sizeof(objheader_t);
- sockid = (int) acceptfd;
-
- if (h == NULL) {
- ctrl = OBJECT_NOT_FOUND;
- send_data(sockid, &ctrl, sizeof(char));
- } else {
- /* Type */
- char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
- *((int *)&msg[1])=size;
- send_data(sockid, &msg, sizeof(msg));
- send_data(sockid, h, size);
- }
+ do {
+ /* Read oid requested and search if available */
+ recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ if((srcObj = mhashSearch(oid)) == NULL) {
+ printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+ break;
+ }
+ h = (objheader_t *) srcObj;
+ GETSIZE(size, h);
+ size += sizeof(objheader_t);
+ sockid = (int) acceptfd;
+
+ if (h == NULL) {
+ ctrl = OBJECT_NOT_FOUND;
+ send_data(sockid, &ctrl, sizeof(char));
+ } else {
+ /* Type */
+ char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+ *((int *)&msg[1])=size;
+ send_data(sockid, &msg, sizeof(msg));
+ send_data(sockid, h, size);
+ }
+ recv_data((int)acceptfd, &control, sizeof(char));
+ } while(control == READ_REQUEST);
break;
case READ_MULT_REQUEST:
} while (control == TRANS_PREFETCH);
break;
case TRANS_PREFETCH_RESPONSE:
- //do {
+ do {
if((val = getPrefetchResponse((int) acceptfd)) != 0) {
printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
+ break;
}
- //} while (control == TRANS_PREFETCH_RESPONSE);
+ recv_data((int)acceptfd, &control, sizeof(char));
+ } while (control == TRANS_PREFETCH_RESPONSE);
break;
case START_REMOTE_THREAD:
recv_data((int)acceptfd, &oid, sizeof(unsigned int));
}
memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
free(recvbuffer);
-#if 0
pthread_mutex_lock(&sockLock);
sockIdFound = 0;
pthread_mutex_unlock(&sockLock);
+ /* If socket is already established then send data reusing socket */
for(i = 0; i < NUM_MACHINES; i++) {
if(sockArray[i].mid == mid) {
sd = sockArray[i].sockid;
if(sockIdFound == 0) {
if(sockCount < NUM_MACHINES) {
-
-#endif
/* Create socket to send information */
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
perror("prefetchReq():socket()");
remoteAddr.sin_addr.s_addr = htonl(mid);
if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ perror("connect");
printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
close(sd);
return -1;
}
-
-#if 0
sockArray[sockCount].mid = mid;
sockArray[sockCount].sockid = sd;
pthread_mutex_lock(&sockLock);
return -1;
}
}
-#endif
/*Process each oid */
if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
}
free(offsetarry);
}
- close(sd);
}
} while (length != -1);
return 0;
unsigned int oidMin;
unsigned int oidMax;
-/************************************************************
+/************************************************************************
* Global variables to map socketid and remote mid to
- * reuse sockets
- ***********************************************************/
+ * reuse sockets for sending prefetches and making remote read requests
+ ************************************************************************/
midSocketInfo_t midSocketArray[NUM_MACHINES];
int sockCount; //number of connections with all remote machines(one socket per mc)
int sockIdFound; //track if socket file descriptor is already established
+midSocketInfo_t sockArrayRemoteRead[NUM_MACHINES];
+int sockCountRemoteRead; //number of connections with all remote machines(one socket per mc)
+int sockIdFoundRemoteRead; //track if socket file descriptor is already established
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
if (numbytes == -1) {
perror("send");
- printf("error: at %s, %d\n", __FILE__, __LINE__);
exit(-1);
}
buffer += numbytes;
numbytes = recv(fd, buffer, size, 0);
if (numbytes == -1) {
perror("recv");
- printf("error: at %s, %d\n", __FILE__, __LINE__);
exit(-1);
}
buffer += numbytes;
printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
exit(-1);
}
-
return 0;
}
objheader_t *h;
void *objcopy;
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket\n");
- return NULL;
- }
-
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(LISTEN_PORT);
- midtoIP(mnum,machineip);
- machineip[15] = '\0';
- serv_addr.sin_addr.s_addr = inet_addr(machineip);
-
- // Open connection
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("getRemoteObj() Error in connect\n");
- return NULL;
- }
-
+ int i;
+ for(i = 0; i < NUM_MACHINES; i++) {
+ if(sockArrayRemoteRead[i].mid == mnum) {
+ sd = sockArrayRemoteRead[i].sockid;
+ sockIdFoundRemoteRead = 1;
+ break;
+ }
+ }
+
+ if(sockIdFoundRemoteRead == 0) {
+ if(sockCountRemoteRead < NUM_MACHINES) {
+ /* Create socket */
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("Error in socket\n");
+ return NULL;
+ }
+
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ serv_addr.sin_addr.s_addr = htonl(mnum);
+ // Open connection
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ perror("getRemoteObj() Error in connect\n");
+ close(sd);
+ return NULL;
+ }
+ sockArrayRemoteRead[sockCountRemoteRead].mid = mnum;
+ sockArrayRemoteRead[sockCountRemoteRead].sockid = sd;
+ sockCountRemoteRead++;
+ } else {
+ //TODO Fix for connecting to more than 2 machines && close socket
+ printf("%s(): Error: Currently works for two remote machines\n", __func__);
+ return NULL;
+ }
+ }
+
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
return NULL;
}
- //Close connection
- close(sd);
return objcopy;
}