sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
tosend->f.mcount = pilecount;
tosend->f.numread = pile->numread;
- printf("DEBUG-> pile numread = %d\n", pile->numread);
tosend->f.nummod = pile->nummod;
tosend->f.sum_bytes = pile->sum_bytes;
tosend->listmid = listmid;
} else {
/*Unset the pile->local flag*/
pile->local = 0;
- //header->status &= ~(LOCK);
- /*Handle request of local pile */
/*Set flag to identify that Local machine is involved*/
ltdata->tdata = &thread_data_array[threadnum];
- printf("DEBUG->Address of ltdata sent = %x\n", <data);
ltdata->transinfo = &transinfo;
- printf("DEBUG-> Machine Pile numread = %d\n", ltdata->tdata->buffer->f.numread);
- val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) <data);
+ val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
if (val) {
perror("Error in pthread create\n");
return 1;
(*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
/* Wake up the threads and invoke decideResponse (once) */
-/*
- if((*(tdata->localstatus) & LM_EXISTS) == LM_EXISTS) { //If there is a local machine involved in the transaction
- if(*(tdata->count) == tdata->pilecount - 1) {
- while(*(tdata->localstatus) & LM_UPDATED != LM_UPDATED) {
- ;//Do nothing and wait until Local machine thread updates the common data structure
- }
- if(decideResponse(tdata) != 0) {
- printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(tdata->lock);
- return NULL;
- }
- pthread_cond_broadcast(tdata->threshold);
- }
- } else if ((*(tdata->localstatus) & LM_EXISTS) == 0) { //No local m/c involved in transaction
- if(*(tdata->count) == tdata->pilecount) {
- if (decideResponse(tdata) != 0) {
- printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(tdata->lock);
- close(sd);
- return NULL;
- }
- pthread_cond_broadcast(tdata->threshold);
- } else {
- pthread_cond_wait(tdata->threshold, tdata->lock);
- }
- }
-*/
-
if(*(tdata->count) == tdata->pilecount) {
if (decideResponse(tdata) != 0) {
printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
* It also frees the calloced memory resources
*/
-//int handleLocalReq(thread_data_array_t *tdata, trans_commit_data_t *transinfo) {
void *handleLocalReq(void *threadarg) {
int val, i = 0;
short version;
local_thread_data_array_t *localtdata;
localtdata = (local_thread_data_array_t *) threadarg;
- printf("DEBUG->Address of localtdata = %x\n", localtdata);
- printf("DEBUG-> Machine Pile numread recv = %d\n", localtdata->tdata->buffer->f.numread);
/* Counters and arrays to formulate decision on control message to be sent */
- printf("DEBUG -> %d %d\n",localtdata->tdata->buffer->f.numread, localtdata->tdata->buffer->f.nummod);
oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
/*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
- if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod) != 0) {
+ if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
return NULL;
}
}
/* This function completes the ABORT process if the transaction is aborting
*/
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
char *ptr;
int i;
objheader_t *tmp_header;
header = mhashSearch(objlocked[i]);// find the header address
((objheader_t *)header)->status &= ~(LOCK);
}
+ //TODO/* Unset the bit for local objects */
/* Send ack to Coordinator */
printf("DEBUG-> TRANS_SUCCESSFUL\n");
header->rcount = 1; //TODO Not sure what would be the val
/* Change ptr address in mhash table */
- printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
mhashRemove(transinfo->objmod[i]);
mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
offset += sizeof(objheader_t) + classsize[header->type];
}
//TODO Update location lookup table
+ //TODO/* Unset the bit for local objects */
/* Send ack to Coordinator */
printf("DEBUG-> TRANS_SUCESSFUL\n");