send_data((int)acceptfd,&response,sizeof(char));
}
else {
- printf("Got new Leader! : %d\n",epoch_num);
+// printf("Got new Leader! : %d\n",epoch_num);
pthread_mutex_lock(&recovery_mutex);
currentEpoch = epoch_num;
okCommit = TRANS_BEFORE;
break;
case RELEASE_NEW_LIST:
- printf("control -> RELEASE_NEW_LIST\n");
+// printf("control -> RELEASE_NEW_LIST\n");
{
recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
tNode->status = TRANS_AFTER;
}
if(okCommit == TRANS_AFTER) {
- printf("%s -> 11 \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
- sleep(3);
+// printf("%s -> 11 \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+// sleep(3);
}
}
else {
tNode->status = TRANS_WAIT;
- printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
- sleep(3);
+// printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+// sleep(3);
randomdelay();
}
if(okCommit == TRANS_AFTER)
{
- printf("%s -> TRANS_AFTER!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
- printf("%s -> Before removing\n",__func__);
+// printf("%s -> TRANS_AFTER!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
}
transList = tlistRemove(transList,fixed->transid);
pthread_mutex_unlock(&translist_mutex);
- if(okCommit == TRANS_AFTER)
- printf("%s -> After removing\n",__func__);
-
-
/* Free memory */
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
{
// locking
while(walker->status != TRANS_WAIT && tlistSearch(transList,walker->transid) != NULL) {
- printf("%s -> BEFORE transid : %u - decision %d Status : %d \n",__func__,walker->transid,walker->decision,walker->status);
+// printf("%s -> BEFORE transid : %u - decision %d Status : %d \n",__func__,walker->transid,walker->decision,walker->status);
if(inspectEpoch(epoch_num,"stopTrans_Before") < 0) {
- printf("%s -> Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch);
+// printf("%s -> Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch);
return -1;
}
sleep(3);
}
else if(TRANS_FLAG == TRANS_AFTER)
{
- printf("%s -> TRANS_AFTER\n",__func__);
+// printf("%s -> TRANS_AFTER\n",__func__);
okCommit = TRANS_AFTER;
do {
pthread_mutex_lock(&translist_mutex);
size = transList->size;
- printf("%s -> size = %d\n",__func__,size);
- printf("%s -> okCommit = %d\n",__func__,okCommit);
+// printf("%s -> size = %d\n",__func__,size);
+// printf("%s -> okCommit = %d\n",__func__,okCommit);
walker = transList->head;
while(walker){
- printf("%s -> AFTER transid : %u - decision %d Status : %d epoch = %u current epoch : %u\n",__func__,walker->transid,walker->decision,walker->status,walker->epoch_num,currentEpoch);
+// printf("%s -> AFTER transid : %u - decision %d Status : %d epoch = %u current epoch : %u\n",__func__,walker->transid,walker->decision,walker->status,walker->epoch_num,currentEpoch);
walker = walker->next;
}
pthread_mutex_unlock(&translist_mutex);
if(inspectEpoch(epoch_num,"stopTrans_Before") < 0) {
- printf("%s -> 222Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch);
+// printf("%s -> 222Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch);
return -1;
}
void sendTransList(int acceptfd)
{
- printf("%s -> Enter\n",__func__);
+// printf("%s -> Enter\n",__func__);
int size;
char response;
int transid;
if(transList->size != 0)
tlistPrint(transList);
- printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size);
+// printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size);
for(i = 0; i< size; i++) {
- printf("ID : %u Decision : %d status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status);
+// printf("ID : %u Decision : %d status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status);
}
- printf("%s -> End transArray\n",__func__);
+// printf("%s -> End transArray\n",__func__);
send_data((int)acceptfd,&size,sizeof(int));
send_data((int)acceptfd,transArray, sizeof(tlist_node_t) * size);
int receiveNewList(int acceptfd)
{
- printf("%s -> Enter\n",__func__);
+// printf("%s -> Enter\n",__func__);
int size;
tlist_node_t* tArray;
tlist_node_t* walker;
response = -1;
}
- printf("%s -> Exit\n",__func__);
+// printf("%s -> Exit\n",__func__);
return response;
}
randomdelay();
}
#endif
+
+ pthread_mutex_lock(&recovery_mutex);
+ epoch_num = currentEpoch;
+ pthread_mutex_unlock(&recovery_mutex);
int treplyretryCount = 0;
/* Initialize timeout for exponential delay */
do {
treplyretry = 0;
- pthread_mutex_lock(&recovery_mutex);
- epoch_num = currentEpoch;
- pthread_mutex_unlock(&recovery_mutex);
-
pthread_mutex_lock(&translist_mutex);
transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num);
tNode = tlistSearch(transList,transID);
if(sd != 0) {
char control;
int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
-// printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
+ printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
timeout = recv_data(sd, &control, sizeof(char));
-// printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
+ printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
//Update common data structure with new ctrl msg
getReplyCtrl[i] = control;
/* Recv Objects if participant sends TRANS_DISAGREE */
do {
sdlist = getSocketLists();
- printf("%s -> I'm currently leader num : %d ping machines\n\n",__func__,epoch_num);
+// printf("%s -> I'm currently leader num : %d ping machines\n\n",__func__,epoch_num);
if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
pthread_mutex_lock(&translist_mutex);
if(flag > -2)
break;
- printf("%s -> Retry \n",__func__);
+// printf("%s -> Retry \n",__func__);
}while(0);
if(flag < 0) {
- printf("%s -> higher epoch\n",__func__);
+// printf("%s -> higher epoch\n",__func__);
while(okCommit != TRANS_OK) {
// printf("%s -> Waiting\n",__func__);
randomdelay();
int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
{
- printf("%s -> Enter\n",__func__);
+// printf("%s -> Enter\n",__func__);
int i;
char request;
char response;
if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
continue;
- printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+// printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
request = REQUEST_TRANS_WAIT;
send_data(sdlist[i],&request, sizeof(char));
send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int));
}
- printf("%s -> Stop transaction\n",__func__);
+// printf("%s -> Stop transaction\n",__func__);
/* stop all local transactions */
if(stopTransactions(TRANS_BEFORE,epoch_num) < 0)
return -1;
- printf("After Stop transaction\n");
+// printf("After Stop transaction\n");
// grab leader's transaction list first
tlist_node_t* walker = transList->head;
if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
continue;
- printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+// printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
if(recv_data(sdlist[i],&response,sizeof(char)) < 0)
{
printf("Here\n");
printf("recevied response = %d\n",response);
if(response == RESPOND_TRANS_WAIT)
{
- printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
+// printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
int timeout1 = computeLiveHosts(sdlist[i]);
- printf("%s -> received host list\n",__func__);
+// printf("%s -> received host list\n",__func__);
int timeout2 = makeTransactionLists(¤tTransactionList,sdlist[i],epoch_num);
- printf("%s -> received transaction list\n",__func__);
+// printf("%s -> received transaction list\n",__func__);
// receive live host list // receive transaction list
if(timeout1 < 0 || timeout2 < 0) {
pthread_mutex_lock(&translist_mutex);
pthread_mutex_unlock(&translist_mutex);
return -2;
}
- printf("\n\n\nAfter mid : %s \n",midtoIPString(hostIpAddrs[i]));
- tlistPrint(currentTransactionList);
+ // tlistPrint(currentTransactionList);
}
else if(response == RESPOND_HIGHER_EPOCH)
{
int receivedHost[numHostsInSystem];
int i;
- if(recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem))
+ if(recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem) < 0)
return -2;
for(i = 0 ; i < numHostsInSystem;i ++)
send_data(sdlist[i],tArray,sizeof(tlist_node_t) * size);
}
}
- else {
+ else if(hostIpAddrs[i] == myIpAddr) {
setLocateObjHosts();
// printHostsStatus();
flag = combineTransactionList(tArray,size);
tlistDestroy(tlist);
return -2;
}
- if(response != TRANS_OK)
+ if(response != TRANS_OK && response != RESPOND_HIGHER_EPOCH)
{
printf("%s -> response : %d Need to fix\n",__func__,response);
}
*tlist = tlistInsertNode2(*tlist,&(transArray[j]),epoch_num);
}
else {
- if(tmp->decision == DECISION_LOST && transArray[j].decision != DECISION_LOST)
+ if((tmp->decision != TRANS_ABORT && tmp->decision != TRANS_COMMIT) && (transArray[j].decision == TRANS_COMMIT || transArray[j].decision == TRANS_ABORT))
{
tmp->decision = transArray[j].decision;
}
-
}
} // j loop