480b93f438bc30f29bc8e9e043374590969e2bf4
[IRC.git] / Robust / src / Runtime / DSTM / interface / addUdpEnhance.c
1 #include <sys/socket.h>
2 #include <netinet/in.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <math.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
8 #include "altprelookup.h"
9 #ifdef ABORTREADERS
10 #include "abortreaders.h"
11 #endif
12
13 /************************
14  * Global Variables *
15  ***********************/
16 int udpSockFd;
17 extern unsigned int myIpAddr;
18
19 int createUdpSocket() {
20   int sockfd;
21   struct sockaddr_in clientaddr;
22   const int on = 1;
23
24   if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
25     perror("socket creation failed");
26     return -1;
27   }
28   if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
29     perror("setsockopt - SOL_SOCKET");
30     return -1;
31   }
32   return sockfd;
33 }
34
35 int udpInit() {
36   int sockfd;
37   int setsockflag = 1;
38   struct sockaddr_in servaddr;
39
40   //Create Global Udp Socket
41   if((udpSockFd = createUdpSocket()) < 0) {
42     printf("Error in socket\n");
43   }
44
45   sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
46   if(sockfd < 0) {
47     perror("socket");
48     exit(1);
49   }
50
51   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
52     perror("socket");
53     exit(1);
54   }
55
56 #ifdef MAC
57   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
58     perror("socket");
59     exit(1);
60   }
61 #endif
62
63   bzero(&servaddr, sizeof(servaddr));
64   servaddr.sin_family = AF_INET;
65   servaddr.sin_port = htons(UDP_PORT);
66   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
67
68   if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
69     perror("bind");
70     exit(1);
71   }
72
73   return sockfd;
74 }
75
76 /* Function that listens for udp broadcast messages */
77 void *udpListenBroadcast(void *sockfd) {
78   pthread_t thread_udpBroadcast;
79   struct sockaddr_in servaddr;
80   socklen_t socklen = sizeof(struct sockaddr);
81   char readBuffer[MAX_SIZE];
82   int retval;
83
84   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
85
86   while(1) {
87     int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
88     if(bytesRcvd == -1) {
89       printf("DEBUG-> Recv Error! \n");
90       break;
91     }
92     short status = *((short *) &readBuffer[0]);
93     switch (status) {
94     case INVALIDATE_OBJS:
95       if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
96         printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
97         break;
98       }
99       break;
100
101     default:
102       printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
103     }
104   }
105
106   /* Close connection */
107   if(close((int)sockfd) == -1)
108     perror("close");
109   pthread_exit(NULL);
110 }
111
112 /* Function that invalidate objects that
113  * have been currently modified
114  * returns -1 on error and 0 on success */
115 int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) {
116   struct timeval start, end;
117   struct sockaddr_in clientaddr;
118   int retval;
119   int i;
120   int nummod=0;
121   for(i=0; i<pilecount; i++) {
122     nummod+=tdata[i].f.nummod;
123   }
124   bzero(&clientaddr, sizeof(clientaddr));
125   clientaddr.sin_family = AF_INET;
126   clientaddr.sin_port = htons(UDP_PORT);
127   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
128   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
129   /* send single udp msg */
130   if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
131     printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
132     return -1;
133   }
134   return 0;
135 }
136
137 /* Function sends a udp broadcast, also distinguishes
138  * msg size to be sent based on the total number of objects modified
139  * returns -1 on error and 0 on success */
140 int sendUdpMsg(trans_req_data_t *tdata, int pilecount, int nummod, struct sockaddr_in *clientaddr, char finalresponse, int *socklist) {
141   char writeBuffer[MAX_SIZE];
142   int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
143   int offset = 0;
144   int i=0,j=0;
145
146   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
147   offset += sizeof(short);
148   *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
149   offset += sizeof(unsigned int);
150
151   while(nummod>0) {
152     int numtosend=nummod>maxObjsPerMsg ? maxObjsPerMsg : nummod;
153     int localoffset=offset;
154     int sentmsgs=0;
155     *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend);
156     localoffset += sizeof(short);
157
158     for(; j < pilecount; j++) {
159       for(; i < tdata[j].f.nummod; i++) {
160         *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i];  //copy objects
161         localoffset += sizeof(unsigned int);
162         if ((++sentmsgs)==numtosend) {
163           i++;
164           goto send;
165         }
166       }
167       i=0;
168     }
169 send:
170     if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) {
171       perror("sendto error- ");
172       printf("DEBUG-> sendto error: errorno %d\n", errno);
173       return -1;
174     }
175     nummod= nummod - numtosend;
176   }
177   return 0;
178 }
179
180 /* Function searches given oid in prefetch cache and invalidates obj from cache
181  * returns -1 on error and 0 on success */
182 int invalidateFromPrefetchCache(char *buffer) {
183   int offset = sizeof(short);
184   /* Read mid from msg */
185   unsigned int mid = *((unsigned int *)(buffer+offset));
186   offset += sizeof(unsigned int);
187   //Invalidate only if broadcast if from different machine
188   if(mid != myIpAddr) {
189     /* Read objects sent */
190     int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
191     offset += sizeof(short);
192     int i;
193 #ifdef ABORTREADERS
194     removetransaction((unsigned int *)(buffer+offset), numObjsRecv);
195 #endif
196     for(i = 0; i < numObjsRecv; i++) {
197       unsigned int oid;
198       oid = *((unsigned int *)(buffer+offset));
199       objheader_t *header;
200       /* Lookup Objects in prefetch cache and remove them */
201       if(((header = prehashSearch(oid)) != NULL)) {
202         //Keep invalid objects
203         STATUS(header)=DIRTY;
204         //prehashRemove(oid);
205       }
206       offset += sizeof(unsigned int);
207     }
208   }
209   return 0;
210 }
211