Udp invalidation of objects
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index d078e0c42b993aa6dbe69b2964fded9779539a11..ea81203ecec6c562f58c4fa785bd3283fd033554 100644 (file)
@@ -8,6 +8,7 @@
 #include "prelookup.h"
 #include "threadnotify.h"
 #include "queue.h"
+#include "addUdpEnhance.h"
 #ifdef COMPILER
 #include "thread.h"
 #endif
@@ -145,10 +146,11 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
 
 /* This function starts up the transaction runtime. */
 int dstmStartup(const char * option) {
-  pthread_t thread_Listen;
+  pthread_t thread_Listen, udp_thread_Listen;
   pthread_attr_t attr;
   int master=option!=NULL && strcmp(option, "master")==0;
   int fd;
+  int udpfd;
 
   if (processConfigFile() != 0)
     return 0; //TODO: return error value, cause main program to exit
@@ -165,6 +167,8 @@ int dstmStartup(const char * option) {
   transInit();
   
   fd=startlistening();
+  udpfd = udpInit();
+  pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
   if (master) {
     pthread_attr_init(&attr);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
@@ -617,11 +621,10 @@ void *transRequest(void *threadarg) {
   bzero((char*) &serv_addr, sizeof(serv_addr));
   serv_addr.sin_family = AF_INET;
   serv_addr.sin_port = htons(LISTEN_PORT);
-  midtoIP(tdata->mid,machineip);
-  machineip[15] = '\0';
-  serv_addr.sin_addr.s_addr = inet_addr(machineip);
+  serv_addr.sin_addr.s_addr = htonl(tdata->mid);
+
   /* Open Connection */
-  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
     perror("Error in connect for TRANS_REQUEST\n");
     close(sd);
     pthread_exit(NULL);
@@ -676,7 +679,13 @@ void *transRequest(void *threadarg) {
       GETSIZE(size, header);
       size += sizeof(objheader_t);
       //make an entry in prefetch hash table
+      void *oldptr;
+      if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
+      prehashRemove(oidToPrefetch);
+      prehashInsert(oidToPrefetch, header);
+      } else {
       prehashInsert(oidToPrefetch, header);
+      }
       length = length - size;
       offset += size;
     }
@@ -780,6 +789,11 @@ void decideResponse(thread_data_array_t *tdata) {
       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
       return;
     }
+    /* Invalidate objects in other machine cache */
+    if((retval = invalidateObj(tdata)) != 0) {
+      printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
@@ -800,7 +814,7 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
     int size;
     unsigned int oid;
     if(oidType == 'R') {
-      oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
+      oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
     } else {
       oid = tdata->buffer->oidmod[i];
     }
@@ -815,11 +829,18 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
     pthread_mutex_unlock(&prefetchcache_mutex);
     memcpy(newAddr, header, (size + sizeof(objheader_t)));
     //make an entry in prefetch hash table
-    prehashInsert(oid, newAddr);
+    void *oldptr;
+    if((oldptr = prehashSearch(oid)) != NULL) {
+      prehashRemove(oid);
+      prehashInsert(oid, newAddr);
+    } else {
+      prehashInsert(oid, newAddr);
+    }
   }
   return 0;
 }
 
+
 /* This function sends the final response to remote machines per
  * thread in their respective socket id It returns a char that is only
  * needed to check the correctness of execution of this function