Implemented leader/follower state machines for rebuild procedure up to
[IRC.git] / Robust / src / Runtime / DSTM / interface / dht.c
1 #include "dht.h"
2
3 #ifdef SIMPLE_DHT
4
5 #include <arpa/inet.h>
6
7 #define NUM_HOSTS 4 //number of entries in hosts[] below
8 #define OIDS_PER_HOST 0x40000000 //dhtSearch() divides the key by this to pick a hosts[] entry
9
10 //set these to your IP addresses
//each entry is one IPv4 address written as a 32-bit hex value, e.g. 0xc0a802c8
// is 192.168.2.200 when read most-significant byte first; entry i is what
// dhtSearch() returns for keys with key / OIDS_PER_HOST == i
11 unsigned int hosts[NUM_HOSTS] = {
12         0xc0a802c8,
13         0xc0a802c9,
14         0xc0a802ca,
15         0xc0a802cb,
16 };
17
//SIMPLE_DHT build: the host table is static, so there is nothing to set up
// and the requested capacity is ignored
void dhtInit(unsigned int maxKeyCapaciy)
{
	(void)maxKeyCapaciy;
}
21
//SIMPLE_DHT build: nothing was acquired, so there is nothing to release
void dhtExit()
{
}
25
//SIMPLE_DHT build: key ownership is fixed, so inserting is a no-op
// that always reports success (0)
int dhtInsert(unsigned int key, unsigned int val)
{
	(void)key;
	(void)val;
	return 0;
}
29
//SIMPLE_DHT build: key ownership is fixed, so removing is a no-op
// that always reports success (0)
int dhtRemove(unsigned int key)
{
	(void)key;
	return 0;
}
33
34 //returns 0 if successful and copies val into *val,
35 // 1 if key not found, -1 if an error occurred
36 int dhtSearch(unsigned int key, unsigned int *val)
37 {
38         *val = hosts[key / OIDS_PER_HOST];
39         return 0;
40 }
41
42 #else
43
44 /*******************************************************************************
45 *                              Includes
46 *******************************************************************************/
47
48 #include <netinet/in.h>
49 #include <arpa/inet.h>
50 #include <sys/types.h>
51 #include <sys/socket.h>
52 #include <sys/ioctl.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56 #include <stdlib.h>
57 #include <unistd.h>
58 #include <pthread.h>
59 #include <sys/time.h>
60 #include <sys/poll.h>
61 #include <netdb.h>
62 #include <net/if.h>
63 #include <linux/sockios.h>
64 #include <sys/time.h>
65 #include "clookup.h" //this works for now, do we need anything better?
66
67 /*******************************************************************************
68 *                           Local Defines, Structs
69 *******************************************************************************/
70
71 #define BUFFER_SIZE 512 //maximum message size
72 #define UDP_PORT 2157 //port the UDP server in udpListen() binds to; also the destination port for requests
73 #define TCP_PORT 2157
74 #define BACKLOG 10 //max pending tcp connections
75 #define TIMEOUT_MS 500 //poll() timeout in udpListen(); also the timeout passed to udpSendWaitForResponse()
76 #define MAX_RETRIES 3 //numRetries passed to udpSendWaitForResponse()
77 #define INIT_HOST_ALLOC 1
78 #define INIT_BLOCK_NUM 1
79 #define DEFAULT_INTERFACE "eth0"
80 #define DHT_LOG "dht.log" //log file that dhtInit() opens for writing
81
82 //make sure this is consistent with enum below
83 #define NUM_MSG_TYPES 20 //also the length of msg_types[]
84
85 //make sure this matches msg_types global var
//message type codes: udpListen() reads the code from the first byte of each
// datagram, dispatches on it, and uses it as an index into msg_types[]
86 enum {
87         INSERT_CMD, //sent by dhtInsert() to the key owner
88         INSERT_RES, //reply to INSERT_CMD, carries a status code
89         REMOVE_CMD, //sent by dhtRemove() to the key owner
90         REMOVE_RES, //reply to REMOVE_CMD, carries a status code
91         SEARCH_CMD, //sent by dhtSearch() to the key owner
92         SEARCH_RES, //reply to SEARCH_CMD, carries a status code and the value
93         FIND_LEADER_REQ, //answered with FIND_LEADER_RES only in the LEAD_* states
94         FIND_LEADER_RES,
95         REBUILD_REQ, //answered with REBUILD_RES in the LEAD_* states, NOT_LEADER otherwise
96         REBUILD_RES,
97         NOT_LEADER,
98         REBUILD_CMD, //broadcast by the leader after it accepts a REBUILD_REQ
99         JOIN_REQ, //sent to the leader in response to REBUILD_CMD, carries this host's hostData
100         JOIN_RES, //leader's acknowledgement of JOIN_REQ
101         GET_DHT_INFO_CMD, //makes a host in REBUILD1_STATE call getDHTdata()
102         DHT_INFO_REQ,
103         DHT_INFO_RES,
104         FILL_DHT_CMD,
105         FILL_DHT_RES,
106         REBUILD_DONE_INFO
107 };
108
109 //status codes
//values carried in the status field of insertRes / removeRes / searchRes
110 enum {
111         INSERT_OK,
112         INSERT_ERROR,
113         REMOVE_OK,
114         REMOVE_ERROR,
115         KEY_FOUND,
116         KEY_NOT_FOUND,
117         NOT_KEY_OWNER, //the receiving host is not what getKeyOwner() returns for the key
118 };
119
//values of the global "state"; the LEAD_* values are the ones in which
// udpListen() answers FIND_LEADER_REQ and REBUILD_REQ as the leader
120 enum {
121         NORMAL_STATE, //set by dhtInit(); insert, remove and search requests are served
122         REBUILD1_STATE, //entered once the leader answers our JOIN_REQ with JOIN_RES
123         REBUILD2_STATE, //entered after GET_DHT_INFO_CMD has been handled via getDHTdata()
124         REBUILD3_STATE, //INSERT_CMD is served here, REMOVE_CMD and SEARCH_CMD are not
125         LEAD_NORMAL_STATE, //insert, remove and search requests are served
126         LEAD_REBUILD1_STATE, //entered when a REBUILD_REQ is accepted; JOIN_REQs are accepted here
127         LEAD_REBUILD2_STATE,
128         LEAD_REBUILD3_STATE //INSERT_CMD is served here, REMOVE_CMD and SEARCH_CMD are not
129 };
130
//describes one host; carried inside joinReq and stored in hostArray
131 struct hostData {
132         unsigned int ipAddr; //for the local host, the value returned by getMyIpAddr()
133         unsigned int maxKeyCapacity; //for the local host, the value passed to dhtInit()
134 };
135
//request sent by dhtInsert() to the key's owner
136 struct insertCmd {
137         unsigned int msgType:8; //INSERT_CMD
138         unsigned int unused:24;
139         unsigned int key;
140         unsigned int val;
141 };
142
//request sent by dhtRemove() to the key's owner
143 struct removeCmd {
144         unsigned int msgType:8; //REMOVE_CMD
145         unsigned int unused:24;
146         unsigned int key;
147 };
148
//request sent by dhtSearch() to the key's owner
149 struct searchCmd {
150         unsigned int msgType:8; //SEARCH_CMD
151         unsigned int unused:24;
152         unsigned int key;
153 };
154
//reply to insertCmd, built by the INSERT_CMD handler in udpListen()
155 struct insertRes {
156         unsigned int msgType:8; //INSERT_RES
157         unsigned int unused:24;
158         unsigned int status; //INSERT_OK, INSERT_ERROR or NOT_KEY_OWNER
159 };
160
//reply to removeCmd, built by the REMOVE_CMD handler in udpListen()
161 struct removeRes {
162         unsigned int msgType:8; //REMOVE_RES
163         unsigned int unused:24;
164         unsigned int status; //dhtRemove() treats only REMOVE_OK as success
165 };
166
//reply to searchCmd, built by the SEARCH_CMD handler in udpListen()
167 struct searchRes {
168         unsigned int msgType:8; //SEARCH_RES
169         unsigned int unused:24;
170         unsigned int status; //KEY_FOUND, KEY_NOT_FOUND or NOT_KEY_OWNER
171         unsigned int val; //dhtSearch() copies this out only when status is KEY_FOUND
172 };
173
//sent to the leader after a REBUILD_CMD; the leader passes newHostData to addHost()
174 struct joinReq {
175         unsigned int msgType:8; //JOIN_REQ
176         unsigned int unused:24;
177         struct hostData newHostData; //the sender's own myHostData
178 };
179
180 /*******************************************************************************
181 *                           Global Variables
182 *******************************************************************************/
183
184 //make sure this matches enumeration above
//printable name of each message type, indexed by the message type code;
// udpListen() uses it when logging a received message
185 const char *msg_types[NUM_MSG_TYPES] =
186 {
187         "INSERT_CMD",
188         "INSERT_RES",
189         "REMOVE_CMD",
190         "REMOVE_RES",
191         "SEARCH_CMD",
192         "SEARCH_RES",
193         "FIND_LEADER_REQ",
194         "FIND_LEADER_RES",
195         "REBUILD_REQ",
196         "REBUILD_RES",
197         "NOT_LEADER",
198         "REBUILD_CMD",
199         "JOIN_REQ",
200         "JOIN_RES",
201         "GET_DHT_INFO_CMD",
202         "DHT_INFO_REQ",
203         "DHT_INFO_RES",
204         "FILL_DHT_CMD",
205         "FILL_DHT_RES",
206         "REBUILD_DONE_INFO"
207 };
208
209 FILE *logfile; //opened by dhtInit(), closed by dhtExit()
210 //ip address of leader
211 unsigned int leader;
212 //set by dhtInit()
213 struct hostData myHostData;
214 //number of hosts in the system
215 unsigned int numHosts;
216 //ip address and max key capacity of each host
217 struct hostData *hostArray;
218 //memory allocated for this many items in hostArray
219 unsigned int hostArraySize;
220 //number of keyspace divisions, preferably a power of 2 > numHosts
221 unsigned int numBlocks;
222 //this array has numBlocks elements, each of which contains an index to hostArray
223 // the key owner is found by hashing the key into one of these blocks and using this
224 // array to find the corresponding host in hostArray
225 unsigned int *blockOwnerArray;
226 //used by leader to track which hosts have responded, etc.
227 unsigned int *hostRebuildStates;
228 //thread handles
229 pthread_t threadUdpListen; //runs udpListen(), created in dhtInit()
230 pthread_t threadTcpListen; //runs tcpListen(), created in dhtInit()
231 //server sockets
232 struct pollfd udpServerPollSock; //fd is the UDP socket created and bound in udpListen()
233 int tcpListenSock;
234 //see above for enumeration of states
235 int state; //set to NORMAL_STATE by dhtInit(), then changed by the message handlers in udpListen()
236
237 /*******************************************************************************
238 *                         Local Function Prototypes
239 *******************************************************************************/
240
241 //log function, use like printf()
242 void dhtLog(const char *format, ...);
243 //return my IP address
244 unsigned int getMyIpAddr();
245 //sends broadcast to discover leader
246 unsigned int findLeader();
247 //UDP server; thread entry point started by dhtInit()
248 void *udpListen();
249 //TCP server; thread entry point started by dhtInit()
250 void *tcpListen();
251 //TCP connection handler
252 void *tcpAccept(void *);
253 //returns number of bytes received in resBuffer, or -1 if an error occurred
254 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
255         void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
256         unsigned int timeout, unsigned int numRetries);
257 //returns number of bytes received in resBuffer, or -1 if an error occurred
258 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
259         unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
260         unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
261 //just UDP it
262 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
263         unsigned int msglen);
264 //right now this hashes the key into a block and returns the block owner
265 unsigned int getKeyOwner(unsigned int key);
266 //simple hash
267 unsigned int hash(unsigned int x);
268 //sends REBUILD_REQ to leader, retries until leader responds, or causes new leader to be chosen
269 void initRebuild();
270 //adds entry to end of hostArray, increments numHosts,
271 // allocates more space if necessary
272 void addHost(struct hostData newHost);
273 //initiates TCP connection with leader, gets DHT data
274 int getDHTdata();
275 //outputs readable DHT data to outfile
276 void writeDHTdata(FILE *outfile);
277 void clearDHTdata();
278 void initDHTdata();
279 void makeAssignments();
280 //returns not-zero if ok, zero if not ok
281 int msgSizeOk(unsigned char type, unsigned int size);
282
283 /*******************************************************************************
284 *                      Global Function Definitions
285 *******************************************************************************/
286
287 void dhtInit(unsigned int maxKeyCapacity)
288 {
289         unsigned int myMessage;
290         int bytesReceived;
291         int i;
292         int ret;
293
294         logfile = fopen(DHT_LOG, "w");
295         dhtLog("dhtInit() - initializing...\n");
296
297         myHostData.ipAddr = getMyIpAddr();
298         myHostData.maxKeyCapacity = maxKeyCapacity;
299
300         numHosts = numBlocks = hostArraySize = 0;
301         hostArray = NULL;
302         blockOwnerArray = NULL;
303         hostRebuildStates = NULL;
304
305         state = NORMAL_STATE;
306
307         pthread_create(&threadUdpListen, NULL, udpListen, NULL);
308         pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
309
310         initRebuild();
311
312         return;
313 }
314
315 void dhtExit()
316 {
317         dhtLog("dhtExit(): cleaning up...\n");
318         fclose(logfile);
319         pthread_cancel(threadUdpListen);
320         pthread_cancel(threadTcpListen);
321         close(udpServerPollSock.fd);
322         close(tcpListenSock);
323         clearDHTdata();
324 }
325
326 int dhtInsert(unsigned int key, unsigned int val)
327 {
328         unsigned int dest_ip = getKeyOwner(key);
329         struct insertCmd myMessage;
330         struct insertRes response;
331         int bytesReceived;
332
333         myMessage.msgType = INSERT_CMD;
334         myMessage.key = key;
335         myMessage.val = val;
336         
337         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
338                 sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
339                 TIMEOUT_MS, MAX_RETRIES);
340         if (bytesReceived == sizeof(struct insertRes))
341         {
342                 if (response.msgType == INSERT_RES)
343                 {
344                         if (response.status == INSERT_OK)
345                                 return 0;
346 //                      if (response.status == NOT_KEY_OWNER)
347                 }
348         }
349 //TODO: find owner and try again, request rebuild if necessary
350         return -1; //this function should be robust enough to always return 0
351 }
352
353 int dhtRemove(unsigned int key)
354 {
355         unsigned int dest_ip = getKeyOwner(key);
356         struct removeCmd myMessage;
357         struct removeRes response;
358         int bytesReceived;
359         
360         myMessage.msgType = REMOVE_CMD;
361         myMessage.key = key;
362
363         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
364                 sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
365                 TIMEOUT_MS, MAX_RETRIES);
366         if (bytesReceived == sizeof(struct removeRes))
367         {
368                 if (response.msgType == REMOVE_RES)
369                 {
370                         if (response.status == REMOVE_OK)
371                                 return 0;
372 //                      if (response.status == NOT_KEY_OWNER)
373                 }
374         }
375 //TODO: find owner and try again, request rebuild if necessary
376         return -1; //this function should be robust enough to always return 0
377 }
378
379 int dhtSearch(unsigned int key, unsigned int *val)
380 {
381         unsigned int dest_ip = getKeyOwner(key);
382         struct searchCmd myMessage;
383         struct searchRes response;
384         int bytesReceived;
385
386         myMessage.msgType = SEARCH_CMD;
387         myMessage.key = key;
388
389         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
390                 sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
391                 TIMEOUT_MS, MAX_RETRIES);
392         if (bytesReceived == sizeof(struct searchRes))
393         {
394                 if (response.msgType == SEARCH_RES)
395                 {
396                         if (response.status == KEY_FOUND)
397                         {
398                                 *val = response.val;
399                                 return 0;
400                         }
401                         if (response.status == KEY_NOT_FOUND)
402                         {
403                                 return 1;
404                         }
405 //                      if (response.status == NOT_KEY_OWNER)
406                 }
407         }
408 //TODO: find owner and try again, request rebuild if necessary
409         return -1; //this function should be robust enough to always return 0 or 1
410 }
411
412 /*******************************************************************************
413 *                      Local Function Definitions
414 *******************************************************************************/
415
416 //use UDP for messages that are frequent and short
417 void *udpListen()
418 {
419         struct sockaddr_in myAddr;
420         struct sockaddr_in clientAddr;
421         struct sockaddr_in bcastAddr;
422         socklen_t socklen = sizeof(struct sockaddr_in);
423         char buffer[BUFFER_SIZE];
424         ssize_t bytesReceived;
425         struct insertCmd *insertCmdPtr;
426         struct removeCmd *removeCmdPtr;
427         struct searchCmd *searchCmdPtr;
428         struct insertRes *insertResPtr;
429         struct removeRes *removeResPtr;
430         struct searchRes *searchResPtr;
431         struct joinReq *joinReqPtr;
432         char replyBuffer[BUFFER_SIZE];
433         struct timeval now;
434         struct timeval rebuild1Timeout;
435         int rebuild1TimerSet;
436         int on;
437         int pollret;
438         int i;
439
440         chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
441
442         if ((udpServerPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
443         {
444                 perror("udpListen():socket()");
445                 pthread_exit(NULL);
446         }
447
448         on = 1;
449         if (setsockopt(udpServerPollSock.fd, SOL_SOCKET, SO_BROADCAST, &on,
450                 sizeof(on)) == -1)
451         {
452                 perror("udpBroadcastWaitForResponse():setsockopt()");
453                 pthread_exit(NULL);
454         }
455         
456         udpServerPollSock.events = POLLIN;
457         
458         bzero(&myAddr, socklen);
459         myAddr.sin_family = AF_INET;
460         myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
461         myAddr.sin_port = htons(UDP_PORT);
462
463         bzero(&bcastAddr, socklen);
464         bcastAddr.sin_family = AF_INET;
465         bcastAddr.sin_addr.s_addr = htonl(0xFFFFFFFF);
466         bcastAddr.sin_port = htons(UDP_PORT);
467
468         if (bind(udpServerPollSock.fd, (struct sockaddr *)&myAddr, socklen) == -1)
469         {
470                 perror("udpListen():bind()");
471                 pthread_exit(NULL);
472         }
473         dhtLog("udpListen(): listening on port %d\n", UDP_PORT);
474
475         rebuild1TimerSet = 0;
476         while(1)
477         {
478                 pollret = poll(&udpServerPollSock, 1, TIMEOUT_MS);
479                 if (pollret < 0)
480                 {       perror("udpListen():poll()");   }
481                 else if (pollret > 0)
482                 {
483                         if ((bytesReceived = recvfrom(udpServerPollSock.fd, buffer, BUFFER_SIZE,
484                                 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
485                         {       perror("udpListen():recvfrom()");       }
486                         else if (bytesReceived == 0)
487                         {
488                                 dhtLog("udpListen(): recvfrom() returned 0\n");
489                         }
490                         else
491                         {
492                                 dhtLog("udpListen(): received %s from %s\n",
493                                         (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] :
494                                         "unknown message"), inet_ntoa(clientAddr.sin_addr));
495                                 if (!msgSizeOk(buffer[0], bytesReceived))
496                                 {
497                                         dhtLog("udpListen(): ERROR: incorrect message size\n");
498                                 }
499                                 else
500                                 {
501                                         switch (buffer[0])
502                                         {
503                                                 case INSERT_CMD:
504                                                         if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
505                                                                 || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
506                                                         {
507                                                                 insertCmdPtr = (struct insertCmd *)buffer;
508                                                                 dhtLog( "udpListen(): Insert: key=%d, val=%d\n",
509                                                                         insertCmdPtr->key, insertCmdPtr->val);
510                                                                 insertResPtr = (struct insertRes *)replyBuffer;
511                                                                 insertResPtr->msgType = INSERT_RES;
512                                                                 insertResPtr->unused = 0;
513                                                                 if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
514                                                                 {
515                                                                         //note: casting val to void * in order to conform to API
516                                                                         if(chashInsert(myHashTable, insertCmdPtr->key,
517                                                                                         (void *)insertCmdPtr->val) == 0)
518                                                                                 insertResPtr->status = INSERT_OK;
519                                                                         else
520                                                                                 insertResPtr->status = INSERT_ERROR;
521                                                                 }
522                                                                 else
523                                                                 {
524                                                                         insertResPtr->status = NOT_KEY_OWNER;;
525                                                                 }
526                                                                 if (sendto(udpServerPollSock.fd, (void *)insertResPtr,
527                                                                         sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
528                                                                         socklen) == -1)
529                                                                 {       perror("udpListen():sendto()"); }
530                                                         }
531                                                         break;
532                                                 case REMOVE_CMD:
533                                                         if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
534                                                         {
535                                                                 removeCmdPtr = (struct removeCmd *)buffer;
536                                                                 dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key);
537                                                                 removeResPtr = (struct removeRes *)replyBuffer;
538                                                                 removeResPtr->msgType = REMOVE_RES;
539                                                                 removeResPtr->unused = 0;
540                                                                 if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
541                                                                 {
542                                                                         //note: casting val to void * in order to conform to API
543                                                                         if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
544                                                                                 removeResPtr->status = INSERT_OK;
545                                                                         else
546                                                                                 removeResPtr->status = INSERT_ERROR;
547                                                                 }
548                                                                 else
549                                                                 {
550                                                                         removeResPtr->status = NOT_KEY_OWNER;
551                                                                 }
552                                                                 if (sendto(udpServerPollSock.fd, (void *)removeResPtr,
553                                                                         sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr,
554                                                                         socklen) == -1)
555                                                                 {       perror("udpListen():sendto()"); }
556                                                         }
557                                                         break;
558                                                 case SEARCH_CMD:
559                                                         if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
560                                                         {
561                                                                 searchCmdPtr = (struct searchCmd *)buffer;
562                                                                 dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key);
563                                                                 searchResPtr = (struct searchRes *)replyBuffer;
564                                                                 searchResPtr->msgType = SEARCH_RES;
565                                                                 searchResPtr->unused = 0;
566                                                                 if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
567                                                                 {
568                                                                         //note: casting val to void * in order to conform to API
569                                                                         if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
570                                                                                         searchCmdPtr->key)) == 0)
571                                                                                 searchResPtr->status = KEY_NOT_FOUND;
572                                                                         else
573                                                                                 searchResPtr->status = KEY_FOUND;
574                                                                 }
575                                                                 else
576                                                                 {
577                                                                         searchResPtr->status = NOT_KEY_OWNER;
578                                                                 }
579                                                                 if (sendto(udpServerPollSock.fd, (void *)searchResPtr,
580                                                                         sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr,
581                                                                         socklen) == -1)
582                                                                 {       perror("udpListen():sendto()"); }
583                                                         }
584                                                         break;
585                                                 case FIND_LEADER_REQ:
586                                                         if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
587                                                                 || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
588                                                         {
589                                                                 replyBuffer[0] = FIND_LEADER_RES;
590                                                                 if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
591                                                                         sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
592                                                                 {       perror("udpListen():sendto()"); }
593                                                         }
594                                                         break;
595                                                 case REBUILD_REQ:
596                                                         if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
597                                                                 || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
598                                                         {
599                                                                 replyBuffer[0] = REBUILD_RES;
600                                                                 if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
601                                                                         sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1)
602                                                                 {       perror("udpListen():sendto()"); }
603                                                                 if (gettimeofday(&rebuild1Timeout, NULL) < 0)
604                                                                 {       perror("dhtLog():gettimeofday()"); }
605                                                                 //TODO: make this a configurable parameter
606                                                                 rebuild1Timeout.tv_sec += 3;
607                                                                 rebuild1TimerSet = 1;
608                                                                 //clear out previous host data
609                                                                 numHosts = 1;
610                                                                 hostArray[0] = myHostData;
611
612                                                                 state = LEAD_REBUILD1_STATE;
613
614                                                                 replyBuffer[0] = REBUILD_CMD;
615                                                                 if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
616                                                                         sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
617                                                                 {       perror("udpListen():sendto()"); }
618                                                                 
619                                                         }
620                                                         else
621                                                         {
622                                                                 replyBuffer[0] = NOT_LEADER;
623                                                                 if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
624                                                                         sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
625                                                                 {       perror("udpListen():sendto()"); }
626                                                         }
627                                                 case REBUILD_CMD:
628                                                         if (state != LEAD_REBUILD1_STATE)
629                                                         {
630                                                                 //consider this an official declaration of authority,
631                                                                 // in case I was confused about this
632                                                                 leader = htonl(clientAddr.sin_addr.s_addr);
633                                                                 
634                                                                 clearDHTdata();
635
636                                                                 joinReqPtr = (struct joinReq *)replyBuffer;
637                                                                 joinReqPtr->msgType = JOIN_REQ;
638                                                                 joinReqPtr->unused = 0;
639                                                                 joinReqPtr->newHostData = myHostData;
640                                                                 //note: I'm reusing bytesReceived and buffer
641                                                                 bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
642                                                                         (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer,
643                                                                         BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES);
644                                                                 if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES))
645                                                                         state = REBUILD1_STATE;
646                                                                 else
647                                                                         initRebuild();
648                                                         }
649                                                         break;
650                                                 case JOIN_REQ:
651                                                         if (state == LEAD_REBUILD1_STATE)
652                                                         {
653                                                                 joinReqPtr = (struct joinReq *)buffer;
654                                                                 addHost(joinReqPtr->newHostData);
655
656                                                                 replyBuffer[0] = JOIN_RES;
657                                                                 if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
658                                                                         sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
659                                                                 {       perror("udpListen():sendto()"); }
660                                                         }
661                                                         break;
662                                                 case GET_DHT_INFO_CMD:
663                                                         if (state == REBUILD1_STATE)
664                                                         {
665                                                                 getDHTdata();
666                                                                 state = REBUILD2_STATE;
667                                                         }
668                                                         break;
669                                                 default:
670                                                         dhtLog("udpListen(): ERROR: Unknown message type\n");
671                                         }
672                                 }
673                         }
674                 } //end (pollret > 0)
675                 else // (pollret == 0), timeout
676                 {
677                         if (gettimeofday(&now, NULL) < 0)
678                         {       perror("dhtLog():gettimeofday()"); }
679                         if (rebuild1TimerSet && timercmp(&now, &rebuild1Timeout, >))
680                         {
681                                 rebuild1TimerSet = 0;
682                                 if (state == LEAD_REBUILD1_STATE)
683                                 {
684                                         makeAssignments();
685                                         dhtLog("udpListen(): assignments made\n");
686                                         writeDHTdata(logfile);
687                                         if (hostRebuildStates != NULL)
688                                                 free(hostRebuildStates);
689                                         hostRebuildStates = calloc(numHosts, sizeof(unsigned int));
690                                         for (i = 0; i < numHosts; i++)
691                                                 hostRebuildStates[i] = REBUILD1_STATE;
692                                         state = LEAD_REBUILD2_STATE;
693                                         replyBuffer[0] = GET_DHT_INFO_CMD;
694                                         if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
695                                                 sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
696                                         {       perror("udpListen():sendto()"); }
697                                 }
698                         }
699                 }
700         }
701 }
702
// Sends one UDP datagram to dest_ip:dest_port and waits for a reply coming
// from that same address and port, resending the request when none arrives.
//   dest_ip, dest_port: destination, in host byte order
//   msg, msglen: request datagram
//   resBuffer, resBufferSize: where the reply is stored, and its capacity
//   timeout: time to wait for a reply after each send, in milliseconds
//   numRetries: maximum number of times the request is sent
// Returns the number of bytes received, or -1 on a socket error or when no
// matching reply was received after numRetries attempts.
int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
	void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
	unsigned int timeout, unsigned int numRetries)
{
	struct sockaddr_in server_addr;
	struct sockaddr_in ack_addr;
	socklen_t ack_len;
	struct pollfd pollsock;
	int retval;
	unsigned int i;
	ssize_t bytesReceived;

	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(dest_port);
	server_addr.sin_addr.s_addr = htonl(dest_ip);

	if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
	{
		perror("udpSendWaitForResponse():socket()");
		return -1;
	}

	pollsock.events = POLLIN;
	pollsock.revents = 0;

	// honour the caller's retry budget rather than the global default
	for (i = 0; i < numRetries; i++)
	{
		if (i > 0)
			dhtLog("udpSendWaitForResponse(): trying again, count: %u\n", i+1);
		if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
			sizeof(server_addr)) == -1)
		{
			perror("udpSendWaitForResponse():sendto");
			close(pollsock.fd); // do not leak the socket on failure
			return -1;
		}
		dhtLog("udpSendWaitForResponse(): message sent\n");
		retval = poll(&pollsock, 1, timeout);
		if (retval < 0)
		{
			perror("udpSendWaitForResponse():poll()");
		}
		else if (retval > 0)
		{
			// the address length is a value-result argument: reset it each time
			ack_len = sizeof(ack_addr);
			memset(&ack_addr, 0, sizeof(ack_addr));
			bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
				(struct sockaddr *)&ack_addr, &ack_len);
			if (bytesReceived == -1)
			{
				perror("udpSendWaitForResponse():recvfrom()");
			}
			else if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
			&& (ack_addr.sin_port == server_addr.sin_port))
			{
				close(pollsock.fd);
				dhtLog("udpSendWaitForResponse(): received response\n");
				return (int)bytesReceived;
			}
			// a datagram from any other sender is ignored and the request is resent
		}
	}
	close(pollsock.fd);
	dhtLog("udpSendWaitForResponse(): timed out, no ack\n");
	return -1;
}
761
// Broadcasts one UDP datagram to 255.255.255.255:dest_port and waits for the
// first reply from any host, rebroadcasting when none arrives.
//   reply_ip: receives the replying host's address, in host byte order; it is
//             only written when a reply was actually received
//   dest_port: destination port, in host byte order
//   msg, msglen: request datagram
//   resBuffer, resBufferSize: where the reply is stored, and its capacity
//   timeout: time to wait for a reply after each send, in milliseconds
//   numRetries: maximum number of times the request is broadcast
// Returns the number of bytes received, or -1 on a socket error or when no
// reply was received after numRetries attempts.
int udpBroadcastWaitForResponse(unsigned int *reply_ip,
	unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
	unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
{
	struct sockaddr_in server_addr;
	struct sockaddr_in ack_addr;
	socklen_t ack_len;
	struct pollfd pollsock;
	int retval;
	unsigned int i;
	ssize_t bytesReceived;
	int on;

	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(dest_port);
	server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);

	if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
	{
		perror("udpBroadcastWaitForResponse():socket()");
		return -1;
	}

	on = 1;
	if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
	{
		perror("udpBroadcastWaitForResponse():setsockopt()");
		close(pollsock.fd); // do not leak the socket on failure
		return -1;
	}

	pollsock.events = POLLIN;
	pollsock.revents = 0;

	// honour the caller's retry budget rather than the global default
	for (i = 0; i < numRetries; i++)
	{
		if (i > 0)
			dhtLog("udpBroadcastWaitForResponse(): trying again, count: %u\n", i+1);
		if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
			sizeof(server_addr)) == -1)
		{
			perror("udpBroadcastWaitForResponse():sendto()");
			close(pollsock.fd);
			return -1;
		}
		dhtLog("udpBroadcastWaitForResponse(): message sent\n");
		retval = poll(&pollsock, 1, timeout);
		if (retval < 0)
		{
			// a poll() error does not mean data is ready; reading here could block
			perror("udpBroadcastWaitForResponse():poll()");
		}
		else if (retval > 0)
		{
			// the address length is a value-result argument: reset it each time
			ack_len = sizeof(ack_addr);
			memset(&ack_addr, 0, sizeof(ack_addr));
			bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
				(struct sockaddr *)&ack_addr, &ack_len);
			if (bytesReceived == -1)
			{
				perror("udpBroadcastWaitForResponse():recvfrom()");
			}
			else
			{
				close(pollsock.fd);
				*reply_ip = ntohl(ack_addr.sin_addr.s_addr);
				dhtLog("udpBroadcastWaitForResponse(): received response\n");
				return (int)bytesReceived;
			}
		}
	}
	close(pollsock.fd);
	dhtLog("udpBroadcastWaitForResponse(): timed out, no ack\n");
	return -1;
}
821
822 // use TCP for potentially large and/or important data transfer
823 void *tcpListen()
824 {
825         struct sockaddr_in myAddr;
826         struct sockaddr_in clientAddr;
827         int tcpAcceptSock;
828         socklen_t socklen = sizeof(struct sockaddr_in);
829         pthread_t threadTcpAccept;
830
831         tcpListenSock = socket(AF_INET, SOCK_STREAM, 0);
832         if (tcpListenSock == -1)
833         {
834                 perror("tcpListen():socket()");
835                 pthread_exit(NULL);
836         }
837
838         myAddr.sin_family = AF_INET;
839         myAddr.sin_port = htons(TCP_PORT);
840         myAddr.sin_addr.s_addr = INADDR_ANY;
841         memset(&(myAddr.sin_zero), '\0', 8);
842
843         if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1)
844         {
845                 perror("tcpListen():socket()");
846                 pthread_exit(NULL);
847         }
848
849         if (listen(tcpListenSock, BACKLOG) == -1)
850         {
851                 perror("tcpListen():listen()");
852                 pthread_exit(NULL);
853         }
854
855         dhtLog("tcpListen(): listening on port %d\n", TCP_PORT);
856
857         while(1)
858         {
859                 tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr,
860                         &socklen);
861                 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
862         }
863 }
864
865 void *tcpAccept(void *arg)
866 {
867         int tcpAcceptSock = (int)arg;
868         int bytesReceived;
869         char msgType;
870
871         dhtLog("tcpAccept(): accepted tcp connection, file descriptor: %d\n",
872                 tcpAcceptSock);
873
874         bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
875         if (bytesReceived == -1)
876         {       perror("tcpAccept():recv()");   }
877         else if (bytesReceived == 0)
878         {
879                 dhtLog( "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
880         }
881         else
882         {
883                 switch (msgType)
884                 {
885                         case DHT_INFO_REQ:
886                                 if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1)
887                                 {
888                                         perror("tcpAccept():send()");
889                                         break;
890                                 }
891                                 if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1)
892                                 {
893                                         perror("tcpAccept():send()");
894                                         break;
895                                 }
896                                 if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData),
897                                                 0) == -1)
898                                 {
899                                         perror("tcpAccept():send()");
900                                         break;
901                                 }
902                                 if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int),
903                                                 0) == -1)
904                                 {
905                                         perror("tcpAccept():send()");
906                                         break;
907                                 }
908                                 break;
909                         default:
910                                 dhtLog("tcpAccept(): unrecognized msg type\n");
911                 }
912         }
913
914         if (close(tcpAcceptSock) == -1)
915         {       perror("tcpAccept():close()"); }
916
917         dhtLog("tcpAccept(): closed tcp connection, file descriptor: %d\n",
918                 tcpAcceptSock);
919
920         pthread_exit(NULL);
921 }
922
923 unsigned int getKeyOwner(unsigned int key)
924 {
925         if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
926                 || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
927         {
928                 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
929         }
930         else
931         { //TODO: figure out what is best to do here. Would like calls to dhtSearch,
932                 // etc. to block rather than fail during rebuilds
933                 return 0;
934         }
935 }
936
937 unsigned int getMyIpAddr()
938 {       
939         int sock;
940         struct ifreq interfaceInfo;
941         struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
942
943         memset(&interfaceInfo, 0, sizeof(struct ifreq));
944
945         if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
946         {
947                 perror("getMyIpAddr():socket()");
948                 return 1;
949         }
950
951         strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
952         myAddr->sin_family = AF_INET;
953         
954         if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
955         {
956                 perror("getMyIpAddr():ioctl()");
957                 return 1;
958         }
959
960         return ntohl(myAddr->sin_addr.s_addr);
961 }
962
963 unsigned int findLeader()
964 {
965         unsigned int reply_ip;
966         int bytesReceived;
967         char myMessage;
968         char response;
969
970         dhtLog("findLeader(): broadcasting...\n");
971
972         myMessage = FIND_LEADER_REQ;
973
974         bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
975                 (void *)&myMessage, sizeof(myMessage), (void *)&response,
976                 sizeof(response), TIMEOUT_MS, MAX_RETRIES);
977
978         if (bytesReceived == -1)
979         {
980                 dhtLog("findLeader(): no response\n");
981                 return 0;
982         }
983         else if (response == FIND_LEADER_RES)
984         {
985                 struct in_addr reply_addr;
986                 reply_addr.s_addr = htonl(reply_ip);
987                 dhtLog("findLeader(): leader found:%s\n",
988                                         inet_ntoa(reply_addr));
989                 return reply_ip;
990         }
991         else
992         {
993                 dhtLog("findLeader(): unexpected response\n");
994                 return 0;
995         }
996 }
997
998 int getDHTdata()
999 {
1000         struct sockaddr_in leader_addr;
1001         int sock;
1002         char msg;
1003         int bytesReceived;
1004
1005         clearDHTdata();
1006
1007         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
1008         {
1009                 perror("getDHTdata():socket()");
1010                 return -1;
1011         }
1012
1013         bzero((char *)&leader_addr, sizeof(leader_addr));
1014         leader_addr.sin_family = AF_INET;
1015         leader_addr.sin_port = htons(TCP_PORT);
1016         leader_addr.sin_addr.s_addr = htonl(leader);
1017
1018         if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
1019         {
1020                 perror("getDHTdata():connect()");
1021                 close(sock);
1022                 return -1;
1023         }
1024         msg = DHT_INFO_REQ;
1025         if (send(sock, &msg, sizeof(char), 0) == -1)
1026         {
1027                 perror("getDHTdata():send()");
1028                 close(sock);
1029                 return -1;
1030         }
1031         bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
1032         if (bytesReceived == -1)
1033         {
1034                 perror("getDHTdata():recv()");
1035                 close(sock);
1036                 return -1;
1037         }
1038         if (bytesReceived != sizeof(numHosts))
1039         {
1040                 dhtLog("getDHTdata(): ERROR: numHosts not completely received\n");
1041                 close(sock);
1042                 return -1;
1043         }
1044         bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
1045         if (bytesReceived == -1)
1046         {
1047                 perror("getDHTdata():recv()");
1048                 close(sock);
1049                 return -1;
1050         }
1051         if (bytesReceived != sizeof(numBlocks))
1052         {
1053                 dhtLog("getDHTdata(): ERROR: numBlocks not completely received\n");
1054                 close(sock);
1055                 return -1;
1056         }
1057         hostArray = calloc(numHosts, sizeof(struct hostData));
1058         bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
1059         if (bytesReceived == -1)
1060         {
1061                 perror("getDHTdata():recv()");
1062                 close(sock);
1063                 return -1;
1064         }
1065         if (bytesReceived != numHosts*sizeof(struct hostData))
1066         {
1067                 dhtLog("getDHTdata(): ERROR: hostArray not completely received\n");
1068                 close(sock);
1069                 return -1;
1070         }
1071         blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1072         bytesReceived = recv(sock,blockOwnerArray,numBlocks*sizeof(unsigned int),0);
1073         if (bytesReceived == -1)
1074         {
1075                 perror("getDHTdata():recv()");
1076                 close(sock);
1077                 return -1;
1078         }
1079         if (bytesReceived != numBlocks*sizeof(unsigned int))
1080         {
1081                 dhtLog("getDHTdata(): ERROR: blockOwnerArray not completely received\n");
1082                 close(sock);
1083                 return -1;
1084         }
1085         dhtLog("getDHTdata(): got data:\n");
1086         writeDHTdata(logfile);
1087
1088         return 0;
1089 }
1090
1091 unsigned int hash(unsigned int x)
1092 {
1093         //this shouldn't be called when numBlocks = 0, so if you get a divide-by-zero,
1094         // make sure we are in a proper state for key owner lookups
1095                 return x % numBlocks;
1096 }
1097
1098 //This function will not return until it succeeds in submitting
1099 // a rebuild request to the leader. It is then the leader's responibility
1100 // to ensure that the rebuild is caried out
1101 void initRebuild()
1102 {
1103         int bytesReceived;
1104         char msg;
1105         char response;
1106         int done;
1107         int retry_count;
1108         int i;
1109
1110         done = 0;
1111         retry_count = 0;
1112
1113         while (!done)
1114         {
1115                 if (retry_count > 0)
1116                 {
1117                         dhtLog("initRebuild(): retry count:%d\n", retry_count);
1118                 }
1119
1120                 if (leader == 0 || retry_count > 0)
1121                 {
1122                         leader = findLeader(); //broadcast
1123                         if (leader == 0) //no response
1124                         {
1125                                 //TODO:elect leader: this will do for now
1126                                 initDHTdata();
1127                                 leader = getMyIpAddr();
1128                                 state = LEAD_NORMAL_STATE;
1129                         }
1130                 }
1131         
1132                 msg = REBUILD_REQ;
1133
1134                 bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
1135                         (void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
1136                         TIMEOUT_MS, MAX_RETRIES);
1137                 if (bytesReceived == -1)
1138                 {       perror("initRebuild():recv()"); }
1139                 else if (bytesReceived != sizeof(response))
1140                 {
1141                         dhtLog("initRebuild(): ERROR: response not completely received\n");
1142                 }
1143                 else if (response == NOT_LEADER)
1144                 {
1145                         struct in_addr address;
1146                         address.s_addr = htonl(leader);
1147                         dhtLog("initRebuild(): ERROR: %s no longer leader\n",
1148                                 inet_ntoa(address));
1149                 }
1150                 else if (response != REBUILD_RES)
1151                 {
1152                         dhtLog("initRebuild(): ERROR: unexpected response\n");
1153                 }
1154                 else
1155                 {
1156                         dhtLog("initRebuild(): submitted rebuild request\n");
1157                         writeDHTdata(logfile);
1158                         done = 1;
1159                 }
1160         }
1161         return;
1162 }
1163
1164 void writeDHTdata(FILE *outfile)
1165 {
1166         int i;
1167         struct in_addr address;
1168
1169         fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
1170         fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
1171         for (i = 0; i < numHosts; i++)
1172         {
1173                 address.s_addr = htonl(hostArray[i].ipAddr);
1174                 fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
1175                         hostArray[i].maxKeyCapacity);
1176         }
1177         fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
1178         for (i = 0; i < numBlocks; i++)
1179                 fprintf(outfile,"%d: %d  ", i, blockOwnerArray[i]);
1180         fprintf(outfile,"\n");
1181 }
1182
1183 void clearDHTdata()
1184 {
1185         if (hostArray != NULL)
1186         {
1187                 free(hostArray);
1188                 hostArray = NULL;
1189         }
1190         if (blockOwnerArray != NULL)
1191         {
1192                 free(blockOwnerArray);
1193                 blockOwnerArray = NULL;
1194         }
1195         numHosts = numBlocks = hostArraySize = 0;
1196         return;
1197 }
1198
1199 void initDHTdata()
1200 {
1201         int i;
1202
1203         clearDHTdata();
1204         hostArraySize = INIT_HOST_ALLOC;
1205         hostArray = calloc(hostArraySize, sizeof(struct hostData));
1206         numHosts = 1;
1207         hostArray[0] = myHostData;
1208         numBlocks = INIT_BLOCK_NUM;
1209         blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1210         for (i = 0; i < numBlocks; i++)
1211                 blockOwnerArray[i] = 0;
1212         
1213         return;
1214 }
1215
1216 void addHost(struct hostData newHost)
1217 {
1218         struct hostData *newArray;
1219         unsigned int newArraySize;
1220
1221         if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
1222                 initDHTdata();
1223
1224         if (numHosts == hostArraySize)
1225         {
1226                 newArraySize = hostArraySize * 2;
1227                 newArray = calloc(newArraySize, sizeof(struct hostData));
1228                 memcpy(newArray, hostArray, (hostArraySize * sizeof(struct hostData)));
1229                 free(hostArray);
1230                 hostArray = newArray;
1231                 hostArraySize = newArraySize;
1232         }
1233
1234         hostArray[numHosts] = newHost;
1235         numHosts++;
1236
1237         return;
1238 }
1239
1240 void makeAssignments()
1241 {
1242         int i;
1243
1244         if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
1245                 initDHTdata();
1246         
1247         if (numBlocks < numHosts)
1248         {
1249                 free(blockOwnerArray);
1250                 while (numBlocks < numHosts)
1251                         numBlocks *= 2;
1252                 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1253         }
1254
1255         for (i = 0; i < numBlocks; i++)
1256                 blockOwnerArray[i]  = i % numHosts;
1257
1258         return;
1259 }
1260
1261 //returns not-zero if ok, zero if not ok
1262 int msgSizeOk(unsigned char type, unsigned int size)
1263 {
1264         int status;
1265
1266         switch (type)
1267         {
1268                 case INSERT_CMD:
1269                         status = (size == sizeof(struct insertCmd));
1270                         break;
1271                 case INSERT_RES:
1272                         status = (size == sizeof(struct insertRes));
1273                         break;
1274                 case REMOVE_CMD:
1275                         status = (size == sizeof(struct removeCmd));
1276                         break;
1277                 case REMOVE_RES:
1278                         status = (size == sizeof(struct removeRes));
1279                         break;
1280                 case SEARCH_CMD:
1281                         status = (size == sizeof(struct searchCmd));
1282                         break;
1283                 case SEARCH_RES:
1284                         status = (size == sizeof(struct searchRes));
1285                         break;
1286                 case FIND_LEADER_REQ:
1287                         status = (size == sizeof(char));
1288                         break;
1289                 case FIND_LEADER_RES:
1290                         status = (size == sizeof(char));
1291                         break;
1292                 case REBUILD_REQ:
1293                         status = (size == sizeof(char));
1294                         break;
1295                 case REBUILD_RES:
1296                         status = (size == sizeof(char));
1297                         break;
1298                 case NOT_LEADER:
1299                         status = (size == sizeof(char));
1300                         break;
1301                 case REBUILD_CMD:
1302                         status = (size == sizeof(char));
1303                         break;
1304                 case JOIN_REQ:
1305                         status = (size == sizeof(struct joinReq));
1306                         break;
1307                 case JOIN_RES:
1308                         status = (size == sizeof(char));
1309                         break;
1310                 case GET_DHT_INFO_CMD:
1311                         status = (size == sizeof(char));
1312                         break;
1313                 case DHT_INFO_REQ:
1314                         status = (size == sizeof(char));
1315                         break;
1316                 case DHT_INFO_RES:
1317                         status = (size == sizeof(char));
1318                         break;
1319                 case FILL_DHT_CMD:
1320                         status = (size == sizeof(char));
1321                         break;
1322                 case FILL_DHT_RES:
1323                         status = (size == sizeof(char));
1324                         break;
1325                 case REBUILD_DONE_INFO:
1326                         status = (size == sizeof(char));
1327                         break;
1328                 default:
1329                         status = 0;
1330                         break;
1331         }
1332         return status;
1333 }
1334
1335 void dhtLog(const char *format, ...)
1336 {
1337         va_list args;
1338         struct timeval now;
1339
1340         if (gettimeofday(&now, NULL) < 0)
1341         {       perror("dhtLog():gettimeofday()"); }
1342         va_start(args, format);
1343         if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
1344         {       perror("dhtLog():fprintf()"); }
1345         if (vfprintf(logfile, format, args) < 0)
1346         {       perror("dhtLog():vfprintf()"); }
1347         if (fflush(logfile) == EOF)
1348         {       perror("dhtLog():fflush()"); }
1349         va_end(args);
1350
1351         return;
1352 }
1353
1354 #endif
1355
1356