#define RECEIVE_BUFFER_SIZE 2048
extern int classsize[];
+objstr_t *mainobjstore;
plistnode_t *createPiles(transrecord_t *);
+
/* This functions inserts randowm wait delays in the order of msec */
void randomdelay(void)
{
void *buf;
/* Search local cache */
if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+ //LOCAL Object
+ objheader->status |= LOCAL;
//printf("DEBUG -> transRead oid %d found local\n", oid);
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
size = sizeof(objheader_t)+classsize[tmp->type];
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)tmp, size);
+ //LOCAL Object
+ ((objheader_t *) objcopy)->status |= LOCAL;
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, objheader->oid, objcopy);
return(objcopy);
chashInsert(record->lookupTable, tmp->oid, tmp);
return tmp;
}
-
+/* This function creates machine piles based on all machines involved in a
+ * transaction commit request */
plistnode_t *createPiles(transrecord_t *record) {
int i = 0;
unsigned int size;/* Represents number of bins in the chash table */
printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
return NULL;
}
-
+
if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
return NULL;
printf("pInsert error %s, %d\n", __FILE__, __LINE__);
return NULL;
}
+ /* Check if local */
+ if((headeraddr->status & LOCAL) == LOCAL) {
+ pile->local = 1; //True i.e. local
+ }
curr = next;
}
}
}
/* This function initiates the transaction commit process
* Spawns threads for each of the new connections with Participants
- * by creating new piles,
+ * and creates new piles by calling the createPiles(),
* Fills the piles with necesaary information and
* Sends a transrequest() to each pile*/
int transCommit(transrecord_t *record) {
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile;
- int i, rc;
- int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
+ int i, rc, val;
+ int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0;
char buffer[RECEIVE_BUFFER_SIZE],control;
char transid[TID_LEN];
trans_req_data_t *tosend;
+ trans_commit_data_t transinfo;
static int newtid = 0;
char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
+ char localstat = 0;
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
thread_data_array_t *thread_data_array;
thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
+ local_thread_data_array_t *ltdata;
+ if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
/* Initialize and set thread detach attribute */
sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
tosend->f.mcount = pilecount;
tosend->f.numread = pile->numread;
+ printf("DEBUG-> pile numread = %d\n", pile->numread);
tosend->f.nummod = pile->nummod;
tosend->f.sum_bytes = pile->sum_bytes;
tosend->listmid = listmid;
tosend->objread = pile->objread;
tosend->oidmod = pile->oidmod;
- thread_data_array[numthreads].thread_id = numthreads;
- thread_data_array[numthreads].mid = pile->mid;
- thread_data_array[numthreads].pilecount = pilecount;
- thread_data_array[numthreads].buffer = tosend;
- thread_data_array[numthreads].recvmsg = rcvd_control_msg;
- thread_data_array[numthreads].threshold = &tcond;
- thread_data_array[numthreads].lock = &tlock;
- thread_data_array[numthreads].count = &trecvcount;
- thread_data_array[numthreads].replyctrl = &treplyctrl;
- thread_data_array[numthreads].replyretry = &treplyretry;
- thread_data_array[numthreads].rec = record;
-
- rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
- if (rc) {
- perror("Error in pthread create");
- return 1;
- }
- numthreads++;
- //TODO frees
+ thread_data_array[threadnum].thread_id = threadnum;
+ thread_data_array[threadnum].mid = pile->mid;
+ thread_data_array[threadnum].pilecount = pilecount;
+ thread_data_array[threadnum].buffer = tosend;
+ thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+ thread_data_array[threadnum].threshold = &tcond;
+ thread_data_array[threadnum].lock = &tlock;
+ thread_data_array[threadnum].count = &trecvcount;
+ //thread_data_array[threadnum].localstatus = &localstat;
+ thread_data_array[threadnum].replyctrl = &treplyctrl;
+ thread_data_array[threadnum].replyretry = &treplyretry;
+ thread_data_array[threadnum].rec = record;
+ /* If local do not create any extra connection */
+ if(pile->local != 1) {
+ rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);
+ if (rc) {
+ perror("Error in pthread create\n");
+ return 1;
+ }
+ } else {
+ /*Unset the pile->local flag*/
+ pile->local = 0;
+ //header->status &= ~(LOCK);
+ /*Handle request of local pile */
+ /*Set flag to identify that Local machine is involved*/
+ ltdata->tdata = &thread_data_array[threadnum];
+ printf("DEBUG->Address of ltdata sent = %x\n", <data);
+ ltdata->transinfo = &transinfo;
+ printf("DEBUG-> Machine Pile numread = %d\n", ltdata->tdata->buffer->f.numread);
+ val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) <data);
+ if (val) {
+ perror("Error in pthread create\n");
+ return 1;
+ }
+ }
+ threadnum++;
pile = pile->next;
}
free(tosend);
free(listmid);
pDelete(pile);
+ free(thread_data_array);
+ free(ltdata);
/* Retry trans commit procedure if not sucessful in the first try */
if(treplyretry == 1) {
(*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
/* Wake up the threads and invoke decideResponse (once) */
+/*
+ if((*(tdata->localstatus) & LM_EXISTS) == LM_EXISTS) { //If there is a local machine involved in the transaction
+ if(*(tdata->count) == tdata->pilecount - 1) {
+ while(*(tdata->localstatus) & LM_UPDATED != LM_UPDATED) {
+ ;//Do nothing and wait until Local machine thread updates the common data structure
+ }
+ if(decideResponse(tdata) != 0) {
+ printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+ return NULL;
+ }
+ pthread_cond_broadcast(tdata->threshold);
+ }
+ } else if ((*(tdata->localstatus) & LM_EXISTS) == 0) { //No local m/c involved in transaction
+ if(*(tdata->count) == tdata->pilecount) {
+ if (decideResponse(tdata) != 0) {
+ printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+ close(sd);
+ return NULL;
+ }
+ pthread_cond_broadcast(tdata->threshold);
+ } else {
+ pthread_cond_wait(tdata->threshold, tdata->lock);
+ }
+ }
+*/
+
if(*(tdata->count) == tdata->pilecount) {
if (decideResponse(tdata) != 0) {
printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
pthread_cond_broadcast(tdata->threshold);
} else {
pthread_cond_wait(tdata->threshold, tdata->lock);
- }
-
+ }
pthread_mutex_unlock(tdata->lock);
/* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
/* If the decided response is TRANS_ABORT */
if(*(tdata->replyctrl) == TRANS_ABORT) {
retval = TRANS_ABORT;
- }
- /* If the decided response is TRANS_COMMIT */
- if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
retval = TRANS_COMMIT;
}
/* Send response to the Participant */
close(sd);
return objcopy;
}
+
+/*This function handles the local trans requests involved in a transaction commiting process
+ * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
+ * Activates the other nonlocal threads that are waiting for the decision and the
+ * based on common decision by all groups involved in the transaction it
+ * either commits or aborts the transaction.
+ * It also frees the calloced memory resources
+ */
+
+//int handleLocalReq(thread_data_array_t *tdata, trans_commit_data_t *transinfo) {
+void *handleLocalReq(void *threadarg) {
+ int val, i = 0;
+ short version;
+ char control = 0, *ptr;
+ unsigned int oid;
+ unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
+ void *mobj, *modptr;
+ objheader_t *headptr;
+ local_thread_data_array_t *localtdata;
+
+ localtdata = (local_thread_data_array_t *) threadarg;
+ printf("DEBUG->Address of localtdata = %x\n", localtdata);
+
+ printf("DEBUG-> Machine Pile numread recv = %d\n", localtdata->tdata->buffer->f.numread);
+ /* Counters and arrays to formulate decision on control message to be sent */
+ printf("DEBUG -> %d %d\n",localtdata->tdata->buffer->f.numread, localtdata->tdata->buffer->f.nummod);
+ oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+ oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
+ int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+ int objmodnotfound = 0, nummodfound = 0;
+
+ /* modptr points to the beginning of the object store
+ * created at the Pariticipant */
+ if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
+ printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
+ ptr = modptr;
+
+ /* Process each oid in the machine pile/ group per thread */
+ for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
+ if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
+ int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
+ incr *= i;
+ oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
+ incr += sizeof(unsigned int);
+ version = *((short *)(localtdata->tdata->buffer->objread + incr));
+ } else {//Objs modified
+ headptr = (objheader_t *) ptr;
+ oid = headptr->oid;
+ oidmod[objmod] = oid;//Array containing modified oids
+ objmod++;
+ version = headptr->version;
+ ptr += sizeof(objheader_t) + classsize[headptr->type];
+ }
+
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+
+ oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+ objnotfound++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
+ if (version == ((objheader_t *)mobj)->version) { /* If not locked then match versions */
+ v_matchlock++;
+ } else {/* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ //return tdata->recvmsg[tdata->thread_id].rcv_status;
+ }
+ } else {/* If Obj is not locked then lock object */
+ ((objheader_t *)mobj)->status |= LOCK;
+ //TODO Remove this for Testing
+ randomdelay();
+
+ /* Save all object oids that are locked on this machine during this transaction request call */
+ oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
+ objlocked++;
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ v_matchnolock++;
+ } else { /* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ // return tdata->recvmsg[tdata->thread_id].rcv_status;
+ }
+ }
+ }
+ }
+
+ /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
+
+ /* Condition to send TRANS_AGREE */
+ if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
+ printf("DEBUG -> Sending TRANS_AGREE\n");
+ }
+ /* Condition to send TRANS_SOFT_ABORT */
+ if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
+ printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
+ /* Send number of oids not found and the missing oids if objects are missing in the machine */
+ /* TODO Remember to store the oidnotfound for later use
+ if(objnotfound != 0) {
+ int size = sizeof(unsigned int)* objnotfound;
+ }
+ */
+ }
+
+ /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+ * if Participant receives a TRANS_COMMIT */
+ localtdata->transinfo->objmod = oidmod;
+ localtdata->transinfo->objlocked = oidlocked;
+ localtdata->transinfo->objnotfound = oidnotfound;
+ localtdata->transinfo->modptr = modptr;
+ localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
+ localtdata->transinfo->numlocked = objlocked;
+ localtdata->transinfo->numnotfound = objnotfound;
+
+ /*Set flag to show that common data structure for this individual thread has been written to */
+ //*(tdata->localstatus) |= LM_UPDATED;
+
+ /* Lock and update count */
+ //Thread sleeps until all messages from pariticipants are received by coordinator
+ pthread_mutex_lock(localtdata->tdata->lock);
+ (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
+
+ /* Wake up the threads and invoke decideResponse (once) */
+ if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) {
+ if (decideResponse(localtdata->tdata) != 0) {
+ printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(localtdata->tdata->lock);
+ return NULL;
+ }
+ pthread_cond_broadcast(localtdata->tdata->threshold);
+ } else {
+ pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
+ }
+ pthread_mutex_unlock(localtdata->tdata->lock);
+
+ /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
+ if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
+ if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod) != 0) {
+ printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
+ if(transComProcess(localtdata->transinfo) != 0) {
+ printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ }
+
+ /* Free memory */
+ printf("DEBUG -> Freeing...\n");
+ fflush(stdout);
+ if (localtdata->transinfo->objmod != NULL) {
+ free(localtdata->transinfo->objmod);
+ localtdata->transinfo->objmod = NULL;
+ }
+ if (localtdata->transinfo->objlocked != NULL) {
+ free(localtdata->transinfo->objlocked);
+ localtdata->transinfo->objlocked = NULL;
+ }
+ if (localtdata->transinfo->objnotfound != NULL) {
+ free(localtdata->transinfo->objnotfound);
+ localtdata->transinfo->objnotfound = NULL;
+ }
+
+ pthread_exit(NULL);
+}
+/* This function completes the ABORT process if the transaction is aborting
+ */
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
+ char *ptr;
+ int i;
+ objheader_t *tmp_header;
+ void *header;
+
+ printf("DEBUG -> Recv TRANS_ABORT\n");
+ /* Set all ref counts as 1 and do garbage collection */
+ ptr = modptr;
+ for(i = 0; i< nummod; i++) {
+ tmp_header = (objheader_t *)ptr;
+ tmp_header->rcount = 1;
+ ptr += sizeof(objheader_t) + classsize[tmp_header->type];
+ }
+ /* Unlock objects that was locked due to this transaction */
+ for(i = 0; i< numlocked; i++) {
+ header = mhashSearch(objlocked[i]);// find the header address
+ ((objheader_t *)header)->status &= ~(LOCK);
+ }
+
+ /* Send ack to Coordinator */
+ printf("DEBUG-> TRANS_SUCCESSFUL\n");
+
+ /*Free the pointer */
+ ptr = NULL;
+ return 0;
+}
+
+/*This function completes the COMMIT process is the transaction is commiting
+ */
+ int transComProcess(trans_commit_data_t *transinfo) {
+ objheader_t *header;
+ int i = 0, offset = 0;
+ char control;
+
+ printf("DEBUG -> Recv TRANS_COMMIT\n");
+ /* Process each modified object saved in the mainobject store */
+ for(i=0; i<transinfo->nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ }
+ /* Change reference count of older address and free space in objstr ?? */
+ header->rcount = 1; //TODO Not sure what would be the val
+
+ /* Change ptr address in mhash table */
+ printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
+ mhashRemove(transinfo->objmod[i]);
+ mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+ offset += sizeof(objheader_t) + classsize[header->type];
+
+ /* Update object version number */
+ header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+ header->version += 1;
+ }
+
+ /* Unlock locked objects */
+ for(i=0; i<transinfo->numlocked; i++) {
+ header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+ header->status &= ~(LOCK);
+ }
+
+ //TODO Update location lookup table
+
+ /* Send ack to Coordinator */
+ printf("DEBUG-> TRANS_SUCESSFUL\n");
+ return 0;
+ }
+
+/*This function makes piles to prefetch records and prefetches the oids from remote machines */
+int transPrefetch(transrecord_t *record, trans_prefetchtuple_t *prefetchtuple){
+ /* Create Pile*/
+ /* For each Pile in the machine send TRANS_PREFETCH */
+}