*** empty log message ***
authoradash <adash>
Mon, 19 Mar 2007 02:12:39 +0000 (02:12 +0000)
committeradash <adash>
Mon, 19 Mar 2007 02:12:39 +0000 (02:12 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/llookup.c
Robust/src/Runtime/DSTM/interface/mlookup.c
Robust/src/Runtime/DSTM/interface/testclient.c
Robust/src/Runtime/DSTM/interface/trans.c

index 64600ff4a17ab14d48a4478b2e99da56f656cfec..eb82144a00fb4db64a3b09277bab31f1d5e0cdf5 100644 (file)
@@ -1,6 +1,24 @@
 #ifndef _DSTM_H_
 #define _DSTM_H_
 
+//Client Messages
+#define READ_REQUEST           1
+#define READ_MULT_REQUEST      2
+#define MOVE_REQUEST           3
+#define MOVE_MULT_REQUEST      4
+#define        TRANS_REQUEST           5
+#define        TRANS_ABORT             6
+#define TRANS_COMMIT           7
+
+//Server Messages
+#define OBJECT_FOUND           8
+#define OBJECT_NOT_FOUND       9
+#define OBJECTS_FOUND          10
+#define OBJECTS_NOT_FOUND      11
+#define TRANS_AGREE            12
+#define TRANS_DISAGREE         13
+#define TRANS_SUCESSFUL                14
+
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
index e154c417d1b1b4fa14b004027da13b04e3de8d07..fca266442d51c00dddbd6ac148cba62c246cb1c2 100644 (file)
@@ -10,7 +10,7 @@
 
 #define LISTEN_PORT 2156
 #define BACKLOG 10 //max pending connections
-#define RECIEVE_BUFFER_SIZE 2048
+#define RECEIVE_BUFFER_SIZE 2048
 
 extern int classsize[];
 
@@ -79,50 +79,62 @@ void *dstmListen()
 void *dstmAccept(void *acceptfd)
 {
        int numbytes,i,choice, oid;
-       char buffer[RECIEVE_BUFFER_SIZE];
-       char opcode[10];
+       char buffer[RECEIVE_BUFFER_SIZE], control;
        void *srcObj;
        objheader_t *h;
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
        printf("Recieved connection: fd = %d\n", (int)acceptfd);
-       do
+       numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0);
+       if (numbytes == -1)
        {
-               numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0);
-               buffer[numbytes] = '\0';
-               if (numbytes == -1)
-               {
-                       perror("recv");
-                       pthread_exit(NULL);
-               }
-               else
-               {
-                       sscanf(buffer, "%s %d\n", opcode, &oid);
-                       
-                       if (strcmp(opcode, "TRANS_RD") == 0) {
-                               printf("DEBUG -> Requesting:  %s %d\n", opcode, oid);
+               perror("recv");
+               pthread_exit(NULL);
+       }
+       else
+       {
+               control = buffer[0];
+               switch(control) {
+                       case READ_REQUEST:
+                               oid = *((int *)(buffer+1));
+#ifdef DEBUG1
+                               printf("DEBUG -> Received oid is %d\n", oid);
+#endif
                                srcObj = mhashSearch(oid);
                                h = (objheader_t *) srcObj;
+                               if (h == NULL) {
+                                       buffer[0] = OBJECT_NOT_FOUND;
+                               } else {
+                                       buffer[0] = OBJECT_FOUND;
+                                       size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+                                       memcpy(buffer+1, srcObj, size);
+                               }
+#ifdef DEBUG1
                                printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
-                               size = sizeof(objheader_t) + sizeof(classsize[h->type]);
-                               if(send((int)acceptfd, srcObj, size, 0) < 0) {
+#endif
+
+                               if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
                                        perror("");
                                }
-                               //printf("DEBUG -> sent ...%d\n", write(acceptfd, srcObj, size));
-                       }
-                       else if (strcmp(opcode, "TRANS_COMMIT") == 0)
-                               printf(" This is a Commit\n");
-                       else if (strcmp(opcode,"TRANS_ABORT") == 0)
-                               printf(" This is a Abort\n");
-                       else 
-                               printf(" This is a Broadcastt\n");
-                                       
-                       //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
-                       //printf("%s", buffer);
+                               break;
+                       case READ_MULT_REQUEST:
+                               break;
+                       case MOVE_REQUEST:
+                               break;
+                       case MOVE_MULT_REQUEST:
+                               break;
+                       case TRANS_REQUEST:
+                               break;
+                       case TRANS_ABORT:
+                               break;
+                       case TRANS_COMMIT:
+                               break;
+                       default:
+                               printf("Error receiving");
                }
-               
-       //} while (numbytes != 0);
-       } while (0);
+               //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
+               //printf("%s", buffer);
+       }
        if (close((int)acceptfd) == -1)
        {
                perror("close");
index 02469a149b890c09a5a5b616290c132f82d367c2..cc989a289707f29d0c3d7cd0206b0388acff467e 100644 (file)
@@ -64,6 +64,7 @@ unsigned int lhashInsert(unsigned int oid, unsigned int mid) {
        } else {                        // Insert in the linked list
                if ((node = calloc(1, sizeof(lhashlistnode_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&llookup.locktable);
                        return 1;
                }
                node->oid = oid;
@@ -87,6 +88,7 @@ unsigned int lhashSearch(unsigned int oid) {
        pthread_mutex_lock(&llookup.locktable);
        while(node != NULL) {
                if(node->oid == oid) {
+                       pthread_mutex_unlock(&llookup.locktable);
                        return node->mid;
                }
                node = node->next;
@@ -122,6 +124,7 @@ unsigned int lhashRemove(unsigned int oid) {
                                prev->next = curr->next;
                                free(curr);
                        }
+                       pthread_mutex_unlock(&llookup.locktable);
                        return 0;
                }       
                prev = curr; 
index 44317e2aecb707affe0c859d73fd3d73cbbf5863..bcee6650bf4f0eb39ca2d376c1e5a9dc803f4f6e 100644 (file)
@@ -54,6 +54,7 @@ unsigned int mhashInsert(unsigned int key, void *val) {
        } else {                        // Insert in the beginning of linked list
                if ((node = calloc(1, sizeof(mhashlistnode_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&mlookup.locktable);
                        return 1;
                }
                node->key = key;
@@ -76,6 +77,7 @@ void *mhashSearch(unsigned int key) {
        pthread_mutex_lock(&mlookup.locktable);
        while(node != NULL) {
                if(node->key == key) {
+                       pthread_mutex_unlock(&mlookup.locktable);
                        return node->val;
                }
                node = node->next;
@@ -111,6 +113,7 @@ unsigned int mhashRemove(unsigned int key) {
                                prev->next = curr->next;
                                free(curr);
                        }
+                       pthread_mutex_unlock(&mlookup.locktable);
                        return 0;
                }       
                prev = curr; 
index ee384473f356525434546af441736958f3f8ff93..0a1715b941c4a2b14344e3327b7385e607a85d12 100644 (file)
@@ -10,16 +10,19 @@ int main() {
 
        dstmInit();
        record = transStart();
-       printf("DEBUG -> Init done");
-       h1 = transRead(record, 3);
+       printf("DEBUG -> Init done\n");
+       h1 = transRead(record, 1);
        printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
-       h3 = transRead(record, 1);
+       h3 = transRead(record, 3);
        printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
-       /*
-       h4 = transRead(&record, 4);
+       h4 = transRead(record, 4);
        printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
-       h3 = transRead(&record, 2);
+       h2 = transRead(record, 2);
        printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
-       */
+       h4 = transRead(record, 4);
+       printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+       h3 = transRead(record, 3);
+       printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+       h1 = transRead(record, 5);
 //     getRemoteObj(&record, 0,1);
 }
index 9aea07ed9012204769a4fe5ff0ad87bf15c65158..7934b6a15be14a57aa383b4a9387973d9c268538 100644 (file)
@@ -9,7 +9,7 @@
 
 #define LISTEN_PORT 2156
 #define MACHINE_IP "127.0.0.1"
-#define RECIEVE_BUFFER_SIZE 2048
+#define RECEIVE_BUFFER_SIZE 2048
 
 extern int classsize[];
 
@@ -33,7 +33,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                //Look up in Machine lookup table and found
-               printf(" oid not found in cache\n");
+               printf("oid not found in local cache\n");
                tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[tmp->type];
                //Copy into cache
@@ -43,15 +43,13 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                chashInsert(record->lookupTable, objheader->oid, objcopy); 
                return(objcopy);
        } else {
-               printf(" oid not found in Machine Lookup\n");
+               printf("oid not found in local machine lookup\n");
                machinenumber = lhashSearch(oid);
-               //Get object from a given machine 
-       /*      if (getRemoteObj(record, machinenumber, oid) != 0) {
-                       printf("Error getRemoteObj");
-               }
-       */
                objcopy = getRemoteObj(record, machinenumber, oid);
-               return(objcopy);
+               if(objcopy == NULL)
+                       printf("Object not found in Machine %d\n", machinenumber);
+               else
+                       return(objcopy);
        } 
 }
 
@@ -81,7 +79,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        int sd, size;
        struct sockaddr_in serv_addr;
        struct hostent *server;
-       char buffer[RECIEVE_BUFFER_SIZE];
+       char buffer[RECEIVE_BUFFER_SIZE],control;
        objheader_t *h;
        void *objcopy;
 
@@ -99,24 +97,31 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                return NULL;
        }
        bzero((char *)buffer,sizeof(buffer));
-       sprintf(buffer, "TRANS_RD %d\n", oid);
+       control = READ_REQUEST;
+       buffer[0] = control;
+       memcpy(buffer+1, &oid, sizeof(int));
        if (write(sd, buffer, sizeof(buffer)) < 0) {
                perror("Error sending message");
                return NULL;
        }
+
+#ifdef DEBUG1
        printf("DEBUG -> ready to rcv ...\n");
-       /*
-       while (read(sd, buffer, sizeof(buffer)) != 0) {
-               ;
-       }
-       */
+#endif
        read(sd, buffer, sizeof(buffer));
-       h = (objheader_t *) buffer;
-       size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+       close(sd);
+       if (buffer[0] == OBJECT_NOT_FOUND) {
+               return NULL;
+       } else {
+
+               h = (objheader_t *) buffer+1;
+               size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+#ifdef DEBUG1
        printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type);
-       fflush(stdout);
+#endif
+       }
        objcopy = objstrAlloc(record->cache, size);
-       memcpy(objcopy, (void *)buffer, size);
+       memcpy(objcopy, (void *)buffer+1, size);
        //Insert into cache's lookup table
        chashInsert(record->lookupTable, oid, objcopy); 
        return objcopy;