#define DUPLICATION_COMPLETE 35
#define RECEIVE_DUPES 36
-/*********************************
- * Paxos Messages
- *******************************/
-#define PAXOS_PREPARE 40
-#define PAXOS_PREPARE_REJECT 41
-#define PAXOS_PREPARE_OK 42
-#define PAXOS_ACCEPT 43
-#define PAXOS_ACCEPT_REJECT 44
-#define PAXOS_ACCEPT_OK 45
-#define PAXOS_LEARN 46
-#define DELETE_LEADER 47
-
/*********************************
* Transaction Clear Messages
*********************************/
#endif
#ifdef RECOVERY
#include "translist.h"
+#include "paxos.h"
#endif
//bit designations for status field of objheader
int readDuplicateObjs(int);
void printRecoveryStat();
-/* Paxo's algorithm */
-int paxos();
-int paxosPrepare();
-int paxosAccept();
-void paxosLearn();
-
#endif
/* Prototypes for server portion */
void *getRemoteObj(unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine
void handleLocalReq(trans_req_data_t *, trans_commit_data_t *, char *);
int transComProcess(trans_req_data_t *, trans_commit_data_t *);
-void doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *);
+char doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *);
int transAbortProcess(trans_commit_data_t *);
void transAbort();
void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size);
sockPoolHashTable_t *transPResponseSocketPool;
#ifdef RECOVERY
-/******************************
- * Global variables for Paxos
- ******************************/
-extern int n_a;
-extern unsigned int v_a;
-extern int n_h;
-extern int my_n;
-extern int leader;
-extern int paxosRound;
-/* This function initializes the main objects store and creates the
- * global machine and location lookup table */
+extern unsigned int leader;
long long myrdtsc(void)
{
#endif
+/* This function initializes the main objects store and creates the
+ * global machine and location lookup table */
int dstmInit(void) {
mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
/* Initialize attribute for mutex */
#ifdef DEBUG
printf("control -> PAXOS_PREPARE\n");
#endif
- recv_data((int)acceptfd, &val, sizeof(int));
- if (val <= n_h) {
- control = PAXOS_PREPARE_REJECT;
- send_data((int)acceptfd, &control, sizeof(char));
- }
- else {
- n_h = val;
- control = PAXOS_PREPARE_OK;
-
- send_data((int)acceptfd, &control, sizeof(char));
- send_data((int)acceptfd, &n_a, sizeof(int));
- send_data((int)acceptfd, &v_a, sizeof(int));
- }
+ paxosPrepare_receiver((int)acceptfd);
break;
case PAXOS_ACCEPT:
#ifdef DEBUG
printf("control -> PAXOS_ACCEPT\n");
#endif
- recv_data((int)acceptfd, &n, sizeof(int));
- recv_data((int)acceptfd, &v, sizeof(int));
- if (n < n_h) {
- control = PAXOS_ACCEPT_REJECT;
- send_data((int)acceptfd, &control, sizeof(char));
- }
- else {
- n_a = n;
- v_a = v;
- n_h = n;
- control = PAXOS_ACCEPT_OK;
- send_data((int)acceptfd, &control, sizeof(char));
- }
+ paxosAccept_receiver((int)acceptfd);
break;
case PAXOS_LEARN:
#ifdef DEBUG
printf("control -> PAXOS_LEARN\n");
#endif
- recv_data((int)acceptfd, &v, sizeof(int));
- leader = v_a;
- paxosRound++;
-#ifdef DEBUG
- printf("%s (PAXOS_LEARN)-> This is my leader!: [%s]\n", __func__, midtoIPString(leader));
-#endif
- break;
-
- case DELETE_LEADER:
-#ifdef DEBUG
- printf("control -> DELETE_LEADER\n");
-#endif
- v_a = 0;
+ leader = paxosLearn_receiver((int)acceptfd);
break;
#endif
default:
--- /dev/null
+/* Paxo Algorithm:
+ * Executes when the known leader has failed.
+ * Guarantees consensus on next leader among all live hosts. */
+
+#include "paxos.h"
+
+/******************************
+ * Global variables for Paxos
+ ******************************/
+int n_a;
+unsigned int v_a;
+int n_h;
+int my_n;
+unsigned int leader;
+unsigned int origleader;
+unsigned int temp_v_a;
+int paxosRound;
+
+unsigned int myIpAddr;
+int numHostsInSystem;
+int numLiveHostsInSystem;
+
+int* hostIpAddrs;
+int* liveHosts;
+
+int paxos(int* hostIp,int* hosts,unsigned int myIp,int nHosts,int nLiveHosts)
+{
+ int origRound = paxosRound;
+ origleader = leader;
+ int ret = -1;
+
+ numHostsInSystem = nHosts;
+ numLiveHostsInSystem =nLiveHosts;
+ myIpAddr = myIp;
+ hostIpAddrs = hostIp;
+ liveHosts = hosts;
+
+ do {
+ ret = paxosPrepare(); // phase 1
+ if (ret == 1) {
+ ret = paxosAccept(); // phase 2
+ if (ret == 1) {
+ paxosLearn(); // phase 3
+ break;
+ }
+ }
+ // Paxos not successful; wait and retry if new leader is not yet slected
+ sleep(WAIT_TIME);
+ if(paxosRound != origRound)
+ break;
+ } while (ret == -1);
+
+#ifdef DEBUG
+ printf("\n>> Debug : Leader : [%s]\t[%u]\n", midtoIPString(leader),leader);
+#endif
+
+ return leader;
+}
+
+int paxosPrepare()
+{
+ struct sockaddr_in remoteAddr;
+ char control;
+ int remote_n;
+ int remote_v;
+ int tmp_n = -1;
+ int cnt = 0;
+ int sd;
+ int i;
+ temp_v_a = v_a;
+ my_n = n_h + 1;
+
+#ifdef DEBUG
+ printf("[Prepare]...\n");
+#endif
+
+ temp_v_a = myIpAddr; // if no other value is proposed, make this machine the new leader
+
+ for (i = 0; i < numHostsInSystem; ++i) {
+ control = PAXOS_PREPARE;
+ if(!liveHosts[i])
+ continue;
+
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ printf("paxosPrepare(): socket create error\n");
+ continue;
+ } else {
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(hostIpAddrs[i]);
+// printf("%s -> open sd : %d to %s\n",__func__,sd,midtoIPString(hostIpAddrs[i]));
+
+ if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+ printf("%s -> socket connect error\n",__func__);
+ continue;
+ }
+
+ send_data(sd, &control, sizeof(char));
+ send_data(sd, &my_n, sizeof(int));
+ int timeout = recv_data(sd, &control, sizeof(char));
+ if ((sd == -1) || (timeout < 0)) {
+ continue;
+ }
+
+ switch (control) {
+ case PAXOS_PREPARE_OK:
+ cnt++;
+ recv_data(sd, &remote_n, sizeof(int));
+ recv_data(sd, &remote_v, sizeof(int));
+ if(remote_v != origleader) {
+ if (remote_n > tmp_n) {
+ tmp_n = remote_n;
+ temp_v_a = remote_v;
+ }
+ }
+ break;
+ case PAXOS_PREPARE_REJECT:
+ break;
+ }
+ close(sd);
+ }
+ }
+
+ if (cnt >= (numLiveHostsInSystem / 2)) { // majority of OK replies
+ return 1;
+ }
+ else {
+ return -1;
+ }
+}
+
+int paxosAccept()
+{
+ struct sockaddr_in remoteAddr;
+ char control;
+ int i;
+ int cnt = 0;
+ int sd;
+ int remote_v = temp_v_a;
+
+ for (i = 0; i < numHostsInSystem; ++i) {
+ control = PAXOS_ACCEPT;
+
+ if(!liveHosts[i])
+ continue;
+
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ printf("paxosPrepare(): socket create error\n");
+ continue;
+ } else {
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(hostIpAddrs[i]);
+
+ if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+ printf("%s -> socket connect error\n",__func__);
+ continue;
+ }
+
+ send_data(sd, &control, sizeof(char));
+ send_data(sd, &my_n, sizeof(int));
+ send_data(sd, &remote_v, sizeof(int));
+
+ int timeout = recv_data(sd, &control, sizeof(char));
+ if (timeout < 0) {
+ close(sd);
+ continue;
+ }
+
+ switch (control) {
+ case PAXOS_ACCEPT_OK:
+ cnt++;
+ break;
+ case PAXOS_ACCEPT_REJECT:
+ break;
+ }
+ close(sd);
+ }
+ }
+
+ if (cnt >= (numLiveHostsInSystem / 2)) {
+ return 1;
+ }
+ else {
+ return -1;
+ }
+}
+
+void paxosLearn()
+{
+ char control;
+ struct sockaddr_in remoteAddr;
+ int sd;
+ int i;
+
+#ifdef DEBUG
+ printf("[Learn]...\n");
+#endif
+
+ control = PAXOS_LEARN;
+
+ for (i = 0; i < numHostsInSystem; ++i) {
+ if(!liveHosts[i])
+ continue;
+ if(hostIpAddrs[i] == myIpAddr)
+ {
+ leader = v_a;
+ paxosRound++;
+ continue;
+ }
+
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ printf("paxosPrepare(): socket create error\n");
+ continue;
+ } else {
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(hostIpAddrs[i]);
+// printf("%s -> open sd : %d to %s\n",__func__,sd,midtoIPString(hostIpAddrs[i]));
+
+ if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+ printf("%s -> socket connect error\n",__func__);
+ continue;
+ }
+
+ send_data(sd, &control, sizeof(char));
+ send_data(sd, &v_a, sizeof(int));
+
+ close(sd);
+ }
+ }
+}
+
+void paxosPrepare_receiver(int acceptfd)
+{
+ int val;
+ char control;
+
+ recv_data((int)acceptfd, &val, sizeof(int));
+
+ if (val <= n_h) {
+ control = PAXOS_PREPARE_REJECT;
+ send_data((int)acceptfd, &control, sizeof(char));
+ }
+ else {
+ n_h = val;
+ control = PAXOS_PREPARE_OK;
+
+ send_data((int)acceptfd, &control, sizeof(char));
+ send_data((int)acceptfd, &n_a, sizeof(int));
+ send_data((int)acceptfd, &v_a, sizeof(int));
+ }
+}
+
+void paxosAccept_receiver(int acceptfd)
+{
+ int n,v;
+ char control;
+
+ recv_data((int)acceptfd, &n, sizeof(int));
+ recv_data((int)acceptfd, &v, sizeof(int));
+
+ if (n < n_h) {
+ control = PAXOS_ACCEPT_REJECT;
+ send_data((int)acceptfd, &control, sizeof(char));
+ }
+ else {
+ n_a = n;
+ v_a = v;
+ n_h = n;
+ control = PAXOS_ACCEPT_OK;
+ send_data((int)acceptfd, &control, sizeof(char));
+ }
+}
+
+
+int paxosLearn_receiver(int acceptfd)
+{
+ int v;
+
+ recv_data((int)acceptfd, &v, sizeof(int));
+ leader = v_a;
+ paxosRound++;
+ v_a = 0;
+
+ return leader;
+}
+
+
--- /dev/null
+/* Paxo Algorithm:
+ * Executes when the known leader has failed.
+ * Guarantees consensus on next leader among all live hosts. */
+
+#ifndef _PAXOS_H_
+#define _PAXOS_H_
+
+#include "dstm.h"
+
+#define WAIT_TIME 3
+
+/*********************************
+ * Paxos Messages
+ *******************************/
+#define PAXOS_PREPARE 40
+#define PAXOS_PREPARE_REJECT 41
+#define PAXOS_PREPARE_OK 42
+#define PAXOS_ACCEPT 43
+#define PAXOS_ACCEPT_REJECT 44
+#define PAXOS_ACCEPT_OK 45
+#define PAXOS_LEARN 46
+#define DELETE_LEADER 47
+
+
+/* Paxo's algorithm */
+
+/* coordinator side */
+int paxos(int* hostIpAddrs,int* liveHosts,unsigned int myIpAddr,int numHostsInSystem,int numLiveHostsInSystem);
+int paxosPrepare();
+int paxosAccept();
+void paxosLearn();
+
+/* participant side */
+void paxosPrepare_receiver(int acceptfd);
+void paxosAccept_receiver(int acceptfd);
+int paxosLearn_receiver(int acceptfd);
+
+#endif
extern tlist_t* transList;
extern pthread_mutex_t clearNotifyList_mutex;
-/******************************
- * Global variables for Paxos
- ******************************/
-int n_a;
-unsigned int v_a;
-int n_h;
-int my_n;
-unsigned int leader;
-unsigned int origleader;
-unsigned int temp_v_a;
-int paxosRound;
+extern unsigned int leader;
#ifdef RECOVERYSTATS
int numRecovery = 0;
updateLiveHosts();
setLocateObjHosts();
updateLiveHostsCommit();
- paxos();
+ leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem);
// printHostsStatus();
if(!allHostsLive()) {
printf("Not all hosts live. Exiting.\n");
}
#endif
+#ifdef RECOVERY
+ while(okCommit != TRANS_OK) {
+ printf("%s -> new Transactin is waiting\n",__func__);
+ sleep(2);
+ }
+
+ transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK);
+#endif
+
int treplyretryCount = 0;
/* Initialize timeout for exponential delay */
exponential_backoff.tv_sec = 0;
if(timeout < 0) {
deadmid = listmid[i];
deadsd = sd;
-#ifdef DEBUG
- printf("%s -> Dead Machine ID : %s\n",__func__,midtoIPString(deadmid));
- printf("%s -> Dead SD : %d\n",__func__,sd);
-#endif
getReplyCtrl[i] = TRANS_DISAGREE;
}
#endif
}
}
-#ifdef DEBUG
- printf("%s-> Decide final response now\n", __func__);
-#endif
-
-
/* Decide the final response */
if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
free(listmid);
return 1;
}
-#ifdef DEBUG
- printf("%s-> Final Response: %d\n", __func__, (int)finalResponse);
+
+#ifdef RECOVERY
+// wait until leader fix the system
+ if(okCommit != TRANS_OK) {
+ while(okCommit != TRANS_OK) {
+ printf("%s -> Coordinator is waiting finalResponse : %d\n",__func__,finalResponse);
+ sleep(1);
+ }
+ finalResponse = TRANS_ABORT;
+ }
#endif
#ifdef CACHE
#endif
#endif
send_data(sd,&finalResponse,sizeof(char));
-#ifdef DEBUG
- printf("%s -> Decision Sent to %s\n",__func__,midtoIPString(listmid[i]));
-#endif
-
} else {
/* Complete local processing */
- doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+ finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
#ifdef ABORTREADERS
if(finalResponse == TRANS_COMMIT) {
pDelete(pile_ptr);
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
- //treplyretryCount++;
- //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
- // exponentialdelay();
- //else
randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
}
} while (treplyretry && deadmid != -1);
+#ifdef RECOVERY
+ tlist_node_t* tNode = tlistSearch(transList,transID);
+ tNode->status = TRANS_OK;
+
+ pthread_mutex_lock(&clearNotifyList_mutex);
+ transList = tlistRemove(transList,transID);
+ pthread_mutex_unlock(&clearNotifyList_mutex);
+#endif
+
if(finalResponse == TRANS_ABORT) {
#ifdef TRANSSTATS
numTransAbort++;
#ifdef RECOVERY
if(deadmid != -1) { /* if deadmid is greater than or equal to 0,
then there is dead machine. */
-#ifdef DEBUG
- printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid));
-#endif
notifyLeaderDeadMachine(deadmid);
}
#endif
unsigned int oid;
unsigned short version;
-#ifdef RECOVERY
- transList = tlistInsertNode(transList,tdata->f.transid,TRYING_TO_COMMIT,TRANS_OK);
-#endif
-
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
}
}
-void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
+char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
#ifdef RECOVERY
finalResponse = inspectTransaction(finalResponse,tdata->f.transid);
printf("ERROR...No Decision\n");
}
-#ifdef RECOVERY
- pthread_mutex_lock(&clearNotifyList_mutex);
- transList = tlistRemove(transList,tdata->f.transid);
- pthread_mutex_unlock(&clearNotifyList_mutex);
-#endif
/* Free memory */
if (transinfo->objlocked != NULL) {
if (transinfo->objnotfound != NULL) {
free(transinfo->objnotfound);
}
+
+ return finalResponse;
}
/* This function decides the reponse that needs to be sent to
}
if(deadHost == leader) // if leader is dead, then pick a new leader
- paxos();
+ leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem);
#ifdef DEBUG
printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER");
{
printf("%s -> Entering\n",__func__);
+#ifdef RECOVERYSTATS
+ printf("Recovery Start\n");
+ numRecovery++;
+ long long st;
+ long long fi;
+ unsigned int dupeSize = 0; // to calculate the size of backed up data
+
+ st = myrdtsc(); // to get clock
+ recoverStat[numRecovery-1].deadMachine = deadHost;
+#endif
+
// update leader's live host list and object locations
updateLiveHostsList(deadHost);
setReLocateObjHosts(deadHost);
// restart transactions
restartTransactions();
+#ifdef RECOVERYSTATS
+ fi = myrdtsc();
+ recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ;
+ printRecoveryStat();
+#endif
+
printf("%s -> Exiting\n",__func__);
}
int mIndex = findHost(mid);
int sd;
- printf("%s -> Enter with %s\n",__func__,midtoIPString(mid));
-
if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[mIndex])) < 0) {
liveHosts[mIndex] = 0;
numLiveHostsInSystem--;
- printf("%s -> 111End with %s\n",__func__,midtoIPString(mid));
return;
}
}
freeSockWithLock(transPrefetchSockPool,hostIpAddrs[mIndex],sd);
- printf("%s -> 222End with %s\n",__func__,midtoIPString(mid));
return;
}
void duplicateLostObjects(unsigned int mid){
#ifdef RECOVERYSTATS
- printf("Recovery Start\n");
- numRecovery++;
- long long st;
- long long fi;
- unsigned int dupeSize = 0; // to calculate the size of backed up data
-
- st = myrdtsc(); // to get clock
- recoverStat[numRecovery-1].deadMachine = mid;
-#endif
-
-#ifdef DEBUG
- printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));
+ unsigned int dupeSize = 0;
#endif
//this needs to be changed.
unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine
unsigned int originalMid = getPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine.
-#ifndef DEBUG
- printf("%s-> backupMid: %d\t[%s]", __func__, backupMid,midtoIPString(backupMid));
- printf("originalMid: %d\t[%s]\n", originalMid,midtoIPString(originalMid));
+#ifdef DEBUG
printHostsStatus();
#endif
freeSockWithLock(transRequestSockPool, backupMid, bsd);
#ifdef RECOVERYSTATS
- fi = myrdtsc();
- recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ;
recoverStat[numRecovery-1].recoveredData = dupeSize;
- printRecoveryStat();
#endif
#ifndef DEBUG
return pileptr;
}
-#ifdef RECOVERY
-/* Paxo Algorithm:
- * Executes when the known leader has failed.
- * Guarantees consensus on next leader among all live hosts. */
-int paxos()
-{
- int origRound = paxosRound;
- origleader = leader;
- int ret = -1;
-#ifdef DEBUG
- printf(">> Debug : Starting paxos..\n");
-#endif
-
- do {
- ret = paxosPrepare(); // phase 1
- if (ret == 1) {
- ret = paxosAccept(); // phase 2
- if (ret == 1) {
- paxosLearn(); // phase 3
- break;
- }
- }
- // Paxos not successful; wait and retry if new leader is not yet slected
- sleep(WAIT_TIME);
- if(paxosRound != origRound)
- break;
- } while (ret == -1);
-
-#ifdef DEBUG
- printf("\n>> Debug : Leader : [%s]\t[%u]\n", midtoIPString(leader),leader);
-#endif
-
- return ret;
-}
-
-int paxosPrepare()
-{
- char control;
- //int origleader = leader;
- int remote_n;
- int remote_v;
- int tmp_n = -1;
- int cnt = 0;
- int sd;
- int i;
- temp_v_a = v_a;
- my_n = n_h + 1;
-
-#ifdef DEBUG
- printf("[Prepare]...\n");
-#endif
-
- temp_v_a = myIpAddr; // if no other value is proposed, make this machine the new leader
-
- for (i = 0; i < numHostsInSystem; ++i) {
- control = PAXOS_PREPARE;
- if(!liveHosts[i])
- continue;
-
- if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
- printf("paxosPrepare(): socket create error\n");
- continue;
- }
- send_data(sd, &control, sizeof(char));
- send_data(sd, &my_n, sizeof(int));
- int timeout = recv_data(sd, &control, sizeof(char));
- if ((sd == -1) || (timeout < 0)) {
- continue;
- }
-
- switch (control) {
- case PAXOS_PREPARE_OK:
- cnt++;
- recv_data(sd, &remote_n, sizeof(int));
- recv_data(sd, &remote_v, sizeof(int));
- if(remote_v != origleader) {
- if (remote_n > tmp_n) {
- tmp_n = remote_n;
- temp_v_a = remote_v;
- }
- }
- break;
- case PAXOS_PREPARE_REJECT:
- break;
- }
-
- freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
- }
-
- if (cnt >= (numLiveHostsInSystem / 2)) { // majority of OK replies
- return 1;
- }
- else {
- return -1;
- }
-}
-
-int paxosAccept()
-{
- char control;
- int i;
- int cnt = 0;
- int sd;
- int remote_v = temp_v_a;
-
- for (i = 0; i < numHostsInSystem; ++i) {
- control = PAXOS_ACCEPT;
-
- if(!liveHosts[i])
- continue;
-
- if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
- printf("paxosAccept(): socket create error\n");
- continue;
- }
-
- send_data(sd, &control, sizeof(char));
- send_data(sd, &my_n, sizeof(int));
- send_data(sd, &remote_v, sizeof(int));
-
- int timeout = recv_data(sd, &control, sizeof(char));
- if (timeout < 0) {
- freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
- continue;
- }
-
- switch (control) {
- case PAXOS_ACCEPT_OK:
- cnt++;
- break;
- case PAXOS_ACCEPT_REJECT:
- break;
- }
-#ifdef DEBUG
- printf(">> Debug : Accept - n_h [%d], n_a [%d], v_a [%s]\n", n_h, n_a, midtoIPString(v_a));
-#endif
- freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
- }
-
- if (cnt >= (numLiveHostsInSystem / 2)) {
- return 1;
- }
- else {
- return -1;
- }
-}
-
-void paxosLearn()
-{
- char control;
- int sd;
- int i;
-
-#ifdef DEBUG
- printf("[Learn]...\n");
-#endif
-
- control = PAXOS_LEARN;
-
- for (i = 0; i < numHostsInSystem; ++i) {
- if(!liveHosts[i])
- continue;
- if(hostIpAddrs[i] == myIpAddr)
- {
- leader = v_a;
- paxosRound++;
- continue;
- }
- if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
- continue;
- }
-
- send_data(sd, &control, sizeof(char));
- send_data(sd, &v_a, sizeof(int));
-
- freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
-
- }
-}
-#endif
-
#ifdef RECOVERY
void clearDeadThreadsNotification()
{