make changes to get rid of long sequence of memcpy's, address buffer structures being
authoradash <adash>
Wed, 28 Mar 2007 07:44:35 +0000 (07:44 +0000)
committeradash <adash>
Wed, 28 Mar 2007 07:44:35 +0000 (07:44 +0000)
overwritten, split long functions etc.

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/plookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index 82739f6cb29b157ff18007fd7202c2a1545dccc9..047626fa0d312302138f8847d8c502dc316b0745 100644 (file)
@@ -21,7 +21,7 @@
 #define TRANS_DISAGREE         15
 #define TRANS_AGREE_BUT_MISSING_OBJECTS        16
 #define TRANS_SOFT_ABORT       17
-#define TRANS_SUCESSFUL                18//Not necessary for now
+#define TRANS_SUCESSFUL                18
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -61,16 +61,40 @@ typedef struct pile {
        struct pile *next;
 }pile_t;
 
+// Structure that keeps track of responses from the participants
+typedef struct thread_response {
+       char rcv_status;
+}thread_response_t;
+
+// Structure that holds  fixed data sizes to be sent along with TRANS_REQUEST
+typedef struct fixed_data {
+       char control;
+       char trans_id[TID_LEN]; 
+       int mcount;             // Machine count
+       short numread;          // Number of objects read
+       short nummod;           // Number of objects modified
+       int sum_bytes;  // Total bytes modified
+}fixed_data_t;
+
+// Structure that holds  variable data sizes per machine participant
+typedef struct trans_req_data {
+       fixed_data_t f;
+       unsigned int *listmid;
+       char *objread;
+       unsigned int *oidmod;
+}trans_req_data_t;
+
 //structure for passing multiple arguments to thread
 typedef struct thread_data_array {
        int thread_id;
        int mid;    
        int pilecount;
-       char *buffer;           //buffer contains the packet for trans req
-       char *recvmsg;          //shared datastructure to keep track of the control message receiv
+       trans_req_data_t *buffer;
+       thread_response_t *recvmsg;//shared datastructure to keep track of the control message receiv
        pthread_cond_t *threshold; //threshhold for waking up a thread
        pthread_mutex_t *lock;    //lock the count variable
        int *count;             //count variable
+       transrecord_t *rec;     // To send modified objects
 }thread_data_array_t;
 
 /* Initialize main object store and lookup tables, start server thread. */
@@ -96,8 +120,9 @@ void *dstmAccept(void *);
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *record, unsigned int oid);
 objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid
-void *transRequest(void *);
+void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
 int transCommit(transrecord_t *record); //return 0 if successful
+int decideResponse(thread_data_array_t *tdata, char *buffer, int sd);// Coordinator decides what response to send to the participant
 /* end transactions */
 
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
index cfcb012636cad849f3e04fd0a1b8b357edb368dc..15236d518582ec2e74be2ec2e722c89beb7803c4 100644 (file)
@@ -1,4 +1,5 @@
- #include "plookup.h"
+#include "plookup.h"
+extern int classsize[];
 
 plistnode_t *pCreate(int objects) {
        plistnode_t *pile;
@@ -9,26 +10,45 @@ plistnode_t *pCreate(int objects) {
                return NULL;
        }       
        pile->next = NULL;
-       //Create array of objects
-       if((pile->obj = calloc(objects, sizeof(unsigned int))) == NULL) {
+       if ((pile->oidmod = calloc(objects, sizeof(unsigned int))) == NULL) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
                return NULL;
        }
-       pile->index = 0;
-       //pile->vote = 0;
+       if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+       pile->nummod = pile->numread = pile->sum_bytes = 0;
+       if ((pile->objread = calloc(objects, sizeof(int) + sizeof(short))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+       pile->objmodified = NULL;
+       pile->nummod = pile->numread = pile->sum_bytes = 0;
+
        return pile;
 }
 
-plistnode_t *pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) {
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
        plistnode_t *ptr, *tmp;
-       int found = 0;
+       int found = 0, offset;
 
        tmp = pile;
        //Add oid into a machine that is a part of the pile linked list structure
        while(tmp != NULL) {
                if (tmp->mid == mid) {
-                       tmp->obj[tmp->index] = oid;
-                       tmp->index++;
+                       if ((headeraddr->status >> 1) == 1) {
+                               tmp->oidmod[tmp->nummod] = headeraddr->oid;
+                               tmp->nummod = tmp->nummod + 1;
+                               tmp->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
+                       } else {
+                               tmp->oidread[tmp->numread] = headeraddr->oid;
+                               offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+                               memcpy(tmp->objread, &headeraddr->oid, sizeof(unsigned int));
+                               offset += sizeof(unsigned int);
+                               memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
+                               tmp->numread = tmp->numread + 1;
+                       }
                        found = 1;
                        break;
                }
@@ -40,8 +60,16 @@ plistnode_t *pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int
                        return NULL;
                }
                ptr->mid = mid;
-               ptr->obj[ptr->index] = oid;
-               ptr->index++;
+               if ((headeraddr->status >> 1) == 1) {
+                       ptr->oidmod[ptr->nummod] = headeraddr->oid;
+                       ptr->nummod = ptr->nummod + 1;
+                       ptr->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
+               } else {
+                       ptr->oidread[ptr->numread] = headeraddr->oid;
+                       memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int));
+                       memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
+                       ptr->numread = ptr->numread + 1;
+               }
                ptr->next = pile;
                pile = ptr;
        }
@@ -72,26 +100,15 @@ int pListMid(plistnode_t *pile, unsigned int *list) {
        return 0;
 }
 
-// Return objects for a given mid
-unsigned int *pSearch(plistnode_t *pile, unsigned int mid) {
-       plistnode_t *tmp;
-       tmp = pile;
-       while(tmp != NULL) {
-               if(tmp->mid == mid) {
-                       return(tmp->obj);
-               }
-               tmp = tmp->next;
-       }
-       return NULL;
-}
-
 //Delete the entire pile
 void pDelete(plistnode_t *pile) {
        plistnode_t *next, *tmp;
        tmp = pile;
        while(tmp != NULL) {
                next = tmp->next;
-               free(tmp->obj);
+               free(tmp->oidmod);
+               free(tmp->oidread);
+               free(tmp->objread);
                free(tmp);
                tmp = next;
        }
index 2e769f26fe9206a01c234f8c409ba5e175676cbd..0d8f67af120fb445b6e3572aacfce1a0d080174f 100644 (file)
@@ -3,21 +3,26 @@
 
 #include <stdlib.h>
 #include <stdio.h>
+#include "dstm.h"
 
 typedef struct plistnode {
        unsigned int mid;
-       unsigned int *obj; //this can be cast to another type or used to point to a larger structure
-       int index;
+       unsigned int *oidmod;
+       unsigned int *oidread;
+       int nummod;
+       int numread;
+       int sum_bytes;
+       char *objread;
+       char *objmodified;
        int vote;
        struct plistnode *next;
 } plistnode_t;
 
 plistnode_t  *pCreate(int);
-plistnode_t  *pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int);
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs);
 int pCount(plistnode_t *pile);
 int pListMid(plistnode_t *pile, unsigned int *list);
-unsigned int *pSearch(plistnode_t *, unsigned int mid);
-void pDelete(plistnode_t *);
+void pDelete(plistnode_t *pile);
 
 #endif
 
index 3cb5d560039aae7a9e5cfc20d9b1367f0501749e..a3661d438bbcb0be4ac8e0e417e63cc0cec2aa37 100644 (file)
@@ -69,11 +69,53 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        return tmp;
 }
 
+int decideResponse(thread_data_array_t *tdata, char *buffer, int sd) {
+       int i, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+
+       //Check common data structure 
+       for (i = 0 ; i < tdata->pilecount ; i++) {
+               //Check in any DISAGREE has come
+               if(tdata->recvmsg[i].rcv_status == TRANS_DISAGREE) {
+                       //Send abort
+                       transabort++;
+                       buffer[0] = TRANS_ABORT;
+                       if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+                               perror("Error sending message for thread");
+                               return 1;
+                       }
+               } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE) {
+                       transagree++;
+               } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE_BUT_MISSING_OBJECTS) {
+                       transmiss++;
+               } else
+                       transsoftabort++;
+       }
+       if(transagree == tdata->pilecount){
+               //Send Commit
+               buffer[0] = TRANS_COMMIT;
+               if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+                       perror("Error sending message for thread");
+                       return 1;
+               }
+       }
+       if(transsoftabort > 0 && transabort == 0) {
+               //Send abort but retry commit
+               //i.e. wait at the participant end and then resend either agree or disagree
+               //
+
+       }
+       if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
+               //Relookup all missing objects
+               //send missing mising object/ objects
+       }
+}
+
 void *transRequest(void *threadarg) {
-       int sd, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+       int sd, i, n;
        struct sockaddr_in serv_addr;
        struct hostent *server;
        thread_data_array_t *tdata;
+       objheader_t *headeraddr;
        char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
 
        tdata = (thread_data_array_t *) threadarg;
@@ -93,16 +135,39 @@ void *transRequest(void *threadarg) {
                return NULL;
        }
 
-       if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
-               perror("Error sending message for thread");
+       //Multiple writes for sending packets of data 
+       //Send first few fixed bytes of the TRANS_REQUEST protocol
+       if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) {
+               perror("Error sending fixed bytes for thread");
+               return NULL;
+       }
+       //Send list of machines involved in the transaction
+       if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
+               perror("Error sending list of machines for thread");
+               return NULL;
+       }
+       //Send oids and version number tuples for objects that are read
+       if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) {
+               perror("Error sending tuples for thread");
                return NULL;
        }
+       //Send objects that are modified
+       for( i = 0; i < tdata->buffer->f.nummod ; i++) {
+               headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+               if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
+                       perror("Error sending obj modified for thread");
+                       return NULL;
+               }
+       }
+       
        //Read message from participant side
-       read(sd, buffer, sizeof(buffer));
+       while(n != 0) {
+               n = read(sd, buffer, sizeof(buffer));
+       }
        //process the participant's request
        recvcontrol = buffer[0];
        //Update common data structure and increment count
-       tdata->recvmsg[tdata->thread_id] = recvcontrol;
+       tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
        //Lock and update count
        //Thread sleeps until all messages from pariticipants are received by coordinator
        pthread_mutex_lock(tdata->lock);
@@ -110,49 +175,15 @@ void *transRequest(void *threadarg) {
        
        if(*(tdata->count) == tdata->pilecount) {
                pthread_cond_broadcast(tdata->threshold);
-               //Check common data structure 
-               for (i = 0 ; i < tdata->pilecount ; i++) {
-                       //Check in any DISAGREE has come
-                       if(tdata->recvmsg[i] == TRANS_DISAGREE) {
-                               //Send abort
-                               transabort++;
-                               buffer[0] = TRANS_ABORT;
-                               if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
-                                       perror("Error sending message for thread");
-                                       return NULL;
-                               }
-                       } else if(tdata->recvmsg[i] == AGREE) {
-                               transagree++;
-                       } else if(tdata->recvmsg[i] == AGREE_BUT_MISSING_OBJECTS) {
-                               transmiss++;
-                       } else
-                               transsoftabort++;
-               }
-               if(transagree == tdata->pilecount){
-                       //Send Commit
-                       buffer[0] = TRANS_COMMIT;
-                       if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
-                               perror("Error sending message for thread");
-                               return NULL;
-                       }
-               }
-               if(transsoftabort > 0 && transabort == 0) {
-                       //Send abort but retry commit
-                       //i.e. wait at the participant end and then resend either agree or disagree
-                       //
-
-               }
-               if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
-                       //Relookup all missing objects
-                       //send missing mising object/ objects
+               if (decideResponse(tdata, buffer, sd) == 1) {
+                       printf("decideResponse returned error\n");
+                       return NULL;
                }
-
        } else {
                pthread_cond_wait(tdata->threshold, tdata->lock);
        }       
        pthread_mutex_unlock(tdata->lock);
        close(sd);
-       //Reset numread and nummod for the next machine
        pthread_exit(NULL);
 }
 
@@ -164,13 +195,9 @@ int transCommit(transrecord_t *record){
        plistnode_t *tmp, *pile = NULL;
        int i, rc;
        int pilecount = 0, offset, numthreads = 0, trecvcount = 0;
-       short numread = 0,nummod = 0;
        char buffer[RECEIVE_BUFFER_SIZE],control;
-       char tmpbuffer[RECEIVE_BUFFER_SIZE];
        char transid[TID_LEN];
        static int newtid = 0;
-       pthread_cond_t threshold;
-       pthread_mutex_t count;
 
        ptr = record->lookupTable->table;
        size = record->lookupTable->size;
@@ -185,9 +212,16 @@ int transCommit(transrecord_t *record){
                        }
                        next = curr->next;
                        //Get machine location for object id
-                       machinenum = lhashSearch(curr->key);
+                       if ((machinenum = lhashSearch(curr->key)) == 0) {
+                              printf("Error: No such machine\n");
+                              return 1;
+                       }               
+                       if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+                               printf("Error: No such oid\n");
+                               return 1;
+                       }
                        //Make machine groups
-                       if ((pile = pInsert(pile, machinenum, curr->key, record->lookupTable->numelements)) == NULL) {
+                       if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
                                perror("pInsert calloc error");
                                return 1;
                        }
@@ -206,7 +240,7 @@ int transCommit(transrecord_t *record){
        pthread_mutex_t tlock;
        thread_data_array_t thread_data_array[pilecount];
        
-       char rcvd_control_msg[pilecount];      //Shared thread array that keeps track of responses of participants
+       thread_response_t rcvd_control_msg[pilecount];  //Shared thread array that keeps track of responses of participants
                
        //Initialize and set thread detach attribute
        pthread_attr_init(&attr);
@@ -219,84 +253,42 @@ int transCommit(transrecord_t *record){
        pListMid(pile, listmid);
        //Process each machine group
        while(tmp != NULL) {
-               unsigned int *oidmod = calloc(record->lookupTable->numelements, sizeof(unsigned int));
-               unsigned int *oidread = calloc(record->lookupTable->numelements, sizeof(unsigned int));
-               nummod = numread = tot_bytes_mod = 0;
-               offset = 0;
-               
-               
                //Create transaction id
                newtid++;
-               sprintf(transid, "%x_%d", tmp->mid, newtid);
-               //Browse through each oid in machine group
-               for(i = 0; i < tmp->index; i++) {
-                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, tmp->obj[i]);
-                       //check if object modified in cache
-                       if((headeraddr->status >> 1) == 1){
-                               //Keep track of oids that have been modified    
-                               oidmod[nummod] = headeraddr->oid;
-                               nummod++;
-                               tot_bytes_mod += (sizeof(objheader_t) + classsize[headeraddr->type]); //Keeps track of total bytes of modified object 
-                       } else {
-                               //Keep track of oids that are read      
-                               oidread[numread] = headeraddr->oid;
-                               //create <oid,version> tuples in temporary buffer
-                               memcpy(tmpbuffer+offset, &headeraddr->oid, sizeof(unsigned int));       
-                               offset += sizeof(unsigned int);
-                               memcpy(tmpbuffer+offset, &headeraddr->version, sizeof(short));  
-                               offset += sizeof(short);
-                               numread++;
-                       }
-               }
-               //Copy each field of the packet into buffer
-               bzero((char *)buffer,sizeof(buffer));
-               offset = 0;
-               buffer[offset] = TRANS_REQUEST;
-               offset = offset + 1;
-               memcpy(buffer+offset, transid, sizeof(char) * TID_LEN);
-               offset += (sizeof(char) * TID_LEN);
-               memcpy(buffer+offset, &pilecount, sizeof(int));
-               offset += sizeof(int);
-               memcpy(buffer+offset, &numread, sizeof(short));
-               offset += sizeof(short);
-               memcpy(buffer+offset, &nummod, sizeof(short));
-               offset += sizeof(short);
-               memcpy(buffer+offset, &tot_bytes_mod, sizeof(unsigned int));
-               offset += sizeof(unsigned int);
-               memcpy(buffer+offset, listmid, sizeof(unsigned int) * pilecount);
-               offset += (sizeof(unsigned int) * pilecount);
-               memcpy(buffer+offset, tmpbuffer, sizeof(char) * RECEIVE_BUFFER_SIZE);
-               offset += (sizeof(char) * RECEIVE_BUFFER_SIZE);
-               //send objects for all objects modified
-               for( i= 0; i< nummod; i++) {
-                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]);
-                       memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]);
-                       offset += sizeof(objheader_t) + classsize[headeraddr->type];
+               trans_req_data_t *tosend;
+               if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+                       perror("");
+                       return 1;
                }
-               if (offset > RECEIVE_BUFFER_SIZE) {
-                       printf("Error: Buffersize too small");
-               }
-               //Create thread input to pass multiple arguments via structure 
+               tosend->f.control = TRANS_REQUEST;
+               sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
+               tosend->f.mcount = pilecount;
+               tosend->f.numread = tmp->numread;
+               tosend->f.nummod = tmp->nummod;
+               tosend->f.sum_bytes = tmp->sum_bytes;
+               tosend->listmid = listmid;
+               tosend->objread = tmp->objread;
+               tosend->oidmod = tmp->oidmod;
                thread_data_array[numthreads].thread_id = numthreads;
                thread_data_array[numthreads].mid = tmp->mid;
                thread_data_array[numthreads].pilecount = pilecount;
-               thread_data_array[numthreads].buffer = buffer;
+               thread_data_array[numthreads].buffer = tosend;
                thread_data_array[numthreads].recvmsg = rcvd_control_msg;
                thread_data_array[numthreads].threshold = &tcond;
                thread_data_array[numthreads].lock = &tlock;
                thread_data_array[numthreads].count = &trecvcount;
-               //Spawn thread for each TRANS_REQUEST
+               thread_data_array[numthreads].rec = record;
+
                rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]);  
                if (rc) {
                        perror("Error in pthread create");
-                       exit(-1);       
+                       return 1;
                }               
                numthreads++;           
-               sleep(2);
-               free(oidmod);
-               free(oidread);
+               //TODO frees ?
                tmp = tmp->next;
        }
+
        // Free attribute and wait for the other threads
        pthread_attr_destroy(&attr);
        for (i = 0 ;i < pilecount ; i++) {
@@ -304,16 +296,15 @@ int transCommit(transrecord_t *record){
                if (rc)
                {
                        printf("ERROR return code from pthread_join() is %d\n", rc);
-                       exit(-1);
+                       return 1;
                }
        }
-               
        
        //Free resources        
        pthread_cond_destroy(&tcond);
        pthread_mutex_destroy(&tlock);
-       pthread_exit(NULL);
        free(listmid);
+       return 0;
 }
 
 int transSoftAbort(transrecord_t *record){