+ /* Process each modified object saved in the mainobject store */
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize,header);
+ pthread_mutex_lock(&mainobjstore_mutex);
+ memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
+ header->version += 1;
+ /* If threads are waiting on this object to be updated, notify them */
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ offset += sizeof(objheader_t) + tmpsize;
+ }
+
+ if (nummod > 0)
+ free(modptr);
+
+ /* Unlock locked objects */
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ STATUS(header) &= ~(LOCK);
+ }
+ //TODO Update location lookup table
+
+ /* Send ack to coordinator */
+ control = TRANS_SUCESSFUL;
+ send_data((int)acceptfd, &control, sizeof(char));
+ return 0;
+}
+
+/* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
+ * Looks for the objects to be prefetched in the main object store.
+ * If objects are not found then record those and if objects are found
+ * then use offset values to prefetch references to other objects */
+
+int prefetchReq(int acceptfd) {
+ int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
+ int length, sd = -1;
+ char *recvbuffer, *sendbuffer, control;
+ unsigned int oid, mid;
+ short *offsetarry;
+ objheader_t *header;
+ struct sockaddr_in remoteAddr;
+
+ do {
+ recv_data((int)acceptfd, &length, sizeof(int));
+ if(length != -1) {
+ size = length - sizeof(int);
+ if((recvbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ return -1;
+ }
+ recv_data((int)acceptfd, recvbuffer, size);
+ oid = *((unsigned int *) recvbuffer);
+ mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
+ size = size - (2 * sizeof(unsigned int));
+ numoffset = size / sizeof(short);
+ if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(recvbuffer);
+ return -1;
+ }
+ memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
+ free(recvbuffer);
+#if 0
+ pthread_mutex_lock(&sockLock);
+ sockIdFound = 0;
+ pthread_mutex_unlock(&sockLock);
+ for(i = 0; i < NUM_MACHINES; i++) {
+ if(sockArray[i].mid == mid) {
+ sd = sockArray[i].sockid;
+ pthread_mutex_lock(&sockLock);
+ sockIdFound = 1;
+ pthread_mutex_unlock(&sockLock);
+ break;
+ }
+ }
+
+ if(sockIdFound == 0) {
+ if(sockCount < NUM_MACHINES) {
+
+#endif
+ /* Create socket to send information */
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("prefetchReq():socket()");
+ return -1;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return -1;
+ }
+
+#if 0
+ sockArray[sockCount].mid = mid;
+ sockArray[sockCount].sockid = sd;
+ pthread_mutex_lock(&sockLock);
+ sockCount++;
+ pthread_mutex_unlock(&sockLock);
+ } else {
+ //TODO Fix for connecting to more than 2 machines && close socket
+ printf("%s(): Error: Currently works for only 2 machines\n", __func__);
+ return -1;
+ }
+ }
+#endif
+
+ /*Process each oid */
+ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ } else { /* Object Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) > NUMCLASSES) {
+ isArray = 1;
+ }
+ if(isArray == 1) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ unsigned short length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offsetarry[i]< 0 || offsetarry[i] >= length) {
+ break;
+ }
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+ } else {
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+ }
+
+ if((header = mhashSearch(oid)) == NULL) {
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ break;
+ } else {/* Obj Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ }
+ isArray = 0;
+ }
+ free(offsetarry);
+ }
+ close(sd);
+ }
+ } while (length != -1);
+ return 0;
+}
+
+int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
+ int numbytes = 0;
+
+ send_data(sd, control, sizeof(char));
+ /* Send the buffer with its size */
+ int length = *(size);
+ send_data(sd, sendbuffer, length);
+ free(sendbuffer);
+ return 0;
+}
+
+void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
+ objheader_t *header;
+ unsigned int oid;
+ unsigned short newversion;
+ char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
+ int sd;
+ struct sockaddr_in remoteAddr;
+ int bytesSent;
+ int size;
+
+ int i = 0;
+ while(i < numoid) {
+ oid = *(oidarry + i);
+ if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ /* Check to see if versions are same */
+checkversion:
+ if ((STATUS(header) & LOCK) != LOCK) {
+ //FIXME make locking atomic
+ STATUS(header) |= LOCK;
+ newversion = header->version;
+ if(newversion == *(versionarry + i)) {
+ //Add to the notify list
+ if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
+ printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ STATUS(header) &= ~(LOCK);
+ } else {
+ STATUS(header) &= ~(LOCK);
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("processReqNotify():socket()");
+ return;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return;
+ } else {
+ //Send Update notification
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ *((unsigned int *)&msg[1]) = oid;
+ size = sizeof(unsigned int);
+ *((unsigned short *)(&msg[1]+size)) = newversion;
+ size += sizeof(unsigned short);
+ *((unsigned int *)(&msg[1]+size)) = threadid;
+ size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sd, msg, size);
+ }
+ close(sd);
+ }
+ } else {
+ randomdelay();
+ goto checkversion;
+ }
+ }
+ i++;
+ }
+ free(oidarry);
+ free(versionarry);
+}