tipc: clean up handling of message priorities
authorJon Paul Maloy <jon.maloy@ericsson.com>
Fri, 13 Mar 2015 20:08:11 +0000 (16:08 -0400)
committerDavid S. Miller <davem@davemloft.net>
Sat, 14 Mar 2015 18:38:32 +0000 (14:38 -0400)
Messages transferred by TIPC are assigned an "importance priority", -an
integer value indicating how to treat the message when there is link or
destination socket congestion.

There is no separate header field for this value. Instead, the message
user values have been chosen in ascending order according to perceived
importance, so that the message user field can be used for this.

This is not a good solution. First, we have many more users than the
needed priority levels, so we end up with treating more priority
levels than necessary. Second, the user field cannot always
accurately reflect the priority of the message. E.g., a message
fragment packet should really have the priority of the enveloped
user data message, and not the priority of the MSG_FRAGMENTER user.
Until now, we have been working around this problem in different ways,
but it is now time to implement a consistent way of handling such
priorities, although still within the constraint that we cannot
allocate any more bits in the regular data message header for this.

In this commit, we define a new priority level, TIPC_SYSTEM_IMPORTANCE,
that will be the only one used apart from the four (lower) user data
levels. All non-data messages map down to this priority. Furthermore,
we take some free bits from the MSG_FRAGMENTER header and allocate
them to store the priority of the enveloped message. We then adjust
the functions msg_importance()/msg_set_importance() so that they
read/set the correct header fields depending on user type.

This small protocol change is fully compatible, because the code at
the receiving end of a link currently reads the importance level
only from user data messages, where there is no change.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/bcast.c
net/tipc/link.c
net/tipc/msg.c
net/tipc/msg.h

index 17cb0ff5f344eb675ded1852674d661709b8d0e5..5aff0844d4d361efc45fb017ca501b7072caa464 100644 (file)
@@ -383,7 +383,6 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
                __skb_queue_purge(list);
                return -EHOSTUNREACH;
        }
-
        /* Broadcast to all nodes */
        if (likely(bclink)) {
                tipc_bclink_lock(net);
index 7e0036f5a364c3664d12050583fae8ea3df1c2fc..bc49120bfb44fbb69463a302cfbbfe17b75877f5 100644 (file)
@@ -35,6 +35,7 @@
  */
 
 #include "core.h"
+#include "subscr.h"
 #include "link.h"
 #include "bcast.h"
 #include "socket.h"
@@ -305,12 +306,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
        msg_set_session(msg, (tn->random & 0xffff));
        msg_set_bearer_id(msg, b_ptr->identity);
        strcpy((char *)msg_data(msg), if_name);
-
-       l_ptr->priority = b_ptr->priority;
-       tipc_link_set_queue_limits(l_ptr, b_ptr->window);
-
        l_ptr->net_plane = b_ptr->net_plane;
        link_init_max_pkt(l_ptr);
+       l_ptr->priority = b_ptr->priority;
+       tipc_link_set_queue_limits(l_ptr, b_ptr->window);
 
        l_ptr->next_out_no = 1;
        __skb_queue_head_init(&l_ptr->transmq);
@@ -708,7 +707,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
 {
        struct sk_buff *skb = skb_peek(list);
        struct tipc_msg *msg = buf_msg(skb);
-       uint imp = tipc_msg_tot_importance(msg);
+       int imp = msg_importance(msg);
        u32 oport = msg_tot_origport(msg);
 
        if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
@@ -745,7 +744,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 {
        struct tipc_msg *msg = buf_msg(skb_peek(list));
        unsigned int maxwin = link->window;
-       uint imp = tipc_msg_tot_importance(msg);
+       unsigned int imp = msg_importance(msg);
        uint mtu = link->max_pkt;
        uint ack = mod(link->next_in_no - 1);
        uint seqno = link->next_out_no;
@@ -755,7 +754,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
        struct sk_buff_head *backlogq = &link->backlogq;
        struct sk_buff *skb, *tmp;
 
-       /* Match queue limits against msg importance: */
+       /* Match queue limit against msg importance: */
        if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
                return tipc_link_cong(link, list);
 
@@ -1811,25 +1810,16 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
        l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);
 }
 
-void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
+void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 {
-       l_ptr->window = window;
-
-       /* Data messages from this node, inclusive FIRST_FRAGM */
-       l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
-       l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
-       l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
-       l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
-       /* Transiting data messages,inclusive FIRST_FRAGM */
-       l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
-       l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
-       l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
-       l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
-       l_ptr->queue_limit[CONN_MANAGER] = 1200;
-       l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
-       l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
-       /* FRAGMENT and LAST_FRAGMENT packets */
-       l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
+       int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
+
+       l->window = win;
+       l->queue_limit[TIPC_LOW_IMPORTANCE]      = win / 2;
+       l->queue_limit[TIPC_MEDIUM_IMPORTANCE]   = win;
+       l->queue_limit[TIPC_HIGH_IMPORTANCE]     = win / 2 * 3;
+       l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
+       l->queue_limit[TIPC_SYSTEM_IMPORTANCE]   = max_bulk;
 }
 
 /* tipc_link_find_owner - locate owner node of link by link's name
index 47c8fd8e2fb23c1dd012be8a550f5d0f41f63636..0c6dad8180a0f5acd42750b77c3472eabfd50790 100644 (file)
@@ -272,6 +272,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
                      FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
        msg_set_size(&pkthdr, pktmax);
        msg_set_fragm_no(&pkthdr, pktno);
+       msg_set_importance(&pkthdr, msg_importance(mhdr));
 
        /* Prepare first fragment */
        skb = tipc_buf_acquire(pktmax);
@@ -467,7 +468,6 @@ bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,
                      int err)
 {
        struct tipc_msg *msg = buf_msg(buf);
-       uint imp = msg_importance(msg);
        struct tipc_msg ohdr;
        uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
 
@@ -479,9 +479,6 @@ bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,
        if (msg_errcode(msg))
                goto exit;
        memcpy(&ohdr, msg, msg_hdr_sz(msg));
-       imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
-       if (msg_isdata(msg))
-               msg_set_importance(msg, imp);
        msg_set_errcode(msg, err);
        msg_set_origport(msg, msg_destport(&ohdr));
        msg_set_destport(msg, msg_origport(&ohdr));
index e5fc5fdb2ea71879e4772f04714f4273422fdf33..bd3969a80dd4edede16c8d7bf1d38fbed217fe85 100644 (file)
@@ -54,6 +54,8 @@ struct plist;
  * - TIPC_HIGH_IMPORTANCE
  * - TIPC_CRITICAL_IMPORTANCE
  */
+#define TIPC_SYSTEM_IMPORTANCE 4
+
 
 /*
  * Payload message types
@@ -63,6 +65,19 @@ struct plist;
 #define TIPC_NAMED_MSG         2
 #define TIPC_DIRECT_MSG                3
 
+/*
+ * Internal message users
+ */
+#define  BCAST_PROTOCOL       5
+#define  MSG_BUNDLER          6
+#define  LINK_PROTOCOL        7
+#define  CONN_MANAGER         8
+#define  CHANGEOVER_PROTOCOL  10
+#define  NAME_DISTRIBUTOR     11
+#define  MSG_FRAGMENTER       12
+#define  LINK_CONFIG          13
+#define  SOCK_WAKEUP          14       /* pseudo user */
+
 /*
  * Message header sizes
  */
@@ -170,16 +185,6 @@ static inline void msg_set_user(struct tipc_msg *m, u32 n)
        msg_set_bits(m, 0, 25, 0xf, n);
 }
 
-static inline u32 msg_importance(struct tipc_msg *m)
-{
-       return msg_bits(m, 0, 25, 0xf);
-}
-
-static inline void msg_set_importance(struct tipc_msg *m, u32 i)
-{
-       msg_set_user(m, i);
-}
-
 static inline u32 msg_hdr_sz(struct tipc_msg *m)
 {
        return msg_bits(m, 0, 21, 0xf) << 2;
@@ -336,6 +341,25 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n)
 /*
  * Words 3-10
  */
+static inline u32 msg_importance(struct tipc_msg *m)
+{
+       if (unlikely(msg_user(m) == MSG_FRAGMENTER))
+               return msg_bits(m, 5, 13, 0x7);
+       if (likely(msg_isdata(m) && !msg_errcode(m)))
+               return msg_user(m);
+       return TIPC_SYSTEM_IMPORTANCE;
+}
+
+static inline void msg_set_importance(struct tipc_msg *m, u32 i)
+{
+       if (unlikely(msg_user(m) == MSG_FRAGMENTER))
+               msg_set_bits(m, 5, 13, 0x7, i);
+       else if (likely(i < TIPC_SYSTEM_IMPORTANCE))
+               msg_set_user(m, i);
+       else
+               pr_warn("Trying to set illegal importance in message\n");
+}
+
 static inline u32 msg_prevnode(struct tipc_msg *m)
 {
        return msg_word(m, 3);
@@ -457,20 +481,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
  * Constants and routines used to read and write TIPC internal message headers
  */
 
-/*
- * Internal message users
- */
-#define  BCAST_PROTOCOL       5
-#define  MSG_BUNDLER          6
-#define  LINK_PROTOCOL        7
-#define  CONN_MANAGER         8
-#define  ROUTE_DISTRIBUTOR    9                /* obsoleted */
-#define  CHANGEOVER_PROTOCOL  10
-#define  NAME_DISTRIBUTOR     11
-#define  MSG_FRAGMENTER       12
-#define  LINK_CONFIG          13
-#define  SOCK_WAKEUP          14       /* pseudo user */
-
 /*
  *  Connection management protocol message types
  */
@@ -743,13 +753,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
        msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
-{
-       if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
-               return msg_importance(msg_get_wrapped(m));
-       return msg_importance(m);
-}
-
 static inline u32 msg_tot_origport(struct tipc_msg *m)
 {
        if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))