/* Function that invalidate objects that
* have been currently modified
* returns -1 on error and 0 on success */
+int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) {
+ struct timeval start, end;
+ struct sockaddr_in clientaddr;
+ int retval;
+ int i;
+ int nummod=0;
+ for(i=0;i<pilecount;i++) {
+ nummod+=tdata[i].f.nummod;
+ }
+ bzero(&clientaddr, sizeof(clientaddr));
+ clientaddr.sin_family = AF_INET;
+ clientaddr.sin_port = htons(UDP_PORT);
+ clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
+ int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
+ /* send single udp msg */
+ if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
+ printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+#if 0
int invalidateObj(trans_req_data_t *tdata) {
struct sockaddr_in clientaddr;
int retval;
clientaddr.sin_port = htons(UDP_PORT);
clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
- if(tdata->f.nummod < maxObjsPerMsg) {
- /* send single udp msg */
- int iteration = 0;
- if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
- printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
- }
- } else {
- /* Split into several udp msgs */
- int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg;
- if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++;
- int i;
- for(i = 1; i <= maxUdpMsg; i++) {
- if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
- printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
- return -1;
+ /* send single udp msg */
+ if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
+ printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+#endif
+
+/* Function sends a udp broadcast, also distinguishes
+ * msg size to be sent based on the total number of objects modified
+ * returns -1 on error and 0 on success */
+int sendUdpMsg(trans_req_data_t *tdata, int pilecount, int nummod, struct sockaddr_in *clientaddr, char finalresponse, int *socklist) {
+ char writeBuffer[MAX_SIZE];
+ int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
+ int offset = 0;
+ int i=0,j=0;
+
+ *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
+ offset += sizeof(short);
+ *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
+ offset += sizeof(unsigned int);
+
+ while(nummod>0) {
+ int numtosend=nummod>maxObjsPerMsg?maxObjsPerMsg:nummod;
+ int localoffset=offset;
+ int sentmsgs=0;
+ *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend);
+ localoffset += sizeof(short);
+
+ for(; j < pilecount; j++) {
+ for(; i < tdata[j].f.nummod; i++) {
+ *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i]; //copy objects
+ localoffset += sizeof(unsigned int);
+ if ((++sentmsgs)==numtosend) {
+ i++;
+ goto send;
+ }
}
+ i=0;
}
+send:
+ if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) {
+ perror("sendto error- ");
+ printf("DEBUG-> sendto error: errorno %d\n", errno);
+ return -1;
+ }
+ nummod= nummod - numtosend;
}
return 0;
}
+#if 0
+
/* Function sends a udp broadcast, also distinguishes
* msg size to be sent based on the iteration flag
* returns -1 on error and 0 on success */
}
return 0;
}
+#endif
/* Function searches given oid in prefetch cache and invalidates obj from cache
* returns -1 on error and 0 on success */
objheader_t *header;
/* Lookup Objects in prefetch cache and remove them */
if(((header = prehashSearch(oid)) != NULL)) {
- prehashRemove(oid);
+ //Keep invalid objects
+ STATUS(header)=DIRTY;
+ //prehashRemove(oid);
}
offset += sizeof(unsigned int);
}
#include "dstm.h"
#include "ip.h"
#include "machinepile.h"
-#include "mlookup.h"
+#include "altmlookup.h"
#include "llookup.h"
#include "plookup.h"
#include "prelookup.h"
return ip;
}
#endif
+
+#define INLINEPREFETCH
+#define PREFTHRESHOLD 0
+
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
/* Allocate for the queue node*/
int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
int len;
- char * node= getmemory(qnodesize);
+#ifdef INLINEPREFETCH
+ int attempted=0;
+ char *node;
+ do {
+ node=getmemory(qnodesize);
+ if (node==NULL&&attempted)
+ break;
+ if (node!=NULL) {
+#else
+ char *node=getmemory(qnodesize);
+#endif
int top=endoffsets[ntuples-1];
- if (node==NULL)
+ if (node==NULL) {
+ LOGEVENT('D');
return;
+ }
/* Set queue node values */
/* TODO: Remove this after testing */
memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
+#ifdef INLINEPREFETCH
+ movehead(qnodesize);
+ }
+ int numpref=numavailable();
+ attempted=1;
+
+ if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
+ node=gettail();
+ prefetchpile_t *pilehead = foundLocal(node,numpref);
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+
+ mcdealloc(pilehead);
+ }
+ resetqueue();
+ }//end do prefetch if condition
+ } while(node==NULL);
+#else
/* Lock and insert into primary prefetch queue */
movehead(qnodesize);
+#endif
}
/* This function starts up the transaction runtime. */
retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
} while(retval!=0);
#else
+#ifndef INLINEPREFETCH
do {
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
#endif
+#endif
+#ifndef INLINEPREFETCH
pthread_detach(tPrefetch);
#endif
+#endif
}
/* This function stops the threads spawned */
#ifdef DEBUG
printf("%s-> Final Response: %d\n", __func__, (int)finalResponse);
#endif
-
+
+#ifdef CACHE
+ if (finalResponse == TRANS_COMMIT) {
+ /* Invalidate objects in other machine cache */
+ int retval;
+ if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+ }
+#endif
+
/* Send responses to all machines */
for(i = 0; i < pilecount; i++) {
int sd = socklist[i];
return 1;
}
+#if 0
/* Invalidate objects in other machine cache */
if(tosend[i].f.nummod > 0) {
if((retval = invalidateObj(&(tosend[i]))) != 0) {
return 1;
}
}
+#endif
#ifdef ABORTREADERS
removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
return;
}
} else if(finalResponse == TRANS_COMMIT) {
-#ifdef CACHE
- /* Invalidate objects in other machine cache */
- if(tdata->f.nummod > 0) {
- int retval;
- if((retval = invalidateObj(tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
-#endif
if(transComProcess(tdata, transinfo) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
fflush(stdout);
return 0;
}
+prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
+ int i;
+ int j;
+ prefetchpile_t * head=NULL;
+
+ for(j=0;j<numprefetches;j++) {
+ int siteid = *(GET_SITEID(ptr));
+ int ntuples = *(GET_NTUPLES(ptr));
+ unsigned int * oidarray = GET_PTR_OID(ptr);
+ unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ int numLocal = 0;
+
+ for(i=0; i<ntuples; i++) {
+ unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
+ unsigned short endindex=endoffsets[i];
+ unsigned int oid=oidarray[i];
+ int newbase;
+ int machinenum;
+ int countInvalidObj=0;
+
+ if (oid==0) {
+ numLocal++;
+ continue;
+ }
+ //Look up fields locally
+ int isLastOffset=0;
+ if(endindex==0)
+ isLastOffset=1;
+ for(newbase=baseindex; newbase<endindex; newbase++) {
+ if(newbase==(endindex-1))
+ isLastOffset=1;
+ if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
+ break;
+ }
+ //Ended in a null pointer...
+ if (oid==0) {
+ numLocal++;
+ goto tuple;
+ }
+ }
+
+ //Entire prefetch is local
+ if (newbase==endindex&&checkoid(oid,isLastOffset)) {
+ numLocal++;
+ goto tuple;
+ }
+
+ //Add to remote requests
+ machinenum=lhashSearch(oid);
+ insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+ tuple:
+ ;
+ }
+
+ /* handle dynamic prefetching */
+ handleDynPrefetching(numLocal, ntuples, siteid);
+ ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
+ }
+
+ return head;
+}
+
+/*
prefetchpile_t *foundLocal(char *ptr) {
int siteid = *(GET_SITEID(ptr));
int ntuples = *(GET_NTUPLES(ptr));
;
}
- /* handle dynamic prefetching */
+ // handle dynamic prefetching
handleDynPrefetching(numLocal, ntuples, siteid);
return head;
}
+*/
+
+int checkoid(unsigned int oid, int isLastOffset) {
+ objheader_t *header;
+ if ((header=mhashSearch(oid))!=NULL) {
+ //Found on machine
+ return 1;
+ } else if ((header=prehashSearch(oid))!=NULL) {
+ //if the last offset then prefetch object
+ if((STATUS(header) & DIRTY) && isLastOffset) {
+ return 0;
+ }
+ //Found in cache
+ return 1;
+ } else {
+ return 0;
+ }
+}
+#if 0
int checkoid(unsigned int oid) {
objheader_t *header;
if ((header=mhashSearch(oid))!=NULL) {
return 0;
}
}
+#endif
+
+int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
+ objheader_t *header;
+ if ((header=mhashSearch(*oid))!=NULL) {
+ //Found on machine
+ ;
+ } else if ((header=prehashSearch(*oid))!=NULL) {
+ //Found in cache
+ if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
+ //only once because later old entries may still cause unnecessary roundtrips during prefetching
+ (*countInvalidObj)+=1;
+ if(*countInvalidObj > 1) {
+ return 0;
+ }
+ }
+ } else {
+ return 0;
+ }
+ if(TYPE(header) >= NUMCLASSES) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ int length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offset < 0 || offset >= length) {
+ //if yes treat the object as found
+ (*oid)=0;
+ return 1;
+ }
+ (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
+ return 1;
+ } else {
+ (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
+ return 1;
+ }
+}
+
+#if 0
int lookupObject(unsigned int * oid, short offset) {
objheader_t *header;
if ((header=mhashSearch(*oid))!=NULL) {
return 1;
}
}
+#endif
+
+/* This function is called by the thread calling transPrefetch */
+void *transPrefetch(void *t) {
+ while(1) {
+ /* read from prefetch queue */
+ void *node=gettail();
+ /* Check if the tuples are found locally, if yes then reduce them further*/
+ /* and group requests by remote machine ids by calling the makePreGroups() */
+ int count=numavailable();
+ prefetchpile_t *pilehead = foundLocal(node, count);
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+
+ /* Release socket */
+ // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+
+ /* Deallocated pilehead */
+ mcdealloc(pilehead);
+ }
+ // Deallocate the prefetch queue pile node
+ incmulttail(count);
+ }
+}
/* This function is called by the thread calling transPrefetch */
+#if 0
void *transPrefetch(void *t) {
while(1) {
/* read from prefetch queue */
inctail();
}
}
+#endif
void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
objpile_t *tmp;
numRecovery++;
long long st;
long long fi;
- unsigned int dupeSize = 0; // to calculate the size of backed up data
+ unsigned int dupeSize; // to calculate the size of backed up data
+ unsigned int recvDataSize = 0; // to calculate the size of recv data
st = myrdtsc(); // to get clock
recoverStat[numRecovery-1].deadMachine = mid;
if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
printf("%s -> Socket create error\n",__func__);
exit(0);
- }
/* request for original */
char duperequest;
send_data(bsd, &originalMid, sizeof(unsigned int));
char p_response,b_response;
- unsigned int p_receivedSize,b_receivedSize;
+ unsigned int p_receivedSize,b_receivedSize;
recv_data(psd, &p_response, sizeof(char));
recv_data(psd, &p_receivedSize, sizeof(unsigned int));
dupeSize += p_receivedSize; // size of primary data
+ recvDataSize += p_receivedSize; // size of primary data
recv_data(bsd, &b_response, sizeof(char));
recv_data(bsd, &b_receivedSize, sizeof(unsigned int));
dupeSize += b_receivedSize; // size of backup data
+ recvDataSize += b_receivedSize; // size of backup data
if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE)
{
fi = myrdtsc();
recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ;
recoverStat[numRecovery-1].recoveredData = dupeSize;
+ recoverStat[numRecovery-1].recvData = recvDataSize;
printRecoveryStat();
#endif
for(i=0; i < numRecovery;i++) {
printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
printf("Recoveryed data(byte) = %u\n",recoverStat[i].recoveredData);
- printf("Recovery Time(us) = %ld\n",recoverStat[i].elapsedTime);
+ printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime);
+ printf("Data recv(bytes) = %ld\n",recoverStat[i].recvData);
}
printf("**************************\n\n");
fflush(stdout);