From: adash Date: Sat, 17 Mar 2007 11:04:08 +0000 (+0000) Subject: Add test code for client, send TRANS_RD request protocol, X-Git-Tag: preEdgeChange~655 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=8d3c53d33976cd3e3e38bd886ddf4504a0ab70f9;p=IRC.git Add test code for client, send TRANS_RD request protocol, add function to get remote object Some bugs still present..need fixing --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.c b/Robust/src/Runtime/DSTM/interface/dstm.c index 36c4b063..62f6955f 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.c +++ b/Robust/src/Runtime/DSTM/interface/dstm.c @@ -7,12 +7,12 @@ extern int classsize[]; // Get a new object id unsigned int getNewOID(void) { static int id = 1; - return ++id; + return id++; } // Get the size of the object for a given type unsigned int objSize(objheader_t *object) { - return classsize[h.type]; + return classsize[object->type]; } /* END object header */ diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index ad02018f..64600ff4 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -56,4 +56,6 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //retur int transCommit(transrecord_t *record); //return 0 if successful /* end transactions */ +void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); + #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index fcc57a65..e154c417 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -8,28 +8,28 @@ #include "mlookup.h" #include "llookup.h" -#define LISTEN_PORT 2153 +#define LISTEN_PORT 2156 #define BACKLOG 10 //max pending connections -#define RECIEVE_BUFFER_SIZE 1500 +#define RECIEVE_BUFFER_SIZE 2048 +extern int classsize[]; objstr_t *mainobjstore; -mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); int dstmInit(void) { //todo:initialize main object store //do we want this to be a global variable, or provide //separate access funtions and hide the structure? - + mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure if (lhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure - pthread_t threadListen; - pthread_create(&threadListen, NULL, dstmListen, NULL); + //pthread_t threadListen; + //pthread_create(&threadListen, NULL, dstmListen, NULL); return 0; } @@ -43,7 +43,7 @@ void *dstmListen() pthread_t thread_dstm_accept; int i; - listenfd = socket(PF_INET, SOCK_STREAM, 0); + listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd == -1) { perror("socket"); @@ -78,9 +78,13 @@ void *dstmListen() void *dstmAccept(void *acceptfd) { - int numbytes; + int numbytes,i,choice, oid; char buffer[RECIEVE_BUFFER_SIZE]; - int fd_flags = fcntl((int)acceptfd, F_GETFD); + char opcode[10]; + void *srcObj; + objheader_t *h; + int fd_flags = fcntl((int)acceptfd, F_GETFD), size; + printf("Recieved connection: fd = %d\n", (int)acceptfd); do { @@ -93,19 +97,38 @@ void *dstmAccept(void *acceptfd) } else { - printf("Read %d bytes from %d\n", numbytes, (int)acceptfd); - printf("%s", buffer); + sscanf(buffer, "%s %d\n", opcode, &oid); + + if (strcmp(opcode, "TRANS_RD") == 0) { + printf("DEBUG -> Requesting: %s %d\n", opcode, oid); + srcObj = mhashSearch(oid); + h = (objheader_t *) srcObj; + printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type); + size = sizeof(objheader_t) + sizeof(classsize[h->type]); + if(send((int)acceptfd, srcObj, size, 0) < 0) { + perror(""); + } + //printf("DEBUG -> sent ...%d\n", write(acceptfd, srcObj, size)); + } + else if (strcmp(opcode, "TRANS_COMMIT") == 0) + printf(" This is a Commit\n"); + else if (strcmp(opcode,"TRANS_ABORT") == 0) + printf(" This is a Abort\n"); + else + printf(" This is a Broadcastt\n"); + + //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd); + //printf("%s", buffer); } - } while (numbytes != 0); + //} while (numbytes != 0); + } while (0); if (close((int)acceptfd) == -1) { perror("close"); - pthread_exit(NULL); } else printf("Closed connection: fd = %d\n", (int)acceptfd); pthread_exit(NULL); } - diff --git a/Robust/src/Runtime/DSTM/interface/servertest.c b/Robust/src/Runtime/DSTM/interface/servertest.c index 98bd9e2a..da4298e5 100644 --- a/Robust/src/Runtime/DSTM/interface/servertest.c +++ b/Robust/src/Runtime/DSTM/interface/servertest.c @@ -1,11 +1,49 @@ #include -#include "dstmserver.h" +#include "dstm.h" + +extern objstr_t *mainobjstore; +int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; + +unsigned int createObjects(transrecord_t *record, unsigned short type) { + objheader_t *header, *tmp; + unsigned int size; + size = sizeof(objheader_t) + classsize[type] ; + header = transCreateObj(record, type); + tmp = (objheader_t *) objstrAlloc(mainobjstore, size); + memcpy(tmp, header, size); + mhashInsert(tmp->oid, tmp); + lhashInsert(tmp->oid, 1); + return 0; +} int main() { - pthread_t thread_listen; - pthread_create(&thread_listen, NULL, dstmListen, NULL); - pthread_join(thread_listen, NULL); + unsigned int val; + transrecord_t *myTrans; + pthread_t thread_Listen; + + dstmInit(); + pthread_create(&thread_Listen, NULL, dstmListen, NULL); + // Start Transaction + myTrans = transStart(); + + printf("Creating Transaction\n"); + //Create Object1 + if((val = createObjects(myTrans, 0)) != 0) { + printf("Error transCreateObj1"); + } + //Create Object2 + if((val = createObjects(myTrans, 1)) != 0) { + printf("Error transCreateObj2"); + } + //Create Object3 + if((val = createObjects(myTrans, 2)) != 0) { + printf("Error transCreateObj3"); + } + //Create Object4 + if((val = createObjects(myTrans, 3)) != 0) { + printf("Error transCreateObj4"); + } + pthread_join(thread_Listen, NULL); return 0; } - diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c new file mode 100644 index 00000000..ee384473 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -0,0 +1,25 @@ +#include +#include "dstm.h" + +int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; + +int main() { + + transrecord_t *record; + objheader_t *h1,*h2,*h3,*h4; + + dstmInit(); + record = transStart(); + printf("DEBUG -> Init done"); + h1 = transRead(record, 3); + printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]); + h3 = transRead(record, 1); + printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]); + /* + h4 = transRead(&record, 4); + printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]); + h3 = transRead(&record, 2); + printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]); + */ +// getRemoteObj(&record, 0,1); +} diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 90507f5e..9aea07ed 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -2,6 +2,14 @@ #include "clookup.h" #include "mlookup.h" #include "llookup.h" +#include +#include +#include +#include + +#define LISTEN_PORT 2156 +#define MACHINE_IP "127.0.0.1" +#define RECIEVE_BUFFER_SIZE 2048 extern int classsize[]; @@ -16,10 +24,10 @@ transrecord_t *transStart() objheader_t *transRead(transrecord_t *record, unsigned int oid) { unsigned int machinenumber; - objheader_t *tmp, *objheader; void *objcopy; int size; + void *buf; //check cache if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ return(objheader); @@ -37,15 +45,19 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) } else { printf(" oid not found in Machine Lookup\n"); machinenumber = lhashSearch(oid); - //TODO:broadcast - return(NULL); + //Get object from a given machine + /* if (getRemoteObj(record, machinenumber, oid) != 0) { + printf("Error getRemoteObj"); + } + */ + objcopy = getRemoteObj(record, machinenumber, oid); + return(objcopy); } } - objheader_t *transCreateObj(transrecord_t *record, unsigned short type) { - objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, classsize[type]); + objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type])); tmp->oid = getNewOID(); tmp->type = type; tmp->version = 1; @@ -63,3 +75,49 @@ int transCommit(transrecord_t *record){ int transAbort(transrecord_t *record){ } + +//mnun will be used to represent the machine IP address later +void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { + int sd, size; + struct sockaddr_in serv_addr; + struct hostent *server; + char buffer[RECIEVE_BUFFER_SIZE]; + objheader_t *h; + void *objcopy; + + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket"); + return NULL; + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); + + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + perror("Error in connect"); + return NULL; + } + bzero((char *)buffer,sizeof(buffer)); + sprintf(buffer, "TRANS_RD %d\n", oid); + if (write(sd, buffer, sizeof(buffer)) < 0) { + perror("Error sending message"); + return NULL; + } + printf("DEBUG -> ready to rcv ...\n"); + /* + while (read(sd, buffer, sizeof(buffer)) != 0) { + ; + } + */ + read(sd, buffer, sizeof(buffer)); + h = (objheader_t *) buffer; + size = sizeof(objheader_t) + sizeof(classsize[h->type]); + printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type); + fflush(stdout); + objcopy = objstrAlloc(record->cache, size); + memcpy(objcopy, (void *)buffer, size); + //Insert into cache's lookup table + chashInsert(record->lookupTable, oid, objcopy); + return objcopy; +}