tipc: make tipc_link_send_sections_fast exit earlier
[firefly-linux-kernel-4.4.55.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "name_distr.h"
41 #include "discover.h"
42 #include "config.h"
43
44 #include <linux/pkt_sched.h>
45
46 /*
47  * Error message prefixes
48  */
49 static const char *link_co_err = "Link changeover error, ";
50 static const char *link_rst_msg = "Resetting link ";
51 static const char *link_unk_evt = "Unknown link event ";
52
53 /*
54  * Out-of-range value for link session numbers
55  */
56 #define INVALID_SESSION 0x10000
57
58 /*
59  * Link state events:
60  */
61 #define  STARTING_EVT    856384768      /* link processing trigger */
62 #define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
63 #define  TIMEOUT_EVT     560817u        /* link timer expired */
64
65 /*
66  * The following two 'message types' is really just implementation
67  * data conveniently stored in the message header.
68  * They must not be considered part of the protocol
69  */
70 #define OPEN_MSG   0
71 #define CLOSED_MSG 1
72
73 /*
74  * State value stored in 'exp_msg_count'
75  */
76 #define START_CHANGEOVER 100000u
77
78 /**
79  * struct tipc_link_name - deconstructed link name
80  * @addr_local: network address of node at this end
81  * @if_local: name of interface at this end
82  * @addr_peer: network address of node at far end
83  * @if_peer: name of interface at far end
84  */
85 struct tipc_link_name {
86         u32 addr_local;
87         char if_local[TIPC_MAX_IF_NAME];
88         u32 addr_peer;
89         char if_peer[TIPC_MAX_IF_NAME];
90 };
91
92 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
93                                        struct sk_buff *buf);
94 static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
95 static int  link_recv_changeover_msg(struct tipc_link **l_ptr,
96                                      struct sk_buff **buf);
97 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
98 static int  link_send_sections_long(struct tipc_port *sender,
99                                     struct iovec const *msg_sect,
100                                     u32 num_sect, unsigned int total_len,
101                                     u32 destnode);
102 static void link_state_event(struct tipc_link *l_ptr, u32 event);
103 static void link_reset_statistics(struct tipc_link *l_ptr);
104 static void link_print(struct tipc_link *l_ptr, const char *str);
105 static void link_start(struct tipc_link *l_ptr);
106 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
107 static void tipc_link_send_sync(struct tipc_link *l);
108 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
109
110 /*
111  *  Simple link routines
112  */
113 static unsigned int align(unsigned int i)
114 {
115         return (i + 3) & ~3u;
116 }
117
118 static void link_init_max_pkt(struct tipc_link *l_ptr)
119 {
120         u32 max_pkt;
121
122         max_pkt = (l_ptr->b_ptr->mtu & ~3);
123         if (max_pkt > MAX_MSG_SIZE)
124                 max_pkt = MAX_MSG_SIZE;
125
126         l_ptr->max_pkt_target = max_pkt;
127         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
128                 l_ptr->max_pkt = l_ptr->max_pkt_target;
129         else
130                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
131
132         l_ptr->max_pkt_probes = 0;
133 }
134
135 static u32 link_next_sent(struct tipc_link *l_ptr)
136 {
137         if (l_ptr->next_out)
138                 return buf_seqno(l_ptr->next_out);
139         return mod(l_ptr->next_out_no);
140 }
141
142 static u32 link_last_sent(struct tipc_link *l_ptr)
143 {
144         return mod(link_next_sent(l_ptr) - 1);
145 }
146
147 /*
148  *  Simple non-static link routines (i.e. referenced outside this file)
149  */
150 int tipc_link_is_up(struct tipc_link *l_ptr)
151 {
152         if (!l_ptr)
153                 return 0;
154         return link_working_working(l_ptr) || link_working_unknown(l_ptr);
155 }
156
157 int tipc_link_is_active(struct tipc_link *l_ptr)
158 {
159         return  (l_ptr->owner->active_links[0] == l_ptr) ||
160                 (l_ptr->owner->active_links[1] == l_ptr);
161 }
162
163 /**
164  * link_name_validate - validate & (optionally) deconstruct tipc_link name
165  * @name: ptr to link name string
166  * @name_parts: ptr to area for link name components (or NULL if not needed)
167  *
168  * Returns 1 if link name is valid, otherwise 0.
169  */
170 static int link_name_validate(const char *name,
171                                 struct tipc_link_name *name_parts)
172 {
173         char name_copy[TIPC_MAX_LINK_NAME];
174         char *addr_local;
175         char *if_local;
176         char *addr_peer;
177         char *if_peer;
178         char dummy;
179         u32 z_local, c_local, n_local;
180         u32 z_peer, c_peer, n_peer;
181         u32 if_local_len;
182         u32 if_peer_len;
183
184         /* copy link name & ensure length is OK */
185         name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
186         /* need above in case non-Posix strncpy() doesn't pad with nulls */
187         strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
188         if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
189                 return 0;
190
191         /* ensure all component parts of link name are present */
192         addr_local = name_copy;
193         if_local = strchr(addr_local, ':');
194         if (if_local == NULL)
195                 return 0;
196         *(if_local++) = 0;
197         addr_peer = strchr(if_local, '-');
198         if (addr_peer == NULL)
199                 return 0;
200         *(addr_peer++) = 0;
201         if_local_len = addr_peer - if_local;
202         if_peer = strchr(addr_peer, ':');
203         if (if_peer == NULL)
204                 return 0;
205         *(if_peer++) = 0;
206         if_peer_len = strlen(if_peer) + 1;
207
208         /* validate component parts of link name */
209         if ((sscanf(addr_local, "%u.%u.%u%c",
210                     &z_local, &c_local, &n_local, &dummy) != 3) ||
211             (sscanf(addr_peer, "%u.%u.%u%c",
212                     &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
213             (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
214             (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
215             (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
216             (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME))
217                 return 0;
218
219         /* return link name components, if necessary */
220         if (name_parts) {
221                 name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
222                 strcpy(name_parts->if_local, if_local);
223                 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
224                 strcpy(name_parts->if_peer, if_peer);
225         }
226         return 1;
227 }
228
229 /**
230  * link_timeout - handle expiration of link timer
231  * @l_ptr: pointer to link
232  *
233  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
234  * with tipc_link_delete().  (There is no risk that the node will be deleted by
235  * another thread because tipc_link_delete() always cancels the link timer before
236  * tipc_node_delete() is called.)
237  */
238 static void link_timeout(struct tipc_link *l_ptr)
239 {
240         tipc_node_lock(l_ptr->owner);
241
242         /* update counters used in statistical profiling of send traffic */
243         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
244         l_ptr->stats.queue_sz_counts++;
245
246         if (l_ptr->first_out) {
247                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
248                 u32 length = msg_size(msg);
249
250                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
251                     (msg_type(msg) == FIRST_FRAGMENT)) {
252                         length = msg_size(msg_get_wrapped(msg));
253                 }
254                 if (length) {
255                         l_ptr->stats.msg_lengths_total += length;
256                         l_ptr->stats.msg_length_counts++;
257                         if (length <= 64)
258                                 l_ptr->stats.msg_length_profile[0]++;
259                         else if (length <= 256)
260                                 l_ptr->stats.msg_length_profile[1]++;
261                         else if (length <= 1024)
262                                 l_ptr->stats.msg_length_profile[2]++;
263                         else if (length <= 4096)
264                                 l_ptr->stats.msg_length_profile[3]++;
265                         else if (length <= 16384)
266                                 l_ptr->stats.msg_length_profile[4]++;
267                         else if (length <= 32768)
268                                 l_ptr->stats.msg_length_profile[5]++;
269                         else
270                                 l_ptr->stats.msg_length_profile[6]++;
271                 }
272         }
273
274         /* do all other link processing performed on a periodic basis */
275
276         link_state_event(l_ptr, TIMEOUT_EVT);
277
278         if (l_ptr->next_out)
279                 tipc_link_push_queue(l_ptr);
280
281         tipc_node_unlock(l_ptr->owner);
282 }
283
284 static void link_set_timer(struct tipc_link *l_ptr, u32 time)
285 {
286         k_start_timer(&l_ptr->timer, time);
287 }
288
289 /**
290  * tipc_link_create - create a new link
291  * @n_ptr: pointer to associated node
292  * @b_ptr: pointer to associated bearer
293  * @media_addr: media address to use when sending messages over link
294  *
295  * Returns pointer to link.
296  */
297 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
298                               struct tipc_bearer *b_ptr,
299                               const struct tipc_media_addr *media_addr)
300 {
301         struct tipc_link *l_ptr;
302         struct tipc_msg *msg;
303         char *if_name;
304         char addr_string[16];
305         u32 peer = n_ptr->addr;
306
307         if (n_ptr->link_cnt >= 2) {
308                 tipc_addr_string_fill(addr_string, n_ptr->addr);
309                 pr_err("Attempt to establish third link to %s\n", addr_string);
310                 return NULL;
311         }
312
313         if (n_ptr->links[b_ptr->identity]) {
314                 tipc_addr_string_fill(addr_string, n_ptr->addr);
315                 pr_err("Attempt to establish second link on <%s> to %s\n",
316                        b_ptr->name, addr_string);
317                 return NULL;
318         }
319
320         l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
321         if (!l_ptr) {
322                 pr_warn("Link creation failed, no memory\n");
323                 return NULL;
324         }
325
326         l_ptr->addr = peer;
327         if_name = strchr(b_ptr->name, ':') + 1;
328         sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
329                 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
330                 tipc_node(tipc_own_addr),
331                 if_name,
332                 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
333                 /* note: peer i/f name is updated by reset/activate message */
334         memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
335         l_ptr->owner = n_ptr;
336         l_ptr->checkpoint = 1;
337         l_ptr->peer_session = INVALID_SESSION;
338         l_ptr->b_ptr = b_ptr;
339         link_set_supervision_props(l_ptr, b_ptr->tolerance);
340         l_ptr->state = RESET_UNKNOWN;
341
342         l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
343         msg = l_ptr->pmsg;
344         tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
345         msg_set_size(msg, sizeof(l_ptr->proto_msg));
346         msg_set_session(msg, (tipc_random & 0xffff));
347         msg_set_bearer_id(msg, b_ptr->identity);
348         strcpy((char *)msg_data(msg), if_name);
349
350         l_ptr->priority = b_ptr->priority;
351         tipc_link_set_queue_limits(l_ptr, b_ptr->window);
352
353         link_init_max_pkt(l_ptr);
354
355         l_ptr->next_out_no = 1;
356         INIT_LIST_HEAD(&l_ptr->waiting_ports);
357
358         link_reset_statistics(l_ptr);
359
360         tipc_node_attach_link(n_ptr, l_ptr);
361
362         k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
363         list_add_tail(&l_ptr->link_list, &b_ptr->links);
364         tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
365
366         return l_ptr;
367 }
368
369 /**
370  * tipc_link_delete - delete a link
371  * @l_ptr: pointer to link
372  *
373  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
374  * This routine must not grab the node lock until after link timer cancellation
375  * to avoid a potential deadlock situation.
376  */
377 void tipc_link_delete(struct tipc_link *l_ptr)
378 {
379         if (!l_ptr) {
380                 pr_err("Attempt to delete non-existent link\n");
381                 return;
382         }
383
384         k_cancel_timer(&l_ptr->timer);
385
386         tipc_node_lock(l_ptr->owner);
387         tipc_link_reset(l_ptr);
388         tipc_node_detach_link(l_ptr->owner, l_ptr);
389         tipc_link_stop(l_ptr);
390         list_del_init(&l_ptr->link_list);
391         tipc_node_unlock(l_ptr->owner);
392         k_term_timer(&l_ptr->timer);
393         kfree(l_ptr);
394 }
395
396 static void link_start(struct tipc_link *l_ptr)
397 {
398         tipc_node_lock(l_ptr->owner);
399         link_state_event(l_ptr, STARTING_EVT);
400         tipc_node_unlock(l_ptr->owner);
401 }
402
403 /**
404  * link_schedule_port - schedule port for deferred sending
405  * @l_ptr: pointer to link
406  * @origport: reference to sending port
407  * @sz: amount of data to be sent
408  *
409  * Schedules port for renewed sending of messages after link congestion
410  * has abated.
411  */
412 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
413 {
414         struct tipc_port *p_ptr;
415
416         spin_lock_bh(&tipc_port_list_lock);
417         p_ptr = tipc_port_lock(origport);
418         if (p_ptr) {
419                 if (!p_ptr->wakeup)
420                         goto exit;
421                 if (!list_empty(&p_ptr->wait_list))
422                         goto exit;
423                 p_ptr->congested = 1;
424                 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
425                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
426                 l_ptr->stats.link_congs++;
427 exit:
428                 tipc_port_unlock(p_ptr);
429         }
430         spin_unlock_bh(&tipc_port_list_lock);
431         return -ELINKCONG;
432 }
433
434 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
435 {
436         struct tipc_port *p_ptr;
437         struct tipc_port *temp_p_ptr;
438         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
439
440         if (all)
441                 win = 100000;
442         if (win <= 0)
443                 return;
444         if (!spin_trylock_bh(&tipc_port_list_lock))
445                 return;
446         if (link_congested(l_ptr))
447                 goto exit;
448         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
449                                  wait_list) {
450                 if (win <= 0)
451                         break;
452                 list_del_init(&p_ptr->wait_list);
453                 spin_lock_bh(p_ptr->lock);
454                 p_ptr->congested = 0;
455                 p_ptr->wakeup(p_ptr);
456                 win -= p_ptr->waiting_pkts;
457                 spin_unlock_bh(p_ptr->lock);
458         }
459
460 exit:
461         spin_unlock_bh(&tipc_port_list_lock);
462 }
463
464 /**
465  * link_release_outqueue - purge link's outbound message queue
466  * @l_ptr: pointer to link
467  */
468 static void link_release_outqueue(struct tipc_link *l_ptr)
469 {
470         struct sk_buff *buf = l_ptr->first_out;
471         struct sk_buff *next;
472
473         while (buf) {
474                 next = buf->next;
475                 kfree_skb(buf);
476                 buf = next;
477         }
478         l_ptr->first_out = NULL;
479         l_ptr->out_queue_size = 0;
480 }
481
482 /**
483  * tipc_link_reset_fragments - purge link's inbound message fragments queue
484  * @l_ptr: pointer to link
485  */
486 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
487 {
488         struct sk_buff *buf = l_ptr->defragm_buf;
489         struct sk_buff *next;
490
491         while (buf) {
492                 next = buf->next;
493                 kfree_skb(buf);
494                 buf = next;
495         }
496         l_ptr->defragm_buf = NULL;
497 }
498
499 /**
500  * tipc_link_stop - purge all inbound and outbound messages associated with link
501  * @l_ptr: pointer to link
502  */
503 void tipc_link_stop(struct tipc_link *l_ptr)
504 {
505         struct sk_buff *buf;
506         struct sk_buff *next;
507
508         buf = l_ptr->oldest_deferred_in;
509         while (buf) {
510                 next = buf->next;
511                 kfree_skb(buf);
512                 buf = next;
513         }
514
515         buf = l_ptr->first_out;
516         while (buf) {
517                 next = buf->next;
518                 kfree_skb(buf);
519                 buf = next;
520         }
521
522         tipc_link_reset_fragments(l_ptr);
523
524         kfree_skb(l_ptr->proto_msg_queue);
525         l_ptr->proto_msg_queue = NULL;
526 }
527
528 void tipc_link_reset(struct tipc_link *l_ptr)
529 {
530         struct sk_buff *buf;
531         u32 prev_state = l_ptr->state;
532         u32 checkpoint = l_ptr->next_in_no;
533         int was_active_link = tipc_link_is_active(l_ptr);
534
535         msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
536
537         /* Link is down, accept any session */
538         l_ptr->peer_session = INVALID_SESSION;
539
540         /* Prepare for max packet size negotiation */
541         link_init_max_pkt(l_ptr);
542
543         l_ptr->state = RESET_UNKNOWN;
544
545         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
546                 return;
547
548         tipc_node_link_down(l_ptr->owner, l_ptr);
549         tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
550
551         if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
552             l_ptr->owner->permit_changeover) {
553                 l_ptr->reset_checkpoint = checkpoint;
554                 l_ptr->exp_msg_count = START_CHANGEOVER;
555         }
556
557         /* Clean up all queues: */
558         link_release_outqueue(l_ptr);
559         kfree_skb(l_ptr->proto_msg_queue);
560         l_ptr->proto_msg_queue = NULL;
561         buf = l_ptr->oldest_deferred_in;
562         while (buf) {
563                 struct sk_buff *next = buf->next;
564                 kfree_skb(buf);
565                 buf = next;
566         }
567         if (!list_empty(&l_ptr->waiting_ports))
568                 tipc_link_wakeup_ports(l_ptr, 1);
569
570         l_ptr->retransm_queue_head = 0;
571         l_ptr->retransm_queue_size = 0;
572         l_ptr->last_out = NULL;
573         l_ptr->first_out = NULL;
574         l_ptr->next_out = NULL;
575         l_ptr->unacked_window = 0;
576         l_ptr->checkpoint = 1;
577         l_ptr->next_out_no = 1;
578         l_ptr->deferred_inqueue_sz = 0;
579         l_ptr->oldest_deferred_in = NULL;
580         l_ptr->newest_deferred_in = NULL;
581         l_ptr->fsm_msg_cnt = 0;
582         l_ptr->stale_count = 0;
583         link_reset_statistics(l_ptr);
584 }
585
586
587 static void link_activate(struct tipc_link *l_ptr)
588 {
589         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
590         tipc_node_link_up(l_ptr->owner, l_ptr);
591         tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
592 }
593
594 /**
595  * link_state_event - link finite state machine
596  * @l_ptr: pointer to link
597  * @event: state machine event to process
598  */
599 static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
600 {
601         struct tipc_link *other;
602         u32 cont_intv = l_ptr->continuity_interval;
603
604         if (!l_ptr->started && (event != STARTING_EVT))
605                 return;         /* Not yet. */
606
607         if (link_blocked(l_ptr)) {
608                 if (event == TIMEOUT_EVT)
609                         link_set_timer(l_ptr, cont_intv);
610                 return;   /* Changeover going on */
611         }
612
613         switch (l_ptr->state) {
614         case WORKING_WORKING:
615                 switch (event) {
616                 case TRAFFIC_MSG_EVT:
617                 case ACTIVATE_MSG:
618                         break;
619                 case TIMEOUT_EVT:
620                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
621                                 l_ptr->checkpoint = l_ptr->next_in_no;
622                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
623                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
624                                                                  0, 0, 0, 0, 0);
625                                         l_ptr->fsm_msg_cnt++;
626                                 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
627                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
628                                                                  1, 0, 0, 0, 0);
629                                         l_ptr->fsm_msg_cnt++;
630                                 }
631                                 link_set_timer(l_ptr, cont_intv);
632                                 break;
633                         }
634                         l_ptr->state = WORKING_UNKNOWN;
635                         l_ptr->fsm_msg_cnt = 0;
636                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
637                         l_ptr->fsm_msg_cnt++;
638                         link_set_timer(l_ptr, cont_intv / 4);
639                         break;
640                 case RESET_MSG:
641                         pr_info("%s<%s>, requested by peer\n", link_rst_msg,
642                                 l_ptr->name);
643                         tipc_link_reset(l_ptr);
644                         l_ptr->state = RESET_RESET;
645                         l_ptr->fsm_msg_cnt = 0;
646                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
647                         l_ptr->fsm_msg_cnt++;
648                         link_set_timer(l_ptr, cont_intv);
649                         break;
650                 default:
651                         pr_err("%s%u in WW state\n", link_unk_evt, event);
652                 }
653                 break;
654         case WORKING_UNKNOWN:
655                 switch (event) {
656                 case TRAFFIC_MSG_EVT:
657                 case ACTIVATE_MSG:
658                         l_ptr->state = WORKING_WORKING;
659                         l_ptr->fsm_msg_cnt = 0;
660                         link_set_timer(l_ptr, cont_intv);
661                         break;
662                 case RESET_MSG:
663                         pr_info("%s<%s>, requested by peer while probing\n",
664                                 link_rst_msg, l_ptr->name);
665                         tipc_link_reset(l_ptr);
666                         l_ptr->state = RESET_RESET;
667                         l_ptr->fsm_msg_cnt = 0;
668                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
669                         l_ptr->fsm_msg_cnt++;
670                         link_set_timer(l_ptr, cont_intv);
671                         break;
672                 case TIMEOUT_EVT:
673                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
674                                 l_ptr->state = WORKING_WORKING;
675                                 l_ptr->fsm_msg_cnt = 0;
676                                 l_ptr->checkpoint = l_ptr->next_in_no;
677                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
678                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
679                                                                  0, 0, 0, 0, 0);
680                                         l_ptr->fsm_msg_cnt++;
681                                 }
682                                 link_set_timer(l_ptr, cont_intv);
683                         } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
684                                 tipc_link_send_proto_msg(l_ptr, STATE_MSG,
685                                                          1, 0, 0, 0, 0);
686                                 l_ptr->fsm_msg_cnt++;
687                                 link_set_timer(l_ptr, cont_intv / 4);
688                         } else {        /* Link has failed */
689                                 pr_warn("%s<%s>, peer not responding\n",
690                                         link_rst_msg, l_ptr->name);
691                                 tipc_link_reset(l_ptr);
692                                 l_ptr->state = RESET_UNKNOWN;
693                                 l_ptr->fsm_msg_cnt = 0;
694                                 tipc_link_send_proto_msg(l_ptr, RESET_MSG,
695                                                          0, 0, 0, 0, 0);
696                                 l_ptr->fsm_msg_cnt++;
697                                 link_set_timer(l_ptr, cont_intv);
698                         }
699                         break;
700                 default:
701                         pr_err("%s%u in WU state\n", link_unk_evt, event);
702                 }
703                 break;
704         case RESET_UNKNOWN:
705                 switch (event) {
706                 case TRAFFIC_MSG_EVT:
707                         break;
708                 case ACTIVATE_MSG:
709                         other = l_ptr->owner->active_links[0];
710                         if (other && link_working_unknown(other))
711                                 break;
712                         l_ptr->state = WORKING_WORKING;
713                         l_ptr->fsm_msg_cnt = 0;
714                         link_activate(l_ptr);
715                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
716                         l_ptr->fsm_msg_cnt++;
717                         if (l_ptr->owner->working_links == 1)
718                                 tipc_link_send_sync(l_ptr);
719                         link_set_timer(l_ptr, cont_intv);
720                         break;
721                 case RESET_MSG:
722                         l_ptr->state = RESET_RESET;
723                         l_ptr->fsm_msg_cnt = 0;
724                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
725                         l_ptr->fsm_msg_cnt++;
726                         link_set_timer(l_ptr, cont_intv);
727                         break;
728                 case STARTING_EVT:
729                         l_ptr->started = 1;
730                         /* fall through */
731                 case TIMEOUT_EVT:
732                         tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
733                         l_ptr->fsm_msg_cnt++;
734                         link_set_timer(l_ptr, cont_intv);
735                         break;
736                 default:
737                         pr_err("%s%u in RU state\n", link_unk_evt, event);
738                 }
739                 break;
740         case RESET_RESET:
741                 switch (event) {
742                 case TRAFFIC_MSG_EVT:
743                 case ACTIVATE_MSG:
744                         other = l_ptr->owner->active_links[0];
745                         if (other && link_working_unknown(other))
746                                 break;
747                         l_ptr->state = WORKING_WORKING;
748                         l_ptr->fsm_msg_cnt = 0;
749                         link_activate(l_ptr);
750                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
751                         l_ptr->fsm_msg_cnt++;
752                         if (l_ptr->owner->working_links == 1)
753                                 tipc_link_send_sync(l_ptr);
754                         link_set_timer(l_ptr, cont_intv);
755                         break;
756                 case RESET_MSG:
757                         break;
758                 case TIMEOUT_EVT:
759                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
760                         l_ptr->fsm_msg_cnt++;
761                         link_set_timer(l_ptr, cont_intv);
762                         break;
763                 default:
764                         pr_err("%s%u in RR state\n", link_unk_evt, event);
765                 }
766                 break;
767         default:
768                 pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
769         }
770 }
771
772 /*
773  * link_bundle_buf(): Append contents of a buffer to
774  * the tail of an existing one.
775  */
776 static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
777                            struct sk_buff *buf)
778 {
779         struct tipc_msg *bundler_msg = buf_msg(bundler);
780         struct tipc_msg *msg = buf_msg(buf);
781         u32 size = msg_size(msg);
782         u32 bundle_size = msg_size(bundler_msg);
783         u32 to_pos = align(bundle_size);
784         u32 pad = to_pos - bundle_size;
785
786         if (msg_user(bundler_msg) != MSG_BUNDLER)
787                 return 0;
788         if (msg_type(bundler_msg) != OPEN_MSG)
789                 return 0;
790         if (skb_tailroom(bundler) < (pad + size))
791                 return 0;
792         if (l_ptr->max_pkt < (to_pos + size))
793                 return 0;
794
795         skb_put(bundler, pad + size);
796         skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
797         msg_set_size(bundler_msg, to_pos + size);
798         msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
799         kfree_skb(buf);
800         l_ptr->stats.sent_bundled++;
801         return 1;
802 }
803
804 static void link_add_to_outqueue(struct tipc_link *l_ptr,
805                                  struct sk_buff *buf,
806                                  struct tipc_msg *msg)
807 {
808         u32 ack = mod(l_ptr->next_in_no - 1);
809         u32 seqno = mod(l_ptr->next_out_no++);
810
811         msg_set_word(msg, 2, ((ack << 16) | seqno));
812         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
813         buf->next = NULL;
814         if (l_ptr->first_out) {
815                 l_ptr->last_out->next = buf;
816                 l_ptr->last_out = buf;
817         } else
818                 l_ptr->first_out = l_ptr->last_out = buf;
819
820         l_ptr->out_queue_size++;
821         if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
822                 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
823 }
824
825 static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
826                                        struct sk_buff *buf_chain,
827                                        u32 long_msgno)
828 {
829         struct sk_buff *buf;
830         struct tipc_msg *msg;
831
832         if (!l_ptr->next_out)
833                 l_ptr->next_out = buf_chain;
834         while (buf_chain) {
835                 buf = buf_chain;
836                 buf_chain = buf_chain->next;
837
838                 msg = buf_msg(buf);
839                 msg_set_long_msgno(msg, long_msgno);
840                 link_add_to_outqueue(l_ptr, buf, msg);
841         }
842 }
843
844 /*
845  * tipc_link_send_buf() is the 'full path' for messages, called from
846  * inside TIPC when the 'fast path' in tipc_send_buf
847  * has failed, and from link_send()
848  */
849 int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
850 {
851         struct tipc_msg *msg = buf_msg(buf);
852         u32 size = msg_size(msg);
853         u32 dsz = msg_data_sz(msg);
854         u32 queue_size = l_ptr->out_queue_size;
855         u32 imp = tipc_msg_tot_importance(msg);
856         u32 queue_limit = l_ptr->queue_limit[imp];
857         u32 max_packet = l_ptr->max_pkt;
858
859         /* Match msg importance against queue limits: */
860         if (unlikely(queue_size >= queue_limit)) {
861                 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
862                         link_schedule_port(l_ptr, msg_origport(msg), size);
863                         kfree_skb(buf);
864                         return -ELINKCONG;
865                 }
866                 kfree_skb(buf);
867                 if (imp > CONN_MANAGER) {
868                         pr_warn("%s<%s>, send queue full", link_rst_msg,
869                                 l_ptr->name);
870                         tipc_link_reset(l_ptr);
871                 }
872                 return dsz;
873         }
874
875         /* Fragmentation needed ? */
876         if (size > max_packet)
877                 return link_send_long_buf(l_ptr, buf);
878
879         /* Packet can be queued or sent. */
880         if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
881                    !link_congested(l_ptr))) {
882                 link_add_to_outqueue(l_ptr, buf, msg);
883
884                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
885                 l_ptr->unacked_window = 0;
886                 return dsz;
887         }
888         /* Congestion: can message be bundled ? */
889         if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
890             (msg_user(msg) != MSG_FRAGMENTER)) {
891
892                 /* Try adding message to an existing bundle */
893                 if (l_ptr->next_out &&
894                     link_bundle_buf(l_ptr, l_ptr->last_out, buf))
895                         return dsz;
896
897                 /* Try creating a new bundle */
898                 if (size <= max_packet * 2 / 3) {
899                         struct sk_buff *bundler = tipc_buf_acquire(max_packet);
900                         struct tipc_msg bundler_hdr;
901
902                         if (bundler) {
903                                 tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
904                                          INT_H_SIZE, l_ptr->addr);
905                                 skb_copy_to_linear_data(bundler, &bundler_hdr,
906                                                         INT_H_SIZE);
907                                 skb_trim(bundler, INT_H_SIZE);
908                                 link_bundle_buf(l_ptr, bundler, buf);
909                                 buf = bundler;
910                                 msg = buf_msg(buf);
911                                 l_ptr->stats.sent_bundles++;
912                         }
913                 }
914         }
915         if (!l_ptr->next_out)
916                 l_ptr->next_out = buf;
917         link_add_to_outqueue(l_ptr, buf, msg);
918         return dsz;
919 }
920
921 /*
922  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
923  * not been selected yet, and the the owner node is not locked
924  * Called by TIPC internal users, e.g. the name distributor
925  */
926 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
927 {
928         struct tipc_link *l_ptr;
929         struct tipc_node *n_ptr;
930         int res = -ELINKCONG;
931
932         read_lock_bh(&tipc_net_lock);
933         n_ptr = tipc_node_find(dest);
934         if (n_ptr) {
935                 tipc_node_lock(n_ptr);
936                 l_ptr = n_ptr->active_links[selector & 1];
937                 if (l_ptr)
938                         res = tipc_link_send_buf(l_ptr, buf);
939                 else
940                         kfree_skb(buf);
941                 tipc_node_unlock(n_ptr);
942         } else {
943                 kfree_skb(buf);
944         }
945         read_unlock_bh(&tipc_net_lock);
946         return res;
947 }
948
949 /*
950  * tipc_link_send_sync - synchronize broadcast link endpoints.
951  *
952  * Give a newly added peer node the sequence number where it should
953  * start receiving and acking broadcast packets.
954  *
955  * Called with node locked
956  */
957 static void tipc_link_send_sync(struct tipc_link *l)
958 {
959         struct sk_buff *buf;
960         struct tipc_msg *msg;
961
962         buf = tipc_buf_acquire(INT_H_SIZE);
963         if (!buf)
964                 return;
965
966         msg = buf_msg(buf);
967         tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
968         msg_set_last_bcast(msg, l->owner->bclink.acked);
969         link_add_chain_to_outqueue(l, buf, 0);
970         tipc_link_push_queue(l);
971 }
972
973 /*
974  * tipc_link_recv_sync - synchronize broadcast link endpoints.
975  * Receive the sequence number where we should start receiving and
976  * acking broadcast packets from a newly added peer node, and open
977  * up for reception of such packets.
978  *
979  * Called with node locked
980  */
981 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
982 {
983         struct tipc_msg *msg = buf_msg(buf);
984
985         n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
986         n->bclink.recv_permitted = true;
987         kfree_skb(buf);
988 }
989
990 /*
991  * tipc_link_send_names - send name table entries to new neighbor
992  *
993  * Send routine for bulk delivery of name table messages when contact
994  * with a new neighbor occurs. No link congestion checking is performed
995  * because name table messages *must* be delivered. The messages must be
996  * small enough not to require fragmentation.
997  * Called without any locks held.
998  */
999 void tipc_link_send_names(struct list_head *message_list, u32 dest)
1000 {
1001         struct tipc_node *n_ptr;
1002         struct tipc_link *l_ptr;
1003         struct sk_buff *buf;
1004         struct sk_buff *temp_buf;
1005
1006         if (list_empty(message_list))
1007                 return;
1008
1009         read_lock_bh(&tipc_net_lock);
1010         n_ptr = tipc_node_find(dest);
1011         if (n_ptr) {
1012                 tipc_node_lock(n_ptr);
1013                 l_ptr = n_ptr->active_links[0];
1014                 if (l_ptr) {
1015                         /* convert circular list to linear list */
1016                         ((struct sk_buff *)message_list->prev)->next = NULL;
1017                         link_add_chain_to_outqueue(l_ptr,
1018                                 (struct sk_buff *)message_list->next, 0);
1019                         tipc_link_push_queue(l_ptr);
1020                         INIT_LIST_HEAD(message_list);
1021                 }
1022                 tipc_node_unlock(n_ptr);
1023         }
1024         read_unlock_bh(&tipc_net_lock);
1025
1026         /* discard the messages if they couldn't be sent */
1027         list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
1028                 list_del((struct list_head *)buf);
1029                 kfree_skb(buf);
1030         }
1031 }
1032
1033 /*
1034  * link_send_buf_fast: Entry for data messages where the
1035  * destination link is known and the header is complete,
1036  * inclusive total message length. Very time critical.
1037  * Link is locked. Returns user data length.
1038  */
1039 static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
1040                               u32 *used_max_pkt)
1041 {
1042         struct tipc_msg *msg = buf_msg(buf);
1043         int res = msg_data_sz(msg);
1044
1045         if (likely(!link_congested(l_ptr))) {
1046                 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1047                         if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
1048                                 link_add_to_outqueue(l_ptr, buf, msg);
1049                                 tipc_bearer_send(l_ptr->b_ptr, buf,
1050                                                  &l_ptr->media_addr);
1051                                 l_ptr->unacked_window = 0;
1052                                 return res;
1053                         }
1054                 } else
1055                         *used_max_pkt = l_ptr->max_pkt;
1056         }
1057         return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1058 }
1059
1060 /*
1061  * tipc_link_send_sections_fast: Entry for messages where the
1062  * destination processor is known and the header is complete,
1063  * except for total message length.
1064  * Returns user data length or errno.
1065  */
1066 int tipc_link_send_sections_fast(struct tipc_port *sender,
1067                                  struct iovec const *msg_sect,
1068                                  const u32 num_sect, unsigned int total_len,
1069                                  u32 destaddr)
1070 {
1071         struct tipc_msg *hdr = &sender->phdr;
1072         struct tipc_link *l_ptr;
1073         struct sk_buff *buf;
1074         struct tipc_node *node;
1075         int res;
1076         u32 selector = msg_origport(hdr) & 1;
1077
1078 again:
1079         /*
1080          * Try building message using port's max_pkt hint.
1081          * (Must not hold any locks while building message.)
1082          */
1083         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
1084                              sender->max_pkt, &buf);
1085         /* Exit if build request was invalid */
1086         if (unlikely(res < 0))
1087                 return res;
1088
1089         read_lock_bh(&tipc_net_lock);
1090         node = tipc_node_find(destaddr);
1091         if (likely(node)) {
1092                 tipc_node_lock(node);
1093                 l_ptr = node->active_links[selector];
1094                 if (likely(l_ptr)) {
1095                         if (likely(buf)) {
1096                                 res = link_send_buf_fast(l_ptr, buf,
1097                                                          &sender->max_pkt);
1098 exit:
1099                                 tipc_node_unlock(node);
1100                                 read_unlock_bh(&tipc_net_lock);
1101                                 return res;
1102                         }
1103
1104                         /* Exit if link (or bearer) is congested */
1105                         if (link_congested(l_ptr) ||
1106                             tipc_bearer_blocked(l_ptr->b_ptr)) {
1107                                 res = link_schedule_port(l_ptr,
1108                                                          sender->ref, res);
1109                                 goto exit;
1110                         }
1111
1112                         /*
1113                          * Message size exceeds max_pkt hint; update hint,
1114                          * then re-try fast path or fragment the message
1115                          */
1116                         sender->max_pkt = l_ptr->max_pkt;
1117                         tipc_node_unlock(node);
1118                         read_unlock_bh(&tipc_net_lock);
1119
1120
1121                         if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
1122                                 goto again;
1123
1124                         return link_send_sections_long(sender, msg_sect,
1125                                                        num_sect, total_len,
1126                                                        destaddr);
1127                 }
1128                 tipc_node_unlock(node);
1129         }
1130         read_unlock_bh(&tipc_net_lock);
1131
1132         /* Couldn't find a link to the destination node */
1133         if (buf)
1134                 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1135         if (res >= 0)
1136                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1137                                                  total_len, TIPC_ERR_NO_NODE);
1138         return res;
1139 }
1140
1141 /*
1142  * link_send_sections_long(): Entry for long messages where the
1143  * destination node is known and the header is complete,
1144  * inclusive total message length.
1145  * Link and bearer congestion status have been checked to be ok,
1146  * and are ignored if they change.
1147  *
1148  * Note that fragments do not use the full link MTU so that they won't have
1149  * to undergo refragmentation if link changeover causes them to be sent
1150  * over another link with an additional tunnel header added as prefix.
1151  * (Refragmentation will still occur if the other link has a smaller MTU.)
1152  *
1153  * Returns user data length or errno.
1154  */
1155 static int link_send_sections_long(struct tipc_port *sender,
1156                                    struct iovec const *msg_sect,
1157                                    u32 num_sect, unsigned int total_len,
1158                                    u32 destaddr)
1159 {
1160         struct tipc_link *l_ptr;
1161         struct tipc_node *node;
1162         struct tipc_msg *hdr = &sender->phdr;
1163         u32 dsz = total_len;
1164         u32 max_pkt, fragm_sz, rest;
1165         struct tipc_msg fragm_hdr;
1166         struct sk_buff *buf, *buf_chain, *prev;
1167         u32 fragm_crs, fragm_rest, hsz, sect_rest;
1168         const unchar *sect_crs;
1169         int curr_sect;
1170         u32 fragm_no;
1171
1172 again:
1173         fragm_no = 1;
1174         max_pkt = sender->max_pkt - INT_H_SIZE;
1175                 /* leave room for tunnel header in case of link changeover */
1176         fragm_sz = max_pkt - INT_H_SIZE;
1177                 /* leave room for fragmentation header in each fragment */
1178         rest = dsz;
1179         fragm_crs = 0;
1180         fragm_rest = 0;
1181         sect_rest = 0;
1182         sect_crs = NULL;
1183         curr_sect = -1;
1184
1185         /* Prepare reusable fragment header */
1186         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1187                  INT_H_SIZE, msg_destnode(hdr));
1188         msg_set_size(&fragm_hdr, max_pkt);
1189         msg_set_fragm_no(&fragm_hdr, 1);
1190
1191         /* Prepare header of first fragment */
1192         buf_chain = buf = tipc_buf_acquire(max_pkt);
1193         if (!buf)
1194                 return -ENOMEM;
1195         buf->next = NULL;
1196         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1197         hsz = msg_hdr_sz(hdr);
1198         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1199
1200         /* Chop up message */
1201         fragm_crs = INT_H_SIZE + hsz;
1202         fragm_rest = fragm_sz - hsz;
1203
1204         do {            /* For all sections */
1205                 u32 sz;
1206
1207                 if (!sect_rest) {
1208                         sect_rest = msg_sect[++curr_sect].iov_len;
1209                         sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1210                 }
1211
1212                 if (sect_rest < fragm_rest)
1213                         sz = sect_rest;
1214                 else
1215                         sz = fragm_rest;
1216
1217                 if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1218 error:
1219                         for (; buf_chain; buf_chain = buf) {
1220                                 buf = buf_chain->next;
1221                                 kfree_skb(buf_chain);
1222                         }
1223                         return -EFAULT;
1224                 }
1225                 sect_crs += sz;
1226                 sect_rest -= sz;
1227                 fragm_crs += sz;
1228                 fragm_rest -= sz;
1229                 rest -= sz;
1230
1231                 if (!fragm_rest && rest) {
1232
1233                         /* Initiate new fragment: */
1234                         if (rest <= fragm_sz) {
1235                                 fragm_sz = rest;
1236                                 msg_set_type(&fragm_hdr, LAST_FRAGMENT);
1237                         } else {
1238                                 msg_set_type(&fragm_hdr, FRAGMENT);
1239                         }
1240                         msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1241                         msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1242                         prev = buf;
1243                         buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1244                         if (!buf)
1245                                 goto error;
1246
1247                         buf->next = NULL;
1248                         prev->next = buf;
1249                         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1250                         fragm_crs = INT_H_SIZE;
1251                         fragm_rest = fragm_sz;
1252                 }
1253         } while (rest > 0);
1254
1255         /*
1256          * Now we have a buffer chain. Select a link and check
1257          * that packet size is still OK
1258          */
1259         node = tipc_node_find(destaddr);
1260         if (likely(node)) {
1261                 tipc_node_lock(node);
1262                 l_ptr = node->active_links[sender->ref & 1];
1263                 if (!l_ptr) {
1264                         tipc_node_unlock(node);
1265                         goto reject;
1266                 }
1267                 if (l_ptr->max_pkt < max_pkt) {
1268                         sender->max_pkt = l_ptr->max_pkt;
1269                         tipc_node_unlock(node);
1270                         for (; buf_chain; buf_chain = buf) {
1271                                 buf = buf_chain->next;
1272                                 kfree_skb(buf_chain);
1273                         }
1274                         goto again;
1275                 }
1276         } else {
1277 reject:
1278                 for (; buf_chain; buf_chain = buf) {
1279                         buf = buf_chain->next;
1280                         kfree_skb(buf_chain);
1281                 }
1282                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1283                                                  total_len, TIPC_ERR_NO_NODE);
1284         }
1285
1286         /* Append chain of fragments to send queue & send them */
1287         l_ptr->long_msg_seq_no++;
1288         link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
1289         l_ptr->stats.sent_fragments += fragm_no;
1290         l_ptr->stats.sent_fragmented++;
1291         tipc_link_push_queue(l_ptr);
1292         tipc_node_unlock(node);
1293         return dsz;
1294 }
1295
1296 /*
1297  * tipc_link_push_packet: Push one unsent packet to the media
1298  */
1299 u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1300 {
1301         struct sk_buff *buf = l_ptr->first_out;
1302         u32 r_q_size = l_ptr->retransm_queue_size;
1303         u32 r_q_head = l_ptr->retransm_queue_head;
1304
1305         /* Step to position where retransmission failed, if any,    */
1306         /* consider that buffers may have been released in meantime */
1307         if (r_q_size && buf) {
1308                 u32 last = lesser(mod(r_q_head + r_q_size),
1309                                   link_last_sent(l_ptr));
1310                 u32 first = buf_seqno(buf);
1311
1312                 while (buf && less(first, r_q_head)) {
1313                         first = mod(first + 1);
1314                         buf = buf->next;
1315                 }
1316                 l_ptr->retransm_queue_head = r_q_head = first;
1317                 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1318         }
1319
1320         /* Continue retransmission now, if there is anything: */
1321         if (r_q_size && buf) {
1322                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1323                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1324                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1325                 l_ptr->retransm_queue_head = mod(++r_q_head);
1326                 l_ptr->retransm_queue_size = --r_q_size;
1327                 l_ptr->stats.retransmitted++;
1328                 return 0;
1329         }
1330
1331         /* Send deferred protocol message, if any: */
1332         buf = l_ptr->proto_msg_queue;
1333         if (buf) {
1334                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1335                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1336                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1337                 l_ptr->unacked_window = 0;
1338                 kfree_skb(buf);
1339                 l_ptr->proto_msg_queue = NULL;
1340                 return 0;
1341         }
1342
1343         /* Send one deferred data message, if send window not full: */
1344         buf = l_ptr->next_out;
1345         if (buf) {
1346                 struct tipc_msg *msg = buf_msg(buf);
1347                 u32 next = msg_seqno(msg);
1348                 u32 first = buf_seqno(l_ptr->first_out);
1349
1350                 if (mod(next - first) < l_ptr->queue_limit[0]) {
1351                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1352                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1353                         tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1354                         if (msg_user(msg) == MSG_BUNDLER)
1355                                 msg_set_type(msg, CLOSED_MSG);
1356                         l_ptr->next_out = buf->next;
1357                         return 0;
1358                 }
1359         }
1360         return 1;
1361 }
1362
1363 /*
1364  * push_queue(): push out the unsent messages of a link where
1365  *               congestion has abated. Node is locked
1366  */
1367 void tipc_link_push_queue(struct tipc_link *l_ptr)
1368 {
1369         u32 res;
1370
1371         if (tipc_bearer_blocked(l_ptr->b_ptr))
1372                 return;
1373
1374         do {
1375                 res = tipc_link_push_packet(l_ptr);
1376         } while (!res);
1377 }
1378
1379 static void link_reset_all(unsigned long addr)
1380 {
1381         struct tipc_node *n_ptr;
1382         char addr_string[16];
1383         u32 i;
1384
1385         read_lock_bh(&tipc_net_lock);
1386         n_ptr = tipc_node_find((u32)addr);
1387         if (!n_ptr) {
1388                 read_unlock_bh(&tipc_net_lock);
1389                 return; /* node no longer exists */
1390         }
1391
1392         tipc_node_lock(n_ptr);
1393
1394         pr_warn("Resetting all links to %s\n",
1395                 tipc_addr_string_fill(addr_string, n_ptr->addr));
1396
1397         for (i = 0; i < MAX_BEARERS; i++) {
1398                 if (n_ptr->links[i]) {
1399                         link_print(n_ptr->links[i], "Resetting link\n");
1400                         tipc_link_reset(n_ptr->links[i]);
1401                 }
1402         }
1403
1404         tipc_node_unlock(n_ptr);
1405         read_unlock_bh(&tipc_net_lock);
1406 }
1407
1408 static void link_retransmit_failure(struct tipc_link *l_ptr,
1409                                     struct sk_buff *buf)
1410 {
1411         struct tipc_msg *msg = buf_msg(buf);
1412
1413         pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
1414
1415         if (l_ptr->addr) {
1416                 /* Handle failure on standard link */
1417                 link_print(l_ptr, "Resetting link\n");
1418                 tipc_link_reset(l_ptr);
1419
1420         } else {
1421                 /* Handle failure on broadcast link */
1422                 struct tipc_node *n_ptr;
1423                 char addr_string[16];
1424
1425                 pr_info("Msg seq number: %u,  ", msg_seqno(msg));
1426                 pr_cont("Outstanding acks: %lu\n",
1427                         (unsigned long) TIPC_SKB_CB(buf)->handle);
1428
1429                 n_ptr = tipc_bclink_retransmit_to();
1430                 tipc_node_lock(n_ptr);
1431
1432                 tipc_addr_string_fill(addr_string, n_ptr->addr);
1433                 pr_info("Broadcast link info for %s\n", addr_string);
1434                 pr_info("Reception permitted: %d,  Acked: %u\n",
1435                         n_ptr->bclink.recv_permitted,
1436                         n_ptr->bclink.acked);
1437                 pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
1438                         n_ptr->bclink.last_in,
1439                         n_ptr->bclink.oos_state,
1440                         n_ptr->bclink.last_sent);
1441
1442                 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1443
1444                 tipc_node_unlock(n_ptr);
1445
1446                 l_ptr->stale_count = 0;
1447         }
1448 }
1449
1450 void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1451                           u32 retransmits)
1452 {
1453         struct tipc_msg *msg;
1454
1455         if (!buf)
1456                 return;
1457
1458         msg = buf_msg(buf);
1459
1460         if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1461                 if (l_ptr->retransm_queue_size == 0) {
1462                         l_ptr->retransm_queue_head = msg_seqno(msg);
1463                         l_ptr->retransm_queue_size = retransmits;
1464                 } else {
1465                         pr_err("Unexpected retransmit on link %s (qsize=%d)\n",
1466                                l_ptr->name, l_ptr->retransm_queue_size);
1467                 }
1468                 return;
1469         } else {
1470                 /* Detect repeated retransmit failures on unblocked bearer */
1471                 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1472                         if (++l_ptr->stale_count > 100) {
1473                                 link_retransmit_failure(l_ptr, buf);
1474                                 return;
1475                         }
1476                 } else {
1477                         l_ptr->last_retransmitted = msg_seqno(msg);
1478                         l_ptr->stale_count = 1;
1479                 }
1480         }
1481
1482         while (retransmits && (buf != l_ptr->next_out) && buf) {
1483                 msg = buf_msg(buf);
1484                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1485                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1486                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1487                 buf = buf->next;
1488                 retransmits--;
1489                 l_ptr->stats.retransmitted++;
1490         }
1491
1492         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1493 }
1494
1495 /**
1496  * link_insert_deferred_queue - insert deferred messages back into receive chain
1497  */
1498 static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
1499                                                   struct sk_buff *buf)
1500 {
1501         u32 seq_no;
1502
1503         if (l_ptr->oldest_deferred_in == NULL)
1504                 return buf;
1505
1506         seq_no = buf_seqno(l_ptr->oldest_deferred_in);
1507         if (seq_no == mod(l_ptr->next_in_no)) {
1508                 l_ptr->newest_deferred_in->next = buf;
1509                 buf = l_ptr->oldest_deferred_in;
1510                 l_ptr->oldest_deferred_in = NULL;
1511                 l_ptr->deferred_inqueue_sz = 0;
1512         }
1513         return buf;
1514 }
1515
1516 /**
1517  * link_recv_buf_validate - validate basic format of received message
1518  *
1519  * This routine ensures a TIPC message has an acceptable header, and at least
1520  * as much data as the header indicates it should.  The routine also ensures
1521  * that the entire message header is stored in the main fragment of the message
1522  * buffer, to simplify future access to message header fields.
1523  *
1524  * Note: Having extra info present in the message header or data areas is OK.
1525  * TIPC will ignore the excess, under the assumption that it is optional info
1526  * introduced by a later release of the protocol.
1527  */
1528 static int link_recv_buf_validate(struct sk_buff *buf)
1529 {
1530         static u32 min_data_hdr_size[8] = {
1531                 SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
1532                 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1533                 };
1534
1535         struct tipc_msg *msg;
1536         u32 tipc_hdr[2];
1537         u32 size;
1538         u32 hdr_size;
1539         u32 min_hdr_size;
1540
1541         if (unlikely(buf->len < MIN_H_SIZE))
1542                 return 0;
1543
1544         msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1545         if (msg == NULL)
1546                 return 0;
1547
1548         if (unlikely(msg_version(msg) != TIPC_VERSION))
1549                 return 0;
1550
1551         size = msg_size(msg);
1552         hdr_size = msg_hdr_sz(msg);
1553         min_hdr_size = msg_isdata(msg) ?
1554                 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1555
1556         if (unlikely((hdr_size < min_hdr_size) ||
1557                      (size < hdr_size) ||
1558                      (buf->len < size) ||
1559                      (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1560                 return 0;
1561
1562         return pskb_may_pull(buf, hdr_size);
1563 }
1564
1565 /**
1566  * tipc_recv_msg - process TIPC messages arriving from off-node
1567  * @head: pointer to message buffer chain
1568  * @tb_ptr: pointer to bearer message arrived on
1569  *
1570  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1571  * structure (i.e. cannot be NULL), but bearer can be inactive.
1572  */
1573 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
1574 {
1575         read_lock_bh(&tipc_net_lock);
1576         while (head) {
1577                 struct tipc_node *n_ptr;
1578                 struct tipc_link *l_ptr;
1579                 struct sk_buff *crs;
1580                 struct sk_buff *buf = head;
1581                 struct tipc_msg *msg;
1582                 u32 seq_no;
1583                 u32 ackd;
1584                 u32 released = 0;
1585                 int type;
1586
1587                 head = head->next;
1588
1589                 /* Ensure bearer is still enabled */
1590                 if (unlikely(!b_ptr->active))
1591                         goto cont;
1592
1593                 /* Ensure message is well-formed */
1594                 if (unlikely(!link_recv_buf_validate(buf)))
1595                         goto cont;
1596
1597                 /* Ensure message data is a single contiguous unit */
1598                 if (unlikely(skb_linearize(buf)))
1599                         goto cont;
1600
1601                 /* Handle arrival of a non-unicast link message */
1602                 msg = buf_msg(buf);
1603
1604                 if (unlikely(msg_non_seq(msg))) {
1605                         if (msg_user(msg) ==  LINK_CONFIG)
1606                                 tipc_disc_recv_msg(buf, b_ptr);
1607                         else
1608                                 tipc_bclink_recv_pkt(buf);
1609                         continue;
1610                 }
1611
1612                 /* Discard unicast link messages destined for another node */
1613                 if (unlikely(!msg_short(msg) &&
1614                              (msg_destnode(msg) != tipc_own_addr)))
1615                         goto cont;
1616
1617                 /* Locate neighboring node that sent message */
1618                 n_ptr = tipc_node_find(msg_prevnode(msg));
1619                 if (unlikely(!n_ptr))
1620                         goto cont;
1621                 tipc_node_lock(n_ptr);
1622
1623                 /* Locate unicast link endpoint that should handle message */
1624                 l_ptr = n_ptr->links[b_ptr->identity];
1625                 if (unlikely(!l_ptr)) {
1626                         tipc_node_unlock(n_ptr);
1627                         goto cont;
1628                 }
1629
1630                 /* Verify that communication with node is currently allowed */
1631                 if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
1632                         msg_user(msg) == LINK_PROTOCOL &&
1633                         (msg_type(msg) == RESET_MSG ||
1634                                         msg_type(msg) == ACTIVATE_MSG) &&
1635                         !msg_redundant_link(msg))
1636                         n_ptr->block_setup &= ~WAIT_PEER_DOWN;
1637
1638                 if (n_ptr->block_setup) {
1639                         tipc_node_unlock(n_ptr);
1640                         goto cont;
1641                 }
1642
1643                 /* Validate message sequence number info */
1644                 seq_no = msg_seqno(msg);
1645                 ackd = msg_ack(msg);
1646
1647                 /* Release acked messages */
1648                 if (n_ptr->bclink.recv_permitted)
1649                         tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1650
1651                 crs = l_ptr->first_out;
1652                 while ((crs != l_ptr->next_out) &&
1653                        less_eq(buf_seqno(crs), ackd)) {
1654                         struct sk_buff *next = crs->next;
1655
1656                         kfree_skb(crs);
1657                         crs = next;
1658                         released++;
1659                 }
1660                 if (released) {
1661                         l_ptr->first_out = crs;
1662                         l_ptr->out_queue_size -= released;
1663                 }
1664
1665                 /* Try sending any messages link endpoint has pending */
1666                 if (unlikely(l_ptr->next_out))
1667                         tipc_link_push_queue(l_ptr);
1668                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1669                         tipc_link_wakeup_ports(l_ptr, 0);
1670                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1671                         l_ptr->stats.sent_acks++;
1672                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1673                 }
1674
1675                 /* Now (finally!) process the incoming message */
1676 protocol_check:
1677                 if (likely(link_working_working(l_ptr))) {
1678                         if (likely(seq_no == mod(l_ptr->next_in_no))) {
1679                                 l_ptr->next_in_no++;
1680                                 if (unlikely(l_ptr->oldest_deferred_in))
1681                                         head = link_insert_deferred_queue(l_ptr,
1682                                                                           head);
1683 deliver:
1684                                 if (likely(msg_isdata(msg))) {
1685                                         tipc_node_unlock(n_ptr);
1686                                         tipc_port_recv_msg(buf);
1687                                         continue;
1688                                 }
1689                                 switch (msg_user(msg)) {
1690                                         int ret;
1691                                 case MSG_BUNDLER:
1692                                         l_ptr->stats.recv_bundles++;
1693                                         l_ptr->stats.recv_bundled +=
1694                                                 msg_msgcnt(msg);
1695                                         tipc_node_unlock(n_ptr);
1696                                         tipc_link_recv_bundle(buf);
1697                                         continue;
1698                                 case NAME_DISTRIBUTOR:
1699                                         n_ptr->bclink.recv_permitted = true;
1700                                         tipc_node_unlock(n_ptr);
1701                                         tipc_named_recv(buf);
1702                                         continue;
1703                                 case BCAST_PROTOCOL:
1704                                         tipc_link_recv_sync(n_ptr, buf);
1705                                         tipc_node_unlock(n_ptr);
1706                                         continue;
1707                                 case CONN_MANAGER:
1708                                         tipc_node_unlock(n_ptr);
1709                                         tipc_port_recv_proto_msg(buf);
1710                                         continue;
1711                                 case MSG_FRAGMENTER:
1712                                         l_ptr->stats.recv_fragments++;
1713                                         ret = tipc_link_recv_fragment(
1714                                                 &l_ptr->defragm_buf,
1715                                                 &buf, &msg);
1716                                         if (ret == 1) {
1717                                                 l_ptr->stats.recv_fragmented++;
1718                                                 goto deliver;
1719                                         }
1720                                         if (ret == -1)
1721                                                 l_ptr->next_in_no--;
1722                                         break;
1723                                 case CHANGEOVER_PROTOCOL:
1724                                         type = msg_type(msg);
1725                                         if (link_recv_changeover_msg(&l_ptr,
1726                                                                      &buf)) {
1727                                                 msg = buf_msg(buf);
1728                                                 seq_no = msg_seqno(msg);
1729                                                 if (type == ORIGINAL_MSG)
1730                                                         goto deliver;
1731                                                 goto protocol_check;
1732                                         }
1733                                         break;
1734                                 default:
1735                                         kfree_skb(buf);
1736                                         buf = NULL;
1737                                         break;
1738                                 }
1739                                 tipc_node_unlock(n_ptr);
1740                                 tipc_net_route_msg(buf);
1741                                 continue;
1742                         }
1743                         link_handle_out_of_seq_msg(l_ptr, buf);
1744                         head = link_insert_deferred_queue(l_ptr, head);
1745                         tipc_node_unlock(n_ptr);
1746                         continue;
1747                 }
1748
1749                 /* Link is not in state WORKING_WORKING */
1750                 if (msg_user(msg) == LINK_PROTOCOL) {
1751                         link_recv_proto_msg(l_ptr, buf);
1752                         head = link_insert_deferred_queue(l_ptr, head);
1753                         tipc_node_unlock(n_ptr);
1754                         continue;
1755                 }
1756
1757                 /* Traffic message. Conditionally activate link */
1758                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1759
1760                 if (link_working_working(l_ptr)) {
1761                         /* Re-insert buffer in front of queue */
1762                         buf->next = head;
1763                         head = buf;
1764                         tipc_node_unlock(n_ptr);
1765                         continue;
1766                 }
1767                 tipc_node_unlock(n_ptr);
1768 cont:
1769                 kfree_skb(buf);
1770         }
1771         read_unlock_bh(&tipc_net_lock);
1772 }
1773
1774 /**
1775  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1776  *
1777  * Returns increase in queue length (i.e. 0 or 1)
1778  */
1779 u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
1780                         struct sk_buff *buf)
1781 {
1782         struct sk_buff *queue_buf;
1783         struct sk_buff **prev;
1784         u32 seq_no = buf_seqno(buf);
1785
1786         buf->next = NULL;
1787
1788         /* Empty queue ? */
1789         if (*head == NULL) {
1790                 *head = *tail = buf;
1791                 return 1;
1792         }
1793
1794         /* Last ? */
1795         if (less(buf_seqno(*tail), seq_no)) {
1796                 (*tail)->next = buf;
1797                 *tail = buf;
1798                 return 1;
1799         }
1800
1801         /* Locate insertion point in queue, then insert; discard if duplicate */
1802         prev = head;
1803         queue_buf = *head;
1804         for (;;) {
1805                 u32 curr_seqno = buf_seqno(queue_buf);
1806
1807                 if (seq_no == curr_seqno) {
1808                         kfree_skb(buf);
1809                         return 0;
1810                 }
1811
1812                 if (less(seq_no, curr_seqno))
1813                         break;
1814
1815                 prev = &queue_buf->next;
1816                 queue_buf = queue_buf->next;
1817         }
1818
1819         buf->next = queue_buf;
1820         *prev = buf;
1821         return 1;
1822 }
1823
1824 /*
1825  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1826  */
1827 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1828                                        struct sk_buff *buf)
1829 {
1830         u32 seq_no = buf_seqno(buf);
1831
1832         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1833                 link_recv_proto_msg(l_ptr, buf);
1834                 return;
1835         }
1836
1837         /* Record OOS packet arrival (force mismatch on next timeout) */
1838         l_ptr->checkpoint--;
1839
1840         /*
1841          * Discard packet if a duplicate; otherwise add it to deferred queue
1842          * and notify peer of gap as per protocol specification
1843          */
1844         if (less(seq_no, mod(l_ptr->next_in_no))) {
1845                 l_ptr->stats.duplicates++;
1846                 kfree_skb(buf);
1847                 return;
1848         }
1849
1850         if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1851                                 &l_ptr->newest_deferred_in, buf)) {
1852                 l_ptr->deferred_inqueue_sz++;
1853                 l_ptr->stats.deferred_recv++;
1854                 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1855                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1856         } else
1857                 l_ptr->stats.duplicates++;
1858 }
1859
1860 /*
1861  * Send protocol message to the other endpoint.
1862  */
1863 void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1864                               int probe_msg, u32 gap, u32 tolerance,
1865                               u32 priority, u32 ack_mtu)
1866 {
1867         struct sk_buff *buf = NULL;
1868         struct tipc_msg *msg = l_ptr->pmsg;
1869         u32 msg_size = sizeof(l_ptr->proto_msg);
1870         int r_flag;
1871
1872         /* Discard any previous message that was deferred due to congestion */
1873         if (l_ptr->proto_msg_queue) {
1874                 kfree_skb(l_ptr->proto_msg_queue);
1875                 l_ptr->proto_msg_queue = NULL;
1876         }
1877
1878         if (link_blocked(l_ptr))
1879                 return;
1880
1881         /* Abort non-RESET send if communication with node is prohibited */
1882         if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
1883                 return;
1884
1885         /* Create protocol message with "out-of-sequence" sequence number */
1886         msg_set_type(msg, msg_typ);
1887         msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
1888         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1889         msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1890
1891         if (msg_typ == STATE_MSG) {
1892                 u32 next_sent = mod(l_ptr->next_out_no);
1893
1894                 if (!tipc_link_is_up(l_ptr))
1895                         return;
1896                 if (l_ptr->next_out)
1897                         next_sent = buf_seqno(l_ptr->next_out);
1898                 msg_set_next_sent(msg, next_sent);
1899                 if (l_ptr->oldest_deferred_in) {
1900                         u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
1901                         gap = mod(rec - mod(l_ptr->next_in_no));
1902                 }
1903                 msg_set_seq_gap(msg, gap);
1904                 if (gap)
1905                         l_ptr->stats.sent_nacks++;
1906                 msg_set_link_tolerance(msg, tolerance);
1907                 msg_set_linkprio(msg, priority);
1908                 msg_set_max_pkt(msg, ack_mtu);
1909                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1910                 msg_set_probe(msg, probe_msg != 0);
1911                 if (probe_msg) {
1912                         u32 mtu = l_ptr->max_pkt;
1913
1914                         if ((mtu < l_ptr->max_pkt_target) &&
1915                             link_working_working(l_ptr) &&
1916                             l_ptr->fsm_msg_cnt) {
1917                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1918                                 if (l_ptr->max_pkt_probes == 10) {
1919                                         l_ptr->max_pkt_target = (msg_size - 4);
1920                                         l_ptr->max_pkt_probes = 0;
1921                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1922                                 }
1923                                 l_ptr->max_pkt_probes++;
1924                         }
1925
1926                         l_ptr->stats.sent_probes++;
1927                 }
1928                 l_ptr->stats.sent_states++;
1929         } else {                /* RESET_MSG or ACTIVATE_MSG */
1930                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1931                 msg_set_seq_gap(msg, 0);
1932                 msg_set_next_sent(msg, 1);
1933                 msg_set_probe(msg, 0);
1934                 msg_set_link_tolerance(msg, l_ptr->tolerance);
1935                 msg_set_linkprio(msg, l_ptr->priority);
1936                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1937         }
1938
1939         r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1940         msg_set_redundant_link(msg, r_flag);
1941         msg_set_linkprio(msg, l_ptr->priority);
1942         msg_set_size(msg, msg_size);
1943
1944         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1945
1946         buf = tipc_buf_acquire(msg_size);
1947         if (!buf)
1948                 return;
1949
1950         skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1951         buf->priority = TC_PRIO_CONTROL;
1952
1953         /* Defer message if bearer is already blocked */
1954         if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1955                 l_ptr->proto_msg_queue = buf;
1956                 return;
1957         }
1958
1959         tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1960         l_ptr->unacked_window = 0;
1961         kfree_skb(buf);
1962 }
1963
1964 /*
1965  * Receive protocol message :
1966  * Note that network plane id propagates through the network, and may
1967  * change at any time. The node with lowest address rules
1968  */
1969 static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
1970 {
1971         u32 rec_gap = 0;
1972         u32 max_pkt_info;
1973         u32 max_pkt_ack;
1974         u32 msg_tol;
1975         struct tipc_msg *msg = buf_msg(buf);
1976
1977         if (link_blocked(l_ptr))
1978                 goto exit;
1979
1980         /* record unnumbered packet arrival (force mismatch on next timeout) */
1981         l_ptr->checkpoint--;
1982
1983         if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
1984                 if (tipc_own_addr > msg_prevnode(msg))
1985                         l_ptr->b_ptr->net_plane = msg_net_plane(msg);
1986
1987         l_ptr->owner->permit_changeover = msg_redundant_link(msg);
1988
1989         switch (msg_type(msg)) {
1990
1991         case RESET_MSG:
1992                 if (!link_working_unknown(l_ptr) &&
1993                     (l_ptr->peer_session != INVALID_SESSION)) {
1994                         if (less_eq(msg_session(msg), l_ptr->peer_session))
1995                                 break; /* duplicate or old reset: ignore */
1996                 }
1997
1998                 if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
1999                                 link_working_unknown(l_ptr))) {
2000                         /*
2001                          * peer has lost contact -- don't allow peer's links
2002                          * to reactivate before we recognize loss & clean up
2003                          */
2004                         l_ptr->owner->block_setup = WAIT_NODE_DOWN;
2005                 }
2006
2007                 link_state_event(l_ptr, RESET_MSG);
2008
2009                 /* fall thru' */
2010         case ACTIVATE_MSG:
2011                 /* Update link settings according other endpoint's values */
2012                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2013
2014                 msg_tol = msg_link_tolerance(msg);
2015                 if (msg_tol > l_ptr->tolerance)
2016                         link_set_supervision_props(l_ptr, msg_tol);
2017
2018                 if (msg_linkprio(msg) > l_ptr->priority)
2019                         l_ptr->priority = msg_linkprio(msg);
2020
2021                 max_pkt_info = msg_max_pkt(msg);
2022                 if (max_pkt_info) {
2023                         if (max_pkt_info < l_ptr->max_pkt_target)
2024                                 l_ptr->max_pkt_target = max_pkt_info;
2025                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2026                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
2027                 } else {
2028                         l_ptr->max_pkt = l_ptr->max_pkt_target;
2029                 }
2030
2031                 /* Synchronize broadcast link info, if not done previously */
2032                 if (!tipc_node_is_up(l_ptr->owner)) {
2033                         l_ptr->owner->bclink.last_sent =
2034                                 l_ptr->owner->bclink.last_in =
2035                                 msg_last_bcast(msg);
2036                         l_ptr->owner->bclink.oos_state = 0;
2037                 }
2038
2039                 l_ptr->peer_session = msg_session(msg);
2040                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2041
2042                 if (msg_type(msg) == ACTIVATE_MSG)
2043                         link_state_event(l_ptr, ACTIVATE_MSG);
2044                 break;
2045         case STATE_MSG:
2046
2047                 msg_tol = msg_link_tolerance(msg);
2048                 if (msg_tol)
2049                         link_set_supervision_props(l_ptr, msg_tol);
2050
2051                 if (msg_linkprio(msg) &&
2052                     (msg_linkprio(msg) != l_ptr->priority)) {
2053                         pr_warn("%s<%s>, priority change %u->%u\n",
2054                                 link_rst_msg, l_ptr->name, l_ptr->priority,
2055                                 msg_linkprio(msg));
2056                         l_ptr->priority = msg_linkprio(msg);
2057                         tipc_link_reset(l_ptr); /* Enforce change to take effect */
2058                         break;
2059                 }
2060                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2061                 l_ptr->stats.recv_states++;
2062                 if (link_reset_unknown(l_ptr))
2063                         break;
2064
2065                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2066                         rec_gap = mod(msg_next_sent(msg) -
2067                                       mod(l_ptr->next_in_no));
2068                 }
2069
2070                 max_pkt_ack = msg_max_pkt(msg);
2071                 if (max_pkt_ack > l_ptr->max_pkt) {
2072                         l_ptr->max_pkt = max_pkt_ack;
2073                         l_ptr->max_pkt_probes = 0;
2074                 }
2075
2076                 max_pkt_ack = 0;
2077                 if (msg_probe(msg)) {
2078                         l_ptr->stats.recv_probes++;
2079                         if (msg_size(msg) > sizeof(l_ptr->proto_msg))
2080                                 max_pkt_ack = msg_size(msg);
2081                 }
2082
2083                 /* Protocol message before retransmits, reduce loss risk */
2084                 if (l_ptr->owner->bclink.recv_permitted)
2085                         tipc_bclink_update_link_state(l_ptr->owner,
2086                                                       msg_last_bcast(msg));
2087
2088                 if (rec_gap || (msg_probe(msg))) {
2089                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2090                                                  0, rec_gap, 0, 0, max_pkt_ack);
2091                 }
2092                 if (msg_seq_gap(msg)) {
2093                         l_ptr->stats.recv_nacks++;
2094                         tipc_link_retransmit(l_ptr, l_ptr->first_out,
2095                                              msg_seq_gap(msg));
2096                 }
2097                 break;
2098         }
2099 exit:
2100         kfree_skb(buf);
2101 }
2102
2103
2104 /*
2105  * tipc_link_tunnel(): Send one message via a link belonging to
2106  * another bearer. Owner node is locked.
2107  */
2108 static void tipc_link_tunnel(struct tipc_link *l_ptr,
2109                              struct tipc_msg *tunnel_hdr, struct tipc_msg *msg,
2110                              u32 selector)
2111 {
2112         struct tipc_link *tunnel;
2113         struct sk_buff *buf;
2114         u32 length = msg_size(msg);
2115
2116         tunnel = l_ptr->owner->active_links[selector & 1];
2117         if (!tipc_link_is_up(tunnel)) {
2118                 pr_warn("%stunnel link no longer available\n", link_co_err);
2119                 return;
2120         }
2121         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2122         buf = tipc_buf_acquire(length + INT_H_SIZE);
2123         if (!buf) {
2124                 pr_warn("%sunable to send tunnel msg\n", link_co_err);
2125                 return;
2126         }
2127         skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2128         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2129         tipc_link_send_buf(tunnel, buf);
2130 }
2131
2132
2133
2134 /*
2135  * changeover(): Send whole message queue via the remaining link
2136  *               Owner node is locked.
2137  */
2138 void tipc_link_changeover(struct tipc_link *l_ptr)
2139 {
2140         u32 msgcount = l_ptr->out_queue_size;
2141         struct sk_buff *crs = l_ptr->first_out;
2142         struct tipc_link *tunnel = l_ptr->owner->active_links[0];
2143         struct tipc_msg tunnel_hdr;
2144         int split_bundles;
2145
2146         if (!tunnel)
2147                 return;
2148
2149         if (!l_ptr->owner->permit_changeover) {
2150                 pr_warn("%speer did not permit changeover\n", link_co_err);
2151                 return;
2152         }
2153
2154         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2155                  ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2156         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2157         msg_set_msgcnt(&tunnel_hdr, msgcount);
2158
2159         if (!l_ptr->first_out) {
2160                 struct sk_buff *buf;
2161
2162                 buf = tipc_buf_acquire(INT_H_SIZE);
2163                 if (buf) {
2164                         skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2165                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
2166                         tipc_link_send_buf(tunnel, buf);
2167                 } else {
2168                         pr_warn("%sunable to send changeover msg\n",
2169                                 link_co_err);
2170                 }
2171                 return;
2172         }
2173
2174         split_bundles = (l_ptr->owner->active_links[0] !=
2175                          l_ptr->owner->active_links[1]);
2176
2177         while (crs) {
2178                 struct tipc_msg *msg = buf_msg(crs);
2179
2180                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2181                         struct tipc_msg *m = msg_get_wrapped(msg);
2182                         unchar *pos = (unchar *)m;
2183
2184                         msgcount = msg_msgcnt(msg);
2185                         while (msgcount--) {
2186                                 msg_set_seqno(m, msg_seqno(msg));
2187                                 tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2188                                                  msg_link_selector(m));
2189                                 pos += align(msg_size(m));
2190                                 m = (struct tipc_msg *)pos;
2191                         }
2192                 } else {
2193                         tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2194                                          msg_link_selector(msg));
2195                 }
2196                 crs = crs->next;
2197         }
2198 }
2199
2200 void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
2201 {
2202         struct sk_buff *iter;
2203         struct tipc_msg tunnel_hdr;
2204
2205         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2206                  DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2207         msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2208         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2209         iter = l_ptr->first_out;
2210         while (iter) {
2211                 struct sk_buff *outbuf;
2212                 struct tipc_msg *msg = buf_msg(iter);
2213                 u32 length = msg_size(msg);
2214
2215                 if (msg_user(msg) == MSG_BUNDLER)
2216                         msg_set_type(msg, CLOSED_MSG);
2217                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2218                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2219                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2220                 outbuf = tipc_buf_acquire(length + INT_H_SIZE);
2221                 if (outbuf == NULL) {
2222                         pr_warn("%sunable to send duplicate msg\n",
2223                                 link_co_err);
2224                         return;
2225                 }
2226                 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2227                 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2228                                                length);
2229                 tipc_link_send_buf(tunnel, outbuf);
2230                 if (!tipc_link_is_up(l_ptr))
2231                         return;
2232                 iter = iter->next;
2233         }
2234 }
2235
2236 /**
2237  * buf_extract - extracts embedded TIPC message from another message
2238  * @skb: encapsulating message buffer
2239  * @from_pos: offset to extract from
2240  *
2241  * Returns a new message buffer containing an embedded message.  The
2242  * encapsulating message itself is left unchanged.
2243  */
2244 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2245 {
2246         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2247         u32 size = msg_size(msg);
2248         struct sk_buff *eb;
2249
2250         eb = tipc_buf_acquire(size);
2251         if (eb)
2252                 skb_copy_to_linear_data(eb, msg, size);
2253         return eb;
2254 }
2255
2256 /*
2257  *  link_recv_changeover_msg(): Receive tunneled packet sent
2258  *  via other link. Node is locked. Return extracted buffer.
2259  */
2260 static int link_recv_changeover_msg(struct tipc_link **l_ptr,
2261                                     struct sk_buff **buf)
2262 {
2263         struct sk_buff *tunnel_buf = *buf;
2264         struct tipc_link *dest_link;
2265         struct tipc_msg *msg;
2266         struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2267         u32 msg_typ = msg_type(tunnel_msg);
2268         u32 msg_count = msg_msgcnt(tunnel_msg);
2269         u32 bearer_id = msg_bearer_id(tunnel_msg);
2270
2271         if (bearer_id >= MAX_BEARERS)
2272                 goto exit;
2273         dest_link = (*l_ptr)->owner->links[bearer_id];
2274         if (!dest_link)
2275                 goto exit;
2276         if (dest_link == *l_ptr) {
2277                 pr_err("Unexpected changeover message on link <%s>\n",
2278                        (*l_ptr)->name);
2279                 goto exit;
2280         }
2281         *l_ptr = dest_link;
2282         msg = msg_get_wrapped(tunnel_msg);
2283
2284         if (msg_typ == DUPLICATE_MSG) {
2285                 if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
2286                         goto exit;
2287                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2288                 if (*buf == NULL) {
2289                         pr_warn("%sduplicate msg dropped\n", link_co_err);
2290                         goto exit;
2291                 }
2292                 kfree_skb(tunnel_buf);
2293                 return 1;
2294         }
2295
2296         /* First original message ?: */
2297         if (tipc_link_is_up(dest_link)) {
2298                 pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg,
2299                         dest_link->name);
2300                 tipc_link_reset(dest_link);
2301                 dest_link->exp_msg_count = msg_count;
2302                 if (!msg_count)
2303                         goto exit;
2304         } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2305                 dest_link->exp_msg_count = msg_count;
2306                 if (!msg_count)
2307                         goto exit;
2308         }
2309
2310         /* Receive original message */
2311         if (dest_link->exp_msg_count == 0) {
2312                 pr_warn("%sgot too many tunnelled messages\n", link_co_err);
2313                 goto exit;
2314         }
2315         dest_link->exp_msg_count--;
2316         if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2317                 goto exit;
2318         } else {
2319                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2320                 if (*buf != NULL) {
2321                         kfree_skb(tunnel_buf);
2322                         return 1;
2323                 } else {
2324                         pr_warn("%soriginal msg dropped\n", link_co_err);
2325                 }
2326         }
2327 exit:
2328         *buf = NULL;
2329         kfree_skb(tunnel_buf);
2330         return 0;
2331 }
2332
2333 /*
2334  *  Bundler functionality:
2335  */
2336 void tipc_link_recv_bundle(struct sk_buff *buf)
2337 {
2338         u32 msgcount = msg_msgcnt(buf_msg(buf));
2339         u32 pos = INT_H_SIZE;
2340         struct sk_buff *obuf;
2341
2342         while (msgcount--) {
2343                 obuf = buf_extract(buf, pos);
2344                 if (obuf == NULL) {
2345                         pr_warn("Link unable to unbundle message(s)\n");
2346                         break;
2347                 }
2348                 pos += align(msg_size(buf_msg(obuf)));
2349                 tipc_net_route_msg(obuf);
2350         }
2351         kfree_skb(buf);
2352 }
2353
2354 /*
2355  *  Fragmentation/defragmentation:
2356  */
2357
2358 /*
2359  * link_send_long_buf: Entry for buffers needing fragmentation.
2360  * The buffer is complete, inclusive total message length.
2361  * Returns user data length.
2362  */
2363 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
2364 {
2365         struct sk_buff *buf_chain = NULL;
2366         struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
2367         struct tipc_msg *inmsg = buf_msg(buf);
2368         struct tipc_msg fragm_hdr;
2369         u32 insize = msg_size(inmsg);
2370         u32 dsz = msg_data_sz(inmsg);
2371         unchar *crs = buf->data;
2372         u32 rest = insize;
2373         u32 pack_sz = l_ptr->max_pkt;
2374         u32 fragm_sz = pack_sz - INT_H_SIZE;
2375         u32 fragm_no = 0;
2376         u32 destaddr;
2377
2378         if (msg_short(inmsg))
2379                 destaddr = l_ptr->addr;
2380         else
2381                 destaddr = msg_destnode(inmsg);
2382
2383         /* Prepare reusable fragment header: */
2384         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2385                  INT_H_SIZE, destaddr);
2386
2387         /* Chop up message: */
2388         while (rest > 0) {
2389                 struct sk_buff *fragm;
2390
2391                 if (rest <= fragm_sz) {
2392                         fragm_sz = rest;
2393                         msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2394                 }
2395                 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2396                 if (fragm == NULL) {
2397                         kfree_skb(buf);
2398                         while (buf_chain) {
2399                                 buf = buf_chain;
2400                                 buf_chain = buf_chain->next;
2401                                 kfree_skb(buf);
2402                         }
2403                         return -ENOMEM;
2404                 }
2405                 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2406                 fragm_no++;
2407                 msg_set_fragm_no(&fragm_hdr, fragm_no);
2408                 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2409                 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2410                                                fragm_sz);
2411                 buf_chain_tail->next = fragm;
2412                 buf_chain_tail = fragm;
2413
2414                 rest -= fragm_sz;
2415                 crs += fragm_sz;
2416                 msg_set_type(&fragm_hdr, FRAGMENT);
2417         }
2418         kfree_skb(buf);
2419
2420         /* Append chain of fragments to send queue & send them */
2421         l_ptr->long_msg_seq_no++;
2422         link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
2423         l_ptr->stats.sent_fragments += fragm_no;
2424         l_ptr->stats.sent_fragmented++;
2425         tipc_link_push_queue(l_ptr);
2426
2427         return dsz;
2428 }
2429
2430 /*
2431  * A pending message being re-assembled must store certain values
2432  * to handle subsequent fragments correctly. The following functions
2433  * help storing these values in unused, available fields in the
2434  * pending message. This makes dynamic memory allocation unnecessary.
2435  */
2436 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2437 {
2438         msg_set_seqno(buf_msg(buf), seqno);
2439 }
2440
2441 static u32 get_fragm_size(struct sk_buff *buf)
2442 {
2443         return msg_ack(buf_msg(buf));
2444 }
2445
2446 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2447 {
2448         msg_set_ack(buf_msg(buf), sz);
2449 }
2450
2451 static u32 get_expected_frags(struct sk_buff *buf)
2452 {
2453         return msg_bcast_ack(buf_msg(buf));
2454 }
2455
2456 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2457 {
2458         msg_set_bcast_ack(buf_msg(buf), exp);
2459 }
2460
2461 /*
2462  * tipc_link_recv_fragment(): Called with node lock on. Returns
2463  * the reassembled buffer if message is complete.
2464  */
2465 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2466                             struct tipc_msg **m)
2467 {
2468         struct sk_buff *prev = NULL;
2469         struct sk_buff *fbuf = *fb;
2470         struct tipc_msg *fragm = buf_msg(fbuf);
2471         struct sk_buff *pbuf = *pending;
2472         u32 long_msg_seq_no = msg_long_msgno(fragm);
2473
2474         *fb = NULL;
2475
2476         /* Is there an incomplete message waiting for this fragment? */
2477         while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) ||
2478                         (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2479                 prev = pbuf;
2480                 pbuf = pbuf->next;
2481         }
2482
2483         if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2484                 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2485                 u32 msg_sz = msg_size(imsg);
2486                 u32 fragm_sz = msg_data_sz(fragm);
2487                 u32 exp_fragm_cnt;
2488                 u32 max =  TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;
2489
2490                 if (msg_type(imsg) == TIPC_MCAST_MSG)
2491                         max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2492                 if (fragm_sz == 0 || msg_size(imsg) > max) {
2493                         kfree_skb(fbuf);
2494                         return 0;
2495                 }
2496                 exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
2497                 pbuf = tipc_buf_acquire(msg_size(imsg));
2498                 if (pbuf != NULL) {
2499                         pbuf->next = *pending;
2500                         *pending = pbuf;
2501                         skb_copy_to_linear_data(pbuf, imsg,
2502                                                 msg_data_sz(fragm));
2503                         /*  Prepare buffer for subsequent fragments. */
2504                         set_long_msg_seqno(pbuf, long_msg_seq_no);
2505                         set_fragm_size(pbuf, fragm_sz);
2506                         set_expected_frags(pbuf, exp_fragm_cnt - 1);
2507                 } else {
2508                         pr_debug("Link unable to reassemble fragmented message\n");
2509                         kfree_skb(fbuf);
2510                         return -1;
2511                 }
2512                 kfree_skb(fbuf);
2513                 return 0;
2514         } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2515                 u32 dsz = msg_data_sz(fragm);
2516                 u32 fsz = get_fragm_size(pbuf);
2517                 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2518                 u32 exp_frags = get_expected_frags(pbuf) - 1;
2519                 skb_copy_to_linear_data_offset(pbuf, crs,
2520                                                msg_data(fragm), dsz);
2521                 kfree_skb(fbuf);
2522
2523                 /* Is message complete? */
2524                 if (exp_frags == 0) {
2525                         if (prev)
2526                                 prev->next = pbuf->next;
2527                         else
2528                                 *pending = pbuf->next;
2529                         msg_reset_reroute_cnt(buf_msg(pbuf));
2530                         *fb = pbuf;
2531                         *m = buf_msg(pbuf);
2532                         return 1;
2533                 }
2534                 set_expected_frags(pbuf, exp_frags);
2535                 return 0;
2536         }
2537         kfree_skb(fbuf);
2538         return 0;
2539 }
2540
2541 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
2542 {
2543         if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
2544                 return;
2545
2546         l_ptr->tolerance = tolerance;
2547         l_ptr->continuity_interval =
2548                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2549         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2550 }
2551
2552 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
2553 {
2554         /* Data messages from this node, inclusive FIRST_FRAGM */
2555         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2556         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2557         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2558         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2559         /* Transiting data messages,inclusive FIRST_FRAGM */
2560         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2561         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2562         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2563         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2564         l_ptr->queue_limit[CONN_MANAGER] = 1200;
2565         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2566         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2567         /* FRAGMENT and LAST_FRAGMENT packets */
2568         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2569 }
2570
2571 /**
2572  * link_find_link - locate link by name
2573  * @name: ptr to link name string
2574  * @node: ptr to area to be filled with ptr to associated node
2575  *
2576  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2577  * this also prevents link deletion.
2578  *
2579  * Returns pointer to link (or 0 if invalid link name).
2580  */
2581 static struct tipc_link *link_find_link(const char *name,
2582                                         struct tipc_node **node)
2583 {
2584         struct tipc_link_name link_name_parts;
2585         struct tipc_bearer *b_ptr;
2586         struct tipc_link *l_ptr;
2587
2588         if (!link_name_validate(name, &link_name_parts))
2589                 return NULL;
2590
2591         b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2592         if (!b_ptr)
2593                 return NULL;
2594
2595         *node = tipc_node_find(link_name_parts.addr_peer);
2596         if (!*node)
2597                 return NULL;
2598
2599         l_ptr = (*node)->links[b_ptr->identity];
2600         if (!l_ptr || strcmp(l_ptr->name, name))
2601                 return NULL;
2602
2603         return l_ptr;
2604 }
2605
2606 /**
2607  * link_value_is_valid -- validate proposed link tolerance/priority/window
2608  *
2609  * @cmd: value type (TIPC_CMD_SET_LINK_*)
2610  * @new_value: the new value
2611  *
2612  * Returns 1 if value is within range, 0 if not.
2613  */
2614 static int link_value_is_valid(u16 cmd, u32 new_value)
2615 {
2616         switch (cmd) {
2617         case TIPC_CMD_SET_LINK_TOL:
2618                 return (new_value >= TIPC_MIN_LINK_TOL) &&
2619                         (new_value <= TIPC_MAX_LINK_TOL);
2620         case TIPC_CMD_SET_LINK_PRI:
2621                 return (new_value <= TIPC_MAX_LINK_PRI);
2622         case TIPC_CMD_SET_LINK_WINDOW:
2623                 return (new_value >= TIPC_MIN_LINK_WIN) &&
2624                         (new_value <= TIPC_MAX_LINK_WIN);
2625         }
2626         return 0;
2627 }
2628
2629 /**
2630  * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
2631  * @name: ptr to link, bearer, or media name
2632  * @new_value: new value of link, bearer, or media setting
2633  * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
2634  *
2635  * Caller must hold 'tipc_net_lock' to ensure link/bearer/media is not deleted.
2636  *
2637  * Returns 0 if value updated and negative value on error.
2638  */
2639 static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
2640 {
2641         struct tipc_node *node;
2642         struct tipc_link *l_ptr;
2643         struct tipc_bearer *b_ptr;
2644         struct tipc_media *m_ptr;
2645
2646         l_ptr = link_find_link(name, &node);
2647         if (l_ptr) {
2648                 /*
2649                  * acquire node lock for tipc_link_send_proto_msg().
2650                  * see "TIPC locking policy" in net.c.
2651                  */
2652                 tipc_node_lock(node);
2653                 switch (cmd) {
2654                 case TIPC_CMD_SET_LINK_TOL:
2655                         link_set_supervision_props(l_ptr, new_value);
2656                         tipc_link_send_proto_msg(l_ptr,
2657                                 STATE_MSG, 0, 0, new_value, 0, 0);
2658                         break;
2659                 case TIPC_CMD_SET_LINK_PRI:
2660                         l_ptr->priority = new_value;
2661                         tipc_link_send_proto_msg(l_ptr,
2662                                 STATE_MSG, 0, 0, 0, new_value, 0);
2663                         break;
2664                 case TIPC_CMD_SET_LINK_WINDOW:
2665                         tipc_link_set_queue_limits(l_ptr, new_value);
2666                         break;
2667                 }
2668                 tipc_node_unlock(node);
2669                 return 0;
2670         }
2671
2672         b_ptr = tipc_bearer_find(name);
2673         if (b_ptr) {
2674                 switch (cmd) {
2675                 case TIPC_CMD_SET_LINK_TOL:
2676                         b_ptr->tolerance = new_value;
2677                         return 0;
2678                 case TIPC_CMD_SET_LINK_PRI:
2679                         b_ptr->priority = new_value;
2680                         return 0;
2681                 case TIPC_CMD_SET_LINK_WINDOW:
2682                         b_ptr->window = new_value;
2683                         return 0;
2684                 }
2685                 return -EINVAL;
2686         }
2687
2688         m_ptr = tipc_media_find(name);
2689         if (!m_ptr)
2690                 return -ENODEV;
2691         switch (cmd) {
2692         case TIPC_CMD_SET_LINK_TOL:
2693                 m_ptr->tolerance = new_value;
2694                 return 0;
2695         case TIPC_CMD_SET_LINK_PRI:
2696                 m_ptr->priority = new_value;
2697                 return 0;
2698         case TIPC_CMD_SET_LINK_WINDOW:
2699                 m_ptr->window = new_value;
2700                 return 0;
2701         }
2702         return -EINVAL;
2703 }
2704
2705 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2706                                      u16 cmd)
2707 {
2708         struct tipc_link_config *args;
2709         u32 new_value;
2710         int res;
2711
2712         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2713                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2714
2715         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2716         new_value = ntohl(args->value);
2717
2718         if (!link_value_is_valid(cmd, new_value))
2719                 return tipc_cfg_reply_error_string(
2720                         "cannot change, value invalid");
2721
2722         if (!strcmp(args->name, tipc_bclink_name)) {
2723                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2724                     (tipc_bclink_set_queue_limits(new_value) == 0))
2725                         return tipc_cfg_reply_none();
2726                 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2727                                                    " (cannot change setting on broadcast link)");
2728         }
2729
2730         read_lock_bh(&tipc_net_lock);
2731         res = link_cmd_set_value(args->name, new_value, cmd);
2732         read_unlock_bh(&tipc_net_lock);
2733         if (res)
2734                 return tipc_cfg_reply_error_string("cannot change link setting");
2735
2736         return tipc_cfg_reply_none();
2737 }
2738
2739 /**
2740  * link_reset_statistics - reset link statistics
2741  * @l_ptr: pointer to link
2742  */
2743 static void link_reset_statistics(struct tipc_link *l_ptr)
2744 {
2745         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2746         l_ptr->stats.sent_info = l_ptr->next_out_no;
2747         l_ptr->stats.recv_info = l_ptr->next_in_no;
2748 }
2749
2750 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2751 {
2752         char *link_name;
2753         struct tipc_link *l_ptr;
2754         struct tipc_node *node;
2755
2756         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2757                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2758
2759         link_name = (char *)TLV_DATA(req_tlv_area);
2760         if (!strcmp(link_name, tipc_bclink_name)) {
2761                 if (tipc_bclink_reset_stats())
2762                         return tipc_cfg_reply_error_string("link not found");
2763                 return tipc_cfg_reply_none();
2764         }
2765
2766         read_lock_bh(&tipc_net_lock);
2767         l_ptr = link_find_link(link_name, &node);
2768         if (!l_ptr) {
2769                 read_unlock_bh(&tipc_net_lock);
2770                 return tipc_cfg_reply_error_string("link not found");
2771         }
2772
2773         tipc_node_lock(node);
2774         link_reset_statistics(l_ptr);
2775         tipc_node_unlock(node);
2776         read_unlock_bh(&tipc_net_lock);
2777         return tipc_cfg_reply_none();
2778 }
2779
2780 /**
2781  * percent - convert count to a percentage of total (rounding up or down)
2782  */
2783 static u32 percent(u32 count, u32 total)
2784 {
2785         return (count * 100 + (total / 2)) / total;
2786 }
2787
2788 /**
2789  * tipc_link_stats - print link statistics
2790  * @name: link name
2791  * @buf: print buffer area
2792  * @buf_size: size of print buffer area
2793  *
2794  * Returns length of print buffer data string (or 0 if error)
2795  */
2796 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2797 {
2798         struct tipc_link *l;
2799         struct tipc_stats *s;
2800         struct tipc_node *node;
2801         char *status;
2802         u32 profile_total = 0;
2803         int ret;
2804
2805         if (!strcmp(name, tipc_bclink_name))
2806                 return tipc_bclink_stats(buf, buf_size);
2807
2808         read_lock_bh(&tipc_net_lock);
2809         l = link_find_link(name, &node);
2810         if (!l) {
2811                 read_unlock_bh(&tipc_net_lock);
2812                 return 0;
2813         }
2814         tipc_node_lock(node);
2815         s = &l->stats;
2816
2817         if (tipc_link_is_active(l))
2818                 status = "ACTIVE";
2819         else if (tipc_link_is_up(l))
2820                 status = "STANDBY";
2821         else
2822                 status = "DEFUNCT";
2823
2824         ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
2825                             "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2826                             "  Window:%u packets\n",
2827                             l->name, status, l->max_pkt, l->priority,
2828                             l->tolerance, l->queue_limit[0]);
2829
2830         ret += tipc_snprintf(buf + ret, buf_size - ret,
2831                              "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2832                              l->next_in_no - s->recv_info, s->recv_fragments,
2833                              s->recv_fragmented, s->recv_bundles,
2834                              s->recv_bundled);
2835
2836         ret += tipc_snprintf(buf + ret, buf_size - ret,
2837                              "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2838                              l->next_out_no - s->sent_info, s->sent_fragments,
2839                              s->sent_fragmented, s->sent_bundles,
2840                              s->sent_bundled);
2841
2842         profile_total = s->msg_length_counts;
2843         if (!profile_total)
2844                 profile_total = 1;
2845
2846         ret += tipc_snprintf(buf + ret, buf_size - ret,
2847                              "  TX profile sample:%u packets  average:%u octets\n"
2848                              "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2849                              "-16384:%u%% -32768:%u%% -66000:%u%%\n",
2850                              s->msg_length_counts,
2851                              s->msg_lengths_total / profile_total,
2852                              percent(s->msg_length_profile[0], profile_total),
2853                              percent(s->msg_length_profile[1], profile_total),
2854                              percent(s->msg_length_profile[2], profile_total),
2855                              percent(s->msg_length_profile[3], profile_total),
2856                              percent(s->msg_length_profile[4], profile_total),
2857                              percent(s->msg_length_profile[5], profile_total),
2858                              percent(s->msg_length_profile[6], profile_total));
2859
2860         ret += tipc_snprintf(buf + ret, buf_size - ret,
2861                              "  RX states:%u probes:%u naks:%u defs:%u"
2862                              " dups:%u\n", s->recv_states, s->recv_probes,
2863                              s->recv_nacks, s->deferred_recv, s->duplicates);
2864
2865         ret += tipc_snprintf(buf + ret, buf_size - ret,
2866                              "  TX states:%u probes:%u naks:%u acks:%u"
2867                              " dups:%u\n", s->sent_states, s->sent_probes,
2868                              s->sent_nacks, s->sent_acks, s->retransmitted);
2869
2870         ret += tipc_snprintf(buf + ret, buf_size - ret,
2871                              "  Congestion link:%u  Send queue"
2872                              " max:%u avg:%u\n", s->link_congs,
2873                              s->max_queue_sz, s->queue_sz_counts ?
2874                              (s->accu_queue_sz / s->queue_sz_counts) : 0);
2875
2876         tipc_node_unlock(node);
2877         read_unlock_bh(&tipc_net_lock);
2878         return ret;
2879 }
2880
2881 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2882 {
2883         struct sk_buff *buf;
2884         struct tlv_desc *rep_tlv;
2885         int str_len;
2886         int pb_len;
2887         char *pb;
2888
2889         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2890                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2891
2892         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2893         if (!buf)
2894                 return NULL;
2895
2896         rep_tlv = (struct tlv_desc *)buf->data;
2897         pb = TLV_DATA(rep_tlv);
2898         pb_len = ULTRA_STRING_MAX_LEN;
2899         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
2900                                   pb, pb_len);
2901         if (!str_len) {
2902                 kfree_skb(buf);
2903                 return tipc_cfg_reply_error_string("link not found");
2904         }
2905         str_len += 1;   /* for "\0" */
2906         skb_put(buf, TLV_SPACE(str_len));
2907         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2908
2909         return buf;
2910 }
2911
2912 /**
2913  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
2914  * @dest: network address of destination node
2915  * @selector: used to select from set of active links
2916  *
2917  * If no active link can be found, uses default maximum packet size.
2918  */
2919 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
2920 {
2921         struct tipc_node *n_ptr;
2922         struct tipc_link *l_ptr;
2923         u32 res = MAX_PKT_DEFAULT;
2924
2925         if (dest == tipc_own_addr)
2926                 return MAX_MSG_SIZE;
2927
2928         read_lock_bh(&tipc_net_lock);
2929         n_ptr = tipc_node_find(dest);
2930         if (n_ptr) {
2931                 tipc_node_lock(n_ptr);
2932                 l_ptr = n_ptr->active_links[selector & 1];
2933                 if (l_ptr)
2934                         res = l_ptr->max_pkt;
2935                 tipc_node_unlock(n_ptr);
2936         }
2937         read_unlock_bh(&tipc_net_lock);
2938         return res;
2939 }
2940
2941 static void link_print(struct tipc_link *l_ptr, const char *str)
2942 {
2943         pr_info("%s Link %x<%s>:", str, l_ptr->addr, l_ptr->b_ptr->name);
2944
2945         if (link_working_unknown(l_ptr))
2946                 pr_cont(":WU\n");
2947         else if (link_reset_reset(l_ptr))
2948                 pr_cont(":RR\n");
2949         else if (link_reset_unknown(l_ptr))
2950                 pr_cont(":RU\n");
2951         else if (link_working_working(l_ptr))
2952                 pr_cont(":WW\n");
2953         else
2954                 pr_cont("\n");
2955 }