Fixed bugs
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
#include "dstm.h"
#include "ip.h"
#include "clookup.h"
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"

#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
15
/* Loopback address; in this file it only appears in commented-out code. */
#define MACHINE_IP "127.0.0.1"

/* TCP port every machine is contacted on (TRANS_REQUEST and READ_REQUEST). */
#define LISTEN_PORT 2156

/* Size of the fixed local receive buffers. */
#define RECEIVE_BUFFER_SIZE 2048

/* Payload size in bytes of each object type, indexed by objheader_t.type;
 * a full object occupies sizeof(objheader_t) + classsize[type] bytes.
 * Defined in another translation unit. */
extern int classsize[];
21
22 void randomdelay(void)
23 {
24         struct timespec req, rem;
25         time_t t;
26
27         t = time(NULL);
28         req.tv_sec = 0;
29         req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
30         nanosleep(&req, &rem);
31         return;
32 }
33
34 transrecord_t *transStart()
35 {
36         transrecord_t *tmp = malloc(sizeof(transrecord_t));
37         tmp->cache = objstrCreate(1048576);
38         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
39         return tmp;
40 }
41
42 objheader_t *transRead(transrecord_t *record, unsigned int oid)
43 {       
44         unsigned int machinenumber;
45         objheader_t *tmp, *objheader;
46         void *objcopy;
47         int size;
48         void *buf;
49                 //check cache
50         if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
51                 //printf("DEBUG -> transRead oid %d found local\n", oid);
52                 return(objheader);
53         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
54                 //Look up in Machine lookup table and found
55
56                 //printf("oid is found in Local machinelookup\n");
57                 tmp = mhashSearch(oid);
58                 size = sizeof(objheader_t)+classsize[tmp->type];
59                 //Copy into cache
60                 objcopy = objstrAlloc(record->cache, size);
61                 memcpy(objcopy, (void *)tmp, size);
62                 //Insert into cache's lookup table
63                 chashInsert(record->lookupTable, objheader->oid, objcopy); 
64                 return(objcopy);
65         } else {
66                 //Get the object from the remote location
67                 //printf("oid is found in remote machine\n");
68                 machinenumber = lhashSearch(oid);
69                 objcopy = getRemoteObj(record, machinenumber, oid);
70                 if(objcopy == NULL) {
71                         //If object is not found in Remote location
72                         //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
73                         return NULL;
74                 }
75                 else {
76                         //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
77                         return(objcopy);
78                 }
79         } 
80 }
81
82 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
83 {
84         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
85         tmp->oid = getNewOID();
86         tmp->type = type;
87         tmp->version = 1;
88         tmp->rcount = 0; //? not sure how to handle this yet
89         tmp->status = 0;
90         tmp->status |= NEW;
91         chashInsert(record->lookupTable, tmp->oid, tmp);
92         return tmp;
93 }
94
95 //This function decides the reponse that needs to be sent to all other machines involved in a 
96 //transaction by the machine that initiated the transaction request
97
98 int decideResponse(thread_data_array_t *tdata) {
99         char control;
100         int i, transagree = 0, transdisagree = 0, transsoftabort = 0;
101
102         //Check common data structure 
103         for (i = 0 ; i < tdata->pilecount ; i++) {
104                 //Switch case
105                 control = tdata->recvmsg[i].rcv_status;
106                 switch(control) {
107                         case TRANS_DISAGREE:
108                                 printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
109                                 transdisagree++;
110                                 break;
111
112                         case TRANS_AGREE:
113                                 printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
114                                 transagree++;
115                                 break;
116                                 
117                         case TRANS_SOFT_ABORT:
118                                 printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
119                                 transsoftabort++;
120                                 break;
121                         default:
122                                 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
123                                 return -1;
124                 }
125         }
126         
127         //Decide what control message to send to Participant    
128         if(transdisagree > 0) {
129                 //Send Abort
130                 *(tdata->replyctrl) = TRANS_ABORT;
131                 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
132                 objstrDelete(tdata->rec->cache);
133                 chashDelete(tdata->rec->lookupTable);
134                 free(tdata->rec);
135         } else if(transagree == tdata->pilecount){
136                 //Send Commit
137                 *(tdata->replyctrl) = TRANS_COMMIT;
138                 printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
139                 objstrDelete(tdata->rec->cache);
140                 chashDelete(tdata->rec->lookupTable);
141                 free(tdata->rec);
142         } else if(transsoftabort > 0 && transdisagree == 0) {
143                 //Send Abort
144                 *(tdata->replyctrl) = TRANS_ABORT;
145                 *(tdata->replyretry) = 1;
146                 //objstrDelete(tdata->rec->cache);
147                 //chashDelete(tdata->rec->lookupTable);
148                 //free(tdata->rec);
149                 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
150         } else {
151                 printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
152                 return -1;
153         }
154         
155         return 0;
156 }
157 //This function sends the final response to all threads in their respective socket id 
158 char sendResponse(thread_data_array_t *tdata, int sd) {
159         int n, N, sum, oidcount = 0;
160         char *ptr, retval = 0;
161         unsigned int *oidnotfound;
162
163         //If the decided response is due to a soft abort and missing objects at the Participant's side
164         if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
165                 //Read list of objects missing
166                 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
167                         //Break if only objs are locked at the Participant side
168                         N = oidcount * sizeof(unsigned int);
169                         if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
170                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
171                         }
172                         ptr = (char *) oidnotfound;
173                         do {
174                                 n = read(sd, ptr+sum, N-sum);
175                                 sum += n;
176                         } while(sum < N && n !=0);
177                 }
178                 retval =  TRANS_SOFT_ABORT;
179         }
180         //If the decided response is TRANS_ABORT
181         if(*(tdata->replyctrl) == TRANS_ABORT) {
182                 retval = TRANS_ABORT;
183         }
184         if(*(tdata->replyctrl) == TRANS_COMMIT) {
185                 retval = TRANS_COMMIT;
186         }
187         // Send response to the Participant
188         if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
189                 perror("Error sending ctrl message for participant\n");
190         }
191
192         return retval;
193 }
194
195 void *transRequest(void *threadarg) {
196         int sd, i, n;
197         struct sockaddr_in serv_addr;
198         struct hostent *server;
199         thread_data_array_t *tdata;
200         objheader_t *headeraddr;
201         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
202         char machineip[16], retval;
203
204         tdata = (thread_data_array_t *) threadarg;
205         //Send Trans Request
206         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
207                 perror("Error in socket for TRANS_REQUEST\n");
208                 return NULL;
209         }
210         bzero((char*) &serv_addr, sizeof(serv_addr));
211         serv_addr.sin_family = AF_INET;
212         serv_addr.sin_port = htons(LISTEN_PORT);
213         midtoIP(tdata->mid,machineip);
214         machineip[15] = '\0';
215         serv_addr.sin_addr.s_addr = inet_addr(machineip);
216
217         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
218                 perror("Error in connect for TRANS_REQUEST\n");
219                 return NULL;
220         }
221         
222         printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
223         //Send bytes of data with TRANS_REQUEST control message
224         if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
225                 perror("Error sending fixed bytes for thread\n");
226                 return NULL;
227         }
228         //Send list of machines involved in the transaction
229         {
230           int size=sizeof(unsigned int)*tdata->pilecount;
231           if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
232             perror("Error sending list of machines for thread\n");
233             return NULL;
234           }
235         }
236         //Send oids and version number tuples for objects that are read
237         {
238           int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
239           if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
240             perror("Error sending tuples for thread\n");
241             return NULL;
242           }
243         }
244         //Send objects that are modified
245         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
246           int size;
247           headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
248           size=sizeof(objheader_t)+classsize[headeraddr->type];
249           if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
250             perror("Error sending obj modified for thread\n");
251             return NULL;
252           }
253         }
254
255         //Read message  control message from participant side
256         if((n = read(sd, &control, sizeof(char))) <= 0) {
257                 perror("Error in reading control message from Participant\n");
258                 return NULL;
259         }
260         recvcontrol = control;
261         
262         //Update common data structure and increment count
263         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
264         //Lock and update count
265         //Thread sleeps until all messages from pariticipants are received by coordinator
266         pthread_mutex_lock(tdata->lock);
267
268         (*(tdata->count))++;
269         
270         if(*(tdata->count) == tdata->pilecount) {
271                 if (decideResponse(tdata) != 0) { 
272                         printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
273                         pthread_mutex_unlock(tdata->lock);
274                         close(sd);
275                         return NULL;
276                 }
277                 pthread_cond_broadcast(tdata->threshold);
278         } else {
279                 pthread_cond_wait(tdata->threshold, tdata->lock);
280         }       
281
282         pthread_mutex_unlock(tdata->lock);
283         
284         if (sendResponse(tdata, sd) == 0) { 
285                 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
286                 pthread_mutex_unlock(tdata->lock);
287                 close(sd);
288                 return NULL;
289         }
290         close(sd);
291         pthread_exit(NULL);
292 }
293
294 int transCommit(transrecord_t *record) {        
295         chashlistnode_t *curr, *ptr, *next;
296         unsigned int size;//Represents number of bins in the chash table
297         unsigned int machinenum, tot_bytes_mod, *listmid;
298         objheader_t *headeraddr;
299         plistnode_t *tmp, *pile = NULL;
300         int i, rc;
301         int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
302         char buffer[RECEIVE_BUFFER_SIZE],control;
303         char transid[TID_LEN];
304         static int newtid = 0;
305         trans_req_data_t *tosend;
306         char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent
307
308         ptr = record->lookupTable->table;
309         size = record->lookupTable->size;
310         //Look through all the objects in the cache and make piles
311         for(i = 0; i < size ;i++) {
312                 curr = &ptr[i];
313                 //Inner loop to traverse the linked list of the cache lookupTable
314                 while(curr != NULL) {
315                         //if the first bin in hash table is empty
316                         if(curr->key == 0) {
317                                 break;
318                         }
319                         next = curr->next;
320                         //Get machine location for object id
321                         
322                         if ((machinenum = lhashSearch(curr->key)) == 0) {
323                                printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
324                                return 1;
325                         }
326                                         
327                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
328                                 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
329                                 return 1;
330                         }
331                         //Make machine groups
332                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
333                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
334                                 return 1;
335                         }
336                         curr = next;
337                 }
338         }
339
340         //Create the packet to be sent in TRANS_REQUEST
341         tmp = pile;
342         pilecount = pCount(pile);               //Keeps track of the number of participants
343         
344         //Thread related variables
345         pthread_t thread[pilecount];            //Create threads for each participant
346         pthread_attr_t attr;                    
347         pthread_cond_t tcond;
348         pthread_mutex_t tlock;
349         pthread_mutex_t tlshrd;
350         //thread_data_array_t thread_data_array[pilecount];
351         thread_data_array_t *thread_data_array;
352
353         thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
354         
355         thread_response_t rcvd_control_msg[pilecount];  //Shared thread array that keeps track of responses of participants
356         
357         //Initialize and set thread detach attribute
358         pthread_attr_init(&attr);
359         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
360         pthread_mutex_init(&tlock, NULL);
361         pthread_cond_init(&tcond, NULL);
362         
363         //Keep track of list of machine ids per transaction     
364         if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
365                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
366                 return 1;
367         }
368                                 
369         pListMid(pile, listmid);
370         //Process each machine group
371         //Should be a new function for while loop
372         while(tmp != NULL) {
373                 //Create transaction id
374                 newtid++;
375                 //trans_req_data_t *tosend;
376                 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
377                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
378                         return 1;
379                 }
380                 tosend->f.control = TRANS_REQUEST;
381                 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
382                 tosend->f.mcount = pilecount;
383                 tosend->f.numread = tmp->numread;
384                 tosend->f.nummod = tmp->nummod;
385                 tosend->f.sum_bytes = tmp->sum_bytes;
386                 tosend->listmid = listmid;
387                 tosend->objread = tmp->objread;
388                 tosend->oidmod = tmp->oidmod;
389                 thread_data_array[numthreads].thread_id = numthreads;
390                 thread_data_array[numthreads].mid = tmp->mid;
391                 thread_data_array[numthreads].pilecount = pilecount;
392                 thread_data_array[numthreads].buffer = tosend;
393                 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
394                 thread_data_array[numthreads].threshold = &tcond;
395                 thread_data_array[numthreads].lock = &tlock;
396                 thread_data_array[numthreads].count = &trecvcount;
397                 thread_data_array[numthreads].replyctrl = &treplyctrl;
398                 thread_data_array[numthreads].replyretry = &treplyretry;
399                 thread_data_array[numthreads].rec = record;
400
401                 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
402                 if (rc) {
403                         perror("Error in pthread create");
404                         return 1;
405                 }               
406                 numthreads++;           
407                 //TODO frees 
408                 tmp = tmp->next;
409         }
410
411         // Free attribute and wait for the other threads
412         pthread_attr_destroy(&attr);
413         for (i = 0 ;i < pilecount ; i++) {
414                 rc = pthread_join(thread[i], NULL);
415                 if (rc)
416                 {
417                         printf("ERROR return code from pthread_join() is %d\n", rc);
418                         return 1;
419                 }
420         }
421         
422         //Free resources        
423         pthread_cond_destroy(&tcond);
424         pthread_mutex_destroy(&tlock);
425
426         free(tosend);
427         free(listmid);
428         pDelete(pile);
429         if(treplyretry == 1) {
430                 //wait a random amount of time
431                 randomdelay();
432                 //sleep(1);
433                 //Retry the commiting transaction again
434                 transCommit(record);
435         }       
436         
437         return 0;
438 }
439
440 //mnun will be used to represent the machine IP address later
441 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
442         int sd, size, val;
443         struct sockaddr_in serv_addr;
444         struct hostent *server;
445         char control;
446         char machineip[16];
447         objheader_t *h;
448         void *objcopy;
449
450         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
451                 perror("Error in socket\n");
452                 return NULL;
453         }
454         bzero((char*) &serv_addr, sizeof(serv_addr));
455         serv_addr.sin_family = AF_INET;
456         serv_addr.sin_port = htons(LISTEN_PORT);
457         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
458         midtoIP(mnum,machineip);
459         machineip[15] = '\0';
460         serv_addr.sin_addr.s_addr = inet_addr(machineip);
461
462         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
463                 perror("Error in connect\n");
464                 return NULL;
465         }
466         char readrequest[sizeof(char)+sizeof(unsigned int)];
467         readrequest[0] = READ_REQUEST;
468         *((unsigned int *)(&readrequest[1])) = oid;
469         if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
470                 perror("Error sending message\n");
471                 return NULL;
472         }
473
474 #ifdef DEBUG1
475         printf("DEBUG -> ready to rcv ...\n");
476 #endif
477         //Read response from the Participant
478         if((val = read(sd, &control, sizeof(char))) <= 0) {
479                 perror("No control response for getRemoteObj sent\n");
480                 return NULL;
481         }
482         switch(control) {
483                 case OBJECT_NOT_FOUND:
484                         printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
485                         return NULL;
486                 case OBJECT_FOUND:
487                         if((val = read(sd, &size, sizeof(int))) <= 0) {
488                                 perror("No size is read from the participant\n");
489                                 return NULL;
490                         }
491                         objcopy = objstrAlloc(record->cache, size);
492                         if((val = read(sd, objcopy, size)) <= 0) {
493                                 perror("No objects are read from the remote participant\n");
494                                 return NULL;
495                         }
496                         //Insert into cache's lookup table
497                         chashInsert(record->lookupTable, oid, objcopy); 
498                         break;
499                 default:
500                         printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
501                         return NULL;
502         }
503         close(sd);
504         return objcopy;
505 }