b5527c2be573efd1ca69930b98d0e58a5fc1e57d
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #ifdef COMPILER
14 #include "thread.h"
15 #endif
16
17 #define NUM_THREADS 1
18 #define PREFETCH_CACHE_SIZE 1048576 //1MB
19 #define CONFIG_FILENAME "dstm.conf"
20
21 /* Global Variables */
22 extern int classsize[];
23 pfcstats_t *evalPrefetch;
24 objstr_t *prefetchcache; //Global Prefetch cache
25 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
26 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
27 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
28 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
29 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
30 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
31 extern objstr_t *mainobjstore;
32 unsigned int myIpAddr;
33 unsigned int *hostIpAddrs;
34 int sizeOfHostArray;
35 int numHostsInSystem;
36 int myIndexInHostArray;
37 unsigned int oidsPerBlock;
38 unsigned int oidMin;
39 unsigned int oidMax;
40
41 sockPoolHashTable_t *transReadSockPool;
42 sockPoolHashTable_t *transPrefetchSockPool;
43 sockPoolHashTable_t *transRequestSockPool;
44 pthread_mutex_t notifymutex;
45 pthread_mutex_t atomicObjLock;
46
47 /***********************************
48  * Global Variables for statistics
49  **********************************/
50 int numTransCommit = 0;
51 int numTransAbort = 0;
52 int nchashSearch = 0;
53 int nmhashSearch = 0;
54 int nprehashSearch = 0;
55 int nRemoteSend = 0;
56
57 void printhex(unsigned char *, int);
58 plistnode_t *createPiles(transrecord_t *);
59
60 /*******************************
61  * Send and Recv function calls 
62  *******************************/
63 void send_data(int fd , void *buf, int buflen) {
64   char *buffer = (char *)(buf); 
65   int size = buflen;
66   int numbytes; 
67   while (size > 0) {
68     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
69     if (numbytes == -1) {
70       perror("send");
71       exit(-1);
72     }
73     buffer += numbytes;
74     size -= numbytes;
75   }
76 }
77
78 void recv_data(int fd , void *buf, int buflen) {
79   char *buffer = (char *)(buf); 
80   int size = buflen;
81   int numbytes; 
82   while (size > 0) {
83     numbytes = recv(fd, buffer, size, 0);
84     if (numbytes == -1) {
85       perror("recv");
86       exit(-1);
87     }
88     buffer += numbytes;
89     size -= numbytes;
90   }
91 }
92
93 int recv_data_errorcode(int fd , void *buf, int buflen) {
94   char *buffer = (char *)(buf); 
95   int size = buflen;
96   int numbytes; 
97   while (size > 0) {
98     numbytes = recv(fd, buffer, size, 0);
99     if (numbytes==0)
100       return 0;
101     if (numbytes == -1) {
102       perror("recv");
103       return -1;
104     }
105     buffer += numbytes;
106     size -= numbytes;
107   }
108   return 1;
109 }
110
111 void printhex(unsigned char *ptr, int numBytes) {
112   int i;
113   for (i = 0; i < numBytes; i++) {
114     if (ptr[i] < 16)
115       printf("0%x ", ptr[i]);
116     else
117       printf("%x ", ptr[i]);
118   }
119   printf("\n");
120   return;
121 }
122
123 inline int arrayLength(int *array) {
124   int i;
125   for(i=0 ;array[i] != -1; i++)
126     ;
127   return i;
128 }
129
130 inline int findmax(int *array, int arraylength) {
131   int max, i;
132   max = array[0];
133   for(i = 0; i < arraylength; i++){
134     if(array[i] > max) {
135       max = array[i];
136     }
137   }
138   return max;
139 }
140
141 /* This function is a prefetch call generated by the compiler that
142  * populates the shared primary prefetch queue*/
143 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
144   /* Allocate for the queue node*/
145   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
146   char * node= getmemory(qnodesize);
147   /* Set queue node values */
148   int len;
149   int top=endoffsets[ntuples-1];
150
151   /* TODO: Remove this after testing */
152   evalPrefetch[siteid].callcount++;
153
154   *((int *)(node))=siteid;
155   *((int *)(node + sizeof(int))) = ntuples;
156   len = 2*sizeof(int);
157   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
158   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
159   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
160
161   /* Lock and insert into primary prefetch queue */
162   movehead(qnodesize);
163 }
164
165 /* This function starts up the transaction runtime. */
166 int dstmStartup(const char * option) {
167   pthread_t thread_Listen, udp_thread_Listen;
168   pthread_attr_t attr;
169   int master=option!=NULL && strcmp(option, "master")==0;
170   int fd;
171   int udpfd;
172
173   if (processConfigFile() != 0)
174     return 0; //TODO: return error value, cause main program to exit
175 #ifdef COMPILER
176   if (!master)
177     threadcount--;
178 #endif
179
180 #ifdef TRANSSTATS
181   printf("Trans stats is on\n");
182   fflush(stdout);
183 #endif
184   
185   //Initialize socket pool
186   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
187   transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
188   transRequestSockPool = createSockPool(transRequestSockPool, 2*numHostsInSystem+1);
189   
190   dstmInit();
191   transInit();
192   
193   fd=startlistening();
194   udpfd = udpInit();
195   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
196   if (master) {
197     pthread_attr_init(&attr);
198     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
199     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
200     return 1;
201   } else {
202     dstmListen((void *)fd);
203     return 0;
204   }
205 }
206
207 //TODO Use this later
208 void *pCacheAlloc(objstr_t *store, unsigned int size) {
209   void *tmp;
210   objstr_t *ptr;
211   ptr = store;
212   int success = 0;
213   
214   while(ptr->next != NULL) {
215     /* check if store is empty */
216     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
217       tmp = ptr->top;
218       ptr->top += size;
219       success = 1;
220       return tmp;
221     } else {
222       ptr = ptr-> next;
223     }
224   }
225   
226   if(success == 0) {
227     return NULL;
228   }
229 }
230
231 /* This function initiates the prefetch thread A queue is shared
232  * between the main thread of execution and the prefetch thread to
233  * process the prefetch call Call from compiler populates the shared
234  * queue with prefetch requests while prefetch thread processes the
235  * prefetch requests */
236
237 void transInit() {
238   int t, rc;
239   int retval;
240   //Create and initialize prefetch cache structure
241   prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
242   initializePCache();
243   if((evalPrefetch = initPrefetchStats()) == NULL) {
244     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
245     exit(0);
246   }
247   
248   /* Initialize attributes for mutex */
249   pthread_mutexattr_init(&prefetchcache_mutex_attr);
250   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
251   
252   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
253   
254   pthread_mutex_init(&notifymutex, NULL);
255   pthread_mutex_init(&atomicObjLock, NULL);
256   //Create prefetch cache lookup table
257   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
258     printf("ERROR\n");
259     return; //Failure
260   }
261   
262   //Initialize primary shared queue
263   queueInit();
264   //Initialize machine pile w/prefetch oids and offsets shared queue
265   mcpileqInit();
266   
267   //Create the primary prefetch thread 
268   do {
269     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
270   } while(retval!=0);
271   pthread_detach(tPrefetch);
272 }
273
274 /* This function stops the threads spawned */
275 void transExit() {
276   int t;
277   pthread_cancel(tPrefetch);
278   for(t = 0; t < NUM_THREADS; t++)
279     pthread_cancel(wthreads[t]);
280   
281   return;
282 }
283
284 /* This functions inserts randowm wait delays in the order of msec
285  * Mostly used when transaction commits retry*/
286 void randomdelay() {
287   struct timespec req;
288   time_t t;
289   
290   t = time(NULL);
291   req.tv_sec = 0;
292   req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
293   nanosleep(&req, NULL);
294   return;
295 }
296
297 /* This function initializes things required in the transaction start*/
298 transrecord_t *transStart() {
299   transrecord_t *tmp;
300   if((tmp = calloc(1, sizeof(transrecord_t))) == NULL){
301     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
302     return NULL;
303   }
304   tmp->cache = objstrCreate(1048576);
305   tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
306 #ifdef COMPILER
307   tmp->revertlist=NULL;
308 #endif
309   return tmp;
310 }
311
312 /* This function finds the location of the objects involved in a transaction
313  * and returns the pointer to the object if found in a remote location */
314 objheader_t *transRead(transrecord_t *record, unsigned int oid) {
315   unsigned int machinenumber;
316   objheader_t *tmp, *objheader;
317   objheader_t *objcopy;
318   int size;
319   void *buf;
320   
321   if(oid == 0) {
322     return NULL;
323   }
324   
325   if((objheader = chashSearch(record->lookupTable, oid)) != NULL){
326 #ifdef TRANSSTATS
327     nchashSearch++;
328 #endif
329     /* Search local transaction cache */
330 #ifdef COMPILER
331     return &objheader[1];
332 #else
333     return objheader;
334 #endif
335   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
336 #ifdef TRANSSTATS
337     nmhashSearch++;
338 #endif
339     /* Look up in machine lookup table  and copy  into cache*/
340     GETSIZE(size, objheader);
341     size += sizeof(objheader_t);
342     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
343     memcpy(objcopy, objheader, size);
344     /* Insert into cache's lookup table */
345     STATUS(objcopy)=0;
346     chashInsert(record->lookupTable, OID(objheader), objcopy); 
347 #ifdef COMPILER
348     return &objcopy[1];
349 #else
350     return objcopy;
351 #endif
352   } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { 
353 #ifdef TRANSSTATS
354     nprehashSearch++;
355 #endif
356     /* Look up in prefetch cache */
357     GETSIZE(size, tmp);
358     size+=sizeof(objheader_t);
359     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
360     memcpy(objcopy, tmp, size);
361     /* Insert into cache's lookup table */
362     chashInsert(record->lookupTable, OID(tmp), objcopy); 
363 #ifdef COMPILER
364     return &objcopy[1];
365 #else
366     return objcopy;
367 #endif
368   } else {
369     /* Get the object from the remote location */
370     if((machinenumber = lhashSearch(oid)) == 0) {
371       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
372       return NULL;
373     }
374     objcopy = getRemoteObj(record, machinenumber, oid);
375     
376     if(objcopy == NULL) {
377       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
378       return NULL;
379     } else {
380 #ifdef TRANSSTATS
381     nRemoteSend++;
382 #endif
383       STATUS(objcopy)=0;      
384 #ifdef COMPILER
385       return &objcopy[1];
386 #else
387       return objcopy;
388 #endif
389     }
390   }
391 }
392
393 /* This function creates objects in the transaction record */
394 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
395   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
396   OID(tmp) = getNewOID();
397   tmp->version = 1;
398   tmp->rcount = 1;
399   STATUS(tmp) = NEW;
400   chashInsert(record->lookupTable, OID(tmp), tmp);
401   
402 #ifdef COMPILER
403   return &tmp[1]; //want space after object header
404 #else
405   return tmp;
406 #endif
407 }
408
409 /* This function creates machine piles based on all machines involved in a
410  * transaction commit request */
411 plistnode_t *createPiles(transrecord_t *record) {
412   int i;
413   plistnode_t *pile = NULL;
414   unsigned int machinenum;
415   objheader_t *headeraddr;
416   chashlistnode_t * ptr = record->lookupTable->table;
417   /* Represents number of bins in the chash table */
418   unsigned int size = record->lookupTable->size;
419   
420   for(i = 0; i < size ; i++) {
421     chashlistnode_t * curr = &ptr[i];
422     /* Inner loop to traverse the linked list of the cache lookupTable */
423     while(curr != NULL) {
424       //if the first bin in hash table is empty
425       if(curr->key == 0)
426         break;
427       
428       if ((headeraddr = (objheader_t *) chashSearch(record->lookupTable, curr->key)) == NULL) {
429         printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
430         return NULL;
431       }
432       
433       //Get machine location for object id (and whether local or not)
434       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
435         machinenum = myIpAddr;
436       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
437         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
438         return NULL;
439       }
440       
441       //Make machine groups
442       pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
443       curr = curr->next;
444     }
445   }
446   return pile; 
447 }
448
449 /* This function initiates the transaction commit process
450  * Spawns threads for each of the new connections with Participants 
451  * and creates new piles by calling the createPiles(), 
452  * Sends a transrequest() to each remote machines for objects found remotely 
453  * and calls handleLocalReq() to process objects found locally */
454 int transCommit(transrecord_t *record) {        
455   unsigned int tot_bytes_mod, *listmid;
456   plistnode_t *pile, *pile_ptr;
457   int i, j, rc, val;
458   int pilecount, offset, threadnum, trecvcount;
459   char control;
460   char transid[TID_LEN];
461   trans_req_data_t *tosend;
462   trans_commit_data_t transinfo;
463   static int newtid = 0;
464   char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
465   thread_data_array_t *thread_data_array;
466   local_thread_data_array_t *ltdata;
467   int firsttime=1;
468
469   do { 
470     treplyctrl=0;
471     trecvcount = 0; 
472     threadnum = 0; 
473     treplyretry = 0;
474     thread_data_array = NULL;
475     ltdata = NULL;
476     
477     /* Look through all the objects in the transaction record and make piles 
478      * for each machine involved in the transaction*/
479     if (firsttime)
480       pile_ptr = pile = createPiles(record);
481     else
482       pile=pile_ptr;
483     firsttime=0;
484
485     /* Create the packet to be sent in TRANS_REQUEST */
486     
487     /* Count the number of participants */
488     pilecount = pCount(pile);
489     
490     /* Create a list of machine ids(Participants) involved in transaction       */
491     listmid = calloc(pilecount, sizeof(unsigned int));
492     pListMid(pile, listmid);
493     
494     
495     /* Initialize thread variables,
496      * Spawn a thread for each Participant involved in a transaction */
497     pthread_t thread[pilecount];
498     pthread_attr_t attr;                        
499     pthread_cond_t tcond;
500     pthread_mutex_t tlock;
501     pthread_mutex_t tlshrd;
502     
503     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
504     ltdata = calloc(1, sizeof(local_thread_data_array_t));
505     
506     thread_response_t rcvd_control_msg[pilecount];      /* Shared thread array that keeps track of responses of participants */
507     
508     /* Initialize and set thread detach attribute */
509     pthread_attr_init(&attr);
510     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
511     pthread_mutex_init(&tlock, NULL);
512     pthread_cond_init(&tcond, NULL);
513
514     /* Process each machine pile */
515     while(pile != NULL) {
516       //Create transaction id
517       newtid++;
518       tosend = calloc(1, sizeof(trans_req_data_t));
519       tosend->f.control = TRANS_REQUEST;
520       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
521       tosend->f.mcount = pilecount;
522       tosend->f.numread = pile->numread;
523       tosend->f.nummod = pile->nummod;
524       tosend->f.numcreated = pile->numcreated;
525       tosend->f.sum_bytes = pile->sum_bytes;
526       tosend->listmid = listmid;
527       tosend->objread = pile->objread;
528       tosend->oidmod = pile->oidmod;
529       tosend->oidcreated = pile->oidcreated;
530       thread_data_array[threadnum].thread_id = threadnum;
531       thread_data_array[threadnum].mid = pile->mid;
532       thread_data_array[threadnum].buffer = tosend;
533       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
534       thread_data_array[threadnum].threshold = &tcond;
535       thread_data_array[threadnum].lock = &tlock;
536       thread_data_array[threadnum].count = &trecvcount;
537       thread_data_array[threadnum].replyctrl = &treplyctrl;
538       thread_data_array[threadnum].replyretry = &treplyretry;
539       thread_data_array[threadnum].rec = record;
540       thread_data_array[threadnum].pilehead = pile_ptr;
541       /* If local do not create any extra connection */
542       if(pile->mid != myIpAddr) { /* Not local */
543         do {
544           rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
545         } while(rc!=0);
546         if(rc) {
547           perror("Error in pthread create\n");
548           pthread_cond_destroy(&tcond);
549           pthread_mutex_destroy(&tlock);
550           pDelete(pile_ptr);
551           free(listmid);
552           for (i = 0; i < threadnum; i++)
553             free(thread_data_array[i].buffer);
554           free(thread_data_array);
555           free(ltdata);
556           return 1;
557         }
558       } else { /*Local*/
559         ltdata->tdata = &thread_data_array[threadnum];
560         ltdata->transinfo = &transinfo;
561         do {
562           val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
563         } while(val!=0);
564         if(val) {
565           perror("Error in pthread create\n");
566           pthread_cond_destroy(&tcond);
567           pthread_mutex_destroy(&tlock);
568           pDelete(pile_ptr);
569           free(listmid);
570           for (i = 0; i < threadnum; i++)
571             free(thread_data_array[i].buffer);
572           free(thread_data_array);
573           free(ltdata);
574           return 1;
575         }
576       }
577       
578       threadnum++;              
579       pile = pile->next;
580     }
581     /* Free attribute and wait for the other threads */
582     pthread_attr_destroy(&attr);
583     
584     for (i = 0; i < threadnum; i++) {
585       rc = pthread_join(thread[i], NULL);
586       if(rc)
587         {
588           printf("Error: return code from pthread_join() is %d\n", rc);
589           pthread_cond_destroy(&tcond);
590           pthread_mutex_destroy(&tlock);
591           pDelete(pile_ptr);
592           free(listmid);
593           for (j = i; j < threadnum; j++) {
594             free(thread_data_array[j].buffer);
595           }
596           return 1;
597         }
598       free(thread_data_array[i].buffer);
599     }
600
601     /* Free resources */        
602     pthread_cond_destroy(&tcond);
603     pthread_mutex_destroy(&tlock);
604     free(listmid);
605
606     if (!treplyretry)
607       pDelete(pile_ptr);
608     
609     /* wait a random amount of time before retrying to commit transaction*/
610     if(treplyretry) {
611       free(thread_data_array);
612       free(ltdata);
613       randomdelay();
614     }
615     
616     /* Retry trans commit procedure during soft_abort case */
617   } while (treplyretry);
618   
619   if(treplyctrl == TRANS_ABORT) {
620 #ifdef TRANSSTATS
621     numTransAbort++;
622 #endif
623 #ifdef CHECKTB
624     char a[] = "Aborting";
625     TABORT1(a);
626 #endif
627     /* Free Resources */
628     objstrDelete(record->cache);
629     chashDelete(record->lookupTable);
630     free(record);
631     free(thread_data_array);
632     free(ltdata);
633     return TRANS_ABORT;
634   } else if(treplyctrl == TRANS_COMMIT) {
635 #ifdef TRANSSTATS
636     numTransCommit++;
637 #endif
638 #ifdef CHECKTB
639     char a[] = "Commiting";
640     TABORT1(a);
641 #endif
642     /* Free Resources */
643     objstrDelete(record->cache);
644     chashDelete(record->lookupTable);
645     free(record);
646     free(thread_data_array);
647     free(ltdata);
648     return 0;
649   } else {
650     //TODO Add other cases
651     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
652     exit(-1);
653   }
654   return 0;
655 }
656
657 /* This function sends information involved in the transaction request 
658  * to participants and accepts a response from particpants.
659  * It calls decideresponse() to decide on what control message 
660  * to send next to participants and sends the message using sendResponse()*/
661 void *transRequest(void *threadarg) {
662   int sd, i, n;
663   struct sockaddr_in serv_addr;
664   thread_data_array_t *tdata;
665   objheader_t *headeraddr;
666   char control, recvcontrol;
667   char machineip[16], retval;
668
669   tdata = (thread_data_array_t *) threadarg;
670
671   if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
672     printf("transRequest(): socket create error\n");
673     pthread_exit(NULL);
674   }
675
676   /* Send bytes of data with TRANS_REQUEST control message */
677   send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
678
679   /* Send list of machines involved in the transaction */
680   {
681     int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
682     send_data(sd, tdata->buffer->listmid, size);
683   }
684   
685   /* Send oids and version number tuples for objects that are read */
686   {
687     int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
688     send_data(sd, tdata->buffer->objread, size);
689   }
690   
691   /* Send objects that are modified */
692   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
693     int size;
694     headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
695     GETSIZE(size,headeraddr);
696     size+=sizeof(objheader_t);
697     send_data(sd, headeraddr, size);
698   }
699   
700   /* Read control message from Participant */
701   recv_data(sd, &control, sizeof(char));
702   /* Recv Objects if participant sends TRANS_DISAGREE */
703   if(control == TRANS_DISAGREE) {
704     int length;
705     recv_data(sd, &length, sizeof(int));
706     void *newAddr;
707     pthread_mutex_lock(&prefetchcache_mutex);
708     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
709       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
710       pthread_exit(NULL);
711     }
712     pthread_mutex_unlock(&prefetchcache_mutex);
713     recv_data(sd, newAddr, length);
714     int offset = 0;
715     while(length != 0) {
716       unsigned int oidToPrefetch;
717       objheader_t * header;
718       header = (objheader_t *) (((char *)newAddr) + offset);
719       oidToPrefetch = OID(header);
720 #ifdef CHECKTA
721       char a[] = "object type";
722       TABORT8(__func__, a, TYPE(header));
723 #endif
724       int size = 0;
725       GETSIZE(size, header);
726       size += sizeof(objheader_t);
727       //make an entry in prefetch hash table
728       void *oldptr;
729       if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
730         prehashRemove(oidToPrefetch);
731         prehashInsert(oidToPrefetch, header);
732       } else {
733         prehashInsert(oidToPrefetch, header);
734       }
735       length = length - size;
736       offset += size;
737     }
738   }
739
740   recvcontrol = control;
741 #ifdef CHECKTA
742   char a[] = "mid";
743   char c[] = "status";
744   TABORT5(__func__, a, c, tdata->mid, control);
745 #endif
746   /* Update common data structure and increment count */
747   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
748   
749   /* Lock and update count */
750   /* Thread sleeps until all messages from pariticipants are received by coordinator */
751   pthread_mutex_lock(tdata->lock);
752   
753   (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
754   
755   /* Wake up the threads and invoke decideResponse (once) */
756   if(*(tdata->count) == tdata->buffer->f.mcount) {
757     decideResponse(tdata); 
758     pthread_cond_broadcast(tdata->threshold);
759   } else {
760     pthread_cond_wait(tdata->threshold, tdata->lock);
761   }
762   pthread_mutex_unlock(tdata->lock);
763
764   /* Invalidate objects in other machine cache */
765   if(*(tdata->replyctrl) == TRANS_COMMIT) {
766     if(tdata->buffer->f.nummod > 0) {
767       if((retval = invalidateObj(tdata)) != 0) {
768         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
769         return;
770       }
771     }
772   }
773   
774
775   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
776    * to all participants in their respective socket */
777   if (sendResponse(tdata, sd) == 0) { 
778     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
779     pthread_exit(NULL);
780   }
781   
782   recv_data((int)sd, &control, sizeof(char)); 
783   
784   if(control == TRANS_UNSUCESSFUL) {
785     //printf("DEBUG-> TRANS_ABORTED\n");
786   } else if(control == TRANS_SUCESSFUL) {
787     //printf("DEBUG-> TRANS_SUCCESSFUL\n");
788   } else {
789     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
790   }
791
792   pthread_exit(NULL);
793 }
794
795 /* This function decides the reponse that needs to be sent to 
796  * all Participant machines after the TRANS_REQUEST protocol */
797 void decideResponse(thread_data_array_t *tdata) {
798   char control;
799   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
800                                                                    message to send */
801   
802   for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
803     control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
804                                                written onto the shared array */
805     switch(control) {
806     default:
807       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
808       /* treat as disagree, pass thru */
809     case TRANS_DISAGREE:
810       transdisagree++;
811       break;
812       
813     case TRANS_AGREE:
814       transagree++;
815       break;
816       
817     case TRANS_SOFT_ABORT:
818       transsoftabort++;
819       break;
820     }
821   }
822   
823   if(transdisagree > 0) {
824     /* Send Abort */
825     *(tdata->replyctrl) = TRANS_ABORT;
826     *(tdata->replyretry) = 0;
827     /* clear objects from prefetch cache */
828     cleanPCache(tdata);
829   } else if(transagree == tdata->buffer->f.mcount){
830     /* Send Commit */
831     *(tdata->replyctrl) = TRANS_COMMIT;
832     *(tdata->replyretry) = 0;
833     int retval;
834     /* Update prefetch cache */
835     if((retval = updatePrefetchCache(tdata)) != 0) {
836       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
837       return;
838     }
839   } else { 
840     /* Send Abort in soft abort case followed by retry commiting transaction again*/
841     *(tdata->replyctrl) = TRANS_ABORT;
842     *(tdata->replyretry) = 1;
843   }
844   return;
845 }
846
847 /* This function sends the final response to remote machines per
848  * thread in their respective socket id It returns a char that is only
849  * needed to check the correctness of execution of this function
850  * inside transRequest()*/
851
852 char sendResponse(thread_data_array_t *tdata, int sd) {
853   int n, size, sum, oidcount = 0, control;
854   char *ptr, retval = 0;
855   unsigned int *oidnotfound;
856   
857   control = *(tdata->replyctrl);
858   send_data(sd, &control, sizeof(char));
859   
860   //TODO read missing objects during object migration
861   /* If response is a soft abort due to missing objects at the
862      Participant's side */
863   
864   /* If the decided response is TRANS_ABORT */
865   if(*(tdata->replyctrl) == TRANS_ABORT) {
866     retval = TRANS_ABORT;
867   } else if(*(tdata->replyctrl) == TRANS_COMMIT) { 
868     /* If the decided response is TRANS_COMMIT */ 
869     retval = TRANS_COMMIT;
870   }
871   
872   return retval;
873 }
874
875 /* This function opens a connection, places an object read request to
876  * the remote machine, reads the control message and object if
877  * available and copies the object and its header to the local
878  * cache. */ 
879
880 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
881   int size, val;
882   struct sockaddr_in serv_addr;
883   char machineip[16];
884   char control;
885   objheader_t *h;
886   void *objcopy = NULL;
887   
888   int sd = getSock2(transReadSockPool, mnum);
889   char readrequest[sizeof(char)+sizeof(unsigned int)];
890   readrequest[0] = READ_REQUEST;
891   *((unsigned int *)(&readrequest[1])) = oid;
892   send_data(sd, readrequest, sizeof(readrequest));
893   
894   /* Read response from the Participant */
895   recv_data(sd, &control, sizeof(char));
896   
897   if (control==OBJECT_NOT_FOUND) {
898     objcopy = NULL;
899   } else {
900     /* Read object if found into local cache */
901     recv_data(sd, &size, sizeof(int));
902     objcopy = objstrAlloc(record->cache, size);
903     recv_data(sd, objcopy, size);
904     /* Insert into cache's lookup table */
905     chashInsert(record->lookupTable, oid, objcopy); 
906   }
907   
908   return objcopy;
909 }
910
911 /* This function handles the local objects involved in a transaction
912  * commiting process.  It also makes a decision if this local machine
913  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator.  Note
914  * Coordinator = local machine It wakes up the other threads from
915  * remote participants that are waiting for the coordinator's decision
916  * and based on common agreement it either commits or aborts the
917  * transaction.  It also frees the memory resources */
918
919 void *handleLocalReq(void *threadarg) {
920   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
921   local_thread_data_array_t *localtdata;
922   int numoidnotfound = 0, numoidlocked = 0;
923   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
924   int numread, i;
925   unsigned int oid;
926   unsigned short version;
927   void *mobj;
928   objheader_t *headptr;
929   
930   localtdata = (local_thread_data_array_t *) threadarg;
931   
932   /* Counters and arrays to formulate decision on control message to be sent */
933   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
934   oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
935   
936   numread = localtdata->tdata->buffer->f.numread;
937   /* Process each oid in the machine pile/ group per thread */
938   for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
939     if (i < localtdata->tdata->buffer->f.numread) {
940       int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
941       incr *= i;
942       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
943       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
944     } else { // Objects Modified
945       int tmpsize;
946       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
947       if (headptr == NULL) {
948         printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
949         return NULL;
950       }
951       oid = OID(headptr);
952       version = headptr->version;
953     }
954     /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
955     
956     /* Save the oids not found and number of oids not found for later use */
957     if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
958       /* Save the oids not found and number of oids not found for later use */
959       oidnotfound[numoidnotfound] = oid;
960       numoidnotfound++;
961     } else { /* If Obj found in machine (i.e. has not moved) */
962       /* Check if Obj is locked by any previous transaction */
963       if (test_and_set(STATUSPTR(mobj))) {
964         if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
965           v_matchlock++;
966         } else {/* If versions don't match ...HARD ABORT */
967           v_nomatch++;
968           /* Send TRANS_DISAGREE to Coordinator */
969           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
970 #ifdef CHECKTA
971   char a[] = "mid";
972   char b[] = "version mismatch";
973   char c[] = "object type";
974   TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
975 #endif
976           break;
977         }
978       } else {
979         //we're locked
980         /* Save all object oids that are locked on this machine during this transaction request call */
981         oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
982         numoidlocked++;
983         if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
984           v_matchnolock++;
985         } else { /* If versions don't match ...HARD ABORT */
986           v_nomatch++;
987           /* Send TRANS_DISAGREE to Coordinator */
988           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
989 #ifdef CHECKTA
990   char a[] = "mid";
991   char b[] = "version mismatch";
992   char c[] = "object type";
993   TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
994 #endif
995           break;
996         }
997       }
998     }
999   } // End for
1000   /* Condition to send TRANS_AGREE */
1001   if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
1002     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
1003   }
1004   /* Condition to send TRANS_SOFT_ABORT */
1005   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1006 #ifdef CHECKTA
1007   char a[] = "mid";
1008   char b[] = "version mismatch";
1009   char c[] = "object type";
1010   TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
1011 #endif
1012     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
1013   }
1014   
1015   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1016    * if Participant receives a TRANS_COMMIT */
1017   localtdata->transinfo->objlocked = oidlocked;
1018   localtdata->transinfo->objnotfound = oidnotfound;
1019   localtdata->transinfo->modptr = NULL;
1020   localtdata->transinfo->numlocked = numoidlocked;
1021   localtdata->transinfo->numnotfound = numoidnotfound;
1022   /* Lock and update count */
1023   //Thread sleeps until all messages from pariticipants are received by coordinator
1024   pthread_mutex_lock(localtdata->tdata->lock);
1025   (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
1026   
1027   /* Wake up the threads and invoke decideResponse (once) */
1028   if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
1029     decideResponse(localtdata->tdata); 
1030     pthread_cond_broadcast(localtdata->tdata->threshold);
1031   } else {
1032     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
1033   }
1034   pthread_mutex_unlock(localtdata->tdata->lock);
1035   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
1036     if(transAbortProcess(localtdata) != 0) {
1037       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1038       pthread_exit(NULL);
1039     }
1040   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
1041     if(transComProcess(localtdata) != 0) {
1042       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1043       pthread_exit(NULL);
1044     }
1045   }
1046   /* Free memory */
1047   if (localtdata->transinfo->objlocked != NULL) {
1048     free(localtdata->transinfo->objlocked);
1049   }
1050   if (localtdata->transinfo->objnotfound != NULL) {
1051     free(localtdata->transinfo->objnotfound);
1052   }
1053   
1054   pthread_exit(NULL);
1055 }
1056
1057 /* This function completes the ABORT process if the transaction is aborting */
1058 int transAbortProcess(local_thread_data_array_t  *localtdata) {
1059   int i, numlocked;
1060   unsigned int *objlocked;
1061   void *header;
1062   
1063   numlocked = localtdata->transinfo->numlocked;
1064   objlocked = localtdata->transinfo->objlocked;
1065   
1066   for (i = 0; i < numlocked; i++) {
1067     if((header = mhashSearch(objlocked[i])) == NULL) {
1068       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1069       return 1;
1070     }
1071     UnLock(STATUSPTR(header));
1072   }
1073
1074   return 0;
1075 }
1076
1077 /*This function completes the COMMIT process if the transaction is commiting*/
1078 int transComProcess(local_thread_data_array_t  *localtdata) {
1079   objheader_t *header, *tcptr;
1080   int i, nummod, tmpsize, numcreated, numlocked;
1081   unsigned int *oidmod, *oidcreated, *oidlocked;
1082   void *ptrcreate;
1083   
1084   nummod = localtdata->tdata->buffer->f.nummod;
1085   oidmod = localtdata->tdata->buffer->oidmod;
1086   numcreated = localtdata->tdata->buffer->f.numcreated;
1087   oidcreated = localtdata->tdata->buffer->oidcreated;
1088   numlocked = localtdata->transinfo->numlocked;
1089   oidlocked = localtdata->transinfo->objlocked;
1090   
1091   for (i = 0; i < nummod; i++) {
1092     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1093       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1094       return 1;
1095     }
1096     /* Copy from transaction cache -> main object store */
1097     if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
1098       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1099       return 1;
1100     }
1101     GETSIZE(tmpsize, header);
1102     char *tmptcptr = (char *) tcptr;
1103     memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
1104     header->version += 1;
1105     if(header->notifylist != NULL) {
1106       notifyAll(&header->notifylist, OID(header), header->version);
1107     }
1108   }
1109   /* If object is newly created inside transaction then commit it */
1110   for (i = 0; i < numcreated; i++) {
1111     if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
1112       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1113       return 1;
1114     }
1115     GETSIZE(tmpsize, header);
1116     tmpsize += sizeof(objheader_t);
1117     pthread_mutex_lock(&mainobjstore_mutex);
1118     if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
1119       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1120       pthread_mutex_unlock(&mainobjstore_mutex);
1121       return 1;
1122     }
1123     pthread_mutex_unlock(&mainobjstore_mutex);
1124     memcpy(ptrcreate, header, tmpsize);
1125     mhashInsert(oidcreated[i], ptrcreate);
1126     lhashInsert(oidcreated[i], myIpAddr);
1127   }
1128   /* Unlock locked objects */
1129   for(i = 0; i < numlocked; i++) {
1130     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1131       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1132       return 1;
1133     }
1134     UnLock(STATUSPTR(header));
1135   }
1136   
1137   return 0;
1138 }
1139
1140 prefetchpile_t *foundLocal(char *ptr) {
1141   int siteid = *(GET_SITEID(ptr));
1142   int ntuples = *(GET_NTUPLES(ptr));
1143   unsigned int * oidarray = GET_PTR_OID(ptr);
1144   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1145   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1146   prefetchpile_t * head=NULL;
1147   int numLocal = 0;
1148   
1149   int i;
1150   for(i=0;i<ntuples; i++) {
1151     unsigned short baseindex=(i==0)?0:endoffsets[i-1];
1152     unsigned short endindex=endoffsets[i];
1153     unsigned int oid=oidarray[i];
1154     int newbase;
1155     int machinenum;
1156     if (oid==0)
1157       continue;
1158     //Look up fields locally
1159     for(newbase=baseindex;newbase<endindex;newbase++) {
1160       if (!lookupObject(&oid, arryfields[newbase]))
1161         break;
1162       //Ended in a null pointer...
1163       if (oid==0)
1164         goto tuple;
1165     }
1166     //Entire prefetch is local
1167     if (newbase==endindex&&checkoid(oid)){
1168       numLocal++;
1169       goto tuple;
1170     }
1171     //Add to remote requests
1172     machinenum=lhashSearch(oid);
1173     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1174   tuple:
1175     ;
1176   }
1177
1178   /* handle dynamic prefetching */
1179   handleDynPrefetching(numLocal, ntuples, siteid);
1180   return head;
1181 }
1182
1183 int checkoid(unsigned int oid) {
1184   objheader_t *header;
1185   if ((header=mhashSearch(oid))!=NULL) {
1186     //Found on machine
1187     return 1;
1188   } else if ((header=prehashSearch(oid))!=NULL) {
1189     //Found in cache
1190     return 1;
1191   } else {
1192     return 0;
1193   }
1194 }
1195
1196 int lookupObject(unsigned int * oid, short offset) {
1197   objheader_t *header;
1198   if ((header=mhashSearch(*oid))!=NULL) {
1199     //Found on machine
1200     ;
1201   } else if ((header=prehashSearch(*oid))!=NULL) {
1202     //Found in cache
1203     ;
1204   } else {
1205     return 0;
1206   }
1207
1208   if(TYPE(header) > NUMCLASSES) {
1209     int elementsize = classsize[TYPE(header)];
1210     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1211     int length = ao->___length___;
1212     /* Check if array out of bounds */
1213     if(offset < 0 || offset >= length) {
1214       //if yes treat the object as found
1215       (*oid)=0;
1216       return 1;
1217     }
1218     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1219     return 1;
1220   } else {
1221     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1222     return 1;
1223   }
1224 }
1225
1226
1227 /* This function is called by the thread calling transPrefetch */
1228 void *transPrefetch(void *t) {
1229   while(1) {
1230     /* lock mutex of primary prefetch queue */
1231     void *node=gettail();
1232     /* Check if the tuples are found locally, if yes then reduce them further*/ 
1233     /* and group requests by remote machine ids by calling the makePreGroups() */
1234     prefetchpile_t *pilehead = foundLocal(node);
1235
1236     if (pilehead!=NULL) {
1237       // Get sock from shared pool 
1238       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
1239       
1240       /* Send  Prefetch Request */
1241       prefetchpile_t *ptr = pilehead;
1242       while(ptr != NULL) {
1243         sendPrefetchReq(ptr, sd);
1244         ptr = ptr->next; 
1245       }
1246       
1247       /* Release socket */
1248       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1249       
1250       /* Deallocated pilehead */
1251       mcdealloc(pilehead);
1252     }
1253     // Deallocate the prefetch queue pile node
1254     inctail();
1255   }
1256 }
1257
1258 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1259   objpile_t *tmp;
1260   
1261   int size=sizeof(char)+sizeof(int);
1262   for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
1263     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1264   }
1265   
1266   char buft[size];
1267   char *buf=buft;
1268   *buf=TRANS_PREFETCH;
1269   buf+=sizeof(char);
1270   
1271   for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
1272     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1273     *((int*)buf)=len;
1274     buf+=sizeof(int);
1275     *((unsigned int *)buf)=tmp->oid;
1276     buf+=sizeof(unsigned int);
1277     *((unsigned int *)(buf)) = myIpAddr; 
1278     buf+=sizeof(unsigned int);
1279     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1280     buf+=tmp->numoffset*sizeof(short);
1281   }
1282   *((int *)buf)=-1;
1283   send_data(sd, buft, size);
1284   return;
1285 }
1286
1287 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1288   int len, endpair;
1289   char control;
1290   objpile_t *tmp;
1291   
1292   /* Send TRANS_PREFETCH control message */
1293   control = TRANS_PREFETCH;
1294   send_data(sd, &control, sizeof(char));
1295   
1296   /* Send Oids and offsets in pairs */
1297   tmp = mcpilenode->objpiles;
1298   while(tmp != NULL) {
1299     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1300     char oidnoffset[len];
1301     char *buf=oidnoffset;
1302     *((int*)buf) = tmp->numoffset;
1303     buf+=sizeof(int);
1304     *((unsigned int *)buf) = tmp->oid;
1305     buf+=sizeof(unsigned int);
1306     *((unsigned int *)buf) = myIpAddr; 
1307     buf += sizeof(unsigned int);
1308     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1309     send_data(sd, oidnoffset, len);
1310     tmp = tmp->next;
1311   }
1312   
1313   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1314   endpair = -1;
1315   send_data(sd, &endpair, sizeof(int));
1316   
1317   return;
1318 }
1319
1320 int getPrefetchResponse(int sd) {
1321   int length = 0, size = 0;
1322   char control;
1323   unsigned int oid;
1324   void *modptr, *oldptr;
1325   
1326   recv_data((int)sd, &length, sizeof(int)); 
1327   size = length - sizeof(int);
1328   char recvbuffer[size];
1329
1330   recv_data((int)sd, recvbuffer, size);
1331   control = *((char *) recvbuffer);
1332   if(control == OBJECT_FOUND) {
1333     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1334     size = size - (sizeof(char) + sizeof(unsigned int));
1335     pthread_mutex_lock(&prefetchcache_mutex);
1336     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1337       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1338       pthread_mutex_unlock(&prefetchcache_mutex);
1339       return -1;
1340     }
1341     pthread_mutex_unlock(&prefetchcache_mutex);
1342     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1343     STATUS(modptr)=0;
1344
1345     /* Insert the oid and its address into the prefetch hash lookup table */
1346     /* Do a version comparison if the oid exists */
1347     if((oldptr = prehashSearch(oid)) != NULL) {
1348       /* If older version then update with new object ptr */
1349       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1350         prehashRemove(oid);
1351         prehashInsert(oid, modptr);
1352       }
1353     } else {/* Else add the object ptr to hash table*/
1354       prehashInsert(oid, modptr);
1355     }
1356     /* Lock the Prefetch Cache look up table*/
1357     pthread_mutex_lock(&pflookup.lock);
1358     /* Broadcast signal on prefetch cache condition variable */ 
1359     pthread_cond_broadcast(&pflookup.cond);
1360     /* Unlock the Prefetch Cache look up table*/
1361     pthread_mutex_unlock(&pflookup.lock);
1362   } else if(control == OBJECT_NOT_FOUND) {
1363     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1364     /* TODO: For each object not found query DHT for new location and retrieve the object */
1365     /* Throw an error */
1366     printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1367     //    exit(-1);
1368   } else {
1369     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1370   }
1371   
1372   return 0;
1373 }
1374
1375 unsigned short getObjType(unsigned int oid) {
1376   objheader_t *objheader;
1377   unsigned short numoffset[] ={0};
1378   short fieldoffset[] ={};
1379   
1380   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1381       if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1382         prefetch(0, 1, &oid, numoffset, fieldoffset);
1383         pthread_mutex_lock(&pflookup.lock);
1384         while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1385           pthread_cond_wait(&pflookup.cond, &pflookup.lock);
1386         }
1387         pthread_mutex_unlock(&pflookup.lock);
1388       }
1389   }
1390   
1391   return TYPE(objheader);
1392 }
1393
1394 int startRemoteThread(unsigned int oid, unsigned int mid)
1395 {
1396         int sock;
1397         struct sockaddr_in remoteAddr;
1398         char msg[1 + sizeof(unsigned int)];
1399         int bytesSent;
1400         int status;
1401
1402         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1403         {
1404                 perror("startRemoteThread():socket()");
1405                 return -1;
1406         }
1407
1408         bzero(&remoteAddr, sizeof(remoteAddr));
1409         remoteAddr.sin_family = AF_INET;
1410         remoteAddr.sin_port = htons(LISTEN_PORT);
1411         remoteAddr.sin_addr.s_addr = htonl(mid);
1412         
1413         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
1414         {
1415                 printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1416                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1417                 status = -1;
1418         }
1419         else
1420         {
1421                 msg[0] = START_REMOTE_THREAD;
1422         *((unsigned int *) &msg[1]) = oid;
1423                 send_data(sock, msg, 1 + sizeof(unsigned int));
1424         }
1425
1426         close(sock);
1427         return status;
1428 }
1429
1430 //TODO: when reusing oids, make sure they are not already in use!
1431 static unsigned int id = 0xFFFFFFFF;
1432 unsigned int getNewOID(void) {
1433         id += 2;
1434         if (id > oidMax || id < oidMin)
1435         {
1436                 id = (oidMin | 1);
1437         }
1438         return id;
1439 }
1440
1441 int processConfigFile()
1442 {
1443         FILE *configFile;
1444         const int maxLineLength = 200;
1445         char lineBuffer[maxLineLength];
1446         char *token;
1447         const char *delimiters = " \t\n";
1448         char *commentBegin;
1449         in_addr_t tmpAddr;
1450         
1451         configFile = fopen(CONFIG_FILENAME, "r");
1452         if (configFile == NULL)
1453         {
1454                 printf("error opening %s:\n", CONFIG_FILENAME);
1455                 perror("");
1456                 return -1;
1457         }
1458
1459         numHostsInSystem = 0;
1460         sizeOfHostArray = 8;
1461         hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1462         
1463         while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
1464         {
1465                 commentBegin = strchr(lineBuffer, '#');
1466                 if (commentBegin != NULL)
1467                         *commentBegin = '\0';
1468                 token = strtok(lineBuffer, delimiters);
1469                 while (token != NULL)
1470                 {
1471                         tmpAddr = inet_addr(token);
1472                         if ((int)tmpAddr == -1)
1473                         {
1474                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1475                                 fclose(configFile);
1476                                 return -1;
1477                         }
1478                         else
1479                                 addHost(htonl(tmpAddr));
1480                         token = strtok(NULL, delimiters);
1481                 }
1482         }
1483
1484         fclose(configFile);
1485         
1486         if (numHostsInSystem < 1)
1487         {
1488                 printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1489                 return -1;
1490         }
1491 #ifdef MAC
1492         myIpAddr = getMyIpAddr("en1");
1493 #else
1494         myIpAddr = getMyIpAddr("eth0");
1495 #endif
1496
1497 #ifdef CHECKTA
1498     printf("My ip address = %x", myIpAddr);
1499 #endif
1500         myIndexInHostArray = findHost(myIpAddr);
1501         if (myIndexInHostArray == -1)
1502         {
1503                 printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1504                 return -1;
1505         }
1506         oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1507         oidMin = oidsPerBlock * myIndexInHostArray;
1508         if (myIndexInHostArray == numHostsInSystem - 1)
1509                 oidMax = 0xFFFFFFFF;
1510         else
1511                 oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1512
1513         return 0;
1514 }
1515
1516 void addHost(unsigned int hostIp)
1517 {
1518         unsigned int *tmpArray;
1519
1520         if (findHost(hostIp) != -1)
1521                 return;
1522
1523         if (numHostsInSystem == sizeOfHostArray)
1524         {
1525                 tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1526                 memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1527                 free(hostIpAddrs);
1528                 hostIpAddrs = tmpArray;
1529         }
1530
1531         hostIpAddrs[numHostsInSystem++] = hostIp;
1532
1533         return;
1534 }
1535
1536 int findHost(unsigned int hostIp)
1537 {
1538         int i;
1539         for (i = 0; i < numHostsInSystem; i++)
1540                 if (hostIpAddrs[i] == hostIp)
1541                         return i;
1542
1543         //not found
1544         return -1;
1545 }
1546
1547 /* This function sends notification request per thread waiting on object(s) whose version 
1548  * changes */
1549 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1550   int sock,i;
1551   objheader_t *objheader;
1552   struct sockaddr_in remoteAddr;
1553   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1554   char *ptr;
1555   int bytesSent;
1556   int status, size;
1557   unsigned short version;
1558   unsigned int oid,mid;
1559   static unsigned int threadid = 0;
1560   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1561   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1562   notifydata_t *ndata;
1563   
1564   oid = oidarry[0];
1565   if((mid = lhashSearch(oid)) == 0) {
1566     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1567     return;
1568   }
1569   
1570   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1571     perror("reqNotify():socket()");
1572     return -1;
1573   }
1574   
1575   bzero(&remoteAddr, sizeof(remoteAddr));
1576   remoteAddr.sin_family = AF_INET;
1577   remoteAddr.sin_port = htons(LISTEN_PORT);
1578   remoteAddr.sin_addr.s_addr = htonl(mid);
1579   
1580   /* Generate unique threadid */
1581   threadid++;
1582   
1583   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1584   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1585     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1586     return -1;
1587   }
1588   ndata->numoid = numoid;
1589   ndata->threadid = threadid;
1590   ndata->oidarry = oidarry;
1591   ndata->versionarry = versionarry;
1592   ndata->threadcond = threadcond;
1593   ndata->threadnotify = threadnotify;
1594   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1595     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1596     free(ndata);
1597     return -1; 
1598   }
1599   
1600   /* Send  number of oids, oidarry, version array, machine id and threadid */   
1601   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1602     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1603            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1604     free(ndata);
1605     return -1;
1606   } else {
1607     msg[0] = THREAD_NOTIFY_REQUEST;
1608     *((unsigned int *)(&msg[1])) = numoid;
1609     /* Send array of oids  */
1610     size = sizeof(unsigned int);
1611     {
1612       i = 0;
1613       while(i < numoid) {
1614         oid = oidarry[i];
1615         *((unsigned int *)(&msg[1] + size)) = oid;
1616         size += sizeof(unsigned int);
1617         i++;
1618       }
1619     }
1620     
1621     /* Send array of version  */
1622     {
1623       i = 0;
1624       while(i < numoid) {
1625         version = versionarry[i];
1626         *((unsigned short *)(&msg[1] + size)) = version;
1627         size += sizeof(unsigned short);
1628         i++;
1629       }
1630     }
1631     
1632     *((unsigned int *)(&msg[1] + size)) = myIpAddr;
1633     size += sizeof(unsigned int);
1634     *((unsigned int *)(&msg[1] + size)) = threadid;
1635     pthread_mutex_lock(&(ndata->threadnotify));
1636     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1637     send_data(sock, msg, size);
1638     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1639     pthread_mutex_unlock(&(ndata->threadnotify));
1640   }
1641   
1642   pthread_cond_destroy(&threadcond);
1643   pthread_mutex_destroy(&threadnotify);
1644   free(ndata);
1645   close(sock);
1646   return status;
1647 }
1648
1649 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1650         notifydata_t *ndata;
1651         int i, objIsFound = 0, index;
1652         void *ptr;
1653
1654         //Look up the tid and call the corresponding pthread_cond_signal
1655         if((ndata = notifyhashSearch(tid)) == NULL) {
1656                 printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1657                 return;
1658         } else  {
1659                 for(i = 0; i < ndata->numoid; i++) {
1660                         if(ndata->oidarry[i] == oid){
1661                                 objIsFound = 1;
1662                                 index = i;
1663                         }
1664                 }
1665                 if(objIsFound == 0){
1666                         printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1667                         return;
1668                 } else {
1669                         if(version <= ndata->versionarry[index]){
1670                                 printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
1671                                 return;
1672                         } else {
1673                                 /* Clear from prefetch cache and free thread related data structure */
1674                                 if((ptr = prehashSearch(oid)) != NULL) {
1675                                         prehashRemove(oid);
1676                                 }
1677                                 pthread_cond_signal(&(ndata->threadcond));
1678                         }
1679                 }
1680         }
1681         return;
1682 }
1683
1684 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
1685   threadlist_t *ptr;
1686   unsigned int mid;
1687   struct sockaddr_in remoteAddr;
1688   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
1689   int sock, status, size, bytesSent;
1690   
1691   while(*head != NULL) {
1692     ptr = *head;
1693     mid = ptr->mid; 
1694     //create a socket connection to that machine
1695     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1696       perror("notifyAll():socket()");
1697       return -1;
1698     }
1699     
1700     bzero(&remoteAddr, sizeof(remoteAddr));
1701     remoteAddr.sin_family = AF_INET;
1702     remoteAddr.sin_port = htons(LISTEN_PORT);
1703     remoteAddr.sin_addr.s_addr = htonl(mid);
1704     //send Thread Notify response and threadid to that machine
1705     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1706       printf("notifyAll():error %d connecting to %s:%d\n", errno,
1707              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1708       status = -1;
1709       fflush(stdout);
1710     } else {
1711       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
1712       msg[0] = THREAD_NOTIFY_RESPONSE;
1713       *((unsigned int *)&msg[1]) = oid;
1714       size = sizeof(unsigned int);
1715       *((unsigned short *)(&msg[1]+ size)) = version;
1716       size+= sizeof(unsigned short);
1717       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
1718       
1719       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
1720       send_data(sock, msg, size);
1721     }
1722     //close socket
1723     close(sock);
1724     // Update head
1725     *head = ptr->next;
1726     free(ptr);
1727   }
1728   return status;
1729 }
1730
1731 void transAbort(transrecord_t *trans) {
1732   objstrDelete(trans->cache);
1733   chashDelete(trans->lookupTable);
1734   free(trans);
1735 }
1736
1737 /* This function inserts necessary information into 
1738  * a machine pile data structure */
1739 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
1740   plistnode_t *ptr, *tmp;
1741   int found = 0, offset = 0;
1742   
1743   tmp = pile;
1744   //Add oid into a machine that is already present in the pile linked list structure
1745   while(tmp != NULL) {
1746     if (tmp->mid == mid) {
1747       int tmpsize;
1748       
1749       if (STATUS(headeraddr) & NEW) {
1750         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
1751         tmp->numcreated++;
1752         GETSIZE(tmpsize, headeraddr);
1753         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1754       }else if (STATUS(headeraddr) & DIRTY) {
1755         tmp->oidmod[tmp->nummod] = OID(headeraddr);
1756         tmp->nummod++;
1757         GETSIZE(tmpsize, headeraddr);
1758         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1759       } else {
1760         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
1761         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
1762         offset += sizeof(unsigned int);
1763         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
1764         tmp->numread ++;
1765       }
1766       found = 1;
1767       break;
1768     }
1769     tmp = tmp->next;
1770   }
1771   //Add oid for any new machine 
1772   if (!found) {
1773     int tmpsize;
1774     if((ptr = pCreate(num_objs)) == NULL) {
1775       return NULL;
1776     }
1777     ptr->mid = mid;
1778     if (STATUS(headeraddr) & NEW) {
1779       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
1780       ptr->numcreated ++;
1781       GETSIZE(tmpsize, headeraddr);
1782       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1783     } else if (STATUS(headeraddr) & DIRTY) {
1784       ptr->oidmod[ptr->nummod] = OID(headeraddr);
1785       ptr->nummod ++;
1786       GETSIZE(tmpsize, headeraddr);
1787       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1788     } else {
1789       *((unsigned int *)ptr->objread)=OID(headeraddr);
1790       offset = sizeof(unsigned int);
1791       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
1792       ptr->numread ++;
1793     }
1794     ptr->next = pile;
1795     pile = ptr;
1796   }
1797   
1798   /* Clear Flags */
1799   STATUS(headeraddr) =0;
1800   
1801   return pile;
1802 }