From 22189b456122c7213b12ae9e4555a256f1316f4c Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 29 Mar 2007 08:09:51 +0000 Subject: [PATCH] Buggy code, first packet lost during communication between server and client TODO: fix receiving and decoding control bit at server --- .../src/Runtime/DSTM/interface/dstmserver.c | 44 ++++++++------ Robust/src/Runtime/DSTM/interface/plookup.c | 1 + .../src/Runtime/DSTM/interface/testclient.c | 42 +++++++++++++- .../src/Runtime/DSTM/interface/testserver.c | 57 +++++++++++++++++++ Robust/src/Runtime/DSTM/interface/trans.c | 17 +++++- 5 files changed, 139 insertions(+), 22 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index ae334255..efa0efb7 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -85,17 +85,13 @@ void *dstmAccept(void *acceptfd) int fd_flags = fcntl((int)acceptfd, F_GETFD), size; printf("Recieved connection: fd = %d\n", (int)acceptfd); - numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0); - if (numbytes == -1) - { - perror("recv"); - pthread_exit(NULL); - } - else + while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0) { + printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes); control = buffer[0]; switch(control) { case READ_REQUEST: + printf("DEBUG -> READ_REQUEST\n"); oid = *((int *)(buffer+1)); #ifdef DEBUG1 printf("DEBUG -> Received oid is %d\n", oid); @@ -118,29 +114,35 @@ void *dstmAccept(void *acceptfd) } break; case READ_MULT_REQUEST: + printf("DEBUG-> READ_MULT_REQUEST\n"); break; case MOVE_REQUEST: + printf("DEBUG -> MOVE_REQUEST\n"); break; case MOVE_MULT_REQUEST: + printf("DEBUG -> MOVE_MULT_REQUEST\n"); break; case TRANS_REQUEST: + printf("DEBUG -> TRANS_REQUEST\n"); printf("Client sent %d\n",buffer[0]); - int offset = 1; - printf("Num Read %d\n",*((short*)(buffer+offset))); - offset += sizeof(short); - printf("Num modified %d\n",*((short*)(buffer+offset))); - handleTransReq(acceptfd, buffer); + // handleTransReq(acceptfd, buffer); break; case TRANS_ABORT: + printf("DEBUG -> TRANS_ABORT\n"); break; case TRANS_COMMIT: + printf("DEBUG -> TRANS_COMMIT\n"); + printf("Client sent %d\n",buffer[0]); + //TODO copy the objects into the machine + /*copy the object into the object store from its old + location in the objstore(pointer to its header is already stored before)*/ break; default: printf("Error receiving"); } //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd); //printf("%s", buffer); - } + } if (close((int)acceptfd) == -1) { perror("close"); @@ -151,6 +153,7 @@ void *dstmAccept(void *acceptfd) } //TOOD put __FILE__ __LINE__ for all error conditions +#if 0 int handleTransReq(int acceptfd, char *buf) { short numread = 0, nummod = 0; char control; @@ -159,11 +162,12 @@ int handleTransReq(int acceptfd, char *buf) { objheader_t *headptr = NULL; objstr_t *tmpholder; void *top, *mobj; + char sendbuf[RECEIVE_BUFFER_SIZE]; control = buf[0]; - offset = 1; - numread = *((short *)(buf+offset)); + offset = sizeof(fixed_data_t); + list = *((short *)(buf+offset)); offset += sizeof(short); nummod = *((short *)(buf+offset)); offset += sizeof(short); @@ -303,17 +307,25 @@ int handleTransReq(int acceptfd, char *buf) { offset += size; } } + /* if(transabort > 0) { sendbuf[0] = TRANS_DISAGREE_ABORT; if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { perror(""); } - } else { + } else if(transagree == numread+nummod) { sendbuf[0] = TRANS_AGREE; if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { perror(""); } + } else { + sendbuf[0] = TRANS_DISAGREE; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } } + */ return 0; } +#endif diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 15236d51..47c3e012 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -96,6 +96,7 @@ int pListMid(plistnode_t *pile, unsigned int *list) { while (tmp != NULL) { list[i] = tmp->mid; i++; + tmp = tmp->next; } return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c index 0a1715b9..6d1c34e1 100644 --- a/Robust/src/Runtime/DSTM/interface/testclient.c +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -3,10 +3,18 @@ int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; -int main() { +int test1(void); +int test2(void); + +int main() +{ + test2(); +} + +int test1(void) { transrecord_t *record; - objheader_t *h1,*h2,*h3,*h4; + objheader_t *h1,*h2,*h3,*h4,*h5, *h6; dstmInit(); record = transStart(); @@ -23,6 +31,34 @@ int main() { printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]); h3 = transRead(record, 3); printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]); - h1 = transRead(record, 5); + h5 = transRead(record, 5); + printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]); // getRemoteObj(&record, 0,1); } + +int test2(void) { + + transrecord_t *record; + objheader_t *h1,*h2,*h3,*h4,*h5, *h6; + + dstmInit(); + record = transStart(); + printf("DEBUG -> Init done\n"); + h1 = transRead(record, 1); +// printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]); + h2 = transRead(record, 2); +// printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]); + h3 = transRead(record, 3); +// printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]); + h4 = transRead(record, 4); +// printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]); + h4->status |= DIRTY; + h5 = transRead(record, 5); +// printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]); + h6 = transRead(record, 6); +// printf("oid = %d\tsize = %d\n", h6->oid,classsize[h6->type]); + h6->status |= DIRTY; + transCommit(record); +} + + diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index da4298e5..22672f46 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -4,6 +4,9 @@ extern objstr_t *mainobjstore; int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; +int test1(void); +int test2(void); + unsigned int createObjects(transrecord_t *record, unsigned short type) { objheader_t *header, *tmp; unsigned int size; @@ -17,6 +20,11 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) { } int main() +{ + test2(); +} + +int test1() { unsigned int val; transrecord_t *myTrans; @@ -44,6 +52,55 @@ int main() if((val = createObjects(myTrans, 3)) != 0) { printf("Error transCreateObj4"); } + //Create Object5 + if((val = createObjects(myTrans, 0)) != 0) { + printf("Error transCreateObj5"); + } + //Create Object6 + if((val = createObjects(myTrans, 1)) != 0) { + printf("Error transCreateObj6"); + } pthread_join(thread_Listen, NULL); return 0; } + +int test2() { + + unsigned int val; + transrecord_t *myTrans; + pthread_t thread_Listen; + + dstmInit(); + pthread_create(&thread_Listen, NULL, dstmListen, NULL); + // Start Transaction + myTrans = transStart(); + + printf("Creating Transaction\n"); + //Create Object1 + if((val = createObjects(myTrans, 0)) != 0) { + printf("Error transCreateObj1"); + } + //Create Object2 + if((val = createObjects(myTrans, 1)) != 0) { + printf("Error transCreateObj2"); + } + //Create Object3 + if((val = createObjects(myTrans, 2)) != 0) { + printf("Error transCreateObj3"); + } + //Create Object4 + if((val = createObjects(myTrans, 3)) != 0) { + printf("Error transCreateObj4"); + } + //Create Object5 + if((val = createObjects(myTrans, 0)) != 0) { + printf("Error transCreateObj5"); + } + //Create Object6 + if((val = createObjects(myTrans, 1)) != 0) { + printf("Error transCreateObj6"); + } + pthread_join(thread_Listen, NULL); + + +} diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index a3661d43..19333829 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -46,7 +46,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) return(objcopy); } else { //Get the object from the remote location - printf("oid not found in local machine lookup\n"); + //printf("oid not found in local machine lookup\n"); machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) @@ -119,6 +119,7 @@ void *transRequest(void *threadarg) { char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; tdata = (thread_data_array_t *) threadarg; + printf("DEBUG -> New thread id %d\n", tdata->thread_id); //Send Trans Request if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for TRANS_REQUEST"); @@ -137,16 +138,20 @@ void *transRequest(void *threadarg) { //Multiple writes for sending packets of data //Send first few fixed bytes of the TRANS_REQUEST protocol + printf("DEBUG -> Start sending commit data...\n", tdata->buffer->f.control); + printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t)); if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) { perror("Error sending fixed bytes for thread"); return NULL; } //Send list of machines involved in the transaction + printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount); if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) { perror("Error sending list of machines for thread"); return NULL; } //Send oids and version number tuples for objects that are read + printf("Bytes sent in the third write: %d\n", sizeof(unsigned int) + sizeof(short) * tdata->pilecount); if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) { perror("Error sending tuples for thread"); return NULL; @@ -154,6 +159,7 @@ void *transRequest(void *threadarg) { //Send objects that are modified for( i = 0; i < tdata->buffer->f.nummod ; i++) { headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); + printf("Bytes sent for %d obj modified %d\n", i+1, sizeof(objheader_t) + classsize[headeraddr->type]); if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) { perror("Error sending obj modified for thread"); return NULL; @@ -187,7 +193,7 @@ void *transRequest(void *threadarg) { pthread_exit(NULL); } -int transCommit(transrecord_t *record){ +int transCommit(transrecord_t *record) { chashlistnode_t *curr, *ptr, *next; unsigned int size;//Represents number of bins in the chash table unsigned int machinenum, tot_bytes_mod; @@ -212,10 +218,14 @@ int transCommit(transrecord_t *record){ } next = curr->next; //Get machine location for object id + /* if ((machinenum = lhashSearch(curr->key)) == 0) { printf("Error: No such machine\n"); return 1; } + */ + //TODO only for debug + machinenum = 1; if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { printf("Error: No such oid\n"); return 1; @@ -253,6 +263,7 @@ int transCommit(transrecord_t *record){ pListMid(pile, listmid); //Process each machine group while(tmp != NULL) { + printf("DEBUG -> Created thread %d... \n", numthreads); //Create transaction id newtid++; trans_req_data_t *tosend; @@ -279,7 +290,7 @@ int transCommit(transrecord_t *record){ thread_data_array[numthreads].count = &trecvcount; thread_data_array[numthreads].rec = record; - rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]); + rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]); if (rc) { perror("Error in pthread create"); return 1; -- 2.34.1