10 #include <sys/socket.h>
12 #include <netinet/in.h>
13 #include <sys/types.h>
19 #define LISTEN_PORT 2156
20 #define RECEIVE_BUFFER_SIZE 2048
21 #define NUM_THREADS 10
22 #define PREFETCH_CACHE_SIZE 1048576 //1MB
24 #define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t)))
25 #define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
26 #define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
27 #define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
29 /* Global Variables */
30 extern int classsize[];
31 extern primarypfq_t pqueue; // shared prefetch queue
32 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
34 extern objstr_t *mainobjstore;
35 objstr_t *prefetchcache;
37 plistnode_t *createPiles(transrecord_t *);
38 inline int arrayLength(int *array) {
40 for(i=0 ;array[i] != -1; i++)
44 /* This function is a prefetch call generated by the compiler that
45 * populates the shared primary prefetch queue*/
46 void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
50 /* Allocate for the queue node*/
52 qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
53 if((node = calloc(1,qnodesize)) == NULL) {
54 printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
57 /* Set queue node values */
58 len = sizeof(prefetchqelem_t);
59 memcpy(node + len, &ntuples, sizeof(int));
61 memcpy(node + len, oids, ntuples*sizeof(unsigned int));
62 len += ntuples * sizeof(unsigned int);
63 memcpy(node + len, endoffsets, ntuples*sizeof(short));
64 len += ntuples * sizeof(short);
65 memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
66 /* Lock and insert into primary prefetch queue */
67 pthread_mutex_lock(&pqueue.qlock);
68 enqueue((prefetchqelem_t *)node);
69 pthread_cond_signal(&pqueue.qcond);
70 pthread_mutex_unlock(&pqueue.qlock);
73 /* This function initiates the prefetch thread
74 * A queue is shared between the main thread of execution
75 * and the prefetch thread to process the prefetch call
76 * Call from compiler populates the shared queue with prefetch requests while prefetch thread
77 * processes the prefetch requests */
80 //Create and initialize prefetch cache structure
81 prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
82 //Create prefetch cache lookup table
83 if(prehashCreate(HASH_SIZE, LOADFACTOR))
85 //Initialize primary shared queue
87 //Create the primary prefetch thread
88 pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
89 //Create and Initialize a pool of threads
90 for(t = 0; t< NUM_THREADS; t++) {
91 //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
93 printf("Thread create error %s, %d\n", __FILE__, __LINE__);
99 /* This function stops the threads spawned */
102 pthread_cancel(tPrefetch);
103 for(t = 0; t < NUM_THREADS; t++)
104 pthread_cancel(wthreads[t]);
109 /* This functions inserts randowm wait delays in the order of msec
110 * Mostly used when transaction commits retry*/
111 void randomdelay(void)
113 struct timespec req, rem;
118 req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
119 nanosleep(&req, &rem);
123 /* This function initializes things required in the transaction start*/
124 transrecord_t *transStart()
126 transrecord_t *tmp = malloc(sizeof(transrecord_t));
127 tmp->cache = objstrCreate(1048576);
128 tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
133 /* This function finds the location of the objects involved in a transaction
134 * and returns the pointer to the object if found in a remote location */
135 objheader_t *transRead(transrecord_t *record, unsigned int oid)
137 unsigned int machinenumber;
138 objheader_t *tmp, *objheader;
142 /* Search local cache */
143 if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
145 } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
146 /* Look up in machine lookup table and copy into cache*/
147 // tmp = mhashSearch(oid);
148 size = sizeof(objheader_t)+classsize[tmp->type];
149 objcopy = objstrAlloc(record->cache, size);
150 memcpy(objcopy, (void *)objheader, size);
151 /* Insert into cache's lookup table */
152 chashInsert(record->lookupTable, objheader->oid, objcopy);
154 } else { /* If not found in machine look up */
155 /* Get the object from the remote location */
156 machinenumber = lhashSearch(oid);
157 objcopy = getRemoteObj(record, machinenumber, oid);
158 if(objcopy == NULL) {
159 //If object is not found in Remote location
160 //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
164 //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
170 /* This function creates objects in the transaction record */
171 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
173 objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
174 tmp->oid = getNewOID();
177 tmp->rcount = 0; //? not sure how to handle this yet
180 chashInsert(record->lookupTable, tmp->oid, tmp);
184 /* This function creates machine piles based on all machines involved in a
185 * transaction commit request */
186 plistnode_t *createPiles(transrecord_t *record) {
188 unsigned int size;/* Represents number of bins in the chash table */
189 chashlistnode_t *curr, *ptr, *next;
190 plistnode_t *pile = NULL;
191 unsigned int machinenum;
192 void *localmachinenum;
193 objheader_t *headeraddr;
195 ptr = record->lookupTable->table;
196 size = record->lookupTable->size;
198 for(i = 0; i < size ; i++) {
200 /* Inner loop to traverse the linked list of the cache lookupTable */
201 while(curr != NULL) {
202 //if the first bin in hash table is empty
207 //Get machine location for object id
209 if ((machinenum = lhashSearch(curr->key)) == 0) {
210 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
214 if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
215 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
218 //Make machine groups
219 if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
220 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
224 /* Check if local or not */
225 if((localmachinenum = mhashSearch(curr->key)) != NULL) {
226 pile->local = 1; //True i.e. local
235 /* This function initiates the transaction commit process
236 * Spawns threads for each of the new connections with Participants
237 * and creates new piles by calling the createPiles(),
238 * Fills the piles with necesaary information and
239 * Sends a transrequest() to each pile*/
240 int transCommit(transrecord_t *record) {
241 unsigned int tot_bytes_mod, *listmid;
244 int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0;
245 char buffer[RECEIVE_BUFFER_SIZE],control;
246 char transid[TID_LEN];
247 trans_req_data_t *tosend;
248 trans_commit_data_t transinfo;
249 static int newtid = 0;
250 char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
253 /* Look through all the objects in the transaction record and make piles
254 * for each machine involved in the transaction*/
255 pile = createPiles(record);
257 /* Create the packet to be sent in TRANS_REQUEST */
259 /* Count the number of participants */
260 pilecount = pCount(pile);
262 /* Create a list of machine ids(Participants) involved in transaction */
263 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
264 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
267 pListMid(pile, listmid);
270 /* Initialize thread variables,
271 * Spawn a thread for each Participant involved in a transaction */
272 pthread_t thread[pilecount];
274 pthread_cond_t tcond;
275 pthread_mutex_t tlock;
276 pthread_mutex_t tlshrd;
278 thread_data_array_t *thread_data_array;
279 thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
280 local_thread_data_array_t *ltdata;
281 if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
282 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
286 thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
288 /* Initialize and set thread detach attribute */
289 pthread_attr_init(&attr);
290 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
291 pthread_mutex_init(&tlock, NULL);
292 pthread_cond_init(&tcond, NULL);
294 /* Process each machine pile */
295 while(pile != NULL) {
296 //Create transaction id
298 //trans_req_data_t *tosend;
299 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
300 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
303 tosend->f.control = TRANS_REQUEST;
304 sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
305 tosend->f.mcount = pilecount;
306 tosend->f.numread = pile->numread;
307 tosend->f.nummod = pile->nummod;
308 tosend->f.sum_bytes = pile->sum_bytes;
309 tosend->listmid = listmid;
310 tosend->objread = pile->objread;
311 tosend->oidmod = pile->oidmod;
312 thread_data_array[threadnum].thread_id = threadnum;
313 thread_data_array[threadnum].mid = pile->mid;
314 thread_data_array[threadnum].pilecount = pilecount;
315 thread_data_array[threadnum].buffer = tosend;
316 thread_data_array[threadnum].recvmsg = rcvd_control_msg;
317 thread_data_array[threadnum].threshold = &tcond;
318 thread_data_array[threadnum].lock = &tlock;
319 thread_data_array[threadnum].count = &trecvcount;
320 thread_data_array[threadnum].replyctrl = &treplyctrl;
321 thread_data_array[threadnum].replyretry = &treplyretry;
322 thread_data_array[threadnum].rec = record;
323 /* If local do not create any extra connection */
324 if(pile->local != 1) { /* Not local */
325 rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);
327 perror("Error in pthread create\n");
331 /*Unset the pile->local flag*/
333 /*Set flag to identify that Local machine is involved*/
334 ltdata->tdata = &thread_data_array[threadnum];
335 ltdata->transinfo = &transinfo;
336 val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
338 perror("Error in pthread create\n");
346 /* Free attribute and wait for the other threads */
347 pthread_attr_destroy(&attr);
348 for (i = 0 ;i < pilecount ; i++) {
349 rc = pthread_join(thread[i], NULL);
352 printf("ERROR return code from pthread_join() is %d\n", rc);
358 pthread_cond_destroy(&tcond);
359 pthread_mutex_destroy(&tlock);
363 free(thread_data_array);
366 /* Retry trans commit procedure if not sucessful in the first try */
367 if(treplyretry == 1) {
368 /* wait a random amount of time */
371 /* Retry the commiting transaction again */
378 /* This function sends information involved in the transaction request and
379 * accepts a response from particpants.
380 * It calls decideresponse() to decide on what control message
381 * to send next and sends the message using sendResponse()*/
382 void *transRequest(void *threadarg) {
384 struct sockaddr_in serv_addr;
385 struct hostent *server;
386 thread_data_array_t *tdata;
387 objheader_t *headeraddr;
388 char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
389 char machineip[16], retval;
391 tdata = (thread_data_array_t *) threadarg;
393 /* Send Trans Request */
394 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
395 perror("Error in socket for TRANS_REQUEST\n");
398 bzero((char*) &serv_addr, sizeof(serv_addr));
399 serv_addr.sin_family = AF_INET;
400 serv_addr.sin_port = htons(LISTEN_PORT);
401 midtoIP(tdata->mid,machineip);
402 machineip[15] = '\0';
403 serv_addr.sin_addr.s_addr = inet_addr(machineip);
404 /* Open Connection */
405 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
406 perror("Error in connect for TRANS_REQUEST\n");
410 printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
411 /* Send bytes of data with TRANS_REQUEST control message */
412 if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
413 perror("Error sending fixed bytes for thread\n");
416 /* Send list of machines involved in the transaction */
418 int size=sizeof(unsigned int)*tdata->pilecount;
419 if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
420 perror("Error sending list of machines for thread\n");
424 /* Send oids and version number tuples for objects that are read */
426 int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
427 if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
428 perror("Error sending tuples for thread\n");
432 /* Send objects that are modified */
433 for(i = 0; i < tdata->buffer->f.nummod ; i++) {
435 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
436 size=sizeof(objheader_t)+classsize[headeraddr->type];
437 if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
438 perror("Error sending obj modified for thread\n");
443 /* Read control message from Participant */
444 if((n = read(sd, &control, sizeof(char))) <= 0) {
445 perror("Error in reading control message from Participant\n");
448 recvcontrol = control;
450 /* Update common data structure and increment count */
451 tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
453 /* Lock and update count */
454 //Thread sleeps until all messages from pariticipants are received by coordinator
455 pthread_mutex_lock(tdata->lock);
457 (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
459 /* Wake up the threads and invoke decideResponse (once) */
460 if(*(tdata->count) == tdata->pilecount) {
461 if (decideResponse(tdata) != 0) {
462 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
463 pthread_mutex_unlock(tdata->lock);
467 pthread_cond_broadcast(tdata->threshold);
469 pthread_cond_wait(tdata->threshold, tdata->lock);
471 pthread_mutex_unlock(tdata->lock);
473 /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
474 * to all participants in their respective socket */
475 if (sendResponse(tdata, sd) == 0) {
476 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
477 pthread_mutex_unlock(tdata->lock);
482 /* Close connection */
487 /* This function decides the reponse that needs to be sent to
488 * all Participant machines involved in the transaction commit */
489 int decideResponse(thread_data_array_t *tdata) {
491 int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
494 //Check common data structure
495 for (i = 0 ; i < tdata->pilecount ; i++) {
496 /*Switch on response from Participant */
497 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
498 written onto the shared array */
501 printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
506 printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
510 case TRANS_SOFT_ABORT:
511 printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
515 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
520 /* Decide what control message to send to Participant */
521 if(transdisagree > 0) {
523 *(tdata->replyctrl) = TRANS_ABORT;
524 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
525 objstrDelete(tdata->rec->cache);
526 chashDelete(tdata->rec->lookupTable);
528 } else if(transagree == tdata->pilecount){
530 *(tdata->replyctrl) = TRANS_COMMIT;
531 printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
532 objstrDelete(tdata->rec->cache);
533 chashDelete(tdata->rec->lookupTable);
535 } else if(transsoftabort > 0 && transdisagree == 0) {
536 /* Send Abort in soft abort case followed by retry commiting transaction again*/
537 *(tdata->replyctrl) = TRANS_ABORT;
538 *(tdata->replyretry) = 1;
539 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
541 printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
547 /* This function sends the final response to all threads in their respective socket id */
548 char sendResponse(thread_data_array_t *tdata, int sd) {
549 int n, N, sum, oidcount = 0;
550 char *ptr, retval = 0;
551 unsigned int *oidnotfound;
553 /* If the decided response is due to a soft abort and missing objects at the Participant's side */
554 if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
555 /* Read list of objects missing */
556 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
557 N = oidcount * sizeof(unsigned int);
558 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
559 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
561 ptr = (char *) oidnotfound;
563 n = read(sd, ptr+sum, N-sum);
565 } while(sum < N && n !=0);
567 retval = TRANS_SOFT_ABORT;
569 /* If the decided response is TRANS_ABORT */
570 if(*(tdata->replyctrl) == TRANS_ABORT) {
571 retval = TRANS_ABORT;
572 } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
573 retval = TRANS_COMMIT;
575 /* Send response to the Participant */
576 if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
577 perror("Error sending ctrl message for participant\n");
583 /* This function opens a connection, places an object read request to the
584 * remote machine, reads the control message and object if available and
585 * copies the object and its header to the local cache.
586 * TODO replace mnum and midtoIP() with MACHINE_IP address later */
588 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
590 struct sockaddr_in serv_addr;
591 struct hostent *server;
597 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
598 perror("Error in socket\n");
601 bzero((char*) &serv_addr, sizeof(serv_addr));
602 serv_addr.sin_family = AF_INET;
603 serv_addr.sin_port = htons(LISTEN_PORT);
604 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
605 midtoIP(mnum,machineip);
606 machineip[15] = '\0';
607 serv_addr.sin_addr.s_addr = inet_addr(machineip);
608 /* Open connection */
609 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
610 perror("Error in connect\n");
613 char readrequest[sizeof(char)+sizeof(unsigned int)];
614 readrequest[0] = READ_REQUEST;
615 *((unsigned int *)(&readrequest[1])) = oid;
616 if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
617 perror("Error sending message\n");
622 printf("DEBUG -> ready to rcv ...\n");
624 /* Read response from the Participant */
625 if((val = read(sd, &control, sizeof(char))) <= 0) {
626 perror("No control response for getRemoteObj sent\n");
630 case OBJECT_NOT_FOUND:
631 printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
634 /* Read object if found into local cache */
635 if((val = read(sd, &size, sizeof(int))) <= 0) {
636 perror("No size is read from the participant\n");
639 objcopy = objstrAlloc(record->cache, size);
640 if((val = read(sd, objcopy, size)) <= 0) {
641 perror("No objects are read from the remote participant\n");
644 /* Insert into cache's lookup table */
645 chashInsert(record->lookupTable, oid, objcopy);
648 printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
651 /* Close connection */
656 /*This function handles the local trans requests involved in a transaction commiting process
657 * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
658 * Activates the other nonlocal threads that are waiting for the decision and the
659 * based on common decision by all groups involved in the transaction it
660 * either commits or aborts the transaction.
661 * It also frees the calloced memory resources
664 void *handleLocalReq(void *threadarg) {
667 char control = 0, *ptr;
669 unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
671 objheader_t *headptr;
672 local_thread_data_array_t *localtdata;
674 localtdata = (local_thread_data_array_t *) threadarg;
676 /* Counters and arrays to formulate decision on control message to be sent */
677 oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
678 oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
679 oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
680 int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
681 int objmodnotfound = 0, nummodfound = 0;
683 /* modptr points to the beginning of the object store
684 * created at the Pariticipant */
685 if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
686 printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
692 /* Process each oid in the machine pile/ group per thread */
693 for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
694 if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
695 int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
697 oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
698 incr += sizeof(unsigned int);
699 version = *((short *)(localtdata->tdata->buffer->objread + incr));
700 } else {//Objs modified
701 headptr = (objheader_t *) ptr;
703 oidmod[objmod] = oid;//Array containing modified oids
705 version = headptr->version;
706 ptr += sizeof(objheader_t) + classsize[headptr->type];
709 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
711 /* Save the oids not found and number of oids not found for later use */
712 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
713 /* Save the oids not found and number of oids not found for later use */
715 oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
717 } else { /* If Obj found in machine (i.e. has not moved) */
718 /* Check if Obj is locked by any previous transaction */
719 if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
720 if (version == ((objheader_t *)mobj)->version) { /* If not locked then match versions */
722 } else {/* If versions don't match ...HARD ABORT */
724 /* Send TRANS_DISAGREE to Coordinator */
725 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
726 printf("DEBUG -> Sending TRANS_DISAGREE\n");
727 //return tdata->recvmsg[tdata->thread_id].rcv_status;
729 } else {/* If Obj is not locked then lock object */
730 ((objheader_t *)mobj)->status |= LOCK;
731 //TODO Remove this for Testing
734 /* Save all object oids that are locked on this machine during this transaction request call */
735 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
737 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
739 } else { /* If versions don't match ...HARD ABORT */
741 /* Send TRANS_DISAGREE to Coordinator */
742 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
743 printf("DEBUG -> Sending TRANS_DISAGREE\n");
744 // return tdata->recvmsg[tdata->thread_id].rcv_status;
750 /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
752 /* Condition to send TRANS_AGREE */
753 if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
754 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
755 printf("DEBUG -> Sending TRANS_AGREE\n");
757 /* Condition to send TRANS_SOFT_ABORT */
758 if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
759 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
760 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
761 /* Send number of oids not found and the missing oids if objects are missing in the machine */
762 /* TODO Remember to store the oidnotfound for later use
763 if(objnotfound != 0) {
764 int size = sizeof(unsigned int)* objnotfound;
769 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
770 * if Participant receives a TRANS_COMMIT */
771 localtdata->transinfo->objmod = oidmod;
772 localtdata->transinfo->objlocked = oidlocked;
773 localtdata->transinfo->objnotfound = oidnotfound;
774 localtdata->transinfo->modptr = modptr;
775 localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
776 localtdata->transinfo->numlocked = objlocked;
777 localtdata->transinfo->numnotfound = objnotfound;
779 /*Set flag to show that common data structure for this individual thread has been written to */
780 //*(tdata->localstatus) |= LM_UPDATED;
782 /* Lock and update count */
783 //Thread sleeps until all messages from pariticipants are received by coordinator
784 pthread_mutex_lock(localtdata->tdata->lock);
785 (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
787 /* Wake up the threads and invoke decideResponse (once) */
788 if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) {
789 if (decideResponse(localtdata->tdata) != 0) {
790 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
791 pthread_mutex_unlock(localtdata->tdata->lock);
794 pthread_cond_broadcast(localtdata->tdata->threshold);
796 pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
798 pthread_mutex_unlock(localtdata->tdata->lock);
800 /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
801 if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
802 if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
803 printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
806 }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
807 if(transComProcess(localtdata->transinfo) != 0) {
808 printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
814 printf("DEBUG -> Freeing...\n");
816 if (localtdata->transinfo->objmod != NULL) {
817 free(localtdata->transinfo->objmod);
818 localtdata->transinfo->objmod = NULL;
820 if (localtdata->transinfo->objlocked != NULL) {
821 free(localtdata->transinfo->objlocked);
822 localtdata->transinfo->objlocked = NULL;
824 if (localtdata->transinfo->objnotfound != NULL) {
825 free(localtdata->transinfo->objnotfound);
826 localtdata->transinfo->objnotfound = NULL;
831 /* This function completes the ABORT process if the transaction is aborting
833 int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
836 objheader_t *tmp_header;
839 printf("DEBUG -> Recv TRANS_ABORT\n");
840 /* Set all ref counts as 1 and do garbage collection */
842 for(i = 0; i< nummod; i++) {
843 tmp_header = (objheader_t *)ptr;
844 tmp_header->rcount = 1;
845 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
847 /* Unlock objects that was locked due to this transaction */
848 for(i = 0; i< numlocked; i++) {
849 header = mhashSearch(objlocked[i]);// find the header address
850 ((objheader_t *)header)->status &= ~(LOCK);
852 //TODO/* Unset the bit for local objects */
854 /* Send ack to Coordinator */
855 printf("DEBUG-> TRANS_SUCCESSFUL\n");
857 /*Free the pointer */
862 /*This function completes the COMMIT process is the transaction is commiting
864 int transComProcess(trans_commit_data_t *transinfo) {
866 int i = 0, offset = 0;
869 printf("DEBUG -> Recv TRANS_COMMIT\n");
870 /* Process each modified object saved in the mainobject store */
871 for(i=0; i<transinfo->nummod; i++) {
872 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
873 printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
875 /* Change reference count of older address and free space in objstr ?? */
876 header->rcount = 1; //TODO Not sure what would be the val
878 /* Change ptr address in mhash table */
879 mhashRemove(transinfo->objmod[i]);
880 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
881 offset += sizeof(objheader_t) + classsize[header->type];
883 /* Update object version number */
884 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
885 header->version += 1;
888 /* Unlock locked objects */
889 for(i=0; i<transinfo->numlocked; i++) {
890 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
891 header->status &= ~(LOCK);
894 //TODO Update location lookup table
895 //TODO/* Unset the bit for local objects */
897 /* Send ack to Coordinator */
898 printf("DEBUG-> TRANS_SUCESSFUL\n");
902 /* This function is called by the thread calling transPrefetch */
903 void *transPrefetch(void *prefdata) {
904 int *offstarray = NULL;
905 prefetchqelem_t *qnode;
908 /* lock mutex of primary prefetch queue */
909 pthread_mutex_lock(&pqueue.qlock);
910 /* while primary queue is empty, then wait */
911 while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
912 pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
915 /* dequeue node to create a machine piles and finally unlock mutex */
916 if((qnode = dequeue()) == NULL) {
917 printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
920 pthread_mutex_unlock(&pqueue.qlock);
921 /* Reduce redundant prefetch requests */
922 /* Group Requests by where objects are located */
929 /* This function checks if the prefetch oids are same and have same offsets
930 * for case x.a.b and y.a.b where x and y have same oid's
931 * or if a.b.c is a subset of x.b.c.d*/
932 /* check for case where the generated request a.y.z or x.y.z.g then
933 * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/
934 void checkPrefetchTuples(prefetchqelem_t *node) {
935 int i,j, count,k, sindex, index;
937 int ntuples, slength;
939 short *endoffsets, *arryfields;
941 /* Check for the case x.y.z and a.b.c are same oids */
943 ntuples = *(GET_NTUPLES(ptr));
944 oid = GET_PTR_OID(ptr);
945 endoffsets = GET_PTR_EOFF(ptr, ntuples);
946 arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
947 /* Find offset length for each tuple */
948 int numoffset[ntuples];
949 numoffset[0] = endoffsets[0];
950 for(i = 1; i<ntuples; i++) {
951 numoffset[i] = endoffsets[i] - endoffsets[i-1];
953 /* Check for redundant tuples by comparing oids of each tuple */
954 for(i = 0; i < ntuples; i++) {
957 for(j = i+1 ; j < ntuples; j++) {
960 /*If oids of tuples match */
961 if (oid[i] == oid[j]) {
962 /* Find the smallest offset length of two tuples*/
963 if(numoffset[i] > numoffset[j]){
964 slength = numoffset[j];
968 slength = numoffset[i];
972 /* Compare the offset values based on the current indices
973 * break if they do not match
974 * if all offset values match then pick the largest tuple*/
978 index = endoffsets[j -1];
979 for(count = 0; count < slength; count ++) {
980 if (arryfields[k] != arryfields[index]) {
987 printf("i = %d, j = %d\n", i, j);
989 index = endoffsets[j-1];
990 printf("Value of slength = %d\n", slength);
991 for(count = 0; count < slength; count++) {
992 printf("Value of count =%d\n", count);
993 if(arryfields[k] != arryfields[index]) {
999 printf("Value of count =%d\n", count);
1001 printf("The value of sindex = %d\n", sindex);
1003 if(slength == count) {
1004 printf("DEBUG-> Inside slength if %d\n", sindex);
1012 void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) {
1014 int ntuples, i, k, flag;
1016 short *endoffsets, *arryfields;
1017 objheader_t *header;
1019 ptr = (char *) node;
1020 ntuples = *(GET_NTUPLES(ptr));
1021 oid = GET_PTR_OID(ptr);
1022 endoffsets = GET_PTR_EOFF(ptr, ntuples);
1023 arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1025 if(oidnfound == 1) {
1026 if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
1028 } else { //Found in Prefetch Cache
1029 //TODO Decide if object is too old, if old remove from cache
1030 tmp = (char *) header;
1031 /* Check if any of the offset oid is available in the Prefetch cache */
1032 for(i = counter; i < loopcount; i++) {
1033 objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]);
1034 if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1043 for(i = counter; i<loopcount; i++) {
1044 if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1045 tmp = (char *) header;
1046 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1056 /* If oid not found locally or in prefetch cache then
1057 * assign the latest oid found as the new oid
1058 * and copy left over offsets into the arrayoffsetfieldarray*/
1060 numoffset[iter] = numoffset[iter] - (i+1);
1062 endoffsets[iter] = numoffset[iter];
1064 endoffsets[iter] = numoffset[iter] + endoffsets[iter - 1];
1065 for(k = 0; k < numoffset[iter] ; k++) {
1066 arryfields[k] = arryfields[counter+1];
1072 numoffset[iter] = 0;
1073 endoffsets[iter] = 0;
1077 /* This function checks if the oids within the prefetch tuples are available locally.
1078 * If yes then makes the tuple invalid. If no then rearranges oid and offset values in
1079 * the prefetchqelem_t node to represent a new prefetch tuple */
1080 void foundLocal(prefetchqelem_t *node) {
1081 int ntuples,i, j, k, oidnfound = 0, index, flag;
1083 unsigned int objoid;
1085 objheader_t *objheader;
1086 short *endoffsets, *arryfields;
1088 ptr = (char *) node;
1089 ntuples = *(GET_NTUPLES(ptr));
1090 oid = GET_PTR_OID(ptr);
1091 endoffsets = GET_PTR_EOFF(ptr, ntuples);
1092 arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1093 /* Find offset length for each tuple */
1094 int numoffset[ntuples];//Number of offsets for each tuple
1095 numoffset[0] = endoffsets[0];
1096 for(i = 1; i<ntuples; i++) {
1097 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1099 for(i = 0; i < ntuples; i++) {
1102 /* If object found locally */
1103 if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) {
1105 tmp = (char *) objheader;
1106 /* Find the oid of its offset value */
1110 index = endoffsets[i - 1];
1111 for(j = 0 ; j < numoffset[i] ; j++) {
1112 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1114 /*New offset oid not found */
1115 if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
1117 checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound);
1122 /*If oid not found locally then
1123 *assign the latest oid found as the new oid
1124 *and copy left over offsets into the arrayoffsetfieldarray*/
1126 numoffset[i] = numoffset[i] - (j+1);
1128 endoffsets[i] = numoffset[i];
1130 endoffsets[i] = numoffset[i] - endoffsets[i - 1];
1131 for(k = 0; k < numoffset[i]; k++) {
1132 arryfields[k] = arryfields[j+1];
1135 /*If all offset oids are found locally,make the prefetch tuple invalid */
1143 /* Look in Prefetch cache */
1144 checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound);
1150 void makePreGroups(prefetchqelem_t *node) {
1152 int ntuples, slength, i, machinenum;
1154 short *endoffsets, *arryfields;
1157 /* Check for the case x.y.z and a.b.c are same oids */
1158 ptr = (char *) node;
1159 ntuples = *(GET_NTUPLES(ptr));
1160 oid = GET_PTR_OID(ptr);
1161 endoffsets = GET_PTR_EOFF(ptr, ntuples);
1162 arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1163 /* Find offset length for each tuple */
1164 int numoffset[ntuples];
1165 numoffset[0] = endoffsets[0];
1166 for(i = 1; i<ntuples; i++) {
1167 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1170 /* Check for redundant tuples by comparing oids of each tuple */
1171 for(i = 0; i < ntuples; i++) {
1174 /* For each tuple make piles */
1175 if ((machinenum = lhashSearch(oid[i])) == 0) {
1176 printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
1183 /*This function is called by the thread that processes the
1184 * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
1185 int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
1187 int arraylength[numoids];
1188 unsigned int machinenumber;
1189 objheader_t *tmp, *objheader;
1192 pthread_attr_t attr;
1194 /* Given tuple find length of tuple*/
1195 for(i = 0; i < numoids ; i++) {
1196 arraylength[i] = arrayLength(arrayofoffset[i]);
1199 /* Initialize and set thread attributes
1200 * Spawn a thread for each prefetch request sent*/
1201 pthread_t thread[numoids];
1202 pthread_attr_init(&attr);
1203 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1205 /* Create Machine Piles to send prefetch requests use threads*/
1206 for( i = 0 ; i< numoids ; i++) {
1207 if(arrayofoffset[i][0] == -1)
1210 /* For each Pile in the machine send TRANS_PREFETCH */
1211 //makePiles(arrayofoffset, numoids);
1212 /* Fill thread data structure */
1213 rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
1215 perror("Error in pthread create at transPrefetchProcess()\n");
1222 /* Free attribute and wait to join other threads */
1223 for (i = 0 ;i < numoids ; i++) {
1224 rc = pthread_join(thread[i], NULL);
1226 perror("Error pthread_join() in transPrefetchProcess()\n");
1230 pthread_attr_destroy(&attr);
1236 void *sendPrefetchReq(void *prefetchtuple) {
1238 struct sockaddr_in serv_addr;
1239 struct hostent *server;
1240 char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
1241 char machineip[16], retval;
1244 /* Send Trans Prefetch Request */
1245 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1246 perror("Error in socket for TRANS_REQUEST\n");
1249 bzero((char*) &serv_addr, sizeof(serv_addr));
1250 serv_addr.sin_family = AF_INET;
1251 serv_addr.sin_port = htons(LISTEN_PORT);
1252 //midtoIP(tdata->mid,machineip);
1253 // machineip[15] = '\0';
1254 // serv_addr.sin_addr.s_addr = inet_addr(machineip);
1256 /* Open Connection */
1257 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
1258 perror("Error in connect for TRANS_REQUEST\n");