int okCommit; // machine flag
extern numWaitMachine;
extern unsigned int currentEpoch;
+unsigned int leader_index;
#endif
#ifdef RECOVERY
if(firsttime) {
do {
- retval = pthread_create(&thread_dstm_asking, NULL, startAsking, NULL);
+ retval = pthread_create(&thread_dstm_asking, NULL, startPolling, NULL);
}while(retval!=0);
firsttime=0;
pthread_detach(thread_dstm_asking);
}
#ifdef RECOVERY
-void* startAsking()
+void* startPolling()
{
unsigned int deadMachineIndex = -1;
int i;
char response;
while(1){
- for(i = 0; i< numHostsInSystem;i++) {
- if(socklist[i] > 0) {
- send_data(socklist[i], &control,sizeof(char));
- if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
- // if machine is dead, returns index of socket
- return i;
- }
- else {
- // machine responded
- if(response != LIVE) {
+ if(okCommit == TRANS_OK) {
+ for(i = 0; i< numHostsInSystem;i++) {
+ if(socklist[i] > 0) {
+ send_data(socklist[i], &control,sizeof(char));
+
+ if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
+ // if machine is dead, returns index of socket
return i;
}
- } // end else
- }// end if(socklist[i]
- } // end for()
+ else {
+ // machine responded
+ if(response != LIVE) {
+ return i;
+ }
+ } // end else
+ }// end if(socklist[i]
+ } // end for()
- clearDeadThreadsNotification();
+ clearDeadThreadsNotification();
+ }
+ else {
+ send_data(socklist[i],&control,sizeof(char));
+
+ if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
+ // if machine is dead, returns index of socket
+ return i;
+ }
+ else {
+ // machine responded
+ if(response != LIVE) {
+ return i;
+ }
+ } // end else
+ }
sleep(numLiveHostsInSystem); // wait for seconds for next checking
} // end while(1)
#ifdef RECOVERY
case REQUEST_TRANS_WAIT:
{
+ unsigned int new_leader_index;
recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
+ recv_data((int)acceptfd,&new_leader_index,sizeof(unsigned int));
if(inspectEpoch(epoch_num) < 0) {
response = RESPOND_HIGHER_EPOCH;
}
else {
printf("Got new Leader! : %d\n",epoch_num);
- currentEpoch = epoch_num;
+
stopTransactions(TRANS_BEFORE);
+
+ pthread_mutex_lock(&recovery_mutex);
+ currentEpoch = epoch_num;
+ leader_index = new_leader_index;
+ pthread_mutex_unlock(&recovery_mutex);
+
response = RESPOND_TRANS_WAIT;
send_data((int)acceptfd,&response,sizeof(char));
sendMyList((int)acceptfd);
updateLiveHosts();
setLocateObjHosts();
updateLiveHostsCommit();
-// leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem);
-// printHostsStatus();
if(!allHostsLive()) {
printf("Not all hosts live. Exiting.\n");
exit(-1);
request = REQUEST_TRANS_WAIT;
send_data(sdlist[i],&request, sizeof(char));
send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
+ send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int));
}
/* stop all local transactions */