get rid of unnecessary recv_data , SUCCESSFUL/UNSUCESSFUL ctrl msg
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #ifdef COMPILER
15 #include "thread.h"
16 #endif
17
18 #define NUM_THREADS 1
19 #define PREFETCH_CACHE_SIZE 1048576 //1MB
20 #define CONFIG_FILENAME "dstm.conf"
21
22
23 /* Global Variables */
24 extern int classsize[];
25 pfcstats_t *evalPrefetch;
26 extern int numprefetchsites; //Global variable containing number of prefetch sites
27 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
28 objstr_t *prefetchcache; //Global Prefetch cache
29 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
30 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
31 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
32 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
33 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
34 extern objstr_t *mainobjstore;
35 unsigned int myIpAddr;
36 unsigned int *hostIpAddrs;
37 int sizeOfHostArray;
38 int numHostsInSystem;
39 int myIndexInHostArray;
40 unsigned int oidsPerBlock;
41 unsigned int oidMin;
42 unsigned int oidMax;
43
44 sockPoolHashTable_t *transReadSockPool;
45 sockPoolHashTable_t *transPrefetchSockPool;
46 sockPoolHashTable_t *transRequestSockPool;
47 pthread_mutex_t notifymutex;
48 pthread_mutex_t atomicObjLock;
49
50 /***********************************
51  * Global Variables for statistics
52  **********************************/
53 int numTransCommit = 0;
54 int numTransAbort = 0;
55 int nchashSearch = 0;
56 int nmhashSearch = 0;
57 int nprehashSearch = 0;
58 int nRemoteSend = 0;
59 int nSoftAbort = 0;
60
61 void printhex(unsigned char *, int);
62 plistnode_t *createPiles(transrecord_t *);
63
64 /*******************************
65 * Send and Recv function calls
66 *******************************/
67 void send_data(int fd, void *buf, int buflen) {
68   char *buffer = (char *)(buf);
69   int size = buflen;
70   int numbytes;
71   while (size > 0) {
72     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
73     if (numbytes == -1) {
74       perror("send");
75       exit(0);
76     }
77     buffer += numbytes;
78     size -= numbytes;
79   }
80 }
81
82 void recv_data(int fd, void *buf, int buflen) {
83   char *buffer = (char *)(buf);
84   int size = buflen;
85   int numbytes;
86   while (size > 0) {
87     numbytes = recv(fd, buffer, size, 0);
88     if (numbytes == -1) {
89       perror("recv");
90       exit(0);
91     }
92     buffer += numbytes;
93     size -= numbytes;
94   }
95 }
96
97 int recv_data_errorcode(int fd, void *buf, int buflen) {
98   char *buffer = (char *)(buf);
99   int size = buflen;
100   int numbytes;
101   while (size > 0) {
102     numbytes = recv(fd, buffer, size, 0);
103     if (numbytes==0)
104       return 0;
105     if (numbytes == -1) {
106       perror("recv");
107       return -1;
108     }
109     buffer += numbytes;
110     size -= numbytes;
111   }
112   return 1;
113 }
114
115 void printhex(unsigned char *ptr, int numBytes) {
116   int i;
117   for (i = 0; i < numBytes; i++) {
118     if (ptr[i] < 16)
119       printf("0%x ", ptr[i]);
120     else
121       printf("%x ", ptr[i]);
122   }
123   printf("\n");
124   return;
125 }
126
127 inline int arrayLength(int *array) {
128   int i;
129   for(i=0 ; array[i] != -1; i++)
130     ;
131   return i;
132 }
133
134 inline int findmax(int *array, int arraylength) {
135   int max, i;
136   max = array[0];
137   for(i = 0; i < arraylength; i++) {
138     if(array[i] > max) {
139       max = array[i];
140     }
141   }
142   return max;
143 }
144
145 /* This function is a prefetch call generated by the compiler that
146  * populates the shared primary prefetch queue*/
147 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
148   /* Allocate for the queue node*/
149   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
150   int len;
151   char * node= getmemory(qnodesize);
152   int top=endoffsets[ntuples-1];
153
154   if (node==NULL)
155     return;
156   /* Set queue node values */
157
158   /* TODO: Remove this after testing */
159   evalPrefetch[siteid].callcount++;
160
161   *((int *)(node))=siteid;
162   *((int *)(node + sizeof(int))) = ntuples;
163   len = 2*sizeof(int);
164   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
165   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
166   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
167
168   /* Lock and insert into primary prefetch queue */
169   movehead(qnodesize);
170 }
171
172 /* This function starts up the transaction runtime. */
173 int dstmStartup(const char * option) {
174   pthread_t thread_Listen, udp_thread_Listen;
175   pthread_attr_t attr;
176   int master=option!=NULL && strcmp(option, "master")==0;
177   int fd;
178   int udpfd;
179
180   if (processConfigFile() != 0)
181     return 0; //TODO: return error value, cause main program to exit
182 #ifdef COMPILER
183   if (!master)
184     threadcount--;
185 #endif
186
187 #ifdef TRANSSTATS
188   printf("Trans stats is on\n");
189   fflush(stdout);
190 #endif
191
192   //Initialize socket pool
193   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
194   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
195   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
196
197   dstmInit();
198   transInit();
199
200   fd=startlistening();
201   pthread_attr_init(&attr);
202   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
203 #ifdef CACHE
204   udpfd = udpInit();
205   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
206 #endif
207   if (master) {
208     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
209     return 1;
210   } else {
211     dstmListen((void *)fd);
212     return 0;
213   }
214 }
215
216 //TODO Use this later
217 void *pCacheAlloc(objstr_t *store, unsigned int size) {
218   void *tmp;
219   objstr_t *ptr;
220   ptr = store;
221   int success = 0;
222
223   while(ptr->next != NULL) {
224     /* check if store is empty */
225     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
226       tmp = ptr->top;
227       ptr->top += size;
228       success = 1;
229       return tmp;
230     } else {
231       ptr = ptr->next;
232     }
233   }
234
235   if(success == 0) {
236     return NULL;
237   }
238 }
239
240 /* This function initiates the prefetch thread A queue is shared
241  * between the main thread of execution and the prefetch thread to
242  * process the prefetch call Call from compiler populates the shared
243  * queue with prefetch requests while prefetch thread processes the
244  * prefetch requests */
245
246 void transInit() {
247   //Create and initialize prefetch cache structure
248 #ifdef CACHE
249   prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
250   initializePCache();
251   if((evalPrefetch = initPrefetchStats()) == NULL) {
252     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
253     exit(0);
254   }
255 #endif
256
257   /* Initialize attributes for mutex */
258   pthread_mutexattr_init(&prefetchcache_mutex_attr);
259   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
260
261   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
262   pthread_mutex_init(&notifymutex, NULL);
263   pthread_mutex_init(&atomicObjLock, NULL);
264 #ifdef CACHE
265   //Create prefetch cache lookup table
266   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
267     printf("ERROR\n");
268     return; //Failure
269   }
270
271   //Initialize primary shared queue
272   queueInit();
273   //Initialize machine pile w/prefetch oids and offsets shared queue
274   mcpileqInit();
275
276   //Create the primary prefetch thread
277   int retval;
278   do {
279     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
280   } while(retval!=0);
281   pthread_detach(tPrefetch);
282 #endif
283 }
284
285 /* This function stops the threads spawned */
286 void transExit() {
287 #ifdef CACHE
288   int t;
289   pthread_cancel(tPrefetch);
290   for(t = 0; t < NUM_THREADS; t++)
291     pthread_cancel(wthreads[t]);
292 #endif
293
294   return;
295 }
296
297 /* This functions inserts randowm wait delays in the order of msec
298  * Mostly used when transaction commits retry*/
299 void randomdelay() {
300   struct timespec req;
301   time_t t;
302
303   t = time(NULL);
304   req.tv_sec = 0;
305   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
306   nanosleep(&req, NULL);
307   return;
308 }
309
310 /* This function initializes things required in the transaction start*/
311 transrecord_t *transStart() {
312   transrecord_t *tmp;
313   if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) {
314     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
315     return NULL;
316   }
317   tmp->cache = objstrCreate(1048576);
318   tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
319 #ifdef COMPILER
320   tmp->revertlist=NULL;
321 #endif
322   return tmp;
323 }
324
325 /* This function finds the location of the objects involved in a transaction
326  * and returns the pointer to the object if found in a remote location */
327 objheader_t *transRead(transrecord_t *record, unsigned int oid) {
328   unsigned int machinenumber;
329   objheader_t *tmp, *objheader;
330   objheader_t *objcopy;
331   int size;
332   void *buf;
333
334   if(oid == 0) {
335     return NULL;
336   }
337
338   if((objheader = chashSearch(record->lookupTable, oid)) != NULL) {
339 #ifdef TRANSSTATS
340     nchashSearch++;
341 #endif
342     /* Search local transaction cache */
343 #ifdef COMPILER
344     return &objheader[1];
345 #else
346     return objheader;
347 #endif
348   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
349 #ifdef TRANSSTATS
350     nmhashSearch++;
351 #endif
352     /* Look up in machine lookup table  and copy  into cache*/
353     GETSIZE(size, objheader);
354     size += sizeof(objheader_t);
355     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
356     memcpy(objcopy, objheader, size);
357     /* Insert into cache's lookup table */
358     STATUS(objcopy)=0;
359     chashInsert(record->lookupTable, OID(objheader), objcopy);
360 #ifdef COMPILER
361     return &objcopy[1];
362 #else
363     return objcopy;
364 #endif
365   } else {
366 #ifdef CACHE
367     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
368 #ifdef TRANSSTATS
369       nprehashSearch++;
370 #endif
371       /* Look up in prefetch cache */
372       GETSIZE(size, tmp);
373       size+=sizeof(objheader_t);
374       objcopy = (objheader_t *) objstrAlloc(record->cache, size);
375       memcpy(objcopy, tmp, size);
376       /* Insert into cache's lookup table */
377       chashInsert(record->lookupTable, OID(tmp), objcopy);
378 #ifdef COMPILER
379       return &objcopy[1];
380 #else
381       return objcopy;
382 #endif
383     }
384 #endif
385     /* Get the object from the remote location */
386     if((machinenumber = lhashSearch(oid)) == 0) {
387       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
388       return NULL;
389     }
390     objcopy = getRemoteObj(record, machinenumber, oid);
391
392     if(objcopy == NULL) {
393       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
394       return NULL;
395     } else {
396 #ifdef TRANSSTATS
397       nRemoteSend++;
398 #endif
399       STATUS(objcopy)=0;
400 #ifdef COMPILER
401       return &objcopy[1];
402 #else
403       return objcopy;
404 #endif
405     }
406   }
407 }
408
409 /* This function creates objects in the transaction record */
410 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
411   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
412   OID(tmp) = getNewOID();
413   tmp->version = 1;
414   tmp->rcount = 1;
415   STATUS(tmp) = NEW;
416   chashInsert(record->lookupTable, OID(tmp), tmp);
417
418 #ifdef COMPILER
419   return &tmp[1]; //want space after object header
420 #else
421   return tmp;
422 #endif
423 }
424
425 /* This function creates machine piles based on all machines involved in a
426  * transaction commit request */
427 plistnode_t *createPiles(transrecord_t *record) {
428   int i;
429   plistnode_t *pile = NULL;
430   unsigned int machinenum;
431   objheader_t *headeraddr;
432   chashlistnode_t * ptr = record->lookupTable->table;
433   /* Represents number of bins in the chash table */
434   unsigned int size = record->lookupTable->size;
435
436   for(i = 0; i < size ; i++) {
437     chashlistnode_t * curr = &ptr[i];
438     /* Inner loop to traverse the linked list of the cache lookupTable */
439     while(curr != NULL) {
440       //if the first bin in hash table is empty
441       if(curr->key == 0)
442         break;
443
444       if ((headeraddr = (objheader_t *) chashSearch(record->lookupTable, curr->key)) == NULL) {
445         printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
446         return NULL;
447       }
448
449       //Get machine location for object id (and whether local or not)
450       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
451         machinenum = myIpAddr;
452       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
453         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
454         return NULL;
455       }
456
457       //Make machine groups
458       pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
459       curr = curr->next;
460     }
461   }
462   return pile;
463 }
464
465 /* This function initiates the transaction commit process
466  * Spawns threads for each of the new connections with Participants
467  * and creates new piles by calling the createPiles(),
468  * Sends a transrequest() to each remote machines for objects found remotely
469  * and calls handleLocalReq() to process objects found locally */
470 int transCommit(transrecord_t *record) {
471   unsigned int tot_bytes_mod, *listmid;
472   plistnode_t *pile, *pile_ptr;
473   int i, j, rc, val;
474   int pilecount, offset, threadnum, trecvcount;
475   char control;
476   char transid[TID_LEN];
477   trans_req_data_t *tosend;
478   trans_commit_data_t transinfo;
479   static int newtid = 0;
480   char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
481   thread_data_array_t *thread_data_array;
482   local_thread_data_array_t *ltdata;
483   int firsttime=1;
484
485
486   do {
487     treplyctrl=0;
488     trecvcount = 0;
489     threadnum = 0;
490     treplyretry = 0;
491     thread_data_array = NULL;
492     ltdata = NULL;
493
494     /* Look through all the objects in the transaction record and make piles
495      * for each machine involved in the transaction*/
496     if (firsttime)
497       pile_ptr = pile = createPiles(record);
498     else
499       pile=pile_ptr;
500     firsttime=0;
501     /* Create the packet to be sent in TRANS_REQUEST */
502
503     /* Count the number of participants */
504     pilecount = pCount(pile);
505
506     /* Create a list of machine ids(Participants) involved in transaction       */
507     listmid = calloc(pilecount, sizeof(unsigned int));
508     pListMid(pile, listmid);
509
510
511     /* Initialize thread variables,
512      * Spawn a thread for each Participant involved in a transaction */
513     pthread_t thread[pilecount];
514     pthread_attr_t attr;
515     pthread_cond_t tcond;
516     pthread_mutex_t tlock;
517     pthread_mutex_t tlshrd;
518
519     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
520     ltdata = calloc(1, sizeof(local_thread_data_array_t));
521
522     thread_response_t rcvd_control_msg[pilecount];      /* Shared thread array that keeps track of responses of participants */
523
524     /* Initialize and set thread detach attribute */
525     pthread_attr_init(&attr);
526     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
527     pthread_mutex_init(&tlock, NULL);
528     pthread_cond_init(&tcond, NULL);
529
530     /* Process each machine pile */
531     while(pile != NULL) {
532       //Create transaction id
533       newtid++;
534       tosend = calloc(1, sizeof(trans_req_data_t));
535       tosend->f.control = TRANS_REQUEST;
536       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
537       tosend->f.mcount = pilecount;
538       tosend->f.numread = pile->numread;
539       tosend->f.nummod = pile->nummod;
540       tosend->f.numcreated = pile->numcreated;
541       tosend->f.sum_bytes = pile->sum_bytes;
542       tosend->listmid = listmid;
543       tosend->objread = pile->objread;
544       tosend->oidmod = pile->oidmod;
545       tosend->oidcreated = pile->oidcreated;
546       thread_data_array[threadnum].thread_id = threadnum;
547       thread_data_array[threadnum].mid = pile->mid;
548       thread_data_array[threadnum].buffer = tosend;
549       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
550       thread_data_array[threadnum].threshold = &tcond;
551       thread_data_array[threadnum].lock = &tlock;
552       thread_data_array[threadnum].count = &trecvcount;
553       thread_data_array[threadnum].replyctrl = &treplyctrl;
554       thread_data_array[threadnum].replyretry = &treplyretry;
555       thread_data_array[threadnum].rec = record;
556       /* If local do not create any extra connection */
557       if(pile->mid != myIpAddr) { /* Not local */
558         do {
559           rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
560         } while(rc!=0);
561         if(rc) {
562           perror("Error in pthread create\n");
563           pthread_cond_destroy(&tcond);
564           pthread_mutex_destroy(&tlock);
565           pDelete(pile_ptr);
566           free(listmid);
567           for (i = 0; i < threadnum; i++)
568             free(thread_data_array[i].buffer);
569           free(thread_data_array);
570           free(ltdata);
571           return 1;
572         }
573       } else { /*Local*/
574         ltdata->tdata = &thread_data_array[threadnum];
575         ltdata->transinfo = &transinfo;
576         do {
577           val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
578         } while(val!=0);
579         if(val) {
580           perror("Error in pthread create\n");
581           pthread_cond_destroy(&tcond);
582           pthread_mutex_destroy(&tlock);
583           pDelete(pile_ptr);
584           free(listmid);
585           for (i = 0; i < threadnum; i++)
586             free(thread_data_array[i].buffer);
587           free(thread_data_array);
588           free(ltdata);
589           return 1;
590         }
591       }
592
593       threadnum++;
594       pile = pile->next;
595     }
596
597     /* Free attribute and wait for the other threads */
598     pthread_attr_destroy(&attr);
599     for (i = 0; i < threadnum; i++) {
600       rc = pthread_join(thread[i], NULL);
601       if(rc) {
602         printf("Error: return code from pthread_join() is %d\n", rc);
603         pthread_cond_destroy(&tcond);
604         pthread_mutex_destroy(&tlock);
605         pDelete(pile_ptr);
606         free(listmid);
607         for (j = i; j < threadnum; j++) {
608           free(thread_data_array[j].buffer);
609         }
610         return 1;
611       }
612       free(thread_data_array[i].buffer);
613     }
614     /* Free resources */
615     pthread_cond_destroy(&tcond);
616     pthread_mutex_destroy(&tlock);
617     free(listmid);
618
619     if (!treplyretry)
620       pDelete(pile_ptr);
621
622     /* wait a random amount of time before retrying to commit transaction*/
623     if(treplyretry) {
624       free(thread_data_array);
625       free(ltdata);
626       randomdelay();
627 #ifdef TRANSSTATS
628       nSoftAbort++;
629 #endif
630     }
631
632     /* Retry trans commit procedure during soft_abort case */
633   } while (treplyretry);
634
635   if(treplyctrl == TRANS_ABORT) {
636 #ifdef TRANSSTATS
637     numTransAbort++;
638 #endif
639     /* Free Resources */
640     objstrDelete(record->cache);
641     chashDelete(record->lookupTable);
642     free(record);
643     free(thread_data_array);
644     free(ltdata);
645     return TRANS_ABORT;
646   } else if(treplyctrl == TRANS_COMMIT) {
647 #ifdef TRANSSTATS
648     numTransCommit++;
649 #endif
650     /* Free Resources */
651     objstrDelete(record->cache);
652     chashDelete(record->lookupTable);
653     free(record);
654     free(thread_data_array);
655     free(ltdata);
656     return 0;
657   } else {
658     //TODO Add other cases
659     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
660     exit(-1);
661   }
662   return 0;
663 }
664
665 /* This function sends information involved in the transaction request
666  * to participants and accepts a response from particpants.
667  * It calls decideresponse() to decide on what control message
668  * to send next to participants and sends the message using sendResponse()*/
669 void *transRequest(void *threadarg) {
670   int sd, i, n;
671   struct sockaddr_in serv_addr;
672   thread_data_array_t *tdata;
673   objheader_t *headeraddr;
674   char control, recvcontrol;
675   char machineip[16], retval;
676
677   tdata = (thread_data_array_t *) threadarg;
678   if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
679     printf("transRequest(): socket create error\n");
680     pthread_exit(NULL);
681   }
682
683   /* Send bytes of data with TRANS_REQUEST control message */
684   send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
685
686   /* Send list of machines involved in the transaction */
687   {
688     int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
689     send_data(sd, tdata->buffer->listmid, size);
690   }
691
692   /* Send oids and version number tuples for objects that are read */
693   {
694     int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
695     send_data(sd, tdata->buffer->objread, size);
696   }
697
698   /* Send objects that are modified */
699   void *modptr;
700   if((modptr = calloc(1, tdata->buffer->f.sum_bytes)) == NULL) {
701     printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
702     pthread_exit(NULL);
703   }
704   int offset = 0;
705   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
706     int size;
707     if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
708       printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
709       pthread_exit(NULL);
710     }
711     GETSIZE(size,headeraddr);
712     size+=sizeof(objheader_t);
713     memcpy(modptr+offset, headeraddr, size);
714     offset+=size;
715   }
716   send_data(sd, modptr, tdata->buffer->f.sum_bytes);
717   free(modptr);
718   /* Read control message from Participant */
719   recv_data(sd, &control, sizeof(char));
720
721   /* Recv Objects if participant sends TRANS_DISAGREE */
722 #ifdef CACHE
723   if(control == TRANS_DISAGREE) {
724     int length;
725     recv_data(sd, &length, sizeof(int));
726     void *newAddr;
727     pthread_mutex_lock(&prefetchcache_mutex);
728     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
729       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
730       pthread_mutex_unlock(&prefetchcache_mutex);
731       pthread_exit(NULL);
732     }
733     pthread_mutex_unlock(&prefetchcache_mutex);
734     recv_data(sd, newAddr, length);
735     int offset = 0;
736     while(length != 0) {
737       unsigned int oidToPrefetch;
738       objheader_t * header;
739       header = (objheader_t *)(((char *)newAddr) + offset);
740       oidToPrefetch = OID(header);
741       int size = 0;
742       GETSIZE(size, header);
743       size += sizeof(objheader_t);
744       //make an entry in prefetch hash table
745       void *oldptr;
746       if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
747         prehashRemove(oidToPrefetch);
748         prehashInsert(oidToPrefetch, header);
749       } else {
750         prehashInsert(oidToPrefetch, header);
751       }
752       length = length - size;
753       offset += size;
754     }
755   }
756 #endif
757   recvcontrol = control;
758   /* Update common data structure and increment count */
759   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
760
761   /* Lock and update count */
762   /* Thread sleeps until all messages from pariticipants are received by coordinator */
763   pthread_mutex_lock(tdata->lock);
764
765   (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
766
767   /* Wake up the threads and invoke decideResponse (once) */
768   if(*(tdata->count) == tdata->buffer->f.mcount) {
769     decideResponse(tdata);
770     pthread_cond_broadcast(tdata->threshold);
771   } else {
772     pthread_cond_wait(tdata->threshold, tdata->lock);
773   }
774   pthread_mutex_unlock(tdata->lock);
775
776 #ifdef CACHE
777   if(*(tdata->replyctrl) == TRANS_COMMIT) {
778     int retval;
779     /* Update prefetch cache */
780     if((retval = updatePrefetchCache(tdata)) != 0) {
781       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
782       return;
783     }
784
785     /* Invalidate objects in other machine cache */
786     if(tdata->buffer->f.nummod > 0) {
787       if((retval = invalidateObj(tdata)) != 0) {
788         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
789         return;
790       }
791     }
792   }
793 #endif
794
795   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
796    * to all participants in their respective socket */
797   if (sendResponse(tdata, sd) == 0) {
798     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
799     pthread_exit(NULL);
800   }
801   pthread_exit(NULL);
802 }
803
804 /* This function decides the reponse that needs to be sent to
805  * all Participant machines after the TRANS_REQUEST protocol */
806 void decideResponse(thread_data_array_t *tdata) {
807   char control;
808   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
809                                                                    message to send */
810   for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
811     control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
812                                                written onto the shared array */
813     switch(control) {
814     default:
815       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
816
817       /* treat as disagree, pass thru */
818     case TRANS_DISAGREE:
819       transdisagree++;
820       break;
821
822     case TRANS_AGREE:
823       transagree++;
824       break;
825
826     case TRANS_SOFT_ABORT:
827       transsoftabort++;
828       break;
829     }
830   }
831
832   if(transdisagree > 0) {
833     /* Send Abort */
834     *(tdata->replyctrl) = TRANS_ABORT;
835     *(tdata->replyretry) = 0;
836 #ifdef CACHE
837     /* clear objects from prefetch cache */
838     cleanPCache(tdata);
839 #endif
840   } else if(transagree == tdata->buffer->f.mcount) {
841     /* Send Commit */
842     *(tdata->replyctrl) = TRANS_COMMIT;
843     *(tdata->replyretry) = 0;
844   } else {
845     /* Send Abort in soft abort case followed by retry commiting transaction again*/
846     *(tdata->replyctrl) = TRANS_ABORT;
847     *(tdata->replyretry) = 1;
848   }
849   return;
850 }
851
852 /* This function sends the final response to remote machines per
853  * thread in their respective socket id It returns a char that is only
854  * needed to check the correctness of execution of this function
855  * inside transRequest()*/
856
857 char sendResponse(thread_data_array_t *tdata, int sd) {
858   int n, size, sum, oidcount = 0, control;
859   char *ptr, retval = 0;
860   unsigned int *oidnotfound;
861
862   control = *(tdata->replyctrl);
863   send_data(sd, &control, sizeof(char));
864
865   //TODO read missing objects during object migration
866   /* If response is a soft abort due to missing objects at the
867      Participant's side */
868
869   /* If the decided response is TRANS_ABORT */
870   if(*(tdata->replyctrl) == TRANS_ABORT) {
871     retval = TRANS_ABORT;
872   } else if(*(tdata->replyctrl) == TRANS_COMMIT) {
873     /* If the decided response is TRANS_COMMIT */
874     retval = TRANS_COMMIT;
875   }
876   return retval;
877 }
878
879 /* This function opens a connection, places an object read request to
880  * the remote machine, reads the control message and object if
881  * available and copies the object and its header to the local
882  * cache. */
883
884 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
885   int size, val;
886   struct sockaddr_in serv_addr;
887   char machineip[16];
888   char control;
889   objheader_t *h;
890   void *objcopy = NULL;
891
892   int sd = getSock2(transReadSockPool, mnum);
893   char readrequest[sizeof(char)+sizeof(unsigned int)];
894   readrequest[0] = READ_REQUEST;
895   *((unsigned int *)(&readrequest[1])) = oid;
896   send_data(sd, readrequest, sizeof(readrequest));
897
898   /* Read response from the Participant */
899   recv_data(sd, &control, sizeof(char));
900
901   if (control==OBJECT_NOT_FOUND) {
902     objcopy = NULL;
903   } else {
904     /* Read object if found into local cache */
905     recv_data(sd, &size, sizeof(int));
906     objcopy = objstrAlloc(record->cache, size);
907     recv_data(sd, objcopy, size);
908     /* Insert into cache's lookup table */
909     chashInsert(record->lookupTable, oid, objcopy);
910   }
911
912   return objcopy;
913 }
914
915 /* This function handles the local objects involved in a transaction
916  * commiting process.  It also makes a decision if this local machine
917  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator.  Note
918  * Coordinator = local machine It wakes up the other threads from
919  * remote participants that are waiting for the coordinator's decision
920  * and based on common agreement it either commits or aborts the
921  * transaction.  It also frees the memory resources */
922
923 void *handleLocalReq(void *threadarg) {
924   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
925   local_thread_data_array_t *localtdata;
926   int numoidnotfound = 0, numoidlocked = 0;
927   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
928   int numread, i;
929   unsigned int oid;
930   unsigned short version;
931   localtdata = (local_thread_data_array_t *) threadarg;
932   /* Counters and arrays to formulate decision on control message to be sent */
933   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
934   oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
935                                                                                                                                               //setting a divider of read locks
936                                                                                                                                               //and write locks
937
938   numread = localtdata->tdata->buffer->f.numread;
939   /* Process each oid in the machine pile/ group per thread */
940   for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
941     if (i < localtdata->tdata->buffer->f.numread) {
942       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
943       incr *= i;
944       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
945       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
946       commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
947     } else { // Objects Modified
948       if(i == localtdata->tdata->buffer->f.numread) {
949         oidlocked[numoidlocked] = -1;
950         numoidlocked++;
951       }
952       int tmpsize;
953       objheader_t *headptr;
954       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
955       if (headptr == NULL) {
956         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
957         return NULL;
958       }
959       oid = OID(headptr);
960       version = headptr->version;
961       commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
962     }
963   }
964
965   /* Condition to send TRANS_AGREE */
966   if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
967     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
968   }
969   /* Condition to send TRANS_SOFT_ABORT */
970   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
971     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
972   }
973
974   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
975    * if Participant receives a TRANS_COMMIT */
976   localtdata->transinfo->objlocked = oidlocked;
977   localtdata->transinfo->objnotfound = oidnotfound;
978   localtdata->transinfo->modptr = NULL;
979   localtdata->transinfo->numlocked = numoidlocked;
980   localtdata->transinfo->numnotfound = numoidnotfound;
981
982   /* Lock and update count */
983   //Thread sleeps until all messages from pariticipants are received by coordinator
984   pthread_mutex_lock(localtdata->tdata->lock);
985   (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
986
987   /* Wake up the threads and invoke decideResponse (once) */
988   if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
989     decideResponse(localtdata->tdata);
990     pthread_cond_broadcast(localtdata->tdata->threshold);
991   } else {
992     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
993   }
994   pthread_mutex_unlock(localtdata->tdata->lock);
995   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
996     if(transAbortProcess(localtdata) != 0) {
997       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
998       fflush(stdout);
999       pthread_exit(NULL);
1000     }
1001   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
1002 #ifdef CACHE
1003     /* Invalidate objects in other machine cache */
1004     if(localtdata->tdata->buffer->f.nummod > 0) {
1005       int retval;
1006       if((retval = invalidateObj(localtdata->tdata)) != 0) {
1007         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1008         return;
1009       }
1010     }
1011 #endif
1012     if(transComProcess(localtdata) != 0) {
1013       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1014       fflush(stdout);
1015       pthread_exit(NULL);
1016     }
1017   }
1018
1019   /* Free memory */
1020   if (localtdata->transinfo->objlocked != NULL) {
1021     free(localtdata->transinfo->objlocked);
1022   }
1023   if (localtdata->transinfo->objnotfound != NULL) {
1024     free(localtdata->transinfo->objnotfound);
1025   }
1026   pthread_exit(NULL);
1027 }
1028
1029 /*  Commit info for objects modified */
1030 void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1031                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1032   void *mobj;
1033   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1034   /* Save the oids not found and number of oids not found for later use */
1035   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1036     /* Save the oids not found and number of oids not found for later use */
1037     oidnotfound[*numoidnotfound] = oid;
1038     (*numoidnotfound)++;
1039   } else { /* If Obj found in machine (i.e. has not moved) */
1040     /* Check if Obj is locked by any previous transaction */
1041     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1042       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1043         (*v_matchnolock)++;
1044         //Keep track of what is locked
1045         oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
1046         (*numoidlocked)++;
1047       } else { /* If versions don't match ...HARD ABORT */
1048         (*v_nomatch)++;
1049         /* Send TRANS_DISAGREE to Coordinator */
1050         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1051         //Keep track of what is locked
1052         oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
1053         (*numoidlocked)++;
1054         return;
1055       }
1056     } else { //A lock is acquired some place else
1057       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1058         (*v_matchlock)++;
1059       } else { /* If versions don't match ...HARD ABORT */
1060         (*v_nomatch)++;
1061         /* Send TRANS_DISAGREE to Coordinator */
1062         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1063         return;
1064       }
1065     }
1066   }
1067 }
1068
1069 /*  Commit info for objects modified */
1070 void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1071                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1072   void *mobj;
1073   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1074   /* Save the oids not found and number of oids not found for later use */
1075   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1076     /* Save the oids not found and number of oids not found for later use */
1077     oidnotfound[*numoidnotfound] = oid;
1078     (*numoidnotfound)++;
1079   } else { /* If Obj found in machine (i.e. has not moved) */
1080     /* Check if Obj is locked by any previous transaction */
1081     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1082       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1083         (*v_matchnolock)++;
1084         //Keep track of what is locked
1085         oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
1086         (*numoidlocked)++;
1087       } else { /* If versions don't match ...HARD ABORT */
1088         (*v_nomatch)++;
1089         /* Send TRANS_DISAGREE to Coordinator */
1090         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1091         //Keep track of what is locked
1092         oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
1093         (*numoidlocked)++;
1094         return;
1095       }
1096     } else { //Has reached max number of readers or some other transaction
1097       //has acquired a lock on this object
1098       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1099         (*v_matchlock)++;
1100       } else { /* If versions don't match ...HARD ABORT */
1101         (*v_nomatch)++;
1102         /* Send TRANS_DISAGREE to Coordinator */
1103         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1104         return;
1105       }
1106     }
1107   }
1108 }
1109
1110 /* This function completes the ABORT process if the transaction is aborting */
1111 int transAbortProcess(local_thread_data_array_t  *localtdata) {
1112   int i, numlocked;
1113   unsigned int *objlocked;
1114   void *header;
1115
1116   numlocked = localtdata->transinfo->numlocked;
1117   objlocked = localtdata->transinfo->objlocked;
1118
1119   int useWriteUnlock = 0;
1120   for (i = 0; i < numlocked; i++) {
1121     if(objlocked[i] == -1) {
1122       useWriteUnlock = 1;
1123       continue;
1124     }
1125     if((header = mhashSearch(objlocked[i])) == NULL) {
1126       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1127       return 1;
1128     }
1129     if(!useWriteUnlock) {
1130       read_unlock(STATUSPTR(header));
1131     } else {
1132       write_unlock(STATUSPTR(header));
1133     }
1134   }
1135
1136   return 0;
1137 }
1138
1139 /*This function completes the COMMIT process if the transaction is commiting*/
1140 int transComProcess(local_thread_data_array_t  *localtdata) {
1141   objheader_t *header, *tcptr;
1142   int i, nummod, tmpsize, numcreated, numlocked;
1143   unsigned int *oidmod, *oidcreated, *oidlocked;
1144   void *ptrcreate;
1145
1146   nummod = localtdata->tdata->buffer->f.nummod;
1147   oidmod = localtdata->tdata->buffer->oidmod;
1148   numcreated = localtdata->tdata->buffer->f.numcreated;
1149   oidcreated = localtdata->tdata->buffer->oidcreated;
1150   numlocked = localtdata->transinfo->numlocked;
1151   oidlocked = localtdata->transinfo->objlocked;
1152
1153   for (i = 0; i < nummod; i++) {
1154     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1155       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1156       return 1;
1157     }
1158     /* Copy from transaction cache -> main object store */
1159     if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
1160       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1161       return 1;
1162     }
1163     GETSIZE(tmpsize, header);
1164     char *tmptcptr = (char *) tcptr;
1165     memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
1166     header->version += 1;
1167     if(header->notifylist != NULL) {
1168       notifyAll(&header->notifylist, OID(header), header->version);
1169     }
1170   }
1171   /* If object is newly created inside transaction then commit it */
1172   for (i = 0; i < numcreated; i++) {
1173     if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
1174       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1175       return 1;
1176     }
1177     GETSIZE(tmpsize, header);
1178     tmpsize += sizeof(objheader_t);
1179     pthread_mutex_lock(&mainobjstore_mutex);
1180     if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
1181       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1182       pthread_mutex_unlock(&mainobjstore_mutex);
1183       return 1;
1184     }
1185     pthread_mutex_unlock(&mainobjstore_mutex);
1186     /* Initialize read and write locks */
1187     initdsmlocks(STATUSPTR(header));
1188     memcpy(ptrcreate, header, tmpsize);
1189     mhashInsert(oidcreated[i], ptrcreate);
1190     lhashInsert(oidcreated[i], myIpAddr);
1191   }
1192   /* Unlock locked objects */
1193   int useWriteUnlock = 0;
1194   for(i = 0; i < numlocked; i++) {
1195     if(oidlocked[i] == -1) {
1196       useWriteUnlock = 1;
1197       continue;
1198     }
1199     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1200       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1201       return 1;
1202     }
1203     if(!useWriteUnlock) {
1204       read_unlock(STATUSPTR(header));
1205     } else {
1206       write_unlock(STATUSPTR(header));
1207     }
1208   }
1209   return 0;
1210 }
1211
1212 prefetchpile_t *foundLocal(char *ptr) {
1213   int siteid = *(GET_SITEID(ptr));
1214   int ntuples = *(GET_NTUPLES(ptr));
1215   unsigned int * oidarray = GET_PTR_OID(ptr);
1216   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1217   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1218   prefetchpile_t * head=NULL;
1219   int numLocal = 0;
1220
1221   int i;
1222   for(i=0; i<ntuples; i++) {
1223     unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1224     unsigned short endindex=endoffsets[i];
1225     unsigned int oid=oidarray[i];
1226     int newbase;
1227     int machinenum;
1228     if (oid==0)
1229       continue;
1230     //Look up fields locally
1231     for(newbase=baseindex; newbase<endindex; newbase++) {
1232       if (!lookupObject(&oid, arryfields[newbase]))
1233         break;
1234       //Ended in a null pointer...
1235       if (oid==0)
1236         goto tuple;
1237     }
1238     //Entire prefetch is local
1239     if (newbase==endindex&&checkoid(oid)) {
1240       numLocal++;
1241       goto tuple;
1242     }
1243     //Add to remote requests
1244     machinenum=lhashSearch(oid);
1245     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1246 tuple:
1247     ;
1248   }
1249
1250   /* handle dynamic prefetching */
1251   handleDynPrefetching(numLocal, ntuples, siteid);
1252   return head;
1253 }
1254
1255 int checkoid(unsigned int oid) {
1256   objheader_t *header;
1257   if ((header=mhashSearch(oid))!=NULL) {
1258     //Found on machine
1259     return 1;
1260   } else if ((header=prehashSearch(oid))!=NULL) {
1261     //Found in cache
1262     return 1;
1263   } else {
1264     return 0;
1265   }
1266 }
1267
1268 int lookupObject(unsigned int * oid, short offset) {
1269   objheader_t *header;
1270   if ((header=mhashSearch(*oid))!=NULL) {
1271     //Found on machine
1272     ;
1273   } else if ((header=prehashSearch(*oid))!=NULL) {
1274     //Found in cache
1275     ;
1276   } else {
1277     return 0;
1278   }
1279
1280   if(TYPE(header) > NUMCLASSES) {
1281     int elementsize = classsize[TYPE(header)];
1282     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1283     int length = ao->___length___;
1284     /* Check if array out of bounds */
1285     if(offset < 0 || offset >= length) {
1286       //if yes treat the object as found
1287       (*oid)=0;
1288       return 1;
1289     }
1290     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1291     return 1;
1292   } else {
1293     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1294     return 1;
1295   }
1296 }
1297
1298
1299 /* This function is called by the thread calling transPrefetch */
1300 void *transPrefetch(void *t) {
1301   while(1) {
1302     /* lock mutex of primary prefetch queue */
1303     void *node=gettail();
1304     /* Check if the tuples are found locally, if yes then reduce them further*/
1305     /* and group requests by remote machine ids by calling the makePreGroups() */
1306     prefetchpile_t *pilehead = foundLocal(node);
1307
1308     if (pilehead!=NULL) {
1309       // Get sock from shared pool
1310       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
1311
1312       /* Send  Prefetch Request */
1313       prefetchpile_t *ptr = pilehead;
1314       while(ptr != NULL) {
1315         sendPrefetchReq(ptr, sd);
1316         ptr = ptr->next;
1317       }
1318
1319       /* Release socket */
1320       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1321
1322       /* Deallocated pilehead */
1323       mcdealloc(pilehead);
1324     }
1325     // Deallocate the prefetch queue pile node
1326     inctail();
1327   }
1328 }
1329
1330 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1331   objpile_t *tmp;
1332
1333   int size=sizeof(char)+sizeof(int);
1334   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1335     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1336   }
1337
1338   char buft[size];
1339   char *buf=buft;
1340   *buf=TRANS_PREFETCH;
1341   buf+=sizeof(char);
1342
1343   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1344     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1345     *((int*)buf)=len;
1346     buf+=sizeof(int);
1347     *((unsigned int *)buf)=tmp->oid;
1348     buf+=sizeof(unsigned int);
1349     *((unsigned int *)(buf)) = myIpAddr;
1350     buf+=sizeof(unsigned int);
1351     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1352     buf+=tmp->numoffset*sizeof(short);
1353   }
1354   *((int *)buf)=-1;
1355   send_data(sd, buft, size);
1356   return;
1357 }
1358
1359 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1360   int len, endpair;
1361   char control;
1362   objpile_t *tmp;
1363
1364   /* Send TRANS_PREFETCH control message */
1365   control = TRANS_PREFETCH;
1366   send_data(sd, &control, sizeof(char));
1367
1368   /* Send Oids and offsets in pairs */
1369   tmp = mcpilenode->objpiles;
1370   while(tmp != NULL) {
1371     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1372     char oidnoffset[len];
1373     char *buf=oidnoffset;
1374     *((int*)buf) = tmp->numoffset;
1375     buf+=sizeof(int);
1376     *((unsigned int *)buf) = tmp->oid;
1377     buf+=sizeof(unsigned int);
1378     *((unsigned int *)buf) = myIpAddr;
1379     buf += sizeof(unsigned int);
1380     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1381     send_data(sd, oidnoffset, len);
1382     tmp = tmp->next;
1383   }
1384
1385   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1386   endpair = -1;
1387   send_data(sd, &endpair, sizeof(int));
1388
1389   return;
1390 }
1391
1392 int getPrefetchResponse(int sd) {
1393   int length = 0, size = 0;
1394   char control;
1395   unsigned int oid;
1396   void *modptr, *oldptr;
1397
1398   recv_data((int)sd, &length, sizeof(int));
1399   size = length - sizeof(int);
1400   char recvbuffer[size];
1401
1402   recv_data((int)sd, recvbuffer, size);
1403   control = *((char *) recvbuffer);
1404   if(control == OBJECT_FOUND) {
1405     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1406     size = size - (sizeof(char) + sizeof(unsigned int));
1407     pthread_mutex_lock(&prefetchcache_mutex);
1408     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1409       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1410       pthread_mutex_unlock(&prefetchcache_mutex);
1411       return -1;
1412     }
1413     pthread_mutex_unlock(&prefetchcache_mutex);
1414     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1415     STATUS(modptr)=0;
1416
1417     /* Insert the oid and its address into the prefetch hash lookup table */
1418     /* Do a version comparison if the oid exists */
1419     if((oldptr = prehashSearch(oid)) != NULL) {
1420       /* If older version then update with new object ptr */
1421       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1422         prehashRemove(oid);
1423         prehashInsert(oid, modptr);
1424       }
1425     } else { /* Else add the object ptr to hash table*/
1426       prehashInsert(oid, modptr);
1427     }
1428     /* Lock the Prefetch Cache look up table*/
1429     pthread_mutex_lock(&pflookup.lock);
1430     /* Broadcast signal on prefetch cache condition variable */
1431     pthread_cond_broadcast(&pflookup.cond);
1432     /* Unlock the Prefetch Cache look up table*/
1433     pthread_mutex_unlock(&pflookup.lock);
1434   } else if(control == OBJECT_NOT_FOUND) {
1435     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1436     /* TODO: For each object not found query DHT for new location and retrieve the object */
1437     /* Throw an error */
1438     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1439     //    exit(-1);
1440   } else {
1441     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1442   }
1443
1444   return 0;
1445 }
1446
1447 unsigned short getObjType(unsigned int oid) {
1448   objheader_t *objheader;
1449   unsigned short numoffset[] ={0};
1450   short fieldoffset[] ={};
1451
1452   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1453 #ifdef CACHE
1454     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1455 #endif
1456     unsigned int mid = lhashSearch(oid);
1457     int sd = getSock2(transReadSockPool, mid);
1458     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1459     remotereadrequest[0] = READ_REQUEST;
1460     *((unsigned int *)(&remotereadrequest[1])) = oid;
1461     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1462
1463     /* Read response from the Participant */
1464     char control;
1465     recv_data(sd, &control, sizeof(char));
1466
1467     if (control==OBJECT_NOT_FOUND) {
1468       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1469       fflush(stdout);
1470       exit(-1);
1471     } else {
1472       /* Read object if found into local cache */
1473       int size;
1474       recv_data(sd, &size, sizeof(int));
1475 #ifdef CACHE
1476       pthread_mutex_lock(&prefetchcache_mutex);
1477       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1478         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1479         pthread_exit(NULL);
1480       }
1481       pthread_mutex_unlock(&prefetchcache_mutex);
1482       recv_data(sd, objheader, size);
1483       prehashInsert(oid, objheader);
1484       return TYPE(objheader);
1485 #else
1486       char *buffer;
1487       if((buffer = calloc(1, size)) == NULL) {
1488         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1489         fflush(stdout);
1490         return 0;
1491       }
1492       recv_data(sd, buffer, size);
1493       objheader = (objheader_t *)buffer;
1494       unsigned short type = TYPE(objheader);
1495       free(buffer);
1496       return type;
1497 #endif
1498     }
1499 #ifdef CACHE
1500   }
1501 #endif
1502   }
1503   return TYPE(objheader);
1504 }
1505
1506 int startRemoteThread(unsigned int oid, unsigned int mid) {
1507   int sock;
1508   struct sockaddr_in remoteAddr;
1509   char msg[1 + sizeof(unsigned int)];
1510   int bytesSent;
1511   int status;
1512
1513   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1514     perror("startRemoteThread():socket()");
1515     return -1;
1516   }
1517
1518   bzero(&remoteAddr, sizeof(remoteAddr));
1519   remoteAddr.sin_family = AF_INET;
1520   remoteAddr.sin_port = htons(LISTEN_PORT);
1521   remoteAddr.sin_addr.s_addr = htonl(mid);
1522
1523   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1524     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1525            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1526     status = -1;
1527   } else
1528   {
1529     msg[0] = START_REMOTE_THREAD;
1530     *((unsigned int *) &msg[1]) = oid;
1531     send_data(sock, msg, 1 + sizeof(unsigned int));
1532   }
1533
1534   close(sock);
1535   return status;
1536 }
1537
1538 //TODO: when reusing oids, make sure they are not already in use!
1539 static unsigned int id = 0xFFFFFFFF;
1540 unsigned int getNewOID(void) {
1541   id += 2;
1542   if (id > oidMax || id < oidMin) {
1543     id = (oidMin | 1);
1544   }
1545   return id;
1546 }
1547
1548 int processConfigFile() {
1549   FILE *configFile;
1550   const int maxLineLength = 200;
1551   char lineBuffer[maxLineLength];
1552   char *token;
1553   const char *delimiters = " \t\n";
1554   char *commentBegin;
1555   in_addr_t tmpAddr;
1556
1557   configFile = fopen(CONFIG_FILENAME, "r");
1558   if (configFile == NULL) {
1559     printf("error opening %s:\n", CONFIG_FILENAME);
1560     perror("");
1561     return -1;
1562   }
1563
1564   numHostsInSystem = 0;
1565   sizeOfHostArray = 8;
1566   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1567
1568   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1569     commentBegin = strchr(lineBuffer, '#');
1570     if (commentBegin != NULL)
1571       *commentBegin = '\0';
1572     token = strtok(lineBuffer, delimiters);
1573     while (token != NULL) {
1574       tmpAddr = inet_addr(token);
1575       if ((int)tmpAddr == -1) {
1576         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1577         fclose(configFile);
1578         return -1;
1579       } else
1580         addHost(htonl(tmpAddr));
1581       token = strtok(NULL, delimiters);
1582     }
1583   }
1584
1585   fclose(configFile);
1586
1587   if (numHostsInSystem < 1) {
1588     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1589     return -1;
1590   }
1591 #ifdef MAC
1592   myIpAddr = getMyIpAddr("en1");
1593 #else
1594   myIpAddr = getMyIpAddr("eth0");
1595 #endif
1596   myIndexInHostArray = findHost(myIpAddr);
1597   if (myIndexInHostArray == -1) {
1598     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1599     return -1;
1600   }
1601   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1602   oidMin = oidsPerBlock * myIndexInHostArray;
1603   if (myIndexInHostArray == numHostsInSystem - 1)
1604     oidMax = 0xFFFFFFFF;
1605   else
1606     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1607
1608   return 0;
1609 }
1610
1611 void addHost(unsigned int hostIp) {
1612   unsigned int *tmpArray;
1613
1614   if (findHost(hostIp) != -1)
1615     return;
1616
1617   if (numHostsInSystem == sizeOfHostArray) {
1618     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1619     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1620     free(hostIpAddrs);
1621     hostIpAddrs = tmpArray;
1622   }
1623
1624   hostIpAddrs[numHostsInSystem++] = hostIp;
1625
1626   return;
1627 }
1628
1629 int findHost(unsigned int hostIp) {
1630   int i;
1631   for (i = 0; i < numHostsInSystem; i++)
1632     if (hostIpAddrs[i] == hostIp)
1633       return i;
1634
1635   //not found
1636   return -1;
1637 }
1638
1639 /* This function sends notification request per thread waiting on object(s) whose version
1640  * changes */
1641 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1642   int sock,i;
1643   objheader_t *objheader;
1644   struct sockaddr_in remoteAddr;
1645   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1646   char *ptr;
1647   int bytesSent;
1648   int status, size;
1649   unsigned short version;
1650   unsigned int oid,mid;
1651   static unsigned int threadid = 0;
1652   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1653   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1654   notifydata_t *ndata;
1655
1656   oid = oidarry[0];
1657   if((mid = lhashSearch(oid)) == 0) {
1658     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1659     return;
1660   }
1661
1662   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1663     perror("reqNotify():socket()");
1664     return -1;
1665   }
1666
1667   bzero(&remoteAddr, sizeof(remoteAddr));
1668   remoteAddr.sin_family = AF_INET;
1669   remoteAddr.sin_port = htons(LISTEN_PORT);
1670   remoteAddr.sin_addr.s_addr = htonl(mid);
1671
1672   /* Generate unique threadid */
1673   threadid++;
1674
1675   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1676   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1677     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1678     return -1;
1679   }
1680   ndata->numoid = numoid;
1681   ndata->threadid = threadid;
1682   ndata->oidarry = oidarry;
1683   ndata->versionarry = versionarry;
1684   ndata->threadcond = threadcond;
1685   ndata->threadnotify = threadnotify;
1686   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1687     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1688     free(ndata);
1689     return -1;
1690   }
1691
1692   /* Send  number of oids, oidarry, version array, machine id and threadid */
1693   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1694     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1695            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1696     free(ndata);
1697     return -1;
1698   } else {
1699     msg[0] = THREAD_NOTIFY_REQUEST;
1700     *((unsigned int *)(&msg[1])) = numoid;
1701     /* Send array of oids  */
1702     size = sizeof(unsigned int);
1703     {
1704       i = 0;
1705       while(i < numoid) {
1706         oid = oidarry[i];
1707         *((unsigned int *)(&msg[1] + size)) = oid;
1708         size += sizeof(unsigned int);
1709         i++;
1710       }
1711     }
1712
1713     /* Send array of version  */
1714     {
1715       i = 0;
1716       while(i < numoid) {
1717         version = versionarry[i];
1718         *((unsigned short *)(&msg[1] + size)) = version;
1719         size += sizeof(unsigned short);
1720         i++;
1721       }
1722     }
1723
1724     *((unsigned int *)(&msg[1] + size)) = myIpAddr;
1725     size += sizeof(unsigned int);
1726     *((unsigned int *)(&msg[1] + size)) = threadid;
1727     pthread_mutex_lock(&(ndata->threadnotify));
1728     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1729     send_data(sock, msg, size);
1730     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1731     pthread_mutex_unlock(&(ndata->threadnotify));
1732   }
1733
1734   pthread_cond_destroy(&threadcond);
1735   pthread_mutex_destroy(&threadnotify);
1736   free(ndata);
1737   close(sock);
1738   return status;
1739 }
1740
1741 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1742   notifydata_t *ndata;
1743   int i, objIsFound = 0, index;
1744   void *ptr;
1745
1746   //Look up the tid and call the corresponding pthread_cond_signal
1747   if((ndata = notifyhashSearch(tid)) == NULL) {
1748     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1749     return;
1750   } else  {
1751     for(i = 0; i < ndata->numoid; i++) {
1752       if(ndata->oidarry[i] == oid) {
1753         objIsFound = 1;
1754         index = i;
1755       }
1756     }
1757     if(objIsFound == 0) {
1758       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1759       return;
1760     } else {
1761       if(version <= ndata->versionarry[index]) {
1762         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
1763         return;
1764       } else {
1765 #ifdef CACHE
1766         /* Clear from prefetch cache and free thread related data structure */
1767         if((ptr = prehashSearch(oid)) != NULL) {
1768           prehashRemove(oid);
1769         }
1770 #endif
1771         pthread_cond_signal(&(ndata->threadcond));
1772       }
1773     }
1774   }
1775   return;
1776 }
1777
1778 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
1779   threadlist_t *ptr;
1780   unsigned int mid;
1781   struct sockaddr_in remoteAddr;
1782   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
1783   int sock, status, size, bytesSent;
1784
1785   while(*head != NULL) {
1786     ptr = *head;
1787     mid = ptr->mid;
1788     //create a socket connection to that machine
1789     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1790       perror("notifyAll():socket()");
1791       return -1;
1792     }
1793
1794     bzero(&remoteAddr, sizeof(remoteAddr));
1795     remoteAddr.sin_family = AF_INET;
1796     remoteAddr.sin_port = htons(LISTEN_PORT);
1797     remoteAddr.sin_addr.s_addr = htonl(mid);
1798     //send Thread Notify response and threadid to that machine
1799     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1800       printf("notifyAll():error %d connecting to %s:%d\n", errno,
1801              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1802       fflush(stdout);
1803       status = -1;
1804     } else {
1805       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
1806       msg[0] = THREAD_NOTIFY_RESPONSE;
1807       *((unsigned int *)&msg[1]) = oid;
1808       size = sizeof(unsigned int);
1809       *((unsigned short *)(&msg[1]+ size)) = version;
1810       size+= sizeof(unsigned short);
1811       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
1812
1813       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
1814       send_data(sock, msg, size);
1815     }
1816     //close socket
1817     close(sock);
1818     // Update head
1819     *head = ptr->next;
1820     free(ptr);
1821   }
1822   return status;
1823 }
1824
1825 void transAbort(transrecord_t *trans) {
1826   objstrDelete(trans->cache);
1827   chashDelete(trans->lookupTable);
1828   free(trans);
1829 }
1830
1831 /* This function inserts necessary information into
1832  * a machine pile data structure */
1833 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
1834   plistnode_t *ptr, *tmp;
1835   int found = 0, offset = 0;
1836
1837   tmp = pile;
1838   //Add oid into a machine that is already present in the pile linked list structure
1839   while(tmp != NULL) {
1840     if (tmp->mid == mid) {
1841       int tmpsize;
1842
1843       if (STATUS(headeraddr) & NEW) {
1844         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
1845         tmp->numcreated++;
1846         GETSIZE(tmpsize, headeraddr);
1847         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1848       } else if (STATUS(headeraddr) & DIRTY) {
1849         tmp->oidmod[tmp->nummod] = OID(headeraddr);
1850         tmp->nummod++;
1851         GETSIZE(tmpsize, headeraddr);
1852         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1853       } else {
1854         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
1855         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
1856         offset += sizeof(unsigned int);
1857         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
1858         tmp->numread++;
1859       }
1860       found = 1;
1861       break;
1862     }
1863     tmp = tmp->next;
1864   }
1865   //Add oid for any new machine
1866   if (!found) {
1867     int tmpsize;
1868     if((ptr = pCreate(num_objs)) == NULL) {
1869       return NULL;
1870     }
1871     ptr->mid = mid;
1872     if (STATUS(headeraddr) & NEW) {
1873       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
1874       ptr->numcreated++;
1875       GETSIZE(tmpsize, headeraddr);
1876       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1877     } else if (STATUS(headeraddr) & DIRTY) {
1878       ptr->oidmod[ptr->nummod] = OID(headeraddr);
1879       ptr->nummod++;
1880       GETSIZE(tmpsize, headeraddr);
1881       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1882     } else {
1883       *((unsigned int *)ptr->objread)=OID(headeraddr);
1884       offset = sizeof(unsigned int);
1885       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
1886       ptr->numread++;
1887     }
1888     ptr->next = pile;
1889     pile = ptr;
1890   }
1891
1892   /* Clear Flags */
1893   STATUS(headeraddr) =0;
1894
1895
1896   return pile;
1897 }