else {
printf("Got new Leader! : %d\n",epoch_num);
pthread_mutex_lock(&recovery_mutex);
- currentEpoch = epoch_num;
okCommit = TRANS_BEFORE;
leader_index = new_leader_index;
pthread_mutex_unlock(&recovery_mutex);
// printf("%s -> transID : %u\n",__func__,fixed->transid);
if(inspectEpoch(fixed->epoch_num,"procesClient1") < 0) {
- printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num);
+// printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num);
control = RESPOND_HIGHER_EPOCH;
send_data((int)acceptfd,&control,sizeof(char));
}
int timeout1 = recv_data((int)acceptfd, &control, sizeof(char));
int timeout2 = recv_data((int)acceptfd, &epoch_num, sizeof(unsigned int));
+// printf("%s -> Received for transID : %u\n",__func__,fixed->transid);
if(timeout1 < 0 || timeout2 < 0) { // timeout. failed to receiving data from coordinator
control = DECISION_LOST;
}
// check if it is allowed to commit
tNode->decision = control;
do {
- tNode->status = TRANS_INPROGRESS;
+ tNode->status = TRANS_BEFORE;
if(okCommit != TRANS_BEFORE) {
if(inspectEpoch(tNode->epoch_num,"processCleint2") > 0) {
tNode->status = TRANS_INPROGRESS;
}
else {
tNode->status = TRANS_WAIT;
-// printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
-// sleep(3);
+ printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+ sleep(3);
randomdelay();
}
-
}while(tNode->status != TRANS_AFTER);
// printf("%s -> trans ID : %u is cleared\n",__func__,tNode->transid);
while(walker)
{
// locking
- while(walker->status != TRANS_WAIT && tlistSearch(transList,walker->transid) != NULL) {
+ while((walker->status != TRYING_TO_COMMIT && walker->status != TRANS_WAIT) && tlistSearch(transList,walker->transid) != NULL) {
// printf("%s -> BEFORE transid : %u - decision %d Status : %d \n",__func__,walker->transid,walker->decision,walker->status);
if(inspectEpoch(epoch_num,"stopTrans_Before") < 0) {
// printf("%s -> Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch);
return -1;
}
-// sleep(5);
+ // sleep(5);
randomdelay();
}while(size != 0);
}
#include "abortreaders.h"
#endif
#include "trans.h"
-#include "mlp_lock.h"
#ifdef RECOVERY
#include <unistd.h>
extern tlist_t* transList;
extern pthread_mutex_t translist_mutex;
extern pthread_mutex_t clearNotifyList_mutex;
+pthread_mutex_t oidlock;
+pthread_mutex_t tidlock;
unsigned int currentEpoch;
goto GDBRECV1;
#endif
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s -> Unexpected ERROR!\n",__func__);
printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno));
#endif
//Case: numbytes==0
//machine has failed -- this case probably doesn't occur in reality
//
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s -> SHOULD NOT BE HERE\n",__func__);
#endif
return -1;
pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
pthread_mutex_init(&notifymutex, NULL);
pthread_mutex_init(&atomicObjLock, NULL);
+#ifdef RECOVERY
+ pthread_mutex_init(&oidlock,NULL);
+ pthread_mutex_init(&tidlock,NULL);
+#endif
#ifdef CACHE
/* This function creates objects in the transaction record */
objheader_t *transCreateObj(unsigned int size) {
+ pthread_mutex_lock(&oidlock);
objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
OID(tmp) = getNewOID();
tmp->notifylist = NULL;
tmp->isBackup = 0;
STATUS(tmp) = NEW;
t_chashInsert(OID(tmp), tmp);
+ pthread_mutex_unlock(&oidlock);
#ifdef COMPILER
return &tmp[1]; //want space after object header
#else
pile = pile->next;
} //end of pile processing
- /* Recv Ctrl msgs from all machines */
+
+ pthread_mutex_lock(&translist_mutex);
+ transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num);
+ tNode = tlistSearch(transList,transID);
+ pthread_mutex_unlock(&translist_mutex);
+
+
+ /* Recv Ctrl msgs from all machines */
#ifdef DEBUG
printf("%s-> Finished sending transaction read/mod objects transID = %u\n",__func__,transID);
#endif
if(sd != 0) {
char control;
int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
-// printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
+// printf("%s -> Waiting for mid : %s transID = %u sd = %d\n",__func__,midtoIPString(midlist[i]),transID,sd);
timeout = recv_data(sd, &control, sizeof(char));
// printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
printf("%s -> Received Higher epoch\n",__func__);
finalResponse = TRANS_ABORT;
treplyretry = 0;
+// sleep(5);
}
#endif
-// printf("%s -> transID = %u Passed this point\n",__func__,transID);
- pthread_mutex_lock(&translist_mutex);
- transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num);
- tNode = tlistSearch(transList,transID);
- pthread_mutex_unlock(&translist_mutex);
+ // printf("%s -> transID = %u Passed this point\n",__func__,transID);
+
+
+
+
#ifdef CACHE
if (finalResponse == TRANS_COMMIT) {
tNode->status = TRANS_AFTER;
}
else {
- tNode->status = TRYING_TO_COMMIT;
if(inspectEpoch(epoch_num,"TRANS_COMMIT2") > 0) {
treplyretry = 1;
}
}
#endif
#endif
+// printf("%s -> Trans Id = %u Sending to sd = %d\n",__func__,tosend[i].f.transid,sd);
send_data(sd,&finalResponse,sizeof(char));
send_data(sd,&epoch_num,sizeof(unsigned int));
} else {
int flag = 0;
#ifdef RECOVERYSTATS
-// printf("Recovery Start\n");
+ printf("Recovery Start dead = %s\n",midtoIPString(deadHost));
long long st;
long long fi;
unsigned int dupeSize = 0; // to calculate the size of backed up data
if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
pthread_mutex_lock(&translist_mutex);
-// tlistPrint(tList);
+ tlistPrint(tList);
pthread_mutex_unlock(&translist_mutex);
// getchar();
-// printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num);
+ printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num);
if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break;
// getchar();
-// printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num);
+ printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num);
// transfer lost objects
if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break;
if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
continue;
-// printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+ printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
request = REQUEST_TRANS_WAIT;
send_data(sdlist[i],&request, sizeof(char));
send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int));
}
-// printf("%s -> Stop transaction\n",__func__);
+ printf("%s -> Stop transaction\n",__func__);
/* stop all local transactions */
if(stopTransactions(TRANS_BEFORE,epoch_num) < 0)
return -1;
-// printf("After Stop transaction\n");
+ printf("After Stop transaction\n");
// grab leader's transaction list first
tlist_node_t* walker = transList->head;
walker = walker->next;
}
-// printf("%s -> Local Transactions\n",__func__);
-// tlistPrint(currentTransactionList);
+ printf("%s -> Local Transactions\n",__func__);
+ tlistPrint(currentTransactionList);
for(i = 0; i < numHostsInSystem; i++)
{
if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
continue;
-// printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+ printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
if(recv_data(sdlist[i],&response,sizeof(char)) < 0)
{
printf("Here\n");
if(response == RESPOND_TRANS_WAIT)
{
-// printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
+ printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
int timeout1 = computeLiveHosts(sdlist[i]);
-// printf("%s -> received host list\n",__func__);
+ printf("%s -> received host list\n",__func__);
int timeout2 = makeTransactionLists(&currentTransactionList,sdlist[i],epoch_num);
-// printf("%s -> received transaction list\n",__func__);
+ printf("%s -> received transaction list\n",__func__);
// receive live host list // receive transaction list
if(timeout1 < 0 || timeout2 < 0) {
pthread_mutex_lock(&translist_mutex);
pthread_mutex_unlock(&translist_mutex);
return -2;
}
- // tlistPrint(currentTransactionList);
+ tlistPrint(currentTransactionList);
}
else if(response == RESPOND_HIGHER_EPOCH)
{
}
*tList = currentTransactionList;
-// printf("%s -> Exit\n",__func__);
+ printf("%s -> Exit\n",__func__);
return 0;
}
tlist_node_t* tNode = &transArray[j];
tNode->status = TRANS_OK;
-// printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision);
+ printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision);
*tlist = tlistInsertNode2(*tlist,&(transArray[j]),epoch_num);
}
else {
pthread_mutex_lock(&recovery_mutex);
if(epoch_num < currentEpoch) {
flag = -1;
- }/*
- else if(epoch_num > currentEpoch) {
-// printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num);
-// currentEpoch = epoch_num;
- }*/
+ }
+ else if((epoch_num > currentEpoch) && strcmp(f,"REQUEST_TRANS_WAIT")==0) {
+ printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num);
+ currentEpoch = epoch_num;
+ }
pthread_mutex_unlock(&recovery_mutex);
return flag;
// Allocates the next object ID from the file-static counter `id` below.
// The added (+) lines advance `id` by 2 and, when the result is above oidMax
// or below oidMin, restart it at (oidMin | 1); the removed (-) lines performed
// the same computation inside a CAS32 retry loop.
// NOTE(review): the added lines take no lock themselves. In this patch
// transCreateObj calls getNewOID while holding oidlock, and the visible
// initialization of oidlock sits inside #ifdef RECOVERY; other callers are
// not visible here -- confirm every caller serializes access to `id`.
//TODO: when reusing oids, make sure they are not already in use!
static unsigned int id = 0xFFFFFFFF;
unsigned int getNewOID(void) {
- do {
- unsigned int origid=id;
- unsigned int newid=id+2;
- if (newid> oidMax || newid < oidMin) {
- newid=oidMin | 1;
- }
- if (CAS32(&id, origid, newid)==origid)
- return newid;
- } while(1);
+ id += 2; // plain read-modify-write on the shared counter (no CAS, no lock here)
+ if (id > oidMax || id < oidMin) { // out of [oidMin, oidMax]: restart the sequence
+ id = (oidMin | 1);
+ }
+ return id;
}
#ifdef RECOVERY
static unsigned int tid = 0xFFFFFFFF;
+/*
+ * Hands out the next transaction ID from the file-static counter `tid`.
+ * Under tidlock the counter advances by 2; a value above transIDMax or
+ * below transIDMin restarts it at (transIDMin | 1).
+ * Returns the value the counter held while the lock was still owned.
+ */
unsigned int getNewTransID(void) {
- do {
- unsigned int origtid=tid;
- unsigned int newtid=tid+2;
- if (newtid>transIDMax || newtid < transIDMin) {
- newtid=transIDMin | 1;
- }
- if (CAS32(&tid, origtid, newtid)==origtid)
- return newtid;
- } while(1);
+ unsigned int newtid;
+ pthread_mutex_lock(&tidlock);
+ tid+=2;
+ if (tid > transIDMax || tid < transIDMin) {
+ tid = (transIDMin | 1);
+ }
+ // Snapshot before unlocking: re-reading tid after the unlock could return
+ // a value another thread has already advanced, i.e. a duplicate ID.
+ newtid = tid;
+ pthread_mutex_unlock(&tidlock);
+ return newtid;
}
#endif