int okCommit; // machine flag
extern numWaitMachine;
extern unsigned int currentEpoch;
+extern unsigned int currentBackupMachine;
unsigned int leader_index;
#endif
printf("control -> UPDATE_LIVE_HOSTS\n");
#endif
receiveNewHostLists((int)acceptfd);
-
+ pthread_mutex_lock(&recovery_mutex);
+ currentBackupMachine = getBackupMachine(myIpAddr);
+ pthread_mutex_unlock(&recovery_mutex);
#ifdef DEBUG
printHostsStatus();
printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__);
{
struct sockaddr_in remoteAddr;
int sd;
+ unsigned int dupeSize;
unsigned int epoch_num;
recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
//object store stuffffff
mid = getBackupMachine(myIpAddr);
- dupeptr = (char*) mhashGetDuplicate(&tempsize, 0);
-
- //send control and dupes after
- ctrl = RECEIVE_DUPES;
-
- if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("ORIGINAL : ");
- exit(0);
- }
+ if(mid != currentBackupMachine) {
+ currentBackupMachine = mid;
+ dupeptr = (char*) mhashGetDuplicate(&tempsize, 0);
+ //send control and dupes after
+ ctrl = RECEIVE_DUPES;
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
- printf("REQUEST_DUPE ERROR : %s\n",strerror(errno));
-// exit(0);
- break;
- }
- else {
- send_data(sd, &ctrl, sizeof(char));
- send_data(sd, dupeptr, tempsize);
+ if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("ORIGINAL : ");
+ exit(0);
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
- if((readDuplicateObjs(sd) )!= 0) {
- printf("Fail in readDuplicateObj()\n");
+ if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
+ printf("REQUEST_DUPE ERROR : %s\n",strerror(errno));
break;
-// exit(0);
}
- recv_data(sd, &response, sizeof(char));
+ else {
+ send_data(sd, &ctrl, sizeof(char));
+ send_data(sd, dupeptr, tempsize);
+
+ dupeSize = tempsize;
+
+ if((dupeSize += readDuplicateObjs(sd) ) < 0) {
+ break;
+ }
+ recv_data(sd, &response, sizeof(char));
- if(response != DUPLICATION_COMPLETE) {
+ if(response != DUPLICATION_COMPLETE) {
#ifndef DEBUG
- printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__);
+ printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__);
#endif
//fail message
- break;
+ break;
// exit(0);
- }
+ }
- close(sd);
- }
- free(dupeptr);
+ close(sd);
+ }
+ free(dupeptr);
+
+ ctrl = DUPLICATION_COMPLETE;
+ send_data((int)acceptfd, &ctrl, sizeof(char));
+ send_data((int)acceptfd, &dupeSize,sizeof(unsigned int));
- ctrl = DUPLICATION_COMPLETE;
- send_data((int)acceptfd, &ctrl, sizeof(char));
#ifdef DEBUG
- printf("%s (REQUEST_DUPE)-> Finished\n", __func__);
+ printf("%s (REQUEST_DUPE)-> Finished\n", __func__);
#endif
+ }
+ else {
+ ctrl = DUPLICATION_COMPLETE;
+ send_data((int)acceptfd,&ctrl,sizeof(char));
+ tempsize = 0;
+ send_data((int)acceptfd,&tempsize,sizeof(unsigned int));
+ }
}
- break;
+ break;
+
case RECEIVE_DUPES:
#ifdef DEBUG
printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
#endif
- if((readDuplicateObjs((int)acceptfd)) != 0) {
+ if((readDuplicateObjs((int)acceptfd)) < 0) {
printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
+// pthread_exit(NULL);
}
-
- dupeptr = (char*) mhashGetDuplicate(&tempsize, 1);
-
- send_data((int)acceptfd,dupeptr,tempsize);
+ else {
+ dupeptr = (char*) mhashGetDuplicate(&tempsize, 1);
+ send_data((int)acceptfd,dupeptr,tempsize);
- free(dupeptr);
- ctrl = DUPLICATION_COMPLETE;
- send_data((int)acceptfd, &ctrl, sizeof(char));
+ free(dupeptr);
+ ctrl = DUPLICATION_COMPLETE;
+ send_data((int)acceptfd, &ctrl, sizeof(char));
#ifdef DEBUG
- printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
+ printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
#endif
+ }
break;
#endif
default:
unsigned int oid;
void *dupeptr, *ptrcreate, *ptr;
objheader_t *header;
+ int timeout1;
+ int timeout2;
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> Start\n", __func__);
#endif
- recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
- recv_data((int)acceptfd, &size, sizeof(int));
+ timeout1 = recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+ timeout2 = recv_data((int)acceptfd, &size, sizeof(int));
+
+ if(timeout1 < 0 || timeout2 < 0) {
+ return -1;
+ }
if(numoid != 0) {
if ((dupeptr = calloc(1, size)) == NULL) {
printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__);
- return 1;
+ return -1;
}
- recv_data((int)acceptfd, dupeptr, size);
+
+ if(recv_data((int)acceptfd, dupeptr, size) < 0) {
+ free(dupeptr);
+ return -1;
+ }
+
+ printf("%s -> PAss this point\n",__func__);
ptr = dupeptr;
+ printf("%s -> numoid = %u\n",__func__,numoid);
for(i = 0; i < numoid; i++) {
header = (objheader_t *)ptr;
oid = OID(header);
if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&mainobjstore_mutex);
- return 1;
+ return -1;
}
pthread_mutex_unlock(&mainobjstore_mutex);
memcpy(ptrcreate, header, tmpsize);
}
ptr += tmpsize;
}
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> End\n", __func__);
#endif
free(dupeptr);
- return 0;
+ return size;
}
else {
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> No objects duplicated\n", __func__);
#endif
return 0;
extern pthread_mutex_t clearNotifyList_mutex;
unsigned int currentEpoch;
+unsigned int currentBackupMachine;
#ifdef RECOVERYSTATS
int numRecovery = 0;
unsigned int dupeSize = 0; // to calculate the size of backed up data
st = myrdtsc(); // to get clock
- recoverStat[numRecovery].deadMachine = deadHost;
#endif
// update leader's live host list and object locations
#ifdef RECOVERYSTATS
fi = myrdtsc();
recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ;
+ recoverStat[numRecovery].recoveredData = flag;
numRecovery++;
printRecoveryStat();
#endif
freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
}
+
+ pthread_mutex_lock(&recovery_mutex);
+ currentBackupMachine = getBackupMachine(myIpAddr);
+ pthread_mutex_unlock(&recovery_mutex);
+
#ifdef DEBUG
printHostsStatus();
printf("%s -> Finish\n",__func__);
int duplicateLostObjects(unsigned int epoch_num,int *sdlist){
int i;
char response;
+ unsigned int dupeSize = 0;
+ unsigned int tempSize;
printf("%s -> Enter\n",__func__);
/* duplicateLostObject example
for(i = 0 ; i < numHostsInSystem; i ++) {
if(sdlist[i] == -1)
continue;
- if(recv_data(sdlist[i],&response,sizeof(char)))
- return -2;
- if(response != DUPLICATION_COMPLETE) {
- return -2;
- }
+ if(recv_data(sdlist[i],&response,sizeof(char))) return -2;
+
+ if(response != DUPLICATION_COMPLETE) return -2;
+
+ if(recv_data(sdlist[i],&tempSize,sizeof(unsigned int)) < 0) return -2;
+
+ dupeSize += tempSize;
+
}
#ifndef DEBUG
printf("%s-> End\n", __func__);
#endif
- return 0;
+ return dupeSize;
}
#endif
void addHost(unsigned int hostIp) {
printf("numRecovery = %d\n",numRecovery);
int i;
for(i=0; i < numRecovery;i++) {
- printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
+// printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime);
+ printf("Recovery Byte = %u\n",recoverStat[i].recoveredData);
}
printf("**************************\n\n");
fflush(stdout);