+
+/* This function sends notification request per thread waiting on object(s) whose version
+ * changes */
+int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
+ int sock,i;
+ objheader_t *objheader;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
+ char *ptr;
+ int bytesSent;
+ int status, size;
+ unsigned short version;
+ unsigned int oid,mid;
+ static unsigned int threadid = 0;
+ pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
+ pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
+ notifydata_t *ndata;
+
+ oid = oidarry[0];
+ if((mid = lhashSearch(oid)) == 0) {
+ printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
+ return;
+ }
+
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("reqNotify():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ /* Generate unique threadid */
+ threadid++;
+
+ /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
+ if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
+ printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+ return -1;
+ }
+ ndata->numoid = numoid;
+ ndata->threadid = threadid;
+ ndata->oidarry = oidarry;
+ ndata->versionarry = versionarry;
+ ndata->threadcond = threadcond;
+ ndata->threadnotify = threadnotify;
+ if((status = notifyhashInsert(threadid, ndata)) != 0) {
+ printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
+ free(ndata);
+ return -1;
+ }
+
+ /* Send number of oids, oidarry, version array, machine id and threadid */
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("reqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ free(ndata);
+ return -1;
+ } else {
+ msg[0] = THREAD_NOTIFY_REQUEST;
+ *((unsigned int *)(&msg[1])) = numoid;
+ /* Send array of oids */
+ size = sizeof(unsigned int);
+ {
+ i = 0;
+ while(i < numoid) {
+ oid = oidarry[i];
+ *((unsigned int *)(&msg[1] + size)) = oid;
+ size += sizeof(unsigned int);
+ i++;
+ }
+ }
+
+ /* Send array of version */
+ {
+ i = 0;
+ while(i < numoid) {
+ version = versionarry[i];
+ *((unsigned short *)(&msg[1] + size)) = version;
+ size += sizeof(unsigned short);
+ i++;
+ }
+ }
+
+ *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+ size += sizeof(unsigned int);
+ *((unsigned int *)(&msg[1] + size)) = threadid;
+ pthread_mutex_lock(&(ndata->threadnotify));
+ size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+ send_data(sock, msg, size);
+ pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
+ pthread_mutex_unlock(&(ndata->threadnotify));
+ }
+
+ pthread_cond_destroy(&threadcond);
+ pthread_mutex_destroy(&threadnotify);
+ free(ndata);
+ close(sock);
+ return status;
+}
+
+void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
+ notifydata_t *ndata;
+ int i, objIsFound = 0, index;
+ void *ptr;
+
+ //Look up the tid and call the corresponding pthread_cond_signal
+ if((ndata = notifyhashSearch(tid)) == NULL) {
+ printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ for(i = 0; i < ndata->numoid; i++) {
+ if(ndata->oidarry[i] == oid){
+ objIsFound = 1;
+ index = i;
+ }
+ }
+ if(objIsFound == 0){
+ printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ if(version <= ndata->versionarry[index]){
+ printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
+ return;
+ } else {
+ /* Clear from prefetch cache and free thread related data structure */
+ if((ptr = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ }
+ pthread_cond_signal(&(ndata->threadcond));
+ }
+ }
+ }
+ return;
+}
+
+int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
+ threadlist_t *ptr;
+ unsigned int mid;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
+ int sock, status, size, bytesSent;
+
+ while(*head != NULL) {
+ ptr = *head;
+ mid = ptr->mid;
+ //create a socket connection to that machine
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("notifyAll():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+ //send Thread Notify response and threadid to that machine
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("notifyAll():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ fflush(stdout);
+ status = -1;
+ } else {
+ bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ *((unsigned int *)&msg[1]) = oid;
+ size = sizeof(unsigned int);
+ *((unsigned short *)(&msg[1]+ size)) = version;
+ size+= sizeof(unsigned short);
+ *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
+
+ size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sock, msg, size);
+ }
+ //close socket
+ close(sock);
+ // Update head
+ *head = ptr->next;
+ free(ptr);
+ }
+ return status;
+}
+
+void transAbort(transrecord_t *trans) {
+ objstrDelete(trans->cache);
+ chashDelete(trans->lookupTable);
+ free(trans);
+}
+
+/* This function inserts necessary information into
+ * a machine pile data structure */
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
+ plistnode_t *ptr, *tmp;
+ int found = 0, offset = 0;
+
+ tmp = pile;
+ //Add oid into a machine that is already present in the pile linked list structure
+ while(tmp != NULL) {
+ if (tmp->mid == mid) {
+ int tmpsize;
+
+ if (STATUS(headeraddr) & NEW) {
+ tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+ tmp->numcreated++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ }else if (STATUS(headeraddr) & DIRTY) {
+ tmp->oidmod[tmp->nummod] = OID(headeraddr);
+ tmp->nummod++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+ *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+ offset += sizeof(unsigned int);
+ *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+ tmp->numread ++;
+ }
+ found = 1;
+ break;
+ }
+ tmp = tmp->next;
+ }
+ //Add oid for any new machine
+ if (!found) {
+ int tmpsize;
+ if((ptr = pCreate(num_objs)) == NULL) {
+ return NULL;
+ }
+ ptr->mid = mid;
+ if (STATUS(headeraddr) & NEW) {
+ ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+ ptr->numcreated ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else if (STATUS(headeraddr) & DIRTY) {
+ ptr->oidmod[ptr->nummod] = OID(headeraddr);
+ ptr->nummod ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ *((unsigned int *)ptr->objread)=OID(headeraddr);
+ offset = sizeof(unsigned int);
+ *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+ ptr->numread ++;
+ }
+ ptr->next = pile;
+ pile = ptr;
+ }
+
+ /* Clear Flags */
+ STATUS(headeraddr) =0;
+
+ return pile;
+}