From 57035d4632092295c9d552188cd5fd23fa6f96c8 Mon Sep 17 00:00:00 2001 From: adash Date: Mon, 2 Apr 2007 09:31:50 +0000 Subject: [PATCH] Finished and tested complete TRANS_COMMIT process other bug fixes TODO: handle softabort and missing object cases i.e. trans abort but retry commit --- Robust/src/Runtime/DSTM/interface/dstm.h | 25 +- .../src/Runtime/DSTM/interface/dstmserver.c | 229 +++++++++++++----- .../src/Runtime/DSTM/interface/testserver.c | 1 + Robust/src/Runtime/DSTM/interface/trans.c | 69 ++++-- 4 files changed, 247 insertions(+), 77 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index cb3a9bd0..e990aae3 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -17,15 +17,21 @@ #define OBJECT_NOT_FOUND 11 #define OBJECTS_FOUND 12 #define OBJECTS_NOT_FOUND 13 -#define OBJ_LOCKED_BUT_VERSION_MATCH 14 -#define OBJ_UNLOCK_BUT_VERSION_MATCH 15 -#define VERSION_NO_MATCH 16 #define TRANS_AGREE 17 #define TRANS_DISAGREE 18 #define TRANS_AGREE_BUT_MISSING_OBJECTS 19 #define TRANS_SOFT_ABORT 20 #define TRANS_SUCESSFUL 21 +//Control bits for status of objects in Machine pile +#define OBJ_LOCKED_BUT_VERSION_MATCH 14 +#define OBJ_UNLOCK_BUT_VERSION_MATCH 15 +#define VERSION_NO_MATCH 16 +//TODO REMOVE THIS +#define NO_MISSING_OIDS 22 +#define MISSING_OIDS_PRESENT 23 + + #include #include #include @@ -106,6 +112,14 @@ typedef struct objinfo { int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) }objinfo_t; +// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process +typedef struct trans_commit_data{ + unsigned int *objmod; + unsigned int *objlocked; + void *modptr; + int nummod; + int numlocked; +}trans_commit_data_t; /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -123,8 +137,8 @@ void *objstrAlloc(objstr_t *store, unsigned int size); //size in bytes /* Prototypes for server portion */ void *dstmListen(); void *dstmAccept(void *); -int readClientReq(int); -int handleTransReq(int, fixed_data_t *, unsigned int *, char *, void *); +int readClientReq(int, trans_commit_data_t *); +char handleTransReq(int, fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *); /* end server portion */ /* Prototypes for transactions */ @@ -135,6 +149,7 @@ int decideResponse(thread_data_array_t *tdata, int sd);// Coordinator decides wh void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins int transCommit(transrecord_t *record); //return 0 if successful void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); +int transCommitProcess(trans_commit_data_t *, int); /* end transactions */ void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index b1a89d8e..fdc23691 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -18,9 +18,7 @@ objstr_t *mainobjstore; int dstmInit(void) { - //todo:initialize main object store - //do we want this to be a global variable, or provide - //separate access funtions and hide the structure? + //Initialize main object store mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -80,10 +78,11 @@ void *dstmAccept(void *acceptfd) { int numbytes,i, val; unsigned int oid; - char buffer[RECEIVE_BUFFER_SIZE], control; + char buffer[RECEIVE_BUFFER_SIZE], control,ctrl; char *ptr; void *srcObj; objheader_t *h; + trans_commit_data_t transinfo; int fd_flags = fcntl((int)acceptfd, F_GETFD), size; @@ -91,18 +90,24 @@ void *dstmAccept(void *acceptfd) recv((int)acceptfd, &control, sizeof(char), 0); switch(control) { case READ_REQUEST: + printf("DEBUG -> Recv READ_REQUEST from Coordinator\n"); recv((int)acceptfd, &oid, sizeof(unsigned int), 0); srcObj = mhashSearch(oid); h = (objheader_t *) srcObj; if (h == NULL) { - buffer[0] = OBJECT_NOT_FOUND; + ctrl = OBJECT_NOT_FOUND; } else { - buffer[0] = OBJECT_FOUND; + ctrl = OBJECT_FOUND; size = sizeof(objheader_t) + sizeof(classsize[h->type]); - memcpy(buffer+1, srcObj, size); - } - if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) { - perror(""); + if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) { + perror("Error sending control msg to coordinator\n"); + } + if(send((int)acceptfd, &size, sizeof(int), 0) < 0) { + perror("Error sending size of object to coordinator\n"); + } + if(send((int)acceptfd, h, size, 0) < 0) { + perror("Error in sending object\n"); + } } break; @@ -119,8 +124,8 @@ void *dstmAccept(void *acceptfd) break; case TRANS_REQUEST: - //printf("DEBUG -> TRANS_REQUEST\n"); - if((val = readClientReq((int)acceptfd)) == 1) { + printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n"); + if((val = readClientReq((int)acceptfd, &transinfo)) != 0) { printf("Error in readClientReq\n"); } break; @@ -128,28 +133,6 @@ void *dstmAccept(void *acceptfd) default: printf("Error receiving\n"); } - - //Read for new control message from Coordiator - recv((int)acceptfd, &control, sizeof(char), 0); - switch(control) { - case TRANS_ABORT: - printf("DEBUG -> TRANS_ABORT\n"); - write((int)acceptfd, &control, sizeof(char)); - break; - - case TRANS_COMMIT: - printf("DEBUG -> TRANS_COMMIT\n"); - write((int)acceptfd, &control, sizeof(char)); - //TODO - //change ptr address in mhash table - //unlock objects - //update object version - //change reference count of older address?? - //free space in objstr ?? - //Update location lookup table - break; - } - if (close((int)acceptfd) == -1) { perror("close"); @@ -160,13 +143,14 @@ void *dstmAccept(void *acceptfd) pthread_exit(NULL); } -int readClientReq(int acceptfd) { - char *ptr, control; +int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { + char *ptr, control, prevctrl, sendctrl; void *modptr; objheader_t *h, tmp_header; fixed_data_t fixed; - int sum = 0, N, n; + int sum = 0, N, n, val; + //Reads to process the TRANS_REQUEST protocol further // Read fixed_data N = sizeof(fixed) - 1; ptr = (char *)&fixed;; @@ -204,7 +188,7 @@ int readClientReq(int acceptfd) { // Read modified objects if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { - // printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__); + printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__); return 1; } sum = 0; @@ -213,24 +197,79 @@ int readClientReq(int acceptfd) { //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr)); sum += n; } while (sum < fixed.sum_bytes && n != 0); + //Send control message as per all votes from the particpants - handleTransReq(acceptfd, &fixed, listmid, objread, modptr); + if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { + printf("Handle req error\n"); + } + + //Read for new control message from Coordiator + recv((int)acceptfd, &control, sizeof(char), 0); + switch(control) { + case TRANS_ABORT: + printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n"); + //send ack to coordinator + sendctrl = TRANS_SUCESSFUL; + if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) { + perror("Error sending ACK to coordinator\n"); + } + break; + + case TRANS_COMMIT: + printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n"); + if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) { + printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); + } + break; + case TRANS_ABORT_BUT_RETRY_COMMIT: + printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n"); + //Process again after waiting for sometime and on prev control message sent + switch(prevctrl) { + case TRANS_AGREE: + sleep(2); + break; + case TRANS_AGREE_BUT_MISSING_OBJECTS: + break; + case TRANS_SOFT_ABORT: + break; + } + //Try sending either agree or disagree after sometime + //TODO + //Wait in a blocking thread or something + //Recv from client new listmid, mcount and pilecount + //call 2 new functions that are similar to readClientReq and handleRequest + break; + case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: + printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n"); + break; + default: + printf("No response to TRANS_AGREE OR DISAGREE protocol\n"); + break; + } - - return 0; } //This function runs a decision after all objects are weighed under one of the 4 possibilities //and returns the appropriate control message to the Ccordinator -int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) { +char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) { short version; - char control, *ptr; - int i; - unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread]; - int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent + char control = 0, ctrlmissoid, *ptr, *oidmodnotfound; + int i, j = 0; + unsigned int oid; + unsigned int *oidnotfound, *oidlocked, *oidmod; + + oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); + oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); + oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int)); + oidmodnotfound = (char *) calloc(fixed->nummod, sizeof(char)); + + // Counters and arrays to formulate decision on control message to be sent + int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; + int objmodnotfound = 0, nummodfound = 0; void *mobj; objheader_t *headptr; + //TODO remove this deadcode from here objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc) //Process each object present in the pile @@ -246,12 +285,18 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha } else {//Obj is modified headptr = (objheader_t *) ptr; oid = headptr->oid; + oidmod[objmod] = oid;//Array containing modified oids + objmod++; version = headptr->version; ptr += sizeof(objheader_t) + classsize[headptr->type]; } //Check if object is still present in the machine since the beginning of TRANS_REQUEST if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found objinfo[i].poss_val = OBJECT_NOT_FOUND; + if(i >= fixed->numread && (i < (fixed->nummod + fixed->numread))) { + oidmodnotfound[i - fixed->numread] = 1; //array keeps track of oids that are a subset of oidmod and found on machine + objmodnotfound++; + } //Save the oids not found for later use oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid; objnotfound++; @@ -267,10 +312,8 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha //send TRANS_DISAGREE to Coordinator control = TRANS_DISAGREE; write(acceptfd, &control, sizeof(char)); - //TODO when TRANS_DISAGREE is sent - //Free space allocated in main objstore - //Unlock objects that was locked in the trans - return 0; + printf("DEBUG -> Sending TRANS_DISAGREE\n"); + return control; } } else {//Obj is not locked , so lock object ((objheader_t *)mobj)->status |= LOCK; @@ -286,24 +329,34 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha //send TRANS_DISAGREE to Coordinator control = TRANS_DISAGREE; write(acceptfd, &control, sizeof(char)); - return 0; + printf("DEBUG -> Sending TRANS_DISAGREE\n"); + return control; } } } } + printf("No of objs locked = %d\n", objlocked); + printf("No of v_nomatch = %d\n", v_nomatch); + printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock); + printf("No of objs v_match but had locks before = %d\n", v_matchlock); + printf("No of objs not found = %d\n", objnotfound); + printf("No of objs modified but not found = %d\n", objmodnotfound); + //Decide what control message(s) to send if(v_matchnolock == fixed->numread + fixed->nummod) { //send TRANS_AGREE to Coordinator control = TRANS_AGREE; write(acceptfd, &control, sizeof(char)); + printf("DEBUG -> Sending TRANS_AGREE\n"); } if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) { //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator control = TRANS_AGREE_BUT_MISSING_OBJECTS; write(acceptfd, &control, sizeof(char)); - //send missing oids and number of oids not found with it + printf("DEBUG -> Sending TRANS_AGREE_BUT_MISSING_OBJECTS\n"); + //send number of oids not found and the missing oids write(acceptfd, &objnotfound, sizeof(int)); write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound)); } @@ -312,19 +365,81 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha //send TRANS_SOFT_ABORT to Coordinator control = TRANS_SOFT_ABORT; write(acceptfd, &control, sizeof(char)); - //send missing oids and number of oids not found with it + printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); + //send number of oids not found and the missing oids write(acceptfd, &objnotfound, sizeof(int)); - write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound)); + if(objnotfound != 0) + write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound)); } - //TODO when TRANS_DISAGREE is sent - //Free space allocated in main objstore - //Unlock objects that was locked in the trans + //Do the following when TRANS_DISAGREE is sent if(control == TRANS_DISAGREE) { + //Set the reference count of the object to 1 in mainstore for garbage collection + ptr = modptr; + for(i = 0; i< fixed->nummod; i++) { + headptr = (objheader_t *) ptr; + headptr->rcount = 1; + ptr += sizeof(objheader_t) + classsize[headptr->type]; + } + //Unlock objects that was locked in the trans for(i = 0; i< objlocked ; i++) { mobj = mhashSearch(oidlocked[i]);// find the header address ((objheader_t *)mobj)->status &= ~(LOCK); } } + + // List of objects that were sent as modified in the TRANS_REQUEST but are now not found on the machine + nummodfound = fixed->nummod - objmodnotfound; + unsigned int oidmodfound[nummodfound]; + for(i = 0; i< fixed->nummod; i++) { + if(oidmodnotfound[i] == 0) { + oidmodfound[j] = oidmod[i]; + j++; + } + } + //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT + transinfo->objmod = oidmod; + transinfo->objlocked = oidlocked; + transinfo->modptr = modptr; + transinfo->nummod = fixed->nummod; + transinfo->numlocked = objlocked; + + return control; +} + +//Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back +int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { + objheader_t *header; + int i = 0, offset = 0; + char control; + //Process each modified object saved in the mainobject store + for(i=0; inummod; i++) { + if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { + printf("mhashserach returns NULL\n"); + } + //change reference count of older address and free space in objstr ?? + header->rcount = 1; //Not sure what would be th val + //change ptr address in mhash table + mhashRemove(transinfo->objmod[i]); + mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); + offset += sizeof(objheader_t) + classsize[header->type]; + //update object version + header = (objheader_t *) mhashSearch(transinfo->objmod[i]); + header->version += 1; + } + for(i=0; inumlocked; i++) { + //unlock objects + header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); + header->status &= ~(LOCK); + } + + //TODO Update location lookup table + + //send ack to coordinator + control = TRANS_SUCESSFUL; + if(send((int)acceptfd, &control, sizeof(char), 0) < 0) { + perror("Error sending ACK to coordinator\n"); + } + return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index 6e378e97..f7bf58cc 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -14,6 +14,7 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) { header = transCreateObj(record, type); tmp = (objheader_t *) objstrAlloc(mainobjstore, size); memcpy(tmp, header, size); +// printf("Insert oid = %d at address %x\n",tmp->oid, tmp); mhashInsert(tmp->oid, tmp); lhashInsert(tmp->oid, 1); //Lock oid 3 object diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 96bbdf9f..a04aa4e5 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -88,6 +88,7 @@ int decideResponse(thread_data_array_t *tdata, int sd) { //Free transaction records objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); + free(tdata->rec); //send Abort ctrl = TRANS_ABORT; if (write(sd, &ctrl, sizeof(char)) < 0) { @@ -105,17 +106,21 @@ int decideResponse(thread_data_array_t *tdata, int sd) { printf("DEBUG-> Inside TRANS_SOFT_ABORT\n"); transsoftabort++; //Read list of objects missing - read(sd, &oidcount, sizeof(int)); - N = oidcount * sizeof(unsigned int); - if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); + if(read(sd, &oidcount, sizeof(int)) != 0) { + if (oidcount == 0) { + sleep(2); + break; + } + N = oidcount * sizeof(unsigned int); + if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + } + ptr = (char *) oidnotfound[i]; + do { + n = read(sd, ptr+sum, N-sum); + sum += n; + } while(sum < N && n !=0); } - ptr = (char *) oidnotfound[i]; - do { - n = read(sd, ptr+sum, N-sum); - sum += n; - } while(sum < N && n !=0); - break; case TRANS_AGREE_BUT_MISSING_OBJECTS: @@ -151,6 +156,7 @@ int decideResponse(thread_data_array_t *tdata, int sd) { if(transagree == tdata->pilecount){ //Send Commit ctrl = TRANS_COMMIT; + printf("Sending TRANS_COMMIT\n"); if (write(sd, &ctrl, sizeof(char)) < 0) { perror("Error sending ctrl message for participant\n"); return 1; @@ -160,17 +166,29 @@ int decideResponse(thread_data_array_t *tdata, int sd) { if(transsoftabort > 0 && transdisagree == 0) { //Send abort but retry commit ctrl = TRANS_ABORT_BUT_RETRY_COMMIT; + printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n"); if (write(sd, &ctrl, sizeof(char)) < 0) { perror("Error sending ctrl message for participant\n"); return 1; } - //lookup objects and then retry commit + //TODO + //Relookup all missing objects //set up a new connection readClientReq() //rebuilt the pile and llookup table //i.e. wait at the participant end and then resend either agree or disagree } if(transmiss > 0 && transsoftabort == 0 && transdisagree == 0) { + //Send abort but retry commit + ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING; + printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n"); + if (write(sd, &ctrl, sizeof(char)) < 0) { + perror("Error sending ctrl message for participant\n"); + return 1; + } + //TODO //Relookup all missing objects + //set up a new connection readClientReq() + //rebuilt the pile and llookup table //send missing mising object/ objects } @@ -178,6 +196,7 @@ int decideResponse(thread_data_array_t *tdata, int sd) { for(i=0 ; i< tdata->pilecount; i++) { free(oidnotfound[i]); } + return 0; } @@ -211,20 +230,20 @@ void *transRequest(void *threadarg) { //Multiple writes for sending packets of data //Send first few fixed bytes of the TRANS_REQUEST protocol printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control); -// printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t)); +// printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t)); // printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes); if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) { perror("Error sending fixed bytes for thread"); return NULL; } //Send list of machines involved in the transaction -// printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount); +// printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount); if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) { perror("Error sending list of machines for thread"); return NULL; } //Send oids and version number tuples for objects that are read -// printf("Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread); +// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread); // printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) { perror("Error sending tuples for thread"); @@ -258,7 +277,7 @@ void *transRequest(void *threadarg) { if(*(tdata->count) == tdata->pilecount) { pthread_cond_broadcast(tdata->threshold); //process the participant's request - if (decideResponse(tdata, sd) == 1) { + if (decideResponse(tdata, sd) != 0) { printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__); return NULL; } @@ -397,6 +416,7 @@ int transCommit(transrecord_t *record) { pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); free(listmid); + pDelete(pile); return 0; } @@ -443,6 +463,23 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { #ifdef DEBUG1 printf("DEBUG -> ready to rcv ...\n"); #endif + //Read response from the Participant + read(sd, &control, sizeof(char)); + switch(control) { + case OBJECT_NOT_FOUND: + return NULL; + break; + case OBJECT_FOUND: + read(sd, &size, sizeof(int)); + objcopy = objstrAlloc(record->cache, size); + read(sd, objcopy, size); + break; + default: + printf("Error in recv request from participant on a READ_REQUEST\n"); + return NULL; + } + +#if 0 read(sd, buffer, sizeof(buffer)); close(sd); if (buffer[0] == OBJECT_NOT_FOUND) { @@ -457,6 +494,8 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { } objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)buffer+1, size); + +#endif //Insert into cache's lookup table chashInsert(record->lookupTable, oid, objcopy); return objcopy; -- 2.34.1