added dsmdebug file for debugging using macros and setting flags in buildscript
[IRC.git] / Robust / src / Runtime / DSTM / interface / addUdpEnhance.c
1 #include <sys/socket.h>
2 #include <netinet/in.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <math.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
8 #include "prelookup.h"
9
10 /************************
11  * Global Variables *
12  ***********************/
13 int udpSockFd;
14
15 int createUdpSocket() {
16   int sockfd;
17   struct sockaddr_in clientaddr;
18   const int on = 1;
19
20   if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
21     perror("socket creation failed");
22     return -1;
23   }
24   if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
25     perror("setsockopt - SOL_SOCKET");
26     return -1;
27   }
28   return sockfd;
29 }
30
31 int udpInit() {
32   int sockfd;
33   int setsockflag = 1;
34   struct sockaddr_in servaddr;
35
36   //Create Global Udp Socket
37   if((udpSockFd = createUdpSocket()) < 0) {
38     printf("Error in socket\n");
39   }
40
41   sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
42   if(sockfd < 0) {
43     perror("socket");
44     exit(1);
45   }
46
47   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
48     perror("socket");
49     exit(1);
50   }
51
52 #ifdef MAC 
53   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
54     perror("socket");
55     exit(1);
56   }
57 #endif
58
59   bzero(&servaddr, sizeof(servaddr));
60   servaddr.sin_family = AF_INET;
61   servaddr.sin_port = htons(UDP_PORT);
62   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
63
64   if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
65     perror("bind");
66     exit(1);
67   }
68
69   return sockfd;
70 }
71
72 /* Function that listens for udp broadcast messages */
73 void *udpListenBroadcast(void *sockfd) {
74   pthread_t thread_udpBroadcast;
75   struct sockaddr_in servaddr;
76   socklen_t socklen = sizeof(struct sockaddr);
77   char readBuffer[MAX_SIZE];
78   int retval;
79
80   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
81
82   memset(readBuffer, 0, MAX_SIZE);
83   while(1) {
84     int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
85     if(bytesRcvd == -1) {
86       printf("DEBUG-> Recv Error! \n");
87       break;
88     }
89     short status = *((short *) &readBuffer[0]);
90     switch (status) {
91       case INVALIDATE_OBJS:
92         if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
93           printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
94           break;
95         }
96         break;
97       default:
98         printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
99     }
100   }
101
102   /* Close connection */
103   if(close((int)sockfd) == -1)
104     perror("close");
105   pthread_exit(NULL);
106 }
107
108 /* Function that invalidate objects that
109  * have been currently modified
110  * returns -1 on error and 0 on success */
111 int invalidateObj(thread_data_array_t *tdata) {
112   struct sockaddr_in clientaddr;
113   int retval;
114
115   bzero(&clientaddr, sizeof(clientaddr));
116   clientaddr.sin_family = AF_INET;
117   clientaddr.sin_port = htons(UDP_PORT);
118   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
119   int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
120   if(tdata->buffer->f.nummod < maxObjsPerMsg) {
121     /* send single udp msg */
122     int iteration = 0;
123     if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
124       printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
125       return -1;
126     }
127   } else {
128     /* Split into several udp msgs */
129     int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
130     if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
131     int i;
132     for(i = 1; i <= maxUdpMsg; i++) {
133       if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
134         printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
135         return -1;
136       }
137     }
138   }
139   return 0;
140 }
141
142 /* Function sends a udp broadcast, also distinguishes 
143  * msg size to be sent based on the iteration flag
144  * returns -1 on error and 0 on success */
145 int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
146   char writeBuffer[MAX_SIZE];
147   int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
148   int offset = 0;
149   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
150   offset += sizeof(short);
151   if(iteration == 0) { // iteration flag == zero, send single udp msg
152     *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg
153     offset += sizeof(short);
154     int i;
155     for(i = 0; i < tdata->buffer->f.nummod; i++) {
156       *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];  //copy objects
157       offset += sizeof(unsigned int);
158     }
159   } else { // iteration flag > zero, send multiple udp msg
160     int numObj;
161     if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0) 
162       numObj = maxObjsPerMsg;
163     else  
164       numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
165     *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
166     offset += sizeof(short);
167     int index = (iteration - 1) * maxObjsPerMsg;
168     int i;
169     for(i = 0; i < numObj; i++) {
170       *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
171       offset += sizeof(unsigned int);
172     }
173   }
174   int n;
175   if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
176     perror("sendto error- ");
177     printf("DEBUG-> sendto error: errorno %d\n", errno);
178     return -1;
179   }
180   return 0;
181
182
183 /* Function searches given oid in prefetch cache and invalidates obj from cache 
184  * returns -1 on error and 0 on success */
185 int invalidateFromPrefetchCache(char *buffer) {
186   int offset = sizeof(short);
187   /* Read objects sent */
188   int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
189   int i;
190   for(i = 0; i < numObjsRecv; i++) {
191     unsigned int oid;
192     oid = *((unsigned int *)(buffer+offset));
193     objheader_t *header;
194     /* Lookup Objects in prefetch cache and remove them */
195     if((header = prehashSearch(oid)) != NULL) {
196       prehashRemove(oid);
197     }
198     offset += sizeof(unsigned int);
199   }
200   return 0;
201 }