1 #include <sys/socket.h>
2 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
10 /************************
12 ***********************/
14 extern unsigned int myIpAddr;
/* Creates an IPv4 UDP (SOCK_DGRAM / IPPROTO_UDP) socket and enables the
 * SO_BROADCAST option on it. A failure of socket() or setsockopt() is
 * reported through perror().
 *
 * The caller in this file treats a negative return value as failure.
 * NOTE(review): the declarations of sockfd and on, the error-return
 * statements and the final return are not visible in this excerpt;
 * confirm them against the full file. */
16 int createUdpSocket() {
/* NOTE(review): clientaddr is not referenced in the visible lines. */
18 struct sockaddr_in clientaddr;
/* Open the datagram socket; a negative descriptor means socket() failed. */
21 if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
22 perror("socket creation failed");
/* Turn on SO_BROADCAST at the SOL_SOCKET level, using on as the option value. */
25 if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
26 perror("setsockopt - SOL_SOCKET");
/* NOTE(review): the signature of the enclosing function is outside this
 * excerpt. The visible statements (1) store the result of
 * createUdpSocket() in udpSockFd, the descriptor sendUdpMsg() passes to
 * sendto(), and (2) create a second UDP socket and bind it to UDP_PORT. */
35 struct sockaddr_in servaddr;
37 //Create Global Udp Socket
/* A negative result from createUdpSocket() is only logged in the visible code. */
38 if((udpSockFd = createUdpSocket()) < 0) {
39 printf("Error in socket\n");
/* Second, separate UDP socket: this is the one bound to UDP_PORT below. */
42 sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
/* NOTE(review): the next two calls set SO_REUSEADDR with identical
 * arguments. They may sit in alternative preprocessor branches that are
 * not visible here; otherwise one of them is redundant. Confirm. */
48 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
54 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
/* Local address: IPv4, port UDP_PORT, INADDR_ANY, all in network byte order. */
60 bzero(&servaddr, sizeof(servaddr));
61 servaddr.sin_family = AF_INET;
62 servaddr.sin_port = htons(UDP_PORT);
63 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
/* A negative return from bind() takes the error branch (body not visible). */
65 if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
73 /* Function that listens for udp broadcast messages */
/* Takes and returns a void pointer. The sockfd argument is not
 * dereferenced: it is cast straight to int and used as the socket
 * descriptor for recvfrom() and close().
 *
 * Each received datagram is read into readBuffer, its leading short is
 * taken as the message status, and the buffer is handed to
 * invalidateFromPrefetchCache().
 * NOTE(review): the receive loop, the branch on status and the return
 * statement are not visible in this excerpt. */
74 void *udpListenBroadcast(void *sockfd) {
/* NOTE(review): thread_udpBroadcast is not referenced in the visible lines. */
75 pthread_t thread_udpBroadcast;
76 struct sockaddr_in servaddr;
/* NOTE(review): the length is taken from struct sockaddr while the
 * address buffer passed to recvfrom() is a struct sockaddr_in. */
77 socklen_t socklen = sizeof(struct sockaddr);
78 char readBuffer[MAX_SIZE];
81 printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
/* Clear the buffer before receiving into it. */
83 memset(readBuffer, 0, MAX_SIZE);
/* Receive one datagram of at most sizeof(readBuffer) bytes; servaddr
 * receives the sender's address. */
85 int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
87 printf("DEBUG-> Recv Error! \n");
/* The leading short of the message is the control code; sendUdpMsg()
 * writes INVALIDATE_OBJS at this position. */
90 short status = *((short *) &readBuffer[0]);
/* A non-zero result from invalidateFromPrefetchCache() is logged as an error. */
93 if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
94 printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
/* Presumably the fallback for an unknown status value; the enclosing
 * branch is not visible. */
99 printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
103 /* Close connection */
104 if(close((int)sockfd) == -1)
109 /* Function that invalidates objects that
110 * have been currently modified
111 * returns -1 on error and 0 on success */
/* Sends the object ids in tdata->buffer->oidmod (count
 * tdata->buffer->f.nummod) to UDP_PORT at INADDR_BROADCAST, using one
 * call to sendUdpMsg() per datagram.
 *
 * tdata: supplies the buffer whose f.nummod and oidmod fields are read
 *        here and in sendUdpMsg().
 * NOTE(review): the declarations of retval, iteration and i, the else
 * keyword and the return statements are not visible in this excerpt. */
112 int invalidateObj(thread_data_array_t *tdata) {
113 struct sockaddr_in clientaddr;
/* Destination: IPv4, port UDP_PORT, broadcast address. */
116 bzero(&clientaddr, sizeof(clientaddr));
117 clientaddr.sin_family = AF_INET;
118 clientaddr.sin_port = htons(UDP_PORT);
119 clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
/* Number of unsigned int ids that fit in MAX_SIZE bytes after reserving
 * 2*sizeof(unsigned int) bytes; sendUdpMsg() writes a short, an
 * unsigned int and a short ahead of the ids. */
120 int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
/* Strictly fewer ids than one message can hold: single datagram. */
121 if(tdata->buffer->f.nummod < maxObjsPerMsg) {
122 /* send single udp msg */
/* sendUdpMsg() treats iteration == 0 as the single-message case;
 * presumably iteration is 0 here (its initialisation is not visible). */
124 if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
125 printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
129 /* Split into several udp msgs */
/* Message count = nummod / maxObjsPerMsg, rounded up. */
130 int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
131 if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
/* Chunks are numbered from 1, so a chunk number never collides with the
 * iteration == 0 single-message case in sendUdpMsg(). */
133 for(i = 1; i <= maxUdpMsg; i++) {
134 if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
135 printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
143 /* Function sends a udp broadcast, also distinguishes
144 * msg size to be sent based on the iteration flag
145 * returns -1 on error and 0 on success */
/* Message layout written into writeBuffer, in order:
 *   short         INVALIDATE_OBJS control code
 *   unsigned int  myIpAddr (id of the sending machine)
 *   short         number of payload bytes (sizeof(unsigned int) * ids)
 *   unsigned int  object ids, one after another
 *
 * tdata:      source of the ids (buffer->oidmod) and their count
 *             (buffer->f.nummod).
 * clientaddr: destination address passed to sendto().
 * iteration:  0 sends every id in one message; a positive value selects
 *             the chunk of up to maxObjsPerMsg ids that starts at index
 *             (iteration - 1) * maxObjsPerMsg.
 * NOTE(review): the declarations of offset, i, n and numObj, the else
 * keyword before the remainder case and the return statements are not
 * visible in this excerpt; offset is presumably 0 on entry. */
146 int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
147 char writeBuffer[MAX_SIZE];
/* Same per-message capacity formula as in invalidateObj(). */
148 int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
150 *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
151 offset += sizeof(short);
/* NOTE(review): this stores an unsigned int at an offset of
 * sizeof(short) inside a char array; confirm alignment is acceptable on
 * the target platforms. */
152 *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
153 offset += sizeof(unsigned int);
154 if(iteration == 0) { // iteration flag == zero, send single udp msg
/* NOTE(review): the byte count is narrowed to short; confirm MAX_SIZE
 * keeps it within range. */
155 *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg
156 offset += sizeof(short);
158 for(i = 0; i < tdata->buffer->f.nummod; i++) {
159 *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i]; //copy objects
160 offset += sizeof(unsigned int);
162 } else { // iteration flag > zero, send multiple udp msg
/* If ids remain beyond this chunk, the chunk is full... */
164 if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0)
165 numObj = maxObjsPerMsg;
/* ...otherwise it carries whatever is left after the earlier chunks. */
167 numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
168 *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
169 offset += sizeof(short);
/* Index in oidmod of the first id belonging to this chunk. */
170 int index = (iteration - 1) * maxObjsPerMsg;
172 for(i = 0; i < numObj; i++) {
173 *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
174 offset += sizeof(unsigned int);
/* The whole writeBuffer (sizeof(writeBuffer) bytes) is always sent, not
 * just the offset bytes filled in above. */
178 if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
179 perror("sendto error- ");
180 printf("DEBUG-> sendto error: errorno %d\n", errno);
186 /* Function searches given oid in prefetch cache and invalidates obj from cache
187 * returns -1 on error and 0 on success */
/* buffer: a received message laid out as sendUdpMsg() writes it: a short
 *         control code, an unsigned int sender id, a short payload byte
 *         count, then the unsigned int object ids.
 * NOTE(review): this function continues past the end of this excerpt;
 * the declarations of i, oid and header, the removal code and the
 * return statements are not visible. */
188 int invalidateFromPrefetchCache(char *buffer) {
/* Skip the leading control-code short. */
189 int offset = sizeof(short);
190 /* Read mid from msg */
191 unsigned int mid = *((unsigned int *)(buffer+offset));
192 offset += sizeof(unsigned int);
193 //Invalidate only if broadcast is from a different machine
194 if(mid != myIpAddr) {
195 /* Read objects sent */
/* The short holds a byte count; dividing by sizeof(unsigned int) gives
 * the number of ids.
 * NOTE(review): the count comes straight from the received message and
 * no check against the buffer size is visible here; confirm one exists. */
196 int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
197 offset += sizeof(short);
199 for(i = 0; i < numObjsRecv; i++) {
201 oid = *((unsigned int *)(buffer+offset));
203 /* Lookup Objects in prefetch cache and remove them */
/* A non-NULL result from prehashSearch() enters the branch. */
204 if(((header = prehashSearch(oid)) != NULL)) {
/* Advance to the next id in the message. */
207 offset += sizeof(unsigned int);