From b6b0237974fc8c4d7051d1544b617cb4a4810c35 Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 10 Jun 2008 17:17:53 +0000 Subject: [PATCH] Udp invalidation of objects --- .../Runtime/DSTM/interface/addUdpEnhance.c | 176 ++++++++++++++++++ .../Runtime/DSTM/interface/addUdpEnhance.h | 25 +++ Robust/src/Runtime/DSTM/interface/dstm.h | 1 + Robust/src/Runtime/DSTM/interface/sockpool.c | 9 +- Robust/src/Runtime/DSTM/interface/sockpool.h | 1 + Robust/src/Runtime/DSTM/interface/trans.c | 35 +++- 6 files changed, 234 insertions(+), 13 deletions(-) create mode 100644 Robust/src/Runtime/DSTM/interface/addUdpEnhance.c create mode 100644 Robust/src/Runtime/DSTM/interface/addUdpEnhance.h diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c new file mode 100644 index 00000000..8126962c --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c @@ -0,0 +1,176 @@ +#include +#include +#include +#include +#include "addUdpEnhance.h" + +/************************ + * Global Variables * + ***********************/ +int udpSockFd; + +int createUdpSocket() { + int sockfd; + struct sockaddr_in clientaddr; + const int on = 1; + + if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + perror("socket creation failed"); + return -1; + } + if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) { + perror("setsockopt - SOL_SOCKET"); + return -1; + } + return sockfd; +} + +int udpInit() { + int sockfd; + int setsockflag = 1; + struct sockaddr_in servaddr; + + //Create Global Udp Socket + if((udpSockFd = createUdpSocket()) < 0) { + printf("Error in socket\n"); + } + + sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if(sockfd < 0) { + perror("socket"); + exit(1); + } + + if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) { + perror("socket"); + exit(1); + } + +#ifdef MAC + if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) { + perror("socket"); + exit(1); + } +#endif + + bzero(&servaddr, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(UDP_PORT); + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + + if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) { + perror("bind"); + exit(1); + } + + return sockfd; +} + +void *udpListenBroadcast(void *sockfd) { + pthread_t thread_udpBroadcast; + struct sockaddr_in servaddr; + char readBuffer[MAX_SIZE]; + socklen_t socklen = sizeof(struct sockaddr); + int retval; + + memset(readBuffer, 0, MAX_SIZE); + printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd); + + while(1) { + //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL); + int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen); + if(bytesRcvd == 0) { + break; + } + + if(bytesRcvd == -1) { + printf("DEBUG-> Recv Error! \n"); + break; + } + + short status = *((short *) &readBuffer[0]); + switch (status) { + case INVALIDATE_OBJS: + if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) { + printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__); + break; + } + break; + default: + printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__); + } + } + +closeconnection: + /* Close connection */ + if(close((int)sockfd) == -1) + perror("close"); + pthread_exit(NULL); +} + +/* Function that sends a broadcast to Invalidate objects that + * have been currently modified */ +int invalidateObj(thread_data_array_t *tdata) { + struct sockaddr_in clientaddr; + //TODO Instead of sending "hello" send modified objects + char writeBuffer[MAX_SIZE]; + //char writeBuffer[] = "hello"; + const int on = 1; + + bzero(&clientaddr, sizeof(clientaddr)); + clientaddr.sin_family = AF_INET; + clientaddr.sin_port = htons(UDP_PORT); + clientaddr.sin_addr.s_addr = INADDR_BROADCAST; + /* Create Udp Message */ + int offset = 0; + *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; + offset += sizeof(short); + *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); + offset += sizeof(short); + int i; + for(i = 0; i < tdata->buffer->f.nummod; i++) { + if(offset == MAX_SIZE) { + if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) { + perror("sendto error- "); + printf("DEBUG-> sendto error: errorno %d\n", errno); + return -1; + } + offset = 0; + } + /* + if(offset >= MAX_SIZE) { + printf("DEBUG-> Large number of objects for one udp message\n"); + return -1; + } + */ + + *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i]; + offset += sizeof(unsigned int); + } + int n; + if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) { + perror("sendto error- "); + printf("DEBUG-> sendto error: errorno %d\n", errno); + return -1; + } + //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer); + return 0; +} + +int invalidateFromPrefetchCache(char *buffer) { + int offset = sizeof(int); + /* Read objects sent */ + int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int); + int i; + for(i = 0; i < numObjs; i++) { + unsigned int oid; + oid = *((unsigned int *)(buffer+offset)); + objheader_t *header; + /* Lookup Objects in prefetch cache and remove them */ + if((header = prehashSearch(oid)) != NULL) { + prehashRemove(oid); + } + offset += sizeof(unsigned int); + } + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h new file mode 100644 index 00000000..7d3d98c1 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h @@ -0,0 +1,25 @@ +#ifndef _UDP_H +#define _UDP_H + +#include "dstm.h" + + +/******************************* + * Udp Message structures + ******************************/ +#define INVALIDATE_OBJS 101 + +/************************* + * Global constants + ************************/ +#define MAX_SIZE 4000 + +/******************************** + * Function Prototypes + *******************************/ +int createUdpSocket(); +int udpInit(); +void *udpListenBroadcast(void *); +int invalidateObj(thread_data_array_t *); +int invalidateFromPrefetchCache(char *); +#endif diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 2bb46769..abfcae6b 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -53,6 +53,7 @@ //Transaction id per machine #define TID_LEN 20 #define LISTEN_PORT 2156 +#define UDP_PORT 2158 #include diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.c b/Robust/src/Runtime/DSTM/interface/sockpool.c index 98a537ef..fed260bb 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface/sockpool.c @@ -35,7 +35,6 @@ inline void Lock(volatile unsigned int *s) { } } - sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size) { if((sockhash = calloc(1, sizeof(sockPoolHashTable_t))) == NULL) { printf("Calloc error at %s line %d\n", __FILE__, __LINE__); @@ -78,14 +77,13 @@ int createNewSocket(unsigned int mid) { return sd; } - int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) { socknode_t **ptr; int key = mid%(sockhash->size); int sd; Lock(&sockhash->mylock); - ptr=&sockhash->table[key]; + ptr=&(sockhash->table[key]); while(*ptr!=NULL) { if (mid == (*ptr)->mid) { @@ -114,7 +112,7 @@ int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) { int key = mid%(sockhash->size); int sd; - ptr=&sockhash->table[key]; + ptr=&(sockhash->table[key]); while(*ptr!=NULL) { if (mid == (*ptr)->mid) { @@ -142,7 +140,7 @@ int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) { int key = mid%(sockhash->size); int sd; - ptr=&sockhash->table[key]; + ptr=&(sockhash->table[key]); while(*ptr!=NULL) { if (mid == (*ptr)->mid) { @@ -160,7 +158,6 @@ int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) { } } - void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) { Lock(&sockhash->mylock); inusenode->next = sockhash->inuse; diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.h b/Robust/src/Runtime/DSTM/interface/sockpool.h index c85d7da8..be392e47 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.h +++ b/Robust/src/Runtime/DSTM/interface/sockpool.h @@ -2,6 +2,7 @@ #define _SOCKPOOL_H_ #include "dstm.h" +#include "ip.h" int test_and_set(volatile unsigned int *addr); void UnLock(volatile unsigned int *addr); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d078e0c4..ea81203e 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -8,6 +8,7 @@ #include "prelookup.h" #include "threadnotify.h" #include "queue.h" +#include "addUdpEnhance.h" #ifdef COMPILER #include "thread.h" #endif @@ -145,10 +146,11 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short /* This function starts up the transaction runtime. */ int dstmStartup(const char * option) { - pthread_t thread_Listen; + pthread_t thread_Listen, udp_thread_Listen; pthread_attr_t attr; int master=option!=NULL && strcmp(option, "master")==0; int fd; + int udpfd; if (processConfigFile() != 0) return 0; //TODO: return error value, cause main program to exit @@ -165,6 +167,8 @@ int dstmStartup(const char * option) { transInit(); fd=startlistening(); + udpfd = udpInit(); + pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd); if (master) { pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); @@ -617,11 +621,10 @@ void *transRequest(void *threadarg) { bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(LISTEN_PORT); - midtoIP(tdata->mid,machineip); - machineip[15] = '\0'; - serv_addr.sin_addr.s_addr = inet_addr(machineip); + serv_addr.sin_addr.s_addr = htonl(tdata->mid); + /* Open Connection */ - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { perror("Error in connect for TRANS_REQUEST\n"); close(sd); pthread_exit(NULL); @@ -676,7 +679,13 @@ void *transRequest(void *threadarg) { GETSIZE(size, header); size += sizeof(objheader_t); //make an entry in prefetch hash table + void *oldptr; + if((oldptr = prehashSearch(oidToPrefetch)) != NULL) { + prehashRemove(oidToPrefetch); + prehashInsert(oidToPrefetch, header); + } else { prehashInsert(oidToPrefetch, header); + } length = length - size; offset += size; } @@ -780,6 +789,11 @@ void decideResponse(thread_data_array_t *tdata) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); return; } + /* Invalidate objects in other machine cache */ + if((retval = invalidateObj(tdata)) != 0) { + printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); + return; + } } else { /* Send Abort in soft abort case followed by retry commiting transaction again*/ *(tdata->replyctrl) = TRANS_ABORT; @@ -800,7 +814,7 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) { int size; unsigned int oid; if(oidType == 'R') { - oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)); + oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); } else { oid = tdata->buffer->oidmod[i]; } @@ -815,11 +829,18 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) { pthread_mutex_unlock(&prefetchcache_mutex); memcpy(newAddr, header, (size + sizeof(objheader_t))); //make an entry in prefetch hash table - prehashInsert(oid, newAddr); + void *oldptr; + if((oldptr = prehashSearch(oid)) != NULL) { + prehashRemove(oid); + prehashInsert(oid, newAddr); + } else { + prehashInsert(oid, newAddr); + } } return 0; } + /* This function sends the final response to remote machines per * thread in their respective socket id It returns a char that is only * needed to check the correctness of execution of this function -- 2.34.1