case REQUEST_TRANS_LIST:
printf("control -> REQUEST_TRANS_LIST\n");
sendTransList((int)acceptfd);
- receiveTransList((int)acceptfd);
-
- pthread_mutex_lock(&liveHosts_mutex);
- okCommit = TRANS_AFTER;
- pthread_mutex_unlock(&liveHosts_mutex);
+ response = receiveTransList((int)acceptfd);
+ stopTransactions(TRANS_AFTER);
+
+ send_data((int)acceptfd,&response,sizeof(char));
break;
case REQUEST_TRANS_RESTART:
}
#ifdef RECOVERY
-// printf("%s -> transID : %u has been committed\n",__func__,transID);
+ inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
tlist_node_t* tNode = tlistSearch(transList,fixed->transid);
tNode->status = TRANS_OK;
- inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
pthread_mutex_lock(&clearNotifyList_mutex);
transList = tlistRemove(transList,fixed->transid);
pthread_mutex_unlock(&clearNotifyList_mutex);
- // ====================after transaction point
-
#endif
/* Free memory */
(*objnotfound)++;
*control = TRANS_DISAGREE;
} else { /* If Obj found in machine (i.e. has not moved) */
-#ifdef DEBUG
- printf("%s -> Obj found!!\n",__func__);
- printf("%s -> Obj found: oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
- fflush(stdout);
-#endif
-
/* Check if Obj is locked by any previous transaction */
if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
if (version == ((objheader_t *)mobj)->version) { /* match versions */
pthread_mutex_lock(&liveHosts_mutex);
okCommit = TRANS_FLAG;
pthread_mutex_unlock(&liveHosts_mutex);
- /* make sure that all transactions are stopped */
+ /* make sure that all transactions are stopped */
pthread_mutex_lock(&clearNotifyList_mutex);
do {
{
// locking
while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) {
- printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
- sleep(2);
+// printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
+ randomdelay();
}
walker = walker->next;
}while(transList->flag == 1);
pthread_mutex_unlock(&clearNotifyList_mutex);
-// printf("%s - > Exit\n",__func__);
}
void sendTransList(int acceptfd)
// check if it already commit the decision for a transaction
recv_data((int)acceptfd,&response, sizeof(char));
- while(response == REQUEST_TRANS_CHECK)
+ while(response == REQUEST_TRANS_CHECK && response != REQUEST_TRANS_COMPLETE )
{
int transid;
recv_data((int)acceptfd,&transid, sizeof(unsigned int));
free(transArray);
}
-void receiveTransList(int acceptfd)
+int receiveTransList(int acceptfd)
{
int size;
tlist_node_t* tArray;
response = -1;
}
- send_data((int)acceptfd,&response,sizeof(char));
+ return response;
}
if(walker->transid == tArray[i].transid)
{
walker->decision = tArray[i].decision;
- walker->status = tArray[i].status;
+// walker->status = tArray[i].status;
break;
}
}
tNode->decision = finalResponse;
}
- if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK)))
+// printf("%s -> decision = %d okCommit = %d\n",__func__,tNode->decision,okCommit);
+
+ if((tNode->decision == DECISION_LOST) || (okCommit != TRANS_OK))
{
pthread_mutex_lock(&liveHosts_mutex);
tNode->status = TRANS_FLAG;
// if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) {
- printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
- sleep(3);
+// printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
+ randomdelay();
}
finalResponse = tNode->decision;
#ifdef RECOVERY
while(okCommit != TRANS_OK) {
// printf("%s -> new Transactin is waiting\n",__func__);
- sleep(2);
+ randomdelay();
}
transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK);
#ifdef RECOVERY
// wait until leader fix the system
-
if(okCommit != TRANS_OK) {
- inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_AFTER);
+ inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_BEFORE);
finalResponse = TRANS_ABORT;
+ treplyretry = 0;
}
#endif
// clear transaction
clearTransaction();
-// getchar();
// transfer lost objects
duplicateLostObjects(deadHost);
+
// restart transactions
restartTransactions();
returns an array of ongoing transactions */
makeTransactionLists(&tlist,sdlist);
-// getchar();
/* release the cleared decisions to all machines */
releaseTransactionLists(tlist,sdlist);
}
}
} // j loop
+
+ free(transArray);
}
} // i loop
exit(0);
}
- pthread_mutex_lock(&liveHosts_mutex);
- okCommit = TRANS_AFTER;
- pthread_mutex_unlock(&liveHosts_mutex);
-
+// okCommit = TRANS_AFTER;
+ stopTransactions(TRANS_AFTER);
}
}