Buggy code, first packet lost during communication between server and client
authoradash <adash>
Thu, 29 Mar 2007 08:09:51 +0000 (08:09 +0000)
committeradash <adash>
Thu, 29 Mar 2007 08:09:51 +0000 (08:09 +0000)
TODO: fix receiving and decoding control bit at server

Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/testclient.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index ae334255570cf5ec56aa9a7417d1b981d9e6f838..efa0efb79d6b94c499cf7cf27100d3637ccc7886 100644 (file)
@@ -85,17 +85,13 @@ void *dstmAccept(void *acceptfd)
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
        printf("Recieved connection: fd = %d\n", (int)acceptfd);
-       numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0);
-       if (numbytes == -1)
-       {
-               perror("recv");
-               pthread_exit(NULL);
-       }
-       else
+       while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0) 
        {
+               printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes);
                control = buffer[0];
                switch(control) {
                        case READ_REQUEST:
+                               printf("DEBUG -> READ_REQUEST\n");
                                oid = *((int *)(buffer+1));
 #ifdef DEBUG1
                                printf("DEBUG -> Received oid is %d\n", oid);
@@ -118,29 +114,35 @@ void *dstmAccept(void *acceptfd)
                                }
                                break;
                        case READ_MULT_REQUEST:
+                               printf("DEBUG-> READ_MULT_REQUEST\n");
                                break;
                        case MOVE_REQUEST:
+                               printf("DEBUG -> MOVE_REQUEST\n");
                                break;
                        case MOVE_MULT_REQUEST:
+                               printf("DEBUG -> MOVE_MULT_REQUEST\n");
                                break;
                        case TRANS_REQUEST:
+                               printf("DEBUG -> TRANS_REQUEST\n");
                                printf("Client sent %d\n",buffer[0]);
-                               int offset = 1;
-                               printf("Num Read  %d\n",*((short*)(buffer+offset)));
-                               offset += sizeof(short);
-                               printf("Num modified  %d\n",*((short*)(buffer+offset)));
-                               handleTransReq(acceptfd, buffer);
+       //                      handleTransReq(acceptfd, buffer);
                                break;
                        case TRANS_ABORT:
+                               printf("DEBUG -> TRANS_ABORT\n");
                                break;
                        case TRANS_COMMIT:
+                               printf("DEBUG -> TRANS_COMMIT\n");
+                               printf("Client sent %d\n",buffer[0]);
+                               //TODO copy the objects into the machine 
+                               /*copy the object into the object store from its old 
+                                 location in the objstore(pointer to its header is already stored before)*/
                                break;
                        default:
                                printf("Error receiving");
                }
                //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
                //printf("%s", buffer);
-       }
+       } 
        if (close((int)acceptfd) == -1)
        {
                perror("close");
@@ -151,6 +153,7 @@ void *dstmAccept(void *acceptfd)
 }
 
 //TOOD put __FILE__ __LINE__ for all error conditions
+#if 0
 int handleTransReq(int acceptfd, char *buf) {
        short numread = 0, nummod = 0;
        char control;
@@ -159,11 +162,12 @@ int handleTransReq(int acceptfd, char *buf) {
        objheader_t *headptr = NULL;
        objstr_t *tmpholder;
        void *top, *mobj;
+       
        char sendbuf[RECEIVE_BUFFER_SIZE];
 
        control = buf[0];
-       offset = 1;
-       numread = *((short *)(buf+offset));
+       offset = sizeof(fixed_data_t);
+       list = *((short *)(buf+offset));
        offset += sizeof(short);
        nummod = *((short *)(buf+offset));
        offset += sizeof(short);
@@ -303,17 +307,25 @@ int handleTransReq(int acceptfd, char *buf) {
                        offset += size;
                }
        }
+       /*
        if(transabort > 0) {
                sendbuf[0] = TRANS_DISAGREE_ABORT;
                if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
                        perror("");
                }
        
-       } else {
+       } else if(transagree == numread+nummod) {
                sendbuf[0] = TRANS_AGREE;
                if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
                        perror("");
                }
+       } else {
+               sendbuf[0] = TRANS_DISAGREE;
+               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                       perror("");
+               }
        }
+       */
        return 0;
 }
+#endif
index 15236d518582ec2e74be2ec2e722c89beb7803c4..47c3e01293664061be05b6432981932cfb9de5d7 100644 (file)
@@ -96,6 +96,7 @@ int pListMid(plistnode_t *pile, unsigned int *list) {
        while (tmp != NULL) {
                list[i] = tmp->mid;
                i++;
+               tmp = tmp->next;
        }
        return 0;
 }
index 0a1715b941c4a2b14344e3327b7385e607a85d12..6d1c34e1c3e1f9a7c160e388223718180e943baf 100644 (file)
@@ -3,10 +3,18 @@
 
 int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};      
 
-int main() {
+int test1(void);
+int test2(void);
+
+int main() 
+{
+       test2();
+}
+
+int test1(void) {
 
        transrecord_t *record;
-       objheader_t *h1,*h2,*h3,*h4;
+       objheader_t *h1,*h2,*h3,*h4,*h5, *h6;
 
        dstmInit();
        record = transStart();
@@ -23,6 +31,34 @@ int main() {
        printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
        h3 = transRead(record, 3);
        printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
-       h1 = transRead(record, 5);
+       h5 = transRead(record, 5);
+       printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]);
 //     getRemoteObj(&record, 0,1);
 }
+
+int test2(void) {
+
+       transrecord_t *record;
+       objheader_t *h1,*h2,*h3,*h4,*h5, *h6;
+
+       dstmInit();
+       record = transStart();
+       printf("DEBUG -> Init done\n");
+       h1 = transRead(record, 1);
+//     printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
+       h2 = transRead(record, 2);
+//     printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
+       h3 = transRead(record, 3);
+//     printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+       h4 = transRead(record, 4);
+//     printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+       h4->status |= DIRTY;
+       h5 = transRead(record, 5);
+//     printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]);
+       h6 = transRead(record, 6);
+//     printf("oid = %d\tsize = %d\n", h6->oid,classsize[h6->type]);
+       h6->status |= DIRTY;
+       transCommit(record);
+}
+
+
index da4298e5302ba57b5882aeb54d217307a47e2ea6..22672f46705ddbfe0bc2ee0b767deb5b850358f8 100644 (file)
@@ -4,6 +4,9 @@
 extern objstr_t *mainobjstore;
 int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
 
+int test1(void);
+int test2(void);
+
 unsigned int createObjects(transrecord_t *record, unsigned short type) {
        objheader_t *header, *tmp;
        unsigned int size;
@@ -17,6 +20,11 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) {
 }
 
 int main()
+{
+       test2();
+}
+
+int test1()
 {
        unsigned int val;
        transrecord_t *myTrans;
@@ -44,6 +52,55 @@ int main()
        if((val = createObjects(myTrans, 3)) != 0) {
                printf("Error transCreateObj4");
        }
+       //Create Object5
+       if((val = createObjects(myTrans, 0)) != 0) {
+               printf("Error transCreateObj5");
+       }
+       //Create Object6
+       if((val = createObjects(myTrans, 1)) != 0) {
+               printf("Error transCreateObj6");
+       }
        pthread_join(thread_Listen, NULL);
        return 0;
 }
+
+int test2() {
+       
+       unsigned int val;
+       transrecord_t *myTrans;
+       pthread_t thread_Listen;
+
+       dstmInit();     
+       pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+       // Start Transaction    
+       myTrans = transStart();
+
+       printf("Creating Transaction\n");
+       //Create Object1
+       if((val = createObjects(myTrans, 0)) != 0) {
+               printf("Error transCreateObj1");
+       }
+       //Create Object2
+       if((val = createObjects(myTrans, 1)) != 0) {
+               printf("Error transCreateObj2");
+       }
+       //Create Object3
+       if((val = createObjects(myTrans, 2)) != 0) {
+               printf("Error transCreateObj3");
+       }
+       //Create Object4
+       if((val = createObjects(myTrans, 3)) != 0) {
+               printf("Error transCreateObj4");
+       }
+       //Create Object5
+       if((val = createObjects(myTrans, 0)) != 0) {
+               printf("Error transCreateObj5");
+       }
+       //Create Object6
+       if((val = createObjects(myTrans, 1)) != 0) {
+               printf("Error transCreateObj6");
+       }
+       pthread_join(thread_Listen, NULL);
+
+
+}
index a3661d438bbcb0be4ac8e0e417e63cc0cec2aa37..1933382994bc8c638c541aa2e6724c5373f72bf8 100644 (file)
@@ -46,7 +46,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objcopy);
        } else {
                //Get the object from the remote location
-               printf("oid not found in local machine lookup\n");
+               //printf("oid not found in local machine lookup\n");
                machinenumber = lhashSearch(oid);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL)
@@ -119,6 +119,7 @@ void *transRequest(void *threadarg) {
        char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
 
        tdata = (thread_data_array_t *) threadarg;
+       printf("DEBUG -> New thread id %d\n", tdata->thread_id);
        //Send Trans Request
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for TRANS_REQUEST");
@@ -137,16 +138,20 @@ void *transRequest(void *threadarg) {
 
        //Multiple writes for sending packets of data 
        //Send first few fixed bytes of the TRANS_REQUEST protocol
+       printf("DEBUG -> Start sending commit data...\n", tdata->buffer->f.control);
+       printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
        if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) {
                perror("Error sending fixed bytes for thread");
                return NULL;
        }
        //Send list of machines involved in the transaction
+       printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
        if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
                perror("Error sending list of machines for thread");
                return NULL;
        }
        //Send oids and version number tuples for objects that are read
+       printf("Bytes sent in the third write: %d\n", sizeof(unsigned int) + sizeof(short) * tdata->pilecount);
        if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) {
                perror("Error sending tuples for thread");
                return NULL;
@@ -154,6 +159,7 @@ void *transRequest(void *threadarg) {
        //Send objects that are modified
        for( i = 0; i < tdata->buffer->f.nummod ; i++) {
                headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+               printf("Bytes sent for %d obj modified %d\n", i+1, sizeof(objheader_t) + classsize[headeraddr->type]);
                if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
                        perror("Error sending obj modified for thread");
                        return NULL;
@@ -187,7 +193,7 @@ void *transRequest(void *threadarg) {
        pthread_exit(NULL);
 }
 
-int transCommit(transrecord_t *record)       
+int transCommit(transrecord_t *record) {       
        chashlistnode_t *curr, *ptr, *next;
        unsigned int size;//Represents number of bins in the chash table
        unsigned int machinenum, tot_bytes_mod;
@@ -212,10 +218,14 @@ int transCommit(transrecord_t *record){
                        }
                        next = curr->next;
                        //Get machine location for object id
+                       /*
                        if ((machinenum = lhashSearch(curr->key)) == 0) {
                               printf("Error: No such machine\n");
                               return 1;
                        }               
+                       */
+                       //TODO only for debug
+                       machinenum = 1;
                        if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
                                printf("Error: No such oid\n");
                                return 1;
@@ -253,6 +263,7 @@ int transCommit(transrecord_t *record){
        pListMid(pile, listmid);
        //Process each machine group
        while(tmp != NULL) {
+               printf("DEBUG -> Created thread %d... \n", numthreads);
                //Create transaction id
                newtid++;
                trans_req_data_t *tosend;
@@ -279,7 +290,7 @@ int transCommit(transrecord_t *record){
                thread_data_array[numthreads].count = &trecvcount;
                thread_data_array[numthreads].rec = record;
 
-               rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]);  
+               rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
                if (rc) {
                        perror("Error in pthread create");
                        return 1;