- int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
- int isArray = 0;
- unsigned int oid, index = 0;
- char *ptr, buffer[PRE_BUF_SIZE];
- void *mobj;
- unsigned int objoid;
- char control;
- objheader_t * header;
- int bytesRecvd;
-
- /* Repeatedly recv the oid and offset pairs sent for prefetch */
- while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
- count++;
- if(length == -1)
- break;
- sum = 0;
- index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
- // first 4 bytes are saved to send the
- // size of the buffer (that is computed at the end of the loop)
- bytesRecvd = 0;
- do {
- bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
- sizeof(unsigned int) - bytesRecvd, 0);
- } while (bytesRecvd < sizeof(unsigned int));
- numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
- N = numoffset * sizeof(short);
- short offset[numoffset];
- ptr = (char *)&offset;
- /* Recv the offset values per oid */
- do {
- n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
-
- /* Process each oid */
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* send the oid, it's size, it's header and data */
- header = mobj;
- GETSIZE(size, header);
- size += sizeof(objheader_t);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer + index, header, size);
- index += size;
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- /* Check for arrays */
- if(TYPE(header) > NUMCLASSES) {
- isArray = 1;
- }
- if(isArray == 1) {
- int elementsize = classsize[TYPE(header)];
- objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
- } else {
- objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
- }
- if((header = mhashSearch(objoid)) == NULL) {
- /* Obj not found, send oid */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- break;
- } else {/* Obj Found */
- /* send the oid, it's size, it's header and data */
- GETSIZE(size, header);
- size+=sizeof(objheader_t);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer+index, header, size);
- index += size;
- isArray = 0;
- continue;
- }
- }
- }
- /* Check for overflow in the buffer */
- if (index >= PRE_BUF_SIZE) {
- printf("Char buffer is overflowing\n");
- return 1;
- }
- /* Send Prefetch response control message only once*/
- if(count == 1) {
- control = TRANS_PREFETCH_RESPONSE;
- if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
- return 1;
- }
- }
-
- /* Add the buffer size into buffer as a parameter */
- *((unsigned int *)buffer)=index;
- /* Send the entire buffer with its size and oids found and not found */
- if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
- perror("Error sending oids found\n");
- return 1;
- }
- }
- return 0;
+ int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
+ int length, sd = -1;
+ char *recvbuffer, *sendbuffer, control;
+ unsigned int oid, mid;
+ short *offsetarry;
+ objheader_t *header;
+ struct sockaddr_in remoteAddr;
+
+ do {
+ recv_data((int)acceptfd, &length, sizeof(int));
+ if(length != -1) {
+ size = length - sizeof(int);
+ if((recvbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ return -1;
+ }
+ recv_data((int)acceptfd, recvbuffer, size);
+ oid = *((unsigned int *) recvbuffer);
+ mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
+ size = size - (2 * sizeof(unsigned int));
+ numoffset = size / sizeof(short);
+ if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(recvbuffer);
+ return -1;
+ }
+ memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
+ free(recvbuffer);
+#if 0
+ pthread_mutex_lock(&sockLock);
+ sockIdFound = 0;
+ pthread_mutex_unlock(&sockLock);
+ for(i = 0; i < NUM_MACHINES; i++) {
+ if(sockArray[i].mid == mid) {
+ sd = sockArray[i].sockid;
+ pthread_mutex_lock(&sockLock);
+ sockIdFound = 1;
+ pthread_mutex_unlock(&sockLock);
+ break;
+ }
+ }
+
+ if(sockIdFound == 0) {
+ if(sockCount < NUM_MACHINES) {
+
+#endif
+ /* Create socket to send information */
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("prefetchReq():socket()");
+ return -1;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return -1;
+ }
+
+#if 0
+ sockArray[sockCount].mid = mid;
+ sockArray[sockCount].sockid = sd;
+ pthread_mutex_lock(&sockLock);
+ sockCount++;
+ pthread_mutex_unlock(&sockLock);
+ } else {
+ //TODO Fix for connecting to more than 2 machines && close socket
+ printf("%s(): Error: Currently works for only 2 machines\n", __func__);
+ return -1;
+ }
+ }
+#endif
+
+ /*Process each oid */
+ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ } else { /* Object Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) > NUMCLASSES) {
+ isArray = 1;
+ }
+ if(isArray == 1) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ unsigned short length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offsetarry[i]< 0 || offsetarry[i] >= length) {
+ break;
+ }
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+ } else {
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+ }
+
+ if((header = mhashSearch(oid)) == NULL) {
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ break;
+ } else {/* Obj Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ }
+ isArray = 0;
+ }
+ free(offsetarry);
+ }
+ close(sd);
+ }
+ } while (length != -1);
+ return 0;
+}
+
+int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
+ int numbytes = 0;
+
+ send_data(sd, control, sizeof(char));
+ /* Send the buffer with its size */
+ int length = *(size);
+ send_data(sd, sendbuffer, length);
+ free(sendbuffer);
+ return 0;