40554000b6307f6eff36d58850e0fca41822efc8
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <netinet/tcp.h>
5 #include "dstm.h"
6 #include "altmlookup.h"
7 #include "llookup.h"
8 #include "threadnotify.h"
9 #include "prefetch.h"
10 #include <sched.h>
11 #ifdef COMPILER
12 #include "thread.h"
13 #endif
14 #include "gCollect.h"
15 #include "readstruct.h"
16 #include "debugmacro.h"
17 #ifdef SANDBOX
18 #include "sandbox.h"
19 #endif
20
21 #define BACKLOG 10 //max pending connections
22 #define RECEIVE_BUFFER_SIZE 2048
23
24 extern int classsize[];
25 extern int numHostsInSystem;
26 extern pthread_mutex_t notifymutex;
27 extern unsigned long long clockoffset;
28 long long startreq, endreq, diff;
29
30 //#define LOGTIMES
31 #ifdef LOGTIMES
32 extern char bigarray1[6*1024*1024];
33 extern unsigned int bigarray2[6*1024*1024];
34 extern unsigned int bigarray3[6*1024*1024];
35 extern long long bigarray4[6*1024*1024];
36 extern int bigarray5[6*1024*1024];
37 extern int bigindex1;
38 #define LOGTIME(x,y,z,a,b) {\
39   int tmp=bigindex1; \
40   bigarray1[tmp]=x; \
41   bigarray2[tmp]=y; \
42   bigarray3[tmp]=z; \
43   bigarray4[tmp]=a; \
44   bigarray5[tmp]=b; \
45   bigindex1++; \
46 }
47 #else
48 #define LOGTIME(x,y,z,a,b)
49 #endif
50
51
52 long long myrdtsc(void)
53 {
54   unsigned hi, lo; 
55   __asm__ __volatile__ ("rdtsc" : "=a"(lo), "=d"(hi));
56   return ( (unsigned long long)lo)|( ((unsigned long long)hi)<<32 );
57 }
58
59 objstr_t *mainobjstore;
60 pthread_mutex_t mainobjstore_mutex;
61 pthread_mutex_t lockObjHeader;
62 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
63
64 sockPoolHashTable_t *transPResponseSocketPool;
65
66 /* This function initializes the main objects store and creates the
67  * global machine and location lookup table */
68
69 int dstmInit(void) {
70   mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
71   /* Initialize attribute for mutex */
72   pthread_mutexattr_init(&mainobjstore_mutex_attr);
73   pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
74   pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
75   pthread_mutex_init(&lockObjHeader,NULL);
76   if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
77     return 1;             //failure
78
79   if (lhashCreate(HASH_SIZE, LOADFACTOR))
80     return 1;             //failure
81
82   if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
83     return 1;             //failure
84
85   //Initialize socket pool
86   if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, DEFAULTSOCKPOOLSIZE)) == NULL) {
87     printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
88     return 0;
89   }
90
91   return 0;
92 }
93
94
95 int startlistening() {
96   int listenfd;
97   struct sockaddr_in my_addr;
98   socklen_t addrlength = sizeof(struct sockaddr);
99   int setsockflag=1;
100
101   listenfd = socket(AF_INET, SOCK_STREAM, 0);
102   if (listenfd == -1) {
103     perror("socket");
104     exit(1);
105   }
106
107   if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
108     perror("socket");
109     exit(1);
110   }
111 #ifdef MAC
112   if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
113     perror("socket");
114     exit(1);
115   }
116 #endif
117
118   my_addr.sin_family = AF_INET;
119   my_addr.sin_port = htons(LISTEN_PORT);
120   my_addr.sin_addr.s_addr = INADDR_ANY;
121   memset(&(my_addr.sin_zero), '\0', 8);
122
123   if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
124     perror("bind");
125     exit(1);
126   }
127
128   if (listen(listenfd, BACKLOG) == -1) {
129     perror("listen");
130     exit(1);
131   }
132   return listenfd;
133 }
134
135 /* This function starts the thread to listen on a socket
136  * for tranaction calls */
137 void *dstmListen(void *lfd) {
138   int listenfd=(int)lfd;
139   int acceptfd;
140   struct sockaddr_in client_addr;
141   socklen_t addrlength = sizeof(struct sockaddr);
142   pthread_t thread_dstm_accept;
143
144   printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
145   while(1) {
146     int retval;
147     int flag=1;
148     acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
149     setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
150     do {
151       retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
152     } while(retval!=0);
153     pthread_detach(thread_dstm_accept);
154   }
155 }
156 /* This function accepts a new connection request, decodes the control message in the connection
157  * and accordingly calls other functions to process new requests */
158 void *dstmAccept(void *acceptfd) {
159   int val, retval, size, sum, sockid;
160   unsigned int oid;
161   char *buffer;
162   char control,ctrl;
163   void *srcObj;
164   objheader_t *h;
165   trans_commit_data_t transinfo;
166   unsigned short objType, *versionarry, version;
167   unsigned int *oidarry, numoid, mid, threadid;
168   struct readstruct readbuffer;
169   readbuffer.head=0;
170   readbuffer.tail=0;
171   unsigned int numread=0, nummod=0;
172 #ifdef SANDBOX
173   objData_t odata;
174   char *ptr;
175 #endif
176
177   /* Receive control messages from other machines */
178   while(1) {
179     int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
180     if (ret==0)
181       break;
182     if (ret==-1) {
183       printf("DEBUG -> RECV Error!.. retrying\n");
184       break;
185     }
186     switch(control) {
187     case READ_REQUEST:
188       /* Read oid requested and search if available */
189       recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
190       while((srcObj = mhashSearch(oid)) == NULL) {
191         int ret;
192         if((ret = sched_yield()) != 0) {
193           printf("%s(): error no %d in thread yield\n", __func__, errno);
194         }
195       }
196       h = (objheader_t *) srcObj;
197       GETSIZE(size, h);
198       size += sizeof(objheader_t);
199       sockid = (int) acceptfd;
200       if (h == NULL) {
201         ctrl = OBJECT_NOT_FOUND;
202         send_data(sockid, &ctrl, sizeof(char));
203       } else {
204         // Type
205         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
206         *((int *)&msg[1])=size;
207         send_data(sockid, &msg, sizeof(msg));
208         send_data(sockid, h, size);
209       }
210       break;
211
212     case READ_MULT_REQUEST:
213       break;
214
215     case MOVE_REQUEST:
216       break;
217
218     case MOVE_MULT_REQUEST:
219       break;
220
221     case TRANS_REQUEST:
222       /* Read transaction request */
223       transinfo.objlocked = NULL;
224       transinfo.objnotfound = NULL;
225       transinfo.modptr = NULL;
226       transinfo.numlocked = 0;
227       transinfo.numnotfound = 0;
228       if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) {
229         printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
230         pthread_exit(NULL);
231       }
232       break;
233
234     case TRANS_PREFETCH:
235 #ifdef RANGEPREFETCH
236       if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) {
237         printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
238         break;
239       }
240 #else
241       LOGTIME('X',0,0,myrdtsc(),0);
242       if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
243         printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
244         break;
245       }
246 #endif
247       break;
248
249     case TRANS_PREFETCH_RESPONSE:
250 #ifdef RANGEPREFETCH
251       if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) {
252         printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
253         break;
254       }
255 #else
256       if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) {
257         printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
258         break;
259       }
260 #endif
261       break;
262
263     case START_REMOTE_THREAD:
264       recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
265       objType = getObjType(oid);
266       startDSMthread(oid, objType);
267       break;
268
269     case THREAD_NOTIFY_REQUEST:
270       recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int));
271       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
272       if((buffer = calloc(1,size)) == NULL) {
273         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
274         pthread_exit(NULL);
275       }
276
277       recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
278
279       oidarry = calloc(numoid, sizeof(unsigned int));
280       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
281       size = sizeof(unsigned int) * numoid;
282       versionarry = calloc(numoid, sizeof(unsigned short));
283       memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
284       size += sizeof(unsigned short) * numoid;
285       mid = *((unsigned int *)(buffer+size));
286       size += sizeof(unsigned int);
287       threadid = *((unsigned int *)(buffer+size));
288       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
289       free(buffer);
290       break;
291
292     case THREAD_NOTIFY_RESPONSE:
293       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
294       if((buffer = calloc(1,size)) == NULL) {
295         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
296         pthread_exit(NULL);
297       }
298
299       recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
300
301       oid = *((unsigned int *)buffer);
302       size = sizeof(unsigned int);
303       version = *((unsigned short *)(buffer+size));
304       size += sizeof(unsigned short);
305       threadid = *((unsigned int *)(buffer+size));
306       threadNotify(oid,version,threadid);
307       free(buffer);
308       break;
309
310 #ifdef SANDBOX
311     case CHECK_OBJECTS: // check if versions of objects match
312       size = sizeof(odata) - 1;
313       ptr = (char*)&odata;
314       recv_data_buf((int)acceptfd, &readbuffer, ptr+1, size);
315       numread = odata.numread;
316       nummod = odata.nummod;
317       checkObjVersion(&readbuffer, (int) acceptfd, numread, nummod);
318       break;
319 #endif
320
321     case CLOSE_CONNECTION:
322       goto closeconnection;
323
324     default:
325       printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
326     }
327   }
328
329 closeconnection:
330   /* Close connection */
331   if (close((int)acceptfd) == -1)
332     perror("close");
333   pthread_exit(NULL);
334 }
335
336 /* This function reads the information available in a transaction request
337  * and makes a function call to process the request */
338 int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) {
339   char *ptr;
340   void *modptr;
341   unsigned int *oidmod, oid;
342   fixed_data_t fixed;
343   objheader_t *headaddr;
344   int sum, i, size, n, val;
345
346   oidmod = NULL;
347
348   /* Read fixed_data_t data structure */
349   size = sizeof(fixed) - 1;
350   ptr = (char *)&fixed;
351   fixed.control = TRANS_REQUEST;
352   recv_data_buf((int)acceptfd, readbuffer, ptr+1, size);
353
354   /* Read list of mids */
355   int mcount = fixed.mcount;
356   size = mcount * sizeof(unsigned int);
357   unsigned int listmid[mcount];
358   ptr = (char *) listmid;
359   recv_data_buf((int)acceptfd, readbuffer, ptr, size);
360
361   /* Read oid and version tuples for those objects that are not modified in the transaction */
362   int numread = fixed.numread;
363   size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
364   char objread[size];
365   if(numread != 0) { //If pile contains more than one object to be read,
366     // keep reading all objects
367     recv_data_buf((int)acceptfd, readbuffer, objread, size);
368   }
369
370   /* Read modified objects */
371   if(fixed.nummod != 0) {
372     if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
373       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
374       return 1;
375     }
376     size = fixed.sum_bytes;
377     recv_data_buf((int)acceptfd, readbuffer, modptr, size);
378   }
379
380   /* Create an array of oids for modified objects */
381   oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
382   if (oidmod == NULL) {
383     printf("calloc error %s, %d\n", __FILE__, __LINE__);
384     return 1;
385   }
386   ptr = (char *) modptr;
387   for(i = 0 ; i < fixed.nummod; i++) {
388     int tmpsize;
389     headaddr = (objheader_t *) ptr;
390     oid = OID(headaddr);
391     oidmod[i] = oid;
392     GETSIZE(tmpsize, headaddr);
393     ptr += sizeof(objheader_t) + tmpsize;
394   }
395
396   /*Process the information read */
397   if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) {
398     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
399     /* Free resources */
400     if(oidmod != NULL) {
401       free(oidmod);
402     }
403     return 1;
404   }
405
406   /* Free resources */
407   if(oidmod != NULL) {
408     free(oidmod);
409   }
410
411   return 0;
412 }
413
414 /* This function processes the Coordinator's transaction request using "handleTransReq"
415  * function and sends a reply to the co-ordinator.
416  * Following this it also receives a new control message from the co-ordinator and processes this message*/
417 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
418                      unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) {
419
420   char control, sendctrl, retval;
421   objheader_t *tmp_header;
422   void *header;
423   int i = 0, val;
424
425   /* Send reply to the Coordinator */
426   if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
427     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
428     return 1;
429   }
430
431   recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char));
432   /* Process the new control message */
433   switch(control) {
434   case TRANS_ABORT:
435     if (fixed->nummod > 0)
436       free(modptr);
437     /* Unlock objects that was locked due to this transaction */
438     int useWriteUnlock = 0; //TODO verify is this piece of unlocking code ever used
439     for(i = 0; i< transinfo->numlocked; i++) {
440       if(transinfo->objlocked[i] == -1) {
441         useWriteUnlock = 1;
442         continue;
443       }
444       if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
445         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
446         return 1;
447       }
448       if(useWriteUnlock) {
449         write_unlock(STATUSPTR(header));
450       } else {
451         read_unlock(STATUSPTR(header));
452       }
453     }
454     break;
455
456   case TRANS_COMMIT:
457     /* Invoke the transCommit process() */
458     if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
459       printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
460       /* Free memory */
461       if (transinfo->objlocked != NULL) {
462         free(transinfo->objlocked);
463       }
464       if (transinfo->objnotfound != NULL) {
465         free(transinfo->objnotfound);
466       }
467       return 1;
468     }
469     break;
470
471   case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
472     break;
473
474   default:
475     printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
476     //TODO Use fixed.trans_id  TID since Client may have died
477     break;
478   }
479
480   /* Free memory */
481   if (transinfo->objlocked != NULL) {
482     free(transinfo->objlocked);
483   }
484   if (transinfo->objnotfound != NULL) {
485     free(transinfo->objnotfound);
486   }
487
488   return 0;
489 }
490
491 /* This function increments counters while running a voting decision on all objects involved
492  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
493 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
494   int val, i = 0, j;
495   unsigned short version;
496   char control = 0, *ptr;
497   unsigned int oid;
498   unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
499   objheader_t *headptr;
500
501   /* Counters and arrays to formulate decision on control message to be sent */
502   oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
503   oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
504   oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
505   int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
506   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
507   int numBytes = 0;
508   /* modptr points to the beginning of the object store
509    * created at the Pariticipant.
510    * Object store holds the modified objects involved in the transaction request */
511   ptr = (char *) modptr;
512
513   char retval;
514
515   /* Process each oid in the machine pile/ group per thread */
516   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
517     if (i < fixed->numread) { //Objs only read and not modified
518       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
519       incr *= i;
520       oid = *((unsigned int *)(objread + incr));
521       incr += sizeof(unsigned int);
522       version = *((unsigned short *)(objread + incr));
523       retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
524                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
525     } else {  //Objs modified
526       if(i == fixed->numread) {
527         oidlocked[objlocked++] = -1;
528       }
529       int tmpsize;
530       headptr = (objheader_t *) ptr;
531       oid = OID(headptr);
532       version = headptr->version;
533       GETSIZE(tmpsize, headptr);
534       ptr += sizeof(objheader_t) + tmpsize;
535       retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
536                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
537                               &numBytes, &control, oid, version);
538     }
539     if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
540       //unlock objects as soon versions mismatch or locks cannot be acquired)
541       if (objlocked > 0) {
542         int useWriteUnlock = 0;
543         for(j = 0; j < objlocked; j++) {
544           if(oidlocked[j] == -1) {
545             useWriteUnlock = 1;
546             continue;
547           }
548           if((headptr = mhashSearch(oidlocked[j])) == NULL) {
549             printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
550             return 0;
551           }
552           if(useWriteUnlock) {
553             write_unlock(STATUSPTR(headptr));
554           } else {
555             read_unlock(STATUSPTR(headptr));
556           }
557         }
558         if(v_nomatch > 0)
559           free(oidlocked);
560       }
561       objlocked=0;
562       break;
563     }
564   }
565   //go through rest of the objects for version mismatches
566   if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
567     i++;
568     procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes);
569   }
570
571   /* send TRANS_DISAGREE and objs*/
572   if(v_nomatch > 0) {
573 #ifdef CACHE
574     char *objs = calloc(1, numBytes);
575     int j, offset = 0;
576     for(j = 0; j<objvernotmatch; j++) {
577       objheader_t *header = mhashSearch(oidvernotmatch[j]);
578       int size = 0;
579       GETSIZE(size, header);
580       size += sizeof(objheader_t);
581       memcpy(objs+offset, header, size);
582       offset += size;
583     }
584 #endif
585     /*
586     if (objlocked > 0) {
587       int useWriteUnlock = 0;
588       for(j = 0; j < objlocked; j++) {
589         if(oidlocked[j] == -1) {
590           useWriteUnlock = 1;
591           continue;
592         }
593         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
594           printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
595           return 0;
596         }
597         if(useWriteUnlock) {
598           write_unlock(STATUSPTR(headptr));
599         } else {
600           read_unlock(STATUSPTR(headptr));
601         }
602       }
603       free(oidlocked);
604     }
605     */
606     control=TRANS_DISAGREE;
607     send_data(acceptfd, &control, sizeof(char));
608 #ifdef CACHE
609     send_data(acceptfd, &numBytes, sizeof(int));
610     send_data(acceptfd, objs, numBytes);
611     transinfo->objvernotmatch = oidvernotmatch;
612     transinfo->numvernotmatch = objvernotmatch;
613     free(objs);
614     free(transinfo->objvernotmatch);
615 #endif
616     return control;
617   }
618
619   /* Decide what control message to send to Coordinator */
620   if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
621                                    modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
622     printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
623     return 0;
624   }
625   return control;
626 }
627
628 /* Update Commit info for objects that are read */
629 char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
630                              unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
631                              int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
632                              char *control, unsigned int oid, unsigned short version) {
633   void *mobj;
634   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
635
636   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
637     /* Save the oids not found and number of oids not found for later use */
638     oidnotfound[*objnotfound] = oid;
639     (*objnotfound)++;
640     *control = TRANS_DISAGREE;
641   } else {     /* If Obj found in machine (i.e. has not moved) */
642     /* Check if Obj is locked by any previous transaction */
643     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
644       if (version == ((objheader_t *)mobj)->version) { /* match versions */
645         (*v_matchnolock)++;
646     *control = TRANS_AGREE;
647       } else { /* If versions don't match ...HARD ABORT */
648         (*v_nomatch)++;
649         oidvernotmatch[*objvernotmatch] = oid;
650         (*objvernotmatch)++;
651         int size;
652         GETSIZE(size, mobj);
653         size += sizeof(objheader_t);
654         *numBytes += size;
655         /* Send TRANS_DISAGREE to Coordinator */
656         *control = TRANS_DISAGREE;
657       }
658       //Keep track of oid locked
659       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
660     } else {  //we are locked
661       if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
662         (*v_matchlock)++;
663       *control=TRANS_SOFT_ABORT;
664       } else { /* If versions don't match ...HARD ABORT */
665         (*v_nomatch)++;
666         oidvernotmatch[*objvernotmatch] = oid;
667         (*objvernotmatch)++;
668         int size;
669         GETSIZE(size, mobj);
670         size += sizeof(objheader_t);
671         *numBytes += size;
672         *control = TRANS_DISAGREE;
673       }
674     }
675   }
676   return *control;
677 }
678
679 /* Update Commit info for objects that are read */
680 char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
681                               int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
682                               int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
683   void *mobj;
684   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
685   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
686     /* Save the oids not found and number of oids not found for later use */
687     oidnotfound[*objnotfound] = oid;
688     (*objnotfound)++;
689         *control = TRANS_DISAGREE;
690   } else {     /* If Obj found in machine (i.e. has not moved) */
691     /* Check if Obj is locked by any previous transaction */
692     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
693       if (version == ((objheader_t *)mobj)->version) { /* match versions */
694         (*v_matchnolock)++;
695       *control=TRANS_AGREE;
696       } else { /* If versions don't match ...HARD ABORT */
697         (*v_nomatch)++;
698         oidvernotmatch[(*objvernotmatch)++] = oid;
699         int size;
700         GETSIZE(size, mobj);
701         size += sizeof(objheader_t);
702         *numBytes += size;
703         /* Send TRANS_DISAGREE to Coordinator */
704         *control = TRANS_DISAGREE;
705       }
706       //Keep track of oid locked
707       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
708     } else { /* Some other transaction has aquired a write lock on this object */
709       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
710         (*v_matchlock)++;
711        *control=TRANS_SOFT_ABORT;
712       } else { /* If versions don't match ...HARD ABORT */
713         (*v_nomatch)++;
714         oidvernotmatch[*objvernotmatch] = oid;
715         (*objvernotmatch)++;
716         int size;
717         GETSIZE(size, mobj);
718         size += sizeof(objheader_t);
719         *numBytes += size;
720         *control = TRANS_DISAGREE;
721       }
722     }
723   }
724   return *control;
725 }
726
727 void procRestObjs(char *objread, 
728                   char *objmod, 
729                   int index, 
730                   int numread, 
731                   int nummod, 
732                   unsigned int *oidnotfound, 
733                   unsigned int *oidvernotmatch,
734                   int *objnotfound, 
735                   int *objvernotmatch, 
736                   int *v_nomatch, 
737                   int *numBytes) {
738   int i;
739   unsigned int oid;
740   unsigned short version;
741
742   /* Process each oid in the machine pile/ group per thread */
743   for (i = index; i < numread+nummod; i++) {
744     if (i < numread) { //Objs only read and not modified
745       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
746       incr *= i;
747       oid = *((unsigned int *)(objread + incr));
748       incr += sizeof(unsigned int);
749       version = *((unsigned short *)(objread + incr));
750     } else {  //Objs modified
751       objheader_t *headptr;
752       headptr = (objheader_t *) objmod;
753       oid = OID(headptr);
754       version = headptr->version;
755       int tmpsize;
756       GETSIZE(tmpsize, headptr);
757       objmod += sizeof(objheader_t) + tmpsize;
758     }
759     processVerNoMatch(oidnotfound,
760         oidvernotmatch,
761         objnotfound,
762         objvernotmatch,
763         v_nomatch,
764         numBytes,
765         oid, 
766         version);
767   }
768   return;
769 }
770
771 void processVerNoMatch(unsigned int *oidnotfound, 
772                       unsigned int *oidvernotmatch, 
773                       int *objnotfound, 
774                       int *objvernotmatch, 
775                       int *v_nomatch, 
776                       int *numBytes,
777                       unsigned int oid, 
778                       unsigned short version) {
779   void *mobj;
780   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
781
782   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
783     /* Save the oids not found and number of oids not found for later use */
784     oidnotfound[*objnotfound] = oid;
785     (*objnotfound)++;
786   } else {     /* If Obj found in machine (i.e. has not moved) */
787     /* Check if Obj is locked by any previous transaction */
788     //if (!write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
789     if (version != ((objheader_t *)mobj)->version) { /* match versions */
790       (*v_nomatch)++;
791       oidvernotmatch[*objvernotmatch] = oid;
792           (*objvernotmatch)++;
793           int size;
794       GETSIZE(size, mobj);
795       size += sizeof(objheader_t);
796       *numBytes += size;
797     }
798   }
799 }
800
801   /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
802    * to send to Coordinator based on the votes of oids involved in the transaction */
803   char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
804       int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
805       unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
806     int val;
807     char control = 0;
808
809     /* Condition to send TRANS_AGREE */
810     if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
811       control = TRANS_AGREE;
812       /* Send control message */
813       send_data(acceptfd, &control, sizeof(char));
814     }
815     /* Condition to send TRANS_SOFT_ABORT */
816     if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
817     //if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
818       control = TRANS_SOFT_ABORT;
819
820       /* Send control message */
821       send_data(acceptfd, &control, sizeof(char));
822
823       /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
824       if(*(objnotfound) != 0) {
825         int msg[1];
826         msg[0] = *(objnotfound);
827         send_data(acceptfd, &msg, sizeof(int));
828         int size = sizeof(unsigned int)* *(objnotfound);
829         send_data(acceptfd, oidnotfound, size);
830       }
831     }
832
833     /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
834      * if Participant receives a TRANS_COMMIT */
835     transinfo->objlocked = oidlocked;
836     transinfo->objnotfound = oidnotfound;
837     transinfo->modptr = modptr;
838     transinfo->numlocked = *(objlocked);
839     transinfo->numnotfound = *(objnotfound);
840     return control;
841   }
842
843   /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
844    * addresses in lookup table and also changes version number
845    * Sends an ACK back to Coordinator */
846   int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
847     objheader_t *header;
848     objheader_t *newheader;
849     int i = 0, offset = 0;
850     char control;
851     int tmpsize;
852
853     /* Process each modified object saved in the mainobject store */
854     for(i = 0; i < nummod; i++) {
855       if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
856         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
857         return 1;
858       }
859       GETSIZE(tmpsize,header);
860
861       {
862         struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
863         struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
864         dst->type=src->type;
865         dst->___cachedCode___=src->___cachedCode___;
866         dst->___cachedHash___=src->___cachedHash___;
867         memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
868       }
869       header->version += 1;
870       /* If threads are waiting on this object to be updated, notify them */
871       if(header->notifylist != NULL) {
872         notifyAll(&header->notifylist, OID(header), header->version);
873       }
874       offset += sizeof(objheader_t) + tmpsize;
875     }
876
877   if (nummod > 0)
878     free(modptr);
879
880   /* Unlock locked objects */
881   int useWriteUnlock = 0;
882   for(i = 0; i < numlocked; i++) {
883     if(oidlocked[i] == -1) {
884       useWriteUnlock = 1;
885       continue;
886     }
887     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
888       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
889       return 1;
890     }
891
892     if(useWriteUnlock) {
893       write_unlock(STATUSPTR(header));
894     } else {
895       read_unlock(STATUSPTR(header));
896     }
897   }
898   //TODO Update location lookup table
899   return 0;
900 }
901
902 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
903  * Looks for the objects to be prefetched in the main object store.
904  * If objects are not found then record those and if objects are found
905  * then use offset values to prefetch references to other objects */
906 int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
907   int i, size, objsize, numoffset = 0, gid=0;
908   int length;
909   char *recvbuffer, control;
910   unsigned int oid, mid=-1;
911   objheader_t *header;
912   oidmidpair_t oidmid;
913   struct writestruct writebuffer;
914   int sd = -1;
915
916   while(1) {
917     recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
918     if(numoffset == -1)
919       break;
920     recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
921     oid = oidmid.oid;
922     if (mid != oidmid.mid) {
923       if (mid!=-1) {
924         forcesend_buf(sd, &writebuffer, NULL, 0);
925         freeSockWithLock(transPResponseSocketPool, mid, sd);
926       }
927       mid=oidmid.mid;
928       sd = getSockWithLock(transPResponseSocketPool, mid);
929       writebuffer.offset=0;
930     }
931     short offsetarry[numoffset];
932     recv_data_buf((int)acceptfd, readbuffer, &gid, sizeof(int));
933     recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
934     LOGTIME('A',oid ,0,myrdtsc(),gid); //after recv the entire prefetch request 
935
936     /*Process each oid */
937     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
938       /* Save the oids not found in buffer for later use */
939       size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
940       char sendbuffer[size+1];
941       sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
942       *((int *) (sendbuffer+sizeof(char))) = size;
943       *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
944       *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
945       *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
946       send_buf(sd, &writebuffer, sendbuffer, size+1);
947       LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
948     } else { /* Object Found */
949       int incr = 1;
950       GETSIZE(objsize, header);
951       size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
952       char sendbuffer[size+1];
953       sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
954       *((int *)(sendbuffer + incr)) = size;
955       incr += sizeof(int);
956       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
957       incr += sizeof(char);
958       *((unsigned int *)(sendbuffer+incr)) = oid;
959       incr += sizeof(unsigned int);
960       *((int *)(sendbuffer+incr)) = gid;
961       incr += sizeof(int);
962       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
963       send_buf(sd, &writebuffer, sendbuffer, size+1);
964       LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
965       LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
966
967       /* Calculate the oid corresponding to the offset value */
968       for(i = 0 ; i< numoffset ; i++) {
969         /* Check for arrays  */
970         if(TYPE(header) >= NUMCLASSES) {
971           int elementsize = classsize[TYPE(header)];
972           struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
973           unsigned short length = ao->___length___;
974           /* Check if array out of bounds */
975           if(offsetarry[i]< 0 || offsetarry[i] >= length) {
976             break;
977           }
978           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
979         } else {
980           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
981         }
982
983         /* Don't continue if we hit a NULL pointer */
984         if (oid==0)
985           break;
986
987     LOGTIME('B',oid,0,myrdtsc(),gid); //send next oid found from prefetch request
988
989         if((header = mhashSearch(oid)) == NULL) {
990           size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
991           char sendbuffer[size+1];
992           sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
993           *((int *) (sendbuffer+1)) = size;
994           *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
995           *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
996       *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
997
998           send_buf(sd, &writebuffer, sendbuffer, size+1);
999       LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
1000           break;
1001         } else { /* Obj Found */
1002           int incr = 1;
1003           GETSIZE(objsize, header);
1004           size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1005           char sendbuffer[size+1];
1006           sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
1007           *((int *)(sendbuffer + incr)) = size;
1008           incr += sizeof(int);
1009           *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1010           incr += sizeof(char);
1011           *((unsigned int *)(sendbuffer+incr)) = oid;
1012           incr += sizeof(unsigned int);
1013       *((int *)(sendbuffer+incr)) = gid;
1014       incr += sizeof(int);
1015           memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1016           send_buf(sd, &writebuffer, sendbuffer, size+1);
1017       LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
1018       LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
1019         }
1020       } //end of for
1021     }
1022   } //end of while
1023
1024     //Release socket
1025   if (mid!=-1) {
1026     forcesend_buf(sd, &writebuffer, NULL, 0);
1027     freeSockWithLock(transPResponseSocketPool, mid, sd);
1028   }
1029   return 0;
1030 }
1031
1032 void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
1033   send_data(sd, control, sizeof(char));
1034   /* Send the buffer with its size */
1035   int length = *(size);
1036   send_data(sd, sendbuffer, length);
1037 }
1038
1039 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
1040   objheader_t *header;
1041   unsigned int oid;
1042   unsigned short newversion;
1043   char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
1044   int sd;
1045   struct sockaddr_in remoteAddr;
1046   int bytesSent;
1047   int size;
1048   int i = 0;
1049
1050   while(i < numoid) {
1051     oid = *(oidarry + i);
1052     if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1053       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1054       return;
1055     } else {
1056       /* Check to see if versions are same */
1057 checkversion:
1058       if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
1059         newversion = header->version;
1060         if(newversion == *(versionarry + i)) {
1061           //Add to the notify list
1062           if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
1063             printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
1064             return;
1065           }
1066           write_unlock(STATUSPTR(header));
1067         } else {
1068           write_unlock(STATUSPTR(header));
1069           if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1070             perror("processReqNotify():socket()");
1071             return;
1072           }
1073           bzero(&remoteAddr, sizeof(remoteAddr));
1074           remoteAddr.sin_family = AF_INET;
1075           remoteAddr.sin_port = htons(LISTEN_PORT);
1076           remoteAddr.sin_addr.s_addr = htonl(mid);
1077
1078           if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1079             printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
1080                    inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1081             close(sd);
1082             return;
1083           } else {
1084       
1085             //Send Update notification
1086             msg[0] = THREAD_NOTIFY_RESPONSE;
1087             *((unsigned int *)&msg[1]) = oid;
1088             size = sizeof(unsigned int);
1089             *((unsigned short *)(&msg[1]+size)) = newversion;
1090             size += sizeof(unsigned short);
1091             *((unsigned int *)(&msg[1]+size)) = threadid;
1092             size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
1093             send_data(sd, msg, size);
1094           }
1095           close(sd);
1096         }
1097       } else {
1098         randomdelay();
1099         goto checkversion;
1100       }
1101     }
1102     i++;
1103   }
1104   free(oidarry);
1105   free(versionarry);
1106 }