--- /dev/null
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <string.h>
+#include "addUdpEnhance.h"
+
+/************************
+ * Global Variables *
+ ***********************/
+int udpSockFd;
+
+int createUdpSocket() {
+ int sockfd;
+ struct sockaddr_in clientaddr;
+ const int on = 1;
+
+ if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ perror("socket creation failed");
+ return -1;
+ }
+ if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
+ perror("setsockopt - SOL_SOCKET");
+ return -1;
+ }
+ return sockfd;
+}
+
+int udpInit() {
+ int sockfd;
+ int setsockflag = 1;
+ struct sockaddr_in servaddr;
+
+ //Create Global Udp Socket
+ if((udpSockFd = createUdpSocket()) < 0) {
+ printf("Error in socket\n");
+ }
+
+ sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if(sockfd < 0) {
+ perror("socket");
+ exit(1);
+ }
+
+ if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
+
+#ifdef MAC
+ if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
+ perror("socket");
+ exit(1);
+ }
+#endif
+
+ bzero(&servaddr, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_port = htons(UDP_PORT);
+ servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
+ perror("bind");
+ exit(1);
+ }
+
+ return sockfd;
+}
+
+void *udpListenBroadcast(void *sockfd) {
+ pthread_t thread_udpBroadcast;
+ struct sockaddr_in servaddr;
+ char readBuffer[MAX_SIZE];
+ socklen_t socklen = sizeof(struct sockaddr);
+ int retval;
+
+ memset(readBuffer, 0, MAX_SIZE);
+ printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
+
+ while(1) {
+ //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL);
+ int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
+ if(bytesRcvd == 0) {
+ break;
+ }
+
+ if(bytesRcvd == -1) {
+ printf("DEBUG-> Recv Error! \n");
+ break;
+ }
+
+ short status = *((short *) &readBuffer[0]);
+ switch (status) {
+ case INVALIDATE_OBJS:
+ if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
+ printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
+ break;
+ }
+ break;
+ default:
+ printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
+ }
+ }
+
+closeconnection:
+ /* Close connection */
+ if(close((int)sockfd) == -1)
+ perror("close");
+ pthread_exit(NULL);
+}
+
+/* Function that sends a broadcast to Invalidate objects that
+ * have been currently modified */
+int invalidateObj(thread_data_array_t *tdata) {
+ struct sockaddr_in clientaddr;
+ //TODO Instead of sending "hello" send modified objects
+ char writeBuffer[MAX_SIZE];
+ //char writeBuffer[] = "hello";
+ const int on = 1;
+
+ bzero(&clientaddr, sizeof(clientaddr));
+ clientaddr.sin_family = AF_INET;
+ clientaddr.sin_port = htons(UDP_PORT);
+ clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
+ /* Create Udp Message */
+ int offset = 0;
+ *((short *)&writeBuffer[0]) = INVALIDATE_OBJS;
+ offset += sizeof(short);
+ *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
+ offset += sizeof(short);
+ int i;
+ for(i = 0; i < tdata->buffer->f.nummod; i++) {
+ if(offset == MAX_SIZE) {
+ if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+ perror("sendto error- ");
+ printf("DEBUG-> sendto error: errorno %d\n", errno);
+ return -1;
+ }
+ offset = 0;
+ }
+ /*
+ if(offset >= MAX_SIZE) {
+ printf("DEBUG-> Large number of objects for one udp message\n");
+ return -1;
+ }
+ */
+
+ *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
+ offset += sizeof(unsigned int);
+ }
+ int n;
+ if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+ perror("sendto error- ");
+ printf("DEBUG-> sendto error: errorno %d\n", errno);
+ return -1;
+ }
+ //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer);
+ return 0;
+}
+
+int invalidateFromPrefetchCache(char *buffer) {
+ int offset = sizeof(int);
+ /* Read objects sent */
+ int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int);
+ int i;
+ for(i = 0; i < numObjs; i++) {
+ unsigned int oid;
+ oid = *((unsigned int *)(buffer+offset));
+ objheader_t *header;
+ /* Lookup Objects in prefetch cache and remove them */
+ if((header = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ }
+ offset += sizeof(unsigned int);
+ }
+ return 0;
+}
}
}
-
sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size) {
if((sockhash = calloc(1, sizeof(sockPoolHashTable_t))) == NULL) {
printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
return sd;
}
-
int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
socknode_t **ptr;
int key = mid%(sockhash->size);
int sd;
Lock(&sockhash->mylock);
- ptr=&sockhash->table[key];
+ ptr=&(sockhash->table[key]);
while(*ptr!=NULL) {
if (mid == (*ptr)->mid) {
int key = mid%(sockhash->size);
int sd;
- ptr=&sockhash->table[key];
+ ptr=&(sockhash->table[key]);
while(*ptr!=NULL) {
if (mid == (*ptr)->mid) {
int key = mid%(sockhash->size);
int sd;
- ptr=&sockhash->table[key];
+ ptr=&(sockhash->table[key]);
while(*ptr!=NULL) {
if (mid == (*ptr)->mid) {
}
}
-
void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
Lock(&sockhash->mylock);
inusenode->next = sockhash->inuse;
#include "prelookup.h"
#include "threadnotify.h"
#include "queue.h"
+#include "addUdpEnhance.h"
#ifdef COMPILER
#include "thread.h"
#endif
/* This function starts up the transaction runtime. */
int dstmStartup(const char * option) {
- pthread_t thread_Listen;
+ pthread_t thread_Listen, udp_thread_Listen;
pthread_attr_t attr;
int master=option!=NULL && strcmp(option, "master")==0;
int fd;
+ int udpfd;
if (processConfigFile() != 0)
return 0; //TODO: return error value, cause main program to exit
transInit();
fd=startlistening();
+ udpfd = udpInit();
+ pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
if (master) {
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- midtoIP(tdata->mid,machineip);
- machineip[15] = '\0';
- serv_addr.sin_addr.s_addr = inet_addr(machineip);
+ serv_addr.sin_addr.s_addr = htonl(tdata->mid);
+
/* Open Connection */
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
perror("Error in connect for TRANS_REQUEST\n");
close(sd);
pthread_exit(NULL);
GETSIZE(size, header);
size += sizeof(objheader_t);
//make an entry in prefetch hash table
+ void *oldptr;
+ if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
+ prehashRemove(oidToPrefetch);
+ prehashInsert(oidToPrefetch, header);
+ } else {
prehashInsert(oidToPrefetch, header);
+ }
length = length - size;
offset += size;
}
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
return;
}
+ /* Invalidate objects in other machine cache */
+ if((retval = invalidateObj(tdata)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
int size;
unsigned int oid;
if(oidType == 'R') {
- oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+ oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
} else {
oid = tdata->buffer->oidmod[i];
}
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(newAddr, header, (size + sizeof(objheader_t)));
//make an entry in prefetch hash table
- prehashInsert(oid, newAddr);
+ void *oldptr;
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ prehashInsert(oid, newAddr);
+ } else {
+ prehashInsert(oid, newAddr);
+ }
}
return 0;
}
+
/* This function sends the final response to remote machines per
* thread in their respective socket id It returns a char that is only
* needed to check the correctness of execution of this function