Add test code for client, send TRANS_RD request protocol,
authoradash <adash>
Sat, 17 Mar 2007 11:04:08 +0000 (11:04 +0000)
committeradash <adash>
Sat, 17 Mar 2007 11:04:08 +0000 (11:04 +0000)
add function to get remote object
Some bugs still present..need fixing

Robust/src/Runtime/DSTM/interface/dstm.c
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/servertest.c
Robust/src/Runtime/DSTM/interface/testclient.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/trans.c

index 36c4b0637ec86fe89ba2be42e9812f5cef69d586..62f6955f6e5d3a98fb38f8431427909c56ff0f11 100644 (file)
@@ -7,12 +7,12 @@ extern int classsize[];
 // Get a new object id
 unsigned int getNewOID(void) {
        static int id = 1;
-       return ++id;
+       return id++;
 }
 
 // Get the size of the object for a given type
 unsigned int objSize(objheader_t *object) {
-       return classsize[h.type];
+       return classsize[object->type];
 }
 
 /* END object header */
index ad02018f5d133a694d5baa279ab767d53a44537b..64600ff4a17ab14d48a4478b2e99da56f656cfec 100644 (file)
@@ -56,4 +56,6 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //retur
 int transCommit(transrecord_t *record); //return 0 if successful
 /* end transactions */
 
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
+
 #endif
index fcc57a658c046965c38e33ce4e11add6962b4ba2..e154c417d1b1b4fa14b004027da13b04e3de8d07 100644 (file)
@@ -8,28 +8,28 @@
 #include "mlookup.h"
 #include "llookup.h"
 
-#define LISTEN_PORT 2153
+#define LISTEN_PORT 2156
 #define BACKLOG 10 //max pending connections
-#define RECIEVE_BUFFER_SIZE 1500
+#define RECIEVE_BUFFER_SIZE 2048
 
+extern int classsize[];
 
 objstr_t *mainobjstore;
-mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);   
 
 int dstmInit(void)
 {
        //todo:initialize main object store
        //do we want this to be a global variable, or provide
        //separate access funtions and hide the structure?
-
+       mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
        if (mhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
        
        if (lhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
        
-       pthread_t threadListen;
-       pthread_create(&threadListen, NULL, dstmListen, NULL);
+       //pthread_t threadListen;
+       //pthread_create(&threadListen, NULL, dstmListen, NULL);
        
        return 0;
 }
@@ -43,7 +43,7 @@ void *dstmListen()
        pthread_t thread_dstm_accept;
        int i;
 
-       listenfd = socket(PF_INET, SOCK_STREAM, 0);
+       listenfd = socket(AF_INET, SOCK_STREAM, 0);
        if (listenfd == -1)
        {
                perror("socket");
@@ -78,9 +78,13 @@ void *dstmListen()
 
 void *dstmAccept(void *acceptfd)
 {
-       int numbytes;
+       int numbytes,i,choice, oid;
        char buffer[RECIEVE_BUFFER_SIZE];
-       int fd_flags = fcntl((int)acceptfd, F_GETFD);
+       char opcode[10];
+       void *srcObj;
+       objheader_t *h;
+       int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
+
        printf("Recieved connection: fd = %d\n", (int)acceptfd);
        do
        {
@@ -93,19 +97,38 @@ void *dstmAccept(void *acceptfd)
                }
                else
                {
-                       printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
-                       printf("%s", buffer);
+                       sscanf(buffer, "%s %d\n", opcode, &oid);
+                       
+                       if (strcmp(opcode, "TRANS_RD") == 0) {
+                               printf("DEBUG -> Requesting:  %s %d\n", opcode, oid);
+                               srcObj = mhashSearch(oid);
+                               h = (objheader_t *) srcObj;
+                               printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
+                               size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+                               if(send((int)acceptfd, srcObj, size, 0) < 0) {
+                                       perror("");
+                               }
+                               //printf("DEBUG -> sent ...%d\n", write(acceptfd, srcObj, size));
+                       }
+                       else if (strcmp(opcode, "TRANS_COMMIT") == 0)
+                               printf(" This is a Commit\n");
+                       else if (strcmp(opcode,"TRANS_ABORT") == 0)
+                               printf(" This is a Abort\n");
+                       else 
+                               printf(" This is a Broadcastt\n");
+                                       
+                       //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
+                       //printf("%s", buffer);
                }
                
-       } while (numbytes != 0);
+       //} while (numbytes != 0);
+       } while (0);
        if (close((int)acceptfd) == -1)
        {
                perror("close");
-               pthread_exit(NULL);
        }
        else
                printf("Closed connection: fd = %d\n", (int)acceptfd);
        pthread_exit(NULL);
 }
 
-
index 98bd9e2a11e3440e3c1b319f5fa86f50cb93b20a..da4298e5302ba57b5882aeb54d217307a47e2ea6 100644 (file)
@@ -1,11 +1,49 @@
 #include <pthread.h>
-#include "dstmserver.h"
+#include "dstm.h"
+
+extern objstr_t *mainobjstore;
+int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
+
+unsigned int createObjects(transrecord_t *record, unsigned short type) {
+       objheader_t *header, *tmp;
+       unsigned int size;
+       size = sizeof(objheader_t) + classsize[type] ;
+       header = transCreateObj(record, type);
+       tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
+       memcpy(tmp, header, size);
+       mhashInsert(tmp->oid, tmp);
+       lhashInsert(tmp->oid, 1);
+       return 0;
+}
 
 int main()
 {
-       pthread_t thread_listen;
-       pthread_create(&thread_listen, NULL, dstmListen, NULL); 
-       pthread_join(thread_listen, NULL);
+       unsigned int val;
+       transrecord_t *myTrans;
+       pthread_t thread_Listen;
+
+       dstmInit();     
+       pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+       // Start Transaction    
+       myTrans = transStart();
+
+       printf("Creating Transaction\n");
+       //Create Object1
+       if((val = createObjects(myTrans, 0)) != 0) {
+               printf("Error transCreateObj1");
+       }
+       //Create Object2
+       if((val = createObjects(myTrans, 1)) != 0) {
+               printf("Error transCreateObj2");
+       }
+       //Create Object3
+       if((val = createObjects(myTrans, 2)) != 0) {
+               printf("Error transCreateObj3");
+       }
+       //Create Object4
+       if((val = createObjects(myTrans, 3)) != 0) {
+               printf("Error transCreateObj4");
+       }
+       pthread_join(thread_Listen, NULL);
        return 0;
 }
-
diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c
new file mode 100644 (file)
index 0000000..ee38447
--- /dev/null
@@ -0,0 +1,25 @@
+#include<stdio.h>
+#include "dstm.h"
+
+int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};      
+
+int main() {
+
+       transrecord_t *record;
+       objheader_t *h1,*h2,*h3,*h4;
+
+       dstmInit();
+       record = transStart();
+       printf("DEBUG -> Init done");
+       h1 = transRead(record, 3);
+       printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
+       h3 = transRead(record, 1);
+       printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+       /*
+       h4 = transRead(&record, 4);
+       printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+       h3 = transRead(&record, 2);
+       printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
+       */
+//     getRemoteObj(&record, 0,1);
+}
index 90507f5e2554b389b6d21498cac29e49e4f68b7d..9aea07ed9012204769a4fe5ff0ad87bf15c65158 100644 (file)
@@ -2,6 +2,14 @@
 #include "clookup.h"
 #include "mlookup.h"
 #include "llookup.h"
+#include<sys/types.h>
+#include<sys/socket.h>
+#include<netdb.h>
+#include<netinet/in.h>
+
+#define LISTEN_PORT 2156
+#define MACHINE_IP "127.0.0.1"
+#define RECIEVE_BUFFER_SIZE 2048
 
 extern int classsize[];
 
@@ -16,10 +24,10 @@ transrecord_t *transStart()
 objheader_t *transRead(transrecord_t *record, unsigned int oid)
 {      
        unsigned int machinenumber;
-       
        objheader_t *tmp, *objheader;
        void *objcopy;
        int size;
+       void *buf;
                //check cache
        if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
                return(objheader);
@@ -37,15 +45,19 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        } else {
                printf(" oid not found in Machine Lookup\n");
                machinenumber = lhashSearch(oid);
-               //TODO:broadcast
-               return(NULL);
+               //Get object from a given machine 
+       /*      if (getRemoteObj(record, machinenumber, oid) != 0) {
+                       printf("Error getRemoteObj");
+               }
+       */
+               objcopy = getRemoteObj(record, machinenumber, oid);
+               return(objcopy);
        } 
 }
 
-
 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
 {
-       objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, classsize[type]);
+       objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
        tmp->oid = getNewOID();
        tmp->type = type;
        tmp->version = 1;
@@ -63,3 +75,49 @@ int transCommit(transrecord_t *record){
 int transAbort(transrecord_t *record){
 
 }
+
+//mnun will be used to represent the machine IP address later
+void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
+       int sd, size;
+       struct sockaddr_in serv_addr;
+       struct hostent *server;
+       char buffer[RECIEVE_BUFFER_SIZE];
+       objheader_t *h;
+       void *objcopy;
+
+       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+               perror("Error in socket");
+               return NULL;
+       }
+       bzero((char*) &serv_addr, sizeof(serv_addr));
+       serv_addr.sin_family = AF_INET;
+       serv_addr.sin_port = htons(LISTEN_PORT);
+       serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+
+       if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+               perror("Error in connect");
+               return NULL;
+       }
+       bzero((char *)buffer,sizeof(buffer));
+       sprintf(buffer, "TRANS_RD %d\n", oid);
+       if (write(sd, buffer, sizeof(buffer)) < 0) {
+               perror("Error sending message");
+               return NULL;
+       }
+       printf("DEBUG -> ready to rcv ...\n");
+       /*
+       while (read(sd, buffer, sizeof(buffer)) != 0) {
+               ;
+       }
+       */
+       read(sd, buffer, sizeof(buffer));
+       h = (objheader_t *) buffer;
+       size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+       printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type);
+       fflush(stdout);
+       objcopy = objstrAlloc(record->cache, size);
+       memcpy(objcopy, (void *)buffer, size);
+       //Insert into cache's lookup table
+       chashInsert(record->lookupTable, oid, objcopy); 
+       return objcopy;
+}