start of new file
[IRC.git] / Robust / src / Runtime / DSTM / interface / addUdpEnhance.c
1 #include <sys/socket.h>
2 #include <netinet/in.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <math.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
8 #include "prelookup.h"
9
10 /************************
11  * Global Variables *
12  ***********************/
13 int udpSockFd;
14 extern unsigned int myIpAddr;
15
16 int createUdpSocket() {
17   int sockfd;
18   struct sockaddr_in clientaddr;
19   const int on = 1;
20
21   if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
22     perror("socket creation failed");
23     return -1;
24   }
25   if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
26     perror("setsockopt - SOL_SOCKET");
27     return -1;
28   }
29   return sockfd;
30 }
31
32 int udpInit() {
33   int sockfd;
34   int setsockflag = 1;
35   struct sockaddr_in servaddr;
36
37   //Create Global Udp Socket
38   if((udpSockFd = createUdpSocket()) < 0) {
39     printf("Error in socket\n");
40   }
41
42   sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
43   if(sockfd < 0) {
44     perror("socket");
45     exit(1);
46   }
47
48   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
49     perror("socket");
50     exit(1);
51   }
52
53 #ifdef MAC 
54   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
55     perror("socket");
56     exit(1);
57   }
58 #endif
59
60   bzero(&servaddr, sizeof(servaddr));
61   servaddr.sin_family = AF_INET;
62   servaddr.sin_port = htons(UDP_PORT);
63   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
64
65   if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
66     perror("bind");
67     exit(1);
68   }
69
70   return sockfd;
71 }
72
73 /* Function that listens for udp broadcast messages */
74 void *udpListenBroadcast(void *sockfd) {
75   pthread_t thread_udpBroadcast;
76   struct sockaddr_in servaddr;
77   socklen_t socklen = sizeof(struct sockaddr);
78   char readBuffer[MAX_SIZE];
79   int retval;
80
81   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
82
83   memset(readBuffer, 0, MAX_SIZE);
84   while(1) {
85     int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
86     if(bytesRcvd == -1) {
87       printf("DEBUG-> Recv Error! \n");
88       break;
89     }
90     short status = *((short *) &readBuffer[0]);
91     switch (status) {
92       case INVALIDATE_OBJS:
93         if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
94           printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
95           break;
96         }
97         break;
98       default:
99         printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
100     }
101   }
102
103   /* Close connection */
104   if(close((int)sockfd) == -1)
105     perror("close");
106   pthread_exit(NULL);
107 }
108
109 /* Function that invalidate objects that
110  * have been currently modified
111  * returns -1 on error and 0 on success */
112 int invalidateObj(thread_data_array_t *tdata) {
113   struct sockaddr_in clientaddr;
114   int retval;
115
116   bzero(&clientaddr, sizeof(clientaddr));
117   clientaddr.sin_family = AF_INET;
118   clientaddr.sin_port = htons(UDP_PORT);
119   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
120   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
121   if(tdata->buffer->f.nummod < maxObjsPerMsg) {
122     /* send single udp msg */
123     int iteration = 0;
124     if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
125       printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
126       return -1;
127     }
128   } else {
129     /* Split into several udp msgs */
130     int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
131     if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
132     int i;
133     for(i = 1; i <= maxUdpMsg; i++) {
134       if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
135         printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
136         return -1;
137       }
138     }
139   }
140   return 0;
141 }
142
143 /* Function sends a udp broadcast, also distinguishes 
144  * msg size to be sent based on the iteration flag
145  * returns -1 on error and 0 on success */
146 int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
147   char writeBuffer[MAX_SIZE];
148   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
149   int offset = 0;
150   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
151   offset += sizeof(short);
152   *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
153   offset += sizeof(unsigned int);
154   if(iteration == 0) { // iteration flag == zero, send single udp msg
155     *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg
156     offset += sizeof(short);
157     int i;
158     for(i = 0; i < tdata->buffer->f.nummod; i++) {
159       *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];  //copy objects
160       offset += sizeof(unsigned int);
161     }
162   } else { // iteration flag > zero, send multiple udp msg
163     int numObj;
164     if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0) 
165       numObj = maxObjsPerMsg;
166     else  
167       numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
168     *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
169     offset += sizeof(short);
170     int index = (iteration - 1) * maxObjsPerMsg;
171     int i;
172     for(i = 0; i < numObj; i++) {
173       *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
174       offset += sizeof(unsigned int);
175     }
176   }
177   int n;
178   if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
179     perror("sendto error- ");
180     printf("DEBUG-> sendto error: errorno %d\n", errno);
181     return -1;
182   }
183   return 0;
184
185
186 /* Function searches given oid in prefetch cache and invalidates obj from cache 
187  * returns -1 on error and 0 on success */
188 int invalidateFromPrefetchCache(char *buffer) {
189   int offset = sizeof(short);
190   /* Read mid from msg */
191   unsigned int mid = *((unsigned int *)(buffer+offset));
192   offset += sizeof(unsigned int);
193   //Invalidate only if broadcast if from different machine
194   if(mid != myIpAddr) {
195     /* Read objects sent */
196     int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
197     int i;
198     for(i = 0; i < numObjsRecv; i++) {
199       unsigned int oid;
200       oid = *((unsigned int *)(buffer+offset));
201       objheader_t *header;
202       /* Lookup Objects in prefetch cache and remove them */
203       if((header = prehashSearch(oid)) != NULL) {
204         prehashRemove(oid);
205       }
206       offset += sizeof(unsigned int);
207     }
208   }
209   return 0;
210 }