1 #include <sys/socket.h>
2 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
10 /************************
12 ***********************/
14 extern unsigned int myIpAddr;
/* createUdpSocket: opens an AF_INET / SOCK_DGRAM / IPPROTO_UDP socket and
 * sets SO_BROADCAST on it. A failure of socket() or setsockopt() is
 * reported with perror().
 *
 * NOTE(review): the declarations of sockfd and on, the bodies of the error
 * paths, the return statement and the closing braces are not visible in
 * this listing. Presumably the descriptor is returned on success and a
 * negative value on failure (the caller tests the result with "< 0") --
 * confirm against the full file.
 * NOTE(review): clientaddr is declared but not used in the visible lines. */
16 int createUdpSocket() {
18 struct sockaddr_in clientaddr;
21 if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
22 perror("socket creation failed");
/* SO_BROADCAST must be enabled before a datagram socket may send to a
 * broadcast address. */
25 if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
26 perror("setsockopt - SOL_SOCKET");
/* NOTE(review): the header of the enclosing function is not visible in this
 * listing. The visible statements do the following, in order:
 *   1. store the result of createUdpSocket() in udpSockFd,
 *   2. open a second UDP socket in sockfd,
 *   3. set SO_REUSEADDR on sockfd,
 *   4. bind sockfd to UDP_PORT on INADDR_ANY. */
35 struct sockaddr_in servaddr;
37 //Create Global Udp Socket
38 if((udpSockFd = createUdpSocket()) < 0) {
39 printf("Error in socket\n");
/* NOTE(review): no check of this socket() result is visible here; confirm
 * that the missing lines test sockfd before it is used. */
42 sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
48 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
/* NOTE(review): in the visible text this call is identical to the one
 * above (same option, same arguments). Check whether a different option
 * was intended or whether one of the two calls is redundant. */
54 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
/* Build the local address: UDP_PORT on every local interface, both fields
 * converted to network byte order. */
60 bzero(&servaddr, sizeof(servaddr));
61 servaddr.sin_family = AF_INET;
62 servaddr.sin_port = htons(UDP_PORT);
63 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
65 if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
73 /* Function that listens for UDP broadcast messages.
 *
 * sockfd carries the socket descriptor by value inside the pointer: every
 * use below casts it back with (int)sockfd and never dereferences it.
 * The void* parameter and void* return match the signature of a pthread
 * start routine -- presumably this runs as its own thread; confirm at the
 * call site.
 *
 * Visible behaviour: clear readBuffer, receive one datagram into it, read
 * a short status from the first bytes, call invalidateFromPrefetchCache()
 * on the buffer, and close the socket at the end.
 *
 * NOTE(review): the receive loop, the test on bytesRcvd, the dispatch on
 * status and the return statement are not visible in this listing.
 * NOTE(review): casting a pointer to int assumes the value fits in an int;
 * on platforms where pointers are wider than int, compilers warn about
 * this cast. */
74 void *udpListenBroadcast(void *sockfd) {
/* NOTE(review): thread_udpBroadcast is not used in the visible lines. */
75 pthread_t thread_udpBroadcast;
/* servaddr receives the sender's address from recvfrom() below. */
76 struct sockaddr_in servaddr;
/* NOTE(review): socklen is initialised from sizeof(struct sockaddr) while
 * the address buffer is a struct sockaddr_in; sizeof(servaddr) would state
 * the buffer size directly. */
77 socklen_t socklen = sizeof(struct sockaddr);
78 char readBuffer[MAX_SIZE];
81 printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
/* Zero the buffer before the receive so bytes beyond the received datagram
 * read as zero. */
83 memset(readBuffer, 0, MAX_SIZE);
85 int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
87 printf("DEBUG-> Recv Error! \n");
/* The first sizeof(short) bytes of a message are its control/status code
 * (sendUdpMsg() writes INVALIDATE_OBJS there). */
90 short status = *((short *) &readBuffer[0]);
93 if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
94 printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
100 printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
104 /* Close connection */
105 if(close((int)sockfd) == -1)
110 /* Function that invalidates objects that
111 * have been modified, by broadcasting their oids over UDP;
112 * returns -1 on error and 0 on success.
 *
 * tdata: supplies the count of modified objects (tdata->buffer->f.nummod);
 *        the oids themselves are read by sendUdpMsg().
 *
 * If all oids fit in one datagram a single message is sent; otherwise the
 * list is split into ceil(nummod / maxObjsPerMsg) messages, numbered from 1.
 *
 * NOTE(review): the declarations of retval, i and iteration, the "else"
 * between the two branches, the return statements and the closing braces
 * are not visible in this listing. */
113 int invalidateObj(thread_data_array_t *tdata) {
114 struct sockaddr_in clientaddr;
/* Destination: the limited broadcast address, port UDP_PORT. */
117 bzero(&clientaddr, sizeof(clientaddr));
118 clientaddr.sin_family = AF_INET;
119 clientaddr.sin_port = htons(UDP_PORT);
/* No htonl() here: INADDR_BROADCAST is all one-bits, so its value is the
 * same in host and network byte order. */
120 clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
/* Number of unsigned int oids that fit in MAX_SIZE bytes once
 * 2*sizeof(unsigned int) bytes are reserved. sendUdpMsg() writes a header
 * of short + unsigned int + short before the oids. */
121 int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
122 if(tdata->buffer->f.nummod < maxObjsPerMsg) {
123 /* send single udp msg */
/* NOTE(review): iteration is presumably 0 here, which sendUdpMsg() treats
 * as "single message"; its declaration is not visible. */
125 if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
126 printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
130 /* Split into several udp msgs */
/* Ceiling division: one extra message for a partial final chunk. */
131 int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
132 if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
/* Iterations are 1-based; sendUdpMsg() uses (iteration - 1) as the chunk
 * index. */
134 for(i = 1; i <= maxUdpMsg; i++) {
135 if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
136 printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
144 /* Function sends a udp broadcast, also distinguishes
145 * msg size to be sent based on the iteration flag
146 * returns -1 on error and 0 on success
 *
 * tdata:      source of the oids (tdata->buffer->oidmod) and their count
 *             (tdata->buffer->f.nummod).
 * clientaddr: destination address handed to sendto().
 * iteration:  0 sends every oid in one message; a value k > 0 sends the
 *             k-th chunk of at most maxObjsPerMsg oids.
 *
 * Message layout written into writeBuffer, in order:
 *   short        INVALIDATE_OBJS (control code)
 *   unsigned int myIpAddr        (sender id)
 *   short        number of payload bytes that follow
 *   unsigned int oid, repeated
 * All fields are written in host byte order, with no conversion.
 *
 * NOTE(review): the declarations of offset, i, numObj and n, the "else"
 * between the two numObj assignments, the return statements and the
 * closing braces are not visible in this listing.
 * NOTE(review): the stores below go through short / unsigned int pointers
 * at byte offsets such as sizeof(short), which are not guaranteed to be
 * suitably aligned for unsigned int; memcpy() would avoid the misaligned
 * access. */
147 int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
148 char writeBuffer[MAX_SIZE];
/* Same capacity formula as in invalidateObj(): oids per datagram after
 * reserving 2*sizeof(unsigned int) bytes. */
149 int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
151 *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
152 offset += sizeof(short);
153 *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
154 offset += sizeof(unsigned int);
155 if(iteration == 0) { // iteration flag == zero, send single udp msg
/* NOTE(review): the byte count is narrowed to short; this assumes
 * sizeof(unsigned int) * nummod fits in a short. */
156 *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg
157 offset += sizeof(short);
159 for(i = 0; i < tdata->buffer->f.nummod; i++) {
160 *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i]; //copy objects
161 offset += sizeof(unsigned int);
163 } else { // iteration flag > zero, send multiple udp msg
/* A full chunk if oids remain beyond this iteration; otherwise whatever is
 * left after the previous (iteration - 1) chunks. */
165 if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0)
166 numObj = maxObjsPerMsg;
168 numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
169 *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
170 offset += sizeof(short);
/* First oid of this chunk within oidmod. */
171 int index = (iteration - 1) * maxObjsPerMsg;
173 for(i = 0; i < numObj; i++) {
174 *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
175 offset += sizeof(unsigned int);
/* NOTE(review): the full sizeof(writeBuffer) is sent, not the offset bytes
 * that were filled in. No zeroing of writeBuffer is visible, so the bytes
 * past offset may be indeterminate stack contents going out on the
 * network; confirm, and consider sending offset bytes instead. */
179 if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
180 perror("sendto error- ");
181 printf("DEBUG-> sendto error: errorno %d\n", errno);
187 /* Function searches the oids carried in a received message in the prefetch
188 * cache and invalidates the matching objects;
 * returns -1 on error and 0 on success.
 *
 * buffer: a received message laid out as sendUdpMsg() writes it:
 *         short control code, unsigned int sender id, short payload byte
 *         count, then the oids.
 *
 * NOTE(review): the declarations of i, oid and header, the removal code
 * after a successful prehashSearch(), the return statements and the
 * closing braces are not visible in this listing. */
189 int invalidateFromPrefetchCache(char *buffer) {
/* Skip the leading control code; the caller has already read it. */
190 int offset = sizeof(short);
191 /* Read mid from msg */
192 unsigned int mid = *((unsigned int *)(buffer+offset));
193 offset += sizeof(unsigned int);
194 //Invalidate only if broadcast is from a different machine
195 if(mid != myIpAddr) {
196 /* Read objects sent */
/* The wire field is a byte count; dividing by sizeof(unsigned int) gives
 * the number of oids.
 * NOTE(review): this count comes straight from the network and no bound
 * check against the buffer size is visible here. A negative short would
 * also turn into a very large value in this division, because the divisor
 * is a size_t. Confirm validation happens elsewhere. */
197 int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
198 offset += sizeof(short);
200 for(i = 0; i < numObjsRecv; i++) {
202 oid = *((unsigned int *)(buffer+offset));
204 /* Lookup Objects in prefetch cache and remove them */
205 if(((header = prehashSearch(oid)) != NULL)) {
208 offset += sizeof(unsigned int);