fix bugs in sockpool...test and set has to be atomic
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "clookup.h"
4 #include "machinepile.h"
5 #include "mlookup.h"
6 #include "llookup.h"
7 #include "plookup.h"
8 #include "prelookup.h"
9 #include "threadnotify.h"
10 #include "queue.h"
11 #ifdef COMPILER
12 #include "thread.h"
13 #endif
14
15 #define NUM_THREADS 1
16 #define PREFETCH_CACHE_SIZE 1048576 //1MB
17 #define CONFIG_FILENAME "dstm.conf"
18
19 /* Global Variables */
20 extern int classsize[];
21 extern primarypfq_t pqueue; //Shared prefetch queue
22 objstr_t *prefetchcache; //Global Prefetch cache
23 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
24 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
25 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
26 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
27 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
28 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
29 extern objstr_t *mainobjstore;
30 unsigned int myIpAddr;
31 unsigned int *hostIpAddrs;
32 int sizeOfHostArray;
33 int numHostsInSystem;
34 int myIndexInHostArray;
35 unsigned int oidsPerBlock;
36 unsigned int oidMin;
37 unsigned int oidMax;
38
39 sockPoolHashTable_t *transReadSockPool;
40 sockPoolHashTable_t *transPrefetchSockPool;
41
42 void printhex(unsigned char *, int);
43 plistnode_t *createPiles(transrecord_t *);
44
45 /*******************************
46  * Send and Recv function calls 
47  *******************************/
/* Write exactly buflen bytes from buf to the socket fd, looping over
 * partial sends.  MSG_NOSIGNAL is passed to every send() call.  A send()
 * failure is reported with perror() and ends the process via exit(-1);
 * this function therefore only returns after everything was sent. */
void send_data(int fd , void *buf, int buflen) {
        char *cursor = (char *)buf;
        int remaining;

        for (remaining = buflen; remaining > 0; ) {
                int sent = send(fd, cursor, remaining, MSG_NOSIGNAL);

                if (sent == -1) {
                        perror("send");
                        exit(-1);
                }
                cursor += sent;
                remaining -= sent;
        }
}
62
/* Read exactly buflen bytes from the socket fd into buf, looping over
 * partial reads.
 *
 * Failure policy matches send_data(): any condition that makes it
 * impossible to deliver all buflen bytes ends the process via exit(-1).
 * That covers both a recv() error (-1) and an orderly shutdown by the peer
 * (recv() returning 0).  Without the second check the loop would never
 * make progress once the peer closed the connection and would spin
 * forever. */
void recv_data(int fd , void *buf, int buflen) {
        char *buffer = (char *)(buf);
        int size = buflen;
        int numbytes;

        while (size > 0) {
                numbytes = recv(fd, buffer, size, 0);
                if (numbytes == -1) {
                        perror("recv");
                        exit(-1);
                }
                if (numbytes == 0) {
                        /* Peer closed the connection before all data arrived. */
                        fprintf(stderr, "recv: connection closed by peer\n");
                        exit(-1);
                }
                buffer += numbytes;
                size -= numbytes;
        }
}
77
/* Variant of recv_data() that reports failure to the caller instead of
 * terminating the process.
 *
 * Reads exactly buflen bytes from the socket fd into buf.
 * Returns 0 once all bytes have been received, or -1 if recv() fails or
 * the peer closes the connection (recv() returns 0) before buflen bytes
 * arrived.  The end-of-stream check is required: without it the loop would
 * spin forever on a closed connection. */
int recv_data_errorcode(int fd , void *buf, int buflen) {
  char *buffer = (char *)(buf);
  int size = buflen;
  int numbytes;

  while (size > 0) {
    numbytes = recv(fd, buffer, size, 0);
    if (numbytes <= 0) {
      /* -1: recv error; 0: orderly shutdown with data still outstanding */
      return -1;
    }
    buffer += numbytes;
    size -= numbytes;
  }
  return 0;
}
92
/* Debug helper: print numBytes bytes starting at ptr to stdout as
 * lowercase hex, each value zero-padded to two digits and followed by a
 * space, then terminate the line with a newline. */
void printhex(unsigned char *ptr, int numBytes)
{
        int idx = 0;

        while (idx < numBytes) {
                unsigned char byte = ptr[idx++];

                /* single-digit values get an explicit leading zero */
                printf(byte < 16 ? "0%x " : "%x ", byte);
        }
        printf("\n");
}
106
/* Return the number of elements that precede the -1 sentinel in array.
 * The array must be terminated by -1; a NULL array is treated as empty.
 *
 * Deliberately not declared `inline`: under C99/C11 a plain `inline`
 * definition provides no external definition, so any call the compiler
 * chooses not to inline fails at link time. */
int arrayLength(int *array) {
        int i = 0;

        if (array == NULL)
                return 0;
        while (array[i] != -1)
                i++;
        return i;
}
/* Return the largest of the first arraylength elements of array.
 * array[0] is read unconditionally, so the caller must pass at least one
 * element.
 *
 * Deliberately not declared `inline`: under C99/C11 a plain `inline`
 * definition provides no external definition, so any call the compiler
 * chooses not to inline fails at link time. */
int findmax(int *array, int arraylength) {
        int max = array[0];
        int i;

        /* element 0 already seeds max, so start comparing at 1 */
        for (i = 1; i < arraylength; i++) {
                if (array[i] > max)
                        max = array[i];
        }
        return max;
}
123 /* This function is a prefetch call generated by the compiler that
124  * populates the shared primary prefetch queue*/
125 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
126   /* Allocate for the queue node*/
127   if(ntuples > 0) {
128     int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
129     char * node;
130     
131     if((node = calloc(1, qnodesize)) == NULL) {
132       printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
133       return;
134     } else {
135       /* Set queue node values */
136       int len = sizeof(prefetchqelem_t);
137       int i;
138       unsigned int *narray;
139       unsigned short *narray2;
140       short * narray3;
141       int top=endoffsets[ntuples-1];
142       *((int *)(node+len))=ntuples;
143       len += sizeof(int);
144       narray=(unsigned int *)(node+len);
145       narray2=(unsigned short *)(narray+ntuples);
146       narray3=(short *)(narray2+ntuples);
147       
148       for(i=0;i<ntuples;i++) {
149         narray[i]=oids[i];
150         narray2[i]=endoffsets[i];
151       }
152       for(i=0;i<top;i++) {
153         narray3[i]=arrayfields[i];
154       }
155       /* Lock and insert into primary prefetch queue */
156       pthread_mutex_lock(&pqueue.qlock);
157       pre_enqueue((prefetchqelem_t *)node);
158       pthread_cond_signal(&pqueue.qcond);
159       pthread_mutex_unlock(&pqueue.qlock);
160     }
161   }
162 }
163
164 /* This function starts up the transaction runtime. */
165 int dstmStartup(const char * option) {
166   pthread_t thread_Listen;
167   pthread_attr_t attr;
168   int master=option!=NULL && strcmp(option, "master")==0;
169   
170   if (processConfigFile() != 0)
171     return 0; //TODO: return error value, cause main program to exit
172 #ifdef COMPILER
173   if (!master)
174     threadcount--;
175 #endif
176   
177   //Initialize socket pool
178   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
179   transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
180   
181   dstmInit();
182   transInit();
183   
184   if (master) {
185     pthread_attr_init(&attr);
186     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
187     pthread_create(&thread_Listen, &attr, dstmListen, NULL);
188     return 1;
189   } else {
190     dstmListen();
191     return 0;
192   }
193 }
194
195 //TODO Use this later
196 void *pCacheAlloc(objstr_t *store, unsigned int size) {
197         void *tmp;
198         objstr_t *ptr;
199         ptr = store;
200         int success = 0;
201
202         while(ptr->next != NULL) {
203                 /* check if store is empty */
204                 if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
205                         tmp = ptr->top;
206                         ptr->top += size;
207                         success = 1;
208                         return tmp;
209                 } else {
210                         ptr = ptr-> next;
211                 }
212         }
213
214         if(success == 0) {
215                 return NULL;
216         }
217 }
218
219 /* This function initiates the prefetch thread
220  * A queue is shared between the main thread of execution
221  * and the prefetch thread to process the prefetch call
222  * Call from compiler populates the shared queue with prefetch requests while prefetch thread
223  * processes the prefetch requests */
224 void transInit() {
225         int t, rc;
226         int retval;
227         //Create and initialize prefetch cache structure
228         prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
229         //prefetchcache->next = objstrCreate(PREFETCH_CACHE_SIZE);
230         //prefetchcache->next->next = objstrCreate(PREFETCH_CACHE_SIZE);
231
232         /* Initialize attributes for mutex */
233         pthread_mutexattr_init(&prefetchcache_mutex_attr);
234         pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
235         
236         pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
237
238         //Create prefetch cache lookup table
239         if(prehashCreate(HASH_SIZE, LOADFACTOR))
240                 return; //Failure
241
242         //Initialize primary shared queue
243         queueInit();
244         //Initialize machine pile w/prefetch oids and offsets shared queue
245         mcpileqInit();
246
247         //Create the primary prefetch thread 
248         do {
249           retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
250         } while(retval!=0);
251         pthread_detach(tPrefetch);
252 }
253
254 /* This function stops the threads spawned */
255 void transExit() {
256         int t;
257         pthread_cancel(tPrefetch);
258         for(t = 0; t < NUM_THREADS; t++)
259                 pthread_cancel(wthreads[t]);
260
261         return;
262 }
263
/* Insert a wait delay in the order of milliseconds; mostly used when a
 * transaction commit is retried.
 *
 * The delay is 1,000,000 ns plus (current epoch seconds mod 10,000,000) ns,
 * i.e. between 1 and 11 msec.  The variable part comes from time(), so
 * calls made within the same second sleep for the same duration. */
void randomdelay()
{
        struct timespec delay;
        time_t now = time(NULL);

        delay.tv_nsec = (long)(1000000 + (now % 10000000)); //1-11 msec
        delay.tv_sec = 0;
        nanosleep(&delay, NULL);
}
277
278 /* This function initializes things required in the transaction start*/
279 transrecord_t *transStart()
280 {
281         transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
282         tmp->cache = objstrCreate(1048576);
283         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
284 #ifdef COMPILER
285         tmp->revertlist=NULL;
286 #endif
287         return tmp;
288 }
289
290 /* This function finds the location of the objects involved in a transaction
291  * and returns the pointer to the object if found in a remote location */
292 objheader_t *transRead(transrecord_t *record, unsigned int oid) {
293   unsigned int machinenumber;
294   objheader_t *tmp, *objheader;
295   objheader_t *objcopy;
296   int size, found = 0;
297   void *buf;
298   
299   if(oid == 0) {
300     printf("Error: %s, %d oid is NULL \n", __FILE__, __LINE__);
301     return NULL;
302   }
303   
304   /* Search local transaction cache */
305   if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
306 #ifdef COMPILER
307     return &objheader[1];
308 #else
309     return objheader;
310 #endif
311   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
312     /* Look up in machine lookup table  and copy  into cache*/
313     GETSIZE(size, objheader);
314     size += sizeof(objheader_t);
315     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
316     memcpy(objcopy, objheader, size);
317     /* Insert into cache's lookup table */
318     chashInsert(record->lookupTable, OID(objheader), objcopy); 
319 #ifdef COMPILER
320     return &objcopy[1];
321 #else
322     return objcopy;
323 #endif
324   } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
325     GETSIZE(size, tmp);
326     size+=sizeof(objheader_t);
327     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
328     memcpy(objcopy, tmp, size);
329     /* Insert into cache's lookup table */
330     chashInsert(record->lookupTable, OID(tmp), objcopy); 
331 #ifdef COMPILER
332     return &objcopy[1];
333 #else
334     return objcopy;
335 #endif
336   } else {
337     /* Get the object from the remote location */
338     if((machinenumber = lhashSearch(oid)) == 0) {
339       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
340       return NULL;
341     }
342     objcopy = getRemoteObj(record, machinenumber, oid);
343     
344     if(objcopy == NULL) {
345       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
346       return NULL;
347     } else {
348       
349 #ifdef COMPILER
350       return &objcopy[1];
351 #else
352       return objcopy;
353 #endif
354     }
355   }
356 }
357
358 /* This function creates objects in the transaction record */
359 objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
360 {
361         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
362         tmp->notifylist = NULL;
363         OID(tmp) = getNewOID();
364         tmp->version = 1;
365         tmp->rcount = 1;
366         STATUS(tmp) = NEW;
367         chashInsert(record->lookupTable, OID(tmp), tmp);
368
369 #ifdef COMPILER
370         return &tmp[1]; //want space after object header
371 #else
372         return tmp;
373 #endif
374 }
375
376 /* This function creates machine piles based on all machines involved in a
377  * transaction commit request */
378 plistnode_t *createPiles(transrecord_t *record) {
379         int i = 0;
380         unsigned int size;/* Represents number of bins in the chash table */
381         chashlistnode_t *curr, *ptr, *next;
382         plistnode_t *pile = NULL;
383         unsigned int machinenum;
384         void *localmachinenum;
385         objheader_t *headeraddr;
386
387         ptr = record->lookupTable->table;
388         size = record->lookupTable->size;
389
390         for(i = 0; i < size ; i++) {
391                 curr = &ptr[i];
392                 /* Inner loop to traverse the linked list of the cache lookupTable */
393                 while(curr != NULL) {
394                         //if the first bin in hash table is empty
395                         if(curr->key == 0) {
396                                 break;
397                         }
398                         next = curr->next;
399
400                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
401                                 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
402                                 return NULL;
403                         }
404
405                         //Get machine location for object id (and whether local or not)
406                         if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
407                                 machinenum = myIpAddr;
408                         } else  if ((machinenum = lhashSearch(curr->key)) == 0) {
409                                 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
410                                 return NULL;
411                         }
412
413                         //Make machine groups
414                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
415                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
416                                 return NULL;
417                         }
418
419                         curr = next;
420                 }
421         }
422         return pile; 
423 }
424
425 /* This function initiates the transaction commit process
426  * Spawns threads for each of the new connections with Participants 
427  * and creates new piles by calling the createPiles(), 
428  * Sends a transrequest() to each remote machines for objects found remotely 
429  * and calls handleLocalReq() to process objects found locally */
430 int transCommit(transrecord_t *record) {        
431         unsigned int tot_bytes_mod, *listmid;
432         plistnode_t *pile, *pile_ptr;
433         int i, j, rc, val;
434         int pilecount, offset, threadnum = 0, trecvcount = 0;
435         char control;
436         char transid[TID_LEN];
437         trans_req_data_t *tosend;
438         trans_commit_data_t transinfo;
439         static int newtid = 0;
440         char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
441         char localstat = 0;
442         thread_data_array_t *thread_data_array;
443         local_thread_data_array_t *ltdata;
444
445         do { 
446                 trecvcount = 0; 
447                 threadnum = 0; 
448                 treplyretry = 0;
449                 thread_data_array = NULL;
450                 ltdata = NULL;
451
452                 /* Look through all the objects in the transaction record and make piles 
453                  * for each machine involved in the transaction*/
454                 pile_ptr = pile = createPiles(record);
455
456                 /* Create the packet to be sent in TRANS_REQUEST */
457
458                 /* Count the number of participants */
459                 pilecount = pCount(pile);
460
461                 /* Create a list of machine ids(Participants) involved in transaction   */
462                 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
463                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
464                         return 1;
465                 }               
466                 pListMid(pile, listmid);
467
468
469                 /* Initialize thread variables,
470                  * Spawn a thread for each Participant involved in a transaction */
471                 pthread_t thread[pilecount];
472                 pthread_attr_t attr;                    
473                 pthread_cond_t tcond;
474                 pthread_mutex_t tlock;
475                 pthread_mutex_t tlshrd;
476
477                 if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
478                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
479                         pthread_cond_destroy(&tcond);
480                         pthread_mutex_destroy(&tlock);
481                         pDelete(pile_ptr);
482                         free(listmid);
483                         return 1;
484                 }
485
486                 if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
487                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
488                         pthread_cond_destroy(&tcond);
489                         pthread_mutex_destroy(&tlock);
490                         pDelete(pile_ptr);
491                         free(listmid);
492                         free(thread_data_array);
493                         return 1;
494                 }
495
496                 thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
497
498                 /* Initialize and set thread detach attribute */
499                 pthread_attr_init(&attr);
500                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
501                 pthread_mutex_init(&tlock, NULL);
502                 pthread_cond_init(&tcond, NULL);
503
504                 /* Process each machine pile */
505                 while(pile != NULL) {
506                         //Create transaction id
507                         newtid++;
508                         if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
509                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
510                                 pthread_cond_destroy(&tcond);
511                                 pthread_mutex_destroy(&tlock);
512                                 pDelete(pile_ptr);
513                                 free(listmid);
514                                 free(thread_data_array);
515                                 free(ltdata);
516                                 return 1;
517                         }
518                         tosend->f.control = TRANS_REQUEST;
519                         sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
520                         tosend->f.mcount = pilecount;
521                         tosend->f.numread = pile->numread;
522                         tosend->f.nummod = pile->nummod;
523                         tosend->f.numcreated = pile->numcreated;
524                         tosend->f.sum_bytes = pile->sum_bytes;
525                         tosend->listmid = listmid;
526                         tosend->objread = pile->objread;
527                         tosend->oidmod = pile->oidmod;
528                         tosend->oidcreated = pile->oidcreated;
529                         thread_data_array[threadnum].thread_id = threadnum;
530                         thread_data_array[threadnum].mid = pile->mid;
531                         thread_data_array[threadnum].buffer = tosend;
532                         thread_data_array[threadnum].recvmsg = rcvd_control_msg;
533                         thread_data_array[threadnum].threshold = &tcond;
534                         thread_data_array[threadnum].lock = &tlock;
535                         thread_data_array[threadnum].count = &trecvcount;
536                         thread_data_array[threadnum].replyctrl = &treplyctrl;
537                         thread_data_array[threadnum].replyretry = &treplyretry;
538                         thread_data_array[threadnum].rec = record;
539                         /* If local do not create any extra connection */
540                         if(pile->mid != myIpAddr) { /* Not local */
541                                 do {
542                                         rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
543                                 } while(rc!=0);
544                                 if(rc) {
545                                         perror("Error in pthread create\n");
546                                         pthread_cond_destroy(&tcond);
547                                         pthread_mutex_destroy(&tlock);
548                                         pDelete(pile_ptr);
549                                         free(listmid);
550                                         for (i = 0; i < threadnum; i++)
551                                                 free(thread_data_array[i].buffer);
552                                         free(thread_data_array);
553                                         free(ltdata);
554                                         return 1;
555                                 }
556                         } else { /*Local*/
557                                 ltdata->tdata = &thread_data_array[threadnum];
558                                 ltdata->transinfo = &transinfo;
559                                 do {
560                                         val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
561                                 } while(val!=0);
562                                 if(val) {
563                                         perror("Error in pthread create\n");
564                                         pthread_cond_destroy(&tcond);
565                                         pthread_mutex_destroy(&tlock);
566                                         pDelete(pile_ptr);
567                                         free(listmid);
568                                         for (i = 0; i < threadnum; i++)
569                                                 free(thread_data_array[i].buffer);
570                                         free(thread_data_array);
571                                         free(ltdata);
572                                         return 1;
573                                 }
574                         }
575
576                         threadnum++;            
577                         pile = pile->next;
578                 }
579                 /* Free attribute and wait for the other threads */
580                 pthread_attr_destroy(&attr);
581
582                 for (i = 0; i < threadnum; i++) {
583                         rc = pthread_join(thread[i], NULL);
584                         if(rc)
585                         {
586                                 printf("Error: return code from pthread_join() is %d\n", rc);
587                                 pthread_cond_destroy(&tcond);
588                                 pthread_mutex_destroy(&tlock);
589                                 pDelete(pile_ptr);
590                                 free(listmid);
591                                 for (j = i; j < threadnum; j++) {
592                                         free(thread_data_array[j].buffer);
593                                 }
594                                 return 1;
595                         }
596                         free(thread_data_array[i].buffer);
597                 }
598
599                 /* Free resources */    
600                 pthread_cond_destroy(&tcond);
601                 pthread_mutex_destroy(&tlock);
602                 free(listmid);
603                 pDelete(pile_ptr);
604
605                 /* wait a random amount of time before retrying to commit transaction*/
606                 if(treplyretry == 1) {
607                         free(thread_data_array);
608                         free(ltdata);
609                         randomdelay();
610                 }
611
612         /* Retry trans commit procedure during soft_abort case */
613         } while (treplyretry == 1);
614
615
616         if(treplyctrl == TRANS_ABORT) {
617                 /* Free Resources */
618                 objstrDelete(record->cache);
619                 chashDelete(record->lookupTable);
620                 free(record);
621                 free(thread_data_array);
622                 free(ltdata);
623                 return TRANS_ABORT;
624         } else if(treplyctrl == TRANS_COMMIT) {
625                 /* Free Resources */
626                 objstrDelete(record->cache);
627                 chashDelete(record->lookupTable);
628                 free(record);
629                 free(thread_data_array);
630                 free(ltdata);
631                 return 0;
632         } else {
633                 //TODO Add other cases
634                 printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
635                 exit(-1);
636         }
637         return 0;
638 }
639
640 /* This function sends information involved in the transaction request 
641  * to participants and accepts a response from particpants.
642  * It calls decideresponse() to decide on what control message 
643  * to send next to participants and sends the message using sendResponse()*/
644 void *transRequest(void *threadarg) {
645         int sd, i, n;
646         struct sockaddr_in serv_addr;
647         thread_data_array_t *tdata;
648         objheader_t *headeraddr;
649         char control, recvcontrol;
650         char machineip[16], retval;
651
652         tdata = (thread_data_array_t *) threadarg;
653
654         /* Send Trans Request */
655         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
656                 perror("Error in socket for TRANS_REQUEST\n");
657                 pthread_exit(NULL);
658         }
659         bzero((char*) &serv_addr, sizeof(serv_addr));
660         serv_addr.sin_family = AF_INET;
661         serv_addr.sin_port = htons(LISTEN_PORT);
662         midtoIP(tdata->mid,machineip);
663         machineip[15] = '\0';
664         serv_addr.sin_addr.s_addr = inet_addr(machineip);
665         /* Open Connection */
666         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
667                 perror("Error in connect for TRANS_REQUEST\n");
668                 close(sd);
669                 pthread_exit(NULL);
670         }
671
672         /* Send bytes of data with TRANS_REQUEST control message */
673         send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
674         
675         /* Send list of machines involved in the transaction */
676         {
677                 int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
678                 send_data(sd, tdata->buffer->listmid, size);
679         }
680
681         /* Send oids and version number tuples for objects that are read */
682         {
683                 int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
684                 send_data(sd, tdata->buffer->objread, size);
685         }
686
687         /* Send objects that are modified */
688         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
689                 int size;
690                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
691                 GETSIZE(size,headeraddr);
692                 size+=sizeof(objheader_t);
693                 send_data(sd, headeraddr, size);
694         }
695
696         /* Read control message from Participant */
697         recv_data(sd, &control, sizeof(char));
698         recvcontrol = control;
699
700         /* Update common data structure and increment count */
701         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
702
703         /* Lock and update count */
704         /* Thread sleeps until all messages from pariticipants are received by coordinator */
705         pthread_mutex_lock(tdata->lock);
706
707         (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
708
709         /* Wake up the threads and invoke decideResponse (once) */
710         if(*(tdata->count) == tdata->buffer->f.mcount) {
711                 decideResponse(tdata); 
712                 pthread_cond_broadcast(tdata->threshold);
713         } else {
714                 pthread_cond_wait(tdata->threshold, tdata->lock);
715         }
716         pthread_mutex_unlock(tdata->lock);
717
718         /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
719          * to all participants in their respective socket */
720         if (sendResponse(tdata, sd) == 0) { 
721                 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
722                 close(sd);
723                 pthread_exit(NULL);
724         }
725
726         recv_data((int)sd, &control, sizeof(char)); 
727
728         if(control == TRANS_UNSUCESSFUL) {
729                 //printf("DEBUG-> TRANS_ABORTED\n");
730         } else if(control == TRANS_SUCESSFUL) {
731                 //printf("DEBUG-> TRANS_SUCCESSFUL\n");
732         } else {
733                 //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
734         }
735
736         /* Close connection */
737         close(sd);
738         pthread_exit(NULL);
739 }
740
741 /* This function decides the reponse that needs to be sent to 
742  * all Participant machines after the TRANS_REQUEST protocol */
743 void decideResponse(thread_data_array_t *tdata) {
744         char control;
745         int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
746                                                                          message to send */
747
748         for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
749                 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
750                                                            written onto the shared array */
751                 switch(control) {
752                         default:
753                                 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
754                                 /* treat as disagree, pass thru */
755                         case TRANS_DISAGREE:
756                                 transdisagree++;
757                                 break;
758
759                         case TRANS_AGREE:
760                                 transagree++;
761                                 break;
762
763                         case TRANS_SOFT_ABORT:
764                                 transsoftabort++;
765                                 break;
766                 }
767         }
768
769         if(transdisagree > 0) {
770                 /* Send Abort */
771                 *(tdata->replyctrl) = TRANS_ABORT;
772                 *(tdata->replyretry) = 0;
773                 /* clear objects from prefetch cache */
774                 for (i = 0; i < tdata->buffer->f.numread; i++)
775                         prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
776                 for (i = 0; i < tdata->buffer->f.nummod; i++)
777                         prehashRemove(tdata->buffer->oidmod[i]);
778         } else if(transagree == tdata->buffer->f.mcount){
779                 /* Send Commit */
780                 *(tdata->replyctrl) = TRANS_COMMIT;
781                 *(tdata->replyretry) = 0;
782         } else { 
783                 /* Send Abort in soft abort case followed by retry commiting transaction again*/
784                 *(tdata->replyctrl) = TRANS_ABORT;
785                 *(tdata->replyretry) = 1;
786         }
787
788         return;
789 }
790 /* This function sends the final response to remote machines per thread in their respective socket id 
791  * It returns a char that is only needed to check the correctness of execution of this function inside
792  * transRequest()*/
793 char sendResponse(thread_data_array_t *tdata, int sd) {
794         int n, size, sum, oidcount = 0, control;
795         char *ptr, retval = 0;
796         unsigned int *oidnotfound;
797
798         control = *(tdata->replyctrl);
799         send_data(sd, &control, sizeof(char));
800
801         //TODO read missing objects during object migration
802         /* If response is a soft abort due to missing objects at the Participant's side */
803         /*
804         if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
805                 // Read list of objects missing  
806                 recv_data(sd, &oidcount, sizeof(int));
807                 //if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
808                 if(oidcount != 0) {
809                         size = oidcount * sizeof(unsigned int);
810                         if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
811                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
812                                 return 0;
813                         }
814                         ptr = (char *) oidnotfound;
815                         recv_data(sd, ptr, size);
816                 }
817                 retval =  TRANS_SOFT_ABORT;
818         }
819         */
820
821         /* If the decided response is TRANS_ABORT */
822         if(*(tdata->replyctrl) == TRANS_ABORT) {
823                 retval = TRANS_ABORT;
824         } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ 
825                 retval = TRANS_COMMIT;
826         }
827         
828         return retval;
829 }
830
831 /* This function opens a connection, places an object read request to the 
832  * remote machine, reads the control message and object if available  and 
833  * copies the object and its header to the local cache.
834  * */ 
835
836 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
837   int size, val;
838   struct sockaddr_in serv_addr;
839   char machineip[16];
840   char control;
841   objheader_t *h;
842   void *objcopy = NULL;
843   
844   int sd = getSock2(transReadSockPool, mnum);
845   char readrequest[sizeof(char)+sizeof(unsigned int)];
846   readrequest[0] = READ_REQUEST;
847   *((unsigned int *)(&readrequest[1])) = oid;
848   send_data(sd, readrequest, sizeof(readrequest));
849   
850   /* Read response from the Participant */
851   recv_data(sd, &control, sizeof(char));
852   
853   if (control==OBJECT_NOT_FOUND) {
854     objcopy = NULL;
855   } else {
856     /* Read object if found into local cache */
857     recv_data(sd, &size, sizeof(int));
858     objcopy = objstrAlloc(record->cache, size);
859     recv_data(sd, objcopy, size);
860     
861     /* Insert into cache's lookup table */
862     chashInsert(record->lookupTable, oid, objcopy); 
863   }
864   
865   //    freeSock(transReadSockPool, mnum, sd);
866   
867   return objcopy;
868 }
869
870 /* This function handles the local objects involved in a transaction commiting process.
871  * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
872  * Note Coordinator = local machine
873  * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
874  * based on common agreement it either commits or aborts the transaction.
875  * It also frees the memory resources */
876 void *handleLocalReq(void *threadarg) {
877         unsigned int *oidnotfound = NULL, *oidlocked = NULL;
878         local_thread_data_array_t *localtdata;
879         int objnotfound = 0, objlocked = 0; 
880         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
881         int numread, i;
882         unsigned int oid;
883         unsigned short version;
884         void *mobj;
885         objheader_t *headptr;
886
887         localtdata = (local_thread_data_array_t *) threadarg;
888
889         /* Counters and arrays to formulate decision on control message to be sent */
890         oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
891         oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
892
893         numread = localtdata->tdata->buffer->f.numread;
894         /* Process each oid in the machine pile/ group per thread */
895         for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
896                 if (i < localtdata->tdata->buffer->f.numread) {
897                         int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
898                         incr *= i;
899                         oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
900                         version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
901                 } else { // Objects Modified
902                         int tmpsize;
903                         headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
904                         if (headptr == NULL) {
905                                 printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
906                                 return NULL;
907                         }
908                         oid = OID(headptr);
909                         version = headptr->version;
910                 }
911                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
912
913                 /* Save the oids not found and number of oids not found for later use */
914                 if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
915                         /* Save the oids not found and number of oids not found for later use */
916                         oidnotfound[objnotfound] = oid;
917                         objnotfound++;
918                 } else { /* If Obj found in machine (i.e. has not moved) */
919                         /* Check if Obj is locked by any previous transaction */
920                         if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
921                                 if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
922                                         v_matchlock++;
923                                 } else {/* If versions don't match ...HARD ABORT */
924                                         v_nomatch++;
925                                         /* Send TRANS_DISAGREE to Coordinator */
926                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
927                                 }
928                         } else {/* If Obj is not locked then lock object */
929                                 STATUS(((objheader_t *)mobj)) |= LOCK;
930                                 /* Save all object oids that are locked on this machine during this transaction request call */
931                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
932                                 objlocked++;
933                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
934                                         v_matchnolock++;
935                                 } else { /* If versions don't match ...HARD ABORT */
936                                         v_nomatch++;
937                                         /* Send TRANS_DISAGREE to Coordinator */
938                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
939                                 }
940                         }
941                 }
942         } // End for
943         /* Condition to send TRANS_AGREE */
944         if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
945                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
946         }
947         /* Condition to send TRANS_SOFT_ABORT */
948         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
949                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
950         }
951
952         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
953          * if Participant receives a TRANS_COMMIT */
954         localtdata->transinfo->objlocked = oidlocked;
955         localtdata->transinfo->objnotfound = oidnotfound;
956         localtdata->transinfo->modptr = NULL;
957         localtdata->transinfo->numlocked = objlocked;
958         localtdata->transinfo->numnotfound = objnotfound;
959         /* Lock and update count */
960         //Thread sleeps until all messages from pariticipants are received by coordinator
961         pthread_mutex_lock(localtdata->tdata->lock);
962         (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
963
964         /* Wake up the threads and invoke decideResponse (once) */
965         if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
966                 decideResponse(localtdata->tdata); 
967                 pthread_cond_broadcast(localtdata->tdata->threshold);
968         } else {
969                 pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
970         }
971         pthread_mutex_unlock(localtdata->tdata->lock);
972         if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
973                 if(transAbortProcess(localtdata) != 0) {
974                         printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
975                         pthread_exit(NULL);
976                 }
977         } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
978                 if(transComProcess(localtdata) != 0) {
979                         printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
980                         pthread_exit(NULL);
981                 }
982         }
983         /* Free memory */
984         if (localtdata->transinfo->objlocked != NULL) {
985                 free(localtdata->transinfo->objlocked);
986         }
987         if (localtdata->transinfo->objnotfound != NULL) {
988                 free(localtdata->transinfo->objnotfound);
989         }
990
991         pthread_exit(NULL);
992 }
993
994 /* This function completes the ABORT process if the transaction is aborting */
995 int transAbortProcess(local_thread_data_array_t  *localtdata) {
996         int i, numlocked;
997         unsigned int *objlocked;
998         void *header;
999
1000         numlocked = localtdata->transinfo->numlocked;
1001         objlocked = localtdata->transinfo->objlocked;
1002
1003         for (i = 0; i < numlocked; i++) {
1004                 if((header = mhashSearch(objlocked[i])) == NULL) {
1005                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1006                         return 1;
1007                 }
1008                 STATUS(((objheader_t *)header)) &= ~(LOCK);
1009         }
1010
1011         return 0;
1012 }
1013
1014 /*This function completes the COMMIT process is the transaction is commiting*/
1015 int transComProcess(local_thread_data_array_t  *localtdata) {
1016         objheader_t *header, *tcptr;
1017         int i, nummod, tmpsize, numcreated, numlocked;
1018         unsigned int *oidmod, *oidcreated, *oidlocked;
1019         void *ptrcreate;
1020
1021         nummod = localtdata->tdata->buffer->f.nummod;
1022         oidmod = localtdata->tdata->buffer->oidmod;
1023         numcreated = localtdata->tdata->buffer->f.numcreated;
1024         oidcreated = localtdata->tdata->buffer->oidcreated;
1025         numlocked = localtdata->transinfo->numlocked;
1026         oidlocked = localtdata->transinfo->objlocked;
1027
1028         for (i = 0; i < nummod; i++) {
1029                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1030                         printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1031                         return 1;
1032                 }
1033                 /* Copy from transaction cache -> main object store */
1034                 if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
1035                         printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1036                         return 1;
1037                 }
1038                 GETSIZE(tmpsize, header);
1039                 pthread_mutex_lock(&mainobjstore_mutex);
1040                 memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize);
1041                 header->version += 1;
1042                 if(header->notifylist != NULL) {
1043                         notifyAll(&header->notifylist, OID(header), header->version);
1044                 }
1045                 pthread_mutex_unlock(&mainobjstore_mutex);
1046         }
1047         /* If object is newly created inside transaction then commit it */
1048         for (i = 0; i < numcreated; i++) {
1049                 if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
1050                         printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1051                         return 1;
1052                 }
1053                 GETSIZE(tmpsize, header);
1054                 tmpsize += sizeof(objheader_t);
1055                 pthread_mutex_lock(&mainobjstore_mutex);
1056                 if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
1057                         printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1058                         pthread_mutex_unlock(&mainobjstore_mutex);
1059                         return 1;
1060                 }
1061                 pthread_mutex_unlock(&mainobjstore_mutex);
1062                 memcpy(ptrcreate, header, tmpsize);
1063                 mhashInsert(oidcreated[i], ptrcreate);
1064                 lhashInsert(oidcreated[i], myIpAddr);
1065         }
1066         /* Unlock locked objects */
1067         for(i = 0; i < numlocked; i++) {
1068                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1069                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1070                         return 1;
1071                 }
1072                 STATUS(header) &= ~(LOCK);
1073         }
1074
1075         return 0;
1076 }
1077
1078 /* This function checks if the prefetch oids are same and have same offsets  
1079  * for case x.a.b and y.a.b where x and y have same oid's
1080  * or if a.b.c is a subset of x.b.c.d*/ 
1081 /* check for case where the generated request a.y.z or x.y.z.g then 
1082  * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
1083 void checkPrefetchTuples(prefetchqelem_t *node) {
1084         int i,j, count,k, sindex, index;
1085         char *ptr, *tmp;
1086         int ntuples, slength;
1087         unsigned int *oid;
1088         unsigned short *endoffsets;
1089         short *offsets; 
1090
1091         /* Check for the case x.y.z and a.b.c are same oids */ 
1092         ptr = (char *) node;
1093         ntuples = *(GET_NTUPLES(ptr));
1094         oid = GET_PTR_OID(ptr);
1095         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1096         offsets = GET_PTR_ARRYFLD(ptr, ntuples);
1097         
1098         /* Find offset length for each tuple */
1099         int numoffset[ntuples];
1100         numoffset[0] = endoffsets[0];
1101         for(i = 1; i<ntuples; i++) {
1102                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1103         }
1104         /* Check for redundant tuples by comparing oids of each tuple */
1105         for(i = 0; i < ntuples; i++) {
1106                 if(oid[i] == 0)
1107                         continue;
1108                 for(j = i+1 ; j < ntuples; j++) {
1109                         if(oid[j] == 0)
1110                                 continue;
1111                         /*If oids of tuples match */ 
1112                         if (oid[i] == oid[j]) {
1113                                 /* Find the smallest offset length of two tuples*/
1114                                 if(numoffset[i] >  numoffset[j]){
1115                                         slength = numoffset[j];
1116                                         sindex = j;
1117                                 }
1118                                 else {
1119                                         slength = numoffset[i];
1120                                         sindex = i;
1121                                 }
1122
1123                                 /* Compare the offset values based on the current indices
1124                                  * break if they do not match
1125                                  * if all offset values match then pick the largest tuple*/
1126
1127                                 if(i == 0) {
1128                                         k = 0;
1129                                 } else {
1130                                         k = endoffsets[i-1];
1131                                 }
1132                                 index = endoffsets[j -1];
1133                                 for(count = 0; count < slength; count ++) {
1134                                         if (offsets[k] != offsets[index]) { 
1135                                                 break;
1136                                         }
1137                                         index++;
1138                                         k++;
1139                                 }       
1140                                 if(slength == count) {
1141                                         oid[sindex] = 0;
1142                                 }
1143                         }
1144                 }
1145         }
1146 }
1147 /* This function makes machine piles to be added into the machine pile queue for each prefetch call */
1148 prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
1149         char *ptr;
1150         int ntuples, i, machinenum, count=0;
1151         unsigned int *oid;
1152         unsigned short *endoffsets;
1153         short *arryfields, *offset; 
1154         prefetchpile_t *head = NULL, *tmp = NULL;
1155
1156         /* Check for the case x.y.z and a.b.c are same oids */ 
1157         ptr = (char *) node;
1158         ntuples = *(GET_NTUPLES(ptr));
1159         oid = GET_PTR_OID(ptr);
1160         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1161         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1162
1163         if((head = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
1164                 printf("Calloc error: %s %d\n", __FILE__, __LINE__);
1165                 return NULL;
1166         }
1167
1168         /* Check for redundant tuples by comparing oids of each tuple */
1169         for(i = 0; i < ntuples; i++) {
1170                 if(oid[i] == 0){
1171                         if(head->next != NULL) {
1172                                 if((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
1173                                         printf("Calloc error: %s %d\n", __FILE__, __LINE__);
1174                                         return NULL;
1175                                 }
1176                                 tmp->mid = myIpAddr;
1177                                 tmp->next = head;
1178                                 head = tmp;
1179                         } else {
1180                                 head->mid = myIpAddr;
1181                         }
1182                         continue;
1183                 }
1184                 /* For each tuple make piles */
1185                 if ((machinenum = lhashSearch(oid[i])) == 0) {
1186                         printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
1187                         return NULL;
1188                 }
1189                 /* Insert into machine pile */
1190                 if(i == 0){
1191                         offset = &arryfields[0];
1192                 } else {
1193                         offset = &arryfields[endoffsets[i-1]];
1194                 }
1195
1196                 if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){
1197                         printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__);
1198                         return NULL;
1199                 }
1200         }
1201
1202         return head;
1203 }
1204
1205 prefetchpile_t *foundLocal(prefetchqelem_t *node) {
1206         int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val;
1207         unsigned int *oid;
1208         int isArray;
1209         char *ptr, *tmp;
1210         objheader_t *objheader;
1211         unsigned short *endoffsets;
1212         short *arryfields; 
1213
1214         ptr = (char *) node;
1215         ntuples = *(GET_NTUPLES(ptr));
1216         oid = GET_PTR_OID(ptr);
1217         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1218         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1219
1220         /* Find offset length for each tuple */
1221         int numoffset[ntuples];//Number of offsets for each tuple
1222         numoffset[0] = endoffsets[0];
1223         for(i = 1; i<ntuples; i++) {
1224                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1225         }
1226
1227         for(i = 0; i < ntuples; i++) { 
1228                 if(oid[i] == 0){
1229                         if(i == 0) {
1230                                 arryfieldindex = 0;
1231                                 nextarryfieldindex =  endoffsets[0];
1232                         }else {
1233                                 arryfieldindex = endoffsets[i-1];
1234                                 nextarryfieldindex =  endoffsets[i];
1235                         }
1236                         numoffset[i] = 0;
1237                         endoffsets[0] = val = numoffset[0];
1238                         for(k = 1; k < ntuples; k++) {
1239                                 val = val + numoffset[k];
1240                                 endoffsets[k] = val; 
1241                         }
1242                         
1243                         for(k = 0; k<endoffsets[ntuples-1]; k++) {
1244                                 arryfields[arryfieldindex+k] = arryfields[nextarryfieldindex+k];
1245                         }
1246                         continue;
1247                 }
1248
1249                 /* If object found locally */
1250                 if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
1251                         isArray = 0;
1252                         tmp = (char *) objheader;
1253                         int orgnumoffset = numoffset[i];
1254                         if(i == 0) {
1255                                 arryfieldindex = 0;
1256                         }else {
1257                                 arryfieldindex = endoffsets[i-1];
1258                         }
1259
1260                         for(j = 0; j<orgnumoffset; j++) {
1261                                 unsigned int objoid = 0;
1262                                 /* Check for arrays  */
1263                                 if(TYPE(objheader) > NUMCLASSES) {
1264                                         isArray = 1;
1265                                 }
1266                                 if(isArray == 1) {
1267                                         int elementsize = classsize[TYPE(objheader)];
1268                                         struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t));
1269                                         int length = ao->___length___;
1270                                         /* Check if array out of bounds */
1271                                         if(arryfields[arryfieldindex] < 0 || arryfields[arryfieldindex] >= length) {
1272                                                 break; //if yes then treat the object as found 
1273                                         }
1274                                         objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
1275                                 } else {
1276                                         objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
1277                                 }
1278                                 //Update numoffset array
1279                                 numoffset[i] = numoffset[i] - 1;
1280                                 //Update oid array
1281                                 oid[i] = objoid;
1282                                 //Update endoffset array
1283                                 endoffsets[0] = val = numoffset[0];
1284                                 for(k = 1; k < ntuples; k++) {
1285                                         val = val + numoffset[k];
1286                                         endoffsets[k] = val; 
1287                                 }
1288                                 //Update arrayfields array
1289                                 for(k = 0; k < endoffsets[ntuples-1]; k++) {
1290                                         arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1];
1291                                 }
1292                                 if((objheader = (objheader_t*) mhashSearch(oid[i])) == NULL) {
1293                                         flag = 1;
1294                                         checkPreCache(node, numoffset, oid[i], i); 
1295                                         break;
1296                                 }
1297                                 tmp = (char *) objheader;
1298                                 isArray = 0;
1299                         }
1300                         /*If all offset oids are found locally,make the prefetch tuple invalid */
1301                         if(flag == 0) {
1302                                 oid[i] = 0;
1303                         }
1304                 } else {
1305                         /* Look in Prefetch cache */
1306                         checkPreCache(node, numoffset, oid[i],i); 
1307                 }
1308                 flag = 0;
1309         }
1310         
1311         /* Make machine groups */
1312         prefetchpile_t *head = NULL;
1313         if((head = makePreGroups(node, numoffset)) == NULL) {
1314                 printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__);
1315                 return NULL;
1316         }
1317
1318         return head;
1319 }
1320
1321 void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, int index) {
1322         char *ptr, *tmp;
1323         int ntuples, i, k, flag=0, isArray =0, arryfieldindex, val;
1324         unsigned int * oid;
1325         unsigned short *endoffsets;
1326         short *arryfields;
1327         objheader_t *header;
1328
1329         ptr = (char *) node;
1330         ntuples = *(GET_NTUPLES(ptr));
1331         oid = GET_PTR_OID(ptr);
1332         endoffsets = GET_PTR_EOFF(ptr, ntuples);
1333         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1334
1335         if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
1336                 return;
1337         } else { //Found in Prefetch Cache
1338                 //TODO Decide if object is too old, if old remove from cache
1339                 tmp = (char *) header;
1340                 int loopcount = numoffset[index];       
1341                 if(index == 0)
1342                         arryfieldindex = 0;
1343                 else
1344                         arryfieldindex = endoffsets[(index - 1)];
1345                 // Check if any of the offset oid is available in the Prefetch cache
1346                 for(i = 0; i < loopcount; i++) {
1347                         /* Check for arrays  */
1348                         if(TYPE(header) > NUMCLASSES) {
1349                                 isArray = 1;
1350                         }
1351                 
1352                         if(isArray == 1) {
1353                                 int elementsize = classsize[TYPE(header)];
1354                                 struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t));
1355                                 int length = ao->___length___;
1356                                 /* Check if array out of bounds */
1357                                 if(arryfields[arryfieldindex] < 0 || arryfields[arryfieldindex] >= length) {
1358                                         break; //if yes treat the object as found
1359                                 }
1360                                 objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
1361                         } else {
1362                                 objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
1363                         }
1364                         //Update numoffset array
1365                         numoffset[index] = numoffset[index] - 1;
1366                         //Update oid array
1367                         oid[index] = objoid;
1368                         //Update endoffset array
1369                         endoffsets[0] = val = numoffset[0];
1370                         for(k = 1; k < ntuples; k++) {
1371                                 val = val + numoffset[k];
1372                                 endoffsets[k] = val; 
1373                         }
1374                         //Update arrayfields array
1375                         for(k = 0; k < endoffsets[ntuples-1]; k++) {
1376                                 arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1];
1377                         }
1378                         if((header = (objheader_t *)prehashSearch(oid[index])) != NULL) {
1379                                 tmp = (char *) header;
1380                                 isArray = 0;
1381                         } else {
1382                                 flag = 1;
1383                                 break;
1384                         }
1385                 }
1386         }
1387         //Found in the prefetch cache
1388         if(flag == 0 && (numoffset[index] == 0)) {
1389                 oid[index] = 0;
1390         }
1391 }
1392
1393
1394
1395 /* This function is called by the thread calling transPrefetch */
1396 void *transPrefetch(void *t) {
1397   while(1) {
1398     /* lock mutex of primary prefetch queue */
1399     pthread_mutex_lock(&pqueue.qlock);
1400     /* while primary queue is empty, then wait */
1401     while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
1402       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
1403     }
1404     
1405     /* dequeue node to create a machine piles and  finally unlock mutex */
1406     prefetchqelem_t *qnode;
1407     if((qnode = pre_dequeue()) == NULL) {
1408       printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
1409       pthread_mutex_unlock(&pqueue.qlock);
1410       continue;
1411     }
1412     pthread_mutex_unlock(&pqueue.qlock);
1413     
1414     /* Reduce redundant prefetch requests */
1415     checkPrefetchTuples(qnode);
1416     /* Check if the tuples are found locally, if yes then reduce them further*/ 
1417     /* and group requests by remote machine ids by calling the makePreGroups() */
1418     prefetchpile_t *pilehead = foundLocal(qnode);
1419     
1420     // Get sock from shared pool 
1421     int sd = getSock2(transPrefetchSockPool, pilehead->mid);
1422     
1423     /* Send  Prefetch Request */
1424     prefetchpile_t *ptr = pilehead;
1425     while(ptr != NULL) {
1426       sendPrefetchReq(ptr, sd);
1427       ptr = ptr->next; 
1428     }
1429     
1430     /* Release socket */
1431     //  freeSock(transPrefetchSockPool, pilehead->mid, sd);
1432     
1433     /* Deallocated pilehead */
1434     mcdealloc(pilehead);
1435     
1436     // Deallocate the prefetch queue pile node
1437     predealloc(qnode);
1438   }
1439 }
1440
1441 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1442   int off, len, endpair, count = 0;
1443   char control;
1444   objpile_t *tmp;
1445   
1446   /* Send TRANS_PREFETCH control message */
1447   control = TRANS_PREFETCH;
1448   send_data(sd, &control, sizeof(char));
1449   
1450   /* Send Oids and offsets in pairs */
1451   tmp = mcpilenode->objpiles;
1452   while(tmp != NULL) {
1453     off = 0;
1454     count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
1455     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1456     char oidnoffset[len];
1457     bzero(oidnoffset, len);
1458     *((int*)oidnoffset) = len;
1459     off = sizeof(int);
1460     *((unsigned int *)(oidnoffset + off)) = tmp->oid;
1461     off += sizeof(unsigned int);
1462     *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
1463     off += sizeof(unsigned int);
1464     int i;
1465     for(i = 0; i < tmp->numoffset; i++) {
1466       *((short*)(oidnoffset + off)) = tmp->offset[i];
1467       off+=sizeof(short);
1468     }
1469     send_data(sd, oidnoffset, len);
1470     tmp = tmp->next;
1471   }
1472   
1473   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1474   endpair = -1;
1475   send_data(sd, &endpair, sizeof(int));
1476   
1477   return;
1478 }
1479
1480 int getPrefetchResponse(int sd) {
1481         int numbytes = 0, length = 0, size = 0;
1482         char *recvbuffer, control;
1483         unsigned int oid;
1484         void *modptr, *oldptr;
1485
1486         recv_data((int)sd, &length, sizeof(int)); 
1487         size = length - sizeof(int);
1488         if((recvbuffer = calloc(1, size)) == NULL) {
1489                 printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
1490                 return -1;
1491         }
1492
1493         recv_data((int)sd, recvbuffer, size);
1494
1495         control = *((char *) recvbuffer);
1496         if(control == OBJECT_FOUND) {
1497                 numbytes = 0;
1498                 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1499                 size = size - (sizeof(char) + sizeof(unsigned int));
1500                 pthread_mutex_lock(&prefetchcache_mutex);
1501                 if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
1502                         printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1503                         pthread_mutex_unlock(&prefetchcache_mutex);
1504                         free(recvbuffer);
1505                         return -1;
1506                 }
1507                 pthread_mutex_unlock(&prefetchcache_mutex);
1508                 memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1509
1510                 /* Insert the oid and its address into the prefetch hash lookup table */
1511                 /* Do a version comparison if the oid exists */
1512                 if((oldptr = prehashSearch(oid)) != NULL) {
1513                         /* If older version then update with new object ptr */
1514                         if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1515                                 prehashRemove(oid);
1516                                 prehashInsert(oid, modptr);
1517                         } else {
1518                                 /* TODO modptr should be reference counted */
1519                         }
1520                 } else {/* Else add the object ptr to hash table*/
1521                         prehashInsert(oid, modptr);
1522                 }
1523                 /* Lock the Prefetch Cache look up table*/
1524                 pthread_mutex_lock(&pflookup.lock);
1525                 /* Broadcast signal on prefetch cache condition variable */ 
1526                 pthread_cond_broadcast(&pflookup.cond);
1527                 /* Unlock the Prefetch Cache look up table*/
1528                 pthread_mutex_unlock(&pflookup.lock);
1529         } else if(control == OBJECT_NOT_FOUND) {
1530                 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1531                 /* TODO: For each object not found query DHT for new location and retrieve the object */
1532                 /* Throw an error */
1533                 printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1534                 free(recvbuffer);
1535                 exit(-1);
1536         } else {
1537                 printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1538         }
1539
1540         free(recvbuffer);
1541
1542         return 0;
1543 }
1544
1545 unsigned short getObjType(unsigned int oid)
1546 {
1547         objheader_t *objheader;
1548         unsigned short numoffset[] ={0};
1549         short fieldoffset[] ={};
1550
1551         if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL)
1552         {
1553                 if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1554                 {
1555                         prefetch(1, &oid, numoffset, fieldoffset);
1556                         pthread_mutex_lock(&pflookup.lock);
1557                         while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1558                         {
1559                                 pthread_cond_wait(&pflookup.cond, &pflookup.lock);
1560                         }
1561                         pthread_mutex_unlock(&pflookup.lock);
1562                 }
1563         }
1564
1565         return TYPE(objheader);
1566 }
1567
1568 int startRemoteThread(unsigned int oid, unsigned int mid)
1569 {
1570         int sock;
1571         struct sockaddr_in remoteAddr;
1572         char msg[1 + sizeof(unsigned int)];
1573         int bytesSent;
1574         int status;
1575
1576         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1577         {
1578                 perror("startRemoteThread():socket()");
1579                 return -1;
1580         }
1581
1582         bzero(&remoteAddr, sizeof(remoteAddr));
1583         remoteAddr.sin_family = AF_INET;
1584         remoteAddr.sin_port = htons(LISTEN_PORT);
1585         remoteAddr.sin_addr.s_addr = htonl(mid);
1586         
1587         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
1588         {
1589                 printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1590                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1591                 status = -1;
1592         }
1593         else
1594         {
1595                 msg[0] = START_REMOTE_THREAD;
1596         *((unsigned int *) &msg[1]) = oid;
1597                 send_data(sock, msg, 1 + sizeof(unsigned int));
1598         }
1599
1600         close(sock);
1601         return status;
1602 }
1603
1604 //TODO: when reusing oids, make sure they are not already in use!
1605 unsigned int getNewOID(void) {
1606         static unsigned int id = 0xFFFFFFFF;
1607         
1608         id += 2;
1609         if (id > oidMax || id < oidMin)
1610         {
1611                 id = (oidMin | 1);
1612         }
1613         return id;
1614 }
1615
1616 int processConfigFile()
1617 {
1618         FILE *configFile;
1619         const int maxLineLength = 200;
1620         char lineBuffer[maxLineLength];
1621         char *token;
1622         const char *delimiters = " \t\n";
1623         char *commentBegin;
1624         in_addr_t tmpAddr;
1625         
1626         configFile = fopen(CONFIG_FILENAME, "r");
1627         if (configFile == NULL)
1628         {
1629                 printf("error opening %s:\n", CONFIG_FILENAME);
1630                 perror("");
1631                 return -1;
1632         }
1633
1634         numHostsInSystem = 0;
1635         sizeOfHostArray = 8;
1636         hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1637         
1638         while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
1639         {
1640                 commentBegin = strchr(lineBuffer, '#');
1641                 if (commentBegin != NULL)
1642                         *commentBegin = '\0';
1643                 token = strtok(lineBuffer, delimiters);
1644                 while (token != NULL)
1645                 {
1646                         tmpAddr = inet_addr(token);
1647                         if ((int)tmpAddr == -1)
1648                         {
1649                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1650                                 fclose(configFile);
1651                                 return -1;
1652                         }
1653                         else
1654                                 addHost(htonl(tmpAddr));
1655                         token = strtok(NULL, delimiters);
1656                 }
1657         }
1658
1659         fclose(configFile);
1660         
1661         if (numHostsInSystem < 1)
1662         {
1663                 printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1664                 return -1;
1665         }
1666 #ifdef MAC
1667         myIpAddr = getMyIpAddr("en1");
1668 #else
1669         myIpAddr = getMyIpAddr("eth0");
1670 #endif
1671         myIndexInHostArray = findHost(myIpAddr);
1672         if (myIndexInHostArray == -1)
1673         {
1674                 printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1675                 return -1;
1676         }
1677         oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1678         oidMin = oidsPerBlock * myIndexInHostArray;
1679         if (myIndexInHostArray == numHostsInSystem - 1)
1680                 oidMax = 0xFFFFFFFF;
1681         else
1682                 oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1683
1684         return 0;
1685 }
1686
1687 void addHost(unsigned int hostIp)
1688 {
1689         unsigned int *tmpArray;
1690
1691         if (findHost(hostIp) != -1)
1692                 return;
1693
1694         if (numHostsInSystem == sizeOfHostArray)
1695         {
1696                 tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1697                 memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1698                 free(hostIpAddrs);
1699                 hostIpAddrs = tmpArray;
1700         }
1701
1702         hostIpAddrs[numHostsInSystem++] = hostIp;
1703
1704         return;
1705 }
1706
1707 int findHost(unsigned int hostIp)
1708 {
1709         int i;
1710         for (i = 0; i < numHostsInSystem; i++)
1711                 if (hostIpAddrs[i] == hostIp)
1712                         return i;
1713
1714         //not found
1715         return -1;
1716 }
1717
1718 /* This function sends notification request per thread waiting on object(s) whose version 
1719  * changes */
1720 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1721         int sock,i;
1722         objheader_t *objheader;
1723         struct sockaddr_in remoteAddr;
1724         char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1725         char *ptr;
1726         int bytesSent;
1727         int status, size;
1728         unsigned short version;
1729         unsigned int oid,mid;
1730         static unsigned int threadid = 0;
1731         pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1732         pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1733         notifydata_t *ndata;
1734
1735         //FIXME currently all oids belong to one machine
1736         oid = oidarry[0];
1737         if((mid = lhashSearch(oid)) == 0) {
1738                 printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1739                 return;
1740         }
1741
1742         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1743                 perror("reqNotify():socket()");
1744                 return -1;
1745         }
1746
1747         bzero(&remoteAddr, sizeof(remoteAddr));
1748         remoteAddr.sin_family = AF_INET;
1749         remoteAddr.sin_port = htons(LISTEN_PORT);
1750         remoteAddr.sin_addr.s_addr = htonl(mid);
1751
1752         /* Generate unique threadid */
1753         threadid++;
1754
1755         /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1756         if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1757                 printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1758                 return -1;
1759         }
1760         ndata->numoid = numoid;
1761         ndata->threadid = threadid;
1762         ndata->oidarry = oidarry;
1763         ndata->versionarry = versionarry;
1764         ndata->threadcond = threadcond;
1765         ndata->threadnotify = threadnotify;
1766         if((status = notifyhashInsert(threadid, ndata)) != 0) {
1767                 printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1768                 free(ndata);
1769                 return -1; 
1770         }
1771         
1772         /* Send  number of oids, oidarry, version array, machine id and threadid */     
1773         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1774                 printf("reqNotify():error %d connecting to %s:%d\n", errno,
1775                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1776                 free(ndata);
1777                 return -1;
1778         } else {
1779                 msg[0] = THREAD_NOTIFY_REQUEST;
1780                 *((unsigned int *)(&msg[1])) = numoid;
1781                 /* Send array of oids  */
1782                 size = sizeof(unsigned int);
1783                 {
1784                         i = 0;
1785                         while(i < numoid) {
1786                                 oid = oidarry[i];
1787                                 *((unsigned int *)(&msg[1] + size)) = oid;
1788                                 size += sizeof(unsigned int);
1789                                 i++;
1790                         }
1791                 }
1792
1793                 /* Send array of version  */
1794                 {
1795                         i = 0;
1796                         while(i < numoid) {
1797                                 version = versionarry[i];
1798                                 *((unsigned short *)(&msg[1] + size)) = version;
1799                                 size += sizeof(unsigned short);
1800                                 i++;
1801                         }
1802                 }
1803
1804                 *((unsigned int *)(&msg[1] + size)) = myIpAddr;
1805                 size += sizeof(unsigned int);
1806                 *((unsigned int *)(&msg[1] + size)) = threadid;
1807
1808                 pthread_mutex_lock(&(ndata->threadnotify));
1809                 size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1810                 send_data(sock, msg, size);
1811                 pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1812                 pthread_mutex_unlock(&(ndata->threadnotify));
1813         }
1814
1815         pthread_cond_destroy(&threadcond);
1816         pthread_mutex_destroy(&threadnotify);
1817         free(ndata);
1818         close(sock);
1819         return status;
1820 }
1821
1822 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1823         notifydata_t *ndata;
1824         int i, objIsFound = 0, index;
1825         void *ptr;
1826
1827         //Look up the tid and call the corresponding pthread_cond_signal
1828         if((ndata = notifyhashSearch(tid)) == NULL) {
1829                 printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1830                 return;
1831         } else  {
1832                 for(i = 0; i < ndata->numoid; i++) {
1833                         if(ndata->oidarry[i] == oid){
1834                                 objIsFound = 1;
1835                                 index = i;
1836                         }
1837                 }
1838                 if(objIsFound == 0){
1839                         printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1840                         return;
1841                 } else {
1842                         if(version <= ndata->versionarry[index]){
1843                                 printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
1844                                 return;
1845                         } else {
1846                                 /* Clear from prefetch cache and free thread related data structure */
1847                                 if((ptr = prehashSearch(oid)) != NULL) {
1848                                         prehashRemove(oid);
1849                                 }
1850                                 pthread_cond_signal(&(ndata->threadcond));
1851                         }
1852                 }
1853         }
1854         return;
1855 }
1856
1857 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
1858         threadlist_t *ptr;
1859         unsigned int mid;
1860         struct sockaddr_in remoteAddr;
1861         char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
1862         int sock, status, size, bytesSent;
1863
1864         while(*head != NULL) {
1865                 ptr = *head;
1866                 mid = ptr->mid; 
1867                 //create a socket connection to that machine
1868                 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1869                         perror("notifyAll():socket()");
1870                         return -1;
1871                 }
1872
1873                 bzero(&remoteAddr, sizeof(remoteAddr));
1874                 remoteAddr.sin_family = AF_INET;
1875                 remoteAddr.sin_port = htons(LISTEN_PORT);
1876                 remoteAddr.sin_addr.s_addr = htonl(mid);
1877                 //send Thread Notify response and threadid to that machine
1878                 if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1879                         printf("notifyAll():error %d connecting to %s:%d\n", errno,
1880                                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1881                         status = -1;
1882                 } else {
1883                         bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
1884                         msg[0] = THREAD_NOTIFY_RESPONSE;
1885                         *((unsigned int *)&msg[1]) = oid;
1886                         size = sizeof(unsigned int);
1887                         *((unsigned short *)(&msg[1]+ size)) = version;
1888                         size+= sizeof(unsigned short);
1889                         *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
1890
1891                         size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
1892                         send_data(sock, msg, size);
1893                 }
1894                 //close socket
1895                 close(sock);
1896                 // Update head
1897                 *head = ptr->next;
1898                 free(ptr);
1899         }
1900         return status;
1901 }
1902
1903 void transAbort(transrecord_t *trans) {
1904         objstrDelete(trans->cache);
1905         chashDelete(trans->lookupTable);
1906         free(trans);
1907 }