#include "addPrefetchEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
extern int numprefetchsites; // Number of prefetch sites
extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site
* we take action accordingly */
void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
if(numLocal < ntuples) {
- /* prefetch not found locally(miss in cache) */
+ /* prefetch not found locally(miss in cache); turn on prefetching*/
evalPrefetch[siteid].operMode = 1;
evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL;
} else {
+ //Turn off prefetch site
if(getOperationMode(siteid) != 0) {
evalPrefetch[siteid].uselesscount--;
if(evalPrefetch[siteid].uselesscount <= 0) {
newAddr->version += 1;
newAddr->notifylist = NULL;
}
+ STATUS(newAddr)=0;
+
//make an entry in prefetch lookup hashtable
- void *oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- prehashInsert(oid, newAddr);
- } else {
- prehashInsert(oid, newAddr);
- }
+ prehashInsert(oid, newAddr);
} //end of for
return 0;
}
#define _ADDPREFETCHENHANCE_H_
#include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
#include "gCollect.h"
typedef struct prefetchCountStats {
#include <math.h>
#include <netinet/tcp.h>
#include "addUdpEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
#ifdef ABORTREADERS
#include "abortreaders.h"
#endif
objheader_t *header;
/* Lookup Objects in prefetch cache and remove them */
if(((header = prehashSearch(oid)) != NULL)) {
- prehashRemove(oid);
+ //Keep invalid objects
+ STATUS(header)=DIRTY;
+ //prehashRemove(oid);
}
offset += sizeof(unsigned int);
}
#define _ADDUDPENHANCE_H
#include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
/*******************************
}
unsigned int prehashRemove(unsigned int key) {
- int index;
- prehashlistnode_t *prev;
- prehashlistnode_t *ptr, *node;
-
- //eom
- unsigned int keyindex=key>>1;
+ unsigned int keyindex = key >> 1;
volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+ prehashlistnode_t *node, *prev;
while(!write_trylock(lockptr)) {
sched_yield();
}
-
prehashlistnode_t *curr = &pflookup.table[keyindex&pflookup.mask];
- //eom
-
- for (; curr != NULL; curr = curr->next) {
- if (curr->key == key) {
- // Find a match in the hash table
- //decrement the number of elements in the global hashtable
+ // If there are no elements
+ //delete from first bin of table
+ if (curr->next == NULL && curr->key == key) {
+ curr->key = 0;
+ //TODO free(val) ?
+ curr->val = NULL;
+ atomic_dec(&(pflookup.numelements));
+ write_unlock(lockptr);
+ return 0;
+ }
+ //delete from first bin of table but elements follow in linked list
+ if (curr->next != NULL && curr->key == key) {
+ curr->key = curr->next->key;
+ curr->val = curr->next->val;
+ node = curr->next;
+ curr->next = node->next;
+ free(node);
+ atomic_dec(&(pflookup.numelements));
+ write_unlock(lockptr);
+ return 0;
+ }
+ prev = curr;
+ curr = curr->next;
+ //delete from elements in the linked list
+ for(; curr != NULL; curr = curr->next) {
+ if (curr->key == key) {
+ prev->next = curr->next;
+ free(curr);
atomic_dec(&(pflookup.numelements));
-
- if ((curr == &ptr[index]) && (curr->next == NULL)) {
- // Delete the first item inside the hashtable with no linked list of prehashlistnode_t
- curr->key = 0;
- curr->val = NULL;
- } else if ((curr == &ptr[index]) && (curr->next != NULL)) {
- //Delete the first item with a linked list of prehashlistnode_t connected
- curr->key = curr->next->key;
- curr->val = curr->next->val;
- node = curr->next;
- curr->next = curr->next->next;
- free(node);
- } else {
- // Regular delete from linked listed
- prev->next = curr->next;
- free(curr);
- }
- //pthread_mutex_unlock(&pflookup.lock);
- write_unlock(lockptr);
+ write_unlock(lockptr);
return 0;
}
prev = curr;
}
write_unlock(lockptr);
-
return 1;
}
-
+
unsigned int prehashResize(unsigned int newsize) {
prehashlistnode_t *node, *ptr; // curr and next keep track of the current and the next chashlistnodes in a linked list
unsigned int oldsize;
#include <netinet/in.h>
#include <netdb.h>
#include <string.h>
+#include <math.h>
#define PORT 8500
/* REPLACE with your server machine name*/
#define DIRSIZE 64
-#define NUMITER 1024
+#define NUMITER 10000
static __inline__ unsigned long long rdtsc(void)
}
//printf("DEBUG: dir[0]= %lld\n", dir[0]);
array2[i]=rdtsc() - dir[0];
+ printf("%lld\n", array2[i]);
}
for(i=0;i<(NUMITER-1);i++) {
norm += array2[i];
}
+
+
/* spew-out the results */
//printf("DEBUG: Average offset= %lld\n", (norm/(NUMITER-1)));
+ long long average=(norm/(NUMITER-1));
+ printf("average= %lld",(norm/(NUMITER-1)));
+ long long stddev, avg1=0;
+ for(i=0;i<(NUMITER-1);i++) {
+ avg1 += ((array2[i] - average) * (array2[i] - average));
+ }
+ float ans = (avg1/(NUMITER-1));
+ float squareroot= sqrt(ans);
+ float squareroot2= sqrt(avg1);
+
+ printf("stddev= %f\n", squareroot);
+ printf("error= %f\n", squareroot2/(NUMITER-1));
+
fprintf(f1,"%lld",(norm/(NUMITER-1)));
close(sd);
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
+#include <math.h>
#define PORT 8500
-#define NUMITER 1024
+#define NUMITER 10000
#define DIRSIZE 1
static __inline__ unsigned long long rdtsc(void)
//printf("DEBUG: dir[0]= %lld\n", dir[0]);
array2[i] = rdtsc();
//printf("DEBUG: array2[i]= %lld\n", array2[i]);
- array1[i]=array2[i] - dir[0];
+ //array1[i]=array2[i] - dir[0];
+ array1[i]= dir[0] - array2[i];
+ printf("%lld\n", array1[i]);
/* acknowledge the message, reply w/ the file names */
if (send(sd_current, &array2[i], sizeof(unsigned long long), MSG_NOSIGNAL) == -1) {
/* spew-out the results */
//printf("DEBUG: Average offset= %lld\n", (norm/(NUMITER-1)));
- fprintf(f1,"%lld",(norm/(NUMITER-1)));
+ long long average=(norm/(NUMITER-1));
+ printf("average= %lld",(norm/(NUMITER-1)));
+
+ long long stddev, avg1=0;
+ for(i=0;i<(NUMITER-1);i++) {
+ avg1 += ((array1[i] - average) * (array1[i] - average));
+ }
+ float ans = (avg1/(NUMITER-1));
+ float squareroot= sqrt(ans);
+ float squareroot2= sqrt(avg1);
+
+ printf("stddev= %f\n", squareroot);
+ printf("error= %f\n", squareroot2/(NUMITER-1));
+ fprintf(f1,"%lld\n",(norm/(NUMITER-1)));
/* give client a chance to properly shutdown */
--- /dev/null
+#ifndef _DEBUGMACRO_H_
+#define _DEBUGMACRO_H_
+
+/** Macro to print oid and object type **/
+//#define LOGOIDTYPES //turn on printing oid and type events
+#ifdef LOGOIDTYPES
+#define LOGOIDTYPE(x,y,z,t) printf("[%s: %u %u %lld]\n", x, y, z, t);
+#else
+#define LOGOIDTYPE(x,y,z,t)
+#endif
+
+
+/** Macro to print prefetch site id **/
+//#define LOGPREFETCHSITES
+#ifdef LOGPREFETCHSITES
+#define LOGPREFETCHSITE(PTR) printf("[siteid= %u] ", PTR->siteid);
+#else
+#define LOGPREFETCHSITE(PTR)
+#endif
+
+
+/*
+#define LOGEVENTS //turn on Logging events
+#ifdef LOGEVENTS
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+ int tmp=bigindex++; \
+ bigarray[tmp]=x; \
+ }
+#else
+#define LOGEVENT(x)
+#endif
+*/
+
+/**
+ * Record Time after clock synchronization
+ **/
+/*
+#define LOGTIMES
+#ifdef LOGTIMES
+char bigarray1[8*1024*1024];
+unsigned int bigarray2[8*1024*1024];
+unsigned int bigarray3[8*1024*1024];
+long long bigarray4[8*1024*1024];
+int bigindex1=0;
+#define LOGTIME(x,y,z,a) {\
+ int tmp=bigindex1++; \
+ bigarray1[tmp]=x; \
+ bigarray2[tmp]=y; \
+ bigarray3[tmp]=z; \
+ bigarray4[tmp]=a; \
+}
+#else
+#define LOGTIME(x,y,z,a)
+#endif
+*/
+
+#endif
#define UDP_PORT 2158
//Prefetch tuning paramters
//#define RETRYINTERVAL 20 //N (For Em3d, SOR, Moldyn benchmarks)
-//#define SHUTDOWNINTERVAL 3 //M
-#define RETRYINTERVAL 20 //N (For MatrixMultiply, 2DFFT benchmarks)
-#define SHUTDOWNINTERVAL 1 //M
+//#define SHUTDOWNINTERVAL 3 //M
+#define RETRYINTERVAL 100 //N (For MatrixMultiply, 2DFFT, 2DConv benchmarks)
+#define SHUTDOWNINTERVAL 1 //M
#include <stdlib.h>
#include <stdio.h>
void randomdelay();
void transStart();
+//#define TRANSREAD(x,y,z(tobe passed as a parameter to transRead2)) {
#define TRANSREAD(x,y) { \
unsigned int inputvalue;\
if ((inputvalue=(unsigned int)y)==0) x=NULL;\
void prefetch(int, int, unsigned int *, unsigned short *, short*);
void *transPrefetch(void *);
void *mcqProcess(void *);
-prefetchpile_t *foundLocal(char *, int); // returns node with prefetch elements(oids, offsets)
-int lookupObject(unsigned int * oid, short offset);
-int checkoid(unsigned int oid);
+prefetchpile_t *foundLocal(char *, int, int); // returns node with prefetch elements(oids, offsets, siteid)
+int lookupObject(unsigned int * oid, short offset, int *);
+int checkoid(unsigned int oid, int isLastOffset);
int transPrefetchProcess(int **, short);
-void sendPrefetchReq(prefetchpile_t*, int);
+void sendPrefetchReq(prefetchpile_t*, int, int);
void sendPrefetchReqnew(prefetchpile_t*, int);
int getPrefetchResponse(int, struct readstruct *);
unsigned short getObjType(unsigned int oid);
void commitCountForObjRead(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
void commitCountForObjMod(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
+long long myrdtsc(void);
/* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
#include <netinet/tcp.h>
#include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
#include "llookup.h"
#include "threadnotify.h"
#include "prefetch.h"
#endif
#include "gCollect.h"
#include "readstruct.h"
+#include "debugmacro.h"
#define BACKLOG 10 //max pending connections
#define RECEIVE_BUFFER_SIZE 2048
extern int classsize[];
extern int numHostsInSystem;
extern pthread_mutex_t notifymutex;
+extern unsigned long long clockoffset;
+long long startreq, endreq, diff;
+
+//#define LOGTIMES
+#ifdef LOGTIMES
+extern char bigarray1[6*1024*1024];
+extern unsigned int bigarray2[6*1024*1024];
+extern unsigned int bigarray3[6*1024*1024];
+extern long long bigarray4[6*1024*1024];
+extern int bigarray5[6*1024*1024];
+extern int bigindex1;
+#define LOGTIME(x,y,z,a,b) {\
+ int tmp=bigindex1; \
+ bigarray1[tmp]=x; \
+ bigarray2[tmp]=y; \
+ bigarray3[tmp]=z; \
+ bigarray4[tmp]=a; \
+ bigarray5[tmp]=b; \
+ bigindex1++; \
+}
+#else
+#define LOGTIME(x,y,z,a,b)
+#endif
+
+
+long long myrdtsc(void)
+{
+ unsigned hi, lo;
+ __asm__ __volatile__ ("rdtsc" : "=a"(lo), "=d"(hi));
+ return ( (unsigned long long)lo)|( ((unsigned long long)hi)<<32 );
+}
objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
break;
}
#else
+ LOGTIME('X',0,0,myrdtsc(),0);
if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
break;
free(oidlocked);
}
*/
- //control=TRANS_DISAGREE;
+ control=TRANS_DISAGREE;
send_data(acceptfd, &control, sizeof(char));
#ifdef CACHE
send_data(acceptfd, &numBytes, sizeof(int));
*numBytes += size;
/* Send TRANS_DISAGREE to Coordinator */
*control = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
//Keep track of oid locked
oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
size += sizeof(objheader_t);
*numBytes += size;
*control = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
}
}
*numBytes += size;
/* Send TRANS_DISAGREE to Coordinator */
*control = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
//Keep track of oid locked
oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
size += sizeof(objheader_t);
*numBytes += size;
*control = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
}
}
unsigned short version;
/* Process each oid in the machine pile/ group per thread */
- //printf("DEBUG: index= %d, numread= %d, nummod= %d numread+nummod= %d\n", index,numread,nummod,numread+nummod);
for (i = index; i < numread+nummod; i++) {
- //printf("DEBUG: i= %d\n", i);
- //fflush(stdout);
if (i < numread) { //Objs only read and not modified
int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
incr *= i;
* Looks for the objects to be prefetched in the main object store.
* If objects are not found then record those and if objects are found
* then use offset values to prefetch references to other objects */
-
int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
- int i, size, objsize, numoffset = 0;
+ int i, size, objsize, numoffset = 0, gid=0;
int length;
char *recvbuffer, control;
unsigned int oid, mid=-1;
oidmidpair_t oidmid;
struct writestruct writebuffer;
int sd = -1;
+
while(1) {
recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
if(numoffset == -1)
writebuffer.offset=0;
}
short offsetarry[numoffset];
+ recv_data_buf((int)acceptfd, readbuffer, &gid, sizeof(int));
recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
+ LOGTIME('A',oid ,0,myrdtsc(),gid); //after recv the entire prefetch request
/*Process each oid */
if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found in buffer for later use */
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
char sendbuffer[size+1];
sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *) (sendbuffer+sizeof(char))) = size;
*((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
*((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+ *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
send_buf(sd, &writebuffer, sendbuffer, size+1);
+ LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
} else { /* Object Found */
int incr = 1;
GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
char sendbuffer[size+1];
sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *)(sendbuffer + incr)) = size;
incr += sizeof(char);
*((unsigned int *)(sendbuffer+incr)) = oid;
incr += sizeof(unsigned int);
+ *((int *)(sendbuffer+incr)) = gid;
+ incr += sizeof(int);
memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
send_buf(sd, &writebuffer, sendbuffer, size+1);
+ LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
+ LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
/* Calculate the oid corresponding to the offset value */
for(i = 0 ; i< numoffset ; i++) {
if (oid==0)
break;
+ LOGTIME('B',oid,0,myrdtsc(),gid); //send next oid found from prefetch request
+
if((header = mhashSearch(oid)) == NULL) {
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
char sendbuffer[size+1];
sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *) (sendbuffer+1)) = size;
*((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
*((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
+ *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
send_buf(sd, &writebuffer, sendbuffer, size+1);
+ LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
break;
} else { /* Obj Found */
int incr = 1;
GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
char sendbuffer[size+1];
sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *)(sendbuffer + incr)) = size;
incr += sizeof(char);
*((unsigned int *)(sendbuffer+incr)) = oid;
incr += sizeof(unsigned int);
+ *((int *)(sendbuffer+incr)) = gid;
+ incr += sizeof(int);
memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
send_buf(sd, &writebuffer, sendbuffer, size+1);
+ LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
+ LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
}
} //end of for
}
} //end of while
+
//Release socket
if (mid!=-1) {
forcesend_buf(sd, &writebuffer, NULL, 0);
#include "gCollect.h"
-#if 0
#include "altprelookup.h"
-#else
-#inlcude "prelookup.h"
-#endif
extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
return ptr;
}
-#if 0
void clearBlock(objstr_t *block) {
unsigned long int tmpbegin=(unsigned int)block;
unsigned long int tmpend=(unsigned int)block->top;
int i, j;
prehashlistnode_t *ptr;
- //pthread_mutex_lock(&pflookup.lock);
- /*
- for(i=0;i<PRENUMLOCKS;i++) {
- volatile unsigned int * lockptr=&pflookup.larray[i].lock;
-
- while(!write_trylock(lockptr)) {
- sched_yield();
- }
- }
- */
int lockindex=0;
ptr = pflookup.table;
while(!write_trylock(lockptr_new)){
sched_yield();
}
- //printf("grab new lock id=%d for %d\n",lockindex,i);
write_unlock(lockptr_current);
lockptr_current=lockptr_new;
}
}// end of for (pflokup)
write_unlock(lockptr_current);
-
-}
-#else
-void clearBlock(objstr_t *block) {
- unsigned long int tmpbegin=(unsigned int)block;
- unsigned long int tmpend=(unsigned int)block->top;
- int i, j;
- prehashlistnode_t *ptr;
- pthread_mutex_lock(&pflookup.lock);
-
- ptr = pflookup.table;
- for(i = 0; i<pflookup.size; i++) {
- prehashlistnode_t *orig=&ptr[i];
- prehashlistnode_t *curr = orig;
- prehashlistnode_t *next=curr->next;
- for(; next != NULL; curr=next, next = next->next) {
- unsigned int val=(unsigned int)next->val;
- if ((val>=tmpbegin)&(val<tmpend)) {
- prehashlistnode_t *tmp=curr->next=next->next;
- free(next);
- next=curr;
- //loop condition is broken now...need to check before incrementing
- //if (next==NULL)
- // break;
- }
- }
- {
- unsigned int val=(unsigned int)orig->val;
- if ((val>=tmpbegin)&(val<tmpend)) {
- if (orig->next==NULL) {
- orig->key=0;
- orig->val=NULL;
- } else {
- next=orig->next;
- orig->val=next->val;
- orig->key=next->key;
- orig->next=next->next;
- free(next);
- }
- }
- }
- }
- pthread_mutex_unlock(&pflookup.lock);
}
-#endif
objstr_t *allocateNew(unsigned int size) {
objstr_t *tmp;
#include "machinepile.h"
-void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) {
+void insertPile(int mid, unsigned int oid, int siteid, short numoffset, short *offset, prefetchpile_t **head) {
prefetchpile_t *ptr;
objpile_t *objnode;
unsigned int *oidarray;
objnode->offset = offset;
objnode->oid = oid;
objnode->numoffset = numoffset;
+ objnode->siteid = siteid;
objnode->next = NULL;
tmp->objpiles = objnode;
tmp->next = *head;
objnode->offset = offset;
objnode->oid = oid;
objnode->numoffset = numoffset;
+ objnode->siteid = siteid;
objnode->next = *tmp;
*tmp = objnode;
return;
objnode->offset = offset;
objnode->oid = oid;
objnode->numoffset = numoffset;
+ objnode->siteid = siteid;
objnode->next = *tmp;
*tmp = objnode;
return;
#include <stdio.h>
#include <stdlib.h>
-void insertPile(int, unsigned int, short, short *, prefetchpile_t **);
+//add prefetch site as an argument for debugging
+void insertPile(int, unsigned int, int, short, short *, prefetchpile_t **);
#endif
//Structure to make machine groups when prefetching
typedef struct objpile {
unsigned int oid;
+ int siteid;
short numoffset;
short *offset;
struct objpile *next;
#include "prefetch.h"
-#include "prelookup.h"
+#include "altprelookup.h"
#include "sockpool.h"
#include "gCollect.h"
void * oldptr;
if((oldptr = prehashSearch(oid)) != NULL) {
if(((objheader_t *)oldptr)->version < ((objheader_t *)ptr)->version) {
- //prehashRemove(oid);
prehashInsert(oid, ptr);
}
} else {
size-=objsize;
}
- pthread_mutex_lock(&pflookup.lock);
- pthread_cond_broadcast(&pflookup.cond);
- pthread_mutex_unlock(&pflookup.lock);
} else if(control == OBJECT_NOT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
} else {
#include "addPrefetchEnhance.h"
#include <signal.h>
#include <fcntl.h>
+#include <sys/utsname.h>
extern int numTransAbort;
extern int numTransCommit;
extern int nchashSearch;
extern int nmhashSearch;
extern int nprehashSearch;
+extern int ndirtyCacheObj;
extern int nRemoteSend;
extern int nSoftAbort;
extern int bytesSent;
void transStatsHandler(int sig, siginfo_t* info, void *context) {
#ifdef TRANSSTATS
- FILE *fp;
- if ((fp = fopen("/tmp/client_stats.txt", "a+")) == NULL) {
+ char filepath[200], exectime[10];
+ struct utsname buf;
+ FILE *fp, *envfp;
+
+ if ((envfp = fopen("/home/adash/.tmpenvs", "r")) == NULL) {
+ fprintf(stderr, "Error opening file .tmpenvfs");
+ exit(-1);
+ }
+ memset(filepath, 0, 200);
+ fscanf(envfp, "%s\n", filepath);
+ uname(&buf);
+ strncat(filepath + strlen(filepath), buf.nodename, 4);
+ strcat(filepath, (const char *) ".txt");
+
+ memset(exectime, 0, 10);
+ fscanf(envfp, "%s\n", exectime);
+ fclose(envfp);
+
+ if ((fp = fopen(filepath, "a+")) == NULL) {
exit(-1);
}
+
fprintf(fp, "****** Transaction Stats ******\n");
fprintf(fp, "myIpAddr = %x\n", myIpAddr);
fprintf(fp, "numTransAbort = %d\n", numTransAbort);
fprintf(fp, "nchashSearch = %d\n", nchashSearch);
fprintf(fp, "nmhashSearch = %d\n", nmhashSearch);
fprintf(fp, "nprehashSearch = %d\n", nprehashSearch);
+ fprintf(fp, "ndirtyCacheObj = %d\n", ndirtyCacheObj);
fprintf(fp, "nRemoteReadSend = %d\n", nRemoteSend);
fprintf(fp, "nSoftAbort = %d\n", nSoftAbort);
fprintf(fp, "bytesSent = %d\n", bytesSent);
fprintf(fp, "totalObjSize= %d\n", totalObjSize);
fprintf(fp, "sendRemoteReq= %d\n", sendRemoteReq);
fprintf(fp, "getResponse= %d\n", getResponse);
+ fprintf(fp, "executionTime = %s\n", exectime);
fprintf(fp, "**********************************\n");
fflush(fp);
fclose(fp);
printf("nchashSearch = %d\n", nchashSearch);
printf("nmhashSearch = %d\n", nmhashSearch);
printf("nprehashSearch = %d\n", nprehashSearch);
+ printf("ndirtyCacheObj = %d\n", ndirtyCacheObj);
printf("nRemoteReadSend = %d\n", nRemoteSend);
printf("nSoftAbort = %d\n", nSoftAbort);
printf("bytesSent = %d\n", bytesSent);
// Insert at the first position in the hashtable
ptr[index].threadid = tid;
ptr[index].ndata = ndata;
+ nlookup.numelements++;
} else {
tmp = &ptr[index];
while(tmp != NULL) {
if(tmp->threadid == tid) {
isFound = 1;
tmp->ndata = ndata;
+ pthread_mutex_unlock(&nlookup.locktable);
+ return 0;
}
tmp = tmp->next;
}
node->ndata = ndata;
node->next = ptr[index].next;
ptr[index].next = node;
+ nlookup.numelements++;
}
}
pthread_mutex_unlock(&nlookup.locktable);
#include "dstm.h"
+#include "debugmacro.h"
#include "ip.h"
#include "machinepile.h"
-#include "mlookup.h"
+#include "altmlookup.h"
#include "llookup.h"
#include "plookup.h"
-#include "prelookup.h"
+#include "altprelookup.h"
#include "threadnotify.h"
#include "queue.h"
#include "addUdpEnhance.h"
#define NUM_THREADS 1
#define CONFIG_FILENAME "dstm.conf"
+//#define LOGEVENTS //turn on Logging events
+#ifdef LOGEVENTS
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+ int tmp=bigindex++; \
+ bigarray[tmp]=x; \
+ }
+#else
+#define LOGEVENT(x)
+#endif
+
+//#define LOGTIMES
+#ifdef LOGTIMES
+char bigarray1[6*1024*1024];
+unsigned int bigarray2[6*1024*1024];
+unsigned int bigarray3[6*1024*1024];
+long long bigarray4[6*1024*1024];
+int bigarray5[6*1024*1024];
+int bigindex1=0;
+#define LOGTIME(x,y,z,a,b) {\
+ int tmp=bigindex1; \
+ bigarray1[tmp]=x; \
+ bigarray2[tmp]=y; \
+ bigarray3[tmp]=z; \
+ bigarray4[tmp]=a; \
+ bigarray5[tmp]=b; \
+ bigindex1++; \
+}
+#else
+#define LOGTIME(x,y,z,a,b)
+#endif
+
/* Thread transaction variables */
__thread objstr_t *t_cache;
__thread jmp_buf aborttrans;
#endif
+int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */
/* Global Variables */
extern int classsize[];
int nchashSearch = 0;
int nmhashSearch = 0;
int nprehashSearch = 0;
+int ndirtyCacheObj = 0;
int nRemoteSend = 0;
int nSoftAbort = 0;
int bytesSent = 0;
plistnode_t *createPiles();
plistnode_t *sortPiles(plistnode_t *pileptr);
-//#define LOGEVENTS
-#ifdef LOGEVENTS
-char bigarray[16*1024*1024];
-int bigindex=0;
-#define LOGEVENT(x) { \
- int tmp=bigindex++; \
- bigarray[tmp]=x; \
- }
-#else
-#define LOGEVENT(x)
-#endif
+
/*******************************
* Send and Recv function calls
}
//#define INLINEPREFETCH
-#define PREFTHRESHOLD 4
+#define PREFTHRESHOLD 0
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
int numpref=numavailable();
attempted=1;
- if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
+ if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
node=gettail();
- prefetchpile_t *pilehead = foundLocal(node,numpref);
+ prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
if (pilehead!=NULL) {
// Get sock from shared pool
/* Send Prefetch Request */
prefetchpile_t *ptr = pilehead;
while(ptr != NULL) {
- int sd = getSock2(transPrefetchSockPool, ptr->mid);
- sendPrefetchReq(ptr, sd);
- ptr = ptr->next;
+ globalid++;
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd, globalid);
+ ptr = ptr->next;
}
mcdealloc(pilehead);
- resetqueue();
}
+ resetqueue();
}//end do prefetch if condition
} while(node==NULL);
#else
#ifdef ABORTREADERS
if (t_abort) {
//abort this transaction
- //printf("ABORTING\n");
removetransactionhash();
objstrDelete(t_cache);
t_chashDelete();
} else {
#ifdef CACHE
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+ ndirtyCacheObj++;
+#endif
+ goto remoteread;
+ }
#ifdef TRANSSTATS
nprehashSearch++;
#endif
return objcopy;
#endif
}
+remoteread:
#endif
/* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(headerObj, objcopy, size+sizeof(objheader_t));
//make an entry in prefetch lookup hashtable
- void *oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- prehashInsert(oid, headerObj);
- } else {
- prehashInsert(oid, headerObj);
- }
+ prehashInsert(oid, headerObj);
LOGEVENT('B');
#endif
return &objcopy[1];
/* This function finds the location of the objects involved in a transaction
* and returns the pointer to the object if found in a remote location */
__attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
+//DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) {
unsigned int machinenumber;
objheader_t *tmp, *objheader;
objheader_t *objcopy;
#ifdef ABORTREADERS
if (t_abort) {
//abort this transaction
- //printf("ABORTING\n");
removetransactionhash();
objstrDelete(t_cache);
t_chashDelete();
} else {
#ifdef CACHE
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+ ndirtyCacheObj++;
+#endif
+ goto remoteread;
+ }
#ifdef TRANSSTATS
LOGEVENT('P')
nprehashSearch++;
size+=sizeof(objheader_t);
objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
memcpy(objcopy, tmp, size);
+ LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc());
/* Insert into cache's lookup table */
t_chashInsert(OID(tmp), objcopy);
#ifdef COMPILER
return objcopy;
#endif
}
+remoteread:
#endif
/* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
return NULL;
}
objcopy = getRemoteObj(machinenumber, oid);
-
- if(objcopy == NULL) {
- printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
- return NULL;
- } else {
#ifdef TRANSSTATS
-
LOGEVENT('R');
nRemoteSend++;
#endif
+
+ if(objcopy == NULL) {
+ printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__);
+ return NULL;
+ } else {
#ifdef COMPILER
#ifdef CACHE
+ LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc());
//Copy object to prefetch cache
pthread_mutex_lock(&prefetchcache_mutex);
objheader_t *headerObj;
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(headerObj, objcopy, size+sizeof(objheader_t));
//make an entry in prefetch lookup hashtable
- void *oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- prehashInsert(oid, headerObj);
- } else {
- prehashInsert(oid, headerObj);
- }
+ prehashInsert(oid, headerObj);
LOGEVENT('B');
#endif
return &objcopy[1];
}
#endif
+#ifdef LOGTIMES
+ int jjj;
+ for(jjj=0; jjj<bigindex1; jjj++) {
+ printf("[%c %u %u %lld %d]\n", bigarray1[jjj], bigarray2[jjj], bigarray3[jjj], bigarray4[jjj], bigarray5[jjj]);
+ }
+#endif
+
#ifdef ABORTREADERS
if (t_abort) {
//abort this transaction
- /* Debug
- * printf("ABORTING TRANSACTION AT COMMIT\n");
- */
removetransactionhash();
objstrDelete(t_cache);
t_chashDelete();
/* Create a socket and getReplyCtrl array, initialize */
int socklist[pilecount];
+ char getReplyCtrl[pilecount];
int loopcount;
- for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+ for(loopcount = 0 ; loopcount < pilecount; loopcount++){
socklist[loopcount] = 0;
- char getReplyCtrl[pilecount];
- for(loopcount = 0 ; loopcount < pilecount; loopcount++)
getReplyCtrl[loopcount] = 0;
+ }
/* Process each machine pile */
int sockindex = 0;
GETSIZE(size, header);
size += sizeof(objheader_t);
//make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
- prehashRemove(oidToPrefetch);
- prehashInsert(oidToPrefetch, header);
- } else {
- prehashInsert(oidToPrefetch, header);
- }
+ prehashInsert(oidToPrefetch, header);
LOGEVENT('E');
length = length - size;
offset += size;
} while (treplyretry);
if(finalResponse == TRANS_ABORT) {
- //printf("Aborting trans\n");
#ifdef TRANSSTATS
LOGEVENT('A');
numTransAbort++;
control = getReplyCtrl[i];
switch(control) {
default:
- printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+ printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
/* treat as disagree, pass thru */
case TRANS_DISAGREE:
return TRANS_ABORT;
#ifdef CACHE
/* clear objects from prefetch cache */
- cleanPCache();
+ //cleanPCache();
#endif
} else if(transagree == pilecount) {
/* Send Commit */
//Keep track of what is locked
oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //A lock is acquired some place else
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
*getReplyCtrl = TRANS_DISAGREE;
//Keep track of what is locked
oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //Has reached max number of readers or some other transaction
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
return 0;
}
-prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
+prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
int i;
int j;
prefetchpile_t * head=NULL;
unsigned int oid=oidarray[i];
int newbase;
int machinenum;
-
- if (oid==0)
+ int countInvalidObj=0;
+
+ if (oid==0) {
+ numLocal++;
continue;
+ }
//Look up fields locally
+ int isLastOffset=0;
+ if(endindex==0)
+ isLastOffset=1;
for(newbase=baseindex; newbase<endindex; newbase++) {
- if (!lookupObject(&oid, arryfields[newbase]))
+ if(newbase==(endindex-1))
+ isLastOffset=1;
+ if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
break;
+ }
//Ended in a null pointer...
- if (oid==0)
+ if (oid==0) {
+ numLocal++;
goto tuple;
+ }
}
+
//Entire prefetch is local
- if (newbase==endindex&&checkoid(oid)) {
+ if (newbase==endindex&&checkoid(oid,isLastOffset)) {
numLocal++;
goto tuple;
}
+
//Add to remote requests
machinenum=lhashSearch(oid);
- insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+ insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
tuple:
;
}
return head;
}
-int checkoid(unsigned int oid) {
+int checkoid(unsigned int oid, int isLastOffset) {
objheader_t *header;
if ((header=mhashSearch(oid))!=NULL) {
//Found on machine
return 1;
} else if ((header=prehashSearch(oid))!=NULL) {
+ if((STATUS(header) & DIRTY) && isLastOffset) {
+ return 0;
+ }
//Found in cache
return 1;
} else {
}
}
-int lookupObject(unsigned int * oid, short offset) {
+int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
objheader_t *header;
if ((header=mhashSearch(*oid))!=NULL) {
//Found on machine
;
} else if ((header=prehashSearch(*oid))!=NULL) {
//Found in cache
- ;
+ if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
+ //only once because later old entries may still cause unnecessary roundtrips during prefetching
+ (*countInvalidObj)+=1;
+ if(*countInvalidObj > 1) {
+ return 0;
+ }
+ }
} else {
return 0;
}
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
int count=numavailable();
- prefetchpile_t *pilehead = foundLocal(node, count);
+ prefetchpile_t *pilehead = foundLocal(node, count, 0);
if (pilehead!=NULL) {
// Get sock from shared pool
/* Send Prefetch Request */
prefetchpile_t *ptr = pilehead;
while(ptr != NULL) {
- int sd = getSock2(transPrefetchSockPool, ptr->mid);
- sendPrefetchReq(ptr, sd);
- ptr = ptr->next;
+ globalid++;
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd,globalid);
+ ptr = ptr->next;
}
/* Release socket */
return;
}
-void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
+/**
+ * parameters: mcpilenode -> pile node to traverse to assemble pref requests
+ * sd -> socket id
+ * gid -> global identifier for each prefetch request sent, starts with 0
+ **/
+void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) {
int len, endpair;
char control;
objpile_t *tmp;
struct writestruct writebuffer;
writebuffer.offset=0;
+
/* Send TRANS_PREFETCH control message */
int first=1;
/* Send Oids and offsets in pairs */
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
- len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
char oidnoffset[len+5];
char *buf=oidnoffset;
if (first) {
*((int*)buf) = tmp->numoffset;
buf+=sizeof(int);
*((unsigned int *)buf) = tmp->oid;
+ LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc());
#ifdef TRANSSTATS
sendRemoteReq++;
#endif
buf+=sizeof(unsigned int);
*((unsigned int *)buf) = myIpAddr;
- buf += sizeof(unsigned int);
+ buf+= sizeof(unsigned int);
+ *((int*)buf) = gid;
+ buf+=sizeof(int);
memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
tmp = tmp->next;
if (tmp==NULL) {
len+=sizeof(int);
}
if (tmp!=NULL)
- send_buf(sd, & writebuffer, oidnoffset, len);
+ send_buf(sd, &writebuffer, oidnoffset, len);
else
- forcesend_buf(sd, & writebuffer, oidnoffset, len);
+ forcesend_buf(sd, &writebuffer, oidnoffset, len);
}
-
+ LOGOIDTYPE("SREQ",0,0,myrdtsc());
LOGEVENT('S');
+ LOGTIME('S',0,0,myrdtsc(),gid); //after sending
return;
}
int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
- int length = 0, size = 0;
+ int gid,length = 0, size = 0;
char control;
unsigned int oid;
void *modptr, *oldptr;
size = length - sizeof(int);
char recvbuffer[size];
#ifdef TRANSSTATS
- getResponse++;
- LOGEVENT('Z');
+ getResponse++;
+ LOGEVENT('Z');
+ LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv
#endif
- recv_data_buf(sd, readbuffer, recvbuffer, size);
+ recv_data_buf(sd, readbuffer, recvbuffer, size);
control = *((char *) recvbuffer);
if(control == OBJECT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
- size = size - (sizeof(char) + sizeof(unsigned int));
+ gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
+ LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv
+ size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int));
pthread_mutex_lock(&prefetchcache_mutex);
if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
return -1;
}
pthread_mutex_unlock(&prefetchcache_mutex);
- memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+ memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size);
STATUS(modptr)=0;
+
/* Insert the oid and its address into the prefetch hash lookup table */
/* Do a version comparison if the oid exists */
if((oldptr = prehashSearch(oid)) != NULL) {
/* If older version then update with new object ptr */
if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
- prehashRemove(oid);
- prehashInsert(oid, modptr);
+ prehashInsert(oid, modptr);
}
} else { /* Else add the object ptr to hash table*/
prehashInsert(oid, modptr);
}
- /* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
- /* Broadcast signal on prefetch cache condition variable */
- pthread_cond_broadcast(&pflookup.cond);
- /* Unlock the Prefetch Cache look up table*/
- pthread_mutex_unlock(&pflookup.lock);
+ LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc());
+ LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache
} else if(control == OBJECT_NOT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
+ LOGOIDTYPE("NF",oid,0,myrdtsc());
+ LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache
/* TODO: For each object not found query DHT for new location and retrieve the object */
/* Throw an error */
//printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
#define NUMPTRS 100
-#define INITIALHEAPSIZE 5000*1024*1024L
+#define INITIALHEAPSIZE 256*1024*1024L
#define GCPOINT(x) ((INTPTR)((x)*0.99))
/* This define takes in how full the heap is initially and returns a new heap size to use */
#define HEAPSIZE(x,y) ((INTPTR)(x+y))*2
#include "DSTM/interface_recovery/prelookup.h"
#else
#include "DSTM/interface/dstm.h"
-#include "DSTM/interface/prelookup.h"
+#include "DSTM/interface/altprelookup.h"
#include "DSTM/interface/prefetch.h"
#endif
#endif