+#include "dstm.h"
+#include "ip.h"
#include "machinepile.h"
#include "mlookup.h"
#include "llookup.h"
int sizeOfHostArray;
int numHostsInSystem;
int myIndexInHostArray;
-int waitThreadMid;
-unsigned int waitThreadID;
-
unsigned int oidsPerBlock;
unsigned int oidMin;
unsigned int oidMax;
int bytesSent = 0;
int bytesRecv = 0;
int totalObjSize = 0;
+int sendRemoteReq = 0;
+int getResponse = 0;
#ifdef RECOVERY
/***********************************
* Global variables for Duplication
***********************************/
int *liveHosts;
-int liveHostsValid;
int numLiveHostsInSystem;
-int flipBit; // Used to distribute requests between primary and backup evenly
unsigned int *locateObjHosts;
-#endif
+
+
+/* variables to clear dead threads */
+int waitThreadMid;
+unsigned int waitThreadID;
int transRetryFlag;
-unsigned int transIDMax;
unsigned int transIDMin;
-unsigned int transIDIndex;
-char ip[16];
+unsigned int transIDMax;
+
+char ip[16]; // for debugging purpose
-#ifdef RECOVERY
/******************************
* Global variables for Paxos
******************************/
plistnode_t *createPiles();
plistnode_t *sortPiles(plistnode_t *pileptr);
+#ifdef LOGEVENTS
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+ int tmp=bigindex++; \
+ bigarray[tmp]=x; \
+ }
+#else
+#define LOGEVENT(x)
+#endif
+
/*******************************
* Send and Recv function calls
*******************************/
int numbytes;
while (size > 0) {
-
#ifdef GDBDEBUG
GDBSEND1:
#endif
numbytes = send(fd, buffer, size, 0);
- if( numbytes>0) {
+ if( numbytes > 0) {
bytesSent += numbytes;
size -= numbytes;
}
#ifdef RECOVERY
- else if( numbytes < 0) {
+ else if( numbytes < 0) {
// Receive returned an error.
// Analyze underlying cause
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s -> fd : %d errno = %d %s\n",__func__, fd, errno,strerror(errno));
fflush(stdout);
#endif
return -1;
} else {
#ifdef GDBDEBUG
- if(errno == 4)
- goto GDBSEND1;
+ if(errno == 4)
+ goto GDBSEND1;
#endif
-
#ifdef DEBUG
printf("%s -> Unexpected ERROR!\n",__func__);
#endif
else{
// Case : numbytes == 0
// // machine has failed -- this case probably doesn't occur in reality
- //
-
-
-
#ifdef DEBUG
printf("%s -> SHOULD NOT BE HERE\n",__func__);
#endif
return -1;
}
+#else
+ if(numbytes == -1) {
+ perror("send");
+ exit(0);
+ }
#endif
} // close while loop
#ifdef DEBUG
printf("%s-> Exiting\n", __func__);
#endif
-
return 0; // completed sending data
}
#endif
return -1;
}
+#else
+ if( numbytes == -1) {
+ perror("recv");
+ exit(0);
+ }
#endif
} //close while loop
#ifdef DEBUG
perror("recv_data_errorcode");
return -1;
}
-
buffer += numbytes;
size -= numbytes;
}
return max;
}
+#ifdef RECOVERY
char* midtoIPString(unsigned int mid){
midtoIP(mid, ip);
return ip;
}
+#endif
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
setLocateObjHosts();
updateLiveHostsCommit();
paxos();
+ printHostsStatus();
if(!allHostsLive()) {
printf("Not all hosts live. Exiting.\n");
exit(-1);
/* Insert into cache's lookup table */
STATUS(objcopy)=0;
t_chashInsert(OID(objheader), objcopy);
+#ifdef DEBUG
+ printf("%s -> obj type = %d\n",__func__,getObjType(oid));
+ printf("%s -> obj grabbed\n",__func__);
+#endif
#ifdef COMPILER
return &objcopy[1];
#else
#endif
} else {
#ifdef CACHE
- , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
#ifdef TRANSSTATS
+ LOGEVENT('P')
nprehashSearch++;
#endif
/* Look up in prefetch cache */
}
#endif
/* Get the object from the remote location */
-
+ if((machinenumber = lhashSearch(oid)) == 0) {
+ printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
+ return NULL;
+ }
#ifdef DEBUG
printf("%s-> Grab from remote machine\n", __func__);
#endif
#ifdef RECOVERY
transRetryFlag = 0;
- unsigned int mindex = findHost(lhashSearch(oid));
- machinenumber = locateObjHosts[2*mindex+flipBit];
-
- if(numLiveHostsInSystem > 1)
- flipBit ^= 1;
- else
- flipBit = 0;
+
+ unsigned int machinenumber;
+ static int flipBit = 0; // Used to distribute requests between primary and backup evenly
+ // either primary or backup machine
+ machinenumber = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid));
+ flipBit ^= 1;
#ifdef DEBUG
printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber));
#endif
-#else
- if((machinenumber = lhashSearch(oid)) == 0) {
- printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
- return NULL;
- }
#endif
+
objcopy = getRemoteObj(machinenumber, oid);
+
#ifdef RECOVERY
if(transRetryFlag) {
restoreDuplicationState(machinenumber);
return transRead2(oid);
}
#endif
- }
if(objcopy == NULL) {
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
#ifdef TRANSSTATS
+ LOGEVENT('R');
nRemoteSend++;
#endif
#ifdef COMPILER
return objcopy;
#endif
}
+ }
#ifdef DEBUG
printf("%s -> Finished!!\n",__func__);
#endif
+
}
/* This function creates objects in the transaction record */
/* This function creates machine piles based on all machines involved in a
* transaction commit request */
plistnode_t *createPiles() {
+
+#ifdef DEBUG
+ printf("%s -> Entering\n",__func__);
+#endif
int i;
unsigned int oid;
plistnode_t *pile = NULL;
#if RECOVERY
oid = OID(headeraddr);
-#ifdef DEBUG
- printf("%s-> oid:%u, version:%d, status:%d, type:%d\n", __func__, OID(headeraddr), headeraddr->version, STATUS(headeraddr), TYPE(headeraddr));
- if (STATUS(headeraddr) & NEW) { // new/local object
- printf("%s-> new/local object\n", __func__);
- }
- else if ((mhashSearch(curr->key) != NULL)) { //local/nonnew
- if(STATUS(headeraddr) & DIRTY) { // modified
- printf("%s-> old/local/mod object\n", __func__);
- }
- else { //read
- printf("%s-> old/local/read object\n", __func__);
- }
- }
- else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object
- if(STATUS(headeraddr) & DIRTY) { //modified
- printf("%s-> remote/local/mod object\n", __func__);
- }
- else { //read
- printf("%s-> remote/local/read object\n", __func__);
- }
- }
- else {
- printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
- unsigned int pmid = getPrimaryMachine(lhashSearch(oid));
- unsigned int bmid = getBackupMachine(lhashSearch(oid));
- printf("%s-> Primary Machine: [%s], ", __func__, midtoIPString(pmid));
- printf("Backup Machine: [%s]\n", midtoIPString(bmid));
-#endif
- int makedirty = 0;
- if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) {
- makedirty = 1;
- }
- pile = pInsert(pile, headeraddr, getPrimaryMachine(lhashSearch(oid)), c_numelements);
-//problem here
- if(makedirty) {
- STATUS(headeraddr) = DIRTY;
- }
- pile = pInsert(pile, headeraddr, getBackupMachine(lhashSearch(oid)), c_numelements);
+ int makedirty = 0;
+ unsigned int mid;
+
+ mid = lhashSearch(oid);
+
+ // if the obj is dirty or new
+ if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) {
+ // set flag for backup machine
+ makedirty = 1;
+ }
+
+ // if the obj is new or local, destination will be my Ip
+ if((mid = lhashSearch(oid)) == 0) {
+ mid = myIpAddr;
+ }
+
+ pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+
+ if(makedirty) {
+ STATUS(headeraddr) = DIRTY;
+ }
+
+ pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
#else
// Get machine location for object id (and whether local or not)
if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
int firsttime=1;
trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
char finalResponse;
+
+#ifdef RECOVERY
int deadsd = -1;
int deadmid = -1;
unsigned int transID = getNewTransID();
+#endif
#ifdef DEBUG
printf("%s -> Starts transCommit\n",__func__);
/* Create a list of machine ids(Participants) involved in transaction */
listmid = calloc(pilecount, sizeof(unsigned int));
pListMid(pile, listmid);
-
+
/* Create a socket and getReplyCtrl array, initialize */
int socklist[pilecount];
int loopcount;
tosend[sockindex].f.numread = pile->numread;
tosend[sockindex].f.nummod = pile->nummod;
tosend[sockindex].f.numcreated = pile->numcreated;
-#ifdef DEBUG
- printf("%s-> numread:%d, nummod:%d, numcreated:%d\n", __func__, pile->numread, pile->nummod, pile->numcreated);
-#endif
tosend[sockindex].f.sum_bytes = pile->sum_bytes;
tosend[sockindex].listmid = listmid;
tosend[sockindex].objread = pile->objread;
tosend[sockindex].oidmod = pile->oidmod;
tosend[sockindex].oidcreated = pile->oidcreated;
- int sd = 0;
+
+
+ int sd = 0;
if(pile->mid != myIpAddr) {
if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
printf("\ntransRequest(): socket create error\n");
printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
free(listmid);
free(tosend);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
return 1;
}
int offset = 0;
free(modptr);
free(listmid);
free(tosend);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
return 1;
}
GETSIZE(size,headeraddr);
#endif
free(modptr);
} else { //handle request locally
- localReqsock = sockindex;
handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
}
sockindex++;
int i;
for(i = 0; i < pilecount; i++) {
- if(i == localReqsock)
- continue;
int sd = socklist[i];
if(sd != 0) {
char control;
//Update common data structure with new ctrl msg
getReplyCtrl[i] = control;
/* Recv Objects if participant sends TRANS_DISAGREE */
- //printf("getReplyCtrl[%d] = %d\n", i, (int)getReplyCtrl[i]);
#ifdef CACHE
if(control == TRANS_DISAGREE) {
int length;
- timeout = recv_data(sd, &length, sizeof(int));
+ recv_data(sd, &length, sizeof(int));
void *newAddr;
pthread_mutex_lock(&prefetchcache_mutex);
if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
free(tosend);
free(listmid);
pthread_mutex_unlock(&prefetchcache_mutex);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
return 1;
}
pthread_mutex_unlock(&prefetchcache_mutex);
- timeout = recv_data(sd, newAddr, length);
+ recv_data(sd, newAddr, length);
int offset = 0;
while(length != 0) {
unsigned int oidToPrefetch;
#ifdef RECOVERY
if(timeout < 0) {
-#ifdef DEBUG
- printf("%s -> TIMEOUT!!!!!!!\n",__func__);
-#endif
-
deadmid = listmid[i];
deadsd = sd;
#ifdef DEBUG
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
free(tosend);
free(listmid);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
return 1;
}
#ifdef DEBUG
/* Send responses to all machines */
for(i = 0; i < pilecount; i++) {
int sd = socklist[i];
-
+#ifdef RECOVERY
if(sd != deadsd) {
+#endif
if(sd != 0) {
#ifdef CACHE
if(finalResponse == TRANS_COMMIT) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
free(tosend);
free(listmid);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
return 1;
}
printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
free(tosend);
free(listmid);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
return 1;
}
}
}
#endif
}
- } else {
-#ifdef ABORTREADERS
- removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
+#ifdef RECOVERY
+ }
#endif
- }
- }
+ }
-#ifdef DEBUG
- printf("%s-> Free sockets\n", __func__);
-#endif
- for(i = 0; i < pilecount; i++) {
- if(socklist[i] != 0) {
- freeSockWithLock(transRequestSockPool, listmid[i], socklist[i]);
- }
- }
+ for(i = 0; i< pilecount; i++) {
+ if(socklist[i] > 0) {
+ freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]);
+ }
+ }
/* Free resources */
free(tosend);
nSoftAbort++;
#endif
}
+
+
} while (treplyretry && deadmid != -1);
if(finalResponse == TRANS_ABORT) {
-
#ifdef TRANSSTATS
numTransAbort++;
#endif
/* Free Resources */
objstrDelete(t_cache);
t_chashDelete();
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
#ifdef RECOVERY
if(deadmid != -1) { /* if deadmid is greater than or equal to 0,
then there is dead machine. */
printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid));
#endif
restoreDuplicationState(deadmid);
-#ifdef DEBUG
- printf("%s -> Duplication completed\n",__func__);
-#endif
}
#endif
return TRANS_ABORT;
/* Free Resources */
objstrDelete(t_cache);
t_chashDelete();
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
return 0;
} else {
//TODO Add other cases
printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
exit(-1);
}
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
#endif
return 0;
/* Condition to send TRANS_AGREE */
if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
-#ifdef DEBUG
- printf("%s -> TRANS_AGREE\n",__func__);
-#endif
*getReplyCtrl = TRANS_AGREE;
}
/* Condition to send TRANS_SOFT_ABORT */
if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
-#ifdef DEBUG
- printf("%s -> TRANS_SOFT_ABORT\n",__func__);
-#endif
*getReplyCtrl = TRANS_SOFT_ABORT;
}
}
* cache. */
void *getRemoteObj(unsigned int mnum, unsigned int oid) {
+#ifdef DEBUG
+ printf("%s -> entering\n",__func__);
+#endif
int size, val;
struct sockaddr_in serv_addr;
- char machineip[16];
char control = 0;
objheader_t *h;
void *objcopy = NULL;
- int sd;
- int flag;
-
- if((sd = getSock2(transReadSockPool, mnum)) != -1) {
- char readrequest[sizeof(char)+sizeof(unsigned int)];
- readrequest[0] = READ_REQUEST;
- *((unsigned int *)(&readrequest[1])) = oid;
- send_data(sd, readrequest, sizeof(readrequest));
- }
- else {
- printf("%s -> creating socket error\n",__func__);
- }
+ int sd = getSock2(transRequestSockPool, mnum);
+ char readrequest[sizeof(char)+sizeof(unsigned int)];
+ readrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&readrequest[1])) = oid;
+ send_data(sd, readrequest, sizeof(readrequest));
/* Read response from the Participant */
if(recv_data(sd, &control, sizeof(char)) < 0) {
- transRetryFlag = 1;
- return NULL;
+ transRetryFlag = 1;
+ return NULL;
}
if (control==OBJECT_NOT_FOUND) {
transRetryFlag = 1;
return NULL;
}
-
STATUS(objcopy)=0;
/* Insert into cache's lookup table */
char response;
for(i = 0; i < nummid; i++) {
- if((sd = getSock(transReadSockPool, listmid[i])) < 0) {
+ if((sd = getSock(transPrefetchSockPool, listmid[i])) < 0) {
printf("%s -> socket Error!!\n");
}
else {
break; // received response
// else check next machine
- freeSock(transReadSockPool, listmid[i],sd);
+ freeSock(transPrefetchSockPool, listmid[i],sd);
}
}
#ifdef DEBUG
int sd;
char ctrl;
- if(!liveHosts[findHost(deadHost)]) {
+ if(!liveHosts[findHost(deadHost)]) { // if it is already fixed
sleep(WAIT_TIME);
return;
}
- if(deadHost == leader)
+ if(deadHost == leader) // if leader is dead, then pick a new leader
paxos();
#ifdef DEBUG
leaderFixing = 1;
pthread_mutex_unlock(&leaderFixing_mutex);
- if(!liveHosts[findHost(deadHost)]) {
+ if(!liveHosts[findHost(deadHost)]) { // if it is already fixed
#ifdef DEBUG
printf("%s -> already fixed\n",__func__);
#endif
leaderFixing =0;
pthread_mutex_unlock(&leaderFixing_mutex);
}
- else {
+ else { // if i am the leader
updateLiveHosts();
duplicateLostObjects(deadHost);
sleep(WAIT_TIME);
}
}
- else {
+ else { // request leader to fix the situation
if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) {
printf("%s -> socket create error\n",__func__);
exit(-1);
numlocked = transinfo->numlocked;
oidlocked = transinfo->objlocked;
-#ifdef DEBUG
- printf("%s-> nummod: %d, numcreated: %d, numlocked: %d\n", __func__, nummod, numcreated, numlocked);
-#endif
-
for (i = 0; i < nummod; i++) {
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
header->version += 1;
//printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
if(header->notifylist != NULL) {
-#ifdef DEBUG
- printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
-#endif
#ifdef RECOVERY
- if(header->isBackup != 0)
+ if(header->isBackup != 0) // if it is primary obj, notify
notifyAll(&header->notifylist, OID(header), header->version);
- else
+ else // if not, just clear the notification list
clearNotifyList(OID(header));
#else
notifyAll(&header->notifylist, OID(header), header->version);
return 1;
}
header->version += 1;
- //printf("oid: %u, new header version: %d\n", oidcreated[i], header->version);
GETSIZE(tmpsize, header);
tmpsize += sizeof(objheader_t);
pthread_mutex_lock(&mainobjstore_mutex);
memcpy(ptrcreate, header, tmpsize);
mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
+// printf("oid created : %u\n",oidcreated[i]);
}
/* Unlock locked objects */
int useWriteUnlock = 0;
#ifdef CACHE
if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
#endif
+
+#ifdef RECOVERY
+ unsigned int mid = lhashSearch(oid);
+ unsigned int machineID;
+ static flipBit = 0;
+ machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid));
+ int sd = getSock2(transReadSockPool, machineID);
+#else
unsigned int mid = lhashSearch(oid);
int sd = getSock2(transReadSockPool, mid);
+#endif
char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
remotereadrequest[0] = READ_REQUEST;
*((unsigned int *)(&remotereadrequest[1])) = oid;
return id;
}
+#ifdef RECOVERY
static unsigned int tid = 0xFFFFFFFF;
unsigned int getNewTransID(void) {
tid++;
}
return tid;
}
+#endif
int processConfigFile() {
FILE *configFile;
#ifdef RECOVERY
liveHosts = calloc(sizeOfHostArray, sizeof(unsigned int));
locateObjHosts = calloc(sizeOfHostArray*2, sizeof(unsigned int));
-
- liveHostsValid = 0;
#endif
while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
myIndexInHostArray = findHost(myIpAddr);
#ifdef RECOVERY
liveHosts[myIndexInHostArray] = 1;
- //locateObjHosts[myIndexInHostArray] = myIpAddr;
#endif
if (myIndexInHostArray == -1) {
printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
unsigned int pmid;
int pmidindex = 2*findHost(mid);
+ if(pmidindex < 0)
+ printf("What!!!\n");
+
pthread_mutex_lock(&liveHosts_mutex);
pmid = locateObjHosts[pmidindex];
pthread_mutex_unlock(&liveHosts_mutex);
unsigned int bmid;
int bmidindex = 2*findHost(mid)+1;
+ if(bmidindex < 0)
+ printf("damn!!\n");
+
pthread_mutex_lock(&liveHosts_mutex);
bmid = locateObjHosts[bmidindex];
pthread_mutex_unlock(&liveHosts_mutex);
printf("%s-> Entering updateLiveHosts\n", __func__);
#endif
// update everyone's list
- liveHostsValid = 0;
//foreach in hostipaddrs, ping -> update list of livemachines
//socket connection?
int sd = 0, i, j, tmpNumLiveHosts = 0;
for(i = 0; i < numHostsInSystem; i++) {
if(i == myIndexInHostArray)
- {
+ {
+ liveHosts[i] = 1;
tmpNumLiveHosts++;
continue;
}
- for(j = 0; j < 5; j++) { // hard define num of retries
- if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
-#ifdef DEBUG
- printf("%s -> Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j);
-#endif
- usleep(1000);
-
- if(j == 4) {
- if(liveHosts[i]) {
- liveHosts[i] = 0;
- deadhost = i;
- }
- }
- continue;
- }
+ if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
+ usleep(1000);
+
+ if(liveHosts[i]) {
+ liveHosts[i] = 0;
+ deadhost = i;
+ }
+ continue;
+ }
- char liverequest[sizeof(char)];
- liverequest[0] = RESPOND_LIVE;
+ char liverequest;
+ liverequest = RESPOND_LIVE;
- send_data(sd, &liverequest[0], sizeof(liverequest));
+ send_data(sd, &liverequest, sizeof(char));
- char response = 0;
- int timeout = recv_data(sd, &response, sizeof(response));
+ char response = 0;
+ int timeout = recv_data(sd, &response, sizeof(char));
- //try to send msg
- //if timeout, dead host
- if(response == LIVE) {
- liveHosts[i] = 1;
- tmpNumLiveHosts++;
- }
- else {
- if(liveHosts[i]) {
- liveHosts[i] = 0;
- deadhost = i;
- }
- }
- freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
- break;
+ //try to send msg
+ //if timeout, dead host
+ if(response == LIVE) {
+ liveHosts[i] = 1;
+ tmpNumLiveHosts++;
}
-#ifdef DEBUG
- if(liveHosts[i] == 0)
-
- printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i]));
-#endif
+ else {
+ if(liveHosts[i]) {
+ liveHosts[i] = 0;
+ deadhost = i;
+ }
+ }
+ freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
- numLiveHostsInSystem = tmpNumLiveHosts;
+
+ numLiveHostsInSystem = tmpNumLiveHosts;
#ifdef DEBUG
printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem);
#endif
//have updated list of live machines
#ifdef DEBUG
- printf("%s-> Exiting updateLiveHosts\n", __func__);
printHostsStatus();
+ printf("%s-> Exiting updateLiveHosts\n", __func__);
#endif
return deadhost;
int sd = 0, i;
char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)];
-
+
updaterequest[0] = UPDATE_LIVE_HOSTS;
-
for(i = 0; i < numHostsInSystem; i++) {
*((int *)(&updaterequest[i*4+1])) = liveHosts[i]; // clean this up later
}
}
//for each machine send data
-
for(i = 0; i < numHostsInSystem; i++) { // hard define num of retries
if(i == myIndexInHostArray)
continue;
freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
}
- liveHostsValid = 1;
#ifdef DEBUG
printHostsStatus();
printf("%s -> Finish\n",__func__);
return 0;
}
+#endif
+#ifdef RECOVERY
void setLocateObjHosts() {
int i = 0, validIndex = 0;
//check num hosts even valid first
- for(;i < numHostsInSystem; i++) {
-#ifdef DEBUG
- printf("%s-> i:%d\n", __func__, i);
-#endif
+ for(i = 0;i < numHostsInSystem; i++) {
while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) {
validIndex++;
int newPrimaryIndex = findHost(newPrimary);
int i;
+ /* duplicateLostObject example
+ * Before M24 die,
+ * MID 21 24 26
+ * Primary 21 24 26
+ * Backup 26 21 24
+ * After M24 die,
+ * MID 21 26
+ * Primary 21,24 26
+ * Backup 26 21,24
+ */
+
locateObjHosts[2*newPrimaryIndex+1] = backupMachine;
locateObjHosts[2*mIndex] = newPrimary;
}
return 1;
}
+#endif
+#ifdef RECOVERY
void duplicateLostObjects(unsigned int mid){
printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));
//connect to these machines
//go through their object store copying necessary (in a transaction)
- //transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
int sd = 0, i, j, tmpNumLiveHosts = 0;
/* duplicateLostObject example
while(*head != NULL) {
ptr = *head;
-
mid = ptr->mid;
-#ifdef DEBUG
- printf("%s -> trying to connect MID : %s\n",__func__,midtoIPString(mid));
-#endif
//create a socket connection to that machine
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fflush(stdout);
status = -1;
} else {
-#ifdef DEBUG
- printf("%s -> connected\n",__func__);
-#endif
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;
*((unsigned int *)&msg[1]) = oid;
}
//close socket
close(sock);
-
// Update head
*head = ptr->next;
free(ptr);
-#ifdef DEBUG
- printf("%s -> End notifying MID : %s\n",__func__,midtoIPString(mid));
-#endif
}
return status;
}
plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
plistnode_t *ptr, *tmp;
int found = 0, offset = 0;
- char ip[16];
tmp = pile;
+
//Add oid into a machine that is already present in the pile linked list structure
while(tmp != NULL) {
-// printf("tmp->mid = [%s], mid = [%s]\n", midtoIPString(tmp->mid), midtoIPString(mid));
if (tmp->mid == mid) {
int tmpsize;
if (!found) {
int tmpsize;
if((ptr = pCreate(num_objs)) == NULL) {
+ printf("pCreate Error\n");
return NULL;
}
+
ptr->mid = mid;
if (STATUS(headeraddr) & NEW) {
ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
*((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
ptr->numread++;
}
+
ptr->next = pile;
pile = ptr;
}
/* Arrange local machine processing at the end of the pile list */
while(ptr != NULL) {
if(ptr != tail) {
+ /*
if(ptr->mid == myIpAddr && (prev != pileptr)) {
prev->next = ptr->next;
ptr->next = NULL;
return pileptr;
}
if((ptr->mid == myIpAddr) && (prev == pileptr)) {
- prev = ptr->next;
- ptr->next = NULL;
- tail->next = ptr;
- return prev;
+ prev->next = ptr->next;
+ ptr->next = NULL;
+ tail->next = ptr;
+ return pileptr;
}
+ */
+
+ if((ptr->mid == myIpAddr))
+ {
+ tail->next = pileptr;
+ pileptr = ptr->next;
+ ptr->next = NULL;
+ return pileptr;
+ }
prev = ptr;
}
ptr = ptr->next;
}
//return v_a;
}
+#endif
-
+#ifdef RECOVERY
void clearDeadThreadsNotification()
{