--- /dev/null
+public class Signal {
+ public Signal() {
+ }
+ public native void nativeSigAction();
+ public void sigAction() {
+ nativeSigAction();
+ }
+}
outmethod.println("pthread_exit(NULL);");
}
-
+ outmethod.println("printf(\"numTransAbort= %d\\n\", numTransAbort);");
+ outmethod.println("printf(\"numTransCommit= %d\\n\", numTransCommit);");
outmethod.println("}");
+
}
/* This method outputs code for each task. */
* information. */
private void generateSizeArray(PrintWriter outclassdefs) {
+ outclassdefs.print("int numTransAbort = 0;");
+ outclassdefs.print("int numTransCommit = 0;");
outclassdefs.print("int classsize[]={");
Iterator it=state.getClassSymbolTable().getDescriptorsIterator();
cdarray=new ClassDescriptor[state.numClasses()];
readSourceFile(state, ClassLibraryPrefix+"gnu/Random.java");
readSourceFile(state, ClassLibraryPrefix+"Vector.java");
readSourceFile(state, ClassLibraryPrefix+"Enumeration.java");
+ readSourceFile(state, ClassLibraryPrefix+"Signal.java");
if (state.TASK) {
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
+#include <math.h>
+#include <netinet/tcp.h>
#include "addUdpEnhance.h"
/************************
return sockfd;
}
+/* Function that listens for udp broadcast messages */
void *udpListenBroadcast(void *sockfd) {
pthread_t thread_udpBroadcast;
struct sockaddr_in servaddr;
- char readBuffer[MAX_SIZE];
socklen_t socklen = sizeof(struct sockaddr);
+ char readBuffer[MAX_SIZE];
int retval;
- memset(readBuffer, 0, MAX_SIZE);
printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
+ memset(readBuffer, 0, MAX_SIZE);
while(1) {
- //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL);
- int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
- if(bytesRcvd == 0) {
- break;
- }
-
+ int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
if(bytesRcvd == -1) {
printf("DEBUG-> Recv Error! \n");
break;
}
-
short status = *((short *) &readBuffer[0]);
switch (status) {
case INVALIDATE_OBJS:
}
}
-closeconnection:
- /* Close connection */
- if(close((int)sockfd) == -1)
- perror("close");
- pthread_exit(NULL);
+ /* Close connection */
+ if(close((int)sockfd) == -1)
+ perror("close");
+ pthread_exit(NULL);
}
-/* Function that sends a broadcast to Invalidate objects that
- * have been currently modified */
+/* Function that invalidate objects that
+ * have been currently modified
+ * returns -1 on error and 0 on success */
int invalidateObj(thread_data_array_t *tdata) {
struct sockaddr_in clientaddr;
- //TODO Instead of sending "hello" send modified objects
- char writeBuffer[MAX_SIZE];
- //char writeBuffer[] = "hello";
- const int on = 1;
+ int retval;
bzero(&clientaddr, sizeof(clientaddr));
clientaddr.sin_family = AF_INET;
clientaddr.sin_port = htons(UDP_PORT);
clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
- /* Create Udp Message */
- int offset = 0;
- *((short *)&writeBuffer[0]) = INVALIDATE_OBJS;
- offset += sizeof(short);
- *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
- offset += sizeof(short);
- int i;
- for(i = 0; i < tdata->buffer->f.nummod; i++) {
- if(offset == MAX_SIZE) {
- if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
- perror("sendto error- ");
- printf("DEBUG-> sendto error: errorno %d\n", errno);
+ int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+ if(tdata->buffer->f.nummod < maxObjsPerMsg) {
+ /* send single udp msg */
+ int iteration = 0;
+ if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
+ printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ } else {
+ /* Split into several udp msgs */
+ int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
+ if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
+ int i;
+ for(i = 1; i <= maxUdpMsg; i++) {
+ if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
+ printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
return -1;
}
- offset = 0;
- }
- /*
- if(offset >= MAX_SIZE) {
- printf("DEBUG-> Large number of objects for one udp message\n");
- return -1;
}
- */
+ }
+ return 0;
+}
- *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
- offset += sizeof(unsigned int);
+/* Function sends a udp broadcast, also distinguishes
+ * msg size to be sent based on the iteration flag
+ * returns -1 on error and 0 on success */
+int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
+ char writeBuffer[MAX_SIZE];
+ int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+ int offset = 0;
+ *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
+ offset += sizeof(short);
+ if(iteration == 0) { // iteration flag == zero, send single udp msg
+ *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
+ offset += sizeof(short);
+ int i;
+ for(i = 0; i < tdata->buffer->f.nummod; i++) {
+ *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
+ offset += sizeof(unsigned int);
+ }
+ } else { // iteration flag > zero, send multiple udp msg
+ int numObj;
+ if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0)
+ numObj = maxObjsPerMsg;
+ else
+ numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
+ *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
+ offset += sizeof(short);
+ int index = (iteration - 1) * maxObjsPerMsg;
+ int i;
+ for(i = 0; i < numObj; i++) {
+ *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
+ offset += sizeof(unsigned int);
+ }
}
int n;
- if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+ if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
perror("sendto error- ");
printf("DEBUG-> sendto error: errorno %d\n", errno);
return -1;
}
- //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer);
return 0;
-}
+}
+/* Function searches given oid in prefetch cache and invalidates obj from cache
+ * returns -1 on error and 0 on success */
int invalidateFromPrefetchCache(char *buffer) {
- int offset = sizeof(int);
+ int offset = sizeof(short);
/* Read objects sent */
- int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int);
+ int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
int i;
- for(i = 0; i < numObjs; i++) {
+ for(i = 0; i < numObjsRecv; i++) {
unsigned int oid;
oid = *((unsigned int *)(buffer+offset));
objheader_t *header;
/*************************
* Global constants
************************/
-#define MAX_SIZE 4000
+#define MAX_SIZE 2000
/********************************
* Function Prototypes
void *udpListenBroadcast(void *);
int invalidateObj(thread_data_array_t *);
int invalidateFromPrefetchCache(char *);
+int sendUdpMsg(thread_data_array_t *, struct sockaddr_in *, int);
#endif
/* Intitialize primary queue */
headoffset=0;
tailoffset=0;
- memory=malloc(QSIZE);
+ memory=malloc(QSIZE+sizeof(int));//leave space for -1
pthread_mutexattr_init(&qlockattr);
pthread_mutexattr_settype(&qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&qlock, &qlockattr);
//Wait for tail to go past new start
while(tailoffset<tmpoffset)
;
- *((int *)(memory+headoffset))=-1;
+ *((int *)(memory+headoffset))=-1;//safe because we left space
*((int*)memory)=size+sizeof(int);
return memory+sizeof(int);
} else {
while(headoffset<tailoffset&&tailoffset<tmpoffset)
;
- *((int*)(memory+headoffset))=size+sizeof(int);
+ *((int*)(memory+headoffset))=size+sizeof(int);
return memory+headoffset+sizeof(int);
}
}
tailoffset=tmpoffset;
}
-
void predealloc() {
free(memory);
}
pthread_mutex_t notifymutex;
pthread_mutex_t atomicObjLock;
+/***********************************
+ * Global Variables for statistics
+ **********************************/
+extern int numTransCommit;
+extern int numTransAbort;
+
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
if (!master)
threadcount--;
#endif
+
+#ifdef TRANSSTATS
+ printf("Trans stats is on\n");
+ fflush(stdout);
+#endif
//Initialize socket pool
transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
thread_data_array_t *thread_data_array;
local_thread_data_array_t *ltdata;
int firsttime=1;
-
+
do {
treplyctrl=0;
trecvcount = 0;
} while (treplyretry);
if(treplyctrl == TRANS_ABORT) {
+#ifdef TRANSSTATS
+ ++numTransAbort;
+#endif
/* Free Resources */
objstrDelete(record->cache);
chashDelete(record->lookupTable);
free(ltdata);
return TRANS_ABORT;
} else if(treplyctrl == TRANS_COMMIT) {
+#ifdef TRANSSTATS
+ ++numTransCommit;
+#endif
/* Free Resources */
objstrDelete(record->cache);
chashDelete(record->lookupTable);
//make an entry in prefetch hash table
void *oldptr;
if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
- prehashRemove(oidToPrefetch);
- prehashInsert(oidToPrefetch, header);
+ prehashRemove(oidToPrefetch);
+ prehashInsert(oidToPrefetch, header);
} else {
- prehashInsert(oidToPrefetch, header);
+ prehashInsert(oidToPrefetch, header);
}
length = length - size;
offset += size;
return;
}
/* Invalidate objects in other machine cache */
- if((retval = invalidateObj(tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
+ if(tdata->buffer->f.nummod > 0) {
+ if((retval = invalidateObj(tdata)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
}
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
} else {
oid = tdata->buffer->oidmod[i];
}
+ pthread_mutex_lock(&prefetchcache_mutex);
header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
//copy object into prefetch cache
GETSIZE(size, header);
- pthread_mutex_lock(&prefetchcache_mutex);
if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
return -1;
buf+=sizeof(unsigned int);
*((unsigned int *)buf) = myIpAddr;
buf += sizeof(unsigned int);
- memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
+ memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
send_data(sd, oidnoffset, len);
tmp = tmp->next;
}
#include "mem.h"
#include<fcntl.h>
#include<errno.h>
-#include<signal.h>
#include<stdio.h>
#include "option.h"
#ifdef DSTM
--- /dev/null
+#include "runtime.h"
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+extern int numTransAbort;
+extern int numTransCommit;
+
+
+void transStatsHandler(int sig, siginfo_t* info, void *context) {
+#ifdef TRANSSTATS
+ printf("numTransCommit = %d\n", numTransCommit);
+ printf("numTransAbort = %d\n", numTransAbort);
+ exit(0);
+#endif
+}
+
+#ifdef TRANSSTATS
+void CALL00(___Signal______nativeSigAction____) {
+ struct sigaction siga;
+ siga.sa_handler = NULL;
+ siga.sa_flags = SA_SIGINFO;
+ siga.sa_sigaction = &transStatsHandler;
+ sigemptyset(&siga.sa_mask);
+ sigaction(SIGUSR1, &siga, 0);
+}
+#endif
echo -optional enable optional
echo -debug generate debug symbols
echo -prefetch do prefetch analysis
+echo -transstats generates transaction stats on commits and aborts
echo -webinterface enable web interface
echo -runtimedebug printout runtime debug messages
echo "-thread use support for multiple threads"
CHECKFLAG=false
RECOVERFLAG=false
MULTICOREFLAG=false
+TRANSSTATSFLAG=false
RAWFLAG=false
THREADSIMULATEFLAG=false;
USEDMALLOC=false
elif [[ $1 = '-prefetch' ]]
then
JAVAOPTS="$JAVAOPTS -prefetch"
+elif [[ $1 = '-transstats' ]]
+then
+TRANSSTATSFLAG=true
elif [[ $1 = '-printflat' ]]
then
JAVAOPTS="$JAVAOPTS -printflat"
$ROBUSTROOT/Runtime/ObjectHash.c \
$ROBUSTROOT/Runtime/garbage.c $ROBUSTROOT/Runtime/socket.c \
$ROBUSTROOT/Runtime/math.c \
+$ROBUSTROOT/Runtime/signal.c \
$ROBUSTROOT/Runtime/GenericHashtable.c $ROBUSTROOT/Runtime/object.c"
if $DSMFLAG
then
EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME"
+if $TRANSSTATSFLAG
+then
+EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DTRANSSTATS -DCOMPILER -DDSTM -I$DSMRUNTIME"
+fi
FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c"
fi