tipc: convert tipc reference table to use generic rhashtable
[firefly-linux-kernel-4.4.55.git] / net / tipc / socket.c
index 4731cad99d1cc8896cc0d6734f6eb48af3135207..701f31bbbbfb727202e2cc72e9a5a940c7014bc5 100644 (file)
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <linux/rhashtable.h>
+#include <linux/jhash.h>
 #include "core.h"
 #include "name_table.h"
 #include "node.h"
 #include "link.h"
-#include <linux/export.h>
 #include "config.h"
 #include "socket.h"
 
-#define SS_LISTENING   -1      /* socket is listening */
-#define SS_READY       -2      /* socket is connectionless */
+#define SS_LISTENING           -1      /* socket is listening */
+#define SS_READY               -2      /* socket is connectionless */
 
-#define CONN_TIMEOUT_DEFAULT  8000     /* default connect timeout = 8s */
-#define CONN_PROBING_INTERVAL 3600000  /* [ms] => 1 h */
-#define TIPC_FWD_MSG         1
-#define TIPC_CONN_OK          0
-#define TIPC_CONN_PROBING     1
+#define CONN_TIMEOUT_DEFAULT   8000    /* default connect timeout = 8s */
+#define CONN_PROBING_INTERVAL  3600000 /* [ms] => 1 h */
+#define TIPC_FWD_MSG           1
+#define TIPC_CONN_OK           0
+#define TIPC_CONN_PROBING      1
+#define TIPC_MAX_PORT          0xffffffff
+#define TIPC_MIN_PORT          1
 
 /**
  * struct tipc_sock - TIPC socket structure
@@ -59,7 +62,7 @@
  * @conn_instance: TIPC instance used when connection was established
  * @published: non-zero if port has one or more associated names
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
- * @ref: unique reference to port in TIPC object registry
+ * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
  * @port_list: adjacent ports in TIPC's global list of ports
  * @publications: list of publications for port
@@ -74,6 +77,8 @@
  * @link_cong: non-zero if owner must sleep because of link congestion
  * @sent_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
+ * @node: hash table node
+ * @rcu: rcu struct for tipc_sock
  */
 struct tipc_sock {
        struct sock sk;
@@ -82,7 +87,7 @@ struct tipc_sock {
        u32 conn_instance;
        int published;
        u32 max_pkt;
-       u32 ref;
+       u32 portid;
        struct tipc_msg phdr;
        struct list_head sock_list;
        struct list_head publications;
@@ -95,6 +100,8 @@ struct tipc_sock {
        bool link_cong;
        uint sent_unacked;
        uint rcv_unacked;
+       struct rhash_head node;
+       struct rcu_head rcu;
 };
 
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -103,16 +110,14 @@ static void tipc_write_space(struct sock *sk);
 static int tipc_release(struct socket *sock);
 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
-static void tipc_sk_timeout(unsigned long ref);
+static void tipc_sk_timeout(unsigned long portid);
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq);
 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
                            struct tipc_name_seq const *seq);
-static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk);
-static void tipc_sk_ref_discard(u32 ref);
-static struct tipc_sock *tipc_sk_get(u32 ref);
-static struct tipc_sock *tipc_sk_get_next(u32 *ref);
-static void tipc_sk_put(struct tipc_sock *tsk);
+static struct tipc_sock *tipc_sk_lookup(u32 portid);
+static int tipc_sk_insert(struct tipc_sock *tsk);
+static void tipc_sk_remove(struct tipc_sock *tsk);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -174,6 +179,9 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
  *   - port reference
  */
 
+/* Protects tipc socket hash table mutations */
+static struct rhashtable tipc_sk_rht;
+
 static u32 tsk_peer_node(struct tipc_sock *tsk)
 {
        return msg_destnode(&tsk->phdr);
@@ -305,7 +313,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        struct sock *sk;
        struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       u32 ref;
 
        /* Validate arguments */
        if (unlikely(protocol != 0))
@@ -339,24 +346,22 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                return -ENOMEM;
 
        tsk = tipc_sk(sk);
-       ref = tipc_sk_ref_acquire(tsk);
-       if (!ref) {
-               pr_warn("Socket create failed; reference table exhausted\n");
-               return -ENOMEM;
-       }
        tsk->max_pkt = MAX_PKT_DEFAULT;
-       tsk->ref = ref;
        INIT_LIST_HEAD(&tsk->publications);
        msg = &tsk->phdr;
        tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
                      NAMED_H_SIZE, 0);
-       msg_set_origport(msg, ref);
 
        /* Finish initializing socket data structures */
        sock->ops = ops;
        sock->state = state;
        sock_init_data(sock, sk);
-       k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref);
+       if (tipc_sk_insert(tsk)) {
+               pr_warn("Socket create failed; port numbrer exhausted\n");
+               return -EINVAL;
+       }
+       msg_set_origport(msg, tsk->portid);
+       k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, tsk->portid);
        sk->sk_backlog_rcv = tipc_backlog_rcv;
        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
        sk->sk_data_ready = tipc_data_ready;
@@ -442,6 +447,13 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
        return ret;
 }
 
+static void tipc_sk_callback(struct rcu_head *head)
+{
+       struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
+
+       sock_put(&tsk->sk);
+}
+
 /**
  * tipc_release - destroy a TIPC socket
  * @sock: socket to destroy
@@ -491,7 +503,7 @@ static int tipc_release(struct socket *sock)
                            (sock->state == SS_CONNECTED)) {
                                sock->state = SS_DISCONNECTING;
                                tsk->connected = 0;
-                               tipc_node_remove_conn(dnode, tsk->ref);
+                               tipc_node_remove_conn(dnode, tsk->portid);
                        }
                        if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
                                tipc_link_xmit_skb(skb, dnode, 0);
@@ -499,16 +511,16 @@ static int tipc_release(struct socket *sock)
        }
 
        tipc_sk_withdraw(tsk, 0, NULL);
-       tipc_sk_ref_discard(tsk->ref);
        k_cancel_timer(&tsk->timer);
+       tipc_sk_remove(tsk);
        if (tsk->connected) {
                skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
                                      SHORT_H_SIZE, 0, dnode, tipc_own_addr,
                                      tsk_peer_port(tsk),
-                                     tsk->ref, TIPC_ERR_NO_PORT);
+                                     tsk->portid, TIPC_ERR_NO_PORT);
                if (skb)
-                       tipc_link_xmit_skb(skb, dnode, tsk->ref);
-               tipc_node_remove_conn(dnode, tsk->ref);
+                       tipc_link_xmit_skb(skb, dnode, tsk->portid);
+               tipc_node_remove_conn(dnode, tsk->portid);
        }
        k_term_timer(&tsk->timer);
 
@@ -518,7 +530,8 @@ static int tipc_release(struct socket *sock)
        /* Reject any messages that accumulated in backlog queue */
        sock->state = SS_DISCONNECTING;
        release_sock(sk);
-       sock_put(sk);
+
+       call_rcu(&tsk->rcu, tipc_sk_callback);
        sock->sk = NULL;
 
        return 0;
@@ -611,7 +624,7 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
                addr->addr.id.ref = tsk_peer_port(tsk);
                addr->addr.id.node = tsk_peer_node(tsk);
        } else {
-               addr->addr.id.ref = tsk->ref;
+               addr->addr.id.ref = tsk->portid;
                addr->addr.id.node = tipc_own_addr;
        }
 
@@ -946,7 +959,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
        }
 
 new_mtu:
-       mtu = tipc_node_get_mtu(dnode, tsk->ref);
+       mtu = tipc_node_get_mtu(dnode, tsk->portid);
        __skb_queue_head_init(&head);
        rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
        if (rc < 0)
@@ -955,7 +968,7 @@ new_mtu:
        do {
                skb = skb_peek(&head);
                TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-               rc = tipc_link_xmit(&head, dnode, tsk->ref);
+               rc = tipc_link_xmit(&head, dnode, tsk->portid);
                if (likely(rc >= 0)) {
                        if (sock->state != SS_READY)
                                sock->state = SS_CONNECTING;
@@ -1028,7 +1041,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
        struct tipc_msg *mhdr = &tsk->phdr;
        struct sk_buff_head head;
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-       u32 ref = tsk->ref;
+       u32 portid = tsk->portid;
        int rc = -EINVAL;
        long timeo;
        u32 dnode;
@@ -1067,7 +1080,7 @@ next:
                goto exit;
        do {
                if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_link_xmit(&head, dnode, ref);
+                       rc = tipc_link_xmit(&head, dnode, portid);
                        if (likely(!rc)) {
                                tsk->sent_unacked++;
                                sent += send;
@@ -1076,7 +1089,7 @@ next:
                                goto next;
                        }
                        if (rc == -EMSGSIZE) {
-                               tsk->max_pkt = tipc_node_get_mtu(dnode, ref);
+                               tsk->max_pkt = tipc_node_get_mtu(dnode, portid);
                                goto next;
                        }
                        if (rc != -ELINKCONG)
@@ -1130,8 +1143,8 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
        tsk->probing_state = TIPC_CONN_OK;
        tsk->connected = 1;
        k_start_timer(&tsk->timer, tsk->probing_interval);
-       tipc_node_add_conn(peer_node, tsk->ref, peer_port);
-       tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref);
+       tipc_node_add_conn(peer_node, tsk->portid, peer_port);
+       tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->portid);
 }
 
 /**
@@ -1238,7 +1251,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
        if (!tsk->connected)
                return;
        skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
-                             tipc_own_addr, peer_port, tsk->ref, TIPC_OK);
+                             tipc_own_addr, peer_port, tsk->portid, TIPC_OK);
        if (!skb)
                return;
        msg = buf_msg(skb);
@@ -1552,7 +1565,7 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
                                tsk->connected = 0;
                                /* let timer expire on it's own */
                                tipc_node_remove_conn(tsk_peer_node(tsk),
-                                                     tsk->ref);
+                                                     tsk->portid);
                        }
                        retval = TIPC_OK;
                }
@@ -1743,7 +1756,7 @@ int tipc_sk_rcv(struct sk_buff *skb)
        u32 dnode;
 
        /* Validate destination and message */
-       tsk = tipc_sk_get(dport);
+       tsk = tipc_sk_lookup(dport);
        if (unlikely(!tsk)) {
                rc = tipc_msg_eval(skb, &dnode);
                goto exit;
@@ -1763,7 +1776,7 @@ int tipc_sk_rcv(struct sk_buff *skb)
                        rc = -TIPC_ERR_OVERLOAD;
        }
        spin_unlock_bh(&sk->sk_lock.slock);
-       tipc_sk_put(tsk);
+       sock_put(sk);
        if (likely(!rc))
                return 0;
 exit:
@@ -2050,20 +2063,20 @@ restart:
                                goto restart;
                        }
                        if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
-                               tipc_link_xmit_skb(skb, dnode, tsk->ref);
-                       tipc_node_remove_conn(dnode, tsk->ref);
+                               tipc_link_xmit_skb(skb, dnode, tsk->portid);
+                       tipc_node_remove_conn(dnode, tsk->portid);
                } else {
                        dnode = tsk_peer_node(tsk);
                        skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                              TIPC_CONN_MSG, SHORT_H_SIZE,
                                              0, dnode, tipc_own_addr,
                                              tsk_peer_port(tsk),
-                                             tsk->ref, TIPC_CONN_SHUTDOWN);
-                       tipc_link_xmit_skb(skb, dnode, tsk->ref);
+                                             tsk->portid, TIPC_CONN_SHUTDOWN);
+                       tipc_link_xmit_skb(skb, dnode, tsk->portid);
                }
                tsk->connected = 0;
                sock->state = SS_DISCONNECTING;
-               tipc_node_remove_conn(dnode, tsk->ref);
+               tipc_node_remove_conn(dnode, tsk->portid);
                /* fall through */
 
        case SS_DISCONNECTING:
@@ -2084,14 +2097,14 @@ restart:
        return res;
 }
 
-static void tipc_sk_timeout(unsigned long ref)
+static void tipc_sk_timeout(unsigned long portid)
 {
        struct tipc_sock *tsk;
        struct sock *sk;
        struct sk_buff *skb = NULL;
        u32 peer_port, peer_node;
 
-       tsk = tipc_sk_get(ref);
+       tsk = tipc_sk_lookup(portid);
        if (!tsk)
                return;
 
@@ -2108,20 +2121,20 @@ static void tipc_sk_timeout(unsigned long ref)
                /* Previous probe not answered -> self abort */
                skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
                                      SHORT_H_SIZE, 0, tipc_own_addr,
-                                     peer_node, ref, peer_port,
+                                     peer_node, portid, peer_port,
                                      TIPC_ERR_NO_PORT);
        } else {
                skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
                                      0, peer_node, tipc_own_addr,
-                                     peer_port, ref, TIPC_OK);
+                                     peer_port, portid, TIPC_OK);
                tsk->probing_state = TIPC_CONN_PROBING;
                k_start_timer(&tsk->timer, tsk->probing_interval);
        }
        bh_unlock_sock(sk);
        if (skb)
-               tipc_link_xmit_skb(skb, peer_node, ref);
+               tipc_link_xmit_skb(skb, peer_node, portid);
 exit:
-       tipc_sk_put(tsk);
+       sock_put(sk);
 }
 
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
@@ -2132,12 +2145,12 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
 
        if (tsk->connected)
                return -EINVAL;
-       key = tsk->ref + tsk->pub_count + 1;
-       if (key == tsk->ref)
+       key = tsk->portid + tsk->pub_count + 1;
+       if (key == tsk->portid)
                return -EADDRINUSE;
 
        publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
-                                   scope, tsk->ref, key);
+                                   scope, tsk->portid, key);
        if (unlikely(!publ))
                return -EINVAL;
 
@@ -2188,9 +2201,9 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
                ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
                                    tipc_zone(tipc_own_addr),
                                    tipc_cluster(tipc_own_addr),
-                                   tipc_node(tipc_own_addr), tsk->ref);
+                                   tipc_node(tipc_own_addr), tsk->portid);
        else
-               ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref);
+               ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
 
        if (tsk->connected) {
                u32 dport = tsk_peer_port(tsk);
@@ -2224,13 +2237,15 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
 
 struct sk_buff *tipc_sk_socks_show(void)
 {
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
        struct sk_buff *buf;
        struct tlv_desc *rep_tlv;
        char *pb;
        int pb_len;
        struct tipc_sock *tsk;
        int str_len = 0;
-       u32 ref = 0;
+       int i;
 
        buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
        if (!buf)
@@ -2239,14 +2254,18 @@ struct sk_buff *tipc_sk_socks_show(void)
        pb = TLV_DATA(rep_tlv);
        pb_len = ULTRA_STRING_MAX_LEN;
 
-       tsk = tipc_sk_get_next(&ref);
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               str_len += tipc_sk_show(tsk, pb + str_len,
-                                       pb_len - str_len, 0);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
+       for (i = 0; i < tbl->size; i++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       str_len += tipc_sk_show(tsk, pb + str_len,
+                                               pb_len - str_len, 0);
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+               }
        }
+       rcu_read_unlock();
+
        str_len += 1;   /* for "\0" */
        skb_put(buf, TLV_SPACE(str_len));
        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
@@ -2259,255 +2278,91 @@ struct sk_buff *tipc_sk_socks_show(void)
  */
 void tipc_sk_reinit(void)
 {
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
+       struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       u32 ref = 0;
-       struct tipc_sock *tsk = tipc_sk_get_next(&ref);
+       int i;
 
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               msg = &tsk->phdr;
-               msg_set_prevnode(msg, tipc_own_addr);
-               msg_set_orignode(msg, tipc_own_addr);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
+       for (i = 0; i < tbl->size; i++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       msg = &tsk->phdr;
+                       msg_set_prevnode(msg, tipc_own_addr);
+                       msg_set_orignode(msg, tipc_own_addr);
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+               }
        }
+       rcu_read_unlock();
 }
 
-/**
- * struct reference - TIPC socket reference entry
- * @tsk: pointer to socket associated with reference entry
- * @ref: reference value for socket (combines instance & array index info)
- */
-struct reference {
-       struct tipc_sock *tsk;
-       u32 ref;
-};
-
-/**
- * struct tipc_ref_table - table of TIPC socket reference entries
- * @entries: pointer to array of reference entries
- * @capacity: array index of first unusable entry
- * @init_point: array index of first uninitialized entry
- * @first_free: array index of first unused socket reference entry
- * @last_free: array index of last unused socket reference entry
- * @index_mask: bitmask for array index portion of reference values
- * @start_mask: initial value for instance value portion of reference values
- */
-struct ref_table {
-       struct reference *entries;
-       u32 capacity;
-       u32 init_point;
-       u32 first_free;
-       u32 last_free;
-       u32 index_mask;
-       u32 start_mask;
-};
-
-/* Socket reference table consists of 2**N entries.
- *
- * State       Socket ptr      Reference
- * -----        ----------      ---------
- * In use        non-NULL       XXXX|own index
- *                             (XXXX changes each time entry is acquired)
- * Free            NULL         YYYY|next free index
- *                             (YYYY is one more than last used XXXX)
- * Uninitialized   NULL         0
- *
- * Entry 0 is not used; this allows index 0 to denote the end of the free list.
- *
- * Note that a reference value of 0 does not necessarily indicate that an
- * entry is uninitialized, since the last entry in the free list could also
- * have a reference value of 0 (although this is unlikely).
- */
-
-static struct ref_table tipc_ref_table;
-
-static DEFINE_RWLOCK(ref_table_lock);
-
-/**
- * tipc_ref_table_init - create reference table for sockets
- */
-int tipc_sk_ref_table_init(u32 req_sz, u32 start)
+static struct tipc_sock *tipc_sk_lookup(u32 portid)
 {
-       struct reference *table;
-       u32 actual_sz;
-
-       /* account for unused entry, then round up size to a power of 2 */
-
-       req_sz++;
-       for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) {
-               /* do nothing */
-       };
-
-       /* allocate table & mark all entries as uninitialized */
-       table = vzalloc(actual_sz * sizeof(struct reference));
-       if (table == NULL)
-               return -ENOMEM;
-
-       tipc_ref_table.entries = table;
-       tipc_ref_table.capacity = req_sz;
-       tipc_ref_table.init_point = 1;
-       tipc_ref_table.first_free = 0;
-       tipc_ref_table.last_free = 0;
-       tipc_ref_table.index_mask = actual_sz - 1;
-       tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
+       struct tipc_sock *tsk;
 
-       return 0;
-}
+       rcu_read_lock();
+       tsk = rhashtable_lookup(&tipc_sk_rht, &portid);
+       if (tsk)
+               sock_hold(&tsk->sk);
+       rcu_read_unlock();
 
-/**
- * tipc_ref_table_stop - destroy reference table for sockets
- */
-void tipc_sk_ref_table_stop(void)
-{
-       if (!tipc_ref_table.entries)
-               return;
-       vfree(tipc_ref_table.entries);
-       tipc_ref_table.entries = NULL;
+       return tsk;
 }
 
-/* tipc_ref_acquire - create reference to a socket
- *
- * Register an socket pointer in the reference table.
- * Returns a unique reference value that is used from then on to retrieve the
- * socket pointer, or to determine if the socket has been deregistered.
- */
-u32 tipc_sk_ref_acquire(struct tipc_sock *tsk)
+static int tipc_sk_insert(struct tipc_sock *tsk)
 {
-       u32 index;
-       u32 index_mask;
-       u32 next_plus_upper;
-       u32 ref = 0;
-       struct reference *entry;
-
-       if (unlikely(!tsk)) {
-               pr_err("Attempt to acquire ref. to non-existent obj\n");
-               return 0;
-       }
-       if (unlikely(!tipc_ref_table.entries)) {
-               pr_err("Ref. table not found in acquisition attempt\n");
-               return 0;
-       }
-
-       /* Take a free entry, if available; otherwise initialize a new one */
-       write_lock_bh(&ref_table_lock);
-       index = tipc_ref_table.first_free;
-       entry = &tipc_ref_table.entries[index];
+       u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
+       u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
 
-       if (likely(index)) {
-               index = tipc_ref_table.first_free;
-               entry = &tipc_ref_table.entries[index];
-               index_mask = tipc_ref_table.index_mask;
-               next_plus_upper = entry->ref;
-               tipc_ref_table.first_free = next_plus_upper & index_mask;
-               ref = (next_plus_upper & ~index_mask) + index;
-               entry->tsk = tsk;
-       } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
-               index = tipc_ref_table.init_point++;
-               entry = &tipc_ref_table.entries[index];
-               ref = tipc_ref_table.start_mask + index;
+       while (remaining--) {
+               portid++;
+               if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
+                       portid = TIPC_MIN_PORT;
+               tsk->portid = portid;
+               sock_hold(&tsk->sk);
+               if (rhashtable_lookup_insert(&tipc_sk_rht, &tsk->node))
+                       return 0;
+               sock_put(&tsk->sk);
        }
 
-       if (ref) {
-               entry->ref = ref;
-               entry->tsk = tsk;
-       }
-       write_unlock_bh(&ref_table_lock);
-       return ref;
+       return -1;
 }
 
-/* tipc_sk_ref_discard - invalidate reference to an socket
- *
- * Disallow future references to an socket and free up the entry for re-use.
- */
-void tipc_sk_ref_discard(u32 ref)
+static void tipc_sk_remove(struct tipc_sock *tsk)
 {
-       struct reference *entry;
-       u32 index;
-       u32 index_mask;
-
-       if (unlikely(!tipc_ref_table.entries)) {
-               pr_err("Ref. table not found during discard attempt\n");
-               return;
-       }
-
-       index_mask = tipc_ref_table.index_mask;
-       index = ref & index_mask;
-       entry = &tipc_ref_table.entries[index];
-
-       write_lock_bh(&ref_table_lock);
+       struct sock *sk = &tsk->sk;
 
-       if (unlikely(!entry->tsk)) {
-               pr_err("Attempt to discard ref. to non-existent socket\n");
-               goto exit;
-       }
-       if (unlikely(entry->ref != ref)) {
-               pr_err("Attempt to discard non-existent reference\n");
-               goto exit;
+       if (rhashtable_remove(&tipc_sk_rht, &tsk->node)) {
+               WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
+               __sock_put(sk);
        }
-
-       /* Mark entry as unused; increment instance part of entry's
-        *   reference to invalidate any subsequent references
-        */
-
-       entry->tsk = NULL;
-       entry->ref = (ref & ~index_mask) + (index_mask + 1);
-
-       /* Append entry to free entry list */
-       if (unlikely(tipc_ref_table.first_free == 0))
-               tipc_ref_table.first_free = index;
-       else
-               tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
-       tipc_ref_table.last_free = index;
-exit:
-       write_unlock_bh(&ref_table_lock);
 }
 
-/* tipc_sk_get - find referenced socket and return pointer to it
- */
-struct tipc_sock *tipc_sk_get(u32 ref)
+int tipc_sk_rht_init(void)
 {
-       struct reference *entry;
-       struct tipc_sock *tsk;
+       struct rhashtable_params rht_params = {
+               .nelem_hint = 192,
+               .head_offset = offsetof(struct tipc_sock, node),
+               .key_offset = offsetof(struct tipc_sock, portid),
+               .key_len = sizeof(u32), /* portid */
+               .hashfn = jhash,
+               .max_shift = 20, /* 1M */
+               .min_shift = 8,  /* 256 */
+               .grow_decision = rht_grow_above_75,
+               .shrink_decision = rht_shrink_below_30,
+       };
 
-       if (unlikely(!tipc_ref_table.entries))
-               return NULL;
-       read_lock_bh(&ref_table_lock);
-       entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
-       tsk = entry->tsk;
-       if (likely(tsk && (entry->ref == ref)))
-               sock_hold(&tsk->sk);
-       else
-               tsk = NULL;
-       read_unlock_bh(&ref_table_lock);
-       return tsk;
+       return rhashtable_init(&tipc_sk_rht, &rht_params);
 }
 
-/* tipc_sk_get_next - lock & return next socket after referenced one
-*/
-struct tipc_sock *tipc_sk_get_next(u32 *ref)
+void tipc_sk_rht_destroy(void)
 {
-       struct reference *entry;
-       struct tipc_sock *tsk = NULL;
-       uint index = *ref & tipc_ref_table.index_mask;
+       /* Wait for socket readers to complete */
+       synchronize_net();
 
-       read_lock_bh(&ref_table_lock);
-       while (++index < tipc_ref_table.capacity) {
-               entry = &tipc_ref_table.entries[index];
-               if (!entry->tsk)
-                       continue;
-               tsk = entry->tsk;
-               sock_hold(&tsk->sk);
-               *ref = entry->ref;
-               break;
-       }
-       read_unlock_bh(&ref_table_lock);
-       return tsk;
-}
-
-static void tipc_sk_put(struct tipc_sock *tsk)
-{
-       sock_put(&tsk->sk);
+       rhashtable_destroy(&tipc_sk_rht);
 }
 
 /**
@@ -2829,7 +2684,7 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
        attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
        if (!attrs)
                goto genlmsg_cancel;
-       if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref))
+       if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
                goto attr_msg_cancel;
        if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr))
                goto attr_msg_cancel;
@@ -2859,22 +2714,29 @@ int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
        int err;
        struct tipc_sock *tsk;
-       u32 prev_ref = cb->args[0];
-       u32 ref = prev_ref;
-
-       tsk = tipc_sk_get_next(&ref);
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               err = __tipc_nl_add_sk(skb, cb, tsk);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
-               if (err)
-                       break;
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
+       u32 prev_portid = cb->args[0];
+       u32 portid = prev_portid;
+       int i;
 
-               prev_ref = ref;
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
+       for (i = 0; i < tbl->size; i++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       portid = tsk->portid;
+                       err = __tipc_nl_add_sk(skb, cb, tsk);
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+                       if (err)
+                               break;
+
+                       prev_portid = portid;
+               }
        }
+       rcu_read_unlock();
 
-       cb->args[0] = prev_ref;
+       cb->args[0] = prev_portid;
 
        return skb->len;
 }
@@ -2962,12 +2824,12 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
        int err;
-       u32 tsk_ref = cb->args[0];
+       u32 tsk_portid = cb->args[0];
        u32 last_publ = cb->args[1];
        u32 done = cb->args[2];
        struct tipc_sock *tsk;
 
-       if (!tsk_ref) {
+       if (!tsk_portid) {
                struct nlattr **attrs;
                struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
 
@@ -2984,13 +2846,13 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
                if (!sock[TIPC_NLA_SOCK_REF])
                        return -EINVAL;
 
-               tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
+               tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
        }
 
        if (done)
                return 0;
 
-       tsk = tipc_sk_get(tsk_ref);
+       tsk = tipc_sk_lookup(tsk_portid);
        if (!tsk)
                return -EINVAL;
 
@@ -2999,9 +2861,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
        if (!err)
                done = 1;
        release_sock(&tsk->sk);
-       tipc_sk_put(tsk);
+       sock_put(&tsk->sk);
 
-       cb->args[0] = tsk_ref;
+       cb->args[0] = tsk_portid;
        cb->args[1] = last_publ;
        cb->args[2] = done;