send_data((int)acceptfd,&response,sizeof(char));
}
else {
-// printf("Got new Leader! : %d\n",epoch_num);
+ printf("Got new Leader! : %d\n",epoch_num);
pthread_mutex_lock(&recovery_mutex);
currentEpoch = epoch_num;
okCommit = TRANS_BEFORE;
#endif
/* Read fixed_data_t data structure */
- size = sizeof(fixed) - 1;
+ size = sizeof(fixed_data_t) - 1;
ptr = (char *)&fixed;
fixed.control = TRANS_REQUEST;
timeout = recv_data((int)acceptfd, ptr+1, size);
+// printf("%s -> Received transid = %u\n",__func__,fixed.transid);
+
/* Read list of mids */
int mcount = fixed.mcount;
size = mcount * sizeof(unsigned int);
commitObjects(tNode->decision,fixed,transinfo,modptr,oidmod,acceptfd);
tNode->status = TRANS_AFTER;
}
- if(okCommit == TRANS_AFTER) {
-// printf("%s -> 11 \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
-// sleep(3);
+ else {
+ tNode->decision = TRANS_ABORT;
+ tNode->status = TRANS_INPROGRESS;
+ thashInsert(fixed->transid,tNode->decision);
+ commitObjects(tNode->decision,fixed,transinfo,modptr,oidmod,acceptfd);
+ tNode->status = TRANS_AFTER;
}
}
else {
tNode->status = TRANS_WAIT;
- printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+// printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+// sleep(3);
randomdelay();
}
}while(tNode->status != TRANS_AFTER);
- if(okCommit == TRANS_AFTER)
- {
-// printf("%s -> TRANS_AFTER!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
- }
-
+// printf("%s -> trans ID : %u is cleared\n",__func__,tNode->transid);
pthread_mutex_lock(&translist_mutex);
transList = tlistRemove(transList,fixed->transid);
// printf("%s -> Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch);
return -1;
}
- sleep(3);
+// sleep(5);
}
walker = walker->next;
}
pthread_mutex_lock(&translist_mutex);
size = transList->size;
// printf("%s -> size = %d\n",__func__,size);
-// printf("%s -> okCommit = %d\n",__func__,okCommit);
+// printf("%s -> okCommit = %d\n",__func__,okCommit);
walker = transList->head;
while(walker){
// printf("%s -> AFTER transid : %u - decision %d Status : %d epoch = %u current epoch : %u\n",__func__,walker->transid,walker->decision,walker->status,walker->epoch_num,currentEpoch);
return -1;
}
- sleep(3);
+// sleep(5);
+ randomdelay();
}while(size != 0);
}
tlist_node_t* transArray = tlistToArray(transList,&size);
pthread_mutex_unlock(&translist_mutex);
- if(transList->size != 0)
- tlistPrint(transList);
+// if(transList->size != 0)
+// tlistPrint(transList);
// printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size);
#ifdef RECOVERY
+
#define INCREASE_EPOCH(x,y,z) ((x/y+1)*y + z)
/***********************************
* Global variables for Duplication
do {
treplyretry = 0;
- pthread_mutex_lock(&translist_mutex);
- transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num);
- tNode = tlistSearch(transList,transID);
- pthread_mutex_unlock(&translist_mutex);
-
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
if (firsttime) {
if(sd != 0) {
char control;
int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
- printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
+// printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
timeout = recv_data(sd, &control, sizeof(char));
- printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
+// printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
//Update common data structure with new ctrl msg
getReplyCtrl[i] = control;
/* Recv Objects if participant sends TRANS_DISAGREE */
}
#endif
// printf("%s -> transID = %u Passed this point\n",__func__,transID);
+ pthread_mutex_lock(&translist_mutex);
+ transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num);
+ tNode = tlistSearch(transList,transID);
+ pthread_mutex_unlock(&translist_mutex);
#ifdef CACHE
if (finalResponse == TRANS_COMMIT) {
}
} while (treplyretry && deadmid != -1);
-#ifdef RECOVERY
-
-
-
-
-#endif
-
if(finalResponse == TRANS_ABORT) {
#ifdef TRANSSTATS
numTransAbort++;
if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
pthread_mutex_lock(&translist_mutex);
- tlistPrint(tList);
+// tlistPrint(tList);
pthread_mutex_unlock(&translist_mutex);
// getchar();
printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num);
if(flag < 0) {
printf("%s -> higher epoch\n",__func__);
while(okCommit != TRANS_OK) {
- //printf("%s -> Waiting\n",__func__);
+// sleep(3);
randomdelay();
}