various changes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "clookup.h"
4 #include "machinepile.h"
5 #include "mlookup.h"
6 #include "llookup.h"
7 #include "plookup.h"
8 #include "prelookup.h"
9 #include "threadnotify.h"
10 #include "queue.h"
11 #ifdef COMPILER
12 #include "thread.h"
13 #endif
14
15 #define NUM_THREADS 1
16 #define PREFETCH_CACHE_SIZE 1048576 //1MB
17 #define CONFIG_FILENAME "dstm.conf"
18
19 /* Global Variables */
20 extern int classsize[];
21 extern primarypfq_t pqueue; //Shared prefetch queue
22 objstr_t *prefetchcache; //Global Prefetch cache
23 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
24 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
25 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
26 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
27 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
28 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
29 extern objstr_t *mainobjstore;
30 unsigned int myIpAddr;
31 unsigned int *hostIpAddrs;
32 int sizeOfHostArray;
33 int numHostsInSystem;
34 int myIndexInHostArray;
35 unsigned int oidsPerBlock;
36 unsigned int oidMin;
37 unsigned int oidMax;
38
39 sockPoolHashTable_t *transReadSockPool;
40 sockPoolHashTable_t *transPrefetchSockPool;
41
42 void printhex(unsigned char *, int);
43 plistnode_t *createPiles(transrecord_t *);
44
45 /*******************************
46  * Send and Recv function calls 
47  *******************************/
48 void send_data(int fd , void *buf, int buflen) {
49   char *buffer = (char *)(buf); 
50   int size = buflen;
51   int numbytes; 
52   while (size > 0) {
53     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
54     if (numbytes == -1) {
55       perror("send");
56       exit(-1);
57     }
58     buffer += numbytes;
59     size -= numbytes;
60   }
61 }
62
63 void recv_data(int fd , void *buf, int buflen) {
64   char *buffer = (char *)(buf); 
65   int size = buflen;
66   int numbytes; 
67   while (size > 0) {
68     numbytes = recv(fd, buffer, size, 0);
69     if (numbytes == -1) {
70       perror("recv");
71       exit(-1);
72     }
73     buffer += numbytes;
74     size -= numbytes;
75   }
76 }
77
78 int recv_data_errorcode(int fd , void *buf, int buflen) {
79   char *buffer = (char *)(buf); 
80   int size = buflen;
81   int numbytes; 
82   while (size > 0) {
83     numbytes = recv(fd, buffer, size, 0);
84     if (numbytes == -1) {
85       return -1;
86     }
87     buffer += numbytes;
88     size -= numbytes;
89   }
90   return 0;
91 }
92
93 void printhex(unsigned char *ptr, int numBytes) {
94   int i;
95   for (i = 0; i < numBytes; i++) {
96     if (ptr[i] < 16)
97       printf("0%x ", ptr[i]);
98     else
99       printf("%x ", ptr[i]);
100   }
101   printf("\n");
102   return;
103 }
104
105 inline int arrayLength(int *array) {
106   int i;
107   for(i=0 ;array[i] != -1; i++)
108     ;
109   return i;
110 }
111
112 inline int findmax(int *array, int arraylength) {
113   int max, i;
114   max = array[0];
115   for(i = 0; i < arraylength; i++){
116     if(array[i] > max) {
117       max = array[i];
118     }
119   }
120   return max;
121 }
122
123 /* This function is a prefetch call generated by the compiler that
124  * populates the shared primary prefetch queue*/
125 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
126   /* Allocate for the queue node*/
127   int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
128   char * node= malloc(qnodesize);
129   /* Set queue node values */
130   int len = sizeof(prefetchqelem_t);
131   int top=endoffsets[ntuples-1];
132   *((int *)(node+len))=ntuples;
133   len += sizeof(int);
134   
135   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
136   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
137   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
138
139   /* Lock and insert into primary prefetch queue */
140   pthread_mutex_lock(&pqueue.qlock);
141   pre_enqueue((prefetchqelem_t *)node);
142   pthread_cond_signal(&pqueue.qcond);
143   pthread_mutex_unlock(&pqueue.qlock);
144 }
145
146 /* This function starts up the transaction runtime. */
147 int dstmStartup(const char * option) {
148   pthread_t thread_Listen;
149   pthread_attr_t attr;
150   int master=option!=NULL && strcmp(option, "master")==0;
151   
152   if (processConfigFile() != 0)
153     return 0; //TODO: return error value, cause main program to exit
154 #ifdef COMPILER
155   if (!master)
156     threadcount--;
157 #endif
158   
159   //Initialize socket pool
160   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
161   transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
162   
163   dstmInit();
164   transInit();
165   
166   if (master) {
167     pthread_attr_init(&attr);
168     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
169     pthread_create(&thread_Listen, &attr, dstmListen, NULL);
170     return 1;
171   } else {
172     dstmListen();
173     return 0;
174   }
175 }
176
177 //TODO Use this later
178 void *pCacheAlloc(objstr_t *store, unsigned int size) {
179   void *tmp;
180   objstr_t *ptr;
181   ptr = store;
182   int success = 0;
183   
184   while(ptr->next != NULL) {
185     /* check if store is empty */
186     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
187       tmp = ptr->top;
188       ptr->top += size;
189       success = 1;
190       return tmp;
191     } else {
192       ptr = ptr-> next;
193     }
194   }
195   
196   if(success == 0) {
197     return NULL;
198   }
199 }
200
201 /* This function initiates the prefetch thread A queue is shared
202  * between the main thread of execution and the prefetch thread to
203  * process the prefetch call Call from compiler populates the shared
204  * queue with prefetch requests while prefetch thread processes the
205  * prefetch requests */
206
207 void transInit() {
208   int t, rc;
209   int retval;
210   //Create and initialize prefetch cache structure
211   prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
212   
213   /* Initialize attributes for mutex */
214   pthread_mutexattr_init(&prefetchcache_mutex_attr);
215   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
216   
217   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
218   
219   //Create prefetch cache lookup table
220   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
221     printf("ERROR\n");
222     return; //Failure
223   }
224   
225   //Initialize primary shared queue
226   queueInit();
227   //Initialize machine pile w/prefetch oids and offsets shared queue
228   mcpileqInit();
229   
230   //Create the primary prefetch thread 
231   do {
232     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
233   } while(retval!=0);
234   pthread_detach(tPrefetch);
235 }
236
237 /* This function stops the threads spawned */
238 void transExit() {
239   int t;
240   pthread_cancel(tPrefetch);
241   for(t = 0; t < NUM_THREADS; t++)
242     pthread_cancel(wthreads[t]);
243   
244   return;
245 }
246
247 /* This functions inserts randowm wait delays in the order of msec
248  * Mostly used when transaction commits retry*/
249 void randomdelay() {
250   struct timespec req;
251   time_t t;
252   
253   t = time(NULL);
254   req.tv_sec = 0;
255   req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
256   nanosleep(&req, NULL);
257   return;
258 }
259
260 /* This function initializes things required in the transaction start*/
261 transrecord_t *transStart() {
262   transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
263   tmp->cache = objstrCreate(1048576);
264   tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
265 #ifdef COMPILER
266   tmp->revertlist=NULL;
267 #endif
268   return tmp;
269 }
270
271 /* This function finds the location of the objects involved in a transaction
272  * and returns the pointer to the object if found in a remote location */
273 objheader_t *transRead(transrecord_t *record, unsigned int oid) {
274   unsigned int machinenumber;
275   objheader_t *tmp, *objheader;
276   objheader_t *objcopy;
277   int size, found = 0;
278   void *buf;
279   
280   if(oid == 0) {
281     printf("Error: %s, %d oid is NULL \n", __FILE__, __LINE__);
282     return NULL;
283   }
284   
285   if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
286     /* Search local transaction cache */
287 #ifdef COMPILER
288     return &objheader[1];
289 #else
290     return objheader;
291 #endif
292   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
293     /* Look up in machine lookup table  and copy  into cache*/
294     GETSIZE(size, objheader);
295     size += sizeof(objheader_t);
296     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
297     memcpy(objcopy, objheader, size);
298     /* Insert into cache's lookup table */
299     chashInsert(record->lookupTable, OID(objheader), objcopy); 
300 #ifdef COMPILER
301     return &objcopy[1];
302 #else
303     return objcopy;
304 #endif
305   } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { 
306     /* Look up in prefetch cache */
307     GETSIZE(size, tmp);
308     size+=sizeof(objheader_t);
309     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
310     memcpy(objcopy, tmp, size);
311     /* Insert into cache's lookup table */
312     chashInsert(record->lookupTable, OID(tmp), objcopy); 
313 #ifdef COMPILER
314     return &objcopy[1];
315 #else
316     return objcopy;
317 #endif
318   } else {
319     /* Get the object from the remote location */
320     if((machinenumber = lhashSearch(oid)) == 0) {
321       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
322       return NULL;
323     }
324     objcopy = getRemoteObj(record, machinenumber, oid);
325     
326     if(objcopy == NULL) {
327       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
328       return NULL;
329     } else {
330       
331 #ifdef COMPILER
332       return &objcopy[1];
333 #else
334       return objcopy;
335 #endif
336     }
337   }
338 }
339
340 /* This function creates objects in the transaction record */
341 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
342   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
343   tmp->notifylist = NULL;
344   OID(tmp) = getNewOID();
345   tmp->version = 1;
346   tmp->rcount = 1;
347   STATUS(tmp) = NEW;
348   chashInsert(record->lookupTable, OID(tmp), tmp);
349   
350 #ifdef COMPILER
351   return &tmp[1]; //want space after object header
352 #else
353   return tmp;
354 #endif
355 }
356
357 /* This function creates machine piles based on all machines involved in a
358  * transaction commit request */
359 plistnode_t *createPiles(transrecord_t *record) {
360   int i;
361   plistnode_t *pile = NULL;
362   unsigned int machinenum;
363   objheader_t *headeraddr;
364   chashlistnode_t * ptr = record->lookupTable->table;
365   /* Represents number of bins in the chash table */
366   unsigned int size = record->lookupTable->size;
367   
368   for(i = 0; i < size ; i++) {
369     chashlistnode_t * curr = &ptr[i];
370     /* Inner loop to traverse the linked list of the cache lookupTable */
371     while(curr != NULL) {
372       //if the first bin in hash table is empty
373       if(curr->key == 0)
374         break;
375       
376       if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
377         printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
378         return NULL;
379       }
380       
381       //Get machine location for object id (and whether local or not)
382       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
383         machinenum = myIpAddr;
384       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
385         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
386         return NULL;
387       }
388       
389       //Make machine groups
390       pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
391       curr = curr->next;
392     }
393   }
394   return pile; 
395 }
396
397 /* This function initiates the transaction commit process
398  * Spawns threads for each of the new connections with Participants 
399  * and creates new piles by calling the createPiles(), 
400  * Sends a transrequest() to each remote machines for objects found remotely 
401  * and calls handleLocalReq() to process objects found locally */
402 int transCommit(transrecord_t *record) {        
403   unsigned int tot_bytes_mod, *listmid;
404   plistnode_t *pile, *pile_ptr;
405   int i, j, rc, val;
406   int pilecount, offset, threadnum = 0, trecvcount = 0;
407   char control;
408   char transid[TID_LEN];
409   trans_req_data_t *tosend;
410   trans_commit_data_t transinfo;
411   static int newtid = 0;
412   char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
413   char localstat = 0;
414   thread_data_array_t *thread_data_array;
415   local_thread_data_array_t *ltdata;
416   
417   do { 
418     trecvcount = 0; 
419     threadnum = 0; 
420     treplyretry = 0;
421     thread_data_array = NULL;
422     ltdata = NULL;
423     
424     /* Look through all the objects in the transaction record and make piles 
425      * for each machine involved in the transaction*/
426     pile_ptr = pile = createPiles(record);
427     
428     /* Create the packet to be sent in TRANS_REQUEST */
429     
430     /* Count the number of participants */
431     pilecount = pCount(pile);
432     
433     /* Create a list of machine ids(Participants) involved in transaction       */
434     listmid = calloc(pilecount, sizeof(unsigned int));
435     pListMid(pile, listmid);
436     
437     
438     /* Initialize thread variables,
439      * Spawn a thread for each Participant involved in a transaction */
440     pthread_t thread[pilecount];
441     pthread_attr_t attr;                        
442     pthread_cond_t tcond;
443     pthread_mutex_t tlock;
444     pthread_mutex_t tlshrd;
445     
446     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
447     
448     ltdata = calloc(1, sizeof(local_thread_data_array_t));
449     
450     thread_response_t rcvd_control_msg[pilecount];      /* Shared thread array that keeps track of responses of participants */
451     
452     /* Initialize and set thread detach attribute */
453     pthread_attr_init(&attr);
454     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
455     pthread_mutex_init(&tlock, NULL);
456     pthread_cond_init(&tcond, NULL);
457     
458     /* Process each machine pile */
459     while(pile != NULL) {
460       //Create transaction id
461       newtid++;
462       tosend = calloc(1, sizeof(trans_req_data_t));
463       tosend->f.control = TRANS_REQUEST;
464       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
465       tosend->f.mcount = pilecount;
466       tosend->f.numread = pile->numread;
467       tosend->f.nummod = pile->nummod;
468       tosend->f.numcreated = pile->numcreated;
469       tosend->f.sum_bytes = pile->sum_bytes;
470       tosend->listmid = listmid;
471       tosend->objread = pile->objread;
472       tosend->oidmod = pile->oidmod;
473       tosend->oidcreated = pile->oidcreated;
474       thread_data_array[threadnum].thread_id = threadnum;
475       thread_data_array[threadnum].mid = pile->mid;
476       thread_data_array[threadnum].buffer = tosend;
477       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
478       thread_data_array[threadnum].threshold = &tcond;
479       thread_data_array[threadnum].lock = &tlock;
480       thread_data_array[threadnum].count = &trecvcount;
481       thread_data_array[threadnum].replyctrl = &treplyctrl;
482       thread_data_array[threadnum].replyretry = &treplyretry;
483       thread_data_array[threadnum].rec = record;
484       /* If local do not create any extra connection */
485       if(pile->mid != myIpAddr) { /* Not local */
486         do {
487           rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
488         } while(rc!=0);
489         if(rc) {
490           perror("Error in pthread create\n");
491           pthread_cond_destroy(&tcond);
492           pthread_mutex_destroy(&tlock);
493           pDelete(pile_ptr);
494           free(listmid);
495           for (i = 0; i < threadnum; i++)
496             free(thread_data_array[i].buffer);
497           free(thread_data_array);
498           free(ltdata);
499           return 1;
500         }
501       } else { /*Local*/
502         ltdata->tdata = &thread_data_array[threadnum];
503         ltdata->transinfo = &transinfo;
504         do {
505           val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
506         } while(val!=0);
507         if(val) {
508           perror("Error in pthread create\n");
509           pthread_cond_destroy(&tcond);
510           pthread_mutex_destroy(&tlock);
511           pDelete(pile_ptr);
512           free(listmid);
513           for (i = 0; i < threadnum; i++)
514             free(thread_data_array[i].buffer);
515           free(thread_data_array);
516           free(ltdata);
517           return 1;
518         }
519       }
520       
521       threadnum++;              
522       pile = pile->next;
523     }
524     /* Free attribute and wait for the other threads */
525     pthread_attr_destroy(&attr);
526     
527     for (i = 0; i < threadnum; i++) {
528       rc = pthread_join(thread[i], NULL);
529       if(rc)
530         {
531           printf("Error: return code from pthread_join() is %d\n", rc);
532           pthread_cond_destroy(&tcond);
533           pthread_mutex_destroy(&tlock);
534           pDelete(pile_ptr);
535           free(listmid);
536           for (j = i; j < threadnum; j++) {
537             free(thread_data_array[j].buffer);
538           }
539           return 1;
540         }
541       free(thread_data_array[i].buffer);
542     }
543     
544     /* Free resources */        
545     pthread_cond_destroy(&tcond);
546     pthread_mutex_destroy(&tlock);
547     free(listmid);
548     pDelete(pile_ptr);
549     
550     /* wait a random amount of time before retrying to commit transaction*/
551     if(treplyretry) {
552       free(thread_data_array);
553       free(ltdata);
554       randomdelay();
555     }
556     
557     /* Retry trans commit procedure during soft_abort case */
558   } while (treplyretry);
559   
560   
561   if(treplyctrl == TRANS_ABORT) {
562     /* Free Resources */
563     objstrDelete(record->cache);
564     chashDelete(record->lookupTable);
565     free(record);
566     free(thread_data_array);
567     free(ltdata);
568     return TRANS_ABORT;
569   } else if(treplyctrl == TRANS_COMMIT) {
570     /* Free Resources */
571     objstrDelete(record->cache);
572     chashDelete(record->lookupTable);
573     free(record);
574     free(thread_data_array);
575     free(ltdata);
576     return 0;
577   } else {
578     //TODO Add other cases
579     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
580     exit(-1);
581   }
582   return 0;
583 }
584
585 /* This function sends information involved in the transaction request 
586  * to participants and accepts a response from particpants.
587  * It calls decideresponse() to decide on what control message 
588  * to send next to participants and sends the message using sendResponse()*/
589 void *transRequest(void *threadarg) {
590   int sd, i, n;
591   struct sockaddr_in serv_addr;
592   thread_data_array_t *tdata;
593   objheader_t *headeraddr;
594   char control, recvcontrol;
595   char machineip[16], retval;
596   
597   tdata = (thread_data_array_t *) threadarg;
598   
599   /* Send Trans Request */
600   if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
601     perror("Error in socket for TRANS_REQUEST\n");
602     pthread_exit(NULL);
603   }
604   bzero((char*) &serv_addr, sizeof(serv_addr));
605   serv_addr.sin_family = AF_INET;
606   serv_addr.sin_port = htons(LISTEN_PORT);
607   midtoIP(tdata->mid,machineip);
608   machineip[15] = '\0';
609   serv_addr.sin_addr.s_addr = inet_addr(machineip);
610   /* Open Connection */
611   if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
612     perror("Error in connect for TRANS_REQUEST\n");
613     close(sd);
614     pthread_exit(NULL);
615   }
616   
617   /* Send bytes of data with TRANS_REQUEST control message */
618   send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
619   
620   /* Send list of machines involved in the transaction */
621   {
622     int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
623     send_data(sd, tdata->buffer->listmid, size);
624   }
625   
626   /* Send oids and version number tuples for objects that are read */
627   {
628     int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
629     send_data(sd, tdata->buffer->objread, size);
630   }
631   
632   /* Send objects that are modified */
633   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
634     int size;
635     headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
636     GETSIZE(size,headeraddr);
637     size+=sizeof(objheader_t);
638     send_data(sd, headeraddr, size);
639   }
640   
641   /* Read control message from Participant */
642   recv_data(sd, &control, sizeof(char));
643   recvcontrol = control;
644   
645   /* Update common data structure and increment count */
646   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
647   
648   /* Lock and update count */
649   /* Thread sleeps until all messages from pariticipants are received by coordinator */
650   pthread_mutex_lock(tdata->lock);
651   
652   (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
653   
654   /* Wake up the threads and invoke decideResponse (once) */
655   if(*(tdata->count) == tdata->buffer->f.mcount) {
656     decideResponse(tdata); 
657     pthread_cond_broadcast(tdata->threshold);
658   } else {
659     pthread_cond_wait(tdata->threshold, tdata->lock);
660   }
661   pthread_mutex_unlock(tdata->lock);
662   
663   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
664    * to all participants in their respective socket */
665   if (sendResponse(tdata, sd) == 0) { 
666     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
667     close(sd);
668     pthread_exit(NULL);
669   }
670   
671   recv_data((int)sd, &control, sizeof(char)); 
672   
673   if(control == TRANS_UNSUCESSFUL) {
674     //printf("DEBUG-> TRANS_ABORTED\n");
675   } else if(control == TRANS_SUCESSFUL) {
676     //printf("DEBUG-> TRANS_SUCCESSFUL\n");
677   } else {
678     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
679   }
680   
681   /* Close connection */
682   close(sd);
683   pthread_exit(NULL);
684 }
685
686 /* This function decides the reponse that needs to be sent to 
687  * all Participant machines after the TRANS_REQUEST protocol */
688 void decideResponse(thread_data_array_t *tdata) {
689   char control;
690   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
691                                                                    message to send */
692   
693   for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
694     control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
695                                                written onto the shared array */
696     switch(control) {
697     default:
698       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
699       /* treat as disagree, pass thru */
700     case TRANS_DISAGREE:
701       transdisagree++;
702       break;
703       
704     case TRANS_AGREE:
705       transagree++;
706       break;
707       
708     case TRANS_SOFT_ABORT:
709       transsoftabort++;
710       break;
711     }
712   }
713   
714   if(transdisagree > 0) {
715     /* Send Abort */
716     *(tdata->replyctrl) = TRANS_ABORT;
717     *(tdata->replyretry) = 0;
718     /* clear objects from prefetch cache */
719     for (i = 0; i < tdata->buffer->f.numread; i++)
720       prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
721     for (i = 0; i < tdata->buffer->f.nummod; i++)
722       prehashRemove(tdata->buffer->oidmod[i]);
723   } else if(transagree == tdata->buffer->f.mcount){
724     /* Send Commit */
725     *(tdata->replyctrl) = TRANS_COMMIT;
726     *(tdata->replyretry) = 0;
727   } else { 
728     /* Send Abort in soft abort case followed by retry commiting transaction again*/
729     *(tdata->replyctrl) = TRANS_ABORT;
730     *(tdata->replyretry) = 1;
731   }
732   
733   return;
734 }
735
736 /* This function sends the final response to remote machines per
737  * thread in their respective socket id It returns a char that is only
738  * needed to check the correctness of execution of this function
739  * inside transRequest()*/
740
741 char sendResponse(thread_data_array_t *tdata, int sd) {
742   int n, size, sum, oidcount = 0, control;
743   char *ptr, retval = 0;
744   unsigned int *oidnotfound;
745   
746   control = *(tdata->replyctrl);
747   send_data(sd, &control, sizeof(char));
748   
749   //TODO read missing objects during object migration
750   /* If response is a soft abort due to missing objects at the
751      Participant's side */
752   
753   /* If the decided response is TRANS_ABORT */
754   if(*(tdata->replyctrl) == TRANS_ABORT) {
755     retval = TRANS_ABORT;
756   } else if(*(tdata->replyctrl) == TRANS_COMMIT) { 
757     /* If the decided response is TRANS_COMMIT */ 
758     retval = TRANS_COMMIT;
759   }
760   
761   return retval;
762 }
763
764 /* This function opens a connection, places an object read request to
765  * the remote machine, reads the control message and object if
766  * available and copies the object and its header to the local
767  * cache. */ 
768
769 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
770   int size, val;
771   struct sockaddr_in serv_addr;
772   char machineip[16];
773   char control;
774   objheader_t *h;
775   void *objcopy = NULL;
776   
777   int sd = getSock2(transReadSockPool, mnum);
778   char readrequest[sizeof(char)+sizeof(unsigned int)];
779   readrequest[0] = READ_REQUEST;
780   *((unsigned int *)(&readrequest[1])) = oid;
781   send_data(sd, readrequest, sizeof(readrequest));
782   
783   /* Read response from the Participant */
784   recv_data(sd, &control, sizeof(char));
785   
786   if (control==OBJECT_NOT_FOUND) {
787     objcopy = NULL;
788   } else {
789     /* Read object if found into local cache */
790     recv_data(sd, &size, sizeof(int));
791     objcopy = objstrAlloc(record->cache, size);
792     recv_data(sd, objcopy, size);
793     
794     /* Insert into cache's lookup table */
795     chashInsert(record->lookupTable, oid, objcopy); 
796   }
797   
798   //    freeSock(transReadSockPool, mnum, sd);
799   
800   return objcopy;
801 }
802
803 /* This function handles the local objects involved in a transaction
804  * commiting process.  It also makes a decision if this local machine
805  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator.  Note
806  * Coordinator = local machine It wakes up the other threads from
807  * remote participants that are waiting for the coordinator's decision
808  * and based on common agreement it either commits or aborts the
809  * transaction.  It also frees the memory resources */
810
811 void *handleLocalReq(void *threadarg) {
812   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
813   local_thread_data_array_t *localtdata;
814   int objnotfound = 0, objlocked = 0; 
815   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
816   int numread, i;
817   unsigned int oid;
818   unsigned short version;
819   void *mobj;
820   objheader_t *headptr;
821   
822   localtdata = (local_thread_data_array_t *) threadarg;
823   
824   /* Counters and arrays to formulate decision on control message to be sent */
825   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
826   oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
827   
828   numread = localtdata->tdata->buffer->f.numread;
829   /* Process each oid in the machine pile/ group per thread */
830   for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
831     if (i < localtdata->tdata->buffer->f.numread) {
832       int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
833       incr *= i;
834       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
835       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
836     } else { // Objects Modified
837       int tmpsize;
838       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
839       if (headptr == NULL) {
840         printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
841         return NULL;
842       }
843       oid = OID(headptr);
844       version = headptr->version;
845     }
846     /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
847     
848     /* Save the oids not found and number of oids not found for later use */
849     if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
850       /* Save the oids not found and number of oids not found for later use */
851       oidnotfound[objnotfound] = oid;
852       objnotfound++;
853     } else { /* If Obj found in machine (i.e. has not moved) */
854       /* Check if Obj is locked by any previous transaction */
855       if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
856         if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
857           v_matchlock++;
858         } else {/* If versions don't match ...HARD ABORT */
859           v_nomatch++;
860           /* Send TRANS_DISAGREE to Coordinator */
861           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
862         }
863       } else {/* If Obj is not locked then lock object */
864         STATUS(((objheader_t *)mobj)) |= LOCK;
865         /* Save all object oids that are locked on this machine during this transaction request call */
866         oidlocked[objlocked] = OID(((objheader_t *)mobj));
867         objlocked++;
868         if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
869           v_matchnolock++;
870         } else { /* If versions don't match ...HARD ABORT */
871           v_nomatch++;
872           /* Send TRANS_DISAGREE to Coordinator */
873           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
874         }
875       }
876     }
877   } // End for
878   /* Condition to send TRANS_AGREE */
879   if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
880     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
881   }
882   /* Condition to send TRANS_SOFT_ABORT */
883   if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
884     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
885   }
886   
887   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
888    * if Participant receives a TRANS_COMMIT */
889   localtdata->transinfo->objlocked = oidlocked;
890   localtdata->transinfo->objnotfound = oidnotfound;
891   localtdata->transinfo->modptr = NULL;
892   localtdata->transinfo->numlocked = objlocked;
893   localtdata->transinfo->numnotfound = objnotfound;
894   /* Lock and update count */
895   //Thread sleeps until all messages from pariticipants are received by coordinator
896   pthread_mutex_lock(localtdata->tdata->lock);
897   (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
898   
899   /* Wake up the threads and invoke decideResponse (once) */
900   if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
901     decideResponse(localtdata->tdata); 
902     pthread_cond_broadcast(localtdata->tdata->threshold);
903   } else {
904     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
905   }
906   pthread_mutex_unlock(localtdata->tdata->lock);
907   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
908     if(transAbortProcess(localtdata) != 0) {
909       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
910       pthread_exit(NULL);
911     }
912   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
913     if(transComProcess(localtdata) != 0) {
914       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
915       pthread_exit(NULL);
916     }
917   }
918   /* Free memory */
919   if (localtdata->transinfo->objlocked != NULL) {
920     free(localtdata->transinfo->objlocked);
921   }
922   if (localtdata->transinfo->objnotfound != NULL) {
923     free(localtdata->transinfo->objnotfound);
924   }
925   
926   pthread_exit(NULL);
927 }
928
929 /* This function completes the ABORT process if the transaction is aborting */
930 int transAbortProcess(local_thread_data_array_t  *localtdata) {
931   int i, numlocked;
932   unsigned int *objlocked;
933   void *header;
934   
935   numlocked = localtdata->transinfo->numlocked;
936   objlocked = localtdata->transinfo->objlocked;
937   
938   for (i = 0; i < numlocked; i++) {
939     if((header = mhashSearch(objlocked[i])) == NULL) {
940       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
941       return 1;
942     }
943     STATUS(((objheader_t *)header)) &= ~(LOCK);
944   }
945   
946   return 0;
947 }
948
949 /*This function completes the COMMIT process is the transaction is commiting*/
950 int transComProcess(local_thread_data_array_t  *localtdata) {
951         objheader_t *header, *tcptr;
952         int i, nummod, tmpsize, numcreated, numlocked;
953         unsigned int *oidmod, *oidcreated, *oidlocked;
954         void *ptrcreate;
955
956         nummod = localtdata->tdata->buffer->f.nummod;
957         oidmod = localtdata->tdata->buffer->oidmod;
958         numcreated = localtdata->tdata->buffer->f.numcreated;
959         oidcreated = localtdata->tdata->buffer->oidcreated;
960         numlocked = localtdata->transinfo->numlocked;
961         oidlocked = localtdata->transinfo->objlocked;
962
963         for (i = 0; i < nummod; i++) {
964                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
965                         printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
966                         return 1;
967                 }
968                 /* Copy from transaction cache -> main object store */
969                 if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
970                         printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
971                         return 1;
972                 }
973                 GETSIZE(tmpsize, header);
974                 pthread_mutex_lock(&mainobjstore_mutex);
975                 memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize);
976                 header->version += 1;
977                 if(header->notifylist != NULL) {
978                         notifyAll(&header->notifylist, OID(header), header->version);
979                 }
980                 pthread_mutex_unlock(&mainobjstore_mutex);
981         }
982         /* If object is newly created inside transaction then commit it */
983         for (i = 0; i < numcreated; i++) {
984                 if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
985                         printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
986                         return 1;
987                 }
988                 GETSIZE(tmpsize, header);
989                 tmpsize += sizeof(objheader_t);
990                 pthread_mutex_lock(&mainobjstore_mutex);
991                 if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
992                         printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
993                         pthread_mutex_unlock(&mainobjstore_mutex);
994                         return 1;
995                 }
996                 pthread_mutex_unlock(&mainobjstore_mutex);
997                 memcpy(ptrcreate, header, tmpsize);
998                 mhashInsert(oidcreated[i], ptrcreate);
999                 lhashInsert(oidcreated[i], myIpAddr);
1000         }
1001         /* Unlock locked objects */
1002         for(i = 0; i < numlocked; i++) {
1003                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1004                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1005                         return 1;
1006                 }
1007                 STATUS(header) &= ~(LOCK);
1008         }
1009
1010         return 0;
1011 }
1012
1013 prefetchpile_t *foundLocal(prefetchqelem_t *node) {
1014   char * ptr = (char *) node;
1015   int ntuples = *(GET_NTUPLES(ptr));
1016   unsigned int * oidarray = GET_PTR_OID(ptr);
1017   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1018   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1019   prefetchpile_t * head=NULL;
1020   
1021   int i;
1022   for(i=0;i<ntuples; i++) {
1023     unsigned short baseindex=(i==0)?0:endoffsets[i-1];
1024     unsigned short endindex=endoffsets[i];
1025     unsigned int oid=oidarray[i];
1026     int newbase;
1027     int machinenum;
1028     //Look up fields locally
1029     for(newbase=baseindex;newbase<endindex;newbase++) {
1030       if (!lookupObject(&oid, arryfields[newbase]))
1031         break;
1032       //Ended in a null pointer...
1033       if (oid==0)
1034         goto tuple;
1035     }
1036     //Entire prefetch is local
1037     if (newbase==endindex&&checkoid(oid))
1038       goto tuple;
1039     //Add to remote requests
1040     machinenum=lhashSearch(oid);
1041     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1042   tuple:
1043     ;
1044   }
1045   return head;
1046 }
1047
1048 int checkoid(unsigned int oid) {
1049   objheader_t *header;
1050   if ((header=mhashSearch(oid))!=NULL) {
1051     //Found on machine
1052     return 1;
1053   } else if ((header=prehashSearch(oid))!=NULL) {
1054     //Found in cache
1055     return 1;
1056   } else {
1057     return 0;
1058   }
1059 }
1060
1061 int lookupObject(unsigned int * oid, short offset) {
1062   objheader_t *header;
1063   if ((header=mhashSearch(*oid))!=NULL) {
1064     //Found on machine
1065     ;
1066   } else if ((header=prehashSearch(*oid))!=NULL) {
1067     //Found in cache
1068     ;
1069   } else {
1070     return 0;
1071   }
1072
1073   if(TYPE(header) > NUMCLASSES) {
1074     int elementsize = classsize[TYPE(header)];
1075     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1076     int length = ao->___length___;
1077     /* Check if array out of bounds */
1078     if(offset < 0 || offset >= length) {
1079       //if yes treat the object as found
1080       (*oid)=0;
1081       return 1;
1082     }
1083     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1084     return 1;
1085   } else {
1086     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1087     return 1;
1088   }
1089 }
1090
1091
1092 /* This function is called by the thread calling transPrefetch */
1093 void *transPrefetch(void *t) {
1094   while(1) {
1095     /* lock mutex of primary prefetch queue */
1096     pthread_mutex_lock(&pqueue.qlock);
1097     /* while primary queue is empty, then wait */
1098     while(pqueue.front == NULL) {
1099       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
1100     }
1101     
1102     /* dequeue node to create a machine piles and  finally unlock mutex */
1103     prefetchqelem_t *qnode = pre_dequeue();
1104     pthread_mutex_unlock(&pqueue.qlock);
1105     
1106     /* Check if the tuples are found locally, if yes then reduce them further*/ 
1107     /* and group requests by remote machine ids by calling the makePreGroups() */
1108     prefetchpile_t *pilehead = foundLocal(qnode);
1109
1110     if (pilehead!=NULL) {
1111       // Get sock from shared pool 
1112       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
1113       
1114       /* Send  Prefetch Request */
1115       prefetchpile_t *ptr = pilehead;
1116       while(ptr != NULL) {
1117         sendPrefetchReq(ptr, sd);
1118         ptr = ptr->next; 
1119       }
1120       
1121       /* Release socket */
1122       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1123       
1124       /* Deallocated pilehead */
1125       mcdealloc(pilehead);
1126       
1127     }
1128     // Deallocate the prefetch queue pile node
1129     predealloc(qnode);
1130   }
1131 }
1132
1133 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1134   objpile_t *tmp;
1135   
1136   int size=sizeof(char)+sizeof(int);
1137   for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
1138     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1139   }
1140   
1141   char buft[size];
1142   char *buf=buft;
1143   *buf=TRANS_PREFETCH;
1144   buf+=sizeof(char);
1145   
1146   for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
1147     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1148     *((int*)buf)=len;
1149     buf+=sizeof(int);
1150     *((unsigned int *)buf)=tmp->oid;
1151     buf+=sizeof(unsigned int);
1152     *((unsigned int *)(buf)) = myIpAddr; 
1153     buf+=sizeof(unsigned int);
1154     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1155     buf+=tmp->numoffset*sizeof(short);
1156   }
1157   *((int *)buf)=-1;
1158   send_data(sd, buft, size);
1159   return;
1160 }
1161
1162 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1163   int len, endpair;
1164   char control;
1165   objpile_t *tmp;
1166   
1167   /* Send TRANS_PREFETCH control message */
1168   control = TRANS_PREFETCH;
1169   send_data(sd, &control, sizeof(char));
1170   
1171   /* Send Oids and offsets in pairs */
1172   tmp = mcpilenode->objpiles;
1173   while(tmp != NULL) {
1174     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1175     char oidnoffset[len];
1176     char *buf=oidnoffset;
1177     *((int*)buf) = tmp->numoffset;
1178     buf+=sizeof(int);
1179     *((unsigned int *)buf) = tmp->oid;
1180     buf+=sizeof(unsigned int);
1181     *((unsigned int *)buf) = myIpAddr; 
1182     buf += sizeof(unsigned int);
1183     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1184     send_data(sd, oidnoffset, len);
1185     tmp = tmp->next;
1186   }
1187   
1188   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1189   endpair = -1;
1190   send_data(sd, &endpair, sizeof(int));
1191   
1192   return;
1193 }
1194
1195 int getPrefetchResponse(int sd) {
1196   int length = 0, size = 0;
1197   char control;
1198   unsigned int oid;
1199   void *modptr, *oldptr;
1200   
1201   recv_data((int)sd, &length, sizeof(int)); 
1202   size = length - sizeof(int);
1203   char recvbuffer[size];
1204
1205   recv_data((int)sd, recvbuffer, size);
1206   control = *((char *) recvbuffer);
1207   if(control == OBJECT_FOUND) {
1208     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1209     size = size - (sizeof(char) + sizeof(unsigned int));
1210     pthread_mutex_lock(&prefetchcache_mutex);
1211     if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
1212       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1213       pthread_mutex_unlock(&prefetchcache_mutex);
1214       return -1;
1215     }
1216     pthread_mutex_unlock(&prefetchcache_mutex);
1217     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1218     
1219     /* Insert the oid and its address into the prefetch hash lookup table */
1220     /* Do a version comparison if the oid exists */
1221     if((oldptr = prehashSearch(oid)) != NULL) {
1222       /* If older version then update with new object ptr */
1223       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1224         prehashRemove(oid);
1225         prehashInsert(oid, modptr);
1226       }
1227     } else {/* Else add the object ptr to hash table*/
1228       prehashInsert(oid, modptr);
1229     }
1230     /* Lock the Prefetch Cache look up table*/
1231     pthread_mutex_lock(&pflookup.lock);
1232     /* Broadcast signal on prefetch cache condition variable */ 
1233     pthread_cond_broadcast(&pflookup.cond);
1234     /* Unlock the Prefetch Cache look up table*/
1235     pthread_mutex_unlock(&pflookup.lock);
1236   } else if(control == OBJECT_NOT_FOUND) {
1237     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1238     /* TODO: For each object not found query DHT for new location and retrieve the object */
1239     /* Throw an error */
1240     printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1241     exit(-1);
1242   } else {
1243     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1244   }
1245   
1246   return 0;
1247 }
1248
1249 unsigned short getObjType(unsigned int oid) {
1250   objheader_t *objheader;
1251   unsigned short numoffset[] ={0};
1252   short fieldoffset[] ={};
1253   
1254   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1255       if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1256         prefetch(1, &oid, numoffset, fieldoffset);
1257         pthread_mutex_lock(&pflookup.lock);
1258         while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1259           pthread_cond_wait(&pflookup.cond, &pflookup.lock);
1260         }
1261         pthread_mutex_unlock(&pflookup.lock);
1262       }
1263   }
1264   
1265   return TYPE(objheader);
1266 }
1267
1268 int startRemoteThread(unsigned int oid, unsigned int mid)
1269 {
1270         int sock;
1271         struct sockaddr_in remoteAddr;
1272         char msg[1 + sizeof(unsigned int)];
1273         int bytesSent;
1274         int status;
1275
1276         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1277         {
1278                 perror("startRemoteThread():socket()");
1279                 return -1;
1280         }
1281
1282         bzero(&remoteAddr, sizeof(remoteAddr));
1283         remoteAddr.sin_family = AF_INET;
1284         remoteAddr.sin_port = htons(LISTEN_PORT);
1285         remoteAddr.sin_addr.s_addr = htonl(mid);
1286         
1287         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
1288         {
1289                 printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1290                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1291                 status = -1;
1292         }
1293         else
1294         {
1295                 msg[0] = START_REMOTE_THREAD;
1296         *((unsigned int *) &msg[1]) = oid;
1297                 send_data(sock, msg, 1 + sizeof(unsigned int));
1298         }
1299
1300         close(sock);
1301         return status;
1302 }
1303
1304 //TODO: when reusing oids, make sure they are not already in use!
1305 unsigned int getNewOID(void) {
1306         static unsigned int id = 0xFFFFFFFF;
1307         
1308         id += 2;
1309         if (id > oidMax || id < oidMin)
1310         {
1311                 id = (oidMin | 1);
1312         }
1313         return id;
1314 }
1315
1316 int processConfigFile()
1317 {
1318         FILE *configFile;
1319         const int maxLineLength = 200;
1320         char lineBuffer[maxLineLength];
1321         char *token;
1322         const char *delimiters = " \t\n";
1323         char *commentBegin;
1324         in_addr_t tmpAddr;
1325         
1326         configFile = fopen(CONFIG_FILENAME, "r");
1327         if (configFile == NULL)
1328         {
1329                 printf("error opening %s:\n", CONFIG_FILENAME);
1330                 perror("");
1331                 return -1;
1332         }
1333
1334         numHostsInSystem = 0;
1335         sizeOfHostArray = 8;
1336         hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1337         
1338         while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
1339         {
1340                 commentBegin = strchr(lineBuffer, '#');
1341                 if (commentBegin != NULL)
1342                         *commentBegin = '\0';
1343                 token = strtok(lineBuffer, delimiters);
1344                 while (token != NULL)
1345                 {
1346                         tmpAddr = inet_addr(token);
1347                         if ((int)tmpAddr == -1)
1348                         {
1349                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1350                                 fclose(configFile);
1351                                 return -1;
1352                         }
1353                         else
1354                                 addHost(htonl(tmpAddr));
1355                         token = strtok(NULL, delimiters);
1356                 }
1357         }
1358
1359         fclose(configFile);
1360         
1361         if (numHostsInSystem < 1)
1362         {
1363                 printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1364                 return -1;
1365         }
1366 #ifdef MAC
1367         myIpAddr = getMyIpAddr("en1");
1368 #else
1369         myIpAddr = getMyIpAddr("eth0");
1370 #endif
1371         myIndexInHostArray = findHost(myIpAddr);
1372         if (myIndexInHostArray == -1)
1373         {
1374                 printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1375                 return -1;
1376         }
1377         oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1378         oidMin = oidsPerBlock * myIndexInHostArray;
1379         if (myIndexInHostArray == numHostsInSystem - 1)
1380                 oidMax = 0xFFFFFFFF;
1381         else
1382                 oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1383
1384         return 0;
1385 }
1386
1387 void addHost(unsigned int hostIp)
1388 {
1389         unsigned int *tmpArray;
1390
1391         if (findHost(hostIp) != -1)
1392                 return;
1393
1394         if (numHostsInSystem == sizeOfHostArray)
1395         {
1396                 tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1397                 memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1398                 free(hostIpAddrs);
1399                 hostIpAddrs = tmpArray;
1400         }
1401
1402         hostIpAddrs[numHostsInSystem++] = hostIp;
1403
1404         return;
1405 }
1406
1407 int findHost(unsigned int hostIp)
1408 {
1409         int i;
1410         for (i = 0; i < numHostsInSystem; i++)
1411                 if (hostIpAddrs[i] == hostIp)
1412                         return i;
1413
1414         //not found
1415         return -1;
1416 }
1417
1418 /* This function sends notification request per thread waiting on object(s) whose version 
1419  * changes */
1420 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1421         int sock,i;
1422         objheader_t *objheader;
1423         struct sockaddr_in remoteAddr;
1424         char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1425         char *ptr;
1426         int bytesSent;
1427         int status, size;
1428         unsigned short version;
1429         unsigned int oid,mid;
1430         static unsigned int threadid = 0;
1431         pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1432         pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1433         notifydata_t *ndata;
1434
1435         //FIXME currently all oids belong to one machine
1436         oid = oidarry[0];
1437         if((mid = lhashSearch(oid)) == 0) {
1438                 printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1439                 return;
1440         }
1441
1442         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1443                 perror("reqNotify():socket()");
1444                 return -1;
1445         }
1446
1447         bzero(&remoteAddr, sizeof(remoteAddr));
1448         remoteAddr.sin_family = AF_INET;
1449         remoteAddr.sin_port = htons(LISTEN_PORT);
1450         remoteAddr.sin_addr.s_addr = htonl(mid);
1451
1452         /* Generate unique threadid */
1453         threadid++;
1454
1455         /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1456         if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1457                 printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1458                 return -1;
1459         }
1460         ndata->numoid = numoid;
1461         ndata->threadid = threadid;
1462         ndata->oidarry = oidarry;
1463         ndata->versionarry = versionarry;
1464         ndata->threadcond = threadcond;
1465         ndata->threadnotify = threadnotify;
1466         if((status = notifyhashInsert(threadid, ndata)) != 0) {
1467                 printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1468                 free(ndata);
1469                 return -1; 
1470         }
1471         
1472         /* Send  number of oids, oidarry, version array, machine id and threadid */     
1473         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1474                 printf("reqNotify():error %d connecting to %s:%d\n", errno,
1475                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1476                 free(ndata);
1477                 return -1;
1478         } else {
1479                 msg[0] = THREAD_NOTIFY_REQUEST;
1480                 *((unsigned int *)(&msg[1])) = numoid;
1481                 /* Send array of oids  */
1482                 size = sizeof(unsigned int);
1483                 {
1484                         i = 0;
1485                         while(i < numoid) {
1486                                 oid = oidarry[i];
1487                                 *((unsigned int *)(&msg[1] + size)) = oid;
1488                                 size += sizeof(unsigned int);
1489                                 i++;
1490                         }
1491                 }
1492
1493                 /* Send array of version  */
1494                 {
1495                         i = 0;
1496                         while(i < numoid) {
1497                                 version = versionarry[i];
1498                                 *((unsigned short *)(&msg[1] + size)) = version;
1499                                 size += sizeof(unsigned short);
1500                                 i++;
1501                         }
1502                 }
1503
1504                 *((unsigned int *)(&msg[1] + size)) = myIpAddr;
1505                 size += sizeof(unsigned int);
1506                 *((unsigned int *)(&msg[1] + size)) = threadid;
1507
1508                 pthread_mutex_lock(&(ndata->threadnotify));
1509                 size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1510                 send_data(sock, msg, size);
1511                 pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1512                 pthread_mutex_unlock(&(ndata->threadnotify));
1513         }
1514
1515         pthread_cond_destroy(&threadcond);
1516         pthread_mutex_destroy(&threadnotify);
1517         free(ndata);
1518         close(sock);
1519         return status;
1520 }
1521
1522 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1523         notifydata_t *ndata;
1524         int i, objIsFound = 0, index;
1525         void *ptr;
1526
1527         //Look up the tid and call the corresponding pthread_cond_signal
1528         if((ndata = notifyhashSearch(tid)) == NULL) {
1529                 printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1530                 return;
1531         } else  {
1532                 for(i = 0; i < ndata->numoid; i++) {
1533                         if(ndata->oidarry[i] == oid){
1534                                 objIsFound = 1;
1535                                 index = i;
1536                         }
1537                 }
1538                 if(objIsFound == 0){
1539                         printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1540                         return;
1541                 } else {
1542                         if(version <= ndata->versionarry[index]){
1543                                 printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
1544                                 return;
1545                         } else {
1546                                 /* Clear from prefetch cache and free thread related data structure */
1547                                 if((ptr = prehashSearch(oid)) != NULL) {
1548                                         prehashRemove(oid);
1549                                 }
1550                                 pthread_cond_signal(&(ndata->threadcond));
1551                         }
1552                 }
1553         }
1554         return;
1555 }
1556
1557 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
1558         threadlist_t *ptr;
1559         unsigned int mid;
1560         struct sockaddr_in remoteAddr;
1561         char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
1562         int sock, status, size, bytesSent;
1563
1564         while(*head != NULL) {
1565                 ptr = *head;
1566                 mid = ptr->mid; 
1567                 //create a socket connection to that machine
1568                 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1569                         perror("notifyAll():socket()");
1570                         return -1;
1571                 }
1572
1573                 bzero(&remoteAddr, sizeof(remoteAddr));
1574                 remoteAddr.sin_family = AF_INET;
1575                 remoteAddr.sin_port = htons(LISTEN_PORT);
1576                 remoteAddr.sin_addr.s_addr = htonl(mid);
1577                 //send Thread Notify response and threadid to that machine
1578                 if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1579                         printf("notifyAll():error %d connecting to %s:%d\n", errno,
1580                                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1581                         status = -1;
1582                 } else {
1583                         bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
1584                         msg[0] = THREAD_NOTIFY_RESPONSE;
1585                         *((unsigned int *)&msg[1]) = oid;
1586                         size = sizeof(unsigned int);
1587                         *((unsigned short *)(&msg[1]+ size)) = version;
1588                         size+= sizeof(unsigned short);
1589                         *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
1590
1591                         size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
1592                         send_data(sock, msg, size);
1593                 }
1594                 //close socket
1595                 close(sock);
1596                 // Update head
1597                 *head = ptr->next;
1598                 free(ptr);
1599         }
1600         return status;
1601 }
1602
1603 void transAbort(transrecord_t *trans) {
1604         objstrDelete(trans->cache);
1605         chashDelete(trans->lookupTable);
1606         free(trans);
1607 }