batch communications for prefetches
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <netinet/tcp.h>
5 #include "dstm.h"
6 #include "mlookup.h"
7 #include "llookup.h"
8 #include "threadnotify.h"
9 #include "prefetch.h"
10 #include <sched.h>
11 #ifdef COMPILER
12 #include "thread.h"
13 #endif
14 #include "gCollect.h"
15
16 #define BACKLOG 10 //max pending connections
17 #define RECEIVE_BUFFER_SIZE 2048
18
19 extern int classsize[];
20 extern int numHostsInSystem;
21 extern pthread_mutex_t notifymutex;
22
23 objstr_t *mainobjstore;
24 pthread_mutex_t mainobjstore_mutex;
25 pthread_mutex_t lockObjHeader;
26 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
27
28 sockPoolHashTable_t *transPResponseSocketPool;
29
30 /* This function initializes the main objects store and creates the
31  * global machine and location lookup table */
32
33 int dstmInit(void) {
34   mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
35   /* Initialize attribute for mutex */
36   pthread_mutexattr_init(&mainobjstore_mutex_attr);
37   pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
38   pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
39   pthread_mutex_init(&lockObjHeader,NULL);
40   if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
41     return 1;             //failure
42
43   if (lhashCreate(HASH_SIZE, LOADFACTOR))
44     return 1;             //failure
45
46   if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
47     return 1;             //failure
48
49   //Initialize socket pool
50   if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, DEFAULTSOCKPOOLSIZE)) == NULL) {
51     printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
52     return 0;
53   }
54
55   return 0;
56 }
57
58
59 int startlistening() {
60   int listenfd;
61   struct sockaddr_in my_addr;
62   socklen_t addrlength = sizeof(struct sockaddr);
63   int setsockflag=1;
64
65   listenfd = socket(AF_INET, SOCK_STREAM, 0);
66   if (listenfd == -1) {
67     perror("socket");
68     exit(1);
69   }
70
71   if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
72     perror("socket");
73     exit(1);
74   }
75 #ifdef MAC
76   if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
77     perror("socket");
78     exit(1);
79   }
80 #endif
81
82   my_addr.sin_family = AF_INET;
83   my_addr.sin_port = htons(LISTEN_PORT);
84   my_addr.sin_addr.s_addr = INADDR_ANY;
85   memset(&(my_addr.sin_zero), '\0', 8);
86
87   if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
88     perror("bind");
89     exit(1);
90   }
91
92   if (listen(listenfd, BACKLOG) == -1) {
93     perror("listen");
94     exit(1);
95   }
96   return listenfd;
97 }
98
99 /* This function starts the thread to listen on a socket
100  * for tranaction calls */
101 void *dstmListen(void *lfd) {
102   int listenfd=(int)lfd;
103   int acceptfd;
104   struct sockaddr_in client_addr;
105   socklen_t addrlength = sizeof(struct sockaddr);
106   pthread_t thread_dstm_accept;
107
108   printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
109   while(1) {
110     int retval;
111     int flag=1;
112     acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
113     setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
114     do {
115       retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
116     } while(retval!=0);
117     pthread_detach(thread_dstm_accept);
118   }
119 }
120 /* This function accepts a new connection request, decodes the control message in the connection
121  * and accordingly calls other functions to process new requests */
122 void *dstmAccept(void *acceptfd) {
123   int val, retval, size, sum, sockid;
124   unsigned int oid;
125   char *buffer;
126   char control,ctrl;
127   char *ptr;
128   void *srcObj;
129   objheader_t *h;
130   trans_commit_data_t transinfo;
131   unsigned short objType, *versionarry, version;
132   unsigned int *oidarry, numoid, mid, threadid;
133
134   /* Receive control messages from other machines */
135   while(1) {
136     int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
137     if (ret==0)
138       break;
139     if (ret==-1) {
140       printf("DEBUG -> RECV Error!.. retrying\n");
141       break;
142     }
143     switch(control) {
144     case READ_REQUEST:
145       /* Read oid requested and search if available */
146       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
147       while((srcObj = mhashSearch(oid)) == NULL) {
148         int ret;
149         if((ret = sched_yield()) != 0) {
150           printf("%s(): error no %d in thread yield\n", __func__, errno);
151         }
152       }
153       h = (objheader_t *) srcObj;
154       GETSIZE(size, h);
155       size += sizeof(objheader_t);
156       sockid = (int) acceptfd;
157       if (h == NULL) {
158         ctrl = OBJECT_NOT_FOUND;
159         send_data(sockid, &ctrl, sizeof(char));
160       } else {
161         // Type
162         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
163         *((int *)&msg[1])=size;
164         send_data(sockid, &msg, sizeof(msg));
165         send_data(sockid, h, size);
166       }
167       break;
168
169     case READ_MULT_REQUEST:
170       break;
171
172     case MOVE_REQUEST:
173       break;
174
175     case MOVE_MULT_REQUEST:
176       break;
177
178     case TRANS_REQUEST:
179       /* Read transaction request */
180       transinfo.objlocked = NULL;
181       transinfo.objnotfound = NULL;
182       transinfo.modptr = NULL;
183       transinfo.numlocked = 0;
184       transinfo.numnotfound = 0;
185       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
186         printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
187         pthread_exit(NULL);
188       }
189       break;
190
191     case TRANS_PREFETCH:
192 #ifdef RANGEPREFETCH
193       if((val = rangePrefetchReq((int)acceptfd)) != 0) {
194         printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
195         break;
196       }
197 #else
198       if((val = prefetchReq((int)acceptfd)) != 0) {
199         printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
200         break;
201       }
202 #endif
203       break;
204
205     case TRANS_PREFETCH_RESPONSE:
206 #ifdef RANGEPREFETCH
207       if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
208         printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
209         break;
210       }
211 #else
212       if((val = getPrefetchResponse((int) acceptfd)) != 0) {
213         printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
214         break;
215       }
216 #endif
217       break;
218
219     case START_REMOTE_THREAD:
220       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
221       objType = getObjType(oid);
222       startDSMthread(oid, objType);
223       break;
224
225     case THREAD_NOTIFY_REQUEST:
226       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
227       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
228       if((buffer = calloc(1,size)) == NULL) {
229         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
230         pthread_exit(NULL);
231       }
232
233       recv_data((int)acceptfd, buffer, size);
234
235       oidarry = calloc(numoid, sizeof(unsigned int));
236       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
237       size = sizeof(unsigned int) * numoid;
238       versionarry = calloc(numoid, sizeof(unsigned short));
239       memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
240       size += sizeof(unsigned short) * numoid;
241       mid = *((unsigned int *)(buffer+size));
242       size += sizeof(unsigned int);
243       threadid = *((unsigned int *)(buffer+size));
244       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
245       free(buffer);
246
247       break;
248
249     case THREAD_NOTIFY_RESPONSE:
250       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
251       if((buffer = calloc(1,size)) == NULL) {
252         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
253         pthread_exit(NULL);
254       }
255
256       recv_data((int)acceptfd, buffer, size);
257
258       oid = *((unsigned int *)buffer);
259       size = sizeof(unsigned int);
260       version = *((unsigned short *)(buffer+size));
261       size += sizeof(unsigned short);
262       threadid = *((unsigned int *)(buffer+size));
263       threadNotify(oid,version,threadid);
264       free(buffer);
265       break;
266
267     case CLOSE_CONNECTION:
268       goto closeconnection;
269
270     default:
271       printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
272     }
273   }
274
275 closeconnection:
276   /* Close connection */
277   if (close((int)acceptfd) == -1)
278     perror("close");
279   pthread_exit(NULL);
280 }
281
282 /* This function reads the information available in a transaction request
283  * and makes a function call to process the request */
284 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
285   char *ptr;
286   void *modptr;
287   unsigned int *oidmod, oid;
288   fixed_data_t fixed;
289   objheader_t *headaddr;
290   int sum, i, size, n, val;
291
292   oidmod = NULL;
293
294   /* Read fixed_data_t data structure */
295   size = sizeof(fixed) - 1;
296   ptr = (char *)&fixed;;
297   fixed.control = TRANS_REQUEST;
298   recv_data((int)acceptfd, ptr+1, size);
299
300   /* Read list of mids */
301   int mcount = fixed.mcount;
302   size = mcount * sizeof(unsigned int);
303   unsigned int listmid[mcount];
304   ptr = (char *) listmid;
305   recv_data((int)acceptfd, ptr, size);
306
307   /* Read oid and version tuples for those objects that are not modified in the transaction */
308   int numread = fixed.numread;
309   size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
310   char objread[size];
311   if(numread != 0) { //If pile contains more than one object to be read,
312     // keep reading all objects
313     recv_data((int)acceptfd, objread, size);
314   }
315
316   /* Read modified objects */
317   if(fixed.nummod != 0) {
318     if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
319       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
320       return 1;
321     }
322     size = fixed.sum_bytes;
323     recv_data((int)acceptfd, modptr, size);
324   }
325
326   /* Create an array of oids for modified objects */
327   oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
328   if (oidmod == NULL) {
329     printf("calloc error %s, %d\n", __FILE__, __LINE__);
330     return 1;
331   }
332   ptr = (char *) modptr;
333   for(i = 0 ; i < fixed.nummod; i++) {
334     int tmpsize;
335     headaddr = (objheader_t *) ptr;
336     oid = OID(headaddr);
337     oidmod[i] = oid;
338     GETSIZE(tmpsize, headaddr);
339     ptr += sizeof(objheader_t) + tmpsize;
340   }
341
342   /*Process the information read */
343   if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
344     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
345     /* Free resources */
346     if(oidmod != NULL) {
347       free(oidmod);
348     }
349     return 1;
350   }
351
352   /* Free resources */
353   if(oidmod != NULL) {
354     free(oidmod);
355   }
356
357   return 0;
358 }
359
360 /* This function processes the Coordinator's transaction request using "handleTransReq"
361  * function and sends a reply to the co-ordinator.
362  * Following this it also receives a new control message from the co-ordinator and processes this message*/
363 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
364                      unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
365
366   char control, sendctrl, retval;
367   objheader_t *tmp_header;
368   void *header;
369   int i = 0, val;
370
371   /* Send reply to the Coordinator */
372   if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
373     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
374     return 1;
375   }
376
377   recv_data((int)acceptfd, &control, sizeof(char));
378   /* Process the new control message */
379   switch(control) {
380   case TRANS_ABORT:
381     if (fixed->nummod > 0)
382       free(modptr);
383     /* Unlock objects that was locked due to this transaction */
384     int useWriteUnlock = 0;
385     for(i = 0; i< transinfo->numlocked; i++) {
386       if(transinfo->objlocked[i] == -1) {
387         useWriteUnlock = 1;
388         continue;
389       }
390       if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
391         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
392         return 1;
393       }
394       if(useWriteUnlock) {
395         write_unlock(STATUSPTR(header));
396       } else {
397         read_unlock(STATUSPTR(header));
398       }
399     }
400     break;
401
402   case TRANS_COMMIT:
403     /* Invoke the transCommit process() */
404     if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
405       printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
406       /* Free memory */
407       if (transinfo->objlocked != NULL) {
408         free(transinfo->objlocked);
409       }
410       if (transinfo->objnotfound != NULL) {
411         free(transinfo->objnotfound);
412       }
413       return 1;
414     }
415     break;
416
417   case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
418     break;
419
420   default:
421     printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
422     //TODO Use fixed.trans_id  TID since Client may have died
423     break;
424   }
425
426   /* Free memory */
427   if (transinfo->objlocked != NULL) {
428     free(transinfo->objlocked);
429   }
430   if (transinfo->objnotfound != NULL) {
431     free(transinfo->objnotfound);
432   }
433
434   return 0;
435 }
436
437 /* This function increments counters while running a voting decision on all objects involved
438  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
439 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
440   int val, i = 0, j;
441   unsigned short version;
442   char control = 0, *ptr;
443   unsigned int oid;
444   unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
445   objheader_t *headptr;
446
447   /* Counters and arrays to formulate decision on control message to be sent */
448   oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
449   oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
450   oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
451   int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
452   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
453   int numBytes = 0;
454   /* modptr points to the beginning of the object store
455    * created at the Pariticipant.
456    * Object store holds the modified objects involved in the transaction request */
457   ptr = (char *) modptr;
458
459   /* Process each oid in the machine pile/ group per thread */
460   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
461     if (i < fixed->numread) { //Objs only read and not modified
462       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
463       incr *= i;
464       oid = *((unsigned int *)(objread + incr));
465       incr += sizeof(unsigned int);
466       version = *((unsigned short *)(objread + incr));
467       getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
468                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
469     } else {  //Objs modified
470       if(i == fixed->numread) {
471         oidlocked[objlocked++] = -1;
472       }
473       int tmpsize;
474       headptr = (objheader_t *) ptr;
475       oid = OID(headptr);
476       version = headptr->version;
477       GETSIZE(tmpsize, headptr);
478       ptr += sizeof(objheader_t) + tmpsize;
479       getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
480                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
481                               &numBytes, &control, oid, version);
482     }
483   }
484
485   /* send TRANS_DISAGREE and objs*/
486   if(v_nomatch > 0) {
487 #ifdef CACHE
488     char *objs = calloc(1, numBytes);
489     int j, offset = 0;
490     for(j = 0; j<objvernotmatch; j++) {
491       objheader_t *header = mhashSearch(oidvernotmatch[j]);
492       int size = 0;
493       GETSIZE(size, header);
494       size += sizeof(objheader_t);
495       memcpy(objs+offset, header, size);
496       offset += size;
497     }
498 #endif
499     if (objlocked > 0) {
500       int useWriteUnlock = 0;
501       for(j = 0; j < objlocked; j++) {
502         if(oidlocked[j] == -1) {
503           useWriteUnlock = 1;
504           continue;
505         }
506         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
507           printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
508           return 0;
509         }
510         if(useWriteUnlock) {
511           write_unlock(STATUSPTR(headptr));
512         } else {
513           read_unlock(STATUSPTR(headptr));
514         }
515       }
516       free(oidlocked);
517     }
518     send_data(acceptfd, &control, sizeof(char));
519 #ifdef CACHE
520     send_data(acceptfd, &numBytes, sizeof(int));
521     send_data(acceptfd, objs, numBytes);
522     transinfo->objvernotmatch = oidvernotmatch;
523     transinfo->numvernotmatch = objvernotmatch;
524     free(objs);
525     free(transinfo->objvernotmatch);
526 #endif
527     return control;
528   }
529
530   /* Decide what control message to send to Coordinator */
531   if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
532                                    modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
533     printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
534     return 0;
535   }
536   return control;
537 }
538
539 /* Update Commit info for objects that are read */
540 void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
541                              unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
542                              int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
543                              char *control, unsigned int oid, unsigned short version) {
544   void *mobj;
545   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
546
547   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
548     /* Save the oids not found and number of oids not found for later use */
549     oidnotfound[*objnotfound] = oid;
550     (*objnotfound)++;
551   } else {     /* If Obj found in machine (i.e. has not moved) */
552     /* Check if Obj is locked by any previous transaction */
553     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
554       if (version == ((objheader_t *)mobj)->version) { /* match versions */
555         (*v_matchnolock)++;
556       } else { /* If versions don't match ...HARD ABORT */
557         (*v_nomatch)++;
558         oidvernotmatch[*objvernotmatch] = oid;
559         (*objvernotmatch)++;
560         int size;
561         GETSIZE(size, mobj);
562         size += sizeof(objheader_t);
563         *numBytes += size;
564         /* Send TRANS_DISAGREE to Coordinator */
565         *control = TRANS_DISAGREE;
566         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
567       }
568       //Keep track of oid locked
569       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
570     } else {  //we are locked
571       if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
572         (*v_matchlock)++;
573       } else { /* If versions don't match ...HARD ABORT */
574         (*v_nomatch)++;
575         oidvernotmatch[*objvernotmatch] = oid;
576         (*objvernotmatch)++;
577         int size;
578         GETSIZE(size, mobj);
579         size += sizeof(objheader_t);
580         *numBytes += size;
581         *control = TRANS_DISAGREE;
582         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
583       }
584     }
585   }
586 }
587
588 /* Update Commit info for objects that are read */
589 void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
590                               int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
591                               int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
592   void *mobj;
593   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
594   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
595     /* Save the oids not found and number of oids not found for later use */
596     oidnotfound[*objnotfound] = oid;
597     (*objnotfound)++;
598   } else {     /* If Obj found in machine (i.e. has not moved) */
599     /* Check if Obj is locked by any previous transaction */
600     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
601       if (version == ((objheader_t *)mobj)->version) { /* match versions */
602         (*v_matchnolock)++;
603       } else { /* If versions don't match ...HARD ABORT */
604         (*v_nomatch)++;
605         oidvernotmatch[(*objvernotmatch)++] = oid;
606         int size;
607         GETSIZE(size, mobj);
608         size += sizeof(objheader_t);
609         *numBytes += size;
610         /* Send TRANS_DISAGREE to Coordinator */
611         *control = TRANS_DISAGREE;
612         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
613       }
614       //Keep track of oid locked
615       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
616     } else { /* Some other transaction has aquired a write lock on this object */
617       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
618         (*v_matchlock)++;
619       } else { /* If versions don't match ...HARD ABORT */
620         (*v_nomatch)++;
621         oidvernotmatch[*objvernotmatch] = oid;
622         (*objvernotmatch)++;
623         int size;
624         GETSIZE(size, mobj);
625         size += sizeof(objheader_t);
626         *numBytes += size;
627         *control = TRANS_DISAGREE;
628         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
629       }
630     }
631   }
632 }
633
634 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
635  * to send to Coordinator based on the votes of oids involved in the transaction */
636 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
637                        int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
638                        unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
639   int val;
640   char control = 0;
641
642   /* Condition to send TRANS_AGREE */
643   if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
644     control = TRANS_AGREE;
645     /* Send control message */
646     send_data(acceptfd, &control, sizeof(char));
647   }
648   /* Condition to send TRANS_SOFT_ABORT */
649   if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
650     control = TRANS_SOFT_ABORT;
651
652     /* Send control message */
653     send_data(acceptfd, &control, sizeof(char));
654
655     /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
656     if(*(objnotfound) != 0) {
657       int msg[1];
658       msg[0] = *(objnotfound);
659       send_data(acceptfd, &msg, sizeof(int));
660       int size = sizeof(unsigned int)* *(objnotfound);
661       send_data(acceptfd, oidnotfound, size);
662     }
663   }
664
665   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
666    * if Participant receives a TRANS_COMMIT */
667   transinfo->objlocked = oidlocked;
668   transinfo->objnotfound = oidnotfound;
669   transinfo->modptr = modptr;
670   transinfo->numlocked = *(objlocked);
671   transinfo->numnotfound = *(objnotfound);
672   return control;
673 }
674
675 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
676  * addresses in lookup table and also changes version number
677  * Sends an ACK back to Coordinator */
678 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
679   objheader_t *header;
680   objheader_t *newheader;
681   int i = 0, offset = 0;
682   char control;
683   int tmpsize;
684
685   /* Process each modified object saved in the mainobject store */
686   for(i = 0; i < nummod; i++) {
687     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
688       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
689       return 1;
690     }
691     GETSIZE(tmpsize,header);
692
693     {
694       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
695       struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
696       dst->type=src->type;
697       dst->___cachedCode___=src->___cachedCode___;
698       dst->___cachedHash___=src->___cachedHash___;
699       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
700     }
701     header->version += 1;
702     /* If threads are waiting on this object to be updated, notify them */
703     if(header->notifylist != NULL) {
704       notifyAll(&header->notifylist, OID(header), header->version);
705     }
706     offset += sizeof(objheader_t) + tmpsize;
707   }
708
709   if (nummod > 0)
710     free(modptr);
711
712   /* Unlock locked objects */
713   int useWriteUnlock = 0;
714   for(i = 0; i < numlocked; i++) {
715     if(oidlocked[i] == -1) {
716       useWriteUnlock = 1;
717       continue;
718     }
719     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
720       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
721       return 1;
722     }
723
724     if(useWriteUnlock) {
725       write_unlock(STATUSPTR(header));
726     } else {
727       read_unlock(STATUSPTR(header));
728     }
729   }
730   //TODO Update location lookup table
731   return 0;
732 }
733
734 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
735  * Looks for the objects to be prefetched in the main object store.
736  * If objects are not found then record those and if objects are found
737  * then use offset values to prefetch references to other objects */
738
739 int prefetchReq(int acceptfd) {
740   int i, size, objsize, numoffset = 0;
741   int length;
742   char *recvbuffer, control;
743   unsigned int oid, mid=-1;
744   objheader_t *header;
745   oidmidpair_t oidmid;
746   int sd = -1;
747
748   while(1) {
749     recv_data((int)acceptfd, &numoffset, sizeof(int));
750     if(numoffset == -1)
751       break;
752     recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
753     oid = oidmid.oid;
754     if (mid != oidmid.mid) {
755       if (mid!=-1) {
756         freeSockWithLock(transPResponseSocketPool, mid, sd);
757       }
758       mid=oidmid.mid;
759       sd = getSockWithLock(transPResponseSocketPool, mid);
760     }
761     short offsetarry[numoffset];
762     recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
763
764     /*Process each oid */
765     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
766       /* Save the oids not found in buffer for later use */
767       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
768       char sendbuffer[size];
769       *((int *) sendbuffer) = size;
770       *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
771       *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
772       control = TRANS_PREFETCH_RESPONSE;
773       sendPrefetchResponse(sd, &control, sendbuffer, &size);
774     } else { /* Object Found */
775       int incr = 0;
776       GETSIZE(objsize, header);
777       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
778       char sendbuffer[size];
779       *((int *)(sendbuffer + incr)) = size;
780       incr += sizeof(int);
781       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
782       incr += sizeof(char);
783       *((unsigned int *)(sendbuffer+incr)) = oid;
784       incr += sizeof(unsigned int);
785       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
786
787       control = TRANS_PREFETCH_RESPONSE;
788       sendPrefetchResponse(sd, &control, sendbuffer, &size);
789
790       /* Calculate the oid corresponding to the offset value */
791       for(i = 0 ; i< numoffset ; i++) {
792         /* Check for arrays  */
793         if(TYPE(header) >= NUMCLASSES) {
794           int elementsize = classsize[TYPE(header)];
795           struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
796           unsigned short length = ao->___length___;
797           /* Check if array out of bounds */
798           if(offsetarry[i]< 0 || offsetarry[i] >= length) {
799             break;
800           }
801           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
802         } else {
803           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
804         }
805
806         /* Don't continue if we hit a NULL pointer */
807         if (oid==0)
808           break;
809
810         if((header = mhashSearch(oid)) == NULL) {
811           size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
812           char sendbuffer[size];
813           *((int *) sendbuffer) = size;
814           *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
815           *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
816
817           control = TRANS_PREFETCH_RESPONSE;
818           sendPrefetchResponse(sd, &control, sendbuffer, &size);
819           break;
820         } else { /* Obj Found */
821           int incr = 0;
822           GETSIZE(objsize, header);
823           size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
824           char sendbuffer[size];
825           *((int *)(sendbuffer + incr)) = size;
826           incr += sizeof(int);
827           *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
828           incr += sizeof(char);
829           *((unsigned int *)(sendbuffer+incr)) = oid;
830           incr += sizeof(unsigned int);
831           memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
832
833           control = TRANS_PREFETCH_RESPONSE;
834           sendPrefetchResponse(sd, &control, sendbuffer, &size);
835         }
836       } //end of for
837     }
838   } //end of while
839     //Release socket
840   if (mid!=-1)
841     freeSockWithLock(transPResponseSocketPool, mid, sd);
842
843   return 0;
844 }
845
846 void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
847   send_data(sd, control, sizeof(char));
848   /* Send the buffer with its size */
849   int length = *(size);
850   send_data(sd, sendbuffer, length);
851 }
852
853 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
854   objheader_t *header;
855   unsigned int oid;
856   unsigned short newversion;
857   char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
858   int sd;
859   struct sockaddr_in remoteAddr;
860   int bytesSent;
861   int size;
862   int i = 0;
863
864   while(i < numoid) {
865     oid = *(oidarry + i);
866     if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
867       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
868       return;
869     } else {
870       /* Check to see if versions are same */
871 checkversion:
872       if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
873         newversion = header->version;
874         if(newversion == *(versionarry + i)) {
875           //Add to the notify list
876           if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
877             printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
878             return;
879           }
880           write_unlock(STATUSPTR(header));
881         } else {
882           write_unlock(STATUSPTR(header));
883           if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
884             perror("processReqNotify():socket()");
885             return;
886           }
887           bzero(&remoteAddr, sizeof(remoteAddr));
888           remoteAddr.sin_family = AF_INET;
889           remoteAddr.sin_port = htons(LISTEN_PORT);
890           remoteAddr.sin_addr.s_addr = htonl(mid);
891
892           if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
893             printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
894                    inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
895             close(sd);
896             return;
897           } else {
898             //Send Update notification
899             msg[0] = THREAD_NOTIFY_RESPONSE;
900             *((unsigned int *)&msg[1]) = oid;
901             size = sizeof(unsigned int);
902             *((unsigned short *)(&msg[1]+size)) = newversion;
903             size += sizeof(unsigned short);
904             *((unsigned int *)(&msg[1]+size)) = threadid;
905             size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
906             send_data(sd, msg, size);
907           }
908           close(sd);
909         }
910       } else {
911         randomdelay();
912         goto checkversion;
913       }
914     }
915     i++;
916   }
917   free(oidarry);
918   free(versionarry);
919 }