#ifdef RECOVERY
case REQUEST_TRANS_WAIT:
receiveNewHostLists((int)acceptfd);
- stopTransactions();
+ stopTransactions(TRANS_BEFORE);
response = RESPOND_TRANS_WAIT;
send_data((int)acceptfd,&response,sizeof(char));
printf("control -> REQUEST_TRANS_LIST\n");
sendTransList((int)acceptfd);
receiveTransList((int)acceptfd);
+
+ pthread_mutex_lock(&liveHosts_mutex);
+ okCommit = TRANS_AFTER;
+ pthread_mutex_unlock(&liveHosts_mutex);
break;
case REQUEST_TRANS_RESTART:
return 1;
}
+// printf("%s -> Waiting for transID : %u\n",__func__,fixed->transid);
+
int timeout = recv_data((int)acceptfd, &control, sizeof(char));
#ifdef RECOVERY
control = -1;
}
// check if it is allowed to commit
- control = inspectTransaction(control,fixed->transid);
+ control = inspectTransaction(control,fixed->transid,"processClientReq",TRANS_BEFORE);
thashInsert(fixed->transid, control);
#endif
tlist_node_t* tNode = tlistSearch(transList,fixed->transid);
tNode->status = TRANS_OK;
+ inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
pthread_mutex_lock(&clearNotifyList_mutex);
transList = tlistRemove(transList,fixed->transid);
pthread_mutex_unlock(&clearNotifyList_mutex);
+ // ====================after transaction point
+
#endif
/* Free memory */
}
/* wait until all transaction waits for leader's decision */
-void stopTransactions()
+void stopTransactions(int TRANS_FLAG)
{
- printf("%s - > Enter\n",__func__);
+// printf("%s - > Enter flag :%d\n",__func__,TRANS_FLAG);
int size = transList->size;
int i;
tlist_node_t* walker;
pthread_mutex_lock(&liveHosts_mutex);
- okCommit = TRANS_WAIT;
+ okCommit = TRANS_FLAG;
pthread_mutex_unlock(&liveHosts_mutex);
/* make sure that all transactions are stopped */
while(walker)
{
// locking
- while(!(walker->status == TRANS_WAIT || walker->status == TRANS_OK)) {
+ while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) {
printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
sleep(2);
}
}while(transList->flag == 1);
pthread_mutex_unlock(&clearNotifyList_mutex);
- printf("%s - > Exit\n",__func__);
+// printf("%s - > Exit\n",__func__);
}
void sendTransList(int acceptfd)
}
free(transArray);
- printf("%s - > Exit\n",__func__);
}
void receiveTransList(int acceptfd)
{
- printf("%s -> Enter\n",__func__);
int size;
tlist_node_t* tArray;
tlist_node_t* walker;
recv_data((int)acceptfd,&size,sizeof(int));
- printf("%s -> size : %d\n",__func__,size);
if(size > 0) {
if((tArray = calloc(size,sizeof(tlist_node_t) * size)) == NULL)
response = -1;
}
- printf("%s -> response : %d\n",__func__,response);
-
send_data((int)acceptfd,&response,sizeof(char));
-
- printf("%s -> End\n",__func__);
}
if(walker->transid == tArray[i].transid)
{
walker->decision = tArray[i].decision;
+ walker->status = tArray[i].status;
break;
}
}
return flag;
}
-char inspectTransaction(char finalResponse,unsigned int transid)
+char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int TRANS_FLAG)
{
tlist_node_t* tNode;
tNode = tlistSearch(transList,transid);
- if(finalResponse < 0) {
+ if(finalResponse <= 0) {
tNode->decision = DECISION_LOST;
}
else {
if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK)))
{
pthread_mutex_lock(&liveHosts_mutex);
- tNode->status = TRANS_WAIT;
+ tNode->status = TRANS_FLAG;
pthread_mutex_unlock(&liveHosts_mutex);
- while(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) {
- printf("%s -> transID : %u decision : %d is waiting\n",__func__,tNode->transid,tNode->decision);
+ // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
+ while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) {
+ printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
sleep(3);
}
#ifdef RECOVERY
while(okCommit != TRANS_OK) {
- printf("%s -> new Transactin is waiting\n",__func__);
+// printf("%s -> new Transactin is waiting\n",__func__);
sleep(2);
}
#ifdef RECOVERY
// wait until leader fix the system
+
if(okCommit != TRANS_OK) {
- while(okCommit != TRANS_OK) {
- printf("%s -> Coordinator is waiting finalResponse : %d\n",__func__,finalResponse);
- sleep(1);
- }
+ inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_AFTER);
finalResponse = TRANS_ABORT;
}
#endif
} while (treplyretry && deadmid != -1);
#ifdef RECOVERY
+
+ //=========== after transaction point
tlist_node_t* tNode = tlistSearch(transList,transID);
+ inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER);
+
tNode->status = TRANS_OK;
+ finalResponse = tNode->decision;
pthread_mutex_lock(&clearNotifyList_mutex);
transList = tlistRemove(transList,transID);
char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
#ifdef RECOVERY
- finalResponse = inspectTransaction(finalResponse,tdata->f.transid);
+ finalResponse = inspectTransaction(finalResponse,tdata->f.transid,"Local Commit",TRANS_BEFORE);
thashInsert(tdata->f.transid,finalResponse);
#endif
// clear transaction
clearTransaction();
+// getchar();
// transfer lost objects
duplicateLostObjects(deadHost);
- getchar();
// restart transactions
restartTransactions();
int sd;
int sdlist[numHostsInSystem];
- printf("%s -> Enter\n",__func__);
-
printHostsStatus();
pthread_mutex_lock(&liveHosts_mutex);
}
}
/* stop all local transactions */
- stopTransactions();
- printf("%s -> End\n",__func__);
+ stopTransactions(TRANS_BEFORE);
}
/* acknowledge leader that all transactions are waiting */
returns an array of ongoing transactions */
makeTransactionLists(&tlist,sdlist);
+// getchar();
+
/* release the cleared decisions to all machines */
releaseTransactionLists(tlist,sdlist);
}
tlistDestroy(tlist);
-
- printf("%s -> End\n",__func__);
+ printf("%s -> End\n",__func__);
}
// after this fuction
tlist_node_t* walker = transList->head;
while(walker) {
- tlistInsertNode2(currentTransactionList,walker);
+ walker->status = TRANS_OK;
+ currentTransactionList = tlistInsertNode2(currentTransactionList,walker);
walker = walker->next;
}
tmp = tlistSearch(currentTransactionList,transArray[j].transid);
if(tmp == NULL) {
+ tlist_node_t* tNode = &transArray[j];
+ tNode->status = TRANS_OK;
currentTransactionList = tlistInsertNode2(currentTransactionList,&(transArray[j]));
}
else {
- if(tmp->decision == DECISION_LOST)
+ if((tmp->decision != TRANS_COMMIT && tmp->decision != TRANS_ABORT)
+ && (transArray[j].decision == TRANS_COMMIT || transArray[j].decision == TRANS_ABORT))
{
tmp->decision = transArray[j].decision;
- }
+ }
}
} // j loop
}
} // i loop
+
+ printf("Before\n");
+ tlistPrint(currentTransactionList);
// current transaction list is completed
// now see if any transaction is still missing
walker = currentTransactionList->head;
while(walker) {
- if(walker->decision == DECISION_LOST) {
+// if(walker->decision == DECISION_LOST) {
for(i = 0 ; i < numHostsInSystem; i++) {
if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
{
printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid);
walker->decision = TRANS_ABORT;
}
- }
+ if(walker->decision == TRYING_TO_COMMIT) {
+ printf("%s -> no decision yet transID : %u\n",__func__,walker->transid);
+ }
walker = walker->next;
} // while loop
}
*tlist = currentTransactionList;
+ printf("\n\nAfter\n");
tlistPrint(currentTransactionList);
printf("%s -> End\n",__func__);
{
if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
{
-// printf("%s -> Sent to sd : %d\n",__func__,sdlist[i]);
-
if(size == 0) {
size = -1;
send_data(sdlist[i],&size,sizeof(int));
printf("%s -> problem\n",__func__);
exit(0);
}
+
+ pthread_mutex_lock(&liveHosts_mutex);
+ okCommit = TRANS_AFTER;
+ pthread_mutex_unlock(&liveHosts_mutex);
+
}
}
{
int i;
int sd;
- printf("%s -> Enter\n",__func__);
for(i = 0; i < numHostsInSystem; i++) {
if(hostIpAddrs[i] == myIpAddr) {
pthread_mutex_lock(&liveHosts_mutex);
}
}
}
- printf("%s -> End\n",__func__);
}
#endif
#ifdef RECOVERY
void duplicateLostObjects(unsigned int mid){
+ printf("%s -> Enter\n",__func__);
#ifdef RECOVERYSTATS
unsigned int dupeSize = 0;
* Backup 26 21,24
*/
- if(((psd = getSockWithLock(transRequestSockPool, originalMid)) < 0 ) ||
- ((bsd = getSockWithLock(transRequestSockPool,backupMid)) <0)) {
+ if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) ||
+ ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
printf("%s -> psd : %d bsd : %d\n",__func__,psd,bsd);
printf("%s -> Socket create error\n",__func__);
exit(0);
}
- freeSockWithLock(transRequestSockPool, originalMid, psd);
- freeSockWithLock(transRequestSockPool, backupMid, bsd);
+ freeSockWithLock(transPrefetchSockPool, originalMid, psd);
+ freeSockWithLock(transPrefetchSockPool, backupMid, bsd);
#ifdef RECOVERYSTATS
recoverStat[numRecovery-1].recoveredData = dupeSize;