9aea07ed9012204769a4fe5ff0ad87bf15c65158
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "clookup.h"
3 #include "mlookup.h"
4 #include "llookup.h"
5 #include<sys/types.h>
6 #include<sys/socket.h>
7 #include<netdb.h>
8 #include<netinet/in.h>
9
10 #define LISTEN_PORT 2156
11 #define MACHINE_IP "127.0.0.1"
12 #define RECIEVE_BUFFER_SIZE 2048
13
14 extern int classsize[];
15
16 transrecord_t *transStart()
17 {
18         transrecord_t *tmp = malloc(sizeof(transrecord_t));
19         tmp->cache = objstrCreate(1048576);
20         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
21         return tmp;
22 }
23
24 objheader_t *transRead(transrecord_t *record, unsigned int oid)
25 {       
26         unsigned int machinenumber;
27         objheader_t *tmp, *objheader;
28         void *objcopy;
29         int size;
30         void *buf;
31                 //check cache
32         if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
33                 return(objheader);
34         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
35                 //Look up in Machine lookup table and found
36                 printf(" oid not found in cache\n");
37                 tmp = mhashSearch(oid);
38                 size = sizeof(objheader_t)+classsize[tmp->type];
39                 //Copy into cache
40                 objcopy = objstrAlloc(record->cache, size);
41                 memcpy(objcopy, (void *)tmp, size);
42                 //Insert into cache's lookup table
43                 chashInsert(record->lookupTable, objheader->oid, objcopy); 
44                 return(objcopy);
45         } else {
46                 printf(" oid not found in Machine Lookup\n");
47                 machinenumber = lhashSearch(oid);
48                 //Get object from a given machine 
49         /*      if (getRemoteObj(record, machinenumber, oid) != 0) {
50                         printf("Error getRemoteObj");
51                 }
52         */
53                 objcopy = getRemoteObj(record, machinenumber, oid);
54                 return(objcopy);
55         } 
56 }
57
58 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
59 {
60         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
61         tmp->oid = getNewOID();
62         tmp->type = type;
63         tmp->version = 1;
64         tmp->rcount = 0; //? not sure how to handle this yet
65         tmp->status |= NEW;
66         chashInsert(record->lookupTable, tmp->oid, tmp);
67         return tmp;
68 }
69
70 int transCommit(transrecord_t *record){ 
71         //Move objects to machine that hosts it 
72
73 }
74
75 int transAbort(transrecord_t *record){
76
77 }
78
79 //mnun will be used to represent the machine IP address later
80 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
81         int sd, size;
82         struct sockaddr_in serv_addr;
83         struct hostent *server;
84         char buffer[RECIEVE_BUFFER_SIZE];
85         objheader_t *h;
86         void *objcopy;
87
88         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
89                 perror("Error in socket");
90                 return NULL;
91         }
92         bzero((char*) &serv_addr, sizeof(serv_addr));
93         serv_addr.sin_family = AF_INET;
94         serv_addr.sin_port = htons(LISTEN_PORT);
95         serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
96
97         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
98                 perror("Error in connect");
99                 return NULL;
100         }
101         bzero((char *)buffer,sizeof(buffer));
102         sprintf(buffer, "TRANS_RD %d\n", oid);
103         if (write(sd, buffer, sizeof(buffer)) < 0) {
104                 perror("Error sending message");
105                 return NULL;
106         }
107         printf("DEBUG -> ready to rcv ...\n");
108         /*
109         while (read(sd, buffer, sizeof(buffer)) != 0) {
110                 ;
111         }
112         */
113         read(sd, buffer, sizeof(buffer));
114         h = (objheader_t *) buffer;
115         size = sizeof(objheader_t) + sizeof(classsize[h->type]);
116         printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type);
117         fflush(stdout);
118         objcopy = objstrAlloc(record->cache, size);
119         memcpy(objcopy, (void *)buffer, size);
120         //Insert into cache's lookup table
121         chashInsert(record->lookupTable, oid, objcopy); 
122         return objcopy;
123 }