tipc: eliminate race condition at dual link establishment
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 58e2460682da7392c8bb2af78dfd54d07aafdefd..1287161e9424a854ab18e8442fdf74528ac7cec7 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -139,6 +139,13 @@ static void tipc_link_put(struct tipc_link *l_ptr)
        kref_put(&l_ptr->ref, tipc_link_release);
 }
 
+static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
+{
+       if (l->owner->active_links[0] != l)
+               return l->owner->active_links[0];
+       return l->owner->active_links[1];
+}
+
 static void link_init_max_pkt(struct tipc_link *l_ptr)
 {
        struct tipc_node *node = l_ptr->owner;
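
The new tipc_parallel_link() helper returns the node's other active link; when a node has only one active link, both active_links[] slots reference the same link, so the helper returns l itself, which link_synch() below treats as already in synch. A minimal standalone sketch of that selection logic, using invented stand-in types (node_stub, link_stub) rather than TIPC's real structures:

#include <stdio.h>

struct link_stub;                       /* hypothetical, not the kernel type */

struct node_stub {                      /* hypothetical, not the kernel type */
        struct link_stub *active_links[2];
};

struct link_stub {
        struct node_stub *owner;
};

/* Mirrors tipc_parallel_link(): if slot 0 is not "l", slot 0 holds the
 * peer link; otherwise slot 1 does (which may be "l" itself when no
 * parallel link exists).
 */
static struct link_stub *parallel_link(struct link_stub *l)
{
        if (l->owner->active_links[0] != l)
                return l->owner->active_links[0];
        return l->owner->active_links[1];
}

int main(void)
{
        struct node_stub n;
        struct link_stub a = { .owner = &n }, b = { .owner = &n };

        n.active_links[0] = &a;
        n.active_links[1] = &b;
        printf("parallel of a is b: %d\n", parallel_link(&a) == &b);

        /* With a single active link, the helper returns the link itself. */
        n.active_links[1] = &a;
        printf("parallel of a is a: %d\n", parallel_link(&a) == &a);
        return 0;
}
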
@@ -1026,6 +1033,32 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
        }
 }
 
+/* link_synch(): check if all packets arrived before the synch
+ *               point have been consumed
+ * Returns true if the parallel links are synched, otherwise false
+ */
+static bool link_synch(struct tipc_link *l)
+{
+       unsigned int post_synch;
+       struct tipc_link *pl;
+
+       pl  = tipc_parallel_link(l);
+       if (pl == l)
+               goto synched;
+
+       /* Was last pre-synch packet added to input queue ? */
+       if (less_eq(pl->next_in_no, l->synch_point))
+               return false;
+
+       /* Is it still in the input queue ? */
+       post_synch = mod(pl->next_in_no - l->synch_point) - 1;
+       if (skb_queue_len(&pl->inputq) > post_synch)
+               return false;
+synched:
+       l->flags &= ~LINK_SYNCHING;
+       return true;
+}
+
 static void link_retrieve_defq(struct tipc_link *link,
                               struct sk_buff_head *list)
 {
@@ -1156,6 +1189,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
                        skb = NULL;
                        goto unlock;
                }
+               /* Synchronize with parallel link if applicable */
+               if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
+                       link_handle_out_of_seq_msg(l_ptr, skb);
+                       if (link_synch(l_ptr))
+                               link_retrieve_defq(l_ptr, &head);
+                       skb = NULL;
+                       goto unlock;
+               }
                l_ptr->next_in_no++;
                if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
                        link_retrieve_defq(l_ptr, &head);
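
The net effect of the new branch in tipc_rcv() is hold-and-release: while LINK_SYNCHING is set, regular (non-duplicate) packets arriving on the new link are parked in its deferred queue via link_handle_out_of_seq_msg(), and link_retrieve_defq() drains them once link_synch() reports that the parallel link has caught up. A toy model of that behaviour, with all names (parked, link_caught_up, deliver, rcv) invented for illustration:

#include <stdbool.h>
#include <stdio.h>

#define MAX_PARKED 16

static unsigned int parked[MAX_PARKED];
static int nparked;
static bool synching = true;

/* Stand-in for link_synch(): in this toy, the parallel link is deemed
 * caught up once the new link has seen sequence number 3.
 */
static bool link_caught_up(unsigned int seqno)
{
        return seqno >= 3;
}

static void deliver(unsigned int seqno)
{
        printf("delivered %u\n", seqno);
}

static void rcv(unsigned int seqno)
{
        if (synching) {
                parked[nparked++] = seqno;          /* like link_handle_out_of_seq_msg() */
                if (link_caught_up(seqno)) {        /* like link_synch() */
                        synching = false;           /* like clearing LINK_SYNCHING */
                        for (int i = 0; i < nparked; i++)
                                deliver(parked[i]); /* like link_retrieve_defq() */
                        nparked = 0;
                }
                return;
        }
        deliver(seqno);
}

int main(void)
{
        /* Five in-order arrivals: 1 and 2 are held back, 3 triggers the
         * release, 4 and 5 flow through normally.
         */
        for (unsigned int s = 1; s <= 5; s++)
                rcv(s);
        return 0;
}
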
@@ -1231,6 +1272,10 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 
        switch (msg_user(msg)) {
        case CHANGEOVER_PROTOCOL:
+               if (msg_dup(msg)) {
+                       link->flags |= LINK_SYNCHING;
+                       link->synch_point = msg_seqno(msg_get_wrapped(msg));
+               }
                if (!tipc_link_tunnel_rcv(node, &skb))
                        break;
                if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
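
The last hunk records the synch point when a DUPLICATE tunnel packet arrives: the stored value is the sequence number of the wrapped original message, read via msg_get_wrapped(), not the tunnel packet's own number. A rough illustration of that unwrapping step (hdr_stub and wrapped_seqno are invented stand-ins; TIPC's real header layout differs):

#include <stdio.h>
#include <string.h>

struct hdr_stub {               /* hypothetical fixed-size header, not TIPC's */
        unsigned int seqno;
        unsigned int hdr_sz;
};

/* "Unwrap" a tunnel packet: the inner (original) message starts right
 * after the outer header, so its header is read from that offset.
 */
static unsigned int wrapped_seqno(const unsigned char *pkt)
{
        struct hdr_stub outer, inner;

        memcpy(&outer, pkt, sizeof(outer));
        memcpy(&inner, pkt + outer.hdr_sz, sizeof(inner));
        return inner.seqno;
}

int main(void)
{
        unsigned char pkt[2 * sizeof(struct hdr_stub)];
        struct hdr_stub outer = { .seqno = 7,  .hdr_sz = sizeof(outer) };
        struct hdr_stub inner = { .seqno = 42, .hdr_sz = sizeof(inner) };

        memcpy(pkt, &outer, sizeof(outer));
        memcpy(pkt + sizeof(outer), &inner, sizeof(inner));

        /* The synch point is the inner sequence number, not the tunnel one. */
        printf("synch_point = %u\n", wrapped_seqno(pkt));  /* prints 42 */
        return 0;
}
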