Various bug fixes
authoradash <adash>
Thu, 19 Apr 2007 07:39:38 +0000 (07:39 +0000)
committeradash <adash>
Thu, 19 Apr 2007 07:39:38 +0000 (07:39 +0000)
Able to run multithreaded versions on 2 diff. machines

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/testclient.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 1a714de00bc229e21d98fa73e0e44e3deb75ff36..7b693293e92d3a60e4be73fc6972b1d6acae3227 100644 (file)
@@ -1,8 +1,11 @@
 client:
-       gcc -lpthread -g -O0 -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
 server:
-       gcc -lpthread -g -O0 -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+       gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+all:
+       gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
 
 clean:
        rm client server
index 5b791c35d070015b0ca42b853b7496ff98f70626..5d292c9d8b2c4f2ec0a7eb91b42e2257d1bbbaf6 100644 (file)
@@ -76,7 +76,7 @@ void *dstmListen()
 
 void *dstmAccept(void *acceptfd)
 {
-       int numbytes,i, val;
+       int numbytes,i, val, retval;
        unsigned int oid;
        char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
        char *ptr;
@@ -87,11 +87,17 @@ void *dstmAccept(void *acceptfd)
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
        printf("Recieved connection: fd = %d\n", (int)acceptfd);
-       recv((int)acceptfd, &control, sizeof(char), 0);
+       if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
+               perror("Error in receiving control from coordinator\n");
+               return;
+       }
        switch(control) {
                case READ_REQUEST:
                        printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
-                       recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+                       if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
+                               perror("Error receiving object from cooridnator\n");
+                               return;
+                       }
                        srcObj = mhashSearch(oid);
                        h = (objheader_t *) srcObj;
                        size = sizeof(objheader_t) + sizeof(classsize[h->type]);
@@ -145,7 +151,7 @@ void *dstmAccept(void *acceptfd)
                        break;
 
                default:
-                       printf("Error receiving\n");
+                       printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
        }
        if (close((int)acceptfd) == -1)
        {
@@ -166,7 +172,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        void *modptr, *header;
        objheader_t *tmp_header;
        fixed_data_t fixed;
-       int sum = 0, i, N, n, val;
+       int sum = 0, i, N, n, val, retval;
 
        //Reads to process the TRANS_REQUEST protocol further
        // Read fixed_data
@@ -196,33 +202,43 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        int numread = fixed.numread;
        N = numread * (sizeof(unsigned int) + sizeof(short));
        char objread[N];
-       sum = 0;
-       do {
-               n = recv((int)acceptfd, (void *) objread, N, 0);
-       //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
-               sum += n;
-       } while(sum < N && n != 0);
-       //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
-
-       // Read modified objects
-       if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
-               printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
-               return 1;
+       if(numread != 0) { // If pile contains objects to be read 
+       //      N = numread * (sizeof(unsigned int) + sizeof(short));
+       //      char objread[N];
+               sum = 0;
+               do {
+                       n = recv((int)acceptfd, (void *) objread, N, 0);
+               //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
+                       sum += n;
+               } while(sum < N && n != 0);
+//             printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
        }
-       sum = 0;
-       do {
-               n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
-               //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
-               sum += n;
-       } while (sum < fixed.sum_bytes && n != 0);
        
+       // Read modified objects
+       if(fixed.nummod != 0) { // If pile contains modified objects 
+               if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
+                       printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+                       return 1;
+               }
+               sum = 0;
+               do { // Recv the objs that are modified at Coordinator
+                       n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
+               //      printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
+                       sum += n;
+               } while (sum < fixed.sum_bytes && n != 0);
+       }
+
        //Send control message as per all votes from all oids in the machine
        if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
                printf("Handle req error\n");
        }
-               
+
        //Read for new control message from Coordiator
-       recv((int)acceptfd, &control, sizeof(char), 0);
+       if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
+               perror("Error in receiving control message");
+               return 1;
+       }
+
        switch(control) {
                case TRANS_ABORT:
                        printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
@@ -230,6 +246,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                        sendctrl = TRANS_SUCESSFUL;
                        if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
                                perror("Error sending ACK to coordinator\n");
+                               return 1;
                        }
                        //Mark all ref counts as 1 and do garbage collection
                        ptr = modptr;
@@ -292,10 +309,10 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                                                        perror("Error sending ACK to coordinator\n");
                                                }
                                        }
-                                               
+
                                        break;
                        }
-                       
+
                        break;
                case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
                        //TODO expect another transrequest from client
@@ -313,6 +330,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
 //This function runs a decision after all objects are weighed under one of the 4 possibilities 
 //and returns the appropriate control message to the Ccordinator 
 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
+       int val;
        short version;
        char control = 0, ctrlmissoid, *ptr;
        int i, j = 0;
@@ -331,7 +349,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        
        //Process each object present in the pile 
        ptr = modptr;
-       printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
+       //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
        fflush(stdout);
        //Process each oid in the machine pile/ group
        for (i = 0; i < fixed->numread + fixed->nummod; i++) {
@@ -363,7 +381,10 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                                        v_nomatch++;
                                        //send TRANS_DISAGREE to Coordinator
                                        control = TRANS_DISAGREE;
-                                       write(acceptfd, &control, sizeof(char));
+                                       if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+                                               perror("Error in sending control to the Coordinator\n");
+                                               return 0;
+                                       }
                                        printf("DEBUG -> Sending TRANS_DISAGREE\n");
                                        return control;
                                }
@@ -371,7 +392,6 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                                ((objheader_t *)mobj)->status |= LOCK;
                                //Save all object oids that are locked on this machine during this transaction request call
                                oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
-                               printf("DEBUG-> Object to be locked is %d\n", ((objheader_t *)mobj)->oid);
                                objlocked++;
                                if (version == ((objheader_t *)mobj)->version) { //If versions match
                                        v_matchnolock++;
@@ -379,7 +399,10 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                                        v_nomatch++;
                                        //send TRANS_DISAGREE to Coordinator
                                        control = TRANS_DISAGREE;
-                                       write(acceptfd, &control, sizeof(char));
+                                       if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+                                               perror("Error in sending control to the Coordinator\n");
+                                               return 0;
+                                       }
                                        printf("DEBUG -> Sending TRANS_DISAGREE\n");
                                        return control;
                                }
@@ -387,30 +410,43 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                }
        }
 
-       printf("No of objs locked = %d\n", objlocked);
-       printf("No of v_nomatch = %d\n", v_nomatch);
-       printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
-       printf("No of objs v_match but had locks before = %d\n", v_matchlock);
-       printf("No of objs not found = %d\n", objnotfound);
-       printf("No of objs modified but not found = %d\n", objmodnotfound);
+       //printf("No of objs locked = %d\n", objlocked);
+       //printf("No of v_nomatch = %d\n", v_nomatch);
+       //printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
+       //printf("No of objs v_match but had locks before = %d\n", v_matchlock);
+       //printf("No of objs not found = %d\n", objnotfound);
+       //printf("No of objs modified but not found = %d\n", objmodnotfound);
 
        //Decide what control message(s) to send
        if(v_matchnolock == fixed->numread + fixed->nummod) {
                //send TRANS_AGREE to Coordinator
                control = TRANS_AGREE;
-               write(acceptfd, &control, sizeof(char));
+               if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+                       perror("Error in sending control to Coordinator\n");
+                       return 0;
+               }
                printf("DEBUG -> Sending TRANS_AGREE\n");
        }
 
        if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
                //send TRANS_SOFT_ABORT to Coordinator
                control = TRANS_SOFT_ABORT;
-               write(acceptfd, &control, sizeof(char));
+               if((val = write(acceptfd, &control, sizeof(char))) <=0 ) {
+                       perror("Error in sending control back to coordinator\n");
+                       return 0;
+               }
                printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
                //send number of oids not found and the missing oids 
-               write(acceptfd, &objnotfound, sizeof(int));
-               if(objnotfound != 0) 
-                       write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
+               if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
+                       perror("Error in sending no of objects that are not found\n");
+                       return 0;
+               }
+               if(objnotfound != 0) { 
+                       if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) {
+                               perror("Error in sending objects that are not found\n");
+                               return 0;
+                       }
+               }
        }
        
        //Do the following when TRANS_DISAGREE is sent
index 971905425ea59e38e908b9bb1404b6c48c231788..7bf85693cfc521b45437926a188898819b42a3ee 100644 (file)
@@ -48,6 +48,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                                offset += sizeof(unsigned int);
                                memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
                                tmp->numread = tmp->numread + 1;
+                       //      printf("DEBUG->pInsert() No of obj read = %d\n", tmp->numread);
                        }
                        found = 1;
                        break;
@@ -67,13 +68,13 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                } else {
                        ptr->oidread[ptr->numread] = headeraddr->oid;
                        memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int));
-                       //printf("DEBUG -> objread oid is %d\n", *(ptr->objread));
                        memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
                        ptr->numread = ptr->numread + 1;
                }
                ptr->next = pile;
                pile = ptr;
        }
+
        return pile;
 }
 
index 14c573b567433b415809a5839623fbc55abc84f2..688d44a3bfa76a6f393e0b9799874398fb5f37bf 100644 (file)
@@ -1,10 +1,11 @@
 #include<stdio.h>
+#include<pthread.h>
 #include "dstm.h"
 #include "llookup.h"
 #include "ip.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
+//#include <sys/socket.h>
+//#include <netinet/in.h>
+//#include <arpa/inet.h>
 
 extern objstr_t *mainobjstore;
 //extern lhashtable_t llookup;         //Global Hash table
@@ -17,7 +18,6 @@ int test2(void);
 
 unsigned int createObjects(transrecord_t *record) {
        objheader_t *header, *tmp;
-       struct sockaddr_in antelope;
        unsigned int size, mid;
        int i = 0;
        for(i = 20 ; i< 23; i++) {
@@ -33,11 +33,9 @@ unsigned int createObjects(transrecord_t *record) {
                header = (objheader_t *) objstrAlloc(mainobjstore, size);
                memcpy(header, tmp, size);
                mhashInsert(header->oid, header);
-               //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
-               //mid = iptoMid(inet_ntoa(antelope.sin_addr));
-               mid = iptoMid("127.0.0.1");
+               mid = iptoMid("128.200.9.27");//machine d-2
+               printf("DEBUG -> createObjects mid is %x\n", mid);
                lhashInsert(header->oid, mid);
-       //      lhashInsert(header->oid, 1);
        }
        //      printf("Insert oid = %d at address %x\n",tmp->oid, tmp);
        size = sizeof(objheader_t) + classsize[0] ;
@@ -49,9 +47,7 @@ unsigned int createObjects(transrecord_t *record) {
        header->status = 0;
        header->status |= NEW;
        mhashInsert(header->oid, header);
-       //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
-       //mid = iptoMid(inet_ntoa(antelope.sin_addr));
-       mid = iptoMid("127.0.0.1");
+       mid = iptoMid("128.200.9.27");
        lhashInsert(header->oid, mid);
        size = sizeof(objheader_t) + classsize[1] ;
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
@@ -62,9 +58,7 @@ unsigned int createObjects(transrecord_t *record) {
        header->status = 0;
        header->status |= LOCK;
        mhashInsert(header->oid, header);
-       //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
-       //mid = iptoMid(inet_ntoa(antelope.sin_addr));
-       mid = iptoMid("127.0.0.1");
+       mid = iptoMid("128.200.9.27");
        lhashInsert(header->oid, mid);
        size = sizeof(objheader_t) + classsize[2] ;
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
@@ -75,9 +69,7 @@ unsigned int createObjects(transrecord_t *record) {
        header->status = 0;
        header->status |= LOCK;
        mhashInsert(header->oid, header);
-       //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
-       //mid = iptoMid(inet_ntoa(antelope.sin_addr));
-       mid = iptoMid("127.0.0.1");
+       mid = iptoMid("128.200.9.27");
        lhashInsert(header->oid, mid);
        return 0;
 }
@@ -207,20 +199,53 @@ int test4(void) {
 //trans commit 
 int test5(void) {
        transrecord_t *record;
-       unsigned int mid;
+       objheader_t *header;
+       unsigned int size, mid;
+       pthread_t thread_Listen;
+       pthread_attr_t attr;
        objheader_t *h1,*h2, *h3, *h4;
 
        dstmInit();
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+       //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
        record = transStart();
        printf("DEBUG -> Init done\n");
-       mid = iptoMid("127.0.0.1");     
+       mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu    
        lhashInsert(1,mid);
        lhashInsert(2,mid);
        lhashInsert(3,mid);
        lhashInsert(4,mid);
        lhashInsert(5,mid);
        lhashInsert(6,mid);
-       createObjects(record);
+       pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+       
+       //Create and Insert Oid 20
+       size = sizeof(objheader_t) + classsize[2] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       header->oid = 20;
+       header->type = 2;
+       header->version = 1;
+       header->rcount = 0; //? not sure how to handle this yet
+       header->status = 0;
+       header->status |= NEW;
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.27");
+       lhashInsert(header->oid, mid);
+
+       //Create and Insert Oid 21
+       size = sizeof(objheader_t) + classsize[1] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       header->oid = 21;
+       header->type = 1;
+       header->version = 1;
+       header->rcount = 0; //? not sure how to handle this yet
+       header->status = 0;
+       header->status |= NEW;
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.27");
+       lhashInsert(header->oid, mid);
        //read object 1
        if((h1 = transRead(record, 1)) == NULL){
                printf("Object not found\n");
@@ -239,4 +264,5 @@ int test5(void) {
        }
        
        transCommit(record);
+       pthread_join(thread_Listen, NULL);
 }
index bf1d57f551cc2aaa6c01ae7fd3a66b6d8d2d30be..46f17bb14dad2eddf93b5835e426470436615b3a 100644 (file)
@@ -19,7 +19,7 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) {
        tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
        memcpy(tmp, header, size);
        mhashInsert(tmp->oid, tmp);
-       mid = iptoMid("127.0.0.1");
+       mid = iptoMid("128.200.9.10");
        lhashInsert(tmp->oid, mid);
        //Lock oid 3 object
 //     if(tmp->oid == 3)
@@ -79,7 +79,8 @@ int test2() {
        pthread_t thread_Listen;
 
        dstmInit();     
-       mid = iptoMid("127.0.0.1");
+       mid = iptoMid("128.200.9.27");
+       //Inserting into lhashtable
        lhashInsert(20, mid);
        lhashInsert(21, mid);
        lhashInsert(22, mid);
@@ -117,6 +118,4 @@ int test2() {
                printf("Error transCreateObj6");
        }
        pthread_join(thread_Listen, NULL);
-
-
 }
index d4d9bb17b08d8ecc16a7355087e8120f979eef56..369b34537d2b38e1f3ec97e6fc2faa508e54d17e 100644 (file)
@@ -38,7 +38,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                //Look up in Machine lookup table and found
-               printf("oid not found in local cache\n");
+               printf("oid is found in Local mlookup\n");
                tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[tmp->type];
                //Copy into cache
@@ -49,11 +49,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objcopy);
        } else {
                //Get the object from the remote location
-               //printf("oid not found in local machine lookup\n");
-               printf("machinenumber = %d\n",machinenumber);
-               printf("oid = %d\n",oid);
+               printf("oid is found in remote machine\n");
                machinenumber = lhashSearch(oid);
-               printf("machinenumber = %d\n",machinenumber);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        //If object is not found in Remote location
@@ -79,11 +76,12 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
 }
 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
-       int i, n, N, sum, oidcount = 0;
+       int i, n, N, sum, retval, oidcount = 0;
        int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
        char ctrl, control, *ptr;
        unsigned int *oidnotfound;
        objheader_t *header;
+       
 
        //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
@@ -99,9 +97,11 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                                free(tdata->rec);
                                //send Abort
                                ctrl = TRANS_ABORT;
-                               if (write(sd, &ctrl, sizeof(char)) < 0) {
-                                       perror("Error sending ctrl message for participant\n");
-                                       return 1;
+                               for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
+                                       if (write(sd, &ctrl, sizeof(char)) < 0) {
+                                               perror("Error sending ctrl message for participant\n");
+                                               return 1;
+                                       }
                                }
                                return 0;
 
@@ -145,24 +145,29 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                //Send Commit
                ctrl = TRANS_COMMIT;
                printf("Sending TRANS_COMMIT\n");
-               if (write(sd, &ctrl, sizeof(char)) < 0) {
+               if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
+               //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
        }
 
        if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
                //Send abort but retry commit
                ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
                printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
-               if (write(sd, &ctrl, sizeof(char)) < 0) {
+               if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
-               //Sleep
+               //Sleep and the resend the request
                sleep(5);
                //Read new control message from Participant
-               n = read(sd, &control, sizeof(char));
+
+               if((n = read(sd, &control, sizeof(char))) <= 0) {
+                       perror("No bytes are read for participant\n");
+                       return 1;
+               }
                
                //Update common data structure and increment count
                tdata->recvmsg[tdata->thread_id].rcv_status = control;
@@ -171,17 +176,17 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
        }
 
        if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
-               //Send abort but retry commit after relloking up objects
-               //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+               //Send abort but retry commit after relooking up objects
                ctrl = TRANS_ABORT;
                printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
-               if (write(sd, &ctrl, sizeof(char)) < 0) {
+               if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
                //TODO
                //Relook up objects
                //update location table
+               
                //Free pointers
                free(oidnotfound);
        }
@@ -200,7 +205,6 @@ void *transRequest(void *threadarg) {
        char machineip[16];
 
        tdata = (thread_data_array_t *) threadarg;
-       printf("DEBUG -> New thread id %d\n", tdata->thread_id);
        //Send Trans Request
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for TRANS_REQUEST");
@@ -213,7 +217,6 @@ void *transRequest(void *threadarg) {
        midtoIP(tdata->mid,machineip);
        machineip[15] = '\0';
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
-       //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
 
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect for TRANS_REQUEST");
@@ -237,7 +240,7 @@ void *transRequest(void *threadarg) {
        }
        //Send oids and version number tuples for objects that are read
 //     printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-//     printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
+//     printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
        if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
                perror("Error sending tuples for thread");
                return NULL;
@@ -245,7 +248,6 @@ void *transRequest(void *threadarg) {
        //Send objects that are modified
        for(i = 0; i < tdata->buffer->f.nummod ; i++) {
                headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-//             printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
                if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
                        perror("Error sending obj modified for thread");
                        return NULL;
@@ -253,9 +255,11 @@ void *transRequest(void *threadarg) {
        }
        
        //Read message  control message from participant side
-       n = read(sd, &control, sizeof(char));
+       if((n = read(sd, &control, sizeof(char))) <= 0) {
+               perror("Error in reading control message from Participant\n");
+               return NULL;
+       }
        recvcontrol = control;
-       printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
        
        //Update common data structure and increment count
        tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
@@ -266,15 +270,16 @@ void *transRequest(void *threadarg) {
        
        if(*(tdata->count) == tdata->pilecount) {
                pthread_cond_broadcast(tdata->threshold);
-               //process the participant's request
-               if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
-                       printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(tdata->lock);
-                       return NULL;
-               }
        } else {
                pthread_cond_wait(tdata->threshold, tdata->lock);
        }       
+
+       //process the participant's request
+       if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
+               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+               pthread_mutex_unlock(tdata->lock);
+               return NULL;
+       }
        pthread_mutex_unlock(tdata->lock);
 
        close(sd);
@@ -292,6 +297,7 @@ int transCommit(transrecord_t *record) {
        char buffer[RECEIVE_BUFFER_SIZE],control;
        char transid[TID_LEN];
        static int newtid = 0;
+       trans_req_data_t *tosend;
 
        ptr = record->lookupTable->table;
        size = record->lookupTable->size;
@@ -337,7 +343,10 @@ int transCommit(transrecord_t *record) {
        pthread_cond_t tcond;
        pthread_mutex_t tlock;
        pthread_mutex_t tlshrd;
-       thread_data_array_t thread_data_array[pilecount];
+       //thread_data_array_t thread_data_array[pilecount];
+       thread_data_array_t *thread_data_array;
+
+       thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
        
        thread_response_t rcvd_control_msg[pilecount];  //Shared thread array that keeps track of responses of participants
                
@@ -356,10 +365,9 @@ int transCommit(transrecord_t *record) {
        pListMid(pile, listmid);
        //Process each machine group
        while(tmp != NULL) {
-               printf("DEBUG -> Created thread %d... \n", numthreads);
                //Create transaction id
                newtid++;
-               trans_req_data_t *tosend;
+               //trans_req_data_t *tosend;
                if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                        return 1;
@@ -388,9 +396,10 @@ int transCommit(transrecord_t *record) {
                        perror("Error in pthread create");
                        return 1;
                }               
+
                numthreads++;           
                //TODO frees 
-               free(tosend);
+               //free(tosend);
                tmp = tmp->next;
        }
 
@@ -408,6 +417,9 @@ int transCommit(transrecord_t *record) {
        //Free resources        
        pthread_cond_destroy(&tcond);
        pthread_mutex_destroy(&tlock);
+//     for(i = 0 ;i< pilecount ;i++) {
+               free(tosend);
+//     }
        free(listmid);
        pDelete(pile);
        return 0;
@@ -415,7 +427,7 @@ int transCommit(transrecord_t *record) {
 
 //mnun will be used to represent the machine IP address later
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
-       int sd, size;
+       int sd, size, val;
        struct sockaddr_in serv_addr;
        struct hostent *server;
        char control;
@@ -424,7 +436,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        void *objcopy;
 
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               perror("Error in socket");
+               perror("Error in socket\n");
                return NULL;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
@@ -436,14 +448,14 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
 
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("Error in connect");
+               perror("Error in connect\n");
                return NULL;
        }
        char readrequest[sizeof(char)+sizeof(unsigned int)];
        readrequest[0] = READ_REQUEST;
        *((unsigned int *)(&readrequest[1])) = oid;
        if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
-               perror("Error sending message");
+               perror("Error sending message\n");
                return NULL;
        }
 
@@ -451,15 +463,24 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        printf("DEBUG -> ready to rcv ...\n");
 #endif
        //Read response from the Participant
-       read(sd, &control, sizeof(char));
+       if((val = read(sd, &control, sizeof(char))) <= 0) {
+               perror("No control response for getRemoteObj sent\n");
+               return NULL;
+       }
        switch(control) {
                case OBJECT_NOT_FOUND:
                        return NULL;
                        break;
                case OBJECT_FOUND:
-                       read(sd, &size, sizeof(int));
+                       if((val = read(sd, &size, sizeof(int))) <= 0) {
+                               perror("No size is read from the participant\n");
+                               return NULL;
+                       }
                        objcopy = objstrAlloc(record->cache, size);
-                       read(sd, objcopy, size);                
+                       if((val = read(sd, objcopy, size)) <= 0) {
+                               perror("No objects are read from the remote participant\n");
+                               return NULL;
+                       }
                        //Insert into cache's lookup table
                        chashInsert(record->lookupTable, oid, objcopy); 
                        break;
@@ -467,5 +488,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                        printf("Error in recv request from participant on a READ_REQUEST\n");
                        return NULL;
        }
+       close(sd);
        return objcopy;
 }