Untest code for trans commit
authoradash <adash>
Mon, 26 Mar 2007 20:48:02 +0000 (20:48 +0000)
committeradash <adash>
Mon, 26 Mar 2007 20:48:02 +0000 (20:48 +0000)
spawns threads for each trans request message sent
modifed code to handle the current format of the trans_request protocol

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/plookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index 7ac0706a59af55bf5bbadf59fa92f891524a231f..82739f6cb29b157ff18007fd7202c2a1545dccc9 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef _DSTM_H_
 #define _DSTM_H_
 
-//Client Messages
+//Coordinator Messages
 #define READ_REQUEST           1
 #define READ_MULT_REQUEST      2
 #define MOVE_REQUEST           3
@@ -9,23 +9,28 @@
 #define        TRANS_REQUEST           5
 #define        TRANS_ABORT             6
 #define TRANS_COMMIT           7
+#define TRANS_ABORT_BUT_RETRY_COMMIT   8
+#define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING   9
 
-//Server Messages
-#define OBJECT_FOUND           8
-#define OBJECT_NOT_FOUND       9
-#define OBJECTS_FOUND          10
-#define OBJECTS_NOT_FOUND      11
-#define TRANS_AGREE            12
-#define TRANS_DISAGREE         13//for soft abort
-#define TRANS_DISAGREE_ABORT   14//for hard abort
-#define TRANS_SUCESSFUL                15//Not necessary for now
+//Participant Messages
+#define OBJECT_FOUND           10
+#define OBJECT_NOT_FOUND       11
+#define OBJECTS_FOUND          12
+#define OBJECTS_NOT_FOUND      13
+#define TRANS_AGREE            14
+#define TRANS_DISAGREE         15
+#define TRANS_AGREE_BUT_MISSING_OBJECTS        16
+#define TRANS_SOFT_ABORT       17
+#define TRANS_SUCESSFUL                18//Not necessary for now
 
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
+#include <pthread.h>
 #include "clookup.h"
 
 #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
+#define TID_LEN 20
 //bit designations for status field of objheader
 #define DIRTY 0x01
 #define NEW   0x02
@@ -56,6 +61,18 @@ typedef struct pile {
        struct pile *next;
 }pile_t;
 
+//structure for passing multiple arguments to thread
+typedef struct thread_data_array {
+       int thread_id;
+       int mid;    
+       int pilecount;
+       char *buffer;           //buffer contains the packet for trans req
+       char *recvmsg;          //shared datastructure to keep track of the control message receiv
+       pthread_cond_t *threshold; //threshhold for waking up a thread
+       pthread_mutex_t *lock;    //lock the count variable
+       int *count;             //count variable
+}thread_data_array_t;
+
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
 
@@ -79,6 +96,7 @@ void *dstmAccept(void *);
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *record, unsigned int oid);
 objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid
+void *transRequest(void *);
 int transCommit(transrecord_t *record); //return 0 if successful
 /* end transactions */
 
index 05ea34277f437ef11a170080098d8e16e2bea59d..cfcb012636cad849f3e04fd0a1b8b357edb368dc 100644 (file)
@@ -15,11 +15,11 @@ plistnode_t *pCreate(int objects) {
                return NULL;
        }
        pile->index = 0;
-       pile->vote = 0;
+       //pile->vote = 0;
        return pile;
 }
 
-unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) {
+plistnode_t *pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) {
        plistnode_t *ptr, *tmp;
        int found = 0;
 
@@ -37,7 +37,7 @@ unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int
        //Add oid for any new machine 
        if (!found) {
                if((ptr = pCreate(num_objs)) == NULL) {
-                       return 1;
+                       return NULL;
                }
                ptr->mid = mid;
                ptr->obj[ptr->index] = oid;
@@ -45,6 +45,30 @@ unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int
                ptr->next = pile;
                pile = ptr;
        }
+       return pile;
+}
+
+//Count the number of machine groups
+int pCount(plistnode_t *pile) {
+       plistnode_t *tmp;
+       int pcount = 0;
+       tmp = pile;
+       while(tmp != NULL) {
+               pcount++;
+               tmp = tmp->next;
+       }
+       return pcount;
+}
+
+//Make a list of mid's for each machine group
+int pListMid(plistnode_t *pile, unsigned int *list) {
+        int i = 0;
+       plistnode_t *tmp;
+       tmp = pile;
+       while (tmp != NULL) {
+               list[i] = tmp->mid;
+               i++;
+       }
        return 0;
 }
 
index a1ee01b139791b21d9d2f5ed09532939cd39b2eb..2e769f26fe9206a01c234f8c409ba5e175676cbd 100644 (file)
@@ -13,7 +13,9 @@ typedef struct plistnode {
 } plistnode_t;
 
 plistnode_t  *pCreate(int);
-unsigned int pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int);
+plistnode_t  *pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int);
+int pCount(plistnode_t *pile);
+int pListMid(plistnode_t *pile, unsigned int *list);
 unsigned int *pSearch(plistnode_t *, unsigned int mid);
 void pDelete(plistnode_t *);
 
index d852dcce8df7caf1a22408436d99f13829fbf288..3cb5d560039aae7a9e5cfc20d9b1367f0501749e 100644 (file)
@@ -3,6 +3,7 @@
 #include "mlookup.h"
 #include "llookup.h"
 #include "plookup.h"
+#include<pthread.h>
 #include<sys/types.h>
 #include<sys/socket.h>
 #include<netdb.h>
@@ -68,22 +69,112 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        return tmp;
 }
 
+void *transRequest(void *threadarg) {
+       int sd, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+       struct sockaddr_in serv_addr;
+       struct hostent *server;
+       thread_data_array_t *tdata;
+       char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
+
+       tdata = (thread_data_array_t *) threadarg;
+       //Send Trans Request
+       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+               perror("Error in socket for TRANS_REQUEST");
+               return NULL;
+       }
+       bzero((char*) &serv_addr, sizeof(serv_addr));
+       serv_addr.sin_family = AF_INET;
+       serv_addr.sin_port = htons(LISTEN_PORT);
+       serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+       //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
+
+       if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+               perror("Error in connect for TRANS_REQUEST");
+               return NULL;
+       }
+
+       if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+               perror("Error sending message for thread");
+               return NULL;
+       }
+       //Read message from participant side
+       read(sd, buffer, sizeof(buffer));
+       //process the participant's request
+       recvcontrol = buffer[0];
+       //Update common data structure and increment count
+       tdata->recvmsg[tdata->thread_id] = recvcontrol;
+       //Lock and update count
+       //Thread sleeps until all messages from pariticipants are received by coordinator
+       pthread_mutex_lock(tdata->lock);
+               (*(tdata->count))++;
+       
+       if(*(tdata->count) == tdata->pilecount) {
+               pthread_cond_broadcast(tdata->threshold);
+               //Check common data structure 
+               for (i = 0 ; i < tdata->pilecount ; i++) {
+                       //Check in any DISAGREE has come
+                       if(tdata->recvmsg[i] == TRANS_DISAGREE) {
+                               //Send abort
+                               transabort++;
+                               buffer[0] = TRANS_ABORT;
+                               if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+                                       perror("Error sending message for thread");
+                                       return NULL;
+                               }
+                       } else if(tdata->recvmsg[i] == AGREE) {
+                               transagree++;
+                       } else if(tdata->recvmsg[i] == AGREE_BUT_MISSING_OBJECTS) {
+                               transmiss++;
+                       } else
+                               transsoftabort++;
+               }
+               if(transagree == tdata->pilecount){
+                       //Send Commit
+                       buffer[0] = TRANS_COMMIT;
+                       if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+                               perror("Error sending message for thread");
+                               return NULL;
+                       }
+               }
+               if(transsoftabort > 0 && transabort == 0) {
+                       //Send abort but retry commit
+                       //i.e. wait at the participant end and then resend either agree or disagree
+                       //
+
+               }
+               if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
+                       //Relookup all missing objects
+                       //send missing mising object/ objects
+               }
+
+       } else {
+               pthread_cond_wait(tdata->threshold, tdata->lock);
+       }       
+       pthread_mutex_unlock(tdata->lock);
+       close(sd);
+       //Reset numread and nummod for the next machine
+       pthread_exit(NULL);
+}
+
 int transCommit(transrecord_t *record){        
        chashlistnode_t *curr, *ptr, *next;
        unsigned int size;//Represents number of bins in the chash table
-       unsigned int machinenum;
-       objheader_t *headeraddr, *localheaderaddr;
+       unsigned int machinenum, tot_bytes_mod;
+       objheader_t *headeraddr;
        plistnode_t *tmp, *pile = NULL;
-       int sd,n,i;
+       int i, rc;
+       int pilecount = 0, offset, numthreads = 0, trecvcount = 0;
        short numread = 0,nummod = 0;
-       struct sockaddr_in serv_addr;
-       struct hostent *server;
        char buffer[RECEIVE_BUFFER_SIZE],control;
-       
+       char tmpbuffer[RECEIVE_BUFFER_SIZE];
+       char transid[TID_LEN];
+       static int newtid = 0;
+       pthread_cond_t threshold;
+       pthread_mutex_t count;
+
        ptr = record->lookupTable->table;
        size = record->lookupTable->size;
-       //Look through all the objects in the cache and make pils
-       //Outer loop for chashtable
+       //Look through all the objects in the cache and make piles
        for(i = 0; i < size ;i++) {
                curr = &ptr[i];
                //Inner loop to traverse the linked list of the cache lookupTable
@@ -95,60 +186,88 @@ int transCommit(transrecord_t *record){
                        next = curr->next;
                        //Get machine location for object id
                        machinenum = lhashSearch(curr->key);
-                       // Make piles
-                       pInsert(pile, machinenum, curr->key, record->lookupTable->numelements); 
+                       //Make machine groups
+                       if ((pile = pInsert(pile, machinenum, curr->key, record->lookupTable->numelements)) == NULL) {
+                               perror("pInsert calloc error");
+                               return 1;
+                       }
                        curr = next;
                }
        }
 
-       tmp = pile;     
-       unsigned int oidmod[record->lookupTable->numelements];
-       unsigned int oidread[record->lookupTable->numelements];
-       //Process each machine in pile
+       //Create the packet to be sent in TRANS_REQUEST
+       tmp = pile;
+       pilecount = pCount(pile);               //Keeps track of the number of participants
+       
+       //Thread related variables
+       pthread_t thread[pilecount];            //Create threads for each participant
+       pthread_attr_t attr;                    
+       pthread_cond_t tcond;
+       pthread_mutex_t tlock;
+       thread_data_array_t thread_data_array[pilecount];
+       
+       char rcvd_control_msg[pilecount];      //Shared thread array that keeps track of responses of participants
+               
+       //Initialize and set thread detach attribute
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+       pthread_mutex_init(&tlock, NULL);
+       pthread_cond_init(&tcond, NULL);
+       
+       //Keep track of list of machine ids per transaction     
+       unsigned int *listmid = calloc(pilecount, sizeof(unsigned int));
+       pListMid(pile, listmid);
+       //Process each machine group
        while(tmp != NULL) {
-               //Identify which oids have been updated and which ones have been just read
-               for(i = 0; i < pile->index; i++) {
-                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, pile->obj[i]);
-                       //check if object modified in cache  ??
-                       if(headeraddr->status >>= DIRTY){
+               unsigned int *oidmod = calloc(record->lookupTable->numelements, sizeof(unsigned int));
+               unsigned int *oidread = calloc(record->lookupTable->numelements, sizeof(unsigned int));
+               nummod = numread = tot_bytes_mod = 0;
+               offset = 0;
+               
+               
+               //Create transaction id
+               newtid++;
+               sprintf(transid, "%x_%d", tmp->mid, newtid);
+               //Browse through each oid in machine group
+               for(i = 0; i < tmp->index; i++) {
+                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, tmp->obj[i]);
+                       //check if object modified in cache
+                       if((headeraddr->status >> 1) == 1){
                                //Keep track of oids that have been modified    
                                oidmod[nummod] = headeraddr->oid;
                                nummod++;
+                               tot_bytes_mod += (sizeof(objheader_t) + classsize[headeraddr->type]); //Keeps track of total bytes of modified object 
                        } else {
+                               //Keep track of oids that are read      
                                oidread[numread] = headeraddr->oid;
+                               //create <oid,version> tuples in temporary buffer
+                               memcpy(tmpbuffer+offset, &headeraddr->oid, sizeof(unsigned int));       
+                               offset += sizeof(unsigned int);
+                               memcpy(tmpbuffer+offset, &headeraddr->version, sizeof(short));  
+                               offset += sizeof(short);
                                numread++;
                        }
                }
-               //Send Trans Request in the form
-               if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-                       perror("Error in socket for TRANS_REQUEST");
-                       return 1;
-               }
-               bzero((char*) &serv_addr, sizeof(serv_addr));
-               serv_addr.sin_family = AF_INET;
-               serv_addr.sin_port = htons(LISTEN_PORT);
-               serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
-               //serv_addr.sin_addr.s_addr = inet_addr(pile->mid);
-
-               if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-                       perror("Error in connect for TRANS_REQUEST");
-                       return 1;
-               }
-               
+               //Copy each field of the packet into buffer
                bzero((char *)buffer,sizeof(buffer));
-               control = TRANS_REQUEST;
-               buffer[0] = control;
-               //Send numread, nummod, sizeof header for objects read, size of header+objects that are modified
-               int offset = 1;
+               offset = 0;
+               buffer[offset] = TRANS_REQUEST;
+               offset = offset + 1;
+               memcpy(buffer+offset, transid, sizeof(char) * TID_LEN);
+               offset += (sizeof(char) * TID_LEN);
+               memcpy(buffer+offset, &pilecount, sizeof(int));
+               offset += sizeof(int);
                memcpy(buffer+offset, &numread, sizeof(short));
                offset += sizeof(short);
                memcpy(buffer+offset, &nummod, sizeof(short));
                offset += sizeof(short);
-               for( i= 0; i< numread; i++) {
-                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidread[i]);
-                       memcpy(buffer+offset, headeraddr, sizeof(objheader_t));
-                       offset += sizeof(objheader_t);
-               }
+               memcpy(buffer+offset, &tot_bytes_mod, sizeof(unsigned int));
+               offset += sizeof(unsigned int);
+               memcpy(buffer+offset, listmid, sizeof(unsigned int) * pilecount);
+               offset += (sizeof(unsigned int) * pilecount);
+               memcpy(buffer+offset, tmpbuffer, sizeof(char) * RECEIVE_BUFFER_SIZE);
+               offset += (sizeof(char) * RECEIVE_BUFFER_SIZE);
+               //send objects for all objects modified
                for( i= 0; i< nummod; i++) {
                        headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]);
                        memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]);
@@ -157,117 +276,53 @@ int transCommit(transrecord_t *record){
                if (offset > RECEIVE_BUFFER_SIZE) {
                        printf("Error: Buffersize too small");
                }
-               if (write(sd, buffer, sizeof(buffer)) < 0) {
-                       perror("Error sending message");
-                       return 1;
+               //Create thread input to pass multiple arguments via structure 
+               thread_data_array[numthreads].thread_id = numthreads;
+               thread_data_array[numthreads].mid = tmp->mid;
+               thread_data_array[numthreads].pilecount = pilecount;
+               thread_data_array[numthreads].buffer = buffer;
+               thread_data_array[numthreads].recvmsg = rcvd_control_msg;
+               thread_data_array[numthreads].threshold = &tcond;
+               thread_data_array[numthreads].lock = &tlock;
+               thread_data_array[numthreads].count = &trecvcount;
+               //Spawn thread for each TRANS_REQUEST
+               rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]);  
+               if (rc) {
+                       perror("Error in pthread create");
+                       exit(-1);       
+               }               
+               numthreads++;           
+               sleep(2);
+               free(oidmod);
+               free(oidread);
+               tmp = tmp->next;
+       }
+       // Free attribute and wait for the other threads
+       pthread_attr_destroy(&attr);
+       for (i = 0 ;i < pilecount ; i++) {
+               rc = pthread_join(thread[i], NULL);
+               if (rc)
+               {
+                       printf("ERROR return code from pthread_join() is %d\n", rc);
+                       exit(-1);
                }
-#ifdef DEBUG1
-               printf("DEBUG -> ready to rcv ...\n");
-#endif
-               read(sd, buffer, sizeof(buffer));
-               close(sd);
-               printf("Server sent %d\n",buffer[0]);
-               /*
-               if (buffer[0] == TRANS_AGREE) {
-                       //change machine pile
-                       
-               } 
-               */
-               //Reset numread and nummod for the next pile
-              numread = nummod = 0;    
-              tmp = tmp->next;
-
        }
-
-}
-
-
-#if 0
-int transCommit(transrecord_t *record){        
-       //Look through all the objects in the cache
-       int i,numelements,isFirst;
-       unsigned int size,machinenum;//Represents number of buckets
-       void *address;
-       objheader_t *headeraddr,localheaderaddr;
-       chashlistnode_t *curr, *ptr, *next;
-       int sd, size;
-       struct sockaddr_in serv_addr;
-       struct hostent *server;
-       char buffer[RECEIVE_BUFFER_SIZE],control;
+               
        
-       ptr = record->lookupTable->table;
-       size = record->lookupTable->size;
-       //Outer loop for chashtable
-       for(i = 0; i< size ;i++) {
-               curr = &ptr[i];
-               //Inner look to traverse the linked list of the cache lookupTable
-               while(curr != NULL) {
-                       if(curr->key == 0) {
-                               break;
-                       }
-                       //Find if local or remote
-                       address = mhashSearch(curr->key);
-                       d
-                       localheaderaddr = (objheader_t *) curr->value;
-                       if(address != NULL) {
-                               //Is local so  check if the local copy has been updated 
-                               headeraddr = (objheader_t *) address;                           
-                               if(localheaderaddr->version == headeraddr->version){
-                                       //Lock Object
-                                       
-                               }
-                               else {
-                                       //vote as DISAGREE
-                                       //Start TransAbort();
-                                       //Unlock object
-                               }
-                       }
-                       else {
-                               //Is remote
-                               //Find which machine it belongs to
-                               machinenum = lhashSearch(curr->key);
-                               //Start TRANS_REQUEST to machine
-
-                               if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-                                       perror("Error in socket");
-                                       return NULL;
-                               }
-                               bzero((char*) &serv_addr, sizeof(serv_addr));
-                               serv_addr.sin_family = AF_INET;
-                               serv_addr.sin_port = htons(LISTEN_PORT);
-                               serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
-
-                               if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-                                       perror("Error in connect");
-                                       return NULL;
-                               }
-                               bzero((char *)buffer,sizeof(buffer));
-                               control = READ_REQUEST;
-                               buffer[0] = control;
-                               memcpy(buffer+1, &oid, sizeof(int));
-                               if (write(sd, buffer, sizeof(buffer)) < 0) {
-                                       perror("Error sending message");
-                                       return NULL;
-                               }
+       //Free resources        
+       pthread_cond_destroy(&tcond);
+       pthread_mutex_destroy(&tlock);
+       pthread_exit(NULL);
+       free(listmid);
+}
 
-#ifdef DEBUG1
-                               printf("DEBUG -> ready to rcv ...\n");
-#endif
-                               read(sd, buffer, sizeof(buffer));
-                               close(sd);
-
-                       }
-                       next = curr->next;
-               }
-               curr = next;
-       }       
+int transSoftAbort(transrecord_t *record){
 
 }
-#endif
-
 
 int transAbort(transrecord_t *record){
 
+
 }
 
 //mnun will be used to represent the machine IP address later