changes to build script to increase java heap memory
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #ifdef COMPILER
14 #include "thread.h"
15 #endif
16
17 #define NUM_THREADS 1
18 #define PREFETCH_CACHE_SIZE 1048576 //1MB
19 #define CONFIG_FILENAME "dstm.conf"
20
21 /* Global Variables */
22 extern int classsize[];
23 pfcstats_t *evalPrefetch;
24 extern int numprefetchsites; //Global variable containing number of prefetch sites 
25 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
26 objstr_t *prefetchcache; //Global Prefetch cache
27 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
28 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
29 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
30 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
31 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
32 extern objstr_t *mainobjstore;
33 unsigned int myIpAddr;
34 unsigned int *hostIpAddrs;
35 int sizeOfHostArray;
36 int numHostsInSystem;
37 int myIndexInHostArray;
38 unsigned int oidsPerBlock;
39 unsigned int oidMin;
40 unsigned int oidMax;
41
42 sockPoolHashTable_t *transReadSockPool;
43 sockPoolHashTable_t *transPrefetchSockPool;
44 sockPoolHashTable_t *transRequestSockPool;
45 pthread_mutex_t notifymutex;
46 pthread_mutex_t atomicObjLock;
47
48 /***********************************
49  * Global Variables for statistics
50  **********************************/
51 int numTransCommit = 0;
52 int numTransAbort = 0;
53 int nchashSearch = 0;
54 int nmhashSearch = 0;
55 int nprehashSearch = 0;
56 int nRemoteSend = 0;
57
58 void printhex(unsigned char *, int);
59 plistnode_t *createPiles(transrecord_t *);
60
61 /*******************************
62  * Send and Recv function calls 
63  *******************************/
64 void send_data(int fd , void *buf, int buflen) {
65   char *buffer = (char *)(buf); 
66   int size = buflen;
67   int numbytes; 
68   while (size > 0) {
69     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
70     if (numbytes == -1) {
71       perror("send");
72       exit(0);
73     }
74     buffer += numbytes;
75     size -= numbytes;
76   }
77 }
78
79 void recv_data(int fd , void *buf, int buflen) {
80   char *buffer = (char *)(buf); 
81   int size = buflen;
82   int numbytes; 
83   while (size > 0) {
84     numbytes = recv(fd, buffer, size, 0);
85     if (numbytes == -1) {
86       perror("recv");
87       exit(0);
88     }
89     buffer += numbytes;
90     size -= numbytes;
91   }
92 }
93
94 int recv_data_errorcode(int fd , void *buf, int buflen) {
95   char *buffer = (char *)(buf); 
96   int size = buflen;
97   int numbytes; 
98   while (size > 0) {
99     numbytes = recv(fd, buffer, size, 0);
100     if (numbytes==0)
101       return 0;
102     if (numbytes == -1) {
103       perror("recv");
104       return -1;
105     }
106     buffer += numbytes;
107     size -= numbytes;
108   }
109   return 1;
110 }
111
112 void printhex(unsigned char *ptr, int numBytes) {
113   int i;
114   for (i = 0; i < numBytes; i++) {
115     if (ptr[i] < 16)
116       printf("0%x ", ptr[i]);
117     else
118       printf("%x ", ptr[i]);
119   }
120   printf("\n");
121   return;
122 }
123
124 inline int arrayLength(int *array) {
125   int i;
126   for(i=0 ;array[i] != -1; i++)
127     ;
128   return i;
129 }
130
131 inline int findmax(int *array, int arraylength) {
132   int max, i;
133   max = array[0];
134   for(i = 0; i < arraylength; i++){
135     if(array[i] > max) {
136       max = array[i];
137     }
138   }
139   return max;
140 }
141
142 /* This function is a prefetch call generated by the compiler that
143  * populates the shared primary prefetch queue*/
144 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
145   /* Allocate for the queue node*/
146   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
147   int len;
148   char * node= getmemory(qnodesize);
149   int top=endoffsets[ntuples-1];
150   
151   if (node==NULL) 
152     return;
153   /* Set queue node values */
154
155   /* TODO: Remove this after testing */
156   evalPrefetch[siteid].callcount++;
157
158   *((int *)(node))=siteid;
159   *((int *)(node + sizeof(int))) = ntuples;
160   len = 2*sizeof(int);
161   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
162   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
163   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
164
165   /* Lock and insert into primary prefetch queue */
166   movehead(qnodesize);
167 }
168
169 /* This function starts up the transaction runtime. */
170 int dstmStartup(const char * option) {
171   pthread_t thread_Listen, udp_thread_Listen;
172   pthread_attr_t attr;
173   int master=option!=NULL && strcmp(option, "master")==0;
174   int fd;
175   int udpfd;
176
177   if (processConfigFile() != 0)
178     return 0; //TODO: return error value, cause main program to exit
179 #ifdef COMPILER
180   if (!master)
181     threadcount--;
182 #endif
183
184 #ifdef TRANSSTATS
185   printf("Trans stats is on\n");
186   fflush(stdout);
187 #endif
188   
189   //Initialize socket pool
190   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
191   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
192   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
193   
194   dstmInit();
195   transInit();
196   
197   fd=startlistening();
198   pthread_attr_init(&attr);
199   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
200 #ifdef CACHE
201   udpfd = udpInit();
202   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
203 #endif
204   if (master) {
205     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
206     return 1;
207   } else {
208     dstmListen((void *)fd);
209     return 0;
210   }
211 }
212
213 //TODO Use this later
214 void *pCacheAlloc(objstr_t *store, unsigned int size) {
215   void *tmp;
216   objstr_t *ptr;
217   ptr = store;
218   int success = 0;
219   
220   while(ptr->next != NULL) {
221     /* check if store is empty */
222     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
223       tmp = ptr->top;
224       ptr->top += size;
225       success = 1;
226       return tmp;
227     } else {
228       ptr = ptr-> next;
229     }
230   }
231   
232   if(success == 0) {
233     return NULL;
234   }
235 }
236
237 /* This function initiates the prefetch thread A queue is shared
238  * between the main thread of execution and the prefetch thread to
239  * process the prefetch call Call from compiler populates the shared
240  * queue with prefetch requests while prefetch thread processes the
241  * prefetch requests */
242
243 void transInit() {
244   //Create and initialize prefetch cache structure
245 #ifdef CACHE
246   prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
247   initializePCache();
248   if((evalPrefetch = initPrefetchStats()) == NULL) {
249     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
250     exit(0);
251   }
252 #endif
253   
254   /* Initialize attributes for mutex */
255   pthread_mutexattr_init(&prefetchcache_mutex_attr);
256   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
257   
258   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
259   pthread_mutex_init(&notifymutex, NULL);
260   pthread_mutex_init(&atomicObjLock, NULL);
261 #ifdef CACHE
262   //Create prefetch cache lookup table
263   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
264     printf("ERROR\n");
265     return; //Failure
266   }
267   
268   //Initialize primary shared queue
269   queueInit();
270   //Initialize machine pile w/prefetch oids and offsets shared queue
271   mcpileqInit();
272   
273   //Create the primary prefetch thread 
274   int retval;
275   do {
276     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
277   } while(retval!=0);
278   pthread_detach(tPrefetch);
279 #endif
280 }
281
282 /* This function stops the threads spawned */
283 void transExit() {
284 #ifdef CACHE
285   int t;
286   pthread_cancel(tPrefetch);
287   for(t = 0; t < NUM_THREADS; t++)
288     pthread_cancel(wthreads[t]);
289 #endif
290   
291   return;
292 }
293
294 /* This functions inserts randowm wait delays in the order of msec
295  * Mostly used when transaction commits retry*/
296 void randomdelay() {
297   struct timespec req;
298   time_t t;
299   
300   t = time(NULL);
301   req.tv_sec = 0;
302   req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
303   nanosleep(&req, NULL);
304   return;
305 }
306
307 /* This function initializes things required in the transaction start*/
308 transrecord_t *transStart() {
309   transrecord_t *tmp;
310   if((tmp = calloc(1, sizeof(transrecord_t))) == NULL){
311     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
312     return NULL;
313   }
314   tmp->cache = objstrCreate(1048576);
315   tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
316 #ifdef COMPILER
317   tmp->revertlist=NULL;
318 #endif
319   return tmp;
320 }
321
322 /* This function finds the location of the objects involved in a transaction
323  * and returns the pointer to the object if found in a remote location */
324 objheader_t *transRead(transrecord_t *record, unsigned int oid) {
325   unsigned int machinenumber;
326   objheader_t *tmp, *objheader;
327   objheader_t *objcopy;
328   int size;
329   void *buf;
330   
331   if(oid == 0) {
332     return NULL;
333   }
334   
335   if((objheader = chashSearch(record->lookupTable, oid)) != NULL){
336 #ifdef TRANSSTATS
337     nchashSearch++;
338 #endif
339     /* Search local transaction cache */
340 #ifdef COMPILER
341     return &objheader[1];
342 #else
343     return objheader;
344 #endif
345   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
346 #ifdef TRANSSTATS
347     nmhashSearch++;
348 #endif
349     /* Look up in machine lookup table  and copy  into cache*/
350     GETSIZE(size, objheader);
351     size += sizeof(objheader_t);
352     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
353     memcpy(objcopy, objheader, size);
354     /* Insert into cache's lookup table */
355     STATUS(objcopy)=0;
356     chashInsert(record->lookupTable, OID(objheader), objcopy); 
357 #ifdef COMPILER
358     return &objcopy[1];
359 #else
360     return objcopy;
361 #endif
362   } else {
363 #ifdef CACHE
364     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { 
365 #ifdef TRANSSTATS
366       nprehashSearch++;
367 #endif
368       /* Look up in prefetch cache */
369       GETSIZE(size, tmp);
370       size+=sizeof(objheader_t);
371       objcopy = (objheader_t *) objstrAlloc(record->cache, size);
372       memcpy(objcopy, tmp, size);
373       /* Insert into cache's lookup table */
374       chashInsert(record->lookupTable, OID(tmp), objcopy); 
375 #ifdef COMPILER
376       return &objcopy[1];
377 #else
378       return objcopy;
379 #endif
380     }
381 #endif
382     /* Get the object from the remote location */
383     if((machinenumber = lhashSearch(oid)) == 0) {
384       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
385       return NULL;
386     }
387     objcopy = getRemoteObj(record, machinenumber, oid);
388
389     if(objcopy == NULL) {
390       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
391       return NULL;
392     } else {
393 #ifdef TRANSSTATS
394       nRemoteSend++;
395 #endif
396       STATUS(objcopy)=0;      
397 #ifdef COMPILER
398       return &objcopy[1];
399 #else
400       return objcopy;
401 #endif
402     }
403   }
404 }
405
406 /* This function creates objects in the transaction record */
407 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
408   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
409   OID(tmp) = getNewOID();
410   tmp->version = 1;
411   tmp->rcount = 1;
412   STATUS(tmp) = NEW;
413   chashInsert(record->lookupTable, OID(tmp), tmp);
414   
415 #ifdef COMPILER
416   return &tmp[1]; //want space after object header
417 #else
418   return tmp;
419 #endif
420 }
421
422 /* This function creates machine piles based on all machines involved in a
423  * transaction commit request */
424 plistnode_t *createPiles(transrecord_t *record) {
425   int i;
426   plistnode_t *pile = NULL;
427   unsigned int machinenum;
428   objheader_t *headeraddr;
429   chashlistnode_t * ptr = record->lookupTable->table;
430   /* Represents number of bins in the chash table */
431   unsigned int size = record->lookupTable->size;
432   
433   for(i = 0; i < size ; i++) {
434     chashlistnode_t * curr = &ptr[i];
435     /* Inner loop to traverse the linked list of the cache lookupTable */
436     while(curr != NULL) {
437       //if the first bin in hash table is empty
438       if(curr->key == 0)
439         break;
440       
441       if ((headeraddr = (objheader_t *) chashSearch(record->lookupTable, curr->key)) == NULL) {
442         printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
443         return NULL;
444       }
445       
446       //Get machine location for object id (and whether local or not)
447       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
448         machinenum = myIpAddr;
449       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
450         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
451         return NULL;
452       }
453       
454       //Make machine groups
455       pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
456       curr = curr->next;
457     }
458   }
459   return pile; 
460 }
461
462 /* This function initiates the transaction commit process
463  * Spawns threads for each of the new connections with Participants 
464  * and creates new piles by calling the createPiles(), 
465  * Sends a transrequest() to each remote machines for objects found remotely 
466  * and calls handleLocalReq() to process objects found locally */
467 int transCommit(transrecord_t *record) {        
468   unsigned int tot_bytes_mod, *listmid;
469   plistnode_t *pile, *pile_ptr;
470   int i, j, rc, val;
471   int pilecount, offset, threadnum, trecvcount;
472   char control;
473   char transid[TID_LEN];
474   trans_req_data_t *tosend;
475   trans_commit_data_t transinfo;
476   static int newtid = 0;
477   char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
478   thread_data_array_t *thread_data_array;
479   local_thread_data_array_t *ltdata;
480   int firsttime=1;
481
482   do { 
483     treplyctrl=0;
484     trecvcount = 0; 
485     threadnum = 0; 
486     treplyretry = 0;
487     thread_data_array = NULL;
488     ltdata = NULL;
489     
490     /* Look through all the objects in the transaction record and make piles 
491      * for each machine involved in the transaction*/
492     if (firsttime)
493       pile_ptr = pile = createPiles(record);
494     else
495       pile=pile_ptr;
496     firsttime=0;
497
498     /* Create the packet to be sent in TRANS_REQUEST */
499     
500     /* Count the number of participants */
501     pilecount = pCount(pile);
502     
503     /* Create a list of machine ids(Participants) involved in transaction       */
504     listmid = calloc(pilecount, sizeof(unsigned int));
505     pListMid(pile, listmid);
506     
507     
508     /* Initialize thread variables,
509      * Spawn a thread for each Participant involved in a transaction */
510     pthread_t thread[pilecount];
511     pthread_attr_t attr;                        
512     pthread_cond_t tcond;
513     pthread_mutex_t tlock;
514     pthread_mutex_t tlshrd;
515     
516     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
517     ltdata = calloc(1, sizeof(local_thread_data_array_t));
518     
519     thread_response_t rcvd_control_msg[pilecount];      /* Shared thread array that keeps track of responses of participants */
520     
521     /* Initialize and set thread detach attribute */
522     pthread_attr_init(&attr);
523     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
524     pthread_mutex_init(&tlock, NULL);
525     pthread_cond_init(&tcond, NULL);
526
527     /* Process each machine pile */
528     while(pile != NULL) {
529       //Create transaction id
530       newtid++;
531       tosend = calloc(1, sizeof(trans_req_data_t));
532       tosend->f.control = TRANS_REQUEST;
533       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
534       tosend->f.mcount = pilecount;
535       tosend->f.numread = pile->numread;
536       tosend->f.nummod = pile->nummod;
537       tosend->f.numcreated = pile->numcreated;
538       tosend->f.sum_bytes = pile->sum_bytes;
539       tosend->listmid = listmid;
540       tosend->objread = pile->objread;
541       tosend->oidmod = pile->oidmod;
542       tosend->oidcreated = pile->oidcreated;
543       thread_data_array[threadnum].thread_id = threadnum;
544       thread_data_array[threadnum].mid = pile->mid;
545       thread_data_array[threadnum].buffer = tosend;
546       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
547       thread_data_array[threadnum].threshold = &tcond;
548       thread_data_array[threadnum].lock = &tlock;
549       thread_data_array[threadnum].count = &trecvcount;
550       thread_data_array[threadnum].replyctrl = &treplyctrl;
551       thread_data_array[threadnum].replyretry = &treplyretry;
552       thread_data_array[threadnum].rec = record;
553       /* If local do not create any extra connection */
554       if(pile->mid != myIpAddr) { /* Not local */
555         do {
556           rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
557         } while(rc!=0);
558         if(rc) {
559           perror("Error in pthread create\n");
560           pthread_cond_destroy(&tcond);
561           pthread_mutex_destroy(&tlock);
562           pDelete(pile_ptr);
563           free(listmid);
564           for (i = 0; i < threadnum; i++)
565             free(thread_data_array[i].buffer);
566           free(thread_data_array);
567           free(ltdata);
568           return 1;
569         }
570       } else { /*Local*/
571         ltdata->tdata = &thread_data_array[threadnum];
572         ltdata->transinfo = &transinfo;
573         do {
574           val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
575         } while(val!=0);
576         if(val) {
577           perror("Error in pthread create\n");
578           pthread_cond_destroy(&tcond);
579           pthread_mutex_destroy(&tlock);
580           pDelete(pile_ptr);
581           free(listmid);
582           for (i = 0; i < threadnum; i++)
583             free(thread_data_array[i].buffer);
584           free(thread_data_array);
585           free(ltdata);
586           return 1;
587         }
588       }
589       
590       threadnum++;              
591       pile = pile->next;
592     }
593     /* Free attribute and wait for the other threads */
594     pthread_attr_destroy(&attr);
595     
596     for (i = 0; i < threadnum; i++) {
597       rc = pthread_join(thread[i], NULL);
598       if(rc)
599         {
600           printf("Error: return code from pthread_join() is %d\n", rc);
601           pthread_cond_destroy(&tcond);
602           pthread_mutex_destroy(&tlock);
603           pDelete(pile_ptr);
604           free(listmid);
605           for (j = i; j < threadnum; j++) {
606             free(thread_data_array[j].buffer);
607           }
608           return 1;
609         }
610       free(thread_data_array[i].buffer);
611     }
612
613     /* Free resources */        
614     pthread_cond_destroy(&tcond);
615     pthread_mutex_destroy(&tlock);
616     free(listmid);
617
618     if (!treplyretry)
619       pDelete(pile_ptr);
620     
621     /* wait a random amount of time before retrying to commit transaction*/
622     if(treplyretry) {
623       free(thread_data_array);
624       free(ltdata);
625       randomdelay();
626     }
627     
628     /* Retry trans commit procedure during soft_abort case */
629   } while (treplyretry);
630   
631   if(treplyctrl == TRANS_ABORT) {
632 #ifdef TRANSSTATS
633     numTransAbort++;
634 #endif
635     /* Free Resources */
636     objstrDelete(record->cache);
637     chashDelete(record->lookupTable);
638     free(record);
639     free(thread_data_array);
640     free(ltdata);
641     return TRANS_ABORT;
642   } else if(treplyctrl == TRANS_COMMIT) {
643 #ifdef TRANSSTATS
644     numTransCommit++;
645 #endif
646     /* Free Resources */
647     objstrDelete(record->cache);
648     chashDelete(record->lookupTable);
649     free(record);
650     free(thread_data_array);
651     free(ltdata);
652     return 0;
653   } else {
654     //TODO Add other cases
655     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
656     exit(-1);
657   }
658   return 0;
659 }
660
661 /* This function sends information involved in the transaction request 
662  * to participants and accepts a response from particpants.
663  * It calls decideresponse() to decide on what control message 
664  * to send next to participants and sends the message using sendResponse()*/
665 void *transRequest(void *threadarg) {
666   int sd, i, n;
667   struct sockaddr_in serv_addr;
668   thread_data_array_t *tdata;
669   objheader_t *headeraddr;
670   char control, recvcontrol;
671   char machineip[16], retval;
672
673   tdata = (thread_data_array_t *) threadarg;
674
675   if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
676     printf("transRequest(): socket create error\n");
677     pthread_exit(NULL);
678   }
679
680   /* Send bytes of data with TRANS_REQUEST control message */
681   send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
682
683   /* Send list of machines involved in the transaction */
684   {
685     int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
686     send_data(sd, tdata->buffer->listmid, size);
687   }
688   
689   /* Send oids and version number tuples for objects that are read */
690   {
691     int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
692     send_data(sd, tdata->buffer->objread, size);
693   }
694   
695   /* Send objects that are modified */
696   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
697     int size;
698     if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
699       printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
700       pthread_exit(NULL);
701     }
702     GETSIZE(size,headeraddr);
703     size+=sizeof(objheader_t);
704     send_data(sd, headeraddr, size);
705   }
706   
707   /* Read control message from Participant */
708   recv_data(sd, &control, sizeof(char));
709   /* Recv Objects if participant sends TRANS_DISAGREE */
710 #ifdef CACHE
711   if(control == TRANS_DISAGREE) {
712     int length;
713     recv_data(sd, &length, sizeof(int));
714     void *newAddr;
715     pthread_mutex_lock(&prefetchcache_mutex);
716     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
717       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
718       pthread_exit(NULL);
719     }
720     pthread_mutex_unlock(&prefetchcache_mutex);
721     recv_data(sd, newAddr, length);
722     int offset = 0;
723     while(length != 0) {
724       unsigned int oidToPrefetch;
725       objheader_t * header;
726       header = (objheader_t *) (((char *)newAddr) + offset);
727       oidToPrefetch = OID(header);
728       int size = 0;
729       GETSIZE(size, header);
730       size += sizeof(objheader_t);
731       //make an entry in prefetch hash table
732       void *oldptr;
733       if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
734         prehashRemove(oidToPrefetch);
735         prehashInsert(oidToPrefetch, header);
736       } else {
737         prehashInsert(oidToPrefetch, header);
738       }
739       length = length - size;
740       offset += size;
741     }
742   }
743 #endif
744
745   recvcontrol = control;
746   /* Update common data structure and increment count */
747   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
748   
749   /* Lock and update count */
750   /* Thread sleeps until all messages from pariticipants are received by coordinator */
751   pthread_mutex_lock(tdata->lock);
752   
753   (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
754   
755   /* Wake up the threads and invoke decideResponse (once) */
756   if(*(tdata->count) == tdata->buffer->f.mcount) {
757     decideResponse(tdata); 
758     pthread_cond_broadcast(tdata->threshold);
759   } else {
760     pthread_cond_wait(tdata->threshold, tdata->lock);
761   }
762   pthread_mutex_unlock(tdata->lock);
763
764   /* clear objects from prefetch cache */
765   /*
766   if(*(tdata->replyctrl) == TRANS_ABORT) {
767     int i;
768     for(i=0; i<tdata->buffer->f.nummod; i++) {
769       unsigned int oid = tdata->buffer->oidmod[i];
770       objheader_t *header;
771       if((header = prehashSearch(oid)) != NULL) {
772         prehashRemove(oid);
773       }
774     }
775     for(i=0; i<tdata->buffer->f.numread; i++) {
776       char *objread = tdata->buffer->objread;
777       unsigned int oid = *((unsigned int *)(objread+(sizeof(unsigned int) +
778                   sizeof(unsigned short))*i));
779       objheader_t *header;
780       if((header = prehashSearch(oid)) != NULL) {
781         prehashRemove(oid);
782       }
783     }
784   }
785   */
786
787 #ifdef CACHE
788   if(*(tdata->replyctrl) == TRANS_COMMIT) {
789     int retval;
790      /* Update prefetch cache */
791     if((retval = updatePrefetchCache(tdata)) != 0) {
792       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
793       return;
794     }
795
796     /* Invalidate objects in other machine cache */
797     if(tdata->buffer->f.nummod > 0) {
798       if((retval = invalidateObj(tdata)) != 0) {
799         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
800         return;
801       }
802     }
803   }
804 #endif
805   
806   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
807    * to all participants in their respective socket */
808   if (sendResponse(tdata, sd) == 0) { 
809     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
810     pthread_exit(NULL);
811   }
812   
813   recv_data((int)sd, &control, sizeof(char)); 
814   
815   if(control == TRANS_UNSUCESSFUL) {
816     //printf("DEBUG-> TRANS_ABORTED\n");
817   } else if(control == TRANS_SUCESSFUL) {
818     //printf("DEBUG-> TRANS_SUCCESSFUL\n");
819   } else {
820     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
821   }
822   pthread_exit(NULL);
823 }
824
825 /* This function decides the reponse that needs to be sent to 
826  * all Participant machines after the TRANS_REQUEST protocol */
827 void decideResponse(thread_data_array_t *tdata) {
828   char control;
829   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
830                                                                    message to send */
831   
832   for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
833     control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
834                                                written onto the shared array */
835     switch(control) {
836     default:
837       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
838       /* treat as disagree, pass thru */
839     case TRANS_DISAGREE:
840       transdisagree++;
841       break;
842       
843     case TRANS_AGREE:
844       transagree++;
845       break;
846       
847     case TRANS_SOFT_ABORT:
848       transsoftabort++;
849       break;
850     }
851   }
852   
853   if(transdisagree > 0) {
854     /* Send Abort */
855     *(tdata->replyctrl) = TRANS_ABORT;
856     *(tdata->replyretry) = 0;
857 #ifdef CACHE
858     /* clear objects from prefetch cache */
859     cleanPCache(tdata);
860 #endif
861   } else if(transagree == tdata->buffer->f.mcount){
862     /* Send Commit */
863     *(tdata->replyctrl) = TRANS_COMMIT;
864     *(tdata->replyretry) = 0;
865 #ifdef CACHE
866 #if 0
867     /* Turn prefetching on */
868     int i;
869     for (i=0; i<numprefetchsites; i++)
870       evalPrefetch[i].operMode = 1;
871 #endif
872 #endif
873   } else { 
874     /* Send Abort in soft abort case followed by retry commiting transaction again*/
875     *(tdata->replyctrl) = TRANS_ABORT;
876     *(tdata->replyretry) = 1;
877   }
878   return;
879 }
880
881 /* This function sends the final response to remote machines per
882  * thread in their respective socket id It returns a char that is only
883  * needed to check the correctness of execution of this function
884  * inside transRequest()*/
885
886 char sendResponse(thread_data_array_t *tdata, int sd) {
887   int n, size, sum, oidcount = 0, control;
888   char *ptr, retval = 0;
889   unsigned int *oidnotfound;
890   
891   control = *(tdata->replyctrl);
892   send_data(sd, &control, sizeof(char));
893   
894   //TODO read missing objects during object migration
895   /* If response is a soft abort due to missing objects at the
896      Participant's side */
897   
898   /* If the decided response is TRANS_ABORT */
899   if(*(tdata->replyctrl) == TRANS_ABORT) {
900     retval = TRANS_ABORT;
901   } else if(*(tdata->replyctrl) == TRANS_COMMIT) { 
902     /* If the decided response is TRANS_COMMIT */ 
903     retval = TRANS_COMMIT;
904   }
905   
906   return retval;
907 }
908
909 /* This function opens a connection, places an object read request to
910  * the remote machine, reads the control message and object if
911  * available and copies the object and its header to the local
912  * cache. */ 
913
914 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
915   int size, val;
916   struct sockaddr_in serv_addr;
917   char machineip[16];
918   char control;
919   objheader_t *h;
920   void *objcopy = NULL;
921   
922   int sd = getSock2(transReadSockPool, mnum);
923   char readrequest[sizeof(char)+sizeof(unsigned int)];
924   readrequest[0] = READ_REQUEST;
925   *((unsigned int *)(&readrequest[1])) = oid;
926   send_data(sd, readrequest, sizeof(readrequest));
927   
928   /* Read response from the Participant */
929   recv_data(sd, &control, sizeof(char));
930   
931   if (control==OBJECT_NOT_FOUND) {
932     objcopy = NULL;
933   } else {
934     /* Read object if found into local cache */
935     recv_data(sd, &size, sizeof(int));
936     objcopy = objstrAlloc(record->cache, size);
937     recv_data(sd, objcopy, size);
938     /* Insert into cache's lookup table */
939     chashInsert(record->lookupTable, oid, objcopy); 
940   }
941   
942   return objcopy;
943 }
944
945 /* This function handles the local objects involved in a transaction
946  * commiting process.  It also makes a decision if this local machine
947  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator.  Note
948  * Coordinator = local machine It wakes up the other threads from
949  * remote participants that are waiting for the coordinator's decision
950  * and based on common agreement it either commits or aborts the
951  * transaction.  It also frees the memory resources */
952
953 void *handleLocalReq(void *threadarg) {
954   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
955   local_thread_data_array_t *localtdata;
956   int numoidnotfound = 0, numoidlocked = 0;
957   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
958   int numread, i;
959   unsigned int oid;
960   unsigned short version;
961   void *mobj;
962   objheader_t *headptr;
963
964   localtdata = (local_thread_data_array_t *) threadarg;
965   
966   /* Counters and arrays to formulate decision on control message to be sent */
967   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
968   oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
969   
970   numread = localtdata->tdata->buffer->f.numread;
971   /* Process each oid in the machine pile/ group per thread */
972   for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
973     if (i < localtdata->tdata->buffer->f.numread) {
974       int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
975       incr *= i;
976       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
977       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
978     } else { // Objects Modified
979       int tmpsize;
980       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
981       if (headptr == NULL) {
982         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
983         return NULL;
984       }
985       oid = OID(headptr);
986       version = headptr->version;
987     }
988     /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
989     
990     /* Save the oids not found and number of oids not found for later use */
991     if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
992       /* Save the oids not found and number of oids not found for later use */
993       oidnotfound[numoidnotfound] = oid;
994       numoidnotfound++;
995     } else { /* If Obj found in machine (i.e. has not moved) */
996       /* Check if Obj is locked by any previous transaction */
997       if (test_and_set(STATUSPTR(mobj))) {
998         if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
999           v_matchlock++;
1000         } else {/* If versions don't match ...HARD ABORT */
1001           v_nomatch++;
1002           /* Send TRANS_DISAGREE to Coordinator */
1003           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1004           break;
1005         }
1006       } else {
1007         //we're locked
1008         /* Save all object oids that are locked on this machine during this transaction request call */
1009         oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
1010         numoidlocked++;
1011         if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1012           v_matchnolock++;
1013         } else { /* If versions don't match ...HARD ABORT */
1014           v_nomatch++;
1015           /* Send TRANS_DISAGREE to Coordinator */
1016           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1017           break;
1018         }
1019       }
1020     }
1021   } // End for
1022   /* Condition to send TRANS_AGREE */
1023   if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
1024     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
1025   }
1026   /* Condition to send TRANS_SOFT_ABORT */
1027   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1028     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
1029   }
1030   
1031   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1032    * if Participant receives a TRANS_COMMIT */
1033   localtdata->transinfo->objlocked = oidlocked;
1034   localtdata->transinfo->objnotfound = oidnotfound;
1035   localtdata->transinfo->modptr = NULL;
1036   localtdata->transinfo->numlocked = numoidlocked;
1037   localtdata->transinfo->numnotfound = numoidnotfound;
1038   /* Lock and update count */
1039   //Thread sleeps until all messages from pariticipants are received by coordinator
1040   pthread_mutex_lock(localtdata->tdata->lock);
1041   (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
1042   
1043   /* Wake up the threads and invoke decideResponse (once) */
1044   if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
1045     decideResponse(localtdata->tdata); 
1046     pthread_cond_broadcast(localtdata->tdata->threshold);
1047   } else {
1048     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
1049   }
1050   pthread_mutex_unlock(localtdata->tdata->lock);
1051   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
1052     if(transAbortProcess(localtdata) != 0) {
1053       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1054       fflush(stdout);
1055       pthread_exit(NULL);
1056     }
1057   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
1058 #ifdef CACHE
1059     /* Invalidate objects in other machine cache */
1060     if(localtdata->tdata->buffer->f.nummod > 0) {
1061       int retval;
1062       if((retval = invalidateObj(localtdata->tdata)) != 0) {
1063         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1064         return;
1065       }
1066     }
1067 #endif
1068     if(transComProcess(localtdata) != 0) {
1069       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1070       fflush(stdout);
1071       pthread_exit(NULL);
1072     }
1073   }
1074   /* Free memory */
1075   if (localtdata->transinfo->objlocked != NULL) {
1076     free(localtdata->transinfo->objlocked);
1077   }
1078   if (localtdata->transinfo->objnotfound != NULL) {
1079     free(localtdata->transinfo->objnotfound);
1080   }
1081   pthread_exit(NULL);
1082 }
1083
1084 /* This function completes the ABORT process if the transaction is aborting */
1085 int transAbortProcess(local_thread_data_array_t  *localtdata) {
1086   int i, numlocked;
1087   unsigned int *objlocked;
1088   void *header;
1089   
1090   numlocked = localtdata->transinfo->numlocked;
1091   objlocked = localtdata->transinfo->objlocked;
1092   
1093   for (i = 0; i < numlocked; i++) {
1094     if((header = mhashSearch(objlocked[i])) == NULL) {
1095       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1096       return 1;
1097     }
1098     UnLock(STATUSPTR(header));
1099   }
1100
1101   return 0;
1102 }
1103
1104 /*This function completes the COMMIT process if the transaction is commiting*/
1105 int transComProcess(local_thread_data_array_t  *localtdata) {
1106   objheader_t *header, *tcptr;
1107   int i, nummod, tmpsize, numcreated, numlocked;
1108   unsigned int *oidmod, *oidcreated, *oidlocked;
1109   void *ptrcreate;
1110   
1111   nummod = localtdata->tdata->buffer->f.nummod;
1112   oidmod = localtdata->tdata->buffer->oidmod;
1113   numcreated = localtdata->tdata->buffer->f.numcreated;
1114   oidcreated = localtdata->tdata->buffer->oidcreated;
1115   numlocked = localtdata->transinfo->numlocked;
1116   oidlocked = localtdata->transinfo->objlocked;
1117
1118   for (i = 0; i < nummod; i++) {
1119     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1120       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1121       return 1;
1122     }
1123     /* Copy from transaction cache -> main object store */
1124     if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
1125       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1126       return 1;
1127     }
1128     GETSIZE(tmpsize, header);
1129     char *tmptcptr = (char *) tcptr;
1130     memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
1131     header->version += 1;
1132     if(header->notifylist != NULL) {
1133       notifyAll(&header->notifylist, OID(header), header->version);
1134     }
1135   }
1136   /* If object is newly created inside transaction then commit it */
1137   for (i = 0; i < numcreated; i++) {
1138     if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
1139       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1140       return 1;
1141     }
1142     GETSIZE(tmpsize, header);
1143     tmpsize += sizeof(objheader_t);
1144     pthread_mutex_lock(&mainobjstore_mutex);
1145     if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
1146       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1147       pthread_mutex_unlock(&mainobjstore_mutex);
1148       return 1;
1149     }
1150     pthread_mutex_unlock(&mainobjstore_mutex);
1151     memcpy(ptrcreate, header, tmpsize);
1152     mhashInsert(oidcreated[i], ptrcreate);
1153     lhashInsert(oidcreated[i], myIpAddr);
1154   }
1155   /* Unlock locked objects */
1156   for(i = 0; i < numlocked; i++) {
1157     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1158       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1159       return 1;
1160     }
1161     UnLock(STATUSPTR(header));
1162   }
1163   
1164   return 0;
1165 }
1166
1167 prefetchpile_t *foundLocal(char *ptr) {
1168   int siteid = *(GET_SITEID(ptr));
1169   int ntuples = *(GET_NTUPLES(ptr));
1170   unsigned int * oidarray = GET_PTR_OID(ptr);
1171   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1172   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1173   prefetchpile_t * head=NULL;
1174   int numLocal = 0;
1175   
1176   int i;
1177   for(i=0;i<ntuples; i++) {
1178     unsigned short baseindex=(i==0)?0:endoffsets[i-1];
1179     unsigned short endindex=endoffsets[i];
1180     unsigned int oid=oidarray[i];
1181     int newbase;
1182     int machinenum;
1183     if (oid==0)
1184       continue;
1185     //Look up fields locally
1186     for(newbase=baseindex;newbase<endindex;newbase++) {
1187       if (!lookupObject(&oid, arryfields[newbase]))
1188         break;
1189       //Ended in a null pointer...
1190       if (oid==0)
1191         goto tuple;
1192     }
1193     //Entire prefetch is local
1194     if (newbase==endindex&&checkoid(oid)){
1195       numLocal++;
1196       goto tuple;
1197     }
1198     //Add to remote requests
1199     machinenum=lhashSearch(oid);
1200     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1201   tuple:
1202     ;
1203   }
1204
1205   /* handle dynamic prefetching */
1206   handleDynPrefetching(numLocal, ntuples, siteid);
1207   return head;
1208 }
1209
1210 int checkoid(unsigned int oid) {
1211   objheader_t *header;
1212   if ((header=mhashSearch(oid))!=NULL) {
1213     //Found on machine
1214     return 1;
1215   } else if ((header=prehashSearch(oid))!=NULL) {
1216     //Found in cache
1217     return 1;
1218   } else {
1219     return 0;
1220   }
1221 }
1222
1223 int lookupObject(unsigned int * oid, short offset) {
1224   objheader_t *header;
1225   if ((header=mhashSearch(*oid))!=NULL) {
1226     //Found on machine
1227     ;
1228   } else if ((header=prehashSearch(*oid))!=NULL) {
1229     //Found in cache
1230     ;
1231   } else {
1232     return 0;
1233   }
1234
1235   if(TYPE(header) > NUMCLASSES) {
1236     int elementsize = classsize[TYPE(header)];
1237     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1238     int length = ao->___length___;
1239     /* Check if array out of bounds */
1240     if(offset < 0 || offset >= length) {
1241       //if yes treat the object as found
1242       (*oid)=0;
1243       return 1;
1244     }
1245     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1246     return 1;
1247   } else {
1248     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1249     return 1;
1250   }
1251 }
1252
1253
1254 /* This function is called by the thread calling transPrefetch */
1255 void *transPrefetch(void *t) {
1256   while(1) {
1257     /* lock mutex of primary prefetch queue */
1258     void *node=gettail();
1259     /* Check if the tuples are found locally, if yes then reduce them further*/ 
1260     /* and group requests by remote machine ids by calling the makePreGroups() */
1261     prefetchpile_t *pilehead = foundLocal(node);
1262
1263     if (pilehead!=NULL) {
1264       // Get sock from shared pool 
1265       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
1266       
1267       /* Send  Prefetch Request */
1268       prefetchpile_t *ptr = pilehead;
1269       while(ptr != NULL) {
1270         sendPrefetchReq(ptr, sd);
1271         ptr = ptr->next; 
1272       }
1273       
1274       /* Release socket */
1275       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1276       
1277       /* Deallocated pilehead */
1278       mcdealloc(pilehead);
1279     }
1280     // Deallocate the prefetch queue pile node
1281     inctail();
1282   }
1283 }
1284
1285 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1286   objpile_t *tmp;
1287   
1288   int size=sizeof(char)+sizeof(int);
1289   for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
1290     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1291   }
1292   
1293   char buft[size];
1294   char *buf=buft;
1295   *buf=TRANS_PREFETCH;
1296   buf+=sizeof(char);
1297   
1298   for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
1299     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1300     *((int*)buf)=len;
1301     buf+=sizeof(int);
1302     *((unsigned int *)buf)=tmp->oid;
1303     buf+=sizeof(unsigned int);
1304     *((unsigned int *)(buf)) = myIpAddr; 
1305     buf+=sizeof(unsigned int);
1306     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1307     buf+=tmp->numoffset*sizeof(short);
1308   }
1309   *((int *)buf)=-1;
1310   send_data(sd, buft, size);
1311   return;
1312 }
1313
1314 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1315   int len, endpair;
1316   char control;
1317   objpile_t *tmp;
1318   
1319   /* Send TRANS_PREFETCH control message */
1320   control = TRANS_PREFETCH;
1321   send_data(sd, &control, sizeof(char));
1322   
1323   /* Send Oids and offsets in pairs */
1324   tmp = mcpilenode->objpiles;
1325   while(tmp != NULL) {
1326     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1327     char oidnoffset[len];
1328     char *buf=oidnoffset;
1329     *((int*)buf) = tmp->numoffset;
1330     buf+=sizeof(int);
1331     *((unsigned int *)buf) = tmp->oid;
1332     buf+=sizeof(unsigned int);
1333     *((unsigned int *)buf) = myIpAddr; 
1334     buf += sizeof(unsigned int);
1335     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1336     send_data(sd, oidnoffset, len);
1337     tmp = tmp->next;
1338   }
1339   
1340   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1341   endpair = -1;
1342   send_data(sd, &endpair, sizeof(int));
1343   
1344   return;
1345 }
1346
1347 int getPrefetchResponse(int sd) {
1348   int length = 0, size = 0;
1349   char control;
1350   unsigned int oid;
1351   void *modptr, *oldptr;
1352   
1353   recv_data((int)sd, &length, sizeof(int)); 
1354   size = length - sizeof(int);
1355   char recvbuffer[size];
1356
1357   recv_data((int)sd, recvbuffer, size);
1358   control = *((char *) recvbuffer);
1359   if(control == OBJECT_FOUND) {
1360     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1361     size = size - (sizeof(char) + sizeof(unsigned int));
1362     pthread_mutex_lock(&prefetchcache_mutex);
1363     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1364       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1365       pthread_mutex_unlock(&prefetchcache_mutex);
1366       return -1;
1367     }
1368     pthread_mutex_unlock(&prefetchcache_mutex);
1369     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1370     STATUS(modptr)=0;
1371
1372     /* Insert the oid and its address into the prefetch hash lookup table */
1373     /* Do a version comparison if the oid exists */
1374     if((oldptr = prehashSearch(oid)) != NULL) {
1375       /* If older version then update with new object ptr */
1376       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1377         prehashRemove(oid);
1378         prehashInsert(oid, modptr);
1379       }
1380     } else {/* Else add the object ptr to hash table*/
1381       prehashInsert(oid, modptr);
1382     }
1383     /* Lock the Prefetch Cache look up table*/
1384     pthread_mutex_lock(&pflookup.lock);
1385     /* Broadcast signal on prefetch cache condition variable */ 
1386     pthread_cond_broadcast(&pflookup.cond);
1387     /* Unlock the Prefetch Cache look up table*/
1388     pthread_mutex_unlock(&pflookup.lock);
1389   } else if(control == OBJECT_NOT_FOUND) {
1390     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1391     /* TODO: For each object not found query DHT for new location and retrieve the object */
1392     /* Throw an error */
1393     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1394     //    exit(-1);
1395   } else {
1396     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1397   }
1398   
1399   return 0;
1400 }
1401
1402 unsigned short getObjType(unsigned int oid) {
1403   objheader_t *objheader;
1404   unsigned short numoffset[] ={0};
1405   short fieldoffset[] ={};
1406
1407   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1408 #ifdef CACHE
1409     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1410 #endif
1411       unsigned int mid = lhashSearch(oid);
1412       int sd = getSock2(transReadSockPool, mid);
1413       char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1414       remotereadrequest[0] = READ_REQUEST;
1415       *((unsigned int *)(&remotereadrequest[1])) = oid;
1416       send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1417
1418       /* Read response from the Participant */
1419       char control;
1420       recv_data(sd, &control, sizeof(char));
1421
1422       if (control==OBJECT_NOT_FOUND) {
1423         printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1424         fflush(stdout);
1425         exit(-1);
1426       } else {
1427         /* Read object if found into local cache */
1428         int size;
1429         recv_data(sd, &size, sizeof(int));
1430 #ifdef CACHE
1431         pthread_mutex_lock(&prefetchcache_mutex);
1432         if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1433           printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1434           pthread_exit(NULL);
1435         }
1436         pthread_mutex_unlock(&prefetchcache_mutex);
1437         recv_data(sd, objheader, size);
1438         prehashInsert(oid, objheader);
1439         return TYPE(objheader);
1440 #else
1441         char *buffer;
1442         if((buffer = calloc(1, size)) == NULL) {
1443           printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1444           fflush(stdout);
1445           return 0;
1446         }
1447         recv_data(sd, buffer, size);
1448         objheader = (objheader_t *)buffer;
1449         unsigned short type = TYPE(objheader);
1450         free(buffer);
1451         return type;
1452 #endif
1453       }
1454 #ifdef CACHE
1455     }
1456 #endif
1457   }
1458   return TYPE(objheader);
1459 }
1460
1461 int startRemoteThread(unsigned int oid, unsigned int mid)
1462 {
1463         int sock;
1464         struct sockaddr_in remoteAddr;
1465         char msg[1 + sizeof(unsigned int)];
1466         int bytesSent;
1467         int status;
1468
1469         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1470         {
1471                 perror("startRemoteThread():socket()");
1472                 return -1;
1473         }
1474
1475         bzero(&remoteAddr, sizeof(remoteAddr));
1476         remoteAddr.sin_family = AF_INET;
1477         remoteAddr.sin_port = htons(LISTEN_PORT);
1478         remoteAddr.sin_addr.s_addr = htonl(mid);
1479         
1480         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
1481         {
1482                 printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1483                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1484                 status = -1;
1485         }
1486         else
1487         {
1488                 msg[0] = START_REMOTE_THREAD;
1489         *((unsigned int *) &msg[1]) = oid;
1490                 send_data(sock, msg, 1 + sizeof(unsigned int));
1491         }
1492
1493         close(sock);
1494         return status;
1495 }
1496
1497 //TODO: when reusing oids, make sure they are not already in use!
1498 static unsigned int id = 0xFFFFFFFF;
1499 unsigned int getNewOID(void) {
1500         id += 2;
1501         if (id > oidMax || id < oidMin)
1502         {
1503                 id = (oidMin | 1);
1504         }
1505         return id;
1506 }
1507
1508 int processConfigFile()
1509 {
1510         FILE *configFile;
1511         const int maxLineLength = 200;
1512         char lineBuffer[maxLineLength];
1513         char *token;
1514         const char *delimiters = " \t\n";
1515         char *commentBegin;
1516         in_addr_t tmpAddr;
1517         
1518         configFile = fopen(CONFIG_FILENAME, "r");
1519         if (configFile == NULL)
1520         {
1521                 printf("error opening %s:\n", CONFIG_FILENAME);
1522                 perror("");
1523                 return -1;
1524         }
1525
1526         numHostsInSystem = 0;
1527         sizeOfHostArray = 8;
1528         hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1529         
1530         while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
1531         {
1532                 commentBegin = strchr(lineBuffer, '#');
1533                 if (commentBegin != NULL)
1534                         *commentBegin = '\0';
1535                 token = strtok(lineBuffer, delimiters);
1536                 while (token != NULL)
1537                 {
1538                         tmpAddr = inet_addr(token);
1539                         if ((int)tmpAddr == -1)
1540                         {
1541                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1542                                 fclose(configFile);
1543                                 return -1;
1544                         }
1545                         else
1546                                 addHost(htonl(tmpAddr));
1547                         token = strtok(NULL, delimiters);
1548                 }
1549         }
1550
1551         fclose(configFile);
1552         
1553         if (numHostsInSystem < 1)
1554         {
1555                 printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1556                 return -1;
1557         }
1558 #ifdef MAC
1559         myIpAddr = getMyIpAddr("en1");
1560 #else
1561         myIpAddr = getMyIpAddr("eth0");
1562 #endif
1563         myIndexInHostArray = findHost(myIpAddr);
1564         if (myIndexInHostArray == -1)
1565         {
1566                 printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1567                 return -1;
1568         }
1569         oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1570         oidMin = oidsPerBlock * myIndexInHostArray;
1571         if (myIndexInHostArray == numHostsInSystem - 1)
1572                 oidMax = 0xFFFFFFFF;
1573         else
1574                 oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1575
1576         return 0;
1577 }
1578
1579 void addHost(unsigned int hostIp)
1580 {
1581         unsigned int *tmpArray;
1582
1583         if (findHost(hostIp) != -1)
1584                 return;
1585
1586         if (numHostsInSystem == sizeOfHostArray)
1587         {
1588                 tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1589                 memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1590                 free(hostIpAddrs);
1591                 hostIpAddrs = tmpArray;
1592         }
1593
1594         hostIpAddrs[numHostsInSystem++] = hostIp;
1595
1596         return;
1597 }
1598
1599 int findHost(unsigned int hostIp)
1600 {
1601         int i;
1602         for (i = 0; i < numHostsInSystem; i++)
1603                 if (hostIpAddrs[i] == hostIp)
1604                         return i;
1605
1606         //not found
1607         return -1;
1608 }
1609
1610 /* This function sends notification request per thread waiting on object(s) whose version 
1611  * changes */
1612 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1613   int sock,i;
1614   objheader_t *objheader;
1615   struct sockaddr_in remoteAddr;
1616   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1617   char *ptr;
1618   int bytesSent;
1619   int status, size;
1620   unsigned short version;
1621   unsigned int oid,mid;
1622   static unsigned int threadid = 0;
1623   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1624   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1625   notifydata_t *ndata;
1626   
1627   oid = oidarry[0];
1628   if((mid = lhashSearch(oid)) == 0) {
1629     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1630     return;
1631   }
1632   
1633   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1634     perror("reqNotify():socket()");
1635     return -1;
1636   }
1637   
1638   bzero(&remoteAddr, sizeof(remoteAddr));
1639   remoteAddr.sin_family = AF_INET;
1640   remoteAddr.sin_port = htons(LISTEN_PORT);
1641   remoteAddr.sin_addr.s_addr = htonl(mid);
1642   
1643   /* Generate unique threadid */
1644   threadid++;
1645   
1646   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1647   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1648     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1649     return -1;
1650   }
1651   ndata->numoid = numoid;
1652   ndata->threadid = threadid;
1653   ndata->oidarry = oidarry;
1654   ndata->versionarry = versionarry;
1655   ndata->threadcond = threadcond;
1656   ndata->threadnotify = threadnotify;
1657   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1658     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1659     free(ndata);
1660     return -1; 
1661   }
1662   
1663   /* Send  number of oids, oidarry, version array, machine id and threadid */   
1664   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1665     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1666            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1667     free(ndata);
1668     return -1;
1669   } else {
1670     msg[0] = THREAD_NOTIFY_REQUEST;
1671     *((unsigned int *)(&msg[1])) = numoid;
1672     /* Send array of oids  */
1673     size = sizeof(unsigned int);
1674     {
1675       i = 0;
1676       while(i < numoid) {
1677         oid = oidarry[i];
1678         *((unsigned int *)(&msg[1] + size)) = oid;
1679         size += sizeof(unsigned int);
1680         i++;
1681       }
1682     }
1683     
1684     /* Send array of version  */
1685     {
1686       i = 0;
1687       while(i < numoid) {
1688         version = versionarry[i];
1689         *((unsigned short *)(&msg[1] + size)) = version;
1690         size += sizeof(unsigned short);
1691         i++;
1692       }
1693     }
1694     
1695     *((unsigned int *)(&msg[1] + size)) = myIpAddr;
1696     size += sizeof(unsigned int);
1697     *((unsigned int *)(&msg[1] + size)) = threadid;
1698     pthread_mutex_lock(&(ndata->threadnotify));
1699     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1700     send_data(sock, msg, size);
1701     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1702     pthread_mutex_unlock(&(ndata->threadnotify));
1703   }
1704   
1705   pthread_cond_destroy(&threadcond);
1706   pthread_mutex_destroy(&threadnotify);
1707   free(ndata);
1708   close(sock);
1709   return status;
1710 }
1711
1712 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1713         notifydata_t *ndata;
1714         int i, objIsFound = 0, index;
1715         void *ptr;
1716
1717         //Look up the tid and call the corresponding pthread_cond_signal
1718         if((ndata = notifyhashSearch(tid)) == NULL) {
1719                 printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1720                 return;
1721         } else  {
1722                 for(i = 0; i < ndata->numoid; i++) {
1723                         if(ndata->oidarry[i] == oid){
1724                                 objIsFound = 1;
1725                                 index = i;
1726                         }
1727                 }
1728                 if(objIsFound == 0){
1729                         printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1730                         return;
1731                 } else {
1732                         if(version <= ndata->versionarry[index]){
1733                                 printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
1734                                 return;
1735                         } else {
1736 #ifdef CACHE
1737                                 /* Clear from prefetch cache and free thread related data structure */
1738                                 if((ptr = prehashSearch(oid)) != NULL) {
1739                                         prehashRemove(oid);
1740                                 }
1741 #endif
1742                                 pthread_cond_signal(&(ndata->threadcond));
1743                         }
1744                 }
1745         }
1746         return;
1747 }
1748
1749 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
1750   threadlist_t *ptr;
1751   unsigned int mid;
1752   struct sockaddr_in remoteAddr;
1753   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
1754   int sock, status, size, bytesSent;
1755   
1756   while(*head != NULL) {
1757     ptr = *head;
1758     mid = ptr->mid; 
1759     //create a socket connection to that machine
1760     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1761       perror("notifyAll():socket()");
1762       return -1;
1763     }
1764     
1765     bzero(&remoteAddr, sizeof(remoteAddr));
1766     remoteAddr.sin_family = AF_INET;
1767     remoteAddr.sin_port = htons(LISTEN_PORT);
1768     remoteAddr.sin_addr.s_addr = htonl(mid);
1769     //send Thread Notify response and threadid to that machine
1770     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1771       printf("notifyAll():error %d connecting to %s:%d\n", errno,
1772              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1773       fflush(stdout);
1774       status = -1;
1775     } else {
1776       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
1777       msg[0] = THREAD_NOTIFY_RESPONSE;
1778       *((unsigned int *)&msg[1]) = oid;
1779       size = sizeof(unsigned int);
1780       *((unsigned short *)(&msg[1]+ size)) = version;
1781       size+= sizeof(unsigned short);
1782       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
1783       
1784       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
1785       send_data(sock, msg, size);
1786     }
1787     //close socket
1788     close(sock);
1789     // Update head
1790     *head = ptr->next;
1791     free(ptr);
1792   }
1793   return status;
1794 }
1795
1796 void transAbort(transrecord_t *trans) {
1797   objstrDelete(trans->cache);
1798   chashDelete(trans->lookupTable);
1799   free(trans);
1800 }
1801
1802 /* This function inserts necessary information into 
1803  * a machine pile data structure */
1804 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
1805   plistnode_t *ptr, *tmp;
1806   int found = 0, offset = 0;
1807
1808   tmp = pile;
1809   //Add oid into a machine that is already present in the pile linked list structure
1810   while(tmp != NULL) {
1811     if (tmp->mid == mid) {
1812       int tmpsize;
1813
1814       if (STATUS(headeraddr) & NEW) {
1815         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
1816         tmp->numcreated++;
1817         GETSIZE(tmpsize, headeraddr);
1818         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1819       }else if (STATUS(headeraddr) & DIRTY) {
1820         tmp->oidmod[tmp->nummod] = OID(headeraddr);
1821         tmp->nummod++;
1822         GETSIZE(tmpsize, headeraddr);
1823         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1824       } else {
1825         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
1826         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
1827         offset += sizeof(unsigned int);
1828         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
1829         tmp->numread ++;
1830       }
1831       found = 1;
1832       break;
1833     }
1834     tmp = tmp->next;
1835   }
1836   //Add oid for any new machine 
1837   if (!found) {
1838     int tmpsize;
1839     if((ptr = pCreate(num_objs)) == NULL) {
1840       return NULL;
1841     }
1842     ptr->mid = mid;
1843     if (STATUS(headeraddr) & NEW) {
1844       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
1845       ptr->numcreated ++;
1846       GETSIZE(tmpsize, headeraddr);
1847       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1848     } else if (STATUS(headeraddr) & DIRTY) {
1849       ptr->oidmod[ptr->nummod] = OID(headeraddr);
1850       ptr->nummod ++;
1851       GETSIZE(tmpsize, headeraddr);
1852       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1853     } else {
1854       *((unsigned int *)ptr->objread)=OID(headeraddr);
1855       offset = sizeof(unsigned int);
1856       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
1857       ptr->numread ++;
1858     }
1859     ptr->next = pile;
1860     pile = ptr;
1861   }
1862   
1863   /* Clear Flags */
1864   STATUS(headeraddr) =0;
1865   
1866   return pile;
1867 }