8280d15f08517e8f2fb88682db4b99ec34783394
[IRC.git] / Robust / src / Runtime / DSTM / interface / addUdpEnhance.c
1 #include <sys/socket.h>
2 #include <netinet/in.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <math.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
8 #include "prelookup.h"
9
10 /************************
11  * Global Variables *
12  ***********************/
13 int udpSockFd;
14 extern unsigned int myIpAddr;
15
16 int createUdpSocket() {
17   int sockfd;
18   struct sockaddr_in clientaddr;
19   const int on = 1;
20
21   if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
22     perror("socket creation failed");
23     return -1;
24   }
25   if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
26     perror("setsockopt - SOL_SOCKET");
27     return -1;
28   }
29   return sockfd;
30 }
31
32 int udpInit() {
33   int sockfd;
34   int setsockflag = 1;
35   struct sockaddr_in servaddr;
36
37   //Create Global Udp Socket
38   if((udpSockFd = createUdpSocket()) < 0) {
39     printf("Error in socket\n");
40   }
41
42   sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
43   if(sockfd < 0) {
44     perror("socket");
45     exit(1);
46   }
47
48   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
49     perror("socket");
50     exit(1);
51   }
52
53 #ifdef MAC
54   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
55     perror("socket");
56     exit(1);
57   }
58 #endif
59
60   bzero(&servaddr, sizeof(servaddr));
61   servaddr.sin_family = AF_INET;
62   servaddr.sin_port = htons(UDP_PORT);
63   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
64
65   if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
66     perror("bind");
67     exit(1);
68   }
69
70   return sockfd;
71 }
72
73 /* Function that listens for udp broadcast messages */
74 void *udpListenBroadcast(void *sockfd) {
75   pthread_t thread_udpBroadcast;
76   struct sockaddr_in servaddr;
77   socklen_t socklen = sizeof(struct sockaddr);
78   char readBuffer[MAX_SIZE];
79   int retval;
80
81   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
82
83   memset(readBuffer, 0, MAX_SIZE);
84   while(1) {
85     int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
86     if(bytesRcvd == -1) {
87       printf("DEBUG-> Recv Error! \n");
88       break;
89     }
90     short status = *((short *) &readBuffer[0]);
91     switch (status) {
92     case INVALIDATE_OBJS:
93       if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
94         printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
95         break;
96       }
97       break;
98
99     default:
100       printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
101     }
102   }
103
104   /* Close connection */
105   if(close((int)sockfd) == -1)
106     perror("close");
107   pthread_exit(NULL);
108 }
109
110 /* Function that invalidate objects that
111  * have been currently modified
112  * returns -1 on error and 0 on success */
113 int invalidateObj(trans_req_data_t *tdata) {
114   struct sockaddr_in clientaddr;
115   int retval;
116
117   bzero(&clientaddr, sizeof(clientaddr));
118   clientaddr.sin_family = AF_INET;
119   clientaddr.sin_port = htons(UDP_PORT);
120   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
121   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
122   if(tdata->f.nummod < maxObjsPerMsg) {
123     /* send single udp msg */
124     int iteration = 0;
125     if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
126       printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
127       return -1;
128     }
129   } else {
130     /* Split into several udp msgs */
131     int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg;
132     if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++;
133     int i;
134     for(i = 1; i <= maxUdpMsg; i++) {
135       if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
136         printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
137         return -1;
138       }
139     }
140   }
141   return 0;
142 }
143
144 /* Function sends a udp broadcast, also distinguishes
145  * msg size to be sent based on the iteration flag
146  * returns -1 on error and 0 on success */
147 int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
148   char writeBuffer[MAX_SIZE];
149   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
150   int offset = 0;
151   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
152   offset += sizeof(short);
153   *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
154   offset += sizeof(unsigned int);
155   if(iteration == 0) { // iteration flag == zero, send single udp msg
156     *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->f.nummod));  //sizeof msg
157     offset += sizeof(short);
158     int i;
159     for(i = 0; i < tdata->f.nummod; i++) {
160       *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i];  //copy objects
161       offset += sizeof(unsigned int);
162     }
163   } else { // iteration flag > zero, send multiple udp msg
164     int numObj;
165     if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0)
166       numObj = maxObjsPerMsg;
167     else
168       numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg);
169     *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
170     offset += sizeof(short);
171     int index = (iteration - 1) * maxObjsPerMsg;
172     int i;
173     for(i = 0; i < numObj; i++) {
174       *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i];
175       offset += sizeof(unsigned int);
176     }
177   }
178   int n;
179   if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
180     perror("sendto error- ");
181     printf("DEBUG-> sendto error: errorno %d\n", errno);
182     return -1;
183   }
184   return 0;
185 }
186
187 /* Function searches given oid in prefetch cache and invalidates obj from cache
188  * returns -1 on error and 0 on success */
189 int invalidateFromPrefetchCache(char *buffer) {
190   int offset = sizeof(short);
191   /* Read mid from msg */
192   unsigned int mid = *((unsigned int *)(buffer+offset));
193   offset += sizeof(unsigned int);
194   //Invalidate only if broadcast if from different machine
195   if(mid != myIpAddr) {
196     /* Read objects sent */
197     int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
198     offset += sizeof(short);
199     int i;
200     for(i = 0; i < numObjsRecv; i++) {
201       unsigned int oid;
202       oid = *((unsigned int *)(buffer+offset));
203       objheader_t *header;
204       /* Lookup Objects in prefetch cache and remove them */
205       if(((header = prehashSearch(oid)) != NULL)) {
206         prehashRemove(oid);
207       }
208       offset += sizeof(unsigned int);
209     }
210   }
211   return 0;
212 }