fix accounting of bytes received
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #include "prefetch.h"
15 #ifdef COMPILER
16 #include "thread.h"
17 #endif
18 #ifdef ABORTREADERS
19 #include "abortreaders.h"
20 #endif
21 #include "trans.h"
22
23 #define NUM_THREADS 1
24 #define CONFIG_FILENAME "dstm.conf"
25
26 /* Thread transaction variables */
27
28 __thread objstr_t *t_cache;
29 __thread struct ___Object___ *revertlist;
30 #ifdef ABORTREADERS
31 __thread int t_abort;
32 __thread jmp_buf aborttrans;
33 #endif
34
35
36 /* Global Variables */
37 extern int classsize[];
38 pfcstats_t *evalPrefetch;
39 extern int numprefetchsites; //Global variable containing number of prefetch sites
40 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
41 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
42 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
43 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
44 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
45 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
46 extern objstr_t *mainobjstore;
47 unsigned int myIpAddr;
48 unsigned int *hostIpAddrs;
49 int sizeOfHostArray;
50 int numHostsInSystem;
51 int myIndexInHostArray;
52 unsigned int oidsPerBlock;
53 unsigned int oidMin;
54 unsigned int oidMax;
55
56 sockPoolHashTable_t *transReadSockPool;
57 sockPoolHashTable_t *transPrefetchSockPool;
58 sockPoolHashTable_t *transRequestSockPool;
59 pthread_mutex_t notifymutex;
60 pthread_mutex_t atomicObjLock;
61
62 /***********************************
63  * Global Variables for statistics
64  **********************************/
65 int numTransCommit = 0;
66 int numTransAbort = 0;
67 int nchashSearch = 0;
68 int nmhashSearch = 0;
69 int nprehashSearch = 0;
70 int nRemoteSend = 0;
71 int nSoftAbort = 0;
72 int bytesSent = 0;
73 int bytesRecv = 0;
74 int totalObjSize = 0;
75 int sendRemoteReq = 0;
76 int getResponse = 0;
77
78 void printhex(unsigned char *, int);
79 plistnode_t *createPiles();
80 plistnode_t *sortPiles(plistnode_t *pileptr);
81
82 #ifdef LOGEVENTS
83 char bigarray[16*1024*1024];
84 int bigindex=0;
85 #define LOGEVENT(x) { \
86     int tmp=bigindex++;                         \
87     bigarray[tmp]=x;                            \
88   }
89 #else
90 #define LOGEVENT(x)
91 #endif
92
93 /*******************************
94 * Send and Recv function calls
95 *******************************/
96 void send_data(int fd, void *buf, int buflen) {
97   char *buffer = (char *)(buf);
98   int size = buflen;
99   int numbytes;
100   while (size > 0) {
101     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
102     bytesSent = bytesSent + numbytes;
103     if (numbytes == -1) {
104       perror("send");
105       exit(0);
106     }
107     buffer += numbytes;
108     size -= numbytes;
109   }
110 }
111
112 void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
113   if (buflen+sendbuffer->offset>WMAXBUF) {
114     send_data(fd, sendbuffer->buf, sendbuffer->offset);
115     sendbuffer->offset=0;
116     send_data(fd, buffer, buflen);
117     return;
118   }
119   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
120   sendbuffer->offset+=buflen;
121   if (sendbuffer->offset>WTOP) {
122     send_data(fd, sendbuffer->buf, sendbuffer->offset);
123     sendbuffer->offset=0;
124   }
125 }
126
127 void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
128   if (buflen+sendbuffer->offset>WMAXBUF) {
129     send_data(fd, sendbuffer->buf, sendbuffer->offset);
130     sendbuffer->offset=0;
131     send_data(fd, buffer, buflen);
132     return;
133   }
134   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
135   sendbuffer->offset+=buflen;
136   send_data(fd, sendbuffer->buf, sendbuffer->offset);
137   sendbuffer->offset=0;
138 }
139
140 int recvw(int fd, void *buf, int len, int flags) {
141   return recv(fd, buf, len, flags);
142 }
143  
144 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
145   char *buf=(char *)buffer;
146   int numbytes=readbuffer->head-readbuffer->tail;
147   if (numbytes>buflen)
148     numbytes=buflen;
149   if (numbytes>0) {
150     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
151     readbuffer->tail+=numbytes;
152     buflen-=numbytes;
153     buf+=numbytes;
154   }
155   if (buflen==0) {
156     return;
157   }
158   if (buflen>=MAXBUF) {
159     recv_data(fd, buf, buflen);
160     return;
161   }
162   
163   int maxbuf=MAXBUF;
164   int obufflen=buflen;
165   readbuffer->head=0;
166   
167   while (buflen > 0) {
168     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
169     if (numbytes == -1) {
170       perror("recv");
171       exit(0);
172     }
173     bytesRecv+=numbytes;
174     buflen-=numbytes;
175     readbuffer->head+=numbytes;
176     maxbuf-=numbytes;
177   }
178   memcpy(buf,readbuffer->buf,obufflen);
179   readbuffer->tail=obufflen;
180 }
181
182 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
183   char *buf=(char *)buffer;
184   //now tail<=head
185   int numbytes=readbuffer->head-readbuffer->tail;
186   if (numbytes>buflen)
187     numbytes=buflen;
188   if (numbytes>0) {
189     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
190     readbuffer->tail+=numbytes;
191     buflen-=numbytes;
192     buf+=numbytes;
193   }
194   if (buflen==0)
195     return 1;
196
197   if (buflen>=MAXBUF) {
198     return recv_data_errorcode(fd, buf, buflen);
199   }
200
201   int maxbuf=MAXBUF;
202   int obufflen=buflen;
203   readbuffer->head=0;
204   
205   while (buflen > 0) {
206     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
207     if (numbytes ==0) {
208       return 0;
209     }
210     if (numbytes==-1) {
211       perror("recvbuf");
212       return -1;
213     }
214     bytesRecv+=numbytes;
215     buflen-=numbytes;
216     readbuffer->head+=numbytes;
217     maxbuf-=numbytes;
218   }
219   memcpy(buf,readbuffer->buf,obufflen);
220   readbuffer->tail=obufflen;
221   return 1;
222 }
223
224
225 void recv_data(int fd, void *buf, int buflen) {
226   char *buffer = (char *)(buf);
227   int size = buflen;
228   int numbytes;
229   while (size > 0) {
230     numbytes = recvw(fd, buffer, size, 0);
231     bytesRecv = bytesRecv + numbytes;
232     if (numbytes == -1) {
233       perror("recv");
234       exit(0);
235     }
236     buffer += numbytes;
237     size -= numbytes;
238   }
239 }
240
241 int recv_data_errorcode(int fd, void *buf, int buflen) {
242   char *buffer = (char *)(buf);
243   int size = buflen;
244   int numbytes;
245   while (size > 0) {
246     numbytes = recvw(fd, buffer, size, 0);
247     if (numbytes==0)
248       return 0;
249     if (numbytes == -1) {
250       perror("recv");
251       return -1;
252     }
253     bytesRecv+=numbytes;
254     buffer += numbytes;
255     size -= numbytes;
256   }
257   return 1;
258 }
259
260 void printhex(unsigned char *ptr, int numBytes) {
261   int i;
262   for (i = 0; i < numBytes; i++) {
263     if (ptr[i] < 16)
264       printf("0%x ", ptr[i]);
265     else
266       printf("%x ", ptr[i]);
267   }
268   printf("\n");
269   return;
270 }
271
272 inline int arrayLength(int *array) {
273   int i;
274   for(i=0 ; array[i] != -1; i++)
275     ;
276   return i;
277 }
278
279 inline int findmax(int *array, int arraylength) {
280   int max, i;
281   max = array[0];
282   for(i = 0; i < arraylength; i++) {
283     if(array[i] > max) {
284       max = array[i];
285     }
286   }
287   return max;
288 }
289
290 //#define INLINEPREFETCH
291 #define PREFTHRESHOLD 4
292
293 /* This function is a prefetch call generated by the compiler that
294  * populates the shared primary prefetch queue*/
295 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
296   /* Allocate for the queue node*/
297   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
298   int len;
299 #ifdef INLINEPREFETCH
300   int attempted=0;
301   char *node;
302   do {
303   node=getmemory(qnodesize);
304   if (node==NULL&&attempted)
305     break;
306   if (node!=NULL) {
307 #else
308   char *node=getmemory(qnodesize);
309 #endif
310   int top=endoffsets[ntuples-1];
311
312   if (node==NULL) {
313     LOGEVENT('D');
314     return;
315   }
316   /* Set queue node values */
317
318   /* TODO: Remove this after testing */
319   evalPrefetch[siteid].callcount++;
320
321   *((int *)(node))=siteid;
322   *((int *)(node + sizeof(int))) = ntuples;
323   len = 2*sizeof(int);
324   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
325   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
326   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
327
328 #ifdef INLINEPREFETCH
329   movehead(qnodesize);
330   }
331   int numpref=numavailable();
332   attempted=1;
333
334   if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
335     node=gettail();
336     prefetchpile_t *pilehead = foundLocal(node,numpref);
337     if (pilehead!=NULL) {
338       // Get sock from shared pool
339       
340       /* Send  Prefetch Request */
341       prefetchpile_t *ptr = pilehead;
342       while(ptr != NULL) {
343         int sd = getSock2(transPrefetchSockPool, ptr->mid);
344         sendPrefetchReq(ptr, sd);
345         ptr = ptr->next;
346       }
347       
348       mcdealloc(pilehead);
349       resetqueue();
350     }
351   }//end do prefetch if condition
352   } while(node==NULL);
353 #else
354   /* Lock and insert into primary prefetch queue */
355   movehead(qnodesize);
356 #endif
357 }
358
359 /* This function starts up the transaction runtime. */
360 int dstmStartup(const char * option) {
361   pthread_t thread_Listen, udp_thread_Listen;
362   pthread_attr_t attr;
363   int master=option!=NULL && strcmp(option, "master")==0;
364   int fd;
365   int udpfd;
366
367   if (processConfigFile() != 0)
368     return 0; //TODO: return error value, cause main program to exit
369 #ifdef COMPILER
370   if (!master)
371     threadcount--;
372 #endif
373
374 #ifdef TRANSSTATS
375   printf("Trans stats is on\n");
376   fflush(stdout);
377 #endif
378 #ifdef ABORTREADERS
379   initreaderlist();
380 #endif
381
382   //Initialize socket pool
383   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
384   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
385   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
386
387   dstmInit();
388   transInit();
389
390   fd=startlistening();
391   pthread_attr_init(&attr);
392   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
393 #ifdef CACHE
394   udpfd = udpInit();
395   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
396 #endif
397   if (master) {
398     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
399     return 1;
400   } else {
401     dstmListen((void *)fd);
402     return 0;
403   }
404 }
405
406 //TODO Use this later
407 void *pCacheAlloc(objstr_t *store, unsigned int size) {
408   void *tmp;
409   objstr_t *ptr;
410   ptr = store;
411   int success = 0;
412
413   while(ptr->next != NULL) {
414     /* check if store is empty */
415     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
416       tmp = ptr->top;
417       ptr->top += size;
418       success = 1;
419       return tmp;
420     } else {
421       ptr = ptr->next;
422     }
423   }
424
425   if(success == 0) {
426     return NULL;
427   }
428 }
429
430 /* This function initiates the prefetch thread A queue is shared
431  * between the main thread of execution and the prefetch thread to
432  * process the prefetch call Call from compiler populates the shared
433  * queue with prefetch requests while prefetch thread processes the
434  * prefetch requests */
435
436 void transInit() {
437   //Create and initialize prefetch cache structure
438 #ifdef CACHE
439   initializePCache();
440   if((evalPrefetch = initPrefetchStats()) == NULL) {
441     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
442     exit(0);
443   }
444 #endif
445
446   /* Initialize attributes for mutex */
447   pthread_mutexattr_init(&prefetchcache_mutex_attr);
448   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
449
450   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
451   pthread_mutex_init(&notifymutex, NULL);
452   pthread_mutex_init(&atomicObjLock, NULL);
453 #ifdef CACHE
454   //Create prefetch cache lookup table
455   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
456     printf("ERROR\n");
457     return; //Failure
458   }
459
460   //Initialize primary shared queue
461   queueInit();
462   //Initialize machine pile w/prefetch oids and offsets shared queue
463   mcpileqInit();
464
465   //Create the primary prefetch thread
466   int retval;
467 #ifdef RANGEPREFETCH
468   do {
469     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
470   } while(retval!=0);
471 #else
472 #ifndef INLINEPREFETCH
473   do {
474     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
475   } while(retval!=0);
476 #endif
477 #endif
478 #ifndef INLINEPREFETCH
479   pthread_detach(tPrefetch);
480 #endif
481 #endif
482 }
483
484 /* This function stops the threads spawned */
485 void transExit() {
486 #ifdef CACHE
487   int t;
488   pthread_cancel(tPrefetch);
489   for(t = 0; t < NUM_THREADS; t++)
490     pthread_cancel(wthreads[t]);
491 #endif
492
493   return;
494 }
495
496 /* This functions inserts randowm wait delays in the order of msec
497  * Mostly used when transaction commits retry*/
498 void randomdelay() {
499   struct timespec req;
500   time_t t;
501
502   t = time(NULL);
503   req.tv_sec = 0;
504   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
505   nanosleep(&req, NULL);
506   return;
507 }
508
509 /* This function initializes things required in the transaction start*/
510 void transStart() {
511   t_cache = objstrCreate(1048576);
512   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
513   revertlist=NULL;
514 #ifdef ABORTREADERS
515   t_abort=0;
516 #endif
517 }
518
519 // Search for an address for a given oid                                                                               
520 /*#define INLINE    inline __attribute__((always_inline))
521
522 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
523   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
524   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
525
526   do {
527     if(node->key == key) {
528       return node->val;
529     }
530     node = node->next;
531   } while(node != NULL);
532
533   return NULL;
534   }*/
535
536
537
538
539 /* This function finds the location of the objects involved in a transaction
540  * and returns the pointer to the object if found in a remote location */
541 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
542   unsigned int machinenumber;
543   objheader_t *tmp, *objheader;
544   objheader_t *objcopy;
545   int size;
546   void *buf;
547   chashlistnode_t *node;
548
549   if(oid == 0) {
550     return NULL;
551   }
552   
553
554   node= &c_table[(oid & c_mask)>>1];
555   do {
556     if(node->key == oid) {
557 #ifdef TRANSSTATS
558     nchashSearch++;
559 #endif
560 #ifdef COMPILER
561     return &((objheader_t*)node->val)[1];
562 #else
563     return node->val;
564 #endif
565     }
566     node = node->next;
567   } while(node != NULL);
568   
569
570   /*  
571   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
572 #ifdef TRANSSTATS
573     nchashSearch++;
574 #endif
575 #ifdef COMPILER
576     return &objheader[1];
577 #else
578     return objheader;
579 #endif
580   } else 
581   */
582
583 #ifdef ABORTREADERS
584   if (t_abort) {
585     //abort this transaction
586     //printf("ABORTING\n");
587     removetransactionhash();
588     objstrDelete(t_cache);
589     t_chashDelete();
590     _longjmp(aborttrans,1);
591   } else
592     addtransaction(oid);
593 #endif
594
595   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
596 #ifdef TRANSSTATS
597     nmhashSearch++;
598 #endif
599     /* Look up in machine lookup table  and copy  into cache*/
600     GETSIZE(size, objheader);
601     size += sizeof(objheader_t);
602     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
603     memcpy(objcopy, objheader, size);
604     /* Insert into cache's lookup table */
605     STATUS(objcopy)=0;
606     t_chashInsert(OID(objheader), objcopy);
607 #ifdef COMPILER
608     return &objcopy[1];
609 #else
610     return objcopy;
611 #endif
612   } else {
613 #ifdef CACHE
614     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
615 #ifdef TRANSSTATS
616       nprehashSearch++;
617 #endif
618       /* Look up in prefetch cache */
619       GETSIZE(size, tmp);
620       size+=sizeof(objheader_t);
621       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
622       memcpy(objcopy, tmp, size);
623       /* Insert into cache's lookup table */
624       t_chashInsert(OID(tmp), objcopy);
625 #ifdef COMPILER
626       return &objcopy[1];
627 #else
628       return objcopy;
629 #endif
630     }
631 #endif
632     /* Get the object from the remote location */
633     if((machinenumber = lhashSearch(oid)) == 0) {
634       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
635       return NULL;
636     }
637     objcopy = getRemoteObj(machinenumber, oid);
638
639     if(objcopy == NULL) {
640       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
641       return NULL;
642     } else {
643 #ifdef TRANSSTATS
644       nRemoteSend++;
645 #endif
646 #ifdef COMPILER
647 #ifdef CACHE
648       //Copy object to prefetch cache
649       pthread_mutex_lock(&prefetchcache_mutex);
650       objheader_t *headerObj;
651       int size;
652       GETSIZE(size, objcopy);
653       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
654         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
655             __FILE__, __LINE__);
656         pthread_mutex_unlock(&prefetchcache_mutex);
657         return NULL;
658       }
659       pthread_mutex_unlock(&prefetchcache_mutex);
660       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
661       //make an entry in prefetch lookup hashtable
662       void *oldptr;
663       if((oldptr = prehashSearch(oid)) != NULL) {
664         prehashRemove(oid);
665         prehashInsert(oid, headerObj);
666       } else {
667         prehashInsert(oid, headerObj);
668       }
669 #endif
670       return &objcopy[1];
671 #else
672       return objcopy;
673 #endif
674     }
675   }
676 }
677
678
679 /* This function finds the location of the objects involved in a transaction
680  * and returns the pointer to the object if found in a remote location */
681 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
682   unsigned int machinenumber;
683   objheader_t *tmp, *objheader;
684   objheader_t *objcopy;
685   int size;
686
687 #ifdef ABORTREADERS
688   if (t_abort) {
689     //abort this transaction
690     //printf("ABORTING\n");
691     removetransactionhash();
692     objstrDelete(t_cache);
693     t_chashDelete();
694     _longjmp(aborttrans,1);
695   } else
696     addtransaction(oid);
697 #endif
698
699   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
700 #ifdef TRANSSTATS
701     nmhashSearch++;
702 #endif
703     /* Look up in machine lookup table  and copy  into cache*/
704     GETSIZE(size, objheader);
705     size += sizeof(objheader_t);
706     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
707     memcpy(objcopy, objheader, size);
708     /* Insert into cache's lookup table */
709     STATUS(objcopy)=0;
710     t_chashInsert(OID(objheader), objcopy);
711 #ifdef COMPILER
712     return &objcopy[1];
713 #else
714     return objcopy;
715 #endif
716   } else {
717 #ifdef CACHE
718     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
719 #ifdef TRANSSTATS
720       LOGEVENT('P')
721       nprehashSearch++;
722 #endif
723       /* Look up in prefetch cache */
724       GETSIZE(size, tmp);
725       size+=sizeof(objheader_t);
726       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
727       memcpy(objcopy, tmp, size);
728       /* Insert into cache's lookup table */
729       t_chashInsert(OID(tmp), objcopy);
730 #ifdef COMPILER
731       return &objcopy[1];
732 #else
733       return objcopy;
734 #endif
735     }
736 #endif
737     /* Get the object from the remote location */
738     if((machinenumber = lhashSearch(oid)) == 0) {
739       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
740       return NULL;
741     }
742     objcopy = getRemoteObj(machinenumber, oid);
743
744     if(objcopy == NULL) {
745       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
746       return NULL;
747     } else {
748 #ifdef TRANSSTATS
749
750       LOGEVENT('R');
751       nRemoteSend++;
752 #endif
753 #ifdef COMPILER
754 #ifdef CACHE
755       //Copy object to prefetch cache
756       pthread_mutex_lock(&prefetchcache_mutex);
757       objheader_t *headerObj;
758       int size;
759       GETSIZE(size, objcopy);
760       if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
761         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
762             __FILE__, __LINE__);
763         pthread_mutex_unlock(&prefetchcache_mutex);
764         return NULL;
765       }
766       pthread_mutex_unlock(&prefetchcache_mutex);
767       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
768       //make an entry in prefetch lookup hashtable
769       void *oldptr;
770       if((oldptr = prehashSearch(oid)) != NULL) {
771         prehashRemove(oid);
772         prehashInsert(oid, headerObj);
773       } else {
774         prehashInsert(oid, headerObj);
775       }
776 #endif
777       return &objcopy[1];
778 #else
779       return objcopy;
780 #endif
781     }
782   }
783 }
784
785 /* This function creates objects in the transaction record */
786 objheader_t *transCreateObj(unsigned int size) {
787   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
788   OID(tmp) = getNewOID();
789   tmp->version = 1;
790   tmp->rcount = 1;
791   STATUS(tmp) = NEW;
792   t_chashInsert(OID(tmp), tmp);
793
794 #ifdef COMPILER
795   return &tmp[1]; //want space after object header
796 #else
797   return tmp;
798 #endif
799 }
800
801 #if 1
802 /* This function creates machine piles based on all machines involved in a
803  * transaction commit request */
804 plistnode_t *createPiles() {
805   int i;
806   plistnode_t *pile = NULL;
807   unsigned int machinenum;
808   objheader_t *headeraddr;
809   chashlistnode_t * ptr = c_table;
810   /* Represents number of bins in the chash table */
811   unsigned int size = c_size;
812
813   for(i = 0; i < size ; i++) {
814     chashlistnode_t * curr = &ptr[i];
815     /* Inner loop to traverse the linked list of the cache lookupTable */
816     while(curr != NULL) {
817       //if the first bin in hash table is empty
818       if(curr->key == 0)
819         break;
820       headeraddr=(objheader_t *) curr->val;
821
822       //Get machine location for object id (and whether local or not)
823       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
824         machinenum = myIpAddr;
825       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
826         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
827         return NULL;
828       }
829
830       //Make machine groups
831       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
832       curr = curr->next;
833     }
834   }
835   return pile;
836 }
837 #else
838 /* This function creates machine piles based on all machines involved in a
839  * transaction commit request */
840 plistnode_t *createPiles() {
841   int i;
842   plistnode_t *pile = NULL;
843   unsigned int machinenum;
844   objheader_t *headeraddr;
845   struct chashentry * ptr = c_table;
846   /* Represents number of bins in the chash table */
847   unsigned int size = c_size;
848
849   for(i = 0; i < size ; i++) {
850     struct chashentry * curr = & ptr[i];
851     /* Inner loop to traverse the linked list of the cache lookupTable */
852     //if the first bin in hash table is empty
853     if(curr->key == 0)
854       continue;
855     headeraddr=(objheader_t *) curr->ptr;
856
857     //Get machine location for object id (and whether local or not)
858     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
859       machinenum = myIpAddr;
860     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
861       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
862       return NULL;
863     }
864
865     //Make machine groups
866     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
867   }
868   return pile;
869 }
870 #endif
871
872 /* This function initiates the transaction commit process
873  * Spawns threads for each of the new connections with Participants
874  * and creates new piles by calling the createPiles(),
875  * Sends a transrequest() to each remote machines for objects found remotely
876  * and calls handleLocalReq() to process objects found locally */
877 int transCommit() {
878   unsigned int tot_bytes_mod, *listmid;
879   plistnode_t *pile, *pile_ptr;
880   char treplyretry; /* keeps track of the common response that needs to be sent */
881   int firsttime=1;
882   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
883   char finalResponse;
884
885 #ifdef LOGEVENTS
886   int iii;
887   for(iii=0;iii<bigindex;iii++) {
888     printf("%c", bigarray[iii]);
889   }
890 #endif
891
892 #ifdef ABORTREADERS
893   if (t_abort) {
894     //abort this transaction
895     /* Debug
896      * printf("ABORTING TRANSACTION AT COMMIT\n");
897      */
898     removetransactionhash();
899     objstrDelete(t_cache);
900     t_chashDelete();
901     return 1;
902   }
903 #endif
904
905
906   do {
907     treplyretry = 0;
908
909     /* Look through all the objects in the transaction record and make piles
910      * for each machine involved in the transaction*/
911     if (firsttime) {
912       pile_ptr = pile = createPiles();
913       pile_ptr = pile = sortPiles(pile);
914     } else {
915       pile = pile_ptr;
916     }
917     firsttime = 0;
918     /* Create the packet to be sent in TRANS_REQUEST */
919
920     /* Count the number of participants */
921     int pilecount;
922     pilecount = pCount(pile);
923
924     /* Create a list of machine ids(Participants) involved in transaction   */
925     listmid = calloc(pilecount, sizeof(unsigned int));
926     pListMid(pile, listmid);
927
928     /* Create a socket and getReplyCtrl array, initialize */
929     int socklist[pilecount];
930     int loopcount;
931     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
932       socklist[loopcount] = 0;
933     char getReplyCtrl[pilecount];
934     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
935       getReplyCtrl[loopcount] = 0;
936
937     /* Process each machine pile */
938     int sockindex = 0;
939     trans_req_data_t *tosend;
940     tosend = calloc(pilecount, sizeof(trans_req_data_t));
941     while(pile != NULL) {
942       tosend[sockindex].f.control = TRANS_REQUEST;
943       tosend[sockindex].f.mcount = pilecount;
944       tosend[sockindex].f.numread = pile->numread;
945       tosend[sockindex].f.nummod = pile->nummod;
946       tosend[sockindex].f.numcreated = pile->numcreated;
947       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
948       tosend[sockindex].listmid = listmid;
949       tosend[sockindex].objread = pile->objread;
950       tosend[sockindex].oidmod = pile->oidmod;
951       tosend[sockindex].oidcreated = pile->oidcreated;
952       int sd = 0;
953       if(pile->mid != myIpAddr) {
954         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
955           printf("transRequest(): socket create error\n");
956           free(listmid);
957           free(tosend);
958           return 1;
959         }
960         socklist[sockindex] = sd;
961         /* Send bytes of data with TRANS_REQUEST control message */
962         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
963
964         /* Send list of machines involved in the transaction */
965         {
966           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
967           send_data(sd, tosend[sockindex].listmid, size);
968         }
969
970         /* Send oids and version number tuples for objects that are read */
971         {
972           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
973           send_data(sd, tosend[sockindex].objread, size);
974         }
975
976         /* Send objects that are modified */
977         void *modptr;
978         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
979           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
980           free(listmid);
981           free(tosend);
982           return 1;
983         }
984         int offset = 0;
985         int i;
986         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
987           int size;
988           objheader_t *headeraddr;
989           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
990             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
991             free(modptr);
992             free(listmid);
993             free(tosend);
994             return 1;
995           }
996           GETSIZE(size,headeraddr);
997           size+=sizeof(objheader_t);
998           memcpy(modptr+offset, headeraddr, size);
999           offset+=size;
1000         }
1001         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
1002         free(modptr);
1003       } else { //handle request locally
1004         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
1005       }
1006       sockindex++;
1007       pile = pile->next;
1008     } //end of pile processing
1009       /* Recv Ctrl msgs from all machines */
1010     int i;
1011     for(i = 0; i < pilecount; i++) {
1012       int sd = socklist[i];
1013       if(sd != 0) {
1014         char control;
1015         recv_data(sd, &control, sizeof(char));
1016         //Update common data structure with new ctrl msg
1017         getReplyCtrl[i] = control;
1018         /* Recv Objects if participant sends TRANS_DISAGREE */
1019 #ifdef CACHE
1020         if(control == TRANS_DISAGREE) {
1021           int length;
1022           recv_data(sd, &length, sizeof(int));
1023           void *newAddr;
1024           pthread_mutex_lock(&prefetchcache_mutex);
1025           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
1026             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1027             free(tosend);
1028             free(listmid);
1029             pthread_mutex_unlock(&prefetchcache_mutex);
1030             return 1;
1031           }
1032           pthread_mutex_unlock(&prefetchcache_mutex);
1033           recv_data(sd, newAddr, length);
1034           int offset = 0;
1035           while(length != 0) {
1036             unsigned int oidToPrefetch;
1037             objheader_t * header;
1038             header = (objheader_t *)(((char *)newAddr) + offset);
1039             oidToPrefetch = OID(header);
1040             STATUS(header)=0;
1041             int size = 0;
1042             GETSIZE(size, header);
1043             size += sizeof(objheader_t);
1044             //make an entry in prefetch hash table
1045             void *oldptr;
1046             if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
1047               prehashRemove(oidToPrefetch);
1048               prehashInsert(oidToPrefetch, header);
1049             } else {
1050               prehashInsert(oidToPrefetch, header);
1051             }
1052             length = length - size;
1053             offset += size;
1054           }
1055         } //end of receiving objs
1056 #endif
1057       }
1058     }
1059     /* Decide the final response */
1060     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1061       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1062       free(tosend);
1063       free(listmid);
1064       return 1;
1065     }
1066
1067     /* Send responses to all machines */
1068     for(i = 0; i < pilecount; i++) {
1069       int sd = socklist[i];
1070       if(sd != 0) {
1071 #ifdef CACHE
1072         if(finalResponse == TRANS_COMMIT) {
1073           int retval;
1074           /* Update prefetch cache */
1075           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1076             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1077             free(tosend);
1078             free(listmid);
1079             return 1;
1080           }
1081
1082
1083           /* Invalidate objects in other machine cache */
1084           if(tosend[i].f.nummod > 0) {
1085             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1086               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1087               free(tosend);
1088               free(listmid);
1089               return 1;
1090             }
1091           }
1092 #ifdef ABORTREADERS
1093           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1094           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1095 #endif
1096         }
1097 #ifdef ABORTREADERS
1098         else if (!treplyretry) {
1099           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1100           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1101         }
1102 #endif
1103 #endif
1104         send_data(sd, &finalResponse, sizeof(char));
1105       } else {
1106         /* Complete local processing */
1107         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1108 #ifdef ABORTREADERS
1109         if(finalResponse == TRANS_COMMIT) {
1110           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1111           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1112         } else if (!treplyretry) {
1113           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1114           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1115         }
1116 #endif
1117       }
1118     }
1119
1120     /* Free resources */
1121     free(tosend);
1122     free(listmid);
1123     if (!treplyretry)
1124       pDelete(pile_ptr);
1125     /* wait a random amount of time before retrying to commit transaction*/
1126     if(treplyretry) {
1127       randomdelay();
1128 #ifdef TRANSSTATS
1129       nSoftAbort++;
1130 #endif
1131     }
1132     /* Retry trans commit procedure during soft_abort case */
1133   } while (treplyretry);
1134
1135   if(finalResponse == TRANS_ABORT) {
1136     //printf("Aborting trans\n");
1137 #ifdef TRANSSTATS
1138     numTransAbort++;
1139 #endif
1140     /* Free Resources */
1141     objstrDelete(t_cache);
1142     t_chashDelete();
1143     return TRANS_ABORT;
1144   } else if(finalResponse == TRANS_COMMIT) {
1145 #ifdef TRANSSTATS
1146     numTransCommit++;
1147 #endif
1148     /* Free Resources */
1149     objstrDelete(t_cache);
1150     t_chashDelete();
1151     return 0;
1152   } else {
1153     //TODO Add other cases
1154     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1155     exit(-1);
1156   }
1157   return 0;
1158 }
1159
1160 /* This function handles the local objects involved in a transaction
1161  * commiting process.  It also makes a decision if this local machine
1162  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1163 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1164   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1165   int numoidnotfound = 0, numoidlocked = 0;
1166   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1167   int numread, i;
1168   unsigned int oid;
1169   unsigned short version;
1170
1171   /* Counters and arrays to formulate decision on control message to be sent */
1172   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1173   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1174   //setting a divider between read and write locks
1175   numread = tdata->f.numread;
1176   /* Process each oid in the machine pile/ group per thread */
1177   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1178     if (i < tdata->f.numread) {
1179       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1180       incr *= i;
1181       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1182       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1183       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1184     } else { // Objects Modified
1185       if(i == tdata->f.numread) {
1186         oidlocked[numoidlocked++] = -1;
1187       }
1188       int tmpsize;
1189       objheader_t *headptr;
1190       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1191       if (headptr == NULL) {
1192         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1193         return;
1194       }
1195       oid = OID(headptr);
1196       version = headptr->version;
1197       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1198     }
1199   }
1200
1201 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1202  * if Participant receives a TRANS_COMMIT */
1203   transinfo->objlocked = oidlocked;
1204   transinfo->objnotfound = oidnotfound;
1205   transinfo->modptr = NULL;
1206   transinfo->numlocked = numoidlocked;
1207   transinfo->numnotfound = numoidnotfound;
1208
1209   /* Condition to send TRANS_AGREE */
1210   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1211     *getReplyCtrl = TRANS_AGREE;
1212   }
1213   /* Condition to send TRANS_SOFT_ABORT */
1214   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1215     *getReplyCtrl = TRANS_SOFT_ABORT;
1216   }
1217 }
1218
1219 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1220   if(finalResponse == TRANS_ABORT) {
1221     if(transAbortProcess(transinfo) != 0) {
1222       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1223       fflush(stdout);
1224       return;
1225     }
1226   } else if(finalResponse == TRANS_COMMIT) {
1227 #ifdef CACHE
1228     /* Invalidate objects in other machine cache */
1229     if(tdata->f.nummod > 0) {
1230       int retval;
1231       if((retval = invalidateObj(tdata)) != 0) {
1232         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1233         return;
1234       }
1235     }
1236 #endif
1237     if(transComProcess(tdata, transinfo) != 0) {
1238       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1239       fflush(stdout);
1240       return;
1241     }
1242   } else {
1243     printf("ERROR...No Decision\n");
1244   }
1245
1246   /* Free memory */
1247   if (transinfo->objlocked != NULL) {
1248     free(transinfo->objlocked);
1249   }
1250   if (transinfo->objnotfound != NULL) {
1251     free(transinfo->objnotfound);
1252   }
1253 }
1254
1255 /* This function decides the reponse that needs to be sent to
1256  * all Participant machines after the TRANS_REQUEST protocol */
1257 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1258   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1259                                                                    message to send */
1260   for (i = 0 ; i < pilecount; i++) {
1261     char control;
1262     control = getReplyCtrl[i];
1263     switch(control) {
1264     default:
1265       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
1266
1267       /* treat as disagree, pass thru */
1268     case TRANS_DISAGREE:
1269       transdisagree++;
1270       break;
1271
1272     case TRANS_AGREE:
1273       transagree++;
1274       break;
1275
1276     case TRANS_SOFT_ABORT:
1277       transsoftabort++;
1278       break;
1279     }
1280   }
1281
1282   if(transdisagree > 0) {
1283     /* Send Abort */
1284     *treplyretry = 0;
1285     return TRANS_ABORT;
1286 #ifdef CACHE
1287     /* clear objects from prefetch cache */
1288     cleanPCache();
1289 #endif
1290   } else if(transagree == pilecount) {
1291     /* Send Commit */
1292     *treplyretry = 0;
1293     return TRANS_COMMIT;
1294   } else {
1295     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1296     *treplyretry = 1;
1297     return TRANS_ABORT;
1298   }
1299   return 0;
1300 }
1301
1302 /* This function opens a connection, places an object read request to
1303  * the remote machine, reads the control message and object if
1304  * available and copies the object and its header to the local
1305  * cache. */
1306
1307 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1308   int size, val;
1309   struct sockaddr_in serv_addr;
1310   char machineip[16];
1311   char control;
1312   objheader_t *h;
1313   void *objcopy = NULL;
1314
1315   int sd = getSock2(transReadSockPool, mnum);
1316   char readrequest[sizeof(char)+sizeof(unsigned int)];
1317   readrequest[0] = READ_REQUEST;
1318   *((unsigned int *)(&readrequest[1])) = oid;
1319   send_data(sd, readrequest, sizeof(readrequest));
1320
1321   /* Read response from the Participant */
1322   recv_data(sd, &control, sizeof(char));
1323
1324   if (control==OBJECT_NOT_FOUND) {
1325     objcopy = NULL;
1326   } else {
1327     /* Read object if found into local cache */
1328     recv_data(sd, &size, sizeof(int));
1329     objcopy = objstrAlloc(&t_cache, size);
1330     recv_data(sd, objcopy, size);
1331     STATUS(objcopy)=0;
1332     /* Insert into cache's lookup table */
1333     t_chashInsert(oid, objcopy);
1334 #ifdef TRANSSTATS
1335     totalObjSize += size;
1336 #endif
1337   }
1338
1339   return objcopy;
1340 }
1341
1342 /*  Commit info for objects modified */
1343 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1344                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1345   void *mobj;
1346   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1347   /* Save the oids not found and number of oids not found for later use */
1348   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1349     /* Save the oids not found and number of oids not found for later use */
1350     oidnotfound[*numoidnotfound] = oid;
1351     (*numoidnotfound)++;
1352   } else { /* If Obj found in machine (i.e. has not moved) */
1353     /* Check if Obj is locked by any previous transaction */
1354     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1355       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1356         (*v_matchnolock)++;
1357         //Keep track of what is locked
1358         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1359       } else { /* If versions don't match ...HARD ABORT */
1360         (*v_nomatch)++;
1361         /* Send TRANS_DISAGREE to Coordinator */
1362         *getReplyCtrl = TRANS_DISAGREE;
1363
1364         //Keep track of what is locked
1365         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1366         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1367         return;
1368       }
1369     } else { //A lock is acquired some place else
1370       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1371         (*v_matchlock)++;
1372       } else { /* If versions don't match ...HARD ABORT */
1373         (*v_nomatch)++;
1374         /* Send TRANS_DISAGREE to Coordinator */
1375         *getReplyCtrl = TRANS_DISAGREE;
1376         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1377         return;
1378       }
1379     }
1380   }
1381 }
1382
1383 /*  Commit info for objects modified */
1384 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1385                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1386   void *mobj;
1387   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1388   /* Save the oids not found and number of oids not found for later use */
1389   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1390     /* Save the oids not found and number of oids not found for later use */
1391     oidnotfound[*numoidnotfound] = oid;
1392     (*numoidnotfound)++;
1393   } else { /* If Obj found in machine (i.e. has not moved) */
1394     /* Check if Obj is locked by any previous transaction */
1395     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1396       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1397         (*v_matchnolock)++;
1398         //Keep track of what is locked
1399         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1400       } else { /* If versions don't match ...HARD ABORT */
1401         (*v_nomatch)++;
1402         /* Send TRANS_DISAGREE to Coordinator */
1403         *getReplyCtrl = TRANS_DISAGREE;
1404         //Keep track of what is locked
1405         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1406         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1407         return;
1408       }
1409     } else { //Has reached max number of readers or some other transaction
1410       //has acquired a lock on this object
1411       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1412         (*v_matchlock)++;
1413       } else { /* If versions don't match ...HARD ABORT */
1414         (*v_nomatch)++;
1415         /* Send TRANS_DISAGREE to Coordinator */
1416         *getReplyCtrl = TRANS_DISAGREE;
1417         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1418         return;
1419       }
1420     }
1421   }
1422 }
1423
1424 /* This function completes the ABORT process if the transaction is aborting */
1425 int transAbortProcess(trans_commit_data_t *transinfo) {
1426   int i, numlocked;
1427   unsigned int *objlocked;
1428   void *header;
1429
1430   numlocked = transinfo->numlocked;
1431   objlocked = transinfo->objlocked;
1432
1433   int useWriteUnlock = 0;
1434   for (i = 0; i < numlocked; i++) {
1435     if(objlocked[i] == -1) {
1436       useWriteUnlock = 1;
1437       continue;
1438     }
1439     if((header = mhashSearch(objlocked[i])) == NULL) {
1440       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1441       return 1;
1442     }
1443     if(!useWriteUnlock) {
1444       read_unlock(STATUSPTR(header));
1445     } else {
1446       write_unlock(STATUSPTR(header));
1447     }
1448   }
1449
1450   return 0;
1451 }
1452
1453 /*This function completes the COMMIT process if the transaction is commiting*/
1454 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1455   objheader_t *header, *tcptr;
1456   int i, nummod, tmpsize, numcreated, numlocked;
1457   unsigned int *oidmod, *oidcreated, *oidlocked;
1458   void *ptrcreate;
1459
1460   nummod = tdata->f.nummod;
1461   oidmod = tdata->oidmod;
1462   numcreated = tdata->f.numcreated;
1463   oidcreated = tdata->oidcreated;
1464   numlocked = transinfo->numlocked;
1465   oidlocked = transinfo->objlocked;
1466
1467   for (i = 0; i < nummod; i++) {
1468     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1469       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1470       return 1;
1471     }
1472     /* Copy from transaction cache -> main object store */
1473     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1474       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1475       return 1;
1476     }
1477     GETSIZE(tmpsize, header);
1478     char *tmptcptr = (char *) tcptr;
1479     {
1480       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1481       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1482       dst->___cachedCode___=src->___cachedCode___;
1483       dst->___cachedHash___=src->___cachedHash___;
1484
1485       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1486     }
1487
1488     header->version += 1;
1489     if(header->notifylist != NULL) {
1490       notifyAll(&header->notifylist, OID(header), header->version);
1491     }
1492   }
1493   /* If object is newly created inside transaction then commit it */
1494   for (i = 0; i < numcreated; i++) {
1495     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1496       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1497       return 1;
1498     }
1499     GETSIZE(tmpsize, header);
1500     tmpsize += sizeof(objheader_t);
1501     pthread_mutex_lock(&mainobjstore_mutex);
1502     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1503       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1504       pthread_mutex_unlock(&mainobjstore_mutex);
1505       return 1;
1506     }
1507     pthread_mutex_unlock(&mainobjstore_mutex);
1508     /* Initialize read and write locks */
1509     initdsmlocks(STATUSPTR(header));
1510     memcpy(ptrcreate, header, tmpsize);
1511     mhashInsert(oidcreated[i], ptrcreate);
1512     lhashInsert(oidcreated[i], myIpAddr);
1513   }
1514   /* Unlock locked objects */
1515   int useWriteUnlock = 0;
1516   for(i = 0; i < numlocked; i++) {
1517     if(oidlocked[i] == -1) {
1518       useWriteUnlock = 1;
1519       continue;
1520     }
1521     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1522       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1523       return 1;
1524     }
1525     if(!useWriteUnlock) {
1526       read_unlock(STATUSPTR(header));
1527     } else {
1528       write_unlock(STATUSPTR(header));
1529     }
1530   }
1531   return 0;
1532 }
1533
1534 prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
1535   int i;
1536   int j;
1537   prefetchpile_t * head=NULL;
1538
1539   for(j=0;j<numprefetches;j++) {
1540     int siteid = *(GET_SITEID(ptr));
1541     int ntuples = *(GET_NTUPLES(ptr));
1542     unsigned int * oidarray = GET_PTR_OID(ptr);
1543     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1544     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1545     int numLocal = 0;
1546     
1547     for(i=0; i<ntuples; i++) {
1548       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1549       unsigned short endindex=endoffsets[i];
1550       unsigned int oid=oidarray[i];
1551       int newbase;
1552       int machinenum;
1553       
1554       if (oid==0)
1555         continue;
1556       //Look up fields locally
1557       for(newbase=baseindex; newbase<endindex; newbase++) {
1558         if (!lookupObject(&oid, arryfields[newbase]))
1559           break;
1560         //Ended in a null pointer...
1561         if (oid==0)
1562           goto tuple;
1563       }
1564       //Entire prefetch is local
1565       if (newbase==endindex&&checkoid(oid)) {
1566         numLocal++;
1567         goto tuple;
1568       }
1569       //Add to remote requests
1570       machinenum=lhashSearch(oid);
1571       insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1572     tuple:
1573       ;
1574     }
1575     
1576     /* handle dynamic prefetching */
1577     handleDynPrefetching(numLocal, ntuples, siteid);
1578     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
1579   }
1580
1581   return head;
1582 }
1583
1584 int checkoid(unsigned int oid) {
1585   objheader_t *header;
1586   if ((header=mhashSearch(oid))!=NULL) {
1587     //Found on machine
1588     return 1;
1589   } else if ((header=prehashSearch(oid))!=NULL) {
1590     //Found in cache
1591     return 1;
1592   } else {
1593     return 0;
1594   }
1595 }
1596
1597 int lookupObject(unsigned int * oid, short offset) {
1598   objheader_t *header;
1599   if ((header=mhashSearch(*oid))!=NULL) {
1600     //Found on machine
1601     ;
1602   } else if ((header=prehashSearch(*oid))!=NULL) {
1603     //Found in cache
1604     ;
1605   } else {
1606     return 0;
1607   }
1608
1609   if(TYPE(header) >= NUMCLASSES) {
1610     int elementsize = classsize[TYPE(header)];
1611     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1612     int length = ao->___length___;
1613     /* Check if array out of bounds */
1614     if(offset < 0 || offset >= length) {
1615       //if yes treat the object as found
1616       (*oid)=0;
1617       return 1;
1618     }
1619     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1620     return 1;
1621   } else {
1622     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1623     return 1;
1624   }
1625 }
1626
1627
1628 /* This function is called by the thread calling transPrefetch */
1629 void *transPrefetch(void *t) {
1630   while(1) {
1631     /* read from prefetch queue */
1632     void *node=gettail();
1633     /* Check if the tuples are found locally, if yes then reduce them further*/
1634     /* and group requests by remote machine ids by calling the makePreGroups() */
1635     int count=numavailable();
1636     prefetchpile_t *pilehead = foundLocal(node, count);
1637
1638     if (pilehead!=NULL) {
1639       // Get sock from shared pool
1640
1641       /* Send  Prefetch Request */
1642       prefetchpile_t *ptr = pilehead;
1643       while(ptr != NULL) {
1644         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1645         sendPrefetchReq(ptr, sd);
1646         ptr = ptr->next;
1647       }
1648
1649       /* Release socket */
1650       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1651
1652       /* Deallocated pilehead */
1653       mcdealloc(pilehead);
1654     }
1655     // Deallocate the prefetch queue pile node
1656     incmulttail(count);
1657   }
1658 }
1659
1660 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1661   objpile_t *tmp;
1662
1663   int size=sizeof(char)+sizeof(int);
1664   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1665     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1666   }
1667
1668   char buft[size];
1669   char *buf=buft;
1670   *buf=TRANS_PREFETCH;
1671   buf+=sizeof(char);
1672
1673   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1674     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1675     *((int*)buf)=len;
1676     buf+=sizeof(int);
1677     *((unsigned int *)buf)=tmp->oid;
1678     buf+=sizeof(unsigned int);
1679     *((unsigned int *)(buf)) = myIpAddr;
1680     buf+=sizeof(unsigned int);
1681     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1682     buf+=tmp->numoffset*sizeof(short);
1683   }
1684   *((int *)buf)=-1;
1685   send_data(sd, buft, size);
1686   return;
1687 }
1688
1689 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1690   int len, endpair;
1691   char control;
1692   objpile_t *tmp;
1693   struct writestruct writebuffer;
1694   writebuffer.offset=0;
1695
1696   /* Send TRANS_PREFETCH control message */
1697   int first=1;
1698
1699   /* Send Oids and offsets in pairs */
1700   tmp = mcpilenode->objpiles;
1701   while(tmp != NULL) {
1702     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1703     char oidnoffset[len+5];
1704     char *buf=oidnoffset;
1705     if (first) {
1706       *buf=TRANS_PREFETCH;
1707       buf++;len++;
1708       first=0;
1709     }
1710     *((int*)buf) = tmp->numoffset;
1711     buf+=sizeof(int);
1712     *((unsigned int *)buf) = tmp->oid;
1713 #ifdef TRANSSTATS
1714     sendRemoteReq++;
1715 #endif
1716     buf+=sizeof(unsigned int);
1717     *((unsigned int *)buf) = myIpAddr;
1718     buf += sizeof(unsigned int);
1719     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1720     tmp = tmp->next;
1721     if (tmp==NULL) {
1722       *((int *)(&oidnoffset[len]))=-1;
1723       len+=sizeof(int);
1724     }
1725     if (tmp!=NULL)
1726       send_buf(sd, & writebuffer, oidnoffset, len);
1727     else
1728       forcesend_buf(sd, & writebuffer, oidnoffset, len);
1729   }
1730
1731   LOGEVENT('S');
1732   return;
1733 }
1734
1735 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1736   int length = 0, size = 0;
1737   char control;
1738   unsigned int oid;
1739   void *modptr, *oldptr;
1740
1741   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1742   size = length - sizeof(int);
1743   char recvbuffer[size];
1744 #ifdef TRANSSTATS
1745     getResponse++;
1746     LOGEVENT('Z');
1747 #endif
1748     recv_data_buf(sd, readbuffer, recvbuffer, size);
1749   control = *((char *) recvbuffer);
1750   if(control == OBJECT_FOUND) {
1751     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1752     size = size - (sizeof(char) + sizeof(unsigned int));
1753     pthread_mutex_lock(&prefetchcache_mutex);
1754     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1755       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1756       pthread_mutex_unlock(&prefetchcache_mutex);
1757       return -1;
1758     }
1759     pthread_mutex_unlock(&prefetchcache_mutex);
1760     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1761     STATUS(modptr)=0;
1762
1763     /* Insert the oid and its address into the prefetch hash lookup table */
1764     /* Do a version comparison if the oid exists */
1765     if((oldptr = prehashSearch(oid)) != NULL) {
1766       /* If older version then update with new object ptr */
1767       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1768         prehashRemove(oid);
1769         prehashInsert(oid, modptr);
1770       }
1771     } else { /* Else add the object ptr to hash table*/
1772       prehashInsert(oid, modptr);
1773     }
1774     /* Lock the Prefetch Cache look up table*/
1775     pthread_mutex_lock(&pflookup.lock);
1776     /* Broadcast signal on prefetch cache condition variable */
1777     pthread_cond_broadcast(&pflookup.cond);
1778     /* Unlock the Prefetch Cache look up table*/
1779     pthread_mutex_unlock(&pflookup.lock);
1780   } else if(control == OBJECT_NOT_FOUND) {
1781     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1782     /* TODO: For each object not found query DHT for new location and retrieve the object */
1783     /* Throw an error */
1784     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1785     //    exit(-1);
1786   } else {
1787     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1788   }
1789
1790   return 0;
1791 }
1792
1793 unsigned short getObjType(unsigned int oid) {
1794   objheader_t *objheader;
1795   unsigned short numoffset[] ={0};
1796   short fieldoffset[] ={};
1797
1798   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1799 #ifdef CACHE
1800     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1801 #endif
1802     unsigned int mid = lhashSearch(oid);
1803     int sd = getSock2(transReadSockPool, mid);
1804     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1805     remotereadrequest[0] = READ_REQUEST;
1806     *((unsigned int *)(&remotereadrequest[1])) = oid;
1807     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1808
1809     /* Read response from the Participant */
1810     char control;
1811     recv_data(sd, &control, sizeof(char));
1812
1813     if (control==OBJECT_NOT_FOUND) {
1814       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1815       fflush(stdout);
1816       exit(-1);
1817     } else {
1818       /* Read object if found into local cache */
1819       int size;
1820       recv_data(sd, &size, sizeof(int));
1821 #ifdef CACHE
1822       pthread_mutex_lock(&prefetchcache_mutex);
1823       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1824         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1825         pthread_exit(NULL);
1826       }
1827       pthread_mutex_unlock(&prefetchcache_mutex);
1828       recv_data(sd, objheader, size);
1829       prehashInsert(oid, objheader);
1830       return TYPE(objheader);
1831 #else
1832       char *buffer;
1833       if((buffer = calloc(1, size)) == NULL) {
1834         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1835         fflush(stdout);
1836         return 0;
1837       }
1838       recv_data(sd, buffer, size);
1839       objheader = (objheader_t *)buffer;
1840       unsigned short type = TYPE(objheader);
1841       free(buffer);
1842       return type;
1843 #endif
1844     }
1845 #ifdef CACHE
1846   }
1847 #endif
1848   }
1849   return TYPE(objheader);
1850 }
1851
1852 int startRemoteThread(unsigned int oid, unsigned int mid) {
1853   int sock;
1854   struct sockaddr_in remoteAddr;
1855   char msg[1 + sizeof(unsigned int)];
1856   int bytesSent;
1857   int status;
1858
1859   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1860     perror("startRemoteThread():socket()");
1861     return -1;
1862   }
1863
1864   bzero(&remoteAddr, sizeof(remoteAddr));
1865   remoteAddr.sin_family = AF_INET;
1866   remoteAddr.sin_port = htons(LISTEN_PORT);
1867   remoteAddr.sin_addr.s_addr = htonl(mid);
1868
1869   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1870     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1871            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1872     status = -1;
1873   } else
1874   {
1875     msg[0] = START_REMOTE_THREAD;
1876     *((unsigned int *) &msg[1]) = oid;
1877     send_data(sock, msg, 1 + sizeof(unsigned int));
1878   }
1879
1880   close(sock);
1881   return status;
1882 }
1883
1884 //TODO: when reusing oids, make sure they are not already in use!
1885 static unsigned int id = 0xFFFFFFFF;
1886 unsigned int getNewOID(void) {
1887   id += 2;
1888   if (id > oidMax || id < oidMin) {
1889     id = (oidMin | 1);
1890   }
1891   return id;
1892 }
1893
1894 int processConfigFile() {
1895   FILE *configFile;
1896   const int maxLineLength = 200;
1897   char lineBuffer[maxLineLength];
1898   char *token;
1899   const char *delimiters = " \t\n";
1900   char *commentBegin;
1901   in_addr_t tmpAddr;
1902
1903   configFile = fopen(CONFIG_FILENAME, "r");
1904   if (configFile == NULL) {
1905     printf("error opening %s:\n", CONFIG_FILENAME);
1906     perror("");
1907     return -1;
1908   }
1909
1910   numHostsInSystem = 0;
1911   sizeOfHostArray = 8;
1912   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1913
1914   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1915     commentBegin = strchr(lineBuffer, '#');
1916     if (commentBegin != NULL)
1917       *commentBegin = '\0';
1918     token = strtok(lineBuffer, delimiters);
1919     while (token != NULL) {
1920       tmpAddr = inet_addr(token);
1921       if ((int)tmpAddr == -1) {
1922         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1923         fclose(configFile);
1924         return -1;
1925       } else
1926         addHost(htonl(tmpAddr));
1927       token = strtok(NULL, delimiters);
1928     }
1929   }
1930
1931   fclose(configFile);
1932
1933   if (numHostsInSystem < 1) {
1934     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1935     return -1;
1936   }
1937 #ifdef MAC
1938   myIpAddr = getMyIpAddr("en1");
1939 #else
1940   myIpAddr = getMyIpAddr("eth0");
1941 #endif
1942   myIndexInHostArray = findHost(myIpAddr);
1943   if (myIndexInHostArray == -1) {
1944     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1945     return -1;
1946   }
1947   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1948   oidMin = oidsPerBlock * myIndexInHostArray;
1949   if (myIndexInHostArray == numHostsInSystem - 1)
1950     oidMax = 0xFFFFFFFF;
1951   else
1952     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1953
1954   return 0;
1955 }
1956
1957 void addHost(unsigned int hostIp) {
1958   unsigned int *tmpArray;
1959
1960   if (findHost(hostIp) != -1)
1961     return;
1962
1963   if (numHostsInSystem == sizeOfHostArray) {
1964     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1965     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1966     free(hostIpAddrs);
1967     hostIpAddrs = tmpArray;
1968   }
1969
1970   hostIpAddrs[numHostsInSystem++] = hostIp;
1971
1972   return;
1973 }
1974
1975 int findHost(unsigned int hostIp) {
1976   int i;
1977   for (i = 0; i < numHostsInSystem; i++)
1978     if (hostIpAddrs[i] == hostIp)
1979       return i;
1980
1981   //not found
1982   return -1;
1983 }
1984
1985 /* This function sends notification request per thread waiting on object(s) whose version
1986  * changes */
1987 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1988   int sock,i;
1989   objheader_t *objheader;
1990   struct sockaddr_in remoteAddr;
1991   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1992   char *ptr;
1993   int bytesSent;
1994   int status, size;
1995   unsigned short version;
1996   unsigned int oid,mid;
1997   static unsigned int threadid = 0;
1998   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1999   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
2000   notifydata_t *ndata;
2001
2002   oid = oidarry[0];
2003   if((mid = lhashSearch(oid)) == 0) {
2004     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
2005     return;
2006   }
2007
2008   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2009     perror("reqNotify():socket()");
2010     return -1;
2011   }
2012
2013   bzero(&remoteAddr, sizeof(remoteAddr));
2014   remoteAddr.sin_family = AF_INET;
2015   remoteAddr.sin_port = htons(LISTEN_PORT);
2016   remoteAddr.sin_addr.s_addr = htonl(mid);
2017
2018   /* Generate unique threadid */
2019   threadid++;
2020
2021   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
2022   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
2023     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
2024     return -1;
2025   }
2026   ndata->numoid = numoid;
2027   ndata->threadid = threadid;
2028   ndata->oidarry = oidarry;
2029   ndata->versionarry = versionarry;
2030   ndata->threadcond = threadcond;
2031   ndata->threadnotify = threadnotify;
2032   if((status = notifyhashInsert(threadid, ndata)) != 0) {
2033     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
2034     free(ndata);
2035     return -1;
2036   }
2037
2038   /* Send  number of oids, oidarry, version array, machine id and threadid */
2039   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2040     printf("reqNotify():error %d connecting to %s:%d\n", errno,
2041            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2042     free(ndata);
2043     return -1;
2044   } else {
2045     msg[0] = THREAD_NOTIFY_REQUEST;
2046     *((unsigned int *)(&msg[1])) = numoid;
2047     /* Send array of oids  */
2048     size = sizeof(unsigned int);
2049
2050     for(i = 0;i < numoid; i++) {
2051       oid = oidarry[i];
2052       *((unsigned int *)(&msg[1] + size)) = oid;
2053       size += sizeof(unsigned int);
2054     }
2055
2056     /* Send array of version  */
2057     for(i = 0;i < numoid; i++) {
2058       version = versionarry[i];
2059       *((unsigned short *)(&msg[1] + size)) = version;
2060       size += sizeof(unsigned short);
2061     }
2062
2063     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
2064     *((unsigned int *)(&msg[1] + size)) = threadid;
2065     pthread_mutex_lock(&(ndata->threadnotify));
2066     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2067     send_data(sock, msg, size);
2068     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2069     pthread_mutex_unlock(&(ndata->threadnotify));
2070   }
2071
2072   pthread_cond_destroy(&threadcond);
2073   pthread_mutex_destroy(&threadnotify);
2074   free(ndata);
2075   close(sock);
2076   return status;
2077 }
2078
2079 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2080   notifydata_t *ndata;
2081   int i, objIsFound = 0, index;
2082   void *ptr;
2083
2084   //Look up the tid and call the corresponding pthread_cond_signal
2085   if((ndata = notifyhashSearch(tid)) == NULL) {
2086     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2087     return;
2088   } else  {
2089     for(i = 0; i < ndata->numoid; i++) {
2090       if(ndata->oidarry[i] == oid) {
2091         objIsFound = 1;
2092         index = i;
2093       }
2094     }
2095     if(objIsFound == 0) {
2096       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2097       return;
2098     } else {
2099       if(version <= ndata->versionarry[index]) {
2100         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2101         return;
2102       } else {
2103 #ifdef CACHE
2104         /* Clear from prefetch cache and free thread related data structure */
2105         if((ptr = prehashSearch(oid)) != NULL) {
2106           prehashRemove(oid);
2107         }
2108 #endif
2109         pthread_mutex_lock(&(ndata->threadnotify));
2110         pthread_cond_signal(&(ndata->threadcond));
2111         pthread_mutex_unlock(&(ndata->threadnotify));
2112       }
2113     }
2114   }
2115   return;
2116 }
2117
2118 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2119   threadlist_t *ptr;
2120   unsigned int mid;
2121   struct sockaddr_in remoteAddr;
2122   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2123   int sock, status, size, bytesSent;
2124
2125   while(*head != NULL) {
2126     ptr = *head;
2127     mid = ptr->mid;
2128     //create a socket connection to that machine
2129     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2130       perror("notifyAll():socket()");
2131       return -1;
2132     }
2133
2134     bzero(&remoteAddr, sizeof(remoteAddr));
2135     remoteAddr.sin_family = AF_INET;
2136     remoteAddr.sin_port = htons(LISTEN_PORT);
2137     remoteAddr.sin_addr.s_addr = htonl(mid);
2138     //send Thread Notify response and threadid to that machine
2139     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2140       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2141              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2142       fflush(stdout);
2143       status = -1;
2144     } else {
2145       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2146       msg[0] = THREAD_NOTIFY_RESPONSE;
2147       *((unsigned int *)&msg[1]) = oid;
2148       size = sizeof(unsigned int);
2149       *((unsigned short *)(&msg[1]+ size)) = version;
2150       size+= sizeof(unsigned short);
2151       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2152
2153       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2154       send_data(sock, msg, size);
2155     }
2156     //close socket
2157     close(sock);
2158     // Update head
2159     *head = ptr->next;
2160     free(ptr);
2161   }
2162   return status;
2163 }
2164
2165 void transAbort() {
2166 #ifdef ABORTREADERS
2167   removetransactionhash();
2168 #endif
2169   objstrDelete(t_cache);
2170   t_chashDelete();
2171 }
2172
2173 /* This function inserts necessary information into
2174  * a machine pile data structure */
2175 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2176   plistnode_t *ptr, *tmp;
2177   int found = 0, offset = 0;
2178
2179   tmp = pile;
2180   //Add oid into a machine that is already present in the pile linked list structure
2181   while(tmp != NULL) {
2182     if (tmp->mid == mid) {
2183       int tmpsize;
2184
2185       if (STATUS(headeraddr) & NEW) {
2186         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2187         tmp->numcreated++;
2188         GETSIZE(tmpsize, headeraddr);
2189         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2190       } else if (STATUS(headeraddr) & DIRTY) {
2191         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2192         tmp->nummod++;
2193         GETSIZE(tmpsize, headeraddr);
2194         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2195       } else {
2196         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2197         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2198         offset += sizeof(unsigned int);
2199         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2200         tmp->numread++;
2201       }
2202       found = 1;
2203       break;
2204     }
2205     tmp = tmp->next;
2206   }
2207   //Add oid for any new machine
2208   if (!found) {
2209     int tmpsize;
2210     if((ptr = pCreate(num_objs)) == NULL) {
2211       return NULL;
2212     }
2213     ptr->mid = mid;
2214     if (STATUS(headeraddr) & NEW) {
2215       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2216       ptr->numcreated++;
2217       GETSIZE(tmpsize, headeraddr);
2218       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2219     } else if (STATUS(headeraddr) & DIRTY) {
2220       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2221       ptr->nummod++;
2222       GETSIZE(tmpsize, headeraddr);
2223       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2224     } else {
2225       *((unsigned int *)ptr->objread)=OID(headeraddr);
2226       offset = sizeof(unsigned int);
2227       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2228       ptr->numread++;
2229     }
2230     ptr->next = pile;
2231     pile = ptr;
2232   }
2233
2234   /* Clear Flags */
2235   STATUS(headeraddr) =0;
2236
2237
2238   return pile;
2239 }
2240
2241 plistnode_t *sortPiles(plistnode_t *pileptr) {
2242   plistnode_t *head, *ptr, *tail;
2243   head = pileptr;
2244   ptr = pileptr;
2245   /* Get tail pointer */
2246   while(ptr!= NULL) {
2247     tail = ptr;
2248     ptr = ptr->next;
2249   }
2250   ptr = pileptr;
2251   plistnode_t *prev = pileptr;
2252   /* Arrange local machine processing at the end of the pile list */
2253   while(ptr != NULL) {
2254     if(ptr != tail) {
2255       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2256         prev->next = ptr->next;
2257         ptr->next = NULL;
2258         tail->next = ptr;
2259         return pileptr;
2260       }
2261       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2262         prev = ptr->next;
2263         ptr->next = NULL;
2264         tail->next = ptr;
2265         return prev;
2266       }
2267       prev = ptr;
2268     }
2269     ptr = ptr->next;
2270   }
2271   return pileptr;
2272 }