void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
+plistnode_t *sortPiles(plistnode_t *pileptr);
/*******************************
* Send and Recv function calls
int transCommit(transrecord_t *record) {
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
- int i, j, rc, val;
- int pilecount, offset, threadnum, trecvcount;
- char control;
- char transid[TID_LEN];
- trans_req_data_t *tosend;
- trans_commit_data_t transinfo;
- static int newtid = 0;
- char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
- thread_data_array_t *thread_data_array;
- local_thread_data_array_t *ltdata;
+ int trecvcount;
+ char treplyretry; /* keeps track of the common response that needs to be sent */
int firsttime=1;
-
+ trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
+ char finalResponse;
do {
- treplyctrl=0;
trecvcount = 0;
- threadnum = 0;
treplyretry = 0;
- thread_data_array = NULL;
- ltdata = NULL;
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
- if (firsttime)
+ if (firsttime) {
pile_ptr = pile = createPiles(record);
- else
- pile=pile_ptr;
- firsttime=0;
+ pile_ptr = pile = sortPiles(pile);
+ } else {
+ pile = pile_ptr;
+ }
+ firsttime = 0;
/* Create the packet to be sent in TRANS_REQUEST */
/* Count the number of participants */
+ int pilecount;
pilecount = pCount(pile);
- /* Create a list of machine ids(Participants) involved in transaction */
+ /* Create a list of machine ids(Participants) involved in transaction */
listmid = calloc(pilecount, sizeof(unsigned int));
pListMid(pile, listmid);
-
- /* Initialize thread variables,
- * Spawn a thread for each Participant involved in a transaction */
- pthread_t thread[pilecount];
- pthread_attr_t attr;
- pthread_cond_t tcond;
- pthread_mutex_t tlock;
- pthread_mutex_t tlshrd;
-
- thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
- ltdata = calloc(1, sizeof(local_thread_data_array_t));
-
- thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
-
- /* Initialize and set thread detach attribute */
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- pthread_mutex_init(&tlock, NULL);
- pthread_cond_init(&tcond, NULL);
+ /* Create a socket and getReplyCtrl array, initialize */
+ int socklist[pilecount];
+ int loopcount;
+ for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+ socklist[loopcount] = 0;
+ char getReplyCtrl[pilecount];
+ for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+ getReplyCtrl[loopcount] = 0;
/* Process each machine pile */
+ int sockindex = 0;
+ trans_req_data_t *tosend;
+ tosend = calloc(pilecount, sizeof(trans_req_data_t));
while(pile != NULL) {
- //Create transaction id
- newtid++;
- tosend = calloc(1, sizeof(trans_req_data_t));
- tosend->f.control = TRANS_REQUEST;
- sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
- tosend->f.mcount = pilecount;
- tosend->f.numread = pile->numread;
- tosend->f.nummod = pile->nummod;
- tosend->f.numcreated = pile->numcreated;
- tosend->f.sum_bytes = pile->sum_bytes;
- tosend->listmid = listmid;
- tosend->objread = pile->objread;
- tosend->oidmod = pile->oidmod;
- tosend->oidcreated = pile->oidcreated;
- thread_data_array[threadnum].thread_id = threadnum;
- thread_data_array[threadnum].mid = pile->mid;
- thread_data_array[threadnum].buffer = tosend;
- thread_data_array[threadnum].recvmsg = rcvd_control_msg;
- thread_data_array[threadnum].threshold = &tcond;
- thread_data_array[threadnum].lock = &tlock;
- thread_data_array[threadnum].count = &trecvcount;
- thread_data_array[threadnum].replyctrl = &treplyctrl;
- thread_data_array[threadnum].replyretry = &treplyretry;
- thread_data_array[threadnum].rec = record;
- /* If local do not create any extra connection */
- if(pile->mid != myIpAddr) { /* Not local */
- do {
- rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
- } while(rc!=0);
- if(rc) {
- perror("Error in pthread create\n");
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
+ tosend[sockindex].f.control = TRANS_REQUEST;
+ tosend[sockindex].f.mcount = pilecount;
+ tosend[sockindex].f.numread = pile->numread;
+ tosend[sockindex].f.nummod = pile->nummod;
+ tosend[sockindex].f.numcreated = pile->numcreated;
+ tosend[sockindex].f.sum_bytes = pile->sum_bytes;
+ tosend[sockindex].listmid = listmid;
+ tosend[sockindex].objread = pile->objread;
+ tosend[sockindex].oidmod = pile->oidmod;
+ tosend[sockindex].oidcreated = pile->oidcreated;
+ int sd = 0;
+ if(pile->mid != myIpAddr) {
+ if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
+ printf("transRequest(): socket create error\n");
free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
- free(thread_data_array);
- free(ltdata);
+ free(tosend);
return 1;
}
- } else { /*Local*/
- ltdata->tdata = &thread_data_array[threadnum];
- ltdata->transinfo = &transinfo;
- do {
- val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
- } while(val!=0);
- if(val) {
- perror("Error in pthread create\n");
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
+ socklist[sockindex] = sd;
+ /* Send bytes of data with TRANS_REQUEST control message */
+ send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+
+ /* Send list of machines involved in the transaction */
+ {
+ int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
+ send_data(sd, tosend[sockindex].listmid, size);
+ }
+
+ /* Send oids and version number tuples for objects that are read */
+ {
+ int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
+ send_data(sd, tosend[sockindex].objread, size);
+ }
+
+ /* Send objects that are modified */
+ void *modptr;
+ if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
+ printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
- free(thread_data_array);
- free(ltdata);
+ free(tosend);
return 1;
}
+ int offset = 0;
+ int i;
+ for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
+ int size;
+ objheader_t *headeraddr;
+ if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) {
+ printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
+ free(modptr);
+ free(listmid);
+ free(tosend);
+ return 1;
+ }
+ GETSIZE(size,headeraddr);
+ size+=sizeof(objheader_t);
+ memcpy(modptr+offset, headeraddr, size);
+ offset+=size;
+ }
+ send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ free(modptr);
+ } else { //handle request locally
+ handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]);
}
-
- threadnum++;
+ sockindex++;
pile = pile->next;
+ } //end of pile processing
+ /* Recv Ctrl msgs from all machines */
+ int i;
+ for(i = 0; i < pilecount; i++) {
+ int sd = socklist[i];
+ if(sd != 0) {
+ char control;
+ recv_data(sd, &control, sizeof(char));
+ //Update common data structure with new ctrl msg
+ getReplyCtrl[i] = control;
+ /* Recv Objects if participant sends TRANS_DISAGREE */
+#ifdef CACHE
+ if(control == TRANS_DISAGREE) {
+ int length;
+ recv_data(sd, &length, sizeof(int));
+ void *newAddr;
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, newAddr, length);
+ int offset = 0;
+ while(length != 0) {
+ unsigned int oidToPrefetch;
+ objheader_t * header;
+ header = (objheader_t *)(((char *)newAddr) + offset);
+ oidToPrefetch = OID(header);
+ int size = 0;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ //make an entry in prefetch hash table
+ void *oldptr;
+ if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
+ prehashRemove(oidToPrefetch);
+ prehashInsert(oidToPrefetch, header);
+ } else {
+ prehashInsert(oidToPrefetch, header);
+ }
+ length = length - size;
+ offset += size;
+ }
+ } //end of receiving objs
+#endif
+ }
+ }
+ /* Decide the final response */
+ if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
}
- /* Free attribute and wait for the other threads */
- pthread_attr_destroy(&attr);
- for (i = 0; i < threadnum; i++) {
- rc = pthread_join(thread[i], NULL);
- if(rc) {
- printf("Error: return code from pthread_join() is %d\n", rc);
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- for (j = i; j < threadnum; j++) {
- free(thread_data_array[j].buffer);
+ /* Send responses to all machines */
+ for(i = 0; i < pilecount; i++) {
+ int sd = socklist[i];
+ if(sd != 0) {
+#ifdef CACHE
+ if(finalResponse == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+ /* Invalidate objects in other machine cache */
+ if(tosend[i].f.nummod > 0) {
+ if((retval = invalidateObj(&(tosend[i]))) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+ }
}
- return 1;
+#endif
+ send_data(sd, &finalResponse, sizeof(char));
+ } else {
+ /* Complete local processing */
+ doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record);
}
- free(thread_data_array[i].buffer);
}
+
/* Free resources */
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
+ free(tosend);
free(listmid);
-
if (!treplyretry)
pDelete(pile_ptr);
-
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
- free(thread_data_array);
- free(ltdata);
randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
#endif
}
-
/* Retry trans commit procedure during soft_abort case */
} while (treplyretry);
- if(treplyctrl == TRANS_ABORT) {
+ if(finalResponse == TRANS_ABORT) {
+ //printf("Aborting trans\n");
#ifdef TRANSSTATS
numTransAbort++;
#endif
objstrDelete(record->cache);
chashDelete(record->lookupTable);
free(record);
- free(thread_data_array);
- free(ltdata);
return TRANS_ABORT;
- } else if(treplyctrl == TRANS_COMMIT) {
+ } else if(finalResponse == TRANS_COMMIT) {
#ifdef TRANSSTATS
numTransCommit++;
#endif
objstrDelete(record->cache);
chashDelete(record->lookupTable);
free(record);
- free(thread_data_array);
- free(ltdata);
return 0;
} else {
//TODO Add other cases
return 0;
}
-/* This function sends information involved in the transaction request
- * to participants and accepts a response from particpants.
- * It calls decideresponse() to decide on what control message
- * to send next to participants and sends the message using sendResponse()*/
-void *transRequest(void *threadarg) {
- int sd, i, n;
- struct sockaddr_in serv_addr;
- thread_data_array_t *tdata;
- objheader_t *headeraddr;
- char control, recvcontrol;
- char machineip[16], retval;
-
- tdata = (thread_data_array_t *) threadarg;
- if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
- printf("transRequest(): socket create error\n");
- pthread_exit(NULL);
- }
-
- /* Send bytes of data with TRANS_REQUEST control message */
- send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
-
- /* Send list of machines involved in the transaction */
- {
- int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
- send_data(sd, tdata->buffer->listmid, size);
- }
-
- /* Send oids and version number tuples for objects that are read */
- {
- int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
- send_data(sd, tdata->buffer->objread, size);
- }
-
- /* Send objects that are modified */
- void *modptr;
- if((modptr = calloc(1, tdata->buffer->f.sum_bytes)) == NULL) {
- printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
- }
- int offset = 0;
- for(i = 0; i < tdata->buffer->f.nummod ; i++) {
- int size;
- if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
- printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_exit(NULL);
- }
- GETSIZE(size,headeraddr);
- size+=sizeof(objheader_t);
- memcpy(modptr+offset, headeraddr, size);
- offset+=size;
- }
- send_data(sd, modptr, tdata->buffer->f.sum_bytes);
- free(modptr);
- /* Read control message from Participant */
- recv_data(sd, &control, sizeof(char));
+/* This function handles the local objects involved in a transaction
+ * commiting process. It also makes a decision if this local machine
+ * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
+void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) {
+ unsigned int *oidnotfound = NULL, *oidlocked = NULL;
+ int numoidnotfound = 0, numoidlocked = 0;
+ int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+ int numread, i;
+ unsigned int oid;
+ unsigned short version;
- /* Recv Objects if participant sends TRANS_DISAGREE */
-#ifdef CACHE
- if(control == TRANS_DISAGREE) {
- int length;
- recv_data(sd, &length, sizeof(int));
- void *newAddr;
- pthread_mutex_lock(&prefetchcache_mutex);
- if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
- printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_mutex_unlock(&prefetchcache_mutex);
- pthread_exit(NULL);
- }
- pthread_mutex_unlock(&prefetchcache_mutex);
- recv_data(sd, newAddr, length);
- int offset = 0;
- while(length != 0) {
- unsigned int oidToPrefetch;
- objheader_t * header;
- header = (objheader_t *)(((char *)newAddr) + offset);
- oidToPrefetch = OID(header);
- int size = 0;
- GETSIZE(size, header);
- size += sizeof(objheader_t);
- //make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
- prehashRemove(oidToPrefetch);
- prehashInsert(oidToPrefetch, header);
- } else {
- prehashInsert(oidToPrefetch, header);
+ /* Counters and arrays to formulate decision on control message to be sent */
+ oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
+ //setting a divider between read and write locks
+ numread = tdata->f.numread;
+ /* Process each oid in the machine pile/ group per thread */
+ for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
+ if (i < tdata->f.numread) {
+ int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+ incr *= i;
+ oid = *((unsigned int *)(((char *)tdata->objread) + incr));
+ version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
+ commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
+ } else { // Objects Modified
+ if(i == tdata->f.numread) {
+ oidlocked[numoidlocked] = -1;
+ numoidlocked++;
+ }
+ int tmpsize;
+ objheader_t *headptr;
+ headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]);
+ if (headptr == NULL) {
+ printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
+ return;
}
- length = length - size;
- offset += size;
+ oid = OID(headptr);
+ version = headptr->version;
+ commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
}
}
-#endif
- recvcontrol = control;
- /* Update common data structure and increment count */
- tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
-
- /* Lock and update count */
- /* Thread sleeps until all messages from pariticipants are received by coordinator */
- pthread_mutex_lock(tdata->lock);
- (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
+/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+ * if Participant receives a TRANS_COMMIT */
+ transinfo->objlocked = oidlocked;
+ transinfo->objnotfound = oidnotfound;
+ transinfo->modptr = NULL;
+ transinfo->numlocked = numoidlocked;
+ transinfo->numnotfound = numoidnotfound;
- /* Wake up the threads and invoke decideResponse (once) */
- if(*(tdata->count) == tdata->buffer->f.mcount) {
- decideResponse(tdata);
- pthread_cond_broadcast(tdata->threshold);
- } else {
- pthread_cond_wait(tdata->threshold, tdata->lock);
+ /* Condition to send TRANS_AGREE */
+ if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
+ *getReplyCtrl = TRANS_AGREE;
}
- pthread_mutex_unlock(tdata->lock);
+ /* Condition to send TRANS_SOFT_ABORT */
+ if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+ *getReplyCtrl = TRANS_SOFT_ABORT;
+ }
+}
-#ifdef CACHE
- if(*(tdata->replyctrl) == TRANS_COMMIT) {
- int retval;
- /* Update prefetch cache */
- if((retval = updatePrefetchCache(tdata)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) {
+ if(finalResponse == TRANS_ABORT) {
+ if(transAbortProcess(transinfo) != 0) {
+ printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
return;
}
-
+ } else if(finalResponse == TRANS_COMMIT) {
+#ifdef CACHE
/* Invalidate objects in other machine cache */
- if(tdata->buffer->f.nummod > 0) {
+ if(tdata->f.nummod > 0) {
+ int retval;
if((retval = invalidateObj(tdata)) != 0) {
printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
return;
}
}
- }
#endif
+ if(transComProcess(tdata, transinfo, record) != 0) {
+ printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
+ return;
+ }
+ }
- /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
- * to all participants in their respective socket */
- if (sendResponse(tdata, sd) == 0) {
- printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
+ /* Free memory */
+ if (transinfo->objlocked != NULL) {
+ free(transinfo->objlocked);
+ }
+ if (transinfo->objnotfound != NULL) {
+ free(transinfo->objnotfound);
}
- pthread_exit(NULL);
}
/* This function decides the reponse that needs to be sent to
* all Participant machines after the TRANS_REQUEST protocol */
-void decideResponse(thread_data_array_t *tdata) {
- char control;
+char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) {
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
- for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
- control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
- written onto the shared array */
+ for (i = 0 ; i < pilecount; i++) {
+ char control;
+ control = getReplyCtrl[i];
switch(control) {
default:
printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
if(transdisagree > 0) {
/* Send Abort */
- *(tdata->replyctrl) = TRANS_ABORT;
- *(tdata->replyretry) = 0;
+ *treplyretry = 0;
+ return TRANS_ABORT;
#ifdef CACHE
/* clear objects from prefetch cache */
- cleanPCache(tdata);
+ cleanPCache(record);
#endif
- } else if(transagree == tdata->buffer->f.mcount) {
+ } else if(transagree == pilecount) {
/* Send Commit */
- *(tdata->replyctrl) = TRANS_COMMIT;
- *(tdata->replyretry) = 0;
+ *treplyretry = 0;
+ return TRANS_COMMIT;
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
- *(tdata->replyctrl) = TRANS_ABORT;
- *(tdata->replyretry) = 1;
- }
- return;
-}
-
-/* This function sends the final response to remote machines per
- * thread in their respective socket id It returns a char that is only
- * needed to check the correctness of execution of this function
- * inside transRequest()*/
-
-char sendResponse(thread_data_array_t *tdata, int sd) {
- int n, size, sum, oidcount = 0, control;
- char *ptr, retval = 0;
- unsigned int *oidnotfound;
-
- control = *(tdata->replyctrl);
- send_data(sd, &control, sizeof(char));
-
- //TODO read missing objects during object migration
- /* If response is a soft abort due to missing objects at the
- Participant's side */
-
- /* If the decided response is TRANS_ABORT */
- if(*(tdata->replyctrl) == TRANS_ABORT) {
- retval = TRANS_ABORT;
- } else if(*(tdata->replyctrl) == TRANS_COMMIT) {
- /* If the decided response is TRANS_COMMIT */
- retval = TRANS_COMMIT;
+ *treplyretry = 1;
+ return TRANS_ABORT;
}
- return retval;
+ return 0;
}
/* This function opens a connection, places an object read request to
return objcopy;
}
-/* This function handles the local objects involved in a transaction
- * commiting process. It also makes a decision if this local machine
- * sends AGREE or DISAGREE or SOFT_ABORT to coordinator. Note
- * Coordinator = local machine It wakes up the other threads from
- * remote participants that are waiting for the coordinator's decision
- * and based on common agreement it either commits or aborts the
- * transaction. It also frees the memory resources */
-
-void *handleLocalReq(void *threadarg) {
- unsigned int *oidnotfound = NULL, *oidlocked = NULL;
- local_thread_data_array_t *localtdata;
- int numoidnotfound = 0, numoidlocked = 0;
- int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
- int numread, i;
- unsigned int oid;
- unsigned short version;
- localtdata = (local_thread_data_array_t *) threadarg;
- /* Counters and arrays to formulate decision on control message to be sent */
- oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
- //setting a divider of read locks
- //and write locks
-
- numread = localtdata->tdata->buffer->f.numread;
- /* Process each oid in the machine pile/ group per thread */
- for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
- if (i < localtdata->tdata->buffer->f.numread) {
- int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
- incr *= i;
- oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
- version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
- commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
- } else { // Objects Modified
- if(i == localtdata->tdata->buffer->f.numread) {
- oidlocked[numoidlocked] = -1;
- numoidlocked++;
- }
- int tmpsize;
- objheader_t *headptr;
- headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
- if (headptr == NULL) {
- printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
- oid = OID(headptr);
- version = headptr->version;
- commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
- }
- }
-
- /* Condition to send TRANS_AGREE */
- if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
- }
- /* Condition to send TRANS_SOFT_ABORT */
- if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
- }
-
- /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
- * if Participant receives a TRANS_COMMIT */
- localtdata->transinfo->objlocked = oidlocked;
- localtdata->transinfo->objnotfound = oidnotfound;
- localtdata->transinfo->modptr = NULL;
- localtdata->transinfo->numlocked = numoidlocked;
- localtdata->transinfo->numnotfound = numoidnotfound;
-
- /* Lock and update count */
- //Thread sleeps until all messages from pariticipants are received by coordinator
- pthread_mutex_lock(localtdata->tdata->lock);
- (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
-
- /* Wake up the threads and invoke decideResponse (once) */
- if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
- decideResponse(localtdata->tdata);
- pthread_cond_broadcast(localtdata->tdata->threshold);
- } else {
- pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
- }
- pthread_mutex_unlock(localtdata->tdata->lock);
- if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
- if(transAbortProcess(localtdata) != 0) {
- printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
- fflush(stdout);
- pthread_exit(NULL);
- }
- } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
-#ifdef CACHE
- /* Invalidate objects in other machine cache */
- if(localtdata->tdata->buffer->f.nummod > 0) {
- int retval;
- if((retval = invalidateObj(localtdata->tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
-#endif
- if(transComProcess(localtdata) != 0) {
- printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
- fflush(stdout);
- pthread_exit(NULL);
- }
- }
-
- /* Free memory */
- if (localtdata->transinfo->objlocked != NULL) {
- free(localtdata->transinfo->objlocked);
- }
- if (localtdata->transinfo->objnotfound != NULL) {
- free(localtdata->transinfo->objnotfound);
- }
- pthread_exit(NULL);
-}
-
/* Commit info for objects modified */
-void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
void *mobj;
/* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ *getReplyCtrl = TRANS_DISAGREE;
+
//Keep track of what is locked
oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
(*numoidlocked)++;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //A lock is acquired some place else
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ *getReplyCtrl = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
}
/* Commit info for objects modified */
-void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
void *mobj;
/* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ *getReplyCtrl = TRANS_DISAGREE;
//Keep track of what is locked
oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
(*numoidlocked)++;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //Has reached max number of readers or some other transaction
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ *getReplyCtrl = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
}
/* This function completes the ABORT process if the transaction is aborting */
-int transAbortProcess(local_thread_data_array_t *localtdata) {
+int transAbortProcess(trans_commit_data_t *transinfo) {
int i, numlocked;
unsigned int *objlocked;
void *header;
- numlocked = localtdata->transinfo->numlocked;
- objlocked = localtdata->transinfo->objlocked;
+ numlocked = transinfo->numlocked;
+ objlocked = transinfo->objlocked;
int useWriteUnlock = 0;
for (i = 0; i < numlocked; i++) {
}
/*This function completes the COMMIT process if the transaction is commiting*/
-int transComProcess(local_thread_data_array_t *localtdata) {
+int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) {
objheader_t *header, *tcptr;
int i, nummod, tmpsize, numcreated, numlocked;
unsigned int *oidmod, *oidcreated, *oidlocked;
void *ptrcreate;
- nummod = localtdata->tdata->buffer->f.nummod;
- oidmod = localtdata->tdata->buffer->oidmod;
- numcreated = localtdata->tdata->buffer->f.numcreated;
- oidcreated = localtdata->tdata->buffer->oidcreated;
- numlocked = localtdata->transinfo->numlocked;
- oidlocked = localtdata->transinfo->objlocked;
+ nummod = tdata->f.nummod;
+ oidmod = tdata->oidmod;
+ numcreated = tdata->f.numcreated;
+ oidcreated = tdata->oidcreated;
+ numlocked = transinfo->numlocked;
+ oidlocked = transinfo->objlocked;
for (i = 0; i < nummod; i++) {
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
return 1;
}
/* Copy from transaction cache -> main object store */
- if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+ if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) {
printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
}
/* If object is newly created inside transaction then commit it */
for (i = 0; i < numcreated; i++) {
- if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+ if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) {
printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
return 1;
}
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *t) {
while(1) {
- /* lock mutex of primary prefetch queue */
+ /* read from prefetch queue */
void *node=gettail();
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
return pile;
}
+
+plistnode_t *sortPiles(plistnode_t *pileptr) {
+ plistnode_t *head, *ptr, *tail;
+ head = pileptr;
+ ptr = pileptr;
+ /* Get tail pointer */
+ while(ptr!= NULL) {
+ tail = ptr;
+ ptr = ptr->next;
+ }
+ ptr = pileptr;
+ plistnode_t *prev = pileptr;
+ /* Arrange local machine processing at the end of the pile list */
+ while(ptr != NULL) {
+ if(ptr != tail) {
+ if(ptr->mid == myIpAddr && (prev != pileptr)) {
+ prev->next = ptr->next;
+ ptr->next = NULL;
+ tail->next = ptr;
+ return pileptr;
+ }
+ if((ptr->mid == myIpAddr) && (prev == pileptr)) {
+ prev = ptr->next;
+ ptr->next = NULL;
+ tail->next = ptr;
+ return prev;
+ }
+ prev = ptr;
+ }
+ ptr = ptr->next;
+ }
+ return pileptr;
+}