From: adash Date: Mon, 26 Mar 2007 20:48:02 +0000 (+0000) Subject: Untest code for trans commit X-Git-Tag: preEdgeChange~645 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=0f5f288448c61863cae1eb4dfd966a05f906b41c;p=IRC.git Untest code for trans commit spawns threads for each trans request message sent modifed code to handle the current format of the trans_request protocol --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 7ac0706a..82739f6c 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -1,7 +1,7 @@ #ifndef _DSTM_H_ #define _DSTM_H_ -//Client Messages +//Coordinator Messages #define READ_REQUEST 1 #define READ_MULT_REQUEST 2 #define MOVE_REQUEST 3 @@ -9,23 +9,28 @@ #define TRANS_REQUEST 5 #define TRANS_ABORT 6 #define TRANS_COMMIT 7 +#define TRANS_ABORT_BUT_RETRY_COMMIT 8 +#define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9 -//Server Messages -#define OBJECT_FOUND 8 -#define OBJECT_NOT_FOUND 9 -#define OBJECTS_FOUND 10 -#define OBJECTS_NOT_FOUND 11 -#define TRANS_AGREE 12 -#define TRANS_DISAGREE 13//for soft abort -#define TRANS_DISAGREE_ABORT 14//for hard abort -#define TRANS_SUCESSFUL 15//Not necessary for now +//Participant Messages +#define OBJECT_FOUND 10 +#define OBJECT_NOT_FOUND 11 +#define OBJECTS_FOUND 12 +#define OBJECTS_NOT_FOUND 13 +#define TRANS_AGREE 14 +#define TRANS_DISAGREE 15 +#define TRANS_AGREE_BUT_MISSING_OBJECTS 16 +#define TRANS_SOFT_ABORT 17 +#define TRANS_SUCESSFUL 18//Not necessary for now #include #include #include +#include #include "clookup.h" #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB +#define TID_LEN 20 //bit designations for status field of objheader #define DIRTY 0x01 #define NEW 0x02 @@ -56,6 +61,18 @@ typedef struct pile { struct pile *next; }pile_t; +//structure for passing multiple arguments to thread +typedef struct thread_data_array { + int thread_id; + int mid; + int pilecount; + char *buffer; //buffer contains the packet for trans req + char *recvmsg; //shared datastructure to keep track of the control message receiv + pthread_cond_t *threshold; //threshhold for waking up a thread + pthread_mutex_t *lock; //lock the count variable + int *count; //count variable +}thread_data_array_t; + /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -79,6 +96,7 @@ void *dstmAccept(void *); transrecord_t *transStart(); objheader_t *transRead(transrecord_t *record, unsigned int oid); objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid +void *transRequest(void *); int transCommit(transrecord_t *record); //return 0 if successful /* end transactions */ diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 05ea3427..cfcb0126 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -15,11 +15,11 @@ plistnode_t *pCreate(int objects) { return NULL; } pile->index = 0; - pile->vote = 0; + //pile->vote = 0; return pile; } -unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) { +plistnode_t *pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) { plistnode_t *ptr, *tmp; int found = 0; @@ -37,7 +37,7 @@ unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int //Add oid for any new machine if (!found) { if((ptr = pCreate(num_objs)) == NULL) { - return 1; + return NULL; } ptr->mid = mid; ptr->obj[ptr->index] = oid; @@ -45,6 +45,30 @@ unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int ptr->next = pile; pile = ptr; } + return pile; +} + +//Count the number of machine groups +int pCount(plistnode_t *pile) { + plistnode_t *tmp; + int pcount = 0; + tmp = pile; + while(tmp != NULL) { + pcount++; + tmp = tmp->next; + } + return pcount; +} + +//Make a list of mid's for each machine group +int pListMid(plistnode_t *pile, unsigned int *list) { + int i = 0; + plistnode_t *tmp; + tmp = pile; + while (tmp != NULL) { + list[i] = tmp->mid; + i++; + } return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h index a1ee01b1..2e769f26 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.h +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -13,7 +13,9 @@ typedef struct plistnode { } plistnode_t; plistnode_t *pCreate(int); -unsigned int pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int); +plistnode_t *pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int); +int pCount(plistnode_t *pile); +int pListMid(plistnode_t *pile, unsigned int *list); unsigned int *pSearch(plistnode_t *, unsigned int mid); void pDelete(plistnode_t *); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d852dcce..3cb5d560 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -3,6 +3,7 @@ #include "mlookup.h" #include "llookup.h" #include "plookup.h" +#include #include #include #include @@ -68,22 +69,112 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) return tmp; } +void *transRequest(void *threadarg) { + int sd, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0; + struct sockaddr_in serv_addr; + struct hostent *server; + thread_data_array_t *tdata; + char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; + + tdata = (thread_data_array_t *) threadarg; + //Send Trans Request + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket for TRANS_REQUEST"); + return NULL; + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); + //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid); + + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + perror("Error in connect for TRANS_REQUEST"); + return NULL; + } + + if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { + perror("Error sending message for thread"); + return NULL; + } + //Read message from participant side + read(sd, buffer, sizeof(buffer)); + //process the participant's request + recvcontrol = buffer[0]; + //Update common data structure and increment count + tdata->recvmsg[tdata->thread_id] = recvcontrol; + //Lock and update count + //Thread sleeps until all messages from pariticipants are received by coordinator + pthread_mutex_lock(tdata->lock); + (*(tdata->count))++; + + if(*(tdata->count) == tdata->pilecount) { + pthread_cond_broadcast(tdata->threshold); + //Check common data structure + for (i = 0 ; i < tdata->pilecount ; i++) { + //Check in any DISAGREE has come + if(tdata->recvmsg[i] == TRANS_DISAGREE) { + //Send abort + transabort++; + buffer[0] = TRANS_ABORT; + if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { + perror("Error sending message for thread"); + return NULL; + } + } else if(tdata->recvmsg[i] == AGREE) { + transagree++; + } else if(tdata->recvmsg[i] == AGREE_BUT_MISSING_OBJECTS) { + transmiss++; + } else + transsoftabort++; + } + if(transagree == tdata->pilecount){ + //Send Commit + buffer[0] = TRANS_COMMIT; + if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { + perror("Error sending message for thread"); + return NULL; + } + } + if(transsoftabort > 0 && transabort == 0) { + //Send abort but retry commit + //i.e. wait at the participant end and then resend either agree or disagree + // + + } + if(transmiss > 0 && transsoftabort == 0 && transabort == 0) { + //Relookup all missing objects + //send missing mising object/ objects + } + + } else { + pthread_cond_wait(tdata->threshold, tdata->lock); + } + pthread_mutex_unlock(tdata->lock); + close(sd); + //Reset numread and nummod for the next machine + pthread_exit(NULL); +} + int transCommit(transrecord_t *record){ chashlistnode_t *curr, *ptr, *next; unsigned int size;//Represents number of bins in the chash table - unsigned int machinenum; - objheader_t *headeraddr, *localheaderaddr; + unsigned int machinenum, tot_bytes_mod; + objheader_t *headeraddr; plistnode_t *tmp, *pile = NULL; - int sd,n,i; + int i, rc; + int pilecount = 0, offset, numthreads = 0, trecvcount = 0; short numread = 0,nummod = 0; - struct sockaddr_in serv_addr; - struct hostent *server; char buffer[RECEIVE_BUFFER_SIZE],control; - + char tmpbuffer[RECEIVE_BUFFER_SIZE]; + char transid[TID_LEN]; + static int newtid = 0; + pthread_cond_t threshold; + pthread_mutex_t count; + ptr = record->lookupTable->table; size = record->lookupTable->size; - //Look through all the objects in the cache and make pils - //Outer loop for chashtable + //Look through all the objects in the cache and make piles for(i = 0; i < size ;i++) { curr = &ptr[i]; //Inner loop to traverse the linked list of the cache lookupTable @@ -95,60 +186,88 @@ int transCommit(transrecord_t *record){ next = curr->next; //Get machine location for object id machinenum = lhashSearch(curr->key); - // Make piles - pInsert(pile, machinenum, curr->key, record->lookupTable->numelements); + //Make machine groups + if ((pile = pInsert(pile, machinenum, curr->key, record->lookupTable->numelements)) == NULL) { + perror("pInsert calloc error"); + return 1; + } curr = next; } } - tmp = pile; - unsigned int oidmod[record->lookupTable->numelements]; - unsigned int oidread[record->lookupTable->numelements]; - //Process each machine in pile + //Create the packet to be sent in TRANS_REQUEST + tmp = pile; + pilecount = pCount(pile); //Keeps track of the number of participants + + //Thread related variables + pthread_t thread[pilecount]; //Create threads for each participant + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + thread_data_array_t thread_data_array[pilecount]; + + char rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants + + //Initialize and set thread detach attribute + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + //Keep track of list of machine ids per transaction + unsigned int *listmid = calloc(pilecount, sizeof(unsigned int)); + pListMid(pile, listmid); + //Process each machine group while(tmp != NULL) { - //Identify which oids have been updated and which ones have been just read - for(i = 0; i < pile->index; i++) { - headeraddr = (objheader_t *) chashSearch(record->lookupTable, pile->obj[i]); - //check if object modified in cache ?? - if(headeraddr->status >>= DIRTY){ + unsigned int *oidmod = calloc(record->lookupTable->numelements, sizeof(unsigned int)); + unsigned int *oidread = calloc(record->lookupTable->numelements, sizeof(unsigned int)); + nummod = numread = tot_bytes_mod = 0; + offset = 0; + + + //Create transaction id + newtid++; + sprintf(transid, "%x_%d", tmp->mid, newtid); + //Browse through each oid in machine group + for(i = 0; i < tmp->index; i++) { + headeraddr = (objheader_t *) chashSearch(record->lookupTable, tmp->obj[i]); + //check if object modified in cache + if((headeraddr->status >> 1) == 1){ //Keep track of oids that have been modified oidmod[nummod] = headeraddr->oid; nummod++; + tot_bytes_mod += (sizeof(objheader_t) + classsize[headeraddr->type]); //Keeps track of total bytes of modified object } else { + //Keep track of oids that are read oidread[numread] = headeraddr->oid; + //create tuples in temporary buffer + memcpy(tmpbuffer+offset, &headeraddr->oid, sizeof(unsigned int)); + offset += sizeof(unsigned int); + memcpy(tmpbuffer+offset, &headeraddr->version, sizeof(short)); + offset += sizeof(short); numread++; } } - //Send Trans Request in the form - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket for TRANS_REQUEST"); - return 1; - } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); - //serv_addr.sin_addr.s_addr = inet_addr(pile->mid); - - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect for TRANS_REQUEST"); - return 1; - } - + //Copy each field of the packet into buffer bzero((char *)buffer,sizeof(buffer)); - control = TRANS_REQUEST; - buffer[0] = control; - //Send numread, nummod, sizeof header for objects read, size of header+objects that are modified - int offset = 1; + offset = 0; + buffer[offset] = TRANS_REQUEST; + offset = offset + 1; + memcpy(buffer+offset, transid, sizeof(char) * TID_LEN); + offset += (sizeof(char) * TID_LEN); + memcpy(buffer+offset, &pilecount, sizeof(int)); + offset += sizeof(int); memcpy(buffer+offset, &numread, sizeof(short)); offset += sizeof(short); memcpy(buffer+offset, &nummod, sizeof(short)); offset += sizeof(short); - for( i= 0; i< numread; i++) { - headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidread[i]); - memcpy(buffer+offset, headeraddr, sizeof(objheader_t)); - offset += sizeof(objheader_t); - } + memcpy(buffer+offset, &tot_bytes_mod, sizeof(unsigned int)); + offset += sizeof(unsigned int); + memcpy(buffer+offset, listmid, sizeof(unsigned int) * pilecount); + offset += (sizeof(unsigned int) * pilecount); + memcpy(buffer+offset, tmpbuffer, sizeof(char) * RECEIVE_BUFFER_SIZE); + offset += (sizeof(char) * RECEIVE_BUFFER_SIZE); + //send objects for all objects modified for( i= 0; i< nummod; i++) { headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]); memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]); @@ -157,117 +276,53 @@ int transCommit(transrecord_t *record){ if (offset > RECEIVE_BUFFER_SIZE) { printf("Error: Buffersize too small"); } - if (write(sd, buffer, sizeof(buffer)) < 0) { - perror("Error sending message"); - return 1; + //Create thread input to pass multiple arguments via structure + thread_data_array[numthreads].thread_id = numthreads; + thread_data_array[numthreads].mid = tmp->mid; + thread_data_array[numthreads].pilecount = pilecount; + thread_data_array[numthreads].buffer = buffer; + thread_data_array[numthreads].recvmsg = rcvd_control_msg; + thread_data_array[numthreads].threshold = &tcond; + thread_data_array[numthreads].lock = &tlock; + thread_data_array[numthreads].count = &trecvcount; + //Spawn thread for each TRANS_REQUEST + rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]); + if (rc) { + perror("Error in pthread create"); + exit(-1); + } + numthreads++; + sleep(2); + free(oidmod); + free(oidread); + tmp = tmp->next; + } + // Free attribute and wait for the other threads + pthread_attr_destroy(&attr); + for (i = 0 ;i < pilecount ; i++) { + rc = pthread_join(thread[i], NULL); + if (rc) + { + printf("ERROR return code from pthread_join() is %d\n", rc); + exit(-1); } -#ifdef DEBUG1 - printf("DEBUG -> ready to rcv ...\n"); -#endif - read(sd, buffer, sizeof(buffer)); - close(sd); - printf("Server sent %d\n",buffer[0]); - /* - if (buffer[0] == TRANS_AGREE) { - //change machine pile - - } - */ - //Reset numread and nummod for the next pile - numread = nummod = 0; - tmp = tmp->next; - } - -} - - -#if 0 -int transCommit(transrecord_t *record){ - //Look through all the objects in the cache - int i,numelements,isFirst; - unsigned int size,machinenum;//Represents number of buckets - void *address; - objheader_t *headeraddr,localheaderaddr; - chashlistnode_t *curr, *ptr, *next; - int sd, size; - struct sockaddr_in serv_addr; - struct hostent *server; - char buffer[RECEIVE_BUFFER_SIZE],control; + - ptr = record->lookupTable->table; - size = record->lookupTable->size; - //Outer loop for chashtable - for(i = 0; i< size ;i++) { - curr = &ptr[i]; - //Inner look to traverse the linked list of the cache lookupTable - while(curr != NULL) { - if(curr->key == 0) { - break; - } - //Find if local or remote - address = mhashSearch(curr->key); - d - localheaderaddr = (objheader_t *) curr->value; - if(address != NULL) { - //Is local so check if the local copy has been updated - headeraddr = (objheader_t *) address; - if(localheaderaddr->version == headeraddr->version){ - //Lock Object - - } - else { - //vote as DISAGREE - //Start TransAbort(); - //Unlock object - } - } - else { - //Is remote - //Find which machine it belongs to - machinenum = lhashSearch(curr->key); - //Start TRANS_REQUEST to machine - - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket"); - return NULL; - } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); - - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect"); - return NULL; - } - bzero((char *)buffer,sizeof(buffer)); - control = READ_REQUEST; - buffer[0] = control; - memcpy(buffer+1, &oid, sizeof(int)); - if (write(sd, buffer, sizeof(buffer)) < 0) { - perror("Error sending message"); - return NULL; - } + //Free resources + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pthread_exit(NULL); + free(listmid); +} -#ifdef DEBUG1 - printf("DEBUG -> ready to rcv ...\n"); -#endif - read(sd, buffer, sizeof(buffer)); - close(sd); - - } - next = curr->next; - } - curr = next; - } +int transSoftAbort(transrecord_t *record){ } -#endif - int transAbort(transrecord_t *record){ + } //mnun will be used to represent the machine IP address later