#include <sys/select.h>
#include "tlookup.h"
-//#define CPU_FREQ 2992440
-
#define CPU_FREQ 3056842
#endif
int nchashSearch = 0;
int nmhashSearch = 0;
int nprehashSearch = 0;
+int ndirtyCacheObj = 0;
int nRemoteSend = 0;
int nSoftAbort = 0;
int bytesSent = 0;
int waitThreadMid;
unsigned int waitThreadID;
-int transRetryFlag;
+__thread int transRetryFlag;
unsigned int transIDMin;
unsigned int transIDMax;
return 0; // completed sending data
}
+void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
+ if (buflen+sendbuffer->offset>WMAXBUF) {
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+ send_data(fd, buffer, buflen);
+ return;
+ }
+ memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
+ sendbuffer->offset+=buflen;
+ if (sendbuffer->offset>WTOP) {
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+ }
+}
+
+void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
+ if (buflen+sendbuffer->offset>WMAXBUF) {
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+ send_data(fd, buffer, buflen);
+ return;
+ }
+ memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
+ sendbuffer->offset+=buflen;
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+}
+
//Returns negative value if receive cannot be completed because of
//timeout or machine failure
GDBRECV1:
#endif
numbytes = recv(fd, buffer, size, 0);
+ bytesRecv += numbytes;
if (numbytes>0) {
buffer += numbytes;
perror("recv_data_errorcode");
return -1;
}
+ bytesRecv += numbytes;
buffer += numbytes;
size -= numbytes;
}
} else {
#ifdef CACHE
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+ ndirtyCacheObj++;
+#endif
+ goto remoteread;
+ }
#ifdef TRANSSTATS
nprehashSearch++;
#endif
return objcopy;
#endif
}
+remoteread:
#endif
/* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
nRemoteSend++;
#endif
#ifdef COMPILER
+#ifdef CACHE
+ //Copy object to prefetch cache
+ pthread_mutex_lock(&prefetchcache_mutex);
+ objheader_t *headerObj;
+ int size;
+ GETSIZE(size, objcopy);
+ if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
+ printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+ __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return NULL;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+ //make an entry in prefetch lookup hashtable
+ prehashInsert(oid, headerObj);
+ LOGEVENT('B');
+#endif
return &objcopy[1];
#else
return objcopy;
} else {
#ifdef CACHE
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+ ndirtyCacheObj++;
+#endif
+ goto remoteread;
+ }
#ifdef TRANSSTATS
LOGEVENT('P')
nprehashSearch++;
return objcopy;
#endif
}
+remoteread:
#endif
- /* Get the object from the remote location */
+ /* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
return NULL;
#endif
#endif
- objcopy = getRemoteObj(machinenumber, oid);
+ objcopy = getRemoteObj(machinenumber, oid);
#ifdef RECOVERY
if(transRetryFlag) {
} else {
#ifdef TRANSSTATS
LOGEVENT('R');
- nRemoteSend++;
+ nRemoteSend++;
#endif
#ifdef COMPILER
+#ifdef CACHE
+ //Copy object to prefetch cache
+ pthread_mutex_lock(&prefetchcache_mutex);
+ objheader_t *headerObj;
+ int size;
+ GETSIZE(size, objcopy);
+ if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
+ printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+ __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return NULL;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+ //make an entry in prefetch lookup hashtable
+ prehashInsert(oid, headerObj);
+#endif
+
return &objcopy[1];
#else
return objcopy;
#ifdef DEBUG
printf("%s -> Finished!!\n",__func__);
#endif
-
}
/* This function creates objects in the transaction record */
OID(tmp) = getNewOID();
tmp->notifylist = NULL;
tmp->version = 1;
- tmp->rcount = 1;
- tmp->isBackup = 0;
+ //tmp->rcount = 1;
+ tmp->isBackup = 0;
STATUS(tmp) = NEW;
t_chashInsert(OID(tmp), tmp);
int makedirty = 0;
unsigned int mid;
- mid = lhashSearch(oid);
-
// if the obj is dirty or new
if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) {
// set flag for backup machine
}
// if the obj is new or local, destination will be my Ip
- if((mid = lhashSearch(oid)) == 0) {
+ if((mid=lhashSearch(oid)) == 0) {
mid = myIpAddr;
}
int firsttime=1;
trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
char finalResponse;
-
+ struct writestruct writebuffer;
+ writebuffer.offset=0;
#ifdef RECOVERY
int deadsd = -1;
int deadmid = -1;
/* Create a socket and getReplyCtrl array, initialize */
int socklist[pilecount];
+ char getReplyCtrl[pilecount];
int loopcount;
- for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+ for(loopcount = 0 ; loopcount < pilecount; loopcount++) {
socklist[loopcount] = 0;
- char getReplyCtrl[pilecount];
- for(loopcount = 0 ; loopcount < pilecount; loopcount++)
getReplyCtrl[loopcount] = 0;
+ }
/* Process each machine pile */
int sockindex = 0;
- int localReqsock = -1;
+ int localReqsock = -1;
trans_req_data_t *tosend;
tosend = calloc(pilecount, sizeof(trans_req_data_t));
while(pile != NULL) {
}
socklist[sockindex] = sd;
/* Send bytes of data with TRANS_REQUEST control message */
- send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+ //send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+ send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
- send_data(sd, tosend[sockindex].listmid, size);
+ //send_data(sd, tosend[sockindex].listmid, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
}
/* Send oids and version number tuples for objects that are read */
{
int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
- send_data(sd, tosend[sockindex].objread, size);
+ //send_data(sd, tosend[sockindex].objread, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
}
/* Send objects that are modified */
memcpy(modptr+offset, headeraddr, size);
offset+=size;
}
- send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ //send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
#ifdef RECOVERY
/* send transaction id, number of machine involved, machine ids */
- send_data(sd, &transID, sizeof(unsigned int));
+ //send_data(sd, &transID, sizeof(unsigned int));
+ forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int));
#endif
free(modptr);
} else { //handle request locally
return 1;
}
-#if 0
- /* Invalidate objects in other machine cache */
- if(tosend[i].f.nummod > 0) {
- if((retval = invalidateObj(&(tosend[i]))) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- free(tosend);
- free(listmid);
- return 1;
- }
- }
-#endif
#ifdef ABORTREADERS
removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
memcpy(ptrcreate, header, tmpsize);
mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
-// printf("oid created : %u\n",oidcreated[i]);
}
/* Unlock locked objects */
int useWriteUnlock = 0;
*((int*)buf) = tmp->numoffset;
buf+=sizeof(int);
*((unsigned int *)buf) = tmp->oid;
+#ifdef TRANSSTATS
+ sendRemoteReq++;
+#endif
buf+=sizeof(unsigned int);
*((unsigned int *)buf) = myIpAddr;
buf += sizeof(unsigned int);
/* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
endpair = -1;
send_data(sd, &endpair, sizeof(int));
-
return;
}
recv_data((int)sd, &length, sizeof(int));
size = length - sizeof(int);
char recvbuffer[size];
-
+#ifdef TRANSSTATS
+ getResponse++;
+#endif
recv_data((int)sd, recvbuffer, size);
control = *((char *) recvbuffer);
if(control == OBJECT_FOUND) {
numRecovery++;
long long st;
long long fi;
- unsigned int dupeSize; // to calculate the size of backed up data
- unsigned int recvDataSize = 0; // to calculate the size of recv data
+ unsigned int dupeSize = 0; // to calculate the size of backed up data
st = myrdtsc(); // to get clock
recoverStat[numRecovery-1].deadMachine = mid;
* Backup 26 21,24
*/
+#ifdef RECOVERYSTATS
dupeSize = 0;
+#endif
if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
printf("%s -> Socket create error\n",__func__);
exit(0);
}
+
/* request for original */
char duperequest;
duperequest = DUPLICATE_ORIGINAL;
recv_data(psd, &p_response, sizeof(char));
recv_data(psd, &p_receivedSize, sizeof(unsigned int));
+#ifdef RECOVERYSTATS
dupeSize += p_receivedSize; // size of primary data
- recvDataSize += p_receivedSize; // size of primary data
+#endif
recv_data(bsd, &b_response, sizeof(char));
recv_data(bsd, &b_receivedSize, sizeof(unsigned int));
+#ifdef RECOVERYSTATS
dupeSize += b_receivedSize; // size of backup data
- recvDataSize += b_receivedSize; // size of backup data
+#endif
if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE)
{
fi = myrdtsc();
recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ;
recoverStat[numRecovery-1].recoveredData = dupeSize;
- recoverStat[numRecovery-1].recvData = recvDataSize;
-
printRecoveryStat();
#endif
int mIndex = findHost(mid);
return getStatus(mIndex);
}
+#endif
-#ifdef RECOVERYSTATS
void printRecoveryStat() {
+#ifdef RECOVERYSTATS
printf("\n***** Recovery Stats *****\n");
printf("numRecovery = %d\n",numRecovery);
int i;
printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
printf("Recoveryed data(byte) = %u\n",recoverStat[i].recoveredData);
printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime);
- printf("Data recv(bytes) = %ld\n",recoverStat[i].recvData);
}
printf("**************************\n\n");
fflush(stdout);
-}
#else
-void printRecoveryStat() {
printf("No stat\n");
-}
-#endif
-
-
-
-
#endif
+}