Finished and tested complete TRANS_COMMIT process
authoradash <adash>
Mon, 2 Apr 2007 09:31:50 +0000 (09:31 +0000)
committeradash <adash>
Mon, 2 Apr 2007 09:31:50 +0000 (09:31 +0000)
other bug fixes
TODO: handle softabort and missing object cases i.e. trans abort but retry commit

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index cb3a9bd003d849fd31c3099e345358ac6da04edd..e990aae327b7322c62cc691028005fb55ce0307d 100644 (file)
 #define OBJECT_NOT_FOUND               11
 #define OBJECTS_FOUND                  12
 #define OBJECTS_NOT_FOUND              13
-#define OBJ_LOCKED_BUT_VERSION_MATCH   14
-#define OBJ_UNLOCK_BUT_VERSION_MATCH   15
-#define VERSION_NO_MATCH               16
 #define TRANS_AGREE                    17
 #define TRANS_DISAGREE                 18
 #define TRANS_AGREE_BUT_MISSING_OBJECTS        19
 #define TRANS_SOFT_ABORT               20
 #define TRANS_SUCESSFUL                        21
 
+//Control bits for status of objects in Machine pile
+#define OBJ_LOCKED_BUT_VERSION_MATCH   14
+#define OBJ_UNLOCK_BUT_VERSION_MATCH   15
+#define VERSION_NO_MATCH               16
+//TODO REMOVE THIS
+#define NO_MISSING_OIDS                        22
+#define MISSING_OIDS_PRESENT           23
+
+
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
@@ -106,6 +112,14 @@ typedef struct objinfo {
        int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) 
 }objinfo_t;
 
+// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process 
+typedef struct trans_commit_data{
+       unsigned int *objmod;
+       unsigned int *objlocked;
+       void *modptr;
+       int nummod;
+       int numlocked;
+}trans_commit_data_t;
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
 
@@ -123,8 +137,8 @@ void *objstrAlloc(objstr_t *store, unsigned int size); //size in bytes
 /* Prototypes for server portion */
 void *dstmListen();
 void *dstmAccept(void *);
-int readClientReq(int);
-int handleTransReq(int, fixed_data_t *, unsigned int *, char *, void *);
+int readClientReq(int, trans_commit_data_t *);
+char handleTransReq(int, fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *);
 /* end server portion */
 
 /* Prototypes for transactions */
@@ -135,6 +149,7 @@ int decideResponse(thread_data_array_t *tdata, int sd);// Coordinator decides wh
 void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
 int transCommit(transrecord_t *record); //return 0 if successful
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
+int transCommitProcess(trans_commit_data_t *, int);
 /* end transactions */
 
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
index b1a89d8e7ff98e63d9ba30a97562d4f6bb3fffc4..fdc23691d1eb2146a414dfa079c51591bf2e3262 100644 (file)
@@ -18,9 +18,7 @@ objstr_t *mainobjstore;
 
 int dstmInit(void)
 {
-       //todo:initialize main object store
-       //do we want this to be a global variable, or provide
-       //separate access funtions and hide the structure?
+       //Initialize main object store
        mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
        if (mhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
@@ -80,10 +78,11 @@ void *dstmAccept(void *acceptfd)
 {
        int numbytes,i, val;
        unsigned int oid;
-       char buffer[RECEIVE_BUFFER_SIZE], control;
+       char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
        char *ptr;
        void *srcObj;
        objheader_t *h;
+       trans_commit_data_t transinfo;
        
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
@@ -91,18 +90,24 @@ void *dstmAccept(void *acceptfd)
        recv((int)acceptfd, &control, sizeof(char), 0);
        switch(control) {
                case READ_REQUEST:
+                       printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
                        recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
                        srcObj = mhashSearch(oid);
                        h = (objheader_t *) srcObj;
                        if (h == NULL) {
-                               buffer[0] = OBJECT_NOT_FOUND;
+                               ctrl = OBJECT_NOT_FOUND;
                        } else {
-                               buffer[0] = OBJECT_FOUND;
+                               ctrl = OBJECT_FOUND;
                                size = sizeof(objheader_t) + sizeof(classsize[h->type]);
-                               memcpy(buffer+1, srcObj, size);
-                       }
-                       if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
-                               perror("");
+                               if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
+                                       perror("Error sending control msg to coordinator\n");
+                               }
+                               if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
+                                       perror("Error sending size of object to coordinator\n");
+                               }
+                               if(send((int)acceptfd, h, size, 0) < 0) {
+                                       perror("Error in sending object\n");
+                               }
                        }
                        break;
                
@@ -119,8 +124,8 @@ void *dstmAccept(void *acceptfd)
                        break;
 
                case TRANS_REQUEST:
-                       //printf("DEBUG -> TRANS_REQUEST\n");
-                       if((val = readClientReq((int)acceptfd)) == 1) {
+                       printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
+                       if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
                                printf("Error in readClientReq\n");
                        }
                        break;
@@ -128,28 +133,6 @@ void *dstmAccept(void *acceptfd)
                default:
                        printf("Error receiving\n");
        }
-       
-       //Read for new control message from Coordiator
-       recv((int)acceptfd, &control, sizeof(char), 0);
-       switch(control) {
-               case TRANS_ABORT:
-                       printf("DEBUG -> TRANS_ABORT\n");
-                       write((int)acceptfd, &control, sizeof(char));
-                       break;
-
-               case TRANS_COMMIT:
-                       printf("DEBUG -> TRANS_COMMIT\n");
-                       write((int)acceptfd, &control, sizeof(char));
-                       //TODO 
-                       //change ptr address in mhash table
-                       //unlock objects
-                       //update object version
-                       //change reference count of older address??
-                       //free space in objstr ??
-                       //Update location lookup table
-                       break;
-       }
-
        if (close((int)acceptfd) == -1)
        {
                perror("close");
@@ -160,13 +143,14 @@ void *dstmAccept(void *acceptfd)
        pthread_exit(NULL);
 }
 
-int readClientReq(int acceptfd) {
-       char *ptr, control;
+int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
+       char *ptr, control, prevctrl, sendctrl;
        void *modptr;
        objheader_t *h, tmp_header;
        fixed_data_t fixed;
-       int sum = 0, N, n;
+       int sum = 0, N, n, val;
 
+       //Reads to process the TRANS_REQUEST protocol further
        // Read fixed_data
        N = sizeof(fixed) - 1;
        ptr = (char *)&fixed;;
@@ -204,7 +188,7 @@ int readClientReq(int acceptfd) {
 
        // Read modified objects
        if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
-       //      printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+               printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
                return 1;
        }
        sum = 0;
@@ -213,24 +197,79 @@ int readClientReq(int acceptfd) {
                //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
                sum += n;
        } while (sum < fixed.sum_bytes && n != 0);
+       
        //Send control message as per all votes from the particpants
-       handleTransReq(acceptfd, &fixed, listmid, objread, modptr);
+       if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
+               printf("Handle req error\n");
+       }
+               
+       //Read for new control message from Coordiator
+       recv((int)acceptfd, &control, sizeof(char), 0);
+       switch(control) {
+               case TRANS_ABORT:
+                       printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
+                       //send ack to coordinator
+                       sendctrl = TRANS_SUCESSFUL;
+                       if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
+                               perror("Error sending ACK to coordinator\n");
+                       }
+                       break;
+
+               case TRANS_COMMIT:
+                       printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
+                       if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
+                               printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
+                       }
+                       break;
+               case TRANS_ABORT_BUT_RETRY_COMMIT:
+                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
+                       //Process again after waiting for sometime and on prev control message sent
+                       switch(prevctrl) {
+                               case TRANS_AGREE:
+                                       sleep(2);
+                                       break;
+                               case TRANS_AGREE_BUT_MISSING_OBJECTS:
+                                       break;
+                               case TRANS_SOFT_ABORT:
+                                       break;
+                       }
+                       //Try sending either agree or disagree after sometime
+                       //TODO
+                       //Wait in a blocking thread or something
+                       //Recv from client new listmid, mcount and pilecount
+                       //call 2 new functions that are similar to readClientReq and handleRequest
+                       break;
+               case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
+                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
+                       break;
+               default:
+                       printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
+                       break;
+       }
 
-       
-       
        return 0;
 }
 
 //This function runs a decision after all objects are weighed under one of the 4 possibilities 
 //and returns the appropriate control message to the Ccordinator 
-int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) {
+char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
        short version;
-       char control, *ptr;
-       int i;
-       unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread];
-       int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent
+       char control = 0, ctrlmissoid, *ptr, *oidmodnotfound;
+       int i, j = 0;
+       unsigned int oid;
+       unsigned int *oidnotfound, *oidlocked, *oidmod;
+
+       oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
+       oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
+       oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
+       oidmodnotfound = (char *) calloc(fixed->nummod, sizeof(char));
+
+       // Counters and arrays to formulate decision on control message to be sent
+       int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+       int objmodnotfound = 0, nummodfound = 0;
        void *mobj;
        objheader_t *headptr;
+       //TODO remove this deadcode from here
        objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
        
        //Process each object present in the pile 
@@ -246,12 +285,18 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha
                } else {//Obj is modified
                        headptr = (objheader_t *) ptr;
                        oid = headptr->oid;
+                       oidmod[objmod] = oid;//Array containing modified oids
+                       objmod++;
                        version = headptr->version;
                        ptr += sizeof(objheader_t) + classsize[headptr->type];
                }
                //Check if object is still present in the machine since the beginning of TRANS_REQUEST
                if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
                        objinfo[i].poss_val = OBJECT_NOT_FOUND;
+                       if(i >= fixed->numread && (i < (fixed->nummod + fixed->numread))) {
+                               oidmodnotfound[i - fixed->numread] = 1; //array keeps track of oids that are a subset of oidmod and found on machine 
+                               objmodnotfound++;
+                       }
                        //Save the oids not found for later use
                        oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
                        objnotfound++;
@@ -267,10 +312,8 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha
                                        //send TRANS_DISAGREE to Coordinator
                                        control = TRANS_DISAGREE;
                                        write(acceptfd, &control, sizeof(char));
-                                       //TODO when TRANS_DISAGREE is sent
-                                       //Free space allocated in main objstore
-                                       //Unlock objects that was locked in the trans
-                                       return 0;
+                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
+                                       return control;
                                }
                        } else {//Obj is not locked , so lock object
                                ((objheader_t *)mobj)->status |= LOCK;
@@ -286,24 +329,34 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha
                                        //send TRANS_DISAGREE to Coordinator
                                        control = TRANS_DISAGREE;
                                        write(acceptfd, &control, sizeof(char));
-                                       return 0;
+                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
+                                       return control;
                                }
                        }
                }
        }
 
+       printf("No of objs locked = %d\n", objlocked);
+       printf("No of v_nomatch = %d\n", v_nomatch);
+       printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
+       printf("No of objs v_match but had locks before = %d\n", v_matchlock);
+       printf("No of objs not found = %d\n", objnotfound);
+       printf("No of objs modified but not found = %d\n", objmodnotfound);
+
        //Decide what control message(s) to send
        if(v_matchnolock == fixed->numread + fixed->nummod) {
                //send TRANS_AGREE to Coordinator
                control = TRANS_AGREE;
                write(acceptfd, &control, sizeof(char));
+               printf("DEBUG -> Sending TRANS_AGREE\n");
        }
        
        if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
                //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
                control = TRANS_AGREE_BUT_MISSING_OBJECTS;
                write(acceptfd, &control, sizeof(char));
-               //send missing oids  and number of oids not found with it
+               printf("DEBUG -> Sending TRANS_AGREE_BUT_MISSING_OBJECTS\n");
+               //send number of oids not found and the missing oids 
                write(acceptfd, &objnotfound, sizeof(int));
                write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
        }
@@ -312,19 +365,81 @@ int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, cha
                //send TRANS_SOFT_ABORT to Coordinator
                control = TRANS_SOFT_ABORT;
                write(acceptfd, &control, sizeof(char));
-               //send missing oids  and number of oids not found with it
+               printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
+               //send number of oids not found and the missing oids 
                write(acceptfd, &objnotfound, sizeof(int));
-               write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
+               if(objnotfound != 0) 
+                       write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
        }
        
-       //TODO when TRANS_DISAGREE is sent
-       //Free space allocated in main objstore
-       //Unlock objects that was locked in the trans
+       //Do the following when TRANS_DISAGREE is sent
        if(control == TRANS_DISAGREE) {
+               //Set the reference count of the object to 1 in mainstore for garbage collection
+               ptr = modptr;
+               for(i = 0; i< fixed->nummod; i++) {
+                       headptr = (objheader_t *) ptr;
+                       headptr->rcount = 1;
+                       ptr += sizeof(objheader_t) + classsize[headptr->type];
+               }
+               //Unlock objects that was locked in the trans
                for(i = 0; i< objlocked ; i++) {
                        mobj = mhashSearch(oidlocked[i]);// find the header address
                        ((objheader_t *)mobj)->status &= ~(LOCK);               
                }       
        }       
+
+       // List of objects that were sent as modified in the TRANS_REQUEST but are now not found on the machine         
+       nummodfound = fixed->nummod - objmodnotfound;
+       unsigned int oidmodfound[nummodfound];
+       for(i = 0; i< fixed->nummod; i++) {
+               if(oidmodnotfound[i] == 0) {
+                       oidmodfound[j] = oidmod[i];
+                       j++;
+               } 
+       }
+       //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
+       transinfo->objmod = oidmod;
+       transinfo->objlocked = oidlocked;
+       transinfo->modptr = modptr;
+       transinfo->nummod = fixed->nummod;
+       transinfo->numlocked = objlocked;
+       
+       return control;
+}
+
+//Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
+int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
+       objheader_t *header;
+       int i = 0, offset = 0;
+       char control;
+       //Process each modified object saved in the mainobject store
+       for(i=0; i<transinfo->nummod; i++) {
+               if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+                       printf("mhashserach returns NULL\n");
+               }
+               //change reference count of older address and free space in objstr ??
+               header->rcount = 1; //Not sure what would be th val
+               //change ptr address in mhash table
+               mhashRemove(transinfo->objmod[i]);
+               mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+               offset += sizeof(objheader_t) + classsize[header->type];
+               //update object version
+               header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+               header->version += 1; 
+       }
+       for(i=0; i<transinfo->numlocked; i++) {
+               //unlock objects
+               header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+               header->status &= ~(LOCK);
+       }
+
+       //TODO Update location lookup table
+
+       //send ack to coordinator
+       control = TRANS_SUCESSFUL;
+       if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
+               perror("Error sending ACK to coordinator\n");
+       }
+
        return 0;
 }
index 6e378e976e2468c06a72fae7c686e5f22f7434f1..f7bf58cca2538dacb5b39faf24491abd973ab44b 100644 (file)
@@ -14,6 +14,7 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) {
        header = transCreateObj(record, type);
        tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
        memcpy(tmp, header, size);
+//     printf("Insert oid = %d at address %x\n",tmp->oid, tmp);
        mhashInsert(tmp->oid, tmp);
        lhashInsert(tmp->oid, 1);
        //Lock oid 3 object
index 96bbdf9f5728190f947cbb6dd0497d4080741f4c..a04aa4e5d7ba70216b07579c7662b1e4dcc6f2c3 100644 (file)
@@ -88,6 +88,7 @@ int decideResponse(thread_data_array_t *tdata, int sd) {
                                //Free transaction records
                                objstrDelete(tdata->rec->cache);
                                chashDelete(tdata->rec->lookupTable);
+                               free(tdata->rec);
                                //send Abort
                                ctrl = TRANS_ABORT;
                                if (write(sd, &ctrl, sizeof(char)) < 0) {
@@ -105,17 +106,21 @@ int decideResponse(thread_data_array_t *tdata, int sd) {
                                printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
                                transsoftabort++;
                                //Read list of objects missing
-                               read(sd, &oidcount, sizeof(int));
-                               N = oidcount * sizeof(unsigned int);
-                               if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
-                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                               if(read(sd, &oidcount, sizeof(int)) != 0) {
+                                       if (oidcount == 0) {
+                                               sleep(2);
+                                               break;
+                                       }
+                                       N = oidcount * sizeof(unsigned int);
+                                       if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+                                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                                       }
+                                       ptr = (char *) oidnotfound[i];
+                                       do {
+                                               n = read(sd, ptr+sum, N-sum);
+                                               sum += n;
+                                       } while(sum < N && n !=0);
                                }
-                               ptr = (char *) oidnotfound[i];
-                               do {
-                                       n = read(sd, ptr+sum, N-sum);
-                                       sum += n;
-                               } while(sum < N && n !=0);
-
                                break;
                                
                        case TRANS_AGREE_BUT_MISSING_OBJECTS:
@@ -151,6 +156,7 @@ int decideResponse(thread_data_array_t *tdata, int sd) {
        if(transagree == tdata->pilecount){
                //Send Commit
                ctrl = TRANS_COMMIT;
+               printf("Sending TRANS_COMMIT\n");
                if (write(sd, &ctrl, sizeof(char)) < 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
@@ -160,17 +166,29 @@ int decideResponse(thread_data_array_t *tdata, int sd) {
        if(transsoftabort > 0 && transdisagree == 0) {
                //Send abort but retry commit
                ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
+               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
                if (write(sd, &ctrl, sizeof(char)) < 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
-               //lookup objects and then retry commit 
+               //TODO
+               //Relookup all missing objects
                //set up a new connection readClientReq()
                //rebuilt the pile and llookup table
                //i.e. wait at the participant end and then resend either agree or disagree
        }
        if(transmiss > 0 && transsoftabort == 0 && transdisagree == 0) {
+               //Send abort but retry commit
+               ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
+               if (write(sd, &ctrl, sizeof(char)) < 0) {
+                       perror("Error sending ctrl message for participant\n");
+                       return 1;
+               }
+               //TODO
                //Relookup all missing objects
+               //set up a new connection readClientReq()
+               //rebuilt the pile and llookup table
                //send missing mising object/ objects
        }
        
@@ -178,6 +196,7 @@ int decideResponse(thread_data_array_t *tdata, int sd) {
        for(i=0 ; i< tdata->pilecount; i++) {
                free(oidnotfound[i]);
        }
+       
 
        return 0;
 }
@@ -211,20 +230,20 @@ void *transRequest(void *threadarg) {
        //Multiple writes for sending packets of data 
        //Send first few fixed bytes of the TRANS_REQUEST protocol
        printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
-//     printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
+//     printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
 //     printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
        if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
                perror("Error sending fixed bytes for thread");
                return NULL;
        }
        //Send list of machines involved in the transaction
-//     printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
+//     printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
        if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
                perror("Error sending list of machines for thread");
                return NULL;
        }
        //Send oids and version number tuples for objects that are read
-//     printf("Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
+//     printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
 //     printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
        if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
                perror("Error sending tuples for thread");
@@ -258,7 +277,7 @@ void *transRequest(void *threadarg) {
        if(*(tdata->count) == tdata->pilecount) {
                pthread_cond_broadcast(tdata->threshold);
                //process the participant's request
-               if (decideResponse(tdata, sd) == 1) {
+               if (decideResponse(tdata, sd) != 0) {
                        printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
                        return NULL;
                }
@@ -397,6 +416,7 @@ int transCommit(transrecord_t *record) {
        pthread_cond_destroy(&tcond);
        pthread_mutex_destroy(&tlock);
        free(listmid);
+       pDelete(pile);
        return 0;
 }
 
@@ -443,6 +463,23 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
 #ifdef DEBUG1
        printf("DEBUG -> ready to rcv ...\n");
 #endif
+       //Read response from the Participant
+       read(sd, &control, sizeof(char));
+       switch(control) {
+               case OBJECT_NOT_FOUND:
+                       return NULL;
+                       break;
+               case OBJECT_FOUND:
+                       read(sd, &size, sizeof(int));
+                       objcopy = objstrAlloc(record->cache, size);
+                       read(sd, objcopy, size);                
+                       break;
+               default:
+                       printf("Error in recv request from participant on a READ_REQUEST\n");
+                       return NULL;
+       }
+
+#if 0
        read(sd, buffer, sizeof(buffer));
        close(sd);
        if (buffer[0] == OBJECT_NOT_FOUND) {
@@ -457,6 +494,8 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        }
        objcopy = objstrAlloc(record->cache, size);
        memcpy(objcopy, (void *)buffer+1, size);
+
+#endif
        //Insert into cache's lookup table
        chashInsert(record->lookupTable, oid, objcopy); 
        return objcopy;