/* Last commit: "added new testcases".
 * [IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c */
#include "dstm.h"
#include "ip.h"
#include "clookup.h"
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
12
/* TCP port used for both TRANS_REQUEST and READ_REQUEST connections to
 * participants. */
#define LISTEN_PORT 2156
/* Loopback default participant address; the code in this file resolves
 * participant addresses with midtoIP() instead. */
#define MACHINE_IP "127.0.0.1"
/* Receive buffer size in bytes. */
#define RECEIVE_BUFFER_SIZE 2048

/* Number of payload bytes that follow the objheader_t of an object,
 * indexed by objheader_t.type (defined elsewhere). */
extern int classsize[];
18
19 transrecord_t *transStart()
20 {
21         transrecord_t *tmp = malloc(sizeof(transrecord_t));
22         tmp->cache = objstrCreate(1048576);
23         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
24         return tmp;
25 }
26
27 objheader_t *transRead(transrecord_t *record, unsigned int oid)
28 {       
29         printf("Enter TRANS_READ\n");
30         unsigned int machinenumber;
31         objheader_t *tmp, *objheader;
32         void *objcopy;
33         int size;
34         void *buf;
35                 //check cache
36         if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
37                 printf("DEBUG -> transRead oid %d found local\n", oid);
38                 return(objheader);
39         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
40                 //Look up in Machine lookup table and found
41                 printf("oid not found in local cache\n");
42                 tmp = mhashSearch(oid);
43                 size = sizeof(objheader_t)+classsize[tmp->type];
44                 //Copy into cache
45                 objcopy = objstrAlloc(record->cache, size);
46                 memcpy(objcopy, (void *)tmp, size);
47                 //Insert into cache's lookup table
48                 chashInsert(record->lookupTable, objheader->oid, objcopy); 
49                 return(objcopy);
50         } else {
51                 //Get the object from the remote location
52                 //printf("oid not found in local machine lookup\n");
53                 printf("machinenumber = %d\n",machinenumber);
54                 printf("oid = %d\n",oid);
55                 machinenumber = lhashSearch(oid);
56                 printf("machinenumber = %d\n",machinenumber);
57                 objcopy = getRemoteObj(record, machinenumber, oid);
58                 if(objcopy == NULL) {
59                         //If object is not found in Remote location
60                         printf("Object not found in Machine %d\n", machinenumber);
61                         return NULL;
62                 }
63                 else
64                         return(objcopy);
65         } 
66 }
67
68 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
69 {
70         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
71         tmp->oid = getNewOID();
72         tmp->type = type;
73         tmp->version = 1;
74         tmp->rcount = 0; //? not sure how to handle this yet
75         tmp->status = 0;
76         tmp->status |= NEW;
77         chashInsert(record->lookupTable, tmp->oid, tmp);
78         return tmp;
79 }
80 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
81 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
82         int i, n, N, sum, oidcount = 0;
83         int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
84         char ctrl, control, *ptr;
85         unsigned int *oidnotfound;
86         objheader_t *header;
87
88         //Check common data structure 
89         for (i = 0 ; i < tdata->pilecount ; i++) {
90                 //Switch case
91                 control = tdata->recvmsg[i].rcv_status;
92                 switch(control) {
93                         case TRANS_DISAGREE:
94                                 printf("DEBUG-> Inside TRANS_DISAGREE\n");
95                                 transdisagree++;
96                                 //Free transaction records
97                                 objstrDelete(tdata->rec->cache);
98                                 chashDelete(tdata->rec->lookupTable);
99                                 free(tdata->rec);
100                                 //send Abort
101                                 ctrl = TRANS_ABORT;
102                                 if (write(sd, &ctrl, sizeof(char)) < 0) {
103                                         perror("Error sending ctrl message for participant\n");
104                                         return 1;
105                                 }
106                                 return 0;
107
108                         case TRANS_AGREE:
109                                 printf("DEBUG-> Inside TRANS_AGREE\n");
110                                 transagree++;
111                                 break;
112                                 
113                         case TRANS_SOFT_ABORT:
114                                 printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
115                                 transsoftabort++;
116                                 /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
117                                 if ((i == tdata->thread_id) && (val == 0)) {
118                                         //Read list of objects missing
119                                         if(read(sd, &oidcount, sizeof(int)) != 0) {
120                                                 //Break if only objs are locked at the Participant side
121                                                 if (oidcount == 0) {
122                                                         break;
123                                                 }
124                                                 transsoftabortmiss++;
125                                                 N = oidcount * sizeof(unsigned int);
126                                                 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
127                                                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
128                                                 }
129                                                 ptr = (char *) oidnotfound;
130                                                 do {
131                                                         n = read(sd, ptr+sum, N-sum);
132                                                         sum += n;
133                                                 } while(sum < N && n !=0);
134                                         }
135                                 }
136
137                                 break;
138                         default:
139                                 printf("Participant sent unknown message\n");
140                 }
141         }
142         
143         //Decide what control message to send to Participant    
144         if(transagree == tdata->pilecount){
145                 //Send Commit
146                 ctrl = TRANS_COMMIT;
147                 printf("Sending TRANS_COMMIT\n");
148                 if (write(sd, &ctrl, sizeof(char)) < 0) {
149                         perror("Error sending ctrl message for participant\n");
150                         return 1;
151                 }
152         }
153
154         if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
155                 //Send abort but retry commit
156                 ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
157                 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
158                 if (write(sd, &ctrl, sizeof(char)) < 0) {
159                         perror("Error sending ctrl message for participant\n");
160                         return 1;
161                 }
162                 //Sleep
163                 sleep(5);
164                 //Read new control message from Participant
165                 n = read(sd, &control, sizeof(char));
166                 
167                 //Update common data structure and increment count
168                 tdata->recvmsg[tdata->thread_id].rcv_status = control;
169                 val = 1;
170                 decideResponse(tdata, sd, val);         //Second call to decideResponse(); indicated by parameter val = 1
171         }
172
173         if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
174                 //Send abort but retry commit after relloking up objects
175                 //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
176                 ctrl = TRANS_ABORT;
177                 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
178                 if (write(sd, &ctrl, sizeof(char)) < 0) {
179                         perror("Error sending ctrl message for participant\n");
180                         return 1;
181                 }
182                 //TODO
183                 //Relook up objects
184                 //update location table
185                 //Free pointers
186                 free(oidnotfound);
187         }
188         
189         return 0;
190 }
191
192 void *transRequest(void *threadarg) {
193         int sd, i, n;
194         struct sockaddr_in serv_addr;
195         struct hostent *server;
196         thread_data_array_t *tdata;
197         objheader_t *headeraddr;
198         //unsigned int *oidnotfound;
199         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
200         char machineip[16];
201
202         tdata = (thread_data_array_t *) threadarg;
203         printf("DEBUG -> New thread id %d\n", tdata->thread_id);
204         //Send Trans Request
205         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
206                 perror("Error in socket for TRANS_REQUEST");
207                 return NULL;
208         }
209         bzero((char*) &serv_addr, sizeof(serv_addr));
210         serv_addr.sin_family = AF_INET;
211         serv_addr.sin_port = htons(LISTEN_PORT);
212         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
213         midtoIP(tdata->mid,machineip);
214         machineip[15] = '\0';
215         serv_addr.sin_addr.s_addr = inet_addr(machineip);
216         //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
217
218         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
219                 perror("Error in connect for TRANS_REQUEST");
220                 return NULL;
221         }
222
223         //Multiple writes for sending packets of data 
224         //Send first few fixed bytes of the TRANS_REQUEST protocol
225         printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
226 //      printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
227 //      printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
228         if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
229                 perror("Error sending fixed bytes for thread");
230                 return NULL;
231         }
232         //Send list of machines involved in the transaction
233 //      printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
234         if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
235                 perror("Error sending list of machines for thread");
236                 return NULL;
237         }
238         //Send oids and version number tuples for objects that are read
239 //      printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
240 //      printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
241         if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
242                 perror("Error sending tuples for thread");
243                 return NULL;
244         }
245         //Send objects that are modified
246         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
247                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
248 //              printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
249                 if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
250                         perror("Error sending obj modified for thread");
251                         return NULL;
252                 }
253         }
254         
255         //Read message  control message from participant side
256         n = read(sd, &control, sizeof(char));
257         recvcontrol = control;
258         printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
259         
260         //Update common data structure and increment count
261         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
262         //Lock and update count
263         //Thread sleeps until all messages from pariticipants are received by coordinator
264         pthread_mutex_lock(tdata->lock);
265                 (*(tdata->count))++;
266         
267         if(*(tdata->count) == tdata->pilecount) {
268                 pthread_cond_broadcast(tdata->threshold);
269                 //process the participant's request
270                 if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
271                         printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
272                         pthread_mutex_unlock(tdata->lock);
273                         return NULL;
274                 }
275         } else {
276                 pthread_cond_wait(tdata->threshold, tdata->lock);
277         }       
278         pthread_mutex_unlock(tdata->lock);
279
280         close(sd);
281         pthread_exit(NULL);
282 }
283
284 int transCommit(transrecord_t *record) {        
285         chashlistnode_t *curr, *ptr, *next;
286         unsigned int size;//Represents number of bins in the chash table
287         unsigned int machinenum, tot_bytes_mod, *listmid;
288         objheader_t *headeraddr;
289         plistnode_t *tmp, *pile = NULL;
290         int i, rc;
291         int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
292         char buffer[RECEIVE_BUFFER_SIZE],control;
293         char transid[TID_LEN];
294         static int newtid = 0;
295
296         ptr = record->lookupTable->table;
297         size = record->lookupTable->size;
298         //Look through all the objects in the cache and make piles
299         for(i = 0; i < size ;i++) {
300                 curr = &ptr[i];
301                 //Inner loop to traverse the linked list of the cache lookupTable
302                 while(curr != NULL) {
303                         //if the first bin in hash table is empty
304                         if(curr->key == 0) {
305                                 break;
306                         }
307                         next = curr->next;
308                         //Get machine location for object id
309                         
310                         if ((machinenum = lhashSearch(curr->key)) == 0) {
311                                printf("Error: No such machine\n");
312                                return 1;
313                         }
314                                         
315                         //TODO only for debug
316                         //machinenum = 1;
317                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
318                                 printf("Error: No such oid\n");
319                                 return 1;
320                         }
321                         //Make machine groups
322                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
323                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
324                                 return 1;
325                         }
326                         curr = next;
327                 }
328         }
329
330         //Create the packet to be sent in TRANS_REQUEST
331         tmp = pile;
332         pilecount = pCount(pile);               //Keeps track of the number of participants
333         
334         //Thread related variables
335         pthread_t thread[pilecount];            //Create threads for each participant
336         pthread_attr_t attr;                    
337         pthread_cond_t tcond;
338         pthread_mutex_t tlock;
339         pthread_mutex_t tlshrd;
340         thread_data_array_t thread_data_array[pilecount];
341         
342         thread_response_t rcvd_control_msg[pilecount];  //Shared thread array that keeps track of responses of participants
343         
344         //Initialize and set thread detach attribute
345         pthread_attr_init(&attr);
346         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
347         pthread_mutex_init(&tlock, NULL);
348         pthread_cond_init(&tcond, NULL);
349         
350         //Keep track of list of machine ids per transaction     
351         if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
352                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
353                 return 1;
354         }
355                                 
356         pListMid(pile, listmid);
357         //Process each machine group
358         while(tmp != NULL) {
359                 printf("DEBUG -> Created thread %d... \n", numthreads);
360                 //Create transaction id
361                 newtid++;
362                 trans_req_data_t *tosend;
363                 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
364                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
365                         return 1;
366                 }
367                 tosend->f.control = TRANS_REQUEST;
368                 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
369                 tosend->f.mcount = pilecount;
370                 tosend->f.numread = tmp->numread;
371                 tosend->f.nummod = tmp->nummod;
372                 tosend->f.sum_bytes = tmp->sum_bytes;
373                 tosend->listmid = listmid;
374                 tosend->objread = tmp->objread;
375                 tosend->oidmod = tmp->oidmod;
376                 thread_data_array[numthreads].thread_id = numthreads;
377                 thread_data_array[numthreads].mid = tmp->mid;
378                 thread_data_array[numthreads].pilecount = pilecount;
379                 thread_data_array[numthreads].buffer = tosend;
380                 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
381                 thread_data_array[numthreads].threshold = &tcond;
382                 thread_data_array[numthreads].lock = &tlock;
383                 thread_data_array[numthreads].count = &trecvcount;
384                 thread_data_array[numthreads].rec = record;
385
386                 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
387                 if (rc) {
388                         perror("Error in pthread create");
389                         return 1;
390                 }               
391                 numthreads++;           
392                 //TODO frees 
393                 free(tosend);
394                 tmp = tmp->next;
395         }
396
397         // Free attribute and wait for the other threads
398         pthread_attr_destroy(&attr);
399         for (i = 0 ;i < pilecount ; i++) {
400                 rc = pthread_join(thread[i], NULL);
401                 if (rc)
402                 {
403                         printf("ERROR return code from pthread_join() is %d\n", rc);
404                         return 1;
405                 }
406         }
407         
408         //Free resources        
409         pthread_cond_destroy(&tcond);
410         pthread_mutex_destroy(&tlock);
411         free(listmid);
412         pDelete(pile);
413         return 0;
414 }
415
416 //mnun will be used to represent the machine IP address later
417 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
418         int sd, size;
419         struct sockaddr_in serv_addr;
420         struct hostent *server;
421         char control;
422         char machineip[16];
423         objheader_t *h;
424         void *objcopy;
425
426         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
427                 perror("Error in socket");
428                 return NULL;
429         }
430         bzero((char*) &serv_addr, sizeof(serv_addr));
431         serv_addr.sin_family = AF_INET;
432         serv_addr.sin_port = htons(LISTEN_PORT);
433         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
434         midtoIP(mnum,machineip);
435         machineip[15] = '\0';
436         serv_addr.sin_addr.s_addr = inet_addr(machineip);
437
438         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
439                 perror("Error in connect");
440                 return NULL;
441         }
442         char readrequest[sizeof(char)+sizeof(unsigned int)];
443         readrequest[0] = READ_REQUEST;
444         *((unsigned int *)(&readrequest[1])) = oid;
445         if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
446                 perror("Error sending message");
447                 return NULL;
448         }
449
450 #ifdef DEBUG1
451         printf("DEBUG -> ready to rcv ...\n");
452 #endif
453         //Read response from the Participant
454         read(sd, &control, sizeof(char));
455         switch(control) {
456                 case OBJECT_NOT_FOUND:
457                         return NULL;
458                         break;
459                 case OBJECT_FOUND:
460                         read(sd, &size, sizeof(int));
461                         objcopy = objstrAlloc(record->cache, size);
462                         read(sd, objcopy, size);                
463                         //Insert into cache's lookup table
464                         chashInsert(record->lookupTable, oid, objcopy); 
465                         break;
466                 default:
467                         printf("Error in recv request from participant on a READ_REQUEST\n");
468                         return NULL;
469         }
470         return objcopy;
471 }