a)Fix prefetch analysis bugs
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for committing a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <pthread.h>
8 #include <netdb.h>
9 #include <fcntl.h>
10 #include <errno.h>
11 #include <string.h>
12 #include "dstm.h"
13 #include "mlookup.h"
14 #include "llookup.h"
15 #include "threadnotify.h"
16 #ifdef COMPILER
17 #include "thread.h"
18 #endif
19
20
21 #define LISTEN_PORT 2156 // TCP port that dstmListen() binds and listens on
22 #define BACKLOG 10 // maximum number of pending connections, passed to listen()
23 #define RECEIVE_BUFFER_SIZE 2048 // NOTE(review): not referenced in the visible part of this file
24
25 extern int classsize[]; // defined in another translation unit
26
27 objstr_t *mainobjstore; // main object store, created by dstmInit()
28 pthread_mutex_t mainobjstore_mutex; // held by transCommitProcess() while it copies committed data into stored objects
29 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute used to make mainobjstore_mutex a recursive lock */
30
31 /* This function initializes the main objects store and creates the 
32  * global machine and location lookup table */
33
34 int dstmInit(void)
35 {
36         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
37         /* Initialize attribute for mutex */
38         pthread_mutexattr_init(&mainobjstore_mutex_attr);
39         pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
40         pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
41         if (mhashCreate(HASH_SIZE, LOADFACTOR))
42                 return 1; //failure
43         
44         if (lhashCreate(HASH_SIZE, LOADFACTOR))
45                 return 1; //failure
46
47         if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
48                 return 1; //failure
49         
50         return 0;
51 }
52
53 /* This function starts the thread to listen on a socket 
54  * for tranaction calls */
55 void *dstmListen()
56 {
57         int listenfd, acceptfd;
58         struct sockaddr_in my_addr;
59         struct sockaddr_in client_addr;
60         socklen_t addrlength = sizeof(struct sockaddr);
61         pthread_t thread_dstm_accept;
62         int i;
63         int setsockflag=1;
64
65         listenfd = socket(AF_INET, SOCK_STREAM, 0);
66         if (listenfd == -1)
67         {
68                 perror("socket");
69                 exit(1);
70         }
71
72         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
73           perror("socket");
74           exit(1);
75         }
76 #ifdef MAC
77         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
78           perror("socket");
79           exit(1);
80         }
81 #endif
82
83         my_addr.sin_family = AF_INET;
84         my_addr.sin_port = htons(LISTEN_PORT);
85         my_addr.sin_addr.s_addr = INADDR_ANY;
86         memset(&(my_addr.sin_zero), '\0', 8);
87
88         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
89         {
90                 perror("bind");
91                 exit(1);
92         }
93         
94         if (listen(listenfd, BACKLOG) == -1)
95         {
96                 perror("listen");
97                 exit(1);
98         }
99
100         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
101         while(1)
102         {
103           int retval;
104           acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
105           do {
106             retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
107           } while(retval!=0);
108           pthread_detach(thread_dstm_accept);
109         }
110 }
111 /* This function accepts a new connection request, decodes the control message in the connection 
112  * and accordingly calls other functions to process new requests */
113 void *dstmAccept(void *acceptfd)
114 {
115         int val, retval, size, sum;
116         unsigned int oid;
117         char *buffer;
118         char control,ctrl;
119         char *ptr;
120         void *srcObj;
121         objheader_t *h;
122         trans_commit_data_t transinfo;
123         unsigned short objType, *versionarry, version;
124         unsigned int *oidarry, numoid, mid, threadid;
125         
126         transinfo.objlocked = NULL;
127         transinfo.objnotfound = NULL;
128         transinfo.modptr = NULL;
129         transinfo.numlocked = 0;
130         transinfo.numnotfound = 0;
131
132         /* Receive control messages from other machines */
133         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
134                 printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
135                 pthread_exit(NULL);
136         }
137         
138         switch(control) {
139                 case READ_REQUEST:
140                         /* Read oid requested and search if available */
141                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
142                                 perror("Error: receiving 0x0 object from cooridnator\n");
143                                 pthread_exit(NULL);
144                         }
145                         if((srcObj = mhashSearch(oid)) == NULL) {
146                                 printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
147                                 pthread_exit(NULL);
148                         }
149                         h = (objheader_t *) srcObj;
150                         GETSIZE(size, h);
151                         size += sizeof(objheader_t);
152
153                         if (h == NULL) {
154                                 ctrl = OBJECT_NOT_FOUND;
155                                 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
156                                         perror("Error sending control msg to coordinator\n");
157                                         pthread_exit(NULL);
158                                 }
159                         } else {
160                                 /* Type */
161                                 char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
162                                 *((int *)&msg[1])=size;
163                                 if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
164                                         perror("Error sending size of object to coordinator\n");
165                                         pthread_exit(NULL);
166                                 }
167                                 if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
168                                         perror("Error in sending object\n");
169                                         pthread_exit(NULL);
170                                 }
171                         }
172                         break;
173                 
174                 case READ_MULT_REQUEST:
175                         break;
176         
177                 case MOVE_REQUEST:
178                         break;
179
180                 case MOVE_MULT_REQUEST:
181                         break;
182
183                 case TRANS_REQUEST:
184                         /* Read transaction request */
185                         if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
186                                 printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
187                                 pthread_exit(NULL);
188                         }
189                         break;
190                 case TRANS_PREFETCH:
191                         do {
192                                 if((val = prefetchReq((int)acceptfd)) != 0) {
193                                         printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
194                                         pthread_exit(NULL);
195                                 }
196
197                                 if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) < 0) {
198                                         printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
199                                         pthread_exit(NULL);
200                                 } else if(retval == 0) {
201                                         printf("%s() Error: socket closed at the requesting side\n");
202                                         pthread_exit(NULL);
203                                 }
204
205                         } while (control == TRANS_PREFETCH);
206
207                         break;
208                 case TRANS_PREFETCH_RESPONSE:
209                         if((val = getPrefetchResponse((int) acceptfd)) != 0) {
210                                 printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
211                                 pthread_exit(NULL);
212                         }
213                         break;
214                 case START_REMOTE_THREAD:
215                         retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
216                         if (retval <= 0)
217                                 perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
218                         else if (retval != sizeof(unsigned int))
219                                 printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n",
220                                         retval, __FILE__, __LINE__);
221                         else
222                         {
223                                 objType = getObjType(oid);
224                                 startDSMthread(oid, objType);
225                         }
226                         break;
227
228                 case THREAD_NOTIFY_REQUEST:
229                         retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0);
230                         size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
231                         if((buffer = calloc(1,size)) == NULL) {
232                                 printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
233                                 pthread_exit(NULL);
234                         }
235                         sum = 0;
236                         do {
237                                 sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
238                         } while(sum < size);
239
240                         oidarry = calloc(numoid, sizeof(unsigned int)); 
241                         memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
242                         size = sizeof(unsigned int) * numoid;
243                         versionarry = calloc(numoid, sizeof(unsigned short));
244                         memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
245                         size += sizeof(unsigned short) * numoid;
246                         mid = *((unsigned int *)(buffer+size));
247                         size += sizeof(unsigned int);
248                         threadid = *((unsigned int *)(buffer+size));
249                         processReqNotify(numoid, oidarry, versionarry, mid, threadid);
250                         free(buffer);
251
252                         break;
253
254                 case THREAD_NOTIFY_RESPONSE:
255                         size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
256                         if((buffer = calloc(1,size)) == NULL) {
257                                 printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
258                                 pthread_exit(NULL);
259                         }
260
261                         sum = 0;
262                         do {
263                                 sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
264                         } while(sum < size);
265
266                         oid = *((unsigned int *)buffer);
267                         size = sizeof(unsigned int);
268                         version = *((unsigned short *)(buffer+size));
269                         size += sizeof(unsigned short);
270                         threadid = *((unsigned int *)(buffer+size));
271                         threadNotify(oid,version,threadid);
272                         free(buffer);
273
274                         break;
275                 default:
276                         printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
277         }
278
279         /* Close connection */
280         if (close((int)acceptfd) == -1)
281                 perror("close");
282         pthread_exit(NULL);
283 }
284
285 /* This function reads the information available in a transaction request
286  * and makes a function call to process the request */
287 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
288         char *ptr;
289         void *modptr;
290         unsigned int *oidmod, oid;
291         fixed_data_t fixed;
292         objheader_t *headaddr;
293         int sum = 0, i, N, n, val;
294
295         oidmod = NULL;
296
297         /* Read fixed_data_t data structure */ 
298         N = sizeof(fixed) - 1;
299         ptr = (char *)&fixed;;
300         fixed.control = TRANS_REQUEST;
301         do {
302                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
303                 sum += n;
304         } while(sum < N && n != 0); 
305
306         /* Read list of mids */
307         int mcount = fixed.mcount;
308         N = mcount * sizeof(unsigned int);
309         unsigned int listmid[mcount];
310         ptr = (char *) listmid;
311         sum = 0;
312         do {
313                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
314                 sum += n;
315         } while(sum < N && n != 0);
316
317         /* Read oid and version tuples for those objects that are not modified in the transaction */
318         int numread = fixed.numread;
319         N = numread * (sizeof(unsigned int) + sizeof(unsigned short));
320         char objread[N];
321         if(numread != 0) { //If pile contains more than one object to be read, 
322                           // keep reading all objects
323                 sum = 0;
324                 do {
325                         n = recv((int)acceptfd, (void *) objread+sum, N-sum, 0);
326                         sum += n;
327                 } while(sum < N && n != 0);
328         }
329         
330         /* Read modified objects */
331         if(fixed.nummod != 0) {
332                 if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
333                         printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
334                         return 1;
335                 }
336                 sum = 0;
337                 do { // Recv the objs that are modified by the Coordinator
338                         n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
339                         sum += n;
340                 } while (sum < fixed.sum_bytes && n != 0);
341         }
342
343         /* Create an array of oids for modified objects */
344         oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
345         if (oidmod == NULL)
346         {
347                 printf("calloc error %s, %d\n", __FILE__, __LINE__);
348                 return 1;
349         }
350         ptr = (char *) modptr;
351         for(i = 0 ; i < fixed.nummod; i++) {
352           int tmpsize;
353           headaddr = (objheader_t *) ptr;
354           oid = OID(headaddr);
355           oidmod[i] = oid;
356           GETSIZE(tmpsize, headaddr);
357           ptr += sizeof(objheader_t) + tmpsize;
358         }
359         
360         /*Process the information read */
361         if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
362                 printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
363                 /* Free resources */
364                 if(oidmod != NULL) {
365                         free(oidmod);
366                 }
367                 return 1;
368         }
369
370         /* Free resources */
371         if(oidmod != NULL) {
372                 free(oidmod);
373         }
374
375         return 0;
376 }
377
378 /* This function processes the Coordinator's transaction request using "handleTransReq" 
379  * function and sends a reply to the co-ordinator.
380  * Following this it also receives a new control message from the co-ordinator and processes this message*/
381 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
382                 unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
383         char control, sendctrl, retval;
384         objheader_t *tmp_header;
385         void *header;
386         int  i = 0, val;
387
388         /* Send reply to the Coordinator */
389         if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
390                 printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
391                 return 1;
392         }
393
394         do {
395                 retval = recv((int)acceptfd, &control, sizeof(char), 0);
396         } while(retval < sizeof(char));
397
398         /* Process the new control message */
399         switch(control) {
400                 case TRANS_ABORT:
401                         if (fixed->nummod > 0)
402                                 free(modptr);
403                         /* Unlock objects that was locked due to this transaction */
404                         for(i = 0; i< transinfo->numlocked; i++) {
405                                 if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
406                                         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
407                                         return 1;
408                                 }
409                                 STATUS(((objheader_t *)header)) &= ~(LOCK);             
410                         }
411
412                         /* Send ack to Coordinator */
413                         sendctrl = TRANS_UNSUCESSFUL;
414                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
415                                 perror("Error: In sending ACK to coordinator\n");
416                                 if (transinfo->objlocked != NULL) {
417                                         free(transinfo->objlocked);
418                                 }
419                                 if (transinfo->objnotfound != NULL) {
420                                         free(transinfo->objnotfound);
421                                 }
422
423                                 return 1;
424                         }
425                         break;
426
427                 case TRANS_COMMIT:
428                         /* Invoke the transCommit process() */
429                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
430                                 printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
431                                 /* Free memory */
432                                 if (transinfo->objlocked != NULL) {
433                                         free(transinfo->objlocked);
434                                 }
435                                 if (transinfo->objnotfound != NULL) {
436                                         free(transinfo->objnotfound);
437                                 }
438                                 return 1;
439                         }
440                         break;
441
442                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
443                         break;
444                 default:
445                         printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
446                         //TODO Use fixed.trans_id  TID since Client may have died
447                         break;
448         }
449
450         /* Free memory */
451         if (transinfo->objlocked != NULL) {
452                 free(transinfo->objlocked);
453         }
454         if (transinfo->objnotfound != NULL) {
455                 free(transinfo->objnotfound);
456         }
457
458         return 0;
459 }
460
461 /* This function increments counters while running a voting decision on all objects involved 
462  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
463 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
464         int val, i = 0, j;
465         unsigned short version;
466         char control = 0, *ptr;
467         unsigned int oid;
468         unsigned int *oidnotfound, *oidlocked;
469         void *mobj;
470         objheader_t *headptr;
471
472         /* Counters and arrays to formulate decision on control message to be sent */
473         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
474         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
475         int objnotfound = 0, objlocked = 0;
476         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
477
478         /* modptr points to the beginning of the object store 
479          * created at the Pariticipant. 
480          * Object store holds the modified objects involved in the transaction request */ 
481         ptr = (char *) modptr;
482         
483         /* Process each oid in the machine pile/ group per thread */
484         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
485                 if (i < fixed->numread) {//Objs only read and not modified
486                         int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
487                         incr *= i;
488                         oid = *((unsigned int *)(objread + incr));
489                         incr += sizeof(unsigned int);
490                         version = *((unsigned short *)(objread + incr));
491                 } else {//Objs modified
492                   int tmpsize;
493                   headptr = (objheader_t *) ptr;
494                   oid = OID(headptr);
495                   version = headptr->version;
496                   GETSIZE(tmpsize, headptr);
497                   ptr += sizeof(objheader_t) + tmpsize;
498                 }
499                 
500                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
501
502                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
503                         /* Save the oids not found and number of oids not found for later use */
504                         oidnotfound[objnotfound] = oid;
505                         objnotfound++;
506                 } else { /* If Obj found in machine (i.e. has not moved) */
507                         /* Check if Obj is locked by any previous transaction */
508                         if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {           
509                                 if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
510                                         v_matchlock++;
511                                 } else {/* If versions don't match ...HARD ABORT */
512                                         v_nomatch++;
513                                         /* Send TRANS_DISAGREE to Coordinator */
514                                         control = TRANS_DISAGREE;
515                                         if (objlocked > 0) {
516                                           for(j = 0; j < objlocked; j++) {
517                                                         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
518                                                                 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
519                                                                 return 0;
520                                                         }
521                                                         STATUS(headptr) &= ~(LOCK);
522                                                 }
523                                                 free(oidlocked);
524                                         }
525                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
526                                                 perror("Error in sending control to the Coordinator\n");
527                                                 return 0;
528                                         }
529                                         return control;
530                                 }
531                         } else {/* If Obj is not locked then lock object */
532                                 STATUS(((objheader_t *)mobj)) |= LOCK;
533                                 /* Save all object oids that are locked on this machine during this transaction request call */
534                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
535                                 objlocked++;
536                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
537                                         v_matchnolock++;
538                                 } else { /* If versions don't match ...HARD ABORT */
539                                         v_nomatch++;
540                                         control = TRANS_DISAGREE;
541                                         if (objlocked > 0) {
542                                                 for(j = 0; j < objlocked; j++) {
543                                                         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
544                                                                 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
545                                                                 return 0;
546                                                         }
547                                                         STATUS(headptr) &= ~(LOCK);
548                                                 }
549                                                 free(oidlocked);
550                                         }
551
552                                         /* Send TRANS_DISAGREE to Coordinator */
553                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
554                                                 perror("Error in sending control to the Coordinator\n");
555                                                 return 0;
556                                         }
557                                         
558                                         return control;
559                                 }
560                         }
561                 }
562         }
563         
564         /* Decide what control message to send to Coordinator */
565         if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
566                                         modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
567                 printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
568                 return 0;
569         }
570         
571         return control;
572
573 }
574 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
575  * to send to Coordinator based on the votes of oids involved in the transaction */
576 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
577                 int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
578                 unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
579         int val;
580         char control = 0;
581
582         /* Condition to send TRANS_AGREE */
583         if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
584                 control = TRANS_AGREE;
585                 /* Send control message */
586                 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
587                         perror("Error in sending control to Coordinator\n");
588                         return 0;
589                 }
590         }
591         /* Condition to send TRANS_SOFT_ABORT */
592         if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
593                 control = TRANS_SOFT_ABORT;
594
595                 /* Send control message */
596                 if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
597                         perror("Error in sending TRANS_SOFT_ABORT control\n");
598                         return 0;
599                 }
600
601                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
602                 if(*(objnotfound) != 0) { 
603                         int msg[1];
604                         msg[0] = *(objnotfound);
605                         if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
606                                 perror("Error in sending objects that are not found\n");
607                                 return 0;
608                         }
609                         int size = sizeof(unsigned int)* *(objnotfound);
610                         if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
611                                 perror("Error in sending objects that are not found\n");
612                                 return 0;
613                         }
614                 }
615         }
616
617         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
618          * if Participant receives a TRANS_COMMIT */
619         transinfo->objlocked = oidlocked;
620         transinfo->objnotfound = oidnotfound;
621         transinfo->modptr = modptr;
622         transinfo->numlocked = *(objlocked);
623         transinfo->numnotfound = *(objnotfound);
624         
625         return control;
626 }
627
628 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer 
629  * addresses in lookup table and also changes version number
630  * Sends an ACK back to Coordinator */
631 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
632         objheader_t *header;
633         objheader_t *newheader;
634         int i = 0, offset = 0;
635         char control;
636         int tmpsize;
637
638         /* Process each modified object saved in the mainobject store */
639         for(i = 0; i < nummod; i++) {
640                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
641                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
642                         return 1;
643                 }
644                 GETSIZE(tmpsize,header);
645                 pthread_mutex_lock(&mainobjstore_mutex);
646                 memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
647                 header->version += 1; 
648                 /* If threads are waiting on this object to be updated, notify them */
649                 if(header->notifylist != NULL) {
650                         notifyAll(&header->notifylist, OID(header), header->version);
651                 }
652                 pthread_mutex_unlock(&mainobjstore_mutex);
653                 offset += sizeof(objheader_t) + tmpsize;
654         }
655
656         if (nummod > 0)
657                 free(modptr);
658
659         /* Unlock locked objects */
660         for(i = 0; i < numlocked; i++) {
661                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
662                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
663                         return 1;
664                 }
665                 STATUS(header) &= ~(LOCK);
666         }
667         //TODO Update location lookup table
668
669         /* Send ack to coordinator */
670         control = TRANS_SUCESSFUL;
671         if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
672                 perror("Error sending ACK to coordinator\n");
673                 return 1;
674         }
675         
676         return 0;
677 }
678
679 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
680  * Looks for the objects to be prefetched in the main object store.
681  * If objects are not found then record those and if objects are found
682  * then use offset values to prefetch references to other objects */
683
684 int prefetchReq(int acceptfd) {
685         int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
686         int length, sd;
687         char *recvbuffer, *sendbuffer, control;
688         unsigned int oid, mid;
689         short *offsetarry;
690         objheader_t *header;
691         struct sockaddr_in remoteAddr;
692
693         while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) {
694                 if(length == -1) { //-1 is special character to represent end of sending oids and offsets
695                         break;
696                 } else {
697                         numbytes = 0;
698                         size = length - sizeof(int);
699                         if((recvbuffer = calloc(1, size)) == NULL) {
700                                 printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
701                                 return -1;
702                         }
703                         while(numbytes < size) {
704                                 numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0);
705                         }
706                         
707                         oid = *((unsigned int *) recvbuffer);
708                         mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
709                         size = size - (2 * sizeof(unsigned int));
710                         numoffset = size / sizeof(short);
711                         if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) {
712                                 printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
713                                 free(recvbuffer);
714                                 return -1;
715                         }
716                         memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
717                         free(recvbuffer);
718
719                         /* Create socket to send information */
720                         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
721                                 perror("prefetchReq():socket()");
722                                 return -1;
723                         }
724                         bzero(&remoteAddr, sizeof(remoteAddr));
725                         remoteAddr.sin_family = AF_INET;
726                         remoteAddr.sin_port = htons(LISTEN_PORT);
727                         remoteAddr.sin_addr.s_addr = htonl(mid);
728
729                         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
730                                 printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
731                                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
732                                 close(sd);
733                                 return -1;
734                         }
735
736                         /*Process each oid */
737                         if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
738                                 /* Save the oids not found in buffer for later use */
739                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
740                                 if((sendbuffer = calloc(1, size)) == NULL) {
741                                         printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
742                                         free(offsetarry);
743                                         close(sd);
744                                         return -1;
745                                 }
746                                 *((int *) sendbuffer) = size;
747                                 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
748                                 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
749
750                                 control = TRANS_PREFETCH_RESPONSE;
751                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
752                                         free(offsetarry);
753                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
754                                                         __func__, __FILE__, __LINE__);
755                                         close(sd);
756                                         return -1;
757                                 }
758                         } else { /* Object Found */
759                                 int incr = 0;
760                                 GETSIZE(objsize, header);
761                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
762                                 if((sendbuffer = calloc(1, size)) == NULL) {
763                                         printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
764                                         free(offsetarry);
765                                         close(sd);
766                                         return -1;
767                                 }
768                                 *((int *) (sendbuffer + incr)) = size;
769                                 incr += sizeof(int);
770                                 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
771                                 incr += sizeof(char);
772                                 *((unsigned int *)(sendbuffer+incr)) = oid;
773                                 incr += sizeof(unsigned int);
774                                 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
775
776                                 control = TRANS_PREFETCH_RESPONSE;
777                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
778                                         free(offsetarry);
779                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
780                                                         __func__, __FILE__, __LINE__);
781                                         close(sd);
782                                         return -1;
783                                 }
784                                 /* Calculate the oid corresponding to the offset value */
785                                 for(i = 0 ; i< numoffset ; i++) {
786                                         /* Check for arrays  */
787                                         if(TYPE(header) > NUMCLASSES) {
788                                                 isArray = 1;
789                                         }
790                                         if(isArray == 1) {
791                                                 int elementsize = classsize[TYPE(header)];
792                                                 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
793                                                 unsigned short length = ao->___length___;
794                                                 /* Check if array out of bounds */
795                                                 if(offsetarry[i]< 0 || offsetarry[i] >= length) {
796                                                         break;
797                                                 }
798                                                 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
799                                         } else {
800                                                 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
801                                         }
802
803                                         if((header = mhashSearch(oid)) == NULL) {
804                                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
805                                                 if((sendbuffer = calloc(1, size)) == NULL) {
806                                                         printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
807                                                         free(offsetarry);
808                                                         close(sd);
809                                                         return -1;
810                                                 }
811                                                 *((int *) sendbuffer) = size;
812                                                 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
813                                                 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
814
815                                                 control = TRANS_PREFETCH_RESPONSE;
816                                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
817                                                         free(offsetarry);
818                                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
819                                                                         __FILE__, __LINE__);
820                                                         close(sd);
821                                                         return -1;
822                                                 }
823                                                 break;
824                                         } else {/* Obj Found */
825                                                 int incr = 0;
826                                                 GETSIZE(objsize, header);
827                                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
828                                                 if((sendbuffer = calloc(1, size)) == NULL) {
829                                                         printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
830                                                         free(offsetarry);
831                                                         close(sd);
832                                                         return -1;
833                                                 }
834                                                 *((int *) (sendbuffer + incr)) = size;
835                                                 incr += sizeof(int);
836                                                 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
837                                                 incr += sizeof(char);
838                                                 *((unsigned int *)(sendbuffer+incr)) = oid;
839                                                 incr += sizeof(unsigned int);
840                                                 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
841
842                                                 control = TRANS_PREFETCH_RESPONSE;
843                                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
844                                                         free(offsetarry);
845                                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
846                                                                         __func__, __FILE__, __LINE__);
847                                                         close(sd);
848                                                         return -1;
849                                                 }
850                                         }
851                                         isArray = 0;
852                                 }
853                                 free(offsetarry);
854                         }
855                 }
856         }
857         close(sd);
858         return 0;
859 }
860
/* Sends one prefetch response on sd: first the single byte at control, then
 * *size bytes of sendbuffer.
 *
 * sendbuffer is always freed before returning, on success and on failure,
 * so the caller must not use or free it afterwards.
 *
 * Returns 0 when everything was sent and -1 otherwise. */
int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
	int rc = -1;
	int sent = 0;
	ssize_t n;

	/* send() returns a signed count: compare it as such so that -1 is
	 * recognised as a failure instead of being converted to unsigned. */
	do {
		n = send(sd, control, sizeof(char), MSG_NOSIGNAL);
	} while (n < 0 && errno == EINTR);
	if (n != (ssize_t)sizeof(char)) {
		printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__);
		goto out;
	}

	/* Send the buffer with its size; a short send is continued, not fatal */
	while (sent < *(size)) {
		n = send(sd, sendbuffer + sent, *(size) - sent, MSG_NOSIGNAL);
		if (n < 0 && errno == EINTR)
			continue;
		if (n <= 0) {
			printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n",
					__func__, __FILE__, __LINE__, sent, *(size));
			goto out;
		}
		sent += (int)n;
	}
	rc = 0;

out:
	free(sendbuffer);
	return rc;
}
881
882 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
883         objheader_t *header;
884         unsigned int oid;
885         unsigned short newversion;
886         char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
887         int sd;
888         struct sockaddr_in remoteAddr;
889         int bytesSent;
890         int size;
891
892         int i = 0;
893         while(i < numoid) {
894                 oid = *(oidarry + i);
895                 if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
896                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
897                         return;
898                 } else {
899                         /* Check to see if versions are same */
900 checkversion:
901                         if ((STATUS(header) & LOCK) != LOCK) {          
902                                 //FIXME make locking atomic
903                                 STATUS(header) |= LOCK;
904                                 newversion = header->version;
905                                 if(newversion == *(versionarry + i)) {
906                                         //Add to the notify list 
907                                         if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
908                                                 printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); 
909                                                 return;
910                                         }
911                                         STATUS(header) &= ~(LOCK);              
912                                 } else {
913                                         STATUS(header) &= ~(LOCK);              
914                                         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
915                                                 perror("processReqNotify():socket()");
916                                                 return;
917                                         }
918                                         bzero(&remoteAddr, sizeof(remoteAddr));
919                                         remoteAddr.sin_family = AF_INET;
920                                         remoteAddr.sin_port = htons(LISTEN_PORT);
921                                         remoteAddr.sin_addr.s_addr = htonl(mid);
922
923                                         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
924                                                 printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
925                                                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
926                                                 close(sd);
927                                                 return;
928                                         } else {
929                                                 //Send Update notification
930                                                 msg[0] = THREAD_NOTIFY_RESPONSE;
931                                                 *((unsigned int *)&msg[1]) = oid;
932                                                 size = sizeof(unsigned int);
933                                                 *((unsigned short *)(&msg[1]+size)) = newversion;
934                                                 size += sizeof(unsigned short);
935                                                 *((unsigned int *)(&msg[1]+size)) = threadid;
936                                                 bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
937                                                 if (bytesSent < 0){
938                                                         perror("processReqNotify():send()");
939                                                         close(sd);
940                                                         return;
941                                                 } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
942                                                         printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", 
943                                                                         bytesSent, __FILE__, __LINE__);
944                                                         close(sd);
945                                                         return;
946                                                 } else {
947                                                         close(sd);
948                                                         return;
949                                                 }
950
951                                         }
952                                         close(sd);
953                                 }
954                         } else {
955                                 randomdelay();
956                                 goto checkversion;
957                         }
958                 }
959                 i++;
960         }
961         free(oidarry);
962         free(versionarry);
963 }
964