-/* This function handles the local objects involved in a transaction
- * commiting process. It also makes a decision if this local machine
- * sends AGREE or DISAGREE or SOFT_ABORT to coordinator. Note
- * Coordinator = local machine It wakes up the other threads from
- * remote participants that are waiting for the coordinator's decision
- * and based on common agreement it either commits or aborts the
- * transaction. It also frees the memory resources */
-
-void *handleLocalReq(void *threadarg) {
- unsigned int *oidnotfound = NULL, *oidlocked = NULL;
- local_thread_data_array_t *localtdata;
- int numoidnotfound = 0, numoidlocked = 0;
- int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
- int numread, i;
- unsigned int oid;
- unsigned short version;
- localtdata = (local_thread_data_array_t *) threadarg;
- /* Counters and arrays to formulate decision on control message to be sent */
- oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
- //setting a divider of read locks
- //and write locks
-
- numread = localtdata->tdata->buffer->f.numread;
- /* Process each oid in the machine pile/ group per thread */
- for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
- if (i < localtdata->tdata->buffer->f.numread) {
- int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
- incr *= i;
- oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
- version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
- commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
- } else { // Objects Modified
- if(i == localtdata->tdata->buffer->f.numread) {
- oidlocked[numoidlocked] = -1;
- numoidlocked++;
- }
- int tmpsize;
- objheader_t *headptr;
- headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
- if (headptr == NULL) {
- printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
- oid = OID(headptr);
- version = headptr->version;
- commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
- }
- }
-
- /* Condition to send TRANS_AGREE */
- if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
- }
- /* Condition to send TRANS_SOFT_ABORT */
- if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
- }
-
- /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
- * if Participant receives a TRANS_COMMIT */
- localtdata->transinfo->objlocked = oidlocked;
- localtdata->transinfo->objnotfound = oidnotfound;
- localtdata->transinfo->modptr = NULL;
- localtdata->transinfo->numlocked = numoidlocked;
- localtdata->transinfo->numnotfound = numoidnotfound;
-
- /* Lock and update count */
- //Thread sleeps until all messages from pariticipants are received by coordinator
- pthread_mutex_lock(localtdata->tdata->lock);
- (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
-
- /* Wake up the threads and invoke decideResponse (once) */
- if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
- decideResponse(localtdata->tdata);
- pthread_cond_broadcast(localtdata->tdata->threshold);
- } else {
- pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
- }
- pthread_mutex_unlock(localtdata->tdata->lock);
- if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
- if(transAbortProcess(localtdata) != 0) {
- printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
- fflush(stdout);
- pthread_exit(NULL);
- }
- } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
-#ifdef CACHE
- /* Invalidate objects in other machine cache */
- if(localtdata->tdata->buffer->f.nummod > 0) {
- int retval;
- if((retval = invalidateObj(localtdata->tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
-#endif
- if(transComProcess(localtdata) != 0) {
- printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
- fflush(stdout);
- pthread_exit(NULL);
- }
- }
-
- /* Free memory */
- if (localtdata->transinfo->objlocked != NULL) {
- free(localtdata->transinfo->objlocked);
- }
- if (localtdata->transinfo->objnotfound != NULL) {
- free(localtdata->transinfo->objnotfound);
- }
- pthread_exit(NULL);
-}
-