tipc: make media xmit call outside node spinlock context
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 16 Jul 2015 20:54:24 +0000 (16:54 -0400)
committerDavid S. Miller <davem@davemloft.net>
Tue, 21 Jul 2015 03:41:15 +0000 (20:41 -0700)
Currently, message sending is performed through a deep call chain,
where the node spinlock is grabbed and held during a significant
part of the transmission time. This is clearly detrimental to
overall throughput performance; it would be better if we could send
the message after the spinlock has been released.

In this commit, we do instead let the call revert on the stack after
the buffer chain has been added to the transmission queue, whereafter
clones of the buffers are transmitted to the device layer outside the
spinlock scope.

As a further step in our effort to separate the roles of the node
and link entities we also move the function tipc_link_xmit() to
node.c, and rename it to tipc_node_xmit().

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/bearer.c
net/tipc/bearer.h
net/tipc/link.c
net/tipc/link.h
net/tipc/name_distr.c
net/tipc/node.c
net/tipc/node.h
net/tipc/socket.c

index 00bc0e6205326025212a85e1110bab48c208e73a..eae58a6b121cf8b3d70260038fc644edcd6c97df 100644 (file)
@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
        rcu_read_unlock();
 }
 
+/* tipc_bearer_xmit() -send buffer to destination over bearer
+ */
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+                     struct sk_buff_head *xmitq,
+                     struct tipc_media_addr *dst)
+{
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       struct tipc_bearer *b;
+       struct sk_buff *skb, *tmp;
+
+       if (skb_queue_empty(xmitq))
+               return;
+
+       rcu_read_lock();
+       b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+       if (likely(b)) {
+               skb_queue_walk_safe(xmitq, skb, tmp) {
+                       __skb_dequeue(xmitq);
+                       b->media->send_msg(net, skb, b, dst);
+                       /* Until we remove cloning in tipc_l2_send_msg(): */
+                       kfree_skb(skb);
+               }
+       }
+       rcu_read_unlock();
+}
+
 /**
  * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
  * @buf: the received packet
index dc714d977768c105cff0b774b49be1e5ec1c59fd..6426f242f6262e80594cd1cdc438c4a94f4c7026 100644 (file)
@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
 void tipc_bearer_stop(struct net *net);
 void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
                      struct tipc_media_addr *dest);
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+                     struct sk_buff_head *xmitq,
+                     struct tipc_media_addr *dst);
 
 #endif /* _TIPC_BEARER_H */
index ea32679b673797f6fa8ee61521445272b861187b..c052437a7cfad4b2592d696d6433c56c5e53d89a 100644 (file)
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
        /* This really cannot happen...  */
        if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
                pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-               tipc_link_reset(link);
                return -ENOBUFS;
        }
        /* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
        return 0;
 }
 
+/**
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @link: link to use
+ * @list: chain of buffers containing message
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
+ */
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+                  struct sk_buff_head *xmitq)
+{
+       struct tipc_msg *hdr = buf_msg(skb_peek(list));
+       unsigned int maxwin = l->window;
+       unsigned int i, imp = msg_importance(hdr);
+       unsigned int mtu = l->mtu;
+       u16 ack = l->rcv_nxt - 1;
+       u16 seqno = l->snd_nxt;
+       u16 bc_last_in = l->owner->bclink.last_in;
+       struct sk_buff_head *transmq = &l->transmq;
+       struct sk_buff_head *backlogq = &l->backlogq;
+       struct sk_buff *skb, *_skb, *bskb;
+
+       /* Match msg importance against this and all higher backlog limits: */
+       for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+               if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+                       return link_schedule_user(l, list);
+       }
+       if (unlikely(msg_size(hdr) > mtu))
+               return -EMSGSIZE;
+
+       /* Prepare each packet for sending, and add to relevant queue: */
+       while (skb_queue_len(list)) {
+               skb = skb_peek(list);
+               hdr = buf_msg(skb);
+               msg_set_seqno(hdr, seqno);
+               msg_set_ack(hdr, ack);
+               msg_set_bcast_ack(hdr, bc_last_in);
+
+               if (likely(skb_queue_len(transmq) < maxwin)) {
+                       _skb = skb_clone(skb, GFP_ATOMIC);
+                       if (!_skb)
+                               return -ENOBUFS;
+                       __skb_dequeue(list);
+                       __skb_queue_tail(transmq, skb);
+                       __skb_queue_tail(xmitq, _skb);
+                       l->rcv_unacked = 0;
+                       seqno++;
+                       continue;
+               }
+               if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+                       kfree_skb(__skb_dequeue(list));
+                       l->stats.sent_bundled++;
+                       continue;
+               }
+               if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+                       kfree_skb(__skb_dequeue(list));
+                       __skb_queue_tail(backlogq, bskb);
+                       l->backlog[msg_importance(buf_msg(bskb))].len++;
+                       l->stats.sent_bundled++;
+                       l->stats.sent_bundles++;
+                       continue;
+               }
+               l->backlog[imp].len += skb_queue_len(list);
+               skb_queue_splice_tail_init(list, backlogq);
+       }
+       l->snd_nxt = seqno;
+       return 0;
+}
+
 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
        skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
        return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not cause link congestion
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
-                      u32 selector)
-{
-       struct sk_buff_head head;
-       int rc;
-
-       skb2list(skb, &head);
-       rc = tipc_link_xmit(net, &head, dnode, selector);
-       if (rc)
-               kfree_skb(skb);
-       return 0;
-}
-
-/**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
- * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning error
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
- */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
-                  u32 selector)
-{
-       struct tipc_link *link = NULL;
-       struct tipc_node *node;
-       int rc = -EHOSTUNREACH;
-
-       node = tipc_node_find(net, dnode);
-       if (node) {
-               tipc_node_lock(node);
-               link = node_active_link(node, selector & 1);
-               if (link)
-                       rc = __tipc_link_xmit(net, link, list);
-               tipc_node_unlock(node);
-               tipc_node_put(node);
-       }
-       if (link)
-               return rc;
-
-       if (likely(in_own_node(net, dnode))) {
-               tipc_sk_rcv(net, list);
-               return 0;
-       }
-
-       __skb_queue_purge(list);
-       return rc;
-}
-
 /*
  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *
index 9c71d9e42e930e8beb60274df94697d122be626a..7add2b90361d8de7769ed7aa611a74ceea1e9782 100644 (file)
@@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
 void tipc_link_purge_backlog(struct tipc_link *l);
 void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
-                      u32 selector);
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
-                  u32 selector);
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
                     struct sk_buff_head *list);
+int tipc_link_xmit(struct tipc_link *link,     struct sk_buff_head *list,
+                  struct sk_buff_head *xmitq);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
                          u32 gap, u32 tolerance, u32 priority);
 void tipc_link_push_packets(struct tipc_link *l_ptr);
index 3a1539e962941bb6cd1aa604a9e49f0d9709c878..e6018b7eb1970dfc85bc7e0dc8945ccf45a72180 100644 (file)
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
                if (!oskb)
                        break;
                msg_set_destnode(buf_msg(oskb), dnode);
-               tipc_link_xmit_skb(net, oskb, dnode, dnode);
+               tipc_node_xmit_skb(net, oskb, dnode, dnode);
        }
        rcu_read_unlock();
 
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
                         &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
        rcu_read_unlock();
 
-       tipc_link_xmit(net, &head, dnode, dnode);
+       tipc_node_xmit(net, &head, dnode, dnode);
 }
 
 static void tipc_publ_subscribe(struct net *net, struct publication *publ,
index 19729645d494759bdc1a00560ad0631781dfdc39..ad759bb034e7d3ce34ffd56cb5c00c5600a5c7cf 100644 (file)
@@ -563,6 +563,84 @@ msg_full:
        return -EMSGSIZE;
 }
 
+static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
+                                              int *bearer_id,
+                                              struct tipc_media_addr **maddr)
+{
+       int id = n->active_links[sel & 1];
+
+       if (unlikely(id < 0))
+               return NULL;
+
+       *bearer_id = id;
+       *maddr = &n->links[id].maddr;
+       return n->links[id].link;
+}
+
+/**
+ * tipc_node_xmit() is the general link level function for message sending
+ * @net: the applicable net namespace
+ * @list: chain of buffers containing message
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
+                  u32 dnode, int selector)
+{
+       struct tipc_link *l = NULL;
+       struct tipc_node *n;
+       struct sk_buff_head xmitq;
+       struct tipc_media_addr *maddr;
+       int bearer_id;
+       int rc = -EHOSTUNREACH;
+
+       __skb_queue_head_init(&xmitq);
+       n = tipc_node_find(net, dnode);
+       if (likely(n)) {
+               tipc_node_lock(n);
+               l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
+               if (likely(l))
+                       rc = tipc_link_xmit(l, list, &xmitq);
+               if (unlikely(rc == -ENOBUFS))
+                       tipc_link_reset(l);
+               tipc_node_unlock(n);
+               tipc_node_put(n);
+       }
+       if (likely(!rc)) {
+               tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+               return 0;
+       }
+       if (likely(in_own_node(net, dnode))) {
+               tipc_sk_rcv(net, list);
+               return 0;
+       }
+       return rc;
+}
+
+/* tipc_node_xmit_skb(): send single buffer to destination
+ * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
+                      u32 selector)
+{
+       struct sk_buff_head head;
+       int rc;
+
+       skb_queue_head_init(&head);
+       __skb_queue_tail(&head, skb);
+       rc = tipc_node_xmit(net, &head, dnode, selector);
+       if (rc == -ELINKCONG)
+               kfree_skb(skb);
+       return 0;
+}
+
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
        int err;
index 74f278adada35443231bff04070f1952c4e16607..86b7c740cf8499cab5a2d85d9a23260b9eb30510 100644 (file)
@@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n);
 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
                           char *linkname, size_t len);
 void tipc_node_unlock(struct tipc_node *node);
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
+                  int selector);
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
+                      u32 selector);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
 
index 87fef25f6519e47bbb3655eeb2f2059bc25739c4..5b0b08d58fcc139b88628d3d0c111c066010f002 100644 (file)
@@ -261,7 +261,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
 
        while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
                if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
-                       tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
+                       tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
        }
 }
 
@@ -443,7 +443,7 @@ static int tipc_release(struct socket *sock)
                        }
                        if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
                                             TIPC_ERR_NO_PORT))
-                               tipc_link_xmit_skb(net, skb, dnode, 0);
+                               tipc_node_xmit_skb(net, skb, dnode, 0);
                }
        }
 
@@ -456,7 +456,7 @@ static int tipc_release(struct socket *sock)
                                      tsk_own_node(tsk), tsk_peer_port(tsk),
                                      tsk->portid, TIPC_ERR_NO_PORT);
                if (skb)
-                       tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+                       tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
                tipc_node_remove_conn(net, dnode, tsk->portid);
        }
 
@@ -925,7 +925,7 @@ new_mtu:
        do {
                skb = skb_peek(pktchain);
                TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-               rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
+               rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
                if (likely(!rc)) {
                        if (sock->state != SS_READY)
                                sock->state = SS_CONNECTING;
@@ -1045,7 +1045,7 @@ next:
                return rc;
        do {
                if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_link_xmit(net, pktchain, dnode, portid);
+                       rc = tipc_node_xmit(net, pktchain, dnode, portid);
                        if (likely(!rc)) {
                                tsk->sent_unacked++;
                                sent += send;
@@ -1224,7 +1224,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
                return;
        msg = buf_msg(skb);
        msg_set_msgcnt(msg, ack);
-       tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+       tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1703,7 +1703,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
                return 0;
        }
        if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
-               tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+               tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
        return 0;
 }
 
@@ -1799,7 +1799,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
                if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
                        continue;
 xmit:
-               tipc_link_xmit_skb(net, skb, dnode, dport);
+               tipc_node_xmit_skb(net, skb, dnode, dport);
        }
        return err ? -EHOSTUNREACH : 0;
 }
@@ -2092,7 +2092,7 @@ restart:
                        }
                        if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
                                             TIPC_CONN_SHUTDOWN))
-                               tipc_link_xmit_skb(net, skb, dnode,
+                               tipc_node_xmit_skb(net, skb, dnode,
                                                   tsk->portid);
                } else {
                        dnode = tsk_peer_node(tsk);
@@ -2102,7 +2102,7 @@ restart:
                                              0, dnode, tsk_own_node(tsk),
                                              tsk_peer_port(tsk),
                                              tsk->portid, TIPC_CONN_SHUTDOWN);
-                       tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+                       tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
                }
                tsk->connected = 0;
                sock->state = SS_DISCONNECTING;
@@ -2164,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)
        }
        bh_unlock_sock(sk);
        if (skb)
-               tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
+               tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
 exit:
        sock_put(sk);
 }