okCommit = TRANS_OK;
currentEpoch = 1;
+ leader_index = -1;
#endif
clearDeadThreadsNotification();
}
else {
- send_data(socklist[i],&control,sizeof(char));
-
- if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
- // if machine is dead, returns index of socket
- return i;
- }
- else {
- // machine responded
- if(response != LIVE) {
+ if(leader_index >= 0 ) {
+ send_data(socklist[i],&control,sizeof(char));
+
+ if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
+ // if machine is dead, returns index of socket
return i;
}
- } // end else
+ else {
+ // machine responded
+ if(response != LIVE) {
+ return i;
+ }
+ } // end else
+ }
}
sleep(numLiveHostsInSystem); // wait for seconds for next checking
unsigned int *oidarry, numoid, mid, threadid;
int n, v;
-#ifdef DEBUG
- printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
-#endif
/* Receive control messages from other machines */
while(1) {
int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
// exit(0);
break;
}
-#ifdef DEBUG
- printf("%s-> dstmAccept control = %d\n", __func__, (int)control);
-#endif
switch(control) {
case READ_REQUEST:
#ifdef DEBUG
printf("RESTART!!!\n");
okCommit = TRANS_OK;
pthread_mutex_unlock(&liveHosts_mutex);
+
+ pthread_mutex_lock(&recovery_mutex);
+ leader_index = -1;
+ pthread_mutex_unlock(&recovery_mutex);
+
break;
case UPDATE_LIVE_HOSTS:
#ifdef DEBUG
// if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) {
// printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
-// sleep(3);
randomdelay();
}
else if( numbytes < 0) {
// Receive returned an error.
// Analyze underlying cause
-#ifdef DEBUG
- printf("%s -> fd : %d errno = %d %s\n",__func__, fd, errno,strerror(errno));
- fflush(stdout);
-#endif
if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
// machine has failed
//
// when we start send and finish send see if it is longer
// than our threshold
//
-#ifdef DEBUG
- printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
-#endif
return -1;
} else {
#ifdef GDBDEBUG
goto GDBSEND1;
#endif
-#ifdef DEBUG
- printf("%s -> Unexpected ERROR!\n",__func__);
-#endif
return -2;
}
}
}
#endif
} // close while loop
-#ifdef DEBUG
- printf("%s-> Exiting\n", __func__);
-#endif
return 0; // completed sending data
}
}
int recv_data_errorcode(int fd, void *buf, int buflen) {
-#ifdef DEBUG
- printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
-#endif
char *buffer = (char *)(buf);
int size = buflen;
int numbytes;
while (size > 0) {
numbytes = recv(fd, buffer, size, 0);
-#ifdef DEBUG
- printf("%s-> numbytes: %d\n", __func__, numbytes);
-#endif
if (numbytes==0)
return 0;
else if (numbytes == -1) {
buffer += numbytes;
size -= numbytes;
}
-#ifdef DEBUG
- printf("%s-> Exiting\n", __func__);
-#endif
return 1;
}
printf("%s -> Entering\n",__func__);
int* sdlist;
tlist_t* tList;
+ int flag = 0;
#ifdef RECOVERYSTATS
printf("Recovery Start\n");
long long st;
long long fi;
- int flag = 0;
unsigned int dupeSize = 0; // to calculate the size of backed up data
st = myrdtsc(); // to get clock
return;
}
else {
- printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
- printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
+// printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
+// printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
msg[0] = CLEAR_NOTIFY_LIST;
*((unsigned int *)(&msg[1])) = oid;
int i;
for(i=0; i < numRecovery;i++) {
printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
- printf("Recoveryed data(byte) = %u\n",recoverStat[i].recoveredData);
printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime);
}
printf("**************************\n\n");
fflush(stdout);
- fflush(stdout);
#else
printf("No stat\n");
#endif