X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=net%2Ftipc%2Flink.c;h=9efbdbde2b0863542a08c91c136fadc9aa0cc6d8;hb=27eb427bdc0960ad64b72da03e3596c801e7a9e9;hp=75db07c78a6900157c0b568491a5d4e8e694e274;hpb=84a73014d86fd660822a20c032625e3afe99ca58;p=firefly-linux-kernel-4.4.55.git diff --git a/net/tipc/link.c b/net/tipc/link.c index 75db07c78a69..9efbdbde2b08 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -50,6 +50,7 @@ */ static const char *link_co_err = "Link tunneling error, "; static const char *link_rst_msg = "Resetting link "; +static const char tipc_bclink_name[] = "broadcast-link"; static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, @@ -75,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = { [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } }; +/* Send states for broadcast NACKs + */ +enum { + BC_NACK_SND_CONDITIONAL, + BC_NACK_SND_UNCONDITIONAL, + BC_NACK_SND_SUPPRESS, +}; + /* * Interval between NACKs when packets arrive out of order */ @@ -110,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, struct sk_buff_head *xmitq); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); +static void tipc_link_build_nack_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +static void tipc_link_build_bc_init_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +static bool tipc_link_release_pkts(struct tipc_link *l, u16 to); /* * Simple non-static link routines (i.e. referenced outside this file) @@ -120,11 +133,21 @@ bool tipc_link_is_up(struct tipc_link *l) return link_is_up(l); } +bool tipc_link_peer_is_down(struct tipc_link *l) +{ + return l->state == LINK_PEER_RESET; +} + bool tipc_link_is_reset(struct tipc_link *l) { return l->state & (LINK_RESET | LINK_FAILINGOVER | LINK_ESTABLISHING); } +bool tipc_link_is_establishing(struct tipc_link *l) +{ + return l->state == LINK_ESTABLISHING; +} + bool tipc_link_is_synching(struct tipc_link *l) { return l->state == LINK_SYNCHING; @@ -140,11 +163,66 @@ bool tipc_link_is_blocked(struct tipc_link *l) return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); } +static bool link_is_bc_sndlink(struct tipc_link *l) +{ + return !l->bc_sndlink; +} + +static bool link_is_bc_rcvlink(struct tipc_link *l) +{ + return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l)); +} + int tipc_link_is_active(struct tipc_link *l) { - struct tipc_node *n = l->owner; + return l->active; +} + +void tipc_link_set_active(struct tipc_link *l, bool active) +{ + l->active = active; +} + +void tipc_link_add_bc_peer(struct tipc_link *snd_l, + struct tipc_link *uc_l, + struct sk_buff_head *xmitq) +{ + struct tipc_link *rcv_l = uc_l->bc_rcvlink; + + snd_l->ackers++; + rcv_l->acked = snd_l->snd_nxt - 1; + tipc_link_build_bc_init_msg(uc_l, xmitq); +} + +void tipc_link_remove_bc_peer(struct tipc_link *snd_l, + struct tipc_link *rcv_l, + struct sk_buff_head *xmitq) +{ + u16 ack = snd_l->snd_nxt - 1; + + snd_l->ackers--; + tipc_link_bc_ack_rcv(rcv_l, ack, xmitq); + tipc_link_reset(rcv_l); + rcv_l->state = LINK_RESET; + if (!snd_l->ackers) { + tipc_link_reset(snd_l); + __skb_queue_purge(xmitq); + } +} + +int tipc_link_bc_peers(struct tipc_link *l) +{ + return l->ackers; +} + +void tipc_link_set_mtu(struct tipc_link *l, int mtu) +{ + l->mtu = mtu; +} - return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); +int tipc_link_mtu(struct tipc_link *l) +{ + return l->mtu; } static u32 link_own_addr(struct tipc_link *l) @@ -155,57 +233,72 @@ static u32 link_own_addr(struct tipc_link *l) /** * tipc_link_create - create a new link * @n: pointer to associated node - * @b: pointer to associated bearer + * @if_name: associated interface name + * @bearer_id: id (index) of associated bearer + * @tolerance: link tolerance to be used by link + * @net_plane: network plane (A,B,c..) this link belongs to + * @mtu: mtu to be advertised by link + * @priority: priority to be used by link + * @window: send window to be used by link + * @session: session to be used by link * @ownnode: identity of own node - * @peer: identity of peer node - * @maddr: media address to be used + * @peer: node id of peer node + * @peer_caps: bitmap describing peer node capabilities + * @bc_sndlink: the namespace global link used for broadcast sending + * @bc_rcvlink: the peer specific link used for broadcast reception * @inputq: queue to put messages ready for delivery * @namedq: queue to put binding table update messages ready for delivery * @link: return value, pointer to put the created link * * Returns true if link was created, otherwise false */ -bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, - u32 ownnode, u32 peer, struct tipc_media_addr *maddr, - struct sk_buff_head *inputq, struct sk_buff_head *namedq, +bool tipc_link_create(struct net *net, char *if_name, int bearer_id, + int tolerance, char net_plane, u32 mtu, int priority, + int window, u32 session, u32 ownnode, u32 peer, + u16 peer_caps, + struct tipc_link *bc_sndlink, + struct tipc_link *bc_rcvlink, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq, struct tipc_link **link) { struct tipc_link *l; struct tipc_msg *hdr; - char *if_name; l = kzalloc(sizeof(*l), GFP_ATOMIC); if (!l) return false; *link = l; + l->pmsg = (struct tipc_msg *)&l->proto_msg; + hdr = l->pmsg; + tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer); + msg_set_size(hdr, sizeof(l->proto_msg)); + msg_set_session(hdr, session); + msg_set_bearer_id(hdr, l->bearer_id); /* Note: peer i/f name is completed by reset/activate message */ - if_name = strchr(b->name, ':') + 1; sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); + strcpy((char *)msg_data(hdr), if_name); l->addr = peer; - l->media_addr = maddr; - l->owner = n; + l->peer_caps = peer_caps; + l->net = net; l->peer_session = WILDCARD_SESSION; - l->bearer_id = b->identity; - l->tolerance = b->tolerance; - l->net_plane = b->net_plane; - l->advertised_mtu = b->mtu; - l->mtu = b->mtu; - l->priority = b->priority; - tipc_link_set_queue_limits(l, b->window); + l->bearer_id = bearer_id; + l->tolerance = tolerance; + l->net_plane = net_plane; + l->advertised_mtu = mtu; + l->mtu = mtu; + l->priority = priority; + tipc_link_set_queue_limits(l, window); + l->ackers = 1; + l->bc_sndlink = bc_sndlink; + l->bc_rcvlink = bc_rcvlink; l->inputq = inputq; l->namedq = namedq; l->state = LINK_RESETTING; - l->pmsg = (struct tipc_msg *)&l->proto_msg; - hdr = l->pmsg; - tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer); - msg_set_size(hdr, sizeof(l->proto_msg)); - msg_set_session(hdr, session); - msg_set_bearer_id(hdr, l->bearer_id); - strcpy((char *)msg_data(hdr), if_name); __skb_queue_head_init(&l->transmq); __skb_queue_head_init(&l->backlogq); __skb_queue_head_init(&l->deferdq); @@ -214,27 +307,43 @@ bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, return true; } -/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. +/** + * tipc_link_bc_create - create new link to be used for broadcast + * @n: pointer to associated node + * @mtu: mtu to be used + * @window: send window to be used + * @inputq: queue to put messages ready for delivery + * @namedq: queue to put binding table update messages ready for delivery + * @link: return value, pointer to put the created link * - * Give a newly added peer node the sequence number where it should - * start receiving and acking broadcast packets. + * Returns true if link was created, otherwise false */ -void tipc_link_build_bcast_sync_msg(struct tipc_link *l, - struct sk_buff_head *xmitq) +bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, + int mtu, int window, u16 peer_caps, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq, + struct tipc_link *bc_sndlink, + struct tipc_link **link) { - struct sk_buff *skb; - struct sk_buff_head list; - u16 last_sent; + struct tipc_link *l; - skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, - 0, l->addr, link_own_addr(l), 0, 0, 0); - if (!skb) - return; - last_sent = tipc_bclink_get_last_sent(l->owner->net); - msg_set_last_bcast(buf_msg(skb), last_sent); - __skb_queue_head_init(&list); - __skb_queue_tail(&list, skb); - tipc_link_xmit(l, &list, xmitq); + if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, + 0, ownnode, peer, peer_caps, bc_sndlink, + NULL, inputq, namedq, link)) + return false; + + l = *link; + strcpy(l->name, tipc_bclink_name); + tipc_link_reset(l); + l->state = LINK_RESET; + l->ackers = 0; + l->bc_rcvlink = l; + + /* Broadcast send link is always up */ + if (link_is_bc_sndlink(l)) + l->state = LINK_ESTABLISHED; + + return true; } /** @@ -321,14 +430,15 @@ int tipc_link_fsm_evt(struct tipc_link *l, int evt) switch (evt) { case LINK_ESTABLISH_EVT: l->state = LINK_ESTABLISHED; - rc |= TIPC_LINK_UP_EVT; break; case LINK_FAILOVER_BEGIN_EVT: l->state = LINK_FAILINGOVER; break; - case LINK_PEER_RESET_EVT: case LINK_RESET_EVT: + l->state = LINK_RESET; + break; case LINK_FAILURE_EVT: + case LINK_PEER_RESET_EVT: case LINK_SYNCH_BEGIN_EVT: case LINK_FAILOVER_END_EVT: break; @@ -438,6 +548,8 @@ static void link_profile_stats(struct tipc_link *l) l->stats.msg_length_profile[6]++; } +/* tipc_link_timeout - perform periodic task as instructed from node timeout + */ /* tipc_link_timeout - perform periodic task as instructed from node timeout */ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) @@ -446,6 +558,9 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) int mtyp = STATE_MSG; bool xmit = false; bool prb = false; + u16 bc_snt = l->bc_sndlink->snd_nxt - 1; + u16 bc_acked = l->bc_rcvlink->acked; + bool bc_up = link_is_up(l->bc_rcvlink); link_profile_stats(l); @@ -453,7 +568,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) case LINK_ESTABLISHED: case LINK_SYNCHING: if (!l->silent_intv_cnt) { - if (tipc_bclink_acks_missing(l->owner)) + if (bc_up && (bc_acked != bc_snt)) xmit = true; } else if (l->silent_intv_cnt <= l->abort_limit) { xmit = true; @@ -544,42 +659,8 @@ void link_prepare_wakeup(struct tipc_link *l) } } -/** - * tipc_link_reset_fragments - purge link's inbound message fragments queue - * @l_ptr: pointer to link - */ -void tipc_link_reset_fragments(struct tipc_link *l_ptr) -{ - kfree_skb(l_ptr->reasm_buf); - l_ptr->reasm_buf = NULL; -} - -void tipc_link_purge_backlog(struct tipc_link *l) -{ - __skb_queue_purge(&l->backlogq); - l->backlog[TIPC_LOW_IMPORTANCE].len = 0; - l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; - l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; - l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; - l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; -} - -/** - * tipc_link_purge_queues - purge all pkt queues associated with link - * @l_ptr: pointer to link - */ -void tipc_link_purge_queues(struct tipc_link *l_ptr) -{ - __skb_queue_purge(&l_ptr->deferdq); - __skb_queue_purge(&l_ptr->transmq); - tipc_link_purge_backlog(l_ptr); - tipc_link_reset_fragments(l_ptr); -} - void tipc_link_reset(struct tipc_link *l) { - tipc_link_fsm_evt(l, LINK_RESET_EVT); - /* Link is down, accept any session */ l->peer_session = WILDCARD_SESSION; @@ -589,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l) /* Prepare for renewed mtu size negotiation */ l->mtu = l->advertised_mtu; - /* Clean up all queues: */ + /* Clean up all queues and counters: */ __skb_queue_purge(&l->transmq); __skb_queue_purge(&l->deferdq); skb_queue_splice_init(&l->wakeupq, l->inputq); - - tipc_link_purge_backlog(l); + __skb_queue_purge(&l->backlogq); + l->backlog[TIPC_LOW_IMPORTANCE].len = 0; + l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; + l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; + l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; + l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; kfree_skb(l->reasm_buf); kfree_skb(l->failover_reasm_skb); l->reasm_buf = NULL; @@ -602,80 +687,14 @@ void tipc_link_reset(struct tipc_link *l) l->rcv_unacked = 0; l->snd_nxt = 1; l->rcv_nxt = 1; + l->acked = 0; l->silent_intv_cnt = 0; l->stats.recv_info = 0; l->stale_count = 0; + l->bc_peer_is_up = false; link_reset_statistics(l); } -/** - * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked - * @link: link to use - * @list: chain of buffers containing message - * - * Consumes the buffer chain, except when returning an error code, - * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS - * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted - */ -int __tipc_link_xmit(struct net *net, struct tipc_link *link, - struct sk_buff_head *list) -{ - struct tipc_msg *msg = buf_msg(skb_peek(list)); - unsigned int maxwin = link->window; - unsigned int i, imp = msg_importance(msg); - uint mtu = link->mtu; - u16 ack = mod(link->rcv_nxt - 1); - u16 seqno = link->snd_nxt; - u16 bc_last_in = link->owner->bclink.last_in; - struct tipc_media_addr *addr = link->media_addr; - struct sk_buff_head *transmq = &link->transmq; - struct sk_buff_head *backlogq = &link->backlogq; - struct sk_buff *skb, *bskb; - - /* Match msg importance against this and all higher backlog limits: */ - for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { - if (unlikely(link->backlog[i].len >= link->backlog[i].limit)) - return link_schedule_user(link, list); - } - if (unlikely(msg_size(msg) > mtu)) - return -EMSGSIZE; - - /* Prepare each packet for sending, and add to relevant queue: */ - while (skb_queue_len(list)) { - skb = skb_peek(list); - msg = buf_msg(skb); - msg_set_seqno(msg, seqno); - msg_set_ack(msg, ack); - msg_set_bcast_ack(msg, bc_last_in); - - if (likely(skb_queue_len(transmq) < maxwin)) { - __skb_dequeue(list); - __skb_queue_tail(transmq, skb); - tipc_bearer_send(net, link->bearer_id, skb, addr); - link->rcv_unacked = 0; - seqno++; - continue; - } - if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) { - kfree_skb(__skb_dequeue(list)); - link->stats.sent_bundled++; - continue; - } - if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) { - kfree_skb(__skb_dequeue(list)); - __skb_queue_tail(backlogq, bskb); - link->backlog[msg_importance(buf_msg(bskb))].len++; - link->stats.sent_bundled++; - link->stats.sent_bundles++; - continue; - } - link->backlog[imp].len += skb_queue_len(list); - skb_queue_splice_tail_init(list, backlogq); - } - link->snd_nxt = seqno; - return 0; -} - /** * tipc_link_xmit(): enqueue buffer list according to queue situation * @link: link to use @@ -696,7 +715,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, unsigned int mtu = l->mtu; u16 ack = l->rcv_nxt - 1; u16 seqno = l->snd_nxt; - u16 bc_last_in = l->owner->bclink.last_in; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; struct sk_buff_head *transmq = &l->transmq; struct sk_buff_head *backlogq = &l->backlogq; struct sk_buff *skb, *_skb, *bskb; @@ -715,7 +734,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, hdr = buf_msg(skb); msg_set_seqno(hdr, seqno); msg_set_ack(hdr, ack); - msg_set_bcast_ack(hdr, bc_last_in); + msg_set_bcast_ack(hdr, bc_ack); if (likely(skb_queue_len(transmq) < maxwin)) { _skb = skb_clone(skb, GFP_ATOMIC); @@ -724,6 +743,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, __skb_dequeue(list); __skb_queue_tail(transmq, skb); __skb_queue_tail(xmitq, _skb); + TIPC_SKB_CB(skb)->ackers = l->ackers; l->rcv_unacked = 0; seqno++; continue; @@ -748,62 +768,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return 0; } -/* - * tipc_link_sync_rcv - synchronize broadcast link endpoints. - * Receive the sequence number where we should start receiving and - * acking broadcast packets from a newly added peer node, and open - * up for reception of such packets. - * - * Called with node locked - */ -static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - - n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg); - n->bclink.recv_permitted = true; - kfree_skb(buf); -} - -/* - * tipc_link_push_packets - push unsent packets to bearer - * - * Push out the unsent messages of a link where congestion - * has abated. Node is locked. - * - * Called with node locked - */ -void tipc_link_push_packets(struct tipc_link *link) -{ - struct sk_buff *skb; - struct tipc_msg *msg; - u16 seqno = link->snd_nxt; - u16 ack = mod(link->rcv_nxt - 1); - - while (skb_queue_len(&link->transmq) < link->window) { - skb = __skb_dequeue(&link->backlogq); - if (!skb) - break; - msg = buf_msg(skb); - link->backlog[msg_importance(msg)].len--; - msg_set_ack(msg, ack); - msg_set_seqno(msg, seqno); - seqno = mod(seqno + 1); - msg_set_bcast_ack(msg, link->owner->bclink.last_in); - link->rcv_unacked = 0; - __skb_queue_tail(&link->transmq, skb); - tipc_bearer_send(link->owner->net, link->bearer_id, - skb, link->media_addr); - } - link->snd_nxt = seqno; -} - void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) { struct sk_buff *skb, *_skb; struct tipc_msg *hdr; u16 seqno = l->snd_nxt; u16 ack = l->rcv_nxt - 1; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; while (skb_queue_len(&l->transmq) < l->window) { skb = skb_peek(&l->backlogq); @@ -817,96 +788,35 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) l->backlog[msg_importance(hdr)].len--; __skb_queue_tail(&l->transmq, skb); __skb_queue_tail(xmitq, _skb); - msg_set_ack(hdr, ack); + TIPC_SKB_CB(skb)->ackers = l->ackers; msg_set_seqno(hdr, seqno); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); l->rcv_unacked = 0; seqno++; } l->snd_nxt = seqno; } -static void link_retransmit_failure(struct tipc_link *l_ptr, - struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - struct net *net = l_ptr->owner->net; - - pr_warn("Retransmission failure on link <%s>\n", l_ptr->name); - - if (l_ptr->addr) { - /* Handle failure on standard link */ - link_print(l_ptr, "Resetting link "); - pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", - msg_user(msg), msg_type(msg), msg_size(msg), - msg_errcode(msg)); - pr_info("sqno %u, prev: %x, src: %x\n", - msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg)); - } else { - /* Handle failure on broadcast link */ - struct tipc_node *n_ptr; - char addr_string[16]; - - pr_info("Msg seq number: %u, ", msg_seqno(msg)); - pr_cont("Outstanding acks: %lu\n", - (unsigned long) TIPC_SKB_CB(buf)->handle); - - n_ptr = tipc_bclink_retransmit_to(net); - - tipc_addr_string_fill(addr_string, n_ptr->addr); - pr_info("Broadcast link info for %s\n", addr_string); - pr_info("Reception permitted: %d, Acked: %u\n", - n_ptr->bclink.recv_permitted, - n_ptr->bclink.acked); - pr_info("Last in: %u, Oos state: %u, Last sent: %u\n", - n_ptr->bclink.last_in, - n_ptr->bclink.oos_state, - n_ptr->bclink.last_sent); - - n_ptr->action_flags |= TIPC_BCAST_RESET; - l_ptr->stale_count = 0; - } -} - -void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, - u32 retransmits) +static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb) { - struct tipc_msg *msg; - - if (!skb) - return; - - msg = buf_msg(skb); - - /* Detect repeated retransmit failures */ - if (l_ptr->last_retransm == msg_seqno(msg)) { - if (++l_ptr->stale_count > 100) { - link_retransmit_failure(l_ptr, skb); - return; - } - } else { - l_ptr->last_retransm = msg_seqno(msg); - l_ptr->stale_count = 1; - } + struct tipc_msg *hdr = buf_msg(skb); - skb_queue_walk_from(&l_ptr->transmq, skb) { - if (!retransmits) - break; - msg = buf_msg(skb); - msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb, - l_ptr->media_addr); - retransmits--; - l_ptr->stats.retransmitted++; - } + pr_warn("Retransmission failure on link <%s>\n", l->name); + link_print(l, "Resetting link "); + pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", + msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr)); + pr_info("sqno %u, prev: %x, src: %x\n", + msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr)); } -static int tipc_link_retransm(struct tipc_link *l, int retransm, - struct sk_buff_head *xmitq) +int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, + struct sk_buff_head *xmitq) { struct sk_buff *_skb, *skb = skb_peek(&l->transmq); struct tipc_msg *hdr; + u16 ack = l->rcv_nxt - 1; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; if (!skb) return 0; @@ -919,19 +829,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, link_retransmit_failure(l, skb); return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); } + + /* Move forward to where retransmission should start */ skb_queue_walk(&l->transmq, skb) { - if (!retransm) - return 0; + if (!less(buf_seqno(skb), from)) + break; + } + + skb_queue_walk_from(&l->transmq, skb) { + if (more(buf_seqno(skb), to)) + break; hdr = buf_msg(skb); _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); if (!_skb) return 0; hdr = buf_msg(_skb); - msg_set_ack(hdr, l->rcv_nxt - 1); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); - retransm--; l->stats.retransmitted++; } return 0; @@ -942,22 +858,20 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, * Consumes buffer if message is of right type * Node lock must be held */ -static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, +static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - struct tipc_node *node = link->owner; - switch (msg_user(buf_msg(skb))) { case TIPC_LOW_IMPORTANCE: case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: case CONN_MANAGER: - __skb_queue_tail(inputq, skb); + skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: - node->bclink.recv_permitted = true; - skb_queue_tail(link->namedq, skb); + l->bc_rcvlink->state = LINK_ESTABLISHED; + skb_queue_tail(l->namedq, skb); return true; case MSG_BUNDLER: case TUNNEL_PROTOCOL: @@ -978,10 +892,10 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - struct tipc_node *node = l->owner; struct tipc_msg *hdr = buf_msg(skb); struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; + struct sk_buff_head tmpq; int usr = msg_user(hdr); int rc = 0; int pos = 0; @@ -1006,23 +920,27 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, } if (usr == MSG_BUNDLER) { + skb_queue_head_init(&tmpq); l->stats.recv_bundles++; l->stats.recv_bundled += msg_msgcnt(hdr); while (tipc_msg_extract(skb, &iskb, &pos)) - tipc_data_input(l, iskb, inputq); + tipc_data_input(l, iskb, &tmpq); + tipc_skb_queue_splice_tail(&tmpq, inputq); return 0; } else if (usr == MSG_FRAGMENTER) { l->stats.recv_fragments++; if (tipc_buf_append(reasm_skb, &skb)) { l->stats.recv_fragmented++; tipc_data_input(l, skb, inputq); - } else if (!*reasm_skb) { + } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) { + pr_warn_ratelimited("Unable to build fragment list\n"); return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); } return 0; } else if (usr == BCAST_PROTOCOL) { - tipc_link_sync_rcv(node, skb); - return 0; + tipc_bcast_lock(l->net); + tipc_link_bc_init_rcv(l->bc_rcvlink, hdr); + tipc_bcast_unlock(l->net); } drop: kfree_skb(skb); @@ -1044,49 +962,95 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) return released; } +/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission + * + * Note that sending of broadcast ack is coordinated among nodes, to reduce + * risk of ack storms towards the sender + */ +int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) +{ + if (!l) + return 0; + + /* Broadcast ACK must be sent via a unicast link => defer to caller */ + if (link_is_bc_rcvlink(l)) { + if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf) + return 0; + l->rcv_unacked = 0; + return TIPC_LINK_SND_BC_ACK; + } + + /* Unicast ACK */ + l->rcv_unacked = 0; + l->stats.sent_acks++; + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); + return 0; +} + +/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message + */ +void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq) +{ + int mtyp = RESET_MSG; + + if (l->state == LINK_ESTABLISHING) + mtyp = ACTIVATE_MSG; + + tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq); +} + +/* tipc_link_build_nack_msg: prepare link nack message for transmission + */ +static void tipc_link_build_nack_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) +{ + u32 def_cnt = ++l->stats.deferred_recv; + + if (link_is_bc_rcvlink(l)) + return; + + if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); +} + /* tipc_link_rcv - process TIPC packets/messages arriving from off-node - * @link: the link that should handle the message + * @l: the link that should handle the message * @skb: TIPC packet * @xmitq: queue to place packets to be sent after this call */ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) { - struct sk_buff_head *arrvq = &l->deferdq; - struct sk_buff_head tmpq; + struct sk_buff_head *defq = &l->deferdq; struct tipc_msg *hdr; - u16 seqno, rcv_nxt; + u16 seqno, rcv_nxt, win_lim; int rc = 0; - __skb_queue_head_init(&tmpq); - - if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { - if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) - tipc_link_build_proto_msg(l, STATE_MSG, 0, - 0, 0, 0, xmitq); - return rc; - } - - while ((skb = skb_peek(arrvq))) { + do { hdr = buf_msg(skb); + seqno = msg_seqno(hdr); + rcv_nxt = l->rcv_nxt; + win_lim = rcv_nxt + TIPC_MAX_LINK_WIN; /* Verify and update link state */ - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { - __skb_dequeue(arrvq); - rc = tipc_link_proto_rcv(l, skb, xmitq); - continue; - } + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) + return tipc_link_proto_rcv(l, skb, xmitq); if (unlikely(!link_is_up(l))) { - rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - if (!link_is_up(l)) { - kfree_skb(__skb_dequeue(arrvq)); - goto exit; - } + if (l->state == LINK_ESTABLISHING) + rc = TIPC_LINK_UP_EVT; + goto drop; } + /* Don't send probe at next timeout expiration */ l->silent_intv_cnt = 0; + /* Drop if outside receive window */ + if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) { + l->stats.duplicates++; + goto drop; + } + /* Forward queues and wake up waiting users */ if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) { tipc_link_advance_backlog(l, xmitq); @@ -1094,79 +1058,28 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, link_prepare_wakeup(l); } - /* Defer reception if there is a gap in the sequence */ - seqno = msg_seqno(hdr); - rcv_nxt = l->rcv_nxt; - if (unlikely(less(rcv_nxt, seqno))) { - l->stats.deferred_recv++; - goto exit; - } - - __skb_dequeue(arrvq); - - /* Drop if packet already received */ - if (unlikely(more(rcv_nxt, seqno))) { - l->stats.duplicates++; - kfree_skb(skb); - goto exit; + /* Defer delivery if sequence gap */ + if (unlikely(seqno != rcv_nxt)) { + __tipc_skb_queue_sorted(defq, seqno, skb); + tipc_link_build_nack_msg(l, xmitq); + break; } - /* Packet can be delivered */ + /* Deliver packet */ l->rcv_nxt++; l->stats.recv_info++; - if (unlikely(!tipc_data_input(l, skb, &tmpq))) - rc = tipc_link_input(l, skb, &tmpq); - - /* Ack at regular intervals */ - if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { - l->rcv_unacked = 0; - l->stats.sent_acks++; - tipc_link_build_proto_msg(l, STATE_MSG, - 0, 0, 0, 0, xmitq); - } - } -exit: - tipc_skb_queue_splice_tail(&tmpq, l->inputq); - return rc; -} - -/** - * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue - * - * Returns increase in queue length (i.e. 0 or 1) - */ -u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb) -{ - struct sk_buff *skb1; - u16 seq_no = buf_seqno(skb); - - /* Empty queue ? */ - if (skb_queue_empty(list)) { - __skb_queue_tail(list, skb); - return 1; - } - - /* Last ? */ - if (less(buf_seqno(skb_peek_tail(list)), seq_no)) { - __skb_queue_tail(list, skb); - return 1; - } - - /* Locate insertion point in queue, then insert; discard if duplicate */ - skb_queue_walk(list, skb1) { - u16 curr_seqno = buf_seqno(skb1); - - if (seq_no == curr_seqno) { - kfree_skb(skb); - return 0; - } - - if (less(seq_no, curr_seqno)) + if (!tipc_data_input(l, skb, l->inputq)) + rc |= tipc_link_input(l, skb, l->inputq); + if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) + rc |= tipc_link_build_ack_msg(l, xmitq); + if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK)) break; - } + } while ((skb = __skb_dequeue(defq))); - __skb_queue_before(list, skb1, skb); - return 1; + return rc; +drop: + kfree_skb(skb); + return rc; } /* @@ -1184,23 +1097,17 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, skb = __skb_dequeue(&xmitq); if (!skb) return; - tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); + tipc_bearer_xmit_skb(l->net, l->bearer_id, skb, l->media_addr); l->rcv_unacked = 0; - kfree_skb(skb); } -/* tipc_link_build_proto_msg: prepare link protocol message for transmission - */ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, u16 rcvgap, int tolerance, int priority, struct sk_buff_head *xmitq) { struct sk_buff *skb = NULL; struct tipc_msg *hdr = l->pmsg; - u16 snd_nxt = l->snd_nxt; - u16 rcv_nxt = l->rcv_nxt; - u16 rcv_last = rcv_nxt - 1; - int node_up = l->owner->bclink.recv_permitted; + bool node_up = link_is_up(l->bc_rcvlink); /* Don't send protocol message during reset or link failover */ if (tipc_link_is_blocked(l)) @@ -1208,33 +1115,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, msg_set_type(hdr, mtyp); msg_set_net_plane(hdr, l->net_plane); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); - msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); + msg_set_next_sent(hdr, l->snd_nxt); + msg_set_ack(hdr, l->rcv_nxt - 1); + msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1); + msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1); msg_set_link_tolerance(hdr, tolerance); msg_set_linkprio(hdr, priority); msg_set_redundant_link(hdr, node_up); msg_set_seq_gap(hdr, 0); /* Compatibility: created msg must not be in sequence with pkt flow */ - msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); + msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2); if (mtyp == STATE_MSG) { if (!tipc_link_is_up(l)) return; - msg_set_next_sent(hdr, snd_nxt); /* Override rcvgap if there are packets in deferred queue */ if (!skb_queue_empty(&l->deferdq)) - rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; + rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt; if (rcvgap) { msg_set_seq_gap(hdr, rcvgap); l->stats.sent_nacks++; } - msg_set_ack(hdr, rcv_last); msg_set_probe(hdr, probe); if (probe) l->stats.sent_probes++; l->stats.sent_states++; + l->rcv_unacked = 0; } else { /* RESET_MSG or ACTIVATE_MSG */ msg_set_max_pkt(hdr, l->advertised_mtu); @@ -1250,7 +1158,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, } /* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets - * with contents of the link's tranmsit and backlog queues. + * with contents of the link's transmit and backlog queues. */ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, int mtyp, struct sk_buff_head *xmitq) @@ -1326,21 +1234,23 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, { struct tipc_msg *hdr = buf_msg(skb); u16 rcvgap = 0; - u16 nacked_gap = msg_seq_gap(hdr); + u16 ack = msg_ack(hdr); + u16 gap = msg_seq_gap(hdr); u16 peers_snd_nxt = msg_next_sent(hdr); u16 peers_tol = msg_link_tolerance(hdr); u16 peers_prio = msg_linkprio(hdr); u16 rcv_nxt = l->rcv_nxt; + int mtyp = msg_type(hdr); char *if_name; int rc = 0; - if (tipc_link_is_blocked(l)) + if (tipc_link_is_blocked(l) || !xmitq) goto exit; if (link_own_addr(l) > msg_prevnode(hdr)) l->net_plane = msg_net_plane(hdr); - switch (msg_type(hdr)) { + switch (mtyp) { case RESET_MSG: /* Ignore duplicate RESET with old session number */ @@ -1367,12 +1277,14 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI)) l->priority = peers_prio; - if (msg_type(hdr) == RESET_MSG) { - rc |= tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); - } else if (!link_is_up(l)) { - tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); - rc |= tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - } + /* ACTIVATE_MSG serves as PEER_RESET if link is already down */ + if ((mtyp == RESET_MSG) || !link_is_up(l)) + rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); + + /* ACTIVATE_MSG takes up link if it was already locally reset */ + if ((mtyp == ACTIVATE_MSG) && (l->state == LINK_ESTABLISHING)) + rc = TIPC_LINK_UP_EVT; + l->peer_session = msg_session(hdr); l->peer_bearer_id = msg_bearer_id(hdr); if (l->mtu > msg_max_pkt(hdr)) @@ -1389,9 +1301,12 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, l->stats.recv_states++; if (msg_probe(hdr)) l->stats.recv_probes++; - rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - if (!link_is_up(l)) + + if (!link_is_up(l)) { + if (l->state == LINK_ESTABLISHING) + rc = TIPC_LINK_UP_EVT; break; + } /* Send NACK if peer has sent pkts we haven't received yet */ if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l)) @@ -1399,11 +1314,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, if (rcvgap || (msg_probe(hdr))) tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, 0, 0, xmitq); - tipc_link_release_pkts(l, msg_ack(hdr)); + tipc_link_release_pkts(l, ack); /* If NACK, retransmit will now start at right position */ - if (nacked_gap) { - rc = tipc_link_retransm(l, nacked_gap, xmitq); + if (gap) { + rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq); l->stats.recv_nacks++; } @@ -1416,6 +1331,188 @@ exit: return rc; } +/* tipc_link_build_bc_proto_msg() - create broadcast protocol message + */ +static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast, + u16 peers_snd_nxt, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb; + struct tipc_msg *hdr; + struct sk_buff *dfrd_skb = skb_peek(&l->deferdq); + u16 ack = l->rcv_nxt - 1; + u16 gap_to = peers_snd_nxt - 1; + + skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, + 0, l->addr, link_own_addr(l), 0, 0, 0); + if (!skb) + return false; + hdr = buf_msg(skb); + msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1); + msg_set_bcast_ack(hdr, ack); + msg_set_bcgap_after(hdr, ack); + if (dfrd_skb) + gap_to = buf_seqno(dfrd_skb) - 1; + msg_set_bcgap_to(hdr, gap_to); + msg_set_non_seq(hdr, bcast); + __skb_queue_tail(xmitq, skb); + return true; +} + +/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + */ +static void tipc_link_build_bc_init_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) +{ + struct sk_buff_head list; + + __skb_queue_head_init(&list); + if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list)) + return; + tipc_link_xmit(l, &list, xmitq); +} + +/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer + */ +void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr) +{ + int mtyp = msg_type(hdr); + u16 peers_snd_nxt = msg_bc_snd_nxt(hdr); + + if (link_is_up(l)) + return; + + if (msg_user(hdr) == BCAST_PROTOCOL) { + l->rcv_nxt = peers_snd_nxt; + l->state = LINK_ESTABLISHED; + return; + } + + if (l->peer_caps & TIPC_BCAST_SYNCH) + return; + + if (msg_peer_node_is_up(hdr)) + return; + + /* Compatibility: accept older, less safe initial synch data */ + if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG)) + l->rcv_nxt = peers_snd_nxt; +} + +/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state + */ +void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, + struct sk_buff_head *xmitq) +{ + u16 peers_snd_nxt = msg_bc_snd_nxt(hdr); + + if (!link_is_up(l)) + return; + + if (!msg_peer_node_is_up(hdr)) + return; + + l->bc_peer_is_up = true; + + /* Ignore if peers_snd_nxt goes beyond receive window */ + if (more(peers_snd_nxt, l->rcv_nxt + l->window)) + return; + + if (!more(peers_snd_nxt, l->rcv_nxt)) { + l->nack_state = BC_NACK_SND_CONDITIONAL; + return; + } + + /* Don't NACK if one was recently sent or peeked */ + if (l->nack_state == BC_NACK_SND_SUPPRESS) { + l->nack_state = BC_NACK_SND_UNCONDITIONAL; + return; + } + + /* Conditionally delay NACK sending until next synch rcv */ + if (l->nack_state == BC_NACK_SND_CONDITIONAL) { + l->nack_state = BC_NACK_SND_UNCONDITIONAL; + if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN) + return; + } + + /* Send NACK now but suppress next one */ + tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq); + l->nack_state = BC_NACK_SND_SUPPRESS; +} + +void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb, *tmp; + struct tipc_link *snd_l = l->bc_sndlink; + + if (!link_is_up(l) || !l->bc_peer_is_up) + return; + + if (!more(acked, l->acked)) + return; + + /* Skip over packets peer has already acked */ + skb_queue_walk(&snd_l->transmq, skb) { + if (more(buf_seqno(skb), l->acked)) + break; + } + + /* Update/release the packets peer is acking now */ + skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) { + if (more(buf_seqno(skb), acked)) + break; + if (!--TIPC_SKB_CB(skb)->ackers) { + __skb_unlink(skb, &snd_l->transmq); + kfree_skb(skb); + } + } + l->acked = acked; + tipc_link_advance_backlog(snd_l, xmitq); + if (unlikely(!skb_queue_empty(&snd_l->wakeupq))) + link_prepare_wakeup(snd_l); +} + +/* tipc_link_bc_nack_rcv(): receive broadcast nack message + */ +int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + u32 dnode = msg_destnode(hdr); + int mtyp = msg_type(hdr); + u16 acked = msg_bcast_ack(hdr); + u16 from = acked + 1; + u16 to = msg_bcgap_to(hdr); + u16 peers_snd_nxt = to + 1; + int rc = 0; + + kfree_skb(skb); + + if (!tipc_link_is_up(l) || !l->bc_peer_is_up) + return 0; + + if (mtyp != STATE_MSG) + return 0; + + if (dnode == link_own_addr(l)) { + tipc_link_bc_ack_rcv(l, acked, xmitq); + rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq); + l->stats.recv_nacks++; + return rc; + } + + /* Msg for other node => suppress own NACK at next sync if applicable */ + if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from)) + l->nack_state = BC_NACK_SND_SUPPRESS; + + return 0; +} + void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) { int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); @@ -1480,7 +1577,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr) static void link_print(struct tipc_link *l, const char *str) { struct sk_buff *hskb = skb_peek(&l->transmq); - u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; + u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1; u16 tail = l->snd_nxt - 1; pr_info("%s Link <%s> state %x\n", str, l->name, l->state); @@ -1704,7 +1801,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg, if (tipc_link_is_up(link)) if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) goto attr_msg_full; - if (tipc_link_is_active(link)) + if (link->active) if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE)) goto attr_msg_full;