void updateLiveHostsList(int mid);
int updateLiveHostsCommit();
void receiveNewHostLists(int accept);
-void stopTransactions(int TRANS_FLAG);
+int stopTransactions(int TRANS_FLAG,unsigned int epoch_num);
void sendMyList(int);
void sendTransList(int acceptfd);
int receiveTransList(int acceptfd);
int combineTransactionList(tlist_node_t* tArray,int size);
-char inspectTransaction(char control,unsigned int transid,char* debug,int TRANS_FLAG);
void respondToLeader();
void setLocateObjHosts();
void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num);
int* getSocketLists();
void freeSocketLists(int*);
-int inspectEpoch(unsigned int);
+int inspectEpoch(unsigned int,const char*);
int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t**);
int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t*);
int duplicateLostObjects(unsigned int epoch_num,int* sdlist);
void restartTransactions(unsigned int epoch_num,int* sdlist);
-int makeTransactionLists(tlist_t**,int);
+int makeTransactionLists(tlist_t**,int sd,unsigned int epoch_num);
int computeLiveHosts(int);
void waitForAllMachine();
int readDuplicateObjs(int);
int readClientReq(trans_commit_data_t *, int);
int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
+void commitObjects(char control,fixed_data_t* fixed,trans_commit_data_t* transinfo,void* modptr,unsigned int* oidmod,int acceptfd);
char checkDecision(unsigned int);
char receiveDecisionFromBackup(unsigned int,int,unsigned int*);
char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
objheader_t *transCreateObj(unsigned int); //returns oid header
unsigned int locateBackupMachine(unsigned int oid);
int transCommit(); //return 0 if successful
+void commitMessages(unsigned int epoch_num,int* sdlist,unsigned int deadsd,int pilecount,trans_req_data_t* tosend,char finalResponse,char treplyretry,trans_commit_data_t transinfo);
void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins
char decideResponse(char *, char *, int); // Coordinator decides what response to send to the participant
void *getRemoteObj(unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine
extern int numLiveHostsInSystem;
int clearNotifyListFlag;
pthread_mutex_t clearNotifyList_mutex;
+pthread_mutex_t translist_mutex;
tlist_t* transList;
int okCommit; // machine flag
pthread_mutex_init(&liveHosts_mutex, NULL);
pthread_mutex_init(&recovery_mutex, NULL);
pthread_mutex_init(&clearNotifyList_mutex,NULL);
+ pthread_mutex_init(&translist_mutex,NULL);
#endif
if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
while(1) {
int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
//int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
+// printf("%s -> Received control = %d\n",__func__,control);
dupeptr = NULL;
if (ret==0)
#endif
#ifdef RECOVERY
case REQUEST_TRANS_WAIT:
- {
+ {
unsigned int new_leader_index;
recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
recv_data((int)acceptfd,&new_leader_index,sizeof(unsigned int));
- if(inspectEpoch(epoch_num) < 0) {
+ if(inspectEpoch(epoch_num,"REQUEST_TRANS_WAIT") < 0) {
response = RESPOND_HIGHER_EPOCH;
send_data((int)acceptfd,&response,sizeof(char));
}
else {
printf("Got new Leader! : %d\n",epoch_num);
-
- stopTransactions(TRANS_BEFORE);
-
pthread_mutex_lock(&recovery_mutex);
currentEpoch = epoch_num;
+ okCommit = TRANS_BEFORE;
leader_index = new_leader_index;
pthread_mutex_unlock(&recovery_mutex);
-
- response = RESPOND_TRANS_WAIT;
- send_data((int)acceptfd,&response,sizeof(char));
- sendMyList((int)acceptfd);
+ if(stopTransactions(TRANS_BEFORE,epoch_num) < 0) {
+ response = RESPOND_HIGHER_EPOCH;
+ send_data((int)acceptfd,&response,sizeof(char));
+ }
+ else {
+ response = RESPOND_TRANS_WAIT;
+ send_data((int)acceptfd,&response,sizeof(char));
+ sendMyList((int)acceptfd);
+ }
}
}
break;
{
recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
- if(inspectEpoch(epoch_num) < 0) {
+ if(inspectEpoch(epoch_num,"RELEASE_NEW_LIST") < 0) {
response = RESPOND_HIGHER_EPOCH;
}
else
{
response = receiveNewList((int)acceptfd);
- stopTransactions(TRANS_AFTER);
+ if(stopTransactions(TRANS_AFTER,epoch_num) < -1)
+ response = RESPOND_HIGHER_EPOCH;
}
+ printf("After stop transaction\n");
send_data((int)acceptfd,&response,sizeof(char));
}
break;
recv_data((int)acceptfd,&epoch_num,sizeof(char));
- if(inspectEpoch(epoch_num) < 0) break;
+ if(inspectEpoch(epoch_num,"REQUEST_TRANS_RESTART") < 0) break;
pthread_mutex_lock(&liveHosts_mutex);
printf("RESTART!!!\n");
recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
- if(inspectEpoch(epoch_num) < 0) {
+ if(inspectEpoch(epoch_num,"REQUEST_DUPLICATE") < 0) {
break;
}
return -1;
}
- printf("%s -> PAss this point\n",__func__);
ptr = dupeptr;
- printf("%s -> numoid = %u\n",__func__,numoid);
for(i = 0; i < numoid; i++) {
header = (objheader_t *)ptr;
oid = OID(header);
fixed.control = TRANS_REQUEST;
timeout = recv_data((int)acceptfd, ptr+1, size);
-#ifdef RECOVERY
- transList = tlistInsertNode(transList,fixed.transid,TRYING_TO_COMMIT,TRANS_OK);
-#endif
-
/* Read list of mids */
int mcount = fixed.mcount;
size = mcount * sizeof(unsigned int);
char control, sendctrl, retval;
objheader_t *tmp_header;
- void *header;
- int i = 0, val;
+ int i = 0;
+ unsigned int epoch_num;
+ tlist_node_t* tNode;
+#ifdef DEBUG
+ printf("%s -> Enter\n",__func__);
+#endif
+// printf("%s -> transID : %u\n",__func__,fixed->transid);
+ if(inspectEpoch(fixed->epoch_num,"procesClient1") < 0) {
+ printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num);
+ control = RESPOND_HIGHER_EPOCH;
+ send_data((int)acceptfd,&control,sizeof(char));
+ }
/* Send reply to the Coordinator */
- if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
+ else if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
printf("DEBUG-> Exiting processClientReq, line = %d\n", __LINE__);
return 1;
}
// printf("%s -> Waiting for transID : %u\n",__func__,fixed->transid);
+ int timeout1 = recv_data((int)acceptfd, &control, sizeof(char));
+ int timeout2 = recv_data((int)acceptfd, &epoch_num, sizeof(unsigned int));
- int timeout = recv_data((int)acceptfd, &control, sizeof(char));
-
-#ifdef RECOVERY
- if(timeout < 0) { // timeout. failed to receiving data from coordinator
- control = -1;
+ if(timeout1 < 0 || timeout2 < 0) { // timeout. failed to receiving data from coordinator
+ control = DECISION_LOST;
}
+
+ pthread_mutex_lock(&translist_mutex);
+ transList = tlistInsertNode(transList,fixed->transid,control,TRYING_TO_COMMIT,epoch_num);
+ pthread_mutex_unlock(&translist_mutex);
+
+ pthread_mutex_lock(&translist_mutex);
+ tNode = tlistSearch(transList,fixed->transid);
+ pthread_mutex_unlock(&translist_mutex);
+
// check if it is allowed to commit
- control = inspectTransaction(control,fixed->transid,"processClientReq",TRANS_BEFORE);
- thashInsert(fixed->transid, control);
+ do {
+ tNode->status = TRANS_INPROGRESS;
+ if(okCommit != TRANS_BEFORE) {
+ if(inspectEpoch(tNode->epoch_num,"processCleint2") > 0) {
+ tNode->status = TRANS_INPROGRESS;
+ thashInsert(fixed->transid,tNode->decision);
+ commitObjects(tNode->decision,fixed,transinfo,modptr,oidmod,acceptfd);
+ tNode->status = TRANS_AFTER;
+ }
+ if(okCommit == TRANS_AFTER) {
+ printf("%s -> 11 \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+ sleep(3);
+ }
+ }
+ else {
+ tNode->status = TRYING_TO_COMMIT;
+ printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+ sleep(3);
+ randomdelay();
+ }
+
+ }while(tNode->status != TRANS_AFTER);
+
+ if(okCommit == TRANS_AFTER)
+ {
+ printf("%s -> TRANS_AFTER!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+ printf("%s -> Before removing\n",__func__);
+ }
+
+
+ pthread_mutex_lock(&translist_mutex);
+ transList = tlistRemove(transList,fixed->transid);
+ pthread_mutex_unlock(&translist_mutex);
+
+ if(okCommit == TRANS_AFTER)
+ printf("%s -> After removing\n",__func__);
+
+ /* Free memory */
+ if (transinfo->objlocked != NULL) {
+ free(transinfo->objlocked);
+ }
+ if (transinfo->objnotfound != NULL) {
+ free(transinfo->objnotfound);
+ }
+#ifdef DEBUG
+ printf("%s-> Exit\n", __func__);
#endif
+ return 0;
+}
+
+void commitObjects(char control,fixed_data_t* fixed,trans_commit_data_t* transinfo,void* modptr,unsigned int* oidmod,int acceptfd)
+{
+ void *header;
+ int val;
+ int i;
+
switch(control) {
- case TRANS_ABORT:
- if (fixed->nummod > 0)
- free(modptr);
- /* Unlock objects that was locked due to this transaction */
- int useWriteUnlock = 0;
+ case TRANS_ABORT:
+ if (fixed->nummod > 0)
+ free(modptr);
+ /* Unlock objects that was locked due to this transaction */
+ int useWriteUnlock = 0;
for(i = 0; i< transinfo->numlocked; i++) {
- if(transinfo->objlocked[i] == -1) {
- useWriteUnlock = 1;
- continue;
+ if(transinfo->objlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
}
if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
- printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
- return 1;
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
+ printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
+ exit(0);
+ return ;
}
if(useWriteUnlock) {
- write_unlock(STATUSPTR(header));
+ write_unlock(STATUSPTR(header));
} else {
- read_unlock(STATUSPTR(header));
- }
+ read_unlock(STATUSPTR(header));
+ }
}
break;
-
- case TRANS_COMMIT:
+ case TRANS_COMMIT:
/* insert received control into thash for another transaction*/
/* Invoke the transCommit process() */
- if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
- printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
+ if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
+ printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
/* Free memory */
if (transinfo->objlocked != NULL) {
- free(transinfo->objlocked);
+ free(transinfo->objlocked);
}
if (transinfo->objnotfound != NULL) {
- free(transinfo->objnotfound);
- }
- printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
- return 1;
+ free(transinfo->objnotfound);
+ }
+ printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
+ exit(0);
+ return;
}
-
-
- break;
-
- case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
- break;
-
- default:
- printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
- //TODO Use fixed.trans_id TID since Client may have died
- break;
- }
-
-#ifdef RECOVERY
- inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
-
- tlist_node_t* tNode = tlistSearch(transList,fixed->transid);
- tNode->status = TRANS_OK;
-
- pthread_mutex_lock(&clearNotifyList_mutex);
- transList = tlistRemove(transList,fixed->transid);
- pthread_mutex_unlock(&clearNotifyList_mutex);
-
-#endif
-
- /* Free memory */
- if (transinfo->objlocked != NULL) {
- free(transinfo->objlocked);
- }
- if (transinfo->objnotfound != NULL) {
- free(transinfo->objnotfound);
+ break;
+ case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
+ break;
+ default:
+ printf("%s : No response to TRANS_AGREE OR DISAGREE protocol - transID = %u, control = %d\a\n",__func__,fixed->transid);
+ //TODO Use fixed.trans_id TID since Client may have died
+ break;
}
-#ifdef DEBUG
- printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
-#endif
-
- return 0;
-}
-
-#ifdef RECOVERY
+
+ return;
+}
+
+
+
char checkDecision(unsigned int transID)
{
#ifdef DEBUG
else
return response;
}
-#endif
/* This function increments counters while running a voting decision on all objects involved
* in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
unsigned int oid;
unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
objheader_t *headptr;
+#ifdef DEBUG
+ printf("%s -> Enter\n",__func__);
+#endif
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
offset += size;
}
#endif
- /*
- if (objlocked > 0) {
- int useWriteUnlock = 0;
- for(j = 0; j < objlocked; j++) {
- if(oidlocked[j] == -1) {
- useWriteUnlock = 1;
- continue;
- }
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- if(useWriteUnlock) {
- write_unlock(STATUSPTR(headptr));
- } else {
- read_unlock(STATUSPTR(headptr));
- }
- }
- free(oidlocked);
- }
- */
#ifdef DEBUG
printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
printf("control = %d\n",control);
control=TRANS_DISAGREE;
+ printf("%s -> Sent message!\n",__func__);
send_data(acceptfd, &control, sizeof(char));
#ifdef CACHE
send_data(acceptfd, &numBytes, sizeof(int));
printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
return 0;
}
+#ifdef DEBUG
+ printf("%s -> Exit\n",__func__);
+#endif
return control;
}
unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
int val;
char control = 0;
+#ifdef DEBUG
+ printf("%s -> Enter\n",__func__);
+#endif
/* Condition to send TRANS_AGREE */
if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
#endif
/* Send control message */
send_data(acceptfd, &control, sizeof(char));
-
-
- /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
- /*if(*(objnotfound) != 0) {
- int msg[1];
- msg[0] = *(objnotfound);
- send_data(acceptfd, &msg, sizeof(int));
- int size = sizeof(unsigned int)* *(objnotfound);
- send_data(acceptfd, oidnotfound, size);
- }*/
}
/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
transinfo->modptr = modptr;
transinfo->numlocked = *(objlocked);
transinfo->numnotfound = *(objnotfound);
+
+#ifdef DEBUG
+ printf("%s -> Exit\n",__func__);
+#endif
return control;
}
}
/* wait until all transaction waits for leader's decision */
-void stopTransactions(int TRANS_FLAG)
+int stopTransactions(int TRANS_FLAG,unsigned int epoch_num)
{
// printf("%s - > Enter flag :%d\n",__func__,TRANS_FLAG);
int size = transList->size;
int i;
+ int flag;
tlist_node_t* walker;
-
- pthread_mutex_lock(&liveHosts_mutex);
- okCommit = TRANS_FLAG;
- pthread_mutex_unlock(&liveHosts_mutex);
- /* make sure that all transactions are stopped */
- pthread_mutex_lock(&clearNotifyList_mutex);
+ if(TRANS_FLAG == TRANS_BEFORE) {
+ okCommit = TRANS_BEFORE;
+ /* make sure that all transactions are stopped */
+ do {
+ transList->flag = 0;
+ walker = transList->head;
- do {
- transList->flag = 0;
- walker = transList->head;
+ while(walker)
+ {
+ // locking
+ while(walker->status == TRANS_INPROGRESS) {
+ printf("%s ->transid : %u - decision %d Status : %d Waitflag = %d\n",__func__,walker->transid,walker->decision,walker->status,TRANS_FLAG);
+ if(inspectEpoch(epoch_num,"stopTrans_Before") < 0)
+ return -1;
+ sleep(3);
+ }
+ walker = walker->next;
+ }
- while(walker)
- {
- // locking
- while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) {
-// printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
- randomdelay();
+ pthread_mutex_lock(&translist_mutex);
+ flag = transList->flag;
+ pthread_mutex_unlock(&translist_mutex);
+ }while(flag == 1);
+ }
+ else if(TRANS_FLAG == TRANS_AFTER)
+ {
+ printf("%s -> TRANS_AFTER\n",__func__);
+ okCommit = TRANS_AFTER;
+ do {
+ pthread_mutex_lock(&translist_mutex);
+ size = transList->size;
+ printf("%s -> size = %d\n",__func__,size);
+ printf("%s -> okCommit = %d\n",__func__,okCommit);
+ walker = transList->head;
+ while(walker){
+ printf("%s ->transid : %u - decision %d Status : %d epoch = %u current epoch : %u\n",__func__,walker->transid,walker->decision,walker->status,walker->epoch_num,currentEpoch);
+ walker = walker->next;
}
+ pthread_mutex_unlock(&translist_mutex);
- walker = walker->next;
- }
- }while(transList->flag == 1);
+ if(inspectEpoch(epoch_num,"stopTrans_Before") < 0)
+ return -1;
- pthread_mutex_unlock(&clearNotifyList_mutex);
+ sleep(3);
+ }while(size != 0);
+ }
+
+ return 0;
}
void sendMyList(int acceptfd)
void sendTransList(int acceptfd)
{
+ printf("%s -> Enter\n",__func__);
int size;
char response;
int transid;
+ int i;
+ tlist_node_t* walker = transList->head;
// send on-going transaction
+ pthread_mutex_lock(&translist_mutex);
tlist_node_t* transArray = tlistToArray(transList,&size);
+ pthread_mutex_unlock(&translist_mutex);
-/* if(transList->size != 0)
+ if(transList->size != 0)
tlistPrint(transList);
printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size);
-*/
+
+ for(i = 0; i< size; i++) {
+ printf("ID : %u Decision : %d status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status);
+ }
+ printf("%s -> End transArray\n",__func__);
+
send_data((int)acceptfd,&size,sizeof(int));
send_data((int)acceptfd,transArray, sizeof(tlist_node_t) * size);
int receiveNewList(int acceptfd)
{
+ printf("%s -> Enter\n",__func__);
int size;
tlist_node_t* tArray;
tlist_node_t* walker;
response = -1;
}
+ printf("%s -> Exit\n",__func__);
return response;
}
if(walker->transid == tArray[i].transid)
{
walker->decision = tArray[i].decision;
-// walker->status = tArray[i].status;
+ walker->epoch_num = tArray[i].epoch_num;
break;
}
}
return flag;
}
-char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int TRANS_FLAG)
-{
- tlist_node_t* tNode;
-
- tNode = tlistSearch(transList,transid);
-
- if(finalResponse <= 0) {
- tNode->decision = DECISION_LOST;
- }
- else {
- tNode->decision = finalResponse;
- }
-
-// printf("%s -> decision = %d okCommit = %d\n",__func__,tNode->decision,okCommit);
-
- if((tNode->decision == DECISION_LOST) || (okCommit != TRANS_OK))
- {
- pthread_mutex_lock(&liveHosts_mutex);
- tNode->status = TRANS_FLAG;
- pthread_mutex_unlock(&liveHosts_mutex);
-
- // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
- while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) {
-// printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
- randomdelay();
- }
-
- finalResponse = tNode->decision;
- }
-
- return finalResponse;
-}
-
#endif
char ip[16]; // for debugging purpose
extern tlist_t* transList;
+extern pthread_mutex_t translist_mutex;
extern pthread_mutex_t clearNotifyList_mutex;
unsigned int currentEpoch;
numbytes = recv(fd, buffer, size, 0);
bytesRecv += numbytes;
+
if (numbytes>0) {
buffer += numbytes;
size -= numbytes;
return -2;
}
} else {
+// printf("%s -> Here?\n",__func__);
+// printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno));
//Case: numbytes==0
//machine has failed -- this case probably doesn't occur in reality
//
int deadsd = -1;
int deadmid = -1;
unsigned int transID = getNewTransID();
+ unsigned int epoch_num;
+ tlist_node_t* tNode;
#endif
#ifdef DEBUG
removetransactionhash();
objstrDelete(t_cache);
t_chashDelete();
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
#endif
return 1;
// sleep(1);
randomdelay();
}
-
- transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK);
#endif
-
+
int treplyretryCount = 0;
/* Initialize timeout for exponential delay */
exponential_backoff.tv_sec = 0;
do {
treplyretry = 0;
+ pthread_mutex_lock(&recovery_mutex);
+ epoch_num = currentEpoch;
+ pthread_mutex_unlock(&recovery_mutex);
+
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
if (firsttime) {
/* Create a socket and getReplyCtrl array, initialize */
int socklist[pilecount];
+ unsigned int midlist[pilecount];
char getReplyCtrl[pilecount];
int loopcount;
for(loopcount = 0 ; loopcount < pilecount; loopcount++) {
trans_req_data_t *tosend;
tosend = calloc(pilecount, sizeof(trans_req_data_t));
+// printf("%s -> transID : %u Start!\n",__func__,transID);
+
while(pile != NULL) {
#ifdef DEBUG
printf("%s-> New pile:[%s],", __func__, midtoIPString(pile->mid));
tosend[sockindex].f.nummod = pile->nummod;
tosend[sockindex].f.numcreated = pile->numcreated;
tosend[sockindex].f.sum_bytes = pile->sum_bytes;
- tosend[sockindex].f.epoch_num = currentEpoch;
+ tosend[sockindex].f.epoch_num = epoch_num;
tosend[sockindex].listmid = listmid;
tosend[sockindex].objread = pile->objread;
tosend[sockindex].oidmod = pile->oidmod;
tosend[sockindex].oidcreated = pile->oidcreated;
- int sd = 0;
+ midlist[sockindex] = pile->mid; // debugging purpose
+
+ int sd = 0;
if(pile->mid != myIpAddr) {
if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
printf("\ntransRequest(): socket create error\n");
free(listmid);
free(tosend);
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
#endif
return 1;
send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
//forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
+
free(modptr);
} else { //handle request locally
handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
/* Recv Ctrl msgs from all machines */
#ifdef DEBUG
- printf("%s-> Finished sending transaction read/mod objects\n",__func__);
+ printf("%s-> Finished sending transaction read/mod objects transID = %u\n",__func__,transID);
#endif
int i;
if(sd != 0) {
char control;
int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
+// printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
timeout = recv_data(sd, &control, sizeof(char));
-// printf("i = %d control = %d\n",i,control);
-
-
+// printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
//Update common data structure with new ctrl msg
getReplyCtrl[i] = control;
/* Recv Objects if participant sends TRANS_DISAGREE */
GETSIZE(size, header);
size += sizeof(objheader_t);
//make an entry in prefetch hash table
- prehashInsert(oidToPrefetch, header);
+ prehashInsert(oidToPrefetch, header);
length = length - size;
offset += size;
}
} //end of receiving objs
#endif
-
+
+// printf("%s -> Pass this point2\n",__func__);
#ifdef RECOVERY
if(timeout < 0) {
deadmid = listmid[i];
#ifdef RECOVERY
// wait until leader fix the system
- if(okCommit != TRANS_OK) {
- inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_BEFORE);
+ if(finalResponse == RESPOND_HIGHER_EPOCH) {
+ printf("%s -> Received Higher epoch\n",__func__);
finalResponse = TRANS_ABORT;
treplyretry = 0;
}
#endif
+// printf("%s -> transID = %u Passed this point\n",__func__,transID);
#ifdef CACHE
if (finalResponse == TRANS_COMMIT) {
}
#endif
- /* Send responses to all machines */
- for(i = 0; i < pilecount; i++) {
- int sd = socklist[i];
-#ifdef RECOVERY
- if(sd != deadsd) {
-#endif
- if(sd != 0) {
-#ifdef CACHE
- if(finalResponse == TRANS_COMMIT) {
- int retval;
- /* Update prefetch cache */
- if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- free(tosend);
- free(listmid);
- return 1;
- }
-
-#ifdef ABORTREADERS
- removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
-#endif
- }
-#ifdef ABORTREADERS
- else if (!treplyretry) {
- removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
- }
-#endif
-#endif
- send_data(sd,&finalResponse,sizeof(char));
- } else {
- /* Complete local processing */
- finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
-
-#ifdef ABORTREADERS
- if(finalResponse == TRANS_COMMIT) {
- removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
- } else if (!treplyretry) {
- removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
- }
-#endif
- }
-#ifdef RECOVERY
- }
-#endif
+ if(finalResponse == TRANS_COMMIT) {
+ pthread_mutex_lock(&translist_mutex);
+ transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRYING_TO_COMMIT,epoch_num);
+ tNode = tlistSearch(transList,transID);
+ pthread_mutex_unlock(&translist_mutex);
+
+ tNode->decision = finalResponse;
+ tNode->status = TRANS_INPROGRESS;
+ if(okCommit == TRANS_OK && inspectEpoch(epoch_num,"TRANS_COMMIT") > 0)
+ {
+ finalResponse = tNode->decision;
+ thashInsert(transID,tNode->decision);
+ commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,tNode->decision,treplyretry,transinfo);
+ tNode->status = TRANS_AFTER;
+ }
+ else {
+ tNode->status = TRYING_TO_COMMIT;
+ if(inspectEpoch(epoch_num,"TRANS_COMMIT2") > 0) {
+// treplyretry = 1;
+ }
+ finalResponse = TRANS_ABORT;
+ commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,finalResponse,treplyretry,transinfo);
}
-
- for(i = 0; i< pilecount; i++) {
+
+ //=========== after transaction point
+ pthread_mutex_lock(&translist_mutex);
+ transList = tlistRemove(transList,transID);
+ pthread_mutex_unlock(&translist_mutex);
+ }
+ else {
+ commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,finalResponse,treplyretry,transinfo);
+ }
+
+ for(i = 0; i< pilecount; i++) {
if(socklist[i] > 0) {
freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]);
}
}
-
- /* Free resources */
- free(tosend);
- free(listmid);
- if (!treplyretry)
- pDelete(pile_ptr);
- /* wait a random amount of time before retrying to commit transaction*/
- if(treplyretry) {
- //treplyretryCount++;
- //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
- // exponentialdelay();
- //else
+ /* Free resources */
+ free(tosend);
+ free(listmid);
+ if (!treplyretry)
+ pDelete(pile_ptr);
+ /* wait a random amount of time before retrying to commit transaction*/
+ if(treplyretry) {
randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
} while (treplyretry && deadmid != -1);
#ifdef RECOVERY
- //=========== after transaction point
- tlist_node_t* tNode = tlistSearch(transList,transID);
- inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER);
- tNode->status = TRANS_OK;
- finalResponse = tNode->decision;
- pthread_mutex_lock(&clearNotifyList_mutex);
- transList = tlistRemove(transList,transID);
- pthread_mutex_unlock(&clearNotifyList_mutex);
+
+
#endif
if(finalResponse == TRANS_ABORT) {
objstrDelete(t_cache);
t_chashDelete();
#ifdef RECOVERY
- if(deadmid != -1) { /* if deadmid is greater than or equal to 0,
- then there is dead machine. */
+ if(deadmid != -1) { /* if deadmid is greater than or equal to 0, then there is dead machine. */
notifyLeaderDeadMachine(deadmid);
}
#endif
return 0;
}
+void commitMessages(unsigned int epoch_num,int* socklist,unsigned int deadsd,int pilecount,trans_req_data_t* tosend,char finalResponse,char treplyretry,trans_commit_data_t transinfo ) {
+ int i;
+ /* Send responses to all machines */
+ for(i = 0; i < pilecount; i++) {
+ int sd = socklist[i];
+#ifdef RECOVERY
+ if(sd != deadsd) {
+#endif
+ if(sd != 0) {
+#ifdef CACHE
+ if(finalResponse == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+// free(tosend);
+ // free(listmid);
+ exit(0);
+// return 1;
+ }
+#ifdef ABORTREADERS
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
+#endif
+ }
+#ifdef ABORTREADERS
+ else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ }
+#endif
+#endif
+ send_data(sd,&finalResponse,sizeof(char));
+ send_data(sd,&epoch_num,sizeof(unsigned int));
+ } else {
+ /* Complete local processing */
+ finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+#ifdef ABORTREADERS
+ if(finalResponse == TRANS_COMMIT) {
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ } else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ }
+#endif
+ }
+#ifdef RECOVERY
+ }
+#endif
+ }
+}
+
/* This function handles the local objects involved in a transaction
* commiting process. It also makes a decision if this local machine
* sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
}
char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
-
-#ifdef RECOVERY
- finalResponse = inspectTransaction(finalResponse,tdata->f.transid,"Local Commit",TRANS_BEFORE);
- thashInsert(tdata->f.transid,finalResponse);
-#endif
-
if(finalResponse == TRANS_ABORT) {
if(transAbortProcess(transinfo) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
return;
}
} else {
- printf("ERROR...No Decision\n");
+ printf("%s -> ERROR...No Decision transID = %u finalResponse = %d\a\n",__func__,tdata->f.transid,finalResponse);
}
char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
+
+ int higher_epoch_num=0;
for (i = 0 ; i < pilecount; i++) {
char control;
control = getReplyCtrl[i];
printf("%s-> Participant sent TRANS_SOFT_ABORT, i:%d, Control: %d\n", __func__, i, (int)control);
#endif
break;
+ case RESPOND_HIGHER_EPOCH:
+ higher_epoch_num++;
+#ifdef DEBUG
+ printf("%s-> Participant sent TRANS_DISAGREE, i:%d, Control: %d\n", __func__, i, (int)control);
+#endif
+ break;
}
}
+ if(higher_epoch_num > 0)
+ return RESPOND_HIGHER_EPOCH;
+
+
if(transdisagree > 0) {
/* Send Abort */
*treplyretry = 0;
unsigned int epoch_num;
if(!liveHosts[findHost(deadHost)]) { // if it is already fixed
- printf("%s -> already fixed\n",__func__);
+// printf("%s -> already fixed\n",__func__);
sleep(WAIT_TIME);
return;
}
// increase epoch number by number machines in the system
pthread_mutex_lock(&recovery_mutex);
epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray);
+ okCommit = TRANS_BEFORE;
pthread_mutex_unlock(&recovery_mutex);
// notify all machines that this machien will act as leader.
/* Leader's role */
void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
{
- printf("%s -> Entering\n",__func__);
int* sdlist;
tlist_t* tList;
int flag = 0;
#ifdef RECOVERYSTATS
- printf("Recovery Start\n");
+// printf("Recovery Start\n");
long long st;
long long fi;
unsigned int dupeSize = 0; // to calculate the size of backed up data
do {
sdlist = getSocketLists();
- printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+ printf("%s -> I'm currently leader num : %d ping machines\n\n",__func__,epoch_num);
if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
- printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+ pthread_mutex_lock(&translist_mutex);
+ tlistPrint(tList);
+ pthread_mutex_unlock(&translist_mutex);
+ getchar();
+ printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num);
if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break;
-
+ getchar();
+ printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num);
// transfer lost objects
if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break;
printRecoveryStat();
#endif
}
- printf("%s -> Exiting\n",__func__);
}
int* getSocketLists()
char response;
tlist_t* currentTransactionList;
- if(inspectEpoch(epoch_num) < 0) {
+ if(inspectEpoch(epoch_num,__func__) < 0) {
printf("%s -> Higher Epoch\n",__func__);
return -1;
}
if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
continue;
+ printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
request = REQUEST_TRANS_WAIT;
send_data(sdlist[i],&request, sizeof(char));
send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int));
}
+ printf("%s -> Stop transaction\n",__func__);
/* stop all local transactions */
- stopTransactions(TRANS_BEFORE);
+ if(stopTransactions(TRANS_BEFORE,epoch_num) < 0)
+ return -1;
+ printf("After Stop transaction\n");
// grab leader's transaction list first
tlist_node_t* walker = transList->head;
-
+
while(walker) {
- walker->status = TRANS_OK;
- currentTransactionList = tlistInsertNode2(currentTransactionList,walker);
+ pthread_mutex_lock(&translist_mutex);
+ currentTransactionList = tlistInsertNode2(currentTransactionList,walker,epoch_num);
+ pthread_mutex_unlock(&translist_mutex);
walker = walker->next;
}
+// printf("%s -> Local Transactions\n",__func__);
+// tlistPrint(currentTransactionList);
+
for(i = 0; i < numHostsInSystem; i++)
{
if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
continue;
+ printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
if(recv_data(sdlist[i],&response,sizeof(char)) < 0)
{
+ printf("Here\n");
+ pthread_mutex_lock(&translist_mutex);
tlistDestroy(currentTransactionList);
+ pthread_mutex_unlock(&translist_mutex);
return -2;
}
+ printf("recevied response = %d\n",response);
if(response == RESPOND_TRANS_WAIT)
{
+ printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
+ int timeout1 = computeLiveHosts(sdlist[i]);
+ printf("%s -> received host list\n",__func__);
+ int timeout2 = makeTransactionLists(¤tTransactionList,sdlist[i],epoch_num);
+ printf("%s -> received transaction list\n",__func__);
// receive live host list // receive transaction list
- if(computeLiveHosts(sdlist[i]) < 0 || makeTransactionLists(¤tTransactionList,sdlist[i]) < 0) {
+ if(timeout1 < 0 || timeout2 < 0) {
+ pthread_mutex_lock(&translist_mutex);
tlistDestroy(currentTransactionList);
+ pthread_mutex_unlock(&translist_mutex);
return -2;
}
+ printf("\n\n\nAfter mid : %s \n",midtoIPString(hostIpAddrs[i]));
+ tlistPrint(currentTransactionList);
}
else if(response == RESPOND_HIGHER_EPOCH)
{
+ printf("%s -> RESPOND_HIGHER_EPOCH\n",__func__);
+ pthread_mutex_lock(&translist_mutex);
tlistDestroy(currentTransactionList);
+ pthread_mutex_unlock(&translist_mutex);
return -1;
}
else {
}
walker = walker->next;
}
-
- tlistPrint(currentTransactionList);
*tList = currentTransactionList;
printf("%s -> Exit\n",__func__);
int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
{
+ printf("%s -> Enter\n",__func__);
int i;
char response = RELEASE_NEW_LIST;
int size;
tlist_node_t* tArray;
- if(inspectEpoch(epoch_num) < 0) return -1;
+ if(inspectEpoch(epoch_num,__func__) < 0) return -1;
tArray = tlistToArray(tlist,&size);
printf("%s -> problem\n",__func__);
exit(0);
}
- stopTransactions(TRANS_AFTER);
+ if(stopTransactions(TRANS_AFTER,epoch_num) < 0)
+ return -1;
}
}
// after this fuction
// leader knows all the on-going transaction list and their decisions
-int makeTransactionLists(tlist_t** tlist,int sd)
+int makeTransactionLists(tlist_t** tlist,int sd,unsigned int epoch_num)
{
tlist_node_t* transArray;
tlist_node_t* tmp;
tlist_node_t* walker;
int j;
+ int i;
int size;
// receive all on-going transaction list
return -2;
}
+ printf("%s -> Received TransArray\n",__func__);
+ for(i = 0; i< size; i++) {
+ printf("ID : %u Decision : %d status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status);
+ }
+ printf("%s -> End transArray\n",__func__);
+
// add into currentTransactionList
for(j = 0 ; j < size; j ++) {
tmp = tlistSearch(*tlist,transArray[j].transid);
if(tmp == NULL) {
tlist_node_t* tNode = &transArray[j];
tNode->status = TRANS_OK;
- *tlist = tlistInsertNode2(*tlist,&(transArray[j]));
+
+ printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision);
+ *tlist = tlistInsertNode2(*tlist,&(transArray[j]),epoch_num);
}
else {
if(tmp->decision == DECISION_LOST && transArray[j].decision != DECISION_LOST)
}
}
-int inspectEpoch(unsigned int epoch_num)
+int inspectEpoch(unsigned int epoch_num,const char* f)
{
int flag = 1;
+
+// printf("%s -> current epoch %u epoch num = %u\n",__func__,currentEpoch,epoch_num);
pthread_mutex_lock(&recovery_mutex);
if(epoch_num < currentEpoch) {
flag = -1;
- }
+ }/*
+ else if(epoch_num > currentEpoch) {
+// printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num);
+// currentEpoch = epoch_num;
+ }*/
pthread_mutex_unlock(&recovery_mutex);
return flag;
return count;
}
-// if flag = TRANS_OK, allow transactions
-// flag = TRANS_WAIT, stop transactins
int updateLiveHostsCommit() {
#ifdef DEBUG
printf("%s -> Enter\n",__func__);
* Backup 26 21,24
*/
- if(inspectEpoch(epoch_num) < 0) return -1;
+ if(inspectEpoch(epoch_num,__func__) < 0) return -1;
response = REQUEST_DUPLICATE;
transList->head = NULL;
transList->size = 0;
- pthread_mutex_init(&(transList->mutex),NULL);
-
return transList;
}
}
// tlistInsertNode extension
-tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode)
+tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode,unsigned int epoch_num)
{
- transList = tlistInsertNode(transList,tNode->transid,tNode->decision,tNode->status);
+ transList = tlistInsertNode(transList,tNode->transid,tNode->decision,tNode->status,epoch_num);
return transList;
}
// return 0 if success, return -1 if fail
-tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status) {
+tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status,unsigned int epoch_num) {
// printf("%s -> ADD transID : %u decision %d status %d\n",__func__,transid,decision,status);
- pthread_mutex_lock(&(transList->mutex));
tlist_node_t* head = transList->head;
if(head == NULL) {
head->transid = transid;
head->decision = decision;
head->status = status;
+ head->epoch_num = epoch_num;
head->next = NULL;
- //pthread_mutex_lock(&(transList->mutex));
transList->head = head;
(transList->size)++;
transList->flag = 1;
- pthread_mutex_unlock(&(transList->mutex));
return transList;
}
else {
tmp->transid = transid;
tmp->decision = decision;
tmp->status = status;
+ tmp->epoch_num = epoch_num;
tmp->next = transList->head;
transList->head = tmp;
(transList->size)++;
transList->flag = 1;
- pthread_mutex_unlock(&(transList->mutex));
return transList;
}
}
// return tlist_t if success, return null if cannot find
tlist_node_t* tlistSearch(tlist_t* transList,unsigned int transid)
{
- pthread_mutex_lock(&(transList->mutex));
tlist_node_t* ptr = transList->head;
while(ptr != NULL)
ptr = ptr->next;
}
- pthread_mutex_unlock(&(transList->mutex));
return ptr;
}
tlist_t* tlistRemove(tlist_t* transList,unsigned int transid)
{
// printf("%s -> REMOVE transID : %u \n",__func__,transid);
- pthread_mutex_lock(&(transList->mutex));
int flag = -1;
tlist_node_t* tmp;
tlist_node_t* ptr = transList->head;
tlist_node_t* prev = NULL;
+ if(transList->head == NULL)
+ return transList;
+
/* if it is head */
if(transList->head->transid == transid)
{
(transList->size)--;
transList->flag = 1;
- pthread_mutex_unlock(&(transList->mutex));
return transList;
}
(transList->size)--;
flag = 0;
transList->flag = 1;
- pthread_mutex_unlock(&(transList->mutex));
return transList;
}
prev = ptr;
ptr = ptr->next;
}
- pthread_mutex_unlock(&(transList->mutex));
- printf("%s -> remove Fail!\n",__func__);
-
return transList;
}
while(walker)
{
- array[i++] = *walker;
+ array[i].transid = walker->transid;
+ array[i].decision = walker->decision;
+ array[i].status = walker->status;
+ array[i].epoch_num = walker->epoch_num;
+
+ i++;
walker = walker->next;
}
/* for machine flag */
#define TRANS_OK 3
#define TRANS_BEFORE 4
-#define TRANS_AFTER 5
+#define TRANS_INPROGRESS 5
+#define TRANS_AFTER 6
/*
Status
unsigned int transid;
char decision;
char status;
+ unsigned int epoch_num;
struct trans_list_node *next;
} tlist_node_t;
tlist_node_t *head;
int size;
int flag;
- pthread_mutex_t mutex;
} tlist_t;
// allocate tlist_t, return -1 if memory overflow
tlist_t* tlistDestroy(tlist_t*);
// return 0 if success, return -1 if fail
-tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status);
-tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode) ;
+tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status,unsigned int epoch_num);
+tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode,unsigned int epoch_num) ;
// remove node.
// return 0 if success, return -1 if fail