From 12bcf905f76b6eab19ee2a78f2c4f712334d9b28 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Sat, 3 Apr 2010 07:18:55 +0000 Subject: [PATCH] paxos in separate file. still problem in transaction clearing --- .../Runtime/DSTM/interface_recovery/dstm.h | 21 +- .../DSTM/interface_recovery/dstmserver.c | 56 +--- .../Runtime/DSTM/interface_recovery/paxos.c | 292 +++++++++++++++++ .../Runtime/DSTM/interface_recovery/paxos.h | 38 +++ .../Runtime/DSTM/interface_recovery/trans.c | 303 ++++-------------- 5 files changed, 392 insertions(+), 318 deletions(-) create mode 100644 Robust/src/Runtime/DSTM/interface_recovery/paxos.c create mode 100644 Robust/src/Runtime/DSTM/interface_recovery/paxos.h diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index a5e9b6f9..2800d8ab 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -64,18 +64,6 @@ #define DUPLICATION_COMPLETE 35 #define RECEIVE_DUPES 36 -/********************************* - * Paxos Messages - *******************************/ -#define PAXOS_PREPARE 40 -#define PAXOS_PREPARE_REJECT 41 -#define PAXOS_PREPARE_OK 42 -#define PAXOS_ACCEPT 43 -#define PAXOS_ACCEPT_REJECT 44 -#define PAXOS_ACCEPT_OK 45 -#define PAXOS_LEARN 46 -#define DELETE_LEADER 47 - /********************************* * Transaction Clear Messages *********************************/ @@ -132,6 +120,7 @@ #endif #ifdef RECOVERY #include "translist.h" +#include "paxos.h" #endif //bit designations for status field of objheader @@ -338,12 +327,6 @@ void restartTransactions(); int readDuplicateObjs(int); void printRecoveryStat(); -/* Paxo's algorithm */ -int paxos(); -int paxosPrepare(); -int paxosAccept(); -void paxosLearn(); - #endif /* Prototypes for server portion */ @@ -401,7 +384,7 @@ char decideResponse(char *, char *, int); // Coordinator decides what response void *getRemoteObj(unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine void handleLocalReq(trans_req_data_t *, trans_commit_data_t *, char *); int transComProcess(trans_req_data_t *, trans_commit_data_t *); -void doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *); +char doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *); int transAbortProcess(trans_commit_data_t *); void transAbort(); void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index b43beb15..5a93f44c 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -53,17 +53,7 @@ pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a sockPoolHashTable_t *transPResponseSocketPool; #ifdef RECOVERY -/****************************** - * Global variables for Paxos - ******************************/ -extern int n_a; -extern unsigned int v_a; -extern int n_h; -extern int my_n; -extern int leader; -extern int paxosRound; -/* This function initializes the main objects store and creates the - * global machine and location lookup table */ +extern unsigned int leader; long long myrdtsc(void) { @@ -74,6 +64,8 @@ long long myrdtsc(void) #endif +/* This function initializes the main objects store and creates the + * global machine and location lookup table */ int dstmInit(void) { mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); /* Initialize attribute for mutex */ @@ -726,57 +718,21 @@ void *dstmAccept(void *acceptfd) { #ifdef DEBUG printf("control -> PAXOS_PREPARE\n"); #endif - recv_data((int)acceptfd, &val, sizeof(int)); - if (val <= n_h) { - control = PAXOS_PREPARE_REJECT; - send_data((int)acceptfd, &control, sizeof(char)); - } - else { - n_h = val; - control = PAXOS_PREPARE_OK; - - send_data((int)acceptfd, &control, sizeof(char)); - send_data((int)acceptfd, &n_a, sizeof(int)); - send_data((int)acceptfd, &v_a, sizeof(int)); - } + paxosPrepare_receiver((int)acceptfd); break; case PAXOS_ACCEPT: #ifdef DEBUG printf("control -> PAXOS_ACCEPT\n"); #endif - recv_data((int)acceptfd, &n, sizeof(int)); - recv_data((int)acceptfd, &v, sizeof(int)); - if (n < n_h) { - control = PAXOS_ACCEPT_REJECT; - send_data((int)acceptfd, &control, sizeof(char)); - } - else { - n_a = n; - v_a = v; - n_h = n; - control = PAXOS_ACCEPT_OK; - send_data((int)acceptfd, &control, sizeof(char)); - } + paxosAccept_receiver((int)acceptfd); break; case PAXOS_LEARN: #ifdef DEBUG printf("control -> PAXOS_LEARN\n"); #endif - recv_data((int)acceptfd, &v, sizeof(int)); - leader = v_a; - paxosRound++; -#ifdef DEBUG - printf("%s (PAXOS_LEARN)-> This is my leader!: [%s]\n", __func__, midtoIPString(leader)); -#endif - break; - - case DELETE_LEADER: -#ifdef DEBUG - printf("control -> DELETE_LEADER\n"); -#endif - v_a = 0; + leader = paxosLearn_receiver((int)acceptfd); break; #endif default: diff --git a/Robust/src/Runtime/DSTM/interface_recovery/paxos.c b/Robust/src/Runtime/DSTM/interface_recovery/paxos.c new file mode 100644 index 00000000..2d17294a --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface_recovery/paxos.c @@ -0,0 +1,292 @@ +/* Paxo Algorithm: + * Executes when the known leader has failed. + * Guarantees consensus on next leader among all live hosts. */ + +#include "paxos.h" + +/****************************** + * Global variables for Paxos + ******************************/ +int n_a; +unsigned int v_a; +int n_h; +int my_n; +unsigned int leader; +unsigned int origleader; +unsigned int temp_v_a; +int paxosRound; + +unsigned int myIpAddr; +int numHostsInSystem; +int numLiveHostsInSystem; + +int* hostIpAddrs; +int* liveHosts; + +int paxos(int* hostIp,int* hosts,unsigned int myIp,int nHosts,int nLiveHosts) +{ + int origRound = paxosRound; + origleader = leader; + int ret = -1; + + numHostsInSystem = nHosts; + numLiveHostsInSystem =nLiveHosts; + myIpAddr = myIp; + hostIpAddrs = hostIp; + liveHosts = hosts; + + do { + ret = paxosPrepare(); // phase 1 + if (ret == 1) { + ret = paxosAccept(); // phase 2 + if (ret == 1) { + paxosLearn(); // phase 3 + break; + } + } + // Paxos not successful; wait and retry if new leader is not yet slected + sleep(WAIT_TIME); + if(paxosRound != origRound) + break; + } while (ret == -1); + +#ifdef DEBUG + printf("\n>> Debug : Leader : [%s]\t[%u]\n", midtoIPString(leader),leader); +#endif + + return leader; +} + +int paxosPrepare() +{ + struct sockaddr_in remoteAddr; + char control; + int remote_n; + int remote_v; + int tmp_n = -1; + int cnt = 0; + int sd; + int i; + temp_v_a = v_a; + my_n = n_h + 1; + +#ifdef DEBUG + printf("[Prepare]...\n"); +#endif + + temp_v_a = myIpAddr; // if no other value is proposed, make this machine the new leader + + for (i = 0; i < numHostsInSystem; ++i) { + control = PAXOS_PREPARE; + if(!liveHosts[i]) + continue; + + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("paxosPrepare(): socket create error\n"); + continue; + } else { + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(hostIpAddrs[i]); +// printf("%s -> open sd : %d to %s\n",__func__,sd,midtoIPString(hostIpAddrs[i])); + + if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { + printf("%s -> socket connect error\n",__func__); + continue; + } + + send_data(sd, &control, sizeof(char)); + send_data(sd, &my_n, sizeof(int)); + int timeout = recv_data(sd, &control, sizeof(char)); + if ((sd == -1) || (timeout < 0)) { + continue; + } + + switch (control) { + case PAXOS_PREPARE_OK: + cnt++; + recv_data(sd, &remote_n, sizeof(int)); + recv_data(sd, &remote_v, sizeof(int)); + if(remote_v != origleader) { + if (remote_n > tmp_n) { + tmp_n = remote_n; + temp_v_a = remote_v; + } + } + break; + case PAXOS_PREPARE_REJECT: + break; + } + close(sd); + } + } + + if (cnt >= (numLiveHostsInSystem / 2)) { // majority of OK replies + return 1; + } + else { + return -1; + } +} + +int paxosAccept() +{ + struct sockaddr_in remoteAddr; + char control; + int i; + int cnt = 0; + int sd; + int remote_v = temp_v_a; + + for (i = 0; i < numHostsInSystem; ++i) { + control = PAXOS_ACCEPT; + + if(!liveHosts[i]) + continue; + + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("paxosPrepare(): socket create error\n"); + continue; + } else { + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(hostIpAddrs[i]); + + if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { + printf("%s -> socket connect error\n",__func__); + continue; + } + + send_data(sd, &control, sizeof(char)); + send_data(sd, &my_n, sizeof(int)); + send_data(sd, &remote_v, sizeof(int)); + + int timeout = recv_data(sd, &control, sizeof(char)); + if (timeout < 0) { + close(sd); + continue; + } + + switch (control) { + case PAXOS_ACCEPT_OK: + cnt++; + break; + case PAXOS_ACCEPT_REJECT: + break; + } + close(sd); + } + } + + if (cnt >= (numLiveHostsInSystem / 2)) { + return 1; + } + else { + return -1; + } +} + +void paxosLearn() +{ + char control; + struct sockaddr_in remoteAddr; + int sd; + int i; + +#ifdef DEBUG + printf("[Learn]...\n"); +#endif + + control = PAXOS_LEARN; + + for (i = 0; i < numHostsInSystem; ++i) { + if(!liveHosts[i]) + continue; + if(hostIpAddrs[i] == myIpAddr) + { + leader = v_a; + paxosRound++; + continue; + } + + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("paxosPrepare(): socket create error\n"); + continue; + } else { + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(hostIpAddrs[i]); +// printf("%s -> open sd : %d to %s\n",__func__,sd,midtoIPString(hostIpAddrs[i])); + + if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { + printf("%s -> socket connect error\n",__func__); + continue; + } + + send_data(sd, &control, sizeof(char)); + send_data(sd, &v_a, sizeof(int)); + + close(sd); + } + } +} + +void paxosPrepare_receiver(int acceptfd) +{ + int val; + char control; + + recv_data((int)acceptfd, &val, sizeof(int)); + + if (val <= n_h) { + control = PAXOS_PREPARE_REJECT; + send_data((int)acceptfd, &control, sizeof(char)); + } + else { + n_h = val; + control = PAXOS_PREPARE_OK; + + send_data((int)acceptfd, &control, sizeof(char)); + send_data((int)acceptfd, &n_a, sizeof(int)); + send_data((int)acceptfd, &v_a, sizeof(int)); + } +} + +void paxosAccept_receiver(int acceptfd) +{ + int n,v; + char control; + + recv_data((int)acceptfd, &n, sizeof(int)); + recv_data((int)acceptfd, &v, sizeof(int)); + + if (n < n_h) { + control = PAXOS_ACCEPT_REJECT; + send_data((int)acceptfd, &control, sizeof(char)); + } + else { + n_a = n; + v_a = v; + n_h = n; + control = PAXOS_ACCEPT_OK; + send_data((int)acceptfd, &control, sizeof(char)); + } +} + + +int paxosLearn_receiver(int acceptfd) +{ + int v; + + recv_data((int)acceptfd, &v, sizeof(int)); + leader = v_a; + paxosRound++; + v_a = 0; + + return leader; +} + + diff --git a/Robust/src/Runtime/DSTM/interface_recovery/paxos.h b/Robust/src/Runtime/DSTM/interface_recovery/paxos.h new file mode 100644 index 00000000..c0e46520 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface_recovery/paxos.h @@ -0,0 +1,38 @@ +/* Paxo Algorithm: + * Executes when the known leader has failed. + * Guarantees consensus on next leader among all live hosts. */ + +#ifndef _PAXOS_H_ +#define _PAXOS_H_ + +#include "dstm.h" + +#define WAIT_TIME 3 + +/********************************* + * Paxos Messages + *******************************/ +#define PAXOS_PREPARE 40 +#define PAXOS_PREPARE_REJECT 41 +#define PAXOS_PREPARE_OK 42 +#define PAXOS_ACCEPT 43 +#define PAXOS_ACCEPT_REJECT 44 +#define PAXOS_ACCEPT_OK 45 +#define PAXOS_LEARN 46 +#define DELETE_LEADER 47 + + +/* Paxo's algorithm */ + +/* coordinator side */ +int paxos(int* hostIpAddrs,int* liveHosts,unsigned int myIpAddr,int numHostsInSystem,int numLiveHostsInSystem); +int paxosPrepare(); +int paxosAccept(); +void paxosLearn(); + +/* participant side */ +void paxosPrepare_receiver(int acceptfd); +void paxosAccept_receiver(int acceptfd); +int paxosLearn_receiver(int acceptfd); + +#endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 5636064b..6a197c47 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -111,17 +111,7 @@ char ip[16]; // for debugging purpose extern tlist_t* transList; extern pthread_mutex_t clearNotifyList_mutex; -/****************************** - * Global variables for Paxos - ******************************/ -int n_a; -unsigned int v_a; -int n_h; -int my_n; -unsigned int leader; -unsigned int origleader; -unsigned int temp_v_a; -int paxosRound; +extern unsigned int leader; #ifdef RECOVERYSTATS int numRecovery = 0; @@ -533,7 +523,7 @@ int dstmStartup(const char * option) { updateLiveHosts(); setLocateObjHosts(); updateLiveHostsCommit(); - paxos(); + leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem); // printHostsStatus(); if(!allHostsLive()) { printf("Not all hosts live. Exiting.\n"); @@ -1162,6 +1152,15 @@ int transCommit() { } #endif +#ifdef RECOVERY + while(okCommit != TRANS_OK) { + printf("%s -> new Transactin is waiting\n",__func__); + sleep(2); + } + + transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK); +#endif + int treplyretryCount = 0; /* Initialize timeout for exponential delay */ exponential_backoff.tv_sec = 0; @@ -1343,21 +1342,12 @@ int transCommit() { if(timeout < 0) { deadmid = listmid[i]; deadsd = sd; -#ifdef DEBUG - printf("%s -> Dead Machine ID : %s\n",__func__,midtoIPString(deadmid)); - printf("%s -> Dead SD : %d\n",__func__,sd); -#endif getReplyCtrl[i] = TRANS_DISAGREE; } #endif } } -#ifdef DEBUG - printf("%s-> Decide final response now\n", __func__); -#endif - - /* Decide the final response */ if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); @@ -1365,8 +1355,16 @@ int transCommit() { free(listmid); return 1; } -#ifdef DEBUG - printf("%s-> Final Response: %d\n", __func__, (int)finalResponse); + +#ifdef RECOVERY +// wait until leader fix the system + if(okCommit != TRANS_OK) { + while(okCommit != TRANS_OK) { + printf("%s -> Coordinator is waiting finalResponse : %d\n",__func__,finalResponse); + sleep(1); + } + finalResponse = TRANS_ABORT; + } #endif #ifdef CACHE @@ -1413,13 +1411,9 @@ int transCommit() { #endif #endif send_data(sd,&finalResponse,sizeof(char)); -#ifdef DEBUG - printf("%s -> Decision Sent to %s\n",__func__,midtoIPString(listmid[i])); -#endif - } else { /* Complete local processing */ - doLocalProcess(finalResponse, &(tosend[i]), &transinfo); + finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo); #ifdef ABORTREADERS if(finalResponse == TRANS_COMMIT) { @@ -1450,10 +1444,6 @@ int transCommit() { pDelete(pile_ptr); /* wait a random amount of time before retrying to commit transaction*/ if(treplyretry) { - //treplyretryCount++; - //if(treplyretryCount >= NUM_TRY_TO_COMMIT) - // exponentialdelay(); - //else randomdelay(); #ifdef TRANSSTATS nSoftAbort++; @@ -1461,6 +1451,15 @@ int transCommit() { } } while (treplyretry && deadmid != -1); +#ifdef RECOVERY + tlist_node_t* tNode = tlistSearch(transList,transID); + tNode->status = TRANS_OK; + + pthread_mutex_lock(&clearNotifyList_mutex); + transList = tlistRemove(transList,transID); + pthread_mutex_unlock(&clearNotifyList_mutex); +#endif + if(finalResponse == TRANS_ABORT) { #ifdef TRANSSTATS numTransAbort++; @@ -1471,9 +1470,6 @@ int transCommit() { #ifdef RECOVERY if(deadmid != -1) { /* if deadmid is greater than or equal to 0, then there is dead machine. */ -#ifdef DEBUG - printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid)); -#endif notifyLeaderDeadMachine(deadmid); } #endif @@ -1508,10 +1504,6 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha unsigned int oid; unsigned short version; -#ifdef RECOVERY - transList = tlistInsertNode(transList,tdata->f.transid,TRYING_TO_COMMIT,TRANS_OK); -#endif - /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int)); oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for @@ -1560,7 +1552,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha } } -void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) { +char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) { #ifdef RECOVERY finalResponse = inspectTransaction(finalResponse,tdata->f.transid); @@ -1583,11 +1575,6 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da printf("ERROR...No Decision\n"); } -#ifdef RECOVERY - pthread_mutex_lock(&clearNotifyList_mutex); - transList = tlistRemove(transList,tdata->f.transid); - pthread_mutex_unlock(&clearNotifyList_mutex); -#endif /* Free memory */ if (transinfo->objlocked != NULL) { @@ -1596,6 +1583,8 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da if (transinfo->objnotfound != NULL) { free(transinfo->objnotfound); } + + return finalResponse; } /* This function decides the reponse that needs to be sent to @@ -1764,7 +1753,7 @@ void notifyLeaderDeadMachine(unsigned int deadHost) { } if(deadHost == leader) // if leader is dead, then pick a new leader - paxos(); + leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem); #ifdef DEBUG printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER"); @@ -1819,6 +1808,17 @@ void restoreDuplicationState(unsigned int deadHost) { printf("%s -> Entering\n",__func__); +#ifdef RECOVERYSTATS + printf("Recovery Start\n"); + numRecovery++; + long long st; + long long fi; + unsigned int dupeSize = 0; // to calculate the size of backed up data + + st = myrdtsc(); // to get clock + recoverStat[numRecovery-1].deadMachine = deadHost; +#endif + // update leader's live host list and object locations updateLiveHostsList(deadHost); setReLocateObjHosts(deadHost); @@ -1840,6 +1840,12 @@ void restoreDuplicationState(unsigned int deadHost) // restart transactions restartTransactions(); +#ifdef RECOVERYSTATS + fi = myrdtsc(); + recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ; + printRecoveryStat(); +#endif + printf("%s -> Exiting\n",__func__); } @@ -3226,12 +3232,9 @@ void updateLiveHostsList(int mid) int mIndex = findHost(mid); int sd; - printf("%s -> Enter with %s\n",__func__,midtoIPString(mid)); - if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[mIndex])) < 0) { liveHosts[mIndex] = 0; numLiveHostsInSystem--; - printf("%s -> 111End with %s\n",__func__,midtoIPString(mid)); return; } @@ -3248,7 +3251,6 @@ void updateLiveHostsList(int mid) } freeSockWithLock(transPrefetchSockPool,hostIpAddrs[mIndex],sd); - printf("%s -> 222End with %s\n",__func__,midtoIPString(mid)); return; } @@ -3316,27 +3318,14 @@ int allHostsLive() { void duplicateLostObjects(unsigned int mid){ #ifdef RECOVERYSTATS - printf("Recovery Start\n"); - numRecovery++; - long long st; - long long fi; - unsigned int dupeSize = 0; // to calculate the size of backed up data - - st = myrdtsc(); // to get clock - recoverStat[numRecovery-1].deadMachine = mid; -#endif - -#ifdef DEBUG - printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid)); + unsigned int dupeSize = 0; #endif //this needs to be changed. unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine unsigned int originalMid = getPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine. -#ifndef DEBUG - printf("%s-> backupMid: %d\t[%s]", __func__, backupMid,midtoIPString(backupMid)); - printf("originalMid: %d\t[%s]\n", originalMid,midtoIPString(originalMid)); +#ifdef DEBUG printHostsStatus(); #endif @@ -3405,10 +3394,7 @@ void duplicateLostObjects(unsigned int mid){ freeSockWithLock(transRequestSockPool, backupMid, bsd); #ifdef RECOVERYSTATS - fi = myrdtsc(); - recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ; recoverStat[numRecovery-1].recoveredData = dupeSize; - printRecoveryStat(); #endif #ifndef DEBUG @@ -3953,187 +3939,6 @@ plistnode_t *sortPiles(plistnode_t *pileptr) { return pileptr; } -#ifdef RECOVERY -/* Paxo Algorithm: - * Executes when the known leader has failed. - * Guarantees consensus on next leader among all live hosts. */ -int paxos() -{ - int origRound = paxosRound; - origleader = leader; - int ret = -1; -#ifdef DEBUG - printf(">> Debug : Starting paxos..\n"); -#endif - - do { - ret = paxosPrepare(); // phase 1 - if (ret == 1) { - ret = paxosAccept(); // phase 2 - if (ret == 1) { - paxosLearn(); // phase 3 - break; - } - } - // Paxos not successful; wait and retry if new leader is not yet slected - sleep(WAIT_TIME); - if(paxosRound != origRound) - break; - } while (ret == -1); - -#ifdef DEBUG - printf("\n>> Debug : Leader : [%s]\t[%u]\n", midtoIPString(leader),leader); -#endif - - return ret; -} - -int paxosPrepare() -{ - char control; - //int origleader = leader; - int remote_n; - int remote_v; - int tmp_n = -1; - int cnt = 0; - int sd; - int i; - temp_v_a = v_a; - my_n = n_h + 1; - -#ifdef DEBUG - printf("[Prepare]...\n"); -#endif - - temp_v_a = myIpAddr; // if no other value is proposed, make this machine the new leader - - for (i = 0; i < numHostsInSystem; ++i) { - control = PAXOS_PREPARE; - if(!liveHosts[i]) - continue; - - if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) { - printf("paxosPrepare(): socket create error\n"); - continue; - } - send_data(sd, &control, sizeof(char)); - send_data(sd, &my_n, sizeof(int)); - int timeout = recv_data(sd, &control, sizeof(char)); - if ((sd == -1) || (timeout < 0)) { - continue; - } - - switch (control) { - case PAXOS_PREPARE_OK: - cnt++; - recv_data(sd, &remote_n, sizeof(int)); - recv_data(sd, &remote_v, sizeof(int)); - if(remote_v != origleader) { - if (remote_n > tmp_n) { - tmp_n = remote_n; - temp_v_a = remote_v; - } - } - break; - case PAXOS_PREPARE_REJECT: - break; - } - - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); - } - - if (cnt >= (numLiveHostsInSystem / 2)) { // majority of OK replies - return 1; - } - else { - return -1; - } -} - -int paxosAccept() -{ - char control; - int i; - int cnt = 0; - int sd; - int remote_v = temp_v_a; - - for (i = 0; i < numHostsInSystem; ++i) { - control = PAXOS_ACCEPT; - - if(!liveHosts[i]) - continue; - - if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) { - printf("paxosAccept(): socket create error\n"); - continue; - } - - send_data(sd, &control, sizeof(char)); - send_data(sd, &my_n, sizeof(int)); - send_data(sd, &remote_v, sizeof(int)); - - int timeout = recv_data(sd, &control, sizeof(char)); - if (timeout < 0) { - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); - continue; - } - - switch (control) { - case PAXOS_ACCEPT_OK: - cnt++; - break; - case PAXOS_ACCEPT_REJECT: - break; - } -#ifdef DEBUG - printf(">> Debug : Accept - n_h [%d], n_a [%d], v_a [%s]\n", n_h, n_a, midtoIPString(v_a)); -#endif - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); - } - - if (cnt >= (numLiveHostsInSystem / 2)) { - return 1; - } - else { - return -1; - } -} - -void paxosLearn() -{ - char control; - int sd; - int i; - -#ifdef DEBUG - printf("[Learn]...\n"); -#endif - - control = PAXOS_LEARN; - - for (i = 0; i < numHostsInSystem; ++i) { - if(!liveHosts[i]) - continue; - if(hostIpAddrs[i] == myIpAddr) - { - leader = v_a; - paxosRound++; - continue; - } - if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) { - continue; - } - - send_data(sd, &control, sizeof(char)); - send_data(sd, &v_a, sizeof(int)); - - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); - - } -} -#endif - #ifdef RECOVERY void clearDeadThreadsNotification() { -- 2.34.1