From 5b6348a2bded51013a57601611b6e35b375889d9 Mon Sep 17 00:00:00 2001 From: adash Date: Wed, 28 Mar 2007 07:44:35 +0000 Subject: [PATCH] make changes to get rid of long sequence of memcpy's, address buffer structures being overwritten, split long functions etc. --- Robust/src/Runtime/DSTM/interface/dstm.h | 33 ++- Robust/src/Runtime/DSTM/interface/plookup.c | 67 +++--- Robust/src/Runtime/DSTM/interface/plookup.h | 15 +- Robust/src/Runtime/DSTM/interface/trans.c | 217 ++++++++++---------- 4 files changed, 185 insertions(+), 147 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 82739f6c..047626fa 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -21,7 +21,7 @@ #define TRANS_DISAGREE 15 #define TRANS_AGREE_BUT_MISSING_OBJECTS 16 #define TRANS_SOFT_ABORT 17 -#define TRANS_SUCESSFUL 18//Not necessary for now +#define TRANS_SUCESSFUL 18 #include #include @@ -61,16 +61,40 @@ typedef struct pile { struct pile *next; }pile_t; +// Structure that keeps track of responses from the participants +typedef struct thread_response { + char rcv_status; +}thread_response_t; + +// Structure that holds fixed data sizes to be sent along with TRANS_REQUEST +typedef struct fixed_data { + char control; + char trans_id[TID_LEN]; + int mcount; // Machine count + short numread; // Number of objects read + short nummod; // Number of objects modified + int sum_bytes; // Total bytes modified +}fixed_data_t; + +// Structure that holds variable data sizes per machine participant +typedef struct trans_req_data { + fixed_data_t f; + unsigned int *listmid; + char *objread; + unsigned int *oidmod; +}trans_req_data_t; + //structure for passing multiple arguments to thread typedef struct thread_data_array { int thread_id; int mid; int pilecount; - char *buffer; //buffer contains the packet for trans req - char *recvmsg; //shared datastructure to keep track of the control message receiv + trans_req_data_t *buffer; + thread_response_t *recvmsg;//shared datastructure to keep track of the control message receiv pthread_cond_t *threshold; //threshhold for waking up a thread pthread_mutex_t *lock; //lock the count variable int *count; //count variable + transrecord_t *rec; // To send modified objects }thread_data_array_t; /* Initialize main object store and lookup tables, start server thread. */ @@ -96,8 +120,9 @@ void *dstmAccept(void *); transrecord_t *transStart(); objheader_t *transRead(transrecord_t *record, unsigned int oid); objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid -void *transRequest(void *); +void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins int transCommit(transrecord_t *record); //return 0 if successful +int decideResponse(thread_data_array_t *tdata, char *buffer, int sd);// Coordinator decides what response to send to the participant /* end transactions */ void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index cfcb0126..15236d51 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -1,4 +1,5 @@ - #include "plookup.h" +#include "plookup.h" +extern int classsize[]; plistnode_t *pCreate(int objects) { plistnode_t *pile; @@ -9,26 +10,45 @@ plistnode_t *pCreate(int objects) { return NULL; } pile->next = NULL; - //Create array of objects - if((pile->obj = calloc(objects, sizeof(unsigned int))) == NULL) { + if ((pile->oidmod = calloc(objects, sizeof(unsigned int))) == NULL) { printf("Calloc error %s %d\n", __FILE__, __LINE__); return NULL; } - pile->index = 0; - //pile->vote = 0; + if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + pile->nummod = pile->numread = pile->sum_bytes = 0; + if ((pile->objread = calloc(objects, sizeof(int) + sizeof(short))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + pile->objmodified = NULL; + pile->nummod = pile->numread = pile->sum_bytes = 0; + return pile; } -plistnode_t *pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) { +plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) { plistnode_t *ptr, *tmp; - int found = 0; + int found = 0, offset; tmp = pile; //Add oid into a machine that is a part of the pile linked list structure while(tmp != NULL) { if (tmp->mid == mid) { - tmp->obj[tmp->index] = oid; - tmp->index++; + if ((headeraddr->status >> 1) == 1) { + tmp->oidmod[tmp->nummod] = headeraddr->oid; + tmp->nummod = tmp->nummod + 1; + tmp->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type]; + } else { + tmp->oidread[tmp->numread] = headeraddr->oid; + offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; + memcpy(tmp->objread, &headeraddr->oid, sizeof(unsigned int)); + offset += sizeof(unsigned int); + memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short)); + tmp->numread = tmp->numread + 1; + } found = 1; break; } @@ -40,8 +60,16 @@ plistnode_t *pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int return NULL; } ptr->mid = mid; - ptr->obj[ptr->index] = oid; - ptr->index++; + if ((headeraddr->status >> 1) == 1) { + ptr->oidmod[ptr->nummod] = headeraddr->oid; + ptr->nummod = ptr->nummod + 1; + ptr->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type]; + } else { + ptr->oidread[ptr->numread] = headeraddr->oid; + memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int)); + memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short)); + ptr->numread = ptr->numread + 1; + } ptr->next = pile; pile = ptr; } @@ -72,26 +100,15 @@ int pListMid(plistnode_t *pile, unsigned int *list) { return 0; } -// Return objects for a given mid -unsigned int *pSearch(plistnode_t *pile, unsigned int mid) { - plistnode_t *tmp; - tmp = pile; - while(tmp != NULL) { - if(tmp->mid == mid) { - return(tmp->obj); - } - tmp = tmp->next; - } - return NULL; -} - //Delete the entire pile void pDelete(plistnode_t *pile) { plistnode_t *next, *tmp; tmp = pile; while(tmp != NULL) { next = tmp->next; - free(tmp->obj); + free(tmp->oidmod); + free(tmp->oidread); + free(tmp->objread); free(tmp); tmp = next; } diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h index 2e769f26..0d8f67af 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.h +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -3,21 +3,26 @@ #include #include +#include "dstm.h" typedef struct plistnode { unsigned int mid; - unsigned int *obj; //this can be cast to another type or used to point to a larger structure - int index; + unsigned int *oidmod; + unsigned int *oidread; + int nummod; + int numread; + int sum_bytes; + char *objread; + char *objmodified; int vote; struct plistnode *next; } plistnode_t; plistnode_t *pCreate(int); -plistnode_t *pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int); +plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs); int pCount(plistnode_t *pile); int pListMid(plistnode_t *pile, unsigned int *list); -unsigned int *pSearch(plistnode_t *, unsigned int mid); -void pDelete(plistnode_t *); +void pDelete(plistnode_t *pile); #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 3cb5d560..a3661d43 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -69,11 +69,53 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) return tmp; } +int decideResponse(thread_data_array_t *tdata, char *buffer, int sd) { + int i, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0; + + //Check common data structure + for (i = 0 ; i < tdata->pilecount ; i++) { + //Check in any DISAGREE has come + if(tdata->recvmsg[i].rcv_status == TRANS_DISAGREE) { + //Send abort + transabort++; + buffer[0] = TRANS_ABORT; + if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { + perror("Error sending message for thread"); + return 1; + } + } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE) { + transagree++; + } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE_BUT_MISSING_OBJECTS) { + transmiss++; + } else + transsoftabort++; + } + if(transagree == tdata->pilecount){ + //Send Commit + buffer[0] = TRANS_COMMIT; + if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { + perror("Error sending message for thread"); + return 1; + } + } + if(transsoftabort > 0 && transabort == 0) { + //Send abort but retry commit + //i.e. wait at the participant end and then resend either agree or disagree + // + + } + if(transmiss > 0 && transsoftabort == 0 && transabort == 0) { + //Relookup all missing objects + //send missing mising object/ objects + } +} + void *transRequest(void *threadarg) { - int sd, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0; + int sd, i, n; struct sockaddr_in serv_addr; struct hostent *server; thread_data_array_t *tdata; + objheader_t *headeraddr; char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; tdata = (thread_data_array_t *) threadarg; @@ -93,16 +135,39 @@ void *transRequest(void *threadarg) { return NULL; } - if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { - perror("Error sending message for thread"); + //Multiple writes for sending packets of data + //Send first few fixed bytes of the TRANS_REQUEST protocol + if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) { + perror("Error sending fixed bytes for thread"); + return NULL; + } + //Send list of machines involved in the transaction + if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) { + perror("Error sending list of machines for thread"); + return NULL; + } + //Send oids and version number tuples for objects that are read + if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) { + perror("Error sending tuples for thread"); return NULL; } + //Send objects that are modified + for( i = 0; i < tdata->buffer->f.nummod ; i++) { + headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); + if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) { + perror("Error sending obj modified for thread"); + return NULL; + } + } + //Read message from participant side - read(sd, buffer, sizeof(buffer)); + while(n != 0) { + n = read(sd, buffer, sizeof(buffer)); + } //process the participant's request recvcontrol = buffer[0]; //Update common data structure and increment count - tdata->recvmsg[tdata->thread_id] = recvcontrol; + tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; //Lock and update count //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(tdata->lock); @@ -110,49 +175,15 @@ void *transRequest(void *threadarg) { if(*(tdata->count) == tdata->pilecount) { pthread_cond_broadcast(tdata->threshold); - //Check common data structure - for (i = 0 ; i < tdata->pilecount ; i++) { - //Check in any DISAGREE has come - if(tdata->recvmsg[i] == TRANS_DISAGREE) { - //Send abort - transabort++; - buffer[0] = TRANS_ABORT; - if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { - perror("Error sending message for thread"); - return NULL; - } - } else if(tdata->recvmsg[i] == AGREE) { - transagree++; - } else if(tdata->recvmsg[i] == AGREE_BUT_MISSING_OBJECTS) { - transmiss++; - } else - transsoftabort++; - } - if(transagree == tdata->pilecount){ - //Send Commit - buffer[0] = TRANS_COMMIT; - if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) { - perror("Error sending message for thread"); - return NULL; - } - } - if(transsoftabort > 0 && transabort == 0) { - //Send abort but retry commit - //i.e. wait at the participant end and then resend either agree or disagree - // - - } - if(transmiss > 0 && transsoftabort == 0 && transabort == 0) { - //Relookup all missing objects - //send missing mising object/ objects + if (decideResponse(tdata, buffer, sd) == 1) { + printf("decideResponse returned error\n"); + return NULL; } - } else { pthread_cond_wait(tdata->threshold, tdata->lock); } pthread_mutex_unlock(tdata->lock); close(sd); - //Reset numread and nummod for the next machine pthread_exit(NULL); } @@ -164,13 +195,9 @@ int transCommit(transrecord_t *record){ plistnode_t *tmp, *pile = NULL; int i, rc; int pilecount = 0, offset, numthreads = 0, trecvcount = 0; - short numread = 0,nummod = 0; char buffer[RECEIVE_BUFFER_SIZE],control; - char tmpbuffer[RECEIVE_BUFFER_SIZE]; char transid[TID_LEN]; static int newtid = 0; - pthread_cond_t threshold; - pthread_mutex_t count; ptr = record->lookupTable->table; size = record->lookupTable->size; @@ -185,9 +212,16 @@ int transCommit(transrecord_t *record){ } next = curr->next; //Get machine location for object id - machinenum = lhashSearch(curr->key); + if ((machinenum = lhashSearch(curr->key)) == 0) { + printf("Error: No such machine\n"); + return 1; + } + if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { + printf("Error: No such oid\n"); + return 1; + } //Make machine groups - if ((pile = pInsert(pile, machinenum, curr->key, record->lookupTable->numelements)) == NULL) { + if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) { perror("pInsert calloc error"); return 1; } @@ -206,7 +240,7 @@ int transCommit(transrecord_t *record){ pthread_mutex_t tlock; thread_data_array_t thread_data_array[pilecount]; - char rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants + thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants //Initialize and set thread detach attribute pthread_attr_init(&attr); @@ -219,84 +253,42 @@ int transCommit(transrecord_t *record){ pListMid(pile, listmid); //Process each machine group while(tmp != NULL) { - unsigned int *oidmod = calloc(record->lookupTable->numelements, sizeof(unsigned int)); - unsigned int *oidread = calloc(record->lookupTable->numelements, sizeof(unsigned int)); - nummod = numread = tot_bytes_mod = 0; - offset = 0; - - //Create transaction id newtid++; - sprintf(transid, "%x_%d", tmp->mid, newtid); - //Browse through each oid in machine group - for(i = 0; i < tmp->index; i++) { - headeraddr = (objheader_t *) chashSearch(record->lookupTable, tmp->obj[i]); - //check if object modified in cache - if((headeraddr->status >> 1) == 1){ - //Keep track of oids that have been modified - oidmod[nummod] = headeraddr->oid; - nummod++; - tot_bytes_mod += (sizeof(objheader_t) + classsize[headeraddr->type]); //Keeps track of total bytes of modified object - } else { - //Keep track of oids that are read - oidread[numread] = headeraddr->oid; - //create tuples in temporary buffer - memcpy(tmpbuffer+offset, &headeraddr->oid, sizeof(unsigned int)); - offset += sizeof(unsigned int); - memcpy(tmpbuffer+offset, &headeraddr->version, sizeof(short)); - offset += sizeof(short); - numread++; - } - } - //Copy each field of the packet into buffer - bzero((char *)buffer,sizeof(buffer)); - offset = 0; - buffer[offset] = TRANS_REQUEST; - offset = offset + 1; - memcpy(buffer+offset, transid, sizeof(char) * TID_LEN); - offset += (sizeof(char) * TID_LEN); - memcpy(buffer+offset, &pilecount, sizeof(int)); - offset += sizeof(int); - memcpy(buffer+offset, &numread, sizeof(short)); - offset += sizeof(short); - memcpy(buffer+offset, &nummod, sizeof(short)); - offset += sizeof(short); - memcpy(buffer+offset, &tot_bytes_mod, sizeof(unsigned int)); - offset += sizeof(unsigned int); - memcpy(buffer+offset, listmid, sizeof(unsigned int) * pilecount); - offset += (sizeof(unsigned int) * pilecount); - memcpy(buffer+offset, tmpbuffer, sizeof(char) * RECEIVE_BUFFER_SIZE); - offset += (sizeof(char) * RECEIVE_BUFFER_SIZE); - //send objects for all objects modified - for( i= 0; i< nummod; i++) { - headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]); - memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]); - offset += sizeof(objheader_t) + classsize[headeraddr->type]; + trans_req_data_t *tosend; + if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { + perror(""); + return 1; } - if (offset > RECEIVE_BUFFER_SIZE) { - printf("Error: Buffersize too small"); - } - //Create thread input to pass multiple arguments via structure + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = tmp->numread; + tosend->f.nummod = tmp->nummod; + tosend->f.sum_bytes = tmp->sum_bytes; + tosend->listmid = listmid; + tosend->objread = tmp->objread; + tosend->oidmod = tmp->oidmod; thread_data_array[numthreads].thread_id = numthreads; thread_data_array[numthreads].mid = tmp->mid; thread_data_array[numthreads].pilecount = pilecount; - thread_data_array[numthreads].buffer = buffer; + thread_data_array[numthreads].buffer = tosend; thread_data_array[numthreads].recvmsg = rcvd_control_msg; thread_data_array[numthreads].threshold = &tcond; thread_data_array[numthreads].lock = &tlock; thread_data_array[numthreads].count = &trecvcount; - //Spawn thread for each TRANS_REQUEST + thread_data_array[numthreads].rec = record; + rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]); if (rc) { perror("Error in pthread create"); - exit(-1); + return 1; } numthreads++; - sleep(2); - free(oidmod); - free(oidread); + //TODO frees ? tmp = tmp->next; } + // Free attribute and wait for the other threads pthread_attr_destroy(&attr); for (i = 0 ;i < pilecount ; i++) { @@ -304,16 +296,15 @@ int transCommit(transrecord_t *record){ if (rc) { printf("ERROR return code from pthread_join() is %d\n", rc); - exit(-1); + return 1; } } - //Free resources pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); - pthread_exit(NULL); free(listmid); + return 0; } int transSoftAbort(transrecord_t *record){ -- 2.34.1