+ case INSERT_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
+ || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ insertCmdPtr = (struct insertCmd *)buffer;
+ dhtLog( "udpListen(): Insert: key=%d, val=%d\n",
+ insertCmdPtr->key, insertCmdPtr->val);
+ insertResPtr = (struct insertRes *)replyBuffer;
+ insertResPtr->msgType = INSERT_RES;
+ insertResPtr->unused = 0;
+ if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if(chashInsert(myHashTable, insertCmdPtr->key,
+ (void *)insertCmdPtr->val) == 0)
+ insertResPtr->status = INSERT_OK;
+ else
+ insertResPtr->status = INSERT_ERROR;
+ }
+ else
+ {
+ insertResPtr->status = NOT_KEY_OWNER;;
+ }
+ if (sendto(udpServerPollSock.fd, (void *)insertResPtr,
+ sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
+ socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case REMOVE_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
+ {
+ removeCmdPtr = (struct removeCmd *)buffer;
+ dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key);
+ removeResPtr = (struct removeRes *)replyBuffer;
+ removeResPtr->msgType = REMOVE_RES;
+ removeResPtr->unused = 0;
+ if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
+ removeResPtr->status = INSERT_OK;
+ else
+ removeResPtr->status = INSERT_ERROR;
+ }
+ else
+ {
+ removeResPtr->status = NOT_KEY_OWNER;
+ }
+ if (sendto(udpServerPollSock.fd, (void *)removeResPtr,
+ sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr,
+ socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case SEARCH_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
+ {
+ searchCmdPtr = (struct searchCmd *)buffer;
+ dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key);
+ searchResPtr = (struct searchRes *)replyBuffer;
+ searchResPtr->msgType = SEARCH_RES;
+ searchResPtr->unused = 0;
+ if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
+ searchCmdPtr->key)) == 0)
+ searchResPtr->status = KEY_NOT_FOUND;
+ else
+ searchResPtr->status = KEY_FOUND;
+ }
+ else
+ {
+ searchResPtr->status = NOT_KEY_OWNER;
+ }
+ if (sendto(udpServerPollSock.fd, (void *)searchResPtr,
+ sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr,
+ socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case FIND_LEADER_REQ:
+ if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
+ || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ replyBuffer[0] = FIND_LEADER_RES;
+ if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case REBUILD_REQ:
+ if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
+ || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ replyBuffer[0] = REBUILD_RES;
+ if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ if (gettimeofday(&rebuild1Timeout, NULL) < 0)
+ { perror("dhtLog():gettimeofday()"); }
+ //TODO: make this a configurable parameter
+ rebuild1Timeout.tv_sec += 3;
+ rebuild1TimerSet = 1;
+ //clear out previous host data
+ numHosts = 1;
+ hostArray[0] = myHostData;
+
+ state = LEAD_REBUILD1_STATE;
+
+ replyBuffer[0] = REBUILD_CMD;
+ if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+
+ }
+ else
+ {
+ replyBuffer[0] = NOT_LEADER;
+ if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ case REBUILD_CMD:
+ if (state != LEAD_REBUILD1_STATE)
+ {
+ //consider this an official declaration of authority,
+ // in case I was confused about this
+ leader = htonl(clientAddr.sin_addr.s_addr);
+
+ clearDHTdata();
+
+ joinReqPtr = (struct joinReq *)replyBuffer;
+ joinReqPtr->msgType = JOIN_REQ;
+ joinReqPtr->unused = 0;
+ joinReqPtr->newHostData = myHostData;
+ //note: I'm reusing bytesReceived and buffer
+ bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
+ (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer,
+ BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES);
+ if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES))
+ state = REBUILD1_STATE;
+ else
+ initRebuild();
+ }
+ break;
+ case JOIN_REQ:
+ if (state == LEAD_REBUILD1_STATE)
+ {
+ joinReqPtr = (struct joinReq *)buffer;
+ addHost(joinReqPtr->newHostData);
+
+ replyBuffer[0] = JOIN_RES;
+ if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case GET_DHT_INFO_CMD:
+ if (state == REBUILD1_STATE)
+ {
+ getDHTdata();
+ state = REBUILD2_STATE;
+ }
+ break;
+ default:
+ dhtLog("udpListen(): ERROR: Unknown message type\n");