extern int *liveHosts;
extern int numLiveHostsInSystem;
int clearNotifyListFlag;
+pthread_mutex_t clearNotifyList_mutex;
#endif
objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
pthread_mutex_t lockObjHeader;
-pthread_mutex_t clearNotifyList_mutex;
pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
sockPoolHashTable_t *transPResponseSocketPool;
-extern sockPoolHashTable_t *transRequestSockPool;
-extern sockPoolHashTable_t *transReadSockPool;
-
-int failFlag = 0; //debug
#ifdef RECOVERY
/******************************
pthread_t thread_dstm_accept;
#ifdef RECOVERY
- int firsttime = 1;
+ int firsttime = 1; // these two are for periodic checking
pthread_t thread_dstm_asking;
#endif
-#ifdef DEBUG
+
printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
-#endif
while(1) {
int retval;
int flag=1;
acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+ setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
#ifdef RECOVERY
if(firsttime) {
pthread_detach(thread_dstm_asking);
}
#endif
-#ifdef debug
- printf("%s -> fd accepted\n",__func__);
-#endif
- setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
do {
retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
} while(retval!=0);
char control,ctrl, response;
char *ptr;
void *srcObj;
+
+#ifdef RECOVERY
void *dupeptr;
+ unsigned int transIDreceived;
+ char decision;
+ int timeout;
+#endif
+
int i, tempsize;
objheader_t *h;
trans_commit_data_t transinfo;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
int n, v;
- unsigned int transIDreceived;
- char decision;
- struct sockaddr_in remoteAddr;
#ifdef DEBUG
printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
if (ret==0)
break;
if (ret==-1) {
-#ifdef DEBUG
printf("DEBUG -> RECV Error!.. retrying\n");
-#endif
// exit(0);
break;
}
printf("control -> READ_REQUEST\n");
#endif
/* Read oid requested and search if available */
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ timeout = recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+
+ if(timeout < 0)
+ break;
while((srcObj = mhashSearch(oid)) == NULL) {
int ret;
// printf("HERE!!\n");
#ifdef RECOVERY
case ASK_COMMIT :
- recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int));
+ if(recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int)) < 0)
+ break;
decision = checkDecision(transIDreceived);
#ifdef RECOVERY
case DUPLICATE_ORIGINAL:
+
+ {
+ struct sockaddr_in remoteAddr;
+ int sd;
+
#ifdef DEBUG
- printf("control -> DUPLICATE_ORIGINAL\n");
- printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__);
+ printf("control -> DUPLICATE_ORIGINAL\n");
+ printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__);
#endif
- //object store stuffffff
- recv_data((int)acceptfd, &mid, sizeof(unsigned int));
- tempsize = mhashGetDuplicate(&dupeptr, 0);
+ //object store stuffffff
+ recv_data((int)acceptfd, &mid, sizeof(unsigned int));
+ tempsize = mhashGetDuplicate(&dupeptr, 0);
- //send control and dupes after
- ctrl = RECEIVE_DUPES;
+ //send control and dupes after
+ ctrl = RECEIVE_DUPES;
- if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("ORIGINAL : ");
- exit(0);
- }
+ if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("ORIGINAL : ");
+ exit(0);
+ }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
- if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
- printf("ORIGINAL ERROR : %s\n",strerror(errno));
- exit(0);
- }
- else {
- send_data(sd, &ctrl, sizeof(char));
- send_data(sd, dupeptr, tempsize);
+ if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
+ printf("ORIGINAL ERROR : %s\n",strerror(errno));
+ exit(0);
+ }
+ else {
+ send_data(sd, &ctrl, sizeof(char));
+ send_data(sd, dupeptr, tempsize);
- recv_data(sd, &response, sizeof(char));
+ recv_data(sd, &response, sizeof(char));
#ifdef DEBUG
- printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
+ printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
#endif
- if(response != DUPLICATION_COMPLETE) {
-#ifdef DEBUG
- printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__);
+ if(response != DUPLICATION_COMPLETE) {
+#ifndef DEBUG
+ printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__);
#endif
- //fail message
- exit(0);
- }
+ //fail message
+ exit(0);
+ }
- close(sd);
- }
- free(dupeptr);
+ close(sd);
+ }
+ free(dupeptr);
- ctrl = DUPLICATION_COMPLETE;
- send_data((int)acceptfd, &ctrl, sizeof(char));
-#ifndef DEBUG
- printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__);
+ ctrl = DUPLICATION_COMPLETE;
+ send_data((int)acceptfd, &ctrl, sizeof(char));
+#ifdef DEBUG
+ printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__);
#endif
- break;
+ }
+ break;
case DUPLICATE_BACKUP:
-#ifndef DEBUG
- printf("control -> DUPLICATE_BACKUP\n");
- printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__);
+ {
+ struct sockaddr_in remoteAddr;
+ int sd;
+#ifdef DEBUG
+ printf("control -> DUPLICATE_BACKUP\n");
+ printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__);
#endif
- //object store stuffffff
- recv_data((int)acceptfd, &mid, sizeof(unsigned int));
+ //object store stuffffff
+ recv_data((int)acceptfd, &mid, sizeof(unsigned int));
- tempsize = mhashGetDuplicate(&dupeptr, 1);
+ tempsize = mhashGetDuplicate(&dupeptr, 1);
- //send control and dupes after
- ctrl = RECEIVE_DUPES;
+ //send control and dupes after
+ ctrl = RECEIVE_DUPES;
- if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("BACKUP : ");
- exit(0);
- }
+ if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("BACKUP : ");
+ exit(0);
+ }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
- if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
- printf("BACKUP ERROR : %s\n",strerror(errno));
- exit(0);
- }
- else {
- send_data(sd, &ctrl, sizeof(char));
- send_data(sd, dupeptr, tempsize);
+ if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
+ printf("BACKUP ERROR : %s\n",strerror(errno));
+ exit(0);
+ }
+ else {
+ send_data(sd, &ctrl, sizeof(char));
+ send_data(sd, dupeptr, tempsize);
- recv_data(sd, &response, sizeof(char));
+ recv_data(sd, &response, sizeof(char));
#ifdef DEBUG
- printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
+ printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
#endif
- if(response != DUPLICATION_COMPLETE) {
+ if(response != DUPLICATION_COMPLETE) {
#ifndef DEBUG
- printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__);
+ printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__);
#endif
- exit(0);
- }
+ exit(0);
+ }
- close(sd);
- }
+ close(sd);
+ }
- free(dupeptr);
+ free(dupeptr);
- ctrl = DUPLICATION_COMPLETE;
- send_data((int)acceptfd, &ctrl, sizeof(char));
-#ifndef DEBUG
- printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__);
+ ctrl = DUPLICATION_COMPLETE;
+ send_data((int)acceptfd, &ctrl, sizeof(char));
+#ifdef DEBUG
+ printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__);
#endif
-
+ }
break;
case RECEIVE_DUPES:
-#ifndef DEBUG
+#ifdef DEBUG
printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
#endif
if((readDuplicateObjs((int)acceptfd)) != 0) {
ctrl = DUPLICATION_COMPLETE;
send_data((int)acceptfd, &ctrl, sizeof(char));
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
#endif
break;
#endif
recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
recv_data((int)acceptfd, &size, sizeof(int));
- // do i need array of oids?
- // answer: no! now get to work
- if(numoid != 0) {
+
+ if(numoid != 0) {
if ((dupeptr = calloc(1, size)) == NULL) {
printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__);
return 1;
}
recv_data((int)acceptfd, dupeptr, size);
+
ptr = dupeptr;
for(i = 0; i < numoid; i++) {
header = (objheader_t *)ptr;
timeout = recv_data((int)acceptfd, objread, size);
}
+ if(timeout < 0)
+ return 0;
+
/* Read modified objects */
if(fixed.nummod != 0) {
if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
}
if(timeout < 0) // coordinator failed
+ {
+ if(modptr != NULL)
+ free(modptr);
return 0;
+ }
/* Create an array of oids for modified objects */
oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
printf("%s -> received Decision %d\n",__func__,control);
#endif
}
-
/* insert received control into thash for another transaction*/
thashInsert(transID, control);
#endif
+
switch(control) {
case TRANS_ABORT:
if (fixed->nummod > 0)
printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
#endif
+ if(control < 0)
+ printf("control = %d\n",control);
+
send_data(acceptfd, &control, sizeof(char));
#ifdef CACHE
send_data(acceptfd, &numBytes, sizeof(int));
/* Send control message */
send_data(acceptfd, &control, sizeof(char));
+
/* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
- if(*(objnotfound) != 0) {
+ /*if(*(objnotfound) != 0) {
int msg[1];
msg[0] = *(objnotfound);
send_data(acceptfd, &msg, sizeof(int));
int size = sizeof(unsigned int)* *(objnotfound);
send_data(acceptfd, oidnotfound, size);
- }
+ }*/
}
/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
#endif
if(errno == EAGAIN) {
if(trycounter < 5) {
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s -> TRYcounter increases\n",__func__);
#endif
trycounter++;
setLocateObjHosts();
updateLiveHostsCommit();
paxos();
- printHostsStatus();
+// printHostsStatus();
if(!allHostsLive()) {
printf("Not all hosts live. Exiting.\n");
exit(-1);
#ifdef RECOVERY
transRetryFlag = 0;
- unsigned int machinenumber;
static int flipBit = 0; // Used to distribute requests between primary and backup evenly
// either primary or backup machine
machinenumber = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid));
#endif
free(modptr);
} else { //handle request locally
- handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
+ handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
}
sockindex++;
pile = pile->next;
char control;
int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
timeout = recv_data(sd, &control, sizeof(char));
+
+// printf("i = %d control = %d\n",i,control);
+
+
//Update common data structure with new ctrl msg
getReplyCtrl[i] = control;
/* Recv Objects if participant sends TRANS_DISAGREE */
#ifdef CACHE
if(control == TRANS_DISAGREE) {
int length;
- recv_data(sd, &length, sizeof(int));
+ timeout = recv_data(sd, &length, sizeof(int));
void *newAddr;
pthread_mutex_lock(&prefetchcache_mutex);
if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
return 1;
}
pthread_mutex_unlock(&prefetchcache_mutex);
- recv_data(sd, newAddr, length);
+ timeout = recv_data(sd, newAddr, length);
int offset = 0;
while(length != 0) {
unsigned int oidToPrefetch;
#ifdef DEBUG
printf("%s-> Decide final response now\n", __func__);
#endif
+
+
/* Decide the final response */
if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
exit(-1);
}
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
#endif
return 0;
control = getReplyCtrl[i];
switch(control) {
default:
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> Participant sent unknown message, i:%d, Control: %d\n", __func__, i, (int)control);
#endif
objheader_t *h;
void *objcopy = NULL;
- int sd = getSock2(transRequestSockPool, mnum);
+ int sd = getSock2(transReadSockPool, mnum);
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
char response;
for(i = 0; i < nummid; i++) {
- if((sd = getSock(transPrefetchSockPool, listmid[i])) < 0) {
+ if((sd = getSockWithLock(transPrefetchSockPool, listmid[i])) < 0) {
printf("%s -> socket Error!!\n");
}
else {
break; // received response
// else check next machine
- freeSock(transPrefetchSockPool, listmid[i],sd);
+ freeSockWithLock(transPrefetchSockPool, listmid[i],sd);
}
}
#ifdef DEBUG
else { // if i am the leader
updateLiveHosts();
duplicateLostObjects(deadHost);
-
+ printf("%s -> got to this point\n",__func__);
+
if(updateLiveHostsCommit() != 0) {
printf("%s -> error updateLiveHostsCommit()\n",__func__);
exit(1);
else {
pthread_mutex_unlock(&leaderFixing_mutex);
#ifdef DEBUG
- printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
+ printf("%s -> LEADER is already fixing\n",__func__);
#endif
sleep(WAIT_TIME);
}
send_data(sd, &ctrl, sizeof(char));
send_data(sd, &deadHost, sizeof(unsigned int));
freeSockWithLock(transPrefetchSockPool,leader,sd);
+ printf("%s -> Message sent\n",__func__);
sleep(WAIT_TIME);
}
//for each machine send data
for(i = 0; i < numHostsInSystem; i++) { // hard define num of retries
- if(i == myIndexInHostArray)
+ if(hostIpAddrs[i] == myIpAddr)
continue;
if(liveHosts[i] == 1) {
if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
#ifdef RECOVERY
void duplicateLostObjects(unsigned int mid){
-
+#ifndef DEBUG
printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));
+#endif
//this needs to be changed.
unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine
int tempsize, sd;
char *dupeptr, ctrl, response;
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s-> Start\n", __func__);
#endif
//copy code fom dstmserver here
if(response != DUPLICATION_COMPLETE) {
//fail message
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s -> DUPLICATION_FAIL\n",__func__);
#endif
exit(0);
free(dupeptr);
-#ifndef DEBUG
+#ifdef DEBUG
printf("%s-> End\n", __func__);
#endif
return pile;
}
+// relocate the position of myIp pile to end of list
plistnode_t *sortPiles(plistnode_t *pileptr) {
- plistnode_t *head, *ptr, *tail;
- head = pileptr;
- ptr = pileptr;
- /* Get tail pointer */
- while(ptr!= NULL) {
- tail = ptr;
- ptr = ptr->next;
+ plistnode_t *ptr, *tail;
+ tail = pileptr;
+ ptr = NULL;
+ /* Get tail pointer and myIp pile ptr */
+ while(tail->next != NULL) {
+ if(tail->mid == myIpAddr)
+ ptr = tail;
+ tail = tail->next;
}
- ptr = pileptr;
- plistnode_t *prev = pileptr;
- /* Arrange local machine processing at the end of the pile list */
- while(ptr != NULL) {
- if(ptr != tail) {
- /*
- if(ptr->mid == myIpAddr && (prev != pileptr)) {
- prev->next = ptr->next;
- ptr->next = NULL;
- tail->next = ptr;
- return pileptr;
- }
- if((ptr->mid == myIpAddr) && (prev == pileptr)) {
- prev->next = ptr->next;
- ptr->next = NULL;
- tail->next = ptr;
- return pileptr;
- }
- */
-
- if((ptr->mid == myIpAddr))
- {
- tail->next = pileptr;
- pileptr = ptr->next;
- ptr->next = NULL;
- return pileptr;
- }
- prev = ptr;
- }
- ptr = ptr->next;
- }
- return pileptr;
+ // if ptr is null, then myIp pile is already at tail
+ if(ptr != NULL) {
+ /* Arrange local machine processing at the end of the pile list */
+ tail->next = pileptr;
+ pileptr = ptr->next;
+ ptr->next = NULL;
+ return pileptr;
+ }
+
+ /* get too this point iff myIpAddr pile is at tail */
+ return pileptr;
}
#ifdef RECOVERY