+ if (dhtInsertMult(numKeys, keys, vals) == 0)
+ fillStatus = 2;
+ else
+ fillStatus = 3;
+
+ pthread_exit(NULL);
+}
+
+void *udpListen()
+{
+ ssize_t bytesRcvd;
+ struct sockaddr_in peerAddr;
+ unsigned int peerIp;
+ socklen_t socklen = sizeof(struct sockaddr_in);
+ unsigned char inBuffer[MAX_MSG_SIZE];
+ unsigned char outBuffer[MAX_MSG_SIZE];
+ int pollret;
+ struct timeval now;
+ struct in_addr tmpAddr;
+ struct hostData tmpHost;
+ unsigned int tmpKey;
+ unsigned int tmpVal;
+ struct hostData *hostDataPtr;
+ unsigned short *uShortPtr;
+ unsigned int tmpUInt;
+ unsigned int tmpUShort;
+ int i;
+ unsigned int oldState;
+
+ dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
+
+ while (1)
+ {
+ pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
+ pthread_mutex_lock(&stateMutex);
+ oldState = state;
+ if (pollret < 0)
+ {
+ perror("udpListen():poll()");
+ }
+ else if (pollret > 0)
+ {
+ bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
+ (struct sockaddr *)&peerAddr, &socklen);
+ if (bytesRcvd < 1)
+ {
+ dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
+ }
+ else if (inBuffer[0] >= NUM_MSG_TYPES)
+ {
+ dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
+ }
+ else if (!msgSizeOk(inBuffer, bytesRcvd))
+ {
+ dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
+ msg_types[inBuffer[0]], bytesRcvd);
+ }
+ else if (state == EXIT2_STATE)
+ {
+ //do nothing
+ }
+ else if (state == INIT1_STATE)
+ { //after initialization with seed, do not proceed until seed replies
+ dhtLog("udpListen(): received %s from %s, %d bytes\n",
+ msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
+ for (i = 0; i < bytesRcvd; i++)
+ dhtLog(" %x", inBuffer[i]);
+ dhtLog("\n");
+ peerIp = ntohl(peerAddr.sin_addr.s_addr);
+ if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES)
+ {
+ tmpHost.ipAddr = peerIp;
+ tmpHost.maxKeyCapacity = 0;
+ addHost(tmpHost);
+ writeHostList();
+ leader = read4(&inBuffer[1]);
+ tmpAddr.s_addr = htonl(leader);
+ dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
+ if (leader != 0)
+ {
+ setState(INIT2_STATE);
+ outBuffer[0] = JOIN_REQ;
+ write4(&outBuffer[1], myHostData.maxKeyCapacity);
+ udpSend(outBuffer, 5, leader);
+ }
+ else
+ {
+ electionOriginator = myHostData.ipAddr;
+ setState(ELECT1_STATE);
+ outBuffer[0] = ELECT_LEADER_CMD;
+ write4(&outBuffer[1], myHostData.ipAddr); //originator = me
+ udpSendAll(outBuffer, 5);
+ }
+ }
+ }
+ else
+ {
+ dhtLog("udpListen(): received %s from %s, %d bytes\n",
+ msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
+ for (i = 0; i < bytesRcvd; i++)
+ dhtLog(" %x", inBuffer[i]);
+ dhtLog("\n");
+ peerIp = ntohl(peerAddr.sin_addr.s_addr);
+ switch (inBuffer[0])
+ {
+ case INSERT_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+ || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
+ || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ tmpKey = read4(&inBuffer[1]);
+ tmpVal = read4(&inBuffer[5]);
+ outBuffer[0] = INSERT_RES;
+ if (getKeyOwner(tmpKey) == myHostData.ipAddr)
+ {
+ if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
+ outBuffer[1] = OPERATION_OK;
+ else
+ outBuffer[1] = INTERNAL_ERROR;
+ }
+ else
+ {
+ outBuffer[1] = NOT_KEY_OWNER;
+ }
+ //reply to client socket
+ sendto(udpPollSock.fd, outBuffer, 2, 0,
+ (struct sockaddr *)&peerAddr, socklen);
+ }
+ break;
+ case REMOVE_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+ || state == LEAD_NORMAL2_STATE)
+ {
+ tmpKey = read4(&inBuffer[1]);
+ outBuffer[0] = REMOVE_RES;
+ if (getKeyOwner(tmpKey) == myHostData.ipAddr)
+ {
+ if (chashRemove(myHashTable, tmpKey) == 0)
+ outBuffer[1] = OPERATION_OK;
+ else
+ outBuffer[1] = KEY_NOT_FOUND;
+ }
+ else
+ {
+ outBuffer[1] = NOT_KEY_OWNER;
+ }
+ //reply to client socket
+ sendto(udpPollSock.fd, outBuffer, 2, 0,
+ (struct sockaddr *)&peerAddr, socklen);
+ }
+ break;
+ case SEARCH_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+ || state == LEAD_NORMAL2_STATE)
+ {
+ tmpKey = read4(&inBuffer[1]);
+ outBuffer[0] = SEARCH_RES;
+ if (getKeyOwner(tmpKey) == myHostData.ipAddr)
+ {
+ if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0)
+ {
+ outBuffer[1] = OPERATION_OK;
+ write4(&outBuffer[2], tmpVal);
+ }
+ else
+ {
+ outBuffer[1] = KEY_NOT_FOUND;
+ write4(&outBuffer[2], 0);
+ }
+ }
+ else
+ {
+ outBuffer[1] = NOT_KEY_OWNER;
+ write4(&outBuffer[2], 0);
+ }
+ //reply to client socket
+ sendto(udpPollSock.fd, outBuffer, 6, 0,
+ (struct sockaddr *)&peerAddr, socklen);
+ }
+ break;
+ case WHO_IS_LEADER_CMD:
+ tmpHost.ipAddr = peerIp;
+ tmpHost.maxKeyCapacity = 0;
+ addHost(tmpHost);
+ writeHostList();
+ outBuffer[0] = WHO_IS_LEADER_RES;
+ //leader == 0 means I don't know who it is
+ write4(&outBuffer[1], leader);
+ udpSend(outBuffer, 5, peerIp);
+ break;
+ case JOIN_REQ:
+ if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
+ {
+ tmpHost.ipAddr = peerIp;
+ tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
+ addHost(tmpHost);
+ writeHostList();
+ if (state == LEAD_NORMAL1_STATE)
+ setState(LEAD_NORMAL2_STATE);
+ outBuffer[0] = JOIN_RES;
+ outBuffer[1] = 0; //status, success
+ udpSend(outBuffer, 2, peerIp);
+ }
+ else if (state == LEAD_REBUILD1_STATE)
+ {
+ //note: I don't need to addHost().
+ checkReplied(peerIp);
+ outBuffer[0] = JOIN_RES;
+ outBuffer[1] = 0; //status, success
+ udpSend(outBuffer, 2, peerIp);
+ if (allReplied())
+ {
+ makeAssignments();
+ setState(LEAD_REBUILD2_STATE);
+ outBuffer[0] = DHT_UPDATE_CMD;
+ write2(&outBuffer[1], numHosts);
+ write2(&outBuffer[3], numBlocks);
+ memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
+ memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
+ blockOwnerArray, numBlocks*2);
+ udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
+ + 2 * numBlocks);
+ }
+ }
+ break;
+ case JOIN_RES:
+ if (state == REBUILD1_STATE)
+ {
+ setState(REBUILD2_STATE);
+ }
+ else if (state == INIT2_STATE)
+ {
+ setState(NORMAL_STATE);
+ }
+ break;
+ case LEAVE_REQ:
+ if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
+ { //TODO: make this graceful, instead of just rebuilding
+ removeHost(peerIp);
+ if (state != LEAD_NORMAL2_STATE)
+ setState(LEAD_NORMAL2_STATE);
+ }
+ break;
+ case DHT_UPDATE_CMD:
+ if (state == REBUILD2_STATE && peerIp == leader)
+ {
+ free(hostArray);
+ free(blockOwnerArray);
+ numHosts = read2(&inBuffer[1]);
+ numBlocks = read2(&inBuffer[3]);
+ while (hostArraySize < numHosts)
+ hostArraySize *= 2;
+ hostArray = calloc(hostArraySize, sizeof(struct hostData));
+ blockOwnerArray = calloc(numBlocks, 2);
+ memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
+ memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
+ writeHostList();
+ setState(REBUILD3_STATE);
+ outBuffer[0] = DHT_UPDATE_RES;
+ udpSend(outBuffer, 1, peerIp);
+ }
+ break;
+ case DHT_UPDATE_RES:
+ if (state == LEAD_REBUILD2_STATE)
+ {
+ checkReplied(peerIp);
+ if (allReplied())
+ {
+ setState(LEAD_REBUILD3_STATE);
+ outBuffer[0] = FILL_DHT_CMD;
+ udpSendAll(outBuffer, 1);
+ if (fillStatus != 0)
+ dhtLog("udpListen(): ERROR: fillTask already running\n");
+ fillStatus = 1;
+ if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
+ dhtLog("udpListen(): ERROR creating threadFillTask\n");
+ }
+ }
+ break;
+ case ELECT_LEADER_CMD:
+ tmpUInt = read4(&inBuffer[1]);
+ if ((state == ELECT1_STATE || state == ELECT2_STATE)
+ && tmpUInt >= electionOriginator)
+ { //already participating in a higher-priority election
+ outBuffer[0] = ELECT_LEADER_RES;
+ outBuffer[1] = 0xFF;
+ udpSend(outBuffer, 2, peerIp);
+ }
+ else
+ { //join election
+ electionOriginator = tmpUInt;
+ electionParent = peerIp;
+ setState(ELECT1_STATE);
+ outBuffer[0] = ELECT_LEADER_CMD;
+ write4(&outBuffer[1], electionOriginator);
+ //don't bother forwarding the message to originator or parent
+ checkReplied(electionOriginator);
+ checkReplied(electionParent);
+ if (allReplied())
+ { //in case that is everybody I know of
+ setState(ELECT2_STATE);
+ outBuffer[0] = ELECT_LEADER_RES;
+ outBuffer[1] = 0;
+ write2(&outBuffer[2], numHosts);
+ memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
+ * numHosts);
+ udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+ electionParent);
+ }
+ else
+ {
+ udpSendAll(outBuffer, 5);
+ }
+ }
+ break;
+ case ELECT_LEADER_RES:
+ if (state == ELECT1_STATE)
+ {
+ checkReplied(peerIp);
+ if (inBuffer[1] != 0xFF)
+ {
+ tmpUShort = read2(&inBuffer[2]);
+ hostDataPtr = (struct hostData *)&inBuffer[4];
+ for (i = 0; i < tmpUShort; i++)
+ addHost(hostDataPtr[i]);
+ writeHostList();
+ }
+ if (allReplied())
+ {
+ setState(ELECT2_STATE);
+ if (electionOriginator == myHostData.ipAddr)
+ {
+ leader = hostArray[0].ipAddr;
+ if (leader == myHostData.ipAddr)
+ { //I am the leader
+ dhtLog("I am the leader!\n");
+ setState(LEAD_REBUILD1_STATE);
+ outBuffer[0] = REBUILD_CMD;
+ udpSendAll(outBuffer, 1);
+ }
+ else
+ { //notify leader
+ outBuffer[0] = CONGRATS_CMD;
+ write2(&outBuffer[1], numHosts);
+ hostDataPtr = (struct hostData *)&outBuffer[3];
+ for (i = 0; i < numHosts; i++)
+ hostDataPtr[i] = hostArray[i];
+ udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
+ leader);
+ }
+ }
+ else
+ {
+ outBuffer[0] = ELECT_LEADER_RES;
+ outBuffer[1] = 0;
+ write2(&outBuffer[2], numHosts);
+ hostDataPtr = (struct hostData *)&outBuffer[4];
+ for (i = 0; i < numHosts; i++)
+ hostDataPtr[i] = hostArray[i];
+ udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+ electionParent);
+ }
+ }
+ }
+ break;
+ case CONGRATS_CMD:
+ if (state == ELECT2_STATE)
+ { //I am the leader
+ leader = myHostData.ipAddr;
+ dhtLog("I am the leader!\n");
+ tmpUShort = read2(&inBuffer[1]);
+ hostDataPtr = (struct hostData *)&inBuffer[3];
+ for (i = 0; i < tmpUShort; i++)
+ addHost(hostDataPtr[i]);
+ writeHostList();
+ setState(LEAD_REBUILD1_STATE);
+ outBuffer[0] = REBUILD_CMD;
+ udpSendAll(outBuffer, 1);
+ }
+ break;
+ case REBUILD_REQ:
+ if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
+ {
+ setState(LEAD_REBUILD1_STATE);
+ outBuffer[0] = REBUILD_CMD;
+ udpSendAll(outBuffer, 1);
+ }
+ break;
+ case REBUILD_CMD:
+ leader = peerIp; //consider this a declaration of authority
+ setState(REBUILD1_STATE);
+ outBuffer[0] = JOIN_REQ;
+ write4(&outBuffer[1], myHostData.maxKeyCapacity);
+ udpSend(outBuffer, 5, leader);
+ break;
+ case FILL_DHT_CMD:
+ if (state == REBUILD3_STATE && peerIp == leader)
+ {
+ setState(REBUILD4_STATE);
+ if (fillStatus != 0)
+ dhtLog("udpListen(): ERROR: fillTask already running\n");
+ fillStatus = 1;
+ if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
+ dhtLog("udpListen(): ERROR creating threadFillTask\n");
+ }
+ break;
+ case FILL_DHT_RES:
+ if (state == LEAD_REBUILD3_STATE)
+ {
+ checkReplied(peerIp);
+ if (allReplied() && fillStatus == 2)
+ {
+ fillStatus = 0;
+ setState(LEAD_REBUILD4_STATE);
+ outBuffer[0] = RESUME_NORMAL_CMD;
+ udpSendAll(outBuffer, 1);
+ }
+ }
+ break;
+ case RESUME_NORMAL_CMD:
+ if (state == REBUILD5_STATE && peerIp == leader)
+ {
+ setState(NORMAL_STATE);
+ outBuffer[0] = RESUME_NORMAL_RES;
+ udpSend(outBuffer, 1, leader);
+ }
+ break;
+ case RESUME_NORMAL_RES:
+ if (state == LEAD_REBUILD4_STATE)
+ {
+ checkReplied(peerIp);
+ if (allReplied())
+ {
+ setState(LEAD_NORMAL1_STATE);
+ }
+ }
+ break;
+ }
+ }
+ }
+ if (state == REBUILD4_STATE)
+ {
+ switch (fillStatus)
+ {
+ case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
+ break;
+ case 1: //do nothing
+ break;
+ case 2: //done filling the dht, notify leader
+ fillStatus = 0;
+ setState(REBUILD5_STATE);
+ outBuffer[0] = FILL_DHT_RES;
+ udpSend(outBuffer, 1, leader);
+ break;
+ case 3: //error encountered -> restart rebuild
+ fillStatus = 0;
+ setState(REBUILD0_STATE);
+ outBuffer[0] = REBUILD_REQ;
+ udpSend(outBuffer, 1, leader);
+ break;
+ }
+ }
+ if (state == LEAD_REBUILD3_STATE)
+ {
+ switch (fillStatus)
+ {
+ case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
+ break;
+ case 1: //do nothing
+ break;
+ case 2: //I'm done, now is everybody else also done?
+ if (allReplied())
+ {
+ fillStatus = 0;
+ setState(LEAD_REBUILD4_STATE);
+ outBuffer[0] = RESUME_NORMAL_CMD;
+ udpSendAll(outBuffer, 1);
+ }
+ break;
+ case 3: //error encountered -> restart rebuild
+ fillStatus = 0;
+ setState(LEAD_REBUILD1_STATE);
+ outBuffer[0] = REBUILD_CMD;
+ udpSendAll(outBuffer, 1);
+ break;
+ }
+ }
+ if (timerSet)
+ {
+ gettimeofday(&now, NULL);
+ if (timercmp(&now, &timer, >))
+ {
+ if (timeoutCntr < retry_vals[state])
+ {
+ timeoutCntr++;
+ timeradd(&now, &timeout_vals[state], &timer);
+ dhtLog("udpListen(): retry: %d\n", timeoutCntr);
+ switch (state)
+ {
+ case INIT1_STATE:
+ outBuffer[0] = WHO_IS_LEADER_CMD;
+ udpSend(outBuffer, 1, seed);
+ break;
+ case INIT2_STATE:
+ outBuffer[0] = JOIN_REQ;
+ write4(&outBuffer[1], myHostData.maxKeyCapacity);
+ udpSend(outBuffer, 5, leader);
+ break;
+ case ELECT1_STATE:
+ outBuffer[0] = ELECT_LEADER_CMD;
+ write4(&outBuffer[1], electionOriginator);
+ udpSendAll(outBuffer, 5);
+ break;
+ case ELECT2_STATE:
+ if (electionOriginator == myHostData.ipAddr)
+ { //retry notify leader
+ outBuffer[0] = CONGRATS_CMD;
+ write2(&outBuffer[1], numHosts);
+ memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
+ * numHosts);
+ udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
+ leader);
+ }
+ else
+ {
+ outBuffer[0] = ELECT_LEADER_RES;
+ outBuffer[1] = 0;
+ write2(&outBuffer[2], numHosts);
+ memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
+ * numHosts);
+ udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+ electionParent);
+ }
+ break;
+ case REBUILD0_STATE:
+ outBuffer[0] = REBUILD_REQ;
+ udpSend(outBuffer, 1, leader);
+ break;
+ case REBUILD1_STATE:
+ outBuffer[0] = JOIN_REQ;
+ write4(&outBuffer[1], myHostData.maxKeyCapacity);
+ udpSend(outBuffer, 5, leader);
+ break;
+ case REBUILD5_STATE:
+ outBuffer[0] = FILL_DHT_RES;
+ udpSend(outBuffer, 1, leader);
+ break;
+ case LEAD_REBUILD1_STATE:
+ outBuffer[0] = REBUILD_CMD;
+ udpSendAll(outBuffer, 1);
+ break;
+ case LEAD_REBUILD2_STATE:
+ outBuffer[0] = DHT_UPDATE_CMD;
+ write2(&outBuffer[1], numHosts);
+ write2(&outBuffer[3], numBlocks);
+ memcpy(&outBuffer[5], hostArray, numHosts
+ * sizeof(struct hostData));
+ memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
+ blockOwnerArray, numBlocks*2);
+ udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
+ + 2 * numBlocks);
+ break;
+ case LEAD_REBUILD3_STATE:
+ outBuffer[0] = FILL_DHT_CMD;
+ udpSendAll(outBuffer, 1);
+ break;
+ case LEAD_REBUILD4_STATE:
+ outBuffer[0] = RESUME_NORMAL_CMD;
+ udpSendAll(outBuffer, 1);
+ break;
+ case EXIT1_STATE: //TODO...
+ break;
+ case NORMAL_STATE:
+ case LEAD_NORMAL1_STATE:
+ case LEAD_NORMAL2_STATE:
+ case REBUILD2_STATE:
+ case REBUILD3_STATE:
+ case REBUILD4_STATE:
+ case EXIT2_STATE: //we shouldn't get here
+ break;
+ }
+ }
+ else
+ {
+ dhtLog("udpListen(): timed out in state %s after %d retries\n",
+ state_names[state], timeoutCntr);
+ switch (state)
+ {
+ case INIT1_STATE:
+ setState(EXIT2_STATE);
+ break;
+ case LEAD_NORMAL2_STATE:
+ setState(LEAD_REBUILD1_STATE);
+ outBuffer[0] = REBUILD_CMD;
+ udpSendAll(outBuffer, 1);
+ break;
+ case ELECT1_STATE:
+ dhtLog("removing unresponsive hosts, before:\n");
+ writeHostList();
+ removeUnresponsiveHosts();
+ dhtLog("after\n");
+ writeHostList();
+ setState(ELECT2_STATE);
+ if (electionOriginator == myHostData.ipAddr)
+ {
+ leader = hostArray[0].ipAddr;
+ if (leader == myHostData.ipAddr)
+ { //I am the leader
+ dhtLog("I am the leader!\n");
+ setState(LEAD_REBUILD1_STATE);
+ outBuffer[0] = REBUILD_CMD;
+ udpSendAll(outBuffer, 1);
+ }
+ else
+ { //notify leader
+ outBuffer[0] = CONGRATS_CMD;
+ write2(&outBuffer[1], numHosts);
+ memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
+ * numHosts);
+ udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
+ leader);
+ }
+ }
+ else
+ {
+ outBuffer[0] = ELECT_LEADER_RES;
+ outBuffer[1] = 0;
+ write2(&outBuffer[2], numHosts);
+ memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
+ * numHosts);
+ udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+ electionParent);
+ }
+ break;
+ case INIT2_STATE:
+ case ELECT2_STATE:
+ case REBUILD0_STATE:
+ case REBUILD1_STATE:
+ case REBUILD2_STATE:
+ case REBUILD3_STATE:
+ case REBUILD4_STATE:
+ case REBUILD5_STATE:
+ case LEAD_REBUILD1_STATE:
+ case LEAD_REBUILD2_STATE:
+ case LEAD_REBUILD3_STATE:
+ case LEAD_REBUILD4_STATE:
+ //start election
+ electionOriginator = myHostData.ipAddr;
+ setState(ELECT1_STATE);
+ outBuffer[0] = ELECT_LEADER_CMD;
+ write4(&outBuffer[1], myHostData.ipAddr); //originator = me
+ udpSendAll(outBuffer, 5);
+ break;
+ case EXIT1_STATE:
+ setState(EXIT2_STATE);
+ break;
+ case NORMAL_STATE:
+ case LEAD_NORMAL1_STATE:
+ case EXIT2_STATE: //we shouldn't get here
+ break;
+ }
+ }
+ }
+ }
+ if (state != oldState)
+ pthread_cond_broadcast(&stateCond);
+ pthread_mutex_unlock(&stateMutex);
+ }
+}