tipc: convert tipc reference table to use generic rhashtable
[firefly-linux-kernel-4.4.55.git] / net / tipc / socket.c
index 51bddc236a15582b1a1dd0deb0b1a733c1d40070..701f31bbbbfb727202e2cc72e9a5a940c7014bc5 100644 (file)
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <linux/rhashtable.h>
+#include <linux/jhash.h>
 #include "core.h"
 #include "name_table.h"
 #include "node.h"
 #include "link.h"
-#include <linux/export.h>
 #include "config.h"
 #include "socket.h"
 
-#define SS_LISTENING   -1      /* socket is listening */
-#define SS_READY       -2      /* socket is connectionless */
+#define SS_LISTENING           -1      /* socket is listening */
+#define SS_READY               -2      /* socket is connectionless */
 
-#define CONN_TIMEOUT_DEFAULT  8000     /* default connect timeout = 8s */
-#define CONN_PROBING_INTERVAL 3600000  /* [ms] => 1 h */
-#define TIPC_FWD_MSG         1
-#define TIPC_CONN_OK          0
-#define TIPC_CONN_PROBING     1
+#define CONN_TIMEOUT_DEFAULT   8000    /* default connect timeout = 8s */
+#define CONN_PROBING_INTERVAL  3600000 /* [ms] => 1 h */
+#define TIPC_FWD_MSG           1
+#define TIPC_CONN_OK           0
+#define TIPC_CONN_PROBING      1
+#define TIPC_MAX_PORT          0xffffffff
+#define TIPC_MIN_PORT          1
 
 /**
  * struct tipc_sock - TIPC socket structure
@@ -59,7 +62,7 @@
  * @conn_instance: TIPC instance used when connection was established
  * @published: non-zero if port has one or more associated names
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
- * @ref: unique reference to port in TIPC object registry
+ * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
  * @port_list: adjacent ports in TIPC's global list of ports
  * @publications: list of publications for port
@@ -74,6 +77,8 @@
  * @link_cong: non-zero if owner must sleep because of link congestion
  * @sent_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
+ * @node: hash table node
+ * @rcu: rcu struct for tipc_sock
  */
 struct tipc_sock {
        struct sock sk;
@@ -82,7 +87,7 @@ struct tipc_sock {
        u32 conn_instance;
        int published;
        u32 max_pkt;
-       u32 ref;
+       u32 portid;
        struct tipc_msg phdr;
        struct list_head sock_list;
        struct list_head publications;
@@ -95,6 +100,8 @@ struct tipc_sock {
        bool link_cong;
        uint sent_unacked;
        uint rcv_unacked;
+       struct rhash_head node;
+       struct rcu_head rcu;
 };
 
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -103,16 +110,14 @@ static void tipc_write_space(struct sock *sk);
 static int tipc_release(struct socket *sock);
 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
-static void tipc_sk_timeout(unsigned long ref);
+static void tipc_sk_timeout(unsigned long portid);
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq);
 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
                            struct tipc_name_seq const *seq);
-static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk);
-static void tipc_sk_ref_discard(u32 ref);
-static struct tipc_sock *tipc_sk_get(u32 ref);
-static struct tipc_sock *tipc_sk_get_next(u32 *ref);
-static void tipc_sk_put(struct tipc_sock *tsk);
+static struct tipc_sock *tipc_sk_lookup(u32 portid);
+static int tipc_sk_insert(struct tipc_sock *tsk);
+static void tipc_sk_remove(struct tipc_sock *tsk);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -121,6 +126,14 @@ static const struct proto_ops msg_ops;
 static struct proto tipc_proto;
 static struct proto tipc_proto_kern;
 
+static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
+       [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
+       [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
+       [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
+       [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
+       [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
+};
+
 /*
  * Revised TIPC socket locking policy:
  *
@@ -166,6 +179,9 @@ static struct proto tipc_proto_kern;
  *   - port reference
  */
 
+/* Protects tipc socket hash table mutations */
+static struct rhashtable tipc_sk_rht;
+
 static u32 tsk_peer_node(struct tipc_sock *tsk)
 {
        return msg_destnode(&tsk->phdr);
@@ -236,12 +252,12 @@ static void tsk_advance_rx_queue(struct sock *sk)
  */
 static void tsk_rej_rx_queue(struct sock *sk)
 {
-       struct sk_buff *buf;
+       struct sk_buff *skb;
        u32 dnode;
 
-       while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
-               if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
-                       tipc_link_xmit(buf, dnode, 0);
+       while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
+               if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
+                       tipc_link_xmit_skb(skb, dnode, 0);
        }
 }
 
@@ -297,7 +313,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        struct sock *sk;
        struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       u32 ref;
 
        /* Validate arguments */
        if (unlikely(protocol != 0))
@@ -331,24 +346,22 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                return -ENOMEM;
 
        tsk = tipc_sk(sk);
-       ref = tipc_sk_ref_acquire(tsk);
-       if (!ref) {
-               pr_warn("Socket create failed; reference table exhausted\n");
-               return -ENOMEM;
-       }
        tsk->max_pkt = MAX_PKT_DEFAULT;
-       tsk->ref = ref;
        INIT_LIST_HEAD(&tsk->publications);
        msg = &tsk->phdr;
        tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
                      NAMED_H_SIZE, 0);
-       msg_set_origport(msg, ref);
 
        /* Finish initializing socket data structures */
        sock->ops = ops;
        sock->state = state;
        sock_init_data(sock, sk);
-       k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref);
+       if (tipc_sk_insert(tsk)) {
+               pr_warn("Socket create failed; port numbrer exhausted\n");
+               return -EINVAL;
+       }
+       msg_set_origport(msg, tsk->portid);
+       k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, tsk->portid);
        sk->sk_backlog_rcv = tipc_backlog_rcv;
        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
        sk->sk_data_ready = tipc_data_ready;
@@ -434,6 +447,13 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
        return ret;
 }
 
+static void tipc_sk_callback(struct rcu_head *head)
+{
+       struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
+
+       sock_put(&tsk->sk);
+}
+
 /**
  * tipc_release - destroy a TIPC socket
  * @sock: socket to destroy
@@ -454,7 +474,7 @@ static int tipc_release(struct socket *sock)
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk;
-       struct sk_buff *buf;
+       struct sk_buff *skb;
        u32 dnode;
 
        /*
@@ -473,34 +493,34 @@ static int tipc_release(struct socket *sock)
         */
        dnode = tsk_peer_node(tsk);
        while (sock->state != SS_DISCONNECTING) {
-               buf = __skb_dequeue(&sk->sk_receive_queue);
-               if (buf == NULL)
+               skb = __skb_dequeue(&sk->sk_receive_queue);
+               if (skb == NULL)
                        break;
-               if (TIPC_SKB_CB(buf)->handle != NULL)
-                       kfree_skb(buf);
+               if (TIPC_SKB_CB(skb)->handle != NULL)
+                       kfree_skb(skb);
                else {
                        if ((sock->state == SS_CONNECTING) ||
                            (sock->state == SS_CONNECTED)) {
                                sock->state = SS_DISCONNECTING;
                                tsk->connected = 0;
-                               tipc_node_remove_conn(dnode, tsk->ref);
+                               tipc_node_remove_conn(dnode, tsk->portid);
                        }
-                       if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
-                               tipc_link_xmit(buf, dnode, 0);
+                       if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
+                               tipc_link_xmit_skb(skb, dnode, 0);
                }
        }
 
        tipc_sk_withdraw(tsk, 0, NULL);
-       tipc_sk_ref_discard(tsk->ref);
        k_cancel_timer(&tsk->timer);
+       tipc_sk_remove(tsk);
        if (tsk->connected) {
-               buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
+               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
                                      SHORT_H_SIZE, 0, dnode, tipc_own_addr,
                                      tsk_peer_port(tsk),
-                                     tsk->ref, TIPC_ERR_NO_PORT);
-               if (buf)
-                       tipc_link_xmit(buf, dnode, tsk->ref);
-               tipc_node_remove_conn(dnode, tsk->ref);
+                                     tsk->portid, TIPC_ERR_NO_PORT);
+               if (skb)
+                       tipc_link_xmit_skb(skb, dnode, tsk->portid);
+               tipc_node_remove_conn(dnode, tsk->portid);
        }
        k_term_timer(&tsk->timer);
 
@@ -510,7 +530,8 @@ static int tipc_release(struct socket *sock)
        /* Reject any messages that accumulated in backlog queue */
        sock->state = SS_DISCONNECTING;
        release_sock(sk);
-       sock_put(sk);
+
+       call_rcu(&tsk->rcu, tipc_sk_callback);
        sock->sk = NULL;
 
        return 0;
@@ -603,7 +624,7 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
                addr->addr.id.ref = tsk_peer_port(tsk);
                addr->addr.id.node = tsk_peer_node(tsk);
        } else {
-               addr->addr.id.ref = tsk->ref;
+               addr->addr.id.ref = tsk->portid;
                addr->addr.id.node = tipc_own_addr;
        }
 
@@ -692,7 +713,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
  * tipc_sendmcast - send multicast message
  * @sock: socket structure
  * @seq: destination address
- * @iov: message data to send
+ * @msg: message to send
  * @dsz: total length of message data
  * @timeo: timeout to wait for wakeup
  *
@@ -700,11 +721,11 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
  * Returns the number of bytes sent on success, or errno
  */
 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
-                         struct iovec *iov, size_t dsz, long timeo)
+                         struct msghdr *msg, size_t dsz, long timeo)
 {
        struct sock *sk = sock->sk;
        struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
-       struct sk_buff *buf;
+       struct sk_buff_head head;
        uint mtu;
        int rc;
 
@@ -719,12 +740,13 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 
 new_mtu:
        mtu = tipc_bclink_get_mtu();
-       rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
+       __skb_queue_head_init(&head);
+       rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
        if (unlikely(rc < 0))
                return rc;
 
        do {
-               rc = tipc_bclink_xmit(buf);
+               rc = tipc_bclink_xmit(&head);
                if (likely(rc >= 0)) {
                        rc = dsz;
                        break;
@@ -736,7 +758,7 @@ new_mtu:
                tipc_sk(sk)->link_cong = 1;
                rc = tipc_wait_for_sndmsg(sock, &timeo);
                if (rc)
-                       kfree_skb_list(buf);
+                       __skb_queue_purge(&head);
        } while (!rc);
        return rc;
 }
@@ -818,39 +840,6 @@ exit:
        return TIPC_OK;
 }
 
-/**
- * dest_name_check - verify user is permitted to send to specified port name
- * @dest: destination address
- * @m: descriptor for message to be sent
- *
- * Prevents restricted configuration commands from being issued by
- * unauthorized users.
- *
- * Returns 0 if permission is granted, otherwise errno
- */
-static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
-{
-       struct tipc_cfg_msg_hdr hdr;
-
-       if (unlikely(dest->addrtype == TIPC_ADDR_ID))
-               return 0;
-       if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
-               return 0;
-       if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
-               return 0;
-       if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
-               return -EACCES;
-
-       if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
-               return -EMSGSIZE;
-       if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
-               return -EFAULT;
-       if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
-               return -EACCES;
-
-       return 0;
-}
-
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
 {
        struct sock *sk = sock->sk;
@@ -897,13 +886,13 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *mhdr = &tsk->phdr;
-       struct iovec *iov = m->msg_iov;
        u32 dnode, dport;
-       struct sk_buff *buf;
+       struct sk_buff_head head;
+       struct sk_buff *skb;
        struct tipc_name_seq *seq = &dest->addr.nameseq;
        u32 mtu;
        long timeo;
-       int rc = -EINVAL;
+       int rc;
 
        if (unlikely(!dest))
                return -EDESTADDRREQ;
@@ -936,14 +925,11 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
                        tsk->conn_instance = dest->addr.name.name.instance;
                }
        }
-       rc = dest_name_check(dest, m);
-       if (rc)
-               goto exit;
 
        timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 
        if (dest->addrtype == TIPC_ADDR_MCAST) {
-               rc = tipc_sendmcast(sock, seq, iov, dsz, timeo);
+               rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
                goto exit;
        } else if (dest->addrtype == TIPC_ADDR_NAME) {
                u32 type = dest->addr.name.name.type;
@@ -973,14 +959,16 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
        }
 
 new_mtu:
-       mtu = tipc_node_get_mtu(dnode, tsk->ref);
-       rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
+       mtu = tipc_node_get_mtu(dnode, tsk->portid);
+       __skb_queue_head_init(&head);
+       rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
        if (rc < 0)
                goto exit;
 
        do {
-               TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong;
-               rc = tipc_link_xmit(buf, dnode, tsk->ref);
+               skb = skb_peek(&head);
+               TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
+               rc = tipc_link_xmit(&head, dnode, tsk->portid);
                if (likely(rc >= 0)) {
                        if (sock->state != SS_READY)
                                sock->state = SS_CONNECTING;
@@ -994,7 +982,7 @@ new_mtu:
                tsk->link_cong = 1;
                rc = tipc_wait_for_sndmsg(sock, &timeo);
                if (rc)
-                       kfree_skb_list(buf);
+                       __skb_queue_purge(&head);
        } while (!rc);
 exit:
        if (iocb)
@@ -1051,9 +1039,9 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *mhdr = &tsk->phdr;
-       struct sk_buff *buf;
+       struct sk_buff_head head;
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-       u32 ref = tsk->ref;
+       u32 portid = tsk->portid;
        int rc = -EINVAL;
        long timeo;
        u32 dnode;
@@ -1086,12 +1074,13 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
 next:
        mtu = tsk->max_pkt;
        send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-       rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf);
+       __skb_queue_head_init(&head);
+       rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
        if (unlikely(rc < 0))
                goto exit;
        do {
                if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_link_xmit(buf, dnode, ref);
+                       rc = tipc_link_xmit(&head, dnode, portid);
                        if (likely(!rc)) {
                                tsk->sent_unacked++;
                                sent += send;
@@ -1100,7 +1089,7 @@ next:
                                goto next;
                        }
                        if (rc == -EMSGSIZE) {
-                               tsk->max_pkt = tipc_node_get_mtu(dnode, ref);
+                               tsk->max_pkt = tipc_node_get_mtu(dnode, portid);
                                goto next;
                        }
                        if (rc != -ELINKCONG)
@@ -1109,7 +1098,7 @@ next:
                }
                rc = tipc_wait_for_sndpkt(sock, &timeo);
                if (rc)
-                       kfree_skb_list(buf);
+                       __skb_queue_purge(&head);
        } while (!rc);
 exit:
        if (iocb)
@@ -1154,8 +1143,8 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
        tsk->probing_state = TIPC_CONN_OK;
        tsk->connected = 1;
        k_start_timer(&tsk->timer, tsk->probing_interval);
-       tipc_node_add_conn(peer_node, tsk->ref, peer_port);
-       tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref);
+       tipc_node_add_conn(peer_node, tsk->portid, peer_port);
+       tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->portid);
 }
 
 /**
@@ -1254,20 +1243,20 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 
 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 {
-       struct sk_buff *buf = NULL;
+       struct sk_buff *skb = NULL;
        struct tipc_msg *msg;
        u32 peer_port = tsk_peer_port(tsk);
        u32 dnode = tsk_peer_node(tsk);
 
        if (!tsk->connected)
                return;
-       buf = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
-                             tipc_own_addr, peer_port, tsk->ref, TIPC_OK);
-       if (!buf)
+       skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
+                             tipc_own_addr, peer_port, tsk->portid, TIPC_OK);
+       if (!skb)
                return;
-       msg = buf_msg(buf);
+       msg = buf_msg(skb);
        msg_set_msgcnt(msg, ack);
-       tipc_link_xmit(buf, dnode, msg_link_selector(msg));
+       tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1372,8 +1361,7 @@ restart:
                        sz = buf_len;
                        m->msg_flags |= MSG_TRUNC;
                }
-               res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
-                                             m->msg_iov, sz);
+               res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
                if (res)
                        goto exit;
                res = sz;
@@ -1473,8 +1461,8 @@ restart:
                needed = (buf_len - sz_copied);
                sz_to_copy = (sz <= needed) ? sz : needed;
 
-               res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
-                                             m->msg_iov, sz_to_copy);
+               res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
+                                           m, sz_to_copy);
                if (res)
                        goto exit;
 
@@ -1556,7 +1544,7 @@ static void tipc_data_ready(struct sock *sk)
  * @tsk: TIPC socket
  * @msg: message
  *
- * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise
+ * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
  */
 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
 {
@@ -1577,7 +1565,7 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
                                tsk->connected = 0;
                                /* let timer expire on it's own */
                                tipc_node_remove_conn(tsk_peer_node(tsk),
-                                                     tsk->ref);
+                                                     tsk->portid);
                        }
                        retval = TIPC_OK;
                }
@@ -1723,20 +1711,20 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
 /**
  * tipc_backlog_rcv - handle incoming message from backlog queue
  * @sk: socket
- * @buf: message
+ * @skb: message
  *
  * Caller must hold socket lock, but not port lock.
  *
  * Returns 0
  */
-static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
+static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
        int rc;
        u32 onode;
        struct tipc_sock *tsk = tipc_sk(sk);
-       uint truesize = buf->truesize;
+       uint truesize = skb->truesize;
 
-       rc = filter_rcv(sk, buf);
+       rc = filter_rcv(sk, skb);
 
        if (likely(!rc)) {
                if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
@@ -1744,33 +1732,33 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
                return 0;
        }
 
-       if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
+       if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc))
                return 0;
 
-       tipc_link_xmit(buf, onode, 0);
+       tipc_link_xmit_skb(skb, onode, 0);
 
        return 0;
 }
 
 /**
  * tipc_sk_rcv - handle incoming message
- * @buf: buffer containing arriving message
+ * @skb: buffer containing arriving message
  * Consumes buffer
  * Returns 0 if success, or errno: -EHOSTUNREACH
  */
-int tipc_sk_rcv(struct sk_buff *buf)
+int tipc_sk_rcv(struct sk_buff *skb)
 {
        struct tipc_sock *tsk;
        struct sock *sk;
-       u32 dport = msg_destport(buf_msg(buf));
+       u32 dport = msg_destport(buf_msg(skb));
        int rc = TIPC_OK;
        uint limit;
        u32 dnode;
 
        /* Validate destination and message */
-       tsk = tipc_sk_get(dport);
+       tsk = tipc_sk_lookup(dport);
        if (unlikely(!tsk)) {
-               rc = tipc_msg_eval(buf, &dnode);
+               rc = tipc_msg_eval(skb, &dnode);
                goto exit;
        }
        sk = &tsk->sk;
@@ -1779,23 +1767,23 @@ int tipc_sk_rcv(struct sk_buff *buf)
        spin_lock_bh(&sk->sk_lock.slock);
 
        if (!sock_owned_by_user(sk)) {
-               rc = filter_rcv(sk, buf);
+               rc = filter_rcv(sk, skb);
        } else {
                if (sk->sk_backlog.len == 0)
                        atomic_set(&tsk->dupl_rcvcnt, 0);
-               limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
-               if (sk_add_backlog(sk, buf, limit))
+               limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
+               if (sk_add_backlog(sk, skb, limit))
                        rc = -TIPC_ERR_OVERLOAD;
        }
        spin_unlock_bh(&sk->sk_lock.slock);
-       tipc_sk_put(tsk);
+       sock_put(sk);
        if (likely(!rc))
                return 0;
 exit:
-       if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
+       if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
                return -EHOSTUNREACH;
 
-       tipc_link_xmit(buf, dnode, 0);
+       tipc_link_xmit_skb(skb, dnode, 0);
        return (rc < 0) ? -EHOSTUNREACH : 0;
 }
 
@@ -2053,7 +2041,7 @@ static int tipc_shutdown(struct socket *sock, int how)
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
-       struct sk_buff *buf;
+       struct sk_buff *skb;
        u32 dnode;
        int res;
 
@@ -2068,27 +2056,27 @@ static int tipc_shutdown(struct socket *sock, int how)
 
 restart:
                /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
-               buf = __skb_dequeue(&sk->sk_receive_queue);
-               if (buf) {
-                       if (TIPC_SKB_CB(buf)->handle != NULL) {
-                               kfree_skb(buf);
+               skb = __skb_dequeue(&sk->sk_receive_queue);
+               if (skb) {
+                       if (TIPC_SKB_CB(skb)->handle != NULL) {
+                               kfree_skb(skb);
                                goto restart;
                        }
-                       if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN))
-                               tipc_link_xmit(buf, dnode, tsk->ref);
-                       tipc_node_remove_conn(dnode, tsk->ref);
+                       if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
+                               tipc_link_xmit_skb(skb, dnode, tsk->portid);
+                       tipc_node_remove_conn(dnode, tsk->portid);
                } else {
                        dnode = tsk_peer_node(tsk);
-                       buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
+                       skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                              TIPC_CONN_MSG, SHORT_H_SIZE,
                                              0, dnode, tipc_own_addr,
                                              tsk_peer_port(tsk),
-                                             tsk->ref, TIPC_CONN_SHUTDOWN);
-                       tipc_link_xmit(buf, dnode, tsk->ref);
+                                             tsk->portid, TIPC_CONN_SHUTDOWN);
+                       tipc_link_xmit_skb(skb, dnode, tsk->portid);
                }
                tsk->connected = 0;
                sock->state = SS_DISCONNECTING;
-               tipc_node_remove_conn(dnode, tsk->ref);
+               tipc_node_remove_conn(dnode, tsk->portid);
                /* fall through */
 
        case SS_DISCONNECTING:
@@ -2109,14 +2097,14 @@ restart:
        return res;
 }
 
-static void tipc_sk_timeout(unsigned long ref)
+static void tipc_sk_timeout(unsigned long portid)
 {
        struct tipc_sock *tsk;
        struct sock *sk;
-       struct sk_buff *buf = NULL;
+       struct sk_buff *skb = NULL;
        u32 peer_port, peer_node;
 
-       tsk = tipc_sk_get(ref);
+       tsk = tipc_sk_lookup(portid);
        if (!tsk)
                return;
 
@@ -2131,22 +2119,22 @@ static void tipc_sk_timeout(unsigned long ref)
 
        if (tsk->probing_state == TIPC_CONN_PROBING) {
                /* Previous probe not answered -> self abort */
-               buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
+               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
                                      SHORT_H_SIZE, 0, tipc_own_addr,
-                                     peer_node, ref, peer_port,
+                                     peer_node, portid, peer_port,
                                      TIPC_ERR_NO_PORT);
        } else {
-               buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
+               skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
                                      0, peer_node, tipc_own_addr,
-                                     peer_port, ref, TIPC_OK);
+                                     peer_port, portid, TIPC_OK);
                tsk->probing_state = TIPC_CONN_PROBING;
                k_start_timer(&tsk->timer, tsk->probing_interval);
        }
        bh_unlock_sock(sk);
-       if (buf)
-               tipc_link_xmit(buf, peer_node, ref);
+       if (skb)
+               tipc_link_xmit_skb(skb, peer_node, portid);
 exit:
-       tipc_sk_put(tsk);
+       sock_put(sk);
 }
 
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
@@ -2157,12 +2145,12 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
 
        if (tsk->connected)
                return -EINVAL;
-       key = tsk->ref + tsk->pub_count + 1;
-       if (key == tsk->ref)
+       key = tsk->portid + tsk->pub_count + 1;
+       if (key == tsk->portid)
                return -EADDRINUSE;
 
        publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
-                                   scope, tsk->ref, key);
+                                   scope, tsk->portid, key);
        if (unlikely(!publ))
                return -EINVAL;
 
@@ -2213,9 +2201,9 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
                ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
                                    tipc_zone(tipc_own_addr),
                                    tipc_cluster(tipc_own_addr),
-                                   tipc_node(tipc_own_addr), tsk->ref);
+                                   tipc_node(tipc_own_addr), tsk->portid);
        else
-               ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref);
+               ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
 
        if (tsk->connected) {
                u32 dport = tsk_peer_port(tsk);
@@ -2249,13 +2237,15 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
 
 struct sk_buff *tipc_sk_socks_show(void)
 {
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
        struct sk_buff *buf;
        struct tlv_desc *rep_tlv;
        char *pb;
        int pb_len;
        struct tipc_sock *tsk;
        int str_len = 0;
-       u32 ref = 0;
+       int i;
 
        buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
        if (!buf)
@@ -2264,14 +2254,18 @@ struct sk_buff *tipc_sk_socks_show(void)
        pb = TLV_DATA(rep_tlv);
        pb_len = ULTRA_STRING_MAX_LEN;
 
-       tsk = tipc_sk_get_next(&ref);
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               str_len += tipc_sk_show(tsk, pb + str_len,
-                                       pb_len - str_len, 0);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
+       for (i = 0; i < tbl->size; i++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       str_len += tipc_sk_show(tsk, pb + str_len,
+                                               pb_len - str_len, 0);
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+               }
        }
+       rcu_read_unlock();
+
        str_len += 1;   /* for "\0" */
        skb_put(buf, TLV_SPACE(str_len));
        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
@@ -2284,255 +2278,91 @@ struct sk_buff *tipc_sk_socks_show(void)
  */
 void tipc_sk_reinit(void)
 {
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
+       struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       u32 ref = 0;
-       struct tipc_sock *tsk = tipc_sk_get_next(&ref);
+       int i;
 
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               msg = &tsk->phdr;
-               msg_set_prevnode(msg, tipc_own_addr);
-               msg_set_orignode(msg, tipc_own_addr);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
+       for (i = 0; i < tbl->size; i++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       msg = &tsk->phdr;
+                       msg_set_prevnode(msg, tipc_own_addr);
+                       msg_set_orignode(msg, tipc_own_addr);
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+               }
        }
+       rcu_read_unlock();
 }
 
-/**
- * struct reference - TIPC socket reference entry
- * @tsk: pointer to socket associated with reference entry
- * @ref: reference value for socket (combines instance & array index info)
- */
-struct reference {
-       struct tipc_sock *tsk;
-       u32 ref;
-};
-
-/**
- * struct tipc_ref_table - table of TIPC socket reference entries
- * @entries: pointer to array of reference entries
- * @capacity: array index of first unusable entry
- * @init_point: array index of first uninitialized entry
- * @first_free: array index of first unused socket reference entry
- * @last_free: array index of last unused socket reference entry
- * @index_mask: bitmask for array index portion of reference values
- * @start_mask: initial value for instance value portion of reference values
- */
-struct ref_table {
-       struct reference *entries;
-       u32 capacity;
-       u32 init_point;
-       u32 first_free;
-       u32 last_free;
-       u32 index_mask;
-       u32 start_mask;
-};
-
-/* Socket reference table consists of 2**N entries.
- *
- * State       Socket ptr      Reference
- * -----        ----------      ---------
- * In use        non-NULL       XXXX|own index
- *                             (XXXX changes each time entry is acquired)
- * Free            NULL         YYYY|next free index
- *                             (YYYY is one more than last used XXXX)
- * Uninitialized   NULL         0
- *
- * Entry 0 is not used; this allows index 0 to denote the end of the free list.
- *
- * Note that a reference value of 0 does not necessarily indicate that an
- * entry is uninitialized, since the last entry in the free list could also
- * have a reference value of 0 (although this is unlikely).
- */
-
-static struct ref_table tipc_ref_table;
-
-static DEFINE_RWLOCK(ref_table_lock);
-
-/**
- * tipc_ref_table_init - create reference table for sockets
- */
-int tipc_sk_ref_table_init(u32 req_sz, u32 start)
+static struct tipc_sock *tipc_sk_lookup(u32 portid)
 {
-       struct reference *table;
-       u32 actual_sz;
-
-       /* account for unused entry, then round up size to a power of 2 */
-
-       req_sz++;
-       for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) {
-               /* do nothing */
-       };
-
-       /* allocate table & mark all entries as uninitialized */
-       table = vzalloc(actual_sz * sizeof(struct reference));
-       if (table == NULL)
-               return -ENOMEM;
-
-       tipc_ref_table.entries = table;
-       tipc_ref_table.capacity = req_sz;
-       tipc_ref_table.init_point = 1;
-       tipc_ref_table.first_free = 0;
-       tipc_ref_table.last_free = 0;
-       tipc_ref_table.index_mask = actual_sz - 1;
-       tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
+       struct tipc_sock *tsk;
 
-       return 0;
-}
+       rcu_read_lock();
+       tsk = rhashtable_lookup(&tipc_sk_rht, &portid);
+       if (tsk)
+               sock_hold(&tsk->sk);
+       rcu_read_unlock();
 
-/**
- * tipc_ref_table_stop - destroy reference table for sockets
- */
-void tipc_sk_ref_table_stop(void)
-{
-       if (!tipc_ref_table.entries)
-               return;
-       vfree(tipc_ref_table.entries);
-       tipc_ref_table.entries = NULL;
+       return tsk;
 }
 
-/* tipc_ref_acquire - create reference to a socket
- *
- * Register an socket pointer in the reference table.
- * Returns a unique reference value that is used from then on to retrieve the
- * socket pointer, or to determine if the socket has been deregistered.
- */
-u32 tipc_sk_ref_acquire(struct tipc_sock *tsk)
+static int tipc_sk_insert(struct tipc_sock *tsk)
 {
-       u32 index;
-       u32 index_mask;
-       u32 next_plus_upper;
-       u32 ref = 0;
-       struct reference *entry;
-
-       if (unlikely(!tsk)) {
-               pr_err("Attempt to acquire ref. to non-existent obj\n");
-               return 0;
-       }
-       if (unlikely(!tipc_ref_table.entries)) {
-               pr_err("Ref. table not found in acquisition attempt\n");
-               return 0;
-       }
+       u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
+       u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
 
-       /* Take a free entry, if available; otherwise initialize a new one */
-       write_lock_bh(&ref_table_lock);
-       index = tipc_ref_table.first_free;
-       entry = &tipc_ref_table.entries[index];
-
-       if (likely(index)) {
-               index = tipc_ref_table.first_free;
-               entry = &tipc_ref_table.entries[index];
-               index_mask = tipc_ref_table.index_mask;
-               next_plus_upper = entry->ref;
-               tipc_ref_table.first_free = next_plus_upper & index_mask;
-               ref = (next_plus_upper & ~index_mask) + index;
-               entry->tsk = tsk;
-       } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
-               index = tipc_ref_table.init_point++;
-               entry = &tipc_ref_table.entries[index];
-               ref = tipc_ref_table.start_mask + index;
+       while (remaining--) {
+               portid++;
+               if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
+                       portid = TIPC_MIN_PORT;
+               tsk->portid = portid;
+               sock_hold(&tsk->sk);
+               if (rhashtable_lookup_insert(&tipc_sk_rht, &tsk->node))
+                       return 0;
+               sock_put(&tsk->sk);
        }
 
-       if (ref) {
-               entry->ref = ref;
-               entry->tsk = tsk;
-       }
-       write_unlock_bh(&ref_table_lock);
-       return ref;
+       return -1;
 }
 
-/* tipc_sk_ref_discard - invalidate reference to an socket
- *
- * Disallow future references to an socket and free up the entry for re-use.
- */
-void tipc_sk_ref_discard(u32 ref)
+static void tipc_sk_remove(struct tipc_sock *tsk)
 {
-       struct reference *entry;
-       u32 index;
-       u32 index_mask;
-
-       if (unlikely(!tipc_ref_table.entries)) {
-               pr_err("Ref. table not found during discard attempt\n");
-               return;
-       }
-
-       index_mask = tipc_ref_table.index_mask;
-       index = ref & index_mask;
-       entry = &tipc_ref_table.entries[index];
-
-       write_lock_bh(&ref_table_lock);
+       struct sock *sk = &tsk->sk;
 
-       if (unlikely(!entry->tsk)) {
-               pr_err("Attempt to discard ref. to non-existent socket\n");
-               goto exit;
+       if (rhashtable_remove(&tipc_sk_rht, &tsk->node)) {
+               WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
+               __sock_put(sk);
        }
-       if (unlikely(entry->ref != ref)) {
-               pr_err("Attempt to discard non-existent reference\n");
-               goto exit;
-       }
-
-       /* Mark entry as unused; increment instance part of entry's
-        *   reference to invalidate any subsequent references
-        */
-
-       entry->tsk = NULL;
-       entry->ref = (ref & ~index_mask) + (index_mask + 1);
-
-       /* Append entry to free entry list */
-       if (unlikely(tipc_ref_table.first_free == 0))
-               tipc_ref_table.first_free = index;
-       else
-               tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
-       tipc_ref_table.last_free = index;
-exit:
-       write_unlock_bh(&ref_table_lock);
 }
 
-/* tipc_sk_get - find referenced socket and return pointer to it
- */
-struct tipc_sock *tipc_sk_get(u32 ref)
+int tipc_sk_rht_init(void)
 {
-       struct reference *entry;
-       struct tipc_sock *tsk;
+       struct rhashtable_params rht_params = {
+               .nelem_hint = 192,
+               .head_offset = offsetof(struct tipc_sock, node),
+               .key_offset = offsetof(struct tipc_sock, portid),
+               .key_len = sizeof(u32), /* portid */
+               .hashfn = jhash,
+               .max_shift = 20, /* 1M */
+               .min_shift = 8,  /* 256 */
+               .grow_decision = rht_grow_above_75,
+               .shrink_decision = rht_shrink_below_30,
+       };
 
-       if (unlikely(!tipc_ref_table.entries))
-               return NULL;
-       read_lock_bh(&ref_table_lock);
-       entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
-       tsk = entry->tsk;
-       if (likely(tsk && (entry->ref == ref)))
-               sock_hold(&tsk->sk);
-       else
-               tsk = NULL;
-       read_unlock_bh(&ref_table_lock);
-       return tsk;
+       return rhashtable_init(&tipc_sk_rht, &rht_params);
 }
 
-/* tipc_sk_get_next - lock & return next socket after referenced one
-*/
-struct tipc_sock *tipc_sk_get_next(u32 *ref)
+void tipc_sk_rht_destroy(void)
 {
-       struct reference *entry;
-       struct tipc_sock *tsk = NULL;
-       uint index = *ref & tipc_ref_table.index_mask;
-
-       read_lock_bh(&ref_table_lock);
-       while (++index < tipc_ref_table.capacity) {
-               entry = &tipc_ref_table.entries[index];
-               if (!entry->tsk)
-                       continue;
-               tsk = entry->tsk;
-               sock_hold(&tsk->sk);
-               *ref = entry->ref;
-               break;
-       }
-       read_unlock_bh(&ref_table_lock);
-       return tsk;
-}
+       /* Wait for socket readers to complete */
+       synchronize_net();
 
-static void tipc_sk_put(struct tipc_sock *tsk)
-{
-       sock_put(&tsk->sk);
+       rhashtable_destroy(&tipc_sk_rht);
 }
 
 /**
@@ -2802,3 +2632,240 @@ void tipc_socket_stop(void)
        sock_unregister(tipc_family_ops.family);
        proto_unregister(&tipc_proto);
 }
+
+/* Caller should hold socket lock for the passed tipc socket. */
+static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
+{
+       u32 peer_node;
+       u32 peer_port;
+       struct nlattr *nest;
+
+       peer_node = tsk_peer_node(tsk);
+       peer_port = tsk_peer_port(tsk);
+
+       nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
+
+       if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
+               goto msg_full;
+       if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
+               goto msg_full;
+
+       if (tsk->conn_type != 0) {
+               if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
+                       goto msg_full;
+               if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
+                       goto msg_full;
+               if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
+                       goto msg_full;
+       }
+       nla_nest_end(skb, nest);
+
+       return 0;
+
+msg_full:
+       nla_nest_cancel(skb, nest);
+
+       return -EMSGSIZE;
+}
+
+/* Caller should hold socket lock for the passed tipc socket. */
+static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
+                           struct tipc_sock *tsk)
+{
+       int err;
+       void *hdr;
+       struct nlattr *attrs;
+
+       hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
+                         &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
+       if (!hdr)
+               goto msg_cancel;
+
+       attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
+       if (!attrs)
+               goto genlmsg_cancel;
+       if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
+               goto attr_msg_cancel;
+       if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr))
+               goto attr_msg_cancel;
+
+       if (tsk->connected) {
+               err = __tipc_nl_add_sk_con(skb, tsk);
+               if (err)
+                       goto attr_msg_cancel;
+       } else if (!list_empty(&tsk->publications)) {
+               if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
+                       goto attr_msg_cancel;
+       }
+       nla_nest_end(skb, attrs);
+       genlmsg_end(skb, hdr);
+
+       return 0;
+
+attr_msg_cancel:
+       nla_nest_cancel(skb, attrs);
+genlmsg_cancel:
+       genlmsg_cancel(skb, hdr);
+msg_cancel:
+       return -EMSGSIZE;
+}
+
+int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
+{
+       int err;
+       struct tipc_sock *tsk;
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
+       u32 prev_portid = cb->args[0];
+       u32 portid = prev_portid;
+       int i;
+
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
+       for (i = 0; i < tbl->size; i++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       portid = tsk->portid;
+                       err = __tipc_nl_add_sk(skb, cb, tsk);
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+                       if (err)
+                               break;
+
+                       prev_portid = portid;
+               }
+       }
+       rcu_read_unlock();
+
+       cb->args[0] = prev_portid;
+
+       return skb->len;
+}
+
+/* Caller should hold socket lock for the passed tipc socket. */
+static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
+                                struct netlink_callback *cb,
+                                struct publication *publ)
+{
+       void *hdr;
+       struct nlattr *attrs;
+
+       hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
+                         &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
+       if (!hdr)
+               goto msg_cancel;
+
+       attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
+       if (!attrs)
+               goto genlmsg_cancel;
+
+       if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
+               goto attr_msg_cancel;
+       if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
+               goto attr_msg_cancel;
+       if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
+               goto attr_msg_cancel;
+       if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
+               goto attr_msg_cancel;
+
+       nla_nest_end(skb, attrs);
+       genlmsg_end(skb, hdr);
+
+       return 0;
+
+attr_msg_cancel:
+       nla_nest_cancel(skb, attrs);
+genlmsg_cancel:
+       genlmsg_cancel(skb, hdr);
+msg_cancel:
+       return -EMSGSIZE;
+}
+
+/* Caller should hold socket lock for the passed tipc socket. */
+static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
+                                 struct netlink_callback *cb,
+                                 struct tipc_sock *tsk, u32 *last_publ)
+{
+       int err;
+       struct publication *p;
+
+       if (*last_publ) {
+               list_for_each_entry(p, &tsk->publications, pport_list) {
+                       if (p->key == *last_publ)
+                               break;
+               }
+               if (p->key != *last_publ) {
+                       /* We never set seq or call nl_dump_check_consistent()
+                        * this means that setting prev_seq here will cause the
+                        * consistence check to fail in the netlink callback
+                        * handler. Resulting in the last NLMSG_DONE message
+                        * having the NLM_F_DUMP_INTR flag set.
+                        */
+                       cb->prev_seq = 1;
+                       *last_publ = 0;
+                       return -EPIPE;
+               }
+       } else {
+               p = list_first_entry(&tsk->publications, struct publication,
+                                    pport_list);
+       }
+
+       list_for_each_entry_from(p, &tsk->publications, pport_list) {
+               err = __tipc_nl_add_sk_publ(skb, cb, p);
+               if (err) {
+                       *last_publ = p->key;
+                       return err;
+               }
+       }
+       *last_publ = 0;
+
+       return 0;
+}
+
+int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
+{
+       int err;
+       u32 tsk_portid = cb->args[0];
+       u32 last_publ = cb->args[1];
+       u32 done = cb->args[2];
+       struct tipc_sock *tsk;
+
+       if (!tsk_portid) {
+               struct nlattr **attrs;
+               struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
+
+               err = tipc_nlmsg_parse(cb->nlh, &attrs);
+               if (err)
+                       return err;
+
+               err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
+                                      attrs[TIPC_NLA_SOCK],
+                                      tipc_nl_sock_policy);
+               if (err)
+                       return err;
+
+               if (!sock[TIPC_NLA_SOCK_REF])
+                       return -EINVAL;
+
+               tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
+       }
+
+       if (done)
+               return 0;
+
+       tsk = tipc_sk_lookup(tsk_portid);
+       if (!tsk)
+               return -EINVAL;
+
+       lock_sock(&tsk->sk);
+       err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
+       if (!err)
+               done = 1;
+       release_sock(&tsk->sk);
+       sock_put(&tsk->sk);
+
+       cb->args[0] = tsk_portid;
+       cb->args[1] = last_publ;
+       cb->args[2] = done;
+
+       return skb->len;
+}