tipc: make name table distributor use new send function
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 17 Jul 2014 00:40:58 +0000 (20:40 -0400)
committerDavid S. Miller <davem@davemloft.net>
Thu, 17 Jul 2014 04:38:18 +0000 (21:38 -0700)
In a previous commit series ("tipc: new unicast transmission code")
we introduced a new message sending function, tipc_link_xmit2(),
and moved the unicast data users over to use that function. We now
let the internal name table distributor do the same.

The interaction between the name distributor and the node/link
layer also becomes significantly simpler, so we can eliminate
the function tipc_link_names_xmit().

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/link.c
net/tipc/link.h
net/tipc/name_distr.c
net/tipc/name_distr.h
net/tipc/node.c

index a235b245f682bb8acbbf94b74334bada8040ff4f..367b0f5886f8d22a46e44524619140f63777f338 100644 (file)
@@ -1032,47 +1032,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
        kfree_skb(buf);
 }
 
-/*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
-       struct tipc_node *n_ptr;
-       struct tipc_link *l_ptr;
-       struct sk_buff *buf;
-       struct sk_buff *temp_buf;
-
-       if (list_empty(message_list))
-               return;
-
-       n_ptr = tipc_node_find(dest);
-       if (n_ptr) {
-               tipc_node_lock(n_ptr);
-               l_ptr = n_ptr->active_links[0];
-               if (l_ptr) {
-                       /* convert circular list to linear list */
-                       ((struct sk_buff *)message_list->prev)->next = NULL;
-                       link_add_chain_to_outqueue(l_ptr,
-                               (struct sk_buff *)message_list->next, 0);
-                       tipc_link_push_queue(l_ptr);
-                       INIT_LIST_HEAD(message_list);
-               }
-               tipc_node_unlock(n_ptr);
-       }
-
-       /* discard the messages if they couldn't be sent */
-       list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
-               list_del((struct list_head *)buf);
-               kfree_skb(buf);
-       }
-}
-
 /*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
index 227ff812089739fafb48438f0f451d3ed151965b..04a59c58d38514b8fdfac94565b4e93dddfb6981 100644 (file)
@@ -228,7 +228,6 @@ void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
 int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
 int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
 int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
 int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
index 8ce730984aa1f0d429b2325d58862e1ef7799d4d..d16f9475fa765e8c457f352c75e617180fc66b74 100644 (file)
@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
 
 void named_cluster_distribute(struct sk_buff *buf)
 {
-       struct sk_buff *buf_copy;
-       struct tipc_node *n_ptr;
-       struct tipc_link *l_ptr;
+       struct sk_buff *obuf;
+       struct tipc_node *node;
+       u32 dnode;
 
        rcu_read_lock();
-       list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
-               tipc_node_lock(n_ptr);
-               l_ptr = n_ptr->active_links[n_ptr->addr & 1];
-               if (l_ptr) {
-                       buf_copy = skb_copy(buf, GFP_ATOMIC);
-                       if (!buf_copy) {
-                               tipc_node_unlock(n_ptr);
-                               break;
-                       }
-                       msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
-                       __tipc_link_xmit(l_ptr, buf_copy);
-               }
-               tipc_node_unlock(n_ptr);
+       list_for_each_entry_rcu(node, &tipc_node_list, list) {
+               dnode = node->addr;
+               if (in_own_node(dnode))
+                       continue;
+               if (!tipc_node_active_links(node))
+                       continue;
+               obuf = skb_copy(buf, GFP_ATOMIC);
+               if (!obuf)
+                       break;
+               msg_set_destnode(buf_msg(obuf), dnode);
+               tipc_link_xmit2(obuf, dnode, dnode);
        }
        rcu_read_unlock();
 
@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
        return buf;
 }
 
-/*
+/**
  * named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
  */
-static void named_distribute(struct list_head *message_list, u32 node,
-                            struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+                            struct publ_list *pls)
 {
        struct publication *publ;
        struct sk_buff *buf = NULL;
        struct distr_item *item = NULL;
-       u32 left = 0;
-       u32 rest = pls->size * ITEM_SIZE;
+       uint dsz = pls->size * ITEM_SIZE;
+       uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+       uint rem = dsz;
+       uint msg_rem = 0;
 
        list_for_each_entry(publ, &pls->list, local_list) {
+               /* Prepare next buffer: */
                if (!buf) {
-                       left = (rest <= max_item_buf) ? rest : max_item_buf;
-                       rest -= left;
-                       buf = named_prepare_buf(PUBLICATION, left, node);
+                       msg_rem = min_t(uint, rem, msg_dsz);
+                       rem -= msg_rem;
+                       buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
                        if (!buf) {
                                pr_warn("Bulk publication failure\n");
                                return;
                        }
                        item = (struct distr_item *)msg_data(buf_msg(buf));
                }
+
+               /* Pack publication into message: */
                publ_to_item(item, publ);
                item++;
-               left -= ITEM_SIZE;
-               if (!left) {
-                       list_add_tail((struct list_head *)buf, message_list);
+               msg_rem -= ITEM_SIZE;
+
+               /* Append full buffer to list: */
+               if (!msg_rem) {
+                       list_add_tail((struct list_head *)buf, msg_list);
                        buf = NULL;
                }
        }
@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(u32 max_item_buf, u32 node)
+void tipc_named_node_up(u32 dnode)
 {
-       LIST_HEAD(message_list);
+       LIST_HEAD(msg_list);
+       struct sk_buff *buf_chain;
 
        read_lock_bh(&tipc_nametbl_lock);
-       named_distribute(&message_list, node, &publ_cluster, max_item_buf);
-       named_distribute(&message_list, node, &publ_zone, max_item_buf);
+       named_distribute(&msg_list, dnode, &publ_cluster);
+       named_distribute(&msg_list, dnode, &publ_zone);
        read_unlock_bh(&tipc_nametbl_lock);
 
-       tipc_link_names_xmit(&message_list, node);
+       /* Convert circular list to linear list and send: */
+       buf_chain = (struct sk_buff *)msg_list.next;
+       ((struct sk_buff *)msg_list.prev)->next = NULL;
+       tipc_link_xmit2(buf_chain, dnode, dnode);
 }
 
 /**
index b2eed4ec1526a34efd8b191e4b952bf9718f7a2e..8afe32b7fc9a0b6a2b9f78657fd13756eae0b465 100644 (file)
@@ -70,7 +70,7 @@ struct distr_item {
 struct sk_buff *tipc_named_publish(struct publication *publ);
 struct sk_buff *tipc_named_withdraw(struct publication *publ);
 void named_cluster_distribute(struct sk_buff *buf);
-void tipc_named_node_up(u32 max_item_buf, u32 node);
+void tipc_named_node_up(u32 dnode);
 void tipc_named_rcv(struct sk_buff *buf);
 void tipc_named_reinit(void);
 
index d959343043fd020628cba615f5cae979d581ccc3..f7069299943f847f7853a3bb9e5cf781e6caf9b8 100644 (file)
@@ -474,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
 void tipc_node_unlock(struct tipc_node *node)
 {
        LIST_HEAD(nsub_list);
-       struct tipc_link *link;
-       int pkt_sz = 0;
        u32 addr = 0;
 
        if (likely(!node->action_flags)) {
@@ -488,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node)
                node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
        }
        if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
-               link = node->active_links[0];
                node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
-               if (link) {
-                       pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
-                                 ITEM_SIZE;
-                       addr = node->addr;
-               }
+               addr = node->addr;
        }
        spin_unlock_bh(&node->lock);
 
        if (!list_empty(&nsub_list))
                tipc_nodesub_notify(&nsub_list);
-       if (pkt_sz)
-               tipc_named_node_up(pkt_sz, addr);
+       if (addr)
+               tipc_named_node_up(addr);
 }