#include <unistd.h>
#include <signal.h>
#include "tlookup.h"
+#include "translist.h"
#endif
#define BACKLOG 10 //max pending connections
extern int numLiveHostsInSystem;
int clearNotifyListFlag;
pthread_mutex_t clearNotifyList_mutex;
+
+tlist_t* transList;
+int okCommit; // machine flag
+extern unsigned int numWaitMachine; // shared wait counter; type must match its 'unsigned int' definition
+
#endif
objstr_t *mainobjstore;
#ifdef RECOVERY
if (thashCreate(THASH_SIZE, LOADFACTOR))
return 1;
+ if ((transList = tlistCreate())== NULL) {
+ printf("well error\n");
+ return 1;
+ }
#endif
if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
return 0;
}
+ okCommit = TRANS_OK;
+
return 0;
}
int validHost;
int *socklist;
int sd;
-#ifdef DEBUG
- printf("%s -> Entering\n",__func__);
-#endif
socklist = (int*) calloc(numHostsInSystem,sizeof(int));
#ifdef DEBUG
printf("%s -> Dead Machine : %s\n",__func__, midtoIPString(hostIpAddrs[deadMachineIndex]));
#endif
- restoreDuplicationState(hostIpAddrs[deadMachineIndex]);
+ notifyLeaderDeadMachine(hostIpAddrs[deadMachineIndex]);
freeSockWithLock(transPResponseSocketPool, hostIpAddrs[deadMachineIndex], socklist[deadMachineIndex]);
socklist[deadMachineIndex] = -1;
} // end of if 2
} // end of while 1
-#ifdef DEBUG
- printf("%s -> Exiting\n",__func__);
-#endif
}
int i;
char control = RESPOND_LIVE;
char response;
-#ifdef DEBUG
- printf("%s -> Entering\n",__func__);
-#endif
while(1){
for(i = 0; i< numHostsInSystem;i++) {
-#ifdef DEBUG
- printf("%s -> socklist[%d] = %d\n",__func__,i,socklist[i]);
-#endif
if(socklist[i] > 0) {
send_data(socklist[i], &control,sizeof(char));
if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
// if machine is dead, returns index of socket
-#ifdef DEBUG
- printf("%s -> Machine dead detecteed\n",__func__);
-#endif
return i;
}
else {
// machine responded
if(response != LIVE) {
-#ifdef DEBUG
- printf("%s -> Machine dead detected\n",__func__);
-#endif
return i;
}
} // end else
#ifdef RECOVERY
case RESPOND_LIVE:
-#ifdef DEBUG
- printf("control -> RESPOND_LIVE\n");
-#endif
ctrl = LIVE;
send_data((int)acceptfd, &ctrl, sizeof(ctrl));
-#ifdef DEBUG
- printf("%s (RESPOND_LIVE)-> Sending LIVE!\n", __func__);
-#endif
break;
#endif
#ifdef RECOVERY
if(!leaderFixing) {
leaderFixing = 1;
pthread_mutex_unlock(&leaderFixing_mutex);
- // begin fixing
- updateLiveHosts();
- duplicateLostObjects(mid);
- if(updateLiveHostsCommit() != 0) {
- printf("error updateLiveHostsCommit()\n");
- exit(1);
- }
-
- // finish fixing
- pthread_mutex_lock(&leaderFixing_mutex);
- leaderFixing = 0;
- pthread_mutex_unlock(&leaderFixing_mutex);
+
+ restoreDuplicationState(mid);
+ // finish fixing
+ pthread_mutex_lock(&leaderFixing_mutex);
+ leaderFixing = 0;
+ pthread_mutex_unlock(&leaderFixing_mutex);
}
else {
pthread_mutex_unlock(&leaderFixing_mutex);
break;
#endif
#ifdef RECOVERY
+ case REQUEST_TRANS_WAIT:
+ receiveNewHostLists((int)acceptfd);
+ stopTransactions();
+
+ response = RESPOND_TRANS_WAIT;
+ send_data((int)acceptfd,&response,sizeof(char));
+// respondToLeader();
+ break;
+
+ case RESPOND_TRANS_WAIT:
+ printf("control -> RESPOND_TRANS_WAIT\n");
+ pthread_mutex_lock(&liveHosts_mutex);
+ numWaitMachine++;
+ pthread_mutex_unlock(&liveHosts_mutex);
+ printf("numWaitMachine = %d\n",numWaitMachine);
+ break;
+
+ case REQUEST_TRANS_LIST:
+ printf("control -> REQUEST_TRANS_LIST\n");
+ sendTransList((int)acceptfd);
+ receiveTransList((int)acceptfd);
+ break;
+
+ case REQUEST_TRANS_RESTART:
+ pthread_mutex_lock(&liveHosts_mutex);
+ okCommit = TRANS_OK;
+ pthread_mutex_unlock(&liveHosts_mutex);
+ break;
case UPDATE_LIVE_HOSTS:
#ifdef DEBUG
printf("control -> UPDATE_LIVE_HOSTS\n");
#endif
- // copy back
- pthread_mutex_lock(&liveHosts_mutex);
- recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem);
- recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2);
- pthread_mutex_unlock(&liveHosts_mutex);
- numLiveHostsInSystem = getNumLiveHostsInSystem();
+ receiveNewHostLists((int)acceptfd);
+
#ifdef DEBUG
printHostsStatus();
printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__);
#endif
- //exit(0);
break;
#endif
#ifdef RECOVERY
case DUPLICATE_ORIGINAL:
-
{
struct sockaddr_in remoteAddr;
int sd;
fixed.control = TRANS_REQUEST;
timeout = recv_data((int)acceptfd, ptr+1, size);
+#ifdef RECOVERY
+ transList = tlistInsertNode(transList,fixed.transid,TRYING_TO_COMMIT,TRANS_OK);
+#endif
+
/* Read list of mids */
int mcount = fixed.mcount;
size = mcount * sizeof(unsigned int);
objheader_t *tmp_header;
void *header;
int i = 0, val;
- unsigned int transID;
-#ifdef DEBUG
- printf("%s-> Entering\n", __func__);
-#endif
-
- /* receives transaction id */
- recv_data((int)acceptfd, &transID, sizeof(unsigned int));
/* Send reply to the Coordinator */
if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
}
int timeout = recv_data((int)acceptfd, &control, sizeof(char));
- /* Process the new control message */
-#ifdef DEBUG
- printf("%s -> timeout = %d control = %d\n",__func__,timeout,control);
-#endif
-
+
#ifdef RECOVERY
+ tlist_node_t* tNode;
+ tNode = tlistSearch(transList,fixed->transid);
+
if(timeout < 0) { // timeout. failed to receiving data from coordinator
-#ifdef DEBUG
- printf("%s -> timeout!! assumes coordinator is dead\n",__func__);
-#endif
- control = receiveDecisionFromBackup(transID,fixed->mcount,listmid);
-#ifdef DEBUG
- printf("%s -> received Decision %d\n",__func__,control);
-#endif
- }
- /* insert received control into thash for another transaction*/
- thashInsert(transID, control);
+ tNode->decision = DECISION_LOST;
+ printf("%s -> DECISON_LOST! control = %d\n",__func__,control);
+ }
+ else
+ tNode->decision = control;
+
+ // check if it is allowed to commit
+ if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK)))
+ {
+ pthread_mutex_lock(&liveHosts_mutex);
+ tNode->status = TRANS_WAIT;
+ pthread_mutex_unlock(&liveHosts_mutex);
+
+ while(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) {
+ printf("%s -> transID : %u decision : %d is waiting\n",__func__,tNode->transid,tNode->decision);
+ sleep(1);
+ }
+ }
+
+ control = tNode->decision;
+
+ thashInsert(fixed->transid, control);
#endif
switch(control) {
break;
case TRANS_COMMIT:
+ /* insert received control into thash for another transaction*/
/* Invoke the transCommit process() */
if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
break;
}
+#ifdef RECOVERY
+// printf("%s -> transID : %u has been committed\n",__func__,transID);
+ tNode->status = TRANS_OK;
+
+ pthread_mutex_lock(&clearNotifyList_mutex);
+ transList = tlistRemove(transList,fixed->transid);
+ pthread_mutex_unlock(&clearNotifyList_mutex);
+
+#endif
+
/* Free memory */
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
printf("%s -> finished\n",__func__);
#endif
}
+
+/*
+ * Read the updated host tables sent over acceptfd: first the liveHosts
+ * array, then the locateObjHosts array.  Both are filled while holding
+ * liveHosts_mutex; numLiveHostsInSystem is recomputed afterwards.
+ */
+void receiveNewHostLists(int acceptfd)
+{
+  pthread_mutex_lock(&liveHosts_mutex);
+  /* the two tables arrive back to back, in this order */
+  recv_data(acceptfd, liveHosts, sizeof(int) * numHostsInSystem);
+  recv_data(acceptfd, locateObjHosts, sizeof(unsigned int) * numHostsInSystem * 2);
+  pthread_mutex_unlock(&liveHosts_mutex);
+
+  numLiveHostsInSystem = getNumLiveHostsInSystem();
+}
+
+/*
+ * Stop local commits and wait until every transaction in transList is
+ * either waiting for the leader's decision (TRANS_WAIT) or finished
+ * (TRANS_OK).
+ *
+ * okCommit is set to TRANS_WAIT under liveHosts_mutex.  The list is then
+ * walked while holding clearNotifyList_mutex; a node in any other state is
+ * polled every two seconds.  The walk is repeated for as long as
+ * transList->flag reads 1 after a pass.
+ * NOTE(review): flag is presumably raised by the translist code when the
+ * list changes during a pass -- confirm in translist.
+ */
+void stopTransactions()
+{
+  tlist_node_t* walker;
+
+  printf("%s - > Enter\n",__func__);
+
+  pthread_mutex_lock(&liveHosts_mutex);
+  okCommit = TRANS_WAIT;
+  pthread_mutex_unlock(&liveHosts_mutex);
+
+  /* make sure that all transactions are stopped */
+  pthread_mutex_lock(&clearNotifyList_mutex);
+
+  do {
+    transList->flag = 0;
+    walker = transList->head;
+
+    while(walker)
+    {
+      /* poll until this transaction has parked or completed */
+      while(!(walker->status == TRANS_WAIT || walker->status == TRANS_OK)) {
+        printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
+        sleep(2);
+      }
+
+      walker = walker->next;
+    }
+  }while(transList->flag == 1);
+
+  pthread_mutex_unlock(&clearNotifyList_mutex);
+  printf("%s - > Exit\n",__func__);
+}
+
+/*
+ * Send this machine's on-going transaction list over acceptfd and then
+ * answer decision queries from the peer.
+ *
+ * Wire order: element count (int), then the array of tlist_node_t.
+ * Afterwards, for every REQUEST_TRANS_CHECK byte received, a transaction id
+ * is read and the result of checkDecision() is sent back as one byte.  The
+ * loop ends on the first control byte that is not REQUEST_TRANS_CHECK, or
+ * when a receive fails (recv_data() < 0), so an unset response byte is never
+ * examined.
+ */
+void sendTransList(int acceptfd)
+{
+  int size;
+  char response;
+  tlist_node_t* transArray;
+
+  printf("%s -> Enter\n",__func__);
+
+  // send on-going transaction
+  transArray = tlistToArray(transList,&size);
+
+  if(transList->size != 0)
+    tlistPrint(transList);
+
+  printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size);
+
+  send_data(acceptfd,&size,sizeof(int));
+  send_data(acceptfd,transArray, sizeof(tlist_node_t) * size);
+
+  // check if it already commit the decision for a transaction
+  while(!(recv_data(acceptfd,&response, sizeof(char)) < 0) &&
+        response == REQUEST_TRANS_CHECK)
+  {
+    unsigned int transid;
+
+    if(recv_data(acceptfd,&transid, sizeof(unsigned int)) < 0)
+      break;
+
+    response = checkDecision(transid);
+    send_data(acceptfd,&response, sizeof(char));
+  }
+
+  free(transArray);
+  printf("%s - > Exit\n",__func__);
+}
+
+/*
+ * Receive a list of transaction decisions over acceptfd and merge it into
+ * the local transList via combineTransactionList().
+ *
+ * Wire order: element count (int); when the count is positive, that many
+ * tlist_node_t follow.  A single status byte is always sent back: TRANS_OK
+ * when the merge reported success (or there was nothing to merge), -1
+ * otherwise.
+ */
+void receiveTransList(int acceptfd)
+{
+  int size = 0;      /* stays 0 ("nothing to merge") if the receive fails */
+  int flag = 1;
+  char response;
+  tlist_node_t* tArray;
+
+  printf("%s -> Enter\n",__func__);
+
+  recv_data(acceptfd,&size,sizeof(int));
+
+  printf("%s -> size : %d\n",__func__,size);
+
+  if(size > 0) {
+    /* one element per received node (previously size * size were allocated) */
+    if((tArray = calloc(size,sizeof(tlist_node_t))) == NULL)
+    {
+      printf("%s -> calloc error\n",__func__);
+      exit(0);
+    }
+
+    recv_data(acceptfd,tArray,sizeof(tlist_node_t) * size);
+
+    flag = combineTransactionList(tArray,size);
+
+    free(tArray);
+  }
+
+  if(flag == 1)
+  {
+    response = TRANS_OK;
+  }
+  else
+  {
+    response = -1;
+  }
+
+  printf("%s -> response : %d\n",__func__,response);
+
+  send_data(acceptfd,&response,sizeof(char));
+
+  printf("%s -> End\n",__func__);
+}
+
+
+/*
+ * For every node in the local transList, copy the decision from the first
+ * entry of tArray[0..size-1] that carries the same transid.  Nodes without
+ * a matching entry are left untouched.  Always returns 1.
+ */
+int combineTransactionList(tlist_node_t* tArray,int size)
+{
+  tlist_node_t* node;
+  int idx;
+
+  for(node = transList->head; node != NULL; node = node->next) {
+    /* find the first array entry with this transaction id */
+    idx = 0;
+    while(idx < size && node->transid != tArray[idx].transid)
+      idx++;
+
+    if(idx < size)
+      node->decision = tArray[idx].decision;
+  }
+
+  return 1;
+}
+
#endif
#include <signal.h>
#include <sys/select.h>
#include "tlookup.h"
+#include "translist.h"
#define CPU_FREQ 3056842
#endif
int numLiveHostsInSystem;
unsigned int *locateObjHosts;
+unsigned int numWaitMachine;
+extern int okCommit;
/* variables to clear dead threads */
int waitThreadMid;
char ip[16]; // for debugging purpose
+extern tlist_t* transList;
+extern pthread_mutex_t clearNotifyList_mutex;
+
/******************************
* Global variables for Paxos
******************************/
else if (numbytes<0){
//Receive returned an error.
//Analyze underlying cause
-#ifdef DEBUG
- printf("%s-> fd : %d errno = %d %s\n", __func__, fd, errno, strerror(errno));
-#endif
if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
//machine has failed
//if we see EAGAIN w/o failures, we should record the time
//when we start read and finish read and see if it is longer
//than our threshold
-#ifdef DEBUG
- printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
-#endif
if(errno == EAGAIN) {
if(trycounter < 5) {
-#ifdef DEBUG
- printf("%s -> TRYcounter increases\n",__func__);
-#endif
trycounter++;
continue;
}
}
#endif
} //close while loop
-#ifdef DEBUG
- printf("%s -> fd = %d Exiting\n",__func__,fd);
-#endif
return 0; // got all the data
}
#endif
objcopy = getRemoteObj(machinenumber, oid);
+
+#ifdef RECOVERY
+ if(transRetryFlag) {
+ notifyLeaderDeadMachine(machinenumber);
+ return transRead2(oid);
+ }
+#endif
+
+ if(objcopy == NULL) {
+ printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ } else {
#ifdef TRANSSTATS
LOGEVENT('R');
nRemoteSend++;
#endif
}
-#ifdef RECOVERY
- if(transRetryFlag) {
- restoreDuplicationState(machinenumber);
-#ifdef DEBUG
- printf("%s -> Recall transRead2\n",__func__);
-#endif
- return transRead2(oid);
- }
-#endif
-
- if(objcopy == NULL) {
- printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
- return NULL;
- } else {
+ if(objcopy == NULL) {
+ printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ } else {
#ifdef COMPILER
- return &objcopy[1];
+ return &objcopy[1];
#else
- return objcopy;
+ return objcopy;
#endif
- }
- }
+ }
+ }
+ }
#ifdef DEBUG
printf("%s -> Finished!!\n",__func__);
#endif
printf(" myIp:[%s]\n", midtoIPString(myIpAddr));
#endif
tosend[sockindex].f.control = TRANS_REQUEST;
+ tosend[sockindex].f.transid = transID;
tosend[sockindex].f.mcount = pilecount;
tosend[sockindex].f.numread = pile->numread;
tosend[sockindex].f.nummod = pile->nummod;
send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
//send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
-#ifdef RECOVERY
- /* send transaction id, number of machine involved, machine ids */
- send_data(sd, &transID, sizeof(unsigned int));
- //forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int));
-#endif
free(modptr);
} else { //handle request locally
handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
#ifdef DEBUG
printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid));
#endif
- restoreDuplicationState(deadmid);
+ notifyLeaderDeadMachine(deadmid);
}
#endif
return TRANS_ABORT;
unsigned int oid;
unsigned short version;
+#ifdef RECOVERY
+ transList = tlistInsertNode(transList,tdata->f.transid,TRYING_TO_COMMIT,TRANS_OK);
+#endif
+
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
printf("ERROR...No Decision\n");
}
+#ifdef RECOVERY
+ pthread_mutex_lock(&clearNotifyList_mutex);
+ transList = tlistRemove(transList,tdata->f.transid);
+ pthread_mutex_unlock(&clearNotifyList_mutex);
+#endif
+
/* Free memory */
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
#endif
#ifdef RECOVERY
-void restoreDuplicationState(unsigned int deadHost) {
+void notifyLeaderDeadMachine(unsigned int deadHost) {
int sd;
char ctrl;
if(!liveHosts[findHost(deadHost)]) { // if it is already fixed
+ printf("%s -> already fixed\n",__func__);
sleep(WAIT_TIME);
return;
}
printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER");
#endif
- if(leader == myIpAddr) {
+ if(leader == myIpAddr) { // if i am the leader
pthread_mutex_lock(&leaderFixing_mutex);
if(!leaderFixing) {
leaderFixing = 1;
pthread_mutex_unlock(&leaderFixing_mutex);
if(!liveHosts[findHost(deadHost)]) { // if it is already fixed
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s -> already fixed\n",__func__);
#endif
pthread_mutex_lock(&leaderFixing_mutex);
leaderFixing =0;
pthread_mutex_unlock(&leaderFixing_mutex);
}
- else { // if i am the leader
- updateLiveHosts();
-
- if(numLiveHostsInSystem == 1)
- setReLocateObjHosts(deadHost);
- else
- duplicateLostObjects(deadHost);
-
- if(updateLiveHostsCommit() != 0) {
- printf("%s -> error updateLiveHostsCommit()\n",__func__);
- exit(1);
- }
- pthread_mutex_lock(&leaderFixing_mutex);
+ else {
+ restoreDuplicationState(deadHost);
+
+ pthread_mutex_lock(&leaderFixing_mutex);
leaderFixing = 0;
pthread_mutex_unlock(&leaderFixing_mutex);
}
}
else {
pthread_mutex_unlock(&leaderFixing_mutex);
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s -> LEADER is already fixing\n",__func__);
#endif
sleep(WAIT_TIME);
printf("%s -> Message sent\n",__func__);
sleep(WAIT_TIME);
}
+}
+
+/*
+ * Leader's role: run the recovery sequence for the dead host deadHost.
+ *
+ * Steps, in order:
+ *   1. update the leader's live-host list and object locations,
+ *   2. ask every other machine to stop transactions and take the new lists,
+ *   3. wait until all machines are waiting for the leader,
+ *   4. clear (reconcile) the on-going transactions,
+ *   5. transfer the lost objects,
+ *   6. restart transactions everywhere.
+ *
+ * The sequence runs unattended: the former getchar() between steps 5 and 6
+ * made recovery block on the leader's stdin and has been removed.
+ */
+void restoreDuplicationState(unsigned int deadHost)
+{
+  printf("%s -> Entering\n",__func__);
+
+  // update leader's live host list and object locations
+  updateLiveHostsList(deadHost);
+  setReLocateObjHosts(deadHost);
+
+  // stop all transactions and update all other's machine list
+  notifyRestoration();
+
+  // wait until all machines wait for leader
+  waitForAllMachine();
+
+  // clear transaction
+  clearTransaction();
+
+  // transfer lost objects
+  duplicateLostObjects(deadHost);
+
+  // restart transactions
+  restartTransactions();
+
+  printf("%s -> Exiting\n",__func__);
+}
+
+/*
+ * 1. request all other machines to stop transactions
+ * 2. update their live machine list
+ *
+ * numWaitMachine is reset to 0, then every live remote host is sent
+ * REQUEST_TRANS_WAIT followed by the liveHosts and locateObjHosts arrays.
+ * In a second pass each contacted host's one-byte reply is read;
+ * numWaitMachine is incremented for every RESPOND_TRANS_WAIT that was
+ * actually received.  A failed receive (recv_data() < 0) is not counted.
+ * Finally the local transactions are stopped via stopTransactions().
+ */
+void notifyRestoration()
+{
+  int i;
+  int sd;
+  int sdlist[numHostsInSystem];
+
+  printf("%s -> Enter\n",__func__);
+
+  printHostsStatus();
+
+  pthread_mutex_lock(&liveHosts_mutex);
+  numWaitMachine = 0;
+  pthread_mutex_unlock(&liveHosts_mutex);
+
+  // for other machines
+  for(i = 0; i < numHostsInSystem; i++) {
+    if(liveHosts[i] != 1 || hostIpAddrs[i] == myIpAddr) {
+      sdlist[i] = -1;    /* dead host or myself: nothing to send */
+      continue;
+    }
+
+    if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0)
+    {
+      printf("%s -> socket create error\n",__func__);
+      exit(0);
+    }
+    else {
+      char request = REQUEST_TRANS_WAIT;
+
+      sdlist[i] = sd;
+      send_data(sd, &request, sizeof(char));
+
+      /* send new host lists and object location */
+      pthread_mutex_lock(&liveHosts_mutex);
+      send_data(sd, liveHosts, sizeof(int)*numHostsInSystem);
+      send_data(sd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2);
+      pthread_mutex_unlock(&liveHosts_mutex);
+    }
+  }
+
+  /* collect the acknowledgements */
+  for(i = 0 ; i < numHostsInSystem; i++) {
+    if(sdlist[i] != -1)
+    {
+      char response;
+
+      /* only look at the byte when the receive succeeded */
+      if(!(recv_data(sdlist[i],&response,sizeof(char)) < 0) &&
+         response == RESPOND_TRANS_WAIT) {
+        pthread_mutex_lock(&liveHosts_mutex);
+        numWaitMachine++;
+        pthread_mutex_unlock(&liveHosts_mutex);
+      }
+
+      freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sdlist[i]);
+    }
+  }
+
+  /* stop all local transactions */
+  stopTransactions();
+  printf("%s -> End\n",__func__);
+}
+
+/*
+ * Acknowledge to the leader that all transactions are waiting: take a
+ * socket to the leader from transPrefetchSockPool, send one
+ * RESPOND_TRANS_WAIT byte and hand the socket back.  Exits the process if
+ * no socket can be obtained.
+ */
+void respondToLeader()
+{
+  char ack = RESPOND_TRANS_WAIT;
+  int leaderSock;
+
+  printf("%s -> Enter\n",__func__);
+
+  leaderSock = getSockWithLock(transPrefetchSockPool, leader);
+  if(leaderSock < 0) {
+    printf("%s -> cannot open the socket\n",__func__);
+    exit(0);
+  }
+
+  send_data(leaderSock,&ack,sizeof(char));
+  freeSockWithLock(transPrefetchSockPool,leader,leaderSock);
+
+  printf("%s -> Exit\n",__func__);
+}
+
+/*
+ * Wait until every live machine has been counted in numWaitMachine.
+ *
+ * The local machine is counted first (increment under liveHosts_mutex);
+ * the function then polls once per second until numWaitMachine is no
+ * longer below numLiveHostsInSystem.
+ * NOTE(review): the polling loop reads numWaitMachine without taking
+ * liveHosts_mutex -- confirm this is acceptable for the writers involved.
+ */
+void waitForAllMachine()
+{
+
+ pthread_mutex_lock(&liveHosts_mutex);
+ numWaitMachine++; // count the local machine, which is already waiting
+ pthread_mutex_unlock(&liveHosts_mutex);
+
- printf("%s -> Finished!\n",__func__);
+ /* wait until an acknowledgement has been counted for every live machine */
+ while(numWaitMachine < numLiveHostsInSystem) {
+ sleep(1);
+ }
}
+
+/*
+ * Reconcile the on-going transactions of all live machines.
+ *
+ * Opens a fresh TCP connection (LISTEN_PORT) to every live remote host,
+ * lets makeTransactionLists() build the merged list of transactions and
+ * decisions, pushes that list back out with releaseTransactionLists(),
+ * then closes the connections and destroys the merged list.
+ *
+ * sdlist[i] is -1 for every host without a usable connection (dead host,
+ * myself, or socket() failure), so the helpers skip those slots.  A
+ * connect() failure terminates the process.
+ */
+void clearTransaction()
+{
+  tlist_t* tlist = NULL;
+  int sd;
+  struct sockaddr_in remoteAddr;
+  int sdlist[numHostsInSystem];
+  int i;
+
+  // open sockets to all live machines
+  for(i = 0 ; i < numHostsInSystem; i++) {
+    sdlist[i] = -1;   /* default: no connection for this slot */
+
+    if(liveHosts[i] != 1 || hostIpAddrs[i] == myIpAddr)
+      continue;
+
+    if((sd = socket(AF_INET , SOCK_STREAM, 0 )) < 0)
+    {
+      /* slot stays -1 so it is never used as a descriptor later */
+      printf("%s -> socket create Error\n",__func__);
+      continue;
+    }
+
+    bzero(&remoteAddr, sizeof(remoteAddr));
+    remoteAddr.sin_family = AF_INET;
+    remoteAddr.sin_port = htons(LISTEN_PORT);
+    remoteAddr.sin_addr.s_addr = htonl(hostIpAddrs[i]);
+
+    if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+      printf("%s -> socket connect error\n",__func__);
+      exit(0);
+    }
+
+    sdlist[i] = sd;
+  }
+
+  /* receive transaction lists from all machines and
+     clarify all decisions
+     returns a list of ongoing transactions */
+  makeTransactionLists(&tlist,sdlist);
+
+  /* release the cleared decisions to all machines */
+  releaseTransactionLists(tlist,sdlist);
+
+  for(i = 0 ; i < numHostsInSystem; i++) {
+    if(sdlist[i] != -1) {
+      close(sdlist[i]);
+    }
+  }
+
+  tlistDestroy(tlist);
+
+  printf("%s -> End\n",__func__);
+}
+
+/*
+ * Build the merged list of on-going transactions and their decisions.
+ * After this function the leader knows every on-going transaction and its
+ * decision; the list is returned through *tlist.
+ *
+ * tlist  - out: receives the merged list created here.
+ * sdlist - one socket per host, -1 for hosts that must be skipped.
+ *
+ * Phases:
+ *   1. copy the leader's own transList into the new list,
+ *   2. send REQUEST_TRANS_LIST to every connected host and merge the
+ *      returned array (a known decision replaces DECISION_LOST),
+ *   3. for each entry still at DECISION_LOST, ask the hosts one by one with
+ *      REQUEST_TRANS_CHECK (and checkDecision() locally); if nobody knows,
+ *      the decision becomes TRANS_ABORT,
+ *   4. send REQUEST_TRANS_COMPLETE to every connected host.
+ */
+void makeTransactionLists(tlist_t** tlist,int* sdlist)
+{
+  int sd;
+  int i;
+  tlist_node_t* walker;
+  tlist_t* currentTransactionList;
+
+  printf("%s -> Enter\n",__func__);
+
+  if((currentTransactionList = tlistCreate()) == NULL) {
+    printf("%s -> tlistCreate error\n",__func__);
+    exit(0);
+  }
+
+  printf("%s -> tlist size : %d\n",__func__,transList->size);
+  printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size);
+
+  // grab leader's transaction list first
+  walker = transList->head;
+
+  while(walker) {
+    /* keep the returned list, as the remote-merge insert below does */
+    currentTransactionList = tlistInsertNode2(currentTransactionList,walker);
+    walker = walker->next;
+  }
+
+  // receive others transaction list
+  for(i = 0; i < numHostsInSystem;i ++) {
+    if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) {
+      char request = REQUEST_TRANS_LIST;
+      int size = 0;   /* stays 0 if the receive fails */
+      int j;
+      tlist_node_t* transArray;
+      tlist_node_t* tmp;
+
+      sd = sdlist[i];
+
+      // send request
+      send_data(sd, &request, sizeof(char));
+
+      // receive all on-going transaction list
+      recv_data(sd, &size, sizeof(int));
+
+      printf("%s -> %s - size : %d\n",__func__,midtoIPString(hostIpAddrs[i]),size);
+
+      /* an empty list carries no payload; skipping it also avoids treating
+         a NULL result of a zero-sized calloc as an allocation failure */
+      if(size <= 0)
+        continue;
+
+      if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) {
+        printf("%s -> calloc error\n",__func__);
+        exit(0);
+      }
+
+      recv_data(sd,transArray, sizeof(tlist_node_t) * size);
+
+      // add into currentTransactionList
+      for(j = 0 ; j < size; j ++) {
+        tmp = tlistSearch(currentTransactionList,transArray[j].transid);
+
+        if(tmp == NULL) {
+          currentTransactionList = tlistInsertNode2(currentTransactionList,&(transArray[j]));
+        }
+        else {
+          if(tmp->decision == DECISION_LOST)
+          {
+            tmp->decision = transArray[j].decision;
+          }
+        }
+      } // j loop
+
+      /* NOTE(review): transArray is never freed.  It can only be released
+         here if tlistInsertNode2 copies the node -- confirm in translist. */
+    }
+  } // i loop
+
+  // current transaction list is completed
+  // now see if any transaction is still missing
+  walker = currentTransactionList->head;
+
+  while(walker) {
+    if(walker->decision == DECISION_LOST) {
+      for(i = 0 ; i < numHostsInSystem; i++) {
+        if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
+        {
+          char request = REQUEST_TRANS_CHECK;
+          char respond = 0;   /* "unknown" unless a reply arrives */
+
+          send_data(sdlist[i], &request, sizeof(char));
+          send_data(sdlist[i], &(walker->transid), sizeof(unsigned int));
+
+          recv_data(sdlist[i], &respond, sizeof(char));
+
+          if(respond > 0)
+          {
+            walker->decision = respond;
+            break;
+          }
+        }
+        else if(hostIpAddrs[i] == myIpAddr)
+        {
+          char decision = checkDecision(walker->transid);
+          if(decision > 0) {
+            walker->decision = decision;
+            break;
+          }
+        }
+      } // i loop
+
+      if(walker->decision == DECISION_LOST) {
+        printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid);
+        walker->decision = TRANS_ABORT;
+      }
+    }
+    walker = walker->next;
+  } // while loop
+
+  printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size);
+
+  /* tell every peer that the query phase is over */
+  for(i = 0; i < numHostsInSystem; i++) {
+    if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
+    {
+      char request = REQUEST_TRANS_COMPLETE;
+      send_data(sdlist[i], &request,sizeof(char));
+    }
+  }
+
+  *tlist = currentTransactionList;
+  tlistPrint(currentTransactionList);
+
+  printf("%s -> End\n",__func__);
+}
+
+/*
+ * Send the merged on-going transaction list to every connected host and
+ * apply it locally.
+ *
+ * tlist  - merged list to distribute.
+ * sdlist - one socket per host, -1 for hosts that must be skipped.
+ *
+ * Each connected host receives the element count followed by the array; an
+ * empty list is announced with a count of -1 and no payload.  For every
+ * slot without a connection the list is merged into the local transList
+ * with combineTransactionList().  Afterwards one status byte is read from
+ * each connected host and anything other than TRANS_OK is reported.
+ *
+ * The -1 marker is kept in its own variable: overwriting size with it made
+ * the next host's branch send sizeof(tlist_node_t) * -1 bytes.
+ */
+void releaseTransactionLists(tlist_t* tlist,int* sdlist)
+{
+  int size;
+  int i;
+  char response;
+  tlist_node_t* tArray;
+
+  printf("%s -> Enter\n",__func__);
+
+  tArray = tlistToArray(tlist,&size);
+
+  printf("%s -> size : %d\n",__func__,size);
+
+  for(i = 0; i < numHostsInSystem; i++)
+  {
+    if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
+    {
+      if(size == 0) {
+        int emptyMarker = -1;   /* wire value for "no transactions" */
+        send_data(sdlist[i],&emptyMarker,sizeof(int));
+      }
+      else {
+        send_data(sdlist[i],&size,sizeof(int));
+        send_data(sdlist[i],tArray,sizeof(tlist_node_t) * size);
+      }
+    }
+    else {
+      if(combineTransactionList(tArray,size) == 0) {
+        printf("%s -> problem\n",__func__);
+        exit(0);
+      }
+    }
+  }
+
+  if(size > 0)
+    free(tArray);
+
+  for(i = 0; i < numHostsInSystem; i ++) {
+    if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
+    {
+      /* -1 is the peer's failure status, so a failed receive is reported
+         as a failure instead of printing an unset byte */
+      response = -1;
+      recv_data(sdlist[i], &response, sizeof(char));
+
+      if(response != TRANS_OK)
+      {
+        printf("%s -> response : %d Need to fix\n",__func__,response);
+      }
+    }
+  }
+
+  printf("%s -> End\n",__func__);
+}
+
+/*
+ * Allow transactions to commit again on every machine.  For the local
+ * host okCommit is set to TRANS_OK under liveHosts_mutex; every other live
+ * host is sent a single REQUEST_TRANS_RESTART byte.  Exits the process if
+ * a socket to a live host cannot be obtained.
+ */
+void restartTransactions()
+{
+  char restartMsg = REQUEST_TRANS_RESTART;
+  int host;
+  int sock;
+
+  printf("%s -> Enter\n",__func__);
+
+  for(host = 0; host < numHostsInSystem; host++) {
+    if(hostIpAddrs[host] == myIpAddr) {
+      /* local machine: just flip the flag */
+      pthread_mutex_lock(&liveHosts_mutex);
+      okCommit = TRANS_OK;
+      pthread_mutex_unlock(&liveHosts_mutex);
+    }
+    else if(liveHosts[host] == 1) {
+      sock = getSockWithLock(transPrefetchSockPool, hostIpAddrs[host]);
+      if(sock < 0) {
+        printf("%s -> socket create error sd : %d\n",__func__,sock);
+        exit(0);
+      }
+
+      send_data(sock, &restartMsg, sizeof(char));
+      freeSockWithLock(transPrefetchSockPool,hostIpAddrs[host],sock);
+    }
+  }
+
+  printf("%s -> End\n",__func__);
+}
+
#endif
machinenum = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid));
flipBit ^= 1;
#ifdef DEBUG
- printf("mindex:%d, oid:%d, machinenumber:%s\n", machinenumber, oid, midtoIPString(machinenumber));
+// printf("mindex:%d, oid:%d, machinenumber:%s\n", machinenumber, oid, midtoIPString(machinenumber));
#endif
#endif
insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
{
msg[0] = START_REMOTE_THREAD;
*((unsigned int *) &msg[1]) = oid;
+
send_data(sock, msg, 1 + sizeof(unsigned int));
}
return count;
}
+// if flag = TRANS_OK, allow transactions
+// flag = TRANS_WAIT, stop transactins
int updateLiveHostsCommit() {
#ifdef DEBUG
printf("%s -> Enter\n",__func__);
}
}
+/*
+ * Check whether machine mid is still alive and update the host tables.
+ * The machine is marked dead (liveHosts entry cleared and
+ * numLiveHostsInSystem decremented) when no socket to it can be obtained,
+ * or when its reply to a RESPOND_LIVE probe is not LIVE.
+ */
+void updateLiveHostsList(int mid)
+{
+  int hostIndex = findHost(mid);
+  char probe = RESPOND_LIVE;
+  char reply = 0;
+  int sock;
+
+  printf("%s -> Enter with %s\n",__func__,midtoIPString(mid));
+
+  sock = getSockWithLock(transPrefetchSockPool, hostIpAddrs[hostIndex]);
+  if(sock < 0) {
+    /* cannot even reach the host */
+    liveHosts[hostIndex] = 0;
+    numLiveHostsInSystem--;
+    printf("%s -> 111End with %s\n",__func__,midtoIPString(mid));
+    return;
+  }
+
+  send_data(sock, &probe, sizeof(char));
+  /* reply keeps its initial 0 when nothing is received */
+  recv_data(sock, &reply, sizeof(char));
+
+  if(reply != LIVE) {
+    liveHosts[hostIndex] = 0;
+    numLiveHostsInSystem--;
+  }
+
+  freeSockWithLock(transPrefetchSockPool,hostIpAddrs[hostIndex],sock);
+  printf("%s -> 222End with %s\n",__func__,midtoIPString(mid));
+}
+
+// rearrange object location array of leader machine
void setReLocateObjHosts(int mid)
{
int mIndex = findHost(mid);
recoverStat[numRecovery-1].deadMachine = mid;
#endif
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));
#endif
//this needs to be changed.
unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine
- unsigned int originalMid = getDuplicatedPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine.
+ unsigned int originalMid = getPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine.
-#ifdef DEBUG
- printf("%s-> backupMid: [%s], ", __func__, midtoIPString(backupMid));
- printf("originalMid: [%s]\n", midtoIPString(originalMid));
+#ifndef DEBUG
+ printf("%s-> backupMid: %d\t[%s]", __func__, backupMid,midtoIPString(backupMid));
+ printf("originalMid: %d\t[%s]\n", originalMid,midtoIPString(originalMid));
printHostsStatus();
#endif
- setReLocateObjHosts(mid);
//connect to these machines
//go through their object store copying necessary (in a transaction)
* Backup 26 21,24
*/
-#ifdef RECOVERYSTATS
- dupeSize = 0;
-#endif
+ if(((psd = getSockWithLock(transRequestSockPool, originalMid)) < 0 ) ||
+ ((bsd = getSockWithLock(transRequestSockPool,backupMid)) <0)) {
- if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
+ printf("%s -> psd : %d bsd : %d\n",__func__,psd,bsd);
printf("%s -> Socket create error\n",__func__);
- exit(0);
+
+ while(1)
+ sleep(10);
}
/* request for original */
send_data(bsd, &originalMid, sizeof(unsigned int));
char p_response,b_response;
- unsigned int p_receivedSize,b_receivedSize;
+ unsigned int p_receivedSize,b_receivedSize;
recv_data(psd, &p_response, sizeof(char));
recv_data(psd, &p_receivedSize, sizeof(unsigned int));
exit(0);
}
- freeSockWithLock(transPrefetchSockPool, originalMid, psd);
- freeSockWithLock(transPrefetchSockPool, backupMid, bsd);
+ freeSockWithLock(transRequestSockPool, originalMid, psd);
+ freeSockWithLock(transRequestSockPool, backupMid, bsd);
#ifdef RECOVERYSTATS
fi = myrdtsc();
printf("paxosPrepare(): socket create error\n");
continue;
}
-#ifdef DEBUG
- printf("%s-> Send PAXOS_PREPARE to mid [%s] with my_n=%d\n", __func__, midtoIPString(hostIpAddrs[i]), my_n);
-#endif
send_data(sd, &control, sizeof(char));
send_data(sd, &my_n, sizeof(int));
int timeout = recv_data(sd, &control, sizeof(char));
if ((sd == -1) || (timeout < 0)) {
-#ifdef DEBUG
- printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
-#endif
continue;
}
cnt++;
recv_data(sd, &remote_n, sizeof(int));
recv_data(sd, &remote_v, sizeof(int));
-#ifdef DEBUG
- printf("%s-> Received PAXOS_PREPARE_OK from mindex [%d] with remote_v=%s\n", __func__, i, midtoIPString(remote_v));
-#endif
if(remote_v != origleader) {
if (remote_n > tmp_n) {
tmp_n = remote_n;
freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
-#ifdef DEBUG
- printf("%s-> cnt:%d, numLiveHostsInSystem:%d\n", __func__, cnt, numLiveHostsInSystem);
-#endif
-
if (cnt >= (numLiveHostsInSystem / 2)) { // majority of OK replies
return 1;
}
int sd;
int remote_v = temp_v_a;
-#ifdef DEBUG
- printf("[Accept]...\n");
-#endif
for (i = 0; i < numHostsInSystem; ++i) {
control = PAXOS_ACCEPT;
send_data(sd, &remote_v, sizeof(int));
int timeout = recv_data(sd, &control, sizeof(char));
- if ((sd == -1) || (timeout < 0)) {
-#ifdef DEBUG
- printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
-#endif
+ if (timeout < 0) {
+ freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
continue;
}
{
leader = v_a;
paxosRound++;
-#ifdef DEBUG
- printf("This is my leader!!!: [%s]\n", midtoIPString(leader));
-#endif
continue;
}
if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
continue;
- // printf("paxosLearn(): socket create error, attemp\n");
}
send_data(sd, &control, sizeof(char));
freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
- //return v_a;
}
#endif
#ifdef RECOVERY
void clearDeadThreadsNotification()
{
-
-#ifdef DEBUG
- printf("%s -> Entered\n",__func__);
-#endif
// clear all the threadnotify request first
if(waitThreadID != -1) {
-#ifdef DEBUG
- printf("%s -> I was waitng for %s\n",__func__,midtoIPString(waitThreadMid));
-#endif
int waitThreadIndex = findHost(waitThreadMid);
int i;
notifydata_t *ndata;
}
}
-#ifdef DEBUG
- printf("%s -> Finished\n",__func__);
-#endif
}
/* request the primary and the backup machines to clear