11 #include<netinet/in.h>
12 #include <sys/types.h>
// TCP port used as the destination port when contacting participant
// machines (transRequest and getRemoteObj both use htons(LISTEN_PORT)).
16 #define LISTEN_PORT 2156
// Loopback address; in the visible code it is only referenced by a
// commented-out line in getRemoteObj.
17 #define MACHINE_IP "127.0.0.1"
// Size of the stack buffers declared in transCommit and transRequest.
18 #define RECEIVE_BUFFER_SIZE 2048
// Per-type payload sizes indexed by an object's `type` field; a full object
// occupies sizeof(objheader_t) + classsize[type].
20 extern int classsize[];
21 plistnode_t *createPiles(transrecord_t *);
22 /* This function inserts random wait delays in the order of msec */
23 void randomdelay(void)
// The sleep length is 1 ms plus (t % 10000000) ns, i.e. roughly 1-11 ms.
// NOTE(review): the lines that declare/seed `t` and set req.tv_sec are not
// visible in this excerpt; nanosleep reads both fields, so confirm
// req.tv_sec is set to 0 and that `t` cannot be negative.
25 struct timespec req, rem;
30 req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
// If the sleep is interrupted, the unslept remainder is written to rem;
// the visible code does not use it to resume the sleep.
31 nanosleep(&req, &rem);
/* Allocate a new transaction record.
 * The record holds an object store (`cache`, created via objstrCreate with
 * size argument 1048576) for local copies of the objects the transaction
 * touches. It also holds a hash table (`lookupTable`) that maps an oid to
 * its copy in `cache`.
 * The return statement is not visible in this excerpt; presumably it
 * returns tmp.
 * NOTE(review): the malloc result is dereferenced without a NULL check. */
35 transrecord_t *transStart()
37 transrecord_t *tmp = malloc(sizeof(transrecord_t));
38 tmp->cache = objstrCreate(1048576);
39 tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
43 /* This function finds the location of the objects involved in a transaction
44 * and returns the pointer to the object if found in a remote location */
/* Lookup order for `oid`:
 *   1. the transaction's own cache (record->lookupTable);
 *   2. the local machine's object table (mhashSearch). On a hit, the whole
 *      object (header plus classsize[type] payload) is copied into
 *      record->cache and registered in record->lookupTable;
 *   3. otherwise, the owning machine is found with lhashSearch and the
 *      object is fetched with getRemoteObj, which also inserts the fetched
 *      copy into the transaction cache.
 * The return statements are not visible in this excerpt. */
45 objheader_t *transRead(transrecord_t *record, unsigned int oid)
47 unsigned int machinenumber;
48 objheader_t *tmp, *objheader;
52 /* Search local cache */
53 if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
54 //printf("DEBUG -> transRead oid %d found local\n", oid);
56 } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
57 /* Look up in machine lookup table and copy into cache*/
58 //printf("oid is found in Local machinelookup\n");
// NOTE(review): the condition above already stored mhashSearch(oid) in
// objheader; this second lookup is redundant and objheader could be reused.
59 tmp = mhashSearch(oid);
60 size = sizeof(objheader_t)+classsize[tmp->type];
// NOTE(review): objstrAlloc's result is not NULL-checked before the memcpy.
61 objcopy = objstrAlloc(record->cache, size);
62 memcpy(objcopy, (void *)tmp, size);
63 /* Insert into cache's lookup table */
64 chashInsert(record->lookupTable, objheader->oid, objcopy);
66 } else { /* If not found in machine look up */
67 /* Get the object from the remote location */
// NOTE(review): lhashSearch's result is not checked here, although
// createPiles treats a return of 0 as "no such machine".
68 machinenumber = lhashSearch(oid);
69 objcopy = getRemoteObj(record, machinenumber, oid);
71 //If object is not found in Remote location
72 //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
76 //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
81 /* This function creates objects in the transaction record */
/* Allocates a new object of the given `type` (header plus classsize[type]
 * payload) inside record->cache and gives it a fresh oid from getNewOID().
 * It then registers the object in record->lookupTable, so a later
 * transRead() of that oid is served from the transaction cache.
 * NOTE(review): objstrAlloc's result is used without a NULL check. The rest
 * of the header initialization and the return are not visible here. */
82 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
84 objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
85 tmp->oid = getNewOID();
88 tmp->rcount = 0; //? not sure how to handle this yet
91 chashInsert(record->lookupTable, tmp->oid, tmp);
/* Group every object in the transaction cache by the machine that owns it.
 * The function walks each bucket of record->lookupTable and finds the
 * owning machine of each key with lhashSearch. It then adds the cached
 * object header to that machine's pile with pInsert. The return statement
 * is not visible; presumably it returns the pile list head. */
95 plistnode_t *createPiles(transrecord_t *record) {
97 unsigned int size;/* Represents number of bins in the chash table */
98 chashlistnode_t *curr, *ptr, *next;
99 plistnode_t *pile = NULL;
100 unsigned int machinenum;
101 objheader_t *headeraddr;
103 ptr = record->lookupTable->table;
104 size = record->lookupTable->size;
106 for(i = 0; i < size ; i++) {
// NOTE(review): the lines that set curr for bucket i and advance it to the
// next node are not visible in this excerpt.
108 /* Inner loop to traverse the linked list of the cache lookupTable */
109 while(curr != NULL) {
110 //if the first bin in hash table is empty
115 //Get machine location for object id
117 if ((machinenum = lhashSearch(curr->key)) == 0) {
118 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
// NOTE(review): curr->key was read from this same table, so this search
// repeats a lookup. If chashlistnode_t stores the value, it could be used
// directly (TODO confirm the node layout).
122 if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
123 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
126 //Make machine groups
// numelements is presumably a capacity hint for per-pile arrays (TODO
// confirm). NOTE(review): assigning pInsert's result straight to `pile`
// loses the existing list head if pInsert returns NULL on failure.
127 if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
128 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
137 /* This function initiates the transaction commit process
138 * Spawns threads for each of the new connections with Participants
139 * by creating new piles,
140 * Fills the piles with necessary information and
141 * Sends a transRequest() to each pile*/
/* Visible flow:
 *   - createPiles() groups the objects cached in `record` by owning machine.
 *     pCount() and pListMid() give the participant count and the
 *     participant machine ids.
 *   - One transRequest thread is started per pile. All threads share
 *     rcvd_control_msg, trecvcount, treplyctrl and treplyretry, and
 *     coordinate through tlock/tcond.
 *   - All threads are joined. If decideResponse requested a retry
 *     (treplyretry == 1), the commit is attempted again.
 * The return value is not visible in this excerpt.
 */
142 int transCommit(transrecord_t *record) {
143 unsigned int tot_bytes_mod, *listmid;
146 int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
147 char buffer[RECEIVE_BUFFER_SIZE],control;
148 char transid[TID_LEN];
149 trans_req_data_t *tosend;
// newtid is static, so its value persists across transCommit calls. It is
// embedded in every trans_id below; its update is not visible here.
150 static int newtid = 0;
151 char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
153 /* Look through all the objects in the transaction record and make piles
154 * for each machine involved in the transaction*/
155 pile = createPiles(record);
157 /* Create the packet to be sent in TRANS_REQUEST */
159 /* Count the number of participants */
160 pilecount = pCount(pile);
162 /* Create a list of machine ids(Participants) involved in transaction */
163 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
164 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
167 pListMid(pile, listmid);
170 /* Initialize thread variables,
171 * Spawn a thread for each Participant involved in a transaction */
// NOTE(review): pilecount sizes the VLAs `thread` and `rcvd_control_msg`.
// If pCount() can return 0 (a transaction with no objects), these are
// zero-length VLAs, which is undefined behavior.
172 pthread_t thread[pilecount];
174 pthread_cond_t tcond;
175 pthread_mutex_t tlock;
// NOTE(review): tlshrd is not used anywhere in the visible code.
176 pthread_mutex_t tlshrd;
178 thread_data_array_t *thread_data_array;
// NOTE(review): this malloc is not NULL-checked; its release is not visible.
179 thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
180 thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
182 /* Initialize and set thread detach attribute */
// NOTE(review): attr is configured as JOINABLE, but pthread_create below is
// passed NULL rather than &attr. These attribute calls therefore have no
// effect; threads are joinable by default anyway.
183 pthread_attr_init(&attr);
184 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
185 pthread_mutex_init(&tlock, NULL);
186 pthread_cond_init(&tcond, NULL);
188 /* Process each machine pile */
189 while(pile != NULL) {
190 //Create transaction id
192 //trans_req_data_t *tosend;
193 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
194 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
197 tosend->f.control = TRANS_REQUEST;
// trans_id is "<participant mid in hex>_<newtid>", so each participant in
// the same commit receives a different id; confirm this is intended.
// NOTE(review): sprintf is unbounded; confirm trans_id can hold the longest
// possible value, or switch to snprintf.
198 sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
199 tosend->f.mcount = pilecount;
200 tosend->f.numread = pile->numread;
201 tosend->f.nummod = pile->nummod;
202 tosend->f.sum_bytes = pile->sum_bytes;
203 tosend->listmid = listmid;
204 tosend->objread = pile->objread;
205 tosend->oidmod = pile->oidmod;
// Per-thread arguments: a private slot index and payload, plus pointers to
// the state shared by all transRequest threads of this commit.
206 thread_data_array[numthreads].thread_id = numthreads;
207 thread_data_array[numthreads].mid = pile->mid;
208 thread_data_array[numthreads].pilecount = pilecount;
209 thread_data_array[numthreads].buffer = tosend;
210 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
211 thread_data_array[numthreads].threshold = &tcond;
212 thread_data_array[numthreads].lock = &tlock;
213 thread_data_array[numthreads].count = &trecvcount;
214 thread_data_array[numthreads].replyctrl = &treplyctrl;
215 thread_data_array[numthreads].replyretry = &treplyretry;
216 thread_data_array[numthreads].rec = record;
// NOTE(review): pthread_create returns an error number and does not set
// errno, so the perror() below may report an unrelated message. Print
// strerror(rc) instead.
218 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
220 perror("Error in pthread create");
228 /* Free attribute and wait for the other threads */
229 pthread_attr_destroy(&attr);
// Joining every thread also guarantees that the stack arrays shared with
// the threads (thread, rcvd_control_msg) outlive all their users.
230 for (i = 0 ;i < pilecount ; i++) {
231 rc = pthread_join(thread[i], NULL);
234 printf("ERROR return code from pthread_join() is %d\n", rc);
240 pthread_cond_destroy(&tcond);
241 pthread_mutex_destroy(&tlock);
// treplyretry is set to 1 by decideResponse in the soft-abort case.
246 /* Retry trans commit procedure if not successful in the first try */
247 if(treplyretry == 1) {
248 /* wait a random amount of time */
251 /* Retry the committing transaction again */
258 /* This function sends information involved in the transaction request and
259 * accepts a response from participants.
260 * It calls decideResponse() to decide on what control message
261 * to send next and sends the message using sendResponse()*/
/* Thread body started by transCommit, one thread per participant machine.
 * `threadarg` points at this thread's thread_data_array_t. The thread:
 *   - connects to midtoIP(mid):LISTEN_PORT;
 *   - sends the fixed header, the participant id list, the read-set tuples
 *     and each modified object;
 *   - reads one control byte (the participant's vote) into
 *     recvmsg[thread_id];
 *   - increments the shared counter under tdata->lock. The thread that
 *     brings it to pilecount calls decideResponse and broadcasts
 *     tdata->threshold; the other threads appear to wait on it;
 *   - calls sendResponse to deliver the decision on its own socket.
 */
262 void *transRequest(void *threadarg) {
264 struct sockaddr_in serv_addr;
// NOTE(review): server is unused in the visible code.
265 struct hostent *server;
266 thread_data_array_t *tdata;
267 objheader_t *headeraddr;
268 char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
269 char machineip[16], retval;
271 tdata = (thread_data_array_t *) threadarg;
273 /* Send Trans Request */
274 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
275 perror("Error in socket for TRANS_REQUEST\n");
278 bzero((char*) &serv_addr, sizeof(serv_addr));
279 serv_addr.sin_family = AF_INET;
280 serv_addr.sin_port = htons(LISTEN_PORT);
281 midtoIP(tdata->mid,machineip);
// Force NUL termination; 16 bytes holds the longest dotted-quad IPv4
// string plus its terminator.
282 machineip[15] = '\0';
283 serv_addr.sin_addr.s_addr = inet_addr(machineip);
284 /* Open Connection */
285 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
286 perror("Error in connect for TRANS_REQUEST\n");
290 printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
291 /* Send bytes of data with TRANS_REQUEST control message */
// NOTE(review): send() returns ssize_t. Comparing it with a size_t
// (sizeof) converts a -1 error return to a huge unsigned value, so a failed
// send is NOT caught by this check. Test for < 0 separately. A short count
// on a stream socket is also not retried.
292 if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
293 perror("Error sending fixed bytes for thread\n");
296 /* Send list of machines involved in the transaction */
298 int size=sizeof(unsigned int)*tdata->pilecount;
299 if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
300 perror("Error sending list of machines for thread\n");
304 /* Send oids and version number tuples for objects that are read */
// Each read-set entry is assumed to be a packed (unsigned int, short) pair
// of sizeof(unsigned int)+sizeof(short) bytes. TODO confirm objread layout.
306 int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
307 if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
308 perror("Error sending tuples for thread\n");
312 /* Send objects that are modified */
313 for(i = 0; i < tdata->buffer->f.nummod ; i++) {
// Modified objects are sent straight from the transaction cache (header plus
// classsize[type] payload). NOTE(review): chashSearch's result is not
// NULL-checked before headeraddr->type is read.
315 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
316 size=sizeof(objheader_t)+classsize[headeraddr->type];
317 if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
318 perror("Error sending obj modified for thread\n");
// The participant's vote is a single control byte.
323 /* Read control message from Participant */
324 if((n = read(sd, &control, sizeof(char))) <= 0) {
325 perror("Error in reading control message from Participant\n");
328 recvcontrol = control;
// Each thread writes only its own slot. decideResponse reads all slots
// later, while holding tdata->lock.
330 /* Update common data structure and increment count */
331 tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
333 /* Lock and update count */
334 //Thread sleeps until all messages from participants are received by coordinator
335 pthread_mutex_lock(tdata->lock);
337 (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
339 /* Wake up the threads and invoke decideResponse (once) */
340 if(*(tdata->count) == tdata->pilecount) {
341 if (decideResponse(tdata) != 0) {
342 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
// NOTE(review): if this error path returns (the following lines are not
// visible), the broadcast below is skipped. The other threads blocked in
// pthread_cond_wait would then never be woken.
343 pthread_mutex_unlock(tdata->lock);
347 pthread_cond_broadcast(tdata->threshold);
// NOTE(review): pthread_cond_wait may wake spuriously. The visible code
// does not re-check a predicate (e.g. a "decision made" flag) in a loop.
349 pthread_cond_wait(tdata->threshold, tdata->lock);
352 pthread_mutex_unlock(tdata->lock);
354 /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
355 * to all participants in their respective socket */
356 if (sendResponse(tdata, sd) == 0) {
357 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
// NOTE(review): tdata->lock appears to have been released already (just
// after the wait above). Unlocking it again here is undefined behavior for
// a default mutex.
358 pthread_mutex_unlock(tdata->lock);
363 /* Close connection */
368 /* This function decides the response that needs to be sent to
369 * all Participant machines involved in the transaction commit */
/* Called by the transRequest thread that received the last participant
 * vote, while it holds tdata->lock. Tallies tdata->recvmsg[0..pilecount-1]:
 *   - any TRANS_DISAGREE: reply TRANS_ABORT, then objstrDelete/chashDelete
 *     the record's cache and lookup table;
 *   - all TRANS_AGREE: reply TRANS_COMMIT, then delete cache and table the
 *     same way;
 *   - some TRANS_SOFT_ABORT and no TRANS_DISAGREE: reply TRANS_ABORT and
 *     set *replyretry = 1 so that transCommit retries;
 *   - anything else: logged as an undecided response.
 * The decision is written to *tdata->replyctrl, which every thread sends.
 * The case labels for TRANS_DISAGREE/TRANS_AGREE, the counter increments
 * and the return statement are not visible in this excerpt. transRequest
 * treats a non-zero return as an error.
 */
370 int decideResponse(thread_data_array_t *tdata) {
372 int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
375 //Check common data structure
376 for (i = 0 ; i < tdata->pilecount ; i++) {
377 /*Switch on response from Participant */
378 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
379 written onto the shared array */
382 printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
387 printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
391 case TRANS_SOFT_ABORT:
392 printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
396 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
401 /* Decide what control message to send to Participant */
402 if(transdisagree > 0) {
404 *(tdata->replyctrl) = TRANS_ABORT;
405 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
// NOTE(review): after these deletes, record->cache and record->lookupTable
// still hold their old pointers. Confirm that no caller (e.g. transCommit's
// post-join code) touches them again.
406 objstrDelete(tdata->rec->cache);
407 chashDelete(tdata->rec->lookupTable);
409 } else if(transagree == tdata->pilecount){
411 *(tdata->replyctrl) = TRANS_COMMIT;
412 printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
413 objstrDelete(tdata->rec->cache);
414 chashDelete(tdata->rec->lookupTable);
// The "transdisagree == 0" test is always true here, because the first
// branch already handled transdisagree > 0.
416 } else if(transsoftabort > 0 && transdisagree == 0) {
417 /* Send Abort in soft abort case followed by retry committing transaction again*/
418 *(tdata->replyctrl) = TRANS_ABORT;
419 *(tdata->replyretry) = 1;
420 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
422 printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
428 /* This function sends the final response to the participant connected on socket sd */
/* Runs in each transRequest thread after the decision has been made.
 * If this thread's participant voted TRANS_SOFT_ABORT, the function first
 * reads from `sd` an int count followed by that many unsigned int oids
 * (objects missing at the participant). It then sends the single decided
 * control byte *tdata->replyctrl to the participant.
 * retval is set to TRANS_COMMIT or TRANS_ABORT to match the decision. The
 * return statement is not visible; transRequest treats 0 as an error.
 */
429 char sendResponse(thread_data_array_t *tdata, int sd) {
430 int n, N, sum, oidcount = 0;
431 char *ptr, retval = 0;
432 unsigned int *oidnotfound;
434 /* If the decided response is due to a soft abort and missing objects at the Participant's side */
435 if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
436 /* Read list of objects missing */
// NOTE(review): read() returning -1 also passes the "!= 0" test. A short
// read (fewer than sizeof(int) bytes) leaves oidcount partly filled.
// oidcount comes from the network in host byte order and is not
// range-checked before it sizes the calloc and N.
437 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
438 N = oidcount * sizeof(unsigned int);
439 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
440 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
// The loop below reads until N bytes arrive or the peer closes (n == 0).
// NOTE(review): sum is declared without an initializer; confirm it is reset
// to 0 before the loop (that line is not visible). A read() error (-1) is
// not distinguished by the n != 0 test. The use and release of oidnotfound
// are not visible in this excerpt.
442 ptr = (char *) oidnotfound;
444 n = read(sd, ptr+sum, N-sum);
446 } while(sum < N && n !=0);
// In the visible decideResponse logic, a soft-aborting participant always
// ends up with replyctrl == TRANS_ABORT. This value is therefore
// overwritten below unless no decision was made.
448 retval = TRANS_SOFT_ABORT;
450 /* If the decided response is TRANS_ABORT */
451 if(*(tdata->replyctrl) == TRANS_ABORT) {
452 retval = TRANS_ABORT;
454 /* If the decided response is TRANS_COMMIT */
455 if(*(tdata->replyctrl) == TRANS_COMMIT) {
456 retval = TRANS_COMMIT;
458 /* Send response to the Participant */
// NOTE(review): send() returns ssize_t. Compared with sizeof(char) (a
// size_t), a -1 error return becomes a huge unsigned value, so a failed
// send is not detected here.
459 if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
460 perror("Error sending ctrl message for participant\n");
466 /* This function opens a connection, places an object read request to the
467 * remote machine, reads the control message and object if available and
468 * copies the object and its header to the local cache.
469 * TODO replace mnum and midtoIP() with MACHINE_IP address later */
/* Request format (as sent here): one READ_REQUEST byte followed by the raw
 * unsigned int oid in host byte order.
 * Reply format: one control byte. On success (that case label is not
 * visible), an int size follows, then `size` bytes of object data. The data
 * is stored in record->cache and inserted into record->lookupTable under
 * `oid`.
 * The return statements are not visible; transRead assigns the result to
 * its objcopy. */
471 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
473 struct sockaddr_in serv_addr;
// NOTE(review): server is unused in the visible code.
474 struct hostent *server;
480 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
481 perror("Error in socket\n");
484 bzero((char*) &serv_addr, sizeof(serv_addr));
485 serv_addr.sin_family = AF_INET;
486 serv_addr.sin_port = htons(LISTEN_PORT);
487 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
488 midtoIP(mnum,machineip);
489 machineip[15] = '\0';
490 serv_addr.sin_addr.s_addr = inet_addr(machineip);
491 /* Open connection */
492 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
493 perror("Error in connect\n");
496 char readrequest[sizeof(char)+sizeof(unsigned int)];
497 readrequest[0] = READ_REQUEST;
// NOTE(review): storing an unsigned int through a cast of &readrequest[1]
// is a misaligned access and breaks strict aliasing (undefined behavior).
// memcpy(&readrequest[1], &oid, sizeof oid) is the portable form. No htonl()
// is applied, so both ends must share the same byte order.
498 *((unsigned int *)(&readrequest[1])) = oid;
// NOTE(review): send()'s ssize_t result compared with a size_t misses a -1
// error return.
499 if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
500 perror("Error sending message\n");
505 printf("DEBUG -> ready to rcv ...\n");
507 /* Read response from the Participant */
508 if((val = read(sd, &control, sizeof(char))) <= 0) {
509 perror("No control response for getRemoteObj sent\n");
513 case OBJECT_NOT_FOUND:
514 printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
517 /* Read object if found into local cache */
// NOTE(review): size comes from the peer (host byte order). It is used
// unchecked to size the allocation and the read below.
518 if((val = read(sd, &size, sizeof(int))) <= 0) {
519 perror("No size is read from the participant\n");
// NOTE(review): objstrAlloc's result is not NULL-checked.
522 objcopy = objstrAlloc(record->cache, size);
// NOTE(review): a single read() on a stream socket may return fewer than
// `size` bytes, which would cache a truncated object. Loop until all bytes
// arrive.
523 if((val = read(sd, objcopy, size)) <= 0) {
524 perror("No objects are read from the remote participant\n");
527 /* Insert into cache's lookup table */
528 chashInsert(record->lookupTable, oid, objcopy);
// Presumably the default case for any other control byte (label not visible).
531 printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
534 /* Close connection */