Comments added and several minor changes to get rid of extra variables
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 9053965fc9d42dbc219ced0d0fad27fbd4a9113d..8e86b346043243c6a969be4bd2861d95dde75fca 100644 (file)
@@ -218,6 +218,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                chashInsert(record->lookupTable, OID(tmp), objcopy); 
                return(objcopy);
        } else { /* If not found anywhere, then block until object appears in prefetch cache */
+#if 0  
                printf("Inside remote machine\n");
                pthread_mutex_lock(&pflookup.lock);
                while(!found) {
@@ -240,16 +241,15 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                                pthread_mutex_unlock(&pflookup.lock);
                        }
                }
+#endif
                /* Get the object from the remote location */
                machinenumber = lhashSearch(oid);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        //If object is not found in Remote location
-                       //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
                        return NULL;
                }
                else {
-                       //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
                        return(objcopy);
                }
        } 
@@ -292,6 +292,7 @@ plistnode_t *createPiles(transrecord_t *record) {
                        }
                        next = curr->next;
                        //Get machine location for object id
+                       //TODO Check is the object is newly created ...if not then lookup the location table 
 
                        if ((machinenum = lhashSearch(curr->key)) == 0) {
                                printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
@@ -322,9 +323,9 @@ plistnode_t *createPiles(transrecord_t *record) {
 
 /* This function initiates the transaction commit process
  * Spawns threads for each of the new connections with Participants 
- * and creates new piles by calling the createPiles(),
- * Fills the piles with necesaary information and 
- * Sends a transrequest() to each pile*/
+ * and creates new piles by calling the createPiles(), 
+ * Sends a transrequest() to each remote machines for objects found remotely 
+ * and calls handleLocalReq() to process objects found locally */
 int transCommit(transrecord_t *record) {       
        unsigned int tot_bytes_mod, *listmid;
        plistnode_t *pile, *pile_ptr;
@@ -462,10 +463,10 @@ int transCommit(transrecord_t *record) {
        return 0;
 }
 
-/* This function sends information involved in the transaction request and 
- * accepts a response from particpants.
+/* This function sends information involved in the transaction request 
+ * to participants and accepts a response from particpants.
  * It calls decideresponse() to decide on what control message 
- * to send next and sends the message using sendResponse()*/
+ * to send next to participants and sends the message using sendResponse()*/
 void *transRequest(void *threadarg) {
        int sd, i, n;
        struct sockaddr_in serv_addr;
@@ -538,7 +539,7 @@ void *transRequest(void *threadarg) {
        tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
 
        /* Lock and update count */
-       //Thread sleeps until all messages from pariticipants are received by coordinator
+       /* Thread sleeps until all messages from pariticipants are received by coordinator */
        pthread_mutex_lock(tdata->lock);
 
        (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
@@ -572,15 +573,13 @@ void *transRequest(void *threadarg) {
 }
 
 /* This function decides the reponse that needs to be sent to 
- * all Participant machines involved in the transaction commit */
+ * all Participant machines after the TRANS_REQUEST protocol */
 int decideResponse(thread_data_array_t *tdata) {
        char control;
        int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                         message to send */
 
-       //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
-               /*Switch on response from Participant */
                control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
                                                           written onto the shared array */
                switch(control) {
@@ -604,9 +603,8 @@ int decideResponse(thread_data_array_t *tdata) {
                }
        }
 
-       /* Decide what control message to send to Participant */        
+       /* Send Abort */
        if(transdisagree > 0) {
-               /* Send Abort */
                *(tdata->replyctrl) = TRANS_ABORT;
                printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
                /* Free resources */
@@ -646,6 +644,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
                        N = oidcount * sizeof(unsigned int);
                        if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
                                printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                               return 0;
                        }
                        ptr = (char *) oidnotfound;
                        do {
@@ -661,7 +660,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
        } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
                retval = TRANS_COMMIT;
        }
-       /* Send response to the Participant */
+
        if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
                perror("Error sending ctrl message for participant\n");
        }
@@ -742,22 +741,20 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        return objcopy;
 }
 
-/*This function handles the local trans requests involved in a transaction commiting process
- * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
- * Activates the other nonlocal threads that are waiting for the decision and the
- * based on common decision by all groups involved in the transaction it 
- * either commits or aborts the transaction.
- * It also frees the calloced memory resources
- */
-
+/* This function handles the local objects involved in a transaction commiting process.
+ * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
+ * Note Coordinator = local machine
+ * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
+ * based on common agreement it either commits or aborts the transaction.
+ * It also frees the memory resources */
 void *handleLocalReq(void *threadarg) {
-       int val, i = 0;
+       int val, i = 0, size, offset = 0;
        short version;
        char control = 0, *ptr;
        unsigned int oid;
        unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
        void *mobj, *modptr;
-       objheader_t *headptr;
+       objheader_t *headptr, *headeraddr;
        local_thread_data_array_t *localtdata;
 
        localtdata = (local_thread_data_array_t *) threadarg;
@@ -765,9 +762,8 @@ void *handleLocalReq(void *threadarg) {
        /* Counters and arrays to formulate decision on control message to be sent */
        oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
        oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-       oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
-       int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-       int objmodnotfound = 0, nummodfound = 0;
+       int objnotfound = 0, objlocked = 0; 
+       int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
 
        /* modptr points to the beginning of the object store 
         * created at the Pariticipant */ 
@@ -775,8 +771,16 @@ void *handleLocalReq(void *threadarg) {
                printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                return NULL;
        }
+       /* Write modified objects into the mainobject store */
+       for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
+               headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
+               size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+               memcpy(modptr+offset, headeraddr, size);  
+               offset += size;
+       }
 
        ptr = modptr;
+       offset = 0; //Reset 
 
        /* Process each oid in the machine pile/ group per thread */
        for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
@@ -787,10 +791,8 @@ void *handleLocalReq(void *threadarg) {
                        incr += sizeof(unsigned int);
                        version = *((short *)(localtdata->tdata->buffer->objread + incr));
                } else {//Objs modified
-                       headptr = (objheader_t *) ptr;
+                       headptr = (objheader_t *)ptr;
                        oid = OID(headptr);
-                       oidmod[objmod] = oid;//Array containing modified oids
-                       objmod++;
                        version = headptr->version;
                        ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
                }
@@ -812,7 +814,6 @@ void *handleLocalReq(void *threadarg) {
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
                                        printf("DEBUG -> Sending TRANS_DISAGREE\n");
-                                       //return tdata->recvmsg[tdata->thread_id].rcv_status;  
                                }
                        } else {/* If Obj is not locked then lock object */
                                STATUS(((objheader_t *)mobj)) |= LOCK;
@@ -829,14 +830,11 @@ void *handleLocalReq(void *threadarg) {
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
                                        printf("DEBUG -> Sending TRANS_DISAGREE\n");
-                                       //      return tdata->recvmsg[tdata->thread_id].rcv_status;  
                                }
                        }
                }
        }
 
-       /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
-
        /* Condition to send TRANS_AGREE */
        if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
                localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
@@ -859,17 +857,12 @@ void *handleLocalReq(void *threadarg) {
 
        /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
         * if Participant receives a TRANS_COMMIT */
-       localtdata->transinfo->objmod = oidmod;
        localtdata->transinfo->objlocked = oidlocked;
        localtdata->transinfo->objnotfound = oidnotfound;
        localtdata->transinfo->modptr = modptr;
-       localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
        localtdata->transinfo->numlocked = objlocked;
        localtdata->transinfo->numnotfound = objnotfound;
 
-       /*Set flag to show that common data structure for this individual thread has been written to */
-       //*(tdata->localstatus) |= LM_UPDATED;
-
        /* Lock and update count */
        //Thread sleeps until all messages from pariticipants are received by coordinator
        pthread_mutex_lock(localtdata->tdata->lock);
@@ -890,12 +883,12 @@ void *handleLocalReq(void *threadarg) {
 
        /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
        if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
-               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
+               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
                        printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
                        return NULL;
                }
        }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
-               if(transComProcess(localtdata->transinfo) != 0) {
+               if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) {
                        printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
                        return NULL;
                }
@@ -904,10 +897,7 @@ void *handleLocalReq(void *threadarg) {
        /* Free memory */
        printf("DEBUG -> Freeing...\n");
        fflush(stdout);
-       if (localtdata->transinfo->objmod != NULL) {
-               free(localtdata->transinfo->objmod);
-               localtdata->transinfo->objmod = NULL;
-       }
+
        if (localtdata->transinfo->objlocked != NULL) {
                free(localtdata->transinfo->objlocked);
                localtdata->transinfo->objlocked = NULL;
@@ -921,7 +911,7 @@ void *handleLocalReq(void *threadarg) {
 }
 /* This function completes the ABORT process if the transaction is aborting 
 */
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
        char *ptr;
        int i;
        objheader_t *tmp_header;
@@ -929,7 +919,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
 
        printf("DEBUG -> Recv TRANS_ABORT\n");
        /* Set all ref counts as 1 and do garbage collection */
-       ptr = modptr;
+       ptr = (char *)modptr;
        for(i = 0; i< nummod; i++) {
                tmp_header = (objheader_t *)ptr;
                tmp_header->rcount = 1;
@@ -937,7 +927,10 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        }
        /* Unlock objects that was locked due to this transaction */
        for(i = 0; i< numlocked; i++) {
-               header = mhashSearch(objlocked[i]);// find the header address
+               if((header = mhashSearch(objlocked[i])) == NULL) {
+                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }
                STATUS(((objheader_t *)header)) &= ~(LOCK);
        }
 
@@ -951,33 +944,36 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
 
 /*This function completes the COMMIT process is the transaction is commiting
 */
-int transComProcess(trans_commit_data_t *transinfo) {
+int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) {
        objheader_t *header;
        int i = 0, offset = 0;
        char control;
 
-       printf("DEBUG -> Recv TRANS_COMMIT\n");
        /* Process each modified object saved in the mainobject store */
-       for(i=0; i<transinfo->nummod; i++) {
-               if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+       for(i = 0; i < nummod; i++) {
+               if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
                }
                /* Change reference count of older address and free space in objstr ?? */
                header->rcount = 1; //TODO Not sure what would be the val
 
                /* Change ptr address in mhash table */
-               mhashRemove(transinfo->objmod[i]);
-               mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+               mhashRemove(oidmod[i]);
+               mhashInsert(oidmod[i], (((char *)modptr) + offset));
                offset += sizeof(objheader_t) + classsize[TYPE(header)];
 
                /* Update object version number */
-               header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+               header = (objheader_t *) mhashSearch(oidmod[i]);
                header->version += 1;
        }
 
        /* Unlock locked objects */
-       for(i=0; i<transinfo->numlocked; i++) {
-               header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+       for(i = 0; i < numlocked; i++) {
+               if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
+                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }
                STATUS(header) &= ~(LOCK);
        }
 
@@ -1232,7 +1228,6 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
 
 /* This function is called by the thread calling transPrefetch */
 void *transPrefetch(void *t) {
-       //int *offstarray = NULL;
        prefetchqelem_t *qnode;
        prefetchpile_t *pilehead = NULL;
 
@@ -1342,7 +1337,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        tmp = mcpilenode->objpiles;
        while(tmp != NULL) {
                off = offset = 0;
-               count++;  // Keeps track of the number of oid and offset tuples sent per remote machine
+               count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
                len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
                char oidnoffset[len];
                memcpy(oidnoffset, &len, sizeof(int));