#ifdef RECOVERY
while(okCommit != TRANS_OK) {
-// printf("%s -> new Transactin is waiting\n",__func__);
+// printf("%s -> new Transaction is waiting\n",__func__);
+ // sleep(1);
randomdelay();
}
return;
}
+ pthread_mutex_lock(&liveHosts_mutex);
+ liveHosts[findHost(deadHost)] = 0;
+ numLiveHostsInSystem--;
+ pthread_mutex_unlock(&liveHosts_mutex);
+
+ if(numLiveHostsInSystem == 1)
+ return;
+
// increase epoch number by number machines in the system
pthread_mutex_lock(&recovery_mutex);
epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray);
// update leader's live host list and object locations
do {
- sdlist = getSocketLists();
+ do {
+ sdlist = getSocketLists();
+
+ printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+ if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
- printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
- if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) {
- break;;
- }
+ printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+ if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break;
- printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+ // transfer lost objects
+ if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break;
- if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) {
- break;;
- }
+ // restart transactions
+ restartTransactions(epoch_num,sdlist);
+ }while(0);
+
+ freeSocketLists(sdlist);
- // transfer lost objects
- if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) {
+ // falg == 0 - fixed
+ // == -1 - higher epoch
+ // == -2 - found another failure, redo everything
+ if(flag > -2)
break;
- }
- // restart transactions
- restartTransactions(epoch_num,sdlist);
- }while(0);
- freeSocketLists(sdlist);
+ printf("%s -> Retry \n",__func__);
+ }while(0);
if(flag < 0) {
printf("%s -> higher epoch\n",__func__);
randomdelay();
}
- }else {
+ }else {
+ printf("%s -> I was leader! num : %d\n",__func__,epoch_num);
#ifdef RECOVERYSTATS
fi = myrdtsc();
recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ;
if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
continue;
- recv_data(sdlist[i],&response,sizeof(char));
+ if(recv_data(sdlist[i],&response,sizeof(char)) < 0)
+ {
+ tlistDestroy(currentTransactionList);
+ return -2;
+ }
if(response == RESPOND_TRANS_WAIT)
{
- // receive live host list
- computeLiveHosts(sdlist[i]);
- // receive transaction list
- makeTransactionLists(¤tTransactionList,sdlist[i]);
+ // receive live host list // receive transaction list
+ if(computeLiveHosts(sdlist[i]) < 0 || makeTransactionLists(¤tTransactionList,sdlist[i]) < 0) {
+ tlistDestroy(currentTransactionList);
+ return -2;
+ }
}
else if(response == RESPOND_HIGHER_EPOCH)
{
return 0;
}
-void computeLiveHosts(int sd)
+int computeLiveHosts(int sd)
{
int receivedHost[numHostsInSystem];
int i;
- recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem);
+ if(recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem))
+ return -2;
for(i = 0 ; i < numHostsInSystem;i ++)
{
liveHosts[i] = 0;
}
- return;
+ return 0;
}
int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
{
// printf("%s -> Waiting for %s\n",__func__,midtoIPString(hostIpAddrs[i]));
- recv_data(sdlist[i], &response, sizeof(char));
+ if(recv_data(sdlist[i], &response, sizeof(char)) < 0) {
+ tlistDestroy(tlist);
+ return -2;
+ }
if(response != TRANS_OK)
{
printf("%s -> response : %d Need to fix\n",__func__,response);
}
tlistDestroy(tlist);
printf("%s -> End\n",__func__);
+ return 0;
}
// after this fuction
// leader knows all the on-going transaction list and their decisions
-void makeTransactionLists(tlist_t** tlist,int sd)
+int makeTransactionLists(tlist_t** tlist,int sd)
{
tlist_node_t* transArray;
tlist_node_t* tmp;
int size;
// receive all on-going transaction list
- recv_data(sd, &size, sizeof(int));
+ if(recv_data(sd, &size, sizeof(int)) < 0)
+ return -2;
if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) {
printf("%s -> calloc error\n",__func__);
exit(0);
}
- recv_data(sd,transArray, sizeof(tlist_node_t) * size);
+ if(recv_data(sd,transArray, sizeof(tlist_node_t) * size) < 0) {
+ free(transArray);
+ return -2;
+ }
// add into currentTransactionList
for(j = 0 ; j < size; j ++) {
send_data(sd, &request, sizeof(char));
send_data(sd, &(walker->transid), sizeof(unsigned int));
- recv_data(sd, &respond, sizeof(char));
+ if(recv_data(sd, &respond, sizeof(char)) < 0)
+ return -2;
if(respond > 0)
{
request = REQUEST_TRANS_COMPLETE;
send_data(sd, &request,sizeof(char));
- return;
+ return 0;
}
void restartTransactions(unsigned int epoch_num,int* sdlist)
myIndexInHostArray = findHost(myIpAddr);
#ifdef RECOVERY
liveHosts[myIndexInHostArray] = 1;
- currentEpoch = myIndexInHostArray;
+ currentEpoch = 1;
#ifdef RECOVERYSTATS
numRecovery = 0;
for(i = 0 ; i < numHostsInSystem; i ++) {
if(sdlist[i] == -1)
continue;
- recv_data(sdlist[i],&response,sizeof(char));
+ if(recv_data(sdlist[i],&response,sizeof(char)))
+ return -2;
if(response != DUPLICATION_COMPLETE) {
- printf("%s -> fail!\n",__func__);
- exit(0);
+ return -2;
}
}
#ifndef DEBUG
printf("%s-> End\n", __func__);
#endif
+ return 0;
}
#endif
void addHost(unsigned int hostIp) {