Comments added and several minor changes to get rid of extra variables
authoradash <adash>
Fri, 7 Sep 2007 18:21:30 +0000 (18:21 +0000)
committeradash <adash>
Fri, 7 Sep 2007 18:21:30 +0000 (18:21 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/plookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index 6e9a481eafa30f7b804c6eaa08db00eb4d17d802..cf85532608355ada140027dc3e43e06f05bdfec3 100644 (file)
@@ -108,85 +108,71 @@ typedef struct transrecord {
   struct ___Object___ * revertlist;
 #endif
 } transrecord_t;
-// Structure that keeps track of responses from the participants
+// Structure is a shared structure that keeps track of responses from the participants
 typedef struct thread_response {
   char rcv_status;
 } thread_response_t;
 
-// Structure that holds  fixed data sizes to be sent along with TRANS_REQUEST
+// Structure that holds  fixed data to be sent along with TRANS_REQUEST
 typedef struct fixed_data {
-  char control;
-  char trans_id[TID_LEN];      
-  int mcount;          // Machine count
-  short numread;               // Number of objects read
-  short nummod;                // Number of objects modified
-  int sum_bytes;       // Total bytes modified
+  char control;                        /* control message */
+  char trans_id[TID_LEN];      /* transaction id */
+  int mcount;                  /* participant count */
+  short numread;               /* no of objects read */
+  short nummod;                        /* no of objects modified */
+  int sum_bytes;               /* total bytes of modified objects in a transaction */
 } fixed_data_t;
 
-// Structure that holds  variable data sizes per machine participant
+/* Structure that holds trans request information for each participant */
 typedef struct trans_req_data {
-  fixed_data_t f;
-  unsigned int *listmid;
-  char *objread;
-  unsigned int *oidmod;
-} trans_req_data_t;
-
-// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process 
-
+  fixed_data_t f;              /* Holds first few fixed bytes of data sent during TRANS_REQUEST protcol*/
+  unsigned int *listmid;       /* Pointer to array holding list of participants */
+  char *objread;               /* Pointer to array holding oid and version number of objects that are only read */ 
+  unsigned int *oidmod;                /* Pointer to array holding oids of objects that are modified */
+} trans_req_data_t;            
+
+/* Structure that holds information of objects that are not found in the participant
+ * and objs locked within a transaction during commit process */
 typedef struct trans_commit_data{
-  unsigned int *objmod;
-  unsigned int *objlocked;
-  unsigned int *objnotfound;
-  void *modptr;
-  int nummod;
-  int numlocked;
-  int numnotfound;
+  unsigned int *objlocked;     /* Pointer to array holding oids of objects locked inside a transaction */
+  unsigned int *objnotfound;    /* Pointer to array holding oids of objects not found on the participant machine */
+  void *modptr;                        /* Pointer to the address in the mainobject store of the participant that holds all modified objects */
+  int numlocked;               /* no of objects locked */
+  int numnotfound;             /* no of objects not found */
 } trans_commit_data_t;
 
 
 #define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id);
-//structure for passing multiple arguments to thread
+/* Structure for passing multiple arguments to a thread
+ * spawned to process each transaction on a machine */
 typedef struct thread_data_array {
-  int thread_id;
+  int thread_id;       
   int mid;    
-  int pilecount;
-  trans_req_data_t *buffer;
-  thread_response_t *recvmsg;//shared datastructure to keep track of the control message receiv
-  pthread_cond_t *threshold; //threshhold for waking up a thread
-  pthread_mutex_t *lock;    //lock the count variable
-  int *count;             //variable to count responses of TRANS_REQUEST protocol from all participants
-  char *replyctrl;     //shared ctrl message that stores the reply to be sent, filled by decideResp
-  char *replyretry;    //shared variable to find out if we need retry (TRANS_COMMIT case) 
-  transrecord_t *rec;  // To send modified objects
+  int pilecount;               /* No of remote machines involved */
+  trans_req_data_t *buffer;    /* Holds trans request information sent to participants */  
+  thread_response_t *recvmsg;  /* Shared datastructure to keep track of the participants response to a trans request */
+  pthread_cond_t *threshold;    /* Condition var to waking up a thread */
+  pthread_mutex_t *lock;       /* Lock for counting participants response */
+  int *count;                  /* Variable to count responses from all participants to the TRANS_REQUEST protocol */
+  char *replyctrl;             /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */
+  char *replyretry;            /* Shared variable that keep track if coordinator needs retry */
+  transrecord_t *rec;          /* To send modified objects */
 } thread_data_array_t;
 
 
 //Structure for passing arguments to the local m/c thread
 typedef struct local_thread_data_array {
-       thread_data_array_t *tdata;
-       trans_commit_data_t *transinfo; //Required for trans commit process
+       thread_data_array_t *tdata;     /* Holds all the arguments send to a thread that is spawned when transaction commits */ 
+       trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */ 
 } local_thread_data_array_t;
 
-// Structure to save information about an oid necesaary for the decideControl()
-typedef struct objinfo {
-       unsigned int oid;
-       int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) 
-}objinfo_t;
-
 //Structure for members within prefetch tuples
 typedef struct member {
-       short offset;
-       short index;
-       struct member *next;
+       short offset;           /* Holds offset of the ptr field */
+       short index;            /* Holds the array index value */ 
+       struct member *next;    
 }trans_member_t;
 
-/*
-//Structure that holds the compiler generated prefetch data
-typedef struct compprefetchdata {
-transrecord_t *record;
-} compprefetchdata_t;
-*/
-
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
 
@@ -205,10 +191,11 @@ void *objstrAlloc(objstr_t *store, unsigned int size); //size in bytes
 void *dstmListen();
 void *dstmAccept(void *);
 int readClientReq(trans_commit_data_t *, int);
-int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, int);
+int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
 char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
-int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, unsigned int *, int);
-int transCommitProcess(trans_commit_data_t *, int);
+int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
+//int transCommitProcess(trans_commit_data_t *, int);
+int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
 /* end server portion */
 
 /* Prototypes for transactions */
@@ -229,8 +216,9 @@ void *handleLocalReq(void *);       //the C routine that the local m/c thread will exe
 int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-int transAbortProcess(void *, unsigned int *, int, int, int);
-int transComProcess(trans_commit_data_t *);
+int transAbortProcess(void *, unsigned int *, int, int);
+//int transComProcess(trans_commit_data_t *);
+int transComProcess(void*, unsigned int *, unsigned int *, int, int);
 void prefetch(int, unsigned int *, short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
index a28e906faa59825d494ca27d562028acb825bdcc..65460bd9a58c3997c0f12f88ea337dbd1c075a81 100644 (file)
@@ -20,11 +20,12 @@ extern int classsize[];
 
 objstr_t *mainobjstore;
 
+/* This function initializes the main objects store and creates the 
+ * global machine and location lookup table */
+
 int dstmInit(void)
 {
-       /* Initialize main object store */
        mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
-       /* Create machine lookup table and location lookup table */
        if (mhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
        
@@ -34,6 +35,8 @@ int dstmInit(void)
        return 0;
 }
 
+/* This function starts the thread to listen on a socket 
+ * for tranaction calls */
 void *dstmListen()
 {
        int listenfd, acceptfd;
@@ -203,7 +206,9 @@ void *dstmAccept(void *acceptfd)
 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
        char *ptr;
        void *modptr;
+       unsigned int *oidmod, oid;
        fixed_data_t fixed;
+       objheader_t *headaddr;
        int sum = 0, i, N, n, val;
 
        /* Read fixed_data_t data structure */ 
@@ -249,17 +254,31 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
                }
                sum = 0;
                do { // Recv the objs that are modified by the Coordinator
-                       n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
+                       n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
                        sum += n;
                } while (sum < fixed.sum_bytes && n != 0);
        }
 
+       /* Create an array of oids for modified objects */
+       oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
+       ptr = (char *) modptr;
+       for(i = 0 ; i < fixed.nummod; i++) {
+               headaddr = (objheader_t *) ptr;
+               oid = OID(headaddr);
+               oidmod[i] = oid;
+               ptr += sizeof(objheader_t) + classsize[TYPE(headaddr)];
+       }
+       
        /*Process the information read */
-       if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, acceptfd)) != 0) {
+       if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
                printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__);
                return 1;
        }
 
+
+       /* Free resources */
+       free(oidmod);
+
        return 0;
 }
 
@@ -267,7 +286,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
  * function and sends a reply to the co-ordinator.
  * Following this it also receives a new control message from the co-ordinator and processes this message*/
 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
-               unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
+               unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
        char *ptr, control, sendctrl;
        objheader_t *tmp_header;
        void *header;
@@ -314,7 +333,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                case TRANS_COMMIT:
                        /* Invoke the transCommit process() */
                        printf("DEBUG -> Recv TRANS_COMMIT \n");
-                       if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
+                       if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
                                printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
                                return 1;
                        }
@@ -332,10 +351,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        /* Free memory */
        printf("DEBUG -> Freeing...\n");
        fflush(stdout);
-       if (transinfo->objmod != NULL) {
-               free(transinfo->objmod);
-               transinfo->objmod = NULL;
-       }
+       
        if (transinfo->objlocked != NULL) {
                free(transinfo->objlocked);
                transinfo->objlocked = NULL;
@@ -361,14 +377,13 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        /* Counters and arrays to formulate decision on control message to be sent */
        oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
        oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
-       oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
-       int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-       int objmodnotfound = 0, nummodfound = 0;
+       int objnotfound = 0, objlocked = 0;
+       int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
 
        /* modptr points to the beginning of the object store 
         * created at the Pariticipant. 
         * Object store holds the modified objects involved in the transaction request */ 
-       ptr = modptr;
+       ptr = (char *) modptr;
        
        /* Process each oid in the machine pile/ group per thread */
        for (i = 0; i < fixed->numread + fixed->nummod; i++) {
@@ -381,8 +396,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                } else {//Objs modified
                        headptr = (objheader_t *) ptr;
                        oid = OID(headptr);
-                       oidmod[objmod] = oid;//Array containing modified oids
-                       objmod++;
                        version = headptr->version;
                        ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
                }
@@ -440,7 +453,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        
        /* Decide what control message to send to Coordinator */
        if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
-                                       modptr, oidnotfound, oidlocked, oidmod, acceptfd)) == 0) {
+                                       modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
                printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__);
                return 0;
        }
@@ -452,8 +465,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
  * to send to Coordinator based on the votes of oids involved in the transaction */
 int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
                int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
-               unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidmod,
-               int acceptfd) {
+               unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
        int val;
        char control = 0;
        /* Condition to send TRANS_AGREE */
@@ -490,11 +502,9 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
 
        /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
         * if Participant receives a TRANS_COMMIT */
-       transinfo->objmod = oidmod;
        transinfo->objlocked = oidlocked;
        transinfo->objnotfound = oidnotfound;
        transinfo->modptr = modptr;
-       transinfo->nummod = fixed->nummod;
        transinfo->numlocked = *(objlocked);
        transinfo->numnotfound = *(objnotfound);
        
@@ -504,31 +514,35 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer 
  * addresses in lookup table and also changes version number
  * Sends an ACK back to Coordinator */
-int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
+int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
        objheader_t *header;
        int i = 0, offset = 0;
        char control;
+
        /* Process each modified object saved in the mainobject store */
-       for(i=0; i<transinfo->nummod; i++) {
-               if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+       for(i = 0; i < nummod; i++) {
+               if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
                }
                /* Change reference count of older address and free space in objstr ?? */
                header->rcount = 1; //Not sure what would be the val
 
                /* Change ptr address in mhash table */
-               printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
-               mhashRemove(transinfo->objmod[i]);
-               mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+               mhashRemove(oidmod[i]);
+               mhashInsert(oidmod[i], (((char *)modptr) + offset));
                offset += sizeof(objheader_t) + classsize[TYPE(header)];
 
                /* Update object version number */
-               header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+               header = (objheader_t *) mhashSearch(oidmod[i]);
                header->version += 1; 
        }
        /* Unlock locked objects */
-       for(i=0; i<transinfo->numlocked; i++) {
-               header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+       for(i = 0; i < numlocked; i++) {
+               if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }
                STATUS(header) &= ~(LOCK);
        }
 
@@ -544,6 +558,11 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        return 0;
 }
 
+/* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
+ * Looks for the objects to be prefetched in the main object store.
+ * If objects are not found then record those and if objects are found
+ * then use offset values to prefetch references to other objects */
+
 int prefetchReq(int acceptfd) {
        int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
        unsigned int oid, index = 0;
@@ -574,7 +593,6 @@ int prefetchReq(int acceptfd) {
                } while(sum < N && n != 0);     
 
                /* Process each oid */
-               /* Check if object is still present in the machine since the beginning of TRANS_PREFETCH */
                if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
                        /* Save the oids not found in buffer for later use */
                        *(buffer + index) = OBJECT_NOT_FOUND;
index 07a19fd2a6e4fb2fa2cd79906735d455f04666b7..7bf79ba64a8de33a71ede200f2bdd203113591c1 100644 (file)
@@ -1,6 +1,13 @@
 #include "plookup.h"
 extern int classsize[];
 
+//NOTE: "pile" ptr points to the head of the linked list of the machine pile data structures 
+
+/* This function creates a new pile data structure to hold
+ * obj ids of objects modified or read inside a transaction,
+ * no of objects read and no of objects modified
+ * that belong to a single machine */
+
 plistnode_t *pCreate(int objects) {
        plistnode_t *pile;
        
@@ -9,32 +16,37 @@ plistnode_t *pCreate(int objects) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
                return NULL;
        }       
-       pile->next = NULL;
        if ((pile->oidmod = calloc(objects, sizeof(unsigned int))) == NULL) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               free(pile);
                return NULL;
        }
+       /*
        if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
                return NULL;
        }
-       pile->nummod = pile->numread = pile->sum_bytes = 0;
-       if ((pile->objread = calloc(objects, sizeof(int) + sizeof(short))) == NULL) {
+       */
+       if ((pile->objread = calloc(objects, sizeof(unsigned int) + sizeof(short))) == NULL) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               free(pile);
+               free(pile->oidmod);
                return NULL;
        }
-       pile->objmodified = NULL;
-       pile->nummod = pile->numread = pile->sum_bytes = 0;
 
+       pile->nummod = pile->numread = pile->sum_bytes = 0;
+       pile->next = NULL;
        return pile;
 }
 
+/* This function inserts necessary information into 
+ * a machine pile data structure */
 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
        plistnode_t *ptr, *tmp;
        int found = 0, offset;
 
        tmp = pile;
-       //Add oid into a machine that is a part of the pile linked list structure
+       //Add oid into a machine that is already present in the pile linked list structure
        while(tmp != NULL) {
                if (tmp->mid == mid) {
                        if (STATUS(headeraddr) & DIRTY) {
@@ -42,13 +54,12 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                                tmp->nummod = tmp->nummod + 1;
                                tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
                        } else {
-                               tmp->oidread[tmp->numread] = OID(headeraddr);
+               //              tmp->oidread[tmp->numread] = OID(headeraddr);
                                offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
                                *((unsigned int *)(tmp->objread + offset))=OID(headeraddr);
                                offset += sizeof(unsigned int);
                                memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
                                tmp->numread = tmp->numread + 1;
-                       //      printf("DEBUG->pInsert() No of obj read = %d\n", tmp->numread);
                        }
                        found = 1;
                        break;
@@ -66,7 +77,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                        ptr->nummod = ptr->nummod + 1;
                        ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
                } else {
-                       ptr->oidread[ptr->numread] = OID(headeraddr);
+               //      ptr->oidread[ptr->numread] = OID(headeraddr);
                        *((unsigned int *)ptr->objread)=OID(headeraddr);
                        memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
                        ptr->numread = ptr->numread + 1;
@@ -78,7 +89,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
        return pile;
 }
 
-//Count the number of machine groups
+//Count the number of machine piles
 int pCount(plistnode_t *pile) {
        plistnode_t *tmp;
        int pcount = 0;
@@ -110,7 +121,7 @@ void pDelete(plistnode_t *pile) {
        while(tmp != NULL) {
                next = tmp->next;
                free(tmp->oidmod);
-               free(tmp->oidread);
+               //free(tmp->oidread);
                free(tmp->objread);
                free(tmp);
                tmp = next;
index f4a84e29c81619478e9c30dc8f03c43a7301747e..e35c451b3b2fd67b02487b0bf646774df5f59c14 100644 (file)
@@ -5,17 +5,18 @@
 #include <stdio.h>
 #include "dstm.h"
 
+/* This structure is created using a transaction record.
+ * It is filled out with pile information necessary for 
+ * participants involved in a transaction. */
 typedef struct plistnode {
        unsigned int mid;
-       int local;              /*Variable that keeps track if this pile is for LOCAL machine */
-       unsigned int *oidmod;
-       unsigned int *oidread;
-       int nummod;
-       int numread;
-       int sum_bytes;
-       char *objread;
-       char *objmodified;
-       int vote;
+       int local;              /* Variable that keeps track if this pile is for LOCAL machine */
+       unsigned int *oidmod;   /* Pointer to array containing oids of modified objects */ 
+       unsigned int *oidread;  /* TODO: REMOVE THIS Pointer to array of objects read */
+       int nummod;             /* no of objects read */
+       int numread;            /* no of objects modified */
+       int sum_bytes;          /* total bytes of objects modified */
+       char *objread;          /* Pointer to array containing oids of objects read and their version numbers*/
        struct plistnode *next;
 } plistnode_t;
 
index 9053965fc9d42dbc219ced0d0fad27fbd4a9113d..8e86b346043243c6a969be4bd2861d95dde75fca 100644 (file)
@@ -218,6 +218,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                chashInsert(record->lookupTable, OID(tmp), objcopy); 
                return(objcopy);
        } else { /* If not found anywhere, then block until object appears in prefetch cache */
+#if 0  
                printf("Inside remote machine\n");
                pthread_mutex_lock(&pflookup.lock);
                while(!found) {
@@ -240,16 +241,15 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                                pthread_mutex_unlock(&pflookup.lock);
                        }
                }
+#endif
                /* Get the object from the remote location */
                machinenumber = lhashSearch(oid);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        //If object is not found in Remote location
-                       //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
                        return NULL;
                }
                else {
-                       //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
                        return(objcopy);
                }
        } 
@@ -292,6 +292,7 @@ plistnode_t *createPiles(transrecord_t *record) {
                        }
                        next = curr->next;
                        //Get machine location for object id
+                       //TODO Check is the object is newly created ...if not then lookup the location table 
 
                        if ((machinenum = lhashSearch(curr->key)) == 0) {
                                printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
@@ -322,9 +323,9 @@ plistnode_t *createPiles(transrecord_t *record) {
 
 /* This function initiates the transaction commit process
  * Spawns threads for each of the new connections with Participants 
- * and creates new piles by calling the createPiles(),
- * Fills the piles with necesaary information and 
- * Sends a transrequest() to each pile*/
+ * and creates new piles by calling the createPiles(), 
+ * Sends a transrequest() to each remote machines for objects found remotely 
+ * and calls handleLocalReq() to process objects found locally */
 int transCommit(transrecord_t *record) {       
        unsigned int tot_bytes_mod, *listmid;
        plistnode_t *pile, *pile_ptr;
@@ -462,10 +463,10 @@ int transCommit(transrecord_t *record) {
        return 0;
 }
 
-/* This function sends information involved in the transaction request and 
- * accepts a response from particpants.
+/* This function sends information involved in the transaction request 
+ * to participants and accepts a response from particpants.
  * It calls decideresponse() to decide on what control message 
- * to send next and sends the message using sendResponse()*/
+ * to send next to participants and sends the message using sendResponse()*/
 void *transRequest(void *threadarg) {
        int sd, i, n;
        struct sockaddr_in serv_addr;
@@ -538,7 +539,7 @@ void *transRequest(void *threadarg) {
        tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
 
        /* Lock and update count */
-       //Thread sleeps until all messages from pariticipants are received by coordinator
+       /* Thread sleeps until all messages from pariticipants are received by coordinator */
        pthread_mutex_lock(tdata->lock);
 
        (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
@@ -572,15 +573,13 @@ void *transRequest(void *threadarg) {
 }
 
 /* This function decides the reponse that needs to be sent to 
- * all Participant machines involved in the transaction commit */
+ * all Participant machines after the TRANS_REQUEST protocol */
 int decideResponse(thread_data_array_t *tdata) {
        char control;
        int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                         message to send */
 
-       //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
-               /*Switch on response from Participant */
                control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
                                                           written onto the shared array */
                switch(control) {
@@ -604,9 +603,8 @@ int decideResponse(thread_data_array_t *tdata) {
                }
        }
 
-       /* Decide what control message to send to Participant */        
+       /* Send Abort */
        if(transdisagree > 0) {
-               /* Send Abort */
                *(tdata->replyctrl) = TRANS_ABORT;
                printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
                /* Free resources */
@@ -646,6 +644,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
                        N = oidcount * sizeof(unsigned int);
                        if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
                                printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                               return 0;
                        }
                        ptr = (char *) oidnotfound;
                        do {
@@ -661,7 +660,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
        } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
                retval = TRANS_COMMIT;
        }
-       /* Send response to the Participant */
+
        if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
                perror("Error sending ctrl message for participant\n");
        }
@@ -742,22 +741,20 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        return objcopy;
 }
 
-/*This function handles the local trans requests involved in a transaction commiting process
- * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
- * Activates the other nonlocal threads that are waiting for the decision and the
- * based on common decision by all groups involved in the transaction it 
- * either commits or aborts the transaction.
- * It also frees the calloced memory resources
- */
-
+/* This function handles the local objects involved in a transaction commiting process.
+ * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
+ * Note Coordinator = local machine
+ * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
+ * based on common agreement it either commits or aborts the transaction.
+ * It also frees the memory resources */
 void *handleLocalReq(void *threadarg) {
-       int val, i = 0;
+       int val, i = 0, size, offset = 0;
        short version;
        char control = 0, *ptr;
        unsigned int oid;
        unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
        void *mobj, *modptr;
-       objheader_t *headptr;
+       objheader_t *headptr, *headeraddr;
        local_thread_data_array_t *localtdata;
 
        localtdata = (local_thread_data_array_t *) threadarg;
@@ -765,9 +762,8 @@ void *handleLocalReq(void *threadarg) {
        /* Counters and arrays to formulate decision on control message to be sent */
        oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
        oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-       oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
-       int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-       int objmodnotfound = 0, nummodfound = 0;
+       int objnotfound = 0, objlocked = 0; 
+       int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
 
        /* modptr points to the beginning of the object store 
         * created at the Pariticipant */ 
@@ -775,8 +771,16 @@ void *handleLocalReq(void *threadarg) {
                printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                return NULL;
        }
+       /* Write modified objects into the mainobject store */
+       for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
+               headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
+               size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+               memcpy(modptr+offset, headeraddr, size);  
+               offset += size;
+       }
 
        ptr = modptr;
+       offset = 0; //Reset 
 
        /* Process each oid in the machine pile/ group per thread */
        for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
@@ -787,10 +791,8 @@ void *handleLocalReq(void *threadarg) {
                        incr += sizeof(unsigned int);
                        version = *((short *)(localtdata->tdata->buffer->objread + incr));
                } else {//Objs modified
-                       headptr = (objheader_t *) ptr;
+                       headptr = (objheader_t *)ptr;
                        oid = OID(headptr);
-                       oidmod[objmod] = oid;//Array containing modified oids
-                       objmod++;
                        version = headptr->version;
                        ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
                }
@@ -812,7 +814,6 @@ void *handleLocalReq(void *threadarg) {
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
                                        printf("DEBUG -> Sending TRANS_DISAGREE\n");
-                                       //return tdata->recvmsg[tdata->thread_id].rcv_status;  
                                }
                        } else {/* If Obj is not locked then lock object */
                                STATUS(((objheader_t *)mobj)) |= LOCK;
@@ -829,14 +830,11 @@ void *handleLocalReq(void *threadarg) {
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
                                        printf("DEBUG -> Sending TRANS_DISAGREE\n");
-                                       //      return tdata->recvmsg[tdata->thread_id].rcv_status;  
                                }
                        }
                }
        }
 
-       /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
-
        /* Condition to send TRANS_AGREE */
        if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
                localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
@@ -859,17 +857,12 @@ void *handleLocalReq(void *threadarg) {
 
        /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
         * if Participant receives a TRANS_COMMIT */
-       localtdata->transinfo->objmod = oidmod;
        localtdata->transinfo->objlocked = oidlocked;
        localtdata->transinfo->objnotfound = oidnotfound;
        localtdata->transinfo->modptr = modptr;
-       localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
        localtdata->transinfo->numlocked = objlocked;
        localtdata->transinfo->numnotfound = objnotfound;
 
-       /*Set flag to show that common data structure for this individual thread has been written to */
-       //*(tdata->localstatus) |= LM_UPDATED;
-
        /* Lock and update count */
        //Thread sleeps until all messages from pariticipants are received by coordinator
        pthread_mutex_lock(localtdata->tdata->lock);
@@ -890,12 +883,12 @@ void *handleLocalReq(void *threadarg) {
 
        /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
        if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
-               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
+               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
                        printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
                        return NULL;
                }
        }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
-               if(transComProcess(localtdata->transinfo) != 0) {
+               if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) {
                        printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
                        return NULL;
                }
@@ -904,10 +897,7 @@ void *handleLocalReq(void *threadarg) {
        /* Free memory */
        printf("DEBUG -> Freeing...\n");
        fflush(stdout);
-       if (localtdata->transinfo->objmod != NULL) {
-               free(localtdata->transinfo->objmod);
-               localtdata->transinfo->objmod = NULL;
-       }
+
        if (localtdata->transinfo->objlocked != NULL) {
                free(localtdata->transinfo->objlocked);
                localtdata->transinfo->objlocked = NULL;
@@ -921,7 +911,7 @@ void *handleLocalReq(void *threadarg) {
 }
 /* This function completes the ABORT process if the transaction is aborting 
 */
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
        char *ptr;
        int i;
        objheader_t *tmp_header;
@@ -929,7 +919,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
 
        printf("DEBUG -> Recv TRANS_ABORT\n");
        /* Set all ref counts as 1 and do garbage collection */
-       ptr = modptr;
+       ptr = (char *)modptr;
        for(i = 0; i< nummod; i++) {
                tmp_header = (objheader_t *)ptr;
                tmp_header->rcount = 1;
@@ -937,7 +927,10 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        }
        /* Unlock objects that was locked due to this transaction */
        for(i = 0; i< numlocked; i++) {
-               header = mhashSearch(objlocked[i]);// find the header address
+               if((header = mhashSearch(objlocked[i])) == NULL) {
+                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }
                STATUS(((objheader_t *)header)) &= ~(LOCK);
        }
 
@@ -951,33 +944,36 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
 
 /*This function completes the COMMIT process is the transaction is commiting
 */
-int transComProcess(trans_commit_data_t *transinfo) {
+int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) {
        objheader_t *header;
        int i = 0, offset = 0;
        char control;
 
-       printf("DEBUG -> Recv TRANS_COMMIT\n");
        /* Process each modified object saved in the mainobject store */
-       for(i=0; i<transinfo->nummod; i++) {
-               if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+       for(i = 0; i < nummod; i++) {
+               if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
                }
                /* Change reference count of older address and free space in objstr ?? */
                header->rcount = 1; //TODO Not sure what would be the val
 
                /* Change ptr address in mhash table */
-               mhashRemove(transinfo->objmod[i]);
-               mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+               mhashRemove(oidmod[i]);
+               mhashInsert(oidmod[i], (((char *)modptr) + offset));
                offset += sizeof(objheader_t) + classsize[TYPE(header)];
 
                /* Update object version number */
-               header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+               header = (objheader_t *) mhashSearch(oidmod[i]);
                header->version += 1;
        }
 
        /* Unlock locked objects */
-       for(i=0; i<transinfo->numlocked; i++) {
-               header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+       for(i = 0; i < numlocked; i++) {
+               if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
+                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }
                STATUS(header) &= ~(LOCK);
        }
 
@@ -1232,7 +1228,6 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
 
 /* This function is called by the thread calling transPrefetch */
 void *transPrefetch(void *t) {
-       //int *offstarray = NULL;
        prefetchqelem_t *qnode;
        prefetchpile_t *pilehead = NULL;
 
@@ -1342,7 +1337,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        tmp = mcpilenode->objpiles;
        while(tmp != NULL) {
                off = offset = 0;
-               count++;  // Keeps track of the number of oid and offset tuples sent per remote machine
+               count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
                len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
                char oidnoffset[len];
                memcpy(oidnoffset, &len, sizeof(int));