#include "prelookup.h"
#include "threadnotify.h"
#include "queue.h"
+#include "addUdpEnhance.h"
#ifdef COMPILER
#include "thread.h"
#endif
/* This function starts up the transaction runtime. */
int dstmStartup(const char * option) {
- pthread_t thread_Listen;
+ pthread_t thread_Listen, udp_thread_Listen;
pthread_attr_t attr;
int master=option!=NULL && strcmp(option, "master")==0;
int fd;
+ int udpfd;
if (processConfigFile() != 0)
return 0; //TODO: return error value, cause main program to exit
transInit();
fd=startlistening();
+ udpfd = udpInit();
+ pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
if (master) {
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- midtoIP(tdata->mid,machineip);
- machineip[15] = '\0';
- serv_addr.sin_addr.s_addr = inet_addr(machineip);
+ serv_addr.sin_addr.s_addr = htonl(tdata->mid);
+
/* Open Connection */
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
perror("Error in connect for TRANS_REQUEST\n");
close(sd);
pthread_exit(NULL);
GETSIZE(size, header);
size += sizeof(objheader_t);
//make an entry in prefetch hash table
+ void *oldptr;
+ if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
+ prehashRemove(oidToPrefetch);
+ prehashInsert(oidToPrefetch, header);
+ } else {
prehashInsert(oidToPrefetch, header);
+ }
length = length - size;
offset += size;
}
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
return;
}
+ /* Invalidate objects in other machine cache */
+ if((retval = invalidateObj(tdata)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
int size;
unsigned int oid;
if(oidType == 'R') {
- oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+ oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
} else {
oid = tdata->buffer->oidmod[i];
}
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(newAddr, header, (size + sizeof(objheader_t)));
//make an entry in prefetch hash table
- prehashInsert(oid, newAddr);
+ void *oldptr;
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ prehashInsert(oid, newAddr);
+ } else {
+ prehashInsert(oid, newAddr);
+ }
}
return 0;
}
+
/* This function sends the final response to remote machines per
* thread in their respective socket id It returns a char that is only
* needed to check the correctness of execution of this function